Class: PredictionIO::Connection

Inherits:
Object
  • Object
show all
Defined in:
lib/predictionio/connection.rb

Overview

This class handles multithreading and asynchronous requests transparently for the REST client.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(uri, threads = 1, timeout = 60) ⇒ Connection

Spawns the given number of threads, each with a persistent HTTP connection to the specified URI. The read timeout defaults to 60 seconds.


32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
# File 'lib/predictionio/connection.rb', line 32

# Spawns +threads+ worker threads, each holding a persistent HTTP connection
# to +uri+, and has them serve packages popped from the shared queue.
#
# A package is a Hash with the keys +:method+, +:request+ and +:response+.
# For 'get', 'post' and 'delete' the worker performs the HTTP call and hands
# either the HTTP response or the raised error to the package's response
# object via +set+. A package whose +:method+ is 'exit' makes the worker that
# pops it close its connection and terminate. Packages with any other method
# are ignored.
#
# If connecting fails (or the connection block aborts), the worker waits one
# second and reconnects, indefinitely.
#
# @param uri [URI] base URI; host, port, scheme and path are read from it
# @param threads [Integer] number of worker threads (one connection each)
# @param timeout [Numeric] read timeout in seconds for every connection
def initialize(uri, threads = 1, timeout = 60)
  @packages = Queue.new
  @counter_lock = Mutex.new
  @connections = 0
  @timeout = timeout
  threads.times do
    Thread.new do
      begin
        Net::HTTP.start(uri.host, uri.port, use_ssl: uri.scheme == 'https') do |http|
          http.read_timeout = @timeout
          @counter_lock.synchronize { @connections += 1 }
          begin
            loop do
              package = @packages.pop
              verb = package[:method]
              break if verb == 'exit'

              request = package[:request]
              response = package[:response]
              begin
                http_req =
                  case verb
                  when 'get'
                    Net::HTTP::Get.new("#{uri.path}#{request.qpath}")
                  when 'delete'
                    Net::HTTP::Delete.new("#{uri.path}#{request.qpath}")
                  when 'post'
                    if request.params.is_a?(Hash)
                      Net::HTTP::Post.new("#{uri.path}#{request.path}").tap do |post|
                        post.set_form_data(request.params)
                      end
                    else
                      json_header = { 'Content-Type' => 'application/json; charset=utf-8' }
                      Net::HTTP::Post.new("#{uri.path}#{request.path}", json_header).tap do |post|
                        post.body = request.params
                      end
                    end
                  end
                # http_req is nil for an unrecognised method; such packages are skipped.
                response.set(http.request(http_req)) if http_req
              rescue StandardError => details
                # Building the request is covered too, so the caller always
                # receives the failure instead of waiting on an unset response.
                response.set(details)
              end
            end
          ensure
            # Runs on 'exit' and on any error leaving the loop, so the counter
            # cannot drift upwards across reconnects.
            @counter_lock.synchronize { @connections -= 1 }
          end
        end
      rescue StandardError => detail
        @counter_lock.synchronize do
          if @connections.zero?
            # Use non-blocking pop to avoid dead-locking the current
            # thread when there is no request, and give it a chance to re-connect.
            begin
              @packages.pop(true)[:response].set(detail)
            rescue StandardError
              # Best effort: the queue was empty (ThreadError) or the error
              # could not be delivered; reconnecting matters more.
            end
          end
        end
        sleep(1)
        # Deliberately unbounded: the worker keeps trying to reconnect.
        retry
      end
    end
  end
end

Instance Attribute Details

#connections ⇒ Object (readonly)

Number of connections active


25
26
27
# File 'lib/predictionio/connection.rb', line 25

# @return [Integer] the current value of the active-connection counter
def connections = @connections

#packages ⇒ Object (readonly)

Queue of pending asynchronous request and response packages.


22
23
24
# File 'lib/predictionio/connection.rb', line 22

# @return [Object] the pending-package container held in @packages
def packages = @packages

#timeout ⇒ Object (readonly)

Timeout in seconds


28
29
30
# File 'lib/predictionio/connection.rb', line 28

# @return [Numeric] the timeout, in seconds, stored in @timeout
def timeout = @timeout

Instance Method Details

#adelete(areq) ⇒ Object

Shortcut to create an asynchronous DELETE request with the response object returned.


125
126
127
# File 'lib/predictionio/connection.rb', line 125

# Queues +areq+ as an asynchronous DELETE request.
#
# @param areq [Object] the request to send
# @return [Object] whatever #request returns for the 'delete' method
def adelete(areq) = request('delete', areq)

#aget(areq) ⇒ Object

Shortcut to create an asynchronous GET request with the response object returned.


115
116
117
# File 'lib/predictionio/connection.rb', line 115

# Queues +areq+ as an asynchronous GET request.
#
# @param areq [Object] the request to send
# @return [Object] whatever #request returns for the 'get' method
def aget(areq) = request('get', areq)

#apost(areq) ⇒ Object

Shortcut to create an asynchronous POST request with the response object returned.


120
121
122
# File 'lib/predictionio/connection.rb', line 120

# Queues +areq+ as an asynchronous POST request.
#
# @param areq [Object] the request to send
# @return [Object] whatever #request returns for the 'post' method
def apost(areq) = request('post', areq)

#request(method, request) ⇒ Object

Create an asynchronous request and response package, put it in the pending queue, and return the response object.


108
109
110
111
112
# File 'lib/predictionio/connection.rb', line 108

# Wraps +request+ in a new AsyncResponse, enqueues the pair together with
# +method+ on the pending-package queue, and returns the response object.
#
# @param method [String] request method name stored in the package
# @param request [Object] the request handed to AsyncResponse.new
# @return [AsyncResponse] the response object that was enqueued
def request(method, request)
  AsyncResponse.new(request).tap do |pending|
    @packages << { method: method, request: request, response: pending }
  end
end