基于Netty实现RPC框架
Dubbo 底层使用 Netty 作为网络通讯框架,要求使用 Netty 实现一个简单的 RPC 框架,消费者和提供者约定协议和接口,消费者远程调用提供者的服务。
1、创建一个接口,定义抽象方法,用于消费者和提供者之间的约定。
2、创建一个提供者,该类需要监听消费者的请求,并按照约定返回数据。
3、创建一个消费者,该类需要透明地调用自己不存在的方法,内部需要使用 Netty 进行数据通信。
4、提供者与消费者传输数据使用json字符串格式。
5、提供者使用 Netty 集成 Spring Boot 环境。
案例:客户端调用服务端,利用ID查询User对象的方法
需求分析
具体实现
需要分成三个子项目
1 2 3 4 5
   | .
   | ├── custom-rpc-api
   | ├── custom-rpc-consumer
   | ├── custom-rpc-provider
   | └── pom.xml
   | 
 
主项目
主项目的 pom.xml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
   | <?xml version="1.0" encoding="UTF-8"?> <project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"          xmlns="http://maven.apache.org/POM/4.0.0"          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">     <modelVersion>4.0.0</modelVersion>     <groupId>com.terewrgreen</groupId>     <artifactId>custom-rpc</artifactId>     <packaging>pom</packaging>     <version>1.0.0</version>
      <modules>         <module>custom-rpc-api</module>         <module>custom-rpc-provider</module>         <module>custom-rpc-consumer</module>     </modules>
      <parent>         <groupId>org.springframework.boot</groupId>         <artifactId>spring-boot-starter-parent</artifactId>         <version>2.6.3</version>     </parent>
      <properties>         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>         <curator.version>4.3.0</curator.version>     </properties>
      <dependencies>                  <dependency>             <groupId>io.netty</groupId>             <artifactId>netty-all</artifactId>         </dependency>                  <dependency>             <groupId>com.alibaba</groupId>             <artifactId>fastjson</artifactId>             <version>1.2.79</version>         </dependency>                  <dependency>             <groupId>org.projectlombok</groupId>             <artifactId>lombok</artifactId>         </dependency>     </dependencies> </project>
   | 
 
custom-rpc-api
pom.xml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
   | <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">     <parent>         <artifactId>custom-rpc</artifactId>         <groupId>com.terewrgreen</groupId>         <version>1.0.0</version>     </parent>     <modelVersion>4.0.0</modelVersion>
      <artifactId>custom-rpc-api</artifactId>
      <name>custom-rpc-api</name>     <url>http://www.terwergreen.com</url>
      <properties>         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>         <maven.compiler.source>1.8</maven.compiler.source>         <maven.compiler.target>1.8</maven.compiler.target>     </properties>
      <dependencies>         <dependency>             <groupId>junit</groupId>             <artifactId>junit</artifactId>             <version>4.13.1</version>             <scope>test</scope>         </dependency>     </dependencies>
      <build>         <pluginManagement>         </pluginManagement>     </build> </project>
   | 
 
custom-rpc-consumer
pom.xml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
   | <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">     <parent>         <artifactId>custom-rpc</artifactId>         <groupId>com.terewrgreen</groupId>         <version>1.0.0</version>     </parent>     <modelVersion>4.0.0</modelVersion>
      <artifactId>custom-rpc-consumer</artifactId>
      <name>custom-rpc-consumer</name>     <url>http://www.terwergreen.com</url>
      <properties>         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>         <maven.compiler.source>1.8</maven.compiler.source>         <maven.compiler.target>1.8</maven.compiler.target>     </properties>
      <dependencies>         <dependency>             <groupId>com.terewrgreen</groupId>             <artifactId>custom-rpc-api</artifactId>             <version>1.0.0</version>         </dependency>
          <dependency>             <groupId>junit</groupId>             <artifactId>junit</artifactId>             <version>4.13.1</version>             <scope>test</scope>         </dependency>     </dependencies>
      <build>         <pluginManagement>         </pluginManagement>     </build> </project>
   | 
 
