Spring Boot 3 + Netty 构建高并发即时通讯服务

01引子
Netty 作为异步事件驱动的网络框架,凭借出色的性能和设计,已成为高并发通信领域的标杆。它不仅简化了 Java NIO 编程的复杂性,还能轻松支持数万并发连接,被 Dubbo 、 Elasticsearch 等众多知名项目采用。本文将探索如何结合 Spring Boot 3 与 Netty ,打造高性能即时通讯系统。
 
02前置科普
在深入 Netty 之前,让我们先系统地了解 IO 模型的基础知识。已熟悉这些概念的读者可直接跳至下一章节。
IO模型的基础概念
阻塞与非阻塞
线程访问资源,该资源是否准备就绪的一种处理方式:
  1. 阻塞:线程请求资源时,如果资源未就绪,线程会一直等待,期间无法执行其他任务
  2. 非阻塞:线程请求资源时,无论资源是否就绪,都会立即返回,线程可以继续执行其他任务
同步与异步
同步和异步是指访问数据的一种机制:
  1. 同步:线程主动等待资源就绪并主动获取结果
  2. 异步:线程不主动等待,通过回调或通知机制在资源就绪时获得结果

小结一下:
  1. 阻塞/非阻塞:是关于你在等待结果时能不能去做别的事
  2. 同步/异步:是关于谁负责"盯着"任务完成 - 是你自己不断去查看,还是由别人通知你
Java中的三种IO模型
BIO (Blocking I/O)
传统的同步阻塞 IO 模型,特点如下:
  1. 工作流程:线程发起 IO 请求 → 线程阻塞等待 → IO 操作完成 → 线程继续执行
  2. 资源消耗:每个连接需要独立的线程处理,当并发连接数增加时,系统需要创建大量线程
  3. 适用场景:连接数较少、请求处理时间短的简单应用
  4. 优缺点:实现简单直观,但在高并发场景下性能严重下降
NIO (Non-blocking I/O)
JDK 1.4引入的同步非阻塞 IO 模型:
  1. 工作流程: Selector (选择器)监控多个 Channel (通道) → 有事件发生时处理对应 Channel (通道) → 处理完成后继续监控
  2. 资源效率:单个线程可以管理多个连接,显著提高了系统的并发处理能力
  3. 适用场景:高并发、低延迟的网络应用,如聊天服务器、游戏服务器

AIO (Asynchronous I/O)
JDK 7引入的异步非阻塞 IO 模型:
  1. 工作流程:应用程序发起 IO 请求 → 继续执行其他任务 → IO 完成后回调通知
  2. 特点:完全由操作系统通知机制驱动,应用无需主动轮询,真正实现了异步通信
  3. 局限性:虽然 API 设计先进,但在 Linux 系统上实际表现不如预期,底层仍依赖多路复用
  4. 适用场景:需要极高吞吐量的应用,特别是在 Windows 平台
小结一下:
  1. BIO (阻塞I/O) : 好比你去餐厅点餐,点完后只能坐在位置上干等,不能做其他事情,直到服务员把饭菜端上来。每个客人都需要一个专属服务员全程等候。
  2. NIO (非阻塞I/O) : 好比你去餐厅点餐,拿了一个取餐号,然后可以去逛逛、玩手机。但你需要时不时自己去看电子屏幕,检查自己的号码是否显示。一个服务员可以同时应对多个客人的点餐需求。
  3. AIO (异步I/O) : 好比你去餐厅点餐,点完后给服务员留下手机号,然后你可以随意活动。餐做好后,服务员会主动打电话通知你来取餐。你完全不需要操心查看进度。
03初识Netty
Netty 是一个高性能的异步事件驱动网络框架,为开发客户端/服务端应用提供了简洁而强大的API。它成功解决了原生 NIO 编程中的诸多痛点:API 复杂难用、网络异常处理困难以及 JDK 内置的 NIO Bug。Netty 在 NIO 的基础上进行了优化封装,既保留了 NIO 的高并发特性,又提供了更友好的开发体验。它通过零拷贝技术提升传输效率,同时支持多种协议,具备强大的功能定制能力。
Ps: 零拷贝技术让数据传输更高效,通过减少内存间的数据复制次数实现。普通 IO 读写文件需要在内核空间和用户空间之间多次复制数据,而零拷贝允许数据直接从磁盘传输到网络,跳过中间环节,大幅提升性能,特别适合大文件传输场景。
线程模型
Netty 提供了三种核心线程模型,可根据应用需求灵活选择:
  1. 单线程模型:所有的 IO 操作(连接建立、数据读写、业务处理)均由同一个 NIO 线程完成。结构简单,适用于连接数少、业务逻辑轻量的场景。
  2. 多线程模型:由一组 NIO 线程共同处理所有 IO 操作,提高系统吞吐量。每个连接可能被不同线程处理,但同一时刻只会被一个线程处理,避免了多线程竞争。
  3. 主从线程模型:将职责分离 - Boss 线程池专门负责接受连接请求,Worker 线程池负责处理IO和业务逻辑。这种模型在高并发场景下表现最佳。

