Skip to content

Commit 2e85454

Browse files
committed
feat: profilecli query-blocks series
1 parent 5f970ee commit 2e85454

File tree

6 files changed

+181
-17
lines changed

6 files changed

+181
-17
lines changed

‎cmd/profilecli/main.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,10 @@ func main() {
7979
queryTracerCmd := app.Command("query-tracer", "Analyze query traces.")
8080
queryTracerParams := addQueryTracerParams(queryTracerCmd)
8181

82+
queryBlockCmd := app.Command("query-blocks", "Query on local/remote blocks")
83+
queryBlockSeriesCmd := queryBlockCmd.Command("series", "Request series labels on a local/remote block")
84+
queryBlockSeriesParams := addQueryBlockSeriesParams(queryBlockSeriesCmd)
85+
8286
uploadCmd := app.Command("upload", "Upload profile(s).")
8387
uploadParams := addUploadParams(uploadCmd)
8488

@@ -131,6 +135,10 @@ func main() {
131135
if err := querySeries(ctx, querySeriesParams); err != nil {
132136
os.Exit(checkError(err))
133137
}
138+
case queryBlockSeriesCmd.FullCommand():
139+
if err := queryBlockSeries(ctx, queryBlockSeriesParams); err != nil {
140+
os.Exit(checkError(err))
141+
}
134142

135143
case queryLabelValuesCardinalityCmd.FullCommand():
136144
if err := queryLabelValuesCardinality(ctx, queryLabelValuesCardinalityParams); err != nil {

‎cmd/profilecli/output.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package main
2+
3+
import (
4+
"encoding/json"
5+
"os"
6+
7+
"github.com/grafana/pyroscope/api/gen/proto/go/types/v1"
8+
)
9+
10+
func outputSeries(result []*typesv1.Labels) error {
11+
enc := json.NewEncoder(os.Stdout)
12+
m := make(map[string]interface{})
13+
for _, s := range result {
14+
for k := range m {
15+
delete(m, k)
16+
}
17+
for _, l := range s.Labels {
18+
m[l.Name] = l.Value
19+
}
20+
if err := enc.Encode(m); err != nil {
21+
return err
22+
}
23+
}
24+
return nil
25+
}

‎cmd/profilecli/query-blocks.go

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"math"
7+
8+
"connectrpc.com/connect"
9+
"github.com/go-kit/log/level"
10+
"github.com/pkg/errors"
11+
12+
ingestv1 "github.com/grafana/pyroscope/api/gen/proto/go/ingester/v1"
13+
"github.com/grafana/pyroscope/pkg/objstore"
14+
objstoreclient "github.com/grafana/pyroscope/pkg/objstore/client"
15+
"github.com/grafana/pyroscope/pkg/objstore/providers/filesystem"
16+
"github.com/grafana/pyroscope/pkg/objstore/providers/gcs"
17+
"github.com/grafana/pyroscope/pkg/phlaredb"
18+
)
19+
20+
type queryBlockParams struct {
21+
LocalPath string
22+
BucketName string
23+
BlockIds []string
24+
TenanId string
25+
ObjectStoreType string
26+
Query string
27+
}
28+
29+
type queryBlockSeriesParams struct {
30+
*queryBlockParams
31+
LabelNames []string
32+
}
33+
34+
func addQueryBlockParams(queryCmd commander) *queryBlockParams {
35+
params := new(queryBlockParams)
36+
queryCmd.Flag("local-path", "Path to blocks directory.").Default("./data/anonymous/local").StringVar(&params.LocalPath)
37+
queryCmd.Flag("bucket-name", "The name of the object storage bucket.").StringVar(&params.BucketName)
38+
queryCmd.Flag("object-store-type", "The type of the object storage (e.g., gcs).").Default("gcs").StringVar(&params.ObjectStoreType)
39+
queryCmd.Flag("block-ids", "List of blocks ids to query on").StringsVar(&params.BlockIds)
40+
queryCmd.Flag("tenant-id", "Tenant id of the queried block for remote bucket").StringVar(&params.TenanId)
41+
queryCmd.Flag("query", "Label selector to query.").Default("{}").StringVar(&params.Query)
42+
return params
43+
}
44+
45+
func addQueryBlockSeriesParams(queryCmd commander) *queryBlockSeriesParams {
46+
params := new(queryBlockSeriesParams)
47+
params.queryBlockParams = addQueryBlockParams(queryCmd)
48+
queryCmd.Flag("label-names", "Filter returned labels to the supplied label names. Without any filter all labels are returned.").StringsVar(&params.LabelNames)
49+
return params
50+
}
51+
52+
func queryBlockSeries(ctx context.Context, params *queryBlockSeriesParams) error {
53+
level.Info(logger).Log("msg", fmt.Sprintf("query-block series"), "labelNames", fmt.Sprintf("%q", params.LabelNames), "blockIds", fmt.Sprintf("%v", params.BlockIds), "localPath", params.LocalPath, "bucketName", params.BucketName, "tenantId", params.TenanId)
54+
55+
bucket, err := getBucket(ctx, params)
56+
if err != nil {
57+
return err
58+
}
59+
60+
blockQuerier := phlaredb.NewBlockQuerier(ctx, bucket)
61+
62+
var from, to int64
63+
from, to = math.MaxInt64, math.MinInt64
64+
var targetBlockQueriers phlaredb.Queriers
65+
for _, blockId := range params.queryBlockParams.BlockIds {
66+
meta, err := blockQuerier.BlockMeta(ctx, blockId)
67+
if err != nil {
68+
return err
69+
}
70+
from = min(from, meta.MinTime.Time().UnixMilli())
71+
to = max(to, meta.MaxTime.Time().UnixMilli())
72+
targetBlockQueriers = append(targetBlockQueriers, phlaredb.NewSingleBlockQuerierFromMeta(ctx, bucket, meta))
73+
}
74+
75+
response, err := targetBlockQueriers.Series(ctx, connect.NewRequest(
76+
&ingestv1.SeriesRequest{
77+
Start: from,
78+
End: to,
79+
Matchers: []string{params.Query},
80+
LabelNames: params.LabelNames,
81+
},
82+
))
83+
if err != nil {
84+
return err
85+
}
86+
87+
return outputSeries(response.Msg.LabelsSet)
88+
}
89+
90+
func getBucket(ctx context.Context, params *queryBlockSeriesParams) (objstore.Bucket, error) {
91+
if params.BucketName != "" {
92+
return getRemoteBucket(ctx, params)
93+
} else {
94+
return filesystem.NewBucket(params.LocalPath)
95+
}
96+
}
97+
98+
func getRemoteBucket(ctx context.Context, params *queryBlockSeriesParams) (objstore.Bucket, error) {
99+
if params.TenanId == "" {
100+
return nil, errors.New("specify tenant id for remote bucket")
101+
}
102+
return objstoreclient.NewBucket(ctx, objstoreclient.Config{
103+
StorageBackendConfig: objstoreclient.StorageBackendConfig{
104+
Backend: params.ObjectStoreType,
105+
GCS: gcs.Config{
106+
BucketName: params.BucketName,
107+
},
108+
},
109+
StoragePrefix: fmt.Sprintf("%s/phlaredb", params.TenanId),
110+
}, params.BucketName)
111+
}

‎cmd/profilecli/query.go

Lines changed: 2 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package main
33
import (
44
"bytes"
55
"context"
6-
"encoding/json"
76
"fmt"
87
"io"
98
"os"
@@ -320,22 +319,8 @@ func querySeries(ctx context.Context, params *querySeriesParams) (err error) {
320319
return errors.Errorf("unknown api type %s", params.APIType)
321320
}
322321

323-
enc := json.NewEncoder(os.Stdout)
324-
m := make(map[string]interface{})
325-
for _, s := range result {
326-
for k := range m {
327-
delete(m, k)
328-
}
329-
for _, l := range s.Labels {
330-
m[l.Name] = l.Value
331-
}
332-
if err := enc.Encode(m); err != nil {
333-
return err
334-
}
335-
}
336-
337-
return nil
338-
322+
err = outputSeries(result)
323+
return err
339324
}
340325

341326
type queryLabelValuesCardinalityParams struct {

‎pkg/phlaredb/block_querier.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,23 @@ func (b *BlockQuerier) BlockMetas(ctx context.Context) (metas []*block.Meta, _ e
159159
return metas[0 : pos+1], nil
160160
}
161161

162+
func (b *BlockQuerier) BlockMeta(ctx context.Context, name string) (meta *block.Meta, _ error) {
163+
path := filepath.Join(name, block.MetaFilename)
164+
metaReader, err := b.bkt.Get(ctx, path)
165+
if err != nil {
166+
level.Error(b.logger).Log("msg", "error reading block meta", "block", path, "err", err)
167+
return nil, err
168+
}
169+
170+
meta, err = block.Read(metaReader)
171+
if err != nil {
172+
level.Error(b.logger).Log("msg", "error parsing block meta", "block", path, "err", err)
173+
return nil, err
174+
}
175+
176+
return meta, nil
177+
}
178+
162179
// Sync gradually scans the available blocks. If there are any changes to the
163180
// last run it will Open/Close new/no longer existing ones.
164181
func (b *BlockQuerier) Sync(ctx context.Context) error {

‎pkg/phlaredb/block_querier_test.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1386,3 +1386,21 @@ func testSelectMergeByStacktracesRace(t testing.TB, times int) {
13861386
require.NoError(t, g.Wait())
13871387
require.NoError(t, querier.Close())
13881388
}
1389+
1390+
func TestBlockMeta_loadsMetasIndividually(t *testing.T) {
1391+
path := "./block/testdata/"
1392+
bucket, err := filesystem.NewBucket(path)
1393+
require.NoError(t, err)
1394+
1395+
ctx := context.Background()
1396+
blockQuerier := NewBlockQuerier(ctx, bucket)
1397+
metas, err := blockQuerier.BlockMetas(ctx)
1398+
require.NoError(t, err)
1399+
1400+
for _, meta := range metas {
1401+
singleMeta, err := blockQuerier.BlockMeta(ctx, meta.ULID.String())
1402+
require.NoError(t, err)
1403+
1404+
require.Equal(t, meta, singleMeta)
1405+
}
1406+
}

0 commit comments

Comments
 (0)