Wire Protocol and Codec — Binary Framing and Message Types in Go
In this section, you'll define how messages are encoded on the wire. You'll implement the frame format (length-prefixed binary frames), the message types (produce, fetch, topic management, etc.), the codec (JSON serialization), and the replication batch format for efficient bulk data transfer. All code lives in the protocol/ package.
Step 1: Design the frame format
Frame format overview
Every message sent over TCP is wrapped in a frame — a fixed-size header followed by a payload:
┌───────────────┬────────────────┬─────────────────────┐
│ Message Type │ Payload Size │ Payload (JSON) │
│ 2 bytes │ 4 bytes │ PayloadSize bytes │
│ (uint16) │ (uint32) │ │
└───────────────┴────────────────┴─────────────────────┘
- Message Type: 2 bytes, big-endian uint16. Identifies the request or response type.
- Payload Size: 4 bytes, big-endian uint32. Number of bytes in the payload.
- Payload: PayloadSize bytes. The serialized message (JSON).
Total header = 6 bytes. Define the constants:
// protocol/frame.go
package protocol
import "encoding/binary"
type MessageType uint16
var byteOrder = binary.BigEndian
const messageTypeSize = 2
const messageSizeSize = 4
// frameHeaderSize is the fixed header size: 2 bytes of message type + 4 bytes of payload size.
const frameHeaderSize = messageTypeSize + messageSizeSize
// MaxFrameSize is the maximum allowed frame payload size (4 MiB). The header is not counted.
const MaxFrameSize = 4 * 1024 * 1024
Why length-prefixed? TCP is a byte stream with no built-in message boundaries. The length prefix tells the receiver exactly how many bytes to read for each message. This is simpler than delimiter-based framing and works with binary payloads.
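To make the layout concrete, here is a minimal sketch that builds one frame by hand and prints its header bytes. The buildFrame helper and the headerSize constant are illustrative names, not part of the protocol package.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// headerSize matches the 6-byte header: 2-byte type + 4-byte payload size.
const headerSize = 2 + 4

// buildFrame is a hypothetical helper that lays out a single frame:
// big-endian type, big-endian payload length, then the payload itself.
func buildFrame(msgType uint16, payload []byte) []byte {
	buf := make([]byte, headerSize+len(payload))
	binary.BigEndian.PutUint16(buf[0:2], msgType)
	binary.BigEndian.PutUint32(buf[2:6], uint32(len(payload)))
	copy(buf[headerSize:], payload)
	return buf
}

func main() {
	frame := buildFrame(2, []byte(`{"Topic":"t"}`)) // 13-byte payload
	fmt.Printf("% x\n", frame[:headerSize])         // 00 02 00 00 00 0d
	fmt.Println(len(frame))                         // 19
}
```

A receiver reads exactly 6 bytes, learns that 13 more follow, and reads those. It never has to scan the payload for a delimiter.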
Step 2: Define message types
Message type enumeration
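Each message type gets a unique identifier using iota. Request and response types are declared in adjacent pairs; the exception is MsgRPCError, which the server sends in place of the normal response when a handler fails: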
// protocol/frame.go
const (
MsgReplicateStream MessageType = iota
MsgReplicateResp
MsgProduce
MsgProduceResp
MsgProduceBatch
MsgProduceBatchResp
MsgFetch
MsgFetchResp
MsgFetchBatch
MsgFetchBatchResp
MsgFetchStream
MsgFetchStreamResp
MsgCommitOffset
MsgCommitOffsetResp
MsgFetchOffset
MsgFetchOffsetResp
MsgCreateTopic
MsgCreateTopicResp
MsgDeleteTopic
MsgDeleteTopicResp
MsgRPCError
MsgFindTopicLeader
MsgFindTopicLeaderResp
MsgFindRaftLeader
MsgFindRaftLeaderResp
MsgListTopics
MsgListTopicsResp
MsgApplyIsrUpdateEvent
MsgApplyIsrUpdateEventResp
)
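Using iota keeps the numbering sequential. Because these numeric values are what travel on the wire, add new message types at the end of the block: inserting one in the middle renumbers every type after it and breaks compatibility with peers built from the older list.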
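The following sketch uses a shortened copy of the enumeration to show how iota assigns values by position. It is only an illustration; the real block has many more entries.

```go
package main

import "fmt"

type MessageType uint16

// A shortened copy of the enumeration, for illustration only.
const (
	MsgReplicateStream MessageType = iota // 0
	MsgReplicateResp                      // 1
	MsgProduce                            // 2
	MsgProduceResp                        // 3
)

func main() {
	// Each response sits directly after its request.
	fmt.Println(MsgProduce, MsgProduceResp) // 2 3
}
```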
Step 3: Create request and response structs
Struct overview
Define Go structs for each message. These are serialized to JSON for the payload:
Core type definitions
// protocol/types.go
package protocol
// LogEntry is a log record with offset and value.
type LogEntry struct {
Offset uint64
Value []byte
}
// AckMode for producer acks.
type AckMode int32
const (
AckNone AckMode = 0 // No ack — fire and forget
AckLeader AckMode = 1 // Ack after leader append
AckAll AckMode = 2 // Ack after all ISR replicas
)
Produce request and response
type ProduceRequest struct {
Topic string
Value []byte
Acks AckMode
}
type ProduceResponse struct {
Offset uint64
}
type ProduceBatchRequest struct {
Topic string
Values [][]byte
Acks AckMode
}
type ProduceBatchResponse struct {
BaseOffset uint64
LastOffset uint64
Count uint32
}
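Since these structs carry no JSON tags, encoding/json uses the Go field names as keys and writes []byte fields as base64 strings. A small sketch of what a ProduceRequest payload looks like; marshalProduce is a hypothetical helper added only for this example.

```go
package main

import (
	"encoding/json"
	"fmt"
)

type AckMode int32

type ProduceRequest struct {
	Topic string
	Value []byte
	Acks  AckMode
}

// marshalProduce is a hypothetical helper that returns the JSON payload as a string.
func marshalProduce(req ProduceRequest) string {
	b, err := json.Marshal(req)
	if err != nil {
		panic(err)
	}
	return string(b)
}

func main() {
	req := ProduceRequest{Topic: "orders", Value: []byte("hello"), Acks: 1}
	fmt.Println(marshalProduce(req))
	// {"Topic":"orders","Value":"aGVsbG8=","Acks":1}
}
```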
Fetch request and response
type FetchRequest struct {
Topic string
Id string
Offset uint64
ReplicaNodeID string // when set, leader uses ReadUncommitted
}
type FetchResponse struct {
Entry *LogEntry
}
type FetchBatchRequest struct {
Topic string
Id string
Offset uint64
MaxCount uint32
ReplicaNodeID string
}
type FetchBatchResponse struct {
Entries []*LogEntry
}
The ReplicaNodeID field is the key to distinguishing consumer fetches from replication fetches. When it is set, the server reads uncommitted data (up to the log end offset, LEO) and records the replica's LEO so it can compute the in-sync replica set (ISR).
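One way to picture the branch on ReplicaNodeID is the sketch below. It assumes the leader tracks two positions, a committed boundary and the LEO; the readLimit helper and the committed parameter are assumptions for illustration and do not appear in the protocol package.

```go
package main

import "fmt"

type FetchRequest struct {
	Topic         string
	Id            string
	Offset        uint64
	ReplicaNodeID string
}

// readLimit is a hypothetical helper. committed is the assumed end of
// committed data and leo is the log end offset.
func readLimit(req FetchRequest, committed, leo uint64) uint64 {
	if req.ReplicaNodeID != "" {
		return leo // replication fetch: may read uncommitted entries
	}
	return committed // consumer fetch: committed entries only
}

func main() {
	consumer := FetchRequest{Topic: "orders", Id: "c1", Offset: 5}
	replica := FetchRequest{Topic: "orders", Offset: 5, ReplicaNodeID: "node-2"}
	fmt.Println(readLimit(consumer, 10, 14), readLimit(replica, 10, 14)) // 10 14
}
```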
Topic management request types
type CreateTopicRequest struct {
Topic string
ReplicaCount uint32
DesignatedLeaderNodeID string // used when Raft leader forwards to topic leader
}
type CreateTopicResponse struct {
Topic string
ReplicaNodeIds []string // actual replica set filled by topic leader
}
type DeleteTopicRequest struct {
Topic string
}
type DeleteTopicResponse struct {
Topic string
}
Leader discovery request types
type FindTopicLeaderRequest struct {
Topic string
}
type FindTopicLeaderResponse struct {
LeaderAddr string
}
type FindRaftLeaderRequest struct{}
type FindRaftLeaderResponse struct {
RaftLeaderAddr string
}
Consumer offset management requests
type CommitOffsetRequest struct {
Topic string
Id string
Offset uint64
}
type CommitOffsetResponse struct {
Success bool
}
type FetchOffsetRequest struct {
Topic string
Id string
}
type FetchOffsetResponse struct {
Offset uint64
}
List topics request
type ListTopicsRequest struct{}
type ReplicaInfo struct {
NodeID string
IsISR bool
LEO int64
}
type TopicInfo struct {
Name string
LeaderNodeID string
LeaderEpoch int64
Replicas []ReplicaInfo
}
type ListTopicsResponse struct {
Topics []TopicInfo
}
ISR update request
type ApplyIsrUpdateEventRequest struct {
Topic string
ReplicaNodeID string
Isr bool
}
type ApplyIsrUpdateEventResponse struct{}
Step 4: Implement error handling
Error codes and response types
Define a structured RPC error with error codes. The server sends RPCErrorResponse on the wire; the client converts it to an RPCError Go error:
// protocol/types.go
// RPC error codes
const (
CodeUnknown int32 = iota
CodeTopicRequired
CodeTopicNameRequired
CodeTopicNotFound
CodeNotTopicLeader
CodeValuesRequired
CodeReadOffset
CodeCommitOffset
CodeRecoverOffsets
CodeReplicaCountInvalid
CodeLeaderAddrRequired
CodeRaftLeaderUnavailable
CodeTopicExists
CodeNotEnoughNodes
CodeCannotReachLeader
CodeInvalidAckMode
CodeTimeoutCatchUp
)
// RPCErrorResponse is sent by the server when a handler returns an error.
type RPCErrorResponse struct {
Code int32 `json:"code"`
Message string `json:"message"`
}
// RPCError is returned by the transport client when the server sends an RPCErrorResponse.
type RPCError struct {
Code int32
Message string
}
func (e *RPCError) Error() string { return e.Message }
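The conversion from the wire type to the Go error could look like the sketch below. The toError and codeOf helpers are hypothetical; the transport client that performs this step is not shown in this section.

```go
package main

import (
	"errors"
	"fmt"
)

const CodeNotTopicLeader int32 = 4 // same value the iota block assigns

type RPCErrorResponse struct {
	Code    int32  `json:"code"`
	Message string `json:"message"`
}

type RPCError struct {
	Code    int32
	Message string
}

func (e *RPCError) Error() string { return e.Message }

// toError is a hypothetical client-side helper that turns the decoded
// wire message into a Go error.
func toError(resp RPCErrorResponse) error {
	return &RPCError{Code: resp.Code, Message: resp.Message}
}

// codeOf is a hypothetical helper that extracts the RPC code, even when
// the error has been wrapped with %w.
func codeOf(err error) (int32, bool) {
	var rpcErr *RPCError
	if errors.As(err, &rpcErr) {
		return rpcErr.Code, true
	}
	return 0, false
}

func main() {
	resp := RPCErrorResponse{Code: CodeNotTopicLeader, Message: "not the topic leader"}
	err := fmt.Errorf("produce: %w", toError(resp))
	code, ok := codeOf(err)
	fmt.Println(code, ok, err) // 4 true produce: not the topic leader
}
```

Returning a pointer type lets callers use errors.As to reach the code without parsing the message string.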
Reconnection logic
The ShouldReconnect function tells clients whether an error means they should rediscover the leader. It checks structured RPC codes, EOF, syscall errors, and network errors:
// ShouldReconnect needs the errors, io, net, and syscall packages imported in this file.
func ShouldReconnect(err error) bool {
if err == nil {
return false
}
// Server returned a structured RPC error code
var rpcErr *RPCError
if errors.As(err, &rpcErr) {
switch rpcErr.Code {
case CodeNotTopicLeader, CodeTopicNotFound, CodeRaftLeaderUnavailable:
return true
default:
return false
}
}
// EOF means the connection was closed by the peer
if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) {
return true
}
// Syscall-level connection errors
if errors.Is(err, syscall.ECONNRESET) || errors.Is(err, syscall.EPIPE) ||
errors.Is(err, syscall.ECONNREFUSED) {
return true
}
// net.Error covers timeouts and other transient network failures
var netErr net.Error
if errors.As(err, &netErr) {
return true
}
if errors.Is(err, net.ErrClosed) {
return true
}
return false
}
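To see how callers might rely on this, the sketch below uses a trimmed copy of the function that keeps only the RPC-code and EOF branches. The lowercase shouldReconnect name and the sample errors are illustrative.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

const (
	CodeNotTopicLeader int32 = 4  // values match the iota block
	CodeTopicExists    int32 = 12
)

type RPCError struct {
	Code    int32
	Message string
}

func (e *RPCError) Error() string { return e.Message }

// shouldReconnect is a trimmed copy of ShouldReconnect that covers only
// one RPC code and the EOF checks.
func shouldReconnect(err error) bool {
	if err == nil {
		return false
	}
	var rpcErr *RPCError
	if errors.As(err, &rpcErr) {
		return rpcErr.Code == CodeNotTopicLeader
	}
	return errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF)
}

func main() {
	stale := fmt.Errorf("produce: %w", &RPCError{Code: CodeNotTopicLeader, Message: "not leader"})
	dup := &RPCError{Code: CodeTopicExists, Message: "topic exists"}
	closed := fmt.Errorf("read frame: %w", io.EOF)
	fmt.Println(shouldReconnect(stale), shouldReconnect(dup), shouldReconnect(closed)) // true false true
}
```

A stale-leader error and a closed connection both call for rediscovery, while an application error such as a duplicate topic would fail the same way on any node.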
Step 5: Implement the codec
Codec overview
The codec is a struct with Encode and Decode methods. It uses a type switch to map Go types to message type IDs, then JSON-serializes the payload:
// protocol/codec.go
package protocol
import (
"encoding/json"
"io"
)
type Codec struct{}
Encoding messages to frames
The Encode method writes a complete frame (header + payload) to an io.Writer:
func (c *Codec) Encode(w io.Writer, msg any) error {
var mType MessageType
var payload []byte
var err error
switch v := msg.(type) {
case ProduceRequest, *ProduceRequest:
mType = MsgProduce
payload, err = json.Marshal(v)
case ProduceResponse, *ProduceResponse:
mType = MsgProduceResp
payload, err = json.Marshal(v)
case FetchRequest, *FetchRequest:
mType = MsgFetch
payload, err = json.Marshal(v)
case FetchResponse, *FetchResponse:
mType = MsgFetchResp
payload, err = json.Marshal(v)
// ... (one case per message type)
case RPCErrorResponse, *RPCErrorResponse:
mType = MsgRPCError
payload, err = json.Marshal(v)
default:
// No case matched, so mType is still 0 here: the error reports type 0, not the Go type of msg.
return ErrUnknownMessageType(mType)
}
if err != nil {
return err
}
return c.encodeFrame(w, mType, payload)
}
func (c *Codec) encodeFrame(w io.Writer, mType MessageType, payload []byte) error {
	// Compare as int before converting, so an oversized payload cannot wrap around uint32.
	if len(payload) > MaxFrameSize {
		return ErrFrameTooLarge
	}
	// Build header and payload in one buffer so the frame goes out in a single Write call.
	buf := make([]byte, frameHeaderSize+len(payload))
	byteOrder.PutUint16(buf, uint16(mType))
	byteOrder.PutUint32(buf[messageTypeSize:], uint32(len(payload)))
	copy(buf[frameHeaderSize:], payload)
	_, err := w.Write(buf)
	return err
}
Decoding frames to messages
The Decode method reads a frame from an io.Reader and returns the typed message:
func (c *Codec) Decode(r io.Reader) (MessageType, any, error) {
mType, payload, err := c.decodeFrame(r)
if err != nil {
return 0, nil, err
}
switch mType {
case MsgProduce:
var msg ProduceRequest
err = json.Unmarshal(payload, &msg)
return mType, msg, err
case MsgProduceResp:
var msg ProduceResponse
err = json.Unmarshal(payload, &msg)
return mType, msg, err
// ... (one case per message type)
case MsgRPCError:
var msg RPCErrorResponse
err = json.Unmarshal(payload, &msg)
return mType, msg, err
default:
return 0, nil, ErrUnknownMessageType(mType)
}
}
func (c *Codec) decodeFrame(r io.Reader) (MessageType, []byte, error) {
var header [frameHeaderSize]byte
if _, err := io.ReadFull(r, header[:]); err != nil {
return 0, nil, err
}
mType := MessageType(byteOrder.Uint16(header[:]))
length := byteOrder.Uint32(header[messageTypeSize:])
if length > MaxFrameSize {
return 0, nil, ErrFrameTooLarge
}
payload := make([]byte, length)
if _, err := io.ReadFull(r, payload); err != nil {
return 0, nil, err
}
return mType, payload, nil
}
The codec handles all message types in one place. Adding a new message takes a MessageType constant, a struct, and two cases: one in Encode and one in Decode. A full RPC needs this for both its request and its response.
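A round trip through a cut-down codec shows how the frame header preserves message boundaries on a byte stream. This is a sketch with only two message types; encode, decode, and roundTrip are stand-ins for illustration, and the size limit check is omitted for brevity.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"encoding/json"
	"fmt"
	"io"
)

type MessageType uint16

const (
	MsgProduce     MessageType = 2
	MsgProduceResp MessageType = 3
)

type ProduceRequest struct {
	Topic string
	Value []byte
}

type ProduceResponse struct{ Offset uint64 }

// encode is a cut-down stand-in for Codec.Encode.
func encode(w io.Writer, msg any) error {
	var mType MessageType
	switch msg.(type) {
	case ProduceRequest:
		mType = MsgProduce
	case ProduceResponse:
		mType = MsgProduceResp
	default:
		return fmt.Errorf("unsupported type %T", msg)
	}
	payload, err := json.Marshal(msg)
	if err != nil {
		return err
	}
	frame := make([]byte, 6+len(payload))
	binary.BigEndian.PutUint16(frame, uint16(mType))
	binary.BigEndian.PutUint32(frame[2:], uint32(len(payload)))
	copy(frame[6:], payload)
	_, err = w.Write(frame)
	return err
}

// decode is the matching stand-in for Codec.Decode.
func decode(r io.Reader) (any, error) {
	var header [6]byte
	if _, err := io.ReadFull(r, header[:]); err != nil {
		return nil, err
	}
	payload := make([]byte, binary.BigEndian.Uint32(header[2:]))
	if _, err := io.ReadFull(r, payload); err != nil {
		return nil, err
	}
	switch mType := MessageType(binary.BigEndian.Uint16(header[:])); mType {
	case MsgProduce:
		var m ProduceRequest
		err := json.Unmarshal(payload, &m)
		return m, err
	case MsgProduceResp:
		var m ProduceResponse
		err := json.Unmarshal(payload, &m)
		return m, err
	default:
		return nil, fmt.Errorf("unknown message type %d", mType)
	}
}

// roundTrip writes every message into one buffer (standing in for a TCP
// connection) and then decodes frames until the buffer is empty.
func roundTrip(msgs ...any) ([]any, error) {
	var conn bytes.Buffer
	for _, m := range msgs {
		if err := encode(&conn, m); err != nil {
			return nil, err
		}
	}
	var out []any
	for conn.Len() > 0 {
		m, err := decode(&conn)
		if err != nil {
			return nil, err
		}
		out = append(out, m)
	}
	return out, nil
}

func main() {
	out, err := roundTrip(ProduceRequest{Topic: "orders", Value: []byte("hi")}, ProduceResponse{Offset: 7})
	fmt.Println(len(out), err)
	for _, m := range out {
		fmt.Printf("%T\n", m)
	}
}
```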
Step 6: Define metadata events for Raft
Metadata event types
Raft entries carry metadata events — not record data. Define the event types and their payloads:
// protocol/metadata.go
package protocol
import "encoding/json"
type MetadataEventType uint16
const (
MetadataEventTypeCreateTopic MetadataEventType = iota
MetadataEventTypeDeleteTopic
MetadataEventTypeLeaderChange
MetadataEventTypeIsrUpdate
MetadataEventTypeAddNode
MetadataEventTypeRemoveNode
MetadataEventTypeUpdateNode
)
type MetadataEvent struct {
EventType MetadataEventType `json:"event_type"`
Data json.RawMessage `json:"data"`
}
type CreateTopicEvent struct {
Topic string `json:"topic"`
ReplicaCount uint32 `json:"replica_count"`
LeaderNodeID string `json:"leader_id"`
LeaderEpoch int64 `json:"leader_epoch"`
ReplicaNodeIds []string `json:"replicas"`
}
type DeleteTopicEvent struct {
Topic string `json:"topic"`
}
type LeaderChangeEvent struct {
Topic string `json:"topic"`
LeaderNodeID string `json:"leader_id"`
LeaderEpoch int64 `json:"leader_epoch"`
}
type IsrUpdateEvent struct {
Topic string `json:"topic"`
ReplicaNodeID string `json:"replica_id"`
Isr bool `json:"isr"`
}
type AddNodeEvent struct {
NodeID string `json:"node_id"`
Addr string `json:"addr"`
RpcAddr string `json:"rpc_addr"`
}
type RemoveNodeEvent struct {
NodeID string `json:"node_id"`
}
type UpdateNodeEvent struct {
NodeID string `json:"node_id"`
IsHealthy bool `json:"is_healthy"`
}
These events are JSON-serialized into Raft log entries. When committed, the FSM deserializes the event and updates the in-memory metadata state.
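The two-level decoding can be sketched as follows: the envelope is decoded first, and EventType selects the struct used for Data. The wrap and apply helpers and the topics map are hypothetical stand-ins for the Raft proposal path and the FSM state, which this section does not show.

```go
package main

import (
	"encoding/json"
	"fmt"
)

type MetadataEventType uint16

const (
	MetadataEventTypeCreateTopic MetadataEventType = iota
	MetadataEventTypeDeleteTopic
)

type MetadataEvent struct {
	EventType MetadataEventType `json:"event_type"`
	Data      json.RawMessage   `json:"data"`
}

type DeleteTopicEvent struct {
	Topic string `json:"topic"`
}

// wrap is a hypothetical helper that builds the bytes for a Raft log entry.
func wrap(t MetadataEventType, payload any) ([]byte, error) {
	data, err := json.Marshal(payload)
	if err != nil {
		return nil, err
	}
	return json.Marshal(MetadataEvent{EventType: t, Data: data})
}

// apply is a hypothetical stand-in for the FSM. topics is an assumed
// in-memory state; only the delete event is handled in this sketch.
func apply(entry []byte, topics map[string]bool) error {
	var ev MetadataEvent
	if err := json.Unmarshal(entry, &ev); err != nil {
		return err
	}
	switch ev.EventType {
	case MetadataEventTypeDeleteTopic:
		var e DeleteTopicEvent
		if err := json.Unmarshal(ev.Data, &e); err != nil {
			return err
		}
		delete(topics, e.Topic)
		return nil
	default:
		return fmt.Errorf("unhandled event type %d", ev.EventType)
	}
}

func main() {
	topics := map[string]bool{"orders": true}
	entry, _ := wrap(MetadataEventTypeDeleteTopic, DeleteTopicEvent{Topic: "orders"})
	fmt.Println(string(entry)) // {"event_type":1,"data":{"topic":"orders"}}
	err := apply(entry, topics)
	fmt.Println(err, len(topics)) // <nil> 0
}
```

Keeping Data as json.RawMessage defers parsing of the payload until the event type is known.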
Step 7: Implement replication batch format
Replication batch overview
For bulk replication, individual JSON-encoded fetch responses are too slow. The replication batch uses a compact binary format:
Batch header (25 bytes):
┌──────────────┬──────────────┬──────────────┬──────────┬────────────┬──────────────────┐
│ Base Offset │ Batch Length │ Leader Epoch │ CRC │ Attributes │ Last Offset Delta│
│ 8 bytes │ 4 bytes │ 4 bytes │ 4 bytes │ 1 byte │ 4 bytes │
└──────────────┴──────────────┴──────────────┴──────────┴────────────┴──────────────────┘
Followed by records:
┌──────────────┬──────────────┬──────────────────────┐
│ Offset │ Size │ Value │
│ 8 bytes │ 4 bytes │ Size bytes │
├──────────────┼──────────────┼──────────────────────┤
│ Offset │ Size │ Value │
│ ... │ ... │ ... │
└──────────────┴──────────────┴──────────────────────┘
// protocol/replication_batch.go
package protocol
import (
"encoding/binary"
"hash/crc32"
"io"
)
const (
replicationBatchHeaderSize = 8 + 4 + 4 + 4 + 1 + 4 // 25 bytes
replicationRecordHeaderSize = 8 + 4 // offset + size
)
type ReplicationRecord struct {
Offset int64
Value []byte
}
Encoding replication batches
func EncodeReplicationBatch(records []ReplicationRecord) ([]byte, error) {
if len(records) == 0 {
return nil, nil
}
baseOffset := records[0].Offset
lastOffset := records[len(records)-1].Offset
lastOffsetDelta := int32(lastOffset - baseOffset)
var recordBuf []byte
for _, r := range records {
recordBuf = append(recordBuf, encodeRecord(r)...)
}
batchLength := int32(len(recordBuf))
crc := crc32.ChecksumIEEE(recordBuf)
buf := make([]byte, replicationBatchHeaderSize+len(recordBuf))
off := 0
binary.BigEndian.PutUint64(buf[off:off+8], uint64(baseOffset))
off += 8
binary.BigEndian.PutUint32(buf[off:off+4], uint32(batchLength))
off += 4
binary.BigEndian.PutUint32(buf[off:off+4], 0) // leaderEpoch: always written as 0 here
off += 4
binary.BigEndian.PutUint32(buf[off:off+4], crc)
off += 4
buf[off] = 0 // attributes byte: 0 means no compression
off++
binary.BigEndian.PutUint32(buf[off:off+4], uint32(lastOffsetDelta))
off += 4
copy(buf[off:], recordBuf)
return buf, nil
}
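EncodeReplicationBatch calls encodeRecord, which this section does not define. A minimal sketch, assuming it writes the 12-byte record header from the diagram followed by the value:

```go
package main

import (
	"encoding/binary"
	"fmt"
)

const replicationRecordHeaderSize = 8 + 4 // offset + size

type ReplicationRecord struct {
	Offset int64
	Value  []byte
}

// encodeRecord is an assumed implementation: an 8-byte big-endian offset,
// a 4-byte big-endian value size, then the value bytes.
func encodeRecord(r ReplicationRecord) []byte {
	buf := make([]byte, replicationRecordHeaderSize+len(r.Value))
	binary.BigEndian.PutUint64(buf[0:8], uint64(r.Offset))
	binary.BigEndian.PutUint32(buf[8:12], uint32(len(r.Value)))
	copy(buf[replicationRecordHeaderSize:], r.Value)
	return buf
}

func main() {
	b := encodeRecord(ReplicationRecord{Offset: 5, Value: []byte("hi")})
	fmt.Printf("% x\n", b) // 00 00 00 00 00 00 00 05 00 00 00 02 68 69
}
```

This layout matches what DecodeReplicationBatch reads back: the offset from bytes 0 to 8, the size from bytes 8 to 12, and the value after that.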
Decoding replication batches
func DecodeReplicationBatch(data []byte) ([]ReplicationRecord, error) {
if len(data) < replicationBatchHeaderSize {
return nil, io.ErrUnexpectedEOF
}
batchLength := int32(binary.BigEndian.Uint32(data[8:12]))
crcStored := binary.BigEndian.Uint32(data[16:20])
recordBuf := data[replicationBatchHeaderSize:]
if int32(len(recordBuf)) != batchLength {
return nil, io.ErrUnexpectedEOF
}
if crc32.ChecksumIEEE(recordBuf) != crcStored {
return nil, ErrReplicationBatchCRC
}
var records []ReplicationRecord
p := recordBuf
for len(p) >= replicationRecordHeaderSize {
offset := int64(binary.BigEndian.Uint64(p[0:8]))
size := int32(binary.BigEndian.Uint32(p[8:12]))
if size < 0 || int(replicationRecordHeaderSize)+int(size) > len(p) {
// Declared size runs past the buffer: stop and return the records decoded so far.
break
}
value := make([]byte, size)
copy(value, p[12:12+size])
records = append(records, ReplicationRecord{Offset: offset, Value: value})
p = p[12+size:]
}
return records, nil
}
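Why a separate binary format? JSON adds per-record overhead that matters at thousands of records per replication cycle: field names are repeated for every entry, and encoding/json writes []byte values as base64, which grows each value by about a third. The binary batch format is compact, includes a CRC for integrity, and is fast to encode and decode. The header layout is loosely modeled on Kafka's record batch format.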
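A rough size comparison for a single record illustrates the difference. The binarySize and jsonSize helpers are hypothetical, and the figures ignore the 25-byte batch header, which is paid once per batch.

```go
package main

import (
	"encoding/json"
	"fmt"
)

type LogEntry struct {
	Offset uint64
	Value  []byte
}

// binarySize is the size of one record in the binary batch format:
// 8-byte offset + 4-byte size + value.
func binarySize(valueLen int) int { return 8 + 4 + valueLen }

// jsonSize is the size of the same record as a JSON-encoded LogEntry.
func jsonSize(valueLen int) int {
	b, err := json.Marshal(LogEntry{Offset: 42, Value: make([]byte, valueLen)})
	if err != nil {
		panic(err)
	}
	return len(b)
}

func main() {
	fmt.Println(binarySize(300), jsonSize(300)) // 312 424
}
```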
Step 8: Define protocol error types
Error definitions
// protocol/error.go
package protocol
import (
"errors"
"fmt"
)
var (
ErrFrameTooLarge = errors.New("protocol: frame exceeds max size")
ErrReplicationBatchCRC = errors.New("protocol: replication batch CRC mismatch")
)
func ErrUnknownMessageType(mType MessageType) error {
// Used by both Encode and Decode, so the message does not name a direction.
return fmt.Errorf("protocol: unknown message type: %d", mType)
}
Summary
| Component | Purpose |
|---|---|
| Frame format | [MsgType 2][PayloadSize 4][Payload] — length-prefixed, max 4 MB. |
| Message types | MessageType enum using iota for produce, fetch, topic CRUD, leader discovery, offset management, replication, ISR updates. |
| Request/Response structs | Go structs serialized as JSON. Each RPC has a request and response type. |
| RPCError | Server sends RPCErrorResponse; client converts to *RPCError. ShouldReconnect() checks codes, EOF, syscall errors, and net errors. |
| Codec | Struct with Encode(w, msg) and Decode(r) methods. Type-switch maps Go types to message IDs. Combined header+payload write. |
| Metadata events | Raft entry payloads: CreateTopic, DeleteTopic, LeaderChange, IsrUpdate, AddNode, RemoveNode, UpdateNode. |
| Replication batch | Binary format for bulk transfer: 25-byte batch header (base offset, epoch, CRC) + encoded records. |
With the protocol defined, the next page builds the transport and RPC server — TCP connections, request routing, and handler implementation.