Netty的一大亮点是它的"管道处理"机制,这就像一条流水线:
想象一下,每个网络连接(Channel)就像一个独立的生产线,数据包从一端进入,经过多个"工序"(Handler)处理后从另一端输出。这些"工序"可以是数据解码、安全检查、业务逻辑等不同步骤。每个"工序"只负责自己的专长工作,不需要关心整体流程。
这样设计的好处很直观 - 就像现代工厂的流水线一样,每个岗位只专注做好一件事,整体效率更高,而且可以轻松调整或替换某个环节而不影响整体运作。对开发者来说,这意味着代码结构清晰、容易维护,还能重复利用这些"工序模块"来搭建不同的应用。
生命周期
Netty 组件的生命周期管理是其框架设计的重要部分,理解这些生命周期有助于我们在编码时能够更好地控制应用程序的行为。
Channel的生命周期
Channel 在 Netty 中代表一个网络连接,它的生命周期包括以下几个主要状态:
  1. 注册(Registered):Channel 被注册到 EventLoop 上
  2. 激活(Active):Channel 连接建立并就绪
  3. 非激活(Inactive):Channel 连接断开
  4. 注销(Unregistered):Channel 从 EventLoop 中注销
这些状态变化会触发 ChannelHandler 中的相应生命周期方法,如 channelRegistered()、channelActive() 等。
Handler的生命周期
Handler 是数据处理的核心组件,它们也有清晰的生命周期:
  1. 添加:handlerAdded() 在 Handler 被添加到 ChannelPipeline 时调用
  2. 移除:handlerRemoved() 在Handler从ChannelPipeline 移除时调用
  3. 异常:exceptionCaught() 在处理过程中发生异常时调用
服务器启动流程
  1. 创建服务器引导类:使用 ServerBootstrap 配置服务器参数
  2. 设置线程模型:指定 Boss 线程组和 Worker 线程组
  3. 配置Channel:选择 NioServerSocketChannel 等实现
  4. 添加处理器:配置 ChannelInitializer 来设置每个新连接的处理链
  5. 绑定端口:调用 bind() 方法启动服务器并监听端口
我们可以把这个过程想象成组装和使用一台机器:首先准备好零件(创建组件),然后按照说明书组装(配置连接),接通电源(启动服务),机器开始工作(处理数据),最后关闭电源拆解维护(关闭资源)。整个过程有条不紊,每个组件都知道自己什么时候该做什么事情。
04实时通讯技术方案选型
在构建需要实时数据交互的应用时,有三种主流技术方案:
Ajax轮训
  1. 原理:客户端定时向服务器发送请求,检查是否有新数据。
  2. 优势:实现简单,兼容性极佳,几乎所有浏览器都支持,服务器逻辑直观。
  3. 劣势:产生大量无效请求浪费资源,实时性受轮询间隔限制,延迟明显,高并发时可能造成服务器压力。
Long pull(长轮询)
  1. 原理:客户端发送请求后,服务器保持连接不立即响应,直到有新数据或超时才返回,客户端收到后立即发起新请求。
  2. 优势:减少无效请求,实时性较轮询有所提升,兼容性良好。
  3. 劣势:服务器需维持大量连接,高并发场景资源消耗大,仍有一定延迟。
WebSocket
  1. 原理:建立单一TCP连接后提供持久双向通信通道,双方可随时发送数据。
  2. 优势:真正的实时双向通信,延迟低,协议开销小,适合频繁数据交换,资源消耗相对较低。
  3. 劣势:实现复杂度较高,部分老旧浏览器不支持,某些网络环境可能受限。
