博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
本人对于netty框架的一些理解,怎么与网站上的websock建立连接
阅读量:6996 次
发布时间:2019-06-27

本文共 6932 字,大约阅读时间需要 23 分钟。

在Netty的里面有一个Boss,他开了一家公司(开启一个服务端口)对外提供业务服务,它手下有一群做事情的workers。Boss一直对外宣传自己公司提供的业务,并且接受(accept)有需要的客户(client),当一位客户找到Boss说需要他公司提供的业务,Boss便会为这位客户安排一个worker,这个worker全程为这位客户服务(read/write)。如果公司业务繁忙,一个worker可能会为多个客户进行服务。这就是Netty里面Boss和worker之间的关系。下面看看Netty是如何让Boss和Worker进行协助的。

 
private EventLoopGroup boss = new NioEventLoopGroup();private EventLoopGroup work = new NioEventLoopGroup();ServerBootstrap bootstrap = new ServerBootstrap()        .group(boss, work)        .channel(NioServerSocketChannel.class)        .localAddress(new InetSocketAddress(nettyPort))        //保持长连接        .childOption(ChannelOption.SO_KEEPALIVE, true)        .childHandler(new HeartbeatInitializer());ChannelFuture future = bootstrap.bind().sync();if (future.isSuccess()) {    log.info("启动 Netty 成功");}

上诉代码初始化了一条netty的服务,那么如何初始话的写在类HeartbeatInitializer里面

