博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
An Introduction to Asynchronous Programming and Twisted (3)
阅读量:5817 次
发布时间:2019-06-18

本文共 9349 字,大约阅读时间需要 31 分钟。

Part 11: Your Poetry is Served

A Twisted Poetry Server

Now that we’ve learned so much about writing clients with Twisted, let’s turn around and re-implement our poetry server with Twisted too. And thanks to the generality of Twisted’s abstractions, it turns out we’ve already learned almost everything we need to know.

class PoetryProtocol(Protocol):     def connectionMade(self):        self.transport.write(self.factory.poem)        self.transport.loseConnection()class PoetryFactory(ServerFactory):        protocol = PoetryProtocol    def __init__(self, poem):        self.poem = poemdef main():    options, poetry_file = parse_args()    poem = open(poetry_file).read()    factory = PoetryFactory(poem)     from twisted.internet import reactor    port = reactor.listenTCP(options.port or 0, factory,                             interface=options.iface)      reactor.run()

可见server和client基本原理上是一致的, reactor loop侦听事件, 事件到达时使用protocol去处理, factory用于管理protocol, 继承自ServerFactory.

 

Part 12: A Poetry Transformation Server

这节中实现一个复杂些的server, 根据client发送不同的请求, 将poem做不同的转换并发回client, 这就需要一个协议使得client和server可以正常沟通.

Twisted includes support for several protocols we could use to solve this problem, including , and .

但是为了是我们的例子足够简单以至于容易理解, 我们使用自己的一个简单的协议,

<transform-name>.<text of the poem>

当server接收到从客户端发出的这样的request后, 根据transform-name将text of the poem进行相应的transform, 并发送回client.

class TransformProtocol(NetstringReceiver):     def stringReceived(self, request):        if '.' not in request: # bad request            self.transport.loseConnection()            return         xform_name, poem = request.split('.', 1)         self.xformRequestReceived(xform_name, poem)     def xformRequestReceived(self, xform_name, poem):        new_poem = self.factory.transform(xform_name, poem)         if new_poem is not None:            self.sendString(new_poem)         self.transport.loseConnection()        class TransformFactory(ServerFactory):     protocol = TransformProtocol     def __init__(self, service):        self.service = service     def transform(self, xform_name, poem):        thunk = getattr(self, 'xform_%s' % (xform_name,), None)         if thunk is None: # no such transform            return None         try:            return thunk(poem)        except:            return None # transform failed     def xform_cummingsify(self, poem):        return self.service.cummingsify(poem)    class TransformService(object):     def cummingsify(self, poem):        return poem.lower()def main():    service = TransformService()    factory = PoetryFactory(service)     from twisted.internet import reactor    port = reactor.listenTCP(options.port or 0, factory,                             interface=options.iface)    reactor.run()

来看看这段代码,

首先, TransformProtocol继承自NetstringReceiver而非Protocol, NetstringReceiver是一种专门用来处理string的协议, 这儿可以使用和继承Twisted开发框架提供的各种协议来简化代码, 而不用每次从头开发, 这就是使用框架的好处.

在TransformProtocol中对于poem具体的transform逻辑上, 调用self.factory.transform, 把变数扔给factory, 而保持protocol的高度抽象, transform逻辑变化,添减, 都保持protocol不需要有任何改动.

其次, 在TransformFactory中, 使用python强大的getattr来避免使用大量的if…else.

但这儿只提供了cummingsify service, 如果要增加或删除service, TransformFactory和TransformService难免需要修改...

这段代码已经写的不错...不过缺少些Twisted的感觉...如果加上deferred的callback机制, 应该可以写出更highlevel的代码.

 

Part 13: Deferred All The Way Down

Introduction

Recall poetry client 5.1 from .The client used a Deferred to manage a  that included a call to a poetry transformation engine. In , the engine was implemented as a synchronous function call implemented in the client itself.

Client5.1中异步去获取poem, 然后调用callback函数cummingsify做transform, 现在我们在Part12中实现了TransformService, 即poem transform也要用异步的方式让服务器去完成.

这其实是个比较自然的想法, 由于reactor的特性, 任何callback都必须是unblock的, 但实际上, 很多callback处理是需要花费较长的时间的, 这个时候在callback内也必须异步处理, 来保证callback本身的unblock, 即callback本身也无法直接返回结果, 而只能返回deferred对象.

如下图, 当碰到这种inner deferred时,

The outer deferred needs to wait until the inner deferred is fired. Of course, the outer deferred can’t block either, so instead the outer deferred suspends the execution of the callback chain and returns control to the reactor

And how does the outer deferred know when to resume? Simple — by adding a callback/errback pair to the inner deferred. Thus, when the inner deferred is fired the outer deferred will resume executing its chain. If the inner deferred succeeds (i.e., it calls the callback added by the outer deferred), then the outer deferred calls its N+1 callback with the result. And if the inner deferred fails (calls the errback added by the outer deferred), the outer deferred calls the N+1errback with the failure.

 

 

下面这段代码给出了怎么样封装inner deferred来提供异步callback,

