翼度科技»论坛 编程开发 python 查看内容

OPCUA-Python实例

8

主题

8

帖子

24

积分

新手上路

Rank: 1

积分
24
OPCUA-Python
  1. pip install opcua
复制代码
Server
  1. from threading import Thread
  2. import copy
  3. import logging
  4. from datetime import datetime
  5. import time
  6. from math import sin
  7. import sys
  8. sys.path.insert(0, "..")

  9. try:
  10.     from IPython import embed
  11. except ImportError:
  12.     import code

  13.     def embed():
  14.         myvars = globals()
  15.         myvars.update(locals())
  16.         shell = code.InteractiveConsole(myvars)
  17.         shell.interact()


  18. from opcua import ua, uamethod, Server


  19. class SubHandler(object):

  20.     """
  21.     Subscription Handler. To receive events from server for a subscription
  22.     """

  23.     def datachange_notification(self, node, val, data):
  24.         print("Python: New data change event", node, val)

  25.     def event_notification(self, event):
  26.         print("Python: New event", event)


  27. # method to be exposed through server

  28. def func(parent, variant):
  29.     ret = False
  30.     if variant.Value % 2 == 0:
  31.         ret = True
  32.     return [ua.Variant(ret, ua.VariantType.Boolean)]


  33. # method to be exposed through server
  34. # uses a decorator to automatically convert to and from variants

  35. @uamethod
  36. def multiply(parent, x, y):
  37.     print("multiply method call with parameters: ", x, y)
  38.     return x * y


  39. class VarUpdater(Thread):
  40.     def __init__(self, var):
  41.         Thread.__init__(self)
  42.         self._stopev = False
  43.         self.var = var

  44.     def stop(self):
  45.         self._stopev = True

  46.     def run(self):
  47.         while not self._stopev:
  48.             v = sin(time.time() / 10)
  49.             self.var.set_value(v)
  50.             time.sleep(0.1)



  51. if __name__ == "__main__":
  52.     # optional: setup logging
  53.     logging.basicConfig(level=logging.WARN)
  54.     #logger = logging.getLogger("opcua.address_space")
  55.     # logger.setLevel(logging.DEBUG)
  56.     #logger = logging.getLogger("opcua.internal_server")
  57.     # logger.setLevel(logging.DEBUG)
  58.     #logger = logging.getLogger("opcua.binary_server_asyncio")
  59.     # logger.setLevel(logging.DEBUG)
  60.     #logger = logging.getLogger("opcua.uaprocessor")
  61.     # logger.setLevel(logging.DEBUG)

  62.     # now setup our server
  63.     server = Server()
  64.     #server.disable_clock()
  65.     #server.set_endpoint("opc.tcp://localhost:4840/freeopcua/server/")
  66.     server.set_endpoint("opc.tcp://0.0.0.0:4841/freeopcua/server/")
  67.     server.set_server_name("FreeOpcUa Example Server")
  68.     # set all possible endpoint policies for clients to connect through
  69.     server.set_security_policy([
  70.         ua.SecurityPolicyType.NoSecurity,
  71.         # ua.SecurityPolicyType.Basic256Sha256_SignAndEncrypt,
  72.         # ua.SecurityPolicyType.Basic256Sha256_Sign
  73.     ])

  74.     # setup our own namespace
  75.     uri = "http://examples.freeopcua.github.io"
  76.     idx = server.register_namespace(uri)

  77.     # create a new node type we can instantiate in our address space
  78.     dev = server.nodes.base_object_type.add_object_type(0, "MyDevice")
  79.     dev.add_variable(0, "sensor1", 1.0).set_modelling_rule(True)
  80.     dev.add_property(0, "device_id", "0340").set_modelling_rule(True)
  81.     ctrl = dev.add_object(0, "controller")
  82.     ctrl.set_modelling_rule(True)
  83.     ctrl.add_property(0, "state", "Idle").set_modelling_rule(True)

  84.     # populating our address space

  85.     # First a folder to organise our nodes
  86.     myfolder = server.nodes.objects.add_folder(idx, "myEmptyFolder")
  87.     # instanciate one instance of our device
  88.     mydevice = server.nodes.objects.add_object(idx, "Device0001", dev)
  89.     mydevice_var = mydevice.get_child(["0:controller", "0:state"])  # get proxy to our device state variable
  90.     # create directly some objects and variables
  91.     myobj = server.nodes.objects.add_object(idx, "MyObject")
  92.     myvar = myobj.add_variable(idx, "MyVariable", 6.7)
  93.     mysin = myobj.add_variable(idx, "MySin", 0, ua.VariantType.Float)
  94.     myvar.set_writable()    # Set MyVariable to be writable by clients
  95.     mystringvar = myobj.add_variable(idx, "MyStringVariable", "Really nice string")
  96.     mystringvar.set_writable()    # Set MyVariable to be writable by clients
  97.     mydtvar = myobj.add_variable(idx, "MyDateTimeVar", datetime.utcnow())
  98.     mydtvar.set_writable()    # Set MyVariable to be writable by clients
  99.     myarrayvar = myobj.add_variable(idx, "myarrayvar", [6.7, 7.9])
  100.     myarrayvar = myobj.add_variable(idx, "myStronglytTypedVariable", ua.Variant([], ua.VariantType.UInt32))
  101.     myprop = myobj.add_property(idx, "myproperty", "I am a property")
  102.     mymethod = myobj.add_method(idx, "mymethod", func, [ua.VariantType.Int64], [ua.VariantType.Boolean])
  103.     multiply_node = myobj.add_method(idx, "multiply", multiply, [ua.VariantType.Int64, ua.VariantType.Int64], [ua.VariantType.Int64])

  104.     # import some nodes from xml
  105.     # server.import_xml("custom_nodes.xml")

  106.     # creating a default event object
  107.     # The event object automatically will have members for all events properties
  108.     # you probably want to create a custom event type, see other examples
  109.     myevgen = server.get_event_generator()
  110.     myevgen.event.Severity = 300

  111.     # starting!
  112.     server.start()
  113.     print("Available loggers are: ", logging.Logger.manager.loggerDict.keys())
  114.     vup = VarUpdater(mysin)  # just  a stupide class update a variable
  115.     vup.start()
  116.     try:
  117.         # enable following if you want to subscribe to nodes on server side
  118.         #handler = SubHandler()
  119.         #sub = server.create_subscription(500, handler)
  120.         #handle = sub.subscribe_data_change(myvar)
  121.         # trigger event, all subscribed clients wil receive it
  122.         var = myarrayvar.get_value()  # return a ref to value in db server side! not a copy!
  123.         var = copy.copy(var)  # WARNING: we need to copy before writting again otherwise no data change event will be generated
  124.         var.append(9.3)
  125.         myarrayvar.set_value(var)
  126.         mydevice_var.set_value("Running")
  127.         myevgen.trigger(message="This is BaseEvent")
  128.         server.set_attribute_value(myvar.nodeid, ua.DataValue(9.9))  # Server side write method which is a but faster than using set_value

  129.         embed()
  130.     finally:
  131.         vup.stop()
  132.         server.stop()
