27. Actors

Overview

Fantom includes an actor framework for concurrency. Actors are lightweight objects which asynchronously process work on a background thread. Actors are given work by sending them asynchronous messages. Actors then process those messages on background threads controlled by an ActorPool.

Actors

The Actor class is used to define new actors. All actors are constructed within an ActorPool which defines how the actor is executed.

Actors may define their receive behavior in one of two ways:

  1. Pass a function to the Actor's constructor
  2. Subclass Actor and override receive

Here are two simple examples of an actor which receives an Int message and returns that value incremented by one:

// pass receive to constructor as a closure function
a := Actor(pool) |Int msg->Int| { msg + 1 }

// subclass and override receive
const class IncrActor : Actor
{
  new make(ActorPool p) : super(p) {}
  override Obj? receive(Obj? msg) { msg->increment }
}
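To see both forms in action, the sketch below sends a message to each kind of actor and blocks on the returned future to read the reply. It repeats the IncrActor class from above. The Main class and its method names are illustrative only, and each pool is stopped once the reply has been read.

```fantom
using concurrent

// Same subclass as in the example above
const class IncrActor : Actor
{
  new make(ActorPool p) : super(p) {}
  override Obj? receive(Obj? msg) { msg->increment }
}

class Main
{
  // Closure-based actor: the closure is the receive behavior
  static Int viaClosure(Int n)
  {
    pool := ActorPool()
    a := Actor(pool) |Obj? msg->Obj?| { (Int)msg + 1 }
    Int result := a.send(n).get   // get blocks until the reply is ready
    pool.stop
    return result
  }

  // Subclass-based actor: receive is overridden
  static Int viaSubclass(Int n)
  {
    pool := ActorPool()
    Int result := IncrActor(pool).send(n).get
    pool.stop
    return result
  }

  static Void main()
  {
    echo(viaClosure(5))    // 6
    echo(viaSubclass(41))  // 42
  }
}
```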

An actor is guaranteed to receive its messages atomically: it is never scheduled on multiple threads concurrently. However, an actor is not guaranteed to receive all of its messages on the same thread over time. For example, if messages A, B, and C are sent to an actor, the runtime may use three different threads to process those messages, but the actor is guaranteed to process the messages serially, one after the other.
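The atomicity guarantee is what makes unsynchronized per-actor state safe. In the sketch below, four client actors each send 250 "incr" messages to a single counter actor at the same time. Because the counter never runs on two threads at once, no increments are lost. The class, method, and message names are illustrative, and the counter keeps its state in Actor.locals, which is covered in the next section.

```fantom
using concurrent

class Main
{
  // Client: receives the counter (an immutable Actor reference) as its message
  static Actor makeClient(ActorPool pool)
  {
    return Actor(pool) |Obj? msg->Obj?|
    {
      c := (Actor)msg
      250.times { c.send("incr") }
      return null
    }
  }

  static Int run()
  {
    pool := ActorPool()

    // Counter: its state is only ever touched by one thread at a time
    counter := Actor(pool) |Obj? msg->Obj?|
    {
      if (msg == "incr")
        Actor.locals["demo.count"] = (Int)Actor.locals.get("demo.count", 0) + 1
      return Actor.locals.get("demo.count", 0)
    }

    futures := Future[,]
    4.times |Int i| { futures.add(makeClient(pool).send(counter)) }
    futures.each |f| { f.get }              // every client has enqueued its messages
    Int total := counter.send("get").get    // queued behind all 1000 "incr" messages
    pool.stop
    return total
  }

  static Void main() { echo(run) }  // 1000
}
```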

Actor Locals

Actors are const classes, which means they must be immutable. This lets you pass actor references between actors, but you can't maintain any mutable state in the actor's fields. Instead you can store the actor's "mutable world state" in Actor.locals. Actor.locals is a Str:Obj? map which works like a thread local, except that a unique map is used for every actor. To prevent naming collisions, you should prefix your map keys with your pod name:

// store an actor local
Actor.locals["acme.global"] = "hum bug"

// get an actor local
Actor.locals["acme.global"]

For example, to build an actor which increments a counter every time it receives a message:

pool := ActorPool()
a := Actor(pool) |msg|
{
  count := 1 + (Int)Actor.locals.get("count", 0)
  Actor.locals["count"] = count
  return count
}

100.times { a.send("ignored") }
echo("Count is now " + a.send("ignored").get)

Note that in this example, the actor ignores the messages sent to it, so it doesn't really matter what we pass.
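Because each actor gets its own locals map, two actors can use the same key without interfering with each other. In this sketch two counters built from the same closure share the key "acme.count" (acme is a hypothetical pod name, following the prefix advice above), yet each keeps its own count.

```fantom
using concurrent

class Main
{
  static Actor makeCounter(ActorPool pool)
  {
    return Actor(pool) |Obj? msg->Obj?|
    {
      count := 1 + (Int)Actor.locals.get("acme.count", 0)
      Actor.locals["acme.count"] = count
      return count
    }
  }

  static Int[] run()
  {
    pool := ActorPool()
    a := makeCounter(pool)
    b := makeCounter(pool)
    3.times { a.send("tick") }
    Int countA := a.send("tick").get   // a has now seen four messages
    Int countB := b.send("tick").get   // b's locals map is separate
    pool.stop
    return [countA, countB]
  }

  static Void main() { echo(run) }  // [4, 1]
}
```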

Message Passing

Actors communicate by sending each other messages. Messages cannot be used to pass mutable state between actors. If a message is immutable, then it is passed by reference. Otherwise the message must be serializable, in which case a serialized copy of the object is passed. If a message is neither immutable nor serializable, then IOErr is thrown.
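A common way to stay on the pass-by-reference path is to send const objects or immutable collections. The sketch below assumes a hypothetical Order message class: a const class is immutable, while a list literal is mutable until toImmutable is called on it, so the list is converted before it is sent.

```fantom
using concurrent

// Hypothetical message type: instances of a const class are immutable
const class Order
{
  new make(Str item, Int qty) { this.item = item; this.qty = qty }
  const Str item
  const Int qty
}

class Main
{
  static Int run()
  {
    pool := ActorPool()
    // Sums the quantities in a list of orders
    a := Actor(pool) |Obj? msg->Obj?|
    {
      total := 0
      ((Order[])msg).each |o| { total += o.qty }
      return total
    }

    orders := [Order("apple", 2), Order("pear", 3)]  // a mutable Order[]
    echo(orders.isImmutable)                          // false
    msg := orders.toImmutable                         // immutable copy
    echo(msg.isImmutable)                             // true
    Int total := a.send(msg).get                      // passed by reference
    pool.stop
    return total
  }

  static Void main() { echo(run) }  // 5
}
```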

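Messages are sent to an actor using one of three methods:

  - send: enqueues the message immediately
  - sendLater: enqueues the message after a period of time has elapsed
  - sendWhenDone: enqueues the message once another message completes processing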

Futures

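All three send methods return a Future which may be used to access the result of that message. You can poll for the result using isDone; a future enters the done state by one of three transitions:

  - the actor processes the message successfully
  - the actor raises an exception while processing the message
  - the future is cancelled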

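Once a future enters the done state, the result is available via the get method. If the future is not done, then calling get blocks the caller until the future becomes done. A timeout may be passed to get to block for at most a fixed period of time. Calling get results in one of these outcomes:

  - if the message was processed successfully, get returns the result
  - if the actor raised an exception while processing the message, get raises that same exception
  - if the future was cancelled, get raises CancelledErr
  - if a timeout was passed and the future does not become done in time, get raises TimeoutErr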
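The sketch below exercises the four ways a call to get can end: a result, the actor's own exception, TimeoutErr, and CancelledErr. The message strings and method names are illustrative. A "slow" message keeps the actor busy so that a later message is still queued when its future is cancelled.

```fantom
using concurrent

class Main
{
  static Str[] run()
  {
    pool := ActorPool()
    a := Actor(pool) |Obj? msg->Obj?|
    {
      if (msg == "fail") throw ArgErr("bad message")
      if (msg == "slow") Actor.sleep(500ms)
      return "ok:$msg"
    }
    results := Str[,]

    // Success: get returns the result
    results.add(a.send("hi").get)

    // Error in receive: get raises the same exception
    try { a.send("fail").get } catch (ArgErr e) { results.add("ArgErr") }

    // Timeout: get gives up after the given duration
    slow := a.send("slow")
    try { slow.get(50ms) } catch (TimeoutErr e) { results.add("TimeoutErr") }

    // Cancelled: still queued behind "slow" when cancel is called
    pending := a.send("never")
    pending.cancel
    try { pending.get } catch (CancelledErr e) { results.add("CancelledErr") }

    echo(pending.isDone)  // true: cancellation also makes a future done
    pool.stop
    return results
  }

  static Void main() { echo(run) }
}
```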

Actors which block via Future.get should never receive messages themselves as this might lead to deadlocks. Best practice is to design service actors using strictly asynchronous messaging, and keep synchronous messaging on client actors which don't service requests themselves.

Timers

The sendLater method can be used to set up a timer. Timers post a message back to the actor's queue when they expire. Example:

pool := ActorPool()
a := Actor(pool) |Obj msg| { echo("$Time.now: $msg") }
a.send("start")
a.sendLater(1sec, "1sec")
a.sendLater(3sec, "3sec")
a.sendLater(2sec, "2sec")
Actor.sleep(5sec)

The sendLater method returns a Future which may be used to cancel the timer or poll/block until the message has been processed.
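For instance, a minimal sketch (names illustrative) that schedules one timer far in the future and cancels it, then blocks on a second, short timer until its message has been processed:

```fantom
using concurrent

class Main
{
  static Bool[] run()
  {
    pool := ActorPool()
    a := Actor(pool) |Obj? msg->Obj?| { echo("received $msg"); return msg }

    timer := a.sendLater(10sec, "too late")  // would fire in 10 seconds
    timer.cancel                              // so it is never delivered
    soon := a.sendLater(100ms, "soon")
    Str result := soon.get                    // blocks roughly 100ms

    pool.stop
    return [timer.isCancelled, result == "soon"]
  }

  static Void main() { echo(run) }
}
```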

Chaining

The sendWhenDone method is used to deliver a message once another message has completed processing. Using sendWhenDone allows asynchronous message chaining. Consider this code:

future := actor1.send(msg1)
actor2.sendWhenDone(future, msg2)

In this example, msg2 is enqueued on actor2 only after actor1 completes processing of msg1. Typically the future itself is passed as the message:

a.sendWhenDone(future, future)        // future is message itself
a.sendWhenDone(future, MyMsg(future)) // MyMsg references future

Remember that the message passed to sendWhenDone is delivered no matter how the future completes: successfully, with an error, or by cancellation.
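Here is a runnable version of the chaining pattern, with illustrative actor names. A worker doubles an Int, and a reporter receives the worker's future as its message. Since the future is already done when the reporter sees it, calling get there returns (or raises) immediately instead of blocking. The second chain shows that the message is still delivered when the first stage fails.

```fantom
using concurrent

class Main
{
  static Str[] run()
  {
    pool := ActorPool()
    // First stage: doubles an Int (the cast fails for anything else)
    worker := Actor(pool) |Obj? msg->Obj?| { (Int)msg * 2 }
    // Second stage: its message is the first stage's future
    reporter := Actor(pool) |Obj? msg->Obj?|
    {
      f := (Future)msg
      Str text := "failed"
      try { text = "result ${f.get}" } catch (Err e) {}
      return text
    }

    ok := worker.send(21)
    bad := worker.send("not an Int")
    Str r1 := reporter.sendWhenDone(ok, ok).get
    Str r2 := reporter.sendWhenDone(bad, bad).get
    pool.stop
    return [r1, r2]
  }

  static Void main() { echo(run) }  // [result 42, failed]
}
```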

Coalescing Messages

Often when sending messages to an actor, we can merge two messages into a single message to save ourselves some work. For example, it is common in windowing systems to maintain a single union of all the dirty portions of a window rather than a bunch of little rectangles. An actor can have its messages automatically coalesced using the makeCoalescing constructor.

Let's look at an example:

const class Repaint
{
  new make(Window w, Rect d) { ... }
  const Window window
  const Rect dirty
}

toKey := |Repaint msg->Obj| { msg.window }
coalesce := |Repaint a, Repaint b->Obj| { Repaint(a.window, a.dirty.union(b.dirty)) }
a := Actor.makeCoalescing(pool, toKey, coalesce) |Repaint msg| {...}

In this example the messages are instances of Repaint. The toKey function is used to obtain the key which determines whether two messages can be coalesced; here we coalesce repaints per window. If the actor's queue holds two pending messages with the same key (the window in this case), then the coalesce function is called to merge them. In this example we return a new Repaint event with the union of the two dirty regions.
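Since the Repaint example depends on windowing types, here is a self-contained sketch of the same mechanism with a hypothetical Update message that carries a count of pending changes. A "block" message keeps the actor busy while five updates for the same key are queued; they are merged by adding their counts. The total always comes out as 5, while the number of batches actually processed is typically 1 (it depends on timing, so it is reported rather than assumed).

```fantom
using concurrent

// Hypothetical message: a key plus a number of pending changes
const class Update
{
  new make(Str key, Int n) { this.key = key; this.n = n }
  const Str key
  const Int n
}

class Main
{
  static Int[] run()
  {
    pool := ActorPool()
    // Updates coalesce per key; other messages use themselves as key
    toKey := |Obj? m->Obj?| { m is Update ? ((Update)m).key : m }
    // Merge two pending updates for the same key by adding their counts
    coalesce := |Obj? orig, Obj? incoming->Obj?|
    {
      Update(((Update)orig).key, ((Update)orig).n + ((Update)incoming).n)
    }

    actor := Actor.makeCoalescing(pool, toKey, coalesce) |Obj? msg->Obj?|
    {
      if (msg == "block") { Actor.sleep(200ms); return null }
      total   := (Int)Actor.locals.get("demo.total", 0)
      batches := (Int)Actor.locals.get("demo.batches", 0)
      if (msg == "stats") return Int[total, batches].toImmutable
      Actor.locals["demo.total"] = total + ((Update)msg).n
      Actor.locals["demo.batches"] = batches + 1
      return null
    }

    actor.send("block")                        // keeps the actor busy
    5.times { actor.send(Update("win1", 1)) }  // queued, so they get merged
    Int[] stats := actor.send("stats").get
    pool.stop
    return stats  // [total, batches]
  }

  static Void main() { echo(run) }
}
```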

Messages sent with sendLater and sendWhenDone are never coalesced.

Flow Control

The current implementation of Fantom uses unbounded message queues. This means that if an actor receives messages faster than it can process them, its queue will continue to grow, and eventually this might result in out-of-memory errors. To prevent unbounded queues from growing forever, you need to build flow control into your design.

For example, consider a "reader" actor which reads lines of text from a big text file and sends those lines to other "processing" actors for parallel processing. If the reader pushes the lines of text as fast as it can read them, then it could potentially end up enqueuing large numbers of lines in memory. A better strategy would be to have the processing actors enqueue themselves with the reader when they are ready to process a line. This creates a natural feedback loop and allows the reader to throttle its IO based on how fast the processors can work.
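One way to realize this pull design is sketched below, under illustrative assumptions: an in-memory list stands in for the file, the class and message names are hypothetical, and each processor asks for its next line by sending "next" to the reader and blocking on the reply. The reader only advances when asked, so it never runs ahead of the processors, and it never blocks itself, which matches the service/client split recommended for Future.get.

```fantom
using concurrent

class Main
{
  // Reader: hands out one line per "next" request, or null when exhausted
  static Actor makeReader(ActorPool pool)
  {
    return Actor(pool) |Obj? msg->Obj?|
    {
      if (msg is List) { Actor.locals["demo.lines"] = msg; Actor.locals["demo.pos"] = 0; return null }
      lines := (Str[])Actor.locals["demo.lines"]
      pos := (Int)Actor.locals["demo.pos"]
      if (pos >= lines.size) return null
      Actor.locals["demo.pos"] = pos + 1
      return lines[pos]
    }
  }

  // Processor: pulls lines until the reader runs dry; returns chars processed
  static Actor makeProcessor(ActorPool pool)
  {
    return Actor(pool) |Obj? msg->Obj?|
    {
      reader := (Actor)msg
      n := 0
      while (true)
      {
        line := reader.send("next").get as Str
        if (line == null) break
        n += line.size  // stand-in for real work
      }
      return n
    }
  }

  static Int run()
  {
    pool := ActorPool()
    reader := makeReader(pool)
    // Stand-in for lines read from a big file
    reader.send(["alpha", "beta", "gamma", "delta"].toImmutable)

    futures := Future[,]
    3.times |Int i| { futures.add(makeProcessor(pool).send(reader)) }
    total := 0
    futures.each |Future f| { total += (Int)f.get }
    pool.stop
    return total
  }

  static Void main() { echo(run) }  // 19
}
```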

Actor Pools

All actors are created within an ActorPool. ActorPools manage the execution of actors using a shared thread pool.

As messages are sent to actors, they are allocated a thread to perform their work. An ActorPool will create up to 100 threads, after which actors must wait for a thread to free up. Once a thread frees up, it is used to process the next actor. If no actors have pending work, then the thread lingers for a few seconds before being released back to the operating system. In this model an ActorPool utilizes between zero and a peak of 100 threads depending on how many of the pool's actors currently have work. You can tweak the peak limit by setting ActorPool.maxThreads:

ActorPool { maxThreads = 10 }

An ActorPool is immediately considered running as soon as it is constructed. However, it doesn't actually spawn its first thread until one of its actors is sent a message. If all of a pool's actors finish processing their messages, then after a linger period all of that pool's threads will be freed.

An ActorPool can be manually shut down using the stop method. Once stop is called, the pool enters the stopped state and actors within the pool may not receive any more messages. However, all pending messages are allowed to continue processing. Once all pending messages have been processed, the pool enters the done state. Use the join method to block until an ActorPool has fully shut down.
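A minimal sketch of an orderly shutdown, using illustrative names: a pool capped at two threads via maxThreads receives three messages, is stopped, and is then joined with a timeout. The messages queued before stop still run to completion.

```fantom
using concurrent

class Main
{
  static Obj[] run()
  {
    // Cap this pool at two threads instead of the default 100
    pool := ActorPool { maxThreads = 2 }
    a := Actor(pool) |Obj? msg->Obj?| { Actor.sleep(50ms); return (Int)msg * 2 }

    futures := [a.send(1), a.send(2), a.send(3)]
    pool.stop        // no new messages accepted; queued ones still run
    pool.join(5sec)  // block until the pool reaches the done state

    total := 0
    futures.each |f| { total += (Int)f.get }  // all three completed
    return [pool.isDone, total]
  }

  static Void main() { echo(run) }  // [true, 12]
}
```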

The kill method can be used to perform an unorderly shutdown. Unlike stop, kill doesn't give actors a chance to finish processing their pending message queues - all pending messages are cancelled. Actors which are currently executing a message are interrupted (which may or may not immediately terminate that thread). Once all actors have relinquished their threads, the ActorPool enters the done state.
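A contrasting sketch for kill, again with illustrative names: one slow message occupies the actor while two more wait in its queue. After kill and join, the two queued messages should never have run and their futures should report cancellation, per the behavior described above. The running message is interrupted, so its outcome is not checked here.

```fantom
using concurrent

class Main
{
  static Bool run()
  {
    pool := ActorPool()
    a := Actor(pool) |Obj? msg->Obj?| { Actor.sleep(2sec); return msg }

    a.send("first")                              // occupies the actor
    queued := [a.send("second"), a.send("third")]
    pool.kill                                    // cancel everything still queued
    pool.join(5sec)

    return queued.all |f| { f.isCancelled }
  }

  static Void main() { echo(run) }
}
```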