wal

package
v0.0.0-...-c0d7cef Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 6, 2025 License: MIT Imports: 20 Imported by: 0

Documentation

Overview

Package wal implements a bulletproof Write-Ahead Log for audit logging.

Index

Constants

View Source
const (
	// JournalMagic is the magic bytes for journal files.
	JournalMagic = 0x4A524E4C // "JRNL"
	// StatusIncomplete indicates a journal entry that has not completed writing
	StatusIncomplete = 0
	// StatusComplete indicates a journal entry that has been fully written
	StatusComplete = 1
	// StatusApplied indicates a journal entry that has been applied to the main file
	StatusApplied = 2
)
View Source
const (
	// MagicHeader identifies the start of a WAL record
	MagicHeader = 0x4D544C47 // "MTLG" in hex
	// MagicFooter identifies the end of a WAL record
	MagicFooter = 0x454E4452 // "ENDR" in hex
	// Version is the WAL format version
	Version = 1

	// RecordFlagDeleted marks a record as deleted.
	RecordFlagDeleted = 1 << 0 // Record has been marked for deletion
)
View Source
const RecordFlagCompacted = 1 << 1

RecordFlagCompacted marks a record as part of a compacted segment

Variables

This section is empty.

Functions

func VerifyIntegrity

func VerifyIntegrity(data []byte, checksums map[ChecksumType]uint64) error

VerifyIntegrity performs multi-level integrity verification

Types

type BlockChecksum

type BlockChecksum struct {
	BlockSize int
	Type      ChecksumType
}

BlockChecksum calculates checksums for data blocks

func (*BlockChecksum) CalculateBlocks

func (bc *BlockChecksum) CalculateBlocks(data []byte) []uint64

CalculateBlocks returns checksums for each block of data

func (*BlockChecksum) VerifyBlocks

func (bc *BlockChecksum) VerifyBlocks(data []byte, checksums []uint64) (int, error)

VerifyBlocks verifies checksums for each block of data

type CRC32CChecksum

type CRC32CChecksum struct{}

CRC32CChecksum implements the CRC32C (Castagnoli) checksum. It is hardware-accelerated on modern CPUs.

func (*CRC32CChecksum) Calculate

func (c *CRC32CChecksum) Calculate(data []byte) uint64

Calculate computes the CRC32C checksum of the given data.

func (*CRC32CChecksum) Name

func (c *CRC32CChecksum) Name() string

Name returns the checksum algorithm name.

func (*CRC32CChecksum) Verify

func (c *CRC32CChecksum) Verify(data []byte, expected uint64) bool

Verify checks if the data matches the expected CRC32C checksum.

type CRC32Checksum

type CRC32Checksum struct{}

CRC32Checksum implements CRC32 (IEEE) checksum

func (*CRC32Checksum) Calculate

func (c *CRC32Checksum) Calculate(data []byte) uint64

Calculate computes the CRC32 checksum of the given data.

func (*CRC32Checksum) Name

func (c *CRC32Checksum) Name() string

Name returns the checksum algorithm name.

func (*CRC32Checksum) Verify

func (c *CRC32Checksum) Verify(data []byte, expected uint64) bool

Verify checks if the data matches the expected CRC32 checksum.

type CRC64Checksum

type CRC64Checksum struct{}

CRC64Checksum implements CRC64 (ISO) checksum

func (*CRC64Checksum) Calculate

func (c *CRC64Checksum) Calculate(data []byte) uint64

Calculate computes the CRC64 checksum of the given data.

func (*CRC64Checksum) Name

func (c *CRC64Checksum) Name() string

Name returns the checksum algorithm name.

func (*CRC64Checksum) Verify

func (c *CRC64Checksum) Verify(data []byte, expected uint64) bool

Verify checks if the data matches the expected CRC64 checksum.

type Checksum

type Checksum interface {
	// Calculate returns the checksum of data
	Calculate(data []byte) uint64
	// Verify checks if data matches the expected checksum
	Verify(data []byte, expected uint64) bool
	// Name returns the algorithm name
	Name() string
}

