Running a Multi-Node Distributed Log Cluster End-to-End
In this section, you'll bring it all together and run the distributed log end to end. You'll build the binaries, start a 3-node cluster (with Docker or locally), create topics, produce and consume messages, and verify replication and fault tolerance.
Step 0: Build the binaries
Compiling all binaries
The Makefile compiles five binaries:
make build
This produces:
| Binary | Source | Purpose |
|---|---|---|
| bin/mlog | . (root) | Root CLI (cobra-based) |
| bin/server | cmd/server | The broker server (log + Raft + Serf + RPC) |
| bin/producer | cmd/producer | CLI producer (sends messages from stdin) |
| bin/consumer | cmd/consumer | CLI consumer (prints messages to stdout) |
| bin/topic | cmd/topic | Topic management (create, delete, list) |
Individual targets are also available:
make build-server # only the server
make build-producer # only the producer
make build-consumer # only the consumer
make build-topic # only the topic CLI
Step 1: Configure server startup flags
Command-line options for the server
The server binary accepts these flags:
--bind-addr Serf listen address (default 127.0.0.1:9092)
--advertise-addr Address other nodes see (e.g., node1 in Docker)
--rpc-port RPC listen port (default 9094)
--data-dir Where to store log data (default /tmp/mlog)
--node-id Unique node identifier (default node-1)
--raft-addr Raft transport address (default 127.0.0.1:9093)
--bootstrap Bootstrap a new Raft cluster (only set on ONE node)
--peer Peer nodes for Serf join (repeatable: --peer node-2=host:port)
--replication-batch-size Max records per replication fetch (default 5000)
Each node needs three ports: Serf (gossip), Raft (consensus), and RPC (client traffic and replication). All flags can also be set via environment variables with the MLOG_ prefix (e.g., MLOG_NODE_ID=node-1).
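The flag-to-environment mapping can be sketched as follows. This is a minimal illustration, not the project's actual config loader: it assumes dashes become underscores and letters are upper-cased (consistent with the MLOG_NODE_ID example), and it assumes an explicit flag takes precedence over the environment. The helper names envName and lookup are hypothetical.

```go
package main

import (
	"fmt"
	"os"
	"strings"
)

// envName maps a flag name such as "node-id" to "MLOG_NODE_ID".
// Assumption: dashes become underscores and letters are upper-cased.
func envName(flag string) string {
	return "MLOG_" + strings.ToUpper(strings.ReplaceAll(flag, "-", "_"))
}

// lookup resolves a setting. Assumed precedence: explicit flag value,
// then environment variable, then the built-in default.
func lookup(flag, explicit, def string) string {
	if explicit != "" {
		return explicit
	}
	if v, ok := os.LookupEnv(envName(flag)); ok {
		return v
	}
	return def
}

func main() {
	os.Setenv("MLOG_NODE_ID", "node-2")
	fmt.Println(envName("replication-batch-size"))
	fmt.Println(lookup("node-id", "", "node-1"))       // taken from the environment
	fmt.Println(lookup("node-id", "node-3", "node-1")) // explicit flag wins
}
```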
Initialization order
The CommandHelper in cmd/server/helper.go wires all components together in the correct order:
// cmd/server/helper.go (setup order; construction and error handling elided)
func NewCommandHelper(config config.Config) (*CommandHelper, error) {
	// ... build cmdHelper from config ...
	cmdHelper.setupCoordinator()  // 1. Create TopicManager + Raft Coordinator
	cmdHelper.setupTopicManager() // 2. Wait for Raft, restore from metadata, start replication
	cmdHelper.setupRpcServer()    // 3. Start RPC server (with ConsumerManager)
	cmdHelper.setupMembership()   // 4. Join Serf cluster
	// ... return cmdHelper ...
}
- Coordinator setup creates the TopicManager first (it implements MetadataStore), then the Coordinator (Raft). On bootstrap, it waits up to 30s for Raft leader election. The bootstrap node adds itself to the metadata store.
- TopicManager setup waits for Raft to be ready (up to 15s), then restores topic state from metadata and starts the replication thread.
- RPC server creates the ConsumerManager (with offset recovery) and starts listening.
- Serf membership joins the cluster. As other nodes join/leave, Serf events trigger Raft Join/Leave operations.
Shutdown reverses the order: leave Serf, stop RPC, shut down Raft.
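The start-in-order, stop-in-reverse pattern described above can be sketched generically. This is not the code in cmd/server/helper.go: the step type, runAll, and lifecycleDemo are hypothetical names, and the setup functions here only record what happened.

```go
package main

import "fmt"

// step pairs a setup function with the teardown that undoes it.
type step struct {
	name     string
	setup    func() error
	teardown func()
}

// runAll runs the setups in order. It returns a stop function that tears
// down the completed steps in reverse order. If a setup fails, the steps
// that already started are torn down before the error is returned.
func runAll(steps []step) (stop func(), err error) {
	done := 0
	stop = func() {
		for i := done - 1; i >= 0; i-- {
			steps[i].teardown()
		}
		done = 0
	}
	for _, s := range steps {
		if err := s.setup(); err != nil {
			stop()
			return nil, fmt.Errorf("%s: %w", s.name, err)
		}
		done++
	}
	return stop, nil
}

// lifecycleDemo starts four stand-in components and stops them again,
// returning the recorded sequence of events.
func lifecycleDemo() []string {
	var events []string
	var steps []step
	for _, n := range []string{"coordinator", "topic-manager", "rpc-server", "membership"} {
		n := n
		steps = append(steps, step{
			name:     n,
			setup:    func() error { events = append(events, "start "+n); return nil },
			teardown: func() { events = append(events, "stop "+n) },
		})
	}
	stop, err := runAll(steps)
	if err != nil {
		return events
	}
	stop()
	return events
}

func main() {
	for _, e := range lifecycleDemo() {
		fmt.Println(e)
	}
}
```

Tearing down in reverse means a component is never stopped while something that depends on it is still running, which is why membership leaves first and Raft shuts down last.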
Step 2: Deploy with Docker Compose (recommended)
Setting up the containerized cluster
The Docker Compose file starts a 3-node cluster with networking handled automatically.
Dockerfile
The multi-stage Dockerfile builds the server in an Alpine container:
# infra/Dockerfile
# Build stage
FROM golang:1.25-alpine AS builder
WORKDIR /app
COPY go.mod go.sum ./
RUN go mod download
COPY . .
RUN CGO_ENABLED=0 go build -o /server ./cmd/server
# Run stage
FROM alpine:3.19
RUN apk --no-cache add ca-certificates netcat-openbsd
WORKDIR /app
COPY --from=builder /server .
ENTRYPOINT ["/app/server"]
CMD ["--bind-addr", "0.0.0.0:9092", "--raft-addr", "0.0.0.0:9093",
"--rpc-port", "9094", "--data-dir", "/data",
"--node-id", "node-1", "--bootstrap", "false"]
netcat-openbsd is included for the healthcheck (nc -z localhost 9094).
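If you want the same check from Go (for example in a test harness), a port probe equivalent to nc -z is a short TCP dial. The sketch below is an illustration only; portOpen, waitHealthy, and probeDemo are hypothetical helpers, and the retry count and intervals are arbitrary values chosen for the demo.

```go
package main

import (
	"fmt"
	"net"
	"time"
)

// portOpen reports whether a TCP connection to addr succeeds within timeout,
// which is roughly what `nc -z host port` checks.
func portOpen(addr string, timeout time.Duration) bool {
	conn, err := net.DialTimeout("tcp", addr, timeout)
	if err != nil {
		return false
	}
	conn.Close()
	return true
}

// waitHealthy retries the probe up to retries times, sleeping interval
// between attempts, and reports whether any attempt succeeded.
func waitHealthy(addr string, retries int, interval, timeout time.Duration) bool {
	for i := 0; i < retries; i++ {
		if portOpen(addr, timeout) {
			return true
		}
		time.Sleep(interval)
	}
	return false
}

// probeDemo listens on an ephemeral local port (standing in for the RPC
// port), probes it, closes the listener, and probes again.
func probeDemo() (whileListening, afterClose bool, err error) {
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return false, false, err
	}
	addr := ln.Addr().String()
	whileListening = waitHealthy(addr, 3, 50*time.Millisecond, time.Second)
	ln.Close()
	afterClose = portOpen(addr, 200*time.Millisecond)
	return whileListening, afterClose, nil
}

func main() {
	up, down, err := probeDemo()
	fmt.Println(up, down, err)
}
```

Note that an open port only shows the process is listening. It does not prove that Raft has elected a leader.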
docker-compose.yml
# infra/docker-compose.yml
services:
node1:
build:
context: ..
dockerfile: infra/Dockerfile
container_name: mlog-node1
hostname: node1
ports:
- "9092:9092" # Serf
- "9093:9093" # Raft
- "9094:9094" # RPC
volumes:
- node1-data:/data
command: [
"server",
"--bind-addr", "0.0.0.0:9092",
"--raft-addr", "0.0.0.0:9093",
"--rpc-port", "9094",
"--advertise-addr", "node1",
"--data-dir", "/data",
"--node-id", "node-1",
"--bootstrap", "true",
"--peer", "node-2=node2:9095",
"--peer", "node-3=node3:9098"
]
networks:
- mlog-net
healthcheck:
test: ["CMD", "nc", "-z", "localhost", "9094"]
interval: 3s
timeout: 3s
retries: 10
start_period: 20s
node2:
build:
context: ..
dockerfile: infra/Dockerfile
container_name: mlog-node2
hostname: node2
ports:
- "9095:9095" # Serf
- "9096:9096" # Raft
- "9097:9097" # RPC
volumes:
- node2-data:/data
command: [
"server",
"--bind-addr", "0.0.0.0:9095",
"--raft-addr", "0.0.0.0:9096",
"--rpc-port", "9097",
"--advertise-addr", "node2",
"--data-dir", "/data",
"--node-id", "node-2",
"--peer", "node-1=node1:9092",
"--peer", "node-3=node3:9098"
]
depends_on:
node1:
condition: service_healthy
networks:
- mlog-net
node3:
build:
context: ..
dockerfile: infra/Dockerfile
container_name: mlog-node3
hostname: node3
ports:
- "9098:9098" # Serf
- "9099:9099" # Raft
- "9100:9100" # RPC
volumes:
- node3-data:/data
command: [
"server",
"--bind-addr", "0.0.0.0:9098",
"--raft-addr", "0.0.0.0:9099",
"--rpc-port", "9100",
"--advertise-addr", "node3",
"--data-dir", "/data",
"--node-id", "node-3",
"--peer", "node-1=node1:9092",
"--peer", "node-2=node2:9095"
]
depends_on:
node1:
condition: service_healthy
networks:
- mlog-net
volumes:
node1-data:
node2-data:
node3-data:
networks:
mlog-net:
driver: bridge
Key points:
- node1 bootstraps the Raft cluster (--bootstrap true). Only one node should bootstrap.
- node2 and node3 start after node1 is healthy (depends_on with service_healthy) and join via --peer.
- Containers bind to 0.0.0.0 but advertise their hostname (e.g., node1) so other containers can resolve them on the Docker network.
- Each node has three ports exposed to the host for local testing.
- node1 has a healthcheck that probes the RPC port with nc, with a 20s start period to allow Raft leader election.
Start the cluster
make cluster-up
This builds the Docker images and starts all three nodes. Check the logs:
make cluster-logs
You should see Raft leader election complete and all nodes joining the cluster.
Stop the cluster
make cluster-down
To start fresh (delete all data volumes):
make cluster-restart
Step 3: Run a local cluster (no Docker)
Starting three processes on localhost
Run three processes on your local machine with different ports:
# Build first
make build-server
# Start the 3-node local cluster
./scripts/start-local-cluster.sh
The script starts three nodes sequentially (node1 first, then a 3s pause for Raft to bootstrap, then node2 and node3):
| Node | Serf | Raft | RPC | Data |
|---|---|---|---|---|
| node-1 | 127.0.0.1:9092 | 127.0.0.1:9093 | 127.0.0.1:9094 | /tmp/mlog-local/node1 |
| node-2 | 127.0.0.1:9095 | 127.0.0.1:9096 | 127.0.0.1:9097 | /tmp/mlog-local/node2 |
| node-3 | 127.0.0.1:9098 | 127.0.0.1:9099 | 127.0.0.1:9100 | /tmp/mlog-local/node3 |
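The port layout in the table follows a simple pattern: each node takes three consecutive ports starting at 9092. The sketch below reproduces the table from that rule. The nodePorts type and portsFor function are hypothetical and are not taken from the start script, which may hard-code these values.

```go
package main

import "fmt"

// nodePorts holds the three ports one node needs.
type nodePorts struct {
	Serf, Raft, RPC int
}

// portsFor returns the ports for the 1-based node index n, following the
// pattern in the table: three consecutive ports per node, starting at 9092.
func portsFor(n int) nodePorts {
	base := 9092 + 3*(n-1)
	return nodePorts{Serf: base, Raft: base + 1, RPC: base + 2}
}

func main() {
	for n := 1; n <= 3; n++ {
		p := portsFor(n)
		fmt.Printf("node-%d serf=127.0.0.1:%d raft=127.0.0.1:%d rpc=127.0.0.1:%d data=/tmp/mlog-local/node%d\n",
			n, p.Serf, p.Raft, p.RPC, n)
	}
}
```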
Stop the local cluster:
./scripts/stop-local-cluster.sh
The stop script sends SIGTERM, waits up to 10s for graceful shutdown, then sends SIGKILL to any remaining processes. As a safety net, it also uses lsof to find and clean up anything still listening on the cluster ports.
Step 4: Create topics
Creating the first topic
Create a topic with 3 replicas (one on each node):
make create-topic topic=orders replicas=3
Or using the binary directly:
./bin/topic create --addrs 127.0.0.1:9094 --topic orders --replicas 3
The topic CLI first discovers the Raft leader (via FindRaftLeader), then sends the CreateTopicRequest to it. The Raft leader picks a leader node for the topic and assigns replicas. All nodes apply the Raft event and open local logs.
List topics to verify:
make list-topics
Output shows the topic name, leader node, epoch, and replicas with ISR status:
topic=orders leader=node-1 epoch=1 replicas=node-2(isr=true,leo=0),node-3(isr=true,leo=0)
Delete a topic:
make delete-topic topic=orders
Step 5: Produce messages to a topic
Interactive producer from stdin
Open a producer session that reads from stdin:
make producer-connect topic=orders
Or:
./bin/producer connect --addrs 127.0.0.1:9094 --topic orders
Type messages and press Enter:
connected to topic "orders" leader at node1:9094
enter messages, each line will be produced to the topic (Ctrl-D to exit)
hello world
offset=0
this is a test
offset=1
distributed log works!
offset=2
Each line is appended to the leader's log, assigned an offset, and replicated to followers.
Configuring ack mode for durability
For maximum durability (wait for all in-sync replicas):
./bin/producer connect --addrs 127.0.0.1:9094 --topic orders --acks 2
The producer will wait until all ISR replicas have replicated each message before printing the offset. The ack modes are: 0 (none), 1 (leader, default), 2 (all).
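One way to picture the three modes is as a predicate over the leader's offsets. The sketch below is an assumption-laden illustration, not the broker's code: AckMode and canAck are hypothetical names, LEO is taken to be the next offset the leader will write, and HW is taken to be the first offset not yet replicated to every ISR member.

```go
package main

import "fmt"

// AckMode mirrors the --acks values: 0 (none), 1 (leader), 2 (all).
type AckMode int

const (
	AckNone   AckMode = 0
	AckLeader AckMode = 1
	AckAll    AckMode = 2
)

// canAck reports whether the record at offset may be acknowledged.
// Assumptions: leaderLEO is the next offset the leader will write, and hw
// is the first offset not yet replicated to all in-sync replicas.
func canAck(mode AckMode, offset, leaderLEO, hw int64) bool {
	switch mode {
	case AckNone:
		return true // fire and forget
	case AckLeader:
		return offset < leaderLEO // record is in the leader's log
	case AckAll:
		return offset < hw // record is on every in-sync replica
	default:
		return false
	}
}

func main() {
	// The record at offset 2 is on the leader (LEO 3), but the high
	// watermark is still 2, so followers have not all received it yet.
	for _, m := range []AckMode{AckNone, AckLeader, AckAll} {
		fmt.Printf("acks=%d canAck=%v\n", m, canAck(m, 2, 3, 2))
	}
}
```

The trade-off is latency against durability: with --acks 2 the producer waits for the slowest in-sync replica, but an acknowledged record survives the loss of the leader.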
Step 6: Consume messages from a topic
Streaming messages from the beginning
In a separate terminal, start a consumer:
make consumer-connect topic=orders
Or:
./bin/consumer connect --addrs 127.0.0.1:9094 --topic orders --id my-consumer --from-beginning
Output (tab-separated offset and value):
connected to topic "orders" leader at node1:9094
Starting from beginning (offset 0)
0 hello world
1 this is a test
2 distributed log works!
The consumer reads from offset 0, prints each message as offset\tvalue, commits its offset to the server after each message, and waits for new data (polling every 500ms).
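The read, print, commit, poll cycle can be sketched against an in-memory stand-in. Everything here is hypothetical: the Broker interface, memBroker, and consume are illustrative names rather than the project's client API, and the loop stops after a few empty polls only so that the example terminates. It also assumes the committed offset is the next offset to read.

```go
package main

import (
	"fmt"
	"io"
	"os"
	"time"
)

// Record is one log entry.
type Record struct {
	Offset int64
	Value  string
}

// Broker is a hypothetical client interface, not the project's real API.
type Broker interface {
	Read(offset int64) (Record, bool) // false when nothing is at offset yet
	Commit(consumerID string, next int64)
}

// consume prints records as offset\tvalue starting at start, commits the
// next offset after each record, and polls when caught up. It returns
// after maxIdlePolls consecutive empty polls so the example can finish.
func consume(b Broker, id string, start int64, out io.Writer, poll time.Duration, maxIdlePolls int) int64 {
	next, idle := start, 0
	for idle < maxIdlePolls {
		rec, ok := b.Read(next)
		if !ok {
			idle++
			time.Sleep(poll)
			continue
		}
		idle = 0
		fmt.Fprintf(out, "%d\t%s\n", rec.Offset, rec.Value)
		next = rec.Offset + 1
		b.Commit(id, next)
	}
	return next
}

// memBroker is an in-memory stand-in for a topic leader.
type memBroker struct {
	records   []string
	committed map[string]int64
}

func (m *memBroker) Read(offset int64) (Record, bool) {
	if offset < 0 || offset >= int64(len(m.records)) {
		return Record{}, false
	}
	return Record{Offset: offset, Value: m.records[offset]}, true
}

func (m *memBroker) Commit(id string, next int64) { m.committed[id] = next }

// demo consumes three records and returns the next and committed offsets.
func demo() (next, committed int64) {
	b := &memBroker{
		records:   []string{"hello world", "this is a test", "distributed log works!"},
		committed: map[string]int64{},
	}
	next = consume(b, "my-consumer", 0, os.Stdout, 10*time.Millisecond, 2)
	return next, b.committed["my-consumer"]
}

func main() {
	next, committed := demo()
	fmt.Println("next:", next, "committed:", committed)
}
```

Committing after every message keeps the replay window small on restart, at the cost of one extra request per record.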
Resuming from the last committed offset
If you restart the consumer without --from-beginning:
./bin/consumer connect --addrs 127.0.0.1:9094 --topic orders --id my-consumer
It fetches its last committed offset via FetchOffset and continues from there:
Resuming from offset 3 (last committed)
Step 7: Verify data replication across nodes
Accessing data from multiple nodes
You can consume from any node's RPC port (the request is routed to the leader via FindTopicLeader):
# Via node 2's RPC port
./bin/consumer connect --addrs 127.0.0.1:9097 --topic orders --id test --from-beginning
# Via node 3's RPC port
./bin/consumer connect --addrs 127.0.0.1:9100 --topic orders --id test --from-beginning
You see the same data whichever node you connect to, because every node learns the topic leader's address from the Raft metadata.
Inspecting segment files on disk
On each node (e.g., /tmp/mlog-local/node1/orders/):
ls /tmp/mlog-local/node1/orders/
00000000000000000000.idx
00000000000000000000.log
The leader and each replica have the same segment files. The leader's log may be slightly ahead (records between HW and LEO) until replication catches up.
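The gap between HW and LEO follows from how the high watermark is derived: it can only advance to an offset that every in-sync replica has reached. A minimal sketch of that rule, assuming HW is the minimum LEO across the leader and its ISR members (the Replica type and highWatermark function are hypothetical names):

```go
package main

import "fmt"

// Replica tracks a follower's log end offset and whether it is in sync.
type Replica struct {
	ID  string
	LEO int64
	ISR bool
}

// highWatermark returns the lowest LEO among the leader and its in-sync
// replicas. Replicas outside the ISR do not hold the watermark back.
func highWatermark(leaderLEO int64, replicas []Replica) int64 {
	hw := leaderLEO
	for _, r := range replicas {
		if r.ISR && r.LEO < hw {
			hw = r.LEO
		}
	}
	return hw
}

func main() {
	replicas := []Replica{
		{ID: "node-2", LEO: 5, ISR: true},
		{ID: "node-3", LEO: 3, ISR: true},
	}
	fmt.Println("hw with node-3 in ISR:", highWatermark(5, replicas))

	replicas[1].ISR = false // node-3 falls out of the ISR
	fmt.Println("hw with node-3 out of ISR:", highWatermark(5, replicas))
}
```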
Step 8: Test fault tolerance and recovery
Testing leader failure and recovery
Simulating a leader crash
- Find which node currently leads the topic:
make list-topics
- Kill that node (e.g., if node-1 is the leader):
# In Docker:
docker stop mlog-node1
# Or locally, kill the process
- Wait a few seconds for Serf to detect the failure and Raft to elect a new metadata leader.
- The topic manager reassigns the topic leader from ISR replicas (via maybeReassignTopicLeaders).
- Produce again. The producer auto-reconnects to the new leader:
./bin/producer connect --addrs 127.0.0.1:9097,127.0.0.1:9100 --topic orders
reconnecting (leader change or connection issue)...
reconnected to topic leader at node2:9097
after failover
offset=3
- Consume. The consumer also reconnects:
./bin/consumer connect --addrs 127.0.0.1:9097 --topic orders --id my-consumer
All previously committed data (up to HW) is available on the new leader.
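The producer command above passes several addresses so the client still has a live entry point when one node is down. A minimal sketch of that first step, trying each address in order, is shown below. It covers only bootstrap address selection, not leader discovery, and dialFunc, firstReachable, and fakeDial are hypothetical names rather than the client's real API.

```go
package main

import (
	"errors"
	"fmt"
)

// dialFunc abstracts connecting to a broker address (hypothetical).
type dialFunc func(addr string) error

// firstReachable tries each address in order and returns the first one
// that connects. If none do, it returns all dial errors joined together.
func firstReachable(addrs []string, dial dialFunc) (string, error) {
	var errs []error
	for _, a := range addrs {
		if err := dial(a); err != nil {
			errs = append(errs, fmt.Errorf("%s: %w", a, err))
			continue
		}
		return a, nil
	}
	if len(errs) == 0 {
		return "", errors.New("no addresses given")
	}
	return "", errors.Join(errs...)
}

var errRefused = errors.New("connection refused")

// fakeDial simulates a cluster in which the listed addresses are down.
func fakeDial(down map[string]bool) dialFunc {
	return func(addr string) error {
		if down[addr] {
			return errRefused
		}
		return nil
	}
}

func main() {
	down := map[string]bool{"127.0.0.1:9094": true} // node-1 was stopped
	addrs := []string{"127.0.0.1:9094", "127.0.0.1:9097", "127.0.0.1:9100"}
	addr, err := firstReachable(addrs, fakeDial(down))
	fmt.Println(addr, err)
}
```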
Restarting a failed node
# In Docker:
docker start mlog-node1
The restarted node rejoins the cluster via Serf, catches up on Raft metadata events, and resumes as a follower for the topic. Its replication thread fetches any records it missed. The maybeAddReplicasForNode function in the topic manager detects the returning node and adds it back as a replica.
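Catch-up under pull-based replication amounts to the follower repeatedly asking the leader for records starting at its own log end offset, in bounded batches (the server's --replication-batch-size defaults to 5000). The sketch below shows that loop over in-memory slices. fetchFunc, catchUp, and leaderFetch are hypothetical names, and the real replication thread works with binary batches over RPC rather than string slices.

```go
package main

import "fmt"

// fetchFunc returns up to limit records from the leader, starting at the
// given offset (hypothetical stand-in for a replication fetch RPC).
type fetchFunc func(from int64, limit int) []string

// catchUp pulls batches, starting at the follower's current log end
// offset, until the leader has nothing new. It returns the new LEO.
func catchUp(local *[]string, fetch fetchFunc, batchSize int) int64 {
	for {
		batch := fetch(int64(len(*local)), batchSize)
		if len(batch) == 0 {
			return int64(len(*local))
		}
		*local = append(*local, batch...)
	}
}

// leaderFetch builds a fetchFunc over an in-memory leader log.
func leaderFetch(leader []string) fetchFunc {
	return func(from int64, limit int) []string {
		if limit <= 0 || from < 0 || from >= int64(len(leader)) {
			return nil
		}
		end := from + int64(limit)
		if end > int64(len(leader)) {
			end = int64(len(leader))
		}
		return leader[from:end]
	}
}

func main() {
	leader := make([]string, 12)
	for i := range leader {
		leader[i] = fmt.Sprintf("record-%d", i)
	}
	// The follower went down after replicating the first 5 records.
	follower := append([]string(nil), leader[:5]...)
	leo := catchUp(&follower, leaderFetch(leader), 5)
	fmt.Println("follower LEO after catch-up:", leo)
}
```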
Step 9: Reference the build automation targets
Available Makefile targets
# Key targets from Makefile
build # Build all binaries to bin/
build-server # Build only the server
build-producer # Build only the producer
build-consumer # Build only the consumer
build-topic # Build only the topic CLI
cluster-up # Start 3-node Docker cluster
cluster-down # Stop Docker cluster
cluster-restart # Stop, delete volumes, restart
cluster-logs # Tail Docker cluster logs
create-topic topic=X replicas=N # Create a topic
delete-topic topic=X # Delete a topic
list-topics # List all topics
producer-connect topic=X # Interactive producer (stdin)
consumer-connect topic=X # Streaming consumer (stdout)
local-cluster-start # Start 3-node local cluster (scripts/)
local-cluster-stop # Stop local cluster (scripts/)
test # Run all Go tests (go test -v ./...)
All topic/producer/consumer targets accept an optional addrs=HOST:PORT parameter to override the default RPC address (127.0.0.1:9094).
Step 10: Understand the complete system you have built
Summary of features and architecture
By this point you have a working distributed log with:
- Append-only storage with segments, sparse indexes, and memory-mapped I/O.
- A wire protocol with length-prefixed frames, JSON codec, and binary replication batches.
- TCP transport with connection tracking, idle timeout, and RPC handler dispatch.
- Serf-based discovery — nodes find each other via gossip, no static config.
- Raft consensus — metadata (topics, leaders, ISR) is consistently replicated across the cluster.
- Topic management — create, delete, and list topics. Leader assignment with automatic failover.
- Pull-based replication — per-leader goroutine pool. Followers fetch from the leader. ISR tracks who is caught up. HW advances when all ISR replicas have the data.
- Producer API — append records with configurable ack modes (none, leader, all). Auto-reconnect on leader change.
- Consumer API — read by offset with committed offset tracking, auto-reconnect, and resume from last committed offset.
- Fault tolerance — kill a node and the cluster keeps serving. Restart it and it catches up.
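As a reminder of what "length-prefixed frames" means in practice, here is a minimal framing sketch. It assumes a 4-byte big-endian length prefix and a 1 MiB size limit, neither of which is confirmed by this section, and writeFrame, readFrame, and roundTrip are hypothetical names rather than the project's codec.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
)

// writeFrame writes a 4-byte big-endian length followed by the payload.
// Assumption: the prefix width and byte order are illustrative choices.
func writeFrame(w io.Writer, payload []byte) error {
	var hdr [4]byte
	binary.BigEndian.PutUint32(hdr[:], uint32(len(payload)))
	if _, err := w.Write(hdr[:]); err != nil {
		return err
	}
	_, err := w.Write(payload)
	return err
}

// readFrame reads one frame and rejects payloads larger than maxLen, so a
// corrupt or hostile length prefix cannot force a huge allocation.
func readFrame(r io.Reader, maxLen uint32) ([]byte, error) {
	var hdr [4]byte
	if _, err := io.ReadFull(r, hdr[:]); err != nil {
		return nil, err
	}
	n := binary.BigEndian.Uint32(hdr[:])
	if n > maxLen {
		return nil, fmt.Errorf("frame of %d bytes exceeds limit %d", n, maxLen)
	}
	buf := make([]byte, n)
	if _, err := io.ReadFull(r, buf); err != nil {
		return nil, err
	}
	return buf, nil
}

// roundTrip writes the messages as frames into a buffer and reads them back.
func roundTrip(msgs ...string) ([]string, error) {
	var buf bytes.Buffer
	for _, m := range msgs {
		if err := writeFrame(&buf, []byte(m)); err != nil {
			return nil, err
		}
	}
	var out []string
	for range msgs {
		p, err := readFrame(&buf, 1<<20)
		if err != nil {
			return nil, err
		}
		out = append(out, string(p))
	}
	return out, nil
}

func main() {
	out, err := roundTrip(`{"topic":"orders"}`, "hello world")
	fmt.Println(out, err)
}
```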
This covers the core architecture of systems like Apache Kafka. The concepts — offsets, segments, replication, ISR, high watermarks, leader election, gossip-based discovery — are the same ones used in production event-streaming platforms.