基于Netty实现RPC框架
Dubbo
底层使用 Netty
作为网络通讯框架,要求使用 Netty
实现一个简单的 RPC
框架,消费者和提供者约定协议和接口,消费者远程调用提供者的服务。
1、创建一个接口,定义抽象方法,用于消费者和提供者之间的约定。
2、创建一个提供者,该类需要监听消费者的请求,并按照约定返回数据。
3、创建一个消费者,该类需要透明的调用自己不存在的方法,内部需要使用 Netty
进行数据通信。
4、提供者与消费者传输数据使用json字符串格式。
5、提供者使用 Netty
集成 Spring Boot
环境。
案例:客户端调用服务端,利用ID查询User对象的方法
需求分析
具体实现
需要分成三个子项目
1 2 3 4 5
| . ├── custom-rpc-api ├── custom-rpc-consumer ├── custom-rpc-provider └── pom.xml
|
主项目
主项目的 pom.xml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| <?xml version="1.0" encoding="UTF-8"?> <project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.terewrgreen</groupId> <artifactId>custom-rpc</artifactId> <packaging>pom</packaging> <version>1.0.0</version>
<modules> <module>custom-rpc-api</module> <module>custom-rpc-provider</module> <module>custom-rpc-consumer</module> </modules>
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.6.3</version> </parent>
<properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <curator.version>4.3.0</curator.version> </properties>
<dependencies> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.79</version> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> </dependencies> </project>
|
custom-rpc-api
pom.xml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>custom-rpc</artifactId> <groupId>com.terewrgreen</groupId> <version>1.0.0</version> </parent> <modelVersion>4.0.0</modelVersion>
<artifactId>custom-rpc-api</artifactId>
<name>custom-rpc-api</name> <url>http://www.terwergreen.com</url>
<properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> </properties>
<dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.13.1</version> <scope>test</scope> </dependency> </dependencies>
<build> <pluginManagement> </pluginManagement> </build> </project>
|
custom-rpc-consumer
pom.xml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>custom-rpc</artifactId> <groupId>com.terewrgreen</groupId> <version>1.0.0</version> </parent> <modelVersion>4.0.0</modelVersion>
<artifactId>custom-rpc-consumer</artifactId>
<name>custom-rpc-consumer</name> <url>http://www.terwergreen.com</url>
<properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> </properties>
<dependencies> <dependency> <groupId>com.terewrgreen</groupId> <artifactId>custom-rpc-api</artifactId> <version>1.0.0</version> </dependency>
<dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.13.1</version> <scope>test</scope> </dependency> </dependencies>
<build> <pluginManagement> </pluginManagement> </build> </project>
|
RpcClient
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84
|
public class RpcClient {
private NioEventLoopGroup group; private Channel channel;
private String ip; private int port;
private RpcClientHandler rpcClientHandler = new RpcClientHandler(); private ExecutorService executorService = Executors.newCachedThreadPool();
public RpcClient(String ip, int port) { this.ip = ip; this.port = port; initClient(); }
public void initClient() {
try { group = new NioEventLoopGroup(); Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group) .channel(NioSocketChannel.class) .option(ChannelOption.SO_KEEPALIVE, true) .option(ChannelOption.SO_TIMEOUT, 3000) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel channel) throws Exception { ChannelPipeline pipeline = channel.pipeline(); pipeline.addLast(new StringDecoder()); pipeline.addLast(new StringEncoder()); pipeline.addLast(rpcClientHandler); } }); channel = bootstrap.connect(ip, port).sync().channel(); System.out.println("===========客户端启动成功=========="); } catch (Exception e) { if (channel != null) { channel.close(); System.out.println("客户端关闭channel"); } if (group != null) { group.shutdownGracefully(); System.out.println("客户端关闭group"); } e.printStackTrace(); } }
public void close(){ if (channel != null) { channel.close(); System.out.println("外部调用客户端关闭channel"); } if (group != null) { group.shutdownGracefully(); System.out.println("外部调用客户端关闭group"); } }
public Object send(String msg) throws ExecutionException, InterruptedException { rpcClientHandler.setRequestMessage(msg); Future future = executorService.submit(rpcClientHandler); return future.get(); } }
|
RpcClienthandler
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58
|
public class RpcClientHandler extends SimpleChannelInboundHandler implements Callable {
private ChannelHandlerContext ctx; private String requestMessage; private String responseMessage;
public String getRequestMessage() { return requestMessage; }
public void setRequestMessage(String requestMessage) { this.requestMessage = requestMessage; }
@Override protected synchronized void channelRead0(ChannelHandlerContext channelHandlerContext, Object msg) throws Exception { responseMessage = (String) msg; notify(); }
@Override public void channelActive(ChannelHandlerContext ctx) throws Exception { this.ctx = ctx; }
@Override public synchronized Object call() throws Exception { ctx.writeAndFlush(requestMessage); wait(); return responseMessage; } }
|
RpcClientProxy
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
|
public class RpcClientProxy { public static Object createProxy(Class serviceClass) { return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class[]{serviceClass}, new InvocationHandler() { @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { RpcRequest rpcRequest = new RpcRequest(); rpcRequest.setRequestId(UUID.randomUUID().toString()); rpcRequest.setClassName(method.getDeclaringClass().getName()); rpcRequest.setMethodName(method.getName()); rpcRequest.setParameterTypes(method.getParameterTypes()); rpcRequest.setParameters(args);
RpcClient rpcClient = new RpcClient("127.0.0.1", 9999);
try { Object responseMessage = rpcClient.send(JSON.toJSONString(rpcRequest));
RpcResponse response = JSON.parseObject(responseMessage.toString(), RpcResponse.class); if (response.getError() != null) { throw new RuntimeException(response.getError()); } Object result = response.getResult(); Object object = JSON.parseObject(result.toString(), method.getReturnType()); return object; } catch (Exception e) { throw e; } finally { rpcClient.close(); }
} }); } }
|
ClientBoosStrap
1 2 3 4 5 6 7 8 9 10 11 12 13 14
|
public class ClientBootStrap { public static void main(String[] args) { IUSerService userService = (IUSerService) RpcClientProxy.createProxy(IUSerService.class); User user = userService.getById(1); System.out.println(user); } }
|
custom-rpc-provider
pom.xml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
| <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>custom-rpc</artifactId> <groupId>com.terewrgreen</groupId> <version>1.0.0</version> </parent> <modelVersion>4.0.0</modelVersion>
<artifactId>custom-rpc-provider</artifactId>
<name>custom-rpc-provider</name> <url>http://www.terwergreen.com</url>
<properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> </properties>
<dependencies> <dependency> <groupId>com.terewrgreen</groupId> <artifactId>custom-rpc-api</artifactId> <version>1.0.0</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-autoconfigure</artifactId> </dependency>
<dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.13.1</version> <scope>test</scope> </dependency> </dependencies>
<build> <pluginManagement> </pluginManagement> </build> </project>
|
UserServiceImpl
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
|
@RpcService @Service public class UserServiceImpl implements IUSerService { Map<Object, User> userMap = new HashMap<>();
@Override public User getById(int id) { User user = new User(); user.setId(1); user.setName("唐有炜"); userMap.put(user.getId(), user);
User user2 = new User(); user2.setId(2); user2.setName("张三"); userMap.put(user2.getId(), user2);
return userMap.get(id); } }
|
RpcServer
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69
|
@Service public class RpcServer implements DisposableBean { private NioEventLoopGroup bossGroup; private NioEventLoopGroup workerGroup;
@Autowired private RpcServerHandler rpcServerHandler;
public void startServer(String ip, int port) { try { bossGroup = new NioEventLoopGroup(1); workerGroup = new NioEventLoopGroup();
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel channel) throws Exception { ChannelPipeline pipeline = channel.pipeline(); pipeline.addLast(new StringDecoder()); pipeline.addLast(new StringEncoder()); pipeline.addLast(rpcServerHandler); } });
ChannelFuture sync = serverBootstrap.bind(ip, port).sync(); System.out.println("===========服务端启动成功============="); sync.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { if (bossGroup != null) { bossGroup.shutdownGracefully(); System.out.println("finally bossGroup成功关闭"); } if (workerGroup != null) { workerGroup.shutdownGracefully(); System.out.println("finally workerGroup成功关闭"); } } }
@Override public void destroy() throws Exception { if (bossGroup != null) { bossGroup.shutdownGracefully(); System.out.println("destroy bossGroup成功关闭"); } if (workerGroup != null) { workerGroup.shutdownGracefully(); System.out.println("destroy workerGroup成功关闭"); } } }
|
RpcServerHandler
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93
|
@Component @ChannelHandler.Sharable public class RpcServerHandler extends SimpleChannelInboundHandler<String> implements ApplicationContextAware { private static final Map SERVICE_INSTANCE_MAP = new ConcurrentHashMap();
@Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { Map<String, Object> serviceMap = applicationContext.getBeansWithAnnotation(RpcService.class); if (serviceMap != null && serviceMap.size() > 0) { Set<Map.Entry<String, Object>> entries = serviceMap.entrySet(); for (Map.Entry<String, Object> entry : entries) { Object serviceBean = entry.getValue(); if (serviceBean.getClass().getInterfaces().length == 0) { throw new RuntimeException("服务必须实现接口"); }
SERVICE_INSTANCE_MAP.put(serviceBean.getClass().getInterfaces()[0].getName(), serviceBean); }
} }
@Override protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception { RpcRequest rpcRequest = JSON.parseObject(msg, RpcRequest.class); RpcResponse rpcResponse = new RpcResponse(); rpcResponse.setRequestId(rpcRequest.getRequestId());
try { Object result = handler(rpcRequest); rpcResponse.setResult(result); } catch (Exception e) { rpcResponse.setError(e.getMessage()); e.printStackTrace(); }
ctx.writeAndFlush(JSON.toJSONString(rpcResponse)); }
private Object handler(RpcRequest rpcRequest) throws InvocationTargetException { Object serviceBean = SERVICE_INSTANCE_MAP.get(rpcRequest.getClassName()); if(null == serviceBean){ throw new RuntimeException("根据beanName找不到服务"+rpcRequest.getClassName()); }
Class<?> beanClass = serviceBean.getClass(); String methodName = rpcRequest.getMethodName(); Class<?>[] parameterTypes = rpcRequest.getParameterTypes(); Object[] parameters = rpcRequest.getParameters();
FastClass fastClass = FastClass.create(beanClass); FastMethod fastMethod = fastClass.getMethod(methodName, parameterTypes); Object result = fastMethod.invoke(serviceBean, parameters);
return result; } }
|
ServerBootdtrapApplication
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
|
@SpringBootApplication public class ServerBootstrapApplication implements CommandLineRunner { @Autowired private RpcServer rpcServer;
public static void main(String[] args) { SpringApplication.run(ServerBootstrapApplication.class, args); }
@Override public void run(String... args) throws Exception { new Thread(new Runnable() { @Override public void run() { rpcServer.startServer("127.0.0.1", 9999); } }).start(); } }
|
运行效果


错误解决
1
| com.terewrgreen.rpc.provider.handler.RpcServerHandler is not a @Sharable handler, so can't be added or removed multiple times.
|
加上 @ChannelHandler.Sharable
注解即可。