import binascii
import struct
import cantools
from cantools.database import Message
from logutil import Logs
class DBC():
def __init__(self, dbc_path):
# 加载dbc文件
self.db =cantools.db.load_file(filename=dbc_path, database_format=None,
encoding="UTF-8",
frame_id_mask=None, strict=False, cache_dir=None)
self.__log = Logs()
def get_message_by_id(self, message_id: str) -> Message:
"""
根据报文id获取当前dbc的报文name
:param message_id: 报文id
:return: 报文
"""
message = self.db.get_message_by_frame_id(int(message_id, 16))
self.__log.debug(f"message信息:{message}")
return message
def get_message_by_name(self, message_name: str) -> Message:
"""
根据报文名称获取当前dbc的报文frame_id
:param message_name: 报文名称
:return: 报文
"""
message = self.db.get_message_by_name(message_name)
self.__log.debug(f"message信息:{message}")
return message
def get_single_value(self, select_data: dict, value_type_is_num: bool = False) -> [dict, None]:
"""
根据包含有message_id的字典,查询当前报文的信号表示的含义
:param select_data: 要查询的信息,包含message_id和data
:param value_type_is_num: 查询结果为数字还是值
:return: 报文详情
"""
# 在dbc中找到该信号供解析使用
select_message = self.db.get_message_by_frame_id(int(select_data['message_id'], 16))
message_key: bytes = b""
for i in select_data['data']:
message_key += struct.pack('B', i)
if value_type_is_num:
result = select_message.decode(message_key,
decode_choices=False, # 值为数字为 False 值为 True
scaling=True,
decode_containers=False)
else:
result = select_message.decode(message_key,
decode_choices=True, # 值为数字为 False 值为 True
scaling=True,
decode_containers=False)
self.__log.debug(f"报文{select_data['message_id']}值为:{result}")
return result
def get_single_array(self, name: str,single_data: dict = {}):
"""
根据报文名称和详情,返回字节形式的报文
:param name: 要查询的报文名称
:param single_data: 报文详情
:return: 报文详情
"""
# 查询报文对象
message = self.db.get_message_by_name(name)
# 将dbc中所有的键值对取出来
data_orig = {}
for i in range(len(message.signals)):
signal = message.signals[i]
data_orig[signal.name] = 0
# 更新该字典键值对的值
data_orig.update(single_data)
# 返回字节形式
msgdata = message.encode(data_orig, scaling=True, padding=False, strict=False)
data_final = self.single_format(str(binascii.b2a_hex(msgdata), 'utf-8'))
return data_final
def single_format(self, single: str) -> str:
"""
格式化信号
:param single: 信号
:return: 增加空格的信号
"""
single_length: int = len(single)
result: str = ""
for i in range(single_length):
if i > 0 and i%2 ==0 and i < single_length:
result += " "+ single[i]
else:
result += single[i]
return result
if __name__ == "__main__":
dbc = DBC(".\\演示用dbc.dbc")
print(dbc.get_single_array("message_name",{'sigle_name1': 1,'sigle_name2': 1}))
# 00 60 00 00 00 00 00 00
print(dbc.get_message_by_name("message_name"))
# message('message_name', 0x001, False, 8, None)
print(dbc.get_message_by_id("0x001"))
print(dbc.get_single_value({"message_id": "0x001",
"data": [00, 60, 00, 00, 00, 00, 00, 00]
}))
# {'sigle_name1': ' OFF ', 'sigle_name2': ' Invalid ', 'sigle_name3': ' OFF ', 'sigle_name4': 'Not Active'}
最近接受封装周立功接口的工作,在解析dbc中的CAN信号时,发现cantools相关的网络资源少,阅读源码的效率太低,而且只需要用到格式的转换。所以特此发稿。
|