runtime

package
v0.1.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 26, 2025 License: MIT Imports: 30 Imported by: 0

Documentation

Index

Constants

View Source
const (
	DependencyStatusUnknown  = "unknown"
	DependencyStatusHealthy  = "healthy"
	DependencyStatusDegraded = "degraded"
)

Variables

This section is empty.

Functions

func MustProtoMessage

func MustProtoMessage[T proto.Message]() T

MustProtoMessage instantiates the protobuf message and panics if the type cannot be created.

func NewMessageFromProto

func NewMessageFromProto(event proto.Message, metadata metadatapkg.Metadata) (*message.Message, error)

NewMessageFromProto converts the provided proto message into a Watermill message with the standard metadata required by the event pipeline.

func NewProtoMessage

func NewProtoMessage[T proto.Message]() (T, error)

NewProtoMessage instantiates a zero-value protobuf message for the provided generic type.

func PublishProto

func PublishProto(ctx context.Context, publisher message.Publisher, topic string, event proto.Message, metadata metadatapkg.Metadata) error

PublishProto marshals the proto payload and publishes it to the provided topic.

func RegisterJSONHandler

func RegisterJSONHandler[T any, O any](svc *Service, cfg handlerpkg.JSONHandlerRegistration[T, O]) error

RegisterJSONHandler converts the typed JSON handler into a Watermill handler and registers it.

func RegisterMessageHandler

func RegisterMessageHandler(svc *Service, cfg MessageHandlerRegistration) error

RegisterMessageHandler attaches the provided handler to the service router.

func RegisterProtoHandler

func RegisterProtoHandler[T proto.Message](svc *Service, cfg handlerpkg.ProtoHandlerRegistration[T]) error

RegisterProtoHandler converts the typed handler into a Watermill handler and registers it on the Service router.

Types

type BacklogMetrics

type BacklogMetrics struct {
	InFlight           uint64 `json:"in_flight"`
	MaxInFlight        uint64 `json:"max_in_flight"`
	LastQueueDepth     int64  `json:"last_queue_depth"`
	EstimatedLagMillis int64  `json:"estimated_lag_millis"`
}

type DependencyHealth

type DependencyHealth struct {
	Name        string    `json:"name"`
	Status      string    `json:"status"`
	LastChecked time.Time `json:"last_checked"`
	Details     string    `json:"details,omitempty"`
}

type ErrorBreakdown

type ErrorBreakdown struct {
	Validation uint64 `json:"validation"`
	Transport  uint64 `json:"transport"`
	Downstream uint64 `json:"downstream"`
	Other      uint64 `json:"other"`
	LastError  string `json:"last_error,omitempty"`
}

func (*ErrorBreakdown) Record

func (e *ErrorBreakdown) Record(category ErrorCategory, err error)

type ErrorCategory

type ErrorCategory string
const (
	ErrorCategoryNone       ErrorCategory = "none"
	ErrorCategoryValidation ErrorCategory = "validation"
	ErrorCategoryTransport  ErrorCategory = "transport"
	ErrorCategoryDownstream ErrorCategory = "downstream"
	ErrorCategoryOther      ErrorCategory = "other"
)

type ErrorClassifier

type ErrorClassifier func(error) ErrorCategory

type HandlerInfo

type HandlerInfo struct {
	Name         string        `json:"name"`
	ConsumeQueue string        `json:"consume_queue"`
	PublishQueue string        `json:"publish_queue"`
	Stats        *HandlerStats `json:"stats"`
}

type HandlerStats

type HandlerStats struct {
	MessagesProcessed   uint64    `json:"messages_processed"`
	MessagesFailed      uint64    `json:"messages_failed"`
	TotalProcessingTime int64     `json:"total_processing_time_ns"`
	LastProcessedAt     time.Time `json:"last_processed_at"`

	Latency      LatencyMetrics     `json:"latency"`
	Throughput   ThroughputMetrics  `json:"throughput"`
	Errors       ErrorBreakdown     `json:"errors"`
	Resource     ResourceUsage      `json:"resource"`
	Backlog      BacklogMetrics     `json:"backlog"`
	Dependencies []DependencyHealth `json:"dependencies"`
	// contains filtered or unexported fields
}

func (*HandlerStats) MarshalJSON

func (h *HandlerStats) MarshalJSON() ([]byte, error)

type LatencyMetrics

type LatencyMetrics struct {
	AverageNs  int64 `json:"average_ns"`
	P50Ns      int64 `json:"p50_ns"`
	P95Ns      int64 `json:"p95_ns"`
	P99Ns      int64 `json:"p99_ns"`
	LastNs     int64 `json:"last_ns"`
	SampleSize int   `json:"sample_size"`
}

type MessageHandlerRegistration

type MessageHandlerRegistration struct {
	Name         string
	ConsumeQueue string
	PublishQueue string
	Handler      message.HandlerFunc
	Subscriber   message.Subscriber
	Publisher    message.Publisher
}

MessageHandlerRegistration wires a raw Watermill handler without typed helpers.

type MiddlewareBuilder

type MiddlewareBuilder func(*Service) (message.HandlerMiddleware, error)

MiddlewareBuilder constructs a handler middleware using the provided service instance.

type MiddlewareRegistration

type MiddlewareRegistration struct {
	Name       string
	Middleware message.HandlerMiddleware
	Builder    MiddlewareBuilder
}

