翼度科技»论坛 编程开发 .net 查看内容

基于surging 的木舟平台如何通过HTTP网络组件接入设备

12

主题

12

帖子

36

积分

新手上路

Rank: 1

积分
36
一、概述

      上篇文章介绍了木舟如何上传模块热部署,那么此篇文章将介绍如何利用HTTP网络组件接入设备,那么有些人会问木舟又是什么,是什么架构为基础,能做什么呢?
       木舟 (Kayak) 是什么?
       木舟(Kayak)是基于.NET6.0软件环境下的surging微服务引擎进行开发的, 平台包含了微服务和物联网平台。支持异步和响应式编程开发,功能包含了物模型,设备,产品,网络组件的统一管理和微服务平台下的注册中心,服务路由,模块,中间服务等管理。还有多协议适配(TCP,MQTT,UDP,CoAP,HTTP,Grpc,websocket,rtmp,httpflv,webservice,等),通过灵活多样的配置适配能够接入不同厂家不同协议等设备。并且通过设备告警,消息通知,数据可视化等功能。能够让你能快速建立起微服务物联网平台系统。
     那么下面就为大家介绍如何从创建组件、协议、设备网关,设备到设备网关接入,再到设备数据上报,把整个流程通过此篇文章进行阐述。
二、网络组件

1.编辑创建HTTP协议的网络组件,可以选择共享配置和独立配置(独立配置是集群模式),然后可以选择开启swagger和webservice.

 开启成功后,可以看看swagger 是否可以访问

 又或者是访问一下中间服务,以上篇文章上传的Testapi 模块为例:

 三、自定义协议


  • 如何创建自定义协议模块
如果是网络编程开发,必然会涉及到协议报文的编码解码处理,那么对于平台也是做到了灵活处理,首先是协议模块创建,通过以下代码看出协议模块可以添加协议说明md文档, 身份鉴权处理,HTTP路由,消息编解码,元数据配置。下面一一介绍如何进行编写
  1.   public class Demo5ProtocolSupportProvider : ProtocolSupportProvider
  2.     {
  3.         public override IObservable<ProtocolSupport> Create(ProtocolContext context)
  4.         {<br>            var support = new ComplexProtocolSupport();<br>              support.Id = "demo5";<br>              support.Name = "演示协议5";<br>              support.Description = "演示协议5";
  5.               support.AddDocument(MessageTransport.Http, "Document/document-http.md");
  6.            support.AddAuthenticator(MessageTransport.Http, new Demo5Authenticator());
  7.            support.AddRoutes(MessageTransport.Http, new List<BasicMessageCodec>() {
  8.             BasicMessageCodec.DeviceOnline,
  9.              BasicMessageCodec.ReportProperty,
  10.              BasicMessageCodec.WriteProperty,
  11.               BasicMessageCodec.ReadProperty,
  12.               BasicMessageCodec.Event
  13.        }.Select(p => HttpDescriptor.Instance(p.Pattern)
  14.            .GroupName(p.Route.GroupName())
  15.            .HttpMethod(p.Route.HttpMethod())
  16.            .Path(p.Pattern)
  17.            .ContentType(MediaType.ToString(MediaType.ApplicationJson))
  18.            .Description(p.Route.Description())
  19.            .Example(p.Route.Example())
  20.            ).ToList());
  21.        support.AddMessageCodecSupport(MessageTransport.Http, () => Observable.Return(new HttpDeviceMessageCodec()));
  22.        support.AddConfigMetadata(MessageTransport.Http, _httpConfig);
  23.        return Observable.Return(support);
  24.                
  25.         }
  26.      
  27.      }               
复制代码
1. 添加协议说明文档如代码: support.AddDocument(MessageTransport.Http, "Document/document-http.md");,文档仅支持 markdown文件,如下所示
 
  1. ### 使用HTTP推送设备数据
  2. 上报属性例子:
  3. POST /{productId}/{deviceId}/properties/report
  4. Authorization:{产品或者设备中配置的Token}
  5. Content-Type: application/json
  6. {
  7. "properties":{
  8.    "temp":11.5
  9. }
  10. }
  11. 上报事件例子:
  12. POST /{productId}/{deviceId}/event/{eventId}
  13. Authorization:{产品或者设备中配置的Token}
  14. Content-Type: application/json
  15. {
  16. "data":{
  17.    "createtime": ""
  18. }
  19. }
复制代码
 
