Running a Multi-Node Distributed Log Cluster End-to-End

In this section, you'll bring it all together and run the distributed log end to end. You'll build the binaries, start a 3-node cluster (with Docker or locally), create topics, produce and consume messages, and verify replication and fault tolerance.

Step 0: Build the binaries

Compiling all binaries

The Makefile compiles five binaries:

make build

This produces:

Binary        Source        Purpose
bin/mlog      . (root)      Root CLI (cobra-based)
bin/server    cmd/server    The broker server (log + Raft + Serf + RPC)
bin/producer  cmd/producer  CLI producer (sends messages from stdin)
bin/consumer  cmd/consumer  CLI consumer (prints messages to stdout)
bin/topic     cmd/topic     Topic management (create, delete, list)

Individual targets are also available:

make build-server    # only the server
make build-producer  # only the producer
make build-consumer  # only the consumer
make build-topic     # only the topic CLI

Step 1: Configure server startup flags

Command-line options for the server

The server binary accepts these flags:

--bind-addr              Serf listen address (default 127.0.0.1:9092)
--advertise-addr         Address other nodes see (e.g., node1 in Docker)
--rpc-port               RPC listen port (default 9094)
--data-dir               Where to store log data (default /tmp/mlog)
--node-id                Unique node identifier (default node-1)
--raft-addr              Raft transport address (default 127.0.0.1:9093)
--bootstrap              Bootstrap a new Raft cluster (only set on ONE node)
--peer                   Peer nodes for Serf join (repeatable: --peer node-2=host:port)
--replication-batch-size Max records per replication fetch (default 5000)

Each node needs three ports: Serf (gossip), Raft (consensus), and RPC (client traffic and replication). All flags can also be set via environment variables with the MLOG_ prefix (e.g., MLOG_NODE_ID=node-1).
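To make the flag conventions concrete, here is a minimal Go sketch of two helpers. Both envName and parsePeer are hypothetical names, not functions from the project. The hyphen-to-underscore rule is an assumption generalized from the single MLOG_NODE_ID example, and the peer format follows the node-2=host:port shape shown in the flag list.

```go
package main

import (
	"fmt"
	"strings"
)

// envName maps a flag name to an environment variable name,
// e.g. "node-id" -> "MLOG_NODE_ID". The upper-casing and the
// hyphen-to-underscore rule are assumptions based on that one example.
func envName(flag string) string {
	return "MLOG_" + strings.ToUpper(strings.ReplaceAll(flag, "-", "_"))
}

// parsePeer splits one --peer value of the form "node-2=host:port"
// into its node ID and address.
func parsePeer(s string) (string, string, error) {
	id, addr, ok := strings.Cut(s, "=")
	if !ok || id == "" || addr == "" {
		return "", "", fmt.Errorf("invalid peer %q, want id=host:port", s)
	}
	return id, addr, nil
}

func main() {
	fmt.Println(envName("node-id"))
	id, addr, err := parsePeer("node-2=node2:9095")
	fmt.Println(id, addr, err)
}
```

The real server may resolve flags and environment variables through a configuration library instead; the sketch only illustrates the naming pattern.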

Initialization order

The CommandHelper in cmd/server/helper.go wires all components together in the correct order:

// cmd/server/helper.go (setup order)

func NewCommandHelper(config config.Config) (*CommandHelper, error) {
    // Construction of cmdHelper and error handling are elided in this excerpt.
    cmdHelper.setupCoordinator()  // 1. Create TopicManager + Raft Coordinator
    cmdHelper.setupTopicManager() // 2. Wait for Raft, restore from metadata, start replication
    cmdHelper.setupRpcServer()    // 3. Start RPC server (with ConsumerManager)
    cmdHelper.setupMembership()   // 4. Join Serf cluster
    return cmdHelper, nil
}

  1. Coordinator setup creates the TopicManager first (it implements MetadataStore), then the Coordinator (Raft). On the bootstrap node, it waits up to 30s for Raft leader election, and the node then adds itself to the metadata store.
  2. TopicManager setup waits for Raft to be ready (up to 15s), then restores topic state from metadata and starts the replication thread.
  3. RPC server setup creates the ConsumerManager (with offset recovery) and starts listening.
  4. Membership setup joins the Serf cluster. As other nodes join or leave, Serf events trigger Raft Join/Leave operations.

Shutdown reverses the order: leave Serf, stop RPC, shut down Raft.
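The start-in-order, stop-in-reverse pattern can be sketched as follows. This is an illustration only: the component type and startAll are hypothetical and are not taken from cmd/server/helper.go.

```go
package main

import "fmt"

// component is a hypothetical unit with a start and a stop hook.
type component struct {
	name  string
	start func() error
	stop  func()
}

// startAll starts components in slice order. It returns a function that
// stops the started components in reverse order. If one fails to start,
// the ones already running are stopped before the error is returned.
func startAll(cs []component) (func(), error) {
	started := 0
	stopAll := func() {
		for i := started - 1; i >= 0; i-- {
			cs[i].stop()
		}
	}
	for _, c := range cs {
		if err := c.start(); err != nil {
			stopAll()
			return nil, fmt.Errorf("start %s: %w", c.name, err)
		}
		started++
	}
	return stopAll, nil
}

// demoOrder records the order in which hooks run for four components
// named after the setup steps above.
func demoOrder() []string {
	var events []string
	var cs []component
	for _, n := range []string{"coordinator", "topic-manager", "rpc-server", "membership"} {
		n := n // capture a per-iteration copy for the closures
		cs = append(cs, component{
			name:  n,
			start: func() error { events = append(events, "start "+n); return nil },
			stop:  func() { events = append(events, "stop "+n) },
		})
	}
	stopAll, err := startAll(cs)
	if err != nil {
		return events
	}
	stopAll()
	return events
}

func main() {
	for _, e := range demoOrder() {
		fmt.Println(e)
	}
}
```

Stopping in reverse means membership is left first, so peers stop routing to the node before its RPC server and Raft go away.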

Step 2: Deploy with Docker Compose (recommended)

Setting up the containerized cluster

The Docker Compose file starts a 3-node cluster with networking handled automatically.

Dockerfile

The multi-stage Dockerfile builds the server in an Alpine container:

# infra/Dockerfile

# Build stage
FROM golang:1.25-alpine AS builder
WORKDIR /app

COPY go.mod go.sum ./
RUN go mod download

COPY . .
RUN CGO_ENABLED=0 go build -o /server ./cmd/server

# Run stage
FROM alpine:3.19
RUN apk --no-cache add ca-certificates netcat-openbsd
WORKDIR /app
COPY --from=builder /server .

ENTRYPOINT ["/app/server"]
CMD ["--bind-addr", "0.0.0.0:9092", "--raft-addr", "0.0.0.0:9093", \
     "--rpc-port", "9094", "--data-dir", "/data", \
     "--node-id", "node-1", "--bootstrap", "false"]

netcat-openbsd is included for the healthcheck (nc -z localhost 9094).

docker-compose.yml

# infra/docker-compose.yml

services:
  node1:
    build:
      context: ..
      dockerfile: infra/Dockerfile
    container_name: mlog-node1
    hostname: node1
    ports:
      - "9092:9092"   # Serf
      - "9093:9093"   # Raft
      - "9094:9094"   # RPC
    volumes:
      - node1-data:/data
    command: [
      "server",
      "--bind-addr", "0.0.0.0:9092",
      "--raft-addr", "0.0.0.0:9093",
      "--rpc-port", "9094",
      "--advertise-addr", "node1",
      "--data-dir", "/data",
      "--node-id", "node-1",
      "--bootstrap", "true",
      "--peer", "node-2=node2:9095",
      "--peer", "node-3=node3:9098"
    ]
    networks:
      - mlog-net
    healthcheck:
      test: ["CMD", "nc", "-z", "localhost", "9094"]
      interval: 3s
      timeout: 3s
      retries: 10
      start_period: 20s

  node2:
    build:
      context: ..
      dockerfile: infra/Dockerfile
    container_name: mlog-node2
    hostname: node2
    ports:
      - "9095:9095"   # Serf
      - "9096:9096"   # Raft
      - "9097:9097"   # RPC
    volumes:
      - node2-data:/data
    command: [
      "server",
      "--bind-addr", "0.0.0.0:9095",
      "--raft-addr", "0.0.0.0:9096",
      "--rpc-port", "9097",
      "--advertise-addr", "node2",
      "--data-dir", "/data",
      "--node-id", "node-2",
      "--peer", "node-1=node1:9092",
      "--peer", "node-3=node3:9098"
    ]
    depends_on:
      node1:
        condition: service_healthy
    networks:
      - mlog-net

  node3:
    build:
      context: ..
      dockerfile: infra/Dockerfile
    container_name: mlog-node3
    hostname: node3
    ports:
      - "9098:9098"   # Serf
      - "9099:9099"   # Raft
      - "9100:9100"   # RPC
    volumes:
      - node3-data:/data
    command: [
      "server",
      "--bind-addr", "0.0.0.0:9098",
      "--raft-addr", "0.0.0.0:9099",
      "--rpc-port", "9100",
      "--advertise-addr", "node3",
      "--data-dir", "/data",
      "--node-id", "node-3",
      "--peer", "node-1=node1:9092",
      "--peer", "node-2=node2:9095"
    ]
    depends_on:
      node1:
        condition: service_healthy
    networks:
      - mlog-net

volumes:
  node1-data:
  node2-data:
  node3-data:

networks:
  mlog-net:
    driver: bridge

Key points:

  • node1 bootstraps the Raft cluster (--bootstrap true). Only one node should bootstrap.
  • node2 and node3 start after node1 is healthy (depends_on with service_healthy) and join via --peer.
  • Containers bind to 0.0.0.0 but advertise their hostname (e.g., node1) so other containers can resolve them on the Docker network.
  • Each node has three ports exposed to the host for local testing.
  • node1 has a healthcheck that probes the RPC port with nc, with a 20s start period to allow Raft leader election.
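If you want the same readiness check from a Go test or script, a TCP dial does roughly what nc -z does. The sketch below is an approximation: portOpen and waitHealthy are hypothetical helpers, and the retry loop only loosely mirrors Compose's interval and retries settings (it has no start period).

```go
package main

import (
	"fmt"
	"net"
	"time"
)

// portOpen reports whether a TCP connection to addr succeeds,
// which is approximately what `nc -z host port` checks.
func portOpen(addr string, timeout time.Duration) bool {
	conn, err := net.DialTimeout("tcp", addr, timeout)
	if err != nil {
		return false
	}
	conn.Close()
	return true
}

// waitHealthy probes addr up to retries times, sleeping interval
// between failed attempts.
func waitHealthy(addr string, retries int, interval, timeout time.Duration) bool {
	for i := 0; i < retries; i++ {
		if portOpen(addr, timeout) {
			return true
		}
		time.Sleep(interval)
	}
	return false
}

// demoProbe opens a listener on an ephemeral local port, probes it,
// closes it, and probes again.
func demoProbe() (open, closed bool) {
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return false, false
	}
	addr := ln.Addr().String()
	open = waitHealthy(addr, 3, 10*time.Millisecond, time.Second)
	ln.Close()
	closed = !portOpen(addr, time.Second)
	return open, closed
}

func main() {
	open, closed := demoProbe()
	fmt.Println("open while listening:", open, "refused after close:", closed)
}
```

Against the Docker cluster you would call waitHealthy("127.0.0.1:9094", 10, 3*time.Second, 3*time.Second) to wait for node1's RPC port.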

Start the cluster

make cluster-up

This builds the Docker images and starts all three nodes. Check the logs:

make cluster-logs

You should see Raft leader election complete and all nodes joining the cluster.

Stop the cluster

make cluster-down

To start fresh (delete all data volumes):

make cluster-restart

Step 3: Run a local cluster (no Docker)

Starting three processes on localhost

Run three processes on your local machine with different ports:

# Build first
make build-server

# Start the 3-node local cluster
./scripts/start-local-cluster.sh

The script starts three nodes sequentially (node1 first, then a 3s pause for Raft to bootstrap, then node2 and node3):

Node    Serf            Raft            RPC             Data
node-1  127.0.0.1:9092  127.0.0.1:9093  127.0.0.1:9094  /tmp/mlog-local/node1
node-2  127.0.0.1:9095  127.0.0.1:9096  127.0.0.1:9097  /tmp/mlog-local/node2
node-3  127.0.0.1:9098  127.0.0.1:9099  127.0.0.1:9100  /tmp/mlog-local/node3
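The port layout in the table follows a simple pattern: each node takes three consecutive ports starting at 9092. A small sketch of that arithmetic, with a hypothetical localNode helper (the start script itself may hard-code the values):

```go
package main

import "fmt"

// nodeLayout holds the addresses and data directory for one local node.
type nodeLayout struct {
	ID      string
	Serf    int
	Raft    int
	RPC     int
	DataDir string
}

// localNode returns the layout for the i-th node (1-based), assuming
// three consecutive ports per node starting at 9092, as in the table.
func localNode(i int) nodeLayout {
	base := 9092 + 3*(i-1)
	return nodeLayout{
		ID:      fmt.Sprintf("node-%d", i),
		Serf:    base,
		Raft:    base + 1,
		RPC:     base + 2,
		DataDir: fmt.Sprintf("/tmp/mlog-local/node%d", i),
	}
}

func main() {
	for i := 1; i <= 3; i++ {
		n := localNode(i)
		fmt.Printf("%s serf=%d raft=%d rpc=%d data=%s\n", n.ID, n.Serf, n.Raft, n.RPC, n.DataDir)
	}
}
```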

Stop the local cluster:

./scripts/stop-local-cluster.sh

The stop script sends SIGTERM, waits up to 10s for graceful shutdown, then sends SIGKILL to any remaining processes. As a safety net, it also uses lsof to find and clean up anything still listening on the cluster ports.
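The same SIGTERM, wait, SIGKILL sequence can be expressed in Go. This is a sketch for Unix-like systems, not a translation of the script: stopGracefully is a hypothetical helper, and the demo uses the sleep command as a stand-in for a server process.

```go
package main

import (
	"fmt"
	"os/exec"
	"syscall"
	"time"
)

// stopGracefully sends SIGTERM to a started command and waits up to
// grace for it to exit. If it is still running after that, it is
// killed. The returned bool reports whether a kill was needed.
func stopGracefully(cmd *exec.Cmd, grace time.Duration) (bool, error) {
	if err := cmd.Process.Signal(syscall.SIGTERM); err != nil {
		return false, err
	}
	done := make(chan struct{})
	go func() {
		cmd.Wait() // reap the process; the exit status is not needed here
		close(done)
	}()
	select {
	case <-done:
		return false, nil
	case <-time.After(grace):
		if err := cmd.Process.Kill(); err != nil {
			return false, err
		}
		<-done
		return true, nil
	}
}

// demoStop starts `sleep 30` and stops it with a 10s grace period.
func demoStop() (bool, error) {
	cmd := exec.Command("sleep", "30")
	if err := cmd.Start(); err != nil {
		return false, err
	}
	return stopGracefully(cmd, 10*time.Second)
}

func main() {
	killed, err := demoStop()
	fmt.Println("needed SIGKILL:", killed, "err:", err)
}
```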

Step 4: Create topics

Creating the first topic

Create a topic with 3 replicas (one on each node):

make create-topic topic=orders replicas=3

Or using the binary directly:

./bin/topic create --addrs 127.0.0.1:9094 --topic orders --replicas 3

The topic CLI first discovers the Raft leader (via FindRaftLeader), then sends the CreateTopicRequest to it. The Raft leader picks a leader node for the topic and assigns replicas. All nodes apply the Raft event and open local logs.

List topics to verify:

make list-topics

Output shows the topic name, leader node, epoch, and replicas with ISR status:

topic=orders leader=node-1 epoch=1 replicas=node-2(isr=true,leo=0),node-3(isr=true,leo=0)
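If you want to check this output from a script or test, the line is easy to parse. The following is a minimal sketch that assumes the exact format shown above; topicStatus, replica, and parseTopicLine are hypothetical names.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

type replica struct {
	ID  string
	ISR bool
	LEO uint64
}

type topicStatus struct {
	Topic    string
	Leader   string
	Epoch    uint64
	Replicas []replica
}

// parseReplica parses one entry such as "node-2(isr=true,leo=0".
// The closing parenthesis has already been removed by the caller.
func parseReplica(s string) (replica, error) {
	id, attrs, ok := strings.Cut(s, "(")
	if !ok {
		return replica{}, fmt.Errorf("malformed replica %q", s)
	}
	r := replica{ID: id}
	for _, kv := range strings.Split(attrs, ",") {
		k, v, _ := strings.Cut(kv, "=")
		var err error
		switch k {
		case "isr":
			r.ISR, err = strconv.ParseBool(v)
		case "leo":
			r.LEO, err = strconv.ParseUint(v, 10, 64)
		}
		if err != nil {
			return replica{}, err
		}
	}
	return r, nil
}

// parseTopicLine parses one line of list-topics output.
func parseTopicLine(line string) (topicStatus, error) {
	var ts topicStatus
	for _, field := range strings.Fields(line) {
		key, val, ok := strings.Cut(field, "=")
		if !ok {
			return ts, fmt.Errorf("malformed field %q", field)
		}
		switch key {
		case "topic":
			ts.Topic = val
		case "leader":
			ts.Leader = val
		case "epoch":
			n, err := strconv.ParseUint(val, 10, 64)
			if err != nil {
				return ts, err
			}
			ts.Epoch = n
		case "replicas":
			// Entries are separated by "),"; split on ")" and trim the comma.
			for _, part := range strings.Split(val, ")") {
				part = strings.TrimPrefix(part, ",")
				if part == "" {
					continue
				}
				r, err := parseReplica(part)
				if err != nil {
					return ts, err
				}
				ts.Replicas = append(ts.Replicas, r)
			}
		}
	}
	return ts, nil
}

func main() {
	ts, err := parseTopicLine("topic=orders leader=node-1 epoch=1 replicas=node-2(isr=true,leo=0),node-3(isr=true,leo=0)")
	fmt.Printf("%+v %v\n", ts, err)
}
```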

Delete a topic:

make delete-topic topic=orders

Step 5: Produce messages to a topic

Interactive producer from stdin

Open a producer session that reads from stdin:

make producer-connect topic=orders

Or:

./bin/producer connect --addrs 127.0.0.1:9094 --topic orders

Type messages and press Enter:

connected to topic "orders" leader at node1:9094
enter messages, each line will be produced to the topic (Ctrl-D to exit)
hello world
offset=0
this is a test
offset=1
distributed log works!
offset=2

Each line is appended to the leader's log, assigned an offset, and replicated to followers.

Configuring ack mode for durability

For maximum durability (wait for all replicas):

./bin/producer connect --addrs 127.0.0.1:9094 --topic orders --acks 2

The producer will wait until all ISR replicas have replicated each message before printing the offset. The ack modes are: 0 (none), 1 (leader, default), 2 (all).
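The three ack modes can be summarized as a small decision function. This is a sketch of the semantics described above, not the server's code: AckMode and canAck are hypothetical names, and it assumes LEO means "next offset to be written", so a replica holds a record at offset o once its LEO is greater than o.

```go
package main

import "fmt"

// AckMode mirrors the numeric values of the --acks flag.
type AckMode int

const (
	AckNone   AckMode = 0 // do not wait for any confirmation
	AckLeader AckMode = 1 // wait for the leader's local append (default)
	AckAll    AckMode = 2 // wait for every in-sync replica
)

// canAck reports whether a produce request for the record at offset
// may be acknowledged, given whether the leader has written it and
// the log end offsets of the in-sync followers.
func canAck(mode AckMode, offset uint64, leaderWritten bool, isrLEOs []uint64) bool {
	switch mode {
	case AckNone:
		return true
	case AckLeader:
		return leaderWritten
	case AckAll:
		if !leaderWritten {
			return false
		}
		for _, leo := range isrLEOs {
			if leo <= offset {
				return false // this follower has not fetched the record yet
			}
		}
		return true
	default:
		return false
	}
}

func main() {
	fmt.Println(canAck(AckLeader, 2, true, []uint64{2, 2}))
	fmt.Println(canAck(AckAll, 2, true, []uint64{3, 2}))
	fmt.Println(canAck(AckAll, 2, true, []uint64{3, 3}))
}
```

The trade-off is latency against durability: mode 2 adds at least one follower fetch round trip to every produce call.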

Step 6: Consume messages from a topic

Streaming messages from the beginning

In a separate terminal, start a consumer:

make consumer-connect topic=orders

Or:

./bin/consumer connect --addrs 127.0.0.1:9094 --topic orders --id my-consumer --from-beginning

Output (tab-separated offset and value):

connected to topic "orders" leader at node1:9094
Starting from beginning (offset 0)
0	hello world
1	this is a test
2	distributed log works!

The consumer reads from offset 0, prints each message as offset\tvalue, commits its offset to the server after each message, and waits for new data (polling every 500ms).

Resuming from the last committed offset

If you restart the consumer without --from-beginning:

./bin/consumer connect --addrs 127.0.0.1:9094 --topic orders --id my-consumer

It fetches its last committed offset via FetchOffset and continues from there:

Resuming from offset 3 (last committed)
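The resume behaviour follows from committing after every message. Here is an in-memory sketch with a hypothetical offsetStore and consume function. It assumes the committed value is the next offset to read, which matches the "Resuming from offset 3" line after offsets 0 to 2 were consumed.

```go
package main

import "fmt"

// offsetStore keeps committed offsets per consumer and topic.
type offsetStore struct {
	committed map[string]uint64
}

func key(id, topic string) string { return id + "/" + topic }

func (s *offsetStore) Commit(id, topic string, next uint64) {
	s.committed[key(id, topic)] = next
}

func (s *offsetStore) Fetch(id, topic string) (uint64, bool) {
	off, ok := s.committed[key(id, topic)]
	return off, ok
}

// consume reads every available record, formats it as offset\tvalue,
// and commits after each one. Without fromBeginning it starts at the
// last committed offset, if there is one.
func consume(log []string, s *offsetStore, id, topic string, fromBeginning bool) []string {
	start := uint64(0)
	if !fromBeginning {
		if off, ok := s.Fetch(id, topic); ok {
			start = off
		}
	}
	var out []string
	for off := start; off < uint64(len(log)); off++ {
		out = append(out, fmt.Sprintf("%d\t%s", off, log[off]))
		s.Commit(id, topic, off+1) // committed value is the next offset to read
	}
	return out
}

// demoResume consumes three records, then restarts after a fourth arrives.
func demoResume() (first, second []string) {
	store := &offsetStore{committed: map[string]uint64{}}
	log := []string{"hello world", "this is a test", "distributed log works!"}
	first = consume(log, store, "my-consumer", "orders", true)
	log = append(log, "after restart")
	second = consume(log, store, "my-consumer", "orders", false)
	return first, second
}

func main() {
	first, second := demoResume()
	fmt.Println(first)
	fmt.Println(second)
}
```

The real consumer also polls for new data instead of returning when it reaches the end of the log; that loop is omitted here.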

Step 7: Verify data replication across nodes

Accessing data from multiple nodes

You can consume from any node's RPC port (the request is routed to the leader via FindTopicLeader):

# Via node 2's RPC port
./bin/consumer connect --addrs 127.0.0.1:9097 --topic orders --id test --from-beginning

# Via node 3's RPC port
./bin/consumer connect --addrs 127.0.0.1:9100 --topic orders --id test --from-beginning

Both commands return the same data because every node knows the topic leader's address from Raft metadata and directs the client there.
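Client-side, leader discovery amounts to asking each configured address in turn until one answers. The sketch below shows that loop with a stubbed lookup. resolveLeader and lookupFunc are hypothetical and are not the project's FindTopicLeader implementation.

```go
package main

import (
	"errors"
	"fmt"
)

// lookupFunc asks the node at addr which node leads the topic.
// In a real client this would be an RPC call.
type lookupFunc func(addr, topic string) (string, error)

// resolveLeader tries each address until one returns the leader's address.
func resolveLeader(addrs []string, topic string, lookup lookupFunc) (string, error) {
	lastErr := errors.New("no addresses given")
	for _, a := range addrs {
		leader, err := lookup(a, topic)
		if err == nil {
			return leader, nil
		}
		lastErr = err
	}
	return "", fmt.Errorf("find leader for %q: %w", topic, lastErr)
}

// demoResolve simulates a cluster where the first address is down and
// the remaining nodes report 127.0.0.1:9097 as the topic leader.
func demoResolve() (string, error) {
	lookup := func(addr, topic string) (string, error) {
		if addr == "127.0.0.1:9094" {
			return "", errors.New("connection refused")
		}
		return "127.0.0.1:9097", nil
	}
	addrs := []string{"127.0.0.1:9094", "127.0.0.1:9097", "127.0.0.1:9100"}
	return resolveLeader(addrs, "orders", lookup)
}

func main() {
	leader, err := demoResolve()
	fmt.Println(leader, err)
}
```

Passing several addresses to --addrs is what lets a client keep working when the first node it knows about is down.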

Inspecting segment files on disk

On each node (e.g., /tmp/mlog-local/node1/orders/):

ls /tmp/mlog-local/node1/orders/
00000000000000000000.idx
00000000000000000000.log

The leader and each replica have the same segment files. The leader's log may be slightly ahead (records between HW and LEO) until replication catches up.
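The relationship between HW and LEO can be stated in a few lines. This sketch assumes the high watermark is the smallest log end offset across the leader and its in-sync followers, which is one common way to implement "HW advances when all ISR replicas have the data". highWatermark is a hypothetical helper.

```go
package main

import "fmt"

// highWatermark returns the smallest log end offset among the leader
// and its in-sync followers. Records at offsets below the result are
// present on every in-sync replica.
func highWatermark(leaderLEO uint64, isrFollowerLEOs []uint64) uint64 {
	hw := leaderLEO
	for _, leo := range isrFollowerLEOs {
		if leo < hw {
			hw = leo
		}
	}
	return hw
}

func main() {
	// The leader has written offsets 0..4; one follower has fetched only 0..2.
	hw := highWatermark(5, []uint64{5, 3})
	fmt.Println("HW:", hw, "records not yet fully replicated:", 5-hw)
}
```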

Step 8: Test fault tolerance and recovery

Testing leader failure and recovery

Simulating a leader crash

  1. Find the current topic leader:

make list-topics

  2. Kill that node (e.g., if node-1 is the leader):

# In Docker:
docker stop mlog-node1

# Or locally, kill the process

  3. Wait a few seconds for Serf to detect the failure and Raft to elect a new metadata leader.

  4. The topic manager reassigns the topic leader from ISR replicas (via maybeReassignTopicLeaders).

  5. Produce again. The producer auto-reconnects to the new leader:

./bin/producer connect --addrs 127.0.0.1:9097,127.0.0.1:9100 --topic orders
reconnecting (leader change or connection issue)...
reconnected to topic leader at node2:9097
after failover
offset=3

  6. Consume. The consumer also reconnects:

./bin/consumer connect --addrs 127.0.0.1:9097 --topic orders --id my-consumer

All previously committed data (up to HW) is available on the new leader.

Restarting a failed node

# In Docker:
docker start mlog-node1

The restarted node rejoins the cluster via Serf, catches up on Raft metadata events, and resumes as a follower for the topic. Its replication thread fetches any records it missed. The maybeAddReplicasForNode function in the topic manager detects the returning node and adds it back as a replica.
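Catch-up is a loop of bounded fetches from the follower's own log end offset. Here is an in-memory sketch with hypothetical fetch and catchUp functions; the batch size parameter plays the role of --replication-batch-size, and logs are modelled as string slices.

```go
package main

import "fmt"

// fetch returns up to max records from the leader's log, starting at from.
func fetch(leader []string, from uint64, max int) []string {
	if max <= 0 || from >= uint64(len(leader)) {
		return nil
	}
	end := from + uint64(max)
	if end > uint64(len(leader)) {
		end = uint64(len(leader))
	}
	return leader[from:end]
}

// catchUp repeatedly fetches from the follower's log end offset until
// the leader has nothing more to send. It returns the follower's log
// and the number of non-empty fetch rounds.
func catchUp(follower, leader []string, batchSize int) ([]string, int) {
	rounds := 0
	for {
		batch := fetch(leader, uint64(len(follower)), batchSize)
		if len(batch) == 0 {
			return follower, rounds
		}
		follower = append(follower, batch...)
		rounds++
	}
}

func main() {
	leader := []string{"a", "b", "c", "d", "e", "f", "g"}
	follower := []string{"a", "b"} // state before the node went down
	got, rounds := catchUp(follower, leader, 2)
	fmt.Println(got, "rounds:", rounds)
}
```

A real follower keeps polling after it has caught up, since the leader continues to accept writes.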

Step 9: Reference the build automation targets

Available Makefile targets

# Key targets from Makefile

build                    # Build all binaries to bin/
build-server             # Build only the server
build-producer           # Build only the producer
build-consumer           # Build only the consumer
build-topic              # Build only the topic CLI

cluster-up               # Start 3-node Docker cluster
cluster-down             # Stop Docker cluster
cluster-restart          # Stop, delete volumes, restart
cluster-logs             # Tail Docker cluster logs

create-topic topic=X replicas=N   # Create a topic
delete-topic topic=X              # Delete a topic
list-topics                       # List all topics

producer-connect topic=X          # Interactive producer (stdin)
consumer-connect topic=X          # Streaming consumer (stdout)

local-cluster-start      # Start 3-node local cluster (scripts/)
local-cluster-stop       # Stop local cluster (scripts/)

test                     # Run all Go tests (go test -v ./...)

All topic/producer/consumer targets accept an optional addrs=HOST:PORT parameter to override the default RPC address (127.0.0.1:9094).

Step 10: Understand the complete system you have built

Summary of features and architecture

By this point you have a working distributed log with:

  • Append-only storage with segments, sparse indexes, and memory-mapped I/O.
  • A wire protocol with length-prefixed frames, JSON codec, and binary replication batches.
  • TCP transport with connection tracking, idle timeout, and RPC handler dispatch.
  • Serf-based discovery — nodes find each other via gossip, no static config.
  • Raft consensus — metadata (topics, leaders, ISR) is consistently replicated across the cluster.
  • Topic management — create, delete, and list topics. Leader assignment with automatic failover.
  • Pull-based replication — per-leader goroutine pool. Followers fetch from the leader. ISR tracks who is caught up. HW advances when all ISR replicas have the data.
  • Producer API — append records with configurable ack modes (none, leader, all). Auto-reconnect on leader change.
  • Consumer API — read by offset with committed offset tracking, auto-reconnect, and resume from last committed offset.
  • Fault tolerance — kill a node and the cluster keeps serving. Restart it and it catches up.

This covers the core architecture of systems like Apache Kafka. The concepts — offsets, segments, replication, ISR, high watermarks, leader election, gossip-based discovery — are the same ones used in production event-streaming platforms.