19 June 2015

Idea: Messaging over WebSocket

WebSocket seems to be an ideal protocol for messaging systems. However, there are scarcely any specs that take advantage of it. Some of the ones I found are SwaggerSocket and WAMP (Web Application Messaging Protocol, not to be confused with Windows, Apache, MySQL, and PHP). But these don't quite hit the target I'm going for. WAMP, for instance, concentrates on RPC and pub/sub. SwaggerSocket goes a step too far and implements REST over WebSocket. So, I began to wonder what it would take to make an effective messaging system over WebSocket.

The minimum things (I can think of) needed to process a message on the server are a way to route the message and a way to deserialize the message payload. An added wrinkle is that WebSockets are fully asynchronous, unlike HTTP, which is typically request/response with only I/O being asynchronous (on the client anyway).

The perfect candidate to specify routing is a URI path (e.g. /system/boundedcontext/aggregate/message). And my first thought for a "wire" format for the message payload was JSON. So it naturally follows that an HTTP message would fit as a wire format for messaging over a web socket, since it has path and payload. Using HTTP messages, you could also specify different message formats with headers.

However, with messaging, one feature we really don't need from HTTP is verb usage. The route (aka message name) is semantic and, if following CQRS patterns, is itself already a verb which conveys intent. Likely, a lot of the other parts of the HTTP spec are also unnecessary for messaging. A default subset could be used so it works with existing HTTP parsers: for instance, always using POST or PUT so a payload can be included. For HTTP verb militants, either verb could be argued: creating new messages to process or sending message payloads which update some other resource.
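To make the idea concrete, here is a minimal sketch of pulling the route, headers, and payload out of an HTTP-style message that arrived as a single WebSocket text frame. The Message type and the parseHeader and parseMessage functions are names I made up for illustration. The parser assumes a well-formed request line, ignores the verb and version, and skips most of the HTTP spec, so treat it as a thought experiment rather than a real parser.

```fsharp
open System

/// Hypothetical shape of a parsed message: route, headers, payload.
type Message =
    { Route: string
      Headers: Map<string, string>
      Body: string }

/// Split "Name: value" into a lower-cased name and a trimmed value.
let parseHeader (line: string) =
    match line.IndexOf(':') with
    | -1 -> None
    | i -> Some (line.Substring(0, i).Trim().ToLowerInvariant(), line.Substring(i + 1).Trim())

/// Parse an HTTP-style request held in one text frame.
let parseMessage (frame: string) : Message option =
    let normalized = frame.Replace("\r\n", "\n")
    // the head and the body are separated by the first blank line
    let head, body =
        match normalized.IndexOf("\n\n", StringComparison.Ordinal) with
        | -1 -> normalized, ""
        | i -> normalized.Substring(0, i), normalized.Substring(i + 2)
    let lines = head.Split('\n')
    // request line is "VERB /route VERSION"; only the route matters here
    match lines.[0].Split(' ') with
    | [| _; route; _ |] ->
        let headers = lines |> Array.skip 1 |> Array.choose parseHeader |> Map.ofArray
        Some { Route = route; Headers = headers; Body = body }
    | _ -> None

// Example frame, using the route from above and a made-up payload
let sample =
    [ "POST /system/boundedcontext/aggregate/message HTTP/1.1"
      "Content-Type: application/json"
      ""
      "{ \"UseCaseData1\": \"asdf\" }" ]
    |> String.concat "\r\n"

let parsed = parseMessage sample
```

The Content-Type header is where a different message format could be declared, as mentioned above.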

Aside
Another pattern that some people use is posting messages to return view data. That would typically be the domain of a regular HTTP GET. However, a GET can be insufficient when performing a query with complex parameters, since GETs can't have a message payload attached, and complex query parameters are troublesome. This (messaging for views) is also handy when you need to coordinate resources across different read models / services before returning a coherent view. Such a pattern would also fit nicely into messaging over web socket, but is optional.

How to implement? On the server side, there are already numerous web server implementations which parse and handle HTTP requests. One of these could be leveraged. The next step is to register a route with a handler so the message can be delivered. Deserialization can also be part of the pipeline if the deserialization format (e.g. class) is registered along with the route. The handlers themselves could be agents or actors or whatever computational model suits your fancy. One of the things I'm interested in based on reading about the Orleans framework is the virtual actor approach.
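As a rough sketch of the registration step, here is what a route table could look like when the deserializer is registered along with the handler. Router, Register, and Dispatch are hypothetical names, not from any existing framework, and the handler here is a plain async function standing in for whatever agent or actor you prefer.

```fsharp
open System
open System.Collections.Generic

/// A registered handler takes the raw payload and returns an asynchronous reply.
type Handler = string -> Async<string>

/// Hypothetical route table: path -> (deserialize, then handle).
type Router() =
    let routes = Dictionary<string, Handler>()

    /// Register a route together with its deserializer and typed handler.
    member this.Register<'msg>(path: string, deserialize: string -> 'msg, handle: 'msg -> Async<string>) =
        routes.[path] <- fun payload -> handle (deserialize payload)

    /// Find the handler for a path and start processing; None if no route matches.
    member this.Dispatch(path: string, payload: string) : Async<string> option =
        match routes.TryGetValue path with
        | true, handler -> Some (handler payload)
        | false, _ -> None

// Usage with a made-up route whose payload is just an integer quantity
let router = Router()
router.Register(
    "/orders/place",
    (fun (s: string) -> Int32.Parse s),
    (fun qty -> async { return sprintf "placed %d" qty }))
```

Because the deserializer is captured at registration time, the dispatch side only ever deals with a path and a raw payload.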

This approach could already be used on top of regular HTTP without WebSockets, so all the components are already out there. They just need to be assembled in such a way to easily take advantage of WebSocket performance, and mitigate the difficulty of full asynchrony. Maybe in my spare time... hah!

18 June 2015

Stupid F# Interop Things

I really like F#, but interop with C# libraries can be really annoying. One such example I ran across today was trying to create a value provider for JSON.NET. As part of it, you have to call a protected method on the base object. Since this base method has to be called on a list of things, it has to be called from a lambda. That's where things fall down.

Here's an example from SO on that. It also has links to other questions for more details on the "why".

http://stackoverflow.com/questions/12112377/understanding-a-change-to-protected-base-member-usage-in-f-3-0

The easiest way to make this work is to create a private method on your derived class which calls the protected method. Then in your lambda, you can call your own class's private method without issue.
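Here is a small sketch of that workaround. It does not use JSON.NET. Instead it assumes the standard Collection<'T> class as a stand-in base, since its InsertItem method is protected. BulkCollection, AppendViaBase, and AddAll are made-up names for illustration.

```fsharp
open System.Collections.ObjectModel

type BulkCollection<'T>() =
    inherit Collection<'T>()

    // A direct member implementation, so calling the protected base method is allowed here
    member private this.AppendViaBase(item: 'T) =
        base.InsertItem(this.Count, item)

    // The lambda only touches our own private member, never the protected one
    member this.AddAll(items: seq<'T>) =
        items |> Seq.iter (fun item -> this.AppendViaBase item)

let numbers = BulkCollection<int>()
numbers.AddAll [ 1; 2; 3 ]
```

Putting base.InsertItem directly inside the lambda is the version the compiler rejects.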

It's irritating and dumb to have to deal with this. Hopefully the F# team will add compiler optimizations to do this for us one day. After all, the solution is straightforward, and you always run into this in a derived class where the base class has protected members.

17 June 2015

The Power of F# Type Providers

Now call me daft, but when I first encountered information on F# Type Providers, I didn't get the big deal. I was looking at JSON parsing, and I ultimately ended up using JSON.NET again, because I didn't get it.

But later, I needed to access a SQL database to import some data into my project, so I actually tried the SQL Type Provider. It turns out I'd had such trouble comprehending type providers because it never entered my mind that their capability could even exist. Having felt the pain of data access layers and object-relational mappers, I had shelved the idea that it could be easy.

They automatically and dynamically generate types for your data source. Those types have IntelliSense support in Visual Studio. Basically all you have to do is provide a connection string (for SQL) or example data (for JSON and others).

For those familiar with Entity Framework, it's like that except without having to create and maintain an EDMX model. You can also query the database with "query" computation expressions... similar syntax to LINQ.

For schema-less data types, you just provide example data, and the type provider infers the types from that.
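For the JSON case, a minimal sketch looks something like this. It assumes the FSharp.Data package (referenced here from an F# script) and its JsonProvider. The sample document and the Person name are made up for illustration.

```fsharp
#r "nuget: FSharp.Data"
open FSharp.Data

// The example data is all the provider gets; from it, it infers
// a Name : string property and an Age : int property at compile time.
type Person = JsonProvider<""" { "name": "Ann", "age": 30 } """>

// Real data with the same shape can then be parsed into the generated type
let p = Person.Parse(""" { "name": "Bob", "age": 41 } """)
printfn "%s is %d" p.Name p.Age
```

There is no hand-written class anywhere; p.Name and p.Age show up in IntelliSense purely because of the sample.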

15 April 2015

Nancy with F#, some helpers for routes

Nancy is such a great framework for making HTTP services on the .NET platform, especially if you are using C#. Nancy depends on a lot of the C# trappings like dynamic variables, inheritance, and so forth. However, it's quite unpalatable when used from F#. There isn't a dynamic type in F#. The common alternative is defining my own ? operator to access dynamic parameters. Also, defining routes is just plain weird from F#, and only gets weirder when you try to do asynchronous routes. The translation from an async C# lambda is not a smooth one. There's also having to box everything because F# will not do automatic upcasting to System.Object.

I think this is why it seems like there are a good handful of Nancy frameworks for F#. Most of them do more than just provide more sugary syntax for defining routes, however, which is all I really wanted for the time being.

So here are my helper functions for making routes, especially async routes, a bit more palatable from F#.

    let private addRouteSync path f (router:NancyModule.RouteBuilder) =
        router.[path] <-
            fun (parameters:obj) ->
                f (parameters :?> DynamicDictionary) |> box

    // async is the default
    let private addRoute path f (router:NancyModule.RouteBuilder) =
        router.[path, true] <-
            fun (parameters:obj) cancellationToken ->
                async { // unwrap and box the result
                    let! result = f (parameters :?> DynamicDictionary) cancellationToken
                    return box result
                } |> Async.StartAsTask

    // more f# friendly methods
    type NancyModule with
        member me.post path f = me.Post |> addRoute path (f me)
        member me.get path f = me.Get |> addRoute path (f me)
        member me.put path f = me.Put |> addRoute path (f me)
        member me.delete path f = me.Delete |> addRoute path (f me)
        member me.patch path f = me.Patch |> addRoute path (f me)
        member me.options path f = me.Options |> addRoute path (f me)

        member me.postSync path f = me.Post |> addRouteSync path (f me)
        member me.getSync path f = me.Get |> addRouteSync path (f me)
        member me.putSync path f = me.Put |> addRouteSync path (f me)
        member me.deleteSync path f = me.Delete |> addRouteSync path (f me)
        member me.patchSync path f = me.Patch |> addRouteSync path (f me)
        member me.optionsSync path f = me.Options |> addRouteSync path (f me)

Here is a dumb example of usage, asynchronous:

        m.put "/orders/{id:string}"
            <| fun nmodule parameters cancelToken ->
                let id = parameters.["id"].ToString()
                async { return id } // echo id

Now the function you set up receives the parameters as a DynamicDictionary instead of obj. You can also return whatever you want (e.g. Response), and these helpers will box it for you before providing it back to Nancy. Your function can also directly return an async, and these helpers will convert it to a task type which Nancy expects. I'm also passing in the NancyModule in case your code hangs off an F# module (essentially static code) instead of the Nancy module class itself.

I basically only use the NancyModule as an entry point (like a static void Main) and try to remain functionally-styled with my real code.

14 April 2015

Making functions thread safe with agents

The F# MailboxProcessor class is pretty awesome. It allows you to safely run code that would ordinarily be vulnerable to concurrency problems, such as those that use a non-thread-safe collection (like my previously defined CircularDictionary) without worrying about locking.

However, it's a bit of a mess of boilerplate to set up. It also doesn't preserve the nice functional semantics of F#, since you have to use an object. It's also a bit wonky to call, what with the reply channel and all.

But I wouldn't mention it without also having a solution. Here is a little module I threw together that takes a presumably non-thread-safe function, wraps it in an agent to make it thread safe, and returns an identical function which will route the call through the agent.

module Agent =
    /// Wrap the given asynchronous 1 parameter function
    /// in a MailboxProcessor and provide a method to call it.
    let wrap fAsync =
        // create and start a new mailbox processor
        let agent =
            MailboxProcessor.Start
            <| fun inbox -> // inbox is the same as the agent itself
                let rec loop () = async { // define the async message processing loop
                    let! (message, reply) = inbox.Receive() // wait on next message
                    let! result = fAsync message // run async fn
                    reply(result) // reply
                    return! loop () // continue, tail call recursion
                }
                loop () // start the message processing loop
        // create a fn that appropriately posts messages to agent
        let fn x =
            agent.PostAndAsyncReply(fun chan -> (x, chan.Reply))
        fn // return fn

    /// Wrap the given asynchronous 2 parameter function
    /// in a MailboxProcessor and provide a method to call it.
    let wrap2 fAsync2 =
        // wrap two params into 1 tuple
        let fn1 = wrap <| fun (a,b) -> fAsync2 a b
        // convert 2 args to 1 tuple
        fun a b -> fn1 (a,b)

I am defaulting to calling functions that return asynchronous results. That can easily be changed by changing the let! to a let (without the bang). This doesn't really provide a way to stop the agent, but in my case, I don't care if messages are lost on shutdown. It's not hard to make a version that is stoppable if you care about that.

Here's how I'm calling it:

module IdGenerator =

    let Create (esConnection:IEventStoreConnection) idHistorySize =

        // changes to history are not thread safe
        let idHistory = new CircularDictionary<TrackingId, CreatedId>(idHistorySize)

        let generate trackingId prefix =
            ...

        // wrap generate function in an agent, thread-safe for great justice
        Agent.wrap2 generate

Oh yes, and since agents are asynchronous, the result of the wrapped function is asynchronous. At worst, you can call Async.RunSynchronously to wait for the message to finish and get the result.
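If you do care about stopping the agent, one possible shape for a stoppable version is sketched below. This is not the module above, just a variation on it: AgentMessage, StoppableAgent, and unsafeAdd are names invented for the example, and stopping simply means the loop acknowledges a Stop message and does not recurse.

```fsharp
/// Messages the agent understands: a call to run, or a request to stop.
type AgentMessage<'a, 'b> =
    | Call of 'a * AsyncReplyChannel<'b>
    | Stop of AsyncReplyChannel<unit>

module StoppableAgent =
    /// Wrap an async 1 parameter function in an agent.
    /// Returns the wrapped function plus a function that stops the agent.
    let wrap (fAsync: 'a -> Async<'b>) =
        let agent =
            MailboxProcessor.Start(fun inbox ->
                let rec loop () = async {
                    let! message = inbox.Receive()
                    match message with
                    | Call (x, chan) ->
                        let! result = fAsync x
                        chan.Reply result
                        return! loop () // keep processing
                    | Stop chan ->
                        chan.Reply () // acknowledge, then fall out of the loop
                }
                loop ())
        let call x = agent.PostAndAsyncReply(fun chan -> Call (x, chan))
        let stop () = agent.PostAndAsyncReply(fun chan -> Stop chan)
        call, stop

// A deliberately non-thread-safe function: read-modify-write on shared state
let total = ref 0
let unsafeAdd step = async {
    total.Value <- total.Value + step
    return total.Value }

let add, stop = StoppableAgent.wrap unsafeAdd

// 100 concurrent callers, all serialized through the agent
List.init 100 (fun _ -> add 1)
|> Async.Parallel
|> Async.RunSynchronously
|> ignore

// Blocks until every message queued ahead of the stop request has been handled
stop () |> Async.RunSynchronously
```

Anything posted after the stop is never answered, so callers should not use the wrapped function once stop has been requested.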

Some links on F# Async:
http://fsharpforfunandprofit.com/posts/concurrency-async-and-parallel/
https://msdn.microsoft.com/en-us/library/dd233250.aspx

12 April 2015

Weak Serialization

When I started implementing JSON-based messaging for the first time, my first step was to make a monolithic object for each use case.

public class UseCase1 // command
{
    public Guid MessageId {get; set;}
    public string TargetId {get; set;}
    public int Version {get; set;}
    public string UseCaseData1 {get; set;}
    public int UseCaseData2 {get; set;}
}

This is actually not a bad way to do it. However, I eventually became annoyed at having the same boilerplate metadata (MessageId, TargetId, Version, for instance) in every message. In grand “I love compiled objects” fashion, I decided it was possible to encapsulate any given message into one segmented contract. Something like this:

public class CommandMessage
{
    public Metadata Meta {get; set;}
    public ICommand Data {get; set;}
}
public class Metadata
{
    public Guid MessageId {get; set;}
    public string TargetId {get; set;}
    public int Version {get; set;}
}
public interface ICommand { }
public class UseCase1 : ICommand
{
    public string SomeData {get; set;}
}

I have the data and metadata separated. However, this doesn’t quite sit right. Message construction on the client now deals with 2 objects. Ok, not a big deal... You also have to make sure that you can deserialize to the proper type. This involves a type hint as part of the data the client sends. No problem I guess…

But eventually it hit me that my compiled object obsession was missing one of the biggest advantages of the JSON format. It’s something I first heard on the DDD/CQRS groups called weak serialization. And once you see it, it’s so obvious. So let me boldly state the obvious with code.

public class Metadata
{
    public Guid MessageId {get; set;}
    public string TargetId {get; set;}
    public int Version {get; set;}
}
public class UseCase1
{
    public string UseCaseData1 {get; set;}
    public int UseCaseData2 {get; set;}
}

JSON sent from client:

{
    "MessageId": "...",
    "TargetId": "asdf-1",
    "Version": 0,
    "UseCaseData1": "asdf",
    "UseCaseData2": 7
}

And the data turned into server objects:

var meta = JsonConvert.DeserializeObject<Metadata>(requestBodyString);
var command = JsonConvert.DeserializeObject<UseCase1>(requestBodyString);

Yes, duh.

Lesson: don’t treat the JSON message as an “object”. Treat it as a data container (dictionary) which could represent multiple objects. This also frees me to add arbitrary meta information to any message without affecting other parts of the system… naming conflicts notwithstanding.

09 April 2015

Circular Dictionary, where are you?

It is surprisingly hard to find a .NET implementation of a "circular dictionary".

(At least not in the 15 minutes that I looked for one)

In my definition, it's a dictionary for key-based lookups that won't exceed a set capacity. It does this by removing the oldest entries to make room for new ones. My use case for this is to keep a limited history of the last ??? IDs which have been issued. That way, if a client has a transient network failure, they can just retry with the same request ID, and I will provide the same answer back. I could even save the history periodically so it could be loaded on startup if the failure was server-side. However, the capacity limit is needed so that memory usage doesn't grow unbounded over time.

I originally looked at using an OrderedDictionary for this purpose, but its implementation is such that a Remove operation is O(n), because it copies all the elements of the array down when one is removed.

The way around that is to use a circular buffer instead of an array that is always strictly ordered. In other words, when the array fills up, it just loops back around to the beginning and starts replacing existing entries.

However, it took me a surprising amount of thought to come up with a solution. And after all that mind churn, the implementation isn't even complicated. You will have to forgive the very OO-centric F#, though.

namespace Xe.DataStructures

open System.Collections.Generic

type CircularDictionary<'key, 'value when 'key : equality>(capacity: int) =
    inherit Dictionary<'key, 'value>(capacity)

    let maxIndex = capacity - 1
    let buffer = Array.zeroCreate<'key> capacity
    let mutable isFull : bool = false
    let mutable next : int = 0

    member me.CircularAdd(key:'key, value:'value) =
        let put () =
            me.Add(key, value)
            buffer.[next] <- key
        
        let moveNext () =
            if next = maxIndex then
                isFull <- true
                next <- 0
            else
                next <- next + 1

        let clear () =
            me.Remove(buffer.[next])
            |> ignore

        if isFull then clear()
        put()
        moveNext()

The unit tests for this are stupid -- too long and boring to post. It's akin to calling CircularAdd, then verifying the count, verifying the last X items are still in the dictionary, and verifying that ones added before that are not.
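For a taste of what those checks amount to, here is a short sketch rather than the real tests. It restates a condensed copy of the type (same logic, fewer helper functions) only so the snippet runs by itself. The capacity of 3 and the "id-N" values are arbitrary.

```fsharp
open System.Collections.Generic

// Condensed copy of CircularDictionary from above
type CircularDictionary<'key, 'value when 'key : equality>(capacity: int) =
    inherit Dictionary<'key, 'value>(capacity)
    let buffer = Array.zeroCreate<'key> capacity
    let mutable isFull = false
    let mutable next = 0
    member me.CircularAdd(key: 'key, value: 'value) =
        // once full, evict whatever key occupies the slot about to be reused
        if isFull then me.Remove(buffer.[next]) |> ignore
        me.Add(key, value)
        buffer.[next] <- key
        if next = capacity - 1 then
            isFull <- true
            next <- 0
        else
            next <- next + 1

// Add more entries than the capacity allows
let history = CircularDictionary<int, string>(3)
for i in 1 .. 5 do
    history.CircularAdd(i, sprintf "id-%d" i)

// Only the newest 3 keys should remain
let survivors = [ 3; 4; 5 ] |> List.forall history.ContainsKey
let evicted = [ 1; 2 ] |> List.exists history.ContainsKey
printfn "count=%d survivors=%b evicted=%b" history.Count survivors evicted
// prints "count=3 survivors=true evicted=false"
```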

Obvious things: Not thread safe. Call CircularAdd after checking that the key doesn't already exist. Initialize with positive capacity. At least my use case obviates all these things, so I didn't bother with them. Feel free to use, but I'm not responsible for it breaking when you use it. :)