Module: Net::HTTP::Pipeline
- Included in: Net::HTTP
- Defined in: lib/net/http/pipeline.rb
Overview
An HTTP/1.1 pipelining implementation atop Net::HTTP. This library is compliant with RFC 2616, section 8.1.2.2.
Pipeline lets you build a batch of requests and send them all to an HTTP/1.1 server without waiting for each response. The server returns the responses in the order the requests were sent.
Net::HTTP::Pipeline does not assume the server supports pipelining. If you know the server supports pipelining, you can set Net::HTTP#pipelining to true.
Example
require 'net/http/pipeline'
Net::HTTP.start 'localhost' do |http|
  requests = []
  requests << Net::HTTP::Get.new('/')
  requests << Net::HTTP::Get.new('/')
  requests << Net::HTTP::Get.new('/')
  http.pipeline requests do |res|
    puts res.code
    puts res.body[0..60].inspect
    puts
  end
end
Defined Under Namespace
Classes: Error, PersistenceError, PipelineError, ResponseError, VersionError
Constant Summary
- VERSION = '1.0.1'
The version of net-http-pipeline you are using.
Instance Attribute Summary
-
#pipelining ⇒ Object
Pipelining capability accessor.
Instance Method Summary
-
#idempotent?(req) ⇒ Boolean
Is req idempotent according to RFC 2616?
-
#pipeline(requests, &block) ⇒ Object
Pipelines requests to the HTTP server, yielding responses if a block is given.
-
#pipeline_check(requests, responses) {|res| ... } ⇒ Object
Ensures the connection supports pipelining.
-
#pipeline_end_transport(res) ⇒ Object
Updates the HTTP version and ensures the connection has keep-alive.
-
#pipeline_finish ⇒ Object
Closes the connection and rescues any IOErrors this may cause.
-
#pipeline_keep_alive?(res) ⇒ Boolean
Returns true unless the response carries a Connection: close header.
-
#pipeline_receive(in_flight, responses) ⇒ Object
Receives HTTP responses for in_flight requests and adds them to responses.
-
#pipeline_reset(requests, responses) ⇒ Object
Resets this connection.
-
#pipeline_send(requests) ⇒ Object
Sends requests to the HTTP server and removes them from the requests list.
Instance Attribute Details
#pipelining ⇒ Object
Pipelining capability accessor.
Pipeline assumes servers do not support pipelining by default. The first request is not pipelined while Pipeline ensures that the server is HTTP/1.1 or newer and defaults to persistent connections.
If you know the server is HTTP/1.1 and defaults to persistent connections you can set this to true when you create the Net::HTTP object.
# File 'lib/net/http/pipeline.rb', line 141

def pipelining
  @pipelining
end
Instance Method Details
#idempotent?(req) ⇒ Boolean
Is req idempotent according to RFC 2616?
# File 'lib/net/http/pipeline.rb', line 146

def idempotent? req
  case req
  when Net::HTTP::Delete, Net::HTTP::Get, Net::HTTP::Head,
       Net::HTTP::Options, Net::HTTP::Put, Net::HTTP::Trace then
    true
  end
end
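The classification above can be tried without the gem, using only the request classes from the standard library. This is a minimal sketch, not the library's code: `IDEMPOTENT_REQUEST_CLASSES` and `idempotent_request?` are hypothetical names, and unlike `#idempotent?` (which returns nil for non-idempotent requests because its case has no else branch) the helper returns a strict boolean.

```ruby
require 'net/http'

# Hypothetical constant: the same request classes that #idempotent? matches.
IDEMPOTENT_REQUEST_CLASSES = [
  Net::HTTP::Delete, Net::HTTP::Get, Net::HTTP::Head,
  Net::HTTP::Options, Net::HTTP::Put, Net::HTTP::Trace
].freeze

# Hypothetical helper: true when req is an instance of one of the classes
# above, false otherwise.
def idempotent_request?(req)
  IDEMPOTENT_REQUEST_CLASSES.any? { |klass| req.is_a?(klass) }
end

puts idempotent_request?(Net::HTTP::Get.new('/'))   # prints "true"
puts idempotent_request?(Net::HTTP::Post.new('/'))  # prints "false"
```

Only idempotent requests are safe to resend after a dropped connection, which is why POST is excluded.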
#pipeline(requests, &block) ⇒ Object
Pipelines requests to the HTTP server, yielding responses if a block is given. Returns all responses received.
The Net::HTTP connection must be started before calling #pipeline.
Raises an exception if the connection is not pipeline-capable or if the HTTP session has not been started.
# File 'lib/net/http/pipeline.rb', line 163

def pipeline requests, &block # :yields: response
  responses = []

  raise Error.new('Net::HTTP not started', requests, responses) unless
    started?

  raise VersionError.new(requests, responses) if '1.1' > @curr_http_version

  pipeline_check requests, responses, &block

  retried = responses.length

  until requests.empty? do
    begin
      in_flight = pipeline_send requests

      pipeline_receive in_flight, responses, &block
    rescue Net::HTTP::Pipeline::ResponseError => e
      e.requests.reverse_each do |request|
        requests.unshift request
      end

      raise if responses.length == retried or not idempotent? requests.first

      retried = responses.length

      pipeline_reset requests, responses

      retry
    end
  end

  responses
end
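The retry rule in the source above is easy to miss: a failed batch is retried only if at least one response arrived since the last retry and the next unanswered request is idempotent. The sketch below simulates that rule without any networking. Everything in it is an assumption made for illustration: `DroppedConnection` stands in for ResponseError, `receive` is a fake transport that answers at most `limit` requests per connection, requests are plain symbols, and every queued request is sent at once (the real #pipeline_send is more selective).

```ruby
# Hypothetical stand-in for ResponseError: carries the unanswered requests.
class DroppedConnection < StandardError
  attr_reader :requests

  def initialize(requests)
    super('connection dropped')
    @requests = requests
  end
end

IDEMPOTENT = %i[delete get head options put trace].freeze

# Fake transport: answers up to `limit` requests, then drops the connection.
def receive(in_flight, responses, limit)
  limit.times do
    break if in_flight.empty?
    responses << "ok #{in_flight.shift}"
  end
  raise DroppedConnection.new(in_flight.dup) unless in_flight.empty?
end

# Mirrors the retry loop: requeue unanswered requests, then retry only when
# progress was made and the next request is safe to resend.
def pipeline_with_retry(requests, limit)
  responses = []
  retried = responses.length

  until requests.empty?
    begin
      in_flight = requests.shift(requests.length) # simplification: send all
      receive(in_flight, responses, limit)
    rescue DroppedConnection => e
      e.requests.reverse_each { |request| requests.unshift request }
      raise if responses.length == retried || !IDEMPOTENT.include?(requests.first)
      retried = responses.length
      retry
    end
  end

  responses
end

p pipeline_with_retry(%i[get get get get get], 2).length # prints 5
```

With a limit of 0 no progress is ever made, so the error propagates on the first failure instead of looping forever.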
#pipeline_check(requests, responses) {|res| ... } ⇒ Object
Ensures the connection supports pipelining.
If the server has not been tested for pipelining support, the first request is removed from requests, sent on its own, and its response is appended to responses.
A VersionError will be raised if the server is not HTTP/1.1 or newer.
A PersistenceError will be raised if the server does not support persistent connections.
A PipelineError will be raised if it was previously determined that the server does not support pipelining.
# File 'lib/net/http/pipeline.rb', line 212

def pipeline_check requests, responses
  if instance_variable_defined? :@pipelining then
    return if @pipelining
    raise PipelineError.new(requests, responses) unless @pipelining
  else
    @pipelining = false
  end

  req = requests.shift
  retried = false

  begin
    res = request req
  rescue Timeout::Error, EOFError, Errno::ECONNABORTED, Errno::ECONNRESET,
         Errno::EPIPE, Net::HTTPBadResponse, IOError => e
    if retried then
      requests.unshift req
      raise ResponseError.new(e, requests, responses)
    end

    retried = true

    pipeline_reset requests, responses

    retry
  end

  responses << res

  yield res if block_given?

  @pipelining = pipeline_keep_alive? res

  if '1.1' > @curr_http_version then
    @pipelining = false
    raise VersionError.new(requests, responses)
  elsif not @pipelining then
    raise PersistenceError.new(requests, responses)
  end
end
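The final branch of the check can be read as a small decision function: the version test comes first, and persistence is considered only when the version is acceptable. A minimal sketch under that reading follows. `pipelining_verdict` and its symbol return values are hypothetical; the real method raises VersionError or PersistenceError instead of returning a value.

```ruby
# Hypothetical helper mirroring the order of the checks in #pipeline_check.
# http_version is a string such as '1.0' or '1.1'; connection_close says
# whether the probe response asked to close the connection.
def pipelining_verdict(http_version, connection_close)
  # Same string comparison as the source: '1.1' > '1.0' is true.
  return :version_error if '1.1' > http_version
  return :persistence_error if connection_close

  :ok
end

p pipelining_verdict('1.1', false) # prints :ok
p pipelining_verdict('1.0', false) # prints :version_error
p pipelining_verdict('1.1', true)  # prints :persistence_error
```

Note that an HTTP/1.0 response that also closes the connection is reported as a version problem, because the version test runs first.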
#pipeline_end_transport(res) ⇒ Object
Updates the HTTP version and ensures the connection has keep-alive.
# File 'lib/net/http/pipeline.rb', line 256

def pipeline_end_transport res
  @curr_http_version = res.http_version

  if @socket.closed? then
    D 'Conn socket closed on pipeline'
  elsif pipeline_keep_alive? res then
    D 'Conn pipeline keep-alive'
  else
    D 'Conn close on pipeline'
    @socket.close
  end
end
#pipeline_finish ⇒ Object
Closes the connection and rescues any IOErrors this may cause.
# File 'lib/net/http/pipeline.rb', line 272

def pipeline_finish
  finish
rescue IOError
end
#pipeline_keep_alive?(res) ⇒ Boolean
Returns true unless the response carries a Connection: close header.
# File 'lib/net/http/pipeline.rb', line 281

def pipeline_keep_alive? res
  not res.connection_close?
end
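The header check can be exercised against hand-built response objects from the standard library. This is a sketch for illustration only: `keep_alive?` is a hypothetical top-level copy of the method above, and constructing Net::HTTPOK directly is an assumption made to avoid a network round trip (in normal use Net::HTTP builds response objects itself).

```ruby
require 'net/http'

# Hypothetical stand-alone version of #pipeline_keep_alive?.
def keep_alive?(res)
  !res.connection_close?
end

# Hand-built responses: HTTP version, status code, reason phrase.
open_res = Net::HTTPOK.new('1.1', '200', 'OK')

closing_res = Net::HTTPOK.new('1.1', '200', 'OK')
closing_res['Connection'] = 'close'

puts keep_alive?(open_res)    # prints "true"
puts keep_alive?(closing_res) # prints "false"
```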
#pipeline_receive(in_flight, responses) ⇒ Object
Receives HTTP responses for in_flight requests and adds them to responses.
# File 'lib/net/http/pipeline.rb', line 294

def pipeline_receive in_flight, responses
  while req = in_flight.shift do
    begin
      begin
        res = Net::HTTPResponse.read_new @socket
      end while res.kind_of? Net::HTTPContinue

      res.reading_body @socket, req.response_body_permitted? do
        responses << res
        yield res if block_given?
      end

      pipeline_end_transport res
    rescue StandardError, Timeout::Error
      in_flight.unshift req
      raise
    end
  end

  responses
rescue Timeout::Error, EOFError, Errno::ECONNABORTED, Errno::ECONNRESET,
       Errno::EPIPE, Net::HTTPBadResponse, IOError => e
  pipeline_finish

  raise ResponseError.new(e, in_flight, responses)
end
#pipeline_reset(requests, responses) ⇒ Object
Resets this connection.
# File 'lib/net/http/pipeline.rb', line 324

def pipeline_reset requests, responses
  pipeline_finish

  start
rescue Errno::ECONNREFUSED
  raise Error.new("connection refused: #{address}:#{port}", requests,
                  responses)
rescue Errno::EHOSTDOWN
  raise Error.new("host down: #{address}:#{port}", requests, responses)
end
#pipeline_send(requests) ⇒ Object
Sends requests to the HTTP server and removes them from the requests list. Returns the requests that have been pipelined and are in-flight.
If a non-idempotent request is first in requests it will be sent and no further requests will be pipelined.
If a non-idempotent request is encountered after an idempotent request it will not be sent.
# File 'lib/net/http/pipeline.rb', line 345

def pipeline_send requests
  in_flight = []

  while req = requests.shift do
    idempotent = idempotent? req

    unless idempotent or in_flight.empty? then
      requests.unshift req
      break
    end

    begin_transport req
    req.exec @socket, @curr_http_version, edit_path(req.path)
    in_flight << req

    break unless idempotent
  end

  in_flight
end
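The batching rules are easier to see with the socket calls removed. The sketch below keeps only the queue logic and represents requests as method symbols. `IDEMPOTENT_METHODS` and `next_batch` are hypothetical names, and nothing is actually sent.

```ruby
IDEMPOTENT_METHODS = %i[delete get head options put trace].freeze

# Hypothetical queue-only version of #pipeline_send: removes and returns
# the next group of requests that would be pipelined together.
def next_batch(queue)
  batch = []

  while (verb = queue.shift)
    idempotent = IDEMPOTENT_METHODS.include?(verb)

    # A non-idempotent request never joins a batch that already has entries.
    unless idempotent || batch.empty?
      queue.unshift(verb)
      break
    end

    batch << verb

    # A non-idempotent request at the head of the queue is sent alone.
    break unless idempotent
  end

  batch
end

queue = %i[get head post get]
batches = []
batches << next_batch(queue) until queue.empty?
p batches # prints [[:get, :head], [:post], [:get]]
```

The POST ends up isolated in its own batch, so a dropped connection can never leave it in an ambiguous state alongside other in-flight requests.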