Producer and Consumer APIs — Leader Discovery, Ack Modes, and Offset Management
In this section, you'll build the client libraries that serve as the user-facing interface to the distributed log. You'll implement automatic leader discovery, reconnection on failure, ack modes for producers, and offset management for consumers. Code lives in client/ (client-side libraries), consumer/ (server-side offset tracking), and cmd/ (CLI programs).
Step 1: Implement leader discovery and bootstrap
Finding the topic leader
Before producing or consuming, the client must find which node leads the target topic. The bootstrap logic tries each known broker address until one responds:
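```go
// client/bootstrap.go
package client

import (
	"context"
	"errors"
	"strings"
)

var errNoConnection = errors.New("could not connect to any address")

// TryAddrs dials each address in turn and runs fn against it. ctx is not used
// directly; fn typically captures it for its own RPCs.
func TryAddrs(ctx context.Context, addrs []string,
	fn func(*RemoteClient) (string, error),
) (string, error) {
	var lastErr error
	for _, addr := range addrs {
		addr = strings.TrimSpace(addr)
		if addr == "" {
			continue
		}
		c, err := NewRemoteClient(addr)
		if err != nil {
			lastErr = err // unreachable broker: try the next one
			continue
		}
		result, err := fn(c)
		_ = c.Close() // one short-lived connection per attempt
		if err == nil && result != "" {
			return result, nil
		}
		if err == nil {
			// Empty result without an error: try the next broker instead of
			// returning ("", nil) to the caller.
			continue
		}
		lastErr = err
		if ShouldReconnect(err) {
			continue
		}
		return "", err
	}
	if lastErr != nil {
		return "", lastErr
	}
	return "", errNoConnection
}
```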
TryAddrs takes a callback fn that receives a RemoteClient and returns a result string plus error. It iterates addresses, calling fn on each. If the callback returns a non-empty result and no error, that result is returned. If the error is reconnect-worthy (ShouldReconnect), it tries the next address. Otherwise, it returns the error immediately.
To find the topic leader, the CLI programs use TryAddrs with FindTopicLeader:
```go
// Used by both the producer and consumer CLIs. addrList() returns the
// configured broker addresses; topic is the target topic.
findLeader := func(ctx context.Context) (string, error) {
	return client.TryAddrs(ctx, addrList(), func(c *client.RemoteClient) (string, error) {
		resp, err := c.FindTopicLeader(ctx, &protocol.FindTopicLeaderRequest{
			Topic: topic,
		})
		if err != nil {
			return "", err
		}
		if resp.LeaderAddr == "" {
			return "", fmt.Errorf("empty leader address returned for topic %s", topic)
		}
		return resp.LeaderAddr, nil
	})
}
```
Any node in the cluster can answer FindTopicLeader — it reads from its local metadata (populated by Raft) and returns the leader's RPC address. The client then connects directly to the leader.
Step 2: Handle reconnection on leader change
The ShouldReconnect helper
The client package re-exports the protocol-level ShouldReconnect for convenience:
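```go
// client/reconnect.go

// ShouldReconnect reports whether err means the caller should rediscover the
// leader and open a new connection.
func ShouldReconnect(err error) bool {
	return protocol.ShouldReconnect(err)
}
```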
This returns true when the error indicates a leader change or connection failure, signaling the caller to rediscover the leader and reconnect.
Step 3: Build the producer client
Producer connection and initialization
The producer connects to the topic leader and sends produce requests:
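```go
// client/producer.go

// ProducerClient holds a single connection to the topic leader.
type ProducerClient struct {
	tc *transport.TransportClient
}

// NewProducerClient dials the leader. (Simplified: the Docker hostname
// fallback described later in this step is omitted here.)
func NewProducerClient(addr string) (*ProducerClient, error) {
	tc, err := transport.Dial(addr)
	if err != nil {
		return nil, err
	}
	return &ProducerClient{tc: tc}, nil
}

func (c *ProducerClient) Close() error {
	return c.tc.Close()
}
```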
Single record produce
```go
func (c *ProducerClient) Produce(ctx context.Context,
	req *protocol.ProduceRequest,
) (*protocol.ProduceResponse, error) {
	resp, err := c.tc.Call(*req) // send the request by value
	if err != nil {
		return nil, err
	}
	r := resp.(protocol.ProduceResponse)
	return &r, nil
}
```
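Produce dereferences the request so that Call, which encodes it via the Codec, receives a value rather than a pointer, then type-asserts the interface{} response to protocol.ProduceResponse. Two details are worth noting: the assertion is unchecked, so an unexpected response type panics instead of returning an error, and ctx is accepted but never passed to tc.Call, so a caller's deadline or cancellation does not affect the request.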
Batch produce for throughput
For higher throughput, send multiple values in one request:
```go
func (c *ProducerClient) ProduceBatch(ctx context.Context,
	req *protocol.ProduceBatchRequest,
) (*protocol.ProduceBatchResponse, error) {
	resp, err := c.tc.Call(*req)
	if err != nil {
		return nil, err
	}
	r := resp.(protocol.ProduceBatchResponse)
	return &r, nil
}
```
Docker hostname fallback for local development
Both NewProducerClient and NewConsumerClient include a fallback for running the client outside Docker when the cluster is inside Docker. If transport.Dial fails with a "no such host" error (e.g., node1:9094 is only resolvable inside the Docker network), the client retries with 127.0.0.1:<port>:
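```go
// Inside NewProducerClient (NewConsumerClient does the same for its own type):
tc, err := transport.Dial(addr)
if err != nil {
	// Addresses like node1:9094 resolve only inside the Docker network,
	// so retry the same port on 127.0.0.1 when running on the host.
	if strings.Contains(err.Error(), "no such host") {
		if host, port, splitErr := net.SplitHostPort(addr); splitErr == nil &&
			strings.HasPrefix(host, "node") && port != "" {
			fallback := net.JoinHostPort("127.0.0.1", port)
			if tc2, err2 := transport.Dial(fallback); err2 == nil {
				return &ProducerClient{tc: tc2}, nil
			}
		}
	}
	return nil, err // report the original error if the fallback fails
}
return &ProducerClient{tc: tc}, nil
```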
This is a practical convenience for local development with Docker Compose.
Step 4: Implement producer CLI with auto-reconnect
Reading from stdin and recovering from failures
The CLI producer reads lines from stdin and produces each to the topic. On leader change, it rediscovers the leader with retries:
```go
// cmd/producer/main.go (key logic from the connect command)
scanner := bufio.NewScanner(os.Stdin)
for scanner.Scan() {
	line := strings.TrimRight(scanner.Text(), "\r\n")
	if line == "" {
		continue
	}
	// Retry the same line until it is produced or a non-reconnectable error occurs.
	for {
		msgCtx, cancelMsg := context.WithTimeout(ctx, 10*time.Second)
		resp, err := producerClient.Produce(msgCtx, &protocol.ProduceRequest{
			Topic: topic,
			Value: []byte(line),
			Acks:  ackMode,
		})
		cancelMsg()
		if err == nil {
			fmt.Printf("offset=%d\n", resp.Offset)
			break
		}
		if client.ShouldReconnect(err) {
			fmt.Fprintln(os.Stderr, "reconnecting (leader change or connection issue)...")
			_ = producerClient.Close()
			var newLeaderAddr string
			for attempt := 0; attempt < 10; attempt++ {
				leaderCtx, cancelLeader := context.WithTimeout(ctx, 10*time.Second)
				newLeaderAddr, err = findLeader(leaderCtx)
				cancelLeader()
				if err == nil {
					break
				}
				// Linear backoff: 500ms, 1s, 1.5s, ...
				time.Sleep(time.Duration(attempt+1) * 500 * time.Millisecond)
			}
			if err != nil {
				return fmt.Errorf("failed to find new leader after retries: %w", err)
			}
			producerClient, err = client.NewProducerClient(newLeaderAddr)
			if err != nil {
				return err
			}
			continue // retry the produce
		}
		return err
	}
}
// Scan returns false on read errors too (e.g. a line longer than the buffer).
if err := scanner.Err(); err != nil {
	return fmt.Errorf("reading stdin: %w", err)
}
```
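Reconnection logic: When the server returns a NotTopicLeader error (or the connection fails), ShouldReconnect returns true. The producer closes the old connection, retries leader discovery up to 10 times with a linearly increasing backoff (500ms, 1s, 1.5s, ...; each lookup gets its own 10-second timeout), creates a new ProducerClient, and retries the failed produce. The inner for loop ensures the same line is retried after reconnection. Two caveats: the waits use time.Sleep, so cancelling ctx does not interrupt a backoff in progress, and a retry can duplicate a record if the original request was appended but its response was lost.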
Step 5: Build the consumer client
Consumer connection and replica mode
The consumer connects to the topic leader and fetches records by offset:
```go
// client/consumer.go
type ConsumerClient struct {
	tc            *transport.TransportClient
	ReplicaNodeID string // when set, Fetch uses read-uncommitted (replication)
}

func NewConsumerClient(addr string) (*ConsumerClient, error) {
	tc, err := transport.Dial(addr)
	if err != nil {
		return nil, err
	}
	return &ConsumerClient{tc: tc}, nil
}

func (c *ConsumerClient) Close() error {
	return c.tc.Close()
}

func (c *ConsumerClient) SetReplicaNodeID(id string) {
	c.ReplicaNodeID = id
}
```
ReplicaNodeID is exported and serves double duty: regular consumers leave it empty, while the replication thread sets it (directly or via SetReplicaNodeID) so the leader knows to serve uncommitted data and to record the replica's log end offset (LEO).
Single and batch fetch
```go
func (c *ConsumerClient) Fetch(ctx context.Context,
	req *protocol.FetchRequest,
) (*protocol.FetchResponse, error) {
	reqCopy := *req // copy so the caller's request is not mutated
	if c.ReplicaNodeID != "" {
		reqCopy.ReplicaNodeID = c.ReplicaNodeID
	}
	resp, err := c.tc.Call(reqCopy)
	if err != nil {
		return nil, err
	}
	r := resp.(protocol.FetchResponse)
	return &r, nil
}

func (c *ConsumerClient) FetchBatch(ctx context.Context,
	req *protocol.FetchBatchRequest,
) (*protocol.FetchBatchResponse, error) {
	reqCopy := *req
	if c.ReplicaNodeID != "" {
		reqCopy.ReplicaNodeID = c.ReplicaNodeID
	}
	resp, err := c.tc.Call(reqCopy)
	if err != nil {
		return nil, err
	}
	r := resp.(protocol.FetchBatchResponse)
	return &r, nil
}
```
Both Fetch and FetchBatch copy the request before mutating it (setting ReplicaNodeID). This avoids modifying the caller's struct.
Managing consumer position via offset commits
Consumers can persist their position on the server so they can resume after a restart:
```go
func (c *ConsumerClient) CommitOffset(ctx context.Context,
	req *protocol.CommitOffsetRequest,
) (*protocol.CommitOffsetResponse, error) {
	resp, err := c.tc.Call(*req)
	if err != nil {
		return nil, err
	}
	r := resp.(protocol.CommitOffsetResponse)
	return &r, nil
}

func (c *ConsumerClient) FetchOffset(ctx context.Context,
	req *protocol.FetchOffsetRequest,
) (*protocol.FetchOffsetResponse, error) {
	resp, err := c.tc.Call(*req)
	if err != nil {
		return nil, err
	}
	r := resp.(protocol.FetchOffsetResponse)
	return &r, nil
}
```
Step 6: Implement consumer CLI with auto-reconnect
Streaming messages and committing offsets
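The consumer CLI streams messages from the topic and, after printing each one, commits the next offset to read (Entry.Offset + 1). Committing after printing gives at-least-once delivery: if the process dies between the two steps, that message is printed again when the consumer resumes from its committed offset.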
```go
// cmd/consumer/main.go (key logic from the connect command)
currentOffset := startOffset
pollInterval := 500 * time.Millisecond
for {
	resp, err := consumerClient.Fetch(ctx, &protocol.FetchRequest{
		Id:     id,
		Topic:  topic,
		Offset: currentOffset,
	})
	if err != nil {
		if client.ShouldReconnect(err) {
			// Re-resolve the leader with up to 10 retries (linear backoff).
			_ = consumerClient.Close()
			var newAddr string
			for attempt := 0; attempt < 10; attempt++ {
				leaderCtx, leaderCancel := context.WithTimeout(ctx, 5*time.Second)
				newAddr, err = findLeader(leaderCtx)
				leaderCancel()
				if err == nil {
					break
				}
				time.Sleep(time.Duration(attempt+1) * 500 * time.Millisecond)
			}
			if err != nil {
				return fmt.Errorf("failed to find new leader after retries: %w", err)
			}
			consumerClient, err = client.NewConsumerClient(newAddr)
			if err != nil {
				return err
			}
			continue // retry the fetch at the same offset
		}
		// Offset out of range means we've caught up; back off and poll.
		var rpcErr *protocol.RPCError
		if errors.As(err, &rpcErr) && rpcErr.Code == protocol.CodeReadOffset {
			time.Sleep(pollInterval)
			continue
		}
		return err
	}
	if resp.Entry == nil {
		time.Sleep(pollInterval) // nothing new yet
		continue
	}
	fmt.Printf("%d\t%s\n", resp.Entry.Offset, string(resp.Entry.Value))
	currentOffset = resp.Entry.Offset + 1
	// Commit the next offset to read after each message.
	commitCtx, commitCancel := context.WithTimeout(ctx, 5*time.Second)
	if _, err := consumerClient.CommitOffset(commitCtx, &protocol.CommitOffsetRequest{
		Id:     id,
		Topic:  topic,
		Offset: currentOffset,
	}); err != nil {
		fmt.Fprintf(os.Stderr, "warning: commit offset %d failed: %v\n",
			currentOffset, err)
	}
	commitCancel()
}
```
Consumer startup logic: The consumer resolves the starting offset in this priority order:
- --from-beginning flag: start at offset 0.
- --offset flag explicitly set: start at that offset.
- Default: fetch the last committed offset from the server via FetchOffset. If one is found, resume from there; otherwise, start from 0.
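A sketch of that priority order using the standard flag package, whose FlagSet.Visit walks only the flags actually set on the command line. That is what distinguishes an explicit --offset 0 from no --offset at all. The real CLI may use a different flag library; resolveStartOffset and the fetchCommitted callback (standing in for a FetchOffset call) are hypothetical.

```go
package main

import (
	"flag"
	"fmt"
)

// resolveStartOffset applies: --from-beginning, then an explicitly set
// --offset, then the committed offset, then 0. fetchCommitted is a
// hypothetical stand-in for FetchOffset; found=false means "never committed".
func resolveStartOffset(args []string, fetchCommitted func() (uint64, bool, error)) (uint64, error) {
	fs := flag.NewFlagSet("consumer", flag.ContinueOnError)
	fromBeginning := fs.Bool("from-beginning", false, "start at offset 0")
	offset := fs.Uint64("offset", 0, "start at this offset")
	if err := fs.Parse(args); err != nil {
		return 0, err
	}
	// Visit only calls fn for flags that were set, so "--offset 0" counts.
	offsetSet := false
	fs.Visit(func(f *flag.Flag) {
		if f.Name == "offset" {
			offsetSet = true
		}
	})
	switch {
	case *fromBeginning:
		return 0, nil
	case offsetSet:
		return *offset, nil
	}
	committed, found, err := fetchCommitted()
	if err != nil {
		return 0, err
	}
	if !found {
		return 0, nil
	}
	return committed, nil
}

func main() {
	committed := func() (uint64, bool, error) { return 17, true, nil }
	for _, args := range [][]string{{"--from-beginning"}, {"--offset", "5"}, {}} {
		off, err := resolveStartOffset(args, committed)
		fmt.Println(args, off, err)
	}
}
```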
Step 7: Implement server-side offset management
Persisting and recovering consumer offsets
The server persists consumer offsets in a local append-only log so they survive restarts:
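```go
// consumer/consumer.go
package consumer

// log here is the project's own log package (NewLog, Append, Read, ...),
// not the standard library's.
type ConsumerManager struct {
	mu          sync.RWMutex
	offsetCache map[string]map[string]uint64 // consumerID -> topic -> offset
	offsetLog   *log.Log
	recoverOnce sync.Once
	recoverErr  error
}
```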
The offsetCache is a nested map: consumerID → topic → offset. The offsetLog persists every commit in CSV format so offsets survive restarts. recoverOnce ensures recovery from the log runs exactly once.
Initializing the ConsumerManager
```go
func NewConsumerManager(baseDir string) (*ConsumerManager, error) {
	offsetLog, err := log.NewLog(
		filepath.Join(baseDir, "__consumer_offsets__.log"))
	if err != nil {
		return nil, err
	}
	cm := &ConsumerManager{
		offsetCache: make(map[string]map[string]uint64),
		offsetLog:   offsetLog,
	}
	// Recover offsets eagerly at startup instead of per-request.
	if err := cm.Recover(); err != nil {
		return nil, fmt.Errorf("consumer offset recovery: %w", err)
	}
	return cm, nil
}
```
The offset log is stored at __consumer_offsets__.log inside the data directory. Recovery runs eagerly at startup — by the time NewConsumerManager returns, the in-memory cache is populated.
Recording offset commits to persistent log
```go
func (c *ConsumerManager) CommitOffset(id string, topic string,
	offset uint64,
) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	// Write the log first: if the append fails, the cache must not report an
	// offset that would be lost on restart.
	if _, err := c.offsetLog.Append(
		[]byte(fmt.Sprintf("%s,%s,%d", id, topic, offset))); err != nil {
		return err
	}
	if _, ok := c.offsetCache[id]; !ok {
		c.offsetCache[id] = make(map[string]uint64)
	}
	c.offsetCache[id][topic] = offset
	return nil
}
```
Each commit appends a CSV line "consumerID,topic,offset" to the offset log and then updates the in-memory cache. Both steps run under the write lock, so concurrent commits are serialized and reads see the latest offset; because the log is written first, a failed append leaves the cache unchanged.
Retrieving the last committed offset
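```go
func (c *ConsumerManager) GetOffset(id string, topic string) (uint64, error) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	// Indexing a missing inner map yields ok == false, so this covers both an
	// unknown consumer and a known consumer that never committed this topic.
	offset, ok := c.offsetCache[id][topic]
	if !ok {
		return 0, ErrOffsetNotFoundForID(id, topic)
	}
	return offset, nil
}
```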
Recovering offsets from append-only log
When the server restarts, replay the offset log to rebuild the cache:
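```go
func (c *ConsumerManager) Recover() error {
	c.recoverOnce.Do(func() {
		c.mu.Lock()
		defer c.mu.Unlock()
		offset := c.offsetLog.LowestOffset()
		// The loop treats HighestOffset as an exclusive bound (the next offset
		// to be written). If your log returns the last written offset instead,
		// use an inclusive bound (guarding the empty-log case), or the newest
		// commit is skipped on every restart.
		highestOffset := c.offsetLog.HighestOffset()
		for ; offset < highestOffset; offset++ {
			data, err := c.offsetLog.Read(offset)
			if err != nil {
				c.recoverErr = err
				return
			}
			// Records are "consumerID,topic,offset"; skip anything malformed.
			parts := strings.Split(string(data), ",")
			if len(parts) != 3 {
				continue
			}
			off, err := strconv.ParseUint(parts[2], 10, 64)
			if err != nil {
				c.recoverErr = err
				return
			}
			if _, ok := c.offsetCache[parts[0]]; !ok {
				c.offsetCache[parts[0]] = make(map[string]uint64)
			}
			// Later records overwrite earlier ones: the last commit wins.
			c.offsetCache[parts[0]][parts[1]] = off
		}
	})
	return c.recoverErr
}
```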
recoverOnce.Do ensures that even if Recover is called multiple times, the log replay happens only once. Since the offset log is append-only and each commit overwrites the previous offset in the cache, only the last commit for each (consumerID, topic) pair survives — exactly what we want.
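One caveat with the "%s,%s,%d" format: a consumer ID or topic containing a comma splits into more than three parts, so Recover silently skips that record and the offset is lost on restart. A sketch of the same record layout written with encoding/csv, which quotes such fields; encodeCommit and replay are hypothetical names, and existing unquoted lines (without quote characters) still parse.

```go
package main

import (
	"bytes"
	"encoding/csv"
	"fmt"
	"strconv"
)

// encodeCommit renders one "consumerID,topic,offset" record. encoding/csv
// quotes any field containing a comma, quote, or newline.
func encodeCommit(id, topic string, offset uint64) ([]byte, error) {
	var buf bytes.Buffer
	w := csv.NewWriter(&buf)
	if err := w.Write([]string{id, topic, strconv.FormatUint(offset, 10)}); err != nil {
		return nil, err
	}
	w.Flush()
	if err := w.Error(); err != nil {
		return nil, err
	}
	return bytes.TrimSuffix(buf.Bytes(), []byte("\n")), nil
}

// replay rebuilds consumerID -> topic -> offset from records in log order.
// Later records overwrite earlier ones, so the last commit per pair wins.
func replay(records [][]byte) (map[string]map[string]uint64, error) {
	cache := make(map[string]map[string]uint64)
	for _, rec := range records {
		fields, err := csv.NewReader(bytes.NewReader(rec)).Read()
		if err != nil {
			return nil, err
		}
		if len(fields) != 3 {
			continue // malformed record: skip, as Recover does
		}
		off, err := strconv.ParseUint(fields[2], 10, 64)
		if err != nil {
			return nil, err
		}
		if cache[fields[0]] == nil {
			cache[fields[0]] = make(map[string]uint64)
		}
		cache[fields[0]][fields[1]] = off
	}
	return cache, nil
}

func main() {
	a, _ := encodeCommit("billing,eu", "orders", 5)
	b, _ := encodeCommit("billing,eu", "orders", 9)
	fmt.Println(string(a))
	cache, err := replay([][]byte{a, b})
	fmt.Println(cache["billing,eu"]["orders"], err)
}
```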
Step 8: Build a remote client for admin operations
Generic admin operations wrapper
The RemoteClient wraps the transport client for cluster-level operations:
```go
// client/rpc.go

// RemoteClient wraps a transport connection for cluster-level (admin) RPCs.
type RemoteClient struct {
	tc *transport.TransportClient
}

func NewRemoteClient(addr string) (*RemoteClient, error) {
	tc, err := transport.Dial(addr)
	if err != nil {
		return nil, err
	}
	return &RemoteClient{tc: tc}, nil
}

func (c *RemoteClient) Close() error {
	return c.tc.Close()
}

func (c *RemoteClient) CreateTopic(ctx context.Context,
	req *protocol.CreateTopicRequest,
) (*protocol.CreateTopicResponse, error) {
	resp, err := c.tc.Call(*req)
	if err != nil {
		return nil, err
	}
	r := resp.(protocol.CreateTopicResponse)
	return &r, nil
}

func (c *RemoteClient) DeleteTopic(ctx context.Context,
	req *protocol.DeleteTopicRequest,
) (*protocol.DeleteTopicResponse, error) {
	resp, err := c.tc.Call(*req)
	if err != nil {
		return nil, err
	}
	r := resp.(protocol.DeleteTopicResponse)
	return &r, nil
}

func (c *RemoteClient) FindTopicLeader(ctx context.Context,
	req *protocol.FindTopicLeaderRequest,
) (*protocol.FindTopicLeaderResponse, error) {
	resp, err := c.tc.Call(*req)
	if err != nil {
		return nil, err
	}
	r := resp.(protocol.FindTopicLeaderResponse)
	return &r, nil
}

func (c *RemoteClient) FindRaftLeader(ctx context.Context,
	req *protocol.FindRaftLeaderRequest,
) (*protocol.FindRaftLeaderResponse, error) {
	resp, err := c.tc.Call(*req)
	if err != nil {
		return nil, err
	}
	r := resp.(protocol.FindRaftLeaderResponse)
	return &r, nil
}

func (c *RemoteClient) ListTopics(ctx context.Context,
	req *protocol.ListTopicsRequest,
) (*protocol.ListTopicsResponse, error) {
	resp, err := c.tc.Call(*req)
	if err != nil {
		return nil, err
	}
	r := resp.(protocol.ListTopicsResponse)
	return &r, nil
}
```
All methods follow the same pattern: dereference the request, call tc.Call, type-assert the response. Topic management commands (CreateTopic, DeleteTopic) must be sent to the Raft leader (not the topic leader), so the CLI discovers the Raft leader first via FindRaftLeader.
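Since these methods differ only in their request and response types, a generic helper (Go 1.18+) could replace the repeated bodies and turn the unchecked type assertion into an error. This sketch assumes tc.Call has the shape Call(req interface{}) (interface{}, error), as its use above suggests; caller, call, and the fake types are hypothetical.

```go
package main

import "fmt"

// caller is a hypothetical interface matching how the section uses
// transport.TransportClient.
type caller interface {
	Call(req interface{}) (interface{}, error)
}

// call sends *req by value and checks the response type instead of panicking.
// Resp comes first so callers can name it and let Go infer Req.
func call[Resp any, Req any](c caller, req *Req) (*Resp, error) {
	raw, err := c.Call(*req)
	if err != nil {
		return nil, err
	}
	r, ok := raw.(Resp)
	if !ok {
		return nil, fmt.Errorf("unexpected response type %T", raw)
	}
	return &r, nil
}

// Hypothetical request/response types and a fake transport for the demo.
type listTopicsRequest struct{}
type listTopicsResponse struct{ Topics []string }

type fakeCaller struct{ resp interface{} }

func (f fakeCaller) Call(req interface{}) (interface{}, error) { return f.resp, nil }

func main() {
	ok := fakeCaller{resp: listTopicsResponse{Topics: []string{"orders"}}}
	resp, err := call[listTopicsResponse](ok, &listTopicsRequest{})
	fmt.Println(resp.Topics, err)

	_, err = call[listTopicsResponse](fakeCaller{resp: "oops"}, &listTopicsRequest{})
	fmt.Println(err)
}
```

With that assumed shape, ListTopics would reduce to return call[protocol.ListTopicsResponse](c.tc, req).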
Step 9: Understand producer ack modes
Choosing ack mode for your use case
| Mode | Producer experience | What happens |
|---|---|---|
| AckNone (0) | Fastest. Fire and forget. | Leader appends, responds immediately. No durability guarantee. |
| AckLeader (1) | Moderate latency. Default. | Leader appends and responds after local write. Data can be lost if leader crashes before replication. |
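| AckAll (2) | Highest latency, highest durability. | Leader appends, waits for all ISR replicas to catch up (up to 5s timeout), then responds. Once every ISR replica has the record, it survives the loss of any single node, provided the ISR includes at least one replica besides the leader. |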
Choose based on your requirements:
- Logging/metrics → AckNone (speed over durability)
- General messaging → AckLeader (good balance)
- Financial transactions → AckAll (strongest durability)
Summary
| Component | Purpose |
|---|---|
| TryAddrs | Bootstrap by trying each broker address. Returns first successful result. |
| ShouldReconnect | Wrapper around protocol.ShouldReconnect. Signals leader change or connection failure. |
| ProducerClient | Produce() and ProduceBatch() with ack modes. Methods accept a context.Context, though it is not passed to the transport call. |
| ConsumerClient | Fetch(), FetchBatch(), CommitOffset(), FetchOffset(). ReplicaNodeID for replication mode. |
| RemoteClient | Admin operations: CreateTopic, DeleteTopic, FindTopicLeader, FindRaftLeader, ListTopics. |
| ConsumerManager | Server-side offset tracking. Persists to append-only __consumer_offsets__.log. Recovers once at startup. |
| Auto-reconnect | Both producer and consumer CLIs retry leader discovery up to 10 times with a linearly increasing backoff. |
With producer and consumer APIs complete, the final page shows how to run the entire cluster — Docker setup, deployment, and end-to-end testing.