Raft Consensus and the Coordinator — Distributed Metadata with HashiCorp Raft
The cluster needs a single source of truth for metadata: which topics exist, who leads each topic, and which replicas are in sync. In this section, you'll integrate HashiCorp Raft for consensus. You will build a Coordinator (manages the Raft node lifecycle), a finite state machine, or FSM (applies committed metadata events), and a LogStore adapter (uses your segment-based log as Raft's persistent storage). All code lives in coordinator/.
Step 1: Understand the Coordinator role
What the Coordinator does
The Coordinator is the bridge between Raft and the rest of the system:
- Sets up Raft — configures the Raft node with the FSM, log store, stable store, and transport.
- Joins and leaves — adds or removes nodes from the Raft cluster (called by Serf's discovery handler).
- Applies events — proposes metadata events (topic creation, ISR updates, leader changes) to the Raft log.
- Queries — answers "who is the Raft leader?" and "which nodes are in the cluster?"
Step 2: Define the MetadataStore interface
Interface for applying events
The FSM delegates to a MetadataStore (implemented by TopicManager) that knows how to handle each metadata event:
// coordinator/metadata.go
package coordinator
import "github.com/mohitkumar/mlog/protocol"
type MetadataStore interface {
Apply(ev *protocol.MetadataEvent) error
Restore(data []byte) error
}
- Apply: process a single committed event (e.g., create a topic, update ISR). Takes a pointer to MetadataEvent.
- Restore: rebuild state from a Raft snapshot.
Note there is no Snapshot() method on this interface. The FSM snapshots the metadata store by directly JSON-marshaling it (json.Marshal(c.metadataStore)), so the TopicManager just needs to be JSON-serializable.
Step 3: Create the Coordinator struct
Coordinator structure and compile-time assertion
// coordinator/coordinator.go
package coordinator
var _ discovery.Handler = (*Coordinator)(nil)
const (
SnapshotThreshold = 10000
SnapshotInterval = 10
RetainSnapshotCount = 10
)
// MemberLister returns information about cluster members currently alive (as seen by Serf).
// Used by the Coordinator to reconcile Raft voters with Serf membership.
type MemberLister interface {
AliveMembers() []string
AliveNodeDetails() []discovery.NodeInfo
}
type Coordinator struct {
mu sync.RWMutex
Logger *zap.Logger
raft *raft.Raft
cfg config.Config
metadataStore MetadataStore
memberLister MemberLister
}
The var _ discovery.Handler = (*Coordinator)(nil) line is a compile-time assertion that Coordinator implements the discovery.Handler interface (the Join and Leave methods from the previous section).
The struct holds the full config.Config (not just RaftConfig) because methods like EnsureSelfInMetadata need the RPC address, and Join needs to apply node-add events with both Raft and RPC addresses.
Step 4: Set up Raft
Raft initialization
The constructor creates the FSM, sets up Raft, and returns the coordinator:
func NewCoordinatorFromConfig(cfg config.Config, metadataStore MetadataStore,
logger *zap.Logger) (*Coordinator, error) {
if logger == nil {
logger = zap.NewNop()
}
fsm, err := NewCoordinatorFSM(cfg.RaftConfig.Dir, metadataStore)
if err != nil {
return nil, err
}
raftNode, err := setupRaft(fsm, cfg.RaftConfig)
if err != nil {
return nil, err
}
rpcAddr, err := cfg.RPCAddr()
if err != nil {
return nil, err
}
c := &Coordinator{
Logger: logger,
raft: raftNode,
cfg: cfg,
metadataStore: metadataStore,
}
c.Logger.Info("coordinator started",
zap.String("raft_addr", cfg.RaftConfig.Address),
zap.String("rpc_addr", rpcAddr))
go c.runLeadershipWatcher()
return c, nil
}
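The constructor leaves memberLister nil. reconcileRaftVoters (shown later) returns early while it is nil, so something has to install the lister once Serf is running. The section doesn't show that step. The sketch below shows one plausible shape, using a hypothetical SetMemberLister method on a cut-down coordinatorSketch type. The setter takes the write lock that pairs with the read lock reconciliation uses.

```go
package main

import "sync"

// MemberLister is trimmed to the one method this sketch needs.
type MemberLister interface {
	AliveMembers() []string
}

// coordinatorSketch is a hypothetical, cut-down Coordinator holding only the
// fields involved in installing the member lister.
type coordinatorSketch struct {
	mu           sync.RWMutex
	memberLister MemberLister
}

// SetMemberLister is hypothetical: the write lock pairs with the read lock
// that reconcileRaftVoters takes before reading memberLister.
func (c *coordinatorSketch) SetMemberLister(ml MemberLister) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.memberLister = ml
}

// currentLister is what reconciliation would read; nil means "skip for now".
func (c *coordinatorSketch) currentLister() MemberLister {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.memberLister
}

// staticLister is a fake MemberLister for the usage example.
type staticLister []string

func (s staticLister) AliveMembers() []string { return s }
```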
The Raft setup itself is a standalone function (not a method) because it only needs the FSM and Raft config:
Raft configuration and storage setup
func setupRaft(fsm raft.FSM, cfg config.RaftConfig) (*raft.Raft, error) {
raftBindAddr := cfg.Address
if cfg.BindAddress != "" {
raftBindAddr = cfg.BindAddress
}
raftAdvertiseAddr := cfg.Address
raftConfig := raft.DefaultConfig()
raftConfig.SnapshotThreshold = uint64(SnapshotThreshold)
raftConfig.SnapshotInterval = time.Duration(SnapshotInterval) * time.Second
raftConfig.LocalID = raft.ServerID(cfg.ID)
raftConfig.LogLevel = cfg.LogLevel
advertiseAddr, err := net.ResolveTCPAddr("tcp", raftAdvertiseAddr)
if err != nil {
return nil, fmt.Errorf("failed to resolve Raft advertise address %s: %w",
raftAdvertiseAddr, err)
}
transport, err := raft.NewTCPTransport(raftBindAddr, advertiseAddr, 3,
10*time.Second, os.Stderr)
if err != nil {
return nil, fmt.Errorf("failed to make TCP transport bind %s advertise %s: %w",
raftBindAddr, raftAdvertiseAddr, err)
}
snapshots, err := raft.NewFileSnapshotStore(cfg.Dir, RetainSnapshotCount, os.Stderr)
if err != nil {
return nil, fmt.Errorf("failed to create snapshot store at %s: %w", cfg.Dir, err)
}
boltDB, err := raftboltdb.NewBoltStore(filepath.Join(cfg.Dir, "raft.db"))
if err != nil {
return nil, fmt.Errorf("failed to create bolt store: %w", err)
}
logStore, err := NewLogStore(cfg.Dir)
if err != nil {
return nil, fmt.Errorf("failed to create log store: %w", err)
}
ra, err := raft.NewRaft(raftConfig, fsm, logStore, boltDB, snapshots, transport)
if err != nil {
return nil, errs.ErrNewRaft(err)
}
if cfg.Boostatrap {
configuration := raft.Configuration{
Servers: []raft.Server{
{
ID: raftConfig.LocalID,
Address: transport.LocalAddr(),
},
},
}
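// raft.ErrCantBootstrap means Raft state already exists on disk (for example,
// a restart with the bootstrap flag still set); the node simply resumes.
if err := ra.BootstrapCluster(configuration).Error(); err != nil &&
!errors.Is(err, raft.ErrCantBootstrap) {
return nil, errs.ErrBootstrapCluster(err)
}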
}
return ra, nil
}
Walk through the setup:
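1. Bind vs. advertise address. In Docker, the Raft node binds to 0.0.0.0:9093 (so it accepts connections on every interface) but advertises node1:9093 so other nodes can reach it. cfg.BindAddress is the listen address; cfg.Address is what others use. Because setupRaft resolves cfg.Address with net.ResolveTCPAddr, the hostname is looked up once at startup, and the transport advertises the resulting IP:port.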
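2. Raft config. Raft checks whether to snapshot at a randomized interval between SnapshotInterval and twice that value (here 10 to 20 seconds). It takes a snapshot only when at least SnapshotThreshold (10,000) log entries have accumulated since the previous one. LogLevel comes from the config (typically "ERROR" to reduce noise).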
3. Transport. Raft uses its own TCP transport, separate from our application transport. The advertiseAddr is resolved to a *net.TCPAddr and passed to NewTCPTransport. The 3 is the max connection pool size; 10*time.Second is the connection timeout.
4. Stores. Five components Raft needs:
| Component | Our implementation |
|---|---|
| FSM | MetadataFSM — deserializes metadata events, calls MetadataStore.Apply() |
| LogStore | logStore — adapter over our segment-based log.Log (converts 1-based Raft indices to 0-based offsets) |
| StableStore | BoltDB (raft.db) — stores Raft's current term and voted-for |
| SnapshotStore | File-based — retains up to 10 snapshots |
| Transport | TCP — Raft's own transport for AppendEntries, RequestVote, etc. |
5. Bootstrap. If cfg.Boostatrap is true (note the typo in the field name — it's in the config struct), bootstrap a single-node Raft cluster. The bootstrap result's error is checked immediately. Only one node should bootstrap; otherwise you get split-brain.
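Peers must be able to dial the advertised address, so a wildcard such as 0.0.0.0 cannot be advertised. HashiCorp's NewTCPTransport returns an error for an unspecified advertise IP. The hypothetical checkAdvertisable helper below performs the same resolution as setupRaft and fails early with a clearer message.

```go
package main

import (
	"fmt"
	"net"
)

// checkAdvertisable is a hypothetical pre-flight check: it resolves the
// advertise address the same way setupRaft does and rejects wildcard or
// empty hosts, which peers could not dial.
func checkAdvertisable(addr string) (*net.TCPAddr, error) {
	tcpAddr, err := net.ResolveTCPAddr("tcp", addr)
	if err != nil {
		return nil, fmt.Errorf("resolve %s: %w", addr, err)
	}
	if tcpAddr.IP == nil || tcpAddr.IP.IsUnspecified() {
		return nil, fmt.Errorf("%s is not advertisable: use a routable host or IP", addr)
	}
	return tcpAddr, nil
}
```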
Step 5: Implement the FSM
FSM struct and Apply method
When Raft commits an entry, the FSM's Apply method is called. It deserializes the metadata event and delegates to the MetadataStore:
// coordinator/fsm.go
var _ raft.FSM = (*MetadataFSM)(nil)
type MetadataFSM struct {
mu sync.RWMutex
metadataStore MetadataStore
BaseDir string
}
func NewCoordinatorFSM(baseDir string, metadataStore MetadataStore) (*MetadataFSM, error) {
return &MetadataFSM{
metadataStore: metadataStore,
BaseDir: baseDir,
}, nil
}
func (c *MetadataFSM) Apply(l *raft.Log) interface{} {
var metadataEvent protocol.MetadataEvent
if err := json.Unmarshal(l.Data, &metadataEvent); err != nil {
return err
}
c.mu.Lock()
defer c.mu.Unlock()
return c.metadataStore.Apply(&metadataEvent)
}
This is the core of the Raft integration. Every node in the cluster receives the same committed entries in the same order and applies them to its MetadataStore. This guarantees all nodes have a consistent view of cluster metadata.
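The FSM holds its own sync.RWMutex, taking the write lock in Apply and the read lock in Snapshot. HashiCorp Raft never runs Apply and Snapshot concurrently, so the lock is a safeguard rather than a strict requirement. What matters more is that Snapshot serializes the state immediately, because FSMSnapshot.Persist may run while later entries are being applied.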
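Replicas stay consistent only if Apply is deterministic. Its result may depend only on the entry and on the state built from earlier entries: no clocks, random numbers, or node-local lookups. The toy replica below uses a hypothetical event format. It replays the same committed log on two fresh instances, and both end in identical states.

```go
package main

import "encoding/json"

// event is a hypothetical, simplified metadata event.
type event struct {
	Type   string `json:"type"` // "create" or "delete"
	Topic  string `json:"topic"`
	Leader string `json:"leader,omitempty"`
}

// replica is a toy state machine: its state changes only by applying
// committed entries in log order, as MetadataFSM.Apply does.
type replica struct {
	topics map[string]string
}

func (r *replica) apply(entry []byte) error {
	var ev event
	if err := json.Unmarshal(entry, &ev); err != nil {
		return err
	}
	switch ev.Type {
	case "create":
		r.topics[ev.Topic] = ev.Leader
	case "delete":
		delete(r.topics, ev.Topic)
	}
	return nil
}

// replay applies a committed log to a fresh replica.
func replay(log [][]byte) (*replica, error) {
	r := &replica{topics: map[string]string{}}
	for _, entry := range log {
		if err := r.apply(entry); err != nil {
			return nil, err
		}
	}
	return r, nil
}
```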
FSM snapshotting and restoration
Snapshots compact the Raft log. Instead of replaying thousands of entries, a new node (or a node that fell behind) can load a snapshot and then replay only recent entries:
func (c *MetadataFSM) Snapshot() (raft.FSMSnapshot, error) {
c.mu.RLock()
defer c.mu.RUnlock()
data, err := json.Marshal(c.metadataStore)
if err != nil {
return nil, err
}
return &metadataSnapshot{data: data}, nil
}
func (c *MetadataFSM) Restore(r io.ReadCloser) error {
defer r.Close()
data, err := io.ReadAll(r)
if err != nil {
return err
}
return c.metadataStore.Restore(data)
}
The snapshot is taken under the read lock to get a consistent view of the metadata store. The metadataStore must be JSON-serializable (which TopicManager is — it has exported fields with json tags).
The snapshot struct is simple — it holds the serialized bytes and writes them to the Raft sink:
var _ raft.FSMSnapshot = (*metadataSnapshot)(nil)
type metadataSnapshot struct {
data []byte
}
func (s *metadataSnapshot) Persist(sink raft.SnapshotSink) error {
if _, err := sink.Write(s.data); err != nil {
sink.Cancel()
return err
}
return sink.Close()
}
func (s *metadataSnapshot) Release() {}
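Persist has a simple contract: write the state to the sink, then call Close on success or Cancel on failure so that a half-written snapshot is discarded. Release can be a no-op here because the snapshot owns a private copy of the bytes. The sketch below mirrors Persist against a local snapshotSink interface with the same method set as raft.SnapshotSink. It also defines a fake in-memory sink. Both types are stand-ins so the example runs without the raft module.

```go
package main

import "errors"

// snapshotSink is a local stand-in with raft.SnapshotSink's method set.
type snapshotSink interface {
	Write(p []byte) (int, error)
	Close() error
	ID() string
	Cancel() error
}

type metadataSnapshot struct{ data []byte }

// Persist mirrors the section's implementation: write everything, cancel on
// failure so the partial snapshot is discarded, otherwise close to commit it.
func (s *metadataSnapshot) Persist(sink snapshotSink) error {
	if _, err := sink.Write(s.data); err != nil {
		sink.Cancel()
		return err
	}
	return sink.Close()
}

// memSink is a fake sink that records what happened to it.
type memSink struct {
	buf       []byte
	failWrite bool
	closed    bool
	cancelled bool
}

func (m *memSink) Write(p []byte) (int, error) {
	if m.failWrite {
		return 0, errors.New("disk full")
	}
	m.buf = append(m.buf, p...)
	return len(p), nil
}

func (m *memSink) Close() error  { m.closed = true; return nil }
func (m *memSink) ID() string    { return "mem" }
func (m *memSink) Cancel() error { m.cancelled = true; return nil }
```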
Step 6: Implement the LogStore adapter
LogStore overview
Raft needs a persistent log store. Instead of using another library, we adapt our own segment-based log. The key challenge: Raft uses 1-based indices while our log uses 0-based offsets.
LogStore struct with embedded Log
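The logStore embeds *log.Log instead of wrapping it in a named field. The log's methods (Read, Append, Truncate, IsEmpty, LowestOffset, HighestOffset) are therefore promoted and callable directly on logStore: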
// coordinator/logstore.go
var _ raft.LogStore = (*logStore)(nil)
type logStore struct {
*log.Log
}
func NewLogStore(dir string) (*logStore, error) {
lg, err := log.NewLog(filepath.Join(dir, "__metadata__.log"))
if err != nil {
return nil, err
}
return &logStore{
Log: lg, // named lg so the variable doesn't shadow the log package
}, nil
}
The log directory is filepath.Join(dir, "__metadata__.log") — a subdirectory named __metadata__.log under the Raft data directory. This keeps Raft's log entries separate from topic data.
Implementing LogStore interface methods
func (l *logStore) FirstIndex() (uint64, error) {
if l.IsEmpty() {
return 0, nil
}
return l.LowestOffset() + 1, nil
}
func (l *logStore) LastIndex() (uint64, error) {
if l.IsEmpty() {
return 0, nil
}
return l.HighestOffset() + 1, nil
}
The conversion is straightforward: Raft index = log offset + 1.
const segmentOffsetPrefix = 8
func (l *logStore) GetLog(index uint64, out *raft.Log) error {
if index < 1 {
return errs.ErrRaftLogIndex(index)
}
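offset := index - 1
// Report compacted or not-yet-written entries as raft.ErrLogNotFound:
// the leader relies on this error to fall back to sending a snapshot.
if l.IsEmpty() || offset < l.LowestOffset() || offset > l.HighestOffset() {
return raft.ErrLogNotFound
}
in, err := l.Read(offset)
if err != nil {
return err
}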
if len(in) < segmentOffsetPrefix {
return errs.ErrLogRecordTooShort(offset)
}
payload := in[segmentOffsetPrefix:]
if err := json.Unmarshal(payload, out); err != nil {
return err
}
out.Index = index
return nil
}
GetLog has an important detail: our segment Read returns the full record including the 8-byte offset prefix ([Offset 8 bytes][Value]). The segmentOffsetPrefix constant skips those 8 bytes to get the actual JSON payload. After unmarshaling, it sets out.Index to the Raft index (ensuring consistency even if the stored data doesn't include the index).
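The record layout can be tested in isolation. This sketch assumes the 8-byte prefix is a big-endian offset; the section doesn't state the byte order. It uses a trimmed raftLog type in place of raft.Log. Unlike GetLog, the hypothetical decodeRecord also checks that the stored offset equals index - 1, a guard that would catch index/offset drift early.

```go
package main

import (
	"encoding/binary"
	"encoding/json"
	"fmt"
)

const segmentOffsetPrefix = 8

// raftLog is a trimmed stand-in for raft.Log.
type raftLog struct {
	Index uint64
	Term  uint64
	Data  []byte
}

// encodeRecord builds a record as this sketch assumes the segment stores it:
// an 8-byte big-endian offset followed by the JSON payload.
func encodeRecord(offset uint64, l *raftLog) ([]byte, error) {
	payload, err := json.Marshal(l)
	if err != nil {
		return nil, err
	}
	rec := make([]byte, segmentOffsetPrefix, segmentOffsetPrefix+len(payload))
	binary.BigEndian.PutUint64(rec, offset)
	return append(rec, payload...), nil
}

// decodeRecord mirrors GetLog (length check, skip prefix, unmarshal, stamp
// the Raft index) and adds a hypothetical offset cross-check.
func decodeRecord(index uint64, rec []byte, out *raftLog) error {
	if index < 1 {
		return fmt.Errorf("raft index %d out of range", index)
	}
	if len(rec) < segmentOffsetPrefix {
		return fmt.Errorf("record too short: %d bytes", len(rec))
	}
	if stored := binary.BigEndian.Uint64(rec[:segmentOffsetPrefix]); stored != index-1 {
		return fmt.Errorf("offset mismatch: record has %d, want %d", stored, index-1)
	}
	if err := json.Unmarshal(rec[segmentOffsetPrefix:], out); err != nil {
		return err
	}
	out.Index = index
	return nil
}
```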
func (l *logStore) StoreLog(log *raft.Log) error {
data, err := json.Marshal(log)
if err != nil {
return err
}
_, err = l.Append(data)
if err != nil {
return err
}
return nil
}
func (l *logStore) StoreLogs(logs []*raft.Log) error {
for _, log := range logs {
if err := l.StoreLog(log); err != nil {
return err
}
}
return nil
}
func (l *logStore) DeleteRange(min, max uint64) error {
if max < 1 {
return nil
}
return l.Truncate(max - 1)
}
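DeleteRange truncates at max - 1, converting the 1-based Raft index to a 0-based offset. The Truncate method on our log removes all segments up to and including the given offset. That matches how Raft uses DeleteRange when compacting old entries after a snapshot: min is then the first index, so ignoring it is safe. However, Raft also calls DeleteRange on a follower to discard a conflicting suffix of its log, from the first conflicting index through the last index. A head-only Truncate does not handle that case.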
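To see both DeleteRange shapes and the not-found case together, here is a hypothetical in-memory store, memLogStore, that keeps the same contiguous index mapping as logStore. errLogNotFound stands in for raft.ErrLogNotFound. HashiCorp Raft's replication code checks for that error to decide when to send a snapshot instead of log entries.

```go
package main

import "errors"

// errLogNotFound stands in for raft.ErrLogNotFound.
var errLogNotFound = errors.New("log not found")

// memLogStore is hypothetical: the entry with Raft index i lives at
// entries[i-first], where first is the lowest retained index.
type memLogStore struct {
	first   uint64
	entries [][]byte
}

func (m *memLogStore) FirstIndex() uint64 {
	if len(m.entries) == 0 {
		return 0
	}
	return m.first
}

func (m *memLogStore) LastIndex() uint64 {
	if len(m.entries) == 0 {
		return 0
	}
	return m.first + uint64(len(m.entries)) - 1
}

// Store appends an entry, insisting that indices stay contiguous.
func (m *memLogStore) Store(index uint64, data []byte) error {
	if len(m.entries) == 0 {
		m.first = index
	} else if index != m.LastIndex()+1 {
		return errors.New("non-contiguous index")
	}
	m.entries = append(m.entries, data)
	return nil
}

func (m *memLogStore) Get(index uint64) ([]byte, error) {
	if len(m.entries) == 0 || index < m.first || index > m.LastIndex() {
		return nil, errLogNotFound
	}
	return m.entries[index-m.first], nil
}

// DeleteRange removes [min, max] inclusive. Raft deletes either a prefix
// (compaction) or a suffix (conflict repair); only those shapes are handled.
func (m *memLogStore) DeleteRange(min, max uint64) error {
	if len(m.entries) == 0 || min > max {
		return nil
	}
	first, last := m.first, m.LastIndex()
	if min < first {
		min = first
	}
	if max > last {
		max = last
	}
	if min > max {
		return nil
	}
	switch {
	case min == first: // prefix, possibly the whole log
		m.entries = m.entries[max-first+1:]
		m.first = max + 1
	case max == last: // suffix
		m.entries = m.entries[:min-first]
	default:
		return errors.New("deleting from the middle of the log is not supported")
	}
	return nil
}
```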
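Why use our own log? The segment-based log already handles append, read by offset, and truncation, which covers most of what Raft needs. raft-boltdb's BoltStore, already used here as the StableStore, also implements LogStore, so reusing our log is a design choice rather than a necessity.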
Step 7: Implement cluster join and leave
Join method with configuration cleanup
When Serf detects a new member, it calls Join. The coordinator checks for stale configurations, adds the node to Raft, and applies an AddNode metadata event:
func (c *Coordinator) Join(id, raftAddr, rpcAddr string) error {
if !c.IsLeader() {
c.Logger.Error("not leader, skipping join",
zap.String("joining_node_id", id),
zap.String("raft_addr", raftAddr),
zap.String("rpc_addr", rpcAddr))
return nil
}
c.Logger.Info("join requested",
zap.String("joining_node_id", id),
zap.String("raft_addr", raftAddr),
zap.String("rpc_addr", rpcAddr))
configFuture := c.raft.GetConfiguration()
if err := configFuture.Error(); err != nil {
return err
}
serverID := raft.ServerID(id)
serverAddr := raft.ServerAddress(raftAddr)
for _, srv := range configFuture.Configuration().Servers {
if srv.ID == serverID || srv.Address == serverAddr {
if srv.ID == serverID && srv.Address == serverAddr {
return nil
}
removeFuture := c.raft.RemoveServer(srv.ID, 0, 5*time.Second) // remove the stale entry by its own ID
if err := removeFuture.Error(); err != nil {
return err
}
}
}
addFuture := c.raft.AddVoter(serverID, serverAddr, 0, 5*time.Second)
if err := addFuture.Error(); err != nil {
c.Logger.Error("raft add voter failed", zap.Error(err), zap.String("node_id", id))
return err
}
if err := c.ApplyNodeAddEvent(id, raftAddr, rpcAddr); err != nil {
c.Logger.Error("failed to apply node add event",
zap.Error(err), zap.String("node_id", id))
return err
}
c.Logger.Info("node joined cluster",
zap.String("joined_node_id", id),
zap.String("raft_addr", raftAddr),
zap.String("rpc_addr", rpcAddr))
return nil
}
The join logic handles a subtle edge case: a node that restarted with a different address. If the Raft configuration already has a server with the same ID but a different address (or same address but different ID), it removes the stale entry first, then adds the new one. If both ID and address match, the node is already in the cluster — return immediately.
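AddVoter and RemoveServer use a 5-second timeout. In HashiCorp Raft, this bounds how long the call waits for Raft to accept the change. The future's Error() then blocks until the configuration change is committed or fails, for example because leadership was lost.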
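The stale-entry rule can be isolated as a pure function, which makes its edge cases easy to test. planJoin and the trimmed server type are hypothetical. Note that a conflicting entry is removed by its own ID: when only the address matches, that ID differs from the joining node's ID.

```go
package main

// server is a trimmed stand-in for raft.Server.
type server struct {
	ID      string
	Address string
}

// planJoin reproduces the Join loop as a pure function. It reports whether
// the node is already a member with the same ID and address; otherwise it
// lists the existing server IDs to remove before calling AddVoter.
func planJoin(existing []server, id, addr string) (alreadyMember bool, remove []string) {
	for _, srv := range existing {
		if srv.ID != id && srv.Address != addr {
			continue
		}
		if srv.ID == id && srv.Address == addr {
			return true, nil
		}
		remove = append(remove, srv.ID)
	}
	return false, remove
}
```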
Leave method
func (c *Coordinator) Leave(id string) error {
if !c.IsLeader() {
c.Logger.Error("not leader, skipping leave", zap.String("leaving_node_id", id))
return nil
}
c.Logger.Info("leave requested", zap.String("leaving_node_id", id))
removeFuture := c.raft.RemoveServer(raft.ServerID(id), 0, 5*time.Second)
if err := removeFuture.Error(); err != nil {
c.Logger.Error("raft remove server failed",
zap.Error(err), zap.String("node_id", id))
return err
}
if err := c.ApplyNodeRemoveEvent(id); err != nil {
c.Logger.Error("failed to apply node remove event",
zap.Error(err), zap.String("node_id", id))
return err
}
c.Logger.Info("node left cluster", zap.String("left_node_id", id))
return nil
}
Both Join and Leave are no-ops on non-leader nodes (returning nil, not an error). Only the Raft leader can modify cluster membership. Non-leader nodes still receive the membership events from Serf but leave them to the leader.
Ensuring self is in metadata
The bootstrap node must add itself to the metadata store explicitly. It is the first node, so no Serf join event is ever delivered for it:
func (c *Coordinator) EnsureSelfInMetadata() error {
if c.raft.State() != raft.Leader {
return nil
}
rpcAddr, err := c.cfg.RPCAddr()
if err != nil {
return err
}
return c.ApplyNodeAddEvent(c.cfg.NodeConfig.ID, c.cfg.RaftConfig.Address, rpcAddr)
}
This is called during server startup after Raft is ready. It ensures the bootstrap node's metadata (RPC address, Raft address) is available for topic leader assignment.
Step 8: Apply metadata events
Typed event application methods
The coordinator provides typed methods for proposing events to the Raft log. Each method checks that this node is the Raft leader, serializes the event, and calls raft.Apply():
// coordinator/coordinator_events.go
func (c *Coordinator) ApplyCreateTopicEvent(topic string, replicaCount uint32,
leaderNodeID string, replicaNodeIds []string) error {
if c.raft.State() != raft.Leader {
c.Logger.Debug("not leader, skipping create topic event",
zap.String("topic", topic))
return nil
}
eventData, err := json.Marshal(protocol.CreateTopicEvent{
Topic: topic,
ReplicaCount: replicaCount,
LeaderNodeID: leaderNodeID,
LeaderEpoch: 1,
ReplicaNodeIds: replicaNodeIds,
})
if err != nil {
return err
}
ev := &protocol.MetadataEvent{
EventType: protocol.MetadataEventTypeCreateTopic,
Data: eventData,
}
data, err := json.Marshal(ev)
if err != nil {
return err
}
c.Logger.Info("apply create topic event",
zap.String("topic", topic),
zap.String("leader_node_id", leaderNodeID))
f := c.raft.Apply(data, 5*time.Second)
if err := f.Error(); err != nil {
c.Logger.Error("raft apply create topic failed",
zap.Error(err), zap.String("topic", topic))
return errs.ErrRaftApply(err)
}
return nil
}
Notice the pattern. Every apply method:
- Checks raft.State() != raft.Leader and returns nil if not leader.
- Marshals the typed event data (e.g., CreateTopicEvent).
- Wraps it in a MetadataEvent with the event type.
- Double-marshals: the outer MetadataEvent is what goes into the Raft log.
- Calls raft.Apply() with a 5-second timeout.
- Wraps errors with errs.ErrRaftApply() for consistent error reporting.
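On the receiving side, the two marshal steps are undone by two unmarshal steps. The FSM decodes the outer envelope, and the store decodes Data according to EventType. The types and event-type strings below are hypothetical stand-ins, because protocol.MetadataEvent's exact definition isn't shown. With Data declared as []byte, encoding/json stores the inner JSON as a base64 string; a json.RawMessage field would embed it verbatim instead.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Hypothetical stand-ins for protocol.MetadataEvent and two payload types.
type metadataEvent struct {
	EventType string `json:"event_type"`
	Data      []byte `json:"data"` // encoded as base64 by encoding/json
}

type createTopicEvent struct {
	Topic        string `json:"topic"`
	LeaderNodeID string `json:"leader_node_id"`
}

type removeNodeEvent struct {
	NodeID string `json:"node_id"`
}

// wrap performs the two marshal steps the apply methods share.
func wrap(eventType string, payload any) ([]byte, error) {
	inner, err := json.Marshal(payload)
	if err != nil {
		return nil, err
	}
	return json.Marshal(metadataEvent{EventType: eventType, Data: inner})
}

// dispatch undoes them: the outer unmarshal (FSM side), then the inner one
// chosen by EventType (store side). It returns a description for testing.
func dispatch(entry []byte) (string, error) {
	var ev metadataEvent
	if err := json.Unmarshal(entry, &ev); err != nil {
		return "", err
	}
	switch ev.EventType {
	case "create_topic":
		var p createTopicEvent
		if err := json.Unmarshal(ev.Data, &p); err != nil {
			return "", err
		}
		return "create " + p.Topic + " led by " + p.LeaderNodeID, nil
	case "remove_node":
		var p removeNodeEvent
		if err := json.Unmarshal(ev.Data, &p); err != nil {
			return "", err
		}
		return "remove " + p.NodeID, nil
	default:
		return "", fmt.Errorf("unknown event type %q", ev.EventType)
	}
}
```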
The other apply methods follow the same pattern:
func (c *Coordinator) ApplyDeleteTopicEventInternal(topic string) error {
// ... same pattern, marshals protocol.DeleteTopicEvent{Topic: topic}
}
func (c *Coordinator) ApplyNodeAddEvent(nodeID, addr, rpcAddr string) error {
// ... marshals protocol.AddNodeEvent{NodeID: nodeID, Addr: addr, RpcAddr: rpcAddr}
}
func (c *Coordinator) ApplyNodeRemoveEvent(nodeID string) error {
// ... marshals protocol.RemoveNodeEvent{NodeID: nodeID}
}
func (c *Coordinator) ApplyLeaderChangeEvent(topic, leaderNodeID string,
leaderEpoch int64) error {
// ... marshals protocol.LeaderChangeEvent{Topic, LeaderNodeID, LeaderEpoch}
}
The ISR update method has special error handling for shutdown and leadership-change scenarios:
func (c *Coordinator) ApplyIsrUpdateEventInternal(topic, replicaNodeID string,
isr bool) error {
if c.raft.State() != raft.Leader {
c.Logger.Debug("not leader, skipping ISR update event",
zap.String("topic", topic))
return nil
}
eventData, err := json.Marshal(protocol.IsrUpdateEvent{
Topic: topic,
ReplicaNodeID: replicaNodeID,
Isr: isr,
})
if err != nil {
return err
}
ev := &protocol.MetadataEvent{
EventType: protocol.MetadataEventTypeIsrUpdate,
Data: eventData,
}
data, err := json.Marshal(ev)
if err != nil {
return err
}
f := c.raft.Apply(data, 5*time.Second)
if err := f.Error(); err != nil {
msg := err.Error()
if strings.Contains(msg, "shutdown") || strings.Contains(msg, "leadership lost") {
c.Logger.Debug("raft apply ISR update failed (shutdown or leadership change)",
zap.Error(err))
} else {
c.Logger.Error("raft apply ISR update failed", zap.Error(err))
}
return errs.ErrRaftApply(err)
}
return nil
}
ISR updates happen frequently (every replication cycle) and are expected to fail during leadership transitions. The special handling logs these at Debug level instead of Error to avoid flooding the logs during normal Raft elections.
The flow: Coordinator.ApplyCreateTopicEvent() → serializes → raft.Apply() → Raft replicates to majority → Raft calls FSM.Apply() on all nodes → MetadataStore.Apply() updates in-memory state.
Step 9: Implement query and leadership methods
Query methods
func (c *Coordinator) IsLeader() bool {
return c.raft.State() == raft.Leader
}
func (c *Coordinator) GetRaftLeaderNodeID() (string, error) {
_, id := c.raft.LeaderWithID()
return string(id), nil
}
func (c *Coordinator) RaftServerIDs() ([]string, error) {
f := c.raft.GetConfiguration()
if err := f.Error(); err != nil {
return nil, err
}
ids := make([]string, 0, len(f.Configuration().Servers))
for _, s := range f.Configuration().Servers {
ids = append(ids, string(s.ID))
}
return ids, nil
}
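RaftServerIDs returns an error because GetConfiguration() can fail, for example when Raft is shutting down. GetRaftLeaderNodeID, by contrast, always returns a nil error. When no leader is currently known, LeaderWithID returns an empty ID, so callers must treat "" as "no leader".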
Waiting for Raft readiness
On startup, Raft needs time to elect a leader. The server startup code calls WaitforRaftReady to block until a leader exists:
func (c *Coordinator) WaitforRaftReady(timeout time.Duration) error {
timeoutc := time.After(timeout)
ticker := time.NewTicker(time.Second)
defer ticker.Stop() // release the ticker on every return path
for {
select {
case <-timeoutc:
return fmt.Errorf("timed out waiting for raft ready")
case <-ticker.C:
c.Logger.Info("waiting for raft ready",
zap.String("leader", string(c.raft.Leader())))
if c.raft.Leader() != "" {
return nil
}
}
}
}
func (c *Coordinator) IsRaftReady() bool {
return c.raft.Leader() != ""
}
The timeout version is used during startup; IsRaftReady is used for health checks.
Leadership watcher for voter reconciliation
The leadership watcher reconciles Serf membership with the Raft voter list. It runs shortly after this node becomes leader and again every 30 seconds.
// runLeadershipWatcher watches for Raft leadership transitions and runs periodic
// reconciliation. When this node becomes leader, it reconciles the Raft voter list
// against Serf alive members after a short delay. It also runs reconciliation
// periodically to catch any missed joins or leaves.
func (c *Coordinator) runLeadershipWatcher() {
leaderCh := c.raft.LeaderCh()
reconcileTicker := time.NewTicker(30 * time.Second)
defer reconcileTicker.Stop()
for {
select {
case isLeader, ok := <-leaderCh:
if !ok {
return // raft shut down
}
if isLeader {
// Delay so Serf has time to propagate failure events for the old leader.
time.Sleep(5 * time.Second)
c.reconcileRaftVoters()
}
case <-reconcileTicker.C:
if c.IsLeader() {
c.reconcileRaftVoters()
}
}
}
}
// reconcileRaftVoters ensures Raft voters and Serf alive members are in sync.
// It removes Raft voters that are no longer alive in Serf, and adds Serf alive
// members that are missing from the Raft voter list.
func (c *Coordinator) reconcileRaftVoters() {
if !c.IsLeader() {
return
}
c.mu.RLock()
ml := c.memberLister
c.mu.RUnlock()
if ml == nil {
return
}
details := ml.AliveNodeDetails()
aliveSet := make(map[string]discovery.NodeInfo, len(details))
for _, info := range details {
aliveSet[info.Name] = info
}
f := c.raft.GetConfiguration()
if err := f.Error(); err != nil {
c.Logger.Warn("reconcile: failed to get raft config", zap.Error(err))
return
}
localID := c.cfg.RaftConfig.ID
raftSet := make(map[string]struct{})
for _, server := range f.Configuration().Servers {
id := string(server.ID)
raftSet[id] = struct{}{}
if id == localID {
continue // never remove self
}
if _, ok := aliveSet[id]; !ok {
c.Logger.Info("reconcile: removing stale raft voter", zap.String("node_id", id))
if err := c.Leave(id); err != nil {
c.Logger.Warn("reconcile: leave failed", zap.String("node_id", id), zap.Error(err))
}
}
}
// Add Serf alive members that are missing from Raft voters.
for _, info := range details {
if info.Name == localID {
continue
}
if _, ok := raftSet[info.Name]; !ok {
c.Logger.Info("reconcile: adding missing raft voter",
zap.String("node_id", info.Name),
zap.String("raft_addr", info.RaftAddr))
if err := c.Join(info.Name, info.RaftAddr, info.RpcAddr); err != nil {
c.Logger.Warn("reconcile: join failed", zap.String("node_id", info.Name), zap.Error(err))
}
}
}
}
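Separating the decision from its side effects makes the reconciliation policy testable. The hypothetical planReconcile below computes the same two sets as reconcileRaftVoters from plain string slices: stale voters to remove and alive members to add. It relies on the same assumption that a node's Serf name equals its Raft server ID.

```go
package main

import "sort"

// planReconcile returns which Raft voters to remove (not alive in Serf) and
// which alive Serf members to add (not yet voters), never touching localID.
func planReconcile(raftIDs, aliveNames []string, localID string) (remove, add []string) {
	alive := make(map[string]bool, len(aliveNames))
	for _, n := range aliveNames {
		alive[n] = true
	}
	voters := make(map[string]bool, len(raftIDs))
	for _, id := range raftIDs {
		voters[id] = true
		if id != localID && !alive[id] {
			remove = append(remove, id) // stale voter
		}
	}
	for _, n := range aliveNames {
		if n != localID && !voters[n] {
			add = append(add, n) // alive but missing from Raft
		}
	}
	sort.Strings(remove)
	sort.Strings(add)
	return remove, add
}
```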
Shutdown
func (c *Coordinator) Shutdown() error {
c.Logger.Info("coordinator shutting down")
f := c.raft.Shutdown()
return f.Error()
}
Step 10: Understand the complete integration
How it all connects
Serf event: member join
│
▼
Coordinator.Join(id, raftAddr, rpcAddr)
│
├── Check for stale server config (same ID, different addr)
│ └── RemoveServer(staleID) if needed
│
├── raft.AddVoter(id, raftAddr) ← Raft adds the node
│
└── ApplyNodeAddEvent(id, raftAddr, rpcAddr) ← metadata event
│
▼
Raft replicates to majority
│
▼
FSM.Apply() on ALL nodes
│
▼
MetadataStore.Apply(AddNodeEvent)
│
▼
TopicManager records node's addresses
For topic creation:
Client: CreateTopicRequest → RPC → TopicManager.CreateTopic()
│
▼
TopicManager picks leader node and replicas
│
▼
Coordinator.ApplyCreateTopicEvent(name, replicaCount, leader, replicas)
│
▼
Raft replicates to majority
│
▼
FSM.Apply() on ALL nodes
│
▼
MetadataStore.Apply(CreateTopicEvent)
│
▼
All nodes create the Topic object, open logs if needed,
start replication threads for replicas
Step 11: Bootstrap the cluster
Cluster bootstrap sequence
When starting a fresh cluster:
1. Node 1 starts with --bootstrap. This bootstraps a single-node Raft cluster, and node 1 elects itself leader. EnsureSelfInMetadata() then adds node 1's addresses to the metadata store.
2. Node 2 starts with --start-join-addr node1:gossip_port. Serf gossips the join. Node 1 (the Raft leader) checks for stale config, calls AddVoter(node2), and applies an AddNode event.
3. Node 3 joins the same way.
4. All three nodes are now Raft voters and have consistent metadata.
Only one node should use --bootstrap. If multiple nodes bootstrap, you get split-brain.
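The quorum changes as the cluster grows. Raft needs a strict majority of voters, floor(n/2) + 1, to elect a leader or commit an entry. The small helpers below make the arithmetic concrete. After node 2 joins, the quorum is 2 of 2, so losing either node stalls commits until node 3 arrives. With three voters, the quorum is 2 and one failure is tolerated.

```go
package main

// quorum is the number of votes a Raft cluster of n voters needs to elect a
// leader or commit an entry: a strict majority.
func quorum(n int) int { return n/2 + 1 }

// faultTolerance is how many voters can fail while a quorum remains.
func faultTolerance(n int) int { return n - quorum(n) }
```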
Summary
| Component | Purpose |
|---|---|
| Coordinator | Manages the Raft node lifecycle. Join/Leave modify cluster membership with stale-config cleanup. Typed methods propose metadata events with 5s timeout. |
| MetadataFSM | Raft's FSM with its own sync.RWMutex. Apply() deserializes committed entries and calls MetadataStore.Apply(). Snapshot() JSON-marshals the metadata store under read lock. |
| logStore | Embeds *log.Log. Stores Raft entries in __metadata__.log subdirectory. Converts 1-based Raft indices to 0-based offsets. GetLog strips the 8-byte segment offset prefix. |
| MetadataStore | Interface implemented by TopicManager. Apply(*MetadataEvent) and Restore([]byte). |
| Apply methods | Each checks leader status, double-marshals (event data → MetadataEvent → bytes), calls raft.Apply() with 5s timeout, wraps errors with errs.ErrRaftApply. |
With Raft in place for metadata consensus, the next page builds the TopicManager — the application layer that manages topics, leaders, replicas, and the produce/consume workflow.