Checksum is the common interface implemented by each checksum algorithm in this package (CRC32, CRC32C, CRC64, xxHash, and the composite checksum).

func NewChecksum

func NewChecksum(typ ChecksumType) Checksum

NewChecksum creates a checksum calculator for the specified type

type ChecksumError

type ChecksumError struct {
	Type     ChecksumType
	Expected uint64
	Actual   uint64
}

ChecksumError represents a checksum mismatch error

func (*ChecksumError) Error

func (e *ChecksumError) Error() string

type ChecksumType

type ChecksumType int

ChecksumType represents different checksum algorithms

const (
	// ChecksumCRC32 is the CRC32 (IEEE) checksum algorithm
	ChecksumCRC32 ChecksumType = iota
	// ChecksumCRC32C is the CRC32C (Castagnoli) checksum algorithm - hardware accelerated
	ChecksumCRC32C
	// ChecksumCRC64 is the CRC64 (ISO) checksum algorithm
	ChecksumCRC64
	// ChecksumXXHash3 is the XXHash64 non-cryptographic hash algorithm
	ChecksumXXHash3
)

type CompactionPolicy

type CompactionPolicy struct {
	// MinSegments is the minimum number of segments to trigger compaction
	MinSegments int
	// MaxSegmentAge is the maximum age before a segment is compacted
	MaxSegmentAge time.Duration
	// MinSegmentSize is the minimum size for a segment to be considered for compaction
	MinSegmentSize int64
	// TargetSegmentSize is the target size for compacted segments
	TargetSegmentSize int64
	// RetentionPeriod is how long to keep segments before deletion
	RetentionPeriod time.Duration
	// CompactRatio is the minimum ratio of live/total data to trigger compaction
	CompactRatio float64
}

CompactionPolicy defines when and how to compact segments

func DefaultCompactionPolicy

func DefaultCompactionPolicy() *CompactionPolicy

DefaultCompactionPolicy returns a sensible default compaction policy

type CompactionStats

type CompactionStats struct {
	LastCompactionTime     time.Time
	LastAnalyzedTime       time.Time
	LastAnalyzedSegment    string
	Errors                 []error
	CompactionsRun         int
	SegmentsCompacted      int
	BytesCompacted         int64
	BytesReclaimed         int64
	LastCompactionDuration time.Duration
	LastAnalyzedRatio      float64
	LastAnalyzedDeleted    int
	LastAnalyzedSuperseded int
}

CompactionStats tracks compaction statistics

type Compactor

type Compactor struct {
	// contains filtered or unexported fields
}

Compactor handles segment compaction for the WAL

func NewCompactor

func NewCompactor(wal *WAL, policy *CompactionPolicy) *Compactor

NewCompactor creates a new segment compactor

func (*Compactor) Compact

func (c *Compactor) Compact() error

Compact performs a compaction cycle

func (*Compactor) CompactRange

func (c *Compactor) CompactRange(startSeq, endSeq uint64) error

CompactRange compacts segments within a sequence range

func (*Compactor) EstimateCompactionGain

func (c *Compactor) EstimateCompactionGain() (int64, error)

EstimateCompactionGain estimates bytes that would be reclaimed by compaction

func (*Compactor) ForceCompact

func (c *Compactor) ForceCompact() error

ForceCompact triggers an immediate compaction

func (*Compactor) GetStats

func (c *Compactor) GetStats() CompactionStats

GetStats returns compaction statistics

func (*Compactor) MarkDeleted

func (c *Compactor) MarkDeleted(sequence uint64) error

MarkDeleted marks a record as deleted for later compaction

func (*Compactor) Start

func (c *Compactor) Start() error

Start begins the background compaction process

func (*Compactor) Stop

func (c *Compactor) Stop() error

Stop halts the background compaction process

func (*Compactor) VacuumDeleted

func (c *Compactor) VacuumDeleted() error

VacuumDeleted removes deleted records from segments

