Class: Riak::Client

Inherits:
Object
Includes:
Util::Escape, Util::Translation
Defined in:
lib/riak/client.rb,
lib/riak/client/pool.rb,
lib/riak/client/node.rb,
lib/riak/client/search.rb,
lib/riak/client/http_backend.rb,
lib/riak/client/excon_backend.rb,
lib/riak/client/net_http_backend.rb,
lib/riak/client/beefcake/messages.rb,
lib/riak/client/protobuffs_backend.rb,
lib/riak/client/beefcake/object_methods.rb,
lib/riak/client/http_backend/key_streamer.rb,
lib/riak/client/http_backend/configuration.rb,
lib/riak/client/beefcake_protobuffs_backend.rb,
lib/riak/client/http_backend/object_methods.rb,
lib/riak/client/http_backend/request_headers.rb,
lib/riak/client/http_backend/transport_methods.rb

Overview

A client connection to Riak.

Defined Under Namespace

Classes: BeefcakeProtobuffsBackend, Decaying, ExconBackend, HTTPBackend, LuwakFile, NetHTTPBackend, Node, Pool, ProtobuffsBackend

Constant Summary

MAX_CLIENT_ID =

When using integer client IDs, the exclusive upper-bound of valid values.

4294967296
PROTOCOLS =

Array of valid protocols

%w[http https pbc]
HOST_REGEX =

Regexp for validating hostnames, lifted from uri.rb in Ruby 1.8.6

/^(?:(?:(?:[a-zA-Z\d](?:[-a-zA-Z\d]*[a-zA-Z\d])?)\.)*(?:[a-zA-Z](?:[-a-zA-Z\d]*[a-zA-Z\d])?)\.?|\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}|\[(?:(?:[a-fA-F\d]{1,4}:)*(?:[a-fA-F\d]{1,4}|\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})|(?:(?:[a-fA-F\d]{1,4}:)*[a-fA-F\d]{1,4})?::(?:(?:[a-fA-F\d]{1,4}:)*(?:[a-fA-F\d]{1,4}|\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}))?)\])$/n
VALID_OPTIONS =

Valid constructor options.

[:protocol, :nodes, :client_id, :http_backend, :protobuffs_backend] | Node::VALID_OPTIONS
NETWORK_ERRORS =

Network errors.

[
  EOFError,
  Errno::ECONNABORTED,
  Errno::ECONNREFUSED,
  Errno::ECONNRESET,
  Errno::ENETDOWN,
  Errno::ENETRESET,
  Errno::ENETUNREACH,
  SocketError,
  SystemCallError,
]

Instance Method Summary

Methods included from Util::Escape

#escape, #maybe_escape, #maybe_unescape, #unescape

Methods included from Util::Translation

#i18n_scope, #t

Constructor Details

- (Client) initialize(options = {})

Creates a client connection to Riak

Parameters:

  • options (Hash) (defaults to: {})

    configuration options for the client

Options Hash (options):

  • :nodes (Array)

    A list of nodes this client connects to. Each element of the list is a hash which is passed to Node.new, e.g. {host: '127.0.0.1', pb_port: 1234, ...}. If no nodes are given, a single node is constructed from the remaining options given to Client.new.

  • :host (String) — default: '127.0.0.1'

    The host or IP address for the Riak endpoint

  • :http_port (Fixnum) — default: 8098

    The port of the Riak HTTP endpoint

  • :pb_port (Fixnum) — default: 8087

    The port of the Riak Protocol Buffers endpoint

  • :prefix (String) — default: '/riak/'

    The URL path prefix to the main HTTP endpoint

  • :mapred (String) — default: '/mapred'

    The path to the map-reduce HTTP endpoint

  • :client_id (Fixnum, String) — default: rand(MAX_CLIENT_ID)

    The internal client ID used by Riak to route responses

  • :http_backend (String, Symbol) — default: :NetHTTP

    which HTTP backend to use

  • :protobuffs_backend (String, Symbol) — default: :Beefcake

    which Protocol Buffers backend to use

Raises:

  • (ArgumentError)

    raised if any invalid options are given



# File 'lib/riak/client.rb', line 87

def initialize(options={})
  if options.include? :port
    warn(t('deprecated.port', :backtrace => caller[0..2].join("\n    ")))
  end

  unless (evil = options.keys - VALID_OPTIONS).empty?
    raise ArgumentError, "#{evil.inspect} are not valid options for Client.new"
  end

  @nodes = (options[:nodes] || []).map do |n|
    Client::Node.new self, n
  end
  if @nodes.empty? or options[:host] or options[:http_port] or options[:pb_port]
    @nodes |= [Client::Node.new(self, options)]
  end

  @protobuffs_pool = Pool.new(
                              method(:new_protobuffs_backend),
                              lambda { |b| b.teardown }
                              )

  @http_pool = Pool.new(
                        method(:new_http_backend),
                        lambda { |b| b.teardown }
                        )

  self.protocol           = options[:protocol]           || "http"
  self.http_backend       = options[:http_backend]       || :NetHTTP
  self.protobuffs_backend = options[:protobuffs_backend] || :Beefcake
  self.client_id          = options[:client_id]          if options[:client_id]
end
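
The option check at the top of the constructor is a plain array difference. The following is a minimal standalone sketch of that idea, not the library's code: ALLOWED_KEYS and check_options! are hypothetical names, and the key list is an abbreviated guess at what VALID_OPTIONS (together with Node::VALID_OPTIONS) contains.

```ruby
# Hypothetical, abbreviated stand-in for Riak::Client::VALID_OPTIONS.
ALLOWED_KEYS = [:protocol, :nodes, :client_id, :http_backend,
                :protobuffs_backend, :host, :http_port, :pb_port].freeze

# Returns the options unchanged, or raises ArgumentError naming every
# key that is not in ALLOWED_KEYS (the same array-difference check that
# the constructor performs).
def check_options!(options)
  unknown = options.keys - ALLOWED_KEYS
  return options if unknown.empty?
  raise ArgumentError, "#{unknown.inspect} are not valid options for Client.new"
end

check_options!(:host => "127.0.0.1", :pb_port => 8087) # accepted

begin
  check_options!(:hots => "127.0.0.1") # misspelled key
rescue ArgumentError => e
  puts e.message
end
```

Because the comparison is made against symbols, a string key such as "host" would be rejected in this sketch as well.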

Instance Attribute Details

- (String) client_id

The internal client ID used by Riak to route responses

Returns:

  • (String)

    The internal client ID used by Riak to route responses



# File 'lib/riak/client.rb', line 57

def client_id
  @client_id ||= backend do |b|
    if b.respond_to?(:get_client_id)
      b.get_client_id
    else
      make_client_id
    end
  end
end
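
The reader above resolves the ID lazily and caches it with `||=`. A small sketch of that pattern follows, assuming the fallback draws a random integer below MAX_CLIENT_ID (as the constructor's documented default suggests); IdHolder and ReportingBackend are hypothetical names, and make_client_id itself is not shown on this page.

```ruby
MAX_ID = 4294967296 # same value as MAX_CLIENT_ID above

# Hypothetical holder that resolves its ID on first use and then caches it.
class IdHolder
  def initialize(backend)
    @backend = backend
  end

  def client_id
    @client_id ||= if @backend.respond_to?(:get_client_id)
                     @backend.get_client_id # backend can report an ID
                   else
                     rand(MAX_ID)           # assumed fallback, see lead-in
                   end
  end
end

# A stand-in backend that can report an ID.
ReportingBackend = Struct.new(:get_client_id)

puts IdHolder.new(ReportingBackend.new("abcd")).client_id
```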

- (Symbol) http_backend

The HTTP backend/client to use

Returns:

  • (Symbol)

    The HTTP backend/client to use



# File 'lib/riak/client.rb', line 60

def http_backend
  @http_backend
end

- (Client::Pool) http_pool (readonly)

A pool of HTTP connections

Returns:

  • (Client::Pool)

    A pool of HTTP connections

# File 'lib/riak/client.rb', line 63

def http_pool
  @http_pool
end

- (Array) nodes

The set of Nodes this client can communicate with.

Returns:

  • (Array)

    The set of Nodes this client can communicate with.



# File 'lib/riak/client.rb', line 54

def nodes
  @nodes
end

- (Symbol) protobuffs_backend

The Protocol Buffers backend/client to use

Returns:

  • (Symbol)

    The Protocol Buffers backend/client to use



# File 'lib/riak/client.rb', line 66

def protobuffs_backend
  @protobuffs_backend
end

- (Client::Pool) protobuffs_pool (readonly)

A pool of protobuffs connections

Returns:

  • (Client::Pool)

    A pool of protobuffs connections

# File 'lib/riak/client.rb', line 69

def protobuffs_pool
  @protobuffs_pool
end

- (String) protocol

The protocol to use for the Riak endpoint

Returns:

  • (String)

    The protocol to use for the Riak endpoint



# File 'lib/riak/client.rb', line 51

def protocol
  @protocol
end

Instance Method Details

- (Object) backend {|HTTPBackend, ProtobuffsBackend| ... }

Yields a backend for operations that are protocol-independent. You can change which type of backend is used by setting the #protocol.

Yields:

  • (HTTPBackend, ProtobuffsBackend)

    a backend of the type selected by #protocol

# File 'lib/riak/client.rb', line 123

def backend(&block)
  case @protocol.to_s
  when /https?/i
    http &block
  when /pbc/i
    protobuffs &block
  end
end
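
The dispatch above depends only on the protocol string. The sketch below isolates that decision so it can be tried without a Riak node; backend_kind is a hypothetical helper that returns a symbol instead of yielding a backend.

```ruby
# Returns which kind of backend a protocol value would select, using the
# same unanchored, case-insensitive patterns as #backend. Returns nil
# when neither pattern matches.
def backend_kind(protocol)
  case protocol.to_s
  when /https?/i then :http
  when /pbc/i    then :protobuffs
  end
end

p backend_kind("https")
p backend_kind(:pbc)
```

In the listing above, a protocol that matches neither pattern falls through the case statement, so the block is never called and the method returns nil.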

- (Object) basic_auth=(auth)

Sets basic HTTP auth on all nodes.



# File 'lib/riak/client.rb', line 133

def basic_auth=(auth)
  @nodes.each do |node|
    node.basic_auth = auth
  end
  auth
end

- (Bucket) bucket(name, options = {}) Also known as: []

Retrieves a bucket from Riak.

Parameters:

  • name (String)

    the bucket to retrieve

  • options (Hash) (defaults to: {})

    options for retrieving the bucket

Options Hash (options):

  • :props (Boolean) — default: false

    whether to retrieve the bucket properties

Returns:

  • (Bucket)

    the requested bucket



# File 'lib/riak/client.rb', line 145

def bucket(name, options={})
  unless (options.keys - [:props]).empty?
    raise ArgumentError, "invalid options"
  end
  @bucket_cache ||= {}
  (@bucket_cache[name] ||= Bucket.new(self, name)).tap do |b|
    b.props if options[:props]
  end
end
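
Because of the per-client cache, repeated calls with the same name return the same Bucket instance. A self-contained sketch of that memoization follows; FakeBucket and BucketRegistry are hypothetical stand-ins, and the optional property fetch is left out.

```ruby
FakeBucket = Struct.new(:name) # hypothetical stand-in for Riak::Bucket

class BucketRegistry
  # Returns the cached bucket for +name+, creating it on first use.
  def bucket(name, options = {})
    unless (options.keys - [:props]).empty?
      raise ArgumentError, "invalid options"
    end
    @cache ||= {}
    @cache[name] ||= FakeBucket.new(name)
  end
  alias_method :[], :bucket
end

registry = BucketRegistry.new
puts registry.bucket("users").equal?(registry["users"])
```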

- (Array<Bucket>) buckets Also known as: list_buckets

Note:

This is an expensive operation and should be used only in development.

Lists buckets which have keys stored in them.

Returns:

  • (Array<Bucket>)

    the buckets which have keys stored in them

# File 'lib/riak/client.rb', line 160

def buckets
  warn(t('list_buckets', :backtrace => caller.join("\n    "))) unless Riak.disable_list_keys_warnings
  backend do |b|
    b.list_buckets.map {|name| Bucket.new(self, name) }
  end
end

- (Object) choose_node(nodes = self.nodes)

Choose a node from a set.



# File 'lib/riak/client.rb', line 169

def choose_node(nodes = self.nodes)
  # Prefer nodes which have gone a reasonable time without errors.
  s = nodes.select do |node|
    node.error_rate.value < 0.1
  end

  if s.empty?
    # Fall back to minimally broken node.
    nodes.min_by do |node|
      node.error_rate.value
    end
  else
    s[rand(s.size)]
  end
end
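
The selection rule can be exercised in isolation. In the sketch below, Rate and FakeNode are hypothetical stand-ins for the node's decaying error rate and for Node; pick_node follows the same two steps as the listing above, using Array#sample for the random pick.

```ruby
Rate     = Struct.new(:value)             # stand-in for the error-rate object
FakeNode = Struct.new(:host, :error_rate) # stand-in for Riak::Client::Node

# Prefers a random node whose error rate is under the threshold; if none
# qualifies, falls back to the node with the lowest error rate.
def pick_node(nodes, threshold = 0.1)
  healthy = nodes.select { |n| n.error_rate.value < threshold }
  if healthy.empty?
    nodes.min_by { |n| n.error_rate.value }
  else
    healthy.sample
  end
end

nodes = [
  FakeNode.new("10.0.0.1", Rate.new(0.0)),
  FakeNode.new("10.0.0.2", Rate.new(0.5)),
]
puts pick_node(nodes).host
```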

- (Object) delete_file(filename)

Deletes a file stored via the "Luwak" interface

Parameters:

  • filename (String)

    the key/filename to delete



# File 'lib/riak/client.rb', line 217

def delete_file(filename)
  http do |h|
    h.delete_file(filename)
  end
  true
end

- (Object) delete_object(bucket, key, options = {})

Delete an object. See Bucket#delete



# File 'lib/riak/client.rb', line 225

def delete_object(bucket, key, options = {})
  backend do |b|
    b.delete_object(bucket, key, options)
  end
end

- (true, false) file_exists?(key) Also known as: file_exist?

Checks whether a file exists in "Luwak".

Parameters:

  • key (String)

    the key to check

Returns:

  • (true, false)

    whether the key exists in "Luwak"



# File 'lib/riak/client.rb', line 234

def file_exists?(key)
  http do |h|
    h.file_exists?(key)
  end
end

- (Object) get_bucket_props(bucket)

Bucket properties. See Bucket#props



# File 'lib/riak/client.rb', line 242

def get_bucket_props(bucket)
  backend do |b|
    b.get_bucket_props bucket
  end
end

- (IO?) get_file(filename) {|chunk| ... }

Retrieves a large file/IO object from Riak via the "Luwak" interface. Streams the data to a temporary file unless a block is given.

Parameters:

  • filename (String)

    the key/filename for the object

Yields:

  • (chunk)

    stream contents of the file through the block. Passing the block will result in nil being returned from the method.

Yield Parameters:

  • chunk (String)

    a single chunk of the object's data

Returns:

  • (IO, nil)

    the file (also having content_type and original_filename accessors). The file will need to be reopened to be read. nil will be returned if a block is given.



# File 'lib/riak/client.rb', line 259

def get_file(filename, &block)
  http do |h|
    h.get_file(filename, &block)
  end
end

- (Object) get_index(bucket, index, query)

Queries a secondary index on a bucket. See Bucket#get_index



# File 'lib/riak/client.rb', line 266

def get_index(bucket, index, query)
  backend do |b|
    b.get_index bucket, index, query
  end
end

- (Object) get_object(bucket, key, options = {})

Get an object. See Bucket#get



# File 'lib/riak/client.rb', line 273

def get_object(bucket, key, options = {})
  backend do |b|
    b.fetch_object(bucket, key, options)
  end
end

- (Object) http(&block)

Yields an HTTPBackend.



# File 'lib/riak/client.rb', line 280

def http(&block)
  recover_from @http_pool, &block
end

- (Object) index(index, *docs)
- (Object) index(*docs) Also known as: add_doc

(Riak Search) Adds documents to a search index via the Solr interface.

Overloads:

  • - (Object) index(index, *docs)

    Adds documents to the specified search index

    Parameters:

    • index (String)

      the index in which to add/update the given documents

    • docs (Array<Hash>)

      unnested document hashes, with one key per field

  • - (Object) index(*docs)

    Adds documents to the default search index

    Parameters:

    • docs (Array<Hash>)

      unnested document hashes, with one key per field

Raises:

  • (ArgumentError)

    if any document doesn't include an 'id' key



# File 'lib/riak/client/search.rb', line 38

def index(*args)
  index = args.shift if String === args.first # Documents must be hashes of fields
  raise ArgumentError.new(t("search_docs_require_id")) unless args.all? {|d| d.key?("id") || d.key?(:id) }
  xml = Builder::XmlMarkup.new
  xml.add do
    args.each do |doc|
      xml.doc do
        doc.each do |k,v|
          xml.field('name' => k.to_s) { xml.text!(v.to_s) }
        end
      end
    end
  end
  http do |h|
    h.update_search_index(index, xml.target!)
  end
  true
end
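
Both overloads are served by one method that peels an optional leading String off the argument list. The sketch below shows only that argument handling and the 'id' check, with no XML or HTTP involved; split_index_args and its error message are hypothetical.

```ruby
# Separates an optional index name from the document hashes and checks
# that every document has an 'id' field under a string or symbol key.
# Returns [index_or_nil, docs].
def split_index_args(*args)
  index = args.shift if String === args.first
  unless args.all? { |d| d.key?("id") || d.key?(:id) }
    raise ArgumentError, "search documents require an id" # placeholder text
  end
  [index, args]
end

p split_index_args("users", { :id => 1, :name => "bob" })
p split_index_args({ "id" => "2" })
```

When no String comes first, index stays nil, which corresponds to the default-index overload.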

- (String) inspect

A representation suitable for IRB and debugging output.

Returns:

  • (String)

    A representation suitable for IRB and debugging output.



# File 'lib/riak/client.rb', line 293

def inspect
  "#<Riak::Client #{nodes.inspect}>"
end

- (Object) link_walk(object, specs)

Link-walk.



# File 'lib/riak/client.rb', line 298

def link_walk(object, specs)
  http do |h|
    h.link_walk object, specs
  end
end

- (Object) list_keys(bucket, &block)

Retrieves a list of keys in the given bucket. See Bucket#keys



# File 'lib/riak/client.rb', line 305

def list_keys(bucket, &block)
  if block_given?
    backend do |b|
      b.list_keys bucket, &block
    end
  else
    backend do |b|
      b.list_keys bucket
    end
  end
end

- (Object) mapred(mr, &block)

Executes a mapreduce request. See MapReduce#run



# File 'lib/riak/client.rb', line 318

def mapred(mr, &block)
  backend do |b|
    b.mapred(mr, &block)
  end
end

- (HTTPBackend) new_http_backend

Creates a new HTTP backend.

Returns:

  • (HTTPBackend)

    the new HTTP backend

# File 'lib/riak/client.rb', line 326

def new_http_backend
  klass = self.class.const_get("#{@http_backend}Backend")
  if klass.configured?
    node = choose_node(
      @nodes.select do |n|
        n.http?
      end
    )

    klass.new(self, node)
  else
    raise t('http_configuration', :backend => @http_backend)
  end
end
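
The backend class is found by name: the configured symbol is interpolated into a constant name and resolved with const_get. A minimal sketch of that lookup follows, using a hypothetical Transport module with empty placeholder classes in place of the real backends.

```ruby
module Transport
  class NetHTTPBackend; end # placeholder classes for illustration only
  class ExconBackend; end

  # Resolves :NetHTTP to Transport::NetHTTPBackend, and so on.
  # Raises NameError if no such constant exists.
  def self.backend_class(name)
    const_get("#{name}Backend")
  end
end

p Transport.backend_class(:NetHTTP)
p Transport.backend_class("Excon")
```

A misspelled backend name therefore surfaces as a NameError from const_get rather than as a configuration error.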

- (ProtobuffsBackend) new_protobuffs_backend

Creates a new protocol buffers backend.

Returns:

  • (ProtobuffsBackend)

    the new protocol buffers backend

# File 'lib/riak/client.rb', line 344

def new_protobuffs_backend
  klass = self.class.const_get("#{@protobuffs_backend}ProtobuffsBackend")
  if klass.configured?
    node = choose_node(
      @nodes.select do |n|
        n.protobuffs?
      end
    )

    klass.new(self, node)
  else
    raise t('protobuffs_configuration', :backend => @protobuffs_backend)
  end
end

- (Node) node

An arbitrary Node.

Returns:

  • (Node)

    An arbitrary Node.



# File 'lib/riak/client.rb', line 360

def node
  nodes[rand nodes.size]
end

- (true, false) ping

Pings the Riak cluster to check for liveness.

Returns:

  • (true, false)

    whether the Riak cluster is alive and reachable



# File 'lib/riak/client.rb', line 366

def ping
  backend do |b|
    b.ping
  end
end

- (Object) protobuffs(&block)

Yields a protocol buffers backend.



# File 'lib/riak/client.rb', line 373

def protobuffs(&block)
  recover_from @protobuffs_pool, &block
end

- (Object) recover_from(pool)

Takes a pool. Acquires a backend from the pool and yields it with node-specific error recovery.



# File 'lib/riak/client.rb', line 412

def recover_from(pool)
  skip_nodes = []
  take_opts = {}
  tries = 3

  begin
    # Only select nodes which we haven't used before.
    unless skip_nodes.empty?
      take_opts[:filter] = lambda do |backend|
        not skip_nodes.include? backend.node
      end
    end

    # Acquire a backend
    pool.take(take_opts) do |backend|
      begin
        yield backend
      rescue *NETWORK_ERRORS => e
        # Network error.
        tries -= 1

        # Notify the node that a request against it failed.
        backend.node.error_rate << 1

        # Skip this node next time.
        skip_nodes << backend.node

        # And delete this connection.
        raise Pool::BadResource, e
      end
    end
  rescue Pool::BadResource => e
    retry if tries > 0
    raise e.message
  end
end
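
The recovery loop combines a bounded retry count with a list of nodes to skip. The sketch below reproduces that control flow in simplified form so it can be run without a pool: it takes a plain array of backends instead of calling Pool#take, does not record error rates, and uses a shortened error list. FlakyBackend, RETRYABLE and with_failover are hypothetical names.

```ruby
FlakyBackend = Struct.new(:node, :healthy) # stand-in for a pooled backend

# Shortened, illustrative subset of NETWORK_ERRORS.
RETRYABLE = [EOFError, Errno::ECONNREFUSED, Errno::ECONNRESET, SocketError].freeze

# Yields the first backend whose node has not failed yet. On a network
# error the node is added to the skip list and the block is retried,
# up to +tries+ failures in total.
def with_failover(backends, tries = 3)
  skipped = []
  begin
    backend = backends.find { |b| !skipped.include?(b.node) }
    raise "no backend available" if backend.nil?
    yield backend
  rescue *RETRYABLE => e
    tries -= 1
    skipped << backend.node
    retry if tries > 0
    raise e.message
  end
end

pool = [FlakyBackend.new("a", false), FlakyBackend.new("b", true)]
result = with_failover(pool) do |b|
  raise Errno::ECONNREFUSED unless b.healthy
  "served by #{b.node}"
end
puts result
```

As in the listing above, the final failure is re-raised as a RuntimeError carrying the original message, so callers lose the original exception class.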

- (Object) reload_object(object, options = {})

Reloads the object from Riak.



# File 'lib/riak/client.rb', line 450

def reload_object(object, options = {})
  backend do |b|
    b.reload_object(object, options)
  end
end

- (Object) remove(index, specs)
- (Object) remove(specs) Also known as: delete_doc, deindex

(Riak Search) Removes documents from a search index via the Solr interface.

Overloads:

  • - (Object) remove(index, specs)

    Removes documents from the specified index

    Parameters:

    • index (String)

      the index from which to remove documents

    • specs (Array<Hash>)

      the specification of documents to remove (must contain 'id' or 'query' keys)

  • - (Object) remove(specs)

    Removes documents from the default index

    Parameters:

    • specs (Array<Hash>)

      the specification of documents to remove (must contain 'id' or 'query' keys)

Raises:

  • (ArgumentError)

    if any document specs don't include 'id' or 'query' keys



# File 'lib/riak/client/search.rb', line 67

def remove(*args)
  index = args.shift if String === args.first
  raise ArgumentError.new(t("search_remove_requires_id_or_query")) unless args.all? { |s|
    s.include? :id or
    s.include? 'id' or
    s.include? :query or
    s.include? 'query'
  }
  xml = Builder::XmlMarkup.new
  xml.delete do
    args.each do |spec|
      spec.each do |k,v|
        xml.tag!(k.to_sym, v)
      end
    end
  end
  http do |h|
    h.update_search_index(index, xml.target!)
  end
  true
end
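
The guard at the top of the method accepts a spec if it has an 'id' or 'query' key, given as either a string or a symbol. A short sketch of that check on its own follows; REMOVE_KEYS and valid_remove_specs? are hypothetical names.

```ruby
REMOVE_KEYS = [:id, "id", :query, "query"].freeze

# True when every spec hash carries at least one of the accepted keys.
def valid_remove_specs?(specs)
  specs.all? { |spec| REMOVE_KEYS.any? { |k| spec.include?(k) } }
end

p valid_remove_specs?([{ :id => "1" }, { "query" => "name:bob" }])
p valid_remove_specs?([{ :title => "untitled" }])
```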

- (Hash) search(index, query, options = {})
- (Hash) search(query, options = {}) Also known as: select

(Riak Search) Performs a search via the Solr interface.

Overloads:

  • - (Hash) search(index, query, options = {})

    Parameters:

    • index (String)

      the index to query on

    • query (String)

      a Lucene query string

  • - (Hash) search(query, options = {})

    Queries the default index

    Parameters:

    • query (String)

      a Lucene query string

Parameters:

  • options (Hash)

    extra options for the Solr query

Returns:

  • (Hash)

    the query result, containing the 'responseHeaders' and 'response' keys



# File 'lib/riak/client/search.rb', line 20

def search(*args)
  options = args.extract_options!
  index, query = args[-2], args[-1]  # Allows nil index, while keeping it as first argument
  http do |h|
    h.search(index, query, options)
  end
end
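
The argument handling relies on two details: a trailing options hash is removed first, and negative indexing then yields nil for a missing index. Note that extract_options! is not a core Ruby method (ActiveSupport provides one of that name). The sketch below uses a plain-Ruby equivalent; split_search_args is a hypothetical helper.

```ruby
# Splits search arguments into [index_or_nil, query, options].
def split_search_args(*args)
  options = args.last.is_a?(Hash) ? args.pop : {}
  index, query = args[-2], args[-1] # args[-2] is nil when only a query is given
  [index, query, options]
end

p split_search_args("users", "name:bob", { :rows => 5 })
p split_search_args("name:bob")
```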

- (Object) set_bucket_props(bucket, properties)

Sets the properties on a bucket. See Bucket#props=



# File 'lib/riak/client.rb', line 457

def set_bucket_props(bucket, properties)
  # A bug in Beefcake is still giving us trouble with default booleans.
  # Until it is resolved, we'll use the HTTP backend.
  http do |b|
    b.set_bucket_props(bucket, properties)
  end
end

- (Object) ssl=(value)

Enables or disables SSL on all nodes, for HTTP backends.



# File 'lib/riak/client.rb', line 466

def ssl=(value)
  @nodes.each do |node|
    node.ssl = value
  end

  if value
    @protocol = 'https'
  else
    @protocol = 'http'
  end
  value
end

- (Stamp) stamp

Exposes a Stamp object for use in generating unique identifiers.

Returns:

  • (Stamp)

    an ID generator



# File 'lib/riak/client.rb', line 483

def stamp
  @stamp ||= Riak::Stamp.new(self)
end

- (String) store_file(filename, content_type, data)
- (String) store_file(content_type, data)

Stores a large file/IO-like object in Riak via the "Luwak" interface.

Overloads:

  • - (String) store_file(filename, content_type, data)

    Stores the file at the given key/filename

    Parameters:

    • filename (String)

      the key/filename for the object

    • content_type (String)

      the MIME Content-Type for the data

    • data (IO, String)

      the contents of the file

  • - (String) store_file(content_type, data)

    Stores the file with a server-determined key/filename

    Parameters:

    • content_type (String)

      the MIME Content-Type for the data

    • data (String, #read)

      the contents of the file

Returns:

  • (String)

    the key/filename where the object was stored



# File 'lib/riak/client.rb', line 498

def store_file(*args)
  http do |h|
    h.store_file(*args)
  end
end

- (Object) store_object(object, options = {})

Stores an object in Riak.



# File 'lib/riak/client.rb', line 505

def store_object(object, options = {})
  params = {:returnbody => true}.merge(options)
  backend do |b|
    b.store_object(object, params)
  end
end
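
The default is applied with Hash#merge, so a caller-supplied :returnbody takes precedence over the built-in value. A tiny sketch of just that merge follows; store_params is a hypothetical helper.

```ruby
# Applies the :returnbody default; keys passed by the caller win.
def store_params(options = {})
  { :returnbody => true }.merge(options)
end

p store_params
p store_params(:returnbody => false, :w => 2)
```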