Skip to content

Commit a3a5acd

Browse files
authored
chore: add status page for block scheduler (#15553)
1 parent 52d745f commit a3a5acd

File tree

7 files changed

+325
-0
lines changed

7 files changed

+325
-0
lines changed

‎pkg/blockbuilder/scheduler/priority_queue.go‎

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,11 @@ func (pq *PriorityQueue[K, V]) Len() int {
9494
return pq.h.Len()
9595
}
9696

97+
// List returns all elements in the queue.
98+
func (pq *PriorityQueue[K, V]) List() []V {
99+
return pq.h.List()
100+
}
101+
97102
// priorityHeap is the internal heap implementation that satisfies heap.Interface.
98103
type priorityHeap[V any] struct {
99104
less func(V, V) bool
@@ -108,6 +113,14 @@ func (h *priorityHeap[V]) Less(i, j int) bool {
108113
return h.less(h.heap[i].value, h.heap[j].value)
109114
}
110115

116+
func (h *priorityHeap[V]) List() []V {
117+
vals := make([]V, 0, len(h.heap))
118+
for _, item := range h.heap {
119+
vals = append(vals, item.value)
120+
}
121+
return vals
122+
}
123+
111124
func (h *priorityHeap[V]) Swap(i, j int) {
112125
h.heap[i], h.heap[j] = h.heap[j], h.heap[i]
113126
h.heap[i].index = i

‎pkg/blockbuilder/scheduler/queue.go‎

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -268,3 +268,42 @@ func (q *JobQueue) SyncJob(jobID string, job *types.Job) {
268268
q.inProgress[jobID] = jobMeta
269269
q.statusMap[jobID] = types.JobStatusInProgress
270270
}
271+
272+
func (q *JobQueue) ListPendingJobs() []JobWithMetadata {
273+
q.mu.RLock()
274+
defer q.mu.RUnlock()
275+
276+
// return copies of the jobs since they can change after the lock is released
277+
jobs := make([]JobWithMetadata, 0, q.pending.Len())
278+
for _, j := range q.pending.List() {
279+
jobs = append(jobs, JobWithMetadata{
280+
// Job is immutable, no need to make a copy
281+
Job: j.Job,
282+
Priority: j.Priority,
283+
Status: j.Status,
284+
StartTime: j.StartTime,
285+
UpdateTime: j.UpdateTime,
286+
})
287+
}
288+
289+
return jobs
290+
}
291+
292+
func (q *JobQueue) ListInProgressJobs() []JobWithMetadata {
293+
q.mu.RLock()
294+
defer q.mu.RUnlock()
295+
296+
// return copies of the jobs since they can change after the lock is released
297+
jobs := make([]JobWithMetadata, 0, len(q.inProgress))
298+
for _, j := range q.inProgress {
299+
jobs = append(jobs, JobWithMetadata{
300+
// Job is immutable, no need to make a copy
301+
Job: j.Job,
302+
Priority: j.Priority,
303+
Status: j.Status,
304+
StartTime: j.StartTime,
305+
UpdateTime: j.UpdateTime,
306+
})
307+
}
308+
return jobs
309+
}

‎pkg/blockbuilder/scheduler/scheduler.go‎

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"errors"
66
"flag"
77
"fmt"
8+
"net/http"
89
"strconv"
910
"strings"
1011
"time"
@@ -259,3 +260,7 @@ func (s *BlockScheduler) HandleSyncJob(_ context.Context, job *types.Job) error
259260
s.queue.SyncJob(job.ID(), job)
260261
return nil
261262
}
263+
264+
func (s *BlockScheduler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
265+
newStatusPageHandler(s.queue, s.offsetManager, s.cfg.LookbackPeriod).ServeHTTP(w, req)
266+
}
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
package scheduler
2+
3+
import (
4+
"context"
5+
_ "embed"
6+
"html/template"
7+
"net/http"
8+
"slices"
9+
"time"
10+
11+
"github.com/twmb/franz-go/pkg/kadm"
12+
)
13+
14+
// defaultPageContent holds the source of status.gohtml, embedded into the
// binary by the go:embed directive below.
//
//go:embed status.gohtml
15+
var defaultPageContent string
16+
// defaultPageTemplate is the parsed status page template. template.Must
// panics during package initialisation if the embedded source does not parse.
// The "durationSince" helper formats the time elapsed since t, truncated to
// whole seconds.
var defaultPageTemplate = template.Must(template.New("webpage").Funcs(template.FuncMap{
17+
"durationSince": func(t time.Time) string { return time.Since(t).Truncate(time.Second).String() },
18+
}).Parse(defaultPageContent))
19+
20+
// jobQueue is the subset of queue behaviour the status page needs: listing
// the pending jobs and the in-progress jobs.
type jobQueue interface {
21+
ListPendingJobs() []JobWithMetadata
22+
ListInProgressJobs() []JobWithMetadata
23+
}
24+
25+
// offsetReader supplies consumer group lag for the status page. GroupLag
// returns a map keyed by int32 (presumably the partition number — confirm
// against the implementation) or an error.
type offsetReader interface {
26+
GroupLag(ctx context.Context, lookbackPeriod time.Duration) (map[int32]kadm.GroupMemberLag, error)
27+
}
28+
29+
// partitionInfo is one row of the "Partition Lag" table on the status page.
// The handler fills it from a kadm.GroupMemberLag entry: Lag from Lag,
// EndOffset from End.Offset and CommittedOffset from Commit.At.
type partitionInfo struct {
30+
Partition int32
31+
Lag int64
32+
EndOffset int64
33+
CommittedOffset int64
34+
}
35+
36+
// statusPageHandler is the http.Handler that renders the block scheduler
// status page. It reads jobs from jobQueue and partition lag from
// offsetReader; lookbackPeriod is passed through to offsetReader.GroupLag.
type statusPageHandler struct {
37+
jobQueue jobQueue
38+
offsetReader offsetReader
39+
lookbackPeriod time.Duration
40+
}
41+
42+
func newStatusPageHandler(jobQueue jobQueue, offsetReader offsetReader, lookbackPeriod time.Duration) *statusPageHandler {
43+
return &statusPageHandler{jobQueue: jobQueue, offsetReader: offsetReader, lookbackPeriod: lookbackPeriod}
44+
}
45+
46+
func (h *statusPageHandler) ServeHTTP(w http.ResponseWriter, _ *http.Request) {
47+
offsets, err := h.offsetReader.GroupLag(context.Background(), h.lookbackPeriod)
48+
if err != nil {
49+
http.Error(w, err.Error(), http.StatusInternalServerError)
50+
return
51+
}
52+
53+
pendingJobs := h.jobQueue.ListPendingJobs()
54+
slices.SortFunc(pendingJobs, func(a, b JobWithMetadata) int {
55+
return b.Priority - a.Priority // Higher priority first
56+
})
57+
58+
inProgressJobs := h.jobQueue.ListInProgressJobs()
59+
slices.SortFunc(inProgressJobs, func(a, b JobWithMetadata) int {
60+
return int(a.StartTime.Sub(b.StartTime)) // Earlier start time First
61+
})
62+
63+
data := struct {
64+
PendingJobs []JobWithMetadata
65+
InProgressJobs []JobWithMetadata
66+
Now time.Time
67+
PartitionInfo []partitionInfo
68+
}{
69+
Now: time.Now(),
70+
PendingJobs: pendingJobs,
71+
InProgressJobs: inProgressJobs,
72+
}
73+
74+
for _, partitionOffset := range offsets {
75+
// only include partitions having lag
76+
if partitionOffset.Lag > 0 {
77+
data.PartitionInfo = append(data.PartitionInfo, partitionInfo{
78+
Partition: partitionOffset.Partition,
79+
Lag: partitionOffset.Lag,
80+
EndOffset: partitionOffset.End.Offset,
81+
CommittedOffset: partitionOffset.Commit.At,
82+
})
83+
}
84+
}
85+
slices.SortFunc(data.PartitionInfo, func(a, b partitionInfo) int {
86+
return int(a.Partition - b.Partition)
87+
})
88+
89+
w.Header().Set("Content-Type", "text/html")
90+
if err := defaultPageTemplate.Execute(w, data); err != nil {
91+
http.Error(w, err.Error(), http.StatusInternalServerError)
92+
}
93+
}
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
{{- /*
    Block scheduler status page.
    Rendered with a struct exposing: Now, PendingJobs, InProgressJobs and
    PartitionInfo. Each table row is wrapped in its own <tr> element.
*/ -}}
<!DOCTYPE html>
<html>
    <head>
        <meta charset="UTF-8">
        <title>Block Scheduler Status</title>
    </head>
    <body>
        <h1>Block Scheduler Status</h1>
        <p>Current time: {{ .Now }}</p>
        <h2>Pending Jobs</h2>
        <table width="100%" border="1">
            <thead>
                <tr>
                    <th>ID</th>
                    <th>Priority</th>
                    <th>Partition</th>
                    <th>Start Offset</th>
                    <th>End Offset</th>
                    <th>Creation Timestamp</th>
                </tr>
            </thead>
            <tbody>
                {{ range $i, $job := .PendingJobs }}
                <tr>
                    <td>{{ .ID }}</td>
                    <td>{{ .Priority }}</td>
                    <td>{{ .Partition }}</td>
                    <td>{{ .Offsets.Min }}</td>
                    <td>{{ .Offsets.Max }}</td>
                    <td>{{ .UpdateTime | durationSince }} ago ({{ .UpdateTime.Format "Mon, 02 Jan 2006 15:04:05 -0700" }})</td>
                </tr>
                {{ end }}
            </tbody>
        </table>
        <h2>In progress Jobs</h2>
        <table width="100%" border="1">
            <thead>
                <tr>
                    <th>ID</th>
                    <th>Priority</th>
                    <th>Partition</th>
                    <th>Start Offset</th>
                    <th>End Offset</th>
                    <th>Start Timestamp</th>
                    <th>Last Updated Timestamp</th>
                </tr>
            </thead>
            <tbody>
                {{ range $i, $job := .InProgressJobs }}
                <tr>
                    <td>{{ .ID }}</td>
                    <td>{{ .Priority }}</td>
                    <td>{{ .Partition }}</td>
                    <td>{{ .Offsets.Min }}</td>
                    <td>{{ .Offsets.Max }}</td>
                    <td>{{ .StartTime | durationSince }} ago ({{ .StartTime.Format "Mon, 02 Jan 2006 15:04:05 -0700" }})</td>
                    <td>{{ .UpdateTime | durationSince }} ago ({{ .UpdateTime.Format "Mon, 02 Jan 2006 15:04:05 -0700" }})</td>
                </tr>
                {{ end }}
            </tbody>
        </table>
        <h2>Partition Lag</h2>
        <table width="100%" border="1">
            <thead>
                <tr>
                    <th>Partition</th>
                    <th>Lag</th>
                    <th>End offset</th>
                    <th>Committed offset</th>
                </tr>
            </thead>
            <tbody>
                {{ range .PartitionInfo }}
                <tr>
                    <td>{{ .Partition }}</td>
                    <td>{{ .Lag }}</td>
                    <td>{{ .EndOffset }}</td>
                    <td>{{ .CommittedOffset }}</td>
                </tr>
                {{ end }}
            </tbody>
        </table>
    </body>
</html>
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
package scheduler
2+
3+
import (
4+
"net/http"
5+
"net/http/httptest"
6+
"os"
7+
"testing"
8+
"time"
9+
10+
"github.com/twmb/franz-go/pkg/kadm"
11+
12+
"github.com/grafana/loki/v3/pkg/blockbuilder/types"
13+
)
14+
15+
// mockQueueLister is a test double for the status page's jobQueue dependency.
// Its List methods return the configured slices unchanged.
type mockQueueLister struct {
16+
pendingJobs []JobWithMetadata
17+
inProgressJobs []JobWithMetadata
18+
}
19+
20+
func (m *mockQueueLister) ListPendingJobs() []JobWithMetadata {
21+
return m.pendingJobs
22+
}
23+
24+
func (m *mockQueueLister) ListInProgressJobs() []JobWithMetadata {
25+
return m.inProgressJobs
26+
}
27+
28+
func TestStatusPageHandler_ServeHTTP(t *testing.T) {
29+
t.Skip("skipping. only added to inspect the generated status page.")
30+
31+
// Setup mock data
32+
mockLister := &mockQueueLister{
33+
pendingJobs: []JobWithMetadata{
34+
{Job: types.NewJob(11, types.Offsets{Min: 11, Max: 20}), UpdateTime: time.Now().Add(-2 * time.Hour), Priority: 23},
35+
{Job: types.NewJob(22, types.Offsets{Min: 21, Max: 30}), UpdateTime: time.Now().Add(-1 * time.Hour), Priority: 42},
36+
{Job: types.NewJob(33, types.Offsets{Min: 22, Max: 40}), UpdateTime: time.Now().Add(-1 * time.Hour), Priority: 11},
37+
},
38+
inProgressJobs: []JobWithMetadata{
39+
{Job: types.NewJob(0, types.Offsets{Min: 1, Max: 10}), StartTime: time.Now().Add(-4 * time.Hour), UpdateTime: time.Now().Add(-3 * time.Hour)},
40+
{Job: types.NewJob(1, types.Offsets{Min: 11, Max: 110}), StartTime: time.Now().Add(-5 * time.Hour), UpdateTime: time.Now().Add(-4 * time.Hour)},
41+
},
42+
}
43+
44+
mockReader := &mockOffsetReader{
45+
groupLag: map[int32]kadm.GroupMemberLag{
46+
0: {
47+
Lag: 10,
48+
Partition: 3,
49+
End: kadm.ListedOffset{Offset: 100},
50+
Commit: kadm.Offset{At: 90},
51+
},
52+
1: {
53+
Lag: 0,
54+
Partition: 1,
55+
End: kadm.ListedOffset{Offset: 100},
56+
Commit: kadm.Offset{At: 100},
57+
},
58+
2: {
59+
Lag: 233,
60+
Partition: 2,
61+
End: kadm.ListedOffset{Offset: 333},
62+
Commit: kadm.Offset{At: 100},
63+
},
64+
},
65+
}
66+
67+
handler := newStatusPageHandler(mockLister, mockReader, time.Hour)
68+
req := httptest.NewRequest(http.MethodGet, "/test", nil)
69+
w := httptest.NewRecorder()
70+
handler.ServeHTTP(w, req)
71+
72+
resp := w.Result()
73+
defer resp.Body.Close()
74+
75+
// Verify status code
76+
if resp.StatusCode != http.StatusOK {
77+
t.Errorf("expected status OK; got %v", resp.StatusCode)
78+
}
79+
80+
// Verify content type
81+
contentType := resp.Header.Get("Content-Type")
82+
if contentType != "text/html" {
83+
t.Errorf("expected Content-Type text/html; got %v", contentType)
84+
}
85+
86+
// Write response body to file for inspection
87+
err := os.WriteFile("/tmp/generated_status.html", w.Body.Bytes(), 0644)
88+
if err != nil {
89+
t.Errorf("failed to write response body to file: %v", err)
90+
}
91+
}

‎pkg/loki/modules.go‎

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1888,6 +1888,8 @@ func (t *Loki) initBlockScheduler() (services.Service, error) {
18881888
return nil, err
18891889
}
18901890

1891+
t.Server.HTTP.Path("/blockscheduler/status").Methods("GET").Handler(s)
1892+
18911893
blockprotos.RegisterSchedulerServiceServer(
18921894
t.Server.GRPC,
18931895
blocktypes.NewSchedulerServer(s),

0 commit comments

Comments
 (0)