Netty系列(三):Netty服务端发送消息到客户端
admin
2024-03-13 12:17:39

通常客户端只会主动发送心跳消息,目的是为了保持与服务端连接,而其他消息往往需要服务端发送消息至客户端调取。

实现步骤

  1. 客户端在第一次与服务端建立连接时,将此连接的通道在 Map 中保存下来,为了保证线程安全,可以使用线程安全的 ConcurrentHashMap

  2. 在发送消息给客户端时,通过设备标识遍历 ConcurrentHashMap 找到目标客户端连接通道。找到后先判断通道是否存活,如果连接是存活状态,就通过此通道发送消息给客户端,如果不是存活状态,就从 Map 中删除此通道信息。

  3. 将消息发送至客户端后,服务端正常接收客户端传回的信息。

实现代码

前两篇文章中已经提供了 netty 的整体框架代码,这里只提供一些核心的关键代码,其余代码不再赘述。

指路:

  1. Netty系列(一):Springboot整合Netty,自定义协议实现
  2. Netty系列(二):Netty拆包/沾包问题的解决方案

新建一个 ChannelMap 类,在客户端第一次连接时保存 channel 连接。后续服务端向客户端发送消息时,先从 Map 中找到对应的客户端消息通道连接,再向通道中写入消息进行发送。


/*** @Author 鳄鱼儿* @Description 连接通道保存MAP* @date 2022/11/27 16:30* @Version 1.0*/public class ChannelMap {/*** 存放客户端标识ID(消息ID)与channel的对应关系*/private static volatile ConcurrentHashMap channelMap = null;private ChannelMap() {}public static ConcurrentHashMap getChannelMap() {if (null == channelMap) {synchronized (ChannelMap.class) {if (null == channelMap) {channelMap = new ConcurrentHashMap<>();}}}return channelMap;}public static Channel getChannel(String id) {return getChannelMap().get(id);}
}

在客户端建立连接(服务端收到心跳消息)时,将channel加入map中。


public class ServerListenerHandler extends SimpleChannelInboundHandler {private static final Logger log = LoggerFactory.getLogger(ServerListenerHandler.class);/*** 设备接入连接时处理** @param ctx*/@Overridepublic void handlerAdded(ChannelHandlerContext ctx) {log.info("有新的连接:[{}]", ctx.channel().id().asLongText());}/*** 数据处理** @param ctx* @param msg*/@Overrideprotected void channelRead0(ChannelHandlerContext ctx, Message msg) {// 获取消息实例中的消息体String content = msg.getContent();// 对不同消息类型进行处理MessageEnum type = MessageEnum.getStructureEnum(msg);switch (type) {case CONNECT:// 将通道加入ChannelMapChannelMap.getChannelMap().put(msg.getId(), ctx.channel());// 将客户端ID作为自定义属性加入到channel中,方便随时channel中获取用户IDAttributeKey key = AttributeKey.valueOf("id");ctx.channel().attr(key).setIfAbsent(msg.getId());// TODO 心跳消息处理case STATE:// TODO 设备状态default:System.out.println(type.content + "消息内容" + content);}}/*** 设备下线处理** @param ctx*/@Overridepublic void handlerRemoved(ChannelHandlerContext ctx) {log.info("设备下线了:{}", ctx.channel().id().asLongText());// map中移除channelremoveId(ctx);}/*** 设备连接异常处理** @param ctx* @param cause*/@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {// 打印异常log.info("异常:{}", cause.getMessage());// map中移除channelremoveId(ctx);// 关闭连接ctx.close();}private void removeId(ChannelHandlerContext ctx) {AttributeKey key = AttributeKey.valueOf("id");// 获取channel中idString id = ctx.channel().attr(key).get();// map移除channelChannelMap.getChannelMap().remove(id);}
}

写一个服务端发送消息的业务层类,并通过客户端id在map中获取到channel通道,将消息转化成json字符串后,通过writeAndFlush发送至客户端。


/*** @Author 鳄鱼儿* @Description 向客户端发送消息* @date 2022/11/27 17:29* @Version 1.0*/@Service
public class PushMsgServiceImpl implements PushMsgService {/*** 向一个客户端发送消息** @param msg*/@Overridepublic void push(Message msg) {// 客户端IDString id = msg.getId();Channel channel = ChannelMap.getChannel(id);if (null == channel) {throw new RuntimeException("客户端已离线");}channel.writeAndFlush(msg);}
}

注意:writeAndFlush参数是自定义编码的泛型对象实例。如本文自定义的Message消息解析类。

public class MessageEncodeHandler extends MessageToByteEncoder {private static String delimiter;public MessageEncodeHandler(String delimiter) {this.delimiter = delimiter;}@Overrideprotected void encode(ChannelHandlerContext channelHandlerContext, Message message, ByteBuf out) throws Exception {out.writeBytes((message.toJsonString() + delimiter).getBytes(CharsetUtil.UTF_8));}
}

之后再编写一个Controller类(这里省略),在Controller类中调用PushMsgService中pushff,就可以完成对客户端的消息发送。

相关内容

热门资讯

总算了解!HHPoker德州外... 亲,HHPoker德州这款游戏可以开挂的,确实是有挂的,很多玩家在这款游戏中打牌都会发现很多用户的牌...
总算了解!【AAPoKer】外... 自定义【AAPoKer系统规律,只需要输入自己想要的开挂功能,一键便可以生成出微扑克专用辅助器,不管...
总算了解!aapoker德州外... 值得一提的是,科技开挂秘籍必备方法+v(8207163)aapoker德州计算辅助可以实时透视知道a...
总算了解!hhpoker大菠萝... 总算了解!hhpoker大菠萝外挂软件透明挂器!(辅助挂)必胜教程(2021已更新)(哔哩哔哩)亲,...
总算了解!88Poker外挂软... 您好.88Poker这款游戏可以开挂的,确实是有挂的,需要了解加微【8207163】很多玩家在这款游...