import collections
import select
import types
class Task(object):
    """A running task: a coroutine plus the stack of coroutines that called it."""

    def __init__(self, target):
        """Wrap *target*, the generator that ``run`` will drive with ``send``."""
        self.target = target   # generator currently being driven
        self.sendval = None    # value passed to the next ``send`` call
        self.stack = []        # suspended callers of ``self.target``

    def run(self):
        """Advance the current coroutine by one step.

        Returns:
            The yielded object if it is a ``SystemCall``; otherwise ``None``.

        Raises:
            StopIteration: the current coroutine finished and there is no
                suspended caller left on the stack.
        """
        try:
            result = self.target.send(self.sendval)
        except StopIteration:
            if not self.stack:
                # Outermost coroutine is done: let the caller see it.
                raise
            # A sub-coroutine ended without yielding a value: resume its
            # caller with nothing to send.
            self.sendval = None
            self.target = self.stack.pop()
            return None

        if isinstance(result, SystemCall):
            return result

        if isinstance(result, types.GeneratorType):
            # A yielded generator is a sub-coroutine call: suspend the
            # current coroutine and start driving the new one.
            self.stack.append(self.target)
            self.sendval = None
            self.target = result
        elif self.stack:
            # Any other value yielded by a sub-coroutine is handed back to
            # the suspended caller as the result of the call.
            self.sendval = result
            self.target = self.stack.pop()
        return None
class SystemCall(object):
    """Base class for "system call" objects that a task yields to the scheduler."""

    def handle(self, sched, task):
        """Carry out the call for *task* on scheduler *sched*.

        The base implementation does nothing and returns ``None``.
        """
        pass
class ReadWait(SystemCall):
    """System call that registers the yielding task as waiting to read from a file."""

    def __init__(self, f):
        """Remember *f*, an object that provides a ``fileno()`` method."""
        self.f = f

    def handle(self, sched, task):
        """Register *task* with ``sched.readwait`` under the descriptor of ``self.f``."""
        fileno = self.f.fileno()
        sched.readwait(task, fileno)
class WriteWait(SystemCall):
    """System call that registers the yielding task as waiting to write to a file."""

    def __init__(self, f):
        """Remember *f*, an object that provides a ``fileno()`` method."""
        self.f = f

    def handle(self, sched, task):
        """Register *task* with ``sched.writewait`` under the descriptor of ``self.f``."""
        fileno = self.f.fileno()
        sched.writewait(task, fileno)
class NewTask(SystemCall):
    """System call that asks the scheduler to start a new task."""

    def __init__(self, target):
        """Remember *target*, the coroutine the new task will run."""
        self.target = target

    def handle(self, sched, task):
        """Create a task for ``self.target`` and put the calling *task* back on the queue.

        The second parameter is named ``task`` (matching ``SystemCall.handle``)
        so that the rescheduling call below refers to the task that issued
        this system call.
        """
        sched.new(self.target)
        # The caller does not block on this call; it is runnable again at once.
        sched.schedule(task)
class Scheduler(object):
    """Scheduler that runs tasks from a FIFO queue and parks tasks waiting on I/O."""

    def __init__(self):
        self.task_queue = collections.deque()  # tasks ready to run
        self.read_waiting = {}                 # fd -> task waiting to read
        self.write_waiting = {}                # fd -> task waiting to write
        self.numtasks = 0                      # tasks created and not yet finished

    def schedule(self, task):
        """Append *task* to the end of the ready queue."""
        self.task_queue.append(task)

    def new(self, target):
        """Create a ``Task`` from the coroutine *target*, queue it and count it."""
        newtask = Task(target)
        self.schedule(newtask)
        self.numtasks += 1

    def readwait(self, task, fd):
        """Park *task* until *fd* is reported readable.

        A later registration for the same *fd* replaces the earlier one.
        """
        self.read_waiting[fd] = task

    def writewait(self, task, fd):
        """Park *task* until *fd* is reported writable.

        A later registration for the same *fd* replaces the earlier one.
        """
        self.write_waiting[fd] = task

    def mainloop(self, count=-1, timeout=None):
        """Run tasks until none remain or the pass budget is used up.

        Args:
            count: when positive, it is decremented after every pass of the
                loop and the method returns once it reaches 0. Passing 0
                returns after the first pass. The default of -1 never
                reaches 0, so the loop runs until ``numtasks`` drops to 0.
            timeout: passed to ``select.select`` when no task is ready to
                run; when tasks are ready, ``select`` is polled with 0.
        """
        while self.numtasks:
            # Move tasks whose descriptors became ready back to the queue.
            if self.read_waiting or self.write_waiting:
                wait = 0 if self.task_queue else timeout
                readable, writable, _ = select.select(
                    self.read_waiting, self.write_waiting, [], wait)
                for fileno in readable:
                    self.schedule(self.read_waiting.pop(fileno))
                for fileno in writable:
                    self.schedule(self.write_waiting.pop(fileno))

            # Run every queued task until the queue is drained.
            while self.task_queue:
                task = self.task_queue.popleft()
                try:
                    result = task.run()
                    if isinstance(result, SystemCall):
                        # The system call decides whether and when the task
                        # is put back on the queue.
                        result.handle(self, task)
                    else:
                        self.schedule(task)
                except StopIteration:
                    # The task's outermost coroutine has finished.
                    self.numtasks -= 1

            # NOTE(review): the original nesting of this check was lost with
            # the file's indentation; it is applied once per outer pass here.
            # Confirm against the intended behaviour.
            if count > 0:
                count -= 1
            if count == 0:
                return