5.7 JDK序列化
1 ObjectEncoder与ObjectDecoder简介
ObjectEncoder
/ObjectDecoder
使用JDK序列化机制编解码,因此我们可以使用Java对象作为请求和响应参数,限制是对象必须实现Serializable。JDK序列化机制的缺点是:序列化的性能以及序列化后对象占用的字节数比较多。优点是:这是对JDK默认支持的机制,不需要引入第三方依赖。
如果仅仅是对象序列化,字节通过网络传输后,那么在解码时,无法判断到底多少个字节可以构成一个Java对象。因此需要结合长度编码,也就是添加一个Length字段,表示序列化后的字节占用的字节数。因此ObjectEncoder/ObjectDecoder采用的通信协议如下:
+--------+----------+ | Length | Content | +--------+----------+
其中:
Length:表示Content字段占用的字节数,Length本身占用的字节数我们可以指定为一个固定的值
Content:对象经过JDK序列化后二进制字节内容
乍一看,这与我们在上一节讲解的LengthFieldBasedFrameDecoder
/LengthFieldPrepender
采用的通信协议很类似。事实上ObjectDecoder本身就继承了LengthFieldBasedFrameDecoder。不过ObjectEncoder略有不同,其并没有继承LengthFieldPrepender,而是内部直接添加了Length字段。
ObjectEncoder源码如下:
@Sharable public class ObjectEncoder extends MessageToByteEncoder<Serializable> { private static final byte[] LENGTH_PLACEHOLDER = new byte[4]; //当需要编码时,encode方法会被回调 //参数msg:就是我们需要序列化的java对象 //参数out:我们需要将序列化后的二进制字节写到ByteBuf中 @Override protected void encode(ChannelHandlerContext ctx, Serializable msg, ByteBuf out) throws Exception { int startIdx = out.writerIndex(); //ByteBufOutputStream是Netty提供的输出流,数据写入其中之后,可以通过其buffer()方法会的对应的ByteBuf实例 ByteBufOutputStream bout = new ByteBufOutputStream(out); //JDK序列化机制的ObjectOutputStream ObjectOutputStream oout = null; try { //首先占用4个字节,这就是Length字段的字节数,这只是占位符,后面为填充对象序列化后的字节数 bout.write(LENGTH_PLACEHOLDER); //CompactObjectOutputStream是netty提供的类,其实现了JDK的ObjectOutputStream,顾名思义用于压缩 //同时把bout作为其底层输出流,意味着对象序列化后的字节直接写到了bout中 oout = new CompactObjectOutputStream(bout); //调用writeObject方法,即表示开始序列化 oout.writeObject(msg); oout.flush(); } finally { if (oout != null) { oout.close(); } else { bout.close(); } } int endIdx = out.writerIndex(); //序列化完成,设置占位符的值,也就是对象序列化后的字节数量 out.setInt(startIdx, endIdx - startIdx - 4); } }
ObjectDecoder源码如下所示:
//注意ObjectDecoder继承了LengthFieldBasedFrameDecoder public class ObjectDecoder extends LengthFieldBasedFrameDecoder { private final ClassResolver classResolver; public ObjectDecoder(ClassResolver classResolver) { this(1048576, classResolver); } //参数maxObjectSize:表示可接受的对象反序列化的最大字节数,默认为1048576 bytes,约等于1M //参数classResolver:由于需要将二进制字节反序列化为Java对象,需要指定一个ClassResolver来加载这个类的字节码对象 public ObjectDecoder(int maxObjectSize, ClassResolver classResolver) { //调用父类LengthFieldBasedFrameDecoder构造方法,关于这几个参数的作用,参见之前章节的分析 super(maxObjectSize, 0, 4, 0, 4); this.classResolver = classResolver; } //当需要解码时,decode方法会被回调 @Override protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception { //首先调用父类的decode方法进行解码,会解析出包含可解析为java对象的完整二进制字节封装到ByteBuf中,同时Length字段的4个字节会被删除 ByteBuf frame = (ByteBuf) super.decode(ctx, in); if (frame == null) { return null; } //构造JDK ObjectInputStream实例用于解码 ObjectInputStream ois = new CompactObjectInputStream(new ByteBufInputStream(frame, true), classResolver); try { //调用readObject方法进行解码,其返回的就是反序列化之后的Java对象 return ois.readObject(); } finally { ois.close(); } } }
下面我们通过实际案例来演示ObjectEncoder与ObjectDecoder的使用。
2 使用案例
首先我们分别编写两个Java对象Request/Response分别表示请求和响应。
public class Request implements Serializable{ private String request; private Date requestTime; //setters getters and toString }
public class Response implements Serializable{ private String response; private Date responseTime; //setters getters and toString }
接着我们编写Client端代码:
public class JdkSerializerClient { public static void main(String[] args) throws Exception { EventLoopGroup workerGroup = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); // (1) b.group(workerGroup); // (2) b.channel(NioSocketChannel.class); // (3) b.option(ChannelOption.SO_KEEPALIVE, true); // (4) b.handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new ObjectEncoder()); ch.pipeline().addLast(new ObjectDecoder(new ClassResolver() { public Class<?> resolve(String className) throws ClassNotFoundException { return Class.forName(className); } })); ch.pipeline().addLast(new ChannelInboundHandlerAdapter() { // 在于server建立连接后,即发送请求报文 public void channelActive(ChannelHandlerContext ctx) { Request request = new Request(); request.setRequest("i am request!"); request.setRequestTime(new Date()); ctx.writeAndFlush(request); } //接受服务端的响应 @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { Response response= (Response) msg; System.out.println("receive response:"+response); } }); } }); // Start the client. ChannelFuture f = b.connect("127.0.0.1", 8080).sync(); // (5) // Wait until the connection is closed. f.channel().closeFuture().sync(); } finally { workerGroup.shutdownGracefully(); } } }
由于client既要发送Java对象Request作为请求,又要接受服务端响应的Response对象,因此在ChannelPipeline
中,我们同时添加了ObjectDecoder和ObjectEncoder。
另外我们自定义了一个ChannelInboundHandler,在连接建立时,其channelActive方法会被回调,我们在这个方法中构造一个Request对象发送,通过ObjectEncoder进行编码。服务端会返回一个Response对象,此时channelRead方法会被回调,由于已经经过ObjectDecoder解码,因此可以直接转换为Reponse对象,然后打印。
server端:
public class JdkSerializerServer { public static void main(String[] args) throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1) EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); // (2) b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) // (3) .childHandler(new ChannelInitializer<SocketChannel>() { // (4) @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new ObjectEncoder()); ch.pipeline().addLast(new ObjectDecoder(new ClassResolver() { public Class<?> resolve(String className) throws ClassNotFoundException { return Class.forName(className); } })); // 自定义这个ChannelInboundHandler打印拆包后的结果 ch.pipeline().addLast(new ChannelInboundHandlerAdapter() { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { Request request= (Request) msg; System.out.println("receive request:"+request); Response response = new Response(); response.setResponse("response to:"+request.getRequest()); response.setResponseTime(new Date()); ctx.writeAndFlush(response); } }); } }); // Bind and start to accept incoming connections. ChannelFuture f = b.bind(8080).sync(); // (7) System.out.println("JdkSerializerServer Started on 8080..."); f.channel().closeFuture().sync(); } finally { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } }
Server端与Client端分析类似,这里不再赘述。
先后启动Server端与Client端,在Server端控制台我们将看到:
JdkSerializerServer Started on 8080... receive request:Request{request='i am request!', requestTime=2018-9-9 18:47:30}
在Client端控制台我们将看到:
receive response:Response{response='response to:i am request!', responseTime=2018-9-9 18:48:36}
到此,我们的案例完成。Request和Response都编码、解码成功了。
3 总结
把ObjectEncoder/ObjectDecoder作为序列化/反序列化入门是非常合适的,因为其足够简单。同时通过这个案例,我们也发现了,通信协议中必须包含了一个Length字段,用于表示对象序列化后的字节数。事实上,这种模式也可以套用到我们接下来要介绍的其他序列化框架中。