Python ModbusSerialClient.write

您所在的位置:网站首页 modbus_write_registers Python ModbusSerialClient.write

Python ModbusSerialClient.write

2023-06-13 05:21| 来源: 网络整理| 查看: 265

本文整理汇总了Python中pymodbus.client.sync.ModbusSerialClient.write_register方法的典型用法代码示例。如果您正苦于以下问题:Python ModbusSerialClient.write_register方法的具体用法?Python ModbusSerialClient.write_register怎么用?Python ModbusSerialClient.write_register使用的例子?那么恭喜您, 这里精选的方法代码示例或许可以为您提供帮助。您也可以进一步了解该方法所在pymodbus.client.sync.ModbusSerialClient的用法示例。

在下文中一共展示了ModbusSerialClient.write_register方法的11个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: ModbusClient # 需要导入模块: from pymodbus.client.sync import ModbusSerialClient [as 别名] # 或者: from pymodbus.client.sync.ModbusSerialClient import write_register [as 别名] import logging logging.basicConfig() log = logging.getLogger() log.setLevel(logging.DEBUG) client = ModbusClient(method='ascii', port='com3', baudrate='115200', timeout=1) print ("Init Modbus Comm ") print client #example of modbus code ready for commissioning #for x in range(1, 10): # coil1 = client.read_coils(0, 2, unit=1) # print coil1 # time.sleep(1) #client.close() client.write_register(D_AddressRef(600), 123, unit=2 ) #unit=2 : mean PLC server address = 2 # def write_register(self, address, value, **kwargs): // Extracted from pymodbus\client\common.py # ''' # :param address: The starting address to write to # :param value: The value to write to the specified address # :param unit: The slave unit this request is targeting # :returns: A deferred response handle #---------------------------------------------------------------------------# # ShortestDistance #---------------------------------------------------------------------------# #function to find shortest distance between 2 line #provide 2 point of each line #find out what is the 2 closest point #which each belong to a separated line def ShortestDistance(x1,y1,x2,y2,x3,y3,x4,y4):开发者ID:jackietrandt,项目名称:LineCamera,代码行数:33,代码来源:CameraDual.py 示例2: ModbusClient # 需要导入模块: from pymodbus.client.sync import ModbusSerialClient [as 别名] # 或者: from pymodbus.client.sync.ModbusSerialClient import write_register [as 别名] Deps: Pyserial, Pymodbus, logging """ import time # For sleep functionality import logging # For detailed error output from pymodbus.client.sync import ModbusSerialClient \ as ModbusClient # Import MODBUS support class comSettings = { "method" : 'rtu', "port" : 'COM3', "stopbits" : 1, "bytesize" : 8, "parity" : 'N', "baudrate" : 9600, "timeout" : 1 } logging.basicConfig() # Setup error logging log = logging.getLogger() # Start logging client = ModbusClient(**comSettings) # Setup connection object client.connect() # Open the MODBUS connection while(True): client.write_register(3,1000,unit=0x01) # Write valve to 100% time.sleep(4) # Sleep 4 seconds client.write_register(3,0,unit=0x01) # Write valve to 0% time.sleep(4) # Sleep 4 seconds client.close() # Close the connection开发者ID:FlaminMad,项目名称:researchProject,代码行数:33,代码来源:WriteTest.py 示例3: run_sync_client # 需要导入模块: from pymodbus.client.sync import ModbusSerialClient [as 别名] # 或者: from pymodbus.client.sync.ModbusSerialClient import write_register [as 别名] def run_sync_client(): # ------------------------------------------------------------------------# # choose the client you want # ------------------------------------------------------------------------# # make sure to start an implementation to hit against. For this # you can use an existing device, the reference implementation in the tools # directory, or start a pymodbus server. # # If you use the UDP or TCP clients, you can override the framer being used # to use a custom implementation (say RTU over TCP). By default they use # the socket framer:: # # client = ModbusClient('localhost', port=5020, framer=ModbusRtuFramer) # # It should be noted that you can supply an ipv4 or an ipv6 host address # for both the UDP and TCP clients. # # There are also other options that can be set on the client that controls # how transactions are performed. The current ones are: # # * retries - Specify how many retries to allow per transaction (default=3) # * retry_on_empty - Is an empty response a retry (default = False) # * source_address - Specifies the TCP source address to bind to # * strict - Applicable only for Modbus RTU clients. # Adheres to modbus protocol for timing restrictions # (default = True). # Setting this to False would disable the inter char timeout # restriction (t1.5) for Modbus RTU # # # Here is an example of using these options:: # # client = ModbusClient('localhost', retries=3, retry_on_empty=True) # ------------------------------------------------------------------------# client = ModbusClient('localhost', port=5020) # from pymodbus.transaction import ModbusRtuFramer # client = ModbusClient('localhost', port=5020, framer=ModbusRtuFramer) # client = ModbusClient(method='binary', port='/dev/ptyp0', timeout=1) # client = ModbusClient(method='ascii', port='/dev/ptyp0', timeout=1) # client = ModbusClient(method='rtu', port='/dev/ptyp0', timeout=1, # baudrate=9600) client.connect() # ------------------------------------------------------------------------# # specify slave to query # ------------------------------------------------------------------------# # The slave to query is specified in an optional parameter for each # individual request. This can be done by specifying the `unit` parameter # which defaults to `0x00` # ----------------------------------------------------------------------- # log.debug("Reading Coils") rr = client.read_coils(1, 1, unit=UNIT) log.debug(rr) # ----------------------------------------------------------------------- # # example requests # ----------------------------------------------------------------------- # # simply call the methods that you would like to use. An example session # is displayed below along with some assert checks. Note that some modbus # implementations differentiate holding/input discrete/coils and as such # you will not be able to write to these, therefore the starting values # are not known to these tests. Furthermore, some use the same memory # blocks for the two sets, so a change to one is a change to the other. # Keep both of these cases in mind when testing as the following will # _only_ pass with the supplied asynchronous modbus server (script supplied). # ----------------------------------------------------------------------- # log.debug("Write to a Coil and read back") rq = client.write_coil(0, True, unit=UNIT) rr = client.read_coils(0, 1, unit=UNIT) assert(not rq.isError()) # test that we are not an error assert(rr.bits[0] == True) # test the expected value log.debug("Write to multiple coils and read back- test 1") rq = client.write_coils(1, [True]*8, unit=UNIT) assert(not rq.isError()) # test that we are not an error rr = client.read_coils(1, 21, unit=UNIT) assert(not rr.isError()) # test that we are not an error resp = [True]*21 # If the returned output quantity is not a multiple of eight, # the remaining bits in the final data byte will be padded with zeros # (toward the high order end of the byte). resp.extend([False]*3) assert(rr.bits == resp) # test the expected value log.debug("Write to multiple coils and read back - test 2") rq = client.write_coils(1, [False]*8, unit=UNIT) rr = client.read_coils(1, 8, unit=UNIT) assert(not rq.isError()) # test that we are not an error assert(rr.bits == [False]*8) # test the expected value log.debug("Read discrete inputs") rr = client.read_discrete_inputs(0, 8, unit=UNIT) assert(not rq.isError()) # test that we are not an error log.debug("Write to a holding register and read back") rq = client.write_register(1, 10, unit=UNIT) rr = client.read_holding_registers(1, 1, unit=UNIT) #.........这里部分代码省略......... 开发者ID:bashwork,项目名称:pymodbus,代码行数:103,代码来源:synchronous_client.py 示例4: __init__ # 需要导入模块: from pymodbus.client.sync import ModbusSerialClient [as 别名] # 或者: from pymodbus.client.sync.ModbusSerialClient import write_register [as 别名] class Com_Modbus: def __init__(self): #logging.basicConfig() #log = logging.getLogger() #log.setLevel(logging.DEBUG) #scan all available com port port_list = self.scan() s = None print "Found ports:" for n,s in port_list: print "____(%d) %s" % (n,s) if s None: self.client = ModbusClient(method='ascii', port=s, stopbits = 1, parity = 'E', bytesize = 7, baudrate='9600', timeout=1) connect = self.client.connect() print "Com is connected =",connect print "Init Modbus Comm = ",self.client #Scan all available com port on this machine - return list of connected usb com port def scan(self): # scan for available ports. return a list of tuples (num, name) available = [] for i in range(256): try: s = serial.Serial(i) available.append( (i, s.portstr)) s.close() except serial.SerialException: pass return available def D_AddressRef(self,d_Address): #---------------------------------------------------------------------------# # D_AddressRef #---------------------------------------------------------------------------# #Input address in decimal - then function would convert it to PLC address #Address 0x1000 stand for D register in PLC #Address 0x0258 stand for 600 in decimal #So to write to D600 register in the PLC #The reference address is 0x1258 d_Working = 4096 d_Working = d_Working + d_Address return d_Working #_____________________________________________________________________________# # Usage example #_____________________________________________________________________________# #client.write_register(D_AddressRef(600), 123, unit=2 ) #unit=2 : mean PLC server address = 2 # def write_register(self, address, value, **kwargs): // Extracted from pymodbus\client\common.py # ''' # :param address: The starting address to write to # :param value: The value to write to the specified address # :param unit: The slave unit this request is targeting # :returns: A deferred response handle def Send_register(self,address,data): self.client.write_register(self.D_AddressRef(address), data, unit = 1 ) print 'sent' pass #read back single register def Read_register(self,address): #read_coils(address, count=1, unit=0) register_read = self.client.read_holding_registers(self.D_AddressRef(address),1, unit = 1) if register_read None: return register_read.registers[0] else: return None pass #read back multiple register def Read_register_multi(self,address,length): #read_coils(address, count=1, unit=0) register_read = self.client.read_holding_registers(self.D_AddressRef(address),length, unit = 1) print register_read.registers[0] return register_read pass开发者ID:jackietrandt,项目名称:ComServer,代码行数:80,代码来源:ComServer.py 示例5: Controller # 需要导入模块: from pymodbus.client.sync import ModbusSerialClient [as 别名] # 或者: from pymodbus.client.sync.ModbusSerialClient import write_register [as 别名] class Controller(Thread): MOTION = {"POS": "AB", "FWD": "BB", "BCK": "CB"} STOP = {"Hard": 0xABFF, "Smooth": 0xCDFF, "Other": 0xCBFF} CMDS = {'FWD': {'LEFT': 0xBB05, 'RIGHT': 0xCB0A}, 'BCK': {'LEFT': 0xCB05, 'RIGHT': 0xBB0A}} def __init__(self, port='/dev/ttyS0'): Thread.__init__(self) # connect to COM self.client = ModbusSerialClient(method='rtu', port=port,\ baudrate=115200, stopbits=1, parity='N', bytesize=8, timeout=0.1) def cmd_listener(self): # TODO: cmd input. callback? pass def read_register(self, address, size=16, unit=1): _r = self.client.read_holding_registers(address, size, unit) if _r: return [hex(x) for x in _r.registers] return None def write_register(self, address, value, unit=1): _r = self.client.write_register(address, value, unit=1) if _r: return _r return None def start_motor(self, motor, mode): print int(mode+motor, 16) _r = self.write_register(1, int(mode+motor, 16)) print _r def start_motor_raw(self, value): _r = self.write_register(1, value) print _r def stop_motor(self, mode=STOP['Hard']): _r = self.write_register(2, mode) print _r def go_forward(self, _stop='Hard'): self.stop_motor(self.STOP[_stop]) self.start_motor_raw(self.CMDS['FWD']['RIGHT']) self.start_motor_raw(self.CMDS['FWD']['LEFT']) def go_back(self, _stop='Hard'): self.stop_motor(self.STOP[_stop]) self.start_motor_raw(self.CMDS['BCK']['RIGHT']) self.start_motor_raw(self.CMDS['BCK']['LEFT']) def turn_right(self, _stop='Hard'): self.stop_motor(self.STOP[_stop]) self.start_motor_raw(self.CMDS['FWD']['RIGHT']) self.start_motor_raw(self.CMDS['BCK']['LEFT']) def turn_left(self, _stop='Hard'): self.stop_motor(self.STOP[_stop]) self.start_motor_raw(self.CMDS['BCK']['RIGHT']) self.start_motor_raw(self.CMDS['FWD']['LEFT']) def set_global_velocity(self, speed): for motor in range(1, 5): self.set_velocity(motor, speed) def set_rwheel_velocity(self, speed): self.set_velocity(1, speed) self.set_velocity(3, speed) def set_lwheel_velocity(self, speed): self.set_velocity(2, speed) self.set_velocity(4, speed) def set_velocity(self, motor, speed): # TODO: speed in m/s velocity = speed low = velocity % 65536 hi = velocity / 65536 motor -= 1 reg_base = 35 + (motor*7) _r1 = self.write_register(reg_base+2, hi) _r2 = self.write_register(reg_base+3, low) print _r1, _r2 def set_acceleration(self, motor, acceleration): # TODO: acceleration in m/s^2 low = acceleration % 65536 hi = acceleration / 65536 motor -= 1 reg_base = 35 + (motor*7) _r1 = self.write_register(reg_base+4, hi) _r2 = self.write_register(reg_base+5, low) print _r1, _r2 def get_velocity(self, motor, speed): # TODO pass def get_acceleration(self, motor, speed): # TODO #.........这里部分代码省略......... 开发者ID:karmeluk,项目名称:base_controller,代码行数:103,代码来源:controller.py 示例6: device # 需要导入模块: from pymodbus.client.sync import ModbusSerialClient [as 别名] # 或者: from pymodbus.client.sync.ModbusSerialClient import write_register [as 别名] #.........这里部分代码省略......... def _disconnect(self): """ close the connection to the modbus slave (server) """ self.slave.close() def request_data(self): """ """ if not driver_ok: return None if not self._connect(): if self._device_not_accessible == -1: # logger.error("device with id: %d is not accessible" % self.device.pk) self._device_not_accessible -= 1 return [] output = [] for register_block in self._variable_config: result = register_block.request_data(self.slave, self._unit_id) if result is None: self._disconnect() self._connect() result = register_block.request_data(self.slave, self._unit_id) if result is not None: for variable_id in register_block.variables: if self.variables[variable_id].update_value(result[variable_id], time()): recorded_data_element = self.variables[variable_id].create_recorded_data_element() if recorded_data_element is not None: output.append(recorded_data_element) if self.variables[variable_id].accessible < 1: logger.info("variable with id: %d is now accessible" % variable_id) self.variables[variable_id].accessible = 1 else: for variable_id in register_block.variables: if self.variables[variable_id].accessible == -1: logger.error("variable with id: %d is not accessible" % variable_id) self.variables[variable_id].update_value(None, time()) self.variables[variable_id].accessible -= 1 # reset device not accessible status if self._device_not_accessible


【本文地址】


今日新闻


推荐新闻


    CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3