Class: Wurk::Configuration
- Inherits:
-
Object
- Object
- Wurk::Configuration
- Defined in:
- lib/wurk/configuration.rb
Overview
Owns runtime knobs (concurrency, queues, timeouts, lifecycle events, error/death handlers) and the registry of Capsules. Single source of truth for everything the swarm / managers / processors need to boot.
Spec: docs/target/sidekiq-free.md §4 (Sidekiq::Config).
Constant Summary collapse
- DEFAULTS =
Mirrors Sidekiq::Config::DEFAULTS. Order and keys are part of the drop-in contract — third-party gems read @options via [] / fetch / dig.
{ labels: Set.new, require: '.', environment: nil, concurrency: 5, timeout: 25, poll_interval_average: nil, average_scheduled_poll_interval: 5, on_complex_arguments: :raise, max_iteration_runtime: nil, error_handlers: [], death_handlers: [], lifecycle_events: { startup: [], fork: [], quiet: [], shutdown: [], exit: [], heartbeat: [], beat: [], leader: [] }, dead_max_jobs: 10_000, dead_timeout_in_seconds: 180 * 24 * 60 * 60, reloader: proc { |&b| b.call }, backtrace_cleaner: ->(bt) { bt }, logged_job_attributes: %w[bid tags], redis_idle_timeout: nil }.freeze
- LIFECYCLE_EVENTS =
:fork fires only inside swarm children, after fork + internal AR/Redis reconnect — apps reopen sockets / non-fork-safe libs there (Ent §7.4).
%i[startup fork quiet shutdown exit heartbeat beat leader].freeze
- DEFAULT_THREAD_PRIORITY =
-1
- ERROR_HANDLER =
Default error handler. Wraps the report in the thread-local Wurk::Context so logger formatters/JSON layouts can pick up jid/bid/tags.
full_message(with backtrace) in dev/debug,detailed_messagein prod — mirrors the Sidekiq behavior so log scrapers built for one work for both.Spec: docs/target/sidekiq-free.md §4.3.
lambda do |ex, ctx, cfg = Wurk.configuration| safe_ctx = ctx || {} Wurk::Context.with(safe_ctx) do dev = $DEBUG || ENV['WURK_DEBUG'] || cfg.logger.debug? msg = dev ? ex. : ex. cfg.logger.info { msg } end end
- HISTORY_DEFAULT_INTERVAL =
--- Historical metrics snapshotter (Ent §5) -------------------------
30
Instance Attribute Summary collapse
-
#capsules ⇒ Object
readonly
Returns the value of attribute capsules.
-
#directory ⇒ Object
readonly
Returns the value of attribute directory.
-
#dogstatsd ⇒ Object
Pro parity: callable that builds the statsd / dogstatsd client.
-
#logger ⇒ Object
--- Logger -----------------------------------------------------------.
-
#redis_config ⇒ Object
readonly
Returns the value of attribute redis_config.
-
#super_fetch_callback ⇒ Object
readonly
Returns the value of attribute super_fetch_callback.
-
#thread_priority ⇒ Object
Returns the value of attribute thread_priority.
Instance Method Summary collapse
-
#[](key) ⇒ Object
--- Hash-like options access -----------------------------------------.
- #[]=(key, val) ⇒ Object
- #average_scheduled_poll_interval=(interval) ⇒ Object
- #capsule(name) {|cap| ... } ⇒ Object
-
#client_middleware {|@client_chain| ... } ⇒ Object
--- Middleware -------------------------------------------------------.
-
#concurrency ⇒ Integer
Threads per worker process for the default capsule (default 5).
- #concurrency=(val) ⇒ Object
- #configure_client {|_self| ... } ⇒ Object
-
#configure_server {|_self| ... } ⇒ Object
--- Configure blocks (Sidekiq.configure_server / _client) -----------.
- #death_handlers ⇒ Object
- #default_capsule ⇒ Object
- #dig(*keys) ⇒ Object
-
#error_handlers ⇒ Object
--- Handlers ---------------------------------------------------------.
- #fetch ⇒ Object
- #fetch_poll_interval ⇒ Object
-
#fetch_poll_interval=(seconds) ⇒ Object
Reliable-fetch empty-poll backoff: the BLMOVE block timeout (seconds) used when every served queue is empty.
- #freeze! ⇒ Object
- #frozen? ⇒ Boolean
- #handle_exception(ex, ctx = {}) ⇒ Object
-
#health_check(port:, bind: '0.0.0.0', ready_window: 30) ⇒ Object
Opt-in thin HTTP listener inside the worker process for k8s probes.
- #history_collector ⇒ Object
- #history_enabled? ⇒ Boolean
- #history_interval ⇒ Object
-
#initialize(options = {}) ⇒ Configuration
constructor
A new instance of Configuration.
- #inspect ⇒ Object
- #key?(key) ⇒ Boolean (also: #has_key?)
- #local_redis_pool ⇒ Object
- #lookup(name, default_class = nil) ⇒ Object
-
#memory_limit_kb ⇒ Object
Threshold in KB, the unit the swarm compares against /proc/
/statm (pages × 4KB). -
#memory_limit_mb ⇒ Object
Memory-based child recycling (Sidekiq Ent §7.5): the swarm parent TERMs (and respawns) any child whose RSS exceeds this many MB.
- #memory_limit_mb=(value) ⇒ Object
- #merge!(other) ⇒ Object
- #new_redis_pool(size, name = 'custom') ⇒ Object
-
#on(event, &block) ⇒ Object
--- Lifecycle hooks --------------------------------------------------.
-
#periodic {|mgr| ... } ⇒ Wurk::Cron::Manager
Yields a Wurk::Cron::Manager so the host app can register periodic jobs at boot.
- #queues ⇒ Object
- #queues=(val) ⇒ Object
- #redis ⇒ Object
-
#redis=(hash) ⇒ Object
--- Redis ------------------------------------------------------------.
- #redis_pool ⇒ Object
-
#register(name, instance) ⇒ Object
--- Service locator (extension registry) ----------------------------.
-
#reliable_scheduler! ⇒ Object
Pro reliable scheduler (§4): promote due jobs from retry/schedule onto their target queue in a single atomic Lua (ZRANGEBYSCORE+ZREM+LPUSH), closing the pop→push job-loss window of the default poller.
-
#reset_redis_pools! ⇒ Object
Disconnect and drop every cached pool — the per-capsule mains plus the config-level internal pool.
-
#retain_history(seconds = HISTORY_DEFAULT_INTERVAL, &block) ⇒ Object
Enables the Ent Historical Metrics snapshotter: every
secondsthe cluster leader emits a statsd-shaped snapshot to the configureddogstatsdclient. - #server? ⇒ Boolean
- #server_middleware {|@server_chain| ... } ⇒ Object
-
#super_fetch!(&block) ⇒ Object
Sidekiq Pro's opt-in toggles for reliable fetch and the reliable scheduler.
-
#topology ⇒ Object
Worker topology for the swarm.
- #topology=(value) ⇒ Object
- #total_concurrency ⇒ Object
-
#web ⇒ Object
Web UI configuration: the authorization hook and read-only mode.
Constructor Details
#initialize(options = {}) ⇒ Configuration
Returns a new instance of Configuration.
81 82 83 84 85 86 87 88 89 90 91 92 |
# File 'lib/wurk/configuration.rb', line 81 def initialize( = {}) @options = deep_dup_defaults.merge() @options[:error_handlers] << ERROR_HANDLER if @options[:error_handlers].empty? @capsules = {} @directory = {} @client_chain = Middleware::Chain.new @server_chain = Middleware::Chain.new @redis_config = { url: ENV.fetch('REDIS_URL', 'redis://localhost:6379/0') } @logger = nil @thread_priority = DEFAULT_THREAD_PRIORITY @frozen = false end |
Instance Attribute Details
#capsules ⇒ Object (readonly)
Returns the value of attribute capsules.
69 70 71 |
# File 'lib/wurk/configuration.rb', line 69 def capsules @capsules end |
#directory ⇒ Object (readonly)
Returns the value of attribute directory.
69 70 71 |
# File 'lib/wurk/configuration.rb', line 69 def directory @directory end |
#dogstatsd ⇒ Object
Pro parity: callable that builds the statsd / dogstatsd client. Invoked once per process AFTER fork; see Wurk::Metrics::Statsd.client. Assignable as a Proc, lambda, or any object responding to #call:
config.dogstatsd = -> { Datadog::Statsd.new('host', 8125) }
Spec: docs/target/sidekiq-pro.md §9.1.
79 80 81 |
# File 'lib/wurk/configuration.rb', line 79 def dogstatsd @dogstatsd end |
#logger ⇒ Object
--- Logger -----------------------------------------------------------
356 357 358 |
# File 'lib/wurk/configuration.rb', line 356 def logger @logger ||= default_logger end |
#redis_config ⇒ Object (readonly)
Returns the value of attribute redis_config.
69 70 71 |
# File 'lib/wurk/configuration.rb', line 69 def redis_config @redis_config end |
#super_fetch_callback ⇒ Object (readonly)
Returns the value of attribute super_fetch_callback.
69 70 71 |
# File 'lib/wurk/configuration.rb', line 69 def super_fetch_callback @super_fetch_callback end |
#thread_priority ⇒ Object
Returns the value of attribute thread_priority.
70 71 72 |
# File 'lib/wurk/configuration.rb', line 70 def thread_priority @thread_priority end |
Instance Method Details
#[](key) ⇒ Object
--- Hash-like options access -----------------------------------------
96 |
# File 'lib/wurk/configuration.rb', line 96 def [](key) = @options.[](key) |
#[]=(key, val) ⇒ Object
98 99 100 101 |
# File 'lib/wurk/configuration.rb', line 98 def []=(key, val) guard_frozen! @options[key] = val end |
#average_scheduled_poll_interval=(interval) ⇒ Object
213 214 215 |
# File 'lib/wurk/configuration.rb', line 213 def average_scheduled_poll_interval=(interval) @options[:average_scheduled_poll_interval] = interval end |
#capsule(name) {|cap| ... } ⇒ Object
141 142 143 144 145 146 |
# File 'lib/wurk/configuration.rb', line 141 def capsule(name) name = name.to_s cap = @capsules[name] ||= Capsule.new(name, self) yield cap if block_given? cap end |
#client_middleware {|@client_chain| ... } ⇒ Object
--- Middleware -------------------------------------------------------
150 151 152 153 |
# File 'lib/wurk/configuration.rb', line 150 def client_middleware yield @client_chain if block_given? @client_chain end |
#concurrency ⇒ Integer
Returns threads per worker process for the default capsule
(default 5). The process count is separate — set via WURK_COUNT
(defaults to the CPU count). With a single capsule, total in-flight
jobs = WURK_COUNT × concurrency; with multiple capsules see
#total_concurrency for the cluster aggregate.
120 |
# File 'lib/wurk/configuration.rb', line 120 def concurrency = default_capsule.concurrency |
#concurrency=(val) ⇒ Object
123 124 125 |
# File 'lib/wurk/configuration.rb', line 123 def concurrency=(val) default_capsule.concurrency = val end |
#configure_client {|_self| ... } ⇒ Object
380 381 382 |
# File 'lib/wurk/configuration.rb', line 380 def configure_client(&block) yield self if block && !server? end |
#configure_server {|_self| ... } ⇒ Object
--- Configure blocks (Sidekiq.configure_server / _client) -----------
376 377 378 |
# File 'lib/wurk/configuration.rb', line 376 def configure_server(&block) yield self if block && server? end |
#death_handlers ⇒ Object
209 210 211 |
# File 'lib/wurk/configuration.rb', line 209 def death_handlers @options[:death_handlers] end |
#default_capsule ⇒ Object
137 138 139 |
# File 'lib/wurk/configuration.rb', line 137 def default_capsule(&) capsule('default', &) end |
#dig(*keys) ⇒ Object
111 |
# File 'lib/wurk/configuration.rb', line 111 def dig(*keys) = @options.dig(*keys) |
#error_handlers ⇒ Object
--- Handlers ---------------------------------------------------------
205 206 207 |
# File 'lib/wurk/configuration.rb', line 205 def error_handlers @options[:error_handlers] end |
#fetch ⇒ Object
103 |
# File 'lib/wurk/configuration.rb', line 103 def fetch(*, &) = @options.fetch(*, &) |
#fetch_poll_interval ⇒ Object
226 227 228 |
# File 'lib/wurk/configuration.rb', line 226 def fetch_poll_interval @options[:fetch_poll_interval] end |
#fetch_poll_interval=(seconds) ⇒ Object
Reliable-fetch empty-poll backoff: the BLMOVE block timeout (seconds)
used when every served queue is empty. Pro super_fetch §3.3's
fetch_poll_interval knob. Unset (nil) → the fetcher's default
(Wurk::Fetcher::Reliable::TIMEOUT, 2s). Also readable as
config[:fetch_poll_interval].
222 223 224 |
# File 'lib/wurk/configuration.rb', line 222 def fetch_poll_interval=(seconds) @options[:fetch_poll_interval] = seconds end |
#freeze! ⇒ Object
421 422 423 424 425 426 427 428 429 430 |
# File 'lib/wurk/configuration.rb', line 421 def freeze! return self if @frozen @capsules.each_value(&:freeze) @capsules.freeze @options.freeze @directory.freeze @frozen = true self end |
#frozen? ⇒ Boolean
432 433 434 |
# File 'lib/wurk/configuration.rb', line 432 def frozen? @frozen end |
#handle_exception(ex, ctx = {}) ⇒ Object
362 363 364 365 366 367 368 369 370 371 372 |
# File 'lib/wurk/configuration.rb', line 362 def handle_exception(ex, ctx = {}) if error_handlers.empty? logger.error("#{ctx} #{ex.class}: #{ex.}") else error_handlers.each do |handler| handler.call(ex, ctx, self) rescue StandardError => e logger.error("error_handler raised: #{e.class}: #{e.}") end end end |
#health_check(port:, bind: '0.0.0.0', ready_window: 30) ⇒ Object
Opt-in thin HTTP listener inside the worker process for k8s probes.
When called, the Launcher will start a TCP server on port bound to
bind exposing GET /live (200 while not stopping) and GET /ready
(200 only when Redis is reachable AND heartbeat fired within
ready_window seconds).
Off by default — call this in a configure_server block to enable.
Spec: docs/target/sidekiq-ent.md §7.1.2.
330 331 332 333 334 335 336 337 338 339 340 341 |
# File 'lib/wurk/configuration.rb', line 330 def health_check(port:, bind: '0.0.0.0', ready_window: 30) guard_frozen! p = Integer(port) rw = Integer(ready_window) raise ArgumentError, 'port must be between 0 and 65535' unless (0..65535).cover?(p) raise ArgumentError, 'ready_window must be > 0' unless rw.positive? b = bind.to_s raise ArgumentError, 'bind must be a non-empty string' if b.empty? @options[:health_check_options] = { port: p, bind: b, ready_window: rw } end |
#history_collector ⇒ Object
305 |
# File 'lib/wurk/configuration.rb', line 305 def history_collector = @options[:history_collector] |
#history_enabled? ⇒ Boolean
303 |
# File 'lib/wurk/configuration.rb', line 303 def history_enabled? = @options.key?(:history_interval) |
#history_interval ⇒ Object
304 |
# File 'lib/wurk/configuration.rb', line 304 def history_interval = @options.fetch(:history_interval, HISTORY_DEFAULT_INTERVAL) |
#inspect ⇒ Object
436 437 438 |
# File 'lib/wurk/configuration.rb', line 436 def inspect "#<#{self.class} capsules=#{@capsules.keys} concurrency=#{total_concurrency}>" end |
#key?(key) ⇒ Boolean Also known as: has_key?
104 |
# File 'lib/wurk/configuration.rb', line 104 def key?(key) = @options.key?(key) |
#local_redis_pool ⇒ Object
171 172 173 |
# File 'lib/wurk/configuration.rb', line 171 def local_redis_pool @local_redis_pool ||= build_redis_pool(size: 10, name: 'internal') end |
#lookup(name, default_class = nil) ⇒ Object
199 200 201 |
# File 'lib/wurk/configuration.rb', line 199 def lookup(name, default_class = nil) @directory[name] ||= default_class&.new end |
#memory_limit_kb ⇒ Object
Threshold in KB, the unit the swarm compares against /proc/
416 417 418 419 |
# File 'lib/wurk/configuration.rb', line 416 def memory_limit_kb mb = memory_limit_mb mb&.positive? ? mb * 1024 : nil end |
#memory_limit_mb ⇒ Object
Memory-based child recycling (Sidekiq Ent §7.5): the swarm parent TERMs (and respawns) any child whose RSS exceeds this many MB. Set in code or via SIDEKIQ_MAXMEM_MB (WURK_MAXMEM_MB is the native alias); an explicit value wins over the env. nil/0 disables recycling (the default).
405 406 407 |
# File 'lib/wurk/configuration.rb', line 405 def memory_limit_mb @memory_limit_mb || env_memory_limit_mb end |
#memory_limit_mb=(value) ⇒ Object
409 410 411 412 |
# File 'lib/wurk/configuration.rb', line 409 def memory_limit_mb=(value) guard_frozen! @memory_limit_mb = value.nil? ? nil : Integer(value) end |
#merge!(other) ⇒ Object
106 107 108 109 |
# File 'lib/wurk/configuration.rb', line 106 def merge!(other) guard_frozen! @options.merge!(other) end |
#new_redis_pool(size, name = 'custom') ⇒ Object
184 185 186 |
# File 'lib/wurk/configuration.rb', line 184 def new_redis_pool(size, name = 'custom') build_redis_pool(size: size, name: name) end |
#on(event, &block) ⇒ Object
--- Lifecycle hooks --------------------------------------------------
345 346 347 348 349 350 351 352 |
# File 'lib/wurk/configuration.rb', line 345 def on(event, &block) raise ArgumentError, "block required for on(#{event.inspect})" unless block unless LIFECYCLE_EVENTS.include?(event) raise ArgumentError, "invalid event #{event.inspect}, must be one of #{LIFECYCLE_EVENTS.inspect}" end @options[:lifecycle_events][event] << block end |
#periodic {|mgr| ... } ⇒ Wurk::Cron::Manager
Yields a Wurk::Cron::Manager so the host app can register periodic
jobs at boot. Manager state is shared per-process so multiple
config.periodic blocks accumulate (matches Sidekiq Ent §2.1). This is
the native replacement for the sidekiq-cron gem.
Spec: docs/target/sidekiq-ent.md §2.
274 275 276 277 278 279 |
# File 'lib/wurk/configuration.rb', line 274 def periodic require_relative 'cron' @periodic_manager ||= Wurk::Cron::Manager.new(self) yield @periodic_manager if block_given? @periodic_manager end |
#queues ⇒ Object
127 |
# File 'lib/wurk/configuration.rb', line 127 def queues = default_capsule.queues |
#queues=(val) ⇒ Object
129 130 131 |
# File 'lib/wurk/configuration.rb', line 129 def queues=(val) default_capsule.queues = val end |
#redis ⇒ Object
188 189 190 |
# File 'lib/wurk/configuration.rb', line 188 def redis(&) redis_pool.with(&) end |
#redis=(hash) ⇒ Object
--- Redis ------------------------------------------------------------
162 163 164 165 |
# File 'lib/wurk/configuration.rb', line 162 def redis=(hash) guard_frozen! @redis_config = @redis_config.merge(hash.transform_keys(&:to_sym)) end |
#redis_pool ⇒ Object
167 168 169 |
# File 'lib/wurk/configuration.rb', line 167 def redis_pool default_capsule.redis_pool end |
#register(name, instance) ⇒ Object
--- Service locator (extension registry) ----------------------------
194 195 196 197 |
# File 'lib/wurk/configuration.rb', line 194 def register(name, instance) guard_frozen! @directory[name] = instance end |
#reliable_scheduler! ⇒ Object
Pro reliable scheduler (§4): promote due jobs from retry/schedule onto
their target queue in a single atomic Lua (ZRANGEBYSCORE+ZREM+LPUSH),
closing the pop→push job-loss window of the default poller. Swaps the
pluggable scheduled_enq for the atomic promoter; idempotent.
251 252 253 254 |
# File 'lib/wurk/configuration.rb', line 251 def reliable_scheduler!(*) self[:scheduled_enq] = Wurk::Scheduled::ReliableEnq nil end |
#reset_redis_pools! ⇒ Object
Disconnect and drop every cached pool — the per-capsule mains plus the config-level internal pool. Used by Wurk::Swarm so the parent never leaks sockets into forks and each child can build fresh ones.
178 179 180 181 182 |
# File 'lib/wurk/configuration.rb', line 178 def reset_redis_pools! @capsules.each_value(&:reset_redis_pools!) @local_redis_pool&.disconnect! @local_redis_pool = nil end |
#retain_history(seconds = HISTORY_DEFAULT_INTERVAL, &block) ⇒ Object
Enables the Ent Historical Metrics snapshotter: every seconds the
cluster leader emits a statsd-shaped snapshot to the configured
dogstatsd client. With no block the default §5.2 gauge set is
published; a block receives the dogstatsd client s and collects
custom metrics instead. The Launcher starts the snapshotter only when
this has been called.
Spec: docs/target/sidekiq-ent.md §5.1.
293 294 295 296 297 298 299 300 301 |
# File 'lib/wurk/configuration.rb', line 293 def retain_history(seconds = HISTORY_DEFAULT_INTERVAL, &block) guard_frozen! interval = Float(seconds) raise ArgumentError, 'retain_history interval must be > 0' unless interval.positive? @options[:history_interval] = interval @options[:history_collector] = block nil end |
#server? ⇒ Boolean
384 385 386 |
# File 'lib/wurk/configuration.rb', line 384 def server? @options[:server] == true end |
#server_middleware {|@server_chain| ... } ⇒ Object
155 156 157 158 |
# File 'lib/wurk/configuration.rb', line 155 def server_middleware yield @server_chain if block_given? @server_chain end |
#super_fetch!(&block) ⇒ Object
Sidekiq Pro's opt-in toggles for reliable fetch and the reliable scheduler. Both are already the default in Wurk — the fetcher is always the reliable BLMOVE fetcher with orphan reclamation, and the scheduler is always atomic Lua — so the toggle itself is a no-op. They exist only so a Pro initializer drops in unchanged instead of raising NoMethodError.
The optional block is Pro's recovery callback: |jobstr, pill|, fired
once per orphan recovery (pill nil) and once on a poison kill (pill
responds to .jid/.klass/.count/.queue). The reaper drives it via
Wurk::Middleware::PoisonPill.track!. Spec: docs/target/sidekiq-pro.md §3.1.
242 243 244 245 |
# File 'lib/wurk/configuration.rb', line 242 def super_fetch!(*, &block) @super_fetch_callback = block if block nil end |
#topology ⇒ Object
Worker topology for the swarm. When the host hasn't declared one (the
railtie path), default to a single flat fork running the default
capsule's queues + concurrency. Assign a custom Wurk::Topology (via
topology=) for specialized slots.
392 393 394 |
# File 'lib/wurk/configuration.rb', line 392 def topology @topology ||= default_topology end |
#topology=(value) ⇒ Object
396 397 398 399 |
# File 'lib/wurk/configuration.rb', line 396 def topology=(value) guard_frozen! @topology = value end |
#total_concurrency ⇒ Object
133 134 135 |
# File 'lib/wurk/configuration.rb', line 133 def total_concurrency @capsules.each_value.sum(&:concurrency) end |
#web ⇒ Object
Web UI configuration: the authorization hook and read-only mode. Returns
the process-wide Wurk::Web.config singleton so config.web.read_only = true and the engine middleware share one source of truth. Lazy-requires
the web layer to keep standalone boot lean.
Spec: docs/target/sidekiq-ent.md §9.2.
315 316 317 318 |
# File 'lib/wurk/configuration.rb', line 315 def web require_relative 'web/config' Wurk::Web.config end |