However, it's a bit of a mess of boilerplate to setup. It also doesn't preserve the nice functional semantics of F#, since you have to use an object. It's also a bit wonky to call, what with the reply channel and all.
But I wouldn't mention it without also having a solution. Here is a little module I threw together that takes a presumably non-thread-safe function, wraps it in an agent to make it thread safe, and returns an identical function which will route the call through the agent.
module Agent =
/// Wrap the given asynchronous 1 parameter function
/// in a MailboxProcessor and provide a method to call it.
let wrap fAsync =
// create and start a new mailbox processor
let agent =
MailboxProcessor.Start
<| fun inbox -> // inbox is the same as the agent itself
let rec loop () = async { // define the async message processing loop
let! (message, reply) = inbox.Receive() // wait on next message
let! result = fAsync message // run async fn
reply(result) // reply
return! loop () // continue, tail call recursion
}
loop () // start the message processing loop
// create a fn that appropriately posts messages to agent
let fn x =
agent.PostAndAsyncReply(fun chan -> (x, chan.Reply))
fn // return fn
/// Wrap the given asynchronous 2 parameter function
/// in a MailboxProcessor and provide a method to call it.
let wrap2 fAsync2 =
// wrap two params into 1 tuple
let fn1 = wrap <| fun (a,b) -> fAsync2 a b
// convert 2 args to 1 tuple
fun a b -> fn1 (a,b)
/// Wrap the given asynchronous 1 parameter function
/// in a MailboxProcessor and provide a method to call it.
let wrap fAsync =
// create and start a new mailbox processor
let agent =
MailboxProcessor.Start
<| fun inbox -> // inbox is the same as the agent itself
let rec loop () = async { // define the async message processing loop
let! (message, reply) = inbox.Receive() // wait on next message
let! result = fAsync message // run async fn
reply(result) // reply
return! loop () // continue, tail call recursion
}
loop () // start the message processing loop
// create a fn that appropriately posts messages to agent
let fn x =
agent.PostAndAsyncReply(fun chan -> (x, chan.Reply))
fn // return fn
/// Wrap the given asynchronous 2 parameter function
/// in a MailboxProcessor and provide a method to call it.
let wrap2 fAsync2 =
// wrap two params into 1 tuple
let fn1 = wrap <| fun (a,b) -> fAsync2 a b
// convert 2 args to 1 tuple
fun a b -> fn1 (a,b)
I am defaulting to calling functions that return asynchronous results. That can easily be changed by changing the let! to a let (without the bang). This doesn't really provide a way to stop the agent, but in my case, I don't care if messages are lost on shutdown. It's not hard to make a version that is stoppable if you care about that.
Here's how I'm calling it:
Here's how I'm calling it:
module IdGenerator =
let Create (esConnection:IEventStoreConnection) idHistorySize =
// changes to history are not thread safe
let idHistory = new CircularDictionary<TrackingId, CreatedId>(idHistorySize)
let generate trackingId prefix =
...
// wrap generate function in an agent, thread-safe for great justice
Agent.wrap2 generate
let Create (esConnection:IEventStoreConnection) idHistorySize =
// changes to history are not thread safe
let idHistory = new CircularDictionary<TrackingId, CreatedId>(idHistorySize)
let generate trackingId prefix =
...
// wrap generate function in an agent, thread-safe for great justice
Agent.wrap2 generate
Oh yes, and since agents are asynchronous, the result of the wrapped function is asynchronous. At worst, you can call Async.RunSynchronously to wait on message to finish and get the result.
Some links on F# Async:
http://fsharpforfunandprofit.com/posts/concurrency-async-and-parallel/
https://msdn.microsoft.com/en-us/library/dd233250.aspx
No comments:
Post a Comment