【手写一个RPC框架】simpleRPC-06

2023-10-27

在这里插入图片描述

本项目所有代码可见:https://github.com/weiyu-zeng/SimpleRPC

前言

本次改进我们将引入zookeeper作为RPC框架的注册中心,服务端在zookeeper上注册自己的服务,而客户端调用服务,回去zookeeper上根据服务名寻找调用的服务器地址,使得我们RPC支持集群调度通信的能力。

实现

zookeeper安装与使用

zookeeper安装请见:

【zookeeper】windows版zookeeper安装与启动 可能遇到的各种问题

安装好之后,我们打开zookeeper的server:

在这里插入图片描述
server启动如下:
在这里插入图片描述

开启zookeeper的client:

在这里插入图片描述
如下,说明成功启动了
在这里插入图片描述

按回车:
在这里插入图片描述

输入ls /我们查看目录:

在这里插入图片描述

到此为止,先放在这不要关,我们写代码去。

项目创建

创建一个名为simpleRPC-06的module:

在这里插入图片描述

创建com.rpc的package:

在这里插入图片描述

依赖配置

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>SimpleRPC</artifactId>
        <groupId>org.example</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>simpleRPC-06</artifactId>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.12</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.51.Final</version>
        </dependency>

        <!-- 阿里的fastjson序列化框架 -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.67</version>
        </dependency>

        <!--这个jar包应该依赖log4j,不引入log4j会有控制台会有warn,但不影响正常使用-->
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>2.13.0</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-nop</artifactId>
            <version>1.7.30</version>
        </dependency>
    </dependencies>

</project>

请注意一下,curator必须要和zookeeper版本适配,如果curator版本太高,项目将无法运行。

我们在resources目录下配置一下 log4j的配置,文件名为 log4j.properties:

在这里插入图片描述

log4j.properties

log4j.rootLogger=INFO, stdout 
log4j.appender.stdout=org.apache.log4j.ConsoleAppender 
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout 
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n 
log4j.appender.logfile=org.apache.log4j.FileAppender 
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n 

register

我们创建一个名为register的package:

在这里插入图片描述

创建注册中心的注册服务接口ServiceRegister.java:

package com.rpc.register;

import java.net.InetSocketAddress;

/**
 * @author weiyu_zeng
 *
 * 服务注册接口,两大基本功能,注册:保存服务与地址。 查询:根据服务名查找地址
 */
public interface ServiceRegister {

    void register(String serviceName, InetSocketAddress serverAddress);

    InetSocketAddress serviceDiscovery(String serviceName);
}

然后创建服务注册实现类 ZkServiceRegister.java:

package com.rpc.register;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;

import java.net.InetSocketAddress;
import java.util.List;

/**
 * @author weiyu_zeng
 *
 * Curator:是Zookeeper开源的客户端框架,封装了很多API,使用起来非常的方便
 * CuratorFramework:连接zookeeper服务的框架,客户端创建使用静态工厂方式CuratorFrameworkFactory进行创建
 * tickTime:zk的心跳间隔(heartbeat interval),也是session timeout基本单位.单位为毫秒.
 * minSessionTimeout:最小超时时间,zk设置的默认值为2*tickTime.
 * maxSessionTimeout:最大超时时间,zk设置的默认值为20*tickTime.
 * retryPolicy()重连策略:
 * Curator 四种重连策略:
 *      1.RetryUntilElapsed(int maxElapsedTimeMs, int sleepMsBetweenRetries)
 *      以sleepMsBetweenRetries的间隔重连,直到超过maxElapsedTimeMs的时间设置
 *
 *      2.RetryNTimes(int n, int sleepMsBetweenRetries)
 *      指定重连次数
 *
 *      3.RetryOneTime(int sleepMsBetweenRetry)
 *      重连一次,简单粗暴
 *
 *      4.ExponentialBackoffRetry
 *      ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries)
 *      ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries, int maxSleepMs)
 *      时间间隔 = baseSleepTimeMs * Math.max(1, random.nextInt(1 << (retryCount + 1)))
 *
 * namespace(): 为了避免各个应用的zk patch冲突, Curator Framework内部会给每一个Curator Framework实例分配一个namespace(可选).
 *              这样你在create ZNode的时候都会自动加上这个namespace作为这个node path的root.
 * CuratorFramework.create():开始创建操作,可以调用额外的方法(比如方式mode 或者后台执行background) 并在最后调用forPath()
 *                            指定要操作的ZNode
 * CuratorFramework.checkExists(): 开始检查ZNode是否存在的操作. 可以调用额外的方法(监控或者后台处理)并在最后调用forPath()
 *                                指定要操作的ZNode
 * CuratorFramework.start() / close():启动和关闭客户端
 * CuratorFramework(client).create().withMode(CreateMode.EPHEMERAL):这将使用给定的数据创建临时结点 EPHEMERAL ZNode
 * CuratorFramework.getChildren():开始获得ZNode的子节点列表。 以调用额外的方法(监控、后台处理或者获取状态watch,
 *                                 background or get stat)并在最后调用forPath()指定要操作的ZNode
 *
 * InetSocketAddress:该类实现了可序列化接口,直接继承自java.net.SocketAddress类。实现 IP 套接字地址(IP 地址 + 端口号)。
 *                    它还可以是一个对(主机名 + 端口号),在此情况下,将尝试解析主机名。如果解析失败,则该地址将被视为未解析
 *                    地址,但是其在某些情形下仍然可以使用,比如通过代理连接。
 *                    构造方法:InetSocketAddress(InetAddress addr,  int port)  根据 IP 地址和端口号创建套接字地址。
 *                             InetSocketAddress(String hostname, int port) 根据主机名(IP地址指代)和端口号创建套接字地址。
 * InetSocketAddress.getHostName():获取 hostname。即地址的主机名部分。
 * InetSocketAddress.getPort()  获取端口号。
 */