type CompositeChecksum

type CompositeChecksum struct {
	// contains filtered or unexported fields
}

CompositeChecksum combines multiple checksums for extra protection

func NewCompositeChecksum

func NewCompositeChecksum(primary, secondary ChecksumType) *CompositeChecksum

NewCompositeChecksum creates a checksum that uses two algorithms

func (*CompositeChecksum) Calculate

func (c *CompositeChecksum) Calculate(data []byte) uint64

Calculate computes the composite checksum using both algorithms.

func (*CompositeChecksum) Name

func (c *CompositeChecksum) Name() string

Name returns the composite checksum algorithm name.

func (*CompositeChecksum) Verify

func (c *CompositeChecksum) Verify(data []byte, expected uint64) bool

Verify checks if the data matches the expected composite checksum.

type DoubleWriteBuffer

type DoubleWriteBuffer struct {
	// contains filtered or unexported fields
}

DoubleWriteBuffer implements torn-write protection using a journal. It ensures that writes are atomic even if the process crashes mid-write.

func NewDoubleWriteBuffer

func NewDoubleWriteBuffer(journal *os.File, bufferSize int) (*DoubleWriteBuffer, error)

NewDoubleWriteBuffer creates a new double-write buffer

func (*DoubleWriteBuffer) Clear

func (d *DoubleWriteBuffer) Clear() error

Clear truncates the journal file

func (*DoubleWriteBuffer) Compact

func (d *DoubleWriteBuffer) Compact() error

Compact removes completed entries from the journal

func (*DoubleWriteBuffer) MarkComplete

func (d *DoubleWriteBuffer) MarkComplete(needsSync bool) error

MarkComplete marks the last journal entry as complete. The needsSync parameter controls whether to sync immediately (for durability).

func (*DoubleWriteBuffer) MarkIncomplete

func (d *DoubleWriteBuffer) MarkIncomplete() error

MarkIncomplete marks the last journal entry as incomplete

func (*DoubleWriteBuffer) RecoverIncomplete

func (d *DoubleWriteBuffer) RecoverIncomplete() ([]JournalEntry, error)

RecoverIncomplete reads any incomplete journal entries that need to be replayed

func (*DoubleWriteBuffer) WriteToJournal

func (d *DoubleWriteBuffer) WriteToJournal(data []byte, position int64, needsSync bool) error

WriteToJournal writes data to the journal for torn-write protection. The needsSync parameter controls whether to sync immediately (for durability).

type Index

type Index struct {
	// contains filtered or unexported fields
}

Index provides fast lookups for WAL segments and records. It maintains an in-memory index of segments with their sequence ranges and timestamps for efficient time-based queries.

func NewIndex

func NewIndex(path string) *Index

NewIndex creates a new WAL index

func (*Index) AddEntry

func (idx *Index) AddEntry(entry IndexEntry)

AddEntry adds a new entry to the index

func (*Index) Build

func (idx *Index) Build(segments []*Segment) error

Build scans all WAL segments and builds the index

func (*Index) FindBySequence

func (idx *Index) FindBySequence(seq uint64) (*IndexEntry, error)

FindBySequence locates a record by sequence number

func (*Index) FindBySequenceExcludeDeleted

func (idx *Index) FindBySequenceExcludeDeleted(seq uint64) (*IndexEntry, error)

FindBySequenceExcludeDeleted locates a record by sequence number, excluding deleted records

func (*Index) FindByTimeRange

func (idx *Index) FindByTimeRange(start, end time.Time) ([]IndexEntry, error)

FindByTimeRange returns all records within a time range

func (*Index) FindByTimeRangeExcludeDeleted

func (idx *Index) FindByTimeRangeExcludeDeleted(start, end time.Time) ([]IndexEntry, error)

FindByTimeRangeExcludeDeleted returns all non-deleted records within a time range

func (*Index) GetSegmentInfo

func (idx *Index) GetSegmentInfo() []SegmentInfo

GetSegmentInfo returns information about all segments

