Building the Client

Like the server, the client is built in three parts:

  1. The client process
  2. The client agent
  3. The client adapter

Client Process

The main method of the Client process is below. It is largely identical to the Server process, with a few minor differences. The key parts are:

  • Create a ShutdownSignalBarrier, which is used to close the process once the sample has completed (line 2)
  • Create a Media Driver. Note that a client-specific folder is used so that the client and server do not share a media driver folder (lines 5-10)
  • Create the Aeron API (lines 13-15)
  • Construct the Client agent and start it running on a thread (lines 18-21)
  • Wait for the shutdown signal barrier to be signalled (line 24)
  • Close the Client agent, Aeron and the Media Driver (lines 26-28)

See GitHub for the complete source.

final IdleStrategy idleStrategy = new SleepingMillisIdleStrategy();
final ShutdownSignalBarrier barrier = new ShutdownSignalBarrier();

//construct Media Driver, cleaning up media driver folder on start/stop
final MediaDriver.Context mediaDriverCtx = new MediaDriver.Context()
    .aeronDirectoryName(CommonContext.getAeronDirectoryName() + "-client")
    .dirDeleteOnStart(true)
    .dirDeleteOnShutdown(true)
    .threadingMode(ThreadingMode.SHARED);
final MediaDriver mediaDriver = MediaDriver.launchEmbedded(mediaDriverCtx);

//construct Aeron, pointing at the media driver's folder
final Aeron.Context aeronCtx = new Aeron.Context()
    .aeronDirectoryName(mediaDriver.aeronDirectoryName());
final Aeron aeron = Aeron.connect(aeronCtx);

//Construct the client agent
final ClientAgent clientAgent = new ClientAgent(aeron, barrier);
AgentRunner clientAgentRunner = new AgentRunner(idleStrategy,
    Throwable::printStackTrace, null, clientAgent);
AgentRunner.startOnThread(clientAgentRunner);

//Await shutdown signal
barrier.await();

CloseHelper.quietClose(clientAgentRunner);
CloseHelper.quietClose(aeron);
CloseHelper.quietClose(mediaDriver);
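
The lifecycle above (start the agent on its own thread, block the main thread on a barrier, then close resources in reverse order of construction) can be sketched with the JDK alone. This is a minimal illustration rather than the Aeron implementation: `CountDownLatch` stands in for `ShutdownSignalBarrier`, and `LifecycleSketch`, `run` and the event strings are hypothetical names used only here.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

public class LifecycleSketch
{
    // Runs the start / await / close sequence and returns the ordered events.
    public static List<String> run()
    {
        final List<String> events = Collections.synchronizedList(new ArrayList<>());
        // Assumption: a one-shot latch plays the role of ShutdownSignalBarrier.
        final CountDownLatch barrier = new CountDownLatch(1);

        // Stand-in for the agent thread: it does its work, then signals the barrier.
        final Thread agentThread = new Thread(() ->
        {
            events.add("agent working");
            barrier.countDown();
        }, "client-agent");

        events.add("agent started");
        agentThread.start();

        try
        {
            // The main thread parks here until the agent signals completion.
            barrier.await();
            agentThread.join();
        }
        catch (final InterruptedException e)
        {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }

        // Close in reverse order of construction: agent, then Aeron, then the driver.
        events.add("agent closed");
        events.add("aeron closed");
        events.add("media driver closed");
        return events;
    }

    public static void main(final String[] args)
    {
        run().forEach(System.out::println);
    }
}
```

Closing in reverse order matters because each layer depends on the one created before it: the agent uses the Aeron client, which in turn uses the media driver's folder.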

Client Agent

The business logic of the client is held largely within the Client Agent. The full source is on GitHub.

@Override
public void onStart()
{
    log.info("Client starting");
    state = State.AWAITING_OUTBOUND_CONNECT;
    publication = aeron
        .addExclusivePublication(Constants.SERVER_URI, Constants.RPC_STREAM);
    subscription = aeron
        .addSubscription(Constants.CLIENT_URI, Constants.RPC_STREAM);
}

@Override
public int doWork()
{
    switch (state)
    {
        case AWAITING_OUTBOUND_CONNECT:
            awaitConnected();
            state = State.CONNECTED;
            break;
        case CONNECTED:
            sendConnectRequest();
            state = State.AWAITING_INBOUND_CONNECT;
            break;
        case AWAITING_INBOUND_CONNECT:
            awaitSubscriptionConnected();
            state = State.READY;
            break;
        case READY:
            sendMessage();
            state = State.AWAITING_RESULT;
            break;
        case AWAITING_RESULT:
            subscription.poll(clientAdapter, 1);
            break;
        default:
            break;
    }
    return 0;
}

private void sendMessage()
{
    final String input = "string to be made uppercase";
    final String correlation = randomUUID().toString();
    requestMethod.wrapAndApplyHeader(buffer, 0, headerEncoder);
    requestMethod.parameters(input);
    requestMethod.correlation(correlation);

    log.info("sending: {} with correlation {}", input, correlation);
    send(buffer, headerEncoder.encodedLength() + requestMethod.encodedLength());
}

private void sendConnectRequest()
{
    connectRequest.wrapAndApplyHeader(buffer, 0, headerEncoder);
    connectRequest.returnConnectStream(Constants.RPC_STREAM);
    connectRequest.returnConnectUri(Constants.CLIENT_URI);

    send(buffer, headerEncoder.encodedLength() + connectRequest.encodedLength());
}

private void awaitSubscriptionConnected()
{
    log.info("awaiting inbound server connect");

    while (!subscription.isConnected())
    {
        aeron.context().idleStrategy().idle();
    }
}

private void awaitConnected()
{
    log.info("awaiting outbound server connect");

    while (!publication.isConnected())
    {
        aeron.context().idleStrategy().idle();
    }
}

The key parts are:

  • onStart, which opens an exclusive publication to the server and a subscription on which to receive responses
  • doWork, which steps through a simple state machine from AWAITING_OUTBOUND_CONNECT to AWAITING_RESULT, where it polls the subscription for the response
  • sendConnectRequest, which encodes the connect request and sends it to the server
  • sendMessage, which sends the string to be uppercased to the server, along with a correlation ID
  • awaitConnected, which blocks the agent thread until the outbound publication is connected
  • awaitSubscriptionConnected, which blocks the agent thread until the inbound subscription is connected
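
The state progression in doWork can be exercised without Aeron. The sketch below is a simplified, non-blocking variant and not the sample's code: each call performs at most one step and returns a work count, instead of spinning inside the agent until a connection appears. `StateMachineSketch` and the two boolean flags are hypothetical stand-ins for `publication.isConnected()` and `subscription.isConnected()`.

```java
public class StateMachineSketch
{
    public enum State
    {
        AWAITING_OUTBOUND_CONNECT,
        CONNECTED,
        AWAITING_INBOUND_CONNECT,
        READY,
        AWAITING_RESULT
    }

    // Hypothetical stand-ins for the publication and subscription connection status.
    public boolean publicationConnected;
    public boolean subscriptionConnected;

    private State state = State.AWAITING_OUTBOUND_CONNECT;

    public State state()
    {
        return state;
    }

    // One step of the duty cycle: returns 1 if the state advanced, 0 if there was nothing to do.
    public int doWork()
    {
        switch (state)
        {
            case AWAITING_OUTBOUND_CONNECT:
                if (!publicationConnected)
                {
                    return 0;
                }
                state = State.CONNECTED;
                return 1;
            case CONNECTED:
                // The connect request would be sent to the server here.
                state = State.AWAITING_INBOUND_CONNECT;
                return 1;
            case AWAITING_INBOUND_CONNECT:
                if (!subscriptionConnected)
                {
                    return 0;
                }
                state = State.READY;
                return 1;
            case READY:
                // The RPC request would be sent here.
                state = State.AWAITING_RESULT;
                return 1;
            default:
                // AWAITING_RESULT: the subscription would be polled here.
                return 0;
        }
    }

    public static void main(final String[] args)
    {
        final StateMachineSketch agent = new StateMachineSketch();
        agent.publicationConnected = true;
        agent.subscriptionConnected = true;
        while (agent.doWork() > 0)
        {
            System.out.println(agent.state());
        }
    }
}
```

Returning the work count gives the runner's idle strategy something to act on: a zero result lets it back off, whereas the blocking loops in the sample keep the agent inside a single doWork call until the connection is established.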

Client Adapter

The client adapter awaits the response event, logs the result and then signals the shutdown barrier. The full source is on GitHub.

@Override
public void onFragment(DirectBuffer buffer, int offset, int length, Header header)
{
    headerDecoder.wrap(buffer, offset);

    switch (headerDecoder.templateId())
    {
        case RpcResponseEventEncoder.TEMPLATE_ID:
            responseEvent.wrap(buffer, offset + headerDecoder.encodedLength(),
                    headerDecoder.blockLength(), headerDecoder.version());
            logger.info("Received {}", responseEvent.result());
            barrier.signal();
            break;
        default:
            logger.warn("Unknown message");
            break;
    }
}
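
The dispatch-on-template-ID pattern used by the adapter can be tried out with a plain `ByteBuffer`. The sketch below assumes the standard 8-byte SBE message header (blockLength, templateId, schemaId and version, each a little-endian uint16). The template ID value, the schema ID, the length-prefixed payload layout and the `HeaderDispatchSketch` class are hypothetical and do not reflect the sample's generated codecs.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;

public class HeaderDispatchSketch
{
    // Four little-endian uint16 fields: blockLength, templateId, schemaId, version.
    public static final int HEADER_LENGTH = 8;
    // Hypothetical template id for the response event.
    public static final int RESPONSE_TEMPLATE_ID = 3;

    // Builds a message: header followed by a uint16 length prefix and ASCII payload.
    public static ByteBuffer encode(final int templateId, final String result)
    {
        final byte[] payload = result.getBytes(StandardCharsets.US_ASCII);
        final ByteBuffer buffer = ByteBuffer
            .allocate(HEADER_LENGTH + 2 + payload.length)
            .order(ByteOrder.LITTLE_ENDIAN);
        buffer.putShort((short)0);              // blockLength: no fixed-size fields in this sketch
        buffer.putShort((short)templateId);     // templateId: selects the decoder
        buffer.putShort((short)1);              // schemaId (hypothetical value)
        buffer.putShort((short)0);              // version
        buffer.putShort((short)payload.length); // payload length prefix (simplified)
        buffer.put(payload);
        buffer.flip();
        return buffer;
    }

    // Reads the header at the given offset, then dispatches on the template id.
    public static String onFragment(final ByteBuffer buffer, final int offset)
    {
        // duplicate() resets the byte order, so it is set again on the view.
        final ByteBuffer view = buffer.duplicate().order(ByteOrder.LITTLE_ENDIAN);
        final int blockLength = view.getShort(offset) & 0xFFFF;
        final int templateId = view.getShort(offset + 2) & 0xFFFF;

        switch (templateId)
        {
            case RESPONSE_TEMPLATE_ID:
                final int bodyOffset = offset + HEADER_LENGTH + blockLength;
                final int length = view.getShort(bodyOffset) & 0xFFFF;
                final byte[] bytes = new byte[length];
                view.position(bodyOffset + 2);
                view.get(bytes);
                return "Received " + new String(bytes, StandardCharsets.US_ASCII);
            default:
                return "Unknown message";
        }
    }

    public static void main(final String[] args)
    {
        System.out.println(onFragment(encode(RESPONSE_TEMPLATE_ID, "HELLO"), 0));
        System.out.println(onFragment(encode(99, "ignored"), 0));
    }
}
```

Reading the header first and switching on the template ID keeps the adapter open to additional message types: a new case is added per template, and unrecognised messages fall through to the default branch instead of being misread.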