故事
首页
指南
  • Java
  • Python
  • Linux
  • 前端
  • Docker
  • 实践
  • 折腾
  • 分类
  • 标签
  • 归档
壁纸 (opens new window)
GitHub (opens new window)
首页
指南
  • Java
  • Python
  • Linux
  • 前端
  • Docker
  • 实践
  • 折腾
  • 分类
  • 标签
  • 归档
壁纸 (opens new window)
GitHub (opens new window)
  • Java基础

    • String类的深入学习
    • HashMap源码学习
    • Java8新特性回顾
  • Java框架

    • POI事件模式解析并读取Excel文件数据
    • SpringMVC的执行流程源码分析
    • netty+websocket实现即时通讯功能
    • 分布式定时任务解决方案xxl-job
    • spring-session实现集群session共享
    • springcloud优雅下线服务
    • Java日志发展历史
    • Mybatis Generator配置
    • JavaSPI机制和Springboot自动装配原理
    • Spring Cloud Config接入
    • 使用spring validation进行参数校验
    • dubbo接口超时配置
    • ConfigurationProperties注解
    • EasyExcel导出excel
  • 中间件

    • 分布式锁解决方案
    • 关于消息中间件MQ
    • kafka学习记录
    • kakfa实践
    • kakfa重复消费问题处理
  • 数据库

    • MySql索引
    • SQL优化学习
    • Otter实现数据全量增量同步
  • 开发工具

    • Maven的生命周期
    • nexus无法下载依赖的问题记录
  • 其他

    • linux服务器安装OpenOffice踩坑
    • POI踩坑-zip file is closed
    • OpenJDK没有jstack等命令的解决办法
    • 微信小程序加密数据对称解密工具类
    • mybatis的classpath配置导致的jar包读取问题
    • feign请求导致的用户ip获取问题记录
    • ribbon刷新服务列表间隔和canal的坑
    • cpu占用率高排查思路
  • 简介
  • java
  • Java框架
storyxc
2021-06-19

netty+websocket实现即时通讯功能

# netty+websocket实现即时通讯功能

大致思路:

  • 后台应用启动后开启nettyserver
  • 前台登录后使用websocket连接netty
  • 登录时先向netty发送一条初始化消息,服务器将保存通道和当前用户的关系映射
  • 通讯双方都上线时,即可以开始聊天

后台:

  • 应用启动后,开启netty服务器
@Component
public class ServerStarter implements ApplicationListener<ContextRefreshedEvent> {

    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        if (event.getApplicationContext().getParent() == null ){
            try {
                new IMServer().start();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

1
2
3
4
5
6
7
8
9
10
11
12
13
14
  • nettyserver
/**
 * @author Xc
 * @description
 * @createdTime 2020/7/9 9:23
 */
public class IMServer {
    Logger logger = LoggerFactory.getLogger(IMServer.class);

    public void start() throws InterruptedException {
        NioEventLoopGroup boss = new NioEventLoopGroup();
        NioEventLoopGroup worker = new NioEventLoopGroup();
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(boss,worker)
                .channel(NioServerSocketChannel.class)
                .localAddress(8000)
		//自定义初始化器
                .childHandler(new IMStoryInitializer());
        ChannelFuture future = serverBootstrap.bind();
        future.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                logger.info("server start on 8000");
            }
        });
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
  • 初始化器
/**
 * @author Xc
 * @description
 * @createdTime 2020/7/9 9:28
 */
public class IMStoryInitializer extends ChannelInitializer<SocketChannel> {
    
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        //http编解码器
        pipeline.addLast(new HttpServerCodec());
        //以块写数据
        pipeline.addLast(new ChunkedWriteHandler());
        //聚合器
        pipeline.addLast(new HttpObjectAggregator(64*1024));

        //websocket
        pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
        //自定义handler
        pipeline.addLast(new ChatHandler());
        
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
  • 自定义handler
/**
 * @author Xc
 * @description
 * @createdTime 2020/7/9 9:34
 */
public class ChatHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

    /**
     * 管理所有channel
     */
    public static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
        //客户端发过来的消息
        String content = msg.text();
        //当前通道
        Channel channel = ctx.channel();

        Message message = JSON.parseObject(content, Message.class);

        String data = message.getMsg();

        String fromUser = message.getFromUser();
        //客户端建立连接后先发送一条init消息,后台保存这个通道和用户信息的映射
        if (StringUtils.equals(message.getAction(), "init")) {
            ChannelUserContext.put(fromUser,channel);
        } else if (StringUtils.equals(message.getAction(),"chat")){
            Channel toChannel = ChannelUserContext.get(message.getToUser());
            if (toChannel != null) {
                //消息接收方在线
                toChannel.writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(message.getFromUser() + " : " + message.getMsg())));
            } else {
                //接收方不在线 离线消息推送
            }

        }

    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        channels.add(ctx.channel());
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        channels.remove(ctx.channel());
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
  • 通道和用户的映射保存
/**
 * @author Xc
 * @description
 * @createdTime 2020/7/9 9:46
 */
public class ChannelUserContext {