public class ZkServiceRegister implements ServiceRegister {

    // curator 提供的zookeeper客户端
    private CuratorFramework client;

    // zookeeper根路径结点
    private static final String ROOT_PATH = "MyRPC";

    // 构造方法
    // 这里负责zookeeper客户端的初始化,并与zookeeper服务端建立连接。
    // 初始化包括指定重连策略,指定连接zookeeper的端口,指定超时时间,指定命名空间
    // 初始化完成之后start()开启zookeeper客户端。
    public ZkServiceRegister() {

        // 重连策略:指数时间重试
        RetryPolicy policy = new ExponentialBackoffRetry(1000, 3);

        // zookeeper的地址固定,不管是服务提供者还是消费者,都要与之建立连接
        // sessionTimeoutMs 与 zoo.cfg中的tickTime 有关系,
        // zk还会根据minSessionTimeout与maxSessionTimeout两个参数重新调整最后的超时值。默认分别为tickTime 的2倍和20倍
        // 使用心跳监听状态
        this.client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181")
                                             .sessionTimeoutMs(40000)
                                             .retryPolicy(policy)
                                             .namespace(ROOT_PATH)
                                             .build();
        this.client.start();
        System.out.println("zookeeper 连接成功");
    }

    // 注册:传入服务方法名(String),传入主机名和端口号的套接字地址(InetSocketAddress)
    @Override
    public void register(String serviceName, InetSocketAddress serverAddress) {
        try {
            // serviceName创建成永久节点,服务提供者下线时,不删服务名,只删地址
            Stat stat = client.checkExists().forPath("/" + serviceName);
            if (stat == null) {
                client.create()
                        .creatingParentsIfNeeded()
                        .withMode(CreateMode.PERSISTENT)
                        .forPath("/" + serviceName);
            }
            // 路径地址,一个/代表一个节点
            String path = "/" + serviceName + "/" + getServiceAddress(serverAddress);
            // 临时节点,服务器下线就删除节点
            client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path);
        } catch (Exception e) {
            System.out.println("此服务已存在");
        }
    }

    // 根据服务名返回地址
    @Override
    public InetSocketAddress serviceDiscovery(String serviceName) {
        try {
            List<String> strings = client.getChildren().forPath("/" + serviceName);
            // 这里默认用的第一个,后面加负载均衡
            String string = strings.get(0);
            return parseAddress(string);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

    // 地址 -> XXX.XXX.XXX.XXX:port 字符串
    private String getServiceAddress(InetSocketAddress serverAddress) {
        return serverAddress.getHostName() + ":" + serverAddress.getPort();
    }

    // 字符串解析为地址:按照":"切分开,前半是host(String),后半是port(int)
    private InetSocketAddress parseAddress(String address) {
        String[] result = address.split(":");
        return new InetSocketAddress(result[0], Integer.parseInt(result[1]));
    }
}

接下来可以对service,client和server进行修改。

client

NettyRPCClient.java 做一点修改:

package com.rpc.client;

import com.rpc.common.RPCRequest;
import com.rpc.common.RPCResponse;
import com.rpc.register.ServiceRegister;
import com.rpc.register.ZkServiceRegister;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.AttributeKey;

import java.net.InetSocketAddress;

/**
 * @author zwy
 *
 * 实现RPCClient接口
 */
public class NettyRPCClient implements RPCClient {
    private static final Bootstrap bootstrap;
    private static final EventLoopGroup eventLoopGroup;
    private String host;
    private int port;
    private ServiceRegister serviceRegister; // ServiceRegister接口类class

    // 构造函数:初始化zookeeper
    public NettyRPCClient() {
        this.serviceRegister = new ZkServiceRegister();
    }
    // netty客户端初始化,重复使用
    static {
        eventLoopGroup = new NioEventLoopGroup();
        bootstrap = new Bootstrap();
        bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
                .handler(new NettyClientInitializer());
    }

    /**
     * 这里需要操作一下,因为netty的传输都是异步的,你发送request,会立刻返回一个值, 而不是想要的相应的response
     */
    @Override
    public RPCResponse sendRequest(RPCRequest request) {
        InetSocketAddress address = serviceRegister.serviceDiscovery(request.getInterfaceName());
        host = address.getHostName();
        port = address.getPort();
        try {
            ChannelFuture channelFuture  = bootstrap.connect(host, port).sync();
            Channel channel = channelFuture.channel();
            // 发送数据
            channel.writeAndFlush(request);
            channel.closeFuture().sync();
            // 阻塞的获得结果,通过给channel设计别名,获取特定名字下的channel中的内容(这个在hanlder中设置)
            // AttributeKey是,线程隔离的,不会由线程安全问题。
            // 实际上不应通过阻塞,可通过回调函数,后面可以再进行优化
            AttributeKey<RPCResponse> key = AttributeKey.valueOf("RPCResponse");
            RPCResponse response = channel.attr(key).get();

            System.out.println(response);
            return response;
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return null;
    }
}


TestClient.java 也做相应的修改:

package com.rpc.client;

import com.rpc.common.Blog;
import com.rpc.common.User;
import com.rpc.service.BlogService;
import com.rpc.service.UserService;


/**
 * @author zwy
 */
public class TestClient {
    public static void main(String[] args) {
        // 不需传host,port
        RPCClient rpcClient = new NettyRPCClient();
        // 把这个客户端传入代理客户端
        RPCClientProxy rpcClientProxy = new RPCClientProxy(rpcClient);
        // 代理客户端根据不同的服务,获得一个代理类, 并且这个代理类的方法以或者增强(封装数据,发送请求)
        UserService userService = rpcClientProxy.getProxy(UserService.class);

        // 服务的方法1
        User userByUserId = userService.getUserByUserId(10);
        System.out.println("从服务器端得到的user为:" + userByUserId);

        // 服务的方法2
        User user = User.builder().userName("张三").id(100).sex(true).build();
        Integer integer = userService.insertUserId(user);
        System.out.println("向服务器端插入数据" + integer);

        // 服务的方法3
        BlogService blogService = rpcClientProxy.getProxy(BlogService.class);
        Blog blogById = blogService.getBlogById(10000);
        System.out.println("从服务端得到的blog为:" + blogById);
    }
}

client中的其他代码和simpleRPC-05一样,可以直接从simpleRPC-05复制粘贴过来,为了完整,我还是把代码放下面:

RPCClient.java

package com.rpc.client;


import com.rpc.common.RPCRequest;
import com.rpc.common.RPCResponse;

/**
 * @author zwy
 *
 * RPC客户端:发送请求,获得response
 */
public interface RPCClient {
    RPCResponse sendRequest(RPCRequest request);
}

RPCClientProxy.java

package com.rpc.client;

import com.rpc.common.RPCRequest;
import com.rpc.common.RPCResponse;
import lombok.AllArgsConstructor;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

/**
 * @author zwy
 *
 * 客户端代理:把动态代理封装request对象(这里和simpleRPC-02的ClientProxy函数一样,保留了动态代理的设计)
 */
@AllArgsConstructor
public class RPCClientProxy implements InvocationHandler {
    private RPCClient client;

    // jdk动态代理,每一次代理对象调用方法,会经过此方法增强(反射获取request对象,socket发送至客户端)
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        // request的构建,使用了lombok中的builder,更加简洁
        RPCRequest request = RPCRequest.builder().interfaceName(method.getDeclaringClass().getName())
                                       .methodName(method.getName())
                                       .params(args)
                                       .paramsTypes(method.getParameterTypes())
                                       .build();
        // 数据传输
        RPCResponse response = client.sendRequest(request);
//        System.out.println(response);
        return response.getData();
    }

    <T> T getProxy(Class<T> clazz) {
        Object o = Proxy.newProxyInstance(clazz.getClassLoader(), new Class[]{clazz}, this);
        return (T)o;
    }
}

NettyClientInitializer.java

package com.rpc.client;

import com.rpc.codec.JsonSerializer;
import com.rpc.codec.MyDecode;
import com.rpc.codec.MyEncode;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;

/**
 * @author zwy
 *
 * 同样的与服务端解码和编码格式
 */
public class NettyClientInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        // 使用自定义的编解码器
        pipeline.addLast(new MyDecode());
        // 编码需要传入序列化器,这里是json,还支持ObjectSerializer,也可以自己实现其他的
        pipeline.addLast(new MyEncode(new JsonSerializer()));
        pipeline.addLast(new NettyClientHandler());
    }
}

