Class: Pwwka::Transmitter

Inherits:
Object
Extended by:
Logging
Includes:
Logging
Defined in:
lib/pwwka/transmitter.rb

Overview

Primary interface for sending messages.

Example:

# Send a message, blowing up if there's any problem
Pwwka::Transmitter.send_message!({ user_id: @user.id }, "users.user.activated")

# Send a message, logging if there's any problem
Pwwka::Transmitter.send_message_safely({ user_id: @user.id }, "users.user.activated")

Constant Summary

DEFAULT_DELAY_BY_MS =
5000

Constants included from Logging

Logging::LEVELS

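Instance Attribute Summary

channel_connector (readonly)

Class Method Summary

send_message!, send_message_async, send_message_safely (deprecated)

Instance Method Summary

initialize, send_delayed_message!, send_message!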

Methods included from Logging

logf, logger

Constructor Details

#initialize ⇒ Transmitter

Returns a new instance of Transmitter


# File 'lib/pwwka/transmitter.rb', line 28

def initialize
  @channel_connector = ChannelConnector.new(connection_name: "p: #{Pwwka.configuration.app_id}")
end

Instance Attribute Details

#channel_connector ⇒ Object (readonly)

Returns the value of attribute channel_connector


# File 'lib/pwwka/transmitter.rb', line 26

def channel_connector
  @channel_connector
end

Class Method Details

.send_message!(payload, routing_key, on_error: :raise, delayed: false, delay_by: nil, type: nil, message_id: :auto_generate, headers: nil) ⇒ Object

Send an important message that must go through. By default (on_error: :raise), any raised exception passes through to the caller.

payload

Hash of what you'd like to include in your message

routing_key

String routing key for the message

delayed

Boolean send this message later

delay_by

Integer milliseconds to delay the message

type

A string describing the type. The combination of this and your configured app_id should be unique across your entire ecosystem.

message_id

If specified (which generally you should not do), sets the id of the message. If omitted, a GUID is used.

headers

A hash of arbitrary headers to include in the AMQP attributes

on_error

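What to do if sending the message fails:

  • :ignore – log the error and return false (this is what send_message_safely does)

  • :raise – re-raise the exception (the default)

  • :resque – use Resque to try to send the message later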

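Returns true if the message was sent; false if sending failed and on_error is :ignore or :resque.

Raises any exception generated by the inner workings of this library when on_error is :raise (the default), or when on_error is :resque and enqueuing the retry also fails.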
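
The three on_error policies can be illustrated with a small stand-in. This is only a sketch: send_with_policy, deliver, and retry_queue are hypothetical names invented for the example, and the array stands in for enqueuing a Resque job. It mirrors the rescue/case structure of the method, not the library's actual publishing code.

```ruby
# Hypothetical stand-in for Pwwka::Transmitter.send_message!, not the library code.
# `deliver` is an assumed callable that performs the publish; `retry_queue`
# stands in for enqueuing a Resque job.
def send_with_policy(deliver, on_error: :raise, retry_queue: [])
  deliver.call
  true
rescue => e
  case on_error
  when :raise
    raise e                    # let the caller deal with the failure
  when :resque
    retry_queue << e.message   # schedule a later attempt
  end
  false                        # reached for :ignore and :resque
end

working = -> { :published }
failing = -> { raise IOError, "broker unreachable" }
retry_queue = []

results = {
  ok:      send_with_policy(working),
  ignored: send_with_policy(failing, on_error: :ignore),
  queued:  send_with_policy(failing, on_error: :resque, retry_queue: retry_queue)
}

raised = begin
  send_with_policy(failing)    # default policy is :raise
rescue IOError => e
  e.message
end
```

Here results holds true for the successful send and false for both failed sends, while the default policy surfaces the original exception to the caller.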


# File 'lib/pwwka/transmitter.rb', line 50

def self.send_message!(payload, routing_key,
                       on_error: :raise,
                       delayed: false,
                       delay_by: nil,
                       type: nil,
                       message_id: :auto_generate,
                       headers: nil)
  if delayed
    new.send_delayed_message!(*[payload, routing_key, delay_by].compact, type: type, headers: headers, message_id: message_id)
  else
    new.send_message!(payload, routing_key, type: type, headers: headers, message_id: message_id)
  end
  logf "AFTER Transmitting Message on %{routing_key} -> %{payload}",routing_key: routing_key, payload: payload
  true
rescue => e

  logf "ERROR Transmitting Message on %{routing_key} -> %{payload} : %{error}", routing_key: routing_key, payload: payload, error: e, at: :error

  case on_error

    when :raise
      raise e

    when :resque
      begin
        send_message_async(payload, routing_key, delay_by_ms: delayed ? delay_by || DEFAULT_DELAY_BY_MS : 0)
      rescue => resque_exception
        warn(resque_exception.message)
        raise e
      end
    else # ignore
  end
  false
end
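
The `*[payload, routing_key, delay_by].compact` call in the delayed branch is how a nil delay_by falls back to the instance method's default. The sketch below isolates that idiom; delayed_stub is a hypothetical stand-in with the same positional signature as #send_delayed_message!, and the literal 5000 mirrors DEFAULT_DELAY_BY_MS.

```ruby
# Hypothetical stand-in with the same positional signature as
# #send_delayed_message!; 5000 mirrors DEFAULT_DELAY_BY_MS.
def delayed_stub(payload, routing_key, delay_by = 5000)
  delay_by
end

payload = { user_id: 42 }
key = "users.user.activated"

# compact drops the trailing nil, so only two arguments are splatted
# and the default value for delay_by applies.
explicit = delayed_stub(*[payload, key, 250].compact)
fallback = delayed_stub(*[payload, key, nil].compact)
```

With these inputs explicit is 250 and fallback is 5000. Note that compact removes every nil in the array, so a nil payload or routing_key would also be dropped and shift the remaining arguments.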

.send_message_async(payload, routing_key, delay_by_ms: 0, type: nil, message_id: :auto_generate, headers: nil) ⇒ Object

Use Resque to enqueue the message.

  • :delay_by_ms

    Integer milliseconds to delay the message. Default is 0.
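
The source passes `delay_by_ms/1000` to Resque.enqueue_in, which (in resque-scheduler, as far as I know) takes a delay in seconds. Because this is integer division, sub-second precision is dropped. A minimal sketch of that arithmetic, with to_seconds as a hypothetical helper name:

```ruby
# Same arithmetic as `delay_by_ms/1000` in send_message_async.
# Integer division in Ruby rounds toward negative infinity, so any
# remainder below one second is discarded.
to_seconds = ->(delay_by_ms) { delay_by_ms / 1000 }

seconds = [5000, 1500, 999].map(&to_seconds)
```

For these inputs seconds is [5, 1, 0], so a delay under 1000 ms is enqueued with no delay at all.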


# File 'lib/pwwka/transmitter.rb', line 87

def self.send_message_async(payload, routing_key,
                            delay_by_ms: 0,
                            type: nil,
                            message_id: :auto_generate,
                            headers: nil)
  job = Pwwka.configuration.async_job_klass
  # Be perhaps too carefully making sure we queue jobs in the legacy way
  if type == nil && message_id == :auto_generate && headers == nil
    Resque.enqueue_in(delay_by_ms/1000, job, payload, routing_key)
  else
    Resque.enqueue_in(delay_by_ms/1000, job, payload, routing_key, type: type, message_id: message_id, headers: headers)
  end
end

.send_message_safely(payload, routing_key, delayed: false, delay_by: nil, message_id: :auto_generate) ⇒ Object

Deprecated.

This sends a message while ignoring errors. ::send_message! supports this explicitly via on_error: :ignore.

Send a less important message that doesn't have to go through. This eats any `StandardError` and logs it, returning false rather than blowing up.

payload

Hash of what you'd like to include in your message

routing_key

String routing key for the message

delayed

Boolean send this message later

delay_by

Integer milliseconds to delay the message

Returns true if the message was sent, false otherwise


# File 'lib/pwwka/transmitter.rb', line 111

def self.send_message_safely(payload, routing_key, delayed: false, delay_by: nil, message_id: :auto_generate)
  send_message!(payload, routing_key, delayed: delayed, delay_by: delay_by, on_error: :ignore)
end

Instance Method Details

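#send_delayed_message!(payload, routing_key, delay_by = DEFAULT_DELAY_BY_MS, type: nil, headers: nil, message_id: :auto_generate) ⇒ Object

Publishes the payload as JSON to the delayed exchange, with the message expiration set to delay_by milliseconds, then closes the connection. Returns true on success; any exception propagates to the caller.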


# File 'lib/pwwka/transmitter.rb', line 132

def send_delayed_message!(payload, routing_key, delay_by = DEFAULT_DELAY_BY_MS, type: nil, headers: nil, message_id: :auto_generate)
  channel_connector.raise_if_delayed_not_allowed
  publish_options = Pwwka::PublishOptions.new(
    routing_key: routing_key,
    message_id: message_id,
    type: type,
    headers: headers,
    expiration: delay_by
  )
  logf "START Transmitting Delayed Message on id[%{id}] %{routing_key} -> %{payload}", id: publish_options.message_id, routing_key: routing_key, payload: payload
  channel_connector.create_delayed_queue
  channel_connector.delayed_exchange.publish(payload.to_json,publish_options.to_h)
  # if it gets this far it has succeeded
  logf "END Transmitting Delayed Message on id[%{id}] %{routing_key} -> %{payload}", id: publish_options.message_id, routing_key: routing_key, payload: payload
  true
ensure
  channel_connector.connection_close
end

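#send_message!(payload, routing_key, type: nil, headers: nil, message_id: :auto_generate) ⇒ Object

Publishes the payload as JSON to the topic exchange, then closes the connection. Returns true on success; any exception propagates to the caller.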


# File 'lib/pwwka/transmitter.rb', line 115

def send_message!(payload, routing_key, type: nil, headers: nil, message_id: :auto_generate)
  publish_options = Pwwka::PublishOptions.new(
    routing_key: routing_key,
    message_id: message_id,
    type: type,
    headers: headers
  )
  logf "START Transmitting Message on id[%{id}] %{routing_key} -> %{payload}", id: publish_options.message_id, routing_key: routing_key, payload: payload
  channel_connector.topic_exchange.publish(payload.to_json, publish_options.to_h)
  # if it gets this far it has succeeded
  logf "END Transmitting Message on id[%{id}] %{routing_key} -> %{payload}", id: publish_options.message_id, routing_key: routing_key, payload: payload
  true
ensure
  channel_connector.connection_close
end
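
Both instance methods close the connection in an `ensure` clause, so the connection is released whether or not the publish succeeds. The sketch below shows that pattern in isolation; publish_then_close and fake_connector are hypothetical names, and the Struct only records whether connection_close was called.

```ruby
# Hypothetical connector that only records whether it was closed.
fake_connector = Struct.new(:closed) do
  def connection_close
    self.closed = true
  end
end

# Mirrors the shape of the instance methods: do the work, return true,
# and always close the connection afterwards.
def publish_then_close(connector)
  yield
  true
ensure
  connector.connection_close
end

good = fake_connector.new(false)
bad  = fake_connector.new(false)

success = publish_then_close(good) { :published }

failure = begin
  publish_then_close(bad) { raise IOError, "publish failed" }
rescue IOError => e
  e.message
end
```

In both cases the connector ends up closed: success is true, and the failing call still raises to the caller after connection_close has run.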