kafka

package
v0.0.0-...-66d340f Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 15, 2026 License: MIT Imports: 16 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// SHA256 is the hash generator for the SCRAM-SHA-256 mechanism: every call
// returns a newly constructed SHA-256 hash.Hash.
var SHA256 = scram.HashGeneratorFcn(func() hash.Hash {
	return sha256.New()
})

SHA256 SCRAM-SHA-256 哈希函数

View Source
// SHA512 is the hash generator for the SCRAM-SHA-512 mechanism: every call
// returns a newly constructed SHA-512 hash.Hash.
var SHA512 = scram.HashGeneratorFcn(func() hash.Hash {
	return sha512.New()
})

SHA512 SCRAM-SHA-512 哈希函数

Functions

This section is empty.

Types

type AsyncProducer

type AsyncProducer struct {
	// contains filtered or unexported fields
}

AsyncProducer 异步生产者实现

func NewAsyncProducer

func NewAsyncProducer(brokers []string, cfg ProducerConfig, saramaCfg *sarama.Config, logger *zap.Logger) (*AsyncProducer, error)

NewAsyncProducer 创建异步生产者

func (*AsyncProducer) Close

func (p *AsyncProducer) Close() error

Close 关闭生产者

func (*AsyncProducer) Errors

func (p *AsyncProducer) Errors() <-chan error

Errors 返回错误通道

func (*AsyncProducer) Send

func (p *AsyncProducer) Send(ctx context.Context, msg *Message) (*ProducerResult, error)

Send 同步发送(等待结果)

func (*AsyncProducer) SendAsync

func (p *AsyncProducer) SendAsync(msg *Message, callback func(*ProducerResult, error))

SendAsync 异步发送消息

func (*AsyncProducer) SendJSON

func (p *AsyncProducer) SendJSON(ctx context.Context, topic string, key string, value interface{}) (*ProducerResult, error)

SendJSON 发送 JSON 消息

func (*AsyncProducer) Successes

func (p *AsyncProducer) Successes() <-chan *ProducerResult

Successes 返回成功通道

type Component

type Component struct {
	// contains filtered or unexported fields
}

Component Kafka 组件

实现 component.Component 接口,提供 Kafka 消息队列能力。依赖:config, logger。

func NewComponent

func NewComponent() *Component

NewComponent 创建 Kafka 组件

func (*Component) DependsOn

func (c *Component) DependsOn() []string

DependsOn Kafka 组件依赖配置和日志组件

func (*Component) GetHealthChecker

func (c *Component) GetHealthChecker() component.HealthChecker

GetHealthChecker 获取健康检查器。实现 component.HealthCheckProvider 接口。

func (*Component) GetManager

func (c *Component) GetManager() *Manager

GetManager 获取 Kafka 管理器

func (*Component) Init

func (c *Component) Init(ctx context.Context, loader component.ConfigLoader) error

Init 初始化 Kafka 管理器

func (*Component) Name

func (c *Component) Name() string

Name 组件名称

func (*Component) Start

func (c *Component) Start(ctx context.Context) error

Start 启动 Kafka 组件(连接 Kafka)

func (*Component) Stop

func (c *Component) Stop(ctx context.Context) error

Stop 停止 Kafka 组件(关闭连接)

type Config

// Config is the top-level Kafka configuration. Every field is tagged with the
// mapstructure key it corresponds to.
type Config struct {
	// Brokers is the list of Kafka cluster addresses.
	Brokers []string `mapstructure:"brokers"`

	// Version is the Kafka version (for example "3.8.0").
	Version string `mapstructure:"version"`

	// ClientID is the client identifier.
	ClientID string `mapstructure:"client_id"`

	// Producer holds the producer configuration.
	Producer ProducerConfig `mapstructure:"producer"`

	// Consumer holds the consumer configuration.
	Consumer ConsumerConfig `mapstructure:"consumer"`

	// SASL holds the authentication configuration (optional).
	SASL *SASLConfig `mapstructure:"sasl"`

	// TLS holds the TLS configuration (optional).
	TLS *TLSConfig `mapstructure:"tls"`
}

Config Kafka 配置

func (*Config) ApplyDefaults

func (c *Config) ApplyDefaults()

ApplyDefaults 应用默认值

func (*Config) Validate

func (c *Config) Validate() error

Validate 验证配置

type ConsumedMessage

