Skip to content

feat(block-scheduler): status page shows completed jobs #15580

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jan 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions pkg/blockbuilder/scheduler/priority_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,3 +210,23 @@ func (b *CircularBuffer[V]) Lookup(f func(V) bool) (V, bool) {
var zero V
return zero, false
}

// Range iterates over the elements in the buffer from oldest to newest
// and calls the given function for each element.
// If the function returns false, iteration stops.
func (b *CircularBuffer[V]) Range(f func(V) bool) {
if b.size == 0 {
return
}

// Start from head (oldest) and iterate to tail (newest)
idx := b.head
remaining := b.size
for remaining > 0 {
if !f(b.buffer[idx]) {
return
}
idx = (idx + 1) % len(b.buffer)
remaining--
}
}
81 changes: 81 additions & 0 deletions pkg/blockbuilder/scheduler/priority_queue_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package scheduler

import (
"testing"

"github.com/stretchr/testify/require"
)

func TestCircularBuffer_Range(t *testing.T) {
tests := []struct {
name string
capacity int
input []int
want []int
}{
{
name: "empty buffer",
capacity: 3,
input: []int{},
want: []int{},
},
{
name: "partially filled buffer",
capacity: 3,
input: []int{1, 2},
want: []int{1, 2},
},
{
name: "full buffer",
capacity: 3,
input: []int{1, 2, 3},
want: []int{1, 2, 3},
},
{
name: "buffer with eviction",
capacity: 3,
input: []int{1, 2, 3, 4, 5},
want: []int{3, 4, 5}, // oldest elements (1,2) were evicted
},
{
name: "buffer with multiple evictions",
capacity: 2,
input: []int{1, 2, 3, 4, 5},
want: []int{4, 5}, // only newest elements remain
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Create and fill buffer
buf := NewCircularBuffer[int](tt.capacity)
for _, v := range tt.input {
buf.Push(v)
}

// Use Range to collect elements
got := make([]int, 0)
buf.Range(func(v int) bool {
got = append(got, v)
return true
})

require.Equal(t, tt.want, got, "Range should iterate in order from oldest to newest")
})
}
}

func TestCircularBuffer_Range_EarlyStop(t *testing.T) {
buf := NewCircularBuffer[int](5)
for i := 1; i <= 5; i++ {
buf.Push(i)
}

var got []int
buf.Range(func(v int) bool {
got = append(got, v)
return v != 3 // stop after seeing 3
})

require.Equal(t, []int{1, 2, 3}, got, "Range should stop when function returns false")
}
12 changes: 12 additions & 0 deletions pkg/blockbuilder/scheduler/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,3 +382,15 @@ func (q *JobQueue) ListInProgressJobs() []JobWithMetadata {
}
return jobs
}

