Skip to content
Prev Previous commit
Next Next commit
uses writer, removes mmap from tsdb writing
Signed-off-by: Owen Diehl <ow.diehl@gmail.com>
  • Loading branch information
owen-d committed Oct 31, 2024
commit fddaddace593805f60dfb2a9d220da6d0d73a24b
81 changes: 35 additions & 46 deletions pkg/storage/stores/shipper/indexshipper/tsdb/index/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,13 +111,13 @@ type symbolCacheEntry struct {
type Creator struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think Writer was still fine.

ctx context.Context

// For the main index file.
f *FileWriter
// For the main index.
f writer

// Temporary file for postings.
fP *FileWriter
// Temporary file for posting offsets table.
fPO *FileWriter
// Temporary writer for postings.
fP writer
// Temporary writer for posting offsets table.
fPO writer
cntPO uint64

toc TOC
Expand All @@ -130,7 +130,6 @@ type Creator struct {

numSymbols int
symbols *Symbols
symbolFile *fileutil.MmapFile
lastSymbol string
symbolCache map[string]symbolCacheEntry

Expand Down Expand Up @@ -273,8 +272,9 @@ func (w *Creator) write(bufs ...[]byte) error {
return w.f.WriteBufs(bufs...)
}

func (w *Creator) writeAt(buf []byte, pos uint64) error {
return w.f.WriteAt(buf, pos)
func (w *Creator) writeAt(buf []byte, pos int64) error {
_, err := w.f.WriteAt(buf, pos)
return err
}

func (w *Creator) addPadding(size int) error {
Expand Down Expand Up @@ -594,7 +594,7 @@ func (w *Creator) finishSymbols() error {
w.buf1.Reset()
w.buf1.PutBE32int(int(symbolTableSize))
w.buf1.PutBE32int(w.numSymbols)
if err := w.writeAt(w.buf1.Get(), w.toc.Symbols); err != nil {
if err := w.writeAt(w.buf1.Get(), int64(w.toc.Symbols)); err != nil {
return err
}

Expand All @@ -608,20 +608,22 @@ func (w *Creator) finishSymbols() error {
return err
}

sf, err := fileutil.OpenMmapFile(w.f.name)
symbolBytes, err := w.f.Bytes()
if err != nil {
return err
}
w.symbolFile = sf
hash := crc32.Checksum(w.symbolFile.Bytes()[w.toc.Symbols+4:hashPos], castagnoliTable)
hash := crc32.Checksum(symbolBytes[w.toc.Symbols+4:hashPos], castagnoliTable)
w.buf1.Reset()
w.buf1.PutBE32(hash)
if err := w.writeAt(w.buf1.Get(), hashPos); err != nil {
if err := w.writeAt(w.buf1.Get(), int64(hashPos)); err != nil {
return err
}

// Load in the symbol table efficiently for the rest of the index writing.
w.symbols, err = NewSymbols(RealByteSlice(w.symbolFile.Bytes()), w.Version, int(w.toc.Symbols))
// Now that the checksum has been calculated and written to disk, also copy it
// into the bytes that were read into memory before the checksum was written, so
// they can be used to load the symbol table efficiently for the rest of the index writing.
copy(symbolBytes[hashPos:], w.buf1.Get())
w.symbols, err = NewSymbols(RealByteSlice(symbolBytes), w.Version, int(w.toc.Symbols))
if err != nil {
return errors.Wrap(err, "read symbols")
}
Expand All @@ -633,14 +635,12 @@ func (w *Creator) writeLabelIndices() error {
return err
}

// Find all the label values in the tmp posting offset table.
f, err := fileutil.OpenMmapFile(w.fPO.name)
pOffsets, err := w.fPO.Bytes()
if err != nil {
return err
}
defer f.Close()

d := encoding.DecWrap(tsdb_enc.NewDecbufRaw(RealByteSlice(f.Bytes()), int(w.fPO.Pos())))
d := encoding.DecWrap(tsdb_enc.NewDecbufRaw(RealByteSlice(pOffsets), int(w.fPO.Pos())))
cnt := w.cntPO
current := []byte{}
values := []uint32{}
Expand Down Expand Up @@ -723,7 +723,7 @@ func (w *Creator) writeLabelIndex(name string, values []uint32) error {
return errors.Errorf("label index size exceeds 4 bytes: %d", l)
}
w.buf1.PutBE32int(int(l))
if err := w.writeAt(w.buf1.Get(), startPos); err != nil {
if err := w.writeAt(w.buf1.Get(), int64(startPos)); err != nil {
return err
}

Expand Down Expand Up @@ -767,7 +767,7 @@ func (w *Creator) writeLabelIndexesOffsetTable() error {
return errors.Errorf("label indexes offset table size exceeds 4 bytes: %d", l)
}
w.buf1.PutBE32int(int(l))
if err := w.writeAt(w.buf1.Get(), startPos); err != nil {
if err := w.writeAt(w.buf1.Get(), int64(startPos)); err != nil {
return err
}

Expand Down Expand Up @@ -801,16 +801,12 @@ func (w *Creator) writePostingsOffsetTable() error {
return err
}

f, err := fileutil.OpenMmapFile(w.fPO.name)
pOffsets, err := w.fPO.Bytes()
if err != nil {
return err
}
defer func() {
if f != nil {
f.Close()
}
}()
d := encoding.DecWrap(tsdb_enc.NewDecbufRaw(RealByteSlice(f.Bytes()), int(w.fPO.Pos())))

d := encoding.DecWrap(tsdb_enc.NewDecbufRaw(RealByteSlice(pOffsets), int(w.fPO.Pos())))
cnt := w.cntPO
for d.Err() == nil && cnt > 0 {
w.buf1.Reset()
Expand All @@ -828,11 +824,6 @@ func (w *Creator) writePostingsOffsetTable() error {
return d.Err()
}

// Cleanup temporary file.
if err := f.Close(); err != nil {
return err
}
f = nil
if err := w.fPO.Close(); err != nil {
return err
}
Expand All @@ -848,7 +839,7 @@ func (w *Creator) writePostingsOffsetTable() error {
return errors.Errorf("postings offset table size exceeds 4 bytes: %d", l)
}
w.buf1.PutBE32int(int(l))
if err := w.writeAt(w.buf1.Get(), startPos); err != nil {
if err := w.writeAt(w.buf1.Get(), int64(startPos)); err != nil {
return err
}

Expand Down Expand Up @@ -922,15 +913,15 @@ func (w *Creator) writePostingsToTmpFiles() error {
if err := w.f.Flush(); err != nil {
return err
}
f, err := fileutil.OpenMmapFile(w.f.name)

b, err := w.f.Bytes()
if err != nil {
return err
}
defer f.Close()

// Write out the special all posting.
offsets := []uint32{}
d := encoding.DecWrap(tsdb_enc.NewDecbufRaw(RealByteSlice(f.Bytes()), int(w.toc.LabelIndices)))
d := encoding.DecWrap(tsdb_enc.NewDecbufRaw(RealByteSlice(b), int(w.toc.LabelIndices)))
d.Skip(int(w.toc.Series))
for d.Len() > 0 {
d.ConsumePadding()
Expand Down Expand Up @@ -976,7 +967,7 @@ func (w *Creator) writePostingsToTmpFiles() error {
// Label name -> label value -> positions.
postings := map[uint32]map[uint32][]uint32{}

d := encoding.DecWrap(tsdb_enc.NewDecbufRaw(RealByteSlice(f.Bytes()), int(w.toc.LabelIndices)))
d := encoding.DecWrap(tsdb_enc.NewDecbufRaw(RealByteSlice(b), int(w.toc.LabelIndices)))
d.Skip(int(w.toc.Series))
for d.Len() > 0 {
d.ConsumePadding()
Expand Down Expand Up @@ -1086,11 +1077,14 @@ func (w *Creator) writePostings() error {
if err := w.fP.Flush(); err != nil {
return err
}
if _, err := w.fP.f.Seek(0, 0); err != nil {
// NB(owen-d): inefficient, but avoids the complexity of `Pos()` altering `Seek` logic.
postings, err := w.fP.Bytes()
if err != nil {
return err
}

// Don't need to calculate a checksum, so can copy directly.
n, err := io.CopyBuffer(w.f, w.fP.f, make([]byte, 1<<20))
n, err := io.CopyBuffer(w.f, bytes.NewReader(postings), make([]byte, 1<<20))
if err != nil {
return err
}
Expand Down Expand Up @@ -1123,11 +1117,6 @@ func (w *Creator) Close() error {
// Even if this fails, we need to close all the files.
ensureErr := w.ensureStage(idxStageDone)

if w.symbolFile != nil {
if err := w.symbolFile.Close(); err != nil {
return err
}
}
if w.fP != nil {
if err := w.fP.Close(); err != nil {
return err
Expand Down