// ConsumedMessage is a message received by a consumer; it is the value passed
// to a MessageHandler.
type ConsumedMessage struct {
	// Topic is the topic the message came from.
	Topic string

	// Partition is the partition the message belongs to.
	Partition int32

	// Offset is the message offset.
	Offset int64

	// Key is the message key.
	Key []byte

	// Value is the message value.
	Value []byte

	// Headers are the message headers.
	Headers map[string]string

	// Timestamp is the message timestamp.
	// NOTE(review): the unit (seconds vs. milliseconds since epoch) is not
	// visible here — confirm against the consumer implementation.
	Timestamp int64
}

ConsumedMessage 消费的消息

type Consumer

// Consumer is the Kafka consumer interface.
type Consumer interface {
	// Start starts the consumer with the given message handler.
	// NOTE(review): presumably handler is invoked once per consumed message —
	// confirm against the implementations.
	Start(ctx context.Context, handler MessageHandler) error

	// Stop stops the consumer.
	Stop() error

	// IsRunning reports whether the consumer is running.
	IsRunning() bool
}

Consumer Kafka 消费者接口

type ConsumerConfig

// ConsumerConfig is the consumer configuration. Every field is tagged with the
// mapstructure key it corresponds to.
type ConsumerConfig struct {
	// Enabled controls whether the consumer is enabled.
	Enabled bool `mapstructure:"enabled"`

	// GroupID is the consumer group ID.
	GroupID string `mapstructure:"group_id"`

	// Topics is the list of subscribed topics.
	Topics []string `mapstructure:"topics"`

	// OffsetInitial is the initial offset: -1=Newest, -2=Oldest.
	OffsetInitial int64 `mapstructure:"offset_initial"`

	// AutoCommit controls whether offsets are committed automatically.
	AutoCommit bool `mapstructure:"auto_commit"`

	// AutoCommitInterval is the interval between automatic commits.
	AutoCommitInterval time.Duration `mapstructure:"auto_commit_interval"`

	// SessionTimeout is the session timeout.
	SessionTimeout time.Duration `mapstructure:"session_timeout"`

	// HeartbeatInterval is the heartbeat interval.
	HeartbeatInterval time.Duration `mapstructure:"heartbeat_interval"`

	// MaxProcessingTime is the maximum processing time for a single message.
	MaxProcessingTime time.Duration `mapstructure:"max_processing_time"`

	// FetchMin is the minimum number of bytes to fetch.
	FetchMin int32 `mapstructure:"fetch_min"`

	// FetchMax is the maximum number of bytes to fetch.
	FetchMax int32 `mapstructure:"fetch_max"`

	// FetchDefault is the default number of bytes to fetch.
	FetchDefault int32 `mapstructure:"fetch_default"`

	// RebalanceStrategy is the rebalance strategy: range, roundrobin, sticky.
	RebalanceStrategy string `mapstructure:"rebalance_strategy"`
}

ConsumerConfig 消费者配置

func (*ConsumerConfig) ApplyDefaults

func (c *ConsumerConfig) ApplyDefaults()

ApplyDefaults 应用消费者默认值

func (*ConsumerConfig) Validate

func (c *ConsumerConfig) Validate() error

Validate 验证消费者配置

type ConsumerGroup

type ConsumerGroup struct {
	// contains filtered or unexported fields
}

ConsumerGroup 消费者组实现

func NewConsumerGroup

func NewConsumerGroup(brokers []string, cfg ConsumerConfig, saramaCfg *sarama.Config, logger *zap.Logger) (*ConsumerGroup, error)

NewConsumerGroup 创建消费者组

func (*ConsumerGroup) IsRunning

func (c *ConsumerGroup) IsRunning() bool

IsRunning 检查是否运行中

func (*ConsumerGroup) Start

func (c *ConsumerGroup) Start(ctx context.Context, handler MessageHandler) error

Start 启动消费者

func (*ConsumerGroup) Stop

func (c *ConsumerGroup) Stop() error

Stop 停止消费者

type ConsumerGroupInfo

// ConsumerGroupInfo holds information about a consumer group: its State and
// ProtocolType strings and the list of Members.
// NOTE(review): the set of possible State / ProtocolType values is not visible
// here — confirm against the code that fills this struct.
type ConsumerGroupInfo struct {
	State        string
	ProtocolType string
	Members      []ConsumerGroupMember
}

ConsumerGroupInfo 消费者组信息

type ConsumerGroupMember

// ConsumerGroupMember describes one member of a consumer group by its member
// ID, client ID and client host.
type ConsumerGroupMember struct {
	MemberID   string
	ClientID   string
	ClientHost string
}

ConsumerGroupMember 消费者组成员

type HealthChecker

type HealthChecker struct {
	// contains filtered or unexported fields
}

HealthChecker Kafka 健康检查器

