Wire Protocol and Codec — Binary Framing and Message Types in Go

In this section, you'll define how messages are encoded on the wire. You'll implement the frame format (length-prefixed binary frames), the message types (produce, fetch, topic management, etc.), the codec (JSON serialization), and the replication batch format for efficient bulk data transfer. All code lives in the protocol/ package.

Step 1: Design the frame format

Frame format overview

Every message sent over TCP is wrapped in a frame — a fixed-size header followed by a payload:

┌───────────────┬────────────────┬─────────────────────┐
│  Message Type │  Payload Size  │  Payload (JSON)     │
│  2 bytes      │  4 bytes       │  PayloadSize bytes  │
│  (uint16)     │  (uint32)      │                     │
└───────────────┴────────────────┴─────────────────────┘
  • Message Type: 2 bytes, big-endian uint16. Identifies the request or response type.
  • Payload Size: 4 bytes, big-endian uint32. Number of bytes in the payload.
  • Payload: PayloadSize bytes. The serialized message (JSON).

Total header = 6 bytes. Define the constants:

// protocol/frame.go
package protocol

import "encoding/binary"

type MessageType uint16

var byteOrder = binary.BigEndian

const messageTypeSize = 2
const messageSizeSize = 4

// frameHeaderSize is the fixed header size (2-byte message type + 4-byte payload size).
const frameHeaderSize = messageTypeSize + messageSizeSize

// MaxFrameSize is the maximum allowed frame payload size (4MB).
const MaxFrameSize = 4 * 1024 * 1024

Why length-prefixed? TCP is a byte stream with no built-in message boundaries. The length prefix tells the receiver exactly how many bytes to read for each message. This is simpler than delimiter-based framing and works with binary payloads.
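
To make the boundary problem concrete, the sketch below concatenates two frames into one byte slice, roughly as a single TCP read might deliver them, and recovers each payload using only the 6-byte header. The helper names buildFrame and splitFrames are illustrative and are not part of the protocol package.

```go
package main

import (
    "bytes"
    "encoding/binary"
    "fmt"
    "io"
)

const headerSize = 6 // 2-byte message type + 4-byte payload size

// buildFrame is a hypothetical helper that prepends the header to a payload.
func buildFrame(msgType uint16, payload []byte) []byte {
    buf := make([]byte, headerSize+len(payload))
    binary.BigEndian.PutUint16(buf[0:2], msgType)
    binary.BigEndian.PutUint32(buf[2:6], uint32(len(payload)))
    copy(buf[headerSize:], payload)
    return buf
}

// splitFrames reads frames until the stream is exhausted and returns the payloads.
func splitFrames(stream []byte) ([]string, error) {
    r := bytes.NewReader(stream)
    var payloads []string
    for {
        var header [headerSize]byte
        if _, err := io.ReadFull(r, header[:]); err != nil {
            if err == io.EOF {
                return payloads, nil // clean end: no partial header left over
            }
            return nil, err
        }
        size := binary.BigEndian.Uint32(header[2:6])
        if int(size) > r.Len() {
            return nil, io.ErrUnexpectedEOF // header promises more bytes than remain
        }
        payload := make([]byte, size)
        if _, err := io.ReadFull(r, payload); err != nil {
            return nil, err
        }
        payloads = append(payloads, string(payload))
    }
}

func main() {
    // Two frames back to back, with nothing between them but their headers.
    stream := append(buildFrame(2, []byte(`{"a":1}`)), buildFrame(6, []byte(`{"b":22}`))...)
    payloads, err := splitFrames(stream)
    fmt.Println(payloads, err)
}
```

The receiver never inspects the payload to find where a message ends, which is why the same framing works for JSON and for arbitrary binary data.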

Step 2: Define message types

Message type enumeration

Each message type gets a unique identifier using iota. Request and response types are paired:

// protocol/frame.go
const (
    MsgReplicateStream MessageType = iota
    MsgReplicateResp
    MsgProduce
    MsgProduceResp
    MsgProduceBatch
    MsgProduceBatchResp
    MsgFetch
    MsgFetchResp
    MsgFetchBatch
    MsgFetchBatchResp
    MsgFetchStream
    MsgFetchStreamResp
    MsgCommitOffset
    MsgCommitOffsetResp
    MsgFetchOffset
    MsgFetchOffsetResp
    MsgCreateTopic
    MsgCreateTopicResp
    MsgDeleteTopic
    MsgDeleteTopicResp
    MsgRPCError
    MsgFindTopicLeader
    MsgFindTopicLeaderResp
    MsgFindRaftLeader
    MsgFindRaftLeaderResp
    MsgListTopics
    MsgListTopicsResp
    MsgApplyIsrUpdateEvent
    MsgApplyIsrUpdateEventResp
)

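Using iota keeps the numbering sequential. Because these numeric values are what travel on the wire, append new message types at the end of the block: inserting one in the middle renumbers every constant after it and breaks compatibility with peers built from the older list.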
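
As a small illustration of how iota assigns values by position, the sketch below declares a shortened list twice, once as-is and once with a hypothetical new type (V2Ping) inserted in the middle. The V1/V2 names exist only for this example.

```go
package main

import "fmt"

type MessageType uint16

// Original ordering (a subset, for illustration).
const (
    V1Produce     MessageType = iota // 0
    V1ProduceResp                    // 1
    V1Fetch                          // 2
)

// The same list with a hypothetical new type inserted in the middle.
const (
    V2Produce     MessageType = iota // 0
    V2ProduceResp                    // 1
    V2Ping                           // 2 (new)
    V2Fetch                          // 3: shifted, so an older peer would misread it
)

func main() {
    fmt.Println(V1Fetch, V2Fetch) // prints "2 3"
}
```

A peer compiled against the first list would interpret type 2 as a fetch, while a peer compiled against the second would send type 2 for the new message.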

Step 3: Create request and response structs

Struct overview

Define Go structs for each message. These are serialized to JSON for the payload:

Core type definitions

// protocol/types.go
package protocol

// LogEntry is a log record with offset and value.
type LogEntry struct {
    Offset uint64
    Value  []byte
}

// AckMode for producer acks.
type AckMode int32

const (
    AckNone   AckMode = 0  // No ack — fire and forget
    AckLeader AckMode = 1  // Ack after leader append
    AckAll    AckMode = 2  // Ack after all ISR replicas
)

Produce request and response

type ProduceRequest struct {
    Topic string
    Value []byte
    Acks  AckMode
}

type ProduceResponse struct {
    Offset uint64
}

type ProduceBatchRequest struct {
    Topic  string
    Values [][]byte
    Acks   AckMode
}

type ProduceBatchResponse struct {
    BaseOffset uint64
    LastOffset uint64
    Count      uint32
}
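
It can help to see what one of these structs looks like as a payload. The sketch below marshals a ProduceRequest with encoding/json; because the struct has no json tags, the keys are the Go field names, and the []byte value is emitted as a base64 string. The marshalProduce helper is only for this example.

```go
package main

import (
    "encoding/json"
    "fmt"
)

type AckMode int32

const AckLeader AckMode = 1

type ProduceRequest struct {
    Topic string
    Value []byte
    Acks  AckMode
}

// marshalProduce returns the JSON payload for a request as a string.
func marshalProduce(req ProduceRequest) (string, error) {
    b, err := json.Marshal(req)
    return string(b), err
}

func main() {
    out, err := marshalProduce(ProduceRequest{Topic: "orders", Value: []byte("hi"), Acks: AckLeader})
    fmt.Println(out, err)
    // prints: {"Topic":"orders","Value":"aGk=","Acks":1} <nil>
}
```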

Fetch request and response

type FetchRequest struct {
    Topic         string
    Id            string
    Offset        uint64
    ReplicaNodeID string // when set, leader uses ReadUncommitted
}

type FetchResponse struct {
    Entry *LogEntry
}

type FetchBatchRequest struct {
    Topic         string
    Id            string
    Offset        uint64
    MaxCount      uint32
    ReplicaNodeID string
}

type FetchBatchResponse struct {
    Entries []*LogEntry
}

The ReplicaNodeID field is the key to distinguishing consumer fetches from replication fetches. When set, the server reads uncommitted data (up to LEO) and records the replica's LEO for ISR computation.
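
One way a handler might branch on this field is sketched below. The readLimit function and its highWatermark parameter are assumptions made for illustration: the text above only states that replica fetches read up to the LEO, so the consumer-side bound shown here is a guess at how committed data would be limited.

```go
package main

import "fmt"

type FetchRequest struct {
    Topic         string
    Id            string
    Offset        uint64
    ReplicaNodeID string
}

// readLimit returns the exclusive upper offset a fetch may read up to.
// highWatermark and leo are hypothetical values supplied by the log layer.
func readLimit(req FetchRequest, highWatermark, leo uint64) uint64 {
    if req.ReplicaNodeID != "" {
        return leo // replication fetch: uncommitted data is visible
    }
    return highWatermark // consumer fetch: assumed to stop at committed data
}

func main() {
    consumer := FetchRequest{Topic: "orders", Id: "c1", Offset: 10}
    replica := FetchRequest{Topic: "orders", Offset: 10, ReplicaNodeID: "node-2"}
    fmt.Println(readLimit(consumer, 40, 50), readLimit(replica, 40, 50)) // prints "40 50"
}
```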

Topic management request types

type CreateTopicRequest struct {
    Topic                  string
    ReplicaCount           uint32
    DesignatedLeaderNodeID string // used when Raft leader forwards to topic leader
}

type CreateTopicResponse struct {
    Topic          string
    ReplicaNodeIds []string // actual replica set filled by topic leader
}

type DeleteTopicRequest struct {
    Topic string
}

type DeleteTopicResponse struct {
    Topic string
}

Leader discovery request types

type FindTopicLeaderRequest struct {
    Topic string
}

type FindTopicLeaderResponse struct {
    LeaderAddr string
}

type FindRaftLeaderRequest struct{}

type FindRaftLeaderResponse struct {
    RaftLeaderAddr string
}

Consumer offset management requests

type CommitOffsetRequest struct {
    Topic  string
    Id     string
    Offset uint64
}

type CommitOffsetResponse struct {
    Success bool
}

type FetchOffsetRequest struct {
    Topic string
    Id    string
}

type FetchOffsetResponse struct {
    Offset uint64
}

List topics request

type ListTopicsRequest struct{}

type ReplicaInfo struct {
    NodeID string
    IsISR  bool
    LEO    int64
}

type TopicInfo struct {
    Name          string
    LeaderNodeID  string
    LeaderEpoch   int64
    Replicas      []ReplicaInfo
}

type ListTopicsResponse struct {
    Topics []TopicInfo
}

ISR update request

type ApplyIsrUpdateEventRequest struct {
    Topic         string
    ReplicaNodeID string
    Isr           bool
}

type ApplyIsrUpdateEventResponse struct{}

Step 4: Implement error handling

Error codes and response types

Define a structured RPC error with error codes. The server sends RPCErrorResponse on the wire; the client converts it to an RPCError Go error:

// protocol/types.go

// RPC error codes
const (
    CodeUnknown             int32 = iota
    CodeTopicRequired
    CodeTopicNameRequired
    CodeTopicNotFound
    CodeNotTopicLeader
    CodeValuesRequired
    CodeReadOffset
    CodeCommitOffset
    CodeRecoverOffsets
    CodeReplicaCountInvalid
    CodeLeaderAddrRequired
    CodeRaftLeaderUnavailable
    CodeTopicExists
    CodeNotEnoughNodes
    CodeCannotReachLeader
    CodeInvalidAckMode
    CodeTimeoutCatchUp
)

// RPCErrorResponse is sent by the server when a handler returns an error.
type RPCErrorResponse struct {
    Code    int32  `json:"code"`
    Message string `json:"message"`
}

// RPCError is returned by the transport client when the server sends an RPCErrorResponse.
type RPCError struct {
    Code    int32
    Message string
}

func (e *RPCError) Error() string { return e.Message }
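
The conversion from the wire type to the Go error could look like the following sketch. The toError, wrapErr, and codeOf helpers are hypothetical; the transport client in the next page may do this differently. The point is that callers can recover the code with errors.As even after the error has been wrapped.

```go
package main

import (
    "errors"
    "fmt"
)

const CodeNotTopicLeader int32 = 4 // same value the iota list above assigns

type RPCErrorResponse struct {
    Code    int32  `json:"code"`
    Message string `json:"message"`
}

type RPCError struct {
    Code    int32
    Message string
}

func (e *RPCError) Error() string { return e.Message }

// toError is a hypothetical client-side helper: wire response in, Go error out.
func toError(resp RPCErrorResponse) error {
    return &RPCError{Code: resp.Code, Message: resp.Message}
}

// wrapErr adds context while keeping the original error reachable via %w.
func wrapErr(op string, err error) error {
    return fmt.Errorf("%s: %w", op, err)
}

// codeOf extracts the RPC code from err, even when err has been wrapped.
func codeOf(err error) (int32, bool) {
    var rpcErr *RPCError
    if errors.As(err, &rpcErr) {
        return rpcErr.Code, true
    }
    return 0, false
}

func main() {
    err := wrapErr("produce", toError(RPCErrorResponse{Code: CodeNotTopicLeader, Message: "not leader"}))
    code, ok := codeOf(err)
    fmt.Println(code, ok) // prints "4 true"
}
```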

Reconnection logic

The ShouldReconnect function tells clients whether an error means they should rediscover the leader. It checks structured RPC codes, EOF, syscall errors, and network errors:

// ShouldReconnect needs these imports in protocol/types.go: errors, io, net, syscall.
func ShouldReconnect(err error) bool {
    if err == nil {
        return false
    }
    // Server returned a structured RPC error code
    var rpcErr *RPCError
    if errors.As(err, &rpcErr) {
        switch rpcErr.Code {
        case CodeNotTopicLeader, CodeTopicNotFound, CodeRaftLeaderUnavailable:
            return true
        default:
            return false
        }
    }
    // EOF means the connection was closed by the peer
    if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) {
        return true
    }
    // Syscall-level connection errors
    if errors.Is(err, syscall.ECONNRESET) || errors.Is(err, syscall.EPIPE) ||
       errors.Is(err, syscall.ECONNREFUSED) {
        return true
    }
    // net.Error covers timeouts and other transient network failures
    var netErr net.Error
    if errors.As(err, &netErr) {
        return true
    }
    if errors.Is(err, net.ErrClosed) {
        return true
    }
    return false
}
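
A caller might use a predicate like ShouldReconnect to drive a retry loop. The sketch below is one possible shape, not the client implementation: callWithRediscovery and its three function parameters are hypothetical hooks, and a stand-in predicate is used so the example stays self-contained.

```go
package main

import (
    "errors"
    "fmt"
)

// callWithRediscovery runs call and, when shouldReconnect says the error
// warrants it, invokes rediscover and tries again, up to maxAttempts calls.
func callWithRediscovery(maxAttempts int, call func() error, rediscover func() error, shouldReconnect func(error) bool) error {
    var err error
    for attempt := 0; attempt < maxAttempts; attempt++ {
        if err = call(); err == nil {
            return nil
        }
        if !shouldReconnect(err) {
            return err // not a leadership or connection problem: give up
        }
        if rerr := rediscover(); rerr != nil {
            return rerr
        }
    }
    return err
}

var errNotLeader = errors.New("not topic leader")

// simulate drives the loop with a call that fails `failures` times, then
// succeeds. It returns how many times rediscovery ran and the final error.
func simulate(failures, maxAttempts int) (int, error) {
    calls, rediscoveries := 0, 0
    err := callWithRediscovery(maxAttempts,
        func() error {
            calls++
            if calls <= failures {
                return errNotLeader
            }
            return nil
        },
        func() error { rediscoveries++; return nil },
        func(err error) bool { return errors.Is(err, errNotLeader) },
    )
    return rediscoveries, err
}

func main() {
    n, err := simulate(2, 5)
    fmt.Println(n, err) // prints "2 <nil>"
}
```

In real use the predicate argument would be ShouldReconnect itself, and rediscover would issue a FindTopicLeaderRequest or FindRaftLeaderRequest.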

Step 5: Implement the codec

Codec overview

The codec is a struct with Encode and Decode methods. It uses a type switch to map Go types to message type IDs, then JSON-serializes the payload:

// protocol/codec.go
package protocol

import (
    "encoding/json"
    "io"
)

type Codec struct{}

Encoding messages to frames

The Encode method writes a complete frame (header + payload) to an io.Writer:

func (c *Codec) Encode(w io.Writer, msg any) error {
    var mType MessageType
    var payload []byte
    var err error
    switch v := msg.(type) {
    case ProduceRequest, *ProduceRequest:
        mType = MsgProduce
        payload, err = json.Marshal(v)
    case ProduceResponse, *ProduceResponse:
        mType = MsgProduceResp
        payload, err = json.Marshal(v)
    case FetchRequest, *FetchRequest:
        mType = MsgFetch
        payload, err = json.Marshal(v)
    case FetchResponse, *FetchResponse:
        mType = MsgFetchResp
        payload, err = json.Marshal(v)
    // ... (one case per message type)
    case RPCErrorResponse, *RPCErrorResponse:
        mType = MsgRPCError
        payload, err = json.Marshal(v)
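    default:
        // mType is still its zero value here, so the error reports 0 (the value
        // of MsgReplicateStream) rather than anything identifying msg's Go type.
        return ErrUnknownMessageType(mType)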
    }
    if err != nil {
        return err
    }
    return c.encodeFrame(w, mType, payload)
}

func (c *Codec) encodeFrame(w io.Writer, mType MessageType, payload []byte) error {
    // Compare the int length before converting, so an oversized payload
    // cannot wrap around when narrowed to uint32.
    if len(payload) > MaxFrameSize {
        return ErrFrameTooLarge
    }
    // Build header and payload in one buffer so the frame goes out in a
    // single Write call instead of two.
    buf := make([]byte, frameHeaderSize+len(payload))
    byteOrder.PutUint16(buf[:messageTypeSize], uint16(mType))
    byteOrder.PutUint32(buf[messageTypeSize:frameHeaderSize], uint32(len(payload)))
    copy(buf[frameHeaderSize:], payload)
    _, err := w.Write(buf)
    return err
}

Decoding frames to messages

The Decode method reads a frame from an io.Reader and returns the typed message:

func (c *Codec) Decode(r io.Reader) (MessageType, any, error) {
    mType, payload, err := c.decodeFrame(r)
    if err != nil {
        return 0, nil, err
    }
    switch mType {
    case MsgProduce:
        var msg ProduceRequest
        err = json.Unmarshal(payload, &msg)
        return mType, msg, err
    case MsgProduceResp:
        var msg ProduceResponse
        err = json.Unmarshal(payload, &msg)
        return mType, msg, err
    // ... (one case per message type)
    case MsgRPCError:
        var msg RPCErrorResponse
        err = json.Unmarshal(payload, &msg)
        return mType, msg, err
    default:
        return 0, nil, ErrUnknownMessageType(mType)
    }
}

func (c *Codec) decodeFrame(r io.Reader) (MessageType, []byte, error) {
    var header [frameHeaderSize]byte
    if _, err := io.ReadFull(r, header[:]); err != nil {
        return 0, nil, err
    }
    mType := MessageType(byteOrder.Uint16(header[:]))
    length := byteOrder.Uint32(header[messageTypeSize:])
    if length > MaxFrameSize {
        return 0, nil, ErrFrameTooLarge
    }
    payload := make([]byte, length)
    if _, err := io.ReadFull(r, payload); err != nil {
        return 0, nil, err
    }
    return mType, payload, nil
}

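The codec handles all message types in one place. Adding a new message means defining its MessageType constant and struct, then adding one case to Encode and one to Decode; a full RPC (request plus response) therefore needs two cases in each method.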
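
To see the two halves working together, here is a reduced round trip through an in-memory buffer. It is a sketch under simplifying assumptions: only two message types are handled, the functions are free-standing rather than methods on Codec, and the MaxFrameSize guard is omitted. The roundTrip helper is hypothetical.

```go
package main

import (
    "bytes"
    "encoding/binary"
    "encoding/json"
    "fmt"
    "io"
)

type MessageType uint16

const (
    MsgProduce MessageType = iota + 2 // 2 and 3, matching their positions in the full list
    MsgProduceResp
)

type ProduceRequest struct {
    Topic string
    Value []byte
}

type ProduceResponse struct{ Offset uint64 }

func encode(w io.Writer, msg any) error {
    var mType MessageType
    switch msg.(type) {
    case ProduceRequest, *ProduceRequest:
        mType = MsgProduce
    case ProduceResponse, *ProduceResponse:
        mType = MsgProduceResp
    default:
        return fmt.Errorf("cannot encode %T", msg)
    }
    payload, err := json.Marshal(msg)
    if err != nil {
        return err
    }
    header := make([]byte, 6)
    binary.BigEndian.PutUint16(header[0:2], uint16(mType))
    binary.BigEndian.PutUint32(header[2:6], uint32(len(payload)))
    _, err = w.Write(append(header, payload...))
    return err
}

func decode(r io.Reader) (any, error) {
    var header [6]byte
    if _, err := io.ReadFull(r, header[:]); err != nil {
        return nil, err
    }
    payload := make([]byte, binary.BigEndian.Uint32(header[2:6]))
    if _, err := io.ReadFull(r, payload); err != nil {
        return nil, err
    }
    switch MessageType(binary.BigEndian.Uint16(header[0:2])) {
    case MsgProduce:
        var msg ProduceRequest
        err := json.Unmarshal(payload, &msg)
        return msg, err
    case MsgProduceResp:
        var msg ProduceResponse
        err := json.Unmarshal(payload, &msg)
        return msg, err
    default:
        return nil, fmt.Errorf("unknown message type")
    }
}

// roundTrip encodes msg into an in-memory buffer and decodes it again.
func roundTrip(msg any) (any, error) {
    var buf bytes.Buffer
    if err := encode(&buf, msg); err != nil {
        return nil, err
    }
    return decode(&buf)
}

func main() {
    out, err := roundTrip(&ProduceRequest{Topic: "orders", Value: []byte("hi")})
    fmt.Printf("%+v %v\n", out, err)
}
```

Note that a pointer goes in but a value comes out: the decode side always returns the struct by value, so callers should type-assert to ProduceRequest, not *ProduceRequest.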

Step 6: Define metadata events for Raft

Metadata event types

Raft entries carry metadata events — not record data. Define the event types and their payloads:

// protocol/metadata.go
package protocol

import "encoding/json"

type MetadataEventType uint16

const (
    MetadataEventTypeCreateTopic  MetadataEventType = iota
    MetadataEventTypeDeleteTopic
    MetadataEventTypeLeaderChange
    MetadataEventTypeIsrUpdate
    MetadataEventTypeAddNode
    MetadataEventTypeRemoveNode
    MetadataEventTypeUpdateNode
)

type MetadataEvent struct {
    EventType MetadataEventType `json:"event_type"`
    Data      json.RawMessage   `json:"data"`
}

type CreateTopicEvent struct {
    Topic          string   `json:"topic"`
    ReplicaCount   uint32   `json:"replica_count"`
    LeaderNodeID   string   `json:"leader_id"`
    LeaderEpoch    int64    `json:"leader_epoch"`
    ReplicaNodeIds []string `json:"replicas"`
}

type DeleteTopicEvent struct {
    Topic string `json:"topic"`
}

type LeaderChangeEvent struct {
    Topic        string `json:"topic"`
    LeaderNodeID string `json:"leader_id"`
    LeaderEpoch  int64  `json:"leader_epoch"`
}

type IsrUpdateEvent struct {
    Topic         string `json:"topic"`
    ReplicaNodeID string `json:"replica_id"`
    Isr           bool   `json:"isr"`
}

type AddNodeEvent struct {
    NodeID  string `json:"node_id"`
    Addr    string `json:"addr"`
    RpcAddr string `json:"rpc_addr"`
}

type RemoveNodeEvent struct {
    NodeID string `json:"node_id"`
}

type UpdateNodeEvent struct {
    NodeID    string `json:"node_id"`
    IsHealthy bool   `json:"is_healthy"`
}

These events are JSON-serialized into Raft log entries. When committed, the FSM deserializes the event and updates the in-memory metadata state.
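
The two-level encoding (a typed event inside a MetadataEvent envelope) might be used as in the sketch below. The wrap and apply functions and the map standing in for metadata state are assumptions for illustration; the real FSM is covered elsewhere, and the event structs are trimmed to the fields the example needs.

```go
package main

import (
    "encoding/json"
    "fmt"
)

type MetadataEventType uint16

const (
    MetadataEventTypeCreateTopic MetadataEventType = iota
    MetadataEventTypeDeleteTopic
)

type MetadataEvent struct {
    EventType MetadataEventType `json:"event_type"`
    Data      json.RawMessage   `json:"data"`
}

type CreateTopicEvent struct {
    Topic        string `json:"topic"`
    ReplicaCount uint32 `json:"replica_count"`
}

type DeleteTopicEvent struct {
    Topic string `json:"topic"`
}

// wrap is a hypothetical helper that serializes a typed event into the envelope.
func wrap(t MetadataEventType, event any) ([]byte, error) {
    data, err := json.Marshal(event)
    if err != nil {
        return nil, err
    }
    return json.Marshal(MetadataEvent{EventType: t, Data: data})
}

// apply stands in for the FSM: it decodes the envelope, then the typed event,
// and updates a map of topic name to replica count.
func apply(topics map[string]uint32, entry []byte) error {
    var ev MetadataEvent
    if err := json.Unmarshal(entry, &ev); err != nil {
        return err
    }
    switch ev.EventType {
    case MetadataEventTypeCreateTopic:
        var e CreateTopicEvent
        if err := json.Unmarshal(ev.Data, &e); err != nil {
            return err
        }
        topics[e.Topic] = e.ReplicaCount
    case MetadataEventTypeDeleteTopic:
        var e DeleteTopicEvent
        if err := json.Unmarshal(ev.Data, &e); err != nil {
            return err
        }
        delete(topics, e.Topic)
    default:
        return fmt.Errorf("unhandled metadata event type %d", ev.EventType)
    }
    return nil
}

func main() {
    topics := map[string]uint32{}
    entry, _ := wrap(MetadataEventTypeCreateTopic, CreateTopicEvent{Topic: "orders", ReplicaCount: 3})
    fmt.Println(string(entry))
    fmt.Println(apply(topics, entry), topics)
}
```

Keeping Data as json.RawMessage lets the envelope be decoded first, so the event type can select the concrete struct before the inner payload is parsed.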

Step 7: Implement replication batch format

Replication batch overview

For bulk replication, individual JSON-encoded fetch responses are too slow. The replication batch uses a compact binary format:

Batch header (25 bytes):
┌──────────────┬──────────────┬──────────────┬──────────┬────────────┬──────────────────┐
│ Base Offset  │ Batch Length │ Leader Epoch │   CRC    │ Attributes │ Last Offset Delta│
│  8 bytes     │  4 bytes     │  4 bytes     │ 4 bytes  │  1 byte    │  4 bytes         │
└──────────────┴──────────────┴──────────────┴──────────┴────────────┴──────────────────┘

Followed by records:
┌──────────────┬──────────────┬──────────────────────┐
│   Offset     │    Size      │    Value             │
│   8 bytes    │   4 bytes    │   Size bytes         │
├──────────────┼──────────────┼──────────────────────┤
│   Offset     │    Size      │    Value             │
│   ...        │   ...        │   ...                │
└──────────────┴──────────────┴──────────────────────┘

// protocol/replication_batch.go
package protocol

import (
    "encoding/binary"
    "hash/crc32"
    "io"
)

const (
    replicationBatchHeaderSize  = 8 + 4 + 4 + 4 + 1 + 4 // 25 bytes
    replicationRecordHeaderSize = 8 + 4                   // offset + size
)

type ReplicationRecord struct {
    Offset int64
    Value  []byte
}

Encoding replication batches

func EncodeReplicationBatch(records []ReplicationRecord) ([]byte, error) {
    if len(records) == 0 {
        return nil, nil
    }
    baseOffset := records[0].Offset
    lastOffset := records[len(records)-1].Offset
    lastOffsetDelta := int32(lastOffset - baseOffset)

    var recordBuf []byte
    for _, r := range records {
        recordBuf = append(recordBuf, encodeRecord(r)...)
    }
    batchLength := int32(len(recordBuf))
    crc := crc32.ChecksumIEEE(recordBuf)

    buf := make([]byte, replicationBatchHeaderSize+len(recordBuf))
    off := 0
    binary.BigEndian.PutUint64(buf[off:off+8], uint64(baseOffset))
    off += 8
    binary.BigEndian.PutUint32(buf[off:off+4], uint32(batchLength))
    off += 4
    binary.BigEndian.PutUint32(buf[off:off+4], 0) // leader epoch: always written as 0 here
    off += 4
    binary.BigEndian.PutUint32(buf[off:off+4], crc) // CRC-32 (IEEE) over the encoded records only
    off += 4
    buf[off] = 0 // attributes: 0 means no compression
    off++
    binary.BigEndian.PutUint32(buf[off:off+4], uint32(lastOffsetDelta))
    off += 4
    copy(buf[off:], recordBuf)
    return buf, nil
}
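
EncodeReplicationBatch calls encodeRecord, which is not shown above. A minimal version consistent with the record layout in the diagram (8-byte offset, 4-byte size, then the value) might look like this; treat it as an assumed implementation, not the original.

```go
package main

import (
    "encoding/binary"
    "fmt"
)

type ReplicationRecord struct {
    Offset int64
    Value  []byte
}

const replicationRecordHeaderSize = 8 + 4 // offset + size

// encodeRecord is an assumed implementation: 8-byte big-endian offset,
// 4-byte big-endian size, then the raw value bytes.
func encodeRecord(r ReplicationRecord) []byte {
    buf := make([]byte, replicationRecordHeaderSize+len(r.Value))
    binary.BigEndian.PutUint64(buf[0:8], uint64(r.Offset))
    binary.BigEndian.PutUint32(buf[8:12], uint32(len(r.Value)))
    copy(buf[replicationRecordHeaderSize:], r.Value)
    return buf
}

func main() {
    b := encodeRecord(ReplicationRecord{Offset: 7, Value: []byte("hi")})
    fmt.Printf("% x\n", b)
    // prints: 00 00 00 00 00 00 00 07 00 00 00 02 68 69
}
```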

Decoding replication batches

func DecodeReplicationBatch(data []byte) ([]ReplicationRecord, error) {
    if len(data) < replicationBatchHeaderSize {
        return nil, io.ErrUnexpectedEOF
    }
    batchLength := int32(binary.BigEndian.Uint32(data[8:12]))
    crcStored := binary.BigEndian.Uint32(data[16:20])

    recordBuf := data[replicationBatchHeaderSize:]
    if int32(len(recordBuf)) != batchLength {
        return nil, io.ErrUnexpectedEOF
    }
    if crc32.ChecksumIEEE(recordBuf) != crcStored {
        return nil, ErrReplicationBatchCRC
    }

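    var records []ReplicationRecord
    p := recordBuf
    for len(p) > 0 {
        // Each record needs a full 12-byte header followed by size bytes of
        // value. Report truncated or malformed data instead of dropping it.
        if len(p) < replicationRecordHeaderSize {
            return nil, io.ErrUnexpectedEOF
        }
        offset := int64(binary.BigEndian.Uint64(p[0:8]))
        size := int(int32(binary.BigEndian.Uint32(p[8:12])))
        if size < 0 || replicationRecordHeaderSize+size > len(p) {
            return nil, io.ErrUnexpectedEOF
        }
        value := make([]byte, size)
        copy(value, p[replicationRecordHeaderSize:replicationRecordHeaderSize+size])
        records = append(records, ReplicationRecord{Offset: offset, Value: value})
        p = p[replicationRecordHeaderSize+size:]
    }
    return records, nil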
}

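Why a separate binary format? JSON-encoding thousands of records per replication cycle adds overhead: every []byte value is base64-encoded (roughly a one-third size increase) and the field names are repeated for each record. The binary batch format is compact, includes a CRC for integrity, and is fast to encode and decode. The layout is loosely modeled on Kafka's record batch format.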
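
The size difference can be checked with a quick comparison. The sketch below measures one record both ways; binarySize and jsonSize are helpers written for this example, and the comparison covers per-record bytes only (the 25-byte batch header is paid once per batch, and CPU cost is not measured).

```go
package main

import (
    "encoding/json"
    "fmt"
)

type LogEntry struct {
    Offset uint64
    Value  []byte
}

// binarySize is the size of one record in the batch body: 12-byte header + value.
func binarySize(valueLen int) int { return 8 + 4 + valueLen }

// jsonSize is the size of the same record as a JSON-encoded LogEntry,
// or -1 if marshaling fails.
func jsonSize(valueLen int) int {
    b, err := json.Marshal(LogEntry{Offset: 0, Value: make([]byte, valueLen)})
    if err != nil {
        return -1
    }
    return len(b)
}

func main() {
    for _, n := range []int{16, 256, 4096} {
        fmt.Printf("value=%d bytes: binary=%d json=%d\n", n, binarySize(n), jsonSize(n))
    }
}
```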

Step 8: Define protocol error types

Error definitions

// protocol/error.go
package protocol

import (
    "errors"
    "fmt"
)

var (
    ErrFrameTooLarge       = errors.New("protocol: frame exceeds max size")
    ErrReplicationBatchCRC = errors.New("protocol: replication batch CRC mismatch")
)

func ErrUnknownMessageType(mType MessageType) error {
    // Shared by Encode and Decode, so the message names neither direction.
    return fmt.Errorf("protocol: unknown message type: %d", mType)
}
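
Because ErrFrameTooLarge and ErrReplicationBatchCRC are sentinel values, callers can test for them with errors.Is even if an intermediate layer adds context. The sketch below shows the pattern; checkSize and isTooLarge are hypothetical helpers, and the wrapping with %w is an illustration (the codec above returns the bare sentinel).

```go
package main

import (
    "errors"
    "fmt"
)

var ErrFrameTooLarge = errors.New("protocol: frame exceeds max size")

const MaxFrameSize = 4 * 1024 * 1024

// checkSize applies the same limit as the codec and wraps the sentinel with context.
func checkSize(n int) error {
    if n > MaxFrameSize {
        return fmt.Errorf("payload of %d bytes: %w", n, ErrFrameTooLarge)
    }
    return nil
}

// isTooLarge reports whether err is, or wraps, ErrFrameTooLarge.
func isTooLarge(err error) bool { return errors.Is(err, ErrFrameTooLarge) }

func main() {
    fmt.Println(isTooLarge(checkSize(MaxFrameSize)))     // prints "false"
    fmt.Println(isTooLarge(checkSize(MaxFrameSize + 1))) // prints "true"
}
```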

Summary

Component                  Purpose
Frame format               [MsgType 2][PayloadSize 4][Payload]; length-prefixed, max 4 MB payload.
Message types              MessageType enum using iota for produce, fetch, topic CRUD, leader discovery, offset management, replication, ISR updates.
Request/Response structs   Go structs serialized as JSON. Each RPC has a request and response type.
RPCError                   Server sends RPCErrorResponse; client converts to *RPCError. ShouldReconnect() checks codes, EOF, syscall errors, and net errors.
Codec                      Struct with Encode(w, msg) and Decode(r) methods. Type switch maps Go types to message IDs. Combined header+payload write.
Metadata events            Raft entry payloads: CreateTopic, DeleteTopic, LeaderChange, IsrUpdate, AddNode, RemoveNode, UpdateNode.
Replication batch          Binary format for bulk transfer: 25-byte batch header (base offset, epoch, CRC) + encoded records.

With the protocol defined, the next page builds the transport and RPC server — TCP connections, request routing, and handler implementation.