Class: TweetStream::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/tweetstream/client.rb

Overview

Provides simple access to the Twitter Streaming API (apiwiki.twitter.com/Streaming-API-Documentation) for Ruby scripts that need to hold a long-lived connection to Twitter for tracking and other purposes.

Basic usage of the library is to call one of the provided methods with a block that performs actions on each yielded Twitter::Status. For example:

TweetStream::Client.new.track('fail') do |status|
  puts "[#{status.user.screen_name}] #{status.text}"
end

For information about a daemonized TweetStream client, view the TweetStream::Daemon class.

Direct Known Subclasses

Daemon

Instance Attribute Summary (collapse)

Instance Method Summary (collapse)

Constructor Details

- (Client) initialize(options = {})

Creates a new API client, configured from TweetStream.options merged with the given options.



31
32
33
34
35
36
# File 'lib/tweetstream/client.rb', line 31

# Build a client whose configuration is TweetStream.options overlaid with
# +options+. Each key listed in Configuration::VALID_OPTIONS_KEYS is assigned
# through its "key=" setter, using the merged value for that key.
def initialize(options={})
  settings = TweetStream.options.merge(options)
  Configuration::VALID_OPTIONS_KEYS.each { |key| send(:"#{key}=", settings[key]) }
end

Instance Attribute Details

- (Object) control (readonly)

Returns the value of attribute control



28
29
30
# File 'lib/tweetstream/client.rb', line 28

# The TweetStream::SiteStreamClient built when a site-stream control message
# (one carrying a control_uri) is received; nil until that happens.
def control; @control; end

- (Object) control_uri (readonly)

Returns the value of attribute control_uri



27
28
29
# File 'lib/tweetstream/client.rb', line 27

# The control_uri from the most recent site-stream control message; nil until
# one has been received.
def control_uri; @control_uri; end

Instance Method Details

- (Object) close_connection

Close the connection to Twitter without stopping the EventMachine loop. Does nothing if no stream has been opened.



442
443
444
# File 'lib/tweetstream/client.rb', line 442

# Close the current stream's connection, leaving the EventMachine loop
# running. Returns nil when no stream has been opened.
def close_connection
  return unless @stream

  @stream.close_connection
end

- (Object) connect(path, query_parameters = {}, &block)

Connect to Twitter without starting a new EventMachine run loop.



333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
# File 'lib/tweetstream/client.rb', line 333

# Connect to Twitter without starting a new EventMachine run loop.
#
# Per-connection callbacks may be supplied in +query_parameters+ under
# :delete, :scrub_geo, :limit, :error, :reconnect, :inited, :direct_message,
# :timeline_status and :anything. Each falls back to the handler registered
# with the matching on_* method. :method selects the HTTP verb (default :get).
# :extra_stream_parameters is merged into the options passed to
# Twitter::JSONStream.connect. The remaining keys are normalized and sent to
# Twitter as request parameters.
#
# Statuses (messages with 'text' and 'user') and site-stream messages
# (messages with 'for_user') are passed to the block. A block taking exactly
# one parameter receives the message; any other block receives the message
# and this client.
#
# @param path [String] stream path, e.g. 'statuses/filter'
# @param query_parameters [Hash] request parameters plus the options above;
#   the caller's hash is left unmodified
# @return the object returned by Twitter::JSONStream.connect
def connect(path, query_parameters = {}, &block)
  # Work on a copy. The deletes below would otherwise strip keys out of a
  # hash the caller may reuse (firehose, sample, links and retweet hand the
  # caller's hash straight through #start).
  query_parameters = query_parameters.dup

  method = query_parameters.delete(:method) || :get
  delete_proc = query_parameters.delete(:delete) || self.on_delete
  scrub_geo_proc = query_parameters.delete(:scrub_geo) || self.on_scrub_geo
  limit_proc = query_parameters.delete(:limit) || self.on_limit
  error_proc = query_parameters.delete(:error) || self.on_error
  reconnect_proc = query_parameters.delete(:reconnect) || self.on_reconnect
  inited_proc = query_parameters.delete(:inited) || self.on_inited
  direct_message_proc = query_parameters.delete(:direct_message) || self.on_direct_message
  timeline_status_proc = query_parameters.delete(:timeline_status) || self.on_timeline_status
  anything_proc = query_parameters.delete(:anything) || self.on_anything
  # Remove the transport options *before* normalizing. That way they can never
  # reach the request parameters, even if normalize_filter_parameters returns
  # a new hash rather than the one it was given.
  extra_stream_parameters = query_parameters.delete(:extra_stream_parameters) || {}

  params = normalize_filter_parameters(query_parameters)

  # GET requests carry the parameters in the URI; POST requests send them as
  # :params below.
  uri = method == :get ? build_uri(path, params) : build_uri(path)

  stream_params = {
    :path => uri,
    :method => method.to_s.upcase,
    :user_agent => user_agent,
    :on_inited => inited_proc,
    :filters => params.delete(:track),
    :params => params,
    :ssl => true
  }.merge(auth_params).merge(extra_stream_parameters)

  @stream = Twitter::JSONStream.connect(stream_params)
  @stream.each_item do |item|
    begin
      hash = MultiJson.decode(item)
    rescue MultiJson::DecodeError
      error_proc.call("MultiJson::DecodeError occured in stream: #{item}") if error_proc.is_a?(Proc)
      next
    end

    unless hash.is_a?(::Hash)
      error_proc.call("Unexpected JSON object in stream: #{item}") if error_proc.is_a?(Proc)
      next
    end

    if hash['control'] && hash['control']['control_uri']
      @control_uri = hash['control']['control_uri']
      require 'tweetstream/site_stream_client'
      @control = TweetStream::SiteStreamClient.new(@control_uri)
      # Resolve the error handler the same way as every other callback here
      # (the per-call :error override first). NOTE(review): this assumes
      # SiteStreamClient#on_error without a block is a no-op getter, as
      # Client#on_error is. Confirm that before relying on the skip when no
      # error handler is set.
      @control.on_error(&error_proc) if error_proc.is_a?(Proc)
    elsif hash['delete'] && hash['delete']['status']
      delete_proc.call(hash['delete']['status']['id'], hash['delete']['status']['user_id']) if delete_proc.is_a?(Proc)
    elsif hash['scrub_geo'] && hash['scrub_geo']['up_to_status_id']
      scrub_geo_proc.call(hash['scrub_geo']['up_to_status_id'], hash['scrub_geo']['user_id']) if scrub_geo_proc.is_a?(Proc)
    elsif hash['limit'] && hash['limit']['track']
      limit_proc.call(hash['limit']['track']) if limit_proc.is_a?(Proc)
    elsif hash['direct_message']
      yield_message_to direct_message_proc, Twitter::DirectMessage.new(hash['direct_message'])
    elsif hash['text'] && hash['user']
      @last_status = Twitter::Status.new(hash)
      yield_message_to timeline_status_proc, @last_status
      yield_to_stream_block(block, @last_status)
    elsif hash['for_user']
      @message = hash
      yield_to_stream_block(block, @message)
    end

    yield_message_to anything_proc, hash
  end

  @stream.on_error do |message|
    error_proc.call(message) if error_proc.is_a?(Proc)
  end

  @stream.on_reconnect do |timeout, retries|
    reconnect_proc.call(timeout, retries) if reconnect_proc.is_a?(Proc)
  end

  @stream.on_max_reconnects do |timeout, retries|
    raise TweetStream::ReconnectError.new(timeout, retries)
  end

  @stream
end

# Pass +message+ to the block given to #connect (if any). A block declaring
# exactly one parameter gets just the message. Every other block gets the
# message and this client; previously such blocks (e.g. |*args| or no
# parameters) were silently never called.
def yield_to_stream_block(block, message)
  return unless block

  if block.arity == 1
    block.call(message)
  else
    block.call(message, self)
  end
end
private :yield_to_stream_block

- (Object) filter(query_params = {}, &block)

Make a call to the statuses/filter method of the Streaming API. You may provide :follow, :track or both as options, to follow the tweets of specified users or to track keywords. This method is provided separately for cases where combining track and follow in a single request conserves HTTP connections.



115
116
117
# File 'lib/tweetstream/client.rb', line 115

# Stream statuses/filter. The request is always sent as a POST, overriding
# any :method the caller supplied; the caller's hash is not modified
# (merge builds a new one).
def filter(query_params = {}, &block)
  post_params = query_params.merge(:method => :post)
  start('statuses/filter', post_params, &block)
end

- (Object) firehose(query_parameters = {}, &block)

Returns all public statuses. The Firehose is not a generally available resource. Few applications require this level of access. Creative use of a combination of other resources and various access levels can satisfy nearly every application use case.



42
43
44
# File 'lib/tweetstream/client.rb', line 42

# Stream statuses/firehose (all public statuses).
#
# +query_parameters+ is copied before being handed to #start. The connection
# code deletes option keys (:method, callbacks, ...) from the hash it
# receives, and the caller's hash must not be modified.
def firehose(query_parameters = {}, &block)
  start('statuses/firehose', query_parameters.dup, &block)
end

- (Object) follow(*user_ids, &block)

Returns public statuses from or in reply to a set of users. Mentions ("Hello @user!") and implicit replies ("@user Hello!" created without pressing the reply "swoosh") are not matched. Requires integer user IDs, not screen names. Query parameters may be passed as the last argument.



90
91
92
93
94
# File 'lib/tweetstream/client.rb', line 90

# Follow the given user IDs via statuses/filter. A trailing Hash, if present,
# is treated as extra query parameters and combined with :follow.
def follow(*user_ids, &block)
  extra = user_ids.last.is_a?(::Hash) ? user_ids.pop : {}
  filter(extra.merge(:follow => user_ids), &block)
end

- (Object) links(query_parameters = {}, &block)

Returns all statuses containing http: and https:. The links stream is not a generally available resource. Few applications require this level of access. Creative use of a combination of other resources and various access levels can satisfy nearly every application use case.



50
51
52
# File 'lib/tweetstream/client.rb', line 50

# Stream statuses/links (all statuses containing http: and https:).
#
# +query_parameters+ is copied before being handed to #start. The connection
# code deletes option keys (:method, callbacks, ...) from the hash it
# receives, and the caller's hash must not be modified.
def links(query_parameters = {}, &block)
  start('statuses/links', query_parameters.dup, &block)
end

- (Object) locations(*locations_map, &block)

Specifies a set of bounding boxes to track. Only tweets that are both created using the Geotagging API and placed within a tracked bounding box will be included in the stream — the user's location field is not used to filter tweets (e.g. if a user has their location set to "San Francisco", but the tweet was not created using the Geotagging API and has no geo element, it will not be included in the stream). Bounding boxes are specified as a comma-separated list of longitude/latitude pairs, with the first pair denoting the southwest corner of the box. Query parameters may be passed as the last argument.



104
105
106
107
108
# File 'lib/tweetstream/client.rb', line 104

# Track bounding boxes via statuses/filter. The coordinates are sent as
# :locations; a trailing Hash, if present, is treated as extra query
# parameters.
def locations(*locations_map, &block)
  extra = locations_map.last.is_a?(::Hash) ? locations_map.pop : {}
  filter(extra.merge(:locations => locations_map), &block)
end

- (Object) on_anything(&block)

Set a Proc to be run whenever anything is encountered in the processing of the stream.

@client = TweetStream::Client.new
@client.on_anything do |status|
  # do something with the status
end

Block can take one or two arguments. |status (, client)| The message passed is the raw decoded Hash. If no block is given, it will return the currently set anything proc. When a block is given, the TweetStream::Client object is returned to allow for chaining.



255
256
257
258
259
260
261
262
# File 'lib/tweetstream/client.rb', line 255

# Register or fetch the handler run for every decoded message in the stream.
#
# With a block: stores it and returns self, so calls can be chained.
# Without a block: returns the stored handler (nil if none was set).
def on_anything(&block)
  return @on_anything unless block_given?

  @on_anything = block
  self
end

- (Object) on_delete(&block)

Set a Proc to be run when a deletion notice is received from the Twitter stream. For example:

@client = TweetStream::Client.new
@client.on_delete do |status_id, user_id|
  Tweet.delete(status_id)
end

Block must take two arguments: the status id and the user id. If no block is given, it will return the currently set deletion proc. When a block is given, the TweetStream::Client object is returned to allow for chaining.



149
150
151
152
153
154
155
156
# File 'lib/tweetstream/client.rb', line 149

# Register or fetch the handler called with (status_id, user_id) when a
# deletion notice arrives.
#
# With a block: stores it and returns self, so calls can be chained.
# Without a block: returns the stored handler (nil if none was set).
def on_delete(&block)
  return @on_delete unless block_given?

  @on_delete = block
  self
end

- (Object) on_direct_message(&block)

Set a Proc to be run when a direct message is encountered in the processing of the stream.

@client = TweetStream::Client.new
@client.on_direct_message do |direct_message|
  # do something with the direct message
end

Block must take one argument: the direct message. If no block is given, it will return the currently set direct message proc. When a block is given, the TweetStream::Client object is returned to allow for chaining.



234
235
236
237
238
239
240
241
# File 'lib/tweetstream/client.rb', line 234

# Register or fetch the handler given a Twitter::DirectMessage whenever a
# direct message appears in the stream.
#
# With a block: stores it and returns self, so calls can be chained.
# Without a block: returns the stored handler (nil if none was set).
def on_direct_message(&block)
  return @on_direct_message unless block_given?

  @on_direct_message = block
  self
end

- (Object) on_error(&block)

Set a Proc to be run when an HTTP error is encountered in the processing of the stream. Note that TweetStream will automatically try to reconnect, this is for reference only. Don't panic!

@client = TweetStream::Client.new
@client.on_error do |message|
  # Make note of error message
end

Block must take one argument: the error message. If no block is given, it will return the currently set error proc. When a block is given, the TweetStream::Client object is returned to allow for chaining.



213
214
215
216
217
218
219
220
# File 'lib/tweetstream/client.rb', line 213

# Register or fetch the handler called with an error message string. It runs
# for stream errors, undecodable JSON and non-Hash JSON payloads.
#
# With a block: stores it and returns self, so calls can be chained.
# Without a block: returns the stored handler (nil if none was set).
def on_error(&block)
  return @on_error unless block_given?

  @on_error = block
  self
end

- (Object) on_inited(&block)

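Set a Proc to be run when the connection is established; it is called from EventMachine::Connection#post_init. If no block is given, it will return the currently set inited proc. When a block is given, the TweetStream::Client object is returned to allow for chaining.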

@client = TweetStream::Client.new
@client.on_inited do
  puts 'Connected...'
end


309
310
311
312
313
314
315
316
# File 'lib/tweetstream/client.rb', line 309

# Register or fetch the handler passed to the stream as :on_inited, to run
# once the connection is set up.
#
# With a block: stores it and returns self, so calls can be chained.
# Without a block: returns the stored handler (nil if none was set).
def on_inited(&block)
  return @on_inited unless block_given?

  @on_inited = block
  self
end

- (Object) on_limit(&block)

Set a Proc to be run when a rate limit notice is received from the Twitter stream. For example:

@client = TweetStream::Client.new
@client.on_limit do |discarded_count|
  # Make note of discarded count
end

Block must take one argument: the number of discarded tweets. If no block is given, it will return the currently set limit proc. When a block is given, the TweetStream::Client object is returned to allow for chaining.



191
192
193
194
195
196
197
198
# File 'lib/tweetstream/client.rb', line 191

# Register or fetch the handler called with the 'track' value of a rate
# limit notice (the number of discarded tweets).
#
# With a block: stores it and returns self, so calls can be chained.
# Without a block: returns the stored handler (nil if none was set).
def on_limit(&block)
  return @on_limit unless block_given?

  @on_limit = block
  self
end

- (Object) on_reconnect(&block)

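Set a Proc to be run on reconnect. The block receives the reconnect timeout and the number of retries. If no block is given, it will return the currently set reconnect proc. When a block is given, the TweetStream::Client object is returned to allow for chaining.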

@client = TweetStream::Client.new
@client.on_reconnect do |timeout, retries|
  # Make note of the reconnection
end


292
293
294
295
296
297
298
299
# File 'lib/tweetstream/client.rb', line 292

# Register or fetch the handler called with (timeout, retries) whenever the
# stream reconnects.
#
# With a block: stores it and returns self, so calls can be chained.
# Without a block: returns the stored handler (nil if none was set).
def on_reconnect(&block)
  return @on_reconnect unless block_given?

  @on_reconnect = block
  self
end

- (Object) on_scrub_geo(&block)

Set a Proc to be run when a scrub_geo notice is received from the Twitter stream. For example:

@client = TweetStream::Client.new
@client.on_scrub_geo do |up_to_status_id, user_id|
  Tweet.where('status_id <= ?', up_to_status_id)
end

Block must take two arguments: the upper status id and the user id. If no block is given, it will return the currently set scrub_geo proc. When a block is given, the TweetStream::Client object is returned to allow for chaining.



170
171
172
173
174
175
176
177
# File 'lib/tweetstream/client.rb', line 170

# Register or fetch the handler called with (up_to_status_id, user_id) when a
# scrub_geo notice arrives.
#
# With a block: stores it and returns self, so calls can be chained.
# Without a block: returns the stored handler (nil if none was set).
def on_scrub_geo(&block)
  return @on_scrub_geo unless block_given?

  @on_scrub_geo = block
  self
end

- (Object) on_timeline_status(&block)

Set a Proc to be run when a regular timeline message is encountered in the processing of the stream.

@client = TweetStream::Client.new
@client.on_timeline_status do |status|
  # do something with the status
end

Block can take one or two arguments. |status (, client)| If no block is given, it will return the currently set timeline status proc. When a block is given, the TweetStream::Client object is returned to allow for chaining.



276
277
278
279
280
281
282
283
# File 'lib/tweetstream/client.rb', line 276

# Register or fetch the handler given a Twitter::Status for each regular
# timeline status in the stream.
#
# With a block: stores it and returns self, so calls can be chained.
# Without a block: returns the stored handler (nil if none was set).
def on_timeline_status(&block)
  return @on_timeline_status unless block_given?

  @on_timeline_status = block
  self
end

- (Object) retweet(query_parameters = {}, &block)

Returns all retweets. The retweet stream is not a generally available resource. Few applications require this level of access. Creative use of a combination of other resources and various access levels can satisfy nearly every application use case. As of 9/11/2009, the site-wide retweet feature has not yet launched, so there are currently few, if any, retweets on this stream.



60
61
62
# File 'lib/tweetstream/client.rb', line 60

# Stream statuses/retweet (all retweets).
#
# +query_parameters+ is copied before being handed to #start. The connection
# code deletes option keys (:method, callbacks, ...) from the hash it
# receives, and the caller's hash must not be modified.
def retweet(query_parameters = {}, &block)
  start('statuses/retweet', query_parameters.dup, &block)
end

- (Object) sample(query_parameters = {}, &block)

Returns a random sample of all public statuses. The default access level provides a small proportion of the Firehose. The "Gardenhose" access level provides a larger proportion, better suited to data mining and research applications that need a bigger sample to be statistically significant.



69
70
71
# File 'lib/tweetstream/client.rb', line 69

# Stream statuses/sample (a random sample of public statuses).
#
# +query_parameters+ is copied before being handed to #start. The connection
# code deletes option keys (:method, callbacks, ...) from the hash it
# receives, and the caller's hash must not be modified.
def sample(query_parameters = {}, &block)
  start('statuses/sample', query_parameters.dup, &block)
end

- (Object) sitestream(user_ids = [], query_params = {}, &block)

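Make a call to the site streams API (sitestream.twitter.com) for the given user IDs. Pass :followings => true in query_params to request with=followings.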



126
127
128
129
130
131
132
133
134
135
# File 'lib/tweetstream/client.rb', line 126

# Make a call to the site streams API (sitestream.twitter.com, /2b/site.json)
# for +user_ids+.
#
# @param user_ids [Array] IDs of the users to stream, sent as :follow
# @param query_params [Hash] :followings => true adds :with => 'followings'.
#   Other keys (e.g. per-call :delete / :error callbacks or extra request
#   parameters) are forwarded to #start, as track/follow/filter do; they used
#   to be silently dropped. The caller's hash is not modified.
def sitestream(user_ids = [], query_params = {}, &block)
  options = query_params.dup
  followings = options.delete(:followings)

  stream_params = { :host => "sitestream.twitter.com", :path => '/2b/site.json' }
  # The fixed keys take precedence over anything the caller supplied.
  sitestream_params = options.merge(
    :method => :post,
    :follow => user_ids,
    :extra_stream_parameters => stream_params
  )
  sitestream_params[:with] = 'followings' if followings
  start('', sitestream_params, &block)
end

- (Object) start(path, query_parameters = {}, &block)

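Connect to Twitter inside an EventMachine run loop. If a reactor is already running, the connection is made on it; otherwise epoll/kqueue are enabled and a new run loop is started.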



319
320
321
322
323
324
325
326
327
328
329
330
# File 'lib/tweetstream/client.rb', line 319

# Connect to +path+, reusing the EventMachine reactor if one is already
# running. Otherwise epoll and kqueue are enabled and a new run loop is
# started around the connection. Returns whatever #connect (or
# EventMachine.run) returns.
def start(path, query_parameters = {}, &block)
  return connect(path, query_parameters, &block) if EventMachine.reactor_running?

  EventMachine.epoll
  EventMachine.kqueue
  EventMachine.run { connect(path, query_parameters, &block) }
end

- (Object) stop

Stop the EventMachine event loop, terminating the currently running TweetStream, and return the last status received.



436
437
438
439
# File 'lib/tweetstream/client.rb', line 436

# Stop the EventMachine event loop (and with it the running stream).
#
# @return the last Twitter::Status seen by #connect, or nil if none arrived
def stop
  EventMachine.stop_event_loop
  # Hand back the final status so callers can, for example, log or resume
  # from it.
  @last_status
end

- (Object) stop_stream

Stop the current stream (by calling its #stop method), if one has been started.



446
447
448
# File 'lib/tweetstream/client.rb', line 446

# Stop the current stream via its #stop method. Returns nil when no stream
# has been opened.
def stop_stream
  return unless @stream

  @stream.stop
end

- (Object) track(*keywords, &block)

Specify keywords to track. Queries are subject to Track Limitations, described in Track Limiting and subject to access roles, described in the statuses/filter method. Track keywords are case-insensitive logical ORs. Terms are exact-matched, and also exact-matched ignoring punctuation. Phrases, keywords with spaces, are not supported. Keywords containing punctuation will only exact match tokens. Query parameters may be passed as the last argument.



80
81
82
83
84
# File 'lib/tweetstream/client.rb', line 80

# Track the given keywords via statuses/filter. A trailing Hash, if present,
# is treated as extra query parameters and combined with :track.
def track(*keywords, &block)
  extra = keywords.last.is_a?(::Hash) ? keywords.pop : {}
  filter(extra.merge(:track => keywords), &block)
end

- (Object) userstream(&block)

Make a call to the userstream API for the currently authenticated user.



120
121
122
123
# File 'lib/tweetstream/client.rb', line 120

# Make a call to the userstream API (userstream.twitter.com, /2/user.json)
# for the currently authenticated user.
#
# @param query_params [Hash] optional; forwarded to #start (e.g. per-call
#   :delete / :error callbacks or request parameters), as the other stream
#   methods allow. Defaults to {}, so existing `userstream { ... }` calls are
#   unaffected. :extra_stream_parameters is always set to the userstream host
#   and path.
def userstream(query_params = {}, &block)
  stream_params = { :host => "userstream.twitter.com", :path => "/2/user.json" }
  start('', query_params.merge(:extra_stream_parameters => stream_params), &block)
end