目录:


1 协程

协程 是 Python 的语言特性中一个很重要的部分,学习要抓住重点,因此,很有必要了解一下。

了解协程之前,先熟悉一下生成器的概念。


yield 语句

使用 yield 语句,可以定义生成器对象,让函数生成一个结果序列,以便在迭代中使用。

1
2
3
4
def countdown(n):
while n > 0:
yield n
n -= 1

生成器

任何使用 yield 的关键字函数都称为 生成器 ,调用生成器将创建一个对象,并通过调用 next() 方法生成序列。

1
2
3
4
5
>>> c = countdoen(5)
>>> c.next()
>>> 5
>>> c.next
>>> 4

next() 调用生成器函数一直运行到下一条 yield 语句为止,此时 next() 将返回值传递给 yield ,函数终止执行。

通常使用 for 循环的方式生成值:

1
2
3
4
5
>>> for i in countdown(5):
print i
5 4 3 2 1
>>>

异常:

  • StopIteration 异常
  • GeneratorExit 异常

决不能通过单独的线程或信号处理程序在该生成器上异步地调用 close() 方法。

通过调用 close() 方法关闭生成器对象。

1
>>> c.close()

协程

通常,函数运行时要输入一组参数,但是也可以把函数编写为一个任务,从而能处理发送给它的一系列输入,这类函数称为 协程 。可使用 (yield) 的形式创建协程。如下所示:

1
2
3
4
5
def print_matches(matchtext):
while True:
line = (yield)
if matchtext in line:
print line

调用:

1
2
3
4
5
6
>>> matcher = print_matches("python")
>>> matcher.next()
>>> matcher.send("Hello World")
>>> matcher.send("Python is cool")
python is cool
>>> matcher.close()

其中 matcher.next() 用来执行到 (yield) 之前,协程终止。使用 matcher.send() 为协程发送值,(yield) 表达式返回这个值,并执行下面的语句,直至再次遇到 (yield) 语句。

协程的运行一般是无期限的,除非它被显式的关闭或自己退出。使用 close() 方法可以关闭输入值的流。

协程可用于实现某种形式的并发,例如,安排一个集中式的任务管理器或时间循环,将数据发送到成百上千个用于执行任务的协程中。协程经常可以和使用消息队列的程序一起使用。


基于任务调度程序

select 模块可以用来实现基于小任务(greenlet)或协程的服务器,下面是一个使用协程实现的基于 I/O 的任务调度程序,其中借助了 select 模块进行循环获取 I/O。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
import collections
import select
import types
class Task(object):
"""
表示运行任务的对象
"""
def __init(self, target):
self.target = target
self.sendval = None
self.stack = []
def run(self):
try:
result = self.target.send(self.sendval)
if isinstance(result, SystemCall):
return result
if isinstance(result, types.GeneratorType):
self.stack.append(self.target)
self.sendval = None
self.target = result
else:
if not self.stack:
return
self.sendval = result
self.target = self.stack.pop()
except StopIteration:
if not self.stack:
raise
self.sendval = None
self.target = self.stack.pop()
class SystemCall(object):
"""
表示“系统调用”的对象
"""
def handle(self, sched, task):
pass
class ReadWait(SystemCall):
def __init__(self, f):
self.f = f
def handle(self, sched, task):
fileno = self.f.fileno()
sched.readwait(task, fileno)
class WriteWait(SystemCall):
def __init__(self, f):
self.f = f
def handle(self, sched, task):
fileno = self.f.fileno()
sched.writewait(task, fileno)
class NewTask(SystemCall):
def __init__(self, target):
self.target = target
def handle(self, sched, target):
sched.new(self.target)
sched.schedule(task)
class Scheduler(object):
"""
调度程序对象
"""
def __init__(self):
self.task_queue = collections.deque()
self.read_waiting = {}
self.write_waiting = {}
self.numtasks = 0
def schedule(self, task):
"""
将任务放入任务调度队列
"""
self.task_queue.append(task)
def new(self, target):
"""
通过协程新建任务
"""
newtask = Task(target)
self.schedule(newtask)
self.numtasks += 1
def readwait(self, task, fd):
"""
让任务等待文件描述符上的数据
"""
self.read_waiting[fd] = task
def writewait(self, task, fd):
"""
让任务等待写入文件描述符
"""
self.write_waiting[fd] = task
def mainloop(self, count=-1, timeout=None):
"""
调度程序主循环
"""
while self.numtasks:
if self.read_waiting or self.write_waiting:
wait = 0 if self.task_queue else timeout
r, w, e = select.select(self.read_waiting, self.write_waiting, [],
wait)
for fileno in r:
self.schedule(self.read_waiting.pop(fileno))
for fileno in w:
self.schedule(self.write_waiting.pop(fileno))
while self.task_queue:
task = self.task_queue.popleft()
try:
result = task.run()
if isinstance(result, SystemCall):
result.handle(self, task)
else:
self.schedule(task)
except StopIteration:
self.numtasks -= 1
if count > 0:
count -= 1
if count == 0:
return

使用这种 I/O 任务调度程序实现的网络时间服务器示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
from socket import socket, AF_INET, SOCK_STREAM
def time_server(address):
import time
s = socket(AF_INET, SOCK_STREAM)
s.bind(address)
s.listen(5)
while True:
yield ReadWait(s)
conn, addr = s.accept()
print "Connection from %s" % str(addr)
yield WriteWait(conn)
resp = time.ctime() + "\r\n"
conn.send(resp.encode('latin-1'))
conn.close()
sched = Scheduler()
sched.new(time_server(('', 10000))) # 10000端口上的服务器
sched.new(time_server(('', 11000))) # 11000端口上的服务器
sched.run()