Skip to content

Manipulate ingesters query min time when -querier.query-ingesters-within is set #2904

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
* [CHANGE] Ingester: Chunks flushed via /flush stay in memory until retention period is reached. This affects `cortex_ingester_memory_chunks` metric. #2778
* [CHANGE] Querier: the error message returned when the query time range exceeds `-store.max-query-length` has changed from `invalid query, length > limit (X > Y)` to `the query time range exceeds the limit (query length: X, limit: Y)`. #2826
* [CHANGE] Add `component` label to metrics exposed by chunk, delete and index store clients. #2774
* [CHANGE] Querier: when `-querier.query-ingesters-within` is configured, the time range of the query sent to ingesters is now manipulated to ensure the query start time is not older than 'now - query-ingesters-within'. #2904
* [CHANGE] KV: The `role` label which was a label of `multi` KV store client only has been added to metrics of every KV store client. If KV store client is not `multi`, then the value of `role` label is `primary`. #2837
* [CHANGE] Added the `engine` label to the metrics exposed by the Prometheus query engine, to distinguish between `ruler` and `querier` metrics. #2854
* [CHANGE] Added ruler to the single binary when started with `-target=all` (default). #2854
Expand Down
2 changes: 1 addition & 1 deletion pkg/querier/blocks_store_queryable.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ func (q *blocksStoreQuerier) selectSorted(sp *storage.SelectHints, matchers ...*
maxT = util.Min64(maxT, util.TimeToMillis(now.Add(-q.queryStoreAfter)))

if origMaxT != maxT {
level.Debug(spanLog).Log("msg", "query max time has been manipulated", "original", origMaxT, "updated", maxT)
level.Debug(spanLog).Log("msg", "the max time of the query to blocks storage has been manipulated", "original", origMaxT, "updated", maxT)
}

if maxT < minT {
Expand Down
83 changes: 53 additions & 30 deletions pkg/querier/distributor_queryable.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"sort"
"time"

"github.com/go-kit/kit/log/level"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/promql"
Expand All @@ -31,44 +32,46 @@ type Distributor interface {
MetricsMetadata(ctx context.Context) ([]scrape.MetricMetadata, error)
}

func newDistributorQueryable(distributor Distributor, streaming bool, iteratorFn chunkIteratorFunc, queryIngesterWithin time.Duration) QueryableWithFilter {
func newDistributorQueryable(distributor Distributor, streaming bool, iteratorFn chunkIteratorFunc, queryIngestersWithin time.Duration) QueryableWithFilter {
return distributorQueryable{
distributor: distributor,
streaming: streaming,
iteratorFn: iteratorFn,
queryIngesterWithin: queryIngesterWithin,
distributor: distributor,
streaming: streaming,
iteratorFn: iteratorFn,
queryIngestersWithin: queryIngestersWithin,
}
}

type distributorQueryable struct {
distributor Distributor
streaming bool
iteratorFn chunkIteratorFunc
queryIngesterWithin time.Duration
distributor Distributor
streaming bool
iteratorFn chunkIteratorFunc
queryIngestersWithin time.Duration
}

func (d distributorQueryable) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
return &distributorQuerier{
distributor: d.distributor,
ctx: ctx,
mint: mint,
maxt: maxt,
streaming: d.streaming,
chunkIterFn: d.iteratorFn,
distributor: d.distributor,
ctx: ctx,
mint: mint,
maxt: maxt,
streaming: d.streaming,
chunkIterFn: d.iteratorFn,
queryIngestersWithin: d.queryIngestersWithin,
}, nil
}

func (d distributorQueryable) UseQueryable(now time.Time, _, queryMaxT int64) bool {
// Include ingester only if maxt is within QueryIngestersWithin w.r.t. current time.
return d.queryIngesterWithin == 0 || queryMaxT >= util.TimeToMillis(now.Add(-d.queryIngesterWithin))
return d.queryIngestersWithin == 0 || queryMaxT >= util.TimeToMillis(now.Add(-d.queryIngestersWithin))
}

type distributorQuerier struct {
distributor Distributor
ctx context.Context
mint, maxt int64
streaming bool
chunkIterFn chunkIteratorFunc
distributor Distributor
ctx context.Context
mint, maxt int64
streaming bool
chunkIterFn chunkIteratorFunc
queryIngestersWithin time.Duration
}

// Select implements storage.Querier interface.
Expand All @@ -77,23 +80,45 @@ func (q *distributorQuerier) Select(_ bool, sp *storage.SelectHints, matchers ..
log, ctx := spanlogger.New(q.ctx, "distributorQuerier.Select")
defer log.Span.Finish()

minT, maxT := q.mint, q.maxt
if sp != nil {
minT, maxT = sp.Start, sp.End
}

// If queryIngestersWithin is enabled, we do manipulate the query mint to query samples up until
// now - queryIngestersWithin, because older time ranges are covered by the storage. This
// optimization is particularly important for the blocks storage where the blocks retention in the
// ingesters could be way higher than queryIngestersWithin.
if q.queryIngestersWithin > 0 {
now := time.Now()
origMinT := minT
minT = util.Max64(minT, util.TimeToMillis(now.Add(-q.queryIngestersWithin)))

if origMinT != minT {
level.Debug(log).Log("msg", "the min time of the query to ingesters has been manipulated", "original", origMinT, "updated", minT)
}

if minT > maxT {
level.Debug(log).Log("msg", "empty query time range after min time manipulation")
return storage.EmptySeriesSet()
}
}

// Kludge: Prometheus passes nil SelectParams if it is doing a 'series' operation,
// which needs only metadata.
if sp == nil {
ms, err := q.distributor.MetricsForLabelMatchers(ctx, model.Time(q.mint), model.Time(q.maxt), matchers...)
ms, err := q.distributor.MetricsForLabelMatchers(ctx, model.Time(minT), model.Time(maxT), matchers...)
if err != nil {
return storage.ErrSeriesSet(err)
}
return series.MetricsToSeriesSet(ms)
}

mint, maxt := sp.Start, sp.End

if q.streaming {
return q.streamingSelect(*sp, matchers)
return q.streamingSelect(minT, maxT, matchers)
}

matrix, err := q.distributor.Query(ctx, model.Time(mint), model.Time(maxt), matchers...)
matrix, err := q.distributor.Query(ctx, model.Time(minT), model.Time(maxT), matchers...)
if err != nil {
return storage.ErrSeriesSet(promql.ErrStorage{Err: err})
}
Expand All @@ -102,15 +127,13 @@ func (q *distributorQuerier) Select(_ bool, sp *storage.SelectHints, matchers ..
return series.MatrixToSeriesSet(matrix)
}

func (q *distributorQuerier) streamingSelect(sp storage.SelectHints, matchers []*labels.Matcher) storage.SeriesSet {
func (q *distributorQuerier) streamingSelect(minT, maxT int64, matchers []*labels.Matcher) storage.SeriesSet {
userID, err := user.ExtractOrgID(q.ctx)
if err != nil {
return storage.ErrSeriesSet(promql.ErrStorage{Err: err})
}

mint, maxt := sp.Start, sp.End

results, err := q.distributor.QueryStream(q.ctx, model.Time(mint), model.Time(maxt), matchers...)
results, err := q.distributor.QueryStream(q.ctx, model.Time(minT), model.Time(maxT), matchers...)
if err != nil {
return storage.ErrSeriesSet(promql.ErrStorage{Err: err})
}
Expand Down
104 changes: 87 additions & 17 deletions pkg/querier/distributor_queryable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,16 @@ package querier

import (
"context"
"fmt"
"testing"
"time"

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/scrape"
"github.com/prometheus/prometheus/storage"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/weaveworks/common/user"

Expand All @@ -25,8 +28,9 @@ const (
)

func TestDistributorQuerier(t *testing.T) {
d := &mockDistributor{
m: model.Matrix{
d := &mockDistributor{}
d.On("Query", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(
model.Matrix{
// Matrixes are unsorted, so this tests that the labels get sorted.
&model.SampleStream{
Metric: model.Metric{
Expand All @@ -39,7 +43,8 @@ func TestDistributorQuerier(t *testing.T) {
},
},
},
}
nil)

queryable := newDistributorQueryable(d, false, nil, 0)
querier, err := queryable.Querier(context.Background(), mint, maxt)
require.NoError(t, err)
Expand All @@ -59,6 +64,73 @@ func TestDistributorQuerier(t *testing.T) {
require.NoError(t, seriesSet.Err())
}

func TestDistributorQuerier_SelectShouldHonorQueryIngestersWithin(t *testing.T) {
now := time.Now()

tests := map[string]struct {
queryIngestersWithin time.Duration
queryMinT int64
queryMaxT int64
expectedMinT int64
expectedMaxT int64
}{
"should not manipulate query time range if queryIngestersWithin is disabled": {
queryIngestersWithin: 0,
queryMinT: util.TimeToMillis(now.Add(-100 * time.Minute)),
queryMaxT: util.TimeToMillis(now.Add(-30 * time.Minute)),
expectedMinT: util.TimeToMillis(now.Add(-100 * time.Minute)),
expectedMaxT: util.TimeToMillis(now.Add(-30 * time.Minute)),
},
"should not manipulate query time range if queryIngestersWithin is enabled but query min time is newer": {
queryIngestersWithin: time.Hour,
queryMinT: util.TimeToMillis(now.Add(-50 * time.Minute)),
queryMaxT: util.TimeToMillis(now.Add(-30 * time.Minute)),
expectedMinT: util.TimeToMillis(now.Add(-50 * time.Minute)),
expectedMaxT: util.TimeToMillis(now.Add(-30 * time.Minute)),
},
"should manipulate query time range if queryIngestersWithin is enabled and query min time is older": {
queryIngestersWithin: time.Hour,
queryMinT: util.TimeToMillis(now.Add(-100 * time.Minute)),
queryMaxT: util.TimeToMillis(now.Add(-30 * time.Minute)),
expectedMinT: util.TimeToMillis(now.Add(-60 * time.Minute)),
expectedMaxT: util.TimeToMillis(now.Add(-30 * time.Minute)),
},
"should skip the query if the query max time is older than queryIngestersWithin": {
queryIngestersWithin: time.Hour,
queryMinT: util.TimeToMillis(now.Add(-100 * time.Minute)),
queryMaxT: util.TimeToMillis(now.Add(-90 * time.Minute)),
expectedMinT: 0,
expectedMaxT: 0,
},
}

for _, streamingEnabled := range []bool{false, true} {
for testName, testData := range tests {
t.Run(fmt.Sprintf("%s (streaming enabled: %t)", testName, streamingEnabled), func(t *testing.T) {
distributor := &mockDistributor{}
distributor.On("Query", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(model.Matrix{}, nil)
distributor.On("QueryStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&client.QueryStreamResponse{}, nil)

ctx := user.InjectOrgID(context.Background(), "test")
queryable := newDistributorQueryable(distributor, streamingEnabled, nil, testData.queryIngestersWithin)
querier, err := queryable.Querier(ctx, testData.queryMinT, testData.queryMaxT)
require.NoError(t, err)

seriesSet := querier.Select(true, &storage.SelectHints{Start: testData.queryMinT, End: testData.queryMaxT})
require.NoError(t, seriesSet.Err())

if testData.expectedMinT == 0 && testData.expectedMaxT == 0 {
assert.Len(t, distributor.Calls, 0)
} else {
require.Len(t, distributor.Calls, 1)
assert.InDelta(t, testData.expectedMinT, int64(distributor.Calls[0].Arguments.Get(1).(model.Time)), float64(5*time.Second.Milliseconds()))
assert.Equal(t, testData.expectedMaxT, int64(distributor.Calls[0].Arguments.Get(2).(model.Time)))
}
})
}
}
}

func TestDistributorQueryableFilter(t *testing.T) {
d := &mockDistributor{}
dq := newDistributorQueryable(d, false, nil, 1*time.Hour)
Expand Down Expand Up @@ -86,8 +158,9 @@ func TestIngesterStreaming(t *testing.T) {
})
require.NoError(t, err)

d := &mockDistributor{
r: &client.QueryStreamResponse{
d := &mockDistributor{}
d.On("QueryStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(
&client.QueryStreamResponse{
Chunkseries: []client.TimeSeriesChunk{
{
Labels: []client.LabelAdapter{
Expand All @@ -103,7 +176,8 @@ func TestIngesterStreaming(t *testing.T) {
},
},
},
}
nil)

ctx := user.InjectOrgID(context.Background(), "0")
queryable := newDistributorQueryable(d, true, mergeChunks, 0)
querier, err := queryable.Querier(ctx, mint, maxt)
Expand All @@ -125,17 +199,16 @@ func TestIngesterStreaming(t *testing.T) {
}

type mockDistributor struct {
metadata []scrape.MetricMetadata
metadataError error
m model.Matrix
r *client.QueryStreamResponse
mock.Mock
}

func (m *mockDistributor) Query(ctx context.Context, from, to model.Time, matchers ...*labels.Matcher) (model.Matrix, error) {
return m.m, nil
args := m.Called(ctx, from, to, matchers)
return args.Get(0).(model.Matrix), args.Error(1)
}
func (m *mockDistributor) QueryStream(ctx context.Context, from, to model.Time, matchers ...*labels.Matcher) (*client.QueryStreamResponse, error) {
return m.r, nil
args := m.Called(ctx, from, to, matchers)
return args.Get(0).(*client.QueryStreamResponse), args.Error(1)
}
func (m *mockDistributor) LabelValuesForLabelName(context.Context, model.LabelName) ([]string, error) {
return nil, nil
Expand All @@ -148,9 +221,6 @@ func (m *mockDistributor) MetricsForLabelMatchers(ctx context.Context, from, thr
}

func (m *mockDistributor) MetricsMetadata(ctx context.Context) ([]scrape.MetricMetadata, error) {
if m.metadataError != nil {
return nil, m.metadataError
}

return m.metadata, nil
args := m.Called(ctx)
return args.Get(0).([]scrape.MetricMetadata), args.Error(1)
}
13 changes: 7 additions & 6 deletions pkg/querier/metadata_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,17 @@ import (
"testing"

"github.com/prometheus/prometheus/scrape"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)

func TestMetadataHandler_Success(t *testing.T) {
d := &mockDistributor{
metadata: []scrape.MetricMetadata{
d := &mockDistributor{}
d.On("MetricsMetadata", mock.Anything).Return(
[]scrape.MetricMetadata{
{Metric: "alertmanager_dispatcher_aggregation_groups", Help: "Number of active aggregation groups", Type: "gauge", Unit: ""},
},
}
nil)

handler := MetadataHandler(d)

Expand Down Expand Up @@ -49,9 +51,8 @@ func TestMetadataHandler_Success(t *testing.T) {
}

func TestMetadataHandler_Error(t *testing.T) {
d := &mockDistributor{
metadataError: fmt.Errorf("no user id"),
}
d := &mockDistributor{}
d.On("MetricsMetadata", mock.Anything).Return([]scrape.MetricMetadata{}, fmt.Errorf("no user id"))

handler := MetadataHandler(d)

Expand Down
8 changes: 4 additions & 4 deletions pkg/querier/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/stretchr/testify/mock"

"github.com/cortexproject/cortex/pkg/chunk/purger"
"github.com/cortexproject/cortex/pkg/util/validation"
Expand Down Expand Up @@ -486,10 +487,9 @@ func mockDistibutorFor(t *testing.T, cs mockChunkStore, through model.Time) *moc
matrix, err := chunk.ChunksToMatrix(context.Background(), cs.chunks, 0, through)
require.NoError(t, err)

result := &mockDistributor{
m: matrix,
r: &client.QueryStreamResponse{Chunkseries: []client.TimeSeriesChunk{tsc}},
}
result := &mockDistributor{}
result.On("Query", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(matrix, nil)
result.On("QueryStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&client.QueryStreamResponse{Chunkseries: []client.TimeSeriesChunk{tsc}}, nil)
return result
}

Expand Down