Class: Wurk::ProcessSet

Inherits:
Object
Includes:
Enumerable
Defined in:
lib/wurk/process_set.rb

Overview

Live process list view backed by the processes SET + per-identity hashes. Cluster topology lives here: dashboard process list, total concurrency gauge, RSS roll-up, leader lookup.

Wire-compat is sacred — every Redis call matches Sidekiq OSS exactly. Spec: docs/target/sidekiq-free.md §1 (Redis schema), §19.6.

Constructor Details

#initialize(clean_plz = true) ⇒ ProcessSet

processes SET only stores identity strings; the live hash at each identity expires every 60s, so SCARD can lag reality. Constructor opts into a cleanup (rate-limited globally to 1/min) so the size reported below reflects the post-prune state — except when caller explicitly skips it (e.g. inside cleanup itself, or for snapshot reads on hot paths).

Positional Boolean matches Sidekiq's public API — wire-compat sacred.



# File 'lib/wurk/process_set.rb', line 27

def initialize(clean_plz = true) # rubocop:disable Style/OptionalBooleanParameter
  cleanup if clean_plz
end

Class Method Details

.[](identity) ⇒ Object

Fetch a single Process by identity. Pipelined SISMEMBER + HMGET so absence (process never registered) and expiry (heartbeat lapsed, info field gone) both return nil.



# File 'lib/wurk/process_set.rb', line 34

def self.[](identity)
  exists, fields = Wurk.redis do |conn|
    conn.pipelined do |pipe|
      pipe.call('SISMEMBER', Keys::PROCESSES, identity)
      pipe.call('HMGET', identity, *LOOKUP_FIELDS)
    end
  end
  return nil if exists.to_i.zero? || fields.first.nil?

  build_process(LOOKUP_FIELDS.zip(fields).to_h)
end
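
The nil guard in .[] can be tried in isolation. The sketch below is not the library's code: lookup_attrs is a hypothetical helper, the LOOKUP_FIELDS contents are assumed (the real list is not shown on this page), and the replies are hard-coded instead of coming from a Redis pipeline. It assumes info is the first requested field, which is what fields.first.nil? appears to rely on.

```ruby
# Assumed field list; the real LOOKUP_FIELDS is not shown in this page.
LOOKUP_FIELDS = %w[info concurrency rss].freeze

# Mirrors the guard in .[]: `exists` stands for the SISMEMBER reply (0 or 1),
# `fields` for the HMGET reply (one entry per field, nil when missing).
def lookup_attrs(exists, fields)
  return nil if exists.to_i.zero? || fields.first.nil?

  LOOKUP_FIELDS.zip(fields).to_h
end

never_registered = lookup_attrs(0, [nil, nil, nil])          # not in the SET
expired          = lookup_attrs(1, [nil, nil, nil])          # in the SET, hash gone
live             = lookup_attrs(1, ['{"pid":1}', '10', '2048'])
```

Both the absent and the expired identity come back as nil, so callers only need a single nil check.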

Instance Method Details

#cleanup ⇒ Object

SREM identities whose info HASH field has expired. Rate-limited globally to 1/min via SET NX EX so concurrent dashboards / API calls don't dogpile the prune. Returns the number of identities removed (or 0 when the lock was held by someone else).

Spec: docs/target/sidekiq-free.md §31.17.



# File 'lib/wurk/process_set.rb', line 52

def cleanup
  return 0 unless acquired_cleanup_lock?

  Wurk.redis do |conn|
    procs = conn.call('SMEMBERS', Keys::PROCESSES)
    next 0 if procs.empty?

    heartbeats = conn.pipelined do |pipe|
      procs.each { |key| pipe.call('HGET', key, 'info') }
    end
    to_prune = procs.zip(heartbeats).filter_map { |id, info| id if info.nil? }
    next 0 if to_prune.empty?

    conn.call('SREM', Keys::PROCESSES, *to_prune).to_i
  end
end
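
A minimal in-memory sketch of the two ideas in cleanup: a lock that behaves like SET NX EX, and the zip/filter_map prune. FakeCleanup and everything in it are hypothetical stand-ins; the real acquired_cleanup_lock? is not shown on this page, and the 60-second TTL is assumed from the 1/min rate limit. Time is passed in explicitly so the example is deterministic.

```ruby
# In-memory stand-in for the Redis pieces cleanup touches (hypothetical).
class FakeCleanup
  LOCK_TTL = 60 # seconds, assumed from the 1/min rate limit

  attr_reader :processes

  def initialize(processes, hashes)
    @processes = processes   # stands in for the processes SET
    @hashes = hashes         # identity => { 'info' => ... }
    @lock_expires_at = nil   # stands in for the SET NX EX lock key
  end

  # Emulates SET key value NX EX 60: succeeds only if no unexpired lock exists.
  def acquired_cleanup_lock?(now)
    return false if @lock_expires_at && now < @lock_expires_at

    @lock_expires_at = now + LOCK_TTL
    true
  end

  def cleanup(now)
    return 0 unless acquired_cleanup_lock?(now)

    infos = @processes.map { |id| @hashes.dig(id, 'info') }
    to_prune = @processes.zip(infos).filter_map { |id, info| id if info.nil? }
    @processes -= to_prune
    to_prune.size
  end
end

hashes = { 'a' => { 'info' => '{}' } }
set = FakeCleanup.new(%w[a b c], hashes)

first = set.cleanup(0)        # prunes b and c, which have no info field
hashes.delete('a')            # a's heartbeat lapses
second = set.cleanup(30)      # lock still held, so nothing is pruned
stale = set.processes.dup     # a is still listed
third = set.cleanup(61)       # lock expired, a is pruned now
```

The second call shows why the SET can briefly list a dead identity: a caller that loses the lock returns 0 without looking at the hashes.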

#each ⇒ Object

Pipelined identity → HMGET. Skips identities whose info field is gone (heartbeat expired between SMEMBERS and HMGET). Sorted by identity so iteration order is stable for dashboards.



# File 'lib/wurk/process_set.rb', line 72

def each
  return enum_for(:each) unless block_given?

  rows = fetch_each_rows
  rows.each do |row|
    attrs = EACH_FIELDS.zip(row).to_h
    next if attrs['info'].nil?

    yield ProcessSet.send(:build_process, attrs)
  end
end
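
The iteration contract described above (enumerator without a block, expired rows skipped, stable order) can be sketched without Redis. FakeProcessSet is a hypothetical stand-in that takes pre-fetched rows and yields plain hashes; where the real class performs the sort (presumably inside fetch_each_rows) is an assumption.

```ruby
# Hypothetical stand-in: rows are identity => attrs pairs held in memory
# instead of pipelined HMGET replies.
class FakeProcessSet
  include Enumerable

  def initialize(rows)
    @rows = rows
  end

  def each
    return enum_for(:each) unless block_given?

    @rows.sort_by { |identity, _attrs| identity }.each do |identity, attrs|
      next if attrs['info'].nil? # heartbeat expired between the two reads

      yield attrs.merge('identity' => identity)
    end
  end
end

set = FakeProcessSet.new({
  'host-b:2' => { 'info' => '{}', 'concurrency' => '5' },
  'host-a:1' => { 'info' => '{}', 'concurrency' => '10' },
  'host-c:3' => { 'info' => nil,  'concurrency' => '25' }
})

identities = set.map { |p| p['identity'] } # => ["host-a:1", "host-b:2"]
lazy = set.each                            # no block given: an Enumerator
live_count = set.count                     # => 2, host-c:3 is skipped
```

Because the class includes Enumerable, map, count, sum and the rest all inherit the skip-if-expired behaviour from each.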

#leader ⇒ Object

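Cluster leader identity. Ent-only feature; OSS only reads the key. Memoized per instance; dashboards re-instantiate ProcessSet per render, so the cached value lives for a single render. The empty-string fallback is what makes the memoization work when no leader is set: '' is truthy in Ruby, so ||= caches it, whereas a bare nil would trigger a fresh GET on every call. An empty string therefore means "leader is unset", and nil means "not yet computed".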



# File 'lib/wurk/process_set.rb', line 106

def leader
  @leader ||= Wurk.redis { |c| c.call('GET', 'dear-leader') } || ''
end
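
A small sketch of the memoization pattern used by leader, with the Redis GET replaced by a counted method. FakeLeader, fetch and calls are hypothetical names used only for illustration.

```ruby
# Hypothetical stand-in: `fetch` plays the role of the Redis GET and
# `calls` counts how many times it runs.
class FakeLeader
  attr_reader :calls

  def initialize(value)
    @value = value
    @calls = 0
  end

  # Same shape as ProcessSet#leader; parses as @leader ||= (fetch || '').
  def leader
    @leader ||= fetch || ''
  end

  private

  def fetch
    @calls += 1
    @value
  end
end

unset = FakeLeader.new(nil)
unset_results = [unset.leader, unset.leader] # => ["", ""]
unset_calls = unset.calls                    # => 1, the '' was cached

elected = FakeLeader.new('host-a:1')
elected_result = elected.leader              # => "host-a:1"
```

Without the || '' fallback, an unset leader would leave @leader as nil and every call would hit Redis again.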

#size ⇒ Object

SCARD over processes. Does not prune on its own: if the constructor's cleanup was skipped or rate-limited, the count may include identities whose heartbeat has lapsed. Use each for the accurate count.



# File 'lib/wurk/process_set.rb', line 86

def size
  Wurk.redis { |conn| conn.call('SCARD', Keys::PROCESSES) }
end

#total_concurrency ⇒ Object

Sum of concurrency across live processes. Iterates each so dead identities are skipped.



# File 'lib/wurk/process_set.rb', line 92

def total_concurrency
  sum { |p| p['concurrency'].to_i }
end

#total_rss_in_kb ⇒ Object Also known as: total_rss

Sum of rss (KB) across live processes.



# File 'lib/wurk/process_set.rb', line 97

def total_rss_in_kb
  sum { |p| p['rss'].to_i }
end
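
The two roll-ups (total_concurrency and total_rss_in_kb) are plain Enumerable#sum calls over each, with to_i absorbing string or missing values. The sketch below uses a hypothetical FakeTotals class that yields plain hashes; the string-typed values and the use of alias for total_rss are assumptions about how the real class is wired.

```ruby
# Hypothetical stand-in that yields plain hashes; the real class yields
# Process objects built from Redis replies.
class FakeTotals
  include Enumerable

  def initialize(procs)
    @procs = procs
  end

  def each(&block)
    return enum_for(:each) unless block

    @procs.each(&block)
  end

  def total_concurrency
    sum { |p| p['concurrency'].to_i }
  end

  def total_rss_in_kb
    sum { |p| p['rss'].to_i }
  end
  alias total_rss total_rss_in_kb
end

totals = FakeTotals.new([
  { 'concurrency' => '10', 'rss' => '20480' },
  { 'concurrency' => '5',  'rss' => nil } # missing rss: nil.to_i is 0
])

concurrency = totals.total_concurrency # => 15
rss_kb = totals.total_rss              # => 20480, via the alias
```

A process that has not reported rss yet contributes 0 rather than raising, which keeps the gauges usable during startup.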