2013年Tornado3.0版本。gen.coroutine上线,直到今天它依旧是目前Tornado中使用最为广泛的装饰器。同时它也是接替gen.engine的存在。它基本去掉了gen.Task的套路。相对gen.engine而言。它只需要gen.coroutine就够了,写法上更为美观
 
目标 需要实现的目标是这样的
1 2 3 4 5 6 7 8 class  GenAsyncHandler (RequestHandler ):    @asynchronous     @gen.coroutine     def  get (self ):         http_client = AsyncHTTPClient()         response = yield  http_client.fetch("http://example.com" )         do_something_with_response(response)         self.render("template.html" ) 
 
在这里,它并不是gen.engine时代的作风。在上一篇中可以看到,添加gen.engine的时候没有改动过任何已有代码。可是在gen.coroutine时代。它去掉了gen.Task。于此同时,改动的是http_client.fetch。
Future Task这个名词不够炫酷,然后它新增了一个类叫做Future。未来这个词看起来就很有科技感。是的,它表述的是,在未来某一刻,这个对象会得到结果,当然这背后的一切仍然是换汤不换药由回调操作来完成。看一下Future都有啥
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 class  Future :    def  __init__ (self ):         self._val = None          self._callback = set ()     def  set_result (self, value ):         self._val = value         for  callback in  self._callback:             callback(self)     def  result (self ):         return  self._val     def  add_done_callback (self, callback ):         self._callback.add(callback) 
 
一个Future类可以如此的简单。它只是提供了一个_val的存储场所,另外当使用set_result设置值的时候会顺便将加入到该对象的回调进行调用。提供了添加回调的函数add_done_callback
回忆gen.Task,它做的事情是对包含callback参数的函数A进行包装,当函数A的callback被调用的时候,实际上调用的是Runner.result_callback。这种做法的缺点呢就是在RequestHandler.get里面需要出现gen.Task。现在我们不希望它出现。我们完全可以改变这一逻辑。因为http_client.fetch肯定是会调用callback的。可以想象它最终的语句是callback(result)。既然框架希望让用户不必输入gen.Task,那么可以选择在http_client.fetch中加入Future,设置它的callback为Future.set_result  。随便举例如下。假设有一个函数是add_one,原本逻辑如下
1 2 3 def  add_one (ret, callback ):    ret += 1      IOLoop.instance().add_callback(lambda : callback(ret)) 
 
改写成这个样子
1 2 3 4 5 6 7 def  add_one (ret ):    future = Future()     ret += 1      def  callback ():         return  future.set_result(ret)     IOLoop.instance().add_callback(callback)     return  future 
 
虽然这个地方最终被future.set_result执行。可是Future提供了add_done_callback接口,意味着允许我们自己的callback能够在add_one执行完毕后被future.set_result所触发  ,同时注意add_one返回的是一个future对象
coroutine 假如我们类比gen.engine去实现它
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 class  Runner ():    def  __init__ (self, g ):         self.g = g                  self.yielded = self.g.send(None )         self.yielded.add_done_callback(self.future_callback)     def  future_callback (self, future ):         return  self.result_callback(future.result())     def  result_callback (self, value ):         self.result = value         self.run()     def  run (self ):         try :             self.yielded = self.g.send(self.yielded.result())             self.yielded.add_done_callback(self.future_callback)         except  StopIteration:             return  def  coroutine (func ):    @wraps(func )     def  inner (*args, **kwargs ):         result = func(*args, **kwargs)         return  Runner(result)     return  inner 
 
是不是几乎一模一样,还可以正常运行。但是这里有一个极大的缺陷。它无法被嵌套使用,什么意思呢
1 2 3 4 5 6 7 8 from  tornado.gen import  coroutine@coroutine def  a ():    yield  b() @coroutine def  b ():    return  1  
 
同时在gen.engine时代, 这种调用方式也是不被允许的,可是gen.coroutine实现了这种方式的调用。主要原理是我们规定,所有yield右边的值全部是Future对象,对一个Future对象处理完毕后再处理下一个。那么就要修改coroutine了
1 2 3 4 5 6 7 8 def  coroutine (func ):    def  inner ():         f = Future()         def  final_callback (value ):             f.set_result(value)         Runner(func, final_callback)         return  f     return  inner 
 
对于每一个子coroutine,它们都会生成一个Runner对象(此时Runner已经将生成器进行初始化,执行send(None),并add_done_callback),只是它们返回的并不是Runner,而是Future.在Future被执行set_result操作的时候子coroutine的yield往下走。直到遇到StopIteration异常,此时final_callback函数被调用,它被父coroutine所接受,触发父coroutine的yield往下走。。。。堪称完美。。。。
就这样,它实现了和yield from差不多的逻辑。父生成器调用子生成器,简直六的飞起不服不行
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 class  Runner ():    def  __init__ (self, gen, final_callback ):         self.final_callback = final_callback         self.gen = gen()         self.yielded = self.gen.send(None )                  self.yielded.add_done_callback(self.future_callback)     def  future_callback (self, future ):         self.run()     def  run (self ):         try :             self.yielded = self.gen.send(self.yielded.result())         except  StopIteration as  e:             self.final_callback(e.value)             return          else :             self.yielded.add_done_callback(self.future_callback) 
 