func (*Index) GetSequenceRange

func (idx *Index) GetSequenceRange() (minSeq, maxSeq uint64)

GetSequenceRange returns the min and max sequence numbers in the index

func (*Index) Load

func (idx *Index) Load() error

Load loads the index from disk

func (*Index) RemoveSegment

func (idx *Index) RemoveSegment(path string)

RemoveSegment removes a segment from the index

type IndexEntry

type IndexEntry struct {
	Timestamp time.Time
	Segment   string
	Sequence  uint64
	Offset    int64
	Size      int32
	Checksum  uint32
	Flags     uint16
}

IndexEntry represents a single record's location in the WAL

type IntegrityReport

type IntegrityReport struct {
	LastTimestamp     time.Time
	TotalRecords      int
	CorruptedSegments int
	RecoveredRecords  int
	LastSequence      uint64
	Valid             bool
}

IntegrityReport contains the results of a WAL integrity check.

type JournalEntry

type JournalEntry struct {
	Data     []byte
	Position int64
	Magic    uint32
	Length   uint32
	CRC32    uint32
	Status   uint8
}

JournalEntry represents a write that needs to be made atomic

type Option

type Option func(*config) error

Option configures the WAL.

func WithSegmentSize

func WithSegmentSize(size int64) Option

WithSegmentSize sets the maximum size of a WAL segment before rotation.

func WithSyncInterval

func WithSyncInterval(interval time.Duration) Option

WithSyncInterval sets the sync interval for SyncInterval mode.

func WithSyncMode

func WithSyncMode(mode SyncMode) Option

WithSyncMode sets when the WAL syncs to disk.

type Reader

type Reader struct {
	// contains filtered or unexported fields
}

Reader reads events from a WAL file

func NewReader

func NewReader(path string) (*Reader, error)

NewReader creates a new WAL reader

func (*Reader) Close

func (r *Reader) Close() error

Close closes the reader

func (*Reader) GetOffset

func (r *Reader) GetOffset() int64

GetOffset returns the current offset

func (*Reader) ReadAll

func (r *Reader) ReadAll() ([]*core.LogEvent, error)

ReadAll reads all events from the WAL

func (*Reader) ReadNext

func (r *Reader) ReadNext() (*core.LogEvent, error)

ReadNext reads the next event from the WAL

func (*Reader) ReadRange

func (r *Reader) ReadRange(start, end time.Time) ([]*core.LogEvent, error)

ReadRange reads events within a time range

func (*Reader) Seek

func (r *Reader) Seek(offset int64, whence int) (int64, error)

Seek seeks to a specific offset in the WAL

type Record

type Record struct {
	EventData   []byte
	Timestamp   int64
	Sequence    uint64
	Magic       uint32
	Length      uint32
	CRC32Header uint32
	CRC32Data   uint32
	MagicEnd    uint32
	Version     uint16
	Flags       uint16
	PrevHash    [32]byte
}

Record represents a single entry in the WAL.

func NewRecord

func NewRecord(event *core.LogEvent, sequence uint64, prevHash [32]byte) (*Record, error)

NewRecord creates a new WAL record from a log event.

func UnmarshalRecord

func UnmarshalRecord(data []byte) (*Record, error)

UnmarshalRecord deserializes a record from bytes with CRC verification.

func UnmarshalRecordFromBytes

func UnmarshalRecordFromBytes(data []byte) (*Record, int, error)

UnmarshalRecordFromBytes unmarshals a record from bytes and returns bytes read

func (*Record) ComputeHash

func (r *Record) ComputeHash() [32]byte

ComputeHash calculates the SHA256 hash of the record for chaining.

func (*Record) GetEvent

func (r *Record) GetEvent() (*core.LogEvent, error)

GetEvent deserializes the event data back to a LogEvent.

func (*Record) Marshal

func (r *Record) Marshal() ([]byte, error)

Marshal serializes the record to bytes with CRC protection.

type RecoveredRecord

