Replicated State Machines

A replicated state machine is a distributed computing concept used to maintain consistency and fault tolerance across multiple servers or nodes in a network. It functions by having each server maintain a copy of the same state machine, essentially a model representing a system's internal state and logic.

In a replicated state machine setup, all servers receive the same sequence of commands or inputs, typically through a consensus algorithm such as Raft or Paxos. With Aeron Cluster, Raft Consensus supplies this capability. Each server then processes these commands independently and updates its copy of the state machine accordingly. As long as all servers start with the same initial state and the commands are deterministic, each server's state machine will evolve identically, ensuring consistency across the network.
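Sketched in plain Java (with a hypothetical CounterStateMachine that is not part of any Aeron API), the principle looks like this:

```java
import java.util.List;

// Hypothetical, trivially deterministic state machine: the state is a single
// long and each command mutates it in a deterministic way.
final class CounterStateMachine
{
    private long state = 0; // every replica starts from the same initial state

    void apply(final long command)
    {
        state += command; // deterministic transition function
    }

    long state()
    {
        return state;
    }
}

public class ReplicaDemo
{
    public static void main(final String[] args)
    {
        // The consensus module's job is to deliver the SAME ordered command
        // log to every node.
        final List<Long> log = List.of(5L, -2L, 7L);

        final CounterStateMachine replicaA = new CounterStateMachine();
        final CounterStateMachine replicaB = new CounterStateMachine();
        log.forEach(replicaA::apply);
        log.forEach(replicaB::apply);

        // Same initial state + same ordered commands + deterministic logic
        // means every replica ends in an identical state.
        System.out.println(replicaA.state()); // 10
        System.out.println(replicaB.state()); // 10
    }
}
```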

This approach is beneficial for distributed systems as it provides fault tolerance. If one server fails or experiences an issue, the other servers can continue to operate without disruption, as they all have identical copies of the state machine. This redundancy ensures high availability and reliability for mission-critical applications and services.

In addition to a replicated state machine, Aeron Cluster facilitates the operation of production systems on a 24×5 or 24×7 basis by providing a mechanism to snapshot the cluster's state, including the replicated state machine's internal state, and truncate the cluster log. Snapshotting significantly reduces restart times, which would otherwise grow as cluster log entries accumulate. To add snapshotting support to custom logic, developers must implement code that writes data to the snapshot and reads it back deterministically.
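The effect of snapshotting can be illustrated with a toy example (hypothetical names, plain Java): because the logic is deterministic, restoring a snapshot and replaying only the log tail yields exactly the same state as replaying the full log from the start.

```java
import java.util.List;

public class SnapshotDemo
{
    // Deterministically fold a sequence of commands into a long-valued state.
    static long replay(final long initialState, final List<Long> commands)
    {
        long state = initialState;
        for (final long command : commands)
        {
            state += command;
        }
        return state;
    }

    public static void main(final String[] args)
    {
        final List<Long> fullLog = List.of(3L, 4L, 5L, 6L);

        // Cold start: replay every entry in the log.
        final long fromScratch = replay(0L, fullLog);

        // Snapshot taken after the first two entries; those entries can then
        // be truncated from the log.
        final long snapshotState = replay(0L, fullLog.subList(0, 2));
        final long fromSnapshot = replay(snapshotState, fullLog.subList(2, 4));

        System.out.println(fromScratch);  // 18
        System.out.println(fromSnapshot); // 18: same state, half the replay
    }
}
```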

Determinism

Aeron Cluster relies on deterministic business logic for its proper functioning. Non-deterministic behavior, which can emerge from unexpected sources, often leads to perplexing and expensive production issues after a leader failure.

Typical Non-Deterministic Behavior Sources:

  • Circumventing the cluster log for external inputs like system time, direct database access, file system access, etc. Any code that calls for external I/O within the ClusteredService logic may introduce non-determinism.
  • Generating random numbers on each node without a shared seed, such as calling Math.random().
  • Producing genuinely random content like encryption initialization vectors, GUIDs, or similar elements. Ideally, this should be done outside the replicated state machine code or replaced with deterministic code. For instance, Snowflake IDs can be a substitute for GUIDs.
  • Utilizing data structures without fixed iteration orders. This may cause different nodes to iterate through datasets in an arbitrary, node-specific order, potentially impacting both runtime state and the reading and writing of snapshots.
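As a sketch of the last point (plain Java, hypothetical names): any state that will be iterated, for example when writing a snapshot, should live in a structure with a fixed iteration order.

```java
import java.util.Map;
import java.util.TreeMap;

public class IterationOrderDemo
{
    public static void main(final String[] args)
    {
        // A HashMap's iteration order is unspecified and can differ between
        // JVM versions (and therefore between nodes), which is a hazard if
        // the order affects runtime state or the bytes written to a snapshot.
        // A TreeMap iterates in key order on every node; a LinkedHashMap is
        // also fine if the insertion order is itself deterministic.
        final Map<String, Long> balancesByAccount = new TreeMap<>();
        balancesByAccount.put("beta", 2L);
        balancesByAccount.put("alpha", 1L);
        balancesByAccount.put("gamma", 3L);

        // Every node sees: alpha, beta, gamma.
        System.out.println(String.join(",", balancesByAccount.keySet()));
    }
}
```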

To minimize the risk of determinism issues infiltrating the source code, it is recommended to employ tools such as CheckStyle (for controlling imports within specific packages), PMD, and FindBugs, in addition to conducting thorough code reviews.

Quickstart Replicated State Machine

Note

Areas where code has been removed for brevity are marked with ... below.

The following extracts from the quickstart illustrate some key features and constraints of replicated state machines. The core logic is contained within the Auctions class (Auctions.java).

Time

First, when dealing with time in a replicated state machine, we cannot rely on external sources of time such as the system clock. Note the use of context.getClusterTime() below:

public class Auctions
{
    ...
    private AddAuctionResult validate(
        final long createdByParticipantId,
        final long startTime,
        final long endTime,
        final String name,
        final String description)
    {
        if (startTime <= context.getClusterTime())
        {
            return AddAuctionResult.INVALID_START_TIME;
        }
        ...
    }
    ...
}

The validation of startTime makes use of the cluster time to know, safely and repeatably, what the time was when the message was processed. This cluster time originates on the leader and is the time at which the inbound (ingress) message was written to the cluster log. If the log is being replayed (e.g. for node recovery purposes), or a node is running behind the leader, the system clock will differ, and validation against it would not be deterministic. All logic within a replicated state machine must be deterministic for correct operation and recovery.

Scheduled Events

Often, business logic needs something to happen at a scheduled time in the future, for example, starting or closing an auction. Aeron Cluster supports this via its cluster timer functionality; it is important to note that the only guarantee on these timers is that they fire no sooner than the scheduled time.

public class Auctions
{
    ...
    public void addAuction(
        final long createdByParticipantId,
        final long startTime,
        final long endTime,
        final String correlationId,
        final String name,
        final String description)
    {
        ...
        final var auctionId = idGenerator.incrementAndGet();
        ...
        final var startCorrelationId = timerManager.scheduleTimer(startTime, () -> openAuction(auctionId));
        final var endCorrelationId = timerManager.scheduleTimer(endTime, () -> closeAuction(auctionId));
        final var removeCorrelationId = timerManager.scheduleTimer(
            endTime + REMOVAL_TIMER_DURATION, () -> removeAuction(auctionId));
        auction.setStartTimerCorrelationId(startCorrelationId);
        auction.setEndTimerCorrelationId(endCorrelationId);
        auction.setRemovalTimerCorrelationId(removeCorrelationId);
        ...
    }
}

A MutableLong is used as the idGenerator in order to generate auction IDs deterministically.

The calls to timerManager.scheduleTimer(...) use the cluster timer to schedule later events for the auction logic, such as starting, closing and removing the auction. While Aeron Cluster will automatically snapshot and restore the timers themselves, any cluster state required to react to them correctly must also be written to and read from the snapshot. To support this, the correlation IDs returned by the timer scheduling calls are captured and stored within the auction object.
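A minimal sketch of the idea (a hypothetical SimpleTimerManager, not the quickstart's actual TimerManager) shows why the correlation IDs are part of the state that must survive a snapshot: Aeron Cluster restores the timers themselves, but the service still needs the mapping from each correlation ID to the work it represents.

```java
import java.util.HashMap;
import java.util.Map;

final class SimpleTimerManager
{
    // Both of these fields are cluster state: they would need to be written
    // to and restored from the snapshot alongside the rest of the model.
    private long nextCorrelationId = 1;
    private final Map<Long, Runnable> tasksByCorrelationId = new HashMap<>();

    long scheduleTimer(final long deadlineMs, final Runnable task)
    {
        final long correlationId = nextCorrelationId++;
        // A real implementation would also ask the cluster to schedule a
        // timer for deadlineMs against this correlation ID.
        tasksByCorrelationId.put(correlationId, task);
        return correlationId;
    }

    // Invoked when the cluster delivers a timer event for a correlation ID.
    void onTimerEvent(final long correlationId)
    {
        final Runnable task = tasksByCorrelationId.remove(correlationId);
        if (task != null)
        {
            task.run();
        }
    }
}

public class TimerDemo
{
    public static void main(final String[] args)
    {
        final SimpleTimerManager timerManager = new SimpleTimerManager();
        final long[] fired = { 0 };
        final long id = timerManager.scheduleTimer(100L, () -> fired[0]++);

        // Without the correlation ID -> task mapping, this event could not
        // be routed to the auction it belongs to.
        timerManager.onTimerEvent(id);
        System.out.println(fired[0]); // 1
    }
}
```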

Snapshots

Initial wiring

In the quickstart, snapshots are managed within the SnapshotManager, and this is wired into two methods of the ClusteredService.

Snapshots that should be read are passed into application code via the onStart method. Here the snapshot manager is passed a snapshot Image (if it exists) to process:

@Override
public void onStart(final Cluster cluster, final Image snapshotImage)
{
    snapshotManager.setIdleStrategy(cluster.idleStrategy());
    ...
    if (snapshotImage != null)
    {
        snapshotManager.loadSnapshot(snapshotImage);
    }
}

The loadSnapshot method polls the Image for data, handing each fragment off to an onFragment method within the SnapshotManager.

When writing snapshots, Aeron Cluster provides an ExclusivePublication to which the snapshot data is written:

@Override
public void onTakeSnapshot(final ExclusivePublication snapshotPublication)
{
    snapshotManager.takeSnapshot(snapshotPublication);
}

Object snapshot process

The MutableLong used for auction ID generation is the simplest object to write to and read from the snapshot, and is described below. A specific Simple Binary Encoding (SBE) message is used for snapshots.

private void offerAuctionIdGenerator(final ExclusivePublication snapshotPublication)
{
    auctionIdEncoder.wrapAndApplyHeader(buffer, 0, headerEncoder);
    auctionIdEncoder.lastId(auctions.getAuctionId());
    retryingOffer(snapshotPublication, buffer, headerEncoder.encodedLength() +
        auctionIdEncoder.encodedLength());
}
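The retryingOffer helper is not shown above; a sketch of what such a helper might look like is below, with a LongSupplier standing in for the snapshotPublication.offer(...) call so the retry loop is self-contained. Aeron's offer returns a negative value when the fragment was not accepted (for example under back pressure), so the helper simply retries until it succeeds.

```java
import java.util.function.LongSupplier;

public class RetryingOfferDemo
{
    // Retry the offer until it is accepted. A real implementation would use
    // an IdleStrategy between attempts and should fail fast on codes that
    // are not retryable (such as a closed publication).
    static long retryingOffer(final LongSupplier offer)
    {
        long result;
        while ((result = offer.getAsLong()) < 0L)
        {
            Thread.onSpinWait();
        }
        return result;
    }

    public static void main(final String[] args)
    {
        // Simulate back pressure on the first two attempts, then success.
        final long[] attempts = { 0 };
        final long position = retryingOffer(() -> ++attempts[0] < 3 ? -2L : 128L);
        System.out.println(position);    // 128
        System.out.println(attempts[0]); // 3
    }
}
```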

Reading the object is much the same. We wrap the inbound buffer with the correct SBE decoder, read the value, and initialize the Auction ID.

public void onFragment(final DirectBuffer buffer, final int offset, final int length, final Header header)
{
    ...
    headerDecoder.wrap(buffer, offset);

    switch (headerDecoder.templateId())
    {
        ...
        case AuctionIdSnapshotDecoder.TEMPLATE_ID ->
        {
            auctionIdDecoder.wrapAndApplyHeader(buffer, offset, headerDecoder);
            auctions.restoreAuctionId(auctionIdDecoder.lastId());
        }
        ...
    }
}

Further reading

  1. Simple Binary Encoding
  2. Cluster Timers
  3. F. B. Schneider, “Implementing fault-tolerant services using the state machine approach: a tutorial,” ACM Comput. Surv., vol. 22, no. 4, pp. 299–319, Dec. 1990, doi: 10.1145/98163.98167