Skip to content

Compact idle blocks. #2803

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 13 commits into from
Jun 30, 2020
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
* [ENHANCEMENT] Added `-ingester.flush-on-shutdown-with-wal-enabled` option to enable chunks flushing even when WAL is enabled. #2780
* [ENHANCEMENT] Query-tee: Support for custom API prefix by using `-server.path-prefix` option. #2814
* [ENHANCEMENT] Query-tee: Forward `X-Scope-OrgId` header to backend, if present in the request. #2815
* [ENHANCEMENT] Experimental TSDB: Added `-experimental.tsdb.head-compaction-idle-timeout` option to force compaction of data in memory into a block. #2803
* [BUGFIX] Fixed a bug in the index intersect code causing storage to return more chunks/series than required. #2796
* [BUGFIX] Fixed the number of reported keys in the background cache queue. #2764
* [BUGFIX] Fix race in processing of headers in sharded queries. #2762
Expand Down
4 changes: 4 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -3049,6 +3049,10 @@ bucket_store:
# CLI flag: -experimental.tsdb.head-compaction-concurrency
[head_compaction_concurrency: <int> | default = 5]

# If TSDB head is idle for this duration, it is compacted. 0 means disabled.
# CLI flag: -experimental.tsdb.head-compaction-idle-timeout
[head_compaction_idle_timeout: <duration> | default = 1h]

# The number of shards of series to use in TSDB (must be a power of 2). Reducing
# this will decrease memory footprint, but can negatively impact performance.
# CLI flag: -experimental.tsdb.stripe-size
Expand Down
4 changes: 4 additions & 0 deletions docs/operations/blocks-storage.md
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,10 @@ tsdb:
# CLI flag: -experimental.tsdb.head-compaction-concurrency
[head_compaction_concurrency: <int> | default = 5]

# If TSDB head is idle for this duration, it is compacted. 0 means disabled.
# CLI flag: -experimental.tsdb.head-compaction-idle-timeout
[head_compaction_idle_timeout: <duration> | default = 1h]

# The number of shards of series to use in TSDB (must be a power of 2).
# Reducing this will decrease memory footprint, but can negatively impact
# performance.
Expand Down
34 changes: 33 additions & 1 deletion pkg/ingester/ingester_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/thanos-io/thanos/pkg/shipper"
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/user"
"go.uber.org/atomic"

"github.com/cortexproject/cortex/pkg/ingester/client"
"github.com/cortexproject/cortex/pkg/ring"
Expand Down Expand Up @@ -52,6 +53,9 @@ type userTSDB struct {
seriesInMetric *metricCounter
limiter *Limiter

// Used to detect idle TSDBs.
lastUpdate *atomic.Int64

// Thanos shipper used to ship blocks to the storage.
shipper Shipper

Expand Down Expand Up @@ -105,6 +109,16 @@ func (u *userTSDB) PostDeletion(metrics ...labels.Labels) {
}
}

func (u *userTSDB) isIdle(now time.Time, idle time.Duration) bool {
lu := u.lastUpdate.Load()

return time.Unix(lu, 0).Add(idle).Before(now)
}

func (u *userTSDB) setLastUpdate(t time.Time) {
u.lastUpdate.Store(t.Unix())
}

// TSDBState holds data structures used by the TSDB storage engine
type TSDBState struct {
dbs map[string]*userTSDB // tsdb sharded by userID
Expand Down Expand Up @@ -438,6 +452,8 @@ func (i *Ingester) v2Push(ctx context.Context, req *client.WriteRequest) (*clien
}
i.TSDBState.appenderCommitDuration.Observe(time.Since(startCommit).Seconds())

db.setLastUpdate(time.Now())

// Increment metrics only if the samples have been successfully committed.
// If the code didn't reach this point, it means that we returned an error
// which will be converted into an HTTP 5xx and the client should/will retry.
Expand Down Expand Up @@ -856,6 +872,7 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) {
seriesInMetric: newMetricCounter(i.limiter),
ingestedAPISamples: newEWMARate(0.2, i.cfg.RateUpdatePeriod),
ingestedRuleSamples: newEWMARate(0.2, i.cfg.RateUpdatePeriod),
lastUpdate: atomic.NewInt64(0),
}

