6.005 — Software Construction
Fall 2014

Reading 20, Part 2: Message Passing with Threads & Queues

Goals of concurrent program design

Now is a good time to pop up a level and look at what we’re doing. Recall that our primary goals are to create software that is safe from bugs, easy to understand, and ready for change.

Building concurrent software is clearly a challenge for all three of these goals. We can break the issues into two general classes. When we ask whether a concurrent program is safe from bugs, we care about two properties:

Deadlocks threaten liveness. Liveness may also require fairness, which means that concurrent modules are given processing capacity to make progress on their computations. Fairness is mostly a matter for the operating system’s thread scheduler, but you can influence it (for good or for ill) by setting thread priorities.

Message passing with threads

We’ve previously talked about message passing between processes: clients and servers communicating over network sockets. We can also use message passing between threads within the same process, and this design is often preferable to a shared memory design with locks.

Use a synchronized queue for message passing between threads. The queue serves the same function as the buffered network communication channel in client/server message passing. Java provides the BlockingQueue interface for queues with blocking operations:

In an ordinary Queue:

A synchronized queue, according to the Java API documentation:

additionally supports operations that wait for the queue to become non-empty when retrieving an element, and wait for space to become available in the queue when storing an element.

producer-consumer message passing

Analogous to the client/server pattern for message passing over a network is the producer-consumer design pattern for message passing between threads. Producer threads and consumer threads share a synchronized queue. Producers put data or requests onto the queue, and consumers remove and process them. One or more producers and one or more consumers might all be adding and removing items from the same queue. This queue must be safe for concurrency.

Java provides two implementations of BlockingQueue:

Unlike the streams of bytes sent and received by sockets, these synchronized queues (like normal collections classes in Java) can hold objects of an arbitrary type. Instead of designing a wire protocol, we must choose or design a type for messages in the queue. And just as we did with operations on a threadsafe ADT or messages in a wire protocol, we must design our messages here to prevent race conditions and enable clients to perform the atomic operations they need.

Bank account example

message passing model for bank accounts

Our first example of message passing was the bank account example.

Each cash machine and each account is its own module, and modules interact by sending messages to one another. Incoming messages arrive on a queue.

We designed messages for get-balance and withdraw, and said that each cash machine checks the account balance before withdrawing to prevent overdrafts:

get-balance
if balance >= 1 then withdraw 1

But it is still possible to interleave messages from two cash machines so they are both fooled into thinking they can safely withdraw the last dollar from an account with only $1 in it.

We need to choose a better atomic operation: withdraw-if-sufficient-funds would be a better operation than just withdraw.

Implementing message passing with queues

You can see all the code for this example on GitHub: squarer example. All the relevant parts are excerpted below.

Here’s a message passing module for squaring integers:

/** Squares integers. */
public class Squarer {

    private final BlockingQueue<Integer> in;
    private final BlockingQueue<SquareResult> out;
    // Rep invariant: in, out != null

    /** Make a new squarer.
     *  @param requests queue to receive requests from
     *  @param replies queue to send replies to */
    public Squarer(BlockingQueue<Integer> requests,
                   BlockingQueue<SquareResult> replies) {
        this.in = requests;
        this.out = replies;
    }

    /** Start handling squaring requests. */
    public void start() {
        new Thread(new Runnable() {
            public void run() {
                while (true) {
                    // TODO: we may want a way to stop the thread
                    try {
                        // block until a request arrives
                        int x = in.take();
                        // compute the answer and send it back
                        int y = x * x;
                        out.put(new SquareResult(x, y));
                    } catch (InterruptedException ie) {
                        ie.printStackTrace();
                    }
                }
            }
        }).start();
    }
}

Incoming messages to the Squarer are integers; the squarer knows that its job is to square those numbers, so no further details are required.

Outgoing messages are instances of SquareResult:

/** A squaring result message. */
public class SquareResult {
    private final int input;
    private final int output;

    /** Make a new result message.
     *  @param input input number
     *  @param output square of input */
    public SquareResult(int input, int output) {
        this.input = input;
        this.output = output;
    }

    @Override public String toString() {
        return input + "^2 = " + output;
    }
}

We would probably add additional observers to SquareResult so clients can retrieve the input number and output result.

Finally, here’s a main method that uses the squarer:

public static void main(String[] args) {

    BlockingQueue<Integer> requests = new LinkedBlockingQueue<>();
    BlockingQueue<SquareResult> replies = new LinkedBlockingQueue<>();

    Squarer squarer = new Squarer(requests, replies);
    squarer.start();

    try {
        // make a request
        requests.put(42);
        // ... maybe do something concurrently ...
        // read the reply
        System.out.println(replies.take());
    } catch (InterruptedException ie) {
        ie.printStackTrace();
    }
}

