Class: Wurk::Metrics::QueueRollup
- Inherits: Object
- Includes: Component
- Defined in: lib/wurk/metrics/queue_rollup.rb
Overview
Leader-only background thread that snapshots each queue's current depth (LLEN) and head-of-line latency into compact per-queue gauge buckets the dashboard's "queue size / latency over time" charts read directly:
qm|1m|<epoch> HASH {<queue>|sz, <queue>|lt} TTL 24h
qm|5m|<epoch> HASH {<queue>|sz, <queue>|lt} TTL 7d
qm|1h|<epoch> HASH {<queue>|sz, <queue>|lt} TTL 30d
Unlike Metrics::Rollup (which SUMS counters rolled up from a source),
size and latency are GAUGES — point-in-time values — so each tick samples
"now" and writes it to the current bucket at every resolution. Within a
coarse (5m/1h) bucket the per-minute ticks overwrite, so the bucket holds
the latest sample in its window (a "last value" downsample, which is the
right summary for a gauge). Leader-gated so N workers don't each sample
the same queues every minute. <epoch> is the UTC start-of-bucket.
Spec: docs/target/sidekiq-ent.md §5.2 (sidekiq.queue.size / sidekiq.queue.latency gauges), §7 Historical tab.
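The bucket arithmetic described above can be sketched in a few lines. This is an illustration only: the BUCKETS hash below is a hypothetical stand-in whose step/TTL pairs are inferred from the key table in this overview (the real value lives in Wurk::Metrics::Rollup::BUCKETS and is not shown on this page), and a plain Ruby hash stands in for Redis.

```ruby
# Hypothetical stand-in for Wurk::Metrics::Rollup::BUCKETS. The step/TTL pairs
# are inferred from the key table above, not copied from the source.
BUCKETS = {
  '1m' => [60, 24 * 3600],
  '5m' => [300, 7 * 24 * 3600],
  '1h' => [3600, 30 * 24 * 3600]
}.freeze

# Floor a timestamp to the UTC start of its bucket.
def bucket_epoch(now, step)
  (now.to_i / step) * step
end

# One key per resolution for the instant `now`.
def bucket_keys(now)
  BUCKETS.map { |name, (step, _ttl)| "qm|#{name}|#{bucket_epoch(now, step)}" }
end

# In-memory stand-in for Redis hashes: key => { field => value }.
# Writing the same field twice overwrites, like HSET.
def record(store, now, field, value)
  bucket_keys(now).each { |key| (store[key] ||= {})[field] = value }
  store
end

store = {}
record(store, Time.at(1_700_000_123), 'default|sz', 10) # first tick
record(store, Time.at(1_700_000_183), 'default|sz', 7)  # next tick, one minute later
store.each { |key, fields| puts "#{key} #{fields}" }
```

The two ticks land in different 1m buckets but in the same 5m and 1h buckets, so the coarse buckets end up holding only the later sample (7), which is the "last value" downsample described above.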
Constant Summary
- PREFIX = 'qm'
- SIZE_KIND = 'sz'
- LAT_KIND = 'lt'
- BUCKETS = Wurk::Metrics::Rollup::BUCKETS
  Mirrors Metrics::Rollup retention so the dashboard's range selector (24h·1m / 7d·5m / 30d·1h) maps 1:1 to both the throughput and the queue-gauge series.
- DEFAULT_TICK_SECONDS = 60
Instance Attribute Summary
- #config ⇒ Object (readonly) included from Component
  Returns the value of attribute config.
Class Method Summary
- .bucket_key(bucket, epoch) ⇒ Object
Instance Method Summary
- #default_tag(dir = Dir.pwd) ⇒ Object included from Component
- #fire_event(event, oneshot: true, reverse: false, reraise: false) ⇒ Object included from Component
  Invokes lifecycle hooks for event.
- #handle_exception(ex, ctx = {}) ⇒ Object included from Component
- #hostname ⇒ Object included from Component
- #identity ⇒ Object included from Component
- #initialize(config) ⇒ QueueRollup constructor
  A new instance of QueueRollup.
- #leader? ⇒ Boolean included from Component
  True iff this process currently holds the cluster dear-leader lock.
- #logger ⇒ Object included from Component
  --- delegated to config -------------------------------------------.
- #mono_ms ⇒ Object included from Component
- #process_nonce ⇒ Object included from Component
- #real_ms ⇒ Object included from Component
  --- clocks ---------------------------------------------------------.
- #redis ⇒ Object included from Component
- #safe_thread(name, priority: nil, &block) ⇒ Object included from Component
  Spawns a named thread that runs block under watchdog(name).
- #sample(now = ::Time.now) ⇒ Object
  One sampling pass, bypassing the leader gate and the sleep loop.
- #start ⇒ Object
- #terminate ⇒ Object
- #tick(now: ::Time.now) ⇒ Object
  Leader-gated: only the elected leader samples, so N workers don't each HSET the same buckets every minute.
- #tid ⇒ Object included from Component
  --- identity -------------------------------------------------------.
- #watchdog(last_words) ⇒ Object included from Component
  Wraps a block at a thread boundary: any unhandled exception is reported via handle_exception (so it lands in error_handlers / the log) and then re-raised.
Constructor Details
#initialize(config) ⇒ QueueRollup
Returns a new instance of QueueRollup.
    # File 'lib/wurk/metrics/queue_rollup.rb', line 46
    def initialize(config)
      @config = config
      @done = false
      @mutex = ::Mutex.new
      @sleeper = ::ConditionVariable.new
      @tick_interval = config[:metrics_rollup_interval] || DEFAULT_TICK_SECONDS
      @thread = nil
    end
Instance Attribute Details
#config ⇒ Object (readonly) Originally defined in module Component
Returns the value of attribute config.
Class Method Details
.bucket_key(bucket, epoch) ⇒ Object
    # File 'lib/wurk/metrics/queue_rollup.rb', line 42
    def self.bucket_key(bucket, epoch)
      "#{PREFIX}|#{bucket}|#{epoch}"
    end
Instance Method Details
#default_tag(dir = Dir.pwd) ⇒ Object Originally defined in module Component
#fire_event(event, oneshot: true, reverse: false, reraise: false) ⇒ Object Originally defined in module Component
Invokes lifecycle hooks for event. Hooks run in registration order
(or LIFO when reverse: true, used for teardown). A raise in one hook
is reported via handle_exception and does NOT stop the next hook unless
reraise: true (used in tests / fail-fast boot). oneshot: true
clears the bucket after dispatch so the event can't fire twice.
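The dispatch rules above (registration order, LIFO on reverse, error isolation, oneshot clearing) can be illustrated with a small stand-alone registry. HookRegistry, its on method, and the errors array are hypothetical names invented for this sketch; Component's real storage and its handle_exception reporting are not shown on this page.

```ruby
# Hypothetical registry illustrating the dispatch rules; not Component's code.
class HookRegistry
  attr_reader :errors

  def initialize
    @hooks = Hash.new { |hash, event| hash[event] = [] }
    @errors = [] # stand-in for reporting via handle_exception
  end

  def on(event, &block)
    @hooks[event] << block
  end

  def fire_event(event, oneshot: true, reverse: false, reraise: false)
    hooks = @hooks[event].dup
    hooks.reverse! if reverse # LIFO, e.g. for teardown
    hooks.each do |hook|
      hook.call
    rescue StandardError => e
      @errors << e
      raise if reraise # fail fast only when asked to
    end
    @hooks.delete(event) if oneshot # the event cannot fire twice
  end
end

registry = HookRegistry.new
calls = []
registry.on(:shutdown) { calls << :first }
registry.on(:shutdown) { raise 'boom' }
registry.on(:shutdown) { calls << :third }

registry.fire_event(:shutdown, reverse: true)
registry.fire_event(:shutdown) # no-op: oneshot already cleared the hooks
p calls # [:third, :first]
```

The failing middle hook is recorded and skipped, so the remaining hook still runs. Whether the real implementation clears the hooks when reraise: true aborts the loop is not stated here; this sketch does not.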
#handle_exception(ex, ctx = {}) ⇒ Object Originally defined in module Component
#hostname ⇒ Object Originally defined in module Component
#identity ⇒ Object Originally defined in module Component
#leader? ⇒ Boolean Originally defined in module Component
True iff this process currently holds the cluster dear-leader lock.
Per spec, the check is performed at call time (Wurk does not cache);
callers must not poll faster than the 60s follower cadence. Returns
false unconditionally when WURK_LEADER=false (or SIDEKIQ_LEADER=false)
is set on the process (opt-out hot-standby). Any Redis error is swallowed →
false, so a transient partition can't propagate as an exception into user
code.
Spec: docs/target/sidekiq-ent.md §6.1.
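A minimal sketch of the contract described above, under stated assumptions: leader_now? is a hypothetical name, and the block stands in for whatever Redis read returns the identity of the current lock holder (the real lookup is not shown on this page).

```ruby
# Hypothetical illustration of the leader? contract; not Component's code.
# The block stands in for the Redis read that returns the current lock holder.
def leader_now?(identity, env: ENV)
  # Opt-out hot-standby: never claim leadership when either flag is 'false'.
  return false if env['WURK_LEADER'] == 'false' || env['SIDEKIQ_LEADER'] == 'false'

  yield == identity
rescue StandardError
  # A failed lookup reads as "not leader" instead of raising into the caller.
  false
end

puts leader_now?('worker-1', env: {}) { 'worker-1' }
puts leader_now?('worker-1', env: {}) { 'worker-2' }
puts leader_now?('worker-1', env: { 'WURK_LEADER' => 'false' }) { 'worker-1' }
puts leader_now?('worker-1', env: {}) { raise IOError, 'connection lost' }
```

Only the first call prints true; the other three print false for a different holder, the opt-out flag, and a swallowed error respectively.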
#logger ⇒ Object Originally defined in module Component
--- delegated to config -------------------------------------------
#mono_ms ⇒ Object Originally defined in module Component
#process_nonce ⇒ Object Originally defined in module Component
#real_ms ⇒ Object Originally defined in module Component
--- clocks ---------------------------------------------------------
#redis ⇒ Object Originally defined in module Component
#safe_thread(name, priority: nil, &block) ⇒ Object Originally defined in module Component
Spawns a named thread that runs block under watchdog(name). The
parent must retain the returned Thread, since nothing else is guaranteed to
hold a reference to it. report_on_exception is disabled on the thread so its
death is not logged twice.
#sample(now = ::Time.now) ⇒ Object
One sampling pass, bypassing the leader gate and the sleep loop. Public so deterministic specs and a manual "sample now" can drive it directly.
    # File 'lib/wurk/metrics/queue_rollup.rb', line 84
    def sample(now = ::Time.now)
      gauges = queue_gauges
      return if gauges.empty?

      fields = gauges.flat_map do |name, (size, lat)|
        ["#{name}|#{SIZE_KIND}", size, "#{name}|#{LAT_KIND}", lat]
      end
      BUCKETS.each do |bucket, (step, ttl)|
        key = self.class.bucket_key(bucket, (now.to_i / step) * step)
        redis do |c|
          c.call('HSET', key, *fields)
          c.call('EXPIRE', key, ttl)
        end
      end
      nil
    end
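To make the field layout concrete, the sketch below flattens gauges the same way #sample does and then decodes a bucket hash back into per-queue values, as a chart reader might. decode_bucket is a hypothetical helper, not part of Wurk, and treating size as an integer and latency as a float is an assumption about the values queue_gauges returns.

```ruby
SIZE_KIND = 'sz'
LAT_KIND = 'lt'

# Writer side: flatten { queue => [size, latency] } into HSET arguments,
# following the shape used by #sample.
def encode_fields(gauges)
  gauges.flat_map do |name, (size, lat)|
    ["#{name}|#{SIZE_KIND}", size, "#{name}|#{LAT_KIND}", lat]
  end
end

# Reader side (hypothetical): turn a bucket's field => value hash back into
# { queue => { size:, latency: } }. Splitting on the last '|' keeps queue
# names that themselves contain '|' intact.
def decode_bucket(hash)
  hash.each_with_object({}) do |(field, value), out|
    queue, _sep, kind = field.rpartition('|')
    entry = (out[queue] ||= {})
    case kind
    when SIZE_KIND then entry[:size] = Integer(value)
    when LAT_KIND then entry[:latency] = Float(value)
    end
  end
end

fields = encode_fields({ 'default' => [12, 0.84], 'mailers' => [0, 0.0] })
# Redis returns hash values as strings, so stringify before decoding.
raw = fields.each_slice(2).to_h { |field, value| [field, value.to_s] }
p decode_bucket(raw)
```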
#start ⇒ Object
    # File 'lib/wurk/metrics/queue_rollup.rb', line 55
    def start
      @thread ||= safe_thread('queue-metrics') do # rubocop:disable Naming/MemoizedInstanceVariableName
        wait
        until @done
          tick
          wait
        end
      end
    end
#terminate ⇒ Object
    # File 'lib/wurk/metrics/queue_rollup.rb', line 65
    def terminate
      @mutex.synchronize do
        @done = true
        @sleeper.signal
      end
    end
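The private wait helper that #start calls is not shown on this page. One plausible shape for it, given the @mutex, @sleeper and @done state set up in the constructor, is an interruptible sleep on the condition variable. The Sleeper class below is a hypothetical stand-alone sketch of that pattern, not the actual implementation.

```ruby
# Hypothetical sketch of an interruptible sleep; not QueueRollup's own code.
class Sleeper
  def initialize(interval)
    @interval = interval
    @done = false
    @mutex = Mutex.new
    @sleeper = ConditionVariable.new
  end

  # Sleep for up to @interval seconds, returning early when terminate
  # signals. Checking @done under the lock avoids sleeping after shutdown.
  def wait
    @mutex.synchronize do
      @sleeper.wait(@mutex, @interval) unless @done
    end
  end

  # Same shape as #terminate above: flip the flag, then wake the sleeper.
  def terminate
    @mutex.synchronize do
      @done = true
      @sleeper.signal
    end
  end

  def done?
    @mutex.synchronize { @done }
  end
end

sleeper = Sleeper.new(30)
worker = Thread.new { sleeper.wait }
sleeper.terminate
worker.join(2) # returns promptly instead of after 30 seconds
```

Sleeping on a condition variable rather than calling Kernel#sleep lets shutdown interrupt a 60-second tick interval immediately.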
#tick(now: ::Time.now) ⇒ Object
Leader-gated: only the elected leader samples, so N workers don't each HSET the same buckets every minute.
    # File 'lib/wurk/metrics/queue_rollup.rb', line 74
    def tick(now: ::Time.now)
      return unless leader?

      sample(now)
    rescue StandardError => e
      handle_exception(e, { context: 'queue-metrics' })
    end
#tid ⇒ Object Originally defined in module Component
--- identity -------------------------------------------------------
#watchdog(last_words) ⇒ Object Originally defined in module Component
Wraps a block at a thread boundary: any unhandled exception is reported
via handle_exception (so it lands in error_handlers / the log) and then
re-raised. last_words is the component label included in the context.
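The interplay between watchdog and safe_thread can be sketched with top-level stand-ins. Everything below is illustrative: REPORTED and this handle_exception are hypothetical substitutes for Component's error handlers, the priority: option is omitted, and rescuing Exception (not just StandardError) is an assumption based on the phrase "any unhandled exception".

```ruby
# Hypothetical stand-ins for Component's helpers; a sketch, not the source.
REPORTED = Queue.new # thread-safe sink standing in for error_handlers / the log

def handle_exception(ex, ctx = {})
  REPORTED << [ex, ctx]
end

# Report anything that escapes the block, then re-raise so the thread dies.
def watchdog(last_words)
  yield
rescue Exception => e # rubocop:disable Lint/RescueException
  handle_exception(e, { context: last_words })
  raise
end

def safe_thread(name, &block)
  Thread.new do
    Thread.current.name = name
    # The watchdog already reported the error, so suppress Ruby's own
    # "terminated with exception" message to avoid logging it twice.
    Thread.current.report_on_exception = false
    watchdog(name, &block)
  end
end

thread = safe_thread('demo') { raise ArgumentError, 'boom' }
begin
  thread.join
rescue ArgumentError
  # Thread#join re-raises the exception that killed the thread.
end
puts REPORTED.size
```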