TCP Transport and RPC Server — Network Layer for a Distributed Log

With the protocol defined, you'll now build the network layer: a TCP transport (server and client) and an RPC server that routes incoming messages to handler functions. The transport handles connection management; the RPC server maps message types to business logic. Code lives in transport/ and rpc/.

Step 1: Build the TCP server

Transport server overview

The transport server listens on a TCP port, accepts connections, and dispatches each request to a registered handler. It uses the Codec from the protocol package for frame encoding/decoding.

Transport struct

// transport/transport.go
package transport

import (
    "context"
    "errors"
    "io"
    "log/slog"
    "net"
    "sync"
    "time"

    "github.com/mohitkumar/mlog/protocol"
)

type Transport struct {
    Codec    *protocol.Codec
    handlers map[protocol.MessageType]func(context.Context, any) (any, error)
    ln       net.Listener
    mu       sync.Mutex           // protects conns
    conns    map[net.Conn]struct{} // active connections for graceful shutdown
    wg       sync.WaitGroup       // tracks active connection goroutines
}

func NewTransport() *Transport {
    return &Transport{
        Codec:    &protocol.Codec{},
        handlers: make(map[protocol.MessageType]func(context.Context, any) (any, error)),
        conns:    make(map[net.Conn]struct{}),
    }
}

Registering message handlers

Each message type maps to a handler function that takes a context and a decoded message, and returns a response:

func (t *Transport) RegisterHandler(msgType protocol.MessageType,
    handler func(context.Context, any) (any, error)) {
    t.handlers[msgType] = handler
}

Starting the server

func (t *Transport) Listen(addr string) (net.Listener, error) {
    ln, err := net.Listen("tcp", addr)
    if err != nil {
        return nil, err
    }
    t.ln = ln
    return ln, nil
}

func (t *Transport) Serve(ln net.Listener) {
    for {
        conn, err := ln.Accept()
        if err != nil {
            return
        }
        t.wg.Add(1)
        go func() {
            defer t.wg.Done()
            t.handleConn(conn)
        }()
    }
}

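Each accepted connection runs in its own goroutine, which processes requests in a loop. Within a connection, requests are handled strictly one at a time and answered in the order they arrive. This is what lets the client treat the next frame it reads as the response to the request it just sent. Serve returns when Accept fails, which is also how it stops after Close closes the listener.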

Handling incoming connections

For each request: decode the frame via the codec, look up the handler, call it with a timeout context, and encode the response:

const (
    idleTimeout    = 5 * time.Minute
    handlerTimeout = 30 * time.Second
)

func (t *Transport) handleConn(conn net.Conn) {
    t.trackConn(conn)
    defer func() {
        t.untrackConn(conn)
        conn.Close()
    }()
    for {
        // Set a read deadline so idle connections don't hang forever
        _ = conn.SetReadDeadline(time.Now().Add(idleTimeout))

        mType, msg, err := t.Codec.Decode(conn)
        if err != nil {
            if !errors.Is(err, io.EOF) && !errors.Is(err, net.ErrClosed) {
                slog.Warn("read error", "remote", conn.RemoteAddr(), "err", err)
            }
            return
        }
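        handler := t.handlers[mType]
        if handler == nil {
            // Answer unknown types with an error frame; otherwise a client
            // blocked in Call would wait for a response that never arrives
            errResp := &protocol.RPCErrorResponse{Code: protocol.CodeUnknown, Message: "unsupported message type"}
            if err := t.Codec.Encode(conn, errResp); err != nil {
                return
            }
            continue
        }

        // Give each handler a deadline; handlers must watch ctx to honor it
        ctx, cancel := context.WithTimeout(context.Background(), handlerTimeout)
        resp, err := handler(ctx, msg)
        cancel()

        if err != nil {
            // Keep the code of an *RPCError; map any other error to CodeUnknown
            code := protocol.CodeUnknown
            message := err.Error()
            var rpcErr *protocol.RPCError
            if errors.As(err, &rpcErr) {
                code = rpcErr.Code
                message = rpcErr.Message
            }
            if err := t.Codec.Encode(conn, &protocol.RPCErrorResponse{Code: code, Message: message}); err != nil {
                return // the connection is broken; stop serving it
            }
            continue
        }
        if err := t.Codec.Encode(conn, resp); err != nil {
            return
        }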
    }
}

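Key design decisions:

  • Idle timeout: the read deadline is reset before every Decode, so a connection that sends nothing for 5 minutes fails its read and is closed to reclaim resources.
  • Handler timeout: each handler receives a context with a 30-second deadline. The timeout is cooperative: the transport still waits for the handler to return, so a handler must check ctx to stop early.
  • Error handling: handler errors are converted to RPCErrorResponse frames so the client always gets a well-formed response. An *RPCError keeps its code and message; any other error is sent as CodeUnknown with err.Error() as the message.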

Graceful shutdown and connection tracking

The transport tracks all active connections so it can close them on shutdown:

func (t *Transport) trackConn(c net.Conn) {
    t.mu.Lock()
    t.conns[c] = struct{}{}
    t.mu.Unlock()
}

func (t *Transport) untrackConn(c net.Conn) {
    t.mu.Lock()
    delete(t.conns, c)
    t.mu.Unlock()
}

func (t *Transport) Close() error {
    var err error
    if t.ln != nil {
        err = t.ln.Close()
        t.ln = nil
    }
    // Close all tracked connections so handleConn goroutines unblock
    t.mu.Lock()
    for c := range t.conns {
        _ = c.Close()
    }
    t.mu.Unlock()
    t.wg.Wait()
    return err
}

Step 2: Build the TCP client

Transport client overview

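The client holds one persistent TCP connection to a server and supports request-response calls. A mutex guards the connection, and Call holds it for the full round trip, so concurrent Calls on one client are serialized.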

TransportClient struct

type TransportClient struct {
    mu    sync.Mutex
    conn  net.Conn
    codec *protocol.Codec
}

func Dial(addr string) (*TransportClient, error) {
    conn, err := net.Dial("tcp", addr)
    if err != nil {
        return nil, err
    }
    if tcp, ok := conn.(*net.TCPConn); ok {
        _ = tcp.SetKeepAlive(true)
        _ = tcp.SetKeepAlivePeriod(30 * time.Second)
    }
    return &TransportClient{conn: conn, codec: &protocol.Codec{}}, nil
}

Sending requests and reading responses

The Call method sends a request and reads the response in one operation. It automatically converts RPCErrorResponse frames into Go *RPCError values:

func (c *TransportClient) Call(msg any) (any, error) {
    c.mu.Lock()
    defer c.mu.Unlock()
    if err := c.codec.Encode(c.conn, msg); err != nil {
        return nil, err
    }
    return c.readResponse()
}

func (c *TransportClient) readResponse() (any, error) {
    mType, value, err := c.codec.Decode(c.conn)
    if err != nil {
        return nil, err
    }
    if mType == protocol.MsgRPCError {
        if r, ok := value.(protocol.RPCErrorResponse); ok {
            return nil, &protocol.RPCError{Code: r.Code, Message: r.Message}
        }
        return nil, &protocol.RPCError{Code: protocol.CodeUnknown, Message: "rpc error"}
    }
    return value, nil
}

Streaming operations with separate Write and Read

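For streaming, the client also exposes separate Write and Read methods. Each takes the mutex on its own, so unlike Call, a Write followed by a Read is not atomic: the caller must make sure no other goroutine uses the client in between. Read also returns error frames as ordinary decoded values rather than converting them to *RPCError: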

func (c *TransportClient) Write(msg any) error {
    c.mu.Lock()
    defer c.mu.Unlock()
    return c.codec.Encode(c.conn, msg)
}

func (c *TransportClient) Read() (any, error) {
    c.mu.Lock()
    defer c.mu.Unlock()
    _, resp, err := c.codec.Decode(c.conn)
    return resp, err
}

func (c *TransportClient) Close() error {
    return c.conn.Close()
}

Step 3: Build the RPC server

RPC server overview

The RPC server sits on top of the transport. It registers handlers for each message type and delegates to the TopicManager and ConsumerManager for business logic.

RpcServer struct

// rpc/server.go
package rpc

import (
    "context"

    consumermgr "github.com/mohitkumar/mlog/consumer"
    "github.com/mohitkumar/mlog/protocol"
    "github.com/mohitkumar/mlog/topic"
    "github.com/mohitkumar/mlog/transport"
)

type RpcServer struct {
    Addr            string
    topicManager    *topic.TopicManager
    consumerManager *consumermgr.ConsumerManager
    transport       *transport.Transport
}

func NewRpcServer(addr string, topicManager *topic.TopicManager,
    consumerManager *consumermgr.ConsumerManager) *RpcServer {
    srv := &RpcServer{
        Addr:            addr,
        topicManager:    topicManager,
        consumerManager: consumerManager,
        transport:       transport.NewTransport(),
    }
    srv.RegisterHandlers()
    return srv
}

Registering all RPC handlers

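Handlers are registered as closures that type-assert the decoded message and call the appropriate method. The unchecked assertion (for example req.(protocol.ProduceRequest)) assumes the codec decodes each message type into the matching value type. If it ever returned something else, the assertion would panic, and because handleConn does not recover panics, the whole server process would crash: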

func (s *RpcServer) RegisterHandlers() {
    // Producer
    s.transport.RegisterHandler(protocol.MsgProduce, func(ctx context.Context, req any) (any, error) {
        r := req.(protocol.ProduceRequest)
        return s.Produce(ctx, &r)
    })
    s.transport.RegisterHandler(protocol.MsgProduceBatch, func(ctx context.Context, req any) (any, error) {
        r := req.(protocol.ProduceBatchRequest)
        return s.ProduceBatch(ctx, &r)
    })
    // Consumer
    s.transport.RegisterHandler(protocol.MsgFetch, func(ctx context.Context, req any) (any, error) {
        r := req.(protocol.FetchRequest)
        return s.Fetch(ctx, &r)
    })
    s.transport.RegisterHandler(protocol.MsgFetchBatch, func(ctx context.Context, req any) (any, error) {
        r := req.(protocol.FetchBatchRequest)
        return s.FetchBatch(ctx, &r)
    })
    s.transport.RegisterHandler(protocol.MsgCommitOffset, func(ctx context.Context, req any) (any, error) {
        r := req.(protocol.CommitOffsetRequest)
        return s.CommitOffset(ctx, &r)
    })
    s.transport.RegisterHandler(protocol.MsgFetchOffset, func(ctx context.Context, req any) (any, error) {
        r := req.(protocol.FetchOffsetRequest)
        return s.FetchOffset(ctx, &r)
    })
    // Topic management
    s.transport.RegisterHandler(protocol.MsgCreateTopic, func(ctx context.Context, req any) (any, error) {
        r := req.(protocol.CreateTopicRequest)
        return s.CreateTopic(ctx, &r)
    })
    s.transport.RegisterHandler(protocol.MsgDeleteTopic, func(ctx context.Context, req any) (any, error) {
        r := req.(protocol.DeleteTopicRequest)
        return s.DeleteTopic(ctx, &r)
    })
    // Discovery
    s.transport.RegisterHandler(protocol.MsgFindTopicLeader, func(ctx context.Context, req any) (any, error) {
        r := req.(protocol.FindTopicLeaderRequest)
        return s.FindTopicLeader(ctx, &r)
    })
    s.transport.RegisterHandler(protocol.MsgFindRaftLeader, func(ctx context.Context, req any) (any, error) {
        r := req.(protocol.FindRaftLeaderRequest)
        return s.FindRaftLeader(ctx, &r)
    })
    s.transport.RegisterHandler(protocol.MsgListTopics, func(ctx context.Context, req any) (any, error) {
        r := req.(protocol.ListTopicsRequest)
        return s.ListTopics(ctx, &r)
    })
}

func (s *RpcServer) Start() error {
    ln, err := s.transport.Listen(s.Addr)
    if err != nil {
        return err
    }
    // Record the bound address (resolves ":0" to the actual port)
    s.Addr = ln.Addr().String()
    go s.transport.Serve(ln)
    return nil
}

func (s *RpcServer) Stop() error {
    return s.transport.Close()
}

Produce RPC handler

The produce handler validates the request, checks that this node is the topic leader, then delegates to the TopicManager along with the requested ack mode:

// rpc/producer.go
package rpc

import (
    "context"
    "fmt"

    "github.com/mohitkumar/mlog/protocol"
)

func (srv *RpcServer) Produce(ctx context.Context, req *protocol.ProduceRequest) (*protocol.ProduceResponse, error) {
    if req.Topic == "" {
        return nil, Err(protocol.CodeTopicRequired, "topic is required")
    }
    if len(req.Value) == 0 {
        return nil, Err(protocol.CodeValuesRequired, "value is required")
    }
    topicObj, err := srv.topicManager.GetTopic(req.Topic)
    if err != nil {
        return nil, &protocol.RPCError{Code: protocol.CodeTopicNotFound,
            Message: fmt.Sprintf("topic %s not found: %v", req.Topic, err)}
    }
    isLeader, _ := srv.topicManager.IsLeader(req.Topic)
    if !isLeader {
        return nil, Err(protocol.CodeNotTopicLeader,
            "this node is not the topic leader; produce to the topic leader")
    }

    offset, err := srv.topicManager.HandleProduce(ctx, topicObj, &protocol.LogEntry{
        Value: req.Value,
    }, req.Acks)
    if err != nil {
        return nil, FromError(err)
    }
    return &protocol.ProduceResponse{Offset: offset}, nil
}

Fetch RPC handler

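The fetch handler reads a single record. A consumer (ReplicaNodeID empty) can only read committed records, up to the high watermark (HW). When ReplicaNodeID is set, the request comes from a follower replicating the topic: it may read uncommitted records up to the log end offset (LEO), and the handler records the follower's LEO so the leader can compute the in-sync replica (ISR) set: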

// rpc/consumer.go
package rpc

import (
    "context"
    "fmt"

    "github.com/mohitkumar/mlog/protocol"
)

func (s *RpcServer) Fetch(ctx context.Context, req *protocol.FetchRequest) (*protocol.FetchResponse, error) {
    if req.Topic == "" {
        return nil, Err(protocol.CodeTopicRequired, "topic is required")
    }

    off := req.Offset
    if off == 0 && req.ReplicaNodeID == "" {
        // Try to recover consumer offset
        if err := s.consumerManager.Recover(); err == nil {
            if cached, err := s.consumerManager.GetOffset(req.Id, req.Topic); err == nil {
                off = cached
            }
        }
    }

    leaderLog, err := s.topicManager.GetLeader(req.Topic)
    if err != nil {
        return nil, &protocol.RPCError{Code: protocol.CodeTopicNotFound,
            Message: fmt.Sprintf("topic %s not found: %v", req.Topic, err)}
    }

    var raw []byte
    if req.ReplicaNodeID != "" {
        // Replication fetch: read up to LEO (uncommitted)
        raw, err = leaderLog.ReadUncommitted(off)
    } else {
        // Consumer fetch: read up to HW (committed only)
        raw, err = leaderLog.Read(off)
    }
    if err != nil {
        if req.ReplicaNodeID != "" {
            // Nothing to read at off (e.g. the follower is already at our LEO):
            // still record off as its LEO so the leader sees its progress
            _ = s.topicManager.RecordReplicaLEOFromFetch(ctx, req.Topic, req.ReplicaNodeID, int64(off))
        }
        return nil, &protocol.RPCError{Code: protocol.CodeReadOffset,
            Message: fmt.Sprintf("failed to read offset %d: %v", off, err)}
    }

    // Segment returns [offset 8 bytes][value]; strip header for response
    const offWidth = 8
    if len(raw) >= offWidth {
        raw = raw[offWidth:]
    }

    if req.ReplicaNodeID != "" {
        _ = s.topicManager.RecordReplicaLEOFromFetch(ctx, req.Topic, req.ReplicaNodeID, int64(off+1))
    }
    return &protocol.FetchResponse{
        Entry: &protocol.LogEntry{Offset: off, Value: raw},
    }, nil
}

This dual-path read is how the same RPC endpoint serves both consumers and replication followers.
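The HW/LEO split can be modeled with a toy in-memory log. In this sketch, memLog and highWatermark are hypothetical names and len(entries) plays the role of the LEO. highWatermark shows one common rule, used for example by Kafka, where the HW is the smallest LEO across the leader and its in-sync replicas; this walkthrough does not show how TopicManager actually computes it, so treat that part as an assumption. The model also explains the off+1 in the handler: after a follower fetches offset off, it holds every record through off, so its LEO becomes off+1.

```go
package main

import "errors"

// errOutOfRange is a hypothetical stand-in for the log's out-of-range error.
var errOutOfRange = errors.New("offset out of range")

// memLog: entries[0:hw] are committed; len(entries) is the LEO.
type memLog struct {
	entries [][]byte
	hw      uint64
}

// Read serves consumers: only committed entries (offset < HW) are visible.
func (l *memLog) Read(off uint64) ([]byte, error) {
	if off >= l.hw {
		return nil, errOutOfRange
	}
	return l.entries[off], nil
}

// ReadUncommitted serves followers: any entry below the LEO is visible.
func (l *memLog) ReadUncommitted(off uint64) ([]byte, error) {
	if off >= uint64(len(l.entries)) {
		return nil, errOutOfRange
	}
	return l.entries[off], nil
}

// highWatermark (assumed rule): the minimum LEO of the leader and its ISR.
func highWatermark(leaderLEO uint64, replicaLEOs map[string]uint64) uint64 {
	hw := leaderLEO
	for _, leo := range replicaLEOs {
		if leo < hw {
			hw = leo
		}
	}
	return hw
}
```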

Topic management RPC handlers

These handlers forward to the topic manager, which coordinates with Raft:

// rpc/leader.go
package rpc

import (
    "context"
    "fmt"

    "github.com/mohitkumar/mlog/protocol"
)

func (s *RpcServer) CreateTopic(ctx context.Context, req *protocol.CreateTopicRequest) (*protocol.CreateTopicResponse, error) {
    if req.Topic == "" {
        return nil, Err(protocol.CodeTopicNameRequired, "topic name is required")
    }
    resp, err := s.topicManager.CreateTopic(ctx, req)
    if err != nil {
        return nil, FromError(err)
    }
    return resp, nil
}

func (srv *RpcServer) FindTopicLeader(ctx context.Context, req *protocol.FindTopicLeaderRequest) (*protocol.FindTopicLeaderResponse, error) {
    if req.Topic == "" {
        return nil, Err(protocol.CodeTopicRequired, "topic is required")
    }
    leaderAddr, err := srv.topicManager.GetTopicLeaderRPCAddr(req.Topic)
    if err != nil {
        return nil, &protocol.RPCError{Code: protocol.CodeTopicNotFound,
            Message: fmt.Sprintf("topic %s not found: %v", req.Topic, err)}
    }
    return &protocol.FindTopicLeaderResponse{LeaderAddr: leaderAddr}, nil
}

func (srv *RpcServer) FindRaftLeader(ctx context.Context, req *protocol.FindRaftLeaderRequest) (*protocol.FindRaftLeaderResponse, error) {
    addr, err := srv.topicManager.GetRaftLeaderRPCAddr()
    if err != nil {
        return nil, &protocol.RPCError{Code: protocol.CodeRaftLeaderUnavailable, Message: err.Error()}
    }
    return &protocol.FindRaftLeaderResponse{RaftLeaderAddr: addr}, nil
}

func (srv *RpcServer) ListTopics(ctx context.Context, req *protocol.ListTopicsRequest) (*protocol.ListTopicsResponse, error) {
    return srv.topicManager.ListTopics(), nil
}

Error code mapping

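Handlers convert domain errors into RPC errors. Err builds an *RPCError directly, CodeFor maps a (possibly wrapped) domain error to a code with errors.Is, and FromError combines the two for errors returned by the topic manager: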

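// rpc/error.go
package rpc

import (
    "errors"

    "github.com/mohitkumar/mlog/protocol"
    // plus the project's errs package of sentinel domain errors (import path not shown here)
)
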
func Err(code int32, message string) error {
    return &protocol.RPCError{Code: code, Message: message}
}

func CodeFor(err error) int32 {
    if err == nil {
        return 0
    }
    switch {
    case errors.Is(err, errs.ErrTopicNotFound):
        return protocol.CodeTopicNotFound
    case errors.Is(err, errs.ErrTopicExists):
        return protocol.CodeTopicExists
    case errors.Is(err, errs.ErrNotEnoughNodes):
        return protocol.CodeNotEnoughNodes
    case errors.Is(err, errs.ErrCannotReachLeader):
        return protocol.CodeCannotReachLeader
    case errors.Is(err, errs.ErrThisNodeNotLeader):
        return protocol.CodeNotTopicLeader
    case errors.Is(err, errs.ErrInvalidAckMode):
        return protocol.CodeInvalidAckMode
    case errors.Is(err, errs.ErrTimeoutCatchUp):
        return protocol.CodeTimeoutCatchUp
    case errors.Is(err, errs.ErrValuesEmpty):
        return protocol.CodeValuesRequired
    case errors.Is(err, errs.ErrLogOffsetOutOfRange),
         errors.Is(err, errs.ErrSegmentOffsetNotFound):
        return protocol.CodeReadOffset
    case errors.Is(err, errs.ErrRaftNoLeader),
         errors.Is(err, errs.ErrRaftNodeNotFound):
        return protocol.CodeRaftLeaderUnavailable
    default:
        return protocol.CodeUnknown
    }
}

func FromError(err error) error {
    if err == nil {
        return nil
    }
    return &protocol.RPCError{Code: CodeFor(err), Message: err.Error()}
}

Step 4: Understand the request/response flow

Complete produce flow

Here is the complete flow for a produce request:

Client                          Server
  │                                │
  │  [MsgType][Size][JSON]  →      │  1. Codec.Decode() reads frame
  │                                │  2. Look up handler for message type
  │                                │  3. Type-assert → ProduceRequest
  │                                │  4. Call TopicManager.HandleProduce()
  │                                │  5. Encode ProduceResponse → JSON
  │  ← [MsgType][Size][JSON]       │  6. Codec.Encode() writes response frame
  │                                │

Error response flow

If the handler returns an error:

  │                                │  4. Handler returns *RPCError
  │  ← [MsgRPCError][Size][JSON]   │  5. Encode RPCErrorResponse frame
  │                                │
  │  Client checks ShouldReconnect │
  │  If true: rediscover leader    │

Summary

  • Transport (server): TCP listener that accepts connections, decodes frames via the Codec, dispatches to handlers with a context timeout, and encodes the response. Tracks connections for graceful shutdown.
  • Transport (client): TCP dialer with keepalive. Call() sends a request and reads the response, auto-converting error frames to *RPCError. Separate Write/Read methods support streaming.
  • RPC server: maps message types to handler closures. Handlers type-assert requests, call business logic, and return typed responses.
  • Produce handler: validates the request, checks leader status, and calls TopicManager.HandleProduce() with the ack mode.
  • Fetch handler: dual-path; consumers read up to the HW, replicas read up to the LEO. Strips the segment offset header and records the replica's LEO for ISR computation.
  • Leader discovery: returns the RPC address of the topic leader or the Raft leader. Any node can answer.
  • Error handling: CodeFor() maps domain errors to RPC codes and FromError() wraps errors for the wire. Clients use ShouldReconnect() to decide when to rediscover the leader.

With transport and RPC in place, the next page covers Serf discovery — how nodes find each other and form a cluster.