6.031TS — Software Construction
TypeScript Pilot — Spring 2021

Reading 23: Message-Passing

Software in 6.031

  • Safe from bugs: correct today and correct in the unknown future.
  • Easy to understand: communicating clearly with future programmers, including future you.
  • Ready for change: designed to accommodate change without rewriting.

Objectives

After reading the notes and examining the code for this class, you should be able to use message passing instead of shared memory for communication, both between Java threads and between TypeScript workers.

Two models for concurrency

In our introduction to concurrency, we saw two models for concurrent programming: shared memory and message passing.

  • shared memory

    In the shared memory model, concurrent modules interact by reading and writing shared mutable objects in memory. Creating multiple threads inside a single Java process is our primary example of shared-memory concurrency.

  • message passing

    In the message passing model, concurrent modules interact by sending immutable messages to one another over a communication channel. That communication channel might connect different computers over a network, as in some of our initial examples: web browsing, instant messaging, etc.

The message passing model has several advantages over the shared memory model, which boil down to greater safety from bugs. In message passing, concurrent modules interact explicitly, by passing messages through the communication channel, rather than implicitly through mutation of shared data. The implicit interaction of shared memory can too easily lead to inadvertent interaction, sharing and manipulating data in parts of the program that don’t know they’re concurrent and aren’t cooperating properly in the concurrency safety strategy. Message passing also shares only immutable objects (the messages) between modules, whereas shared memory requires sharing mutable objects, which even in non-concurrent programming can be a source of bugs.

We’ll discuss in this reading how to implement message passing in two contexts:

  • between Java threads
  • between TypeScript workers

We will start with Java, because it makes the mechanism of message passing more explicit. We’ll use blocking queues to implement message passing between Java threads. Some of the operations of a blocking queue are blocking in the sense that calling the operation blocks the progress of the thread until the operation can return a result. Blocking makes writing code easier, but it also means we must continue to contend with bugs that cause deadlock.

For message passing between TypeScript workers, we’ll use a channel abstraction for sending and receiving messages. Instead of using blocking operations, we will provide a callback function to receive messages.

In the next concurrency reading we’ll see how to implement message passing between client/server processes over the network.

Message passing between threads

We saw in Mutual Exclusion that a thread blocks trying to acquire a lock until the lock has been released by its current owner. Blocking means that a thread waits (without doing further work) until an event occurs. We can use this term to describe methods and method calls: if a method is a blocking method, then a call to that method can block, waiting until some event occurs before it returns to the caller.

We can use a queue with blocking operations for message passing between Java threads — and the buffered network communication channel in client/server message passing will work the same way. Java provides the BlockingQueue interface for queues with blocking operations.

In an ordinary Queue:

  • add(e) adds element e to the end of the queue.
  • remove() removes and returns the element at the head of the queue, or throws an exception if the queue is empty.

A BlockingQueue extends this interface: it additionally supports operations that wait for the queue to become non-empty when retrieving an element, and wait for space to become available in the queue when storing an element.

  • put(e) blocks until it can add element e to the end of the queue (if the queue does not have a size bound, put will not block).
  • take() blocks until it can remove and return the element at the head of the queue, waiting until the queue is non-empty.

When you are using a BlockingQueue for message passing between threads, make sure to use the put() and take() operations, not add() and remove().

producer-consumer message passing

We will implement the producer-consumer design pattern for message passing between threads. Producer threads and consumer threads share a synchronized queue. Producers put data or requests onto the queue, and consumers remove and process them. One or more producers and one or more consumers might all be adding and removing items from the same queue. This queue must be safe for concurrency.

Java provides two implementations of BlockingQueue:

  • ArrayBlockingQueue is a fixed-size queue that uses an array representation. Using put to add an item to the queue will block if the queue is full.
  • LinkedBlockingQueue is a growable queue using a linked-list representation. If no maximum capacity is specified, the queue will never fill up, so put will never block.

Like other collection classes in Java, these synchronized queues can hold objects of an arbitrary type. We must choose or design a type for messages in the queue: we will choose an immutable type because our goal in using message passing is to avoid the problems of shared memory. Producers and consumers will communicate only by sending and receiving messages, and there will be no opportunity for (mis)communication by mutating an aliased message object.

And just as we designed the operations on a threadsafe ADT to prevent race conditions and enable clients to perform the atomic operations they need, we will design our message objects with those same requirements.

Message passing example

You can see all the code for this example. The parts relevant to discussion are excerpted below.

Here’s a message passing module that represents a refrigerator:

DrinksFridge.java
/**
 * A mutable type representing a refrigerator containing drinks.
 */
public class DrinksFridge {

    private int drinksInFridge;
    private final BlockingQueue<Integer> in;
    private final BlockingQueue<FridgeResult> out;

    // Abstraction function:
    //   AF(drinksInFridge, in, out) = a refrigerator containing `drinksInFridge` drinks
    //                                 that takes requests from `in` and sends replies to `out`
    // Rep invariant:
    //   drinksInFridge >= 0

    /**
     * Make a DrinksFridge that will listen for requests and generate replies.
     * 
     * @param requests queue to receive requests from
     * @param replies queue to send replies to
     */
    public DrinksFridge(BlockingQueue<Integer> requests, BlockingQueue<FridgeResult> replies) {
        this.drinksInFridge = 0;
        this.in = requests;
        this.out = replies;
        checkRep();
    }

    ...
}

The module has a start method that creates an internal thread to service requests on its input queue:

DrinksFridge.start()
/**
 * Start handling drink requests.
 */
public void start() {
    new Thread(new Runnable() {
        public void run() {
            while (true) {
                try {
                    // block until a request arrives
                    int n = in.take();
                    FridgeResult result = handleDrinkRequest(n);
                    out.put(result);
                } catch (InterruptedException ie) {
                    ie.printStackTrace();
                }
            }
        }
    }).start();
}

Incoming messages to the DrinksFridge are integers, representing a number of drinks to take from (or add to) the fridge:

DrinksFridge.handleDrinkRequest()
/**
 * Take (or add) drinks from the fridge.
 * @param n if >= 0, removes up to n drinks from the fridge;
 *          if < 0, adds -n drinks to the fridge.
 * @return FridgeResult reporting how many drinks were actually added or removed
 *      and how many drinks are left in the fridge. 
 */
private FridgeResult handleDrinkRequest(int n) {
    // adjust request to reflect actual drinks available
    int change = Math.min(n, drinksInFridge);
    drinksInFridge -= change;
    checkRep();
    return new FridgeResult(change, drinksInFridge);
}

Outgoing messages are instances of FridgeResult:

FridgeResult.java
/**
 * A threadsafe immutable message describing the result of taking or adding drinks to a DrinksFridge.
 */
public class FridgeResult {
    private final int drinksTakenOrAdded;
    private final int drinksLeftInFridge;
    // Rep invariant: 
    //   TODO

    /**
     * Make a new result message.
     * @param drinksTakenOrAdded (precondition? TODO)
     * @param drinksLeftInFridge (precondition? TODO)
     */
    public FridgeResult(int drinksTakenOrAdded, int drinksLeftInFridge) {
        this.drinksTakenOrAdded = drinksTakenOrAdded;
        this.drinksLeftInFridge = drinksLeftInFridge;
    }

    // TODO: we will want more observers, but for now...

    @Override public String toString() {
        return (drinksTakenOrAdded >= 0 ? "you took " : "you put in ") 
                + Math.abs(drinksTakenOrAdded) + " drinks, fridge has " 
                + drinksLeftInFridge + " left";
    }
}

We would probably add additional observers to FridgeResult so clients can retrieve the values.

Finally, here’s a main method that loads the fridge:

LoadFridge.main()
public static void main(String[] args) {

    BlockingQueue<Integer> requests = new LinkedBlockingQueue<>();
    BlockingQueue<FridgeResult> replies = new LinkedBlockingQueue<>();

    // start an empty fridge
    DrinksFridge fridge = new DrinksFridge(requests, replies);
    fridge.start();

    try {
        // deliver some drinks to the fridge
        requests.put(-42);

        // maybe do something concurrently...

        // read the reply
        System.out.println(replies.take());
    } catch (InterruptedException ie) {
        ie.printStackTrace();
    }
    System.out.println("done");
    System.exit(0); // ends the program, including DrinksFridge
}

reading exercises

Rep invariant

Write the rep invariant of FridgeResult, as an expression that could be used in checkRep() below. Use the minimum number of characters you can without any method calls in your answer.

private void checkRep() {
  assert REP_INVARIANT;
}


Code review

Evaluate the following code review comments on the code above:

“The DrinksFridge constructor shouldn’t be putting references to the two queues directly into its rep; it should make defensive copies.”


DrinksFridge.start() has an infinite loop in it, so the thread will never stop until the whole process is stopped.”


DrinksFridge can have only one client using it, because if multiple clients put requests in its input queue, their results will get mixed up in the result queue.”


Message passing between workers

Now that we’ve seen how message passing works in Java, using blocking queues, let’s turn our attention to TypeScript workers. We’ll focus on the Node worker_thread library. Web Workers in the browser are slightly different in detail, but the basic operations of message-passing exist in that context too.

When a worker is created, it comes with a two-way communication channel, the ends of which are called message ports. One end of the channel is accessible to the code that created the worker, through the Worker object itself:

import { Worker } from 'worker_threads';

const worker = new Worker('./hello.js');
worker.postMessage('hello!'); // sends a message to the worker

The other end of the channel is accessible to the worker, as the parentPort object exported by the worker_threads module:

// hello.ts
import { parentPort } from 'worker_threads';

parentPort.postMessage('bonjour!'); // sends a message back to parent

These examples are sending simple strings as messages, and neither side is actually receiving the message yet. Let’s address each of those problems in turn.

Message types

Like Java blocking queues, message ports can in general carry a wide variety of types, including arrays, maps, sets, and record types. One important restriction, however, is that messages cannot be instances of user-defined classes: the fields of an object will be copied, but its methods will not be passed over the channel.

A common pattern is to use a record type to represent a message, such as this message we already saw in the Java example:

type FridgeResult = {
  drinksTakenOrAdded: number,
  drinksLeftInFridge: number
};

When different kinds of messages might need to be sent over the channel, we can use a discriminated union to bring them together:

type DepositRequest =  { name:'deposit', amount:number };
type WithdrawRequest = { name:'withdrawal', amount:number };
type BalanceRequest =  { name:'balance' };
type BankRequest = DepositRequest | WithdrawRequest | BalanceRequest;

Here, the name field is a literal type that distinguishes between the three different kinds of requests we can make to the bank, and the rest of the fields of each object type might vary depending on the information needed for each kind of request.
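To see the name field at work, here is a sketch of a function that handles these requests (the handleRequest function and its balance variable are invented for illustration, not part of any actual bank module). In each case of the switch, TypeScript narrows request to the matching variant, so request.amount is only accessible in the cases that actually have it:

```typescript
type DepositRequest =  { name: 'deposit', amount: number };
type WithdrawRequest = { name: 'withdrawal', amount: number };
type BalanceRequest =  { name: 'balance' };
type BankRequest = DepositRequest | WithdrawRequest | BalanceRequest;

let balance = 0;

// handle one request and return the resulting balance
function handleRequest(request: BankRequest): number {
    switch (request.name) {
        case 'deposit':
            balance += request.amount; // narrowed to DepositRequest here
            return balance;
        case 'withdrawal':
            balance -= request.amount; // narrowed to WithdrawRequest here
            return balance;
        case 'balance':
            return balance;
    }
}

handleRequest({ name: 'deposit', amount: 100 });
handleRequest({ name: 'withdrawal', amount: 30 });
console.log(handleRequest({ name: 'balance' })); // → 70
```

Because the compiler checks that the switch covers every variant of BankRequest, adding a new kind of request later will produce type errors at every handler that hasn’t been updated, which is a safety-from-bugs win.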

Receiving messages

To receive incoming messages from a message port, we provide an event listener: a function that will be called whenever a message arrives.

For example, here’s how the worker listens for a message from its parent:

import { parentPort } from 'worker_threads';

parentPort.addListener('message', function(greeting:string) {
  console.log('received a message', greeting);
});

When the parent calls worker.postMessage('hello!'), the worker would receive this message as a call to the anonymous function with greeting set to "hello!".

This listener function is an example of a general design pattern, a callback. A callback is a function that a client provides to a module for the module to call. This is in contrast to normal control flow, in which the client is doing all the calling: calling down into functions that the module provides. With a callback, the client is providing a piece of code for the implementer to call.
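As a tiny sketch of the pattern, stripped of any message-passing machinery (the countdown function and its client are invented for illustration): the module never returns its results to the client as values; instead, it calls the client-provided function once per result.

```typescript
// `countdown` is a hypothetical module operation: instead of returning
// the sequence of ticks, it reports each one by calling the client's callback
function countdown(from: number, tick: (n: number) => void): void {
    for (let n = from; n >= 0; n--) {
        tick(n); // control flows from the module back into client code
    }
}

// the client supplies the callback; the module decides when to call it
const observed: number[] = [];
countdown(3, (n: number) => observed.push(n));
console.log(observed); // → [ 3, 2, 1, 0 ]
```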

Here’s one analogy for thinking about this idea. Normal function calling is like picking up the phone and calling a service, like calling your bank to find out the balance of your account. You give the information that the bank operator needs to look up your account, they read back the account balance to you over the phone, and you hang up. You are the client, and the bank is the module that you’re calling into.

Sometimes the bank is slow to give an answer. You’re put on hold, and you wait until they figure out the answer for you. This is like a synchronous function call that blocks until it is ready to return.

But sometimes the task may take so long that the bank doesn’t want to put you on hold. Then the bank will ask you for a callback phone number, and they will promise to call you back with the answer. This is analogous to providing a callback function.

Callback functions can be used like promises, as a way to defer returning the result of a function call. In fact, older versions of JavaScript only had callback functions for this purpose; promises are a more recent addition to the language, so some old asynchronous APIs still only have callback functions. One conceptual difference between promises and callbacks is who has the initiative. Using a promise, it’s up to the caller to eventually await the promise to get the result of the asynchronous function. With a callback, the asynchronous function is responsible for calling the callback, and the original caller doesn’t actually know when (or if) that might happen.
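Here is a sketch of the same asynchronous operation exposed in both styles (fetchBalance and its 100 result are invented for illustration): first callback style, then wrapped in a promise so the caller can await it.

```typescript
// Callback style: the operation takes responsibility for calling back.
function fetchBalance(callback: (balance: number) => void): void {
    setTimeout(() => callback(100), 10); // pretend the lookup takes a while
}

// Promise style: wrap the callback API in a promise, so the caller
// keeps the initiative and can simply await the result.
function fetchBalancePromise(): Promise<number> {
    return new Promise(resolve => fetchBalance(resolve));
}

async function main(): Promise<void> {
    const balance = await fetchBalancePromise();
    console.log('balance is', balance); // → balance is 100
}
main();
```

Node provides util.promisify to perform this kind of wrapping mechanically, for callback APIs that follow Node’s (err, result) convention.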

Callback functions are a more general mechanism than promises. The kind of callback used in the event listener just above is not an answer to a one-time request like your account balance. It’s more like a regular service that the bank is promising to provide, using your callback number as needed to reach you. A better analogy for an event listener might be account fraud protection, where the bank calls you on the phone whenever a suspicious transaction occurs on your account.

We will see more examples of callbacks in the next few readings: for example, for handling input events in a graphical user interface, and for handling requests in a web server.

TypeScript message passing example

Let’s put this together into an implementation of DrinksFridge for TypeScript, communicating through a message port rather than blocking queues:

/**
 * A mutable type representing a refrigerator containing drinks.
 */
class DrinksFridge {

    private drinksInFridge:number = 0;

    // Abstraction function:
    //   AF(drinksInFridge, port) = a refrigerator containing `drinksInFridge` drinks
    //                                 that takes requests from and sends replies to `port`
    // Rep invariant:
    //   drinksInFridge >= 0

    /**
     * Make a DrinksFridge that will listen for requests and generate replies.
     * 
     * @param port port to receive requests from and send replies to
     */
    public constructor(
        private readonly port: MessagePort
    ) {
        this.checkRep();
    }

    ...
}

The fridge has a start method that starts listening for incoming messages, and forwards them to a handler method:

// Start handling drink requests.
public start():void {
    this.port.addListener('message', (n:number) => {
        const reply:FridgeResult = this.handleDrinkRequest(n);
        this.port.postMessage(reply);            
    });
}

Incoming messages are integers representing a number of drinks to add or remove, as in the Java implementation:

private handleDrinkRequest(n:number):FridgeResult {
    const change = Math.min(n, this.drinksInFridge);
    this.drinksInFridge -= change;
    this.checkRep();
    return { drinksTakenOrAdded: change, drinksLeftInFridge: this.drinksInFridge };
}

Outgoing messages are instances of FridgeResult, a simple record type:

// Record type used for DrinksFridge's reply messages.
type FridgeResult = {
    drinksTakenOrAdded: number,
    drinksLeftInFridge: number 
};
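To exercise the fridge without spawning a separate worker, we can connect it to one end of a MessageChannel, which worker_threads provides as a pair of already-connected message ports. This is a sketch that condenses the DrinksFridge above into one file; the lastReply variable is added only so the round trip is observable:

```typescript
import { MessageChannel, MessagePort } from 'worker_threads';

type FridgeResult = { drinksTakenOrAdded: number, drinksLeftInFridge: number };

// condensed version of the DrinksFridge shown above
class DrinksFridge {
    private drinksInFridge = 0;
    public constructor(private readonly port: MessagePort) {}
    public start(): void {
        this.port.addListener('message', (n: number) => {
            const change = Math.min(n, this.drinksInFridge);
            this.drinksInFridge -= change;
            this.port.postMessage({ drinksTakenOrAdded: change,
                                    drinksLeftInFridge: this.drinksInFridge });
        });
    }
}

// port1 belongs to the fridge; port2 plays the role of the client
const { port1, port2 } = new MessageChannel();
new DrinksFridge(port1).start();

let lastReply: FridgeResult | undefined;
port2.addListener('message', (reply: FridgeResult) => {
    lastReply = reply;
    console.log(reply);
    port1.close(); // closing either port closes the whole channel
});
port2.postMessage(-5); // deliver 5 drinks to the fridge
```

Note that the reply arrives asynchronously, on a later turn of the event loop, even though both ends of the channel live in the same thread here.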

Race conditions

In a previous reading, we saw in the bank account example that message-passing doesn’t eliminate the possibility of race conditions. It’s still possible for concurrent message-passing processes to interleave their work in bad ways.

This particularly happens when a client must send multiple messages to the module to do what it needs, because those messages (and the client’s processing of their responses) may interleave with messages sent by other clients. The message protocol for DrinksFridge has been carefully designed to manage some of this interleaving, but there are still situations where a race condition can arise. The next exercises explore this problem.

reading exercises

Breaking the rep invariant?

Suppose the DrinksFridge has only 2 drinks left, and two very thirsty people send it requests, each asking for 3 drinks. Which of these outcomes is possible, after both messages have been processed by the fridge? Reread the code for start() and handleDrinkRequest() to remind yourself how the fridge works.


Racing for a drink

Suppose the DrinksFridge still has only 2 drinks left, and three people would each like a drink. But they are all more polite than they are thirsty – none of them wants to take the last drink and leave the fridge empty.

So all three people run an algorithm that might be characterized as “LOOK before you TAKE”:

  • LOOK: request 0 drinks, just to see how many drinks are left in the fridge without taking any
  • if the response shows the fridge has more than 1 drink left, then:
    • TAKE: request 1 drink from the fridge
  • otherwise go away without a drink

Which of these outcomes is possible, after all three people run their LOOK-TAKE algorithm and all their messages have been processed by the fridge?


Deadlock

The blocking behavior of blocking queues is very convenient for programming, but blocking also introduces the possibility of deadlock. In a deadlock, two (or more) concurrent modules are both blocked waiting for each other to do something. Since they’re blocked, no module will be able to make anything happen, and none of them will break the deadlock.

In general, in a system of multiple concurrent modules communicating with each other, we can imagine drawing a graph in which the nodes of the graph are modules, and there’s an edge from A to B if module A is blocked waiting for module B to do something. The system is deadlocked if at some point in time, there is a cycle in this graph. The simplest case is the two-node deadlock, A → B and B → A, but more complex systems can encounter larger deadlocks.
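The cycle check itself is ordinary graph search. As a sketch (the hasDeadlock function and its Map representation are our own invention, not part of any library), a depth-first search that revisits a node already on the current path has found a cycle, and therefore a deadlock:

```typescript
// Nodes are module names; edges.get(a) lists the modules a is blocked on.
function hasDeadlock(edges: Map<string, string[]>): boolean {
    const onPath = new Set<string>(); // nodes on the current DFS path
    const done = new Set<string>();   // nodes fully explored, no cycle found

    function dfs(node: string): boolean {
        if (onPath.has(node)) return true; // back edge: a cycle
        if (done.has(node)) return false;
        onPath.add(node);
        for (const next of edges.get(node) ?? []) {
            if (dfs(next)) return true;
        }
        onPath.delete(node);
        done.add(node);
        return false;
    }

    return [...edges.keys()].some(node => dfs(node));
}

// the simplest case: A waits for B, and B waits for A
const waitsFor = new Map([['A', ['B']], ['B', ['A']]]);
console.log(hasDeadlock(waitsFor)); // → true
```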

Deadlock is much more common with locks than with message-passing — but when the message-passing queues have limited capacity, and become filled up to that capacity with messages, then even a message-passing system can experience deadlock. A message passing system in deadlock appears to simply get stuck.

Let’s see an example of message-passing deadlock, using the same DrinksFridge we’ve been using so far. This time, instead of using LinkedBlockingQueues that can grow arbitrarily (limited only by the size of memory), we will use the ArrayBlockingQueue implementation that has a fixed capacity:

private static final int QUEUE_SIZE = 100;
...
    // make request and reply queues big enough to hold QUEUE_SIZE messages each
    BlockingQueue<Integer> requests = new ArrayBlockingQueue<>(QUEUE_SIZE);
    BlockingQueue<FridgeResult> replies = new ArrayBlockingQueue<>(QUEUE_SIZE);

Many message-passing systems use fixed-capacity queues for performance reasons, so this is a common situation.

Finally, to create the conditions needed for deadlock, the client code will make N requests, each asking for a drink, before checking for any of the replies from DrinksFridge. Here is the full code:

ManyThirstyPeople.java
private static final int QUEUE_SIZE = 100;
private static final int N = 100;

/**  Send N thirsty people to the DrinksFridge. */
public static void main(String[] args) throws IOException {
    // make request and reply queues big enough to hold QUEUE_SIZE messages each
    BlockingQueue<Integer> requests = new ArrayBlockingQueue<>(QUEUE_SIZE);
    BlockingQueue<FridgeResult> replies = new ArrayBlockingQueue<>(QUEUE_SIZE);

    DrinksFridge fridge = new DrinksFridge(requests, replies);
    fridge.start();

    try {
        // put enough drinks in the fridge to start
        requests.put(-N);
        System.out.println(replies.take());

        // send the requests
        for (int x = 1; x <= N; ++x) {
            requests.put(1); // give me 1 drink!
            System.out.println("person #" + x + " is looking for a drink");
        }
        // collect the replies
        for (int x = 1; x <= N; ++x) {
            System.out.println("person #" + x + ": " + replies.take());
        }
    } catch (InterruptedException ie) {
        ie.printStackTrace();
    }
    System.out.println("done");
    System.exit(0); // ends the program, including DrinksFridge thread
}

It turns out with N=100 and QUEUE_SIZE=100, the code above works and doesn’t reach a deadlock. But notice that our client is making N requests before reading any replies. If N is larger than QUEUE_SIZE, the replies queue fills up with unread replies. Then DrinksFridge blocks trying to put one more reply into that queue, and it stops calling take on the requests queue. The client can continue putting more requests into the requests queue, but only up to the size of that queue. If there are more additional requests than can fit in that queue – i.e., when N is greater than 2×QUEUE_SIZE – then the client’s call to requests.put() will block too.

And now we have our deadly embrace. DrinksFridge is waiting for the client to read some replies and free up space on the replies queue, but the client is waiting for DrinksFridge to accept some requests and free up space on the requests queue. Deadlock.

Final suggestions for preventing deadlock

One solution to deadlock is to design the system so that there is no possibility of a cycle — so that if A is waiting for B, it cannot be that B was already (or will start) waiting for A.

Another approach to deadlock is timeouts. If a module has been blocked for too long (maybe 100 milliseconds? or 10 seconds? how to decide?), then you stop blocking and throw an exception. Now the problem becomes: what do you do when that exception is thrown?
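One common shape for the timeout idea, sketched here in TypeScript with promises (withTimeout is a made-up helper, not a library function): race the operation against a timer, and reject with an exception if the timer wins.

```typescript
// Wait for `operation`, but give up after `ms` milliseconds.
function withTimeout<T>(operation: Promise<T>, ms: number): Promise<T> {
    return new Promise<T>((resolve, reject) => {
        const timer = setTimeout(() => reject(new Error('timed out')), ms);
        // settle with the operation's outcome, canceling the timer either way
        operation.then(value => { clearTimeout(timer); resolve(value); },
                       err   => { clearTimeout(timer); reject(err); });
    });
}

// a promise that never settles stands in for a deadlocked module
withTimeout(new Promise<number>(() => {}), 50)
    .catch(err => console.log(err.message)); // → timed out
```

The catch handler is exactly where the hard question lands: the timeout converts silent blocking into an exception, but the program still has to decide how to recover.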

Summary

  • Rather than synchronize with locks, message passing systems synchronize on a shared communication channel, e.g. a stream or a queue.

  • Threads communicating with blocking queues is a useful pattern for message passing within a single process.