
Publications & Subscriptions

In Aeron, publications are the main objects that applications use to send data, while subscriptions are used to receive data.

It is recommended that you are familiar with Direct Buffers before continuing with this section.

Info

A common misunderstanding with Aeron is which address to use in the publication and subscription. When defining the channels, publications need to make use of the destination subscription's address. Another way to think of it: the successful offering of data to a publication results in data being pushed to a subscription. The subscription does not pull data from a publication.

Publications

Publications are the primary API for sending data to a subscription. The two methods that send data, offer and tryClaim, are both non-blocking. Offered data is appended to the local Log Buffer, and the Media Driver then sends it asynchronously over the publication's configured communication medium.

Note

Publications that are not spied will not be able to send data until a Subscription is ready on the receiving side. This applies for both UDP and IPC Subscriptions.

Relation between an offered application message and an Aeron message

Messages are always sent per publication. When sending over the network or IPC layer, messages from multiple publications are never mixed together.

The largest message that can be sent without fragmentation is determined by the MTU (Maximum Transmission Unit). The maxPayloadLength() method on Publication returns the largest payload that fits within a single UDP or IPC frame.

If a message exceeds the maximum payload length, Aeron will automatically fragment it into multiple network messages.

Aeron can also combine several smaller application messages into fewer network messages, depending on several factors:

  • Aeron's Media Driver sends data as soon as it is ready, without waiting for the maxPayloadLength() to be reached.
  • Network conditions, such as flow control, congestion control, and back pressure, can affect the size of messages sent over the network.
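The arithmetic behind maxPayloadLength() and automatic fragmentation can be sketched as follows. This assumes the default 1,408 byte MTU and Aeron's 32 byte data frame header, which together give a 1,376 byte payload; FragmentMath is a hypothetical helper for illustration, not part of the Aeron API.

```java
final class FragmentMath
{
    // Assumed defaults: a 1,408 byte MTU and a 32 byte Aeron data frame header.
    static final int DEFAULT_MTU = 1408;
    static final int DATA_HEADER_LENGTH = 32;

    // Largest payload that fits in one frame (the value maxPayloadLength() reports).
    static int maxPayloadLength(final int mtu)
    {
        return mtu - DATA_HEADER_LENGTH;
    }

    // Number of frames a message of messageLength bytes is split into.
    static int fragmentCount(final int messageLength, final int maxPayloadLength)
    {
        if (messageLength <= maxPayloadLength)
        {
            return 1; // fits in a single, unfragmented frame
        }
        return (messageLength + maxPayloadLength - 1) / maxPayloadLength; // ceiling division
    }

    public static void main(final String[] args)
    {
        final int payload = maxPayloadLength(DEFAULT_MTU);
        System.out.println(payload);                      // 1376
        System.out.println(fragmentCount(1376, payload)); // 1
        System.out.println(fragmentCount(3000, payload)); // 3
    }
}
```

Note that batching works in the other direction: several frames may share one datagram, so the frame count is not the same as the datagram count.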

Types

ConcurrentPublication

The ConcurrentPublication is returned by the Aeron.addPublication call. Concurrent Publications are thread safe and can be shared by multiple sending threads.

ExclusivePublication

The ExclusivePublication is returned by the Aeron.addExclusivePublication call. Its offer and tryClaim methods are not thread safe, so it must only be used from a single thread. Because it does not need to coordinate concurrent writers, an Exclusive Publication can provide higher throughput in some scenarios. Note that if you need to set an initial position (for example, for replay purposes), you must use an ExclusivePublication.

Offer

The offer call sends the contents of a buffer to the Publication's subscribers. offer copies the data from the supplied buffer(s) into the Log Buffer; use tryClaim to avoid this copy.

Offer variants:

  • basic variant that accepts a single DirectBuffer, offset and length
  • variant that accepts two DirectBuffer parameters, each with offset and length (typically used for writing a header and body independently); note there is no equivalent on the FragmentHandler side
  • variant that accepts just a DirectBuffer; the whole buffer is sent, from offset 0 to buffer.capacity()
  • variant that accepts a DirectBufferVector array
  • and for each of the above, a variant accepting a ReservedValueSupplier; this allows you to inject a long into the reserved value of the Aeron header - typically a checksum or timestamp.
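As an illustration of the checksum case, the sketch below computes a CRC32 over the payload portion of a frame with java.util.zip.CRC32. In Aeron this logic would sit inside a ReservedValueSupplier, whose get(termBuffer, termOffset, frameLength) callback sees the frame after its header has been written. Here a plain byte[] stands in for the term buffer, frameLength is assumed to include the header, and the 32 byte header length is an assumption; PayloadChecksum is hypothetical, so treat it as a sketch rather than a drop-in supplier.

```java
import java.util.zip.CRC32;

final class PayloadChecksum
{
    // Assumed Aeron data frame header length; the payload starts this many bytes into the frame.
    static final int DATA_HEADER_LENGTH = 32;

    // CRC32 of the payload portion of a frame laid out as [header | payload].
    static long payloadCrc32(final byte[] term, final int frameOffset, final int frameLength)
    {
        final CRC32 crc = new CRC32();
        crc.update(term, frameOffset + DATA_HEADER_LENGTH, frameLength - DATA_HEADER_LENGTH);
        return crc.getValue();
    }
}
```

Skipping the header matters: it is covered by its own fields, and including it would make the checksum depend on values the receiver cannot easily reproduce.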

Offer Usage Notes

  • Offer limits message size only to the smaller of the term buffer length / 8 and 16MB. Whenever the offered data length exceeds maxPayloadLength(), Aeron fragments the data automatically. Note that Aeron does not reassemble the data in a subscription by itself; the developer must use a FragmentAssembler to do this.
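A minimal sketch of the size limit described above (MessageLimits is a hypothetical helper, not an Aeron class). For example, a 64KB term allows 8KB messages, while any term of 128MB or more reaches the 16MB cap.

```java
final class MessageLimits
{
    // Absolute cap on a single offered message: 16MB.
    static final int MAX_MESSAGE_LENGTH = 16 * 1024 * 1024;

    // Largest message offer will accept for a given term buffer length.
    static int maxMessageLength(final int termBufferLength)
    {
        return Math.min(termBufferLength / 8, MAX_MESSAGE_LENGTH);
    }
}
```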

A call to offer returns either a positive position or one of five negative codes:

  • a value greater than zero provides the new stream position, and indicates that the log buffer now contains the data in the supplied byte buffer. The data may or may not have been fragmented during the write to the term.
  • a value of -1 indicates that there is no subscription connected. Subscriptions can come and go naturally, so this does not indicate an error.
  • a value of -2 indicates that the offer was back pressured. See Back pressure below.
  • a value of -3 indicates that an admin action was underway as the log buffer terms were being rotated at that moment. The application should attempt the offer again. See Log Buffers
  • a value of -4 indicates that the publication is now closed and is unable to accept data.
  • a value of -5 indicates that the offer failed because the stream reached its maximum possible position, which is the term buffer length multiplied by the total number of possible terms. When this happens, it is suggested the term buffer size be increased and/or the message size decreased.
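The same codes apply to tryClaim. A minimal sketch of classifying them, assuming the numeric values listed here; Aeron also exposes them as named constants on Publication, which real code should prefer. OfferResults is hypothetical, and the split between retryable and terminal codes is a judgement call rather than something Aeron prescribes.

```java
final class OfferResults
{
    // Values mirror the codes listed above (Publication.NOT_CONNECTED, BACK_PRESSURED, etc. in Aeron).
    static final long NOT_CONNECTED = -1;
    static final long BACK_PRESSURED = -2;
    static final long ADMIN_ACTION = -3;
    static final long CLOSED = -4;
    static final long MAX_POSITION_EXCEEDED = -5;

    static String describe(final long result)
    {
        if (result > 0)
        {
            return "OK (new position " + result + ")";
        }
        if (result == NOT_CONNECTED)
        {
            return "NOT_CONNECTED";
        }
        if (result == BACK_PRESSURED)
        {
            return "BACK_PRESSURED";
        }
        if (result == ADMIN_ACTION)
        {
            return "ADMIN_ACTION";
        }
        if (result == CLOSED)
        {
            return "CLOSED";
        }
        if (result == MAX_POSITION_EXCEEDED)
        {
            return "MAX_POSITION_EXCEEDED";
        }
        return "UNKNOWN (" + result + ")";
    }

    // True when trying again later may succeed; CLOSED and MAX_POSITION_EXCEEDED are terminal.
    static boolean isRetryable(final long result)
    {
        return result == NOT_CONNECTED || result == BACK_PRESSURED || result == ADMIN_ACTION;
    }
}
```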

TryClaim

Warning

tryClaim exposes the underlying MutableDirectBuffer via BufferClaim.buffer(). Writing outside the claimed region (for example, accidentally using 0 as the offset instead of BufferClaim.offset()) can corrupt data in the log buffer.

The tryClaim method is best suited to tightly controlled, performance-sensitive environments. It lets the developer reserve ('claim') a section of the log buffer and write directly into it, which avoids the copy that offer makes. Once the write is complete, the developer must call commit() on the BufferClaim obtained from tryClaim (or abort() to discard it). Because tryClaim rejects lengths greater than maxPayloadLength(), claimed messages are never fragmented, so no FragmentAssembler copy is needed when they are received.
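The offset warning above comes down to indexing discipline. The sketch below uses a plain java.nio.ByteBuffer as a stand-in for the buffer returned by BufferClaim.buffer(), and a claimOffset parameter standing in for BufferClaim.offset(). Every write is made relative to that offset, so bytes belonging to earlier messages are never touched. ClaimOffsetDemo is hypothetical and models only the discipline, not Aeron's BufferClaim itself.

```java
import java.nio.ByteBuffer;

final class ClaimOffsetDemo
{
    // Copies payload into the claimed region [claimOffset, claimOffset + claimLength).
    static void writeIntoClaim(
        final ByteBuffer log, final int claimOffset, final int claimLength, final byte[] payload)
    {
        if (payload.length > claimLength)
        {
            throw new IllegalArgumentException("payload exceeds claimed length");
        }
        for (int i = 0; i < payload.length; i++)
        {
            log.put(claimOffset + i, payload[i]); // absolute put, always relative to claimOffset
        }
    }
}
```

With a real BufferClaim, the writes would be followed by commit() (or abort() if the message should be discarded).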

TryClaim Usage Notes

  • The maximum claimable length is given by maxPayloadLength(), which is the MTU length less the frame header; with default settings this is 1,376 bytes (a 1,408 byte MTU less a 32 byte header).
  • The write must complete before the unblock timeout passes (which is 15 seconds by default), or the Media Driver will unblock the claim.
  • If you're using a ConcurrentPublication, you can use thread local BufferClaim objects to enable safe multithreaded tryClaim

A call to tryClaim returns either a positive position or one of five negative codes:

  • a value greater than zero provides the new position in the stream. No data has been written yet, but the returned BufferClaim.buffer() can be used for the write.
  • a value of -1 indicates that there is no subscription connected.
  • a value of -2 indicates that the claim was back pressured. See Back pressure below.
  • a value of -3 indicates that an admin action was underway as the log buffer terms were being rotated at that moment. The application should attempt the tryClaim again. See Log Buffers
  • a value of -4 indicates that the publication is now closed and is unable to accept data.
  • a value of -5 indicates that the claim failed because the stream reached its maximum possible position, which is the term buffer length multiplied by the total number of possible terms. When this happens, it is suggested the term buffer size be increased and/or the message size decreased.

Other useful methods

IsConnected

Indicates whether a subscriber is connected, or has been connected very recently.

Close

Instructs the client conductor to tidy up the publication, and removes the underlying Log Buffers.

Implications on /dev/shm

While not required, using /dev/shm to hold the Media Driver folder is strongly recommended for production. Each publication added to Aeron will create its own Log Buffer. Below is the Media Driver folder showing a process with two independent IPC publications:

[Figure: Media Driver folder tree with two IPC publication log buffers]

If you set the term buffer length for a publication to 128MB (using the IPC or network term buffer length setting, as appropriate), it will create a log buffer file of approximately 384MB (made up of 3 terms, plus the metadata; see Log Buffers). As the application adds more publications, the space used will grow, potentially beyond the space available in /dev/shm. If this happens, an exception will be raised:

io.aeron.exceptions.RegistrationException:
    correlationId=1, errorCodeValue=0, 
AeronException : insufficient usable storage for new log
    of length=123456789 in /dev/shm (shm)

To solve this, increase the space allocated to /dev/shm, reduce the number of publications, or decrease the term buffer length.
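A rough sizing helper for the figures above. The 3-term layout comes from the text; the 4 KiB metadata size and the 2 GiB /dev/shm budget in the example are illustrative assumptions (the driver may also align the file to a page size), so use it to estimate rather than to predict exact file sizes. ShmBudget is hypothetical.

```java
final class ShmBudget
{
    static final int PARTITION_COUNT = 3; // a log buffer holds three terms

    // Approximate log buffer file size: three terms plus a metadata section.
    static long logFileLength(final long termLength, final long metadataLength)
    {
        return PARTITION_COUNT * termLength + metadataLength;
    }

    // How many publications of this log size fit in the available /dev/shm space.
    static long publicationsThatFit(final long shmBytes, final long logFileLength)
    {
        return shmBytes / logFileLength;
    }

    public static void main(final String[] args)
    {
        final long mib = 1024L * 1024L;
        final long log = logFileLength(128 * mib, 4096); // 4 KiB metadata is an assumption
        System.out.println(log / mib);                            // 384
        System.out.println(publicationsThatFit(2048 * mib, log)); // 5
    }
}
```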

Back pressure

Consider the following flow: a data source connector is connected to an external source that produces a large volume of data. This connector converts the data format and sends it to a router via Aeron. The router in turn sends the data again over Aeron to a recipient, which internally writes to a database cluster.

What happens when the database is unable to write messages fast enough?

[Diagram: External Data Source → Data Source Connector → Router → Recipient → Database Cluster, with back pressure building from the database back towards the source]

Assuming the database write and the Aeron subscription poll run within the same duty cycle, the first impact will be that the recipient polls its subscription from the router less often. Eventually the router's publication log buffers will fill up, and when the router attempts to offer a message to the publication, it will receive a response indicating back pressure.

At this point the router has a decision to make: should it drop the message, or slow down its own duty cycle by retrying the offer in a loop? If the code drops the message, that data is lost forever. But if it slows down its duty cycle, the back pressure moves left into the Aeron flow from the data source connector to the router. Eventually the data source connector will also be back pressured when offering data to its publication, and will face the same decision: drop the message, or slow down.
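The router's choice can be made explicit with a bounded retry: keep offering while the result is back pressure (or an admin action), up to a limit, then drop and count the loss. A sketch, assuming the offer is passed in as a LongSupplier (for example () -> publication.offer(buffer, 0, length)) and the result codes listed earlier; SendOrDrop is hypothetical.

```java
import java.util.function.LongSupplier;

final class SendOrDrop
{
    // Values mirror Aeron's BACK_PRESSURED and ADMIN_ACTION result codes.
    static final long BACK_PRESSURED = -2;
    static final long ADMIN_ACTION = -3;

    private long dropped;

    // Tries the offer up to maxAttempts times (always at least once).
    // Returns true if the message was sent, false if it was dropped.
    boolean send(final LongSupplier offer, final int maxAttempts)
    {
        for (int attempt = 1; ; attempt++)
        {
            final long result = offer.getAsLong();
            if (result > 0)
            {
                return true;
            }
            final boolean retryable = result == BACK_PRESSURED || result == ADMIN_ACTION;
            if (!retryable || attempt >= maxAttempts)
            {
                dropped++;
                return false;
            }
            Thread.onSpinWait(); // a real agent would use its IdleStrategy here
        }
    }

    long droppedCount()
    {
        return dropped;
    }
}
```

A larger maxAttempts pushes back pressure upstream; a smaller one trades it for data loss.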

There are several tactics that can be used here:

  • Increase log buffer sizes. This would help in scenarios where there was only a temporary blip with the database engine. Note that the term buffer length cannot exceed 1GB.
  • Reduce data size. Smaller data means log buffers will take longer to fill up. Again, if the database cannot deal with the data volumes long term then this will not help much.
  • Switch from an Aeron flow to an Aeron Archive flow between the data source connector and router. This moves the message storage out of log buffers (log buffers are still used for transmission) and onto physical disk, allowing tens of gigabytes of storage. The router can then slow down consumption despite a higher write rate on the input. Again, this is not a magical solution - if the database is dramatically slower, then the physical disk holding the archive data will fill up, and again the data source connector will have to decide what to do with the data - drop it or slow down.
  • Reduce data volume. Consider conflating the data within the data source connector, so that the output data is a subset of the input data.
  • Investigate the database engine. Confirm the technique used to write the data to the database is fast enough and check the database storage is efficient. If all else fails, consider increasing the server size and improving the disk speed of the database engine, or swapping to a more efficient database engine.
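Conflation usually means keeping only the latest value per key between sends. A minimal sketch, assuming updates are keyed (for example by instrument id) and that only the most recent value for each key matters. Conflator is hypothetical; the connector would offer the drained updates to Aeron on each duty cycle instead of every raw update.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class Conflator
{
    // Latest value per key; a newer update for the same key overwrites the older one.
    private final Map<String, Double> latest = new LinkedHashMap<>();

    void onUpdate(final String key, final double value)
    {
        latest.put(key, value);
    }

    // Returns the conflated updates to publish and clears the pending set.
    Map<String, Double> drain()
    {
        final Map<String, Double> snapshot = new LinkedHashMap<>(latest);
        latest.clear();
        return snapshot;
    }
}
```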

Constructing Publications

Synchronous API

The synchronous APIs to build new publications are simply:

final Publication exclusivePublication
        = aeron.addExclusivePublication(channel, streamId);
final Publication concurrentPublication
        = aeron.addPublication(channel, streamId);

These APIs will block, and can take a few moments to return.

Asynchronous API

Info

The Asynchronous Publication APIs are new to Aeron 1.35.0

The asynchronous API is useful when you need to construct Publications in a duty cycle that is sensitive to pauses. This API runs in two steps: first, you call asyncAdd, and then poll get, for example:

final long registrationId = aeron.asyncAddExclusivePublication(channel, streamId);
ExclusivePublication publication;
// get returns null until the Media Driver has completed the registration
while ((publication = aeron.getExclusivePublication(registrationId)) == null)
{
    aeron.context().idleStrategy().idle();
}

Note that in a real-world application you would want to bound this process with a timeout. As with the synchronous API, there are ConcurrentPublication equivalents of both operations: asyncAddPublication and getPublication.
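Inside a duty cycle, the wait can instead be spread across calls to doWork, with a deadline. A sketch, assuming the getter is wired to aeron::getExclusivePublication (or aeron::getPublication) and the clock to System::nanoTime; PendingRegistration is hypothetical, and the real get methods may also throw if the driver rejects the registration.

```java
import java.util.function.LongFunction;
import java.util.function.LongSupplier;

final class PendingRegistration<T>
{
    private final long registrationId;
    private final LongFunction<T> getter; // e.g. aeron::getExclusivePublication (assumed wiring)
    private final LongSupplier nanoClock; // e.g. System::nanoTime
    private final long deadlineNs;
    private T resource;

    PendingRegistration(
        final long registrationId, final LongFunction<T> getter, final LongSupplier nanoClock, final long timeoutNs)
    {
        this.registrationId = registrationId;
        this.getter = getter;
        this.nanoClock = nanoClock;
        this.deadlineNs = nanoClock.getAsLong() + timeoutNs;
    }

    // Call once per duty cycle: returns the resource once available, null while still pending.
    T poll()
    {
        if (resource == null)
        {
            resource = getter.apply(registrationId);
            if (resource == null && nanoClock.getAsLong() - deadlineNs > 0)
            {
                throw new IllegalStateException("registration " + registrationId + " timed out");
            }
        }
        return resource;
    }
}
```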

Subscriptions

Note

The reader should be familiar with Agents & Idle Strategies before continuing with this section.

Subscriptions are not thread safe, and must not be shared between threads. It is possible to have multiple subscriptions polled within a single thread. Fragmented messages are not reassembled unless a FragmentAssembler is used. The call to addSubscription will block until the Media Driver responds.

Subscriptions allow the receipt of the message stream from a local or remote Publication. Use addSubscription to build a subscription for a given channel and stream. You will need to supply an object implementing the FragmentHandler interface to process the received messages.

final Subscription subscription
        = aeron.addSubscription(channel, streamId);

Warning

The buffer passed to a FragmentHandler should be considered read only. Do not hold on to the buffer after onFragment returns, as Aeron may reuse the underlying log buffer space. It is safest to extract all the data required from the buffer within FragmentHandler.onFragment before passing it on to anything else that needs to process the data.

To receive message data from an Aeron subscription, call poll on the subscription. The second parameter (the fragment limit) sets the maximum number of fragments to process in a single poll. The subscription.poll method returns an int, which is the number of fragments (not messages) processed.

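import io.aeron.Subscription;
import io.aeron.logbuffer.FragmentHandler;
import io.aeron.logbuffer.Header;
import org.agrona.DirectBuffer;

class Sample implements FragmentHandler
{
    private final Subscription subscription;

    Sample(final Subscription subscription)
    {
        this.subscription = subscription;
    }

    int poll()
    {
        // process at most one fragment; returns the number of fragments read
        return subscription.poll(this, 1);
    }

    @Override
    public void onFragment(final DirectBuffer buffer, final int offset,
                           final int length, final Header header)
    {
        // do something with the bytes between offset and offset + length
    }
}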

Note that the subscription is expected to be polled within a tight loop of some kind. The duty cycle of an Agrona Agent is a natural fit for this:

@Override
public int doWork() throws Exception
{
    // return the fragment count so the idle strategy only idles when no work was done
    return subscription.poll(fragmentHandler, 1);
}

Fragmented Message Handling

When offering data larger than maxPayloadLength(), Aeron will automatically fragment the message. To receive such messages correctly, a FragmentAssembler must be used. On the receiving side, the FragmentAssembler copies the fragments into a new buffer and hands the complete message to the FragmentHandler.

The required changes are to wrap the handler in a FragmentAssembler and to poll with the assembler instead of the handler:

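import io.aeron.FragmentAssembler;
import io.aeron.Subscription;
import io.aeron.logbuffer.FragmentHandler;
import io.aeron.logbuffer.Header;
import org.agrona.DirectBuffer;

class Sample implements FragmentHandler
{
    private final Subscription subscription;
    // reassembles fragmented messages before delegating to onFragment
    private final FragmentAssembler fragmentAssembler
                        = new FragmentAssembler(this);

    Sample(final Subscription subscription)
    {
        this.subscription = subscription;
    }

    int poll()
    {
        // poll through the assembler, not the handler, so onFragment sees whole messages
        return subscription.poll(fragmentAssembler, 1);
    }

    @Override
    public void onFragment(final DirectBuffer buffer, final int offset,
                           final int length, final Header header)
    {
        // buffer now holds a complete message between offset and offset + length
    }
}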
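To make the copy step concrete, the sketch below models reassembly with plain byte arrays. It assumes Aeron's begin and end fragment flag values (0x80 and 0x40, both set for an unfragmented message) and uses a Consumer<byte[]> in place of the FragmentHandler. MiniAssembler is hypothetical and much simpler than io.aeron.FragmentAssembler; for simplicity it copies every message, whereas Aeron passes unfragmented messages through without a copy.

```java
import java.io.ByteArrayOutputStream;
import java.util.Arrays;
import java.util.function.Consumer;

final class MiniAssembler
{
    static final int BEGIN_FRAG_FLAG = 0x80;
    static final int END_FRAG_FLAG = 0x40;
    static final int UNFRAGMENTED = BEGIN_FRAG_FLAG | END_FRAG_FLAG;

    private final Consumer<byte[]> delegate;
    private final ByteArrayOutputStream pending = new ByteArrayOutputStream();
    private boolean inProgress;

    MiniAssembler(final Consumer<byte[]> delegate)
    {
        this.delegate = delegate;
    }

    void onFragment(final byte[] buffer, final int offset, final int length, final int flags)
    {
        if ((flags & UNFRAGMENTED) == UNFRAGMENTED)
        {
            // whole message in one fragment: hand it straight to the delegate
            delegate.accept(Arrays.copyOfRange(buffer, offset, offset + length));
            return;
        }
        if ((flags & BEGIN_FRAG_FLAG) != 0)
        {
            pending.reset(); // start a new message
            inProgress = true;
        }
        if (!inProgress)
        {
            return; // middle or end fragment without a begin (e.g. joined mid-message): skip it
        }
        pending.write(buffer, offset, length); // copy the fragment into the reassembly buffer
        if ((flags & END_FRAG_FLAG) != 0)
        {
            inProgress = false;
            delegate.accept(pending.toByteArray());
            pending.reset();
        }
    }
}
```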

Controlled Polling

With normal polling, the handler receives the message but cannot influence what Aeron does next. When more control is needed (for example, to allow message processing to be retried), use a ControlledFragmentHandler with subscription.controlledPoll.

The ControlledFragmentHandler has a different interface to implement:

Action onFragment(DirectBuffer buffer, int offset,
                    int length, Header header);

The action that must be returned by your method informs Aeron what to do with the received fragments. Options are:

  • ABORT: aborts the current poll without advancing the position past the current fragment, so that fragment will be delivered again on the next poll.
  • BREAK: stops any additional polling and commits the position at the end of the current fragment. This can be useful for finely controlling the processing of several subscriptions in a single duty cycle.
  • COMMIT: commits the position at the end of the current fragment and continues processing. This can be useful to manage flow control.
  • CONTINUE: continues processing in the same way as a normal poll, with the position committed at the end of the poll.
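The effect of each Action on the subscriber position can be illustrated with a simplified in-memory model. This is not Aeron's implementation: fragments are strings, the position is a list index, and ControlledPollModel and toQueue are hypothetical. The example handler shows the retry use case, returning ABORT when a bounded downstream queue is full so the same fragment is delivered again on the next poll.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.function.Function;

final class ControlledPollModel
{
    enum Action { ABORT, BREAK, COMMIT, CONTINUE }

    // Returns the position (index of the next unconsumed fragment) after one poll.
    static int controlledPoll(
        final List<String> log, final int position, final Function<String, Action> handler, final int fragmentLimit)
    {
        int next = position;
        int fragmentsRead = 0;
        while (next < log.size() && fragmentsRead < fragmentLimit)
        {
            final Action action = handler.apply(log.get(next));
            if (action == Action.ABORT)
            {
                break; // position stays at this fragment, so it is redelivered next poll
            }
            next++;
            fragmentsRead++;
            if (action == Action.BREAK)
            {
                break; // this fragment is consumed, but polling stops for now
            }
            // COMMIT and CONTINUE both keep going; in Aeron they differ in when the position is published
        }
        return next;
    }

    // Hands messages to a bounded queue, aborting when it is full so the fragment is retried.
    static Function<String, Action> toQueue(final ArrayBlockingQueue<String> queue)
    {
        return message -> queue.offer(message) ? Action.CONTINUE : Action.ABORT;
    }
}
```

In Aeron itself, the enum is ControlledFragmentHandler.Action and the handler is passed to subscription.controlledPoll(handler, fragmentLimit).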