Class: Riak::Client::HTTPBackend

Inherits:
Object
  • Object
Includes:
FeatureDetection, Configuration, ObjectMethods, TransportMethods, Util::Escape, Util::Translation
Defined in:
lib/riak/client/http_backend.rb,
lib/riak/client/http_backend/key_streamer.rb,
lib/riak/client/http_backend/configuration.rb,
lib/riak/client/http_backend/object_methods.rb,
lib/riak/client/http_backend/bucket_streamer.rb,
lib/riak/client/http_backend/request_headers.rb,
lib/riak/client/http_backend/transport_methods.rb,
lib/riak/client/http_backend/chunked_json_streamer.rb

Overview

The parent class for all backends that connect to Riak via HTTP. This class implements all of the universal backend API methods on behalf of subclasses, which need only implement the TransportMethods#perform method for library-specific semantics.
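
The division of labour described above can be sketched in plain Ruby. This is a minimal, self-contained illustration and not the library's code: SketchBackend, CannedBackend and FailingBackend are hypothetical stand-ins, and the perform(method, path) signature is simplified for the example. The parent implements an API method (ping) on top of a verb helper, and each subclass supplies only perform.

```ruby
# Hypothetical stand-in for the parent class: it owns the API methods
# and the verb helpers, and leaves the actual transport to subclasses.
class SketchBackend
  # API method implemented once, for every transport.
  def ping
    get(200, "/ping")
    true
  rescue StandardError
    false
  end

  private

  # Verb helper: delegates to #perform and checks the status code.
  def get(expected, path)
    response = perform(:get, path)
    unless Array(expected).include?(response[:code])
      raise "unexpected status #{response[:code]}"
    end
    response
  end

  # The only method a transport-specific subclass has to provide.
  def perform(_method, _path)
    raise NotImplementedError, "subclasses must implement #perform"
  end
end

# Hypothetical subclass that always answers 200 without any network I/O.
class CannedBackend < SketchBackend
  private

  def perform(_method, _path)
    { code: 200, headers: {}, body: "OK" }
  end
end

# Hypothetical subclass that always answers 500.
class FailingBackend < SketchBackend
  private

  def perform(_method, _path)
    { code: 500, headers: {}, body: "" }
  end
end
```

With these stand-ins, CannedBackend.new.ping returns true and FailingBackend.new.ping returns false, because the unexpected status raised inside get is rescued by ping.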

Direct Known Subclasses

ExconBackend, NetHTTPBackend

Defined Under Namespace

Modules: Configuration, ObjectMethods, TransportMethods

Classes: BucketStreamer, ChunkedJsonStreamer, KeyStreamer, RequestHeaders

Constant Summary

Constants included from FeatureDetection

FeatureDetection::VERSION

Instance Attribute Summary

Instance Method Summary

Methods included from Configuration

#bucket_list_path, #bucket_properties_path, #counter_path, #index_eq_path, #index_range_path, #key_list_path, #link_walk_path, #mapred_path, #object_path, #ping_path, #solr_select_path, #solr_update_path, #stats_path

Methods included from ObjectMethods

#load_object, #reload_headers, #store_headers

Methods included from TransportMethods

#basic_auth_header, #client_id, #default_headers, #delete, #get, #head, #path, #perform, #post, #put, #return_body?, #root_uri, #valid_response?, #verify_body!

Methods included from FeatureDetection

#get_server_version, #http_props_clearable?, #index_pagination?, #index_return_terms?, #index_streaming?, #key_object_bucket_timeouts?, #mapred_phaseless?, #pb_conditionals?, #pb_head?, #pb_indexes?, #pb_search?, #quorum_controls?, #server_version, #tombstone_vclocks?

Methods included from Util::Translation

#i18n_scope, #t

Methods included from Util::Escape

#escape, #maybe_escape, #maybe_unescape, #unescape

Constructor Details

- (HTTPBackend) initialize(client, node)

Create an HTTPBackend for the Riak::Client.

Raises:

  • (ArgumentError)


# File 'lib/riak/client/http_backend.rb', line 41

def initialize(client, node)
  raise ArgumentError, t("client_type", :client => client) unless Client === client
  raise ArgumentError, t("node_type", :node => node) unless Node === node
  @client = client
  @node = node
end

Instance Attribute Details

- (Object) client (readonly)

The Riak::Client that uses this backend



# File 'lib/riak/client/http_backend.rb', line 33

def client
  @client
end

- (Object) node (readonly)

The Riak::Client::Node that this backend connects to



# File 'lib/riak/client/http_backend.rb', line 36

def node
  @node
end

Instance Method Details

- (true, false) clear_bucket_props(bucket)

Note:

Returns false if the connected node does not support clearing bucket properties.

Clears bucket properties



# File 'lib/riak/client/http_backend.rb', line 176

def clear_bucket_props(bucket)
  if http_props_clearable?
    bucket = bucket.name if Bucket === bucket
    delete(204, bucket_properties_path(bucket))
    true
  else
    false
  end
end

- (Object) delete_object(bucket, key, options = {})

Deletes an object



# File 'lib/riak/client/http_backend.rb', line 113

def delete_object(bucket, key, options={})
  bucket = bucket.name if Bucket === bucket
  vclock = options.delete(:vclock)
  headers = vclock ? {"X-Riak-VClock" => vclock} : {}
  delete([204, 404], object_path(bucket, key, options), headers)
end
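
As the listing shows, :vclock is pulled out of options and sent as an X-Riak-VClock header, while the remaining options are passed on to object_path. The sketch below isolates that step. split_delete_options is a hypothetical helper, not part of the library, and unlike the listing it copies the hash first so the caller's options are left untouched.

```ruby
# Hypothetical helper: separates the vector clock from the other
# delete options and turns it into a request header.
def split_delete_options(options)
  remaining = options.dup                 # do not mutate the caller's hash
  vclock = remaining.delete(:vclock)      # nil when no vclock was given
  headers = vclock ? { "X-Riak-VClock" => vclock } : {}
  [headers, remaining]
end

headers, remaining = split_delete_options({ vclock: "abc123", rw: 2 })
puts headers.inspect
puts remaining.inspect
```

Because delete_object in the listing calls options.delete directly, a hash passed to it loses its :vclock entry; callers that reuse the hash may want to pass a copy.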

- (RObject) fetch_object(bucket, key, options = {})

Fetches an object by bucket/key

Options Hash (options):

  • :r (Fixnum, String, Symbol)

    the read quorum for the request - how many nodes should concur on the read

  • :pr (Fixnum, String, Symbol)

    the “primary” read quorum for the request - how many primary partitions must be available



# File 'lib/riak/client/http_backend.rb', line 68

def fetch_object(bucket, key, options={})
  bucket = Bucket.new(client, bucket) if String === bucket
  method = options.delete(:head) ? :head : :get
  response = send(method, [200,300], object_path(bucket.name, key, options))
  load_object(RObject.new(bucket, key), response)
end

- (Hash) get_bucket_props(bucket)

Fetches bucket properties



# File 'lib/riak/client/http_backend.rb', line 155

def get_bucket_props(bucket)
  bucket = bucket.name if Bucket === bucket
  response = get(200, bucket_properties_path(bucket))
  JSON.parse(response[:body])['props']
end

- (Object) get_counter(bucket, key, options = {})

Fetches a counter



# File 'lib/riak/client/http_backend.rb', line 124

def get_counter(bucket, key, options={})
  bucket = bucket.name if bucket.is_a? Bucket
  response = get([200, 404], counter_path(bucket, key, options))
  case response[:code]
  when 200
    return response[:body].to_i
  when 404
    return 0
  end
end
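
The status handling in get_counter treats a missing counter as zero rather than as an error. A minimal sketch of that mapping follows. counter_value is a hypothetical name, and the response is assumed to be a hash with :code and :body keys, as in the listing.

```ruby
# Hypothetical helper: maps a counter response to an Integer.
# A 404 (counter never written) is reported as 0.
def counter_value(response)
  case response[:code]
  when 200 then response[:body].to_i
  when 404 then 0
  end
end

puts counter_value({ code: 200, body: "42" })
puts counter_value({ code: 404, body: "not found" })
```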

- (Array<String>) get_index(bucket, index, query, options = {})

Performs a secondary-index query.



# File 'lib/riak/client/http_backend.rb', line 278

def get_index(bucket, index, query, options={})
  bucket = bucket.name if Bucket === bucket
  path = case query
         when Range
           raise ArgumentError, t('invalid_index_query', :value => query.inspect) unless String === query.begin || Integer === query.end
           index_range_path(bucket, index, query.begin, query.end, options)
         when String, Integer
           index_eq_path(bucket, index, query, options)
         else
           raise ArgumentError, t('invalid_index_query', :value => query.inspect)
         end
  if block_given?
    parser = Riak::Util::Multipart::StreamParser.new do |response|
      result = JSON.parse response[:body]

      yield result['keys'] || result['results'] || []
    end
    get(200, path, &parser)
  else
    response = get(200, path)
    Riak::IndexCollection.new_from_json response[:body]
  end
end
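
The query argument selects between an equality lookup and a range lookup based on its class. The sketch below shows only that dispatch, under simplified assumptions: index_query_kind is a hypothetical helper, it returns a tagged array instead of building a URL path, and it does not reproduce the listing's type check on the range bounds.

```ruby
# Hypothetical helper: classifies a secondary-index query the way the
# case statement in get_index does.
def index_query_kind(query)
  case query
  when Range
    [:range, query.begin, query.end]
  when String, Integer
    [:eq, query]
  else
    raise ArgumentError, "invalid index query: #{query.inspect}"
  end
end

puts index_query_kind(1..10).inspect
puts index_query_kind("alice").inspect
```

Any other argument, such as a Symbol or an Array, raises ArgumentError, which matches the else branch of the listing.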

- (Array<Array<RObject>>) link_walk(robject, walk_specs)

Performs a link-walking query



# File 'lib/riak/client/http_backend.rb', line 256

def link_walk(robject, walk_specs)
  response = get(200, link_walk_path(robject.bucket.name, robject.key, walk_specs))
  if boundary = Util::Multipart.extract_boundary(response[:headers]['content-type'].first)
    Util::Multipart.parse(response[:body], boundary).map do |group|
      group.map do |obj|
        if obj[:headers] && !obj[:headers]['x-riak-deleted'] && !obj[:body].blank? && obj[:headers]['location']
          link = Riak::Link.new(obj[:headers]['location'].first, "")
          load_object(RObject.new(client.bucket(link.bucket), link.key), obj)
        end
      end.compact
    end
  else
    []
  end
end

- (Array<String>) list_buckets(options = {}, &block)

Lists known buckets



# File 'lib/riak/client/http_backend.rb', line 207

def list_buckets(options = {}, &block)
  if block_given?
    get(200, bucket_list_path(options.merge(stream: true)), &BucketStreamer.new(block))
    return
  end

  response = get(200, bucket_list_path)
  JSON.parse(response[:body])['buckets']
end

- (Array<String>) list_keys(bucket, options = {}) {|Array<String>| ... }

List keys in a bucket

Yields:

  • (Array<String>)

    a list of keys from the current streamed chunk



# File 'lib/riak/client/http_backend.rb', line 192

def list_keys(bucket, options={}, &block)
  bucket = bucket.name if Bucket === bucket
  if block_given?
    stream_opts = options.merge keys: 'stream'
    get(200, key_list_path(bucket, stream_opts), {}, &KeyStreamer.new(block))
  else
    list_opts = options.merge keys: true
    response = get(200, key_list_path(bucket, list_opts))
    obj = JSON.parse(response[:body])
    obj && obj['keys'].map {|k| unescape(k) }
  end
end
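
list_keys has two calling conventions: with a block, keys arrive one chunk at a time; without a block, the full list is returned. The sketch below imitates that contract without contacting Riak. list_keys_sketch is a hypothetical function that takes already-parsed chunks as its argument, and its nil return value in the block form is an assumption of the sketch, not a statement about the library.

```ruby
# Hypothetical stand-in for the two calling conventions of list_keys.
# `chunks` plays the role of the streamed response: an array of key arrays.
def list_keys_sketch(chunks)
  if block_given?
    chunks.each { |keys| yield keys }   # hand over each chunk as it "arrives"
    nil
  else
    chunks.flatten                      # collect everything before returning
  end
end

chunks = [%w[a b], %w[c]]

# Streaming form: the block sees one chunk per call.
list_keys_sketch(chunks) { |keys| puts "chunk: #{keys.inspect}" }

# Non-streaming form: one array with every key.
puts list_keys_sketch(chunks).inspect
```

The streaming form keeps memory use bounded by the chunk size, which matters for buckets with many keys.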

- (Array<Object>) mapred(mr) {|Fixnum, Object| ... }

Performs a MapReduce query.

Yields:

  • (Fixnum, Object)

    the phase number and single result from the phase

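Raises:

  • (MapReduceError)

    if the query is empty and the connected node does not support phaseless MapReduce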


# File 'lib/riak/client/http_backend.rb', line 223

def mapred(mr)
  raise MapReduceError.new(t("empty_map_reduce_query")) if mr.query.empty? && !mapred_phaseless?
  if block_given?
    parser = Riak::Util::Multipart::StreamParser.new do |response|
      result = JSON.parse(response[:body])
      yield result['phase'], result['data']
    end
    post(200, mapred_path({:chunked => true}), mr.to_json, {"Content-Type" => "application/json", "Accept" => "application/json"}, &parser)
    nil
  else
    results = MapReduce::Results.new(mr)
    parser = Riak::Util::Multipart::StreamParser.new do |response|
      result = JSON.parse(response[:body])
      results.add result['phase'], result['data']
    end
    post(200, mapred_path({:chunked => true}), mr.to_json, {"Content-Type" => "application/json", "Accept" => "application/json"}, &parser)
    results.report
  end
end
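
In the block form, each streamed part is parsed as JSON and its phase and data fields are yielded. The sketch below reproduces that step on canned input. each_phase_result is a hypothetical helper, and the JSON bodies are made-up examples of the phase/data shape that the listing reads.

```ruby
require "json"

# Hypothetical helper: parses each chunk body and yields the phase
# number together with that phase's data, as the mapred block form does.
def each_phase_result(bodies)
  bodies.each do |body|
    result = JSON.parse(body)
    yield result["phase"], result["data"]
  end
end

bodies = ['{"phase":0,"data":[1,2]}', '{"phase":1,"data":[3]}']
each_phase_result(bodies) do |phase, data|
  puts "phase #{phase}: #{data.inspect}"
end
```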

- (true, false) ping

Pings the server



# File 'lib/riak/client/http_backend.rb', line 50

def ping
  get(200, ping_path)
  true
rescue
  false
end

- (Object) post_counter(bucket, key, amount, options = {})

Updates a counter



# File 'lib/riak/client/http_backend.rb', line 140

def post_counter(bucket, key, amount, options={})
  bucket = bucket.name if bucket.is_a? Bucket
  response = post([200, 204], counter_path(bucket, key, options), amount.to_s)
  case response[:code]
  when 200
    return response[:body].to_i
  when 204
    return 0 if options[:return_value]
    return nil
  end
end

- (Object) reload_object(robject, options = {})

Reloads the data for a given RObject, a special case of #fetch_object.



# File 'lib/riak/client/http_backend.rb', line 76

def reload_object(robject, options={})
  response = get([200,300,304], object_path(robject.bucket.name, robject.key, options), reload_headers(robject))
  if response[:code].to_i == 304
    robject
  else
    load_object(robject, response)
  end
end

- (Object) search(index, query, options = {})

(Riak Search) Performs a search query



# File 'lib/riak/client/http_backend.rb', line 308

def search(index, query, options={})
  response = get(200, solr_select_path(index, query, options.stringify_keys))
  if response[:headers]['content-type'].include?("application/json")
    normalize_search_response JSON.parse(response[:body])
  else
    response[:body]
  end
end

- (Object) set_bucket_props(bucket, props)

Sets bucket properties



# File 'lib/riak/client/http_backend.rb', line 164

def set_bucket_props(bucket, props)
  bucket = bucket.name if Bucket === bucket
  body = {'props' => props}.to_json
  put(204, bucket_properties_path(bucket), body, {"Content-Type" => "application/json"})
end

- (Hash) stats

Gets health statistics



# File 'lib/riak/client/http_backend.rb', line 245

def stats
  response = get(200, stats_path)
  JSON.parse(response[:body])
end

- (Object) store_object(robject, options = {})

Stores an object

Options Hash (options):

  • :returnbody (true, false) — default: false

    whether to update the object after write with the new value

  • :w (Fixnum, String, Symbol)

    the write quorum

  • :pw (Fixnum, String, Symbol)

    the “primary” write quorum - how many primary partitions must be available

  • :dw (Fixnum, String, Symbol)

    the durable write quorum



# File 'lib/riak/client/http_backend.rb', line 94

def store_object(robject, options={})
  method, codes = if robject.key.present?
                    [:put, [200,204,300]]
                  else
                    [:post, 201]
                  end
  response = send(method, codes, object_path(robject.bucket.name, robject.key, options), robject.raw_data, store_headers(robject))
  load_object(robject, response) if options[:returnbody]
end
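
store_object chooses the HTTP verb from the object's key: an object with a key is written with PUT, and an object without one is created with POST so that the server can assign a key. The sketch below isolates that choice. store_request_for is a hypothetical helper, and because present? is not defined on core Ruby strings, the sketch substitutes an explicit nil/blank check as an assumption about its meaning.

```ruby
# Hypothetical helper: picks the verb and the acceptable status codes
# for a store request, based on whether the object already has a key.
def store_request_for(key)
  if key.nil? || key.to_s.strip.empty?
    [:post, 201]               # no key yet: create and let the server name it
  else
    [:put, [200, 204, 300]]    # known key: write to that location
  end
end

puts store_request_for("user-1").inspect
puts store_request_for(nil).inspect
```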

- (Object) update_search_index(index, updates)

(Riak Search) Updates a search index (includes deletes).



# File 'lib/riak/client/http_backend.rb', line 322

def update_search_index(index, updates)
  post(200, solr_update_path(index), updates, {'Content-Type' => 'text/xml'})
end