
feat: support disabling the partition consumers cache #17318

Merged

6 changes: 3 additions & 3 deletions docs/sources/shared/configuration.md
@@ -1218,9 +1218,9 @@ ingest_limits_frontend:
# CLI flag: -ingest-limits-frontend.num-partitions
[num_partitions: <int> | default = 64]

# The TTL for the stream usage cache.
# CLI flag: -ingest-limits-frontend.partition-id-cache-ttl
[partition_id_cache_ttl: <duration> | default = 1m]
# The TTL for the assigned partitions cache. 0 disables the cache.
# CLI flag: -ingest-limits-frontend.assigned-partitions-cache-ttl
[assigned_partitions_cache_ttl: <duration> | default = 1m]

ingest_limits_frontend_client:
# Configures client gRPC connections to limits service.
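To make the zero value's behaviour concrete, here is a small, self-contained Go sketch (illustrative only, not part of this change) of how the new flag is interpreted: a duration of 0 disables the assigned partitions cache, while any positive duration keeps the cache with that TTL. The equivalent YAML setting is `assigned_partitions_cache_ttl: 0`.

```go
package main

import (
	"flag"
	"fmt"
	"time"
)

// Minimal sketch of the flag's semantics using a plain FlagSet rather than the
// frontend's Config type; the variable name below is illustrative only.
func main() {
	fs := flag.NewFlagSet("example", flag.ContinueOnError)
	var assignedPartitionsCacheTTL time.Duration
	fs.DurationVar(&assignedPartitionsCacheTTL,
		"ingest-limits-frontend.assigned-partitions-cache-ttl", time.Minute,
		"The TTL for the assigned partitions cache. 0 disables the cache.")

	// Simulate passing the flag on the command line to disable the cache.
	if err := fs.Parse([]string{"-ingest-limits-frontend.assigned-partitions-cache-ttl=0"}); err != nil {
		panic(err)
	}

	if assignedPartitionsCacheTTL == 0 {
		fmt.Println("assigned partitions cache disabled (NopCache)")
	} else {
		fmt.Printf("assigned partitions cache enabled, TTL=%s (TTLCache)\n", assignedPartitionsCacheTTL)
	}
}
```
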
1 change: 0 additions & 1 deletion go.mod
@@ -133,7 +133,6 @@ require (
github.com/grafana/loki/pkg/push v0.0.0-20240924133635-758364c7775f
github.com/heroku/x v0.4.3
github.com/influxdata/tdigest v0.0.2-0.20210216194612-fc98d27c9e8b
github.com/jellydator/ttlcache/v3 v3.3.0
github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db
github.com/ncw/swift/v2 v2.0.3
github.com/parquet-go/parquet-go v0.25.0
2 changes: 0 additions & 2 deletions go.sum
@@ -786,8 +786,6 @@ github.com/jcmturner/gokrb5/v8 v8.4.4 h1:x1Sv4HaTpepFkXbt2IkL29DXRf8sOfZXo8eRKh6
github.com/jcmturner/gokrb5/v8 v8.4.4/go.mod h1:1btQEpgT6k+unzCwX1KdWMEwPPkkgBtP+F6aCACiMrs=
github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZY=
github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc=
github.com/jellydator/ttlcache/v3 v3.3.0 h1:BdoC9cE81qXfrxeb9eoJi9dWrdhSuwXMAnHTbnBm4Wc=
github.com/jellydator/ttlcache/v3 v3.3.0/go.mod h1:bj2/e0l4jRnQdrnSTaGTsh4GSXvMjQcy41i7th0GVGw=
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg=
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
113 changes: 105 additions & 8 deletions pkg/limits/frontend/cache.go
@@ -1,18 +1,115 @@
package frontend

import (
"sync"
"time"

"github.com/jellydator/ttlcache/v3"

"github.com/grafana/loki/v3/pkg/logproto"
"github.com/coder/quartz"
)

type PartitionConsumersCache = ttlcache.Cache[string, *logproto.GetAssignedPartitionsResponse]
type Cache[K comparable, V any] interface {
// Get returns the value for the key. It returns true if the key exists,
// otherwise false.
Get(K) (V, bool)
// Set stores the value for the key.
Set(K, V)
// Delete removes the key. If the key does not exist, the operation is a
// no-op.
Delete(K)
// Reset removes all keys.
Reset()
}

// item contains the value and expiration time for a key.
type item[V any] struct {
value V
expiresAt time.Time
}

func (i *item[V]) hasExpired(now time.Time) bool {
return i.expiresAt.Before(now) || i.expiresAt.Equal(now)
}

// TTLCache is a simple, thread-safe cache with a single per-cache TTL.
type TTLCache[K comparable, V any] struct {
items map[K]item[V]
ttl time.Duration
mu sync.RWMutex

// Used for tests.
clock quartz.Clock
}

func NewTTLCache[K comparable, V any](ttl time.Duration) *TTLCache[K, V] {
return &TTLCache[K, V]{
items: make(map[K]item[V]),
ttl: ttl,
clock: quartz.NewReal(),
}
}

func NewPartitionConsumerCache(ttl time.Duration) *PartitionConsumersCache {
return ttlcache.New(
ttlcache.WithTTL[string, *logproto.GetAssignedPartitionsResponse](ttl),
ttlcache.WithDisableTouchOnHit[string, *logproto.GetAssignedPartitionsResponse](),
// Get implements Cache.Get.
func (c *TTLCache[K, V]) Get(key K) (V, bool) {
var (
value V
exists bool
now = c.clock.Now()
)
c.mu.RLock()
defer c.mu.RUnlock()
if item, ok := c.items[key]; ok && !item.hasExpired(now) {
value = item.value
exists = true
}
return value, exists
}

// Set implements Cache.Set.
func (c *TTLCache[K, V]) Set(key K, value V) {
now := c.clock.Now()
c.mu.Lock()
defer c.mu.Unlock()
c.items[key] = item[V]{
value: value,
expiresAt: now.Add(c.ttl),
}
c.removeExpiredItems(now)
}

// Delete implements Cache.Delete.
func (c *TTLCache[K, V]) Delete(key K) {
c.mu.Lock()
defer c.mu.Unlock()
delete(c.items, key)
}

// Reset implements Cache.Reset.
func (c *TTLCache[K, V]) Reset() {
c.mu.Lock()
defer c.mu.Unlock()
c.items = make(map[K]item[V])
}

// removeExpiredItems removes expired items.
func (c *TTLCache[K, V]) removeExpiredItems(now time.Time) {
for key, item := range c.items {
if item.hasExpired(now) {
delete(c.items, key)
}
}
}

// NopCache is a no-op cache. It does not store any keys. It is used in tests
// and as a stub for disabled caches.
type NopCache[K comparable, V any] struct{}

func NewNopCache[K comparable, V any]() *NopCache[K, V] {
return &NopCache[K, V]{}
}
func (c *NopCache[K, V]) Get(_ K) (V, bool) {
var value V
return value, false
}
func (c *NopCache[K, V]) Set(_ K, _ V) {}
func (c *NopCache[K, V]) Delete(_ K) {}
func (c *NopCache[K, V]) Reset() {}
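
For orientation, the sketch below shows a cache-aside pattern over the new Cache interface (illustrative only, not code from this PR): check the cache first, and on a miss fetch the assigned partitions and store the response. The fetch function is a hypothetical stand-in for the gRPC call the gatherer makes.

```go
package frontend

import (
	"context"

	"github.com/grafana/loki/v3/pkg/logproto"
)

// getAssignedPartitionsCached is an illustrative cache-aside helper, not part
// of this PR. With a TTL of 0 the frontend wires in NopCache, so Get always
// misses and every call falls through to fetch.
func getAssignedPartitionsCached(
	ctx context.Context,
	addr string,
	cache Cache[string, *logproto.GetAssignedPartitionsResponse],
	fetch func(context.Context, string) (*logproto.GetAssignedPartitionsResponse, error),
) (*logproto.GetAssignedPartitionsResponse, error) {
	// Serve from the cache when a fresh (non-expired) entry exists.
	if resp, ok := cache.Get(addr); ok {
		return resp, nil
	}
	// Cache miss, or cache disabled: ask the instance directly.
	resp, err := fetch(ctx, addr)
	if err != nil {
		return nil, err
	}
	// Store the response; TTLCache stamps it with the configured TTL and
	// evicts other expired entries as a side effect of Set.
	cache.Set(addr, resp)
	return resp, nil
}
```

Because NopCache's Get always reports a miss and its Set is a no-op, the same call path works unchanged when the cache is disabled.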
141 changes: 141 additions & 0 deletions pkg/limits/frontend/cache_test.go
@@ -0,0 +1,141 @@
package frontend

import (
"testing"
"time"

"github.com/coder/quartz"
"github.com/stretchr/testify/require"
)

func TestTTLCache_Get(t *testing.T) {
c := NewTTLCache[string, string](time.Minute)
clock := quartz.NewMock(t)
c.clock = clock
// The value should be absent.
value, ok := c.Get("foo")
require.Equal(t, "", value)
require.False(t, ok)
// Set the value and it should be present.
c.Set("foo", "bar")
value, ok = c.Get("foo")
require.Equal(t, "bar", value)
require.True(t, ok)
// Advance the time to be 1 second before the expiration time.
clock.Advance(59 * time.Second)
value, ok = c.Get("foo")
require.Equal(t, "bar", value)
require.True(t, ok)
// Advance the time to be equal to the expiration time, the value should
// be absent.
clock.Advance(time.Second)
value, ok = c.Get("foo")
require.Equal(t, "", value)
require.False(t, ok)
// Advance the time past the expiration time, the value should still be
// absent.
clock.Advance(time.Second)
value, ok = c.Get("foo")
require.Equal(t, "", value)
require.False(t, ok)
}

func TestTTLCache_Set(t *testing.T) {
c := NewTTLCache[string, string](time.Minute)
clock := quartz.NewMock(t)
c.clock = clock
c.Set("foo", "bar")
item1, ok := c.items["foo"]
require.True(t, ok)
require.Equal(t, c.clock.Now().Add(time.Minute), item1.expiresAt)
// Set should refresh the expiration time.
clock.Advance(time.Second)
c.Set("foo", "bar")
item2, ok := c.items["foo"]
require.True(t, ok)
require.Greater(t, item2.expiresAt, item1.expiresAt)
require.Equal(t, item2.expiresAt, item1.expiresAt.Add(time.Second))
// Set should replace the value.
c.Set("foo", "baz")
value, ok := c.Get("foo")
require.True(t, ok)
require.Equal(t, "baz", value)
}

func TestTTLCache_Delete(t *testing.T) {
c := NewTTLCache[string, string](time.Minute)
clock := quartz.NewMock(t)
c.clock = clock
// Set the value and it should be present.
c.Set("foo", "bar")
value, ok := c.Get("foo")
require.True(t, ok)
require.Equal(t, "bar", value)
// Delete the value, it should be absent.
c.Delete("foo")
value, ok = c.Get("foo")
require.False(t, ok)
require.Equal(t, "", value)
}

func TestTTLCache_Reset(t *testing.T) {
c := NewTTLCache[string, string](time.Minute)
clock := quartz.NewMock(t)
c.clock = clock
// Set two values, both should be present.
c.Set("foo", "bar")
value, ok := c.Get("foo")
require.True(t, ok)
require.Equal(t, "bar", value)
c.Set("bar", "baz")
value, ok = c.Get("bar")
require.True(t, ok)
require.Equal(t, "baz", value)
// Reset the cache, all should be absent.
c.Reset()
value, ok = c.Get("foo")
require.False(t, ok)
require.Equal(t, "", value)
value, ok = c.Get("bar")
require.False(t, ok)
require.Equal(t, "", value)
// Should be able to set values following a reset.
c.Set("baz", "qux")
value, ok = c.Get("baz")
require.True(t, ok)
require.Equal(t, "qux", value)
}

func TestTTLCache_RemoveExpiredItems(t *testing.T) {
c := NewTTLCache[string, string](time.Minute)
clock := quartz.NewMock(t)
c.clock = clock
c.Set("foo", "bar")
_, ok := c.items["foo"]
require.True(t, ok)
// Advance the clock and update foo, it should not be removed.
clock.Advance(time.Minute)
c.Set("foo", "bar")
_, ok = c.items["foo"]
require.True(t, ok)
// Advance the clock again but this time set bar, foo should be removed.
clock.Advance(time.Minute)
c.Set("bar", "baz")
_, ok = c.items["foo"]
require.False(t, ok)
_, ok = c.items["bar"]
require.True(t, ok)
}

func TestNopCache(t *testing.T) {
c := NewNopCache[string, string]()
// The value should be absent.
value, ok := c.Get("foo")
require.Equal(t, "", value)
require.False(t, ok)
// Despite setting the value, it should still be absent.
c.Set("foo", "bar")
value, ok = c.Get("foo")
require.Equal(t, "", value)
require.False(t, ok)
}
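
The tests above cover expiry, deletion, and reset against a mocked quartz clock. A concurrency check is not part of this PR; a minimal sketch of one, which leans on the race detector and the thread-safety that TTLCache documents, might look like:

```go
package frontend

import (
	"sync"
	"testing"
	"time"
)

// Illustrative sketch, not part of this PR: exercise the cache from several
// goroutines and rely on the race detector to flag unsynchronized access.
func TestTTLCache_ConcurrentAccess(t *testing.T) {
	c := NewTTLCache[int, int](time.Minute)
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			for j := 0; j < 1000; j++ {
				c.Set(n, j)
				// Each goroutine owns its key, so the last Set must be visible.
				if v, ok := c.Get(n); !ok || v != j {
					t.Errorf("key %d: got %d (%v), want %d", n, v, ok, j)
				}
				if j%100 == 0 {
					c.Delete(n)
				}
			}
		}(i)
	}
	wg.Wait()
}
```

Run with `go test -race` so the detector can report any unsynchronized map access.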
12 changes: 6 additions & 6 deletions pkg/limits/frontend/config.go
@@ -13,19 +13,19 @@ import (

// Config contains the config for an ingest-limits-frontend.
type Config struct {
ClientConfig limits_client.Config `yaml:"client_config"`
LifecyclerConfig ring.LifecyclerConfig `yaml:"lifecycler,omitempty"`
RecheckPeriod time.Duration `yaml:"recheck_period"`
NumPartitions int `yaml:"num_partitions"`
PartitionIDCacheTTL time.Duration `yaml:"partition_id_cache_ttl"`
ClientConfig limits_client.Config `yaml:"client_config"`
LifecyclerConfig ring.LifecyclerConfig `yaml:"lifecycler,omitempty"`
RecheckPeriod time.Duration `yaml:"recheck_period"`
NumPartitions int `yaml:"num_partitions"`
AssignedPartitionsCacheTTL time.Duration `yaml:"assigned_partitions_cache_ttl"`
}

func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
cfg.ClientConfig.RegisterFlagsWithPrefix("ingest-limits-frontend", f)
cfg.LifecyclerConfig.RegisterFlagsWithPrefix("ingest-limits-frontend.", f, util_log.Logger)
f.DurationVar(&cfg.RecheckPeriod, "ingest-limits-frontend.recheck-period", 10*time.Second, "The period to recheck per tenant ingestion rate limit configuration.")
f.IntVar(&cfg.NumPartitions, "ingest-limits-frontend.num-partitions", 64, "The number of partitions to use for the ring.")
f.DurationVar(&cfg.PartitionIDCacheTTL, "ingest-limits-frontend.partition-id-cache-ttl", 1*time.Minute, "The TTL for the stream usage cache.")
f.DurationVar(&cfg.AssignedPartitionsCacheTTL, "ingest-limits-frontend.assigned-partitions-cache-ttl", 1*time.Minute, "The TTL for the assigned partitions cache. 0 disables the cache.")
}

func (cfg *Config) Validate() error {
38 changes: 21 additions & 17 deletions pkg/limits/frontend/frontend.go
@@ -58,11 +58,11 @@ type Frontend struct {
cfg Config
logger log.Logger

limits Limits
rateLimiter *limiter.RateLimiter
streamUsage StreamUsageGatherer
partitionIDCache *PartitionConsumersCache
metrics *metrics
limits Limits
rateLimiter *limiter.RateLimiter
streamUsage StreamUsageGatherer
assignedPartitionsCache Cache[string, *logproto.GetAssignedPartitionsResponse]
metrics *metrics

subservices *services.Manager
subservicesWatcher *services.FailureWatcher
@@ -84,18 +84,25 @@ func New(cfg Config, ringName string, limitsRing ring.ReadRing, limits Limits, l
logger,
)

var assignedPartitionsCache Cache[string, *logproto.GetAssignedPartitionsResponse]
if cfg.AssignedPartitionsCacheTTL == 0 {
// When the TTL is 0, the cache is disabled.
assignedPartitionsCache = NewNopCache[string, *logproto.GetAssignedPartitionsResponse]()
} else {
assignedPartitionsCache = NewTTLCache[string, *logproto.GetAssignedPartitionsResponse](cfg.AssignedPartitionsCacheTTL)
}

rateLimiter := limiter.NewRateLimiter(newRateLimitsAdapter(limits), cfg.RecheckPeriod)
partitionIDCache := NewPartitionConsumerCache(cfg.PartitionIDCacheTTL)
streamUsage := NewRingStreamUsageGatherer(limitsRing, clientPool, logger, partitionIDCache, cfg.NumPartitions)
streamUsage := NewRingStreamUsageGatherer(limitsRing, clientPool, cfg.NumPartitions, assignedPartitionsCache, logger)

f := &Frontend{
cfg: cfg,
logger: logger,
limits: limits,
rateLimiter: rateLimiter,
streamUsage: streamUsage,
partitionIDCache: partitionIDCache,
metrics: newMetrics(reg),
cfg: cfg,
logger: logger,
limits: limits,
rateLimiter: rateLimiter,
streamUsage: streamUsage,
assignedPartitionsCache: assignedPartitionsCache,
metrics: newMetrics(reg),
}

lifecycler, err := ring.NewLifecycler(cfg.LifecyclerConfig, f, RingName, RingKey, true, logger, reg)
@@ -138,8 +145,6 @@ func (f *Frontend) starting(ctx context.Context) (err error) {
return fmt.Errorf("failed to start subservices: %w", err)
}

go f.partitionIDCache.Start()

return nil
}

@@ -155,7 +160,6 @@ func (f *Frontend) running(ctx context.Context) error {

// stopping implements services.Service.
func (f *Frontend) stopping(_ error) error {
f.partitionIDCache.Stop()
return services.StopManagerAndAwaitStopped(context.Background(), f.subservices)
}
