Class: Chewy::Strategy::DelayedSidekiq::Worker
- Inherits:
-
Object
- Object
- Chewy::Strategy::DelayedSidekiq::Worker
- Includes:
- Sidekiq::Worker
- Defined in:
- lib/chewy/strategy/delayed_sidekiq/worker.rb
Constant Summary collapse
- LUA_SCRIPT =
"local type = ARGV[1]\nlocal score = tonumber(ARGV[2])\nlocal prefix = ARGV[3]\nlocal timechunks_key = prefix .. \":\" .. type .. \":timechunks\"\n\n-- Get timechunk_keys with scores less than or equal to the specified score\nlocal timechunk_keys = redis.call('zrangebyscore', timechunks_key, '-inf', score)\n\n-- Get all members from the sets associated with the timechunk_keys\nlocal members = {}\nfor _, timechunk_key in ipairs(timechunk_keys) do\n local set_members = redis.call('smembers', timechunk_key)\n for _, member in ipairs(set_members) do\n table.insert(members, member)\n end\nend\n\n-- Remove timechunk_keys and their associated sets\nfor _, timechunk_key in ipairs(timechunk_keys) do\n redis.call('del', timechunk_key)\nend\n\n-- Remove timechunks with scores less than or equal to the specified score\nredis.call('zremrangebyscore', timechunks_key, '-inf', score)\n\nreturn members\n"
Instance Method Summary collapse
Instance Method Details
#perform(type, score, options = {}) ⇒ Object
38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 |
# File 'lib/chewy/strategy/delayed_sidekiq/worker.rb', line 38 def perform(type, score, = {}) [:refresh] = !Chewy.disable_refresh_async if Chewy.disable_refresh_async ::Sidekiq.redis do |redis| members = redis.eval(LUA_SCRIPT, keys: [], argv: [type, score, Scheduler::KEY_PREFIX]) # extract ids and fields & do the reset of records ids, fields = extract_ids_and_fields(members) [:update_fields] = fields if fields index = type.constantize index.strategy_config.delayed_sidekiq.reindex_wrapper.call do .any? ? index.import!(ids, **) : index.import!(ids) end end end |