您现在的位置是:首页 >技术交流 >Idea+maven+springboot项目搭建系列--2 整合Netty完成客户端&服务器端消息收发网站首页技术交流

Idea+maven+springboot项目搭建系列--2 整合Netty完成客户端&服务器端消息收发

拽着尾巴的鱼儿 2024-07-17 06:01:02
简介Idea+maven+springboot项目搭建系列--2 整合Netty完成客户端&服务器端消息收发

前言:Netty 作为主流的nio 通信模型应用相当广泛,本文在spring-boot 项目中集成Netty,并实现客户端以及服务器端消息的接收和发送;本文是 Spring架构篇–2.7 远程通信基础–使用Netty 的扩展;

1 spring-boot jar包引入:引入的jar 和解释如下:

<!-- springboot-web 用于发送http 请求 -->
 <dependency>
   <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- lombok 用于发送生成java 对象的get set 和构造方法 -->
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <!-- lombok 依赖项目不被maven 传递 -->
    <optional>true</optional>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
</dependency>

<!-- netty jar :https://mvnrepository.com/artifact/io.netty/netty-all -->
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.51.Final</version>
</dependency>
<!-- Json 格式化 https://mvnrepository.com/artifact/com.alibaba.fastjson2/fastjson2 -->
<dependency>
    <groupId>com.alibaba.fastjson2</groupId>
    <artifactId>fastjson2</artifactId>
    <version>2.0.31</version>
</dependency>

2 服务端和客户端:

2.1 服务端:
2.1.1 WebNettyServer 服务端:

import com.example.nettydemo.netty.handler.NettyServerHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

@Slf4j
@Component
public class WebNettyServer {
    private final static Logger logger = LoggerFactory.getLogger(WebNettyServer.class);
    private final static int PORT = 8888;
    private EventLoopGroup bossGroup;
    private EventLoopGroup workerGroup;
    private EventLoopGroup extGroup;

    private ChannelFuture channelFuture;
    @PostConstruct
    public void start() throws InterruptedException {
        bossGroup = new NioEventLoopGroup(1);
        workerGroup = new NioEventLoopGroup();
        extGroup = new NioEventLoopGroup();
        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();
                        // 添加解码器
                        pipeline.addLast("解码器",new StringDecoder());
                        // 添加编码器
                        pipeline.addLast("编码器",new StringEncoder());
                        // 添加自己的处理器
                        pipeline.addLast("ext",new NettyServerHandler());
                        // 如果业务调用比较耗时,为了不影响netty 服务端处理器请求的性能,可以使用另外的NioEventLoopGroup 进行handler 处理
                       // pipeline.addLast(extGroup,"ext",new NettyServerHandler());
                    }
                })
                // 连接请求时的等待队列
                .option(ChannelOption.SO_BACKLOG,128)
                // 设置TCP连接是否开启长连接
                .childOption(ChannelOption.SO_KEEPALIVE,true);
        // 绑定端口,开启服务
        channelFuture = bootstrap.bind(PORT).sync();
        if (channelFuture.isSuccess()) {
            logger.debug("Netty server started on port {}", PORT);
        }
    }
    @PreDestroy
    public void stop() {
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
        extGroup.shutdownGracefully();
    }
}

这里对code做下简要的说明:

  • 通过NioEventLoopGroup 分别定义了负责接收客户端连接的的bossGroup,定义负责客户端数据的读和服务端写回数据到客户端的workerGroup,定义额外的事件处理extGroup用于负责业务handler 比较耗时的操作;
  • NioServerSocketChannel.class 用于服务端使用NioServerSocketChannel 对象,完成对channel 管道事件的处理;
  • StringDecoder 用于对客户端数据的解码,StringEncoder 用于对服务端发送数据进行编码,NettyServerHandler 为业务类处理解码后的数据;
  • ChannelOption.SO_BACKLOG 设置 ServerSocketChannel 的可连接队列大小,用于处理连接请求时的等待队列,即 TCP 握手完成后等待被应用程序处理的连接队列,在系统内核建立连接之前,连接请求会先进入队列中等待被处理。当 ServerSocketChannel 与客户端连接成功后,会创建一个新的 SocketChannel 与该客户端进行通信,如果队列满了,新的连接请求就会被拒绝。 不同操作系统下 backlog 参数的默认值可能不同,通常建议显式设置它的值。在 Windows 操作系统下,它的默认值为 200,而在 Linux 操作系统下,它的默认值为 128;
  • ChannelOption.SO_KEEPALIVE是一个TCP参数,它用于设置TCP连接是否开启长连接。当设置为true时,TCP连接会在一定时间内没有数据传输时发送心跳包,以保持连接状态;当设置为false时,TCP连接会在一定时间内没有数据传输时直接关闭连接。
  • bootstrap.bind(PORT).sync() 绑定端口,启动监听,sync 同步阻塞直到服务端启动成功;
  • @Component 告诉spring 需要扫描改类并进行装配;
  • @PostConstruct 告诉spring 改类被springboot 装备之后需要执行定义好的 start() 方法;
  • @PreDestroy 告诉spring 在容器关闭之前需要执行定义好的stop() 方法,这里stop() 方法里对定义的事件轮询处理组进行优雅的关闭(优雅是指,事件轮询处理组在完成所有任务之后在进行关闭操作);

