Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
fix: Shut down delete client on local rule-evaluator shutdown
  • Loading branch information
salvacorts committed Dec 10, 2024
commit 89ee1130f45be18ccc7f3e05181a1d3854e2afe0
26 changes: 17 additions & 9 deletions pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -1339,16 +1339,29 @@ func (t *Loki) initRuleEvaluator() (services.Service, error) {
mode := t.Cfg.Ruler.Evaluation.Mode
logger := log.With(util_log.Logger, "component", "ruler", "evaluation_mode", mode)

var svc services.Service
switch mode {
case ruler.EvalModeLocal:
var engine *logql.Engine
var deleteStore deletion.DeleteRequestsClient
deleteStore, err = t.deleteRequestsClient("rule-evaluator", t.Overrides)
if err != nil {
break
}

engine, err = t.createRulerQueryEngine(logger)
var engine *logql.Engine
engine, err = t.createRulerQueryEngine(logger, deleteStore)
if err != nil {
break
}

evaluator, err = ruler.NewLocalEvaluator(engine, logger)

// The delete client needs to be stopped when the evaluator is stopped.
// We wrap the client on a IDLE service and call Stop on shutdown.
svc = services.NewIdleService(nil, func(_ error) error {
deleteStore.Stop()
return nil
})
case ruler.EvalModeRemote:
qfClient, e := ruler.DialQueryFrontend(&t.Cfg.Ruler.Evaluation.QueryFrontend)
if e != nil {
Expand All @@ -1366,7 +1379,7 @@ func (t *Loki) initRuleEvaluator() (services.Service, error) {

t.ruleEvaluator = ruler.NewEvaluatorWithJitter(evaluator, t.Cfg.Ruler.Evaluation.MaxJitter, fnv.New32a(), logger)

return nil, nil
return svc, nil
}

func (t *Loki) initMemberlistKV() (services.Service, error) {
Expand Down Expand Up @@ -1912,12 +1925,7 @@ func (t *Loki) deleteRequestsClient(clientType string, limits limiter.CombinedLi
return deletion.NewPerTenantDeleteRequestsClient(client, limits), nil
}

func (t *Loki) createRulerQueryEngine(logger log.Logger) (eng *logql.Engine, err error) {
deleteStore, err := t.deleteRequestsClient("rule-evaluator", t.Overrides)
if err != nil {
return nil, fmt.Errorf("could not create delete requests store: %w", err)
}

func (t *Loki) createRulerQueryEngine(logger log.Logger, deleteStore deletion.DeleteRequestsClient) (eng *logql.Engine, err error) {
q, err := querier.New(t.Cfg.Querier, t.Store, t.ingesterQuerier, t.Overrides, deleteStore, nil, logger)
if err != nil {
return nil, fmt.Errorf("could not create querier: %w", err)
Expand Down
Loading