Skip to content

[release-2.8.x] caching: do not try to fill the gap in log results cache when the new query interval does not overlap the cached query interval #9783

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: release-2.8.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,10 @@
##### Fixes

* [9629](https://github.com/grafana/loki/pull/9629) **periklis**: Fix duplicate label values from ingester streams.
* [9757](https://github.com/grafana/loki/pull/9757) **sandeepsukhani**: Frontend Caching: Fix a bug in negative logs results cache causing Loki to unexpectedly send empty/incorrect results.
* [9754](https://github.com/grafana/loki/pull/9754) **ashwanthgoli**: Fixes an issue with indexes becoming unqueryable if the index prefix is different from the one configured in the latest period config.
* [9763](https://github.com/grafana/loki/pull/9763) **ssncferreira**: Fix the logic of the `offset` operator for downstream queries on instant query splitting of (range) vector aggregation expressions containing an offset.
* [9773](https://github.com/grafana/loki/pull/9773) **ssncferreira**: Fix instant query summary statistic's `splits` corresponding to the number of subqueries a query is split into based on `split_queries_by_interval`.

##### Changes

Expand Down
154 changes: 90 additions & 64 deletions pkg/querier/queryrange/log_result_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,77 +185,95 @@ func (l *logResultCache) handleHit(ctx context.Context, cacheKey string, cachedR
if cachedRequest.StartTs.UnixNano() <= lokiReq.StartTs.UnixNano() && cachedRequest.EndTs.UnixNano() >= lokiReq.EndTs.UnixNano() {
return result, nil
}
// we could be missing data at the start and the end.
// so we're going to fetch what is missing.
var (
startRequest, endRequest *LokiRequest
startResp, endResp *LokiResponse
updateCache bool
ok bool
)
g, ctx := errgroup.WithContext(ctx)

// if we're missing data at the start, start fetching from the start to the cached start.
if lokiReq.GetStartTs().Before(cachedRequest.GetStartTs()) {
g.Go(func() error {
startRequest = lokiReq.WithStartEndTime(lokiReq.GetStartTs(), cachedRequest.GetStartTs())
resp, err := l.next.Do(ctx, startRequest)
if err != nil {
return err
}
startResp, ok = resp.(*LokiResponse)
if !ok {
return fmt.Errorf("unexpected response type %T", resp)
}
return nil
})
}

// if we're missing data at the end, start fetching from the cached end to the end.
if lokiReq.GetEndTs().After(cachedRequest.GetEndTs()) {
g.Go(func() error {
endRequest = lokiReq.WithStartEndTime(cachedRequest.GetEndTs(), lokiReq.GetEndTs())
resp, err := l.next.Do(ctx, endRequest)
if err != nil {
return err
}
endResp, ok = resp.(*LokiResponse)
if !ok {
return fmt.Errorf("unexpected response type %T", resp)
}
return nil
})
}

if err := g.Wait(); err != nil {
return nil, err
}
updateCache := false
// if the query does not overlap cached interval, do not try to fill the gap since it requires extending the queries beyond what is requested in the query.
// Extending the queries beyond what is requested could result in empty responses due to response limit set in the queries.
if !overlap(lokiReq.StartTs, lokiReq.EndTs, cachedRequest.StartTs, cachedRequest.EndTs) {
resp, err := l.next.Do(ctx, lokiReq)
if err != nil {
return nil, err
}
result = resp.(*LokiResponse)

// if we have data at the start, we need to merge it with the cached data if it's empty and update the cache.
// If it's not empty only merge the response.
if startResp != nil {
if isEmpty(startResp) {
cachedRequest = cachedRequest.WithStartEndTime(startRequest.GetStartTs(), cachedRequest.GetEndTs())
// if the response is empty and the query is larger than what is cached, update the cache
if isEmpty(result) && (lokiReq.EndTs.UnixNano()-lokiReq.StartTs.UnixNano() > cachedRequest.EndTs.UnixNano()-cachedRequest.StartTs.UnixNano()) {
cachedRequest = cachedRequest.WithStartEndTime(lokiReq.GetStartTs(), lokiReq.GetEndTs())
updateCache = true
} else {
if startResp.Status != loghttp.QueryStatusSuccess {
return startResp, nil
}
} else {
// we could be missing data at the start and the end.
// so we're going to fetch what is missing.
var (
startRequest, endRequest *LokiRequest
startResp, endResp *LokiResponse
)
g, ctx := errgroup.WithContext(ctx)

// if we're missing data at the start, start fetching from the start to the cached start.
if lokiReq.GetStartTs().Before(cachedRequest.GetStartTs()) {
g.Go(func() error {
startRequest = lokiReq.WithStartEndTime(lokiReq.GetStartTs(), cachedRequest.GetStartTs())
resp, err := l.next.Do(ctx, startRequest)
if err != nil {
return err
}
var ok bool
startResp, ok = resp.(*LokiResponse)
if !ok {
return fmt.Errorf("unexpected response type %T", resp)
}
return nil
})
}

// if we're missing data at the end, start fetching from the cached end to the end.
if lokiReq.GetEndTs().After(cachedRequest.GetEndTs()) {
g.Go(func() error {
endRequest = lokiReq.WithStartEndTime(cachedRequest.GetEndTs(), lokiReq.GetEndTs())
resp, err := l.next.Do(ctx, endRequest)
if err != nil {
return err
}
var ok bool
endResp, ok = resp.(*LokiResponse)
if !ok {
return fmt.Errorf("unexpected response type %T", resp)
}
return nil
})
}

if err := g.Wait(); err != nil {
return nil, err
}

// if we have data at the start, we need to merge it with the cached data if it's empty and update the cache.
// If it's not empty only merge the response.
if startResp != nil {
if isEmpty(startResp) {
cachedRequest = cachedRequest.WithStartEndTime(startRequest.GetStartTs(), cachedRequest.GetEndTs())
updateCache = true
} else {
if startResp.Status != loghttp.QueryStatusSuccess {
return startResp, nil
}
result = mergeLokiResponse(startResp, result)
}
result = mergeLokiResponse(extractLokiResponse(lokiReq.GetStartTs(), lokiReq.GetEndTs(), startResp), result)
}
}

// if we have data at the end, we need to merge it with the cached data if it's empty and update the cache.
// If it's not empty only merge the response.
if endResp != nil {
if isEmpty(endResp) {
cachedRequest = cachedRequest.WithStartEndTime(cachedRequest.GetStartTs(), endRequest.GetEndTs())
updateCache = true
} else {
if endResp.Status != loghttp.QueryStatusSuccess {
return endResp, nil
// if we have data at the end, we need to merge it with the cached data if it's empty and update the cache.
// If it's not empty only merge the response.
if endResp != nil {
if isEmpty(endResp) {
cachedRequest = cachedRequest.WithStartEndTime(cachedRequest.GetStartTs(), endRequest.GetEndTs())
updateCache = true
} else {
if endResp.Status != loghttp.QueryStatusSuccess {
return endResp, nil
}
result = mergeLokiResponse(endResp, result)
}
result = mergeLokiResponse(extractLokiResponse(lokiReq.GetStartTs(), lokiReq.GetEndTs(), endResp), result)
}
}

Expand Down Expand Up @@ -331,3 +349,11 @@ func emptyResponse(lokiReq *LokiRequest) *LokiResponse {
},
}
}

func overlap(aFrom, aThrough, bFrom, bThrough time.Time) bool {
if aFrom.After(bThrough) || bFrom.After(aThrough) {
return false
}

return true
}
104 changes: 79 additions & 25 deletions pkg/querier/queryrange/log_result_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"time"

"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/weaveworks/common/user"
Expand Down Expand Up @@ -434,38 +436,54 @@ func Test_LogResultCacheDifferentRangeNonEmptyAndEmpty(t *testing.T) {
fake.AssertExpectations(t)
}

func Test_LogResultFillingGap(t *testing.T) {
// Test_LogResultNonOverlappingCache tests the scenario where the cached query does not overlap with the new request
func Test_LogResultNonOverlappingCache(t *testing.T) {
metrics := NewLogResultCacheMetrics(prometheus.NewPedanticRegistry())
mockCache := cache.NewMockCache()
var (
ctx = user.InjectOrgID(context.Background(), "foo")
lrc = NewLogResultCache(
log.NewNopLogger(),
fakeLimits{
splits: map[string]time.Duration{"foo": time.Minute},
},
cache.NewMockCache(),
nil,
mockCache,
nil,
nil,
metrics,
)
)

checkCacheMetrics := func(expectedHits, expectedMisses int) {
require.Equal(t, float64(expectedHits), testutil.ToFloat64(metrics.CacheHit))
require.Equal(t, float64(expectedMisses), testutil.ToFloat64(metrics.CacheMiss))
}

// data requested for just 1 sec, resulting in empty response
req1 := &LokiRequest{
StartTs: time.Unix(0, time.Minute.Nanoseconds()+30*time.Second.Nanoseconds()),
EndTs: time.Unix(0, time.Minute.Nanoseconds()+31*time.Second.Nanoseconds()),
Limit: entriesLimit,
}

// data requested for just 1 sec, within the same split but couple seconds apart
// data requested for just 1 sec(non-overlapping), resulting in empty response
req2 := &LokiRequest{
StartTs: time.Unix(0, time.Minute.Nanoseconds()+35*time.Second.Nanoseconds()),
EndTs: time.Unix(0, time.Minute.Nanoseconds()+36*time.Second.Nanoseconds()),
StartTs: time.Unix(0, time.Minute.Nanoseconds()+24*time.Second.Nanoseconds()),
EndTs: time.Unix(0, time.Minute.Nanoseconds()+25*time.Second.Nanoseconds()),
Limit: entriesLimit,
}

// data requested for larger interval than req1(overlapping with req2), returns empty response
req3 := &LokiRequest{
StartTs: time.Unix(0, time.Minute.Nanoseconds()+25*time.Second.Nanoseconds()),
EndTs: time.Unix(0, time.Minute.Nanoseconds()+26*time.Second.Nanoseconds()),
StartTs: time.Unix(0, time.Minute.Nanoseconds()+24*time.Second.Nanoseconds()),
EndTs: time.Unix(0, time.Minute.Nanoseconds()+29*time.Second.Nanoseconds()),
Limit: entriesLimit,
}

// data requested for larger interval than req3(non-overlapping), returns non-empty response
req4 := &LokiRequest{
StartTs: time.Unix(0, time.Minute.Nanoseconds()+10*time.Second.Nanoseconds()),
EndTs: time.Unix(0, time.Minute.Nanoseconds()+20*time.Second.Nanoseconds()),
Limit: entriesLimit,
}

Expand All @@ -476,34 +494,49 @@ func Test_LogResultFillingGap(t *testing.T) {
Response: emptyResponse(req1),
},
},
// partial request being made for missing interval at the end
// req2 should do query for just its query range and should not update the cache
{
RequestResponse: queryrangebase.RequestResponse{
Request: &LokiRequest{
StartTs: time.Unix(0, time.Minute.Nanoseconds()+31*time.Second.Nanoseconds()),
EndTs: time.Unix(0, time.Minute.Nanoseconds()+36*time.Second.Nanoseconds()),
StartTs: time.Unix(0, time.Minute.Nanoseconds()+24*time.Second.Nanoseconds()),
EndTs: time.Unix(0, time.Minute.Nanoseconds()+25*time.Second.Nanoseconds()),
Limit: entriesLimit,
},
Response: nonEmptyResponse(&LokiRequest{
StartTs: time.Unix(0, time.Minute.Nanoseconds()+31*time.Second.Nanoseconds()),
EndTs: time.Unix(0, time.Minute.Nanoseconds()+36*time.Second.Nanoseconds()),
Response: emptyResponse(&LokiRequest{
StartTs: time.Unix(0, time.Minute.Nanoseconds()+24*time.Second.Nanoseconds()),
EndTs: time.Unix(0, time.Minute.Nanoseconds()+25*time.Second.Nanoseconds()),
Limit: entriesLimit,
}, time.Unix(31, 0), time.Unix(34, 0), lblFooBar), // data not present for actual query interval i.e req2
}),
},
},
// partial request being made for missing interval at the beginning
// req3 should do query for just its query range and should update the cache
{
RequestResponse: queryrangebase.RequestResponse{
Request: &LokiRequest{
StartTs: time.Unix(0, time.Minute.Nanoseconds()+25*time.Second.Nanoseconds()),
EndTs: time.Unix(0, time.Minute.Nanoseconds()+30*time.Second.Nanoseconds()),
StartTs: time.Unix(0, time.Minute.Nanoseconds()+24*time.Second.Nanoseconds()),
EndTs: time.Unix(0, time.Minute.Nanoseconds()+29*time.Second.Nanoseconds()),
Limit: entriesLimit,
},
Response: emptyResponse(&LokiRequest{
StartTs: time.Unix(0, time.Minute.Nanoseconds()+24*time.Second.Nanoseconds()),
EndTs: time.Unix(0, time.Minute.Nanoseconds()+29*time.Second.Nanoseconds()),
Limit: entriesLimit,
}),
},
},
// req4 should do query for its query range. Data would be non-empty so cache should not be updated
{
RequestResponse: queryrangebase.RequestResponse{
Request: &LokiRequest{
StartTs: time.Unix(0, time.Minute.Nanoseconds()+10*time.Second.Nanoseconds()),
EndTs: time.Unix(0, time.Minute.Nanoseconds()+20*time.Second.Nanoseconds()),
Limit: entriesLimit,
},
Response: nonEmptyResponse(&LokiRequest{
StartTs: time.Unix(0, time.Minute.Nanoseconds()+25*time.Second.Nanoseconds()),
EndTs: time.Unix(0, time.Minute.Nanoseconds()+30*time.Second.Nanoseconds()),
StartTs: time.Unix(0, time.Minute.Nanoseconds()+10*time.Second.Nanoseconds()),
EndTs: time.Unix(0, time.Minute.Nanoseconds()+20*time.Second.Nanoseconds()),
Limit: entriesLimit,
}, time.Unix(27, 0), time.Unix(29, 0), lblFooBar), // data not present for actual query interval i.e req3
}, time.Unix(71, 0), time.Unix(79, 0), lblFooBar),
},
},
})
Expand All @@ -513,16 +546,37 @@ func Test_LogResultFillingGap(t *testing.T) {
resp, err := h.Do(ctx, req1)
require.NoError(t, err)
require.Equal(t, emptyResponse(req1), resp)
checkCacheMetrics(0, 1)
require.Equal(t, 1, mockCache.NumKeyUpdates())

// although the caching code would request for more data than the actual query, we should have empty response here since we
// do not have any data for the query we made
// req2 should not update the cache since it has same length as previously cached query
resp, err = h.Do(ctx, req2)
require.NoError(t, err)
require.Equal(t, mergeLokiResponse(emptyResponse(req1), emptyResponse(req2)), resp)
require.Equal(t, emptyResponse(req2), resp)
checkCacheMetrics(1, 1)
require.Equal(t, 1, mockCache.NumKeyUpdates())

// req3 should update the cache since it has larger length than previously cached query
resp, err = h.Do(ctx, req3)
require.NoError(t, err)
require.Equal(t, mergeLokiResponse(emptyResponse(req1), emptyResponse(req3)), resp)
require.Equal(t, emptyResponse(req3), resp)
checkCacheMetrics(2, 1)
require.Equal(t, 2, mockCache.NumKeyUpdates())

// req4 returns non-empty response so it should not update the cache
resp, err = h.Do(ctx, req4)
require.NoError(t, err)
require.Equal(t, nonEmptyResponse(req4, time.Unix(71, 0), time.Unix(79, 0), lblFooBar), resp)
checkCacheMetrics(3, 1)
require.Equal(t, 2, mockCache.NumKeyUpdates())

// req2 should return back empty response from the cache, without updating the cache
resp, err = h.Do(ctx, req2)
require.NoError(t, err)
require.Equal(t, emptyResponse(req2), resp)
checkCacheMetrics(4, 1)
require.Equal(t, 2, mockCache.NumKeyUpdates())

fake.AssertExpectations(t)
}

Expand Down
13 changes: 12 additions & 1 deletion pkg/storage/chunk/cache/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,13 @@ import (
"github.com/grafana/loki/pkg/logqlmodel/stats"
)

// MockCache is a Cache for tests that additionally exposes how many key
// writes have been performed, so tests can assert whether an operation
// updated the cache.
type MockCache interface {
	Cache
	NumKeyUpdates() int
}

// mockCache is an in-memory Cache implementation backed by a plain map,
// intended for tests only.
type mockCache struct {
	// numKeyUpdates counts every key write performed by Store.
	numKeyUpdates int
	// Mutex guards cache and numKeyUpdates during Store.
	sync.Mutex
	cache map[string][]byte
}
Expand All @@ -17,6 +23,7 @@ func (m *mockCache) Store(_ context.Context, keys []string, bufs [][]byte) error
defer m.Unlock()
for i := range keys {
m.cache[keys[i]] = bufs[i]
m.numKeyUpdates++
}
return nil
}
Expand All @@ -43,8 +50,12 @@ func (m *mockCache) GetCacheType() stats.CacheType {
return "mock"
}

// NumKeyUpdates returns the total number of key writes performed via Store.
// It acquires the embedded mutex: Store increments numKeyUpdates while
// holding the same lock, so an unguarded read here is a data race under
// concurrent use (flagged by `go test -race`).
func (m *mockCache) NumKeyUpdates() int {
	m.Lock()
	defer m.Unlock()
	return m.numKeyUpdates
}

// NewMockCache makes a new MockCache.
func NewMockCache() Cache {
func NewMockCache() MockCache {
return &mockCache{
cache: map[string][]byte{},
}
Expand Down