2.1.1 服务端 NettyServerHandler 业务处理:


import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

@Slf4j
public class NettyServerHandler extends SimpleChannelInboundHandler<String> {
    private final static Logger logger = LoggerFactory.getLogger(NettyServerHandler.class);

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        logger.debug("Received message: {}", msg,ctx);
        JSONObject json = JSONObject.parseObject(msg);
        String type = json.getString("type");
        Object content = json.get("content");
        JSONObject resultJson = new JSONObject();
        resultJson.put("id", json.getString("id"));
        resultJson.put("type", type);
        switch (type) {
            case "user":
                // 进入 userService 业务处理
                JSONArray jsonArray;
                Map<String, Object> map;
                if (content instanceof JSONArray) {
                    jsonArray = (JSONArray) content;
                    // 处理 jsonArray
                    // ...
                } else if (content instanceof JSONObject) {
                    map = (Map<String, Object>) content;
                    // 处理 map
                    // ...
                }
                resultJson.put("success", true);
                break;
            case "order":
                // 进入 orderService 业务处理
                resultJson.put("success", true);
                break;
            default:
                resultJson.put("success", false);
                resultJson.put("errorMsg", "unsupported type");
        }
        // 回写结果到客户端
        log.debug("回写结果:{}到客户端",resultJson.toJSONString());
        ctx.channel().writeAndFlush(resultJson.toJSONString());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

code 比较简单,在服务端的处理器中,首先将接收到的字符串解析成一个 JSONObject 对象。然后根据 type 字段进行业务分发,可以在不同的分支中获取 content 字段并根据内容进行业务处理。处理完成后,将结果封装成一个新的 JSONObject 对象,并回写到客户端。需要注意的是,Netty 自带的 StringDecoder 和 StringEncoder 可以直接处理字符串,因此不需要手动进行 Json 转换。同时,回写结果时需要使用 ctx.channel().writeAndFlush() 方法,而不是之前的 channelFuture.channel().writeAndFlush()。

2.2 客户端:
2.2.1 客户端WebNettyClient:



import com.example.nettydemo.netty.handler.NettyClientHandler;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.DependsOn;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

@Slf4j
@Component
@DependsOn("webNettyServer")
public class WebNettyClient {
    private final static Logger logger = LoggerFactory.getLogger(WebNettyClient.class);
    private final String host = "127.0.0.1";  // 服务端地址
    private final int port = 8888;  // 服务端端口
    private ChannelFuture channelFuture;  // 连接通道
    private EventLoopGroup eventLoopGroup;
    @PostConstruct
    public void start() throws InterruptedException {
        eventLoopGroup = new NioEventLoopGroup();
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(eventLoopGroup)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();
                        // 添加解码器
                        pipeline.addLast(new StringDecoder());
                        // 添加编码器
                        pipeline.addLast(new StringEncoder());
                        // 添加自己的处理器
                        pipeline.addLast(new NettyClientHandler());
                    }
                });
        // 连接服务端
        channelFuture = bootstrap.connect(host, port).sync();
        if (channelFuture.isSuccess()) {
            logger.debug("连接服务端成功");
        }
    }
    public void sendMessage(String message) {
        channelFuture.channel().writeAndFlush(message);
    }
    @PreDestroy
    public void stop() {
        eventLoopGroup.shutdownGracefully();
    }
}

2.2.1 客户端业务处理:

