Module: Wurk::Unique

Defined in:
lib/wurk/unique.rb

Overview

Sidekiq Enterprise unique jobs. Best-effort dedup at enqueue time keyed by a SHA256 digest of [class, queue, args] (overridable via sidekiq_unique_context). Three lock-release strategies:

* `unique_until: :success` (default) — lock retained through retries;
server middleware DELs it on successful perform. Surviving across
a process crash is bounded by `unique_for` TTL.
* `unique_until: :start` — server middleware DELs the lock right
*before* invoking perform; a duplicate can be enqueued while the
first is running.

A job that dies automatically (retries exhausted / discarded) releases its lock via a death handler; manual UI kills keep the lock until TTL expiry (Ent wiki, Ent-Unique-Jobs).

Wire-compat (§3.9): single-key Redis layout — unique:<sha256> STRING holding the owning JID. Scheduled jobs extend the TTL by the delay so the lock covers the entire wait+execution window (§3.4).

This is the native replacement for the sidekiq-unique-jobs gem; see the migration guide for the lock:unique_until: translation table.

Spec: docs/target/sidekiq-ent.md §3.

Examples:

Enable globally, then declare per worker

Sidekiq::Enterprise.unique!   # activate the middleware once, at boot

class ChargeJob
  include Sidekiq::Job
  sidekiq_options unique_for: 10.minutes, unique_until: :success

  # optional: customize the dedup key
  def self.sidekiq_unique_context(job)
    job["args"].first   # dedup on the first arg only
  end
end

Defined Under Namespace

Classes: ClientMiddleware, ServerMiddleware

Constant Summary collapse

KEY_PREFIX =
'unique:'
DEFAULT_UNTIL =
:success
VALID_UNTIL =
%i[success start].freeze
DEATH_HANDLER =

Ent parity: a job that dies automatically releases its lock so a duplicate can enqueue immediately. Manual API/UI kills keep the lock until TTL expiry (Ent wiki, Ent-Unique-Jobs) — they reach death handlers too (Sidekiq fires them on API kills), so we recognize the synthesized kill exception and skip the release for it. Atomic CAS-DEL via the shared Lua script mirrors ServerMiddleware#release.

lambda do |job, exception|
  next unless Wurk::Unique.enabled?
  next unless Wurk::Unique.coerce_ttl(job['unique_for'])
  next if exception.instance_of?(::RuntimeError) && exception.message == DeadSet::API_KILL_MESSAGE

  Wurk.redis { |conn| Wurk::Unique.release_if_owner(conn, Wurk::Unique.lock_key_for(job), job['jid']) }
end

Class Method Summary collapse

Class Method Details

.coerce_ttl(value) ⇒ Object

Coerce unique_for to a numeric seconds value. Accepts Integer, Numeric, ActiveSupport::Duration (any to_i-respondent), or false (skip). Returns nil when uniqueness should be skipped.



148
149
150
151
152
153
154
155
156
157
158
159
# File 'lib/wurk/unique.rb', line 148

def self.coerce_ttl(value)
  return nil if value.nil? || value == false
  # `.to_i`, not `value`: ActiveSupport::Duration overrides `is_a?(Integer)`
  # to return true (it delegates to its underlying value), so `1.hour` passes
  # this guard — returning the raw Duration handed redis-client a non-Integer
  # EX arg and raised TypeError at enqueue (#253).
  return value.to_i if value.is_a?(Integer) && value.positive?
  return value.to_i if value.is_a?(Numeric)
  return value.to_i if duration_like?(value)

  nil
end

.disable!Object

Test helper — not part of the public Sidekiq surface. Clears the flag so per-test enable!/disable! does not leak across runs.



92
93
94
95
# File 'lib/wurk/unique.rb', line 92

def disable!
  @enabled = false
  nil
end

.duration_like?(value) ⇒ Boolean

Returns:

  • (Boolean)


161
162
163
164
165
# File 'lib/wurk/unique.rb', line 161

def self.duration_like?(value)
  return false unless value.respond_to?(:to_i)

  value.respond_to?(:since) || value.class.name.to_s.include?('Duration')
end

.enable!Object

rubocop:disable Naming/PredicateMethod



84
85
86
87
88
# File 'lib/wurk/unique.rb', line 84

def enable! # rubocop:disable Naming/PredicateMethod
  @enabled = true
  register_middleware!
  true
end

.enabled?Boolean

Returns:

  • (Boolean)


80
81
82
# File 'lib/wurk/unique.rb', line 80

def enabled?
  @enabled == true
end

.lock_key(klass, queue, args) ⇒ Object

Compute the lock key for an arbitrary (queue, klass, args) triple. Used by both the client middleware and the public locked? probe so they cannot drift.



100
101
102
103
# File 'lib/wurk/unique.rb', line 100

def lock_key(klass, queue, args)
  context = [klass.to_s, queue.to_s, args]
  "#{KEY_PREFIX}#{Digest::SHA256.hexdigest(JSON.dump(context))}"
end

.lock_key_for(job) ⇒ Object

Compute the lock key from a job payload, honoring sidekiq_unique_context when the worker class is loaded and defines it.



108
109
110
111
# File 'lib/wurk/unique.rb', line 108

def lock_key_for(job)
  context = unique_context(job)
  "#{KEY_PREFIX}#{Digest::SHA256.hexdigest(JSON.dump(context))}"
end

.locked?(queue_or_klass, klass_or_args = nil, args = nil) ⇒ String?

Returns owning jid, or nil when the lock is free.

Returns:

  • (String, nil)

    owning jid, or nil when the lock is free.



172
173
174
175
176
# File 'lib/wurk/unique.rb', line 172

def self.locked?(queue_or_klass, klass_or_args = nil, args = nil)
  queue, klass, payload = normalize_locked_args(queue_or_klass, klass_or_args, args)
  key = lock_key(klass, queue, payload)
  Wurk.redis { |c| c.call('GET', key) }
end

.release_if_owner(conn, key, jid) ⇒ Object

Atomic compare-and-delete of a unique lock key. Two-command GET-then-DEL is not a real CAS — the key can expire between the GET and DEL, letting a fresh enqueue grab it, and the bare DEL would then drop the new owner's lock. Routed through a single Lua script (Wurk::Lua::RELEASE_IF_OWNER) shared by ServerMiddleware#release (normal success/start release) and DEATH_HANDLER (automatic-death release) so the two paths cannot drift.



71
72
73
# File 'lib/wurk/unique.rb', line 71

def self.release_if_owner(conn, key, jid)
  Wurk::Lua::Loader.eval_cached(conn, :release_if_owner, keys: [key], argv: [jid])
end

.unique_context(job) ⇒ Object

Default: [class, queue, args]. Workers may override by defining self.sidekiq_unique_context(job) returning any JSON-serializable value (e.g. a subset of args). Spec §3.5.



116
117
118
119
120
121
122
123
# File 'lib/wurk/unique.rb', line 116

def unique_context(job)
  klass = resolve_class(job['class'])
  if klass.respond_to?(:sidekiq_unique_context)
    klass.sidekiq_unique_context(job)
  else
    [job['class'], job['queue'], job['args']]
  end
end