Class: Rpush::Daemon::AppRunner

Inherits:
Object
  • Object
show all
Extended by:
Loggable, StringHelpers, Reflectable
Includes:
Loggable, StringHelpers, Reflectable
Defined in:
lib/rpush/daemon/app_runner.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from StringHelpers

pluralize

Methods included from Loggable

log_debug, log_error, log_info, log_warn

Methods included from Reflectable

reflect

Constructor Details

#initialize(app) ⇒ AppRunner


94
95
96
97
98
# File 'lib/rpush/daemon/app_runner.rb', line 94

# Builds a runner around +app+ with empty lists of loops and dispatcher loops.
#
# @param app [Object] stored as-is in @app
def initialize(app)
  @loops = []
  @app = app
  @dispatcher_loops = []
end

Instance Attribute Details

#app ⇒ Object (readonly)

Returns the value of attribute app


91
92
93
# File 'lib/rpush/daemon/app_runner.rb', line 91

# Reader for the app this runner was constructed with.
#
# @return [Object] the current value of @app
def app
  @app
end

Class Method Details

.app_ids ⇒ Object


57
58
59
# File 'lib/rpush/daemon/app_runner.rb', line 57

# Lists the keys of the class-level @runners registry. The other class
# methods index that registry by app id.
#
# @return [Array] the registry keys
def self.app_ids
  @runners.keys
end

.app_running?(app) ⇒ Boolean


53
54
55
# File 'lib/rpush/daemon/app_runner.rb', line 53

# Tells whether a runner is registered under the id of +app+.
#
# @param app [#id] object whose id is looked up in @runners
# @return [Boolean]
def self.app_running?(app)
  app_id = app.id
  @runners.include?(app_id)
end

.app_with_id(app_id) ⇒ Object


49
50
51
# File 'lib/rpush/daemon/app_runner.rb', line 49

# Returns the app held by the runner registered under +app_id+.
#
# Raises NoMethodError when no runner is registered for +app_id+, because
# the lookup yields nil and +app+ is then called on it.
#
# @param app_id [Object] key into @runners
# @return [Object] that runner's app
def self.app_with_id(app_id)
  runner = @runners[app_id]
  runner.app
end

.decrement_dispatchers(app, num) ⇒ Object


79
80
81
# File 'lib/rpush/daemon/app_runner.rb', line 79

# Forwards to #decrement_dispatchers on the runner registered for +app+.
#
# @param app [#id] object whose id selects the runner in @runners
# @param num [Integer] passed through to the runner
# @return [Object] whatever the runner's decrement_dispatchers returns
def self.decrement_dispatchers(app, num)
  runner = @runners[app.id]
  runner.decrement_dispatchers(num)
end

.enqueue(notifications) ⇒ Object


15
16
17
18
19
20
21
22
# File 'lib/rpush/daemon/app_runner.rb', line 15

# Groups +notifications+ by app_id and hands each group to that app's runner.
#
# When no runner is registered for an app id, start_app_with_id is called
# first. If there is still no runner afterwards, the group is skipped.
# ProcTitle.update is called once all groups have been processed.
#
# @param notifications [Enumerable<#app_id>] notifications to distribute
# @return [Object] the result of ProcTitle.update
def self.enqueue(notifications)
  grouped = notifications.group_by(&:app_id)

  grouped.each do |app_id, group|
    start_app_with_id(app_id) unless @runners[app_id]

    runner = @runners[app_id]
    runner.enqueue(group) if runner
  end

  ProcTitle.update
end

.increment_dispatchers(app, num) ⇒ Object


83
84
85
# File 'lib/rpush/daemon/app_runner.rb', line 83

# Forwards to #increment_dispatchers on the runner registered for +app+.
#
# @param app [#id] object whose id selects the runner in @runners
# @param num [Integer] passed through to the runner
# @return [Object] whatever the runner's increment_dispatchers returns
def self.increment_dispatchers(app, num)
  runner = @runners[app.id]
  runner.increment_dispatchers(num)
end

.num_dispatchers_for_app(app) ⇒ Object


74
75
76
77
# File 'lib/rpush/daemon/app_runner.rb', line 74

# Reports how many dispatcher loops the runner for +app+ holds.
#
# @param app [#id] object whose id selects the runner in @runners
# @return [Integer] the runner's num_dispatcher_loops, or 0 when no runner
#   is registered for the app
def self.num_dispatchers_for_app(app)
  runner = @runners[app.id]
  return 0 unless runner

  runner.num_dispatcher_loops
