asyncore为select.select、poll的封装(实际上现在大家都使用更为高效的epoll),变成了框架的使用模式,该库已经作为兼容模式存在,新的库为asyncio。且在2和3中asyncore代码有一点点差异。(源码版本2.7.11)
dispatcher类(调度) 文件asyncore.py中最重要的类为dispatcher,使用的时候只需要继承asyncore.dispatcher就好了
可以这样理解每一个继承了asyncore.dispatcher的类都代表了一个socket(监听socket或者连接socket)
在asyncore.py的最开始有一个全局字典socket_map对应fd和一个asyncore.dispatcher类,当你继承一个dispatcher类的时候总会调用函数在全局字典socket_map中创建一个映射,当然在关闭的时候也会从全局字典中删除
最后是一个loop死循环,可以很容易想到是使用了系统IO多路复用接口select.select对全局socket_map进行了操作。根据返回的事件进行操作(3个事件读、写、错误)
源码中我们着重看dispatcher类,它可以划分为以下几个要点
添加了4个状态,connected/accepting/connecting/closing,想想为什么要设置这几个状态呢??? connected:已经连接上对方,比如监听套接字sock,addr = socket.accept()返回的sock
add_channel/del_channel/create_socket/set_socket这几个都是针对全局字典socket_map的操作。添加进去或者删除
readable/writable 用的地方不是太多,预先确定该socket该不该添加到socket的可读可写里面
listen/bind/connect/accept/send/recv/close 这几个函数嘛,对原有的行为稍微改动了下,比如listen设置为accepting状态,connect设置为connecting状态,send,recv一定条件出发close,close即为从全局字典中删除并关闭socket
handle_read_event/handle_connect_event/handle_write_event.当select有事件返回的时候就是调用的这3个方法。只是需要注意。这里并不是最终执行的操作(send、recv等)!!!这里也体现了标注socket状态的作用,read可以分为监听(accepting)、连接完成后(connecting)→→→这里就是一个hook,如果自己调用connect连接那么完成后是此状态,可以自己重写handle_connect处理该事件,如果是直接初始化的socket则直接为connected状态。handle_write_event也很明显,当为connecting状态的时候处理hook,否则处理写入事件
最后这一部分基本就是使用者需要重写的部分了。handle_expt/handle_read/handle_write/handle_connect/handle_accept/handle_close含义很明白,就不表述了
在后面就是dispatcher的继承类dispatcher_with_send和file_dispatcher,前者嘛,试想一下你要发送非常多内容,肯定调用一次send发不完。它就给你解决了这个问题,当然方法也很简单。后面就是为了高效发送文件而产生的了
流程图示(以一个客户端为例)
sockets5 DEMO 对于使用asyncore,官方的例子我觉得已经表现的非常到位了。下面我放出一个使用它写的简单socks5server(作为一个demo,只对tcp进行处理,没有写远程dns解析)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 import asyncoreimport socketimport structsockets_map = {} class Socks5Listen (asyncore.dispatcher ): def __init__ (self, address=('localhost' , 8081 ) ): asyncore.dispatcher.__init__(self) self.create_socket(socket.AF_INET, socket.SOCK_STREAM) self.set_reuse_addr() self.bind(address) self.listen(2048 ) def handle_accept (self ): pair = self.accept() if pair is not None : sock, addr = pair print('socks connection from ' , addr) Local(sock) class Local (asyncore.dispatcher_with_send ): def __init__ (self, sock ): asyncore.dispatcher_with_send.__init__(self, sock=sock) self.read_buffer = '' self.status = 'init' def read (self, num ): data = self.recv(num) self.read_buffer += data if len (self.read_buffer) != num: return None else : _ = self.read_buffer self.read_buffer = '' return _ def handle_read (self ): if self.status == 'init' : self.recv(262 ) self.send(b"\x05\x00" ) self.status = 'request' return if self.status == 'request' : data = self.read(4 ) if data is None : return mode = ord (data[1 ]) addrtype = ord (data[3 ]) if addrtype == 1 : addr = self.read(4 ) if addr is None : return addr = socket.inet_ntoa(addr) elif addrtype == 3 : addr = self.read(ord (self.recv(1 )[0 ])) if addr is None : return port = self.read(2 ) if port is None : return port = struct.unpack('>H' , port)[0 ] if mode == 1 : remote = Remote(addr, port) sockets_map[self] = remote sockets_map[remote] = self self.status = 'transfer' return else : reply = b"\x05\x07\x00\x01" self.send(reply) self.close() return if self.status == 'transfer' : data = self.recv(2048 ) if data: sockets_map[self].send(data) def handle_expt (self ): self.handle_close() def handle_close (self ): asyncore.dispatcher_with_send.handle_close(self) remote = sockets_map.get(self) if remote: del sockets_map[remote] if self in sockets_map: del sockets_map[self] class Remote (asyncore.dispatcher_with_send ): def __init__ (self, addr, port ): asyncore.dispatcher_with_send.__init__(self) self.create_socket(socket.AF_INET, socket.SOCK_STREAM) self.connect((addr, port)) def handle_connect (self ): print('Tcp connect to' , self.addr) local = self.socket.getsockname() reply = b"\x05\x00\x00\x01" reply += socket.inet_aton(local[0 ]) + struct.pack(">H" , local[1 ]) sockets_map[self].socket.send(reply) def handle_read (self ): data = self.recv(2048 ) if data: sockets_map[self].send(data) def handle_expt (self ): reply = '\x05\x05\x00\x01\x00\x00\x00\x00\x00\x00' self.send(reply) self.handle_close() def handle_close (self ): asyncore.dispatcher_with_send.handle_close(self) local = sockets_map.get(self) if local: del sockets_map[local] if self in sockets_map: del sockets_map[self] server = Socks5Listen() asyncore.loop()
一个socks5服务端会有3类socket。所以会有三个类。
小知识 LEGB作用域 asyncore中使用全局字典socket_map记录映射。想到全局就会想到global
1 2 3 4 5 6 a = 1 def main (): global a a += 1 main() print a
可以注意到这里必须要有global关键字,然而换成字典。为何就不需要global了。实际上这里要关注的是对象的id,对于数字、字符串这些对象是不可变的,然而字典对象是可变的。要改变不可变变量实际是重新赋值,所以对于改变外部不可变变量要用global,可变对象不需要
框架和库的区别 俗称好莱坞模式(Don’t call us, we will call you)。比如库函数sum你很容易想到给几个数字它返回给你和。框架嘛。就像上面的asyncore你需要的是继承dispatcher类然后重新一些方法。虽然它能达到目的,可是如果你不看源码或许你永远也无法明白框架在后面做了什么。从我个人的理解来看我是比较喜欢成熟的框架的。毕竟如果不用框架自己用库函数写最后也可能是一个框架,当然很可能是一个垃圾的框架:D
为什么多路IO复用一定是配合的非阻塞socket 妈蛋,这个问题各种说法真是太多了。想一想如果select返回给你一个fd可读,那么他一定是可读的。如果这样那么一般情况下我们用select配合阻塞socket也是可以的。可是现实不这样纸啊。unix手册中明确说明了不是这样的。所以工程上几乎没有见过配合阻塞socket使用的例子。另外asyncore种connect方法用的是connect_ex
相关资料 官方文档
知乎:为什么IO多路复用要搭配非阻塞socket