Skip to content

One-way IPC between two agents

Overview

This example shows how to transmit buffers between two agents scheduled on different threads over IPC. Before continuing with this example, it will be useful to review the Simplest Full Example first as it shows IPC communications without the added complexity of agents. The reader should also be familiar with the Media Driver

The process will be constructed as follows:

  • an embedded media driver running in default mode (an agent for Sender, Receiver, Conductor)
  • an agent to send the IPC data over a publication (SendAgent)
  • an agent to receive the IPC data over a subscription (ReceiveAgent)

Code Sample overview

The code sample is split over three files held within the ipc-core project, com.aeroncookbook.ipc.agents namespace. They are:

  • StartHere.java - the class responsible for setting up Aeron and scheduling the agents;
  • SendAgent.java - the class holding the Agent responsible for sending data;
  • ReceiveAgent.java - the class holding the Agent responsible for receiving data.

Each part is broken down and discussed further below.

Execution Output

15:13:42.814 [main] starting
15:13:42.964 [receiver] received: 1000000

StartHere.java

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
public static void main(String[] args)
{
    final String channel = "aeron:ipc";
    final int stream = 10;
    final int sendCount = 1_000_000;
    final IdleStrategy idleStrategySend = new BusySpinIdleStrategy();
    final IdleStrategy idleStrategyReceive = new BusySpinIdleStrategy();
    final ShutdownSignalBarrier barrier = new ShutdownSignalBarrier();

    //construct Media Driver, cleaning up media driver folder on start/stop
    final MediaDriver.Context mediaDriverCtx = new MediaDriver.Context()
            .dirDeleteOnStart(true)
            .threadingMode(ThreadingMode.SHARED)
            .sharedIdleStrategy(new BusySpinIdleStrategy())
            .dirDeleteOnShutdown(true);
    final MediaDriver mediaDriver = MediaDriver.launchEmbedded(mediaDriverCtx);

    //construct Aeron, pointing at the media driver's folder
    final Aeron.Context aeronCtx = new Aeron.Context()
            .aeronDirectoryName(mediaDriver.aeronDirectoryName());
    final Aeron aeron = Aeron.connect(aeronCtx);

    //construct the subs and pubs
    final Subscription subscription = aeron.addSubscription(channel, stream);
    final Publication publication = aeron.addPublication(channel, stream);

    //construct the agents
    final SendAgent sendAgent = new SendAgent(publication, sendCount);
    final ReceiveAgent receiveAgent = new ReceiveAgent(subscription, barrier,
        sendCount);
    //construct agent runners
    final AgentRunner sendAgentRunner = new AgentRunner(idleStrategySend,
            Throwable::printStackTrace, null, sendAgent);
    final AgentRunner receiveAgentRunner = new AgentRunner(idleStrategyReceive,
            Throwable::printStackTrace, null, receiveAgent);

    LOGGER.info("starting");

    //start the runners
    AgentRunner.startOnThread(sendAgentRunner);
    AgentRunner.startOnThread(receiveAgentRunner);

    //wait for the final item to be received before closing
    barrier.await();

    //close the resources
    receiveAgentRunner.close();
    sendAgentRunner.close();
    aeron.close();
    mediaDriver.close();
}

Constructing support objects

1
2
3
4
5
6
final String channel = "aeron:ipc";
final int stream = 10;
final int sendCount = 1000;
final IdleStrategy idleStrategySend = new BusySpinIdleStrategy();
final IdleStrategy idleStrategyReceive = new BusySpinIdleStrategy();
final ShutdownSignalBarrier barrier = new ShutdownSignalBarrier();

This section of the code constructs a few support objects.

  • Line 1 holds the channel definition, in this case aeron:ipc
  • Line 2 holds the stream ID to use, in this case 10
  • Line 3 is the number of integers to send over IPC
  • Line 4 constructs the IdleStrategy to be used by the agents. In this case, whenever the doWork duty cycle returns 0, the idle strategy will busy spin.
  • Line 5 is a barrier that will be used to co-ordinate a shutdown of the sample. Once the ReceiveAgent has received a total of sendCount integers, it will signal the barrier, triggering the shutdown.

Constructing the Media Driver

1
2
3
4
5
6
7
//construct Media Driver, cleaning up media driver folder on start/stop
final MediaDriver.Context mediaDriverCtx = new MediaDriver.Context()
    .dirDeleteOnStart(true)
    .threadingMode(ThreadingMode.SHARED)
    .sharedIdleStrategy(new BusySpinIdleStrategy())
    .dirDeleteOnShutdown(true);
