Spring Boot 3 + Netty 构建高并发即时通讯服务
-
阻塞:线程请求资源时,如果资源未就绪,线程会一直等待,期间无法执行其他任务 -
非阻塞:线程请求资源时,无论资源是否就绪,都会立即返回,线程可以继续执行其他任务
-
同步:线程主动等待资源就绪并主动获取结果 -
异步:线程不主动等待,通过回调或通知机制在资源就绪时获得结果


-
阻塞/非阻塞:是关于你在等待结果时能不能去做别的事 -
同步/异步:是关于谁负责"盯着"任务完成 - 是你自己不断去查看,还是由别人通知你

-
工作流程:线程发起 IO 请求 → 线程阻塞等待 → IO 操作完成 → 线程继续执行 -
资源消耗:每个连接需要独立的线程处理,当并发连接数增加时,系统需要创建大量线程 -
适用场景:连接数较少、请求处理时间短的简单应用 -
优缺点:实现简单直观,但在高并发场景下性能严重下降
-
工作流程: Selector (选择器)监控多个 Channel (通道) → 有事件发生时处理对应 Channel (通道) → 处理完成后继续监控 -
资源效率:单个线程可以管理多个连接,显著提高了系统的并发处理能力 -
适用场景:高并发、低延迟的网络应用,如聊天服务器、游戏服务器

-
工作流程:应用程序发起 IO 请求 → 继续执行其他任务 → IO 完成后回调通知 -
特点:完全由操作系统通知机制驱动,应用无需主动轮询,真正实现了异步通信 -
局限性:虽然 API 设计先进,但在 Linux 系统上实际表现不如预期,底层仍依赖多路复用 -
适用场景:需要极高吞吐量的应用,特别是在 Windows 平台
-
BIO (阻塞I/O) : 好比你去餐厅点餐,点完后只能坐在位置上干等,不能做其他事情,直到服务员把饭菜端上来。每个客人都需要一个专属服务员全程等候。 -
NIO (非阻塞I/O) : 好比你去餐厅点餐,拿了一个取餐号,然后可以去逛逛、玩手机。但你需要时不时自己去看电子屏幕,检查自己的号码是否显示。一个服务员可以同时应对多个客人的点餐需求。 -
AIO (异步I/O) : 好比你去餐厅点餐,点完后给服务员留下手机号,然后你可以随意活动。餐做好后,服务员会主动打电话通知你来取餐。你完全不需要操心查看进度。


-
单线程模型:所有的 IO 操作(连接建立、数据读写、业务处理)均由同一个 NIO 线程完成。结构简单,适用于连接数少、业务逻辑轻量的场景。 -
多线程模型:由一组 NIO 线程共同处理所有 IO 操作,提高系统吞吐量。每个连接可能被不同线程处理,但同一时刻只会被一个线程处理,避免了多线程竞争。 -
主从线程模型:将职责分离 - Boss 线程池专门负责接受连接请求,Worker 线程池负责处理IO和业务逻辑。这种模型在高并发场景下表现最佳。