It should not surprise us that this code has a very similar flavor to the code for implementing message passing with sockets.

Stopping

What if we want to shut down the Squarer so it is no longer waiting for new inputs? In the client/server model, if we want the client or server to stop listening for our messages, we close the socket. And if we want the client or server to stop altogether, we can quit that process. But here, the squarer is just another thread in the same process, and we can’t “close” a queue.

One strategy is a poison pill: a special message on the queue that signals the consumer of that message to end its work. To shut down the squarer, since its input messages are merely integers, we would have to choose a magic poison integer (everyone knows the square of 0 is 0 right? no one will need to ask for the square of 0…) or use null (don’t use null). Instead, we might change the type of elements on the requests queue to an ADT:

SquareRequest = IntegerRequest + StopRequest

with operations:

input : SquareRequest → int
shouldStop : SquareRequest → boolean

and when we want to stop the squarer, we enqueue a StopRequest where shouldStop returns true.

For example, in Squarer.start():

    public void run() {
        while (true) {
            try {
                // block until a request arrives
                SquareRequest req = in.take();
                // see if we should stop
                if (req.shouldStop()) { break; }
                // compute the answer and send it back
                int x = req.input();
                int y = x * x;
                out.put(new SquareResult(x, y));
            } catch (InterruptedException ie) {
                ie.printStackTrace();
            }
        }
    }

It is also possible to interrupt a thread by calling its interrupt() method. If the thread is blocked waiting, the method it’s blocked in will throw an InterruptedException (that’s why we have to try-catch that exception almost any time we call a blocking method). If the thread was not blocked, an interrupted flag will be set. The thread must check for this flag to see whether it should stop working. For example:

    public void run() {
        // handle requests until we are interrupted
        while ( ! Thread.interrupted()) {
            try {
                // block until a request arrives
                int x = in.take();
                // compute the answer and send it back
                int y = x * x;
                out.put(new SquareResult(x, y));
            } catch (InterruptedException ie) {
                // stop
                break;
            }
        }
    }

Thread safety arguments with message passing

A thread safety argument with message passing might rely on:

Message passing deadlock

When buffers fill up, message passing systems can experience deadlock.

In a deadlock, two (or more) concurrent modules are both blocked waiting for each other to do something. Since they’re blocked, no module will be able to make anything happen, and none of them will break the deadlock.

In general, in a system of multiple concurrent modules communicating with each other, we can imagine drawing a graph in which the nodes of the graph are modules, and there’s an edge from A to B if module A is blocked waiting for module B to do something. The system is deadlocked if at some point in time, there is a cycle in this graph. The simplest case is the two-node deadlock, A → B and B → A, but more complex systems can achieve (not that they’ll be happy about it) larger deadlocks.

A message passing system in deadlock appears to simply hang, just the way a deadlock with locks does.

Example: network square server

Let’s see an example of message passing deadlock. We’ll use a network version of the square server, so you can compare their implementations. All you need to pay attention to is the spec of the server: the messages it receives and sends.

/**
 * SquareServer is a server that squares integers passed to it.
 * It accepts requests of the form:
 *      Request ::= Number "\n"
 *      Number ::= [0-9]+
 * and for each request, returns a reply of the form:
 *      Reply ::= (Number | "err") "\n"
 * where a Number is the square of the request number,
 * or "err" is used to indicate a misformatted request.
 * SquareServer can handle only one client at a time.
 */
public class SquareServer {
    /** Default port number where the server listens for connections. */
    public static final int SQUARE_PORT = 4949;

    private ServerSocket serverSocket;
    // Rep invariant: serverSocket != null

    /** Make a SquareServer that listens for connections on port.
     *  @param port port number, requires 0 <= port <= 65535 */
    public SquareServer(int port) throws IOException {
        serverSocket = new ServerSocket(port);
    }

    /** Run the server, listening for connections and handling them.
     *  @throws IOException if the main server socket is broken */
    public void serve() throws IOException {
        while (true) {
            // block until a client connects
            Socket socket = serverSocket.accept();
            try {
                handle(socket);
            } catch (IOException ioe) {
                ioe.printStackTrace(); // but don't terminate serve()
            } finally {
                socket.close();
            }
        }
    }

