RFQ Logic
RFQ Sequences
Overview
RFQ Id's are managed within the cluster. In the Rfqs
object:
...
private int rfqId = 0;
...
public void createRfq(
final String correlation,
final long expireTimeMs,
final long quantity,
final Side side,
final String cusip,
final int userId)
{
//validation logic etc
//increment the rfqId and create a new Rfq object
final Rfq rfq = new Rfq(++rfqId, correlation, expireTimeMs, quantity, side, cusip, userId);
rfqs.add(rfq);
...
}
...
State Machine
The state machine - in the Gang of Four sense, not Replicated State Machine sense - is run within the com.aeroncookbook.rfq.domain.rfq.states
package. It validates the current state against the proposed transition, and rejects invalid transitions (for example, going from RFQ Created -> RFQ Accepted
).
Interface
| public interface RfqState
{
RfqStates getCurrentState();
short getCurrentStateId();
boolean canTransitionTo(RfqStates newState);
RfqState transitionTo(RfqStates newState);
}
|
Additionally, a RFQ State enumeration is created which matches the states in the diagram shown in the Requirements. The values known to it are: CREATED
, QUOTED
, COUNTERED
, ACCEPTED
, REJECTED
, EXPIRED
, CANCELED
.
Each state has a corresponding implementation of the RfqState
interface. As an example, this is the canTransitionTo
method of the RfqQuoted
state.
| @Override
public boolean canTransitionTo(RfqStates newState)
{
return newState == RfqStates.ACCEPTED
|| newState == RfqStates.COUNTERED
|| newState == RfqStates.EXPIRED
|| newState == RfqStates.REJECTED;
}
|
This specifies that an RFQ in the Quoted
state may transition to ACCEPTED
, COUNTERED
, EXPIRED
and REJECTED
state.
All known states are pre-loaded on creation of the RfqStateHelper object, ready for use.
| private void buildStates(Int2ObjectHashMap<RfqState> stateMachineStates)
{
stateMachineStates.put(RfqStates.CREATED.getStateId(), RfqCreated.INSTANCE);
stateMachineStates.put(RfqStates.QUOTED.getStateId(), RfqQuoted.INSTANCE);
stateMachineStates.put(RfqStates.COUNTERED.getStateId(), RfqCountered.INSTANCE);
stateMachineStates.put(RfqStates.ACCEPTED.getStateId(), RfqAccepted.INSTANCE);
stateMachineStates.put(RfqStates.REJECTED.getStateId(), RfqRejected.INSTANCE);
stateMachineStates.put(RfqStates.EXPIRED.getStateId(), RfqExpired.INSTANCE);
stateMachineStates.put(RfqStates.CANCELED.getStateId(), RfqCanceled.INSTANCE);
stateMachineStates.put(RfqStates.COMPLETED.getStateId(), RfqCompleted.INSTANCE);
}
|
RFQs
The RFQs object holds all known RFQs and RFQ Responses. Internally, it makes use of an ArrayList to hold Rfq
objects. It also receives commands directly from the SbeAdapter
. As an example flow, the Cancel RFQ Command handler is shown below:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30 | public void cancelRfq(final String correlation, final int rfqId, final int cancelUserId)
{
final Rfq rfq = rfqs.stream().filter(r -> r.getRfqId() == rfqId).findFirst().orElse(null);
if (rfq == null)
{
LOGGER.info("Cannot cancel RFQ: RFQ {} not found", rfqId);
clusterClientResponder.cancelRfqConfirm(correlation, null, CancelRfqResult.UNKNOWN_RFQ);
return;
}
if (!rfq.canCancel())
{
LOGGER.info("Cannot cancel RFQ: RFQ {} invalid transition", rfqId);
clusterClientResponder.cancelRfqConfirm(correlation, null, CancelRfqResult.INVALID_TRANSITION);
return;
}
if (rfq.getRequesterUserId() != cancelUserId)
{
LOGGER.info("Cannot cancel RFQ: RFQ {} not created by user {}", rfqId, cancelUserId);
clusterClientResponder.cancelRfqConfirm(correlation, null,
CancelRfqResult.CANNOT_CANCEL_USER_NOT_REQUESTER);
return;
}
rfq.cancel();
LOGGER.info("Cancelled RFQ {}", rfq);
clusterClientResponder.cancelRfqConfirm(correlation, rfq, CancelRfqResult.SUCCESS);
clusterClientResponder.broadcastRfqCanceled(rfq);
}
|
The canCancel
method on the RFQ is used to ensure that it is legal to transition to the RfqStates.CANCELED
state. This method asks the current state of the RFQ if it can transition to CANCELED
.
| public boolean canCancel()
{
return currentState.canTransitionTo(RfqStates.CANCELED);
}
|
Finally, the cancel
method on the RFQ object will update the necessary state:
public void cancel()
{
if (currentState.canTransitionTo(RfqStates.CANCELED))
{
currentState = currentState.transitionTo(RfqStates.CANCELED);
}
}
Timers
Aeron Cluster timers offer a way for tasks to be scheduled in the future. This is useful to manage expiring RFQs and other expiry activities, such as no response within a given time to a Quote or Counter. To safely make use of Aeron Cluster timers, it is best to use monotonically incrementing correlation Ids, and as a result, we build a simple service that creates and associates timers with RFQs. This service is unchanged from the Adaptive Aeron Cluster Quickstart
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61 | public class TimerManager
{
private static final Logger LOGGER = LoggerFactory.getLogger(TimerManager.class);
private final SessionMessageContextImpl context;
private Cluster cluster;
private final Long2ObjectHashMap<Runnable> correlationIdToRunnable = new Long2ObjectHashMap<>();
private long correlationId = 0;
/**
* Constructor, accepting the context to update the cluster timestamp
* @param context the context to update the cluster timestamp
*/
public TimerManager(final SessionMessageContextImpl context)
{
this.context = context;
}
/**
* Schedules a timer
*
* @param deadline the deadline of the timer
* @param timerRunnable the timerRunnable to perform when the timer fires
* @return the correlation id of the timer
*/
public long scheduleTimer(final long deadline, final Runnable timerRunnable)
{
correlationId++;
Objects.requireNonNull(cluster, "Cluster must be set before scheduling timers");
correlationIdToRunnable.put(correlationId, timerRunnable);
cluster.idleStrategy().reset();
while (!cluster.scheduleTimer(correlationId, deadline))
{
cluster.idleStrategy().idle();
}
return correlationId;
}
...
/**
* Called when a timer cluster event occurs
* @param correlationId the cluster timer id
* @param timestamp the timestamp the timer was fired at
*/
public void onTimerEvent(final long correlationId, final long timestamp)
{
context.setClusterTime(timestamp);
if (correlationIdToRunnable.containsKey(correlationId))
{
correlationIdToRunnable.get(correlationId).run();
correlationIdToRunnable.remove(correlationId);
}
else
{
LOGGER.warn("Timer fired for unknown correlation id {}", correlationId);
}
}
...
}
|
Within the RFQs object, the expiry is scheduled as follows:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23 | public void createRfq(
final String correlation,
final long expireTimeMs,
final long quantity,
final Side side,
final String cusip,
final int userId)
{
//validation code removed
...
final Rfq rfq = new Rfq(++rfqId, correlation, expireTimeMs, quantity, side, cusip, userId);
rfqs.add(rfq);
LOGGER.info("Created RFQ {}", rfq);
//send a confirmation to the client that created the RFQ
clusterClientResponder.createRfqConfirm(correlation, rfq, CreateRfqResult.SUCCESS);
//broadcast the new RFQ to all clients
clusterClientResponder.broadcastNewRfq(rfq);
//schedule the RFQ to expire
timerManager.scheduleTimer(rfq.getExpireTimeMs(), () -> expireRfq(rfq.getRfqId()));
}
|
This schedules a Runnable to call expireRfq
, which is:
private void expireRfq(final int rfqId)
{
final Rfq rfq = rfqs.stream().filter(r -> r.getRfqId() == rfqId).findFirst().orElse(null);
if (rfq == null)
{
LOGGER.info("Cannot expire RFQ: RFQ {} not found", rfqId);
return;
}
if (!rfq.canExpire())
{
LOGGER.info("Cannot expire RFQ: RFQ {}", rfqId);
return;
}
rfq.expire();
LOGGER.info("Expired RFQ {}", rfq);
clusterClientResponder.broadcastRfqExpired(rfq);
}