Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
thanos: add support for named stores
  • Loading branch information
ashwanthgoli committed Oct 29, 2024
commit ee8b34dee9f77a060b36200fc75167d8d591f843
7 changes: 6 additions & 1 deletion pkg/loki/validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"strings"

"github.com/grafana/loki/v3/pkg/storage/bucket"
"github.com/grafana/loki/v3/pkg/storage/chunk/cache"
"github.com/grafana/loki/v3/pkg/storage/config"
"github.com/grafana/loki/v3/pkg/storage/types"
Expand Down Expand Up @@ -97,7 +98,11 @@ func validateSchemaValues(c *Config) []error {
errs = append(errs, fmt.Errorf("unrecognized `store` (index) type `%s`, choose one of: %s", cfg.IndexType, strings.Join(types.SupportedIndexTypes, ", ")))
}

if !util.StringsContain(types.TestingStorageTypes, cfg.ObjectType) &&
if c.StorageConfig.UseThanosObjstore {
if !util.StringsContain(bucket.SupportedBackends, cfg.ObjectType) && !c.StorageConfig.ObjectStore.NamedStores.Exists(cfg.ObjectType) {
errs = append(errs, fmt.Errorf("unrecognized `object_store` type `%s`, which also does not match any named_stores. Choose one of: %s. Or choose a named_store", cfg.ObjectType, strings.Join(bucket.SupportedBackends, ", ")))
}
} else if !util.StringsContain(types.TestingStorageTypes, cfg.ObjectType) &&
!util.StringsContain(types.SupportedStorageTypes, cfg.ObjectType) &&
!util.StringsContain(types.DeprecatedStorageTypes, cfg.ObjectType) {
if !c.StorageConfig.NamedStores.Exists(cfg.ObjectType) {
Expand Down
88 changes: 76 additions & 12 deletions pkg/storage/bucket/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"flag"
"fmt"
"regexp"

"github.com/go-kit/log"
Expand Down Expand Up @@ -56,6 +57,8 @@ type StorageBackendConfig struct {
Swift swift.Config `yaml:"swift"`
Filesystem filesystem.Config `yaml:"filesystem"`

NamedStores NamedStores `yaml:"named_stores"`

// Used to inject additional backends into the config. Allows for this config to
// be embedded in multiple contexts and support non-object storage based backends.
ExtraBackends []string `yaml:"-"`
Expand Down Expand Up @@ -84,12 +87,11 @@ func (cfg *StorageBackendConfig) RegisterFlagsWithPrefix(prefix string, f *flag.
}

func (cfg *StorageBackendConfig) Validate() error {
// TODO: enable validation when s3 flags are registered
// if err := cfg.S3.Validate(); err != nil {
// return err
//}
if err := cfg.S3.Validate(); err != nil {
return err
}

return nil
return cfg.NamedStores.Validate()
}

// Config holds configuration for accessing long-term storage.
Expand Down Expand Up @@ -124,28 +126,90 @@ func (cfg *Config) Validate() error {
}
}

return cfg.StorageBackendConfig.Validate()
if err := cfg.StorageBackendConfig.Validate(); err != nil {
return err
}

return cfg.NamedStores.Validate()
}

// NewClient creates a new bucket client based on the configured backend
func NewClient(ctx context.Context, backend string, cfg Config, name string, logger log.Logger) (objstore.InstrumentedBucket, error) {
var (
storeType = backend
namedStore bool

client objstore.Bucket
err error
)

if st, ok := cfg.NamedStores.storeType[backend]; ok {
namedStore = true
storeType = st
}

// TODO: add support for other backends that loki already supports
switch backend {
switch storeType {
case S3:
client, err = s3.NewBucketClient(cfg.S3, name, logger)
s3Cfg := cfg.S3
if namedStore {
nsCfg, ok := cfg.NamedStores.S3[backend]
if !ok {
return nil, fmt.Errorf("Unrecognized named s3 storage config %s", backend)
}

s3Cfg = (s3.Config)(nsCfg)
}

client, err = s3.NewBucketClient(s3Cfg, name, logger)
case GCS:
client, err = gcs.NewBucketClient(ctx, cfg.GCS, name, logger)
gcsCfg := cfg.GCS
if namedStore {
nsCfg, ok := cfg.NamedStores.GCS[backend]
if !ok {
return nil, fmt.Errorf("Unrecognized named gcs storage config %s", backend)
}

gcsCfg = (gcs.Config)(nsCfg)
}

client, err = gcs.NewBucketClient(ctx, gcsCfg, name, logger)
case Azure:
client, err = azure.NewBucketClient(cfg.Azure, name, logger)
azureCfg := cfg.Azure
if namedStore {
nsCfg, ok := cfg.NamedStores.Azure[backend]
if !ok {
return nil, fmt.Errorf("Unrecognized named azure storage config %s", backend)
}

azureCfg = (azure.Config)(nsCfg)
}

client, err = azure.NewBucketClient(azureCfg, name, logger)
case Swift:
client, err = swift.NewBucketClient(cfg.Swift, name, logger)
swiftCfg := cfg.Swift
if namedStore {
nsCfg, ok := cfg.NamedStores.Swift[backend]
if !ok {
return nil, fmt.Errorf("Unrecognized named swift storage config %s", backend)
}

swiftCfg = (swift.Config)(nsCfg)
}

client, err = swift.NewBucketClient(swiftCfg, name, logger)
case Filesystem:
client, err = filesystem.NewBucketClient(cfg.Filesystem)
fsCfg := cfg.Filesystem
if namedStore {
nsCfg, ok := cfg.NamedStores.Filesystem[backend]
if !ok {
return nil, fmt.Errorf("Unrecognized named swift storage config %s", backend)
}

fsCfg = (filesystem.Config)(nsCfg)
}

client, err = filesystem.NewBucketClient(fsCfg)
default:
return nil, ErrUnsupportedStorageBackend
}
Expand Down
149 changes: 149 additions & 0 deletions pkg/storage/bucket/named_stores.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
package bucket

import (
"fmt"

"github.com/grafana/loki/v3/pkg/storage/bucket/azure"
"github.com/grafana/loki/v3/pkg/storage/bucket/filesystem"
"github.com/grafana/loki/v3/pkg/storage/bucket/gcs"
"github.com/grafana/loki/v3/pkg/storage/bucket/s3"
"github.com/grafana/loki/v3/pkg/storage/bucket/swift"

"github.com/grafana/dskit/flagext"
)

// NamedStores helps configure additional object stores from a given storage provider
type NamedStores struct {
Azure map[string]NamedAzureStorageConfig `yaml:"azure"`
Filesystem map[string]NamedFilesystemStorageConfig `yaml:"filesystem"`
GCS map[string]NamedGCSStorageConfig `yaml:"gcs"`
S3 map[string]NamedS3StorageConfig `yaml:"s3"`
Swift map[string]NamedSwiftStorageConfig `yaml:"swift"`

// contains mapping from named store reference name to store type
storeType map[string]string `yaml:"-"`
}

func (ns *NamedStores) Validate() error {
for name, s3Cfg := range ns.S3 {
if err := s3Cfg.Validate(); err != nil {
return fmt.Errorf("invalid S3 Storage config with name %s: %w", name, err)
}
}

return ns.populateStoreType()
}

func (ns *NamedStores) populateStoreType() error {
ns.storeType = make(map[string]string)

checkForDuplicates := func(name string) error {
switch name {
case S3, GCS, Azure, Swift, Filesystem:
return fmt.Errorf("named store %q should not match with the name of a predefined storage type", name)
}

if st, ok := ns.storeType[name]; ok {
return fmt.Errorf("named store %q is already defined under %s", name, st)
}

return nil
}

for name := range ns.S3 {
if err := checkForDuplicates(name); err != nil {
return err
}
ns.storeType[name] = S3
}

for name := range ns.Azure {
if err := checkForDuplicates(name); err != nil {
return err
}
ns.storeType[name] = Azure
}

for name := range ns.Filesystem {
if err := checkForDuplicates(name); err != nil {
return err
}
ns.storeType[name] = Filesystem
}

for name := range ns.GCS {
if err := checkForDuplicates(name); err != nil {
return err
}
ns.storeType[name] = GCS
}

for name := range ns.Swift {
if err := checkForDuplicates(name); err != nil {
return err
}
ns.storeType[name] = Swift
}

return nil
}

func (ns *NamedStores) Exists(name string) bool {
_, ok := ns.storeType[name]
return ok
}

// Storage configs defined as Named stores don't get any defaults as they do not
// register flags. To get around this we implement Unmarshaler interface that
// assigns the defaults before calling unmarshal.

// We cannot implement Unmarshaler directly on s3.Config or other stores
// as it would end up overriding values set as part of ApplyDynamicConfig().
// Note: we unmarshal a second time after applying dynamic configs
//
// Implementing the Unmarshaler for Named*StorageConfig types is fine as
// we do not apply any dynamic config on them.

type NamedS3StorageConfig s3.Config

// UnmarshalYAML implements the yaml.Unmarshaler interface.
func (cfg *NamedS3StorageConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
flagext.DefaultValues((*s3.Config)(cfg))
return unmarshal((*s3.Config)(cfg))
}

func (cfg *NamedS3StorageConfig) Validate() error {
return (*s3.Config)(cfg).Validate()
}

type NamedGCSStorageConfig gcs.Config

// UnmarshalYAML implements the yaml.Unmarshaler interface.
func (cfg *NamedGCSStorageConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
flagext.DefaultValues((*gcs.Config)(cfg))
return unmarshal((*gcs.Config)(cfg))
}

type NamedAzureStorageConfig azure.Config

// UnmarshalYAML implements the yaml.Unmarshaler interface.
func (cfg *NamedAzureStorageConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
flagext.DefaultValues((*azure.Config)(cfg))
return unmarshal((*azure.Config)(cfg))
}

type NamedSwiftStorageConfig swift.Config

// UnmarshalYAML implements the yaml.Unmarshaler interface.
func (cfg *NamedSwiftStorageConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
flagext.DefaultValues((*swift.Config)(cfg))
return unmarshal((*swift.Config)(cfg))
}

type NamedFilesystemStorageConfig filesystem.Config

// UnmarshalYAML implements the yaml.Unmarshaler interface.
func (cfg *NamedFilesystemStorageConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
flagext.DefaultValues((*filesystem.Config)(cfg))
return unmarshal((*filesystem.Config)(cfg))
}