2. 添加身份鉴权如代码:  support.AddAuthenticator(MessageTransport.Http, new Demo5Authenticator()) ,自定义身份鉴权Demo5Authenticator 代码如下:

 
  1.        public class Demo5Authenticator : IAuthenticator
  2.        {
  3.            public IObservable<AuthenticationResult> Authenticate(IAuthenticationRequest request, IDeviceOperator deviceOperator)
  4.            {
  5.                var result = Observable.Return<AuthenticationResult>(default);
  6.                if (request is DefaultAuthRequest)
  7.                {
  8.                    var authRequest = request as DefaultAuthRequest;
  9.                    deviceOperator.GetConfig(authRequest.GetTransport()==MessageTransport.Http?"token": "key").Subscribe(  config =>
  10.                    {
  11.                        var password = config.Convert<string>();
  12.                        if (authRequest.Password.Equals(password))
  13.                        {
  14.                            result= result.Publish(AuthenticationResult.Success(authRequest.DeviceId));
  15.                        }
  16.                        else
  17.                        {
  18.                            result= result.Publish(AuthenticationResult.Failure(StatusCode.CUSTOM_ERROR, "验证失败,密码错误"));
  19.                        }
  20.                    });
  21.                }
  22.                else
  23.                result = Observable.Return<AuthenticationResult>(AuthenticationResult.Failure(StatusCode.CUSTOM_ERROR, "不支持请求参数类型"));
  24.                return result;
  25.            }
  26.            public IObservable<AuthenticationResult> Authenticate(IAuthenticationRequest request, IDeviceRegistry registry)
  27.            {
  28.                var result = Observable.Return<AuthenticationResult>(default);
  29.                var authRequest = request as DefaultAuthRequest;
  30.                registry
  31.                  .GetDevice(authRequest.DeviceId)
  32.                  .Subscribe(async p => {
  33.                     var config=  await p.GetConfig(authRequest.GetTransport() == MessageTransport.Http ? "token" : "key");
  34.                      var password= config.Convert<string>();
  35.                     if(authRequest.Password.Equals(password))
  36.                      {
  37.                          result= result.Publish(AuthenticationResult.Success(authRequest.DeviceId));
  38.                      }
  39.                      else
  40.                      {
  41.                          result= result.Publish(AuthenticationResult.Failure(StatusCode.CUSTOM_ERROR, "验证失败,密码错误"));
  42.                      }
  43.                  });
  44.                return result;
  45.            }
  46.        }
复制代码
 
3. 添加Http路由代码support.AddRoutes,那么如何配置呢,代码如下:
 
  1.     public static BasicMessageCodec ReportProperty =>
  2. new BasicMessageCodec("/*/properties/report", typeof(ReadPropertyMessage), route => route.GroupName("属性上报")
  3.                      .HttpMethod("Post")
  4.                      .Description("上报物模型属性数据")
  5.                      .Example("{"properties":{"属性ID":"属性值"}}"));
复制代码
 