// Create a new user database
Expand All @@ -877,6 +894,7 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) {
// We set the limiter here because we don't want to limit
// series during WAL replay.
userDB.limiter = i.limiter
userDB.setLastUpdate(time.Now()) // After WAL replay.

// Thanos shipper requires at least 1 external label to be set. For this reason,
// we set the tenant ID as external label and we'll filter it out when reading
Expand Down Expand Up @@ -1099,8 +1117,22 @@ func (i *Ingester) compactBlocks(ctx context.Context) {
return
}

// Don't do anything, if there is nothing to compact.
h := userDB.Head()
if h.NumSeries() == 0 {
return
}

var err error

i.TSDBState.compactionsTriggered.Inc()
err := userDB.Compact()
if i.cfg.TSDBConfig.HeadCompactionIdleTimeout > 0 && userDB.isIdle(time.Now(), i.cfg.TSDBConfig.HeadCompactionIdleTimeout) {
level.Debug(util.Logger).Log("msg", "Forcing compaction due to TSDB being idle")
err = userDB.CompactHead(tsdb.NewRangeHead(h, h.MinTime(), h.MaxTime()))
} else {
err = userDB.Compact()
}

if err != nil {
i.TSDBState.compactionsFailed.Inc()
level.Warn(util.Logger).Log("msg", "TSDB blocks compaction for user has failed", "user", userID, "err", err)
Expand Down
83 changes: 83 additions & 0 deletions pkg/ingester/ingester_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1410,3 +1410,86 @@ func Test_Ingester_v2AllUserStats(t *testing.T) {
}
assert.ElementsMatch(t, expect, res.Stats)
}

func TestIngesterCompactIdleBlock(t *testing.T) {
cfg := defaultIngesterTestConfig()
cfg.LifecyclerConfig.JoinAfter = 0
cfg.TSDBConfig.ShipConcurrency = 1
cfg.TSDBConfig.HeadCompactionInterval = 1 * time.Hour // Long enough to not be reached during the test.
cfg.TSDBConfig.HeadCompactionIdleTimeout = 1 * time.Second // Testing this.

r := prometheus.NewRegistry()

// Create ingester
i, cleanup, err := newIngesterMockWithTSDBStorage(cfg, r)
require.NoError(t, err)
t.Cleanup(cleanup)

require.NoError(t, services.StartAndAwaitRunning(context.Background(), i))
t.Cleanup(func() {
_ = services.StopAndAwaitTerminated(context.Background(), i)
})

// Wait until it's ACTIVE
test.Poll(t, 10*time.Millisecond, ring.ACTIVE, func() interface{} {
return i.lifecycler.GetState()
})

pushSample(t, i, time.Now(), 0)

i.compactBlocks(context.Background())
verifyCompactedHead(t, i, false)
require.NoError(t, testutil.GatherAndCompare(r, strings.NewReader(`
# HELP cortex_ingester_memory_series_created_total The total number of series that were created per user.
# TYPE cortex_ingester_memory_series_created_total counter
cortex_ingester_memory_series_created_total{user="1"} 1

# HELP cortex_ingester_memory_series_removed_total The total number of series that were removed per user.
# TYPE cortex_ingester_memory_series_removed_total counter
cortex_ingester_memory_series_removed_total{user="1"} 0
`), memSeriesCreatedTotalName, memSeriesRemovedTotalName))

// wait one second -- TSDB is now idle.
time.Sleep(cfg.TSDBConfig.HeadCompactionIdleTimeout)

i.compactBlocks(context.Background())
verifyCompactedHead(t, i, true)
require.NoError(t, testutil.GatherAndCompare(r, strings.NewReader(`
# HELP cortex_ingester_memory_series_created_total The total number of series that were created per user.
# TYPE cortex_ingester_memory_series_created_total counter
cortex_ingester_memory_series_created_total{user="1"} 1

# HELP cortex_ingester_memory_series_removed_total The total number of series that were removed per user.
# TYPE cortex_ingester_memory_series_removed_total counter
cortex_ingester_memory_series_removed_total{user="1"} 1
`), memSeriesCreatedTotalName, memSeriesRemovedTotalName))

// Pushing another sample still works.
pushSample(t, i, time.Now(), 0)
verifyCompactedHead(t, i, false)

require.NoError(t, testutil.GatherAndCompare(r, strings.NewReader(`
# HELP cortex_ingester_memory_series_created_total The total number of series that were created per user.
# TYPE cortex_ingester_memory_series_created_total counter
cortex_ingester_memory_series_created_total{user="1"} 2

# HELP cortex_ingester_memory_series_removed_total The total number of series that were removed per user.
# TYPE cortex_ingester_memory_series_removed_total counter
cortex_ingester_memory_series_removed_total{user="1"} 1
`), memSeriesCreatedTotalName, memSeriesRemovedTotalName))
}

func verifyCompactedHead(t *testing.T, i *Ingester, expected bool) {
db := i.getTSDB(userID)
require.NotNil(t, db)

h := db.Head()
require.Equal(t, expected, h.NumSeries() == 0)
}

func pushSample(t *testing.T, i *Ingester, ts time.Time, val float64) {
ctx := user.InjectOrgID(context.Background(), userID)
req, _, _ := mockWriteRequest(labels.Labels{{Name: labels.MetricName, Value: "test"}}, val, util.TimeToMillis(ts))
_, err := i.v2Push(ctx, req)
require.NoError(t, err)
}
2 changes: 2 additions & 0 deletions pkg/storage/tsdb/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ type Config struct {
BucketStore BucketStoreConfig `yaml:"bucket_store"`
HeadCompactionInterval time.Duration `yaml:"head_compaction_interval"`
HeadCompactionConcurrency int `yaml:"head_compaction_concurrency"`
HeadCompactionIdleTimeout time.Duration `yaml:"head_compaction_idle_timeout"`
StripeSize int `yaml:"stripe_size"`
WALCompressionEnabled bool `yaml:"wal_compression_enabled"`
StoreGatewayEnabled bool `yaml:"store_gateway_enabled"`
Expand Down Expand Up @@ -139,6 +140,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&cfg.MaxTSDBOpeningConcurrencyOnStartup, "experimental.tsdb.max-tsdb-opening-concurrency-on-startup", 10, "limit the number of concurrently opening TSDB's on startup")
f.DurationVar(&cfg.HeadCompactionInterval, "experimental.tsdb.head-compaction-interval", 1*time.Minute, "How frequently does Cortex try to compact TSDB head. Block is only created if data covers smallest block range. Must be greater than 0 and max 5 minutes.")
f.IntVar(&cfg.HeadCompactionConcurrency, "experimental.tsdb.head-compaction-concurrency", 5, "Maximum number of tenants concurrently compacting TSDB head into a new block")
f.DurationVar(&cfg.HeadCompactionIdleTimeout, "experimental.tsdb.head-compaction-idle-timeout", 1*time.Hour, "If TSDB head is idle for this duration, it is compacted. 0 means disabled.")
f.IntVar(&cfg.StripeSize, "experimental.tsdb.stripe-size", 16384, "The number of shards of series to use in TSDB (must be a power of 2). Reducing this will decrease memory footprint, but can negatively impact performance.")
f.BoolVar(&cfg.WALCompressionEnabled, "experimental.tsdb.wal-compression-enabled", false, "True to enable TSDB WAL compression.")
f.BoolVar(&cfg.StoreGatewayEnabled, "experimental.tsdb.store-gateway-enabled", false, "True if the Cortex cluster is running the store-gateway service and the querier should query the bucket store via the store-gateway.")
Expand Down