在我们要构建的即时通讯服务中, WebSocket 无疑是最佳选择,它能最好地满足我们对实时性的要求。值得一提的是, Netty 提供了对 WebSocket 的原生支持和优化实现,这让我们能够轻松构建可扩展且高效的实时通讯系统,省去了处理底层通信细节的繁琐工作,更专注于业务逻辑的实现。
05代码实现
// 1. WebSocket连接全局配置
globalData: {    // WebSocket服务器连接地址    chatServerUrl: "ws://127.0.0.1:875/ws",    // 全局WebSocket连接对象    CHAT: null,    // 标记WebSocket连接状态    chatSocketOpen: false,},// 2. 应用启动时初始化WebSocket连接onLaunch: function({    // 程序启动时连接聊天服务器    this.doConnect(false);},// 3. 核心方法:建立WebSocket连接doConnect(isFirst) {    // 重连时显示提示    if (isFirst) {        uni.showToast({            icon"loading",            title"断线重连中...",            duration2000        });    }    var me = this;    // 仅当用户已登录时才连接WebSocket    if (me.getUserInfoSession() != null && me.getUserInfoSession() != "" && me.getUserInfoSession() != undefined) {        // 创建WebSocket连接        me.globalData.CHAT = uni.connectSocket({            url: me.globalData.chatServerUrl,            complete()=> {}        });        // 4. 连接成功事件处理        me.globalData.CHAT.onOpen(function(){            // 更新连接状态标记            me.globalData.chatSocketOpen = true;            console.log("ws连接已打开,socketOpen = " + me.globalData.chatSocketOpen);            // 构建初始化消息(消息类型0表示连接初始化)            var chatMsg = {                senderId: me.getUserInfoSession().id,                msgType0            }             var dataContent = {                 chatMsg: chatMsg            }            var msgPending = JSON.stringify(dataContent);            // 发送初始化消息,通知服务器用户身份            me.globalData.CHAT.send({                data: msgPending            });         });        // 5. 连接关闭事件处理        me.globalData.CHAT.onClose(function(){            me.globalData.chatSocketOpen = false;            console.log("ws连接已关闭,socketOpen = " + me.globalData.chatSocketOpen);        });         // 6. 接收消息事件处理        me.globalData.CHAT.onMessage(function(res){            console.log('App.vue 收到服务器内容:' + res.data);            // 处理接收到的消息            me.dealReceiveLastestMsg(JSON.parse(res.data));        });        // 7. 连接错误事件处理        me.globalData.CHAT.onError(function(){            me.globalData.chatSocketOpen = false;            console.log('WebSocket连接打开失败,请检查!');        });    }},// 8. 发送WebSocket消息的通用方法sendSocketMessage(msg) {    // 检查连接状态,只有在连接开启时才发送    if (this.globalData.chatSocketOpen) {        uni.sendSocketMessage({            data: msg        });    } else {        uni.showToast({            icon"none",            title"您已断开聊天服务器的连接"        })    }},// 9. 处理接收到的消息dealReceiveLastestMsg(msgJSON) {    console.log(msgJSON);    var chatMsg = msgJSON.chatMsg;    var chatTime = msgJSON.chatTime;    var senderId = chatMsg.senderId;    var receiverType = chatMsg.receiverType;    console.log('chatMsg.receiverType = ' + receiverType);    var me = this;    // 获取发送者的用户信息    var userId = me.getUserInfoSession().id;    var userToken = me.getUserSessionToken();    var serverUrl = me.globalData.serverUrl;    // 请求发送者详细信息    uni.request({        method"POST",        header: {            headerUserId: userId,            headerUserToken: userToken        },        url: serverUrl + "/userinfo/get?userId=" + senderId,        success(result) {            if (result.data.status == 200) {                var currentSourceUserInfo = result.data.data;                me.currentSourceUserInfo = currentSourceUserInfo;                // 根据消息类型设置显示内容                var msgShow = chatMsg.msg;                if (chatMsg.msgType == 2) {                    msgShow = "[图片]"                } elseif (chatMsg.msgType == 4) {                    msgShow = "[视频]"                } elseif (chatMsg.msgType == 3) {                    msgShow = "[语音]"                }                 // 保存最新消息到本地存储                me.saveLastestMsgToLocal(senderId, currentSourceUserInfo, msgShow, chatTime, msgJSON);            }        }    })},// 10. 将最新消息保存到本地存储saveLastestMsgToLocal(sourceUserId, sourceUser, msgContent, chatTime, msgJSON) {    // 构造最新消息对象    var lastMsg = {        sourceUserId: sourceUserId,     // 源头用户,聊天对象        name: sourceUser.nickname,        face: sourceUser.face,        msgContent: msgContent,        chatTime: chatTime,        unReadCounts0,        communicationType1,   // 1:单聊,2:群聊    }    // 获取本地存储的聊天列表    var lastestUserChatList = uni.getStorageSync("lastestUserChatList");    if (lastestUserChatList == null || lastestUserChatList == undefined || lastestUserChatList == "") {        lastestUserChatList = [];    }    // 更新或新增消息记录    var dealMsg = false;    for (var i = 0; i < lastestUserChatList.length; i++) {        var tmp = lastestUserChatList[i];        if (tmp.sourceUserId == lastMsg.sourceUserId) {            // 已存在聊天记录,更新最新消息            lastestUserChatList.splice(i, 1, lastMsg);            dealMsg = true;            break;        }    }    if (!dealMsg) {        // 新的聊天对象,添加到列表开头        lastestUserChatList.unshift(lastMsg);    }    // 保存更新后的聊天列表    uni.setStorageSync("lastestUserChatList", lastestUserChatList);    // 通知UI更新    uni.$emit('reRenderReceiveMsgInMsgVue'"domeafavor");    uni.$emit('receiveMsgInMsgListVue', msgJSON);},// 11. 关闭WebSocket连接closeWSConnect() {    this.globalData.CHAT.close();
}
 
