Skip to content

Storage: Fix max_chunks_per_query functionality #11212

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 19 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
761f786
Remove unused query_limiter.go code
le0park Nov 11, 2023
fadd02c
Error when chunk ids length is larger than max_chunks_per_query.
le0park Nov 11, 2023
4fb583a
Test GetChunks with max_chunks_per_query
le0park Nov 11, 2023
7c677ec
Merge remote-tracking branch 'origin/main' into revive-max-chunks-per…
le0park Nov 11, 2023
c928439
Merge branch 'main' into revive-max-chunks-per-query
le0park Nov 13, 2023
db20ab7
Merge branch 'main' into revive-max-chunks-per-query
le0park Nov 13, 2023
49322b5
Merge branch 'main' into revive-max-chunks-per-query
le0park Nov 14, 2023
fdad087
Merge branch 'main' into revive-max-chunks-per-query
le0park Nov 15, 2023
1353fb2
Merge branch 'main' into revive-max-chunks-per-query
le0park Nov 16, 2023
d0eebcb
Merge branch 'main' into revive-max-chunks-per-query
le0park Nov 23, 2023
ad8d92b
Merge branch 'main' into revive-max-chunks-per-query
le0park Nov 24, 2023
8bd8314
Merge branch 'main' into revive-max-chunks-per-query
le0park Feb 5, 2024
40c2979
Merge branch 'main' into revive-max-chunks-per-query
le0park Feb 5, 2024
b7931f9
Merge branch 'main' into revive-max-chunks-per-query
le0park Feb 6, 2024
a6924c1
Merge branch 'main' into revive-max-chunks-per-query
le0park Feb 7, 2024
195a8ad
Merge branch 'main' into revive-max-chunks-per-query
le0park Mar 11, 2024
2ecade6
fix: using MaxChunksPerQuery in stores.StoreLimits
le0park Mar 11, 2024
9a0fa7f
Merge branch 'main' into revive-max-chunks-per-query
le0park Mar 12, 2024
d61571f
Merge branch 'main' into revive-max-chunks-per-query
le0park Mar 16, 2024
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions pkg/storage/stores/composite_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package stores

import (
"context"
"fmt"
"sort"

"github.com/prometheus/common/model"
Expand All @@ -10,6 +11,7 @@ import (
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/storage/chunk"
"github.com/grafana/loki/pkg/storage/chunk/fetcher"
"github.com/grafana/loki/pkg/storage/errors"
"github.com/grafana/loki/pkg/storage/stores/index"
"github.com/grafana/loki/pkg/storage/stores/index/seriesvolume"
"github.com/grafana/loki/pkg/storage/stores/index/stats"
Expand Down Expand Up @@ -171,6 +173,14 @@ func (c CompositeStore) GetChunks(ctx context.Context, userID string, from, thro
fetchers = append(fetchers, fetcher...)
return nil
})

// Protect ourselves against OOMing.
maxChunksPerQuery := c.limits.MaxChunksPerQuery(userID)
if maxChunksPerQuery > 0 && len(chunkIDs) > maxChunksPerQuery {
err := errors.QueryError(fmt.Sprintf("Query %v fetched too many chunks (%d > %d)", predicate, len(chunkIDs), maxChunksPerQuery))
return nil, nil, err
}

return chunkIDs, fetchers, err
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/stores/composite_store_entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
)

type StoreLimits interface {
MaxChunksPerQueryFromStore(string) int
MaxChunksPerQuery(string) int
MaxQueryLength(context.Context, string) time.Duration
}

Expand Down
79 changes: 79 additions & 0 deletions pkg/storage/stores/composite_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@ import (
"fmt"
"reflect"
"testing"
"time"

"github.com/pkg/errors"

"github.com/grafana/loki/pkg/logproto"

"github.com/grafana/dskit/test"
"github.com/grafana/dskit/user"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -301,6 +303,83 @@ func TestCompositeStore_GetChunkFetcher(t *testing.T) {
}
}

type mockStoreGetChunks struct {
mockStoreGetChunkFetcher
chunkCount int
}

func (m mockStoreGetChunks) GetChunks(_ context.Context, _ string, _, _ model.Time, _ ...*labels.Matcher) ([][]chunk.Chunk, []*fetcher.Fetcher, error) {
return make([][]chunk.Chunk, m.chunkCount), []*fetcher.Fetcher{m.chunkFetcher}, nil
}

type mockStoreLimits struct {
MaxChunksPerQuery int
}

func (ml mockStoreLimits) MaxChunksPerQuery(string) int {
return ml.MaxChunksPerQuery
}

func (ml mockStoreLimits) MaxQueryLength(context.Context, string) time.Duration {
// mock
return time.Duration(1000 * 1000 * 1000 * 60 * 60 * 24)
}

var (
ctx = user.InjectOrgID(context.Background(), "fake")
)

func TestCompositeStore_GetChunks_MaxChunksPerQuery(t *testing.T) {
cs := []CompositeStore{
{
limits: mockStoreLimits{10},
stores: []compositeStoreEntry{
{
model.TimeFromUnix(10),
mockStoreGetChunks{mockStoreGetChunkFetcher{mockStore(0), &fetcher.Fetcher{}}, 5},
},
},
},
{
limits: mockStoreLimits{10},
stores: []compositeStoreEntry{
{
model.TimeFromUnix(10),
mockStoreGetChunks{mockStoreGetChunkFetcher{mockStore(2), &fetcher.Fetcher{}}, 100},
},
},
},
}

for _, tc := range []struct {
name string
tmf model.Time
tmt model.Time
cs CompositeStore
expectedChunkLength int
}{
{
name: "store with 5 chunks",
tmf: model.TimeFromUnix(10),
tmt: model.TimeFromUnix(20),
cs: cs[0],
expectedChunkLength: 5,
},
{
name: "store with 100 chunks",
tmf: model.TimeFromUnix(10),
tmt: model.TimeFromUnix(20),
cs: cs[1],
expectedChunkLength: 0,
},
} {
t.Run(tc.name, func(t *testing.T) {
actualChunks, _, _ := tc.cs.GetChunks(ctx, "fake", tc.tmf, tc.tmt)
require.Equal(t, tc.expectedChunkLength, len(actualChunks))
})
}
}

type mockStoreVolume struct {
mockStore
value *logproto.VolumeResponse
Expand Down
103 changes: 0 additions & 103 deletions pkg/util/limiter/query_limiter.go

This file was deleted.