NettyClientHandler.java

package com.rpc.client;

import com.rpc.common.RPCResponse;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.AttributeKey;

/**
 * @author zwy
 */
public class NettyClientHandler extends SimpleChannelInboundHandler<RPCResponse> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RPCResponse msg) throws Exception {
        // 接收到response, 给channel设计别名,让sendRequest里读取response
        AttributeKey<RPCResponse> key = AttributeKey.valueOf("RPCResponse");
        ctx.channel().attr(key).set(msg);
        ctx.channel().close();
    }

    // 跟NettyRPCServerHandler一样
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

service

服务暴露类加入注册的功能,ServiceProvider.java 做相应的修改:

package com.rpc.service;


import com.rpc.register.ServiceRegister;
import com.rpc.register.ZkServiceRegister;

import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;

/**
 * @author zwy
 */
public class ServiceProvider {
    /**
     * 一个实现类可能实现多个服务接口,
     */
    private Map<String, Object> interfaceProvider;

    private ServiceRegister serviceRegister;
    private String host;
    private int port;

    public ServiceProvider(String host, int port){
        // 需要传入服务端自身的服务的网络地址
        this.host = host;
        this.port = port;
        this.interfaceProvider = new HashMap<>();
        this.serviceRegister = new ZkServiceRegister();
    }