    private static ConcurrentHashMap<String, Channel> userChannelMap;

    static{
        userChannelMap = new ConcurrentHashMap<>();
    }

    public static void put(String user, Channel channel){
        userChannelMap.put(user,channel);
    }

    public static Channel get(String user) {
        return userChannelMap.get(user);
    }
    
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
  • message
/**
 * @author Xc
 * @description
 * @createdTime 2020/7/9 9:38
 */
@Data
public class Message implements Serializable {
    private static final long serialVersionUID = 301234912340234L;
    private String msg;
    private String fromUser;
    private String toUser;
    private String action;
}
1
2
3
4
5
6
7
8
9
10
11
12
13

前台页面1

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>WebSocket客户端</title>
</head>
<body>
<script type="text/javascript">
    var socket;

    //如果浏览器支持WebSocket
    if(window.WebSocket){
        //参数就是与服务器连接的地址
        socket = new WebSocket("ws://localhost:8000/ws");

        //客户端收到服务器消息的时候就会执行这个回调方法
        socket.onmessage = function (event) {
			console.log(event);
            var ta = document.getElementById("responseText");
            ta.value = ta.value + "\n"+event.data;
        }

        //连接建立的回调函数
        socket.onopen = function(event){
            var ta = document.getElementById("responseText");

            ta.value = "连接开启";
            var message = '{"action":"init","msg":"test","fromUser":"张三","toUser":"李四"}';
            socket.send(message);
        }

        //连接断掉的回调函数
        socket.onclose = function (event) {
            var ta = document.getElementById("responseText");
            ta.value = ta.value +"\n"+"连接关闭";
        }
    }else{
        alert("浏览器不支持WebSocket!");
    }

    //发送数据
    function send(message){
        if(!window.WebSocket){
            return;
        }

        //当websocket状态打开
        if(socket.readyState == WebSocket.OPEN){
			message = '{"action":"chat","msg":"'+ message +'","fromUser":"张三","toUser":"李四"}';
            socket.send(message);
        }else{
            alert("连接没有开启");
        }
    }
</script>
<form onsubmit="return false">
    <textarea name = "message" style="width: 400px;height: 200px"></textarea>

    <input type ="button" value="张三:发送数据" onclick="send(this.form.message.value);">

    <h3>服务器输出:</h3>

    <textarea id ="responseText" style="width: 400px;height: 300px;"></textarea>

    <input type="button" onclick="javascript:document.getElementById('responseText').value=''" value="清空数据">
</form>
</body>
</html>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68

页面二就是对这个页面稍微改一下

启动后台后打开两个页面,即可开始进行通讯 1.jpg

2.jpg 3.jpg 4.jpg

编辑 (opens new window)
#netty
上次更新: 2022/04/21, 18:12:24
SpringMVC的执行流程源码分析
分布式定时任务解决方案xxl-job

← SpringMVC的执行流程源码分析 分布式定时任务解决方案xxl-job→

Theme by Vdoing | Copyright © 2019-2023 story | 豫ICP备19046036号
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式