rpc · 2021-12-31 0

Netty、Zookeeper实现RPC

一、Zookeeper 服务注册与发现

Zookeeper 用于 服务注册与发现

 public class ZooKeeperOp {

    private final static Logger LOGGER = LoggerFactory.getLogger(ZooKeeperOp.class);

    public final static int ZK_SESSION_TIMEOUT = 10000;
    public final static int ZK_CONNECTION_TIMEOUT = 40000;

    private final static String zkAddress = "localhost:2181";
    private final static ZkClient zkClient = new ZkClient(zkAddress, ZK_SESSION_TIMEOUT, ZK_CONNECTION_TIMEOUT);

    public static void register(String serviceName, String serviceAddress) {
        if (!zkClient.exists(serviceName)) {
            zkClient.createPersistent(serviceName);
        }
        zkClient.createEphemeral(serviceName + "/" + serviceAddress);

        LOGGER.info("create node {}", serviceName + "/" + serviceAddress);
    }

    public static String discover(String serviceName) {
        List children = zkClient.getChildren(serviceName);
        if (children == null || children.size() == 0) {
            return "";
        }
        return children.get(ThreadLocalRandom.current().nextInt(children.size()));
    }
}

二、netty 服务端

工具类

Request

public class Request implements Serializable {

    private String requestId;
    private String interfaceName;
    private String methodName;
    private Class<?>[] parameterTypes;
    private Object parameter[];
    // set、get 方法
}

Response

public class Response implements Serializable {

    private String requestId;
    private Object result;
    // set、get 方法
}

NettyServer

public class NettyServer {

    private static final Logger LOGGER = LoggerFactory.getLogger(NettyServer.class);

    public static void start(String ip, int port, Map<String, Object> params) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(4);
        EventLoopGroup workerGroup = new NioEventLoopGroup(4);

        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup)
                    // 设置非阻塞
                    .channel(NioServerSocketChannel.class)
                    // 临时存放已完成三次握手的请求的队列的最大长度。
                    // 如果未设置或所设置的值小于1,Java将使用默认值50。
                    // 如果大于队列的最大长度,请求会被拒绝
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) {
                            socketChannel.pipeline()
                                    .addLast(new RpcDecoder(Request.class))
                                    .addLast(new RpcEncoder(Response.class))
                                    .addLast(new RpcServerInboundHandler(params));
                        }
                    });

            ChannelFuture future = serverBootstrap.bind(ip, port).sync();
            if (future.isSuccess()) {
                params.keySet().forEach(key -> ZooKeeperOp.register(key, ip + ":" + port));
            }
            future.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

}

RpcServerInboundHandler

public class RpcServerInboundHandler extends ChannelInboundHandlerAdapter {

    private static final Logger LOGGER = LoggerFactory.getLogger(RpcServerInboundHandler.class);

    private final Map<String, Object> handle;

    public RpcServerInboundHandler(Map<String, Object> handle) {
        this.handle = handle;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        Request request = (Request) msg;
        LOGGER.info("request data {}", request);

        // jdk 反射调用
        Object bean = handle.get(request.getInterfaceName());
        Method method = bean.getClass().getMethod(request.getMethodName(), request.getParameterTypes());
        method.setAccessible(true);
        Object result = method.invoke(bean, request.getParameter());

        Response response = new Response();
        response.setRequestId(request.getRequestId());
        response.setResult(result);

        // client 接收到信息后主动关闭掉连接
        ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        ctx.close();
    }
}

三、netty 客户端

NettyClient

public class NettyClient {

    private NettyClient() {

    }

    public static Response client(Request request) throws Exception {
        EventLoopGroup group = new NioEventLoopGroup(4);
        RpcClientInboundHandler rpcClientInboundHandler = new RpcClientInboundHandler();

        try {
            // 创建并初始化 Netty 客户端 Bootstrap 对象
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel channel) {
                            channel.pipeline()
                                    .addLast(new RpcDecoder(Response.class))
                                    .addLast(new RpcEncoder(Request.class))
                                    .addLast(rpcClientInboundHandler);
                        }
                    });

            // 连接 RPC 服务器
            String[] discover = ZooKeeperOp.discover(request.getInterfaceName()).split(":");
            ChannelFuture future = bootstrap.connect(discover[0], Integer.parseInt(discover[1])).sync();

            // 写入 RPC 请求数据并关闭连接
            Channel channel = future.channel();
            channel.writeAndFlush(request).sync();
            channel.closeFuture().sync();

            return rpcClientInboundHandler.getResponse();
        } finally {
            group.shutdownGracefully();
        }
    }
}

RpcClientInboundHandler

public class RpcClientInboundHandler extends SimpleChannelInboundHandler<Response> {

    private static final Logger LOGGER = LoggerFactory.getLogger(RpcClientInboundHandler.class);

    private Response response;

    public Response getResponse() {
        return response;
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Response response) throws Exception {
        this.response = response;
        LOGGER.info("response data {}", response);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        ctx.close();
    }

}

四、生产端

生产者1

public class ProducerMain {

    public static void main(String[] args) throws Exception {
        IStudentService studentService = new StudentServiceImpl();

        Map<String, Object> rpcMap = new HashMap<>();
        rpcMap.put("/" + IStudentService.class.getName(), studentService);

        String ip = "localhost";
        int port = 8001;
        NettyServer.start(ip, port, rpcMap);
    }
}

生产者2

public class ProducerMain {

    public static void main(String[] args) throws Exception {
        ITeacherService teacherService = new TeacherServiceImpl();

        Map<String, Object> rpcMap = new HashMap<>();
        rpcMap.put("/" + ITeacherService.class.getName(), teacherService);

        String ip = "localhost";
        int port = 8002;
        NettyServer.start(ip, port, rpcMap);
    }
}

四、消费端

public class ConsumerMain {

    public static void main(String[] args) throws Exception {
        IStudentService studentService = RpcProxy.create(IStudentService.class);
        String msg1 = studentService.say();
        System.out.println(msg1);

        ITeacherService teacherService = RpcProxy.create(ITeacherService.class);
        String msg2 = teacherService.say();
        System.out.println(msg2);
    }
}

五、完整示例代码

netty-rpc-sample