Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions pkg/eventdrop/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Package eventdrop provides a centralized entry point for capturing
// telemetry about dropped events inside the Knative Broker data plane.
//
// This package introduces the types and helpers that Broker components
// (such as the filter and ingress handlers) will call when an event is
// dropped due to TTL exhaustion or other well-defined conditions.
//
// # Phase 1 Scope
//
// Phase 1 only introduces the API surface and OpenTelemetry wiring,
// without modifying existing handler code. Integration points will be
// added in a follow-up PR once the design is reviewed and approved.
//
// The RecordEventDropped function is the single entry point for all
// drop-related telemetry, ensuring consistency across multiple components.
//
// # Telemetry Design
//
//   - Metrics: use only low-cardinality attributes (namespace, broker, trigger,
//     reason) to avoid metric explosion. EventType and EventSource are omitted
//     from metrics.
//
//   - Traces: add richer, high-cardinality attributes (event_type, event_source,
//     details) to the span event, since traces are sampled and can safely carry
//     detailed context.
//
// # Future Phases
//
// Follow-up phases will build on this instrumentation to add:
//
//   - Kubernetes Event reporting
//   - Series aggregation and series.count tracking
//   - Buffering and periodic reconciliation
//   - Dead-letter reporting sinks
package eventdrop
123 changes: 123 additions & 0 deletions pkg/eventdrop/otel.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package eventdrop

import (
"context"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/trace"
)

// instrumentationScope is the OpenTelemetry instrumentation scope name under
// which this package obtains its meter and tracer.
const instrumentationScope = "knative.dev/eventing/eventdrop"

// Telemetry handles obtained from the global OpenTelemetry API when the
// package is loaded.
//
// NOTE(review): tracer is not referenced by the code visible in this file;
// recordTrace annotates the span already carried by the context instead.
var (
	meter  = otel.Meter(instrumentationScope)
	tracer = otel.Tracer(instrumentationScope)
)

// Drop-counter state, populated by init.
var (
	// droppedEventsCounter is the counter incremented once per dropped event.
	// recordMetrics records nothing while it is nil.
	droppedEventsCounter metric.Int64Counter

	// counterInitError holds the error returned while creating
	// droppedEventsCounter; it stays nil when creation succeeded.
	counterInitError error
)

// init creates the dropped-events counter on the package meter.
//
// Creation is best-effort so that a telemetry failure never disrupts Broker
// operation: when the meter reports an error, the error is kept in
// counterInitError and droppedEventsCounter is left nil, which makes
// recordMetrics skip metric recording while trace recording keeps working.
func init() {
	counter, err := meter.Int64Counter(
		"eventing_broker_events_dropped_total",
		metric.WithDescription("Number of events dropped by the Broker data plane"),
		metric.WithUnit("1"),
	)
	if err != nil {
		// Do not publish the instrument on failure. A meter may hand back a
		// non-nil instrument together with the error; assigning it would
		// defeat the nil guard in recordMetrics and break the documented
		// "skip metrics if initialization fails" contract.
		counterInitError = err
		return
	}

	droppedEventsCounter = counter
}

// recordMetrics adds one to the dropped-events counter for the drop described
// by info.
//
// Only four attributes are attached to the measurement, all taken verbatim
// from info:
//
//   - namespace: info.Namespace
//   - broker: info.Broker
//   - trigger: info.Trigger (recorded even when it is the empty string)
//   - reason: info.Reason, converted to a string
//
// EventType, EventSource and Details are deliberately not read here; keeping
// them out of the metric keeps the attribute set low-cardinality. They are
// attached to the trace event by recordTrace instead.
//
// If droppedEventsCounter is nil the function returns without recording.
func recordMetrics(ctx context.Context, info Info) {
	counter := droppedEventsCounter
	if counter == nil {
		// No counter is available, so there is nothing to record on.
		return
	}

	dropAttrs := metric.WithAttributes(
		attribute.String("namespace", info.Namespace),
		attribute.String("broker", info.Broker),
		attribute.String("trigger", info.Trigger),
		attribute.String("reason", string(info.Reason)),
	)
	counter.Add(ctx, 1, dropAttrs)
}

