Skip to content

Commit a5fa6f9

Browse files
authored
fix(pattern): Fix feature flag for enabling pattern persistence (#18215)
PR #17737 introduced a feature to persist detected patterns on the pattern ingesters. And while there is a dedicated global config `PatternPersistence` and a per-tenant config `PatternPersistenceEnabled`, none of these were honoured, because for writing the patterns to the distributor, the same writer (and its config) as for persisting aggregated metrics is used. So, when aggregated metrics is enabled, also patterns were automatically written/persisted as well. Signed-off-by: Christian Haudum <christian.haudum@gmail.com>
1 parent 2d65d4d commit a5fa6f9

File tree

3 files changed

+50
-15
lines changed

3 files changed

+50
-15
lines changed

‎pkg/pattern/ingester.go

Lines changed: 30 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -398,12 +398,12 @@ func (i *Ingester) GetOrCreateInstance(instanceID string) (*instance, error) { /
398398
inst, ok = i.instances[instanceID]
399399
if !ok {
400400
var err error
401-
var writer aggregation.EntryWriter
401+
metricAggregationMetrics := aggregation.NewMetrics(i.registerer)
402402

403+
var metricWriter aggregation.EntryWriter
403404
aggCfg := i.cfg.MetricAggregation
404405
if i.limits.MetricAggregationEnabled(instanceID) {
405-
metricAggregationMetrics := aggregation.NewMetrics(i.registerer)
406-
writer, err = aggregation.NewPush(
406+
metricWriter, err = aggregation.NewPush(
407407
aggCfg.LokiAddr,
408408
instanceID,
409409
aggCfg.WriteTimeout,
@@ -413,13 +413,35 @@ func (i *Ingester) GetOrCreateInstance(instanceID string) (*instance, error) { /
413413
string(aggCfg.BasicAuth.Password),
414414
aggCfg.UseTLS,
415415
&aggCfg.BackoffConfig,
416-
i.logger,
416+
log.With(i.logger, "writer", "metric-aggregation"),
417417
metricAggregationMetrics,
418418
)
419419
if err != nil {
420420
return nil, err
421421
}
422422
}
423+
424+
var patternWriter aggregation.EntryWriter
425+
patternCfg := i.cfg.PatternPersistence
426+
if i.limits.PatternPersistenceEnabled(instanceID) {
427+
metricWriter, err = aggregation.NewPush(
428+
patternCfg.LokiAddr,
429+
instanceID,
430+
patternCfg.WriteTimeout,
431+
patternCfg.PushPeriod,
432+
patternCfg.HTTPClientConfig,
433+
patternCfg.BasicAuth.Username,
434+
string(patternCfg.BasicAuth.Password),
435+
patternCfg.UseTLS,
436+
&patternCfg.BackoffConfig,
437+
log.With(i.logger, "writer", "pattern"),
438+
metricAggregationMetrics,
439+
)
440+
if err != nil {
441+
return nil, err
442+
}
443+
}
444+
423445
inst, err = newInstance(
424446
instanceID,
425447
i.logger,
@@ -428,7 +450,8 @@ func (i *Ingester) GetOrCreateInstance(instanceID string) (*instance, error) { /
428450
i.limits,
429451
i.ringClient,
430452
i.lifecycler.ID,
431-
writer,
453+
metricWriter,
454+
patternWriter,
432455
)
433456
if err != nil {
434457
return nil, err
@@ -461,8 +484,8 @@ func (i *Ingester) stopWriters() {
461484
instances := i.getInstances()
462485

463486
for _, instance := range instances {
464-
if instance.writer != nil {
465-
instance.writer.Stop()
487+
if instance.metricWriter != nil {
488+
instance.metricWriter.Stop()
466489
}
467490
}
468491
}

‎pkg/pattern/ingester_test.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ func TestInstancePushQuery(t *testing.T) {
5959
ringClient,
6060
ingesterID,
6161
mockWriter,
62+
mockWriter,
6263
)
6364
require.NoError(t, err)
6465

@@ -245,6 +246,7 @@ func TestInstancePushAggregateMetrics(t *testing.T) {
245246
ringClient,
246247
ingesterID,
247248
mockWriter,
249+
mockWriter,
248250
)
249251
require.NoError(t, err)
250252

@@ -442,14 +444,21 @@ func (m *mockEntryWriter) Stop() {
442444
}
443445

444446
type fakeLimits struct {
445-
Limits
446-
metricAggregationEnabled bool
447+
metricAggregationEnabled bool
448+
patternPersistenceEnabled bool
447449
}
448450

451+
var _ drain.Limits = &fakeLimits{}
452+
var _ Limits = &fakeLimits{}
453+
449454
func (f *fakeLimits) PatternIngesterTokenizableJSONFields(_ string) []string {
450455
return []string{"log", "message", "msg", "msg_", "_msg", "content"}
451456
}
452457

453458
func (f *fakeLimits) MetricAggregationEnabled(_ string) bool {
454459
return f.metricAggregationEnabled
455460
}
461+
462+
func (f *fakeLimits) PatternPersistenceEnabled(_ string) bool {
463+
return f.patternPersistenceEnabled
464+
}

‎pkg/pattern/instance.go

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,8 @@ type instance struct {
5353
aggMetricsLock sync.Mutex
5454
aggMetricsByStreamAndLevel map[string]map[string]*aggregatedMetrics
5555

56-
writer aggregation.EntryWriter
56+
metricWriter aggregation.EntryWriter
57+
patternWriter aggregation.EntryWriter
5758
}
5859

5960
type aggregatedMetrics struct {
@@ -69,7 +70,8 @@ func newInstance(
6970
drainLimits drain.Limits,
7071
ringClient RingClient,
7172
ingesterID string,
72-
writer aggregation.EntryWriter,
73+
metricWriter aggregation.EntryWriter,
74+
patternWriter aggregation.EntryWriter,
7375
) (*instance, error) {
7476
index, err := index.NewBitPrefixWithShards(indexShards)
7577
if err != nil {
@@ -87,7 +89,8 @@ func newInstance(
8789
ringClient: ringClient,
8890
ingesterID: ingesterID,
8991
aggMetricsByStreamAndLevel: make(map[string]map[string]*aggregatedMetrics),
90-
writer: writer,
92+
metricWriter: metricWriter,
93+
patternWriter: patternWriter,
9194
}
9295
i.mapper = ingester.NewFPMapper(i.getLabelsFromFingerprint)
9396
return i, nil
@@ -233,7 +236,7 @@ func (i *instance) createStream(_ context.Context, pushReqStream logproto.Stream
233236
fp := i.getHashForLabels(labels)
234237
sortedLabels := i.index.Add(logproto.FromLabelsToLabelAdapters(labels), fp)
235238
firstEntryLine := pushReqStream.Entries[0].Line
236-
s, err := newStream(fp, sortedLabels, i.metrics, i.logger, drain.DetectLogFormat(firstEntryLine), i.instanceID, i.drainCfg, i.drainLimits, i.writer)
239+
s, err := newStream(fp, sortedLabels, i.metrics, i.logger, drain.DetectLogFormat(firstEntryLine), i.instanceID, i.drainCfg, i.drainLimits, i.patternWriter)
237240
if err != nil {
238241
return nil, fmt.Errorf("failed to create stream: %w", err)
239242
}
@@ -339,8 +342,8 @@ func (i *instance) writeAggregatedMetrics(
339342
{Name: constants.LevelLabel, Value: level},
340343
}
341344

342-
if i.writer != nil {
343-
i.writer.WriteEntry(
345+
if i.metricWriter != nil {
346+
i.metricWriter.WriteEntry(
344347
now.Time(),
345348
aggregation.AggregatedMetricEntry(now, totalBytes, totalCount, streamLbls),
346349
newLbls,

0 commit comments

Comments
 (0)