func NewHealthChecker

func NewHealthChecker(manager *Manager) *HealthChecker

NewHealthChecker 创建健康检查器

func (*HealthChecker) Check

func (h *HealthChecker) Check(ctx context.Context) error

Check 执行健康检查

func (*HealthChecker) Name

func (h *HealthChecker) Name() string

Name 返回检查项名称

func (*HealthChecker) SetTimeout

func (h *HealthChecker) SetTimeout(timeout time.Duration)

SetTimeout 设置超时时间

type KafkaMetrics

type KafkaMetrics struct {
	// contains filtered or unexported fields
}

KafkaMetrics implements component.MetricsProvider for Kafka instrumentation.

func NewKafkaMetrics

func NewKafkaMetrics(cfg KafkaMetricsConfig) *KafkaMetrics

NewKafkaMetrics creates a new Kafka metrics provider

func (*KafkaMetrics) IsMetricsEnabled

func (m *KafkaMetrics) IsMetricsEnabled() bool

IsMetricsEnabled returns whether metrics collection is enabled

func (*KafkaMetrics) IsRegistered

func (m *KafkaMetrics) IsRegistered() bool

IsRegistered returns whether metrics have been registered

func (*KafkaMetrics) MetricsName

func (m *KafkaMetrics) MetricsName() string

MetricsName returns the metrics group name

func (*KafkaMetrics) RecordConsume

func (m *KafkaMetrics) RecordConsume(ctx context.Context, topic, group string, partition int32, duration time.Duration, err error)

RecordConsume records a message consumption

func (*KafkaMetrics) RecordProduce

func (m *KafkaMetrics) RecordProduce(ctx context.Context, topic string, partition int32, duration time.Duration, err error)

RecordProduce records a message production

func (*KafkaMetrics) RegisterLagCallback

func (m *KafkaMetrics) RegisterLagCallback(group string, callback func() int64)

RegisterLagCallback registers a lag callback for a consumer group

func (*KafkaMetrics) RegisterMetrics

func (m *KafkaMetrics) RegisterMetrics(meter metric.Meter) error

RegisterMetrics registers all Kafka metrics with the provided Meter

func (*KafkaMetrics) UnregisterLagCallback

func (m *KafkaMetrics) UnregisterLagCallback(group string)

UnregisterLagCallback removes a lag callback

type KafkaMetricsConfig

// KafkaMetricsConfig holds configuration for Kafka metrics.
// NOTE(review): presumably Enabled switches metrics collection on or off, and
// RecordLag / RecordBatchSize toggle the corresponding optional metrics —
// confirm against NewKafkaMetrics.
type KafkaMetricsConfig struct {
	Enabled         bool
	RecordLag       bool
	RecordBatchSize bool
}

KafkaMetricsConfig holds configuration for Kafka metrics

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

Manager Kafka 管理器

func NewManager

func NewManager(cfg Config, logger *zap.Logger) (*Manager, error)

NewManager 创建 Kafka 管理器

func (*Manager) Close

func (m *Manager) Close() error

Close 关闭管理器

func (*Manager) Connect

func (m *Manager) Connect(ctx context.Context) error

Connect 连接 Kafka

func (*Manager) CreateConsumer

func (m *Manager) CreateConsumer(name string, cfg ConsumerConfig) (*ConsumerGroup, error)

CreateConsumer 创建消费者

func (*Manager) CreateTopic

func (m *Manager) CreateTopic(ctx context.Context, name string, partitions int32, replication int16) error

CreateTopic 创建 Topic

func (*Manager) DeleteTopic

func (m *Manager) DeleteTopic(ctx context.Context, name string) error

DeleteTopic 删除 Topic

func (*Manager) DescribeConsumerGroup

func (m *Manager) DescribeConsumerGroup(ctx context.Context, groupID string) (*ConsumerGroupInfo, error)

DescribeConsumerGroup 获取消费者组详情

func (*Manager) DescribeTopic

func (m *Manager) DescribeTopic(ctx context.Context, name string) (*TopicInfo, error)

DescribeTopic 获取 Topic 详情

func (*Manager) GetAsyncProducer

func (m *Manager) GetAsyncProducer() (*AsyncProducer, error)

GetAsyncProducer 获取异步生产者(按需创建)

func (*Manager) GetConfig

func (m *Manager) GetConfig() Config

GetConfig 获取配置

func (*Manager) GetConsumer

func (m *Manager) GetConsumer(name string) *ConsumerGroup

GetConsumer 获取消费者

func (*Manager) GetOffset

func (m *Manager) GetOffset(ctx context.Context, groupID, topic string) (map[int32]int64, error)

