Documentation ¶
Index ¶
- type CSVWriter
- type CSVWriterError
- type CSVWriterOptions
- type CSVWriterStats
- type ConflictResolution
- type JSONWriter
- type JSONWriterError
- type JSONWriterOptions
- type JSONWriterStats
- type ParquetWriter
- type ParquetWriterError
- type ParquetWriterOptions
- type PostgresWriter
- type PostgresWriterError
- type PostgresWriterOption
- func WithColumns(columns []string) PostgresWriterOption
- func WithConflictResolution(resolution ConflictResolution, conflictCols, updateCols []string) PostgresWriterOption
- func WithCreateTable(create bool) PostgresWriterOption
- func WithMaxErrors(maxErrors int64) PostgresWriterOption
- func WithPostgresBatchSize(size int) PostgresWriterOption
- func WithPostgresConnectionPool(maxOpen, maxIdle int, maxLifetime, maxIdleTime time.Duration) PostgresWriterOption
- func WithPostgresDSN(dsn string) PostgresWriterOption
- func WithPostgresMetadata(metadata map[string]string) PostgresWriterOption
- func WithPostgresQueryTimeout(timeout time.Duration) PostgresWriterOption
- func WithTableName(tableName string) PostgresWriterOption
- func WithTransactionMode(enabled bool) PostgresWriterOption
- func WithTruncateTable(truncate bool) PostgresWriterOption
- type PostgresWriterOptions
- type PostgresWriterStats
- type WriterOption
- func WithBatchSize(size int64) WriterOption
- func WithCompression(compression compress.Compression) WriterOption
- func WithFieldOrder(fields []string) WriterOption
- func WithMetadata(metadata map[string]string) WriterOption
- func WithRowGroupSize(size int64) WriterOption
- func WithSchemaValidation(validate bool) WriterOption
- type WriterOptionCSV
- func WithCSVBatchSize(size int) WriterOptionCSV
- func WithCSVDelimiter(delim rune) WriterOptionCSV (deprecated)
- func WithCSVHeaders(headers []string) WriterOptionCSV (deprecated)
- func WithCSVWriteHeader(write bool) WriterOptionCSV (deprecated)
- func WithComma(delim rune) WriterOptionCSV
- func WithHeaders(headers []string) WriterOptionCSV
- func WithUseCRLF(useCRLF bool) WriterOptionCSV
- func WithWriteHeader(write bool) WriterOptionCSV
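- type WriterOptionJSON
- func WithBufferSize(size int) WriterOptionJSON
- func WithFlushOnWrite(enabled bool) WriterOptionJSON
- func WithJSONBatchSize(size int) WriterOptionJSON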
- type WriterStats
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type CSVWriter ¶
type CSVWriter struct {
// contains filtered or unexported fields
}
CSVWriter implements core.DataSink for CSV output. It supports batching, header management, delimiter configuration, and statistics.
func NewCSVWriter ¶
func NewCSVWriter(w io.WriteCloser, opts ...WriterOptionCSV) (*CSVWriter, error)
NewCSVWriter creates a new CSV writer with extended options. Accepts functional options for configuration. Returns a ready-to-use writer or an error.
func (*CSVWriter) Close ¶
Close implements the core.DataSink interface. Flushes and closes all resources.
func (*CSVWriter) Flush ¶
Flush implements the core.DataSink interface. Forces any buffered records to be written to the CSV output.
func (*CSVWriter) Stats ¶
func (c *CSVWriter) Stats() CSVWriterStats
Stats returns write statistics.
type CSVWriterError ¶
type CSVWriterError struct {
Op string // Operation that failed (e.g., "write", "flush", "write_row")
Err error // Underlying error
}
CSVWriterError wraps CSV-specific write errors with context about the operation.
func (*CSVWriterError) Error ¶
func (e *CSVWriterError) Error() string
func (*CSVWriterError) Unwrap ¶
func (e *CSVWriterError) Unwrap() error
type CSVWriterOptions ¶
type CSVWriterOptions struct {
Comma rune // Field delimiter (default ',')
UseCRLF bool // Use CRLF line endings
WriteHeader bool // Write header row
Headers []string // Explicit header order
BatchSize int // Number of records to buffer before writing
}
CSVWriterOptions configures CSV output.
type CSVWriterStats ¶
type CSVWriterStats struct {
RecordsWritten int64 // Total records written
FlushCount int64 // Number of flushes performed
FlushDuration time.Duration // Total time spent flushing
LastFlushTime time.Time // Time of last flush
NullValueCounts map[string]int64 // Count of null values per field
}
CSVWriterStats holds CSV write performance statistics.
type ConflictResolution ¶
type ConflictResolution int
ConflictResolution defines how to handle INSERT conflicts in PostgreSQL.
const (
// ConflictError returns an error on conflict (default PostgreSQL behavior).
ConflictError ConflictResolution = iota
// ConflictIgnore ignores conflicting rows (ON CONFLICT DO NOTHING).
ConflictIgnore
// ConflictUpdate updates conflicting rows (ON CONFLICT DO UPDATE).
ConflictUpdate
)
type JSONWriter ¶
type JSONWriter struct {
// contains filtered or unexported fields
}
JSONWriter implements core.DataSink for line-delimited JSON output. It supports batching, buffered output, flush control, and statistics.
func NewJSONWriter ¶
func NewJSONWriter(w io.WriteCloser, opts ...WriterOptionJSON) *JSONWriter
NewJSONWriter creates a JSONWriter with optional buffered output and options. Accepts functional options for configuration. Returns a ready-to-use writer.
func (*JSONWriter) Close ¶
func (j *JSONWriter) Close() error
Close implements the core.DataSink interface. Flushes and closes all resources.
func (*JSONWriter) Flush ¶
func (j *JSONWriter) Flush() error
Flush implements the core.DataSink interface. Forces any buffered records to be written to the output.
func (*JSONWriter) Stats ¶
func (j *JSONWriter) Stats() JSONWriterStats
Stats returns the writer's performance stats.
type JSONWriterError ¶
type JSONWriterError struct {
Op string // Operation that failed (e.g., "write", "flush", "marshal_record")
Err error // Underlying error
}
JSONWriterError wraps detailed error context for JSONWriter operations.
func (*JSONWriterError) Error ¶
func (e *JSONWriterError) Error() string
func (*JSONWriterError) Unwrap ¶
func (e *JSONWriterError) Unwrap() error
type JSONWriterOptions ¶
type JSONWriterOptions struct {
BatchSize int // Number of records to buffer before writing
FlushOnWrite bool // Whether to flush after every write
BufferSize int // Size of the underlying buffer in bytes
}
JSONWriterOptions configures the JSONWriter behavior.
type JSONWriterStats ¶
type JSONWriterStats struct {
RecordsWritten int64 // Total records written
FlushCount int64 // Number of flushes performed
FlushDuration time.Duration // Total time spent flushing
LastFlushTime time.Time // Time of last flush
NullValueCounts map[string]int64 // Count of null values per field
}
JSONWriterStats holds statistics about the writer's performance.
type ParquetWriter ¶
type ParquetWriter struct {
// contains filtered or unexported fields
}
ParquetWriter implements core.DataSink for Parquet files. It supports batching, Arrow schema inference, compression, field ordering, schema validation, and statistics.
func NewParquetWriter ¶
func NewParquetWriter(filename string, options ...WriterOption) (*ParquetWriter, error)
NewParquetWriter creates a new Parquet writer for a file. Accepts functional options for configuration. Returns a ready-to-use writer or an error.
func (*ParquetWriter) Close ¶
func (p *ParquetWriter) Close() error
Close implements the core.DataSink interface. Flushes and closes all resources.
func (*ParquetWriter) Flush ¶
func (p *ParquetWriter) Flush() error
Flush implements the core.DataSink interface. Forces any buffered records to be written to the Parquet file.
func (*ParquetWriter) Stats ¶
func (p *ParquetWriter) Stats() WriterStats
Stats returns the current statistics of the Parquet writer.
type ParquetWriterError ¶
type ParquetWriterError struct {
Op string // Operation that failed (e.g., "read", "load_batch", "open_file", "schema")
Err error // Underlying error
}
ParquetWriterError wraps Parquet-specific write errors with context about the operation.
func (*ParquetWriterError) Error ¶
func (e *ParquetWriterError) Error() string
Error returns the error string for ParquetWriterError.
func (*ParquetWriterError) Unwrap ¶
func (e *ParquetWriterError) Unwrap() error
Unwrap returns the underlying error for ParquetWriterError.
type ParquetWriterOptions ¶
type ParquetWriterOptions struct {
BatchSize int64 // Number of records to buffer before writing
Schema *arrow.Schema // Pre-defined schema (optional)
Compression compress.Compression // Compression algorithm
FieldOrder []string // Explicit field ordering
RowGroupSize int64 // Row group size
PageSize int64 // Page size
DictionaryLevel map[string]bool // Per-field dictionary encoding
Metadata map[string]string // File metadata
ValidateSchema bool // Enable strict schema validation
}
ParquetWriterOptions configures the Parquet writer.
type PostgresWriter ¶
type PostgresWriter struct {
// contains filtered or unexported fields
}
PostgresWriter implements core.DataSink for PostgreSQL output. It supports batching, transactions, conflict resolution, and statistics.
func NewPostgresWriter ¶
func NewPostgresWriter(opts ...PostgresWriterOption) (*PostgresWriter, error)
NewPostgresWriter creates a new PostgreSQL writer with the given options. Accepts functional options for configuration. Returns a ready-to-use writer or an error.
func (*PostgresWriter) Close ¶
func (w *PostgresWriter) Close() error
Close implements the core.DataSink interface. Flushes and closes all resources.
func (*PostgresWriter) Flush ¶
func (w *PostgresWriter) Flush() error
Flush implements the core.DataSink interface. Forces any buffered records to be written to PostgreSQL.
func (*PostgresWriter) Stats ¶
func (w *PostgresWriter) Stats() PostgresWriterStats
Stats returns a copy of the current write statistics.
type PostgresWriterError ¶
type PostgresWriterError struct {
Op string // The operation being performed (e.g., "write", "connect")
Err error // The underlying error
}
PostgresWriterError wraps PostgreSQL-specific write errors with context about the operation.
func (*PostgresWriterError) Error ¶
func (e *PostgresWriterError) Error() string
Error returns the error string for PostgresWriterError.
func (*PostgresWriterError) Unwrap ¶
func (e *PostgresWriterError) Unwrap() error
Unwrap returns the underlying error for PostgresWriterError.
type PostgresWriterOption ¶
type PostgresWriterOption func(*PostgresWriterOptions)
PostgresWriterOption represents a configuration function for PostgresWriterOptions.
func WithColumns ¶
func WithColumns(columns []string) PostgresWriterOption
WithColumns sets the columns to write.
func WithConflictResolution ¶
func WithConflictResolution(resolution ConflictResolution, conflictCols, updateCols []string) PostgresWriterOption
WithConflictResolution sets the conflict resolution strategy and columns.
func WithCreateTable ¶
func WithCreateTable(create bool) PostgresWriterOption
WithCreateTable enables or disables table creation.
func WithMaxErrors ¶
func WithMaxErrors(maxErrors int64) PostgresWriterOption
WithMaxErrors sets the error threshold for the writer.
func WithPostgresBatchSize ¶
func WithPostgresBatchSize(size int) PostgresWriterOption
WithPostgresBatchSize sets the batch size for writes.
func WithPostgresConnectionPool ¶
func WithPostgresConnectionPool(maxOpen, maxIdle int, maxLifetime, maxIdleTime time.Duration) PostgresWriterOption
WithPostgresConnectionPool configures the connection pool.
func WithPostgresDSN ¶
func WithPostgresDSN(dsn string) PostgresWriterOption
WithPostgresDSN sets the PostgreSQL connection string.
func WithPostgresMetadata ¶
func WithPostgresMetadata(metadata map[string]string) PostgresWriterOption
WithPostgresMetadata sets user metadata for the writer.
func WithPostgresQueryTimeout ¶
func WithPostgresQueryTimeout(timeout time.Duration) PostgresWriterOption
WithPostgresQueryTimeout sets the query timeout.
func WithTableName ¶
func WithTableName(tableName string) PostgresWriterOption
WithTableName sets the target table name.
func WithTransactionMode ¶
func WithTransactionMode(enabled bool) PostgresWriterOption
WithTransactionMode enables or disables transaction wrapping for batches.
func WithTruncateTable ¶
func WithTruncateTable(truncate bool) PostgresWriterOption
WithTruncateTable enables or disables table truncation before writing.
type PostgresWriterOptions ¶
type PostgresWriterOptions struct {
DSN string // PostgreSQL connection string
TableName string // Target table name
Columns []string // Columns to write (order matters)
BatchSize int // Number of records per batch
CreateTable bool // Create table if not exists
TruncateTable bool // Truncate table before writing
ConflictResolution ConflictResolution // Conflict handling strategy
ConflictColumns []string // Columns that define uniqueness for conflict resolution
UpdateColumns []string // Columns to update on conflict (for ConflictUpdate)
TransactionMode bool // Wrap batches in transactions
ConnMaxLifetime time.Duration // Max connection lifetime
ConnMaxIdleTime time.Duration // Max idle connection time
MaxOpenConns int // Max open connections
MaxIdleConns int // Max idle connections
QueryTimeout time.Duration // Timeout for queries
Metadata map[string]string // Arbitrary metadata for user tracking
MaxErrors int64 // Error threshold (set via WithMaxErrors)
}
PostgresWriterOptions configures the PostgreSQL writer.
type PostgresWriterStats ¶
type PostgresWriterStats struct {
RecordsWritten int64 // Total records written
BatchesWritten int64 // Number of batches written
TransactionCount int64 // Number of transactions committed
LastWriteTime time.Time // Time of last write
WriteDuration time.Duration // Total time spent writing
ConnectionTime time.Duration // Time spent establishing connection
NullValueCounts map[string]int64 // Count of null values per column
ConflictCount int64 // Number of conflicts encountered
}
PostgresWriterStats holds PostgreSQL write performance statistics.
type WriterOption ¶
type WriterOption func(*ParquetWriterOptions)
WriterOption represents a configuration function for ParquetWriterOptions.
func WithBatchSize ¶
func WithBatchSize(size int64) WriterOption
WithBatchSize sets the number of records to buffer before writing a batch.
func WithCompression ¶
func WithCompression(compression compress.Compression) WriterOption
WithCompression sets the Parquet compression algorithm.
func WithFieldOrder ¶
func WithFieldOrder(fields []string) WriterOption
WithFieldOrder sets the explicit field ordering for the Parquet schema.
func WithMetadata ¶
func WithMetadata(metadata map[string]string) WriterOption
WithMetadata sets user metadata for the Parquet file.
func WithRowGroupSize ¶
func WithRowGroupSize(size int64) WriterOption
WithRowGroupSize sets the row group size for the Parquet file.
func WithSchemaValidation ¶
func WithSchemaValidation(validate bool) WriterOption
WithSchemaValidation enables or disables strict schema validation.
type WriterOptionCSV ¶
type WriterOptionCSV func(*CSVWriterOptions)
WriterOptionCSV is a functional option for CSVWriter configuration.
func WithCSVBatchSize ¶
func WithCSVBatchSize(size int) WriterOptionCSV
WithCSVBatchSize sets the batch size for the CSVWriter.
func WithCSVDelimiter (deprecated)
func WithCSVDelimiter(delim rune) WriterOptionCSV
Deprecated: Use WithComma instead.
func WithCSVHeaders (deprecated)
func WithCSVHeaders(headers []string) WriterOptionCSV
Deprecated: Use WithHeaders instead.
func WithCSVWriteHeader (deprecated)
func WithCSVWriteHeader(write bool) WriterOptionCSV
Deprecated: Use WithWriteHeader instead.
func WithComma ¶
func WithComma(delim rune) WriterOptionCSV
WithComma sets the field delimiter for the CSV output.
func WithHeaders ¶
func WithHeaders(headers []string) WriterOptionCSV
WithHeaders sets the header row for the CSV output.
func WithUseCRLF ¶
func WithUseCRLF(useCRLF bool) WriterOptionCSV
WithUseCRLF enables or disables CRLF line endings.
func WithWriteHeader ¶
func WithWriteHeader(write bool) WriterOptionCSV
WithWriteHeader enables or disables writing the header row.
type WriterOptionJSON ¶
type WriterOptionJSON func(*JSONWriterOptions)
WriterOptionJSON is a functional option for JSONWriter configuration.
func WithBufferSize ¶
func WithBufferSize(size int) WriterOptionJSON
WithBufferSize sets the buffer size for the JSONWriter. This controls the size of the underlying bufio.Writer buffer. Larger buffers can improve performance for high-throughput scenarios.
func WithFlushOnWrite ¶
func WithFlushOnWrite(enabled bool) WriterOptionJSON
WithFlushOnWrite enables or disables flushing after every write.
func WithJSONBatchSize ¶
func WithJSONBatchSize(size int) WriterOptionJSON
WithJSONBatchSize sets the batch size for the JSONWriter.