My personal understanding of the MQTT protocol and the emqttd open-source project (23)
We can debug WebSocket with the Dashboard plugin bundled with emq. Open Google Chrome and go to the following address, where 192.168.83.128 is the IP address of the host running emq:

http://192.168.83.128:18083/#/websocket
Username: admin, password: public
WebSocket URI: ws(s)://192.168.83.128:8083/mqtt
TCP URI: tcp://192.168.83.128:1883

I. Test practice

Test environment: MQTT protocol version 3.1.1 and Wireshark 2.4.1. Packets are captured and analyzed with Wireshark (if emq runs in a virtual machine, capture on the VMnet8 virtual adapter).

clientid: 861694030142473
username: libaineu2004
password: 12345678
cleanSession: true
keepalive: 60s

1. Test scenario 1: a plain TCP connection, TCP URI: tcp://192.168.83.128:1883, Wireshark filter: tcp.port == 1883.

(1) MQTT Connect Command

In Wireshark's packet details pane, click the frame, then right-click and choose Copy, All Visible Selected Tree Items:

    MQ Telemetry Transport Protocol, Connect Command
        Header Flags: 0x10 (Connect Command)
        Msg Len: 51
        Protocol Name Length: 4
        Protocol Name: MQTT
        Version: 4
        Connect Flags: 0xc2
        Keep Alive: 60
        Client ID Length: 15
        Client ID: 861694030142473
        User Name Length: 12
        User Name: libaineu2004
        Password Length: 8
        Password: 12345678

Right-click and choose Copy, Bytes as Hex + ASCII Dump:

    0000  10 33 00 04 4d 51 54 54 04 c2 00 3c 00 0f 38 36   .3..MQTT...

    mochiweb:start_http(http, ListenOn, Opts, {emqttd_http, handle_request, []});

This call opens port 8083 and starts the HTTP server. WebSocket also runs over TCP, and it borrows HTTP to complete part of its handshake.

2. -module(emqttd_http).

    %%--------------------------------------------------------------------
    %% MQTT Over WebSocket
    %%--------------------------------------------------------------------
    handle_request('GET', "/mqtt", Req) ->
        lager:info("WebSocket Connection from: ~s", [Req:get(peer)]),
        Upgrade = Req:get_header_value("Upgrade"),
        Proto = Req:get_header_value("Sec-WebSocket-Protocol"),
        case {is_websocket(Upgrade), Proto} of
            {true, "mqtt" ++ _Vsn} ->
                emqttd_ws:handle_request(Req);
            {false, _} ->
                lager:error("Not WebSocket: Upgrade = ~s", [Upgrade]),
                Req:respond({400, [], });
            {_, Proto} ->
                lager:error("WebSocket with error Protocol: ~s", [Proto]),
                Req:respond({400, [], })
        end;

This is the WebSocket entry function. It accepts the request only when the Upgrade header names WebSocket and the Sec-WebSocket-Protocol header starts with "mqtt"; otherwise it answers with status 400. The call to focus on is emqttd_ws:handle_request(Req).

3. -module(emqttd_ws).
    ws_loop(Data, State = #wsocket_state{peer = Peer, client_pid = ClientPid,
                                         parser_fun = ParserFun}, ReplyChannel) ->
        ?WSLOG(debug, Peer, "RECV ~p", [Data]),
        case catch ParserFun(iolist_to_binary(Data)) of
            {more, NewParser} ->
                State#wsocket_state{parser_fun = NewParser};
            {ok, Packet, Rest} ->
                gen_server:cast(ClientPid, {received, Packet}),
                ws_loop(Rest, reset_parser(State), ReplyChannel);
            {error, Error} ->
                ?WSLOG(error, Peer, "Frame error: ~p", [Error]),
                exit({shutdown, Error});
            {'EXIT', Reason} ->
                ?WSLOG(error, Peer, "Frame error: ~p", [Reason]),
                ?WSLOG(error, Peer, "Error data: ~p", [Data]),
                exit({shutdown, parser_error})
        end.

gen_server:cast(ClientPid, {received, Packet}) hands each packet received from the client to the client process.

4. -module(emqttd_ws_client).

    handle_cast({received, Packet}, State = #wsclient_state{peer = Peer, proto_state = ProtoState}) ->
        case emqttd_protocol:received(Packet, ProtoState) of
            {ok, ProtoState1} ->
                noreply(State#wsclient_state{proto_state = ProtoState1});
            {error, Error} ->
                ?WSLOG(error, Peer, "Protocol error - ~p", [Error]),
                shutdown(Error, State);
            {error, Error, ProtoState1} ->
                shutdown(Error, State#wsclient_state{proto_state = ProtoState1});
            {stop, Reason, ProtoState1} ->
                stop(Reason, State#wsclient_state{proto_state = ProtoState1})
        end;

emqttd_protocol:received(Packet, ProtoState) receives and processes the message.

5. -module(emqttd_protocol).

    process(Packet = ?CONNECT_PACKET(Var), State0) ->
        #mqtt_packet_connect{proto_ver = ProtoVer,
                             proto_name = ProtoName,
                             username = Username,
                             password = Password,
                             clean_sess = CleanSess,
                             keep_alive = KeepAlive,
                             client_id = ClientId} = Var,
        State1 = State0#proto_state{proto_ver = ProtoVer,
                                    proto_name = ProtoName,
                                    username = Username,
                                    client_id = ClientId,
                                    clean_sess = CleanSess,
                                    keepalive = KeepAlive,
                                    will_msg = willmsg(Var),
                                    connected_at = os:timestamp()},
        trace(recv, Packet, State1),
        {ReturnCode1, SessPresent, State3} =
            case validate_connect(Var, State1) of

validate_connect(Var, State1) checks that the clientid, username, and password are valid.
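The fields that process/2 pulls out of the CONNECT packet are the same ones Wireshark showed in test scenario 1. As a rough illustration of how they sit on the wire, here is a minimal Python sketch of a CONNECT decoder following the MQTT 3.1.1 layout. It is not emqttd's parser. The names decode_connect, read_field, and field are hypothetical. Only the first 16 bytes come from the hex dump in the capture; the rest of the packet is rebuilt here from the dissected field values, on the assumption that the payload holds the client ID, user name, and password in that order (which is what Connect Flags 0xc2 implies).

```python
import struct

def decode_remaining_length(buf, pos):
    """Decode the 1-4 byte MQTT 'remaining length' field starting at buf[pos]."""
    value = 0
    for shift in range(0, 28, 7):          # at most four length bytes
        byte = buf[pos]
        pos += 1
        value |= (byte & 0x7F) << shift    # low 7 bits carry data
        if not byte & 0x80:                # high bit clear: last length byte
            return value, pos
    raise ValueError("malformed remaining length")

def read_field(buf, pos):
    """Read a field with a 2-byte big-endian length prefix; return (bytes, next_pos)."""
    (length,) = struct.unpack_from(">H", buf, pos)
    start = pos + 2
    return buf[start:start + length], start + length

def decode_connect(packet):
    """Decode an MQTT 3.1.1 CONNECT packet into a dict (illustrative only)."""
    if packet[0] >> 4 != 1:                # packet type lives in the high nibble
        raise ValueError("not a CONNECT packet")
    remaining, pos = decode_remaining_length(packet, 1)
    name, pos = read_field(packet, pos)
    version, flags = packet[pos], packet[pos + 1]
    (keep_alive,) = struct.unpack_from(">H", packet, pos + 2)
    pos += 4
    info = {
        "remaining_length": remaining,
        "protocol_name": name.decode(),
        "version": version,
        "has_username": bool(flags & 0x80),
        "has_password": bool(flags & 0x40),
        "has_will": bool(flags & 0x04),
        "clean_session": bool(flags & 0x02),
        "keep_alive": keep_alive,
    }
    client_id, pos = read_field(packet, pos)
    info["client_id"] = client_id.decode()
    if info["has_will"]:                   # not set in this capture
        info["will_topic"], pos = read_field(packet, pos)
        info["will_message"], pos = read_field(packet, pos)
    if info["has_username"]:
        username, pos = read_field(packet, pos)
        info["username"] = username.decode()
    if info["has_password"]:
        info["password"], pos = read_field(packet, pos)   # kept as raw bytes
    return info

def field(data):
    """Hypothetical helper: prefix data with its 2-byte big-endian length."""
    return struct.pack(">H", len(data)) + data

# First 16 bytes: copied from the capture. They end with "86", the start of the client ID.
HEAD = bytes.fromhex("10 33 00 04 4d 51 54 54 04 c2 00 3c 00 0f 38 36")
# Remaining bytes: reconstructed from the dissected values, not copied from the capture.
TAIL = b"1694030142473" + field(b"libaineu2004") + field(b"12345678")
PACKET = HEAD + TAIL

if __name__ == "__main__":
    for key, value in decode_connect(PACKET).items():
        print(key, "=", value)
```

Connect Flags 0xc2 is binary 1100 0010: the user name flag (0x80), the password flag (0x40), and clean session (0x02) are set, which matches cleanSession: true and the presence of both credentials in the test setup.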
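The ws_loop function in step 3 relies on a parser that either asks for more data ({more, NewParser}) or returns one complete packet plus the leftover bytes ({ok, Packet, Rest}). The Python sketch below mimics that shape with closures to show why it works when a packet is split across WebSocket frames or when several packets arrive in one frame. It is an assumption-laden illustration, not a port of emqttd: initial_parser, _parse, and the tuple tags are made up for this example, and a "packet" here is just the raw bytes of one MQTT control packet.

```python
def _parse(buffered, data):
    """Try to cut one MQTT packet off the front of buffered + data."""
    buf = buffered + data
    if len(buf) < 2:                       # need the type byte and a length byte
        return ("more", lambda more: _parse(buf, more))
    remaining, pos = 0, 1
    for shift in range(0, 28, 7):          # remaining length: at most 4 bytes
        if pos >= len(buf):                # length field itself is incomplete
            return ("more", lambda more: _parse(buf, more))
        byte = buf[pos]
        pos += 1
        remaining |= (byte & 0x7F) << shift
        if not byte & 0x80:
            break
    else:
        return ("error", "malformed remaining length")
    end = pos + remaining
    if len(buf) < end:                     # body not fully received yet
        return ("more", lambda more: _parse(buf, more))
    return ("ok", buf[:end], buf[end:])

def initial_parser(data):
    """Fresh parser with nothing buffered (plays the role of reset_parser)."""
    return _parse(b"", data)

def ws_loop(data, parser, deliver):
    """Feed one frame to the parser, deliver complete packets, return the next parser."""
    while True:
        result = parser(data)
        if result[0] == "more":
            return result[1]               # keep the continuation for the next frame
        if result[0] == "error":
            raise ValueError(result[1])
        _, packet, rest = result
        deliver(packet)                    # analogous to the gen_server:cast call
        data, parser = rest, initial_parser

if __name__ == "__main__":
    # A PUBLISH (topic "a", payload "hi") and a PINGREQ, split across three frames.
    frames = [bytes.fromhex("300500"), bytes.fromhex("01616869c0"), bytes.fromhex("00")]
    received = []
    parser = initial_parser
    for frame in frames:
        parser = ws_loop(frame, parser, received.append)
    print([p.hex() for p in received])
```

Returning the continuation instead of keeping a shared mutable buffer means the caller only has to store one value between frames, which is what the parser_fun field of #wsocket_state does in the Erlang code.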