Basic Sample

To create a basic clustered Aeron service, we will do the following:

  • Define a clustered service. This is the service holding the business logic within the cluster, and it will run in a Clustered Service Container
  • Build up the configuration for a single node cluster
  • Configure a client to communicate with the cluster
  • Send a message to the cluster, and poll for responses

Converting the Replicated State Machine

The sample state machine introduced in Replicated State Machine can be adapted to run within an Aeron Clustered Service Container. The commands are now generated Eider flyweights, which are stored in the rsm.gen package. In the context of the sample, they behave exactly as before, but now operate directly on the DirectBuffer that the cluster delivers to the service.

The other major change is how the result is returned: we need to encode a message in a buffer so that it can be offered back to the cluster client session that sent the message. This is handled in the prepareCurrentValueEvent method. Note that the state machine also supports taking a snapshot (takeSnapshot) and loading from a snapshot (loadFromSnapshot).

The key parts of the state machine logic are shown below.

public class ReplicatedStateMachine
{
    final Snapshot snapshot = new Snapshot();
    final CurrentValueEvent currentValueEvent = new CurrentValueEvent();
    private int currentValue;

    public void add(AddCommand addCommand,
                    ExpandableDirectByteBuffer returnBuffer)
    {
        currentValue += addCommand.readValue();
        prepareCurrentValueEvent(returnBuffer);
    }

    public void multiply(MultiplyCommand multiplyCommand,
                            ExpandableDirectByteBuffer returnBuffer)
    {
        currentValue *= multiplyCommand.readValue();
        prepareCurrentValueEvent(returnBuffer);
    }

    public void setCurrentValue(SetCommand setCommand,
                                ExpandableDirectByteBuffer returnBuffer)
    {
        currentValue = setCommand.readValue();
        prepareCurrentValueEvent(returnBuffer);
    }

    public void takeSnapshot(ExpandableDirectByteBuffer buffer)
    {
        snapshot.setUnderlyingBuffer(buffer, 0);
        snapshot.writeHeader();
        snapshot.writeValue(currentValue);
    }

    public void loadFromSnapshot(Snapshot snapshot)
    {
        currentValue = snapshot.readValue();
    }

    private void prepareCurrentValueEvent(ExpandableDirectByteBuffer
                                            returnBuffer)
    {
        currentValueEvent.setUnderlyingBuffer(returnBuffer, 0);
        currentValueEvent.writeHeader();
        currentValueEvent.writeValue(currentValue);
    }
}
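
The flyweight idea behind setUnderlyingBuffer, writeHeader and writeValue can be illustrated without the Aeron, Agrona or Eider dependencies. The following is a minimal sketch under stated assumptions: it uses java.nio.ByteBuffer in place of ExpandableDirectByteBuffer, and the class name CurrentValueFlyweightSketch, the id value 42 and the layout (a 2-byte id followed by a 4-byte value) are hypothetical, not the actual Eider generated code or wire format.

```java
import java.nio.ByteBuffer;

/** Hypothetical stand-in for a generated flyweight; not the real Eider layout. */
final class CurrentValueFlyweightSketch
{
    static final short TYPE_ID = 42;                 // assumed id, for illustration only
    static final int VALUE_OFFSET = Short.BYTES;     // the value follows the 2-byte id
    static final int BUFFER_LENGTH = Short.BYTES + Integer.BYTES;

    private ByteBuffer buffer;
    private int offset;

    // Point the flyweight at existing memory; nothing is copied or allocated.
    void setUnderlyingBuffer(final ByteBuffer buffer, final int offset)
    {
        this.buffer = buffer;
        this.offset = offset;
    }

    // Write the message type id so a reader can identify the message.
    void writeHeader()
    {
        buffer.putShort(offset, TYPE_ID);
    }

    void writeValue(final int value)
    {
        buffer.putInt(offset + VALUE_OFFSET, value);
    }

    short readTypeId()
    {
        return buffer.getShort(offset);
    }

    int readValue()
    {
        return buffer.getInt(offset + VALUE_OFFSET);
    }
}
```

Because the flyweight only wraps a buffer and an offset, a single instance can be reused for every message, which is why the state machine above keeps one Snapshot and one CurrentValueEvent as fields.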

Aeron Cluster delivers incoming client messages to the ClusteredService through a single callback:

@Override
public void onSessionMessage(ClientSession session, long timestamp,
                                DirectBuffer buffer, int offset,
                                int length, Header header)
{
    demuxer.setSession(session);
    demuxer.onFragment(buffer, offset, length, header);
}

Another key method within the ClusteredService is onTakeSnapshot, which is required for snapshot support. Using the takeSnapshot method above, the snapshot can be written as follows:

@Override
public void onTakeSnapshot(ExclusivePublication snapshotPublication)
{
    final ExpandableDirectByteBuffer buffer = new ExpandableDirectByteBuffer(Snapshot.BUFFER_LENGTH);

    //get the current value from the state machine
    stateMachine.takeSnapshot(buffer);

    //offer the snapshot to the publication
    snapshotPublication.offer(buffer, 0, Snapshot.BUFFER_LENGTH);
}
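
An Aeron publication's offer returns a negative value when the message was not accepted (for example, under back pressure), so a single unchecked offer like the one above can fail without the snapshot being written. A minimal sketch of a bounded retry loop follows. To keep it runnable without the Aeron dependency, the offer call is represented by a LongSupplier; OfferRetrySketch, offerWithRetry and maxAttempts are hypothetical names, and Thread.yield() stands in for an Aeron idle strategy.

```java
import java.util.function.LongSupplier;

/** Hypothetical helper: retries an offer-style call until it is accepted. */
final class OfferRetrySketch
{
    /**
     * Invokes the offer up to maxAttempts times.
     * Returns the first non-negative result, or the last negative
     * result if every attempt was rejected.
     */
    static long offerWithRetry(final LongSupplier offer, final int maxAttempts)
    {
        if (maxAttempts < 1)
        {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }

        long result = Long.MIN_VALUE;
        for (int attempt = 0; attempt < maxAttempts; attempt++)
        {
            result = offer.getAsLong();
            if (result >= 0)
            {
                return result; // accepted
            }
            Thread.yield(); // stand-in for an idle strategy between attempts
        }
        return result; // still rejected; the caller decides how to fail
    }
}
```

In the snapshot method, the supplier would wrap the snapshotPublication.offer call, and a negative final result would typically be treated as a failed snapshot rather than ignored.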

To load the snapshot, the onStart method needs logic that reads the snapshot image using the demuxer:

@Override
public void onStart(Cluster cluster, Image snapshotImage)
{
    if (snapshotImage != null)
    {
        snapshotImage.poll(demuxer, 1);
    }
}

To route messages from the input buffer, we decode them via a demuxer. This can be implemented as a FragmentHandler, as shown below. Because we are using Eider generated objects, the message type can be read using the EiderHelper object. With the message type known, a switch decodes the message and invokes the corresponding state machine method. In addition to the standard commands (AddCommand, MultiplyCommand, SetCommand), there is also a switch case for Snapshot, which is reached while the snapshot is being loaded in onStart above.

private ExpandableDirectByteBuffer returnBuffer
            = new ExpandableDirectByteBuffer(CurrentValueEvent.BUFFER_LENGTH);
...
@Override
public void onFragment(DirectBuffer buffer, int offset,
                        int length, Header header)
{

    short eiderId = EiderHelper.getEiderId(buffer, offset);

    switch (eiderId)
    {
        case AddCommand.EIDER_ID:
            addCommand.setUnderlyingBuffer(buffer, offset);
            stateMachine.add(addCommand, returnBuffer);
            emitCurrentValue(returnBuffer);
            break;
        case MultiplyCommand.EIDER_ID:
            multiplyCommand.setUnderlyingBuffer(buffer, offset);
            stateMachine.multiply(multiplyCommand, returnBuffer);
            emitCurrentValue(returnBuffer);
            break;
        case SetCommand.EIDER_ID:
            setCommand.setUnderlyingBuffer(buffer, offset);
            stateMachine.setCurrentValue(setCommand, returnBuffer);
            emitCurrentValue(returnBuffer);
            break;
        case Snapshot.EIDER_ID:
            snapshot.setUnderlyingBuffer(buffer, offset);
            stateMachine.loadFromSnapshot(snapshot);
            break;
        default:
            logger.error("Unknown message {}", eiderId);
    }
}
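
The dispatch-by-type-id pattern in onFragment can be tried in isolation. The sketch below is a simplified stand-in that uses only the JDK: java.nio.ByteBuffer replaces DirectBuffer, and the class name DemuxerSketch, the id constants and the 2-byte id plus 4-byte value layout are assumptions for illustration, not the Eider generated values.

```java
import java.nio.ByteBuffer;

/** Hypothetical demuxer: reads a type id, then routes to the matching operation. */
final class DemuxerSketch
{
    static final short ADD_ID = 1;       // assumed ids, illustration only
    static final short MULTIPLY_ID = 2;
    static final short SET_ID = 3;

    private int currentValue;

    // Encode a command as [2-byte type id][4-byte value].
    static ByteBuffer encode(final short typeId, final int value)
    {
        final ByteBuffer buffer = ByteBuffer.allocate(Short.BYTES + Integer.BYTES);
        buffer.putShort(0, typeId);
        buffer.putInt(Short.BYTES, value);
        return buffer;
    }

    /** Returns true if the message type was recognised and applied. */
    boolean onFragment(final ByteBuffer buffer, final int offset)
    {
        final short typeId = buffer.getShort(offset);
        final int value = buffer.getInt(offset + Short.BYTES);

        switch (typeId)
        {
            case ADD_ID:
                currentValue += value;
                return true;
            case MULTIPLY_ID:
                currentValue *= value;
                return true;
            case SET_ID:
                currentValue = value;
                return true;
            default:
                return false; // unknown message: state is left untouched
        }
    }

    int currentValue()
    {
        return currentValue;
    }
}
```

Setting the value to 7920, multiplying by 2 and adding 100 through this demuxer yields 15940, the same final three steps seen in the sample's cluster log.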

We need to return the result to the session that sent in the message. To do this, we store the session within the context of the demuxer, and then offer the buffer that the state machine populated in the prepareCurrentValueEvent method.

private void emitCurrentValue(ExpandableDirectByteBuffer buffer)
{
    session.offer(buffer, 0, CurrentValueEvent.BUFFER_LENGTH);
}

Building the client

For this basic sample, the client sends a mixture of AddCommand and MultiplyCommand commands and receives the emitted CurrentValueEvent. To receive messages, the client code must implement EgressListener and pass it to the Aeron Cluster client.

@Override
public void onMessage(long clusterSessionId, long timestamp,
                        DirectBuffer buffer,
                        int offset, int length, Header header)
{
    short eiderId = EiderHelper.getEiderId(buffer, offset);
    if (eiderId == CurrentValueEvent.EIDER_ID)
    {
        event.setUnderlyingBuffer(buffer, offset);
        log.info("Current value is {}", event.readValue());
    }
    else
    {
        log.warn("unknown message {}", eiderId);
    }
}

The onMessage method is only called when the egress is polled using clusterClient.pollEgress(). Here, for example, it is polled while retrying an offer:

private void offer(MutableDirectBuffer buffer, int offset, int length)
{
    while (clusterClient.offer(buffer, offset, length) < 0)
    {
        idleStrategy.idle(clusterClient.pollEgress());
    }
}

You can also call clusterClient.pollEgress() in your Duty Cycle.
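
A duty cycle that polls the egress could look roughly like the sketch below. It is a JDK-only approximation: the pollEgress call is represented by an IntSupplier that returns the number of fragments processed, Thread.yield() stands in for idleStrategy.idle(workCount), and DutyCycleSketch, run and otherWork are hypothetical names.

```java
import java.util.function.IntSupplier;

/** Hypothetical duty cycle: poll for responses, do other work, idle if nothing happened. */
final class DutyCycleSketch
{
    /** Runs a fixed number of cycles and returns how many of them found no work. */
    static int run(final IntSupplier pollEgress, final IntSupplier otherWork, final int cycles)
    {
        int idleCycles = 0;
        for (int i = 0; i < cycles; i++)
        {
            // Each step reports how much work it did in this cycle.
            final int workCount = pollEgress.getAsInt() + otherWork.getAsInt();
            if (workCount == 0)
            {
                idleCycles++;
                Thread.yield(); // stand-in for idleStrategy.idle(workCount)
            }
        }
        return idleCycles;
    }
}
```

A real duty cycle would usually loop until the application is asked to stop rather than for a fixed number of cycles; the fixed count here only keeps the sketch easy to exercise.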

Running the Sample

First run of cluster

10:50:19.372 [main] Aeron Dir = /var/folders/np/9nn7kn7d15qfc7qcnt9rbsgm0000gn/T/aeron-xxxxxx-cluster
10:50:19.377 [main] Cluster Dir = /Users/xxxxxx/src/aeron-cookbook-code/aeron-cluster
10:50:19.893 [clustered-service] Cluster Node is in role LEADER

Client connects

Client log

10:53:44.929 [main] Starting
10:53:44.931 [main] Adding 1; correlation = 1
10:53:44.934 [main] Adding 2; correlation = 2
10:53:44.936 [main] Adding 3; correlation = 3
10:53:44.937 [main] Adding 4; correlation = 4
10:53:44.939 [main] Adding 5; correlation = 5
10:53:44.940 [main] Adding 6; correlation = 6
10:53:44.943 [main] Current value is 1; correlation = 1
10:53:44.943 [main] Current value is 3; correlation = 2
10:53:44.943 [main] Current value is 6; correlation = 3
10:53:44.943 [main] Current value is 10; correlation = 4
10:53:44.943 [main] Current value is 15; correlation = 5
10:53:44.944 [main] Current value is 21; correlation = 6
...
10:53:45.027 [main] Adding 96; correlation = 96
10:53:45.028 [main] Adding 97; correlation = 97
10:53:45.029 [main] Adding 98; correlation = 98
10:53:45.030 [main] Current value is 7725; correlation = 96
10:53:45.030 [main] Multiplying by 2; correlation = 99
10:53:45.031 [main] Adding 100; correlation = 100
10:53:45.031 [main] Current value is 7822; correlation = 97
10:53:45.031 [main] Current value is 7920; correlation = 98
10:53:45.031 [main] Current value is 15840; correlation = 99
10:53:45.038 [main] Current value is 15940; correlation = 100
10:53:45.038 [main] Done

Cluster log

10:53:44.938 [clustered-service] adding 1, value is now 1; correlation = 1
10:53:44.939 [clustered-service] adding 2, value is now 3; correlation = 2
10:53:44.939 [clustered-service] adding 3, value is now 6; correlation = 3
10:53:44.939 [clustered-service] adding 4, value is now 10; correlation = 4
10:53:44.940 [clustered-service] adding 5, value is now 15; correlation = 5
10:53:44.941 [clustered-service] adding 6, value is now 21; correlation = 6
...
10:53:45.029 [clustered-service] adding 96, value is now 7725; correlation = 96
10:53:45.030 [clustered-service] adding 97, value is now 7822; correlation = 97
10:53:45.030 [clustered-service] adding 98, value is now 7920; correlation = 98
10:53:45.030 [clustered-service] multiplying by 2, value is now 15840; correlation = 99
10:53:45.035 [clustered-service] adding 100, value is now 15940; correlation = 100
10:53:50.038 [clustered-service] Cluster Client Session closed

Restarting the cluster

In the sample, the cluster has deleteDirOnStart set to false. This means that restarting the cluster replays the messages that were already sent in, restoring the cluster state to its value at the end of the first run:

10:58:03.682 [clustered-service] Cluster Client Session opened
10:58:03.685 [clustered-service] adding 1, value is now 1; correlation = 1
10:58:03.685 [clustered-service] adding 2, value is now 3; correlation = 2
...
10:58:03.685 [clustered-service] adding 98, value is now 7920; correlation = 98
10:58:03.685 [clustered-service] multiplying by 2, value is now 15840; correlation = 99
10:58:03.685 [clustered-service] adding 100, value is now 15940; correlation = 100
10:58:03.685 [clustered-service] Cluster Client Session closed
10:58:03.685 [clustered-service] Cluster Node is in role LEADER

Then, when we connect the client, the existing value in the state machine is used:

11:03:43.150 [main] Starting
11:03:43.152 [main] Adding 1; correlation = 1
11:03:43.155 [main] Adding 2; correlation = 2
11:03:43.157 [main] Adding 3; correlation = 3
11:03:43.158 [main] Adding 4; correlation = 4
11:03:43.160 [main] Current value is 15941; correlation = 1
11:03:43.160 [main] Current value is 15943; correlation = 2
11:03:43.160 [main] Current value is 15946; correlation = 3
11:03:43.160 [main] Current value is 15950; correlation = 4
...
11:03:43.249 [main] Adding 96; correlation = 96
11:03:43.249 [main] Adding 97; correlation = 97
11:03:43.250 [main] Adding 98; correlation = 98
11:03:43.251 [main] Multiplying by 2; correlation = 99
11:03:43.252 [main] Current value is 71485; correlation = 96
11:03:43.252 [main] Current value is 71582; correlation = 97
11:03:43.252 [main] Adding 100; correlation = 100
11:03:43.252 [main] Current value is 71680; correlation = 98
11:03:43.254 [main] Current value is 143360; correlation = 99
11:03:43.254 [main] Current value is 143460; correlation = 100
11:03:43.254 [main] Done

The cluster logs then show:

11:03:43.139 [clustered-service] Cluster Client Session opened
11:03:43.157 [clustered-service] adding 1, value is now 15941; correlation = 1
11:03:43.158 [clustered-service] adding 2, value is now 15943; correlation = 2
11:03:43.158 [clustered-service] adding 3, value is now 15946; correlation = 3
11:03:43.159 [clustered-service] adding 4, value is now 15950; correlation = 4
...
11:03:43.250 [clustered-service] adding 96, value is now 71485; correlation = 96
11:03:43.250 [clustered-service] adding 97, value is now 71582; correlation = 97
11:03:43.251 [clustered-service] adding 98, value is now 71680; correlation = 98
11:03:43.253 [clustered-service] multiplying by 2, value is now 143360; correlation = 99
11:03:43.253 [clustered-service] adding 100, value is now 143460; correlation = 100
11:03:48.255 [clustered-service] Cluster Client Session closed
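
The restart behaviour shown in this section relies on the state machine being deterministic: replaying the same commands in the same order always produces the same value. A minimal model of that idea, using an in-memory list as a stand-in for the cluster log, is sketched below. ReplaySketch and its methods are hypothetical names, and nothing here uses the Aeron API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntUnaryOperator;

/** Hypothetical model of a logged, replayable state machine. */
final class ReplaySketch
{
    private final List<IntUnaryOperator> log = new ArrayList<>();
    private int currentValue;

    // Record the command in the log, then apply it to the state.
    void apply(final IntUnaryOperator command)
    {
        log.add(command);
        currentValue = command.applyAsInt(currentValue);
    }

    // Simulate a restart: start from empty state and replay the previous log in order.
    static ReplaySketch restart(final ReplaySketch previous)
    {
        final ReplaySketch restarted = new ReplaySketch();
        for (final IntUnaryOperator command : previous.log)
        {
            restarted.apply(command);
        }
        return restarted;
    }

    int currentValue()
    {
        return currentValue;
    }
}
```

If the first run ends at 15940, a restarted instance also reports 15940 after replay, and the next "add 1" produces 15941, matching the first response the client receives after the restart above.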

Using Cluster Tool to take snapshots

For information on cluster tool, see Operating Aeron Cluster.

We can request a snapshot with the cluster tool, using the command:

java -cp aeron-all-1.28.2.jar io.aeron.cluster.ClusterTool . snapshot
...
INFO: Mark file exists: ./cluster-mark.dat
Member [0]: SNAPSHOT applied successfully

Once executed, the cluster log will show:

11:04:08.710 [clustered-service] taking snapshot
11:04:08.711 [clustered-service] taking snapshot with current value at 143460

This will write the snapshot state, and truncate the cluster log. When restarted, the cluster log then shows the snapshot load:

11:05:44.489 [clustered-service] loading snapshot
11:05:44.489 [clustered-service] reading snapshot with current value at 143460
11:05:44.512 [clustered-service] Cluster Node is in role LEADER
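
A snapshot lets a restarted node begin from a saved value instead of replaying every earlier command. The sketch below models this with plain Java, assuming that only commands logged after the snapshot are replayed on top of it. SnapshotRecoverySketch, replay, recover and snapshotPosition are hypothetical names, and snapshotPosition is an index into an in-memory list rather than an Aeron log position.

```java
import java.util.List;
import java.util.function.IntUnaryOperator;

/** Hypothetical model: recovery from a snapshot plus the commands logged after it. */
final class SnapshotRecoverySketch
{
    // Apply the commands in order, starting from startValue.
    static int replay(final int startValue, final List<IntUnaryOperator> commands)
    {
        int value = startValue;
        for (final IntUnaryOperator command : commands)
        {
            value = command.applyAsInt(value);
        }
        return value;
    }

    // Start from the snapshot value and replay only the commands at or after snapshotPosition.
    static int recover(final int snapshotValue,
                       final List<IntUnaryOperator> log,
                       final int snapshotPosition)
    {
        return replay(snapshotValue, log.subList(snapshotPosition, log.size()));
    }
}
```

For a deterministic state machine, recovering from a snapshot taken after the first N commands gives the same result as replaying the whole log from an empty state, while doing less work at startup.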