Building the Low-Level Log — Segments, Sparse Index, and Storage in Go
In this section, you'll build the entire storage layer from the bottom up. You'll implement byte-level record encoding, a segment (append-only log file + sparse index), the index with memory-mapped I/O, a Log that manages multiple segments, and a LogManager that tracks LEO and the high watermark. All code lives in the segment/ and log/ packages.
Step 1: Understand the record format
Record format
Each record on disk has a fixed-size header followed by the value:
┌────────────┬────────────┬──────────────────────┐
│   Offset   │   Length   │        Value         │
│  8 bytes   │  4 bytes   │    Length bytes      │
│  (uint64)  │  (uint32)  │     (payload)        │
└────────────┴────────────┴──────────────────────┘
- Offset — 8 bytes, big-endian uint64. The logical offset of this record.
- Length — 4 bytes, big-endian uint32. The byte length of the value.
- Value — Length bytes. The actual payload (opaque bytes).
Total size per record = 12 + len(value). Define the constants:
// segment/segment.go
package segment
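import (
// Imports for the finished file; later steps in this section use all of them.
"bufio"
"bytes"
"encoding/binary"
"fmt"
"io"
"os"
"path/filepath"
"sync"
"sync/atomic"
"github.com/mohitkumar/mlog/errs"
)
const (
IndexIntervalBytes = 4 * 1024 // 4KB of log data between sparse index entries
MaxSegmentBytes = 1024 * 1024 // 1MB: roll to a new segment at this size
lenWidth = 4 // 4 bytes for the value length
offWidth = 8 // 8 bytes for the record offset
totalHeaderWidth = lenWidth + offWidth // 12-byte record header
)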
var endian = binary.BigEndian
Why big-endian? Convention. Kafka uses big-endian for its wire protocol. Picking one endianness and using it consistently is what matters.
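The following is a minimal sketch of the record layout on its own, using the same 12-byte header. It lets you check the format without any files. The names encodeRecord and decodeRecord are hypothetical helpers for illustration; the segment code below inlines this logic in Append and Read.

```go
package main

import (
	"encoding/binary"
	"errors"
)

const (
	offWidth         = 8
	lenWidth         = 4
	totalHeaderWidth = offWidth + lenWidth
)

var endian = binary.BigEndian

// encodeRecord lays out one record as [Offset 8][Length 4][Value].
func encodeRecord(offset uint64, value []byte) []byte {
	buf := make([]byte, totalHeaderWidth+len(value))
	endian.PutUint64(buf[0:offWidth], offset)
	endian.PutUint32(buf[offWidth:totalHeaderWidth], uint32(len(value)))
	copy(buf[totalHeaderWidth:], value)
	return buf
}

// decodeRecord parses the record at the start of buf and reports how many
// bytes it occupied, so a caller can advance to the next record.
func decodeRecord(buf []byte) (offset uint64, value []byte, n int, err error) {
	if len(buf) < totalHeaderWidth {
		return 0, nil, 0, errors.New("short header")
	}
	offset = endian.Uint64(buf[0:offWidth])
	size := int(endian.Uint32(buf[offWidth:totalHeaderWidth]))
	n = totalHeaderWidth + size
	if len(buf) < n {
		return 0, nil, 0, errors.New("short value")
	}
	return offset, buf[totalHeaderWidth:n], n, nil
}
```

Every record carries its own length, so a reader can walk a buffer record by record by advancing n bytes at a time.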
Step 2: Create the Segment struct
Segment struct overview
A segment owns one contiguous range of offsets. On disk it has two files:
- {baseOffset}.log — the append-only record data.
- {baseOffset}.idx — the sparse index.
// segment/segment.go
type Segment struct {
BaseOffset uint64 // base offset of the segment
NextOffset uint64 // next offset to be assigned
MaxOffset uint64 // max offset of the segment
logFile *os.File
index *Index
bytesSinceLastIndex uint64
writePos atomic.Int64 // current end-of-log position in bytes
mu sync.Mutex // protects writes (NextOffset, MaxOffset, bytesSinceLastIndex, index)
}
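Notice writePos is an atomic.Int64. This lets the Read path load the current write position without taking the segment mutex; only writers need the lock. Read also consults NextOffset and the index, which are plain fields. For that reason, the Log type in Step 10 serializes reads against appends with its own RWMutex.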
File naming convention
File naming uses zero-padded 20-digit offsets so segments sort lexicographically:
func formatLogFileName(baseOffset uint64) string {
return fmt.Sprintf("%020d.log", baseOffset)
}
func formatIndexFileName(baseOffset uint64) string {
return fmt.Sprintf("%020d.idx", baseOffset)
}
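Zero padding is what makes lexical order match numeric order. A small sketch can check this by sorting names as plain strings. Here, sortedNames is a hypothetical helper, and formatLogFileName is copied from above.

```go
package main

import (
	"fmt"
	"sort"
)

// Same naming scheme as the segment package: 20 digits fits the largest
// uint64 (18446744073709551615), so every name has the same width.
func formatLogFileName(baseOffset uint64) string {
	return fmt.Sprintf("%020d.log", baseOffset)
}

// sortedNames formats each base offset and sorts the names as strings.
func sortedNames(bases []uint64) []string {
	names := make([]string, 0, len(bases))
	for _, b := range bases {
		names = append(names, formatLogFileName(b))
	}
	sort.Strings(names)
	return names
}
```

Without the padding, "1000.log" would sort before "20.log", because string comparison looks at the first character.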
Step 3: Create a new segment
NewSegment function
NewSegment creates fresh .log and .idx files for a given base offset:
func NewSegment(baseOffset uint64, dir string) (*Segment, error) {
logFilePath := filepath.Join(dir, formatLogFileName(baseOffset))
logFile, err := os.OpenFile(logFilePath, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0644)
if err != nil {
return nil, err
}
indexFilePath := filepath.Join(dir, formatIndexFileName(baseOffset))
index, err := OpenIndex(indexFilePath)
if err != nil {
logFile.Close()
return nil, err
}
return &Segment{
BaseOffset: baseOffset,
NextOffset: baseOffset,
logFile: logFile,
index: index,
}, nil
}
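There is no buffered writer: writes go directly to the OS file through a writeFull helper that retries on short writes. Strictly speaking, (*os.File).Write already returns a non-nil error whenever it writes fewer than len(buf) bytes, so for *os.File the loop is a defensive guard: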
Helper function for writing
// writeFull writes the entire buffer to f, retrying on short writes.
func writeFull(f *os.File, buf []byte) (int, error) {
total := 0
for len(buf) > 0 {
n, err := f.Write(buf)
if err != nil {
return total, err
}
total += n
buf = buf[n:]
}
return total, nil
}
Step 4: Implement append (encoding and writing)
Single record append
To append a record:
- Assign the next offset.
- Build a single buffer: [Offset 8 bytes][Length 4 bytes][Value].
- Write it in one syscall via writeFull.
- Every 4 KB (or for the first record), write a sparse index entry.
- Advance the counters: writePos atomically, and NextOffset and MaxOffset under the mutex.
func (s *Segment) Append(value []byte) (uint64, error) {
s.mu.Lock()
defer s.mu.Unlock()
offset := s.NextOffset
buf := make([]byte, totalHeaderWidth+len(value))
endian.PutUint64(buf[0:offWidth], offset)
endian.PutUint32(buf[offWidth:totalHeaderWidth], uint32(len(value)))
copy(buf[totalHeaderWidth:], value)
if _, err := writeFull(s.logFile, buf); err != nil {
return 0, err
}
currWritePos := s.writePos.Load()
s.bytesSinceLastIndex += uint64(totalHeaderWidth + len(value))
if s.bytesSinceLastIndex >= IndexIntervalBytes || offset == s.BaseOffset {
if err := s.index.Write(uint32(offset-s.BaseOffset), uint64(currWritePos)); err != nil {
return 0, err
}
s.bytesSinceLastIndex = 0
}
s.writePos.Add(int64(totalHeaderWidth + len(value)))
s.NextOffset++
s.MaxOffset = offset
return offset, nil
}
The index stores relative offsets (offset minus base offset) to save 4 bytes per entry. The position is the byte offset in the .log file where the record starts.
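The following sketch replays the indexing rule from Append on a list of value sizes, so you can see where entries land without touching any files. The helper indexedPositions is hypothetical. It treats the first record as index 0, which matches `offset == s.BaseOffset` for a fresh segment.

```go
package main

const indexIntervalBytes = 4 * 1024 // same value as IndexIntervalBytes
const headerWidth = 12              // [Offset 8][Length 4]

// indexedPositions applies the rule from Segment.Append: the first record is
// always indexed, then a new entry is written once the bytes accumulated
// since the last entry (including the current record) reach the interval.
// It returns the byte positions that would receive index entries.
func indexedPositions(valueSizes []int) []int64 {
	var (
		positions  []int64
		writePos   int64
		sinceIndex int64
	)
	for i, size := range valueSizes {
		recordSize := int64(headerWidth + size)
		sinceIndex += recordSize
		if sinceIndex >= indexIntervalBytes || i == 0 {
			positions = append(positions, writePos) // entry points at this record's start
			sinceIndex = 0
		}
		writePos += recordSize
	}
	return positions
}
```

With 1,000-byte records, entries land every 5,000 bytes rather than every 4,096. The record that crosses the threshold is indexed, and its own bytes are not counted toward the next interval. Any spacing works for lookups, because every entry still points at a record boundary.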
Batch append for performance
For replication and high-throughput producers, AppendBatch writes multiple records in a single syscall:
func (s *Segment) AppendBatch(values [][]byte) (uint64, error) {
s.mu.Lock()
defer s.mu.Unlock()
if len(values) == 0 {
return s.NextOffset, nil
}
baseOffset := s.NextOffset
// Calculate total buffer size
totalSize := 0
for _, v := range values {
totalSize += totalHeaderWidth + len(v)
}
// Build combined buffer — one allocation, one syscall
buf := make([]byte, totalSize)
pos := 0
for i, value := range values {
offset := baseOffset + uint64(i)
endian.PutUint64(buf[pos:pos+offWidth], offset)
endian.PutUint32(buf[pos+offWidth:pos+totalHeaderWidth], uint32(len(value)))
copy(buf[pos+totalHeaderWidth:], value)
pos += totalHeaderWidth + len(value)
}
// Single write syscall for the entire batch
if _, err := writeFull(s.logFile, buf); err != nil {
return 0, err
}
// Update index entries
currWritePos := s.writePos.Load()
pos = 0
for i, value := range values {
offset := baseOffset + uint64(i)
recordSize := totalHeaderWidth + len(value)
s.bytesSinceLastIndex += uint64(recordSize)
if s.bytesSinceLastIndex >= IndexIntervalBytes || offset == s.BaseOffset {
if err := s.index.Write(uint32(offset-s.BaseOffset), uint64(currWritePos+int64(pos))); err != nil {
return 0, err
}
s.bytesSinceLastIndex = 0
}
pos += recordSize
}
s.writePos.Add(int64(totalSize))
count := uint64(len(values))
s.NextOffset = baseOffset + count
s.MaxOffset = baseOffset + count - 1
return baseOffset, nil
}
This matters for replication throughput: a batch of 5000 records costs one buffer allocation and one write syscall, instead of 5000 of each.
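A batch produces the same bytes on disk as appending the records one by one; only the number of write calls changes. The following sketch checks that claim. The helpers encodeBatch and sameAsSingles are hypothetical, written to mirror the buffer-building loop in AppendBatch.

```go
package main

import (
	"bytes"
	"encoding/binary"
)

const headerWidth = 12 // [Offset 8][Length 4]

// encodeBatch builds the bytes AppendBatch writes: records back to back with
// consecutive offsets starting at base. It also returns each record's start
// relative to the batch, which AppendBatch adds to writePos for index entries.
func encodeBatch(base uint64, values [][]byte) ([]byte, []int) {
	total := 0
	for _, v := range values {
		total += headerWidth + len(v)
	}
	buf := make([]byte, total)
	starts := make([]int, 0, len(values))
	pos := 0
	for i, v := range values {
		starts = append(starts, pos)
		binary.BigEndian.PutUint64(buf[pos:pos+8], base+uint64(i))
		binary.BigEndian.PutUint32(buf[pos+8:pos+headerWidth], uint32(len(v)))
		copy(buf[pos+headerWidth:], v)
		pos += headerWidth + len(v)
	}
	return buf, starts
}

// sameAsSingles reports whether the batch encoding equals the concatenation
// of each record encoded separately.
func sameAsSingles(base uint64, values [][]byte) bool {
	batch, _ := encodeBatch(base, values)
	var singles bytes.Buffer
	for i, v := range values {
		one, _ := encodeBatch(base+uint64(i), [][]byte{v})
		singles.Write(one)
	}
	return bytes.Equal(batch, singles.Bytes())
}
```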
Step 5: Build the sparse index
Index entry format
Each index entry is 12 bytes:
┌────────────────────┬────────────────────┐
│  Relative Offset   │   Byte Position    │
│  4 bytes (uint32)  │  8 bytes (uint64)  │
└────────────────────┴────────────────────┘
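The following sketch encodes and decodes one 12-byte entry, including the conversion between absolute and relative offsets. The helpers putIndexEntry and readIndexEntry are hypothetical. The 4-byte relative offset assumes a segment never holds more than about four billion records. That holds easily here: at a minimum of 12 bytes per record, a 1 MB segment holds fewer than 90,000.

```go
package main

import "encoding/binary"

const indexEntrySize = 12 // relOffset (4) + position (8)

// putIndexEntry writes one entry into buf, storing the offset relative to the
// segment's base offset so it fits in 4 bytes.
func putIndexEntry(buf []byte, baseOffset, offset, position uint64) {
	binary.BigEndian.PutUint32(buf[0:4], uint32(offset-baseOffset))
	binary.BigEndian.PutUint64(buf[4:indexEntrySize], position)
}

// readIndexEntry decodes an entry and converts it back to an absolute offset.
func readIndexEntry(buf []byte, baseOffset uint64) (offset, position uint64) {
	rel := binary.BigEndian.Uint32(buf[0:4])
	return baseOffset + uint64(rel), binary.BigEndian.Uint64(buf[4:indexEntrySize])
}
```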
Index struct with mmap
The index file is memory-mapped so reads are fast (no syscalls — just pointer dereferences into the kernel page cache):
Index struct implementation
// segment/index.go
package segment
import (
"encoding/binary"
"os"
"github.com/tysonmote/gommap"
)
const (
IndexEntrySize = 4 + 8 // relOffset (4) + position (8) = 12 bytes
IndexSize = 12 * 1024 // 12KB initial size (1024 entries)
)
var indexEndian = binary.BigEndian
type IndexEntry struct {
RelativeOffset uint32
Position uint64
}
type Index struct {
file *os.File
mmap gommap.MMap
size int64 // current number of bytes written
}
Opening an index file
func OpenIndex(filePath string) (*Index, error) {
file, err := os.OpenFile(filePath, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0644)
if err != nil {
return nil, err
}
stat, err := file.Stat()
if err != nil {
file.Close()
return nil, err
}
initialSize := stat.Size()
// A brand-new index is pre-allocated so there is something to map; size stays 0.
if initialSize == 0 {
if err := file.Truncate(IndexSize); err != nil {
file.Close()
return nil, err
}
}
m, err := gommap.Map(file.Fd(), gommap.PROT_READ|gommap.PROT_WRITE, gommap.MAP_SHARED)
if err != nil {
file.Close()
return nil, err
}
idx := &Index{
file: file,
mmap: m,
// After a clean Close the file was truncated to its used size,
// so its length equals the bytes of entries written.
size: initialSize,
}
return idx, nil
}
Writing an index entry
Write at the current size offset, then advance:
func (idx *Index) Write(relOffset uint32, position uint64) error {
if idx.size+IndexEntrySize > int64(len(idx.mmap)) {
if err := idx.grow(); err != nil {
return err
}
}
buf := idx.mmap[idx.size : idx.size+IndexEntrySize]
indexEndian.PutUint32(buf[0:4], relOffset)
indexEndian.PutUint64(buf[4:12], position)
idx.size += IndexEntrySize
return nil
}
Growing the index dynamically
The grow() method uses UnsafeUnmap(), doubles the file size with Truncate, and re-maps:
func (idx *Index) grow() error {
newSize := int64(len(idx.mmap)) * 2
if err := idx.mmap.UnsafeUnmap(); err != nil {
return err
}
if err := idx.file.Truncate(newSize); err != nil {
return err
}
m, err := gommap.Map(idx.file.Fd(), gommap.PROT_READ|gommap.PROT_WRITE, gommap.MAP_SHARED)
if err != nil {
return err
}
idx.mmap = m
return nil
}
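Growth is rare. At roughly one entry per 4 KB of log data, a 1 MB segment needs only about 256 entries, well under the 1024 that fit in the initial 12 KB. Growth mostly happens after a reopen: Close truncates the file to its used size, so the first Write after reopening has to grow the mapping.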
Reading a single index entry
func (idx *Index) Entry(i int64) IndexEntry {
pos := i * IndexEntrySize
buf := idx.mmap[pos : pos+IndexEntrySize]
return IndexEntry{
RelativeOffset: indexEndian.Uint32(buf[0:4]),
Position: indexEndian.Uint64(buf[4:12]),
}
}
Binary search for efficient lookups
Find the largest index entry whose relative offset is <= the target. This gives the closest starting position for a forward scan:
func (idx *Index) Find(relOffset uint32) (IndexEntry, bool) {
count := idx.size / IndexEntrySize
if count == 0 {
return IndexEntry{}, false
}
var result IndexEntry
low, high := int64(0), count-1
for low <= high {
mid := (low + high) / 2
entry := idx.Entry(mid)
if entry.RelativeOffset <= relOffset {
result = entry
low = mid + 1
} else {
high = mid - 1
}
}
return result, true
}
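The hand-written loop in Find is a floor search. The standard library's sort.Search expresses the same idea. The following sketch works on a plain slice so it runs without a memory map; floorEntry is a hypothetical helper, not part of the index code.

```go
package main

import "sort"

type IndexEntry struct {
	RelativeOffset uint32
	Position       uint64
}

// floorEntry returns the last entry whose RelativeOffset is <= rel. entries
// must be sorted by RelativeOffset, which holds because the index is
// append-only.
func floorEntry(entries []IndexEntry, rel uint32) (IndexEntry, bool) {
	// i is the first entry strictly greater than rel.
	i := sort.Search(len(entries), func(i int) bool {
		return entries[i].RelativeOffset > rel
	})
	if i == 0 {
		return IndexEntry{}, false
	}
	return entries[i-1], true
}
```

Unlike Find, this version reports false when every entry is greater than rel. In the segment that case does not arise, because the first record is always indexed at relative offset 0.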
Index helper methods
func (idx *Index) Last() (IndexEntry, bool) {
count := idx.size / IndexEntrySize
if count == 0 {
return IndexEntry{}, false
}
return idx.Entry(count - 1), true
}
func (idx *Index) TruncateAfter(position uint64) error {
var truncateSize int64 = idx.size
count := idx.size / IndexEntrySize
for i := count - 1; i >= 0; i-- {
entry := idx.Entry(i)
if entry.Position <= position {
break
}
truncateSize -= IndexEntrySize
}
if truncateSize == idx.size {
return nil
}
idx.size = truncateSize
return nil
}
Closing the index and syncing to disk
On close, sync the mmap, sync the file, unmap, truncate to the actual size (removing unused pre-allocated space), and close:
func (idx *Index) Close() error {
if err := idx.mmap.Sync(gommap.MS_SYNC); err != nil {
return err
}
if err := idx.file.Sync(); err != nil {
return err
}
if err := idx.mmap.UnsafeUnmap(); err != nil {
return err
}
if err := idx.file.Truncate(int64(idx.size)); err != nil {
return err
}
return idx.file.Close()
}
Step 6: Implement read with bulk-loading and scanning
Read strategy with index lookups
To read a record by offset:
- Check that the offset is in the range [BaseOffset, NextOffset).
- Compute the relative offset: relOffset = offset - BaseOffset.
- Binary search the index for the starting position.
- Bulk-read the region from that position to writePos into memory (one syscall).
- Scan forward in memory until you find the target offset.
func (s *Segment) Read(offset uint64) ([]byte, error) {
writePos := s.writePos.Load()
if offset < s.BaseOffset || offset >= s.NextOffset {
return nil, errs.ErrSegmentOffsetOutOfRange(offset, s.BaseOffset, s.NextOffset)
}
// Use sparse index to find starting position (floor entry <= offset)
var startPos int64
relOffset := uint32(offset - s.BaseOffset)
if indexEntry, found := s.index.Find(relOffset); found {
startPos = int64(indexEntry.Position)
}
// Bulk read the region from startPos to writePos into memory — one syscall
regionSize := writePos - startPos
region := make([]byte, regionSize)
n, err := s.logFile.ReadAt(region, startPos)
if err != nil && err != io.EOF {
return nil, err
}
region = region[:n]
// Scan in-memory buffer — no further syscalls
pos := 0
for pos+totalHeaderWidth <= len(region) {
foundOffset := endian.Uint64(region[pos : pos+offWidth])
msgLen := int(endian.Uint32(region[pos+offWidth : pos+totalHeaderWidth]))
if foundOffset == offset {
end := pos + totalHeaderWidth + msgLen
if end > len(region) {
return nil, io.ErrUnexpectedEOF
}
value := make([]byte, offWidth+msgLen)
endian.PutUint64(value[0:offWidth], foundOffset)
copy(value[offWidth:], region[pos+totalHeaderWidth:end])
return value, nil
}
if foundOffset > offset {
return nil, errs.ErrSegmentOffsetNotFound
}
pos += totalHeaderWidth + msgLen
}
return nil, io.EOF
}
Notice that Read returns [Offset 8 bytes][Value] — not just the raw value. The 8-byte offset prefix is used by the Raft log store and the replication streaming reader.
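Why bulk-read? Reading each record header individually would cost one ReadAt syscall per record. Instead, Read loads the whole region from the index entry to writePos in a single syscall and scans it in memory. With about 4 KB between index entries, there can be dozens of small records to skip, so one syscall replaces dozens. The region runs to the end of the segment, not just to the next index entry, so a read near the start of a full segment copies up to MaxSegmentBytes. Stopping at the next index entry's position would cap the copy at roughly IndexIntervalBytes plus one record.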
Step 7: Add segment utility methods
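Checking whether a segment is full, and closing it
func (s *Segment) IsFull() bool {
return s.writePos.Load() >= MaxSegmentBytes
}
// Close flushes the log file to stable storage before closing it, matching
// the sync that Index.Close performs for the index.
func (s *Segment) Close() error {
s.mu.Lock()
defer s.mu.Unlock()
if err := s.logFile.Sync(); err != nil {
return err
}
if err := s.logFile.Close(); err != nil {
return err
}
if err := s.index.Close(); err != nil {
return err
}
return nil
}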
Step 8: Implement streaming reader for replication
Streaming read concept
For replication, you need to read a contiguous range of records efficiently. The segment provides two reader methods:
// Reader returns an io.Reader that streams all records from BaseOffset.
func (s *Segment) Reader() io.Reader {
if s.BaseOffset >= s.NextOffset {
return bytes.NewReader(nil)
}
r, err := s.NewStreamingReader(s.BaseOffset)
if err != nil {
return bytes.NewReader(nil)
}
return r
}
// NewStreamingReader returns an io.Reader starting at the physical position
// corresponding to startOffset. It reads until the current end-of-segment.
func (s *Segment) NewStreamingReader(startOffset uint64) (io.Reader, error) {
currentWritePos := s.writePos.Load()
if startOffset < s.BaseOffset || startOffset >= s.NextOffset {
return nil, errs.ErrSegmentOffsetOutOfRangeSimple(startOffset)
}
// Use the index to find the starting physical position
relOffset := uint32(startOffset - s.BaseOffset)
indexEntry, ok := s.index.Find(relOffset)
if !ok {
return nil, errs.ErrSegmentIndexNotFound
}
// Calculate how many bytes are available to read from that position
size := currentWritePos - int64(indexEntry.Position)
// SectionReader acts like a private io.Reader for this consumer
section := io.NewSectionReader(s.logFile, int64(indexEntry.Position), size)
return s.catchUp(section, startOffset)
}
CatchUp helper for skipping to target offset
The catchUp helper skips records between the index entry and the target offset:
func (s *Segment) catchUp(r io.Reader, target uint64) (io.Reader, error) {
for {
header := make([]byte, 12)
if _, err := io.ReadFull(r, header); err != nil {
return nil, err
}
offset := binary.BigEndian.Uint64(header[0:8])
msgLen := binary.BigEndian.Uint32(header[8:12])
if offset >= target {
// Found the start — join the header back with the rest of the stream
return io.MultiReader(bytes.NewReader(header), r), nil
}
// Not there yet, skip the payload
if _, err := io.CopyN(io.Discard, r, int64(msgLen)); err != nil {
return nil, err
}
}
}
Step 9: Implement segment recovery on restart
Recovery process overview
When the process restarts, it must recover existing segments. LoadExistingSegment opens the .log and .idx files and calls Recover to discover the true NextOffset and writePos:
func LoadExistingSegment(baseOffset uint64, dir string) (*Segment, error) {
logFilePath := filepath.Join(dir, formatLogFileName(baseOffset))
logFile, err := os.OpenFile(logFilePath, os.O_RDWR|os.O_APPEND, 0644)
if err != nil {
return nil, err
}
indexFilePath := filepath.Join(dir, formatIndexFileName(baseOffset))
index, err := OpenIndex(indexFilePath)
if err != nil {
logFile.Close()
return nil, err
}
segment := &Segment{
BaseOffset: baseOffset,
NextOffset: baseOffset,
logFile: logFile,
index: index,
}
if err := segment.Recover(); err != nil {
logFile.Close()
index.Close()
return nil, err
}
return segment, nil
}
Recover method with validation and truncation
The Recover method starts from the last known index entry, scans forward using a buffered reader, validates each record, and truncates at the last valid position:
func (s *Segment) Recover() error {
var (
startPos int64 = 0
nextOffset = s.BaseOffset
)
// 1. Start from the last healthy index entry
lastEntry, ok := s.index.Last()
if ok {
startPos = int64(lastEntry.Position)
nextOffset = s.BaseOffset + uint64(lastEntry.RelativeOffset)
}
// 2. Seek to the checkpoint
if _, err := s.logFile.Seek(startPos, io.SeekStart); err != nil {
return errs.ErrSeekFailed(err)
}
// Buffered reader for recovery — avoids thousands of small read syscalls
reader := bufio.NewReader(s.logFile)
currPos := startPos
currOffset := nextOffset
for {
// Read Header: [Offset: 8][Len: 4]
header, err := reader.Peek(12)
if err != nil {
if err == io.EOF || err == io.ErrUnexpectedEOF {
break
}
return err
}
recOffset := binary.BigEndian.Uint64(header[0:8])
recLen := binary.BigEndian.Uint32(header[8:12])
// Logic check: prevents reading partial "ghost" writes from a crash
if recOffset != currOffset {
break
}
entrySize := int64(totalHeaderWidth) + int64(recLen) // widen first so a corrupt length cannot overflow uint32
if _, err := reader.Discard(int(entrySize)); err != nil {
break // Partial record at end of file
}
currPos += entrySize
currOffset++
}
if err := s.logFile.Truncate(currPos); err != nil {
return errs.ErrTruncateFailed(err)
}
if _, err := s.logFile.Seek(currPos, io.SeekStart); err != nil {
return err
}
// Reset state
s.writePos.Store(currPos)
s.NextOffset = currOffset
if currOffset > s.BaseOffset {
s.MaxOffset = currOffset - 1
}
// Remove index entries that point to truncated/corrupt area
if err := s.index.TruncateAfter(uint64(currPos)); err != nil {
return errs.ErrIndexSyncFailed(err)
}
return nil
}
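The recovery rules are easier to see without a file. The following sketch applies the same two checks (the record is complete, and its offset is the expected one) to a byte slice. The helpers validPrefix and rec are hypothetical.

```go
package main

import "encoding/binary"

const headerWidth = 12

// validPrefix replays the checks in Segment.Recover over an in-memory copy of
// a log file. It stops at the first incomplete record or at a record whose
// offset breaks the sequence. It returns the number of valid bytes and the
// offset the next append should receive.
func validPrefix(data []byte, startPos int64, startOffset uint64) (int64, uint64) {
	pos, next := startPos, startOffset
	for {
		if pos+headerWidth > int64(len(data)) {
			return pos, next // partial or missing header
		}
		off := binary.BigEndian.Uint64(data[pos : pos+8])
		size := int64(binary.BigEndian.Uint32(data[pos+8 : pos+headerWidth]))
		if off != next {
			return pos, next // ghost or out-of-sequence record
		}
		if pos+headerWidth+size > int64(len(data)) {
			return pos, next // header written, value cut short
		}
		pos += headerWidth + size
		next++
	}
}

// rec encodes [Offset 8][Length 4][Value].
func rec(off uint64, value string) []byte {
	buf := make([]byte, headerWidth+len(value))
	binary.BigEndian.PutUint64(buf[0:8], off)
	binary.BigEndian.PutUint32(buf[8:12], uint32(len(value)))
	copy(buf[headerWidth:], value)
	return buf
}
```

In Recover, the returned position becomes both the truncation point and the new writePos.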
Step 10: Build the Log type for managing multiple segments
Log struct overview
The Log type holds a sorted list of segments and an active segment:
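// log/log.go
package log
import (
"bytes"
"fmt"
"io"
"os"
"sort"
"sync"
"github.com/mohitkumar/mlog/errs"
// assumed path: the segment package, in the same module as errs
"github.com/mohitkumar/mlog/segment"
)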
type Log struct {
mu sync.RWMutex
Dir string
segments []*segment.Segment
activeSegment *segment.Segment
}
Creating a new log
NewLog scans the directory for existing .log files. If found, it loads and recovers each segment. Otherwise it creates the first segment at offset 0:
func NewLog(dir string) (*Log, error) {
log := &Log{
Dir: dir,
segments: make([]*segment.Segment, 0),
}
err := os.MkdirAll(dir, 0755)
if err != nil {
return nil, err
}
dirEnt, err := os.ReadDir(dir)
if err != nil {
return nil, err
}
// Collect base offsets of existing .log files; other files (such as .idx) are skipped
var baseOffsets []uint64
for _, entry := range dirEnt {
var baseOffset uint64
n, err := fmt.Sscanf(entry.Name(), "%020d.log", &baseOffset)
if n == 1 && err == nil {
baseOffsets = append(baseOffsets, baseOffset)
}
}
// Check baseOffsets, not dirEnt: a directory holding only non-.log files
// would otherwise leave segments empty and panic on the index below
if len(baseOffsets) > 0 {
sort.Slice(baseOffsets, func(i int, j int) bool {
return baseOffsets[i] < baseOffsets[j]
})
for _, baseOffset := range baseOffsets {
seg, err := segment.LoadExistingSegment(baseOffset, dir)
if err != nil {
return nil, err
}
log.segments = append(log.segments, seg)
}
log.activeSegment = log.segments[len(log.segments)-1]
return log, nil
}
// No existing segments, create a new one
activeSegment, err := segment.NewSegment(0, dir)
if err != nil {
return nil, err
}
log.segments = append(log.segments, activeSegment)
log.activeSegment = activeSegment
return log, nil
}
Binary search for segment lookups
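The findSegmentIndex method uses sort.Search for an O(log n) lookup instead of a linear scan. findSegment wraps it to return the segment itself; ReaderFrom, later in this step, needs the position in the slice instead:
// findSegmentIndex returns the position in l.segments of the segment that
// contains offset, or -1 if no segment does.
func (l *Log) findSegmentIndex(offset uint64) int {
n := len(l.segments)
if n == 0 {
return -1
}
// Binary search: find the last segment with BaseOffset <= offset
i := sort.Search(n, func(i int) bool {
return l.segments[i].BaseOffset > offset
}) - 1
if i < 0 || offset >= l.segments[i].NextOffset {
return -1
}
return i
}
func (l *Log) findSegment(offset uint64) *segment.Segment {
i := l.findSegmentIndex(offset)
if i < 0 {
return nil
}
return l.segments[i]
}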
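Segments cover contiguous, increasing ranges, so the lookup reduces to a search over base offsets. The following sketch works on a plain slice; span and locate are hypothetical names.

```go
package main

import "sort"

// span describes one segment by its offset range [base, next).
type span struct {
	base, next uint64
}

// locate returns the index of the span containing offset, or -1. Like
// findSegment, it finds the first span whose base is greater than offset
// and steps back one.
func locate(spans []span, offset uint64) int {
	i := sort.Search(len(spans), func(i int) bool {
		return spans[i].base > offset
	}) - 1
	if i < 0 || offset >= spans[i].next {
		return -1
	}
	return i
}
```

An offset equal to a segment's base lands in that segment, not the previous one, because the predicate uses a strict >.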
Rolling segments when full
When the active segment is full, create a new one:
func (l *Log) rollSegment() error {
newSeg, err := segment.NewSegment(l.activeSegment.NextOffset, l.Dir)
if err != nil {
return err
}
l.segments = append(l.segments, newSeg)
l.activeSegment = newSeg
return nil
}
Append to the active segment
Append to the active segment. If it is full, roll to a new one:
func (l *Log) Append(value []byte) (uint64, error) {
l.mu.Lock()
defer l.mu.Unlock()
off, err := l.activeSegment.Append(value)
if err != nil {
return 0, err
}
if l.activeSegment.IsFull() {
if err := l.rollSegment(); err != nil {
return 0, err
}
}
return off, nil
}
Batch append for Log
AppendBatch works the same way but delegates to Segment.AppendBatch:
func (l *Log) AppendBatch(values [][]byte) (uint64, error) {
l.mu.Lock()
defer l.mu.Unlock()
baseOff, err := l.activeSegment.AppendBatch(values)
if err != nil {
return 0, err
}
if l.activeSegment.IsFull() {
if err := l.rollSegment(); err != nil {
return 0, err
}
}
return baseOff, nil
}
Read from the correct segment
Find which segment contains the offset using binary search, then delegate:
func (l *Log) Read(offset uint64) ([]byte, error) {
l.mu.RLock()
defer l.mu.RUnlock()
seg := l.findSegment(offset)
if seg == nil {
return nil, errs.ErrLogOffsetOutOfRangef(offset)
}
return seg.Read(offset)
}
Log accessor methods
func (l *Log) LowestOffset() uint64 {
l.mu.RLock()
defer l.mu.RUnlock()
if len(l.segments) == 0 {
return 0
}
return l.segments[0].BaseOffset
}
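// HighestOffset returns the offset of the last record (0 for an empty log).
// It derives this from NextOffset rather than MaxOffset, because a freshly
// rolled active segment is empty and its MaxOffset is still 0.
func (l *Log) HighestOffset() uint64 {
l.mu.RLock()
defer l.mu.RUnlock()
if l.activeSegment == nil || l.activeSegment.NextOffset == 0 {
return 0
}
return l.activeSegment.NextOffset - 1
}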
// IsEmpty returns true if the log has no entries.
// Used by Raft log store to distinguish "no entries" from "first entry at offset 0".
func (l *Log) IsEmpty() bool {
l.mu.RLock()
defer l.mu.RUnlock()
if len(l.segments) == 0 {
return true
}
seg := l.segments[0]
return seg.NextOffset == seg.BaseOffset
}
Streaming reader across segments
The ReaderFrom method returns an io.Reader that streams raw segment records from a starting offset across multiple segments. Replication uses it:
func (l *Log) ReaderFrom(startOffset uint64) (io.Reader, error) {
l.mu.RLock()
defer l.mu.RUnlock()
endOffset := uint64(0)
if l.activeSegment != nil {
endOffset = l.activeSegment.NextOffset
}
if startOffset >= endOffset {
return bytes.NewReader(nil), nil
}
targetIdx := l.findSegmentIndex(startOffset)
if targetIdx < 0 {
return nil, errs.ErrLogOffsetOutOfRangef(startOffset)
}
seg := l.segments[targetIdx]
r, err := seg.NewStreamingReader(startOffset)
if err != nil {
return nil, err
}
if targetIdx == len(l.segments)-1 {
return r, nil
}
readers := make([]io.Reader, 0, len(l.segments)-targetIdx)
readers = append(readers, r)
for i := targetIdx + 1; i < len(l.segments); i++ {
readers = append(readers, l.segments[i].Reader())
}
return io.MultiReader(readers...), nil
}
Step 11: Add LogManager to track LEO and high watermark
LogManager concepts
The LogManager wraps Log and adds the two key distributed-log concepts:
- LEO (Log End Offset) — the next offset to be written. Advances on every append.
- High Watermark (HW) — the highest committed offset. Consumers read only up to HW.
Both use atomic.Uint64 for lock-free reads:
LogManager struct
// log/log_manager.go
package log
import (
"sync/atomic"
"github.com/mohitkumar/mlog/errs"
)
type LogManager struct {
*Log
leo atomic.Uint64 // Log End Offset
highWatermark atomic.Uint64
}
func NewLogManager(dir string) (*LogManager, error) {
log, err := NewLog(dir)
if err != nil {
return nil, err
}
lm := &LogManager{
Log: log,
}
// Initialize LEO from the active segment's NextOffset (restart scenario)
if log.activeSegment != nil {
lm.leo.Store(log.activeSegment.NextOffset)
}
return lm, nil
}
LEO and high watermark accessors
func (l *LogManager) LEO() uint64 {
return l.leo.Load()
}
func (l *LogManager) SetLEO(leo uint64) {
l.leo.Store(leo)
}
func (l *LogManager) HighWatermark() uint64 {
return l.highWatermark.Load()
}
func (l *LogManager) SetHighWatermark(highWatermark uint64) {
l.highWatermark.Store(highWatermark)
}
Append and batch append (advancing LEO)
func (l *LogManager) Append(value []byte) (uint64, error) {
offset, err := l.Log.Append(value)
if err != nil {
return 0, err
}
// LEO is the next offset to write, so after writing at offset N, LEO becomes N+1
l.leo.Store(offset + 1)
return offset, nil
}
AppendBatch for LogManager
func (l *LogManager) AppendBatch(values [][]byte) (uint64, error) {
baseOffset, err := l.Log.AppendBatch(values)
if err != nil {
return 0, err
}
l.leo.Store(baseOffset + uint64(len(values)))
return baseOffset, nil
}
Read with high watermark enforcement
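Normal consumers read only up to HW, inclusive. Because highWatermark starts at 0, offset 0 passes this check before anything has been committed. If that distinction matters, track HW as an exclusive bound (the offset after the last committed record) instead: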
func (l *LogManager) Read(offset uint64) ([]byte, error) {
hw := l.highWatermark.Load()
// Consumers should only read up to (and including) the high watermark
if offset > hw {
return nil, errs.ErrLogOffsetBeyondHWf(offset, hw)
}
return l.Log.Read(offset)
}
ReadUncommitted for replication
Follower replicas need to read beyond HW (up to LEO) to catch up:
func (l *LogManager) ReadUncommitted(offset uint64) ([]byte, error) {
return l.Log.Read(offset)
}
Step 12: Test your implementation
Verify your build
Run tests for both packages:
go test ./segment/... -v
go test ./log/... -v
Write tests that:
- Create a segment, append several records, and read each by offset.
- Verify the sparse index produces correct results (append more than 4 KB of data to trigger index entries beyond the first).
- Create a Log, append enough data to trigger segment rotation, and read across segments.
- Test LogManager: append, read within HW, verify ReadUncommitted works beyond HW, advance HW and re-read.
- Test recovery: create a segment, close it, reopen with LoadExistingSegment, and verify NextOffset and reads are correct.
Summary
| Component | What it does |
|---|---|
| Record format | [Offset 8][Length 4][Value] — big-endian, self-describing. |
| Segment | One .log file (append-only) + one .idx file (sparse index). Append encodes and writes with writeFull; Read bulk-reads and scans. atomic.Int64 writePos for lock-free reads. |
| Index | Fixed 12-byte entries (relative offset + position). Memory-mapped with gommap. Binary search for lookups. Grows dynamically by doubling. |
| Log | Multiple segments sorted by base offset. Binary search to find segments. Append to active segment; roll when full. |
| LogManager | Wraps Log with atomic.Uint64 LEO and HW. Read enforces HW; ReadUncommitted goes up to LEO. Batch append for replication. |
With the storage layer complete, the next page builds the wire protocol — the frame format and message types that let nodes and clients communicate over TCP.