Skip to content

Building the Server

Building the server is done in 3 steps:

  1. The server process
  2. The server agent
  3. The server adapter

Server Process

The main method of the Server process is below. The key parts are:

  • Create a ShutdownSignalBarrier, which is used to close the process once the sample has completed (line 1)
  • Create a Media Driver. Note a server specific folder is used to ensure the client and server do not share the media driver folder. (lines 5-10)
  • Create the Aeron API (lines 13-15)
  • Construct the Server agent and start it running on a thread (lines 18-21)
  • Await the shutdown signal barrier to be signalled (line 24)
  • Close the Server Agent, Aeron and Media Driver (lines 26-28)

See GitHub for the complete source.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
final IdleStrategy idleStrategy = new SleepingMillisIdleStrategy();
final ShutdownSignalBarrier barrier = new ShutdownSignalBarrier();

//construct Media Driver, cleaning up media driver folder on start/stop
final MediaDriver.Context mediaDriverCtx = new MediaDriver.Context()
    .aeronDirectoryName(CommonContext.getAeronDirectoryName() + "-server")
    .dirDeleteOnStart(true)
    .dirDeleteOnShutdown(true)
    .threadingMode(ThreadingMode.SHARED);
final MediaDriver mediaDriver = MediaDriver.launchEmbedded(mediaDriverCtx);

//construct Aeron, pointing at the media driver's folder
final Aeron.Context aeronCtx = new Aeron.Context()
    .aeronDirectoryName(mediaDriver.aeronDirectoryName());
final Aeron aeron = Aeron.connect(aeronCtx);

//Construct the server agent
ServerAgent serverAgent = new ServerAgent(aeron, barrier);
AgentRunner serverAgentRunner = new AgentRunner(idleStrategy,
    Throwable::printStackTrace, null, serverAgent);
AgentRunner.startOnThread(serverAgentRunner);

//Await shutdown signal
barrier.await();

CloseHelper.quietClose(serverAgentRunner);
CloseHelper.quietClose(aeron);
CloseHelper.quietClose(mediaDriver);

Server Agent

In the server, the primary agent is very simple - all it does in doWork is poll the subscription, and in onStart it constructs the server's subscription. See GitHub for the complete source.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
@Override
public void onStart()
{
    log.info("Server starting");
    subscription = aeron.addSubscription(Constants.SERVER_URI,
    Constants.RPC_STREAM);
}

@Override
public int doWork() throws Exception
{
    return subscription.poll(serverAdapter, 1);
}

The adapter contains the majority of logic for the server.

Server Adapter

There are three key methods within the serverAdapter:

  1. The onFragment method, which is called from the subscription polling in the Agent above
  2. The blockingOpenConnection method, which opens the outbound connection to the client so that the response can be sent.
  3. The respond method, which takes the input parameter and makes it uppercase

The full source code is on GitHub

onFragment

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public void onFragment(DirectBuffer buffer, int offset, int length, Header header)
{
    headerDecoder.wrap(buffer, offset);
    final int headerLength = headerDecoder.encodedLength();
    final int actingLength = headerDecoder.blockLength();
    final int actingVersion = headerDecoder.version();

    switch (headerDecoder.templateId())
    {
        case RpcConnectRequestDecoder.TEMPLATE_ID:
            connectRequest.wrap(buffer, offset + headerLength,
                    actingLength, actingVersion);
            final int streamId = connectRequest.returnConnectStream();
            final String uri = connectRequest.returnConnectUri();
            blockingOpenConnection(streamId, uri);
            break;
        case RpcRequestMethodDecoder.TEMPLATE_ID:
            requestMethod.wrap(buffer, offset + headerLength,
                    actingLength, actingVersion);
            final String parameters = requestMethod.parameters();
            final String correlation = requestMethod.correlation();
            respond(parameters, correlation);
            break;
        default:
            break;
    }
}

Key lines are as follows:

  • Lines 11-16, which accept the Connect Request from the client, and invoke the blockingOpenConnection
  • Lines 18-23, which accept the inbound RequestMethod and responds.

blockingOpenConnection

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
private void blockingOpenConnection(int streamId, String uri)
{
    log.info("received connect request with response URI {} stream {}",
        uri, streamId);
    publication = aeron.addExclusivePublication(uri, streamId);
    while (!publication.isConnected())
    {
        aeron.context().idleStrategy().idle();
    }
}

Key lines are as follows:

  • Line 5 which opens an exclusive publication to the client, using the values provided in the ConnectRequest
  • Lines 6-9 which blocks the process until the publication is connected.

respond

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
private void respond(String parameters, String correlation)
{
    final String returnValue = parameters.toUpperCase();
    log.info("responding on correlation {} with value {}", correlation, returnValue);

    responseEvent.wrapAndApplyHeader(buffer, 0, headerEncoder);
    responseEvent.result(returnValue);
    responseEvent.correlation(correlation);

    int retries = 3;
    do
    {
        long result = publication.offer(buffer, 0, headerEncoder.encodedLength() + responseEvent.encodedLength());
        if (result > 0)
        {
            //shutdown once the result is sent
            barrier.signal();
            break;
        } else
        {
            log.warn("aeron returned {}", result);
        }
    }
    while (--retries > 0);
}

Key lines are as follows:

  • Lines 3-8, which prepares the response data, keeping the correlation
  • Line 13, which attempts the offer to the publication (lines 12-25 wrap this in 3 retries).
  • Line 17, which signals the shutdown signal barrier that the process can terminate

Extending for multiple clients

Adding support for multiple clients can be done as follows:

  • Firstly, the process will need some other condition to exit.
  • When a connect request are received, take the header.sessionId and associate it with the constructed publication in an Int2ObjectHashMap or similar
  • When method requests come in, lookup the correct publication to use using the header.sessionId.
  • Be careful to not leak publications or subscriptions as this can result in errors (for example, Aeron counters run out of capacity around 8192 values)