    /** Handle one client connection. Returns when client disconnects.
     *  @param socket socket where client is connected
     *  @throws IOException if connection encounters an error */
    private void handle(Socket socket) throws IOException {
        System.err.println("client connected");

        // get the socket's input stream, and wrap converters around it
        // that convert it from a byte stream to a character stream,
        // and that buffer it so that we can read a line at a time
        BufferedReader in = new BufferedReader(new InputStreamReader(
                                    socket.getInputStream()));

        // similarly, wrap character=>bytestream converter around the
        // socket output stream, and wrap a PrintWriter around that so
        // that we have more convenient ways to write Java primitive
        // types to it.
        PrintWriter out = new PrintWriter(new OutputStreamWriter(
                                  socket.getOutputStream()));

        try {
            // each request is a single line containing a number
            for (String line = in.readLine(); line != null; line = in.readLine()) {
                System.err.println("request: " + line);
                try {
                    int x = Integer.valueOf(line);
                    // compute answer and send back to client
                    int y = x * x;
                    System.err.println("reply: " + y);
                    out.print(y + "\n");
                } catch (NumberFormatException e) {
                    // complain about ill-formatted request
                    System.err.println("reply: err");
                    out.println("err");
                }
                // important! flush our buffer so the reply is sent
                out.flush();
            }
        } finally {
            out.close();
            in.close();
        }
    }

    /** Start a SquareServer running on the default port. */
    public static void main(String[] args) {
        try {
            SquareServer server = new SquareServer(SQUARE_PORT);
            server.serve();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

Here’s a client for our number-squaring protocol; of course we could also talk to the server in a program like Telnet:

/**
 * SquareClient is a client that sends requests to the SquareServer
 * and interprets its replies.
 * A new SquareClient is "open" until the close() method is called,
 * at which point it is "closed" and may not be used further.
 */
public class SquareClient {
    private Socket socket;
    private BufferedReader in;
    private PrintWriter out;
    // Rep invariant: socket, in, out != null

    /** Make a SquareClient and connect it to a server running on
     *  hostname at the specified port.
     *  @throws IOException if can't connect */
    public SquareClient(String hostname, int port) throws IOException {
        socket = new Socket(hostname, port);
        in = new BufferedReader(new InputStreamReader(
                     socket.getInputStream()));
        out = new PrintWriter(new OutputStreamWriter(
                      socket.getOutputStream()));
    }

    /** Send a request to the server. Requires this is "open".
     *  @param x number to square
     *  @throws IOException if network or server failure */
    public void sendRequest(int x) throws IOException {
        out.print(x + "\n");
        out.flush(); // important! make sure x actually gets sent
    }

    /** Get a reply from the next request that was submitted.
     *  Requires this is "open". 
     *  @return square of requested number
     *  @throws IOException if network or server failure */
    public int getReply() throws IOException {
        String reply = in.readLine();
        if (reply == null) {
            throw new IOException("connection terminated unexpectedly");
        }

        try {
            return Integer.valueOf(reply);
        } catch (NumberFormatException nfe) {
            throw new IOException("misformatted reply: " + reply);
        }
    }

    /** Closes the client's connection to the server.
     *  This client is now "closed". Requires this is "open".
     *  @throws IOException if close fails */
    public void close() throws IOException {
        in.close();
        out.close();
        socket.close();
    }
}

Finally, here’s the broken part: a program that uses SquareClient to talk to the SquareServer:

    private static final int N = 100;

    /**  Use a SquareServer to square all the integers from 1 to N. */
    public static void main(String[] args) throws IOException {
        SquareClient client = new SquareClient("localhost",
                                               SquareServer.SQUARE_PORT);
        // send the requests to square 1...N
        for (int x = 1; x <= N; ++x) {
            client.sendRequest(x);
            System.out.println(x + "^2 = ?");
        }
        // collect the replies
        for (int x = 1; x <= N; ++x) {
            int y = client.getReply();
            System.out.println(x + "^2 = " + y);
        }
        client.close();
    }

Can you smell danger in the air?

As N grows larger and larger, our client is making many requests to the SquareServer without reading any replies. Eventually the client’s receive buffer will fill up with unread replies. Then the server’s sending buffer will fill up, since data can’t be sent successfully. Finally, one of its calls to out.print will block, waiting for more buffer space — and we have our deadly embrace. The server is waiting for the client to read some replies, but the client is waiting for the server to accept its requests. Deadlock.

Final suggestions for preventing deadlock

One solution to deadlock is to design the system so that there is no possibility of a cycle — so that if A is waiting for B, it cannot be that B was already (or will start) waiting for A.

Another approach to deadlock is timeouts. If a module has been blocked for too long (maybe 100 milliseconds? or 10 seconds? how to decide?), then you stop blocking and throw an exception. Now the problem becomes: what do you do when that exception is thrown?

Next: Concurrency in practice, and a summary