复制代码
这个服务程序演示了opcua服务端,几乎所有的功能,其中Event部分没有不断发送,所以仅供参考。

下图展示了server端的对象结构



客户端
  1. from IPython import embed
  2. from opcua import Client

  3. class SubHandler(object):
  4.     def event_notification(self, event):
  5.         print("Event:", event.EventId, event.Time, event.proper_random, event.Message.Text)

  6. def main_c():
  7.     url = "opc.tcp://127.0.0.1:4841/freeopcua/server/"
  8.     c = Client(url)
  9.     try:
  10.         c.connect()
  11.         root = c.get_root_node()
  12.         embed()
  13.     except Exception as e:
  14.         print("Client Exception:", e)
  15.     finally:
  16.         c.disconnect()
  17.         
  18. if __name__ == "__main__":
  19.     main_c()
复制代码
客户端遍历流程


  • 1.获取
    1. root
    复制代码
    下的
    1. Objects
    复制代码
    节点的所有子节点,一般类型都为
    1. Object
    复制代码
    ,取一代表为A
  • 2.获取A的
    1. NodeClass
    复制代码
    (一般为Object)和
    1. TypeDefinition
    复制代码
    ,其中NodeId需要与路径
    1. ["0:Types","0:ObjectTypes","0:BaseObjectType"]
    复制代码
    下的类型对照(自定义类型也在内)。
  • 3.继续以A为根,遍历子节点,如果时Object继续2,如果是
    1. Variable
    复制代码
    ,NodeId对照
    1. ["0:Types","0:VariableTypes","0:BaseVariableType"]
    复制代码
    ,可以判断是变量(variable,63)还是属性(property,65)。
  • 4.对于
    1. Method
    复制代码
    使用
    1. a_root.call_method(a, arg1)
    复制代码
    调用。对于
    1. Variable
    复制代码
    使用
    1. get_value/get_data_value()
    复制代码
    获取存储的值。如果是
    1. Object
    复制代码
    继续2.
  • 5.对于
    1. Variable
    复制代码
    使用
    1. access_level
    复制代码
    获取是否有写权限,如果有
    1. set_value
    复制代码
    可以设置值。

