
Basic Sample

A complete sample that publishes data to an Aeron Archive, which records it through a spy subscription, and then replays it. As with other examples, the publisher and subscriber run on the same thread, which would not make sense in a real-world application. It is still useful because it shows the full setup of a minimal Aeron Archive application. The code can be found in the aeron-archive project, in the SimplestCase.java file on GitHub.

Why is the example so much longer than the basic Aeron one?

Aeron Archive adds several moving parts, including a new Archiver agent. Together with Aeron Archive's support for multiple recordings, this means there is more to configure. It is strongly suggested that you first review and understand the Aeron Basic Sample before continuing.

The code will be broken into the following sections:

  • setting up the Aeron environment;
  • writing the data to the Archive;
  • reading the data from the archive;
  • cleaning up.

This is all done via the static main method, which calls each step:

public static void main(String[] args)
{
    final SimplestCase simplestCase = new SimplestCase();
    simplestCase.setup();
    simplestCase.write();
    simplestCase.read();
    simplestCase.cleanUp();
}

Execution Output

18:34:18.372 [main] Writing
18:34:18.496 [main] Reading
18:34:18.657 [main] Received 0
18:34:18.658 [main] Received 1
...
18:34:18.673 [main] Received 9999
18:34:18.673 [main] Received 10000

Setting up Aeron Archive

First, we need to set up the Media Driver, Aeron and the Aeron Archive.

Info

This sample relies on a number of defaults, which means connectivity will only work if everything runs on the same host. Further examples show how to achieve multi-host connectivity.

The key code is:

public void setup()
{
    mediaDriver = ArchivingMediaDriver.launch(
        new MediaDriver.Context()
            .spiesSimulateConnection(true)
            .dirDeleteOnStart(true),
        new Archive.Context()
            .deleteArchiveOnStart(true)
            .archiveDir(tempDir)
    );
    aeron = Aeron.connect();
    aeronArchive = AeronArchive.connect(
        new AeronArchive.Context()
            .aeron(aeron)
    );
}

First, we create the Archiving Media Driver, which requires two components: the Media Driver context and the Archive context. The most interesting bits are:

  • Line 5 (spiesSimulateConnection), which tells the Media Driver to treat spy subscriptions as connected subscribers. This allows the publication to be written to, and recorded by the archive, without a regular subscriber attached.
  • Line 9 (archiveDir), which tells Aeron Archive where to hold the archive data files.

See also: Media Driver

Publishing messages to the Archive

Next, we need to send data into the archive so that we may later read it.

private void write()
{
    aeronArchive.startRecording(channel, streamCapture, SourceLocation.LOCAL);
    try (ExclusivePublication publication = aeron.addExclusivePublication(channel, streamCapture))
    {
        while (!publication.isConnected())
        {
            idleStrategy.idle();
        }
        for (int i = 0; i <= sendCount; i++)
        {
            buffer.putInt(0, i);
            while (publication.offer(buffer, 0, Integer.BYTES) < 0)
            {
                idleStrategy.idle();
            }
        }
        final long stopPosition = publication.position();
        final CountersReader countersReader = aeron.countersReader();
        final int counterId = RecordingPos.findCounterIdBySession(countersReader, publication.sessionId());
        while (countersReader.getCounterValue(counterId) < stopPosition)
        {
            idleStrategy.idle();
        }
    }
}

Here, the code instructs the Aeron Archive created in setup to start recording, then creates the publication to be recorded, and finally, in lines 18-24, waits until the archive data is fully written.

Why do we need to await the completion of the writing?

This is needed only because the example is single threaded: the archive has to be fully written before it can be read. In normal applications (which would likely be multi-process), this wouldn't make sense.

Key lines described below:

  • Line 3 instructs Aeron Archive to start recording the given channel and stream on the local media driver.
  • Line 4 creates the publication. Note that the stream and channel are identical in lines 3 and 4, because we're recording the same publication we write to.
  • Lines 6-9 idle the thread until the publication is connected.
  • Lines 10-17 publish 32-bit integer values from 0 to sendCount inclusive. In the output above that is 0 to 10000, so 10,001 values.
  • Lines 18-24 await the completion of the archive write process. This is done by first reading the final position of the publication, then finding the recording position counter for the publication's session ID and waiting for its value to reach that final position.
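
To make the stop position in lines 18-24 more concrete, the sketch below estimates the position the recording counter has to reach. It is a rough calculation, not code from the sample. It assumes Aeron's 32-byte data frame header and 32-byte frame alignment, a publication that starts at position 0, a sendCount of 10000 (inferred from the output above), and that no padding is inserted at the end of a term. The class and method names are hypothetical.

```java
final class RecordingPositionSketch
{
    // Assumed Aeron framing constants: 32-byte data header, frames aligned to 32 bytes.
    static final int HEADER_LENGTH = 32;
    static final int FRAME_ALIGNMENT = 32;

    /** Stream position consumed by one unfragmented message of the given payload length. */
    static long alignedFrameLength(final int payloadLength)
    {
        final int unaligned = HEADER_LENGTH + payloadLength;
        // Round up to the next multiple of FRAME_ALIGNMENT (a power of two).
        return (unaligned + (FRAME_ALIGNMENT - 1)) & ~(FRAME_ALIGNMENT - 1);
    }

    /** Expected position after offering messageCount messages, starting from position 0. */
    static long expectedStopPosition(final long messageCount, final int payloadLength)
    {
        return messageCount * alignedFrameLength(payloadLength);
    }

    public static void main(final String[] args)
    {
        final int sendCount = 10_000;          // assumption, inferred from the sample output
        final long messages = sendCount + 1L;  // the loop runs from 0 to sendCount inclusive
        // Each 4-byte int occupies one 64-byte frame, so this prints 640064.
        System.out.println(expectedStopPosition(messages, Integer.BYTES));
    }
}
```

Under these assumptions every message advances the position by 64 bytes, which is why the wait loop compares byte positions rather than message counts.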

Reading the Aeron archive

Finding the recording id

Before we can read the archive, we need to know which recording ID we want to read from. To do this, we need to list the available recordings and get the most recent one. Once we have the recording ID, we can replay the archive.

private long findLatestRecording(final AeronArchive archive, String channel, int stream)
{
    final MutableLong lastRecordingId = new MutableLong();
    final RecordingDescriptorConsumer consumer =
        (controlSessionId, correlationId, recordingId,
        startTimestamp, stopTimestamp, startPosition,
        stopPosition, initialTermId, segmentFileLength,
        termBufferLength, mtuLength, sessionId,
        streamId, strippedChannel, originalChannel,
        sourceIdentity) -> lastRecordingId.set(recordingId);

    final long fromRecordingId = 0L;
    final int recordCount = 100;
    final int foundCount = archive.listRecordingsForUri(fromRecordingId, recordCount, channel, stream, consumer);

    if (foundCount == 0)
    {
        throw new IllegalStateException("no recordings found");
    }
    return lastRecordingId.get();
}
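
Note that the method above asks for at most 100 recordings starting from recording ID 0, so the last ID delivered is only the latest recording when fewer than 100 match. One possible way to handle more is to page through the listing. The sketch below shows only the paging logic in plain Java: RecordingLister is a hypothetical stand-in for a listing call shaped like listRecordingsForUri (from ID, count, consumer, returning the number found), and fakeArchive is an invented in-memory substitute used for the demonstration.

```java
import java.util.function.LongConsumer;

final class LatestRecordingPagingSketch
{
    /** Hypothetical stand-in for a paged listing call; returns how many IDs it delivered. */
    interface RecordingLister
    {
        int list(long fromRecordingId, int recordCount, LongConsumer consumer);
    }

    static final long NOT_FOUND = -1L;

    /** Pages through the listing and returns the last recording ID seen, or NOT_FOUND. */
    static long findLatest(final RecordingLister lister, final int pageSize)
    {
        final long[] last = {NOT_FOUND};
        long from = 0L;
        int found;
        do
        {
            found = lister.list(from, pageSize, id -> last[0] = id);
            from = last[0] + 1; // the next page starts after the last ID seen
        }
        while (found == pageSize); // a short page means there is nothing more to list
        return last[0];
    }

    /** Invented in-memory archive holding recording IDs 0..totalRecordings-1. */
    static RecordingLister fakeArchive(final long totalRecordings)
    {
        return (fromId, count, consumer) ->
        {
            int delivered = 0;
            for (long id = fromId; id < totalRecordings && delivered < count; id++)
            {
                consumer.accept(id);
                delivered++;
            }
            return delivered;
        };
    }

    public static void main(final String[] args)
    {
        // 250 recordings read in pages of 100: prints 249.
        System.out.println(findLatest(fakeArchive(250), 100));
    }
}
```

For a sample that deletes the archive on start and creates a single recording, the single call used above is sufficient; paging only matters for long-lived archives.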

Replaying the archive

The example below makes use of Aeron Archive's Live Replay feature: the replay starts from the very beginning and keeps reading data until stopped. This allows a late joiner to get all the data it missed at the start, and to receive live data once caught up.

private void read()
{
    try (AeronArchive reader = AeronArchive.connect(new AeronArchive.Context().aeron(aeron)))
    {
        final long recordingId = findLatestRecording(reader, channel, streamCapture);
        final long position = 0L;
        final long length = Long.MAX_VALUE;
        final long sessionId = reader.startReplay(recordingId, position, length, channel, streamReplay);
        final String channelRead = ChannelUri.addSessionId(channel, (int) sessionId);
        final Subscription subscription = reader.context().aeron().addSubscription(channelRead, streamReplay);
        while (!subscription.isConnected())
        {
            idleStrategy.idle();
        }
        while (!complete)
        {
            int fragments = subscription.poll(this::archiveReader, 1);
            idleStrategy.idle(fragments);
        }
    }
}

Key lines are as follows:

  • Line 6 sets up the position variable so that Aeron Archive knows to start from the very beginning of the archive.
  • Line 7 sets up the length variable so that Aeron Archive knows to follow the live recording (aka Live Replay).
  • Line 8 begins the replay.
  • Lines 9-10 add the replay session ID to the channel and connect a subscription to the replayed stream.
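
The cast and channel rewrite in lines 8-10 can be illustrated in plain Java. startReplay returns a long replay session ID, and the (int) cast keeps its lower 32 bits, which is the session ID the replayed stream is published with. The sketch below is an approximation only: withSessionId is a hypothetical helper that imitates the effect of adding a session-id parameter to a channel string, and the aeron:ipc channel and the example ID are assumptions, since the sample does not show its channel value.

```java
final class ReplaySessionSketch
{
    /** Lower 32 bits of the long replay session ID, as produced by the (int) cast. */
    static int streamSessionId(final long replaySessionId)
    {
        return (int)replaySessionId;
    }

    /** Hypothetical helper imitating a channel with a session-id parameter appended. */
    static String withSessionId(final String channel, final int sessionId)
    {
        // Aeron channel URIs start their parameters with '?' and separate them with '|'.
        final String separator = channel.contains("?") ? "|" : "?";
        return channel + separator + "session-id=" + sessionId;
    }

    public static void main(final String[] args)
    {
        final long replaySessionId = (5L << 32) | 7L; // made-up ID with bits set above bit 31
        final int sessionId = streamSessionId(replaySessionId);
        // Prints aeron:ipc?session-id=7
        System.out.println(withSessionId("aeron:ipc", sessionId));
    }
}
```

Subscribing with the session ID in the channel restricts the subscription to the replay stream, so it does not pick up other publications on the same channel and stream.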

Cleaning up

private void cleanUp()
{
    CloseHelper.quietClose(aeronArchive);
    CloseHelper.quietClose(aeron);
    CloseHelper.quietClose(mediaDriver);
}

When the application is ready to exit, it is important to cleanly shut down the Aeron Archive client, Aeron and the Media Driver, in that order, which is the reverse of the order in which they were created. Doing this in the wrong order, or not doing it at all, may result in a hung process or core dump.
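
The ordering in cleanUp can be generalised as "close in reverse order of creation". The sketch below shows that idea in plain Java, with a stack of AutoCloseable resources and close failures swallowed, in the spirit of a quiet close. It is not part of the sample: the class and its methods are hypothetical, and the three lambdas merely stand in for the Media Driver, Aeron and the Aeron Archive client.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

final class ReverseOrderCloseSketch
{
    private final Deque<AutoCloseable> resources = new ArrayDeque<>();

    /** Remember a resource; the most recently registered one is closed first. */
    void register(final AutoCloseable resource)
    {
        resources.push(resource);
    }

    /** Close everything in reverse registration order, ignoring close failures. */
    void closeAll()
    {
        while (!resources.isEmpty())
        {
            try
            {
                resources.pop().close();
            }
            catch (final Exception ignore)
            {
                // Swallowed on purpose so the remaining resources still get closed.
            }
        }
    }

    /** Registers three stand-in resources and returns the order they were closed in. */
    static List<String> demo()
    {
        final List<String> closed = new ArrayList<>();
        final ReverseOrderCloseSketch sketch = new ReverseOrderCloseSketch();
        sketch.register(() -> closed.add("mediaDriver"));  // created first
        sketch.register(() -> closed.add("aeron"));
        sketch.register(() -> closed.add("aeronArchive")); // created last
        sketch.closeAll();
        return closed;
    }

    public static void main(final String[] args)
    {
        // Prints [aeronArchive, aeron, mediaDriver]
        System.out.println(demo());
    }
}
```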
