python实现Pyqt5信号槽在多线程中的应用(类相互调用) tcy |
您所在的位置:网站首页 › pyqt5多线程处理数据 › python实现Pyqt5信号槽在多线程中的应用(类相互调用) tcy |
# This article first shows the ordinary use of signals in PyQt, then signal
# handling between threads in PyQt, with two classes receiving each other's
# signals. After that it implements a PyQt-like signal/slot mechanism in plain
# Python. You can send data (signals) of any type; the only difference between
# MySignal and PyQt5 is that no type has to be declared when the signal is
# created, which makes it more flexible. Disconnecting a signal is rarely
# needed, so it is not implemented. The last part is an example of passing
# signals between two threads. It involves classes that call each other: one
# class creates an instance of the other, and the other class is handed an
# instance of the first.

# ---------------------------------------------------------------------------
# 1. Standard usage of PyQt5 signals and slots
# ---------------------------------------------------------------------------
#!/usr/bin/env python3
# -*- coding: utf-8-*-
from PyQt5.QtCore import pyqtSignal, QObject


class QTypeSignal(QObject):
    """Sender object that owns a single-argument signal."""

    # Signal declared with one ``object`` argument, so any Python value fits.
    x_single = pyqtSignal(object)

    def __init__(self):
        # The original author notes that initialising through super() failed
        # for them, so the base initialiser is called explicitly.
        QObject.__init__(self)

    def run(self):
        """Emit ``x_single`` with a sample list payload."""
        self.x_single.emit(['Tom', 22])


class QTypeSlot(object):
    """Receiver whose ``get`` method is used as the slot."""

    def get(self, msg):
        """Print the payload delivered by the signal."""
        print('msg=', msg)


if __name__ == "__main__":
    send = QTypeSignal()
    slot = QTypeSlot()
    send.x_single.connect(slot.get)  # connect the signal to the slot
    send.run()

# Output:
# msg= ['Tom', 22]

# ---------------------------------------------------------------------------
# 2. PyQt signals used across threads
# ---------------------------------------------------------------------------
#!/usr/bin/env python3
# -*- coding: utf-8-*-
from PyQt5.QtCore import pyqtSignal, QObject, QThread, QCoreApplication
from datetime import datetime
import sys

# Hours, minutes, seconds. The earlier '%H:%S:%M' printed the seconds in the
# minutes position.
TIME_FORMAT = '%Y-%m-%d %H:%M:%S'


class A(QThread):
    """Thread that emits ``a_signal`` in a loop and prints messages from B.

    Each A creates its own B instance and connects that instance's
    ``b_signal`` to ``A.get``.
    """

    a_signal = pyqtSignal(object)

    def __init__(self):
        super().__init__()
        self.b = B()
        self.b.b_signal.connect(self.get)

    def get(self, msg):
        """Slot for ``B.b_signal``: print the received message."""
        print('1.A get B msg=', msg)

    def run(self):
        """Start the owned B thread, then emit a timestamp dict forever."""
        self.b.start()
        while True:
            self.a_signal.emit({'A_Time': datetime.now().strftime(TIME_FORMAT)})
            # QThread.sleep() takes whole seconds; msleep() expresses the
            # intended 1.5 s pause exactly.
            self.msleep(1500)


class B(QThread):
    """Thread that emits ``b_signal`` in a loop and prints messages from A.

    Args:
        a_instance: optional A instance; when given, its ``a_signal`` is
            connected to ``B.get`` and the instance is started from ``run``.
    """

    b_signal = pyqtSignal(object)

    def __init__(self, a_instance=None):
        super().__init__()
        self.a = a_instance
        if self.a:
            self.a.a_signal.connect(self.get)

    def get(self, msg):
        """Slot for ``A.a_signal``: print the received message."""
        print('2.B get A msg=', msg)

    def run(self):
        """Start the attached A thread (if any), then emit timestamps forever."""
        if self.a:
            self.a.start()
        while True:
            self.b_signal.emit({'B_Time': datetime.now().strftime(TIME_FORMAT)})
            # See A.run: msleep() gives the intended 1.3 s pause.
            self.msleep(1300)


if __name__ == "__main__":
    app = QCoreApplication(sys.argv)
    a = A()
    b = B(a)
    a.start()
    b.start()
    sys.exit(app.exec())

# Sample output recorded by the author with the earlier '%H:%S:%M' format, so
# the last two time fields appear swapped:
# 2.B get A msg= {'A_Time': '2020-02-27 22:16:10'}
# 1.A get B msg= {'B_Time': '2020-02-27 22:16:10'}
# 2.B get A msg= {'A_Time': '2020-02-27 22:17:10'}
# 1.A get B msg= {'B_Time': '2020-02-27 22:17:10'}
# 2.B get A msg= {'A_Time': '2020-02-27 22:18:10'}
# ...
"""3.python自定义信号槽-模拟pyqt的信号槽 # from multiprocessing import Queue from threading import Lock import queue,time,threading,typing from datetime import datetime # a=queue.Queue() # b=Queue() class MySignal(object): def __init__(self): self.collection = dict() self._d={str(id(self)):None} self.lock = Lock() def connect(self, fun): if not isinstance(fun,typing.Callable): raise TypeError('Parameter "fun" not callable.') key=str(id(self)) self._d[key]=fun def emit(self, *args, **kwargs): self.lock.acquire() key=str(id(self)) fun=self._d[key] if fun: fun(*args,**kwargs) self.lock.release() class A(threading.Thread): a_signal = MySignal() def __init__(self): super(A, self).__init__() self.b=B(None) self.b.b_signal.connect(self.get) def run(self): while True: self.a_signal.emit('a_signal time=%s'%datetime.now().strftime('%Y-%m-%d %H:%S:%M')) time.sleep(1) def get(self,msg): print('1.A get B msg=',msg) class B(threading.Thread): b_signal = MySignal() def __init__(self,A_Instance): super().__init__() self.a=A_Instance if self.a: self.a.a_signal.connect(self.get) def get(self, msg):#槽对象里的槽函数 print( '2.B get A msg=', msg) def run(self): while True: self.b_signal.emit('b_signal time=%s'%datetime.now().strftime('%Y-%m-%d %H:%S:%M')) time.sleep(1) if __name__ == "__main__": a = A() b = B(a) a.start() b.start() """ 结果: 2.B get A msg= a_signal time=2020-02-27 22:01:39 1.A get B msg= b_signal time=2020-02-27 22:01:39 2.B get A msg= a_signal time=2020-02-27 22:02:39 1.A get B msg= b_signal time=2020-02-27 22:02:39 ... 
# ---------------------------------------------------------------------------
# 2020/3/7 23:19 update: fixes the limitation that several slots could not be
# bound to one signal at the same time.
# ---------------------------------------------------------------------------
#!/usr/bin/env python3
# -*- coding: utf-8-*-
# write by tcy shangshai songjiang 2020/2/24--2020/3/1
# Ver:0.11
import typing
import pymysql
import threading
from DBUtils.PooledDB import PooledDB
from database.Config import Config

