潘小伟 发表于 2024-2-25 05:08:42

OPCUA-Python实例

OPCUA-Python

pip install opcua
Server

from threading import Thread
import copy
import logging
from datetime import datetime
import time
from math import sin
import sys
sys.path.insert(0, "..")

try:
    from IPython import embed
except ImportError:
    # IPython is optional; fall back to the standard-library console.
    import code

    def embed():
        """Open a plain interactive console seeded with this module's globals.

        Stand-in for ``IPython.embed`` when IPython cannot be imported.
        """
        myvars = globals()
        # At this point locals() holds only ``myvars`` itself.
        myvars.update(locals())
        code.InteractiveConsole(myvars).interact()


from opcua import ua, uamethod, Server


class SubHandler(object):
    """Subscription handler: prints every notification delivered to it."""

    def datachange_notification(self, node, val, data):
        """Print the node and its new value; ``data`` is accepted but unused."""
        parts = ("Python: New data change event", node, val)
        print(*parts)

    def event_notification(self, event):
        """Print the received event."""
        parts = ("Python: New event", event)
        print(*parts)


# method to be exposed through server

def func(parent, variant):
    """Report whether the integer carried by ``variant`` is even.

    This callback is registered without the ``uamethod`` decorator, so it
    receives the raw input variant and must build its own output variants.

    Args:
        parent: supplied by the server on each call; not used here.
        variant: input argument; only its ``Value`` attribute is read.

    Returns:
        A one-element list holding a Boolean ``ua.Variant``: True when
        ``variant.Value`` is even, False otherwise.
    """
    is_even = variant.Value % 2 == 0
    # Return the computed flag instead of falling through with a bare return.
    return [ua.Variant(is_even, ua.VariantType.Boolean)]


# method to be exposed through server
# uses a decorator to automatically convert to and from variants

@uamethod
def multiply(parent, x, y):
    """Log both operands and return their product.

    ``parent`` is supplied by the caller and not used.
    """
    print("multiply method call with parameters: ", x, y)
    product = x * y
    return product


class VarUpdater(Thread):
    """Thread that keeps writing a sine value into the given variable."""

    def __init__(self, var):
        super().__init__()
        self.var = var
        self._stopev = False  # set to True by stop() to end the run() loop

    def stop(self):
        """Ask the update loop to finish; it exits at its next check."""
        self._stopev = True

    def run(self):
        """Write sin(time.time() / 10) to ``var`` every 0.1 s until stopped."""
        while True:
            if self._stopev:
                break
            self.var.set_value(sin(time.time() / 10))
            time.sleep(0.1)



