Concurrent Collections

Ring Buffers

OneToOneRingBuffer

When you've built a process which uses multiple threads, have a single producer and a single consumer for a specific flow, and don't want to deal with the complexity of Aeron IPC, then Agrona's OneToOneRingBuffer is a good option.

Two samples using OneToOneRingBuffer are on GitHub.

Storage for this ring buffer is defined upfront and cannot be resized. In the sample below, the underlying buffer is allocated off heap and sized to accept 4096 bytes of content; this content capacity must be a power of two. RingBufferDescriptor.TRAILER_LENGTH must be added so that the ring buffer's own bookkeeping data is held within the same underlying buffer. Note that every message written to the ring buffer via write has an additional 8 byte header written with it, so keep this overhead in mind when sizing the underlying buffer.

final int bufferLength = 4096 + RingBufferDescriptor.TRAILER_LENGTH;
final UnsafeBuffer internalBuffer
        = new UnsafeBuffer(ByteBuffer.allocateDirect(bufferLength));
final OneToOneRingBuffer ringBuffer
        = new OneToOneRingBuffer(internalBuffer);
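
To make the header overhead concrete, here is a minimal sketch of the sizing arithmetic. It assumes an 8 byte header per message, as described above, and additionally assumes that each record is padded up to a multiple of 8 bytes. The class and method names (RingBufferSizing, alignedRecordLength, maxMessages) are hypothetical helpers for illustration, not part of Agrona.

```java
// Hypothetical sizing helper; not part of Agrona.
final class RingBufferSizing
{
    // Assumed record layout: 8 byte header, records padded to 8 byte multiples.
    static final int HEADER_LENGTH = 8;
    static final int ALIGNMENT = 8;

    /** Bytes one message of the given payload length occupies in the ring buffer. */
    static int alignedRecordLength(final int payloadLength)
    {
        final int recordLength = payloadLength + HEADER_LENGTH;
        // Round up to the next multiple of ALIGNMENT (ALIGNMENT is a power of two).
        return (recordLength + (ALIGNMENT - 1)) & ~(ALIGNMENT - 1);
    }

    /** Upper bound on equally sized messages that fit in the given content capacity. */
    static int maxMessages(final int capacity, final int payloadLength)
    {
        return capacity / alignedRecordLength(payloadLength);
    }
}
```

Under these assumptions a 10 byte message occupies 24 bytes, so a 4096 byte ring buffer holds at most 170 of them. Treat the figure as an upper bound, since padding at the point where the buffer wraps can reduce it.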

Consuming the data is done via an implementation of the Agrona MessageHandler interface, for example:

class MessageCapture implements MessageHandler
{
    @Override
    public void onMessage(int msgTypeId, MutableDirectBuffer buffer,
                          int index, int length)
    {
        //do something 
    }
}

You'll notice that this is a very similar interface to the Aeron FragmentHandler. This makes moving from this collection object to Aeron relatively simple. The msgTypeId identifies the type of the message and is stored in the message header. Even if you do not use this field, it has to be set to a value > 0.

Sending data is very simple, but does not provide the same level of information as Aeron's Publication. An example:

//prepare some data
final UnsafeBuffer toSend = new UnsafeBuffer(ByteBuffer.allocateDirect(10));
toSend.putStringWithoutLengthAscii(0, "0123456789");

//write the data; the result is false if the write failed
final boolean sentOk = ringBuffer.write(1, toSend, 0, 10);

The write method returns true if the message was written and false if the write failed, which happens when the ring buffer does not have enough free space. A failed write can be treated much like back pressure in Aeron Publications - if appropriate, give the consumer time to read some data, and retry the write.
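
The retry idea can be sketched as a small bounded loop. This is an illustration under stated assumptions rather than a recommended policy: WriteRetry and writeWithRetry are hypothetical names, the write attempt is passed in as a BooleanSupplier so the helper does not depend on Agrona, and the 1 microsecond pause is an arbitrary placeholder.

```java
import java.util.concurrent.locks.LockSupport;
import java.util.function.BooleanSupplier;

// Hypothetical retry helper; not part of Agrona.
final class WriteRetry
{
    /**
     * Runs the write attempt until it succeeds or maxAttempts is reached,
     * pausing briefly after each failure so the consumer can make progress.
     */
    static boolean writeWithRetry(final BooleanSupplier writeAttempt, final int maxAttempts)
    {
        for (int attempt = 1; attempt <= maxAttempts; attempt++)
        {
            if (writeAttempt.getAsBoolean())
            {
                return true;
            }
            LockSupport.parkNanos(1_000L); // illustrative pause, tune for your workload
        }
        return false; // still no space; the caller decides whether to drop or escalate
    }
}
```

With the buffer prepared above, a call might look like WriteRetry.writeWithRetry(() -> ringBuffer.write(1, toSend, 0, 10), 100). Inside an agent's duty cycle it may be preferable to return and retry on the next doWork call rather than pause in place.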

The ring buffer can be monitored for both producer and consumer progress via methods on the ring buffer object.

//the current consumer position in the ring buffer
ringBuffer.consumerPosition();

//the current producer position in the ring buffer
ringBuffer.producerPosition();
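
One use of these two values is estimating how far the consumer is behind. The sketch below assumes both positions are monotonically increasing byte counts that include message headers and padding; RingBufferBacklog and its methods are hypothetical helpers, not part of Agrona.

```java
// Hypothetical monitoring helper; not part of Agrona.
final class RingBufferBacklog
{
    /** Bytes written by the producer that the consumer has not yet consumed. */
    static long backlogBytes(final long producerPosition, final long consumerPosition)
    {
        return producerPosition - consumerPosition;
    }

    /** Backlog as a fraction of the content capacity: 0.0 is empty, 1.0 is full. */
    static double utilisation(
        final long producerPosition, final long consumerPosition, final int capacity)
    {
        return (double)backlogBytes(producerPosition, consumerPosition) / capacity;
    }
}
```

The two positions are read at different moments, so the result is only an estimate. Reading ringBuffer.consumerPosition() into a local variable first and ringBuffer.producerPosition() second keeps the estimate from going negative, at the cost of slightly overstating the backlog.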

Using this object between Agrona agents is simple: share the OneToOneRingBuffer between the two agents, use the write method in the producer agent as you would an Aeron Publication, and poll using the read method in the consumer agent's duty cycle.

Controlled Message Handler

Agrona 1.9.0 adds the Controlled Message Handler, which operates much like the version found in Aeron's Image object.

class ControlledMessageCapture implements ControlledMessageHandler
{
    @Override
    public ControlledMessageHandler.Action onMessage(int msgTypeId, MutableDirectBuffer buffer, int index, int length)
    {
        // do something
        return Action.COMMIT; // or ABORT, BREAK or CONTINUE as required
    }
}

The difference is in the return value of type ControlledMessageHandler.Action - this allows control over what happens with the message. There are 4 options:

  • ABORT: Aborts the read operation at this message. The message is not consumed and will be delivered again on the next read.
  • BREAK: Stops further processing after the current message for this read.
  • COMMIT: Continues processing, but commits at the current message.
  • CONTINUE: Continues processing, committing at the end of the current batch (this is equivalent to the standard handler).
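
The effect of the four actions on a batch can be illustrated with a toy model. This is a simplified simulation of the behaviour described in the list above, not Agrona's implementation: ControlledReadModel, its own Action enum and Result are hypothetical, and messages are reduced to the action the handler would return for each.

```java
import java.util.List;

// Toy model of the four actions; hypothetical, not Agrona's implementation.
final class ControlledReadModel
{
    enum Action { ABORT, BREAK, COMMIT, CONTINUE }

    /** Outcome of one simulated read. */
    static final class Result
    {
        final int consumed;        // messages the consumer has moved past
        final int positionUpdates; // times the consumer position was published

        Result(final int consumed, final int positionUpdates)
        {
            this.consumed = consumed;
            this.positionUpdates = positionUpdates;
        }
    }

    /** Simulates one read in which the handler returns the given action per message. */
    static Result read(final List<Action> actionPerMessage)
    {
        int consumed = 0;
        int uncommitted = 0; // handled in this read but not yet published
        int positionUpdates = 0;

        for (final Action action : actionPerMessage)
        {
            if (action == Action.ABORT)
            {
                break; // this message is left in place and redelivered next read
            }
            uncommitted++;
            if (action == Action.BREAK)
            {
                break; // stop after the current message
            }
            if (action == Action.COMMIT)
            {
                consumed += uncommitted; // publish progress up to this message
                uncommitted = 0;
                positionUpdates++;
            }
        }
        if (uncommitted > 0)
        {
            consumed += uncommitted; // commit whatever remains at the end of the read
            positionUpdates++;
        }
        return new Result(consumed, positionUpdates);
    }
}
```

In this model, a handler returning CONTINUE, ABORT, CONTINUE for three messages consumes only the first, so the second is delivered again on the next read. Returning COMMIT for every message consumes the same messages as CONTINUE but publishes the consumer position after each one instead of once at the end.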

TryClaim

Also new in 1.9.0 is tryClaim, which allows writing directly to the ring buffer. This gives the ring buffer zero copy semantics.

int claimIndex = ringBuffer.tryClaim(1, Integer.BYTES);
if (claimIndex > 0)
{
    final AtomicBuffer buffer = ringBuffer.buffer();
    buffer.putInt(claimIndex, something);
    ringBuffer.commit(claimIndex);
}
return 0;

TryClaim operates similarly to how it does in Aeron:

  • First, call tryClaim to claim space for a message of a pre-defined length. If the returned index is > 0, the claim succeeded and the index is the offset to use when writing to the buffer (for example in buffer.putInt)
  • Next, get the ring buffer's underlying buffer (if you don't already have it)
  • Next, write the data
  • Finally, commit (or abort). Note that there is no unblock timeout like in Aeron - if commit (or abort) is not called, then the consumer will not be able to consume past the first claim that has been neither committed nor aborted.

See the Send Agent example of OneToOneRingBuffer for a complete sample.

ManyToOneRingBuffer

The ManyToOneRingBuffer is used exactly like the OneToOneRingBuffer, but allows for multiple producers and a single consumer.

See the Three Agent example of ManyToOneRingBuffer.

Broadcast

The OneToOneRingBuffer and ManyToOneRingBuffer implementations allow you to have one or many producers, but only a single consumer. Sometimes you might need one producer and one to many consumers - Agrona offers a BroadcastTransmitter and BroadcastReceiver for this.

Warning

BroadcastTransmitter and BroadcastReceiver will drop messages if the sender is producing faster than consumers can consume. There is no back pressure support - if this is required, consider Aeron instead for one to many transmission tasks.

All communication is managed via a single shared AtomicBuffer. See GitHub for an example.

Sending

Sending is performed using a BroadcastTransmitter. A buffer must be prepared, and then handed over to the BroadcastTransmitter to transmit.

private final BroadcastTransmitter transmitter;
private final MutableDirectBuffer msgBuffer = new ExpandableArrayBuffer();

public SendAgent(final AtomicBuffer buffer...)
{
    this.transmitter = new BroadcastTransmitter(buffer);
}

@Override
public int doWork()
{
    ...
    msgBuffer.putInt(0, lastSend);
    transmitter.transmit(1, msgBuffer, 0, Integer.BYTES);
    ...
    lastSend++;
    return 0;
}

Receiving

Receiving is performed using a CopyBroadcastReceiver. This makes it simpler to receive messages and allows receipt of messages via a MessageHandler interface.

public class ReceiveAgent implements Agent, MessageHandler
{
    ...
    private final BroadcastReceiver broadcastReceiver;
    private final CopyBroadcastReceiver copyBroadcastReceiver;

    public ReceiveAgent(final AtomicBuffer atomicBuffer, final String name)
    {
        this.broadcastReceiver = new BroadcastReceiver(atomicBuffer);
        this.copyBroadcastReceiver = new CopyBroadcastReceiver(broadcastReceiver);
        ...
    }

    @Override
    public int doWork()
    {
        copyBroadcastReceiver.receive(this::onMessage);
        return 0;
    }

    @Override
    public void onMessage(int msgTypeId, MutableDirectBuffer buffer, int index, int length)
    {
        LOGGER.info("Received {}", buffer.getInt(index));
    }
...
}

If the receiver is running behind, then the following exception is raised:

java.lang.IllegalStateException: unable to keep up with broadcast