Class: Wurk::ProcessSet
- Inherits: Object (Object > Wurk::ProcessSet)
- Includes: Enumerable
- Defined in: lib/wurk/process_set.rb
Overview
Live process list view backed by the processes SET + per-identity hashes.
Cluster topology lives here: dashboard process list, total concurrency
gauge, RSS roll-up, leader lookup.
Wire-compat is sacred — every Redis call matches Sidekiq OSS exactly. Spec: docs/target/sidekiq-free.md §1 (Redis schema), §19.6.
Class Method Summary
- .[](identity) ⇒ Object
  Fetch a single Process by identity.
Instance Method Summary
- #cleanup ⇒ Object
  SREM identities whose info HASH field has expired.
- #each ⇒ Object
  Pipelined identity → HMGET.
- #initialize(clean_plz = true) ⇒ ProcessSet (constructor)
  processes SET only stores identity strings; the live hash at each identity expires every 60s, so SCARD can lag reality.
- #leader ⇒ Object
  Cluster leader identity.
- #size ⇒ Object
  SCARD over processes.
- #total_concurrency ⇒ Object
  Sum of concurrency across live processes.
- #total_rss_in_kb ⇒ Object (also: #total_rss)
  Sum of rss (KB) across live processes.
Constructor Details
#initialize(clean_plz = true) ⇒ ProcessSet
processes SET only stores identity strings; the live hash at each
identity expires every 60s, so SCARD can lag reality. Constructor
opts into a cleanup (rate-limited globally to 1/min) so the size
reported below reflects the post-prune state — except when caller
explicitly skips it (e.g. inside cleanup itself, or for snapshot
reads on hot paths).
Positional Boolean matches Sidekiq's public API — wire-compat sacred.
# File 'lib/wurk/process_set.rb', line 27
def initialize(clean_plz = true) # rubocop:disable Style/OptionalBooleanParameter
  cleanup if clean_plz
end
Class Method Details
.[](identity) ⇒ Object
Fetch a single Process by identity. Pipelined SISMEMBER + HMGET so absence (process never registered) and expiry (heartbeat lapsed, info field gone) both return nil.
# File 'lib/wurk/process_set.rb', line 34
def self.[](identity)
  exists, fields = Wurk.redis do |conn|
    conn.pipelined do |pipe|
      pipe.call('SISMEMBER', Keys::PROCESSES, identity)
      pipe.call('HMGET', identity, *LOOKUP_FIELDS)
    end
  end
  return nil if exists.to_i.zero? || fields.first.nil?

  build_process(LOOKUP_FIELDS.zip(fields).to_h)
end
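The nil-on-absence and nil-on-expiry behavior of the lookup above can be sketched without Redis. This is a minimal model, not Wurk's code: the `PROCESSES` array, the `HASHES` table, the `lookup` helper, and the contents of `LOOKUP_FIELDS` are all assumptions made for illustration (the real constant is not shown on this page; the sketch only assumes that `info` comes first, as the `fields.first.nil?` check suggests).

```ruby
# Hypothetical in-memory stand-ins for the two Redis structures involved.
PROCESSES = ['host-a:1:abc', 'host-b:2:def'].freeze # members of the processes SET
HASHES = {
  'host-a:1:abc' => { 'info' => '{"hostname":"host-a"}', 'busy' => '3' }
  # 'host-b:2:def' has no hash: its heartbeat lapsed and the key expired
}.freeze
LOOKUP_FIELDS = %w[info busy].freeze # assumed field list, info first

# Mirrors the decision logic of the pipelined SISMEMBER + HMGET.
def lookup(identity)
  exists = PROCESSES.include?(identity) ? 1 : 0      # SISMEMBER replies 1 or 0
  hash = HASHES.fetch(identity, {})
  fields = LOOKUP_FIELDS.map { |field| hash[field] } # HMGET replies nil per missing field
  return nil if exists.to_i.zero? || fields.first.nil?

  LOOKUP_FIELDS.zip(fields).to_h
end
```

In this model an identity that was never registered and an identity that is still in the set but has lost its hash both come back as nil; only a set member with a live `info` field yields attributes.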
Instance Method Details
#cleanup ⇒ Object
SREM identities whose info HASH field has expired. Rate-limited
globally to 1/min via SET NX EX so concurrent dashboards / API calls
don't dogpile the prune. Returns the number of identities removed
(or 0 when the lock was held by someone else).
Spec: docs/target/sidekiq-free.md §31.17.
# File 'lib/wurk/process_set.rb', line 52
def cleanup
  return 0 unless acquired_cleanup_lock?

  Wurk.redis do |conn|
    procs = conn.call('SMEMBERS', Keys::PROCESSES)
    next 0 if procs.empty?

    heartbeats = conn.pipelined do |pipe|
      procs.each { |key| pipe.call('HGET', key, 'info') }
    end
    to_prune = procs.zip(heartbeats).filter_map { |id, info| id if info.nil? }
    next 0 if to_prune.empty?

    conn.call('SREM', Keys::PROCESSES, *to_prune).to_i
  end
end
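The body of `acquired_cleanup_lock?` is not shown on this page. As a rough sketch of how a `SET NX EX` rate limit behaves, the following models the command in memory. `FakeStore`, `set_nx_ex`, and the `LOCK_KEY` name are hypothetical; real Redis replies `OK` or nil rather than true or false, and tracks time itself instead of taking a `now` argument.

```ruby
# Hypothetical in-memory model of `SET key value NX EX ttl`.
class FakeStore
  def initialize
    @expires_at = {}
  end

  # Returns true when the key was set (lock acquired) and false when a
  # non-expired key already exists. `now` is passed in so the sketch is
  # deterministic; it is a number of seconds on an arbitrary clock.
  def set_nx_ex(key, ttl, now)
    deadline = @expires_at[key]
    return false if deadline && deadline > now

    @expires_at[key] = now + ttl
    true
  end
end

LOCK_KEY = 'process_cleanup' # assumed key name, for illustration only

store = FakeStore.new
store.set_nx_ex(LOCK_KEY, 60, 0)  # first caller acquires the lock
store.set_nx_ex(LOCK_KEY, 60, 30) # second caller within the window is refused
store.set_nx_ex(LOCK_KEY, 60, 61) # after expiry the next caller acquires it
```

A caller that is refused would take the `return 0` branch of `cleanup`, which is why concurrent dashboards end up running at most one prune per window.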
#each ⇒ Object
Pipelined identity → HMGET. Skips identities whose info field is
gone (heartbeat expired between SMEMBERS and HMGET). Sorted by
identity so iteration order is stable for dashboards.
# File 'lib/wurk/process_set.rb', line 72
def each
  return enum_for(:each) unless block_given?

  rows = fetch_each_rows
  rows.each do |row|
    attrs = EACH_FIELDS.zip(row).to_h
    next if attrs['info'].nil?

    yield ProcessSet.send(:build_process, attrs)
  end
end
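`fetch_each_rows` is not shown here, so the following is only a sketch of the iteration contract described above: sort identities, read one row of fields per identity, and skip rows whose `info` is nil. The `each_live` helper, the plain-Hash `hashes` argument, and the contents of `EACH_FIELDS` are assumptions for illustration.

```ruby
EACH_FIELDS = %w[info busy concurrency].freeze # assumed field list

# identities: members of the processes SET (any order)
# hashes:     identity => field hash; a missing entry models an expired key
def each_live(identities, hashes)
  return enum_for(:each_live, identities, hashes) unless block_given?

  identities.sort.each do |id|
    source = hashes.fetch(id, {})
    row = EACH_FIELDS.map { |field| source[field] } # one HMGET reply
    attrs = EACH_FIELDS.zip(row).to_h
    next if attrs['info'].nil? # heartbeat expired, skip

    yield id, attrs
  end
end

identities = %w[host-b host-a host-c]
hashes = {
  'host-a' => { 'info' => '{}', 'concurrency' => '5' },
  'host-c' => { 'info' => '{}', 'concurrency' => '10' }
}
live_ids = each_live(identities, hashes).map { |id, _attrs| id }
```

Here `identities.size` is 3 while only two identities are yielded, which is the same gap that can appear between a raw SCARD and a count taken through `each`.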
#leader ⇒ Object
Cluster leader identity. Ent-only feature; OSS only reads the key.
Memoized per-instance: dashboards re-instantiate ProcessSet per render.
||= with empty-string fallback distinguishes "leader is unset" from
"memoization not yet computed".
# File 'lib/wurk/process_set.rb', line 106
def leader
  @leader ||= Wurk.redis { |c| c.call('GET', 'dear-leader') } || ''
end
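The memoization above relies on the empty string being truthy in Ruby, so `||=` does not re-run the lookup once an unset leader has been recorded as `''`. A small sketch of that pattern, with a hypothetical `LeaderLookup` class and a counted `fetch` standing in for the Redis GET:

```ruby
# Hypothetical stand-in: `fetch` plays the role of the Redis GET.
class LeaderLookup
  attr_reader :calls

  def initialize(value)
    @value = value # what the stand-in GET returns; nil models an unset key
    @calls = 0
  end

  def leader
    # Parsed as @leader ||= (fetch || ''), so nil is stored as ''.
    @leader ||= fetch || ''
  end

  private

  def fetch
    @calls += 1
    @value
  end
end

unset = LeaderLookup.new(nil)
unset.leader # performs the lookup and memoizes ''
unset.leader # served from @leader, no second lookup
```

With a plain nil fallback the second call would hit the store again, because `||=` treats nil as "not yet computed".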
#size ⇒ Object
SCARD over processes. Not pruned — may include identities whose
heartbeat has lapsed. Use each for the accurate count.
# File 'lib/wurk/process_set.rb', line 86
def size
  Wurk.redis { |conn| conn.call('SCARD', Keys::PROCESSES) }
end
#total_concurrency ⇒ Object
Sum of concurrency across live processes. Iterates each so dead
identities are skipped.
# File 'lib/wurk/process_set.rb', line 92
def total_concurrency
  sum { |p| p['concurrency'].to_i }
end
#total_rss_in_kb ⇒ Object Also known as: total_rss
Sum of rss (KB) across live processes.
# File 'lib/wurk/process_set.rb', line 97
def total_rss_in_kb
  sum { |p| p['rss'].to_i }
end
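Both roll-ups above get `sum` from Enumerable, which is built on `each`, so anything `each` skips is left out of the totals. A minimal sketch with a hypothetical `TinySet` class that yields plain hashes instead of Process objects:

```ruby
# Hypothetical stand-in for ProcessSet: rows are plain hashes of string fields.
class TinySet
  include Enumerable

  def initialize(rows)
    @rows = rows
  end

  def each(&block)
    return enum_for(:each) unless block

    @rows.each(&block)
  end

  def total_concurrency
    sum { |p| p['concurrency'].to_i }
  end

  def total_rss_in_kb
    sum { |p| p['rss'].to_i } # nil.to_i is 0, so a missing field adds nothing
  end
  alias total_rss total_rss_in_kb
end

set = TinySet.new([
  { 'concurrency' => '10', 'rss' => '2048' },
  { 'concurrency' => '5' } # no rss reported
])
set.total_concurrency
set.total_rss
```

The `to_i` calls matter because Redis hash values arrive as strings, and a field that is absent arrives as nil.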