Skip to content
Prev Previous commit
Next Next commit
tsdb creator can optionally return the io.ReadCloser of its bytes
Signed-off-by: Owen Diehl <ow.diehl@gmail.com>
  • Loading branch information
owen-d committed Nov 1, 2024
commit 427505d0dd09f7943fa306a47a6cf930504b941c
2 changes: 1 addition & 1 deletion pkg/storage/stores/shipper/indexshipper/tsdb/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ func (b *Builder) Build(
}
}

if err := writer.Close(); err != nil {
if _, err := writer.Close(false); err != nil {
return id, err
}

Expand Down
29 changes: 22 additions & 7 deletions pkg/storage/stores/shipper/indexshipper/tsdb/index/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
"github.com/prometheus/prometheus/tsdb/fileutil"

"github.com/grafana/dskit/multierror"
"github.com/grafana/loki/v3/pkg/util/encoding"
)

Expand Down Expand Up @@ -1113,24 +1114,38 @@ type labelIndexHashEntry struct {
offset uint64
}

func (w *Creator) Close() error {
// Even if this fails, we need to close all the files.
ensureErr := w.ensureStage(idxStageDone)
// if reader is true, return an io.ReadCloser of the underlying index. Otherwise, it returns nil.
func (w *Creator) Close(reader bool) (db io.ReadCloser, err error) {
var errs multierror.MultiError

if ensureErr := w.ensureStage(idxStageDone); ensureErr != nil {
errs.Add(ensureErr)
}

if w.fP != nil {
if err := w.fP.Close(); err != nil {
return err
errs.Add(err)
}
}

if w.fPO != nil {
if err := w.fPO.Close(); err != nil {
return err
errs.Add(err)
}
}

if err := w.f.Close(); err != nil {
return err
errs.Add(err)
}

if err := errs.Err(); err != nil {
return nil, err
}

if reader {
return w.f.Load()
}
return ensureErr
return nil, nil
}

// StringIter iterates over a sorted list of strings.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,8 @@ func TestIndexRW_Create_Open(t *testing.T) {
// An empty index must still result in a readable file.
iw, err := NewWriter(context.Background(), FormatV3, fn)
require.NoError(t, err)
require.NoError(t, iw.Close())
_, err = iw.Close()
require.NoError(t, err)

ir, err := NewFileReader(fn)
require.NoError(t, err)
Expand Down Expand Up @@ -178,7 +179,8 @@ func TestIndexRW_Postings(t *testing.T) {
require.NoError(t, iw.AddSeries(3, series[2], model.Fingerprint(series[2].Hash())))
require.NoError(t, iw.AddSeries(4, series[3], model.Fingerprint(series[3].Hash())))

require.NoError(t, iw.Close())
_, err = iw.Close()
require.NoError(t, err)

ir, err := NewFileReader(fn)
require.NoError(t, err)
Expand Down Expand Up @@ -266,7 +268,8 @@ func TestPostingsMany(t *testing.T) {
for i, s := range series {
require.NoError(t, iw.AddSeries(storage.SeriesRef(i), s, model.Fingerprint(s.Hash())))
}
require.NoError(t, iw.Close())
_, err = iw.Close()
require.NoError(t, err)

ir, err := NewFileReader(fn)
require.NoError(t, err)
Expand Down Expand Up @@ -406,7 +409,7 @@ func TestPersistence_index_e2e(t *testing.T) {
postings.Add(storage.SeriesRef(i), s.labels)
}

err = iw.Close()
_, err = iw.Close()
require.NoError(t, err)

ir, err := NewFileReader(filepath.Join(dir, IndexFilename))
Expand Down Expand Up @@ -741,7 +744,7 @@ func TestDecoder_ChunkSamples(t *testing.T) {
require.NoError(t, err)
}

err = iw.Close()
_, err = iw.Close()
require.NoError(t, err)

ir, err := NewFileReader(filepath.Join(dir, name))
Expand Down Expand Up @@ -997,7 +1000,7 @@ func BenchmarkInitReader_ReadOffsetTable(b *testing.B) {
require.NoError(b, err)
}

err = iw.Close()
_, err = iw.Close()
require.NoError(b, err)

bs, err := os.ReadFile(idxFile)
Expand Down
21 changes: 17 additions & 4 deletions pkg/storage/stores/shipper/indexshipper/tsdb/index/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ type writer interface {
Flush() error
// Returns the underlying bytes of the writer and sets the Pos to the end
Bytes() ([]byte, error)

// Used at the end to return the built file. Left as a implementable method rather than being
// done via Bytes() to allow optimizations (e.g. avoid loading whole index into memory when unused)
Load() (io.ReadCloser, error)
}

type FileWriter struct {
Expand Down Expand Up @@ -122,6 +126,14 @@ func (fw *FileWriter) Close() error {
return fw.f.Close()
}

func (fw *FileWriter) Load() (io.ReadCloser, error) {
f, err := os.Open(fw.name)
if err != nil {
return nil, err
}
return f, nil
}

func (fw *FileWriter) Remove() error {
return os.Remove(fw.name)
}
Expand Down Expand Up @@ -214,11 +226,12 @@ func (bw *MemWriter) Bytes() ([]byte, error) {
return bw.buf.Bytes(), nil
}

func (bw *MemWriter) Close() error {
bw.buf.Reset()
return nil
}
func (bw *MemWriter) Close() error { return nil }

func (bw *MemWriter) Flush() error { return nil }

func (bw *MemWriter) Remove() error { return nil }

func (bw *MemWriter) Load() (io.ReadCloser, error) {
return io.NopCloser(bytes.NewReader(bw.buf.Bytes())), nil
}
Loading