Documentation ¶
Overview ¶
Package filereader provides generic file reading for structured data formats with composable readers, merge-sort capabilities, and pluggable data transformation.
The filereader package provides streaming row-by-row access to various telemetry file formats. All readers implement a common interface and can be composed together for complex data processing patterns like merge-sort operations across multiple files. Data transformation is handled by separate translator components for maximum flexibility.
Core Interfaces ¶
All file format readers return raw data without transformation:
type Row map[string]any

type Batch struct {
	Rows []Row
}

type Reader interface {
	Next(ctx context.Context) (*Batch, error) // Returns next batch or io.EOF when exhausted
	Close() error
	TotalRowsReturned() int64 // Total rows successfully returned so far
	GetSchema() *ReaderSchema // Schema covering all columns Next may return
}

type RowTranslator interface {
	TranslateRow(ctx context.Context, row *Row) error // Transforms row in-place
}
Use translators for data processing and TranslatingReader for composition.
Format Readers ¶
All format readers return raw, untransformed data from files:
- ParquetRawReader: Generic Parquet files using parquet-go/parquet-go (requires io.ReaderAt)
- IngestLogParquetReader: Parquet log files read via Apache Arrow with flattening
- JSONLinesReader: Streams JSON objects line-by-line from any io.ReadCloser
- CSVReader: Streams rows from CSV files with schema inference
- IngestProtoLogsReader: Raw OTEL log records from protobuf
- IngestProtoTracesReader: Raw OTEL span data from protobuf
Example usage:
// For compressed JSON, pass in a gzip reader:
gzReader, err := gzip.NewReader(file)
if err != nil {
	return err
}
reader, err := NewJSONLinesReader(gzReader, 1000)
if err != nil {
	return err
}
defer reader.Close()

for {
	batch, err := reader.Next(ctx)
	if err != nil {
		if errors.Is(err, io.EOF) {
			break
		}
		return err
	}
	if batch != nil {
		// process raw row data in batch.Rows
		for _, row := range batch.Rows {
			// process each row
			_ = row
		}
	}
}
Data Translation ¶
Use translators to transform raw data:
// Create a simple translator that adds tags
translator := NewTagsTranslator(map[string]string{
	"source": "myapp",
	"env":    "prod",
})

// Wrap any reader with translation
translatingReader := NewTranslatingReader(rawReader, translator)

// Chain multiple translators
chain := NewChainTranslator(
	NewTagsTranslator(someTags),
	customTranslator, // Implement your own RowTranslator
)
reader := NewTranslatingReader(rawReader, chain)
Built-in translators:
- NoopTranslator: Pass-through (no transformation)
- TagsTranslator: Adds static tags to rows
- ChainTranslator: Applies multiple translators in sequence
Implement custom translators by satisfying the RowTranslator interface.
Composite Readers ¶
Sorting Readers ¶
Choose the appropriate sorting reader based on dataset size and memory constraints:
MemorySortingReader - For smaller datasets (high memory usage, no disk I/O):
reader, err := NewMemorySortingReader(rawReader, &LogSortKeyProvider{}, 1000)
DiskSortingReader - For larger datasets (moderate memory usage, 2x disk I/O):
reader, err := NewDiskSortingReader(rawReader, &LogSortKeyProvider{}, 1000)
MergesortReader - For merging multiple already-sorted sources (low memory, streaming):
keyProvider := NewTimeOrderedSortKeyProvider("timestamp")
reader, err := NewMergesortReader(ctx, []Reader{r1, r2, r3}, keyProvider, 1000)
SequentialReader - Sequential processing (no sorting):
reader, err := NewSequentialReader([]Reader{r1, r2, r3}, 1000)
Usage Patterns ¶
Time-ordered merge sort across multiple files:
r1, err := NewParquetRawReader(file1, size1)
if err != nil {
	return err
}
r2, err := NewParquetRawReader(file2, size2)
if err != nil {
	return err
}
r3, err := NewJSONLinesReader(file3, 1000)
if err != nil {
	return err
}

keyProvider := NewTimeOrderedSortKeyProvider("chq_timestamp")
ordered, err := NewMergesortReader(ctx, []Reader{r1, r2, r3}, keyProvider, 1000)
if err != nil {
	return err
}
defer ordered.Close()

for {
	batch, err := ordered.Next(ctx)
	if err != nil {
		if errors.Is(err, io.EOF) {
			break
		}
		return err
	}
	if batch != nil {
		// rows arrive in timestamp order across all files
		for _, row := range batch.Rows {
			// process each row
			_ = row
		}
	}
}
Composable reader trees:
// Process multiple file groups in timestamp order,
// then combine groups sequentially
keyProvider := NewTimeOrderedSortKeyProvider("timestamp")
group1, err := NewMergesortReader(ctx, readers1, keyProvider, 1000)
if err != nil {
	return err
}
group2, err := NewMergesortReader(ctx, readers2, keyProvider, 1000)
if err != nil {
	return err
}
final, err := NewSequentialReader([]Reader{group1, group2}, 1000)
if err != nil {
	return err
}
Memory Management & Batch Ownership ¶
The filereader package implements efficient memory management through batch ownership:
**Batch Ownership**: Readers own the returned Batch and its Row maps. Callers must NOT retain references to batches beyond the next Next() call.
**Memory Safety**: Use pipeline.CopyBatch() if you need to retain batch data:
for {
	batch, err := reader.Next(ctx)
	if err != nil {
		if errors.Is(err, io.EOF) {
			break
		}
		return err
	}
	if batch != nil {
		// Use data immediately or copy if retention needed
		safeBatch := pipeline.CopyBatch(batch) // For retention
		_ = safeBatch
		// Process batch.Rows directly for immediate use
	}
}
**Data Safety**: Readers maintain clean batch states and handle EOF correctly. Batches must not be accessed after the next Next() call.
**Error Handling**: Next() returns nil batch on errors. Check error before accessing batch.
Resource Management ¶
- All readers must be closed via Close()
- Parquet readers use random access (io.ReaderAt) - no buffering
- Streaming readers (JSON, Proto) process incrementally
- Composite readers automatically close child readers
Package filereader provides a generic interface for reading rows from various file formats. Callers construct readers directly and compose them as needed for their specific use cases.
Index ¶
- Variables
- type Batch
- type CSVLogTranslator
- type CSVReader
- type ChainTranslator
- type ColumnSchema
- type DataType
- type DiskSortingReader
- type IngestLogParquetReader
- type IngestProtoLogsReader
- type IngestProtoTracesReader
- type JSONLinesReader
- type LogSortKey
- type LogSortKeyProvider
- type MemorySortingReader
- type MergesortReader
- type NoopTranslator
- type ParquetRawReader
- type PrefixedRowKeyCache
- type ProtoBinLogTranslator
- type Reader
- type ReaderOptions
- type ReaderSchema
- func (s *ReaderSchema) AddColumn(newName, originalName wkk.RowKey, dataType DataType, hasNonNull bool)
- func (s *ReaderSchema) ColumnCount() int
- func (s *ReaderSchema) Columns() []*ColumnSchema
- func (s *ReaderSchema) Copy() *ReaderSchema
- func (s *ReaderSchema) GetAllOriginalNames() map[wkk.RowKey]wkk.RowKey
- func (s *ReaderSchema) GetColumnType(name string) DataType
- func (s *ReaderSchema) GetOriginalName(newName wkk.RowKey) wkk.RowKey
- func (s *ReaderSchema) HasColumn(name string) bool
- type RowIndex
- type RowNormalizationError
- type RowTranslator
- type SchemaBuilder
- type SequentialReader
- func (sr *SequentialReader) Close() error
- func (sr *SequentialReader) CurrentReaderIndex() int
- func (sr *SequentialReader) GetSchema() *ReaderSchema
- func (sr *SequentialReader) Next(ctx context.Context) (*Batch, error)
- func (sr *SequentialReader) RemainingReaderCount() int
- func (sr *SequentialReader) TotalReaderCount() int
- func (sr *SequentialReader) TotalRowsReturned() int64
- type SignalType
- type SortKey
- type SortKeyProvider
- type TagsTranslator
- type TimeOrderedSortKey
- type TimeOrderedSortKeyProvider
- type TranslatingReader
Constants ¶
This section is empty.
Variables ¶
var ErrRowNormalization = errors.New("row normalization failed")
ErrRowNormalization is a sentinel error indicating row normalization failed. Use errors.Is(err, ErrRowNormalization) to check for this error. Use errors.As with a *RowNormalizationError target to extract details.
Functions ¶
This section is empty.
Types ¶
type Batch ¶ added in v1.3.0
Batch represents a collection of rows with clear ownership semantics. The batch is owned by the reader that returns it.
type CSVLogTranslator ¶ added in v1.4.2
type CSVLogTranslator struct {
// contains filtered or unexported fields
}
CSVLogTranslator handles translation for CSV log files
func NewCSVLogTranslator ¶ added in v1.4.2
func NewCSVLogTranslator(opts ReaderOptions) *CSVLogTranslator
NewCSVLogTranslator creates a new CSV log translator
func (*CSVLogTranslator) TranslateRow ¶ added in v1.4.2
TranslateRow handles CSV-specific field translation for logs
type CSVReader ¶ added in v1.4.2
type CSVReader struct {
// contains filtered or unexported fields
}
CSVReader reads rows from a CSV stream using pipeline semantics.
func NewCSVReader ¶ added in v1.4.2
func NewCSVReader(reader io.ReadCloser, batchSize int) (*CSVReader, error)
NewCSVReader creates a new CSVReader for the given io.ReadCloser. The reader takes ownership of the closer and will close it when Close is called. If the reader is seekable (implements io.Seeker), the file will be scanned once to infer column types before being reset for actual reading.
func (*CSVReader) GetSchema ¶ added in v1.5.0
func (r *CSVReader) GetSchema() *ReaderSchema
GetSchema returns the inferred schema from scanning the file. Returns empty schema if headers haven't been read or schema inference failed.
func (*CSVReader) TotalRowsReturned ¶ added in v1.4.2
TotalRowsReturned returns the total number of rows that have been successfully returned via Next().
type ChainTranslator ¶
type ChainTranslator struct {
// contains filtered or unexported fields
}
ChainTranslator applies multiple translators in sequence.
func NewChainTranslator ¶
func NewChainTranslator(translators ...RowTranslator) *ChainTranslator
NewChainTranslator creates a translator that applies multiple translators in order.
func (*ChainTranslator) TranslateRow ¶
TranslateRow applies all translators in sequence to the row.
type ColumnSchema ¶ added in v1.5.0
ColumnSchema describes a single column in the schema.
type DataType ¶ added in v1.5.0
type DataType int
DataType represents the type of data in a column.
func InferTypeFromString ¶ added in v1.5.0
InferTypeFromString attempts to parse a string and determine its type. Returns the inferred DataType and the parsed value. This is useful for CSV readers where all values start as strings.
func InferTypeFromValue ¶ added in v1.5.0
InferTypeFromValue determines the DataType from a Go value. This handles values from JSON unmarshaling or CSV parsing.
type DiskSortingReader ¶ added in v1.3.0
type DiskSortingReader struct {
// contains filtered or unexported fields
}
Memory Impact: LOW-MODERATE - only the extracted sort keys and file offsets are held in memory, making this much more memory-efficient than MemorySortingReader for large datasets.
Disk I/O: 2x data size - each row is written once to a temp binary file, then read once during output.
Stability: Records are only guaranteed to be sorted by the sort function; if the sort function is not stable, the result will not be stable.
func NewDiskSortingReader ¶ added in v1.3.0
func NewDiskSortingReader(reader Reader, keyProvider SortKeyProvider, batchSize int) (*DiskSortingReader, error)
NewDiskSortingReader creates a reader that uses disk-based sorting with custom binary encoding.
Use this for large datasets that may not fit in memory. The temp file is automatically cleaned up when the reader is closed. Custom binary encoding provides efficient storage and serialization for the temporary data with no reflection overhead.
The keyProvider creates sort keys from rows to minimize memory usage during sorting.
func (*DiskSortingReader) Close ¶ added in v1.3.0
func (r *DiskSortingReader) Close() error
Close closes the reader and cleans up temp file.
func (*DiskSortingReader) GetOTELMetrics ¶ added in v1.3.0
func (r *DiskSortingReader) GetOTELMetrics() (any, error)
GetOTELMetrics implements the OTELMetricsProvider interface if the underlying reader supports it.
func (*DiskSortingReader) GetSchema ¶ added in v1.5.0
func (r *DiskSortingReader) GetSchema() *ReaderSchema
GetSchema delegates to the wrapped reader.
func (*DiskSortingReader) Next ¶ added in v1.3.0
func (r *DiskSortingReader) Next(ctx context.Context) (*Batch, error)
Next returns the next batch of sorted rows by reading from the temp file in index order.
func (*DiskSortingReader) TotalRowsReturned ¶ added in v1.3.0
func (r *DiskSortingReader) TotalRowsReturned() int64
TotalRowsReturned returns the number of rows that have been returned via Next().
type IngestLogParquetReader ¶ added in v1.5.0
type IngestLogParquetReader struct {
// contains filtered or unexported fields
}
IngestLogParquetReader reads parquet files using Apache Arrow for log ingestion. This reader performs two-pass reading:
1. First pass: scan the file to extract the schema with flattening
2. Second pass: read rows using the extracted schema with flattening applied
It handles NULL-type columns gracefully and flattens nested structures (maps, structs, lists).
func NewIngestLogParquetReader ¶ added in v1.5.0
func NewIngestLogParquetReader(ctx context.Context, reader parquet.ReaderAtSeeker, batchSize int) (*IngestLogParquetReader, error)
NewIngestLogParquetReader creates an IngestLogParquetReader for the given parquet.ReaderAtSeeker. It performs a first pass to extract the schema with flattening, then prepares for reading rows.
func (*IngestLogParquetReader) Close ¶ added in v1.5.0
func (r *IngestLogParquetReader) Close() error
Close releases resources associated with the reader.
func (*IngestLogParquetReader) GetSchema ¶ added in v1.5.0
func (r *IngestLogParquetReader) GetSchema() *ReaderSchema
GetSchema returns a copy of the schema extracted from the Arrow metadata with flattening. Returns a copy to prevent external mutation of the internal schema.
func (*IngestLogParquetReader) Next ¶ added in v1.5.0
func (r *IngestLogParquetReader) Next(ctx context.Context) (*Batch, error)
Next returns the next batch of rows from the parquet file with flattening applied.
func (*IngestLogParquetReader) TotalRowsReturned ¶ added in v1.5.0
func (r *IngestLogParquetReader) TotalRowsReturned() int64
TotalRowsReturned returns the total number of rows successfully returned.
type IngestProtoLogsReader ¶ added in v1.3.0
type IngestProtoLogsReader struct {
// contains filtered or unexported fields
}
IngestProtoLogsReader reads rows from OpenTelemetry protobuf logs format.
Implements OTELLogsProvider interface.
func NewIngestProtoLogsReader ¶ added in v1.3.0
func NewIngestProtoLogsReader(reader io.Reader, opts ReaderOptions) (*IngestProtoLogsReader, error)
NewIngestProtoLogsReader creates a new IngestProtoLogsReader for the given io.Reader.
func (*IngestProtoLogsReader) Close ¶ added in v1.3.0
func (r *IngestProtoLogsReader) Close() error
Close closes the reader and releases resources.
func (*IngestProtoLogsReader) GetSchema ¶ added in v1.5.0
func (r *IngestProtoLogsReader) GetSchema() *ReaderSchema
GetSchema returns the schema extracted from the OTEL logs.
func (*IngestProtoLogsReader) Next ¶ added in v1.3.0
func (r *IngestProtoLogsReader) Next(ctx context.Context) (*Batch, error)
Next returns the next batch of rows from the OTEL logs.
func (*IngestProtoLogsReader) TotalRowsReturned ¶ added in v1.3.0
func (r *IngestProtoLogsReader) TotalRowsReturned() int64
TotalRowsReturned returns the total number of rows that have been successfully returned via Next().
type IngestProtoTracesReader ¶ added in v1.4.5
type IngestProtoTracesReader struct {
// contains filtered or unexported fields
}
IngestProtoTracesReader reads rows from OpenTelemetry protobuf traces format. Returns raw OTEL trace data without signal-specific transformations.
func NewIngestProtoTracesReader ¶ added in v1.4.0
func NewIngestProtoTracesReader(reader io.Reader, opts ReaderOptions) (*IngestProtoTracesReader, error)
NewIngestProtoTracesReader creates a new IngestProtoTracesReader for ingestion with exemplar processing.
func NewProtoTracesReader ¶
func NewProtoTracesReader(reader io.Reader, batchSize int) (*IngestProtoTracesReader, error)
NewProtoTracesReader creates a new IngestProtoTracesReader for the given io.Reader. The caller is responsible for closing the underlying reader.
func (*IngestProtoTracesReader) Close ¶ added in v1.4.5
func (r *IngestProtoTracesReader) Close() error
Close closes the reader and releases resources.
func (*IngestProtoTracesReader) GetSchema ¶ added in v1.5.0
func (r *IngestProtoTracesReader) GetSchema() *ReaderSchema
GetSchema returns the schema extracted from the OTEL traces.
func (*IngestProtoTracesReader) Next ¶ added in v1.4.5
func (r *IngestProtoTracesReader) Next(ctx context.Context) (*Batch, error)
Next returns the next batch of rows from the OTEL traces.
func (*IngestProtoTracesReader) TotalRowsReturned ¶ added in v1.4.5
func (r *IngestProtoTracesReader) TotalRowsReturned() int64
TotalRowsReturned returns the total number of rows that have been successfully returned via Next().
type JSONLinesReader ¶
type JSONLinesReader struct {
// contains filtered or unexported fields
}
JSONLinesReader reads rows from a JSON lines stream using pipeline semantics.
func NewJSONLinesReader ¶
func NewJSONLinesReader(reader io.ReadCloser, batchSize int) (*JSONLinesReader, error)
NewJSONLinesReader creates a new JSONLinesReader for the given io.ReadCloser. The reader takes ownership of the closer and will close it when Close is called. If the reader is seekable (implements io.Seeker), the file will be scanned once to infer schema before being reset for actual reading.
func (*JSONLinesReader) Close ¶
func (r *JSONLinesReader) Close() error
Close closes the reader and the underlying io.ReadCloser.
func (*JSONLinesReader) GetSchema ¶ added in v1.5.0
func (r *JSONLinesReader) GetSchema() *ReaderSchema
GetSchema returns the inferred schema from scanning the file. Returns empty schema if file was not seekable or schema inference failed.
func (*JSONLinesReader) Next ¶ added in v1.3.0
func (r *JSONLinesReader) Next(ctx context.Context) (*Batch, error)
func (*JSONLinesReader) TotalRowsReturned ¶ added in v1.3.0
func (r *JSONLinesReader) TotalRowsReturned() int64
TotalRowsReturned returns the total number of rows that have been successfully returned via Next().
type LogSortKey ¶ added in v1.6.2
LogSortKey represents the sort key for logs: [service_identifier, timestamp_ns], where service_identifier is resource_customer_domain if set, otherwise resource_service_name. The timestamp is in nanoseconds for ordering precision.
func (*LogSortKey) Compare ¶ added in v1.6.2
func (k *LogSortKey) Compare(other SortKey) int
Compare implements SortKey interface for LogSortKey
func (*LogSortKey) Release ¶ added in v1.6.2
func (k *LogSortKey) Release()
Release returns the LogSortKey to the pool for reuse
type LogSortKeyProvider ¶ added in v1.6.2
type LogSortKeyProvider struct {
	StreamField string // Optional: specific field to use for stream identification
}
LogSortKeyProvider creates LogSortKey instances from rows. If StreamField is set, that field is used for the service identifier. Otherwise, falls back to resource_customer_domain then resource_service_name.
func NewLogSortKeyProvider ¶ added in v1.7.0
func NewLogSortKeyProvider(streamField string) *LogSortKeyProvider
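The two-field ordering that LogSortKey describes can be sketched with a local key type. Here logKey and compare are illustrative names, not the package's API; the real Compare operates on the SortKey interface.

```go
package main

import "fmt"

// logKey is a local sketch of the [service_identifier, timestamp_ns]
// ordering documented for LogSortKey.
type logKey struct {
	service string
	tsNanos int64
}

// compare returns -1, 0, or 1, ordering by service identifier first
// and then by nanosecond timestamp.
func (k logKey) compare(other logKey) int {
	if k.service != other.service {
		if k.service < other.service {
			return -1
		}
		return 1
	}
	switch {
	case k.tsNanos < other.tsNanos:
		return -1
	case k.tsNanos > other.tsNanos:
		return 1
	}
	return 0
}

func main() {
	a := logKey{service: "billing", tsNanos: 100}
	b := logKey{service: "billing", tsNanos: 200}
	c := logKey{service: "checkout", tsNanos: 50}
	fmt.Println(a.compare(b), b.compare(c), a.compare(a)) // -1 -1 0
}
```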
type MemorySortingReader ¶ added in v1.3.0
type MemorySortingReader struct {
// contains filtered or unexported fields
}
MemorySortingReader reads all rows from an underlying reader, then sorts them using a custom sort function and returns them in order. This is useful when you need sorted output with flexible sorting criteria.
Memory Impact: HIGH - all rows are loaded into memory at once.
Disk I/O: None (pure in-memory operations).
Stability: Records are only guaranteed to be sorted by the sort function; if the sort function is not stable, the result will not be stable.
func NewMemorySortingReader ¶ added in v1.3.0
func NewMemorySortingReader(reader Reader, keyProvider SortKeyProvider, batchSize int) (*MemorySortingReader, error)
NewMemorySortingReader creates a reader that buffers all rows, sorts them using the provided key provider, then returns them in order.
Use this for smaller datasets that fit comfortably in memory. For large datasets, consider DiskSortingReader to avoid OOM issues.
func (*MemorySortingReader) Close ¶ added in v1.3.0
func (r *MemorySortingReader) Close() error
Close closes the reader and underlying reader.
func (*MemorySortingReader) GetOTELMetrics ¶ added in v1.3.0
func (r *MemorySortingReader) GetOTELMetrics() (any, error)
GetOTELMetrics implements the OTELMetricsProvider interface if the underlying reader supports it.
func (*MemorySortingReader) GetSchema ¶ added in v1.5.0
func (r *MemorySortingReader) GetSchema() *ReaderSchema
GetSchema delegates to the wrapped reader.
func (*MemorySortingReader) Next ¶ added in v1.3.0
func (r *MemorySortingReader) Next(ctx context.Context) (*Batch, error)
Next returns the next batch of sorted rows from the buffer.
func (*MemorySortingReader) TotalRowsReturned ¶ added in v1.3.0
func (r *MemorySortingReader) TotalRowsReturned() int64
TotalRowsReturned returns the number of rows that have been returned via Next().
type MergesortReader ¶ added in v1.3.0
type MergesortReader struct {
// contains filtered or unexported fields
}
MergesortReader implements merge-sort style reading across multiple pre-sorted readers. It assumes each individual reader returns rows in sorted order according to the provided SortKeyProvider.
func NewMergesortReader ¶ added in v1.3.0
func NewMergesortReader(ctx context.Context, readers []Reader, keyProvider SortKeyProvider, batchSize int) (*MergesortReader, error)
NewMergesortReader creates a new MergesortReader that merges rows from multiple readers in sorted order using the new algorithm with active reader management.
func (*MergesortReader) ActiveReaderCount ¶ added in v1.3.0
func (or *MergesortReader) ActiveReaderCount() int
ActiveReaderCount returns the number of readers that still have data available.
func (*MergesortReader) Close ¶ added in v1.3.0
func (or *MergesortReader) Close() error
Close closes all underlying readers and releases resources.
func (*MergesortReader) GetSchema ¶ added in v1.5.0
func (or *MergesortReader) GetSchema() *ReaderSchema
GetSchema returns a copy of the merged schema from all child readers. Returns a copy to prevent external mutation of the internal schema.
func (*MergesortReader) Next ¶ added in v1.3.0
func (or *MergesortReader) Next(ctx context.Context) (*Batch, error)
Next returns the next batch of rows in sorted order across all readers.
func (*MergesortReader) TotalRowsReturned ¶ added in v1.3.0
func (or *MergesortReader) TotalRowsReturned() int64
TotalRowsReturned returns the total number of rows returned by this reader.
type NoopTranslator ¶
type NoopTranslator struct{}
NoopTranslator returns rows unchanged. It serves as an example of the simplest and most efficient translator implementation.
func NewNoopTranslator ¶
func NewNoopTranslator() *NoopTranslator
NewNoopTranslator creates a translator that passes rows through unchanged.
func (*NoopTranslator) TranslateRow ¶
TranslateRow does nothing for maximum performance.
type ParquetRawReader ¶ added in v1.3.0
type ParquetRawReader struct {
// contains filtered or unexported fields
}
ParquetRawReader reads rows from a generic Parquet stream. This reader provides raw parquet data without any opinionated transformations. Use wrapper readers like CookedMetricTranslatingReader for domain-specific logic.
func NewParquetRawReader ¶ added in v1.3.0
NewParquetRawReader creates a new ParquetRawReader for the given io.ReaderAt.
func (*ParquetRawReader) Close ¶ added in v1.3.0
func (r *ParquetRawReader) Close() error
Close closes the reader and releases resources.
func (*ParquetRawReader) GetSchema ¶ added in v1.5.0
func (r *ParquetRawReader) GetSchema() *ReaderSchema
GetSchema returns the schema extracted from the parquet metadata.
func (*ParquetRawReader) Next ¶ added in v1.3.0
func (r *ParquetRawReader) Next(ctx context.Context) (*Batch, error)
Next returns the next batch of rows from the parquet file.
func (*ParquetRawReader) TotalRowsReturned ¶ added in v1.3.0
func (r *ParquetRawReader) TotalRowsReturned() int64
TotalRowsReturned returns the total number of rows that have been successfully returned via Next().
type PrefixedRowKeyCache ¶ added in v1.5.0
type PrefixedRowKeyCache struct {
// contains filtered or unexported fields
}
PrefixedRowKeyCache caches prefixed RowKeys to avoid repeated string building and interning. Each unique attribute name is transformed once and reused.
Thread-safety: This cache is NOT thread-safe. It's designed for single-threaded use within a reader that processes one file at a time.
func NewPrefixedRowKeyCache ¶ added in v1.5.0
func NewPrefixedRowKeyCache(prefix string) *PrefixedRowKeyCache
NewPrefixedRowKeyCache creates a new RowKey cache with the given prefix. The prefix will be prepended to all attribute names (e.g., "resource", "scope", "attr").
func (*PrefixedRowKeyCache) Clear ¶ added in v1.5.0
func (c *PrefixedRowKeyCache) Clear()
Clear removes all cached entries. Useful for resetting between files.
func (*PrefixedRowKeyCache) Get ¶ added in v1.5.0
func (c *PrefixedRowKeyCache) Get(name string) wkk.RowKey
Get returns the cached RowKey for the given attribute name, or computes and caches it if not already present. The returned RowKey has the prefix applied and dots replaced with underscores (e.g., "service.name" with prefix "resource" → "resource_service_name").
func (*PrefixedRowKeyCache) Len ¶ added in v1.5.0
func (c *PrefixedRowKeyCache) Len() int
Len returns the number of cached RowKeys.
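The caching pattern can be sketched in a few lines. keyCache, newKeyCache, and get below are local illustrative names using plain strings, not the package's wkk.RowKey-based API; the transform matches the documented behavior (prefix applied, dots replaced with underscores).

```go
package main

import (
	"fmt"
	"strings"
)

// keyCache is a local sketch of PrefixedRowKeyCache: transform each
// attribute name at most once, then reuse the cached result.
type keyCache struct {
	prefix string
	cache  map[string]string
}

func newKeyCache(prefix string) *keyCache {
	return &keyCache{prefix: prefix, cache: make(map[string]string)}
}

// get applies the documented transform (prefix + "_" + name with dots
// replaced by underscores), computing it only on the first lookup.
func (c *keyCache) get(name string) string {
	if k, ok := c.cache[name]; ok {
		return k
	}
	k := c.prefix + "_" + strings.ReplaceAll(name, ".", "_")
	c.cache[name] = k
	return k
}

func main() {
	c := newKeyCache("resource")
	fmt.Println(c.get("service.name")) // resource_service_name
	fmt.Println(len(c.cache))          // 1
}
```

Like the real cache, this sketch is not safe for concurrent use; it assumes a single goroutine processing one file at a time.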
type ProtoBinLogTranslator ¶ added in v1.3.0
type ProtoBinLogTranslator struct {
// contains filtered or unexported fields
}
ProtoBinLogTranslator handles translation for protobuf binary log files
func NewProtoBinLogTranslator ¶ added in v1.3.0
func NewProtoBinLogTranslator(opts ReaderOptions) *ProtoBinLogTranslator
NewProtoBinLogTranslator creates a new protobuf log translator
func (*ProtoBinLogTranslator) TranslateRow ¶ added in v1.3.0
TranslateRow handles protobuf-specific field translation
type Reader ¶
type Reader interface {
	// Next returns the next batch of rows, or io.EOF when exhausted.
	// The returned batch is owned by the reader and must not be retained
	// beyond the next Next() call. Use pipeline.CopyBatch() if you need to retain.
	// The context can be used for cancellation, deadlines, and updating metrics.
	Next(ctx context.Context) (*Batch, error)

	// Close releases any resources held by the reader.
	Close() error

	// TotalRowsReturned returns the total number of rows that have been successfully
	// returned via Next() calls from this reader so far.
	TotalRowsReturned() int64

	// GetSchema returns the ReaderSchema extracted from the content. Must not return nil, and
	// must include all columns that may be returned by Next() with the types in the schema.
	GetSchema() *ReaderSchema
}
Reader is the core interface for reading rows from any file format using pipeline semantics. This eliminates memory ownership issues by establishing clear ownership: batches are owned by the reader and must not be retained beyond the next Next() call.
func ReaderForFile ¶ added in v1.3.0
func ReaderForFile(filename string, signalType SignalType, orgId string) (Reader, error)
ReaderForFile creates a Reader for the given file based on its extension and signal type. This is a convenience function that uses default options.
func ReaderForFileWithOptions ¶ added in v1.3.0
func ReaderForFileWithOptions(filename string, opts ReaderOptions) (Reader, error)
ReaderForFileWithOptions creates a Reader for the given file with the provided options. Supported file formats:
- .parquet: Creates a ParquetRawReader (works for all signal types)
- .json.gz: Creates a JSONLinesReader with gzip decompression (works for all signal types)
- .json: Creates a JSONLinesReader (works for all signal types)
- .csv: Creates a CSVReader (works for all signal types)
- .csv.gz: Creates a CSVReader with gzip decompression (works for all signal types)
- .binpb: Creates a signal-specific proto reader (NewIngestProtoLogsReader or NewProtoTracesReader)
- .binpb.gz: Creates a signal-specific proto reader with gzip decompression
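The extension dispatch described above can be sketched as a pure function. pickReader is a hypothetical helper that returns a label instead of constructing real readers, so the suffix logic is visible on its own.

```go
package main

import (
	"fmt"
	"strings"
)

// pickReader sketches the suffix-based dispatch that
// ReaderForFileWithOptions documents: a trailing .gz selects gzip
// decompression, and the remaining extension selects the reader kind.
func pickReader(filename string) (kind string, gzipped bool) {
	if strings.HasSuffix(filename, ".gz") {
		gzipped = true
		filename = strings.TrimSuffix(filename, ".gz")
	}
	switch {
	case strings.HasSuffix(filename, ".parquet"):
		return "parquet", gzipped
	case strings.HasSuffix(filename, ".json"):
		return "jsonlines", gzipped
	case strings.HasSuffix(filename, ".csv"):
		return "csv", gzipped
	case strings.HasSuffix(filename, ".binpb"):
		return "proto", gzipped // signal type picks logs vs traces
	}
	return "unsupported", gzipped
}

func main() {
	kind, gz := pickReader("logs.json.gz")
	fmt.Println(kind, gz) // jsonlines true
}
```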
type ReaderOptions ¶ added in v1.3.0
type ReaderOptions struct {
	SignalType SignalType
	BatchSize  int // Batch size for readers (default: 1000)

	// Translation options for protobuf logs and metrics
	OrgID    string
	Bucket   string
	ObjectID string

	// Aggregation options for metrics
	EnableAggregation   bool  // Enable streaming aggregation
	AggregationPeriodMs int64 // Aggregation period in milliseconds (e.g., 10000 for 10s)
}
ReaderOptions provides options for creating readers.
type ReaderSchema ¶ added in v1.5.0
type ReaderSchema struct {
// contains filtered or unexported fields
}
ReaderSchema represents the complete schema for a reader.
func ExtractCompleteParquetSchema ¶ added in v1.5.0
func ExtractCompleteParquetSchema(ctx context.Context, pf *file.Reader, fr *pqarrow.FileReader) (*ReaderSchema, error)
ExtractCompleteParquetSchema performs a full two-pass scan of a parquet file to discover all columns, including dynamic map keys and nested structures.
This function:
1. Reads parquet metadata to get column types (physical types from the parquet schema)
2. Reads the Arrow schema to get structure (for nested types)
3. Scans ALL rows in the file to discover all map keys and track actual data types
4. Returns a complete ReaderSchema with all flattened column paths and proper types
Type handling:
- Simple types (int32, int64, string, etc.): use the parquet physical type
- Maps: discover keys by scanning data, track value types
- Nested structs: recursively flatten all paths
- Arrays/lists: treated as-is (not flattened)
func NewReaderSchema ¶ added in v1.5.0
func NewReaderSchema() *ReaderSchema
NewReaderSchema creates a new empty schema.
func (*ReaderSchema) AddColumn ¶ added in v1.5.0
func (s *ReaderSchema) AddColumn(newName, originalName wkk.RowKey, dataType DataType, hasNonNull bool)
AddColumn adds or updates a column in the schema with name mapping. The column is stored under the new (normalized) name. The mapping tracks the relationship between new and original names. For identity mappings (where new == original), pass the same name for both parameters.
func (*ReaderSchema) ColumnCount ¶ added in v1.9.0
func (s *ReaderSchema) ColumnCount() int
ColumnCount returns the number of columns in the schema without allocating.
func (*ReaderSchema) Columns ¶ added in v1.5.0
func (s *ReaderSchema) Columns() []*ColumnSchema
Columns returns all column schemas.
func (*ReaderSchema) Copy ¶ added in v1.5.0
func (s *ReaderSchema) Copy() *ReaderSchema
Copy returns a deep copy of the schema. This is important because callers may mutate the returned schema.
func (*ReaderSchema) GetAllOriginalNames ¶ added in v1.5.0
func (s *ReaderSchema) GetAllOriginalNames() map[wkk.RowKey]wkk.RowKey
GetAllOriginalNames returns a map of all new names to original names.
func (*ReaderSchema) GetColumnType ¶ added in v1.5.0
func (s *ReaderSchema) GetColumnType(name string) DataType
GetColumnType returns the data type for a column name.
func (*ReaderSchema) GetOriginalName ¶ added in v1.5.0
func (s *ReaderSchema) GetOriginalName(newName wkk.RowKey) wkk.RowKey
GetOriginalName returns the original name for a column, or the same name if no mapping exists.
func (*ReaderSchema) HasColumn ¶ added in v1.5.0
func (s *ReaderSchema) HasColumn(name string) bool
HasColumn returns true if the schema has the specified column.
type RowIndex ¶ added in v1.3.0
type RowIndex struct {
	SortKey    SortKey // Extracted sort key for sorting
	FileOffset int64
	ByteLength int32
}
RowIndex represents a lightweight pointer to a binary-encoded row in the temp file. It stores only the extracted sort key plus file location info.
type RowNormalizationError ¶ added in v1.5.0
type RowNormalizationError struct {
	// Column is the name of the column that caused the error
	Column string
	// Reason describes what went wrong
	Reason string
	// Err is the underlying error if any
	Err error
}
RowNormalizationError represents an error that occurred while normalizing a single row. These errors are row-specific and indicate data quality issues rather than systemic failures.
func (*RowNormalizationError) Error ¶ added in v1.5.0
func (e *RowNormalizationError) Error() string
func (*RowNormalizationError) Is ¶ added in v1.5.0
func (e *RowNormalizationError) Is(target error) bool
func (*RowNormalizationError) Unwrap ¶ added in v1.5.0
func (e *RowNormalizationError) Unwrap() error
type RowTranslator ¶
type RowTranslator interface {
// TranslateRow transforms a row in-place by modifying the provided row pointer.
TranslateRow(ctx context.Context, row *pipeline.Row) error
}
RowTranslator transforms rows from one format to another.
type SchemaBuilder ¶ added in v1.5.0
type SchemaBuilder struct {
// contains filtered or unexported fields
}
SchemaBuilder helps build schemas by scanning data and tracking type promotions.
func NewSchemaBuilder ¶ added in v1.5.0
func NewSchemaBuilder() *SchemaBuilder
NewSchemaBuilder creates a new schema builder.
func (*SchemaBuilder) AddStringValue ¶ added in v1.5.0
func (sb *SchemaBuilder) AddStringValue(columnName string, stringValue string)
AddStringValue parses a string value, infers its type, and adds to schema.
func (*SchemaBuilder) AddValue ¶ added in v1.5.0
func (sb *SchemaBuilder) AddValue(columnName string, value any)
AddValue adds a value to the schema, inferring its type and promoting if needed.
func (*SchemaBuilder) Build ¶ added in v1.5.0
func (sb *SchemaBuilder) Build() *ReaderSchema
Build returns the built schema.
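A typical SchemaBuilder flow, sketched against the signatures above (the exact promotion behavior when mixed types are added to one column follows the package's inference rules, which are not spelled out here):

```go
sb := filereader.NewSchemaBuilder()
sb.AddValue("status", int64(200))
sb.AddValue("status", 3.14) // second type may promote the column's inferred type
sb.AddStringValue("latency_ms", "12.5") // parses the string and infers its type
schema := sb.Build()
_ = schema.GetColumnType("status")
```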
type SequentialReader ¶ added in v1.3.0
type SequentialReader struct {
// contains filtered or unexported fields
}
SequentialReader reads from multiple readers sequentially in the order provided. It reads all rows from the first reader, then all rows from the second reader, etc. This is useful when you want to concatenate multiple files without any ordering requirements.
func NewSequentialReader ¶ added in v1.3.0
func NewSequentialReader(readers []Reader, batchSize int) (*SequentialReader, error)
NewSequentialReader creates a new SequentialReader that reads from the provided readers sequentially. Readers will be closed when the SequentialReader is closed.
func (*SequentialReader) Close ¶ added in v1.3.0
func (sr *SequentialReader) Close() error
Close closes all underlying readers and releases resources.
func (*SequentialReader) CurrentReaderIndex ¶ added in v1.3.0
func (sr *SequentialReader) CurrentReaderIndex() int
CurrentReaderIndex returns the index of the reader currently being read from. Returns -1 if all readers are exhausted or the reader is closed.
func (*SequentialReader) GetSchema ¶ added in v1.5.0
func (sr *SequentialReader) GetSchema() *ReaderSchema
GetSchema merges schemas from all child readers using the same type promotion rules as MergesortReader.
func (*SequentialReader) Next ¶ added in v1.3.0
func (sr *SequentialReader) Next(ctx context.Context) (*Batch, error)
Next returns the next batch of rows, advancing to the next reader when the current one is exhausted, and io.EOF when all readers are exhausted.
func (*SequentialReader) RemainingReaderCount ¶ added in v1.3.0
func (sr *SequentialReader) RemainingReaderCount() int
RemainingReaderCount returns the number of readers that haven't been fully processed yet.
func (*SequentialReader) TotalReaderCount ¶ added in v1.3.0
func (sr *SequentialReader) TotalReaderCount() int
TotalReaderCount returns the total number of readers in this SequentialReader.
func (*SequentialReader) TotalRowsReturned ¶ added in v1.3.0
func (sr *SequentialReader) TotalRowsReturned() int64
TotalRowsReturned returns the total number of rows that have been successfully returned via Next() from all readers.
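To concatenate several files end-to-end without any ordering requirement, wrap their readers in a SequentialReader. A sketch in the style of the overview example (reader construction elided; the batch size of 1000 is illustrative):

```go
readers := []filereader.Reader{r1, r2, r3} // e.g. three JSONLinesReaders
seq, err := filereader.NewSequentialReader(readers, 1000)
if err != nil {
	return err
}
defer seq.Close() // closes r1, r2, and r3 as well
for {
	batch, err := seq.Next(ctx)
	if errors.Is(err, io.EOF) {
		break
	}
	if err != nil {
		return err
	}
	// Rows arrive in reader order: all of r1, then r2, then r3.
	for _, row := range batch.Rows {
		_ = row
	}
}
```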
type SignalType ¶ added in v1.3.0
type SignalType int
SignalType represents the type of telemetry signal being processed.
const (
// SignalTypeLogs represents log data
SignalTypeLogs SignalType = iota
// SignalTypeTraces represents trace data
SignalTypeTraces
)
func (SignalType) String ¶ added in v1.3.0
func (s SignalType) String() string
String returns the string representation of the signal type.
type SortKey ¶ added in v1.3.0
type SortKey interface {
// Compare returns:
// -1 if this key should come before other
// 0 if this key equals other
// 1 if this key should come after other
Compare(other SortKey) int
// Release returns the key to its pool for reuse
Release()
}
SortKey represents a key that can be compared for sorting.
type SortKeyProvider ¶ added in v1.3.0
type SortKeyProvider interface {
// MakeKey creates a sort key from a row
MakeKey(row pipeline.Row) SortKey
}
SortKeyProvider creates sort keys from rows.
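The SortKey/SortKeyProvider pair is the extension point for custom orderings. A self-contained sketch of the Compare contract (the interface is restated locally for illustration; the package's real keys are built from a pipeline.Row and returned to a pool via Release):

```go
package main

import "fmt"

// sortKey restates the documented SortKey contract locally.
type sortKey interface {
	Compare(other sortKey) int
	Release()
}

// int64Key orders by a single int64 field (e.g. a timestamp in nanoseconds).
type int64Key struct{ v int64 }

// Compare returns -1, 0, or 1 per the SortKey contract.
func (k *int64Key) Compare(other sortKey) int {
	o := other.(*int64Key)
	switch {
	case k.v < o.v:
		return -1
	case k.v > o.v:
		return 1
	default:
		return 0
	}
}

// Release is a no-op here; the package's keys go back to a pool for reuse.
func (k *int64Key) Release() {}

func main() {
	a, b := &int64Key{v: 10}, &int64Key{v: 20}
	fmt.Println(a.Compare(b), b.Compare(a), a.Compare(a))
}
```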
type TagsTranslator ¶
type TagsTranslator struct {
// contains filtered or unexported fields
}
TagsTranslator adds static tags to every row.
func NewTagsTranslator ¶
func NewTagsTranslator(tags map[string]string) *TagsTranslator
NewTagsTranslator creates a translator that adds the given tags to each row.
func (*TagsTranslator) TranslateRow ¶
func (t *TagsTranslator) TranslateRow(ctx context.Context, row *pipeline.Row) error
TranslateRow adds tags to the row in-place.
type TimeOrderedSortKey ¶ added in v1.3.0
type TimeOrderedSortKey struct {
// contains filtered or unexported fields
}
TimeOrderedSortKey represents a sort key for timestamp-based ordering.
func (*TimeOrderedSortKey) Compare ¶ added in v1.3.0
func (k *TimeOrderedSortKey) Compare(other SortKey) int
func (*TimeOrderedSortKey) Release ¶ added in v1.3.0
func (k *TimeOrderedSortKey) Release()
type TimeOrderedSortKeyProvider ¶ added in v1.3.0
type TimeOrderedSortKeyProvider struct {
// contains filtered or unexported fields
}
TimeOrderedSortKeyProvider creates sort keys based on a single timestamp field.
func NewTimeOrderedSortKeyProvider ¶ added in v1.3.0
func NewTimeOrderedSortKeyProvider(fieldName string) *TimeOrderedSortKeyProvider
NewTimeOrderedSortKeyProvider creates a provider that sorts by a timestamp field.
type TranslatingReader ¶
type TranslatingReader struct {
// contains filtered or unexported fields
}
TranslatingReader wraps another Reader and applies row transformations. This enables composition where any Reader can be enhanced with signal-specific translation logic without coupling file parsing to data transformation.
func NewTranslatingReader ¶
func NewTranslatingReader(reader Reader, translator RowTranslator, batchSize int) (*TranslatingReader, error)
NewTranslatingReader creates a new TranslatingReader that applies the given translator to each row returned by the underlying reader.
The TranslatingReader takes ownership of the underlying reader and will close it when Close() is called.
func (*TranslatingReader) Close ¶
func (tr *TranslatingReader) Close() error
Close closes the underlying reader and releases resources.
func (*TranslatingReader) GetSchema ¶ added in v1.5.0
func (tr *TranslatingReader) GetSchema() *ReaderSchema
GetSchema delegates to the wrapped reader.
func (*TranslatingReader) Next ¶ added in v1.3.0
func (tr *TranslatingReader) Next(ctx context.Context) (*Batch, error)
Next returns the next batch of translated rows from the underlying reader.
func (*TranslatingReader) TotalRowsReturned ¶ added in v1.3.0
func (tr *TranslatingReader) TotalRowsReturned() int64
TotalRowsReturned returns the total number of rows that have been successfully returned via Next() after translation by this reader.
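A composition sketch tying these pieces together: a format reader from the overview, a TagsTranslator, and a TranslatingReader that owns both. The constructors are the ones documented in this package; gzReader setup is as shown in the overview example:

```go
base, err := filereader.NewJSONLinesReader(gzReader, 1000)
if err != nil {
	return err
}
translator := filereader.NewTagsTranslator(map[string]string{"source": "myapp"})
reader, err := filereader.NewTranslatingReader(base, translator, 1000)
if err != nil {
	base.Close()
	return err
}
defer reader.Close() // also closes base: TranslatingReader takes ownership
for {
	batch, err := reader.Next(ctx)
	if errors.Is(err, io.EOF) {
		break
	}
	if err != nil {
		return err
	}
	for _, row := range batch.Rows {
		_ = row // each row now carries the "source" tag
	}
}
```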
Source Files
¶
- csv_log_translator.go
- csv_reader.go
- disk_sorting_reader.go
- doc.go
- ingest_log_parquet_reader.go
- ingest_proto_helpers.go
- ingest_proto_logs.go
- ingest_proto_traces.go
- jsonlines.go
- memory_sorting.go
- mergesort_reader.go
- otel_attributes.go
- otel_schema.go
- parquet_raw_reader.go
- parquet_schema.go
- parquet_schema_extractor.go
- protobuf_log_translator.go
- reader.go
- reader_factory.go
- rowkey_cache.go
- schema.go
- schema_inference.go
- sequential_reader.go
- signal_type.go
- sort_log.go
- sorting.go
- telemetry.go
- translating.go
- translators.go