神刀安全网

一个轻量级分布式RPC框架–NettyRpc

1、背景

最近在搜索Netty和Zookeeper方面的文章时,看到了这篇文章《 轻量级分布式 RPC 框架 》,作者用Zookeeper、Netty和Spring写了一个轻量级的分布式RPC框架。花了一些时间看了下他的代码,写的干净简单,写的RPC框架可以算是一个简易版的 dubbo 。这个RPC框架虽小,但是麻雀虽小,五脏俱全,有兴趣的可以学习一下。

自己花了点时间整理了下他代码,并修改一些bug,以下是自己学习的一点小结。

2、简介

RPC,即 Remote Procedure Call(远程过程调用),调用远程计算机上的服务,就像调用本地服务一样。RPC可以很好的解耦系统,如WebService就是一种基于Http协议的RPC。

这个RPC整体框架如下:

一个轻量级分布式RPC框架--NettyRpc

这个RPC框架使用的一些技术所解决的问题:

服务发布与订阅:服务端使用Zookeeper注册服务地址,客户端从Zookeeper获取可用的服务地址。

通信:使用Netty作为通信框架。

Spring:使用Spring配置服务,加载Bean,扫描注解。

动态代理:客户端使用代理模式透明化服务调用。

消息编解码:使用Protostuff序列化和反序列化消息。

3、服务端发布服务

使用注解标注要发布的服务

服务注解

 @Target({ElementType.TYPE}) @Retention(RetentionPolicy.RUNTIME) @Component public @interface RpcService {     Class<?> value(); } 

一个服务接口:

 public interface HelloService {      String hello(String name);      String hello(Person person); } 

一个服务实现:使用注解标注

 @RpcService(HelloService.class) public class HelloServiceImpl implements HelloService {      @Override     public String hello(String name) {         return "Hello! " + name;     }      @Override     public String hello(Person person) {         return "Hello! " + person.getFirstName() + " " + person.getLastName();     } } 

服务在启动的时候扫描得到所有的服务接口及其实现:

 @Override     public void setApplicationContext(ApplicationContext ctx) throws BeansException {         Map<String, Object> serviceBeanMap = ctx.getBeansWithAnnotation(RpcService.class);         if (MapUtils.isNotEmpty(serviceBeanMap)) {             for (Object serviceBean : serviceBeanMap.values()) {                 String interfaceName = serviceBean.getClass().getAnnotation(RpcService.class).value().getName();                 handlerMap.put(interfaceName, serviceBean);             }         }     } 