end

.start_app(app) ⇒ Object


28
29
30
31
32
33
34
35
36
37
38
39
# File 'lib/rpush/daemon/app_runner.rb', line 28

# Creates, registers and starts a runner for +app+.
#
# The runner is stored in @runners before its dispatchers and loops are
# started. If a StandardError is raised at any point, the registry entry is
# removed, the failure is logged and an :error reflection is emitted; the
# exception is not re-raised.
#
# @param app [#id, #name, #connections] the app to start
# @return [Object] the result of runner.start_loops on success, otherwise
#   the result of the error reflection
def self.start_app(app)
  startup_message = "[#{app.name}] Starting #{pluralize(app.connections, 'dispatcher')}... "
  Rpush.logger.info(startup_message, true)
  runner = @runners[app.id] = new(app)
  runner.start_dispatchers
  if Rpush.config.foreground
    puts ANSI.green { '✔' }
  end
  runner.start_loops
rescue StandardError => error
  @runners.delete(app.id)
  failure_message = "[#{app.name}] Exception raised during startup. Notifications will not be delivered for this app."
  Rpush.logger.error(failure_message)
  Rpush.logger.error(error)
  reflect(:error, error)
end

.start_app_with_id(app_id) ⇒ Object


24
25
26
# File 'lib/rpush/daemon/app_runner.rb', line 24

# Fetches the app for +app_id+ from Rpush::Daemon.store and starts it.
#
# @param app_id [Object] identifier passed to the store's +app+ lookup
# @return [Object] whatever start_app returns
def self.start_app_with_id(app_id)
  app = Rpush::Daemon.store.app(app_id)
  start_app(app)
end

.status ⇒ Object


87
88
89
# File 'lib/rpush/daemon/app_runner.rb', line 87

# Collects the status of every registered runner.
#
# @return [Hash] a hash with one key, :app_runners, holding each runner's
#   status in registry order
def self.status
  runner_statuses = @runners.each_value.map(&:status)
  { app_runners: runner_statuses }
end

.stop ⇒ Object


61
62
63
64
# File 'lib/rpush/daemon/app_runner.rb', line 61

# Stops every registered runner, then empties the registry.
#
# The runners are taken as a snapshot (+values+) before any of them is
# stopped.
#
# @return [Hash] the now-empty @runners registry
def self.stop
  running = @runners.values
  running.each(&:stop)
  @runners.clear
end

.stop_app(app_id) ⇒ Object


41
42
43
44
45
46
47
# File 'lib/rpush/daemon/app_runner.rb', line 41

# Removes the runner registered under +app_id+, stops it and logs the fact.
#
# Does nothing when no runner is registered for +app_id+.
#
# @param app_id [Object] key into @runners
# @return [Object, nil] the result of log_info, or nil when there was no
#   runner to stop
def self.stop_app(app_id)
  runner = @runners.delete(app_id)
  return unless runner

  runner.stop
  log_info("[#{runner.app.name}] Stopped.")
end

.total_dispatchers ⇒ Object


66
67
68
# File 'lib/rpush/daemon/app_runner.rb', line 66

# Adds up num_dispatcher_loops across all registered runners.
#
# @return [Integer] the total, 0 when no runner is registered
def self.total_dispatchers
  @runners.each_value.sum(&:num_dispatcher_loops)
end

.total_queued ⇒ Object


70
71
72
# File 'lib/rpush/daemon/app_runner.rb', line 70

# Adds up queue_size across all registered runners.
#
# @return [Integer] the total, 0 when no runner is registered
def self.total_queued
  queue_sizes = @runners.values.map(&:queue_size)
  queue_sizes.sum
end

Instance Method Details

#decrement_dispatchers(num) ⇒ Object


135
136
137
# File 'lib/rpush/daemon/app_runner.rb', line 135

# Stops and removes dispatcher loops, taking them from the end of the list.
#
# No more loops are removed than are currently held. Asking for more than
# exist therefore stops all of them instead of calling +stop+ on the nil
# that +pop+ returns for an empty array. A non-positive +num+ removes nothing.
#
# @param num [Integer] how many dispatcher loops to stop
# @return [Integer] +num+, unchanged
def decrement_dispatchers(num)
  removable = [num, @dispatcher_loops.size].min
  removable.times { @dispatcher_loops.pop.stop }
  num
end

#enqueue(notifications) ⇒ Object


119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
# File 'lib/rpush/daemon/app_runner.rb', line 119

