Thursday, June 20, 2013

Writing communicating agents with no effort, step I

Introduction

If there's a pattern that arises over and over again in distributed computing, it is that of Communicating Agents. Communicating Agents are software components (e.g. classes) that get instantiated on one or more computers interconnected by some kind of network, each instance able to live on its own but exchanging messages with other instances with the aim of reaching a common goal. For example, when the software components being instantiated are processes on a single computer, we have Hoare's well-known CSP programming model.

Throughout the history of distributed computing we've seen multiple versions of the Communicating Agents pattern, from the early days of the Internet, when very simple deployments (typically client-server with a couple of servers and a handful of clients) were the norm, to today's huge enterprise applications composed of hundreds of agents distributed across dozens of machines and talking over a number of disparate networks.

Every seasoned programmer has met this pattern at least once in their career. Depending on the needs, context and tools at hand, one might decide to leverage existing distributed computing infrastructure such as CORBA or EJB, or simpler tools such as Google's Protocol Buffers; alternatively, one might opt for developing one's own tools. This post deals with the latter case.

Our goal

We'll start by defining what we want to achieve. Say we need to develop a distributed application built from multiple agents that cooperate by exchanging messages over some network linking the machines these agents run on. For the time being we're not concerned with run-time infrastructure, i.e. the middleware managing the agents' execution and life-cycle, so we will focus on defining messages and exchanging them between our agents.

We have very little time and few resources to do this, so we'll be using Python as our rapid prototyping language. We don't want to constrain ourselves to Python, though, so we'd like our messages to be easily encoded and decoded in other languages as well. Thus we'll be using JSON as our encoding/decoding machinery of choice.

We want our message passing mechanism to be independent of the underlying inter-networking technology. We would also like to be able to change the inter-networking technology our agents use without impacting our agents' implementations.

We want our agents to be easily extensible so they can send and receive new messages. New (and changed) messages must have minimal to no impact on existing code.

Finally, we want our messages and message handling specifications to be defined in the code, so we don't have to resort to additional/external tools like IDL compilers, .proto files or the like.

We'll see that, using advanced Python facilities like decorators and meta-classes, achieving the above goals is quick and simple. By writing a few lines of code we'll be able to quickly create classes implementing arbitrarily complex message protocols, using whatever communication means, and all this as easily as we'd write classes local to a single module or program.

Let me follow the TDD paradigm to reach our goal. In TDD, you write your tests before anything else, then over multiple iterations you write the code that eventually passes those tests. Using PyUnit (Python's unittest module), we might write a test like the following:

import unittest

...

class TestAgent ( object ):
    def TestMsg ( *args, **kwargs ):
        pass
          
    def address ( self ):
        pass
             
    def send ( self, msg, target ):
        pass

    def testMsgHandler ( self, msg, src ):
        pass
               
    def __iter__ ( self ):
        return iter([])

...

class CommAgentTest(unittest.TestCase):
    def testCommAgent ( self ):
        testmsgs = [
            TestAgent.TestMsg(a=1, b='Hi '),
            TestAgent.TestMsg(a=2, b='there!'),
        ]
        agent1, agent2 = TestAgent(), TestAgent()
        for msg in testmsgs:
            agent1.send(msg, agent2.address())
        
        self.assertListEqual(testmsgs, list(agent2), "Lists not equal")

...

if __name__ == "__main__":
    unittest.main()

Snippet 0 - PyUnit test of a dummy agent class

The test above runs but fails, as expected from the first iteration of TDD. Let's complete the TestAgent class so the test passes:

import unittest
from collections import namedtuple

...

class TestAgent ( object ):
    TestMsg = namedtuple('TestMsg', 'a,b')
          
    def __init__ ( self ):
        self.__rcvdmsgs = []

    def address ( self ):
        return self
             
    def send ( self, msg, target ):
        return target.testMsgHandler(msg, self)

    def testMsgHandler ( self, msg, src ):
        self.__rcvdmsgs.append(msg)
               
    def __iter__ ( self ):
        return iter(self.__rcvdmsgs)

...

class CommAgentTest(unittest.TestCase):
    def testCommAgent ( self ):
        testmsgs = [
            TestAgent.TestMsg(a=1, b='Hi '),
            TestAgent.TestMsg(a=2, b='there!'),
        ]
        agent1, agent2 = TestAgent(), TestAgent()
        for msg in testmsgs:
            agent1.send(msg, agent2.address())
        
        self.assertListEqual(testmsgs, list(agent2), "Lists not equal")

...

if __name__ == "__main__":
    unittest.main()

Snippet 1 - PyUnit test of a prototypical Communicating Agent

In the test above, we define an agent class (TestAgent) whose protocol is made of just one message (TestMsg). We've decided to implement messages as instances of a class generated by the namedtuple() factory, which is as close as you can get to a C struct in Python. Agent functionality comes down to storing received messages in a private list (self.__rcvdmsgs), and is implemented in method TestAgent.testMsgHandler(). The class supports the iteration protocol (the __iter__() method) so we can easily obtain the messages received by an instance of the class.
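To make the C struct comparison concrete, here is a minimal sketch of how such a message behaves on its own. It reuses the TestMsg definition from Snippet 1; everything else is plain standard-library behaviour.

```python
from collections import namedtuple

# Same definition as in Snippet 1: a message type with two fields, a and b.
TestMsg = namedtuple('TestMsg', 'a,b')

msg = TestMsg(a=1, b='Hi ')

# Fields are read by name, like members of a C struct.
print(msg.a, repr(msg.b))

# Unlike a C struct, instances are immutable: assigning to a field fails.
try:
    msg.a = 2
except AttributeError:
    print('messages are read-only')

# Equality compares field values, which is what assertListEqual()
# relies on when comparing sent and received messages.
print(msg == TestMsg(1, 'Hi '))

# _asdict() exposes the fields as a mapping, handy for serialization.
print(dict(msg._asdict()))
```

Value-based equality is the property our test depends on: the received list only has to contain messages with the same field values, not the very same objects.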

Then we instantiate two of those agents and send a pre-defined set of messages from the first agent to the second, after which we test the second agent's received messages list against the pre-defined message set.

The unit test above runs OK. However, it falls short of reaching our goal, for the following (admittedly obvious) reasons:
  • our agents are only able to talk to each other when they run on the same processor and within the same memory space, since our "network" is the function call stack
  • we can't change our "network" without modifying our agents' implementations
  • we can't exchange messages with agents written in Java (unless we build a Python-C-Java bridge or we run on Jython), and exchanging messages with agents written in C/C++ forces us to use the Python-C interface which is cumbersome when you're short of time
  • implementing complex protocols would be difficult to maintain, since each agent class needs to know the method names of other agents' classes that handle each message of the protocol
In the following sections we'll fix some of the problems of this prototypical implementation.

Enabling network communication between agents

In order to enable our agents to talk to remote agents over a network, we need some networking code. We might write it ourselves as part of a base CommAgent class, but why bother when we have the fancy socketserver module?

To retrofit our agent class with networking capabilities, all we need is to inherit from a class in the socketserver module and provide a handler class that manages the messages received from remote agents.

Let's take a first stab at networking our agents following the approach above:

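import unittest
from collections import namedtuple
from socketserver import UDPServer, BaseRequestHandler
from threading import Timer   # used by the test to stop the server loop
from time import sleep        # used by the test to delay the shutdown
import socket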

...

class TestAgentHandler ( BaseRequestHandler ):
    def handle ( self ):
        msg = self.request[0].strip()
        src = self.client_address
        self.server.testMsgHandler(msg, src)

class TestAgent ( UDPServer ):
    TestMsg = namedtuple('TestMsg', 'a,b')
            
    def __init__ ( self, local_address ):
        super(TestAgent, self).__init__(local_address, TestAgentHandler)
        self.__rcvdmsgs = []
       
    def address ( self ):
        return self.server_address

    def send ( self, msg, dst ):
        return self.socket.sendto(msg, dst)

    def testMsgHandler ( self, msg, src ):
        self.__rcvdmsgs.append(msg)
                
    def __iter__ ( self ):
        return iter(self.__rcvdmsgs)

...

class CommAgentTest(unittest.TestCase):

    ...


    def testCommAgent ( self ):
        testmsgs = [
            TestAgent.TestMsg(a=1, b='Hi '),
            TestAgent.TestMsg(a=2, b='there!'),
        ]
        host = socket.gethostbyname(socket.gethostname())
        ports = (2013, 2014)
        agent1, agent2 = \
            TestAgent((host, ports[0])), TestAgent((host, ports[1]))
        for msg in testmsgs:
            agent1.send(msg, agent2.address())
        try:
            Timer(1, lambda: sleep(3) or agent2.shutdown()).start()
            agent2.serve_forever()            
            self.assertListEqual(testmsgs, list(agent2), "Lists not equal")
        finally:
            agent1.socket.close()
            agent2.socket.close()

...

if __name__ == "__main__":
    unittest.main()

Snippet 2 - First stab at a networked Communicating Agent

It didn't take much pain to network-enable our agent, did it? I chose to use UDP due to its ease of use, but using a TCPServer instead of UDPServer shouldn't be much harder (I'll leave this to you fellow readers as an exercise).

I had to enhance our unit test a bit. When real networking enters the stage we need to consider threading issues. We can't run agent2's server loop with agent2.serve_forever() and later on cause the loop to end with agent2.shutdown() from within the same thread. Since our interpreter's main thread blocks on the server loop we need an additional thread that calls agent2.shutdown(), and that's what we get with the Timer class. We're scheduling execution of agent2.shutdown() after 4 seconds (1 second until the timer thread starts, to provide some time for preparations, and 3 more seconds before the shutdown() method is actually called).
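The two-thread arrangement can also be turned around. The sketch below is an alternative to the Timer approach of Snippet 2, not what the test above does: it runs the server loop in a worker thread and keeps the main thread free to send and to call shutdown(). The handler class, the received list and the got_one event are illustrative names, and the server binds to the loopback address on an OS-assigned port.

```python
import socket
from socketserver import UDPServer, BaseRequestHandler
from threading import Thread, Event

received = []        # datagrams seen by the handler (illustrative)
got_one = Event()    # set once the first datagram has been handled

class RecordingHandler(BaseRequestHandler):   # hypothetical handler name
    def handle(self):
        # For UDP servers, self.request is a (data, socket) pair.
        received.append(self.request[0])
        got_one.set()

# Port 0 asks the OS for any free port; 127.0.0.1 keeps traffic local.
server = UDPServer(('127.0.0.1', 0), RecordingHandler)

# Run the blocking server loop in a worker thread ...
worker = Thread(target=server.serve_forever,
                kwargs={'poll_interval': 0.05})
worker.start()

# ... so the main thread stays free to send a datagram ...
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as client:
    client.sendto(b'ping', server.server_address)

# ... wait (at most 2 seconds) until the handler has seen it ...
got_one.wait(timeout=2)

# ... and stop the loop from outside the thread that runs it.
server.shutdown()        # blocks until serve_forever() has returned
worker.join()
server.server_close()    # release the socket
print(received)
```

Waiting on an event instead of sleeping for a fixed time keeps the run short, at the price of a little extra state shared between the handler and the main thread.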

Language-independence (of messages)

Unfortunately, if you run the test case above you'll get an annoying exception when calling agent1's send() method: "'TestMsg' does not support the buffer interface" (more recent Python 3 releases word it as "a bytes-like object is required, not 'TestMsg'").

What does that mean? If you check the doc for the socket class (just type "import socket; help(socket.socket)" at the Python interpreter's prompt), you'll see its sendto() method refers you to its send() method, which reads "sends a data string to the socket". This is ambiguous at the very least, but the key lies in the word "string": what it actually means is that whatever data you pass to the sendto() method must be an instance of class bytes (or another bytes-like object); anything else has to be converted to bytes first. Class str is one example of the latter: it can be converted with bytes('<any string of characters here>', 'utf8').
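As a quick illustration of that conversion, here is a small sketch that needs no socket at all. The sample strings are made up for the example.

```python
text = 'TestMsg:{"a": 1, "b": "Hi "}'

# str -> bytes: both spellings produce the same result.
payload = bytes(text, 'utf8')
assert payload == text.encode('utf8')

# bytes -> str, as the receiving side would do.
assert payload.decode('utf8') == text

# Non-ASCII characters take more than one byte in UTF-8, so a str and
# its encoded form do not necessarily have the same length.
greeting = '¡Hola!'
print(len(greeting), len(greeting.encode('utf8')))
```

Both sides have to agree on the encoding; UTF-8 is also the default encoding of JSON text, which makes it a natural choice here.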

Hence we need to take an additional step in order to get to a working networked agent implementation: we need to write a function that converts our protocol messages to instances of class bytes. Since we decided to use JSON for message exchange between our agents, let's add JSON encoding to our design:

import unittest
from collections import namedtuple
from socketserver import UDPServer, BaseRequestHandler
import socket
import json

...

class TestAgent ( UDPServer ):

    ...

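    def send ( self, msg, dst ):
        # Prefix the JSON body with the message class name so the receiver
        # knows which class to rebuild; _asdict() maps field names to values
        # (namedtuple instances no longer expose __dict__ in current Python 3).
        jsonencodedmsg = \
            type(msg).__name__ + ':' + json.dumps(msg._asdict())
        return self.socket.sendto(bytes(jsonencodedmsg, 'utf8'), dst)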

    ...

Snippet 3 - Networked Communicating Agent using JSON encoding

OK, it took just one extra statement in the agent's send() method to enhance our agent (plus the corresponding import at the beginning). Why are we packing "type(msg).__name__ + ':'" ahead of the JSON encoding of our message instance? The answer is simple: when receiving the message, the receiving agent needs a way to tell the message apart from the other messages in its protocol. It won't be able to do that just from the JSON encoding, hence we're prepending the message class name to the encoded message so the receiver knows which class it needs to instantiate to rebuild the message.

Now the test case runs, but fails. This indicates that we're still missing one last piece in the puzzle: we're sending JSON encoded messages over the network, but the receiving agent is not JSON-decoding the received message into a message instance that satisfies the assertListEqual() check. Let's enhance our agent to do so:
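To see what actually travels over the wire, here is a stand-alone sketch of the encoding step. The encode() helper is a hypothetical name introduced for this example, and it uses the namedtuple's _asdict() method to obtain the field mapping.

```python
import json
from collections import namedtuple

TestMsg = namedtuple('TestMsg', 'a,b')

def encode(msg):
    """Hypothetical helper: '<class name>:<JSON body>' as UTF-8 bytes."""
    body = json.dumps(msg._asdict())
    return bytes(type(msg).__name__ + ':' + body, 'utf8')

wire = encode(TestMsg(a=1, b='Hi '))
print(wire)

# The JSON body contains colons of its own, so a receiver has to split
# on the first colon only to recover the class name.
name, body = wire.decode('utf8').split(':', 1)
print(name)
print(json.loads(body))
```

Note the second argument to split(): without it, the colons inside the JSON body would cut the message into more than two pieces.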

import unittest
from collections import namedtuple
from socketserver import UDPServer, BaseRequestHandler
import socket
import json

...

class TestAgentHandler ( BaseRequestHandler ):
    def handle ( self ):
        jsonencodedmsg = self.request[0].strip()
        src = self.client_address
        msgname, msgbody = jsonencodedmsg.decode().split(':', 1)
        if msgname == "TestMsg":
            msg = TestAgent.TestMsg(**json.loads(msgbody))
            self.server.testMsgHandler(msg, src)
        else:
            pass    # ignore the message

...

Snippet 4 - Networked Communicating Agent using JSON encoding/decoding

That took just a handful of new lines. In addition to JSON-decoding the received message, I've added an 'if ...' clause to the TestAgentHandler class as a placeholder for future extensions of the protocol spoken by our agent.

This completes our first fully functional communicating agent. It's fairly simple to extend the protocol our agent handles: just add a new namedtuple class per new message to the agent class, add a new handler method for each new namedtuple, and extend the 'if ...' clause inside the TestAgentHandler.handle() method to call the right handler method. I'll leave the fun of doing so to you.

This is how we'd go about it if we were using a primitive language like C++ or Java. However, we're still far from reaching the goal we set for our solution above. Even if our agents do talk to each other over a network using JSON, changing the networking used implies changes in the agents' code (you already know that if you did the exercise of changing to TCPServer as networking support). Additionally, modifying or extending the protocol our agents speak requires code changes as well (as you know if you did the second exercise of extending the single-message protocol used in our example).
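As a hint for that exercise, here is a minimal sketch of the decoding side once a second message joins the protocol. AckMsg and the decode() helper are hypothetical names made up for this example, and the call to the per-message handler method is left out to keep it short.

```python
import json
from collections import namedtuple

TestMsg = namedtuple('TestMsg', 'a,b')
AckMsg = namedtuple('AckMsg', 'seq')     # hypothetical second message

def decode(data):
    """Rebuild a message instance from '<name>:<JSON>' bytes, or None."""
    msgname, msgbody = data.decode('utf8').split(':', 1)
    fields = json.loads(msgbody)
    if msgname == 'TestMsg':
        return TestMsg(**fields)
    elif msgname == 'AckMsg':            # one new branch per new message
        return AckMsg(**fields)
    return None                          # unknown message: ignore it

print(decode(b'TestMsg:{"a": 2, "b": "there!"}'))
print(decode(b'AckMsg:{"seq": 7}'))
print(decode(b'Bogus:{}'))
```

Every new message means another branch in this chain, which is precisely the kind of intrusive change we set out to avoid.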

In the next post, we'll see how we can use advanced features of the Python language to solve those issues and reach an elegant, non-intrusive solution for writing communicating agents without the pain. Catch you all there!