package com.example.nettydemo.netty.handler;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class NettyClientHandler extends SimpleChannelInboundHandler<String> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        // 接收到服务端消息时的处理
        log.debug("Received from server:{} ", msg);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

客户端的code 基本和服务端一致,客户端接收了服务端的回写数据并进行了打印,其中@DependsOn(“webNettyServer”) 告诉spring 在装载客户端的bean 时,先要去装载服务端的bean;

2.3 web controller 客户端发送数据测试:
controller:



import com.alibaba.fastjson2.JSONObject;
import com.example.nettydemo.modules.user.dto.UserDto;
import com.example.nettydemo.netty.client.WebNettyClient;
import com.example.nettydemo.netty.data.NettyData;
import com.example.nettydemo.netty.server.WebNettyServer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("user/")
public class UserController {
    @Autowired
    private WebNettyClient webNettyClient;

    /**
     * 客户端信息发送
     * @return
     */
    @GetMapping("client/msg")
    public boolean testClientUser(){
        // 处理业务
        NettyData data = dealAndGetClientData();
        webNettyClient.sendMessage(JSONObject.toJSONString(data));

        return true;
    }

    private NettyData dealAndGetClientData() {
        NettyData data = new NettyData();
        data.setId("1").setType("user-client").setContent(new UserDto().setUserId("001").setUserName(" haha"));
        return data;
    }

}

UserDto:



import lombok.Data;
import lombok.experimental.Accessors;

import java.io.Serializable;

@Data
@Accessors(chain = true)
public class UserDto implements Serializable {
    private String userId;
    private String userName;
}

NettyData:


import lombok.Data;
import lombok.experimental.Accessors;

import java.io.Serializable;

@Data
@Accessors(chain = true)
public class NettyData implements Serializable {
    private String id;
    private String type;
    private Object content;
}

可以看到服务端和客户端的结果接收情况:
在这里插入图片描述

3 总结:

  • Netty 的服务端,通过定义bossGroup对客户端的accept 事件进行处理,然后通过定义的workerGroup 进行具体channel 内读写事件发生的处理;
  • Netty在read数据时,会依次从头到尾调用 ChannelInboundHandlerAdapter 的hadler 进行数据的处理;
  • Netty在write 数据时,如果使用SocketChannel ch进行数据写入,会依次从尾到头调用ChannelOutboundHandlerAdapter 的handler 进行数据处理,如果通过ChannelHandlerContext ctx会从当前处理器,向前找ChannelOutboundHandlerAdapter 的handler 进行数据处理;
  • Netty 对于事件的处理是通过SocketChannel channel 中的pipeline 中每个handler 进行的处理,而pipeline 中维护的是handler 的双向链表;

4 git 项目代码:
https://codeup.aliyun.com/61cd21816112fe9819da8d9c/netty-demo.git

5 扩展:
5.1 使用EmbeddedChannel 来测试chanel 中handler 的处理走向:


import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.channel.embedded.EmbeddedChannel;
import lombok.extern.slf4j.Slf4j;

import java.nio.charset.StandardCharsets;

/**
 * 测试出入handler
 */
@Slf4j
public class TestEmbeddedChannel {
    public static void main(String[] args) {
        ChannelInboundHandlerAdapter h1 =new ChannelInboundHandlerAdapter(){
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                log.debug("h1");

                super.channelRead(ctx, msg);
            }
        };
        ChannelInboundHandlerAdapter h2 =new ChannelInboundHandlerAdapter(){
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                log.debug("h2");
                super.channelRead(ctx, msg);
            }
        };
        ChannelOutboundHandlerAdapter h3 = new ChannelOutboundHandlerAdapter(){
            @Override
            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                log.debug("h3");
                super.write(ctx, msg, promise);
            }
        };
        ChannelOutboundHandlerAdapter h4 = new ChannelOutboundHandlerAdapter(){
            @Override
            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                log.debug("h4");
                super.write(ctx, msg, promise);
            }
        };
        EmbeddedChannel channel = new EmbeddedChannel(h1,h2,h3,h4);
        //  测试read 
        channel.writeInbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("hello read".getBytes(StandardCharsets.UTF_8)));
        //  测试write
        channel.writeOutbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("hello write".getBytes(StandardCharsets.UTF_8)));


    }
}

风语者!平时喜欢研究各种技术,目前在从事后端开发工作,热爱生活、热爱工作。