Class: Wurk::History

Inherits:
Object
Includes:
Component
Defined in:
lib/wurk/history.rb

Overview

Sidekiq Enterprise §5 Historical Metrics snapshotter. A leader-gated background thread that, at the interval (in seconds) passed to config.retain_history, emits a statsd-shaped snapshot to the configured dogstatsd client: either the default §5.2 gauge set or a user-supplied collector block.

Configured in a server block:

Sidekiq.configure_server do |config|
  config.dogstatsd = -> { Datadog::Statsd.new('localhost', 8125) }
  config.retain_history(30)                      # default §5.2 gauges
  # …or a custom collector:
  config.retain_history(30) do |s|
    Sidekiq::Queue.all.each do |q|
      s.gauge("sidekiq.queue.size", q.size, tags: ["queue:#{q.name}"])
    end
  end
end

The block receives the raw dogstatsd client s (quacks like Datadog::Statsd: gauge/count/histogram/batch) and writes fully-qualified sidekiq.* metric names itself, matching Sidekiq Ent. Leader-gated via the cluster dear-leader lock so exactly one process emits per cluster.
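Because the collector only needs an object that responds to gauge, it can be exercised without a real statsd daemon. The following is a minimal sketch under that assumption: RecordingStatsd, FakeQueue, QUEUES and COLLECTOR are hypothetical names introduced for illustration, and the queue sizes are made up.

```ruby
# Hypothetical test double: records gauge calls instead of sending UDP packets.
class RecordingStatsd
  attr_reader :gauges

  def initialize
    @gauges = []
  end

  def gauge(name, value, tags: [])
    @gauges << [name, value, tags]
  end
end

# Stand-in for Sidekiq::Queue so the sketch runs without Redis (made-up sizes).
FakeQueue = Struct.new(:name, :size)
QUEUES = [FakeQueue.new('default', 12), FakeQueue.new('mailers', 3)].freeze

# Same shape as the block passed to config.retain_history: it receives the
# client and writes fully-qualified sidekiq.* names itself.
COLLECTOR = lambda do |s|
  QUEUES.each do |q|
    s.gauge('sidekiq.queue.size', q.size, tags: ["queue:#{q.name}"])
  end
end

client = RecordingStatsd.new
COLLECTOR.call(client)
client.gauges.each { |name, value, tags| puts "#{name} #{value} #{tags.inspect}" }
```

A double like this keeps collector specs deterministic, since nothing depends on the leader lock or on network I/O.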

Every snapshot is also appended to the capped Redis stream history:metrics (§5.3) — the same key a migrated Sidekiq Ent install uses — so the dashboard's Historical view has a data source independent of any external statsd, and pre-existing Ent stream data renders without rewrite. The stream write happens whenever the snapshotter runs; the dogstatsd emit is skipped only when no client is configured.

Aliased as Sidekiq::History (drop-in contract). Spec: docs/target/sidekiq-ent.md §5.1–§5.3.

Constant Summary

SNAPSHOT_FIELDS =

Stream field → Stats reader. Single source for both the history:metrics stream entry and the default §5.2 statsd gauge set (which prefixes sidekiq.). Order is the display order.

{
  'processed' => :processed,
  'failures' => :failed,
  'enqueued' => :enqueued,
  'retries' => :retry_size,
  'dead' => :dead_size,
  'scheduled' => :scheduled_size,
  'busy' => :workers_size
}.freeze
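To illustrate how one mapping can drive both outputs, here is a hedged sketch. The private collect_values and emit_statsd methods are not shown on this page, so collect and gauge_names below are assumptions about their general shape, and FakeStats is a hypothetical stand-in for the Stats reader with made-up numbers.

```ruby
SNAPSHOT_FIELDS = {
  'processed' => :processed,
  'failures' => :failed,
  'enqueued' => :enqueued,
  'retries' => :retry_size,
  'dead' => :dead_size,
  'scheduled' => :scheduled_size,
  'busy' => :workers_size
}.freeze

# Hypothetical Stats stand-in exposing one reader per mapping value.
FakeStats = Struct.new(*SNAPSHOT_FIELDS.values, keyword_init: true)

# Assumed shape of value collection: stream field name => value read from stats.
def collect(stats)
  SNAPSHOT_FIELDS.transform_values { |reader| stats.public_send(reader) }
end

# Assumed shape of the default gauge set: same values, names prefixed with sidekiq.
def gauge_names(values)
  values.transform_keys { |field| "sidekiq.#{field}" }
end

stats = FakeStats.new(processed: 100, failed: 4, enqueued: 7, retry_size: 2,
                      dead_size: 1, scheduled_size: 5, workers_size: 3)
values = collect(stats)
p values              # stream entry fields, in display order
p gauge_names(values) # statsd gauge names and values
```

Since Ruby hashes preserve insertion order, the display order falls out of the constant without a separate list.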
STREAM_CAP =

Approximate cap on retained snapshots (XADD MAXLEN ~). At the default 30s interval this is ~3.5 days of history; older points age out. ~ lets Redis trim in whole macro-nodes, so the actual length can briefly exceed the cap — matching Ent's best-effort retention.

10_000
STREAM_DEFAULT_LIMIT =
1000
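The retention figure quoted for STREAM_CAP follows from simple arithmetic, and the approximate trim corresponds to the MAXLEN ~ option of XADD. The sketch below shows both. retention_days and xadd_args are hypothetical helpers, and the exact command that the private record_stream method issues is an assumption; only the history:metrics key and the MAXLEN ~ form come from the text above.

```ruby
STREAM_CAP = 10_000
SECONDS_PER_DAY = 86_400

# Days of history retained for a given cap and snapshot interval.
def retention_days(cap, interval_seconds)
  cap * interval_seconds / SECONDS_PER_DAY.to_f
end

# Assumed argument list for an approximately capped append:
# XADD <key> MAXLEN ~ <cap> * <field> <value> ...
def xadd_args(key, cap, values)
  ['XADD', key, 'MAXLEN', '~', cap, '*', *values.flat_map { |field, value| [field, value] }]
end

puts retention_days(STREAM_CAP, 30).round(2) # => 3.47
p xadd_args('history:metrics', STREAM_CAP, { 'processed' => 100, 'busy' => 3 })
```

A longer interval stretches the window proportionally: at 60 seconds the same cap covers roughly a week.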

Constructor Details

#initialize(config) ⇒ History

Returns a new instance of History.



# File 'lib/wurk/history.rb', line 64

def initialize(config)
  @config = config
  @interval = config.history_interval
  @collector = config.history_collector
  @stream_cap = config[:history_stream_cap] || STREAM_CAP
  @done = false
  @mutex = ::Mutex.new
  @sleeper = ::ConditionVariable.new
  @thread = nil
end

Instance Attribute Details

#config ⇒ Object (readonly) Originally defined in module Component

Returns the value of attribute config.

Class Method Details

.numeric(value) ⇒ Object

Coerce a stream field to Int/Float for charting; leave non-numeric Ent fields (e.g. a label) untouched so nothing is silently dropped.



# File 'lib/wurk/history.rb', line 137

def self.numeric(value)
  float = Float(value)
  (float % 1).zero? ? float.to_i : float
rescue ::ArgumentError, ::TypeError
  value
end
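As a quick illustration of the coercion rules, the method body above can be run standalone. This sketch copies it to a top-level method; the sample inputs are made up.

```ruby
# Standalone copy of the coercion logic shown above.
def numeric(value)
  float = Float(value)
  (float % 1).zero? ? float.to_i : float
rescue ArgumentError, TypeError
  value
end

p numeric('42')    # => 42      (whole number becomes Integer)
p numeric('1.50')  # => 1.5     (fractional value stays Float)
p numeric('web-1') # => "web-1" (non-numeric string passes through)
p numeric(nil)     # => nil     (TypeError is rescued, value returned as-is)
```

Kernel#Float is strict, unlike String#to_f, which is what lets a label such as "web-1" survive instead of silently becoming 0.0.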

.parse_entry(entry_id, fields) ⇒ Object



# File 'lib/wurk/history.rb', line 123

def self.parse_entry(entry_id, fields)
  pairs = fields.is_a?(::Array) ? fields.each_slice(2).to_h : fields
  point = { at: stream_epoch(entry_id) }
  pairs.each { |field, value| point[field.to_sym] = numeric(value) }
  point
end

.recent(limit: STREAM_DEFAULT_LIMIT) ⇒ Object

Most-recent snapshots from the history:metrics stream, oldest→newest. Each point is { at: <epoch seconds>, <field>: <numeric>, … }. Fields are read generically, so a migrated Sidekiq Ent install's entries render without rewrite regardless of which fields they carry.



# File 'lib/wurk/history.rb', line 117

def self.recent(limit: STREAM_DEFAULT_LIMIT)
  count = limit.to_i.clamp(1, STREAM_CAP)
  entries = Wurk.redis { |c| c.call('XREVRANGE', Keys::HISTORY_METRICS, '+', '-', 'COUNT', count) }
  entries.reverse.map { |entry_id, fields| parse_entry(entry_id, fields) }
end
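The limit handling can be checked in isolation. In this sketch, effective_count is a hypothetical helper that reproduces only the first line of the method above. Note that, as written, the clamp uses the STREAM_CAP constant rather than a configured history_stream_cap.

```ruby
STREAM_CAP = 10_000
STREAM_DEFAULT_LIMIT = 1000

# Mirrors `limit.to_i.clamp(1, STREAM_CAP)` from .recent.
def effective_count(limit = STREAM_DEFAULT_LIMIT)
  limit.to_i.clamp(1, STREAM_CAP)
end

p effective_count         # => 1000  (default)
p effective_count('250')  # => 250   (string params, e.g. from a query string)
p effective_count(0)      # => 1     (lower bound)
p effective_count(nil)    # => 1     (nil.to_i is 0)
p effective_count(50_000) # => 10000 (upper bound)
```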

.stream_epoch(entry_id) ⇒ Object

Redis stream IDs are "<ms>-<seq>"; the ms half is the snapshot time.



# File 'lib/wurk/history.rb', line 131

def self.stream_epoch(entry_id)
  entry_id.to_s.split('-', 2).first.to_i / 1000.0
end
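The three parsing helpers compose as follows. This is a self-contained sketch that copies their bodies into a hypothetical HistoryParsing module so it runs without Redis; the entry ID and field values are made up.

```ruby
module HistoryParsing
  module_function

  def numeric(value)
    float = Float(value)
    (float % 1).zero? ? float.to_i : float
  rescue ArgumentError, TypeError
    value
  end

  # "<ms>-<seq>" => epoch seconds as a Float.
  def stream_epoch(entry_id)
    entry_id.to_s.split('-', 2).first.to_i / 1000.0
  end

  # Accepts either a flat [field, value, ...] array or a Hash of fields.
  def parse_entry(entry_id, fields)
    pairs = fields.is_a?(Array) ? fields.each_slice(2).to_h : fields
    point = { at: stream_epoch(entry_id) }
    pairs.each { |field, value| point[field.to_sym] = numeric(value) }
    point
  end
end

flat = ['processed', '10', 'failures', '2', 'host', 'web-1']
p HistoryParsing.parse_entry('1700000000000-0', flat)
# => {at: 1700000000.0, processed: 10, failures: 2, host: "web-1"}
# (older Rubies print the same hash as {:at=>1700000000.0, ...})

p HistoryParsing.parse_entry('1700000030500-1', { 'busy' => '3' })
```

Accepting both shapes matters because Redis clients differ in whether they return stream fields as a flat array or as a hash.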

Instance Method Details

#default_tag(dir = Dir.pwd) ⇒ Object Originally defined in module Component

#fire_event(event, oneshot: true, reverse: false, reraise: false) ⇒ Object Originally defined in module Component

Invokes lifecycle hooks for event. Hooks run in registration order (or LIFO when reverse: true, used for teardown). A raise in one hook is reported via handle_exception and does NOT stop the next hook unless reraise: true (used in tests / fail-fast boot). oneshot: true clears the bucket after dispatch so the event can't fire twice.

#handle_exception(ex, ctx = {}) ⇒ Object Originally defined in module Component

#hostname ⇒ Object Originally defined in module Component

#identity ⇒ Object Originally defined in module Component

#leader? ⇒ Boolean Originally defined in module Component

True iff this process currently holds the cluster dear-leader lock. Per spec, the check is performed at call time (Wurk does not cache); callers must not poll faster than the 60s follower cadence. Returns false unconditionally when WURK_LEADER=false (or SIDEKIQ_LEADER=false) is set on the process (opt-out hot-standby). Any Redis error is swallowed → false, so a transient partition can't propagate as an exception into user code.

Spec: docs/target/sidekiq-ent.md §6.1.

Returns:

  • (Boolean)

#logger ⇒ Object Originally defined in module Component

--- delegated to config -------------------------------------------

#mono_ms ⇒ Object Originally defined in module Component

#process_nonce ⇒ Object Originally defined in module Component

#real_ms ⇒ Object Originally defined in module Component

--- clocks ---------------------------------------------------------

#redis ⇒ Object Originally defined in module Component

#safe_thread(name, priority: nil, &block) ⇒ Object Originally defined in module Component

Spawns a named thread that runs block under watchdog(name). The caller must retain the returned Thread. report_on_exception is disabled on the thread so that a death is not logged twice.

#snapshot ⇒ Object

One snapshot, bypassing the leader gate and the sleep loop. Public so deterministic specs and a manual "snapshot now" can drive it directly. Always appends to the history:metrics stream (the dashboard's source); additionally emits to dogstatsd when a client is configured.



# File 'lib/wurk/history.rb', line 106

def snapshot
  values = collect_values
  record_stream(values)
  emit_statsd(values)
  nil
end

#start ⇒ Object



# File 'lib/wurk/history.rb', line 75

def start
  @thread ||= safe_thread('history-snapshot') do # rubocop:disable Naming/MemoizedInstanceVariableName
    wait
    until @done
      tick
      wait
    end
  end
end

#terminate ⇒ Object



# File 'lib/wurk/history.rb', line 85

def terminate
  @mutex.synchronize do
    @done = true
    @sleeper.signal
  end
end
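The start/terminate pair relies on a private wait method that is not shown on this page. A common way to build such an interruptible sleep is a timed ConditionVariable#wait, sketched below. IntervalLoop is a hypothetical class, its wait body is an assumption rather than the actual implementation, and a plain Thread.new stands in for safe_thread.

```ruby
# Hypothetical loop with the same start/terminate shape as above.
class IntervalLoop
  attr_reader :ticks

  def initialize(interval)
    @interval = interval
    @done = false
    @mutex = Mutex.new
    @sleeper = ConditionVariable.new
    @ticks = 0
    @thread = nil
  end

  def start
    @thread ||= Thread.new do
      wait
      until @done
        @ticks += 1 # stands in for tick
        wait
      end
    end
  end

  def terminate
    @mutex.synchronize do
      @done = true
      @sleeper.signal
    end
  end

  def join(timeout = nil)
    @thread&.join(timeout)
  end

  private

  # Assumed implementation: sleep up to @interval seconds, waking early on
  # signal. Checking @done under the mutex avoids missing a terminate that
  # arrived before the thread started waiting.
  def wait
    @mutex.synchronize do
      @sleeper.wait(@mutex, @interval) unless @done
    end
  end
end

runner = IntervalLoop.new(60)
runner.start
runner.terminate
puts(runner.join(5) ? 'stopped promptly' : 'still sleeping')
```

Compared with a bare sleep, the condition variable lets shutdown return immediately instead of waiting out the remainder of the interval.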

#tick ⇒ Object

Leader-gated: only the elected leader emits, so N workers don't each publish the same cluster-wide gauges every interval.



# File 'lib/wurk/history.rb', line 94

def tick
  return unless leader?

  snapshot
rescue StandardError => e
  handle_exception(e, { context: 'history-snapshot' })
end

#tid ⇒ Object Originally defined in module Component

--- identity -------------------------------------------------------

#watchdog(last_words) ⇒ Object Originally defined in module Component

Wraps a block at a thread boundary: any unhandled exception is reported via handle_exception (so it lands in error_handlers / the log) and then re-raised. last_words is the component label included in the context.