Friday, July 5, 2013

Writing communicating agents with no effort, step II

Introduction

In step I of this series, we introduced the notion of Communicating Agents and laid down the requirements for being able to easily create communicating agents as if they were normal classes using Python. We also introduced a generic, message-independent mechanism for JSON-based message passing, using a UDP-based agent as an example.

In this post I'll fulfill the requirements set initially by applying advanced Python features like meta-programming, descriptors and decorators. In addition to enabling you to create your own communicating agents with no effort, I hope the text that follows sheds some light on those Python features and how they can be applied to solving a real problem.

If you're new to decorators and meta-programming I encourage you to read this post on decorators and this post on meta-classes in stackoverflow.com. I find them enlightening, and thorough enough to save me from repeating all that material here.

What are we missing?

Let's review what our initial requirements are and what we have achieved with the example agent from step I:
  • Little time and resources
  • Programming language-independence (of message exchanges)
  • Networking technology-independence
  • Easily extendable/modifiable (message sets)
  • Self-contained implementation (i.e. no need of external tools)
We fulfilled the first requirement by using Python as our rapid prototyping language. We also fulfilled the second requirement by using JSON as the message encoding/decoding format. The fourth requirement is partially fulfilled thanks to the generic code that handles any message as long as it is represented as an object supported by the json.dumps() and json.loads() functions from the json library, though new messages still require code changes in our agent classes. The fifth requirement is also fulfilled thanks to the handy namedtuple class we use to define our messages in-line in the code. Hence, only the third and fourth requirements remain to be addressed.
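As a reminder of what that generic JSON handling boils down to, here is a minimal sketch of a round trip using the "<MsgName>:<JSON body>" layout seen in the snippets of this series. The helper names encode() and decode() and the msgtypes lookup table are mine, introduced only for illustration.

```python
import json
from collections import namedtuple

TestMsg = namedtuple('TestMsg', 'a,b')

def encode(msg):
    # Hypothetical helper: "<MsgName>:<JSON body>" as UTF-8 bytes
    text = type(msg).__name__ + ':' + json.dumps(msg._asdict())
    return text.encode('utf8')

def decode(data, msgtypes):
    # Hypothetical helper: msgtypes maps message names to message classes
    name, body = data.decode('utf8').split(':', 1)
    return msgtypes[name](**json.loads(body))

wire = encode(TestMsg(a=1, b='Hi '))
print(wire)
print(decode(wire, {'TestMsg': TestMsg}))
```

Any language with a JSON library can parse the part after the first colon, which is the whole point of the second requirement.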

Networking technology independence

If you check our TestAgent class from the previous post, you'll see that network-enabling the agent using the socketserver module is quite intrusive. It requires you to inherit from a Server class in the module (we chose UDPServer due to its simplicity) and to write a Handler class for processing packets incoming from the network. In addition, to enable our agents to speak (as opposed to only listen) we extended the agent class with a send() method.

Changing the networking technology our agents use thus requires substantial code changes. Remember we want our agents' code to be independent of the inter-communication means and networking technology sitting between them, so we can change any of them without changing our code. Therefore, any networking-related code must be placed somewhere off our agents' implementation code.

We might create a new class inheriting from our agent class and place all the networking code in there. Check the following snippet:

import unittest
from collections import namedtuple
from socketserver import UDPServer, BaseRequestHandler
from threading import Timer
from time import sleep
import socket
import json

...

class UDPAgentHandler ( BaseRequestHandler ):
    '''
    Generic Handler class for any UDP-based agent
    having a handle(msg, src) method.
    Can be placed in an external module for re-use.
    '''
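    def handle ( self ):
        jsonencodedmsg = self.request[0].strip()
        src = self.client_address
        msgname, msgbody = jsonencodedmsg.decode().split(':', 1)
        # Rebuild the message object from the class named in the header;
        # the agent is expected to define that class as an attribute.
        msg = getattr(self.server, msgname)(**json.loads(msgbody))
        self.server.handle(msg, src)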

class UDPAgent ( UDPServer ):
    '''
    Generic Agent class for any UDP-based agent.
    Can be placed in an external module for re-use.
    '''
    def __init__ ( self, local_address ):
        super(UDPAgent, self).__init__(local_address, UDPAgentHandler)

    def address ( self ):
        return self.server_address

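    def send ( self, msg, dst ):
        # namedtuple instances have no __dict__ in current Python 3
        # releases; _asdict() is the supported way to get the fields
        jsonencodedmsg = \
            type(msg).__name__ + ':' + json.dumps(msg._asdict())
        return self.socket.sendto(bytes(jsonencodedmsg, 'utf8'), dst)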

...

class TestAgent ( object ):
    TestMsg = namedtuple('TestMsg', 'a,b')
          
    def __init__ ( self ):
        self.__rcvdmsgs = []

    def testMsgHandler ( self, msg, src ):
        self.__rcvdmsgs.append(msg)

    def handle ( self, msg, src ):
        # msg is an instance, so the message name lives on its type
        if type(msg).__name__ == 'TestMsg':
            self.testMsgHandler(msg, src)
        else:
            pass
               
    def __iter__ ( self ):
        return iter(self.__rcvdmsgs)

...

class CommAgentTest(unittest.TestCase):
    def testCommAgent ( self ):
        class UDPTestAgent ( UDPAgent, TestAgent ):
            def __init__ ( self, local_address ):
                UDPAgent.__init__(self, local_address)
                TestAgent.__init__(self)

            def __iter__ ( self ):
                return TestAgent.__iter__(self)

        testmsgs = [
            TestAgent.TestMsg(a=1, b='Hi '),
            TestAgent.TestMsg(a=2, b='there!'),
        ]
        host = socket.gethostbyname(socket.gethostname())
        ports = (2013, 2014)
        agent1, agent2 = \
            UDPTestAgent((host, ports[0])), UDPTestAgent((host, ports[1]))
        def sndmsg ():
            # Runs in the timer thread. send() returns a byte count (truthy),
            # so chaining the steps with 'or' would never reach shutdown().
            for msg in testmsgs:
                agent1.send(msg, agent2.address())
            sleep(3)
            agent2.shutdown()
        try:
            Timer(1, sndmsg).start()
            agent2.serve_forever()
            self.assertListEqual(testmsgs, list(agent2), "Lists not equal")
        finally:
            agent1.socket.close()
            agent2.socket.close()

...

if __name__ == "__main__":
    unittest.main()

Snippet 1 - First stab at networking technology independence

If you've followed the snippets in the previous post the code above should be self-explanatory. I will therefore focus on the limitations of the implementation instead of how it does its job.

First and foremost: for every combination of agent class and networking technology, we need a new network-enabled class wrapping the agent class. This means that with M agent classes and N networking technologies, we need MxN new classes if we want to support all the possible networking alternatives.

Second, even if we've abstracted away most of the networking and messaging code to a class re-usable by any UDP-based agent, we still need to make changes to our agent class every time a message is added or removed (the if ... else ... clause inside our agent's handle() method).
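To make the second limitation concrete, here is a minimal sketch of how the if ... else ... chain could be replaced by a lookup on a naming convention, so that adding a message only means adding a method. The class DispatchingAgent, the OtherMsg message and the public rcvdmsgs list are hypothetical names used only for this illustration.

```python
from collections import namedtuple

class DispatchingAgent(object):
    TestMsg = namedtuple('TestMsg', 'a,b')
    OtherMsg = namedtuple('OtherMsg', 'x')   # has no handler on purpose

    def __init__(self):
        self.rcvdmsgs = []

    def TestMsgHandler(self, msg, src):
        self.rcvdmsgs.append(msg)

    def defaulthandler(self, msg, src):
        # Called for messages that have no '<MsgName>Handler' method
        return None

    def handle(self, msg, src):
        # Look the handler up by name instead of testing each message type
        name = type(msg).__name__ + 'Handler'
        return getattr(self, name, self.defaulthandler)(msg, src)

agent = DispatchingAgent()
agent.handle(DispatchingAgent.TestMsg(a=1, b='Hi '), None)
agent.handle(DispatchingAgent.OtherMsg(x=3), None)
print(agent.rcvdmsgs)
```

The handle() method never changes again: supporting OtherMsg would only require writing an OtherMsgHandler() method.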

Third, it becomes difficult to extend the functionality of the re-usable UDPAgent class. Imagine we wanted to keep a count of the number of messages sent by the agent. We might think of adding a send() method to our agent class, which increments a counter then calls UDPAgent's send(). But this poses the problem of which send() method will actually be called when using a UDPTestAgent instance, since it inherits from both TestAgent and UDPAgent. We might add the send() method to the UDPTestAgent class, thus removing the uncertainty, but then the method is not re-usable by an eventual TCPTestAgent class.
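For reference, Python does pick the method deterministically, following the method resolution order (MRO) given by the order of the base classes. The sketch below uses stand-in classes (StubUDPAgent and CountingAgent are hypothetical, not the classes from Snippet 1) to show that a counting send() only works if the agent class is listed first, which is exactly the kind of coupling we would like to avoid.

```python
class StubUDPAgent(object):
    # Stand-in for a networking class; it just reports what it would send
    def send(self, msg, dst):
        return ('sent', msg, dst)

class CountingAgent(object):
    def __init__(self):
        self.sent = 0

    def send(self, msg, dst):
        self.sent += 1
        # Delegates to the next class in the MRO that defines send()
        return super(CountingAgent, self).send(msg, dst)

class CountingFirst(CountingAgent, StubUDPAgent):
    pass

class NetworkFirst(StubUDPAgent, CountingAgent):
    pass

a, b = CountingFirst(), NetworkFirst()
a.send('m', 'd')
b.send('m', 'd')
print([c.__name__ for c in CountingFirst.__mro__])
print(a.sent, b.sent)
```

With the networking class listed first, as in UDPTestAgent from Snippet 1, the counting send() is silently skipped.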

Finally, as you can see from the snippet above (TestAgent's __iter__() method), private members of TestAgent are not visible in UDPTestAgent. This hiding of private members in derived classes forces us to re-define in UDPTestAgent every private member of TestAgent intended to be used from outside the class.
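The mechanism behind this hiding is Python's name mangling of attributes that start with two underscores. A minimal sketch, with hypothetical Base and Derived classes, shows that code written in a derived class cannot reach such an attribute under its original name:

```python
class Base(object):
    def __init__(self):
        self.__items = [1, 2]      # actually stored as _Base__items

class Derived(Base):
    def peek(self):
        return self.__items        # compiled as self._Derived__items

d = Derived()
print(sorted(vars(d)))             # ['_Base__items']
try:
    d.peek()
except AttributeError as err:
    print('AttributeError:', err)
```

The mangled name is fixed when the method is compiled, based on the class whose body contains it.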

Couldn't we get rid of all the limitations enumerated above while keeping the ease of use of the UDPTestAgent class? Indeed we can, and I'm showing you how to achieve it step-by-step in the following sections.

Networking as an Aspect

What if we turned the ability to talk to other agents over some inter-network into an aspect of our agents? Aspects are concerns common to many classes, and commoditized enough that they can be imported into the classes needing them without class-specific adaptations. For example, logging is such a ubiquitous concern that it is a prime candidate for being implemented as an aspect (though strangely I've seen no real-world code doing it that way).

Networking ability fulfills all the conditions to be handled as an aspect of our agents. Aspects in Python can be implemented using meta-programming, more specifically meta-classes. So let's write a meta-class that performs the functions necessary to fulfill our third and fourth requirements. Additionally, in order to make it look like an aspect we'll invoke our meta-class through class decorators. And finally, to round it all off we'll write a couple of utility method decorators tackling some of the nuisances of the implementation shown in Snippet 1 above.
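To give a feeling of what "an aspect applied through a class decorator" looks like before we get to networking, here is a minimal sketch using the logging example mentioned earlier. It uses a plain class decorator only (no meta-class), and the names logged, calllog and Greeter are hypothetical.

```python
import functools

def logged(cls):
    '''Class decorator: returns a subclass whose public methods log calls.'''
    calllog = []

    def wrap(name, func):
        @functools.wraps(func)
        def wrapper(self, *args, **kwargs):
            calllog.append(name)          # the "aspect": record the call
            return func(self, *args, **kwargs)
        return wrapper

    members = {name: wrap(name, f) for name, f in vars(cls).items()
               if callable(f) and not name.startswith('_')}
    members['calllog'] = calllog
    # Build the wrapper class; it takes the place of the decorated class
    return type(cls.__name__, (cls,), members)

@logged
class Greeter(object):
    def greet(self, who):
        return 'Hi ' + who

print(Greeter().greet('there!'))
print(Greeter.calllog)
```

The Greeter class itself knows nothing about logging, which is the property we want for networking as well.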

Let me start from the end by showing you how the concepts just introduced fit together in our example Unit Test case.

import unittest
import socket
from collections import namedtuple
from threading import Timer
from time import sleep

...

@CommAgent.UDP
class TestAgent ( object ):
    TestMsg = namedtuple('TestMsg', 'a,b')
          
    def __init__ ( self ):
        self.__rcvdmsgs = []

    @CommAgent.handles('TestMsg')
    def testMsgHandler ( self, msg, src ):
        self.__rcvdmsgs.append(msg)

    @CommAgent.export               
    def __iter__ ( self ):
        return iter(self.__rcvdmsgs)

...

class CommAgentTest(unittest.TestCase):
    def testCommAgent ( self ):
        testmsgs = [
            TestAgent.TestMsg(a=1, b='Hi '),
            TestAgent.TestMsg(a=2, b='there!'),
        ]
        host = socket.gethostbyname(socket.gethostname())
        ports = (2013, 2014)
        agent1, agent2 = \
            TestAgent((host, ports[0])), TestAgent((host, ports[1]))
        try:
            sndmsg = lambda: \
                [agent1.send(msg, agent2.address()) for msg in testmsgs][0] \
                or sleep(3) \
                or agent2.shutdown()
            Timer(1, sndmsg).start()
            agent2.serve_forever()
            self.assertListEqual(testmsgs, list(agent2), "Lists not equal")
        finally:
            agent1.socket.close()
            agent2.socket.close()

...

if __name__ == "__main__":
    unittest.main()

Snippet 2 - Final solution applied to an example Unit Test Case

Elegant and simple, isn't it? We write our TestAgent class ignoring all the networking aspects. We may even unit-test it before introducing the networking ability, in order to make sure it performs its core duties properly. Then we introduce networking by decorating the class, and adapt the Unit Test to the peculiarities of networking (see the previous post for details).

The code enabling the solution just shown lies within a single meta-class, called CommAgent. I'm showing you CommAgent's code in the following snippet.

import json
from functools import wraps
from socketserver import UDPServer, BaseRequestHandler

class CommAgent(type):
    def __new__(cls, name, bases, d):
        CommAgent.__addaliases(bases, d)
        CommAgent.__decoratesend(bases, d)
        CommAgent.__addgenericjsonhandler(d)
        CommAgent.__exportprivate(bases, d)
        return type.__new__(cls, name, bases, d)

    @staticmethod
    def __addaliases ( bases, d ):
        aliases = [(m.msgname+'Handler', m) \
                   for b in bases \
                   for m in b.__dict__.values() \
                   if hasattr(m, 'msgname')]
        d.update(aliases)
    
    @staticmethod
    def __searchbases ( bases, name ):
        dicts = map(lambda b: b.__dict__, bases)
        m = map(lambda d: d.get(name), filter(lambda d: name in d, dicts))
        try:
            return next(m)
        except StopIteration:
            return None

    @staticmethod
    def __decoratesend ( bases, d ):
        send = d.get('send', CommAgent.__searchbases(bases, 'send'))
        if send is not None:
            d['send'] = CommAgent.jsonencoded(send)
        else:
            d['send'] = lambda s, m, d: \
                print("Auto-generated %s.send() method: msg=%s, dst=%s" % \
                      (type(s).__name__, str(m), str(d)))


    @staticmethod
    def __addgenericjsonhandler ( d ):
        def defaulthandler ( self, message, src ):
            '''
            Called when there is no handler defined for a message.
            Can be re-implemented in derived classes.
            '''
            pass
        
        def unknownhandler ( self, message, src ):
            '''
            Called when we receive an unknown message.
            Can be re-implemented in derived classes.
            '''
            pass
        
        def handle ( self, message, src ):
            '''
            Method to be injected into classes having CommAgent as
            metaclass. Decodes a received JSON message into a Python object and
            calls the handler method 'self.<msgname>Handler()', where <msgname>
            is the message name received at the heading of the message.
            '''
            msgname, msgval = message.decode().split(':', 1)
            if msgname in dir(type(self)):
                MsgType = type(self).__bases__[0].__dict__[msgname]
                msg = MsgType(**json.loads(msgval))
                try:   
                    return type(self).__dict__[msgname + 'Handler'](self, msg, src)
                except KeyError:
                    return self.defaulthandler(msg, src)
            else:
                return self.unknownhandler(msg, src)
            
        d['handle'] = handle
        d['unknownhandler'] = unknownhandler
        d['defaulthandler'] = defaulthandler

    @staticmethod
    def __exportprivate ( bases, d ):
        exported = [(m.__name__, m) \
                    for b in bases \
                    for m in b.__dict__.values() \
                    if hasattr(m, 'export')]
        d.update(exported)
        
    @staticmethod
    def export ( privatefunc ):
        '''
        Decorator method, adds an 'export=True' attribute to privatefunc
        '''
        privatefunc.export = True
        return privatefunc


    @staticmethod
    def handles ( msgname ):
        '''
        Decorator factory method, produces message handler decorators.
        Returns a decorator for the method that shall handle the message with
        name 'msgname'.
        '''
        def decorator ( handlerfunc ):
            @wraps(handlerfunc)
            def wrapper ( self, msg, src ):
                rsp = handlerfunc(self, msg, src)
                if type(rsp).__name__.endswith('Msg'):
                    jsonencodedrsp = \
                        type(rsp).__name__ + ':' + json.dumps(rsp._asdict())
                    return bytes(jsonencodedrsp, 'utf8') 
            wrapper.msgname = msgname
            return wrapper
        return decorator
    
    @staticmethod
    def jsonencoded ( sendfunc ):
        '''
        Decorator for a method having as arguments a message 'msg' and a
        destination 'dst'. It encodes the message using JSON encoding before
        calling the decorated method.
        Ideally suited to decorate a send() method receiving a Python object as
        message.
        '''
        @wraps(sendfunc)
        def wrapper ( self, msg, dst=None ):
            jsonencodedmsg = type(msg).__name__ + ':' + json.dumps(msg._asdict())
            return sendfunc(self, bytes(jsonencodedmsg, 'utf8'), dst)
        return wrapper


    @staticmethod
    def local ( cls ):
        class LocalLink ( object ):
            def send ( self, msg, dst ):
                dst.handle(msg, self)
                
        class wrapper(cls, LocalLink, metaclass=CommAgent ):
            def __init__ ( self, *args, **kwargs ):
                cls.__init__(self, *args, **kwargs)

            def address ( self ):
                return self
            
        return wrapper

    @staticmethod
    def UDP ( cls ):
        class UDPHandler(BaseRequestHandler):
            def handle ( self ):
                data = self.request[0].strip()
                socket = self.request[1]
                result = self.server.handle(data, self.client_address)
                if result is not None:
                    socket.sendto(result, self.client_address)
                
        class wrapper(cls, UDPServer, metaclass=CommAgent):
            def __init__ ( self, hostport, *args, **kwargs ):
                UDPServer.__init__(self, hostport, UDPHandler)
                cls.__init__(self, *args, **kwargs)
                
            def address ( self ):
                return self.server_address
            
            def send ( self, msg, dst ):
                # The method providing send() in UDPServer is sendto()
                self.socket.sendto(msg, dst)


        return wrapper

Snippet 3 - The CommAgent meta-class

Phew, that was a lot of code! I didn't concern myself much with where I was placing the functionality so I placed it all inside the CommAgent meta-class. If you have suggestions in this respect I'd be glad to read about them.

The first thing noticeable about the CommAgent class is its parent: type. This indicates CommAgent is a meta-class. The second hint is the __new__() method, which is called whenever a class having CommAgent as its meta-class is being built. Despite its signature, __new__() is not a class method but an implicitly static one. The cls argument it receives is the meta-class itself (CommAgent, or a class derived from it); the class being constructed is described by the name, bases and d arguments, and only comes into existence as the return value of type.__new__().
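If you want to check for yourself what a meta-class' __new__() receives, the following minimal sketch records its arguments every time a class is built. Meta, Agent and SubAgent are hypothetical names used only here.

```python
class Meta(type):
    seen = []

    def __new__(cls, name, bases, d):
        # Record the first argument, the new class' name and its bases
        Meta.seen.append((cls.__name__, name, [b.__name__ for b in bases]))
        return type.__new__(cls, name, bases, d)

class Agent(object, metaclass=Meta):
    pass

class SubAgent(Agent):      # inherits the meta-class from Agent
    pass

for entry in Meta.seen:
    print(entry)
print(type(Agent) is Meta)
```

In this sketch the first argument is always Meta, while the class under construction shows up only as a name, a tuple of bases and a dictionary.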

Our __new__() method follows the Template Method pattern. What it does can be summarized as follows:
  • add aliases for the handler methods in the class being built (we'll see how handler methods are identified shortly), those aliases following a consistent naming scheme of the form <MsgName> + 'Handler', where <MsgName> is the name of the class modelling the message in our agent class. In our example from Snippet 2 above, <MsgName> can only be 'TestMsg'
  • wrap the send() method of the class being built into a new method that performs JSON-encoding of the message passed as actual argument before actually sending the message
  • add to the class being built a handler method able to receive any message defined in that class (we'll see how messages are identified shortly), JSON-decode it and pass it to the corresponding handler method using the aliases previously added as per the first bullet above
  • copy into the class being built any methods of its base classes tagged with the CommAgent.export decorator, so they are visible through the wrapper class and classes derived from it
Handler methods in our agent class are identified thanks to the CommAgent.handles() decorator. This decorator tags the decorated method with an attribute called msgname which it sets to the value passed as argument to the decorator. In the CommAgent.__addaliases() static method, we look for class members having a msgname attribute and for each one we find we add a msgname + 'Handler' element to the dictionary of the class being built, set to the class member having the attribute.
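Stripped of everything else, the tagging and alias collection just described can be sketched as follows. This is a simplified stand-alone version: handles() here only tags the function (the real decorator also wraps it), and collect_aliases() is a hypothetical helper standing in for CommAgent.__addaliases().

```python
def handles(msgname):
    '''Simplified decorator factory: only tags the handler function.'''
    def decorator(func):
        func.msgname = msgname
        return func
    return decorator

class Agent(object):
    @handles('TestMsg')
    def whatever(self, msg, src):
        return ('handled', msg)

def collect_aliases(cls):
    # Map '<msgname>Handler' to every member carrying a msgname tag
    return dict((m.msgname + 'Handler', m)
                for m in vars(cls).values() if hasattr(m, 'msgname'))

aliases = collect_aliases(Agent)
print(sorted(aliases))
print(aliases['TestMsgHandler'](Agent(), 'a message', None))
```

The handler method can have any name; only the tag decides which message it handles.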

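Messages in our agent classes are identified by name: handle() looks the received message name up among the members of the agent class and uses the member it finds as the message type, while values returned by handlers are treated as response messages when the name of their type ends in 'Msg'. This is rather static but works fine as long as you stay consistent when naming the messages in your agent class.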

Let's skip the rest of the code below the __new__() method and concentrate on the local() and UDP() static methods. These methods are class decorators (each one builds and returns a new wrapper class) for our agent classes, and they do the tedious job that allows us to express networking as an aspect of those classes. Any class decorated with the CommAgent.local or CommAgent.UDP decorator is replaced by a wrapper class inheriting from a server class (in addition to our agent class) and having CommAgent as its meta-class. This saves us the work of creating one networking-enabled agent class for every pair of agent class and networking method/library. Check the local() code to see if you can find out what kind of networking the CommAgent.local decorator affords.

For every new networking method/library we just have to add a new class decorator to the CommAgent meta-class. For example, to use TCP with our agent classes we'd create a CommAgent.TCP decorator implemented in a TCP() static method in CommAgent. Then we'd use it to decorate any agent classes we want using TCP for talking to each other.

There's not much more to it, and now you should be able to make your way through the CommAgent code yourself. I find using the CommAgent meta-class particularly simple and useful in my designs. Feel free to comment on the implementation above, or point me to any bugs you find in it. I've already implemented TCP and RMcast decorators for my own CommAgent class, but in order not to make this post unbearably long I'll keep those to myself for the time being. Feel free to ask for them if you're interested though.

As always, thanks for reading and I hope you find my posts useful in your day-to-day programming duties.

Issues and caveats

In return for the simplicity and ease-of-use of CommAgent, you need to bear in mind a few things. I've found none of them too relevant in the designs I've undertaken so far, but your mileage might vary. Here they go:

  • Minimize the use of private methods and variables. Remember we're using the CommAgent.export decorator to "bring to surface" private methods of the agent class. This approach has a drawback: when called from the wrapper class, private variables and non-surfaced private methods of the agent class shall not be available, so if the surfaced method tries to use one of those it shall fail with an AttributeError exception.
  • When decorating a class in the middle of a class hierarchy, all derived classes shall be extending the wrapper class, not the agent class. The wrapper class in most cases extends one of UDPServer, TCPServer or your own server class, hence you might run into name clashes (e.g. one of your derived classes might define a shutdown method or variable). Be wary of this situation and try to avoid it if at all possible.
  • You can override members of the server class providing networking support to your communicating agent in your agent class, but in order to do so you must follow the Smalltalk approach to building classes. For instance, in one design I had to override the UDPServer.shutdown() method so that it also cancels a timer thread my agent class is using. Here's what I did:


...

@ProtocolAgent.RMcast
class LogicalClockServer(object):

    ...

def shutdown ( self ):
    super(LogicalClockServer, self).shutdown()
    self.hbthread.cancel()

LogicalClockServer.shutdown = shutdown


Snippet 4 - Overriding methods from the base server class in your agent class

You might be wondering what's going on in the code above. Remember the class decorator defines a new wrapper class and binds it to the name of the wrapped class. Hence, once the Python interpreter finishes processing your agent class and its decorator, the name "LogicalClockServer" is not bound to your agent class anymore, but to the wrapper class, which in this case extends a base RMcastServer class of my own having a shutdown() method.

If you try to define the overriding shutdown() method within your agent class, it won't work because when that method is processed the wrapper class doesn't exist yet, and your agent class does not extend a base server class. Therefore you need to define and bind any method overriding a base server class' method once the wrapper class has been created and bound by the interpreter.
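The rebinding can be reproduced in a few lines without any networking. In the sketch below, StubServer, networked and ClockAgent are hypothetical stand-ins for the base server class, the class decorator and the agent class respectively.

```python
class StubServer(object):
    # Stand-in for a server class such as UDPServer
    def __init__(self):
        self.running = True

    def shutdown(self):
        self.running = False

def networked(cls):
    # Stand-in class decorator: returns a wrapper class, like CommAgent.UDP
    class wrapper(cls, StubServer):
        def __init__(self, *args, **kwargs):
            StubServer.__init__(self)
            cls.__init__(self, *args, **kwargs)
    return wrapper

@networked
class ClockAgent(object):
    def __init__(self):
        self.timer_cancelled = False

def shutdown(self):
    # At call time the name ClockAgent refers to the wrapper class
    super(ClockAgent, self).shutdown()
    self.timer_cancelled = True

ClockAgent.shutdown = shutdown

agent = ClockAgent()
agent.shutdown()
print(ClockAgent.__name__, agent.running, agent.timer_cancelled)
```

Because the method is attached to the wrapper, super() skips the original agent class and reaches the server's shutdown().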

Of course, if you don't feel comfortable with the Smalltalk style of defining classes you can always extend your agent class with another class containing any overrides you want.


Thursday, June 20, 2013

Writing communicating agents with no effort, step I

Introduction

If there's a pattern that arises over and over again in distributed computing, it is that of Communicating Agents. Communicating Agents are SW components (e.g. classes) that get instantiated at one or more computers interconnected by some kind of network, each instance able to live on its own but exchanging messages with other instances with the aim of reaching a common goal. For example, when the SW components being instantiated are processes at a single computer, we have Hoare's well-known CSP programming model.

Throughout the history of distributed computing we've seen multiple versions of the Communicating Agents pattern, from the early days of the Internet, where very simple deployments (typically client-server with a couple of servers and a handful of clients) were the norm, to the present day, where huge enterprise applications composed of hundreds of agents distributed across dozens of machines talk over a number of disparate networks.

Every seasoned programmer has met this pattern at least once in their career. Depending on the needs, context and tools at hand, one might decide to leverage existing distributed computing infrastructure like e.g. CORBA or EJB or simpler tools like e.g. Google's ProtocolBuffers; on the other hand, one might opt for developing one's own tools. This post deals with the latter case.

Our goal

We'll start by defining what we want to achieve. Say we need to develop a distributed application built from multiple agents that cooperate by exchanging messages over some network linking the machines these agents run on. For the time being we're not concerned with run-time infrastructure, i.e. the middleware managing the agents' execution and life-cycle, so we will focus on defining messages and exchanging them between our agents.

We have very little time and resources to do this so we'll be using Python as our rapid prototyping language, but we don't want to constrain ourselves to Python so we'd like our messages to be easily encoded/decoded using other languages as well. Thus we'll be using JSON as our encoding/decoding machinery of choice.

We want our message passing mechanism to be independent of the underlying inter-networking technology. We would also like to be able to change the inter-networking technology our agents use without impacting our agents' implementations.

We want our agents to be easily extensible so they can send and receive new messages. New -and changed- messages must have minimum to no impact on existing code.

Finally, we want our messages and message handling specifications to be defined in the code, so we don't have to resort to additional/external tools like IDL compilers, .proto files or the like.

We'll see that, using advanced Python facilities like decorators and meta-classes, achieving the above goals is quick and simple. By writing a few lines of code we'll be able to quickly create classes implementing arbitrarily complex message protocols, using whatever communication means, and all this as easily as we'd write classes local to a single module or program.

Let me follow the TDD paradigm to reach our goal. In TDD, you write your tests before anything else, then along multiple iterations you write the code that eventually shall successfully pass those tests. Using PyUnit, we might write a test like the following:

import unittest

...

class TestAgent ( object ):
    def TestMsg ( *args, **kwargs ):
        pass
          
    def address ( self ):
        pass
             
    def send ( self, msg, target ):
        pass

    def testMsgHandler ( self, msg, src ):
        pass
               
    def __iter__ ( self ):
        return iter([])

...

class CommAgentTest(unittest.TestCase):
    def testCommAgent ( self ):
        testmsgs = [
            TestAgent.TestMsg(a=1, b='Hi '),
            TestAgent.TestMsg(a=2, b='there!'),
        ]
        agent1, agent2 = TestAgent(), TestAgent()
        for msg in testmsgs:
            agent1.send(msg, agent2.address())
        
        self.assertListEqual(testmsgs, list(agent2), "Lists not equal")

...

if __name__ == "__main__":
    unittest.main()

Snippet 0 - PyUnit test of a dummy agent class

The test above runs but fails, as expected from the first iteration of TDD. Let's complete the TestAgent class so the test passes:

import unittest
from collections import namedtuple

...

class TestAgent ( object ):
    TestMsg = namedtuple('TestMsg', 'a,b')
          
    def __init__ ( self ):
        self.__rcvdmsgs = []

    def address ( self ):
        return self
             
    def send ( self, msg, target ):
        return target.testMsgHandler(msg, self)

    def testMsgHandler ( self, msg, src ):
        self.__rcvdmsgs.append(msg)
               
    def __iter__ ( self ):
        return iter(self.__rcvdmsgs)

...

class CommAgentTest(unittest.TestCase):
    def testCommAgent ( self ):
        testmsgs = [
            TestAgent.TestMsg(a=1, b='Hi '),
            TestAgent.TestMsg(a=2, b='there!'),
        ]
        agent1, agent2 = TestAgent(), TestAgent()
        for msg in testmsgs:
            agent1.send(msg, agent2.address())
        
        self.assertListEqual(testmsgs, list(agent2), "Lists not equal")

...

if __name__ == "__main__":
    unittest.main()

Snippet 1 - PyUnit test of a prototypical Communicating Agent

In the test above, we define an agent class (TestAgent) whose protocol is made of just one message (TestMsg). We've decided to implement messages as instances of classes created with namedtuple, which is as close as you can get to a C struct in Python. Agent functionality comes down to storing received messages in a private list (self.__rcvdmsgs), and is implemented in method TestAgent.testMsgHandler(). The class supports the iterator protocol (the __iter__() method) so we can easily obtain the messages received by an instance of the class.

Then we instantiate two of those agents and send a pre-defined set of messages from the first agent to the second, after which we test the second agent's received messages list against the pre-defined message set.

The unit test above runs OK. However, it falls short of reaching our goal, for the following -otherwise obvious- reasons:
  • our agents are only able to talk to each other when they run on the same processor and within the same memory space, since our "network" is the function call stack
  • we can't change our "network" without modifying our agents' implementations
  • we can't exchange messages with agents written in Java (unless we build a Python-C-Java bridge or we run on Jython), and exchanging messages with agents written in C/C++ forces us to use the Python-C interface which is cumbersome when you're short of time
  • implementing complex protocols would be difficult to maintain, since each agent class needs to know the method names of other agents' classes that handle each message of the protocol
In the following sections we'll fix some of the problems of this prototypical implementation.

Enabling network communication between agents

In order to enable our agents to talk to remote agents over a network, we need some networking code. We might write it ourselves as part of a base CommAgent class, but why bother when we have the fancy socketserver module?

To retro-fit our agent class with networking capabilities, all we need is inheriting from a class in the socketserver module, and providing a handler class that manages the messages received from remote agents.

Let's take a first stab at networking our agents following the approach above:

import unittest
from collections import namedtuple
from socketserver import UDPServer, BaseRequestHandler
from threading import Timer
from time import sleep
import socket

...

class TestAgentHandler ( BaseRequestHandler ):
    def handle ( self ):
        msg = self.request[0].strip()
        src = self.client_address
        self.server.testMsgHandler(msg, src)

class TestAgent ( UDPServer ):
    TestMsg = namedtuple('TestMsg', 'a,b')
            
    def __init__ ( self, local_address ):
        super(TestAgent, self).__init__(local_address, TestAgentHandler)
        self.__rcvdmsgs = []
       
    def address ( self ):
        return self.server_address

    def send ( self, msg, dst ):
        return self.socket.sendto(msg, dst)

    def testMsgHandler ( self, msg, src ):
        self.__rcvdmsgs.append(msg)
                
    def __iter__ ( self ):
        return iter(self.__rcvdmsgs)

...

class CommAgentTest(unittest.TestCase):

    ...


    def testCommAgent ( self ):
        testmsgs = [
            TestAgent.TestMsg(a=1, b='Hi '),
            TestAgent.TestMsg(a=2, b='there!'),
        ]
        host = socket.gethostbyname(socket.gethostname())
        ports = (2013, 2014)
        agent1, agent2 = \
            TestAgent((host, ports[0])), TestAgent((host, ports[1]))
        for msg in testmsgs:
            agent1.send(msg, agent2.address())
        try:
            Timer(1, lambda: sleep(3) or agent2.shutdown()).start()
            agent2.serve_forever()            
            self.assertListEqual(testmsgs, list(agent2), "Lists not equal")
        finally:
            agent1.socket.close()
            agent2.socket.close()

...

if __name__ == "__main__":
    unittest.main()

Snippet 2 - First stab at a networked Communicating Agent

It didn't take much pain to network-enable our agent, did it? I chose UDP for its ease of use, but using a TCPServer instead of UDPServer shouldn't be much harder (I'll leave this to you, fellow readers, as an exercise).

I had to enhance our unit test a bit. Once real networking comes on stage we need to consider threading issues. We can't run agent2's server loop with agent2.serve_forever() and later end the loop with agent2.shutdown() from within the same thread: our interpreter's main thread blocks on the server loop, so we need an additional thread that calls agent2.shutdown(), and that's what we get with the Timer class. We're scheduling the call to agent2.shutdown() about 4 seconds after the timer is started (the timer waits 1 second before running our lambda, to provide some time for preparations, and the lambda then sleeps 3 more seconds before shutdown() is actually called).
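The threading constraint can be tried in isolation. What follows is a minimal sketch, not the test case from this post: DatagramCollector and run_server_for are hypothetical names, and binding to the loopback address with port 0 (so the OS picks a free port) is an assumption made to keep the example self-contained.

```python
import socket
from socketserver import UDPServer, BaseRequestHandler
from threading import Timer


class DatagramCollector(BaseRequestHandler):
    """Hypothetical handler: stores every received datagram on the server."""

    def handle(self):
        # For UDP servers, self.request is a (data, socket) pair.
        self.server.received.append(self.request[0])


def run_server_for(seconds, payloads):
    """Run the server loop on the calling thread, stop it from a timer thread."""
    server = UDPServer(("127.0.0.1", 0), DatagramCollector)
    server.received = []
    sender = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        # The server socket is already bound, so these datagrams wait in its
        # receive buffer until the server loop starts reading them.
        for payload in payloads:
            sender.sendto(payload, server.server_address)
        # shutdown() blocks until serve_forever() has returned, so it must be
        # called from a thread other than the one running the loop.
        Timer(seconds, server.shutdown).start()
        server.serve_forever(poll_interval=0.1)
        return server.received
    finally:
        sender.close()
        server.server_close()
```

Calling run_server_for(0.5, [b"one", b"two"]) blocks for roughly half a second and then returns the datagrams the handler collected. Calling server.shutdown() directly after serve_forever() on the same thread would never be reached, since serve_forever() does not return until another thread asks it to stop.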

Language-independence (of messages)

Unfortunately, if you run the test case above you'll get an annoying exception when calling agent1's send() method: "'TestMsg' does not support the buffer interface".

What does that mean? If you check the doc for the socket class (just type "import socket; help(socket.socket)" at the Python interpreter's prompt), you'll see its sendto() method refers you to its send() method, which reads "sends a data string to the socket". This is ambiguous at the very least, but the key lies in the word string: what it actually means is that whatever data you pass to the sendto() method must be an instance of class bytes (or of another class that supports the buffer interface, like bytearray), and anything else has to be converted to bytes first. Class str is one example of the latter: a string can be converted with bytes('<any string of characters here>', 'utf8').
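A quick way to see which objects qualify is to ask Python for a memoryview of them, which only works for objects that support the buffer interface. This is a small sketch for illustration: supports_buffer_interface is a hypothetical helper, not part of the agent code, and using memoryview as the probe is my own choice.

```python
from collections import namedtuple

TestMsg = namedtuple('TestMsg', 'a,b')


def supports_buffer_interface(obj):
    """Return True if obj exposes raw bytes through the buffer interface."""
    try:
        memoryview(obj)
        return True
    except TypeError:
        return False


# A namedtuple instance and a str do not expose raw bytes...
print(supports_buffer_interface(TestMsg(a=1, b='Hi ')))
print(supports_buffer_interface('Hi '))
# ...but the str converted to bytes does.
print(supports_buffer_interface(bytes('Hi ', 'utf8')))
# str.encode() is an equivalent way of spelling the conversion.
print('Hi '.encode('utf8') == bytes('Hi ', 'utf8'))
```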

Hence we need to take an additional step in order to get to a working networked agent implementation: we need to write a function that converts our protocol messages to instances of class bytes. Since we decided to use JSON for message exchange between our agents, let's add JSON encoding to our design:

import unittest
from collections import namedtuple
from socketserver import UDPServer, BaseRequestHandler
import socket
import json

...

class TestAgent ( UDPServer ):

    ...

    def send ( self, msg, dst ):
        jsonencodedmsg = \
            type(msg).__name__ + ':' + json.dumps(msg._asdict())
        return self.socket.sendto(bytes(jsonencodedmsg, 'utf8'), dst)

    ...

Snippet 3 - Networked Communicating Agent using JSON encoding

OK, it took just one extra statement in the agent's send() method to enhance our agent (plus the corresponding import at the beginning). Why are we packing "type(msg).__name__ + ':'" ahead of the JSON encoding of our message instance? The answer is simple: when receiving the message, the receiving agent needs a way to tell it apart from the other messages in its protocol. It can't do that from the JSON encoding alone, hence we're prepending the message class name to the encoded message so the receiver knows which class it needs to instantiate to rebuild the message.
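To see why the JSON body alone isn't enough, consider two messages that happen to carry the same fields. In this sketch OtherMsg is a hypothetical second message that does not exist in our agent, and body() and tagged() are illustrative helper names; the fields are read with namedtuple's _asdict() method.

```python
import json
from collections import namedtuple

TestMsg = namedtuple('TestMsg', 'a,b')
OtherMsg = namedtuple('OtherMsg', 'a,b')   # hypothetical second message


def body(msg):
    """JSON encoding of the message fields only."""
    return json.dumps(msg._asdict())


def tagged(msg):
    """Message class name, a colon, then the JSON body."""
    return type(msg).__name__ + ':' + body(msg)


m1, m2 = TestMsg(a=1, b='Hi '), OtherMsg(a=1, b='Hi ')
print(body(m1) == body(m2))       # the bodies are indistinguishable
print(tagged(m1) == tagged(m2))   # the tagged encodings are not
```

With the prefix in place, the receiver can read everything before the first colon to decide which class to instantiate.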

Now the test case runs, but fails. This indicates that we're still missing one last piece of the puzzle: we're sending JSON-encoded messages over the network, but the receiving agent is not JSON-decoding the received message into a message instance that satisfies the assertListEqual() check. Let's enhance our agent to do so:

import unittest
from collections import namedtuple
from socketserver import UDPServer, BaseRequestHandler
import socket
import json

...

class TestAgentHandler ( BaseRequestHandler ):
    def handle ( self ):
        jsonencodedmsg = self.request[0].strip()
        src = self.client_address
        msgname, msgbody = jsonencodedmsg.decode().split(':', 1)
        if msgname == "TestMsg":
            msg = TestAgent.TestMsg(**json.loads(msgbody))
            self.server.testMsgHandler(msg, src)
        else:
            pass    # ignore the message

...

Snippet 4 - Networked Communicating Agent using JSON encoding/decoding

It took just a handful of new lines. In addition to JSON-decoding the received message, I've added an 'if ...' clause to the TestAgentHandler class as a placeholder for future extensions of the protocol spoken by our agent.
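The encoding and decoding steps can also be exercised without any networking. The sketch below pulls them out into two stand-alone functions; encode() and decode() are hypothetical names that mirror what the agent's send() method and the handler's handle() method do, and the fields are read with namedtuple's _asdict() method.

```python
import json
from collections import namedtuple

TestMsg = namedtuple('TestMsg', 'a,b')


def encode(msg):
    """Build '<class name>:<JSON body>' and convert it to bytes."""
    jsonencodedmsg = type(msg).__name__ + ':' + json.dumps(msg._asdict())
    return bytes(jsonencodedmsg, 'utf8')


def decode(data):
    """Rebuild a message instance from the bytes produced by encode()."""
    # Split on the first colon only: the JSON body contains colons too.
    msgname, msgbody = data.decode('utf8').split(':', 1)
    if msgname == 'TestMsg':
        return TestMsg(**json.loads(msgbody))
    return None    # unknown message name: ignore it


wire = encode(TestMsg(a=1, b='Hi '))
print(wire)
print(decode(wire) == TestMsg(a=1, b='Hi '))
```

Because namedtuple instances compare equal field by field, the decoded message satisfies an equality check against the original, which is what the unit test relies on.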

This completes our first fully functional communicating agent. It's fairly simple to extend the protocol our agent handles: just add a new namedtuple subclass per new message to the agent class, add a new handler method for each new namedtuple, and extend the 'if ...' clause inside the TestAgentHandler.handle() method to call the right handler method. I'll leave the fun of doing so to you.

This is how we'd go about it if we were using a primitive language like C++ or Java. However, we're still far from reaching the goal we set for our solution above. Even if our agents do talk to each other over a network using JSON, changing the networking technology implies changes in the agents' code (you already know that if you did the exercise of changing to TCPServer). Additionally, modifying or extending the protocol our agents speak requires code changes as well (as you know if you did the second exercise of extending the single-message protocol used in our example).

In the next post, we'll see how we can use advanced features of the Python language to solve those issues and reach an elegant, non-intrusive solution for writing communicating agents without the pain. Catch you all there!
