Skip to content
5 changes: 5 additions & 0 deletions docs/changelog/138282.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 138282
summary: Allow fast blob-cache introspection by shard-id
area: Searchable Snapshots
type: enhancement
issues: []
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.blobcache.shared;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Function;

/**
 * A two-layer key mapping for the shared cache: an outer concurrent map keyed by {@code Key1} whose values are
 * inner concurrent maps keyed by {@code Key2}.
 * <p>
 * Values are only added to an inner map from within {@link ConcurrentHashMap#compute} on the outer key, and an
 * inner map is only dropped from the outer map (once empty) from within an atomic outer-map operation on the same
 * key. An inner map that has been dropped is therefore always empty.
 *
 * @param <Key1> The outer layer key type
 * @param <Key2> The inner key type
 * @param <Value> The value type
 */
class KeyMapping<Key1, Key2, Value> {
    private final ConcurrentHashMap<Key1, ConcurrentHashMap<Key2, Value>> mapping = new ConcurrentHashMap<>();

    /**
     * Look up the value stored under the given key pair.
     * @param key1 The key1 part
     * @param key2 The key2 part
     * @return the mapped value, or {@code null} if there is no mapping for the pair
     */
    public Value get(Key1 key1, Key2 key2) {
        ConcurrentHashMap<Key2, Value> inner = mapping.get(key1);
        return inner == null ? null : inner.get(key2);
    }

    /**
     * Compute a key if absent. Notice that unlike CHM#computeIfAbsent, the outer map entry for {@code key1} is
     * always updated through {@link ConcurrentHashMap#compute}, so locking is done even when the value is present.
     * @param key1 The key1 part
     * @param key2 The key2 part
     * @param function the function to get from key2 to the value
     * @return the resulting value, or {@code null} if absent and the function returned {@code null}
     */
    public Value computeIfAbsent(Key1 key1, Key2 key2, Function<? super Key2, ? extends Value> function) {
        AtomicReference<Value> result = new AtomicReference<>();
        mapping.compute(key1, (k, current) -> {
            ConcurrentHashMap<Key2, Value> map = current == null ? new ConcurrentHashMap<>() : current;
            result.setPlain(map.computeIfAbsent(key2, function));
            // If the function produced no value, do not leave an empty inner map behind for key1:
            // returning null from compute leaves (or makes) the outer key unmapped.
            return map.isEmpty() ? null : map;
        });
        return result.getPlain();
    }

    /**
     * Remove the mapping for the key pair, but only if it is currently mapped to {@code value}.
     * When this empties the inner map, the outer entry for {@code key1} is dropped as well.
     * @param key1 The key1 part
     * @param key2 The key2 part
     * @param value the value expected to be mapped
     * @return {@code true} if the mapping was removed
     */
    public boolean remove(Key1 key1, Key2 key2, Value value) {
        ConcurrentHashMap<Key2, Value> inner = mapping.get(key1);
        if (inner == null) {
            return false;
        }
        boolean removed = inner.remove(key2, value);
        if (removed && inner.isEmpty()) {
            // Re-check emptiness inside the atomic outer-map operation: a concurrent computeIfAbsent
            // may have added a value to this inner map in the meantime.
            mapping.computeIfPresent(key1, (k, v) -> v.isEmpty() ? null : v);
        }
        return removed;
    }

    /**
     * @return a live view of the outer keys that currently have an inner map
     */
    Iterable<Key1> key1s() {
        return mapping.keySet();
    }

    /**
     * Invoke the consumer for every (key2, value) pair stored under {@code key1}; does nothing if there is none.
     */
    void forEach(Key1 key1, BiConsumer<? super Key2, ? super Value> consumer) {
        ConcurrentHashMap<Key2, Value> map = mapping.get(key1);
        if (map != null) {
            map.forEach(consumer);
        }
    }

