Class: Wurk::Heartbeat
- Inherits:
-
Object
- Object
- Wurk::Heartbeat
- Includes:
- Component
- Defined in:
- lib/wurk/heartbeat.rb
Overview
Single-purpose owner of the Redis-side process heartbeat. Lifted out of Wurk::Launcher so the launcher can stay focused on lifecycle and so the heartbeat schema lives in one place readers can grep for.
Each beat is one pipelined round-trip:
SADD processes <identity>
HSET <identity> info concurrency busy beat quiet rss rtt_us
EXPIRE <identity> 60
UNLINK <identity>:work
HSET <identity>:work <tid> <json> ... (only if WORK_STATE non-empty)
EXPIRE <identity>:work 60 (only if WORK_STATE non-empty)
LPOP <identity>-signals × BEAT_PAUSE
The work hash is UNLINK-then-rewritten on every beat — a dropped beat
momentarily empties it, and ProcessSet#cleanup compensates by SREM-ing
identities whose info field has expired.
Heartbeat owns only the wire. Signals queued by the dashboard are pulled out of the pipelined results and returned to the caller for dispatch — TSTP/TERM semantics live in Launcher.
:heartbeat fires once on the first successful beat and again after any
network partition (rearmed when beat! rescues). :beat fires every tick.
Spec: docs/target/sidekiq-free.md §12 (Launcher#❤️).
Constant Summary collapse
- BEAT_PAUSE =
Cadence in seconds. Key TTL is 60s — a process is dead after ~6 misses.
10- TTL_SECONDS =
60
Instance Attribute Summary collapse
-
#config ⇒ Object
included
from Component
readonly
Returns the value of attribute config.
-
#identity ⇒ Object
readonly
Returns the value of attribute identity.
-
#last_beat_at ⇒ Object
readonly
Returns the value of attribute last_beat_at.
-
#rtt_us ⇒ Object
readonly
Returns the value of attribute rtt_us.
Instance Method Summary collapse
-
#beat! ⇒ Object
Pipelined beat.
- #default_tag(dir = Dir.pwd) ⇒ Object included from Component
-
#fire_event(event, oneshot: true, reverse: false, reraise: false) ⇒ Object
included
from Component
Invokes lifecycle hooks for
event. - #handle_exception(ex, ctx = {}) ⇒ Object included from Component
- #hostname ⇒ Object included from Component
-
#initialize(identity:, config:, started_at: nil, embedded: false, quiet: nil) ⇒ Heartbeat
constructor
quiet:is a callable so Launcher can keep ownership of its@doneflag without an awkward setter contract. -
#leader? ⇒ Boolean
included
from Component
True iff this process currently holds the cluster
dear-leaderlock. -
#logger ⇒ Object
included
from Component
--- delegated to config -------------------------------------------.
- #mono_ms ⇒ Object included from Component
- #process_nonce ⇒ Object included from Component
-
#real_ms ⇒ Object
included
from Component
--- clocks ---------------------------------------------------------.
- #redis ⇒ Object included from Component
-
#safe_thread(name, priority: nil, &block) ⇒ Object
included
from Component
Spawns a named thread that runs
blockunderwatchdog(name). -
#stop! ⇒ Object
Best-effort teardown.
-
#tid ⇒ Object
included
from Component
--- identity -------------------------------------------------------.
-
#watchdog(last_words) ⇒ Object
included
from Component
Wraps a block at a thread boundary: any unhandled exception is reported via handle_exception (so it lands in error_handlers / the log) and then re-raised.
Constructor Details
#initialize(identity:, config:, started_at: nil, embedded: false, quiet: nil) ⇒ Heartbeat
quiet: is a callable so Launcher can keep ownership of its @done
flag without an awkward setter contract. info_overrides: lets the
caller (Launcher#embedded, tests) inject fields without forcing
Heartbeat to know about every flag the host process tracks.
48 49 50 51 52 53 54 55 56 |
# File 'lib/wurk/heartbeat.rb', line 48 def initialize(identity:, config:, started_at: nil, embedded: false, quiet: nil) @identity = identity @config = config @started_at = started_at || Time.now.to_f @embedded = @quiet_proc = quiet || -> { false } @first_heartbeat = true @rtt_us = 0 end |
Instance Attribute Details
#config ⇒ Object (readonly) Originally defined in module Component
Returns the value of attribute config.
#identity ⇒ Object (readonly)
Returns the value of attribute identity.
42 43 44 |
# File 'lib/wurk/heartbeat.rb', line 42 def identity @identity end |
#last_beat_at ⇒ Object (readonly)
Returns the value of attribute last_beat_at.
42 43 44 |
# File 'lib/wurk/heartbeat.rb', line 42 def last_beat_at @last_beat_at end |
#rtt_us ⇒ Object (readonly)
Returns the value of attribute rtt_us.
42 43 44 |
# File 'lib/wurk/heartbeat.rb', line 42 def rtt_us @rtt_us end |
Instance Method Details
#beat! ⇒ Object
Pipelined beat. Returns the drained signals as Array:heartbeat so partition recovery is observable.
61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 |
# File 'lib/wurk/heartbeat.rb', line 61 def beat! sigs, @rtt_us = pipelined_beat @last_beat_at = Time.now.to_f if @first_heartbeat @first_heartbeat = false fire_event(:heartbeat) end fire_event(:beat, oneshot: false) emit_statsd_gauges sigs.compact rescue StandardError => e @first_heartbeat = true handle_exception(e, { context: 'heartbeat' }) nil end |
#default_tag(dir = Dir.pwd) ⇒ Object Originally defined in module Component
#fire_event(event, oneshot: true, reverse: false, reraise: false) ⇒ Object Originally defined in module Component
Invokes lifecycle hooks for event. Hooks run in registration order
(or LIFO when reverse: true, used for teardown). A raise in one hook
is reported via handle_exception and does NOT stop the next hook unless
reraise: true (used in tests / fail-fast boot). oneshot: true
clears the bucket after dispatch so the event can't fire twice.
#handle_exception(ex, ctx = {}) ⇒ Object Originally defined in module Component
#hostname ⇒ Object Originally defined in module Component
#leader? ⇒ Boolean Originally defined in module Component
True iff this process currently holds the cluster dear-leader lock.
Per spec, the check is performed at call time (Wurk does not cache);
callers must not poll faster than the 60s follower cadence. Returns
false unconditionally when WURK_LEADER=false (or SIDEKIQ_LEADER=false)
is set on the process (opt-out hot-standby). Any Redis error is swallowed →
false, so a transient partition can't propagate as an exception into user
code.
Spec: docs/target/sidekiq-ent.md §6.1.
#logger ⇒ Object Originally defined in module Component
--- delegated to config -------------------------------------------
#mono_ms ⇒ Object Originally defined in module Component
#process_nonce ⇒ Object Originally defined in module Component
#real_ms ⇒ Object Originally defined in module Component
--- clocks ---------------------------------------------------------
#redis ⇒ Object Originally defined in module Component
#safe_thread(name, priority: nil, &block) ⇒ Object Originally defined in module Component
Spawns a named thread that runs block under watchdog(name). The
parent must retain the returned Thread; otherwise GC may not, but
report_on_exception is disabled so we don't double-log on death.
#stop! ⇒ Object
Best-effort teardown. SREM removes us from the live list; UNLINK drops the identity hash and its work mirror. Idempotent: if Redis is down the next process boot's ProcessSet#cleanup prunes us.
80 81 82 83 84 85 86 87 88 89 |
# File 'lib/wurk/heartbeat.rb', line 80 def stop! redis do |conn| conn.pipelined do |pipe| pipe.call('SREM', Keys::PROCESSES, @identity) pipe.call('UNLINK', @identity, "#{@identity}:work") end end rescue StandardError nil end |
#tid ⇒ Object Originally defined in module Component
--- identity -------------------------------------------------------
#watchdog(last_words) ⇒ Object Originally defined in module Component
Wraps a block at a thread boundary: any unhandled exception is reported
via handle_exception (so it lands in error_handlers / the log) and then
re-raised. last_words is the component label included in the context.