Rabbit RPC 代码阅读(一)

前言Surging是一款优秀的微服务框架,以前在使用Surging及RPC的时候,往往一知半解,知其然不知其所以然,最近终于鼓起勇气将气源码进行了详解阅读,其阅读笔记如下,以作记录。RPC介绍Surging的RPC原理很简单,可以用3点概括:1、服务端启动并且向注册中心发送服务信息,注册中心收到后会定时监控服务状态(常见心跳检测)。2、客户端需要开始调用服务的时候,首先去注册中心获取服务信息。3、...

前言

Surging是一款优秀的微服务框架,以前在使用Surging及RPC的时候,往往一知半解,知其然不知其所以然,最近终于鼓起勇气将气源码进行了详解阅读,其阅读笔记如下,以作记录。

RPC介绍

Surging的RPC原理很简单,可以用3点概括:

1、服务端启动并且向注册中心发送服务信息,注册中心收到后会定时监控服务状态(常见心跳检测)。

2、客户端需要开始调用服务的时候,首先去注册中心获取服务信息。

3、客户端创建远程调用连接,连接后服务端返回处理信息。

第3步又可以细分,下面说说远程过程调用的原理:

1、目标:客户端怎么调用远程机器上的公开方法

2、服务发现,向注册中心获取服务(这里需要做的有很多:拿到多个服务时需要做负载均衡,同机房过滤、版本过滤、服务路由过滤、统一网关等);

3、客户端发起调用,将需要调用的服务、方法、参数进行组装;
序列化编码组装的消息,这里可以使用json,也可以使用xml,也可以使用protobuf,也可以使用hessian,几种方案的序列化速度还有序列化后占用字节大小都是选择的重要指标,对内笔者建议使用高效的protobuf,它基于TCP/IP二进制进行序列化,体积小,速度快。

4、传输协议,可以使用传统的IO阻塞传输,也可以使用高效的nio传输(Netty);

5、服务端收到后进行反序列化,然后进行相应的处理;

6、服务端序列化response信息并且返回;

7、客户端收到response信息并且反序列化;

来源:https://www.cnblogs.com/SteveLee/p/rpc_framework_easy.html

下面将针对以上7个步骤所涉及的代码进行遂一解析,过程是痛苦的,结果是让人欣喜的,.Net Core 原来如此强大。

一、客户端怎么调用远程机器上的公开方法

废话不多说,直接开始解析代码