    public void provideServiceInterface(Object service) throws Exception {
        Class<?>[] interfaces = service.getClass().getInterfaces();

        for(Class clazz : interfaces){
            // 本机的映射表
            interfaceProvider.put(clazz.getName(),service);
            // 在注册中心注册服务
            serviceRegister.register(clazz.getName(), new InetSocketAddress(host, port));
        }
    }

    public Object getService(String interfaceName){
        return interfaceProvider.get(interfaceName);
    }
}

service中的其他代码和simpleRPC-05一样,可以直接从simpleRPC-05复制粘贴过来,为了完整,我还是把代码放下面:

BlogService.java

package com.rpc.service;

import com.rpc.common.Blog;

public interface BlogService {

    Blog getBlogById(Integer id);
}

BlogServiceImpl.java

package com.rpc.service;

import com.rpc.common.Blog;

public class BlogServiceImpl implements BlogService {

    @Override
    public Blog getBlogById(Integer id) {
        Blog blog = Blog.builder()
                        .id(id)
                        .title("我的博客")
                        .useId(22).build();
        System.out.println("客户端查询了" + id + "博客");
        return blog;
    }
}

UserService.java

package com.rpc.service;

import com.rpc.common.User;

/**
 * @author zwy
 */
public interface UserService {

    // 客户端通过这个接口调用服务端的实现类
    User getUserByUserId(Integer id);

    // 给这个服务增加一个功能
    Integer insertUserId(User user);
}

UserServiceImpl.java

package com.rpc.service;

import com.rpc.common.User;


/**
 * @author zwy
 */
public class UserServiceImpl implements UserService {

    @Override
    public User getUserByUserId(Integer id) {
        // 模拟从数据库中取用户的行为
        User user = User.builder()
                        .id(id)
                        .userName("he2121")
                        .sex(true).build();
        System.out.println("客户端查询了" + id + "的用户");
        return user;
    }

    @Override
    public Integer insertUserId(User user) {
        System.out.println("插入数据成功: " + user);
        return 1;
    }
}

server

TestServer.java 做相应的修改:

package com.rpc.server;

import com.rpc.service.*;

