Cluster Discovery with Serf — Gossip-Based Membership and Failure Detection
Nodes in a distributed log must discover each other without a static configuration file. In this section, you'll implement cluster membership using HashiCorp Serf — a gossip-based discovery system. When a node joins, the cluster learns its RPC and Raft addresses. When a node fails, the cluster detects it and reassigns topic leaders. All code lives in discovery/discovery.go.
Step 1: Understand why Serf
Benefits of gossip-based discovery
You could hardcode peer addresses, but that breaks when nodes are added, removed, or moved. Serf solves this:
- No central coordinator — no single point of failure for discovery.
- Gossip protocol — each node periodically tells a few random peers about the cluster state. Information spreads exponentially.
- Failure detection — Serf probes members and marks unresponsive nodes as failed.
- Custom tags — each node attaches metadata (RPC address, Raft address) that is spread with membership.
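To get a feel for how quickly gossip spreads, here is a minimal simulation sketch. It is not Serf's actual protocol: the function name roundsToSpread, the fan-out of 3, and the pure push model are assumptions chosen only for illustration.

```go
package main

import (
	"fmt"
	"math/rand"
)

// roundsToSpread simulates push-style gossip among n nodes. In every round,
// each node that already knows the update tells `fanout` peers chosen
// uniformly at random. It returns the number of rounds until all n nodes
// know the update, or -1 if fanout is less than 1. The seed makes a run
// reproducible.
func roundsToSpread(n, fanout int, seed int64) int {
	if n <= 1 {
		return 0
	}
	if fanout < 1 {
		return -1
	}
	rng := rand.New(rand.NewSource(seed))
	informed := make([]bool, n)
	informed[0] = true // node 0 is the first to know
	count, rounds := 1, 0
	for count < n {
		rounds++
		// Only nodes informed before this round send during it.
		senders := count
		for s := 0; s < senders; s++ {
			for f := 0; f < fanout; f++ {
				peer := rng.Intn(n)
				if !informed[peer] {
					informed[peer] = true
					count++
				}
			}
		}
	}
	return rounds
}

func main() {
	for _, n := range []int{10, 100, 1000, 10000} {
		fmt.Printf("nodes=%d rounds=%d\n", n, roundsToSpread(n, 3, 1))
	}
}
```

Because the informed set can at most quadruple per round with a fan-out of 3, the round count grows slowly as the cluster gets larger, which is the property that makes gossip practical for membership.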
Step 2: Define the handler interface
Handler interface for membership changes
Define what the rest of the system needs when membership changes:
// discovery/discovery.go
package discovery

// Handler is notified when members join or leave the cluster.
type Handler interface {
	Join(id, raftAddr, rpcAddr string) error
	Leave(name string) error
}
When Serf detects a new member, Join is called with the node's ID, Raft address, and RPC address. The Coordinator implements this interface — Join adds the node to the Raft cluster, and Leave removes it and triggers leader reassignment.
Note the parameter order: raftAddr comes before rpcAddr. This matches how the coordinator uses them — Raft is the primary cluster communication channel.
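When testing discovery in isolation, a small in-memory Handler can stand in for the Coordinator. The sketch below is an assumption for illustration: recordingHandler and newRecordingHandler are hypothetical names, and the interface is copied locally so the example compiles on its own.

```go
package main

import (
	"fmt"
	"sync"
)

// Handler mirrors the interface defined in discovery/discovery.go.
type Handler interface {
	Join(id, raftAddr, rpcAddr string) error
	Leave(name string) error
}

// recordingHandler is a hypothetical test double. It records every call so a
// test can assert on what the membership layer reported.
type recordingHandler struct {
	mu     sync.Mutex           // the event handler runs in its own goroutine
	joined map[string][2]string // id -> {raftAddr, rpcAddr}
	left   []string
}

func newRecordingHandler() *recordingHandler {
	return &recordingHandler{joined: make(map[string][2]string)}
}

func (h *recordingHandler) Join(id, raftAddr, rpcAddr string) error {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.joined[id] = [2]string{raftAddr, rpcAddr}
	return nil
}

func (h *recordingHandler) Leave(name string) error {
	h.mu.Lock()
	defer h.mu.Unlock()
	delete(h.joined, name)
	h.left = append(h.left, name)
	return nil
}

// Compile-time check that recordingHandler satisfies Handler.
var _ Handler = (*recordingHandler)(nil)

func main() {
	h := newRecordingHandler()
	_ = h.Join("node2", "node2:9093", "node2:9091") // raftAddr first, then rpcAddr
	fmt.Println("joined:", h.joined)
	_ = h.Leave("node2")
	fmt.Println("left:", h.left)
}
```

The compile-time assertion catches a swapped or missing method early, which is useful given that both address parameters are plain strings.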
Step 3: Create the Membership struct
Membership struct overview
The Membership struct embeds config.Config directly, giving it access to all configuration fields without extra plumbing:
type Membership struct {
	config.Config
	handler Handler
	serf    *serf.Serf
	events  chan serf.Event
	logger  *zap.Logger
}
By embedding Config, any code with a *Membership can access m.BindAddr, m.NodeConfig, m.RaftConfig, etc. directly.
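Field promotion through embedding can be seen in a stripped-down sketch. The Config and NodeConfig types below are simplified stand-ins with an assumed field set, not the real config package, and describe is a hypothetical helper.

```go
package main

import "fmt"

// NodeConfig and Config are simplified stand-ins for the types in the
// config package; the exact field set here is an assumption.
type NodeConfig struct {
	ID      string
	RPCPort int
}

type Config struct {
	BindAddr   string
	NodeConfig NodeConfig
}

// Membership embeds Config, so Config's fields are promoted onto Membership.
type Membership struct {
	Config
}

// describe reads promoted fields without naming the embedded struct.
func describe(m *Membership) string {
	return m.NodeConfig.ID + "@" + m.BindAddr
}

func main() {
	m := &Membership{Config: Config{
		BindAddr:   "127.0.0.1:7946",
		NodeConfig: NodeConfig{ID: "node1", RPCPort: 9091},
	}}
	fmt.Println(describe(m))
	// The explicit path still works and refers to the same field.
	fmt.Println(m.BindAddr == m.Config.BindAddr)
}
```

One design consequence is that methods defined on Config are promoted as well, which is why m.Config.RPCAddr() could also be written as m.RPCAddr().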
Step 4: Create the constructor
Creating a Membership instance
The constructor is named New (not NewMembership) — following Go convention since the package is discovery:
func New(handler Handler, config config.Config) (*Membership, error) {
	c := &Membership{
		Config:  config,
		handler: handler,
		logger:  zap.L().Named("discovery"),
	}
	if err := c.setupSerf(); err != nil {
		return nil, err
	}
	return c, nil
}
The handler is the Coordinator that implements the Handler interface. The config comes from the server startup code and contains bind addresses, node identity, and peer addresses.
Step 5: Set up Serf
Serf initialization and configuration
setupSerf configures the Serf agent, starts the event handler goroutine, and optionally joins existing peers:
func (m *Membership) setupSerf() (err error) {
	// Split the bind address into IP and port for memberlist.
	addr, err := net.ResolveTCPAddr("tcp", m.BindAddr)
	if err != nil {
		return err
	}
	config := serf.DefaultConfig()
	config.Init()
	config.MemberlistConfig.BindAddr = addr.IP.String()
	config.MemberlistConfig.BindPort = addr.Port
	config.MemberlistConfig.LogOutput = io.Discard

	// Serf delivers membership events on this channel.
	m.events = make(chan serf.Event)
	config.EventCh = m.events

	rpcAddr, err := m.Config.RPCAddr()
	if err != nil {
		return err
	}
	// Tags are gossiped to every member along with membership.
	config.Tags = map[string]string{
		"rpc_addr":  rpcAddr,
		"raft_addr": m.RaftConfig.Address,
	}
	config.NodeName = m.NodeConfig.ID

	m.serf, err = serf.Create(config)
	if err != nil {
		return err
	}
	// Start consuming events before joining so none are missed.
	go m.eventHandler()

	if m.StartJoinAddrs != nil {
		_, err = m.serf.Join(m.StartJoinAddrs, true)
		if err != nil {
			// Not fatal: this node may be the first one in the cluster.
			m.logger.Warn("serf join failed", zap.Error(err), zap.Strings("addrs", m.StartJoinAddrs))
		}
	}
	return nil
}
Walk through step by step:
1. Resolve the bind address. m.BindAddr is a string like "127.0.0.1:7946". We split it into IP and port for Serf's memberlist configuration.
2. Create default config and initialize. serf.DefaultConfig() returns sensible defaults for gossip intervals, probe timeouts, etc. config.Init() initializes internal fields.
3. Suppress memberlist logging. Serf's memberlist is chatty. Routing its log output to io.Discard keeps our logs clean — we handle events ourselves.
4. Create the event channel. The channel is unbuffered (make(chan serf.Event)), so Serf blocks on each send until our event handler receives the event. That is acceptable only while handler.Join and handler.Leave return quickly; a slow handler delays delivery of every later event.
5. Compute the RPC address. m.Config.RPCAddr() builds the address from the advertise address (or bind address) plus the RPC port. Here is how RPCAddr() works in config/config.go:
func (c Config) RPCAddr() (string, error) {
	// Prefer the advertised host when one is configured.
	if c.AdvertiseAddr != "" {
		return fmt.Sprintf("%s:%d", c.AdvertiseAddr, c.NodeConfig.RPCPort), nil
	}
	// Otherwise reuse the host part of the gossip bind address.
	host, _, err := net.SplitHostPort(c.BindAddr)
	if err != nil {
		return "", err
	}
	return fmt.Sprintf("%s:%d", host, c.NodeConfig.RPCPort), nil
}
If an AdvertiseAddr is set (common in Docker where you bind to 0.0.0.0 but advertise the container hostname), it uses that. Otherwise it extracts the host from BindAddr.
6. Set tags. Tags are key-value metadata that Serf gossips to all members. We store two addresses:
- rpc_addr: where clients and replication threads connect (e.g., node1:9091)
- raft_addr: where Raft communicates (e.g., node1:9093)
7. Create Serf and start the event handler. serf.Create() starts the Serf agent. The event handler goroutine begins listening immediately.
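8. Join existing peers. If StartJoinAddrs is set (from the --peer flag), join those addresses. The second argument is Serf's ignoreOld flag: passing true tells Serf to ignore user events that were sent before this node joined instead of replaying them. Member join events for nodes already in the cluster are still delivered on the event channel. A failed join is not fatal, because this node might be the first one starting.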
Note the ordering: the event handler starts before the join. This ensures we don't miss any events from the join process itself.
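Steps 1 and 5 of the walkthrough can be tried without Serf. The sketch below copies RPCAddr onto a simplified Config stand-in (its field set is an assumption) and adds a hypothetical helper, gossipBind, that performs the same split setupSerf does for memberlist.

```go
package main

import (
	"fmt"
	"net"
)

// NodeConfig and Config are simplified stand-ins for the config package.
type NodeConfig struct {
	RPCPort int
}

type Config struct {
	BindAddr      string
	AdvertiseAddr string
	NodeConfig    NodeConfig
}

// RPCAddr follows the logic shown in config/config.go.
func (c Config) RPCAddr() (string, error) {
	if c.AdvertiseAddr != "" {
		return fmt.Sprintf("%s:%d", c.AdvertiseAddr, c.NodeConfig.RPCPort), nil
	}
	host, _, err := net.SplitHostPort(c.BindAddr)
	if err != nil {
		return "", err
	}
	return fmt.Sprintf("%s:%d", host, c.NodeConfig.RPCPort), nil
}

// gossipBind (hypothetical helper) splits a bind address into the IP string
// and port that memberlist expects.
func gossipBind(bindAddr string) (string, int, error) {
	addr, err := net.ResolveTCPAddr("tcp", bindAddr)
	if err != nil {
		return "", 0, err
	}
	return addr.IP.String(), addr.Port, nil
}

func main() {
	ip, port, err := gossipBind("127.0.0.1:7946")
	fmt.Println(ip, port, err)

	// Docker-style setup: bind to all interfaces, advertise a hostname.
	docker := Config{BindAddr: "0.0.0.0:7946", AdvertiseAddr: "node1", NodeConfig: NodeConfig{RPCPort: 9091}}
	fmt.Println(docker.RPCAddr())

	// No advertise address: fall back to the host part of BindAddr.
	local := Config{BindAddr: "127.0.0.1:7946", NodeConfig: NodeConfig{RPCPort: 9091}}
	fmt.Println(local.RPCAddr())
}
```

Note that formatting with "%s:%d" does not bracket IPv6 literals; if IPv6 hosts matter in your deployment, net.JoinHostPort is the standard-library call that handles them.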
Key configuration summary:
| Field | Purpose |
|---|---|
| NodeName | Unique identifier for this node (e.g., node1). Must be stable across restarts. |
| BindAddr / BindPort | Address for gossip traffic. All nodes must be reachable on this port. |
| Tags | Key-value metadata. We store rpc_addr and raft_addr. |
| EventCh | Channel where Serf pushes membership events. |
| LogOutput | Set to io.Discard to suppress memberlist's internal logging. |
Step 6: Implement the event handler
Event handler goroutine
The event handler goroutine reads from the event channel and dispatches join and leave events:
func (m *Membership) eventHandler() {
	for e := range m.events {
		switch e.EventType() {
		case serf.EventMemberJoin:
			for _, member := range e.(serf.MemberEvent).Members {
				if m.isLocal(member) {
					continue
				}
				m.handleJoin(member)
			}
		case serf.EventMemberLeave, serf.EventMemberFailed:
			for _, member := range e.(serf.MemberEvent).Members {
				if m.isLocal(member) {
					continue
				}
				m.handleLeave(member)
			}
		}
	}
}
A few things to notice:
- Switch on EventType(), not a type assertion. Serf events have different types, and we only care about member events.
- Type-assert to serf.MemberEvent to access the Members slice. Each event can contain multiple members (e.g., if several nodes joined simultaneously).
- Skip the local node. We don't want to add ourselves to our own Raft cluster or try to replicate to ourselves.
- Both EventMemberLeave and EventMemberFailed trigger the same handleLeave path. A graceful departure and a crash are handled the same way: remove the node and reassign its work.
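The dispatch pattern can be exercised without a running Serf agent. In the sketch below, memberEvent, the event type constants, dispatch, and replay are all hypothetical stand-ins for Serf's types; members are reduced to plain names.

```go
package main

import "fmt"

type eventType int

const (
	memberJoin eventType = iota
	memberLeave
	memberFailed
	userEvent // stands in for any event type the handler ignores
)

// memberEvent is a simplified stand-in for serf.MemberEvent.
type memberEvent struct {
	typ     eventType
	members []string
}

// dispatch drains the channel and returns the handler calls it would make,
// skipping the local node and ignoring non-member events.
func dispatch(local string, events <-chan memberEvent) []string {
	var calls []string
	for e := range events {
		switch e.typ {
		case memberJoin:
			for _, name := range e.members {
				if name == local {
					continue // keep processing the rest of the batch
				}
				calls = append(calls, "join "+name)
			}
		case memberLeave, memberFailed:
			for _, name := range e.members {
				if name == local {
					continue
				}
				calls = append(calls, "leave "+name)
			}
		}
	}
	return calls
}

// replay feeds events through an unbuffered channel, as Serf would.
func replay(local string, evs ...memberEvent) []string {
	ch := make(chan memberEvent)
	go func() {
		defer close(ch)
		for _, e := range evs {
			ch <- e
		}
	}()
	return dispatch(local, ch)
}

func main() {
	calls := replay("node1",
		memberEvent{typ: memberJoin, members: []string{"node1", "node2", "node3"}},
		memberEvent{typ: memberFailed, members: []string{"node1", "node2"}},
		memberEvent{typ: userEvent},
	)
	for _, c := range calls {
		fmt.Println(c)
	}
}
```

Using continue rather than return inside the inner loops matters: returning on the local member would end the goroutine and silently drop every later event.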
Handling join events
Extract the RPC and Raft addresses from the member's tags and call the handler. There is fallback logic: if one tag is missing, use the other:
func (m *Membership) handleJoin(member serf.Member) {
	raftAddr := member.Tags["raft_addr"]
	rpcAddr := member.Tags["rpc_addr"]
	// Fall back to whichever address is present.
	if raftAddr == "" {
		raftAddr = rpcAddr
	}
	if rpcAddr == "" {
		rpcAddr = raftAddr
	}
	err := m.handler.Join(member.Name, raftAddr, rpcAddr)
	if err != nil {
		m.logError(err, "failed to join", member)
	}
}
The fallback (if raftAddr == "" { raftAddr = rpcAddr }) is a safety net. In practice both tags are always set, but during development or misconfiguration, having one address is better than having none.
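Pulling the fallback into a pure function makes its edge cases easy to check. The helper name resolveAddrs is hypothetical; the logic is the same as in handleJoin.

```go
package main

import "fmt"

// resolveAddrs (hypothetical helper) applies the same fallback as handleJoin:
// if one tag is missing, the other address is used for both.
func resolveAddrs(tags map[string]string) (raftAddr, rpcAddr string) {
	raftAddr = tags["raft_addr"]
	rpcAddr = tags["rpc_addr"]
	if raftAddr == "" {
		raftAddr = rpcAddr
	}
	if rpcAddr == "" {
		rpcAddr = raftAddr
	}
	return raftAddr, rpcAddr
}

func main() {
	cases := []map[string]string{
		{"raft_addr": "node2:9093", "rpc_addr": "node2:9091"}, // both set
		{"rpc_addr": "node2:9091"},                            // raft tag missing
		{"raft_addr": "node2:9093"},                           // rpc tag missing
		{},                                                    // both missing
	}
	for _, tags := range cases {
		raft, rpc := resolveAddrs(tags)
		fmt.Printf("raft=%q rpc=%q\n", raft, rpc)
	}
}
```

The last case shows the limit of the safety net: when both tags are missing, both results are empty, so a Handler implementation may still want to reject empty addresses.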
Handling leave and failure events
func (m *Membership) handleLeave(member serf.Member) {
	if err := m.handler.Leave(member.Name); err != nil {
		m.logError(err, "failed to leave", member)
	}
}
Leave only needs the node ID. The coordinator uses it to remove the node from Raft and trigger topic leader reassignment.
Error logging with level awareness
Not every error deserves the same log level. If a non-leader node receives a join/leave event and tries to modify the Raft cluster, Raft returns raft.ErrNotLeader. This is expected — only the Raft leader can add or remove voters. The helper logs these at Info instead of Error:
func (m *Membership) logError(err error, msg string, member serf.Member) {
	log := m.logger.Error
	// Not being the leader is expected on followers, so log it at Info.
	if err == raft.ErrNotLeader {
		log = m.logger.Info
	}
	log(
		msg,
		zap.Error(err),
		zap.String("name", member.Name),
		zap.String("rpc_addr", member.Tags["rpc_addr"]),
	)
}
This is important because in a 3-node cluster, every node's Serf agent fires join/leave events, but only the Raft leader can act on them. Without this helper, non-leader nodes would flood the logs with errors that are not actually problems.
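One caveat with comparing by ==: the check only matches when the handler returns the sentinel error itself. If a Handler implementation wraps it with extra context, errors.Is still matches. The sketch below uses a local stand-in, errNotLeader, in place of raft.ErrNotLeader; levelFor, wrap, and errDiskFull are hypothetical names for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// errNotLeader is a local stand-in for raft.ErrNotLeader.
var errNotLeader = errors.New("node is not the leader")

// errDiskFull represents any error that is a real problem.
var errDiskFull = errors.New("disk full")

// levelFor picks the log level for an error. errors.Is walks the wrap chain,
// so a wrapped not-leader error is still treated as expected.
func levelFor(err error) string {
	if errors.Is(err, errNotLeader) {
		return "info"
	}
	return "error"
}

// wrap adds context the way a handler might before returning an error.
func wrap(err error) error {
	return fmt.Errorf("remove server: %w", err)
}

func main() {
	wrapped := wrap(errNotLeader)
	fmt.Println("direct comparison matches wrapped error:", wrapped == errNotLeader)
	fmt.Println("level for bare sentinel:", levelFor(errNotLeader))
	fmt.Println("level for wrapped sentinel:", levelFor(wrapped))
	fmt.Println("level for other error:", levelFor(errDiskFull))
}
```

Whether this matters depends on how the Coordinator returns errors; if it always returns the Raft error unmodified, the == comparison in logError behaves the same.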
Checking if member is local
func (m *Membership) isLocal(member serf.Member) bool {
	return m.serf.LocalMember().Name == member.Name
}
Step 7: Query the member list
Getting current members
Other parts of the system (e.g., the topic manager when assigning replicas) can query the current members:
func (m *Membership) Members() []serf.Member {
	return m.serf.Members()
}
This returns all members, including those in StatusLeft or StatusFailed state. Callers filter by status if they need only alive nodes. This is intentional — the topic manager may need to know about recently-failed nodes to clean up their assignments.
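A caller that needs both views, live nodes for placement and departed nodes for cleanup, might split the list in one pass. This is a sketch with stand-in types: member, memberStatus, and partition are hypothetical and only mirror the shape of serf.Member and its status values.

```go
package main

import "fmt"

// memberStatus is a simplified stand-in for serf.MemberStatus.
type memberStatus int

const (
	statusAlive memberStatus = iota
	statusLeaving
	statusLeft
	statusFailed
)

// member is a simplified stand-in for serf.Member.
type member struct {
	Name   string
	Status memberStatus
}

// partition splits members into those that can take work (alive) and those
// whose assignments need cleanup (left or failed). Members that are still
// leaving appear in neither list.
func partition(members []member) (alive, gone []string) {
	for _, m := range members {
		switch m.Status {
		case statusAlive:
			alive = append(alive, m.Name)
		case statusLeft, statusFailed:
			gone = append(gone, m.Name)
		}
	}
	return alive, gone
}

func main() {
	alive, gone := partition([]member{
		{Name: "node1", Status: statusAlive},
		{Name: "node2", Status: statusFailed},
		{Name: "node3", Status: statusAlive},
		{Name: "node4", Status: statusLeft},
	})
	fmt.Println("alive:", alive)
	fmt.Println("needs cleanup:", gone)
}
```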
Step 8: Implement graceful shutdown
Leaving the cluster cleanly
When the node shuts down, leave the cluster gracefully so other nodes are notified immediately (instead of waiting for Serf's failure detection timeout, which can take several seconds):
func (m *Membership) Leave() error {
	return m.serf.Leave()
}
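How Leave gets called is up to the server's startup code, which this section does not show. One possible wiring, sketched under that assumption, waits for an interrupt or termination signal and then leaves. The leaver interface, leaveWhenDone, and fakeMembership are hypothetical names; the timeout exists only so the demo exits without a real signal.

```go
package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"
)

// leaver is the one-method subset of *Membership this sketch depends on.
type leaver interface {
	Leave() error
}

// leaveWhenDone blocks until done is closed, then leaves the cluster so
// peers are told right away instead of detecting a failure later.
func leaveWhenDone(done <-chan struct{}, m leaver) error {
	<-done
	return m.Leave()
}

// fakeMembership stands in for *discovery.Membership in this demo.
type fakeMembership struct {
	left bool
}

func (f *fakeMembership) Leave() error {
	f.left = true
	return nil
}

func main() {
	// Demo only: the timeout lets the sketch finish on its own. A real
	// server would use context.Background() as the parent.
	parent, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
	defer cancel()

	// ctx is cancelled when SIGINT or SIGTERM arrives (or the parent ends).
	ctx, stop := signal.NotifyContext(parent, os.Interrupt, syscall.SIGTERM)
	defer stop()

	m := &fakeMembership{}
	if err := leaveWhenDone(ctx.Done(), m); err != nil {
		fmt.Println("leave failed:", err)
		return
	}
	fmt.Println("left cluster:", m.left)
}
```

Taking a plain channel rather than a context keeps leaveWhenDone easy to drive from a test: close the channel and assert that Leave was called.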
Step 9: Understand the integration flow
How Serf integrates with the rest of the system
Node starts
  │
  ├── Create Serf with tags (rpc_addr, raft_addr)
  │
  ├── Start event handler goroutine
  │
  ├── Join existing peers (if any)
  │
  └── Event handler processes events
        │
        ├── Member joins → handler.Join(id, raftAddr, rpcAddr)
        │     ├── Coordinator adds node to Raft cluster
        │     └── TopicManager records node metadata
        │
        └── Member leaves/fails → handler.Leave(id)
              ├── Coordinator removes from Raft
              └── TopicManager reassigns leaders
Startup sequence:
- The first node starts with no peers. Serf creates a one-node cluster.
- The second node starts with --start-join-addr node1:gossip_port. Serf gossips the join.
- Node 1's event handler fires Join(node2, raftAddr, rpcAddr). The coordinator (if Raft leader) adds node 2 to the Raft cluster.
- The third node joins similarly. Now all three nodes know about each other.
Failure handling:
- Node 2 crashes. Serf detects it via failed probes (within seconds).
- Node 1 and node 3's event handlers fire Leave(node2).
- The coordinator removes node 2 from Raft and the topic manager reassigns any topics that node 2 was leading.
Step 10: Review the complete implementation
Complete discovery.go file
Here is discovery/discovery.go in full:
package discovery

import (
	"io"
	"net"

	"github.com/hashicorp/raft"
	"github.com/hashicorp/serf/serf"
	"github.com/mohitkumar/mlog/config"
	"go.uber.org/zap"
)

type Membership struct {
	config.Config
	handler Handler
	serf    *serf.Serf
	events  chan serf.Event
	logger  *zap.Logger
}

func New(handler Handler, config config.Config) (*Membership, error) {
	c := &Membership{
		Config:  config,
		handler: handler,
		logger:  zap.L().Named("discovery"),
	}
	if err := c.setupSerf(); err != nil {
		return nil, err
	}
	return c, nil
}

func (m *Membership) setupSerf() (err error) {
	addr, err := net.ResolveTCPAddr("tcp", m.BindAddr)
	if err != nil {
		return err
	}
	config := serf.DefaultConfig()
	config.Init()
	config.MemberlistConfig.BindAddr = addr.IP.String()
	config.MemberlistConfig.BindPort = addr.Port
	config.MemberlistConfig.LogOutput = io.Discard
	m.events = make(chan serf.Event)
	config.EventCh = m.events
	rpcAddr, err := m.Config.RPCAddr()
	if err != nil {
		return err
	}
	config.Tags = map[string]string{
		"rpc_addr":  rpcAddr,
		"raft_addr": m.RaftConfig.Address,
	}
	config.NodeName = m.NodeConfig.ID
	m.serf, err = serf.Create(config)
	if err != nil {
		return err
	}
	// Start consuming events before joining so none are missed.
	go m.eventHandler()
	if m.StartJoinAddrs != nil {
		_, err = m.serf.Join(m.StartJoinAddrs, true)
		if err != nil {
			m.logger.Warn("serf join failed", zap.Error(err), zap.Strings("addrs", m.StartJoinAddrs))
		}
	}
	return nil
}

type Handler interface {
	Join(id, raftAddr, rpcAddr string) error
	Leave(name string) error
}

func (m *Membership) eventHandler() {
	for e := range m.events {
		switch e.EventType() {
		case serf.EventMemberJoin:
			for _, member := range e.(serf.MemberEvent).Members {
				if m.isLocal(member) {
					continue
				}
				m.handleJoin(member)
			}
		case serf.EventMemberLeave, serf.EventMemberFailed:
			for _, member := range e.(serf.MemberEvent).Members {
				if m.isLocal(member) {
					continue
				}
				m.handleLeave(member)
			}
		}
	}
}

func (m *Membership) handleJoin(member serf.Member) {
	raftAddr := member.Tags["raft_addr"]
	rpcAddr := member.Tags["rpc_addr"]
	if raftAddr == "" {
		raftAddr = rpcAddr
	}
	if rpcAddr == "" {
		rpcAddr = raftAddr
	}
	err := m.handler.Join(member.Name, raftAddr, rpcAddr)
	if err != nil {
		m.logError(err, "failed to join", member)
	}
}

func (m *Membership) handleLeave(member serf.Member) {
	if err := m.handler.Leave(member.Name); err != nil {
		m.logError(err, "failed to leave", member)
	}
}

func (m *Membership) isLocal(member serf.Member) bool {
	return m.serf.LocalMember().Name == member.Name
}

func (m *Membership) Members() []serf.Member {
	return m.serf.Members()
}

// AliveMembers returns the names of Serf members currently in the Alive state.
func (m *Membership) AliveMembers() []string {
	var names []string
	for _, member := range m.serf.Members() {
		if member.Status == serf.StatusAlive {
			names = append(names, member.Name)
		}
	}
	return names
}

// NodeInfo holds identity and address information for a cluster member.
type NodeInfo struct {
	Name     string
	RaftAddr string
	RpcAddr  string
}

// AliveNodeDetails returns identity and address info for Serf members currently alive.
func (m *Membership) AliveNodeDetails() []NodeInfo {
	var nodes []NodeInfo
	for _, member := range m.serf.Members() {
		if member.Status == serf.StatusAlive {
			nodes = append(nodes, NodeInfo{
				Name:     member.Name,
				RaftAddr: member.Tags["raft_addr"],
				RpcAddr:  member.Tags["rpc_addr"],
			})
		}
	}
	return nodes
}

func (m *Membership) Leave() error {
	return m.serf.Leave()
}

func (m *Membership) logError(err error, msg string, member serf.Member) {
	log := m.logger.Error
	if err == raft.ErrNotLeader {
		log = m.logger.Info
	}
	log(
		msg,
		zap.Error(err),
		zap.String("name", member.Name),
		zap.String("rpc_addr", member.Tags["rpc_addr"]),
	)
}
Summary
| Component | Purpose |
|---|---|
| Serf | Gossip-based cluster membership. No central coordinator. |
| Tags | Per-node metadata (rpc_addr, raft_addr) spread with membership. |
| Event handler | Goroutine that processes join/leave/fail events and calls the Handler interface. |
| Handler interface | Join(id, raftAddr, rpcAddr) and Leave(name) — implemented by the Coordinator. |
| Members() | Returns all Serf members (callers filter by status). |
| logError | Downgrades raft.ErrNotLeader to Info — only the Raft leader can modify cluster membership. |
With discovery in place, the next page integrates Raft consensus — so the cluster can agree on metadata like topic assignments and leader elections.