Module: NATSD::Connection

Included in:
Route
Defined in:
lib/nats/server/connection.rb

Overview

:nodoc: all

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#cidObject (readonly)

Returns the value of attribute cid


6
7
8
# File 'lib/nats/server/connection.rb', line 6

def cid
  @cid
end

#closingObject (readonly) Also known as: closing?

Returns the value of attribute closing


6
7
8
# File 'lib/nats/server/connection.rb', line 6

def closing
  @closing
end

#in_bytesObject

Returns the value of attribute in_bytes


5
6
7
# File 'lib/nats/server/connection.rb', line 5

def in_bytes
  @in_bytes
end

#in_msgsObject

Returns the value of attribute in_msgs


5
6
7
# File 'lib/nats/server/connection.rb', line 5

def in_msgs
  @in_msgs
end

#last_activityObject (readonly)

Returns the value of attribute last_activity


6
7
8
# File 'lib/nats/server/connection.rb', line 6

def last_activity
  @last_activity
end

#out_bytesObject

Returns the value of attribute out_bytes


5
6
7
# File 'lib/nats/server/connection.rb', line 5

def out_bytes
  @out_bytes
end

#out_msgsObject

Returns the value of attribute out_msgs


5
6
7
# File 'lib/nats/server/connection.rb', line 5

def out_msgs
  @out_msgs
end

#subscriptionsObject (readonly)

Returns the value of attribute subscriptions


6
7
8
# File 'lib/nats/server/connection.rb', line 6

def subscriptions
  @subscriptions
end

#writev_sizeObject (readonly)

Returns the value of attribute writev_size


6
7
8
# File 'lib/nats/server/connection.rb', line 6

def writev_size
  @writev_size
end

Instance Method Details

#auth_ok?(user, pass) ⇒ Boolean


203
204
205
# File 'lib/nats/server/connection.rb', line 203

def auth_ok?(user, pass)
  Server.auth_ok?(user, pass)
end

#client_infoObject


22
23
24
# File 'lib/nats/server/connection.rb', line 22

def client_info
  @client_info ||= (get_peername.nil? ? 'N/A' : Socket.unpack_sockaddr_in(get_peername))
end

#connect_auth_timeoutObject


80
81
82
83
# File 'lib/nats/server/connection.rb', line 80

def connect_auth_timeout
  error_close AUTH_REQUIRED
  debug "#{type} connection timeout due to lack of auth credentials", cid
end

#connect_ssl_timeoutObject


85
86
87
88
# File 'lib/nats/server/connection.rb', line 85

def connect_ssl_timeout
  error_close SSL_REQUIRED
  debug "#{type} connection timeout due to lack of TLS/SSL negotiations", cid
end

#ctrace(*args) ⇒ Object


285
286
287
# File 'lib/nats/server/connection.rb', line 285

def ctrace(*args)
  trace(args, "c: #{cid}")
end

#debug_print_controlline_too_big(line_size) ⇒ Object


236
237
238
239
# File 'lib/nats/server/connection.rb', line 236

def debug_print_controlline_too_big(line_size)
  sizes = "#{pretty_size(line_size)} vs #{pretty_size(NATSD::Server.max_control_line)} max"
  debug "Control line size exceeded (#{sizes}), closing connection.."
end

#debug_print_msg_too_big(msg_size) ⇒ Object


241
242
243
244
# File 'lib/nats/server/connection.rb', line 241

def debug_print_msg_too_big(msg_size)
  sizes = "#{pretty_size(msg_size)} vs #{pretty_size(NATSD::Server.max_payload)} max"
  debug "Message payload size exceeded (#{sizes}), closing connection"
end

#dec_connectionsObject


251
252
253
254
# File 'lib/nats/server/connection.rb', line 251

def dec_connections
  Server.num_connections -= 1
  Server.connections.delete(cid)
end

#delete_subscriber(sub) ⇒ Object


223
224
225
226
227
# File 'lib/nats/server/connection.rb', line 223

def delete_subscriber(sub)
  ctrace('DELSUB OP', sub.subject, sub.qgroup, sub.sid) if NATSD::Server.trace_flag?
  Server.unsubscribe(sub, is_route?)
  @subscriptions.delete(sub.sid)
end

#error_close(msg) ⇒ Object


229
230
231
232
233
234
# File 'lib/nats/server/connection.rb', line 229

def error_close(msg)
  queue_data(msg)
  flush_data
  EM.next_tick { close_connection_after_writing }
  @closing = true
end

#flush_dataObject


9
10
11
12
13
# File 'lib/nats/server/connection.rb', line 9

def flush_data
  return if @writev.nil? || closing?
  send_data(@writev.join)
  @writev, @writev_size = nil, 0
end

#inc_connectionsObject


246
247
248
249
# File 'lib/nats/server/connection.rb', line 246

def inc_connections
  Server.num_connections += 1
  Server.connections[cid] = self
end

#infoObject


