Routing in the cluster

When a command is received by your code in Aeron Cluster, all it receives is:

public void onSessionMessage(ClientSession session, long timestamp,
                                DirectBuffer buffer, int offset, int length,
                                Header header)
{
    //do something
}

Inbound messages need to be adapted from the data in the buffer. Exactly how this is done depends on how you've implemented the commands. This sample uses Simple Binary Encoding (SBE), which stores a TEMPLATE_ID within the MessageHeader. This can be used to route messages efficiently.
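To make the role of the template ID concrete, here is a minimal sketch of reading it by hand. It assumes the standard SBE message header (four uint16 fields: blockLength, templateId, schemaId, version) and a little-endian schema. The class name `SbeHeaderSketch` and the use of `java.nio.ByteBuffer` instead of Agrona's `DirectBuffer` are illustrative choices only; in the sample, the generated `MessageHeaderDecoder` does this work.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Illustrative only: the SBE-generated MessageHeaderDecoder normally does this.
final class SbeHeaderSketch
{
    // Assumed standard SBE header: blockLength, templateId, schemaId, version (4 x uint16).
    static final int HEADER_LENGTH = 8;
    // templateId is the second field, so it starts 2 bytes into the header.
    static final int TEMPLATE_ID_OFFSET = 2;

    // Returns the template ID, or -1 if the message is too short to hold a header.
    static int templateId(final ByteBuffer buffer, final int offset, final int length)
    {
        if (length < HEADER_LENGTH)
        {
            return -1;
        }
        // duplicate() leaves the caller's byte order untouched; little-endian is an assumption.
        final ByteBuffer view = buffer.duplicate().order(ByteOrder.LITTLE_ENDIAN);
        // Mask to treat the 16-bit value as unsigned.
        return view.getShort(offset + TEMPLATE_ID_OFFSET) & 0xFFFF;
    }
}
```

Because the template ID sits at a fixed position in every message, routing costs one bounds check and one 16-bit read before any message body is decoded.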

Adapter

In the sample, the SbeAdapter reads the template ID from the MessageHeader in the buffer, and then routes the message to the correct method:

//in AppClusteredService.java
@Override
public void onSessionMessage(
    final ClientSession session,
    final long timestamp,
    final DirectBuffer buffer,
    final int offset,
    final int length,
    final Header header)
{
    context.setSessionContext(session, timestamp);
    sbeAdapter.dispatch(buffer, offset, length);
}

//in SbeAdapter.java
public void dispatch(final DirectBuffer buffer, final int offset, final int length)
{
    if (length < MessageHeaderDecoder.ENCODED_LENGTH)
    {
        LOGGER.error("Message too short, ignored.");
        return;
    }
    headerDecoder.wrap(buffer, offset);

    switch (headerDecoder.templateId())
    {
        case AddInstrumentDecoder.TEMPLATE_ID -> addInstrument(buffer, offset);
        case SetInstrumentEnabledFlagDecoder.TEMPLATE_ID -> setInstrumentEnabledFlag(buffer, offset);
        case InstrumentRecordEncoder.TEMPLATE_ID -> initializeInstrument(buffer, offset);
        case ListInstrumentsCommandDecoder.TEMPLATE_ID -> listInstruments(buffer, offset);
        case CreateRfqCommandDecoder.TEMPLATE_ID -> createRfq(buffer, offset);
        case CancelRfqCommandDecoder.TEMPLATE_ID -> cancelRfq(buffer, offset);
        case QuoteRfqCommandDecoder.TEMPLATE_ID -> quoteRfq(buffer, offset);
        case CounterRfqCommandDecoder.TEMPLATE_ID -> counterRfq(buffer, offset);
        case AcceptRfqCommandDecoder.TEMPLATE_ID -> acceptRfq(buffer, offset);
        case RejectRfqCommandDecoder.TEMPLATE_ID -> rejectRfq(buffer, offset);
        default -> LOGGER.error("Unknown message template {}, ignored.", headerDecoder.templateId());
    }
}
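The handler methods themselves are not shown above. As a rough sketch of the overall pattern, the following self-contained example dispatches on the template ID and then reads the message body that follows the header. Everything in it is hypothetical: the class name `DispatchSketch`, the template ID value `2`, and the body layout (an int32 instrument ID followed by a one-byte flag) are invented for illustration, and `java.nio.ByteBuffer` stands in for the SBE-generated decoders used in the sample. It again assumes the standard 8-byte, little-endian SBE header.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Hypothetical sketch of header-based dispatch; not the sample's SbeAdapter.
final class DispatchSketch
{
    static final int HEADER_LENGTH = 8;              // assumed standard SBE header size
    static final int SET_ENABLED_TEMPLATE_ID = 2;    // hypothetical template ID
    static final int SET_ENABLED_BODY_LENGTH = 5;    // hypothetical body: int32 + 1 byte

    static String dispatch(final ByteBuffer buffer, final int offset, final int length)
    {
        if (length < HEADER_LENGTH)
        {
            return "too short";
        }
        final ByteBuffer view = buffer.duplicate().order(ByteOrder.LITTLE_ENDIAN);
        final int templateId = view.getShort(offset + 2) & 0xFFFF;

        switch (templateId)
        {
            case SET_ENABLED_TEMPLATE_ID:
                if (length < HEADER_LENGTH + SET_ENABLED_BODY_LENGTH)
                {
                    return "too short";
                }
                // The body starts immediately after the header.
                return setEnabled(view, offset + HEADER_LENGTH);
            default:
                return "unknown template " + templateId;
        }
    }

    // Reads the hypothetical body fields at fixed offsets from the body start.
    private static String setEnabled(final ByteBuffer view, final int bodyOffset)
    {
        final int instrumentId = view.getInt(bodyOffset);
        final boolean enabled = view.get(bodyOffset + 4) != 0;
        return "instrument " + instrumentId + " enabled=" + enabled;
    }
}
```

The adapter stays a thin routing layer: it validates the length, reads the header once, and hands the body offset to a handler, so the business logic never needs to know about message framing.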