Skip to content

Static Data

There are several static data objects held in the RFQ sample, and they all follow the same pattern.

To illustrate how static data is managed, Instruments are used. Instruments are held within a Global Instruments repository. This stores all the instruments known to the cluster, and provides support methods for other objects to make use of.

Demuxer

The demuxer is responsible for accepting data from the Aeron buffer data, and applying the changes to the Instruments repository. Supported methods include adding an instrument (AddInstrumentCommand), enabling/disabling and instrument (EnableInstrumentCommand) and finally reading from a snapshot (Instrument).

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class InstrumentDemuxer implements FragmentHandler
{
    private final Instruments instruments;
    ...
    public InstrumentDemuxer(Instruments instruments)
    {
        this.instruments = instruments;
        ...
    }

    @Override
    public void onFragment(DirectBuffer buffer, int offset, int length, Header header)
    {
        short eiderId = getEiderId(buffer, offset);
        switch (eiderId)
        {
            case AddInstrumentCommand.EIDER_ID:
                instrumentCommand.setUnderlyingBuffer(buffer, offset);
                instruments.addInstrument(instrumentCommand);
                break;
            case EnableInstrumentCommand.EIDER_ID:
                enableInstrumentCommand.setUnderlyingBuffer(buffer, offset);
                instruments.enableInstrument(enableInstrumentCommand, timestamp, session);
                break;
            case Instrument.EIDER_ID:
                instruments.loadFromSnapshot(buffer, offset);
                break;
            default:
                log.warn("Unknown instrument command {}", eiderId);
                break;
        }
    }
}

Instruments Repository

The repository holds the Instrument static data at runtime, and provides key helper methods.

Of particular interest is:

  • Line 16, which is where the inbound AddCommand results in a new instrument being appended to the collection.
  • Line 43, which uses an indexed field to check if the repository contains a given cusip.
  • Line 65, which snapshots the index repository to the provided ExclusivePublication.
  • Line 81, which reads the snapshot data in (this comes in via the demuxer above, line 25).

The key source for the repository is:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
public class Instruments
{
    private static final int DEFAULT_MIN_VALUE = 0;

    private final InstrumentRepository instrumentRepository;
    private final InstrumentSequence instrumentSequence;

    public Instruments()
    {
        instrumentRepository = InstrumentRepository.createWithCapacity(100);
        MutableDirectBuffer sequenceBuffer = new ExpandableArrayBuffer(InstrumentSequence.BUFFER_LENGTH);
        instrumentSequence = new InstrumentSequence();
        instrumentSequence.setUnderlyingBuffer(sequenceBuffer, 0);
    }

    public void addInstrument(AddInstrumentCommand addInstrument)
    {
        int nextId = instrumentSequence.nextInstrumentIdSequence();
        final Instrument instrument = instrumentRepository.appendWithKey(nextId);
        if (instrument != null)
        {
            instrument.writeCusip(addInstrument.readCusip());
            instrument.writeMinLevel(addInstrument.readMinLevel());
            instrument.writeSecurityId(addInstrument.readSecurityId());
        }
        else
        {
            log.info("Instrument repository is full. CUSIP {} ignored", addInstrument.readCusip());
        }
    }

    public int getMinValue(String cusip)
    {
        List<Integer> allWithIndexCusipValue = instrumentRepository.getAllWithIndexCusipValue(cusip);
        if (allWithIndexCusipValue.isEmpty())
        {
            return DEFAULT_MIN_VALUE;
        }
        final Instrument byBufferOffset = instrumentRepository.getByBufferOffset(allWithIndexCusipValue.get(0));
        return byBufferOffset.readMinLevel();
    }

    public boolean knownCusip(String cusip)
    {
        return !instrumentRepository.getAllWithIndexCusipValue(cusip).isEmpty();
    }


    public void enableInstrument(EnableInstrumentCommand enableInstrument, long timestamp, ClientSession session)
    {
        List<Integer> withCusip = instrumentRepository.getAllWithIndexCusipValue(enableInstrument.readCusip());
        for (Integer offset : withCusip)
        {
            Instrument byBufferOffset = instrumentRepository.getByBufferOffset(offset);
            if (byBufferOffset != null)
            {
                byBufferOffset.writeEnabled(enableInstrument.readEnabled());
            }
        }
    }

    ...

    @Override
    public void snapshotTo(ExclusivePublication snapshotPublication)
    {
        for (int index = 0; index < instrumentRepository.getCurrentCount(); index++)
        {
            final int offset = instrumentRepository.getOffsetByBufferIndex(index);
            boolean success = reliableSnapshotOffer(snapshotPublication, instrumentRepository.getUnderlyingBuffer(),
                offset, Instrument.BUFFER_LENGTH);

            if (!success)
            {
                log.info("Could not offer instrument to snapshot publication");
            }
        }
    }

    @Override
    public void loadFromSnapshot(DirectBuffer buffer, int offset)
    {
        instrumentRepository.appendByCopyFromBuffer(buffer, offset);
    }
}
Back to top