// recordTrace attaches an "event-dropped" event to the span carried by ctx.
//
// The event always carries namespace, broker, trigger and reason. The
// higher-cardinality fields event_type, event_source and details are appended
// only when the corresponding Info field is non-empty.
//
// When the span in ctx is not recording, the function returns without doing
// anything; it never starts a span of its own.
func recordTrace(ctx context.Context, info Info) {
	current := trace.SpanFromContext(ctx)
	if !current.IsRecording() {
		return
	}

	// Core attributes: always present, in a fixed order.
	eventAttrs := make([]attribute.KeyValue, 0, 7)
	eventAttrs = append(eventAttrs,
		attribute.String("namespace", info.Namespace),
		attribute.String("broker", info.Broker),
		attribute.String("trigger", info.Trigger),
		attribute.String("reason", string(info.Reason)),
	)

	// Optional attributes: skipped when empty so the span event does not
	// carry blank values.
	optional := [...]struct{ key, value string }{
		{"event_type", info.EventType},
		{"event_source", info.EventSource},
		{"details", info.Details},
	}
	for _, field := range optional {
		if field.value != "" {
			eventAttrs = append(eventAttrs, attribute.String(field.key, field.value))
		}
	}

	current.AddEvent("event-dropped", trace.WithAttributes(eventAttrs...))
}
51 changes: 51 additions & 0 deletions pkg/eventdrop/record.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package eventdrop

import "context"

// RecordEventDropped records telemetry for one event dropped in the Broker
// data plane. It is the single entry point for drop telemetry: it first
// updates the low-cardinality metric (recordMetrics) and then annotates the
// current span with the richer drop details (recordTrace).
//
// # Parameters
//
//   - ctx carries the current span; it is also passed to the metric recording.
//   - info describes the dropped event and the reason it was dropped.
//
// # Best-effort behaviour
//
// The function returns nothing and never reports a telemetry failure to the
// caller, so recording a drop cannot disrupt event processing:
//
//   - If the drop counter is unavailable, only the trace event is recorded.
//   - If the span in ctx is not recording, only the metric is recorded.
//   - If neither is available, the call is a no-op.
//
// # Planned integration points (Phase 1.5)
//
// No handler calls this function yet. According to the Phase 1 plan, a
// follow-up change will call it from:
//
//   - pkg/broker/filter/filter_handler.go with ReasonTTLMissing, when an event
//     lacks the internal TTL extension. This typically indicates the event was
//     not sent by the Broker and cannot safely be treated as a looped event.
//     The Trigger name is known at that point and should be populated.
//   - pkg/broker/ingress/ingress_handler.go with ReasonTTLExhausted, when the
//     TTL countdown reaches <= 0, i.e. the TTL mechanism is breaking an event
//     loop. The Trigger may not be known at that point.
func RecordEventDropped(ctx context.Context, info Info) {
	// Run the recorders in a fixed order: metrics first, then the span event.
	// Each recorder performs its own availability check.
	for _, record := range [...]func(context.Context, Info){recordMetrics, recordTrace} {
		record(ctx, info)
	}
}
105 changes: 105 additions & 0 deletions pkg/eventdrop/record_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package eventdrop

import (
"context"
"testing"
)

// TestRecordEventDropped_DoesNotPanic validates the Phase 1 contract:
// RecordEventDropped is safe to call even if OTEL infrastructure is not
// fully initialized or configured.
//
// Phase 1 tests focus on the API surface and graceful degradation.
// Detailed metric and trace verification will be added in Phase 1.5
// once the handlers integrate with RecordEventDropped and emit real events.
func TestRecordEventDropped_DoesNotPanic(t *testing.T) {
ctx := context.Background()

// Construct a complete Info struct with all fields populated.
// This validates that RecordEventDropped handles the full API surface.
info := Info{
Namespace: "test-ns",
Broker: "test-broker",
Trigger: "test-trigger",
EventType: "dev.knative.test",
EventSource: "test-source",
Reason: ReasonTTLExhausted,
Details: "TTL count reached 0",
}

// The function should not panic under any circumstances.
// Even if OTEL is not initialized, it should degrade gracefully.
RecordEventDropped(ctx, info)
}