type RecoveredRecord struct {
	EventData      []byte
	Sequence       uint64
	Timestamp      uint64
	BytesRead      int
	HashChainValid bool
}

RecoveredRecord contains data from a recovered WAL record.

type RecoveryEngine

type RecoveryEngine struct {
	// contains filtered or unexported fields
}

RecoveryEngine handles WAL recovery after corruption or crashes.

func NewRecoveryEngine

func NewRecoveryEngine(path string, opts ...RecoveryOption) *RecoveryEngine

NewRecoveryEngine creates a new recovery engine for a WAL file.

func (*RecoveryEngine) ForensicRecover

func (r *RecoveryEngine) ForensicRecover() (*RecoveryReport, [][]byte, error)

ForensicRecover performs advanced forensic recovery with hash chain reconstruction

func (*RecoveryEngine) Recover

func (r *RecoveryEngine) Recover() (*RecoveryReport, [][]byte, error)

Recover attempts to recover all valid records from the WAL.

func (*RecoveryEngine) RecoverSegments

func (r *RecoveryEngine) RecoverSegments(segments []*Segment) (*RecoveryReport, [][]byte, error)

RecoverSegments recovers records from multiple WAL segments.

func (*RecoveryEngine) RepairWAL

func (r *RecoveryEngine) RepairWAL(outputPath string) error

RepairWAL attempts to repair a corrupted WAL by recovering valid records and writing them to a new file.

type RecoveryOption

type RecoveryOption func(*RecoveryEngine)

RecoveryOption configures the recovery engine.

func WithChecksumVerification

func WithChecksumVerification(verify bool) RecoveryOption

WithChecksumVerification enables or disables CRC32 verification during recovery.

func WithForensicRecovery

func WithForensicRecovery(enable bool) RecoveryOption

WithForensicRecovery enables or disables advanced forensic recovery techniques.

func WithMaxRecordSize

func WithMaxRecordSize(size int64) RecoveryOption

WithMaxRecordSize sets the maximum expected record size.

func WithSkipCorrupted

func WithSkipCorrupted(skip bool) RecoveryOption

WithSkipCorrupted allows recovery to continue past corrupted records.

type RecoveryReport

type RecoveryReport struct {
	RecoveredSegments   []string
	Errors              []error
	RecoveryMethods     []string
	TotalRecords        int
	RecoveredRecords    int
	CorruptedRecords    int
	SkippedBytes        int64
	LastGoodSequence    uint64
	HashChainBreaks     int
	ReconstructedChains int
	PartialRecords      int
}

RecoveryReport contains the results of a recovery operation.

type RollingChecksum

type RollingChecksum struct {
	// contains filtered or unexported fields
}

RollingChecksum provides a rolling checksum for streaming data

func NewRollingChecksum

func NewRollingChecksum(windowSize int, typ ChecksumType) *RollingChecksum

NewRollingChecksum creates a rolling checksum with specified window size

func (*RollingChecksum) GetChecksum

func (rc *RollingChecksum) GetChecksum() uint64

GetChecksum returns the current rolling checksum

func (*RollingChecksum) Update

func (rc *RollingChecksum) Update(b byte) uint64

Update adds byte b to the rolling window, updating the checksum incrementally.

type Segment

type Segment struct {
	CreatedAt time.Time
	Path      string
	StartSeq  uint64
	EndSeq    uint64
	Size      int64
	Version   uint16
	Sealed    bool
	Corrupted bool
}

Segment represents a single WAL segment file.

type SegmentInfo

type SegmentInfo struct {
	StartTime   time.Time
	EndTime     time.Time
	Path        string
	StartSeq    uint64
	EndSeq      uint64
	Size        int64
	RecordCount int
	Sealed      bool
	Corrupted   bool
}

SegmentInfo contains metadata about a WAL segment

type SegmentManager

type SegmentManager struct {
	// contains filtered or unexported fields
}

SegmentManager handles multiple WAL segments.

func NewSegmentManager

func NewSegmentManager(walPath string, maxSize int64) (*SegmentManager, error)

