
Playing With GPars

Way, way, waaaay back, I became something of a minor Occam ‘guru’ (I actually implemented a fair proportion of a compiler/runtime for Occam 1), and I have maintained an academic and practical interest in the field throughout my career (which is what I laughingly call what I do these days; see Will Code For Food for more ;-)).

It’s therefore natural for me to take a look at GPars.

In its own words, GPars is:

an open-source concurrency library for Groovy that aims to give you multiple high-level abstractions for writing concurrent code in Groovy – map/reduce, fork/join, asynchronous closures, actors, agents, dataflow concurrency and other concepts, which aim to make your Groovy code concurrent with little effort.

Ever played the charmingly-named Chinese Whispers game at a party? That’s what I’m going to play with here.

It’s a very simple example of a pipelined processing task. This sort of thing is very common in signal processing, process control and image munging.

In the applications I build here, a source thread generates messages, each of which is sent through a chain of intermediate threads to a sink thread, which simply bounces the message back down the chain. On each ‘sinkward’ step an intermediary may or may not munge the message as it whispers it to the next step, so the version that eventually makes its way back to the source may have changed in some unpredictable way.

Here’s the image to keep in your head:

[Image: pipe]
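If it helps to pin the round trip down before any threads get involved, here is a purely sequential model of it in plain Groovy. It is only a sketch under my own assumptions: roundTrip and the step closures are hypothetical names, and, as in the Actor version coming up, the sink is assumed to bounce the message back unchanged, with the intermediaries munging only on the sinkward leg.

```groovy
// A purely sequential (thread-free) model of one Chinese Whispers round trip.
// roundTrip and the step closures are hypothetical, for illustration only.
String roundTrip(String msg, List<Closure> whisperers) {
    // Sinkward leg: each intermediary gets a chance to munge the message
    String atSink = whisperers.inject(msg) { String m, Closure w -> w(m) }
    // Assumed: the sink bounces the message back unchanged and the intermediaries
    // just relay it sourceward, so the source sees whatever reached the sink
    return atSink
}

def steps = [
    { String s -> s.replace('sky', 'elephant') },   // this intermediary munges
    { String s -> s }                                 // this one passes it on verbatim
]
println roundTrip('The sky is green', steps)          // The elephant is green
```

The real applications do exactly this, except that every step runs concurrently and many messages are in flight at once.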

GPars provides several alternative ways of tackling this problem.

Scala made the idea of Actors popular, so that’s a good starting point.

This is what the GPars doco says about Actors:

The actor support in gpars were inspired by the Actors library in Scala but have meanwhile gone beyond that.

Actors allow for a messaging-based concurrency model, built from independent active objects that exchange messages and have no mutable shared state.

If you are a webby-person, you can think of an Actor as a Servlet…you won’t go far wrong.
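To make the "independent active objects, no shared mutable state" idea concrete without leaning on any GPars API, here is a hand-rolled actor in plain Groovy (it assumes Groovy 2.x or later): one thread, one private mailbox (a LinkedBlockingQueue), and a counter that only that thread ever touches. CountingActor, the map-shaped message and the 'stop' sentinel are all hypothetical, illustration-only choices; GPars does this plumbing (and a lot more, such as thread pooling) for you.

```groovy
import java.util.concurrent.BlockingQueue
import java.util.concurrent.LinkedBlockingQueue

// A hand-rolled, illustration-only "actor" (NOT the GPars API): a single thread
// drains a private mailbox, and its state (count) is never shared with other threads.
class CountingActor {
    private final BlockingQueue<Object> mailbox = new LinkedBlockingQueue<>()
    private int count = 0          // confined to the worker thread
    private Thread worker

    CountingActor start() {
        worker = Thread.start {
            while (true) {
                def msg = mailbox.take()             // block until a message arrives
                if (msg == 'stop') break             // hypothetical shutdown sentinel
                count++
                // Reply with another message rather than touching the sender's state
                msg.replyTo.put("message #${count}: ${msg.body}".toString())
            }
        }
        return this
    }

    void send(msg) { mailbox.put(msg) }              // the only way in from outside
    void join() { worker.join() }
}

def replies = new LinkedBlockingQueue<String>()
def actor = new CountingActor().start()
actor.send([body: 'hello', replyTo: replies])
actor.send([body: 'world', replyTo: replies])
def got = [replies.take(), replies.take()]
println got                                          // [message #1: hello, message #2: world]
actor.send('stop')
actor.join()
```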

Coming up is a version of “Chinese Whispers” implemented using GPars’ Actors:

package chinesewhispers

@GrabResolver(name = 'jboss', root = 'http://repository.jboss.org/maven2/')
@Grab(group = 'org.codehaus.gpars', module = 'gpars', version = '0.9')

import groovyx.gpars.actor.*
import chinesewhispers.utils.Whisperer

Actors.defaultPooledActorGroup.resize 16

class Stop {}
class Go {}

final intermediaries = []

def src = Actors.actor {index ->
  println "Source; Starting..."
  react { Go go ->
    println "Source; time to 'Go'"
    def msg = 'The sky is green, the trees are blue...SNAFU!'
    println "Source; message is: $msg"
    for (i in 0..<8) {
      intermediaries[1].sendAndContinue msg, { ix, reply ->
        println "Source[$ix]; received: '$reply'"
      }.curry(i)
    }
    intermediaries[1].send(new Stop())
    stop()
    print "Source; ...Stopped."
  }
}

intermediaries << src

for (i in 1..8) {
  intermediaries << Actors.messageHandler({ index ->
    when {Stop msg -> intermediaries[index + 1].send msg; stop(); }
    when {msg ->
      def sender = msg.sender
      intermediaries[index + 1].sendAndContinue Whisperer.whisper("${msg}"), { reply ->
        sender.send reply
      }
    }
  }.curry(i))
}

intermediaries << Actors.messageHandler {
  when { Stop msg -> stop() }
  when { msg -> reply msg }
}

src << new Go()

intermediaries*.join()

This simple script creates a list of Actors, with a source Actor first, followed by a number of essentially identical intermediary Actors and finally a sink Actor.

Here it all is, “in action” (pun intended):

[Screenshot: ChineseWhisperingActors]

GPars creates these Actors already running, so it is easy to start things off by sending the Source a unique Go message.

Notice how each Actor sends a message and then registers a closure to handle the corresponding reply asynchronously, rather than blocking while it waits. Intermediaries can therefore pick up new work as soon as it arrives, which keeps the pipeline busy and makes for a very performant solution.
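The "send now, deal with the reply when it turns up" style can be sketched with the JDK's CompletableFuture standing in for sendAndContinue (this needs Java 8+ and a reasonably recent Groovy, unlike the GPars 0.9 listings). It is an analogy, not the GPars mechanism: askNextStage is a hypothetical stand-in for the next Actor in the chain, and the final allOf(...).join() is there only so the demo can inspect its results.

```groovy
import java.util.concurrent.CompletableFuture
import java.util.concurrent.CopyOnWriteArrayList

// "Send now, handle the reply later", sketched with the JDK's CompletableFuture
// (an analogy only; GPars' sendAndContinue works differently under the hood).
// askNextStage is a hypothetical stand-in for the next Actor in the chain.
CompletableFuture<String> askNextStage(String msg) {
    CompletableFuture.supplyAsync { msg.toUpperCase() }
}

def received = new CopyOnWriteArrayList<String>()
def pending = (0..<3).collect { int i ->
    // Register a continuation for the reply and move straight on; nothing blocks here
    askNextStage("msg $i").thenAccept { String reply -> received << "[$i] $reply".toString() }
}

// Waiting at the very end is only so this demo can inspect what arrived
CompletableFuture.allOf(pending as CompletableFuture[]).join()
println new ArrayList<String>(received).sort()       // [[0] MSG 0, [1] MSG 1, [2] MSG 2]
```

Replies may arrive in any order, which is why the demo sorts them before printing; the curried index in the Actor version plays the same role of tying each reply back to its request.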

It’s a truism that getting a parallel system off and running is fairly easy, but getting one to stop properly can be surprisingly tricky. For this application, the Source Actor’s last act before it finishes is to send a (guaranteed-to-be-unique, obviously) Stop message up the pipeline. On receipt of a Stop, each intermediate Actor simply forwards the message and then terminates in a nice orderly fashion. The final Sink Actor receives the forwarded Stop and simply stops, without trying to send the message back down the chain of intermediaries (since the chain is effectively no longer there).
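The same Stop-forwarding protocol (often called a "poison pill") can be sketched with plain threads and LinkedBlockingQueues. Again, this is not GPars code; the STOP sentinel, the queue-per-link layout and the number of stages are my own illustrative assumptions.

```groovy
import java.util.concurrent.BlockingQueue
import java.util.concurrent.LinkedBlockingQueue

// Plain-JDK sketch (not GPars) of the shutdown protocol: a unique STOP object is
// forwarded down a chain of relay threads, each of which passes it on, then ends.
final Object STOP = new Object()          // unique sentinel, compared by identity
final int N = 4                           // number of relay stages (arbitrary)

// queues[i] feeds relay i; queues[N] feeds the sink
List<BlockingQueue<Object>> queues = (0..N).collect { new LinkedBlockingQueue<Object>() }
List<String> arrivedAtSink = Collections.synchronizedList([])

List<Thread> threads = (0..<N).collect { int i ->
    Thread.start {
        while (true) {
            def msg = queues[i].take()
            queues[i + 1].put(msg)        // forward everything, including STOP...
            if (msg.is(STOP)) break       // ...then terminate in an orderly fashion
        }
    }
}
// The sink consumes messages until STOP arrives, and does not send it back
threads << Thread.start {
    while (true) {
        def msg = queues[N].take()
        if (msg.is(STOP)) break
        arrivedAtSink << (String) msg
    }
}

['one', 'two', 'three'].each { queues[0].put(it) }
queues[0].put(STOP)                       // the source's last act
threads*.join()                           // cf. intermediaries*.join()
println arrivedAtSink                     // [one, two, three]
```

Because each relay handles its queue in FIFO order, STOP can never overtake an ordinary message, so nothing is lost on the way down.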

The messageHandler closure’s distinctive ‘when’ syntax makes message-dependent processing like this very easy.
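For comparison, here is roughly how the same type-based dispatch looks in plain Groovy, outside GPars. Groovy's switch calls isCase() on each case label, and for a Class label that amounts to an instanceof test. The handle method and its return strings are hypothetical, purely for illustration.

```groovy
// Plain-Groovy analogue (not the GPars `when` API) of dispatching on message type.
// A Class case label matches when the switched-on value is an instance of it.
class Stop {}
class Go {}

String handle(msg) {
    switch (msg) {
        case Stop:   return 'forward the Stop, then terminate'
        case Go:     return 'start sending'
        case String: return "whisper '$msg' onward"
        default:     return 'ignore'
    }
}

println handle(new Stop())   // forward the Stop, then terminate
println handle('hi')         // whisper 'hi' onward
```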

The final line of the Actors script, intermediaries*.join(), simply waits for all Actors to finish shutting down before allowing the overall script to exit.

Simple, performant and easy to write and comprehend. What’s not to like!

GPars actors are not perfect (they lack some of the Scala library’s more subtle abilities, such as pattern matching to guard message reception and the ability to see the length of the incoming message queue), but they’re not too shabby, either.

For a second bite at the apple, here’s the DataFlow version.

According to the doco:

Dataflow concurrency offers an alternative concurrency model, which is inherently safe and robust. It puts emphasis on the data and their flow though your processes instead of the actual processes that manipulate the data. Dataflow algorithms relieve developers from dealing with live-locks, race-conditions and make dead-ocks[sic] deterministic and thus 100% reproducible. If you don’t get dead-locks in tests you won’t get them in production.

What does a Dataflow-based Chinese Whispers program look like? Voilà:

package chinesewhispers

@GrabResolver(name = 'jboss', root = 'http://repository.jboss.org/maven2/')
@Grab(group = 'org.codehaus.gpars', module = 'gpars', version = '0.9')

import groovyx.gpars.dataflow.*
import chinesewhispers.utils.Whisperer

class Link {
  final up = new DataFlowVariable()
  final down = new DataFlowVariable()

  Link(index) {
    /*
      up >> {println "The up[$index] variable has just been bound to $it"}
      down >> {println "The down[$index] variable has just been bound to $it"}
    */
  }
}

final NINT = 8

for (iteration in 0..<8) {
  final intermediaries = []
  for (i in 0..<NINT)
    intermediaries << new Link(i)

  DataFlow.task {
    intermediaries[0].up << 'The sky is green, the trees are blue...SNAFU!'
    println intermediaries[0].down.val
  }

  for (i in 1..<NINT) {
    DataFlow.task({ index ->
      intermediaries[index].up << Whisperer.whisper(intermediaries[index - 1].up.val)
    }.curry(i))
    DataFlow.task({ index ->
      intermediaries[index - 1].down << intermediaries[index].down.val
    }.curry(i))
  }

  DataFlow.task {
    intermediaries[NINT - 1].down << "Did you REALLY mean to say: '${intermediaries[NINT - 1].up.val}'?"
  }
}
"OK"

Neat, eh? Binding a value to a Dataflow Variable (with <<) or reading its value (with .val) is all the inter-task communication there is: a read simply waits until some other task has bound the variable. No messy send and receive mechanisms here!
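Here is a rough, GPars-free sketch of that behaviour, using java.util.concurrent.CompletableFuture as a stand-in for a Dataflow Variable (an assumption for illustration; it is not the GPars class, and it needs Java 8+). Calling get() blocks until some other task binds a value with complete(), so the downstream tasks can be started first, even in reverse order, and they simply wait for their input.

```groovy
import java.util.concurrent.CompletableFuture

// CompletableFuture as a stand-in for a single-assignment Dataflow Variable
// (an assumption for illustration; it is not GPars' DataFlowVariable).
// get() blocks until some other task binds a value with complete().
int n = 5
List<CompletableFuture<String>> up = (0..<n).collect { new CompletableFuture<String>() }

// Start the downstream tasks first, in reverse order; each just waits for its input
List<Thread> tasks = ((n - 1)..1).collect { int i ->
    Thread.start { up[i].complete(up[i - 1].get() + " -> $i") }
}

up[0].complete('msg')          // binding the first variable sets everything flowing
tasks*.join()
println up[n - 1].get()        // msg -> 1 -> 2 -> 3 -> 4
```

The result does not depend on which thread happens to run first, which is the determinism the GPars doco is talking about.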

Here’s what the Dataflow version looks like when running inside IntelliJ:

[Screenshot: ChineseWhisperingDataflow]

Of course, you lose a fair bit of flexibility, but for straightforward tasks the simplicity of the solution more than compensates…

Each bi-directional link between intermediary tasks is modelled as a Link with separate ‘up’ and ‘down’ variables. Although GPars offers alternatives, for this application I have chosen to use plain “Dataflow Variables”, which are write-once, read-many entities; each iteration through the main loop therefore recreates the requisite Link instances (and hence the Dataflow Variables) from scratch.
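The write-once property is easy to see with the same CompletableFuture stand-in (again an assumption, not GPars' own class, and Java 8+ only): only the first complete() takes effect, so a variable cannot be reused for the next message. How a second bind is reported may well differ in GPars; here complete() simply returns false.

```groovy
import java.util.concurrent.CompletableFuture

// CompletableFuture standing in for a write-once variable (illustration only,
// not GPars' DataFlowVariable): only the first complete() takes effect.
def cell = new CompletableFuture<String>()
boolean first = cell.complete('The sky is green')
boolean second = cell.complete('The elephant is effervescent')   // ignored: already bound
println "$first $second ${cell.get()}"       // true false The sky is green

// So each pass through the pipeline needs freshly created variables
def rounds = (1..3).collect { int r ->
    def v = new CompletableFuture<String>()  // a new variable per round
    v.complete("round $r".toString())
    v.get()
}
println rounds                               // [round 1, round 2, round 3]
```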

Uncomment the body of the Link constructor and you’ll be able to see the inter-task communication in action. Simple but very effective debugging:

[Screenshot: ChineseWhisperingDataflowDebug]

Just for completeness’ sake (it’s a minor supporting actor, not a star), here’s the Whisperer class that the examples use:

package chinesewhispers.utils

class Whisperer {
  final static r = new Random()

  final static map = [
          'sky': 'elephant',
          'trees': 'kettles',
          'green': 'effervescent',
          'blue': 'smelly'
  ]

  static whisper(msg) {

    if (r.nextBoolean())
      return msg

    def split = msg.split(/\W/).findAll { it }
    def l = split.size()
    def sel = r.nextInt(l)
    def word = split[sel]

    def w = map[word]
    w ? msg.replace(word, w) : msg
  }
}
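Because Whisperer uses a shared static Random, its output varies from run to run. If you want reproducible runs (handy for tests), one option is a variant that takes the Random as a parameter, as in this sketch; SeededWhisperer and its two-argument whisper method are hypothetical and not part of the original examples.

```groovy
// Hypothetical variant of Whisperer (not part of the original examples) that takes
// its Random as a parameter, so a fixed seed gives a reproducible whisper.
class SeededWhisperer {
    static final Map<String, String> MAP = [sky  : 'elephant', trees: 'kettles',
                                            green: 'effervescent', blue: 'smelly']

    static String whisper(String msg, Random r) {
        if (r.nextBoolean()) return msg                      // half the time: pass it on verbatim
        List<String> words = msg.split(/\W/).findAll { it }.toList()   // drop empty tokens
        String word = words[r.nextInt(words.size())]         // pick one word at random
        String w = MAP[word]
        w ? msg.replace(word, w) : msg                       // swap only if a substitute exists
    }
}

def msg = 'The sky is green, the trees are blue...SNAFU!'
def a = SeededWhisperer.whisper(msg, new Random(42))
def b = SeededWhisperer.whisper(msg, new Random(42))
println(a == b)                                              // true: same seed, same whisper
```

The logic mirrors the original whisper method; only the source of randomness has moved from a static field to a parameter.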

That’s all folks; two versions of the same application is enough for any mortal!

I am eagerly looking forward to the time when the work to integrate JCSP into GPars bears fruit.

By the way, each of these examples should happily work with GPars 0.9 and Groovy 1.7.1. Just copy the listings into GroovyConsole and go (you might need to bring the Whisperer class inline into each application, but that’s trivial to do).

