Module: ActionCable::Channel::Streams

Extended by:
ActiveSupport::Concern
Included in:
Base
Defined in:
actioncable/lib/action_cable/channel/streams.rb

Overview

Потоки позволяют каналам направить трансляции подписчику. Трансляция, как обсуждалось в другом месте, является очередью pubsub, где любые данные, помещенные в нее, автоматически отправляются клиентам, которые подключены в это время. Это чисто онлайн-очередь, хотя. Если вы не передаете трансляцию в тот момент, когда она отправляет обновление, вы не получите это обновление, даже если вы подключитесь после его отправки.

Чаще всего потоковая трансляция отправляется прямо подписчику на стороне клиента. Канал выступает только как соединитель между двумя сторонами (вещателем и подписчиком канала). Вот пример канала, который позволяет подписчикам получать все новые комментарии на данной странице:

class CommentsChannel < ApplicationCable::Channel
  def follow(data)
    stream_from "comments_for_#{data['recording_id']}"
  end

  def unfollow
    stop_all_streams
  end
end

На основании вышеприведенного примера, подписчики этого канала получат все данные, которые будут помещены в, скажем, comments_for_45 трансляцию, как только они попадут туда.

Пример трансляции для этого канала выглядит так:

ActionCable.server.broadcast "comments_for_45", author: 'DHH', content: 'Rails is just swell'

Если у вас есть поток, связанный с моделью, тогда используемая трансляция может быть сгенерировано из модели и канала. В следующем примере будет подписана трансляция, наподобие comments:Z2lkOi8vVGVzdEFwcC9Qb3N0LzE.

class CommentsChannel < ApplicationCable::Channel
  def subscribed
    post = Post.find(params[:id])
    stream_for post
  end
end

Затем вы можете транслировать этот канал, используя:

CommentsChannel.broadcast_to(@post, @comment)

Если вы хотите не просто передать трансляцию нефильтрованному подписчику, вы также можете предоставить колбэк, который позволит вам изменить то, что отправлено. В следующем примере показано, как вы можете использовать это, чтобы обеспечить интроспекцию производительности в этом процессе:

class ChatChannel < ApplicationCable::Channel
  def subscribed
    @room = Chat::Room[params[:room_number]]

    stream_for @room, coder: ActiveSupport::JSON do |message|
      if message['originated_at'].present?
        elapsed_time = (Time.now.to_f - message['originated_at']).round(2)

        ActiveSupport::Notifications.instrument :performance, measurement: 'Chat.message_delay', value: elapsed_time, action: :timing
        logger.info "Message took #{elapsed_time}s to arrive"
      end

      transmit message
    end
  end
end

Вы можете прекратить чтение из потока для всех трансляций, вызывая #stop_all_streams.

Instance Method Summary collapse

Methods included from ActiveSupport::Concern

append_features, class_methods, extended, included

Instance Method Details

#stop_all_streamsObject

Отменяет подписку на все потоки, связанные с этим каналом, из очереди pubsub.


104
105
106
107
108
109
# File 'actioncable/lib/action_cable/channel/streams.rb', line 104

def stop_all_streams
  streams.each do |broadcasting, callback|
    pubsub.unsubscribe broadcasting, callback
    logger.info "#{self.class.name} stopped streaming from #{broadcasting}"
  end.clear
end

#stream_for(model, callback = nil, coder: nil, &block) ⇒ Object

Начните читать из потока очереди pubsub для model в этом канале. При желании вы можете передать callback, который будет использоваться вместо значения по умолчанию, просто передавая обновления прямо подписчику.

Передайте coder: ActiveSupport::JSON для декодирования сообщений как JSON перед передачей колбэка. По умолчанию coder: nil, который не выполняет декодирование, передает необработанные сообщения.


99
100
101
# File 'actioncable/lib/action_cable/channel/streams.rb', line 99

def stream_for(model, callback = nil, coder: nil, &block)
  stream_from(broadcasting_for([ channel_name, model ]), callback || block, coder: coder)
end

#stream_from(broadcasting, callback = nil, coder: nil, &block) ⇒ Object

Запустите чтение из потока из очереди pubsub названной broadcasting. При желании вы можете передать callback, который будет использоваться вместо значения по умолчанию, просто передавая обновления прямо подписчику. Передайте coder: ActiveSupport::JSON, чтобы декодировать сообщения как JSON перед передачей колбэка. По умолчанию coder: nil, который не выполняет декодирование, передает необработанные сообщения.


74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
# File 'actioncable/lib/action_cable/channel/streams.rb', line 74

def stream_from(broadcasting, callback = nil, coder: nil, &block)
  broadcasting = String(broadcasting)

  # Не отправлять подтверждение, пока pubsub#subscribe не будет выполнена
  defer_subscription_confirmation!

  # Создайте обработчик потока, обернув предоставленный пользователем колбэк с помощью 
  # декодера или по умолчанию для ретранслятора JSON-декодирования.
  handler = worker_pool_stream_handler(broadcasting, callback || block, coder: coder)
  streams << [ broadcasting, handler ]

  connection.server.event_loop.post do
    pubsub.subscribe(broadcasting, handler, lambda do
      ensure_confirmation_sent
      logger.info "#{self.class.name} is streaming from #{broadcasting}"
    end)
  end
end