# Topic Management: Leaders, Replicas, and Metadata in a Distributed Log
The `TopicManager` is the application layer that ties everything together. In this section, you'll implement the code that tracks every topic along with its leader and replicas. You'll implement the `MetadataStore` interface so that Raft-committed events update the cluster state. You'll also handle produce and consume requests, manage node metadata, and coordinate replication. The code lives in the `topic/` package.
## Step 1: Understand the core types
### Topic type
A `Topic` represents one named log with a leader and a set of replicas:
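```go
// topic/topic.go
package topic

import (
	"sync"

	"go.uber.org/zap"
	// log.LogManager comes from the project's own log package (import path not shown).
)

type Topic struct {
	mu                  sync.RWMutex             `json:"-"`
	Name                string                   `json:"name"`
	LeaderNodeID        string                   `json:"leader_id"`
	LeaderEpoch         int64                    `json:"leader_epoch"`
	DesiredReplicaCount int                      `json:"desired_replica_count"`
	Replicas            map[string]*ReplicaState `json:"replicas"`
	Log                 *log.LogManager          `json:"-"` // local log; nil if this node does not host the topic
	Logger              *zap.Logger              `json:"-"`
}
```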
Fields tagged `json:"-"` are local runtime state, and they are left out of Raft snapshots. `mu` is unexported, so `encoding/json` skips it anyway. `Log` is this node's local copy of the log. It is opened only if this node is the topic's leader or one of its replicas. `DesiredReplicaCount` records the replica count from the `CreateTopic` request and is used to re-add replicas when nodes rejoin the cluster.
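This minimal sketch shows which fields end up in a snapshot. It uses a hypothetical, trimmed-down `snapshotTopic` type rather than the real `Topic`. Its `LocalLogPath` field stands in for the runtime-only `Log` field. The sketch marshals the struct with `encoding/json`, and the output shows that both the `json:"-"` field and the unexported mutex are skipped.

```go
package main

import (
	"encoding/json"
	"sync"
)

// snapshotTopic is a hypothetical stand-in for Topic, used only to show
// which fields survive json.Marshal.
type snapshotTopic struct {
	mu           sync.RWMutex // unexported: always ignored by encoding/json
	Name         string       `json:"name"`
	LeaderNodeID string       `json:"leader_id"`
	LeaderEpoch  int64        `json:"leader_epoch"`
	LocalLogPath string       `json:"-"` // runtime-only, like Topic.Log
}

// marshalTopic returns the JSON that would be written into a snapshot.
func marshalTopic(t *snapshotTopic) (string, error) {
	b, err := json.Marshal(t)
	if err != nil {
		return "", err
	}
	return string(b), nil
}
```

Take a topic named `orders` led by `node-1` at epoch 3. For it, `marshalTopic` produces `{"name":"orders","leader_id":"node-1","leader_epoch":3}`, and the local path never leaves the node.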
### ReplicaState tracking
`ReplicaState` tracks how far each replica has replicated:
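```go
type ReplicaState struct {
	ReplicaNodeID string `json:"replica_id"`
	LEO           int64  `json:"leo"`    // log end offset: the next offset the replica will write
	IsISR         bool   `json:"is_isr"` // whether the replica is in the in-sync replica set
}
```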
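`LEO` (log end offset) is the next offset the replica will write. The leader updates it while serving the replica's replication fetches. Those fetches report the LEO as an `int64`, which is why the field is `int64` rather than `uint64`. A value of 0 means either that the replica has not reported a position yet or that its log is empty.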
### NodeMetadata for addresses
`NodeMetadata` holds each node's addresses for Raft and RPC communication:
```go
type NodeMetadata struct {
	mu      sync.RWMutex `json:"-"`
	NodeID  string       `json:"node_id"`
	Addr    string       `json:"addr"`     // Raft address
	RpcAddr string       `json:"rpc_addr"` // address for clients and replication threads
}
```
Addr is the Raft address; RpcAddr is where clients and replication threads connect. Both are set when the node joins the cluster via the AddNode metadata event.
## Step 2: Create the TopicManager struct
### TopicManager structure
```go
var _ coordinator.MetadataStore = (*TopicManager)(nil)

type TopicManager struct {
	mu                   sync.RWMutex
	Topics               map[string]*Topic        `json:"topics"`
	BaseDir              string                   `json:"-"`
	Logger               *zap.Logger              `json:"-"`
	Nodes                map[string]*NodeMetadata `json:"nodes"`
	CurrentNodeID        string                   `json:"-"`
	coordinator          TopicCoordinator         `json:"-"`
	stopPeriodic         chan struct{}            `json:"-"`
	stopReplication      chan struct{}            `json:"-"`
	replicationBatchSize uint32                   `json:"-"`
	ISRLagThreshold      uint64                   `json:"-"`
}
```
The `var _ coordinator.MetadataStore = (*TopicManager)(nil)` line is a compile-time assertion that `TopicManager` implements the `MetadataStore` interface.
Fields with `json:"topics"` and `json:"nodes"` are serialized in Raft snapshots (the FSM's `Snapshot()` calls `json.Marshal(c.metadataStore)`). Everything else (`json:"-"`) is local runtime state.
## Step 3: Define the TopicCoordinator interface
### Interface for Raft interaction
```go
// topic/topic_coordinator.go
type TopicCoordinator interface {
	ApplyCreateTopicEvent(topic string, replicaCount uint32,
		leaderNodeID string, replicaNodeIds []string) error
	ApplyDeleteTopicEventInternal(topic string) error
	ApplyIsrUpdateEventInternal(topic, replicaNodeID string, isr bool) error
	ApplyLeaderChangeEvent(topic, leaderNodeID string, leaderEpoch int64) error
	IsLeader() bool
	GetRaftLeaderNodeID() (string, error)
}
```
Each method has the same signature as the corresponding `Coordinator` method, so the real Raft coordinator satisfies the interface. Tests can substitute a fake coordinator that never runs Raft.
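A test double might look like the sketch below. `fakeCoordinator` and its fields are hypothetical. The interface is copied from above so that the block compiles on its own. The fake records proposals in memory instead of replicating them. That is enough to unit-test `CreateTopic`-style logic without a Raft cluster.

```go
package main

import (
	"errors"
	"sync"
)

// TopicCoordinator mirrors the interface from topic/topic_coordinator.go.
type TopicCoordinator interface {
	ApplyCreateTopicEvent(topic string, replicaCount uint32, leaderNodeID string, replicaNodeIds []string) error
	ApplyDeleteTopicEventInternal(topic string) error
	ApplyIsrUpdateEventInternal(topic, replicaNodeID string, isr bool) error
	ApplyLeaderChangeEvent(topic, leaderNodeID string, leaderEpoch int64) error
	IsLeader() bool
	GetRaftLeaderNodeID() (string, error)
}

// fakeCoordinator is a hypothetical test double that records proposed
// events instead of sending them through Raft.
type fakeCoordinator struct {
	mu        sync.Mutex
	leader    bool     // value returned by IsLeader
	leaderID  string   // value returned by GetRaftLeaderNodeID
	created   []string // topic names passed to ApplyCreateTopicEvent
	isrEvents int      // number of ISR updates proposed
}

// Compile-time check that the fake satisfies the interface.
var _ TopicCoordinator = (*fakeCoordinator)(nil)

func (f *fakeCoordinator) ApplyCreateTopicEvent(topic string, _ uint32, _ string, _ []string) error {
	f.mu.Lock()
	defer f.mu.Unlock()
	f.created = append(f.created, topic)
	return nil
}

func (f *fakeCoordinator) ApplyDeleteTopicEventInternal(string) error { return nil }

func (f *fakeCoordinator) ApplyIsrUpdateEventInternal(string, string, bool) error {
	f.mu.Lock()
	defer f.mu.Unlock()
	f.isrEvents++
	return nil
}

func (f *fakeCoordinator) ApplyLeaderChangeEvent(string, string, int64) error { return nil }

func (f *fakeCoordinator) IsLeader() bool { return f.leader }

func (f *fakeCoordinator) GetRaftLeaderNodeID() (string, error) {
	if f.leaderID == "" {
		return "", errors.New("fake: no raft leader")
	}
	return f.leaderID, nil
}
```

A test can set `leader: false` to exercise the "must be sent to Raft leader" error path. It can also inspect `created` after a topic is created.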
## Step 4: Create the constructor
### Constructor and initialization
```go
const (
	defaultMetadataLogInterval = 30 * time.Second
	DefaultISRLagThreshold     = uint64(100)
)

func NewTopicManager(baseDir string, coord TopicCoordinator, logger *zap.Logger) (*TopicManager, error) {
	if logger == nil {
		logger = zap.NewNop()
	}
	tm := &TopicManager{
		Topics:               make(map[string]*Topic),
		BaseDir:              baseDir,
		Logger:               logger,
		Nodes:                make(map[string]*NodeMetadata),
		coordinator:          coord,
		stopPeriodic:         make(chan struct{}),
		ISRLagThreshold:      DefaultISRLagThreshold,
		replicationBatchSize: DefaultReplicationBatchSize, // defined elsewhere in the package
	}
	// Start the background metadata logger only after every field is set.
	go tm.periodicLog(defaultMetadataLogInterval)
	return tm, nil
}
```
The constructor starts a `periodicLog` goroutine that logs the entire metadata state (as JSON) every 30 seconds. This is useful for debugging — you can see the cluster state in the logs.
The coordinator can be passed in or set later via `SetCoordinator()`. The setter resolves a chicken-and-egg problem. The `TopicManager` is the `MetadataStore` for the `Coordinator`, and the `Coordinator` is the `TopicCoordinator` for the `TopicManager`. One of them has to be created first and wired to the other afterwards.
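This sketch shows the wiring order with hypothetical minimal types. `topicManager`, `raftCoordinator`, and `wire` are illustrative names, not the real constructors. The steps are:

- Build the store with no coordinator.
- Build the coordinator around the store.
- Close the loop with the setter.

```go
package main

// Hypothetical minimal types that reproduce the circular dependency:
// the coordinator needs a metadata store, and the store needs a coordinator.
type coordinator interface{ IsLeader() bool }

type metadataStore interface{ Apply(event string) error }

type topicManager struct {
	coord   coordinator
	applied []string
}

func (tm *topicManager) SetCoordinator(c coordinator) { tm.coord = c }

func (tm *topicManager) Apply(event string) error {
	tm.applied = append(tm.applied, event)
	return nil
}

type raftCoordinator struct {
	store metadataStore
}

func (rc *raftCoordinator) IsLeader() bool { return true }

// wire creates the store first (like NewTopicManager with a nil coordinator),
// then the coordinator that wraps it, then sets the back-reference.
func wire() (*topicManager, *raftCoordinator) {
	tm := &topicManager{}
	rc := &raftCoordinator{store: tm}
	tm.SetCoordinator(rc)
	return tm, rc
}
```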
### Coordinator setter and node ID setter
```go
func (tm *TopicManager) SetCoordinator(c TopicCoordinator) {
	tm.mu.Lock()
	defer tm.mu.Unlock()
	tm.coordinator = c
}

func (tm *TopicManager) SetCurrentNodeID(nodeID string) {
	tm.mu.Lock()
	defer tm.mu.Unlock()
	tm.CurrentNodeID = nodeID
}
```
## Step 5: Create and manage topics
### Topic creation flow
Topic creation goes through Raft so that all nodes agree. The flow:

- The client sends a `CreateTopicRequest` to the Raft leader.
- The leader picks which node leads the topic (the least loaded one) and which nodes host replicas.
- The leader proposes a `CreateTopicEvent` through Raft.
- All nodes apply the event. Each creates the `Topic` object and opens a local log if it participates.
```go
func (tm *TopicManager) CreateTopic(ctx context.Context,
	req *protocol.CreateTopicRequest) (*protocol.CreateTopicResponse, error) {
	// Read shared fields under the lock; SetCoordinator may run concurrently.
	tm.mu.RLock()
	c := tm.coordinator
	_, exists := tm.Topics[req.Topic]
	tm.mu.RUnlock()

	if c == nil {
		return nil, fmt.Errorf("topic: no coordinator")
	}
	if !c.IsLeader() {
		return nil, fmt.Errorf("create topic must be sent to Raft leader: %w",
			errs.ErrCannotReachLeader)
	}
	if exists {
		return nil, errs.ErrTopicExistsf(req.Topic)
	}

	leaderNodeID, err := tm.GetNodeIDWithLeastTopics()
	if err != nil {
		return nil, err
	}
	replicaNodeIds, err := tm.pickReplicaNodeIds(leaderNodeID, int(req.ReplicaCount))
	if err != nil {
		return nil, errs.ErrCreateTopic(err)
	}

	tm.Logger.Info("create topic via Raft",
		zap.String("topic", req.Topic),
		zap.String("leader_node_id", leaderNodeID),
		zap.Strings("replica_node_ids", replicaNodeIds))

	// Every node, this one included, creates the Topic when the event commits.
	if err := c.ApplyCreateTopicEvent(req.Topic, req.ReplicaCount,
		leaderNodeID, replicaNodeIds); err != nil {
		return nil, err
	}
	return &protocol.CreateTopicResponse{
		Topic:          req.Topic,
		ReplicaNodeIds: replicaNodeIds,
	}, nil
}
```
### Least-loaded node selection strategy
The topic leader is the node that currently leads the fewest topics. Ties are broken lexicographically (the smallest node ID wins), so the choice is deterministic:
```go
func (tm *TopicManager) GetNodeIDWithLeastTopics() (string, error) {
	tm.mu.RLock()
	defer tm.mu.RUnlock()

	// Start every known node at zero so idle nodes are candidates too.
	countByNode := make(map[string]int)
	for _, node := range tm.Nodes {
		if node != nil {
			countByNode[node.NodeID] = 0
		}
	}
	// Count leaderships, skipping leaders that are no longer cluster
	// members so a removed node can never be picked.
	for _, t := range tm.Topics {
		if t == nil || t.LeaderNodeID == "" {
			continue
		}
		if _, member := countByNode[t.LeaderNodeID]; member {
			countByNode[t.LeaderNodeID]++
		}
	}
	if len(countByNode) == 0 {
		return "", errs.ErrNoNodesInCluster
	}

	// Visit IDs in sorted order; the strict < keeps the smallest ID on ties.
	ids := make([]string, 0, len(countByNode))
	for id := range countByNode {
		ids = append(ids, id)
	}
	sort.Strings(ids)
	bestID := ids[0]
	minCount := countByNode[bestID]
	for _, id := range ids[1:] {
		if c := countByNode[id]; c < minCount {
			minCount = c
			bestID = id
		}
	}
	return bestID, nil
}
```
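The sketch below isolates the selection rule: the same counting and tie-breaking logic, written as a standalone function. `leastLoaded` is a hypothetical helper. It takes plain node IDs and a topic-to-leader map instead of `TopicManager` state. It counts only leaders that are still cluster members.

```go
package main

import "sort"

// leastLoaded returns the node that leads the fewest topics. nodes lists the
// live node IDs; leaders maps topic name -> leader node ID. Ties go to the
// lexicographically smallest ID. ok is false when there are no nodes.
func leastLoaded(nodes []string, leaders map[string]string) (best string, ok bool) {
	count := make(map[string]int, len(nodes))
	for _, n := range nodes {
		count[n] = 0
	}
	for _, leader := range leaders {
		if _, live := count[leader]; live { // ignore removed nodes
			count[leader]++
		}
	}
	if len(count) == 0 {
		return "", false
	}
	ids := make([]string, 0, len(count))
	for id := range count {
		ids = append(ids, id)
	}
	sort.Strings(ids)
	best = ids[0]
	for _, id := range ids[1:] {
		if count[id] < count[best] {
			best = id
		}
	}
	return best, true
}
```

Suppose the nodes are `n1`, `n2`, and `n3`, where `n1` leads two topics and `n2` leads one. The result is `n3`. With two idle nodes, `n1` and `n2`, the tie goes to `n1`.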
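### Picking replicas
Replicas are chosen from the nodes other than the topic leader. If fewer than `replicaCount` such nodes exist, topic creation fails: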
```go
func (tm *TopicManager) pickReplicaNodeIds(leaderNodeID string,
	replicaCount int) ([]string, error) {
	tm.mu.RLock()
	var otherNodes []*NodeMetadata
	for _, node := range tm.Nodes {
		if node != nil && node.NodeID != leaderNodeID {
			otherNodes = append(otherNodes, node)
		}
	}
	tm.mu.RUnlock()

	if len(otherNodes) < replicaCount {
		return nil, errs.ErrNotEnoughNodesf(replicaCount, len(otherNodes))
	}
	// Map iteration order is random, so which eligible nodes are picked is
	// arbitrary. Only the Raft leader runs this, and the chosen IDs are then
	// committed through Raft, so every node still agrees on the result.
	replicaNodeIds := make([]string, 0, replicaCount)
	for i := 0; i < replicaCount; i++ {
		replicaNodeIds = append(replicaNodeIds, otherNodes[i].NodeID)
	}
	return replicaNodeIds, nil
}
```
## Step 6: Apply Raft metadata events
### Implementing MetadataStore.Apply()
The `TopicManager` implements `MetadataStore.Apply()`. When Raft commits an event, every node processes it. The method holds the write lock for the whole switch, so each event is applied atomically and readers never observe a half-applied event:
```go
func (tm *TopicManager) Apply(ev *protocol.MetadataEvent) error {
	tm.mu.Lock()
	defer tm.mu.Unlock()

	switch ev.EventType {
	case protocol.MetadataEventTypeCreateTopic:
		e := protocol.CreateTopicEvent{}
		if err := json.Unmarshal(ev.Data, &e); err != nil {
			return err
		}
		tm.createTopicFromEvent(e.Topic, e.LeaderNodeID, e.LeaderEpoch, e.ReplicaNodeIds)

	case protocol.MetadataEventTypeLeaderChange:
		e := protocol.LeaderChangeEvent{}
		if err := json.Unmarshal(ev.Data, &e); err != nil {
			return err
		}
		t := tm.Topics[e.Topic]
		if t != nil {
			oldLeaderID := t.LeaderNodeID
			t.LeaderNodeID = e.LeaderNodeID
			t.LeaderEpoch = e.LeaderEpoch
			delete(t.Replicas, e.LeaderNodeID)
			if oldLeaderID != e.LeaderNodeID && oldLeaderID != "" &&
				tm.Nodes[oldLeaderID] != nil {
				if t.Replicas == nil {
					t.Replicas = make(map[string]*ReplicaState)
				}
				t.Replicas[oldLeaderID] = &ReplicaState{
					ReplicaNodeID: oldLeaderID,
					LEO:           0,
					IsISR:         false,
				}
			}
			tm.ensureLocalLogAfterLeaderChange(e.Topic, oldLeaderID, e.LeaderNodeID)
		}

	case protocol.MetadataEventTypeIsrUpdate:
		e := protocol.IsrUpdateEvent{}
		if err := json.Unmarshal(ev.Data, &e); err != nil {
			return err
		}
		t := tm.Topics[e.Topic]
		if t != nil {
			rs := t.Replicas[e.ReplicaNodeID]
			if rs != nil {
				rs.IsISR = e.Isr
			} else {
				if t.Replicas == nil {
					t.Replicas = make(map[string]*ReplicaState)
				}
				t.Replicas[e.ReplicaNodeID] = &ReplicaState{
					ReplicaNodeID: e.ReplicaNodeID,
					LEO:           0,
					IsISR:         e.Isr,
				}
			}
			tm.maybeAdvanceHW(t)
		}

	case protocol.MetadataEventTypeDeleteTopic:
		e := protocol.DeleteTopicEvent{}
		if err := json.Unmarshal(ev.Data, &e); err != nil {
			return err
		}
		tm.deleteTopicFromEvent(e.Topic)

	case protocol.MetadataEventTypeAddNode:
		e := protocol.AddNodeEvent{}
		if err := json.Unmarshal(ev.Data, &e); err != nil {
			return err
		}
		tm.Nodes[e.NodeID] = &NodeMetadata{
			NodeID:  e.NodeID,
			Addr:    e.Addr,
			RpcAddr: e.RpcAddr,
		}
		tm.maybeAddReplicasForNode(e.NodeID)

	case protocol.MetadataEventTypeRemoveNode:
		e := protocol.RemoveNodeEvent{}
		if err := json.Unmarshal(ev.Data, &e); err != nil {
			return err
		}
		delete(tm.Nodes, e.NodeID)
		tm.maybeReassignTopicLeaders(e.NodeID)
	}
	return nil
}
```
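The envelope pattern behind `Apply` can be sketched on its own. An event carries a type plus a JSON payload, and the payload is decoded according to the type. Everything in this sketch is a hypothetical stand-in for the `protocol` types. `metadataEvent`, `addNodeEvent`, `removeNodeEvent`, and `store` model only two event types. As in `Apply`, the write lock is held across the whole switch, and unknown event types are ignored.

```go
package main

import (
	"encoding/json"
	"sync"
)

// Hypothetical stand-ins for protocol.MetadataEvent and two payload types.
type eventType int

const (
	eventAddNode eventType = iota + 1
	eventRemoveNode
)

type metadataEvent struct {
	EventType eventType `json:"event_type"`
	Data      []byte    `json:"data"` // JSON payload, decoded per event type
}

type addNodeEvent struct {
	NodeID  string `json:"node_id"`
	RpcAddr string `json:"rpc_addr"`
}

type removeNodeEvent struct {
	NodeID string `json:"node_id"`
}

type store struct {
	mu    sync.RWMutex
	nodes map[string]string // node ID -> RPC address
}

// apply decodes the payload for the event's type and mutates state while
// holding the write lock for the whole switch.
func (s *store) apply(ev *metadataEvent) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	switch ev.EventType {
	case eventAddNode:
		var e addNodeEvent
		if err := json.Unmarshal(ev.Data, &e); err != nil {
			return err
		}
		s.nodes[e.NodeID] = e.RpcAddr
	case eventRemoveNode:
		var e removeNodeEvent
		if err := json.Unmarshal(ev.Data, &e); err != nil {
			return err
		}
		delete(s.nodes, e.NodeID)
	default:
		// Unknown event types are ignored, as in TopicManager.Apply.
	}
	return nil
}

// newEvent encodes a payload into an envelope.
func newEvent(t eventType, payload any) *metadataEvent {
	data, err := json.Marshal(payload)
	if err != nil {
		panic(err) // the payloads here are plain structs, so this cannot fail
	}
	return &metadataEvent{EventType: t, Data: data}
}
```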
Let's walk through the key event handlers.
### Creating topics from committed events
When the CreateTopicEvent is applied, each node creates the Topic object and opens a local log if it participates:
```go
func (tm *TopicManager) createTopicFromEvent(topicName, leaderNodeID string,
	leaderEpoch int64, replicaNodeIds []string) {
	if _, exists := tm.Topics[topicName]; exists {
		return // idempotent
	}
	t := &Topic{
		Name:                topicName,
		LeaderNodeID:        leaderNodeID,
		LeaderEpoch:         leaderEpoch,
		DesiredReplicaCount: len(replicaNodeIds),
		Replicas:            make(map[string]*ReplicaState),
		Logger:              tm.Logger,
	}
	for _, replica := range replicaNodeIds {
		t.Replicas[replica] = &ReplicaState{
			ReplicaNodeID: replica,
			LEO:           0,
			IsISR:         true,
		}
	}
	tm.Topics[topicName] = t
	tm.ensureLocalLogForTopic(topicName, leaderNodeID, replicaNodeIds)
}
```
The initial replicas of a new topic start with `IsISR: true`. The ISR flag is then updated as replicas catch up or fall behind.
The ensureLocalLogForTopic helper opens the log directory if this node is the leader or a replica:
```go
func (tm *TopicManager) ensureLocalLogForTopic(topicName, leaderNodeID string,
	replicaNodeIds []string) {
	t := tm.Topics[topicName]
	if t == nil {
		return
	}
	if tm.CurrentNodeID == leaderNodeID {
		if t.Log == nil {
			logManager, err := log.NewLogManager(
				filepath.Join(tm.BaseDir, topicName))
			if err != nil {
				tm.Logger.Warn("open leader log failed",
					zap.String("topic", topicName), zap.Error(err))
				return
			}
			t.Log = logManager
		}
		return
	}
	for _, rid := range replicaNodeIds {
		if rid == tm.CurrentNodeID {
			if t.Log == nil {
				logManager, err := log.NewLogManager(
					filepath.Join(tm.BaseDir, topicName))
				if err != nil {
					tm.Logger.Warn("open replica log failed",
						zap.String("topic", topicName), zap.Error(err))
					return
				}
				t.Log = logManager
			}
			return
		}
	}
}
```
Nodes that are neither leader nor replica for the topic don't open a log — they just hold the metadata in memory.
### Adding and removing nodes
When a node joins, its addresses are recorded and it is added as a replica for under-replicated topics:
```go
case protocol.MetadataEventTypeAddNode:
	// ... unmarshal ...
	tm.Nodes[e.NodeID] = &NodeMetadata{
		NodeID:  e.NodeID,
		Addr:    e.Addr,
		RpcAddr: e.RpcAddr,
	}
	tm.maybeAddReplicasForNode(e.NodeID)

func (tm *TopicManager) maybeAddReplicasForNode(nodeID string) {
	for _, t := range tm.Topics {
		if t == nil || t.LeaderNodeID == nodeID {
			continue
		}
		if _, already := t.Replicas[nodeID]; already {
			continue
		}
		if len(t.Replicas) >= t.DesiredReplicaCount {
			continue
		}
		if t.Replicas == nil {
			t.Replicas = make(map[string]*ReplicaState)
		}
		t.Replicas[nodeID] = &ReplicaState{
			ReplicaNodeID: nodeID,
			LEO:           0,
			IsISR:         false,
		}
		tm.Logger.Info("added rejoined node as replica",
			zap.String("topic", t.Name),
			zap.String("node_id", nodeID),
		)
	}
}
```
This is how a crashed and restarted node automatically becomes a replica again — when it rejoins the cluster, the AddNode event triggers maybeAddReplicasForNode, which checks each topic's DesiredReplicaCount. New replicas start with IsISR: false and become in-sync after catching up.
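The sketch below exercises the rejoin rule in isolation. `topic`, `replicaState`, and `addReplicasForNode` are hypothetical trimmed-down versions of the types and method above. The function also returns the names of the affected topics, sorted, so the result is easy to inspect.

```go
package main

import "sort"

// Hypothetical trimmed-down types: only the fields the rejoin rule reads.
type replicaState struct {
	LEO   int64
	IsISR bool
}

type topic struct {
	LeaderNodeID        string
	DesiredReplicaCount int
	Replicas            map[string]*replicaState
}

// addReplicasForNode adds nodeID as an out-of-sync replica to every topic that
// it does not lead, does not already replicate, and that is below its desired
// replica count. It returns the affected topic names in sorted order.
func addReplicasForNode(topics map[string]*topic, nodeID string) []string {
	var added []string
	for name, t := range topics {
		if t == nil || t.LeaderNodeID == nodeID {
			continue
		}
		if _, already := t.Replicas[nodeID]; already {
			continue
		}
		if len(t.Replicas) >= t.DesiredReplicaCount {
			continue
		}
		if t.Replicas == nil {
			t.Replicas = make(map[string]*replicaState)
		}
		t.Replicas[nodeID] = &replicaState{LEO: 0, IsISR: false}
		added = append(added, name)
	}
	sort.Strings(added)
	return added
}
```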
### Leader reassignment on failure
When a node is removed (crashed or left), topics it led need new leaders. This is called from Apply() (the FSM goroutine), so it cannot call raft.Apply() synchronously — that would deadlock. Instead, leader changes are applied asynchronously in a goroutine:
```go
func (tm *TopicManager) maybeReassignTopicLeaders(nodeID string) {
	// Apply already holds tm.mu, so reading tm.coordinator here is safe.
	// Capture it so the goroutine below does not read the field later.
	coord := tm.coordinator
	if coord == nil || !coord.IsLeader() {
		return // only the Raft leader proposes leader changes
	}
	type leaderChange struct {
		topic     string
		newLeader string
		epoch     int64
	}
	var changes []leaderChange
	for topicName, t := range tm.Topics {
		if t == nil || t.LeaderNodeID != nodeID {
			continue
		}
		var newLeader string
		for rid, rs := range t.Replicas {
			if rid == nodeID || rs == nil || !rs.IsISR {
				continue
			}
			n := tm.Nodes[rid]
			if n == nil {
				continue
			}
			newLeader = rid
			break
		}
		if newLeader == "" {
			tm.Logger.Warn("no ISR replica for leadership",
				zap.String("topic", topicName),
				zap.String("old_leader_node_id", nodeID))
			continue
		}
		changes = append(changes, leaderChange{
			topic:     topicName,
			newLeader: newLeader,
			epoch:     t.LeaderEpoch + 1,
		})
	}
	if len(changes) == 0 {
		return
	}
	// Apply leader changes asynchronously so the FSM's
	// current Apply() can return first.
	go func() {
		for _, ch := range changes {
			if err := coord.ApplyLeaderChangeEvent(
				ch.topic, ch.newLeader, ch.epoch); err != nil {
				tm.Logger.Warn("leader change apply failed",
					zap.String("topic", ch.topic), zap.Error(err))
			}
		}
	}()
}
```
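Key details:
- Only ISR replicas that are still cluster members are considered for leadership, because they are known to be close to the old leader's log. The first eligible replica found wins. Go map iteration order is random, so which ISR replica is chosen is arbitrary. Only the Raft leader makes this choice, and it is committed through Raft, so every node sees the same result.
- The new leader's epoch is `t.LeaderEpoch + 1`. Epoch tracking prevents stale leaders from accepting writes.
- The `go func()` is critical. Without it the FSM would deadlock. `ApplyLeaderChangeEvent` calls `raft.Apply()`, which waits for the FSM to process the new entry, but the FSM is still blocked in the current `Apply()`.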
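A toy model reproduces the deadlock. The sketch below is hypothetical and does not use any real Raft library. `miniRaft` applies entries on a single goroutine. `propose` waits until its entry has been applied, which is similar to waiting on the result of `raft.Apply()`. The timeout exists only so the synchronous case terminates; without it, that case would hang forever.

```go
package main

import "time"

// proposal is one log entry plus a channel closed once the FSM applies it.
type proposal struct {
	entry string
	done  chan struct{}
}

// miniRaft is a toy model of Raft's apply path: one goroutine applies entries
// in order and runs the FSM callback for each.
type miniRaft struct {
	entries chan proposal
	onApply func(r *miniRaft, entry string) // FSM callback, runs on the FSM goroutine
}

func newMiniRaft(onApply func(r *miniRaft, entry string)) *miniRaft {
	r := &miniRaft{entries: make(chan proposal), onApply: onApply}
	go func() { // the single FSM goroutine (never stopped in this toy)
		for p := range r.entries {
			r.onApply(r, p.entry)
			close(p.done)
		}
	}()
	return r
}

// propose submits an entry and reports whether it was applied within timeout.
func (r *miniRaft) propose(entry string, timeout time.Duration) bool {
	p := proposal{entry: entry, done: make(chan struct{})}
	deadline := time.After(timeout)
	select {
	case r.entries <- p:
	case <-deadline:
		return false // the FSM goroutine never took the entry
	}
	select {
	case <-p.done:
		return true
	case <-deadline:
		return false
	}
}

// proposeFromFSM reports whether a "leader-change" entry proposed while the
// FSM applies "remove-node" is ever applied. With async=false the nested
// propose runs on the FSM goroutine itself, which is the deadlock; with
// async=true it runs on a new goroutine, like the go func() above.
func proposeFromFSM(async bool) bool {
	result := make(chan bool, 1)
	r := newMiniRaft(func(r *miniRaft, entry string) {
		if entry != "remove-node" {
			return
		}
		nested := func() { result <- r.propose("leader-change", 200*time.Millisecond) }
		if async {
			go nested()
		} else {
			nested()
		}
	})
	r.propose("remove-node", time.Second)
	return <-result
}
```

`proposeFromFSM(false)` returns `false` after the 200ms timeout, because nothing can receive the nested entry while the FSM goroutine is busy. `proposeFromFSM(true)` returns `true`.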
### Leader change event handling
When a LeaderChangeEvent is applied, the old leader becomes a replica and the new leader's entry is removed from replicas:
```go
case protocol.MetadataEventTypeLeaderChange:
	// ...
	t := tm.Topics[e.Topic]
	if t != nil {
		oldLeaderID := t.LeaderNodeID
		t.LeaderNodeID = e.LeaderNodeID
		t.LeaderEpoch = e.LeaderEpoch
		// New leader is no longer a replica
		delete(t.Replicas, e.LeaderNodeID)
		// Old leader becomes an out-of-sync replica (if still a member)
		if oldLeaderID != e.LeaderNodeID && oldLeaderID != "" &&
			tm.Nodes[oldLeaderID] != nil {
			if t.Replicas == nil {
				t.Replicas = make(map[string]*ReplicaState)
			}
			t.Replicas[oldLeaderID] = &ReplicaState{
				ReplicaNodeID: oldLeaderID,
				LEO:           0,
				IsISR:         false,
			}
		}
		tm.ensureLocalLogAfterLeaderChange(e.Topic, oldLeaderID, e.LeaderNodeID)
	}
```
The check tm.Nodes[oldLeaderID] != nil prevents adding a dead node as a replica — if the old leader was removed, it was already cleaned up from Nodes by the RemoveNode handler.
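The sketch below models the role swap with a hypothetical `topicState`, which tracks replicas as a map from node ID to an in-sync flag. `applyLeaderChange` follows the same steps as the handler above, and `liveNodes` stands in for `tm.Nodes`.

```go
package main

// topicState is a hypothetical trimmed-down Topic: replica ID -> in-sync flag.
type topicState struct {
	Leader   string
	Epoch    int64
	Replicas map[string]bool
}

// applyLeaderChange removes the new leader from the replica set and, if the
// old leader is still a cluster member, re-adds it as an out-of-sync replica.
func applyLeaderChange(t *topicState, newLeader string, epoch int64, liveNodes map[string]bool) {
	old := t.Leader
	t.Leader = newLeader
	t.Epoch = epoch
	delete(t.Replicas, newLeader)
	if old != newLeader && old != "" && liveNodes[old] {
		if t.Replicas == nil {
			t.Replicas = make(map[string]bool)
		}
		t.Replicas[old] = false // must catch up before rejoining the ISR
	}
}
```

In a planned handover from `n1` to `n2`, `n1` stays in the replica set as out of sync. If `n1` has already been removed from the cluster, it is simply dropped.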
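## Step 7: Handle produce requests with replication
### Validating leadership and appending records
Produce requests must reach the topic's leader. The leader appends the records to its local log and, depending on the ack mode, waits for replication. `HandleProduce` does not re-check leadership itself. It works on the `*Topic` that the RPC handler has already looked up: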
```go
func (tm *TopicManager) HandleProduce(ctx context.Context, t *Topic,
	logEntry *protocol.LogEntry, acks protocol.AckMode) (uint64, error) {
	offset, err := t.Log.Append(logEntry.Value)
	if err != nil {
		return 0, err
	}
	switch acks {
	case protocol.AckLeader:
		return offset, nil
	case protocol.AckAll:
		if err := tm.waitForAllFollowersToCatchUp(ctx, t, offset); err != nil {
			return 0, errs.ErrWaitFollowersCatchUp(err)
		}
		return offset, nil
	default:
		return 0, errs.ErrInvalidAckModef(int32(acks))
	}
}
```
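`HandleProduce` receives the `*Topic` directly because the RPC handler has already looked it up. `AckLeader` returns as soon as the local append succeeds. `AckAll` blocks until one of three things happens: the replicas have caught up, the context is cancelled, or a 5-second timeout expires.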
### Batch produce
```go
func (tm *TopicManager) HandleProduceBatch(ctx context.Context, t *Topic,
	values [][]byte, acks protocol.AckMode) (uint64, uint64, error) {
	if len(values) == 0 {
		return 0, 0, errs.ErrValuesEmpty
	}
	base, err := t.Log.AppendBatch(values)
	if err != nil {
		return 0, 0, err
	}
	last := base + uint64(len(values)) - 1
	switch acks {
	case protocol.AckLeader:
		return base, last, nil
	case protocol.AckAll:
		if err := tm.waitForAllFollowersToCatchUp(ctx, t, last); err != nil {
			return 0, 0, errs.ErrWaitFollowersCatchUp(err)
		}
		return base, last, nil
	default:
		return 0, 0, errs.ErrInvalidAckModef(int32(acks))
	}
}
```
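`HandleProduceBatch` returns both the base and the last offset so the client knows the range of offsets written. For example, three values appended at base 10 occupy offsets 10 through 12.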
### Waiting for replication (AckAll)
```go
func (tm *TopicManager) waitForAllFollowersToCatchUp(ctx context.Context,
	t *Topic, offset uint64) error {
	timeout := time.After(5 * time.Second)
	ticker := time.NewTicker(10 * time.Millisecond)
	defer ticker.Stop()

	// LEO is the next offset to write, so offset N is replicated once LEO >= N+1.
	requiredLEO := offset + 1
	for {
		allCaughtUp := true
		candidates := 0

		// Replica LEOs and ISR flags are written under tm.mu (by Apply and
		// RecordReplicaLEOFromFetch), so read them under the same lock.
		tm.mu.RLock()
		useISR := false
		for _, r := range t.Replicas {
			if r != nil && r.IsISR {
				useISR = true
				break
			}
		}
		for _, replica := range t.Replicas {
			if replica == nil {
				continue
			}
			if useISR && !replica.IsISR {
				continue
			}
			candidates++
			if uint64(replica.LEO) < requiredLEO {
				allCaughtUp = false
				break
			}
		}
		tm.mu.RUnlock()

		if candidates == 0 {
			return nil // no replicas to wait for
		}
		if allCaughtUp {
			return nil
		}
		select {
		case <-ticker.C:
			continue
		case <-ctx.Done():
			return ctx.Err()
		case <-timeout:
			return errs.ErrTimeoutCatchUp
		}
	}
}
```
Key details:
- `requiredLEO = offset + 1`: LEO is the "next offset to write". After a write at offset N, a replica has caught up once its LEO is at least N+1.
- If any replica is in the ISR, only ISR replicas are checked. If none are (all have fallen behind), the method falls back to waiting for every replica. It returns immediately only when the topic has no replicas at all.
- The 5-second timeout prevents indefinite blocking if a replica is down.
- It polls every 10ms, which keeps produce latency low without spending much CPU.
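The per-tick check can be pulled out into a pure function, which makes the ISR fallback easy to test. `replica` and `caughtUp` are hypothetical names. The logic follows the loop in `waitForAllFollowersToCatchUp`, without the ticker, timeout, and locking.

```go
package main

// replica is a hypothetical trimmed-down ReplicaState.
type replica struct {
	LEO   int64
	IsISR bool
}

// caughtUp reports whether a record written at offset has been replicated.
// If any replica is in the ISR, only ISR replicas are checked; otherwise all
// replicas are. An empty replica set means there is nothing to wait for.
func caughtUp(replicas map[string]*replica, offset uint64) bool {
	required := offset + 1 // LEO is the next offset to write
	useISR := false
	for _, r := range replicas {
		if r != nil && r.IsISR {
			useISR = true
			break
		}
	}
	for _, r := range replicas {
		if r == nil || (useISR && !r.IsISR) {
			continue
		}
		if uint64(r.LEO) < required {
			return false
		}
	}
	return true
}
```

Consider an in-sync replica at LEO 10 alongside a lagging non-ISR replica. The in-sync replica satisfies a write at offset 9 but not one at offset 10, and the lagging replica is ignored either way.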
## Step 8: Advance the high watermark
### Computing and setting HW
The high watermark (HW) is the minimum LEO across the leader and all ISR replicas. Consumers can only read up to the HW. This guarantees they never see records that some in-sync replica has not yet stored:
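```go
// maybeAdvanceHW recomputes the high watermark. Callers should hold tm.mu,
// which guards t.Replicas.
func (tm *TopicManager) maybeAdvanceHW(t *Topic) {
	if t.Log == nil {
		return // this node does not host the topic
	}
	// Start from the leader's own LEO and take the minimum over ISR replicas.
	minOffset := t.Log.LEO()
	for _, r := range t.Replicas {
		if r != nil && r.IsISR && uint64(r.LEO) < minOffset {
			minOffset = uint64(r.LEO)
		}
	}
	t.Log.SetHighWatermark(minOffset)
}
```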
Non-ISR replicas are excluded — a dead or lagging replica must not hold back consumer visibility.
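As a worked example, the sketch below computes the HW the same way, using a hypothetical `highWatermark` helper over plain values. Suppose the leader's LEO is 120, ISR replica `a` is at 118, and replica `b` is at 40 but out of sync. The HW is then 118. Once `b` joins the ISR, the HW drops to 40.

```go
package main

// replica is a hypothetical trimmed-down ReplicaState.
type replica struct {
	LEO   int64
	IsISR bool
}

// highWatermark returns the minimum LEO over the leader and its in-sync
// replicas. Out-of-sync replicas do not hold the HW back.
func highWatermark(leaderLEO uint64, replicas map[string]replica) uint64 {
	hw := leaderLEO
	for _, r := range replicas {
		if r.IsISR && uint64(r.LEO) < hw {
			hw = uint64(r.LEO)
		}
	}
	return hw
}
```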
## Step 9: Track replica progress from fetch requests
### Recording LEO and updating ISR
When the leader serves a fetch from a replica (the replica sets ReplicaNodeID in its FetchBatchRequest), the leader records the replica's LEO and updates its ISR status:
```go
func (tm *TopicManager) RecordReplicaLEOFromFetch(ctx context.Context,
	topicName, replicaNodeID string, leo int64) error {
	tm.mu.Lock()
	t := tm.Topics[topicName]
	if t == nil {
		tm.mu.Unlock()
		return nil
	}
	leaderLEO := uint64(0)
	if t.Log != nil {
		leaderLEO = t.Log.LEO()
	}
	if t.Replicas == nil {
		t.Replicas = make(map[string]*ReplicaState)
	}
	rs := t.Replicas[replicaNodeID]
	if rs == nil {
		rs = &ReplicaState{ReplicaNodeID: replicaNodeID, LEO: leo, IsISR: true}
		t.Replicas[replicaNodeID] = rs
	} else {
		rs.LEO = leo
	}

	lagThreshold := tm.ISRLagThreshold
	if lagThreshold == 0 {
		lagThreshold = DefaultISRLagThreshold
	}
	var isr bool
	if leaderLEO > lagThreshold {
		// In sync if the replica trails the leader by at most lagThreshold.
		isr = uint64(leo) >= leaderLEO-lagThreshold
	} else {
		// While the leader's log is still short, every replica is in sync.
		isr = leo >= 0
	}
	rs.IsISR = isr

	// Advance the HW while still holding tm.mu, which guards t.Replicas.
	tm.maybeAdvanceHW(t)
	coord := tm.coordinator
	tm.mu.Unlock()

	if coord == nil {
		return nil
	}
	// Propagate the ISR decision cluster-wide through Raft.
	return coord.ApplyIsrUpdateEventInternal(topicName, replicaNodeID, isr)
}
```
The ISR threshold (`DefaultISRLagThreshold`, 100 offsets) determines how far a replica can trail the leader's LEO and still be considered in sync. `tm.ISRLagThreshold` overrides the default, and a value of zero falls back to it. While the leader's LEO is at or below the threshold, every replica counts as in sync. After the ISR status is updated locally, the change is propagated cluster-wide as a Raft event.
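The lag rule reduces to a small pure function. `isInSync` is a hypothetical helper that mirrors the branch in `RecordReplicaLEOFromFetch` and assumes a non-negative replica LEO. With the default threshold of 100, a leader at LEO 1000 keeps a replica at 900 in sync but not one at 899.

```go
package main

// isInSync reports whether a replica at replicaLEO is within lagThreshold
// offsets of the leader's LEO. While the leader's LEO is at or below the
// threshold, every replica (with a non-negative LEO) is in sync.
func isInSync(leaderLEO uint64, replicaLEO int64, lagThreshold uint64) bool {
	if leaderLEO > lagThreshold {
		return uint64(replicaLEO) >= leaderLEO-lagThreshold
	}
	return replicaLEO >= 0
}
```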
## Step 10: Provide replication helper methods
### Methods for replication thread integration
The TopicManager provides methods used by the replication thread:
```go
func (tm *TopicManager) ListReplicaTopics() []ReplicaTopicInfo {
	tm.mu.RLock()
	defer tm.mu.RUnlock()
	var out []ReplicaTopicInfo
	for name, t := range tm.Topics {
		if t == nil || t.Log == nil || t.LeaderNodeID == tm.CurrentNodeID {
			continue
		}
		out = append(out, ReplicaTopicInfo{
			TopicName:    name,
			LeaderNodeID: t.LeaderNodeID,
		})
	}
	return out
}

func (tm *TopicManager) ApplyRecordBatch(topicName string,
	values [][]byte) error {
	if len(values) == 0 {
		return nil
	}
	tm.mu.RLock()
	t := tm.Topics[topicName]
	tm.mu.RUnlock()
	if t == nil {
		return nil
	}
	t.mu.RLock()
	l := t.Log
	t.mu.RUnlock()
	if l == nil {
		return nil
	}
	_, err := l.AppendBatch(values)
	return err
}
```
ListReplicaTopics returns all topics where this node has a local log but is not the leader — those are the topics this node needs to replicate. ApplyRecordBatch appends fetched records to the local replica log.
## Step 11: Serialize state via Raft snapshots
### JSON-based snapshot and restore
For Raft snapshots, the `TopicManager` is JSON-marshaled directly (the FSM calls `json.Marshal(c.metadataStore)`). Only `Topics` and `Nodes` are serialized. Every other field is either tagged `json:"-"` or unexported.
```go
func (tm *TopicManager) Restore(data []byte) error {
	var decoded struct {
		Topics map[string]*Topic        `json:"topics"`
		Nodes  map[string]*NodeMetadata `json:"nodes"`
	}
	if err := json.Unmarshal(data, &decoded); err != nil {
		return err
	}
	// Guard against null maps in the snapshot so later Apply calls never
	// write to a nil map.
	if decoded.Topics == nil {
		decoded.Topics = make(map[string]*Topic)
	}
	if decoded.Nodes == nil {
		decoded.Nodes = make(map[string]*NodeMetadata)
	}

	tm.mu.Lock()
	defer tm.mu.Unlock()
	tm.Topics = decoded.Topics
	tm.Nodes = decoded.Nodes
	for name, t := range tm.Topics {
		if t == nil {
			continue
		}
		t.Logger = tm.Logger
		if tm.CurrentNodeID == t.LeaderNodeID {
			if t.Log == nil {
				logManager, err := log.NewLogManager(
					filepath.Join(tm.BaseDir, name))
				if err != nil {
					tm.Logger.Warn("open leader log failed",
						zap.String("topic", name), zap.Error(err))
					continue
				}
				t.Log = logManager
			}
			tm.maybeAdvanceHW(t)
		} else if _, isReplica := t.Replicas[tm.CurrentNodeID]; isReplica {
			if t.Log == nil {
				logManager, err := log.NewLogManager(
					filepath.Join(tm.BaseDir, name))
				if err != nil {
					tm.Logger.Warn("open replica log failed",
						zap.String("topic", name), zap.Error(err))
					continue
				}
				t.Log = logManager
			}
		}
	}
	return nil
}
```
Restore replaces the in-memory state entirely from the snapshot. After restoring Topics and Nodes, it re-opens local logs for topics this node participates in and advances the HW for leader topics.
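The sketch below walks through a snapshot round trip with hypothetical trimmed-down types (`metaStore`, `topicMeta`, `nodeMeta`) that use the same JSON layout. `Topics` and `Nodes` survive the round trip. The local-only `CurrentNodeID` keeps the restoring node's own value. The sketch also guards against `null` maps in the snapshot. Reopening local logs is left out.

```go
package main

import "encoding/json"

// Hypothetical trimmed-down types with the same JSON layout as the snapshot.
type nodeMeta struct {
	NodeID  string `json:"node_id"`
	RpcAddr string `json:"rpc_addr"`
}

type topicMeta struct {
	Name         string `json:"name"`
	LeaderNodeID string `json:"leader_id"`
	LeaderEpoch  int64  `json:"leader_epoch"`
}

type metaStore struct {
	Topics        map[string]*topicMeta `json:"topics"`
	Nodes         map[string]*nodeMeta  `json:"nodes"`
	CurrentNodeID string                `json:"-"` // local identity, never snapshotted
}

// snapshot returns what an FSM Snapshot would persist.
func (s *metaStore) snapshot() ([]byte, error) { return json.Marshal(s) }

// restore replaces Topics and Nodes from a snapshot, keeps local-only fields,
// and turns JSON null into empty maps so later writes cannot panic.
func (s *metaStore) restore(data []byte) error {
	var decoded struct {
		Topics map[string]*topicMeta `json:"topics"`
		Nodes  map[string]*nodeMeta  `json:"nodes"`
	}
	if err := json.Unmarshal(data, &decoded); err != nil {
		return err
	}
	if decoded.Topics == nil {
		decoded.Topics = make(map[string]*topicMeta)
	}
	if decoded.Nodes == nil {
		decoded.Nodes = make(map[string]*nodeMeta)
	}
	s.Topics, s.Nodes = decoded.Topics, decoded.Nodes
	return nil
}
```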
## Step 12: Query metadata state
### Querying topics, leaders, and nodes
```go
func (tm *TopicManager) IsLeader(topic string) (bool, error) {
	tm.mu.RLock()
	defer tm.mu.RUnlock()
	topicObj, ok := tm.Topics[topic]
	if !ok {
		return false, errs.ErrTopicNotFoundf(topic)
	}
	return topicObj.LeaderNodeID == tm.CurrentNodeID, nil
}

func (tm *TopicManager) GetTopicLeaderRPCAddr(topic string) (string, error) {
	tm.mu.RLock()
	defer tm.mu.RUnlock()
	topicObj, ok := tm.Topics[topic]
	if !ok {
		return "", errs.ErrTopicNotFoundf(topic)
	}
	node := tm.Nodes[topicObj.LeaderNodeID]
	if node == nil {
		return "", errs.ErrTopicNotFoundf(topic)
	}
	return node.RpcAddr, nil
}

func (tm *TopicManager) GetRaftLeaderRPCAddr() (string, error) {
	// Read the coordinator under the lock, but release it before calling
	// into the coordinator.
	tm.mu.RLock()
	coord := tm.coordinator
	tm.mu.RUnlock()
	if coord == nil {
		return "", fmt.Errorf("topic: no coordinator")
	}
	leaderNodeID, err := coord.GetRaftLeaderNodeID()
	if err != nil {
		return "", err
	}
	tm.mu.RLock()
	node := tm.Nodes[leaderNodeID]
	tm.mu.RUnlock()
	if node == nil {
		return "", fmt.Errorf("raft leader node %q not in metadata", leaderNodeID)
	}
	return node.RpcAddr, nil
}

func (tm *TopicManager) ListTopics() *protocol.ListTopicsResponse {
	tm.mu.RLock()
	defer tm.mu.RUnlock()
	out := make([]protocol.TopicInfo, 0, len(tm.Topics))
	for name, t := range tm.Topics {
		if t == nil {
			continue
		}
		t.mu.RLock()
		replicas := make([]protocol.ReplicaInfo, 0, len(t.Replicas))
		for _, rs := range t.Replicas {
			if rs != nil {
				replicas = append(replicas, protocol.ReplicaInfo{
					NodeID: rs.ReplicaNodeID,
					IsISR:  rs.IsISR,
					LEO:    rs.LEO,
				})
			}
		}
		t.mu.RUnlock()
		out = append(out, protocol.TopicInfo{
			Name:         name,
			LeaderNodeID: t.LeaderNodeID,
			LeaderEpoch:  t.LeaderEpoch,
			Replicas:     replicas,
		})
	}
	return &protocol.ListTopicsResponse{Topics: out}
}
```
ListTopics can be served by any node — metadata is replicated via Raft so all nodes have the same view.
## Step 13: Understand the end-to-end integration
### Complete workflow from request to replication
For topic creation:

```
Client: CreateTopicRequest
        │
        ▼
RPC handler → TopicManager.CreateTopic()
        │
        ├── GetNodeIDWithLeastTopics()  → picks leader
        ├── pickReplicaNodeIds()        → picks replicas
        │
        ▼
Coordinator.ApplyCreateTopicEvent(topic, replicaCount, leader, replicas)
        │
        ▼
Raft replicates to majority
        │
        ▼
FSM.Apply() on ALL nodes
        │
        ▼
TopicManager.Apply(CreateTopicEvent)
        │
        ├── createTopicFromEvent()      → creates Topic object
        └── ensureLocalLogForTopic()    → opens log if leader/replica
```

For produce:

```
Client: ProduceRequest (topic, value, acks=AckAll)
        │
        ▼
RPC handler → TopicManager.HandleProduce(topic, value, AckAll)
        │
        ├── t.Log.Append(value)         → writes to disk
        │
        └── waitForAllFollowersToCatchUp(offset)
                │
                └── polls every 10ms until:
                      - all ISR replicas have LEO > offset
                      - or 5s timeout
```

For node failure:

```
Node 2 crashes
        │
        ▼
Serf detects failure
        │
        ▼
Coordinator.Leave(node2)
        │
        ├── raft.RemoveServer(node2)
        └── ApplyNodeRemoveEvent(node2)
                │
                ▼
        FSM.Apply(RemoveNodeEvent)
                │
                ├── delete(tm.Nodes, node2)
                └── maybeReassignTopicLeaders(node2)
                        │
                        └── go func() {
                                ApplyLeaderChangeEvent(topic, newLeader, epoch+1)
                            }()
```
## Summary
| Component | Purpose |
|---|---|
| Topic | One named log: leader, epoch, replicas, local LogManager. JSON-serializable for Raft snapshots. |
| TopicManager | Central state: all topics, all nodes, current node ID. Implements MetadataStore. |
| CreateTopic | Picks leader (least loaded, deterministic tie-breaking) and replicas, proposes via Raft. All nodes apply. |
| HandleProduce | Appends to leader log. AckLeader returns immediately. AckAll polls until ISR replicas catch up (5s timeout). |
| Apply() | Switches on event type. CreateTopic creates objects and opens logs. LeaderChange swaps leader/replica roles. AddNode adds replicas for under-replicated topics. RemoveNode triggers async leader reassignment. |
| maybeAdvanceHW | HW = min(leader LEO, all ISR LEOs). Non-ISR replicas excluded. |
| RecordReplicaLEOFromFetch | Updates replica LEO and ISR status when leader serves replication fetches. |
| Snapshot/Restore | Direct JSON marshal of Topics and Nodes. Restore re-opens local logs. |
With topic management in place, the next page covers replication — how follower replicas pull data from the leader and how the replication thread is organized.