本节将围绕前后端关键实现展开,给大家展示如何基于 Netty 开发即时通讯服务。
前端
本文侧重于后端服务的构建,因此前端只展示核心通信代码。以下代码实现了与 Netty 服务器建立 WebSocket 连接、消息收发及状态管理的关键功能,为后续后端实现提供了交互基础。
后端
万事要开头,始于导入依赖。(Ps:这里大家在实操的时候去Maven仓库找最新版本,不一定非要和我的版本一样)
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.2.0.Final</version>
</dependency>
1、首先创建服务器启动类,这是整个Netty服务器的入口点,负责配置和启动WebSocket服务器。
import com.pitayafruits.netty.websocket.WSServerInitializer;
import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.nio.NioServerSocketChannel;// netty 服务启动类publicclassChatServer {publicstaticvoidmain(String[] args)throws InterruptedException {EventLoopGroupbossGroup=newNioEventLoopGroup();EventLoopGroupworkerGroup=newNioEventLoopGroup();try {ServerBootstrapserver=newServerBootstrap();            server.group(bossGroup, workerGroup)                       .channel(NioServerSocketChannel.class                    .childHandler(newWSServerInitializer()); ChannelFuturechannelFuture= server.bind(875).sync();            channelFuture.channel().closeFuture().sync();        } finally {            bossGroup.shutdownGracefully();            workerGroup.shutdownGracefully();        }    }
}
关键点 :
  1. 采用Reactor模式,使用两个线程池:bossGroup和workerGroup
  2. bossGroup负责接受客户端连接
  3. workerGroup负责处理IO操作
  4. 服务器绑定在875端口
2、接下来创建通道初始化器,负责配置每个新建立的连接的通道,设置处理器链(Pipeline)。
//初始化器,channel注册后,会执行里面相应的初始化方法
publicclassWSServerInitializerextendsChannelInitializer<SocketChannel> {
@Override
protectedvoidinitChannel(SocketChannel socketChannel)throws Exception {
ChannelPipelinepipeline= socketChannel.pipeline();
        pipeline.addLast(newHttpServerCodec());
        pipeline.addLast(newChunkedWriteHandler());
        pipeline.addLast(newHttpObjectAggregator(1024 * 64));
        pipeline.addLast(newWebSocketServerProtocolHandler("/ws"));
        pipeline.addLast(newChatHandler());
    }
}
关键点 :
  1. 处理HTTP协议: HttpServerCodec 、 ChunkedWriteHandler 、 HttpObjectAggregator
  2. 处理WebSocket协议: WebSocketServerProtocolHandler("/ws") ,指定WebSocket的路由为"/ws"
  3. 添加自定义业务处理器: ChatHandler ,处理具体的消息交互逻辑
3、接着创建会话管理器,管理用户ID与通道(Channel)之间的映射关系,支持同一用户多端登录。(Ps:这个根据实际业务情况来,如果不需要支持多端登录,则不需要创建。)

publicclassUserChannelSession {
privatestatic Map<String, List<Channel>> multiSession = newHashMap<>();privatestatic Map<StringString> userChannelIdRelation = newHashMap<>();publicstaticvoidputUserChannelIdRelation(String userId, String channelId) {        userChannelIdRelation.put(channelId, userId);    }publicstatic String getUserIdByChannelId(String channelId) {return userChannelIdRelation.get(channelId);    }publicstaticvoidputMultiChannels(String userId, Channel channel) {        List<Channel> channels = getMultiChannels(userId);if (channels == null || channels.size() == 0) {            channels = newArrayList<>();        }        channels.add(channel);        multiSession.put(userId, channels);    }publicstaticvoidremoveUserChannels(String userId, String channelId) {        List<Channel> channels = getMultiChannels(userId);if (channels == null || channels.size() == 0) {return;        }for (Channel channel : channels) {if (channel.id().asLongText().equals(channelId)) {                channels.remove(channel);            }        }        multiSession.put(userId, channels);    }publicstatic List<Channel> getMultiChannels(String userId) {return multiSession.get(userId);    }
}
4、最后是创建消息处理器,它是核心业务逻辑处理器,负责处理客户端发送的WebSocket消息。大家可以注意到这里对于消息类型留了扩展的口子,本次我们实现先只实现文字消息。

