Teku Event Channels
By Adrian Sutton
Teku uses a really nice framework for separating different components - Event Channels. It’s based on similar patterns in the Sail library used at LMAX for sending network messages between services. In Teku though, it’s designed to work in-process while still decoupling the components in the system. Turns out I never wrote about it here, so I’m very belatedly catching up.
Event channels are defined by declaring a pretty standard interface:
```java
public interface SlotEventsChannel extends VoidReturningChannelInterface {
  void onSlot(UInt64 slot);
}
```
There are a few simple restrictions:
- It must extend from VoidReturningChannelInterface (or ChannelInterface, but we'll get to non-void returning cases later)
- All methods must return void
- Methods cannot throw any exceptions
There can be any number of methods on the same interface and any number of subscribers to the channel.
The implementing side simply implements the interface, and the calling side has an implementation of the interface injected and calls it as normal. So far, this isn't actually providing any real separation - it's just using a Java interface. You can pass the concrete implementation of the interface to the calling side and it will all work. The interface provides some decoupling between the caller and receiver, but they're still coupled temporally because the call is synchronous, and exceptions on the receiving side would propagate back up through the calling side. Both problems can be fixed to isolate the components fully, but then you have to do that at every call site.
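To make that coupling concrete, here is a minimal sketch of wiring a subscriber straight into a caller through a plain interface. SlotEvents, FlakySlotSubscriber and SlotTicker are hypothetical names for illustration, and a plain long stands in for Teku's UInt64 so the example compiles on its own. The caller only depends on the interface, yet it still runs the subscriber on its own thread and receives its exceptions.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical channel with the same shape as SlotEventsChannel
// (long instead of Teku's UInt64).
interface SlotEvents {
  void onSlot(long slot);
}

// Receiver that fails on odd slots.
class FlakySlotSubscriber implements SlotEvents {
  final List<Long> seen = new ArrayList<>();

  @Override
  public void onSlot(long slot) {
    seen.add(slot);
    if (slot % 2 == 1) {
      throw new IllegalStateException("subscriber failed on slot " + slot);
    }
  }
}

// Caller that only knows about the interface.
class SlotTicker {
  private final SlotEvents slotEvents;

  SlotTicker(SlotEvents slotEvents) {
    this.slotEvents = slotEvents;
  }

  // The call runs synchronously on the caller's thread, so the caller waits
  // for the subscriber and has to deal with its exceptions itself.
  boolean tick(long slot) {
    try {
      slotEvents.onSlot(slot);
      return true;
    } catch (RuntimeException e) {
      return false; // every call site would need a guard like this
    }
  }
}
```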
Instead, the event channel system uses reflection to generate an implementation of the interface that ensures complete isolation between caller and receiver. The generated implementation is passed to the caller, and it implements each method by passing the work to a thread pool, which then calls the actual implementation. It also provides error handling and records metrics to give visibility into the event system. While reflection is used to generate the implementation, most of the code is in abstract classes that the generated implementations extend, so it's easy to maintain. Importantly, the complexity of that reflection is abstracted away from the code using the framework - it's just like an interface where part of the API contract is that calls are always asynchronous and never throw any exceptions. The code for the framework is quite small, all in the infrastructure.events package.
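A rough sketch of the generated-implementation idea, using the JDK's java.lang.reflect.Proxy to build the implementation at runtime. This is an assumption for illustration, not Teku's actual code (which lives in infrastructure.events and may be structured quite differently); VoidChannel, AsyncChannels and the failedDeliveries counter are hypothetical. Each call is queued on an executor once per subscriber and returns immediately, and subscriber exceptions are counted rather than propagated.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical marker interface standing in for VoidReturningChannelInterface.
interface VoidChannel {}

// Same shape as SlotEventsChannel, using long instead of Teku's UInt64.
interface SlotEvents extends VoidChannel {
  void onSlot(long slot);
}

final class AsyncChannels {
  // Counts failed deliveries; a stand-in for the framework's real metrics.
  static final AtomicLong failedDeliveries = new AtomicLong();

  private AsyncChannels() {}

  // Generates an implementation of channelType whose methods queue one task per
  // subscriber on the executor and return immediately.
  static <T extends VoidChannel> T publisher(
      Class<T> channelType, List<T> subscribers, Executor executor) {
    InvocationHandler handler =
        (instance, method, args) -> {
          if (method.getDeclaringClass() == Object.class) {
            return objectMethod(instance, method, args, channelType);
          }
          for (T subscriber : subscribers) {
            executor.execute(
                () -> {
                  try {
                    method.invoke(subscriber, args);
                  } catch (Exception e) {
                    // Subscriber failures (wrapped in InvocationTargetException)
                    // are recorded here instead of reaching the caller.
                    failedDeliveries.incrementAndGet();
                  }
                });
          }
          return null; // channel methods are void
        };
    return channelType.cast(
        Proxy.newProxyInstance(
            channelType.getClassLoader(), new Class<?>[] {channelType}, handler));
  }

  // equals, hashCode and toString are answered directly on the caller's thread.
  private static Object objectMethod(
      Object instance, Method method, Object[] args, Class<?> type) {
    switch (method.getName()) {
      case "equals":
        return instance == args[0];
      case "hashCode":
        return System.identityHashCode(instance);
      default:
        return "AsyncChannel<" + type.getSimpleName() + ">";
    }
  }
}
```

The caller just holds a SlotEvents and calls onSlot(slot) on it like any other interface; the executor passed in decides how many threads deliver the events.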
Calls to the interface are added, in call order, to the queue that the thread pool takes work from. So if the thread pool has a single thread, the calls will all be processed in exactly the same order they were made. In most cases there are multiple threads in the thread pool, so processing happens in parallel (but starts in order), but for cases like the StorageUpdateChannel, where event order is important, a single thread is used.
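The ordering guarantee comes from the executor choice rather than from the channel itself. A minimal sketch, assuming a hypothetical StorageUpdates channel in place of StorageUpdateChannel and a hand-written QueuedStorageUpdates in place of the generated implementation: with one delivery thread, queued calls run strictly one after another in call order.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for an order-sensitive channel like StorageUpdateChannel.
interface StorageUpdates {
  void onUpdate(long version);
}

// Subscriber that flags any update arriving out of sequence.
class VersionedStore implements StorageUpdates {
  // volatile so other threads see the delivery thread's writes
  private volatile long expected = 0;
  private volatile boolean outOfOrder = false;

  @Override
  public void onUpdate(long version) {
    if (version != expected) {
      outOfOrder = true;
    }
    expected = version + 1;
  }

  long nextExpected() {
    return expected;
  }

  boolean sawOutOfOrder() {
    return outOfOrder;
  }
}

// Hand-written equivalent of a generated implementation: each call becomes a
// task on the channel's executor, submitted in call order.
final class QueuedStorageUpdates implements StorageUpdates {
  private final ExecutorService executor;
  private final StorageUpdates subscriber;

  QueuedStorageUpdates(ExecutorService executor, StorageUpdates subscriber) {
    this.executor = executor;
    this.subscriber = subscriber;
  }

  @Override
  public void onUpdate(long version) {
    executor.execute(() -> subscriber.onUpdate(version));
  }
}

final class ChannelExecutors {
  private ChannelExecutors() {}

  // Event order matters: exactly one thread, so tasks run one at a time in order.
  static ExecutorService ordered() {
    return Executors.newSingleThreadExecutor();
  }

  // Order doesn't matter: tasks are handed out in call order, but run
  // concurrently and may finish in any order.
  static ExecutorService parallel(int threads) {
    return Executors.newFixedThreadPool(threads);
  }
}
```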
The VoidReturningChannelInterface is an ideal case for maximum decoupling of components - the sender is just notifying when events happen and forgetting about them.
But often we need to request data from another component or be able to handle failures. The storage system in Teku, for example, is a decoupled component. In that case we use an interface that just extends ChannelInterface. Then methods are allowed to return SafeFuture - the promise type used in Teku. Exceptions are still not allowed, but the returned SafeFuture can be used to return error information as part of the result. The same implementation applies: reflection is used to generate an implementation that calls the real implementation via a thread pool, but now when the real implementation completes, the result is used to complete the originally returned SafeFuture. For example:
```java
public interface Eth1DepositStorageChannel extends ChannelInterface {
  SafeFuture<ReplayDepositsResult> replayDepositEvents();

  SafeFuture<Boolean> removeDepositEvents();
}
```
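For value-returning channels, the generated method can hand back a future immediately and complete it once the real implementation's future completes. The sketch below assumes the JDK's CompletableFuture as a stand-in for SafeFuture and uses java.lang.reflect.Proxy for the generated implementation; DepositStorage and FutureChannels are hypothetical names, not Teku's code. An exception thrown by the implementation is turned into a failed future, so the caller still never sees a thrown exception.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

// Hypothetical channel in the style of Eth1DepositStorageChannel, using the
// JDK's CompletableFuture as a stand-in for Teku's SafeFuture.
interface DepositStorage {
  CompletableFuture<Boolean> removeDepositEvents();
}

final class FutureChannels {
  private FutureChannels() {}

  // Generates an implementation whose methods return a future right away, run
  // the real implementation on the executor, and complete the returned future
  // with whatever the real implementation's future produces.
  static <T> T create(Class<T> channelType, T target, Executor executor) {
    Object instance =
        Proxy.newProxyInstance(
            channelType.getClassLoader(),
            new Class<?>[] {channelType},
            (proxy, method, args) -> {
              if (method.getDeclaringClass() == Object.class) {
                // equals/hashCode/toString: delegated to the target for brevity.
                return method.invoke(target, args);
              }
              CompletableFuture<Object> result = new CompletableFuture<>();
              executor.execute(
                  () -> {
                    try {
                      CompletableFuture<?> inner =
                          (CompletableFuture<?>) method.invoke(target, args);
                      inner.whenComplete(
                          (value, error) -> {
                            if (error != null) {
                              result.completeExceptionally(error);
                            } else {
                              result.complete(value);
                            }
                          });
                    } catch (InvocationTargetException e) {
                      // The implementation threw instead of returning a failed future.
                      result.completeExceptionally(e.getCause());
                    } catch (Exception e) {
                      // e.g. a null or wrongly typed return value
                      result.completeExceptionally(e);
                    }
                  });
              return result;
            });
    return channelType.cast(instance);
  }
}
```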
Note that the actual implementation still provides a method that returns SafeFuture, which allows it to use an asynchronous implementation when suitable. It can also just use SafeFuture.completedFuture(value) to easily return a value synchronously. For these channels, the event system only allows a single subscriber to the topic, so it knows where the result value should come from. Since publishers and subscribers are created at startup, adding multiple subscribers means Teku fails to start and a lot of tests fail, so it won't go unnoticed.
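The single-subscriber rule can be enforced with a check at registration time. A minimal sketch, assuming a hypothetical ResponderRegistry and DepositQueries channel (with CompletableFuture standing in for SafeFuture): registering a second subscriber for the same channel throws, and because wiring happens at startup, that exception stops the process from starting.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical value-returning channel (CompletableFuture stands in for SafeFuture).
interface DepositQueries {
  CompletableFuture<Boolean> removeDepositEvents();
}

// Hypothetical registry that allows exactly one subscriber per value-returning
// channel, so the source of each result is unambiguous.
final class ResponderRegistry {
  private final Map<Class<?>, Object> responders = new HashMap<>();

  <T> void subscribe(Class<T> channelType, T responder) {
    Object existing = responders.putIfAbsent(channelType, responder);
    if (existing != null) {
      // Wiring happens at startup, so this failure stops the node from starting.
      throw new IllegalStateException(
          channelType.getSimpleName() + " already has a subscriber");
    }
  }

  <T> T responderFor(Class<T> channelType) {
    Object responder = responders.get(channelType);
    if (responder == null) {
      throw new IllegalStateException("No subscriber for " + channelType.getSimpleName());
    }
    return channelType.cast(responder);
  }
}
```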
There’s a bunch of nice things about this framework:
- EventChannels have “click-throughability”. You can easily jump from a call to the interface to the actual implementation (or see all implementations) using the go to implementation functionality of an IDE. The details of how the decoupling is implemented are all abstracted away.
- The ability to return a value asynchronously is much easier to reason about than having to send responses via a separate event. The request/response is clearly coupled together in the interface rather than piecing together two independent events.
- For testing, the channel interface can be easily mocked, a synchronous event channel passed or a custom stub provided.
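For the testing point, a custom stub is often the simplest option: it implements the channel interface directly and runs on the test's own thread, so assertions can follow the call with no waiting. RecordingSlotEvents and SlotAdvancer are hypothetical names, and long replaces UInt64 to keep the sketch self-contained.

```java
import java.util.ArrayList;
import java.util.List;

// Same shape as SlotEventsChannel, with long in place of Teku's UInt64.
interface SlotEvents {
  void onSlot(long slot);
}

// Hand-written stub: records calls synchronously on the calling thread.
class RecordingSlotEvents implements SlotEvents {
  final List<Long> slots = new ArrayList<>();

  @Override
  public void onSlot(long slot) {
    slots.add(slot);
  }
}

// Hypothetical component under test that publishes to the channel.
class SlotAdvancer {
  private final SlotEvents slotEvents;
  private long nextSlot;

  SlotAdvancer(SlotEvents slotEvents, long firstSlot) {
    this.slotEvents = slotEvents;
    this.nextSlot = firstSlot;
  }

  void advance(int slots) {
    for (int i = 0; i < slots; i++) {
      slotEvents.onSlot(nextSlot++);
    }
  }
}
```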
One particularly neat trick in Teku is that the validator client can run either within the Teku beacon node process or as a separate process. It's the event channel system that makes that work. The validator client was originally built in-process, but as its own component, so all calls to or from it were completely asynchronous and decoupled through the event channel interfaces. To make it run as an external process, we simply wrote implementations of the channels it called that worked by sending HTTP requests to the beacon node API rather than using the in-process generated ones. The calls into the validator client were all timing events like the SlotEventsChannel above. For most of those, we simply wrote a new publisher that ran on an independent timer inside the validator client. The few that actually depended on the state of the beacon node were produced by subscribing to the beacon node API event stream and publishing events based on that.
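An independent timer like that only needs the chain's genesis time and slot duration to work out the current slot. This is a minimal sketch under those assumptions, not Teku's validator client code: LocalSlotPublisher is hypothetical, the clock is injected so it can be tested, and secondsPerSlot would be 12 on Ethereum mainnet. If a poll is late and several slots have passed, it publishes only the latest slot.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

// Same shape as SlotEventsChannel, with long in place of Teku's UInt64.
interface SlotEvents {
  void onSlot(long slot);
}

// Hypothetical publisher running inside an external validator client that
// derives slots from wall-clock time instead of receiving them from the beacon node.
final class LocalSlotPublisher {
  private final long genesisTimeSeconds;
  private final long secondsPerSlot;
  private final LongSupplier clockSeconds; // injected so tests can control time
  private final SlotEvents slotEvents;
  private long lastPublishedSlot = -1;

  LocalSlotPublisher(
      long genesisTimeSeconds,
      long secondsPerSlot,
      LongSupplier clockSeconds,
      SlotEvents slotEvents) {
    this.genesisTimeSeconds = genesisTimeSeconds;
    this.secondsPerSlot = secondsPerSlot;
    this.clockSeconds = clockSeconds;
    this.slotEvents = slotEvents;
  }

  // Slot containing the given time, or -1 before genesis.
  static long slotAt(long nowSeconds, long genesisTimeSeconds, long secondsPerSlot) {
    if (nowSeconds < genesisTimeSeconds) {
      return -1;
    }
    return (nowSeconds - genesisTimeSeconds) / secondsPerSlot;
  }

  // Publishes onSlot when a new slot has started; if several slots were
  // missed, only the latest one is published.
  synchronized void poll() {
    long slot = slotAt(clockSeconds.getAsLong(), genesisTimeSeconds, secondsPerSlot);
    if (slot > lastPublishedSlot) {
      lastPublishedSlot = slot;
      slotEvents.onSlot(slot);
    }
  }

  // Polls once a second on a dedicated timer thread; the caller owns shutdown.
  ScheduledExecutorService start() {
    ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
    timer.scheduleAtFixedRate(this::poll, 0, 1, TimeUnit.SECONDS);
    return timer;
  }
}
```

In a running client the clock could simply be `() -> System.currentTimeMillis() / 1000`.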
The main downside is that the asynchronicity of the call isn't visible in the calling code (only in the reflection-generated implementation). That's why, by convention in Teku, channel interfaces and the variables holding them are always suffixed with Channel, so it is clear that asynchronicity is part of the API contract.
It isn't immediately obvious to people new to the codebase, but it's quick to learn and easy to remember, so I don't recall it ever causing any problems in practice.
Ultimately event channels are a pretty simple system that provides a lot of power and flexibility.