Skip to content
Prev Previous commit
Next Next commit
rename index.Writer -> index.Creator + adds new writer ifc
Signed-off-by: Owen Diehl <ow.diehl@gmail.com>
  • Loading branch information
owen-d committed Oct 31, 2024
commit 9d34fcd243ccdb2da39b7b4434c17449e74017f3
2 changes: 1 addition & 1 deletion pkg/storage/stores/shipper/indexshipper/tsdb/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (b *Builder) Build(
name := fmt.Sprintf("%s-%x.staging", index.IndexFilename, rng)
tmpPath := filepath.Join(scratchDir, name)

var writer *index.Writer
var writer *index.Creator

writer, err = index.NewWriterWithVersion(ctx, b.version, tmpPath)
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,7 @@ func TestChunkEncodingRoundTrip(t *testing.T) {
} {
t.Run(fmt.Sprintf("version %d nChks %d pageSize %d", version, nChks, pageSize), func(t *testing.T) {
chks := mkChks(nChks)
var w Writer
var w Creator
w.Version = version
primary := encoding.EncWrap(tsdb_enc.Encbuf{B: make([]byte, 0)})
scratch := encoding.EncWrap(tsdb_enc.Encbuf{B: make([]byte, 0)})
Expand Down Expand Up @@ -583,7 +583,7 @@ func TestSearchWithPageMarkers(t *testing.T) {
},
} {
t.Run(fmt.Sprintf("%s-pagesize-%d", tc.desc, pageSize), func(t *testing.T) {
var w Writer
var w Creator
w.Version = FormatV3
primary := encoding.EncWrap(tsdb_enc.Encbuf{B: make([]byte, 0)})
scratch := encoding.EncWrap(tsdb_enc.Encbuf{B: make([]byte, 0)})
Expand Down Expand Up @@ -697,7 +697,7 @@ func TestDecoderChunkStats(t *testing.T) {
},
} {
t.Run(fmt.Sprintf("%s_version=%d_pageSize=%d", tc.desc, version, pageSize), func(t *testing.T) {
var w Writer
var w Creator
w.Version = version
primary := encoding.EncWrap(tsdb_enc.Encbuf{B: make([]byte, 0)})
scratch := encoding.EncWrap(tsdb_enc.Encbuf{B: make([]byte, 0)})
Expand All @@ -722,7 +722,7 @@ func BenchmarkChunkStats(b *testing.B) {
from, through := int64(nChks*40/100), int64(nChks*60/100)
for _, version := range []int{FormatV2, FormatV3} {
b.Run(fmt.Sprintf("version %d/%d chunks", version, nChks), func(b *testing.B) {
var w Writer
var w Creator
w.Version = version
primary := encoding.EncWrap(tsdb_enc.Encbuf{B: make([]byte, 0)})
scratch := encoding.EncWrap(tsdb_enc.Encbuf{B: make([]byte, 0)})
Expand All @@ -747,7 +747,7 @@ func BenchmarkReadChunks(b *testing.B) {
from, through := int64(nChks*40/100), int64(nChks*60/100)
for _, version := range []int{FormatV2, FormatV3} {
b.Run(fmt.Sprintf("version %d/%d chunks", version, nChks), func(b *testing.B) {
var w Writer
var w Creator
w.Version = version
primary := encoding.EncWrap(tsdb_enc.Encbuf{B: make([]byte, 0)})
scratch := encoding.EncWrap(tsdb_enc.Encbuf{B: make([]byte, 0)})
Expand Down
54 changes: 27 additions & 27 deletions pkg/storage/stores/shipper/indexshipper/tsdb/index/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,9 @@ type symbolCacheEntry struct {
lastValueIndex uint32
}

// Writer implements the IndexWriter interface for the standard
// Creator implements the IndexWriter interface for the standard
// serialization format.
type Writer struct {
type Creator struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think Writer was still fine.

ctx context.Context

// For the main index file.
Expand Down Expand Up @@ -210,7 +210,7 @@ func NewTOCFromByteSlice(bs ByteSlice) (*TOC, error) {
}, nil
}

func NewWriterWithVersion(ctx context.Context, version int, fn string) (*Writer, error) {
func NewWriterWithVersion(ctx context.Context, version int, fn string) (*Creator, error) {
dir := filepath.Dir(fn)

df, err := fileutil.OpenDir(dir)
Expand Down Expand Up @@ -242,7 +242,7 @@ func NewWriterWithVersion(ctx context.Context, version int, fn string) (*Writer,
return nil, errors.Wrap(err, "sync dir")
}

iw := &Writer{
iw := &Creator{
Version: version,
ctx: ctx,
f: f,
Expand All @@ -265,25 +265,25 @@ func NewWriterWithVersion(ctx context.Context, version int, fn string) (*Writer,
}

// NewWriter returns a new Writer to the given filename.
func NewWriter(ctx context.Context, indexFormat int, fn string) (*Writer, error) {
func NewWriter(ctx context.Context, indexFormat int, fn string) (*Creator, error) {
return NewWriterWithVersion(ctx, indexFormat, fn)
}

func (w *Writer) write(bufs ...[]byte) error {
func (w *Creator) write(bufs ...[]byte) error {
return w.f.WriteBufs(bufs...)
}

func (w *Writer) writeAt(buf []byte, pos uint64) error {
func (w *Creator) writeAt(buf []byte, pos uint64) error {
return w.f.WriteAt(buf, pos)
}

func (w *Writer) addPadding(size int) error {
func (w *Creator) addPadding(size int) error {
return w.f.AddPadding(size)
}

// ensureStage handles transitions between write stages and ensures that IndexWriter
// methods are called in an order valid for the implementation.
func (w *Writer) ensureStage(s indexWriterStage) error {
func (w *Creator) ensureStage(s indexWriterStage) error {
select {
case <-w.ctx.Done():
return w.ctx.Err()
Expand Down Expand Up @@ -356,7 +356,7 @@ func (w *Writer) ensureStage(s indexWriterStage) error {
return nil
}

func (w *Writer) writeMeta() error {
func (w *Creator) writeMeta() error {
w.buf1.Reset()
w.buf1.PutBE32(MagicIndex)
w.buf1.PutByte(byte(w.Version))
Expand All @@ -369,7 +369,7 @@ func (w *Writer) writeMeta() error {
// fingerprint differs from what labels.Hash() produces. For example,
// multitenant TSDBs embed a tenant label, but the actual series has no such
// label and so the derived fingerprint differs.
func (w *Writer) AddSeries(ref storage.SeriesRef, lset labels.Labels, fp model.Fingerprint, chunks ...ChunkMeta) error {
func (w *Creator) AddSeries(ref storage.SeriesRef, lset labels.Labels, fp model.Fingerprint, chunks ...ChunkMeta) error {
if err := w.ensureStage(idxStageSeries); err != nil {
return err
}
Expand Down Expand Up @@ -456,15 +456,15 @@ func (w *Writer) AddSeries(ref storage.SeriesRef, lset labels.Labels, fp model.F
return nil
}

func (w *Writer) addChunks(chunks []ChunkMeta, primary, scratch *encoding.Encbuf, pageSize int) {
func (w *Creator) addChunks(chunks []ChunkMeta, primary, scratch *encoding.Encbuf, pageSize int) {
if w.Version > FormatV2 {
w.addChunksV3(chunks, primary, scratch, pageSize)
return
}
w.addChunksPriorV3(chunks, primary, scratch)
}

func (w *Writer) addChunksPriorV3(chunks []ChunkMeta, primary, _ *encoding.Encbuf) {
func (w *Creator) addChunksPriorV3(chunks []ChunkMeta, primary, _ *encoding.Encbuf) {
primary.PutUvarint(len(chunks))

if len(chunks) > 0 {
Expand Down Expand Up @@ -493,7 +493,7 @@ func (w *Writer) addChunksPriorV3(chunks []ChunkMeta, primary, _ *encoding.Encbu
}
}

func (w *Writer) addChunksV3(chunks []ChunkMeta, primary, scratch *encoding.Encbuf, chunkPageSize int) {
func (w *Creator) addChunksV3(chunks []ChunkMeta, primary, scratch *encoding.Encbuf, chunkPageSize int) {
scratch.Reset()

primary.PutUvarint(len(chunks))
Expand Down Expand Up @@ -562,14 +562,14 @@ func (w *Writer) addChunksV3(chunks []ChunkMeta, primary, scratch *encoding.Encb
primary.PutBytes(scratch.Get())
}

func (w *Writer) startSymbols() error {
func (w *Creator) startSymbols() error {
// We are at w.toc.Symbols.
// Leave 4 bytes of space for the length, and another 4 for the number of symbols
// which will both be calculated later.
return w.write([]byte("alenblen"))
}

func (w *Writer) AddSymbol(sym string) error {
func (w *Creator) AddSymbol(sym string) error {
if err := w.ensureStage(idxStageSymbols); err != nil {
return err
}
Expand All @@ -583,7 +583,7 @@ func (w *Writer) AddSymbol(sym string) error {
return w.write(w.buf1.Get())
}

func (w *Writer) finishSymbols() error {
func (w *Creator) finishSymbols() error {
symbolTableSize := w.f.Pos() - w.toc.Symbols - 4
// The symbol table's <len> part is 4 bytes. So the total symbol table size must be less than or equal to 2^32-1
if symbolTableSize > math.MaxUint32 {
Expand Down Expand Up @@ -628,7 +628,7 @@ func (w *Writer) finishSymbols() error {
return nil
}

func (w *Writer) writeLabelIndices() error {
func (w *Creator) writeLabelIndices() error {
if err := w.fPO.Flush(); err != nil {
return err
}
Expand Down Expand Up @@ -681,7 +681,7 @@ func (w *Writer) writeLabelIndices() error {
return nil
}

func (w *Writer) writeLabelIndex(name string, values []uint32) error {
func (w *Creator) writeLabelIndex(name string, values []uint32) error {
// Align beginning to 4 bytes for more efficient index list scans.
if err := w.addPadding(4); err != nil {
return err
Expand Down Expand Up @@ -733,7 +733,7 @@ func (w *Writer) writeLabelIndex(name string, values []uint32) error {
}

// writeLabelIndexesOffsetTable writes the label indices offset table.
func (w *Writer) writeLabelIndexesOffsetTable() error {
func (w *Creator) writeLabelIndexesOffsetTable() error {
startPos := w.f.Pos()
// Leave 4 bytes of space for the length, which will be calculated later.
if err := w.write([]byte("alen")); err != nil {
Expand Down Expand Up @@ -777,7 +777,7 @@ func (w *Writer) writeLabelIndexesOffsetTable() error {
}

// writePostingsOffsetTable writes the postings offset table.
func (w *Writer) writePostingsOffsetTable() error {
func (w *Creator) writePostingsOffsetTable() error {
// Ensure everything is in the temporary file.
if err := w.fPO.Flush(); err != nil {
return err
Expand Down Expand Up @@ -858,7 +858,7 @@ func (w *Writer) writePostingsOffsetTable() error {
return w.write(w.buf1.Get())
}

func (w *Writer) writeFingerprintOffsetsTable() error {
func (w *Creator) writeFingerprintOffsetsTable() error {
w.buf1.Reset()
w.buf2.Reset()

Expand Down Expand Up @@ -892,7 +892,7 @@ func (w *Writer) writeFingerprintOffsetsTable() error {

const indexTOCLen = 8*9 + crc32.Size

func (w *Writer) writeTOC() error {
func (w *Creator) writeTOC() error {
w.buf1.Reset()

w.buf1.PutBE64(w.toc.Symbols)
Expand All @@ -912,7 +912,7 @@ func (w *Writer) writeTOC() error {
return w.write(w.buf1.Get())
}

func (w *Writer) writePostingsToTmpFiles() error {
func (w *Creator) writePostingsToTmpFiles() error {
names := make([]string, 0, len(w.labelNames))
for n := range w.labelNames {
names = append(names, n)
Expand Down Expand Up @@ -1037,7 +1037,7 @@ func (w *Writer) writePostingsToTmpFiles() error {
return nil
}

func (w *Writer) writePosting(name, value string, offs []uint32) error {
func (w *Creator) writePosting(name, value string, offs []uint32) error {
// Align beginning to 4 bytes for more efficient postings list scans.
if err := w.fP.AddPadding(4); err != nil {
return err
Expand Down Expand Up @@ -1075,7 +1075,7 @@ func (w *Writer) writePosting(name, value string, offs []uint32) error {
return w.fP.WriteBufs(w.buf2.Get(), w.buf1.Get())
}

func (w *Writer) writePostings() error {
func (w *Creator) writePostings() error {
// There's padding in the tmp file, make sure it actually works.
if err := w.f.AddPadding(4); err != nil {
return err
Expand Down Expand Up @@ -1119,7 +1119,7 @@ type labelIndexHashEntry struct {
offset uint64
}

func (w *Writer) Close() error {
func (w *Creator) Close() error {
// Even if this fails, we need to close all the files.
ensureErr := w.ensureStage(idxStageDone)

Expand Down