# Pushes +notifications+ onto this runner's queue.
#
# When the service batches deliveries, the notifications are split into
# groups of roughly size / num_dispatcher_loops, and each group is pushed as
# one payload wrapping its own Batch. Otherwise a single Batch covers all
# notifications, each notification is pushed as its own payload, and a
# :notification_enqueued reflection is emitted per notification.
#
# @param notifications [Array] notifications destined for this runner's app
# @return [Array] the collection that was iterated (groups or notifications)
def enqueue(notifications)
  if service.batch_deliveries?
    # Clamp the divisor to at least 1: with zero dispatcher loops the float
    # division yields Infinity (or NaN) and +ceil+ raises FloatDomainError.
    loop_count = [num_dispatcher_loops, 1].max
    # Clamp the group size to at least 1 so an empty list does not ask for
    # groups of 0. NOTE(review): in_groups_of is presumably ActiveSupport's
    # Array#in_groups_of, which rejects a non-positive size — confirm.
    batch_size = [(notifications.size / loop_count.to_f).ceil, 1].max
    notifications.in_groups_of(batch_size, false).each do |batch_notifications|
      batch = Batch.new(batch_notifications)
      queue.push(QueuePayload.new(batch))
    end
  else
    batch = Batch.new(notifications)
    notifications.each do |notification|
      queue.push(QueuePayload.new(batch, notification))
      reflect(:notification_enqueued, notification)
    end
  end
end

#increment_dispatchers(num) ⇒ Object


139
140
141
# File 'lib/rpush/daemon/app_runner.rb', line 139

# Appends +num+ freshly created dispatcher loops to this runner.
#
# @param num [Integer] how many dispatcher loops to add
# @return [Integer] +num+ (the value Integer#times returns)
def increment_dispatchers(num)
  num.times do
    @dispatcher_loops << new_dispatcher_loop
  end
end

#num_dispatcher_loops ⇒ Object


157
158
159
# File 'lib/rpush/daemon/app_runner.rb', line 157

# Number of dispatcher loops this runner currently holds.
#
# @return [Integer]
def num_dispatcher_loops
  @dispatcher_loops.length
end

#start_dispatchers ⇒ Object


100
101
102
# File 'lib/rpush/daemon/app_runner.rb', line 100

# Creates one dispatcher loop per connection configured on the app.
#
# @return [Integer] app.connections (the value Integer#times returns)
def start_dispatchers
  connection_count = app.connections
  connection_count.times { @dispatcher_loops << new_dispatcher_loop }
end

#start_loops ⇒ Object


104
105
106
107
# File 'lib/rpush/daemon/app_runner.rb', line 104

# Asks the service for this app's loop instances, remembers them in @loops
# and starts each one.
#
# @return [Array] the results of calling +start+ on every loop
def start_loops
  @loops = service.loop_instances(@app)
  @loops.map { |loop_instance| loop_instance.start }
end

#status ⇒ Object


143
144
145
146
147
148
149
150
151
152
153
154
155
# File 'lib/rpush/daemon/app_runner.rb', line 143

# Snapshot of this runner: app name, per-dispatcher details and queue size.
#
# Dispatcher details are keyed by the dispatcher loop's position in
# @dispatcher_loops and carry its ISO 8601 start time, its dispatch count
# and its thread status.
#
# @return [Hash] with keys :app_name, :dispatchers and :queued
def status
  indexed_details = @dispatcher_loops.each_with_index.map do |dispatcher_loop, index|
    details = {
      started_at: dispatcher_loop.started_at.iso8601,
      dispatched: dispatcher_loop.dispatch_count,
      thread_status: dispatcher_loop.thread_status
    }
    [index, details]
  end

  { app_name: @app.name, dispatchers: indexed_details.to_h, queued: queue_size }
end

#stop ⇒ Object


109
110
111
112
113
# File 'lib/rpush/daemon/app_runner.rb', line 109

# Shuts this runner down in three steps, in this order.
#
# @return [Object] whatever stop_loops returns
def stop
  # Block until the queue reports no remaining items.
  wait_until_idle
  # Only then stop the dispatcher loops and the service loops.
  stop_dispatcher_loops
  stop_loops
end

#wait_until_idle ⇒ Object


115
116
117
# File 'lib/rpush/daemon/app_runner.rb', line 115

# Blocks until the queue is empty, re-checking its size every half second.
#
# @return [nil]
def wait_until_idle
  loop do
    break unless queue.size > 0

    sleep 0.5
  end
end