首页 技术 正文
技术 2022年11月9日
0 收藏 642 点赞 3,135 浏览 9805 个字

看了几天高并发和NIO 今晚终于要开始学习Netty

http://ifeve.com/netty5-user-guide/

Netty实现通信的步骤

1.创建两个NIO线程组,一个专门用于网络事件的处理(接收客户端的连接), 另一个则进行网络通信读写

2.创建一个ServerBootStarp对象,配置Netty的一些列参数,例如接受传出数据内存的大小等。

3.创建一个世纪处理数据的类ChannelInitializer,进行初始化的工作,例如设置接收传出数据的字符集·格式·已经实现处理数据的接口

4.绑定端口,执行同步阻塞方法等待服务端的启动

TCP粘包拆包

TCP是一个流的协议,数据没有分界线 如果传了三个包 ABC  DEF  GHI   TCP会根据缓冲区实际情况进行包的划分就容易 解读为 AB  CDEF  GHI

解决办法:1.消息定长,例如每个报文大小固定为200,如果不够,空位补空格

     2.在包尾加特殊字符进行分割

     3.将消息分为消息头和消息体,在消息头中包含消息总长度的字段,然后进行业务逻辑的处理

public class Server {    public static void main(String[] args) throws Exception {
//1 创建线两个程组
//一个是用于处理服务器端接收客户端连接的
//一个是进行网络通信的(网络读写的)
EventLoopGroup pGroup = new NioEventLoopGroup();
EventLoopGroup cGroup = new NioEventLoopGroup(); //2 创建辅助工具类,用于服务器通道的一系列配置
ServerBootstrap b = new ServerBootstrap();
b.group(pGroup, cGroup) //绑定俩个线程组
.channel(NioServerSocketChannel.class) //指定NIO的模式
.option(ChannelOption.SO_BACKLOG, 1024) //设置tcp缓冲区
.option(ChannelOption.SO_SNDBUF, 32*1024) //设置发送缓冲大小
.option(ChannelOption.SO_RCVBUF, 32*1024) //这是接收缓冲大小
.option(ChannelOption.SO_KEEPALIVE, true) //保持连接
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel sc) throws Exception {
//3 在这里配置具体数据接收方法的处理
sc.pipeline().addLast(new ServerHandler());
}
}); //4 进行绑定
ChannelFuture cf1 = b.bind(8765).sync();
//ChannelFuture cf2 = b.bind(8764).sync();
//5 等待关闭
cf1.channel().closeFuture().sync();
//cf2.channel().closeFuture().sync();
pGroup.shutdownGracefully();
cGroup.shutdownGracefully();
}
}
public class ServerHandler extends ChannelHandlerAdapter {    @Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("server channel active... ");
} @Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
ByteBuf buf = (ByteBuf) msg;
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req, "utf-8");
System.out.println("Server :" + body );
String response = "进行返回给客户端的响应:" + body ;
ctx.writeAndFlush(Unpooled.copiedBuffer(response.getBytes()))
.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
System.out.println("write is success");
} else {
future.cause().printStackTrace();
}
}
})
.addListener(ChannelFutureListener.CLOSE); // 关闭
} @Override
public void channelReadComplete(ChannelHandlerContext ctx)
throws Exception {
System.out.println("读完了");
ctx.flush();
} @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable t)
throws Exception {
ctx.close();
}}
public class Client {    public static void main(String[] args) throws Exception{        EventLoopGroup group = new NioEventLoopGroup();
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel sc) throws Exception {
sc.pipeline().addLast(new ClientHandler());
}
}); ChannelFuture cf1 = b.connect("127.0.0.1", 8765).sync();

   //这个对象可以看作是一个异步操
  //作的结果的占位符;它将在未来的某个时刻完成,并提供对其结果的访问

