Class: Wurk::Heartbeat

Inherits:
Object
  • Object
show all
Includes:
Component
Defined in:
lib/wurk/heartbeat.rb

Overview

Single-purpose owner of the Redis-side process heartbeat. Lifted out of Wurk::Launcher so the launcher can stay focused on lifecycle and so the heartbeat schema lives in one place readers can grep for.

Each beat is one pipelined round-trip:

SADD   processes <identity>
HSET   <identity>          info concurrency busy beat quiet rss rtt_us
EXPIRE <identity>          60
UNLINK <identity>:work
HSET   <identity>:work     <tid> <json> ...    (only if WORK_STATE non-empty)
EXPIRE <identity>:work     60                  (only if WORK_STATE non-empty)
LPOP   <identity>-signals  × BEAT_PAUSE

The work hash is UNLINK-then-rewritten on every beat — a dropped beat momentarily empties it, and ProcessSet#cleanup compensates by SREM-ing identities whose info field has expired.

Heartbeat owns only the wire. Signals queued by the dashboard are pulled out of the pipelined results and returned to the caller for dispatch — TSTP/TERM semantics live in Launcher.

:heartbeat fires once on the first successful beat and again after any network partition (rearmed when beat! rescues). :beat fires every tick.

Spec: docs/target/sidekiq-free.md §12 (Launcher#❤️).

Constant Summary collapse

BEAT_PAUSE =

Cadence in seconds. Key TTL is 60s — a process is dead after ~6 misses.

10
TTL_SECONDS =
60

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(identity:, config:, started_at: nil, embedded: false, quiet: nil) ⇒ Heartbeat

quiet: is a callable so Launcher can keep ownership of its @done flag without an awkward setter contract. info_overrides: lets the caller (Launcher#embedded, tests) inject fields without forcing Heartbeat to know about every flag the host process tracks.



48
49
50
51
52
53
54
55
56
# File 'lib/wurk/heartbeat.rb', line 48

def initialize(identity:, config:, started_at: nil, embedded: false, quiet: nil)
  @identity = identity
  @config = config
  @started_at = started_at || Time.now.to_f
  @embedded = embedded
  @quiet_proc = quiet || -> { false }
  @first_heartbeat = true
  @rtt_us = 0
end

Instance Attribute Details

#configObject (readonly) Originally defined in module Component

Returns the value of attribute config.

#identityObject (readonly)

Returns the value of attribute identity.



42
43
44
# File 'lib/wurk/heartbeat.rb', line 42

def identity
  @identity
end

#last_beat_atObject (readonly)

Returns the value of attribute last_beat_at.



42
43
44
# File 'lib/wurk/heartbeat.rb', line 42

def last_beat_at
  @last_beat_at
end

#rtt_usObject (readonly)

Returns the value of attribute rtt_us.



42
43
44
# File 'lib/wurk/heartbeat.rb', line 42

def rtt_us
  @rtt_us
end

Instance Method Details

#beat!Object

Pipelined beat. Returns the drained signals as Array on success, or nil if Redis raised — the next successful beat refires :heartbeat so partition recovery is observable.



61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
# File 'lib/wurk/heartbeat.rb', line 61

def beat!
  sigs, @rtt_us = pipelined_beat
  @last_beat_at = Time.now.to_f
  if @first_heartbeat
    @first_heartbeat = false
    fire_event(:heartbeat)
  end
  fire_event(:beat, oneshot: false)
  emit_statsd_gauges
  sigs.compact
rescue StandardError => e
  @first_heartbeat = true
  handle_exception(e, { context: 'heartbeat' })
  nil
end

#default_tag(dir = Dir.pwd) ⇒ Object Originally defined in module Component

#fire_event(event, oneshot: true, reverse: false, reraise: false) ⇒ Object Originally defined in module Component

Invokes lifecycle hooks for event. Hooks run in registration order (or LIFO when reverse: true, used for teardown). A raise in one hook is reported via handle_exception and does NOT stop the next hook unless reraise: true (used in tests / fail-fast boot). oneshot: true clears the bucket after dispatch so the event can't fire twice.

#handle_exception(ex, ctx = {}) ⇒ Object Originally defined in module Component

#hostnameObject Originally defined in module Component

#leader?Boolean Originally defined in module Component

True iff this process currently holds the cluster dear-leader lock. Per spec, the check is performed at call time (Wurk does not cache); callers must not poll faster than the 60s follower cadence. Returns false unconditionally when WURK_LEADER=false (or SIDEKIQ_LEADER=false) is set on the process (opt-out hot-standby). Any Redis error is swallowed → false, so a transient partition can't propagate as an exception into user code.

Spec: docs/target/sidekiq-ent.md §6.1.

Returns:

  • (Boolean)

#loggerObject Originally defined in module Component

--- delegated to config -------------------------------------------

#mono_msObject Originally defined in module Component

#process_nonceObject Originally defined in module Component

#real_msObject Originally defined in module Component

--- clocks ---------------------------------------------------------

#redisObject Originally defined in module Component

#safe_thread(name, priority: nil, &block) ⇒ Object Originally defined in module Component

Spawns a named thread that runs block under watchdog(name). The parent must retain the returned Thread; otherwise GC may not, but report_on_exception is disabled so we don't double-log on death.

#stop!Object

Best-effort teardown. SREM removes us from the live list; UNLINK drops the identity hash and its work mirror. Idempotent: if Redis is down the next process boot's ProcessSet#cleanup prunes us.



80
81
82
83
84
85
86
87
88
89
# File 'lib/wurk/heartbeat.rb', line 80

def stop!
  redis do |conn|
    conn.pipelined do |pipe|
      pipe.call('SREM', Keys::PROCESSES, @identity)
      pipe.call('UNLINK', @identity, "#{@identity}:work")
    end
  end
rescue StandardError
  nil
end

#tidObject Originally defined in module Component

--- identity -------------------------------------------------------

#watchdog(last_words) ⇒ Object Originally defined in module Component

Wraps a block at a thread boundary: any unhandled exception is reported via handle_exception (so it lands in error_handlers / the log) and then re-raised. last_words is the component label included in the context.