RFQ Logic

RFQ Sequences

Overview

RFQ IDs are generated within the cluster. The Rfqs object holds a counter that is incremented each time a new RFQ is created:

...
private int rfqId = 0;
...
public void createRfq(
    final String correlation,
    final long expireTimeMs,
    final long quantity,
    final Side side,
    final String cusip,
    final int userId)
{
    //validation logic etc

    //increment the rfqId and create a new Rfq object
    final Rfq rfq = new Rfq(++rfqId, correlation, expireTimeMs, quantity, side, cusip, userId);
    rfqs.add(rfq);
    ...
}
...

State Machine

The state machine (in the Gang of Four sense, not the Replicated State Machine sense) is implemented in the com.aeroncookbook.rfq.domain.rfq.states package. It validates each proposed transition against the current state and rejects invalid transitions (for example, going from RFQ Created directly to RFQ Accepted).

Interface

public interface RfqState
{
    RfqStates getCurrentState();

    short getCurrentStateId();

    boolean canTransitionTo(RfqStates newState);

    RfqState transitionTo(RfqStates newState);
}

Additionally, an RfqStates enumeration is defined that matches the states in the diagram shown in the Requirements. Its values are: CREATED, QUOTED, COUNTERED, ACCEPTED, REJECTED, EXPIRED, CANCELED and COMPLETED.

Each state has a corresponding implementation of the RfqState interface. As an example, this is the canTransitionTo method of the RfqQuoted state.

@Override
public boolean canTransitionTo(RfqStates newState)
{
    return newState == RfqStates.ACCEPTED
        || newState == RfqStates.COUNTERED
        || newState == RfqStates.EXPIRED
        || newState == RfqStates.REJECTED;
}

This specifies that an RFQ in the QUOTED state may transition only to the ACCEPTED, COUNTERED, EXPIRED or REJECTED states.
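
The same rule can also be written as data rather than as one class per state. The sketch below is a table-driven alternative for illustration only, not the approach used in the project. The TransitionTable class and its method names are hypothetical, and only the QUOTED row comes from the text above; every other row is left empty and would need to be filled in from the Requirements diagram.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

enum RfqStates
{
    CREATED, QUOTED, COUNTERED, ACCEPTED, REJECTED, EXPIRED, CANCELED, COMPLETED
}

// Hypothetical helper: holds the set of legal target states for each state.
final class TransitionTable
{
    private final Map<RfqStates, Set<RfqStates>> allowed = new EnumMap<>(RfqStates.class);

    TransitionTable()
    {
        // Start with no legal transitions for any state.
        for (final RfqStates state : RfqStates.values())
        {
            allowed.put(state, EnumSet.noneOf(RfqStates.class));
        }
        // Only this row is taken from the RfqQuoted example; the rest are assumptions (left empty).
        allowed.put(RfqStates.QUOTED,
            EnumSet.of(RfqStates.ACCEPTED, RfqStates.COUNTERED, RfqStates.EXPIRED, RfqStates.REJECTED));
    }

    boolean canTransition(final RfqStates from, final RfqStates to)
    {
        return allowed.get(from).contains(to);
    }

    // Returns the new state, or throws if the transition is not in the table.
    RfqStates transition(final RfqStates from, final RfqStates to)
    {
        if (!canTransition(from, to))
        {
            throw new IllegalStateException("Invalid transition " + from + " -> " + to);
        }
        return to;
    }
}
```

With this table, canTransition(QUOTED, ACCEPTED) is true while canTransition(QUOTED, CANCELED) is false. A table keeps every rule in one place, whereas the per-state classes keep each rule next to the behaviour of its state.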

All known states are pre-loaded on creation of the RfqStateHelper object, ready for use.

private void buildStates(Int2ObjectHashMap<RfqState> stateMachineStates)
{
    stateMachineStates.put(RfqStates.CREATED.getStateId(), RfqCreated.INSTANCE);
    stateMachineStates.put(RfqStates.QUOTED.getStateId(), RfqQuoted.INSTANCE);
    stateMachineStates.put(RfqStates.COUNTERED.getStateId(), RfqCountered.INSTANCE);
    stateMachineStates.put(RfqStates.ACCEPTED.getStateId(), RfqAccepted.INSTANCE);
    stateMachineStates.put(RfqStates.REJECTED.getStateId(), RfqRejected.INSTANCE);
    stateMachineStates.put(RfqStates.EXPIRED.getStateId(), RfqExpired.INSTANCE);
    stateMachineStates.put(RfqStates.CANCELED.getStateId(), RfqCanceled.INSTANCE);
    stateMachineStates.put(RfqStates.COMPLETED.getStateId(), RfqCompleted.INSTANCE);
}
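
To illustrate how a pre-loaded map like this might be consulted, the sketch below resolves a state singleton from its id. It is only an approximation: java.util.HashMap stands in for Agrona's Int2ObjectHashMap, only two states are included, the singletons are modelled as single-value enums, and the StateLookup class, its getState method and the numeric ids are invented for the example.

```java
import java.util.HashMap;
import java.util.Map;

enum RfqStates
{
    // Placeholder ids; the real values are not shown in this section.
    QUOTED(1),
    ACCEPTED(3);

    private final int stateId;

    RfqStates(final int stateId)
    {
        this.stateId = stateId;
    }

    int getStateId()
    {
        return stateId;
    }
}

interface RfqState
{
    RfqStates getCurrentState();
}

// Each state is a stateless singleton, so one shared instance is enough.
enum RfqQuoted implements RfqState
{
    INSTANCE;

    @Override
    public RfqStates getCurrentState()
    {
        return RfqStates.QUOTED;
    }
}

enum RfqAccepted implements RfqState
{
    INSTANCE;

    @Override
    public RfqStates getCurrentState()
    {
        return RfqStates.ACCEPTED;
    }
}

// Hypothetical helper: loads the states once, then serves lookups without allocating.
final class StateLookup
{
    private final Map<Integer, RfqState> states = new HashMap<>();

    StateLookup()
    {
        states.put(RfqStates.QUOTED.getStateId(), RfqQuoted.INSTANCE);
        states.put(RfqStates.ACCEPTED.getStateId(), RfqAccepted.INSTANCE);
    }

    RfqState getState(final RfqStates state)
    {
        final RfqState found = states.get(state.getStateId());
        if (found == null)
        {
            throw new IllegalArgumentException("No state registered for " + state);
        }
        return found;
    }
}
```

Because every lookup returns the same shared instance, a transition only swaps a reference on the RFQ and creates no new objects.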

RFQs

The RFQs object holds all known RFQs and RFQ Responses. Internally, it makes use of an ArrayList to hold Rfq objects. It also receives commands directly from the SbeAdapter. As an example flow, the Cancel RFQ Command handler is shown below:

public void cancelRfq(final String correlation, final int rfqId, final int cancelUserId)
{
    final Rfq rfq = rfqs.stream().filter(r -> r.getRfqId() == rfqId).findFirst().orElse(null);
    if (rfq == null)
    {
        LOGGER.info("Cannot cancel RFQ: RFQ {} not found", rfqId);
        clusterClientResponder.cancelRfqConfirm(correlation, null, CancelRfqResult.UNKNOWN_RFQ);
        return;
    }

    if (!rfq.canCancel())
    {
        LOGGER.info("Cannot cancel RFQ: RFQ {} invalid transition", rfqId);
        clusterClientResponder.cancelRfqConfirm(correlation, null, CancelRfqResult.INVALID_TRANSITION);
        return;
    }

    if (rfq.getRequesterUserId() != cancelUserId)
    {
        LOGGER.info("Cannot cancel RFQ: RFQ {} not created by user {}", rfqId, cancelUserId);
        clusterClientResponder.cancelRfqConfirm(correlation, null,
            CancelRfqResult.CANNOT_CANCEL_USER_NOT_REQUESTER);
        return;
    }

    rfq.cancel();
    LOGGER.info("Cancelled RFQ {}", rfq);
    clusterClientResponder.cancelRfqConfirm(correlation, rfq, CancelRfqResult.SUCCESS);
    clusterClientResponder.broadcastRfqCanceled(rfq);
}

The canCancel method on the RFQ is used to ensure that it is legal to transition to the RfqStates.CANCELED state. This method asks the current state of the RFQ if it can transition to CANCELED.

public boolean canCancel()
{
    return currentState.canTransitionTo(RfqStates.CANCELED);
}

Finally, the cancel method on the RFQ object will update the necessary state:

public void cancel()
{
    if (currentState.canTransitionTo(RfqStates.CANCELED))
    {
        currentState = currentState.transitionTo(RfqStates.CANCELED);
    }
}
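
The interaction between canCancel and cancel can be seen in a stripped-down form below. This is a minimal sketch rather than the project's Rfq class: it has only two states, and the rule that only CREATED may move to CANCELED is an assumption made for the example.

```java
enum RfqStates
{
    CREATED, CANCELED
}

final class Rfq
{
    private RfqStates currentState = RfqStates.CREATED;

    // Assumption for this sketch: only CREATED may move to CANCELED.
    private boolean canTransitionTo(final RfqStates newState)
    {
        return currentState == RfqStates.CREATED && newState == RfqStates.CANCELED;
    }

    boolean canCancel()
    {
        return canTransitionTo(RfqStates.CANCELED);
    }

    // Silently does nothing when the transition is not legal.
    void cancel()
    {
        if (canTransitionTo(RfqStates.CANCELED))
        {
            currentState = RfqStates.CANCELED;
        }
    }

    RfqStates getCurrentState()
    {
        return currentState;
    }
}
```

Since cancel gives no indication when it does nothing, a caller that needs to report INVALID_TRANSITION has to check canCancel first, as the cancelRfq handler does.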

Timers

Aeron Cluster timers allow tasks to be scheduled in the future. This is useful for expiring RFQs and for other expiry activities, such as a Quote or Counter receiving no response within a given time. To use Aeron Cluster timers safely, it is best to give each timer a monotonically increasing correlation ID, so we build a simple service that creates timers and associates them with RFQs. This service is unchanged from the Adaptive Aeron Cluster Quickstart.

public class TimerManager
{
    private static final Logger LOGGER = LoggerFactory.getLogger(TimerManager.class);
    private final SessionMessageContextImpl context;
    private Cluster cluster;

    private final Long2ObjectHashMap<Runnable> correlationIdToRunnable = new Long2ObjectHashMap<>();

    private long correlationId = 0;

    /**
     * Constructor, accepting the context to update the cluster timestamp
     * @param context the context to update the cluster timestamp
     */
    public TimerManager(final SessionMessageContextImpl context)
    {
        this.context = context;
    }

    /**
     * Schedules a timer
     *
     * @param deadline the deadline of the timer
     * @param timerRunnable the timerRunnable to perform when the timer fires
     * @return the correlation id of the timer
     */
    public long scheduleTimer(final long deadline, final Runnable timerRunnable)
    {
        correlationId++;
        Objects.requireNonNull(cluster, "Cluster must be set before scheduling timers");
        correlationIdToRunnable.put(correlationId, timerRunnable);

        cluster.idleStrategy().reset();
        while (!cluster.scheduleTimer(correlationId, deadline))
        {
            cluster.idleStrategy().idle();
        }
        return correlationId;
    }

    ...
    /**
     * Called when a timer cluster event occurs
     * @param correlationId the cluster timer id
     * @param timestamp     the timestamp the timer was fired at
     */
    public void onTimerEvent(final long correlationId, final long timestamp)
    {
        context.setClusterTime(timestamp);
        if (correlationIdToRunnable.containsKey(correlationId))
        {
            correlationIdToRunnable.get(correlationId).run();
            correlationIdToRunnable.remove(correlationId);
        }
        else
        {
            LOGGER.warn("Timer fired for unknown correlation id {}", correlationId);
        }
    }
...
}
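
The behaviour of the service can be tried outside a cluster with a stand-in scheduler. The sketch below is a simplified approximation, not the TimerManager itself: TimerScheduler is a hypothetical interface covering only the scheduleTimer call used above, java.util.HashMap replaces Agrona's Long2ObjectHashMap, and Thread.onSpinWait replaces the cluster's idle strategy.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the part of the Cluster interface used by the timer service.
interface TimerScheduler
{
    boolean scheduleTimer(long correlationId, long deadline);
}

final class SketchTimerManager
{
    private final Map<Long, Runnable> correlationIdToRunnable = new HashMap<>();
    private final TimerScheduler scheduler;
    private long correlationId = 0;

    SketchTimerManager(final TimerScheduler scheduler)
    {
        this.scheduler = scheduler;
    }

    long scheduleTimer(final long deadline, final Runnable task)
    {
        // Each timer gets the next id in sequence, so ids are never reused.
        final long id = ++correlationId;
        correlationIdToRunnable.put(id, task);

        // Retry until the scheduler accepts the timer.
        while (!scheduler.scheduleTimer(id, deadline))
        {
            Thread.onSpinWait();
        }
        return id;
    }

    // Runs and forgets the task for this id; returns false if the id is unknown.
    boolean onTimerEvent(final long id)
    {
        final Runnable task = correlationIdToRunnable.remove(id);
        if (task == null)
        {
            return false;
        }
        task.run();
        return true;
    }
}
```

A scheduler that refuses the first attempt, for example (id, deadline) -> ++attempts[0] > 1 with a one-element int array as the counter, exercises the retry loop. Because the task is removed when it runs, a second event for the same correlation ID is ignored.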

Within the RFQs object, the expiry is scheduled as follows:

public void createRfq(
    final String correlation,
    final long expireTimeMs,
    final long quantity,
    final Side side,
    final String cusip,
    final int userId)
{
    //validation code removed 
    ...
    final Rfq rfq = new Rfq(++rfqId, correlation, expireTimeMs, quantity, side, cusip, userId);
    rfqs.add(rfq);
    LOGGER.info("Created RFQ {}", rfq);

    //send a confirmation to the client that created the RFQ
    clusterClientResponder.createRfqConfirm(correlation, rfq, CreateRfqResult.SUCCESS);

    //broadcast the new RFQ to all clients
    clusterClientResponder.broadcastNewRfq(rfq);

    //schedule the RFQ to expire
    timerManager.scheduleTimer(rfq.getExpireTimeMs(), () -> expireRfq(rfq.getRfqId()));
}

This schedules a Runnable to call expireRfq, which is:

private void expireRfq(final int rfqId)
{
    final Rfq rfq = rfqs.stream().filter(r -> r.getRfqId() == rfqId).findFirst().orElse(null);
    if (rfq == null)
    {
        LOGGER.info("Cannot expire RFQ: RFQ {} not found", rfqId);
        return;
    }

    if (!rfq.canExpire())
    {
        LOGGER.info("Cannot expire RFQ: RFQ {} invalid transition", rfqId);
        return;
    }

    rfq.expire();
    LOGGER.info("Expired RFQ {}", rfq);
    clusterClientResponder.broadcastRfqExpired(rfq);
}
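
The expiry timer is scheduled when the RFQ is created, so it can still fire after the RFQ has moved on, for instance after a cancel. The sketch below illustrates why the canExpire guard matters in that case. It is a reduced model with hypothetical simplifications: three states only, an assumed rule that only CREATED may expire or be canceled, and an expireRfq that returns a boolean instead of logging and broadcasting.

```java
import java.util.ArrayList;
import java.util.List;

enum RfqStates
{
    CREATED, CANCELED, EXPIRED
}

final class Rfq
{
    private final int rfqId;
    private RfqStates currentState = RfqStates.CREATED;

    Rfq(final int rfqId)
    {
        this.rfqId = rfqId;
    }

    int getRfqId()
    {
        return rfqId;
    }

    RfqStates getCurrentState()
    {
        return currentState;
    }

    // Assumption for this sketch: only a CREATED RFQ may expire or be canceled.
    boolean canExpire()
    {
        return currentState == RfqStates.CREATED;
    }

    void expire()
    {
        if (canExpire())
        {
            currentState = RfqStates.EXPIRED;
        }
    }

    void cancel()
    {
        if (currentState == RfqStates.CREATED)
        {
            currentState = RfqStates.CANCELED;
        }
    }
}

final class Rfqs
{
    private final List<Rfq> rfqs = new ArrayList<>();

    Rfq createRfq(final int rfqId)
    {
        final Rfq rfq = new Rfq(rfqId);
        rfqs.add(rfq);
        return rfq;
    }

    // Returns true only when the RFQ exists and was moved to EXPIRED.
    boolean expireRfq(final int rfqId)
    {
        final Rfq rfq = rfqs.stream().filter(r -> r.getRfqId() == rfqId).findFirst().orElse(null);
        if (rfq == null || !rfq.canExpire())
        {
            return false;
        }
        rfq.expire();
        return true;
    }
}
```

In this model, an RFQ that was canceled before its timer fired stays CANCELED, and no expiry would be broadcast for it.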