if __name__ == "__main__":
    # Example server: builds a small address space, serves it on port 4841 and
    # then opens an interactive shell; the server is stopped when the shell exits.

    # optional: setup logging
    logging.basicConfig(level=logging.WARN)
    #logger = logging.getLogger("opcua.address_space")
    # logger.setLevel(logging.DEBUG)
    #logger = logging.getLogger("opcua.internal_server")
    # logger.setLevel(logging.DEBUG)
    #logger = logging.getLogger("opcua.binary_server_asyncio")
    # logger.setLevel(logging.DEBUG)
    #logger = logging.getLogger("opcua.uaprocessor")
    # logger.setLevel(logging.DEBUG)

    # now setup our server
    server = Server()
    #server.disable_clock()
    #server.set_endpoint("opc.tcp://localhost:4840/freeopcua/server/")
    server.set_endpoint("opc.tcp://0.0.0.0:4841/freeopcua/server/")
    server.set_server_name("FreeOpcUa Example Server")
    # endpoint policies clients may connect through (only NoSecurity is enabled)
    server.set_security_policy([
        ua.SecurityPolicyType.NoSecurity,
        # ua.SecurityPolicyType.Basic256Sha256_SignAndEncrypt,
        # ua.SecurityPolicyType.Basic256Sha256_Sign
    ])

    # setup our own namespace
    uri = "http://examples.freeopcua.github.io"
    idx = server.register_namespace(uri)

    # create a new node type we can instantiate in our address space
    dev = server.nodes.base_object_type.add_object_type(0, "MyDevice")
    dev.add_variable(0, "sensor1", 1.0).set_modelling_rule(True)
    dev.add_property(0, "device_id", "0340").set_modelling_rule(True)
    ctrl = dev.add_object(0, "controller")
    ctrl.set_modelling_rule(True)
    ctrl.add_property(0, "state", "Idle").set_modelling_rule(True)

    # populating our address space

    # First a folder to organise our nodes
    myfolder = server.nodes.objects.add_folder(idx, "myEmptyFolder")
    # instantiate one instance of our device
    mydevice = server.nodes.objects.add_object(idx, "Device0001", dev)
    # get proxy to our device state variable
    mydevice_var = mydevice.get_child(["0:controller", "0:state"])
    # create directly some objects and variables
    myobj = server.nodes.objects.add_object(idx, "MyObject")
    myvar = myobj.add_variable(idx, "MyVariable", 6.7)
    mysin = myobj.add_variable(idx, "MySin", 0, ua.VariantType.Float)
    myvar.set_writable()    # Set MyVariable to be writable by clients
    mystringvar = myobj.add_variable(idx, "MyStringVariable", "Really nice string")
    mystringvar.set_writable()    # Set MyStringVariable to be writable by clients
    mydtvar = myobj.add_variable(idx, "MyDateTimeVar", datetime.utcnow())
    mydtvar.set_writable()    # Set MyDateTimeVar to be writable by clients
    # The initial value is a list of floats, matching the 9.3 appended further down.
    myarrayvar = myobj.add_variable(idx, "myarrayvar", [6.7, 7.9])
    # Bound to its own name so it does not replace the myarrayvar handle.
    mytypedvar = myobj.add_variable(idx, "myStronglytTypedVariable", ua.Variant([], ua.VariantType.UInt32))
    myprop = myobj.add_property(idx, "myproperty", "I am a property")
    # The last two arguments list the input and the output argument types.
    mymethod = myobj.add_method(idx, "mymethod", func, [ua.VariantType.Int64], [ua.VariantType.Boolean])
    multiply_node = myobj.add_method(idx, "multiply", multiply, [ua.VariantType.Int64, ua.VariantType.Int64], [ua.VariantType.Int64])

    # import some nodes from xml
    # server.import_xml("custom_nodes.xml")

    # creating a default event object
    # The event object automatically will have members for all events properties
    # you probably want to create a custom event type, see other examples
    myevgen = server.get_event_generator()
    myevgen.event.Severity = 300

    # starting!
    server.start()
    print("Available loggers are: ", logging.Logger.manager.loggerDict.keys())
    vup = VarUpdater(mysin)  # background thread that keeps updating MySin
    vup.start()
    try:
      # enable following if you want to subscribe to nodes on server side
      #handler = SubHandler()
      #sub = server.create_subscription(500, handler)
      #handle = sub.subscribe_data_change(myvar)
      # get_value() returns a reference to the value stored server side, not a copy
      var = myarrayvar.get_value()
      # WARNING: copy before writing back, otherwise no data change event is generated
      var = copy.copy(var)
      var.append(9.3)
      myarrayvar.set_value(var)
      mydevice_var.set_value("Running")
      # trigger event, all subscribed clients will receive it
      myevgen.trigger(message="This is BaseEvent")
      # Server side write method which is a bit faster than using set_value
      server.set_attribute_value(myvar.nodeid, ua.DataValue(9.9))

      embed()
    finally:
      vup.stop()
      server.stop()

这个服务程序演示了 OPC UA 服务端几乎所有的功能,其中 Event 部分没有持续发送,所以仅供参考。

下图展示了server端的对象结构



客户端

from IPython import embed
from opcua import Client

class SubHandler(object):
    """Event handler that prints selected fields of each received event."""

    def event_notification(self, event):
        """Print the event's id, time, ``proper_random`` field and message text."""
        fields = (event.EventId, event.Time, event.proper_random, event.Message.Text)
        print("Event:", *fields)

def main_c():
    """Connect to the local example server and open an interactive shell.

    Any exception raised while connecting or while the shell is running is
    printed rather than propagated. The client is disconnected afterwards,
    but only if the connection was actually established.
    """
    url = "opc.tcp://127.0.0.1:4841/freeopcua/server/"
    c = Client(url)
    try:
        c.connect()
    except Exception as e:
        # Never connected, so there is nothing to disconnect.
        print("Client Exception:", e)
        return
    try:
        # Not used below; presumably kept so the shell can reach it - verify.
        root = c.get_root_node()
        embed()
    except Exception as e:
        print("Client Exception:", e)
    finally:
        c.disconnect()
      
# Start the interactive client only when this file is executed as a script.
if __name__ == "__main__":
    main_c()
客户端遍历流程


1. 获取root下的Objects节点的所有子节点,一般类型都为Object,取一代表为A
2. 获取A的NodeClass(一般为Object)和TypeDefinition,其中NodeId需要与路径["0:Types","0:ObjectTypes","0:BaseObjectType"]下的类型对照(自定义类型也在内)。
3. 继续以A为根,遍历子节点,如果是Object继续2,如果是Variable,NodeId对照["0:Types","0:VariableTypes","0:BaseVariableType"],可以判断是变量(variable,63)还是属性(property,65)。
4. 对于Method使用a_root.call_method(a, arg1)调用。对于Variable使用get_value/get_data_value()获取存储的值。如果是Object继续2。
5. 对于Variable使用access_level获取是否有写权限,如果有set_value可以设置值。

