Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions logstash-core/lib/logstash/instrument/collector.rb
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,12 @@ def get(namespaces_path, key, type)
end
end

##
# @return [Metric]: the metric that exists after registration
def register(namespaces_path, key, &metric_supplier)
@metric_store.fetch_or_store(namespaces_path, key, &metric_supplier)
end

# test injection, see MetricExtFactory
def initialize_metric(type, namespaces_path, key)
MetricType.create(type, namespaces_path, key)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,16 @@ public interface NamespacedMetric extends Metric {
*/
TimerMetric timer(String metric);

/**
* Creates or retrieves a {@link UserMetric} with the provided {@code metric} name,
* using the supplied {@code userMetricFactory}.
* @param metric the name of the metric
* @param userMetricFactory a factory for creating the metric
* @return the resulting metric at the address, whether retrieved or created
* @param <USER_METRIC> the type of metric to create
*/
<USER_METRIC extends UserMetric<?>> USER_METRIC register(String metric, UserMetric.Factory<USER_METRIC> userMetricFactory);

/**
* Increment the {@code metric} metric by 1.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package co.elastic.logstash.api;

import java.util.function.Function;

/**
* A custom metric.
* @param <VALUE_TYPE> must be jackson-serializable.
*
* NOTE: this interface is experimental and considered internal.
* Its shape may change from one Logstash release to the next.
*/
public interface UserMetric<VALUE_TYPE> {
VALUE_TYPE getValue();

/**
* A {@link UserMetric.Factory<USER_METRIC>} is the primary way to register a custom user-metric
* along-side a null implementation for performance when metrics are disabled.
*
* @param <USER_METRIC> a sub-interface of {@link UserMetric}
*/
interface Factory<USER_METRIC extends UserMetric<?>> {
Class<USER_METRIC> getType();
USER_METRIC create(String name);
USER_METRIC nullImplementation();
}

/**
* A {@link UserMetric.Provider} is an intermediate helper type meant to be statically available by any
* user-provided {@link UserMetric} interface, encapsulating its null implementation and providing
* a way to simply get a {@link UserMetric.Factory} for a given non-null implementation.
*
* @param <USER_METRIC> an interface that extends {@link UserMetric}.
*/
class Provider<USER_METRIC extends UserMetric<?>> {
private final Class<USER_METRIC> type;
private final USER_METRIC nullImplementation;

public Provider(final Class<USER_METRIC> type, final USER_METRIC nullImplementation) {
assert type.isInterface() : String.format("type must be an interface, got %s", type);

this.type = type;
this.nullImplementation = nullImplementation;
}

public Factory<USER_METRIC> getFactory(final Function<String, USER_METRIC> supplier) {
return new Factory<USER_METRIC>() {
@Override
public USER_METRIC create(final String name) {
return supplier.apply(name);
}

@Override
public Class<USER_METRIC> getType() {
return type;
}

@Override
public USER_METRIC nullImplementation() {
return nullImplementation;
}
};
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ public IRubyObject timer(final ThreadContext context, final IRubyObject key) {
return getTimer(context, key);
}

@JRubyMethod
public IRubyObject register(final ThreadContext context, final IRubyObject key, final Block metricSupplier) {
return doRegister(context, key, metricSupplier);
}

@JRubyMethod(required = 1, optional = 1)
public IRubyObject increment(final ThreadContext context, final IRubyObject[] args) {
return doIncrement(context, args);
Expand Down Expand Up @@ -104,5 +109,7 @@ protected abstract IRubyObject doReportTime(ThreadContext context,

protected abstract IRubyObject doDecrement(ThreadContext context, IRubyObject[] args);

protected abstract IRubyObject doRegister(ThreadContext context, IRubyObject key, Block metricSupplier);

public abstract AbstractMetricExt getMetric();
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,11 @@ public IRubyObject time(final ThreadContext context,
return doTime(context, namespace, key, block);
}

@JRubyMethod(name = "register")
public IRubyObject register(final ThreadContext context, final IRubyObject namespace, final IRubyObject key, final Block metricSupplier) {
return doRegister(context, namespace, key, metricSupplier);
}

protected abstract IRubyObject doDecrement(ThreadContext context, IRubyObject[] args);

protected abstract IRubyObject doIncrement(ThreadContext context, IRubyObject[] args);
Expand All @@ -88,4 +93,6 @@ protected abstract IRubyObject doReportTime(ThreadContext context, IRubyObject n

protected abstract IRubyObject doTime(ThreadContext context, IRubyObject namespace,
IRubyObject key, Block block);

protected abstract IRubyObject doRegister(ThreadContext context, IRubyObject namespace, IRubyObject key, Block metricSupplier);
}
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,12 @@ protected IRubyObject getTimer(final ThreadContext context,
);
}

@Override
protected IRubyObject doRegister(ThreadContext context, IRubyObject namespace, IRubyObject key, Block supplier) {
MetricExt.validateKey(context, null, key);
return collector.callMethod(context, "register", new IRubyObject[]{normalizeNamespace(namespace), key}, supplier);
}

@Override
protected IRubyObject doReportTime(final ThreadContext context, final IRubyObject namespace,
final IRubyObject key, final IRubyObject duration) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@ public enum MetricType {
* A flow-rate {@link FlowMetric}, instantiated with one or more backing {@link Metric}{@code <Number>}.
*/
FLOW_RATE("flow/rate"),

/**
* A user metric
*/
USER("user"),
;

private final String type;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ public NamespacedMetricExt(final Ruby runtime, final RubyClass metaClass) {
super(runtime, metaClass);
}

@Override
protected IRubyObject doRegister(ThreadContext context, IRubyObject key, Block metricSupplier) {
return metric.register(context, namespaceName, key, metricSupplier);
}

@JRubyMethod(visibility = Visibility.PRIVATE)
public NamespacedMetricExt initialize(final ThreadContext context, final IRubyObject metric,
final IRubyObject namespaceName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,11 @@ protected IRubyObject doTime(final ThreadContext context, final IRubyObject name
return block.call(context);
}

@Override
protected IRubyObject doRegister(ThreadContext context, IRubyObject namespace, IRubyObject key, Block metricSupplier) {
return context.nil;
}

@Override
protected AbstractNamespacedMetricExt createNamespaced(final ThreadContext context,
final IRubyObject name) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,11 @@ protected IRubyObject doReportTime(final ThreadContext context, final IRubyObjec
return context.nil;
}

@Override
protected IRubyObject doRegister(ThreadContext context, IRubyObject key, Block metricSupplier) {
return context.nil;
}

@Override
@SuppressWarnings("rawtypes")
protected RubyArray getNamespaceName(final ThreadContext context) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package org.logstash.instrument.metrics;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.jruby.RubySymbol;
import org.jruby.runtime.Block;
import org.jruby.runtime.JavaInternalBlockBody;
import org.jruby.runtime.Signature;
import org.jruby.runtime.ThreadContext;
import org.jruby.runtime.builtin.IRubyObject;
import org.logstash.RubyUtil;

public class UserMetric {
private UserMetric() {}

private static Logger LOGGER = LogManager.getLogger(UserMetric.class);

public static <USER_METRIC extends co.elastic.logstash.api.UserMetric<?>> USER_METRIC fromRubyBase(
final AbstractNamespacedMetricExt metric,
final RubySymbol key,
final co.elastic.logstash.api.UserMetric.Factory<USER_METRIC> metricFactory
) {
final ThreadContext context = RubyUtil.RUBY.getCurrentContext();

final Block metricSupplier = new Block(new JavaInternalBlockBody(context.runtime, Signature.NO_ARGUMENTS) {
@Override
public IRubyObject yield(ThreadContext threadContext, IRubyObject[] iRubyObjects) {
return RubyUtil.toRubyObject(metricFactory.create(key.asJavaString()));
}
});

final IRubyObject result = metric.register(context, key, metricSupplier);
final Class<USER_METRIC> type = metricFactory.getType();
if (!type.isAssignableFrom(result.getJavaClass())) {
LOGGER.warn("UserMetric type mismatch for %s (expected: %s, received: %s); " +
"a null implementation will be substituted", key.asJavaString(), type, result.getJavaClass());
return metricFactory.nullImplementation();
}

return result.toJava(type);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import co.elastic.logstash.api.CounterMetric;
import co.elastic.logstash.api.Metric;
import co.elastic.logstash.api.NamespacedMetric;
import co.elastic.logstash.api.UserMetric;
import org.jruby.RubyArray;
import org.jruby.RubyObject;
import org.jruby.RubySymbol;
Expand All @@ -34,6 +35,7 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.Supplier;
import java.util.stream.Stream;

Expand Down Expand Up @@ -67,6 +69,13 @@ public co.elastic.logstash.api.TimerMetric timer(final String metric) {
return TimerMetric.fromRubyBase(metrics, threadContext.getRuntime().newString(metric).intern());
}

@Override
public <USER_METRIC extends UserMetric<?>> USER_METRIC register(String metric, UserMetric.Factory<USER_METRIC> userMetricFactory) {
USER_METRIC userMetric = org.logstash.instrument.metrics.UserMetric.fromRubyBase(metrics, threadContext.runtime.newSymbol(metric), userMetricFactory);

return Objects.requireNonNullElseGet(userMetric, userMetricFactory::nullImplementation);
}

@Override
public NamespacedMetric namespace(final String... key) {
final IRubyObject[] rubyfiedKeys = Stream.of(key)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public void ensurePassivity(){
nameMap.put(MetricType.GAUGE_RUBYHASH, "gauge/rubyhash");
nameMap.put(MetricType.GAUGE_RUBYTIMESTAMP, "gauge/rubytimestamp");
nameMap.put(MetricType.FLOW_RATE, "flow/rate");
nameMap.put(MetricType.USER, "user");

//ensure we are testing all of the enumerations
assertThat(EnumSet.allOf(MetricType.class).size()).isEqualTo(nameMap.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public class MockNamespacedMetric extends AbstractNamespacedMetricExt {

private static final long serialVersionUID = -6507123659910450215L;

private transient final ConcurrentMap<String, Metric> metrics = new ConcurrentHashMap<>();
private transient final ConcurrentMap<String, Object> metrics = new ConcurrentHashMap<>();

public static MockNamespacedMetric create() {
return new MockNamespacedMetric(RubyUtil.RUBY, RubyUtil.NAMESPACED_METRIC_CLASS);
Expand Down Expand Up @@ -83,6 +83,11 @@ public AbstractMetricExt getMetric() {
return NullMetricExt.create();
}

@Override
protected IRubyObject doRegister(ThreadContext context, IRubyObject key, Block metricSupplier) {
return RubyUtil.toRubyObject(metrics.computeIfAbsent(key.asJavaString(), (k) -> metricSupplier.call(context)));
}

@Override
protected AbstractNamespacedMetricExt createNamespaced(ThreadContext context, IRubyObject name) {
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,17 @@

import co.elastic.logstash.api.Metric;
import co.elastic.logstash.api.NamespacedMetric;
import co.elastic.logstash.api.UserMetric;
import org.assertj.core.data.Percentage;
import org.jruby.RubyHash;
import org.junit.Ignore;
import org.junit.Test;

import java.util.Arrays;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Stream;

import static org.assertj.core.api.Assertions.assertThat;

Expand Down Expand Up @@ -158,4 +163,67 @@ public void testRoot() {
final NamespacedMetric namespaced = root.namespace("someothernamespace");
assertThat(namespaced.namespaceName()).containsExactly("someothernamespace");
}

@Test
public void testRegister() {
final NamespacedMetric metrics = this.getInstance().namespace("testRegister");

CustomMetric leftCustomMetric = metrics.register("left", CorrelatingCustomMetric.FACTORY);

// re-registering the same metric should get the existing instance.
assertThat(metrics.register("left", CorrelatingCustomMetric.FACTORY)).isSameAs(leftCustomMetric);

// registering a new metric should be different instance
CustomMetric rightCustomMetric = metrics.register("right", CorrelatingCustomMetric.FACTORY);
assertThat(rightCustomMetric).isNotSameAs(leftCustomMetric);

// this tests our test-only CustomMetric impl more than anything, but it validates
// that the instances we update are connected to their values.
leftCustomMetric.record("this");
leftCustomMetric.record("that");
rightCustomMetric.record("that");
leftCustomMetric.record("this");
rightCustomMetric.record("another");
rightCustomMetric.record("that");
rightCustomMetric.record("another");

assertThat(leftCustomMetric.getValue()).contains("this=2", "that=1");
assertThat(rightCustomMetric.getValue()).contains("that=2", "another=2");
}

private interface CustomMetric extends UserMetric<String> {
void record(final String value);

UserMetric.Provider<CustomMetric> PROVIDER = new UserMetric.Provider<CustomMetric>(CustomMetric.class, new CustomMetric() {
@Override
public void record(String value) {
// no-op
}

@Override
public String getValue() {
return "";
}
});
}

private static class CorrelatingCustomMetric implements CustomMetric {
private final ConcurrentHashMap<String, Integer> mapping = new ConcurrentHashMap<>();

static UserMetric.Factory<CustomMetric> FACTORY = CustomMetric.PROVIDER.getFactory((name) -> new CorrelatingCustomMetric());

@Override
public void record(String value) {
mapping.compute(value, (k, v) -> v == null ? 1 : v + 1);
}

@Override
public String getValue() {
return Map.copyOf(mapping).entrySet()
.stream()
.sorted(Map.Entry.comparingByKey())
.map((e) -> (e.getKey() + '=' + e.getValue().toString()))
.reduce((a, b) -> a + ';' + b).orElse("EMPTY");
}
}
}