Class: Wurk::Metrics::QueueRollup

Inherits:
Object
Includes:
Component
Defined in:
lib/wurk/metrics/queue_rollup.rb

Overview

Leader-only background thread that snapshots each queue's current depth (LLEN) and head-of-line latency into compact per-queue gauge buckets the dashboard's "queue size / latency over time" charts read directly:

qm|1m|<epoch>   HASH {<queue>|sz, <queue>|lt}   TTL 24h
qm|5m|<epoch>   HASH {<queue>|sz, <queue>|lt}   TTL 7d
qm|1h|<epoch>   HASH {<queue>|sz, <queue>|lt}   TTL 30d

Unlike Metrics::Rollup (which SUMS counters rolled up from a source), size and latency are GAUGES — point-in-time values — so each tick samples "now" and writes it to the current bucket at every resolution. Within a coarse (5m/1h) bucket the per-minute ticks overwrite, so the bucket holds the latest sample in its window (a "last value" downsample, which is the right summary for a gauge). Leader-gated so N workers don't each sample the same queues every minute. <epoch> is the UTC start-of-bucket.
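The bucket alignment and "last value" behaviour described above can be sketched without Redis. This is a minimal illustration, assuming the bucket table maps a bucket name to [step in seconds, TTL in seconds], which is the shape implied by the key layout. The real table lives in Wurk::Metrics::Rollup::BUCKETS and is not reproduced on this page. The names sketch_buckets, store and write_sample are hypothetical, and a plain Hash stands in for Redis.

```ruby
# Assumed shape: bucket name => [step in seconds, TTL in seconds].
sketch_buckets = {
  '1m' => [60, 24 * 3600],
  '5m' => [300, 7 * 24 * 3600],
  '1h' => [3600, 30 * 24 * 3600]
}

# In-memory stand-in for Redis: key => field hash.
store = Hash.new { |h, k| h[k] = {} }

# Writes one gauge sample into the current bucket at every resolution.
write_sample = lambda do |now, queue, size|
  sketch_buckets.each do |bucket, (step, _ttl)|
    epoch = (now.to_i / step) * step # start of the bucket containing `now`
    store["qm|#{bucket}|#{epoch}"]["#{queue}|sz"] = size
  end
end

t0 = Time.at(1_700_000_123)
write_sample.call(t0, 'default', 10)      # first tick
write_sample.call(t0 + 60, 'default', 25) # next tick, one minute later
```

The two ticks land in different 1m buckets but in the same 5m and 1h buckets, so the coarse buckets end up holding only the later sample.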

Spec: docs/target/sidekiq-ent.md §5.2 (sidekiq.queue.size / sidekiq.queue.latency gauges), §7 Historical tab.

Constant Summary

PREFIX =
'qm'
SIZE_KIND =
'sz'
LAT_KIND =
'lt'
BUCKETS =

Mirror Metrics::Rollup retention so the dashboard's range selector (24h·1m / 7d·5m / 30d·1h) maps 1:1 to both the throughput and the queue-gauge series.

Wurk::Metrics::Rollup::BUCKETS
DEFAULT_TICK_SECONDS =
60

Constructor Details

#initialize(config) ⇒ QueueRollup

Returns a new instance of QueueRollup.



# File 'lib/wurk/metrics/queue_rollup.rb', line 46

def initialize(config)
  @config = config
  @done = false
  @mutex = ::Mutex.new
  @sleeper = ::ConditionVariable.new
  @tick_interval = config[:metrics_rollup_interval] || DEFAULT_TICK_SECONDS
  @thread = nil
end

Instance Attribute Details

#config ⇒ Object (readonly) Originally defined in module Component

Returns the value of attribute config.

Class Method Details

.bucket_key(bucket, epoch) ⇒ Object



# File 'lib/wurk/metrics/queue_rollup.rb', line 42

def self.bucket_key(bucket, epoch)
  "#{PREFIX}|#{bucket}|#{epoch}"
end

Instance Method Details

#default_tag(dir = Dir.pwd) ⇒ Object Originally defined in module Component

#fire_event(event, oneshot: true, reverse: false, reraise: false) ⇒ Object Originally defined in module Component

Invokes lifecycle hooks for event. Hooks run in registration order (or LIFO when reverse: true, used for teardown). A raise in one hook is reported via handle_exception and does NOT stop the next hook unless reraise: true (used in tests / fail-fast boot). oneshot: true clears the bucket after dispatch so the event can't fire twice.
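The dispatch rules above (ordering, error isolation, oneshot clearing) can be illustrated with a small stand-alone sketch. It is not the Component implementation: the hooks registry, the fire lambda and the reported array (standing in for handle_exception) are all hypothetical.

```ruby
# Hypothetical hook registry: event name => array of callables.
hooks = { shutdown: [] }
order = []
reported = [] # stand-in for whatever handle_exception records

hooks[:shutdown] << -> { order << :first }
hooks[:shutdown] << -> { raise 'boom' }
hooks[:shutdown] << -> { order << :third }

fire = lambda do |event, oneshot: true, reverse: false, reraise: false|
  list = hooks.fetch(event, [])
  list = list.reverse if reverse # LIFO for teardown
  list.each do |hook|
    hook.call
  rescue StandardError => e
    reported << e.message # report, then continue with the next hook
    raise if reraise      # fail-fast mode stops dispatch here
  end
  hooks[event] = [] if oneshot # the event cannot fire twice
end

fire.call(:shutdown, reverse: true)
fire.call(:shutdown, reverse: true) # no-op: the bucket was cleared
```

With reverse: true the third hook runs first, the failing hook is reported without stopping dispatch, and the second call finds nothing left to run.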

#handle_exception(ex, ctx = {}) ⇒ Object Originally defined in module Component

#hostname ⇒ Object Originally defined in module Component

#identity ⇒ Object Originally defined in module Component

#leader? ⇒ Boolean Originally defined in module Component

True iff this process currently holds the cluster dear-leader lock. Per spec, the check is performed at call time (Wurk does not cache); callers must not poll faster than the 60s follower cadence. Returns false unconditionally when WURK_LEADER=false (or SIDEKIQ_LEADER=false) is set on the process (opt-out hot-standby). Any Redis error is swallowed → false, so a transient partition can't propagate as an exception into user code.

Spec: docs/target/sidekiq-ent.md §6.1.

Returns:

  • (Boolean)
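The decision order described for leader? (environment opt-out first, then the lock check, with errors mapped to false) might look roughly like the sketch below. The leader_check lambda and the holds_lock callable are hypothetical stand-ins. The actual Redis lock lookup is not shown on this page, and rescuing StandardError is an assumption about which errors are swallowed.

```ruby
# holds_lock is a hypothetical callable standing in for the Redis lock check.
leader_check = lambda do |env, holds_lock|
  # Opt-out hot standby: never report leadership when disabled via env.
  return false if env['WURK_LEADER'] == 'false' || env['SIDEKIQ_LEADER'] == 'false'

  holds_lock.call
rescue StandardError
  false # a failed lookup is treated as "not leader" (assumed error class)
end

as_leader   = leader_check.call({}, -> { true })
opted_out   = leader_check.call({ 'WURK_LEADER' => 'false' }, -> { true })
on_failure  = leader_check.call({}, -> { raise IOError, 'connection lost' })
```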

#logger ⇒ Object Originally defined in module Component

--- delegated to config -------------------------------------------

#mono_ms ⇒ Object Originally defined in module Component

#process_nonce ⇒ Object Originally defined in module Component

#real_ms ⇒ Object Originally defined in module Component

--- clocks ---------------------------------------------------------

#redis ⇒ Object Originally defined in module Component

#safe_thread(name, priority: nil, &block) ⇒ Object Originally defined in module Component

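Spawns a named thread that runs block under watchdog(name). The parent must retain the returned Thread rather than rely on the runtime to keep it referenced. report_on_exception is disabled on the spawned thread, so an exception that watchdog has already reported is not logged a second time when the thread dies.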

#sample(now = ::Time.now) ⇒ Object

One sampling pass, bypassing the leader gate and the sleep loop. Public so deterministic specs and a manual "sample now" can drive it directly.



# File 'lib/wurk/metrics/queue_rollup.rb', line 84

def sample(now = ::Time.now)
  gauges = queue_gauges
  return if gauges.empty?

  fields = gauges.flat_map do |name, (size, lat)|
    ["#{name}|#{SIZE_KIND}", size, "#{name}|#{LAT_KIND}", lat]
  end
  BUCKETS.each do |bucket, (step, ttl)|
    key = self.class.bucket_key(bucket, (now.to_i / step) * step)
    redis do |c|
      c.call('HSET', key, *fields)
      c.call('EXPIRE', key, ttl)
    end
  end
  nil
end
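To make the field layout concrete, the flattening step in #sample can be run on its own. The private queue_gauges helper is not shown on this page. The sketch assumes it returns a hash of queue name => [size, latency], which is what the block destructuring suggests. The sample values and the bucket key below are illustrative only.

```ruby
# Assumed return shape of queue_gauges: queue name => [size, latency].
gauges = { 'default' => [12, 0.5], 'mailers' => [0, 0.0] }

# Same flattening as #sample: two hash fields per queue.
fields = gauges.flat_map do |name, (size, lat)|
  ["#{name}|sz", size, "#{name}|lt", lat]
end

# The flat list is splatted into a single HSET per bucket key.
command = ['HSET', 'qm|1m|1700000100', *fields]
```

Because every queue's fields travel in one HSET, each bucket costs one write plus one EXPIRE per tick regardless of how many queues exist.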

#start ⇒ Object



# File 'lib/wurk/metrics/queue_rollup.rb', line 55

def start
  @thread ||= safe_thread('queue-metrics') do # rubocop:disable Naming/MemoizedInstanceVariableName
    wait
    until @done
      tick
      wait
    end
  end
end

#terminate ⇒ Object



# File 'lib/wurk/metrics/queue_rollup.rb', line 65

def terminate
  @mutex.synchronize do
    @done = true
    @sleeper.signal
  end
end
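#start and #terminate depend on a private wait helper that is not shown on this page. The sketch below shows one common way to build such an interruptible sleep from a Mutex and a ConditionVariable. InterruptibleLoop and the body of its wait method are hypothetical, not the library's code.

```ruby
class InterruptibleLoop
  def initialize(interval)
    @interval = interval
    @done = false
    @mutex = Mutex.new
    @sleeper = ConditionVariable.new
    @thread = nil
  end

  def start
    @thread ||= Thread.new do
      wait
      until @done
        # a real loop would call tick here
        wait
      end
    end
  end

  def terminate
    @mutex.synchronize do
      @done = true
      @sleeper.signal # wake the sleeping thread immediately
    end
  end

  # Returns the thread if it finished within `timeout` seconds, else nil.
  def join(timeout)
    @thread&.join(timeout)
  end

  private

  # Sleeps up to @interval seconds, returning early once terminate signals.
  def wait
    @mutex.synchronize do
      @sleeper.wait(@mutex, @interval) unless @done
    end
  end
end

worker = InterruptibleLoop.new(60)
worker.start
sleep 0.05
worker.terminate
finished = worker.join(2)
```

Checking @done under the mutex before waiting covers the case where terminate runs before the thread reaches its first wait, so shutdown does not have to sit out a full interval.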

#tick(now: ::Time.now) ⇒ Object

Leader-gated: only the elected leader samples, so N workers don't each HSET the same buckets every minute.



# File 'lib/wurk/metrics/queue_rollup.rb', line 74

def tick(now: ::Time.now)
  return unless leader?

  sample(now)
rescue StandardError => e
  handle_exception(e, { context: 'queue-metrics' })
end
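The two properties of #tick (followers do nothing, and a failing sample never escapes the loop) can be exercised with stand-ins. GatedTicker is a hypothetical class: its leader and sampler callables replace Component#leader? and #sample, and its errors array replaces handle_exception.

```ruby
class GatedTicker
  attr_reader :errors

  def initialize(leader:, sampler:)
    @leader = leader   # stand-in for Component#leader?
    @sampler = sampler # stand-in for #sample
    @errors = []
  end

  def tick(now: Time.now)
    return unless @leader.call

    @sampler.call(now)
  rescue StandardError => e
    @errors << e # the documented method forwards to handle_exception instead
  end
end

calls = []
follower = GatedTicker.new(leader: -> { false }, sampler: ->(t) { calls << t })
follower.tick # skipped: not the leader

leader = GatedTicker.new(leader: -> { true }, sampler: ->(_t) { raise 'redis down' })
leader.tick # does not raise; the error is recorded
```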

#tid ⇒ Object Originally defined in module Component

--- identity -------------------------------------------------------

#watchdog(last_words) ⇒ Object Originally defined in module Component

Wraps a block at a thread boundary: any unhandled exception is reported via handle_exception (so it lands in error_handlers / the log) and then re-raised. last_words is the component label included in the context.
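The report-then-re-raise contract can be sketched as follows. This is an illustration under assumptions, not the Component source: the watchdog and handle_exception lambdas are hypothetical, the context key name mirrors the one used in #tick, and rescuing Exception (rather than StandardError) is a guess at what "any unhandled exception" means.

```ruby
reported = []

# Stand-in for Component#handle_exception.
handle_exception = ->(ex, ctx) { reported << [ex.message, ctx] }

# Runs the block, reporting any unhandled exception before re-raising it.
watchdog = lambda do |last_words, &block|
  block.call
rescue Exception => e # assumed rescue class
  handle_exception.call(e, { context: last_words })
  raise
end

thread = Thread.new do
  # Avoid Ruby's own "terminated with exception" log line for this thread.
  Thread.current.report_on_exception = false
  watchdog.call('queue-metrics') { raise 'boom' }
end

# Thread#join re-raises the exception that killed the thread.
died = begin
  thread.join
  false
rescue RuntimeError
  true
end
```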