一个比较完美的遍历客户端

from opcua import Client, ua


def brower_child(root):
    """Recursively print ``root`` and every node below it.

    Dispatches on the node-class name: objects are printed and their children
    visited recursively, variables go to ``brower_var`` and anything else is
    printed as a method. The output is not indented by depth.
    """
    kind = root.get_node_class().name
    if kind == 'Variable':
        brower_var(root)
        return
    if kind != "Object":
        brower_method(root)
        return

    brower_obj(root)
    for child in root.get_children():
        print("", end='')
        brower_child(child)


class CurState():
    """Traversal record: a node, the node it was reached from, and its depth."""

    def __init__(self, parent=None, p=None, d=0):
        # ``parent`` is stored but not read by the traversal code in this file.
        self.parent, self.p, self.d = parent, p, d


def brower_child2(root, max_d=-1, ignore=()):
    """Print the node tree below ``root`` using an explicit stack.

    Args:
        root: node to start from; it is printed at depth 0.
        max_d: when greater than 0, children of objects at depth ``max_d`` or
            deeper are not visited; any other value means no depth limit.
        ignore: browse names to skip; a skipped node is neither printed nor
            descended into.
    """
    stack = [CurState(None, root, 0)]
    while stack:
        cur = stack.pop()

        # Skip ignored nodes before anything is printed for them.
        if cur.p.get_browse_name().Name in ignore:
            continue

        name = cur.p.get_node_class().name

        # Indent each line by its depth so the output reads as a tree.
        print("  " * cur.d, end="")

        if name == "Object":
            brower_obj(cur.p)
            if max_d > 0 and cur.d >= max_d:
                continue
            for c in cur.p.get_children():
                stack.append(CurState(cur.p, c, cur.d + 1))
        elif name == 'Variable':
            brower_var(cur.p)
        else:
            brower_method(cur.p)


def brower_obj(v):
    """Print one line for an object node: namespace index, name, ``R`` and ``Object``."""
    browse_name = v.get_browse_name()
    row = (browse_name.NamespaceIndex, browse_name.Name, 'R ', "Object")
    print("*%2d:%-30s (%-2s, %-23s)" % row)


def brower_var(v):
    """Print one line for a variable node: name, access flag, variant type and value.

    The flag is ``RW`` when ``ua.AccessLevel.CurrentWrite`` is among the
    node's access levels and ``R`` otherwise.
    """
    writable = ua.AccessLevel.CurrentWrite in v.get_access_level()
    rw = "RW" if writable else 'R '
    browse_name = v.get_browse_name()
    variant = v.get_data_value().Value
    shown = variant.Value
    if len(str(shown)) > 1024:
        # NOTE(review): this slices the raw value, not its string form, so a
        # non-sliceable value with a long repr would raise here - confirm intent.
        shown = str(shown[:56]) + "..."
    header = "-%2d:%-30s (%-2s, %-23s) =>" % (
        browse_name.NamespaceIndex, browse_name.Name, rw, variant.VariantType)
    print(header, shown)


def brower_method(v):
    """Print one line for a node treated as a method, tagged ``C`` and ``Method``.

    Only the browse name is shown; argument types are not listed.
    """
    browse_name = v.get_browse_name()
    row = (browse_name.NamespaceIndex, browse_name.Name, 'C ', "Method")
    print("@%2d:%-30s (%-2s, %-23s)" % row)


def main_c():
    """Connect to the local example server and print its Objects tree.

    The ``Server`` node is skipped and no depth limit is applied. Any
    exception raised while connecting or browsing is printed rather than
    propagated. The client is disconnected afterwards, but only if the
    connection was actually established.
    """
    url = "opc.tcp://127.0.0.1:4841/freeopcua/server/"
    c = Client(url)
    try:
        c.connect()
    except Exception as e:
        # Never connected, so there is nothing to disconnect.
        print("Client Exception:", e)
        return
    try:
        root = c.get_root_node()
        print("\r\nBrower:")
        brower_child2(root.get_child(["0:Objects"]), -1, ["Server"])
    except Exception as e:
        print("Client Exception:", e)
    finally:
        c.disconnect()


# Run the tree-browsing client only when this file is executed as a script.
if __name__ == "__main__":
    main_c()
总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

来源:https://www.jb51.net/python/31619408f.htm
免责声明:由于采集信息均来自互联网,如果侵犯了您的权益,请联系我们【E-Mail:cb@itdo.tech】 我们会及时删除侵权内容,谢谢合作!
页: [1]
查看完整版本: OPCUA-Python实例