actix-raft


An implementation of the Raft distributed consensus protocol using the Actix actor framework. Blazing fast Rust, a modern consensus protocol, an outstanding actor framework. This project intends to provide a backbone for the next generation of distributed data storage systems (SQL, NoSQL, KV, Streaming &c) built with Rust. Please ⭐ on github!

The guide is the best place to get started, followed by the docs for more in-depth details.

This crate differs from other Raft implementations in that:

  • It is fully reactive and embraces the async ecosystem. It is driven by actual Raft related events taking place in the system as opposed to being driven by a tick operation. Batching of messages during replication is still used whenever possible for maximum throughput.
  • Storage and network integration is well defined via the two traits RaftStorage & RaftNetwork. This provides applications maximum flexibility in being able to choose their storage and networking mediums. This also allows for the storage interface to be synchronous or asynchronous based on the storage engine used, and allows for easy integration with the actix ecosystem's networking components for efficient async networking. See the storage & network chapters of the guide.
  • Submitting Raft RPCs & client requests to a running Raft node is also well defined via the Actix message types defined in the messages module in this crate. The API for this system is clear and concise. See the raft chapter in the guide.
  • It fully supports dynamic cluster membership changes according to the Raft spec. See the dynamic membership chapter in the guide.
  • Details on initial cluster formation, and how to effectively do so from an application level perspective, are discussed in the cluster formation chapter in the guide.

This implementation strictly adheres to the Raft spec (pdf warning), and all data models use the same nomenclature found in the spec for better understandability. This implementation of Raft has integration tests covering all aspects of a Raft cluster's lifecycle including: cluster formation, dynamic membership changes, snapshotting, writing data to a live cluster and more.

If you are building an application using this Raft implementation, open an issue and let me know! I would love to add your project's name & logo to a users list in this project.

contributing

Check out the CONTRIBUTING.md guide for more details on getting started with contributing to this project.

license

actix-raft is licensed under the terms of the MIT License or the Apache License 2.0, at your choosing.


NOTE: the appearance of the "section" symbols § throughout this project are references to specific sections of the Raft spec.

Getting Started

This crate's Raft type is an Actix actor which is intended to run within some parent application, which traditionally will be some sort of data storage system (SQL, NoSQL, KV store, AMQP, Streaming, whatever). Inasmuch as the Raft instance is an actor, it is expected that the parent application is also built upon the Actix actor framework, though that is not technically required.

To use this crate, applications must also implement the RaftStorage & RaftNetwork traits. See the storage & network chapters for details on what these traits represent and how to implement them. In brief, the implementing types must be actors which can handle specific message types which correspond to everything needed for Raft storage and networking.

deep dive

To get started, applications can define a type alias which declares the types which are going to be used for the application's data, errors, RaftNetwork impl & RaftStorage impl.

First, let's define the new application's main data type & a response type. This is the data which will be inside of Raft's normal log entries and the response type which the storage engine will return after applying them to the state machine.


use actix_raft::{AppData, AppDataResponse};
use serde::{Serialize, Deserialize};

/// The application's data type.
///
/// Enum types are recommended as typically there will be different types of data mutating
/// requests which will be submitted by application clients.
#[derive(Clone, Debug, Serialize, Deserialize)]
enum Data {
    // Your data variants go here.
}

/// The application's data response types.
///
/// Enum types are recommended as typically there will be multiple response types which can be
/// returned from the storage layer.
#[derive(Clone, Debug, Serialize, Deserialize)]
enum DataResponse {
    // Your response variants go here.
}

/// This also has a `'static` lifetime constraint, so no `&` references at this time.
/// The new futures & async/await should help out with this quite a lot, so
/// hopefully this constraint will be removed in actix as well.
impl AppData for Data {}

/// This also has a `'static` lifetime constraint, so no `&` references at this time.
impl AppDataResponse for DataResponse {}

Now we'll define the application's error type.


use actix_raft::AppError;
use serde::{Serialize, Deserialize};

/// The application's error struct. This could be an enum as well.
///
/// NOTE: the below impls for Display & Error can be
/// derived using crates like `Failure` &c.
#[derive(Debug, Serialize, Deserialize)]
pub struct Error;

impl std::fmt::Display for Error {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // ... snip ...
    }
}

impl std::error::Error for Error {}

// Mark this type for use as an `actix_raft::AppError`.
impl AppError for Error {}

Now for the two big parts. RaftNetwork & RaftStorage. Here, we will only look at the skeleton for these types. See the network & storage chapters for more details on how to actually implement these types. First, let's cover the network impl.


use actix::{Actor, Context, Handler, ResponseActFuture};
use actix_raft::{RaftNetwork, messages};

/// Your application's network interface actor.
struct AppNetwork {/* ... snip ... */}

impl Actor for AppNetwork {
    type Context = Context<Self>;

    // ... snip ... other actix methods can be implemented here as needed.
}

// Ensure you impl this over your application's data type. Here, it is `Data`.
impl RaftNetwork<Data> for AppNetwork {}

// Then you just implement the various message handlers.
// See the network chapter for details.
impl Handler<messages::AppendEntriesRequest<Data>> for AppNetwork {
    type Result = ResponseActFuture<Self, messages::AppendEntriesResponse, ()>;

    fn handle(&mut self, _msg: messages::AppendEntriesRequest<Data>, _ctx: &mut Self::Context) -> Self::Result {
        // ... snip ...
        Box::new(actix::fut::err(()))
    }
}

// Impl handlers on `AppNetwork` for the other `actix_raft::messages` message types.

Now for the storage impl. We'll use an actix::Context here (which is async), but you could also use an actix::SyncContext.


use actix::{Actor, Context, Handler, ResponseActFuture};
use actix_raft::{NodeId, RaftStorage, storage};

/// Your application's storage interface actor.
struct AppStorage {/* ... snip ... */}

// Ensure you impl this over your application's data, data response & error types.
impl RaftStorage<Data, DataResponse, Error> for AppStorage {
    type Actor = Self;
    type Context = Context<Self>;
}

impl Actor for AppStorage {
    type Context = Context<Self>;

    // ... snip ... other actix methods can be implemented here as needed.
}

// Then you just implement the various message handlers.
// See the storage chapter for details.
impl Handler<storage::GetInitialState<Error>> for AppStorage {
    type Result = ResponseActFuture<Self, storage::InitialState, Error>;

    fn handle(&mut self, _msg: storage::GetInitialState<Error>, _ctx: &mut Self::Context) -> Self::Result {
        // ... snip ...
        Box::new(actix::fut::err(Error))
    }
}

// Impl handlers on `AppStorage` for the other `actix_raft::storage` message types.

In order for Raft to expose metrics on how it is doing, we will need a type which can receive RaftMetrics messages. Applications can do whatever they want with this info: expose integrations with Prometheus & Influx, trigger events, whatever is needed. Here we will keep it simple.


use actix::{Actor, Context, Handler};
use actix_raft::RaftMetrics;

/// Your application's metrics interface actor.
struct AppMetrics {/* ... snip ... */}

impl Actor for AppMetrics {
    type Context = Context<Self>;

    // ... snip ... other actix methods can be implemented here as needed.
}

impl Handler<RaftMetrics> for AppMetrics {
    type Result = ();

    fn handle(&mut self, _msg: RaftMetrics, _ctx: &mut Context<Self>) -> Self::Result {
        // ... snip ...
    }
}

And finally, a simple type alias which ties everything together. This type alias can then be used throughout the application's code base without the need to specify the various types being used for data, errors, network & storage.


use actix_raft::Raft;

/// A type alias used to define an application's concrete Raft type.
type AppRaft = Raft<Data, DataResponse, Error, AppNetwork, AppStorage>;

booting up the system

Now that the various needed types are in place, the actix system and the actors we've defined above need to be started, and then we're off to the races.

use actix;
use actix_raft::{Config, ConfigBuilder, SnapshotPolicy};

fn main() {
    // Build the actix system.
    let sys = actix::System::new("my-awesome-app");

    // Build the needed runtime config for Raft specifying where
    // snapshots will be stored. See the storage chapter for more details.
    let config = Config::build(String::from("/app/snapshots")).validate().unwrap();

    // Start off with just a single node in the cluster. Applications
    // should implement their own discovery system. See the cluster
    // formation chapter for more details.
    let members = vec![1];

    // Start the various actor types and hold on to their addrs.
    let network = AppNetwork.start();
    let storage = AppStorage.start();
    let metrics = AppMetrics.start();
    // The Raft constructor expects a `Recipient<RaftMetrics>` for metrics output.
    let app_raft = AppRaft::new(1, config, network, storage, metrics.recipient()).start();

    // Run the actix system. Unix signals for termination &
    // graceful shutdown are automatically handled.
    let _ = sys.run();
}

You've already ascended to the next level of AWESOME! There is a lot more to cover, we're just getting started. Next, let's take a look at the Raft type in more detail.

Raft

The most central type of this crate is the Raft type. It is a highly generic actor with the signature:

Raft<D: AppData, R: AppDataResponse, E: AppError, N: RaftNetwork<D>, S: RaftStorage<D, R, E>>

The generics here allow Raft to use statically known types, defined in the parent application using this crate, for maximum performance and type-safety. Users of this Raft implementation get to choose the exact types they want to use for application specific error handling coming from the storage layer, and also get to work with their application's data types directly without the overhead of serializing and deserializing the data as it moves through the Raft system.

API

As the Raft type is an Actix Actor, all interaction with Raft is handled via message passing. All pertinent message types derive the serde traits for easier integration with other data serialization formats in the Rust ecosystem, providing maximum flexibility for applications using this crate.

All message types are sent to a Raft node via the actor's Addr. Applications using this crate are expected to have networking capabilities for cluster communication & client interaction. Applications are responsible for handling client requests & Raft RPCs coming from their network layer, and must send them to the Raft actor and return its response. More details on this topic can be found in the network chapter.
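
For example, a network layer which has just deserialized an AppendEntriesRequest off the wire might forward it to its local Raft node along these lines (a minimal sketch using the AppRaft alias & Data type from the getting started chapter; error handling is reduced to ()):

use actix::Addr;
use actix_raft::messages;
use futures::Future;

/// Forward a deserialized Raft RPC to the local Raft node & yield its response
/// so it can be serialized back to the peer which sent it.
fn forward_append_entries(
    raft: &Addr<AppRaft>,
    rpc: messages::AppendEntriesRequest<Data>,
) -> impl Future<Item = messages::AppendEntriesResponse, Error = ()> {
    raft.send(rpc)
        .map_err(|_| ())     // mailbox / delivery error
        .and_then(|res| res) // flatten the handler's inner Result
}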

The public API of the Raft type is broken up into 3 sections: Client Requests, Raft RPCs & Admin Commands.

Client Requests
  • ClientPayload: a payload of data which needs to be committed to the Raft cluster. Typically, this will be data coming from application clients.
Raft RPCs
  • AppendEntriesRequest: An RPC invoked by the leader to replicate log entries (§5.3); also used as heartbeat (§5.2).
  • VoteRequest: An RPC invoked by candidates to gather votes (§5.2).
  • InstallSnapshotRequest: Invoked by the Raft leader to send chunks of a snapshot to a follower (§7).
Admin Commands
  • InitWithConfig: Initialize a pristine Raft node with the given config & start a campaign to become leader.
  • ProposeConfigChange: Propose a new membership config change to a running cluster.

client requests diagram

The following diagram shows how client requests are presented to Raft from within an application, how the data is stored, replicated and ultimately applied to the application's state machine.

The numbered elements represent segments of the workflow.

  1. The parent application has received a client request, and presents the payload to Raft using the ClientPayload type.
  2. Raft will present the payload to the RaftStorage impl via the AppendEntryToLog type. This is the one location where the RaftStorage impl may return an application specific error. This could be for validation logic, enforcing unique indices, data/schema validation; whatever application level rules the application enforces, this is where they should be enforced. Close to the data, just before it hits the Raft log.
  3. The RaftStorage impl responds to the Raft actor. If it is successful, go to step 4, else the error response will be sent back to the caller immediately. The error response is a statically known type defined by the parent application.
  4. Raft uses the RaftNetwork impl to communicate with the peers of the cluster.
  5. Raft uses the RaftNetwork impl to replicate the entry to all other nodes in the cluster.
  6. Follower nodes in the cluster respond upon successful replication.
  7. Once the entry has been replicated to a majority of nodes in the cluster — known as a "committed" entry in the Raft spec — it is ready to be applied to the application's state machine.
  8. Raft will apply the entry to the application's state machine via the ApplyEntryToStateMachine type.
  9. The RaftStorage impl responds to the Raft actor.
  10. The success response is returned to the caller.

NOTE: this implementation of Raft offers the option for client requests to receive a response once the entry has been committed, or only after it has been applied to the state machine. This is controlled by the ClientPayload.response_type field, which is an instance of the ResponseMode enum and may be either Committed or Applied. Applications may use either depending on their needs.


The API is simple enough, but there is more to learn about Raft than just feeding it messages. The next logical topic to understand is Raft networking.

Network

Raft is a distributed consensus protocol, so the ability to send and receive data over a network is integral to the proper functionality of nodes within a Raft cluster.

The network capabilities required by this system are broken up into two parts: the application network & the RaftNetwork trait.

Application Network

The main role of the application network, in this context, is to handle client requests and then feed them into Raft. There are a few other important things it will probably need to do as well, depending on the application's needs. Here are a few common networking roles:

  • discovery: a component which allows the members of an application cluster (its nodes) to discover and communicate with each other. This is not provided by this crate. There are lots of solutions out there to solve this problem. Applications can build their own discovery system by way of DNS, or they could use other systems like etcd or consul. The important thing to note here is that once a peer is discovered, it would be prudent for application nodes to maintain a connection with that peer, as heartbeats are very regular, and building network connections is not free.
  • data format: the way that data is serialized and sent across the networking medium. Popular data formats include protobuf, capnproto, flatbuffers, message pack, JSON &c. Applications are responsible for serializing and deserializing the various message types used in this crate for network transmission. Serde is used throughout this system to aid on this front.

Applications must be able to facilitate message exchange between nodes reliably.
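
For instance, since the message types derive the serde traits, hooking them up to a data format can be as simple as running them through a serializer before they hit the wire. A small sketch using serde_json and the Data type from the getting started chapter (purely illustrative; any serde-compatible format will do):

use actix_raft::messages::{AppendEntriesRequest, AppendEntriesResponse};

/// Encode an outbound RPC for transmission to a peer node.
fn encode_rpc(rpc: &AppendEntriesRequest<Data>) -> Result<Vec<u8>, serde_json::Error> {
    serde_json::to_vec(rpc)
}

/// Decode a peer's response coming off the wire.
fn decode_response(buf: &[u8]) -> Result<AppendEntriesResponse, serde_json::Error> {
    serde_json::from_slice(buf)
}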

trait RaftNetwork

This trait defines the requirement of an application's ability to send and receive Raft RPCs.


pub trait RaftNetwork<D>
    where
        D: AppData,
        Self: Actor<Context=Context<Self>>,

        Self: Handler<AppendEntriesRequest<D>>,
        Self::Context: ToEnvelope<Self, AppendEntriesRequest<D>>,

        Self: Handler<InstallSnapshotRequest>,
        Self::Context: ToEnvelope<Self, InstallSnapshotRequest>,

        Self: Handler<VoteRequest>,
        Self::Context: ToEnvelope<Self, VoteRequest>,
{}

Stated simply, all this trait requires is that the implementing type be an Actix Actor & that it implement handlers for the following message types:

  • AppendEntriesRequest
  • InstallSnapshotRequest
  • VoteRequest

The type used to implement RaftNetwork could be the same type used to provide the other networking capabilities of an application, or it could be an independent type. The requirement is that the implementing type must be able to transmit the RPCs it receives on its handlers to the target Raft nodes identified in the RPCs. This trait is used directly by the Raft actor to send heartbeats to other nodes in the Raft cluster to maintain leadership, replicate entries, request votes when an election takes place, and to install snapshots.


Now that we've got a solid taste for the network requirements, the next logical topic to understand is Raft storage.

Storage

The way that data is stored and represented is an integral part of every data storage system. Whether it is a SQL or NoSQL database, a columnar store, a KV store, or anything which stores data, control over the storage technology and technique is critical.

This implementation of Raft uses the RaftStorage trait to define the behavior needed of an application's storage layer to work with Raft. This is definitely the most complex looking trait in this crate. Ultimately the implementing type must be an Actix Actor and it must implement handlers for a specific set of message types.

When creating a new RaftStorage instance, it would be logical to supply the ID of the parent Raft node as well as the node's snapshot directory. Such information is needed when booting a node for the first time.


pub trait RaftStorage<D, R, E>: 'static
    where
        D: AppData,
        R: AppDataResponse,
        E: AppError,
{
    /// The type to use as the storage actor. Should just be Self.
    type Actor: Actor<Context=Self::Context> +
        Handler<GetInitialState<E>> +
        Handler<SaveHardState<E>> +
        Handler<GetLogEntries<D, E>> +
        Handler<AppendEntryToLog<D, E>> +
        Handler<ReplicateToLog<D, E>> +
        Handler<ApplyEntryToStateMachine<D, R, E>> +
        Handler<ReplicateToStateMachine<D, E>> +
        Handler<CreateSnapshot<E>> +
        Handler<InstallSnapshot<E>> +
        Handler<GetCurrentSnapshot<E>>;

    /// The type to use as the storage actor's context. Should be `Context<Self>` or `SyncContext<Self>`.
    type Context: ActorContext +
        ToEnvelope<Self::Actor, GetInitialState<E>> +
        ToEnvelope<Self::Actor, SaveHardState<E>> +
        ToEnvelope<Self::Actor, GetLogEntries<D, E>> +
        ToEnvelope<Self::Actor, AppendEntryToLog<D, E>> +
        ToEnvelope<Self::Actor, ReplicateToLog<D, E>> +
        ToEnvelope<Self::Actor, ApplyEntryToStateMachine<D, R, E>> +
        ToEnvelope<Self::Actor, ReplicateToStateMachine<D, E>> +
        ToEnvelope<Self::Actor, CreateSnapshot<E>> +
        ToEnvelope<Self::Actor, InstallSnapshot<E>> +
        ToEnvelope<Self::Actor, GetCurrentSnapshot<E>>;
}

Actix handlers must be implemented for the following types, all of which are found in the storage module:

  • GetInitialState: A request from Raft to get Raft's state information from storage.
  • SaveHardState: A request from Raft to save its HardState.
  • GetLogEntries: A request from Raft to get a series of log entries from storage.
  • AppendEntryToLog: A request from Raft to append a new entry to the log.
  • ReplicateToLog: A request from Raft to replicate a payload of entries to the log.
  • ApplyEntryToStateMachine: A request from Raft to apply the given log entry to the state machine.
  • ReplicateToStateMachine: A request from Raft to apply the given log entries to the state machine, as part of replication.
  • CreateSnapshot: A request from Raft to have a new snapshot created which covers the current breadth of the log.
  • InstallSnapshot: A request from Raft to have a new snapshot written to disk and installed.
  • GetCurrentSnapshot: A request from Raft to get metadata of the current snapshot.

The following sections detail how to implement a safe and correct storage system for Raft using the RaftStorage trait. A very important note to keep in mind: data storage, data layout, data representation ... all of that is up to the implementor of the RaftStorage trait. That's the whole point. Every application is going to have nuances in terms of what they need to do at the storage layer. This is one of the primary locations where an application can innovate and differentiate.

state

This pertains to implementing the GetInitialState & SaveHardState handlers.

GetInitialState

When the storage system comes online, it should check for any state currently on disk. Based on how the storage layer is persisting data, it may have to look in a few locations to get all of the needed data. Once the InitialState data has been collected, respond.

SaveHardState

This handler will be called periodically based on different events happening in Raft. Primarily, membership changes and elections will cause this to be called. Implementation is simple. Persist the data in the given HardState to disk, ensure that it can be accurately retrieved even after a node failure, and respond.
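
One simple way to satisfy the durability requirement is to write the serialized HardState to a temporary file and atomically rename it over the previous copy, so a crash mid-write can never corrupt the last good state. A sketch of that idea (it assumes the value being persisted implements Serialize and that serde_json is available; adapt to however your storage engine persists small records):

use std::fs;
use std::io;
use std::path::Path;

/// Persist a small serializable record atomically: write a temp file, then
/// rename it over the previous copy so a crash never leaves a torn write.
fn persist_atomically<T: serde::Serialize>(value: &T, path: &Path) -> io::Result<()> {
    let bytes = serde_json::to_vec(value)
        .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?;
    let tmp = path.with_extension("tmp");
    fs::write(&tmp, &bytes)?;
    fs::rename(&tmp, path) // atomic for same-filesystem renames on most platforms
}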

log & state machine

This pertains to implementing the GetLogEntries, AppendEntryToLog, ReplicateToLog, ApplyEntryToStateMachine & ReplicateToStateMachine handlers.

Traditionally, there are a few different terms used to refer to the log of mutations which are to be applied to a data storage system: write-ahead log (WAL), op-log, and others, sometimes with different nuances. In Raft, this is known simply as the log. A log entry describes the "type" of mutation to be applied to the state machine, and the state machine is the actual business-logic representation of all applied log entries.

GetLogEntries

This will be called at various times to fetch a range of entries from the log. The start field is inclusive, the stop field is non-inclusive. Simply fetch the specified range of logs from the storage medium, and return them.
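
If the log is kept in an ordered map keyed by entry index, the start/stop fields translate directly into a half-open range query. A sketch against an in-memory BTreeMap (a real implementation would read from its persistent log; the entry type is left generic here):

use std::collections::BTreeMap;

/// Fetch log entries in the half-open range [start, stop) from an index-keyed log.
fn get_log_range<E: Clone>(log: &BTreeMap<u64, E>, start: u64, stop: u64) -> Vec<E> {
    log.range(start..stop)
        .map(|(_, entry)| entry.clone())
        .collect()
}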

AppendEntryToLog

Called as the direct result of a client request and will only be called on the Raft leader node. THIS IS THE ONE AND ONLY RaftStorage handler which is allowed to return errors which will not cause the Raft node to terminate. Review the docs on the AppendEntryToLog type, and you will see that its message response type uses the AppError type, which is a statically known error type chosen by the implementor (as reviewed earlier in the raft overview chapter).

This is where an application may enforce business-logic rules, such as unique indices, relational constraints, type validation, whatever is needed by the application. If everything checks out, insert the entry at its specified index in the log. Don't just blindly append, use the entry's index. There are times when log entries must be overwritten, and Raft guarantees the safety of such operations.

Another very important note: per §8 of the Raft spec, to ensure that client requests are not applied more than once due to a failure scenario and a client retry, the spec recommends that applications track client IDs and use serial numbers on each request. This handler may then use that information to reject duplicate requests using an application specific error. The application's client may observe this error and treat it as an overall success. This is an application level responsibility; Raft simply provides the mechanism needed to implement it.
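
Putting those pieces together, the core of an AppendEntryToLog handler boils down to: run the application's validation (including any client ID / serial number deduplication), then insert the entry at its own index rather than pushing to the end. A sketch of that core logic against an in-memory map, with a hypothetical validation hook standing in for your business rules and the Error type from the getting started chapter (the actix Handler plumbing & the exact message fields are omitted; see the storage module docs for the real shapes):

use std::collections::BTreeMap;

/// Core of an AppendEntryToLog handler: validate, then insert at the entry's
/// own index. Never blindly append; existing entries may need to be overwritten.
fn append_entry_to_log<E>(
    log: &mut BTreeMap<u64, E>,
    index: u64,
    entry: E,
    validate: impl Fn(&E) -> Result<(), Error>, // hypothetical app-level validation hook
) -> Result<(), Error> {
    validate(&entry)?;        // unique indices, schema checks, duplicate request rejection, &c.
    log.insert(index, entry); // insert at the specified index, overwriting if needed
    Ok(())
}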

ReplicateToLog

This is similar to AppendEntryToLog except that this handler is only called on followers, which should never perform validation or fallible operations. If this handler returns an error, the Raft node will terminate in order to guard against data corruption. As mentioned previously, there are times when log entries must be overwritten. Raft guarantees the safety of these operations. Use the index of each entry when inserting into the log.

ApplyEntryToStateMachine

Once a log entry is known to be committed (it has been replicated to a majority of nodes in the cluster), the leader will call this handler to apply the entry to the application's state machine. Committed entries will never be removed or overwritten in the log, which is why it is safe to apply the entry to the state machine. To implement this handler, apply the contents of the entry to the application's state machine in whatever way is needed. This handler is allowed to return an application specific response type, which allows the application to return arbitrary information about the process of applying the entry.

For example, if building a SQL database, and the entry calls for inserting a new record and the full row of data needs to be returned to the client, this handler may return such data in its response.

Raft, as a protocol, guarantees strict linearizability. Entries will never be re-applied. The only case where data is removed from the state machine is during some cases of snapshotting where the entire state machine needs to be rebuilt. Read on for more details.

NOTE WELL: there are times when Raft needs to append blank entries to the log which will end up being applied to the state machine. See §8 for more details. Applications should handle this with a "no-op" variant of their AppDataResponse type.
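
To make the response side of this concrete, here is a sketch of the kind of function an ApplyEntryToStateMachine handler might delegate to. The Set & NoOp variants are purely hypothetical placeholders (the Data & DataResponse enums were left empty earlier), with a toy key/value map standing in for the state machine:

use std::collections::HashMap;

// Hypothetical variants for illustration only; your Data & DataResponse enums
// will carry whatever your application actually needs.
enum Data { Set { key: String, value: String }, NoOp }
enum DataResponse { Set { previous: Option<String> }, NoOp }

/// Apply one committed entry to a toy key/value state machine and return the
/// application specific response for the caller.
fn apply_to_state_machine(sm: &mut HashMap<String, String>, data: &Data) -> DataResponse {
    match data {
        Data::Set { key, value } => DataResponse::Set {
            previous: sm.insert(key.clone(), value.clone()),
        },
        // Blank entries appended by Raft itself (see §8) map to a no-op response.
        Data::NoOp => DataResponse::NoOp,
    }
}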

ReplicateToStateMachine

This is similar to ApplyEntryToStateMachine except that this handler is only called on followers as part of replication, and is not allowed to return response data (as there is nothing to return response data to during replication).

snapshots & log compaction

This pertains to implementing the CreateSnapshot, InstallSnapshot & GetCurrentSnapshot handlers.

The snapshot and log compaction capabilities defined in the Raft spec are fully supported by this implementation. The storage layer is left to the application which uses this Raft implementation, but all snapshot behavior defined in the Raft spec is supported. Additionally, this implementation supports:

  • Configurable snapshot policies. This allows nodes to perform log compaction at configurable intervals.
  • Leader based InstallSnapshot RPC support. This allows the Raft leader to make determinations on when a new member (or a slow member) should receive a snapshot in order to come up-to-date faster.

For clarity, it is emphasized that implementing the log compaction & snapshot creation behavior is up to the RaftStorage implementor. This guide is here to help, and §7 of the Raft spec is dedicated to the subject.

CreateSnapshot

This handler is called when the Raft node determines that a snapshot is needed based on the cluster's configured snapshot policy. Raft guarantees that this interface will never be called multiple overlapping times, and it will not be called when an InstallSnapshot operation is in progress.

It is critical to note that the newly created snapshot must be able to be used to completely and accurately create a state machine. In addition to saving space on disk (log compaction), snapshots are used to bring new Raft nodes and slow Raft nodes up-to-speed with the cluster leader.

implementation algorithm:

  • The generated snapshot should include all log entries starting from entry 0 up through the index specified by CreateSnapshot.through. This will include any snapshot which may already exist. If a snapshot does already exist, the new log compaction process should be able to just load the old snapshot first, and resume processing from its last entry.
  • The newly generated snapshot should be written to the configured snapshot directory.
  • All previous entries in the log should be deleted up to the entry specified at index through.
  • The entry at index through should be replaced with a new entry created from calling Entry::new_snapshot_pointer(...).
  • Any old snapshot will no longer have representation in the log, and should be deleted.
  • Return a CurrentSnapshotData struct which contains all metadata pertinent to the snapshot.

InstallSnapshot

This handler is called when the leader of the Raft cluster has determined that the subject node needs to receive a new snapshot. This is typically the case when new nodes are added to a running cluster, or if a node has gone offline for some amount of time without being removed from the cluster, or the node is VERY slow.

This message holds an UnboundedReceiver which will stream in new chunks of data as they are received from the Raft leader. See the docs on the InstallSnapshotChunk for more info.

implementation algorithm:

  • Upon receiving the request, a new snapshot file should be created on disk.
  • Every new chunk of data received should be written to the new snapshot file starting at the offset specified in the chunk. Once the chunk has been successfully written, the InstallSnapshotChunk.cb (a oneshot::Sender) should be called to indicate that the storage engine has finished writing the chunk.
  • If the receiver is dropped, the snapshot which was being created should be removed from disk, and a success response should be returned.

Once a chunk is received which is the final chunk of the snapshot (InstallSnapshotChunk.done), after writing the chunk's data, there are a few important steps to take:

  • Create a new entry in the log via the Entry::new_snapshot_pointer(...) constructor. Insert the new entry into the log at the specified index of the original InstallSnapshot payload.
  • If there are any logs older than index, remove them.
  • If there are any other snapshots in the configured snapshot dir, remove them.
  • If an existing log entry has the same index and term as the snapshot's last included entry, retain the log entries following it, then return.
  • Else, discard the entire log leaving only the new snapshot pointer. The state machine must be rebuilt from the new snapshot. Return once the state machine has been brought up-to-date.

GetCurrentSnapshot

A request to get information on the current snapshot. RaftStorage implementations must take care to ensure that there is only ever one active snapshot, old snapshots should be deleted as part of CreateSnapshot and InstallSnapshot requests, and the snapshot information should be able to be retrieved efficiently. Having to load and parse the entire snapshot on each GetCurrentSnapshot request may not be such a great idea! Snapshots can be quite large.


Woot woot! Made it through the hard part! There is more to learn, so let's keep going.

Cluster Controls

Raft nodes may be controlled in various ways outside of the normal flow of the Raft protocol using the admin message types. This allows the parent application — within which the Raft node is running — to influence the Raft node's behavior based on application level needs.

concepts

In the world of Raft consensus, there are a few aspects of a Raft node's lifecycle which are not directly dictated in the Raft spec. Cluster formation and the preliminary details of what would lead to dynamic cluster membership changes are a few examples of concepts not directly detailed in the spec. This implementation of Raft offers as much flexibility as possible to deal with such details in a way which is safe according to the Raft specification, but also in a way which preserves flexibility for the many different types of applications which may be implemented using Raft.

Cluster Formation

All Raft nodes, when they first come online in a pristine state, will enter into the NonVoter state, which is a completely passive state. This gives the parent application the ability to issue admin commands to the node based on the intention of the parent application.

To form a new cluster, all application nodes must issue the InitWithConfig command to their embedded Raft nodes with the IDs of all discovered nodes which are to be part of the cluster (including the ID of the running node). Or if the application is to run in a standalone / single-node manner, it may issue the command with only its own ID.

InitWithConfig

This command is used exclusively for the formation of new clusters. This command will fail if the node is not in the NonVoter state, or if the node's log index is not 0.

This will cause the Raft node to hold the given configuration in memory and then immediately perform the election protocol. For single-node clusters, the node will immediately become leader, for multi-node clusters it will submit RequestVote RPCs to all of the nodes in the given config list. NOTE WELL that EVERY node in the cluster MUST perform this action when a new cluster is being formed. It is safe for all nodes to issue this command in parallel. Once this process has been completed, the newly elected leader will append the given membership config data to the log, ensuring that the new configuration will be reckoned as the initial cluster configuration moving forward throughout the life of the cluster.

However, in order to ensure that multiple independent clusters aren't formed by prematurely issuing the InitWithConfig command before all peers are discovered, it would be prudent to have all discovered nodes exchange some information during their handshake protocol. This will allow the parent application to make informed decisions as to whether the InitWithConfig should be called and how early it should be called when starting a new cluster. An application level configuration for this facet is recommended.

Generally speaking, an application config like initial_cluster_formation_delay (or the like), which configures the application to wait for the specified amount of time before issuing an InitWithConfig command, should do the trick. The value for such a configuration should simply be a few orders of magnitude greater than the amount of time it takes for all the nodes of a new cluster to come online and discover each other.
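
As a rough sketch of that pattern, a parent-application actor holding the Raft node's Addr could schedule the command after the configured delay with ctx.run_later. The InitWithConfig constructor & its location in an admin module are assumptions here; check the crate docs for the exact shape:

use std::time::Duration;
use actix::{Actor, Addr, AsyncContext, Context};
use actix_raft::NodeId;
use actix_raft::admin::InitWithConfig;

/// Parent-application actor responsible for kicking off cluster formation.
struct ClusterController {
    raft: Addr<AppRaft>,       // the AppRaft alias from the getting started chapter
    members: Vec<NodeId>,      // IDs discovered so far, including this node's own ID
    formation_delay: Duration, // e.g. an `initial_cluster_formation_delay` config value
}

impl Actor for ClusterController {
    type Context = Context<Self>;

    fn started(&mut self, ctx: &mut Self::Context) {
        // Give peer discovery time to settle, then initialize the cluster.
        ctx.run_later(self.formation_delay, |act, _ctx| {
            // Constructor shape assumed; see the admin command docs for the real API.
            act.raft.do_send(InitWithConfig::new(act.members.clone()));
        });
    }
}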

As a rule of thumb, when new nodes come online, the leader of an existing Raft cluster will eventually discover the node (via the application's discovery system), and in such cases, the application should submit a new ProposeConfigChange to the leader to add it to the cluster. The same goes for removing nodes from the cluster.

For single-node clusters, scaling up the cluster by adding new nodes via the ProposeConfigChange command should work as expected, but there is one invariant which must be upheld: the original node of the cluster must remain online until at least half of the other new nodes have been brought up-to-date, otherwise the Raft cluster will not be able to make progress. After the other nodes have been brought up-to-date, everything should run normally according to the Raft spec.

Dynamic Membership

Throughout the lifecycle of a Raft cluster, nodes may need to go offline for various reasons. They may experience hardware or software errors which cause them to go offline unintentionally, or perhaps a cluster has too many nodes and needs to downsize. New nodes may be added to clusters as well, in order to replace old nodes, cover for nodes going offline for maintenance, or simply to increase the size of a cluster. Applications may control such events using the ProposeConfigChange command. This command allows nodes to be safely added to and removed from a running Raft cluster.

ProposeConfigChange

This command will propose a new config change to a running cluster. This command will fail if the Raft node to which it is submitted is not the Raft leader, or if the proposed config change would leave the cluster with fewer than two functioning nodes, as the cluster would no longer be able to make progress in a safe manner. Once the leader receives this command, the new configuration will be appended to the log and the Raft dynamic configuration change protocol will begin. For more details on how this is implemented, see §6 of the Raft spec.
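
For example, when the application's discovery system reports a newly joined peer (or a permanently dead one), it could hand the leader a proposal along these lines. The ProposeConfigChange constructor shown, taking the nodes to add & the nodes to remove, is an assumption based on the description above:

use actix::Addr;
use actix_raft::NodeId;
use actix_raft::admin::ProposeConfigChange;

/// Ask the cluster leader to add one newly discovered node & remove one dead node.
/// This must be sent to the current leader; any other node will reject it.
fn propose_membership_change(leader: &Addr<AppRaft>, add: NodeId, remove: NodeId) {
    // Constructor shape assumed; see the admin command docs for the real API.
    leader.do_send(ProposeConfigChange::new(vec![add], vec![remove]));
}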

Cluster auto-healing, where cluster members which have been offline for some period of time are automatically removed, is an application specific behavior, but is fully supported via this dynamic cluster membership system.

Likewise, dynamically adding new nodes to a running cluster based on an application's discovery system is also fully supported by this system.

Metrics

Raft exports metrics on a regular interval in order to facilitate maximum observability and integration with metrics aggregation tools (eg, prometheus, influx &c) using the RaftMetrics type.

The Raft instance constructor expects a Recipient<RaftMetrics> (an actix::Recipient) to be supplied, and will use this recipient to export its metrics. The RaftMetrics type holds the baseline metrics on the state of the Raft node the metrics are coming from, its current role in the cluster, its current membership config, as well as information on the Raft log and the last index to be applied to the state machine.
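
Getting that recipient from the metrics actor defined in the getting started chapter is a one-liner, since any actix Addr can be converted into a Recipient for a message type its actor handles:

use actix::{Addr, Recipient};
use actix_raft::RaftMetrics;

/// Convert the metrics actor's address into the `Recipient<RaftMetrics>`
/// which the Raft constructor expects.
fn metrics_recipient(addr: Addr<AppMetrics>) -> Recipient<RaftMetrics> {
    addr.recipient()
}

In the boot example from the getting started chapter, this is the metrics.recipient() value passed to the AppRaft constructor.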

Applications may use this data in whatever way is needed. The obvious use cases are to expose these metrics to a metrics collection system. Applications may also use this data to trigger events within higher levels of the parent application.

Metrics will be exported at a regular interval according to the Config.metrics_rate value, but will also emit a new metrics record any time the state of the Raft node changes, the membership_config changes, or the current_leader changes.

Get To It

In my very humble opinion, Rust is an outstanding language for building new databases, data stores, messaging systems and all of the other things. Books have already been written about this, so I will just say in conclusion:

  • We don't need to keep allocating all of the memory of all of the clouds to the JVM.
  • We ABSOLUTELY need to move away from all of the memory safety issues which have compromised countless systems.
  • Raft is a robust and powerful consensus protocol.
  • Huge companies have invested a lot of money in creating actor-based languages specifically for creating database systems (think FoundationDB), and we already have Actix (thanks Nikolay)!

There are plenty of awesome things to build! Let's do this!