Class: Wurk::Metrics::Rollup

Inherits:
Object
Includes:
Component
Defined in:
lib/wurk/metrics/rollup.rb

Overview

Leader-only background thread that rolls the per-class minute buckets written by Wurk::Metrics::History (j|YYMMDD|H:M) up into compact, cluster-total time-series buckets the dashboard "throughput" / "failures" charts read directly:

jr|1m|<epoch>   HASH {p,f,ms}   TTL 24h   (1-minute resolution)
jr|5m|<epoch>   HASH {p,f,ms}   TTL 7d    (5-minute resolution)
jr|1h|<epoch>   HASH {p,f,ms}   TTL 30d   (1-hour resolution)

<epoch> is the UTC start-of-bucket as integer seconds. Totals are summed across every job class, so a 30-day chart reads ~720 small hashes instead of fanning out over ~43k per-class minute keys.
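A minimal Ruby sketch of the key arithmetic above, assuming a chart fetches every bucket start in a half-open window [from, to). bucket_start and chart_keys are hypothetical helpers, not part of Wurk. The key string mirrors Rollup.bucket_key, and the table is copied from BUCKETS. It checks the numbers in the paragraph: 30 days at 1h resolution is 720 keys, while the same window at one-minute granularity spans 43,200 minutes, which is where the ~43k figure comes from.

```ruby
# Retention table copied from Rollup::BUCKETS: bucket => [step_seconds, ttl_seconds].
BUCKETS = {
  '1m' => [60,   24 * 60 * 60],
  '5m' => [300,  7 * 24 * 60 * 60],
  '1h' => [3600, 30 * 24 * 60 * 60]
}.freeze

# Hypothetical helper: UTC start-of-bucket for an epoch in integer seconds.
def bucket_start(bucket, epoch)
  step = BUCKETS.fetch(bucket).first
  epoch - (epoch % step)
end

# Hypothetical read-side helper: every rollup key a chart would fetch for
# the half-open window [from, to), in the "jr|<bucket>|<epoch>" format.
def chart_keys(bucket, from, to)
  step = BUCKETS.fetch(bucket).first
  (bucket_start(bucket, from)...to).step(step).map { |epoch| "jr|#{bucket}|#{epoch}" }
end

t = 1_700_000_000                   # 2023-11-14 22:13:20 UTC
bucket_start('5m', t)               # => 1699999800 (22:10:00)
now = 1_699_999_200                 # 2023-11-14 22:00:00 UTC, hour-aligned
chart_keys('1h', now - (30 * 24 * 3600), now).size # => 720
```

If now is not hour-aligned, the window also picks up the partial hour at its start, so the count becomes 721.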

Every write is an idempotent HSET. Each tick recomputes the trailing LOOKBACK_MINUTES (15) completed 1m buckets from the source minute hashes, then recomputes the coarse buckets from their 1m children. Re-running a tick (after a missed tick, a leadership change, or a late metric write) converges to the same totals: it never double-counts. Storage is bounded purely by the per-bucket TTLs; see docs/metrics-history.md for the retention math.
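A minimal sketch of why a re-roll cannot double-count, assuming a plain Ruby Hash in place of Redis and a hypothetical source field layout ("<Class>|p", "<Class>|f", "<Class>|ms"; the real History format is not documented on this page). roll_minute and hset are illustrative names. Because the total is recomputed from source and then overwritten, rolling the same minute twice leaves the same values; an increment-based design (HINCRBY per tick) would have doubled them.

```ruby
# In-memory stand-in for Redis: key => { field => value }.
store = Hash.new { |h, k| h[k] = {} }

# HSET semantics: overwrite the named fields, never add to the old value.
def hset(store, key, fields)
  store[key].merge!(fields)
end

# Sum every class's p/f/ms fields in one source minute hash and write the
# cluster totals to the 1m rollup key.
# ASSUMPTION: source fields are named "<Class>|p", "<Class>|f", "<Class>|ms".
def roll_minute(store, source_key, dest_key)
  totals = { 'p' => 0, 'f' => 0, 'ms' => 0 }
  store.fetch(source_key, {}).each do |field, value|
    metric = field.split('|').last
    totals[metric] += value.to_i if totals.key?(metric)
  end
  hset(store, dest_key, totals)
end

store['j|231114|22:13'] = {
  'HardJob|p' => '3', 'HardJob|f' => '1', 'HardJob|ms' => '120',
  'EasyJob|p' => '5', 'EasyJob|ms' => '40'
}
roll_minute(store, 'j|231114|22:13', 'jr|1m|1699999980')
roll_minute(store, 'j|231114|22:13', 'jr|1m|1699999980') # re-run: same totals
# store['jr|1m|1699999980'] holds p=8, f=1, ms=160 after either call.
```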

Constant Summary

PREFIX =
'jr'
BUCKETS =

Maps each bucket name to [step_seconds, ttl_seconds]. Retention follows the issue's spec: 1m kept 24h, 5m kept 7d, 1h kept 30d.

{
  '1m' => [60,   24 * 60 * 60],
  '5m' => [300,  7 * 24 * 60 * 60],
  '1h' => [3600, 30 * 24 * 60 * 60]
}.freeze
COARSE =
%w[5m 1h].freeze
DEFAULT_TICK_SECONDS =
60
LOOKBACK_MINUTES =

Re-roll the last N completed minutes from source on every tick (idempotent). This self-heals a leadership failover / restart or a late metric write up to N minutes old — the source j|… buckets live 3 days, so re-reading them folds the gap back in. Only outages longer than this leave a hole that ages out with the bucket TTL (best-effort metrics).

15
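A minimal sketch of the self-healing bound described for LOOKBACK_MINUTES, assuming ticks were otherwise on time and that no tick ran between last_tick and resume (both integer epoch seconds). unhealed_minutes is a hypothetical helper: it returns the completed minutes that neither the last tick before the outage nor the first tick after it re-reads, which are the holes left to age out.

```ruby
LOOKBACK_MINUTES = 15

# Hypothetical helper: minutes (as epoch starts) that no tick rolls up after
# an outage that began after +last_tick+ and ended with a tick at +resume+.
def unhealed_minutes(last_tick, resume)
  floor = ->(t) { t - (t % 60) }
  first_unrolled = floor.(last_tick)                        # last tick rolled everything before this minute
  window_start = floor.(resume) - (LOOKBACK_MINUTES * 60)   # oldest minute the resumed tick re-reads
  return [] if window_start <= first_unrolled

  (first_unrolled...window_start).step(60).to_a
end

unhealed_minutes(1_699_999_200, 1_699_999_800).size # => 0 (10-minute gap, fully healed)
unhealed_minutes(1_699_999_200, 1_700_000_400).size # => 5 (20-minute gap, 15 healed)
```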

Constructor Details

#initialize(config) ⇒ Rollup

Returns a new instance of Rollup.



# File 'lib/wurk/metrics/rollup.rb', line 54

def initialize(config)
  @config = config
  @done = false
  @mutex = ::Mutex.new
  @sleeper = ::ConditionVariable.new
  @tick_interval = config[:metrics_rollup_interval] || DEFAULT_TICK_SECONDS
  @thread = nil
end

Instance Attribute Details

#config ⇒ Object (readonly) Originally defined in module Component

Returns the value of attribute config.

Class Method Details

.bucket_key(bucket, epoch) ⇒ Object



# File 'lib/wurk/metrics/rollup.rb', line 50

def self.bucket_key(bucket, epoch)
  "#{PREFIX}|#{bucket}|#{epoch}"
end

Instance Method Details

#default_tag(dir = Dir.pwd) ⇒ Object Originally defined in module Component

#fire_event(event, oneshot: true, reverse: false, reraise: false) ⇒ Object Originally defined in module Component

Invokes lifecycle hooks for event. Hooks run in registration order (or LIFO when reverse: true, used for teardown). A raise in one hook is reported via handle_exception and does NOT stop the next hook unless reraise: true (used in tests / fail-fast boot). oneshot: true clears the bucket after dispatch so the event can't fire twice.
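A minimal sketch of the dispatch rules above, assuming hooks are stored as an array of callables per event. The Hooks class, its on method, and recording reported errors in an array are hypothetical stand-ins; only the four behaviours (registration order, LIFO with reverse:, continue past a failing hook unless reraise:, clear after dispatch with oneshot:) come from the description.

```ruby
# Hypothetical hook registry: event => callables in registration order.
class Hooks
  attr_reader :errors

  def initialize
    @hooks = Hash.new { |h, k| h[k] = [] }
    @errors = []
  end

  def on(event, &block)
    @hooks[event] << block
  end

  def fire_event(event, oneshot: true, reverse: false, reraise: false)
    list = @hooks[event]
    list = list.reverse if reverse              # LIFO, used for teardown
    list.each do |hook|
      hook.call
    rescue StandardError => e
      handle_exception(e, { context: "event #{event}" })
      raise if reraise                          # fail fast: later hooks are skipped
    end
    @hooks.delete(event) if oneshot             # the event cannot fire twice
  end

  private

  # Stand-in for Component#handle_exception: just remember what was reported.
  def handle_exception(ex, ctx)
    @errors << [ex.message, ctx]
  end
end

calls = []
hooks = Hooks.new
hooks.on(:shutdown) { calls << :first }
hooks.on(:shutdown) { raise 'boom' }
hooks.on(:shutdown) { calls << :third }
hooks.fire_event(:shutdown, reverse: true)  # the raise is reported, not fatal
calls                                       # => [:third, :first]
```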

#handle_exception(ex, ctx = {}) ⇒ Object Originally defined in module Component

#hostname ⇒ Object Originally defined in module Component

#identity ⇒ Object Originally defined in module Component

#leader? ⇒ Boolean Originally defined in module Component

True iff this process currently holds the cluster dear-leader lock. Per spec, the check is performed at call time (Wurk does not cache); callers must not poll faster than the 60s follower cadence. Returns false unconditionally when WURK_LEADER=false (or SIDEKIQ_LEADER=false) is set on the process (opt-out hot-standby). Any Redis error is swallowed → false, so a transient partition can't propagate as an exception into user code.

Spec: docs/target/sidekiq-ent.md §6.1.

Returns:

  • (Boolean)
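A minimal sketch of that contract, assuming the lock value is the holder's identity string. LeaderCheck and read_lock (a lambda standing in for a Redis GET of the lock key) are hypothetical; the real key name, value format, and how Wurk acquires and renews the lock are not documented on this page.

```ruby
# Hypothetical sketch of leader? semantics: env opt-out, checked at call
# time, and any error from the lock read collapses to false.
class LeaderCheck
  def initialize(identity, read_lock, env: ENV)
    @identity = identity
    @read_lock = read_lock   # ASSUMPTION: returns the current holder's identity
    @env = env
  end

  def leader?
    return false if @env['WURK_LEADER'] == 'false' || @env['SIDEKIQ_LEADER'] == 'false'

    @read_lock.call == @identity   # read on every call, never cached
  rescue StandardError             # e.g. a Redis connection error
    false
  end
end

LeaderCheck.new('host-a:123', -> { 'host-a:123' }, env: {}).leader? # => true
```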

#logger ⇒ Object Originally defined in module Component

Delegated to config.

#mono_ms ⇒ Object Originally defined in module Component

#process_nonce ⇒ Object Originally defined in module Component

#real_ms ⇒ Object Originally defined in module Component

Clock helper.

#redis ⇒ Object Originally defined in module Component

#roll(now = ::Time.now) ⇒ Object

One rollup pass, bypassing the leader gate and the sleep loop. Public so deterministic specs and a manual "roll now" can drive it directly.



# File 'lib/wurk/metrics/rollup.rb', line 92

def roll(now = ::Time.now)
  cur_min = floor_min(now)
  minutes = (1..LOOKBACK_MINUTES).map { |i| cur_min - (i * 60) }
  minutes.each { |epoch_min| write_minute_bucket(epoch_min) }
  recompute_coarse(minutes)
  nil
end
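floor_min and recompute_coarse are private and not shown. A minimal sketch of what roll computes, assuming floor_min returns the UTC start-of-minute as integer epoch seconds and that a coarse bucket's 1m children are the step/60 minutes starting at its epoch. coarse_children and STEPS are hypothetical; the sketch only lists which buckets and children a pass touches and does not sum anything.

```ruby
LOOKBACK_MINUTES = 15
STEPS = { '5m' => 300, '1h' => 3600 }.freeze # the COARSE buckets and their steps

# Hypothetical stand-in for the private floor_min helper.
def floor_min(now)
  t = now.to_i
  t - (t % 60)
end

# Same list #roll builds: the 15 completed minutes before the current one.
def minutes_to_roll(now)
  cur_min = floor_min(now)
  (1..LOOKBACK_MINUTES).map { |i| cur_min - (i * 60) }
end

# Every 5m/1h bucket containing a re-rolled minute, mapped to the 1m
# children it would be summed from. Children not written yet (the
# in-progress minute and later) are listed too.
def coarse_children(minutes)
  STEPS.each_with_object({}) do |(bucket, step), out|
    minutes.map { |m| m - (m % step) }.uniq.each do |start|
      out[[bucket, start]] = (start...(start + step)).step(60).to_a
    end
  end
end

minutes = minutes_to_roll(Time.at(1_700_000_000)) # tick at 22:13:20 UTC
minutes.first                       # => 1699999920 (22:12, newest completed minute)
minutes.last                        # => 1699999080 (21:58)
coarse_children(minutes).keys.size  # => 6 (four 5m buckets, two 1h buckets)
```

The in-progress minute (22:13 here) is deliberately excluded from the 1m re-roll; it is picked up by the next tick.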

#safe_thread(name, priority: nil, &block) ⇒ Object Originally defined in module Component

Spawns a named thread that runs block under watchdog(name). The parent must retain the returned Thread. report_on_exception is disabled because watchdog already reports the exception, so a thread's death is not logged twice.
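A minimal sketch of how safe_thread and watchdog fit together, assuming the exception handler can be passed in as a callable. The handler parameter is a hypothetical addition for the sketch (the real methods use Component#handle_exception), and the priority: option is omitted.

```ruby
# Hypothetical sketch: named thread, crash reported once via +handler+,
# Ruby's own report_on_exception silenced to avoid a duplicate log.
def safe_thread(name, handler, &block)
  Thread.new do
    Thread.current.name = name
    Thread.current.report_on_exception = false # watchdog already reported it
    watchdog(name, handler, &block)
  end
end

def watchdog(last_words, handler)
  yield
rescue Exception => e # re-raised below, so nothing is swallowed
  handler.call(e, { context: last_words })
  raise
end

reported = []
t = safe_thread('metrics-rollup', ->(e, ctx) { reported << [e.message, ctx[:context]] }) { raise 'boom' }
begin
  t.join # join re-raises the thread's exception in the caller
rescue RuntimeError
  nil
end
reported # => [["boom", "metrics-rollup"]]
```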

#start ⇒ Object



# File 'lib/wurk/metrics/rollup.rb', line 63

def start
  @thread ||= safe_thread('metrics-rollup') do # rubocop:disable Naming/MemoizedInstanceVariableName
    wait
    until @done
      tick
      wait
    end
  end
end

#terminate ⇒ Object



# File 'lib/wurk/metrics/rollup.rb', line 73

def terminate
  @mutex.synchronize do
    @done = true
    @sleeper.signal
  end
end
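#start calls a private wait helper that is not shown. A minimal reconstruction, assuming wait sleeps on @sleeper with the tick interval as its timeout. Because the @done check and the wait happen under the same mutex, a terminate that lands before the thread starts waiting is not lost, and a terminate during the sleep wakes it immediately. Looper, its tick counter, and the join in terminate are additions for the sketch; the documented #terminate only signals.

```ruby
# Hypothetical reconstruction of the start/wait/terminate loop.
class Looper
  attr_reader :ticks

  def initialize(interval)
    @interval = interval
    @done = false
    @mutex = Mutex.new
    @sleeper = ConditionVariable.new
    @ticks = 0
    @thread = nil
  end

  def start
    @thread ||= Thread.new do
      wait
      until @done
        @ticks += 1 # stands in for #tick
        wait
      end
    end
  end

  def terminate
    @mutex.synchronize do
      @done = true
      @sleeper.signal
    end
    @thread&.join # addition for the sketch: wait for the loop to exit
  end

  private

  # Sleep up to @interval, or return at once if terminate already ran.
  def wait
    @mutex.synchronize { @sleeper.wait(@mutex, @interval) unless @done }
  end
end

looper = Looper.new(60)
looper.start
looper.terminate # returns promptly instead of waiting out the 60 s interval
```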

#tick(now: ::Time.now) ⇒ Object

Leader-gated: only the elected leader writes the cluster-total series, so N workers don't each HSET the same buckets every minute.



# File 'lib/wurk/metrics/rollup.rb', line 82

def tick(now: ::Time.now)
  return unless leader?

  roll(now)
rescue StandardError => e
  handle_exception(e, { context: 'metrics-rollup' })
end

#tid ⇒ Object Originally defined in module Component

Identity helper.

#watchdog(last_words) ⇒ Object Originally defined in module Component

Wraps a block at a thread boundary: any unhandled exception is reported via handle_exception (so it lands in error_handlers / the log) and then re-raised. last_words is the component label included in the context.