Class: Riak::Client::HTTPBackend

Inherits:
Object
Includes:
FeatureDetection, Configuration, ObjectMethods, TransportMethods, Util::Escape, Util::Translation
Defined in:
lib/riak/client/http_backend.rb,
lib/riak/client/http_backend/key_streamer.rb,
lib/riak/client/http_backend/configuration.rb,
lib/riak/client/http_backend/object_methods.rb,
lib/riak/client/http_backend/bucket_streamer.rb,
lib/riak/client/http_backend/request_headers.rb,
lib/riak/client/http_backend/transport_methods.rb,
lib/riak/client/http_backend/chunked_json_streamer.rb

Overview

The parent class for all backends that connect to Riak via HTTP. This class implements all of the universal backend API methods on behalf of subclasses, which need only implement the TransportMethods#perform method for library-specific semantics.
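
The division of labor described above can be sketched roughly as follows. This is a minimal illustration, not the library's code: `SketchBackend`, `EchoBackend`, the argument order of `perform`, and the response hash layout are all hypothetical. Only the idea that shared verb helpers funnel into one subclass-provided `perform` method comes from the overview.

```ruby
# Hypothetical stand-in for the parent class: the verb helpers are shared,
# and every request funnels through #perform.
class SketchBackend
  def get(expect, path, headers = {})
    perform(:get, path, headers, expect)
  end

  def delete(expect, path, headers = {})
    perform(:delete, path, headers, expect)
  end

  # Subclasses supply the library-specific transport here.
  def perform(method, path, headers, expect)
    raise NotImplementedError, "#{self.class} must implement #perform"
  end
end

# Hypothetical subclass: fakes the transport by echoing the request back.
class EchoBackend < SketchBackend
  def perform(method, path, headers, expect)
    { :code => Array(expect).first, :body => "#{method.to_s.upcase} #{path}" }
  end
end

response = EchoBackend.new.get(200, "/ping")
```

A new transport in this sketch only has to override `perform`; `get` and `delete` come along unchanged.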

Direct Known Subclasses

ExconBackend, NetHTTPBackend

Defined Under Namespace

Modules: Configuration, ObjectMethods, TransportMethods

Classes: BucketStreamer, ChunkedJsonStreamer, KeyStreamer, RequestHeaders

Constant Summary

Constants included from FeatureDetection

FeatureDetection::VERSION

Instance Attribute Summary

Instance Method Summary

Methods included from Configuration

#bucket_list_path, #bucket_properties_path, #counter_path, #index_eq_path, #index_range_path, #key_list_path, #link_walk_path, #mapred_path, #object_path, #ping_path, #solr_select_path, #solr_update_path, #stats_path

Methods included from ObjectMethods

#load_object, #reload_headers, #store_headers

Methods included from TransportMethods

#basic_auth_header, #client_id, #default_headers, #delete, #get, #head, #path, #perform, #post, #put, #return_body?, #root_uri, #valid_response?, #verify_body!

Methods included from FeatureDetection

#get_server_version, #http_props_clearable?, #index_pagination?, #index_return_terms?, #index_streaming?, #key_object_bucket_timeouts?, #mapred_phaseless?, #pb_conditionals?, #pb_head?, #pb_indexes?, #pb_search?, #quorum_controls?, #server_version, #tombstone_vclocks?

Methods included from Util::Translation

#i18n_scope, #t

Methods included from Util::Escape

#escape, #maybe_escape, #maybe_unescape, #unescape

Constructor Details

- (HTTPBackend) initialize(client, node)

Create an HTTPBackend for the Riak::Client.

Parameters:

  • client (Client)

    the client

  • node (Node)

    the node we're connecting to

Raises:

  • (ArgumentError)


# File 'lib/riak/client/http_backend.rb', line 41

def initialize(client, node)
  raise ArgumentError, t("client_type", :client => client) unless Client === client
  raise ArgumentError, t("node_type", :node => node) unless Node === node
  @client = client
  @node = node
end

Instance Attribute Details

- (Object) client (readonly)

The Riak::Client that uses this backend



# File 'lib/riak/client/http_backend.rb', line 33

def client
  @client
end

- (Object) node (readonly)

The Riak::Client::Node that this backend connects to



# File 'lib/riak/client/http_backend.rb', line 36

def node
  @node
end

Instance Method Details

- (true, false) clear_bucket_props(bucket)

Note:

false will be returned if the operation is not supported on the connected node

Clears bucket properties

Parameters:

  • bucket (Bucket, String)

    the bucket to clear properties on

Returns:

  • (true, false)

    whether the operation succeeded



# File 'lib/riak/client/http_backend.rb', line 176

def clear_bucket_props(bucket)
  if http_props_clearable?
    bucket = bucket.name if Bucket === bucket
    delete(204, bucket_properties_path(bucket))
    true
  else
    false
  end
end

- (Object) delete_object(bucket, key, options = {})

Deletes an object

Parameters:

  • bucket (Bucket, String)

    the bucket where the object lives

  • key (String)

    the key where the object lives

  • options (Hash) (defaults to: {})

    quorum and delete options



# File 'lib/riak/client/http_backend.rb', line 113

def delete_object(bucket, key, options={})
  bucket = bucket.name if Bucket === bucket
  vclock = options.delete(:vclock)
  headers = vclock ? {"X-Riak-VClock" => vclock} : {}
  delete([204, 404], object_path(bucket, key, options), headers)
end
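
How the `:vclock` option is separated from the remaining options can be illustrated in isolation. The helper below is hypothetical (`split_delete_options` is not part of the library); it mirrors the two lines of the method that pull `:vclock` out of `options` and turn it into an `X-Riak-VClock` header. Because `Hash#delete` mutates its receiver, the sketch works on a copy.

```ruby
# Hypothetical helper: returns [headers, remaining_options].
def split_delete_options(options)
  remaining = options.dup              # avoid mutating the caller's hash
  vclock = remaining.delete(:vclock)   # nil when no vclock was supplied
  headers = vclock ? { "X-Riak-VClock" => vclock } : {}
  [headers, remaining]
end

headers, remaining    = split_delete_options({ :vclock => "a85hYGBg", :rw => 2 })
no_headers, untouched = split_delete_options({ :rw => 2 })
```

The method shown above calls `options.delete` directly, so a hash passed to `delete_object` loses its `:vclock` entry as a side effect.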

- (RObject) fetch_object(bucket, key, options = {})

Fetches an object by bucket/key

Parameters:

  • bucket (Bucket, String)

    the bucket where the object is stored

  • key (String)

    the key of the object

  • options (Hash) (defaults to: {})

    request quorums

Options Hash (options):

  • :r (Fixnum, String, Symbol)

    the read quorum for the request - how many nodes should concur on the read

  • :pr (Fixnum, String, Symbol)

    the “primary” read quorum for the request - how many primary partitions must be available

Returns:

  • (RObject)

    the fetched object



# File 'lib/riak/client/http_backend.rb', line 68

def fetch_object(bucket, key, options={})
  bucket = Bucket.new(client, bucket) if String === bucket
  method = options.delete(:head) ? :head : :get
  response = send(method, [200,300], object_path(bucket.name, key, options))
  load_object(RObject.new(bucket, key), response)
end
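
The method picks between HEAD and GET with a `:head` option and then dispatches through `send`. A minimal sketch of that dispatch, assuming a hypothetical `VerbRecorder` class whose verb helpers simply record what they were asked to do:

```ruby
# Hypothetical object with two verb helpers, standing in for the backend.
class VerbRecorder
  def get(codes, path);  [:get, codes, path];  end
  def head(codes, path); [:head, codes, path]; end

  def fetch(path, options = {})
    # :head => true selects a HEAD request; anything else falls back to GET.
    method = options.delete(:head) ? :head : :get
    send(method, [200, 300], path)
  end
end

recorder = VerbRecorder.new
full = recorder.fetch("/buckets/b/keys/k")
meta = recorder.fetch("/buckets/b/keys/k", { :head => true })
```

Removing `:head` from the hash before building the path keeps it out of the query options in the real method as well.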

- (Hash) get_bucket_props(bucket)

Fetches bucket properties

Parameters:

  • bucket (Bucket, String)

    the bucket to fetch properties for

Returns:

  • (Hash)

    bucket properties



# File 'lib/riak/client/http_backend.rb', line 155

def get_bucket_props(bucket)
  bucket = bucket.name if Bucket === bucket
  response = get(200, bucket_properties_path(bucket))
  JSON.parse(response[:body])['props']
end

- (Object) get_counter(bucket, key, options = {})

Fetches a counter

Parameters:

  • bucket (Bucket, String)

    the bucket where the counter exists

  • key (String)

    the key for the counter

  • options (Hash) (defaults to: {})

    request options, passed through to #counter_path



# File 'lib/riak/client/http_backend.rb', line 124

def get_counter(bucket, key, options={})
  bucket = bucket.name if bucket.is_a? Bucket
  response = get([200, 404], counter_path(bucket, key, options))
  case response[:code]
  when 200
    return response[:body].to_i
  when 404
    return 0
  end
end
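
The status-code handling is the interesting part: a 404 is treated as a counter whose value is zero rather than as an error. A small sketch, using a hypothetical `counter_value` helper and hand-built response hashes:

```ruby
# Hypothetical helper: turns a response hash into a counter value.
def counter_value(response)
  case response[:code]
  when 200 then response[:body].to_i  # body carries the value as text
  when 404 then 0                     # a missing counter reads as zero
  end
end

present = counter_value({ :code => 200, :body => "42" })
missing = counter_value({ :code => 404, :body => "not found" })
```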

- (Array<String>) get_index(bucket, index, query, options = {})

Performs a secondary-index query.

Parameters:

  • bucket (String, Bucket)

    the bucket to query

  • index (String)

    the index to query

  • query (String, Integer, Range)

    the equality query or range query to perform

Returns:

  • (Array<String>)

    a list of keys matching the query



# File 'lib/riak/client/http_backend.rb', line 278

def get_index(bucket, index, query, options={})
  bucket = bucket.name if Bucket === bucket
  path = case query
         when Range
           raise ArgumentError, t('invalid_index_query', :value => query.inspect) unless String === query.begin || Integer === query.end
           index_range_path(bucket, index, query.begin, query.end, options)
         when String, Integer
           index_eq_path(bucket, index, query, options)
         else
           raise ArgumentError, t('invalid_index_query', :value => query.inspect)
         end
  if block_given?
    parser = Riak::Util::Multipart::StreamParser.new do |response|
      result = JSON.parse response[:body]

      yield result['keys'] || result['results'] || []
    end
    get(200, path, &parser)
  else
    response = get(200, path)
    Riak::IndexCollection.new_from_json response[:body]
  end
end
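
The `case` on the query type decides between an equality lookup and a range lookup. The sketch below reproduces only that dispatch; `index_path` is a hypothetical helper and the URL layout it produces is an assumption made for illustration, not the output of `index_eq_path` or `index_range_path`.

```ruby
# Hypothetical path builder; the URL layout is assumed for illustration only.
def index_path(bucket, index, query)
  case query
  when Range
    "/buckets/#{bucket}/index/#{index}/#{query.begin}/#{query.end}"
  when String, Integer
    "/buckets/#{bucket}/index/#{index}/#{query}"
  else
    raise ArgumentError, "invalid index query: #{query.inspect}"
  end
end

range_path = index_path("users", "age_int", 18..30)
eq_path    = index_path("users", "email_bin", "a@example.com")
```

Any other query type, such as a Float or an Array, falls through to the `else` branch and raises `ArgumentError`.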

- (Array<Array<RObject>>) link_walk(robject, walk_specs)

Performs a link-walking query

Parameters:

  • robject (RObject)

    the object to start at

  • walk_specs (Array<WalkSpec>)

    a list of walk specifications to process

Returns:

  • (Array<Array<RObject>>)

    a list of the matched objects, grouped by phase



# File 'lib/riak/client/http_backend.rb', line 256

def link_walk(robject, walk_specs)
  response = get(200, link_walk_path(robject.bucket.name, robject.key, walk_specs))
  if boundary = Util::Multipart.extract_boundary(response[:headers]['content-type'].first)
    Util::Multipart.parse(response[:body], boundary).map do |group|
      group.map do |obj|
        if obj[:headers] && !obj[:headers]['x-riak-deleted'] && !obj[:body].blank? && obj[:headers]['location']
          link = Riak::Link.new(obj[:headers]['location'].first, "")
          load_object(RObject.new(client.bucket(link.bucket), link.key), obj)
        end
      end.compact
    end
  else
    []
  end
end

- (Array<String>) list_buckets(options = {}, &block)

Lists known buckets

Returns:

  • (Array<String>)

    the list of bucket names, if no block was given



# File 'lib/riak/client/http_backend.rb', line 207

def list_buckets(options = {}, &block)
  if block_given?
    get(200, bucket_list_path(options.merge(stream: true)), &BucketStreamer.new(block))
    return
  end

  response = get(200, bucket_list_path)
  JSON.parse(response[:body])['buckets']
end

- (Array<String>) list_keys(bucket, options = {}) {|Array<String>| ... }

List keys in a bucket

Parameters:

  • bucket (Bucket, String)

    the bucket to fetch the keys for

Yields:

  • (Array<String>)

    a list of keys from the current streamed chunk

Returns:

  • (Array<String>)

    the list of keys, if no block was given



# File 'lib/riak/client/http_backend.rb', line 192

def list_keys(bucket, options={}, &block)
  bucket = bucket.name if Bucket === bucket
  if block_given?
    stream_opts = options.merge keys: 'stream'
    get(200, key_list_path(bucket, stream_opts), {}, &KeyStreamer.new(block))
  else
    list_opts = options.merge keys: true
    response = get(200, key_list_path(bucket, list_opts))
    obj = JSON.parse(response[:body])
    obj && obj['keys'].map {|k| unescape(k) }
  end
end
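
The two calling styles, streaming with a block or collecting without one, follow a common Ruby pattern built on `block_given?`. A minimal sketch, assuming a hypothetical `each_or_all_keys` method whose `chunks` argument stands in for the chunks of a streamed response:

```ruby
# Hypothetical method: chunks is an array of key arrays, one per streamed chunk.
def each_or_all_keys(chunks)
  if block_given?
    chunks.each { |chunk| yield chunk }  # hand over one chunk at a time
    nil                                  # nothing is accumulated in this sketch
  else
    chunks.flatten                       # collect everything before returning
  end
end

chunks   = [["a", "b"], ["c"]]
all_keys = each_or_all_keys(chunks)
streamed = []
each_or_all_keys(chunks) { |keys| streamed << keys }
```

The block form keeps memory use bounded by the chunk size, which matters for buckets with many keys.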

- (Array<Object>) mapred(mr) {|Fixnum, Object| ... }

Performs a MapReduce query.

Parameters:

  • mr (MapReduce)

    the query to perform

Yields:

  • (Fixnum, Object)

    the phase number and single result from the phase

Returns:

  • (Array<Object>)

    the list of results, if no block was given

Raises:

  • (MapReduceError)



# File 'lib/riak/client/http_backend.rb', line 223

def mapred(mr)
  raise MapReduceError.new(t("empty_map_reduce_query")) if mr.query.empty? && !mapred_phaseless?
  if block_given?
    parser = Riak::Util::Multipart::StreamParser.new do |response|
      result = JSON.parse(response[:body])
      yield result['phase'], result['data']
    end
    post(200, mapred_path({:chunked => true}), mr.to_json, {"Content-Type" => "application/json", "Accept" => "application/json"}, &parser)
    nil
  else
    results = MapReduce::Results.new(mr)
    parser = Riak::Util::Multipart::StreamParser.new do |response|
      result = JSON.parse(response[:body])
      results.add result['phase'], result['data']
    end
    post(200, mapred_path({:chunked => true}), mr.to_json, {"Content-Type" => "application/json", "Accept" => "application/json"}, &parser)
    results.report
  end
end
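
Each streamed chunk carries a `phase` number and a `data` payload, and chunks from different phases may interleave. The sketch below groups such chunks by phase. It does not reproduce `MapReduce::Results`, whose internals are not shown here; the chunk bodies are made up for the example.

```ruby
require 'json'

# Hypothetical chunk bodies, shaped like the 'phase'/'data' pairs the method reads.
chunks = [
  '{"phase":0,"data":[1,2]}',
  '{"phase":1,"data":[3]}',
  '{"phase":0,"data":[4]}'
]

# Group data by phase number, roughly what a results collector has to do.
by_phase = Hash.new { |hash, phase| hash[phase] = [] }
chunks.each do |body|
  result = JSON.parse(body)
  by_phase[result['phase']].concat(result['data'])
end
```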

- (true, false) ping

Pings the server

Returns:

  • (true, false)

    whether the server is available



# File 'lib/riak/client/http_backend.rb', line 50

def ping
  get(200, ping_path)
  true
rescue
  false
end
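
The bare `rescue` is what turns any request failure into `false`. In Ruby a bare `rescue` catches `StandardError` and its subclasses only. A minimal sketch with a hypothetical `reachable?` helper that takes the request as a block:

```ruby
# Hypothetical helper: runs the block and reports whether it raised.
def reachable?
  yield
  true
rescue          # bare rescue: StandardError and its subclasses only
  false
end

up   = reachable? { :pong }
down = reachable? { raise IOError, "connection refused" }
```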

- (Object) post_counter(bucket, key, amount, options = {})

Updates a counter

Parameters:

  • bucket (Bucket, String)

    the bucket where the counter exists

  • key (String)

    the key for the counter

  • amount (Integer)

    the amount to increment the counter by

  • options (Hash) (defaults to: {})

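    request options, passed through to #counter_path; :return_value is also checked when the server answers 204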



# File 'lib/riak/client/http_backend.rb', line 140

def post_counter(bucket, key, amount, options={})
  bucket = bucket.name if bucket.is_a? Bucket
  response = post([200, 204], counter_path(bucket, key, options), amount.to_s)
  case response[:code]
  when 200
    return response[:body].to_i
  when 204
    return 0 if options[:return_value]
    return nil
  end
end

- (Object) reload_object(robject, options = {})

Reloads the data for a given RObject, a special case of #fetch_object.



# File 'lib/riak/client/http_backend.rb', line 76

def reload_object(robject, options={})
  response = get([200,300,304], object_path(robject.bucket.name, robject.key, options), reload_headers(robject))
  if response[:code].to_i == 304
    robject
  else
    load_object(robject, response)
  end
end
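
The 304 branch is what makes a reload cheap: when the server reports Not Modified, the existing object is returned as-is. A sketch of that decision, using a hypothetical `apply_reload` helper and plain hashes in place of RObject and the response:

```ruby
# Hypothetical helper; the hash fields are illustrative, not the library's.
def apply_reload(cached, response)
  # 304 Not Modified: the cached copy is still current, return it untouched.
  return cached if response[:code].to_i == 304
  cached.merge(:data => response[:body])
end

cached    = { :key => "k", :data => "old" }
unchanged = apply_reload(cached, { :code => "304" })
refreshed = apply_reload(cached, { :code => 200, :body => "new" })
```

The `to_i` call means the comparison works whether the status code arrives as an Integer or a String.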

- (Object) search(index, query, options = {})

(Riak Search) Performs a search query

Parameters:

  • index (String, nil)

    the index to query, or nil for the default

  • query (String)

    the Lucene query to perform

  • options (Hash) (defaults to: {})

    query options




# File 'lib/riak/client/http_backend.rb', line 308

def search(index, query, options={})
  response = get(200, solr_select_path(index, query, options.stringify_keys))
  if response[:headers]['content-type'].include?("application/json")
    normalize_search_response JSON.parse(response[:body])
  else
    response[:body]
  end
end

- (Object) set_bucket_props(bucket, props)

Sets bucket properties

Parameters:

  • bucket (Bucket, String)

    the bucket to set properties on

  • props (Hash)

    the properties to set



# File 'lib/riak/client/http_backend.rb', line 164

def set_bucket_props(bucket, props)
  bucket = bucket.name if Bucket === bucket
  body = {'props' => props}.to_json
  put(204, bucket_properties_path(bucket), body, {"Content-Type" => "application/json"})
end
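
The request body wraps the properties under a top-level `props` key, which is the same key that `get_bucket_props` reads back. A short sketch of that round trip using only the standard `json` library; the property names are examples, not a list of supported properties:

```ruby
require 'json'

props = { 'n_val' => 3, 'allow_mult' => true }

# Wrap under the 'props' key, as set_bucket_props does before sending.
body = { 'props' => props }.to_json

# Unwrap the same way get_bucket_props does when parsing a response.
round_trip = JSON.parse(body)['props']
```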

- (Hash) stats

Gets health statistics

Returns:

  • (Hash)

    information about the server, including stats



# File 'lib/riak/client/http_backend.rb', line 245

def stats
  response = get(200, stats_path)
  JSON.parse(response[:body])
end

- (Object) store_object(robject, options = {})

Stores an object

Parameters:

  • robject (RObject)

    the object to store

  • options (Hash) (defaults to: {})

    quorum and storage options

Options Hash (options):

  • :returnbody (true, false) — default: false

    whether to update the object after write with the new value

  • :w (Fixnum, String, Symbol)

    the write quorum

  • :pw (Fixnum, String, Symbol)

    the “primary” write quorum - how many primary partitions must be available

  • :dw (Fixnum, String, Symbol)

    the durable write quorum



# File 'lib/riak/client/http_backend.rb', line 94

def store_object(robject, options={})
  method, codes = if robject.key.present?
                    [:put, [200,204,300]]
                  else
                    [:post, 201]
                  end
  response = send(method, codes, object_path(robject.bucket.name, robject.key, options), robject.raw_data, store_headers(robject))
  load_object(robject, response) if options[:returnbody]
end
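
Whether the object already has a key decides both the HTTP verb and the status codes that count as success. The sketch below isolates that choice in a hypothetical `store_request` helper. It approximates `present?` with a nil-or-empty check, which is an assumption: the real predicate may also treat whitespace-only strings as blank.

```ruby
# Hypothetical helper: picks the verb and acceptable status codes for a store.
def store_request(key)
  if key.nil? || key.empty?
    [:post, [201]]           # no key yet: let the server assign one
  else
    [:put, [200, 204, 300]]  # known key: write to that location
  end
end

with_key    = store_request("user-1")
without_key = store_request(nil)
```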

- (Object) update_search_index(index, updates)

(Riak Search) Updates a search index (includes deletes).

Parameters:

  • index (String, nil)

    the index to update, or nil for the default index.

  • updates (String)

    an XML update string in Solr's required format




# File 'lib/riak/client/http_backend.rb', line 322

def update_search_index(index, updates)
  post(200, solr_update_path(index), updates, {'Content-Type' => 'text/xml'})
end