Skip to content

fix: pattern persistence feature flag #18285

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions pkg/pattern/drain/drain.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,18 +144,18 @@ func DefaultConfig() *Config {
}
}

func New(tenantID string, config *Config, limits Limits, format string, writer aggregation.EntryWriter, metrics *Metrics) *Drain {
func New(tenantID string, config *Config, limits Limits, format string, patternWriter aggregation.EntryWriter, metrics *Metrics) *Drain {
if config.LogClusterDepth < 3 {
panic("depth argument must be at least 3")
}
config.maxNodeDepth = config.LogClusterDepth - 2

d := &Drain{
config: config,
rootNode: createNode(),
metrics: metrics,
format: format,
writer: writer,
config: config,
rootNode: createNode(),
metrics: metrics,
format: format,
patternWriter: patternWriter,
}

limiter := newLimiter(config.MaxEvictionRatio)
Expand Down Expand Up @@ -203,7 +203,7 @@ type Drain struct {
state interface{}
limiter *limiter
pruning bool
writer aggregation.EntryWriter
patternWriter aggregation.EntryWriter
}

func (d *Drain) Clusters() []*LogCluster {
Expand Down Expand Up @@ -313,8 +313,8 @@ func (d *Drain) writePattern(
{Name: constants.LevelLabel, Value: lvl},
}

if d.writer != nil {
d.writer.WriteEntry(
if d.patternWriter != nil {
d.patternWriter.WriteEntry(
ts.Time(),
aggregation.PatternEntry(ts.Time(), count, pattern, streamLbls),
newLbls,
Expand Down
5 changes: 4 additions & 1 deletion pkg/pattern/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,7 @@ func (i *Ingester) GetOrCreateInstance(instanceID string) (*instance, error) { /
var patternWriter aggregation.EntryWriter
patternCfg := i.cfg.PatternPersistence
if i.limits.PatternPersistenceEnabled(instanceID) {
metricWriter, err = aggregation.NewPush(
patternWriter, err = aggregation.NewPush(
patternCfg.LokiAddr,
instanceID,
patternCfg.WriteTimeout,
Expand Down Expand Up @@ -487,6 +487,9 @@ func (i *Ingester) stopWriters() {
if instance.metricWriter != nil {
instance.metricWriter.Stop()
}
if instance.patternWriter != nil {
instance.patternWriter.Stop()
}
}
}

Expand Down
212 changes: 212 additions & 0 deletions pkg/pattern/pattern_persistence_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
package pattern

import (
"context"
"testing"
"time"

"github.com/go-kit/log"
"github.com/grafana/dskit/flagext"
"github.com/grafana/dskit/kv"
"github.com/grafana/dskit/ring"
"github.com/grafana/dskit/services"
"github.com/grafana/dskit/user"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)

// TestPatternPersistenceConfiguration tests that pattern persistence writers
// are correctly configured based on the limits configuration.
func TestPatternPersistenceConfiguration(t *testing.T) {
tests := []struct {
name string
patternPersistenceEnabled bool
metricAggregationEnabled bool
expectPatternWriter bool
expectMetricWriter bool
}{
{
name: "both disabled",
patternPersistenceEnabled: false,
metricAggregationEnabled: false,
expectPatternWriter: false,
expectMetricWriter: false,
},
{
name: "only pattern persistence enabled",
patternPersistenceEnabled: true,
metricAggregationEnabled: false,
expectPatternWriter: true,
expectMetricWriter: false,
},
{
name: "only metric aggregation enabled",
patternPersistenceEnabled: false,
metricAggregationEnabled: true,
expectPatternWriter: false,
expectMetricWriter: true,
},
{
name: "both enabled",
patternPersistenceEnabled: true,
metricAggregationEnabled: true,
expectPatternWriter: true,
expectMetricWriter: true,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Setup mock limits
limits := &configurableLimits{
patternPersistenceEnabled: tt.patternPersistenceEnabled,
metricAggregationEnabled: tt.metricAggregationEnabled,
}

// Setup ring
replicationSet := ring.ReplicationSet{
Instances: []ring.InstanceDesc{
{Id: "localhost", Addr: "ingester0"},
},
}

fakeRing := &fakeRing{}
fakeRing.On("Get", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).
Return(replicationSet, nil)

ringClient := &fakeRingClient{
ring: fakeRing,
}

// Create ingester with the specified configuration
cfg := testIngesterConfig(t)
ing, err := New(cfg, limits, ringClient, "test", nil, log.NewNopLogger())
require.NoError(t, err)
defer services.StopAndAwaitTerminated(context.Background(), ing) //nolint:errcheck

err = services.StartAndAwaitRunning(context.Background(), ing)
require.NoError(t, err)

// Create test instance
_ = user.InjectOrgID(context.Background(), "test-tenant")
instance, err := ing.GetOrCreateInstance("test-tenant")
require.NoError(t, err)

// Verify writer configuration based on test expectations
if tt.expectPatternWriter {
require.NotNil(t, instance.patternWriter, "pattern writer should be configured when pattern persistence is enabled")
} else {
require.Nil(t, instance.patternWriter, "pattern writer should be nil when pattern persistence is disabled")
}

if tt.expectMetricWriter {
require.NotNil(t, instance.metricWriter, "metric writer should be configured when metric aggregation is enabled")
} else {
require.Nil(t, instance.metricWriter, "metric writer should be nil when metric aggregation is disabled")
}

// Verify they are different instances when both are enabled
if tt.expectPatternWriter && tt.expectMetricWriter {
require.NotEqual(t, instance.patternWriter, instance.metricWriter,
"pattern writer and metric writer should be different instances")
}
})
}
}

// TestPatternPersistenceStopWriters tests that both pattern and metric writers
// are properly stopped when the ingester shuts down
func TestPatternPersistenceStopWriters(t *testing.T) {
mockPatternWriter := &mockEntryWriter{}
mockMetricWriter := &mockEntryWriter{}

mockPatternWriter.On("Stop").Return()
mockMetricWriter.On("Stop").Return()

replicationSet := ring.ReplicationSet{
Instances: []ring.InstanceDesc{
{Id: "localhost", Addr: "ingester0"},
},
}

fakeRing := &fakeRing{}
fakeRing.On("Get", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).
Return(replicationSet, nil)

ringClient := &fakeRingClient{ring: fakeRing}

cfg := testIngesterConfig(t)
ing, err := New(cfg, &configurableLimits{
patternPersistenceEnabled: true,
metricAggregationEnabled: true,
}, ringClient, "test", nil, log.NewNopLogger())
require.NoError(t, err)

err = services.StartAndAwaitRunning(context.Background(), ing)
require.NoError(t, err)

// Create instance and replace writers with mocks
instance, err := ing.GetOrCreateInstance("test-tenant")
require.NoError(t, err)

instance.patternWriter = mockPatternWriter
instance.metricWriter = mockMetricWriter

// Stop the ingester - this should call stopWriters
err = services.StopAndAwaitTerminated(context.Background(), ing)
require.NoError(t, err)

// Verify both writers were stopped
mockPatternWriter.AssertCalled(t, "Stop")
mockMetricWriter.AssertCalled(t, "Stop")
}

// configurableLimits implements the Limits interface with configurable pattern persistence
type configurableLimits struct {
patternPersistenceEnabled bool
metricAggregationEnabled bool
}

var _ Limits = &configurableLimits{}

func (c *configurableLimits) PatternIngesterTokenizableJSONFields(_ string) []string {
return []string{"log", "message", "msg", "msg_", "_msg", "content"}
}

func (c *configurableLimits) PatternPersistenceEnabled(_ string) bool {
return c.patternPersistenceEnabled
}

func (c *configurableLimits) MetricAggregationEnabled(_ string) bool {
return c.metricAggregationEnabled
}

func testIngesterConfig(t testing.TB) Config {
kvClient, err := kv.NewClient(kv.Config{Store: "inmemory"}, ring.GetCodec(), nil, log.NewNopLogger())
require.NoError(t, err)

cfg := Config{}
flagext.DefaultValues(&cfg)
cfg.FlushCheckPeriod = 99999 * time.Hour
cfg.ConcurrentFlushes = 1
cfg.LifecyclerConfig.RingConfig.KVStore.Mock = kvClient
cfg.LifecyclerConfig.NumTokens = 1
cfg.LifecyclerConfig.ListenPort = 0
cfg.LifecyclerConfig.Addr = "localhost"
cfg.LifecyclerConfig.ID = "localhost"
cfg.LifecyclerConfig.FinalSleep = 0
cfg.LifecyclerConfig.MinReadyDuration = 0

// Configure pattern persistence
cfg.PatternPersistence.LokiAddr = "http://localhost:3100"
cfg.PatternPersistence.WriteTimeout = 30 * time.Second
cfg.PatternPersistence.PushPeriod = 10 * time.Second
cfg.PatternPersistence.BatchSize = 1000

// Configure metric aggregation
cfg.MetricAggregation.LokiAddr = "http://localhost:3100"
cfg.MetricAggregation.WriteTimeout = 30 * time.Second
cfg.MetricAggregation.SamplePeriod = 10 * time.Second

return cfg
}
4 changes: 2 additions & 2 deletions pkg/pattern/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func newStream(
instanceID string,
drainCfg *drain.Config,
drainLimits drain.Limits,
writer aggregation.EntryWriter,
patternWriter aggregation.EntryWriter,
) (*stream, error) {
linesSkipped, err := metrics.linesSkipped.CurryWith(prometheus.Labels{"tenant": instanceID})
if err != nil {
Expand All @@ -49,7 +49,7 @@ func newStream(

patterns := make(map[string]*drain.Drain, len(constants.LogLevels))
for _, lvl := range constants.LogLevels {
patterns[lvl] = drain.New(instanceID, drainCfg, drainLimits, guessedFormat, writer, &drain.Metrics{
patterns[lvl] = drain.New(instanceID, drainCfg, drainLimits, guessedFormat, patternWriter, &drain.Metrics{
PatternsEvictedTotal: metrics.patternsDiscardedTotal.WithLabelValues(instanceID, guessedFormat, "false"),
PatternsPrunedTotal: metrics.patternsDiscardedTotal.WithLabelValues(instanceID, guessedFormat, "true"),
PatternsDetectedTotal: metrics.patternsDetectedTotal.WithLabelValues(instanceID, guessedFormat),
Expand Down
31 changes: 20 additions & 11 deletions pkg/querier/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -447,18 +447,27 @@ func (q *QuerierAPI) PatternsHandler(ctx context.Context, req *logproto.QueryPat
}

// Query store for older data by converting to LogQL query
// Only query the store if pattern persistence is enabled for this tenant
if storeQueryInterval != nil && !q.cfg.QueryIngesterOnly && q.engineV1 != nil {
g.Go(func() error {
storeReq := *req
storeReq.Start = storeQueryInterval.start
storeReq.End = storeQueryInterval.end
resp, err := q.queryStoreForPatterns(ctx, &storeReq)
if err != nil {
return err
}
responses.add(resp)
return nil
})
tenantID, err := tenant.TenantID(ctx)
if err != nil {
return nil, err
}

// Only query the store if pattern persistence is enabled for this tenant
if q.limits.PatternPersistenceEnabled(tenantID) {
g.Go(func() error {
storeReq := *req
storeReq.Start = storeQueryInterval.start
storeReq.End = storeQueryInterval.end
resp, err := q.queryStoreForPatterns(ctx, &storeReq)
if err != nil {
return err
}
responses.add(resp)
return nil
})
}
}

if err := g.Wait(); err != nil {
Expand Down
Loading