
Multithreading with Shared Data in Scala

Scala has many built-in options that make it extremely easy to parallelize work, and I generally prefer not to share data between threads. Scala’s preference for immutability and a functional style of programming helps you reason about problems in a scalable way without having to think about concurrency or shared mutable data.

Occasionally I still find the need to share data across multiple threads, so here’s an explanation of some of the options available in Scala.

Consider this example of a pub/sub (or Observer) pattern. Below is a trait that gives classes the ability to let clients listen for changes.

import scala.collection.mutable

trait SynchronizedPublisher[T] {
  private[this] val eventListeners = mutable.Map[AnyRef, (T) => Unit]()

  def subscribe(ref: AnyRef, fn: (T) => Unit): Unit = {
    eventListeners += (ref -> fn)
  }

  def unsubscribe(ref: AnyRef): Unit = {
    eventListeners -= ref
  }

  def fireEvent(event: T): Unit = {
    eventListeners.foreach(_._2(event))
  }

}


Classes can extend this trait to gain the ability to notify observers of new events.
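For instance, a singleton object could mix in the trait and publish events to registered listeners. This is a hypothetical sketch (PriceFeed and the listener are made-up names), with the trait repeated so it runs standalone:

```scala
import scala.collection.mutable

// The trait from above, repeated so this sketch is self-contained
trait SynchronizedPublisher[T] {
  private[this] val eventListeners = mutable.Map[AnyRef, (T) => Unit]()

  def subscribe(ref: AnyRef, fn: (T) => Unit): Unit = eventListeners += (ref -> fn)
  def unsubscribe(ref: AnyRef): Unit = eventListeners -= ref
  def fireEvent(event: T): Unit = eventListeners.foreach(_._2(event))
}

// Hypothetical publisher: a singleton object mixes in the trait
object PriceFeed extends SynchronizedPublisher[Double]

val handle = new AnyRef // subscription key, used later to unsubscribe
var lastSeen = 0.0
PriceFeed.subscribe(handle, price => lastSeen = price)
PriceFeed.fireEvent(42.0) // every registered listener is called with 42.0
```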

The trait is intended for use in a singleton, so multiple threads may be calling its functions concurrently; however, the core data structure is a mutable collection that is not designed to support multi-threaded use.

Let’s look at some options for making this thread safe…

Synchronized

Based on the Java synchronized block, this is the simplest method: you can take a mutex lock on any object while executing a block of code. You could update the publisher implementation like this…

import scala.collection.mutable

trait SynchronizedPublisher[T] {

  private[this] val eventListeners = mutable.Map[AnyRef, (T) => Unit]()
  private[this] val rwLock = new Object

  def subscribe(ref: AnyRef, fn: (T) => Unit): Unit = {
    rwLock.synchronized {
      eventListeners += (ref -> fn)
    }
  }

  def unsubscribe(ref: AnyRef): Unit = {
    rwLock.synchronized {
      eventListeners -= ref
    }
  }

  def fireEvent(event: T): Unit = {
    rwLock.synchronized {
      eventListeners.foreach(_._2(event))
    }
  }

}


The advantage of this approach is that it’s fast and simple to use.

The disadvantage is that it’s an exclusive lock, so only one thread can hold it at a time. This causes two problems:

  1. Access to these functions is effectively single-threaded, so it’s a poor choice when most threads read rather than change the state, as it still only supports one reader at a time.
  2. You need to be careful not to hold the lock for too long, and not to take other locks inside synchronized blocks, or you risk creating a deadlock.

Atomic Reference

Another option is to leverage Java’s existing, powerful concurrency utilities. There are Java collections that support concurrent access, such as java.util.concurrent.ConcurrentHashMap<K,V>.

A general-purpose approach is AtomicReference, which is essentially a wrapper around a volatile variable with the additional compareAndSet operation, which is guaranteed to be atomic.
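As a minimal, standalone illustration of the compareAndSet contract (independent of the publisher example):

```scala
import java.util.concurrent.atomic.AtomicReference

val ref = new AtomicReference(List.empty[Int])
val snapshot = ref.get()

// compareAndSet swaps in the new value only if the reference still holds
// the expected value; the whole check-and-swap happens atomically
assert(ref.compareAndSet(snapshot, 1 :: snapshot))  // succeeds: unchanged since snapshot
assert(!ref.compareAndSet(snapshot, 2 :: snapshot)) // fails: the value has moved on
```

A failed compareAndSet is the signal to re-read the current value and retry, which is exactly what the update loop below does.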

A sample implementation could look like this…

import java.util.concurrent.atomic.AtomicReference

trait SynchronizedPublisher[T] {

  private[this] val eventListeners = new AtomicReference(Map[AnyRef, (T) => Unit]())

  def subscribe(ref: AnyRef, fn: (T) => Unit): Unit = {
    update(_ + (ref -> fn))
  }

  def unsubscribe(ref: AnyRef): Unit = {
    update(_ - ref)
  }

  protected[this] def fireEvent(event: T): Unit = {
    eventListeners.get().values.foreach(_ (event))
  }

  private[this] def update(fn: (Map[AnyRef, (T) => Unit]) => (Map[AnyRef, (T) => Unit])): Unit = {
    while(true) {
      val listenerMap = eventListeners.get()
      if (eventListeners.compareAndSet(listenerMap, fn(listenerMap)))
        return // success
    }
  }

}


The advantage of this approach is that it’s lock-free and thread-safe. It performs well, particularly when access is mostly reads and there isn’t much contention on writes.

The disadvantage is that it may not work as well when access is mostly writes and contention is high. It’s also designed for use with a single object, so it may not be appropriate if you need to change a number of objects transactionally.

Reentrant Read Write Lock

Another general-purpose option from the Java concurrency library is java.util.concurrent.locks.ReentrantReadWriteLock, to which you can add some Scala sugar so it reads like a synchronized block and you don’t have to worry about forgetting to release it. (This could probably be implemented as an implicit wrapper on the ReentrantReadWriteLock object for an even lighter API.)

import java.util.concurrent.locks.ReentrantReadWriteLock

object Util {
  def withReadLock[B](rwLock: ReentrantReadWriteLock)(fn: => B): B = {
    rwLock.readLock().lock()
    try {
      fn
    } finally {
      rwLock.readLock().unlock()
    }
  }

  def withWriteLock[B](rwLock: ReentrantReadWriteLock)(fn: => B): B = {
    rwLock.writeLock().lock()
    try {
      fn
    } finally {
      rwLock.writeLock().unlock()
    }
  }
}
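The implicit-wrapper variant mentioned above might look like this (a sketch; LockSyntax and RichReadWriteLock are illustrative names):

```scala
import java.util.concurrent.locks.ReentrantReadWriteLock

object LockSyntax {
  // Enriches ReentrantReadWriteLock so the helpers read as methods on the lock itself
  implicit class RichReadWriteLock(val rwLock: ReentrantReadWriteLock) {
    def withReadLock[B](fn: => B): B = {
      rwLock.readLock().lock()
      try fn finally rwLock.readLock().unlock()
    }

    def withWriteLock[B](fn: => B): B = {
      rwLock.writeLock().lock()
      try fn finally rwLock.writeLock().unlock()
    }
  }
}

import LockSyntax._

val rwLock = new ReentrantReadWriteLock()
val answer = rwLock.withWriteLock { 40 + 2 } // lock is always released, even on exception
```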


Then the updated publisher trait would look like this…

import java.util.concurrent.locks.ReentrantReadWriteLock
import scala.collection.mutable

trait SynchronizedPublisher[T] {

  private[this] val eventListeners = mutable.Map[AnyRef, (T) => Unit]()
  private[this] val rwLock = new ReentrantReadWriteLock()

  def subscribe(ref: AnyRef, fn: (T) => Unit): Unit = {
    Util.withWriteLock(rwLock) {
      eventListeners += (ref -> fn)
    }
  }

  def unsubscribe(ref: AnyRef): Unit = {
    Util.withWriteLock(rwLock) {
      eventListeners -= ref
    }
  }

  def fireEvent(event: T): Unit = {
    Util.withReadLock(rwLock) {
      eventListeners.foreach(_._2(event))
    }
  }

}

This approach has a big advantage over synchronized blocks: read/write locks support multiple concurrent readers and only require an exclusive lock when a thread needs to write. The semantics are no harder to use than regular synchronized blocks. If the potentially blocking nature of the code is a problem, a variation is to wrap any potentially blocking call (like acquiring a lock) in a Future to avoid blocking the caller.
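The Future variation could be sketched like this (a simplified stand-in: fireEventAsync and the counter are illustrative names, and the counter increment stands in for the listener loop):

```scala
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.locks.ReentrantReadWriteLock
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

val rwLock = new ReentrantReadWriteLock()
val notified = new AtomicInteger(0)

// The lock is acquired on a Future's thread, so the caller never blocks;
// it just gets back a Future that completes once the listeners have run
def fireEventAsync(event: String): Future[Unit] = Future {
  rwLock.readLock().lock()
  try { notified.incrementAndGet(); () } // stand-in for the listener loop
  finally rwLock.readLock().unlock()
}

val done = fireEventAsync("price-update")
Await.result(done, 5.seconds) // demo only; real callers would map/flatMap instead
```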

Akka Actors

Akka actors take a different approach to the problem by removing the need for developers to think about locks or threads at all. An actor is a self-contained entity that is responsible for its own internal data structures. You don’t call functions on the actor; instead, you get an actor reference that lets callers put a message on the actor’s queue asking it to do some work. The actor processes messages one at a time in a continual loop, taking each message off the queue, handling it and replying if necessary (or sending a message to another actor). There is no blocking from the caller’s perspective, which provides a great model for concurrent programming and is compatible with a reactive style of programming.

An example of an actor using the observer pattern might look like this…
(this is a direct translation; the callbacks could be messages to other actors instead)

import akka.actor.{Actor, Props}
import scala.collection.mutable

object PublisherActor {
  // Messages live in the companion object so that callers can construct them
  case class NewEvents[T](events: T)
  case class Subscribe[T](ref: AnyRef, fn: (T) => Unit)
  case class UnSubscribe(ref: AnyRef)

  def props[T] = Props(new PublisherActor[T])
}

class PublisherActor[T] extends Actor {
  import PublisherActor._

  val eventListeners = mutable.Map[AnyRef, (T) => Unit]()

  def receive = {
    case NewEvents(newMsg) =>
      // type parameters are erased at runtime, hence the cast; this is part of
      // the loss of type safety that comes with untyped actor messages
      for ((_, listener) <- eventListeners) {
        listener(newMsg.asInstanceOf[T])
      }
    case Subscribe(ref, fn) =>
      eventListeners += (ref -> fn.asInstanceOf[(T) => Unit])
    case UnSubscribe(ref) =>
      eventListeners -= ref
  }
}


To use the actor you need an actor reference (see the Akka documentation). Then you send it a message (? is ask, which expects a response as a Future; ! is tell, which doesn’t expect a response).

myActorRef ! Subscribe(ref, newMsgFn)


This approach has a big advantage over the approaches above in that, from a developer’s perspective, it can be less error prone and easier to reason about as interactions become more complex. Its real power comes when you have multiple actors communicating with each other, since you don’t need to worry about locks or the potential for deadlock. It’s also compatible with the message-driven approach of reactive programming and doesn’t introduce any blocking code into your application.

A big disadvantage of the actor approach is that it is still effectively a single-threaded event queue, so only one operation can occur at a time, which forces you to be quite disciplined on the actor thread. The standard approach for any big computation or IO is to copy the actor’s state to local variables and use a Future to free up the actor thread. If the actor can’t process messages faster than it receives them, messages will build up in its mailbox, leading to increased latency, memory issues and failures in the system.
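The state-copy idiom can be sketched without the Akka machinery. The point is that the Future closes over an immutable snapshot, never the actor’s mutable map (the names here are illustrative; the mutable map stands in for actor state):

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import scala.collection.mutable
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Illustrative stand-in for an actor's internal mutable state
val eventListeners = mutable.Map[AnyRef, (String) => Unit]()
val received = new ConcurrentLinkedQueue[String]()
val handle = new AnyRef
eventListeners += (handle -> (msg => { received.add(msg); () }))

// Inside receive you would first snapshot the mutable map into an immutable copy...
val snapshot = eventListeners.toMap
// ...and hand only the snapshot to the Future, freeing the actor thread immediately
val work = Future { snapshot.values.foreach(_("event-1")) }
Await.result(work, 5.seconds) // demo only; an actor would pipe the result back instead
```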

The second big disadvantage is the loss of type safety. There is no compile-time check that the messages being passed are compatible with the actor, so if you are not disciplined this can cause problems with messages not being acted on. Strong typing is a major advantage of Scala, so it’s not a sacrifice to make lightly.

Scala STM

STM stands for Software Transactional Memory, and the concept is similar to what is seen in other storage systems like databases. Scala STM is an implementation that gives ACID-style protection to in-memory data structures. It is based on optimistic locking: it doesn’t hold a lock while executing an atomic block, but instead keeps a snapshot of the data structure from before the changes and checks whether the changes made in an atomic block were interleaved with changes from other threads. If they were, it automatically rolls back and retries the atomic block from one of the threads. If conflicts are rare, you can get better throughput, as no locking is required.

Scala STM’s general abstraction is Ref, a wrapper on the data structure being protected; any access must be done inside an atomic block. Scala STM also provides transactional data structures, which are a better fit for the publisher example.

import scala.concurrent.stm._

trait SynchronizedPublisher[T] {

  private[this] val eventListeners = TMap[Any, (T) => Unit]()

  def subscribe(ref: AnyRef, fn: (T) => Unit): Unit = {
    eventListeners.single += (ref -> fn)
  }

  def unsubscribe(ref: AnyRef): Unit = {
    eventListeners.single -= ref
  }

  protected[this] def fireEvent(event: T): Unit = {
    val listenersToNotify = eventListeners.single.toList
    listenersToNotify.foreach(_._2(event)) // don't use the eventListeners directly as small potential for replay
  }

}


The advantage of this approach is that it’s very clean: the developer is insulated from much of the complexity.

The big disadvantage is that STM introduces a performance overhead. This might balance out against the other options in a highly multithreaded scenario with high write contention, but it will often perform worse when that criterion is not met; it is particularly slower where there is little write contention or access is mostly single-threaded. The other disadvantage is that the code can be rolled back and retried at any time, so you shouldn’t change anything outside of Scala STM’s protection inside an atomic block (e.g. database writes or IO that changes state). The code must always be safe to rerun.

Conclusion

All the options here are reasonable choices and have their advantages. The AtomicReference approach would be my preference for this example, since the publisher mostly reads the collection of listeners and AtomicReference performs really well in that scenario. If the scenario were complicated enough that I couldn’t model it with an AtomicReference, I would look at either reentrant read/write locks or actors, depending on the situation.