Config.pool_config['creator'] = pymysql
Config.pool_config['database'] = 'new_futures'


class Signal(object):
    """Signal that forwards ``emit`` arguments to every connected slot.

    Slots are stored in a set, so connecting the same callable twice has no
    extra effect and the call order is unspecified. Disconnecting is not
    implemented.
    """

    def __init__(self):
        self._slots = set()
        # Held for the whole of emit(), so slot calls made through this
        # signal are serialised.
        self.lock = threading.Lock()

    def connect(self, fun):
        """Add ``fun`` to the connected slots.

        Raises:
            TypeError: if ``fun`` is not callable.
        """
        if not callable(fun):
            raise TypeError('Parameter "fun" not callable.')
        self._slots.add(fun)

    def emit(self, *args, **kwargs):
        """Call every connected slot with the given arguments.

        Exceptions raised by a slot propagate to the caller. The lock is
        released in that case too, so later emits are not blocked.
        """
        with self.lock:
            # Iterate over a snapshot so a connect() made while slots are
            # running cannot invalidate the iteration.
            for slot in tuple(self._slots):
                slot(*args, **kwargs)


if __name__ == "__main__":
    from threading import Lock
    import time
    import threading
    from datetime import datetime

    # Hours, minutes, seconds. The earlier '%H:%S:%M' printed the seconds in
    # the minutes position.
    TIME_FORMAT = '%Y-%m-%d %H:%M:%S'

    class A(threading.Thread):
        """Thread that emits ``a_signal`` once per second."""

        # Class attributes: shared by every A instance.
        a_signal = Signal()
        con_signal = Signal()

        def __init__(self):
            super(A, self).__init__()
            self.b = B(None)
            self.b.b_signal.connect(self.get)
            self.a_signal.connect(self.get_self)

        def run(self):
            """Emit a timestamp string forever, pausing one second each time."""
            while True:
                self.a_signal.emit(
                    'a_signal time=%s' % datetime.now().strftime(TIME_FORMAT))
                time.sleep(1)

        def get(self, msg):
            """Slot for ``B.b_signal``: print the received message."""
            print('1.A get B msg=', msg)

        def get_self(self, msg):
            """Slot for A's own ``a_signal``: print the received message."""
            print('1.A get A msg=', msg)

    class B(threading.Thread):
        """Thread that emits ``b_signal`` once per second.

        Args:
            A_Instance: an A instance whose ``a_signal`` and ``con_signal``
                should be connected to this object, or ``None`` to skip.
        """

        b_signal = Signal()

        def __init__(self, A_Instance):
            super().__init__()
            self.a = A_Instance
            if self.a:
                self.a.a_signal.connect(self.get)
                self.a.con_signal.connect(self.getcon)

        def get(self, msg):
            """Slot for ``A.a_signal``: print the received message."""
            print('2.B get A msg=', msg)

        def run(self):
            """Emit a timestamp string forever, pausing one second each time."""
            while True:
                self.b_signal.emit(
                    'b_signal time=%s' % datetime.now().strftime(TIME_FORMAT))
                time.sleep(1)

        def getcon(self, msg):
            """Slot for ``A.con_signal``: list databases on the received connection.

            ``msg`` is used as a connection object. The cursor and the
            connection are closed even if the query fails.
            """
            con = msg
            try:
                cursor = con.cursor(pymysql.cursors.DictCursor)
                try:
                    cursor.execute('show databases')
                    print(cursor.fetchall())
                finally:
                    cursor.close()
            finally:
                con.close()

    class C(threading.Thread):
        """Thread that emits ``c_signal`` once per second to its own slot."""

        c_signal = Signal()

        def __init__(self):
            super(C, self).__init__()
            self.c_signal.connect(self.get)

        def run(self):
            """Emit a timestamp string forever, pausing one second each time."""
            while True:
                self.c_signal.emit(
                    'c_signal time=%s' % datetime.now().strftime(TIME_FORMAT))
                time.sleep(1)

        def get(self, msg):
            """Slot for C's own ``c_signal``: print the received message."""
            print('3.C get C msg=', msg)

    a = A()
    b = B(a)
    a.start()
    b.start()
    c = C()
    c.start()

# Reference: https://zhuanlan.zhihu.com/p/26569895
今日新闻 |
推荐新闻 |
CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3 |