Building the Low-Level Log — Segments, Sparse Index, and Storage in Go

In this section, you'll build the entire storage layer from the bottom up. You'll implement byte-level record encoding, a segment (append-only log file + sparse index), the index with memory-mapped I/O, a Log that manages multiple segments, and a LogManager that tracks LEO and the high watermark. All code lives in the segment/ and log/ packages.

Step 1: Understand the record format

Record format

Each record on disk has a fixed-size header followed by the value:

┌────────────┬────────────┬──────────────────────┐
│  Offset    │  Length    │  Value               │
│  8 bytes   │  4 bytes   │  Length bytes        │
│  (uint64)  │  (uint32)  │  (payload)           │
└────────────┴────────────┴──────────────────────┘
  • Offset: 8 bytes, big-endian uint64. The logical offset of this record.
  • Length: 4 bytes, big-endian uint32. The byte length of the value.
  • Value: Length bytes. The actual payload (opaque bytes).

Total size per record = 12 + len(value). Define the constants:

// segment/segment.go
package segment

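import (
    // Imports for all of segment.go, assuming every Segment method below lives in this file.
    "bufio"
    "bytes"
    "encoding/binary"
    "fmt"
    "io"
    "os"
    "path/filepath"
    "sync"
    "sync/atomic"

    "github.com/mohitkumar/mlog/errs"
)

const (
    IndexIntervalBytes = 4 * 1024    // write a sparse index entry roughly every 4KB of log data
    MaxSegmentBytes    = 1024 * 1024 // roll to a new segment once the .log file reaches 1MB

    lenWidth         = 4 // 4 bytes for the value length
    offWidth         = 8 // 8 bytes for the record offset
    totalHeaderWidth = lenWidth + offWidth
)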

var endian = binary.BigEndian

Why big-endian? Convention. Kafka uses big-endian for its wire protocol. Picking one endianness and using it consistently is what matters.
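To make the layout concrete, here is a small standalone sketch that encodes one record and decodes it again with encoding/binary. encodeRecord and decodeRecord are hypothetical helpers for illustration, not part of the segment package; the constants mirror the ones above.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// Header layout: [Offset 8][Length 4], both big-endian.
const (
	offWidth         = 8
	lenWidth         = 4
	totalHeaderWidth = offWidth + lenWidth
)

// encodeRecord (hypothetical) lays out one record as [Offset][Length][Value].
func encodeRecord(offset uint64, value []byte) []byte {
	buf := make([]byte, totalHeaderWidth+len(value))
	binary.BigEndian.PutUint64(buf[:offWidth], offset)
	binary.BigEndian.PutUint32(buf[offWidth:totalHeaderWidth], uint32(len(value)))
	copy(buf[totalHeaderWidth:], value)
	return buf
}

// decodeRecord (hypothetical) is the inverse. It also returns n, the total bytes
// the record occupies, so a caller can step to the next record.
func decodeRecord(buf []byte) (offset uint64, value []byte, n int, err error) {
	if len(buf) < totalHeaderWidth {
		return 0, nil, 0, errors.New("short header")
	}
	offset = binary.BigEndian.Uint64(buf[:offWidth])
	length := int(binary.BigEndian.Uint32(buf[offWidth:totalHeaderWidth]))
	n = totalHeaderWidth + length
	if len(buf) < n {
		return 0, nil, 0, errors.New("short value")
	}
	return offset, buf[totalHeaderWidth:n], n, nil
}

func main() {
	rec := encodeRecord(42, []byte("hello"))
	fmt.Printf("% x\n", rec[:totalHeaderWidth]) // 00 00 00 00 00 00 00 2a 00 00 00 05
	off, val, n, err := decodeRecord(rec)
	fmt.Println(off, string(val), n, err) // 42 hello 17 <nil>
}
```

The 17 bytes match the formula above: 12 header bytes plus a 5-byte value.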

Step 2: Create the Segment struct

Segment struct overview

A segment owns one contiguous range of offsets. On disk it has two files:

  • {baseOffset}.log — the append-only record data.
  • {baseOffset}.idx — the sparse index.
// segment/segment.go

type Segment struct {
    BaseOffset          uint64       // base offset of the segment
    NextOffset          uint64       // next offset to be assigned
    MaxOffset           uint64       // max offset of the segment
    logFile             *os.File
    index               *Index
    bytesSinceLastIndex uint64
    writePos            atomic.Int64 // current end-of-log position in bytes
    mu                  sync.Mutex   // protects writes (NextOffset, MaxOffset, bytesSinceLastIndex, index)
}

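Notice that writePos is an atomic.Int64. This lets Read load the current end-of-log position without taking the mutex; only writers need the lock. Read does still look at NextOffset and the index without locking, so a bare Segment should not be read and appended to concurrently. The Log type later in this section serializes the two with its RWMutex.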

File naming convention

File naming uses zero-padded 20-digit offsets so segments sort lexicographically:

func formatLogFileName(baseOffset uint64) string {
    return fmt.Sprintf("%020d.log", baseOffset)
}

func formatIndexFileName(baseOffset uint64) string {
    return fmt.Sprintf("%020d.idx", baseOffset)
}

Step 3: Create a new segment

NewSegment function

NewSegment creates fresh .log and .idx files for a given base offset:

func NewSegment(baseOffset uint64, dir string) (*Segment, error) {
    logFilePath := filepath.Join(dir, formatLogFileName(baseOffset))
    logFile, err := os.OpenFile(logFilePath, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0644)
    if err != nil {
        return nil, err
    }
    indexFilePath := filepath.Join(dir, formatIndexFileName(baseOffset))
    index, err := OpenIndex(indexFilePath)
    if err != nil {
        logFile.Close()
        return nil, err
    }
    return &Segment{
        BaseOffset: baseOffset,
        NextOffset: baseOffset,
        logFile:    logFile,
        index:      index,
    }, nil
}

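Helper function for writing

There is no buffered writer. Writes go directly to the OS file through a writeFull helper that loops until the whole buffer has been written. os.File.Write already returns a non-nil error whenever it writes fewer than len(b) bytes, so the loop is a defensive guard rather than a strict requirement: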

// writeFull writes the entire buffer to f, retrying on short writes.
func writeFull(f *os.File, buf []byte) (int, error) {
    total := 0
    for len(buf) > 0 {
        n, err := f.Write(buf)
        if err != nil {
            return total, err
        }
        total += n
        buf = buf[n:]
    }
    return total, nil
}

Step 4: Implement append (encoding and writing)

Single record append

To append a record:

  1. Assign the next offset.
  2. Build a single buffer: [Offset 8 bytes][Length 4 bytes][Value].
  3. Write it in one syscall via writeFull.
  4. Every 4 KB (or for the first record), write a sparse index entry.
  5. Advance writePos atomically, then NextOffset and MaxOffset (both guarded by the mutex).
func (s *Segment) Append(value []byte) (uint64, error) {
    s.mu.Lock()
    defer s.mu.Unlock()

    offset := s.NextOffset
    buf := make([]byte, totalHeaderWidth+len(value))
    endian.PutUint64(buf[0:offWidth], offset)
    endian.PutUint32(buf[offWidth:totalHeaderWidth], uint32(len(value)))
    copy(buf[totalHeaderWidth:], value)
    if _, err := writeFull(s.logFile, buf); err != nil {
        return 0, err
    }

    currWritePos := s.writePos.Load()
    s.bytesSinceLastIndex += uint64(totalHeaderWidth + len(value))
    if s.bytesSinceLastIndex >= IndexIntervalBytes || offset == s.BaseOffset {
        if err := s.index.Write(uint32(offset-s.BaseOffset), uint64(currWritePos)); err != nil {
            return 0, err
        }
        s.bytesSinceLastIndex = 0
    }
    s.writePos.Add(int64(totalHeaderWidth + len(value)))
    s.NextOffset++
    s.MaxOffset = offset
    return offset, nil
}

The index stores relative offsets (offset minus base offset) as a uint32 rather than a uint64, which saves 4 bytes per entry. The position is the byte offset in the .log file where the record starts.
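The indexing rule is easy to misjudge, so this sketch replays it outside the segment. Given a list of value sizes, it reports which records would receive an index entry and at which byte position. sparseEntries and indexEntry are hypothetical names. The rule itself is the one Append uses above: index the first record, then index a record once the bytes accumulated since the last entry (including that record) reach 4 KB.

```go
package main

import "fmt"

const (
	totalHeaderWidth   = 12       // [Offset 8][Length 4]
	indexIntervalBytes = 4 * 1024 // same 4 KB interval as IndexIntervalBytes
)

// indexEntry mirrors one sparse index entry: relative offset + byte position.
type indexEntry struct {
	RelOffset uint32
	Position  uint64
}

// sparseEntries (hypothetical) replays Append's indexing decision for a
// segment whose base offset is 0, so relative offset == record number.
func sparseEntries(valueSizes []int) []indexEntry {
	var entries []indexEntry
	var pos, sinceLast uint64
	for i, size := range valueSizes {
		recordSize := uint64(totalHeaderWidth + size)
		sinceLast += recordSize
		if sinceLast >= indexIntervalBytes || i == 0 {
			// The entry points at where this record starts in the .log file.
			entries = append(entries, indexEntry{RelOffset: uint32(i), Position: pos})
			sinceLast = 0
		}
		pos += recordSize
	}
	return entries
}

func main() {
	// 100 records with 1000-byte values, so each record occupies 1012 bytes.
	sizes := make([]int, 100)
	for i := range sizes {
		sizes[i] = 1000
	}
	entries := sparseEntries(sizes)
	fmt.Println(len(entries)) // 20
	fmt.Println(entries[:3])  // [{0 0} {5 5060} {10 10120}]
}
```

With 1012-byte records an entry lands on every fifth record, so a lookup starts at most a handful of records before its target.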

Batch append for performance

For replication and high-throughput producers, AppendBatch writes multiple records in a single syscall:

func (s *Segment) AppendBatch(values [][]byte) (uint64, error) {
    s.mu.Lock()
    defer s.mu.Unlock()

    if len(values) == 0 {
        return s.NextOffset, nil
    }

    baseOffset := s.NextOffset

    // Calculate total buffer size
    totalSize := 0
    for _, v := range values {
        totalSize += totalHeaderWidth + len(v)
    }

    // Build combined buffer — one allocation, one syscall
    buf := make([]byte, totalSize)
    pos := 0
    for i, value := range values {
        offset := baseOffset + uint64(i)
        endian.PutUint64(buf[pos:pos+offWidth], offset)
        endian.PutUint32(buf[pos+offWidth:pos+totalHeaderWidth], uint32(len(value)))
        copy(buf[pos+totalHeaderWidth:], value)
        pos += totalHeaderWidth + len(value)
    }

    // Single write syscall for the entire batch
    if _, err := writeFull(s.logFile, buf); err != nil {
        return 0, err
    }

    // Update index entries
    currWritePos := s.writePos.Load()
    pos = 0
    for i, value := range values {
        offset := baseOffset + uint64(i)
        recordSize := totalHeaderWidth + len(value)
        s.bytesSinceLastIndex += uint64(recordSize)
        if s.bytesSinceLastIndex >= IndexIntervalBytes || offset == s.BaseOffset {
            if err := s.index.Write(uint32(offset-s.BaseOffset), uint64(currWritePos+int64(pos))); err != nil {
                return 0, err
            }
            s.bytesSinceLastIndex = 0
        }
        pos += recordSize
    }

    s.writePos.Add(int64(totalSize))
    count := uint64(len(values))
    s.NextOffset = baseOffset + count
    s.MaxOffset = baseOffset + count - 1
    return baseOffset, nil
}

This matters for replication throughput: writing 5000 records as one buffer costs a single write syscall instead of 5000.

Step 5: Build the sparse index

Index entry format

Each index entry is 12 bytes:

┌────────────────────┬────────────────────┐
│  Relative Offset   │  Byte Position     │
│  4 bytes (uint32)  │  8 bytes (uint64)  │
└────────────────────┴────────────────────┘

Index struct with mmap

The index file is memory-mapped so reads are fast (no syscalls — just pointer dereferences into the kernel page cache):

Index struct implementation

// segment/index.go
package segment

import (
    "encoding/binary"
    "os"

    "github.com/tysonmote/gommap"
)

const (
    IndexEntrySize = 4 + 8     // relOffset (4) + position (8) = 12 bytes
    IndexSize      = 12 * 1024 // 12KB initial size (1024 entries)
)

var indexEndian = binary.BigEndian

type IndexEntry struct {
    RelativeOffset uint32
    Position       uint64
}

type Index struct {
    file *os.File
    mmap gommap.MMap
    size int64 // current number of bytes written
}


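Opening an index file

OpenIndex makes sure the file has at least IndexSize bytes of preallocated space and memory-maps it. A cleanly closed index was truncated to exactly its entries (see Close below), so its file size tells you how many bytes are in use. After a crash, however, the file still carries its zero-filled preallocated tail. Only the first entry can legitimately have position 0, so trailing zero-position entries are trimmed:

func OpenIndex(filePath string) (*Index, error) {
    file, err := os.OpenFile(filePath, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0644)
    if err != nil {
        return nil, err
    }

    stat, err := file.Stat()
    if err != nil {
        file.Close()
        return nil, err
    }
    // Bytes in use, rounded down to whole entries (exact after a clean Close).
    initialSize := stat.Size() / IndexEntrySize * IndexEntrySize
    // Preallocate room to map; a zero-length file cannot be memory-mapped.
    if stat.Size() < IndexSize {
        if err := file.Truncate(IndexSize); err != nil {
            file.Close()
            return nil, err
        }
    }
    m, err := gommap.Map(file.Fd(), gommap.PROT_READ|gommap.PROT_WRITE, gommap.MAP_SHARED)
    if err != nil {
        file.Close()
        return nil, err
    }

    idx := &Index{
        file: file,
        mmap: m,
        size: initialSize,
    }
    // After a crash the zero-filled tail was never truncated away. Drop
    // zero-position entries after the first one so Last and Find see real data.
    for idx.size > IndexEntrySize && idx.Entry(idx.size/IndexEntrySize-1).Position == 0 {
        idx.size -= IndexEntrySize
    }
    return idx, nil
}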

Writing an index entry

Write at the current size offset, then advance:

func (idx *Index) Write(relOffset uint32, position uint64) error {
    if idx.size+IndexEntrySize > int64(len(idx.mmap)) {
        if err := idx.grow(); err != nil {
            return err
        }
    }

    buf := idx.mmap[idx.size : idx.size+IndexEntrySize]
    indexEndian.PutUint32(buf[0:4], relOffset)
    indexEndian.PutUint64(buf[4:12], position)

    idx.size += IndexEntrySize
    return nil
}

Growing the index dynamically

The grow() method uses UnsafeUnmap(), doubles the file size with Truncate, and re-maps:

func (idx *Index) grow() error {
    newSize := int64(len(idx.mmap)) * 2

    if err := idx.mmap.UnsafeUnmap(); err != nil {
        return err
    }
    if err := idx.file.Truncate(newSize); err != nil {
        return err
    }
    m, err := gommap.Map(idx.file.Fd(), gommap.PROT_READ|gommap.PROT_WRITE, gommap.MAP_SHARED)
    if err != nil {
        return err
    }
    idx.mmap = m
    return nil
}

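This rarely runs. 12 KB holds 1024 entries, while a 1 MB segment with one entry per 4 KB needs only about 256. Growth mainly matters when a large batch pushes a segment well past MaxSegmentBytes, or when IndexIntervalBytes is set smaller.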

Reading a single index entry

func (idx *Index) Entry(i int64) IndexEntry {
    pos := i * IndexEntrySize
    buf := idx.mmap[pos : pos+IndexEntrySize]

    return IndexEntry{
        RelativeOffset: indexEndian.Uint32(buf[0:4]),
        Position:       indexEndian.Uint64(buf[4:12]),
    }
}

Binary search for efficient lookups

Find the largest index entry whose relative offset is <= the target. This gives the closest starting position for a forward scan:

func (idx *Index) Find(relOffset uint32) (IndexEntry, bool) {
    count := idx.size / IndexEntrySize
    if count == 0 {
        return IndexEntry{}, false
    }

    var result IndexEntry
    low, high := int64(0), count-1

    for low <= high {
        mid := (low + high) / 2
        entry := idx.Entry(mid)

        if entry.RelativeOffset <= relOffset {
            result = entry
            low = mid + 1
        } else {
            high = mid - 1
        }
    }
    return result, true
}
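The same floor lookup can be written with the standard library's sort.Search over a slice, which is a handy way to test Find's logic in isolation. This sketch works on an in-memory []IndexEntry rather than the mmap, and floorEntry is a hypothetical name. One difference from Find: when every entry is greater than the target it returns false, whereas Find returns a zero entry with true. That case cannot arise in practice because the first entry always has relative offset 0.

```go
package main

import (
	"fmt"
	"sort"
)

type IndexEntry struct {
	RelativeOffset uint32
	Position       uint64
}

// floorEntry returns the last entry whose RelativeOffset <= rel.
// entries must be sorted by RelativeOffset, as an append-only index is.
func floorEntry(entries []IndexEntry, rel uint32) (IndexEntry, bool) {
	// i is the first entry strictly greater than rel; the floor sits just before it.
	i := sort.Search(len(entries), func(i int) bool {
		return entries[i].RelativeOffset > rel
	})
	if i == 0 {
		return IndexEntry{}, false // no entries, or all of them are past rel
	}
	return entries[i-1], true
}

func main() {
	entries := []IndexEntry{{0, 0}, {5, 5060}, {10, 10120}}
	for _, rel := range []uint32{0, 7, 10, 99} {
		e, ok := floorEntry(entries, rel)
		fmt.Println(rel, e, ok)
	}
	// 0 {0 0} true
	// 7 {5 5060} true
	// 10 {10 10120} true
	// 99 {10 10120} true
}
```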

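Index helper methods

Last returns the final entry, which Recover uses as its scan checkpoint. TruncateAfter drops every entry whose position lies beyond a given byte position, which Recover uses after cutting off a corrupt tail: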

func (idx *Index) Last() (IndexEntry, bool) {
    count := idx.size / IndexEntrySize
    if count == 0 {
        return IndexEntry{}, false
    }
    return idx.Entry(count - 1), true
}

func (idx *Index) TruncateAfter(position uint64) error {
    var truncateSize int64 = idx.size
    count := idx.size / IndexEntrySize

    for i := count - 1; i >= 0; i-- {
        entry := idx.Entry(i)
        if entry.Position <= position {
            break
        }
        truncateSize -= IndexEntrySize
    }

    if truncateSize == idx.size {
        return nil
    }
    idx.size = truncateSize
    return nil
}

Closing the index and syncing to disk

On close, sync the mmap, sync the file, unmap, truncate to the actual size (removing unused pre-allocated space), and close:

func (idx *Index) Close() error {
    if err := idx.mmap.Sync(gommap.MS_SYNC); err != nil {
        return err
    }
    if err := idx.file.Sync(); err != nil {
        return err
    }
    if err := idx.mmap.UnsafeUnmap(); err != nil {
        return err
    }
    if err := idx.file.Truncate(int64(idx.size)); err != nil {
        return err
    }
    return idx.file.Close()
}

Step 6: Implement read with bulk-loading and scanning

Read strategy with index lookups

To read a record by offset:

  1. Check the offset is in range [BaseOffset, NextOffset).
  2. Compute the relative offset: relOffset = offset - BaseOffset.
  3. Binary search the index for the starting position.
  4. Bulk-read the region from that position to writePos into memory (one syscall).
  5. Scan forward in memory until you find the target offset.
func (s *Segment) Read(offset uint64) ([]byte, error) {
    writePos := s.writePos.Load()

    if offset < s.BaseOffset || offset >= s.NextOffset {
        return nil, errs.ErrSegmentOffsetOutOfRange(offset, s.BaseOffset, s.NextOffset)
    }

    // Use sparse index to find starting position (floor entry <= offset)
    var startPos int64
    relOffset := uint32(offset - s.BaseOffset)
    if indexEntry, found := s.index.Find(relOffset); found {
        startPos = int64(indexEntry.Position)
    }

    // Bulk read the region from startPos to writePos into memory — one syscall
    regionSize := writePos - startPos
    region := make([]byte, regionSize)
    n, err := s.logFile.ReadAt(region, startPos)
    if err != nil && err != io.EOF {
        return nil, err
    }
    region = region[:n]

    // Scan in-memory buffer — no further syscalls
    pos := 0
    for pos+totalHeaderWidth <= len(region) {
        foundOffset := endian.Uint64(region[pos : pos+offWidth])
        msgLen := int(endian.Uint32(region[pos+offWidth : pos+totalHeaderWidth]))

        if foundOffset == offset {
            end := pos + totalHeaderWidth + msgLen
            if end > len(region) {
                return nil, io.ErrUnexpectedEOF
            }
            value := make([]byte, offWidth+msgLen)
            endian.PutUint64(value[0:offWidth], foundOffset)
            copy(value[offWidth:], region[pos+totalHeaderWidth:end])
            return value, nil
        }
        if foundOffset > offset {
            return nil, errs.ErrSegmentOffsetNotFound
        }
        pos += totalHeaderWidth + msgLen
    }
    return nil, io.EOF
}

Notice that Read returns [Offset 8 bytes][Value] — not just the raw value. The 8-byte offset prefix is used by the Raft log store and the replication streaming reader.

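Why bulk-read? Reading each record header individually would cost one ReadAt syscall per record, and there can be dozens (or, with tiny records, hundreds) of records between two index entries. Instead, Read pulls the whole region from the index entry to writePos in a single syscall and scans it in memory. The trade-off is that the region always runs to writePos, so reading an early offset in a full segment copies up to MaxSegmentBytes. Ending the read at the next index entry's position would cap it at roughly one index interval.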

Step 7: Add segment utility methods

Checking whether a segment is full, and closing it

func (s *Segment) IsFull() bool {
    return s.writePos.Load() >= MaxSegmentBytes
}

func (s *Segment) Close() error {
    s.mu.Lock()
    defer s.mu.Unlock()

    if err := s.logFile.Close(); err != nil {
        return err
    }
    if err := s.index.Close(); err != nil {
        return err
    }
    return nil
}

Step 8: Implement streaming reader for replication

Streaming read concept

For replication, you need to read a contiguous range of records efficiently. The segment provides two reader methods:

// Reader returns an io.Reader that streams all records from BaseOffset.
func (s *Segment) Reader() io.Reader {
    if s.BaseOffset >= s.NextOffset {
        return bytes.NewReader(nil)
    }
    r, err := s.NewStreamingReader(s.BaseOffset)
    if err != nil {
        return bytes.NewReader(nil)
    }
    return r
}

// NewStreamingReader returns an io.Reader starting at the physical position
// corresponding to startOffset. It reads until the current end-of-segment.
func (s *Segment) NewStreamingReader(startOffset uint64) (io.Reader, error) {
    currentWritePos := s.writePos.Load()

    if startOffset < s.BaseOffset || startOffset >= s.NextOffset {
        return nil, errs.ErrSegmentOffsetOutOfRangeSimple(startOffset)
    }

    // Use the index to find the starting physical position
    relOffset := uint32(startOffset - s.BaseOffset)
    indexEntry, ok := s.index.Find(relOffset)
    if !ok {
        return nil, errs.ErrSegmentIndexNotFound
    }

    // Calculate how many bytes are available to read from that position
    size := currentWritePos - int64(indexEntry.Position)

    // SectionReader acts like a private io.Reader for this consumer
    section := io.NewSectionReader(s.logFile, int64(indexEntry.Position), size)

    return s.catchUp(section, startOffset)
}

CatchUp helper for skipping to target offset

The catchUp helper skips records between the index entry and the target offset:

func (s *Segment) catchUp(r io.Reader, target uint64) (io.Reader, error) {
    for {
        header := make([]byte, 12)
        if _, err := io.ReadFull(r, header); err != nil {
            return nil, err
        }

        offset := binary.BigEndian.Uint64(header[0:8])
        msgLen := binary.BigEndian.Uint32(header[8:12])

        if offset >= target {
            // Found the start — join the header back with the rest of the stream
            return io.MultiReader(bytes.NewReader(header), r), nil
        }

        // Not there yet, skip the payload
        if _, err := io.CopyN(io.Discard, r, int64(msgLen)); err != nil {
            return nil, err
        }
    }
}

Step 9: Implement segment recovery on restart

Recovery process overview

When the process restarts, it must recover existing segments. LoadExistingSegment opens the .log and .idx files and calls Recover to discover the true NextOffset and writePos:

func LoadExistingSegment(baseOffset uint64, dir string) (*Segment, error) {
    logFilePath := filepath.Join(dir, formatLogFileName(baseOffset))
    logFile, err := os.OpenFile(logFilePath, os.O_RDWR|os.O_APPEND, 0644)
    if err != nil {
        return nil, err
    }

    indexFilePath := filepath.Join(dir, formatIndexFileName(baseOffset))
    index, err := OpenIndex(indexFilePath)
    if err != nil {
        logFile.Close()
        return nil, err
    }
    segment := &Segment{
        BaseOffset: baseOffset,
        NextOffset: baseOffset,
        logFile:    logFile,
        index:      index,
    }
    if err := segment.Recover(); err != nil {
        logFile.Close()
        index.Close()
        return nil, err
    }
    return segment, nil
}

Recover method with validation and truncation

The Recover method starts from the last index entry (a known record boundary), scans forward with a buffered reader, checks that each record carries the next expected offset and is fully present, and truncates the file at the end of the last complete record:

func (s *Segment) Recover() error {
    var (
        startPos   int64 = 0
        nextOffset       = s.BaseOffset
    )

    // 1. Start from the last healthy index entry
    lastEntry, ok := s.index.Last()
    if ok {
        startPos = int64(lastEntry.Position)
        nextOffset = s.BaseOffset + uint64(lastEntry.RelativeOffset)
    }

    // 2. Seek to the checkpoint
    if _, err := s.logFile.Seek(startPos, io.SeekStart); err != nil {
        return errs.ErrSeekFailed(err)
    }

    // Buffered reader for recovery — avoids thousands of small read syscalls
    reader := bufio.NewReader(s.logFile)
    currPos := startPos
    currOffset := nextOffset

    for {
        // Read Header: [Offset: 8][Len: 4]
        header, err := reader.Peek(12)
        if err != nil {
            if err == io.EOF || err == io.ErrUnexpectedEOF {
                break
            }
            return err
        }

        recOffset := binary.BigEndian.Uint64(header[0:8])
        recLen := binary.BigEndian.Uint32(header[8:12])

        // Logic check: prevents reading partial "ghost" writes from a crash
        if recOffset != currOffset {
            break
        }

        entrySize := int64(totalHeaderWidth) + int64(recLen) // widen first so a garbage length cannot overflow uint32

        if _, err := reader.Discard(int(entrySize)); err != nil {
            break // Partial record at end of file
        }

        currPos += entrySize
        currOffset++
    }

    if err := s.logFile.Truncate(currPos); err != nil {
        return errs.ErrTruncateFailed(err)
    }
    if _, err := s.logFile.Seek(currPos, io.SeekStart); err != nil {
        return err
    }

    // Reset state
    s.writePos.Store(currPos)
    s.NextOffset = currOffset
    if currOffset > s.BaseOffset {
        s.MaxOffset = currOffset - 1
    }
    // Remove index entries that point to truncated/corrupt area
    if err := s.index.TruncateAfter(uint64(currPos)); err != nil {
        return errs.ErrIndexSyncFailed(err)
    }

    return nil
}
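The recovery scan can be tried without touching a file by running the same checks over a byte slice that simulates a crash mid-write. validPrefix is a hypothetical helper: given a starting position and offset (the role played by the last index entry), it returns where Recover would truncate and what NextOffset would become.

```go
package main

import (
	"bufio"
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
)

// appendRecord encodes [Offset 8][Length 4][Value] onto buf.
func appendRecord(buf []byte, offset uint64, value []byte) []byte {
	var hdr [12]byte
	binary.BigEndian.PutUint64(hdr[:8], offset)
	binary.BigEndian.PutUint32(hdr[8:], uint32(len(value)))
	return append(append(buf, hdr[:]...), value...)
}

// validPrefix returns the byte position just past the last complete,
// correctly numbered record, plus the next offset to assign.
func validPrefix(data []byte, startPos int64, startOffset uint64) (int64, uint64, error) {
	reader := bufio.NewReader(bytes.NewReader(data[startPos:]))
	pos, next := startPos, startOffset
	for {
		header, err := reader.Peek(12)
		if err == io.EOF {
			break // clean end of data, or a header cut short by a crash
		}
		if err != nil {
			return 0, 0, err
		}
		if binary.BigEndian.Uint64(header[:8]) != next {
			break // not the offset we expect: treat the rest as garbage
		}
		size := 12 + int64(binary.BigEndian.Uint32(header[8:12]))
		if _, err := reader.Discard(int(size)); err != nil {
			break // payload cut short at the tail
		}
		pos += size
		next++
	}
	return pos, next, nil
}

func main() {
	var data []byte
	for i := uint64(0); i < 3; i++ {
		data = appendRecord(data, i, []byte("hello")) // 17 bytes each
	}
	// Simulate a crash mid-write: record 3 is missing its last 3 bytes.
	full := appendRecord(nil, 3, []byte("hello"))
	torn := append(append([]byte{}, data...), full[:len(full)-3]...)
	pos, next, err := validPrefix(torn, 0, 0)
	fmt.Println(pos, next, err) // 51 3 <nil>
}
```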

Step 10: Build the Log type for managing multiple segments

Log struct overview

The Log type holds a sorted list of segments and an active segment:

// log/log.go
package log

type Log struct {
    mu            sync.RWMutex
    Dir           string
    segments      []*segment.Segment
    activeSegment *segment.Segment
}

Creating a new log

NewLog scans the directory for existing .log files. If found, it loads and recovers each segment. Otherwise it creates the first segment at offset 0:

func NewLog(dir string) (*Log, error) {
    log := &Log{
        Dir:      dir,
        segments: make([]*segment.Segment, 0),
    }

    err := os.MkdirAll(dir, 0755)
    if err != nil {
        return nil, err
    }

    dirEnt, err := os.ReadDir(dir)
    if err != nil {
        return nil, err
    }
    // Collect base offsets from *.log files; .idx and unrelated files are ignored.
    var baseOffsets []uint64
    for _, entry := range dirEnt {
        var baseOffset uint64
        n, err := fmt.Sscanf(entry.Name(), "%020d.log", &baseOffset)
        if n == 1 && err == nil {
            baseOffsets = append(baseOffsets, baseOffset)
        }
    }
    if len(baseOffsets) > 0 {
        // Load existing segments in offset order; the last one becomes active.
        sort.Slice(baseOffsets, func(i, j int) bool {
            return baseOffsets[i] < baseOffsets[j]
        })
        for _, baseOffset := range baseOffsets {
            seg, err := segment.LoadExistingSegment(baseOffset, dir)
            if err != nil {
                return nil, err
            }
            log.segments = append(log.segments, seg)
        }
        log.activeSegment = log.segments[len(log.segments)-1]
        return log, nil
    }
    // No existing segments, create a new one
    activeSegment, err := segment.NewSegment(0, dir)
    if err != nil {
        return nil, err
    }
    log.segments = append(log.segments, activeSegment)
    log.activeSegment = activeSegment
    return log, nil
}

Binary search for segment lookups

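findSegmentIndex uses sort.Search for an O(log n) lookup instead of a linear scan, and findSegment wraps it. ReaderFrom (further below) needs the index itself so it can continue into the following segments:

// findSegmentIndex returns the position in l.segments of the segment that
// holds offset, or -1 if none does. Callers must hold l.mu.
func (l *Log) findSegmentIndex(offset uint64) int {
    // Binary search: find the last segment with BaseOffset <= offset
    i := sort.Search(len(l.segments), func(i int) bool {
        return l.segments[i].BaseOffset > offset
    }) - 1
    if i < 0 || offset >= l.segments[i].NextOffset {
        return -1
    }
    return i
}

func (l *Log) findSegment(offset uint64) *segment.Segment {
    i := l.findSegmentIndex(offset)
    if i < 0 {
        return nil
    }
    return l.segments[i]
}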

Rolling segments when full

When the active segment is full, create a new one:

func (l *Log) rollSegment() error {
    newSeg, err := segment.NewSegment(l.activeSegment.NextOffset, l.Dir)
    if err != nil {
        return err
    }
    l.segments = append(l.segments, newSeg)
    l.activeSegment = newSeg
    return nil
}

Append to the active segment

Append to the active segment. If it is full, roll to a new one:

func (l *Log) Append(value []byte) (uint64, error) {
    l.mu.Lock()
    defer l.mu.Unlock()
    off, err := l.activeSegment.Append(value)
    if err != nil {
        return 0, err
    }
    if l.activeSegment.IsFull() {
        if err := l.rollSegment(); err != nil {
            return 0, err
        }
    }
    return off, nil
}

Batch append for Log

AppendBatch works the same way but delegates to Segment.AppendBatch:

func (l *Log) AppendBatch(values [][]byte) (uint64, error) {
    l.mu.Lock()
    defer l.mu.Unlock()
    baseOff, err := l.activeSegment.AppendBatch(values)
    if err != nil {
        return 0, err
    }
    if l.activeSegment.IsFull() {
        if err := l.rollSegment(); err != nil {
            return 0, err
        }
    }
    return baseOff, nil
}

Read from the correct segment

Find which segment contains the offset using binary search, then delegate:

func (l *Log) Read(offset uint64) ([]byte, error) {
    l.mu.RLock()
    defer l.mu.RUnlock()
    seg := l.findSegment(offset)
    if seg == nil {
        return nil, errs.ErrLogOffsetOutOfRangef(offset)
    }
    return seg.Read(offset)
}

Log accessor methods

func (l *Log) LowestOffset() uint64 {
    l.mu.RLock()
    defer l.mu.RUnlock()
    if len(l.segments) == 0 {
        return 0
    }
    return l.segments[0].BaseOffset
}

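func (l *Log) HighestOffset() uint64 {
    l.mu.RLock()
    defer l.mu.RUnlock()
    if l.activeSegment == nil || l.activeSegment.NextOffset == 0 {
        return 0
    }
    // Derive from NextOffset, not MaxOffset: right after a roll the active
    // segment is empty and its MaxOffset is still the zero value.
    return l.activeSegment.NextOffset - 1
}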

// IsEmpty returns true if the log has no entries.
// Used by Raft log store to distinguish "no entries" from "first entry at offset 0".
func (l *Log) IsEmpty() bool {
    l.mu.RLock()
    defer l.mu.RUnlock()
    if len(l.segments) == 0 {
        return true
    }
    seg := l.segments[0]
    return seg.NextOffset == seg.BaseOffset
}

Streaming reader across segments

The ReaderFrom method returns an io.Reader that streams raw segment records from a starting offset across multiple segments. Used for replication:

func (l *Log) ReaderFrom(startOffset uint64) (io.Reader, error) {
    l.mu.RLock()
    defer l.mu.RUnlock()
    endOffset := uint64(0)
    if l.activeSegment != nil {
        endOffset = l.activeSegment.NextOffset
    }
    if startOffset >= endOffset {
        return bytes.NewReader(nil), nil
    }
    targetIdx := l.findSegmentIndex(startOffset)
    if targetIdx < 0 {
        return nil, errs.ErrLogOffsetOutOfRangef(startOffset)
    }
    seg := l.segments[targetIdx]
    r, err := seg.NewStreamingReader(startOffset)
    if err != nil {
        return nil, err
    }
    if targetIdx == len(l.segments)-1 {
        return r, nil
    }
    readers := make([]io.Reader, 0, len(l.segments)-targetIdx)
    readers = append(readers, r)
    for i := targetIdx + 1; i < len(l.segments); i++ {
        readers = append(readers, l.segments[i].Reader())
    }
    return io.MultiReader(readers...), nil
}

Step 11: Add LogManager to track LEO and high watermark

LogManager concepts

The LogManager wraps Log and adds the two key distributed-log concepts:

  • LEO (Log End Offset) — the next offset to be written. Advances on every append.
  • High Watermark (HW) — the highest committed offset. Consumers read only up to HW.

Both use atomic.Uint64 for lock-free reads:

LogManager struct

// log/log_manager.go
package log

import (
    "sync/atomic"

    "github.com/mohitkumar/mlog/errs"
)

type LogManager struct {
    *Log
    leo           atomic.Uint64 // Log End Offset
    highWatermark atomic.Uint64
}

func NewLogManager(dir string) (*LogManager, error) {
    log, err := NewLog(dir)
    if err != nil {
        return nil, err
    }

    lm := &LogManager{
        Log: log,
    }

    // Initialize LEO from the active segment's NextOffset (restart scenario)
    if log.activeSegment != nil {
        lm.leo.Store(log.activeSegment.NextOffset)
    }

    return lm, nil
}

LEO and high watermark accessors

func (l *LogManager) LEO() uint64 {
    return l.leo.Load()
}

func (l *LogManager) SetLEO(leo uint64) {
    l.leo.Store(leo)
}

func (l *LogManager) HighWatermark() uint64 {
    return l.highWatermark.Load()
}

func (l *LogManager) SetHighWatermark(highWatermark uint64) {
    l.highWatermark.Store(highWatermark)
}

Append and batch append (advancing LEO)

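func (l *LogManager) Append(value []byte) (uint64, error) {
    offset, err := l.Log.Append(value)
    if err != nil {
        return 0, err
    }

    // LEO is the next offset to write, so after writing at offset N, LEO becomes N+1
    l.advanceLEO(offset + 1)

    return offset, nil
}

// advanceLEO raises LEO to newLEO but never lowers it. Log.Append releases its
// lock before this runs, so two concurrent appends can reach this point out of
// order, and a plain Store could move LEO backwards.
func (l *LogManager) advanceLEO(newLEO uint64) {
    for {
        cur := l.leo.Load()
        if newLEO <= cur || l.leo.CompareAndSwap(cur, newLEO) {
            return
        }
    }
}

AppendBatch for LogManager

func (l *LogManager) AppendBatch(values [][]byte) (uint64, error) {
    baseOffset, err := l.Log.AppendBatch(values)
    if err != nil {
        return 0, err
    }

    l.advanceLEO(baseOffset + uint64(len(values)))

    return baseOffset, nil
}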

Read with high watermark enforcement

Normal consumers read only up to HW:

func (l *LogManager) Read(offset uint64) ([]byte, error) {
    hw := l.highWatermark.Load()
    // Consumers should only read up to (and including) the high watermark
    if offset > hw {
        return nil, errs.ErrLogOffsetBeyondHWf(offset, hw)
    }

    return l.Log.Read(offset)
}

ReadUncommitted for replication

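Follower replicas must be served records beyond HW (up to LEO) so they can catch up. ReadUncommitted skips the HW check and is bounded only by what the underlying Log contains: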

func (l *LogManager) ReadUncommitted(offset uint64) ([]byte, error) {
    return l.Log.Read(offset)
}
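The two read paths differ only in their upper bound. This hypothetical visibility type keeps just the two counters to show which offsets each path accepts. It uses the same inclusive HW check as LogManager.Read; the LEO bound on the uncommitted path is an assumption standing in for Log.Read's own range check.

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

// visibility is a hypothetical stand-in for LogManager with only the counters.
type visibility struct {
	leo atomic.Uint64 // next offset to be written
	hw  atomic.Uint64 // highest committed offset (inclusive)
}

var (
	errBeyondHW  = errors.New("offset beyond high watermark")
	errBeyondLEO = errors.New("offset not written yet")
)

// checkCommitted mirrors LogManager.Read: offsets up to and including HW.
func (v *visibility) checkCommitted(offset uint64) error {
	if offset > v.hw.Load() {
		return errBeyondHW
	}
	return nil
}

// checkUncommitted accepts anything already written, i.e. below LEO.
func (v *visibility) checkUncommitted(offset uint64) error {
	if offset >= v.leo.Load() {
		return errBeyondLEO
	}
	return nil
}

func main() {
	var v visibility
	v.leo.Store(10) // offsets 0..9 written
	v.hw.Store(6)   // offsets 0..6 committed
	for _, off := range []uint64{6, 7, 10} {
		fmt.Println(off, v.checkCommitted(off), v.checkUncommitted(off))
	}
	// 6 <nil> <nil>
	// 7 offset beyond high watermark <nil>
	// 10 offset beyond high watermark offset not written yet
}
```

Offsets 7 through 9 are the window that only replication can see until HW advances past them.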

Step 12: Test your implementation

Verify your build

Run tests for both packages:

go test ./segment/... -v
go test ./log/... -v

Write tests that:

  1. Create a segment, append several records, and read each by offset.
  2. Verify the sparse index produces correct results (append > 4 KB of data to trigger index entries).
  3. Create a Log, append enough data to trigger segment rotation, and read across segments.
  4. Test LogManager: append, read within HW, verify ReadUncommitted works beyond HW, advance HW and re-read.
  5. Test recovery: create a segment, close it, reopen with LoadExistingSegment, and verify NextOffset and reads are correct.

Summary

  • Record format: [Offset 8][Length 4][Value], big-endian and self-describing.
  • Segment: one .log file (append-only) plus one .idx file (sparse index). Append encodes and writes with writeFull; Read bulk-reads and scans. writePos is an atomic.Int64, so reads can load it without the mutex.
  • Index: fixed 12-byte entries (relative offset + position), memory-mapped with gommap. Binary search for lookups; grows dynamically by doubling.
  • Log: multiple segments sorted by base offset and located by binary search. Appends go to the active segment, which rolls when full.
  • LogManager: wraps Log with atomic.Uint64 LEO and HW. Read enforces HW; ReadUncommitted reads up to LEO. Batch append for replication.

With the storage layer complete, the next page builds the wire protocol — the frame format and message types that let nodes and clients communicate over TCP.