Module: Redis::Commands::Streams

Included in:
Redis::Commands
Defined in:
lib/redis/commands/streams.rb

Instance Method Details

#xack(key, group, *ids) ⇒ Integer

Removes one or multiple entries from the pending entries list of a stream consumer group.

Examples:

With an entry id

redis.xack('mystream', 'mygroup', '1526569495631-0')

With splatted entry ids

redis.xack('mystream', 'mygroup', '0-1', '0-2')

With arrayed entry ids

redis.xack('mystream', 'mygroup', %w[0-1 0-2])

Parameters:

  • key (String)

    the stream key

  • group (String)

    the consumer group name

  • ids (Array<String>)

    one or multiple entry ids

Returns:

  • (Integer)

    the number of entries successfully acknowledged


# File 'lib/redis/commands/streams.rb', line 251

def xack(key, group, *ids)
  args = [:xack, key, group].concat(ids.flatten)
  send_command(args)
end
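
Because xack flattens its splatted ids, the splatted and arrayed call forms shown in the examples send the same command. The following is a minimal sketch of that argument handling only. build_xack_args is a hypothetical helper that mirrors the array construction in the source above and never contacts a server.

```ruby
# Hypothetical helper (not part of the library): mirrors how xack
# assembles its command array before handing it to send_command.
def build_xack_args(key, group, *ids)
  [:xack, key, group].concat(ids.flatten)
end

splatted = build_xack_args('mystream', 'mygroup', '0-1', '0-2')
arrayed  = build_xack_args('mystream', 'mygroup', %w[0-1 0-2])

# Both call styles produce the same flat command array.
p splatted
p arrayed
```

The same flattening applies to the ids of xdel and xclaim, whose sources use ids.flatten in the same way.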

#xadd(key, entry, approximate: nil, maxlen: nil, id: '*') ⇒ String

Adds a new entry to the stream.

Examples:

Without options

redis.xadd('mystream', f1: 'v1', f2: 'v2')

With options

redis.xadd('mystream', { f1: 'v1', f2: 'v2' }, id: '0-1', maxlen: 1000, approximate: true)

Parameters:

  • key (String)

    the stream key

  • entry (Hash)

    one or multiple field-value pairs

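  • approximate (Boolean) (defaults to: nil)

    whether to add the ~ modifier to maxlen; only used when maxlen is given

  • maxlen (Integer) (defaults to: nil)

    max length of entries

  • id (String) (defaults to: '*')

    the entry id, default value is *, which auto-generates the id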

Returns:

  • (String)

    the entry id


# File 'lib/redis/commands/streams.rb', line 51

def xadd(key, entry, approximate: nil, maxlen: nil, id: '*')
  args = [:xadd, key]
  if maxlen
    args << "MAXLEN"
    args << "~" if approximate
    args << maxlen
  end
  args << id
  args.concat(entry.to_a.flatten)
  send_command(args)
end
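
The source above only emits the ~ modifier inside the maxlen branch, so approximate has no effect on its own. The following is a minimal sketch of the resulting command arrays. build_xadd_args is a hypothetical helper that copies the array construction from the source and does not talk to a server.

```ruby
# Hypothetical helper (not part of the library): mirrors how xadd
# assembles its command array.
def build_xadd_args(key, entry, approximate: nil, maxlen: nil, id: '*')
  args = [:xadd, key]
  if maxlen
    args << 'MAXLEN'
    args << '~' if approximate
    args << maxlen
  end
  args << id
  args.concat(entry.to_a.flatten)
end

# Auto-generated id, no trimming.
p build_xadd_args('mystream', { f1: 'v1' })

# Approximate trimming to roughly 1000 entries.
p build_xadd_args('mystream', { f1: 'v1' }, maxlen: 1000, approximate: true)

# approximate without maxlen builds the same array as the first call.
p build_xadd_args('mystream', { f1: 'v1' }, approximate: true)
```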

#xautoclaim(key, group, consumer, min_idle_time, start, count: nil, justid: false) ⇒ Hash{String => Hash}, Array<String>

Transfers ownership of pending stream entries that match the specified criteria.

Examples:

Claim pending messages idle for more than 1 hour and mark as retry

redis.xautoclaim('mystream', 'mygroup', 'consumer1', 3600000, '0-0')

Claim up to 50 pending messages idle for more than 1 hour and mark as retry

redis.xautoclaim('mystream', 'mygroup', 'consumer1', 3600000, '0-0', count: 50)

Claim pending messages idle for more than 1 hour without marking as retry

redis.xautoclaim('mystream', 'mygroup', 'consumer1', 3600000, '0-0', justid: true)

Claim pending messages starting from this id idle for more than 1 hour and mark as retry

redis.xautoclaim('mystream', 'mygroup', 'consumer1', 3600000, '1641321233-0')

Parameters:

  • key (String)

    the stream key

  • group (String)

    the consumer group name

  • consumer (String)

    the consumer name

  • min_idle_time (Integer)

    the minimum idle time of entries to claim, in milliseconds

  • start (String)

    entry id to start scanning from or 0-0 for everything

  • count (Integer) (defaults to: nil)

    maximum number of messages to claim; when omitted, COUNT is not sent and the server default applies (100)

  • justid (Boolean) (defaults to: false)

    whether to fetch just an array of entry ids or not. Does not increment retry count when true

Returns:

  • (Hash{String => Hash})

    the entries successfully claimed

  • (Array<String>)

    the entry ids successfully claimed if justid option is true


# File 'lib/redis/commands/streams.rb', line 321

def xautoclaim(key, group, consumer, min_idle_time, start, count: nil, justid: false)
  args = [:xautoclaim, key, group, consumer, min_idle_time, start]
  if count
    args << 'COUNT' << count.to_s
  end
  args << 'JUSTID' if justid
  blk = justid ? HashifyStreamAutoclaimJustId : HashifyStreamAutoclaim
  send_command(args, &blk)
end
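
min_idle_time is expressed in milliseconds, so 3600000 corresponds to one hour and five minutes would be 300000. The following is a small sketch for writing such thresholds readably. The constant and helper names are hypothetical and not part of the library.

```ruby
# Hypothetical helpers for expressing min_idle_time in readable units.
MS_PER_SECOND = 1_000
MS_PER_MINUTE = 60 * MS_PER_SECOND

# Converts a duration in minutes to whole milliseconds.
def minutes_to_ms(minutes)
  (minutes * MS_PER_MINUTE).to_i
end

puts minutes_to_ms(5)   # five minutes in milliseconds
puts minutes_to_ms(60)  # one hour in milliseconds
```

A call could then read redis.xautoclaim('mystream', 'mygroup', 'consumer1', minutes_to_ms(5), '0-0'), assuming a connected client named redis.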

#xclaim(key, group, consumer, min_idle_time, *ids, **opts) ⇒ Hash{String => Hash}, Array<String>

Changes the ownership of one or multiple pending entries.

Examples:

With splatted entry ids

redis.xclaim('mystream', 'mygroup', 'consumer1', 3600000, '0-1', '0-2')

With arrayed entry ids

redis.xclaim('mystream', 'mygroup', 'consumer1', 3600000, %w[0-1 0-2])

With idle option

redis.xclaim('mystream', 'mygroup', 'consumer1', 3600000, %w[0-1 0-2], idle: 1000)

With time option

redis.xclaim('mystream', 'mygroup', 'consumer1', 3600000, %w[0-1 0-2], time: 1542866959000)

With retrycount option

redis.xclaim('mystream', 'mygroup', 'consumer1', 3600000, %w[0-1 0-2], retrycount: 10)

With force option

redis.xclaim('mystream', 'mygroup', 'consumer1', 3600000, %w[0-1 0-2], force: true)

With justid option

redis.xclaim('mystream', 'mygroup', 'consumer1', 3600000, %w[0-1 0-2], justid: true)

Parameters:

  • key (String)

    the stream key

  • group (String)

    the consumer group name

  • consumer (String)

    the consumer name

  • min_idle_time (Integer)

    the minimum idle time of entries to claim, in milliseconds

  • ids (Array<String>)

    one or multiple entry ids

  • opts (Hash)

    several options for XCLAIM command

Options Hash (**opts):

  • :idle (Integer)

    the idle time to set on the entry, in milliseconds since its last delivery

  • :time (Integer)

    the last delivery time to set on the entry, as a Unix epoch time in milliseconds

  • :retrycount (Integer)

    the value to set the retry counter to

  • :force (Boolean)

    whether to create the pending entry in the pending entries list if it is not already there

  • :justid (Boolean)

    whether to fetch just an array of entry ids or not

Returns:

  • (Hash{String => Hash})

    the entries successfully claimed

  • (Array<String>)

    the entry ids successfully claimed if justid option is true


# File 'lib/redis/commands/streams.rb', line 288

def xclaim(key, group, consumer, min_idle_time, *ids, **opts)
  args = [:xclaim, key, group, consumer, min_idle_time].concat(ids.flatten)
  args.concat(['IDLE',       opts[:idle].to_i])  if opts[:idle]
  args.concat(['TIME',       opts[:time].to_i])  if opts[:time]
  args.concat(['RETRYCOUNT', opts[:retrycount]]) if opts[:retrycount]
  args << 'FORCE'                                if opts[:force]
  args << 'JUSTID'                               if opts[:justid]
  blk = opts[:justid] ? Noop : HashifyStreamEntries
  send_command(args, &blk)
end

#xdel(key, *ids) ⇒ Integer

Delete entries by entry ids.

Examples:

With splatted entry ids

redis.xdel('mystream', '0-1', '0-2')

With arrayed entry ids

redis.xdel('mystream', ['0-1', '0-2'])

Parameters:

  • key (String)

    the stream key

  • ids (Array<String>)

    one or multiple entry ids

Returns:

  • (Integer)

    the number of entries actually deleted


# File 'lib/redis/commands/streams.rb', line 91

def xdel(key, *ids)
  args = [:xdel, key].concat(ids.flatten)
  send_command(args)
end

#xgroup(subcommand, key, group, id_or_consumer = nil, mkstream: false) ⇒ String, Integer

Manages the consumer group of the stream.

Examples:

With create subcommand

redis.xgroup(:create, 'mystream', 'mygroup', '$')

With setid subcommand

redis.xgroup(:setid, 'mystream', 'mygroup', '$')

With destroy subcommand

redis.xgroup(:destroy, 'mystream', 'mygroup')

With delconsumer subcommand

redis.xgroup(:delconsumer, 'mystream', 'mygroup', 'consumer1')

Parameters:

  • subcommand (String)

    one of create, setid, destroy, or delconsumer

  • key (String)

    the stream key

  • group (String)

    the consumer group name

  • id_or_consumer (String) (defaults to: nil)
    • the entry id or $, required if subcommand is create or setid
    • the consumer name, required if subcommand is delconsumer
  • mkstream (Boolean) (defaults to: false)

    whether to create an empty stream automatically or not

Returns:

  • (String)

    OK if subcommand is create or setid

  • (Integer)

    the affected count if subcommand is destroy (groups destroyed) or delconsumer (pending entries the consumer had)


# File 'lib/redis/commands/streams.rb', line 199

def xgroup(subcommand, key, group, id_or_consumer = nil, mkstream: false)
  args = [:xgroup, subcommand, key, group, id_or_consumer, (mkstream ? 'MKSTREAM' : nil)].compact
  send_command(args)
end
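
A common need with the create subcommand is to set up a group without failing when it already exists. The following is a sketch under stated assumptions: ensure_group is a hypothetical helper, redis is any object responding to xgroup as documented above, and the server is assumed to report an existing group with an error message starting with BUSYGROUP.

```ruby
# Hypothetical helper (not part of the library): creates the consumer
# group, and the stream if needed, unless the group already exists.
# Returns true when this call created the group, false otherwise.
def ensure_group(redis, key, group, start_id: '$')
  redis.xgroup(:create, key, group, start_id, mkstream: true)
  true
rescue StandardError => e
  # Assumption: an existing group is reported as a BUSYGROUP error.
  # Any other failure is re-raised unchanged.
  raise unless e.message.start_with?('BUSYGROUP')

  false
end
```

Rescuing StandardError and checking the message keeps the sketch free of library-specific error classes. Code that already depends on the client library may prefer to rescue its command error class instead.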

#xinfo(subcommand, key, group = nil) ⇒ Hash, Array<Hash>

Returns information about the stream for the given subcommand.

Examples:

stream

redis.xinfo(:stream, 'mystream')

groups

redis.xinfo(:groups, 'mystream')

consumers

redis.xinfo(:consumers, 'mystream', 'mygroup')

Parameters:

  • subcommand (String)

    e.g. stream groups consumers

  • key (String)

    the stream key

  • group (String) (defaults to: nil)

    the consumer group name, required if subcommand is consumers

Returns:

  • (Hash)

    information of the stream if subcommand is stream

  • (Array<Hash>)

    information of the consumer groups if subcommand is groups

  • (Array<Hash>)

    information of the consumers if subcommand is consumers


# File 'lib/redis/commands/streams.rb', line 22

def xinfo(subcommand, key, group = nil)
  args = [:xinfo, subcommand, key, group].compact
  synchronize do |client|
    client.call(args) do |reply|
      case subcommand.to_s.downcase
      when 'stream'              then Hashify.call(reply)
      when 'groups', 'consumers' then reply.map { |arr| Hashify.call(arr) }
      else reply
      end
    end
  end
end
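
The source above passes replies through Hashify, whose definition is not shown on this page. The following is a sketch of the likely idea, assuming the server reply is a flat array of alternating field names and values. hashify_pairs is a hypothetical stand-in, and the sample replies are illustrative rather than real server output.

```ruby
# Hypothetical stand-in for Hashify: pairs up a flat
# [field, value, field, value, ...] array into a Hash.
def hashify_pairs(flat_reply)
  flat_reply.each_slice(2).to_h
end

# Illustrative reply shapes only; real replies carry more fields.
stream_reply = ['length', 2, 'groups', 1]
groups_reply = [['name', 'mygroup', 'consumers', 1]]

# 'stream' yields one Hash; 'groups' and 'consumers' yield one Hash per element.
p hashify_pairs(stream_reply)
p groups_reply.map { |arr| hashify_pairs(arr) }
```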

#xlen(key) ⇒ Integer

Returns the number of entries inside a stream.

Examples:

With key

redis.xlen('mystream')

Parameters:

  • key (String)

    the stream key

Returns:

  • (Integer)

    the number of entries


150
151
152
# File 'lib/redis/commands/streams.rb', line 150

def xlen(key)
  send_command([:xlen, key])
end

#xpending(key, group, *args) ⇒ Hash, Array<Hash>

Fetches pending entries that have not been acknowledged yet.

Examples:

With key and group

redis.xpending('mystream', 'mygroup')

With range options

redis.xpending('mystream', 'mygroup', '-', '+', 10)

With range and consumer options

redis.xpending('mystream', 'mygroup', '-', '+', 10, 'consumer1')

Parameters:

  • key (String)

    the stream key

  • group (String)

    the consumer group name

  • start (String)

    first entry id of range

  • end (String)

    last entry id of range

  • count (Integer)

    the maximum number of entries to return

  • consumer (String)

    the consumer name

Returns:

  • (Hash)

    the summary of pending entries

  • (Array<Hash>)

    the pending entries details if options were specified


# File 'lib/redis/commands/streams.rb', line 349

def xpending(key, group, *args)
  command_args = [:xpending, key, group]
  case args.size
  when 0, 3, 4
    command_args.concat(args)
  else
    raise ArgumentError, "wrong number of arguments (given #{args.size + 2}, expected 2, 5 or 6)"
  end

  summary_needed = args.empty?
  blk = summary_needed ? HashifyStreamPendings : HashifyStreamPendingDetails
  send_command(command_args, &blk)
end
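
The case statement in the source accepts either no extra arguments (summary form) or exactly three or four (start, end, count, and optionally consumer). The following is a minimal sketch of that validation. build_xpending_args is a hypothetical helper that mirrors the source and does not contact a server.

```ruby
# Hypothetical helper (not part of the library): mirrors the argument
# count check that xpending performs before sending the command.
def build_xpending_args(key, group, *args)
  unless [0, 3, 4].include?(args.size)
    raise ArgumentError, "wrong number of arguments (given #{args.size + 2}, expected 2, 5 or 6)"
  end

  [:xpending, key, group].concat(args)
end

# Summary form and detailed form are both accepted.
p build_xpending_args('mystream', 'mygroup')
p build_xpending_args('mystream', 'mygroup', '-', '+', 10)

# A range without a count is rejected before anything is sent.
begin
  build_xpending_args('mystream', 'mygroup', '-', '+')
rescue ArgumentError => e
  puts e.message
end
```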

#xrange(key, start = '-', range_end = '+', count: nil) ⇒ Array<Array<String, Hash>>

Fetches entries of the stream in ascending order.

Examples:

Without options

redis.xrange('mystream')

With a specific start

redis.xrange('mystream', '0-1')

With a specific start and end

redis.xrange('mystream', '0-1', '0-3')

With count options

redis.xrange('mystream', count: 10)

Parameters:

  • key (String)

    the stream key

  • start (String) (defaults to: '-')

    first entry id of range, default value is -

  • range_end (String) (defaults to: '+')

    last entry id of range, default value is +

  • count (Integer) (defaults to: nil)

    the maximum number of entries to return

Returns:

  • (Array<Array<String, Hash>>)

    pairs of entry id and field-value hash


# File 'lib/redis/commands/streams.rb', line 113

def xrange(key, start = '-', range_end = '+', count: nil)
  args = [:xrange, key, start, range_end]
  args.concat(['COUNT', count]) if count
  synchronize { |client| client.call(args, &HashifyStreamEntries) }
end
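
Combining start with count makes it possible to walk a long stream in batches. The following is a sketch under stated assumptions: each_stream_entry is a hypothetical helper, redis is any object responding to xrange as documented above, and the server is assumed to accept the "(" prefix for an exclusive start id (available in Redis 6.2 and later).

```ruby
# Hypothetical helper (not part of the library): yields every
# [id, fields] pair of a stream, fetching batch_size entries per call.
def each_stream_entry(redis, key, batch_size: 100)
  start = '-'
  loop do
    batch = redis.xrange(key, start, '+', count: batch_size)
    break if batch.empty?

    batch.each { |id, fields| yield id, fields }
    # Assumption: "(" marks an exclusive start, so the last id seen
    # is not returned again by the next call.
    start = "(#{batch.last.first}"
  end
end

# Usage, assuming a connected client named redis:
# each_stream_entry(redis, 'mystream', batch_size: 500) do |id, fields|
#   puts "#{id}: #{fields.inspect}"
# end
```

On servers without exclusive ranges, the next start would have to be computed by incrementing the sequence part of the last id instead.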

#xread(keys, ids, count: nil, block: nil) ⇒ Hash{String => Hash{String => Hash}}

Fetches entries from one or multiple streams. Optionally blocking.

Examples:

With a key

redis.xread('mystream', '0-0')

With multiple keys

redis.xread(%w[mystream1 mystream2], %w[0-0 0-0])

With count option

redis.xread('mystream', '0-0', count: 2)

With block option

redis.xread('mystream', '$', block: 1000)

Parameters:

  • keys (Array<String>)

    one or multiple stream keys

  • ids (Array<String>)

    one or multiple entry ids

  • count (Integer) (defaults to: nil)

    the maximum number of entries to return per stream

  • block (Integer) (defaults to: nil)

    the blocking timeout in milliseconds

Returns:

  • (Hash{String => Hash{String => Hash}})

    the entries


# File 'lib/redis/commands/streams.rb', line 171

def xread(keys, ids, count: nil, block: nil)
  args = [:xread]
  args << 'COUNT' << count if count
  args << 'BLOCK' << block.to_i if block
  _xread(args, keys, ids, block)
end

#xreadgroup(group, consumer, keys, ids, count: nil, block: nil, noack: nil) ⇒ Hash{String => Hash{String => Hash}}

Fetches a subset of the entries from one or multiple streams on behalf of a consumer in a consumer group. Optionally blocking.

Examples:

With a key

redis.xreadgroup('mygroup', 'consumer1', 'mystream', '>')

With multiple keys

redis.xreadgroup('mygroup', 'consumer1', %w[mystream1 mystream2], %w[> >])

With count option

redis.xreadgroup('mygroup', 'consumer1', 'mystream', '>', count: 2)

With block option

redis.xreadgroup('mygroup', 'consumer1', 'mystream', '>', block: 1000)

With noack option

redis.xreadgroup('mygroup', 'consumer1', 'mystream', '>', noack: true)

Parameters:

  • group (String)

    the consumer group name

  • consumer (String)

    the consumer name

  • keys (Array<String>)

    one or multiple stream keys

  • ids (Array<String>)

    one or multiple entry ids

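  • count (Integer) (defaults to: nil)

    the maximum number of entries to return per stream

  • block (Integer) (defaults to: nil)

    the blocking timeout in milliseconds

  • noack (Boolean) (defaults to: nil)

    whether to skip adding the delivered entries to the pending entries list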

Returns:

  • (Hash{String => Hash{String => Hash}})

    the entries


# File 'lib/redis/commands/streams.rb', line 229

def xreadgroup(group, consumer, keys, ids, count: nil, block: nil, noack: nil)
  args = [:xreadgroup, 'GROUP', group, consumer]
  args << 'COUNT' << count if count
  args << 'BLOCK' << block.to_i if block
  args << 'NOACK' if noack
  _xread(args, keys, ids, block)
end

#xrevrange(key, range_end = '+', start = '-', count: nil) ⇒ Array<Array<String, Hash>>

Fetches entries of the stream in descending order.

Examples:

Without options

redis.xrevrange('mystream')

With a specific end

redis.xrevrange('mystream', '0-3')

With a specific end and start

redis.xrevrange('mystream', '0-3', '0-1')

With count options

redis.xrevrange('mystream', count: 10)

Parameters:

  • key (String)

    the stream key

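  • range_end (String) (defaults to: '+')

    first entry id of range, default value is +

  • start (String) (defaults to: '-')

    last entry id of range, default value is -

  • count (Integer) (defaults to: nil)

    the maximum number of entries to return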

Returns:

  • (Array<Array<String, Hash>>)

    pairs of entry id and field-value hash


# File 'lib/redis/commands/streams.rb', line 136

def xrevrange(key, range_end = '+', start = '-', count: nil)
  args = [:xrevrange, key, range_end, start]
  args.concat(['COUNT', count]) if count
  send_command(args, &HashifyStreamEntries)
end

#xtrim(key, maxlen, approximate: false) ⇒ Integer

Trims older entries of the stream if needed.

Examples:

Without options

redis.xtrim('mystream', 1000)

With options

redis.xtrim('mystream', 1000, approximate: true)

Parameters:

  • key (String)

    the stream key

  • maxlen (Integer)

    max length of entries

  • approximate (Boolean) (defaults to: false)

    whether to add the ~ modifier to maxlen, which lets the server trim approximately

Returns:

  • (Integer)

    the number of entries actually deleted


# File 'lib/redis/commands/streams.rb', line 75

def xtrim(key, maxlen, approximate: false)
  args = [:xtrim, key, 'MAXLEN', (approximate ? '~' : nil), maxlen].compact
  send_command(args)
end