package writers
v0.0.0-...-b3910b2 Latest
Warning

This package is not in the latest version of its module.

Published: Jun 13, 2025 License: GPL-2.0 Imports: 23 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type CSVWriter

type CSVWriter struct {
	// contains filtered or unexported fields
}

CSVWriter implements core.DataSink for CSV output. It supports batching, header management, delimiter configuration, and statistics.

func NewCSVWriter

func NewCSVWriter(w io.WriteCloser, opts ...WriterOptionCSV) (*CSVWriter, error)

NewCSVWriter creates a CSVWriter that writes to w, configured by the given functional options (see WriterOptionCSV). It returns a ready-to-use writer or an error.

func (*CSVWriter) Close

func (c *CSVWriter) Close() error

Close implements the core.DataSink interface. It flushes any buffered records and closes all underlying resources.

func (*CSVWriter) Flush

func (c *CSVWriter) Flush() error

Flush implements the core.DataSink interface. It forces any buffered records to be written to the CSV output.

func (*CSVWriter) Stats

func (c *CSVWriter) Stats() CSVWriterStats

Stats returns write statistics.

func (*CSVWriter) Write

func (c *CSVWriter) Write(ctx context.Context, record core.Record) error

Write implements the core.DataSink interface. It buffers records and writes them in batches or when Flush is called. It is safe for concurrent use.

type CSVWriterError

type CSVWriterError struct {
	Op  string // Operation that failed (e.g., "write", "flush", "write_row")
	Err error  // Underlying error
}

CSVWriterError wraps CSV-specific write errors with context about the operation.

func (*CSVWriterError) Error

func (e *CSVWriterError) Error() string

func (*CSVWriterError) Unwrap

func (e *CSVWriterError) Unwrap() error

type CSVWriterOptions

type CSVWriterOptions struct {
	Comma       rune     // Field delimiter (default ',')
	UseCRLF     bool     // Use CRLF line endings
	WriteHeader bool     // Write header row
	Headers     []string // Explicit header order
	BatchSize   int      // Number of records to buffer before writing
}

CSVWriterOptions configures CSV output.

type CSVWriterStats

type CSVWriterStats struct {
	RecordsWritten  int64            // Total records written
	FlushCount      int64            // Number of flushes performed
	FlushDuration   time.Duration    // Total time spent flushing
	LastFlushTime   time.Time        // Time of last flush
	NullValueCounts map[string]int64 // Count of null values per field
}

CSVWriterStats holds CSV write performance statistics.

type ConflictResolution

type ConflictResolution int

ConflictResolution defines how to handle INSERT conflicts in PostgreSQL.

const (
	// ConflictError returns an error on conflict (default PostgreSQL behavior).
	ConflictError ConflictResolution = iota
	// ConflictIgnore ignores conflicting rows (ON CONFLICT DO NOTHING).
	ConflictIgnore
	// ConflictUpdate updates conflicting rows (ON CONFLICT DO UPDATE).
	ConflictUpdate
)
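Each ConflictResolution value names the PostgreSQL clause it stands for. The hypothetical helper onConflictClause below shows how a strategy plus the conflict and update column lists (the three arguments WithConflictResolution accepts) could be rendered as SQL; the exact statement PostgresWriter generates is not documented on this page.

```go
package main

import (
	"fmt"
	"strings"
)

// ConflictResolution mirrors the enum documented above.
type ConflictResolution int

const (
	ConflictError ConflictResolution = iota
	ConflictIgnore
	ConflictUpdate
)

// onConflictClause is a hypothetical helper showing the PostgreSQL clause each
// strategy corresponds to. Identifiers are not quoted here; production code
// should quote them to avoid injection and case-folding surprises.
func onConflictClause(res ConflictResolution, conflictCols, updateCols []string) string {
	switch res {
	case ConflictIgnore:
		if len(conflictCols) == 0 {
			return " ON CONFLICT DO NOTHING" // skip any row that violates a unique constraint
		}
		return " ON CONFLICT (" + strings.Join(conflictCols, ", ") + ") DO NOTHING"
	case ConflictUpdate:
		// DO UPDATE requires a conflict target; EXCLUDED is the row proposed for insertion.
		sets := make([]string, len(updateCols))
		for i, c := range updateCols {
			sets[i] = c + " = EXCLUDED." + c
		}
		return " ON CONFLICT (" + strings.Join(conflictCols, ", ") + ") DO UPDATE SET " + strings.Join(sets, ", ")
	default:
		return "" // ConflictError: no clause, so a duplicate key fails the statement
	}
}

func main() {
	insert := "INSERT INTO users (id, email) VALUES ($1, $2)"
	fmt.Println(insert + onConflictClause(ConflictUpdate, []string{"id"}, []string{"email"}))
}
```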

type JSONWriter

type JSONWriter struct {
	// contains filtered or unexported fields
}

JSONWriter implements core.DataSink for line-delimited JSON output. It supports batching, buffered output, flush control, and statistics.

func NewJSONWriter

func NewJSONWriter(w io.WriteCloser, opts ...WriterOptionJSON) *JSONWriter

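NewJSONWriter creates a JSONWriter that writes to w with optional buffered output, configured by the given functional options (see WriterOptionJSON). It returns a ready-to-use writer; unlike the other constructors in this package, it does not return an error.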

func (*JSONWriter) Close

func (j *JSONWriter) Close() error

Close implements the core.DataSink interface. It flushes any buffered records and closes all underlying resources.

func (*JSONWriter) Flush

func (j *JSONWriter) Flush() error

Flush implements the core.DataSink interface. It forces any buffered records to be written to the output.

func (*JSONWriter) Stats

func (j *JSONWriter) Stats() JSONWriterStats

Stats returns the writer's performance stats.

func (*JSONWriter) Write

func (j *JSONWriter) Write(ctx context.Context, record core.Record) error

Write implements the core.DataSink interface. It buffers records and writes them in batches or when Flush is called. It is safe for concurrent use.

type JSONWriterError

type JSONWriterError struct {
	Op  string // Operation that failed (e.g., "write", "flush", "marshal_record")
	Err error  // Underlying error
}

JSONWriterError wraps JSONWriter errors with context about the operation that failed.

func (*JSONWriterError) Error

func (e *JSONWriterError) Error() string

func (*JSONWriterError) Unwrap

func (e *JSONWriterError) Unwrap() error

type JSONWriterOptions

type JSONWriterOptions struct {
	BatchSize    int  // Number of records to buffer before writing
	FlushOnWrite bool // Whether to flush after every write
	BufferSize   int  // Size of the underlying buffer in bytes
}

JSONWriterOptions configures the JSONWriter behavior.

type JSONWriterStats

type JSONWriterStats struct {
	RecordsWritten  int64            // Total records written
	FlushCount      int64            // Number of flushes performed
	FlushDuration   time.Duration    // Total time spent flushing
	LastFlushTime   time.Time        // Time of last flush
	NullValueCounts map[string]int64 // Count of null values per field
}

JSONWriterStats holds statistics about the writer's performance.

type ParquetWriter

type ParquetWriter struct {
	// contains filtered or unexported fields
}

ParquetWriter implements core.DataSink for Parquet files. It supports batching, Arrow schema inference, compression, field ordering, schema validation, and statistics.

func NewParquetWriter

func NewParquetWriter(filename string, options ...WriterOption) (*ParquetWriter, error)

NewParquetWriter creates a ParquetWriter that writes to the file named by filename, configured by the given functional options (see WriterOption). It returns a ready-to-use writer or an error.

func (*ParquetWriter) Close

func (p *ParquetWriter) Close() error

Close implements the core.DataSink interface. It flushes any buffered records and closes all underlying resources.

func (*ParquetWriter) Flush

func (p *ParquetWriter) Flush() error

Flush implements the core.DataSink interface. It forces any buffered records to be written to the Parquet file.

func (*ParquetWriter) Stats

func (p *ParquetWriter) Stats() WriterStats

Stats returns the current statistics of the Parquet writer.

func (*ParquetWriter) Write

func (p *ParquetWriter) Write(ctx context.Context, record core.Record) error

Write implements the core.DataSink interface. It buffers records and writes them in batches. It is safe for concurrent use.

type ParquetWriterError

type ParquetWriterError struct {
	Op  string // Operation that failed (e.g., "read", "load_batch", "open_file", "schema")
	Err error  // Underlying error
}

ParquetWriterError wraps Parquet-specific write errors with context about the operation.

func (*ParquetWriterError) Error

func (e *ParquetWriterError) Error() string

Error returns the error string for ParquetWriterError.

func (*ParquetWriterError) Unwrap

func (e *ParquetWriterError) Unwrap() error

Unwrap returns the underlying error for ParquetWriterError.

type ParquetWriterOptions

type ParquetWriterOptions struct {
	BatchSize       int64                // Number of records to buffer before writing
	Schema          *arrow.Schema        // Pre-defined schema (optional)
	Compression     compress.Compression // Compression algorithm
	FieldOrder      []string             // Explicit field ordering
	RowGroupSize    int64                // Row group size
	PageSize        int64                // Page size
	DictionaryLevel map[string]bool      // Per-field dictionary encoding
	Metadata        map[string]string    // File metadata
	ValidateSchema  bool                 // Enable strict schema validation
}

ParquetWriterOptions configures the Parquet writer.

type PostgresWriter

type PostgresWriter struct {
	// contains filtered or unexported fields
}

PostgresWriter implements core.DataSink for PostgreSQL output. It supports batching, transactions, conflict resolution, and statistics.

func NewPostgresWriter

func NewPostgresWriter(opts ...PostgresWriterOption) (*PostgresWriter, error)

NewPostgresWriter creates a PostgresWriter configured by the given functional options (see PostgresWriterOption). It returns a ready-to-use writer or an error.

func (*PostgresWriter) Close

func (w *PostgresWriter) Close() error

Close implements the core.DataSink interface. It flushes any buffered records and closes all underlying resources.

func (*PostgresWriter) Flush

func (w *PostgresWriter) Flush() error

Flush implements the core.DataSink interface. It forces any buffered records to be written to PostgreSQL.

func (*PostgresWriter) Stats

func (w *PostgresWriter) Stats() PostgresWriterStats

Stats returns a copy of the current write statistics.
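PostgresWriterStats contains a map (NullValueCounts), and copying a struct by value copies the map reference, not its contents. This page does not say whether the returned copy is deep. The sketch below, with hypothetical names (stats, statsHolder, recordWrite), shows a snapshot that also clones the map under the lock, so callers can read or modify it without racing the writer. The same concern applies to the NullValueCounts maps in CSVWriterStats, JSONWriterStats and WriterStats.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// stats is a hypothetical subset of PostgresWriterStats.
type stats struct {
	RecordsWritten  int64
	LastWriteTime   time.Time
	NullValueCounts map[string]int64
}

// statsHolder guards the live stats the way a writer might.
type statsHolder struct {
	mu sync.Mutex
	s  stats
}

// recordWrite counts one written record and any fields that were null in it.
func (h *statsHolder) recordWrite(nullFields ...string) {
	h.mu.Lock()
	defer h.mu.Unlock()
	if h.s.NullValueCounts == nil {
		h.s.NullValueCounts = make(map[string]int64)
	}
	h.s.RecordsWritten++
	h.s.LastWriteTime = time.Now()
	for _, f := range nullFields {
		h.s.NullValueCounts[f]++
	}
}

// Stats returns a snapshot. Assigning the struct copies the map reference,
// not its contents, so the map is cloned explicitly.
func (h *statsHolder) Stats() stats {
	h.mu.Lock()
	defer h.mu.Unlock()
	out := h.s
	out.NullValueCounts = make(map[string]int64, len(h.s.NullValueCounts))
	for k, v := range h.s.NullValueCounts {
		out.NullValueCounts[k] = v
	}
	return out
}

func main() {
	var h statsHolder
	h.recordWrite("email")
	snap := h.Stats()
	snap.NullValueCounts["email"] = 99               // modifies the snapshot only
	fmt.Println(h.Stats().NullValueCounts["email"]) // 1
}
```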

func (*PostgresWriter) Write

func (w *PostgresWriter) Write(ctx context.Context, record core.Record) error

Write implements the core.DataSink interface. It buffers records and writes them in batches. It is safe for concurrent use.
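Batched writes to PostgreSQL are commonly issued as a single multi-row INSERT with numbered placeholders. The hypothetical helper buildBatchInsert below illustrates that approach; whether PostgresWriter uses multi-row INSERT, COPY, or something else is not stated on this page. PostgreSQL accepts at most 65,535 bind parameters per statement, so with this approach the batch size times the number of columns must stay within that limit.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// buildBatchInsert is a hypothetical helper that renders one multi-row INSERT
// for a batch, numbering PostgreSQL placeholders $1..$N row by row.
// Identifiers are not quoted here; production code should quote them.
func buildBatchInsert(table string, cols []string, rows [][]any) (string, []any) {
	var sb strings.Builder
	sb.WriteString("INSERT INTO " + table + " (" + strings.Join(cols, ", ") + ") VALUES ")
	args := make([]any, 0, len(rows)*len(cols))
	for i, row := range rows {
		if i > 0 {
			sb.WriteString(", ")
		}
		sb.WriteByte('(')
		for j := range cols {
			if j > 0 {
				sb.WriteString(", ")
			}
			args = append(args, row[j]) // assumes each row has one value per column
			sb.WriteString("$" + strconv.Itoa(len(args)))
		}
		sb.WriteByte(')')
	}
	return sb.String(), args
}

func main() {
	q, args := buildBatchInsert("users", []string{"id", "email"},
		[][]any{{1, "a@example.com"}, {2, "b@example.com"}})
	fmt.Println(q)         // INSERT INTO users (id, email) VALUES ($1, $2), ($3, $4)
	fmt.Println(len(args)) // 4
}
```

The query and arguments could then be passed to (*sql.DB).ExecContext, or to (*sql.Tx).ExecContext when each batch is wrapped in a transaction.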

type PostgresWriterError

type PostgresWriterError struct {
	Op  string // The operation being performed (e.g., "write", "connect")
	Err error  // The underlying error
}

PostgresWriterError wraps PostgreSQL-specific write errors with context about the operation.

func (*PostgresWriterError) Error

func (e *PostgresWriterError) Error() string

Error returns the error string for PostgresWriterError.

func (*PostgresWriterError) Unwrap

func (e *PostgresWriterError) Unwrap() error

Unwrap returns the underlying error for PostgresWriterError.

type PostgresWriterOption

type PostgresWriterOption func(*PostgresWriterOptions)

PostgresWriterOption represents a configuration function for PostgresWriterOptions.

func WithColumns

func WithColumns(columns []string) PostgresWriterOption

WithColumns sets the columns to write.

func WithConflictResolution

func WithConflictResolution(resolution ConflictResolution, conflictCols, updateCols []string) PostgresWriterOption

WithConflictResolution sets the conflict resolution strategy and columns.

func WithCreateTable

func WithCreateTable(create bool) PostgresWriterOption

WithCreateTable enables or disables creating the target table if it does not exist.

func WithMaxErrors

func WithMaxErrors(maxErrors int64) PostgresWriterOption

WithMaxErrors sets the error threshold for the writer (PostgresWriterOptions.MaxErrors).

func WithPostgresBatchSize

func WithPostgresBatchSize(size int) PostgresWriterOption

WithPostgresBatchSize sets the batch size for writes.

func WithPostgresConnectionPool

func WithPostgresConnectionPool(maxOpen, maxIdle int, maxLifetime, maxIdleTime time.Duration) PostgresWriterOption

WithPostgresConnectionPool configures the connection pool.

func WithPostgresDSN

func WithPostgresDSN(dsn string) PostgresWriterOption

WithPostgresDSN sets the PostgreSQL connection string.

func WithPostgresMetadata

func WithPostgresMetadata(metadata map[string]string) PostgresWriterOption

WithPostgresMetadata sets user metadata for the writer.

func WithPostgresQueryTimeout

func WithPostgresQueryTimeout(timeout time.Duration) PostgresWriterOption

WithPostgresQueryTimeout sets the query timeout.

func WithTableName

func WithTableName(tableName string) PostgresWriterOption

WithTableName sets the target table name.

func WithTransactionMode

func WithTransactionMode(enabled bool) PostgresWriterOption

WithTransactionMode enables or disables transaction wrapping for batches.

func WithTruncateTable

func WithTruncateTable(truncate bool) PostgresWriterOption

WithTruncateTable enables or disables table truncation before writing.

type PostgresWriterOptions

type PostgresWriterOptions struct {
	DSN                string             // PostgreSQL connection string
	TableName          string             // Target table name
	Columns            []string           // Columns to write (order matters)
	BatchSize          int                // Number of records per batch
	CreateTable        bool               // Create table if not exists
	TruncateTable      bool               // Truncate table before writing
	ConflictResolution ConflictResolution // Conflict handling strategy
	ConflictColumns    []string           // Columns that define uniqueness for conflict resolution
	UpdateColumns      []string           // Columns to update on conflict (for ConflictUpdate)
	TransactionMode    bool               // Wrap batches in transactions
	ConnMaxLifetime    time.Duration      // Max connection lifetime
	ConnMaxIdleTime    time.Duration      // Max idle connection time
	MaxOpenConns       int                // Max open connections
	MaxIdleConns       int                // Max idle connections
	QueryTimeout       time.Duration      // Timeout for queries
	Metadata           map[string]string  // Arbitrary metadata for user tracking
	MaxErrors          int64              // Error threshold (see WithMaxErrors)
}

PostgresWriterOptions configures the PostgreSQL writer.

type PostgresWriterStats

type PostgresWriterStats struct {
	RecordsWritten   int64            // Total records written
	BatchesWritten   int64            // Number of batches written
	TransactionCount int64            // Number of transactions committed
	LastWriteTime    time.Time        // Time of last write
	WriteDuration    time.Duration    // Total time spent writing
	ConnectionTime   time.Duration    // Time spent establishing connection
	NullValueCounts  map[string]int64 // Count of null values per column
	ConflictCount    int64            // Number of conflicts encountered
}

PostgresWriterStats holds PostgreSQL write performance statistics.

type WriterOption

type WriterOption func(*ParquetWriterOptions)

WriterOption represents a configuration function for ParquetWriterOptions.

func WithBatchSize

func WithBatchSize(size int64) WriterOption

WithBatchSize sets the number of records to buffer before writing a batch.

func WithCompression

func WithCompression(compression compress.Compression) WriterOption

WithCompression sets the Parquet compression algorithm.

func WithFieldOrder

func WithFieldOrder(fields []string) WriterOption

WithFieldOrder sets the explicit field ordering for the Parquet schema.

func WithMetadata

func WithMetadata(metadata map[string]string) WriterOption

WithMetadata sets user metadata for the Parquet file.

func WithRowGroupSize

func WithRowGroupSize(size int64) WriterOption

WithRowGroupSize sets the row group size for the Parquet file.

func WithSchemaValidation

func WithSchemaValidation(validate bool) WriterOption

WithSchemaValidation enables or disables strict schema validation.
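WriterOption follows Go's functional-options pattern: each With* function returns a closure that modifies a ParquetWriterOptions value, which NewParquetWriter presumably applies over its defaults. The stdlib-only sketch below uses a hypothetical subset of the options (parquetOpts, omitting the Arrow schema and compression fields) and an assumed default batch size of 1000; the package's real defaults are not documented here.

```go
package main

import "fmt"

// parquetOpts is a hypothetical, stdlib-only subset of ParquetWriterOptions
// (the real struct also holds an *arrow.Schema and a compress.Compression).
type parquetOpts struct {
	BatchSize      int64
	FieldOrder     []string
	ValidateSchema bool
}

// option plays the role of WriterOption.
type option func(*parquetOpts)

func withBatchSize(n int64) option { return func(o *parquetOpts) { o.BatchSize = n } }

func withFieldOrder(fields []string) option { return func(o *parquetOpts) { o.FieldOrder = fields } }

func withSchemaValidation(v bool) option { return func(o *parquetOpts) { o.ValidateSchema = v } }

// buildOpts starts from assumed defaults and applies each option in order,
// so a later option overrides an earlier one for the same field.
func buildOpts(opts ...option) parquetOpts {
	o := parquetOpts{BatchSize: 1000} // assumed default, not taken from the package
	for _, apply := range opts {
		apply(&o)
	}
	return o
}

func main() {
	o := buildOpts(withBatchSize(10), withFieldOrder([]string{"id", "ts"}), withBatchSize(500))
	fmt.Println(o.BatchSize, o.FieldOrder, o.ValidateSchema) // 500 [id ts] false
}
```

With the real package, the equivalent call would pass WithBatchSize, WithFieldOrder or WithSchemaValidation after the filename argument of NewParquetWriter.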

type WriterOptionCSV

type WriterOptionCSV func(*CSVWriterOptions)

WriterOptionCSV is a functional option for CSVWriter configuration.

func WithCSVBatchSize

func WithCSVBatchSize(size int) WriterOptionCSV

WithCSVBatchSize sets the batch size for the CSVWriter.

func WithCSVDelimiter deprecated

func WithCSVDelimiter(delim rune) WriterOptionCSV

Deprecated: Use WithComma instead.

func WithCSVHeaders deprecated

func WithCSVHeaders(headers []string) WriterOptionCSV

Deprecated: Use WithHeaders instead.

func WithCSVWriteHeader deprecated

func WithCSVWriteHeader(write bool) WriterOptionCSV

Deprecated: Use WithWriteHeader instead.
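The deprecated CSV options follow the Go convention of a doc-comment paragraph beginning with "Deprecated:", which pkg.go.dev recognizes when it labels these functions deprecated in the index. A hypothetical sketch of an alias that simply forwards to its replacement is shown below; that the real deprecated options forward this way is an assumption, although the docs present them as equivalents. The default delimiter ',' comes from the CSVWriterOptions field comment.

```go
package main

import "fmt"

// csvOpts is a hypothetical subset of CSVWriterOptions.
type csvOpts struct {
	Comma   rune
	Headers []string
}

type csvOption func(*csvOpts)

// withComma sets the field delimiter.
func withComma(delim rune) csvOption {
	return func(o *csvOpts) { o.Comma = delim }
}

// withHeaders sets the header row.
func withHeaders(headers []string) csvOption {
	return func(o *csvOpts) { o.Headers = headers }
}

// withCSVDelimiter sets the field delimiter.
//
// Deprecated: Use withComma instead.
func withCSVDelimiter(delim rune) csvOption { return withComma(delim) }

// withCSVHeaders sets the header row.
//
// Deprecated: Use withHeaders instead.
func withCSVHeaders(headers []string) csvOption { return withHeaders(headers) }

// applyCSV applies options over the documented default delimiter ','.
func applyCSV(opts ...csvOption) csvOpts {
	o := csvOpts{Comma: ','}
	for _, opt := range opts {
		opt(&o)
	}
	return o
}

func main() {
	fmt.Printf("%q\n", applyCSV(withCSVDelimiter(';')).Comma) // ';'
	fmt.Println(applyCSV(withCSVHeaders([]string{"id"})).Headers)
}
```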

func WithComma

func WithComma(delim rune) WriterOptionCSV

WithComma sets the field delimiter for the CSV output.

func WithHeaders

func WithHeaders(headers []string) WriterOptionCSV

WithHeaders sets the header row for the CSV output.

func WithUseCRLF

func WithUseCRLF(useCRLF bool) WriterOptionCSV

WithUseCRLF enables or disables CRLF line endings.

func WithWriteHeader

func WithWriteHeader(write bool) WriterOptionCSV

WithWriteHeader enables or disables writing the header row.

type WriterOptionJSON

type WriterOptionJSON func(*JSONWriterOptions)

WriterOptionJSON is a functional option for JSONWriter configuration.

func WithBufferSize

func WithBufferSize(size int) WriterOptionJSON

WithBufferSize sets the buffer size for the JSONWriter. This controls the size of the underlying bufio.Writer buffer. Larger buffers can improve performance for high-throughput scenarios.

func WithFlushOnWrite

func WithFlushOnWrite(enabled bool) WriterOptionJSON

WithFlushOnWrite enables or disables flushing after every write.

func WithJSONBatchSize

func WithJSONBatchSize(size int) WriterOptionJSON

WithJSONBatchSize sets the batch size for the JSONWriter.

type WriterStats

type WriterStats struct {
	RecordsWritten  int64
	BatchesWritten  int64
	BytesWritten    int64
	FlushDuration   time.Duration
	LastFlushTime   time.Time
	ErrorCount      int64
	NullValueCounts map[string]int64
}

WriterStats holds statistics about the Parquet writer's performance.