26
27
28
29
30
31
32
33
34
35
36
37
38
# File 'lib/nats/server/connection.rb', line 26

def info
  {
    :cid => cid,
    :ip => client_info[1],
    :port => client_info[0],
    :subscriptions => @subscriptions.size,
    :pending_size => get_outbound_data_size,
    :in_msgs => @in_msgs,
    :out_msgs => @out_msgs,
    :in_bytes => @in_bytes,
    :out_bytes => @out_bytes
  }
end

#is_route?Boolean


293
294
295
# File 'lib/nats/server/connection.rb', line 293

def is_route?
  false
end

#max_connections_exceeded?Boolean


40
41
42
43
44
45
# File 'lib/nats/server/connection.rb', line 40

def max_connections_exceeded?
  return false unless (Server.num_connections > Server.max_connections)
  error_close MAX_CONNS_EXCEEDED
  debug "Maximum #{Server.max_connections} connections exceeded, c:#{cid} will be closed"
  true
end

#post_initObject


47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
# File 'lib/nats/server/connection.rb', line 47

def post_init
  @cid = Server.cid
  @subscriptions = {}
  @verbose = @pedantic = true # suppressed by most clients, but allows friendly telnet
  @in_msgs = @out_msgs = @in_bytes = @out_bytes = 0
  @writev_size = 0
  @parse_state = AWAITING_CONTROL_LINE
  send_info
  debug "#{type} connection created", client_info, cid
  if Server.ssl_required?
    debug "Starting TLS/SSL", client_info, cid
    flush_data
    @ssl_pending = EM.add_timer(NATSD::Server.ssl_timeout) { connect_ssl_timeout }
    start_tls(:verify_peer => true) if Server.ssl_required?
  end
  @auth_pending = EM.add_timer(NATSD::Server.auth_timeout) { connect_auth_timeout } if Server.auth_required?
  @ping_timer = EM.add_periodic_timer(NATSD::Server.ping_interval) { send_ping }
  @pings_outstanding = 0
  inc_connections
  return if max_connections_exceeded?
end

#process_connect_config(config) ⇒ Object


207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
# File 'lib/nats/server/connection.rb', line 207

def process_connect_config(config)
  @verbose  = config['verbose'] unless config['verbose'].nil?
  @pedantic = config['pedantic'] unless config['pedantic'].nil?

  return queue_data(OK) unless Server.auth_required?

  EM.cancel_timer(@auth_pending)
  if auth_ok?(config['user'], config['pass'])
    queue_data(OK) if @verbose
    @auth_pending = nil
  else
    error_close AUTH_FAILED
    debug "Authorization failed for #{type.downcase} connection", cid
  end
end

#process_info(info) ⇒ Object

Placeholder


200
201
# File 'lib/nats/server/connection.rb', line 200

def process_info(info)
end

#process_unbindObject


256
257
258
259
260
261
262
263
264
265
266
# File 'lib/nats/server/connection.rb', line 256

def process_unbind
  dec_connections
  EM.cancel_timer(@ssl_pending) if @ssl_pending
  @ssl_pending = nil
  EM.cancel_timer(@auth_pending) if @auth_pending
  @auth_pending = nil
  EM.cancel_timer(@ping_timer) if @ping_timer
  @ping_timer = nil
  @subscriptions.each_value { |sub| Server.unsubscribe(sub) }
  @closing = true
end

#queue_data(data) ⇒ Object


15
16
17
18
19
20
# File 'lib/nats/server/connection.rb', line 15

def queue_data(data)
  EM.next_tick { flush_data } if @writev.nil?
  (@writev ||= []) << data
  @writev_size += data.bytesize
  flush_data if @writev_size > MAX_WRITEV_SIZE
end

#receive_data(data) ⇒ Object


90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
# File 'lib/nats/server/connection.rb', line 90

