文章目录
- 一、概述
- 1、编解码基础
- 2、Netty编解码器
- 3、Protobuf概述
- 二、Protobuf基本使用
- 1、引入jar包
- 2、下载Protobuf
- 3、编写Student.proto
- 4、生成StudentPOJO类
- 5、服务器端
- 6、客户端
- 7、验证一下吧
- 三、Netty使用Protobuf发送多类型对象
- 1、编写Student.proto
- 2、生成MyDataInfo.java
- 3、服务端
- 4、客户端
- 5、测试一下吧
一、概述
1、编解码基础
编写网络应用程序时,因为数据在网络中传输的都是二进制字节码数据,在发送数据时就需要编码,接收数据0时就需要解码。
通常来说,codec(编解码器)的组成部分有两个: decoder(解码器)和 encoder(编码器)。encoder 负责把业务数据转换成字节2)码数据,decoder 负责把字节码数据转换成业务数据。
2、Netty编解码器
Netty自身也携带了一些codec(编解码器):
StringDecoder
StringEncoder
ObjectDecoder
ObjectEncoder
...
其中,Netty自带的ObjectDecoder 和ObjectEncoder 可以实现对POJO对象或各种业务对象的编码和解码,但是底层使用的仍是Java序列化技术,而Java序列化技术效率不高并且无法跨语言,所以Google的Protobuf是当今最火热的编解码器。
3、Protobuf概述
Protobuf 是 Google 发布的开源项目,全称 Google Protocol Buffers,是一种轻便高效的结构化数据存储格式,可以用于结构化数据串行化,或者说序列化。它很适合做数据存储或 RPC[远程过程调用 remote procedurecall] 数据交换格式。
参考文档:https://developers.google.com/protocol-buffers/docs/proto
Protobug是以message的方式来管理数据的,支持跨平台、跨语言,即 [ 客户端和服务器端可以是不同语言编写的 ] (支持目前绝大多数语言,例如C++、C#、Java、Python、Go等),性能高,可靠性 高。
二、Protobuf基本使用
1、引入jar包
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.6.1</version>
</dependency>
2、下载Protobuf
https://github.com/protocolbuffers/protobuf/releases
我这里使用的是3.6.1版本,要找到对应的版本:https://github.com/protocolbuffers/protobuf/releases?page=11
3、编写Student.proto
syntax = "proto3";
option java_outer_classname = "StudentPOJO";
message Student {
int32 id = 1;
string name = 2;
string msg = 3;
}
4、生成StudentPOJO类
(1)将Student.proto文件拷贝到protoc.exe同级目录并执行命令:
protoc-3.6.1-win32\bin>protoc.exe --java_out=. Student.proto
会生成一个StudentPOJO.java文件,该文件可以放在项目中使用了,一定不要修改该文件。
5、服务器端
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
public class NettyServer {
public static void main(String[] args) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("decoder", new ProtobufDecoder(StudentPOJO.Student.getDefaultInstance()));
pipeline.addLast(new NettyServerHandler());
}
});
System.out.println(".....服务器 is ready...");
ChannelFuture cf = bootstrap.bind(6668).sync();
cf.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (cf.isSuccess()) {
System.out.println("监听端口 6668 成功");
} else {
System.out.println("监听端口 6668 失败");
}
}
});
cf.channel().closeFuture().sync();
}finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.util.CharsetUtil;
public class NettyServerHandler extends SimpleChannelInboundHandler<StudentPOJO.Student> {
@Override
public void channelRead0(ChannelHandlerContext ctx, StudentPOJO.Student msg) throws Exception {
System.out.println("客户端发送的数据 id=" + msg.getId() + " 名字=" + msg.getName() + "msg" + msg.getMsg());
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端", CharsetUtil.UTF_8));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
6、客户端
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
public class NettyClient {
public static void main(String[] args) throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("encoder", new ProtobufEncoder());
pipeline.addLast(new NettyClientHandler());
}
});
System.out.println("客户端 ok..");
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6668).sync();
channelFuture.channel().closeFuture().sync();
}finally {
group.shutdownGracefully();
}
}
}
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
StudentPOJO.Student student = StudentPOJO.Student.newBuilder()
.setId(4)
.setName("张三")
.setMsg("我会武功,是个饭桶")
.build();
ctx.writeAndFlush(student);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println("服务器回复的消息:" + buf.toString(CharsetUtil.UTF_8));
System.out.println("服务器的地址: "+ ctx.channel().remoteAddress());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
7、验证一下吧
.....服务器 is ready...
监听端口 6668 成功
客户端发送的数据 id=4 名字=张三msg我会武功,是个饭桶
客户端 ok..
服务器回复的消息:hello, 客户端
服务器的地址: /127.0.0.1:6668
三、Netty使用Protobuf发送多类型对象
1、编写Student.proto
syntax = "proto3";
option optimize_for = SPEED;
option java_outer_classname="MyDataInfo";
message MyMessage {
enum DataType {
StudentType = 0;
WorkerType = 1;
}
DataType data_type = 1;
oneof dataBody {
Student student = 2;
Worker worker = 3;
}
}
message Student {
int32 id = 1;
string name = 2;
}
message Worker {
string name=1;
int32 age=2;
}
2、生成MyDataInfo.java
(1)将Student.proto文件拷贝到protoc.exe同级目录并执行命令:
protoc-3.6.1-win32\bin>protoc.exe --java_out=. Student.proto
生成的MyDataInfo.java可以拷贝到项目中使用了。
3、服务端
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
public class NettyServer {
public static void main(String[] args) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("decoder", new ProtobufDecoder(MyDataInfo.MyMessage.getDefaultInstance()));
pipeline.addLast(new NettyServerHandler());
}
});
System.out.println(".....服务器 is ready...");
ChannelFuture cf = bootstrap.bind(6668).sync();
cf.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (cf.isSuccess()) {
System.out.println("监听端口 6668 成功");
} else {
System.out.println("监听端口 6668 失败");
}
}
});
cf.channel().closeFuture().sync();
}finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;
public class NettyServerHandler extends SimpleChannelInboundHandler<MyDataInfo.MyMessage> {
@Override
public void channelRead0(ChannelHandlerContext ctx, MyDataInfo.MyMessage msg) throws Exception {
MyDataInfo.MyMessage.DataType dataType = msg.getDataType();
if(dataType == MyDataInfo.MyMessage.DataType.StudentType) {
MyDataInfo.Student student = msg.getStudent();
System.out.println("学生id=" + student.getId() + " 学生名字=" + student.getName());
} else if(dataType == MyDataInfo.MyMessage.DataType.WorkerType) {
MyDataInfo.Worker worker = msg.getWorker();
System.out.println("工人的名字=" + worker.getName() + " 年龄=" + worker.getAge());
} else {
System.out.println("传输的类型不正确");
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~", CharsetUtil.UTF_8));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
4、客户端
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
public class NettyClient {
public static void main(String[] args) throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("encoder", new ProtobufEncoder());
pipeline.addLast(new NettyClientHandler());
}
});
System.out.println("客户端 ok..");
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6668).sync();
channelFuture.channel().closeFuture().sync();
}finally {
group.shutdownGracefully();
}
}
}
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
import java.util.Random;
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
int random = new Random().nextInt(3);
MyDataInfo.MyMessage myMessage = null;
if(0 == random) {
myMessage = MyDataInfo.MyMessage.newBuilder().setDataType(MyDataInfo.MyMessage.DataType.StudentType).setStudent(MyDataInfo.Student.newBuilder().setId(5).setName("张三").build()).build();
} else {
myMessage = MyDataInfo.MyMessage.newBuilder().setDataType(MyDataInfo.MyMessage.DataType.WorkerType).setWorker(MyDataInfo.Worker.newBuilder().setAge(20).setName("李四").build()).build();
}
ctx.writeAndFlush(myMessage);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println("服务器回复的消息:" + buf.toString(CharsetUtil.UTF_8));
System.out.println("服务器的地址: "+ ctx.channel().remoteAddress());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
5、测试一下吧
.....服务器 is ready...
监听端口 6668 成功
工人的名字=李四 年龄=20
学生id=5 学生名字=张三
工人的名字=李四 年龄=20
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)