Module: Net::HTTP::Pipeline

Included in:
Net::HTTP
Defined in:
lib/net/http/pipeline.rb

Overview

An HTTP/1.1 pipelining implementation atop Net::HTTP. This library is compliant with RFC 2616, section 8.1.2.2.

Pipeline lets you build a list of requests and send them all to an HTTP/1.1 server without waiting for each response. The server returns the responses in the same order the requests were sent.

Net::HTTP::Pipeline does not assume the server supports pipelining. If you know the server supports pipelining, you can set Net::HTTP#pipelining to true.

Example

require 'net/http/pipeline'

Net::HTTP.start 'localhost' do |http|
  requests = []
  requests << Net::HTTP::Get.new('/')
  requests << Net::HTTP::Get.new('/')
  requests << Net::HTTP::Get.new('/')

  http.pipeline requests do |res|
    puts res.code
    puts res.body[0..60].inspect
    puts
  end
end

Defined Under Namespace

Classes: Error, PersistenceError, PipelineError, ResponseError, VersionError

Constant Summary

VERSION =

The version of net-http-pipeline you are using

'1.0.1'

Instance Attribute Details

#pipelining ⇒ Object

Pipelining capability accessor.

By default, Pipeline assumes that servers do not support pipelining. The first request is therefore sent on its own, without pipelining, so that Pipeline can confirm the server is HTTP/1.1 or newer and defaults to persistent connections.

If you already know the server is HTTP/1.1 and defaults to persistent connections, you can set this to true when you create the Net::HTTP object.



# File 'lib/net/http/pipeline.rb', line 141

def pipelining
  @pipelining
end
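
As a minimal sketch of the paragraph above: when the server is already known to speak HTTP/1.1 with persistent connections, the accessor can be set before the connection is started, so the probing request is skipped. The helper name pipelined_codes and its arguments are hypothetical, and the sketch assumes the net-http-pipeline gem is installed.

```ruby
require 'net/http'

# Hypothetical helper: fetches +paths+ from +host+ over one pipelined
# connection and returns the response codes in request order.
def pipelined_codes(host, paths)
  # Loaded lazily so this file still loads where the gem is not installed.
  require 'net/http/pipeline'

  http = Net::HTTP.new host
  http.pipelining = true # we assert the server can pipeline, so no probe

  http.start do
    requests = paths.map { |path| Net::HTTP::Get.new path }
    http.pipeline(requests).map { |res| res.code }
  end
end
```

A call such as pipelined_codes('localhost', ['/', '/about']) would need a reachable HTTP/1.1 server. If the assumption about the server is wrong, expect the errors listed under "Defined Under Namespace" rather than a silent fallback.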

Instance Method Details

#idempotent?(req) ⇒ Boolean

Is req idempotent according to RFC 2616?



# File 'lib/net/http/pipeline.rb', line 146

def idempotent? req
  case req
  when Net::HTTP::Delete, Net::HTTP::Get, Net::HTTP::Head,
       Net::HTTP::Options, Net::HTTP::Put, Net::HTTP::Trace then
    true
  end
end
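
To see the classification in action without loading the library, here is a stand-alone sketch using only the request classes from Net::HTTP. The names IDEMPOTENT_REQUESTS and idempotent_request? are hypothetical stand-ins. Unlike the method above, which returns nil for other request types, this version returns false; both are falsy.

```ruby
require 'net/http'

# Stand-alone copy of the class list used above, for illustration only.
IDEMPOTENT_REQUESTS = [
  Net::HTTP::Delete, Net::HTTP::Get, Net::HTTP::Head,
  Net::HTTP::Options, Net::HTTP::Put, Net::HTTP::Trace
].freeze

# True when +req+ is an instance of one of the listed request classes.
def idempotent_request?(req)
  IDEMPOTENT_REQUESTS.any? { |klass| req.is_a? klass }
end

get  = Net::HTTP::Get.new '/'
post = Net::HTTP::Post.new '/orders'

puts idempotent_request?(get)  # prints true
puts idempotent_request?(post) # prints false
```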

#pipeline(requests, &block) ⇒ Object

Pipelines requests to the HTTP server, yielding each response if a block is given. Returns all responses received.

The Net::HTTP connection must be started before calling #pipeline.

Raises an exception if the connection is not pipeline-capable or if the HTTP session has not been started.

Raises:



# File 'lib/net/http/pipeline.rb', line 163

def pipeline requests, &block # :yields: response
  responses = []

  raise Error.new('Net::HTTP not started', requests, responses) unless
    started?

  raise VersionError.new(requests, responses) if '1.1' > @curr_http_version

  pipeline_check requests, responses, &block

  retried = responses.length

  until requests.empty? do
    begin
      in_flight = pipeline_send requests

      pipeline_receive in_flight, responses, &block
    rescue Net::HTTP::Pipeline::ResponseError => e
      e.requests.reverse_each do |request|
        requests.unshift request
      end

      raise if responses.length == retried or not idempotent? requests.first

      retried = responses.length

      pipeline_reset requests, responses

      retry
    end
  end

  responses
end
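
The rescue clause above retries only when two conditions hold: at least one new response has arrived since the previous retry, and the request that would be resent first is idempotent. The following sketch restates that guard as a predicate. The helper name retry_allowed? and its parameters are hypothetical and not part of the library.

```ruby
require 'net/http'

# Hypothetical helper mirroring the guard in the rescue clause above.
# +received+   number of responses collected so far
# +last_retry+ number of responses that had been collected at the last retry
# +next_req+   the request that would be resent first
def retry_allowed?(received, last_retry, next_req)
  made_progress = received != last_retry

  safe_to_resend =
    case next_req
    when Net::HTTP::Delete, Net::HTTP::Get, Net::HTTP::Head,
         Net::HTTP::Options, Net::HTTP::Put, Net::HTTP::Trace
      true
    else
      false
    end

  made_progress && safe_to_resend
end

get  = Net::HTTP::Get.new '/'
post = Net::HTTP::Post.new '/orders'

puts retry_allowed?(3, 1, get)  # progress was made, GET is safe to resend
puts retry_allowed?(1, 1, get)  # no new responses since the last retry
puts retry_allowed?(3, 1, post) # POST must not be resent automatically
```

Requiring progress between retries keeps a connection that fails at the same point every time from looping forever.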

#pipeline_check(requests, responses) {|res| ... } ⇒ Object

Ensures the connection supports pipelining.

If the server has not yet been tested for pipelining support, the first request will be removed from requests and sent on its own, and its response placed in responses.

A VersionError will be raised if the server is not HTTP/1.1 or newer.

A PersistenceError will be raised if the server does not support persistent connections.

A PipelineError will be raised if it was previously determined that the server does not support pipelining.

Yields:

  • (res)


# File 'lib/net/http/pipeline.rb', line 212

def pipeline_check requests, responses
  if instance_variable_defined? :@pipelining then
    return if @pipelining
    raise PipelineError.new(requests, responses) unless @pipelining
  else
    @pipelining = false
  end

  req = requests.shift
  retried = false

  begin
    res = request req
  rescue Timeout::Error, EOFError, Errno::ECONNABORTED, Errno::ECONNRESET,
         Errno::EPIPE, Net::HTTPBadResponse, IOError => e
    if retried then
      requests.unshift req
      raise ResponseError.new(e, requests, responses)
    end

    retried = true

    pipeline_reset requests, responses

    retry
  end

  responses << res

  yield res if block_given?

  @pipelining = pipeline_keep_alive? res

  if '1.1' > @curr_http_version then
    @pipelining = false
    raise VersionError.new(requests, responses)
  elsif not @pipelining then
    raise PersistenceError.new(requests, responses)
  end
end
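
The checks at the end of pipeline_check can be sketched as a small classifier over the probe response. This is an illustration only: the helper name pipelining_verdict is hypothetical, it returns a symbol instead of raising, and it reads the version from the response directly, whereas the method above compares against @curr_http_version.

```ruby
require 'net/http'

# Hypothetical helper: classifies a probe response the way the final checks
# in pipeline_check do, returning a symbol instead of raising.
def pipelining_verdict(res)
  if '1.1' > res.http_version
    :version_error      # server answered with HTTP/1.0 or older
  elsif res.connection_close?
    :persistence_error  # server asked for the connection to be closed
  else
    :pipelining_ok
  end
end

# Responses built by hand so the sketch needs no network.
old_server = Net::HTTPOK.new '1.0', '200', 'OK'

closing = Net::HTTPOK.new '1.1', '200', 'OK'
closing['Connection'] = 'close'

persistent = Net::HTTPOK.new '1.1', '200', 'OK'

puts pipelining_verdict(old_server) # prints version_error
puts pipelining_verdict(closing)    # prints persistence_error
puts pipelining_verdict(persistent) # prints pipelining_ok
```

The version test is a plain string comparison, as in the original code, which is sufficient for the HTTP versions Net::HTTP reports here.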

#pipeline_end_transport(res) ⇒ Object

Updates the HTTP version and ensures the connection has keep-alive.



# File 'lib/net/http/pipeline.rb', line 256

def pipeline_end_transport res
  @curr_http_version = res.http_version

  if @socket.closed? then
    D 'Conn socket closed on pipeline'
  elsif pipeline_keep_alive? res then
    D 'Conn pipeline keep-alive'
  else
    D 'Conn close on pipeline'
    @socket.close
  end
end

#pipeline_finish ⇒ Object

Closes the connection and rescues any IOError this may cause.



# File 'lib/net/http/pipeline.rb', line 272

def pipeline_finish
  finish
rescue IOError
end

#pipeline_keep_alive?(res) ⇒ Boolean

Checks for a Connection: close header. Returns true when the response does not ask for the connection to be closed.



# File 'lib/net/http/pipeline.rb', line 281

def pipeline_keep_alive? res
  not res.connection_close?
end

#pipeline_receive(in_flight, responses) ⇒ Object

Receives HTTP responses for in_flight requests, in order, and adds them to responses.



# File 'lib/net/http/pipeline.rb', line 294

def pipeline_receive in_flight, responses
  while req = in_flight.shift do
    begin
      begin
        res = Net::HTTPResponse.read_new @socket
      end while res.kind_of? Net::HTTPContinue

      res.reading_body @socket, req.response_body_permitted? do
        responses << res
        yield res if block_given?
      end

      pipeline_end_transport res
    rescue StandardError, Timeout::Error
      in_flight.unshift req
      raise
    end
  end

  responses
rescue Timeout::Error, EOFError, Errno::ECONNABORTED, Errno::ECONNRESET,
       Errno::EPIPE, Net::HTTPBadResponse, IOError => e
  pipeline_finish

  raise ResponseError.new(e, in_flight, responses)
end
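
The receive loop can be tried without a server by replaying canned bytes. The sketch below wraps a StringIO in Net::BufferedIO, the internal socket wrapper that Net::HTTP uses, and reads responses in request order, skipping an interim 100 Continue as the loop above does. The wire data, paths and variable names are made up for illustration, and error handling is left out.

```ruby
require 'net/http'
require 'stringio'

# Responses as a server might write them back-to-back on one connection:
# an interim 100 Continue, then the answers to two GET requests.
WIRE = "HTTP/1.1 100 Continue\r\n\r\n" \
       "HTTP/1.1 200 OK\r\nContent-Length: 5\r\n\r\nhello" \
       "HTTP/1.1 404 Not Found\r\nContent-Length: 5\r\n\r\nnope!"

# Net::BufferedIO is internal to Net::HTTP; wrapping a StringIO lets the
# bytes be replayed without a network connection.
socket = Net::BufferedIO.new StringIO.new(WIRE)

in_flight = [Net::HTTP::Get.new('/a'), Net::HTTP::Get.new('/b')]
received  = []

while (req = in_flight.shift)
  res = nil

  # Interim 100 Continue responses are read and discarded.
  loop do
    res = Net::HTTPResponse.read_new socket
    break unless res.kind_of? Net::HTTPContinue
  end

  # The body must be consumed before the next response can be parsed.
  res.reading_body socket, req.response_body_permitted? do
    received << [req.path, res.code, res.body]
  end
end

received.each { |path, code, body| puts "#{path} #{code} #{body}" }
```

Because responses carry no request identifier, the only way to match them to requests is by position, which is why the loop shifts one request per response.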

#pipeline_reset(requests, responses) ⇒ Object

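Resets this connection by finishing it and starting it again. Raises an Error if the connection is refused or the host is down.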



# File 'lib/net/http/pipeline.rb', line 324

def pipeline_reset requests, responses
  pipeline_finish

  start
rescue Errno::ECONNREFUSED
  raise Error.new("connection refused: #{address}:#{port}", requests,
                  responses)
rescue Errno::EHOSTDOWN
  raise Error.new("host down: #{address}:#{port}", requests, responses)
end

#pipeline_send(requests) ⇒ Object

Sends requests to the HTTP server and removes them from the requests list. Returns the requests that have been pipelined and are in-flight.

If a non-idempotent request is first in requests it will be sent and no further requests will be pipelined.

If a non-idempotent request is encountered after an idempotent request it will not be sent.



# File 'lib/net/http/pipeline.rb', line 345

def pipeline_send requests
  in_flight = []

  while req = requests.shift do
    idempotent = idempotent? req

    unless idempotent or in_flight.empty? then
      requests.unshift req
      break
    end

    begin_transport req
    req.exec @socket, @curr_http_version, edit_path(req.path)
    in_flight << req

    break unless idempotent
  end

  in_flight
end
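
The batching rules described above can be sketched without any network I/O. The helper next_batch and the constant IDEMPOTENT are hypothetical names; the helper only removes requests from the queue and returns them, where the real method also writes each request to the socket.

```ruby
require 'net/http'

IDEMPOTENT = [
  Net::HTTP::Delete, Net::HTTP::Get, Net::HTTP::Head,
  Net::HTTP::Options, Net::HTTP::Put, Net::HTTP::Trace
].freeze

# Hypothetical helper: removes the next pipelinable run of requests from
# +queue+ and returns it, without sending anything.
def next_batch(queue)
  batch = []

  while (req = queue.shift)
    idempotent = IDEMPOTENT.any? { |klass| req.is_a? klass }

    # A non-idempotent request never joins a batch that has already started.
    unless idempotent || batch.empty?
      queue.unshift req
      break
    end

    batch << req

    # A non-idempotent request at the head of the queue travels alone.
    break unless idempotent
  end

  batch
end

queue = [
  Net::HTTP::Get.new('/a'),
  Net::HTTP::Get.new('/b'),
  Net::HTTP::Post.new('/c'),
  Net::HTTP::Get.new('/d')
]

batches = []
batches << next_batch(queue).map(&:path) until queue.empty?

p batches # prints [["/a", "/b"], ["/c"], ["/d"]]
```

Sending a non-idempotent request alone means a dropped connection can never leave it unclear whether that request was applied once or twice.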