Skip to content

Commit 6c33809

Browse files
feat: split detected fields queries (#12491)
1 parent a8b172b commit 6c33809

File tree

16 files changed

+781
-191
lines changed

16 files changed

+781
-191
lines changed

‎cmd/loki/loki-local-config.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,9 @@ schema_config:
3636
ruler:
3737
alertmanager_url: http://localhost:9093
3838

39+
frontend:
40+
encoding: protobuf
41+
3942
# By default, Loki will send anonymous, but uniquely-identifiable usage and configuration
4043
# analytics to Grafana Labs. These statistics are sent to https://stats.grafana.org/
4144
#

‎pkg/ingester/ingester.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1365,7 +1365,7 @@ func adjustQueryStartTime(maxLookBackPeriod time.Duration, start, now time.Time)
13651365
return start
13661366
}
13671367

1368-
func (i *Ingester) GetDetectedFields(_ context.Context, _ *logproto.DetectedFieldsRequest) (*logproto.DetectedFieldsResponse, error) {
1368+
func (i *Ingester) GetDetectedFields(_ context.Context, r *logproto.DetectedFieldsRequest) (*logproto.DetectedFieldsResponse, error) {
13691369
return &logproto.DetectedFieldsResponse{
13701370
Fields: []*logproto.DetectedField{
13711371
{
@@ -1374,6 +1374,7 @@ func (i *Ingester) GetDetectedFields(_ context.Context, _ *logproto.DetectedFiel
13741374
Cardinality: 1,
13751375
},
13761376
},
1377+
FieldLimit: r.GetFieldLimit(),
13771378
}, nil
13781379
}
13791380

‎pkg/logproto/logproto.pb.go

Lines changed: 266 additions & 165 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

‎pkg/logproto/logproto.proto

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -452,12 +452,16 @@ message DetectedFieldsRequest {
452452

453453
message DetectedFieldsResponse {
454454
repeated DetectedField fields = 1;
455+
uint32 fieldLimit = 2;
455456
}
456457

458+
// TODO: make the detected field include the serialized sketch
459+
// we only want cardinality in the JSON response
457460
message DetectedField {
458461
string label = 1;
459462
string type = 2 [(gogoproto.casttype) = "DetectedFieldType"];
460463
uint64 cardinality = 3;
464+
bytes sketch = 4 [(gogoproto.jsontag) = "-"]; //serialized hyperloglog sketch
461465
}
462466

463467
message DetectedLabelsRequest {

‎pkg/loki/modules.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1085,7 +1085,6 @@ func (t *Loki) initQueryFrontend() (_ services.Service, err error) {
10851085
t.Server.HTTP.Path("/loki/api/v1/labels").Methods("GET", "POST").Handler(frontendHandler)
10861086
t.Server.HTTP.Path("/loki/api/v1/label/{name}/values").Methods("GET", "POST").Handler(frontendHandler)
10871087
t.Server.HTTP.Path("/loki/api/v1/series").Methods("GET", "POST").Handler(frontendHandler)
1088-
t.Server.HTTP.Path("/loki/api/v1/detected_fields").Methods("GET", "POST").Handler(frontendHandler)
10891088
t.Server.HTTP.Path("/loki/api/v1/patterns").Methods("GET", "POST").Handler(frontendHandler)
10901089
t.Server.HTTP.Path("/loki/api/v1/detected_labels").Methods("GET", "POST").Handler(frontendHandler)
10911090
t.Server.HTTP.Path("/loki/api/v1/index/stats").Methods("GET", "POST").Handler(frontendHandler)
@@ -1105,6 +1104,12 @@ func (t *Loki) initQueryFrontend() (_ services.Service, err error) {
11051104
t.Server.HTTP.Path("/api/prom/tail").Methods("GET", "POST").Handler(defaultHandler)
11061105
}
11071106

1107+
// We don't marshal the hyperloglog sketch in the detected fields response to JSON, so this endpoint
1108+
// only works correctly in V2 with protobuf encoding enabled.
1109+
if frontendV2 != nil && frontendV2.IsProtobufEncoded() {
1110+
t.Server.HTTP.Path("/loki/api/v1/detected_fields").Methods("GET", "POST").Handler(frontendHandler)
1111+
}
1112+
11081113
if t.frontend == nil {
11091114
return services.NewIdleService(nil, func(_ error) error {
11101115
if t.stopper != nil {

‎pkg/lokifrontend/frontend/v2/frontend.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -446,6 +446,14 @@ func (f *Frontend) CheckReady(_ context.Context) error {
446446
return errors.New(msg)
447447
}
448448

449+
func (f *Frontend) IsProtobufEncoded() bool {
450+
return f.cfg.Encoding == EncodingProtobuf
451+
}
452+
453+
func (f *Frontend) IsJSONEncoded() bool {
454+
return f.cfg.Encoding == EncodingJSON
455+
}
456+
449457
const stripeSize = 1 << 6
450458

451459
type requestsInProgress struct {

‎pkg/querier/http.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -386,7 +386,8 @@ func (q *QuerierAPI) DetectedFieldsHandler(ctx context.Context, req *logproto.De
386386
"msg", "queried store for detected fields that does not support it, no response from querier.DetectedFields",
387387
)
388388
return &logproto.DetectedFieldsResponse{
389-
Fields: []*logproto.DetectedField{},
389+
Fields: []*logproto.DetectedField{},
390+
FieldLimit: req.GetFieldLimit(),
390391
}, nil
391392
}
392393
return resp, nil

‎pkg/querier/multi_tenant_querier.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -278,7 +278,8 @@ func (q *MultiTenantQuerier) DetectedFields(ctx context.Context, req *logproto.D
278278
)
279279

280280
return &logproto.DetectedFieldsResponse{
281-
Fields: []*logproto.DetectedField{},
281+
Fields: []*logproto.DetectedField{},
282+
FieldLimit: req.GetFieldLimit(),
282283
}, nil
283284
}
284285

‎pkg/querier/querier.go

Lines changed: 25 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1018,27 +1018,39 @@ func (q *SingleTenantQuerier) DetectedFields(ctx context.Context, req *logproto.
10181018

10191019
// TODO(twhitney): converting from a step to a duration should be abstracted and reused,
10201020
// doing this in a few places now.
1021-
streams, err := streamsForFieldDetection(iters, req.LineLimit, time.Duration(req.Step*1e6))
1021+
streams, err := streamsForFieldDetection(iters, req.LineLimit, time.Duration(req.Step))
10221022
if err != nil {
10231023
return nil, err
10241024
}
10251025

10261026
detectedFields := parseDetectedFields(ctx, req.FieldLimit, streams)
10271027

1028+
//TODO: detected field needs to contain the sketch
1029+
// make sure response to frontend is GRPC
1030+
//only want cardinality in JSON
10281031
fields := make([]*logproto.DetectedField, len(detectedFields))
10291032
fieldCount := 0
10301033
for k, v := range detectedFields {
1034+
sketch, err := v.sketch.MarshalBinary()
1035+
if err != nil {
1036+
level.Warn(q.logger).Log("msg", "failed to marshal hyperloglog sketch", "err", err)
1037+
continue
1038+
}
1039+
10311040
fields[fieldCount] = &logproto.DetectedField{
10321041
Label: k,
10331042
Type: v.fieldType,
10341043
Cardinality: v.Estimate(),
1044+
Sketch: sketch,
10351045
}
10361046

10371047
fieldCount++
10381048
}
10391049

1050+
//TODO: detected fields response needs to include the sketch
10401051
return &logproto.DetectedFieldsResponse{
1041-
Fields: fields,
1052+
Fields: fields,
1053+
FieldLimit: req.GetFieldLimit(),
10421054
}, nil
10431055
}
10441056

@@ -1064,6 +1076,10 @@ func (p *parsedFields) Estimate() uint64 {
10641076
return p.sketch.Estimate()
10651077
}
10661078

1079+
func (p *parsedFields) Marshal() ([]byte, error) {
1080+
return p.sketch.MarshalBinary()
1081+
}
1082+
10671083
func (p *parsedFields) DetermineType(value string) {
10681084
p.fieldType = determineType(value)
10691085
p.isTypeDetected = true
@@ -1098,20 +1114,22 @@ func parseDetectedFields(ctx context.Context, limit uint32, streams logqlmodel.S
10981114
fieldCount := uint32(0)
10991115

11001116
for _, stream := range streams {
1101-
11021117
level.Debug(spanlogger.FromContext(ctx)).Log(
11031118
"detected_fields", "true",
11041119
"msg", fmt.Sprintf("looking for detected fields in stream %d with %d lines", stream.Hash, len(stream.Entries)))
11051120

11061121
for _, entry := range stream.Entries {
11071122
detected := parseLine(entry.Line)
11081123
for k, vals := range detected {
1109-
if fieldCount >= limit {
1110-
return detectedFields
1124+
df, ok := detectedFields[k]
1125+
if !ok && fieldCount < limit {
1126+
df = newParsedFields()
1127+
detectedFields[k] = df
1128+
fieldCount++
11111129
}
11121130

1113-
if _, ok := detectedFields[k]; !ok {
1114-
detectedFields[k] = newParsedFields()
1131+
if df == nil {
1132+
continue
11151133
}
11161134

11171135
for _, v := range vals {
@@ -1126,8 +1144,6 @@ func parseDetectedFields(ctx context.Context, limit uint32, streams logqlmodel.S
11261144
level.Debug(spanlogger.FromContext(ctx)).Log(
11271145
"detected_fields", "true",
11281146
"msg", fmt.Sprintf("detected field %s with %d values", k, len(vals)))
1129-
1130-
fieldCount++
11311147
}
11321148
}
11331149
}

‎pkg/querier/queryrange/codec.go

Lines changed: 97 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"golang.org/x/exp/maps"
1818

1919
"github.com/grafana/loki/v3/pkg/storage/chunk/cache/resultscache"
20+
"github.com/grafana/loki/v3/pkg/storage/detected"
2021
"github.com/grafana/loki/v3/pkg/storage/stores/index/seriesvolume"
2122

2223
"github.com/grafana/dskit/httpgrpc"
@@ -972,9 +973,12 @@ func (c Codec) EncodeRequest(ctx context.Context, r queryrangebase.Request) (*ht
972973
return req.WithContext(ctx), nil
973974
case *DetectedFieldsRequest:
974975
params := url.Values{
975-
"start": []string{fmt.Sprintf("%d", request.Start.UnixNano())},
976-
"end": []string{fmt.Sprintf("%d", request.End.UnixNano())},
977-
"query": []string{request.GetQuery()},
976+
"query": []string{request.GetQuery()},
977+
"start": []string{fmt.Sprintf("%d", request.Start.UnixNano())},
978+
"end": []string{fmt.Sprintf("%d", request.End.UnixNano())},
979+
"line_limit": []string{fmt.Sprintf("%d", request.GetLineLimit())},
980+
"field_limit": []string{fmt.Sprintf("%d", request.GetFieldLimit())},
981+
"step": []string{fmt.Sprintf("%d", request.GetStep())},
978982
}
979983

980984
u := &url.URL{
@@ -1587,6 +1591,29 @@ func (Codec) MergeResponse(responses ...queryrangebase.Response) (queryrangebase
15871591
Response: seriesvolume.Merge(resps, resp0.Response.Limit),
15881592
Headers: headers,
15891593
}, nil
1594+
case *DetectedFieldsResponse:
1595+
resp0 := responses[0].(*DetectedFieldsResponse)
1596+
headers := resp0.Headers
1597+
fieldLimit := resp0.Response.GetFieldLimit()
1598+
1599+
fields := []*logproto.DetectedField{}
1600+
for _, r := range responses {
1601+
fields = append(fields, r.(*DetectedFieldsResponse).Response.Fields...)
1602+
}
1603+
1604+
mergedFields, err := detected.MergeFields(fields, fieldLimit)
1605+
1606+
if err != nil {
1607+
return nil, err
1608+
}
1609+
1610+
return &DetectedFieldsResponse{
1611+
Response: &logproto.DetectedFieldsResponse{
1612+
Fields: mergedFields,
1613+
FieldLimit: fieldLimit,
1614+
},
1615+
Headers: headers,
1616+
}, nil
15901617
default:
15911618
return nil, fmt.Errorf("unknown response type (%T) in merging responses", responses[0])
15921619
}
@@ -1781,8 +1808,12 @@ func ParamsFromRequest(req queryrangebase.Request) (logql.Params, error) {
17811808
return &paramsStatsWrapper{
17821809
IndexStatsRequest: r,
17831810
}, nil
1811+
case *DetectedFieldsRequest:
1812+
return &paramsDetectedFieldsWrapper{
1813+
DetectedFieldsRequest: r,
1814+
}, nil
17841815
default:
1785-
return nil, fmt.Errorf("expected one of the *LokiRequest, *LokiInstantRequest, *LokiSeriesRequest, *LokiLabelNamesRequest, got (%T)", r)
1816+
return nil, fmt.Errorf("expected one of the *LokiRequest, *LokiInstantRequest, *LokiSeriesRequest, *LokiLabelNamesRequest, *DetectedFieldsRequest, got (%T)", r)
17861817
}
17871818
}
17881819

@@ -1950,6 +1981,47 @@ func (p paramsStatsWrapper) Shards() []string {
19501981
return make([]string, 0)
19511982
}
19521983

1984+
type paramsDetectedFieldsWrapper struct {
1985+
*DetectedFieldsRequest
1986+
}
1987+
1988+
func (p paramsDetectedFieldsWrapper) QueryString() string {
1989+
return p.GetQuery()
1990+
}
1991+
1992+
func (p paramsDetectedFieldsWrapper) GetExpression() syntax.Expr {
1993+
expr, err := syntax.ParseExpr(p.GetQuery())
1994+
if err != nil {
1995+
return nil
1996+
}
1997+
1998+
return expr
1999+
}
2000+
2001+
func (p paramsDetectedFieldsWrapper) Start() time.Time {
2002+
return p.GetStartTs()
2003+
}
2004+
2005+
func (p paramsDetectedFieldsWrapper) End() time.Time {
2006+
return p.GetEndTs()
2007+
}
2008+
2009+
func (p paramsDetectedFieldsWrapper) Step() time.Duration {
2010+
return time.Duration(p.GetStep() * 1e6)
2011+
}
2012+
2013+
func (p paramsDetectedFieldsWrapper) Interval() time.Duration {
2014+
return 0
2015+
}
2016+
2017+
func (p paramsDetectedFieldsWrapper) Direction() logproto.Direction {
2018+
return logproto.BACKWARD
2019+
}
2020+
func (p paramsDetectedFieldsWrapper) Limit() uint32 { return p.DetectedFieldsRequest.LineLimit }
2021+
func (p paramsDetectedFieldsWrapper) Shards() []string {
2022+
return make([]string, 0)
2023+
}
2024+
19532025
func httpResponseHeadersToPromResponseHeaders(httpHeaders http.Header) []queryrangebase.PrometheusResponseHeader {
19542026
var promHeaders []queryrangebase.PrometheusResponseHeader
19552027
for h, hv := range httpHeaders {
@@ -2071,12 +2143,15 @@ type DetectedFieldsRequest struct {
20712143
path string
20722144
}
20732145

2074-
func NewDetectedFieldsRequest(start, end time.Time, query, path string) *DetectedFieldsRequest {
2146+
func NewDetectedFieldsRequest(start, end time.Time, lineLimit, fieldLimit uint32, step int64, query, path string) *DetectedFieldsRequest {
20752147
return &DetectedFieldsRequest{
20762148
DetectedFieldsRequest: logproto.DetectedFieldsRequest{
2077-
Start: start,
2078-
End: end,
2079-
Query: query,
2149+
Start: start,
2150+
End: end,
2151+
Query: query,
2152+
LineLimit: lineLimit,
2153+
FieldLimit: fieldLimit,
2154+
Step: step,
20802155
},
20812156
path: path,
20822157
}
@@ -2103,7 +2178,15 @@ func (r *DetectedFieldsRequest) GetStartTs() time.Time {
21032178
}
21042179

21052180
func (r *DetectedFieldsRequest) GetStep() int64 {
2106-
return 0
2181+
return r.Step
2182+
}
2183+
2184+
func (r *DetectedFieldsRequest) GetLineLimit() uint32 {
2185+
return r.LineLimit
2186+
}
2187+
2188+
func (r *DetectedFieldsRequest) GetFieldLimit() uint32 {
2189+
return r.FieldLimit
21072190
}
21082191

21092192
func (r *DetectedFieldsRequest) Path() string {
@@ -2132,6 +2215,11 @@ func (r *DetectedFieldsRequest) LogToSpan(sp opentracing.Span) {
21322215
sp.LogFields(
21332216
otlog.String("start", timestamp.Time(r.GetStart().UnixNano()).String()),
21342217
otlog.String("end", timestamp.Time(r.GetEnd().UnixNano()).String()),
2218+
otlog.String("query", r.GetQuery()),
2219+
otlog.Int64("step (ms)", r.GetStep()),
2220+
otlog.Int64("line_limit", int64(r.GetLineLimit())),
2221+
otlog.Int64("field_limit", int64(r.GetFieldLimit())),
2222+
otlog.String("step", fmt.Sprintf("%d", r.GetStep())),
21352223
)
21362224
}
21372225

0 commit comments

Comments
 (0)