//ChannelFuture cf2 = b.connect("127.0.0.1", 8764).sync();
//发送消息
Thread.sleep(1000);
cf1.channel().writeAndFlush(Unpooled.copiedBuffer("777".getBytes()));
cf1.channel().writeAndFlush(Unpooled.copiedBuffer("666".getBytes()));
//cf2.channel().writeAndFlush(Unpooled.copiedBuffer("888".getBytes()));
Thread.sleep(2000);
cf1.channel().writeAndFlush(Unpooled.copiedBuffer("888".getBytes()));
//cf2.channel().writeAndFlush(Unpooled.copiedBuffer("666".getBytes())); cf1.channel().closeFuture().sync();
//cf2.channel().closeFuture().sync();
group.shutdownGracefully(); }
}
public class ClientHandler extends ChannelHandlerAdapter{    @Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
} @Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try {
ByteBuf buf = (ByteBuf) msg; byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req); String body = new String(req, "utf-8");
System.out.println("Client :" + body );
String response = "收到服务器端的返回信息:" + body;
} finally {
ReferenceCountUtil.release(msg);
}
} @Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { } @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
ctx.close();
}}

这里面的Bootstarp 相当于配置引导类

对于 ChannelInitializer 是ChannelHandler  Netty 程序都是基于 ChannelPipeline。ChannelPipeline 和 EventLoop 和EventLoopGroup 密切相关,因为它们三个都和事件处理相关,所以这就是为什么它们处理IO 的工作由 EventLoop 管理的原因

当注册一个Channel后 Netty 将这个 Channel 绑定到一个 EventLoop,在 Channel 的生 命周期内总是被绑定到一个 EventLoop。在 Netty IO 操作中,你的程序不需要同步,因为

一个指定通道的所有 IO 始终由同一个线程来执行

Netty的学习

ServerBootstrap 使用 2 个 EventLoopGroup

一个 ServerBootstrap 可以认为有 2个 channels 组,
第一组包含一个单例 ServerChannel,代表持有一个绑定了本地端口的socket;
第二组包含所有的 Channel,代表服务器已接受了的连接

Netty的学习

EventLoopGroup A 唯一的目的就是接受连接然后交给 EventLoopGroup B,

//1 创建线两个程组
//一个是用于处理服务器端接收客户端连接的
//一个是进行网络通信的(网络读写的)

Netty可以使用两个不同的 Group,因为服务器程序需要接受很多客户端连接的情况下,一个EventLoopGroup 将是程序性能的瓶颈,因为事件循环忙于处理连接请求,没有多余的资源
和空闲来处理业务逻辑,最后的结果会是很多连接请求超时。若有两 EventLoops, 即使在高负载下,所有的连接也都会被接受,因为 EventLoops 接受连接不会和哪些已经连接了的处理共享资源

EventLoopGroup 和 EventLoop 是什么关系?EventLoopGroup 可以包含很多个EventLoop,每个 Channel 绑定一个 EventLoop 不会被改变,因为 EventLoopGroup 包含少量的 EventLoop 的 Channels,很多 Channel 会共享同一个 EventLoop。这意味着在一个Channel 保持 EventLoop 繁忙会禁止其他 Channel 绑定到相同的 EventLoop。我们可以理解为 EventLoop 是一个事件循环线程,而 EventLoopGroup 是一个事件循环集合。

 ChannelFuture 

这个对象可以看作是一个异步操作的结果的占位符;它将在未来的某个时刻完成,并提供对其结果的访问

每个 Netty 的出站 I/O 操作都将返回一个 ChannelFuture;也就是说,它们都不会阻塞。正如我们前面所提到过的一样,Netty 完全是异步和事件驱动的

Netty的学习

Netty 中发送消息有两种方法:直接写入通道或写入ChannelHandlerContext 对象。这两种方法的主要区别如下:

  直接写入通道导致处理消息从 ChannelPipeline 的尾部开始(ChannelOutboundHandler 出站)

  写入 ChannelHandlerContext 对象导致处理消息从 ChannelPipeline 的下一个handler 开始(ChannelInboundHandler入站)

对于这些Handler  常 有“ByteToMessageDecoder”、“MessageToByteEncoder”,还有 Google 的协议“ProtobufEncoder”和“ProtobufDecoder”

我们可以在添加 ChannelHandler 到 ChannelPipeline 中时指定一个 EventExecutorGroup,EventExecutorGroup 会获得一个 EventExecutor,EventExecutor将执行ChannelHandler 的所有方法。EventExecutor 将使用不同的线程来执行和释放EventLoop

Transport API

Netty的学习

