Skip to content

netty+websocket实现即时通讯功能

大致思路:

  • 后台应用启动后开启nettyserver
  • 前台登录后使用websocket连接netty
  • 登录时先向netty发送一条初始化消息,服务器将保存通道和当前用户的关系映射
  • 通讯双方都上线时,即可以开始聊天

后台:

  • 应用启动后,开启netty服务器
java
@Component
public class ServerStarter implements ApplicationListener<ContextRefreshedEvent> {

    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        if (event.getApplicationContext().getParent() == null ){
            try {
                new IMServer().start();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
  • nettyserver
java
/**
 * Netty server that accepts the chat clients' connections.
 *
 * @author Xc
 * @createdTime 2020/7/9 9:23
 */
public class IMServer {
    private static final Logger logger = LoggerFactory.getLogger(IMServer.class);

    /** Local port the server binds to. */
    private static final int PORT = 8000;

    /**
     * Configures the server bootstrap and starts binding to port 8000.
     * <p>
     * The bind is asynchronous: this method returns immediately and the outcome
     * is reported by a listener. On success the start is logged; on failure the
     * cause is logged and both event loop groups are shut down so their threads
     * do not leak.
     *
     * @throws InterruptedException kept in the signature for existing callers
     */
    public void start() throws InterruptedException {
        // final so the anonymous listener below can release them on failure
        final NioEventLoopGroup boss = new NioEventLoopGroup();
        final NioEventLoopGroup worker = new NioEventLoopGroup();
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(boss, worker)
                .channel(NioServerSocketChannel.class)
                .localAddress(PORT)
                // custom initializer that builds the pipeline of each accepted channel
                .childHandler(new IMStoryInitializer());
        ChannelFuture future = serverBootstrap.bind();
        future.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (future.isSuccess()) {
                    logger.info("server start on " + PORT);
                } else {
                    // The bind failed (for example the port is already in use):
                    // report the real cause and free the event loop threads.
                    logger.error("server failed to start on " + PORT, future.cause());
                    boss.shutdownGracefully();
                    worker.shutdownGracefully();
                }
            }
        });
    }
}
  • 初始化器
java
/**
 * Channel initializer that installs the HTTP, WebSocket and chat handlers on
 * every newly accepted connection.
 *
 * @author Xc
 * @createdTime 2020/7/9 9:28
 */
public class IMStoryInitializer extends ChannelInitializer<SocketChannel> {

    /**
     * Appends the handlers to the channel's pipeline in processing order.
     *
     * @param ch the channel being initialized
     */
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline()
                // HTTP codec
                .addLast(new HttpServerCodec())
                // writes data in chunks
                .addLast(new ChunkedWriteHandler())
                // aggregator, constructed with a 64 * 1024 limit
                .addLast(new HttpObjectAggregator(64 * 1024))
                // WebSocket protocol handler for the "/ws" path
                .addLast(new WebSocketServerProtocolHandler("/ws"))
                // application handler for the chat messages
                .addLast(new ChatHandler());
    }
}
  • 自定义handler
java
/**
 * Handles the text frames sent by chat clients: registers a user's channel on
 * an "init" message and forwards "chat" messages to the recipient's channel.
 *
 * @author Xc
 * @createdTime 2020/7/9 9:34
 */
public class ChatHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

    /**
     * Tracks every channel this handler is currently attached to.
     */
    public static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    /**
     * Processes one text frame received from a client.
     *
     * @param ctx context of the channel the frame arrived on
     * @param msg frame whose text is the JSON form of a {@link Message}
     * @throws Exception if the frame text cannot be parsed
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
        // message text sent by the client
        String content = msg.text();

        Message message = JSON.parseObject(content, Message.class);
        if (message == null) {
            // The parser may yield null (e.g. for an empty payload); there is
            // nothing to dispatch in that case.
            return;
        }

        String action = message.getAction();
        if (StringUtils.equals(action, "init")) {
            // After connecting, the client first sends an init message so the
            // server can remember which channel belongs to which user.
            String fromUser = message.getFromUser();
            if (fromUser != null) {
                // a null user cannot be registered: the backing map rejects null keys
                ChannelUserContext.put(fromUser, ctx.channel());
            }
        } else if (StringUtils.equals(action, "chat")) {
            String toUser = message.getToUser();
            // a missing recipient is treated the same as an offline one
            Channel toChannel = (toUser == null) ? null : ChannelUserContext.get(toUser);
            if (toChannel != null && toChannel.isActive()) {
                // recipient is online
                toChannel.writeAndFlush(new TextWebSocketFrame(
                        JSON.toJSONString(message.getFromUser() + " : " + message.getMsg())));
            } else {
                // recipient is offline (or its channel is already closed):
                // offline message push is not implemented yet
            }
        }
    }

    /**
     * Adds the channel to {@link #channels} when this handler is attached.
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        channels.add(ctx.channel());
    }

    /**
     * Removes the channel from {@link #channels} when this handler is detached.
     */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        channels.remove(ctx.channel());
    }
}
  • 通道和用户的映射保存
