Reading 24: Message-Passing
Software in 6.031
Objectives
After reading the notes and examining the code for this class, you should be able to use message passing instead of shared memory for communication between TypeScript workers, and understand how message passing works with a blocking queue.
Two models for concurrency
In our introduction to concurrency, we saw two models for concurrent programming: shared memory and message passing.
- In the shared memory model, concurrent modules interact by reading and writing shared mutable objects in memory.
- In the message passing model, concurrent modules interact by sending immutable messages to one another over a communication channel. That communication channel might connect different computers over a network, as in some of our initial examples: web browsing, instant messaging, etc.
The message passing model has several advantages over the shared memory model, which boil down to greater safety from bugs. In message passing, concurrent modules interact explicitly, by passing messages through the communication channel, rather than implicitly through mutation of shared data. The implicit interaction of shared memory can too easily lead to inadvertent interaction: data is shared and manipulated in parts of the program that don’t know they’re concurrent and aren’t cooperating properly in the concurrency safety strategy. Message passing also shares only immutable objects (the messages) between modules, whereas shared memory requires sharing mutable objects, which even in non-concurrent programming can be a source of bugs.
We’ll discuss in this reading how to implement message passing in two contexts:
- We will start with Python, because it makes the mechanism of message passing more explicit. We’ll use blocking queues to implement message passing between Python threads. Some of the operations of a blocking queue are blocking in the sense that calling the operation blocks the progress of the thread until the operation can return a result. Blocking makes writing code easier, but it also means we must continue to contend with bugs that cause deadlock.
- For message passing between TypeScript workers, we’ll use a channel abstraction for sending and receiving messages. Instead of using blocking operations, we will provide a callback function to receive messages.
In the next concurrency reading we’ll see how to implement message passing between client/server processes over the network.
Message passing between threads
Recall that Python threads use shared memory. Our goal will be to share as little as possible: the only shared mutable objects will be one or more queues for messages. In particular, we’ll use queues with blocking operations – let’s understand what that means.
The reading on promises introduced readFile, a Node library function to read a file and return its contents in a Promise<string>. This asynchronous specification for reading a file has the advantage that if the file is large or the disk is slow, and our program has other work it can do in the meantime, that work can happen concurrently.
Before learning about readFile, we used a different library function: readFileSync, which returns a string synchronously. A Worker or main Node process that runs readFileSync must wait, without doing further work, until the entire file is read into memory and the function call returns. The JavaScript thread is blocked, and the function is called a blocking function.
In a single-threaded JavaScript process, nothing else happens while our code runs a blocking function. In multi-threaded Python, while one thread is blocked, other threads can execute instead.
We can use a queue with blocking operations for message passing between threads — and as we will see, the buffered network communication channel in client/server message passing will work the same way.
Python provides the queue.Queue class for queues with blocking operations.
In an ordinary queue ADT, we might have operations:
- add(e) adds element e to the end of the queue.
- remove() removes and returns the element at the head of the queue, or throws an exception if the queue is empty.
You may have used an ordinary Python list as a queue, e.g. by calling append(..) for add and pop(0) for remove. Or, if we use a TypeScript array as a queue, we might call push(..) for add and shift() for remove. (Using push with pop() would give us a last-in-first-out queue, instead of a first-in-first-out queue.)
The blocking Queue ADT provides similar operations that block:
- put(e) blocks until it can add element e to the end of the queue (if the queue does not have a size bound, put will not block).
- get() blocks until it can remove and return the element at the head of the queue, waiting until the queue is non-empty.
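As a small illustration of these blocking operations, here is a sketch using Python's standard queue.Queue with a size bound. The timeout arguments are an addition for this demonstration only, so that a call which would otherwise block forever gives up and raises an exception instead; the variable names and values are arbitrary.

```python
from queue import Queue, Empty, Full

q = Queue(maxsize=2)      # bounded queue: put() can block when it is full

q.put('a')                # returns immediately, the queue has room
q.put('b')                # the queue is now full

try:
    # A plain q.put('c') would block until another thread calls get().
    # The timeout makes the blocked call give up after 0.1 seconds.
    q.put('c', timeout=0.1)
    put_blocked = False
except Full:
    put_blocked = True

first = q.get()           # 'a': elements come out in first-in-first-out order
second = q.get()          # 'b'

try:
    # A plain q.get() would block until another thread calls put().
    q.get(timeout=0.1)
    get_blocked = False
except Empty:
    get_blocked = True

print(put_blocked, first, second, get_blocked)   # prints: True a b True
```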
We will implement the producer-consumer design pattern for message passing between threads. Producer threads and consumer threads share a synchronized queue. Producers put data or requests onto the queue, and consumers remove and process them. One or more producers and one or more consumers might all be adding and removing items from the same queue. This queue must be safe for concurrency.
We must also choose or design a type for messages in the queue: we will choose an immutable type because our goal in using message passing is to avoid the problems of shared memory. Producers and consumers will communicate only by sending and receiving messages, and there will be no opportunity for (mis)communication by mutating an aliased message object.
And we will need to design our messages to prevent race conditions and enable clients to perform the atomic operations they need.
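A minimal sketch of the producer-consumer pattern with immutable messages might look like the following. The SquareRequest message type and the squaring task are hypothetical, chosen only to keep the example short; a frozen dataclass is one way (not the only way) to get an immutable message type in Python.

```python
from dataclasses import dataclass, FrozenInstanceError
from queue import Queue
from threading import Thread

@dataclass(frozen=True)
class SquareRequest:
    '''Hypothetical immutable message asking the consumer to square a number.'''
    value: int

requests = Queue()   # the only mutable objects shared between the threads
replies = Queue()

def consumer():
    # Serve exactly three requests and then finish, so the sketch terminates.
    for _ in range(3):
        req = requests.get()             # blocks until a request arrives
        replies.put(req.value ** 2)      # ints are immutable, safe to share

def producer():
    for v in (1, 2, 3):
        requests.put(SquareRequest(v))

threads = [Thread(target=consumer), Thread(target=producer)]
for t in threads:
    t.start()
for t in threads:
    t.join()

results = [replies.get() for _ in range(3)]
print(results)   # prints: [1, 4, 9]

# An attempt to mutate a frozen message raises an error.
try:
    SquareRequest(4).value = 5
    mutated = True
except FrozenInstanceError:
    mutated = False
```

With one producer and one consumer the replies arrive in request order. With several of either, the order in which messages interleave is no longer fixed, which is why message design matters.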
Message passing example
You can see all the code for this example. The parts relevant to discussion are excerpted below.
Here’s a message passing module that represents a refrigerator:
drinksfridge.py
class DrinksFridge:
    '''
    A mutable type representing a refrigerator containing drinks, using
    message passing to communicate with clients.
    '''

    # Abstraction function:
    #   AF(drinksInFridge, incoming, outgoing) = a refrigerator containing
    #                                            `drinksInFridge` drinks that takes
    #                                            requests from `incoming` and sends
    #                                            replies to `outgoing`
    # Rep invariant:
    #   drinksInFridge >= 0

    def __init__(self, requests, replies):
        '''
        Make a DrinksFridge that will listen for requests and generate replies.
        requests: Queue to receive requests from
        replies: Queue to send replies to
        '''
        self.drinksInFridge = 0
        self.incoming = requests
        self.outgoing = replies
        self.checkRep()

    ...
The module has a start method that creates an internal thread to service requests on its input queue:
DrinksFridge.start()
    def start(self):
        '''
        Start handling drink requests.
        '''
        def runFridge():
            while True:
                n = self.incoming.get()
                result = self.handleDrinkRequest(n)
                self.outgoing.put(result)
        Thread(target=runFridge).start()
Incoming messages to the DrinksFridge are integers, representing a number of drinks to take from (or add to) the fridge:
DrinksFridge.handleDrinkRequest()
    def handleDrinkRequest(self, n):
        '''
        Take (or add) drinks from the fridge.
        n: if >= 0, removes up to n drinks from the fridge;
           if < 0, adds -n drinks to the fridge.
        Returns: FridgeResult reporting how many drinks were actually added or removed,
                 and how many drinks are left in the fridge.
        '''
        change = min(n, self.drinksInFridge)
        self.drinksInFridge -= change
        self.checkRep()
        return FridgeResult(change, self.drinksInFridge)
Outgoing messages are instances of FridgeResult:
FridgeResult
class FridgeResult:
    '''
    A threadsafe immutable message describing the result of taking or adding
    drinks to a DrinksFridge.
    '''

    # Rep invariant: TODO

    def __init__(self, drinksTakenOrAdded, drinksLeftInFridge):
        '''
        Make a new result message.
        drinksTakenOrAdded: (precondition? TODO)
        drinksLeftInFridge: (precondition? TODO)
        '''
        self.drinksTakenOrAdded = drinksTakenOrAdded
        self.drinksLeftInFridge = drinksLeftInFridge

    def __str__(self):
        return ('you took ' if self.drinksTakenOrAdded >= 0 else 'you put in ') + \
            f'{abs(self.drinksTakenOrAdded)} drinks, fridge has {self.drinksLeftInFridge} left'
Finally, here’s some code that loads the fridge:
loadfridge.py
requests = Queue()
replies = Queue()
# start an empty fridge
fridge = DrinksFridge(requests, replies)
fridge.start()
# deliver some drinks to the fridge
requests.put(-42)
# maybe do something concurrently...
# read the reply
print(replies.get())
print('done')
os._exit(0) # abruptly ends the program, including DrinksFridge, generally a bad idea
reading exercises
Evaluate the following code review comments on the code above:
“The DrinksFridge constructor shouldn’t be putting references to the two queues directly into its rep; it should make defensive copies.”
“DrinksFridge.start() has an infinite loop in it, so the thread will never stop until the whole process is stopped.”
“DrinksFridge can have only one client using it, because if multiple clients put requests in its input queue, their results will get mixed up in the result queue.”
Message passing between workers
Now that we’ve seen how message passing works in Python, using blocking queues of messages that we create and share, let’s turn our attention to TypeScript workers.
We’ll focus on the Node interface for Worker (found in the package worker_threads). The web browser version of Worker (called Web Workers) differs slightly in detail, but the basic operations of message passing exist in that context too.
When a worker is created, it comes with a two-way communication channel, the ends of which are called message ports. One end of the channel is accessible to the code that created the worker, using the postMessage operation on the Worker object:
import { Worker } from 'worker_threads';
const worker = new Worker('./hello.js');
worker.postMessage('hello!'); // sends a message to the worker
The other end of the channel is accessible to the worker, as a global object parentPort from the worker_threads module:
// hello.ts
import { parentPort } from 'worker_threads';
parentPort.postMessage('bonjour!'); // sends a message back to parent
These examples are sending simple strings as messages, and neither side is actually receiving the message yet. Let’s address each of those problems in turn.
Message types
Like Python blocking queues, message ports can in general carry a wide variety of types, including arrays, maps, sets, and record types. One important restriction, however, is that they cannot be instances of user-defined classes, because the method code will not be passed over the channel.
A common pattern is to use a record type to represent a message, such as the FridgeResult messages we already saw in the Python example:
type FridgeResult = {
    drinksTakenOrAdded: number,
    drinksLeftInFridge: number
};
When different kinds of messages might need to be sent over the channel, we can use a discriminated union to bring them together:
type DepositRequest = { name: 'deposit', amount: number };
type WithdrawRequest = { name: 'withdrawal', amount: number };
type BalanceRequest = { name: 'balance' };
type BankRequest = DepositRequest | WithdrawRequest | BalanceRequest;
Here, the name field is a literal type that distinguishes between the three different kinds of requests we can make to the bank, and the rest of the fields of each object type might vary depending on the information needed for each kind of request.
Receiving messages
To receive incoming messages from a message port, we provide an event listener: a function that will be called whenever a message arrives. For example, here’s how the worker listens for a message from its parent:
import { parentPort } from 'worker_threads';
parentPort.addListener('message', (greeting: string) => {
    console.log('received a message', greeting);
});
When the parent calls worker.postMessage('hello!'), the worker would receive this message as a call to the anonymous function with greeting set to "hello!".
TypeScript message passing example
Let’s put this together into an implementation of DrinksFridge for TypeScript, communicating through a message port rather than blocking queues:
drinksfridge.ts
/**
 * A mutable type representing a refrigerator containing drinks.
 */
class DrinksFridge {

    private drinksInFridge: number = 0;

    // Abstraction function:
    //   AF(drinksInFridge, port) = a refrigerator containing `drinksInFridge` drinks
    //                              that takes requests from and sends replies to `port`
    // Rep invariant:
    //   drinksInFridge >= 0

    /**
     * Make a DrinksFridge that will listen for requests and generate replies.
     *
     * @param port port to receive requests from and send replies to
     */
    public constructor(
        private readonly port: MessagePort
    ) {
        this.checkRep();
    }

    ...
}
The fridge has a start method that starts listening for incoming messages, and forwards them to a handler method:
    /** Start handling drink requests. */
    public start(): void {
        this.port.addListener('message', (n: number) => {
            const reply: FridgeResult = this.handleDrinkRequest(n);
            this.port.postMessage(reply);
        });
    }
Incoming messages are integers representing a number of drinks to add or remove, as in the Python implementation:
    private handleDrinkRequest(n: number): FridgeResult {
        const change = Math.min(n, this.drinksInFridge);
        this.drinksInFridge -= change;
        this.checkRep();
        return { drinksTakenOrAdded: change, drinksLeftInFridge: this.drinksInFridge };
    }
Outgoing messages are instances of FridgeResult, a simple record type:
/** Record type for DrinksFridge's reply messages. */
type FridgeResult = {
    drinksTakenOrAdded: number,
    drinksLeftInFridge: number
};
Some code to load and use the fridge is in loadfridge.ts.
Stopping
What if we want to shut down the DrinksFridge so it is no longer waiting for new inputs?
One strategy is a poison pill: a special message that signals the consumer of that message to end its work.
To shut down the fridge, since its input messages are merely integers, we would have to choose a magic poison integer (maybe nobody will ever ask for 0 drinks…? bad idea, don’t use magic numbers) or use null (bad idea, don’t use null). Instead, we might change the type of input messages to a discriminated union:
type FridgeRequest = DrinkRequest | StopRequest;
type DrinkRequest = { name: 'drink', drinksRequested: number };
type StopRequest = { name: 'stop' };
When we want to stop the fridge, we send a StopRequest, i.e. fridge.postMessage({ name:'stop' }). And when the DrinksFridge receives this stop message, it will need to stop listening for incoming messages. That requires us to keep track of the listener callback so it can be removed later. For example, fill in the blanks in DrinksFridge in the exercise below:
class DrinksFridge {

    ...

    // callback function is now stored as a private field
    private readonly messageCallback = (req: FridgeRequest) => {
        // see if we should stop
        if (req.name === 'stop') {
            // TypeScript infers that req must be a StopRequest here
            this.stop();
            ▶▶A◀◀;
        }
        // with the correct code in blank A,
        // TypeScript infers that req must be a DrinkRequest here

        // compute the answer and send it back
        const n = ▶▶B◀◀;
        const reply:FridgeResult = this.handleDrinkRequest(n);
        this.port.postMessage(reply);
    };

    /** Start handling drink requests. */
    public start(): void {
        this.port.addListener('message', this.messageCallback);
    }

    private stop(): void {
        this.port.removeListener('message', ▶▶C◀◀);
        // note that once this thread has no more code to run, and no more
        // listeners attached, it terminates
    }

    ...
}
(If you’re surprised that the messageCallback arrow function can reference this, good eye! Remember that arrow functions do not define their own this. Instead, this will be looked up in the surrounding scope. It turns out that field value expressions in a TypeScript class body are evaluated in the constructor, which is why this is defined as we need it.)
It is also possible to stop a Worker by calling its terminate() method. But like our initial Python example that used the inadvisable os._exit(0), this method summarily ends execution in that Worker, dropping whatever unfinished work it had on the floor, and potentially leaving shared state in the filesystem, or database, or communication channels broken for the rest of the program. Using the message passing channel to shut down cleanly is the way to go.
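The same poison-pill idea carries over to the Python version with blocking queues, where it would replace the abrupt os._exit(0). The sketch below is a simplified stand-in for DrinksFridge, not the course's implementation: the DrinkRequest and StopRequest classes, the run_fridge function, and the tuple replies are assumptions made to keep the example self-contained.

```python
from dataclasses import dataclass
from queue import Queue
from threading import Thread

@dataclass(frozen=True)
class DrinkRequest:
    drinksRequested: int

@dataclass(frozen=True)
class StopRequest:
    pass

def run_fridge(incoming, outgoing):
    '''Simplified stand-in for DrinksFridge: serve requests until told to stop.'''
    drinks_in_fridge = 0
    while True:
        req = incoming.get()
        if isinstance(req, StopRequest):
            break                        # poison pill: leave the loop, thread ends
        change = min(req.drinksRequested, drinks_in_fridge)
        drinks_in_fridge -= change
        outgoing.put((change, drinks_in_fridge))

requests = Queue()
replies = Queue()
fridge_thread = Thread(target=run_fridge, args=(requests, replies))
fridge_thread.start()

requests.put(DrinkRequest(-42))   # add 42 drinks
requests.put(DrinkRequest(2))     # take 2 drinks
requests.put(StopRequest())       # ask the fridge to shut down

fridge_thread.join()              # returns once the fridge loop has exited
reply1 = replies.get()
reply2 = replies.get()
print(reply1, reply2, fridge_thread.is_alive())   # prints: (-42, 42) (2, 40) False
```

Because the stop message travels through the same queue as the drink requests, every request sent before it is handled before the thread exits.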
Race conditions
In a previous reading, we saw in the bank account example that message-passing doesn’t eliminate the possibility of race conditions. It’s still possible for concurrent message-passing processes to interleave their work in bad ways.
This particularly happens when a client must send multiple messages to the module to do what it needs, because those messages (and the client’s processing of their responses) may interleave with messages sent by other clients.
The message protocol for DrinksFridge has been carefully designed to manage some of this interleaving, but there are still situations where a race condition can arise. The next exercises explore this problem.
reading exercises
Suppose the DrinksFridge has only 2 drinks left, and two very thirsty people send it requests, each asking for 3 drinks. Which of these outcomes is possible, after both messages have been processed by the fridge?
You can reread the Python code for start() and handleDrinkRequest() to remind yourself how the fridge works.
Suppose the DrinksFridge still has only 2 drinks left, and three people would each like a drink.
But they are all more polite than they are thirsty – none of them wants to take the last drink and leave the fridge empty.
So all three people run an algorithm that might be characterized as “LOOK before you TAKE”:
- LOOK: request 0 drinks, just to see how many drinks are left in the fridge without taking any
- if the response shows the fridge has more than 1 drink left, then:
  - TAKE: request 1 drink from the fridge
- otherwise go away without a drink
Which of these outcomes is possible, after all three people run their LOOK-TAKE algorithm and all their messages have been processed by the fridge?
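To make the interleaving concrete, the sketch below replays one possible arrival order of messages at the fridge for just two of the polite people. It runs in a single thread, with the fridge's arithmetic copied into a pure function (handle_drink_request is a hypothetical helper, not part of the course code), so the order of messages is fixed by the order of the calls.

```python
def handle_drink_request(drinks_in_fridge, n):
    '''Same arithmetic as handleDrinkRequest, written as a pure function.
    Returns (change, drinks left in fridge).'''
    change = min(n, drinks_in_fridge)
    return change, drinks_in_fridge - change

drinks = 2

# One possible order: both LOOK messages arrive before either TAKE message.
_, drinks = handle_drink_request(drinks, 0)   # person A LOOKs
seen_by_a = drinks                            # A sees 2 drinks
_, drinks = handle_drink_request(drinks, 0)   # person B LOOKs
seen_by_b = drinks                            # B also sees 2 drinks

took_a = took_b = 0
if seen_by_a > 1:                             # A decides based on an old reply
    took_a, drinks = handle_drink_request(drinks, 1)
if seen_by_b > 1:                             # so does B
    took_b, drinks = handle_drink_request(drinks, 1)

print(took_a, took_b, drinks)   # prints: 1 1 0
```

Each individual message is handled atomically, but the LOOK-then-TAKE sequence is not, so in this order the fridge ends up empty even though nobody meant to take the last drink.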
Deadlock
The blocking behavior of blocking queues is very convenient for programming, but blocking also introduces the possibility of deadlock. In a deadlock, two (or more) concurrent modules are both blocked waiting for each other to do something. Since they’re blocked, no module will be able to make anything happen, and none of them will break the deadlock.
In general, in a system of multiple concurrent modules communicating with each other, we can imagine drawing a graph in which the nodes of the graph are modules, and there’s an edge from A to B if module A is blocked waiting for module B to do something. The system is deadlocked if at some point in time, there is a cycle in this graph. The simplest case is the two-node deadlock, A → B and B → A, but more complex systems can encounter larger deadlocks.
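The graph description above can be sketched as a small cycle check. This is only an illustration of the definition, assuming we are handed a snapshot of who is waiting for whom; the has_deadlock function and the dictionary representation are hypothetical, and real systems rarely have such a snapshot available.

```python
def has_deadlock(waits_for):
    '''
    waits_for: dict mapping each module name to the set of module names
               it is currently blocked waiting for.
    Returns: True if the waits-for graph contains a cycle.
    '''
    WHITE, GRAY, BLACK = 0, 1, 2   # unvisited, on the current path, finished
    color = {}

    def visit(node):
        color[node] = GRAY
        for nxt in waits_for.get(node, ()):
            state = color.get(nxt, WHITE)
            if state == GRAY:      # reached a node already on the current path
                return True
            if state == WHITE and visit(nxt):
                return True
        color[node] = BLACK
        return False

    return any(color.get(n, WHITE) == WHITE and visit(n) for n in waits_for)

print(has_deadlock({'A': {'B'}, 'B': {'A'}}))              # prints: True
print(has_deadlock({'A': {'B'}, 'B': set()}))              # prints: False
print(has_deadlock({'A': {'B'}, 'B': {'C'}, 'C': {'A'}}))  # prints: True
```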
Deadlock is much more common with locks than with message-passing — but when the message-passing queues have limited capacity, and become filled up to that capacity with messages, then even a message-passing system can experience deadlock. A message passing system in deadlock appears to simply get stuck.
Let’s see an example of message-passing deadlock, using the same Python DrinksFridge we’ve been discussing. This time, instead of using Queue instances that can grow arbitrarily (limited only by the size of memory), we will limit them to a fixed capacity by passing a maxsize argument. Many message-passing systems use fixed-capacity queues for performance reasons, so this is a common situation.
Finally, to create the conditions needed for deadlock, the client code will make N requests, each asking for a drink, before checking for any of the replies from DrinksFridge. Here is the full code:
manythirstypeople.py
QUEUE_SIZE = 100
N = 100
requests = Queue(maxsize=QUEUE_SIZE)
replies = Queue(maxsize=QUEUE_SIZE)
fridge = DrinksFridge(requests, replies)
fridge.start()
# put enough drinks in the fridge to start
requests.put(-N)
print(replies.get())
# send the requests
for x in range(N):
    requests.put(1)
    print(f'person #{x} is looking for a drink')
# collect the replies
for x in range(N):
    print(f'person #{x}: {replies.get()}')
print('done')
os._exit(0) # abruptly ends the program, generally a bad idea
It turns out that with QUEUE_SIZE=100 and N=100, the code above works and doesn’t reach a deadlock. But notice that our client is making N requests before reading any replies. If N is larger than QUEUE_SIZE, the replies queue fills up with unread replies. Then DrinksFridge blocks trying to put one more reply into that queue, and it stops calling get on the requests queue. The client can continue putting more requests into the requests queue, but only up to the size of that queue. If there are more additional requests than can fit in that queue, i.e., when N is greater than about 2×QUEUE_SIZE (the blocked fridge is also holding one request that it has already removed from the queue), then the client’s call to requests.put() will block too.
And now we have our deadly embrace. DrinksFridge is waiting for the client to read some replies and free up space on the replies queue, but the client is waiting for DrinksFridge to accept some requests and free up space on the requests queue. Deadlock.
Final suggestions for preventing deadlock
One solution to deadlock is to design the system so that there is no possibility of a cycle — so that if A is waiting for B, it cannot be that B was already (or will start) waiting for A.
Another approach to deadlock is timeouts. If a module has been blocked for too long (maybe 100 milliseconds? or 10 seconds? the decision will depend on the system), then you stop blocking and throw an exception. Now the problem becomes: what do you do when that exception is thrown?
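As a rough sketch of the timeout approach with Python's queue.Queue, the client below passes a timeout to put(), so a call that would otherwise block forever raises queue.Full instead. The echo_server function is a hypothetical stand-in for DrinksFridge, and the small QUEUE_SIZE and the 0.5 second timeout are arbitrary choices for the demonstration.

```python
from queue import Queue, Full
from threading import Thread

QUEUE_SIZE = 2
N = 10                      # more requests than the two queues can absorb

requests = Queue(maxsize=QUEUE_SIZE)
replies = Queue(maxsize=QUEUE_SIZE)

def echo_server():
    # Hypothetical stand-in for DrinksFridge: one reply per request.
    while True:
        replies.put(requests.get())

# daemon=True lets the program exit even though this thread never stops
Thread(target=echo_server, daemon=True).start()

sent = 0
timed_out = False
try:
    for x in range(N):
        # Without the timeout, this call would block forever once both
        # queues are full and the server is stuck in replies.put().
        requests.put(x, timeout=0.5)
        sent += 1
except Full:
    timed_out = True

print(f'sent {sent} of {N} requests, timed out: {timed_out}')
```

The timeout turns a silent hang into an exception, but it does not decide what happens next. Here the client could, for example, start reading replies to make room before sending the remaining requests.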
Summary
Message passing systems avoid the bugs associated with synchronizing actions on shared mutable data by instead sharing a communication channel, e.g. a stream or a queue.
Rather than develop a mutable ADT that is safe for use concurrently, as we did in Mutual Exclusion, concurrent modules (threads, processes, separate machines) only transfer messages back and forth, and mutation is confined to each module individually.