public class TestServer {
    public static void main(String[] args) throws Exception {
        UserService userService = new UserServiceImpl();
        BlogService blogService = new BlogServiceImpl();

        // 这里重用了服务暴露类,顺便在注册中心注册,实际上应分开,每个类做各自独立的事
        ServiceProvider serviceProvider = new ServiceProvider("127.0.0.1", 8899); // 8899
        serviceProvider.provideServiceInterface(userService);
        serviceProvider.provideServiceInterface(blogService);

        RPCServer RPCServer = new NettyRPCServer(serviceProvider);
        RPCServer.start(8899);
    }
}

server中的其他代码和simpleRPC-05一样,可以直接从simpleRPC-05复制粘贴过来,为了完整,我还是把代码放下面:

RPCServer.java

package com.rpc.server;

/**
 * @author zwy
 */
public interface RPCServer {
    void start(int port);
    void stop();
}

NettyRPCServer.java

package com.rpc.server;

import com.rpc.service.ServiceProvider;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.AllArgsConstructor;


/**
 * @author zwy
 */
@AllArgsConstructor
public class NettyRPCServer implements RPCServer {
    private ServiceProvider serviceProvider;

    @Override
    public void start(int port) {
        // netty服务线程组负责建立连接(TCP/IP连接),work负责具体的请求
        NioEventLoopGroup bossGroup = new NioEventLoopGroup();
        NioEventLoopGroup workGroup = new NioEventLoopGroup();
        System.out.println("Netty服务端启动了");

        try {
            // 启动Netty服务器
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            // 初始化
            serverBootstrap.group(bossGroup, workGroup).channel(NioServerSocketChannel.class)
                           .childHandler(new NettyServerInitializer(serviceProvider));
            // 同步阻塞
            ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
            // 死循环监听
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }

    @Override
    public void stop() {

    }
}

NettyServerInitializer.java

package com.rpc.server;

import com.rpc.codec.JsonSerializer;
import com.rpc.codec.MyDecode;
import com.rpc.codec.MyEncode;
import com.rpc.service.ServiceProvider;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import lombok.AllArgsConstructor;

/**
 * @author zwy
 */
@AllArgsConstructor
public class NettyServerInitializer extends ChannelInitializer<SocketChannel> {
    private ServiceProvider serviceProvider;

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        // 使用自定义的解码器
        pipeline.addLast(new MyDecode());
        // 使用自定义的编码器,而且解码器需要传入序列化器,这里是json,还支持ObjectSerializer,也可以自己实现其他的
        pipeline.addLast(new MyEncode(new JsonSerializer()));
        pipeline.addLast(new NettyRPCServerHandler(serviceProvider));
    }
}

NettyRPCServerHandler.java

package com.rpc.server;

import com.rpc.common.RPCRequest;
import com.rpc.common.RPCResponse;
import com.rpc.service.ServiceProvider;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.AllArgsConstructor;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

/**
 * @author zwy
 */
@AllArgsConstructor
public class NettyRPCServerHandler extends SimpleChannelInboundHandler<RPCRequest> {
    private ServiceProvider serviceProvider;

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RPCRequest msg) throws Exception {
        // System.out.println(msg);
        RPCResponse response = getResponse(msg);
        ctx.writeAndFlush(response);
        ctx.close();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

    // 这里和WorkThread里的getResponse差不多
    RPCResponse getResponse(RPCRequest request) {
        // 得到服务名
        String interfaceName = request.getInterfaceName();
        // 得到服务器相应类
        Object service = serviceProvider.getService(interfaceName);
        // 反射调用方法
        Method method = null;
        try {
            method = service.getClass().getMethod(request.getMethodName(), request.getParamsTypes());
            Object invoke = method.invoke(service, request.getParams());
            return RPCResponse.success(invoke);
        } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
            e.printStackTrace();
            System.out.println("方法执行错误");
            return RPCResponse.fail();
        }
    }
}

common

和simpleRPC-05一样,可以直接复制过来。

在这里插入图片描述

codec

和simpleRPC-05一样,可以直接复制过来。

在这里插入图片描述

文件结构

simpleRPC-06的文件结构如下:

在这里插入图片描述

在这里插入图片描述

运行

启动TestServer.java :
在这里插入图片描述
然后启动TestClient.java:

在这里插入图片描述
在这里插入图片描述

我们来看看我们最开始开的zookeeper客户端:

现在输入ls /

在这里插入图片描述

发现我们多了一个结点 MyRPC:

输入ls /MyRPC

