Fun With Concurrency and Dataflow

Mar 01, 2007 03:27

Here's some more crap: a simplistic, stupid implementation of dataflow variables (think Oz) for Gambit. I should really call them dataflow cells or dataflow objects, since as it stands they have nothing to do with variables. I have some ideas for a hypothetical metacircular evaluator/compiler in which you can declare a variable without binding it; such unbound variables would become dataflow variables, and references to them would be automagically converted into references to dataflow objects like these.

Here's how dataflow variables work in Oz. You can declare a variable and bind it later. When a thread tries to get the value of an unbound variable, it blocks. When another thread binds a value to that self-same variable, the blocked thread resumes right where it left off, with the freshly bound value. What's more, you can have a partial value, such as a record in which some fields are filled in but not all; trying to access an unfilled field blocks in exactly the same way. In short, a dataflow thread runs only when all of the values it needs are available.

We will attempt to simulate this behavior with these dataflow objects. Their "API" is a bit like that of SRFI-39 parameter objects: (make-dataflow) creates a new dataflow object in the form of a procedure. Calling the procedure with no arguments gets its value; calling it with one argument sets it. You can set it only once, and if you try to get a value that hasn't been set yet, the current thread blocks until some other thread comes along and sets it. For example:

    > (let* ((d (make-dataflow))
             (t (make-thread (lambda () (thread-sleep! 10) (d 5)))))
        (thread-start! t)
        (d))

    ...after a 10-second pause...

    5
    >

The pause happens because we are trying to return the value of the dataflow object before it has one; the thread we created to fill it sleeps for 10 seconds before doing so. So the primordial thread is blocked for those 10 seconds.

Here's a fun trick. This bit of code has a producer thread, which generates squares at the rate of one per second, and a consumer (the primordial thread), which displays them as they arrive:

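    (let* ((d1 (make-dataflow))
           (t (make-thread
               (lambda ()
                 ;; Producer: bind each cell to (square . next-cell),
                 ;; then move on to the next cell.
                 (let loop ((d d1)
                            (square 0)
                            (addend 1))
                   (if (> square 1000)
                       (d (cons square '()))   ; last cell: end of the stream
                       (begin (d (cons square (make-dataflow)))
                              (thread-sleep! 1)
                              (loop (cdr (d)) (+ square addend) (+ addend 2)))))))))
      (thread-start! t)
      ;; Consumer: each read blocks until the producer has bound that cell.
      (let loop ((v (d1)))
        (display (car v))
        (newline)
        (if (null? (cdr v)) #f (loop ((cdr v))))))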

This fakes the partial values of Oz by putting a dataflow object in the cdr of a pair. Of course, the consumer has to know the dataflow object is in there and must wrap each access to it in an extra set of parentheses, as in ((cdr v)). It would be much nicer if this stuff "just worked" without our having to remember that, but it requires language support which Gambit (and every other Scheme I know of) doesn't have. The beautiful thing about Scheme and Lisp, though, is their extensibility: writing an evaluator for a language with most of the semantics of Gambit Scheme, extended with dataflow as a built-in language feature, would be much easier than doing the same in, say, Java or C#.
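Short of real language support, the extra parentheses can at least be confined to one place. The following is a hypothetical helper, dataflow-stream-for-each (the name is made up here, not part of the original code), that walks a chain of cells of the shape the producer above builds: each cell's value is a pair whose car is an element and whose cdr is either '() or the next dataflow object. It relies only on the get convention described above (calling a dataflow object with no arguments reads it, blocking if necessary).

```scheme
;; Hypothetical helper (not from the original post).
;; CELL is a dataflow object whose value is a pair: the car is an element,
;; the cdr is either '() (end of stream) or the next dataflow object.
;; PROC is called on each element in order.
(define (dataflow-stream-for-each proc cell)
  (let loop ((v (cell)))            ; reading a cell blocks until it is bound
    (proc (car v))
    (if (not (null? (cdr v)))
        (loop ((cdr v))))))         ; the extra parentheses live here, once
```

With it, the consumer loop in the squares example shrinks to (dataflow-stream-for-each (lambda (x) (display x) (newline)) d1).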

Another cool thing about Scheme, and about any language with lexical closures, is that we were able to hide all the mutex and condition-variable messiness behind a lambda. From an end user's perspective, every thread that needs a dataflow object's value automatically synchronizes on that object.

We do the synchronization with a single mutex and a condition variable. Whenever a read or write is attempted, the thread takes the mutex, performs the access and releases the mutex. If we try to assign a value to a dataflow object that already has one, the mutex is released and an error is raised. Threads that read the object's value have nothing to do until they get it, so if the dataflow object has no value yet, they release the mutex and wait on the condition variable. This is their way of going to sleep and saying "Wake me when the dataflow object has a value." They are removed from the set of runnable threads until a thread that sets the dataflow object's value broadcasts on the condition variable. Then all the threads that were waiting become runnable again and, during their next quantum, re-acquire the mutex, read the value and return it.
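Here is a minimal sketch of how make-dataflow could be built along those lines, assuming Gambit's built-in SRFI-18 primitives (make-mutex, mutex-lock!, mutex-unlock!, make-condition-variable, condition-variable-broadcast!). The internal names get and bind, the bound? flag and the error message are illustrative choices, not the original code.

```scheme
;; Sketch of a set-once, blocking dataflow object (SRFI-18, as in Gambit).
;; Call with no arguments to read (blocks until bound); call with one
;; argument to bind (raises an error if already bound).
(define (make-dataflow)
  (let ((m (make-mutex))
        (cv (make-condition-variable))
        (bound? #f)
        (value #f))
    (define (get)
      (mutex-lock! m)
      (if bound?
          (let ((v value))
            (mutex-unlock! m)
            v)
          (begin
            ;; Atomically release M and sleep on CV. On wake-up the mutex
            ;; is not held, so start over (this also covers spurious wakeups).
            (mutex-unlock! m cv)
            (get))))
    (define (bind v)
      (mutex-lock! m)
      (if bound?
          (begin
            (mutex-unlock! m)   ; release before raising, as described above
            (error "dataflow object already bound"))
          (begin
            (set! value v)
            (set! bound? #t)
            ;; Wake every waiting reader, not just one.
            (condition-variable-broadcast! cv)
            (mutex-unlock! m))))
    (lambda args
      (if (null? args)
          (get)
          (bind (car args))))))
```

Broadcast rather than signal is the natural choice here: once the value is bound, every waiting reader can proceed, so there is no reason to wake them one at a time.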

Anyway, I'm glad I could play around with Gambit's concurrency system. The experience is going to come in handy for some features I have planned for Gamsock. You'll see -- this is going to be cool.

Enjoy, and happy hacking!

concurrency, scheme, dataflow variables, threads, gambit
