The Python asyncore and asynchat modules
The Python standard library provides two modules, asyncore and asynchat, to help in writing concurrent network servers using event-based designs. The documentation does not give good examples, so I am making some notes.
The basic idea behind the asyncore module is that:
- there is a function, loop(), that does a select() on a bunch of ‘channels’. Channels are thin wrappers around sockets.
- when select() returns, it reports which sockets have data waiting to be read, which ones are now free to send more data, and which ones have errors;
- loop() examines the event and the socket’s state to create a higher level event;
- it then calls a method on the channel corresponding to the higher level event.
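The steps above can be sketched without asyncore itself (which was deprecated in Python 3.6 and removed in 3.12), using only select.select() and a connected socket pair. Channel and mini_loop are hypothetical names chosen for illustration; this is a simplified model of the idea, not asyncore's actual implementation.

```python
import select
import socket


class Channel:
    """Hypothetical thin wrapper around a socket (a stand-in for a 'channel')."""

    def __init__(self, sock):
        self.sock = sock
        self.received = b""

    def fileno(self):
        # select() accepts any object that has a fileno() method.
        return self.sock.fileno()

    def handle_read(self):
        # The higher level event: "there is data waiting to be read".
        self.received += self.sock.recv(1024)


def mini_loop(channels, timeout=1.0):
    """One pass of a select()-based loop; a sketch, not asyncore's code."""
    # 1. select() on every channel, asking which ones are readable.
    readable, _, _ = select.select(channels, [], [], timeout)
    # 2. Turn each low-level result into a method call on its channel.
    for channel in readable:
        channel.handle_read()


a, b = socket.socketpair()  # two connected sockets, no network needed
chan = Channel(b)
a.sendall(b"hello")
mini_loop([chan])
print(chan.received)
a.close()
b.close()
```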
asyncore provides a low-level but flexible API to build network servers. asynchat builds upon asyncore and provides an API that is more suitable for request/response type of protocols.
The asyncore module’s API consists of:
- the loop() function, to be called by a driver program written by you;
- the dispatcher class, to be subclassed by you to do useful stuff. The dispatcher class is what is called ‘channel’ elsewhere.
```
+-------------+            +--------+
| driver code |---------> | loop() |
+-------------+            +--------+
       |                       |
       |                       | loop-dispatcher API (a)
       |                       |
       |                +--------------+
       |                |  dispatcher  |
       +--------------->|   subclass   |
                        +--------------+
                               |
                               | dispatcher-logic API (b)
                               |
                        +--------------+
                        | server logic |
                        +--------------+
```
This is all packaged nicely in an object-oriented way. So, we have the dispatcher class, which wraps around the socket class (from the socket module in the Python standard library). It provides the familiar socket methods (bind(), listen(), accept(), connect(), send(), recv(), close() and so on), as well as methods to handle the higher level events. You are supposed to subclass dispatcher and implement the event handling methods to do something useful.
The loop-dispatcher API
The loop function looks like this:

loop([timeout[, use_poll[, map[, count]]]])
What is the map? It is a dictionary whose keys are the file descriptors, or fds, of the sockets (i.e., the integers returned by socket.fileno()), and whose values are the dispatcher objects which you want to handle events on that socket/fd.
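A small illustration of such a map, built by hand with plain sockets rather than through asyncore: the keys are the integers returned by fileno(), and the loop uses them to look up which object should handle an event. Echoer and socket_map are hypothetical names.

```python
import select
import socket


class Echoer:
    """Hypothetical dispatcher-like object responsible for one socket."""

    def __init__(self, sock):
        self.sock = sock

    def handle_read(self):
        data = self.sock.recv(1024)
        self.sock.sendall(data.upper())  # reply so the effect is visible


a, b = socket.socketpair()

# The map: file descriptor (int) -> object that handles events on that fd.
socket_map = {b.fileno(): Echoer(b)}

a.sendall(b"ping")
ready, _, _ = select.select(list(socket_map), [], [], 1.0)
for fd in ready:
    socket_map[fd].handle_read()  # find the handler by its fd

reply = a.recv(1024)
print(reply)
a.close()
b.close()
```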
When we create a new dispatcher object and give it a socket (through the constructor or create_socket()), it automatically gets added to a global map of sockets (asyncore.socket_map), which is managed behind the scenes. The loop() function does a select() on this map.
We can override the map that loop() looks at by providing an explicit map. But then each dispatcher we create must be registered in that map, either by passing the map to the dispatcher constructor (its map argument) or by adding/removing it ourselves. (Hmm… we might always want to use explicit maps; then our loop calls will be thread safe and we will be able to launch multiple threads, each calling loop() on its own map.)
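The explicit-map idea can be sketched with plain select(): each thread polls only the fds in its own dictionary, so the threads never share a map. This is a hypothetical illustration (Collector and run_loop are made-up names), not asyncore code; with asyncore itself you would pass the same dictionary as the map argument to each dispatcher and to loop(map=...).

```python
import select
import socket
import threading


class Collector:
    """Hypothetical handler that records what it reads."""

    def __init__(self, sock):
        self.sock = sock
        self.data = b""

    def handle_read(self):
        self.data += self.sock.recv(1024)


def run_loop(socket_map, rounds=1, timeout=1.0):
    """Poll only the fds in *this* map; no shared global state."""
    for _ in range(rounds):
        ready, _, _ = select.select(list(socket_map), [], [], timeout)
        for fd in ready:
            socket_map[fd].handle_read()


pairs = [socket.socketpair() for _ in range(2)]
maps = []
for writer, reader in pairs:
    maps.append({reader.fileno(): Collector(reader)})  # one map per thread
    writer.sendall(b"x")

threads = [threading.Thread(target=run_loop, args=(m,)) for m in maps]
for t in threads:
    t.start()
for t in threads:
    t.join()

results = [h.data for m in maps for h in m.values()]
print(results)
for writer, reader in pairs:
    writer.close()
    reader.close()
```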
Methods a dispatcher subclass should implement
loop() needs the dispatcher to implement some methods:
- readable(): should return True if you want the fd to be observed for read events;
- writable(): should return True if you want the fd to be observed for write events.
If either readable() or writable() returns True, the corresponding fd will be examined for errors also. Obviously, it makes no sense to have a dispatcher that returns False for both.
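The rule can be written down as a small function. This is a sketch of the logic just described, not a copy of asyncore's source; build_select_lists and Stub are hypothetical names, and the fd numbers are arbitrary dictionary keys (no real sockets are involved).

```python
def build_select_lists(socket_map):
    """Hypothetical helper: decide which fds select() should watch."""
    r, w, e = [], [], []
    for fd, obj in socket_map.items():
        want_read = obj.readable()
        want_write = obj.writable()
        if want_read:
            r.append(fd)
        if want_write:
            w.append(fd)
        if want_read or want_write:
            e.append(fd)  # watched for errors only if watched for something
    return r, w, e


class Stub:
    """Hypothetical dispatcher with fixed readable()/writable() answers."""

    def __init__(self, r, w):
        self._r, self._w = r, w

    def readable(self):
        return self._r

    def writable(self):
        return self._w


lists = build_select_lists({3: Stub(True, False),
                            4: Stub(False, True),
                            5: Stub(False, False)})  # fd 5 is never watched
print(lists)  # ([3], [4], [3, 4])
```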
Some other methods that loop calls on dispatchers:
- handle_read: socket is readable; dispatcher.recv() can be used to actually get the data
- handle_write: socket is writable; dispatcher.send(data) can be used to actually send the data
- handle_error: an exception was raised in one of the handlers and not caught
- handle_expt: socket received OOB data (not really used in practice)
- handle_close: the connection was closed by the remote end or failed with an error
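A sketch of how select() results might be turned into these calls, using plain sockets. Recorder, Faulty and dispatch_once are hypothetical names; the try/except mirrors the idea that handle_error is for exceptions raised inside handlers. asyncore's real loop handles more cases (for example, detecting closed connections), so treat this as a model only.

```python
import select
import socket


class Recorder:
    """Hypothetical dispatcher-like object that records which handlers ran."""

    def __init__(self, sock):
        self.sock = sock
        self.events = []

    def fileno(self):
        return self.sock.fileno()

    def handle_read(self):
        self.events.append("read")
        self.sock.recv(1024)

    def handle_write(self):
        self.events.append("write")

    def handle_expt(self):
        self.events.append("expt")

    def handle_error(self):
        self.events.append("error")


class Faulty(Recorder):
    def handle_write(self):
        raise RuntimeError("bug in a handler")


def dispatch_once(objs, timeout=1.0):
    """Map one round of select() results onto handler calls (a sketch)."""
    r, w, e = select.select(objs, objs, objs, timeout)
    calls = [(o, o.handle_read) for o in r]
    calls += [(o, o.handle_write) for o in w]
    calls += [(o, o.handle_expt) for o in e]
    for obj, handler in calls:
        try:
            handler()
        except Exception:
            obj.handle_error()  # a handler raised: report it, keep looping


a, b = socket.socketpair()
rec = Recorder(b)
a.sendall(b"data")  # b gets data to read; its send buffer is empty too
dispatch_once([rec])
print(rec.events)

faulty = Faulty(a)  # a has nothing to read but is writable
dispatch_once([faulty])
print(faulty.events)  # ['error']
a.close()
b.close()
```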
For server dispatchers, loop calls one more event:
- handle_accept: a new incoming connection can be accept()ed. Call the accept() method to actually accept the connection. To create a server socket, first call its bind() and listen() methods.
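At the socket level, the event behind handle_accept is a listening socket that select() reports as readable. A sketch with plain sockets (not asyncore), binding to port 0 so the OS picks a free port:

```python
import select
import socket

server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(("127.0.0.1", 0))  # port 0: let the OS choose a free port
server.listen(5)               # backlog of pending connections
server.setblocking(False)

client = socket.create_connection(server.getsockname())

# A pending connection makes the listening socket "readable";
# asyncore turns this low-level event into handle_accept().
ready, _, _ = select.select([server], [], [], 1.0)
if server in ready:
    conn, addr = server.accept()
    print("accepted connection from", addr)
    conn.close()

client.close()
server.close()
```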
Client sockets get this event:
- handle_connect: the connection to the remote endpoint has been made. To initiate the connection, first call the connect() method on it.
Client sockets are discussed in the asyncore documentation, so I will not discuss them here.
Other socket methods are available in dispatcher: create_socket, close and set_reuse_addr. They are not called by loop, but are available so that your code can call them when it needs to create a new socket, close an existing socket, and tell the OS to set the SO_REUSEADDR flag on the server socket.
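For reference, the flag itself can be set on a plain socket with setsockopt(). The sketch below shows roughly the effect set_reuse_addr() has on the dispatcher's socket; it is not asyncore's exact code.

```python
import socket

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# On Unix-like systems this lets a restarted server bind a port that still
# has connections in TIME_WAIT from a previous run.
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
flag = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR)
sock.close()
print(flag != 0)  # True
```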
How to write a server using asyncore
The standard library documentation gives a client example, but not a
server example. Here are some notes on the latter.
- Subclass dispatcher to create a listening socket.
- In its handle_accept method, create new dispatchers. They’ll get added to the global socket map.
Note: the handlers must not block or take too much time… or the server won’t be concurrent. This is because when multiple sockets get an event, loop calls their dispatchers one by one, in the same thread.
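A toy illustration of why a blocking handler hurts: when handlers run one after another in a single thread, a handler that sleeps delays every handler after it. slow_handler and fast_handler are hypothetical, and time.sleep() stands in for any blocking call.

```python
import time

finished = []


def slow_handler():
    time.sleep(0.2)  # stands in for a blocking call (e.g. a slow query)
    finished.append("slow")


def fast_handler():
    finished.append("fast")


start = time.monotonic()
slow_handler()  # the loop calls handlers one by one, in one thread...
fast_handler()  # ...so this one has to wait for the slow one
fast_delay = time.monotonic() - start

print(finished)  # ['slow', 'fast']
print("fast handler finished after %.2fs" % fast_delay)
```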
The socket-like methods that dispatcher provides should not be bypassed in order to access the low-level socket functions directly. They do funky things to detect higher level events. For example, how does asyncore figure out that the socket is closed? If I remember correctly, there are two ways to detect whether a non-blocking socket is closed:
- select() returns a read event, but when you call recv()/read() you get zero bytes;
- you call send()/write() and it fails with an error (sending zero bytes is not an error).
(I wish I had a copy of Unix Network Programming by Stevens handy.) dispatcher will detect both events above and, if either one occurs, will call handle_close. This frees you from having to look at low-level events, and lets you think in terms of higher level events.
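The first case is easy to observe with a socket pair (plain sockets, not asyncore): once one end is closed, select() reports the other end as readable, and recv() returns zero bytes. The second case (a failing send()) depends on timing and platform, so it is not shown here.

```python
import select
import socket

a, b = socket.socketpair()
a.close()  # the "remote" end goes away

# A closed peer shows up as a read event...
r, _, _ = select.select([b], [], [], 1.0)
# ...but recv() then returns zero bytes instead of data.
data = b.recv(1024)
b.close()
print(b in r, data)  # True b''
```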
The code for a server based on asyncore is below (it needs a Python version that still ships asyncore, i.e. older than 3.12):
```python
import logging
import asyncore
import socket

logging.basicConfig(level=logging.DEBUG,
                    format="%(created)-15s %(msecs)d %(levelname)8s %(thread)d %(name)s %(message)s")
log = logging.getLogger(__name__)

BACKLOG = 5
SIZE = 1024


class EchoHandler(asyncore.dispatcher):

    def __init__(self, conn_sock, client_address, server):
        self.server = server
        self.client_address = client_address
        self.buffer = b""  # bytes, so this also works with Python 3's asyncore

        # We don't have anything to write, to start with
        self.is_writable = False

        # Create ourselves, but with an already provided socket
        asyncore.dispatcher.__init__(self, conn_sock)
        log.debug("created handler; waiting for loop")

    def readable(self):
        return True  # We are always happy to read

    def writable(self):
        return self.is_writable  # But we might not have
                                 # anything to send all the time

    def handle_read(self):
        log.debug("handle_read")
        data = self.recv(SIZE)
        log.debug("after recv")
        if data:
            log.debug("got data")
            self.buffer += data
            self.is_writable = True  # sth to send back now
        else:
            # dispatcher.recv() has already called handle_close() for us
            log.debug("got null data")

    def handle_write(self):
        log.debug("handle_write")
        if self.buffer:
            sent = self.send(self.buffer)
            log.debug("sent data")
            self.buffer = self.buffer[sent:]
        else:
            log.debug("nothing to send")
        if len(self.buffer) == 0:
            self.is_writable = False

    # Will this ever get called? Yes: dispatcher's recv() and send() call it
    # when the peer disconnects. Calling close() ourselves does not trigger it.
    def handle_close(self):
        log.debug("handle_close")
        log.info("conn_closed: client_address=%s:%s" %
                 (self.client_address[0], self.client_address[1]))
        self.close()


class EchoServer(asyncore.dispatcher):

    allow_reuse_address = False
    request_queue_size = 5
    address_family = socket.AF_INET
    socket_type = socket.SOCK_STREAM

    def __init__(self, address, handlerClass=EchoHandler):
        self.address = address
        self.handlerClass = handlerClass

        asyncore.dispatcher.__init__(self)
        self.create_socket(self.address_family, self.socket_type)

        if self.allow_reuse_address:
            self.set_reuse_addr()

        self.server_bind()
        self.server_activate()

    def server_bind(self):
        self.bind(self.address)
        log.debug("bind: address=%s:%s" % (self.address[0], self.address[1]))

    def server_activate(self):
        self.listen(self.request_queue_size)
        log.debug("listen: backlog=%d" % self.request_queue_size)

    def fileno(self):
        return self.socket.fileno()

    def serve_forever(self):
        asyncore.loop()

    # TODO: try to implement handle_request()

    # Internal use
    def handle_accept(self):
        pair = self.accept()
        if pair is None:  # accept() may return None if the connection vanished
            return
        (conn_sock, client_address) = pair
        if self.verify_request(conn_sock, client_address):
            self.process_request(conn_sock, client_address)

    def verify_request(self, conn_sock, client_address):
        return True

    def process_request(self, conn_sock, client_address):
        log.info("conn_made: client_address=%s:%s" %
                 (client_address[0], client_address[1]))
        self.handlerClass(conn_sock, client_address, self)

    def handle_close(self):
        self.close()
```
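And to use it (assuming the code above is saved as asyncore_echo_server.py):

```python
import asyncore_echo_server

interface = "0.0.0.0"
port = 8080
server = asyncore_echo_server.EchoServer((interface, port))
server.serve_forever()
```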
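asyncore was removed from the standard library in Python 3.12, so the server above will not run on 3.12 and later. As a hedged sketch only, the same echo behaviour can be expressed with the selectors module: register()/modify() play the role of readable() and writable(), and a zero-byte recv() is treated as the peer closing. serve is a hypothetical name, the server runs in a thread on a free local port purely so the example is self-contained, and error handling is omitted.

```python
import selectors
import socket
import threading

SIZE = 1024


def serve(sel, stop):
    """Echo server loop: one thread, one selector (a sketch, no error handling)."""
    while not stop.is_set():
        for key, mask in sel.select(timeout=0.1):
            if key.data is None:
                # The listening socket is readable: a connection is waiting.
                conn, _ = key.fileobj.accept()
                conn.setblocking(False)
                sel.register(conn, selectors.EVENT_READ, data=bytearray())
                continue
            conn, buf = key.fileobj, key.data
            if mask & selectors.EVENT_READ:
                data = conn.recv(SIZE)
                if not data:
                    # Zero bytes: the peer closed the connection.
                    sel.unregister(conn)
                    conn.close()
                    continue
                buf += data  # bytearray: extended in place
                # Something to send back now, so also watch for writability.
                sel.modify(conn, selectors.EVENT_READ | selectors.EVENT_WRITE, data=buf)
            if mask & selectors.EVENT_WRITE and buf:
                sent = conn.send(buf)
                del buf[:sent]
                if not buf:
                    # Nothing left to send: stop watching for writability.
                    sel.modify(conn, selectors.EVENT_READ, data=buf)


listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listener.bind(("127.0.0.1", 0))  # port 0: let the OS pick a free port
listener.listen(5)
listener.setblocking(False)

sel = selectors.DefaultSelector()
sel.register(listener, selectors.EVENT_READ, data=None)
stop = threading.Event()
thread = threading.Thread(target=serve, args=(sel, stop))
thread.start()

with socket.create_connection(listener.getsockname()) as client:
    client.sendall(b"hello")
    echoed = client.recv(SIZE)

stop.set()
thread.join()
sel.close()
listener.close()
print(echoed)
```

Keeping the pending output in a per-connection bytearray and toggling EVENT_WRITE mirrors the buffer and is_writable flag in EchoHandler.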