One-way IPC between two agents¶
This example shows how to transmit buffers over IPC between two agents scheduled on different threads. Before continuing, it is useful to review the Simplest Full Example first, as it shows IPC communications without the added complexity of agents. The reader should also be familiar with the Media Driver.
The process will be constructed as follows:
- an embedded media driver running in default mode (one agent each for the Sender, Receiver and Conductor)
- an agent to send the IPC data over a publication (SendAgent)
- an agent to receive the IPC data over a subscription (ReceiveAgent)
Code Sample overview¶
The code sample is split over three files held within the com.aeroncookbook.ipc.agents namespace. They are:
- StartHere.java - the class responsible for setting up Aeron and scheduling the agents;
- SendAgent.java - the class holding the Agent responsible for sending data;
- ReceiveAgent.java - the class holding the Agent responsible for receiving data.
Each part is broken down and discussed further below.
15:13:42.814 [main] starting
15:13:42.964 [receiver] received: 1000000
Constructing support objects¶
This section of the code constructs a few support objects.
- Line 1 holds the channel definition, in this case an IPC channel
- Line 2 holds the stream ID to use, in this case 10
- Line 3 is the number of integers to send over IPC
- Line 4 constructs the IdleStrategy to be used by the agents. In this case, whenever the doWork duty cycle returns 0, the idle strategy will busy spin.
- Line 5 is a barrier that will be used to co-ordinate a shutdown of the sample. Once the ReceiveAgent has received a total of sendCount integers, it will signal the barrier, triggering the shutdown.
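The barrier hand-off described in the last bullet can be sketched with the JDK alone. This is a minimal illustration, not the sample's code: BarrierSketch and runUntilSignalled are hypothetical names, and a CountDownLatch is used as a stand-in for the barrier (Agrona's ShutdownSignalBarrier additionally reacts to process termination signals, which this sketch does not model).

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: the main thread blocks on a barrier until a
// "receiver" thread has counted sendCount items and signals it.
final class BarrierSketch {
    static boolean runUntilSignalled(final int sendCount) {
        // Stand-in for the shutdown barrier: one signal releases the waiter.
        final CountDownLatch barrier = new CountDownLatch(1);

        final Thread receiver = new Thread(() -> {
            int received = 0;
            while (received < sendCount) {
                received++; // pretend each iteration receives one integer
            }
            barrier.countDown(); // signal: everything has arrived
        }, "receiver");
        receiver.start();

        try {
            // The main thread waits here; the timeout only guards the sketch.
            final boolean signalled = barrier.await(10, TimeUnit.SECONDS);
            receiver.join();
            return signalled;
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Calling BarrierSketch.runUntilSignalled(1_000) returns true once the receiver thread has signalled, which is the point at which the real sample would begin its shutdown.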
Constructing the Media Driver¶
This section of the code constructs a Media Driver using a defined context. The context is an object holding all of the optional configuration parameters for a Media Driver. In this case, two configurations have been overridden, which ensures that the Media Driver tidies up the Media Driver folder on startup and shutdown. Once the context is ready, the Media Driver is launched as an embedded agent.
See also: Media Driver
Constructing Aeron, the Publication and the Subscription¶
This section of the code constructs the Aeron object, again using a Context. With this context, we're letting Aeron know where the Media Driver's Aeron directory is. Once the context is ready, the Aeron object is connected to the Media Driver. Next, the IPC publication and subscription are created using the previously defined channel and stream ID.
Constructing and scheduling the agents¶
This section of the code constructs the SendAgent and ReceiveAgent agents, creates agent runners to manage them, and then starts them on specific threads. Key lines are:
- Lines 6-7 and 8-9: these lines construct the agent runners for Send and Receive respectively. Note that each runner is given an idle strategy, which controls how the thread uses resources following a doWork duty cycle.
- Lines 12 and 13: these lines create a new thread for each agent and start the duty cycle.
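The relationship between an agent, its runner and its idle strategy can be sketched without any Aeron or Agrona dependency. Everything below is a hypothetical stand-in written for illustration (SketchAgent, SketchIdleStrategy, SpinWhenIdle and DutyCycleSketch are not Agrona types); it only mirrors the idea that a runner calls doWork repeatedly on its own thread and passes the returned work count to the idle strategy. Thread.onSpinWait requires Java 9 or later.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for an agent: doWork returns the amount of work done.
interface SketchAgent {
    int doWork();
    String roleName();
}

// Hypothetical stand-in for an idle strategy, consulted after every duty cycle.
interface SketchIdleStrategy {
    void idle(int workCount);
}

// Spins (with a CPU hint) only when the last duty cycle did no work.
final class SpinWhenIdle implements SketchIdleStrategy {
    long idleCalls; // number of cycles in which no work was reported

    @Override
    public void idle(final int workCount) {
        if (workCount == 0) {
            idleCalls++;
            Thread.onSpinWait();
        }
    }
}

// Hypothetical runner: owns the loop, the agent owns the work.
final class DutyCycleSketch implements Runnable {
    private final SketchAgent agent;
    private final SketchIdleStrategy idleStrategy;
    private final AtomicBoolean running = new AtomicBoolean(true);

    DutyCycleSketch(final SketchAgent agent, final SketchIdleStrategy idleStrategy) {
        this.agent = agent;
        this.idleStrategy = idleStrategy;
    }

    // One duty cycle: do the work, then let the idle strategy react to it.
    int cycle() {
        final int workCount = agent.doWork();
        idleStrategy.idle(workCount);
        return workCount;
    }

    @Override
    public void run() {
        while (running.get()) {
            cycle();
        }
    }

    // Starts the loop on a new thread named after the agent's role.
    Thread startOnThread() {
        final Thread thread = new Thread(this, agent.roleName());
        thread.start();
        return thread;
    }

    void stop() {
        running.set(false);
    }
}
```

Keeping the loop in the runner and the work in the agent is what allows the same agent to be scheduled on a dedicated thread or combined with other agents without changing its code.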
Shutting down cleanly¶
This final section of the code is responsible for waiting for the barrier to be tripped by the ReceiveAgent, and then correctly cleaning up the resources. First, the agents are closed, then the Aeron object and finally the media driver. If you do not take care in the order of execution during shutdown, core dumps or other seemingly dramatic failures may occur.
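One way to make that ordering hard to get wrong is to rely on Java's try-with-resources, which closes resources in the reverse order of their declaration. The sketch below is an assumption-laden illustration rather than the sample's code: NamedResource is a hypothetical stand-in for the media driver, the Aeron client and the agent runners, and it only records the order in which close is called.

```java
import java.util.ArrayList;
import java.util.List;

final class ShutdownOrderSketch {
    // Hypothetical stand-in for a closeable component; records when it is closed.
    static final class NamedResource implements AutoCloseable {
        private final String name;
        private final List<String> closeLog;

        NamedResource(final String name, final List<String> closeLog) {
            this.name = name;
            this.closeLog = closeLog;
        }

        @Override
        public void close() {
            closeLog.add(name);
        }
    }

    static List<String> runAndShutDown() {
        final List<String> closeLog = new ArrayList<>();
        // Declared in construction order: driver first, agents last.
        try (NamedResource mediaDriver = new NamedResource("mediaDriver", closeLog);
             NamedResource aeron = new NamedResource("aeron", closeLog);
             NamedResource agents = new NamedResource("agents", closeLog)) {
            // The real sample would wait on the shutdown barrier here.
        }
        // Resources were closed in reverse order: agents, aeron, mediaDriver.
        return closeLog;
    }
}
```

The returned log lists the agents first, then the client, then the driver, which matches the order described above.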
The send agent is responsible for sending sendCount integers over the provided Aeron Publication. The doWork method holds the agent's duty cycle and is called continuously until the agent is shut down. Once it has reached the sendCount limit, it stops offering to the publication and starts idling.
The most interesting parts of this code are:
- Line 18 to 34: the doWork method holding the duty cycle for this agent
- Line 22 and line 34: both of these return statements return 0, which signals that no work was done and causes the agent runner to invoke the idle strategy. With the busy-spin idle strategy constructed in StartHere.java, the thread spins rather than being parked.
- Line 25: this will return true only once the publication is connected. Once connected, it is safe to offer to the publication.
- Line 27: this will offer the buffer data to the publication.
- Line 29: this logs the last sent integer, for example:
15:13:42.818 [sender] sent: 123
- Line 40: this sets the thread name to sender, as is visible in the log output.
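The shape of the send duty cycle described above can be sketched as follows. This is a simplified model under stated assumptions, not the SendAgent listing: SketchPublication is a hypothetical interface that accepts one int per offer and returns false to model back pressure, whereas a real Aeron Publication is offered a buffer and reports its outcome through the returned position.

```java
// Hypothetical stand-in for a publication that accepts one int per offer.
interface SketchPublication {
    boolean isConnected();

    boolean offer(int value); // false models back pressure; the caller retries
}

final class SendDutyCycleSketch {
    private final SketchPublication publication;
    private final int sendCount;
    private int currentCountItem = 1; // next value to send

    SendDutyCycleSketch(final SketchPublication publication, final int sendCount) {
        this.publication = publication;
        this.sendCount = sendCount;
    }

    // One duty cycle. Always returns 0, as in the description above,
    // so the idle strategy is consulted after every call.
    int doWork() {
        if (currentCountItem > sendCount) {
            return 0; // everything has been sent; just idle
        }

        // Only offer once the publication is connected.
        if (publication.isConnected()) {
            // Advance only when the offer was accepted; otherwise the
            // same value is offered again on the next duty cycle.
            if (publication.offer(currentCountItem)) {
                currentCountItem++;
            }
        }
        return 0;
    }

    int lastSent() {
        return currentCountItem - 1;
    }
}
```

Because the counter only advances after an accepted offer, a rejected offer never loses or reorders a value; it is simply retried on a later cycle.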
The ReceiveAgent is responsible for polling the provided subscription and logging the received value. Once the sendCount is hit, the receive agent signals the barrier. The most interesting parts of this object are:
- Lines 17-21 - the doWork method holding the duty cycle for this agent. The duty cycle consists of two parts: polling the subscription, with events being delivered to the provided handler, and then returning 0. Returning 0 causes the configured IdleStrategy to be invoked, which busy spins in this sample.
- Line 27 - this logs the integer value received, for example:
15:13:42.814 [receiver] received: 5
- Lines 29-32 - this signals the barrier, triggering the clean shutdown of the process.
- Line 38 - this sets the role name to receiver, as visible in log output.
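The receive side can be modelled in the same spirit. The sketch below is illustrative only: a java.util.Queue stands in for the subscription, an IntConsumer for the fragment handler, and a CountDownLatch for the shutdown barrier. The class name, the private poll helper and the fragment limit of 100 are all assumptions made for this example.

```java
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.function.IntConsumer;

final class ReceiveDutyCycleSketch {
    private final Queue<Integer> inbound;  // hypothetical stand-in for the subscription
    private final CountDownLatch barrier;  // hypothetical stand-in for the shutdown barrier
    private final int sendCount;
    private int lastValue;

    ReceiveDutyCycleSketch(final Queue<Integer> inbound,
                           final CountDownLatch barrier,
                           final int sendCount) {
        this.inbound = inbound;
        this.barrier = barrier;
        this.sendCount = sendCount;
    }

    // One duty cycle: poll, deliver to the handler, then return 0.
    int doWork() {
        poll(this::onValue, 100); // 100 is an arbitrary per-cycle limit
        return 0;
    }

    // Delivers at most fragmentLimit queued values to the handler.
    private int poll(final IntConsumer handler, final int fragmentLimit) {
        int delivered = 0;
        Integer next;
        while (delivered < fragmentLimit && (next = inbound.poll()) != null) {
            handler.accept(next);
            delivered++;
        }
        return delivered;
    }

    // Handler: remember the value and signal the barrier at the final one.
    private void onValue(final int value) {
        lastValue = value;
        if (value >= sendCount) {
            barrier.countDown();
        }
    }

    int lastValue() {
        return lastValue;
    }
}
```

Bounding the number of values handled per cycle keeps each doWork call short, so the agent returns control to its runner regularly even when a large backlog is waiting.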
This example will transmit around 10 million 4-byte messages/second on an Intel laptop. On Linux, if /dev/shm is available, the code will make use of it automatically. Throughput can be pushed to over 20 million messages/second by swapping in NoOpIdleStrategy and moving the media driver threading to DEDICATED. See below for the key changes. Note that you will need at least 8 physical cores on your hardware.
final IdleStrategy idleStrategySend = new NoOpIdleStrategy();
final IdleStrategy idleStrategyReceive = new NoOpIdleStrategy();
final ShutdownSignalBarrier barrier = new ShutdownSignalBarrier();

//construct Media Driver, cleaning up media driver folder on start/stop
final MediaDriver.Context mediaDriverCtx = new MediaDriver.Context()
    .dirDeleteOnStart(true)
    .threadingMode(ThreadingMode.DEDICATED)
    .conductorIdleStrategy(new BusySpinIdleStrategy())
    .senderIdleStrategy(new NoOpIdleStrategy())
    .receiverIdleStrategy(new NoOpIdleStrategy())
    .dirDeleteOnShutdown(true);
final MediaDriver mediaDriver = MediaDriver.launchEmbedded(mediaDriverCtx);

//construct Aeron, pointing at the media driver's folder
final Aeron.Context aeronCtx = new Aeron.Context()
    .idleStrategy(new NoOpIdleStrategy())
    .aeronDirectoryName(mediaDriver.aeronDirectoryName());
final Aeron aeron = Aeron.connect(aeronCtx);
There is a related Two Agent example of OneToOneRingBuffer that is very similar, except it makes use of Agrona's OneToOneRingBuffer and delivers around 18 million 4-byte messages/second with a BusySpinIdleStrategy or over 50 million messages/second with
Using the C Media Driver¶
To test this example with Aeron's C Media Driver you will need to do the following:
First, build the C Media Driver from source (instructions will vary by your operating system):
- Building the C Media Driver on macOS
- Building the C Media Driver on CentOS Linux 8
- Building the C Media Driver on Ubuntu 20.04
- Building the C Media Driver on Windows 10
Next, start the C Media Driver with default settings.
Then, remove the Media Driver from StartHere.java, and reduce the Aeron context to defaults:
//construct Media Driver, cleaning up media driver folder on start/stop
//final MediaDriver.Context mediaDriverCtx = new MediaDriver.Context()
//    .dirDeleteOnStart(true)
//    .threadingMode(ThreadingMode.SHARED)
//    .sharedIdleStrategy(new BusySpinIdleStrategy())
//    .dirDeleteOnShutdown(true);
//final MediaDriver mediaDriver = MediaDriver.launchEmbedded(mediaDriverCtx);

//construct Aeron, pointing at the media driver's folder
final Aeron.Context aeronCtx = new Aeron.Context();
final Aeron aeron = Aeron.connect(aeronCtx);
Aeron and the Media Driver will then default to the same directory.
Finally, run StartHere.java as normal. The process should behave as before and the output should include something like:
14:30:00.293 [main] starting
14:30:00.758 [receiver] received: 10000000