Skip to content

Commit 42469cc

Browse files
authored
fix: Shut down delete client on local rule-evaluator (#15345)
1 parent 2e07c40 commit 42469cc

File tree

1 file changed

+17
-9
lines changed

1 file changed

+17
-9
lines changed

‎pkg/loki/modules.go

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1339,16 +1339,29 @@ func (t *Loki) initRuleEvaluator() (services.Service, error) {
13391339
mode := t.Cfg.Ruler.Evaluation.Mode
13401340
logger := log.With(util_log.Logger, "component", "ruler", "evaluation_mode", mode)
13411341

1342+
var svc services.Service
13421343
switch mode {
13431344
case ruler.EvalModeLocal:
1344-
var engine *logql.Engine
1345+
var deleteStore deletion.DeleteRequestsClient
1346+
deleteStore, err = t.deleteRequestsClient("rule-evaluator", t.Overrides)
1347+
if err != nil {
1348+
break
1349+
}
13451350

1346-
engine, err = t.createRulerQueryEngine(logger)
1351+
var engine *logql.Engine
1352+
engine, err = t.createRulerQueryEngine(logger, deleteStore)
13471353
if err != nil {
13481354
break
13491355
}
13501356

13511357
evaluator, err = ruler.NewLocalEvaluator(engine, logger)
1358+
1359+
// The delete client needs to be stopped when the evaluator is stopped.
1360+
// We wrap the client on a IDLE service and call Stop on shutdown.
1361+
svc = services.NewIdleService(nil, func(_ error) error {
1362+
deleteStore.Stop()
1363+
return nil
1364+
})
13521365
case ruler.EvalModeRemote:
13531366
qfClient, e := ruler.DialQueryFrontend(&t.Cfg.Ruler.Evaluation.QueryFrontend)
13541367
if e != nil {
@@ -1366,7 +1379,7 @@ func (t *Loki) initRuleEvaluator() (services.Service, error) {
13661379

13671380
t.ruleEvaluator = ruler.NewEvaluatorWithJitter(evaluator, t.Cfg.Ruler.Evaluation.MaxJitter, fnv.New32a(), logger)
13681381

1369-
return nil, nil
1382+
return svc, nil
13701383
}
13711384

13721385
func (t *Loki) initMemberlistKV() (services.Service, error) {
@@ -1912,12 +1925,7 @@ func (t *Loki) deleteRequestsClient(clientType string, limits limiter.CombinedLi
19121925
return deletion.NewPerTenantDeleteRequestsClient(client, limits), nil
19131926
}
19141927

1915-
func (t *Loki) createRulerQueryEngine(logger log.Logger) (eng *logql.Engine, err error) {
1916-
deleteStore, err := t.deleteRequestsClient("rule-evaluator", t.Overrides)
1917-
if err != nil {
1918-
return nil, fmt.Errorf("could not create delete requests store: %w", err)
1919-
}
1920-
1928+
func (t *Loki) createRulerQueryEngine(logger log.Logger, deleteStore deletion.DeleteRequestsClient) (eng *logql.Engine, err error) {
19211929
q, err := querier.New(t.Cfg.Querier, t.Store, t.ingesterQuerier, t.Overrides, deleteStore, nil, logger)
19221930
if err != nil {
19231931
return nil, fmt.Errorf("could not create querier: %w", err)

0 commit comments

Comments
 (0)