class TransformClientProtocol(NetstringReceiver):     def connectionMade(self):        self.sendRequest(self.factory.xform_name, self.factory.poem)     def sendRequest(self, xform_name, poem):        self.sendString(xform_name + '.' + poem)     def stringReceived(self, s):        self.transport.loseConnection()        self.poemReceived(s)     def poemReceived(self, poem):        self.factory.handlePoem(poem)        class TransformClientFactory(ClientFactory):     protocol = TransformClientProtocol     def __init__(self, xform_name, poem):        self.xform_name = xform_name        self.poem = poem        self.deferred = defer.Deferred()     def handlePoem(self, poem):        d, self.deferred = self.deferred, None        d.callback(poem)     def clientConnectionLost(self, _, reason):        if self.deferred is not None:            d, self.deferred = self.deferred, None            d.errback(reason)     clientConnectionFailed = clientConnectionLost    class TransformProxy(object):    """    I proxy requests to a transformation service.    """     def __init__(self, host, port):        self.host = host        self.port = port     def xform(self, xform_name, poem):        factory = TransformClientFactory(xform_name, poem)        from twisted.internet import reactor        reactor.connectTCP(self.host, self.port, factory)        return factory.deferreddef cummingsify(poem):    d = proxy.xform('cummingsify', poem)     def fail(err):        print >>sys.stderr, 'Cummingsify failed!'        return poem     return d.addErrback(fail)

最后这个函数就是封装好的异步callback, 大家可以和之前part10的callback对比一下...

def cummingsify(poem):    print 'First callback, cummingsify'    poem = engine.cummingsify(poem)    return poemdef cummingsify_failed(err):    if err.check(GibberishError):        print 'Second errback, cummingsify_failed, use original poem'        return err.value.args[0] #return original poem    return err

再来看一下part10中的callback顺序图, 此时cummingsify为异步callback, cummingsify_failed被加到inner deferred中, 当这个inner deferred被fired时, outer deferred会根据inner deferred情况去调用, got_poem或poem_failed. 其中具体过程似乎是透明的...或者说我也不清楚

作者在这儿也没有讲清, 个人认为这儿如果能参照Part10给个完整的代码例子, 会更清晰一些...

Part 14: When a Deferred Isn’t

We’ll make a caching proxy server. When a client connects to the proxy, the proxy will either fetch the poem from the external server or return a cached copy of a previously retrieved poem. 

这儿可见, 如果是直接从cache返回的话可以直接同步处理, 如需要去external server获取的话就需要异步处理.

这样的有时需要同步, 有时需要异步的情况, 怎么办?

如下代码中, get_poem可能返回的是poem, 也有可能是deferred对象, 对于调用者怎么处理...

class ProxyService(object):     poem = None # the cached poem     def __init__(self, host, port):        self.host = host        self.port = port     def get_poem(self):        if self.poem is not None:            print 'Using cached poem.'            return self.poem         print 'Fetching poem from server.'        factory = PoetryClientFactory()        factory.deferred.addCallback(self.set_poem)        from twisted.internet import reactor        reactor.connectTCP(self.host, self.port, factory)        return factory.deferred     def set_poem(self, poem):        self.poem = poem        return poemclass PoetryProxyProtocol(Protocol):     def connectionMade(self):        d = maybeDeferred(self.factory.service.get_poem)        d.addCallback(self.transport.write)        d.addBoth(lambda r: self.transport.loseConnection()) class PoetryProxyFactory(ServerFactory):     protocol = PoetryProxyProtocol     def __init__(self, service):        self.service = service

使用maybeDeferred来解决这个问题, 这个函数会把poem也封装成一个already-fired deferred

    • If the function returns a deferred, maybeDeferred returns that same deferred, or
    • If the function returns a Failure, maybeDeferred returns a new deferred that has been fired (via .errback) with that Failure, or
    • If the function returns a regular value, maybeDeferred returns a deferred that has already been fired with that value as the result, or
    • If the function raises an exception, maybeDeferred returns a deferred that has already been fired (via .errback()) with that exception wrapped in a Failure.

An already-fired deferred may fire the new callback (or errback, depending on the state of the deferred) immediately, i.e., right when you add it.

或者使用succeed函数, The  function is just a handy way to make an already-fired deferred given a result.

 

def get_poem(self):    if self.poem is not None:        print 'Using cached poem.'        # return an already-fired deferred        return succeed(self.poem)     print 'Fetching poem from server.'    factory = PoetryClientFactory()    factory.deferred.addCallback(self.set_poem)    from twisted.internet import reactor    reactor.connectTCP(self.host, self.port, factory)    return factory.deferred
 
本文章摘自博客园,原文发布日期:2011-09-15

转载地址:http://gkhbx.baihongyu.com/

你可能感兴趣的文章
5 主成分分析PCA
查看>>
创新思维 透视会展未来 2018中国会展创新者大会即将在渝举行
查看>>
RabbitMQ 高可用之镜像队列
查看>>
污妖王出品,我竟然秒懂,我是不是很污?
查看>>
webmap搭建使用
查看>>
【阿里云新品发布·周刊】第9期:实时大数据开发难、运维难、应用难?来,一站解决!...
查看>>
Vue项目实战(一)——ToDoList
查看>>
线上内存泄漏引发OOM问题分析和解决
查看>>
Java B2B2C o2o多用户商城 springcloud架构 (一)服务的注册与发现(Eureka)
查看>>
(十二) 整合spring cloud云架构 - SSO单点登录之OAuth2.0 登出流程(3)
查看>>
java B2B2C Springcloud仿淘宝电子商城系统- Gateway运行时动态配置网关
查看>>
java B2B2C springmvc mybatis电子商务平台源码-------zuul网关实现
查看>>
SpringCloud之断路器监控(Hystrix Dashboard)(九)
查看>>
我的友情链接
查看>>
Linux中inittab剖析
查看>>
基于ARM的智能灯光控制系统(5)设备链表
查看>>
layDate时间控件-使用记录_已迁移
查看>>
我的友情链接
查看>>
2. PowerShell -- 脚本执行权限
查看>>
有时OPEN***提示报错,如下错误及解决方法
查看>>