说实话,还是感觉有点绕,希望你能够看明白
改进 和上面的简易写法不同。Tornado源码的实现要复杂一些,因为它要考虑更复杂的需求
兼容性,它不能因为有了Future对象后就完全不顾以前gen.Task的写法 
它同样需要实现一次yield多个Future的需求 
 
所以,它另外生成了一个和gen.Task类似的对象YieldFuture。和gen.Task拥有的成员对象一样start、is_ready、get_result。同时由于Future在Tornado中应用是如此的普遍。IOLoop新增了一个方法add_future函数(还有一方面就是前面提到的callback异常问题)
1 2 3 4 5 def  add_future (self, future, callback ):    assert  isinstance (future, Future)     callback = stack_context.wrap(callback)     future.add_done_callback(         lambda  future: self.add_callback(callback, future)) 
 
在使用的时候
1 2 3 4 self.runner = runner self.key = object() runner.register_callback(self.key) self.io_loop.add_future(self.future, runner.result_callback(self.key)) 
 
实际上如果不考虑异常情况,和它是等价的
1 Future.add_done_callback(self.future_callback) 
 
和gen.engine一样。coroutine也会遇到这种问题
1 2 3 4 5 6 7 8 9 @gen.coroutine def  get (self ):    http_client = AsyncHTTPClient()     response1, response2 = yield  [http_client.fetch(url1),                                   http_client.fetch(url2)]     response_dict = yield  dict (response3=http_client.fetch(url3),                                response4=http_client.fetch(url4))     response3 = response_dict['response3' ]     response4 = response_dict['response4' ] 
 
一次yield 多个Future对象。那么解决办法也是和engine差不多的。当发现send(value)返回值是一个list或者dict对象时。它会使用Multi进行封装。在被回调的时候检查Multi对象的is_ready状态。仅仅当都得到结果才算完成
另外在Tornado3.1版本HandlerRequest._execute进行改动。被coroutine装饰的函数不需要再被asynchronous所装饰。至此这个从1.0.0跨越到3.1.0版本的装饰器的生命走到了尽头。然而依旧很多人不管不顾硬是要加上这个装饰器才安心
应用一 可以说自此之后Tornado只存在Future配合yield、coroutine的操作。你从头新建一个Tornado的异步库。用户调用最后必定是返回给用户Future对象。这里我印象比较深的就是用Future实现了celery结果的异步获取。说是异步。实质上就是轮询。代码在这里 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 from  tornado.concurrent import  TracebackFuturefrom  tornado.ioloop import  IOLoopdef  async (task, *args, **kwargs ):    future = TracebackFuture()     callback = kwargs.pop("callback" , None )     if  callback:         IOLoop.instance().add_future(future,                                      lambda  future: callback(future.result()))     result = task.delay(*args, **kwargs)     IOLoop.instance().add_callback(_on_result, result, future)     return  future def  _on_result (result, future ):         if  result.ready():         future.set_result(result.result)     else :         IOLoop.instance().add_callback(_on_result, result, future) 
 
celery执行task.delay立刻返回了result对象。然后将resule和future加入回调,查询resule.ready确定任务是否完成。一旦完成则调用future.set_resule。Future对象设置值之后yield就会继续往下走。完美~~~,否则它会再次循环,直到得到结果为止。可以看到它这种方式还是相当的粗暴的,因为一旦没有结果就会不停的循环。可是这种方式胜在代码简单
应用二 再来一个简单的例子,gen.sleep() 比较没有想到,它会是4.1.0版本才加入的,实现如下
1 2 3 4 def  sleep (duration ):    f = Future()     IOLoop.current().call_later(duration, lambda : f.set_result(None ))     return  f 
 
(⊙v⊙)嗯,意不意外。这应该是我在Tornado里面发现最简单的代码了,创建一个Future对象,然后在IOLoop的_timeout列表中加入一个到期执行回调,设置Future的值。至此yield继续往下走~~
简易代码 可以用下面这段简易代码观察一下coroutine的逻辑
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 import  tornado.ioloopfrom  functools import  partialclass  Future :    def  __init__ (self ):         self._val = None          self._callback = set ()     def  set_result (self, value ):         self._val = value         for  callback in  self._callback:             callback(self)     def  result (self ):         return  self._val     def  add_done_callback (self, callback ):         self._callback.add(callback) class  Runner ():    def  __init__ (self, gen, final_callback ):         self.final_callback = final_callback         self.gen = gen()         self.yielded = self.gen.send(None )                  self.yielded.add_done_callback(self.future_callback)     def  future_callback (self, future ):         self.run()     def  run (self ):         try :             self.yielded = self.gen.send(self.yielded.result())         except  StopIteration as  e:             self.final_callback(e.value)             return          else :             self.yielded.add_done_callback(self.future_callback) def  coroutine (func ):    def  inner ():         f = Future()         def  final_callback (value ):             f.set_result(value)         Runner(func, final_callback)         return  f     return  inner @coroutine def  haha ():    c = yield  haha1()     r = yield  add_one(1 )     b = yield  add_one(2 )     print(r, b, c) @coroutine def  haha1 ():    r = yield  add_one(1 )     b = yield  add_one(2 )     return  r + b def  add_one (ret ):    f = Future()     def  callback (future, result ):         future.set_result(result)     tornado.ioloop.IOLoop.instance().add_callback(partial(callback, f, ret + 1 ))     return  f haha() tornado.ioloop.IOLoop().instance().start()