Multi-host Sample
A complete sample that replays recorded data from an archive host in one Docker container to a client in a different Docker container. Because each container has its own IP address, both the host and the client must be wired up correctly.
The code is on GitHub.
Overview
%%{init: {'theme': 'base', 'themeVariables': { 'fontFamily': '-apple-system, BlinkMacSystemFont, Roboto, Oxygen-Sans, Ubuntu, Cantarell, sans-serif'}}}%%
graph LR;
Host[Archive Host]-- replay -->Client[Archive Client];
The sample operates as follows:
- at the start of the Archive Host, it constructs an Archiving Media Driver and Aeron client. This Archiving Media Driver listens on two ports, 17000 and 17001, for the Control Request and Recording Events channels respectively.
- with the Aeron object ready, the Archive Host constructs a spied aeron:ipc subscription on stream 100.
- at the start of the Archive Client, it constructs a normal Media Driver and Aeron client.
- the Archive Client then creates and connects an instance of AeronArchive to the Archive Host. It will wait until the Archive Host is ready.
- with a connected AeronArchive, the Archive Client asks the remote archive to list its recordings and finds the one matching aeron:ipc on stream 100. It then creates a local subscription and requests the remote Archive Host to begin a replay to it.
Building the Archive Host
The most important part of the Archive Host is the setup of the Archiving Media Driver and the starting of the archive recording.
Constructing ArchivingMediaDriver
There are two key channels configured: the control channel and the recording events channel. These must bind to the Archive Host's IP address. In the Docker container, the value for host is 10.1.0.2, the controlChannelPort is 17000 and the recordingEventsPort is 17001. These values come from the docker-compose.yml file.
Also of note, see line 17 of the listing, which tells the Media Driver that spies simulate connections. This allows the aeron:ipc stream to be published to without an explicit subscription.
This code is in ArchiveHostAgent.java on GitHub.
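To illustrate how the host, controlChannelPort and recordingEventsPort values fit together, the sketch below builds the two channel URIs with plain string concatenation. The resulting strings have the same form as the channels that appear in the AeronStat output further down this page. The class and method names (ArchiveHostChannels, controlChannel, recordingEventsChannel) are hypothetical and are not taken from ArchiveHostAgent.java; the sample may well build its URIs differently.

```java
/** Hypothetical helper for illustration only; not part of the sample code. */
final class ArchiveHostChannels
{
    /** Control request channel: a UDP endpoint bound to the Archive Host's address. */
    static String controlChannel(final String host, final int controlChannelPort)
    {
        return "aeron:udp?endpoint=" + host + ":" + controlChannelPort;
    }

    /** Recording events channel: dynamic control mode on the Archive Host's address. */
    static String recordingEventsChannel(final String host, final int recordingEventsPort)
    {
        return "aeron:udp?control-mode=dynamic|control=" + host + ":" + recordingEventsPort;
    }

    public static void main(final String[] args)
    {
        // Values quoted in the text as coming from docker-compose.yml.
        final String host = "10.1.0.2";
        System.out.println(controlChannel(host, 17000));
        System.out.println(recordingEventsChannel(host, 17001));
    }
}
```

If either string uses an address that the container does not own, the driver cannot bind to it, which is why the host value has to match the container's IP address.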
Starting a recording
To start a recording, we need to connect an instance of AeronArchive to the ArchivingMediaDriver using the same host and port information as above. The responses will flow over controlResponseChannel, which is set to use an ephemeral port over UDP in the sample.
With the AeronArchive connected, a local publication to aeron:ipc on stream 100 is created. The archive is then requested to start the recording on the local ArchivingMediaDriver for the given aeron:ipc stream. Since this is an asynchronous operation, we then keep track of the recording getting started by observing the Aeron counters. Next, the recording ID is captured and logged, and finally the activity and events listeners are wired up.
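The wait for the asynchronous recording start described above can be sketched as a bounded polling loop. This is a minimal sketch, not the sample's code: the IntSupplier is a hypothetical stand-in for whatever lookup the host performs against the Aeron counters, and it is assumed to return -1 until the recording position counter exists. The counter ID 75 used in the simulation simply mirrors the rec-pos entry in the AeronStat output shown later.

```java
import java.util.function.IntSupplier;

/** Hypothetical sketch of waiting for a recording counter to appear. */
final class AwaitRecordingCounter
{
    /** Assumed sentinel meaning "counter not found yet". */
    static final int NULL_COUNTER_ID = -1;

    /**
     * Polls the lookup until it yields a counter ID, giving up after maxAttempts polls.
     */
    static int awaitCounterId(final IntSupplier counterLookup, final int maxAttempts)
    {
        for (int attempt = 0; attempt < maxAttempts; attempt++)
        {
            final int counterId = counterLookup.getAsInt();
            if (counterId != NULL_COUNTER_ID)
            {
                return counterId;
            }
            Thread.onSpinWait(); // hint to the CPU that we are busy-waiting
        }
        throw new IllegalStateException("recording counter not found after " + maxAttempts + " attempts");
    }

    public static void main(final String[] args)
    {
        // Simulated lookup: the counter becomes visible on the third poll.
        final int[] polls = {0};
        final IntSupplier simulatedLookup = () -> ++polls[0] < 3 ? NULL_COUNTER_ID : 75;
        System.out.println("counter id: " + awaitCounterId(simulatedLookup, 10));
    }
}
```

A real agent would more likely fold this check into its duty cycle with an idle strategy rather than spin in a loop; the bounded loop is used here only to keep the example short.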
Building a client
The client makes use of a standard Media Driver and Aeron client, without any specific configuration. The interesting part comes once the Aeron client is connected.
Connecting to the remote Aeron Archive
The example code runs the logic that connects to the remote Aeron Archive within the duty cycle of the agent, so it is expected to be called many times during operation.
First, the code gets hold of an AsyncConnect object, which is used for asynchronous connections to an ArchivingMediaDriver. Note that the controlRequestChannel and recordingEventsChannel both point to the Archive Host, and use the ports defined by the host. This time, however, the control response channel (to which the remote ArchivingMediaDriver sends responses) refers to the client's own machine. The AsyncConnect is expected to be polled within a duty cycle. If the remote host is not available before the timeout, a TimeoutException is thrown.
Once the AsyncConnect object returns a non-null AeronArchive, the code can proceed. The code then calls getRecordingId, which requests the remote ArchivingMediaDriver to list all recordings for the given URI (aeron:ipc in this case) and stream (100 in this case). With the recording ID known, the code then constructs a local UDP subscription on an ephemeral port, and requests the remote ArchivingMediaDriver to replay the recorded stream to it, from position 0 to the end of the stream.
With this all in place, the local subscription can then be polled as normal.
The full code for this is found in ArchiveClientAgent.java on GitHub.
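The poll-until-connected pattern described in this section can be sketched as a small state machine. This is a simplified illustration and not the code in ArchiveClientAgent.java: the Supplier is a hypothetical stand-in for polling the AsyncConnect object (returning null until the connection completes), and the class, enum and method names are invented for the example.

```java
import java.util.function.Supplier;

/** Hypothetical duty-cycle sketch: keep polling a connect attempt until it yields a connection. */
final class ConnectingAgentSketch<T>
{
    enum State { AWAITING_CONNECT, CONNECTED }

    private final Supplier<T> connectPoller; // stand-in for polling an asynchronous connect
    private State state = State.AWAITING_CONNECT;
    private T connection;

    ConnectingAgentSketch(final Supplier<T> connectPoller)
    {
        this.connectPoller = connectPoller;
    }

    /** One pass of the duty cycle; returns the amount of work done in this pass. */
    int doWork()
    {
        if (state == State.AWAITING_CONNECT)
        {
            final T polled = connectPoller.get();
            if (polled == null)
            {
                return 0; // not connected yet, try again on the next pass
            }
            connection = polled;
            state = State.CONNECTED;
            return 1;
        }
        // Once connected, a real agent would find the recording, start the replay
        // and poll its local subscription here.
        return 0;
    }

    State state()
    {
        return state;
    }

    T connection()
    {
        return connection;
    }

    public static void main(final String[] args)
    {
        // Simulated remote host that only becomes available on the fourth poll.
        final int[] polls = {0};
        final ConnectingAgentSketch<String> agent =
            new ConnectingAgentSketch<String>(() -> ++polls[0] < 4 ? null : "connected-archive");
        while (agent.state() != State.CONNECTED)
        {
            agent.doWork();
        }
        System.out.println(agent.connection() + " after " + polls[0] + " polls");
    }
}
```

Keeping the connect attempt non-blocking in this way lets the agent thread stay responsive while the Archive Host is still starting up.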
Running the sample
Note
It is assumed the reader is familiar with Docker and has Docker installed.
- First, you will need a recent version of Docker. This was tested on Docker 3.3.3 with Docker Engine 20.10.6.
- Dedicate at least 4GB of memory and 4 to 8 cores to Docker. This was tested with 16 cores and 32GB RAM.
- You will need access to Docker Hub to download the aeroncookbook:jdk17 container, or to build the base container yourself. The base container Dockerfile can be found here. The base container is itself built on top of the azul/zulu-openjdk-debian:17 container. The aeroncookbook:jdk17 container is NOT suitable for production use.
- Then, you will need to build the shadow jars for the archive client and archive host. To do this, run gradle in the sample source code top level folder (typically aeron-cookbook-code) with no arguments.
- Finally, move to the archive-multi-host folder, and run docker-compose up -d.
Execution Output
Info
This log output is from running docker-compose up. After the first few messages were sent/received, the Docker containers were killed.
Inspecting the host
If you ran the sample with the -d option, you can now enter one of the running containers and take a look at the running process. First, open a bash terminal:
docker exec -it archive-core_archive-host_1 bash
Once inside the container, AeronStat can be launched with:
java -cp /root/aeron/aeron-all-*.jar io.aeron.samples.AeronStat watch=false
Sample output (notice the archive-specific ports and the Archive Control Sessions counter):
11:26:39 - Aeron Stat (CnC v0.2.0), pid 82, heartbeat age 740ms
======================================================================
0: 684,992 - Bytes sent
1: 506,784 - Bytes received
2: 0 - Failed offers to ReceiverProxy
3: 0 - Failed offers to SenderProxy
4: 0 - Failed offers to DriverConductorProxy
5: 0 - NAKs sent
6: 0 - NAKs received
7: 7,929 - Status Messages sent
8: 10,566 - Status Messages received
9: 20,842 - Heartbeats sent
10: 15,805 - Heartbeats received
11: 0 - Retransmits sent
12: 0 - Flow control under runs
13: 0 - Flow control over runs
14: 0 - Invalid packets
15: 0 - Errors
16: 0 - Short sends
17: 0 - Failed attempts to free log buffers
18: 0 - Sender flow control limits, i.e. back-pressure events
19: 0 - Unblocked Publications
20: 0 - Unblocked Control Commands
21: 0 - Possible TTL Asymmetry
22: 0 - ControllableIdleStrategy status
23: 0 - Loss gap fills
24: 0 - Client liveness timeouts
25: 0 - Resolution changes: driverName= hostname=archive-host
26: 637,070,400 - Conductor max cycle time doing its work (ns)
27: 0 - Conductor work cycle exceeded threshold count
28: 1,621,509,998,989 - client-heartbeat: 1
29: 2 - Archive Control Sessions
30: 1 - rcv-channel: aeron:udp?endpoint=archive-host:17000|sparse=true 10.1.0.2:17000
31: 1 - rcv-local-sockaddr: 30 10.1.0.2:17000
32: 1 - snd-channel: aeron:udp?control-mode=dynamic|control=archive-host:17001 10.1.0.2:17001
33: 1 - snd-local-sockaddr: 32 10.1.0.2:17001
34: 0 - pub-pos (sampled): 7 1179597151 30 aeron:udp?control-mode=dynamic|control=archive-host:17001
35: 0 - pub-lmt: 7 1179597151 30 aeron:udp?control-mode=dynamic|control=archive-host:17001
36: 0 - snd-pos: 7 1179597151 30 aeron:udp?control-mode=dynamic|control=archive-host:17001
37: 0 - snd-lmt: 7 1179597151 30 aeron:udp?control-mode=dynamic|control=archive-host:17001
38: 0 - snd-bpe: 7 1179597151 30 aeron:udp?control-mode=dynamic|control=archive-host:17001
39: 448 - sub-pos: 5 -485099970 10 aeron:udp?endpoint=archive-host:17000|sparse=true @0
40: 448 - rcv-hwm: 9 -485099970 10 aeron:udp?endpoint=archive-host:17000|sparse=true
41: 448 - rcv-pos: 9 -485099970 10 aeron:udp?endpoint=archive-host:17000|sparse=true
42: 1 - rcv-channel: aeron:udp?sparse=true|term-length=65536|mtu=1408|endpoint=10.1.0.2:0 10.1.0.2:56551
43: 1 - rcv-local-sockaddr: 42 10.1.0.2:56551
44: 1,621,509,999,409 - client-heartbeat: 8
45: 1 - snd-channel: aeron:udp?sparse=true|term-length=65536|mtu=1408|endpoint=10.1.0.2:17000 10.1.0.2:36485
46: 1 - snd-local-sockaddr: 45 10.1.0.2:36485
47: 320 - pub-pos (sampled): 11 1179597152 10 aeron:udp?sparse=true|term-length=65536|mtu=1408|endpoint=10.1.0.2:17000
48: 33,088 - pub-lmt: 11 1179597152 10 aeron:udp?sparse=true|term-length=65536|mtu=1408|endpoint=10.1.0.2:17000
49: 320 - snd-pos: 11 1179597152 10 aeron:udp?sparse=true|term-length=65536|mtu=1408|endpoint=10.1.0.2:17000
50: 32,768 - snd-lmt: 11 1179597152 10 aeron:udp?sparse=true|term-length=65536|mtu=1408|endpoint=10.1.0.2:17000
51: 0 - snd-bpe: 11 1179597152 10 aeron:udp?sparse=true|term-length=65536|mtu=1408|endpoint=10.1.0.2:17000
52: 320 - sub-pos: 5 1179597152 10 aeron:udp?endpoint=archive-host:17000|sparse=true @0
53: 320 - rcv-hwm: 12 1179597152 10 aeron:udp?endpoint=archive-host:17000|sparse=true
54: 320 - rcv-pos: 12 1179597152 10 aeron:udp?endpoint=archive-host:17000|sparse=true
55: 1 - snd-channel: aeron:udp?endpoint=10.1.0.2:56551|mtu=1408|term-length=65536|sparse=true 10.1.0.2:37917
56: 1 - snd-local-sockaddr: 55 10.1.0.2:37917
57: 288 - pub-pos (sampled): 14 1179597153 20 aeron:udp?endpoint=10.1.0.2:56551|mtu=1408|term-length=65536|sparse=true
58: 33,056 - pub-lmt: 14 1179597153 20 aeron:udp?endpoint=10.1.0.2:56551|mtu=1408|term-length=65536|sparse=true
59: 288 - snd-pos: 14 1179597153 20 aeron:udp?endpoint=10.1.0.2:56551|mtu=1408|term-length=65536|sparse=true
60: 32,768 - snd-lmt: 14 1179597153 20 aeron:udp?endpoint=10.1.0.2:56551|mtu=1408|term-length=65536|sparse=true
61: 0 - snd-bpe: 14 1179597153 20 aeron:udp?endpoint=10.1.0.2:56551|mtu=1408|term-length=65536|sparse=true
62: 192 - sub-pos: 10 1179597153 20 aeron:udp?sparse=true|term-length=65536|mtu=1408|endpoint=10.1.0.2:0 @0
63: 288 - rcv-hwm: 15 1179597153 20 aeron:udp?sparse=true|term-length=65536|mtu=1408|endpoint=10.1.0.2:0
64: 288 - rcv-pos: 15 1179597153 20 aeron:udp?sparse=true|term-length=65536|mtu=1408|endpoint=10.1.0.2:0
65: 16,960 - pub-pos (sampled): 16 1179597154 100 aeron:ipc
66: 33,554,432 - pub-lmt: 16 1179597154 100 aeron:ipc
67: 1 - snd-channel: aeron:udp?endpoint=10.1.0.3:36566|mtu=1408|term-length=65536|sparse=true 10.1.0.2:46532
68: 1 - snd-local-sockaddr: 67 10.1.0.2:46532
69: 512 - pub-pos (sampled): 17 1179597155 20 aeron:udp?endpoint=10.1.0.3:36566|mtu=1408|term-length=65536|sparse=true
70: 33,280 - pub-lmt: 17 1179597155 20 aeron:udp?endpoint=10.1.0.3:36566|mtu=1408|term-length=65536|sparse=true
71: 512 - snd-pos: 17 1179597155 20 aeron:udp?endpoint=10.1.0.3:36566|mtu=1408|term-length=65536|sparse=true
72: 32,768 - snd-lmt: 17 1179597155 20 aeron:udp?endpoint=10.1.0.3:36566|mtu=1408|term-length=65536|sparse=true
73: 0 - snd-bpe: 17 1179597155 20 aeron:udp?endpoint=10.1.0.3:36566|mtu=1408|term-length=65536|sparse=true
74: 16,960 - sub-pos: 19 1179597154 100 aeron:ipc @0
75: 16,960 - rec-pos: 0 1179597154 100 aeron:ipc
76: 1 - snd-channel: aeron:udp?endpoint=10.1.0.3:55231|mtu=1408|term-length=67108864|init-term-id=1837361267|term-id=1837361267|term-offset=0|linger=705032704 10.1.0.2:50753
77: 1 - snd-local-sockaddr: 76 10.1.0.2:50753
78: 16,960 - pub-pos (sampled): 21 1179597156 200 aeron:udp?endpoint=10.1.0.3:55231|mtu=1408|term-length=67108864|init-term-id=1837361267|term-id=1837361267|term-offset=0|linger=705032704
79: 33,571,392 - pub-lmt: 21 1179597156 200 aeron:udp?endpoint=10.1.0.3:55231|mtu=1408|term-length=67108864|init-term-id=1837361267|term-id=1837361267|term-offset=0|linger=705032704
80: 16,960 - snd-pos: 21 1179597156 200 aeron:udp?endpoint=10.1.0.3:55231|mtu=1408|term-length=67108864|init-term-id=1837361267|term-id=1837361267|term-offset=0|linger=705032704
81: 131,072 - snd-lmt: 21 1179597156 200 aeron:udp?endpoint=10.1.0.3:55231|mtu=1408|term-length=67108864|init-term-id=1837361267|term-id=1837361267|term-offset=0|linger=705032704
82: 0 - snd-bpe: 21 1179597156 200 aeron:udp?endpoint=10.1.0.3:55231|mtu=1408|term-length=67108864|init-term-id=1837361267|term-id=1837361267|term-offset=0|linger=705032704
The archive data is written to the folder ~/jar/aeron-archive. The contents of the archive can be viewed by using the dump command of ArchiveTool (further details here). Sample output is below, showing the 8 byte incrementing long clearly:
root@archive-host:~/jar/aeron-archive# java -cp /root/aeron/aeron-all-*.jar io.aeron.archive.ArchiveTool . dump 4
INFO: Mark file exists: ./archive-mark.dat
12:58:55 (start: 2021-05-22 12:55:50, activity: 2021-05-22 12:58:54)
[MarkFileHeader](sbeTemplateId=200|sbeSchemaId=100|sbeSchemaVersion=1|sbeBlockLength=128):version=196864|activityTimestamp=1621688334854|startTimestamp=1621688150252|pid=8|controlStreamId=10|localControlStreamId=10|eventsStreamId=30|headerLength=8192|errorBufferLength=0|controlChannel='aeron:udp?endpoint=archive-host:17000'|localControlChannel='aeron:ipc'|eventsChannel='aeron:udp?control-mode=dynamic|control=archive-host:17001'|aeronDirectory='/dev/shm/aeron-root'
Catalog capacity in bytes: 1048576
Dumping up to 4 fragments per recording
Recording 0
channel: aeron:ipc
streamId: 100
stream length: -1
[RecordingDescriptorHeader](sbeTemplateId=21|sbeSchemaId=101|sbeSchemaVersion=6|sbeBlockLength=32):length=160|state=VALID|checksum=0|reserved=0
[RecordingDescriptor](sbeTemplateId=22|sbeSchemaId=101|sbeSchemaVersion=6|sbeBlockLength=80):controlSessionId=0|correlationId=0|recordingId=0|startTimestamp=1621688151126|stopTimestamp=-1|startPosition=0|stopPosition=-1|initialTermId=-1358088455|segmentFileLength=134217728|termBufferLength=67108864|mtuLength=1408|sessionId=-1552063206|streamId=100|strippedChannel='aeron:ipc'|originalChannel='aeron:ipc'|sourceIdentity='aeron:ipc'
Frame at position [0] data at offset [32] with length = 8
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 00 00 00 00 00 00 00 |........ |
+--------+-------------------------------------------------+----------------+
Frame at position [64] data at offset [96] with length = 8
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 02 00 00 00 00 00 00 00 |........ |
+--------+-------------------------------------------------+----------------+
Frame at position [128] data at offset [160] with length = 8
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 03 00 00 00 00 00 00 00 |........ |
+--------+-------------------------------------------------+----------------+
Frame at position [192] data at offset [224] with length = 8
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 04 00 00 00 00 00 00 00 |........ |
+--------+-------------------------------------------------+----------------+
Specified frame limit 4 reached. Continue? (y/n): n
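The frame positions and payload bytes in the dump above can be cross-checked with a little arithmetic. The sketch below assumes a 32-byte Aeron data frame header and 32-byte frame alignment, which is consistent with the dump (data at offset 32, frames 64 bytes apart), and it reads the payload as a little-endian long, as the byte order in the hex output suggests. The class and method names are hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

/** Hypothetical helper for checking the dump output by hand. */
final class ArchiveFrameMath
{
    static final int HEADER_LENGTH = 32;   // assumed data frame header length in bytes
    static final int FRAME_ALIGNMENT = 32; // assumed frame alignment in bytes

    /** Space a frame occupies in the log: header plus payload, rounded up to the alignment. */
    static int alignedFrameLength(final int payloadLength)
    {
        final int unaligned = HEADER_LENGTH + payloadLength;
        return (unaligned + FRAME_ALIGNMENT - 1) & -FRAME_ALIGNMENT;
    }

    /** Decodes the first 8 payload bytes as a little-endian long. */
    static long decodeLittleEndianLong(final byte[] payload)
    {
        return ByteBuffer.wrap(payload).order(ByteOrder.LITTLE_ENDIAN).getLong();
    }

    public static void main(final String[] args)
    {
        // Payload of the frame at position 128 in the dump: 03 00 00 00 00 00 00 00
        final byte[] thirdFrame = {3, 0, 0, 0, 0, 0, 0, 0};

        System.out.println(alignedFrameLength(8));              // prints 64
        System.out.println(decodeLittleEndianLong(thirdFrame)); // prints 3
        // A position of 16,960 bytes corresponds to this many 8-byte messages:
        System.out.println(16_960 / alignedFrameLength(8));     // prints 265
    }
}
```

Under these assumptions, each 8-byte message consumes 64 bytes of the stream, so the pub-pos of 16,960 in the AeronStat output earlier corresponds to 265 messages.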
The thread structure of the host can be seen by first getting the process PID by using jps, and then running pidstat (notice the archive-conductor thread):
root@archive-host:~/jar# jps
8 archive-host-0.1-SNAPSHOT-all.jar
62 Jps
root@archive-host:~/jar# pidstat -t -p 8
Linux 5.10.25-linuxkit (archive-host) 05/20/21 _x86_64_ (16 CPU)
11:25:48 UID TGID TID %usr %system %guest %wait %CPU CPU Command
11:25:48 0 7 - 0.33 1.38 0.00 0.00 1.71 4 java
11:25:48 0 - 7 0.00 0.00 0.00 0.00 0.00 4 |__java
11:25:48 0 - 8 0.00 0.00 0.00 0.00 0.01 6 |__java
11:25:48 0 - 9 0.00 0.00 0.00 0.00 0.00 7 |__GC Thread#0
11:25:48 0 - 10 0.00 0.00 0.00 0.00 0.00 5 |__G1 Main Marker
11:25:48 0 - 11 0.00 0.00 0.00 0.00 0.00 0 |__G1 Conc#0
11:25:48 0 - 12 0.00 0.00 0.00 0.00 0.00 2 |__G1 Refine#0
11:25:48 0 - 13 0.00 0.00 0.00 0.00 0.00 5 |__G1 Service
11:25:48 0 - 14 0.00 0.00 0.00 0.00 0.00 1 |__VM Thread
11:25:48 0 - 15 0.00 0.00 0.00 0.00 0.00 1 |__Reference Handl
11:25:48 0 - 16 0.00 0.00 0.00 0.00 0.00 5 |__Finalizer
11:25:48 0 - 17 0.00 0.00 0.00 0.00 0.00 7 |__Signal Dispatch
11:25:48 0 - 18 0.00 0.00 0.00 0.00 0.00 7 |__Service Thread
11:25:48 0 - 19 0.00 0.00 0.00 0.00 0.00 4 |__Monitor Deflati
11:25:48 0 - 20 0.01 0.00 0.00 0.00 0.01 2 |__C2 CompilerThre
11:25:48 0 - 21 0.00 0.00 0.00 0.00 0.01 4 |__C1 CompilerThre
11:25:48 0 - 22 0.00 0.00 0.00 0.00 0.00 5 |__Sweeper thread
11:25:48 0 - 23 0.00 0.00 0.00 0.00 0.00 4 |__Common-Cleaner
11:25:48 0 - 24 0.00 0.00 0.00 0.00 0.00 0 |__Notification Th
11:25:48 0 - 25 0.00 0.01 0.00 0.00 0.01 5 |__VM Periodic Tas
11:25:48 0 - 26 0.11 0.42 0.00 0.10 0.54 0 |__[sender,receive
11:25:48 0 - 27 0.09 0.38 0.00 0.11 0.47 7 |__archive-conduct
11:25:48 0 - 28 0.05 0.29 0.00 0.17 0.33 3 |__aeron-client-co
11:25:48 0 - 29 0.05 0.29 0.00 0.17 0.33 7 |__agent-host