Producer and Consumer APIs — Leader Discovery, Ack Modes, and Offset Management

In this section, you'll build client libraries that are the user-facing interface to the distributed log. You'll implement automatic leader discovery, reconnection on failure, ack modes for producers, and offset management for consumers. Code lives in client/ (client-side libraries), consumer/ (server-side offset tracking), and cmd/ (CLI programs).

Step 1: Implement leader discovery and bootstrap

Finding the topic leader

Before producing or consuming, the client must find which node leads the target topic. The bootstrap logic tries each known broker address until one responds:

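// client/bootstrap.go
package client

import (
    "context"
    "errors"
    "fmt"
    "strings"
)

var errNoConnection = errors.New("could not connect to any address")

// TryAddrs calls fn against each broker address in turn and returns the first
// non-empty result. Retryable errors (see ShouldReconnect) move on to the next
// address; any other error is returned immediately.
func TryAddrs(ctx context.Context, addrs []string,
    fn func(*RemoteClient) (string, error),
) (string, error) {
    var lastErr error
    for _, addr := range addrs {
        if err := ctx.Err(); err != nil {
            return "", err // the caller gave up; stop dialing
        }
        addr = strings.TrimSpace(addr)
        if addr == "" {
            continue
        }
        c, err := NewRemoteClient(addr)
        if err != nil {
            lastErr = err
            continue
        }
        result, err := fn(c)
        _ = c.Close()
        if err == nil && result != "" {
            return result, nil
        }
        if err == nil {
            // An empty answer is unusable; try the next broker instead of
            // returning ("", nil) to the caller.
            lastErr = fmt.Errorf("empty result from %s", addr)
            continue
        }
        lastErr = err
        if ShouldReconnect(err) {
            continue
        }
        return "", err
    }
    if lastErr != nil {
        return "", lastErr
    }
    return "", errNoConnection
}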

TryAddrs takes a callback fn that receives a connected RemoteClient and returns a result string plus an error. It walks the address list in order, skipping blank entries and addresses it cannot dial, and returns the first non-empty result. If the callback fails with an error that ShouldReconnect accepts, TryAddrs moves on to the next address; any other error is returned immediately, since another broker would most likely give the same answer.
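To see the control flow in isolation, here is a self-contained sketch of the same loop with the network replaced by a probe callback. The names tryAddrs, errRetryable, and probe are hypothetical stand-ins for TryAddrs, errors accepted by ShouldReconnect, and the NewRemoteClient-plus-fn step. One deliberate choice in this sketch: an empty result with no error moves on to the next address rather than being returned.

```go
package main

import (
	"errors"
	"strings"
)

// errRetryable is a hypothetical stand-in for errors ShouldReconnect accepts.
var errRetryable = errors.New("retryable: leader moved or connection failed")

var errNoAddr = errors.New("could not connect to any address")

// tryAddrs calls probe on each non-blank address in order. It returns the
// first non-empty result, skips addresses whose error is retryable, and stops
// immediately on any other error.
func tryAddrs(addrs []string, probe func(addr string) (string, error)) (string, error) {
	var lastErr error
	for _, addr := range addrs {
		addr = strings.TrimSpace(addr)
		if addr == "" {
			continue
		}
		result, err := probe(addr)
		if err == nil && result != "" {
			return result, nil
		}
		if err == nil {
			// An empty answer is treated like a retryable failure.
			lastErr = errors.New("empty result from " + addr)
			continue
		}
		lastErr = err
		if errors.Is(err, errRetryable) {
			continue
		}
		return "", err
	}
	if lastErr != nil {
		return "", lastErr
	}
	return "", errNoAddr
}
```

With the input {" ", "down:1", "up:2", "never:3"}, where down:1 fails with a retryable error, the blank entry is skipped, down:1 is abandoned, up:2 answers, and never:3 is not contacted.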

To find the topic leader, the CLI programs use TryAddrs with FindTopicLeader:

// Used by both producer and consumer CLI
findLeader := func(ctx context.Context) (string, error) {
    return client.TryAddrs(ctx, addrList(), func(c *client.RemoteClient) (string, error) {
        resp, err := c.FindTopicLeader(ctx, &protocol.FindTopicLeaderRequest{
            Topic: topic,
        })
        if err != nil {
            return "", err
        }
        if resp.LeaderAddr == "" {
            return "", fmt.Errorf("empty leader address returned for topic %s", topic)
        }
        return resp.LeaderAddr, nil
    })
}

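Any node in the cluster can answer FindTopicLeader: it reads its local metadata (populated by Raft) and returns the leader's RPC address. The client then connects directly to that leader. A follower's copy of the metadata can lag briefly behind the Raft log, so the returned address may occasionally be stale; the reconnect handling in Step 2 covers that case.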

Step 2: Handle reconnection on leader change

The ShouldReconnect helper

The client package re-exports the protocol-level ShouldReconnect for convenience:

// client/reconnect.go

func ShouldReconnect(err error) bool {
    return protocol.ShouldReconnect(err)
}

This returns true when the error indicates a leader change or connection failure, signaling the caller to rediscover the leader and reconnect.

Step 3: Build the producer client

Producer connection and initialization

The producer connects to the topic leader and sends produce requests:

// client/producer.go

type ProducerClient struct {
    tc *transport.TransportClient
}

func NewProducerClient(addr string) (*ProducerClient, error) {
    tc, err := transport.Dial(addr)
    if err != nil {
        return nil, err
    }
    return &ProducerClient{tc: tc}, nil
}

func (c *ProducerClient) Close() error {
    return c.tc.Close()
}

Single record produce

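func (c *ProducerClient) Produce(ctx context.Context,
    req *protocol.ProduceRequest,
) (*protocol.ProduceResponse, error) {
    // Call takes the request by value and encodes it with the Codec.
    resp, err := c.tc.Call(*req)
    if err != nil {
        return nil, err
    }
    // Checked assertion: an unexpected response type becomes an error
    // instead of a panic.
    r, ok := resp.(protocol.ProduceResponse)
    if !ok {
        return nil, fmt.Errorf("produce: unexpected response type %T", resp)
    }
    return &r, nil
}

The producer dereferences the request to pass a value (not a pointer) to Call, which encodes it via the Codec. The response comes back as interface{} and is type-asserted to protocol.ProduceResponse. Note that ctx is accepted for API symmetry but is not forwarded, because Call takes no context; any per-request timeout has to be enforced inside the transport.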

Batch produce for throughput

For higher throughput, send multiple values in one request:

func (c *ProducerClient) ProduceBatch(ctx context.Context,
    req *protocol.ProduceBatchRequest,
) (*protocol.ProduceBatchResponse, error) {
    resp, err := c.tc.Call(*req)
    if err != nil {
        return nil, err
    }
    r, ok := resp.(protocol.ProduceBatchResponse)
    if !ok {
        return nil, fmt.Errorf("produce batch: unexpected response type %T", resp)
    }
    return &r, nil
}
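ProduceBatch only pays off if something groups records before sending. A minimal sketch of client-side batching, with hypothetical names throughout (batcher, maxSize, and the flush callback, which in practice would wrap the values in a protocol.ProduceBatchRequest whose fields are not shown here): values accumulate until the batch is full, and Flush sends whatever remains. The type is not safe for concurrent use.

```go
package main

// batcher groups values and hands each full batch to flush.
// It is not safe for concurrent use.
type batcher struct {
	maxSize int
	pending [][]byte
	flush   func(values [][]byte) error
}

// Add queues v and flushes once maxSize values are pending.
func (b *batcher) Add(v []byte) error {
	b.pending = append(b.pending, v)
	if len(b.pending) >= b.maxSize {
		return b.Flush()
	}
	return nil
}

// Flush sends any pending values as one batch. On failure the values are
// kept so a later Flush can retry them.
func (b *batcher) Flush() error {
	if len(b.pending) == 0 {
		return nil
	}
	batch := b.pending
	b.pending = nil // start a fresh slice; flush may retain batch
	if err := b.flush(batch); err != nil {
		b.pending = batch
		return err
	}
	return nil
}
```

A stdin producer could call Add for each line and Flush at EOF. A timer that calls Flush after a short linger would bound latency when input arrives slowly, at the cost of a mutex around pending.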

Docker hostname fallback for local development

Both NewProducerClient and NewConsumerClient include a fallback for running the client outside Docker when the cluster runs inside Docker. If transport.Dial fails with a "no such host" error (for example, because node1 resolves only inside the Docker network), the client retries with 127.0.0.1:<port>:

// Inside NewProducerClient (and NewConsumerClient):
if strings.Contains(err.Error(), "no such host") {
    if host, port, splitErr := net.SplitHostPort(addr); splitErr == nil &&
        strings.HasPrefix(host, "node") && port != "" {
        fallback := net.JoinHostPort("127.0.0.1", port)
        if tc2, err2 := transport.Dial(fallback); err2 == nil {
            return &ProducerClient{tc: tc2}, nil
        }
    }
}

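This is a practical convenience for local development with Docker Compose. It only works if each node's port is published on the same port number on the host (for example 9094:9094), and because every node collapses onto 127.0.0.1, each node must listen on a distinct port.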

Step 4: Implement producer CLI with auto-reconnect

Reading from stdin and recovering from failures

The CLI producer reads lines from stdin and produces each to the topic. On leader change, it rediscovers the leader with retries:

// cmd/producer/main.go (key logic from connect command)

scanner := bufio.NewScanner(os.Stdin)
for scanner.Scan() {
    line := strings.TrimRight(scanner.Text(), "\r\n")
    if line == "" {
        continue
    }

    for {
        msgCtx, cancelMsg := context.WithTimeout(ctx, 10*time.Second)
        resp, err := producerClient.Produce(msgCtx, &protocol.ProduceRequest{
            Topic: topic,
            Value: []byte(line),
            Acks:  ackMode,
        })
        cancelMsg()

        if err == nil {
            fmt.Printf("offset=%d\n", resp.Offset)
            break
        }

        if client.ShouldReconnect(err) {
            fmt.Fprintln(os.Stderr, "reconnecting (leader change or connection issue)...")
            _ = producerClient.Close()

            var newLeaderAddr string
            for attempt := 0; attempt < 10; attempt++ {
                leaderCtx, cancelLeader := context.WithTimeout(ctx, 10*time.Second)
                newLeaderAddr, err = findLeader(leaderCtx)
                cancelLeader()
                if err == nil {
                    break
                }
                time.Sleep(time.Duration(attempt+1) * 500 * time.Millisecond)
            }
            if err != nil {
                return fmt.Errorf("failed to find new leader after retries: %w", err)
            }

            producerClient, err = client.NewProducerClient(newLeaderAddr)
            if err != nil {
                return err
            }
            continue // retry the produce
        }

        return err
    }
}
if err := scanner.Err(); err != nil {
    return fmt.Errorf("reading stdin: %w", err)
}

Reconnection logic: when the server returns a NotTopicLeader error (or the connection fails), ShouldReconnect returns true. The producer closes the old connection and retries leader discovery up to 10 times with a linearly increasing delay (500ms, 1s, 1.5s, ... up to 5s), then creates a new ProducerClient and retries the failed produce. The inner for loop ensures the same line is retried after reconnection, so a leader change does not drop input; the trade-off is that a write which reached the old leader just before the connection failed may be written twice.

Step 5: Build the consumer client

Consumer connection and replica mode

The consumer connects to the topic leader and fetches records by offset:

// client/consumer.go

type ConsumerClient struct {
    tc            *transport.TransportClient
    ReplicaNodeID string // when set, Fetch uses read-uncommitted (replication)
}

func NewConsumerClient(addr string) (*ConsumerClient, error) {
    tc, err := transport.Dial(addr)
    if err != nil {
        return nil, err
    }
    return &ConsumerClient{tc: tc}, nil
}

func (c *ConsumerClient) Close() error {
    return c.tc.Close()
}

func (c *ConsumerClient) SetReplicaNodeID(id string) {
    c.ReplicaNodeID = id
}

ReplicaNodeID is exported and can also be set through SetReplicaNodeID. It distinguishes two kinds of caller: regular consumers leave it empty, while the replication thread sets it so the leader serves uncommitted data and records the replica's log end offset (LEO).
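On the leader, that field selects between two read limits. A sketch under assumed names (partitionLog, highWatermark, replicaLEO, and fetch are hypothetical, not the server's actual types), and assuming, as in Kafka, that the offset a replica asks for equals its LEO: a regular consumer may read only below the high watermark, while a replica may read anything the leader has appended.

```go
package main

import "errors"

var errOffsetOutOfRange = errors.New("offset out of range")

// partitionLog is a hypothetical in-memory stand-in for a leader's log.
type partitionLog struct {
	records       [][]byte
	highWatermark uint64            // offsets below this are committed
	replicaLEO    map[string]uint64 // replica node ID -> log end offset
}

// fetch returns the record at offset. replicaNodeID is empty for ordinary
// consumers and set by the replication thread.
func (p *partitionLog) fetch(offset uint64, replicaNodeID string) ([]byte, error) {
	limit := p.highWatermark // read-committed for consumers
	if replicaNodeID != "" {
		// Assumption: a replica fetching offset N already holds 0..N-1,
		// so N is its log end offset.
		p.replicaLEO[replicaNodeID] = offset
		limit = uint64(len(p.records)) // read-uncommitted for replication
	}
	if offset >= limit {
		return nil, errOffsetOutOfRange
	}
	return p.records[offset], nil
}
```

The recorded LEOs are what lets the leader advance the high watermark and answer AckAll producers once every in-sync replica has the record.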

Single and batch fetch

func (c *ConsumerClient) Fetch(ctx context.Context,
    req *protocol.FetchRequest,
) (*protocol.FetchResponse, error) {
    reqCopy := *req // copy so the caller's request is not mutated
    if c.ReplicaNodeID != "" {
        reqCopy.ReplicaNodeID = c.ReplicaNodeID
    }
    resp, err := c.tc.Call(reqCopy)
    if err != nil {
        return nil, err
    }
    r, ok := resp.(protocol.FetchResponse)
    if !ok {
        return nil, fmt.Errorf("fetch: unexpected response type %T", resp)
    }
    return &r, nil
}

func (c *ConsumerClient) FetchBatch(ctx context.Context,
    req *protocol.FetchBatchRequest,
) (*protocol.FetchBatchResponse, error) {
    reqCopy := *req
    if c.ReplicaNodeID != "" {
        reqCopy.ReplicaNodeID = c.ReplicaNodeID
    }
    resp, err := c.tc.Call(reqCopy)
    if err != nil {
        return nil, err
    }
    r, ok := resp.(protocol.FetchBatchResponse)
    if !ok {
        return nil, fmt.Errorf("fetch batch: unexpected response type %T", resp)
    }
    return &r, nil
}

Both Fetch and FetchBatch copy the request before mutating it (setting ReplicaNodeID). This avoids modifying the caller's struct.

Managing consumer position via offset commits

Consumers can persist their position on the server so they can resume after a restart:

func (c *ConsumerClient) CommitOffset(ctx context.Context,
    req *protocol.CommitOffsetRequest,
) (*protocol.CommitOffsetResponse, error) {
    resp, err := c.tc.Call(*req)
    if err != nil {
        return nil, err
    }
    r, ok := resp.(protocol.CommitOffsetResponse)
    if !ok {
        return nil, fmt.Errorf("commit offset: unexpected response type %T", resp)
    }
    return &r, nil
}

func (c *ConsumerClient) FetchOffset(ctx context.Context,
    req *protocol.FetchOffsetRequest,
) (*protocol.FetchOffsetResponse, error) {
    resp, err := c.tc.Call(*req)
    if err != nil {
        return nil, err
    }
    r, ok := resp.(protocol.FetchOffsetResponse)
    if !ok {
        return nil, fmt.Errorf("fetch offset: unexpected response type %T", resp)
    }
    return &r, nil
}

Step 6: Implement consumer CLI with auto-reconnect

Streaming messages and committing offsets

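The consumer CLI streams messages from the topic and commits its position after each one. The committed value is the offset of the next record to read (Entry.Offset + 1), so a restarted consumer resumes right after the last record whose commit succeeded. Committing per message keeps redelivery after a crash small, at the cost of one extra round trip per record: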

// cmd/consumer/main.go (key logic from connect command)

currentOffset := startOffset
pollInterval := 500 * time.Millisecond

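for {
    // Fetch does not use ctx, so check for cancellation explicitly;
    // otherwise the loop would keep polling after the caller gives up.
    if err := ctx.Err(); err != nil {
        return err
    }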
    resp, err := consumerClient.Fetch(ctx, &protocol.FetchRequest{
        Id:     id,
        Topic:  topic,
        Offset: currentOffset,
    })
    if err != nil {
        if client.ShouldReconnect(err) {
            // Re-resolve leader with up to 10 retries
            _ = consumerClient.Close()
            var newAddr string
            for attempt := 0; attempt < 10; attempt++ {
                leaderCtx, leaderCancel := context.WithTimeout(ctx, 5*time.Second)
                newAddr, err = findLeader(leaderCtx)
                leaderCancel()
                if err == nil {
                    break
                }
                time.Sleep(time.Duration(attempt+1) * 500 * time.Millisecond)
            }
            if err != nil {
                return fmt.Errorf("failed to find new leader after retries: %w", err)
            }
            consumerClient, err = client.NewConsumerClient(newAddr)
            if err != nil {
                return err
            }
            continue
        }
        // Offset out of range means we've caught up; back off and poll.
        var rpcErr *protocol.RPCError
        if errors.As(err, &rpcErr) && rpcErr.Code == protocol.CodeReadOffset {
            time.Sleep(pollInterval)
            continue
        }
        return err
    }
    if resp.Entry == nil {
        time.Sleep(pollInterval)
        continue
    }

    fmt.Printf("%d\t%s\n", resp.Entry.Offset, string(resp.Entry.Value))
    currentOffset = resp.Entry.Offset + 1

    // Commit offset after each message
    commitCtx, commitCancel := context.WithTimeout(ctx, 5*time.Second)
    if _, err := consumerClient.CommitOffset(commitCtx, &protocol.CommitOffsetRequest{
        Id:     id,
        Topic:  topic,
        Offset: currentOffset,
    }); err != nil {
        fmt.Fprintf(os.Stderr, "warning: commit offset %d failed: %v\n",
            currentOffset, err)
    }
    commitCancel()
}

Consumer startup logic: The consumer resolves the starting offset in this priority order:

  1. --from-beginning flag: start at offset 0.
  2. --offset flag explicitly set: start at that offset.
  3. Default: fetch the last committed offset from the server via FetchOffset. If found, resume from there. Otherwise, start from 0.
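The priority order can be written as a small pure function. This is a sketch; the parameter names and the lookup callback are hypothetical, standing in for the --from-beginning and --offset flags and a FetchOffset call whose response fields are not shown here. It separates "no commit yet" (found is false) from a real lookup error, which is a design choice rather than something the source specifies.

```go
package main

// resolveStartOffset applies the CLI's priority order: --from-beginning,
// then an explicit --offset, then the committed offset, then 0.
// lookup reports the committed offset and whether one exists.
func resolveStartOffset(fromBeginning bool, offsetSet bool, offset uint64,
	lookup func() (uint64, bool, error)) (uint64, error) {
	if fromBeginning {
		return 0, nil
	}
	if offsetSet {
		return offset, nil
	}
	committed, found, err := lookup()
	if err != nil {
		return 0, err
	}
	if !found {
		return 0, nil // no commit yet for this consumer ID and topic
	}
	return committed, nil
}
```

offsetSet is needed because an explicit --offset 0 and the flag's default value look identical once parsed.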

Step 7: Implement server-side offset management

Persisting and recovering consumer offsets

The server persists consumer offsets in a local append-only log so they survive restarts:

// consumer/consumer.go
package consumer

type ConsumerManager struct {
    mu          sync.RWMutex
    offsetCache map[string]map[string]uint64
    offsetLog   *log.Log
    recoverOnce sync.Once
    recoverErr  error
}

The offsetCache is a nested map: consumerID → topic → offset. The offsetLog persists every commit in CSV format so offsets survive restarts. recoverOnce ensures recovery from the log runs exactly once.

Initializing the ConsumerManager

func NewConsumerManager(baseDir string) (*ConsumerManager, error) {
    offsetLog, err := log.NewLog(
        filepath.Join(baseDir, "__consumer_offsets__.log"))
    if err != nil {
        return nil, err
    }
    cm := &ConsumerManager{
        offsetCache: make(map[string]map[string]uint64),
        offsetLog:   offsetLog,
    }
    // Recover offsets eagerly at startup instead of per-request.
    if err := cm.Recover(); err != nil {
        return nil, fmt.Errorf("consumer offset recovery: %w", err)
    }
    return cm, nil
}

The offset log is stored at __consumer_offsets__.log inside the data directory. Recovery runs eagerly at startup — by the time NewConsumerManager returns, the in-memory cache is populated.

Recording offset commits to persistent log

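func (c *ConsumerManager) CommitOffset(id string, topic string,
    offset uint64,
) error {
    c.mu.Lock()
    defer c.mu.Unlock()
    // Persist first: if the append fails, the cache must not report an
    // offset that would be lost on restart.
    if _, err := c.offsetLog.Append(
        []byte(fmt.Sprintf("%s,%s,%d", id, topic, offset))); err != nil {
        return err
    }
    if _, ok := c.offsetCache[id]; !ok {
        c.offsetCache[id] = make(map[string]uint64)
    }
    c.offsetCache[id][topic] = offset
    return nil
}

Each commit writes a CSV line "consumerID,topic,offset" to the offset log and only then updates the in-memory cache, so GetOffset never returns an offset that was not persisted. Both steps run under the write lock, so readers always see the latest persisted offset. Because recovery splits each line on commas, a consumer ID or topic name that itself contains a comma would be skipped on replay; rejecting such names at commit time avoids silently losing those offsets.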
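If you would rather allow commas in names than reject them, one option is to swap the hand-built line for the standard library's encoding/csv, which quotes fields as needed. This is a sketch with hypothetical helper names (encodeCommit, decodeCommit), not the project's code; recovery would call decodeCommit in place of strings.Split.

```go
package main

import (
	"bytes"
	"encoding/csv"
	"strconv"
)

// encodeCommit renders one offset-log record, quoting fields that contain
// commas, quotes, or newlines.
func encodeCommit(id, topic string, offset uint64) ([]byte, error) {
	var buf bytes.Buffer
	w := csv.NewWriter(&buf)
	if err := w.Write([]string{id, topic, strconv.FormatUint(offset, 10)}); err != nil {
		return nil, err
	}
	w.Flush()
	if err := w.Error(); err != nil {
		return nil, err
	}
	// Drop the record terminator; the log stores one record per entry.
	return bytes.TrimRight(buf.Bytes(), "\n"), nil
}

// decodeCommit parses a record produced by encodeCommit.
func decodeCommit(data []byte) (id, topic string, offset uint64, err error) {
	r := csv.NewReader(bytes.NewReader(data))
	r.FieldsPerRecord = 3 // anything else is a malformed record
	fields, err := r.Read()
	if err != nil {
		return "", "", 0, err
	}
	offset, err = strconv.ParseUint(fields[2], 10, 64)
	return fields[0], fields[1], offset, err
}
```

Records already written in the plain "id,topic,offset" form still decode the same way, provided no name contains a double quote.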

Retrieving the last committed offset

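func (c *ConsumerManager) GetOffset(id string, topic string) (uint64, error) {
    c.mu.RLock()
    defer c.mu.RUnlock()
    // Indexing a nil inner map is safe and yields ok == false, so this covers
    // both "unknown consumer" and "consumer never committed for this topic".
    offset, ok := c.offsetCache[id][topic]
    if !ok {
        return 0, ErrOffsetNotFoundForID(id, topic)
    }
    return offset, nil
}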

Recovering offsets from append-only log

When the server restarts, replay the offset log to rebuild the cache:

func (c *ConsumerManager) Recover() error {
    c.recoverOnce.Do(func() {
        c.mu.Lock()
        defer c.mu.Unlock()
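        offset := c.offsetLog.LowestOffset()
        highestOffset := c.offsetLog.HighestOffset()
        // This bound assumes HighestOffset is exclusive (the next offset to be
        // written). If it returns the last written offset instead, the loop
        // must use <= or the most recent commit is skipped on replay.
        for ; offset < highestOffset; offset++ {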
            data, err := c.offsetLog.Read(offset)
            if err != nil {
                c.recoverErr = err
                return
            }
            parts := strings.Split(string(data), ",")
            if len(parts) != 3 {
                continue
            }
            off, err := strconv.ParseUint(parts[2], 10, 64)
            if err != nil {
                c.recoverErr = err
                return
            }
            if _, ok := c.offsetCache[parts[0]]; !ok {
                c.offsetCache[parts[0]] = make(map[string]uint64)
            }
            c.offsetCache[parts[0]][parts[1]] = off
        }
    })
    return c.recoverErr
}

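recoverOnce.Do ensures that the log replay happens only once, even if Recover is called again after NewConsumerManager. Records are replayed in append order and each one overwrites the cache entry for its (consumerID, topic) pair, so the cache ends up holding the most recent commit for every pair, which is exactly the state before the restart. Nothing shown here trims older records, so the offset log grows by one record per commit and replay time grows with it.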

Step 8: Build a remote client for admin operations

Generic admin operations wrapper

The RemoteClient wraps the transport client for cluster-level operations:

// client/rpc.go

type RemoteClient struct {
    tc *transport.TransportClient
}

func NewRemoteClient(addr string) (*RemoteClient, error) {
    tc, err := transport.Dial(addr)
    if err != nil {
        return nil, err
    }
    return &RemoteClient{tc: tc}, nil
}

func (c *RemoteClient) Close() error {
    return c.tc.Close()
}

func (c *RemoteClient) CreateTopic(ctx context.Context,
    req *protocol.CreateTopicRequest,
) (*protocol.CreateTopicResponse, error) {
    resp, err := c.tc.Call(*req)
    if err != nil {
        return nil, err
    }
    r, ok := resp.(protocol.CreateTopicResponse)
    if !ok {
        return nil, fmt.Errorf("create topic: unexpected response type %T", resp)
    }
    return &r, nil
}

func (c *RemoteClient) DeleteTopic(ctx context.Context,
    req *protocol.DeleteTopicRequest,
) (*protocol.DeleteTopicResponse, error) {
    resp, err := c.tc.Call(*req)
    if err != nil {
        return nil, err
    }
    r, ok := resp.(protocol.DeleteTopicResponse)
    if !ok {
        return nil, fmt.Errorf("delete topic: unexpected response type %T", resp)
    }
    return &r, nil
}

func (c *RemoteClient) FindTopicLeader(ctx context.Context,
    req *protocol.FindTopicLeaderRequest,
) (*protocol.FindTopicLeaderResponse, error) {
    resp, err := c.tc.Call(*req)
    if err != nil {
        return nil, err
    }
    r, ok := resp.(protocol.FindTopicLeaderResponse)
    if !ok {
        return nil, fmt.Errorf("find topic leader: unexpected response type %T", resp)
    }
    return &r, nil
}

func (c *RemoteClient) FindRaftLeader(ctx context.Context,
    req *protocol.FindRaftLeaderRequest,
) (*protocol.FindRaftLeaderResponse, error) {
    resp, err := c.tc.Call(*req)
    if err != nil {
        return nil, err
    }
    r, ok := resp.(protocol.FindRaftLeaderResponse)
    if !ok {
        return nil, fmt.Errorf("find raft leader: unexpected response type %T", resp)
    }
    return &r, nil
}

func (c *RemoteClient) ListTopics(ctx context.Context,
    req *protocol.ListTopicsRequest,
) (*protocol.ListTopicsResponse, error) {
    resp, err := c.tc.Call(*req)
    if err != nil {
        return nil, err
    }
    r, ok := resp.(protocol.ListTopicsResponse)
    if !ok {
        return nil, fmt.Errorf("list topics: unexpected response type %T", resp)
    }
    return &r, nil
}

All methods follow the same pattern: dereference the request, call tc.Call, and type-assert the response. Topic management commands (CreateTopic, DeleteTopic) must be sent to the Raft leader rather than the topic leader, because they change cluster metadata that is replicated through Raft and only the Raft leader can propose changes. The CLI therefore discovers the Raft leader first via FindRaftLeader.
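Because every method repeats the same call-then-assert steps, one generic helper (Go 1.18+) could replace the bodies. The sketch assumes a hypothetical caller interface with the Call(any) (any, error) shape that TransportClient appears to have from its usage above; callAs, pingRequest, pingResponse, and fakeTransport are illustrative names, not part of the project.

```go
package main

import "fmt"

// caller matches the shape of the transport's Call method as used above.
type caller interface {
	Call(req any) (any, error)
}

// callAs sends req and asserts that the response has type Resp, turning a
// mismatch into an error instead of a panic.
func callAs[Resp any](c caller, req any) (*Resp, error) {
	resp, err := c.Call(req)
	if err != nil {
		return nil, err
	}
	r, ok := resp.(Resp)
	if !ok {
		var want Resp
		return nil, fmt.Errorf("unexpected response type %T, want %T", resp, want)
	}
	return &r, nil
}

// Illustrative request/response pair and an in-memory transport.
type pingRequest struct{ Topic string }
type pingResponse struct{ LeaderAddr string }

type fakeTransport struct{}

func (fakeTransport) Call(req any) (any, error) {
	if _, ok := req.(pingRequest); ok {
		return pingResponse{LeaderAddr: "node1:9094"}, nil
	}
	return "unexpected", nil // wrong type on purpose
}
```

With such a helper, a method like FindTopicLeader would reduce to return callAs[protocol.FindTopicLeaderResponse](c.tc, *req), provided TransportClient.Call really has the signature the existing code implies.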

Step 9: Understand producer ack modes

Choosing ack mode for your use case

Mode          | Producer experience                  | What happens
------------- | ------------------------------------ | ------------
AckNone (0)   | Fastest. Fire and forget.            | Leader appends and responds immediately. No durability guarantee.
AckLeader (1) | Moderate latency. Default.           | Leader appends and responds after the local write. Data can be lost if the leader crashes before replication.
AckAll (2)    | Highest latency, highest durability. | Leader appends, waits for all in-sync replicas (ISR) to catch up (up to a 5s timeout), then responds. Data survives the loss of any single node as long as the ISR contained at least one follower when the write was acknowledged.

Choose based on your requirements:

  • Logging/metrics → AckNone (speed over durability)
  • General messaging → AckLeader (good balance)
  • Financial transactions → AckAll (strongest durability)

Summary

Component       | Purpose
--------------- | -------
TryAddrs        | Bootstrap by trying each broker address; returns the first successful result.
ShouldReconnect | Wrapper around protocol.ShouldReconnect; signals a leader change or connection failure.
ProducerClient  | Produce() and ProduceBatch() with ack modes. Methods accept a context.Context, but it is not forwarded to the transport.
ConsumerClient  | Fetch(), FetchBatch(), CommitOffset(), FetchOffset(); ReplicaNodeID switches fetches to replication mode.
RemoteClient    | Admin operations: CreateTopic, DeleteTopic, FindTopicLeader, FindRaftLeader, ListTopics.
ConsumerManager | Server-side offset tracking; persists to the append-only __consumer_offsets__.log and recovers once at startup.
Auto-reconnect  | Producer and consumer CLIs retry leader discovery up to 10 times, waiting 500ms longer after each failed attempt.

With producer and consumer APIs complete, the final page shows how to run the entire cluster — Docker setup, deployment, and end-to-end testing.