func (q *JobQueue) ListCompletedJobs() []JobWithMetadata {
q.mu.RLock()
defer q.mu.RUnlock()

jobs := make([]JobWithMetadata, 0, q.completed.Len())
q.completed.Range(func(job *JobWithMetadata) bool {
jobs = append(jobs, *job)
return true
})
return jobs
}
3 changes: 3 additions & 0 deletions pkg/blockbuilder/scheduler/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ var defaultPageTemplate = template.Must(template.New("webpage").Funcs(template.F
type jobQueue interface {
ListPendingJobs() []JobWithMetadata
ListInProgressJobs() []JobWithMetadata
ListCompletedJobs() []JobWithMetadata
}

type offsetReader interface {
Expand Down Expand Up @@ -63,12 +64,14 @@ func (h *statusPageHandler) ServeHTTP(w http.ResponseWriter, _ *http.Request) {
data := struct {
PendingJobs []JobWithMetadata
InProgressJobs []JobWithMetadata
CompletedJobs []JobWithMetadata
Now time.Time
PartitionInfo []partitionInfo
}{
Now: time.Now(),
PendingJobs: pendingJobs,
InProgressJobs: inProgressJobs,
CompletedJobs: h.jobQueue.ListCompletedJobs(),
}

for _, partitionOffset := range offsets {
Expand Down
32 changes: 32 additions & 0 deletions pkg/blockbuilder/scheduler/status.gohtml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
</thead>
<tbody>
{{ range $i, $job := .PendingJobs }}
<tr>
<td>{{ .ID }}</td>
<td>{{ .Priority }}</td>
<td>{{ .Partition }}</td>
Expand All @@ -47,6 +48,7 @@
</thead>
<tbody>
{{ range $i, $job := .InProgressJobs }}
<tr>
<td>{{ .ID }}</td>
<td>{{ .Priority }}</td>
<td>{{ .Partition }}</td>
Expand All @@ -58,6 +60,35 @@
{{ end }}
</tbody>
</table>
<h2>Completed Jobs</h2>
<table width="100%" border="1">
<thead>
<tr>
<th>ID</th>
<th>Priority</th>
<th>Partition</th>
<th>Start Offset</th>
<th>End Offset</th>
<th>Status</th>
<th>Start Timestamp</th>
<th>Completion Timestamp</th>
</tr>
</thead>
<tbody>
{{ range $i, $job := .CompletedJobs }}
<tr>
<td>{{ .ID }}</td>
<td>{{ .Priority }}</td>
<td>{{ .Partition }}</td>
<td>{{ .Offsets.Min }}</td>
<td>{{ .Offsets.Max }}</td>
<td>{{ .Status }}</td>
<td>{{ .StartTime | durationSince }} ago ({{ .StartTime.Format "Mon, 02 Jan 2006 15:04:05 -0700" }})</td>
<td>{{ .UpdateTime | durationSince }} ago ({{ .UpdateTime.Format "Mon, 02 Jan 2006 15:04:05 -0700" }})</td>
</tr>
{{ end }}
</tbody>
</table>
<h3>Partition Lag</h2>
<table width="100%" border="1">
<thead>
Expand All @@ -70,6 +101,7 @@
</thead>
<tbody>
{{ range .PartitionInfo }}
<tr>
<td>{{ .Partition }}</td>
<td>{{ .Lag }}</td>
<td>{{ .EndOffset }}</td>
Expand Down
72 changes: 72 additions & 0 deletions pkg/blockbuilder/scheduler/status_preview_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
//go:build preview

package scheduler

import (
"fmt"
"net/http/httptest"
"testing"
"time"

"github.com/twmb/franz-go/pkg/kadm"

"github.com/grafana/loki/v3/pkg/blockbuilder/types"
)

// TestPreview is a utility test that runs a local server with the status page.
// Run it with: go test -tags=preview -v -run TestPreview
func TestPreview(t *testing.T) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: i guess we could just remove status_test.go since this is a better way to preview the page

// Setup mock data with varied timestamps
now := time.Now()
mockLister := &mockQueueLister{
pendingJobs: []JobWithMetadata{
{Job: types.NewJob(11, types.Offsets{Min: 11, Max: 20}), UpdateTime: now.Add(-2 * time.Hour), Priority: 23},
{Job: types.NewJob(22, types.Offsets{Min: 21, Max: 30}), UpdateTime: now.Add(-1 * time.Hour), Priority: 42},
{Job: types.NewJob(33, types.Offsets{Min: 22, Max: 40}), UpdateTime: now.Add(-30 * time.Minute), Priority: 11},
},
inProgressJobs: []JobWithMetadata{
{Job: types.NewJob(44, types.Offsets{Min: 1, Max: 10}), StartTime: now.Add(-4 * time.Hour), UpdateTime: now.Add(-3 * time.Hour)},
{Job: types.NewJob(55, types.Offsets{Min: 11, Max: 110}), StartTime: now.Add(-5 * time.Hour), UpdateTime: now.Add(-4 * time.Hour)},
},
completedJobs: []JobWithMetadata{
{Job: types.NewJob(66, types.Offsets{Min: 1, Max: 50}), StartTime: now.Add(-8 * time.Hour), UpdateTime: now.Add(-7 * time.Hour), Status: types.JobStatusComplete},
{Job: types.NewJob(77, types.Offsets{Min: 51, Max: 100}), StartTime: now.Add(-6 * time.Hour), UpdateTime: now.Add(-5 * time.Hour), Status: types.JobStatusComplete},
{Job: types.NewJob(88, types.Offsets{Min: 101, Max: 150}), StartTime: now.Add(-4 * time.Hour), UpdateTime: now.Add(-3 * time.Hour), Status: types.JobStatusFailed},
{Job: types.NewJob(99, types.Offsets{Min: 151, Max: 200}), StartTime: now.Add(-2 * time.Hour), UpdateTime: now.Add(-1 * time.Hour), Status: types.JobStatusComplete},
},
}

mockReader := &mockOffsetReader{
groupLag: map[int32]kadm.GroupMemberLag{
0: {
Lag: 10,
Partition: 3,
End: kadm.ListedOffset{Offset: 100},
Commit: kadm.Offset{At: 90},
},
1: {
Lag: 0,
Partition: 1,
End: kadm.ListedOffset{Offset: 100},
Commit: kadm.Offset{At: 100},
},
2: {
Lag: 233,
Partition: 2,
End: kadm.ListedOffset{Offset: 333},
Commit: kadm.Offset{At: 100},
},
},
}

handler := newStatusPageHandler(mockLister, mockReader, time.Hour)

// Start local server
server := httptest.NewServer(handler)
defer server.Close()

fmt.Printf("\n\n=== Preview server running ===\nOpen this URL in your browser:\n%s\nPress Ctrl+C to stop the server\n\n", server.URL)

// Keep server running until interrupted
select {}
}
5 changes: 5 additions & 0 deletions pkg/blockbuilder/scheduler/status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
type mockQueueLister struct {
pendingJobs []JobWithMetadata
inProgressJobs []JobWithMetadata
completedJobs []JobWithMetadata
}

func (m *mockQueueLister) ListPendingJobs() []JobWithMetadata {
Expand All @@ -25,6 +26,10 @@ func (m *mockQueueLister) ListInProgressJobs() []JobWithMetadata {
return m.inProgressJobs
}

func (m *mockQueueLister) ListCompletedJobs() []JobWithMetadata {
return m.completedJobs
}

func TestStatusPageHandler_ServeHTTP(t *testing.T) {
t.Skip("skipping. only added to inspect the generated status page.")

Expand Down
Loading