publicclassChatHandlerextendsSimpleChannelInboundHandler<TextWebSocketFrame> {
publicstaticChannelGroupclients=newDefaultChannelGroup(GlobalEventExecutor.INSTANCE);@OverrideprotectedvoidchannelRead0(ChannelHandlerContext channelHandlerContext,                                TextWebSocketFrame textWebSocketFrame) throws Exception {Stringcontent= textWebSocketFrame.text();DataContentdataContent= JsonUtils.jsonToPojo(content, DataContent.class);ChatMsgchatMsg= dataContent.getChatMsg();StringmsgText= chatMsg.getMsg();StringreceiverId= chatMsg.getReceiverId();StringsenderId= chatMsg.getSenderId();        chatMsg.setChatTime(LocalDateTime.now());IntegermsgType= chatMsg.getMsgType();ChannelcurrentChannel= channelHandlerContext.channel();StringcurrentChannelId= currentChannel.id().asLongText();if (msgType == MsgTypeEnum.CONNECT_INIT.type) {            UserChannelSession.putMultiChannels(senderId, currentChannel);            UserChannelSession.putUserChannelIdRelation(currentChannelId, senderId);        } elseif (msgType == MsgTypeEnum.WORDS.type) {            List<Channel> receiverChannels = UserChannelSession.getMultiChannels(receiverId);if (receiverChannels == null || receiverChannels.size() == 0 || receiverChannels.isEmpty()) {                chatMsg.setIsReceiverOnLine(false);            } else {                chatMsg.setIsReceiverOnLine(true);for (Channel receiverChannel : receiverChannels) {ChannelfindChannel= clients.find(receiverChannel.id());if (findChannel != null) {                        dataContent.setChatMsg(chatMsg);StringchatTimeFormat=                                LocalDateUtils.format(chatMsg.getChatTime(), LocalDateUtils.DATETIME_PATTERN_2);                        dataContent.setChatTime(chatTimeFormat);                        findChannel.writeAndFlush(newTextWebSocketFrame(                                        JsonUtils.objectToJson(dataContent)));                    }                }            }        }        currentChannel.writeAndFlush(newTextWebSocketFrame(currentChannelId));    }@OverridepublicvoidhandlerAdded(ChannelHandlerContext ctx)throws Exception {ChannelcurrentChannel= ctx.channel();        clients.add(currentChannel);    }@OverridepublicvoidhandlerRemoved(ChannelHandlerContext ctx)throws Exception {ChannelcurrentChannel= ctx.channel();StringuserId= UserChannelSession.getUserIdByChannelId(currentChannel.id().asLongText());        UserChannelSession.removeUserChannels(userId, currentChannel.id().asLongText());        clients.remove(currentChannel);    }@OverridepublicvoidexceptionCaught(ChannelHandlerContext ctx, Throwable cause)throws Exception {Channelchannel= ctx.channel();        channel.close();        clients.remove(channel);StringuserId= UserChannelSession.getUserIdByChannelId(channel.id().asLongText());        UserChannelSession.removeUserChannels(userId, channel.id().asLongText());    }
}
再梳理一下完整流程:
1、服务器启动 : ChatServer 创建并配置Netty服务器,设置线程模型和端口;
2、通道初始化 :当有新连接时, WSServerInitializer 设置处理器链Pipeline;
3、连接建立 ChatHandler.handlerAdded() 将连接添加到ChannelGroup;
4、消息处理 :
  1. 客户端先发送初始化消息,建立用户ID与Channel的映射关系;
  2. 客户端后续发送聊天消息,服务器查找接收者的Channel并转发消息;
5、连接断开 ChatHandler.handlerRemoved() 清理资源,移除映射关系。
06小结
至此,我们已成功构建了一个基于 Netty 的即时通讯服务。虽然当前实现仍有一些局限,如缺少离线消息存储机制、消息类型较为单一、未实现消息持久化等,但本文 + 代码示例给大家展示了基于 Netty 构建聊天服务的核心架构与完整流程。
基于现有示例,可以轻松地扩展更多功能,如添加消息队列实现离线消息推送、集成数据库实现消息持久化、增加群聊和多媒体消息支持等。
希望本文能为各位读者提供实现思路,也鼓励大家在这个基础上进行实践操作,打造更加完善的即时通讯服务。
THE END