def receive_data(data)
  @buf = @buf ? @buf << data : data

  while (@buf && !@closing)
    case @parse_state
    when AWAITING_CONTROL_LINE
      case @buf
      when PUB_OP
        ctrace('PUB OP', strip_op($&)) if NATSD::Server.trace_flag?
        return connect_auth_timeout if @auth_pending
        @buf = $'
        @parse_state = AWAITING_MSG_PAYLOAD
        @msg_sub, @msg_reply, @msg_size = $1, $3, $4.to_i
        if (@msg_size > NATSD::Server.max_payload)
          debug_print_msg_too_big(@msg_size)
          error_close PAYLOAD_TOO_BIG
        end
        queue_data(INVALID_SUBJECT) if (@pedantic && !(@msg_sub =~ SUB_NO_WC))
      when SUB_OP
        ctrace('SUB OP', strip_op($&)) if NATSD::Server.trace_flag?
        return connect_auth_timeout if @auth_pending
        @buf = $'
        sub, qgroup, sid = $1, $3, $4
        return queue_data(INVALID_SUBJECT) if !($1 =~ SUB)
        return queue_data(INVALID_SID_TAKEN) if @subscriptions[sid]
        sub = Subscriber.new(self, sub, sid, qgroup, 0)
        @subscriptions[sid] = sub
        Server.subscribe(sub)
        queue_data(OK) if @verbose
      when UNSUB_OP
        ctrace('UNSUB OP', strip_op($&)) if NATSD::Server.trace_flag?
        return connect_auth_timeout if @auth_pending
        @buf = $'
        sid, sub = $1, @subscriptions[$1]
        if sub
          # If we have set max_responses, we will unsubscribe once we have received
          # the appropriate amount of responses.
          sub.max_responses = ($2 && $3) ? $3.to_i : nil
          delete_subscriber(sub) unless (sub.max_responses && (sub.num_responses < sub.max_responses))
          queue_data(OK) if @verbose
        else
          queue_data(INVALID_SID_NOEXIST) if @pedantic
        end
      when PING
        ctrace('PING OP') if NATSD::Server.trace_flag?
        @buf = $'
        queue_data(PONG_RESPONSE)
        flush_data
      when PONG
        ctrace('PONG OP') if NATSD::Server.trace_flag?
        @buf = $'
        @pings_outstanding -= 1
      when CONNECT
        ctrace('CONNECT OP', strip_op($&)) if NATSD::Server.trace_flag?
        @buf = $'
        begin
          config = JSON.parse($1)
          process_connect_config(config)
        rescue => e
          queue_data(INVALID_CONFIG)
          log_error
        end
      when INFO_REQ
        ctrace('INFO_REQUEST OP') if NATSD::Server.trace_flag?
        return connect_auth_timeout if @auth_pending
        @buf = $'
        send_info
      when UNKNOWN
        ctrace('Unknown Op', strip_op($&)) if NATSD::Server.trace_flag?
        return connect_auth_timeout if @auth_pending
        @buf = $'
        queue_data(UNKNOWN_OP)
      when CTRL_C # ctrl+c or ctrl+d for telnet friendly
        ctrace('CTRL-C encountered', strip_op($&)) if NATSD::Server.trace_flag?
        return close_connection
      when CTRL_D # ctrl+d for telnet friendly
        ctrace('CTRL-D encountered', strip_op($&)) if NATSD::Server.trace_flag?
        return close_connection
      else
        # If we are here we do not have a complete line yet that we understand.
        # If too big, cut the connection off.
        if @buf.bytesize > NATSD::Server.max_control_line
          debug_print_controlline_too_big(@buf.bytesize)
          close_connection
        end
        return
      end
      @buf = nil if (@buf && @buf.empty?)

    when AWAITING_MSG_PAYLOAD
      return unless (@buf.bytesize >= (@msg_size + CR_LF_SIZE))
      msg = @buf.slice(0, @msg_size)
      ctrace('Processing msg', @msg_sub, @msg_reply, msg) if NATSD::Server.trace_flag?
      queue_data(OK) if @verbose
      Server.route_to_subscribers(@msg_sub, @msg_reply, msg)
      @in_msgs += 1
      @in_bytes += @msg_size
      @buf = @buf.slice((@msg_size + CR_LF_SIZE), @buf.bytesize)
      @msg_sub = @msg_size = @reply = nil
      @parse_state = AWAITING_CONTROL_LINE
      @buf = nil if (@buf && @buf.empty?)
    end
  end
end

#send_infoObject


195
196
197
# File 'lib/nats/server/connection.rb', line 195

def send_info
  queue_data("INFO #{Server.info_string}#{CR_LF}")
end

#send_pingObject


69
70
71
72
73
74
75
76
77
78
# File 'lib/nats/server/connection.rb', line 69

def send_ping
  return if @closing
  if @pings_outstanding > NATSD::Server.ping_max
    error_close UNRESPONSIVE
    return
  end
  queue_data(PING_RESPONSE)
  flush_data
  @pings_outstanding += 1
end

#ssl_handshake_completedObject


273
274
275
276
277
278
# File 'lib/nats/server/connection.rb', line 273

def ssl_handshake_completed
  EM.cancel_timer(@ssl_pending)
  @ssl_pending = nil
  cert = get_peer_cert
  debug "#{type} Certificate:", cert ? cert : 'N/A', cid
end

#ssl_verify_peer(cert) ⇒ Object

FIXME! Cert accepted by default


281
282
283
# File 'lib/nats/server/connection.rb', line 281

def ssl_verify_peer(cert)
  true
end

#strip_op(op = '') ⇒ Object


289
290
291
# File 'lib/nats/server/connection.rb', line 289

def strip_op(op='')
  op.dup.sub(CR_LF, EMPTY)
end

#typeObject


297
298
299
# File 'lib/nats/server/connection.rb', line 297

def type
  'Client'
end

#unbindObject


268
269
270
271
# File 'lib/nats/server/connection.rb', line 268

def unbind
  debug "Client connection closed", client_info, cid
  process_unbind
end