
Basic Sample

Note

To create a basic clustered Aeron service, we will do the following:

  • Define a clustered service. This is the service holding the business logic within the cluster, and it will run in a Clustered Service Container
  • Build up the configuration for a single node cluster
  • Configure a client to communicate with the cluster
  • Send a message to the cluster, and poll for responses

Converting the Replicated State Machine

The sample state machine from the Replicated State Machine can easily be adapted to run within an Aeron Clustered Service Container. The commands to the state machine are no longer messages. Instead, they are raw long and int values carried over the wire by Simple Binary Encoding flyweights. The SBE message definitions are on GitHub.
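The idea of commands as raw values read through a flyweight can be sketched without SBE. The block below is a hand-written stand-in, not generated SBE code. It uses only java.nio.ByteBuffer, assumes SBE's standard 8-byte message header (blockLength, templateId, schemaId, version, each a little-endian uint16), and uses a hypothetical AddCommand body of a long correlation followed by an int value. The class name AddCommandLayout and all IDs are made up for illustration; the real layout is defined by the SBE schema on GitHub.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Hand-written illustration of an SBE-style wire layout (NOT generated SBE code).
// Assumes the standard 8-byte SBE header and a hypothetical body of
// { long correlation; int value; }. Template and schema IDs are made up.
final class AddCommandLayout
{
    static final int HEADER_LENGTH = 8;    // blockLength, templateId, schemaId, version (uint16 each)
    static final int BLOCK_LENGTH = 8 + 4; // long correlation + int value
    static final int TEMPLATE_ID = 1;      // hypothetical
    static final int SCHEMA_ID = 1;        // hypothetical
    static final int VERSION = 0;          // hypothetical

    static ByteBuffer encode(final long correlation, final int value)
    {
        final ByteBuffer buffer = ByteBuffer.allocate(HEADER_LENGTH + BLOCK_LENGTH)
            .order(ByteOrder.LITTLE_ENDIAN); // SBE's default byte order

        // message header
        buffer.putShort(0, (short)BLOCK_LENGTH);
        buffer.putShort(2, (short)TEMPLATE_ID);
        buffer.putShort(4, (short)SCHEMA_ID);
        buffer.putShort(6, (short)VERSION);

        // message body: fields at fixed offsets after the header
        buffer.putLong(HEADER_LENGTH, correlation);
        buffer.putInt(HEADER_LENGTH + 8, value);
        return buffer;
    }

    // Read the template ID as an unsigned 16-bit value, as a header decoder would.
    static int templateId(final ByteBuffer buffer)
    {
        return buffer.getShort(2) & 0xFFFF;
    }

    static long correlation(final ByteBuffer buffer)
    {
        return buffer.getLong(HEADER_LENGTH);
    }

    static int value(final ByteBuffer buffer)
    {
        return buffer.getInt(HEADER_LENGTH + 8);
    }
}
```

Generated SBE flyweights work in the same spirit: they wrap a buffer and read or write each field at a fixed offset, so no intermediate message objects are allocated.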

In earlier editions of this page, the RSM cluster logic and some messaging logic were intertwined, which has proven to be somewhat painful with larger replicated state machines.

The updated cluster-hosted replicated state machine logic is now simply:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ReplicatedStateMachine
{
    private final Logger logger = LoggerFactory.getLogger(ReplicatedStateMachine.class);
    private int currentValue;

    public void add(final long correlation, final int addValue)
    {
        currentValue += addValue;
        logger.info("adding {}, value is now {}; correlation = {}", addValue, currentValue, correlation);
    }

    public void multiply(final long correlation, final int multiplyValue)
    {
        currentValue *= multiplyValue;
        logger.info("multiplying by {}, value is now {}; correlation = {}", multiplyValue, currentValue, correlation);
    }

    public void setCurrentValue(final long correlation, final int newValue)
    {
        currentValue = newValue;
        logger.info("setting value to {}; correlation = {}", newValue, correlation);
    }

    public void loadFromSnapshot(final int currentValue)
    {
        logger.info("reading snapshot with current value at {}", currentValue);
        //restore the state machine from the snapshotted value
        this.currentValue = currentValue;
    }

    public int getCurrentValue()
    {
        return currentValue;
    }
}

To connect the replicated state machine above to Aeron Cluster, we need to adapt the incoming messages, because all Aeron provides is a single callback in the ClusteredService that receives every client message:

@Override
public void onSessionMessage(
    ClientSession session,
    long timestamp,
    DirectBuffer buffer,
    int offset,
    int length,
    Header header)
{
    rsmAdapter.setSession(session);
    rsmAdapter.onFragment(buffer, offset, length, header);
}

Note the setSession(session) call above. It stores the client session that sent the message in the adapter. We will use this later to return the current value to that client.

The RsmAdapter, shown in the snippet below, adapts the SBE messages into calls on the state machine. The messages are encoded with Simple Binary Encoding, which generated the encoders and decoders for us.

@Override
public void onFragment(final DirectBuffer buffer, final int offset, final int length, final Header header)
{
    messageHeaderDecoder.wrap(buffer, offset);
    final int templateId = messageHeaderDecoder.templateId();
    switch (templateId)
    {
        case AddCommandDecoder.TEMPLATE_ID ->
        {
            addCommand.wrapAndApplyHeader(buffer, offset, messageHeaderDecoder);
            stateMachine.add(addCommand.correlation(), addCommand.value());
            emitCurrentValue(returnBuffer, addCommand.correlation());
        }
        case MultiplyCommandDecoder.TEMPLATE_ID ->
        {
            multiplyCommand.wrapAndApplyHeader(buffer, offset, messageHeaderDecoder);
            stateMachine.multiply(multiplyCommand.correlation(), multiplyCommand.value());
            emitCurrentValue(returnBuffer, multiplyCommand.correlation());
        }
        case SetCommandDecoder.TEMPLATE_ID ->
        {
            setCommand.wrapAndApplyHeader(buffer, offset, messageHeaderDecoder);
            stateMachine.setCurrentValue(setCommand.correlation(), setCommand.value());
            emitCurrentValue(returnBuffer, setCommand.correlation());
        }
        case SnapshotDecoder.TEMPLATE_ID ->
        {
            snapshotDecoder.wrapAndApplyHeader(buffer, offset, messageHeaderDecoder);
            stateMachine.loadFromSnapshot(snapshotDecoder.value());
        }
        default -> logger.error("Unknown message {}", templateId);
    }
}

Along with commands to manipulate the replicated state machine, we also need to be able to restore the cluster state to the correct value using snapshots. The ClusteredService provides the onTakeSnapshot method for writing a snapshot. Using the Snapshot SBE message (the same one the adapter decodes above), an implementation that writes the snapshot is:

@Override
public void onTakeSnapshot(final ExclusivePublication snapshotPublication)
{
    log.info("taking snapshot");
    final ExpandableDirectByteBuffer buffer = new ExpandableDirectByteBuffer(64);

    //encode the current state machine value as an SBE Snapshot message
    messageHeaderEncoder.wrap(buffer, 0);
    snapshotEncoder.wrapAndApplyHeader(buffer, 0, messageHeaderEncoder);
    snapshotEncoder.value(stateMachine.getCurrentValue());

    //offer SBE encoded buffer to the snapshot publication, retrying while it is back pressured
    final int length = MessageHeaderEncoder.ENCODED_LENGTH + snapshotEncoder.encodedLength();
    long result;
    do
    {
        result = snapshotPublication.offer(buffer, 0, length);
    }
    while (result == Publication.BACK_PRESSURED || result == Publication.ADMIN_ACTION);
}
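Whatever onTakeSnapshot writes must be fed back into the state machine when the snapshot is loaded; in particular, loadFromSnapshot has to assign the value it receives, not just log it. The following stdlib-only sketch models that round trip with java.nio.ByteBuffer. The Counter class, the layout, and the template ID are hypothetical stand-ins for ReplicatedStateMachine and the real SBE Snapshot message.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Hypothetical model of the snapshot round trip: write the state machine's
// single int into a buffer, then restore it into a fresh instance.
final class SnapshotRoundTrip
{
    static final int HEADER_LENGTH = 8;
    static final int SNAPSHOT_TEMPLATE_ID = 4; // hypothetical

    // Stand-in for ReplicatedStateMachine: all of its state is one int.
    static final class Counter
    {
        int currentValue;

        void loadFromSnapshot(final int value)
        {
            currentValue = value; // restore state from the snapshot value
        }
    }

    static ByteBuffer takeSnapshot(final Counter counter)
    {
        final ByteBuffer buffer = ByteBuffer.allocate(HEADER_LENGTH + 4)
            .order(ByteOrder.LITTLE_ENDIAN);
        buffer.putShort(0, (short)4);                    // blockLength: one int
        buffer.putShort(2, (short)SNAPSHOT_TEMPLATE_ID); // schemaId and version left at 0
        buffer.putInt(HEADER_LENGTH, counter.currentValue);
        return buffer;
    }

    static Counter restore(final ByteBuffer snapshot)
    {
        final Counter counter = new Counter();
        // dispatch on template ID, as the adapter does for the Snapshot message
        if ((snapshot.getShort(2) & 0xFFFF) == SNAPSHOT_TEMPLATE_ID)
        {
            counter.loadFromSnapshot(snapshot.getInt(HEADER_LENGTH));
        }
        return counter;
    }
}
```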

To read the snapshot back, the onStart method needs logic to consume the snapshot image. We reuse the adapter for this:

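@Override
public void onStart(final Cluster cluster, final Image snapshotImage)
{
    if (snapshotImage != null)
    {
        //the snapshot holds a single message, but one poll can return 0 if the
        //replayed data has not arrived yet, so poll until the fragment is read
        final IdleStrategy idleStrategy = cluster.idleStrategy();
        while (snapshotImage.poll(rsmAdapter, 1) == 0)
        {
            idleStrategy.idle();
        }
    }
}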

Now that we have a state machine that accepts commands and can read and write snapshots in the cluster, we need to return the current value to clients. For this sample, we include this in the RsmAdapter, but in more elaborate scenarios it is typically a dedicated object. Earlier, in the ClusteredService, we passed the client session to the adapter with setSession(session). We use that session to reply directly to the cluster client, as follows:

private void emitCurrentValue(final ExpandableDirectByteBuffer buffer, final long correlationId)
{
    //setting up simple binary encoding on the buffer passed in
    messageHeaderEncoder.wrap(buffer, 0);
    currentValue.wrapAndApplyHeader(buffer, 0, messageHeaderEncoder);

    //encode the correlation
    currentValue.correlation(correlationId);

    //encode the current value
    currentValue.value(stateMachine.getCurrentValue());

    //offer the encoded buffer to the client session
    session.offer(buffer, 0, MessageHeaderEncoder.ENCODED_LENGTH + currentValue.encodedLength());
}

Building the client

For this basic sample, the client sends a mixture of AddCommand and MultiplyCommand commands and receives the emitted CurrentValueEvent. To receive messages, the client implements EgressListener and passes it to the Aeron Cluster client. Much like with the cluster, we need to adapt the inbound SBE messages from the cluster:

@Override
public void onMessage(
    final long clusterSessionId,
    final long timestamp,
    final DirectBuffer buffer,
    final int offset,
    final int length,
    final Header header)
{
    messageHeaderDecoder.wrap(buffer, offset);
    final int templateId = messageHeaderDecoder.templateId();
    if (templateId == CurrentValueEventDecoder.TEMPLATE_ID)
    {
        currentValue.wrapAndApplyHeader(buffer, offset, messageHeaderDecoder);
        final long correlationId = currentValue.correlation();
        final long value = currentValue.value();

        //correlation ID is used to control state in this simple example
        if (correlationId == MESSAGES_TO_SEND)
        {
            allResultsReceived = true;
        }
        log.info("Current value is {}; correlation = {}", value, correlationId);
    }
    else
    {
        log.warn("unknown message {}", templateId);
    }
}
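The sample decides it is finished when it sees correlationId == MESSAGES_TO_SEND, which relies on the response to the last command arriving last. A hypothetical alternative, sketched below without any Aeron types, tracks every outstanding correlation ID and reports completion only once all of them have been answered. CorrelationTracker and its methods are illustrative names, not part of Aeron.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper: remembers which correlation IDs are still awaiting
// a CurrentValueEvent and reports completion once none remain.
final class CorrelationTracker
{
    private final Set<Long> outstanding = new HashSet<>();
    private long nextCorrelation = 1;

    // Allocate the ID to put in the next command and mark it as outstanding.
    long nextCorrelationId()
    {
        final long correlationId = nextCorrelation++;
        outstanding.add(correlationId);
        return correlationId;
    }

    // Call from the egress listener for each response received.
    void onResponse(final long correlationId)
    {
        outstanding.remove(correlationId);
    }

    boolean allResultsReceived()
    {
        return outstanding.isEmpty();
    }
}
```

Unlike a single equality check, this stays correct even if responses were observed out of order.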

The onMessage method is only called when the egress is polled using clusterClient.pollEgress(). In this example, it is called from within the offer helper while an offer is being retried:

private void offer(MutableDirectBuffer buffer, int offset, int length)
{
    while (clusterClient.offer(buffer, offset, length) < 0)
    {
        idleStrategy.idle(clusterClient.pollEgress());
    }
}
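The retry loop above can be modelled without Aeron to show its shape: a negative offer result means the message was not accepted (for example because of back pressure), so the client polls egress between attempts and only idles when polling found no work. In this sketch, OfferLoop is hypothetical, the LongSupplier stands in for clusterClient.offer(...), the IntSupplier stands in for clusterClient.pollEgress(), and Thread.onSpinWait() is a crude substitute for an Agrona IdleStrategy.

```java
import java.util.function.IntSupplier;
import java.util.function.LongSupplier;

// Hypothetical, Aeron-free model of the client's offer loop.
final class OfferLoop
{
    // Returns how many times egress was polled before the offer was accepted.
    static int offer(final LongSupplier tryOffer, final IntSupplier pollEgress)
    {
        int polls = 0;
        while (tryOffer.getAsLong() < 0) // negative result: not accepted yet
        {
            // keep consuming responses from the cluster while waiting
            final int workCount = pollEgress.getAsInt();
            polls++;
            if (workCount == 0)
            {
                Thread.onSpinWait(); // stand-in for idleStrategy.idle(0)
            }
        }
        return polls;
    }
}
```

Like the original loop, this sketch retries indefinitely; production code would typically also handle terminal results, such as a closed publication, instead of retrying them.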

You can also call clusterClient.pollEgress() from your application's duty cycle.

Running the Sample

Warning

With JDK 17+, you must run both the client and the cluster with the following JVM argument: --add-opens=java.base/sun.nio.ch=ALL-UNNAMED

Steps to run the samples:

  • in the aeron-cookbook-code root folder, run the Gradle build with ./gradlew
  • build the executable uber jars for the RSM cluster and RSM client using ./gradlew uberRsmCluster uberRsmClient
  • in one terminal window, change into the aeron-cookbook-code/cluster-rsm/build/libs folder
  • run the cluster with java --add-opens=java.base/sun.nio.ch=ALL-UNNAMED -jar cluster-rsm-uber-cluster.jar
  • in another terminal window, change into the same aeron-cookbook-code/cluster-rsm/build/libs folder
  • run the client with java --add-opens=java.base/sun.nio.ch=ALL-UNNAMED -jar cluster-rsm-uber-client.jar

Both applications can be exited using Ctrl+C.

First run of cluster

Cluster directory is /.../.../.../aeron-cluster-0/cluster
10:50:19.893 [clustered-service] Cluster Node is in role LEADER

Client connects

Sample client log:

10:53:44.929 [main] Starting
10:53:44.931 [main] Adding 1; correlation = 1
10:53:44.934 [main] Adding 2; correlation = 2
10:53:44.936 [main] Adding 3; correlation = 3
10:53:44.937 [main] Adding 4; correlation = 4
10:53:44.939 [main] Adding 5; correlation = 5
10:53:44.940 [main] Adding 6; correlation = 6
10:53:44.943 [main] Current value is 1; correlation = 1
10:53:44.943 [main] Current value is 3; correlation = 2
10:53:44.943 [main] Current value is 6; correlation = 3
10:53:44.943 [main] Current value is 10; correlation = 4
10:53:44.943 [main] Current value is 15; correlation = 5
10:53:44.944 [main] Current value is 21; correlation = 6
...
10:53:45.027 [main] Adding 96; correlation = 96
10:53:45.028 [main] Adding 97; correlation = 97
10:53:45.029 [main] Adding 98; correlation = 98
10:53:45.030 [main] Current value is 7725; correlation = 96
10:53:45.030 [main] Multiplying by 2; correlation = 99
10:53:45.031 [main] Adding 100; correlation = 100
10:53:45.031 [main] Current value is 7822; correlation = 97
10:53:45.031 [main] Current value is 7920; correlation = 98
10:53:45.031 [main] Current value is 15840; correlation = 99
10:53:45.038 [main] Current value is 15940; correlation = 100
10:53:45.038 [main] Done

Sample cluster log:

10:53:44.938 [clustered-service] adding 1, value is now 1; correlation = 1
10:53:44.939 [clustered-service] adding 2, value is now 3; correlation = 2
10:53:44.939 [clustered-service] adding 3, value is now 6; correlation = 3
10:53:44.939 [clustered-service] adding 4, value is now 10; correlation = 4
10:53:44.940 [clustered-service] adding 5, value is now 15; correlation = 5
10:53:44.941 [clustered-service] adding 6, value is now 21; correlation = 6
...
10:53:45.029 [clustered-service] adding 96, value is now 7725; correlation = 96
10:53:45.030 [clustered-service] adding 97, value is now 7822; correlation = 97
10:53:45.030 [clustered-service] adding 98, value is now 7920; correlation = 98
10:53:45.030 [clustered-service] multiplying by 2, value is now 15840; correlation = 99
10:53:45.035 [clustered-service] adding 100, value is now 15940; correlation = 100
10:53:50.038 [clustered-service] Cluster Client Session closed

Restarting the cluster

In the sample, the cluster has deleteDirOnStart set to false, which means that restarting the cluster replays the messages that were already sent, restoring the cluster state to the value reached in the first run:

10:58:03.682 [clustered-service] Cluster Client Session opened
10:58:03.685 [clustered-service] adding 1, value is now 1; correlation = 1
10:58:03.685 [clustered-service] adding 2, value is now 3; correlation = 2
...
10:58:03.685 [clustered-service] adding 98, value is now 7920; correlation = 98
10:58:03.685 [clustered-service] multiplying by 2, value is now 15840; correlation = 99
10:58:03.685 [clustered-service] adding 100, value is now 15940; correlation = 100
10:58:03.685 [clustered-service] Cluster Client Session closed
10:58:03.685 [clustered-service] Cluster Node is in role LEADER
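Replay only reproduces the first run's value because the state machine is deterministic: applying the same commands in the same order always yields the same result. The sketch below models the cluster log as a list of commands and replays it into a fresh value. ReplayDemo and its Command record are hypothetical stand-ins for the ReplicatedStateMachine and the recorded log; the code needs Java 16 or later for records.

```java
import java.util.List;

// Hypothetical model: the cluster log is an ordered list of commands, and
// replaying it from an initial value of 0 rebuilds the state machine's value.
final class ReplayDemo
{
    // op is '+' (add), '*' (multiply) or '=' (set)
    record Command(char op, int operand) {}

    static int replay(final List<Command> log)
    {
        int currentValue = 0;
        for (final Command command : log)
        {
            switch (command.op())
            {
                case '+' -> currentValue += command.operand();
                case '*' -> currentValue *= command.operand();
                case '=' -> currentValue = command.operand();
                default -> throw new IllegalArgumentException("unknown op " + command.op());
            }
        }
        return currentValue;
    }
}
```

For example, starting from 7920 (the value after correlation 98), multiplying by 2 and then adding 100 always gives 15940, as in the first run's log. Anything non-deterministic inside the state machine, such as reading the wall clock or a random number, would break this property.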

Then, when we connect the client, the existing value in the state machine is used:

11:03:43.150 [main] Starting
11:03:43.152 [main] Adding 1; correlation = 1
11:03:43.155 [main] Adding 2; correlation = 2
11:03:43.157 [main] Adding 3; correlation = 3
11:03:43.158 [main] Adding 4; correlation = 4
11:03:43.160 [main] Current value is 15941; correlation = 1
11:03:43.160 [main] Current value is 15943; correlation = 2
11:03:43.160 [main] Current value is 15946; correlation = 3
11:03:43.160 [main] Current value is 15950; correlation = 4
...
11:03:43.249 [main] Adding 96; correlation = 96
11:03:43.249 [main] Adding 97; correlation = 97
11:03:43.250 [main] Adding 98; correlation = 98
11:03:43.251 [main] Multiplying by 2; correlation = 99
11:03:43.252 [main] Current value is 71485; correlation = 96
11:03:43.252 [main] Current value is 71582; correlation = 97
11:03:43.252 [main] Adding 100; correlation = 100
11:03:43.252 [main] Current value is 71680; correlation = 98
11:03:43.254 [main] Current value is 143360; correlation = 99
11:03:43.254 [main] Current value is 143460; correlation = 100
11:03:43.254 [main] Done

The cluster logs then show:

11:03:43.139 [clustered-service] Cluster Client Session opened
11:03:43.157 [clustered-service] adding 1, value is now 15941; correlation = 1
11:03:43.158 [clustered-service] adding 2, value is now 15943; correlation = 2
11:03:43.158 [clustered-service] adding 3, value is now 15946; correlation = 3
11:03:43.159 [clustered-service] adding 4, value is now 15950; correlation = 4
...
11:03:43.250 [clustered-service] adding 96, value is now 71485; correlation = 96
11:03:43.250 [clustered-service] adding 97, value is now 71582; correlation = 97
11:03:43.251 [clustered-service] adding 98, value is now 71680; correlation = 98
11:03:43.253 [clustered-service] multiplying by 2, value is now 143360; correlation = 99
11:03:43.253 [clustered-service] adding 100, value is now 143460; correlation = 100
11:03:48.255 [clustered-service] Cluster Client Session closed

Using Cluster Tool to take snapshots

For information on cluster tool, see Operating Aeron Cluster.

Note

  • This assumes that you have downloaded the latest Aeron jar, and you have moved into the correct directory (as logged by the cluster process on start) holding cluster-mark.dat.

Request a snapshot with the cluster tool using the following command:

java -cp aeron-all-*.jar io.aeron.cluster.ClusterTool . snapshot
...
INFO: Mark file exists: ./cluster-mark.dat
Member [0]: SNAPSHOT applied successfully

Once executed, the cluster log will show:

11:04:08.710 [clustered-service] taking snapshot
11:04:08.711 [clustered-service] taking snapshot with current value at 143460

This writes the snapshot state. When restarted, the cluster loads the latest snapshot and only replays log entries recorded after it, rather than replaying the whole log. The cluster log then shows the snapshot load:

11:05:44.489 [clustered-service] loading snapshot
11:05:44.489 [clustered-service] reading snapshot with current value at 143460
11:05:44.512 [clustered-service] Cluster Node is in role LEADER
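Recovery from a snapshot gives the same answer as a full replay, provided the snapshot captures the complete state and only the commands recorded after it are applied. The hypothetical SnapshotRecovery sketch below models commands as plain additions to keep it short; it illustrates that equivalence, not how Aeron stores or replays the log.

```java
import java.util.List;

// Hypothetical model of recovery: start from the snapshotted value, then apply
// only the commands recorded after the snapshot was taken.
final class SnapshotRecovery
{
    // Each command is modelled as an add, to keep the sketch short.
    static int recover(final int snapshotValue, final List<Integer> addsAfterSnapshot)
    {
        int currentValue = snapshotValue;
        for (final int addValue : addsAfterSnapshot)
        {
            currentValue += addValue;
        }
        return currentValue;
    }

    // A full replay is just recovery from an empty (zero) state.
    static int replayAll(final List<Integer> allAdds)
    {
        return recover(0, allAdds);
    }
}
```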