原创 使用netty实现一个简易的rpc服务器

发布时间:2021-08-03 07:17:09 浏览 92 来源:猿笔记 作者:??LZ

    利用netty来实现一个简易的RPC服务器,1标记这个注解的类表示可以被扫描,2用来表示目标类的实体类,使用了lombok来简化实体类的代码数量,}首先是根据包名来扫描类4遍历扫描的类list获取Provider注解并且获取接口信息放入map5暴露两个方法根据接口名称获取和判断是否存在接受一个InvokeProtocol类型的msgRegisterMap获取实现类信息根据方法名和参数类型获取方法后invoke进行返回7这一段就是标准的netty使用案例最主要的是initChannel里面添加的handler这里添加了帧解码器和对象解码器


    #主题列表:juejin,github,smartblue,cyanosis,channing-cyan,fancy,hydrogen,condensed-night-purple,greenwillow,v-green,vue-pro,healer-readable,mk-cute,jzman,geek-black,awesome-green,qklhk-chocolate

    #投稿主题:

    theme:juejin

    highlight:

    ##前言

    看了dubbo和netty的源代码,决定练习一下,用netty实现一个简单的RPC服务器。

    ##正文

    java/***资源提供者*/@Documented@Retention(RetentionPolicy.RUNTIME)@Target(ElementType.TYPE)public@interfaceProvider{}可以扫描标注此标注的类表示

    java@DatapublicclassInvokeProtocolimplementsSerializable{privatestaticfinallongserialVersionUID=109558080020766645L;privateStringclassName;privateStringmethodName;privateClass<>[]parameTypes;privateObject[]values;}

    用于表示目标类的实体类使用lombok来简化实体类的代码号

    javapublicclassRegisterMap{privatestaticfinalConcurrentHashMapregisterMap=newConcurrentHashMap<>();privatestaticfinalListCLASS_LISTS=newArrayList<>();

    创建一个类RegisterMap,来存放被扫描的类的map

    privatestaticvoidscannerClass(StringpackageName){

    URLurl=RegisterMap.class.getClassLoader().getResource(

    packageName.replaceAll("\\\\.","/"));

    if(url!=null){

    Filedir=newFile(url.getFile());

    for(Filefile:Objects.requireNonNull(dir.listFiles())){

    if(file.isDirectory()){

    scannerClass(packageName+"."+file.getName());

    }else{

    CLASS_LISTS.add(packageName+"."+

    file.getName().replace(".class","").trim());

    }

    }

    }

    }首先根据包名扫描类。如果是目录,继续递归。如果是类,请删除的后缀。类并加入列表

    javaprivatestaticvoiddoRegister(){for(StringclassName:CLASS_LISTS){try{Class<>clazz=Class.forName(className);if(clazz.isAnnotationPresent(Provider.class)){Class<>i=clazz.getInterfaces()[0];registerMap.put(i.getName(),clazz.newInstance());}}catch(ClassNotFoundException|IllegalAccessException|InstantiationExceptione){e.printStackTrace();}}}遍历扫描的类列表,获取Provider标注,获取接口信息,放入地图

    javapublicstaticObjectgetClass(StringclazzName){returnregisterMap.get(clazzName);}publicstaticbooleancontainsClass(StringclazzName){returnregisterMap.containsKey(clazzName);}

    暴露两种方法获取,根据接口名称判断是否存在

    javapublicclassRegisterHandlerextendsChannelInboundHandlerAdapter{@OverridepublicvoidexceptionCaught(ChannelHandlerContextctx,Throwablecause)throwsException{cause.printStackTrace();}@OverridepublicvoidchannelRead(ChannelHandlerContextctx,Objectmsg)throwsException{Objectresult=newObject();InvokeProtocolrequest=(InvokeProtocol)msg;if(RegisterMap.containsClass(request.getClassName())){Objectclazz=RegisterMap.getClass(request.getClassName());Methodmethod=clazz.getClass().getMethod(request.getMethodName(),request.getParameTypes());result=method.invoke(clazz,request.getValues());}ctx.write(result);ctx.flush();ctx.close();}}RegisterHandler重写channelRead,接受一个InvokeProtocol类型msg,RegisterMap获取实现类信息,然后根据java底层方法、方法名、参数类型获取方法,然后调用返回

    javapublicstaticvoidmain(String[]args){newRegistry(8081).start();}privateintport;publicRegistry(intport){this.port=port;}publicvoidstart(){EventLoopGroupbossGroup=newNioEventLoopGroup();EventLoopGroupworkGroup=newNioEventLoopGroup();try{ServerBootstrapserver=newServerBootstrap();server.group(bossGroup,workGroup).channel(NioServerSocketChannel.class).childHandler(newChannelInitializer(){@OverrideprotectedvoidinitChannel(SocketChannelsocketChannel)throwsException{ChannelPipelinepipeline=socketChannel.pipeline();pipeline.addLast(newLengthFieldBasedFrameDecoder(Integer.MAX_VALUE,0,4,0,4));pipeline.addLast(newLengthFieldPrepender(4));pipeline.addLast("encoder",newObjectEncoder());pipeline.addLast("decoder",newObjectDecoder(Integer.MAX_VALUE,ClassResolvers.cacheDisabled(null)));pipeline.addLast(newRegisterHandler());}}).option(ChannelOption.SO_BACKLOG,128).childOption(ChannelOption.SO_KEEPALIVE,true);ChannelFuturefuture=server.bind(port).sync();future.channel().closeFuture().sync();}catch(Exceptione){e.printStackTrace();bossGroup.shutdownGracefully();workGroup.shutdownGracefully();}}

    这一段是标准的netty用例,最重要的是initChannel中添加的处理程序,这里添加了帧解码器和对象解码器,最后添加了RegisterHandler。当一个请求进来时,这个对象将被解码并交给channelRead

    * *最后一步是用代理包装接口* *

    javaprivatestaticclassMethodProxyimplementsInvocationHandler{privatefinalClass<>clazz;publicMethodProxy(Class<>clazz){this.clazz=clazz;}@OverridepublicObjectinvoke(Objectproxy,Methodmethod,Object[]args)throwsThrowable{if(Object.class.equals(method.getDeclaringClass())){returnmethod.invoke(this,args);}else{returnrpcInvoke(proxy,method,args);}}注意调用方法。这里核心的rpc功能只需要输入**rpcInvoke**

    javapublicObjectrpcInvoke(Objectproxy,Methodmethod,Object[]args){\t//将调用的方法以及参数信息创建InvokeProtocolmsg=newInvokeProtocol();msg.setClassName(this.clazz.getName());msg.setMethodName(method.getName());msg.setValues(args);msg.setParameTypes(method.getParameterTypes());finalProxyHandlerhandler=newProxyHandler();EventLoopGroupgroup=newNioEventLoopGroup();try{Bootstrapbootstrap=newBootstrap();bootstrap.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY,true).handler(newChannelInitializer(){@OverrideprotectedvoidinitChannel(SocketChannelsocketChannel)throwsException{ChannelPipelinepipeline=socketChannel.pipeline();pipeline.addLast("frameDecoder",newLengthFieldBasedFrameDecoder(Integer.MAX_VALUE,0,4,0,4));pipeline.addLast("frameEncoder",newLengthFieldPrepender(4));pipeline.addLast("encoder",newObjectEncoder());pipeline.addLast("decoder",newObjectDecoder(Integer.MAX_VALUE,ClassResolvers.cacheDisabled(null)));pipeline.addLast("handler",handler);}});ChannelFuturefuture=bootstrap.connect(hosts,port).sync();future.channel().writeAndFlush(msg).sync();future.channel().closeFuture().sync();}catch(Exceptione){e.printStackTrace();}finally{group.shutdownGracefully();}returnhandler.getResponse();}这是netty的客户端,它将信息打包成InvokeProtocol,发送给服务器,接受服务器返回的内容。ProxyHandler用于接受服务器返回的内容,实现非常简单。

    java@GetterpublicclassProxyHandlerextendsChannelInboundHandlerAdapter{privateObjectresponse;@OverridepublicvoidchannelRead(ChannelHandlerContextctx,Objectmsg)throwsException{this.response=msg;}@OverridepublicvoidexceptionCaught(ChannelHandlerContextctx,Throwablecause)throwsException{System.out.println("发生异常");}}

    java@SuppressWarnings("unchecked")publicstaticTcreate(Class<>clazz){MethodProxyproxy=newMethodProxy(clazz);Class<>[]interfaces=clazz.isInterface()newClass[]{clazz}:clazz.getInterfaces();return(T)Proxy.newProxyInstance(clazz.getClassLoader(),interfaces,proxy);}RPC代理的静态方法将一个接口包装为MethodProxy。MethodProxy的invoke方法会向服务器发送一个TCP请求,服务器会返回真实的inoke对象。

    **rpc功能已实现,测试* *

    javapublicinterfaceHelloService{Stringhello(Stringname);}@ProviderpublicclassHelloServiceImplimplementsHelloService{@OverridepublicStringhello(Stringname){return"Hello"+name;}}建立测试接口和实现类

    java

    publicstaticvoidmain(String[]args){

    HelloServicehelloService=RpcProxy.create(HelloService.class);

    System.out.println(helloService.hello("123"));

    }

作者信息

??LZ [等级:3] java开发工程师
发布了 22 篇专栏 · 获得点赞 18 · 获得阅读 2147

相关推荐 更多