Class: Riak::Client

Inherits:
Object
Includes:
Util::Escape, Util::Translation
Defined in:
lib/riak/client.rb,
lib/riak/client/node.rb,
lib/riak/client/search.rb,
lib/riak/client/yokozuna.rb,
lib/riak/client/decaying.rb,
lib/riak/client/http_backend.rb,
lib/riak/client/excon_backend.rb,
lib/riak/client/net_http_backend.rb,
lib/riak/client/feature_detection.rb,
lib/riak/client/beefcake/messages.rb,
lib/riak/client/protobuffs_backend.rb,
lib/riak/client/beefcake/message_codes.rb,
lib/riak/client/beefcake/object_methods.rb,
lib/riak/client/beefcake/message_overlay.rb,
lib/riak/client/http_backend/key_streamer.rb,
lib/riak/client/http_backend/configuration.rb,
lib/riak/client/beefcake_protobuffs_backend.rb,
lib/riak/client/http_backend/object_methods.rb,
lib/riak/client/http_backend/bucket_streamer.rb,
lib/riak/client/http_backend/request_headers.rb,
lib/riak/client/http_backend/transport_methods.rb,
lib/riak/client/http_backend/chunked_json_streamer.rb

Overview

A client connection to Riak.

Defined Under Namespace

Modules: BeefcakeMessageCodes, FeatureDetection

Classes: BeefcakeProtobuffsBackend, Decaying, ExconBackend, HTTPBackend, NetHTTPBackend, Node, ProtobuffsBackend

Constant Summary

MAX_CLIENT_ID =

When using integer client IDs, the exclusive upper-bound of valid values.

4294967296
PROTOCOLS =

Array of valid protocols

%w[http https pbc]
HOST_REGEX =

Regexp for validating hostnames, lifted from uri.rb in Ruby 1.8.6

/^(?:(?:(?:[a-zA-Z\d](?:[-a-zA-Z\d]*[a-zA-Z\d])?)\.)*(?:[a-zA-Z](?:[-a-zA-Z\d]*[a-zA-Z\d])?)\.?|\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}|\[(?:(?:[a-fA-F\d]{1,4}:)*(?:[a-fA-F\d]{1,4}|\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})|(?:(?:[a-fA-F\d]{1,4}:)*[a-fA-F\d]{1,4})?::(?:(?:[a-fA-F\d]{1,4}:)*(?:[a-fA-F\d]{1,4}|\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}))?)\])$/n
VALID_OPTIONS =

Valid constructor options.

[:protocol, :nodes, :client_id, :http_backend, :protobuffs_backend] | Node::VALID_OPTIONS
NETWORK_ERRORS =

Network errors.

[
  EOFError,
  Errno::ECONNABORTED,
  Errno::ECONNREFUSED,
  Errno::ECONNRESET,
  Errno::ENETDOWN,
  Errno::ENETRESET,
  Errno::ENETUNREACH,
  SocketError,
  SystemCallError,
]
Pool =
::Innertube::Pool
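
As a rough illustration of how the first two constants above might be used, the sketch below restates their values locally instead of loading the library. Both helper methods are hypothetical, and the lower bound of 0 for integer client IDs is an assumption that the constant summary does not state.

```ruby
# Local copies of the documented values; the real constants live in Riak::Client.
MAX_CLIENT_ID = 4294967296
PROTOCOLS = %w[http https pbc]

# Hypothetical helper: true when an integer client ID lies in the valid range.
# MAX_CLIENT_ID is an exclusive upper bound; the lower bound of 0 is assumed.
def valid_integer_client_id?(id)
  id.is_a?(Integer) && id >= 0 && id < MAX_CLIENT_ID
end

# Hypothetical helper: true when the protocol name is listed in PROTOCOLS.
def valid_protocol?(name)
  PROTOCOLS.include?(name.to_s)
end
```

Since 4294967296 equals 2**32, every valid integer client ID fits in an unsigned 32-bit value.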

Instance Attribute Summary

Instance Method Summary

Methods included from Util::Escape

#escape, #maybe_escape, #maybe_unescape, #unescape

Methods included from Util::Translation

#i18n_scope, #t

Constructor Details

- (Client) initialize(options = {})

Creates a client connection to Riak

Parameters:

  • options (Hash) (defaults to: {})

    configuration options for the client

Options Hash (options):

  • :nodes (Array)

    A list of nodes this client connects to. Each element of the list is a hash which is passed to Node.new, e.g. {host: '127.0.0.1', pb_port: 1234, …}. If no nodes are given, a single node is constructed from the remaining options given to Client.new.

  • :host (String) — default: '127.0.0.1'

    The host or IP address for the Riak endpoint

  • :protocol (String) — default: 'http'

    The protocol to use for connecting to a node backend

  • :http_port (Fixnum) — default: 8098

    The port of the Riak HTTP endpoint

  • :pb_port (Fixnum) — default: 8087

    The port of the Riak Protocol Buffers endpoint

  • :prefix (String) — default: '/riak/'

    The URL path prefix to the main HTTP endpoint

  • :mapred (String) — default: '/mapred'

    The path to the map-reduce HTTP endpoint

  • :client_id (Fixnum, String) — default: rand(MAX_CLIENT_ID)

    The internal client ID used by Riak to route responses

  • :http_backend (String, Symbol) — default: :NetHTTP

    which HTTP backend to use

  • :protobuffs_backend (String, Symbol) — default: :Beefcake

    which Protocol Buffers backend to use

  • :ssl (Boolean, Hash) — default: nil

    The SSL options to pass to each node or true for default options

Raises:

  • (ArgumentError)

    raised if any invalid options are given



# File 'lib/riak/client.rb', line 98

def initialize(options={})
  if options.include? :port
    warn(t('deprecated.port', :backtrace => caller[0..2].join("\n    ")))
  end

  unless (evil = options.keys - VALID_OPTIONS).empty?
    raise ArgumentError, "#{evil.inspect} are not valid options for Client.new"
  end

  @nodes = (options[:nodes] || []).map do |n|
    Client::Node.new self, n
  end
  if @nodes.empty? or options[:host] or options[:http_port] or options[:pb_port]
    @nodes |= [Client::Node.new(self, options)]
  end

  @protobuffs_pool = Pool.new(
                              method(:new_protobuffs_backend),
                              lambda { |b| b.teardown }
                              )

  @http_pool = Pool.new(
                        method(:new_http_backend),
                        lambda { |b| b.teardown }
                        )

  self.protocol           = options[:protocol]           || "http"
  self.http_backend       = options[:http_backend]       || :NetHTTP
  self.protobuffs_backend = options[:protobuffs_backend] || :Beefcake
  self.client_id          = options[:client_id]          if options[:client_id]
  self.ssl                = options[:ssl]                if options[:ssl]
  self.multiget_threads   = options[:multiget_threads]
end
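
The two decisions at the top of the constructor (rejecting unknown keys and deciding which nodes to build) can be sketched in isolation. This is a simplified stand-in, not the library's code: ALLOWED_OPTIONS is a shortened hypothetical list, and nodes are represented as plain hashes instead of Client::Node objects.

```ruby
# Shortened stand-in list; the real list is Riak::Client::VALID_OPTIONS.
ALLOWED_OPTIONS = [:protocol, :nodes, :client_id, :host, :http_port, :pb_port]

# Hypothetical helper mirroring the constructor's check: any key outside
# the allowed list raises ArgumentError.
def check_options!(options)
  unknown = options.keys - ALLOWED_OPTIONS
  unless unknown.empty?
    raise ArgumentError, "#{unknown.inspect} are not valid options"
  end
  options
end

# Hypothetical helper: explicit :nodes entries are used as given, and one
# more node is built from the top-level options when the list is empty or
# when :host, :http_port or :pb_port is present.
def node_configs(options)
  configs = (options[:nodes] || []).dup
  if configs.empty? || options[:host] || options[:http_port] || options[:pb_port]
    configs |= [options.reject { |key, _| key == :nodes }]
  end
  configs
end
```

With no options at all, node_configs({}) yields a single empty configuration, which corresponds to one node built entirely from defaults.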

Instance Attribute Details

- (String) client_id

Returns the internal client ID used by Riak to route responses.

Returns:

  • (String)

    The internal client ID used by Riak to route responses



# File 'lib/riak/client.rb', line 63

def client_id
  @client_id
end

- (Symbol) http_backend

Returns the HTTP backend/client to use.

Returns:

  • (Symbol)

    The HTTP backend/client to use



# File 'lib/riak/client.rb', line 66

def http_backend
  @http_backend
end

- (Client::Pool) http_pool (readonly)

Returns a pool of HTTP connections.

Returns:

  • (Client::Pool)

    A pool of HTTP connections

# File 'lib/riak/client.rb', line 69

def http_pool
  @http_pool
end

- (Integer) multiget_threads

Returns the number of threads for multiget requests.

Returns:

  • (Integer)

    The number of threads for multiget requests



# File 'lib/riak/client.rb', line 78

def multiget_threads
  @multiget_threads
end

- (Array) nodes

Returns the set of Nodes this client can communicate with.

Returns:

  • (Array)

    The set of Nodes this client can communicate with.



# File 'lib/riak/client.rb', line 60

def nodes
  @nodes
end

- (Symbol) protobuffs_backend

Returns the Protocol Buffers backend/client to use.

Returns:

  • (Symbol)

    The Protocol Buffers backend/client to use



# File 'lib/riak/client.rb', line 72

def protobuffs_backend
  @protobuffs_backend
end

- (Client::Pool) protobuffs_pool (readonly)

Returns a pool of protobuffs connections.

Returns:

  • (Client::Pool)

    A pool of protobuffs connections

# File 'lib/riak/client.rb', line 75

def protobuffs_pool
  @protobuffs_pool
end

- (String) protocol

Returns the protocol to use for the Riak endpoint.

Returns:

  • (String)

    The protocol to use for the Riak endpoint



# File 'lib/riak/client.rb', line 57

def protocol
  @protocol
end

Instance Method Details

- (Object) backend {|HTTPBackend, ProtobuffsBackend| ... }

Yields a backend for operations that are protocol-independent. You can change which type of backend is used by setting the #protocol.

Yields:

  • (HTTPBackend, ProtobuffsBackend)

    the backend selected by the current protocol

# File 'lib/riak/client.rb', line 136

def backend(&block)
  case @protocol.to_s
  when /https?/i
    http &block
  when /pbc/i
    protobuffs &block
  end
end
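
The dispatch in the listing above relies on unanchored, case-insensitive regular expressions. A minimal sketch of the same matching, using a hypothetical helper that returns a symbol instead of yielding a backend:

```ruby
# Hypothetical helper: names the backend family a protocol value selects,
# using the same patterns as the case statement in Client#backend.
def backend_family(protocol)
  case protocol.to_s
  when /https?/i then :http
  when /pbc/i    then :protobuffs
  end
end
```

A value that matches neither pattern falls through the case statement, so the helper returns nil, just as #backend yields nothing in that situation.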

- (Object) basic_auth=(auth)

Sets basic HTTP auth on all nodes.



# File 'lib/riak/client.rb', line 146

def basic_auth=(auth)
  @nodes.each do |node|
    node.basic_auth = auth
  end
  auth
end

- (Bucket) bucket(name, options = {}) Also known as: []

Retrieves a bucket from Riak.

Parameters:

  • name (String)

    the name of the bucket to retrieve

  • options (Hash) (defaults to: {})

    options for retrieving the bucket

Options Hash (options):

  • :props (Boolean) — default: false

    whether to retrieve the bucket properties

Returns:

  • (Bucket)

    the requested bucket

Raises:

  • (ArgumentError)


# File 'lib/riak/client.rb', line 158

def bucket(name, options={})
  raise ArgumentError, t('zero_length_bucket') if name == ''
  unless (options.keys - [:props]).empty?
    raise ArgumentError, "invalid options"
  end
  @bucket_cache ||= {}
  (@bucket_cache[name] ||= Bucket.new(self, name)).tap do |b|
    b.props if options[:props]
  end
end
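
The listing above memoizes buckets per name, so repeated calls with the same name return the same object. A self-contained sketch of that caching pattern follows; FakeBucket and BucketCache are hypothetical stand-ins and are not part of the library.

```ruby
# Stand-in for Riak::Bucket, used only to keep the sketch self-contained.
FakeBucket = Struct.new(:name)

class BucketCache
  def initialize
    @cache = {}
  end

  # Returns the cached bucket for the given name, creating it on first use.
  def fetch(name)
    raise ArgumentError, 'bucket name must not be empty' if name == ''
    @cache[name] ||= FakeBucket.new(name)
  end
end
```

Because the cache is keyed by name, two lookups of 'users' return the identical object, while a different name produces a separate one.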

- (Array<Bucket>) buckets(options = {}, &block) Also known as: list_buckets

Note:

This is an expensive operation and should be used only in development.

Lists buckets which have keys stored in them.

Returns:

  • (Array<Bucket>)

    the buckets that have keys stored in them

# File 'lib/riak/client.rb', line 174

def buckets(options={}, &block)
  warn(t('list_buckets', :backtrace => caller.join("\n    "))) unless Riak.disable_list_keys_warnings

  return ListBuckets.new self, options, block if block_given?

  backend do |b|
    b.list_buckets(options).map {|name| Bucket.new(self, name) }
  end
end

- (Object) choose_node(nodes = self.nodes)

Choose a node from a set.



# File 'lib/riak/client.rb', line 186

def choose_node(nodes = self.nodes)
  # Prefer nodes which have gone a reasonable time without errors.
  s = nodes.select do |node|
    node.error_rate.value < 0.1
  end

  if s.empty?
    # Fall back to minimally broken node.
    nodes.min_by do |node|
      node.error_rate.value
    end
  else
    s[rand(s.size)]
  end
end
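
The selection rule in the listing above can be tried out without a cluster. In this sketch SketchNode is a hypothetical stand-in whose error_rate is a plain number, whereas the real nodes expose error_rate.value.

```ruby
# Stand-in node with a plain numeric error rate (an assumption made for
# brevity; the library wraps this value in another object).
SketchNode = Struct.new(:name, :error_rate)

# Prefers a random node below the threshold; if none qualifies, falls back
# to the node with the lowest error rate.
def pick_node(nodes, threshold = 0.1)
  healthy = nodes.select { |node| node.error_rate < threshold }
  if healthy.empty?
    nodes.min_by(&:error_rate)
  else
    healthy.sample
  end
end
```

With an empty node list the helper returns nil, because min_by on an empty array has nothing to compare.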

- (Object) clear_bucket_props(bucket)

Clears the properties on a bucket. See Bucket#clear_props



# File 'lib/riak/client.rb', line 473

def clear_bucket_props(bucket)
  http do |b|
    b.clear_bucket_props(bucket)
  end
end

- (Object) create_search_index(name, schema = nil)

Raises:

  • (ArgumentError)


# File 'lib/riak/client/yokozuna.rb', line 3

def create_search_index(name, schema=nil)
  raise ArgumentError, t("zero_length_index") if name.nil? || name.empty?
  backend do |b|
    b.create_search_index(name, schema)
  end
  true
end

- (Object) create_search_schema(name, content)

Raises:

  • (ArgumentError)


# File 'lib/riak/client/yokozuna.rb', line 36

def create_search_schema(name, content)
  raise ArgumentError, t("zero_length_schema") if name.nil? || name.empty?
  raise ArgumentError, t("zero_length_content") if content.nil? || content.empty?
  backend do |b|
    b.create_search_schema(name, content)
  end
  true
end

- (Object) delete_object(bucket, key, options = {})

Delete an object. See Bucket#delete



# File 'lib/riak/client.rb', line 251

def delete_object(bucket, key, options = {})
  backend do |b|
    b.delete_object(bucket, key, options)
  end
end

- (Object) delete_search_index(name)

Raises:

  • (ArgumentError)


# File 'lib/riak/client/yokozuna.rb', line 28

def delete_search_index(name)
  raise ArgumentError, t("zero_length_index") if name.nil? || name.empty?
  backend do |b|
    b.delete_search_index(name)
  end
  true
end

- (Object) get_bucket_props(bucket)

Bucket properties. See Bucket#props



# File 'lib/riak/client.rb', line 258

def get_bucket_props(bucket)
  backend do |b|
    b.get_bucket_props bucket
  end
end

- (Object) get_index(bucket, index, query, options = {})

Queries a secondary index on a bucket. See Bucket#get_index



# File 'lib/riak/client.rb', line 265

def get_index(bucket, index, query, options={})
  backend do |b|
    b.get_index bucket, index, query, options
  end
end

- (Object) get_many(pairs)

Get multiple objects in parallel.



# File 'lib/riak/client.rb', line 272

def get_many(pairs)
  Multiget.get_all self, pairs
end

- (Object) get_object(bucket, key, options = {})

Get an object. See Bucket#get

Raises:

  • (ArgumentError)


# File 'lib/riak/client.rb', line 277

def get_object(bucket, key, options = {})
  raise ArgumentError, t("zero_length_key") if key == ''
  backend do |b|
    b.fetch_object(bucket, key, options)
  end
end

- (Object) get_search_index(name)

Raises:

  • (ArgumentError)


# File 'lib/riak/client/yokozuna.rb', line 11

def get_search_index(name)
  raise ArgumentError, t("zero_length_index") if name.nil? || name.empty?
  resp = []
  backend do |b|
    resp = b.get_search_index(name)
  end
  resp.index && Array === resp.index ? resp.index.first : resp
end

- (Object) get_search_schema(name)

Raises:

  • (ArgumentError)


# File 'lib/riak/client/yokozuna.rb', line 45

def get_search_schema(name)
  raise ArgumentError, t("zero_length_schema") if name.nil? || name.empty?
  backend do |b|
    return b.get_search_schema(name)
  end
end

- (Object) http(&block)

Yields an HTTPBackend.



# File 'lib/riak/client.rb', line 285

def http(&block)
  recover_from @http_pool, &block
end

- (Object) index(index, *docs) - (Object) index(*docs) Also known as: add_doc

(Riak Search) Adds documents to a search index via the Solr interface.

Overloads:

  • - (Object) index(index, *docs)

    Adds documents to the specified search index

    Parameters:

    • index (String)

      the index in which to add/update the given documents

    • docs (Array<Hash>)

      unnested document hashes, with one key per field

  • - (Object) index(*docs)

    Adds documents to the default search index

    Parameters:

    • docs (Array<Hash>)

      unnested document hashes, with one key per field

Raises:

  • (ArgumentError)

    if any documents don't include 'id' key



# File 'lib/riak/client/search.rb', line 38

def index(*args)
  index = args.shift if String === args.first # Documents must be hashes of fields
  raise ArgumentError.new(t("search_docs_require_id")) unless args.all? {|d| d.key?("id") || d.key?(:id) }
  xml = Builder::XmlMarkup.new
  xml.add do
    args.each do |doc|
      xml.doc do
        doc.each do |k,v|
          xml.field('name' => k.to_s) { xml.text!(v.to_s) }
        end
      end
    end
  end
  http do |h|
    h.update_search_index(index, xml.target!)
  end
  true
end
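
To see the shape of the XML that the listing above sends, the sketch below builds an equivalent document with plain string interpolation. It deliberately avoids the Builder gem, so its escaping is a simplified assumption and may differ from Builder's output in detail; both helpers are hypothetical.

```ruby
# Hypothetical helper: escapes characters that are unsafe in XML text and
# in double-quoted attribute values.
def xml_escape(text)
  text.to_s.gsub(/[&<>"]/, '&' => '&amp;', '<' => '&lt;', '>' => '&gt;', '"' => '&quot;')
end

# Builds a Solr-style <add> document from flat hashes, one field per key.
def solr_add_xml(docs)
  unless docs.all? { |doc| doc.key?('id') || doc.key?(:id) }
    raise ArgumentError, 'every document needs an id field'
  end
  body = docs.map do |doc|
    fields = doc.map { |key, value| %(<field name="#{xml_escape(key)}">#{xml_escape(value)}</field>) }
    "<doc>#{fields.join}</doc>"
  end
  "<add>#{body.join}</add>"
end
```

For example, solr_add_xml([{ id: 1, title: 'a<b' }]) produces one doc element with an id field and a title field whose text is escaped.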

- (String) inspect

Returns a representation suitable for IRB and debugging output.

Returns:

  • (String)

    A representation suitable for IRB and debugging output.



# File 'lib/riak/client.rb', line 298

def inspect
  "#<Riak::Client #{nodes.inspect}>"
end

- (Object) link_walk(object, specs)

Link-walks from the given object using the given walk specs.



# File 'lib/riak/client.rb', line 303

def link_walk(object, specs)
  http do |h|
    h.link_walk object, specs
  end
end

- (Object) list_keys(bucket, options = {}, &block)

Retrieves a list of keys in the given bucket. See Bucket#keys



# File 'lib/riak/client.rb', line 310

def list_keys(bucket, options={}, &block)
  if block_given?
    backend do |b|
      b.list_keys bucket, options, &block
    end
  else
    backend do |b|
      b.list_keys bucket, options
    end
  end
end

- (Object) list_search_indexes



# File 'lib/riak/client/yokozuna.rb', line 20

def list_search_indexes()
  resp = []
  backend do |b|
    resp = b.get_search_index(nil)
  end
  resp.index ? resp.index : resp
end

- (Object) mapred(mr, &block)

Executes a mapreduce request. See MapReduce#run



# File 'lib/riak/client.rb', line 323

def mapred(mr, &block)
  backend do |b|
    b.mapred(mr, &block)
  end
end

- (HTTPBackend) new_http_backend

Creates a new HTTP backend.

Returns:

  • (HTTPBackend)

    the newly created HTTP backend

# File 'lib/riak/client.rb', line 331

def new_http_backend
  klass = self.class.const_get("#{@http_backend}Backend")
  if klass.configured?
    node = choose_node(
      @nodes.select do |n|
        n.http?
      end
    )

    klass.new(self, node)
  else
    raise t('http_configuration', :backend => @http_backend)
  end
end
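
The class lookup in the listing above turns a short name such as :NetHTTP into a constant nested under the client. A minimal sketch of that naming convention, with SketchClient and its empty classes as hypothetical stand-ins:

```ruby
# Stand-in namespace; the real backend classes are nested under Riak::Client.
module SketchClient
  class NetHTTPBackend; end
  class ExconBackend; end

  # Resolves a backend class from a short name such as :NetHTTP or 'Excon'.
  def self.backend_class(name)
    const_get("#{name}Backend")
  end
end
```

An unknown name raises NameError from const_get, which is worth keeping in mind when the backend name comes from configuration.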

- (ProtobuffsBackend) new_protobuffs_backend

Creates a new protocol buffers backend.

Returns:

  • (ProtobuffsBackend)

    the newly created Protocol Buffers backend

# File 'lib/riak/client.rb', line 349

def new_protobuffs_backend
  klass = self.class.const_get("#{@protobuffs_backend}ProtobuffsBackend")
  if klass.configured?
    node = choose_node(
      @nodes.select do |n|
        n.protobuffs?
      end
    )

    klass.new(self, node)
  else
    raise t('protobuffs_configuration', :backend => @protobuffs_backend)
  end
end

- (Node) node

Returns an arbitrary Node.

Returns:

  • (Node)

    An arbitrary Node.



# File 'lib/riak/client.rb', line 365

def node
  nodes[rand nodes.size]
end

- (true, false) ping

Pings the Riak cluster to check for liveness.

Returns:

  • (true, false)

    whether the Riak cluster is alive and reachable



# File 'lib/riak/client.rb', line 371

def ping
  backend do |b|
    b.ping
  end
end

- (Object) protobuffs(&block)

Yields a protocol buffers backend.



# File 'lib/riak/client.rb', line 378

def protobuffs(&block)
  recover_from @protobuffs_pool, &block
end

- (Object) recover_from(pool)

Takes a pool. Acquires a backend from the pool and yields it with node-specific error recovery.



# File 'lib/riak/client.rb', line 419

def recover_from(pool)
  skip_nodes = []
  take_opts = {}
  tries = 3

  begin
    # Only select nodes which we haven't used before.
    unless skip_nodes.empty?
      take_opts[:filter] = lambda do |backend|
        not skip_nodes.include? backend.node
      end
    end

    # Acquire a backend
    pool.take(take_opts) do |backend|
      begin
        yield backend
      rescue *NETWORK_ERRORS => e
        # Network error.
        tries -= 1

        # Notify the node that a request against it failed.
        backend.node.error_rate << 1

        # Skip this node next time.
        skip_nodes << backend.node

        # And delete this connection.
        raise Pool::BadResource, e
      end
    end
  rescue Pool::BadResource => e
    retry if tries > 0
    raise e.message
  end
end
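
The retry logic in the listing above can be illustrated without a connection pool. The sketch below is a simplification under stated assumptions: it walks a plain array of node names instead of taking backends from a Pool, uses a hypothetical SketchNetworkError in place of NETWORK_ERRORS, and re-raises the last error as is rather than raising its message.

```ruby
# Stand-in for a network failure; the library rescues NETWORK_ERRORS instead.
class SketchNetworkError < StandardError; end

# Runs the block against up to `tries` different nodes, skipping any node
# that already failed, and re-raises the last error once attempts run out.
def with_failover(nodes, tries = 3)
  skipped = []
  begin
    node = nodes.find { |candidate| !skipped.include?(candidate) }
    raise SketchNetworkError, 'no untried nodes left' if node.nil?
    yield node
  rescue SketchNetworkError => e
    tries -= 1
    skipped << node unless node.nil?
    retry if tries > 0 && !node.nil?
    raise e
  end
end
```

A call such as with_failover(%w[a b c]) { |node| ... } returns the block's value from the first node that does not raise, and gives up after three failed nodes by default.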

- (Object) reload_object(object, options = {})

Reloads the object from Riak.



# File 'lib/riak/client.rb', line 457

def reload_object(object, options = {})
  backend do |b|
    b.reload_object(object, options)
  end
end

- (Object) remove(index, specs) - (Object) remove(specs) Also known as: delete_doc, deindex

(Riak Search) Removes documents from a search index via the Solr interface.

Overloads:

  • - (Object) remove(index, specs)

    Removes documents from the specified index

    Parameters:

    • index (String)

      the index from which to remove documents

    • specs (Array<Hash>)

      the specification of documents to remove (must contain 'id' or 'query' keys)

  • - (Object) remove(specs)

    Removes documents from the default index

    Parameters:

    • specs (Array<Hash>)

      the specification of documents to remove (must contain 'id' or 'query' keys)

Raises:

  • (ArgumentError)

    if any document specs don't include 'id' or 'query' keys



# File 'lib/riak/client/search.rb', line 67

def remove(*args)
  index = args.shift if String === args.first
  raise ArgumentError.new(t("search_remove_requires_id_or_query")) unless args.all? { |s|
    s.include? :id or
    s.include? 'id' or
    s.include? :query or
    s.include? 'query'
  }
  xml = Builder::XmlMarkup.new
  xml.delete do
    args.each do |spec|
      spec.each do |k,v|
        xml.tag!(k.to_sym, v)
      end
    end
  end
  http do |h|
    h.update_search_index(index, xml.target!)
  end
  true
end

- (Hash) search(index, query, options = {}) - (Hash) search(query, options = {}) Also known as: select

(Riak Search) Performs a search via the Solr interface.

Overloads:

  • - (Hash) search(index, query, options = {})

    Parameters:

    • index (String)

      the index to query on

    • query (String)

      a Lucene query string

  • - (Hash) search(query, options = {})

    Queries the default index

    Parameters:

    • query (String)

      a Lucene query string

Parameters:

  • options (Hash)

    extra options for the Solr query

Returns:

  • (Hash)

    the query result, containing the 'responseHeaders' and 'response' keys



# File 'lib/riak/client/search.rb', line 20

def search(*args)
  options = args.extract_options!
  index, query = args[-2], args[-1]  # Allows nil index, while keeping it as first argument
  backend do |b|
    b.search(index, query, options)
  end
end
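
The argument handling in the listing above lets the index be omitted while still coming first when present. The sketch below reproduces that splitting with a hypothetical helper; it detects a trailing Hash by hand, which is an assumption standing in for the extract_options! call used by the library.

```ruby
# Hypothetical helper: splits search-style arguments into index, query and
# options. A trailing Hash is treated as the options.
def split_search_args(*args)
  options = args.last.is_a?(Hash) ? args.pop : {}
  index, query = args[-2], args[-1]
  [index, query, options]
end
```

When only a query is passed, args[-2] is out of range and evaluates to nil, which is how the default index ends up being selected.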

- (Object) set_bucket_props(bucket, properties)

Sets the properties on a bucket. See Bucket#props=



# File 'lib/riak/client.rb', line 464

def set_bucket_props(bucket, properties)
  # A bug in Beefcake is still giving us trouble with default booleans.
  # Until it is resolved, we'll use the HTTP backend.
  http do |b|
    b.set_bucket_props(bucket, properties)
  end
end

- (Object) ssl=(value)

Enables or disables SSL on all nodes, for HTTP backends.



# File 'lib/riak/client.rb', line 480

def ssl=(value)
  @nodes.each do |node|
    node.ssl = value
  end
  value
end

- (Stamp) stamp

Exposes a Stamp object for use in generating unique identifiers.

Returns:

  • (Stamp)

    an ID generator



# File 'lib/riak/client.rb', line 491

def stamp
  @stamp ||= Riak::Stamp.new(self)
end

- (Object) store_object(object, options = {})

Stores an object in Riak.



# File 'lib/riak/client.rb', line 497

def store_object(object, options = {})
  params = {:returnbody => true}.merge(options)
  backend do |b|
    b.store_object(object, params)
  end
end
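
The listing above applies a default of :returnbody => true that callers can override. A minimal sketch of that merge order, using a hypothetical helper name:

```ruby
# Hypothetical helper: caller-supplied options win over the default because
# they are merged on top of it.
def store_params(options = {})
  { returnbody: true }.merge(options)
end
```

Passing returnbody: false therefore disables the default, while unrelated keys are carried through unchanged.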