Description
Summary
DataCountsReporter.writeUnreportedCounts() is synchronized and can block on JobDataCountsPersister.persistDataCounts(..., mustWait=true) which waits on latches. While this thread is waiting, it holds the DataCountsReporter intrinsic lock, blocking other threads that call synchronized methods on the same instance—most notably runningTotalStats(), which is used to serve _ml/anomaly_detectors/_stats / GetJobsStats requests (management threadpool). This shows up as intrinsic lock contention and parked threads in CountDownLatch.await().
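The hazard can be modelled in a few lines. This is a minimal stand-in (not the real Elasticsearch classes): a `synchronized` writer that awaits a latch while holding the monitor, starving a `synchronized` reader on the same instance.

```java
import java.util.concurrent.CountDownLatch;

// Minimal model of the contention pattern described above. All names here
// are illustrative; only the method names mirror the real DataCountsReporter.
class ReporterModel {
    private final CountDownLatch persistDone = new CountDownLatch(1);
    private long counts = 0;

    // Holds the intrinsic monitor for the entire await, like the real
    // writeUnreportedCounts() -> persistDataCounts(..., mustWait=true) path.
    synchronized void writeUnreportedCounts() throws InterruptedException {
        persistDone.await();   // blocks while still holding the lock
    }

    // Cannot run until the writer releases the monitor, like the real
    // runningTotalStats() serving _ml/anomaly_detectors/_stats.
    synchronized long runningTotalStats() {
        return counts;
    }

    void finishPersist() {
        persistDone.countDown();
    }
}
```

A thread dump of this model shows exactly the reported symptom: one thread parked in `CountDownLatch.await()` inside a `synchronized` method, and reader threads in state `BLOCKED` waiting for the same monitor.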
Evidence / symptoms
Representative stack trace from a thread such as ml_job_comms:
```
at java.util.concurrent.CountDownLatch.await(...)
at org.elasticsearch.xpack.ml.job.persistence.JobDataCountsPersister.persistDataCounts(JobDataCountsPersister.java:105)
at org.elasticsearch.xpack.ml.job.process.DataCountsReporter.writeUnreportedCounts(DataCountsReporter.java:263)
```
This indicates the thread is waiting for a previous counts persist to finish and/or waiting for its own persist callback to count down the latch, while still inside the synchronized method.
Impact
- Increased latency / stalls when retrieving job stats (`GetJobsStats`, Kibana polling, monitoring collectors), because those code paths call `DataCountsReporter.runningTotalStats()` (also `synchronized`) and must wait for the lock.
- Contention is amplified when counts persistence is slow/backed up (e.g., slow results index writes, retries, shard issues, threadpool pressure), or when `writeUnreportedCounts()` is called frequently (e.g., `flushJob()`, `close()`).
Key code references
- `DataCountsReporter.writeUnreportedCounts()` is `synchronized` and calls a potentially blocking persist with `mustWait=true`: x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/DataCountsReporter.java
- `DataCountsReporter.runningTotalStats()` is also `synchronized` and is called to serve job stats: `AutodetectCommunicator.getDataCounts()` → `runningTotalStats()`; `TransportGetJobsStatsAction` executes on the `MANAGEMENT` threadpool and uses `processManager.getStatistics()`, which fetches `communicator.getDataCounts()`.
- `JobDataCountsPersister.persistDataCounts()` waits on per-job latches (`previousLatch.await()`, `latch.await()`): x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobDataCountsPersister.java
Why this looks like a bug / outlier
Elsewhere in the codebase, similar “wait for async work to finish” logic often avoids holding an intrinsic lock during blocking waits by copying the awaited handle under lock and waiting outside, e.g.:
- `ShortCircuitingRenormalizer.waitUntilIdle()` captures the latch in a `synchronized` block and calls `await()` outside the lock.
DataCountsReporter differs because the same lock gates both:
- waiting for persistence (`writeUnreportedCounts()`), and
- answering stats reads (`runningTotalStats()`, used by management/stats APIs).
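The safer pattern referenced above can be sketched generically: copy the awaited handle inside a short `synchronized` section, then await outside it, so other `synchronized` methods on the instance stay reachable. This is a simplified illustration, not the actual `ShortCircuitingRenormalizer` code.

```java
import java.util.concurrent.CountDownLatch;

// Sketch of the "capture under lock, wait outside the lock" pattern.
// Class and field names are illustrative.
class WaitOutsideLock {
    private CountDownLatch latestLatch;   // guarded by "this"

    synchronized void startWork() {
        latestLatch = new CountDownLatch(1);
    }

    synchronized void finishWork() {
        if (latestLatch != null) {
            latestLatch.countDown();
        }
    }

    void waitUntilIdle() throws InterruptedException {
        CountDownLatch latch;
        synchronized (this) {   // short critical section: just copy the handle
            latch = latestLatch;
        }
        if (latch != null) {
            latch.await();      // block WITHOUT holding the monitor
        }
    }
}
```

While a thread is parked in `await()` here, it is in state `WAITING` rather than holding the monitor, so `startWork()`/`finishWork()` (and, by analogy, stats reads) are never blocked behind it.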
Suggested resolution
Refactor DataCountsReporter.writeUnreportedCounts() to avoid blocking while holding the DataCountsReporter intrinsic monitor, e.g.:
- Snapshot-under-lock, persist outside the lock:
  - Under `synchronized (this)`, copy `unreportedStats` to a local variable (and possibly set a state flag).
  - Release the lock.
  - Call `persistDataCounts(..., mustWait=true)` outside the lock.
  - Re-acquire the lock to clear `unreportedStats` (or update it if persistence failed / was superseded).
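A hypothetical sketch of that refactor, assuming the snapshot-under-lock steps above. `DataCounts`, the field names, and the persister interface are simplified stand-ins; the real classes differ.

```java
// Illustrative sketch only: real DataCountsReporter has a different
// DataCounts type, persister API, and failure semantics.
class DataCountsReporterSketch {
    static class DataCounts { long processed; }
    interface Persister { void persistDataCounts(DataCounts counts, boolean mustWait); }

    private DataCounts unreportedStats;   // guarded by "this"
    private final Persister persister;

    DataCountsReporterSketch(Persister persister) {
        this.persister = persister;
    }

    synchronized void report(long n) {
        if (unreportedStats == null) unreportedStats = new DataCounts();
        unreportedStats.processed += n;
    }

    void writeUnreportedCounts() {
        DataCounts toPersist;
        synchronized (this) {                 // 1. snapshot under lock
            toPersist = unreportedStats;
            unreportedStats = null;           //    clear eagerly; restore on failure
        }
        if (toPersist == null) return;
        try {
            persister.persistDataCounts(toPersist, true);  // 2. may block; no lock held
        } catch (RuntimeException e) {
            synchronized (this) {             // 3. re-acquire only to restore state
                if (unreportedStats == null) {
                    unreportedStats = toPersist;
                } else {
                    unreportedStats.processed += toPersist.processed;
                }
            }
            throw e;
        }
    }

    synchronized long runningTotalStats() {   // no longer blocked by slow persists
        return unreportedStats == null ? 0 : unreportedStats.processed;
    }
}
```

The monitor is now held only for field copies and updates, so a slow `persistDataCounts` call cannot block stats reads.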
Possible approaches:
- Replace the `synchronized` + field pattern with an `AtomicReference<DataCounts>` for `unreportedStats`, using CAS to avoid races.
- Keep `synchronized` but only for short critical sections; never call `await()` or persistence while holding the lock.
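The `AtomicReference` variant could look roughly like this. Again a sketch under stated assumptions: `DataCounts` is an illustrative immutable stand-in, and the blocking persist call is shown only as a comment.

```java
import java.util.concurrent.atomic.AtomicReference;

// Sketch of the lock-free alternative: unreportedStats becomes an
// AtomicReference that is atomically swapped out before the blocking persist.
class AtomicCountsReporter {
    record DataCounts(long processed) {
        DataCounts merge(DataCounts other) {
            return new DataCounts(processed + other.processed);
        }
    }

    private final AtomicReference<DataCounts> unreportedStats = new AtomicReference<>();

    void report(long n) {
        // CAS loop via accumulateAndGet: merge the increment with any
        // concurrently accumulated counts.
        unreportedStats.accumulateAndGet(new DataCounts(n),
                (prev, inc) -> prev == null ? inc : prev.merge(inc));
    }

    void writeUnreportedCounts() {
        DataCounts toPersist = unreportedStats.getAndSet(null);  // atomic snapshot + clear
        if (toPersist == null) return;
        // persistDataCounts(toPersist, true) would block HERE, with no lock held.
        // On failure, merge the snapshot back so counts are not lost:
        // unreportedStats.accumulateAndGet(toPersist, (p, s) -> p == null ? s : p.merge(s));
    }

    DataCounts runningTotalStats() {   // plain atomic read; never blocks
        return unreportedStats.get();
    }
}
```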
Why the change is safe / desirable
- Preserves intent: ensure unreported counts are persisted eventually and serialized per job.
- Removes lock coupling between persistence delays and stats-read paths.
- Prevents management/stat requests from being blocked by slow indexing / retry backpressure in the results index.
- Aligns with safer concurrency patterns used elsewhere (capture awaited state under lock, wait outside).
Reproduction / how to observe
- Force slow ML results index writes (e.g., heavy indexing load, shard relocation, delayed writes).
- Trigger frequent `flushJob()` and/or job close while concurrently polling job stats (`_ml/anomaly_detectors/_stats`).
- Observe thread dumps showing:
  - `ml_job_comms` parked in `JobDataCountsPersister.persistDataCounts` awaiting a latch inside `DataCountsReporter.writeUnreportedCounts()`, and
  - management threads blocked trying to enter `DataCountsReporter.runningTotalStats()`.
Acceptance criteria
- `DataCountsReporter` no longer holds its monitor while waiting for persistence completion.
- Under persistence backpressure, stats reads do not block on `DataCountsReporter`'s intrinsic lock (or such blocking is significantly reduced), and thread dumps no longer show `runningTotalStats()` blocked behind `writeUnreportedCounts()` waiting on a latch.