Class: TweetStream::Client
- Inherits: Object
  - Object
  - TweetStream::Client
- Defined in: lib/tweetstream/client.rb
Overview
Provides simple access to the Twitter Streaming API (apiwiki.twitter.com/Streaming-API-Documentation) for Ruby scripts that need to hold a long-lived connection to Twitter for tracking and other purposes.
Basic usage of the library is to call one of the provided methods and provide a block that will perform actions on a yielded Twitter::Status. For example:
TweetStream::Client.new.track('fail') do |status|
  puts "[#{status.user.screen_name}] #{status.text}"
end
For information about a daemonized TweetStream client, view the TweetStream::Daemon class.
Direct Known Subclasses
Instance Attribute Summary
- (Object) control (readonly)
  Returns the value of attribute control.
- (Object) control_uri (readonly)
  Returns the value of attribute control_uri.
Instance Method Summary
- (Object) close_connection
  Close the connection to Twitter without closing the EventMachine loop.
- (Object) connect(path, query_parameters = {}, &block)
  Connect to Twitter without starting a new EventMachine run loop.
- (Object) filter(query_params = {}, &block)
  Make a call to the statuses/filter method of the Streaming API. You may provide :follow, :track, or both as options to follow the tweets of specified users or track keywords.
- (Object) firehose(query_parameters = {}, &block)
  Returns all public statuses.
- (Object) follow(*user_ids, &block)
  Returns public statuses from or in reply to a set of users.
- (Client) initialize(options = {}) (constructor)
  Creates a new API client.
- (Object) links(query_parameters = {}, &block)
  Returns all statuses containing http: and https:.
- (Object) locations(*locations_map, &block)
  Specifies a set of bounding boxes to track.
- (Object) on_anything(&block)
  Set a Proc to be run whenever anything is encountered in the processing of the stream.
- (Object) on_delete(&block)
  Set a Proc to be run when a deletion notice is received from the Twitter stream.
- (Object) on_direct_message(&block)
  Set a Proc to be run when a direct message is encountered in the processing of the stream.
- (Object) on_error(&block)
  Set a Proc to be run when an HTTP error is encountered in the processing of the stream.
- (Object) on_inited(&block)
  Set a Proc to be run when the connection is established.
- (Object) on_limit(&block)
  Set a Proc to be run when a rate limit notice is received from the Twitter stream.
- (Object) on_reconnect(&block)
  Set a Proc to be run on reconnect.
- (Object) on_scrub_geo(&block)
  Set a Proc to be run when a scrub_geo notice is received from the Twitter stream.
- (Object) on_timeline_status(&block)
  Set a Proc to be run when a regular timeline message is encountered in the processing of the stream.
- (Object) retweet(query_parameters = {}, &block)
  Returns all retweets.
- (Object) sample(query_parameters = {}, &block)
  Returns a random sample of all public statuses.
- (Object) sitestream(user_ids = [], query_params = {}, &block)
  Make a call to the Site Streams API.
- (Object) start(path, query_parameters = {}, &block)
  Connect to Twitter while starting a new EventMachine run loop.
- (Object) stop
  Terminate the currently running TweetStream and close the EventMachine loop.
- (Object) stop_stream
- (Object) track(*keywords, &block)
  Specify keywords to track.
- (Object) userstream(&block)
  Make a call to the User Streams API for the currently authenticated user.
Constructor Details
- (Client) initialize(options = {})
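Creates a new API client. The given options are merged over the module-level TweetStream options, and each valid option key is then assigned through its setter.
# File 'lib/tweetstream/client.rb', line 31
def initialize(options = {})
  options = TweetStream.options.merge(options)
  Configuration::VALID_OPTIONS_KEYS.each do |key|
    send("#{key}=", options[key])
  end
end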
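The constructor merges per-instance options over a set of defaults and assigns each valid key through its setter. A minimal sketch of that pattern in isolation follows; `SketchClient`, `DEFAULTS`, and the two option keys are hypothetical stand-ins chosen for illustration, not TweetStream's real configuration keys.

```ruby
# Hypothetical stand-in for the constructor pattern; not the real TweetStream class.
class SketchClient
  # Assumed option keys, for illustration only.
  VALID_OPTIONS_KEYS = [:username, :user_agent].freeze
  DEFAULTS = { :username => nil, :user_agent => 'SketchClient' }.freeze

  attr_accessor(*VALID_OPTIONS_KEYS)

  def initialize(options = {})
    # Per-instance options win over the defaults.
    merged = DEFAULTS.merge(options)
    # Only whitelisted keys are assigned; anything else is ignored.
    VALID_OPTIONS_KEYS.each do |key|
      send("#{key}=", merged[key])
    end
  end
end

client = SketchClient.new(:username => 'alice')
puts client.username
puts client.user_agent
```

Because only the whitelisted keys are iterated, an unknown key passed in the options hash is silently dropped rather than raising.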
Instance Attribute Details
- (Object) control (readonly)
Returns the value of attribute control
# File 'lib/tweetstream/client.rb', line 28
def control
  @control
end
- (Object) control_uri (readonly)
Returns the value of attribute control_uri
# File 'lib/tweetstream/client.rb', line 27
def control_uri
  @control_uri
end
Instance Method Details
- (Object) close_connection
Close the connection to Twitter without closing the EventMachine loop.
# File 'lib/tweetstream/client.rb', line 442
def close_connection
  @stream.close_connection if @stream
end
- (Object) connect(path, query_parameters = {}, &block)
Connect to Twitter without starting a new EventMachine run loop.
# File 'lib/tweetstream/client.rb', line 333
def connect(path, query_parameters = {}, &block)
  method = query_parameters.delete(:method) || :get
  delete_proc = query_parameters.delete(:delete) || self.on_delete
  scrub_geo_proc = query_parameters.delete(:scrub_geo) || self.on_scrub_geo
  limit_proc = query_parameters.delete(:limit) || self.on_limit
  error_proc = query_parameters.delete(:error) || self.on_error
  reconnect_proc = query_parameters.delete(:reconnect) || self.on_reconnect
  inited_proc = query_parameters.delete(:inited) || self.on_inited
  direct_message_proc = query_parameters.delete(:direct_message) || self.on_direct_message
  timeline_status_proc = query_parameters.delete(:timeline_status) || self.on_timeline_status
  anything_proc = query_parameters.delete(:anything) || self.on_anything

  params = normalize_filter_parameters(query_parameters)

  extra_stream_parameters = query_parameters.delete(:extra_stream_parameters) || {}

  uri = method == :get ? build_uri(path, params) : build_uri(path)

  stream_params = {
    :path => uri,
    :method => method.to_s.upcase,
    :user_agent => user_agent,
    :on_inited => inited_proc,
    :filters => params.delete(:track),
    :params => params,
    :ssl => true
  }.merge(auth_params).merge(extra_stream_parameters)

  @stream = Twitter::JSONStream.connect(stream_params)
  @stream.each_item do |item|
    begin
      hash = MultiJson.decode(item)
    rescue MultiJson::DecodeError
      error_proc.call("MultiJson::DecodeError occured in stream: #{item}") if error_proc.is_a?(Proc)
      next
    end

    unless hash.is_a?(::Hash)
      error_proc.call("Unexpected JSON object in stream: #{item}") if error_proc.is_a?(Proc)
      next
    end

    if hash['control'] && hash['control']['control_uri']
      @control_uri = hash['control']['control_uri']
      require 'tweetstream/site_stream_client'
      @control = TweetStream::SiteStreamClient.new(@control_uri)
      @control.on_error(&self.on_error)
    elsif hash['delete'] && hash['delete']['status']
      delete_proc.call(hash['delete']['status']['id'], hash['delete']['status']['user_id']) if delete_proc.is_a?(Proc)
    elsif hash['scrub_geo'] && hash['scrub_geo']['up_to_status_id']
      scrub_geo_proc.call(hash['scrub_geo']['up_to_status_id'], hash['scrub_geo']['user_id']) if scrub_geo_proc.is_a?(Proc)
    elsif hash['limit'] && hash['limit']['track']
      limit_proc.call(hash['limit']['track']) if limit_proc.is_a?(Proc)
    elsif hash['direct_message']
      yield_message_to direct_message_proc, Twitter::DirectMessage.new(hash['direct_message'])
    elsif hash['text'] && hash['user']
      @last_status = Twitter::Status.new(hash)
      yield_message_to timeline_status_proc, @last_status

      if block_given?
        # Give the block the option to receive either one
        # or two arguments, depending on its arity.
        case block.arity
        when 1
          yield @last_status
        when 2
          yield @last_status, self
        end
      end
    elsif hash['for_user']
      @message = hash

      if block_given?
        # Give the block the option to receive either one
        # or two arguments, depending on its arity.
        case block.arity
        when 1
          yield @message
        when 2
          yield @message, self
        end
      end
    end

    yield_message_to anything_proc, hash
  end

  @stream.on_error do |message|
    error_proc.call(message) if error_proc.is_a?(Proc)
  end

  @stream.on_reconnect do |timeout, retries|
    reconnect_proc.call(timeout, retries) if reconnect_proc.is_a?(Proc)
  end

  @stream.on_max_reconnects do |timeout, retries|
    raise TweetStream::ReconnectError.new(timeout, retries)
  end

  @stream
end
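The core of connect is a dispatch on which keys are present in each decoded stream item. The sketch below isolates that decision using only Ruby's standard `json` library. `classify_stream_item` is a hypothetical helper, not part of TweetStream; it returns a label and arguments instead of invoking callbacks, and it leaves out the control and for_user branches.

```ruby
require 'json'

# Hypothetical dispatcher mirroring the key checks in connect. It returns
# [label, *arguments] rather than calling any callback.
def classify_stream_item(item)
  hash = begin
    JSON.parse(item)
  rescue JSON::ParserError
    return [:error, "undecodable item: #{item}"]
  end
  return [:error, "unexpected JSON object: #{item}"] unless hash.is_a?(Hash)

  if hash['delete'] && hash['delete']['status']
    status = hash['delete']['status']
    [:delete, status['id'], status['user_id']]
  elsif hash['scrub_geo'] && hash['scrub_geo']['up_to_status_id']
    [:scrub_geo, hash['scrub_geo']['up_to_status_id'], hash['scrub_geo']['user_id']]
  elsif hash['limit'] && hash['limit']['track']
    [:limit, hash['limit']['track']]
  elsif hash['direct_message']
    [:direct_message, hash['direct_message']]
  elsif hash['text'] && hash['user']
    [:timeline_status, hash]
  else
    [:other, hash]
  end
end

p classify_stream_item('{"limit":{"track":42}}')
p classify_stream_item('{"delete":{"status":{"id":1,"user_id":2}}}')
```

The order of the branches matters: notices such as delete and limit are checked before the generic status test, so a notice is never mistaken for a tweet.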
- (Object) filter(query_params = {}, &block)
Make a call to the statuses/filter method of the Streaming API. You may provide :follow, :track, or both as options to follow the tweets of specified users or track keywords. This method is provided separately for cases where combining track and follow conserves the number of HTTP connections.
# File 'lib/tweetstream/client.rb', line 115
def filter(query_params = {}, &block)
  start('statuses/filter', query_params.merge(:method => :post), &block)
end
- (Object) firehose(query_parameters = {}, &block)
Returns all public statuses. The Firehose is not a generally available resource. Few applications require this level of access. Creative use of a combination of other resources and various access levels can satisfy nearly every application use case.
# File 'lib/tweetstream/client.rb', line 42
def firehose(query_parameters = {}, &block)
  start('statuses/firehose', query_parameters, &block)
end
- (Object) follow(*user_ids, &block)
Returns public statuses from or in reply to a set of users. Mentions ("Hello @user!") and implicit replies ("@user Hello!" created without pressing the reply "swoosh") are not matched. Requires integer user IDs, not screen names. Query parameters may be passed as the last argument.
# File 'lib/tweetstream/client.rb', line 90
def follow(*user_ids, &block)
  query_params = user_ids.pop if user_ids.last.is_a?(::Hash)
  query_params ||= {}
  filter(query_params.merge(:follow => user_ids), &block)
end
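The "query parameters may be passed as the last argument" convention relies on popping a trailing Hash off the splat. A small sketch of that idiom on its own, where `split_trailing_options` is a hypothetical helper name and `:count` is only an example key:

```ruby
# Hypothetical helper showing the trailing-Hash convention used by follow.
def split_trailing_options(*args)
  # A Hash in last position is treated as options, not as a positional value.
  options = args.last.is_a?(Hash) ? args.pop : {}
  [args, options]
end

ids, options = split_trailing_options(101, 202, :count => 5)
query = options.merge(:follow => ids)
p ids
p query
```

One consequence of this design is that a Hash can never be passed as an ordinary positional value in last place, which is harmless here because user IDs are integers.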
- (Object) links(query_parameters = {}, &block)
Returns all statuses containing http: and https:. The links stream is not a generally available resource. Few applications require this level of access. Creative use of a combination of other resources and various access levels can satisfy nearly every application use case.
# File 'lib/tweetstream/client.rb', line 50
def links(query_parameters = {}, &block)
  start('statuses/links', query_parameters, &block)
end
- (Object) locations(*locations_map, &block)
Specifies a set of bounding boxes to track. Only tweets that are both created using the Geotagging API and placed from within a tracked bounding box will be included in the stream; the user's location field is not used to filter tweets (e.g. if a user has their location set to "San Francisco", but the tweet was not created using the Geotagging API and has no geo element, it will not be included in the stream). Bounding boxes are specified as a comma-separated list of longitude/latitude pairs, with the first pair denoting the southwest corner of the box.
# File 'lib/tweetstream/client.rb', line 104
def locations(*locations_map, &block)
  query_params = locations_map.pop if locations_map.last.is_a?(::Hash)
  query_params ||= {}
  filter(query_params.merge(:locations => locations_map), &block)
end
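Since the longitude-first, southwest-first ordering is easy to get backwards, a sketch of building the comma-separated value may help. `locations_param` is a hypothetical helper, not part of TweetStream, and the coordinates are illustrative values roughly covering San Francisco.

```ruby
# Hypothetical helper: flattens [southwest, northeast] corner pairs, each given
# as [longitude, latitude], into a comma-separated string.
def locations_param(*boxes)
  boxes.map do |southwest, northeast|
    sw_lon, sw_lat = southwest
    ne_lon, ne_lat = northeast
    # Only latitude order is checked; a box crossing the antimeridian could
    # legitimately have a larger longitude on its southwest corner.
    raise ArgumentError, 'southwest corner must come first' if sw_lat > ne_lat
    [sw_lon, sw_lat, ne_lon, ne_lat]
  end.flatten.join(',')
end

# Illustrative coordinates, longitude before latitude.
san_francisco = [[-122.75, 36.8], [-121.75, 37.8]]
puts locations_param(san_francisco)
```

The resulting values could then be handed to locations as its positional arguments, assuming the caller wants one flat list of coordinates per call.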
- (Object) on_anything(&block)
Set a Proc to be run whenever anything is encountered in the processing of the stream.
@client = TweetStream::Client.new
@client.on_anything do |status|
  # do something with the status
end
Block can take one or two arguments: |status (, client)|. If no block is given, it will return the currently set on_anything proc. When a block is given, the TweetStream::Client object is returned to allow for chaining.
# File 'lib/tweetstream/client.rb', line 255
def on_anything(&block)
  if block_given?
    @on_anything = block
    self
  else
    @on_anything
  end
end
- (Object) on_delete(&block)
Set a Proc to be run when a deletion notice is received from the Twitter stream. For example:
@client = TweetStream::Client.new
@client.on_delete do |status_id, user_id|
  Tweet.delete(status_id)
end
Block must take two arguments: the status id and the user id. If no block is given, it will return the currently set deletion proc. When a block is given, the TweetStream::Client object is returned to allow for chaining.
# File 'lib/tweetstream/client.rb', line 149
def on_delete(&block)
  if block_given?
    @on_delete = block
    self
  else
    @on_delete
  end
end
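Every on_* method doubles as a setter (with a block) and a getter (without one), which is what makes chaining possible. A sketch of that behavior with a hypothetical stand-in class, `CallbackSketch`, that copies the method shape shown above without connecting to anything:

```ruby
# Hypothetical stand-in that copies the setter/getter shape of on_delete.
class CallbackSketch
  def on_delete(&block)
    if block_given?
      @on_delete = block
      self # returning self is what allows chaining
    else
      @on_delete
    end
  end

  def on_limit(&block)
    if block_given?
      @on_limit = block
      self
    else
      @on_limit
    end
  end
end

deleted = []
limits = []
client = CallbackSketch.new
client.on_delete { |status_id, user_id| deleted << status_id }.on_limit { |count| limits << count }

# Without a block, the stored proc comes back and can be invoked directly.
client.on_delete.call(1234, 42)
client.on_limit.call(7)
p deleted
p limits
```

A getter call made before any block has been registered returns nil, which is why the library code guards each invocation with an is_a?(Proc) check.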
- (Object) on_direct_message(&block)
Set a Proc to be run when a direct message is encountered in the processing of the stream.
@client = TweetStream::Client.new
@client.on_direct_message do |direct_message|
  # do something with the direct message
end
Block must take one argument: the direct message. If no block is given, it will return the currently set direct message proc. When a block is given, the TweetStream::Client object is returned to allow for chaining.
# File 'lib/tweetstream/client.rb', line 234
def on_direct_message(&block)
  if block_given?
    @on_direct_message = block
    self
  else
    @on_direct_message
  end
end
- (Object) on_error(&block)
Set a Proc to be run when an HTTP error is encountered in the processing of the stream. Note that TweetStream will automatically try to reconnect; this callback is for reference only. Don't panic!
@client = TweetStream::Client.new
@client.on_error do |message|
  # Make note of error message
end
Block must take one argument: the error message. If no block is given, it will return the currently set error proc. When a block is given, the TweetStream::Client object is returned to allow for chaining.
# File 'lib/tweetstream/client.rb', line 213
def on_error(&block)
  if block_given?
    @on_error = block
    self
  else
    @on_error
  end
end
- (Object) on_inited(&block)
Set a Proc to be run when the connection is established. Called in EventMachine::Connection#post_init.
@client = TweetStream::Client.new
@client.on_inited do
  puts 'Connected...'
end
# File 'lib/tweetstream/client.rb', line 309
def on_inited(&block)
  if block_given?
    @on_inited = block
    self
  else
    @on_inited
  end
end
- (Object) on_limit(&block)
Set a Proc to be run when a rate limit notice is received from the Twitter stream. For example:
@client = TweetStream::Client.new
@client.on_limit do |discarded_count|
  # Make note of discarded count
end
Block must take one argument: the number of discarded tweets. If no block is given, it will return the currently set limit proc. When a block is given, the TweetStream::Client object is returned to allow for chaining.
# File 'lib/tweetstream/client.rb', line 191
def on_limit(&block)
  if block_given?
    @on_limit = block
    self
  else
    @on_limit
  end
end
- (Object) on_reconnect(&block)
Set a Proc to be run on reconnect.
@client = TweetStream::Client.new
@client.on_reconnect do |timeout, retries|
  # Make note of the reconnection
end
# File 'lib/tweetstream/client.rb', line 292
def on_reconnect(&block)
  if block_given?
    @on_reconnect = block
    self
  else
    @on_reconnect
  end
end
- (Object) on_scrub_geo(&block)
Set a Proc to be run when a scrub_geo notice is received from the Twitter stream. For example:
@client = TweetStream::Client.new
@client.on_scrub_geo do |up_to_status_id, user_id|
  # Tweet is a placeholder model; select this user's tweets up to the given id
  Tweet.where('user_id = ? AND status_id <= ?', user_id, up_to_status_id)
end
Block must take two arguments: the upper status id and the user id. If no block is given, it will return the currently set scrub_geo proc. When a block is given, the TweetStream::Client object is returned to allow for chaining.
# File 'lib/tweetstream/client.rb', line 170
def on_scrub_geo(&block)
  if block_given?
    @on_scrub_geo = block
    self
  else
    @on_scrub_geo
  end
end
- (Object) on_timeline_status(&block)
Set a Proc to be run when a regular timeline message is encountered in the processing of the stream.
@client = TweetStream::Client.new
@client.on_timeline_status do |status|
  # do something with the status
end
Block can take one or two arguments: |status (, client)|. If no block is given, it will return the currently set timeline status proc. When a block is given, the TweetStream::Client object is returned to allow for chaining.
# File 'lib/tweetstream/client.rb', line 276
def on_timeline_status(&block)
  if block_given?
    @on_timeline_status = block
    self
  else
    @on_timeline_status
  end
end
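The one-or-two-argument convention can be implemented by inspecting the block's arity before calling it. The sketch below shows one way to do that; `deliver` is a hypothetical helper name used for illustration and is not claimed to be the library's own method.

```ruby
# Hypothetical helper: passes the client as a second argument only when the
# callback declares two parameters.
def deliver(callback, message, client)
  return unless callback.is_a?(Proc)

  case callback.arity
  when 1 then callback.call(message)
  when 2 then callback.call(message, client)
  end
end

one_arg = proc { |status| "got #{status}" }
two_args = proc { |status, client| "got #{status} via #{client}" }

puts deliver(one_arg, 'a tweet', 'the client')
puts deliver(two_args, 'a tweet', 'the client')
```

Under this approach a callback with any other arity, such as a block that takes no parameters, is never called, so a block that needs nothing from the stream should still declare one parameter.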
- (Object) retweet(query_parameters = {}, &block)
Returns all retweets. The retweet stream is not a generally available resource. Few applications require this level of access. Creative use of a combination of other resources and various access levels can satisfy nearly every application use case. As of 9/11/2009, the site-wide retweet feature has not yet launched, so there are currently few, if any, retweets on this stream.
# File 'lib/tweetstream/client.rb', line 60
def retweet(query_parameters = {}, &block)
  start('statuses/retweet', query_parameters, &block)
end
- (Object) sample(query_parameters = {}, &block)
Returns a random sample of all public statuses. The default access level provides a small proportion of the Firehose. The "Gardenhose" access level provides a proportion more suitable for data mining and research applications that need a larger proportion to be a statistically significant sample.
# File 'lib/tweetstream/client.rb', line 69
def sample(query_parameters = {}, &block)
  start('statuses/sample', query_parameters, &block)
end
- (Object) sitestream(user_ids = [], query_params = {}, &block)
Make a call to the Site Streams API (sitestream.twitter.com) for the given user IDs. Passing :followings => true in query_params adds :with => 'followings' to the request.
# File 'lib/tweetstream/client.rb', line 126
def sitestream(user_ids = [], query_params = {}, &block)
  stream_params = { :host => "sitestream.twitter.com", :path => '/2b/site.json' }
  sitestream_params = {
    :method => :post,
    :follow => user_ids,
    :extra_stream_parameters => stream_params
  }
  sitestream_params.merge!(:with => 'followings') if query_params[:followings]
  start('', sitestream_params, &block)
end
- (Object) start(path, query_parameters = {}, &block)
Connect to Twitter, starting a new EventMachine run loop unless a reactor is already running.
# File 'lib/tweetstream/client.rb', line 319
def start(path, query_parameters = {}, &block)
  if EventMachine.reactor_running?
    connect(path, query_parameters, &block)
  else
    EventMachine.epoll
    EventMachine.kqueue
    EventMachine::run do
      connect(path, query_parameters, &block)
    end
  end
end
- (Object) stop
Terminate the currently running TweetStream and close the EventMachine loop. Returns the last status received.
# File 'lib/tweetstream/client.rb', line 436
def stop
  EventMachine.stop_event_loop
  @last_status
end
- (Object) stop_stream
# File 'lib/tweetstream/client.rb', line 446
def stop_stream
  @stream.stop if @stream
end
- (Object) track(*keywords, &block)
Specify keywords to track. Queries are subject to Track Limitations, described in Track Limiting and subject to access roles, described in the statuses/filter method. Track keywords are case-insensitive logical ORs. Terms are exact-matched, and also exact-matched ignoring punctuation. Phrases, keywords with spaces, are not supported. Keywords containing punctuation will only exact match tokens. Query parameters may be passed as the last argument.
# File 'lib/tweetstream/client.rb', line 80
def track(*keywords, &block)
  query_params = keywords.pop if keywords.last.is_a?(::Hash)
  query_params ||= {}
  filter(query_params.merge(:track => keywords), &block)
end
- (Object) userstream(&block)
Make a call to the User Streams API for the currently authenticated user.
# File 'lib/tweetstream/client.rb', line 120
def userstream(&block)
  stream_params = { :host => "userstream.twitter.com", :path => "/2/user.json" }
  start('', :extra_stream_parameters => stream_params, &block)
end