如上图所示,每个 Channel 都会分配一个 ChannelPipeline 和 ChannelConfig

ChannelConfig 负责设置并存储配置,并允许在运行期间更新它们, ChannelPipeline 容纳了使用的 ChannelHandler 实例

ChannelHandler 

 传输数据时,将数据从一种格式转换到另一种格式
 异常通知
 Channel 变为有效或无效时获得通知
 Channel 被注册或从 EventLoop 中注销时获得通知
 通知用户特定事件

Channel 

 eventLoop(),返回分配给 Channel 的 EventLoop
 pipeline(),返回分配给 Channel 的 ChannelPipeline
 isActive(),返回 Channel 是否激活,已激活说明与远程连接对等
 localAddress(),返回已绑定的本地 SocketAddress
 remoteAddress(),返回已绑定的远程 SocketAddress
 write(),写数据到远程客户端,数据通过 ChannelPipeline 传输过去

 不同类型的 ByteBuf

Heap Buffer(堆缓冲区)

存储在JVM的堆上,存储在数组中,清除也很快 ByteBuf.array()获取数据   hasArray 检查

Direct Buffer(直接缓冲区)

直接缓冲区,在堆之外直接分配内存。直接缓冲区不会占用堆空间容量。直接缓冲区在使用 Socket 传递数据时性能很好,因为若使用间接缓冲区,JVM 会先将数据复制到直接缓冲区再进行传递;缺点是在分配内存空间和释放内存时比堆缓冲区更复杂,而 Netty 使用内存池来解决这样的问题,这也是 Netty 使用内存池的原因之一。直接缓冲区不支持数组访问数据,但是我们可以间接的访问数据数组

  ByteBuf directBuf = Unpooled.directBuffer(16);
if(!directBuf.hasArray()){
int len = directBuf.readableBytes();
byte[] arr = new byte[len];
directBuf.getBytes(0, arr);
}

Composite Buffer(复合缓冲区)

CompositeByteBuf compBuf = Unpooled.compositeBuffer();
ByteBuf heapBuf = Unpooled.buffer(8);
ByteBuf directBuf = Unpooled.directBuffer(16);
// 添加 ByteBuf 到 CompositeByteBuf
compBuf.addComponents(heapBuf, directBuf);
// 删除第一个 ByteBuf
compBuf.removeComponent(0);
Iterator<ByteBuf> iter = compBuf.iterator();
while (iter.hasNext()) {
System.out.println(iter.next().toString());
}
// 使用数组访问数据
if (!compBuf.hasArray()) {
int len = compBuf.readableBytes();
byte[] arr = new byte[len];
compBuf.getBytes(0, arr);
}

ByteBuf 提供两个指针变量支付读和写操作,读操作是使用 readerIndex(),写操作时使用 writerIndex()。这和 JDK 的 ByteBuffer 不同,ByteBuffer 只有一个方法来设置索引所以需要使用 flip()方法来切换读和写模式

discardReadBytes()可以用来清空 ByteBuf 中已读取的数据, 

isReadable()  判断 writerIndex > readerIndex;

writableBytes   capacity() – writerIndex 能否写

clear() 重设索引    readerIndex = writerIndex = 0;

调用 duplicate()、slice()、slice(int index, int length)、order(ByteOrder endianness)
会创建一个现有缓冲区的视图

ByteBufAllocator 负责分配 ByteBuf 实例  ByteBufAllocator.heapBuffer()

可以从 Channel 的 alloc()获取,也可以从ChannelHandlerContext 的 alloc()获取

Unpooled 也能创建ByteBuf实例    Unpooled.compositeBuffer();   Unpooled.buffer(8);

ChannelHandler

ChannelPipeline

ChannelPipeline是ChannelHandler实例的列表,用于处理或截获通道的接收和发送数据。

addFirst(…),添加ChannelHandler在ChannelPipeline的第一个位置  addBefore(…),在ChannelPipeline中指定的ChannelHandler名称之前添加ChannelHandler

Netty的学习

