Python 协程(Coroutine)

什么是协程?

knuth解释:协程就是多入多出的子程序,就是可以协同运行的程序。

通常在Python中我们进行并发编程一般都是使用多线程或者多进程来实现的。

对于计算型任务由于GIL的存在我们通常使用多进程来实现;而对于IO型任务我们可以通过线程调度来让线程在执行IO任务时让出GIL,从而实现表面上的并发。

  • Python3.4中加入了asyncio
  • Python3.5上提供了async/await语法层面的支持
  • Python3.6中asyncio已经由临时版改为了稳定版

对于IO型任务我们还有一种选择就是协程,协程是运行在单线程当中的“并发”,协程相比多线程的优势:

  • 省去了多线程之间的切换开销,获得了更大的运行效率。
  • 不需要多线程的锁机制,因为只有一个线程,也不存在同时写变量冲突,在协程中控制共享资源不加锁,只需要判断状态就好了。

两者从系统层面来说:

  • 协程是自己程序调度的,逻辑上并行执行,底层上非并行执行
  • 线程是操作系统调度的,逻辑和底层上都是并行执行

Python的协程是基于generator实现的,Python中的asyncio也是基于协程来进行实现的。

在generator中,我们不但可以通过for循环来迭代,还可以不断调用next()函数获取由yield语句返回的下一个值。但是Python的yield不但可以返回一个值,它还可以接收调用者发出的参数。

使用协程

Python中的协程大概经历了如下三个阶段:

  1. 最初的生成器变形yield/send
  2. 引入@asyncio.coroutine 和 yield from
  3. 在最近的Python3.5版本中引入async/await关键字

生产者与消费者

示例代码1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
def consumer():
r_msg = ''
while True:
n = yield r_msg
if not n:
# print('n is None or Empty') throw StopIteration
return
print('consumer() %d' % n)
r_msg = 'ok'

# n = yield value
# 包含了3个步骤:
# 1.向函数外抛出(返回)value
# 2.暂停(pause),等待next()或send()恢复
# 3.赋值 n=MockGetValue(),这个MockGetValue()是假想函数,
# 用来接收send()发送进来的值


def produce(cmer):
cmer.send(None) # if delete, throw TypeError: can't send non-None value to a just-started generator
n = 0
while n < 5:
n += 1
print('produce() producing %d' % n)
r_msg = cmer.send(n)
print('produce() receive msg from consumer: %s' % r_msg)
cmer.close()

# 在produce(),通过cmer.send(None)或者next(cmer)启动生成器函数,
# consumer()执行到第一个yield语句结束的位置。

# 此时,n = yield r_msg 仅执行了 1.向函数外抛出(返回)value
# 2.暂停(pause),等待next()或send()恢复
# 特别注意是,n = yield r_msg 还没有执行 3.赋值 n=MockGetValue()

# 当在produce()执行cmer.send(n)时候,传入后,在consumer()中
# n = yield r_msg 开始执行 3.赋值 n=MockGetValue()



c = consumer()
produce(c)

输出结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
produce() producing 1
consumer() 1
produce() receive msg from consumer: ok
produce() producing 2
consumer() 2
produce() receive msg from consumer: ok
produce() producing 3
consumer() 3
produce() receive msg from consumer: ok
produce() producing 4
consumer() 4
produce() receive msg from consumer: ok
produce() producing 5
consumer() 5
produce() receive msg from consumer: ok

实际执行过程如下:

  1. 在produce(),cmer.send(None)启动生成器
  2. 在produce(),通过cmer.send(n)切换到consumer()执行
  3. 在consumer(),通过n = yield r_msg拿到消息,处理。最后再通过yield r_msg把结果传回
  4. 在produce(),r_msg = cmer.send(n)拿到consumer处理的结果,继续生产下一条消息
  5. 在produce(),cmer.close()决定停止生产

yield from

示例代码1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
def gen_yield():
yield range(5)


def gen_yield_from():
yield from range(5)


g_yield = gen_yield()
g_yield_from = gen_yield_from()

for g in g_yield:
print(g)

print('====================')
for g in g_yield_from:
print(g)

输出结果

1
2
3
4
5
6
7
range(0, 5)
====================
0
1
2
3
4

两者的区别:yield直接将range这个generator返回,而yield from解析range,将每个item返回。

