一、Zookeeper 服务注册与发现
Zookeeper 用于 服务注册与发现
public class ZooKeeperOp {
private final static Logger LOGGER = LoggerFactory.getLogger(ZooKeeperOp.class);
public final static int ZK_SESSION_TIMEOUT = 10000;
public final static int ZK_CONNECTION_TIMEOUT = 40000;
private final static String zkAddress = "localhost:2181";
private final static ZkClient zkClient = new ZkClient(zkAddress, ZK_SESSION_TIMEOUT, ZK_CONNECTION_TIMEOUT);
public static void register(String serviceName, String serviceAddress) {
if (!zkClient.exists(serviceName)) {
zkClient.createPersistent(serviceName);
}
zkClient.createEphemeral(serviceName + "/" + serviceAddress);
LOGGER.info("create node {}", serviceName + "/" + serviceAddress);
}
public static String discover(String serviceName) {
List children = zkClient.getChildren(serviceName);
if (children == null || children.size() == 0) {
return "";
}
return children.get(ThreadLocalRandom.current().nextInt(children.size()));
}
}
二、netty 服务端
工具类
Request
public class Request implements Serializable {
private String requestId;
private String interfaceName;
private String methodName;
private Class<?>[] parameterTypes;
private Object parameter[];
// set、get 方法
}
Response
public class Response implements Serializable {
private String requestId;
private Object result;
// set、get 方法
}
NettyServer
public class NettyServer {
private static final Logger LOGGER = LoggerFactory.getLogger(NettyServer.class);
public static void start(String ip, int port, Map<String, Object> params) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(4);
EventLoopGroup workerGroup = new NioEventLoopGroup(4);
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
// 设置非阻塞
.channel(NioServerSocketChannel.class)
// 临时存放已完成三次握手的请求的队列的最大长度。
// 如果未设置或所设置的值小于1,Java将使用默认值50。
// 如果大于队列的最大长度,请求会被拒绝
.option(ChannelOption.SO_BACKLOG, 1024)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) {
socketChannel.pipeline()
.addLast(new RpcDecoder(Request.class))
.addLast(new RpcEncoder(Response.class))
.addLast(new RpcServerInboundHandler(params));
}
});
ChannelFuture future = serverBootstrap.bind(ip, port).sync();
if (future.isSuccess()) {
params.keySet().forEach(key -> ZooKeeperOp.register(key, ip + ":" + port));
}
future.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
RpcServerInboundHandler
public class RpcServerInboundHandler extends ChannelInboundHandlerAdapter {
private static final Logger LOGGER = LoggerFactory.getLogger(RpcServerInboundHandler.class);
private final Map<String, Object> handle;
public RpcServerInboundHandler(Map<String, Object> handle) {
this.handle = handle;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Request request = (Request) msg;
LOGGER.info("request data {}", request);
// jdk 反射调用
Object bean = handle.get(request.getInterfaceName());
Method method = bean.getClass().getMethod(request.getMethodName(), request.getParameterTypes());
method.setAccessible(true);
Object result = method.invoke(bean, request.getParameter());
Response response = new Response();
response.setRequestId(request.getRequestId());
response.setResult(result);
// client 接收到信息后主动关闭掉连接
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
ctx.close();
}
}
三、netty 客户端
NettyClient
public class NettyClient {
private NettyClient() {
}
public static Response client(Request request) throws Exception {
EventLoopGroup group = new NioEventLoopGroup(4);
RpcClientInboundHandler rpcClientInboundHandler = new RpcClientInboundHandler();
try {
// 创建并初始化 Netty 客户端 Bootstrap 对象
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel channel) {
channel.pipeline()
.addLast(new RpcDecoder(Response.class))
.addLast(new RpcEncoder(Request.class))
.addLast(rpcClientInboundHandler);
}
});
// 连接 RPC 服务器
String[] discover = ZooKeeperOp.discover(request.getInterfaceName()).split(":");
ChannelFuture future = bootstrap.connect(discover[0], Integer.parseInt(discover[1])).sync();
// 写入 RPC 请求数据并关闭连接
Channel channel = future.channel();
channel.writeAndFlush(request).sync();
channel.closeFuture().sync();
return rpcClientInboundHandler.getResponse();
} finally {
group.shutdownGracefully();
}
}
}
RpcClientInboundHandler
public class RpcClientInboundHandler extends SimpleChannelInboundHandler<Response> {
private static final Logger LOGGER = LoggerFactory.getLogger(RpcClientInboundHandler.class);
private Response response;
public Response getResponse() {
return response;
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, Response response) throws Exception {
this.response = response;
LOGGER.info("response data {}", response);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
ctx.close();
}
}
四、生产端
生产者1
public class ProducerMain {
public static void main(String[] args) throws Exception {
IStudentService studentService = new StudentServiceImpl();
Map<String, Object> rpcMap = new HashMap<>();
rpcMap.put("/" + IStudentService.class.getName(), studentService);
String ip = "localhost";
int port = 8001;
NettyServer.start(ip, port, rpcMap);
}
}
生产者2
public class ProducerMain {
public static void main(String[] args) throws Exception {
ITeacherService teacherService = new TeacherServiceImpl();
Map<String, Object> rpcMap = new HashMap<>();
rpcMap.put("/" + ITeacherService.class.getName(), teacherService);
String ip = "localhost";
int port = 8002;
NettyServer.start(ip, port, rpcMap);
}
}
四、消费端
public class ConsumerMain {
public static void main(String[] args) throws Exception {
IStudentService studentService = RpcProxy.create(IStudentService.class);
String msg1 = studentService.say();
System.out.println(msg1);
ITeacherService teacherService = RpcProxy.create(ITeacherService.class);
String msg2 = teacherService.say();
System.out.println(msg2);
}
}