// TestRecordEventDropped_WithPartialInfo checks that RecordEventDropped
// accepts an Info in which some fields are empty, mirroring the ingress case
// where the Trigger is not yet known.
//
// The test makes no assertions about the emitted telemetry; it passes as long
// as the call returns without panicking.
func TestRecordEventDropped_WithPartialInfo(t *testing.T) {
	// Trigger, EventSource and Details are intentionally left at their zero
	// value (""): Trigger is unknown at ingress, EventSource may be
	// unavailable, and Details is optional.
	partial := Info{
		Namespace: "test-ns",
		Broker:    "test-broker",
		EventType: "com.example.event",
		Reason:    ReasonTTLMissing,
	}

	RecordEventDropped(context.Background(), partial)
}

// TestRecordEventDropped_MinimalInfo checks that RecordEventDropped accepts an
// Info carrying only Namespace, Broker and Reason, the subset the package
// documents as sufficient for metrics.
//
// The test makes no assertions about the emitted telemetry; it passes as long
// as the call returns without panicking.
func TestRecordEventDropped_MinimalInfo(t *testing.T) {
	var minimal Info
	minimal.Namespace = "test-ns"
	minimal.Broker = "test-broker"
	minimal.Reason = ReasonTTLExhausted

	RecordEventDropped(context.Background(), minimal)
}

// TestRecordEventDropped_AllReasons calls RecordEventDropped once for each
// Reason listed below and passes as long as no call panics.
//
// The list is maintained by hand: a Reason added in a later phase must be
// appended here to be covered.
func TestRecordEventDropped_AllReasons(t *testing.T) {
	ctx := context.Background()

	for _, reason := range [...]Reason{ReasonTTLMissing, ReasonTTLExhausted} {
		RecordEventDropped(ctx, Info{
			Namespace: "test-ns",
			Broker:    "test-broker",
			Reason:    reason,
		})
	}
}
59 changes: 59 additions & 0 deletions pkg/eventdrop/types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package eventdrop

// Reason names the condition under which a Broker data-plane component dropped
// an event. Its string value is what gets recorded as the "reason" attribute.
//
// The set is intentionally small in Phase 1. Candidates mentioned for later
// phases include dead-letter delivery failure (ReasonDeadLetterFailed),
// exhausted delivery retries (ReasonDeliveryExhausted) and non-retryable
// internal errors (ReasonInternalError); none of these are defined yet.
type Reason string

// Drop reasons defined in Phase 1.
const (
	// ReasonTTLMissing marks an event dropped because it lacks the internal
	// TTL extension. This typically means the event was not sent by the
	// Broker itself (e.g., direct ingestion) and cannot safely be treated as
	// a looped event.
	//
	// Intended call site: pkg/broker/filter/filter_handler.go
	ReasonTTLMissing = Reason("ttl-missing")

	// ReasonTTLExhausted marks an event dropped because its TTL countdown
	// reached <= 0, i.e. the event went through multiple Trigger evaluations
	// and the TTL mechanism is breaking an event loop.
	//
	// Intended call site: pkg/broker/ingress/ingress_handler.go
	ReasonTTLExhausted = Reason("ttl-exhausted")
)

// Info carries the metadata describing one dropped event. Callers fill it in
// and pass it to RecordEventDropped.
//
// Not every field has to be available at every drop location. Namespace,
// Broker and Reason are the minimal subset and are sufficient for metrics;
// the remaining fields enrich the trace event when they are set.
type Info struct {
	// Namespace is the Kubernetes namespace where the Broker resides.
	// Recorded on both the metric and the trace event.
	Namespace string

	// Broker is the name of the Broker that dropped the event.
	// Recorded on both the metric and the trace event.
	Broker string

	// Trigger is the name of the Trigger associated with the drop, if known.
	// It may be empty, for example at ingress time where the Trigger is not
	// yet known.
	Trigger string

	// EventType is the CloudEvents "type" attribute of the dropped event.
	// Used on the trace event only; omitted from metrics to avoid
	// cardinality explosion.
	EventType string

	// EventSource is the CloudEvents "source" attribute of the dropped event.
	// Used on the trace event only; omitted from metrics to avoid
	// cardinality explosion.
	EventSource string

	// Reason states why the event was dropped. Required.
	Reason Reason

	// Details is optional free-form context (e.g., "TTL=0", "loop detected").
	// Used on the trace event only, never on metrics.
	Details string
}