在Zookeeper集群上注册服务地址:

  public class ServiceRegistry {      private static final Logger LOGGER = LoggerFactory.getLogger(ServiceRegistry.class);      private CountDownLatch latch = new CountDownLatch(1);      private String registryAddress;      public ServiceRegistry(String registryAddress) {         this.registryAddress = registryAddress;     }      public void register(String data) {         if (data != null) {             ZooKeeper zk = connectServer();             if (zk != null) {                 AddRootNode(zk); // Add root node if not exist                 createNode(zk, data);             }         }     }      private ZooKeeper connectServer() {         ZooKeeper zk = null;         try {             zk = new ZooKeeper(registryAddress, Constant.ZK_SESSION_TIMEOUT, new Watcher() {                 @Override                 public void process(WatchedEvent event) {                     if (event.getState() == Event.KeeperState.SyncConnected) {                         latch.countDown();                     }                 }             });             latch.await();         } catch (IOException e) {             LOGGER.error("", e);         }         catch (InterruptedException ex){             LOGGER.error("", ex);         }         return zk;     }      private void AddRootNode(ZooKeeper zk){         try {             Stat s = zk.exists(Constant.ZK_REGISTRY_PATH, false);             if (s == null) {                 zk.create(Constant.ZK_REGISTRY_PATH, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);             }         } catch (KeeperException e) {             LOGGER.error(e.toString());         } catch (InterruptedException e) {             LOGGER.error(e.toString());         }     }      private void createNode(ZooKeeper zk, String data) {         try {             byte[] bytes = data.getBytes();             String path = zk.create(Constant.ZK_DATA_PATH, bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);             LOGGER.debug("create zookeeper node ({} => {})", path, data);         } catch (KeeperException e) {             LOGGER.error("", e);         }         catch (InterruptedException ex){             LOGGER.error("", ex);         }     } }  ServiceRegistry

这里在原文的基础上加了AddRootNode()判断服务父节点是否存在,如果不存在则添加一个PERSISTENT的服务父节点,这样虽然启动服务时多了点判断,但是不需要手动命令添加服务父节点了。

关于Zookeeper的使用原理,可以看这里《ZooKeeper基本原理》。

4、客户端调用服务

使用代理模式调用服务:

 public class RpcProxy {      private String serverAddress;     private ServiceDiscovery serviceDiscovery;      public RpcProxy(String serverAddress) {         this.serverAddress = serverAddress;     }      public RpcProxy(ServiceDiscovery serviceDiscovery) {         this.serviceDiscovery = serviceDiscovery;     }      @SuppressWarnings("unchecked")     public <T> T create(Class<?> interfaceClass) {         return (T) Proxy.newProxyInstance(                 interfaceClass.getClassLoader(),                 new Class<?>[]{interfaceClass},                 new InvocationHandler() {                     @Override                     public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {                         RpcRequest request = new RpcRequest();                         request.setRequestId(UUID.randomUUID().toString());                         request.setClassName(method.getDeclaringClass().getName());                         request.setMethodName(method.getName());                         request.setParameterTypes(method.getParameterTypes());                         request.setParameters(args);                          if (serviceDiscovery != null) {                             serverAddress = serviceDiscovery.discover();                         }                         if(serverAddress != null){                             String[] array = serverAddress.split(":");                             String host = array[0];                             int port = Integer.parseInt(array[1]);                              RpcClient client = new RpcClient(host, port);                             RpcResponse response = client.send(request);                              if (response.isError()) {                                 throw new RuntimeException("Response error.",new Throwable(response.getError()));                             } else {                                 return response.getResult();                             }                         }                         else{                             throw new RuntimeException("No server address found!");                         }                     }                 }         );     } } 

这里每次使用代理远程调用服务,从Zookeeper上获取可用的服务地址,通过RpcClient send一个Request,等待该Request的Response返回。这里原文有个比较严重的bug,在原文给出的简单的Test中是很难测出来的,原文使用了obj的wait和notifyAll来等待Response返回,会出现“假死等待”的情况:一个Request发送出去后,在obj.wait()调用之前可能Response就返回了,这时候在channelRead0里已经拿到了Response并且obj.notifyAll()已经在obj.wait()之前调用了,这时候send后再obj.wait()就出现了假死等待,客户端就一直等待在这里。使用CountDownLatch可以解决这个问题。

注意:这里每次调用的send时候才去和服务端建立连接,使用的是短连接,这种短连接在高并发时会有连接数问题,也会影响性能。

从Zookeeper上获取服务地址:

  public class ServiceDiscovery {      private static final Logger LOGGER = LoggerFactory.getLogger(ServiceDiscovery.class);      private CountDownLatch latch = new CountDownLatch(1);      private volatile List<String> dataList = new ArrayList<>();      private String registryAddress;      public ServiceDiscovery(String registryAddress) {         this.registryAddress = registryAddress;         ZooKeeper zk = connectServer();         if (zk != null) {             watchNode(zk);         }     }      public String discover() {         String data = null;         int size = dataList.size();         if (size > 0) {             if (size == 1) {                 data = dataList.get(0);                 LOGGER.debug("using only data: {}", data);             } else {                 data = dataList.get(ThreadLocalRandom.current().nextInt(size));                 LOGGER.debug("using random data: {}", data);             }         }         return data;     }      private ZooKeeper connectServer() {         ZooKeeper zk = null;         try {             zk = new ZooKeeper(registryAddress, Constant.ZK_SESSION_TIMEOUT, new Watcher() {                 @Override                 public void process(WatchedEvent event) {                     if (event.getState() == Event.KeeperState.SyncConnected) {                         latch.countDown();                     }                 }             });             latch.await();         } catch (IOException | InterruptedException e) {             LOGGER.error("", e);         }         return zk;     }      private void watchNode(final ZooKeeper zk) {         try {             List<String> nodeList = zk.getChildren(Constant.ZK_REGISTRY_PATH, new Watcher() {                 @Override                 public void process(WatchedEvent event) {                     if (event.getType() == Event.EventType.NodeChildrenChanged) {                         watchNode(zk);                     }                 }             });             List<String> dataList = new ArrayList<>();             for (String node : nodeList) {                 byte[] bytes = zk.getData(Constant.ZK_REGISTRY_PATH + "/" + node, false, null);                 dataList.add(new String(bytes));             }             LOGGER.debug("node data: {}", dataList);             this.dataList = dataList;         } catch (KeeperException | InterruptedException e) {             LOGGER.error("", e);         }     } }  ServiceDiscovery 

每次服务地址节点发生变化,都需要再次watchNode,获取新的服务地址列表。

5、消息编码

请求消息:

  public class RpcRequest {      private String requestId;     private String className;     private String methodName;     private Class<?>[] parameterTypes;     private Object[] parameters;      public String getRequestId() {         return requestId;     }      public void setRequestId(String requestId) {         this.requestId = requestId;     }      public String getClassName() {         return className;     }      public void setClassName(String className) {         this.className = className;     }      public String getMethodName() {         return methodName;     }      public void setMethodName(String methodName) {         this.methodName = methodName;     }      public Class<?>[] getParameterTypes() {         return parameterTypes;     }      public void setParameterTypes(Class<?>[] parameterTypes) {         this.parameterTypes = parameterTypes;     }      public Object[] getParameters() {         return parameters;     }      public void setParameters(Object[] parameters) {         this.parameters = parameters;     } }  RpcRequest

响应消息:

  public class RpcResponse {      private String requestId;     private String error;     private Object result;      public boolean isError() {         return error != null;     }      public String getRequestId() {         return requestId;     }      public void setRequestId(String requestId) {         this.requestId = requestId;     }      public String getError() {         return error;     }      public void setError(String error) {         this.error = error;     }      public Object getResult() {         return result;     }      public void setResult(Object result) {         this.result = result;     } }  RpcResponse

消息序列化和反序列化工具:(基于 Protostuff 实现)

  public class SerializationUtil {      private static Map<Class<?>, Schema<?>> cachedSchema = new ConcurrentHashMap<>();      private static Objenesis objenesis = new ObjenesisStd(true);      private SerializationUtil() {     }      @SuppressWarnings("unchecked")     private static <T> Schema<T> getSchema(Class<T> cls) {         Schema<T> schema = (Schema<T>) cachedSchema.get(cls);         if (schema == null) {             schema = RuntimeSchema.createFrom(cls);             if (schema != null) {                 cachedSchema.put(cls, schema);             }         }         return schema;     }      /**      * 序列化(对象 -> 字节数组)      */     @SuppressWarnings("unchecked")     public static <T> byte[] serialize(T obj) {         Class<T> cls = (Class<T>) obj.getClass();         LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);         try {             Schema<T> schema = getSchema(cls);             return ProtostuffIOUtil.toByteArray(obj, schema, buffer);         } catch (Exception e) {             throw new IllegalStateException(e.getMessage(), e);         } finally {             buffer.clear();         }     }      /**      * 反序列化(字节数组 -> 对象)      */     public static <T> T deserialize(byte[] data, Class<T> cls) {         try {             T message = (T) objenesis.newInstance(cls);             Schema<T> schema = getSchema(cls);             ProtostuffIOUtil.mergeFrom(data, message, schema);             return message;         } catch (Exception e) {             throw new IllegalStateException(e.getMessage(), e);         }     } }  SerializationUtil

由于处理的是TCP消息,本人加了TCP的粘包处理Handler

 channel.pipeline().addLast(new LengthFieldBasedFrameDecoder(65536,0,4,0,0)) 

消息编解码时开始4个字节表示消息的长度,也就是消息编码的时候,先写消息的长度,再写消息。

6、性能改进

Netty本身就是一个高性能的网络框架,从网络IO方面来说并没有太大的问题。

从这个RPC框架本身来说,在原文的基础上把Server端处理请求的过程改成了多线程异步:

  public void channelRead0(final ChannelHandlerContext ctx,final RpcRequest request) throws Exception {         RpcServer.submit(new Runnable() {             @Override             public void run() {                 LOGGER.debug("Receive request " + request.getRequestId());                 RpcResponse response = new RpcResponse();                 response.setRequestId(request.getRequestId());                 try {                     Object result = handle(request);                     response.setResult(result);                 } catch (Throwable t) {                     response.setError(t.toString());                     LOGGER.error("RPC Server handle request error",t);                 }                 ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE).addListener(new ChannelFutureListener() {                     @Override                     public void operationComplete(ChannelFuture channelFuture) throws Exception {                         LOGGER.debug("Send response for request " + request.getRequestId());                     }                 });             }         });     } 

Netty 4中的Handler处理在IO线程中,如果Handler处理中有耗时的操作(如数据库相关),会让IO线程等待,影响性能。

个人觉得该RPC的待改进项:

1)客户端保持和服务进行长连接,不需要每次调用服务的时候进行连接,长连接的管理(通过Zookeeper获取有效的地址)。

2)客户端请求异步处理的支持,不需要同步等待:发送一个异步请求,返回Feature,通过Feature的callback机制获取结果。

3)编码序列化的多协议支持。

有时间再改改吧。。

项目地址: https://github.com/luxiaoxun/NettyRpc

参考:

轻量级分布式 RPC 框架:http://my.oschina.net/huangyong/blog/361751

你应该知道的 RPC原理:http://www.cnblogs.com/LBSer/p/4853234.html

转载本站任何文章请注明:转载至神刀安全网,谢谢神刀安全网 » 一个轻量级分布式RPC框架–NettyRpc

分享到:更多 ()

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址
分享按钮