本文整理汇总了Python中pymodbus.client.sync.ModbusSerialClient.write_register方法的典型用法代码示例。如果您正苦于以下问题:Python ModbusSerialClient.write_register方法的具体用法?Python ModbusSerialClient.write_register怎么用?Python ModbusSerialClient.write_register使用的例子?那么恭喜您, 这里精选的方法代码示例或许可以为您提供帮助。您也可以进一步了解该方法所在类pymodbus.client.sync.ModbusSerialClient的用法示例。
在下文中一共展示了ModbusSerialClient.write_register方法的11个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: ModbusClient
# 需要导入模块: from pymodbus.client.sync import ModbusSerialClient [as 别名]
# 或者: from pymodbus.client.sync.ModbusSerialClient import write_register [as 别名]
import logging
logging.basicConfig()
log = logging.getLogger()
log.setLevel(logging.DEBUG)
client = ModbusClient(method='ascii', port='com3', baudrate='115200', timeout=1)
print ("Init Modbus Comm ")
print client
#example of modbus code ready for commissioning
#for x in range(1, 10):
# coil1 = client.read_coils(0, 2, unit=1)
# print coil1
# time.sleep(1)
#client.close()
client.write_register(D_AddressRef(600), 123, unit=2 ) #unit=2 : mean PLC server address = 2
# def write_register(self, address, value, **kwargs): // Extracted from pymodbus\client\common.py
# '''
# :param address: The starting address to write to
# :param value: The value to write to the specified address
# :param unit: The slave unit this request is targeting
# :returns: A deferred response handle
#---------------------------------------------------------------------------#
# ShortestDistance
#---------------------------------------------------------------------------#
#function to find shortest distance between 2 line
#provide 2 point of each line
#find out what is the 2 closest point
#which each belong to a separated line
def ShortestDistance(x1,y1,x2,y2,x3,y3,x4,y4):开发者ID:jackietrandt,项目名称:LineCamera,代码行数:33,代码来源:CameraDual.py
示例2: ModbusClient
# 需要导入模块: from pymodbus.client.sync import ModbusSerialClient [as 别名]
# 或者: from pymodbus.client.sync.ModbusSerialClient import write_register [as 别名]
Deps: Pyserial, Pymodbus, logging
"""
import time # For sleep functionality
import logging # For detailed error output
from pymodbus.client.sync import ModbusSerialClient \
as ModbusClient # Import MODBUS support class
comSettings = {
"method" : 'rtu',
"port" : 'COM3',
"stopbits" : 1,
"bytesize" : 8,
"parity" : 'N',
"baudrate" : 9600,
"timeout" : 1
}
logging.basicConfig() # Setup error logging
log = logging.getLogger() # Start logging
client = ModbusClient(**comSettings) # Setup connection object
client.connect() # Open the MODBUS connection
while(True):
client.write_register(3,1000,unit=0x01) # Write valve to 100%
time.sleep(4) # Sleep 4 seconds
client.write_register(3,0,unit=0x01) # Write valve to 0%
time.sleep(4) # Sleep 4 seconds
client.close() # Close the connection开发者ID:FlaminMad,项目名称:researchProject,代码行数:33,代码来源:WriteTest.py
示例3: run_sync_client
# 需要导入模块: from pymodbus.client.sync import ModbusSerialClient [as 别名]
# 或者: from pymodbus.client.sync.ModbusSerialClient import write_register [as 别名]
def run_sync_client():
# ------------------------------------------------------------------------#
# choose the client you want
# ------------------------------------------------------------------------#
# make sure to start an implementation to hit against. For this
# you can use an existing device, the reference implementation in the tools
# directory, or start a pymodbus server.
#
# If you use the UDP or TCP clients, you can override the framer being used
# to use a custom implementation (say RTU over TCP). By default they use
# the socket framer::
#
# client = ModbusClient('localhost', port=5020, framer=ModbusRtuFramer)
#
# It should be noted that you can supply an ipv4 or an ipv6 host address
# for both the UDP and TCP clients.
#
# There are also other options that can be set on the client that controls
# how transactions are performed. The current ones are:
#
# * retries - Specify how many retries to allow per transaction (default=3)
# * retry_on_empty - Is an empty response a retry (default = False)
# * source_address - Specifies the TCP source address to bind to
# * strict - Applicable only for Modbus RTU clients.
# Adheres to modbus protocol for timing restrictions
# (default = True).
# Setting this to False would disable the inter char timeout
# restriction (t1.5) for Modbus RTU
#
#
# Here is an example of using these options::
#
# client = ModbusClient('localhost', retries=3, retry_on_empty=True)
# ------------------------------------------------------------------------#
client = ModbusClient('localhost', port=5020)
# from pymodbus.transaction import ModbusRtuFramer
# client = ModbusClient('localhost', port=5020, framer=ModbusRtuFramer)
# client = ModbusClient(method='binary', port='/dev/ptyp0', timeout=1)
# client = ModbusClient(method='ascii', port='/dev/ptyp0', timeout=1)
# client = ModbusClient(method='rtu', port='/dev/ptyp0', timeout=1,
# baudrate=9600)
client.connect()
# ------------------------------------------------------------------------#
# specify slave to query
# ------------------------------------------------------------------------#
# The slave to query is specified in an optional parameter for each
# individual request. This can be done by specifying the `unit` parameter
# which defaults to `0x00`
# ----------------------------------------------------------------------- #
log.debug("Reading Coils")
rr = client.read_coils(1, 1, unit=UNIT)
log.debug(rr)
# ----------------------------------------------------------------------- #
# example requests
# ----------------------------------------------------------------------- #
# simply call the methods that you would like to use. An example session
# is displayed below along with some assert checks. Note that some modbus
# implementations differentiate holding/input discrete/coils and as such
# you will not be able to write to these, therefore the starting values
# are not known to these tests. Furthermore, some use the same memory
# blocks for the two sets, so a change to one is a change to the other.
# Keep both of these cases in mind when testing as the following will
# _only_ pass with the supplied asynchronous modbus server (script supplied).
# ----------------------------------------------------------------------- #
log.debug("Write to a Coil and read back")
rq = client.write_coil(0, True, unit=UNIT)
rr = client.read_coils(0, 1, unit=UNIT)
assert(not rq.isError()) # test that we are not an error
assert(rr.bits[0] == True) # test the expected value
log.debug("Write to multiple coils and read back- test 1")
rq = client.write_coils(1, [True]*8, unit=UNIT)
assert(not rq.isError()) # test that we are not an error
rr = client.read_coils(1, 21, unit=UNIT)
assert(not rr.isError()) # test that we are not an error
resp = [True]*21
# If the returned output quantity is not a multiple of eight,
# the remaining bits in the final data byte will be padded with zeros
# (toward the high order end of the byte).
resp.extend([False]*3)
assert(rr.bits == resp) # test the expected value
log.debug("Write to multiple coils and read back - test 2")
rq = client.write_coils(1, [False]*8, unit=UNIT)
rr = client.read_coils(1, 8, unit=UNIT)
assert(not rq.isError()) # test that we are not an error
assert(rr.bits == [False]*8) # test the expected value
log.debug("Read discrete inputs")
rr = client.read_discrete_inputs(0, 8, unit=UNIT)
assert(not rq.isError()) # test that we are not an error
log.debug("Write to a holding register and read back")
rq = client.write_register(1, 10, unit=UNIT)
rr = client.read_holding_registers(1, 1, unit=UNIT)
#.........这里部分代码省略.........
开发者ID:bashwork,项目名称:pymodbus,代码行数:103,代码来源:synchronous_client.py
示例4: __init__
# 需要导入模块: from pymodbus.client.sync import ModbusSerialClient [as 别名]
# 或者: from pymodbus.client.sync.ModbusSerialClient import write_register [as 别名]
class Com_Modbus:
def __init__(self):
#logging.basicConfig()
#log = logging.getLogger()
#log.setLevel(logging.DEBUG)
#scan all available com port
port_list = self.scan()
s = None
print "Found ports:"
for n,s in port_list: print "____(%d) %s" % (n,s)
if s None:
self.client = ModbusClient(method='ascii', port=s, stopbits = 1, parity = 'E', bytesize = 7, baudrate='9600', timeout=1)
connect = self.client.connect()
print "Com is connected =",connect
print "Init Modbus Comm = ",self.client
#Scan all available com port on this machine - return list of connected usb com port
def scan(self):
# scan for available ports. return a list of tuples (num, name)
available = []
for i in range(256):
try:
s = serial.Serial(i)
available.append( (i, s.portstr))
s.close()
except serial.SerialException:
pass
return available
def D_AddressRef(self,d_Address):
#---------------------------------------------------------------------------#
# D_AddressRef
#---------------------------------------------------------------------------#
#Input address in decimal - then function would convert it to PLC address
#Address 0x1000 stand for D register in PLC
#Address 0x0258 stand for 600 in decimal
#So to write to D600 register in the PLC
#The reference address is 0x1258
d_Working = 4096
d_Working = d_Working + d_Address
return d_Working
#_____________________________________________________________________________#
# Usage example
#_____________________________________________________________________________#
#client.write_register(D_AddressRef(600), 123, unit=2 ) #unit=2 : mean PLC server address = 2
# def write_register(self, address, value, **kwargs): // Extracted from pymodbus\client\common.py
# '''
# :param address: The starting address to write to
# :param value: The value to write to the specified address
# :param unit: The slave unit this request is targeting
# :returns: A deferred response handle
def Send_register(self,address,data):
self.client.write_register(self.D_AddressRef(address), data, unit = 1 )
print 'sent'
pass
#read back single register
def Read_register(self,address):
#read_coils(address, count=1, unit=0)
register_read = self.client.read_holding_registers(self.D_AddressRef(address),1, unit = 1)
if register_read None:
return register_read.registers[0]
else:
return None
pass
#read back multiple register
def Read_register_multi(self,address,length):
#read_coils(address, count=1, unit=0)
register_read = self.client.read_holding_registers(self.D_AddressRef(address),length, unit = 1)
print register_read.registers[0]
return register_read
pass开发者ID:jackietrandt,项目名称:ComServer,代码行数:80,代码来源:ComServer.py
示例5: Controller
# 需要导入模块: from pymodbus.client.sync import ModbusSerialClient [as 别名]
# 或者: from pymodbus.client.sync.ModbusSerialClient import write_register [as 别名]
class Controller(Thread):
MOTION = {"POS": "AB", "FWD": "BB", "BCK": "CB"}
STOP = {"Hard": 0xABFF, "Smooth": 0xCDFF, "Other": 0xCBFF}
CMDS = {'FWD': {'LEFT': 0xBB05, 'RIGHT': 0xCB0A}, 'BCK': {'LEFT': 0xCB05, 'RIGHT': 0xBB0A}}
def __init__(self, port='/dev/ttyS0'):
Thread.__init__(self)
# connect to COM
self.client = ModbusSerialClient(method='rtu', port=port,\
baudrate=115200, stopbits=1, parity='N', bytesize=8, timeout=0.1)
def cmd_listener(self):
# TODO: cmd input. callback?
pass
def read_register(self, address, size=16, unit=1):
_r = self.client.read_holding_registers(address, size, unit)
if _r:
return [hex(x) for x in _r.registers]
return None
def write_register(self, address, value, unit=1):
_r = self.client.write_register(address, value, unit=1)
if _r:
return _r
return None
def start_motor(self, motor, mode):
print int(mode+motor, 16)
_r = self.write_register(1, int(mode+motor, 16))
print _r
def start_motor_raw(self, value):
_r = self.write_register(1, value)
print _r
def stop_motor(self, mode=STOP['Hard']):
_r = self.write_register(2, mode)
print _r
def go_forward(self, _stop='Hard'):
self.stop_motor(self.STOP[_stop])
self.start_motor_raw(self.CMDS['FWD']['RIGHT'])
self.start_motor_raw(self.CMDS['FWD']['LEFT'])
def go_back(self, _stop='Hard'):
self.stop_motor(self.STOP[_stop])
self.start_motor_raw(self.CMDS['BCK']['RIGHT'])
self.start_motor_raw(self.CMDS['BCK']['LEFT'])
def turn_right(self, _stop='Hard'):
self.stop_motor(self.STOP[_stop])
self.start_motor_raw(self.CMDS['FWD']['RIGHT'])
self.start_motor_raw(self.CMDS['BCK']['LEFT'])
def turn_left(self, _stop='Hard'):
self.stop_motor(self.STOP[_stop])
self.start_motor_raw(self.CMDS['BCK']['RIGHT'])
self.start_motor_raw(self.CMDS['FWD']['LEFT'])
def set_global_velocity(self, speed):
for motor in range(1, 5):
self.set_velocity(motor, speed)
def set_rwheel_velocity(self, speed):
self.set_velocity(1, speed)
self.set_velocity(3, speed)
def set_lwheel_velocity(self, speed):
self.set_velocity(2, speed)
self.set_velocity(4, speed)
def set_velocity(self, motor, speed):
# TODO: speed in m/s
velocity = speed
low = velocity % 65536
hi = velocity / 65536
motor -= 1
reg_base = 35 + (motor*7)
_r1 = self.write_register(reg_base+2, hi)
_r2 = self.write_register(reg_base+3, low)
print _r1, _r2
def set_acceleration(self, motor, acceleration):
# TODO: acceleration in m/s^2
low = acceleration % 65536
hi = acceleration / 65536
motor -= 1
reg_base = 35 + (motor*7)
_r1 = self.write_register(reg_base+4, hi)
_r2 = self.write_register(reg_base+5, low)
print _r1, _r2
def get_velocity(self, motor, speed):
# TODO
pass
def get_acceleration(self, motor, speed):
# TODO
#.........这里部分代码省略.........
开发者ID:karmeluk,项目名称:base_controller,代码行数:103,代码来源:controller.py
示例6: device
# 需要导入模块: from pymodbus.client.sync import ModbusSerialClient [as 别名]
# 或者: from pymodbus.client.sync.ModbusSerialClient import write_register [as 别名]
#.........这里部分代码省略.........
def _disconnect(self):
"""
close the connection to the modbus slave (server)
"""
self.slave.close()
def request_data(self):
"""
"""
if not driver_ok:
return None
if not self._connect():
if self._device_not_accessible == -1: #
logger.error("device with id: %d is not accessible" % self.device.pk)
self._device_not_accessible -= 1
return []
output = []
for register_block in self._variable_config:
result = register_block.request_data(self.slave, self._unit_id)
if result is None:
self._disconnect()
self._connect()
result = register_block.request_data(self.slave, self._unit_id)
if result is not None:
for variable_id in register_block.variables:
if self.variables[variable_id].update_value(result[variable_id], time()):
recorded_data_element = self.variables[variable_id].create_recorded_data_element()
if recorded_data_element is not None:
output.append(recorded_data_element)
if self.variables[variable_id].accessible < 1:
logger.info("variable with id: %d is now accessible" % variable_id)
self.variables[variable_id].accessible = 1
else:
for variable_id in register_block.variables:
if self.variables[variable_id].accessible == -1:
logger.error("variable with id: %d is not accessible" % variable_id)
self.variables[variable_id].update_value(None, time())
self.variables[variable_id].accessible -= 1
# reset device not accessible status
if self._device_not_accessible |