特别注意:yield from后面必须跟iterable对象(可以是生成器,迭代器)。

asyncio.coroutine

示例代码1 yield from

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
def fab(max):
n, a, b = 0, 0, 1
while n < max:
yield b
# print b
a, b = b, a + b
n = n + 1


def wrap_iterable(it):
yield from it


generator = fab(5)
for f in wrap_iterable(generator):
print(f)

输出结果

1
2
3
4
5
1
1
2
3
5

示例代码2 asyncio

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
@asyncio.coroutine
def smart_fib(n):
index = 0
a = 0
b = 1
while index < n:
sleep_secs = random.uniform(0, 0.2)
yield from asyncio.sleep(sleep_secs) # 通常yield from后都是接的耗时操作
print('Smart think {} secs to get {}'.format(sleep_secs, b))
a, b = b, a + b
index += 1

# yield from后面接的asyncio.sleep()是一个coroutine(里面也用了yield from),
# 所以线程不会等待asyncio.sleep(),而是直接中断并执行下一个消息循环。
# 当asyncio.sleep()返回时,线程就可以从yield from拿到返回值(此处是None),然后接着执行下一行语句。

# asyncio是一个基于事件循环的实现异步I/O的模块。
# 通过yield from,我们可以将协程asyncio.sleep的控制权交给事件循环,然后挂起当前协程,
# 之后由事件循环决定何时唤醒asyncio.sleep,接着向后执行代码。
# 协程之间的调度都是由事件循环决定。

# 特别注意,yield from asyncio.sleep(sleep_secs) 这里不能用time.sleep(1)因为time.sleep()返回的是None,
# 它不是iterable,否则抛出异常 TypeError: ‘NoneType’ object is not iterable


@asyncio.coroutine
def stupid_fib(n):
index = 0
a = 0
b = 1
while index < n:
sleep_secs = random.uniform(0, 0.4)
yield from asyncio.sleep(sleep_secs) # 通常yield from后都是接的耗时操作
print('Stupid think {} secs to get {}'.format(sleep_secs, b))
a, b = b, a + b
index += 1


loop = asyncio.get_event_loop()
tasks = [
smart_fib(5),
stupid_fib(5),
]
loop.run_until_complete(asyncio.wait(tasks))
print('All fib finished.')
loop.close()

输出结果

1
2
3
4
5
6
7
8
9
10
11
Stupid think 0.08516295175908538 secs to get 1
Smart think 0.1537956191924291 secs to get 1
Smart think 0.12929758554701115 secs to get 1
Stupid think 0.2892256222928606 secs to get 1
Smart think 0.17164449165156828 secs to get 2
Smart think 0.16220980840025945 secs to get 3
Stupid think 0.27210541712117337 secs to get 2
Smart think 0.1715009248020697 secs to get 5
Stupid think 0.2023469563932653 secs to get 3
Stupid think 0.3009223257835448 secs to get 5
All fib finished.

async 和 await

示例代码1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
async def smart_fib(n):
index = 0
a = 0
b = 1
while index < n:
sleep_secs = random.uniform(0, 0.2)
await asyncio.sleep(sleep_secs)
print('Smart think {} secs to get {}'.format(sleep_secs, b))
a, b = b, a + b
index += 1


async def stupid_fib(n):
index = 0
a = 0
b = 1
while index < n:
sleep_secs = random.uniform(0, 0.2)
await asyncio.sleep(sleep_secs)
print('Stupid think {} secs to get {}'.format(sleep_secs, b))
a, b = b, a + b
index += 1


loop = asyncio.get_event_loop()
tasks = [
smart_fib(5),
stupid_fib(5),
]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

输出结果

1
2
3
4
5
6
7
8
9
10
Smart think 0.00882750921810851 secs to get 1
Smart think 0.04897605243502872 secs to get 1
Stupid think 0.14133676443250204 secs to get 1
Smart think 0.09460081340195459 secs to get 2
Smart think 0.10392360738913782 secs to get 3
Stupid think 0.15655241126836938 secs to get 1
Stupid think 0.07286491653845838 secs to get 2
Smart think 0.18776425866569166 secs to get 5
Stupid think 0.1964418153612619 secs to get 3
Stupid think 0.010708925895951162 secs to get 5