public class HeartbeatInitializer extends ChannelInitializer
{ @Override protected void initChannel(Channel ch) throws Exception { ch.pipeline() //五秒没有收到消息 将IdleStateHandler 添加到 ChannelPipeline 拦截器中 .addLast(new IdleStateHandler(5, 0, 0)) // HttpServerCodec:将请求和应答消息解码为HTTP消息 .addLast("http-codec",new HttpServerCodec()) // HttpObjectAggregator:将HTTP消息的多个部分合成一条完整的HTTP消息 .addLast("aggregator",new HttpObjectAggregator(65536)) // ChunkedWriteHandler:向客户端发送HTML5文件 .addLast("http-chunked",new ChunkedWriteHandler()) .addLast(new HeartBeatSimpleHandle()); }}

上述文件设置了 拦截器,解码和解码合并,还有响应,最后一个new HeartBeatSimpleHandle()用来处理请求

public class HeartBeatSimpleHandle extends SimpleChannelInboundHandler {    private WebSocketServerHandshaker handShaker;    /**     * 取消绑定     * @param ctx     * @throws Exception     */    @Override    public void channelInactive(ChannelHandlerContext ctx) throws Exception {        NettySocketHolder.remove((NioSocketChannel) ctx.channel());    }    @Override    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {        super.userEventTriggered(ctx, evt);    }    @Override    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {        // 传统的HTTP接入        if (msg instanceof FullHttpRequest) {            FullHttpRequest req = (FullHttpRequest) msg;            handleHttpRequest(ctx, req);            //获取url后置参数            String uri=req.uri();            QueryStringDecoder queryStringDecoder = new QueryStringDecoder(uri);            Map
> parameters = queryStringDecoder.parameters(); Integer userId = Integer.valueOf(parameters.get("userId").get(0)); // 存储当前登录ctx if(NettySocketHolder.get((long)userId) == null){ NettySocketHolder.put((long)userId, (NioSocketChannel) ctx.channel()); } // WebSocket接入 } else if (msg instanceof WebSocketFrame) { if ("live".equals(ctx.channel().attr(AttributeKey.valueOf("type")).get())) { handlerWebSocketFrame(ctx, (WebSocketFrame) msg); } log.info("收到msg={},id={}", msg); //保存客户端与 Channel 之间的关系 } } private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) { // 如果HTTP解码失败,返回HTTP异常 if (!req.decoderResult().isSuccess() || (!"websocket".equals(req.headers().get("Upgrade")))) { sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST)); return; } //获取url后置参数 HttpMethod method=req.method(); String uri=req.uri(); QueryStringDecoder queryStringDecoder = new QueryStringDecoder(uri); Map
> parameters = queryStringDecoder.parameters(); if(method==HttpMethod.GET&&"/websocket".equals(uri)){ //...处理 ctx.channel().attr(AttributeKey.valueOf("type")).set("live"); } // 构造握手响应返回,本机测试 WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory( "ws://"+req.headers().get(HttpHeaderNames.HOST)+uri, null, false); handShaker = wsFactory.newHandshaker(req); if (handShaker == null) { WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel()); } else { handShaker.handshake(ctx.channel(), req); } } private static void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, DefaultFullHttpResponse res) { // 返回应答给客户端 if (res.status().code() != 200) { ByteBuf buf = Unpooled.copiedBuffer(res.status().toString(), CharsetUtil.UTF_8); res.content().writeBytes(buf); buf.release(); } // 如果是非Keep-Alive,关闭连接 ChannelFuture f = ctx.channel().writeAndFlush(res); if (!HttpUtil.isKeepAlive(req) || res.status().code() != 200) { f.addListener(ChannelFutureListener.CLOSE); } } private void handlerWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) { // 判断是否关闭链路的指令 if (frame instanceof CloseWebSocketFrame) { handShaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain()); } }

HeartBeatSimpleHandle 用与处理管道(Channel)的读取工作,将管道储存在NettySocketHolder里面,用的时候取出。WebSocketServerHandshaker用于响应客户端的响应

var websocket;    $(function(){        $.ajax({            type: "POST",            url: "/login",            data: {                userId: 2,                account: "135541",                userName: "123"            },            contentType: "application/x-www-form-urlencoded; charset=utf-8",            dataType: "json",            success: function (result) {                websoketCannel(result.data.userId, result.data.account, result.data.userName);            }        });    });    function websoketCannel(userId, account, userName){        //如果浏览器支持WebSocket        if(window.WebSocket){            websocket = new WebSocket("ws://localhost:1212/websocket?userId="+ userId +"&account="+ account +"&userName="+ userName +"");  //获得WebSocket对象            //当有消息过来的时候触发            websocket.onmessage = function(event){                var data = JSON.parse(event.data);                $("#message").text(data.msg);            };            //连接关闭的时候触发            websocket.onclose = function(event){                console.log("断开连接");            };            //连接打开的时候触发            websocket.onopen = function(event){                console.log("建立连接");            }        }else{            alert("浏览器不支持WebSocket");        }    }    function sendMsg(msg) { //发送消息        if(window.WebSocket){            if(websocket.readyState == WebSocket.OPEN) { //如果WebSocket是打开状态                websocket.send(msg); //send()发送消息            }        }else{            return;        }    }

上面代码是客户端如何连netty

 

转载于:https://www.cnblogs.com/kangniuniu/p/11107768.html

你可能感兴趣的文章
算法之折半查找
查看>>
webpack实用小功能介绍
查看>>
OpenStack high-level Functionsenabled
查看>>
深入理解Linux内核-内核同步
查看>>
zabbix实现mysql数据库的监控(三)
查看>>
外观模式-多了个办事处
查看>>
laravel 文件上传
查看>>
《寻路算法第二篇》A*寻路的路径平滑、静态合并、生成格子工具自动化、
查看>>
求职防骗指南
查看>>
23命令模式Command
查看>>
Cortex系列M0-4简单对比
查看>>
相对定位
查看>>
JAVASCRIPT 类型转换
查看>>
MongoDB入门上
查看>>
B进制星球
查看>>
[mysql] 无法通过insert 创建用户ERROR 1364 (HY000): Field 'ssl_cipher' doesn't have a default value...
查看>>
Ruby初探
查看>>
【移动端】单位em相关资料
查看>>
SQL优化-标量子查询(数据仓库设计的隐患-标量子查询)
查看>>
java 拷贝功能
查看>>