14 April 2015

Making functions thread safe with agents

The F# MailboxProcessor class is pretty awesome. It lets you safely run code that would ordinarily be vulnerable to concurrency problems, such as code that uses a non-thread-safe collection (like my previously defined CircularDictionary), without worrying about locking.

However, it takes a bit of a mess of boilerplate to set up. It also doesn't preserve the nice functional semantics of F#, since you have to use an object. It's also a bit wonky to call, what with the reply channel and all.
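To make the complaint concrete, here is a rough sketch of what using a MailboxProcessor directly tends to look like. The CounterMessage type and counterAgent are hypothetical names made up for this illustration. Note the message type, the loop, and the reply channel that every caller has to thread through.

```fsharp
// Hypothetical message type: the value to add plus a channel to reply on.
type CounterMessage =
    | Add of int * AsyncReplyChannel<int>

// Hypothetical agent that keeps a running total, one message at a time.
let counterAgent =
    MailboxProcessor<CounterMessage>.Start(fun inbox ->
        let rec loop total = async {
            let! message = inbox.Receive() // wait on next message
            match message with
            | Add (n, chan) ->
                let newTotal = total + n
                chan.Reply newTotal // send the result back to the caller
                return! loop newTotal
        }
        loop 0)

// The caller has to build the message around the reply channel.
let total =
    counterAgent.PostAndAsyncReply(fun chan -> Add (5, chan))
    |> Async.RunSynchronously
```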

But I wouldn't mention it without also having a solution. Here is a little module I threw together that takes a presumably non-thread-safe function, wraps it in an agent to make it thread safe, and returns an identical function which will route the call through the agent.

module Agent =
    /// Wrap the given asynchronous 1 parameter function
    /// in a MailboxProcessor and provide a method to call it.
    let wrap fAsync =
        // create and start a new mailbox processor
        let agent =
            MailboxProcessor.Start
            <| fun inbox -> // inbox is the same as the agent itself
                let rec loop () = async { // define the async message processing loop
                    let! (message, reply) = inbox.Receive() // wait on next message
                    let! result = fAsync message // run async fn
                    reply(result) // reply
                    return! loop () // continue, tail call recursion
                }
                loop () // start the message processing loop
        // create a fn that appropriately posts messages to agent
        let fn x =
            agent.PostAndAsyncReply(fun chan -> (x, chan.Reply))
        fn // return fn

    /// Wrap the given asynchronous 2 parameter function
    /// in a MailboxProcessor and provide a method to call it.
    let wrap2 fAsync2 =
        // wrap two params into 1 tuple
        let fn1 = wrap <| fun (a,b) -> fAsync2 a b
        // convert 2 args to 1 tuple
        fun a b -> fn1 (a,b)

I am defaulting to calling functions that return asynchronous results. That can easily be changed by changing the let! to a let (without the bang). This doesn't really provide a way to stop the agent, but in my case, I don't care if messages are lost on shutdown. It's not hard to make a version that is stoppable if you care about that.
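For anyone who does care about stopping, one possible shape is sketched below. It is an assumption on my part, not the module I actually use: the StoppableAgent module, the Message type, and the stop function are all hypothetical names. The idea is to add a Stop message that ends the loop, so everything queued ahead of it still gets processed.

```fsharp
module StoppableAgent =
    /// Hypothetical message type: a call to run, or a request to stop.
    type Message<'a, 'b> =
        | Call of 'a * AsyncReplyChannel<'b>
        | Stop of AsyncReplyChannel<unit>

    /// Like Agent.wrap, but also returns a function that stops the agent.
    let wrap (fAsync: 'a -> Async<'b>) =
        let agent =
            MailboxProcessor.Start(fun inbox ->
                let rec loop () = async {
                    let! message = inbox.Receive() // wait on next message
                    match message with
                    | Call (x, chan) ->
                        let! result = fAsync x // run async fn
                        chan.Reply result
                        return! loop () // keep processing
                    | Stop chan ->
                        chan.Reply () // acknowledge, then fall out of the loop
                }
                loop ())
        let fn x = agent.PostAndAsyncReply(fun chan -> Call (x, chan))
        let stop () = agent.PostAndAsyncReply(fun chan -> Stop chan)
        fn, stop

// Usage: a running total kept in a ref cell, mutated only inside the agent.
let runningTotal = ref 0
let addToTotal, stopTotal =
    StoppableAgent.wrap (fun step -> async {
        runningTotal.Value <- runningTotal.Value + step
        return runningTotal.Value
    })

let first = addToTotal 1 |> Async.RunSynchronously
let second = addToTotal 2 |> Async.RunSynchronously
stopTotal () |> Async.RunSynchronously // returns once the agent has seen Stop
```

One caveat with this sketch: calls posted after the stop are never answered, so their asyncs will wait forever unless you add a timeout or a stopped flag.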

Here's how I'm calling it:

module IdGenerator =

    let Create (esConnection:IEventStoreConnection) idHistorySize =

        // changes to history are not thread safe
        let idHistory = new CircularDictionary<TrackingId, CreatedId>(idHistorySize)

        let generate trackingId prefix =
            // ... (body omitted)

        // wrap generate function in an agent, thread-safe for great justice
        Agent.wrap2 generate

Oh yes, and since agents are asynchronous, the result of the wrapped function is asynchronous too. At worst, you can call Async.RunSynchronously to wait for the message to finish and get the result.
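Here is a small self-contained sketch of both calling styles. It repeats a compact copy of the wrap function so it can run on its own. The hits dictionary and recordHit function are hypothetical stand-ins for whatever non-thread-safe code you are protecting.

```fsharp
open System.Collections.Generic

// Compact copy of Agent.wrap so this example stands alone.
let wrap fAsync =
    let agent =
        MailboxProcessor.Start(fun inbox ->
            let rec loop () = async {
                let! (message, reply) = inbox.Receive()
                let! result = fAsync message
                reply result
                return! loop ()
            }
            loop ())
    fun x -> agent.PostAndAsyncReply(fun chan -> (x, chan.Reply))

// Dictionary is not safe for concurrent writes.
let hits = Dictionary<string, int>()

// Hypothetical function: bump the counter for a key and return the new count.
let recordHit (key: string) = async {
    let current =
        match hits.TryGetValue key with
        | true, n -> n
        | false, _ -> 0
    hits.[key] <- current + 1
    return current + 1
}

let recordHitSafe = wrap recordHit

// Many concurrent callers; the agent handles them one message at a time.
let counts =
    List.replicate 100 "home"
    |> List.map recordHitSafe
    |> Async.Parallel
    |> Async.RunSynchronously

// A single blocking call when you just need the answer.
let latest = recordHitSafe "home" |> Async.RunSynchronously
```

Since every update runs inside the agent, no increments are lost: the 100 parallel calls each see a distinct count, although the order in which they arrive at the agent is not guaranteed.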