final MediaDriver mediaDriver = MediaDriver.launchEmbedded(mediaDriverCtx);

This section of the code constructs a Media Driver using a defined context. The context is an object holding all of the optional configuration parameters for a Media Driver. In this case, two configurations have been overridden, which ensures that the Media Driver tidies up the Media Driver folder on startup and shutdown. Once the context is ready, the Media Driver is launched as an embedded agent.

See also: Media Driver

Constructing Aeron, the Publication and the Subscription

1
2
3
4
//construct Aeron, pointing at the media driver's folder
final Aeron.Context aeronCtx = new Aeron.Context()
    .aeronDirectoryName(mediaDriver.aeronDirectoryName());
final Aeron aeron = Aeron.connect(aeronCtx);

This section of the code constructs the Aeron object, again using a Context. With this context, we're letting Aeron know where the Media Driver's Aeron directory is. Once the context is ready, the Aeron object is connected to the Media Driver. Next, the IPC publication and subscription are created using the previously defined channel and stream id.

1
2
3
//construct the subs and pubs
final Subscription subscription = aeron.addSubscription(channel, stream);
final Publication publication = aeron.addPublication(channel, stream);

Constructing and scheduling the agents

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
//construct the agents
final SendAgent sendAgent = new SendAgent(publication, sendCount);
final ReceiveAgent receiveAgent = new ReceiveAgent(subscription, barrier, sendCount);

//construct agent runners
final AgentRunner sendAgentRunner = new AgentRunner(idleStrategySend,
        Throwable::printStackTrace, null, sendAgent);
final AgentRunner receiveAgentRunner = new AgentRunner(idleStrategyReceive,
        Throwable::printStackTrace, null, receiveAgent);

//start the runners
AgentRunner.startOnThread(sendAgentRunner);
AgentRunner.startOnThread(receiveAgentRunner);

This section of the code constructs the SendAgent and ReceiveAgent agents, creates agents runners to manage them, and then starts them on specific threads. Key lines are:

  • Lines 6-7 and 8-9: these lines construct the agent runners for Send and Receive respectively. Note that the idle strategy for each is given which controls how the thread uses resources following a doWork duty cycle.
  • Lines 12 and 13: these lines create new threads for each agent, and starts the duty cycle.

Shutting down cleanly

1
2
3
4
5
6
7
8
//wait for the final item to be received before closing
barrier.await();

//close the resources
receiveAgentRunner.close();
sendAgentRunner.close();
aeron.close();
mediaDriver.close();

This final section of the code is responsible for awaiting for the barrier to be tripped by the ReceiveAgent, and then correctly cleaning up the resources. First, the agents are closed, then the aeron object and finally the media driver. If you do not take care in the order of execution during shutdown, core dumps or other seemingly dramatic failures may occur.

SendAgent.java

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public class SendAgent implements Agent
{
    private final Publication publication;
    private final int sendCount;
    private final UnsafeBuffer unsafeBuffer;
    private int currentCountItem = 1;
    private final Logger logger = LoggerFactory.getLogger(SendAgent.class);

    public SendAgent(final Publication publication, int sendCount)
    {
        this.publication = publication;
        this.sendCount = sendCount;
        this.unsafeBuffer = new UnsafeBuffer(ByteBuffer.allocate(64));
        unsafeBuffer.putInt(0, currentCountItem);
    }

    @Override
    public int doWork()
    {
        if (currentCountItem > sendCount)
        {
            return 0;
        }

        if (publication.isConnected())
        {
            if (publication.offer(unsafeBuffer) > 0)
            {
                currentCountItem += 1;
                unsafeBuffer.putInt(0, currentCountItem);
            }
        }
        return 0;
    }

    @Override
    public String roleName()
    {
        return "sender";
    }
}

The send object is responsible for sending sendCount integers over the provided Aeron Publication. The doWork method holds the agent's duty cycle is called continuously until the agent is shutdown. Once it has reached the sendCount limit, it will stop offering any more to the publication and start idling.

The most interesting parts of this code are:

  • Line 18 to 34: the doWork method holding the duty cycle for this agent
  • Line 22 and line 34: both of these return statements return 0, which will cause the selected idle strategy BusySpinIdleStrategy to call ThreadHints.onSpinWait().
  • Line 25: this will return true only once the publication is connected. Once connected, it is safe to offer to the publication.
  • Line 27: this will offer the buffer data to the publication.
  • Line 29: this logs the last sent integer, for example 15:13:42.818 [sender] sent: 123
  • Line 40: this sets the thread name to sender, as is visible in the log output.

