Pull-Based Replication, ISR, and High Watermark in a Distributed Log

In this section, you'll implement the replication thread (followers pull data from the leader), ISR tracking (the leader monitors which replicas are in sync), and high watermark advancement (consumers see data only after it is replicated). Code lives in topic/replication.go and parts of topic/topic.go.

Step 1: Understand the pull-based replication model

Why pull instead of push

Our system uses pull-based replication, similar to Kafka:

Leader (Node 1)                     Follower (Node 2)
┌──────────────┐                    ┌──────────────┐
│ Log:         │                    │ Log:         │
│ [0][1][2][3] │  ← FetchBatch ──   │ [0][1]       │
│   LEO=4      │  ── records ──→    │   LEO=2      │
│   HW=2       │                    │              │
└──────────────┘                    └──────────────┘

  1. The follower periodically calls FetchBatch on the leader, starting from its current LEO.
  2. The leader reads from its log using ReadUncommitted (up to LEO, not HW) and returns records.
  3. The leader records the follower's LEO for ISR computation.
  4. The follower appends the received records to its local log in a batch.
  5. After all ISR replicas have caught up, the leader advances HW.

Why pull, not push? Pull gives each follower control over its own pace. A slow follower does not block the leader's write path. The leader just serves fetch requests when they arrive.
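
The five steps above can be illustrated with a small in-memory simulation. This is only a sketch: leaderLog, fetchBatch, and pullOnce are hypothetical names invented for the example, and the real FetchBatch is an RPC rather than a method call.

```go
package main

import "fmt"

// leaderLog is a hypothetical in-memory stand-in for the leader's log.
type leaderLog struct {
	records [][]byte
}

// fetchBatch returns up to maxCount records starting at offset. Like the
// leader in the text, it serves everything up to its LEO, not just up to HW.
func (l *leaderLog) fetchBatch(offset uint64, maxCount int) [][]byte {
	leo := uint64(len(l.records))
	if offset >= leo {
		return nil // the follower is already caught up
	}
	end := offset + uint64(maxCount)
	if end > leo {
		end = leo
	}
	return l.records[offset:end]
}

// pullOnce performs one fetch cycle: the follower asks for records starting
// at its own LEO (the length of its log) and appends whatever comes back.
func pullOnce(l *leaderLog, follower [][]byte, maxCount int) [][]byte {
	batch := l.fetchBatch(uint64(len(follower)), maxCount)
	return append(follower, batch...)
}

func main() {
	leader := &leaderLog{records: [][]byte{
		[]byte("a"), []byte("b"), []byte("c"), []byte("d"),
	}}
	follower := [][]byte{[]byte("a"), []byte("b")} // LEO=2, as in the diagram

	follower = pullOnce(leader, follower, 1)
	fmt.Println("follower LEO after first pull:", len(follower))
	follower = pullOnce(leader, follower, 1)
	fmt.Println("follower LEO after second pull:", len(follower))
}
```

The follower decides when to call pullOnce and how many records to ask for, which is the property the pull model is chosen for: the leader never waits on a follower.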

Step 2: Manage replication thread lifecycle

Starting and stopping the replication thread

Each node runs a replication thread that handles all topics this node replicates (i.e., topics where this node is a follower, not the leader). Start and stop are guarded by a nil-check so calling them multiple times is safe:

// topic/replication.go

func (tm *TopicManager) StartReplicationThread() {
    tm.mu.Lock()
    if tm.stopReplication != nil {
        tm.mu.Unlock()
        return
    }
    tm.stopReplication = make(chan struct{})
    tm.mu.Unlock()
    go tm.runReplicationThread()
}

func (tm *TopicManager) StopReplicationThread() {
    tm.mu.Lock()
    ch := tm.stopReplication
    tm.stopReplication = nil
    tm.mu.Unlock()
    if ch != nil {
        close(ch)
    }
}

StartReplicationThread checks under a lock whether the stop channel already exists — if so, the thread is already running. StopReplicationThread swaps the channel to nil under the lock and then closes it, so a double-stop is harmless.
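
The guard can be tried in isolation with a minimal sketch. The worker type and its starts counter are hypothetical and exist only for this example; unlike the listing above, the sketch hands the stop channel to the goroutine as an argument, which is one way to avoid re-reading the field after Stop may have reset it.

```go
package main

import (
	"fmt"
	"sync"
)

// worker is a hypothetical stand-in for TopicManager that keeps only the
// fields needed to show the start/stop guard.
type worker struct {
	mu     sync.Mutex
	stop   chan struct{}
	starts int // number of goroutines actually launched
	wg     sync.WaitGroup
}

// Start launches the background goroutine at most once per start/stop cycle.
func (w *worker) Start() {
	w.mu.Lock()
	defer w.mu.Unlock()
	if w.stop != nil {
		return // already running
	}
	w.stop = make(chan struct{})
	w.starts++
	w.wg.Add(1)
	// Pass the channel as an argument so the goroutine never has to re-read
	// w.stop, which Stop may already have set back to nil.
	go func(stop <-chan struct{}) {
		defer w.wg.Done()
		<-stop
	}(w.stop)
}

// Stop may be called repeatedly: only the call that swaps out a non-nil
// channel closes it, so a channel is never closed twice.
func (w *worker) Stop() {
	w.mu.Lock()
	ch := w.stop
	w.stop = nil
	w.mu.Unlock()
	if ch != nil {
		close(ch)
	}
}

func main() {
	w := &worker{}
	w.Start()
	w.Start() // no-op: already running
	w.Stop()
	w.Stop() // no-op: nothing left to close
	w.wg.Wait()
	fmt.Println("goroutines launched:", w.starts)
}
```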

The replication loop with cancellable context

The thread wakes every second and replicates all topics. It derives a context.Context from the stop channel so that every downstream operation can be cancelled cleanly on shutdown:

const replicationTickInterval = 1 * time.Second

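func (tm *TopicManager) runReplicationThread() {
    ticker := time.NewTicker(replicationTickInterval)
    defer ticker.Stop()

    tm.mu.RLock()
    stop := tm.stopReplication
    tm.mu.RUnlock()
    if stop == nil {
        // StopReplicationThread already ran. Receiving from a nil channel
        // blocks forever, so exit instead of starting the loop.
        return
    }

    // Derive a cancellable context from the stop channel.
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel() // release the context's resources when the loop exits
    go func() {
        <-stop
        cancel()
    }()

    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            tm.replicateAllTopics(ctx)
        }
    }
}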

A background goroutine waits on the stop channel and calls cancel() when it fires, propagating cancellation to every in-flight FetchBatch call.
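
The channel-to-context bridge can be pulled out into a helper. The sketch below is an assumption about how one might package it, not the project's code: contextFromStop is a hypothetical name, and the helper goroutine also watches ctx.Done() so it exits if the caller cancels first.

```go
package main

import (
	"context"
	"fmt"
)

// contextFromStop returns a context that is cancelled when stop is closed.
// Callers should still invoke the returned cancel function so the helper
// goroutine exits even if stop never fires.
func contextFromStop(stop <-chan struct{}) (context.Context, context.CancelFunc) {
	ctx, cancel := context.WithCancel(context.Background())
	go func() {
		select {
		case <-stop:
			cancel()
		case <-ctx.Done():
			// Cancelled by the caller; nothing left to do.
		}
	}()
	return ctx, cancel
}

func main() {
	stop := make(chan struct{})
	ctx, cancel := contextFromStop(stop)
	defer cancel()

	close(stop)  // simulates StopReplicationThread
	<-ctx.Done() // unblocks once the helper goroutine observes the close
	fmt.Println(ctx.Err())
}
```

Any call that accepts ctx, such as a fetch RPC, observes the shutdown through ctx.Done() without needing to know about the stop channel.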

Step 3: Organize replication with per-leader goroutine pools

Why per-leader goroutines

A critical design choice: topics are grouped by leader, and each leader gets its own goroutine. A slow or dead leader blocks only its own goroutine — other leaders continue unaffected.

Identifying topics to replicate

ListReplicaTopics scans all topics and returns the ones this node replicates (i.e., where this node is not the leader and has a local log):

// topic/replication.go

type ReplicaTopicInfo struct {
    TopicName    string
    LeaderNodeID string
}

// topic/topic.go

func (tm *TopicManager) ListReplicaTopics() []ReplicaTopicInfo {
    tm.mu.RLock()
    defer tm.mu.RUnlock()
    var out []ReplicaTopicInfo
    for name, t := range tm.Topics {
        if t == nil || t.Log == nil || t.LeaderNodeID == tm.CurrentNodeID {
            continue
        }
        out = append(out, ReplicaTopicInfo{
            TopicName: name, LeaderNodeID: t.LeaderNodeID,
        })
    }
    return out
}

The return type is minimal — just TopicName and LeaderNodeID. The actual LEO is fetched on demand inside ReplicateFromLeader.

Launching worker goroutines per leader

replicateAllTopics groups topics by leader and launches one goroutine per leader:

func (tm *TopicManager) replicateAllTopics(ctx context.Context) {
    leaderToTopics := make(map[string][]string)
    for _, info := range tm.ListReplicaTopics() {
        leaderToTopics[info.LeaderNodeID] = append(
            leaderToTopics[info.LeaderNodeID], info.TopicName)
    }
    if len(leaderToTopics) == 0 {
        return
    }

    batchSize := tm.replicationBatchSize
    if batchSize == 0 {
        batchSize = DefaultReplicationBatchSize
    }

    var wg sync.WaitGroup
    for leaderID, topicNames := range leaderToTopics {
        wg.Add(1)
        go func(leaderID string, topicNames []string) {
            defer wg.Done()
            if err := tm.ReplicateFromLeader(
                ctx, leaderID, topicNames, batchSize,
            ); err != nil {
                tm.Logger.Warn("replication from leader failed",
                    zap.String("leader_id", leaderID),
                    zap.Error(err),
                )
            }
        }(leaderID, topicNames)
    }
    wg.Wait()
}

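Because the loop calls replicateAllTopics synchronously and wg.Wait blocks until every per-leader goroutine returns, replication passes never overlap. If a pass takes longer than one second, time.Ticker drops the missed ticks rather than queueing them. Errors are logged but do not crash the loop; the next tick retries.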
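
The grouping and fan-out can be exercised without a TopicManager. In this sketch groupByLeader, fanOut, and errUnreachable are hypothetical names for the example; the per-leader errors are collected in a map instead of being logged so the result is easy to inspect.

```go
package main

import (
	"errors"
	"fmt"
	"sort"
	"sync"
)

// replicaTopicInfo mirrors the two fields of ReplicaTopicInfo from the text.
type replicaTopicInfo struct {
	TopicName    string
	LeaderNodeID string
}

// errUnreachable is a hypothetical error used only to exercise the error path.
var errUnreachable = errors.New("leader unreachable")

// groupByLeader buckets topic names by the node that leads them.
func groupByLeader(infos []replicaTopicInfo) map[string][]string {
	out := make(map[string][]string)
	for _, info := range infos {
		out[info.LeaderNodeID] = append(out[info.LeaderNodeID], info.TopicName)
	}
	return out
}

// fanOut runs work once per leader, concurrently, waits for all of them,
// and returns the errors keyed by leader ID.
func fanOut(groups map[string][]string, work func(leaderID string, topics []string) error) map[string]error {
	var (
		mu     sync.Mutex
		wg     sync.WaitGroup
		failed = make(map[string]error)
	)
	for leaderID, topics := range groups {
		wg.Add(1)
		go func(leaderID string, topics []string) {
			defer wg.Done()
			if err := work(leaderID, topics); err != nil {
				mu.Lock()
				failed[leaderID] = err
				mu.Unlock()
			}
		}(leaderID, topics)
	}
	wg.Wait()
	return failed
}

func main() {
	groups := groupByLeader([]replicaTopicInfo{
		{"orders", "node-1"}, {"payments", "node-2"}, {"audit", "node-1"},
	})
	failed := fanOut(groups, func(leaderID string, _ []string) error {
		if leaderID == "node-2" {
			return errUnreachable // one bad leader does not affect the other
		}
		return nil
	})

	leaders := make([]string, 0, len(groups))
	for id := range groups {
		leaders = append(leaders, id)
	}
	sort.Strings(leaders)
	for _, id := range leaders {
		fmt.Printf("%s topics=%v err=%v\n", id, groups[id], failed[id])
	}
}
```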

Step 4: Fetch records from a leader

The ReplicateFromLeader engine

ReplicateFromLeader is the core of the replication engine. For each leader, it:

  1. Creates a dedicated ConsumerClient (not shared — avoids concurrent mutation).
  2. Iterates topics in a loop, removing topics that are caught up.
  3. Uses batch writes (ApplyRecordBatch) instead of per-record appends.
  4. Handles connection errors and offset-not-found errors gracefully.

const DefaultReplicationBatchSize = 5000

func (tm *TopicManager) ReplicateFromLeader(
    ctx context.Context, leaderID string,
    topicNames []string, batchSize uint32,
) error {
    // Look up leader RPC address.
    tm.mu.RLock()
    node := tm.Nodes[leaderID]
    tm.mu.RUnlock()
    if node == nil {
        return fmt.Errorf("leader node %s not found", leaderID)
    }

    // Create a dedicated client for this replication goroutine (not shared).
    cc, err := client.NewConsumerClient(node.RpcAddr)
    if err != nil {
        return fmt.Errorf("connect to leader %s at %s: %w",
            leaderID, node.RpcAddr, err)
    }
    defer cc.Close()
    cc.SetReplicaNodeID(tm.CurrentNodeID)

    consumerID := fmt.Sprintf("replicate-%s-%s", tm.CurrentNodeID, leaderID)

    // Fetch each topic in a loop. A topic is removed from pending when caught up.
    pending := make([]string, len(topicNames))
    copy(pending, topicNames)

    for len(pending) > 0 {
        if ctx.Err() != nil {
            return ctx.Err()
        }

        next := make([]string, 0, len(pending))
        for _, topicName := range pending {
            if ctx.Err() != nil {
                return ctx.Err()
            }

            leo, ok := tm.GetLEO(topicName)
            if !ok {
                continue
            }

            resp, err := cc.FetchBatch(ctx, &protocol.FetchBatchRequest{
                Topic:    topicName,
                Id:       consumerID,
                Offset:   leo,
                MaxCount: batchSize,
            })
            if err != nil {
                var rpcErr *protocol.RPCError
                if errors.As(err, &rpcErr) &&
                    rpcErr.Code == protocol.CodeReadOffset {
                    // Caught up — skip this topic.
                    continue
                }
                if protocol.ShouldReconnect(err) {
                    // Connection gone — abort this leader entirely;
                    // next tick will retry.
                    return fmt.Errorf(
                        "connection lost to leader %s: %w", leaderID, err)
                }
                // Transient error — keep topic for next round.
                next = append(next, topicName)
                continue
            }

            if len(resp.Entries) > 0 {
                values := make([][]byte, 0, len(resp.Entries))
                for _, entry := range resp.Entries {
                    if entry != nil {
                        values = append(values, entry.Value)
                    }
                }
                if err := tm.ApplyRecordBatch(topicName, values); err != nil {
                    next = append(next, topicName)
                    continue
                }
            }

            // If we got a full batch, there's likely more data — keep fetching.
            if uint32(len(resp.Entries)) >= batchSize {
                next = append(next, topicName)
            }
        }
        pending = next
    }
    return nil
}

Dedicated clients prevent data races

A key design choice: each ReplicateFromLeader call creates its own client.NewConsumerClient. This avoids a subtle bug — if all goroutines shared a cached client from NodeMetadata, calling SetReplicaNodeID on one goroutine would race with another. By creating a dedicated client per goroutine, there is no shared mutable state.

The pending-slice pattern for termination

Instead of using a fixed MaxReplicationRounds limit, the code uses a pending slice:

  1. Start with all topic names in pending.
  2. For each topic, fetch a batch. If fewer than batchSize entries are returned, the topic is caught up — drop it from pending.
  3. If a full batch was returned, there might be more data — keep the topic in pending for another round.
  4. Topics that hit transient errors also stay in pending for a retry.

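The loop terminates once every topic has returned a short batch, and it handles the common case (small lag) in a single pass. One caveat: a topic that keeps failing with a non-connection error is retried immediately with no backoff, so for that topic the loop exits only when the error clears or the context is cancelled.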
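
Stripped of RPC and error handling, the pending-slice loop looks roughly like the sketch below. The drain function and its fetch callback are hypothetical; fetch stands in for one FetchBatch plus ApplyRecordBatch and reports how many records it copied.

```go
package main

import "fmt"

// drain keeps fetching from every pending topic until each one returns a
// short batch. It returns the number of passes over the pending slice.
func drain(topics []string, batchSize int, fetch func(topic string, limit int) int) int {
	pending := append([]string(nil), topics...) // copy; do not mutate the input
	rounds := 0
	for len(pending) > 0 {
		rounds++
		next := make([]string, 0, len(pending))
		for _, topic := range pending {
			if n := fetch(topic, batchSize); n >= batchSize {
				next = append(next, topic) // full batch: probably more to read
			}
			// Short batch: the topic is caught up and is dropped.
		}
		pending = next
	}
	return rounds
}

func main() {
	backlog := map[string]int{"orders": 25, "audit": 3} // records left to copy
	fetch := func(topic string, limit int) int {
		n := backlog[topic]
		if n > limit {
			n = limit
		}
		backlog[topic] -= n
		return n
	}
	rounds := drain([]string{"orders", "audit"}, 10, fetch)
	fmt.Println("rounds:", rounds)
}
```

With a batch size of 10, "audit" drops out after the first pass and "orders" needs three (10, 10, then 5 records). A topic whose backlog is an exact multiple of the batch size costs one extra, empty fetch before it is dropped.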

Handling fetch errors gracefully

Three categories of errors during fetch:

Error type                             | Handling
---------------------------------------|-------------------------------------------------------------
CodeReadOffset (offset out of range)   | Topic is caught up. Remove from pending.
ShouldReconnect (connection lost)      | Abort the entire leader goroutine. Next tick will reconnect.
Transient error                        | Keep topic in pending for another round.
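
The same three-way decision can be written as a small classifier. Everything here is a stand-in: rpcError, codeReadOffset, errConnLost, and shouldReconnect are hypothetical substitutes for the protocol package's types, whose real definitions are not shown in this section.

```go
package main

import (
	"errors"
	"fmt"
)

type action string

const (
	dropTopic   action = "drop topic (caught up)"
	abortLeader action = "abort leader (reconnect next tick)"
	retryTopic  action = "retry topic next round"
)

// rpcError is a hypothetical stand-in for protocol.RPCError.
type rpcError struct{ Code int }

func (e *rpcError) Error() string { return fmt.Sprintf("rpc error code %d", e.Code) }

// codeReadOffset is an assumed value standing in for protocol.CodeReadOffset.
const codeReadOffset = 1

// errConnLost is a hypothetical sentinel for a broken connection.
var errConnLost = errors.New("connection lost")

// shouldReconnect stands in for protocol.ShouldReconnect.
func shouldReconnect(err error) bool { return errors.Is(err, errConnLost) }

// classify applies the checks in the same order as ReplicateFromLeader.
func classify(err error) action {
	var re *rpcError
	if errors.As(err, &re) && re.Code == codeReadOffset {
		return dropTopic
	}
	if shouldReconnect(err) {
		return abortLeader
	}
	return retryTopic
}

func main() {
	samples := []error{
		fmt.Errorf("fetch orders: %w", &rpcError{Code: codeReadOffset}),
		fmt.Errorf("fetch audit: %w", errConnLost),
		&rpcError{Code: 42},
	}
	for _, err := range samples {
		fmt.Printf("%v -> %s\n", err, classify(err))
	}
}
```

Using errors.As and errors.Is means the classification still works when a lower layer wraps the error with extra context.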

Writing fetched records in batches

The fetched entries are applied to the local log in a single batch write:

// topic/topic.go

func (tm *TopicManager) ApplyRecordBatch(topicName string, values [][]byte) error {
    if len(values) == 0 {
        return nil
    }
    tm.mu.RLock()
    t := tm.Topics[topicName]
    tm.mu.RUnlock()
    if t == nil {
        return nil
    }
    t.mu.RLock()
    l := t.Log
    t.mu.RUnlock()
    if l == nil {
        return nil
    }
    _, err := l.AppendBatch(values)
    return err
}

AppendBatch writes all records to the segment in a single call, which is significantly faster than calling Append N times (fewer syscalls, one lock acquisition).
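
The real AppendBatch is not shown in this section. As a rough illustration of the "one lock acquisition" point only, here is an in-memory sketch; memLog and the (base offset, error) return shape are assumptions, and the syscall savings of a real segment write are not modelled.

```go
package main

import (
	"fmt"
	"sync"
)

// memLog is a hypothetical in-memory log used to illustrate batch appends.
type memLog struct {
	mu      sync.Mutex
	records [][]byte
}

// AppendBatch appends all values under a single lock acquisition and returns
// the offset assigned to the first record. The batch gets contiguous offsets
// because no other writer can interleave while the lock is held.
func (l *memLog) AppendBatch(values [][]byte) (uint64, error) {
	l.mu.Lock()
	defer l.mu.Unlock()
	base := uint64(len(l.records))
	for _, v := range values {
		// Copy each value so callers may reuse their buffers afterwards.
		l.records = append(l.records, append([]byte(nil), v...))
	}
	return base, nil
}

// LEO returns the offset the next record will receive.
func (l *memLog) LEO() uint64 {
	l.mu.Lock()
	defer l.mu.Unlock()
	return uint64(len(l.records))
}

func main() {
	l := &memLog{}
	first, _ := l.AppendBatch([][]byte{[]byte("a"), []byte("b")})
	second, _ := l.AppendBatch([][]byte{[]byte("c")})
	fmt.Println(first, second, l.LEO())
}
```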

Step 5: Track and update in-sync replica status

Recording replica LEO and ISR membership

When a follower fetches data, the leader records its LEO. This happens in the fetch handler (RPC side calls RecordReplicaLEOFromFetch):

// topic/topic.go

func (tm *TopicManager) RecordReplicaLEOFromFetch(
    ctx context.Context, topicName, replicaNodeID string, leo int64,
) error {
    tm.mu.Lock()
    t := tm.Topics[topicName]
    if t == nil {
        tm.mu.Unlock()
        return nil
    }
    leaderLEO := uint64(0)
    if t.Log != nil {
        leaderLEO = t.Log.LEO()
    }
    if t.Replicas == nil {
        t.Replicas = make(map[string]*ReplicaState)
    }
    rs := t.Replicas[replicaNodeID]
    if rs == nil {
        t.Replicas[replicaNodeID] = &ReplicaState{
            ReplicaNodeID: replicaNodeID,
            LEO:           leo,
            IsISR:         true,
        }
        rs = t.Replicas[replicaNodeID]
    } else {
        rs.LEO = leo
    }
    lagThreshold := tm.ISRLagThreshold
    if lagThreshold == 0 {
        lagThreshold = DefaultISRLagThreshold
    }
    var isr bool
    if leaderLEO > lagThreshold {
        isr = uint64(leo) >= leaderLEO-lagThreshold
    } else {
        isr = leo >= 0 // all replicas are in-sync for small topics
    }
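    rs.IsISR = isr
    // Recompute HW while still holding tm.mu: maybeAdvanceHW iterates
    // t.Replicas, which another fetch handler may be writing to.
    tm.maybeAdvanceHW(t)
    tm.mu.Unlock()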
    if tm.coordinator == nil {
        return nil
    }
    return tm.coordinator.ApplyIsrUpdateEventInternal(topicName, replicaNodeID, isr)
}

Several things to note:

  1. New replicas start in ISR: when a ReplicaState is first created, IsISR is set to true.
  2. ISR threshold is configurable: it defaults to DefaultISRLagThreshold (100 records).
  3. Small-topic optimization: when the leader LEO is at or below the lag threshold, all replicas with a non-negative LEO are considered in-sync. This prevents false ISR demotions when the topic has very few records, and it keeps the unsigned subtraction leaderLEO-lagThreshold from wrapping around.
  4. ISR changes propagated via Raft: ApplyIsrUpdateEventInternal submits a Raft log entry so all nodes agree on the ISR set.
  5. HW advanced after every fetch: maybeAdvanceHW runs after updating the replica LEO.

ISR membership computation logic

A replica is in the ISR if its LEO is within the threshold of the leader's LEO:

Leader LEO = 1500
ISR Threshold = 100

Replica A: LEO = 1480 → in ISR  (1480 >= 1500-100=1400)
Replica B: LEO = 1300 → NOT ISR (1300 < 1500-100=1400)

When a replica falls out of ISR, a Raft event updates the cluster state so all nodes agree on the ISR set.
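
The membership rule reduces to a pure function. The sketch below uses a hypothetical inSync helper and differs from the handler in one labelled way: it treats a negative replica LEO as out of sync in every case.

```go
package main

import "fmt"

// inSync reports whether a replica is within lagThreshold records of the
// leader. Assumption for this sketch: a negative LEO is never in sync.
func inSync(leaderLEO uint64, replicaLEO int64, lagThreshold uint64) bool {
	if replicaLEO < 0 {
		return false
	}
	if leaderLEO <= lagThreshold {
		// Small topic: leaderLEO-lagThreshold would wrap around as an
		// unsigned value, so every replica counts as in sync.
		return true
	}
	return uint64(replicaLEO) >= leaderLEO-lagThreshold
}

func main() {
	fmt.Println("Replica A:", inSync(1500, 1480, 100))
	fmt.Println("Replica B:", inSync(1500, 1300, 100))
	fmt.Println("Small topic:", inSync(50, 0, 100))
}
```

Replica A and the small-topic case report true, and Replica B reports false, matching the worked numbers above.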

Step 6: Advance the high watermark safely

HW = minimum offset across ISR

The high watermark advances when all ISR replicas have reached a given offset:

// topic/topic.go

func (tm *TopicManager) maybeAdvanceHW(t *Topic) {
    if t.Log == nil {
        return
    }
    minOffset := t.Log.LEO()
    for _, r := range t.Replicas {
        if r != nil && r.IsISR && uint64(r.LEO) < minOffset {
            minOffset = uint64(r.LEO)
        }
    }
    t.Log.SetHighWatermark(minOffset)
}

The formula: HW = min(leader LEO, ISR replicas' LEO). Since the leader's own LEO is always >= any follower's LEO, we use it as the starting value and take the minimum across ISR followers. Non-ISR replicas are excluded — a dead or lagging replica must not hold back consumer visibility.

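Note that maybeAdvanceHW always calls SetHighWatermark, even if the value hasn't changed. This is safe because SetHighWatermark is an atomic store. Despite its name, the function as shown recomputes HW rather than only raising it: if a replica with a lower LEO is counted as in-sync, the computed minimum drops. Whether HW can actually move backward depends on whether SetHighWatermark rejects smaller values, which this section does not show.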
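
The formula can be checked on its own with a pure function. This is a sketch using a simplified, hypothetical replicaState (unsigned LEO, no node ID), fed with the numbers from the diagram in Step 1.

```go
package main

import "fmt"

// replicaState is a simplified, hypothetical version of ReplicaState.
type replicaState struct {
	LEO   uint64
	IsISR bool
}

// highWatermark returns min(leader LEO, LEO of every in-sync replica).
// Replicas outside the ISR are ignored so they cannot hold HW back.
func highWatermark(leaderLEO uint64, replicas []replicaState) uint64 {
	hw := leaderLEO
	for _, r := range replicas {
		if r.IsISR && r.LEO < hw {
			hw = r.LEO
		}
	}
	return hw
}

func main() {
	replicas := []replicaState{
		{LEO: 2, IsISR: true},  // the follower from the Step 1 diagram
		{LEO: 0, IsISR: false}, // a dead replica, excluded from the minimum
	}
	fmt.Println("HW:", highWatermark(4, replicas))
}
```

With leader LEO 4 the result is 2, the same HW the diagram shows; the out-of-sync replica at LEO 0 has no effect.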

Step 7: Wait for replication on AckAll produces

Producer waiting for in-sync replicas

When a producer uses AckAll, the leader waits for all ISR replicas to catch up before acknowledging the write:

// topic/topic.go

func (tm *TopicManager) waitForAllFollowersToCatchUp(
    ctx context.Context, t *Topic, offset uint64,
) error {
    timeout := time.After(5 * time.Second)
    ticker := time.NewTicker(10 * time.Millisecond)
    defer ticker.Stop()

    requiredLEO := offset + 1

    for {
        allCaughtUp := true
        candidates := 0

        t.mu.RLock()
        useISR := false
        for _, r := range t.Replicas {
            if r != nil && r.IsISR {
                useISR = true
                break
            }
        }
        for _, replica := range t.Replicas {
            if replica == nil {
                continue
            }
            if useISR && !replica.IsISR {
                continue
            }
            candidates++
            if uint64(replica.LEO) < requiredLEO {
                allCaughtUp = false
                break
            }
        }
        t.mu.RUnlock()

        if candidates == 0 {
            return nil
        }
        if allCaughtUp {
            return nil
        }

        select {
        case <-ticker.C:
            continue
        case <-ctx.Done():
            return ctx.Err()
        case <-timeout:
            if t.Logger != nil {
                t.Logger.Warn("followers catch-up timeout",
                    zap.String("topic", t.Name),
                    zap.Uint64("required_offset", offset))
            }
            return errs.ErrTimeoutCatchUp
        }
    }
}

The key detail: requiredLEO = offset + 1. If the producer wrote to offset 5, a follower's LEO must reach 6 (meaning it has replicated through offset 5). The function polls every 10ms and returns ErrTimeoutCatchUp after 5 seconds; it also returns early if the caller's context is cancelled. If at least one replica is marked in-sync, only ISR replicas are waited on; if none is, the function falls back to waiting on every known replica. If no replicas exist at all (candidates == 0), it returns immediately.
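
The poll-with-timeout shape can be separated from the replica bookkeeping. In this sketch waitUntil, errCatchUpTimeout, and the two scenario functions are hypothetical; it uses time.NewTimer instead of time.After so the timer is released as soon as the wait finishes, and atomic.Uint64 (Go 1.19 or later) to model the follower's LEO.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync/atomic"
	"time"
)

// errCatchUpTimeout is a hypothetical stand-in for errs.ErrTimeoutCatchUp.
var errCatchUpTimeout = errors.New("timed out waiting for followers")

// waitUntil polls caughtUp every interval until it returns true, ctx is
// cancelled, or timeout elapses, whichever happens first.
func waitUntil(ctx context.Context, interval, timeout time.Duration, caughtUp func() bool) error {
	timer := time.NewTimer(timeout)
	defer timer.Stop()
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		if caughtUp() {
			return nil
		}
		select {
		case <-ticker.C:
		case <-ctx.Done():
			return ctx.Err()
		case <-timer.C:
			return errCatchUpTimeout
		}
	}
}

// ackAfterCatchUp simulates a follower that replicates offset 5 shortly
// after the produce, so its LEO moves from 5 to the required 6.
func ackAfterCatchUp() error {
	const offset = 5
	requiredLEO := uint64(offset + 1)
	var followerLEO atomic.Uint64
	followerLEO.Store(offset) // follower currently holds offsets 0..4
	go func() {
		time.Sleep(20 * time.Millisecond)
		followerLEO.Store(requiredLEO)
	}()
	return waitUntil(context.Background(), 5*time.Millisecond, time.Second,
		func() bool { return followerLEO.Load() >= requiredLEO })
}

// ackWithStalledFollower simulates a follower that never catches up.
func ackWithStalledFollower() error {
	return waitUntil(context.Background(), 5*time.Millisecond, 30*time.Millisecond,
		func() bool { return false })
}

func main() {
	fmt.Println("catch-up:", ackAfterCatchUp())
	fmt.Println("stalled: ", ackWithStalledFollower())
}
```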

Step 8: Trace the complete replication cycle

End-to-end replication flow

Here is the complete cycle:

1. Producer appends to leader
   Leader: LEO goes from 100 → 101

2. Follower's replication thread wakes (every 1s)
   Follower → FetchBatch(topic, offset=100, maxCount=5000)

3. Leader's fetch handler:
   - Reads records starting at offset 100 using ReadUncommitted
   - Returns entries to follower
   - Calls RecordReplicaLEOFromFetch(topic, followerID, 100)

4. Leader's RecordReplicaLEOFromFetch:
   - Updates follower's LEO = 100
   - Checks ISR: follower is within threshold ✓
   - Calls maybeAdvanceHW → HW stays at 100 (follower LEO 100 < leader LEO 101)
   - Propagates ISR status via Raft

5. Follower applies records to local log via ApplyRecordBatch
   Follower: LEO goes from 100 → 101

6. Next fetch cycle:
   Follower → FetchBatch(topic, offset=101)

7. Leader's RecordReplicaLEOFromFetch:
   - Updates follower's LEO = 101
   - maybeAdvanceHW → HW = min(101, 101) = 101

8. Consumer can now read offset 100
   (HW=101 means offsets 0..100 are committed)
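
The trace can be replayed with a toy model that tracks only offsets. The leaderState type is hypothetical and assumes a single follower that is always in the ISR; fetch plays the role of the fetch handler plus RecordReplicaLEOFromFetch.

```go
package main

import "fmt"

// leaderState is a hypothetical model of one topic on the leader with a
// single in-sync follower. Only offsets are tracked, not record contents.
type leaderState struct {
	leo         uint64
	hw          uint64
	followerLEO uint64
}

// produce appends one record on the leader.
func (l *leaderState) produce() { l.leo++ }

// fetch records the follower's reported LEO, recomputes
// HW = min(leader LEO, follower LEO), and returns how many records are
// available from that offset.
func (l *leaderState) fetch(offset uint64) uint64 {
	l.followerLEO = offset
	l.hw = l.leo
	if l.followerLEO < l.hw {
		l.hw = l.followerLEO
	}
	if offset >= l.leo {
		return 0
	}
	return l.leo - offset
}

func main() {
	l := &leaderState{leo: 100, hw: 100, followerLEO: 100}
	follower := uint64(100) // the follower's own LEO

	l.produce() // step 1: leader LEO 100 -> 101

	n := l.fetch(follower) // steps 2-4: the fetch reports LEO 100
	fmt.Printf("fetch@%d: %d record(s), HW=%d\n", follower, n, l.hw)
	follower += n // step 5: follower applies the batch

	n = l.fetch(follower) // steps 6-7: the next fetch reports LEO 101
	fmt.Printf("fetch@%d: %d record(s), HW=%d\n", follower, n, l.hw)
}
```

The model makes the one-cycle delay visible: HW only reaches 101 on the second fetch, because the leader learns a follower's LEO from the offset of its next request.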

Step 9: Configure replication parameters

Important timing and sizing constants

Constant                    | Value     | Purpose
----------------------------|-----------|----------------------------------------------------------
DefaultReplicationBatchSize | 5,000     | Max records per fetch in replication
DefaultISRLagThreshold      | 100       | Max LEO lag to stay in ISR
replicationTickInterval     | 1 second  | How often the replication thread wakes
AckAll timeout              | 5 seconds | Max wait for replicas when acks=all
AckAll poll interval        | 10 ms     | How often waitForAllFollowersToCatchUp checks replica LEO
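
The code in this section falls back to a default whenever a setting is zero (for example, batchSize == 0). One way to gather that convention in a single place is sketched below; replicationConfig and withDefaults are hypothetical and are not part of the code shown in this section, where the timing values are constants or literals.

```go
package main

import (
	"fmt"
	"time"
)

// replicationConfig is a hypothetical grouping of the values in the table.
type replicationConfig struct {
	BatchSize          uint32
	ISRLagThreshold    uint64
	TickInterval       time.Duration
	AckAllTimeout      time.Duration
	AckAllPollInterval time.Duration
}

// withDefaults returns a copy in which every zero field is replaced by the
// default from the table, following the zero-means-default convention.
func (c replicationConfig) withDefaults() replicationConfig {
	if c.BatchSize == 0 {
		c.BatchSize = 5000
	}
	if c.ISRLagThreshold == 0 {
		c.ISRLagThreshold = 100
	}
	if c.TickInterval == 0 {
		c.TickInterval = time.Second
	}
	if c.AckAllTimeout == 0 {
		c.AckAllTimeout = 5 * time.Second
	}
	if c.AckAllPollInterval == 0 {
		c.AckAllPollInterval = 10 * time.Millisecond
	}
	return c
}

func main() {
	// Override only the lag threshold; everything else takes its default.
	cfg := replicationConfig{ISRLagThreshold: 500}.withDefaults()
	fmt.Printf("%+v\n", cfg)
}
```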

Summary

Component             | Purpose
----------------------|------------------------------------------------------------------------------------------------------------
Replication thread    | Runs on every node. Wakes every second. Launches one goroutine per leader.
Per-leader goroutines | Each leader gets a dedicated ConsumerClient and goroutine. A slow leader does not block others.
Pending-slice pattern | Topics are removed from pending once caught up. No fixed round limit.
Batch writes          | ApplyRecordBatch writes all fetched records in one AppendBatch call.
RecordReplicaLEO      | Leader records each follower's LEO on every fetch. Computes ISR membership.
ISR tracking          | Replica within 100 records of leader = in ISR. Changes propagated via Raft.
HW advancement        | HW = min(LEO across ISR). Consumers read only up to HW. Advances as replicas catch up.
AckAll                | Producer waits (up to 5s, polling every 10ms) for all ISR replicas to reach offset+1 before responding.

With replication complete, the next page builds the producer and consumer client libraries — the interface users interact with to write and read data.