Class: Wurk::Configuration

Inherits:
Object
  • Object
Defined in:
lib/wurk/configuration.rb

Overview

Owns runtime knobs (concurrency, queues, timeouts, lifecycle events, error/death handlers) and the registry of Capsules. Single source of truth for everything the swarm / managers / processors need to boot.

Spec: docs/target/sidekiq-free.md §4 (Sidekiq::Config).

Constant Summary

DEFAULTS =

Mirrors Sidekiq::Config::DEFAULTS. Order and keys are part of the drop-in contract — third-party gems read @options via [] / fetch / dig.

{
  labels: Set.new,
  require: '.',
  environment: nil,
  concurrency: 5,
  timeout: 25,
  poll_interval_average: nil,
  average_scheduled_poll_interval: 5,
  on_complex_arguments: :raise,
  max_iteration_runtime: nil,
  error_handlers: [],
  death_handlers: [],
  lifecycle_events: {
    startup: [],
    fork: [],
    quiet: [],
    shutdown: [],
    exit: [],
    heartbeat: [],
    beat: [],
    leader: []
  },
  dead_max_jobs: 10_000,
  dead_timeout_in_seconds: 180 * 24 * 60 * 60,
  reloader: proc { |&b| b.call },
  backtrace_cleaner: ->(bt) { bt },
  logged_job_attributes: %w[bid tags],
  redis_idle_timeout: nil
}.freeze
LIFECYCLE_EVENTS =

:fork fires only inside swarm children, after fork + internal AR/Redis reconnect — apps reopen sockets / non-fork-safe libs there (Ent §7.4).

%i[startup fork quiet shutdown exit heartbeat beat leader].freeze
DEFAULT_THREAD_PRIORITY =
-1
ERROR_HANDLER =

Default error handler. Wraps the report in the thread-local Wurk::Context so logger formatters/JSON layouts can pick up jid/bid/tags. full_message (with backtrace) in dev/debug, detailed_message in prod — mirrors the Sidekiq behavior so log scrapers built for one work for both.

Spec: docs/target/sidekiq-free.md §4.3.

lambda do |ex, ctx, cfg = Wurk.configuration|
  safe_ctx = ctx || {}
  Wurk::Context.with(safe_ctx) do
    dev = $DEBUG || ENV['WURK_DEBUG'] || cfg.logger.debug?
    msg = dev ? ex.full_message : ex.detailed_message
    cfg.logger.info { msg }
  end
end
HISTORY_DEFAULT_INTERVAL =

Default interval, in seconds, for the historical metrics snapshotter (Ent §5); see #retain_history.

30

Constructor Details

#initialize(options = {}) ⇒ Configuration

Returns a new instance of Configuration.



# File 'lib/wurk/configuration.rb', line 81

def initialize(options = {})
  @options = deep_dup_defaults.merge(options)
  @options[:error_handlers] << ERROR_HANDLER if @options[:error_handlers].empty?
  @capsules = {}
  @directory = {}
  @client_chain = Middleware::Chain.new
  @server_chain = Middleware::Chain.new
  @redis_config = { url: ENV.fetch('REDIS_URL', 'redis://localhost:6379/0') }
  @logger = nil
  @thread_priority = DEFAULT_THREAD_PRIORITY
  @frozen = false
end
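
The constructor merges caller options over a deep copy of DEFAULTS rather than over DEFAULTS itself. The sketch below illustrates why that matters. It uses a hypothetical deep_dup helper (the real deep_dup_defaults is private and not shown on this page) and a trimmed stand-in for DEFAULTS, so treat it as an illustration under those assumptions rather than the actual implementation.

```ruby
# Trimmed stand-in for DEFAULTS: frozen at the top level, but the nested
# arrays and hashes are still mutable objects.
SKETCH_DEFAULTS = {
  concurrency: 5,
  error_handlers: [],
  lifecycle_events: { startup: [], shutdown: [] }
}.freeze

# Hypothetical helper: recursively copies hashes and arrays so that each
# configuration owns its own containers.
def deep_dup(value)
  case value
  when Hash  then value.to_h { |k, v| [k, deep_dup(v)] }
  when Array then value.map { |v| deep_dup(v) }
  else value
  end
end

a = deep_dup(SKETCH_DEFAULTS).merge(concurrency: 10)
b = deep_dup(SKETCH_DEFAULTS)

a[:error_handlers] << ->(ex, _ctx, _cfg) { warn ex.message }
a[:lifecycle_events][:startup] << -> { puts 'booted' }

# Mutating one configuration leaves the other, and the shared defaults, untouched.
puts b[:error_handlers].size                # prints 0
puts SKETCH_DEFAULTS[:error_handlers].size  # prints 0
puts a[:concurrency]                        # prints 10
```

A shallow dup would copy only the outer hash, so every configuration would append handlers to the same arrays.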

Instance Attribute Details

#capsules ⇒ Object (readonly)

Returns the value of attribute capsules.

# File 'lib/wurk/configuration.rb', line 69

def capsules
  @capsules
end

#directory ⇒ Object (readonly)

Returns the value of attribute directory.

# File 'lib/wurk/configuration.rb', line 69

def directory
  @directory
end

#dogstatsd ⇒ Object

Pro parity: callable that builds the statsd / dogstatsd client. Invoked once per process AFTER fork; see Wurk::Metrics::Statsd.client. Assignable as a Proc, lambda, or any object responding to #call:

config.dogstatsd = -> { Datadog::Statsd.new('host', 8125) }

Spec: docs/target/sidekiq-pro.md §9.1.



# File 'lib/wurk/configuration.rb', line 79

def dogstatsd
  @dogstatsd
end

#logger ⇒ Object

Returns the logger, lazily falling back to the default logger when none has been assigned.

# File 'lib/wurk/configuration.rb', line 356

def logger
  @logger ||= default_logger
end

#redis_config ⇒ Object (readonly)

Returns the value of attribute redis_config.

# File 'lib/wurk/configuration.rb', line 69

def redis_config
  @redis_config
end

#super_fetch_callback ⇒ Object (readonly)

Returns the value of attribute super_fetch_callback.

# File 'lib/wurk/configuration.rb', line 69

def super_fetch_callback
  @super_fetch_callback
end

#thread_priority ⇒ Object

Returns the value of attribute thread_priority.

# File 'lib/wurk/configuration.rb', line 70

def thread_priority
  @thread_priority
end

Instance Method Details

#[](key) ⇒ Object

Hash-like read access: returns the value stored under key in the underlying options hash.

# File 'lib/wurk/configuration.rb', line 96

def [](key) = @options.[](key)

#[]=(key, val) ⇒ Object



# File 'lib/wurk/configuration.rb', line 98

def []=(key, val)
  guard_frozen!
  @options[key] = val
end

#average_scheduled_poll_interval=(interval) ⇒ Object



# File 'lib/wurk/configuration.rb', line 213

def average_scheduled_poll_interval=(interval)
  @options[:average_scheduled_poll_interval] = interval
end

#capsule(name) {|cap| ... } ⇒ Object

Yields:

  • (cap)


# File 'lib/wurk/configuration.rb', line 141

def capsule(name)
  name = name.to_s
  cap = @capsules[name] ||= Capsule.new(name, self)
  yield cap if block_given?
  cap
end
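
The find-or-create behavior of #capsule, and the way #total_concurrency sums over the registry, can be sketched with stand-ins. SketchCapsule and SketchRegistry are hypothetical names, and the default concurrency of 5 and the queue list are assumed for illustration only.

```ruby
# Hypothetical stand-ins; the real Wurk::Capsule is constructed with
# (name, config) as in the listing above.
SketchCapsule = Struct.new(:name, :concurrency, :queues)

class SketchRegistry
  def initialize
    @capsules = {}
  end

  # Find-or-create: the name is normalized to a String, so :mailers and
  # "mailers" resolve to the same capsule.
  def capsule(name)
    name = name.to_s
    cap = @capsules[name] ||= SketchCapsule.new(name, 5, ['default'])
    yield cap if block_given?
    cap
  end

  def total_concurrency
    @capsules.each_value.sum(&:concurrency)
  end
end

def demo_capsules
  reg = SketchRegistry.new
  reg.capsule(:default)
  reg.capsule(:mailers) { |cap| cap.concurrency = 2 }
  same = reg.capsule('mailers').equal?(reg.capsule(:mailers))
  [same, reg.total_concurrency]
end

p demo_capsules # prints [true, 7]
```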

#client_middleware {|@client_chain| ... } ⇒ Object

Yields the client middleware chain when a block is given and returns the chain.

Yields:

  • (@client_chain)


# File 'lib/wurk/configuration.rb', line 150

def client_middleware
  yield @client_chain if block_given?
  @client_chain
end

#concurrency ⇒ Integer

Returns threads per worker process for the default capsule (default 5). The process count is separate — set via WURK_COUNT (defaults to the CPU count). With a single capsule, total in-flight jobs = WURK_COUNT × concurrency; with multiple capsules see #total_concurrency for the cluster aggregate.

Returns:

  • (Integer)

    threads per worker process for the default capsule (default 5)

# File 'lib/wurk/configuration.rb', line 120

def concurrency = default_capsule.concurrency
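
As a worked example of the single-capsule arithmetic above, the sketch below multiplies the process count by the per-process thread count. The helper name in_flight_capacity is hypothetical, and falling back to Etc.nprocessors is an assumption about how "the CPU count" is determined.

```ruby
require 'etc'

# Total in-flight jobs for a single-capsule deployment:
# worker processes (WURK_COUNT, assumed here to fall back to the CPU count)
# multiplied by threads per process (concurrency).
def in_flight_capacity(concurrency:, env: ENV)
  processes = Integer(env.fetch('WURK_COUNT') { Etc.nprocessors })
  processes * concurrency
end

puts in_flight_capacity(concurrency: 5, env: { 'WURK_COUNT' => '4' }) # prints 20
```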

#concurrency=(val) ⇒ Object

Parameters:

  • val (Integer)

    threads per worker process



# File 'lib/wurk/configuration.rb', line 123

def concurrency=(val)
  default_capsule.concurrency = val
end

#configure_client {|_self| ... } ⇒ Object

Yields:

  • (_self)

Yield Parameters:

  • _self (Wurk::Configuration)

# File 'lib/wurk/configuration.rb', line 380

def configure_client(&block)
  yield self if block && !server?
end

#configure_server {|_self| ... } ⇒ Object

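Runs the block only in a server process (when #server? is true); the counterpart of Sidekiq.configure_server. #configure_client runs its block only outside a server process.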

Yields:

  • (_self)

Yield Parameters:

  • _self (Wurk::Configuration)

# File 'lib/wurk/configuration.rb', line 376

def configure_server(&block)
  yield self if block && server?
end
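
The gating performed by #configure_server and #configure_client can be illustrated with a small stand-in. SketchGate is a hypothetical class that copies only the three methods involved; note that #server? compares against true, so any other value counts as a client.

```ruby
class SketchGate
  def initialize(server:)
    @options = { server: server }
  end

  def server? = @options[:server] == true

  def configure_server
    yield self if block_given? && server?
  end

  def configure_client
    yield self if block_given? && !server?
  end
end

# Runs both configure blocks and reports which of them actually executed.
def ran_blocks(server:)
  ran = []
  cfg = SketchGate.new(server: server)
  cfg.configure_server { ran << :server }
  cfg.configure_client { ran << :client }
  ran
end

p ran_blocks(server: true)   # prints [:server]
p ran_blocks(server: false)  # prints [:client]
p ran_blocks(server: nil)    # prints [:client]
```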

#death_handlers ⇒ Object

# File 'lib/wurk/configuration.rb', line 209

def death_handlers
  @options[:death_handlers]
end

#default_capsule ⇒ Object

# File 'lib/wurk/configuration.rb', line 137

def default_capsule(&)
  capsule('default', &)
end

#dig(*keys) ⇒ Object



# File 'lib/wurk/configuration.rb', line 111

def dig(*keys) = @options.dig(*keys)

#error_handlers ⇒ Object

Returns the registered error handlers (the :error_handlers option).

# File 'lib/wurk/configuration.rb', line 205

def error_handlers
  @options[:error_handlers]
end

#fetch ⇒ Object

# File 'lib/wurk/configuration.rb', line 103

def fetch(*, &) = @options.fetch(*, &)

#fetch_poll_interval ⇒ Object

# File 'lib/wurk/configuration.rb', line 226

def fetch_poll_interval
  @options[:fetch_poll_interval]
end

#fetch_poll_interval=(seconds) ⇒ Object

Reliable-fetch empty-poll backoff: the BLMOVE block timeout (seconds) used when every served queue is empty. Pro super_fetch §3.3's fetch_poll_interval knob. Unset (nil) → the fetcher's default (Wurk::Fetcher::Reliable::TIMEOUT, 2s). Also readable as config[:fetch_poll_interval].



# File 'lib/wurk/configuration.rb', line 222

def fetch_poll_interval=(seconds)
  @options[:fetch_poll_interval] = seconds
end

#freeze! ⇒ Object

# File 'lib/wurk/configuration.rb', line 421

def freeze!
  return self if @frozen

  @capsules.each_value(&:freeze)
  @capsules.freeze
  @options.freeze
  @directory.freeze
  @frozen = true
  self
end
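
A minimal sketch of the freeze contract, using a hypothetical SketchConfig class. The real guard_frozen! is private and not shown on this page, so the FrozenError it raises here is an assumption.

```ruby
class SketchConfig
  def initialize
    @options = { timeout: 25 }
    @frozen = false
  end

  def [](key) = @options[key]

  def []=(key, val)
    guard_frozen!
    @options[key] = val
  end

  def freeze!
    return self if @frozen

    @options.freeze
    @frozen = true
    self
  end

  def frozen? = @frozen

  private

  # Assumed behavior: reject writes once the configuration is frozen.
  def guard_frozen!
    raise FrozenError, 'configuration is frozen' if @frozen
  end
end

def demo_freeze
  cfg = SketchConfig.new
  cfg[:timeout] = 30
  cfg.freeze!.freeze! # idempotent: the second call returns early
  begin
    cfg[:timeout] = 60
  rescue FrozenError => e
    return [cfg[:timeout], cfg.frozen?, e.class]
  end
end

p demo_freeze # prints [30, true, FrozenError]
```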

#frozen? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/wurk/configuration.rb', line 432

def frozen?
  @frozen
end

#handle_exception(ex, ctx = {}) ⇒ Object



# File 'lib/wurk/configuration.rb', line 362

def handle_exception(ex, ctx = {})
  if error_handlers.empty?
    logger.error("#{ctx} #{ex.class}: #{ex.message}")
  else
    error_handlers.each do |handler|
      handler.call(ex, ctx, self)
    rescue StandardError => e
      logger.error("error_handler raised: #{e.class}: #{e.message}")
    end
  end
end
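
The dispatch loop above isolates handlers from one another: a handler that raises is logged and the remaining handlers still run. The sketch below reproduces that loop in a free-standing function (dispatch_error is a hypothetical name) so the behavior can be observed without a Wurk process.

```ruby
require 'logger'
require 'stringio'

# Stand-in for the handler loop: each handler is called with
# (exception, context, config); a handler that raises is logged and skipped.
def dispatch_error(handlers, logger, ex, ctx = {}, cfg = nil)
  handlers.each do |handler|
    handler.call(ex, ctx, cfg)
  rescue StandardError => e
    logger.error("error_handler raised: #{e.class}: #{e.message}")
  end
end

def demo_error_dispatch
  io = StringIO.new
  logger = Logger.new(io)
  seen = []
  handlers = [
    ->(_ex, _ctx, _cfg) { raise 'broken handler' },        # fails
    ->(ex, ctx, _cfg) { seen << [ex.message, ctx[:jid]] }  # still runs
  ]
  dispatch_error(handlers, logger, RuntimeError.new('boom'), { jid: 'abc123' })
  [seen, io.string]
end

seen, log = demo_error_dispatch
p seen                                     # prints [["boom", "abc123"]]
puts log.include?('error_handler raised')  # prints true
```

Custom handlers follow the same three-argument shape as ERROR_HANDLER and are appended to error_handlers.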

#health_check(port:, bind: '0.0.0.0', ready_window: 30) ⇒ Object

Opt-in thin HTTP listener inside the worker process for k8s probes. When called, the Launcher starts a TCP server on the given port, bound to the bind address, exposing GET /live (200 while the process is not stopping) and GET /ready (200 only when Redis is reachable AND a heartbeat fired within the last ready_window seconds).

Off by default — call this in a configure_server block to enable. Spec: docs/target/sidekiq-ent.md §7.1.2.

Raises:

  • (ArgumentError)


# File 'lib/wurk/configuration.rb', line 330

def health_check(port:, bind: '0.0.0.0', ready_window: 30)
  guard_frozen!
  p = Integer(port)
  rw = Integer(ready_window)
  raise ArgumentError, 'port must be between 0 and 65535' unless (0..65535).cover?(p)
  raise ArgumentError, 'ready_window must be > 0' unless rw.positive?

  b = bind.to_s
  raise ArgumentError, 'bind must be a non-empty string' if b.empty?

  @options[:health_check_options] = { port: p, bind: b, ready_window: rw }
end
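
The probe semantics described above reduce to two small predicates. The sketch below is one way to express them; the function names are hypothetical, the listener itself lives in the Launcher and is not shown on this page, and the 503 status for a failing probe is an assumption (the description only specifies when 200 is returned).

```ruby
# /live: healthy for as long as the process is not stopping.
def live_status(stopping:)
  stopping ? 503 : 200
end

# /ready: requires Redis to be reachable AND a heartbeat within ready_window seconds.
def ready_status(redis_ok:, last_heartbeat_at:, now:, ready_window: 30)
  fresh = !last_heartbeat_at.nil? && (now - last_heartbeat_at) <= ready_window
  redis_ok && fresh ? 200 : 503
end

now = Time.at(1_000)
puts live_status(stopping: false)                                          # prints 200
puts ready_status(redis_ok: true,  last_heartbeat_at: now - 5,  now: now)  # prints 200
puts ready_status(redis_ok: true,  last_heartbeat_at: now - 45, now: now)  # prints 503
puts ready_status(redis_ok: false, last_heartbeat_at: now - 5,  now: now)  # prints 503
```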

#history_collector ⇒ Object

# File 'lib/wurk/configuration.rb', line 305

def history_collector = @options[:history_collector]

#history_enabled? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/wurk/configuration.rb', line 303

def history_enabled? = @options.key?(:history_interval)

#history_interval ⇒ Object

# File 'lib/wurk/configuration.rb', line 304

def history_interval = @options.fetch(:history_interval, HISTORY_DEFAULT_INTERVAL)

#inspect ⇒ Object

# File 'lib/wurk/configuration.rb', line 436

def inspect
  "#<#{self.class} capsules=#{@capsules.keys} concurrency=#{total_concurrency}>"
end

#key?(key) ⇒ Boolean Also known as: has_key?

Returns:

  • (Boolean)


# File 'lib/wurk/configuration.rb', line 104

def key?(key) = @options.key?(key)

#local_redis_pool ⇒ Object

# File 'lib/wurk/configuration.rb', line 171

def local_redis_pool
  @local_redis_pool ||= build_redis_pool(size: 10, name: 'internal')
end

#lookup(name, default_class = nil) ⇒ Object



# File 'lib/wurk/configuration.rb', line 199

def lookup(name, default_class = nil)
  @directory[name] ||= default_class&.new
end

#memory_limit_kb ⇒ Object

Threshold in KB, the unit the swarm compares against /proc/<pid>/statm (pages × 4KB). nil when recycling is disabled.

# File 'lib/wurk/configuration.rb', line 416

def memory_limit_kb
  mb = memory_limit_mb
  mb&.positive? ? mb * 1024 : nil
end
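
The unit conversion can be checked with a short worked example. The helper names below are hypothetical, the second statm field is the resident page count, and the 4 KB page size follows the note above (it is an assumption about the host, not something read from the system).

```ruby
# /proc/<pid>/statm reports sizes in pages; the second field is resident pages.
PAGE_KB = 4

def rss_kb_from_statm(statm_line)
  resident_pages = Integer(statm_line.split[1])
  resident_pages * PAGE_KB
end

# Same conversion as memory_limit_kb: nil or 0 disables recycling.
def limit_kb(limit_mb)
  limit_mb&.positive? ? limit_mb * 1024 : nil
end

def over_limit?(statm_line, limit_mb)
  kb = limit_kb(limit_mb)
  !kb.nil? && rss_kb_from_statm(statm_line) > kb
end

sample = '250000 140000 3000 1 0 200000 0' # 140000 pages * 4 KB = 560000 KB
puts rss_kb_from_statm(sample)  # prints 560000
puts over_limit?(sample, 512)   # prints true  (512 MB = 524288 KB)
puts over_limit?(sample, 1024)  # prints false (1024 MB = 1048576 KB)
puts over_limit?(sample, nil)   # prints false (recycling disabled)
```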

#memory_limit_mb ⇒ Object

Memory-based child recycling (Sidekiq Ent §7.5): the swarm parent TERMs (and respawns) any child whose RSS exceeds this many MB. Set in code or via SIDEKIQ_MAXMEM_MB (WURK_MAXMEM_MB is the native alias); an explicit value wins over the env. nil/0 disables recycling (the default).



# File 'lib/wurk/configuration.rb', line 405

def memory_limit_mb
  @memory_limit_mb || env_memory_limit_mb
end

#memory_limit_mb=(value) ⇒ Object



# File 'lib/wurk/configuration.rb', line 409

def memory_limit_mb=(value)
  guard_frozen!
  @memory_limit_mb = value.nil? ? nil : Integer(value)
end

#merge!(other) ⇒ Object



# File 'lib/wurk/configuration.rb', line 106

def merge!(other)
  guard_frozen!
  @options.merge!(other)
end

#new_redis_pool(size, name = 'custom') ⇒ Object



# File 'lib/wurk/configuration.rb', line 184

def new_redis_pool(size, name = 'custom')
  build_redis_pool(size: size, name: name)
end

#on(event, &block) ⇒ Object

Registers a block to run on a lifecycle event. The event must be one of LIFECYCLE_EVENTS (given as a Symbol), and a block is required.

Raises:

  • (ArgumentError)


# File 'lib/wurk/configuration.rb', line 345

def on(event, &block)
  raise ArgumentError, "block required for on(#{event.inspect})" unless block
  unless LIFECYCLE_EVENTS.include?(event)
    raise ArgumentError, "invalid event #{event.inspect}, must be one of #{LIFECYCLE_EVENTS.inspect}"
  end

  @options[:lifecycle_events][event] << block
end
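
A small stand-in shows how hooks accumulate per event and why the event must be a Symbol. SketchHooks is hypothetical, and its fire method is only an assumed dispatcher: how the Launcher actually invokes the blocks is not shown on this page.

```ruby
SKETCH_EVENTS = %i[startup fork quiet shutdown exit heartbeat beat leader].freeze

class SketchHooks
  def initialize
    @events = SKETCH_EVENTS.to_h { |e| [e, []] }
  end

  def on(event, &block)
    raise ArgumentError, "block required for on(#{event.inspect})" unless block
    raise ArgumentError, "invalid event #{event.inspect}" unless SKETCH_EVENTS.include?(event)

    @events[event] << block
  end

  # Hypothetical dispatcher: calls the hooks in registration order.
  def fire(event)
    @events.fetch(event).each(&:call)
  end
end

def demo_hooks
  log = []
  hooks = SketchHooks.new
  hooks.on(:startup) { log << 'warm cache' }
  hooks.on(:startup) { log << 'open sockets' }
  hooks.fire(:startup)
  rejected = begin
    hooks.on('startup') {} # a String is not in the Symbol list
    false
  rescue ArgumentError
    true
  end
  [log, rejected]
end

p demo_hooks # prints [["warm cache", "open sockets"], true]
```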

#periodic {|mgr| ... } ⇒ Wurk::Cron::Manager

Yields a Wurk::Cron::Manager so the host app can register periodic jobs at boot. Manager state is shared per-process so multiple config.periodic blocks accumulate (matches Sidekiq Ent §2.1). This is the native replacement for the sidekiq-cron gem.

Spec: docs/target/sidekiq-ent.md §2.

Examples:

Register cron jobs at boot

Wurk.configure_server do |config|
  config.periodic do |mgr|
    mgr.register("*/5 * * * *", ReportJob)
    mgr.register("0 0 * * *", NightlyJob, tz: "UTC")
  end
end

Yield Parameters:

  • mgr (Wurk::Cron::Manager)

Returns:

  • (Wurk::Cron::Manager)

# File 'lib/wurk/configuration.rb', line 274

def periodic
  require_relative 'cron'
  @periodic_manager ||= Wurk::Cron::Manager.new(self)
  yield @periodic_manager if block_given?
  @periodic_manager
end

#queues ⇒ Object

# File 'lib/wurk/configuration.rb', line 127

def queues = default_capsule.queues

#queues=(val) ⇒ Object



# File 'lib/wurk/configuration.rb', line 129

def queues=(val)
  default_capsule.queues = val
end

#redis ⇒ Object

# File 'lib/wurk/configuration.rb', line 188

def redis(&)
  redis_pool.with(&)
end

#redis=(hash) ⇒ Object

Merges the given hash into the Redis connection settings; string keys are converted to symbols first.



# File 'lib/wurk/configuration.rb', line 162

def redis=(hash)
  guard_frozen!
  @redis_config = @redis_config.merge(hash.transform_keys(&:to_sym))
end
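
The key normalization in the setter matters when settings arrive with string keys, for instance from a parsed YAML file. The sketch below isolates that merge; merge_redis_config is a hypothetical name and the cache.internal host is made up for the example.

```ruby
# Stand-in for the merge performed by redis=: keys are symbolized before
# merging, so string-keyed overrides replace the symbol-keyed defaults
# instead of sitting beside them.
def merge_redis_config(current, overrides)
  current.merge(overrides.transform_keys(&:to_sym))
end

base = { url: 'redis://localhost:6379/0' }
merged = merge_redis_config(base, { 'url' => 'redis://cache.internal:6379/1', 'timeout' => 2 })

puts merged[:url]      # prints redis://cache.internal:6379/1
puts merged[:timeout]  # prints 2
puts merged.keys.size  # prints 2
```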

#redis_pool ⇒ Object

# File 'lib/wurk/configuration.rb', line 167

def redis_pool
  default_capsule.redis_pool
end

#register(name, instance) ⇒ Object

Registers an instance under name in the service locator (extension registry); retrieve it with #lookup.



# File 'lib/wurk/configuration.rb', line 194

def register(name, instance)
  guard_frozen!
  @directory[name] = instance
end
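
Together, #register and #lookup form a small service locator. The stand-in below copies the two method bodies to show the lookup semantics, in particular that a default_class is instantiated once and then cached. SketchDirectory and SketchClock are hypothetical names, and the frozen guard is omitted.

```ruby
class SketchDirectory
  def initialize
    @directory = {}
  end

  def register(name, instance)
    @directory[name] = instance
  end

  # Returns the registered instance, or lazily builds and caches one from
  # default_class. With no registration and no default the result is nil.
  def lookup(name, default_class = nil)
    @directory[name] ||= default_class&.new
  end
end

class SketchClock; end # hypothetical extension class

def demo_directory
  dir = SketchDirectory.new
  missing = dir.lookup(:clock)                # nil: nothing registered
  built   = dir.lookup(:clock, SketchClock)   # builds and caches
  cached  = dir.lookup(:clock).equal?(built)  # same instance on later lookups
  dir.register(:clock, :override)
  [missing, built.class, cached, dir.lookup(:clock, SketchClock)]
end

p demo_directory # prints [nil, SketchClock, true, :override]
```

An explicit registration always wins over the default class passed to lookup.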

#reliable_scheduler! ⇒ Object

Pro reliable scheduler (§4): promote due jobs from retry/schedule onto their target queue in a single atomic Lua (ZRANGEBYSCORE+ZREM+LPUSH), closing the pop→push job-loss window of the default poller. Swaps the pluggable scheduled_enq for the atomic promoter; idempotent.



# File 'lib/wurk/configuration.rb', line 251

def reliable_scheduler!(*)
  self[:scheduled_enq] = Wurk::Scheduled::ReliableEnq
  nil
end

#reset_redis_pools! ⇒ Object

Disconnect and drop every cached pool — the per-capsule mains plus the config-level internal pool. Used by Wurk::Swarm so the parent never leaks sockets into forks and each child can build fresh ones.



# File 'lib/wurk/configuration.rb', line 178

def reset_redis_pools!
  @capsules.each_value(&:reset_redis_pools!)
  @local_redis_pool&.disconnect!
  @local_redis_pool = nil
end

#retain_history(seconds = HISTORY_DEFAULT_INTERVAL, &block) ⇒ Object

Enables the Ent Historical Metrics snapshotter: once per interval (the seconds argument, default HISTORY_DEFAULT_INTERVAL) the cluster leader emits a statsd-shaped snapshot to the configured dogstatsd client. With no block the default §5.2 gauge set is published; a block receives the dogstatsd client (as s) and collects custom metrics instead. The Launcher starts the snapshotter only when this method has been called.

Spec: docs/target/sidekiq-ent.md §5.1.

Raises:

  • (ArgumentError)


# File 'lib/wurk/configuration.rb', line 293

def retain_history(seconds = HISTORY_DEFAULT_INTERVAL, &block)
  guard_frozen!
  interval = Float(seconds)
  raise ArgumentError, 'retain_history interval must be > 0' unless interval.positive?

  @options[:history_interval] = interval
  @options[:history_collector] = block
  nil
end
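
The sketch below mirrors the validation in the listing above and shows how a custom collector block might later be invoked with a statsd client. SketchStatsd is a hypothetical recorder standing in for a real dogstatsd client, sketch_retain_history works on a plain hash instead of a Configuration, and the way the snapshotter calls the collector is an assumption.

```ruby
# Hypothetical statsd stand-in that records gauges instead of sending them.
class SketchStatsd
  attr_reader :gauges

  def initialize
    @gauges = {}
  end

  def gauge(name, value)
    @gauges[name] = value
  end
end

# Float() coerces the interval, and non-positive intervals are rejected.
def sketch_retain_history(options, seconds = 30, &block)
  interval = Float(seconds)
  raise ArgumentError, 'retain_history interval must be > 0' unless interval.positive?

  options[:history_interval] = interval
  options[:history_collector] = block
  nil
end

def demo_history
  options = {}
  sketch_retain_history(options, '15') { |s| s.gauge('jobs.custom', 42) }
  statsd = SketchStatsd.new
  options[:history_collector].call(statsd) # what a snapshot tick might do
  [options[:history_interval], statsd.gauges['jobs.custom']]
end

p demo_history # prints [15.0, 42]
```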

#server? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/wurk/configuration.rb', line 384

def server?
  @options[:server] == true
end

#server_middleware {|@server_chain| ... } ⇒ Object

Yields:

  • (@server_chain)


# File 'lib/wurk/configuration.rb', line 155

def server_middleware
  yield @server_chain if block_given?
  @server_chain
end

#super_fetch!(&block) ⇒ Object

Sidekiq Pro's opt-in toggle for reliable fetch. Reliable fetch is already the default in Wurk (the fetcher is always the reliable BLMOVE fetcher with orphan reclamation), so the toggle itself is a no-op apart from storing the optional callback; it exists so that a Pro initializer drops in unchanged instead of raising NoMethodError. The scheduler counterpart is #reliable_scheduler!.

The optional block is Pro's recovery callback: |jobstr, pill|, fired once per orphan recovery (pill nil) and once on a poison kill (pill responds to .jid/.klass/.count/.queue). The reaper drives it via Wurk::Middleware::PoisonPill.track!. Spec: docs/target/sidekiq-pro.md §3.1.



# File 'lib/wurk/configuration.rb', line 242

def super_fetch!(*, &block)
  @super_fetch_callback = block if block
  nil
end
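
A recovery callback has to handle both call shapes described above. The sketch below is one possible callback body: SketchPill is a hypothetical struct that only mimics the four readers named in the text, and the message formats are made up for illustration.

```ruby
# Hypothetical pill shape; the text above only says it responds to
# jid, klass, count and queue.
SketchPill = Struct.new(:jid, :klass, :count, :queue)

# Callback with the |jobstr, pill| signature: pill is nil for an orphan
# recovery and present for a poison kill.
RECOVERY_CALLBACK = lambda do |jobstr, pill|
  if pill
    "poison: #{pill.klass} #{pill.jid} count=#{pill.count} queue=#{pill.queue}"
  else
    "recovered orphan (#{jobstr.bytesize} bytes)"
  end
end

puts RECOVERY_CALLBACK.call('{"jid":"a1"}', nil)
# prints recovered orphan (12 bytes)
puts RECOVERY_CALLBACK.call('{"jid":"a1"}', SketchPill.new('a1', 'ReportJob', 3, 'default'))
# prints poison: ReportJob a1 count=3 queue=default
```

In an initializer, a lambda like this would be passed as the block to super_fetch!.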

#topology ⇒ Object

Worker topology for the swarm. When the host hasn't declared one (the railtie path), default to a single flat fork running the default capsule's queues + concurrency. Assign a custom Wurk::Topology (via topology=) for specialized slots.



# File 'lib/wurk/configuration.rb', line 392

def topology
  @topology ||= default_topology
end

#topology=(value) ⇒ Object



# File 'lib/wurk/configuration.rb', line 396

def topology=(value)
  guard_frozen!
  @topology = value
end

#total_concurrency ⇒ Object

# File 'lib/wurk/configuration.rb', line 133

def total_concurrency
  @capsules.each_value.sum(&:concurrency)
end

#web ⇒ Object

Web UI configuration: the authorization hook and read-only mode. Returns the process-wide Wurk::Web.config singleton so config.web.read_only = true and the engine middleware share one source of truth. Lazy-requires the web layer to keep standalone boot lean.

Spec: docs/target/sidekiq-ent.md §9.2.



# File 'lib/wurk/configuration.rb', line 315

def web
  require_relative 'web/config'
  Wurk::Web.config
end