Class: Wurk::History
Overview
Sidekiq Enterprise §5 Historical Metrics snapshotter. A leader-gated
background thread that, every N seconds (the interval passed to
config.retain_history), emits a statsd-shaped snapshot to the configured
dogstatsd client: either the default §5.2 gauge set or a user-supplied
collector block.
Configured in a server block:
Sidekiq.configure_server do |config|
  config.dogstatsd = -> { Datadog::Statsd.new('localhost', 8125) }
  config.retain_history(30) # default §5.2 gauges
  # …or a custom collector:
  config.retain_history(30) do |s|
    Sidekiq::Queue.all.each do |q|
      s.gauge("sidekiq.queue.size", q.size, tags: ["queue:#{q.name}"])
    end
  end
end
The block receives the raw dogstatsd client s (quacks like
Datadog::Statsd: gauge/count/histogram/batch) and writes fully-qualified
sidekiq.* metric names itself, matching Sidekiq Ent. Leader-gated via the
cluster dear-leader lock so exactly one process emits per cluster.
Every snapshot is also appended to the capped Redis stream
history:metrics (§5.3) — the same key a migrated Sidekiq Ent install
uses — so the dashboard's Historical view has a data source independent of
any external statsd, and pre-existing Ent stream data renders without
rewrite. The stream write happens whenever the snapshotter runs; the
dogstatsd emit is skipped only when no client is configured.
Aliased as Sidekiq::History (drop-in contract).
Spec: docs/target/sidekiq-ent.md §5.1–§5.3.
Constant Summary
- SNAPSHOT_FIELDS =
  Stream field → Stats reader. Single source for both the history:metrics stream entry and the default §5.2 statsd gauge set (which prefixes sidekiq.). Order is the display order.
  { 'processed' => :processed, 'failures' => :failed, 'enqueued' => :enqueued, 'retries' => :retry_size, 'dead' => :dead_size, 'scheduled' => :scheduled_size, 'busy' => :workers_size }.freeze
- STREAM_CAP =
  Approximate cap on retained snapshots (XADD MAXLEN ~). At the default 30s interval this is ~3.5 days of history; older points age out. The ~ lets Redis trim in whole macro-nodes, so the actual length can briefly exceed the cap, matching Ent's best-effort retention.
  10_000
- STREAM_DEFAULT_LIMIT =
  1000
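To make "single source" concrete: one read of the Stats readers can feed both the stream entry and the default gauge names. This is a sketch under assumptions. `FakeStats`, `collect_values`, `xadd_command` and `default_gauges` are hypothetical names (the real private helpers are not shown on this page), and the exact XADD argument order, including the auto-generated `*` ID, is assumed.

```ruby
SNAPSHOT_FIELDS = {
  'processed' => :processed, 'failures' => :failed, 'enqueued' => :enqueued,
  'retries' => :retry_size, 'dead' => :dead_size,
  'scheduled' => :scheduled_size, 'busy' => :workers_size
}.freeze
STREAM_CAP = 10_000

# Hypothetical stand-in for Sidekiq::Stats: one reader per SNAPSHOT_FIELDS value.
FakeStats = Struct.new(:processed, :failed, :enqueued, :retry_size,
                       :dead_size, :scheduled_size, :workers_size)

# Read every field once, preserving display order (transform_values keeps
# insertion order).
def collect_values(stats)
  SNAPSHOT_FIELDS.transform_values { |reader| stats.public_send(reader) }
end

# Assumed stream write: approximate trimming (MAXLEN ~), auto ID ('*'),
# then the fields flattened as name/value pairs. Stream fields are unprefixed.
def xadd_command(values, cap: STREAM_CAP)
  ['XADD', 'history:metrics', 'MAXLEN', '~', cap, '*',
   *values.flat_map { |field, value| [field, value.to_s] }]
end

# Default §5.2 gauges: the same fields, prefixed with "sidekiq.".
def default_gauges(values)
  values.map { |field, value| ["sidekiq.#{field}", value] }
end

values = collect_values(FakeStats.new(100, 2, 5, 1, 0, 3, 4))
```

For the cap arithmetic quoted above: 10,000 snapshots × 30 s = 300,000 s, about 83 hours, or roughly 3.5 days.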
Instance Attribute Summary
- #config ⇒ Object (readonly, included from Component)
  Returns the value of attribute config.
Class Method Summary
- .numeric(value) ⇒ Object
  Coerce a stream field to Int/Float for charting; leave non-numeric Ent fields (e.g. a label) untouched so nothing is silently dropped.
- .parse_entry(entry_id, fields) ⇒ Object
- .recent(limit: STREAM_DEFAULT_LIMIT) ⇒ Object
  Most-recent snapshots from the history:metrics stream, oldest→newest.
- .stream_epoch(entry_id) ⇒ Object
  Redis stream IDs are "<ms>-<seq>"; the ms half is the snapshot time.
Instance Method Summary
- #default_tag(dir = Dir.pwd) ⇒ Object (included from Component)
- #fire_event(event, oneshot: true, reverse: false, reraise: false) ⇒ Object (included from Component)
  Invokes lifecycle hooks for event.
- #handle_exception(ex, ctx = {}) ⇒ Object (included from Component)
- #hostname ⇒ Object (included from Component)
- #identity ⇒ Object (included from Component)
- #initialize(config) ⇒ History (constructor)
  A new instance of History.
- #leader? ⇒ Boolean (included from Component)
  True iff this process currently holds the cluster dear-leader lock.
- #logger ⇒ Object (included from Component)
  Delegated to config.
- #mono_ms ⇒ Object (included from Component)
- #process_nonce ⇒ Object (included from Component)
- #real_ms ⇒ Object (included from Component)
  Clock helper.
- #redis ⇒ Object (included from Component)
- #safe_thread(name, priority: nil, &block) ⇒ Object (included from Component)
  Spawns a named thread that runs block under watchdog(name).
- #snapshot ⇒ Object
  One snapshot, bypassing the leader gate and the sleep loop.
- #start ⇒ Object
- #terminate ⇒ Object
- #tick ⇒ Object
  Leader-gated: only the elected leader emits, so N workers don't each publish the same cluster-wide gauges every interval.
- #tid ⇒ Object (included from Component)
  Identity helper.
- #watchdog(last_words) ⇒ Object (included from Component)
  Wraps a block at a thread boundary: any unhandled exception is reported via handle_exception (so it lands in error_handlers / the log) and then re-raised.
Constructor Details
#initialize(config) ⇒ History
Returns a new instance of History.
# File 'lib/wurk/history.rb', line 64
def initialize(config)
  @config = config
  @interval = config.history_interval
  @collector = config.history_collector
  @stream_cap = config[:history_stream_cap] || STREAM_CAP
  @done = false
  @mutex = ::Mutex.new
  @sleeper = ::ConditionVariable.new
  @thread = nil
end
Instance Attribute Details
#config ⇒ Object (readonly) Originally defined in module Component
Returns the value of attribute config.
Class Method Details
.numeric(value) ⇒ Object
Coerce a stream field to Int/Float for charting; leave non-numeric Ent fields (e.g. a label) untouched so nothing is silently dropped.
# File 'lib/wurk/history.rb', line 137
def self.numeric(value)
  float = Float(value)
  (float % 1).zero? ? float.to_i : float
rescue ::ArgumentError, ::TypeError
  value
end
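Redis hands every stream value back as a string, so this coercion is what makes the points chartable. Below, the same logic is restated standalone (so it runs without Wurk loaded) to show the three outcomes: whole numbers become Integer, fractional values stay Float, and anything `Float()` rejects passes through untouched.

```ruby
# Standalone restatement of History.numeric for illustration.
def numeric(value)
  float = Float(value)
  (float % 1).zero? ? float.to_i : float
rescue ArgumentError, TypeError
  value
end

numeric('42')       # whole number string   -> Integer 42
numeric('0.25')     # fractional string     -> Float 0.25
numeric('1e3')      # exponent notation     -> Integer 1000
numeric('worker-a') # Float() rejects it    -> "worker-a" unchanged
numeric(nil)        # Float(nil) TypeError  -> nil unchanged
```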
.parse_entry(entry_id, fields) ⇒ Object
# File 'lib/wurk/history.rb', line 123
def self.parse_entry(entry_id, fields)
  pairs = fields.is_a?(::Array) ? fields.each_slice(2).to_h : fields
  point = { at: stream_epoch(entry_id) }
  pairs.each { |field, value| point[field.to_sym] = numeric(value) }
  point
end
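parse_entry accepts fields either as the flat `[k1, v1, k2, v2, …]` array that XRANGE/XREVRANGE replies use, or as an already-paired Hash; presumably the Hash branch covers callers or clients that pair them up first (an assumption, since the page does not say). A standalone sketch of both shapes producing the same point, with `stream_epoch` and `numeric` restated inline:

```ruby
def stream_epoch(entry_id)
  entry_id.to_s.split('-', 2).first.to_i / 1000.0
end

def numeric(value)
  float = Float(value)
  (float % 1).zero? ? float.to_i : float
rescue ArgumentError, TypeError
  value
end

def parse_entry(entry_id, fields)
  # Flat array -> pairs; a Hash is already pairs.
  pairs = fields.is_a?(Array) ? fields.each_slice(2).to_h : fields
  point = { at: stream_epoch(entry_id) }
  # Field names are read generically, so unknown Ent fields survive.
  pairs.each { |field, value| point[field.to_sym] = numeric(value) }
  point
end

flat   = parse_entry('1700000000000-0', %w[processed 100 busy 4])
hashed = parse_entry('1700000000000-1', { 'processed' => '100', 'busy' => '4' })
label  = parse_entry('1-0', %w[host web-1])
```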
.recent(limit: STREAM_DEFAULT_LIMIT) ⇒ Object
Most-recent snapshots from the history:metrics stream, oldest→newest.
Each point is { at: <epoch seconds>, <field>: <numeric>, … }. Fields are
read generically, so a migrated Sidekiq Ent install's entries render
without rewrite regardless of which fields they carry.
# File 'lib/wurk/history.rb', line 117
def self.recent(limit: STREAM_DEFAULT_LIMIT)
  count = limit.to_i.clamp(1, STREAM_CAP)
  entries = Wurk.redis { |c| c.call('XREVRANGE', Keys::HISTORY_METRICS, '+', '-', 'COUNT', count) }
  entries.reverse.map { |entry_id, fields| parse_entry(entry_id, fields) }
end
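Two details of recent are easy to miss: XREVRANGE with COUNT returns the *newest* entries first, so the result is reversed into chart order, and the limit is clamped so nil, zero, or negative values still fetch one point. A sketch with the Redis call replaced by a hypothetical in-memory reply (`FAKE_STREAM_NEWEST_FIRST` and `xrevrange` are illustration-only names, and field parsing is simplified to Integer):

```ruby
STREAM_CAP = 10_000
STREAM_DEFAULT_LIMIT = 1000

# Hypothetical stand-in for XREVRANGE history:metrics + - COUNT n:
# newest entry first, each as [id, flat_fields].
FAKE_STREAM_NEWEST_FIRST = [
  ['1700000060000-0', %w[processed 130]],
  ['1700000030000-0', %w[processed 110]],
  ['1700000000000-0', %w[processed 100]]
].freeze

def xrevrange(count)
  FAKE_STREAM_NEWEST_FIRST.first(count)
end

def recent(limit: STREAM_DEFAULT_LIMIT)
  # nil/0/negative still fetch one point; huge limits are capped.
  count = limit.to_i.clamp(1, STREAM_CAP)
  # Reverse newest-first into oldest-first (chart order).
  xrevrange(count).reverse.map do |id, fields|
    at = id.split('-', 2).first.to_i / 1000.0
    { at: at }.merge(fields.each_slice(2).to_h { |k, v| [k.to_sym, Integer(v)] })
  end
end
```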
.stream_epoch(entry_id) ⇒ Object
Redis stream IDs are "<ms>-<seq>"; the ms half is the snapshot time.
# File 'lib/wurk/history.rb', line 131
def self.stream_epoch(entry_id)
  entry_id.to_s.split('-', 2).first.to_i / 1000.0
end
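When XADD is given `*`, Redis generates the ID as Unix milliseconds, a dash, and a sequence number, so the first half converts straight to epoch seconds. A quick standalone check, including the fact that a malformed or missing ID degrades to 0.0 rather than raising:

```ruby
def stream_epoch(entry_id)
  entry_id.to_s.split('-', 2).first.to_i / 1000.0
end

at = stream_epoch('1700000000123-0')  # 1700000000.123 seconds
Time.at(at).utc                       # a Time in November 2023
stream_epoch(nil)                     # "" -> [] -> nil.to_i -> 0.0
```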
Instance Method Details
#default_tag(dir = Dir.pwd) ⇒ Object Originally defined in module Component
#fire_event(event, oneshot: true, reverse: false, reraise: false) ⇒ Object Originally defined in module Component
Invokes lifecycle hooks for event. Hooks run in registration order
(or LIFO when reverse: true, used for teardown). A raise in one hook
is reported via handle_exception and does NOT stop the next hook unless
reraise: true (used in tests / fail-fast boot). oneshot: true
clears the bucket after dispatch so the event can't fire twice.
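The dispatch rules above can be modelled with a small registry. This is a sketch, not Component's implementation: the `Hooks` class, its `on` method, and the error callback are hypothetical. It shows registration order, LIFO on `reverse: true`, per-hook error isolation, fail-fast on `reraise: true`, and `oneshot` clearing the bucket even when a hook re-raises.

```ruby
# Hypothetical hook registry illustrating the documented fire_event rules.
class Hooks
  def initialize(&on_error)
    @buckets = Hash.new { |h, k| h[k] = [] }
    @on_error = on_error # stands in for handle_exception
  end

  def on(event, &hook)
    @buckets[event] << hook
  end

  def fire_event(event, oneshot: true, reverse: false, reraise: false)
    hooks = @buckets[event].dup
    hooks.reverse! if reverse # LIFO for teardown
    hooks.each do |hook|
      hook.call
    rescue StandardError => e
      @on_error&.call(e, event) # report and keep going...
      raise if reraise          # ...unless fail-fast was requested
    end
    nil
  ensure
    @buckets.delete(event) if oneshot # can't fire twice
  end
end
```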
#handle_exception(ex, ctx = {}) ⇒ Object Originally defined in module Component
#hostname ⇒ Object Originally defined in module Component
#identity ⇒ Object Originally defined in module Component
#leader? ⇒ Boolean Originally defined in module Component
True iff this process currently holds the cluster dear-leader lock.
Per spec, the check is performed at call time (Wurk does not cache);
callers must not poll faster than the 60s follower cadence. Returns
false unconditionally when WURK_LEADER=false (or SIDEKIQ_LEADER=false)
is set on the process (opt-out hot-standby). Any Redis error is swallowed →
false, so a transient partition can't propagate as an exception into user
code.
Spec: docs/target/sidekiq-ent.md §6.1.
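A sketch of the leader? contract under stated assumptions. The lock key and value format are not shown on this page, so the Redis read is replaced by a hypothetical `holder_lookup` callable, and "holder equals this process's nonce" is an assumed ownership test. What it does show from the description: the env opt-out short-circuits to false, the check runs on every call, and any error becomes false.

```ruby
# Hypothetical model of leader?; holder_lookup stands in for reading the
# dear-leader lock from Redis.
def leader?(nonce, holder_lookup, env: ENV)
  return false if env['WURK_LEADER'] == 'false' || env['SIDEKIQ_LEADER'] == 'false'

  holder_lookup.call == nonce # evaluated per call; nothing cached
rescue StandardError
  false # e.g. a Redis timeout during a partition reads as "not leader"
end
```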
#logger ⇒ Object Originally defined in module Component
Delegated to config.
#mono_ms ⇒ Object Originally defined in module Component
#process_nonce ⇒ Object Originally defined in module Component
#real_ms ⇒ Object Originally defined in module Component
Clock helper.
#redis ⇒ Object Originally defined in module Component
#safe_thread(name, priority: nil, &block) ⇒ Object Originally defined in module Component
Spawns a named thread that runs block under watchdog(name). The caller
should retain the returned Thread. report_on_exception is disabled on it so
a dying thread is not logged twice (watchdog has already reported the
exception).
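A minimal sketch of that thread setup, assuming watchdog reports once and re-raises. The `reports` array is a hypothetical stand-in for handle_exception, and the real method's `priority:` option is omitted. Setting the name and `report_on_exception` from inside the thread avoids a window where an early exception could still trigger Ruby's default stderr report.

```ruby
# Hypothetical sketch of safe_thread; `reports` replaces handle_exception.
def safe_thread(name, reports, &block)
  Thread.new do
    Thread.current.name = name
    Thread.current.report_on_exception = false # no duplicate stderr log
    begin
      block.call
    rescue StandardError => e
      reports << [name, e.class] # the single report (watchdog's job)...
      raise                      # ...then the thread dies with the error
    end
  end
end
```

Retaining the Thread matters because `join`/`value` is how the parent observes the re-raised exception.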
#snapshot ⇒ Object
One snapshot, bypassing the leader gate and the sleep loop. Public so
deterministic specs and a manual "snapshot now" can drive it directly.
Always appends to the history:metrics stream (the dashboard's source);
additionally emits to dogstatsd when a client is configured.
# File 'lib/wurk/history.rb', line 106
def snapshot
  values = collect_values
  record_stream(values)
  emit_statsd(values)
  nil
end
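The control flow of snapshot can be sketched with the private helpers filled in by assumption. `SnapshotSketch` is a hypothetical class; an in-memory array replaces the history:metrics stream, and the choice between default gauges and a custom collector inside `emit_statsd` is inferred from the Overview rather than shown on this page. The point illustrated: the stream append is unconditional, and the statsd emit is skipped when no client is configured.

```ruby
# Hypothetical sketch of snapshot's flow; not Wurk's private helpers.
class SnapshotSketch
  attr_reader :stream

  def initialize(values_source, statsd: nil, collector: nil)
    @values_source = values_source
    @statsd = statsd
    @collector = collector
    @stream = [] # stands in for the history:metrics stream
  end

  def snapshot
    values = @values_source.call
    @stream << values # record_stream: always
    emit_statsd(values)
    nil
  end

  private

  def emit_statsd(values)
    return unless @statsd # no client configured: stream only

    if @collector
      @collector.call(@statsd) # custom block gets the raw client
    else
      values.each { |field, v| @statsd.gauge("sidekiq.#{field}", v) }
    end
  end
end
```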
#start ⇒ Object
# File 'lib/wurk/history.rb', line 75
def start
  @thread ||= safe_thread('history-snapshot') do # rubocop:disable Naming/MemoizedInstanceVariableName
    wait
    until @done
      tick
      wait
    end
  end
end
#terminate ⇒ Object
# File 'lib/wurk/history.rb', line 85
def terminate
  @mutex.synchronize do
    @done = true
    @sleeper.signal
  end
end
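The private `wait` that start calls is not shown on this page. A plausible shape, given the Mutex and ConditionVariable created in the constructor, is a timed wait on the condition variable, so terminate's `signal` wakes the thread immediately instead of letting it sleep out the interval. `LoopSketch` is a hypothetical class built on that assumption, with a counter standing in for tick.

```ruby
# Hypothetical model of the start/terminate loop; `wait` is assumed.
class LoopSketch
  attr_reader :ticks

  def initialize(interval)
    @interval = interval
    @done = false
    @mutex = Mutex.new
    @sleeper = ConditionVariable.new
    @ticks = 0
    @thread = nil
  end

  def start
    @thread ||= Thread.new do
      wait
      until @done
        @ticks += 1 # stands in for tick
        wait
      end
    end
  end

  def terminate
    @mutex.synchronize do
      @done = true
      @sleeper.signal
    end
  end

  def join(limit)
    @thread.join(limit)
  end

  private

  def wait
    @mutex.synchronize do
      # Re-check under the lock: if terminate already ran, don't sleep.
      @sleeper.wait(@mutex, @interval) unless @done
    end
  end
end
```

Because `@done` is re-checked while holding the mutex, a terminate that lands before the thread starts waiting cannot be lost.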
#tick ⇒ Object
Leader-gated: only the elected leader emits, so N workers don't each publish the same cluster-wide gauges every interval.
# File 'lib/wurk/history.rb', line 94
def tick
  return unless leader?

  snapshot
rescue StandardError => e
  handle_exception(e, { context: 'history-snapshot' })
end
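Because the rescue sits inside tick rather than around the loop in start, one failed snapshot is reported and the thread simply waits for the next interval. A sketch with the leader flag and failure injected (`TickSketch` and its constructor arguments are hypothetical; `@errors` stands in for handle_exception):

```ruby
# Hypothetical model of tick's leader gate and error isolation.
class TickSketch
  attr_reader :snapshots, :errors

  def initialize(leader:, fail_with: nil)
    @leader = leader
    @fail_with = fail_with
    @snapshots = 0
    @errors = []
  end

  def tick
    return unless @leader # followers do nothing

    snapshot
  rescue StandardError => e
    @errors << [e.message, { context: 'history-snapshot' }]
  end

  private

  def snapshot
    raise @fail_with if @fail_with

    @snapshots += 1
  end
end
```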
#tid ⇒ Object Originally defined in module Component
Identity helper.
#watchdog(last_words) ⇒ Object Originally defined in module Component
Wraps a block at a thread boundary: any unhandled exception is reported
via handle_exception (so it lands in error_handlers / the log) and then
re-raised. last_words is the component label included in the context.
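A sketch of that report-then-re-raise contract, with a hypothetical `error_handlers` array of callables standing in for Wurk's configured handlers. This version only intercepts StandardError; whether the real watchdog also intercepts non-StandardError exceptions is not stated on this page.

```ruby
# Hypothetical sketch of watchdog: report with the label, then re-raise.
def watchdog(last_words, error_handlers)
  yield
rescue StandardError => e
  error_handlers.each { |handler| handler.call(e, { context: last_words }) }
  raise # the thread boundary still sees the failure
end
```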