Skip to content
Prev Previous commit
Next Next commit
new writer file
Signed-off-by: Owen Diehl <ow.diehl@gmail.com>
  • Loading branch information
owen-d committed Oct 31, 2024
commit 13047c42f8a37f14ca3d2f6b78ab28444f8a6684
97 changes: 0 additions & 97 deletions pkg/storage/stores/shipper/indexshipper/tsdb/index/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
package index

import (
"bufio"
"bytes"
"context"
"encoding/binary"
Expand Down Expand Up @@ -282,102 +281,6 @@ func (w *Writer) addPadding(size int) error {
return w.f.AddPadding(size)
}

type FileWriter struct {
f *os.File
fbuf *bufio.Writer
position uint64
name string
}

func NewFileWriter(name string) (*FileWriter, error) {
f, err := os.OpenFile(name, os.O_CREATE|os.O_RDWR, 0o666)
if err != nil {
return nil, err
}
return &FileWriter{
f: f,
fbuf: bufio.NewWriterSize(f, 1<<22),
position: 0,
name: name,
}, nil
}

func (fw *FileWriter) Pos() uint64 {
return fw.position
}

func (fw *FileWriter) ReadFrom(r io.Reader) (int64, error) {
n, err := fw.fbuf.ReadFrom(r)
fw.position += uint64(n)
return n, err
}

func (fw *FileWriter) Write(p []byte) (n int, err error) {
n, err = fw.fbuf.Write(p)
fw.position += uint64(n)
if err != nil {
return n, err
}

// For now the index file must not grow beyond 64GiB. Some of the fixed-sized
// offset references in v1 are only 4 bytes large.
// Once we move to compressed/varint representations in those areas, this limitation
// can be lifted.
if fw.position > 16*math.MaxUint32 {
return n, errors.Errorf("%q exceeding max size of 64GiB", fw.name)
}
return n, nil
}

func (fw *FileWriter) WriteBufs(bufs ...[]byte) error {
for _, b := range bufs {
if _, err := fw.Write(b); err != nil {
return err
}
}
return nil
}

func (fw *FileWriter) Flush() error {
return fw.fbuf.Flush()
}

func (fw *FileWriter) WriteAt(buf []byte, pos uint64) error {
if err := fw.Flush(); err != nil {
return err
}
_, err := fw.f.WriteAt(buf, int64(pos))
return err
}

// AddPadding adds zero byte padding until the file size is a multiple size.
func (fw *FileWriter) AddPadding(size int) error {
p := fw.position % uint64(size)
if p == 0 {
return nil
}
p = uint64(size) - p

if _, err := fw.Write(make([]byte, p)); err != nil {
return errors.Wrap(err, "add padding")
}
return nil
}

func (fw *FileWriter) Close() error {
if err := fw.Flush(); err != nil {
return err
}
if err := fw.f.Sync(); err != nil {
return err
}
return fw.f.Close()
}

func (fw *FileWriter) Remove() error {
return os.Remove(fw.name)
}

// ensureStage handles transitions between write stages and ensures that IndexWriter
// methods are called in an order valid for the implementation.
func (w *Writer) ensureStage(s indexWriterStage) error {
Expand Down
106 changes: 106 additions & 0 deletions pkg/storage/stores/shipper/indexshipper/tsdb/index/writer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package index

import (
"bufio"
"io"
"math"
"os"

"github.com/pkg/errors"
)

type FileWriter struct {
f *os.File
fbuf *bufio.Writer
position uint64
name string
}

func NewFileWriter(name string) (*FileWriter, error) {
f, err := os.OpenFile(name, os.O_CREATE|os.O_RDWR, 0o666)
if err != nil {
return nil, err
}
return &FileWriter{
f: f,
fbuf: bufio.NewWriterSize(f, 1<<22),
position: 0,
name: name,
}, nil
}

func (fw *FileWriter) Pos() uint64 {
return fw.position
}

func (fw *FileWriter) ReadFrom(r io.Reader) (int64, error) {
n, err := fw.fbuf.ReadFrom(r)
fw.position += uint64(n)
return n, err
}

func (fw *FileWriter) Write(p []byte) (n int, err error) {
n, err = fw.fbuf.Write(p)
fw.position += uint64(n)
if err != nil {
return n, err
}

// For now the index file must not grow beyond 64GiB. Some of the fixed-sized
// offset references in v1 are only 4 bytes large.
// Once we move to compressed/varint representations in those areas, this limitation
// can be lifted.
if fw.position > 16*math.MaxUint32 {
return n, errors.Errorf("%q exceeding max size of 64GiB", fw.name)
}
return n, nil
}

func (fw *FileWriter) WriteBufs(bufs ...[]byte) error {
for _, b := range bufs {
if _, err := fw.Write(b); err != nil {
return err
}
}
return nil
}

func (fw *FileWriter) Flush() error {
return fw.fbuf.Flush()
}

func (fw *FileWriter) WriteAt(buf []byte, pos uint64) error {
if err := fw.Flush(); err != nil {
return err
}
_, err := fw.f.WriteAt(buf, int64(pos))
return err
}

// AddPadding adds zero byte padding until the file size is a multiple size.
func (fw *FileWriter) AddPadding(size int) error {
p := fw.position % uint64(size)
if p == 0 {
return nil
}
p = uint64(size) - p

if _, err := fw.Write(make([]byte, p)); err != nil {
return errors.Wrap(err, "add padding")
}
return nil
}

func (fw *FileWriter) Close() error {
if err := fw.Flush(); err != nil {
return err
}
if err := fw.f.Sync(); err != nil {
return err
}
return fw.f.Close()
}

func (fw *FileWriter) Remove() error {
return os.Remove(fw.name)
}