Topic Management — Leaders, Replicas, and Metadata in a Distributed Log

The TopicManager is the application layer that ties everything together. In this section, you'll implement code that tracks all topics, their leaders, and replicas. You'll implement the MetadataStore interface so Raft-committed events update the cluster state. You'll handle produce and consume requests, manage node metadata, and coordinate replication. Code lives in topic/.

Step 1: Understand core types

Topic type

A Topic represents one named log with a leader and a set of replicas:

// topic/topic.go
package topic

type Topic struct {
    mu                  sync.RWMutex             `json:"-"`
    Name                string                   `json:"name"`
    LeaderNodeID        string                   `json:"leader_id"`
    LeaderEpoch         int64                    `json:"leader_epoch"`
    DesiredReplicaCount int                      `json:"desired_replica_count"`
    Replicas            map[string]*ReplicaState `json:"replicas"`
    Log                 *log.LogManager          `json:"-"`
    Logger              *zap.Logger              `json:"-"`
}

Fields marked json:"-" are not serialized in Raft snapshots — they are local runtime state. Log is the local log (only opened if this node is leader or replica for the topic). DesiredReplicaCount is saved from the CreateTopic request and used to re-add replicas when nodes rejoin the cluster.
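
To make the effect of the json:"-" tags concrete, here is a minimal sketch of what encoding/json emits for a struct shaped like Topic. The topicSnapshot and fakeLog types are hypothetical stand-ins introduced for this illustration only; the real Topic has more fields.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// fakeLog is a hypothetical stand-in for *log.LogManager (local runtime state).
type fakeLog struct{ dir string }

// topicSnapshot mirrors a few Topic fields; it is an illustration, not the real type.
type topicSnapshot struct {
	Name         string   `json:"name"`
	LeaderNodeID string   `json:"leader_id"`
	LeaderEpoch  int64    `json:"leader_epoch"`
	Log          *fakeLog `json:"-"` // skipped by encoding/json
}

// encodeTopic returns the JSON that would end up in a snapshot.
func encodeTopic(t topicSnapshot) string {
	b, err := json.Marshal(t)
	if err != nil {
		return "error: " + err.Error()
	}
	return string(b)
}

func main() {
	t := topicSnapshot{
		Name:         "orders",
		LeaderNodeID: "node-1",
		LeaderEpoch:  3,
		Log:          &fakeLog{dir: "/tmp/orders"},
	}
	// Prints {"name":"orders","leader_id":"node-1","leader_epoch":3}
	fmt.Println(encodeTopic(t))
}
```

The Log handle never reaches the snapshot, which is why a node has to re-open its local logs after restoring.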

ReplicaState tracking

Tracks how far each replica has replicated:

type ReplicaState struct {
    ReplicaNodeID string `json:"replica_id"`
    LEO           int64  `json:"leo"`
    IsISR         bool   `json:"is_isr"`
}

LEO is int64 (not uint64) because it is updated from the leader's perspective when serving replication fetches. A value of 0 means the replica has not yet reported its position.

NodeMetadata for addresses

Each node's addresses for Raft and RPC communication:

type NodeMetadata struct {
    mu      sync.RWMutex `json:"-"`
    NodeID  string       `json:"node_id"`
    Addr    string       `json:"addr"`
    RpcAddr string       `json:"rpc_addr"`
}

Addr is the Raft address; RpcAddr is where clients and replication threads connect. Both are set when the node joins the cluster via the AddNode metadata event.

Step 2: Create the TopicManager struct

TopicManager structure

var _ coordinator.MetadataStore = (*TopicManager)(nil)

type TopicManager struct {
    mu                   sync.RWMutex
    Topics               map[string]*Topic        `json:"topics"`
    BaseDir              string                   `json:"-"`
    Logger               *zap.Logger              `json:"-"`
    Nodes                map[string]*NodeMetadata `json:"nodes"`
    CurrentNodeID        string                   `json:"-"`
    coordinator          TopicCoordinator         `json:"-"`
    stopPeriodic         chan struct{}            `json:"-"`
    stopReplication      chan struct{}            `json:"-"`
    replicationBatchSize uint32                   `json:"-"`
    ISRLagThreshold      uint64                   `json:"-"`
}


The var _ coordinator.MetadataStore = (*TopicManager)(nil) line is a compile-time assertion that TopicManager implements the MetadataStore interface.

Fields with json:"topics" and json:"nodes" are serialized in Raft snapshots (the FSM's Snapshot() calls json.Marshal(c.metadataStore)). Everything else (json:"-") is local runtime state.
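
The compile-time assertion idiom can be tried in isolation. The sketch below uses a reduced, hypothetical metadataStore interface and a manager type; the real MetadataStore interface is not reproduced on this page, so its method set here is an assumption.

```go
package main

import "fmt"

// metadataStore is a hypothetical, reduced version of coordinator.MetadataStore.
type metadataStore interface {
	Apply(event string) error
	Restore(data []byte) error
}

// manager is a hypothetical stand-in for TopicManager.
type manager struct{ applied []string }

func (m *manager) Apply(event string) error {
	m.applied = append(m.applied, event)
	return nil
}

func (m *manager) Restore(data []byte) error {
	m.applied = nil
	return nil
}

// Compile-time check: the build fails on this line if *manager stops
// satisfying metadataStore (for example, if Restore is renamed).
// The typed nil pointer allocates nothing at run time.
var _ metadataStore = (*manager)(nil)

func main() {
	m := &manager{}
	var s metadataStore = m
	_ = s.Apply("create-topic")
	fmt.Println(len(m.applied)) // 1
}
```

Removing one of the two methods turns the assertion into a compile error that names the missing method, which is usually easier to diagnose than a failure at the call site.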

Step 3: Define the TopicCoordinator interface

Interface for Raft interaction

// topic/topic_coordinator.go
type TopicCoordinator interface {
    ApplyCreateTopicEvent(topic string, replicaCount uint32,
        leaderNodeID string, replicaNodeIds []string) error
    ApplyDeleteTopicEventInternal(topic string) error
    ApplyIsrUpdateEventInternal(topic, replicaNodeID string, isr bool) error
    ApplyLeaderChangeEvent(topic, leaderNodeID string, leaderEpoch int64) error
    IsLeader() bool
    GetRaftLeaderNodeID() (string, error)
}

Each method matches the corresponding Coordinator method signature. This allows tests to use a fake coordinator without running real Raft.
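
A fake coordinator for tests might look like the sketch below. The interface is copied from the listing above so the example compiles on its own; fakeCoordinator and its calls field are hypothetical names, and the fake simply records what it was asked to do instead of proposing anything through Raft.

```go
package main

import "fmt"

// TopicCoordinator is copied from the interface above so the sketch is self-contained.
type TopicCoordinator interface {
	ApplyCreateTopicEvent(topic string, replicaCount uint32,
		leaderNodeID string, replicaNodeIds []string) error
	ApplyDeleteTopicEventInternal(topic string) error
	ApplyIsrUpdateEventInternal(topic, replicaNodeID string, isr bool) error
	ApplyLeaderChangeEvent(topic, leaderNodeID string, leaderEpoch int64) error
	IsLeader() bool
	GetRaftLeaderNodeID() (string, error)
}

// fakeCoordinator is a hypothetical test double that records calls.
type fakeCoordinator struct {
	leader bool
	nodeID string
	calls  []string
}

func (f *fakeCoordinator) ApplyCreateTopicEvent(topic string, replicaCount uint32,
	leaderNodeID string, replicaNodeIds []string) error {
	f.calls = append(f.calls, "create:"+topic)
	return nil
}

func (f *fakeCoordinator) ApplyDeleteTopicEventInternal(topic string) error {
	f.calls = append(f.calls, "delete:"+topic)
	return nil
}

func (f *fakeCoordinator) ApplyIsrUpdateEventInternal(topic, replicaNodeID string, isr bool) error {
	f.calls = append(f.calls, fmt.Sprintf("isr:%s:%s:%t", topic, replicaNodeID, isr))
	return nil
}

func (f *fakeCoordinator) ApplyLeaderChangeEvent(topic, leaderNodeID string, leaderEpoch int64) error {
	f.calls = append(f.calls, fmt.Sprintf("leader:%s:%s:%d", topic, leaderNodeID, leaderEpoch))
	return nil
}

func (f *fakeCoordinator) IsLeader() bool                       { return f.leader }
func (f *fakeCoordinator) GetRaftLeaderNodeID() (string, error) { return f.nodeID, nil }

// Compile-time check that the fake satisfies the interface.
var _ TopicCoordinator = (*fakeCoordinator)(nil)

func main() {
	fake := &fakeCoordinator{leader: true, nodeID: "node-1"}
	var c TopicCoordinator = fake
	_ = c.ApplyCreateTopicEvent("orders", 2, "node-1", []string{"node-2", "node-3"})
	_ = c.ApplyLeaderChangeEvent("orders", "node-2", 1)
	fmt.Println(fake.calls) // [create:orders leader:orders:node-2:1]
}
```

Note that a fake like this does not feed events back into Apply(), so a test that needs the resulting state has to call Apply() itself or extend the fake to do so.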

Step 4: Create the constructor

Constructor and initialization

const (
    defaultMetadataLogInterval = 30 * time.Second
    DefaultISRLagThreshold     = uint64(100)
)

func NewTopicManager(baseDir string, coord TopicCoordinator,
    logger *zap.Logger) (*TopicManager, error) {
    if logger == nil {
        logger = zap.NewNop()
    }
    tm := &TopicManager{
        Topics:          make(map[string]*Topic),
        BaseDir:         baseDir,
        Logger:          logger,
        Nodes:           make(map[string]*NodeMetadata),
        coordinator:     coord,
        stopPeriodic:    make(chan struct{}),
        ISRLagThreshold: DefaultISRLagThreshold,
    }
    go tm.periodicLog(defaultMetadataLogInterval)
    tm.replicationBatchSize = DefaultReplicationBatchSize
    return tm, nil
}


The constructor starts a periodicLog goroutine that logs the entire metadata state (as JSON) every 30 seconds. This is useful for debugging: you can see the cluster state in the logs.

The coordinator is passed in but can also be set later via SetCoordinator(). This breaks a circular dependency: the TopicManager is the MetadataStore for the Coordinator, but the Coordinator is the TopicCoordinator for the TopicManager. One workable order is to construct the TopicManager first (with a nil coordinator), build the Coordinator around it, and then call SetCoordinator().

Coordinator setter and node ID setter

func (tm *TopicManager) SetCoordinator(c TopicCoordinator) {
    tm.mu.Lock()
    defer tm.mu.Unlock()
    tm.coordinator = c
}

func (tm *TopicManager) SetCurrentNodeID(nodeID string) {
    tm.mu.Lock()
    defer tm.mu.Unlock()
    tm.CurrentNodeID = nodeID
}

Step 5: Create and manage topics

Topic creation flow

Topic creation goes through Raft so all nodes agree. The flow:

  1. Client sends CreateTopicRequest to the Raft leader.
  2. The leader picks which node leads the topic (least loaded) and which nodes host replicas.
  3. The leader proposes a CreateTopicEvent through Raft.
  4. All nodes apply the event: create the Topic object, open local logs if this node participates.

func (tm *TopicManager) CreateTopic(ctx context.Context,
    req *protocol.CreateTopicRequest) (*protocol.CreateTopicResponse, error) {
    if tm.coordinator == nil {
        return nil, fmt.Errorf("topic: no coordinator")
    }
    c := tm.coordinator
    if !c.IsLeader() {
        return nil, fmt.Errorf("create topic must be sent to Raft leader: %w",
            errs.ErrCannotReachLeader)
    }
    tm.mu.RLock()
    _, exists := tm.Topics[req.Topic]
    tm.mu.RUnlock()
    if exists {
        return nil, errs.ErrTopicExistsf(req.Topic)
    }
    leaderNodeID, err := tm.GetNodeIDWithLeastTopics()
    if err != nil {
        return nil, err
    }
    replicaNodeIds, err := tm.pickReplicaNodeIds(leaderNodeID, int(req.ReplicaCount))
    if err != nil {
        return nil, errs.ErrCreateTopic(err)
    }
    tm.Logger.Info("create topic via Raft",
        zap.String("topic", req.Topic),
        zap.String("leader_node_id", leaderNodeID),
        zap.Strings("replica_node_ids", replicaNodeIds))
    if err := c.ApplyCreateTopicEvent(req.Topic, req.ReplicaCount,
        leaderNodeID, replicaNodeIds); err != nil {
        return nil, err
    }
    return &protocol.CreateTopicResponse{
        Topic:          req.Topic,
        ReplicaNodeIds: replicaNodeIds,
    }, nil
}

Least-loaded node selection strategy

The topic leader is the node that currently leads the fewest topics. Ties are broken lexicographically (smallest node ID wins) for determinism:

func (tm *TopicManager) GetNodeIDWithLeastTopics() (string, error) {
    tm.mu.RLock()
    defer tm.mu.RUnlock()
    countByNode := make(map[string]int)
    for _, node := range tm.Nodes {
        if node != nil {
            countByNode[node.NodeID] = 0
        }
    }
    for _, t := range tm.Topics {
        if t != nil && t.LeaderNodeID != "" {
            countByNode[t.LeaderNodeID]++
        }
    }
    if len(countByNode) == 0 {
        return "", errs.ErrNoNodesInCluster
    }
    ids := make([]string, 0, len(countByNode))
    for id := range countByNode {
        ids = append(ids, id)
    }
    sort.Strings(ids)
    bestID := ids[0]
    minCount := countByNode[bestID]
    for _, id := range ids[1:] {
        if c := countByNode[id]; c < minCount {
            minCount = c
            bestID = id
        }
    }
    return bestID, nil
}
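
The selection rule is easier to check on a small example. The sketch below reimplements it as a standalone function over plain slices and maps; leastLoaded and its parameters are hypothetical simplifications of tm.Nodes and tm.Topics. One deliberate difference, stated as an assumption: the sketch ignores leaders that are not in the node list, so a removed node can never be selected.

```go
package main

import (
	"fmt"
	"sort"
)

// leastLoaded returns the node that leads the fewest topics.
// nodes lists every known node ID; leaderOf maps topic name to leader node ID.
func leastLoaded(nodes []string, leaderOf map[string]string) (string, bool) {
	count := make(map[string]int, len(nodes))
	for _, id := range nodes {
		count[id] = 0
	}
	for _, leader := range leaderOf {
		if _, known := count[leader]; known {
			count[leader]++ // only count leaders that are still cluster members
		}
	}
	if len(count) == 0 {
		return "", false
	}
	ids := make([]string, 0, len(count))
	for id := range count {
		ids = append(ids, id)
	}
	// Scanning in sorted order with a strict "<" means the smallest ID wins ties.
	sort.Strings(ids)
	best := ids[0]
	for _, id := range ids[1:] {
		if count[id] < count[best] {
			best = id
		}
	}
	return best, true
}

func main() {
	nodes := []string{"node-c", "node-a", "node-b"}
	leaders := map[string]string{"orders": "node-a", "payments": "node-a", "audit": "node-b"}

	id, _ := leastLoaded(nodes, leaders)
	fmt.Println(id) // node-c (leads 0 topics)

	id, _ = leastLoaded(nodes, map[string]string{})
	fmt.Println(id) // node-a (all tied at 0, smallest ID wins)
}
```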

Picking replicas

func (tm *TopicManager) pickReplicaNodeIds(leaderNodeID string,
    replicaCount int) ([]string, error) {
    tm.mu.RLock()
    var otherNodes []*NodeMetadata
    for _, node := range tm.Nodes {
        if node != nil && node.NodeID != leaderNodeID {
            otherNodes = append(otherNodes, node)
        }
    }
    tm.mu.RUnlock()
    if len(otherNodes) < replicaCount {
        return nil, errs.ErrNotEnoughNodesf(replicaCount, len(otherNodes))
    }
    replicaNodeIds := make([]string, 0, replicaCount)
    for i := 0; i < replicaCount; i++ {
        replicaNodeIds = append(replicaNodeIds, otherNodes[i].NodeID)
    }
    return replicaNodeIds, nil
}
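
Go randomizes map iteration order, so the loop over tm.Nodes above can return a different set of replicas for the same cluster state from one call to the next. That does not cause divergence, because only the Raft leader picks replicas and the chosen IDs travel inside the event. If a reproducible choice is wanted (for tests, say), one option is to sort the candidates first. The following is a sketch of that variation, not the tutorial's implementation; pickReplicas and its error message are hypothetical.

```go
package main

import (
	"fmt"
	"sort"
)

// pickReplicas returns replicaCount node IDs other than leaderID, in sorted order.
func pickReplicas(nodeIDs []string, leaderID string, replicaCount int) ([]string, error) {
	others := make([]string, 0, len(nodeIDs))
	for _, id := range nodeIDs {
		if id != leaderID {
			others = append(others, id)
		}
	}
	if replicaCount < 0 || len(others) < replicaCount {
		return nil, fmt.Errorf("need %d replicas, only %d candidate nodes",
			replicaCount, len(others))
	}
	sort.Strings(others) // same inputs now always yield the same replicas
	return others[:replicaCount], nil
}

func main() {
	replicas, err := pickReplicas([]string{"n3", "n1", "n2"}, "n1", 2)
	fmt.Println(replicas, err) // [n2 n3] <nil>

	_, err = pickReplicas([]string{"n1", "n2"}, "n1", 2)
	fmt.Println(err != nil) // true: only one non-leader node is available
}
```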

Step 6: Apply Raft metadata events

Implementing MetadataStore.Apply()

The TopicManager implements MetadataStore.Apply(). When Raft commits an event, every node processes it. The method holds the write lock for the entire switch, so each event is applied atomically with respect to other readers and writers of the manager's state:

func (tm *TopicManager) Apply(ev *protocol.MetadataEvent) error {
    tm.mu.Lock()
    defer tm.mu.Unlock()
    switch ev.EventType {
    case protocol.MetadataEventTypeCreateTopic:
        e := protocol.CreateTopicEvent{}
        if err := json.Unmarshal(ev.Data, &e); err != nil {
            return err
        }
        tm.createTopicFromEvent(e.Topic, e.LeaderNodeID, e.LeaderEpoch, e.ReplicaNodeIds)

    case protocol.MetadataEventTypeLeaderChange:
        e := protocol.LeaderChangeEvent{}
        if err := json.Unmarshal(ev.Data, &e); err != nil {
            return err
        }
        t := tm.Topics[e.Topic]
        if t != nil {
            oldLeaderID := t.LeaderNodeID
            t.LeaderNodeID = e.LeaderNodeID
            t.LeaderEpoch = e.LeaderEpoch
            delete(t.Replicas, e.LeaderNodeID)
            if oldLeaderID != e.LeaderNodeID && oldLeaderID != "" &&
                tm.Nodes[oldLeaderID] != nil {
                if t.Replicas == nil {
                    t.Replicas = make(map[string]*ReplicaState)
                }
                t.Replicas[oldLeaderID] = &ReplicaState{
                    ReplicaNodeID: oldLeaderID,
                    LEO:           0,
                    IsISR:         false,
                }
            }
            tm.ensureLocalLogAfterLeaderChange(e.Topic, oldLeaderID, e.LeaderNodeID)
        }

    case protocol.MetadataEventTypeIsrUpdate:
        e := protocol.IsrUpdateEvent{}
        if err := json.Unmarshal(ev.Data, &e); err != nil {
            return err
        }
        t := tm.Topics[e.Topic]
        if t != nil {
            rs := t.Replicas[e.ReplicaNodeID]
            if rs != nil {
                rs.IsISR = e.Isr
            } else {
                if t.Replicas == nil {
                    t.Replicas = make(map[string]*ReplicaState)
                }
                t.Replicas[e.ReplicaNodeID] = &ReplicaState{
                    ReplicaNodeID: e.ReplicaNodeID,
                    LEO:           0,
                    IsISR:         e.Isr,
                }
            }
            tm.maybeAdvanceHW(t)
        }

    case protocol.MetadataEventTypeDeleteTopic:
        e := protocol.DeleteTopicEvent{}
        if err := json.Unmarshal(ev.Data, &e); err != nil {
            return err
        }
        tm.deleteTopicFromEvent(e.Topic)

    case protocol.MetadataEventTypeAddNode:
        e := protocol.AddNodeEvent{}
        if err := json.Unmarshal(ev.Data, &e); err != nil {
            return err
        }
        tm.Nodes[e.NodeID] = &NodeMetadata{
            NodeID:  e.NodeID,
            Addr:    e.Addr,
            RpcAddr: e.RpcAddr,
        }
        tm.maybeAddReplicasForNode(e.NodeID)

    case protocol.MetadataEventTypeRemoveNode:
        e := protocol.RemoveNodeEvent{}
        if err := json.Unmarshal(ev.Data, &e); err != nil {
            return err
        }
        delete(tm.Nodes, e.NodeID)
        tm.maybeReassignTopicLeaders(e.NodeID)
    }
    return nil
}

Let's walk through the key event handlers.

Creating topics from committed events

When the CreateTopicEvent is applied, each node creates the Topic object and opens a local log if it participates:

func (tm *TopicManager) createTopicFromEvent(topicName, leaderNodeID string,
    leaderEpoch int64, replicaNodeIds []string) {
    if _, exists := tm.Topics[topicName]; exists {
        return // idempotent
    }
    t := &Topic{
        Name:                topicName,
        LeaderNodeID:        leaderNodeID,
        LeaderEpoch:         leaderEpoch,
        DesiredReplicaCount: len(replicaNodeIds),
        Replicas:            make(map[string]*ReplicaState),
        Logger:              tm.Logger,
    }
    for _, replica := range replicaNodeIds {
        t.Replicas[replica] = &ReplicaState{
            ReplicaNodeID: replica,
            LEO:           0,
            IsISR:         true,
        }
    }
    tm.Topics[topicName] = t
    tm.ensureLocalLogForTopic(topicName, leaderNodeID, replicaNodeIds)
}

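Replicas named in the CreateTopicEvent start with IsISR: true, since the new topic's log is empty and there is nothing to catch up on. The ISR flag is then updated as replicas keep pace or fall behind.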

The ensureLocalLogForTopic helper opens the log directory if this node is the leader or a replica:

func (tm *TopicManager) ensureLocalLogForTopic(topicName, leaderNodeID string,
    replicaNodeIds []string) {
    t := tm.Topics[topicName]
    if t == nil {
        return
    }
    if tm.CurrentNodeID == leaderNodeID {
        if t.Log == nil {
            logManager, err := log.NewLogManager(
                filepath.Join(tm.BaseDir, topicName))
            if err != nil {
                tm.Logger.Warn("open leader log failed",
                    zap.String("topic", topicName), zap.Error(err))
                return
            }
            t.Log = logManager
        }
        return
    }
    for _, rid := range replicaNodeIds {
        if rid == tm.CurrentNodeID {
            if t.Log == nil {
                logManager, err := log.NewLogManager(
                    filepath.Join(tm.BaseDir, topicName))
                if err != nil {
                    tm.Logger.Warn("open replica log failed",
                        zap.String("topic", topicName), zap.Error(err))
                    return
                }
                t.Log = logManager
            }
            return
        }
    }
}

Nodes that are neither leader nor replica for the topic don't open a log — they just hold the metadata in memory.

Adding and removing nodes

When a node joins, its addresses are recorded and it is added as a replica for under-replicated topics:

case protocol.MetadataEventTypeAddNode:
    // ... unmarshal ...
    tm.Nodes[e.NodeID] = &NodeMetadata{
        NodeID:  e.NodeID,
        Addr:    e.Addr,
        RpcAddr: e.RpcAddr,
    }
    tm.maybeAddReplicasForNode(e.NodeID)

func (tm *TopicManager) maybeAddReplicasForNode(nodeID string) {
    for _, t := range tm.Topics {
        if t == nil || t.LeaderNodeID == nodeID {
            continue
        }
        if _, already := t.Replicas[nodeID]; already {
            continue
        }
        if len(t.Replicas) >= t.DesiredReplicaCount {
            continue
        }
        if t.Replicas == nil {
            t.Replicas = make(map[string]*ReplicaState)
        }
        t.Replicas[nodeID] = &ReplicaState{
            ReplicaNodeID: nodeID,
            LEO:           0,
            IsISR:         false,
        }
        tm.Logger.Info("added rejoined node as replica",
            zap.String("topic", t.Name),
            zap.String("node_id", nodeID),
        )
    }
}

This is how a crashed and restarted node automatically becomes a replica again: when it rejoins the cluster, the AddNode event triggers maybeAddReplicasForNode, which compares each topic's current replica count against its DesiredReplicaCount. Unlike replicas assigned at topic creation, replicas added this way start with IsISR: false and become in-sync only after catching up.

Leader reassignment on failure

When a node is removed (crashed or left), topics it led need new leaders. This is called from Apply() (the FSM goroutine), so it cannot call raft.Apply() synchronously — that would deadlock. Instead, leader changes are applied asynchronously in a goroutine:

func (tm *TopicManager) maybeReassignTopicLeaders(nodeID string) {
    if tm.coordinator == nil || !tm.coordinator.IsLeader() {
        return
    }

    type leaderChange struct {
        topic     string
        newLeader string
        epoch     int64
    }
    var changes []leaderChange

    for topicName, t := range tm.Topics {
        if t == nil || t.LeaderNodeID != nodeID {
            continue
        }
        var newLeader string
        for rid, rs := range t.Replicas {
            if rid == nodeID || rs == nil || !rs.IsISR {
                continue
            }
            n := tm.Nodes[rid]
            if n == nil {
                continue
            }
            newLeader = rid
            break
        }
        if newLeader == "" {
            tm.Logger.Warn("no ISR replica for leadership",
                zap.String("topic", topicName),
                zap.String("old_leader_node_id", nodeID))
            continue
        }
        changes = append(changes, leaderChange{
            topic:     topicName,
            newLeader: newLeader,
            epoch:     t.LeaderEpoch + 1,
        })
    }

    if len(changes) == 0 {
        return
    }

    // Apply leader changes asynchronously so the FSM's
    // current Apply() can return first.
    go func() {
        for _, ch := range changes {
            if err := tm.coordinator.ApplyLeaderChangeEvent(
                ch.topic, ch.newLeader, ch.epoch); err != nil {
                tm.Logger.Warn("leader change apply failed",
                    zap.String("topic", ch.topic), zap.Error(err))
            }
        }
    }()
}

Key details:

  • Only ISR replicas are considered for leadership (they have the most up-to-date data).
  • The new leader's epoch is t.LeaderEpoch + 1. Epoch tracking prevents stale leaders from accepting writes.
  • The go func() is critical — without it, the FSM would deadlock because ApplyLeaderChangeEvent calls raft.Apply(), which waits for the FSM to process the new entry, but the FSM is still blocked in the current Apply().
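
The deadlock and its fix can be reproduced without Raft. The sketch below uses a hypothetical toyLog type: one goroutine applies entries in order, and propose blocks until its entry has been applied, which is the same shape as raft.Apply() waiting on the FSM. It is a toy model under those assumptions, not the Coordinator's real code.

```go
package main

import (
	"fmt"
	"sync"
)

// entry is one committed log entry plus a channel that is closed once applied.
type entry struct {
	name string
	done chan struct{}
}

// toyLog stands in for Raft plus its FSM: a single goroutine applies entries
// one at a time, and propose blocks until its entry has been applied.
type toyLog struct {
	queue   chan entry
	mu      sync.Mutex
	applied []string
	pending sync.WaitGroup // follow-up proposals started from apply
}

func newToyLog() *toyLog {
	l := &toyLog{queue: make(chan entry, 16)}
	go l.run()
	return l
}

// run is the single apply loop, comparable to the FSM goroutine.
func (l *toyLog) run() {
	for e := range l.queue {
		l.apply(e.name)
		close(e.done)
	}
}

// propose enqueues an entry and waits until the apply loop has processed it.
func (l *toyLog) propose(name string) {
	e := entry{name: name, done: make(chan struct{})}
	l.queue <- e
	<-e.done
}

// apply runs on the apply loop. Calling l.propose directly from here would
// block forever: the loop cannot pick up the new entry until apply returns.
func (l *toyLog) apply(name string) {
	l.mu.Lock()
	l.applied = append(l.applied, name)
	l.mu.Unlock()
	if name == "remove-node" {
		l.pending.Add(1)
		go func() { // this propose can only complete after apply has returned
			defer l.pending.Done()
			l.propose("leader-change")
		}()
	}
}

// runDemo proposes a node removal and returns the order in which entries were applied.
func runDemo() []string {
	l := newToyLog()
	l.propose("remove-node")
	l.pending.Wait()
	close(l.queue) // stop the apply loop
	l.mu.Lock()
	defer l.mu.Unlock()
	return append([]string(nil), l.applied...)
}

func main() {
	fmt.Println(runDemo()) // [remove-node leader-change]
}
```

Replacing the go func() with a direct call to l.propose makes the first propose hang, because the apply loop ends up waiting on itself.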

Leader change event handling

When a LeaderChangeEvent is applied, the old leader becomes a replica and the new leader's entry is removed from replicas:

case protocol.MetadataEventTypeLeaderChange:
    // ...
    t := tm.Topics[e.Topic]
    if t != nil {
        oldLeaderID := t.LeaderNodeID
        t.LeaderNodeID = e.LeaderNodeID
        t.LeaderEpoch = e.LeaderEpoch
        // New leader is no longer a replica
        delete(t.Replicas, e.LeaderNodeID)
        // Old leader becomes a replica (if still alive)
        if oldLeaderID != e.LeaderNodeID && oldLeaderID != "" &&
            tm.Nodes[oldLeaderID] != nil {
            t.Replicas[oldLeaderID] = &ReplicaState{
                ReplicaNodeID: oldLeaderID,
                LEO:           0,
                IsISR:         false,
            }
        }
        tm.ensureLocalLogAfterLeaderChange(e.Topic, oldLeaderID, e.LeaderNodeID)
    }

The check tm.Nodes[oldLeaderID] != nil prevents adding a dead node as a replica — if the old leader was removed, it was already cleaned up from Nodes by the RemoveNode handler.

Step 7: Handle produce requests with replication

Validating leadership and appending records

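HandleProduce appends the record to the topic's local log and, depending on the ack mode, waits for replication. It does not check leadership itself; the caller is expected to have confirmed that this node leads the topic (for example with IsLeader) before calling it: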

func (tm *TopicManager) HandleProduce(ctx context.Context, t *Topic,
    logEntry *protocol.LogEntry, acks protocol.AckMode) (uint64, error) {
    offset, err := t.Log.Append(logEntry.Value)
    if err != nil {
        return 0, err
    }
    switch acks {
    case protocol.AckLeader:
        return offset, nil
    case protocol.AckAll:
        if err := tm.waitForAllFollowersToCatchUp(ctx, t, offset); err != nil {
            return 0, errs.ErrWaitFollowersCatchUp(err)
        }
        return offset, nil
    default:
        return 0, errs.ErrInvalidAckModef(int32(acks))
    }
}

The HandleProduce method receives the *Topic directly (the RPC handler looks it up). AckLeader returns immediately after the local append. AckAll blocks until all ISR replicas have caught up.

Batch produce

func (tm *TopicManager) HandleProduceBatch(ctx context.Context, t *Topic,
    values [][]byte, acks protocol.AckMode) (uint64, uint64, error) {
    if len(values) == 0 {
        return 0, 0, errs.ErrValuesEmpty
    }
    base, err := t.Log.AppendBatch(values)
    if err != nil {
        return 0, 0, err
    }
    last := base + uint64(len(values)) - 1
    switch acks {
    case protocol.AckLeader:
        return base, last, nil
    case protocol.AckAll:
        if err := tm.waitForAllFollowersToCatchUp(ctx, t, last); err != nil {
            return 0, 0, errs.ErrWaitFollowersCatchUp(err)
        }
        return base, last, nil
    default:
        return 0, 0, errs.ErrInvalidAckModef(int32(acks))
    }
}

Returns both base and last offsets so the client knows the range of offsets written.

Waiting for replication (AckAll)

func (tm *TopicManager) waitForAllFollowersToCatchUp(ctx context.Context,
    t *Topic, offset uint64) error {
    timeout := time.After(5 * time.Second)
    ticker := time.NewTicker(10 * time.Millisecond)
    defer ticker.Stop()

    requiredLEO := offset + 1

    for {
        allCaughtUp := true
        candidates := 0

        t.mu.RLock()
        useISR := false
        for _, r := range t.Replicas {
            if r != nil && r.IsISR {
                useISR = true
                break
            }
        }
        for _, replica := range t.Replicas {
            if replica == nil {
                continue
            }
            if useISR && !replica.IsISR {
                continue
            }
            candidates++
            if uint64(replica.LEO) < requiredLEO {
                allCaughtUp = false
                break
            }
        }
        t.mu.RUnlock()

        if candidates == 0 {
            return nil
        }
        if allCaughtUp {
            return nil
        }

        select {
        case <-ticker.C:
            continue
        case <-ctx.Done():
            return ctx.Err()
        case <-timeout:
            return errs.ErrTimeoutCatchUp
        }
    }
}

Key details:

  • requiredLEO = offset + 1 — LEO is "next offset to write", so after writing at offset N, a replica is caught up when its LEO is at least N+1.
  • If at least one replica is in the ISR, only ISR replicas are checked. If none is, the loop falls back to checking every replica, so it keeps waiting (until the timeout) rather than returning early. It returns immediately only when the topic has no replicas at all.
  • The 5-second timeout prevents indefinite blocking if a replica is down.
  • The loop polls every 10ms, so polling adds at most roughly 10ms of latency to an AckAll produce while keeping CPU usage low.
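
One pass of the polling loop, as written in waitForAllFollowersToCatchUp, can be expressed as a pure function and checked against a few cases. The replica type and caughtUp function below are hypothetical simplifications; locking, the ticker, and the timeout are left out.

```go
package main

import "fmt"

// replica is a reduced stand-in for ReplicaState.
type replica struct {
	LEO   int64
	IsISR bool
}

// caughtUp mirrors one pass of the polling loop: it reports whether every
// candidate replica has LEO >= offset+1.
func caughtUp(replicas []replica, offset uint64) bool {
	requiredLEO := offset + 1

	// If any replica is in the ISR, only ISR replicas are candidates.
	useISR := false
	for _, r := range replicas {
		if r.IsISR {
			useISR = true
			break
		}
	}
	for _, r := range replicas {
		if useISR && !r.IsISR {
			continue
		}
		if uint64(r.LEO) < requiredLEO {
			return false
		}
	}
	return true // also covers a topic with no replicas at all
}

func main() {
	// Record written at offset 9, so a caught-up replica needs LEO >= 10.
	fmt.Println(caughtUp([]replica{{LEO: 10, IsISR: true}, {LEO: 3, IsISR: false}}, 9)) // true: laggard is not in the ISR
	fmt.Println(caughtUp([]replica{{LEO: 9, IsISR: true}}, 9))                          // false: LEO 9 means offset 9 is not yet written
	fmt.Println(caughtUp([]replica{{LEO: 10, IsISR: false}, {LEO: 3, IsISR: false}}, 9)) // false: no ISR, so every replica is checked
	fmt.Println(caughtUp(nil, 9))                                                        // true: nothing to wait for
}
```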

Step 8: Advance the high watermark

Computing and setting HW

The high watermark (HW) is the minimum LEO across the leader and all ISR replicas. Consumers can only read up to the HW — this ensures they never see data that hasn't been replicated:

func (tm *TopicManager) maybeAdvanceHW(t *Topic) {
    if t.Log == nil {
        return
    }
    minOffset := t.Log.LEO()
    for _, r := range t.Replicas {
        if r != nil && r.IsISR && uint64(r.LEO) < minOffset {
            minOffset = uint64(r.LEO)
        }
    }
    t.Log.SetHighWatermark(minOffset)
}

Non-ISR replicas are excluded — a dead or lagging replica must not hold back consumer visibility.
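
A worked example of the same minimum, written as a standalone function. The replica type and highWatermark are hypothetical simplifications of ReplicaState and maybeAdvanceHW; the real method reads the leader LEO from the log and writes the result back with SetHighWatermark.

```go
package main

import "fmt"

// replica is a reduced stand-in for ReplicaState.
type replica struct {
	LEO   int64
	IsISR bool
}

// highWatermark returns min(leader LEO, LEO of every ISR replica).
func highWatermark(leaderLEO uint64, replicas []replica) uint64 {
	hw := leaderLEO
	for _, r := range replicas {
		if r.IsISR && uint64(r.LEO) < hw {
			hw = uint64(r.LEO)
		}
	}
	return hw
}

func main() {
	// Leader has written up to LEO 120; two ISR replicas and one laggard.
	fmt.Println(highWatermark(120, []replica{
		{LEO: 118, IsISR: true},
		{LEO: 115, IsISR: true},
		{LEO: 40, IsISR: false},
	})) // 115: the slowest ISR replica bounds the HW, the laggard does not

	// If the replica at 115 drops out of the ISR, the HW can move to 118.
	fmt.Println(highWatermark(120, []replica{
		{LEO: 118, IsISR: true},
		{LEO: 115, IsISR: false},
	})) // 118

	// With no replicas, the HW equals the leader's LEO.
	fmt.Println(highWatermark(120, nil)) // 120
}
```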

Step 9: Track replica progress from fetch requests

Recording LEO and updating ISR

When the leader serves a fetch from a replica (the replica sets ReplicaNodeID in its FetchBatchRequest), the leader records the replica's LEO and updates its ISR status:

func (tm *TopicManager) RecordReplicaLEOFromFetch(ctx context.Context,
    topicName, replicaNodeID string, leo int64) error {
    tm.mu.Lock()
    t := tm.Topics[topicName]
    if t == nil {
        tm.mu.Unlock()
        return nil
    }
    leaderLEO := uint64(0)
    if t.Log != nil {
        leaderLEO = t.Log.LEO()
    }
    if t.Replicas == nil {
        t.Replicas = make(map[string]*ReplicaState)
    }
    rs := t.Replicas[replicaNodeID]
    if rs == nil {
        t.Replicas[replicaNodeID] = &ReplicaState{
            ReplicaNodeID: replicaNodeID,
            LEO:           leo,
            IsISR:         true,
        }
        rs = t.Replicas[replicaNodeID]
    } else {
        rs.LEO = leo
    }
    lagThreshold := tm.ISRLagThreshold
    if lagThreshold == 0 {
        lagThreshold = DefaultISRLagThreshold
    }
    var isr bool
    if leaderLEO > lagThreshold {
        isr = uint64(leo) >= leaderLEO-lagThreshold
    } else {
        isr = leo >= 0
    }
    rs.IsISR = isr
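    // Advance the HW before releasing tm.mu: maybeAdvanceHW reads t.Replicas,
    // which Apply() mutates under the same lock.
    tm.maybeAdvanceHW(t)
    tm.mu.Unlock()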
    if tm.coordinator == nil {
        return nil
    }
    return tm.coordinator.ApplyIsrUpdateEventInternal(topicName, replicaNodeID, isr)
}

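The ISR threshold (DefaultISRLagThreshold = 100) determines how far behind a replica can lag and still be considered in-sync: once the leader's LEO exceeds the threshold, a replica is in-sync only if its LEO is at least the leader's LEO minus the threshold. While the leader's LEO is at or below the threshold, every replica that fetches is treated as in-sync. After updating ISR status locally, the method proposes the result cluster-wide via a Raft event.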
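
The threshold rule reduces to a few lines of arithmetic. The sketch below isolates it in a hypothetical inSync function. It adds one guard that the listing above does not have (a negative replica LEO is treated as out of sync), which is an assumption made here to keep the unsigned conversion safe.

```go
package main

import "fmt"

const defaultISRLagThreshold = uint64(100)

// inSync reports whether a replica at replicaLEO counts as in-sync
// for a leader at leaderLEO, given a lag threshold in records.
func inSync(leaderLEO uint64, replicaLEO int64, threshold uint64) bool {
	if replicaLEO < 0 {
		return false // guard added for this sketch; not in the original listing
	}
	if threshold == 0 {
		threshold = defaultISRLagThreshold
	}
	if leaderLEO > threshold {
		// leaderLEO-threshold cannot underflow inside this branch.
		return uint64(replicaLEO) >= leaderLEO-threshold
	}
	// Small log: any replica that reports a position is in-sync.
	return true
}

func main() {
	fmt.Println(inSync(1000, 900, 100)) // true: exactly 100 behind
	fmt.Println(inSync(1000, 899, 100)) // false: 101 behind
	fmt.Println(inSync(50, 0, 100))     // true: leader LEO is below the threshold
}
```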

Step 10: Provide replication helper methods

Methods for replication thread integration

The TopicManager provides methods used by the replication thread:

func (tm *TopicManager) ListReplicaTopics() []ReplicaTopicInfo {
    tm.mu.RLock()
    defer tm.mu.RUnlock()
    var out []ReplicaTopicInfo
    for name, t := range tm.Topics {
        if t == nil || t.Log == nil || t.LeaderNodeID == tm.CurrentNodeID {
            continue
        }
        out = append(out, ReplicaTopicInfo{
            TopicName:    name,
            LeaderNodeID: t.LeaderNodeID,
        })
    }
    return out
}

func (tm *TopicManager) ApplyRecordBatch(topicName string,
    values [][]byte) error {
    if len(values) == 0 {
        return nil
    }
    tm.mu.RLock()
    t := tm.Topics[topicName]
    tm.mu.RUnlock()
    if t == nil {
        return nil
    }
    t.mu.RLock()
    l := t.Log
    t.mu.RUnlock()
    if l == nil {
        return nil
    }
    _, err := l.AppendBatch(values)
    return err
}

ListReplicaTopics returns all topics where this node has a local log but is not the leader — those are the topics this node needs to replicate. ApplyRecordBatch appends fetched records to the local replica log.

Step 11: Serialize state via Raft snapshots

JSON-based snapshot and restore

For Raft snapshots, the TopicManager is directly JSON-marshaled (the FSM calls json.Marshal(c.metadataStore)). The fields with json tags (Topics, Nodes) are serialized.

func (tm *TopicManager) Restore(data []byte) error {
    var decoded struct {
        Topics map[string]*Topic        `json:"topics"`
        Nodes  map[string]*NodeMetadata `json:"nodes"`
    }
    if err := json.Unmarshal(data, &decoded); err != nil {
        return err
    }
    tm.mu.Lock()
    defer tm.mu.Unlock()
    tm.Topics = decoded.Topics
    tm.Nodes = decoded.Nodes
    for name, t := range tm.Topics {
        if t == nil {
            continue
        }
        t.Logger = tm.Logger
        if tm.CurrentNodeID == t.LeaderNodeID {
            if t.Log == nil {
                logManager, err := log.NewLogManager(
                    filepath.Join(tm.BaseDir, name))
                if err != nil {
                    tm.Logger.Warn("open leader log failed",
                        zap.String("topic", name), zap.Error(err))
                    continue
                }
                t.Log = logManager
            }
            tm.maybeAdvanceHW(t)
        } else if _, isReplica := t.Replicas[tm.CurrentNodeID]; isReplica {
            if t.Log == nil {
                logManager, err := log.NewLogManager(
                    filepath.Join(tm.BaseDir, name))
                if err != nil {
                    tm.Logger.Warn("open replica log failed",
                        zap.String("topic", name), zap.Error(err))
                    continue
                }
                t.Log = logManager
            }
        }
    }
    return nil
}

Restore replaces the in-memory state entirely from the snapshot. After restoring Topics and Nodes, it re-opens local logs for topics this node participates in and advances the HW for leader topics.
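
The snapshot and restore round trip can be sketched with reduced types. Everything below (manager, topic, the logOpen flag) is a hypothetical simplification: logOpen stands in for opening a LogManager, and only the pattern of decoding into a temporary struct and then re-attaching runtime state is taken from the listing above.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// topic is a reduced stand-in for Topic.
type topic struct {
	Name         string `json:"name"`
	LeaderNodeID string `json:"leader_id"`
	logOpen      bool   // unexported runtime state; never serialized
}

// manager is a reduced stand-in for TopicManager.
type manager struct {
	Topics        map[string]*topic `json:"topics"`
	CurrentNodeID string            `json:"-"`
}

// snapshot marshals only the tagged, exported fields.
func (m *manager) snapshot() ([]byte, error) { return json.Marshal(m) }

// restore replaces Topics from the snapshot, then rebuilds runtime state
// for topics this node leads. CurrentNodeID is left untouched.
func (m *manager) restore(data []byte) error {
	var decoded struct {
		Topics map[string]*topic `json:"topics"`
	}
	if err := json.Unmarshal(data, &decoded); err != nil {
		return err
	}
	m.Topics = decoded.Topics
	for _, t := range m.Topics {
		if t != nil && t.LeaderNodeID == m.CurrentNodeID {
			t.logOpen = true // the real code opens a LogManager here
		}
	}
	return nil
}

func main() {
	src := &manager{
		CurrentNodeID: "node-2",
		Topics: map[string]*topic{
			"orders": {Name: "orders", LeaderNodeID: "node-1"},
			"audit":  {Name: "audit", LeaderNodeID: "node-2"},
		},
	}
	data, _ := src.snapshot()

	dst := &manager{CurrentNodeID: "node-1"}
	if err := dst.restore(data); err != nil {
		fmt.Println("restore failed:", err)
		return
	}
	fmt.Println(dst.CurrentNodeID, dst.Topics["orders"].logOpen, dst.Topics["audit"].logOpen)
	// node-1 true false
}
```

Because CurrentNodeID is excluded from the snapshot, the same bytes restore correctly on any node, and each node re-derives which logs to open from its own identity.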

Step 12: Query metadata state

Querying topics, leaders, and nodes

func (tm *TopicManager) IsLeader(topic string) (bool, error) {
    tm.mu.RLock()
    defer tm.mu.RUnlock()
    topicObj, ok := tm.Topics[topic]
    if !ok {
        return false, errs.ErrTopicNotFoundf(topic)
    }
    return topicObj.LeaderNodeID == tm.CurrentNodeID, nil
}

func (tm *TopicManager) GetTopicLeaderRPCAddr(topic string) (string, error) {
    tm.mu.RLock()
    defer tm.mu.RUnlock()
    topicObj, ok := tm.Topics[topic]
    if !ok {
        return "", errs.ErrTopicNotFoundf(topic)
    }
    node := tm.Nodes[topicObj.LeaderNodeID]
    if node == nil {
        return "", errs.ErrTopicNotFoundf(topic)
    }
    return node.RpcAddr, nil
}

func (tm *TopicManager) GetRaftLeaderRPCAddr() (string, error) {
    if tm.coordinator == nil {
        return "", fmt.Errorf("topic: no coordinator")
    }
    leaderNodeID, err := tm.coordinator.GetRaftLeaderNodeID()
    if err != nil {
        return "", err
    }
    tm.mu.RLock()
    node := tm.Nodes[leaderNodeID]
    tm.mu.RUnlock()
    if node == nil {
        return "", fmt.Errorf("raft leader node %q not in metadata", leaderNodeID)
    }
    return node.RpcAddr, nil
}

func (tm *TopicManager) ListTopics() *protocol.ListTopicsResponse {
    tm.mu.RLock()
    defer tm.mu.RUnlock()
    out := make([]protocol.TopicInfo, 0, len(tm.Topics))
    for name, t := range tm.Topics {
        if t == nil {
            continue
        }
        t.mu.RLock()
        replicas := make([]protocol.ReplicaInfo, 0, len(t.Replicas))
        for _, rs := range t.Replicas {
            if rs != nil {
                replicas = append(replicas, protocol.ReplicaInfo{
                    NodeID: rs.ReplicaNodeID,
                    IsISR:  rs.IsISR,
                    LEO:    rs.LEO,
                })
            }
        }
        t.mu.RUnlock()
        out = append(out, protocol.TopicInfo{
            Name:         name,
            LeaderNodeID: t.LeaderNodeID,
            LeaderEpoch:  t.LeaderEpoch,
            Replicas:     replicas,
        })
    }
    return &protocol.ListTopicsResponse{Topics: out}
}

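ListTopics can be served by any node: topic, leader, and ISR metadata is replicated via Raft, so all nodes converge on the same view. The exception is each replica's LEO, which is recorded on the topic leader when it serves fetches and is not carried in any of the Raft events shown here, so LEO values reported by other nodes may be stale.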

Step 13: Understand the end-to-end integration

Complete workflow from request to replication

Client: CreateTopicRequest
    │
    ▼
RPC handler → TopicManager.CreateTopic()
    │
    ├── GetNodeIDWithLeastTopics() → picks leader
    ├── pickReplicaNodeIds() → picks replicas
    │
    ▼
Coordinator.ApplyCreateTopicEvent(topic, replicaCount, leader, replicas)
    │
    ▼
Raft replicates to majority
    │
    ▼
FSM.Apply() on ALL nodes
    │
    ▼
TopicManager.Apply(CreateTopicEvent)
    │
    ├── createTopicFromEvent() → creates Topic object
    └── ensureLocalLogForTopic() → opens log if leader/replica

For produce:

Client: ProduceRequest (topic, value, acks=AckAll)
    │
    ▼
RPC handler → TopicManager.HandleProduce(topic, value, AckAll)
    │
    ├── t.Log.Append(value) → writes to disk
    │
    └── waitForAllFollowersToCatchUp(offset)
            │
            └── polls every 10ms until:
                - all ISR replicas have LEO > offset
                - or 5s timeout

For node failure:

Node 2 crashes
    │
    ▼
Serf detects failure
    │
    ▼
Coordinator.Leave(node2)
    │
    ├── raft.RemoveServer(node2)
    └── ApplyNodeRemoveEvent(node2)
            │
            ▼
        FSM.Apply(RemoveNodeEvent)
            │
            ├── delete(tm.Nodes, node2)
            └── maybeReassignTopicLeaders(node2)
                    │
                    └── go func() {
                            ApplyLeaderChangeEvent(topic, newLeader, epoch+1)
                        }()

Summary

| Component | Purpose |
| --- | --- |
| Topic | One named log: leader, epoch, replicas, local LogManager. JSON-serializable for Raft snapshots. |
| TopicManager | Central state: all topics, all nodes, current node ID. Implements MetadataStore. |
| CreateTopic | Picks leader (least loaded, deterministic tie-breaking) and replicas, proposes via Raft. All nodes apply. |
| HandleProduce | Appends to leader log. AckLeader returns immediately. AckAll polls until ISR replicas catch up (5s timeout). |
| Apply() | Switches on event type. CreateTopic creates objects and opens logs. LeaderChange swaps leader/replica roles. AddNode adds replicas for under-replicated topics. RemoveNode triggers async leader reassignment. |
| maybeAdvanceHW | HW = min(leader LEO, all ISR LEOs). Non-ISR replicas excluded. |
| RecordReplicaLEOFromFetch | Updates replica LEO and ISR status when leader serves replication fetches. |
| Snapshot/Restore | Direct JSON marshal of Topics and Nodes. Restore re-opens local logs. |

With topic management in place, the next page covers replication — how follower replicas pull data from the leader and how the replication thread is organized.