RpcClient
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84
   | 
 
 
 
 
 
 
 
 
  public class RpcClient {
      private NioEventLoopGroup group;     private Channel channel;
      private String ip;     private int port;
      private RpcClientHandler rpcClientHandler = new RpcClientHandler();     private ExecutorService executorService = Executors.newCachedThreadPool();
      public RpcClient(String ip, int port) {         this.ip = ip;         this.port = port;         initClient();     }
      
 
      public void initClient() {
          try {                          group = new NioEventLoopGroup();                          Bootstrap bootstrap = new Bootstrap();             bootstrap.group(group)                     .channel(NioSocketChannel.class)                     .option(ChannelOption.SO_KEEPALIVE, true)                     .option(ChannelOption.SO_TIMEOUT, 3000)                     .handler(new ChannelInitializer<SocketChannel>() {                         @Override                         protected void initChannel(SocketChannel channel) throws Exception {                             ChannelPipeline pipeline = channel.pipeline();                                                          pipeline.addLast(new StringDecoder());                             pipeline.addLast(new StringEncoder());                                                          pipeline.addLast(rpcClientHandler);                         }                     });             channel = bootstrap.connect(ip, port).sync().channel();             System.out.println("===========客户端启动成功==========");         } catch (Exception e) {             if (channel != null) {                 channel.close();                 System.out.println("客户端关闭channel");             }             if (group != null) {                 group.shutdownGracefully();                 System.out.println("客户端关闭group");             }             e.printStackTrace();         }     }
      public void close(){         if (channel != null) {             channel.close();             System.out.println("外部调用客户端关闭channel");         }         if (group != null) {             group.shutdownGracefully();             System.out.println("外部调用客户端关闭group");         }     }
      public Object send(String msg) throws ExecutionException, InterruptedException {         rpcClientHandler.setRequestMessage(msg);         Future future = executorService.submit(rpcClientHandler);         return future.get();     } }
 
  | 
 
RpcClientHandler
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58
   | 
 
 
 
 
 
 
 
  /**
   * Client-side handler that also acts as the task sending one request and
   * waiting for its response.
   *
   * <p>{@link #call()} writes the pending request and then waits on this object's
   * monitor. {@link #channelRead0} stores the incoming message and wakes the waiter.
   * Only one request may be in flight at a time, because {@code call()} is
   * synchronized.
   */
  public class RpcClientHandler extends SimpleChannelInboundHandler<String> implements Callable<Object> {
      // Written by the event loop in channelActive and read by the thread running call().
      private volatile ChannelHandlerContext ctx;

      private String requestMessage;
      private String responseMessage;
      // Set once the channel goes inactive so a waiting call() does not block forever.
      private boolean channelClosed;

      public String getRequestMessage() {
          return requestMessage;
      }

      public void setRequestMessage(String requestMessage) {
          this.requestMessage = requestMessage;
      }

      /**
       * Stores the server's reply and wakes the thread blocked in {@link #call()}.
       */
      @Override
      protected synchronized void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) throws Exception {
          responseMessage = msg;
          notifyAll();
      }

      /**
       * Remembers the context so {@link #call()} can write through it later.
       */
      @Override
      public void channelActive(ChannelHandlerContext ctx) throws Exception {
          this.ctx = ctx;
      }

      /**
       * Wakes a pending {@link #call()} when the connection goes away, so it fails
       * instead of hanging.
       */
      @Override
      public void channelInactive(ChannelHandlerContext context) throws Exception {
          synchronized (this) {
              channelClosed = true;
              notifyAll();
          }
          super.channelInactive(context);
      }

      /**
       * Sends the current request message and blocks until a response arrives.
       *
       * @return the response text
       * @throws IllegalStateException if the channel was never active, or closed before replying
       * @throws InterruptedException  if interrupted while waiting
       */
      @Override
      public synchronized Object call() throws Exception {
          ChannelHandlerContext context = ctx;
          if (context == null) {
              throw new IllegalStateException("channel is not active; cannot send request");
          }
          // Drop any reply left over from a previous request.
          responseMessage = null;
          context.writeAndFlush(requestMessage);
          // Loop on the condition: Object.wait() may return spuriously.
          while (responseMessage == null && !channelClosed) {
              wait();
          }
          if (responseMessage == null) {
              throw new IllegalStateException("connection closed before a response was received");
          }
          return responseMessage;
      }
  }
 
  | 
 
