Skip to content

Commit 5625464

Browse files
fix: Fix missing series in monolithic mode (#17067)
In monolithic mode, unlike other API endpoints (query/labels/...), the series endpoint did not retrieve data from storage for the time window between end of `index_cache_validity` and `query_store_max_look_back_period`, and therefore missing series in the response.
1 parent 940a067 commit 5625464

File tree

3 files changed

+211
-2
lines changed

3 files changed

+211
-2
lines changed

‎pkg/ingester/flush_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -509,6 +509,10 @@ func (s *testStore) Volume(_ context.Context, _ string, _, _ model.Time, _ int32
509509
return &logproto.VolumeResponse{}, nil
510510
}
511511

512+
func (s *testStore) Series(_ context.Context, _ logql.SelectLogParams) ([]logproto.SeriesIdentifier, error) {
513+
return nil, nil
514+
}
515+
512516
func pushTestSamples(t *testing.T, ing logproto.PusherServer) map[string][]logproto.Stream {
513517
userIDs := []string{"1", "2", "3"}
514518

‎pkg/ingester/ingester.go

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1356,7 +1356,48 @@ func (i *Ingester) series(ctx context.Context, req *logproto.SeriesRequest) (*lo
13561356
if err != nil {
13571357
return nil, err
13581358
}
1359-
return instance.Series(ctx, req)
1359+
1360+
resp, err := instance.Series(ctx, req)
1361+
if err != nil {
1362+
return nil, err
1363+
}
1364+
1365+
if start, end, ok := buildStoreRequest(i.cfg, req.Start, req.End, time.Now()); ok {
1366+
var storeSeries []logproto.SeriesIdentifier
1367+
var parsed syntax.Expr
1368+
1369+
groups := []string{""}
1370+
if len(req.Groups) != 0 {
1371+
groups = req.Groups
1372+
}
1373+
1374+
for _, group := range groups {
1375+
if group != "" {
1376+
parsed, err = syntax.ParseExpr(group)
1377+
if err != nil {
1378+
return nil, err
1379+
}
1380+
}
1381+
storeSeries, err = i.store.SelectSeries(ctx, logql.SelectLogParams{
1382+
QueryRequest: &logproto.QueryRequest{
1383+
Selector: group,
1384+
Limit: 1,
1385+
Start: start,
1386+
End: end,
1387+
Direction: logproto.FORWARD,
1388+
Shards: req.Shards,
1389+
Plan: &plan.QueryPlan{
1390+
AST: parsed,
1391+
},
1392+
},
1393+
})
1394+
if err != nil {
1395+
return nil, err
1396+
}
1397+
resp.Series = append(resp.Series, storeSeries...)
1398+
}
1399+
}
1400+
return resp, nil
13601401
}
13611402

13621403
func (i *Ingester) GetStats(ctx context.Context, req *logproto.IndexStatsRequest) (*logproto.IndexStatsResponse, error) {

‎pkg/ingester/ingester_test.go

Lines changed: 165 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ import (
5151
"github.com/grafana/loki/v3/pkg/storage/stores/index/stats"
5252
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb/sharding"
5353
"github.com/grafana/loki/v3/pkg/util/constants"
54+
"github.com/grafana/loki/v3/pkg/util/marshal"
5455
"github.com/grafana/loki/v3/pkg/validation"
5556
)
5657

@@ -458,7 +459,14 @@ func (s *mockStore) SelectSamples(_ context.Context, _ logql.SelectSampleParams)
458459
return nil, nil
459460
}
460461

461-
func (s *mockStore) SelectSeries(_ context.Context, _ logql.SelectLogParams) ([]logproto.SeriesIdentifier, error) {
462+
func (s *mockStore) SelectSeries(_ context.Context, req logql.SelectLogParams) ([]logproto.SeriesIdentifier, error) {
463+
// NOTE: If the time range includes one hour before the current time, the return value is returned.
464+
thresTime := time.Now().Add(-1 * time.Hour)
465+
if !thresTime.Before(req.Start) && !thresTime.After(req.End) {
466+
return []logproto.SeriesIdentifier{
467+
{Labels: mustParseLabels(`{a="11",c="33"}`)},
468+
}, nil
469+
}
462470
return nil, nil
463471
}
464472

@@ -1390,6 +1398,141 @@ func TestVolume(t *testing.T) {
13901398
})
13911399
}
13921400

1401+
func Test_Series(t *testing.T) {
1402+
ingesterConfig := defaultIngesterTestConfig(t)
1403+
limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
1404+
require.NoError(t, err)
1405+
1406+
store := &mockStore{
1407+
chunks: map[string][]chunk.Chunk{},
1408+
}
1409+
1410+
mockRing := mockReadRingWithOneActiveIngester()
1411+
1412+
i, err := New(ingesterConfig, client.Config{}, store, limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, log.NewNopLogger(), nil, mockRing, nil)
1413+
require.NoError(t, err)
1414+
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
1415+
1416+
ctx := user.InjectOrgID(context.Background(), "test")
1417+
t.Run("only in-memory series", func(t *testing.T) {
1418+
i.cfg.QueryStore = false
1419+
req := logproto.PushRequest{
1420+
Streams: []logproto.Stream{
1421+
{
1422+
Labels: `{a="11",b="22"}`,
1423+
},
1424+
{
1425+
Labels: `{a="11",c="33"}`,
1426+
},
1427+
},
1428+
}
1429+
for i := 0; i < 10; i++ {
1430+
req.Streams[0].Entries = append(req.Streams[0].Entries, logproto.Entry{
1431+
Timestamp: time.Unix(0, 0),
1432+
Line: fmt.Sprintf("line %d", i),
1433+
})
1434+
req.Streams[1].Entries = append(req.Streams[1].Entries, logproto.Entry{
1435+
Timestamp: time.Unix(0, 0),
1436+
Line: fmt.Sprintf("line %d", i),
1437+
})
1438+
}
1439+
_, err = i.Push(ctx, &req)
1440+
require.NoError(t, err)
1441+
1442+
res, err := i.Series(ctx, &logproto.SeriesRequest{
1443+
Start: time.Unix(0, 0),
1444+
End: time.Unix(1, 0),
1445+
Groups: []string{`{a="11"}`},
1446+
})
1447+
1448+
expectedSeries := []logproto.SeriesIdentifier{
1449+
{Labels: mustParseLabels(`{a="11", b="22"}`)},
1450+
{Labels: mustParseLabels(`{a="11", c="33"}`)},
1451+
}
1452+
// ignore order
1453+
sortLabels(res.Series)
1454+
sortLabels(expectedSeries)
1455+
1456+
require.NoError(t, err)
1457+
require.ElementsMatch(t, expectedSeries, res.Series)
1458+
})
1459+
1460+
t.Run("in-memory and storage (If data exists in storage)", func(t *testing.T) {
1461+
i.cfg.QueryStore = true
1462+
req := logproto.PushRequest{
1463+
Streams: []logproto.Stream{
1464+
{
1465+
Labels: `{a="11",b="22"}`,
1466+
},
1467+
},
1468+
}
1469+
for i := 0; i < 10; i++ {
1470+
req.Streams[0].Entries = append(req.Streams[0].Entries, logproto.Entry{
1471+
Timestamp: time.Now(),
1472+
Line: fmt.Sprintf("line %d", i),
1473+
},
1474+
)
1475+
}
1476+
1477+
_, err = i.Push(ctx, &req)
1478+
require.NoError(t, err)
1479+
1480+
res, err := i.Series(ctx, &logproto.SeriesRequest{
1481+
Start: time.Now().Add(-20 * time.Hour),
1482+
End: time.Now(),
1483+
Groups: []string{`{a="11"}`},
1484+
})
1485+
1486+
expectedSeries := []logproto.SeriesIdentifier{
1487+
{Labels: mustParseLabels(`{a="11", b="22"}`)},
1488+
{Labels: mustParseLabels(`{a="11", c="33"}`)},
1489+
}
1490+
// ignore order
1491+
sortLabels(res.Series)
1492+
sortLabels(expectedSeries)
1493+
1494+
require.NoError(t, err)
1495+
require.ElementsMatch(t, expectedSeries, res.Series)
1496+
})
1497+
1498+
t.Run("in-memory and storage (If no data exists in storage)", func(t *testing.T) {
1499+
i.cfg.QueryStore = true
1500+
req := logproto.PushRequest{
1501+
Streams: []logproto.Stream{
1502+
{
1503+
Labels: `{a="11",b="22"}`,
1504+
},
1505+
},
1506+
}
1507+
for i := 0; i < 10; i++ {
1508+
req.Streams[0].Entries = append(req.Streams[0].Entries, logproto.Entry{
1509+
Timestamp: time.Now(),
1510+
Line: fmt.Sprintf("line %d", i),
1511+
},
1512+
)
1513+
}
1514+
1515+
_, err = i.Push(ctx, &req)
1516+
require.NoError(t, err)
1517+
1518+
res, err := i.Series(ctx, &logproto.SeriesRequest{
1519+
Start: time.Now().Add(-30 * time.Minute),
1520+
End: time.Now(),
1521+
Groups: []string{`{a="11"}`},
1522+
})
1523+
1524+
expectedSeries := []logproto.SeriesIdentifier{
1525+
{Labels: mustParseLabels(`{a="11", b="22"}`)},
1526+
}
1527+
// ignore order
1528+
sortLabels(res.Series)
1529+
sortLabels(expectedSeries)
1530+
1531+
require.NoError(t, err)
1532+
require.ElementsMatch(t, expectedSeries, res.Series)
1533+
})
1534+
}
1535+
13931536
type ingesterClient struct {
13941537
logproto.PusherClient
13951538
logproto.QuerierClient
@@ -1662,3 +1805,24 @@ func TestUpdateOwnedStreams(t *testing.T) {
16621805
ownedStreams := i.instances["test"].ownedStreamsSvc.getOwnedStreamCount()
16631806
require.Equal(t, 8, ownedStreams)
16641807
}
1808+
1809+
func mustParseLabels(s string) []logproto.SeriesIdentifier_LabelsEntry {
1810+
l, err := marshal.NewLabelSet(s)
1811+
if err != nil {
1812+
panic(fmt.Sprintf("Failed to parse %s", s))
1813+
}
1814+
1815+
result := make([]logproto.SeriesIdentifier_LabelsEntry, 0, len(l))
1816+
for k, v := range l {
1817+
result = append(result, logproto.SeriesIdentifier_LabelsEntry{Key: k, Value: v})
1818+
}
1819+
return result
1820+
}
1821+
1822+
func sortLabels(series []logproto.SeriesIdentifier) {
1823+
for i := range series {
1824+
sort.SliceStable(series[i].Labels, func(j, k int) bool {
1825+
return series[i].Labels[j].Key < series[i].Labels[k].Key
1826+
})
1827+
}
1828+
}

0 commit comments

Comments
 (0)