
RFQ Logic

RFQ Sequences

Overview

The RFQ sequence generator is built using an Eider sequence object, which provides deterministic, snapshot-friendly sequence generation.

Creation

Eider Definition

@EiderSpec(eiderId = 7000, name = "RfqSequence", eiderGroup = GroupConstants.RFQ)
public class RfqSeq
{
    @EiderAttribute(sequence = true)
    private int rfqId;
}

The sequence generator is created within the constructor, using a private buffer.

RfqSequence rfqSequence = RfqSequence.INSTANCE();

This sets the object up using an internally allocated DirectBuffer, ready to provide new sequences starting from 1.
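The generated RfqSequence code is not shown here. As a rough plain-Java sketch of the idea, assuming the generated object keeps nothing but its next value in a private buffer, the hypothetical BufferBackedSequence below uses java.nio.ByteBuffer in place of Agrona's DirectBuffer. Its snapshot and restore methods are illustrative additions, not Eider API.

```java
import java.nio.ByteBuffer;

// Hypothetical stand-in for a generated sequence object: the only state is the
// next value to issue, held in a small buffer rather than in a Java field.
final class BufferBackedSequence
{
    private static final int NEXT_VALUE_OFFSET = 0;
    private final ByteBuffer buffer = ByteBuffer.allocateDirect(Integer.BYTES);

    BufferBackedSequence()
    {
        buffer.putInt(NEXT_VALUE_OFFSET, 1); // the first value issued is 1
    }

    // Returns the stored value and advances the counter, so no value is issued twice.
    int nextRfqIdSequence()
    {
        final int value = buffer.getInt(NEXT_VALUE_OFFSET);
        buffer.putInt(NEXT_VALUE_OFFSET, value + 1);
        return value;
    }

    // Snapshot: copy the raw bytes of the buffer out.
    byte[] snapshot()
    {
        final byte[] copy = new byte[Integer.BYTES];
        for (int i = 0; i < copy.length; i++)
        {
            copy[i] = buffer.get(i);
        }
        return copy;
    }

    // Restore: copy the raw bytes back in; issuing resumes where the snapshot left off.
    void restore(byte[] snapshot)
    {
        for (int i = 0; i < snapshot.length; i++)
        {
            buffer.put(i, snapshot[i]);
        }
    }
}
```

Because the only state is a few bytes in a buffer, a snapshot is a plain byte copy, and a restored instance continues from the next unissued value instead of starting again at 1. The sketch does not handle int overflow.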

Next Sequences

Getting the next sequence value is done by calling nextRfqIdSequence on the object:

final int nextSequence = rfqSequence.nextRfqIdSequence();

The object guarantees that no sequence will be issued more than once.

State Machine

The state machine (in the Gang of Four sense, not the Replicated State Machine sense) is implemented in the com.aeroncookbook.cluster.rfq.statemachine.states package. It validates the current state against the proposed transition and rejects the transition if the actor is not permitted to perform that action on that RFQ, or if the transition itself is invalid (for example, going from RFQ Created -> RFQ Accepted).

Interface

public interface RfqState
{
    RfqStates getCurrentState();

    short getCurrentStateId();

    boolean canTransitionTo(RfqStates newState);

    RfqState transitionTo(RfqStates newState);
}

Additionally, an RfqStates enumeration is created that matches the states in the diagram shown in the Requirements. Its values are CREATED, QUOTED, COUNTERED, ACCEPTED, REJECTED, EXPIRED, CANCELED and COMPLETED.

Each state has a corresponding implementation of the RfqState interface. As an example, this is the canTransitionTo method of the RfqQuoted state.

@Override
public boolean canTransitionTo(RfqStates newState)
{
    return newState == RfqStates.ACCEPTED
        || newState == RfqStates.COUNTERED
        || newState == RfqStates.EXPIRED
        || newState == RfqStates.REJECTED;
}

This specifies that an RFQ in the QUOTED state may transition only to the ACCEPTED, COUNTERED, EXPIRED and REJECTED states.

All known states are preloaded into a map keyed by state ID when the Rfqs object is created, ready for use.

private void buildStates(Int2ObjectHashMap<RfqState> stateMachineStates)
{
    stateMachineStates.put(RfqStates.CREATED.getStateId(), RfqCreated.INSTANCE);
    stateMachineStates.put(RfqStates.QUOTED.getStateId(), RfqQuoted.INSTANCE);
    stateMachineStates.put(RfqStates.COUNTERED.getStateId(), RfqCountered.INSTANCE);
    stateMachineStates.put(RfqStates.ACCEPTED.getStateId(), RfqAccepted.INSTANCE);
    stateMachineStates.put(RfqStates.REJECTED.getStateId(), RfqRejected.INSTANCE);
    stateMachineStates.put(RfqStates.EXPIRED.getStateId(), RfqExpired.INSTANCE);
    stateMachineStates.put(RfqStates.CANCELED.getStateId(), RfqCanceled.INSTANCE);
    stateMachineStates.put(RfqStates.COMPLETED.getStateId(), RfqCompleted.INSTANCE);
}
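A compact sketch of the same preloaded-lookup idea, under stated assumptions: state IDs are taken to be the enum ordinals (the real getStateId() values are not shown), a single hypothetical TableState class stands in for RfqCreated, RfqQuoted and the other singletons, and an array replaces Int2ObjectHashMap. Only the QUOTED rules come from the text; every other state is given no transitions here.

```java
import java.util.EnumSet;

enum RfqStates
{
    CREATED, QUOTED, COUNTERED, ACCEPTED, REJECTED, EXPIRED, CANCELED, COMPLETED;

    // ASSUMPTION: dense IDs derived from declaration order, for this sketch only.
    short getStateId()
    {
        return (short)ordinal();
    }
}

// Hypothetical single class standing in for the per-state singletons.
final class TableState
{
    private final RfqStates state;
    private final EnumSet<RfqStates> allowed;

    TableState(RfqStates state, EnumSet<RfqStates> allowed)
    {
        this.state = state;
        this.allowed = allowed;
    }

    RfqStates getCurrentState()
    {
        return state;
    }

    boolean canTransitionTo(RfqStates newState)
    {
        return allowed.contains(newState);
    }
}

// Built once, like buildStates; lookups afterwards allocate nothing.
final class StateTable
{
    private final TableState[] byStateId = new TableState[RfqStates.values().length];

    StateTable()
    {
        for (RfqStates state : RfqStates.values())
        {
            // Placeholder row: no transitions unless replaced below.
            register(new TableState(state, EnumSet.noneOf(RfqStates.class)));
        }
        // The one rule taken from the RfqQuoted example.
        register(new TableState(RfqStates.QUOTED, EnumSet.of(
            RfqStates.ACCEPTED, RfqStates.COUNTERED, RfqStates.EXPIRED, RfqStates.REJECTED)));
    }

    private void register(TableState state)
    {
        byStateId[state.getCurrentState().getStateId()] = state;
    }

    // Same shape as rfqCanTransitionToState: the stored short ID selects the state object.
    boolean canTransition(short currentStateId, RfqStates destination)
    {
        return byStateId[currentStateId].canTransitionTo(destination);
    }
}
```

Everything is built once, so a transition check is an array index plus an EnumSet bit test. If state IDs are stored in off-heap records or snapshots, explicit IDs such as those returned by getStateId() are safer than ordinals, which change if the enum is reordered.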

RFQs

The RFQs object holds all known RFQs and RFQ Responses. Internally, it makes use of Eider repositories to hold data in off-heap stores. It also receives commands directly from the RfqDemuxer. The Cancel RFQ Command handler is shown below:

public void cancelRfq(CancelRfqCommand cancelRfqCommand, long timestamp)
{
    RfqFlyweight rfqToCancel = rfqsRepository.getByKey(cancelRfqCommand.readRfqId());
    if (rfqToCancel == null)
    {
        replyError(cancelRfqCommand.readRfqId(), UNKNOWN_RFQ, "");
        return;
    }

    //only requester can cancel
    if (rfqToCancel.readRequester() != cancelRfqCommand.readUserId())
    {
        replyError(cancelRfqCommand.readRfqId(), CANNOT_CANCEL_RFQ_NO_RELATION_TO_USER, "");
        return;
    }

    if (rfqCanTransitionToState(rfqToCancel, RfqStates.CANCELED))
    {
        //update repository
        rfqToCancel.writeState(transitionTo(rfqToCancel, RfqStates.CANCELED));
        rfqToCancel.writeLastUpdate(timestamp);
        rfqToCancel.writeLastUpdateUser(cancelRfqCommand.readUserId());

        //broadcast RFQ canceled
        rfqCanceledEvent.writeClOrdId(rfqToCancel.readRequesterClOrdId());
        rfqCanceledEvent.writeRfqId(cancelRfqCommand.readRfqId());
        rfqCanceledEvent.writeRequesterUserId(rfqToCancel.readRequester());
        rfqCanceledEvent.writeResponderUserId(rfqToCancel.readResponder());
        clusterProxy.broadcast(bufferCanceledRfqEvent, 0, RfqCanceledEvent.BUFFER_LENGTH);
    } else
    {
        replyError(rfqToCancel.readId(), ILLEGAL_TRANSITION, "");
    }
}

The rfqCanTransitionToState method reads the current state of the RFQ and checks that it is legal to transition to the RfqStates.CANCELED state. It simply looks up the RfqState for the RFQ's current state ID and returns the result of its canTransitionTo call.

private boolean rfqCanTransitionToState(RfqFlyweight rfqToTransition,
                                        RfqStates destinationState)
{
    return stateMachineStates
        .get(rfqToTransition.readState()).canTransitionTo(destinationState);
}
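The cancel handler applies its checks in a fixed order (the RFQ must exist, the user must be its requester, the state machine must allow the move) and changes nothing until all of them pass. A minimal in-memory sketch of that ordering follows. SimpleRfq, SketchState, CancelOutcome and CancelHandlerSketch are hypothetical stand-ins for the Eider flyweight and repository, and the rule that only CREATED may move to CANCELED is an assumption of the sketch (the RfqQuoted rules do exclude CANCELED).

```java
import java.util.HashMap;
import java.util.Map;

// Reduced state set for this sketch only.
enum SketchState
{
    CREATED, QUOTED, CANCELED
}

// Outcome names mirror the error codes used by cancelRfq.
enum CancelOutcome
{
    CANCELED, UNKNOWN_RFQ, CANNOT_CANCEL_RFQ_NO_RELATION_TO_USER, ILLEGAL_TRANSITION
}

// Hypothetical in-memory record standing in for RfqFlyweight.
final class SimpleRfq
{
    final int id;
    final int requester;
    SketchState state;
    long lastUpdate;
    int lastUpdateUser;

    SimpleRfq(int id, int requester, SketchState state)
    {
        this.id = id;
        this.requester = requester;
        this.state = state;
    }
}

final class CancelHandlerSketch
{
    private final Map<Integer, SimpleRfq> repository = new HashMap<>();

    void add(SimpleRfq rfq)
    {
        repository.put(rfq.id, rfq);
    }

    SimpleRfq get(int rfqId)
    {
        return repository.get(rfqId);
    }

    // ASSUMPTION: only CREATED may move to CANCELED in this sketch.
    private static boolean canTransitionToCanceled(SketchState current)
    {
        return current == SketchState.CREATED;
    }

    CancelOutcome cancel(int rfqId, int userId, long timestamp)
    {
        final SimpleRfq rfq = repository.get(rfqId);
        if (rfq == null)
        {
            return CancelOutcome.UNKNOWN_RFQ; // 1. the RFQ must exist
        }
        if (rfq.requester != userId)
        {
            return CancelOutcome.CANNOT_CANCEL_RFQ_NO_RELATION_TO_USER; // 2. only the requester
        }
        if (!canTransitionToCanceled(rfq.state))
        {
            return CancelOutcome.ILLEGAL_TRANSITION; // 3. the state machine must allow it
        }
        // 4. mutate only after every check has passed
        rfq.state = SketchState.CANCELED;
        rfq.lastUpdate = timestamp;
        rfq.lastUpdateUser = userId;
        return CancelOutcome.CANCELED; // the real handler broadcasts an event at this point
    }
}
```

Returning an outcome instead of replying keeps the sketch testable; the real handler calls replyError or broadcasts an RfqCanceledEvent at the corresponding points.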

Timers

Aeron Cluster timers offer a way to schedule tasks in the future. This is useful for expiring RFQs and for other expiry activities, such as no response to a Quote or Counter within a given time. Scheduling a timer with a correlation ID that is already scheduled reschedules that timer to the new deadline, so to use Aeron Cluster timers safely it is best to use monotonically increasing correlation IDs. As a result, we build a simple service that creates correlation IDs and associates them with RFQs.

public class TimerService
{
    private long currentTimerId = 1;
    private Object2ObjectHashMap<Long, Integer> timerMap;

    public TimerService()
    {
        timerMap = new Object2ObjectHashMap<>();
    }

    public long getCorrelationIdForRfqId(int rfqId)
    {
        currentTimerId += 1;
        timerMap.put(currentTimerId, rfqId);
        return currentTimerId;
    }

    public int getRfqIdForCorrelationId(long timerId)
    {
        return timerMap.get(timerId);
    }
}

The timer service is very simple. It holds a long representing the current timer correlation ID and has two methods: one returns a new correlation ID for an RFQ, to be used when scheduling a timer, and the other reverses the mapping, returning the RFQ associated with a correlation ID. In the RfqClusteredService, it is put to use when scheduling timers and handling timer events:

@Override
public void onTimerEvent(long correlationId, long timestamp)
{
    log.info("on timer correlation:{} time:{}", correlationId, timestamp);
    int rfqId = timerService.getRfqIdForCorrelationId(correlationId);
    rfqs.expire(rfqId);
}

@Override
public void scheduleExpiry(long noSoonerThanMs, int rfqId)
{
    long correlationIdForRfqId = timerService.getCorrelationIdForRfqId(rfqId);
    // scheduleTimer returns false if the request is back-pressured; this example does not retry
    cluster.scheduleTimer(correlationIdForRfqId, noSoonerThanMs);
}
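Two points from the timer discussion can be checked with a small model. The hypothetical ToyTimerWheel is not Aeron code: it only mimics the documented scheduleTimer rule that scheduling an existing correlation ID moves its deadline. The hypothetical OneShotTimerService is a variant of TimerService that removes a mapping once its timer has fired and returns a sentinel (assuming RFQ IDs are positive) instead of unboxing a null Integer for an unknown correlation ID.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Toy model (not Aeron) of the documented rule: scheduling an already
// scheduled correlation ID replaces its deadline.
final class ToyTimerWheel
{
    private final Map<Long, Long> deadlineByCorrelationId = new HashMap<>();

    void scheduleTimer(long correlationId, long deadline)
    {
        deadlineByCorrelationId.put(correlationId, deadline); // replaces any earlier deadline
    }

    // Removes and returns, in ascending order, the correlation IDs due at or before now.
    List<Long> poll(long now)
    {
        final List<Long> fired = new ArrayList<>();
        final Iterator<Map.Entry<Long, Long>> it = deadlineByCorrelationId.entrySet().iterator();
        while (it.hasNext())
        {
            final Map.Entry<Long, Long> entry = it.next();
            if (entry.getValue() <= now)
            {
                fired.add(entry.getKey());
                it.remove();
            }
        }
        Collections.sort(fired);
        return fired;
    }
}

// Hypothetical TimerService variant: IDs are never reused and mappings are one-shot.
final class OneShotTimerService
{
    static final int NO_RFQ = -1; // ASSUMPTION: real RFQ IDs are positive

    private long lastCorrelationId = 0;
    private final Map<Long, Integer> rfqIdByCorrelationId = new HashMap<>();

    long getCorrelationIdForRfqId(int rfqId)
    {
        lastCorrelationId++; // monotonically increasing
        rfqIdByCorrelationId.put(lastCorrelationId, rfqId);
        return lastCorrelationId;
    }

    // A timer fires once, so its mapping is removed as it is read.
    int takeRfqIdForCorrelationId(long correlationId)
    {
        final Integer rfqId = rfqIdByCorrelationId.remove(correlationId);
        return rfqId == null ? NO_RFQ : rfqId;
    }

    int pendingCount()
    {
        return rfqIdByCorrelationId.size();
    }
}
```

A primitive map such as Agrona's Long2LongHashMap could avoid the boxing in both classes. In a clustered service the counter and the map are service state, so they would also need to be written to and loaded from snapshots; otherwise a timer restored by the cluster after a restart could arrive with a correlation ID the service no longer knows.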

Within the RFQs object, the same pattern of checking state is used before processing the expiry:

public void expire(int rfqId)
{
    final RfqFlyweight rfqToExpire = rfqsRepository.getByKey(rfqId);

    //the system is firing this event; no one to report to.
    if (rfqToExpire == null)
    {
        return;
    }

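    //ensure the transition is legal for the current state. If not, it doesn't matter
    // since the RFQ has already moved on to another terminal state.
    if (rfqCanTransitionToState(rfqToExpire, RfqStates.EXPIRED))
    {
        //update repository so that later commands see the RFQ as expired
        rfqToExpire.writeState(transitionTo(rfqToExpire, RfqStates.EXPIRED));

        //broadcast RFQ expired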
        rfqExpiredEvent.writeClOrdId(rfqToExpire.readRequesterClOrdId());
        rfqExpiredEvent.writeRequesterUserId(rfqToExpire.readRequester());
        rfqExpiredEvent.writeResponderUserId(rfqToExpire.readResponder());
        rfqExpiredEvent.writeRfqId(rfqToExpire.readId());
        clusterProxy.broadcast(bufferExpireRfqEvent, 0, RfqExpiredEvent.BUFFER_LENGTH);
    }
}
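The expiry handler is meant to be safe to run late or more than once: an unknown RFQ is ignored, and so is one whose state no longer allows EXPIRED. That only holds if the handler also records the new state when it accepts the transition. A minimal sketch, with the hypothetical ExpirySketch and ExpiryState standing in for the repository and RfqStates; the QUOTED rules are the RfqQuoted ones reduced to the states modelled here, and treating ACCEPTED and EXPIRED as accepting no further transitions is an assumption.

```java
import java.util.HashMap;
import java.util.Map;

// Reduced state set for this sketch only.
enum ExpiryState
{
    QUOTED, ACCEPTED, EXPIRED
}

final class ExpirySketch
{
    private final Map<Integer, ExpiryState> stateByRfqId = new HashMap<>();
    private int broadcastCount = 0;

    void put(int rfqId, ExpiryState state)
    {
        stateByRfqId.put(rfqId, state);
    }

    ExpiryState state(int rfqId)
    {
        return stateByRfqId.get(rfqId);
    }

    int broadcastCount()
    {
        return broadcastCount;
    }

    // QUOTED -> ACCEPTED / EXPIRED follows the RfqQuoted rules; ACCEPTED and EXPIRED
    // allowing no further transitions is an assumption of this sketch.
    private static boolean canTransition(ExpiryState from, ExpiryState to)
    {
        return from == ExpiryState.QUOTED
            && (to == ExpiryState.ACCEPTED || to == ExpiryState.EXPIRED);
    }

    void expire(int rfqId)
    {
        final ExpiryState current = stateByRfqId.get(rfqId);
        if (current == null)
        {
            return; // unknown RFQ: the system fired this, nobody to report to
        }
        if (!canTransition(current, ExpiryState.EXPIRED))
        {
            return; // already moved on: nothing to do
        }
        stateByRfqId.put(rfqId, ExpiryState.EXPIRED); // record the transition first
        broadcastCount++;                             // stands in for clusterProxy.broadcast
    }
}
```

Without the state update, a second expiry timer for the same RFQ would broadcast a second expired event, and the RFQ could still be accepted after it had been reported as expired.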

Performance

JMH sampling results for two scenarios in the RFQ logic, with 1 million resident RFQs, are given below. They exclude any Aeron activity but include the RFQ logic plus reading and writing protocol messages. createRfq (rfq) creates a new RFQ within the RFQs object (0.085μs at the 99th percentile), while the workflow (wf) is a three-step workflow for a single RFQ in which User 1 creates the RFQ, User 20 provides a quote for it and User 1 finally accepts it (2.052μs at the 99th percentile).

Benchmark                             Mode      Cnt       Score    Error   Units
rfq                                   sample  1323481       0.441 ±  0.295   us/op
rfq:createRfq·p0.00                   sample                0.025            us/op
rfq:createRfq·p0.50                   sample                0.059            us/op
rfq:createRfq·p0.90                   sample                0.071            us/op
rfq:createRfq·p0.95                   sample                0.075            us/op
rfq:createRfq·p0.99                   sample                0.085            us/op
rfq:createRfq·p0.999                  sample                2.936            us/op
rfq:createRfq·p0.9999                 sample               26.890            us/op
rfq:createRfq·p1.00                   sample            72744.960            us/op
rfq:·gc.alloc.rate                    sample        5      16.466 ±  0.191  MB/sec
rfq:·gc.alloc.rate.norm               sample        5       0.562 ±  0.022    B/op
rfq:·gc.churn.G1_Eden_Space           sample        5       0.228 ±  0.003  MB/sec
rfq:·gc.churn.G1_Eden_Space.norm      sample        5       0.008 ±  0.001    B/op
rfq:·gc.churn.G1_Old_Gen              sample        5       6.831 ±  0.083  MB/sec
rfq:·gc.churn.G1_Old_Gen.norm         sample        5       0.233 ±  0.009    B/op
rfq:·gc.churn.G1_Survivor_Space       sample        5       0.135 ±  0.003  MB/sec
rfq:·gc.churn.G1_Survivor_Space.norm  sample        5       0.005 ±  0.001    B/op
rfq:·gc.count                         sample        5      35.000           counts
rfq:·gc.time                          sample        5      95.000               ms
wf                                    sample   856014       1.569 ±  0.875   us/op
wf:workflow·p0.00                     sample                0.057            us/op
wf:workflow·p0.50                     sample                0.088            us/op
wf:workflow·p0.90                     sample                0.095            us/op
wf:workflow·p0.95                     sample                0.103            us/op
wf:workflow·p0.99                     sample                2.052            us/op
wf:workflow·p0.999                    sample               20.736            us/op
wf:workflow·p0.9999                   sample               88.394            us/op
wf:workflow·p1.00                     sample           104202.240            us/op
wf:·gc.alloc.rate                     sample        5      81.493 ±  4.498  MB/sec
wf:·gc.alloc.rate.norm                sample        5       8.721 ±  1.451    B/op
wf:·gc.churn.G1_Eden_Space            sample        5      21.388 ± 19.373  MB/sec
wf:·gc.churn.G1_Eden_Space.norm       sample        5       2.273 ±  1.788    B/op
wf:·gc.churn.G1_Old_Gen               sample        5      16.561 ± 14.355  MB/sec
wf:·gc.churn.G1_Old_Gen.norm          sample        5       1.765 ±  1.448    B/op
wf:·gc.churn.G1_Survivor_Space        sample        5       2.181 ±  0.884  MB/sec
wf:·gc.churn.G1_Survivor_Space.norm   sample        5       0.233 ±  0.081    B/op
wf:·gc.count                          sample        5      44.000           counts
wf:·gc.time                           sample        5     831.000               ms