Documentation
¶
Index ¶
- Constants
- func MustProtoMessage[T proto.Message]() T
- func NewMessageFromProto(event proto.Message, metadata metadatapkg.Metadata) (*message.Message, error)
- func NewProtoMessage[T proto.Message]() (T, error)
- func PublishProto(ctx context.Context, publisher message.Publisher, topic string, ...) error
- func RegisterJSONHandler[T any, O any](svc *Service, cfg handlerpkg.JSONHandlerRegistration[T, O]) error
- func RegisterMessageHandler(svc *Service, cfg MessageHandlerRegistration) error
- func RegisterProtoHandler[T proto.Message](svc *Service, cfg handlerpkg.ProtoHandlerRegistration[T]) error
- type BacklogMetrics
- type DependencyHealth
- type ErrorBreakdown
- func (e *ErrorBreakdown) Record(category ErrorCategory, err error)
- type ErrorCategory
- type ErrorClassifier
- type HandlerInfo
- type HandlerStats
- func (h *HandlerStats) MarshalJSON() ([]byte, error)
- type LatencyMetrics
- type MessageHandlerRegistration
- type MiddlewareBuilder
- type MiddlewareRegistration
- func CorrelationIDMiddleware() MiddlewareRegistration
- func DefaultMiddlewares() []MiddlewareRegistration
- func LogMessagesMiddleware(logger loggingpkg.ServiceLogger) MiddlewareRegistration
- func MetricsMiddleware() MiddlewareRegistration
- func OutboxMiddleware() MiddlewareRegistration
- func PoisonQueueMiddleware(filter func(error) bool) MiddlewareRegistration
- func ProtoValidateMiddleware() MiddlewareRegistration
- func RecovererMiddleware() MiddlewareRegistration
- func RetryMiddleware(cfg RetryMiddlewareConfig) MiddlewareRegistration
- func TracerMiddleware() MiddlewareRegistration
- type OutboxStore
- type Producer
- type ProtoValidator
- type ResourceUsage
- type RetryMiddlewareConfig
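- type Service
- func NewService(conf *configpkg.Config, log loggingpkg.ServiceLogger, ctx context.Context, ...) *Service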
- func (s *Service) PublishProto(ctx context.Context, topic string, event proto.Message, ...) error
- func (s *Service) RegisterHTTPHandler(port int, pattern string, handler http.Handler)
- func (s *Service) RegisterMiddleware(cfg MiddlewareRegistration) error
- func (s *Service) RegisterProtoMessage(msg proto.Message)
- func (s *Service) Start(ctx context.Context) error
- func (s *Service) StartWebUIServer()
- type ServiceDependencies
- type ThroughputMetrics
- type UnprocessableEventError
- func (e *UnprocessableEventError) Error() string
Constants ¶
const (
	DependencyStatusUnknown  = "unknown"
	DependencyStatusHealthy  = "healthy"
	DependencyStatusDegraded = "degraded"
)
Variables ¶
This section is empty.
Functions ¶
func MustProtoMessage ¶
func MustProtoMessage[T proto.Message]() T
MustProtoMessage instantiates the protobuf message and panics if the type cannot be created.
func NewMessageFromProto ¶
func NewMessageFromProto(event proto.Message, metadata metadatapkg.Metadata) (*message.Message, error)
NewMessageFromProto converts the provided proto message into a Watermill message with the standard metadata required by the event pipeline.
func NewProtoMessage ¶
func NewProtoMessage[T proto.Message]() (T, error)
NewProtoMessage instantiates a zero-value protobuf message for the provided generic type.
func PublishProto ¶
func PublishProto(ctx context.Context, publisher message.Publisher, topic string, event proto.Message, metadata metadatapkg.Metadata) error
PublishProto marshals the proto payload and publishes it to the provided topic.
func RegisterJSONHandler ¶
func RegisterJSONHandler[T any, O any](svc *Service, cfg handlerpkg.JSONHandlerRegistration[T, O]) error
RegisterJSONHandler converts the typed JSON handler into a Watermill handler and registers it.
func RegisterMessageHandler ¶
func RegisterMessageHandler(svc *Service, cfg MessageHandlerRegistration) error
RegisterMessageHandler attaches the provided handler to the service router.
func RegisterProtoHandler ¶
func RegisterProtoHandler[T proto.Message](svc *Service, cfg handlerpkg.ProtoHandlerRegistration[T]) error
RegisterProtoHandler converts the typed handler into a Watermill handler and registers it on the Service router.
Types ¶
type BacklogMetrics ¶
type DependencyHealth ¶
type ErrorBreakdown ¶
type ErrorBreakdown struct {
Validation uint64 `json:"validation"`
Transport uint64 `json:"transport"`
Downstream uint64 `json:"downstream"`
Other uint64 `json:"other"`
LastError string `json:"last_error,omitempty"`
}
func (*ErrorBreakdown) Record ¶
func (e *ErrorBreakdown) Record(category ErrorCategory, err error)
type ErrorCategory ¶
type ErrorCategory string
const (
	ErrorCategoryNone       ErrorCategory = "none"
	ErrorCategoryValidation ErrorCategory = "validation"
	ErrorCategoryTransport  ErrorCategory = "transport"
	ErrorCategoryDownstream ErrorCategory = "downstream"
	ErrorCategoryOther      ErrorCategory = "other"
)
type ErrorClassifier ¶
type ErrorClassifier func(error) ErrorCategory
type HandlerInfo ¶
type HandlerInfo struct {
Name string `json:"name"`
ConsumeQueue string `json:"consume_queue"`
PublishQueue string `json:"publish_queue"`
Stats *HandlerStats `json:"stats"`
}
type HandlerStats ¶
type HandlerStats struct {
MessagesProcessed uint64 `json:"messages_processed"`
MessagesFailed uint64 `json:"messages_failed"`
TotalProcessingTime int64 `json:"total_processing_time_ns"`
LastProcessedAt time.Time `json:"last_processed_at"`
Latency LatencyMetrics `json:"latency"`
Throughput ThroughputMetrics `json:"throughput"`
Errors ErrorBreakdown `json:"errors"`
Resource ResourceUsage `json:"resource"`
Backlog BacklogMetrics `json:"backlog"`
Dependencies []DependencyHealth `json:"dependencies"`
// contains filtered or unexported fields
}
func (*HandlerStats) MarshalJSON ¶
func (h *HandlerStats) MarshalJSON() ([]byte, error)
type LatencyMetrics ¶
type MessageHandlerRegistration ¶
type MessageHandlerRegistration struct {
Name string
ConsumeQueue string
PublishQueue string
Handler message.HandlerFunc
Subscriber message.Subscriber
Publisher message.Publisher
}
MessageHandlerRegistration wires a raw Watermill handler without typed helpers.
type MiddlewareBuilder ¶
type MiddlewareBuilder func(*Service) (message.HandlerMiddleware, error)
MiddlewareBuilder constructs a handler middleware using the provided service instance.
type MiddlewareRegistration ¶
type MiddlewareRegistration struct {
Name string
Middleware message.HandlerMiddleware
Builder MiddlewareBuilder
}
MiddlewareRegistration captures how a middleware should be registered on a Service router.
func CorrelationIDMiddleware ¶
func CorrelationIDMiddleware() MiddlewareRegistration
CorrelationIDMiddleware ensures each processed message carries a correlation identifier.
func DefaultMiddlewares ¶
func DefaultMiddlewares() []MiddlewareRegistration
DefaultMiddlewares returns the standard middleware chain used by the Service constructor.
func LogMessagesMiddleware ¶
func LogMessagesMiddleware(logger loggingpkg.ServiceLogger) MiddlewareRegistration
LogMessagesMiddleware logs the full payload and metadata of handled messages.
func MetricsMiddleware ¶
func MetricsMiddleware() MiddlewareRegistration
MetricsMiddleware adds Prometheus metrics to the handler.
func OutboxMiddleware ¶
func OutboxMiddleware() MiddlewareRegistration
OutboxMiddleware persists outgoing messages when an OutboxStore is configured.
func PoisonQueueMiddleware ¶
func PoisonQueueMiddleware(filter func(error) bool) MiddlewareRegistration
PoisonQueueMiddleware publishes messages that match the supplied filter to the configured poison queue.
func ProtoValidateMiddleware ¶
func ProtoValidateMiddleware() MiddlewareRegistration
ProtoValidateMiddleware unmarshals and validates protobuf payloads when possible.
func RecovererMiddleware ¶
func RecovererMiddleware() MiddlewareRegistration
RecovererMiddleware converts panics into handler errors so they can be retried or sent to the poison queue.
func RetryMiddleware ¶
func RetryMiddleware(cfg RetryMiddlewareConfig) MiddlewareRegistration
RetryMiddleware retries handler execution using the provided configuration (defaults applied to zero values).
func TracerMiddleware ¶
func TracerMiddleware() MiddlewareRegistration
TracerMiddleware wraps handler execution in an OpenTelemetry span.
type OutboxStore ¶
type OutboxStore interface {
StoreOutgoingMessage(ctx context.Context, eventType, uuid, payload string) error
}
OutboxStore persists processed messages so they can be forwarded reliably.
type Producer ¶
type Producer interface {
PublishProto(ctx context.Context, topic string, event proto.Message, metadata metadatapkg.Metadata) error
}
Producer emits proto-based events onto the configured transport.
type ProtoValidator ¶
ProtoValidator validates unmarshalled payloads. Implementations typically forward to protovalidate or a custom struct validator.
type ResourceUsage ¶
type RetryMiddlewareConfig ¶
type RetryMiddlewareConfig struct {
MaxRetries int
InitialInterval time.Duration
MaxInterval time.Duration
RetryIf func(error) bool
}
RetryMiddlewareConfig customises the retry middleware behaviour.
type Service ¶
type Service struct {
Conf *configpkg.Config
Logger loggingpkg.ServiceLogger
// contains filtered or unexported fields
}
Service wires a Watermill router, publisher, subscriber, and middleware chain.
func NewService ¶
func NewService(conf *configpkg.Config, log loggingpkg.ServiceLogger, ctx context.Context, deps ServiceDependencies) *Service
NewService constructs a Service for the supplied configuration. Register handlers on the returned Service before calling Start.
func (*Service) PublishProto ¶
func (s *Service) PublishProto(ctx context.Context, topic string, event proto.Message, metadata metadatapkg.Metadata) error
PublishProto emits the event using the Service publisher so HTTP handlers can create events without touching the internal Watermill APIs directly.
func (*Service) RegisterHTTPHandler ¶
func (s *Service) RegisterHTTPHandler(port int, pattern string, handler http.Handler)
func (*Service) RegisterMiddleware ¶
func (s *Service) RegisterMiddleware(cfg MiddlewareRegistration) error
RegisterMiddleware attaches the supplied middleware to the router.
func (*Service) RegisterProtoMessage ¶
func (s *Service) RegisterProtoMessage(msg proto.Message)
RegisterProtoMessage exposes a proto message type for validation without registering a handler.
func (*Service) Start ¶
func (s *Service) Start(ctx context.Context) error
Start runs the underlying Watermill router until the provided context is cancelled.
func (*Service) StartWebUIServer ¶
func (s *Service) StartWebUIServer()
type ServiceDependencies ¶
type ServiceDependencies struct {
Outbox OutboxStore
Validator ProtoValidator
Middlewares []MiddlewareRegistration // Appended after the default middleware chain.
DisableDefaultMiddlewares bool // Skips registering the default middleware chain when true.
TransportFactory transportpkg.Factory
ErrorClassifier ErrorClassifier
}
ServiceDependencies holds the optional collaborators that the Service can use. Leave fields nil to skip the related middleware.
type ThroughputMetrics ¶
type UnprocessableEventError ¶
type UnprocessableEventError struct {
// contains filtered or unexported fields
}
UnprocessableEventError wraps payloads that failed validation or unmarshalling.
func (*UnprocessableEventError) Error ¶
func (e *UnprocessableEventError) Error() string