MiddlewareRegistration captures how a middleware should be registered on a Service router.

func CorrelationIDMiddleware

func CorrelationIDMiddleware() MiddlewareRegistration

CorrelationIDMiddleware ensures each processed message carries a correlation identifier.

func DefaultMiddlewares

func DefaultMiddlewares() []MiddlewareRegistration

DefaultMiddlewares returns the standard middleware chain used by the Service constructor.

func LogMessagesMiddleware

func LogMessagesMiddleware(logger loggingpkg.ServiceLogger) MiddlewareRegistration

LogMessagesMiddleware logs the full payload and metadata of handled messages.

func MetricsMiddleware

func MetricsMiddleware() MiddlewareRegistration

MetricsMiddleware adds Prometheus metrics to the handler.

func OutboxMiddleware

func OutboxMiddleware() MiddlewareRegistration

OutboxMiddleware persists outgoing messages when an OutboxStore is configured.

func PoisonQueueMiddleware

func PoisonQueueMiddleware(filter func(error) bool) MiddlewareRegistration

PoisonQueueMiddleware publishes messages that match the supplied filter to the configured poison queue.

func ProtoValidateMiddleware

func ProtoValidateMiddleware() MiddlewareRegistration

ProtoValidateMiddleware unmarshals and validates protobuf payloads when possible.

func RecovererMiddleware

func RecovererMiddleware() MiddlewareRegistration

RecovererMiddleware converts panics into handler errors so they can be retried or sent to the poison queue.

func RetryMiddleware

func RetryMiddleware(cfg RetryMiddlewareConfig) MiddlewareRegistration

RetryMiddleware retries handler execution using the provided configuration (defaults applied to zero values).

func TracerMiddleware

func TracerMiddleware() MiddlewareRegistration

TracerMiddleware wraps handler execution in an OpenTelemetry span.

type OutboxStore

type OutboxStore interface {
	StoreOutgoingMessage(ctx context.Context, eventType, uuid, payload string) error
}

OutboxStore persists processed messages so they can be forwarded reliably.

type Producer

type Producer interface {
	PublishProto(ctx context.Context, topic string, event proto.Message, metadata metadatapkg.Metadata) error
}

Producer emits proto-based events onto the configured transport.

type ProtoValidator

type ProtoValidator interface {
	Validate(value any) error
}

ProtoValidator validates unmarshalled payloads. Implementations typically forward to protovalidate or a custom struct validator.

type ResourceUsage

type ResourceUsage struct {
	CPUPercent  float64 `json:"cpu_percent"`
	MemoryBytes uint64  `json:"memory_bytes"`
	Goroutines  int     `json:"goroutines"`
}

type RetryMiddlewareConfig

type RetryMiddlewareConfig struct {
	MaxRetries      int
	InitialInterval time.Duration
	MaxInterval     time.Duration
	RetryIf         func(error) bool
}

RetryMiddlewareConfig customises the retry middleware behaviour.

type Service

type Service struct {
	Conf   *configpkg.Config
	Logger loggingpkg.ServiceLogger
	// contains filtered or unexported fields
}

Service wires a Watermill router, publisher, subscriber, and middleware chain.

func NewService

NewService constructs a Service for the supplied configuration. Register handlers on the returned Service before calling Start.

func (*Service) PublishProto

func (s *Service) PublishProto(ctx context.Context, topic string, event proto.Message, metadata metadatapkg.Metadata) error

PublishProto emits the event using the Service publisher so HTTP handlers can create events without touching the internal Watermill APIs directly.

func (*Service) RegisterHTTPHandler

func (s *Service) RegisterHTTPHandler(port int, pattern string, handler http.Handler)

func (*Service) RegisterMiddleware

func (s *Service) RegisterMiddleware(cfg MiddlewareRegistration) error

RegisterMiddleware attaches the supplied middleware to the router.

func (*Service) RegisterProtoMessage

func (s *Service) RegisterProtoMessage(msg proto.Message)

RegisterProtoMessage exposes a proto message type for validation without registering a handler.

func (*Service) Start

func (s *Service) Start(ctx context.Context) error

Start runs the underlying Watermill router until the provided context is cancelled.

func (*Service) StartWebUIServer

func (s *Service) StartWebUIServer()

type ServiceDependencies

type ServiceDependencies struct {
	Outbox                    OutboxStore
	Validator                 ProtoValidator
	Middlewares               []MiddlewareRegistration // Appended after the default middleware chain.
	DisableDefaultMiddlewares bool                     // Skips registering the default middleware chain when true.
	TransportFactory          transportpkg.Factory
	ErrorClassifier           ErrorClassifier
}

ServiceDependencies holds the optional collaborators that the Service can use. Leave fields nil to skip the related middleware.

type ThroughputMetrics

type ThroughputMetrics struct {
	CurrentRPS       float64 `json:"current_rps"`
	WindowSeconds    float64 `json:"window_seconds"`
	MessagesInWindow uint64  `json:"messages_in_window"`
	TotalMessages    uint64  `json:"total_messages"`
}

type UnprocessableEventError

type UnprocessableEventError struct {
	// contains filtered or unexported fields
}

UnprocessableEventError wraps payloads that failed validation or unmarshalling.

func (*UnprocessableEventError) Error

func (e *UnprocessableEventError) Error() string

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL