A production gen_event application

December 21, 2016

I've been blogging about Erlang since June, so it's about time I shipped something real. My first production application records email sends and opens, exposes the metrics via Prometheus, and uses event sourcing, which I discuss in detail below.

A letter slot on an old, banged up door.
© 2016 Clem Onojeghuo for Unsplash

My first production Erlang application.

This month, I went live with a small application that records emails sent and opened, sits behind basic auth, is proxied by nginx, and exposes metrics to Prometheus. Not surprisingly, it combines a bunch of the techniques I've blogged about here.

The application has the following modules:

  1. metrics.erl: the application behavior (sketched just after this list)
    • starts a gen_event server and registers listeners
    • starts prometheus
    • starts a supervisor
  2. metrics_sup.erl: the supervisor behavior
    • loads credentials from disk
    • starts the Elli web server
  3. rest_handler.erl: the elli_handler behavior
    • exposes a RESTful api
    • notifies gen_event server of events
  4. email_event.erl: a gen_event behavior
    • accumulates statistics
    • exposes stats to Prometheus
  5. log_event.erl: a gen_event behavior
    • logs each event to disk
    • on init, “replays” each event to gen_event server
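
Here is a rough sketch of how metrics.erl might wire this together at startup. Only the event_dispatcher name and the module names come from this post; the log file name, handler arguments, and the prometheus start call are assumptions:

    % A minimal sketch, not the real module. The handler order matters:
    % email_event is added before log_event, because log_event replays
    % the on-disk events from its init/1 and email_event must already
    % be registered to receive them.
    start(_Type, _Args) ->
        {ok, _} = gen_event:start_link({local, event_dispatcher}),
        ok = gen_event:add_handler(event_dispatcher, email_event, []),
        ok = gen_event:add_handler(event_dispatcher, log_event, ["events.log"]),
        {ok, _} = application:ensure_all_started(prometheus),
        metrics_sup:start_link().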

Event sourcing in 65 lines.

The most interesting part of this application is the event sourcing piece. It was quite simple to write in Erlang. I’ll step through the code below and then give some specific examples of how event sourcing was helpful.

The RESTful API fires an event for each valid PUT, and the log_event.erl gen_event behavior writes every event to disk. When the server restarts, log_event.erl reads the event log from disk and replays the events through the gen_event server. This is an example of event sourcing.
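
For a flavor of the producing side, the notification in rest_handler.erl might look something like this. The event attributes are made up; only the event_dispatcher name appears in the real code below:

    % Hypothetical sketch: on a valid PUT, fire an event at the manager.
    % gen_event:notify/2 is asynchronous, so the HTTP request is not
    % blocked while the handlers run.
    notify_open(MessageId) ->
        Event = [{type, email_opened}, {message_id, MessageId}],
        ok = gen_event:notify(event_dispatcher, Event).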

Reading and writing events.

Erlang makes reading and writing events particularly easy, as the io:read/2 function can read an Erlang term directly from a file handle. Writing an event is also simple; note the period printed at the end of the term (after the ~p), which is what io:read/2 uses to find the end of each term!


    % Write event to event log.
    write_event(F, Event, Id) ->
        Msg = [{id, Id}, {event, Event}],
        ok = io:format(F, "~p.~n", [Msg]),
        ok.
    

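With that format, each event lands in the log as one parseable term per entry. A logged entry looks something like this (the event attributes are made up for illustration):

    [{id,1},{event,[{type,email_opened},{message_id,<<"42">>}]}].
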
And here is the code to read events from the log file:


    % Read event from event log.
    read_event(F) ->
        case io:read(F, '') of
          {ok, Term} ->
              Id = proplists:get_value(id, Term),
              Event = proplists:get_value(event, Term),
              {ok, Id, Event};
          eof -> eof
        end.
    

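A quick shell session shows the round trip. io:read/2 ignores its prompt argument when reading from a file and returns eof at the end of the log (the file name and event contents are again made up):

    1> {ok, F} = file:open("events.log", [read, binary]).
    {ok,<0.88.0>}
    2> io:read(F, '').
    {ok,[{id,1},{event,[{type,email_opened},{message_id,<<"42">>}]}]}
    3> io:read(F, '').
    eof
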
Handling any event

In Erlang, events are arbitrary terms, so the above code will read and write any event. Also, since the gen_event server processes events from its mailbox one by one, we can maintain an event id in the handler state and write it to disk with each event:


    handle_event([replay | _], State) -> {ok, State};
    % Write event to disk with next id.
    handle_event(Event, State) ->
        Id = proplists:get_value(id, State) + 1,
        F = proplists:get_value(file, State),
        write_event(F, Event, Id),
        {ok, [{file, F}, {id, Id}]}.
    

Replaying Events

If your eye was sharp, you might have wondered about the first clause above, handle_event([replay | _], State) -> {ok, State}. This event listener has the added responsibility of replaying the events on disk when the listener is initialized, and we don’t want to log the replayed events to disk again!

The replay logic is:

  1. open the event log file
  2. for each event in the file,
  3. notify the gen_event server of the event with a “replay” prefix

We use the replay prefix to allow the event handlers to distinguish between the original event and the replayed event, as the log_event.erl handler does above.
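
The metrics handler does the opposite: a replayed event should update the statistics exactly as a live one would. A sketch of what that clause in email_event.erl might look like (update_stats/2 is an assumed helper, not code from the real module):

    % Hypothetical sketch: strip the replay prefix and fall through, so
    % replayed and live events update the read model identically.
    handle_event([replay | Event], State) ->
        handle_event(Event, State);
    handle_event(Event, State) ->
        {ok, update_stats(Event, State)}.

And here is the replay itself, plus the init/1 that drives it: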


    % Replay all events in log.  Returns id of the last event logged.
    replay_events(File, LastId) ->
        case read_event(File) of
          {ok, Id, Event} ->
              ok = gen_event:notify(event_dispatcher,
                                    [replay | Event]),
              replay_events(File, Id);
          eof -> LastId
        end.
    
    init(Args) ->
        [LogFile | _] = Args,
        case file:open(LogFile, [read, binary]) of
          {ok, F} ->
              LastId = replay_events(F, 0), ok = file:close(F);
          {error, enoent} -> LastId = 0
        end,
        io:format("using event log ~p~n", [LogFile]),
        {ok, F1} = file:open(LogFile, [append, binary]),
        {ok, [{file, F1}, {id, LastId}]}.
    

The rest of the code is scaffolding: exports and empty behavior callbacks to keep the compiler happy. The full module source is included at the end of this blog post.

How was event sourcing helpful?

The biggest win: changing metric units with no data loss.

Just awesome: change the duration unit from _minutes to _hours … restart the app … and bingo, all the metrics have changed!

Replaying the events feeds all of them back through the (new) metric logic and so it recalculates the metrics using the new units. That would work for any new metric you want to add as well.
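
For example, if the read model folds each event into a gauge, the unit change is a one-line edit, and a restart replays the whole log through the new conversion. A hypothetical sketch, assuming the prometheus.erl client and made-up metric and attribute names:

    % Hypothetical sketch: the only change is the conversion applied as
    % each event is folded into the read model.
    record_delay(Event) ->
        Minutes = proplists:get_value(delay_minutes, Event, 0),
        %% was: prometheus_gauge:set(email_open_delay_minutes, Minutes)
        prometheus_gauge:set(email_open_delay_hours, Minutes / 60).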

One caveat with Prometheus: since it only collects the current state, rebuilding a metric means deleting the Prometheus data, so you lose the time series history in Prometheus.

Using a text event log was handy.

Initially, I had a bug in the event term I was creating. After fixing that, I was able to edit the event log and correct the bad entries. So, no data loss.

Debugging (and testing) was made simpler.

To debug, copy the production event log locally and start the app. You then have exactly the same data that is in production, which makes things much easier to debug.

Testing was likewise simplified: create a log file that sets up the test condition you want, then start the app pointing at that log file.
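
Since log_event exports write_event/3, a test fixture can be built with the same serialization code production uses. A sketch, with made-up event contents:

    % Hypothetical test helper: write a known event log, then start the
    % app pointing at Path; replay rebuilds the read models from it.
    setup_test_log(Path) ->
        {ok, F} = file:open(Path, [write]),
        ok = log_event:write_event(F, [{type, email_sent}], 1),
        ok = log_event:write_event(F, [{type, email_opened}], 2),
        ok = file:close(F).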

Nice development environment.

It was very nice to be able to edit the log file, restart the server and have the read models rebuilt so you could check their results. It made it simple to test new code against real data.

log_event.erl


    % Maintain event ID sequence and serialize events to disk.
    % Replay events on a restart.
    -module(log_event).
    
    -behaviour(gen_event).
    
    -export([code_change/3, handle_call/2, handle_event/2,
             handle_info/2, init/1, terminate/2]).
    
    -export([read_event/1, write_event/3]).
    
    % Write event to event log.
    write_event(F, Event, Id) ->
        Msg = [{id, Id}, {event, Event}],
        ok = io:format(F, "~p.~n", [Msg]),
        ok.
    
    % Read event from event log.
    read_event(F) ->
        case io:read(F, '') of
          {ok, Term} ->
              Id = proplists:get_value(id, Term),
              Event = proplists:get_value(event, Term),
              {ok, Id, Event};
          eof -> eof
        end.
    
    % Replay all events in log.  Returns id of the last event logged.
    replay_events(File, LastId) ->
        case read_event(File) of
          {ok, Id, Event} ->
              ok = gen_event:notify(event_dispatcher,
                                    [replay | Event]),
              replay_events(File, Id);
          eof -> LastId
        end.
    
    init(Args) ->
        [LogFile | _] = Args,
        case file:open(LogFile, [read, binary]) of
          {ok, F} ->
              LastId = replay_events(F, 0), ok = file:close(F);
          {error, enoent} -> LastId = 0
        end,
        io:format("using event log ~p~n", [LogFile]),
        {ok, F1} = file:open(LogFile, [append, binary]),
        {ok, [{file, F1}, {id, LastId}]}.
    
    handle_event([replay | _], State) -> {ok, State};
    % Write event to disk with next id.
    handle_event(Event, State) ->
        Id = proplists:get_value(id, State) + 1,
        F = proplists:get_value(file, State),
        write_event(F, Event, Id),
        {ok, [{file, F}, {id, Id}]}.
    
    handle_call(_Request, State) -> {ok, noreply, State}.
    
    code_change(_OldVsn, State, _Extra) -> {ok, State}.
    
    handle_info(_Info, State) -> {noreply, State}.
    
    terminate(_Args, State) ->
        F = proplists:get_value(file, State), file:close(F), ok.
    

Closing Thoughts

Memory Performance

I didn’t measure performance at all because my application is such low volume: a weekly email sent to ten people. But if events start arriving faster than they can be written to disk, the mailbox of the gen_event process running log_event will grow and grow, and eventually the Erlang process will crash.

And since log_event is a gen_event handler, I think a crash would simply remove the handler, and events would silently stop being logged.

When that got close, I would probably just surface a queue length metric (or, more simply, a RAM usage metric) to Prometheus and set up an alert on some threshold.
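
Exposing the queue length is nearly a one-liner with process_info/2. A sketch, assuming the prometheus.erl client and a gauge registered as event_queue_len:

    % Hypothetical sketch: report the gen_event manager's mailbox length
    % so an alert can fire before memory becomes a problem.
    report_queue_len() ->
        Pid = whereis(event_dispatcher),
        {message_queue_len, Len} =
            erlang:process_info(Pid, message_queue_len),
        prometheus_gauge:set(event_queue_len, Len).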

I would also read Queues Don’t Fix Overload.

Currently, the beam process has an RSS of 14.6 MB and Prometheus uses 29.3 MB. This is on a cheap RAM Host server with 255 MB of RAM, so I’ll need to keep an eye on that.

Startup Performance

As the event log gets huge, startup time (which includes replaying the entire log) will get slow. I’m not holding my breath on that one, as in my experience file-based stuff is surprisingly fast.

But if it does become a problem, I’ll look at adding a snapshot. See What is snapshotting? in the CQRS FAQ.

Make your events rich with attributes

Who knows what future read model you may want to build? If you raise lots of events, and those events are rich in attributes, then you can add a read model at a later date, replay the event log, and have a new metric that spans the entire history of your system.

Next time I build this, I will add events for key system occurrences, even if I have no read models at the time. The ability to add a read model at a later date and have it reflect your entire system’s history is a sweet feature of event sourcing.

Tags: erlang