Skip to content
Prev Previous commit
Next Next commit
register metrics once
  • Loading branch information
ashwanthgoli committed Dec 4, 2024
commit 786186aa092603ce3ed50e3dbe033a3b54e3764e
18 changes: 12 additions & 6 deletions pkg/kafka/partition/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"math"
"sync"
"time"

"github.com/go-kit/log"
Expand All @@ -12,6 +13,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/twmb/franz-go/pkg/kgo"
"github.com/twmb/franz-go/plugin/kprom"

"github.com/grafana/loki/v3/pkg/kafka"

Expand All @@ -25,11 +27,11 @@ const (
KafkaEndOffset SpecialOffset = -1
)

var rm *readerMetrics

func init() {
rm = newReaderMetrics(prometheus.DefaultRegisterer)
}
var (
rm *readerMetrics
clientMetrics *kprom.Metrics
registerOnce sync.Once
)

type Record struct {
// Context holds the tracing (and potentially other) info, that the record was enriched with on fetch from Kafka.
Expand Down Expand Up @@ -105,8 +107,12 @@ func NewKafkaReader(
logger log.Logger,
reg prometheus.Registerer,
) (*KafkaReader, error) {
registerOnce.Do(func() {
rm = newReaderMetrics(reg)
clientMetrics = client.NewReaderClientMetrics("partition-reader", reg)
})

// Create a new Kafka client for this reader
clientMetrics := client.NewReaderClientMetrics("partition-reader", reg)
c, err := client.NewReaderClient(
cfg,
clientMetrics,
Expand Down
Loading