|
OPCUA-Python
Server
- from threading import Thread
- import copy
- import logging
- from datetime import datetime
- import time
- from math import sin
- import sys
- sys.path.insert(0, "..")
- try:
- from IPython import embed
- except ImportError:
- import code
- def embed():
- myvars = globals()
- myvars.update(locals())
- shell = code.InteractiveConsole(myvars)
- shell.interact()
- from opcua import ua, uamethod, Server
- class SubHandler(object):
- """
- Subscription Handler. To receive events from server for a subscription
- """
- def datachange_notification(self, node, val, data):
- print("Python: New data change event", node, val)
- def event_notification(self, event):
- print("Python: New event", event)
- # method to be exposed through server
- def func(parent, variant):
- ret = False
- if variant.Value % 2 == 0:
- ret = True
- return [ua.Variant(ret, ua.VariantType.Boolean)]
- # method to be exposed through server
- # uses a decorator to automatically convert to and from variants
- @uamethod
- def multiply(parent, x, y):
- print("multiply method call with parameters: ", x, y)
- return x * y
- class VarUpdater(Thread):
- def __init__(self, var):
- Thread.__init__(self)
- self._stopev = False
- self.var = var
- def stop(self):
- self._stopev = True
- def run(self):
- while not self._stopev:
- v = sin(time.time() / 10)
- self.var.set_value(v)
- time.sleep(0.1)
- if __name__ == "__main__":
- # optional: setup logging
- logging.basicConfig(level=logging.WARN)
- #logger = logging.getLogger("opcua.address_space")
- # logger.setLevel(logging.DEBUG)
- #logger = logging.getLogger("opcua.internal_server")
- # logger.setLevel(logging.DEBUG)
- #logger = logging.getLogger("opcua.binary_server_asyncio")
- # logger.setLevel(logging.DEBUG)
- #logger = logging.getLogger("opcua.uaprocessor")
- # logger.setLevel(logging.DEBUG)
- # now setup our server
- server = Server()
- #server.disable_clock()
- #server.set_endpoint("opc.tcp://localhost:4840/freeopcua/server/")
- server.set_endpoint("opc.tcp://0.0.0.0:4841/freeopcua/server/")
- server.set_server_name("FreeOpcUa Example Server")
- # set all possible endpoint policies for clients to connect through
- server.set_security_policy([
- ua.SecurityPolicyType.NoSecurity,
- # ua.SecurityPolicyType.Basic256Sha256_SignAndEncrypt,
- # ua.SecurityPolicyType.Basic256Sha256_Sign
- ])
- # setup our own namespace
- uri = "http://examples.freeopcua.github.io"
- idx = server.register_namespace(uri)
- # create a new node type we can instantiate in our address space
- dev = server.nodes.base_object_type.add_object_type(0, "MyDevice")
- dev.add_variable(0, "sensor1", 1.0).set_modelling_rule(True)
- dev.add_property(0, "device_id", "0340").set_modelling_rule(True)
- ctrl = dev.add_object(0, "controller")
- ctrl.set_modelling_rule(True)
- ctrl.add_property(0, "state", "Idle").set_modelling_rule(True)
- # populating our address space
- # First a folder to organise our nodes
- myfolder = server.nodes.objects.add_folder(idx, "myEmptyFolder")
- # instanciate one instance of our device
- mydevice = server.nodes.objects.add_object(idx, "Device0001", dev)
- mydevice_var = mydevice.get_child(["0:controller", "0:state"]) # get proxy to our device state variable
- # create directly some objects and variables
- myobj = server.nodes.objects.add_object(idx, "MyObject")
- myvar = myobj.add_variable(idx, "MyVariable", 6.7)
- mysin = myobj.add_variable(idx, "MySin", 0, ua.VariantType.Float)
- myvar.set_writable() # Set MyVariable to be writable by clients
- mystringvar = myobj.add_variable(idx, "MyStringVariable", "Really nice string")
- mystringvar.set_writable() # Set MyVariable to be writable by clients
- mydtvar = myobj.add_variable(idx, "MyDateTimeVar", datetime.utcnow())
- mydtvar.set_writable() # Set MyVariable to be writable by clients
- myarrayvar = myobj.add_variable(idx, "myarrayvar", [6.7, 7.9])
- myarrayvar = myobj.add_variable(idx, "myStronglytTypedVariable", ua.Variant([], ua.VariantType.UInt32))
- myprop = myobj.add_property(idx, "myproperty", "I am a property")
- mymethod = myobj.add_method(idx, "mymethod", func, [ua.VariantType.Int64], [ua.VariantType.Boolean])
- multiply_node = myobj.add_method(idx, "multiply", multiply, [ua.VariantType.Int64, ua.VariantType.Int64], [ua.VariantType.Int64])
- # import some nodes from xml
- # server.import_xml("custom_nodes.xml")
- # creating a default event object
- # The event object automatically will have members for all events properties
- # you probably want to create a custom event type, see other examples
- myevgen = server.get_event_generator()
- myevgen.event.Severity = 300
- # starting!
- server.start()
- print("Available loggers are: ", logging.Logger.manager.loggerDict.keys())
- vup = VarUpdater(mysin) # just a stupide class update a variable
- vup.start()
- try:
- # enable following if you want to subscribe to nodes on server side
- #handler = SubHandler()
- #sub = server.create_subscription(500, handler)
- #handle = sub.subscribe_data_change(myvar)
- # trigger event, all subscribed clients wil receive it
- var = myarrayvar.get_value() # return a ref to value in db server side! not a copy!
- var = copy.copy(var) # WARNING: we need to copy before writting again otherwise no data change event will be generated
- var.append(9.3)
- myarrayvar.set_value(var)
- mydevice_var.set_value("Running")
- myevgen.trigger(message="This is BaseEvent")
- server.set_attribute_value(myvar.nodeid, ua.DataValue(9.9)) # Server side write method which is a but faster than using set_value
- embed()
- finally:
- vup.stop()
- server.stop()
复制代码 这个服务程序演示了opcua服务端,几乎所有的功能,其中Event部分没有不断发送,所以仅供参考。
下图展示了server端的对象结构
客户端
- from IPython import embed
- from opcua import Client
- class SubHandler(object):
- def event_notification(self, event):
- print("Event:", event.EventId, event.Time, event.proper_random, event.Message.Text)
- def main_c():
- url = "opc.tcp://127.0.0.1:4841/freeopcua/server/"
- c = Client(url)
- try:
- c.connect()
- root = c.get_root_node()
- embed()
- except Exception as e:
- print("Client Exception:", e)
- finally:
- c.disconnect()
-
- if __name__ == "__main__":
- main_c()
复制代码 客户端遍历流程
- 1.获取下的节点的所有子节点,一般类型都为,取一代表为A
- 2.获取A的(一般为Object)和,其中NodeId需要与路径
- ["0:Types","0:ObjectTypes","0:BaseObjectType"]
复制代码 下的类型对照(自定义类型也在内)。
- 3.继续以A为根,遍历子节点,如果时Object继续2,如果是,NodeId对照
- ["0:Types","0:VariableTypes","0:BaseVariableType"]
复制代码 ,可以判断是变量(variable,63)还是属性(property,65)。
- 4.对于使用
- a_root.call_method(a, arg1)
复制代码 调用。对于使用- get_value/get_data_value()
复制代码 获取存储的值。如果是继续2.
- 5.对于使用获取是否有写权限,如果有可以设置值。
一个比较完美的遍历客户端
- from opcua import Client, ua
- def brower_child(root):
- """
- 递归调用遍历,格式化不好做,有深度问题
- """
- name = root.get_node_class().name
- # print(name)
- if name == "Object":
- brower_obj(root)
- for c in root.get_children():
- print(" ", end='')
- brower_child(c)
- elif name == 'Variable':
- brower_var(root)
- else:
- brower_method(root)
- class CurState():
- def __init__(self, parent=None, p=None, d=0):
- self.parent = parent # unused
- self.p = p
- self.d = d
- def brower_child2(root, max_d=-1, ignore=[]):
- """
- 栈+循环遍历,非常好用
- """
- stack = [CurState(None, root, 0)]
- while len(stack):
- cur = stack.pop()
- name = cur.p.get_node_class().name
- print(''.join([' ' for i in range(cur.d)]), end="")
- if cur.p.get_browse_name().Name in ignore:
- continue
- if name == "Object":
- brower_obj(cur.p)
- if max_d > 0 and cur.d >= max_d:
- continue
- for c in cur.p.get_children():
- stack.append(CurState(cur.p, c, cur.d+1))
- elif name == 'Variable':
- brower_var(cur.p)
- else:
- brower_method(cur.p)
- def brower_obj(v):
- # print(v.get_browse_name())
- rw = 'R '
- bname = v.get_browse_name()
- print("*%2d:%-30s (%-2s, %-23s)" %
- (bname.NamespaceIndex, bname.Name, rw, "Object"))
- def brower_var(v):
- # print(v.get_browse_name())
- rw = 'R '
- if ua.AccessLevel.CurrentWrite in v.get_access_level():
- rw = "RW"
- bname = v.get_browse_name()
- tv = v.get_data_value().Value
- v_show = tv.Value
- if len(str(v_show)) > 1024:
- v_show = str(v_show[:56]) + "..."
- print("-%2d:%-30s (%-2s, %-23s) =>" %
- (bname.NamespaceIndex, bname.Name, rw, tv.VariantType), v_show)
- def brower_method(v):
- # print(v.get_description())
- rw = 'C '
- bname = v.get_browse_name()
- # args = []
- # for a in v.get_properties():
- # dt = a.get_data_type().NodeIdType.name
- # args.append(dt)
- print("@%2d:%-30s (%-2s, %-23s)" %
- (bname.NamespaceIndex, bname.Name, rw, "Method"))
- def main_c():
- url = "opc.tcp://127.0.0.1:4841/freeopcua/server/"
- c = Client(url)
- try:
- c.connect()
- root = c.get_root_node()
- print("\r\nBrower:")
- brower_child2(root.get_child(["0:Objects"]), -1, ["Server"])
- except Exception as e:
- print("Client Exception:", e)
- finally:
- c.disconnect()
- if __name__ == "__main__":
- main_c()
复制代码 总结
以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。
来源:https://www.jb51.net/python/31619408f.htm
免责声明:由于采集信息均来自互联网,如果侵犯了您的权益,请联系我们【E-Mail:cb@itdo.tech】 我们会及时删除侵权内容,谢谢合作! |
本帖子中包含更多资源
您需要 登录 才可以下载或查看,没有账号?立即注册
x
|