RpcClientProxy
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
   | 
 
 
 
 
 
 
 
 
 
  /**
   * Factory for JDK dynamic proxies that forward interface calls to the remote
   * provider.
   */
  public class RpcClientProxy {
      /**
       * Creates a proxy for {@code serviceClass}.
       *
       * <p>Each method call on the proxy:
       * <ol>
       *   <li>builds an {@code RpcRequest} from the declaring class name, method
       *       name, parameter types and arguments;</li>
       *   <li>opens a new {@code RpcClient} to 127.0.0.1:9999 and sends the request as JSON;</li>
       *   <li>parses the reply as an {@code RpcResponse} and converts its result to
       *       the method's return type;</li>
       *   <li>closes the client.</li>
       * </ol>
       *
       * @param serviceClass the service interface to proxy
       * @return a proxy implementing {@code serviceClass}
       */
      public static Object createProxy(Class<?> serviceClass) {
          return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class<?>[]{serviceClass}, new InvocationHandler() {
              @Override
              public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                  // 1. Describe the call.
                  RpcRequest rpcRequest = new RpcRequest();
                  rpcRequest.setRequestId(UUID.randomUUID().toString());
                  rpcRequest.setClassName(method.getDeclaringClass().getName());
                  rpcRequest.setMethodName(method.getName());
                  rpcRequest.setParameterTypes(method.getParameterTypes());
                  rpcRequest.setParameters(args);

                  // 2. One short-lived connection per call.
                  RpcClient rpcClient = new RpcClient("127.0.0.1", 9999);

                  try {
                      // 3. Send and wait for the reply.
                      Object responseMessage = rpcClient.send(JSON.toJSONString(rpcRequest));
                      if (responseMessage == null) {
                          throw new IllegalStateException("no response received for request " + rpcRequest.getRequestId());
                      }

                      // 4. Decode the reply.
                      RpcResponse response = JSON.parseObject(responseMessage.toString(), RpcResponse.class);
                      if (response.getError() != null) {
                          throw new RuntimeException(response.getError());
                      }
                      Object result = response.getResult();
                      if (result == null) {
                          // The remote method returned null or is void; nothing to convert.
                          return null;
                      }
                      // Re-serialise before converting: toString() on a plain String
                      // result is not valid JSON text.
                      return JSON.parseObject(JSON.toJSONString(result), method.getReturnType());
                  } finally {
                      rpcClient.close();
                  }
              }
          });
      }
  }
 
  | 
 
ClientBootStrap
1 2 3 4 5 6 7 8 9 10 11 12 13 14
   | 
 
 
 
 
 
  /**
   * Consumer demo entry point: obtains a proxy for {@code IUSerService}, looks up
   * the user with id 1 through it, and prints the result.
   */
  public class ClientBootStrap {
      public static void main(String[] args) {
          // The proxy implements the service interface, so the cast is safe.
          Object proxy = RpcClientProxy.createProxy(IUSerService.class);
          IUSerService remoteUserService = (IUSerService) proxy;

          User found = remoteUserService.getById(1);
          System.out.println(found);
      }
  }
 
  | 
 
custom-rpc-provider
pom.xml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
   | <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">     <parent>         <artifactId>custom-rpc</artifactId>         <groupId>com.terewrgreen</groupId>         <version>1.0.0</version>     </parent>     <modelVersion>4.0.0</modelVersion>
      <artifactId>custom-rpc-provider</artifactId>
      <name>custom-rpc-provider</name>     <url>http://www.terwergreen.com</url>
      <properties>         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>         <maven.compiler.source>1.8</maven.compiler.source>         <maven.compiler.target>1.8</maven.compiler.target>     </properties>
      <dependencies>         <dependency>             <groupId>com.terewrgreen</groupId>             <artifactId>custom-rpc-api</artifactId>             <version>1.0.0</version>         </dependency>                  <dependency>             <groupId>org.springframework</groupId>             <artifactId>spring-context</artifactId>         </dependency>         <dependency>             <groupId>org.springframework.boot</groupId>             <artifactId>spring-boot-autoconfigure</artifactId>         </dependency>
          <dependency>             <groupId>junit</groupId>             <artifactId>junit</artifactId>             <version>4.13.1</version>             <scope>test</scope>         </dependency>     </dependencies>
      <build>         <pluginManagement>         </pluginManagement>     </build> </project>
   | 
 