ReceiveAgent.java

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class ReceiveAgent implements Agent
{
    private final Subscription subscription;
    private final ShutdownSignalBarrier barrier;
    private final int sendCount;
    private final Logger logger = LoggerFactory.getLogger(ReceiveAgent.class);

    public ReceiveAgent(final Subscription subscription,
                        ShutdownSignalBarrier barrier, int sendCount)
    {
        this.subscription = subscription;
        this.barrier = barrier;
        this.sendCount = sendCount;
    }

    @Override
    public int doWork() throws Exception
    {
        subscription.poll(this::handler, 1000);
        return 0;
    }

    private void handler(DirectBuffer buffer, int offset, int length,
                        Header header)
    {
        final int lastValue = buffer.getInt(offset);

        if (lastValue >= sendCount)
        {
            logger.info("received: {}", lastValue);
            barrier.signal();
        }
    }

    @Override
    public String roleName()
    {
        return "receiver";
    }
}

The ReceiveAgent is responsible for polling the provided subscription and logging the received value. Once the sendCount is hit, the receive agent signals the barrier. The most interesting parts of this object are:

  • Lines 17-21 - the doWork method holding the duty cycle for this agent. The duty cycle consists of two parts - polling the subscription with events being delivered to the provided handler, and then returning 0. With the configured IdleStrategy, returning 0 will result in the thread being parked for one microsecond.
  • Line 27 - this logs the integer value received, for example: 15:13:42.814 [receiver] received: 5
  • Lines 29-32 - this signals the barrier, triggering the clean shutdown of the process.
  • Line 38 - this sets the role name to receiver, as visible in log output.

Performance

This example will transmit around 10 million 4-byte messages/second on an Intel laptop. If using Linux, with available /dev/shm, the code will make use of that automatically. This can be pushed to over 20m messages a second by swapping in NoOpIdleStrategy, and moving the media driver threading to DEDICATED. See below for key changes. Note you will need to ensure you have at least 8 physical cores on your hardware.

final IdleStrategy idleStrategySend = new NoOpIdleStrategy();
final IdleStrategy idleStrategyReceive = new NoOpIdleStrategy();
final ShutdownSignalBarrier barrier = new ShutdownSignalBarrier();

//construct Media Driver, cleaning up media driver folder on start/stop
final MediaDriver.Context mediaDriverCtx = new MediaDriver.Context()
        .dirDeleteOnStart(true)
        .threadingMode(ThreadingMode.DEDICATED)
        .conductorIdleStrategy(new BusySpinIdleStrategy())
        .senderIdleStrategy(new NoOpIdleStrategy())
        .receiverIdleStrategy(new NoOpIdleStrategy())
        .dirDeleteOnShutdown(true);
final MediaDriver mediaDriver = MediaDriver.launchEmbedded(mediaDriverCtx);

//construct Aeron, pointing at the media driver's folder
final Aeron.Context aeronCtx = new Aeron.Context()
        .idleStrategy(new NoOpIdleStrategy())
        .aeronDirectoryName(mediaDriver.aeronDirectoryName());
final Aeron aeron = Aeron.connect(aeronCtx);

There is a related Two Agent example of OneToOneRingBuffer that is very similar, except it makes use of Agrona's OneToOneRingBuffer and delivers around 18 million 4-byte messages/second with a BusySpinIdleStrategy or over 50 million messages/second with NoOpIdleStrategy.

Using the C Media Driver

To test this example with Aeron's C Media Driver you will need to do the following:

First, build the C Media Driver from source (instructions will vary by your operating system):

Next, start the C Media Driver with default settings

  • ./aeronmd (Linux/macOS)
  • aeronmd (Windows)

Then, remove the Media Driver from StartHere.java, and reduce the Aeron context to defaults:

//construct Media Driver, cleaning up media driver folder on start/stop
//final MediaDriver.Context mediaDriverCtx = new MediaDriver.Context()
//        .dirDeleteOnStart(true)
//        .threadingMode(ThreadingMode.SHARED)
//        .sharedIdleStrategy(new BusySpinIdleStrategy())
//        .dirDeleteOnShutdown(true);
//final MediaDriver mediaDriver = MediaDriver.launchEmbedded(mediaDriverCtx);

//construct Aeron, pointing at the media driver's folder
final Aeron.Context aeronCtx = new Aeron.Context();
final Aeron aeron = Aeron.connect(aeronCtx);

Aeron and the Media Driver will then default to the same directory.

Finally, run StartHere.java as normal. The process should run as normal and the output should include something like:

14:30:00.293 [main] starting
14:30:00.758 [receiver] received: 10000000