Class: Wurk::Metrics::Rollup
- Inherits: Object
- Includes: Component
- Defined in: lib/wurk/metrics/rollup.rb
Overview
Leader-only background thread that rolls the per-class minute buckets
written by Wurk::Metrics::History (j|YYMMDD|H:M) up into compact,
cluster-total time-series buckets the dashboard "throughput" / "failures"
charts read directly:
jr|1m|<epoch> HASH {p,f,ms} TTL 24h (1-minute resolution)
jr|5m|<epoch> HASH {p,f,ms} TTL 7d (5-minute resolution)
jr|1h|<epoch> HASH {p,f,ms} TTL 30d (1-hour resolution)
<epoch> is the UTC start-of-bucket as integer seconds. Totals are summed
across every job class, so a 30-day chart reads ~720 small hashes instead
of fanning out over ~43k per-class minute keys.
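The key-count comparison above is plain arithmetic. A 30-day window holds 30 × 24 one-hour buckets, while the same window at minute granularity is 30 × 24 × 60 minute hashes. The helper below is hypothetical and exists only to check those numbers; it is not part of Wurk.

```ruby
# Illustrative arithmetic only; `buckets_for` is a hypothetical helper.
SECONDS_PER_DAY = 24 * 60 * 60

# Number of buckets of `step` seconds needed to cover `days` days.
def buckets_for(days, step)
  (days * SECONDS_PER_DAY) / step
end

hourly_keys = buckets_for(30, 3600) # jr|1h|<epoch> hashes read by a 30-day chart
minute_keys = buckets_for(30, 60)   # minute hashes a per-minute read would touch

puts "1h rollup keys:  #{hourly_keys}"
puts "minute keys:     #{minute_keys}"
```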
Every write is an idempotent HSET. Each tick recomputes the trailing few 1m buckets from the source minute hash, then recomputes the coarse buckets from their 1m children. Re-running a tick (a missed tick, a leadership change, a late metric write) converges to the same totals — it never double-counts. Storage is bounded purely by the per-bucket TTLs; see docs/metrics-history.md for the retention math.
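The convergence claim rests on recomputing each bucket from its children and overwriting it, rather than incrementing it in place. The toy model below is a sketch under stated assumptions: a Ruby Hash stands in for Redis, and `recompute_5m` / `increment_5m` are hypothetical names, not Wurk methods. It runs the same 5-minute rollup twice in each style.

```ruby
# Toy model: a Hash of hashes stands in for Redis. Both helpers are
# hypothetical and only contrast the two write styles.
SOURCE = { 0 => 4, 60 => 2, 120 => 7, 180 => 1, 240 => 3 }.freeze # minute epoch => processed

# Recompute-and-overwrite (HSET-like): re-running yields the same value.
def recompute_5m(store, start)
  total = (0...5).sum { |i| SOURCE.fetch(start + (i * 60), 0) }
  store["jr|5m|#{start}"]['p'] = total
end

# Increment-in-place (HINCRBY-like): every re-run adds the counts again.
def increment_5m(store, start)
  key = "jr|5m|#{start}"
  (0...5).each do |i|
    store[key]['p'] = store[key].fetch('p', 0) + SOURCE.fetch(start + (i * 60), 0)
  end
end

overwrite = Hash.new { |h, k| h[k] = {} }
increment = Hash.new { |h, k| h[k] = {} }

2.times { recompute_5m(overwrite, 0) } # e.g. a tick re-run after failover
2.times { increment_5m(increment, 0) }

puts overwrite['jr|5m|0']['p'] # => 17
puts increment['jr|5m|0']['p'] # => 34 (double-counted)
```

Only the overwrite style tolerates the re-runs that the lookback window deliberately causes.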
Constant Summary
- PREFIX = 'jr'
- BUCKETS = { '1m' => [60, 24 * 60 * 60], '5m' => [300, 7 * 24 * 60 * 60], '1h' => [3600, 30 * 24 * 60 * 60] }.freeze
  bucket => [step_seconds, ttl_seconds]. Retention follows the spec in the originating issue: 1m kept 24h, 5m kept 7d, 1h kept 30d.
- COARSE = %w[5m 1h].freeze
- DEFAULT_TICK_SECONDS = 60
- LOOKBACK_MINUTES = 15
  Re-roll the last N completed minutes from source on every tick (idempotent). This self-heals a leadership failover, a restart, or a late metric write up to N minutes old: the source j|… buckets live 3 days, so re-reading them folds the gap back in. Only an outage longer than N minutes leaves a hole, which then ages out with the bucket TTL (these are best-effort metrics).
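Because storage is bounded only by the TTLs, the BUCKETS table fixes roughly how many rollup hashes can exist at once: about ttl / step per resolution. The sketch below restates the documented constant and does that division; it is illustrative arithmetic, not Wurk's retention code, and docs/metrics-history.md remains the reference for the full retention math.

```ruby
# Copy of the documented constant: bucket => [step_seconds, ttl_seconds].
BUCKETS = {
  '1m' => [60, 24 * 60 * 60],
  '5m' => [300, 7 * 24 * 60 * 60],
  '1h' => [3600, 30 * 24 * 60 * 60]
}.freeze

# One bucket per step, each expiring after ttl seconds, so roughly
# ttl / step buckets of each resolution are live at any moment.
live = BUCKETS.transform_values { |(step, ttl)| ttl / step }

live.each { |bucket, n| puts "#{bucket}: ~#{n} buckets" }
puts "total: ~#{live.values.sum}"
```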
Instance Attribute Summary
- #config ⇒ Object (readonly) included from Component
  Returns the value of attribute config.
Class Method Summary
- .bucket_key(bucket, epoch) ⇒ Object
Instance Method Summary
- #default_tag(dir = Dir.pwd) ⇒ Object included from Component
- #fire_event(event, oneshot: true, reverse: false, reraise: false) ⇒ Object included from Component
  Invokes lifecycle hooks for event.
- #handle_exception(ex, ctx = {}) ⇒ Object included from Component
- #hostname ⇒ Object included from Component
- #identity ⇒ Object included from Component
- #initialize(config) ⇒ Rollup (constructor)
  A new instance of Rollup.
- #leader? ⇒ Boolean included from Component
  True iff this process currently holds the cluster dear-leader lock.
- #logger ⇒ Object included from Component
  Delegated to config.
- #mono_ms ⇒ Object included from Component
- #process_nonce ⇒ Object included from Component
- #real_ms ⇒ Object included from Component
  Clock helper.
- #redis ⇒ Object included from Component
- #roll(now = ::Time.now) ⇒ Object
  One rollup pass, bypassing the leader gate and the sleep loop.
- #safe_thread(name, priority: nil, &block) ⇒ Object included from Component
  Spawns a named thread that runs block under watchdog(name).
- #start ⇒ Object
- #terminate ⇒ Object
- #tick(now: ::Time.now) ⇒ Object
  Leader-gated: only the elected leader writes the cluster-total series, so N workers don't each HSET the same buckets every minute.
- #tid ⇒ Object included from Component
  Identity helper.
- #watchdog(last_words) ⇒ Object included from Component
  Wraps a block at a thread boundary: any unhandled exception is reported via handle_exception (so it lands in error_handlers / the log) and then re-raised.
Constructor Details
#initialize(config) ⇒ Rollup
Returns a new instance of Rollup.
    # File 'lib/wurk/metrics/rollup.rb', line 54
    def initialize(config)
      @config = config
      @done = false
      @mutex = ::Mutex.new
      @sleeper = ::ConditionVariable.new
      @tick_interval = config[:metrics_rollup_interval] || DEFAULT_TICK_SECONDS
      @thread = nil
    end
Instance Attribute Details
#config ⇒ Object (readonly) Originally defined in module Component
Returns the value of attribute config.
Class Method Details
.bucket_key(bucket, epoch) ⇒ Object
    # File 'lib/wurk/metrics/rollup.rb', line 50
    def self.bucket_key(bucket, epoch)
      "#{PREFIX}|#{bucket}|#{epoch}"
    end
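bucket_key only formats the key; the epoch passed in must already be the UTC start of the bucket. A minimal sketch of that flooring, assuming plain modulo arithmetic on Unix seconds (bucket_start and STEPS are hypothetical names, not Wurk's). Because every step divides 86400 evenly, flooring the integer epoch aligns buckets to UTC boundaries regardless of the process time zone.

```ruby
PREFIX = 'jr'
# Hypothetical lookup of step seconds per bucket (mirrors BUCKETS steps).
STEPS = { '1m' => 60, '5m' => 300, '1h' => 3600 }.freeze

# Same body as Rollup.bucket_key documented above.
def bucket_key(bucket, epoch)
  "#{PREFIX}|#{bucket}|#{epoch}"
end

# Hypothetical helper: floor a timestamp to the start of its bucket.
def bucket_start(bucket, time)
  step = STEPS.fetch(bucket)
  time.to_i - (time.to_i % step)
end

t = Time.utc(2024, 1, 15, 10, 37, 42)
STEPS.each_key { |b| puts bucket_key(b, bucket_start(b, t)) }
```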
Instance Method Details
#default_tag(dir = Dir.pwd) ⇒ Object Originally defined in module Component
#fire_event(event, oneshot: true, reverse: false, reraise: false) ⇒ Object Originally defined in module Component
Invokes lifecycle hooks for event. Hooks run in registration order
(or LIFO when reverse: true, used for teardown). A raise in one hook
is reported via handle_exception and does NOT stop the next hook unless
reraise: true (used in tests / fail-fast boot). oneshot: true
clears the bucket after dispatch so the event can't fire twice.
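The dispatch rules above (registration order, LIFO on reverse, per-hook error isolation, optional re-raise, one-shot clearing) can be sketched as follows. HookRegistry, #on, and #errors are hypothetical; Wurk's actual hook storage and error reporting are not shown on this page, so the errors array merely stands in for handle_exception.

```ruby
# Hypothetical registry illustrating the documented fire_event semantics.
class HookRegistry
  attr_reader :errors

  def initialize
    @hooks = Hash.new { |h, k| h[k] = [] }
    @errors = []
  end

  def on(event, &block)
    @hooks[event] << block
  end

  def fire_event(event, oneshot: true, reverse: false, reraise: false)
    hooks = @hooks[event].dup
    hooks.reverse! if reverse # LIFO, e.g. for teardown
    hooks.each do |hook|
      hook.call
    rescue StandardError => e
      @errors << e # stands in for handle_exception
      raise if reraise # fail fast (tests / boot)
    end
    @hooks[event].clear if oneshot # after dispatch, so it cannot fire twice
    nil
  end
end

order = []
reg = HookRegistry.new
reg.on(:shutdown) { order << :a }
reg.on(:shutdown) { raise 'boom' }
reg.on(:shutdown) { order << :c }
reg.fire_event(:shutdown, reverse: true) # runs :c, the raising hook, then :a
reg.fire_event(:shutdown)                # no-op: oneshot cleared the hooks
```

The raising hook is recorded but does not prevent the hook registered before it from running.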
#handle_exception(ex, ctx = {}) ⇒ Object Originally defined in module Component
#hostname ⇒ Object Originally defined in module Component
#identity ⇒ Object Originally defined in module Component
#leader? ⇒ Boolean Originally defined in module Component
True iff this process currently holds the cluster dear-leader lock.
Per spec, the check is performed at call time (Wurk does not cache);
callers must not poll faster than the 60s follower cadence. Returns
false unconditionally when WURK_LEADER=false (or SIDEKIQ_LEADER=false)
is set on the process (opt-out hot standby). Any Redis error is swallowed and
the method returns false, so a transient partition can't propagate as an
exception into user code.
Spec: docs/target/sidekiq-ent.md §6.1.
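A minimal sketch of that contract, assuming the lock check can be modeled as a callable that returns the current lock holder. The lock_holder lambda and the env: parameter are hypothetical injection points for illustration; the real Redis lookup is not documented here.

```ruby
# Hypothetical sketch of the documented leader? contract. `lock_holder`
# stands in for the Redis lookup Wurk actually performs.
def leader?(my_identity, lock_holder, env: ENV)
  # Opt-out hot standby: never lead when explicitly disabled.
  return false if env['WURK_LEADER'] == 'false' || env['SIDEKIQ_LEADER'] == 'false'

  lock_holder.call == my_identity # checked at call time, not cached
rescue StandardError
  false # e.g. a Redis timeout during a partition
end

puts leader?('host-a', -> { 'host-a' }, env: {})
puts leader?('host-a', -> { raise IOError, 'redis down' }, env: {})
```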
#logger ⇒ Object Originally defined in module Component
Delegated to config.
#mono_ms ⇒ Object Originally defined in module Component
#process_nonce ⇒ Object Originally defined in module Component
#real_ms ⇒ Object Originally defined in module Component
Clock helper.
#redis ⇒ Object Originally defined in module Component
#roll(now = ::Time.now) ⇒ Object
One rollup pass, bypassing the leader gate and the sleep loop. Public so deterministic specs and a manual "roll now" can drive it directly.
    # File 'lib/wurk/metrics/rollup.rb', line 92
    def roll(now = ::Time.now)
      cur_min = floor_min(now)
      minutes = (1..LOOKBACK_MINUTES).map { |i| cur_min - (i * 60) }
      minutes.each { |epoch_min| write_minute_bucket(epoch_min) }
      recompute_coarse(minutes)
      nil
    end
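To make the lookback window concrete, the sketch below reproduces the `minutes` expression from #roll and derives which coarse buckets contain those minutes. floor_min is private and not shown, so the modulo version here is an assumption, as is the idea that recompute_coarse targets exactly the 5m and 1h buckets covering the re-rolled minutes.

```ruby
LOOKBACK_MINUTES = 15

# Hypothetical stand-in for the private floor_min helper.
def floor_min(time)
  time.to_i - (time.to_i % 60)
end

now = Time.utc(2024, 1, 15, 10, 2, 30)
cur_min = floor_min(now)

# Same expression as #roll: the 15 completed minutes before cur_min.
# The in-progress minute (cur_min itself) is deliberately excluded.
minutes = (1..LOOKBACK_MINUTES).map { |i| cur_min - (i * 60) }

# Coarse buckets whose 1m children were just rewritten (assumed targets).
five_min = minutes.map { |m| m - (m % 300) }.uniq
hourly = minutes.map { |m| m - (m % 3600) }.uniq

puts "5m buckets: #{five_min.map { |e| Time.at(e).utc.strftime('%H:%M') }.join(', ')}"
puts "1h buckets: #{hourly.map { |e| Time.at(e).utc.strftime('%H:%M') }.join(', ')}"
```

A tick shortly after the top of the hour touches two hourly buckets, which is why the coarse pass recomputes from 1m children rather than assuming a single parent.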
#safe_thread(name, priority: nil, &block) ⇒ Object Originally defined in module Component
Spawns a named thread that runs block under watchdog(name). The
parent must retain the returned Thread; otherwise GC may not, but
report_on_exception is disabled so we don't double-log on death.
#start ⇒ Object
    # File 'lib/wurk/metrics/rollup.rb', line 63
    def start
      @thread ||= safe_thread('metrics-rollup') do # rubocop:disable Naming/MemoizedInstanceVariableName
        wait
        until @done
          tick
          wait
        end
      end
    end
#terminate ⇒ Object
    # File 'lib/wurk/metrics/rollup.rb', line 73
    def terminate
      @mutex.synchronize do
        @done = true
        @sleeper.signal
      end
    end
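The private `wait` used by #start is not shown on this page. A plausible shape, given the @mutex and @sleeper built in the constructor, is a ConditionVariable wait with the tick interval as timeout, which lets #terminate wake the loop immediately. The Ticker class below is a hypothetical, self-contained sketch of that pattern; it substitutes Thread.new for safe_thread, and the join in terminate is added only so the demo can observe shutdown.

```ruby
# Hypothetical sketch of the start/terminate loop; `wait` is assumed.
class Ticker
  attr_reader :ticks

  def initialize(interval)
    @interval = interval
    @done = false
    @mutex = Mutex.new
    @sleeper = ConditionVariable.new
    @ticks = 0
    @thread = nil
  end

  def start
    @thread ||= Thread.new do # safe_thread in the real class
      wait
      until @done
        @ticks += 1 # stands in for #tick
        wait
      end
    end
  end

  def terminate
    @mutex.synchronize do
      @done = true
      @sleeper.signal
    end
    @thread&.join # demo only; the documented #terminate does not join
  end

  private

  # Sleep up to one interval; checking @done under the mutex avoids a lost
  # wakeup if terminate runs before the thread starts waiting.
  def wait
    @mutex.synchronize do
      @sleeper.wait(@mutex, @interval) unless @done
    end
  end
end

t = Ticker.new(10) # long interval
t.start
sleep 0.05
t.terminate # returns promptly instead of after 10 seconds
```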
#tick(now: ::Time.now) ⇒ Object
Leader-gated: only the elected leader writes the cluster-total series, so N workers don't each HSET the same buckets every minute.
    # File 'lib/wurk/metrics/rollup.rb', line 82
    def tick(now: ::Time.now)
      return unless leader?

      roll(now)
    rescue StandardError => e
      handle_exception(e, { context: 'metrics-rollup' })
    end
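The control flow of #tick can be exercised with a stub. FakeRollup is hypothetical: its leader?, roll, and handle_exception are fakes, but the tick body is copied from the method above. It shows that followers skip the pass and that a failing pass is reported rather than killing the background thread.

```ruby
# Hypothetical stub; only #tick mirrors the documented method.
class FakeRollup
  attr_reader :rolls, :reported

  def initialize(leader:, failing: false)
    @leader = leader
    @failing = failing
    @rolls = 0
    @reported = []
  end

  def tick(now: Time.now)
    return unless leader?

    roll(now)
  rescue StandardError => e
    handle_exception(e, { context: 'metrics-rollup' })
  end

  private

  def leader?
    @leader
  end

  def roll(_now)
    raise 'redis down' if @failing

    @rolls += 1
  end

  def handle_exception(ex, ctx)
    @reported << [ex.message, ctx[:context]]
  end
end

follower = FakeRollup.new(leader: false).tap(&:tick)
leader = FakeRollup.new(leader: true).tap(&:tick)
broken = FakeRollup.new(leader: true, failing: true).tap(&:tick) # does not raise
```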
#tid ⇒ Object Originally defined in module Component
Identity helper.
#watchdog(last_words) ⇒ Object Originally defined in module Component
Wraps a block at a thread boundary: any unhandled exception is reported
via handle_exception (so it lands in error_handlers / the log) and then
re-raised. last_words is the component label included in the context.
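The safe_thread / watchdog pair can be sketched as below. `report` and REPORTED are hypothetical stand-ins for handle_exception, the priority: option is omitted, and rescuing Exception (not just StandardError) is an assumption based on "any unhandled exception". Turning off report_on_exception inside the thread avoids Ruby's own stderr dump duplicating the report, while Thread#join still re-raises in the caller.

```ruby
# Hypothetical sketch of the documented watchdog / safe_thread contract.
REPORTED = []

def report(ex, ctx) # stands in for handle_exception
  REPORTED << [ex.class, ctx[:context]]
end

def watchdog(last_words)
  yield
rescue Exception => e # rubocop:disable Lint/RescueException
  report(e, { context: last_words })
  raise # re-raise after reporting
end

def safe_thread(name, &block)
  Thread.new do
    Thread.current.name = name
    Thread.current.report_on_exception = false # watchdog already reported it
    watchdog(name, &block)
  end
end

t = safe_thread('demo') { raise ArgumentError, 'bad' }
begin
  t.join # join re-raises the thread's exception here
rescue ArgumentError
  puts "reported: #{REPORTED.inspect}"
end
```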