GetOffset 获取消费者组的 Offset

func (*Manager) GetProducer

func (m *Manager) GetProducer() Producer

GetProducer 获取生产者

func (*Manager) ListConsumerGroups

func (m *Manager) ListConsumerGroups(ctx context.Context) ([]string, error)

ListConsumerGroups 列出所有消费者组

func (*Manager) ListTopics

func (m *Manager) ListTopics(ctx context.Context) ([]string, error)

ListTopics 列出所有 Topic

func (*Manager) Ping

func (m *Manager) Ping(ctx context.Context) error

Ping 检查 Kafka 连接

func (*Manager) ResetOffset

func (m *Manager) ResetOffset(ctx context.Context, groupID, topic string, offset int64) error

ResetOffset 重置消费者组的 Offset

type Message

// Message is the message structure handed to a producer for sending.
type Message struct {
	// Topic is the target topic.
	Topic string

	// Key is the message key (used for partitioning).
	Key []byte

	// Value is the message value.
	Value []byte

	// Headers are the message headers.
	Headers map[string]string

	// Partition is the explicitly requested partition (-1 means the partition
	// is assigned automatically).
	Partition int32

	// Timestamp is the message timestamp.
	Timestamp time.Time
}

Message 消息结构

type MessageHandler

// MessageHandler is the message handling function: it receives a context and
// a consumed message and returns an error.
type MessageHandler func(ctx context.Context, msg *ConsumedMessage) error

MessageHandler 消息处理函数

type PartitionInfo

// PartitionInfo holds information about a single partition.
// NOTE(review): presumably ID is the partition number, Leader and Replicas are
// broker IDs, and ISR is the in-sync replica set — confirm against the code
// that fills this struct.
type PartitionInfo struct {
	ID       int32
	Leader   int32
	Replicas []int32
	ISR      []int32
}

PartitionInfo 分区信息

type Producer

// Producer is the Kafka producer interface.
type Producer interface {
	// Send sends a message synchronously.
	Send(ctx context.Context, msg *Message) (*ProducerResult, error)

	// SendAsync sends a message asynchronously; the outcome is passed to
	// callback as a result and an error.
	SendAsync(msg *Message, callback func(*ProducerResult, error))

	// SendJSON sends a JSON message to topic with the given key.
	// NOTE(review): presumably value is JSON-encoded to form the message
	// body — confirm against the implementations.
	SendJSON(ctx context.Context, topic string, key string, value interface{}) (*ProducerResult, error)

	// Close closes the producer.
	Close() error
}

Producer Kafka 生产者接口

type ProducerConfig

// ProducerConfig is the producer configuration. Every field is tagged with the
// mapstructure key it corresponds to.
type ProducerConfig struct {
	// Enabled controls whether the producer is enabled.
	Enabled bool `mapstructure:"enabled"`

	// RequiredAcks is the acknowledgement level: 0=NoResponse, 1=WaitForLocal,
	// -1=WaitForAll.
	RequiredAcks int `mapstructure:"required_acks"`

	// Timeout is the produce timeout.
	Timeout time.Duration `mapstructure:"timeout"`

	// RetryMax is the maximum number of retries.
	RetryMax int `mapstructure:"retry_max"`

	// RetryBackoff is the interval between retries.
	RetryBackoff time.Duration `mapstructure:"retry_backoff"`

	// MaxMessageBytes is the maximum size of a single message in bytes.
	MaxMessageBytes int `mapstructure:"max_message_bytes"`

	// Compression is the compression algorithm: none, gzip, snappy, lz4, zstd.
	Compression string `mapstructure:"compression"`

	// Idempotent controls whether the idempotent producer is enabled.
	Idempotent bool `mapstructure:"idempotent"`

	// BatchSize is the batch send size.
	BatchSize int `mapstructure:"batch_size"`

	// FlushFrequency is the flush frequency.
	FlushFrequency time.Duration `mapstructure:"flush_frequency"`
}

ProducerConfig 生产者配置

func (*ProducerConfig) ApplyDefaults

func (c *ProducerConfig) ApplyDefaults()

ApplyDefaults 应用生产者默认值

func (*ProducerConfig) Validate

func (c *ProducerConfig) Validate() error

Validate 验证生产者配置

type ProducerResult

// ProducerResult is the result of sending a message.
type ProducerResult struct {
	// Topic is the topic the message was sent to.
	Topic string

	// Partition is the partition the message was sent to.
	Partition int32

	// Offset is the message offset.
	Offset int64

	// Timestamp is the server-side timestamp.
	Timestamp time.Time
}