java
/**
 * @author Xc
 * @description
 * @createdTime 2020/7/9 9:46
 */
public class ChannelUserContext {

    private static ConcurrentHashMap<String, Channel> userChannelMap;

    static{
        userChannelMap = new ConcurrentHashMap<>();
    }

    public static void put(String user, Channel channel){
        userChannelMap.put(user,channel);
    }

    public static Channel get(String user) {
        return userChannelMap.get(user);
    }
    
}
  • message
java
/**
 * Message payload exchanged as JSON between the browser pages and the chat
 * handler. Lombok's {@code @Data} supplies the accessors used elsewhere
 * (for example {@code getAction()} and {@code getMsg()}).
 *
 * @author Xc
 * @createdTime 2020/7/9 9:38
 */
@Data
public class Message implements Serializable {
    private static final long serialVersionUID = 301234912340234L;
    /** Text content of the message. */
    private String msg;
    /** Name of the user sending the message. */
    private String fromUser;
    /** Name of the user the message is addressed to. */
    private String toUser;
    /** Kind of message; the sample client sends "init" and "chat". */
    private String action;
}

前台页面1

html
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>WebSocket客户端</title>
</head>
<body>
<script type="text/javascript">
    var socket;

    // Bail out with a notice when the browser has no WebSocket support.
    if (!window.WebSocket) {
        alert("浏览器不支持WebSocket!");
    } else {
        // The argument is the address of the server to connect to.
        socket = new WebSocket("ws://localhost:8000/ws");

        // Runs whenever a message arrives from the server: log the event and
        // append its data to the output area.
        socket.onmessage = function (event) {
            console.log(event);
            var output = document.getElementById("responseText");
            output.value += "\n" + event.data;
        };

        // Runs once the connection is established: reset the output area and
        // send the init message that identifies this user to the server.
        socket.onopen = function (event) {
            document.getElementById("responseText").value = "连接开启";
            socket.send('{"action":"init","msg":"test","fromUser":"张三","toUser":"李四"}');
        };

        // Runs when the connection is closed.
        socket.onclose = function (event) {
            var output = document.getElementById("responseText");
            output.value += "\n" + "连接关闭";
        };
    }

    /**
     * Sends a chat message to the server over the open WebSocket.
     * Does nothing when the browser lacks WebSocket support, and alerts the
     * user when the connection is not open.
     *
     * @param {string} message text typed by the user
     */
    function send(message){
        if(!window.WebSocket){
            return;
        }

        // only send while the socket is in the OPEN state
        if(socket.readyState == WebSocket.OPEN){
            // Serialize with JSON.stringify rather than string concatenation so
            // quotes, backslashes and line breaks in the text are escaped and
            // the payload is always valid JSON.
            var payload = JSON.stringify({
                action: "chat",
                msg: message,
                fromUser: "张三",
                toUser: "李四"
            });
            socket.send(payload);
        }else{
            alert("连接没有开启");
        }
    }
</script>
<form onsubmit="return false">
    <textarea name = "message" style="width: 400px;height: 200px"></textarea>

    <input type ="button" value="张三:发送数据" onclick="send(this.form.message.value);">

    <h3>服务器输出:</h3>

    <textarea id ="responseText" style="width: 400px;height: 300px;"></textarea>

    <input type="button" onclick="javascript:document.getElementById('responseText').value=''" value="清空数据">
</form>
</body>
</html>

页面二与页面一基本相同,只需把消息中的 fromUser 和 toUser 互换(fromUser 改为"李四",toUser 改为"张三"),并把按钮文字改为"李四:发送数据"即可。

启动后台后,在浏览器中分别打开两个页面,即可开始进行通讯。

运行效果截图:1.jpg、2.jpg、3.jpg、4.jpg