Documentation
¶
Overview ¶
Package wal implements a bulletproof Write-Ahead Log for audit logging.
Index ¶
- Constants
- func VerifyIntegrity(data []byte, checksums map[ChecksumType]uint64) error
- type BlockChecksum
- type CRC32CChecksum
- type CRC32Checksum
- type CRC64Checksum
- type Checksum
- type ChecksumError
- type ChecksumType
- type CompactionPolicy
- type CompactionStats
- type Compactor
- func (c *Compactor) Compact() error
- func (c *Compactor) CompactRange(startSeq, endSeq uint64) error
- func (c *Compactor) EstimateCompactionGain() (int64, error)
- func (c *Compactor) ForceCompact() error
- func (c *Compactor) GetStats() CompactionStats
- func (c *Compactor) MarkDeleted(sequence uint64) error
- func (c *Compactor) Start() error
- func (c *Compactor) Stop() error
- func (c *Compactor) VacuumDeleted() error
- type CompositeChecksum
- type DoubleWriteBuffer
- func (d *DoubleWriteBuffer) Clear() error
- func (d *DoubleWriteBuffer) Compact() error
- func (d *DoubleWriteBuffer) MarkComplete(needsSync bool) error
- func (d *DoubleWriteBuffer) MarkIncomplete() error
- func (d *DoubleWriteBuffer) RecoverIncomplete() ([]JournalEntry, error)
- func (d *DoubleWriteBuffer) WriteToJournal(data []byte, position int64, needsSync bool) error
- type Index
- func (idx *Index) AddEntry(entry IndexEntry)
- func (idx *Index) Build(segments []*Segment) error
- func (idx *Index) FindBySequence(seq uint64) (*IndexEntry, error)
- func (idx *Index) FindBySequenceExcludeDeleted(seq uint64) (*IndexEntry, error)
- func (idx *Index) FindByTimeRange(start, end time.Time) ([]IndexEntry, error)
- func (idx *Index) FindByTimeRangeExcludeDeleted(start, end time.Time) ([]IndexEntry, error)
- func (idx *Index) GetSegmentInfo() []SegmentInfo
- func (idx *Index) GetSequenceRange() (minSeq, maxSeq uint64)
- func (idx *Index) Load() error
- func (idx *Index) RemoveSegment(path string)
- type IndexEntry
- type IntegrityReport
- type JournalEntry
- type Option
- type Reader
- func (r *Reader) Close() error
- func (r *Reader) GetOffset() int64
- func (r *Reader) ReadAll() ([]*core.LogEvent, error)
- func (r *Reader) ReadNext() (*core.LogEvent, error)
- func (r *Reader) ReadRange(start, end time.Time) ([]*core.LogEvent, error)
- func (r *Reader) Seek(offset int64, whence int) (int64, error)
- type Record
- type RecoveredRecord
- type RecoveryEngine
- func (r *RecoveryEngine) ForensicRecover() (*RecoveryReport, [][]byte, error)
- func (r *RecoveryEngine) Recover() (*RecoveryReport, [][]byte, error)
- func (r *RecoveryEngine) RecoverSegments(segments []*Segment) (*RecoveryReport, [][]byte, error)
- func (r *RecoveryEngine) RepairWAL(outputPath string) error
- type RecoveryOption
- type RecoveryReport
- type RollingChecksum
- type Segment
- type SegmentInfo
- type SegmentManager
- func (sm *SegmentManager) AddCompactedSegment(seg *Segment) error
- func (sm *SegmentManager) GetActivePath() string
- func (sm *SegmentManager) GetAllSegments() []*Segment
- func (sm *SegmentManager) GetSegments() []*Segment
- func (sm *SegmentManager) GetSegmentsInRange(startSeq, endSeq uint64) []*Segment
- func (sm *SegmentManager) ReadAllSegments() ([][]byte, error)
- func (sm *SegmentManager) RemoveSegment(seg *Segment) error
- func (sm *SegmentManager) Rotate(currentSeq uint64) (string, error)
- func (sm *SegmentManager) ShouldRotate(currentSize int64) bool
- func (sm *SegmentManager) UpdateSegmentSizes() error
- type SyncMode
- type WAL
- type XXHash3Checksum
Constants ¶
const ( // JournalMagic is the magic bytes for journal files. JournalMagic = 0x4A524E4C // "JRNL" // StatusIncomplete indicates a journal entry that has not completed writing StatusIncomplete = 0 // StatusComplete indicates a journal entry that has been fully written StatusComplete = 1 // StatusApplied indicates a journal entry that has been applied to the main file StatusApplied = 2 )
const ( // MagicHeader identifies the start of a WAL record MagicHeader = 0x4D544C47 // "MTLG" in hex MagicFooter = 0x454E4452 // "ENDR" in hex // Version is the WAL format version Version = 1 // RecordFlagDeleted marks a record as deleted. RecordFlagDeleted = 1 << 0 // Record has been marked for deletion )
const RecordFlagCompacted = 1 << 1
RecordFlagCompacted marks a record as part of a compacted segment
Variables ¶
This section is empty.
Functions ¶
func VerifyIntegrity ¶
func VerifyIntegrity(data []byte, checksums map[ChecksumType]uint64) error
VerifyIntegrity performs multi-level integrity verification
Types ¶
type BlockChecksum ¶
type BlockChecksum struct {
BlockSize int
Type ChecksumType
}
BlockChecksum calculates checksums for data blocks
func (*BlockChecksum) CalculateBlocks ¶
func (bc *BlockChecksum) CalculateBlocks(data []byte) []uint64
CalculateBlocks returns checksums for each block of data
func (*BlockChecksum) VerifyBlocks ¶
func (bc *BlockChecksum) VerifyBlocks(data []byte, checksums []uint64) (int, error)
VerifyBlocks verifies checksums for each block of data
type CRC32CChecksum ¶
type CRC32CChecksum struct{}
CRC32CChecksum implements CRC32C (Castagnoli) checksum This is hardware-accelerated on modern CPUs
func (*CRC32CChecksum) Calculate ¶
func (c *CRC32CChecksum) Calculate(data []byte) uint64
Calculate computes the CRC32C checksum of the given data.
func (*CRC32CChecksum) Name ¶
func (c *CRC32CChecksum) Name() string
Name returns the checksum algorithm name.
type CRC32Checksum ¶
type CRC32Checksum struct{}
CRC32Checksum implements CRC32 (IEEE) checksum
func (*CRC32Checksum) Calculate ¶
func (c *CRC32Checksum) Calculate(data []byte) uint64
Calculate computes the CRC32 checksum of the given data.
func (*CRC32Checksum) Name ¶
func (c *CRC32Checksum) Name() string
Name returns the checksum algorithm name.
type CRC64Checksum ¶
type CRC64Checksum struct{}
CRC64Checksum implements CRC64 (ISO) checksum
func (*CRC64Checksum) Calculate ¶
func (c *CRC64Checksum) Calculate(data []byte) uint64
Calculate computes the CRC64 checksum of the given data.
func (*CRC64Checksum) Name ¶
func (c *CRC64Checksum) Name() string
Name returns the checksum algorithm name.
type Checksum ¶
type Checksum interface {
// Calculate returns the checksum of data
Calculate(data []byte) uint64
// Verify checks if data matches the expected checksum
Verify(data []byte, expected uint64) bool
// Name returns the algorithm name
Name() string
}
Checksum provides multiple checksum algorithms for data integrity
func NewChecksum ¶
func NewChecksum(typ ChecksumType) Checksum
NewChecksum creates a checksum calculator for the specified type
type ChecksumError ¶
type ChecksumError struct {
Type ChecksumType
Expected uint64
Actual uint64
}
ChecksumError represents a checksum mismatch error
func (*ChecksumError) Error ¶
func (e *ChecksumError) Error() string
type ChecksumType ¶
type ChecksumType int
ChecksumType represents different checksum algorithms
const ( // ChecksumCRC32 is the CRC32 (IEEE) checksum algorithm ChecksumCRC32 ChecksumType = iota // ChecksumCRC32C is the CRC32C (Castagnoli) checksum algorithm - hardware accelerated ChecksumCRC32C // ChecksumCRC64 is the CRC64 (ISO) checksum algorithm ChecksumCRC64 // ChecksumXXHash3 is the XXHash64 non-cryptographic hash algorithm ChecksumXXHash3 )
type CompactionPolicy ¶
type CompactionPolicy struct {
// MinSegments is the minimum number of segments to trigger compaction
MinSegments int
// MaxSegmentAge is the maximum age before a segment is compacted
MaxSegmentAge time.Duration
// MinSegmentSize is the minimum size for a segment to be considered for compaction
MinSegmentSize int64
// TargetSegmentSize is the target size for compacted segments
TargetSegmentSize int64
// RetentionPeriod is how long to keep segments before deletion
RetentionPeriod time.Duration
// CompactRatio is the minimum ratio of live/total data to trigger compaction
CompactRatio float64
}
CompactionPolicy defines when and how to compact segments
func DefaultCompactionPolicy ¶
func DefaultCompactionPolicy() *CompactionPolicy
DefaultCompactionPolicy returns a sensible default compaction policy
type CompactionStats ¶
type CompactionStats struct {
LastCompactionTime time.Time
LastAnalyzedTime time.Time
LastAnalyzedSegment string
Errors []error
CompactionsRun int
SegmentsCompacted int
BytesCompacted int64
BytesReclaimed int64
LastCompactionDuration time.Duration
LastAnalyzedRatio float64
LastAnalyzedDeleted int
LastAnalyzedSuperseded int
}
CompactionStats tracks compaction statistics
type Compactor ¶
type Compactor struct {
// contains filtered or unexported fields
}
Compactor handles segment compaction for the WAL
func NewCompactor ¶
func NewCompactor(wal *WAL, policy *CompactionPolicy) *Compactor
NewCompactor creates a new segment compactor
func (*Compactor) CompactRange ¶
CompactRange compacts segments within a sequence range
func (*Compactor) EstimateCompactionGain ¶
EstimateCompactionGain estimates bytes that would be reclaimed by compaction
func (*Compactor) ForceCompact ¶
ForceCompact triggers an immediate compaction
func (*Compactor) GetStats ¶
func (c *Compactor) GetStats() CompactionStats
GetStats returns compaction statistics
func (*Compactor) MarkDeleted ¶
MarkDeleted marks a record as deleted for later compaction
func (*Compactor) VacuumDeleted ¶
VacuumDeleted removes deleted records from segments
type CompositeChecksum ¶
type CompositeChecksum struct {
// contains filtered or unexported fields
}
CompositeChecksum combines multiple checksums for extra protection
func NewCompositeChecksum ¶
func NewCompositeChecksum(primary, secondary ChecksumType) *CompositeChecksum
NewCompositeChecksum creates a checksum that uses two algorithms
func (*CompositeChecksum) Calculate ¶
func (c *CompositeChecksum) Calculate(data []byte) uint64
Calculate computes the composite checksum using both algorithms.
func (*CompositeChecksum) Name ¶
func (c *CompositeChecksum) Name() string
Name returns the composite checksum algorithm name.
type DoubleWriteBuffer ¶
type DoubleWriteBuffer struct {
// contains filtered or unexported fields
}
DoubleWriteBuffer implements torn-write protection using a journal It ensures that writes are atomic even if the process crashes mid-write
func NewDoubleWriteBuffer ¶
func NewDoubleWriteBuffer(journal *os.File, bufferSize int) (*DoubleWriteBuffer, error)
NewDoubleWriteBuffer creates a new double-write buffer
func (*DoubleWriteBuffer) Clear ¶
func (d *DoubleWriteBuffer) Clear() error
Clear truncates the journal file
func (*DoubleWriteBuffer) Compact ¶
func (d *DoubleWriteBuffer) Compact() error
Compact removes completed entries from the journal
func (*DoubleWriteBuffer) MarkComplete ¶
func (d *DoubleWriteBuffer) MarkComplete(needsSync bool) error
MarkComplete marks the last journal entry as complete The needsSync parameter controls whether to sync immediately (for durability)
func (*DoubleWriteBuffer) MarkIncomplete ¶
func (d *DoubleWriteBuffer) MarkIncomplete() error
MarkIncomplete marks the last journal entry as incomplete
func (*DoubleWriteBuffer) RecoverIncomplete ¶
func (d *DoubleWriteBuffer) RecoverIncomplete() ([]JournalEntry, error)
RecoverIncomplete reads any incomplete journal entries that need to be replayed
func (*DoubleWriteBuffer) WriteToJournal ¶
func (d *DoubleWriteBuffer) WriteToJournal(data []byte, position int64, needsSync bool) error
WriteToJournal writes data to the journal for torn-write protection The needsSync parameter controls whether to sync immediately (for durability)
type Index ¶
type Index struct {
// contains filtered or unexported fields
}
Index provides fast lookups for WAL segments and records. It maintains an in-memory index of segments with their sequence ranges and timestamps for efficient time-based queries.
func (*Index) AddEntry ¶
func (idx *Index) AddEntry(entry IndexEntry)
AddEntry adds a new entry to the index
func (*Index) FindBySequence ¶
func (idx *Index) FindBySequence(seq uint64) (*IndexEntry, error)
FindBySequence locates a record by sequence number
func (*Index) FindBySequenceExcludeDeleted ¶
func (idx *Index) FindBySequenceExcludeDeleted(seq uint64) (*IndexEntry, error)
FindBySequenceExcludeDeleted locates a record by sequence number, excluding deleted records
func (*Index) FindByTimeRange ¶
func (idx *Index) FindByTimeRange(start, end time.Time) ([]IndexEntry, error)
FindByTimeRange returns all records within a time range
func (*Index) FindByTimeRangeExcludeDeleted ¶
func (idx *Index) FindByTimeRangeExcludeDeleted(start, end time.Time) ([]IndexEntry, error)
FindByTimeRangeExcludeDeleted returns all non-deleted records within a time range
func (*Index) GetSegmentInfo ¶
func (idx *Index) GetSegmentInfo() []SegmentInfo
GetSegmentInfo returns information about all segments
func (*Index) GetSequenceRange ¶
GetSequenceRange returns the min and max sequence numbers in the index
func (*Index) RemoveSegment ¶
RemoveSegment removes a segment from the index
type IndexEntry ¶
type IndexEntry struct {
Timestamp time.Time
Segment string
Sequence uint64
Offset int64
Size int32
Checksum uint32
Flags uint16
}
IndexEntry represents a single record's location in the WAL
type IntegrityReport ¶
type IntegrityReport struct {
LastTimestamp time.Time
TotalRecords int
CorruptedSegments int
RecoveredRecords int
LastSequence uint64
Valid bool
}
IntegrityReport contains the results of a WAL integrity check.
type JournalEntry ¶
type JournalEntry struct {
Data []byte
Position int64
Magic uint32
Length uint32
CRC32 uint32
Status uint8
}
JournalEntry represents a write that needs to be made atomic
type Option ¶
type Option func(*config) error
Option configures the WAL.
func WithSegmentSize ¶
WithSegmentSize sets the maximum size of a WAL segment before rotation.
func WithSyncInterval ¶
WithSyncInterval sets the sync interval for SyncInterval mode.
func WithSyncMode ¶
WithSyncMode sets when the WAL syncs to disk.
type Reader ¶
type Reader struct {
// contains filtered or unexported fields
}
Reader reads events from a WAL file
type Record ¶
type Record struct {
EventData []byte
Timestamp int64
Sequence uint64
Magic uint32
Length uint32
CRC32Header uint32
CRC32Data uint32
MagicEnd uint32
Version uint16
Flags uint16
PrevHash [32]byte
}
Record represents a single entry in the WAL.
func UnmarshalRecord ¶
UnmarshalRecord deserializes a record from bytes with CRC verification.
func UnmarshalRecordFromBytes ¶
UnmarshalRecordFromBytes unmarshals a record from bytes and returns bytes read
func (*Record) ComputeHash ¶
ComputeHash calculates the SHA256 hash of the record for chaining.
type RecoveredRecord ¶
type RecoveredRecord struct {
EventData []byte
Sequence uint64
Timestamp uint64
BytesRead int
HashChainValid bool
}
RecoveredRecord contains data from a recovered WAL record.
type RecoveryEngine ¶
type RecoveryEngine struct {
// contains filtered or unexported fields
}
RecoveryEngine handles WAL recovery after corruption or crashes.
func NewRecoveryEngine ¶
func NewRecoveryEngine(path string, opts ...RecoveryOption) *RecoveryEngine
NewRecoveryEngine creates a new recovery engine for a WAL file.
func (*RecoveryEngine) ForensicRecover ¶
func (r *RecoveryEngine) ForensicRecover() (*RecoveryReport, [][]byte, error)
ForensicRecover performs advanced forensic recovery with hash chain reconstruction
func (*RecoveryEngine) Recover ¶
func (r *RecoveryEngine) Recover() (*RecoveryReport, [][]byte, error)
Recover attempts to recover all valid records from the WAL.
func (*RecoveryEngine) RecoverSegments ¶
func (r *RecoveryEngine) RecoverSegments(segments []*Segment) (*RecoveryReport, [][]byte, error)
RecoverSegments recovers records from multiple WAL segments.
func (*RecoveryEngine) RepairWAL ¶
func (r *RecoveryEngine) RepairWAL(outputPath string) error
RepairWAL attempts to repair a corrupted WAL by recovering valid records and writing them to a new file.
type RecoveryOption ¶
type RecoveryOption func(*RecoveryEngine)
RecoveryOption configures the recovery engine.
func WithChecksumVerification ¶
func WithChecksumVerification(verify bool) RecoveryOption
WithChecksumVerification enables CRC32 verification during recovery.
func WithForensicRecovery ¶
func WithForensicRecovery(enable bool) RecoveryOption
WithForensicRecovery enables advanced forensic recovery techniques
func WithMaxRecordSize ¶
func WithMaxRecordSize(size int64) RecoveryOption
WithMaxRecordSize sets the maximum expected record size.
func WithSkipCorrupted ¶
func WithSkipCorrupted(skip bool) RecoveryOption
WithSkipCorrupted allows recovery to continue past corrupted records.
type RecoveryReport ¶
type RecoveryReport struct {
RecoveredSegments []string
Errors []error
RecoveryMethods []string
TotalRecords int
RecoveredRecords int
CorruptedRecords int
SkippedBytes int64
LastGoodSequence uint64
HashChainBreaks int
ReconstructedChains int
PartialRecords int
}
RecoveryReport contains the results of a recovery operation.
type RollingChecksum ¶
type RollingChecksum struct {
// contains filtered or unexported fields
}
RollingChecksum provides a rolling checksum for streaming data
func NewRollingChecksum ¶
func NewRollingChecksum(windowSize int, typ ChecksumType) *RollingChecksum
NewRollingChecksum creates a rolling checksum with specified window size
func (*RollingChecksum) GetChecksum ¶
func (rc *RollingChecksum) GetChecksum() uint64
GetChecksum returns the current rolling checksum
func (*RollingChecksum) Update ¶
func (rc *RollingChecksum) Update(b byte) uint64
Update adds new data to the rolling window using incremental updates
type Segment ¶
type Segment struct {
CreatedAt time.Time
Path string
StartSeq uint64
EndSeq uint64
Size int64
Version uint16
Sealed bool
Corrupted bool
}
Segment represents a single WAL segment file.
type SegmentInfo ¶
type SegmentInfo struct {
StartTime time.Time
EndTime time.Time
Path string
StartSeq uint64
EndSeq uint64
Size int64
RecordCount int
Sealed bool
Corrupted bool
}
SegmentInfo contains metadata about a WAL segment
type SegmentManager ¶
type SegmentManager struct {
// contains filtered or unexported fields
}
SegmentManager handles multiple WAL segments.
func NewSegmentManager ¶
func NewSegmentManager(walPath string, maxSize int64) (*SegmentManager, error)
NewSegmentManager creates a new segment manager.
func (*SegmentManager) AddCompactedSegment ¶
func (sm *SegmentManager) AddCompactedSegment(seg *Segment) error
AddCompactedSegment adds a compacted segment to the manager
func (*SegmentManager) GetActivePath ¶
func (sm *SegmentManager) GetActivePath() string
GetActivePath returns the path to the active segment.
func (*SegmentManager) GetAllSegments ¶
func (sm *SegmentManager) GetAllSegments() []*Segment
GetAllSegments returns all segments managed by the segment manager
func (*SegmentManager) GetSegments ¶
func (sm *SegmentManager) GetSegments() []*Segment
GetSegments returns all known segments.
func (*SegmentManager) GetSegmentsInRange ¶
func (sm *SegmentManager) GetSegmentsInRange(startSeq, endSeq uint64) []*Segment
GetSegmentsInRange returns segments within a sequence range
func (*SegmentManager) ReadAllSegments ¶
func (sm *SegmentManager) ReadAllSegments() ([][]byte, error)
ReadAllSegments reads all events from all segments in order.
func (*SegmentManager) RemoveSegment ¶
func (sm *SegmentManager) RemoveSegment(seg *Segment) error
RemoveSegment removes a segment from the manager
func (*SegmentManager) Rotate ¶
func (sm *SegmentManager) Rotate(currentSeq uint64) (string, error)
Rotate creates a new segment and seals the current one.
func (*SegmentManager) ShouldRotate ¶
func (sm *SegmentManager) ShouldRotate(currentSize int64) bool
ShouldRotate checks if the current segment should be rotated.
func (*SegmentManager) UpdateSegmentSizes ¶
func (sm *SegmentManager) UpdateSegmentSizes() error
UpdateSegmentSizes refreshes the size information for all segments
type WAL ¶
type WAL struct {
// contains filtered or unexported fields
}
WAL implements a Write-Ahead Log with guaranteed durability.
func (*WAL) GetSegments ¶
GetSegments returns all segments managed by this WAL.
func (*WAL) VerifyIntegrity ¶
VerifyIntegrity checks the integrity of the entire WAL.
func (*WAL) VerifyIntegrityReport ¶
func (w *WAL) VerifyIntegrityReport() (*IntegrityReport, error)
VerifyIntegrityReport performs a detailed integrity check.
type XXHash3Checksum ¶
type XXHash3Checksum struct{}
XXHash3Checksum implements xxHash - extremely fast non-cryptographic hash Note: This uses xxHash (v2) which is production-ready and widely used
func (*XXHash3Checksum) Calculate ¶
func (c *XXHash3Checksum) Calculate(data []byte) uint64
Calculate computes the xxHash checksum of the given data.
func (*XXHash3Checksum) Name ¶
func (c *XXHash3Checksum) Name() string
Name returns the checksum algorithm name.