ProducerResult 发送结果

type SASLConfig

// SASLConfig is the SASL authentication configuration.
type SASLConfig struct {
	// Enabled controls whether SASL is enabled.
	Enabled bool `mapstructure:"enabled"`

	// Mechanism is the authentication mechanism: PLAIN, SCRAM-SHA-256,
	// SCRAM-SHA-512.
	Mechanism string `mapstructure:"mechanism"`

	// Username is the user name.
	Username string `mapstructure:"username"`

	// Password is the password.
	Password string `mapstructure:"password"`
}

SASLConfig SASL 认证配置

func (*SASLConfig) Validate

func (c *SASLConfig) Validate() error

Validate 验证 SASL 配置

type SimpleConsumer

type SimpleConsumer struct {
	// contains filtered or unexported fields
}

SimpleConsumer 简单消费者(单 Topic,单分区)

func NewSimpleConsumer

func NewSimpleConsumer(brokers []string, saramaCfg *sarama.Config, logger *zap.Logger) (*SimpleConsumer, error)

NewSimpleConsumer 创建简单消费者

func (*SimpleConsumer) ConsumePartition

func (c *SimpleConsumer) ConsumePartition(ctx context.Context, topic string, partition int32, offset int64, handler MessageHandler) error

ConsumePartition 消费指定分区

func (*SimpleConsumer) IsRunning

func (c *SimpleConsumer) IsRunning() bool

IsRunning 检查是否运行中

func (*SimpleConsumer) Stop

func (c *SimpleConsumer) Stop() error

Stop 停止消费者

type SyncProducer

type SyncProducer struct {
	// contains filtered or unexported fields
}

SyncProducer 同步生产者实现

func NewSyncProducer

func NewSyncProducer(brokers []string, cfg ProducerConfig, saramaCfg *sarama.Config, logger *zap.Logger) (*SyncProducer, error)

NewSyncProducer 创建同步生产者

func (*SyncProducer) Close

func (p *SyncProducer) Close() error

Close 关闭生产者

func (*SyncProducer) Send

func (p *SyncProducer) Send(ctx context.Context, msg *Message) (*ProducerResult, error)

Send 同步发送消息

func (*SyncProducer) SendAsync

func (p *SyncProducer) SendAsync(msg *Message, callback func(*ProducerResult, error))

SendAsync 异步发送(同步生产者模拟异步)

func (*SyncProducer) SendJSON

func (p *SyncProducer) SendJSON(ctx context.Context, topic string, key string, value interface{}) (*ProducerResult, error)

SendJSON 发送 JSON 消息

type TLSConfig

// TLSConfig is the TLS configuration.
type TLSConfig struct {
	// Enabled controls whether TLS is enabled.
	Enabled bool `mapstructure:"enabled"`

	// CertFile is the path to the certificate file.
	CertFile string `mapstructure:"cert_file"`

	// KeyFile is the path to the key file.
	KeyFile string `mapstructure:"key_file"`

	// CAFile is the path to the CA certificate file.
	CAFile string `mapstructure:"ca_file"`

	// InsecureSkipVerify controls whether certificate verification is skipped.
	// NOTE(review): presumably forwarded to crypto/tls Config.InsecureSkipVerify,
	// in which case it should stay false outside of testing — confirm.
	InsecureSkipVerify bool `mapstructure:"insecure_skip_verify"`
}

TLSConfig TLS 配置

type TopicInfo

// TopicInfo holds information about a topic: its partition count, replication
// factor and per-partition details.
type TopicInfo struct {
	NumPartitions     int32
	ReplicationFactor int16
	Partitions        []PartitionInfo
}

TopicInfo Topic 信息

type XDGSCRAMClient

// XDGSCRAMClient is the SCRAM client implementation. It embeds *scram.Client
// and *scram.ClientConversation and carries the hash generator to use in
// HashGeneratorFcn.
// NOTE(review): presumably Begin populates the embedded client and
// conversation, so both are nil until Begin has been called — confirm.
type XDGSCRAMClient struct {
	*scram.Client
	*scram.ClientConversation
	HashGeneratorFcn scram.HashGeneratorFcn
}

XDGSCRAMClient SCRAM 客户端实现

func (*XDGSCRAMClient) Begin

func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error)

Begin 开始 SCRAM 认证

func (*XDGSCRAMClient) Done

func (x *XDGSCRAMClient) Done() bool

Done 检查认证是否完成

func (*XDGSCRAMClient) Step

func (x *XDGSCRAMClient) Step(challenge string) (response string, err error)

Step 执行认证步骤

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL