
Replication Sample

This complete sample shows multi-host replication: the client partially consumes the stream from one host, disconnects, and then resumes from the backup without data loss or duplication. The example builds upon the Multi-host sample, so it is best to be familiar with that sample first.

The code is on GitHub.

Overview

First, the system is configured as below, with the Archive Client receiving data from the Archive Host, and the Archive Backup replicating data from the Archive Host:

%%{init: {'theme': 'base', 'themeVariables': { 'fontFamily': '-apple-system, BlinkMacSystemFont, Roboto, Oxygen-Sans, Ubuntu, Cantarell, sans-serif'}}}%%
graph LR;
    Host[Archive Host]-- replay -->Client[Archive Client];
    Host-- replication -->Backup[Archive Backup]

After receiving 20 items, the client then reconfigures itself to read from the backup as follows:

%%{init: {'theme': 'base', 'themeVariables': { 'fontFamily': '-apple-system, BlinkMacSystemFont, Roboto, Oxygen-Sans, Ubuntu, Cantarell, sans-serif'}}}%%
graph LR;
    Host[Archive Host]-- replication -->Backup[Archive Backup];
    Backup-- replay -->Client[Archive Client]

The sample operates as follows:

  • when the Archive Host starts, it constructs an Archiving Media Driver and Aeron client. This Archiving Media Driver listens on two ports, 17000 and 17001, for the Control Request and Recording Events channels respectively.
  • with the Aeron object ready, the Archive Host constructs a spied aeron-ipc subscription on stream 100 and starts appending data
  • when the Archive Backup starts, it also constructs an Archiving Media Driver and Aeron client. This Archiving Media Driver listens on the same two ports, 17000 and 17001, for the Control Request and Recording Events channels respectively.
  • the Archive Backup has Aeron Agent installed, with logging for aeron.event.log=admin and aeron.event.archive.log=all to show the actual Archive Backup activity. This is set within the Docker entrypoint.sh.
  • with the Archive ready, the Archive Backup connects to the Archive Host and starts a replication session.
  • at the start of the Archive Client, it constructs a normal Media Driver and Aeron client.
  • the Archive Client then creates and connects an instance of AeronArchive to the Archive Host. It will wait until the Archive Host is ready.
  • with a connected AeronArchive, the Archive Client asks the remote archive to list its recordings and finds the correct one matching aeron-ipc subscription on stream 100. It then creates a local subscription and requests the remote Archive Host to begin a replay to it.
  • once the Archive Client has consumed 20 items from the Archive Host, it disconnects, reconnects to the Archive Backup, and resumes from item 21 onwards

Note

The sample runs in Docker, and as a result some configuration is simplified. Most importantly, the Archive Host and Archive Backup processes use the same ports. If you run this on a single machine, you will need to adjust the code to use per-process ports.
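
One possible way to assign per-process ports when running everything on a single machine is sketched below. This is not part of the sample: the class name, the helper methods and the stride of 10 between processes are all hypothetical, and only the base ports 17000 and 17001 come from the sample itself.

```java
// Sketch only: the stride and helper names are hypothetical, not part of the sample.
final class PortPlanSketch
{
    static final int BASE_CONTROL_PORT = 17000;          // Control Request port used by the sample
    static final int BASE_RECORDING_EVENTS_PORT = 17001; // Recording Events port used by the sample
    static final int PORT_STRIDE = 10;                   // assumed gap between processes

    // Control Request port for the given process (0 = host, 1 = backup, ...).
    static int controlPort(final int processIndex)
    {
        return BASE_CONTROL_PORT + (processIndex * PORT_STRIDE);
    }

    // Recording Events port for the given process.
    static int recordingEventsPort(final int processIndex)
    {
        return BASE_RECORDING_EVENTS_PORT + (processIndex * PORT_STRIDE);
    }

    // Builds a UDP endpoint channel URI in the same form the sample logs.
    static String controlChannel(final String host, final int processIndex)
    {
        return "aeron:udp?endpoint=" + host + ":" + controlPort(processIndex);
    }

    public static void main(final String[] args)
    {
        System.out.println("host   -> " + controlChannel("localhost", 0));
        System.out.println("backup -> " + controlChannel("localhost", 1));
    }
}
```

With a scheme like this, each process would be given its own index at startup, so that the Archive Host and Archive Backup no longer compete for the same ports.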

A note on position

The position returned by the offer is the same position you can read in a Header passed into a FragmentHandler. This is the same position you provide when replaying from a given offset - so for example, if you last received a message in the FragmentHandler with header.position() of 1280, you would then replay from 1280 to get the next message.
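
To make the position values more concrete, the sketch below reproduces the arithmetic behind the 1280 in the example. It uses plain Java rather than the Aeron API, and rests on assumptions: each message carries an 8-byte payload (the sample reads a single long), Aeron adds a 32-byte data frame header, frames are aligned to 32 bytes, and no padding frames occur. The class and method names are hypothetical.

```java
// Sketch only: the constants mirror Aeron's 32-byte data frame header and
// 32-byte frame alignment; the 8-byte payload is an assumption about the sample.
final class PositionSketch
{
    static final int HEADER_LENGTH = 32;
    static final int FRAME_ALIGNMENT = 32;

    // Bytes a single message occupies in the log, rounded up to the frame alignment.
    static long alignedFrameLength(final int payloadLength)
    {
        final long raw = (long)payloadLength + HEADER_LENGTH;
        return (raw + (FRAME_ALIGNMENT - 1)) & ~(long)(FRAME_ALIGNMENT - 1);
    }

    // Position after messageCount equally sized messages, starting from position 0.
    static long positionAfter(final int messageCount, final int payloadLength)
    {
        return messageCount * alignedFrameLength(payloadLength);
    }

    public static void main(final String[] args)
    {
        System.out.println("frame length: " + alignedFrameLength(Long.BYTES));
        System.out.println("position after 20 messages: " + positionAfter(20, Long.BYTES));
        System.out.println("position after 21 messages: " + positionAfter(21, Long.BYTES));
    }
}
```

Under these assumptions each message advances the position by 64 bytes, which is consistent with the positions 1280, 1344 and 1408 in the execution output further down.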

Building the Archive Host

The Archive Host is unchanged from the Multi-host sample, and requires no special configuration to allow replication. The primary host code is found on GitHub: ArchiveHost.java

Building the Archive Backup

Constructing ArchivingMediaDriver

The Archive Backup process has a slightly different configuration to the Archive Host.

Two parts of the code below differ from the Archive Host. The replicationChannel setting on the Archive.Context specifies the replication channel to which data must be replicated by the Archive Host, and the AeronArchive.Context passed to archiveClientContext configures the internal Archive Client within the Archive so that archive control responses can be received correctly.

private ArchivingMediaDriver launchMediaDriver(String thisHost, int controlChannelPort,
                                                   int recordingEventsPort, int replayChannelPort)
{
    LOGGER.info("launching ArchivingMediaDriver");
    final String controlChannel = AERON_UDP_ENDPOINT + thisHost + ":" + controlChannelPort;
    final String controlResponseChannel = AERON_UDP_ENDPOINT + thisHost + ":0";
    final String replicationChannel = AERON_UDP_ENDPOINT + thisHost + ":" + replayChannelPort;
    final String recordingEventsChannel =
        "aeron:udp?control-mode=dynamic|control=" + thisHost + ":" + recordingEventsPort;

    final var archiveContext = new Archive.Context()
        .deleteArchiveOnStart(true)
        .errorHandler(this::errorHandler)
        .controlChannel(controlChannel)
        .recordingEventsChannel(recordingEventsChannel)
        .idleStrategySupplier(SleepingMillisIdleStrategy::new)
        .replicationChannel(replicationChannel) //required to allow replication to this Archive
        .threadingMode(ArchiveThreadingMode.SHARED);

    //this is required for the AeronArchive client within the Aeron Archive host process for replication
    final var archiveClientContext = new AeronArchive.Context()
        .controlResponseChannel(controlResponseChannel);

    archiveContext.archiveClientContext(archiveClientContext);

    final var mediaDriverContext = new MediaDriver.Context()
        .spiesSimulateConnection(true)
        .errorHandler(this::errorHandler)
        .threadingMode(ThreadingMode.SHARED)
        .sharedIdleStrategy(new SleepingMillisIdleStrategy())
        .dirDeleteOnStart(true);

    return ArchivingMediaDriver.launch(mediaDriverContext, archiveContext);
}

This code is in ArchiveReplicatorAgent.java on GitHub.

Starting replication

Starting a replication session is similar to starting the client replay session. First, we create and connect an AeronArchive instance to the remote Archive Host using the same async method as the client in the Multi-host sample. Then we find the recording, and finally we start the replication.

private void connectAndReplicateRemoteArchive()
{
    if (remoteAsyncConnect == null)
    {
        LOGGER.info("connecting aeron archive");
        remoteAsyncConnect = AeronArchive.asyncConnect(new AeronArchive.Context()
            .controlRequestChannel(AERON_UDP_ENDPOINT + remoteArchiveHost + ":" + controlChannelPort)
            .recordingEventsChannel(AERON_UDP_ENDPOINT + remoteArchiveHost + ":" + recordingEventsPort)
            .controlResponseChannel(AERON_UDP_ENDPOINT + host + ":0")
            .aeron(aeron));
    } else
    {
        //if the archive hasn't been set yet, poll it after idling 250ms
        if (null == remoteArchiveClient)
        {
            LOGGER.info("awaiting aeron archive");
            idleStrategy.idle();
            try
            {
                remoteArchiveClient = remoteAsyncConnect.poll();
            } catch (TimeoutException e)
            {
                LOGGER.info("timeout");
                remoteAsyncConnect = null;
            }
        } else
        {
            LOGGER.info("finding remote recording");
            //archive is connected. find the recording on the remote archive host
            final var recordingId = getRemoteRecordingId("aeron:ipc", STREAM_ID);
            if (recordingId != Long.MIN_VALUE)
            {
                LOGGER.info("remote recording id is {}", recordingId);
                long replicationId = localArchiveClient.replicate(recordingId, NULL_VALUE,
                    remoteArchiveClient.context().controlRequestStreamId(),
                    remoteArchiveClient.context().controlRequestChannel(), "");
                LOGGER.info("replication id is {}", replicationId);
                currentState = State.REPLICATING;
            } else
            {
                //await the remote host being ready, idle 250ms
                idleStrategy.idle();
            }
        }
    }
}

Building a client

As before, the client makes use of a standard Media Driver and Aeron client, without any specific configuration. The only real difference is that, when connecting to the Archive Backup, the last received position is given as the replay start position instead of 0L.
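
The idea of resuming from the last received position can be illustrated without Aeron at all. The following is a minimal in-memory simulation, not the sample's code: recordings are modelled as arrays of values with an assumed fixed frame length of 64 bytes, the backup is assumed to hold an identical copy of the host recording, and all names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch only: simulates replay positions in memory; it does not use the Aeron API.
final class ResumeSketch
{
    static final int FRAME_LENGTH = 64; // assumed fixed size of every recorded frame

    // Builds a recording holding the values 1..itemCount, one value per frame.
    static long[] recordingOf(final int itemCount)
    {
        final long[] values = new long[itemCount];
        for (int i = 0; i < itemCount; i++)
        {
            values[i] = i + 1;
        }
        return values;
    }

    // Replays up to limit frames starting at startPosition; returns the position after the last frame delivered.
    static long replay(final long[] recording, final long startPosition, final int limit, final List<Long> sink)
    {
        int index = (int)(startPosition / FRAME_LENGTH);
        int delivered = 0;
        while (index < recording.length && delivered < limit)
        {
            sink.add(recording[index]);
            index++;
            delivered++;
        }
        return (long)index * FRAME_LENGTH;
    }

    // Consumes 20 items from the host, then resumes from the backup at the captured position.
    static List<Long> consumeWithFailover()
    {
        final long[] hostRecording = recordingOf(30);
        final long[] backupRecording = hostRecording.clone(); // replication yields an identical copy
        final List<Long> received = new ArrayList<>();
        final long lastPosition = replay(hostRecording, 0L, 20, received);
        replay(backupRecording, lastPosition, Integer.MAX_VALUE, received);
        return received;
    }

    public static void main(final String[] args)
    {
        System.out.println(consumeWithFailover());
    }
}
```

Because the second replay starts exactly at the position following item 20, the combined list contains each value once and in order, which is the behaviour the real client relies on when it switches to the Archive Backup.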

Replaying from a given position

The example client code keeps track of the current header.position() in the FragmentHandler. Once the 20th value is received, this position is captured and used for the replay. Otherwise, the client connection is much the same as before.

public class ArchiveClientFragmentHandler implements FragmentHandler
{
    private static final Logger LOGGER = LoggerFactory.getLogger(ArchiveClientFragmentHandler.class);
    private long lastValue;
    private long lastPosition;

    @Override
    public void onFragment(DirectBuffer buffer, int offset, int length, Header header)
    {
        lastValue = buffer.getLong(offset);
        lastPosition = header.position();
        LOGGER.info("received {}", lastValue);
    }

    public long getLastValue()
    {
        return lastValue;
    }

    public long getLastPosition()
    {
        return lastPosition;
    }
}

The last position is read once the client reaches value 20:

private void initialReplayFromHost()
{
    //fragment limit is set low to allow us to consume them one by one
    replayDestinationSubs.poll(fragmentHandler, 1);
    if (fragmentHandler.getLastValue() == 20)
    {
        LOGGER.info("replay has reached item 20 at position {}", fragmentHandler.getLastPosition());

        //stop the replay
        hostArchive.stopReplay(replaySession);

        //kill the replay subscription
        CloseHelper.quietClose(replayDestinationSubs);

        //kill the archive.
        CloseHelper.quietClose(asyncHostConnect);
        CloseHelper.quietClose(hostArchive);

        backupStartPosition = fragmentHandler.getLastPosition();

        this.currentState = SWITCH_TO_BACKUP;
    }
}

The captured position is then used as the start position for the replay from the Archive Backup:

private void connectToBackupArchive()
{
    //start an asyncConnect if one not in progress
    if (asyncBackupConnect == null)
    {
        LOGGER.info("connecting to backup aeron archive");
        asyncBackupConnect = AeronArchive.asyncConnect(new AeronArchive.Context()
            .controlRequestChannel(AERON_UDP_ENDPOINT + backupHost + ":" + archiveControlPort)
            .recordingEventsChannel(AERON_UDP_ENDPOINT + backupHost + ":" + archiveEventPort)
            .controlResponseChannel(AERON_UDP_ENDPOINT + thisHost + ":0")
            .aeron(aeron));
    } else
    {
        //if the archive hasn't been set yet, poll it after idling 250ms
        if (null == backupArchive)
        {
            LOGGER.info("awaiting backup aeron archive");
            idleStrategy.idle();
            try
            {
                backupArchive = asyncBackupConnect.poll();
            } catch (TimeoutException e)
            {
                LOGGER.info("timeout");
                asyncBackupConnect = null;
            }
        } else
        {
            LOGGER.info("finding backup remote recording");
            //archive is connected. find the recording on the Archive Backup
            final var recordingId = getRecordingId(backupArchive, "aeron:ipc", RECORDED_STREAM_ID);
            if (recordingId != Long.MIN_VALUE)
            {
                //ask aeron to assign an ephemeral port for this replay
                final var localReplayChannelEphemeral = AERON_UDP_ENDPOINT + thisHost + ":0";
                //construct a local subscription for the remote host to replay to
                replayDestinationBackupSubs = aeron.addSubscription(localReplayChannelEphemeral, REPLAY_STREAM_ID);
                //resolve the actual port and use that for the replay
                final var actualReplayChannel = replayDestinationBackupSubs.tryResolveChannelEndpointPort();
                LOGGER.info("actualReplayChannel={}", actualReplayChannel);
                //replay the recording from the backup archive, starting at the last position received from the host
                replaySession =
                    backupArchive.startReplay(recordingId, backupStartPosition, Long.MAX_VALUE, actualReplayChannel,
                        REPLAY_STREAM_ID);
                LOGGER.info("ready to poll subscription, replaying to {} from position {}, image is {}",
                    actualReplayChannel, backupStartPosition, (int) replaySession);
                currentState = State.POLLING_BACKUP_SUBSCRIPTION;
            } else
            {
                //await the remote host being ready, idle 250ms
                idleStrategy.idle();
            }
        }
    }
}

The full code for this is found in ArchiveClientAgent.java on GitHub.

Running the sample

Note

It is assumed the reader is familiar with Docker and has Docker installed.

  • First, you will need a recent version of Docker. This was tested on Docker Desktop 3.3.3 with Docker Engine 20.10.6.
  • Dedicate at least 4GB of memory and 4 to 8 cores to Docker. This was tested with 16 cores and 32GB RAM.
  • You will need access to Docker Hub to download the aeroncookbook:jdk16 container, or to build the base container yourself. The base container Dockerfile can be found here. The base container is itself built on top of the azul/zulu-openjdk-debian:16 container. The aeroncookbook:jdk16 container is NOT suitable for production use.
  • Then, you will need to build the shadow jars for the archive client and archive host. To do this, run gradle in the sample source code top level folder (typically aeron-cookbook-code) with no arguments.
  • Finally, move to the archive-replication folder, and run docker-compose up -d.

Execution Output

Info

Until the client switches to the backup host, this log output is from the client only. The log output has also been edited slightly to improve readability.

You can see the client receiving items 1 to 20, and then swapping over to the backup. The host continues writing the data, and the client continues from item 21 using the replicated archive on the Archive Backup host. The Archive Backup host shows recording progress via the ArchiveProgressListener.

archive-client_1  | 16:28:46.276 [main] INFO ArchiveClientAgent - detected ip4 address as 10.1.0.4
archive-client_1  | 16:28:46.284 [main] INFO ArchiveClientAgent - launching media driver
archive-client_1  | 16:28:46.466 [main] INFO ArchiveClientAgent - connecting aeron; media driver directory /dev/shm/aeron-root
archive-client_1  | 16:28:46.484 [archive-client] INFO ArchiveClientAgent - starting
archive-client_1  | 16:28:46.485 [archive-client] INFO ArchiveClientAgent - connecting aeron archive
archive-client_1  | 16:28:46.592 [archive-client] INFO ArchiveClientAgent - awaiting aeron archive
archive-client_1  | 16:28:46.845 [archive-client] INFO ArchiveClientAgent - awaiting aeron archive
archive-client_1  | 16:28:47.099 [archive-client] INFO ArchiveClientAgent - finding remote recording
archive-client_1  | 16:28:47.111 [archive-client] INFO ArchiveClientAgent - actualReplayChannel=aeron:udp?endpoint=10.1.0.4:54267
archive-client_1  | 16:28:47.432 [archive-client] INFO ArchiveClientAgent - ready to poll subscription, replaying to aeron:udp?endpoint=10.1.0.4:54267, image is -582469901
archive-client_1  | 16:28:47.435 [archive-client] INFO ArchiveClientFragmentHandler - received 1
archive-client_1  | 16:28:48.561 [archive-client] INFO ArchiveClientFragmentHandler - received 2
archive-client_1  | 16:28:50.562 [archive-client] INFO ArchiveClientFragmentHandler - received 3
archive-client_1  | 16:28:52.562 [archive-client] INFO ArchiveClientFragmentHandler - received 4
...
archive-client_1  | 16:29:18.565 [archive-client] INFO ArchiveClientFragmentHandler - received 17
archive-client_1  | 16:29:20.567 [archive-client] INFO ArchiveClientFragmentHandler - received 18
archive-client_1  | 16:29:22.566 [archive-client] INFO ArchiveClientFragmentHandler - received 19
archive-client_1  | 16:29:24.566 [archive-client] INFO ArchiveClientFragmentHandler - received 20
archive-client_1  | 16:29:24.566 [archive-client] INFO ArchiveClientAgent - replay has reached item 20 at position 1280
archive-client_1  | 16:29:24.574 [archive-client] INFO ArchiveClientAgent - connecting to backup aeron archive
archive-client_1  | 16:29:24.585 [archive-client] INFO ArchiveClientAgent - awaiting backup aeron archive
archive-client_1  | 16:29:24.837 [archive-client] INFO ArchiveClientAgent - awaiting backup aeron archive
archive-client_1  | 16:29:25.089 [archive-client] INFO ArchiveClientAgent - finding backup remote recording
archive-client_1  | 16:29:25.095 [archive-client] INFO ArchiveClientAgent - actualReplayChannel=aeron:udp?endpoint=10.1.0.4:42819
archive-client_1  | 16:29:25.791 [archive-client] INFO ArchiveClientAgent - ready to poll subscription, replaying to aeron:udp?endpoint=10.1.0.4:42819 from position 1280, image is 1806745592
archive-host_1    | 16:29:26.562 [agent-host] INFO ArchiveHostAgent - appended 21
archive-host_1    | 16:29:26.565 [agent-host] INFO ArchiveProgressListener - recording activity recordingId=0 startPos=0 position=1344
archive-client_1  | 16:29:26.570 [archive-client] INFO ArchiveClientFragmentHandler - received 21
archive-backup_1  | 16:29:26.650 [agent-replicator] INFO ArchiveProgressListener - recording activity recordingId=0 startPos=0 position=1344
archive-host_1    | 16:29:28.562 [agent-host] INFO ArchiveHostAgent - appended 22
archive-host_1    | 16:29:28.563 [agent-host] INFO ArchiveProgressListener - recording activity recordingId=0 startPos=0 position=1408
archive-client_1  | 16:29:28.569 [archive-client] INFO ArchiveClientFragmentHandler - received 22
archive-backup_1  | 16:29:28.663 [agent-replicator] INFO ArchiveProgressListener - recording activity recordingId=0 startPos=0 position=1408
archive-host_1    | 16:29:30.562 [agent-host] INFO ArchiveHostAgent - appended 23
archive-host_1    | 16:29:30.564 [agent-host] INFO ArchiveProgressListener - recording activity recordingId=0 startPos=0 position=1472
archive-client_1  | 16:29:30.568 [archive-client] INFO ArchiveClientFragmentHandler - received 23
archive-backup_1  | 16:29:30.675 [agent-replicator] INFO ArchiveProgressListener - recording activity recordingId=0 startPos=0 position=1472
archive-host_1    | 16:29:32.562 [agent-host] INFO ArchiveHostAgent - appended 24
archive-host_1    | 16:29:32.564 [agent-host] INFO ArchiveProgressListener - recording activity recordingId=0 startPos=0 position=1536
archive-client_1  | 16:29:32.568 [archive-client] INFO ArchiveClientFragmentHandler - received 24
...