UserServiceImpl
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
   | 
 
 
 
 
 
  /**
   * In-memory implementation of {@code IUSerService} exposed over RPC.
   *
   * <p>The two sample users are registered once at construction, so
   * {@link #getById(int)} is a read-only lookup. Previously every call rebuilt the
   * entries and wrote to the shared {@link HashMap}, which is wasted work and is
   * unsafe if requests are served concurrently.
   */
  @RpcService
  @Service
  public class UserServiceImpl implements IUSerService {
      Map<Object, User> userMap = new HashMap<>();

      /** Registers the sample users, keyed by their id. */
      public UserServiceImpl() {
          User user = new User();
          user.setId(1);
          user.setName("唐有炜");
          userMap.put(user.getId(), user);

          User user2 = new User();
          user2.setId(2);
          user2.setName("张三");
          userMap.put(user2.getId(), user2);
      }

      /**
       * Looks up a user by id.
       *
       * @param id the user id
       * @return the matching user, or {@code null} when no user has that id
       */
      @Override
      public User getById(int id) {
          return userMap.get(id);
      }
  }
 
  | 
 
RpcServer
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69
   | 
 
 
 
 
 
  /**
   * Netty server that accepts RPC requests as plain strings and passes them to the
   * injected {@link RpcServerHandler}.
   *
   * <p>As a {@link DisposableBean}, it shuts its event loop groups down when the
   * bean is destroyed.
   */
  @Service
  public class RpcServer implements DisposableBean {
      private NioEventLoopGroup bossGroup;
      private NioEventLoopGroup workerGroup;

      @Autowired
      private RpcServerHandler rpcServerHandler;

      /**
       * Binds to {@code ip:port} and blocks the calling thread until the server
       * channel is closed. Both event loop groups are shut down before returning.
       *
       * @param ip   address to bind
       * @param port port to bind
       */
      public void startServer(String ip, int port) {
          try {
              // One acceptor thread; worker group uses Netty's default size.
              bossGroup = new NioEventLoopGroup(1);
              workerGroup = new NioEventLoopGroup();

              ServerBootstrap serverBootstrap = new ServerBootstrap();

              serverBootstrap.group(bossGroup, workerGroup)
                      .channel(NioServerSocketChannel.class)
                      .childHandler(new ChannelInitializer<SocketChannel>() {
                          @Override
                          protected void initChannel(SocketChannel channel) throws Exception {
                              ChannelPipeline pipeline = channel.pipeline();
                              // String codec for the JSON text protocol.
                              pipeline.addLast(new StringDecoder());
                              pipeline.addLast(new StringEncoder());
                              // The same handler instance is added to every child channel.
                              pipeline.addLast(rpcServerHandler);
                          }
                      });

              ChannelFuture sync = serverBootstrap.bind(ip, port).sync();
              System.out.println("===========服务端启动成功=============");
              // Blocks here until the server channel closes.
              sync.channel().closeFuture().sync();
          } catch (InterruptedException e) {
              e.printStackTrace();
              // Restore the flag so the owning thread can see the interruption.
              Thread.currentThread().interrupt();
          } finally {
              shutdownGroups("finally");
          }
      }

      /**
       * Spring lifecycle hook: shuts down both event loop groups.
       */
      @Override
      public void destroy() throws Exception {
          shutdownGroups("destroy");
      }

      /**
       * Shuts down whichever event loop groups have been created and logs each one.
       *
       * @param phase label prefixed to the log line
       */
      private void shutdownGroups(String phase) {
          if (bossGroup != null) {
              bossGroup.shutdownGracefully();
              System.out.println(phase + " bossGroup成功关闭");
          }
          if (workerGroup != null) {
              workerGroup.shutdownGracefully();
              System.out.println(phase + " workerGroup成功关闭");
          }
      }
  }
 
  | 
 