每个ChannelHandler被添加到ChannelPipeline后,都会创建一个ChannelHandlerContext并与之创建的ChannelHandler关联绑定。ChannelHandlerContext允许ChannelHandler与其他的ChannelHandler实现进行交互。

   如果你想有一些事件流全部通过ChannelPipeline,有两个不同的方法可以做到:

  • 调用Channel的方法
  • 调用ChannelPipeline的方法
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//Event via Channel
Channel channel = ctx.channel();
channel.write(Unpooled.copiedBuffer("netty in action", CharsetUtil.UTF_8));
//Event via ChannelPipeline
ChannelPipeline pipeline = ctx.pipeline();
pipeline.write(Unpooled.copiedBuffer("netty in action", CharsetUtil.UTF_8));

    //为了节省开销,不感兴趣的ChannelHandler不让通过 直接跳过前面

    //排除一些ChannelHandler

    ctx.write(Unpooled.copiedBuffer(“Netty in Action”, CharsetUtil.UTF_8));

        }
});
}

ChannelHandler实例如果带有@Sharable注解则可以被添加到多个ChannelPipeline。也就是说单个ChannelHandler实例可以有多个ChannelHandlerContext,因此可以调用不同ChannelHandlerContext获取同一个ChannelHandler

Channel的状态在其生命周期中变化

Netty的学习

  • handlerAdded,ChannelHandler添加到实际上下文中准备处理事件
  • handlerRemoved,将ChannelHandler从实际上下文中删除,不再处理事件
  • exceptionCaught,处理抛出的异常

ChannelInboundHandler\

  • channelRegistered,ChannelHandlerContext的Channel被注册到EventLoop;
  • channelUnregistered,ChannelHandlerContext的Channel从EventLoop中注销
  • channelActive,ChannelHandlerContext的Channel已激活
  • channelInactive,ChannelHanderContxt的Channel结束生命周期
  • channelRead,从当前Channel的对端读取消息
  • channelReadComplete,消息读取完成后执行
  • userEventTriggered,一个用户事件被处罚
  • channelWritabilityChanged,改变通道的可写状态,可以使用Channel.isWritable()检查
  • exceptionCaught,重写父类ChannelHandler的方法,处理异常

常用的 SimpleChannelInboundHandler

ChannelOutboundHandler

        ChannelOutboundHandler用来处理“出站”的数据消息。ChannelOutboundHandler提供了下面一些方法:

  • bind,Channel绑定本地地址
  • connect,Channel连接操作
  • disconnect,Channel断开连接
  • close,关闭Channel
  • deregister,注销Channel
  • read,读取消息,实际是截获ChannelHandlerContext.read()
  • write,写操作,实际是通过ChannelPipeline写消息,Channel.flush()属性到实际通道
  • flush,刷新消息到通道

 ByteToMessageDecoder

MessageToByteEncoder

 ByteToMessageCodec   用来处理 byte-to-message 和 message-to-byte

MessageToMessageCodec

CombinedChannelDuplexHandler 来结合解码器和编码器   组合

相关推荐
python开发_常用的python模块及安装方法
adodb:我们领导推荐的数据库连接组件bsddb3:BerkeleyDB的连接组件Cheetah-1.0:我比较喜欢这个版本的cheeta…
日期:2022-11-24 点赞:878 阅读:9,489
Educational Codeforces Round 11 C. Hard Process 二分
C. Hard Process题目连接:http://www.codeforces.com/contest/660/problem/CDes…
日期:2022-11-24 点赞:807 阅读:5,904
下载Ubuntn 17.04 内核源代码
zengkefu@server1:/usr/src$ uname -aLinux server1 4.10.0-19-generic #21…
日期:2022-11-24 点赞:569 阅读:6,737
可用Active Desktop Calendar V7.86 注册码序列号
可用Active Desktop Calendar V7.86 注册码序列号Name: www.greendown.cn Code: &nb…
日期:2022-11-24 点赞:733 阅读:6,490
Android调用系统相机、自定义相机、处理大图片
Android调用系统相机和自定义相机实例本博文主要是介绍了android上使用相机进行拍照并显示的两种方式,并且由于涉及到要把拍到的照片显…
日期:2022-11-24 点赞:512 阅读:8,128
Struts的使用
一、Struts2的获取  Struts的官方网站为:http://struts.apache.org/  下载完Struts2的jar包,…
日期:2022-11-24 点赞:671 阅读:5,291