4.添加消息编解码代码 support.AddMessageCodecSupport(MessageTransport.Http, () => Observable.Return(new HttpDeviceMessageCodec())), 可以自定义编解码,HttpDeviceMessageCodec代码如下:
 
  1.   public class HttpDeviceMessageCodec : DeviceMessageCodec
  2.   {
  3.       private readonly MessageTransport _transport;
  4.       public HttpDeviceMessageCodec() : this(MessageTransport.Http)
  5.       {
  6.       }
  7.       private static DefaultHttpResponseMessage Unauthorized(String msg)
  8.       {
  9.           return new DefaultHttpResponseMessage()
  10.                   .ContentType(MediaType.ApplicationJson)
  11.                   .Body("{"success":false,"code":"unauthorized","message":"" + msg + ""}")
  12.                   .Status(HttpStatus.AuthorizationFailed);
  13.       }
  14.       private static DefaultHttpResponseMessage BadRequest()
  15.       {
  16.           return new DefaultHttpResponseMessage()
  17.                   .ContentType(MediaType.ApplicationJson)
  18.                   .Body("{"success":false,"code":"bad_request"}")
  19.                   .Status(HttpStatus.RequestError);
  20.       }
  21.       public HttpDeviceMessageCodec(MessageTransport transport)
  22.       {
  23.           _transport = transport;
  24.       }
  25.       public override IObservable<IDeviceMessage> Decode(MessageDecodeContext context)
  26.       {
  27.           if (context.GetMessage() is HttpRequestMessage)
  28.           {
  29.               return DecodeHttpRequestMessage(context);
  30.           }
  31.           return Observable.Return<IDeviceMessage>(default);
  32.       }
  33.       public override  IObservable<IEncodedMessage> Encode(MessageEncodeContext context)
  34.       {
  35.           return Observable.Return<IEncodedMessage>(default);
  36.       }
  37.       private IObservable<IDeviceMessage> DecodeHttpRequestMessage(MessageDecodeContext context)
  38.       {
  39.           var result = Observable.Return<IDeviceMessage>(default);
  40.           var message = (HttpExchangeMessage)context.GetMessage();
  41.           Header? header = message.Request.GetHeader("Authorization");
  42.           if (header == null || header.Value == null || header.Value.Length == 0)
  43.           {
  44.               message
  45.                    .Response(Unauthorized("Authorization header is required")).ToObservable()
  46.                    .Subscribe(p => result = result.Publish(default));
  47.               return result;
  48.           }
  49.           var httpToken = header.Value[0];
  50.           var paths = message.Path.Split("/");
  51.           if (paths.Length == 0)
  52.           {
  53.               message.Response(BadRequest()).ToObservable()
  54.                  .Subscribe(p => result = result.Publish(default));
  55.               return result;
  56.           }
  57.           String deviceId = paths[1];
  58.           context.GetDevice(deviceId).Subscribe(async deviceOperator =>
  59.           {
  60.               var config = deviceOperator==null?null: await deviceOperator.GetConfig("token");
  61.               var token = config?.Convert<string>();
  62.               if (token == null || !httpToken.Equals(token))
  63.               {
  64.                   await message
  65.                        .Response(Unauthorized("Device not registered or authentication failed"));
  66.               }
  67.               else
  68.               {
  69.                   var deviceMessage = await DecodeBody(message, deviceId);
  70.                   if (deviceMessage != null)
  71.                   {
  72.                       await message.Success("{"success":true,"code":"success"}");
  73.                       result = result.Publish(deviceMessage);
  74.                   }
  75.                   else
  76.                   {
  77.                       await message.Response(BadRequest());
  78.                   }
  79.               }
  80.           });
  81.           return result;
  82.       }
  83.       private async Task<IDeviceMessage> DecodeBody(HttpExchangeMessage message,string deviceId)
  84.       {
  85.           byte[] body = new byte[message.Payload.ReadableBytes];
  86.           message.Payload.ReadBytes(body);
  87.           var deviceMessage = await TopicMessageCodec.Dodecode(message.Path, body);
  88.           deviceMessage.DeviceId = deviceId;
  89.           return deviceMessage;
  90.       }
  91.   }
复制代码
 
5.添加元数据配置代码 support.AddConfigMetadata(MessageTransport.Http, _httpConfig);  _httpConfig代码如下
  1.         private readonly DefaultConfigMetadata _httpConfig = new DefaultConfigMetadata(
  2.         "Http认证配置"
  3.         , "token为http认证令牌")
  4.         .Add("token", "token", "http令牌", StringType.Instance);
复制代码

  • 如何加载协议模块,协议模块包含了协议模块支持添加引用加载和上传热部署加载。                           
   引用加载模块

 上传热部署协议模块

 四、设备网关

创建设备网关

 五、产品管理

以下是添加产品。

 设备接入

 六、设备管理

添加设备

 HTTP认证配置

 创建告警阈值

 七、测试

 利用Postman 进行测试,以调用http://127.0.0.1:168/{productid}/{deviceid}/properties/report 为例,Authorization设置:123456
1.正常数据测试

 
 
 2. 如果是选用Get方式调用,会因为找不到ServiceRoute而返回错误。

 3. 把Authorization改成1111,会返回错误Device not registered or authentication failed,从而上报数据失败

 以上上传的数据可以在设备信息-》运行状态中查看

 告警信息可以在超临界数据中查看

  七、总结

 以上是基于HTTP网络组件设备接入,现有平台网络组件可以支持TCP,MQTT,UDP,CoAP,HTTP,Grpc,websocket,rtmp,httpflv,webservice,tcpclient, 而设备接入支持TCP,UDP,HTTP网络组件,后面会陆续添加支持所有网络组件接入,后面我也会陆续介绍其它网路组件设备接入 ,  然后定于11月20日发布1.0测试版平台。也请大家到时候关注捧场。
 

来源:https://www.cnblogs.com/fanliang11/p/18527947
免责声明:由于采集信息均来自互联网,如果侵犯了您的权益,请联系我们【E-Mail:cb@itdo.tech】 我们会及时删除侵权内容,谢谢合作!

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x

举报 回复 使用道具