NewSegmentManager creates a new segment manager.

func (*SegmentManager) AddCompactedSegment

func (sm *SegmentManager) AddCompactedSegment(seg *Segment) error

AddCompactedSegment adds a compacted segment to the manager

func (*SegmentManager) GetActivePath

func (sm *SegmentManager) GetActivePath() string

GetActivePath returns the path to the active segment.

func (*SegmentManager) GetAllSegments

func (sm *SegmentManager) GetAllSegments() []*Segment

GetAllSegments returns all segments managed by the segment manager

func (*SegmentManager) GetSegments

func (sm *SegmentManager) GetSegments() []*Segment

GetSegments returns all known segments.

func (*SegmentManager) GetSegmentsInRange

func (sm *SegmentManager) GetSegmentsInRange(startSeq, endSeq uint64) []*Segment

GetSegmentsInRange returns segments within a sequence range

func (*SegmentManager) ReadAllSegments

func (sm *SegmentManager) ReadAllSegments() ([][]byte, error)

ReadAllSegments reads all events from all segments in order.

func (*SegmentManager) RemoveSegment

func (sm *SegmentManager) RemoveSegment(seg *Segment) error

RemoveSegment removes a segment from the manager

func (*SegmentManager) Rotate

func (sm *SegmentManager) Rotate(currentSeq uint64) (string, error)

Rotate creates a new segment and seals the current one.

func (*SegmentManager) ShouldRotate

func (sm *SegmentManager) ShouldRotate(currentSize int64) bool

ShouldRotate checks if the current segment should be rotated.

func (*SegmentManager) UpdateSegmentSizes

func (sm *SegmentManager) UpdateSegmentSizes() error

UpdateSegmentSizes refreshes the size information for all segments

type SyncMode

type SyncMode int

SyncMode defines when the WAL syncs to disk.

const (
	// SyncImmediate syncs after every write (safest, slowest)
	SyncImmediate SyncMode = iota
	// SyncInterval syncs periodically
	SyncInterval
	// SyncBatch syncs after a batch of writes
	SyncBatch
)

type WAL

type WAL struct {
	// contains filtered or unexported fields
}

WAL implements a Write-Ahead Log with guaranteed durability.

func New

func New(path string, opts ...Option) (*WAL, error)

New creates a new WAL instance with guaranteed durability.

func (*WAL) Close

func (w *WAL) Close() error

Close gracefully shuts down the WAL.

func (*WAL) Flush

func (w *WAL) Flush() error

Flush forces any buffered data to disk.

func (*WAL) GetSegments

func (w *WAL) GetSegments() []*Segment

GetSegments returns all segments managed by this WAL.

func (*WAL) VerifyIntegrity

func (w *WAL) VerifyIntegrity() error

VerifyIntegrity checks the integrity of the entire WAL.

func (*WAL) VerifyIntegrityReport

func (w *WAL) VerifyIntegrityReport() (*IntegrityReport, error)

VerifyIntegrityReport performs a detailed integrity check.

func (*WAL) Write

func (w *WAL) Write(event *core.LogEvent) error

Write appends a log event to the WAL. How soon the event is synced to disk depends on the configured SyncMode (see SyncImmediate, SyncInterval, and SyncBatch).

type XXHash3Checksum

type XXHash3Checksum struct{}

XXHash3Checksum implements xxHash, an extremely fast non-cryptographic hash. Note: despite the type name, this is the XXHash64 algorithm (xxHash v2), not XXH3; it is production-ready and widely used.

func (*XXHash3Checksum) Calculate

func (c *XXHash3Checksum) Calculate(data []byte) uint64

Calculate computes the xxHash checksum of the given data.

func (*XXHash3Checksum) Name

func (c *XXHash3Checksum) Name() string

Name returns the checksum algorithm name.

func (*XXHash3Checksum) Verify

func (c *XXHash3Checksum) Verify(data []byte, expected uint64) bool

Verify checks if the data matches the expected xxHash checksum.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL