python线程,进程,队列和缓存

您所在的位置:网站首页 python3进程间通队列原理 python线程,进程,队列和缓存

python线程,进程,队列和缓存

2024-07-10 06:15| 来源: 网络整理| 查看: 265

一、线程

 threading用于提供线程相关的操作,线程是应用程序中工作的最小单元。

创建线程的两种方式1.threading.Thread import threading def f1(arg): print(arg) t = threading.Thread(target=f1,args=(123,)) #t.start代表这个线程已经准备就绪,等待cpu的调度。 t.start() 2.自定义,继承threading.Thread class MyThread(threading.Thread): def __init__(self, func,args): self.func = func self.args = args super(MyThread, self).__init__() def run(self): self.func(self.args) def f2(arg): print(arg) t1 = MyThread(f2,1234) t1.start()

二、线程锁

当多个线程同时修改同一条数据时可能会出现脏数据,所以,出现了线程锁 - 同一时刻允许一个线程执行操作。

##没使用锁 import threading import time NUM = 10 def f1(): global NUM NUM -= 1 time.sleep(2) print(NUM) for i in range(10): t = threading.Thread(target=f1) t.start() ##使用锁 def f1(l): global NUM #上锁 l.acquire() NUM -= 1 time.sleep(2) print(NUM) #开锁 l.release() #创建锁,Rlock可以加多层锁 lock = threading.RLock() for i in range(10): t = threading.Thread(target=f1,args=(lock,)) t.start()

 三、线程池

#!/usr/bin/env python #-*- coding:utf-8 -*- import queue import threading import time class ThreadPool: def __init__(self,maxsize=5): self.maxsize = maxsize self._q = queue.Queue(maxsize) for i in range(maxsize): self._q.put(threading.Thread) def get_thread(self): return self._q.get() def add_thread(self): self._q.put(threading.Thread) pool = ThreadPool(5) #把用完的线程再放回容器中 def task(arg,p): print(arg) time.sleep(1) p.add_thread() for i in range(100): #t 是threading.Thread类 t = pool.get_thread() #创建线程对象 obj = t(target=task,args=(i,pool,)) obj.start() 进程

    进程之间的数据需要各自持有一份,所以创建进程需要的非常大的开销。并且python不能再Windows下创建进程!

并且在使用多进程的时候,最好是创建和CPU核数相等的进程

默认的进程之间相互是独立,如果想让进程之间数据共享,使用如下方法。

manager.dict()        #共享数据

from multiprocessing import Process from multiprocessing import Manager def foo(i,arg): arg[i] = i + 100 print(arg.values()) if __name__ == '__main__': obj = Manager() li = obj.dict() for i in range(10): p = Process(target=foo,args=(i,li,)) p.start() p.join()

进程池

  进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池序列中没有可供使用的进进程,那么程序就会等待,直到进程池中有可用进程为止。

进程池中有两个方法:apply和apply_async

#!/usr/bin/env python #-*- coding:utf-8 -*- from multiprocessing import Pool import time def f1(arg): time.sleep(1) print(arg) if __name__ == '__main__': pool = Pool(5) for i in range(20): # pool.apply(func=f1,args=(i,)) #一个一个执行 pool.apply_async(func=f1,args=(i,)) #并发 # pool.close() #所有的任务执行完毕(5个一起执行) #立即终止 time.sleep(1) pool.terminate() pool.join() 协程

线程和进程的操作是由程序触发系统接口,最后的执行者是系统;协程的操作则是程序员。

协程存在的意义:对于多线程应用,CPU通过切片的方式来切换线程间的执行,线程切换时需要耗时(保存状态,下次继续)。协程,则只使用一个线程,在一个线程中规定某个代码块执行顺序。

协程的适用场景:当程序中存在大量不需要CPU的操作时(IO),适用于协程;

#!/usr/bin/env python #-*- coding:utf-8 -*- from greenlet import greenlet def test1(): print(12) gr2.switch() print(34) gr2.switch() def test2(): print(56) gr1.switch() print(78) gr1 = greenlet(test1) gr2 = greenlet(test2) gr1.switch() 遇到IO操作自动切换 #!/usr/bin/env python #-*- coding:utf-8 -*- #应用场景 #####监控url(检测) from gevent import monkey; monkey.patch_all() import gevent import requests def f(url): print('GET: %s' % url) resp = requests.get(url) data = resp.text print('%d bytes received from %s.' % (len(data), url)) gevent.joinall([ gevent.spawn(f, 'https://www.python.org/'), gevent.spawn(f, 'https://www.yahoo.com/'), gevent.spawn(f, 'https://github.com/'), ])  memcache 简述: Memcache是一套分布式的高速缓存系统,目前被许多网站使用以提升网站的访问速度,尤其对于一些大型的、需要频繁访问数据库的网站访问速度提升效果十分显著。工作原理: MemCache的工作流程如下:先检查客户端的请求数据是否在memcached中,如有,直接把请求数据返回,不再对数据库进行任何操作;如果请求的数据不在memcached中,就去查数据库,把从数据库中获取的数据返回给客户端,同时把数据缓存一份到memcached中(memcached客户端不负责,需要程序明确实现);每次更新数据库的同时更新memcached中的数据,保证一致性;当分配给memcached内存空间用完之后,会使用LRU(Least Recently Used,最近最少使用)策略加上到期失效策略,失效数据首先被替换,然后再替换掉最近未使用的数据。 Memcache是一个高性能的分布式的内存对象缓存系统,通过在内存里维护一个统一的巨大的hash表,它能够用来存储各种格式的数据,包括图像、视频、文件以及数据库检索的结果等。简单的说就是将数据调用到内存中,然后从内存中读取,从而大大提高读取速度。 Memcached是以守护程序(监听)方式运行于一个或多个服务器中,随时会接收客户端的连接和操作memcahce的安装 yum install libevent-devel -y wget http://memcached.org/latest tar -zxvf memcached-1.x.x.tar.gz cd memcached-1.x.x ./configure && make && make test && sudo make install