RpcServerHandler
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93
   | 
 
 
 
 
 
 
 
 
 
 
  /**
   * Server-side handler that decodes a JSON {@code RpcRequest}, invokes the
   * matching service bean and writes back a JSON {@code RpcResponse}.
   *
   * <p>The handler is {@code @Sharable} because one Spring-managed instance is
   * added to the pipeline of every accepted channel.
   */
  @Component
  @ChannelHandler.Sharable
  public class RpcServerHandler extends SimpleChannelInboundHandler<String> implements ApplicationContextAware {
      /** Service beans keyed by the fully qualified name of their first interface. */
      private static final Map<String, Object> SERVICE_INSTANCE_MAP = new ConcurrentHashMap<>();

      /**
       * Collects every bean annotated with {@code @RpcService} and registers it
       * under the name of the first interface it implements.
       *
       * @throws RuntimeException if an annotated bean implements no interface
       */
      @Override
      public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
          Map<String, Object> serviceMap = applicationContext.getBeansWithAnnotation(RpcService.class);
          if (serviceMap != null && serviceMap.size() > 0) {
              Set<Map.Entry<String, Object>> entries = serviceMap.entrySet();
              for (Map.Entry<String, Object> entry : entries) {
                  Object serviceBean = entry.getValue();
                  if (serviceBean.getClass().getInterfaces().length == 0) {
                      throw new RuntimeException("服务必须实现接口");
                  }
                  SERVICE_INSTANCE_MAP.put(serviceBean.getClass().getInterfaces()[0].getName(), serviceBean);
              }
          }
      }

      /**
       * Handles one request message and always writes a response.
       *
       * <p>Parsing happens inside the try block, so a malformed request yields an
       * error response instead of an exception with no reply.
       */
      @Override
      protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
          RpcResponse rpcResponse = new RpcResponse();
          try {
              RpcRequest rpcRequest = JSON.parseObject(msg, RpcRequest.class);
              if (rpcRequest == null) {
                  throw new IllegalArgumentException("empty RPC request");
              }
              rpcResponse.setRequestId(rpcRequest.getRequestId());
              rpcResponse.setResult(handler(rpcRequest));
          } catch (Exception e) {
              rpcResponse.setError(describeFailure(e));
              e.printStackTrace();
          }

          ctx.writeAndFlush(JSON.toJSONString(rpcResponse));
      }

      /**
       * Produces a non-null error text for the response.
       *
       * <p>{@link InvocationTargetException} has a null message of its own, so the
       * wrapped cause is reported. If there is still no message, the exception's
       * {@code toString()} is used, so the client never sees a failure as success.
       */
      private static String describeFailure(Throwable failure) {
          Throwable root = failure;
          if (failure instanceof InvocationTargetException && failure.getCause() != null) {
              root = failure.getCause();
          }
          String message = root.getMessage();
          return message != null ? message : root.toString();
      }

      /**
       * Finds the service bean for the request's class name and invokes the
       * requested method on it.
       *
       * @param rpcRequest the decoded request
       * @return the value returned by the service method
       * @throws InvocationTargetException if the service method itself throws
       * @throws RuntimeException          if no bean is registered for the class name
       */
      private Object handler(RpcRequest rpcRequest) throws InvocationTargetException {
          Object serviceBean = SERVICE_INSTANCE_MAP.get(rpcRequest.getClassName());
          if (null == serviceBean) {
              throw new RuntimeException("根据beanName找不到服务" + rpcRequest.getClassName());
          }

          Class<?> beanClass = serviceBean.getClass();
          String methodName = rpcRequest.getMethodName();
          Class<?>[] parameterTypes = rpcRequest.getParameterTypes();
          Object[] parameters = rpcRequest.getParameters();

          // Invoke through FastClass/FastMethod rather than java.lang.reflect.Method.
          FastClass fastClass = FastClass.create(beanClass);
          FastMethod fastMethod = fastClass.getMethod(methodName, parameterTypes);
          return fastMethod.invoke(serviceBean, parameters);
      }
  }
 
  | 
 
ServerBootstrapApplication
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
   | 
 
 
 
 
 
  /**
   * Spring Boot entry point for the provider. Once the application context is
   * ready, the RPC server is started on its own thread.
   */
  @SpringBootApplication
  public class ServerBootstrapApplication implements CommandLineRunner {
      @Autowired
      private RpcServer rpcServer;

      public static void main(String[] args) {
          SpringApplication.run(ServerBootstrapApplication.class, args);
      }

      /**
       * Launches the RPC server on 127.0.0.1:9999 in a separate thread, so this
       * callback returns without waiting for the server.
       */
      @Override
      public void run(String... args) throws Exception {
          Thread serverThread = new Thread(() -> rpcServer.startServer("127.0.0.1", 9999));
          serverThread.start();
      }
  }
 
  | 
 
运行效果


错误解决
1
   | com.terewrgreen.rpc.provider.handler.RpcServerHandler is not a @Sharable handler, so can't be added or removed multiple times.
   | 
 
加上 @ChannelHandler.Sharable 注解即可。