using doteasy.client.Clients;namespace doteasy.client{ internal static class Program {  private static void Main() => RpcClient.TestNoToken(); }}public static void TestNoToken(){ using (var proxy = ClientProxy.Generate<IProxyService>(new Uri(“http://127.0.0.1:8500“))) {  Console.WriteLine($@“{proxy.Sync(1)}“);  Console.WriteLine($@“{proxy.Async(1).Result}“);  Console.WriteLine($@“{proxy.GetDictionaryAsync().Result[“key“]}“); } using (var proxy = ClientProxy.Generate<IProxyCommpoundService>(new Uri(“http://127.0.0.1:8500“))) {  Console.WriteLine($@“{JsonConvert.SerializeObject(proxy.GetCurrentObject(new CompoundObject()))}“); }}

核心代码:

using (var proxy = ClientProxy.Generate(new Uri(“http://127.0.0.1:8500“)))

其作用为生成一个实现了IProxyService接口的代理类,接着调用远程服务器的Sync,Async,GetDictionaryAsync三个方法,IProxyService的定义如下:

public interface IProxyService : IDisposable{ Task<IDictionary<string, string>> GetDictionaryAsync(); Task<string> Async(int id); string Sync(int id);}

继续查看ClientProxy.Generate的定义:

public static T Generate<T>(Uri consulUrl){ serviceCollection.AddLogging().AddClient().UseDotNettyTransport().UseConsulRouteManager(new RpcOptionsConfiguration {  ConsulClientConfiguration = new ConsulClientConfiguration {Address = consulUrl} }); //返回一个预编译的代理对象 return Proxy<T>();}

首先注入相应的接口及需要使用的中间件,接着返回一个预编译的代理对象,我们继续往下,查看Proxy的定义:

private static T Proxy<T>(string accessToken = ““){ var serviceProvider = Builder();#if DEBUG serviceProvider.GetRequiredService<ILoggerFactory>().AddConsole((c, l) => (int) l >= Loglevel);#endif var serviceProxyGenerate = serviceProvider.GetRequiredService<IServiceProxyGenerater>(); var serviceProxyFactory = serviceProvider.GetRequiredService<IServiceProxyFactory>(); if (accessToken == ““) {    return serviceProxyFactory.CreateProxy<T>(serviceProxyGenerate.GenerateProxys(new[] {typeof(T)}) .ToArray() .Single(typeof(T).GetTypeInfo().IsAssignableFrom)); } return serviceProxyFactory  .CreateProxy<T>(serviceProxyGenerate.GenerateProxys(new[] {typeof(T)}, accessToken).ToArray().Single(typeof(T).GetTypeInfo().IsAssignableFrom));}

创建一个代理对象,构造语法树及动态编译,最后返回一个指定的代理对象类。

看来核心是构造语法树及动态编译,返回代理对象类,仔细分析,核心代码为:

serviceProxyFactory.CreateProxy<T>(serviceProxyGenerate.GenerateProxys(new[] {typeof(T)}) .ToArray() .Single(typeof(T).GetTypeInfo().IsAssignableFrom));

我们首先查看.CreateProxy的实现代码

public static T CreateProxy<T>(this IServiceProxyFactory serviceProxyFactory, Type proxyType){ return (T) serviceProxyFactory.CreateProxy(proxyType);}public object CreateProxy(Type proxyType){ return proxyType  .GetTypeInfo()  .GetConstructors()  .First()  .Invoke(//ServiceProxyBase类 构造函数传参new object[] {_remoteInvokeService, _typeConvertibleService}  );}

没啥特别的,返回一个代理对象类,但问题来了,这个代理对象类是怎么生成的呢,原来核心代码在这段:
serviceProxyGenerate.GenerateProxys(new[] {typeof(T)})

查看其详细代码:

public IEnumerable<Type> GenerateProxys(IEnumerable<Type> interfaceTypes){ //获取程序集 var assembles = DependencyContext.Default.RuntimeLibraries  .SelectMany(i => i.GetDefaultAssemblyNames(DependencyContext.Default).Select(z => Assembly.Load(new AssemblyName(z.Name)))); assembles = assembles.Where(i => i.IsDynamic == false).ToArray(); var enumerable = interfaceTypes as Type[] ?? interfaceTypes.ToArray();  //构造语法树 var trees = enumerable.Select(GenerateProxyTree).ToList(); //编译语法树 var stream = CompilationUnits.CompileClientProxy(trees,  assembles.Select(a => MetadataReference.CreateFromFile(a.Location)).Concat(new[]{ MetadataReference.CreateFromFile(typeof(Task).GetTypeInfo().Assembly.Location)}),  enumerable.ToArray()[0],  _logger); if (stream == null) {  throw new ArgumentException(@“没有生成任何客户端代码“, nameof(stream)); } using (stream) {  var className = enumerable.ToArray()[0].Name.StartsWith(“I“)? enumerable.ToArray()[0].Name.Substring(1): enumerable.ToArray()[0].Name;  return AppDomain.CurrentDomain.GetAssemblies().Any(x => x.FullName.Contains(className))? Assembly.Load(StreamToBytes(stream)).GetExportedTypes(): AssemblyLoadContext.Default.LoadFromStream(stream).GetExportedTypes(); }}

核心代码为:

var trees = enumerable.Select(GenerateProxyTree).ToList();var stream = CompilationUnits.CompileClientProxy(trees,assembles .Select(a => MetadataReference.CreateFromFile(a.Location)) .Concat(new[] {  MetadataReference.CreateFromFile(typeof(Task).GetTypeInfo().Assembly.Location) }),enumerable.ToArray()[0],_logger);

首先获取代码对象的语法树,再进行动态编译。
这里有个问题就比较突出了,这个GenerateProxyTree到底是啥?下面那句比较好理解,对trees对象进行编译并返回一个序列化的字节码。

查看GenerateProxyTree的代码

public SyntaxTree GenerateProxyTree(Type interfaceType){ var className = interfaceType.Name.StartsWith(“I“) ? interfaceType.Name.Substring(1) : interfaceType.Name; className  = “ClientProxy“; var members = new List<MemberDeclarationSyntax> {  GetConstructorDeclaration(className) }; var interf = interfaceType.GetInterfaces()[0]; var mthods = interfaceType.GetMethods(); if (interf.FullName != null && interf.FullName.Contains(“IDisposable“)) {  var m = interf.GetMethods()[0];  var mm = mthods.ToList();  mm.Add(m);  mthods = mm.ToArray(); } members.AddRange(GenerateMethodDeclarations(mthods)); return SyntaxFactory.CompilationUnit().WithUsings(GetUsings()).WithMembers(  SyntaxFactory.SingletonList<MemberDeclarationSyntax>(SyntaxFactory.NamespaceDeclaration( SyntaxFactory.QualifiedName(  SyntaxFactory.QualifiedName(SyntaxFactory.IdentifierName(“Rpc“),SyntaxFactory.IdentifierName(“Common“)),  SyntaxFactory.IdentifierName(“ClientProxys“))).WithMembers( SyntaxFactory.SingletonList<MemberDeclarationSyntax>(  SyntaxFactory.ClassDeclaration(className).WithModifiers(SyntaxFactory.TokenList(SyntaxFactory.Token(SyntaxKind.PublicKeyword))).WithBaseList(SyntaxFactory.BaseList(  
源文地址:https://www.guoxiongfei.cn/cntech/13360.html
0