Module: Wurk::Limiter

Defined in:
lib/wurk/limiter.rb,
lib/wurk/limiter/base.rb,
lib/wurk/limiter/leaky.rb,
lib/wurk/limiter/bucket.rb,
lib/wurk/limiter/points.rb,
lib/wurk/limiter/window.rb,
lib/wurk/limiter/unlimited.rb,
lib/wurk/limiter/concurrent.rb,
lib/wurk/limiter/server_middleware.rb

Overview

Sidekiq Enterprise rate limiters: concurrent, bucket, window, leaky, points, unlimited. Lua-backed; all timing inside Lua is from TIME so clock skew across hosts doesn't matter inside one Redis. Spec: docs/target/sidekiq-ent.md §1.

Layout (one file per type under lib/wurk/limiter/):

* `Limiter::Base` owns the metadata write (lmtr:{name}) + the global
`lmtr-list` registration so the Web UI can list every limiter, and
the uniform `status` shape.
* Per-type subclasses (Concurrent / Bucket / Window / Leaky / Points)
own their acquire/wait loop. Each delegates the atomic step to a
Lua script in `lib/wurk/lua/limiter_*.lua`.
* `Unlimited` is a no-op stub for tests and the `unlimited(*)`
constructor — same `within_limit` surface, never raises.
* `ServerMiddleware` catches OverLimit, reschedules, and applies the
poison brake.

Wire-compat: every key uses the lmtr-...: prefix family from §1.7 and the limiter is added to the shared lmtr-list SET.

Examples:

Throttle to 50 concurrent uses, waiting up to the default timeout

STRIPE = Sidekiq::Limiter.concurrent("stripe", 50)

class ChargeJob
  include Sidekiq::Job
  def perform(id)
    STRIPE.within_limit { Stripe::Charge.create(...) }
  end
end

Defined Under Namespace

Classes: Base, Bucket, Concurrent, Config, Leaky, OverLimit, Points, ServerMiddleware, Unlimited, Window

Constant Summary collapse

DEFAULT_TTL =
90 * 24 * 3600
DEFAULT_WAIT_TIMEOUT =
5
DEFAULT_LOCK_TIMEOUT =
30
DEFAULT_RESCHEDULE =
20
DEFAULT_BACKOFF =
lambda do |_limiter, job, _exc|
  overrated = job.is_a?(Hash) ? job.fetch('overrated', 0).to_i : 0
  (300 * overrated) + rand(300) + 1