-
注册(Registered):Channel 被注册到 EventLoop 上 -
激活(Active):Channel 连接建立并就绪 -
非激活(Inactive):Channel 连接断开 -
注销(Unregistered):Channel 从 EventLoop 中注销
-
添加:handlerAdded() 在 Handler 被添加到 ChannelPipeline 时调用 -
移除:handlerRemoved() 在Handler从ChannelPipeline 移除时调用 -
异常:exceptionCaught() 在处理过程中发生异常时调用
-
创建服务器引导类:使用 ServerBootstrap 配置服务器参数 -
设置线程模型:指定 Boss 线程组和 Worker 线程组 -
配置Channel:选择 NioServerSocketChannel 等实现 -
添加处理器:配置 ChannelInitializer 来设置每个新连接的处理链 -
绑定端口:调用 bind() 方法启动服务器并监听端口
-
原理:客户端定时向服务器发送请求,检查是否有新数据。 -
优势:实现简单,兼容性极佳,几乎所有浏览器都支持,服务器逻辑直观。 -
劣势:产生大量无效请求浪费资源,实时性受轮询间隔限制,延迟明显,高并发时可能造成服务器压力。
-
原理:客户端发送请求后,服务器保持连接不立即响应,直到有新数据或超时才返回,客户端收到后立即发起新请求。 -
优势:减少无效请求,实时性较轮询有所提升,兼容性良好。 -
劣势:服务器需维持大量连接,高并发场景资源消耗大,仍有一定延迟。
-
原理:建立单一TCP连接后提供持久双向通信通道,双方可随时发送数据。 -
优势:真正的实时双向通信,延迟低,协议开销小,适合频繁数据交换,资源消耗相对较低。 -
劣势:实现复杂度较高,部分老旧浏览器不支持,某些网络环境可能受限。
// 1. WebSocket连接全局配置
globalData: { // WebSocket服务器连接地址 chatServerUrl: "ws://127.0.0.1:875/ws", // 全局WebSocket连接对象 CHAT: null, // 标记WebSocket连接状态 chatSocketOpen: false,},// 2. 应用启动时初始化WebSocket连接onLaunch: function() { // 程序启动时连接聊天服务器 this.doConnect(false);},// 3. 核心方法:建立WebSocket连接doConnect(isFirst) { // 重连时显示提示 if (isFirst) { uni.showToast({ icon: "loading", title: "断线重连中...", duration: 2000 }); } var me = this; // 仅当用户已登录时才连接WebSocket if (me.getUserInfoSession() != null && me.getUserInfoSession() != "" && me.getUserInfoSession() != undefined) { // 创建WebSocket连接 me.globalData.CHAT = uni.connectSocket({ url: me.globalData.chatServerUrl, complete: ()=> {} }); // 4. 连接成功事件处理 me.globalData.CHAT.onOpen(function(){ // 更新连接状态标记 me.globalData.chatSocketOpen = true; console.log("ws连接已打开,socketOpen = " + me.globalData.chatSocketOpen); // 构建初始化消息(消息类型0表示连接初始化) var chatMsg = { senderId: me.getUserInfoSession().id, msgType: 0 } var dataContent = { chatMsg: chatMsg } var msgPending = JSON.stringify(dataContent); // 发送初始化消息,通知服务器用户身份 me.globalData.CHAT.send({ data: msgPending }); }); // 5. 连接关闭事件处理 me.globalData.CHAT.onClose(function(){ me.globalData.chatSocketOpen = false; console.log("ws连接已关闭,socketOpen = " + me.globalData.chatSocketOpen); }); // 6. 接收消息事件处理 me.globalData.CHAT.onMessage(function(res){ console.log('App.vue 收到服务器内容:' + res.data); // 处理接收到的消息 me.dealReceiveLastestMsg(JSON.parse(res.data)); }); // 7. 连接错误事件处理 me.globalData.CHAT.onError(function(){ me.globalData.chatSocketOpen = false; console.log('WebSocket连接打开失败,请检查!'); }); }},// 8. 发送WebSocket消息的通用方法sendSocketMessage(msg) { // 检查连接状态,只有在连接开启时才发送 if (this.globalData.chatSocketOpen) { uni.sendSocketMessage({ data: msg }); } else { uni.showToast({ icon: "none", title: "您已断开聊天服务器的连接" }) }},// 9. 处理接收到的消息dealReceiveLastestMsg(msgJSON) { console.log(msgJSON); var chatMsg = msgJSON.chatMsg; var chatTime = msgJSON.chatTime; var senderId = chatMsg.senderId; var receiverType = chatMsg.receiverType; console.log('chatMsg.receiverType = ' + receiverType); var me = this; // 获取发送者的用户信息 var userId = me.getUserInfoSession().id; var userToken = me.getUserSessionToken(); var serverUrl = me.globalData.serverUrl; // 请求发送者详细信息 uni.request({ method: "POST", header: { headerUserId: userId, headerUserToken: userToken }, url: serverUrl + "/userinfo/get?userId=" + senderId, success(result) { if (result.data.status == 200) { var currentSourceUserInfo = result.data.data; me.currentSourceUserInfo = currentSourceUserInfo; // 根据消息类型设置显示内容 var msgShow = chatMsg.msg; if (chatMsg.msgType == 2) { msgShow = "[图片]" } elseif (chatMsg.msgType == 4) { msgShow = "[视频]" } elseif (chatMsg.msgType == 3) { msgShow = "[语音]" } // 保存最新消息到本地存储 me.saveLastestMsgToLocal(senderId, currentSourceUserInfo, msgShow, chatTime, msgJSON); } } })},// 10. 将最新消息保存到本地存储saveLastestMsgToLocal(sourceUserId, sourceUser, msgContent, chatTime, msgJSON) { // 构造最新消息对象 var lastMsg = { sourceUserId: sourceUserId, // 源头用户,聊天对象 name: sourceUser.nickname, face: sourceUser.face, msgContent: msgContent, chatTime: chatTime, unReadCounts: 0, communicationType: 1, // 1:单聊,2:群聊 } // 获取本地存储的聊天列表 var lastestUserChatList = uni.getStorageSync("lastestUserChatList"); if (lastestUserChatList == null || lastestUserChatList == undefined || lastestUserChatList == "") { lastestUserChatList = []; } // 更新或新增消息记录 var dealMsg = false; for (var i = 0; i < lastestUserChatList.length; i++) { var tmp = lastestUserChatList[i]; if (tmp.sourceUserId == lastMsg.sourceUserId) { // 已存在聊天记录,更新最新消息 lastestUserChatList.splice(i, 1, lastMsg); dealMsg = true; break; } } if (!dealMsg) { // 新的聊天对象,添加到列表开头 lastestUserChatList.unshift(lastMsg); } // 保存更新后的聊天列表 uni.setStorageSync("lastestUserChatList", lastestUserChatList); // 通知UI更新 uni.$emit('reRenderReceiveMsgInMsgVue', "domeafavor"); uni.$emit('receiveMsgInMsgListVue', msgJSON);},// 11. 关闭WebSocket连接closeWSConnect() { this.globalData.CHAT.close();
}
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.2.0.Final</version>
</dependency>
import com.pitayafruits.netty.websocket.WSServerInitializer;
import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.nio.NioServerSocketChannel;// netty 服务启动类publicclassChatServer {publicstaticvoidmain(String[] args)throws InterruptedException {EventLoopGroupbossGroup=newNioEventLoopGroup();EventLoopGroupworkerGroup=newNioEventLoopGroup();try {ServerBootstrapserver=newServerBootstrap(); server.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(newWSServerInitializer()); ChannelFuturechannelFuture= server.bind(875).sync(); channelFuture.channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } }
}
-
采用Reactor模式,使用两个线程池:bossGroup和workerGroup -
bossGroup负责接受客户端连接 -
workerGroup负责处理IO操作 -
服务器绑定在875端口
//初始化器,channel注册后,会执行里面相应的初始化方法
publicclassWSServerInitializerextendsChannelInitializer<SocketChannel> {
@Override
protectedvoidinitChannel(SocketChannel socketChannel)throws Exception {
ChannelPipelinepipeline= socketChannel.pipeline();
pipeline.addLast(newHttpServerCodec());
pipeline.addLast(newChunkedWriteHandler());
pipeline.addLast(newHttpObjectAggregator(1024 * 64));
pipeline.addLast(newWebSocketServerProtocolHandler("/ws"));
pipeline.addLast(newChatHandler());
}
}
-
处理HTTP协议: HttpServerCodec 、 ChunkedWriteHandler 、 HttpObjectAggregator -
处理WebSocket协议: WebSocketServerProtocolHandler("/ws") ,指定WebSocket的路由为"/ws" -
添加自定义业务处理器: ChatHandler ,处理具体的消息交互逻辑
publicclassUserChannelSession {
privatestatic Map<String, List<Channel>> multiSession = newHashMap<>();privatestatic Map<String, String> userChannelIdRelation = newHashMap<>();publicstaticvoidputUserChannelIdRelation(String userId, String channelId) { userChannelIdRelation.put(channelId, userId); }publicstatic String getUserIdByChannelId(String channelId) {return userChannelIdRelation.get(channelId); }publicstaticvoidputMultiChannels(String userId, Channel channel) { List<Channel> channels = getMultiChannels(userId);if (channels == null || channels.size() == 0) { channels = newArrayList<>(); } channels.add(channel); multiSession.put(userId, channels); }publicstaticvoidremoveUserChannels(String userId, String channelId) { List<Channel> channels = getMultiChannels(userId);if (channels == null || channels.size() == 0) {return; }for (Channel channel : channels) {if (channel.id().asLongText().equals(channelId)) { channels.remove(channel); } } multiSession.put(userId, channels); }publicstatic List<Channel> getMultiChannels(String userId) {return multiSession.get(userId); }
}
publicclassChatHandlerextendsSimpleChannelInboundHandler<TextWebSocketFrame> {
publicstaticChannelGroupclients=newDefaultChannelGroup(GlobalEventExecutor.INSTANCE);@OverrideprotectedvoidchannelRead0(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame textWebSocketFrame) throws Exception {Stringcontent= textWebSocketFrame.text();DataContentdataContent= JsonUtils.jsonToPojo(content, DataContent.class);ChatMsgchatMsg= dataContent.getChatMsg();StringmsgText= chatMsg.getMsg();StringreceiverId= chatMsg.getReceiverId();StringsenderId= chatMsg.getSenderId(); chatMsg.setChatTime(LocalDateTime.now());IntegermsgType= chatMsg.getMsgType();ChannelcurrentChannel= channelHandlerContext.channel();StringcurrentChannelId= currentChannel.id().asLongText();if (msgType == MsgTypeEnum.CONNECT_INIT.type) { UserChannelSession.putMultiChannels(senderId, currentChannel); UserChannelSession.putUserChannelIdRelation(currentChannelId, senderId); } elseif (msgType == MsgTypeEnum.WORDS.type) { List<Channel> receiverChannels = UserChannelSession.getMultiChannels(receiverId);if (receiverChannels == null || receiverChannels.size() == 0 || receiverChannels.isEmpty()) { chatMsg.setIsReceiverOnLine(false); } else { chatMsg.setIsReceiverOnLine(true);for (Channel receiverChannel : receiverChannels) {ChannelfindChannel= clients.find(receiverChannel.id());if (findChannel != null) { dataContent.setChatMsg(chatMsg);StringchatTimeFormat= LocalDateUtils.format(chatMsg.getChatTime(), LocalDateUtils.DATETIME_PATTERN_2); dataContent.setChatTime(chatTimeFormat); findChannel.writeAndFlush(newTextWebSocketFrame( JsonUtils.objectToJson(dataContent))); } } } } currentChannel.writeAndFlush(newTextWebSocketFrame(currentChannelId)); }@OverridepublicvoidhandlerAdded(ChannelHandlerContext ctx)throws Exception {ChannelcurrentChannel= ctx.channel(); clients.add(currentChannel); }@OverridepublicvoidhandlerRemoved(ChannelHandlerContext ctx)throws Exception {ChannelcurrentChannel= ctx.channel();StringuserId= UserChannelSession.getUserIdByChannelId(currentChannel.id().asLongText()); UserChannelSession.removeUserChannels(userId, currentChannel.id().asLongText()); clients.remove(currentChannel); }@OverridepublicvoidexceptionCaught(ChannelHandlerContext ctx, Throwable cause)throws Exception {Channelchannel= ctx.channel(); channel.close(); clients.remove(channel);StringuserId= UserChannelSession.getUserIdByChannelId(channel.id().asLongText()); UserChannelSession.removeUserChannels(userId, channel.id().asLongText()); }
}
-
客户端先发送初始化消息,建立用户ID与Channel的映射关系; -
客户端后续发送聊天消息,服务器查找接收者的Channel并转发消息;
版权声明:
作者:shixingao
链接:https://www.xincraft.cn/289.html
文章版权归作者所有,未经允许请勿转载。
THE END







