
Replication

🚧 EXPERIMENTAL 🚧

This feature is experimental and needs thorough testing before it can be considered production-ready.
Please report any issues you encounter to the GitHub issue tracker.

Introduction

All data in FlowG is persisted in a BadgerDB store, which internally uses a log-structured merge-tree (LSM tree) data structure.

Every read-write transaction in BadgerDB is assigned a unique, strictly increasing uint64 "commit timestamp" when it commits. All key-value mutations in that transaction, whether sets or deletes, are stamped with that timestamp as their version.

When doing an incremental backup since some "version", BadgerDB scans the LSM tree and value logs for every entry whose version is greater than the given "version". After successfully writing the data, it returns the highest version observed, which can then be used as the starting point for the next incremental backup.
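
For illustration, here is a minimal sketch of how such an incremental backup can be performed with BadgerDB's public API. The import path assumes BadgerDB v4, and the database path and variable names are only examples:

package main

import (
	"bytes"
	"log"

	badger "github.com/dgraph-io/badger/v4"
)

func main() {
	// Open (or create) a BadgerDB store; the path is just an example.
	db, err := badger.Open(badger.DefaultOptions("/tmp/flowg-example"))
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// since = 0 means "backup everything"; subsequent backups pass the
	// value returned by the previous call.
	since := uint64(0)

	// Backup writes every entry whose version is greater than `since`
	// and returns the highest version it observed.
	var buf bytes.Buffer
	next, err := db.Backup(&buf, since)
	if err != nil {
		log.Fatal(err)
	}

	log.Printf("wrote %d bytes, next incremental backup starts after version %d", buf.Len(), next)
}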

As explained in the consensus documentation, FlowG relies on the SWIM protocol for node discovery, and more specifically on the hashicorp/memberlist implementation.

Replication between nodes is achieved during memberlist's "TCP Push/Pull".
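
In practice, memberlist exposes this exchange through its Delegate interface: LocalState is called to produce the bytes sent during a Push/Pull, and MergeRemoteState is called with the bytes received from the peer. The sketch below only illustrates these two hooks and is not FlowG's actual implementation:

package cluster

// syncDelegate is an illustrative memberlist.Delegate; only the two
// hooks involved in a TCP Push/Pull are fleshed out.
type syncDelegate struct {
	localState []byte // serialized local state, see the JSON example below
}

// LocalState is called by memberlist during a TCP Push/Pull; the bytes
// it returns are sent to the remote node.
func (d *syncDelegate) LocalState(join bool) []byte {
	return d.localState
}

// MergeRemoteState receives the bytes returned by the remote node's
// LocalState; this is where replication towards that node is triggered.
func (d *syncDelegate) MergeRemoteState(buf []byte, join bool) {
	// ... decode the remote state and start the incremental backups ...
}

// NodeMeta, NotifyMsg and GetBroadcasts are also required by
// memberlist.Delegate but are not involved in Push/Pull.
func (d *syncDelegate) NodeMeta(limit int) []byte                  { return nil }
func (d *syncDelegate) NotifyMsg(msg []byte)                       {}
func (d *syncDelegate) GetBroadcasts(overhead, limit int) [][]byte { return nil }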

Node's local state

FlowG has 3 replicated storages:

  • one for authentication/permissions related data
  • one for configuration (pipelines, transformers, ...)
  • one for actual logs

In a 4th, non-replicated storage, FlowG stores the last known "version" of each storage of every other node in the cluster:

lastsync:auth:node1   = 1
lastsync:config:node1 = 1
lastsync:log:node1    = 2

lastsync:auth:node2   = 2
lastsync:config:node2 = 3
lastsync:log:node2    = 7

...

During a "TCP Push/Pull", the local state is serialized as JSON and sent to the other nodes in the cluster:

{
  "node_id": "node0",
  "last_sync": {
    "node1": {
      "auth": 1,
      "config": 1,
      "log": 2
    },
    "node2": {
      "auth": 2,
      "config": 3,
      "log": 7
    }
  }
}
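
In Go, this document could be represented by types along these lines; the type and field names are illustrative, only the JSON layout above is authoritative:

package cluster

import "encoding/json"

// storageVersions holds, per storage, the last version received from a
// given node.
type storageVersions struct {
	Auth   uint64 `json:"auth"`
	Config uint64 `json:"config"`
	Log    uint64 `json:"log"`
}

// nodeState mirrors the JSON document above.
type nodeState struct {
	NodeID   string                     `json:"node_id"`
	LastSync map[string]storageVersions `json:"last_sync"`
}

// serialize produces the payload exchanged during a TCP Push/Pull.
func (s nodeState) serialize() ([]byte, error) {
	return json.Marshal(s)
}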

Merging node states

When a node receives another node's state (a "remote state"), it looks up its own node ID in that state to determine the "version" from which the incremental backup should start:

last_sync = remote_state.last_sync[local_node_id]

Then, 3 HTTP requests are made to the node identified by remote_state.node_id, on its management interface:

POST http://<remote host>/cluster/sync/auth
X-FlowG-ClusterKey: ...
X-FlowG-NodeID: ...
Transfer-Encoding: chunked
Trailer: X-FlowG-Since

... incremental backup of auth storage ...

X-FlowG-Since: ...

POST http://<remote host>/cluster/sync/config
X-FlowG-ClusterKey: ...
X-FlowG-NodeID: ...
Transfer-Encoding: chunked
Trailer: X-FlowG-Since

... incremental backup of config storage ...

X-FlowG-Since: ...

POST http://<remote host>/cluster/sync/log
X-FlowG-ClusterKey: ...
X-FlowG-NodeID: ...
Transfer-Encoding: chunked
Trailer: X-FlowG-Since

... incremental backup of log storage ...

X-FlowG-Since: ...
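
On the sending side, each of these requests can be issued by streaming the incremental backup through a pipe and filling in the X-FlowG-Since trailer once the backup is complete. The following sketch uses only the standard library and BadgerDB's Backup; function and variable names are illustrative and error handling is simplified:

package cluster

import (
	"io"
	"net/http"
	"strconv"

	badger "github.com/dgraph-io/badger/v4"
)

// pushSync streams an incremental backup of one storage ("auth", "config"
// or "log") to the remote node's management interface.
func pushSync(db *badger.DB, remoteHost, storage, clusterKey, nodeID string, since uint64) error {
	pr, pw := io.Pipe()

	req, err := http.NewRequest("POST", "http://"+remoteHost+"/cluster/sync/"+storage, pr)
	if err != nil {
		return err
	}
	req.Header.Set("X-FlowG-ClusterKey", clusterKey)
	req.Header.Set("X-FlowG-NodeID", nodeID)

	// Declare the trailer before sending; its value is filled in once the
	// backup is finished, just before the body reaches EOF.
	req.Trailer = http.Header{"X-FlowG-Since": nil}

	go func() {
		next, err := db.Backup(pw, since)
		if err != nil {
			pw.CloseWithError(err)
			return
		}
		req.Trailer.Set("X-FlowG-Since", strconv.FormatUint(next, 10))
		pw.Close()
	}()

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	// Drain the response; status code checking is omitted in this sketch.
	_, err = io.Copy(io.Discard, resp.Body)
	return err
}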

Once the data has been saved, the receiving node registers in its local state the new "version", read from the X-FlowG-Since HTTP trailer.
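
On the receiving side, the handler behind /cluster/sync/<storage> could look roughly like this: it applies the streamed backup, then reads the trailer (which only becomes available once the body has been fully consumed) and records it in the non-replicated local state. Again, names, the binary encoding of the stored value, and error handling are illustrative:

package cluster

import (
	"encoding/binary"
	"net/http"
	"strconv"

	badger "github.com/dgraph-io/badger/v4"
)

// handleSync returns an http.HandlerFunc for one storage ("auth", "config"
// or "log"). `data` is the replicated storage being synced, `state` is the
// non-replicated storage holding the lastsync keys.
func handleSync(data, state *badger.DB, storage string) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		// The X-FlowG-ClusterKey header would be validated here.
		nodeID := r.Header.Get("X-FlowG-NodeID")

		// Apply the incremental backup to the local replicated storage.
		if err := data.Load(r.Body, 16); err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}

		// Trailers are only populated once the body has been read to EOF.
		since, err := strconv.ParseUint(r.Trailer.Get("X-FlowG-Since"), 10, 64)
		if err != nil {
			http.Error(w, err.Error(), http.StatusBadRequest)
			return
		}

		// Record the sender's new "version", e.g. lastsync:auth:node1 = 1.
		err = state.Update(func(txn *badger.Txn) error {
			key := []byte("lastsync:" + storage + ":" + nodeID)
			value := make([]byte, 8)
			binary.BigEndian.PutUint64(value, since)
			return txn.Set(key, value)
		})
		if err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}

		w.WriteHeader(http.StatusNoContent)
	}
}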