end
NAME_PATTERN =
/\A[\w\-:.\#@]+\z/
LIST_KEY =
'lmtr-list'
INTERVAL_UNITS =

:second :minute :hour :day symbols → seconds. Window also accepts a raw Integer; bucket does not (boundary semantics require a unit).

{
  second: 1,
  minute: 60,
  hour: 3600,
  day: 86_400
}.freeze
TYPE_CLASSES =

Type string (as stored in the lmtr:{name} meta hash) → subclass. Drives build for dashboard introspection.

{
  'concurrent' => 'Concurrent',
  'bucket' => 'Bucket',
  'window' => 'Window',
  'leaky' => 'Leaky',
  'points' => 'Points'
}.freeze

Class Method Summary collapse

Class Method Details

.bucket(name, count, interval, wait_timeout: DEFAULT_WAIT_TIMEOUT, backoff: nil, ttl: DEFAULT_TTL, reschedule: DEFAULT_RESCHEDULE) ⇒ Object



176
177
178
179
180
181
182
183
184
185
# File 'lib/wurk/limiter.rb', line 176

def bucket(name, count, interval, wait_timeout: DEFAULT_WAIT_TIMEOUT, backoff: nil,
           ttl: DEFAULT_TTL, reschedule: DEFAULT_RESCHEDULE)
  Bucket.new(name,
             count: count,
             interval: interval,
             wait_timeout: wait_timeout,
             backoff: backoff,
             ttl: ttl,
             reschedule: reschedule)
end

.build(name, type, options, register: false) ⇒ Object

Reconstruct a limiter from its persisted metadata for read-only introspection (the dashboard status column). register: false keeps the GET side-effect-free. Returns nil for an unknown type.



222
223
224
225
226
227
228
229
# File 'lib/wurk/limiter.rb', line 222

def build(name, type, options, register: false)
  return Unlimited.new if type.to_s == 'unlimited'

  klass_name = TYPE_CLASSES[type.to_s]
  return nil unless klass_name

  const_get(klass_name).new(name, register: register, **coerce_build_options(options))
end

.concurrent(name, limit, wait_timeout: DEFAULT_WAIT_TIMEOUT, lock_timeout: DEFAULT_LOCK_TIMEOUT, policy: :raise, backoff: nil, ttl: DEFAULT_TTL) ⇒ Object



165
166
167
168
169
170
171
172
173
174
# File 'lib/wurk/limiter.rb', line 165

def concurrent(name, limit, wait_timeout: DEFAULT_WAIT_TIMEOUT, lock_timeout: DEFAULT_LOCK_TIMEOUT,
               policy: :raise, backoff: nil, ttl: DEFAULT_TTL)
  Concurrent.new(name,
                 limit: limit,
                 wait_timeout: wait_timeout,
                 lock_timeout: lock_timeout,
                 policy: policy,
                 backoff: backoff,
                 ttl: ttl)
end

.configObject



134
135
136
# File 'lib/wurk/limiter.rb', line 134

def config
  @config ||= Config.new
end

.configure {|config| ... } ⇒ Object

Yields:



130
131
132
# File 'lib/wurk/limiter.rb', line 130

def configure
  yield config
end

.first_score(row) ⇒ Object

ZRANGE key 0 0 WITHSCORES yields a single [member, score] pair, but the shape depends on the protocol: RESP3 (redis-client's default vs Redis >= 7) nests it as [[member, score]]; RESP2 returns a flat [member, score]. Return the score as a Float across both, or nil when the set is empty. (The old flat-only row[1] silently collapsed to 0.0 under RESP3.)



158
159
160
161
162
163
# File 'lib/wurk/limiter.rb', line 158

def first_score(row)
  pair = row.first
  return nil if pair.nil?

  (pair.is_a?(Array) ? pair.last : row[1]).to_f
end

.interval_seconds(interval, allow_integer:) ⇒ Object



231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
# File 'lib/wurk/limiter.rb', line 231

def interval_seconds(interval, allow_integer:)
  interval = interval.to_sym if interval.is_a?(String) && INTERVAL_UNITS.key?(interval.to_sym)
  case interval
  when Symbol
    INTERVAL_UNITS.fetch(interval) do
      raise ArgumentError, "interval must be one of #{INTERVAL_UNITS.keys.inspect} (got #{interval.inspect})"
    end
  when Integer
    unless allow_integer
      raise ArgumentError, "interval must be a Symbol (got Integer); use #{INTERVAL_UNITS.keys.inspect}"
    end

    interval
  else
    raise ArgumentError, "interval must be Symbol or Integer (got #{interval.class})"
  end
end

.leaky(name, bucket_size, drain, wait_timeout: DEFAULT_WAIT_TIMEOUT, backoff: nil, ttl: DEFAULT_TTL) ⇒ Object



198
199
200
201
202
203
204
205
# File 'lib/wurk/limiter.rb', line 198

def leaky(name, bucket_size, drain, wait_timeout: DEFAULT_WAIT_TIMEOUT, backoff: nil, ttl: DEFAULT_TTL)
  Leaky.new(name,
            bucket_size: bucket_size,
            drain: drain,
            wait_timeout: wait_timeout,
            backoff: backoff,
            ttl: ttl)
end

.points(name, initial_points, refill_per_second, backoff: nil, ttl: DEFAULT_TTL) ⇒ Object



207
208
209
210
211
212
213
# File 'lib/wurk/limiter.rb', line 207

def points(name, initial_points, refill_per_second, backoff: nil, ttl: DEFAULT_TTL)
  Points.new(name,
             initial: initial_points,
             refill: refill_per_second,
             backoff: backoff,
             ttl: ttl)
end

.redisObject

Redis access: caller-supplied pool (Limiter.configure.redis = …) wins, else fall back to the default Wurk pool. This is the same hierarchy Sidekiq Ent documents — dedicated rate-limiter pool is opt-in.



148
149
150
151
# File 'lib/wurk/limiter.rb', line 148

def redis(&)
  pool = config.pool || Wurk.redis_pool
  pool.with(&)
end

.reset_config!Object

Test helper: blow away config + cached pool so a test that mutates config.backoff doesn't leak into the next one. Not part of the public Sidekiq surface.



141
142
143
# File 'lib/wurk/limiter.rb', line 141

def reset_config!
  @config = nil
end

.unlimited(*_args, **_opts) ⇒ Object



215
216
217
# File 'lib/wurk/limiter.rb', line 215

def unlimited(*_args, **_opts)
  Unlimited.new
end

.window(name, count, interval, wait_timeout: DEFAULT_WAIT_TIMEOUT, backoff: nil, ttl: DEFAULT_TTL, reschedule: DEFAULT_RESCHEDULE) ⇒ Object



187
188
189
190
191
192
193
194
195
196
# File 'lib/wurk/limiter.rb', line 187

def window(name, count, interval, wait_timeout: DEFAULT_WAIT_TIMEOUT, backoff: nil,
           ttl: DEFAULT_TTL, reschedule: DEFAULT_RESCHEDULE)
  Window.new(name,
             count: count,
             interval: interval,
             wait_timeout: wait_timeout,
             backoff: backoff,
             ttl: ttl,
             reschedule: reschedule)
end