
Commit 7b53f20

jeschkies and cstyan authored
feat: Introduce shardable probabilistic topk for instant queries. (#14243)
Signed-off-by: Callum Styan <callumstyan@gmail.com>
Co-authored-by: Callum Styan <callumstyan@gmail.com>
1 parent 1c993f9 commit 7b53f20


46 files changed: +3153 -961 lines

docs/sources/shared/configuration.md

Lines changed: 13 additions & 0 deletions
@@ -130,6 +130,11 @@ querier_rf1:
 # CLI flag: -querier-rf1.engine.max-lookback-period
 [max_look_back_period: <duration> | default = 30s]
 
+# The maximum number of labels the heap of a topk query using a count-min
+# sketch can track.
+# CLI flag: -querier-rf1.engine.max-count-min-sketch-heap-size
+[max_count_min_sketch_heap_size: <int> | default = 10000]
+
 # The maximum number of queries that can be simultaneously processed by the
 # querier.
 # CLI flag: -querier-rf1.max-concurrent
@@ -3841,6 +3846,9 @@ otlp_config:
 # CLI flag: -limits.ingestion-partition-tenant-shard-size
 [ingestion_partitions_tenant_shard_size: <int> | default = 0]
 
+# List of LogQL vector and range aggregations that should be sharded.
+[shard_aggregations: <list of strings>]
+
 # Enable metric aggregation. When enabled, pushed streams will be sampled for
 # bytes and count, and these metrics will be written back into Loki as a special
 # __aggregated_metric__ stream, which can be queried for faster histogram
@@ -4245,6 +4253,11 @@ engine:
 # CLI flag: -querier.engine.max-lookback-period
 [max_look_back_period: <duration> | default = 30s]
 
+# The maximum number of labels the heap of a topk query using a count-min
+# sketch can track.
+# CLI flag: -querier.engine.max-count-min-sketch-heap-size
+[max_count_min_sketch_heap_size: <int> | default = 10000]
+
 # The maximum number of queries that can be simultaneously processed by the
 # querier.
 # CLI flag: -querier.max-concurrent
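
For background on the new max_count_min_sketch_heap_size limit: a count-min-sketch topk stores approximate counts for every label combination in a small fixed matrix, and a bounded heap tracks only the labels currently believed heaviest; the limit caps that heap. The following is a minimal, self-contained illustration of the idea in Go. It is a sketch of the technique, not Loki's implementation, and every identifier in it is hypothetical.

// Illustrative only: a count-min sketch plus a bounded min-heap that keeps
// the approximate top-k keys seen in a stream.
package main

import (
	"container/heap"
	"fmt"
	"hash/fnv"
	"math"
)

// cmSketch estimates counts in a depth x width matrix. A key's estimate is
// the minimum over its rows, so the sketch can over-count, never under-count.
type cmSketch struct {
	width  uint32
	counts [][]float64
}

func newCMSketch(depth, width uint32) *cmSketch {
	counts := make([][]float64, depth)
	for i := range counts {
		counts[i] = make([]float64, width)
	}
	return &cmSketch{width: width, counts: counts}
}

func (s *cmSketch) col(key string, row int) uint32 {
	h := fnv.New32a()
	fmt.Fprintf(h, "%d:%s", row, key) // cheap per-row hash, for illustration
	return h.Sum32() % s.width
}

// add increments key by v and returns the new estimate for key.
func (s *cmSketch) add(key string, v float64) float64 {
	est := math.MaxFloat64
	for row := range s.counts {
		c := s.col(key, row)
		s.counts[row][c] += v
		est = math.Min(est, s.counts[row][c])
	}
	return est
}

type entry struct {
	key string
	est float64
}

// topHeap is a min-heap over estimates: the weakest tracked key sits at the
// root so it can be evicted cheaply when a heavier key shows up.
type topHeap []entry

func (h topHeap) Len() int           { return len(h) }
func (h topHeap) Less(i, j int) bool { return h[i].est < h[j].est }
func (h topHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
func (h *topHeap) Push(x any)        { *h = append(*h, x.(entry)) }
func (h *topHeap) Pop() any {
	old := *h
	x := old[len(old)-1]
	*h = old[:len(old)-1]
	return x
}

func main() {
	const maxHeapSize = 2 // plays the role of max_count_min_sketch_heap_size
	sketch := newCMSketch(4, 1024)
	top := &topHeap{}
	tracked := map[string]bool{}

	for _, job := range []string{"one", "one", "two", "three", "one", "two"} {
		est := sketch.add(job, 1)
		switch {
		case tracked[job]: // already on the heap: refresh its estimate
			for i := range *top {
				if (*top)[i].key == job {
					(*top)[i].est = est
					heap.Fix(top, i)
					break
				}
			}
		case top.Len() < maxHeapSize: // heap not full yet: track unconditionally
			heap.Push(top, entry{job, est})
			tracked[job] = true
		case est > (*top)[0].est: // heavier than the weakest tracked key: evict it
			delete(tracked, heap.Pop(top).(entry).key)
			heap.Push(top, entry{job, est})
			tracked[job] = true
		}
	}
	fmt.Println(*top) // approximate top-2: [{two 2} {one 3}]
}

The memory cost is fixed by the matrix dimensions and the heap bound, which is why the heap size is exposed as a limit: a larger heap tracks more candidate labels at the cost of memory per query.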

integration/loki_micro_services_test.go

Lines changed: 110 additions & 1 deletion
@@ -94,7 +94,7 @@ func TestMicroServicesIngestQuery(t *testing.T) {
 			"-common.compactor-address="+tCompactor.HTTPURL(),
 			"-querier.per-request-limits-enabled=true",
 			"-frontend.encoding=protobuf",
-			"-querier.shard-aggregations=quantile_over_time",
+			"-querier.shard-aggregations=quantile_over_time,approx_topk",
 			"-frontend.tail-proxy-url="+tQuerier.HTTPURL(),
 		)
 	)
@@ -784,6 +784,115 @@ func TestOTLPLogsIngestQuery(t *testing.T) {
 	})
 }
 
+func TestProbabilisticQuery(t *testing.T) {
+	clu := cluster.New(nil, cluster.SchemaWithTSDBAndTSDB, func(c *cluster.Cluster) {
+		c.SetSchemaVer("v13")
+	})
+	defer func() {
+		assert.NoError(t, clu.Cleanup())
+	}()
+
+	// First, run the compactor, index gateway, and distributor.
+	var (
+		tCompactor = clu.AddComponent(
+			"compactor",
+			"-target=compactor",
+			"-compactor.compaction-interval=1s",
+			"-compactor.retention-delete-delay=1s",
+			// By default, a minute is added to the delete request start time. This compensates for that.
+			"-compactor.delete-request-cancel-period=-60s",
+			"-compactor.deletion-mode=filter-and-delete",
+		)
+		tIndexGateway = clu.AddComponent(
+			"index-gateway",
+			"-target=index-gateway",
+		)
+		tDistributor = clu.AddComponent(
+			"distributor",
+			"-target=distributor",
+		)
+	)
+	require.NoError(t, clu.Run())
+
+	// Then, run only the ingester and query scheduler.
+	var (
+		tIngester = clu.AddComponent(
+			"ingester",
+			"-target=ingester",
+			"-boltdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL(),
+		)
+		tQueryScheduler = clu.AddComponent(
+			"query-scheduler",
+			"-target=query-scheduler",
+			"-query-scheduler.use-scheduler-ring=false",
+			"-boltdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL(),
+		)
+	)
+	require.NoError(t, clu.Run())
+
+	// Then, run the querier.
+	var (
+		tQuerier = clu.AddComponent(
+			"querier",
+			"-target=querier",
+			"-querier.scheduler-address="+tQueryScheduler.GRPCURL(),
+			"-boltdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL(),
+			"-common.compactor-address="+tCompactor.HTTPURL(),
+		)
+	)
+	require.NoError(t, clu.Run())
+
+	// Finally, run the query-frontend.
+	var (
+		tQueryFrontend = clu.AddComponent(
+			"query-frontend",
+			"-target=query-frontend",
+			"-frontend.scheduler-address="+tQueryScheduler.GRPCURL(),
+			"-boltdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL(),
+			"-common.compactor-address="+tCompactor.HTTPURL(),
+			"-querier.per-request-limits-enabled=true",
+			"-frontend.encoding=protobuf",
+			"-querier.shard-aggregations=quantile_over_time,approx_topk",
+			"-frontend.tail-proxy-url="+tQuerier.HTTPURL(),
+		)
+	)
+	require.NoError(t, clu.Run())
+
+	tenantID := randStringRunes()
+
+	now := time.Now()
+	cliDistributor := client.New(tenantID, "", tDistributor.HTTPURL())
+	cliDistributor.Now = now
+	cliIngester := client.New(tenantID, "", tIngester.HTTPURL())
+	cliIngester.Now = now
+	cliQueryFrontend := client.New(tenantID, "", tQueryFrontend.HTTPURL())
+	cliQueryFrontend.Now = now
+
+	t.Run("ingest-logs", func(t *testing.T) {
+		// Ingest some log lines: three for job "one", one for job "two".
+		require.NoError(t, cliDistributor.PushLogLine("lineA", now.Add(-45*time.Minute), nil, map[string]string{"job": "one"}))
+		require.NoError(t, cliDistributor.PushLogLine("lineB", now.Add(-45*time.Minute), nil, map[string]string{"job": "one"}))
+
+		require.NoError(t, cliDistributor.PushLogLine("lineC", now, nil, map[string]string{"job": "one"}))
+		require.NoError(t, cliDistributor.PushLogLine("lineD", now, nil, map[string]string{"job": "two"}))
+	})
+
+	t.Run("query", func(t *testing.T) {
+		resp, err := cliQueryFrontend.RunQuery(context.Background(), `approx_topk(1, count_over_time({job=~".+"}[1h]))`)
+		require.NoError(t, err)
+		assert.Equal(t, "vector", resp.Data.ResultType)
+
+		var values []string
+		var labels []string
+		for _, value := range resp.Data.Vector {
+			values = append(values, value.Value)
+			labels = append(labels, value.Metric["job"])
+		}
+		// Job "one" has three lines in the last hour versus one for job "two",
+		// so it is the single top-1 result.
+		assert.ElementsMatch(t, []string{"3"}, values)
+		assert.ElementsMatch(t, []string{"one"}, labels)
+	})
+}
+
 func TestCategorizedLabels(t *testing.T) {
 	clu := cluster.New(nil, cluster.SchemaWithTSDB, func(c *cluster.Cluster) {
 		c.SetSchemaVer("v13")
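A closing note on the "shardable" claim in the commit title: a standard property of count-min sketches is that two sketches with identical dimensions merge by element-wise addition, so each query shard can sketch its own streams and the frontend only needs to sum the matrices and re-select the topk. A toy demonstration, reusing the hypothetical matrix layout from the earlier sketch:

// Toy demonstration (hypothetical layout, matching the earlier sketch):
// merging per-shard count-min sketches is element-wise addition.
package main

import "fmt"

func merge(dst, src [][]float64) {
	for row := range dst {
		for col := range dst[row] {
			dst[row][col] += src[row][col]
		}
	}
}

func main() {
	shardA := [][]float64{{2, 0}, {0, 2}} // 2x2 toy sketches
	shardB := [][]float64{{1, 1}, {1, 1}}
	merge(shardA, shardB)
	fmt.Println(shardA) // [[3 1] [1 3]]
}

The merged matrix is exactly what a single non-sharded pass over all the data would have produced, which is what lets the expensive counting run in parallel across shards.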