一个比较完美的遍历客户端
  1. from opcua import Client, ua


  2. def brower_child(root):
  3.     """
  4.     递归调用遍历,格式化不好做,有深度问题
  5.     """
  6.     name = root.get_node_class().name
  7.     # print(name)
  8.     if name == "Object":
  9.         brower_obj(root)
  10.         for c in root.get_children():
  11.             print("  ", end='')
  12.             brower_child(c)
  13.     elif name == 'Variable':
  14.         brower_var(root)
  15.     else:
  16.         brower_method(root)


  17. class CurState():
  18.     def __init__(self, parent=None, p=None, d=0):
  19.         self.parent = parent  # unused
  20.         self.p = p
  21.         self.d = d


  22. def brower_child2(root, max_d=-1, ignore=[]):
  23.     """
  24.     栈+循环遍历,非常好用
  25.     """
  26.     stack = [CurState(None, root, 0)]
  27.     while len(stack):
  28.         cur = stack.pop()
  29.         name = cur.p.get_node_class().name

  30.         print(''.join(['  ' for i in range(cur.d)]), end="")

  31.         if cur.p.get_browse_name().Name in ignore:
  32.             continue

  33.         if name == "Object":
  34.             brower_obj(cur.p)
  35.             if max_d > 0 and cur.d >= max_d:
  36.                 continue
  37.             for c in cur.p.get_children():
  38.                 stack.append(CurState(cur.p, c, cur.d+1))

  39.         elif name == 'Variable':
  40.             brower_var(cur.p)
  41.         else:
  42.             brower_method(cur.p)


  43. def brower_obj(v):
  44.     # print(v.get_browse_name())
  45.     rw = 'R '
  46.     bname = v.get_browse_name()
  47.     print("*%2d:%-30s (%-2s, %-23s)" %
  48.           (bname.NamespaceIndex, bname.Name, rw, "Object"))


  49. def brower_var(v):
  50.     # print(v.get_browse_name())
  51.     rw = 'R '
  52.     if ua.AccessLevel.CurrentWrite in v.get_access_level():
  53.         rw = "RW"
  54.     bname = v.get_browse_name()
  55.     tv = v.get_data_value().Value
  56.     v_show = tv.Value
  57.     if len(str(v_show)) > 1024:
  58.         v_show = str(v_show[:56]) + "..."
  59.     print("-%2d:%-30s (%-2s, %-23s) =>" %
  60.           (bname.NamespaceIndex, bname.Name, rw, tv.VariantType), v_show)


  61. def brower_method(v):
  62.     # print(v.get_description())
  63.     rw = 'C '
  64.     bname = v.get_browse_name()
  65.     # args = []
  66.     # for a in v.get_properties():
  67.     #     dt = a.get_data_type().NodeIdType.name
  68.     #     args.append(dt)
  69.     print("@%2d:%-30s (%-2s, %-23s)" %
  70.           (bname.NamespaceIndex, bname.Name, rw, "Method"))


  71. def main_c():
  72.     url = "opc.tcp://127.0.0.1:4841/freeopcua/server/"
  73.     c = Client(url)
  74.     try:
  75.         c.connect()
  76.         root = c.get_root_node()
  77.         print("\r\nBrower:")
  78.         brower_child2(root.get_child(["0:Objects"]), -1, ["Server"])
  79.     except Exception as e:
  80.         print("Client Exception:", e)
  81.     finally:
  82.         c.disconnect()


  83. if __name__ == "__main__":
  84.     main_c()
复制代码
总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

来源:https://www.jb51.net/python/31619408f.htm
免责声明:由于采集信息均来自互联网,如果侵犯了您的权益,请联系我们【E-Mail:cb@itdo.tech】 我们会及时删除侵权内容,谢谢合作!

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x

举报 回复 使用道具