6.031
6.031 — Software Construction
Fall 2021

Code Examples for Reading 24

Download

You can also download a ZIP file containing this code.

Seems to be necessary to have a triple-backtick block at the start of this page,
otherwise all the pre/code tags below aren't syntax highlighted.
So create a hidden one.

drinksfridge.py

from threading import Thread

class DrinksFridge:
    '''
    A mutable type representing a refrigerator containing drinks, using
    message passing to communicate with clients.
    '''
    
    # Abstraction function:
    #   AF(drinksInFridge, incoming, outgoing) = a refrigerator containing
    #                                            `drinksInFridge` drinks that takes
    #                                            requests from `incoming` and sends
    #                                            replies to `outgoing`
    # Rep invariant:
    #   drinksInFridge >= 0
    
    def __init__(self, requests, replies):
        '''
        Make a DrinksFridge that will listen for requests and generate replies.
        requests: Queue to receive requests from
        replies: Queue to send replies to
        '''
        self.drinksInFridge = 0
        self.incoming = requests
        self.outgoing = replies
        self.checkRep()
    
    def checkRep(self):
        assert self.drinksInFridge >= 0
        assert self.incoming
        assert self.outgoing
    
    def start(self):
        '''
        Start handling drink requests.
        '''
        def runFridge():
            while True:
                n = self.incoming.get()
                result = self.handleDrinkRequest(n)
                self.outgoing.put(result)
        Thread(target=runFridge).start()
    
    def handleDrinkRequest(self, n):
        '''
        Take (or add) drinks from the fridge.
        n: if >= 0, removes up to n drinks from the fridge;
           if < 0, adds -n drinks to the fridge.
        Returns: FridgeResult reporting how many drinks were actually added or removed,
                 and how many drinks are left in the fridge.
        '''
        change = min(n, self.drinksInFridge)
        self.drinksInFridge -= change
        self.checkRep()
        return FridgeResult(change, self.drinksInFridge)

class FridgeResult:
    '''
    A threadsafe immutable message describing the result of taking or adding
    drinks to a DrinksFridge.
    '''
    
    # Rep invariant: TODO
    
    def __init__(self, drinksTakenOrAdded, drinksLeftInFridge):
        '''
        Make a new result message.
        drinksTakenOrAdded: (precondition? TODO)
        drinksLeftInFridge: (precondition? TODO)
        '''
        self.drinksTakenOrAdded = drinksTakenOrAdded
        self.drinksLeftInFridge = drinksLeftInFridge
    
    def __str__(self):
        return ('you took ' if self.drinksTakenOrAdded >= 0 else 'you put in ') + \
               f'{abs(self.drinksTakenOrAdded)} drinks, fridge has {self.drinksLeftInFridge} left'

loadfridge.py

from queue import Queue
import os
from drinksfridge import *

#
# Create and use a drinks fridge.
#

requests = Queue()
replies = Queue()

# start an empty fridge
fridge = DrinksFridge(requests, replies)
fridge.start()

# deliver some drinks to the fridge
requests.put(-42)

# maybe do something concurrently...

# read the reply
print(replies.get())

print('done')
os._exit(0) # abruptly ends the program, including DrinksFridge

manythirstypeople.py

from queue import Queue
import os
from drinksfridge import *

QUEUE_SIZE = 100
N = 250

requests = Queue(maxsize=QUEUE_SIZE)
replies = Queue(maxsize=QUEUE_SIZE)

fridge = DrinksFridge(requests, replies)
fridge.start()

# put enough drinks in the fridge to start
requests.put(-N)
print(replies.get())

# send the requests
for x in range(N):
    requests.put(1)
    print(f'person #{x} is looking for a drink')
# collect the replies
for x in range(N):
    print(f'person #{x}: {replies.get()}')

print('done')
os._exit(0) # abruptly ends the program, generally a bad idea

drinksfridge.ts

import assert from 'assert';
import { MessagePort } from 'worker_threads';

/** Record type for DrinksFridge's reply messages. */
export type FridgeResult = {
    drinksTakenOrAdded: number,
    drinksLeftInFridge: number 
};

/**
 * A mutable type representing a refrigerator containing drinks.
 */
export class DrinksFridge {

    private drinksInFridge: number = 0;

    // Abstraction function:
    //   AF(drinksInFridge, port) = a refrigerator containing `drinksInFridge` drinks
    //                              that takes requests from and sends replies to `port`
    // Rep invariant:
    //   drinksInFridge >= 0

    /**
     * Make a DrinksFridge that will listen for requests and generate replies.
     * 
     * @param port port to receive requests from and send replies to
     */
    public constructor(
        private readonly port: MessagePort
    ) {
        this.checkRep();
    }

    private checkRep():void {
        assert(this.drinksInFridge >= 0);
    }

    /** Start handling drink requests. */
    public start(): void {
        this.port.addListener('message', (n: number) => {
            const reply:FridgeResult = this.handleDrinkRequest(n);
            this.port.postMessage(reply);
        });
    }

    private handleDrinkRequest(n: number): FridgeResult {
        const change = Math.min(n, this.drinksInFridge);
        this.drinksInFridge -= change;
        this.checkRep();
        return { drinksTakenOrAdded: change, drinksLeftInFridge: this.drinksInFridge };
    }
}

loadfridge.ts

import assert from 'assert';
import { Worker, isMainThread, parentPort } from 'worker_threads';

import { DrinksFridge, FridgeResult } from './drinksfridge';

//
// Create and use a drinks fridge.
//

/** Runs on the main thread. */
function main() {
    const worker = new Worker('./dist/loadfridge.js');
    worker.addListener('message', (result: FridgeResult) => {
      console.log('result from worker', result);
    });
    worker.postMessage(-42);
    worker.postMessage(2);
    
    // abruptly end the program, including the worker, after a short time
    setTimeout(() => process.exit(0), 1000);
}

/** Runs in a worker. */
function worker() {
    assert(parentPort);
    new DrinksFridge(parentPort).start();
}

if (isMainThread) {
    main();
} else {
    worker();
}