在这里插入图片描述
可以看到我们注册的服务都在这里,成功!

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

【手写一个RPC框架】simpleRPC-06 的相关文章

  • 网工我劝你,这11种协议报文格式要烂熟于心!

    你们好 我的网工朋友 你最熟悉的报文是啥 TCP UDP 在网络世界里 就如同道路系统让车辆畅通无阻一样 网络协议是确保信息顺畅传输的关键 作为网络工程师 理解网络协议的种类与结构至关重要 今天就和你一起探索一下11种关键协议报文格式啊 包
  • 通过一个寒假能学会黑客技术吗?看完你就知道了

    一个寒假能成为黑客吗 资深白帽子来告诉你 如果你想的是学完去美国五角大楼内网随意溜达几圈 想顺走一点机密文件的话 劝你还是趁早放弃 但是成为一名初级黑客还是绰绰有余的 你只需要掌握好渗透测试 Web安全 数据库 搞懂web安全防护 SQL注
  • 广告竞价策略:激发广告变现潜能的关键

    在数字化时代 广告已经成为企业推广品牌 产品和服务的关键手段之一 为了最大程度地发挥广告的效果 广告竞价策略成为广告主和数字营销专业人士关注的焦点 通过巧妙运用竞价策略 广告主可以在激烈的市场竞争中脱颖而出 实现广告变现的潜能 admaoy
  • 成为一个黑客,就按照这个路线来!

    前几天一个同学在聊天中提到毕业后想要从事网络安全方向的工作 虽然他本身也是学计算机的 但是又怕心有余而力不足 因为 从事网络安全方面的工作向来起点都比较高 大学里少有开设这类课程的 在学校能够学到的知识比较有限 网上的关于这方面课程的质量又
  • 【CTF必看】从零开始的CTF学习路线(超详细),让你从小白进阶成大神!

    最近很多朋友在后台私信我 问应该怎么入门CTF 个人认为入门CTF之前大家应该先了解到底 什么是CTF 而你 学CTF的目的又到底是什么 其次便是最好具备相应的编程能力 若是完全不具备这些能力极有可能直接被劝退 毕竟比赛的时候动不动写个脚本
  • 【信道估计】【MIMO】【FBMC】未来移动通信的滤波器组多载波调制方案(Matlab代码实现)

    欢迎来到本博客 博主优势 博客内容尽量做到思维缜密 逻辑清晰 为了方便读者 座右铭 行百里者 半于九十 本文目录如下 目录 1 概述 2 运行结果 3 参考文献 4 Matlab代码及文章
  • 前端必备的 web 安全知识手记

    前言 安全这种东西就是不发生则已 一发生则惊人 作为前端 平时对这方面的知识没啥研究 最近了解了下 特此沉淀 文章内容包括以下几个典型的 web 安全知识点 XSS CSRF 点击劫持 SQL 注入和上传问题等 下文以小王代指攻击者 话不多
  • WEB前端常见受攻击方式及解决办法总结

    一个网址建立后 如果不注意安全问题 就很容易被人攻击 下面讨论一下集中漏洞情况和放置攻击的方法 一 SQL注入 所谓的SQL注入 就是通过把SQL命令插入到web表单提交或输入域名或页面请求的查询字符串 最终达到欺骗服务器执行恶意的SQL命
  • 用户数据中的幸存者偏差

    幸存者偏差 Survivorship bias 是一种常见的逻辑谬误 意思是没有考虑到筛选的过程 忽略了被筛选掉的关键信息 只看到经过筛选后而产生的结果 先讲个故事 二战时 无奈德国空防强大 盟军战机损毁严重 于是军方便找来科学家统计飞机受
  • 「网络安全渗透」如果你还不懂CSRF?这一篇让你彻底掌握

    1 什么是 CSRF 面试的时候的著名问题 谈一谈你对 CSRF 与 SSRF 区别的看法 这个问题 如果我们用非常通俗的语言讲的话 CSRF 更像是钓鱼的举动 是用户攻击用户的 而对于 SSRF 来说 是由服务器发出请求 用户 日 服务器
  • 通俗易懂,十分钟读懂DES,详解DES加密算法原理,DES攻击手段以及3DES原理

    文章目录 1 什么是DES 2 DES的基本概念 3 DES的加密流程 4 DES算法步骤详解 4 1 初始置换 Initial Permutation IP置换 4 2 加密轮次 4 3 F轮函数 4 3 1 拓展R到48位 4 3 2
  • 【信道估计】【MIMO】【FBMC】未来移动通信的滤波器组多载波调制方案(Matlab代码实现)

    欢迎来到本博客 博主优势 博客内容尽量做到思维缜密 逻辑清晰 为了方便读者 座右铭 行百里者 半于九十 本文目录如下 目录 1 概述 2 运行结果 3 参考文献 4 Matlab代码及文章
  • 如何使用Imagewheel搭建一个简单的的私人图床无公网ip也能访问

    文章目录 1 前言 2 Imagewheel网站搭建 2 1 Imagewheel下载和安装 2 2 Imagewheel网页测试 2 3 cpolar的安装和注册 3 本地网页发布 3 1 Cpolar临时数据隧道
  • 揭秘网络世界的幕后密码——Wireshark网络协议分析软件

    在我们日常生活中 计算机和互联网已经成为不可或缺的一部分 然而 很少有人真正了解网络背后复杂的工作原理和通信协议 幸运的是 有一款强大而实用的软件 Wireshark 可以帮助我们深入了解网络世界的幕后密码 Wireshark是一款免费的网
  • 为什么我强烈推荐大学生打CTF!

    前言 写这个文章是因为我很多粉丝都是学生 经常有人问 感觉大一第一个学期忙忙碌碌的过去了 啥都会一点 但是自己很难系统的学习到整个知识体系 很迷茫 想知道要如何高效学习 这篇文章我主要就围绕两点 减少那些罗里吧嗦的废话 直接上干货 CTF如
  • 【无标题】

    大家都知道该赛项的规程和样题向来都是模棱两可 从来不说具体的内容 导致选手在备赛时没有头绪 不知道该怎么训练 到了赛时发现题目和备赛的时候完全不一样 那么本文将以往年信息安全管理与评估赛项经验来解读今年2023年国赛的规程 帮助选手们指明方
  • 【安全】网络安全态势感知

    文章目录 一 态势感知简介 1 概念 2 形象举例 3 应具备的能力 二 为什么要态势感知 为什么网络安全态势感知很重要 三 态势感知系统的功能 四 如何评估态势感知的建设结果 五 什么是态势感知的三个层级 四 业界的态势感知产品 1 安全
  • Java RMI 和 RPC 有什么区别?

    Java RMI 和 RPC 之间的实际区别是什么 我在一些地方读到 RMI 使用对象 RPC是基于C的 因此它具有结构化编程语义 另一方面 RMI是基于Java的技术 并且是面向对象的 通过 RPC 您可以调用导出到服务器中的远程函数 在
  • 远程过程调用认证

    我正在使用远程过程调用 RPC 在本地计算机上通信数据 我的要求是使用 RPC 在两个处理之间通信数据 但服务器应该通过某种方式对客户端进行身份验证 我遇到了 RpcBindingSetAuthInfo 它设置身份验证和授权信息 第四个参数
  • java.lang.ClassNotFoundException:com.google.gwt.user.client.rpc.RemoteService

    在 Tomcat 6 中部署 war 文件时出现以下异常 java lang ClassNotFoundException com google gwt user client rpc RemoteService 所以我尝试通过 webAp

