Skip to content
Next Next commit
filewriter refactoring prep: remove external use of private field
ReaderFrom & Writer impl
Signed-off-by: Owen Diehl <ow.diehl@gmail.com>
  • Loading branch information
owen-d committed Oct 31, 2024
commit 1c596dede383b5a8ab09b64ec443980a94d61b1b
117 changes: 65 additions & 52 deletions pkg/storage/stores/shipper/indexshipper/tsdb/index/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ func NewWriter(ctx context.Context, indexFormat int, fn string) (*Writer, error)
}

func (w *Writer) write(bufs ...[]byte) error {
return w.f.Write(bufs...)
return w.f.WriteBufs(bufs...)
}

func (w *Writer) writeAt(buf []byte, pos uint64) error {
Expand All @@ -283,10 +283,10 @@ func (w *Writer) addPadding(size int) error {
}

type FileWriter struct {
f *os.File
fbuf *bufio.Writer
pos uint64
name string
f *os.File
fbuf *bufio.Writer
position uint64
name string
}

func NewFileWriter(name string) (*FileWriter, error) {
Expand All @@ -295,31 +295,45 @@ func NewFileWriter(name string) (*FileWriter, error) {
return nil, err
}
return &FileWriter{
f: f,
fbuf: bufio.NewWriterSize(f, 1<<22),
pos: 0,
name: name,
f: f,
fbuf: bufio.NewWriterSize(f, 1<<22),
position: 0,
name: name,
}, nil
}

func (fw *FileWriter) Pos() uint64 {
return fw.pos
return fw.position
}

func (fw *FileWriter) Write(bufs ...[]byte) error {
// ReadFrom copies data from r into the buffered writer and advances the
// tracked position by the number of bytes consumed. It returns that byte
// count together with whatever error the underlying bufio.Writer reported.
//
// NOTE(review): unlike Write, this path performs no size-limit check on the
// resulting position.
func (fw *FileWriter) ReadFrom(r io.Reader) (int64, error) {
	copied, err := fw.fbuf.ReadFrom(r)
	// Account for the bytes even on error so Pos() reflects what was buffered.
	fw.position += uint64(copied)
	return copied, err
}

// Write appends p to the buffered writer and advances the tracked position by
// the number of bytes accepted. It returns that count and either the error
// from the underlying bufio.Writer or, if the write succeeded but pushed the
// position past the size limit, a "max size" error naming the file.
func (fw *FileWriter) Write(p []byte) (n int, err error) {
	// For now the index file must not grow beyond 64GiB. Some of the
	// fixed-size offset references in v1 are only 4 bytes large. Once those
	// areas move to compressed/varint representations, this limitation can
	// be lifted.
	const maxPosition = 16 * math.MaxUint32

	n, err = fw.fbuf.Write(p)
	// The position is advanced even for a failed (partial) write.
	fw.position += uint64(n)

	switch {
	case err != nil:
		return n, err
	case fw.position > maxPosition:
		return n, errors.Errorf("%q exceeding max size of 64GiB", fw.name)
	default:
		return n, nil
	}
}

func (fw *FileWriter) WriteBufs(bufs ...[]byte) error {
for _, b := range bufs {
n, err := fw.fbuf.Write(b)
fw.pos += uint64(n)
if err != nil {
if _, err := fw.Write(b); err != nil {
return err
}
// For now the index file must not grow beyond 64GiB. Some of the fixed-sized
// offset references in v1 are only 4 bytes large.
// Once we move to compressed/varint representations in those areas, this limitation
// can be lifted.
if fw.pos > 16*math.MaxUint32 {
return errors.Errorf("%q exceeding max size of 64GiB", fw.name)
}
}
return nil
}
Expand All @@ -338,13 +352,13 @@ func (fw *FileWriter) WriteAt(buf []byte, pos uint64) error {

// AddPadding adds zero-byte padding until the file size is a multiple of size.
func (fw *FileWriter) AddPadding(size int) error {
p := fw.pos % uint64(size)
p := fw.position % uint64(size)
if p == 0 {
return nil
}
p = uint64(size) - p

if err := fw.Write(make([]byte, p)); err != nil {
if _, err := fw.Write(make([]byte, p)); err != nil {
return errors.Wrap(err, "add padding")
}
return nil
Expand Down Expand Up @@ -389,18 +403,18 @@ func (w *Writer) ensureStage(s indexWriterStage) error {
// Mark start of sections in table of contents.
switch s {
case idxStageSymbols:
w.toc.Symbols = w.f.pos
w.toc.Symbols = w.f.Pos()
if err := w.startSymbols(); err != nil {
return err
}
case idxStageSeries:
if err := w.finishSymbols(); err != nil {
return err
}
w.toc.Series = w.f.pos
w.toc.Series = w.f.Pos()

case idxStageDone:
w.toc.LabelIndices = w.f.pos
w.toc.LabelIndices = w.f.Pos()
// LabelIndices generation depends on the posting offset
// table produced at this stage.
if err := w.writePostingsToTmpFiles(); err != nil {
Expand All @@ -410,22 +424,22 @@ func (w *Writer) ensureStage(s indexWriterStage) error {
return err
}

w.toc.Postings = w.f.pos
w.toc.Postings = w.f.Pos()
if err := w.writePostings(); err != nil {
return err
}

w.toc.LabelIndicesTable = w.f.pos
w.toc.LabelIndicesTable = w.f.Pos()
if err := w.writeLabelIndexesOffsetTable(); err != nil {
return err
}

w.toc.PostingsTable = w.f.pos
w.toc.PostingsTable = w.f.Pos()
if err := w.writePostingsOffsetTable(); err != nil {
return err
}

w.toc.FingerprintOffsets = w.f.pos
w.toc.FingerprintOffsets = w.f.Pos()
if err := w.writeFingerprintOffsetsTable(); err != nil {
return err
}
Expand Down Expand Up @@ -478,8 +492,8 @@ func (w *Writer) AddSeries(ref storage.SeriesRef, lset labels.Labels, fp model.F
return errors.Errorf("failed to write padding bytes: %v", err)
}

if w.f.pos%16 != 0 {
return errors.Errorf("series write not 16-byte aligned at %d", w.f.pos)
if w.f.Pos()%16 != 0 {
return errors.Errorf("series write not 16-byte aligned at %d", w.f.Pos())
}

w.buf2.Reset()
Expand Down Expand Up @@ -528,7 +542,7 @@ func (w *Writer) AddSeries(ref storage.SeriesRef, lset labels.Labels, fp model.F
if ref%fingerprintInterval == 0 {
// series references are the 16-byte aligned offsets
// Do NOT ask me how long I debugged this particular bit >:O
sRef := w.f.pos / 16
sRef := w.f.Pos() / 16
w.fingerprintOffsets = append(w.fingerprintOffsets, [2]uint64{sRef, labelHash})
}

Expand Down Expand Up @@ -667,7 +681,7 @@ func (w *Writer) AddSymbol(sym string) error {
}

func (w *Writer) finishSymbols() error {
symbolTableSize := w.f.pos - w.toc.Symbols - 4
symbolTableSize := w.f.Pos() - w.toc.Symbols - 4
// The symbol table's <len> part is 4 bytes. So the total symbol table size must be less than or equal to 2^32-1
if symbolTableSize > math.MaxUint32 {
return errors.Errorf("symbol table size exceeds 4 bytes: %d", symbolTableSize)
Expand All @@ -681,7 +695,7 @@ func (w *Writer) finishSymbols() error {
return err
}

hashPos := w.f.pos
hashPos := w.f.Pos()
// Leave space for the hash. We can only calculate it
// now that the number of symbols is known, so mmap and do it from there.
if err := w.write([]byte("hash")); err != nil {
Expand Down Expand Up @@ -723,7 +737,7 @@ func (w *Writer) writeLabelIndices() error {
}
defer f.Close()

d := encoding.DecWrap(tsdb_enc.NewDecbufRaw(RealByteSlice(f.Bytes()), int(w.fPO.pos)))
d := encoding.DecWrap(tsdb_enc.NewDecbufRaw(RealByteSlice(f.Bytes()), int(w.fPO.Pos())))
cnt := w.cntPO
current := []byte{}
values := []uint32{}
Expand Down Expand Up @@ -772,10 +786,10 @@ func (w *Writer) writeLabelIndex(name string, values []uint32) error {

w.labelIndexes = append(w.labelIndexes, labelIndexHashEntry{
keys: []string{name},
offset: w.f.pos,
offset: w.f.Pos(),
})

startPos := w.f.pos
startPos := w.f.Pos()
// Leave 4 bytes of space for the length, which will be calculated later.
if err := w.write([]byte("alen")); err != nil {
return err
Expand All @@ -801,7 +815,7 @@ func (w *Writer) writeLabelIndex(name string, values []uint32) error {

// Write out the length.
w.buf1.Reset()
l := w.f.pos - startPos - 4
l := w.f.Pos() - startPos - 4
if l > math.MaxUint32 {
return errors.Errorf("label index size exceeds 4 bytes: %d", l)
}
Expand All @@ -817,7 +831,7 @@ func (w *Writer) writeLabelIndex(name string, values []uint32) error {

// writeLabelIndexesOffsetTable writes the label indices offset table.
func (w *Writer) writeLabelIndexesOffsetTable() error {
startPos := w.f.pos
startPos := w.f.Pos()
// Leave 4 bytes of space for the length, which will be calculated later.
if err := w.write([]byte("alen")); err != nil {
return err
Expand Down Expand Up @@ -845,7 +859,7 @@ func (w *Writer) writeLabelIndexesOffsetTable() error {
}
// Write out the length.
w.buf1.Reset()
l := w.f.pos - startPos - 4
l := w.f.Pos() - startPos - 4
if l > math.MaxUint32 {
return errors.Errorf("label indexes offset table size exceeds 4 bytes: %d", l)
}
Expand All @@ -866,7 +880,7 @@ func (w *Writer) writePostingsOffsetTable() error {
return err
}

startPos := w.f.pos
startPos := w.f.Pos()
// Leave 4 bytes of space for the length, which will be calculated later.
if err := w.write([]byte("alen")); err != nil {
return err
Expand All @@ -893,7 +907,7 @@ func (w *Writer) writePostingsOffsetTable() error {
f.Close()
}
}()
d := encoding.DecWrap(tsdb_enc.NewDecbufRaw(RealByteSlice(f.Bytes()), int(w.fPO.pos)))
d := encoding.DecWrap(tsdb_enc.NewDecbufRaw(RealByteSlice(f.Bytes()), int(w.fPO.Pos())))
cnt := w.cntPO
for d.Err() == nil && cnt > 0 {
w.buf1.Reset()
Expand Down Expand Up @@ -926,7 +940,7 @@ func (w *Writer) writePostingsOffsetTable() error {

// Write out the length.
w.buf1.Reset()
l := w.f.pos - startPos - 4
l := w.f.Pos() - startPos - 4
if l > math.MaxUint32 {
return errors.Errorf("postings offset table size exceeds 4 bytes: %d", l)
}
Expand Down Expand Up @@ -1131,8 +1145,8 @@ func (w *Writer) writePosting(name, value string, offs []uint32) error {
w.buf1.PutUvarint(2)
w.buf1.PutUvarintStr(name)
w.buf1.PutUvarintStr(value)
w.buf1.PutUvarint64(w.fP.pos) // This is relative to the postings tmp file, not the final index file.
if err := w.fPO.Write(w.buf1.Get()); err != nil {
w.buf1.PutUvarint64(w.fP.Pos()) // This is relative to the postings tmp file, not the final index file.
if err := w.fPO.WriteBufs(w.buf1.Get()); err != nil {
return err
}
w.cntPO++
Expand All @@ -1155,15 +1169,15 @@ func (w *Writer) writePosting(name, value string, offs []uint32) error {
}
w.buf2.PutBE32int(l)
w.buf1.PutHash(w.crc32)
return w.fP.Write(w.buf2.Get(), w.buf1.Get())
return w.fP.WriteBufs(w.buf2.Get(), w.buf1.Get())
}

func (w *Writer) writePostings() error {
// There's padding in the tmp file, make sure it actually works.
if err := w.f.AddPadding(4); err != nil {
return err
}
w.postingsStart = w.f.pos
w.postingsStart = w.f.Pos()

// Copy temporary file into main index.
if err := w.fP.Flush(); err != nil {
Expand All @@ -1173,14 +1187,13 @@ func (w *Writer) writePostings() error {
return err
}
// Don't need to calculate a checksum, so can copy directly.
n, err := io.CopyBuffer(w.f.fbuf, w.fP.f, make([]byte, 1<<20))
n, err := io.CopyBuffer(w.f, w.fP.f, make([]byte, 1<<20))
if err != nil {
return err
}
if uint64(n) != w.fP.pos {
return errors.Errorf("wrote %d bytes to posting temporary file, but only read back %d", w.fP.pos, n)
if uint64(n) != w.fP.Pos() {
return errors.Errorf("wrote %d bytes to posting temporary file, but only read back %d", w.fP.Pos(), n)
}
w.f.pos += uint64(n)

if err := w.fP.Close(); err != nil {
return err
Expand Down