    /**
     * Invoke the consumer for every (key2, value) pair across all outer keys.
     */
    // Kept package-private rather than private: per the review discussion on this change it is used by
    // forceEvict in SharedBlobCacheService.
    void forEach(BiConsumer<? super Key2, ? super Value> consumer) {
        for (ConcurrentHashMap<Key2, Value> map : mapping.values()) {
            map.forEach(consumer);
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.LuceneFilesExtensions;
import org.elasticsearch.monitor.fs.FsProbe;
import org.elasticsearch.node.NodeRoleSettings;
Expand Down Expand Up @@ -78,7 +79,11 @@
/**
* A caching layer on a local node to minimize network roundtrips to the remote blob store.
*/
public class SharedBlobCacheService<KeyType> implements Releasable {
public class SharedBlobCacheService<KeyType extends SharedBlobCacheService.KeyBase> implements Releasable {

/**
* Base contract for the cache key type of a {@link SharedBlobCacheService}: every key must be able to report
* the shard it belongs to. The cache uses that shard id as the outer key of its region key mapping.
*/
public interface KeyBase {
/**
* @return the {@link ShardId} this cache key is associated with
*/
ShardId shardId();
}

private static final String SHARED_CACHE_SETTINGS_PREFIX = "xpack.searchable.snapshot.shared_cache.";

Expand Down Expand Up @@ -301,6 +306,8 @@ private interface Cache<K, T> extends Releasable {
int forceEvict(Predicate<K> cacheKeyPredicate);

void forceEvictAsync(Predicate<K> cacheKey);

int forceEvict(ShardId shard, Predicate<K> cacheKeyPredicate);
}

private abstract static class CacheEntry<T> {
Expand Down Expand Up @@ -759,7 +766,7 @@ public Stats getStats() {
}

/**
 * Force-evicts the cached regions whose cache key equals the given key.
 * <p>
 * Only the entries registered under the key's own shard are examined, rather than every entry in the cache.
 *
 * @param cacheKey the cache key to remove; its {@code shardId()} selects the entries that are scanned
 */
public void removeFromCache(KeyType cacheKey) {
    forceEvict(cacheKey.shardId(), cacheKey::equals);
}

/**
Expand All @@ -770,7 +777,10 @@ public void removeFromCache(KeyType cacheKey) {
*/
public int forceEvict(Predicate<KeyType> cacheKeyPredicate) {
return cache.forceEvict(cacheKeyPredicate);
}

/**
* Shard-scoped variant of force eviction: delegates to the underlying cache's
* {@code forceEvict(shard, cacheKeyPredicate)}.
*
* @param shard the shard passed through to the underlying cache
* @param cacheKeyPredicate the predicate passed through to the underlying cache
* @return the value returned by the underlying cache (the LFU implementation in this file returns the
*         number of entries it evicted)
*/
public int forceEvict(ShardId shard, Predicate<KeyType> cacheKeyPredicate) {
return cache.forceEvict(shard, cacheKeyPredicate);
}

/**
Expand Down Expand Up @@ -867,7 +877,7 @@ protected boolean assertOffsetsWithinFileLength(long offset, long length, long f
* always be used, ensuring the right ordering between incRef/tryIncRef and ensureOpen
* (see {@link SharedBlobCacheService.LFUCache#maybeEvictAndTakeForFrequency(Runnable, int)})
*/
static class CacheFileRegion<KeyType> extends EvictableRefCounted {
static class CacheFileRegion<KeyType extends KeyBase> extends EvictableRefCounted {

private static final VarHandle VH_IO = findIOVarHandle();

Expand Down Expand Up @@ -1634,7 +1644,7 @@ void touch() {
}
}

private final ConcurrentHashMap<RegionKey<KeyType>, LFUCacheEntry> keyMapping = new ConcurrentHashMap<>();
private final KeyMapping<ShardId, RegionKey<KeyType>, LFUCacheEntry> keyMapping = new KeyMapping<>();
private final LFUCacheEntry[] freqs;
private final int maxFreq;
private final DecayAndNewEpochTask decayAndNewEpochTask;
Expand All @@ -1653,8 +1663,9 @@ public void close() {
decayAndNewEpochTask.close();
}

/**
 * Returns the current frequency of the cache entry registered for the given region.
 * <p>
 * Used by tests. The entry is looked up by the region key's shard id and the region key itself; the
 * lookup result is dereferenced directly, so the region is expected to be present in the key mapping.
 *
 * @param cacheFileRegion the region whose entry frequency is read
 * @return the {@code freq} of the mapped entry
 */
int getFreq(CacheFileRegion<KeyType> cacheFileRegion) {
    return keyMapping.get(cacheFileRegion.regionKey.file().shardId(), cacheFileRegion.regionKey).freq;
}

@Override
Expand All @@ -1663,10 +1674,11 @@ public LFUCacheEntry get(KeyType cacheKey, long fileLength, int region) {
final long now = epoch.get();
// try to just get from the map on the fast-path to save instantiating the capturing lambda needed on the slow path
// if we did not find an entry
var entry = keyMapping.get(regionKey);
var entry = keyMapping.get(cacheKey.shardId(), regionKey);
if (entry == null) {
final int effectiveRegionSize = computeCacheFileRegionSize(fileLength, region);
entry = keyMapping.computeIfAbsent(
cacheKey.shardId(),
regionKey,
key -> new LFUCacheEntry(new CacheFileRegion<KeyType>(SharedBlobCacheService.this, key, effectiveRegionSize), now)
);
Expand Down Expand Up @@ -1706,7 +1718,7 @@ public int forceEvict(Predicate<KeyType> cacheKeyPredicate) {
boolean evicted = entry.chunk.forceEvict();
if (evicted && entry.chunk.volatileIO() != null) {
unlink(entry);
keyMapping.remove(entry.chunk.regionKey, entry);
keyMapping.remove(entry.chunk.regionKey.file.shardId(), entry.chunk.regionKey, entry);
evictedCount++;
if (frequency > 0) {
nonZeroFrequencyEvictedCount++;
Expand All @@ -1719,6 +1731,10 @@ public int forceEvict(Predicate<KeyType> cacheKeyPredicate) {
return evictedCount;
}

/**
 * Drops the key mapping of the given entry, keyed by the shard id of the entry's region key and the
 * region key itself. The mapping is only removed if it still points at this exact entry.
 *
 * @param entry the entry whose mapping should be removed
 * @return whether a mapping was removed
 */
private boolean removeKeyMappingForEntry(LFUCacheEntry entry) {
    final RegionKey<KeyType> regionKey = entry.chunk.regionKey;
    final ShardId owningShard = regionKey.file().shardId();
    return keyMapping.remove(owningShard, regionKey, entry);
}

@Override
public void forceEvictAsync(Predicate<KeyType> cacheKeyPredicate) {
asyncEvictionsRunner.enqueueTask(new ActionListener<>() {
Expand All @@ -1739,10 +1755,42 @@ public void onFailure(Exception e) {
});
}

/**
* Force-evicts the entries of a single shard whose cache key matches the predicate.
* <p>
* Only the entries registered under {@code shard} in the key mapping are examined.
*
* @param shard the shard whose entries are scanned
* @param cacheKeyPredicate tested against each scanned entry's cache key ({@code key.file})
* @return the number of entries that were evicted, unlinked and removed from the key mapping
*/
@Override
public int forceEvict(ShardId shard, Predicate<KeyType> cacheKeyPredicate) {
// Phase 1: collect the matching entries. This scan runs before the service-wide lock below is taken.
final List<LFUCacheEntry> matchingEntries = new ArrayList<>();
keyMapping.forEach(shard, (key, entry) -> {
if (cacheKeyPredicate.test(key.file)) {
matchingEntries.add(entry);
}
});

var evictedCount = 0;
var nonZeroFrequencyEvictedCount = 0;
// Phase 2: evict the collected entries; the lock is only taken when there is something to evict.
if (matchingEntries.isEmpty() == false) {
synchronized (SharedBlobCacheService.this) {
for (LFUCacheEntry entry : matchingEntries) {
// Read the frequency before evicting so the metric below reflects the pre-eviction value.
int frequency = entry.freq;
boolean evicted = entry.chunk.forceEvict();
// Only entries that were evicted by this call and have a non-null volatileIO() are unlinked and counted.
// NOTE(review): presumably a null volatileIO() means the region never got a slot assigned - confirm.
if (evicted && entry.chunk.volatileIO() != null) {
unlink(entry);
// Entries were collected from this shard's inner map, so their key must belong to the same shard.
assert shard.equals(entry.chunk.regionKey.file.shardId());
keyMapping.remove(shard, entry.chunk.regionKey, entry);
evictedCount++;
if (frequency > 0) {
nonZeroFrequencyEvictedCount++;
}
}
}
}
}
// Recorded unconditionally (possibly with 0) once the eviction pass is done.
blobCacheMetrics.getEvictedCountNonZeroFrequency().incrementBy(nonZeroFrequencyEvictedCount);
return evictedCount;
}

private LFUCacheEntry initChunk(LFUCacheEntry entry) {
assert Thread.holdsLock(entry.chunk);
RegionKey<KeyType> regionKey = entry.chunk.regionKey;
if (keyMapping.get(regionKey) != entry) {
if (keyMapping.get(regionKey.file().shardId(), regionKey) != entry) {
throwAlreadyClosed("no free region found (contender)");
}
// new item
Expand All @@ -1765,7 +1813,7 @@ private LFUCacheEntry initChunk(LFUCacheEntry entry) {
if (io != null) {
assignToSlot(entry, io);
} else {
boolean removed = keyMapping.remove(regionKey, entry);
boolean removed = removeKeyMappingForEntry(entry);
assert removed;
throwAlreadyClosed("no free region found");
}
Expand All @@ -1780,7 +1828,7 @@ private void assignToSlot(LFUCacheEntry entry, SharedBytes.IO freeSlot) {
if (entry.chunk.isEvicted()) {
assert regionOwners.remove(freeSlot) == entry.chunk;
freeRegions.add(freeSlot);
keyMapping.remove(entry.chunk.regionKey, entry);
removeKeyMappingForEntry(entry);
throwAlreadyClosed("evicted during free region allocation");
}
pushEntryToBack(entry);
Expand Down Expand Up @@ -1985,7 +2033,7 @@ private SharedBytes.IO maybeEvictAndTakeForFrequency(Runnable evictedNotificatio
}
} finally {
unlink(entry);
keyMapping.remove(entry.chunk.regionKey, entry);
removeKeyMappingForEntry(entry);
}
}
} finally {
Expand Down Expand Up @@ -2020,7 +2068,7 @@ public boolean maybeEvictLeastUsed() {
boolean evicted = entry.chunk.tryEvict();
if (evicted && entry.chunk.volatileIO() != null) {
unlink(entry);
keyMapping.remove(entry.chunk.regionKey, entry);
removeKeyMappingForEntry(entry);
return true;
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.blobcache.shared;

import org.elasticsearch.test.ESTestCase;

import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.IntStream;

public class KeyMappingTests extends ESTestCase {

public void testBasics() {
final String k1 = randomAlphanumericOfLength(10);
final String k2 = randomAlphanumericOfLength(10);
final String value = randomAlphanumericOfLength(10);
KeyMapping<String, String, String> mapping = new KeyMapping<>();
assertNull(mapping.get(k1, k2));

assertEquals(value, mapping.computeIfAbsent(k1, k2, (kx) -> value));
assertEquals(value, mapping.get(k1, k2));

mapping.computeIfAbsent(k1, k2, (kx) -> { throw new AssertionError(); });

assertEquals(value, mapping.get(k1, k2));

final String k12 = randomValueOtherThan(k1, () -> randomAlphanumericOfLength(10));
mapping.computeIfAbsent(k12, k2, (kx) -> randomAlphanumericOfLength(10));

assertEquals(value, mapping.get(k1, k2));

assertEquals(Set.of(k1, k12), mapping.key1s());

Set<String> values = new HashSet<>();
mapping.forEach(k1, (ak2, result) -> { assertTrue(values.add(result)); });
assertEquals(Set.of(value), values);

assertTrue(mapping.remove(k1, k2, value));

assertEquals(Set.of(k12), mapping.key1s());

assertNull(mapping.get(k1, k2));
assertNotNull(mapping.get(k12, k2));

assertFalse(mapping.remove(k1, k2, value));
}

public void testMultiThreaded() {
final String k1 = randomAlphanumericOfLength(10);
KeyMapping<String, String, Integer> mapping = new KeyMapping<>();

List<Thread> threads = IntStream.range(0, 10).mapToObj(i -> new Thread(() -> {
final String k2 = Integer.toString(i);
logger.info(k2);

for (int j = 0; j < 1000; ++j) {
Integer finalJ = j;
assertNull(mapping.get(k1, k2));
assertSame(finalJ, mapping.computeIfAbsent(k1, k2, (kx) -> finalJ));
assertEquals(finalJ, mapping.get(k1, k2));
assertTrue(mapping.remove(k1, k2, finalJ));
if ((j & 1) == 0) {
assertFalse(mapping.remove(k1, k2, finalJ));
}

}
assertNull(mapping.get(k1, k2));
}, "test-thread-" + i)).toList();

threads.forEach(Thread::start);
threads.forEach(t -> {
try {
t.join(10000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});

assertEquals(Set.of(), mapping.key1s());
}

public void testMultiThreadedSameKey() {
final String k1 = randomAlphanumericOfLength(10);
KeyMapping<String, String, Integer> mapping = new KeyMapping<>();

List<Thread> threads = IntStream.range(0, 10).mapToObj(i -> new Thread(() -> {
for (int j = 0; j < 1000; ++j) {
Integer computeValue = i * 1000 + j;
Integer value = mapping.computeIfAbsent(k1, k1, (kx) -> computeValue);
assertNotNull(value);
// either our value or another threads value.
assertTrue(value == computeValue || value / 1000 != i);
mapping.remove(k1, k1, value);
}
})).toList();
threads.forEach(Thread::start);
threads.forEach(t -> {
try {
t.join(10000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});

assertEquals(Set.of(), mapping.key1s());
}
}
Loading