|
一、概述
上篇文章介绍了木舟通过基于木舟平台浅谈surging 的热点KEY的解决方法,那么此篇文章将介绍基于surging的木舟平台如何分布式接入设备.
木舟 (Kayak) 是什么?
木舟(Kayak)是基于.NET6.0软件环境下的surging微服务引擎进行开发的, 平台包含了微服务和物联网平台。支持异步和响应式编程开发,功能包含了物模型,设备,产品,网络组件的统一管理和微服务平台下的注册中心,服务路由,模块,中间服务等管理。还有多协议适配(TCP,MQTT,UDP,CoAP,HTTP,Grpc,websocket,rtmp,httpflv,webservice,等),通过灵活多样的配置适配能够接入不同厂家不同协议等设备。并且通过设备告警,消息通知,数据可视化等功能。能够让你能快速建立起微服务物联网平台系统。
那么下面就为大家介绍如何从创建组件、协议、设备网关,设备到设备网关接入,再到设备数据上报,把整个流程通过此篇文章进行阐述。
木舟kayal 平台开源地址:https://github.com/microsurging/
surging 微服务引擎开源地址:https://github.com/fanliang11/surging(后面surging 会移动到microsurging进行维护)
二、网络组件
1.编辑创建Tcp协议的网络组件,可以选择独立配置(独立配置是集群模式). 下图是解析方式选择了自定义脚本进行解码操作。
选择了独立配置后(共享配置不会进行注册路由),如果架设了新的网关实例,就会在注册中心networkroute/tcp路径下添加服务节点,以consul注册中心为例,打开:http://127.0.0.1:8500
三、自定义协议
如果是网络编程开发,必然会涉及到协议报文的编码解码处理,那么对于平台也是做到了灵活处理,首先是协议模块创建,通过以下代码看出协议模块可以添加协议说明md文档, 身份鉴权处理,消息编解码,元数据配置。下面一一介绍如何进行编写
- public class Demo3ProtocolSupportProvider : ProtocolSupportProvider
- {
- public override IObservable<ProtocolSupport> Create(ProtocolContext context)
- {
- var support = new ComplexProtocolSupport();
- support.Id = "demo_3";
- support.Name = "演示协议3";
- support.Description = "演示协议3";
- support.AddAuthenticator(MessageTransport.Tcp, new Demo5Authenticator());
- support.AddDocument(MessageTransport.Tcp, "Document/document-tcp.md");
- support.Script = "\r\nvar decode=function(buffer)\r\n{\r\n parser.Fixed(5).Handler(\r\n function(buffer){ \r\n var bytes = BytesUtils.GetBytes(buffer,1,4);\r\n var len = BytesUtils.LeStrToInt(bytes,1,4);//2. 获取消息长度.\r\n var buf = BytesUtils.Slice(buffer,0,5); \r\n parser.Fixed(len).Result(buf); \r\n }).Handler(function(buffer){ parser.Result(buffer).Complete(); \r\n }\r\n )\r\n}\r\nvar encode=function(buffer)\r\n{\r\n}";
- support.AddMessageCodecSupport(MessageTransport.Tcp, () => Observable.Return(new ScriptDeviceMessageCodec(support.Script)));
- support.AddConfigMetadata(MessageTransport.Tcp, _tcpConfig);
- support.AddAuthenticator(MessageTransport.Udp, new Demo5Authenticator());
- support.Script = "\r\nvar decode=function(buffer)\r\n{\r\n parser.Fixed(5).Handler(\r\n function(buffer){ \r\n var bytes = BytesUtils.GetBytes(buffer,1,4);\r\n var len = BytesUtils.LeStrToInt(bytes,1,4);//2. 获取消息长度.\r\n var buf = BytesUtils.Slice(buffer,0,5); \r\n parser.Fixed(len).Result(buf); \r\n }).Handler(function(buffer){ parser.Result(buffer).Complete(); \r\n }\r\n )\r\n}\r\nvar encode=function(buffer)\r\n{\r\n}";
- support.AddMessageCodecSupport(MessageTransport.Udp, () => Observable.Return(new ScriptDeviceMessageCodec(support.Script)));
- support.AddConfigMetadata(MessageTransport.Udp, _udpConfig);
- return Observable.Return(support);
- }
- }
复制代码 1. 添加协议说明文档如代码: support.AddDocument(MessageTransport.Tcp, "Document/document-tcp.md");,文档仅支持 markdown文件,如下所示
- ### 认证说明
- CONNECT报文:
- ```text
- clientId: 设备ID
- password: md5(timestamp+"|"+secureKey)
- ```
复制代码
2. 添加身份鉴权如代码: support.AddAuthenticator(MessageTransport.Http, new Demo5Authenticator()) ,自定义身份鉴权Demo5Authenticator 代码如下:
- public class Demo5Authenticator : IAuthenticator
- {
- public IObservable<AuthenticationResult> Authenticate(IAuthenticationRequest request, IDeviceOperator deviceOperator)
- {
- var result = Observable.Return<AuthenticationResult>(default);
- if (request is DefaultAuthRequest)
- {
- var authRequest = request as DefaultAuthRequest;
- deviceOperator.GetConfig(authRequest.GetTransport()==MessageTransport.Http?"token": "key").Subscribe( config =>
- {
- var password = config.Convert<string>();
- if (authRequest.Password.Equals(password))
- {
- result= result.Publish(AuthenticationResult.Success(authRequest.DeviceId));
- }
- else
- {
- result= result.Publish(AuthenticationResult.Failure(StatusCode.CUSTOM_ERROR, "验证失败,密码错误"));
- }
- });
- }
- else
- result = Observable.Return<AuthenticationResult>(AuthenticationResult.Failure(StatusCode.CUSTOM_ERROR, "不支持请求参数类型"));
- return result;
- }
- public IObservable<AuthenticationResult> Authenticate(IAuthenticationRequest request, IDeviceRegistry registry)
- {
- var result = Observable.Return<AuthenticationResult>(default);
- var authRequest = request as DefaultAuthRequest;
- registry
- .GetDevice(authRequest.DeviceId)
- .Subscribe(async p => {
- var config= await p.GetConfig(authRequest.GetTransport() == MessageTransport.Http ? "token" : "key");
- var password= config.Convert<string>();
- if(authRequest.Password.Equals(password))
- {
- result= result.Publish(AuthenticationResult.Success(authRequest.DeviceId));
- }
- else
- {
- result= result.Publish(AuthenticationResult.Failure(StatusCode.CUSTOM_ERROR, "验证失败,密码错误"));
- }
- });
- return result;
- }
- }
复制代码
3.添加消息编解码代码 support.AddMessageCodecSupport(MessageTransport.Tcp, () => Observable.Return(new ScriptDeviceMessageCodec(support.Script)));, 可以自定义编解码,ScriptDeviceMessageCodec代码如下:
[code]using DotNetty.Buffers;using Jint;using Jint.Parser;using Microsoft.CodeAnalysis.Scripting;using Microsoft.Extensions.Logging;using RulesEngine.Models;using Surging.Core.CPlatform.Codecs.Core;using Surging.Core.CPlatform.Utilities;using Surging.Core.DeviceGateway.Runtime.Device.Message;using Surging.Core.DeviceGateway.Runtime.Device.Message.Event;using Surging.Core.DeviceGateway.Runtime.Device.Message.Property;using Surging.Core.DeviceGateway.Runtime.Device.MessageCodec;using Surging.Core.DeviceGateway.Runtime.RuleParser.Implementation;using Surging.Core.DeviceGateway.Utilities;using System;using System.Collections.Generic;using System.Linq;using System.Reactive.Linq;using System.Reactive.Subjects;using System.Runtime;using System.Text;using System.Text.Json;using System.Text.RegularExpressions;using System.Threading.Tasks;namespace Surging.Core.DeviceGateway.Runtime.Device.Implementation{ public class ScriptDeviceMessageCodec : DeviceMessageCodec { public string GlobalVariable { get; private set; } public string EncoderScript { get; private set; } public string DecoderScript { get; private set; } public IObservable _rulePipePayload; private readonly ILogger _logger; public ScriptDeviceMessageCodec(string script) { _logger = ServiceLocator.GetService(); RegexOptions options = RegexOptions.Singleline | RegexOptions.IgnoreCase; string matchStr = Regex.Match(script, @"var\s*[\w$]*\s*\=.*function.*\(.*\)\s*\{[\s\S]*\}.*?v", options).Value; if (!string.IsNullOrEmpty(matchStr)) { DecoderScript = matchStr.TrimEnd('v'); DecoderScript= Regex.Replace(DecoderScript, @"var\s*[\w$]*\s*\=[.\r|\n|\t|\s]*?(function)\s*\([\w$]*\s*\)\s*\{", "", RegexOptions.IgnoreCase); DecoderScript= DecoderScript.Slice(0, DecoderScript.LastIndexOf('}')); EncoderScript = script.Replace(DecoderScript, ""); } var matchStr1 = Regex.Matches(script, @"(? |
本帖子中包含更多资源
您需要 登录 才可以下载或查看,没有账号?立即注册
x
|