启动memcache

[root@linux-node1 ~]# memcached -d -m 10 -u root -l 192.168.1.11 -p 11211 -c 256 -P /tmp/memcached.pid [root@linux-node1 ~]# netstat -antlp|grep 11211 tcp 0 0 192.168.1.11:11211 0.0.0.0:* LISTEN 2251/memcached ##### 参数说明 -d 是启动一个守护进程 -m 是分配给Memcache使用的内存数量,单位是MB -u 是运行Memcache的用户 -l 是监听的服务器IP地址 -p 是设置Memcache监听的端口,最好是1024以上的端口 -c 选项是最大运行的并发连接数,默认是1024,按照你服务器的负载量来设定 -P 是设置保存Memcache的pid文件

memcache命令

存储命令: set/add/replace/append/prepend/cas 获取命令: get/gets 其他命令: delete/stats..

python操作memcache

python操作Memcached使用Python-memcached模块 下载安装:https://pypi.python.org/pypi/python-memcached ############### import memcache mc = memcache.Client(['192.168.1.11:11211'], debug=True) mc.set("foo", "bar") ret = mc.get('foo') print ret

###add#!/usr/bin/env python# -*- coding:utf-8 -*-

 import memcache   mc = memcache.Client(['192.168.1.11:11211'], debug=True)  mc.add('k1', 'v1')  # mc.add('k1', 'v2') # 报错,对已经存在的key重复添加,失败!!!

支持集群

python-memcached模块原生支持集群操作,其原理是在内存维护一个主机列表,且集群中主机的权重值和主机在列表中重复出现的次数成正比。

队列

Python队列方法

·put      放数据

·get      取数据

·qsize     返回队列的大小

·maxsize   最大支持个数

·join     等待队列为空的时候,在执行别的操作

·empty     当队列为空的时候,返回True,否则返回False

·full      当队列为满的时候,返回True,否则返回False

######先进先出 #!/usr/bin/env python #-*- coding:utf-8 -*- import queue #maxsize 最大支持多少个排队数 q = queue.Queue(2) #put放数据,默认阻塞,block是否阻塞,timeout超时时间 #empty 检查队列是否为空 print(q.empty()) q.put(123) q.put(456) q.get() #get取数据,默认阻塞,block是否阻塞,timeout超时时间 #队列中的真实个数,qsize print(q.qsize()) #join,task_done阻塞进程,当队列中任务执行完成后,不再阻塞,task_done表示任务执行完成 #######其他三种队列 #!/usr/bin/env python #-*- coding:utf-8 -*- import queue #后进先出队列 q = queue.LifoQueue() q.put(123) q.put(456) print(q.get()) #优先级队列 #当优先级相同时,按放数据顺序取数据 q1 = queue.PriorityQueue() q.put((1,"alex1")) q.put((2,"alex2")) #双向队列 q2 = queue.deque() q2.append(123) q2.append(456) q2.appendleft(333) print(q2) 生产者消费者模型

   消费者      ========》    队列(缓冲区)     =========》 生产者

#!/usr/bin/env python #-*- coding:utf-8 -*- import threading import time import random import queue #队列模块 def Producer(name,que): #生产者 while True: que.put('包子') #相当于把包子放到仓库里 print('%s:做了一个包子' %name) #打印出做了一个包子出来 time.sleep(random.randrange(5)) #厨师5秒内做出一个包子 def Consumer(name,que): #消费者 while True: try: #异常处理,如果碰到没有包子可吃就等待厨师做包子 que.get_nowait() print('%s:吃了一个包子' %name) except Exception: print(u'没有包子了') time.sleep(random.randrange(3)) #消费者3秒内吃掉一个包子 q = queue.Queue() #队列 p1 = threading.Thread(target=Producer,args=['厨师1',q]) #目标是Producer这个函数,args是传参 p2 = threading.Thread(target=Producer,args=['厨师2',q]) p1.start() p2.start() c1 = threading.Thread(target=Consumer,args=['张三',q]) c2 = threading.Thread(target=Consumer,args=['李四',q]) c1.start() c2.start()

  

  

 



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3