随机推荐

  • 【C++】string

    C 修炼秘籍 string 目录 C 修炼秘籍 string 文章目录 前言 一 标准库里的string 二 string常用接口功能简介 具体使用和底层转到模拟实现 1 string类的常见构造函数 2 string类对象的容量操作 3
  • 【华为OD统一考试B卷

    华为OD统一考试A卷 B卷 新题库说明 2023年5月份 华为官方已经将的 2022 0223Q 1 2 3 4 统一修改为OD统一考试 A卷 和OD统一考试 B卷 你收到的链接上面会标注A卷还是B卷 请注意 根据反馈 目前大部分收到的都是
  • Vue中DOM的更新为什么是异步的?

    在 Vue 中 DOM 的更新是异步的机制是为了优化性能和提升用户体验 这个机制被称为 异步更新队列 Vue的异步更新队列机制是其实现高效渲染的关键 它通过将多次数据变化合并到一个批处理中 从而减少了不必要的DOM操作 提高了性能 下面是V
  • costmap 代价地图

    转自 https sychaichangkun gitbooks io ros tutorial icourse163 content chapter10 10 3 html 10 3 costmap costmap是Navigation
  • 嵩天老师的零基础Python笔记:https://www.bilibili.com/video/av13570243/?from=search&seid=15873837810484552531 中的...

    coding gbk 嵩天老师的零基础Python笔记 https www bilibili com video av13570243 from search seid 15873837810484552531 中的15 22讲 数字类型的
  • python批量发送带附件的邮件时,收到的附件名异常且后缀为bin,解决办法

    1 问题描述 在使用 Python 内置的模块 smtplib 发送带中文名的附件邮件时 数据可以正常传输 但是但收件人收到的附件格式是bin 附件名也不是发送时的名称 附注 smtp Simple Mail Transfer Protoc
  • 进一步理解angular

    在上一篇文章中我介绍了安装和结构 以及运行过程 https blog csdn net weixin 42603009 article details 94382945 我们进一步理解其中的数据单向 双向 组建传值 Modules等的理解
  • C语言编译过程、VIM常用命令

    一 编译过程 1 预处理 gcc E 源文件 c o 源文件 i 预处理宏以及注释 2 编译 gcc S 源文件 i o 源文件 s 通过编译转换为汇编文件 3 汇编 gcc c 源文件 s o 源文件 o 经汇编转换为二进制文件 4 链接
  • linux创建链接文件

    链接文件的创建 1 概念 链接文件就类似我们windows的快捷方式 只保留目标文件的地址 不占用存储空间 使用链接文件与使用目标文件的效果是一样的 2 为什么要使用链接文件 在windows都会把文件放在一个比较大的磁盘中 我们每次需要使
  • ChatGPT 速通手册——GPT 训练数据集介绍

    GPT 训练数据集介绍 所有人工智能算法都会分为训练和推理两步 算法的效果好坏 很大程度上取决于训练数据本身的质量 ChatGPT 所用的训练数据 openai 公司没有单独公布过细节 不过考虑到 ChatGPT 是在前序 GPT 算法基础
  • linux使用记录(一)

    1 tar 解压tar xvf file tar 解压 tar包 tar xzvf file tar gz 解压tar gz tar xjvf file tar bz2 解压 tar bz2 tar xZvf file tar Z 解压ta
  • 使用ADO.NET访问数据库

    一 ADO NET 数据库访问的方法和技术 二 ADO NEt的重要组件 1 DataSet 独立于数据源的数据访问 2 Net framework数据提供程序 用于连接到数据库执行命令和检索结果 三 NET数据提供程序的四个核心对象 1
  • 无向图——邻接表和邻接矩阵的实现

    邻接矩阵 include
  • 封装七牛云存储工具类

    文章目录 封装七牛云存储工具类 为啥选择七牛云 当然是因为它能免费使用喽 白嫖怪哈哈哈 图片存储方案 Java SDK操作七牛云 封装工具类 封装七牛云存储工具类 为啥选择七牛云 当然是因为它能免费使用喽 白嫖怪哈哈哈 图片存储方案 在实际
  • UnityShader——Compute Shader

    Compute Shader是基于DX11 SM4 5 的在GPU上运行的程序 通过Compute Shader我们可以将大量可以并行的计算放到GPU中计算从而节省CPU资源 Unity 5 6版本提供的 Graphics DrawMesh
  • U盘数据丢失是什么原因?轻松让U盘数据恢复的教程

    在数字化时代 我们不可避免地使用各种便携式存储设备 如U盘 来传输和存储重要数据 然而 有时我们可能不小心删除了U盘中的文件 或者格式化了U盘等等而导致数据丢失 这种情况下 你可能会困惑地想知道 是否有简单的方式可以快速而轻松地恢复U盘中丢
  • 解决报错:You don‘t have enough free space in /var/cache/apt/archives/.

    apt安装package时遇到问题 You don t have enough free space in var cache apt archives 问题 var 目录下空间不足 apt下载时没有地方存放缓存文件 解决方法 删除部分日志
  • Java中解决CAS机制出现的ABA问题

    Java中解决CAS机制出现的ABA问题 学习目标 Java中解决CAS机制出现的ABA问题 1 先了解一下什么是CAS 2 CAS的底层原理 3 CAS的问题 4 怎么解决ABA问题 1 先了解一下什么是CAS 一句话总结就是 比较并交换
  • 分库分表需要考虑的问题及方案

    转自 http www jianshu com p 32b3e91aa22c from timeline 分库分表需要考虑的问题及方案 作者 jackcooper 2017 02 08 16 08 字数 5042 阅读 5240 评论 3
  • 【手写一个RPC框架】simpleRPC-06

    目录 前言 实现 zookeeper安装与使用 项目创建 依赖配置 register client service server common codec 文件结构 运行 本项目所有代码可见 https github com weiyu z