@cyriltovena
Contributor

What this PR does / why we need it:

This adds a benchmark and some improvements to the write path of the WAL segment.

Which issue(s) this PR fixes:
Fixes https://github.com/grafana/loki-private/issues/1005 https://github.com/grafana/loki-private/issues/1004

❯ benchstat before.txt after.txt
name       old time/op    new time/op    delta
Writes-16    11.9ms ±29%     6.5ms ± 2%  -44.84%  (p=0.008 n=5+5)

name       old alloc/op   new alloc/op   delta
Writes-16    12.9MB ± 9%     0.1MB ± 1%  -98.84%  (p=0.008 n=5+5)

name       old allocs/op  new allocs/op  delta
Writes-16     2.36k ±19%     0.30k ± 0%  -87.13%  (p=0.008 n=5+5)
streams: swiss.NewMap[streamID, *streamSegment](64),
buf1: encoding.EncWith(make([]byte, 0, 4)),
func NewWalSegmentWriter() (*SegmentWriter, error) {
idxWriter, err := index.NewWriter(context.TODO())
Contributor

Do we have to pass a ctx to the index? I think we can remove it because we only ever pass context.TODO(). We also don't pass a context to the parent SegmentWriter so the utility is very limited.

Contributor Author

Yes I'll look into that.

s = streamSegmentPool.Get().(*streamSegment)
s.lbls = lbls
s.tenantID = tenantID
s.entries = s.entries[:0]
Contributor

Should a streamSegment have a reset() method?
It looks like only one line to reset them but it would maintain consistency with our other Readers/Writers.
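A minimal sketch of what such a `reset()` method could look like. The `entry` type, the string field types, and the `getSegment` helper are assumptions made for illustration; only the three assignments come from the diff above.

```go
package main

import (
	"fmt"
	"sync"
)

// entry and streamSegment are simplified stand-ins for the PR's types.
// The field types are assumptions, not the real definitions.
type entry struct {
	timestamp int64
	line      string
}

type streamSegment struct {
	lbls     string
	tenantID string
	entries  []entry
}

// reset prepares a pooled segment for reuse. Truncating entries to length
// zero keeps the backing array, so later appends can avoid reallocating.
func (s *streamSegment) reset(lbls, tenantID string) {
	s.lbls = lbls
	s.tenantID = tenantID
	s.entries = s.entries[:0]
}

var streamSegmentPool = sync.Pool{
	New: func() any { return &streamSegment{entries: make([]entry, 0, 64)} },
}

// getSegment is a hypothetical helper that fetches a segment from the pool
// and resets it in one place.
func getSegment(lbls, tenantID string) *streamSegment {
	s := streamSegmentPool.Get().(*streamSegment)
	s.reset(lbls, tenantID)
	return s
}

func main() {
	s := getSegment(`{app="a"}`, "tenant-1")
	s.entries = append(s.entries, entry{timestamp: 1, line: "hello"})
	streamSegmentPool.Put(s)

	// Whether or not the pool hands back the same object, the segment
	// starts empty and carries the new identity.
	s2 := getSegment(`{app="b"}`, "tenant-2")
	fmt.Println(len(s2.entries), s2.tenantID)
}
```

Keeping the reset logic in one method means every call site that takes a segment from the pool clears the same fields.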

Contributor

@benclive left a comment

LGTM! Just one comment on the tests but the benchmark should already cover it so happy to approve regardless.

t.Logf("Series sizes: [%s]\n", sizesString)
}

func TestReset(t *testing.T) {
Contributor

Could you add or modify this test case so that the reuse case adds more data than the initial case?
If we had reuse errors, we'd likely panic when reusing a small buffer, but we'd be less likely to panic when reusing a large buffer.
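One way the requested reuse case might be structured, sketched against a hypothetical `recordWriter` rather than the PR's `SegmentWriter` (every name below is an assumption for illustration). The second pass writes far more data than the first, so it has to grow past whatever the first pass left allocated.

```go
package main

import (
	"bytes"
	"fmt"
)

// recordWriter is a hypothetical stand-in for a resettable writer.
type recordWriter struct {
	lines []string
}

func (w *recordWriter) Append(line string) { w.lines = append(w.lines, line) }

// Reset truncates the slice but keeps its backing array for reuse.
func (w *recordWriter) Reset() { w.lines = w.lines[:0] }

// Flush writes every buffered line to dst, one per line.
func (w *recordWriter) Flush(dst *bytes.Buffer) {
	for _, l := range w.lines {
		dst.WriteString(l)
		dst.WriteByte('\n')
	}
}

// fill appends n numbered lines and returns the output expected from Flush.
func fill(w *recordWriter, prefix string, n int) string {
	var want bytes.Buffer
	for i := 0; i < n; i++ {
		line := fmt.Sprintf("%s-%d", prefix, i)
		w.Append(line)
		want.WriteString(line)
		want.WriteByte('\n')
	}
	return want.String()
}

// checkReuse writes `first` lines, resets both the writer and the
// destination, then writes `second` lines and verifies each output.
func checkReuse(first, second int) error {
	w := &recordWriter{}
	var dst bytes.Buffer

	want := fill(w, "first", first)
	w.Flush(&dst)
	if dst.String() != want {
		return fmt.Errorf("initial write mismatch")
	}

	w.Reset()
	dst.Reset()

	want = fill(w, "second", second)
	w.Flush(&dst)
	if dst.String() != want {
		return fmt.Errorf("reuse write mismatch: stale or truncated data")
	}
	return nil
}

func main() {
	// Small first pass, much larger second pass.
	if err := checkReuse(10, 1000); err != nil {
		fmt.Println("reuse check failed:", err)
		return
	}
	fmt.Println("reuse check passed")
}
```

In a real `_test.go` file, the body of `checkReuse` would sit inside a `func TestReset(t *testing.T)` and report failures through `t.Fatalf`.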

for i := 0; i < b.N; i++ {
writer := pool.Get().(*SegmentWriter)

dst.Reset()
Contributor

This benchmark isn't running in parallel (since dst is reused). You could remove the pool for writers from the benchmark, or add a pool for dst and run the whole thing in parallel :)

@benclive merged commit debb5f2 into grafana:main on Jun 24, 2024
2 participants