Threads, Agents & Duty Cycles

A common misconception among developers new to Aeron is that Agents are just a new name for Threads. This is not correct. A properly constructed agent can be scheduled in several ways:

  • On a dedicated thread;
  • With one or more other agents on a dedicated thread;
  • or via an invoker, in which another thread manually executes the agent's duty cycle.

Agents can be executed in any of these ways without change because all they do is execute a duty cycle. They do some work, then optionally wait, and repeat this until stopped.
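The duty cycle idea can be sketched without any library at all. The block below is a minimal illustration, not Agrona's implementation: SimpleAgent, CountingAgent, CompositeSketch and runCycles are hypothetical names invented here, standing in for the real Agent interface and its runners. It shows the same unchanged agent being driven by a dedicated loop, grouped with another agent, or invoked manually by the caller.

```java
import java.util.List;

final class DutyCycleSketch
{
    /** Hypothetical stand-in for an agent interface: one call is one pass of the duty cycle. */
    interface SimpleAgent
    {
        int doWork(); // returns the amount of work done; 0 means there was nothing to do
    }

    /** An agent that only counts how many times its duty cycle has run. */
    static final class CountingAgent implements SimpleAgent
    {
        int cycles;

        public int doWork()
        {
            cycles++;
            return 1;
        }
    }

    /** Runs several agents one after another on whichever thread calls doWork(). */
    static final class CompositeSketch implements SimpleAgent
    {
        private final List<SimpleAgent> agents;

        CompositeSketch(final List<SimpleAgent> agents)
        {
            this.agents = agents;
        }

        public int doWork()
        {
            int workCount = 0;
            for (final SimpleAgent agent : agents)
            {
                workCount += agent.doWork();
            }
            return workCount;
        }
    }

    /** The loop a dedicated thread would run: do work, and idle when there was none. */
    static void runCycles(final SimpleAgent agent, final int maxCycles)
    {
        for (int i = 0; i < maxCycles; i++)
        {
            if (agent.doWork() == 0)
            {
                Thread.yield(); // placeholder for an idle strategy
            }
        }
    }
}
```

Calling doWork() directly from application code corresponds to the invoker style, runCycles to a dedicated thread, and CompositeSketch to sharing one thread between agents. The agent itself does not know or care which one is in use.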

As a result, correctly constructed Agents generally do not break these guidelines:

  • never make use of internal threads. Don't fire up an executor. If absolutely required, consider adding tasks to a List<Runnable> and processing them on the current thread as a part of the duty cycle;
  • never expose public methods that are called across Agents, or Threads;
  • never make use of shared data structures that are not thread-safe across agents. Concurrent collections, such as Agrona's RingBuffers, and IPC messaging systems, such as Aeron, are well suited for cross-agent communication.

Of the three guidelines above, the one on shared data structures is the most likely to be broken in very carefully engineered scenarios. If your performance requirements absolutely demand it, and you've proven that tuned Aeron IPC flows are too slow, then consider Agrona concurrent collections.
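The first guideline's suggestion of processing queued tasks on the current thread could look something like the sketch below. DeferredTaskAgent, defer and the per-pass limit of 10 are assumptions made for illustration, and an ArrayDeque is used here in place of the List<Runnable> mentioned above because it removes from the head cheaply.

```java
import java.util.ArrayDeque;
import java.util.Queue;

final class DeferredTaskAgent
{
    // Only ever touched by the thread running this agent, so no synchronisation is needed.
    private final Queue<Runnable> pendingTasks = new ArrayDeque<>();

    /** Hypothetical helper: called from within this agent's own duty cycle, never from another thread. */
    void defer(final Runnable task)
    {
        pendingTasks.add(task);
    }

    /** One pass of the duty cycle; returns the number of tasks run. */
    int doWork()
    {
        int workCount = 0;
        // Assumed limit: bound the work per pass so a burst of tasks cannot starve the rest of the cycle.
        final int limit = 10;
        Runnable task;
        while (workCount < limit && (task = pendingTasks.poll()) != null)
        {
            task.run();
            workCount++;
        }
        return workCount;
    }
}
```

Because the queue is private and only used inside the duty cycle, the tasks run on whichever thread is driving the agent, and no executor is involved.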

The following sections describe common solutions to tasks developers tend to try and do when breaking the guidelines.

How to get data into and out of an Agent

In general, stick with Aeron IPC or RingBuffers. This means that the code will need to perform encoding and/or serialization, which can be frustrating when you could otherwise just reach across the thread boundary and grab the data. To reduce the overhead, ensure that the development process for creating new messages, and modifying existing ones, is as low friction as possible.
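As a rough illustration of the encode, hand off, decode pattern, the sketch below uses only the JDK. It is not Aeron or Agrona code: a ConcurrentLinkedQueue of byte arrays is swapped in as a stand-in for an IPC publication or ring buffer, and OrderMessage, ReceivingAgent and the wire layout are hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Queue;

final class EncodedHandOff
{
    /** Hypothetical message: an order id and a symbol. */
    static final class OrderMessage
    {
        final long orderId;
        final String symbol;

        OrderMessage(final long orderId, final String symbol)
        {
            this.orderId = orderId;
            this.symbol = symbol;
        }
    }

    /** Assumed layout: 8-byte id, 4-byte symbol length, then the UTF-8 symbol bytes. */
    static byte[] encode(final OrderMessage message)
    {
        final byte[] symbolBytes = message.symbol.getBytes(StandardCharsets.UTF_8);
        final ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES + Integer.BYTES + symbolBytes.length);
        buffer.putLong(message.orderId).putInt(symbolBytes.length).put(symbolBytes);
        return buffer.array();
    }

    static OrderMessage decode(final byte[] encoded)
    {
        final ByteBuffer buffer = ByteBuffer.wrap(encoded);
        final long orderId = buffer.getLong();
        final byte[] symbolBytes = new byte[buffer.getInt()];
        buffer.get(symbolBytes);
        return new OrderMessage(orderId, new String(symbolBytes, StandardCharsets.UTF_8));
    }

    /** Receiving agent: drains its inbound channel as part of its duty cycle. */
    static final class ReceivingAgent
    {
        private final Queue<byte[]> inbound; // stand-in for a subscription or ring buffer
        long lastOrderId = -1;
        String lastSymbol;

        ReceivingAgent(final Queue<byte[]> inbound)
        {
            this.inbound = inbound;
        }

        int doWork()
        {
            int workCount = 0;
            byte[] encoded;
            while ((encoded = inbound.poll()) != null)
            {
                final OrderMessage message = decode(encoded);
                lastOrderId = message.orderId;
                lastSymbol = message.symbol;
                workCount++;
            }
            return workCount;
        }
    }
}
```

The sender only ever calls encode and offers the bytes to a thread-safe channel, so the two sides share no mutable objects. In a real system the hand-written layout would usually be replaced by generated codecs to keep message changes low friction.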

See the async example for two-way flows over Aeron IPC.

How to get a synchronous result from an Agent

If the code calling an agent requires a synchronous API, and is not itself run in a duty cycle, then the simplest approach is to poll for results in a loop, and exit only when the result has been returned or the call has timed out.

A simple pseudocode example might be:

public Object getResult(Object input) throws TimeoutException
{
    // prepare the input message and send it to the agent
    Message toAgent = new Message(input);
    publication.offer(toAgent,...);

    // get an instance of a clock to watch for the timeout
    EpochClock clock = new SystemEpochClock();

    // deadline 1 second (1000 ms) from now
    long deadlineMs = clock.time() + 1000;

    while (clock.time() <= deadlineMs)
    {
        // the fragment handler captures the result when it arrives
        subscription.poll(fragmentHandler, 10);
        if (result returned)
        {
            return result;
        }
        else
        {
            idleStrategy.idle();
        }
    }

    throw new TimeoutException("no result received within 1 second");
}

Doing this within a duty cycle is simpler - just make use of a simple state machine. Sample states could be AWAITING_RESULT, TIMEOUT and RECEIVED. See the Aeron RPC sample for an example of a duty cycle state machine.
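A minimal sketch of such a state machine follows. It is not taken from the Aeron RPC sample: RequestStateMachine, onRequestSent, onResult and the extra IDLE state are assumptions made here, and the clock is passed in as a LongSupplier returning milliseconds so that the example has no external dependencies.

```java
import java.util.function.LongSupplier;

final class RequestStateMachine
{
    // IDLE is an assumed extra state for "no request in flight".
    enum State { IDLE, AWAITING_RESULT, RECEIVED, TIMEOUT }

    private final LongSupplier clockMs; // hypothetical clock source returning epoch milliseconds
    private final long timeoutMs;
    private State state = State.IDLE;
    private long deadlineMs;
    private Object result;

    RequestStateMachine(final LongSupplier clockMs, final long timeoutMs)
    {
        this.clockMs = clockMs;
        this.timeoutMs = timeoutMs;
    }

    /** Call once the request has been offered to the agent. */
    void onRequestSent()
    {
        deadlineMs = clockMs.getAsLong() + timeoutMs;
        result = null;
        state = State.AWAITING_RESULT;
    }

    /** Call from the fragment handler when the reply arrives; late replies are ignored. */
    void onResult(final Object value)
    {
        if (state == State.AWAITING_RESULT)
        {
            result = value;
            state = State.RECEIVED;
        }
    }

    /** One pass of the duty cycle: checks for timeout and never blocks. */
    int doWork()
    {
        if (state == State.AWAITING_RESULT && clockMs.getAsLong() > deadlineMs)
        {
            state = State.TIMEOUT;
            return 1;
        }
        return 0;
    }

    State state()
    {
        return state;
    }

    Object result()
    {
        return result;
    }
}
```

The owning agent would call doWork() on every pass and react to RECEIVED or TIMEOUT in the same cycle, so no thread ever waits on the reply.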

How to get things to happen on schedule

Sometimes, an agent will need to perform some activity on schedule - for example sending a keep alive signal. This is easy to achieve in a duty cycle, and does not need a ScheduledExecutorService or similar.

In an Agent's duty cycle, just add a section for managing scheduled tasks:

@Override
public int doWork()
{
    int workCount = 0;
    // other tasks
    ...
    workCount += checkScheduledTasks();
    return workCount;
}

Then, within checkScheduledTasks, the code can check the scheduled task(s) against an agent-local epoch clock. If you have a single item, e.g. a heartbeat, you could compute and store the next execution time in a long. If you have a small number of tasks, consider a list ordered by next execution timestamp, and stop processing the list the moment an entry is beyond the current time. If you have a very large number of items, and have to deal with cancellations, you will need to select data structures more carefully: you could consider Agrona's DeadlineTimerWheel, or look into building your own custom structures. Options include hierarchical timing wheels (as seen in Kafka), hashed timing wheels (as seen in Netty), skip lists (as seen in DPDK) or red-black trees (as seen in Linux's high resolution timer).
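The first two cases, a single recurring item and a small ordered set, might be sketched as below. The ScheduledTasks class, its Task entry and the heartbeatsSent counter are hypothetical, the current time is passed in as a plain long in milliseconds, and a PriorityQueue is used as one convenient way of keeping entries ordered by deadline.

```java
import java.util.PriorityQueue;

final class ScheduledTasks
{
    /** Hypothetical task entry: a deadline and the action to run once it is due. */
    static final class Task
    {
        final long deadlineMs;
        final Runnable action;

        Task(final long deadlineMs, final Runnable action)
        {
            this.deadlineMs = deadlineMs;
            this.action = action;
        }
    }

    private final long heartbeatIntervalMs;
    private long nextHeartbeatMs;
    int heartbeatsSent; // stands in for actually sending a keep-alive

    // Ordered by deadline, earliest first.
    private final PriorityQueue<Task> tasks =
        new PriorityQueue<>((a, b) -> Long.compare(a.deadlineMs, b.deadlineMs));

    ScheduledTasks(final long nowMs, final long heartbeatIntervalMs)
    {
        this.heartbeatIntervalMs = heartbeatIntervalMs;
        this.nextHeartbeatMs = nowMs + heartbeatIntervalMs;
    }

    void schedule(final long deadlineMs, final Runnable action)
    {
        tasks.add(new Task(deadlineMs, action));
    }

    /** Called from doWork() with the current time; returns the number of tasks run. */
    int checkScheduledTasks(final long nowMs)
    {
        int workCount = 0;

        // Single recurring item: one long holding the next execution time is enough.
        if (nowMs >= nextHeartbeatMs)
        {
            heartbeatsSent++;
            nextHeartbeatMs = nowMs + heartbeatIntervalMs;
            workCount++;
        }

        // Small set of one-off items: stop at the first entry that is not yet due.
        Task head;
        while ((head = tasks.peek()) != null && head.deadlineMs <= nowMs)
        {
            tasks.poll();
            head.action.run();
            workCount++;
        }
        return workCount;
    }
}
```

Note that the heartbeat is rescheduled relative to the time it actually ran rather than its original deadline, which is a design choice: it avoids bursts after a stall at the cost of some drift.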

If time must stay constant for the entirety of the duty cycle, make use of a CachedEpochClock or hold the current time in a long variable. Remember that the idle strategy directly affects the accuracy of scheduled task execution times: if, for example, tasks are scheduled for every millisecond but the idle strategy sleeps for 5 milliseconds, then the tasks will batch up. Generally, try to keep the idle strategy's sleep period smaller than the minimum timer resolution.
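The batching effect can be seen in a small simulation. This is an illustrative model only: IdleBatchingSketch and dueTasksPerCycle are hypothetical names, time is advanced by hand instead of being read from a real clock, and the work itself is assumed to take no time.

```java
final class IdleBatchingSketch
{
    /**
     * Simulates a duty cycle whose only pause is a fixed idle period, and returns how many
     * executions of a recurring task became due in each pass. All times are in milliseconds.
     */
    static int[] dueTasksPerCycle(final long taskIntervalMs, final long idlePeriodMs, final int cycles)
    {
        final int[] duePerCycle = new int[cycles];
        long nowMs = 0; // read once per pass, so it stays constant within the pass
        long nextDeadlineMs = taskIntervalMs;

        for (int cycle = 0; cycle < cycles; cycle++)
        {
            nowMs += idlePeriodMs; // time that passed while idling
            while (nextDeadlineMs <= nowMs)
            {
                duePerCycle[cycle]++;
                nextDeadlineMs += taskIntervalMs;
            }
        }
        return duePerCycle;
    }
}
```

With a 1 ms task interval and a 5 ms idle period, every pass finds five executions due at once. With the values swapped, four passes out of five find nothing due and the fifth finds exactly one, which is the behaviour a timer normally wants.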

Note that the above approach is not suitable for Aeron Cluster.

See also