赵走x博客
网站访问量:151887
首页
书籍
软件
工具
古诗词
搜索
登录
24、nonlocal
23、Python functools.wraps 深入理解
21、Image.rotate旋转90度图片被截取了
20、python的__get__方法看这一篇就足够了
19、结巴分词(自然语言处理之中文分词器)
18、virtualenv
17、Python string 去掉标点符号 最佳实践
16、python 字符串相似度
15、StringIO和BytesIO
14、基于Python __dict__与dir()的区别详解
13、 set()去重的底层原理
12、顺序表的原理与python中的list类型
11、psutil实现系统监控
10、NSQ
9、utf-8的中文是一个汉字占三个字节长度吗?
8、supervisor+gunicorn部署python web项目
7、socket编程
6、async
5、Python的共享经济
4、Python 内存分配时的小秘密
3、Python中的“特权种族”是什么?
2、装饰器
1、序列化与反序列化
6、async
资源编号:76035
Python
Python 查缺补漏
热度:91
文章开源:https://www.jianshu.com/p/7690edfe9ba5
之前使用Python的人往往纠缠在多线程,多进程,评判哪个效率更高? 其实,相对于别家的协程和异步,不管多线程还是多进程效率都要被吊打,多线程之间切换耗费cpu寄存器资源,OS 调度的不太可控,多进程间通信不便的问题。 后来Python改进了语法,引入了yiled from充当协程调度,后来有人根据这个新特性开发了第三方协程框架,Tornado,Gevent等。 在这场效率之争里,Python这么受欢迎的语言,官方怎么能默不出声?所以Python之父深入简出3年,苦心钻研自家的协程,async/await和asyncio库,并放到Python3.5后成为远程原生的协程, 对于类似爬虫这种延时的IO操作,协程是个大利器,优点很多,他可以在一个阻塞发生时,挂起当前程序,跑去执行其他程序,把事件注册到循环中,实现多程序并发,据说超越了10k限制,不过我没有试验过极限。 现在讲一讲协程的简单的用法,当你爬一个网站,有100个网页,正常是请求一次,回来一次 ``` for url in urls: response=get(url) results=parse(response) ``` 这样效率很低,但协程可以一次发起100个请求(其实也是一个一个发),不同的是协程不会死等返回,而是发一个请求,挂起,再发一个再挂起,发起100个,挂起100个,然后同时等待100个返回,效率提升了100倍。可以理解为同时做100件事,相对于多线程,做到了由自己调度而不是交给CPU,程序流程可控,节约资源,效率极大提升。 ``` async def get(url):#定义协程抓取函数,这里用了aiohttp库 async with aiohttp.ClientSession() as session:#协程上下文 async with session.get(url) as response: return await response.text() #await 是挂起命令,挂起当前,执行response.text(),response.text()执行完成后重新激活当前函数继续运行,返回。 ``` 如果response.text迟迟不回,程序不会死等,而是去你定义的任务循环中寻找另一个任务(如果有的话),如果没有循环任务,那就死等咯。。。毕竟总要有等返回结果的。 所以实现协程就是要实现多个任务的循环。用一张简单的图表示。  也就是说任务一直跑,每到一个地方awit一次,然后await返回,直到最终全部返回,主程序结束。 # 1、调用协程 协程不能直接运行,需要把协程加入到事件循环(loop)。asyncio.get_event_loop方法可以创建一个事件循环,然后使用run_until_complete将协程注册到事件循环,并启动事件循环。 ``` import datetime import asyncio now = lambda: datetime.datetime.now() async def do_some_work(x): print('Waiting', x) start = now() print('start:', start) coroutine = do_some_work(2) loop = asyncio.get_event_loop() loop.run_until_complete(coroutine) print('end:', now()) ``` 结果 ``` start: 2019-11-04 18:16:33.202797 Waiting 2 end: 2019-11-04 18:16:33.203145 ``` # 2、关于task 协程对象不能直接运行,在注册事件循环的时候,其实是run_until_complete方法将协程包装成为了一个任务(task)对象。所谓task对象是Future类的子类。保存了协程运行后的状态,用于未来获取协程的结果。 ``` import datetime import asyncio now = lambda: datetime.datetime.now() async def do_some_work(x): print('Waiting', x) start = now() print('start:', start) coroutine = do_some_work(2) loop = asyncio.get_event_loop() # loop.run_until_complete(coroutine) task=loop.create_task(coroutine) print('task:',task) loop.run_until_complete(task) print('task:',task) print('end:', now()) ``` 输出结果为: ``` start: 2019-11-04 18:19:39.801034 task:
> Waiting 2 task:
result=None> end: 2019-11-04 18:19:39.801846 ``` 创建task后,task在加入事件循环之前是pending状态,当loop事件开始循环,所有pending状态的task都开始执行到await那一步(函数体内并非如此),不管loop里是否调用 asyncio.ensure_future(coroutine) 和 loop.create_task(coroutine)都可以创建一个task,run_until_complete的参数是一个futrue对象。当传入一个协程,其内部会自动封装成task,task是Future的子类。isinstance(task, asyncio.Future)将会输出True。 就是这么个道理,协成里面也可以放yield 表达式,变成生成器式协程,可以使用 async for x in 生成式协程。比如 ``` async def mark(): print('mark') await asyncio.sleep(4) face=yield 666 print(face) ``` 要想取到mark函数弹出的666,需要用 ``` async def do(): async for i in mark(): print(i) if __name__ == '__main__': asyncio.run(do()) ``` 再举一个例子,典型的消费,生产者模型,说是买土豆,实际是拿字典,来源知乎,我做了修改。 ``` from time import sleep from random import random,randint import asyncio all_potatos={x:randint(1,50) for x in 'abcd'} async def take_potatos(num): count=0 while True: if len(all_potatos)==0: await ask_for_potato() else: potato=all_potatos.popitem() yield potato count+=1 if count==num: break async def buy_potatos(): bucket=[] async for p in take_potatos(20): bucket.append(p) print(bucket) async def ask_for_potato(): await asyncio.sleep(3) all_potatos.update({x:randint(1,20)for x in '临兵斗者皆阵列在前'}) def main(): loop=asyncio.get_event_loop() loop.run_until_complete(asyncio.wait([buy_potatos()])) loop.close() if __name__ == '__main__': main() ``` 在实际生产环境中,最常见的依然是生产-消费模型,用队列串联,在任务开始就初始化多个worker,用join方法挂起任务,让worker无限循环,get队列里的值。直到队列为空。以下这个例子。 ``` import time import asyncio from asyncio import Queue def now(): return time.time() async def worker(q):#工作者消费队列 print('Start worker') while 1:#无限循环 start = now() task = await q.get()#开始消费 if not task: await asyncio.sleep(1) continue print('working on ', int(task)) await asyncio.sleep(int(task)) q.task_done()#队列通知 print('Job Done for ', task, now() - start) async def generate_run(q):#生成worker线程函数 asyncio.ensure_future(worker(q)) asyncio.ensure_future(worker(q))#先弄了两个worker去跑 await q.join()主线程挂起等待队列完成通知 jobs = asyncio.Task.all_tasks()完成后收集所有线程,这里是3个,算上自己 print('是否已经关闭任务', asyncio.gather(*jobs).cancel())#关闭线程方法,返回True def main(): loop = asyncio.get_event_loop() q = Queue() for i in range(3): q.put_nowait(str(i))#一定要放入字符,数字0是空,队列一直不会结束。 loop.run_until_complete(generate_run(q))#启动生成函数 loop.close() if __name__ == '__main__': main() ```