Documentation
¶
Index ¶
- Variables
- type AsyncProducer
- func (p *AsyncProducer) Close() error
- func (p *AsyncProducer) Errors() <-chan error
- func (p *AsyncProducer) Send(ctx context.Context, msg *Message) (*ProducerResult, error)
- func (p *AsyncProducer) SendAsync(msg *Message, callback func(*ProducerResult, error))
- func (p *AsyncProducer) SendJSON(ctx context.Context, topic string, key string, value interface{}) (*ProducerResult, error)
- func (p *AsyncProducer) Successes() <-chan *ProducerResult
- type Component
- func (c *Component) DependsOn() []string
- func (c *Component) GetHealthChecker() component.HealthChecker
- func (c *Component) GetManager() *Manager
- func (c *Component) Init(ctx context.Context, loader component.ConfigLoader) error
- func (c *Component) Name() string
- func (c *Component) Start(ctx context.Context) error
- func (c *Component) Stop(ctx context.Context) error
- type Config
- type ConsumedMessage
- type Consumer
- type ConsumerConfig
- type ConsumerGroup
- type ConsumerGroupInfo
- type ConsumerGroupMember
- type HealthChecker
- type KafkaMetrics
- func (m *KafkaMetrics) IsMetricsEnabled() bool
- func (m *KafkaMetrics) IsRegistered() bool
- func (m *KafkaMetrics) MetricsName() string
- func (m *KafkaMetrics) RecordConsume(ctx context.Context, topic, group string, partition int32, ...)
- func (m *KafkaMetrics) RecordProduce(ctx context.Context, topic string, partition int32, duration time.Duration, ...)
- func (m *KafkaMetrics) RegisterLagCallback(group string, callback func() int64)
- func (m *KafkaMetrics) RegisterMetrics(meter metric.Meter) error
- func (m *KafkaMetrics) UnregisterLagCallback(group string)
- type KafkaMetricsConfig
- type Manager
- func (m *Manager) Close() error
- func (m *Manager) Connect(ctx context.Context) error
- func (m *Manager) CreateConsumer(name string, cfg ConsumerConfig) (*ConsumerGroup, error)
- func (m *Manager) CreateTopic(ctx context.Context, name string, partitions int32, replication int16) error
- func (m *Manager) DeleteTopic(ctx context.Context, name string) error
- func (m *Manager) DescribeConsumerGroup(ctx context.Context, groupID string) (*ConsumerGroupInfo, error)
- func (m *Manager) DescribeTopic(ctx context.Context, name string) (*TopicInfo, error)
- func (m *Manager) GetAsyncProducer() (*AsyncProducer, error)
- func (m *Manager) GetConfig() Config
- func (m *Manager) GetConsumer(name string) *ConsumerGroup
- func (m *Manager) GetOffset(ctx context.Context, groupID, topic string) (map[int32]int64, error)
- func (m *Manager) GetProducer() Producer
- func (m *Manager) ListConsumerGroups(ctx context.Context) ([]string, error)
- func (m *Manager) ListTopics(ctx context.Context) ([]string, error)
- func (m *Manager) Ping(ctx context.Context) error
- func (m *Manager) ResetOffset(ctx context.Context, groupID, topic string, offset int64) error
- type Message
- type MessageHandler
- type PartitionInfo
- type Producer
- type ProducerConfig
- type ProducerResult
- type SASLConfig
- type SimpleConsumer
- type SyncProducer
- func (p *SyncProducer) Close() error
- func (p *SyncProducer) Send(ctx context.Context, msg *Message) (*ProducerResult, error)
- func (p *SyncProducer) SendAsync(msg *Message, callback func(*ProducerResult, error))
- func (p *SyncProducer) SendJSON(ctx context.Context, topic string, key string, value interface{}) (*ProducerResult, error)
- type TLSConfig
- type TopicInfo
- type XDGSCRAMClient
Constants ¶
This section is empty.
Variables ¶
// SHA256 is the hash generator function for SCRAM-SHA-256; each call returns a new SHA-256 hash.
var SHA256 scram.HashGeneratorFcn = func() hash.Hash { return sha256.New() }
SHA256 SCRAM-SHA-256 哈希函数
// SHA512 is the hash generator function for SCRAM-SHA-512; each call returns a new SHA-512 hash.
var SHA512 scram.HashGeneratorFcn = func() hash.Hash { return sha512.New() }
SHA512 SCRAM-SHA-512 哈希函数
Functions ¶
This section is empty.
Types ¶
type AsyncProducer ¶
// AsyncProducer is the asynchronous producer implementation.
type AsyncProducer struct {
// contains filtered or unexported fields
}
AsyncProducer 异步生产者实现
func NewAsyncProducer ¶
func NewAsyncProducer(brokers []string, cfg ProducerConfig, saramaCfg *sarama.Config, logger *zap.Logger) (*AsyncProducer, error)
NewAsyncProducer 创建异步生产者
func (*AsyncProducer) Send ¶
func (p *AsyncProducer) Send(ctx context.Context, msg *Message) (*ProducerResult, error)
Send 同步发送(等待结果)
func (*AsyncProducer) SendAsync ¶
func (p *AsyncProducer) SendAsync(msg *Message, callback func(*ProducerResult, error))
SendAsync 异步发送消息
func (*AsyncProducer) SendJSON ¶
func (p *AsyncProducer) SendJSON(ctx context.Context, topic string, key string, value interface{}) (*ProducerResult, error)
SendJSON 发送 JSON 消息
func (*AsyncProducer) Successes ¶
func (p *AsyncProducer) Successes() <-chan *ProducerResult
Successes 返回成功通道
type Component ¶
// Component is the Kafka component.
//
// It implements the component.Component interface and provides Kafka
// message-queue capability. Dependencies: config, logger.
type Component struct {
// contains filtered or unexported fields
}
Component Kafka 组件
实现 component.Component 接口,提供 Kafka 消息队列能力。依赖:config、logger。
func (*Component) GetHealthChecker ¶
func (c *Component) GetHealthChecker() component.HealthChecker
GetHealthChecker 获取健康检查器。实现 component.HealthCheckProvider 接口。
type Config ¶
// Config is the Kafka configuration.
type Config struct {
// Brokers is the list of Kafka cluster addresses.
Brokers []string `mapstructure:"brokers"`
// Version is the Kafka version (e.g. "3.8.0").
Version string `mapstructure:"version"`
// ClientID is the client identifier.
ClientID string `mapstructure:"client_id"`
// Producer is the producer configuration.
Producer ProducerConfig `mapstructure:"producer"`
// Consumer is the consumer configuration.
Consumer ConsumerConfig `mapstructure:"consumer"`
// SASL is the SASL authentication configuration (optional).
SASL *SASLConfig `mapstructure:"sasl"`
// TLS is the TLS configuration (optional).
TLS *TLSConfig `mapstructure:"tls"`
}
Config Kafka 配置
type ConsumedMessage ¶
// ConsumedMessage is a message received by a consumer.
type ConsumedMessage struct {
// Topic is the topic the message came from.
Topic string
// Partition is the partition of the message.
Partition int32
// Offset is the offset of the message.
Offset int64
// Key is the message key.
Key []byte
// Value is the message value.
Value []byte
// Headers holds the message headers.
Headers map[string]string
// Timestamp is the message timestamp.
// NOTE(review): the unit of this int64 is not shown here — confirm against the consumer code.
Timestamp int64
}
ConsumedMessage 消费的消息
type Consumer ¶
// Consumer is the Kafka consumer interface.
type Consumer interface {
// Start starts the consumer.
Start(ctx context.Context, handler MessageHandler) error
// Stop stops the consumer.
Stop() error
// IsRunning reports whether the consumer is running.
IsRunning() bool
}
Consumer Kafka 消费者接口
type ConsumerConfig ¶
// ConsumerConfig is the consumer configuration.
type ConsumerConfig struct {
// Enabled indicates whether the consumer is enabled.
Enabled bool `mapstructure:"enabled"`
// GroupID is the consumer group ID.
GroupID string `mapstructure:"group_id"`
// Topics is the list of subscribed topics.
Topics []string `mapstructure:"topics"`
// OffsetInitial is the initial offset: -1=Newest, -2=Oldest.
OffsetInitial int64 `mapstructure:"offset_initial"`
// AutoCommit indicates whether offsets are committed automatically.
AutoCommit bool `mapstructure:"auto_commit"`
// AutoCommitInterval is the auto-commit interval.
AutoCommitInterval time.Duration `mapstructure:"auto_commit_interval"`
// SessionTimeout is the session timeout.
SessionTimeout time.Duration `mapstructure:"session_timeout"`
// HeartbeatInterval is the heartbeat interval.
HeartbeatInterval time.Duration `mapstructure:"heartbeat_interval"`
// MaxProcessingTime is the maximum processing time for a single message.
MaxProcessingTime time.Duration `mapstructure:"max_processing_time"`
// FetchMin is the minimum number of bytes to fetch.
FetchMin int32 `mapstructure:"fetch_min"`
// FetchMax is the maximum number of bytes to fetch.
FetchMax int32 `mapstructure:"fetch_max"`
// FetchDefault is the default number of bytes to fetch.
FetchDefault int32 `mapstructure:"fetch_default"`
// RebalanceStrategy is the rebalance strategy: range, roundrobin, sticky.
RebalanceStrategy string `mapstructure:"rebalance_strategy"`
}
ConsumerConfig 消费者配置
func (*ConsumerConfig) ApplyDefaults ¶
func (c *ConsumerConfig) ApplyDefaults()
ApplyDefaults 应用消费者默认值
type ConsumerGroup ¶
// ConsumerGroup is the consumer group implementation.
type ConsumerGroup struct {
// contains filtered or unexported fields
}
ConsumerGroup 消费者组实现
func NewConsumerGroup ¶
func NewConsumerGroup(brokers []string, cfg ConsumerConfig, saramaCfg *sarama.Config, logger *zap.Logger) (*ConsumerGroup, error)
NewConsumerGroup 创建消费者组
func (*ConsumerGroup) Start ¶
func (c *ConsumerGroup) Start(ctx context.Context, handler MessageHandler) error
Start 启动消费者
type ConsumerGroupInfo ¶
// ConsumerGroupInfo holds information about a consumer group.
type ConsumerGroupInfo struct {
State string
ProtocolType string
Members []ConsumerGroupMember
}
ConsumerGroupInfo 消费者组信息
type ConsumerGroupMember ¶
ConsumerGroupMember 消费者组成员
type HealthChecker ¶
// HealthChecker is the Kafka health checker.
type HealthChecker struct {
// contains filtered or unexported fields
}
HealthChecker Kafka 健康检查器
func NewHealthChecker ¶
func NewHealthChecker(manager *Manager) *HealthChecker
NewHealthChecker 创建健康检查器
func (*HealthChecker) SetTimeout ¶
func (h *HealthChecker) SetTimeout(timeout time.Duration)
SetTimeout 设置超时时间
type KafkaMetrics ¶
// KafkaMetrics implements component.MetricsProvider for Kafka instrumentation.
type KafkaMetrics struct {
// contains filtered or unexported fields
}
KafkaMetrics implements component.MetricsProvider for Kafka instrumentation.
func NewKafkaMetrics ¶
func NewKafkaMetrics(cfg KafkaMetricsConfig) *KafkaMetrics
NewKafkaMetrics creates a new Kafka metrics provider
func (*KafkaMetrics) IsMetricsEnabled ¶
func (m *KafkaMetrics) IsMetricsEnabled() bool
IsMetricsEnabled returns whether metrics collection is enabled
func (*KafkaMetrics) IsRegistered ¶
func (m *KafkaMetrics) IsRegistered() bool
IsRegistered returns whether metrics have been registered
func (*KafkaMetrics) MetricsName ¶
func (m *KafkaMetrics) MetricsName() string
MetricsName returns the metrics group name
func (*KafkaMetrics) RecordConsume ¶
func (m *KafkaMetrics) RecordConsume(ctx context.Context, topic, group string, partition int32, duration time.Duration, err error)
RecordConsume records a message consumption
func (*KafkaMetrics) RecordProduce ¶
func (m *KafkaMetrics) RecordProduce(ctx context.Context, topic string, partition int32, duration time.Duration, err error)
RecordProduce records a message production
func (*KafkaMetrics) RegisterLagCallback ¶
func (m *KafkaMetrics) RegisterLagCallback(group string, callback func() int64)
RegisterLagCallback registers a lag callback for a consumer group
func (*KafkaMetrics) RegisterMetrics ¶
func (m *KafkaMetrics) RegisterMetrics(meter metric.Meter) error
RegisterMetrics registers all Kafka metrics with the provided Meter
func (*KafkaMetrics) UnregisterLagCallback ¶
func (m *KafkaMetrics) UnregisterLagCallback(group string)
UnregisterLagCallback removes a lag callback
type KafkaMetricsConfig ¶
KafkaMetricsConfig holds configuration for Kafka metrics
type Manager ¶
// Manager is the Kafka manager.
type Manager struct {
// contains filtered or unexported fields
}
Manager Kafka 管理器
func NewManager ¶
NewManager 创建 Kafka 管理器
func (*Manager) CreateConsumer ¶
func (m *Manager) CreateConsumer(name string, cfg ConsumerConfig) (*ConsumerGroup, error)
CreateConsumer 创建消费者
func (*Manager) CreateTopic ¶
func (m *Manager) CreateTopic(ctx context.Context, name string, partitions int32, replication int16) error
CreateTopic 创建 Topic
func (*Manager) DeleteTopic ¶
DeleteTopic 删除 Topic
func (*Manager) DescribeConsumerGroup ¶
func (m *Manager) DescribeConsumerGroup(ctx context.Context, groupID string) (*ConsumerGroupInfo, error)
DescribeConsumerGroup 获取消费者组详情
func (*Manager) DescribeTopic ¶
DescribeTopic 获取 Topic 详情
func (*Manager) GetAsyncProducer ¶
func (m *Manager) GetAsyncProducer() (*AsyncProducer, error)
GetAsyncProducer 获取异步生产者(按需创建)
func (*Manager) GetConsumer ¶
func (m *Manager) GetConsumer(name string) *ConsumerGroup
GetConsumer 获取消费者
func (*Manager) ListConsumerGroups ¶
ListConsumerGroups 列出所有消费者组
func (*Manager) ListTopics ¶
ListTopics 列出所有 Topic
type Message ¶
// Message is the message structure passed to a producer.
type Message struct {
// Topic is the target topic.
Topic string
// Key is the message key (used for partitioning).
Key []byte
// Value is the message value.
Value []byte
// Headers holds the message headers.
Headers map[string]string
// Partition is the explicitly chosen partition (-1 means automatic assignment).
Partition int32
// Timestamp is the message timestamp.
Timestamp time.Time
}
Message 消息结构
type MessageHandler ¶
// MessageHandler is the message handling function invoked with a consumed message.
type MessageHandler func(ctx context.Context, msg *ConsumedMessage) error
MessageHandler 消息处理函数
type PartitionInfo ¶
PartitionInfo 分区信息
type Producer ¶
// Producer is the Kafka producer interface.
type Producer interface {
// Send sends a message synchronously.
Send(ctx context.Context, msg *Message) (*ProducerResult, error)
// SendAsync sends a message asynchronously.
SendAsync(msg *Message, callback func(*ProducerResult, error))
// SendJSON sends a JSON message.
SendJSON(ctx context.Context, topic string, key string, value interface{}) (*ProducerResult, error)
// Close closes the producer.
Close() error
}
Producer Kafka 生产者接口
type ProducerConfig ¶
// ProducerConfig is the producer configuration.
type ProducerConfig struct {
// Enabled indicates whether the producer is enabled.
Enabled bool `mapstructure:"enabled"`
// RequiredAcks is the acknowledgement level: 0=NoResponse, 1=WaitForLocal, -1=WaitForAll.
RequiredAcks int `mapstructure:"required_acks"`
// Timeout is the produce timeout.
Timeout time.Duration `mapstructure:"timeout"`
// RetryMax is the maximum number of retries.
RetryMax int `mapstructure:"retry_max"`
// RetryBackoff is the interval between retries.
RetryBackoff time.Duration `mapstructure:"retry_backoff"`
// MaxMessageBytes is the maximum size of a single message in bytes.
MaxMessageBytes int `mapstructure:"max_message_bytes"`
// Compression is the compression algorithm: none, gzip, snappy, lz4, zstd.
Compression string `mapstructure:"compression"`
// Idempotent indicates whether the idempotent producer is enabled.
Idempotent bool `mapstructure:"idempotent"`
// BatchSize is the batch send size.
BatchSize int `mapstructure:"batch_size"`
// FlushFrequency is the flush frequency.
FlushFrequency time.Duration `mapstructure:"flush_frequency"`
}
ProducerConfig 生产者配置
func (*ProducerConfig) ApplyDefaults ¶
func (c *ProducerConfig) ApplyDefaults()
ApplyDefaults 应用生产者默认值
type ProducerResult ¶
// ProducerResult is the result of sending a message.
type ProducerResult struct {
// Topic is the topic the message was sent to.
Topic string
// Partition is the partition the message was sent to.
Partition int32
// Offset is the offset of the message.
Offset int64
// Timestamp is the server-side timestamp.
Timestamp time.Time
}
ProducerResult 发送结果
type SASLConfig ¶
// SASLConfig is the SASL authentication configuration.
type SASLConfig struct {
// Enabled indicates whether SASL is enabled.
Enabled bool `mapstructure:"enabled"`
// Mechanism is the authentication mechanism: PLAIN, SCRAM-SHA-256, SCRAM-SHA-512.
Mechanism string `mapstructure:"mechanism"`
// Username is the user name.
Username string `mapstructure:"username"`
// Password is the password.
Password string `mapstructure:"password"`
}
SASLConfig SASL 认证配置
type SimpleConsumer ¶
// SimpleConsumer is a simple consumer (single topic, single partition).
type SimpleConsumer struct {
// contains filtered or unexported fields
}
SimpleConsumer 简单消费者(单 Topic,单分区)
func NewSimpleConsumer ¶
func NewSimpleConsumer(brokers []string, saramaCfg *sarama.Config, logger *zap.Logger) (*SimpleConsumer, error)
NewSimpleConsumer 创建简单消费者
func (*SimpleConsumer) ConsumePartition ¶
func (c *SimpleConsumer) ConsumePartition(ctx context.Context, topic string, partition int32, offset int64, handler MessageHandler) error
ConsumePartition 消费指定分区
type SyncProducer ¶
// SyncProducer is the synchronous producer implementation.
type SyncProducer struct {
// contains filtered or unexported fields
}
SyncProducer 同步生产者实现
func NewSyncProducer ¶
func NewSyncProducer(brokers []string, cfg ProducerConfig, saramaCfg *sarama.Config, logger *zap.Logger) (*SyncProducer, error)
NewSyncProducer 创建同步生产者
func (*SyncProducer) Send ¶
func (p *SyncProducer) Send(ctx context.Context, msg *Message) (*ProducerResult, error)
Send 同步发送消息
func (*SyncProducer) SendAsync ¶
func (p *SyncProducer) SendAsync(msg *Message, callback func(*ProducerResult, error))
SendAsync 异步发送(同步生产者模拟异步)
func (*SyncProducer) SendJSON ¶
func (p *SyncProducer) SendJSON(ctx context.Context, topic string, key string, value interface{}) (*ProducerResult, error)
SendJSON 发送 JSON 消息
type TLSConfig ¶
// TLSConfig is the TLS configuration.
type TLSConfig struct {
// Enabled indicates whether TLS is enabled.
Enabled bool `mapstructure:"enabled"`
// CertFile is the path to the certificate file.
CertFile string `mapstructure:"cert_file"`
// KeyFile is the path to the key file.
KeyFile string `mapstructure:"key_file"`
// CAFile is the path to the CA certificate file.
CAFile string `mapstructure:"ca_file"`
// InsecureSkipVerify indicates whether certificate verification is skipped.
InsecureSkipVerify bool `mapstructure:"insecure_skip_verify"`
}
TLSConfig TLS 配置
type TopicInfo ¶
// TopicInfo holds information about a topic.
type TopicInfo struct {
NumPartitions int32
ReplicationFactor int16
Partitions []PartitionInfo
}
TopicInfo Topic 信息
type XDGSCRAMClient ¶
// XDGSCRAMClient is the SCRAM client implementation.
// It embeds *scram.Client and *scram.ClientConversation and carries the
// hash generator function to use.
type XDGSCRAMClient struct {
*scram.Client
*scram.ClientConversation
HashGeneratorFcn scram.HashGeneratorFcn
}
XDGSCRAMClient SCRAM 客户端实现
func (*XDGSCRAMClient) Begin ¶
func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error)
Begin 开始 SCRAM 认证