twarc.Client
Twarc
Twarc allows you to retrieve data from the Twitter API. Each method is an iterator that runs to completion, and handles rate limiting so that it will go to sleep when Twitter tells it to, and wake back up when it is able to retrieve data from the API again.
__init__(self, consumer_key=None, consumer_secret=None, access_token=None, access_token_secret=None, connection_errors=0, http_errors=0, config=None, profile='', protected=False, tweet_mode='extended', app_auth=False, validate_keys=True, gnip_auth=False, gnip_username=None, gnip_password=None, gnip_account=None)
special
Instantiate a Twarc instance. If keys aren't set we'll try to discover them in the environment or a supplied profile. If no profile is indicated the first section of the config files will be used.
Source code in twarc/client.py
def __init__(
    self,
    consumer_key=None,
    consumer_secret=None,
    access_token=None,
    access_token_secret=None,
    connection_errors=0,
    http_errors=0,
    config=None,
    profile="",
    protected=False,
    tweet_mode="extended",
    app_auth=False,
    validate_keys=True,
    gnip_auth=False,
    gnip_username=None,
    gnip_password=None,
    gnip_account=None,
):
    """
    Build a Twarc client.

    Every argument is stored on the instance as given. get_keys() is then
    called to fill in any credentials that were not passed in and, unless
    validate_keys is False, validate_keys() is called afterwards. When no
    config is supplied the value returned by default_config() is used.
    """
    self.api_version = "1.1"

    # connection state, populated later
    self.client = None
    self.last_response = None

    # Twitter OAuth credentials
    self.consumer_key = consumer_key
    self.consumer_secret = consumer_secret
    self.access_token = access_token
    self.access_token_secret = access_token_secret

    # Gnip credentials
    self.gnip_auth = gnip_auth
    self.gnip_username = gnip_username
    self.gnip_password = gnip_password
    self.gnip_account = gnip_account

    # behavioural options
    self.connection_errors = connection_errors
    self.http_errors = http_errors
    self.profile = profile
    self.tweet_mode = tweet_mode
    self.protected = protected
    self.app_auth = app_auth

    # an empty/None config falls back to the default location
    self.config = config or self.default_config()

    self.get_keys()
    if validate_keys:
        self.validate_keys()
dehydrate(self, iterator)
Pass in an iterator of tweets' JSON and get back an iterator of the IDs of each tweet.
Source code in twarc/client.py
def dehydrate(self, iterator):
    """
    Turn an iterator of JSON-encoded tweets into an iterator of tweet ids.

    Each item is decoded with json.loads and its "id_str" value is
    yielded. An item that cannot be processed is logged as an error and
    skipped.
    """
    for raw_tweet in iterator:
        try:
            tweet = json.loads(raw_tweet)
            yield tweet["id_str"]
        except Exception as err:
            log.error("uhoh: %s\n" % err)
follower_ids(self, user, max_pages=None)
Returns Twitter user id lists for the specified user's followers. A user can be specified using their screen_name or user_id.
Source code in twarc/client.py
def follower_ids(self, user, max_pages=None):
    """
    Yield the Twitter user ids of the specified user's followers.

    A user can be specified using their screen_name (a leading "@" is
    stripped) or their user_id; a value made up only of digits is sent
    as a user_id. Pages are requested until the returned next_cursor is
    0, or until max_pages pages have been retrieved when max_pages is
    given.

    An HTTPError raised by the request is re-raised; a 404 is logged
    first.
    """
    user = str(user).lstrip("@")
    url = "https://api.twitter.com/1.1/followers/ids.json"

    # an all-digit value is treated as a user id, anything else as a name
    id_param = "user_id" if re.match(r"^\d+$", user) else "screen_name"
    params = {id_param: user, "cursor": -1}

    retrieved_pages = 0
    while params["cursor"] != 0:
        try:
            resp = self.get(url, params=params, allow_404=True)
            retrieved_pages += 1
        except requests.exceptions.HTTPError as e:
            if e.response.status_code == 404:
                # log the value we looked up; the previous code referenced
                # an undefined name here and raised NameError instead
                log.info("no users matching %s", user)
            raise e

        user_ids = resp.json()
        for user_id in user_ids["ids"]:
            yield str_type(user_id)
        params["cursor"] = user_ids["next_cursor"]

        if max_pages is not None and retrieved_pages == max_pages:
            log.info("reached max follower page limit for %s", params)
            break
friend_ids(self, user, max_pages=None)
Returns Twitter user id lists for the specified user's friends. A user can be specified using their screen_name or user_id.
Source code in twarc/client.py
def friend_ids(self, user, max_pages=None):
    """
    Yield the Twitter user ids of the specified user's friends.

    The user may be given as a screen_name (a leading "@" is stripped)
    or as a user_id; an all-digit value is sent as a user_id. Paging
    continues until the returned next_cursor is 0 or, when max_pages is
    given, until that many pages have been retrieved.
    """
    endpoint = "https://api.twitter.com/1.1/friends/ids.json"
    user = str(user).lstrip("@")

    lookup_key = "user_id" if re.match(r"^\d+$", user) else "screen_name"
    params = {lookup_key: user, "cursor": -1}

    pages_seen = 0
    while params["cursor"] != 0:
        try:
            resp = self.get(endpoint, params=params, allow_404=True)
            pages_seen += 1
        except requests.exceptions.HTTPError as e:
            if e.response.status_code == 404:
                log.error("no users matching %s", user)
            raise e

        page = resp.json()
        for friend_id in page["ids"]:
            yield str_type(friend_id)
        params["cursor"] = page["next_cursor"]

        if max_pages is not None and pages_seen == max_pages:
            log.info("reached max friend page limit for %s", params)
            break
get_keys(self)
Get the Twitter API keys. Order of precedence is command line, environment, config file. Return True if all the keys were found and False if not.
Source code in twarc/client.py
def get_keys(self):
    """
    Get the Twitter API keys. Order of precedence is values already set
    on the instance (e.g. from the command line), environment, config
    file.

    Each credential that is not already set is read from the environment
    variable named after it in upper case (CONSUMER_KEY, GNIP_USERNAME,
    ...). If a config is set and the credentials needed for the active
    authentication mode are still incomplete, load_config() is called.

    Returns True if all the keys for the active authentication mode
    (the Gnip credentials when gnip_auth is set, otherwise the four
    OAuth keys) were found and False if not.
    """
    env = os.environ.get
    credential_names = (
        "consumer_key",
        "consumer_secret",
        "access_token",
        "access_token_secret",
        "gnip_username",
        "gnip_password",
        "gnip_account",
    )
    for name in credential_names:
        if not getattr(self, name):
            setattr(self, name, env(name.upper()))

    def keys_found():
        # which credentials matter depends on the authentication mode
        if self.gnip_auth:
            return bool(
                self.gnip_username and self.gnip_password and self.gnip_account
            )
        return bool(
            self.consumer_key
            and self.consumer_secret
            and self.access_token
            and self.access_token_secret
        )

    # only fall back to the config file when something is still missing
    if self.config and not keys_found():
        self.load_config()

    return keys_found()
hydrate(self, iterator, trim_user=False)
Pass in an iterator of tweet ids and get back an iterator for the decoded JSON for each corresponding tweet.
Source code in twarc/client.py
def hydrate(self, iterator, trim_user=False):
    """
    Turn an iterator of tweet ids into an iterator of decoded tweet JSON.

    Ids are converted to strings, stripped of surrounding whitespace and
    posted to the statuses/lookup endpoint in batches of 100. Tweets from
    each full batch are yielded sorted by their "id_str"; tweets from the
    final partial batch are yielded in the order the API returned them.
    """
    url = "https://api.twitter.com/1.1/statuses/lookup.json"

    def lookup_payload(tweet_ids):
        # form data for one statuses/lookup request
        return {
            "id": ",".join(tweet_ids),
            "include_ext_alt_text": "true",
            "include_entities": "true",
            "trim_user": trim_user,
        }

    batch = []
    for raw_id in iterator:
        # str() + strip() removes a trailing new line if present
        batch.append(str(raw_id).strip())
        if len(batch) != 100:
            continue

        # a full batch of 100 is ready to be looked up
        log.info("hydrating %s ids", len(batch))
        resp = self.post(url, data=lookup_payload(batch))
        tweets = resp.json()
        tweets.sort(key=lambda t: t["id_str"])
        for tweet in tweets:
            yield tweet
        batch = []

    # whatever is left over after the last full batch
    if batch:
        log.info("hydrating %s", batch)
        resp = self.post(url, data=lookup_payload(batch))
        for tweet in resp.json():
            yield tweet
list_members(self, list_id=None, slug=None, owner_screen_name=None, owner_id=None)
Returns the members of a list.
List id or (slug and (owner_screen_name or owner_id)) are required
Source code in twarc/client.py
def list_members(
    self, list_id=None, slug=None, owner_screen_name=None, owner_id=None
):
    """
    Yield the members of a list, one user object at a time.

    Either list_id, or slug together with owner_screen_name or owner_id,
    must be supplied. Pages are requested until the returned next_cursor
    is 0. An HTTPError raised by the request is re-raised; a 404 is
    logged first.
    """
    assert list_id or (slug and (owner_screen_name or owner_id))

    endpoint = "https://api.twitter.com/1.1/lists/members.json"
    params = {"cursor": -1}

    if not list_id:
        # identify the list by slug + owner instead of by id
        params["slug"] = slug
        if owner_screen_name:
            params["owner_screen_name"] = owner_screen_name
        else:
            params["owner_id"] = owner_id
    else:
        params["list_id"] = list_id

    while params["cursor"] != 0:
        try:
            resp = self.get(endpoint, params=params, allow_404=True)
        except requests.exceptions.HTTPError as e:
            if e.response.status_code == 404:
                log.error("no matching list")
            raise e

        page = resp.json()
        for member in page["users"]:
            yield member
        params["cursor"] = page["next_cursor"]
oembed(self, tweet_url, **params)
Returns the oEmbed JSON for a tweet. The JSON includes an html key that contains the HTML for the embed. You can pass in parameters that correspond to the parameters that Twitter's statuses/oembed endpoint supports. For example:
o = client.oembed('https://twitter.com/biz/status/21', theme='dark')
Source code in twarc/client.py
def oembed(self, tweet_url, **params):
    """
    Fetch the oEmbed JSON for a tweet from publish.twitter.com and return
    it decoded.

    Any extra keyword arguments are sent along as query parameters next
    to the tweet's url, for example:

        o = client.oembed('https://twitter.com/biz/status/21', theme='dark')
    """
    log.info("generating embedding for tweet %s", tweet_url)
    # the tweet url travels as the "url" query parameter
    params["url"] = tweet_url
    return self.get("https://publish.twitter.com/oembed", params=params).json()
premium_search(self, q, product, environment, from_date=None, to_date=None, max_results=None, sandbox=False, limit=0)
Search using the Premium Search API. You will need to pass in a query, a product (30day or fullarchive) and an environment to use. Optionally you can pass in a from_date and to_date to limit the search using datetime objects. If you would like to set max_results you can, or you can accept the maximum results (500). If using a sandbox environment you will want to set sandbox=True to lower the max_results to 100. The limit option will cause your search to finish after it has returned that number of tweets (0 means no limit).
Source code in twarc/client.py
def premium_search(
    self,
    q,
    product,
    environment,
    from_date=None,
    to_date=None,
    max_results=None,
    sandbox=False,
    limit=0,
):
    """
    Yield tweets matching a query from the Premium Search API.

    q is the query, product is one of "30day", "fullarchive" or
    "gnip_fullarchive", and environment names the search environment.
    from_date and to_date are optional datetime.date or datetime.datetime
    objects bounding the search. max_results defaults to 100 when sandbox
    is true and to 500 otherwise. A non-zero limit stops the search once
    that many tweets have been yielded.

    Raises RuntimeError when neither app_auth nor gnip_auth is enabled,
    when a date has the wrong type, when the product is not recognised,
    or when the API answers with a 422.
    """
    if not (self.app_auth or self.gnip_auth):
        raise RuntimeError(
            "This endpoint is only available with application authentication. "
            "Pass app_auth=True in Python or --app-auth on the command line."
        )

    if from_date and not isinstance(from_date, datetime.date):
        raise RuntimeError(
            "from_date must be a datetime.date or datetime.datetime object"
        )

    if to_date and not isinstance(to_date, datetime.date):
        raise RuntimeError(
            "to_date must be a datetime.date or datetime.datetime object"
        )

    if product not in ["30day", "gnip_fullarchive", "fullarchive"]:
        raise RuntimeError("Invalid Premium Search API product: {}".format(product))

    # the default page size depends on whether the environment is sandboxed
    if max_results is None:
        max_results = 100 if sandbox else 500

    if product != "gnip_fullarchive":
        url = "https://api.twitter.com/1.1/tweets/search/{}/{}.json".format(
            product, environment
        )
    else:
        url = "https://gnip-api.twitter.com/search/fullarchive/accounts/{}/{}.json".format(
            self.gnip_account, environment
        )

    date_format = "%Y%m%d%H%M"
    params = {
        "query": q,
        "fromDate": from_date.strftime(date_format) if from_date else None,
        "toDate": to_date.strftime(date_format) if to_date else None,
        "maxResults": max_results,
    }

    yielded = 0
    while True:
        resp = self.get(url, params=params)

        if resp.status_code == 422:
            raise RuntimeError(
                "Twitter API 422 response: are you using a premium search sandbox environment and forgot the --sandbox argument?"
            )

        if resp.status_code != 200:
            # any other status simply leads to the request being repeated
            continue

        page = resp.json()
        for tweet in page["results"]:
            yielded += 1
            yield tweet
            if limit != 0 and yielded >= limit:
                return

        if "next" not in page:
            return
        # the "next" token asks for the following page of results
        params["next"] = page["next"]
replies(self, tweet, recursive=False, prune=())
replies returns a generator of tweets that are replies for a given tweet. It includes the original tweet. If you would like to fetch the replies to the replies use recursive=True which will do a depth-first recursive walk of the replies. It also walks up the reply chain if you supply a tweet that is itself a reply to another tweet. You can optionally supply a tuple of tweet ids to ignore during this traversal using the prune parameter.
Source code in twarc/client.py
def replies(self, tweet, recursive=False, prune=()):
    """
    replies returns a generator of tweets that are replies for a given
    tweet. It includes the original tweet. If you would like to fetch the
    replies to the replies use recursive=True which will do a depth-first
    recursive walk of the replies. It also walks up the reply chain if you
    supply a tweet that is itself a reply to another tweet. You can
    optionally supply a tuple of tweet ids to ignore during this traversal
    using the prune parameter.

    Candidate replies are found by searching for "to:<screen_name>" with
    since_id set to the tweet's id, and keeping only results whose
    in_reply_to_status_id_str matches the tweet. With recursive=True the
    tweet this one replies to, and the tweet it quotes, are fetched with
    self.tweet() and walked as well.
    """
    # the tweet itself always comes first
    yield tweet
    # get replies to the tweet
    screen_name = tweet["user"]["screen_name"]
    tweet_id = tweet["id_str"]
    log.info("looking for replies to: %s", tweet_id)
    for reply in self.search("to:%s" % screen_name, since_id=tweet_id):
        # the search returns everything addressed to the user; keep only
        # direct replies to this particular tweet
        if reply["in_reply_to_status_id_str"] != tweet_id:
            continue
        if reply["id_str"] in prune:
            log.info("ignoring pruned tweet id %s", reply["id_str"])
            continue
        log.info("found reply: %s", reply["id_str"])
        if recursive:
            # NOTE(review): pruned replies were already skipped above, so
            # this check looks like it is always true here - confirm
            if reply["id_str"] not in prune:
                # add the current tweet to prune so the recursive call
                # does not walk back up to it
                prune = prune + (tweet_id,)
                for r in self.replies(reply, recursive, prune):
                    yield r
        else:
            yield reply
    # if this tweet is itself a reply to another tweet get it and
    # get other potential replies to it
    reply_to_id = tweet.get("in_reply_to_status_id_str")
    log.info("prune=%s", prune)
    if recursive and reply_to_id and reply_to_id not in prune:
        t = self.tweet(reply_to_id)
        # a falsy result from self.tweet() is skipped
        if t:
            log.info("found reply-to: %s", t["id_str"])
            prune = prune + (tweet["id_str"],)
            for r in self.replies(t, recursive=True, prune=prune):
                yield r
    # if this tweet is a quote go get that too whatever tweets it
    # may be in reply to
    quote_id = tweet.get("quoted_status_id_str")
    if recursive and quote_id and quote_id not in prune:
        t = self.tweet(quote_id)
        if t:
            log.info("found quote: %s", t["id_str"])
            prune = prune + (tweet["id_str"],)
            for r in self.replies(t, recursive=True, prune=prune):
                yield r
retweets(self, tweet_ids)
Retrieves up to the last 100 retweets for the provided iterator of tweet_ids.
Source code in twarc/client.py
def retweets(self, tweet_ids):
    """
    Retrieves up to the last 100 retweets for the provided iterator of tweet_ids.

    Ids that have a strip() method (i.e. strings) have surrounding
    whitespace removed first. The retweets of each id are yielded in the
    order the API returns them.

    An HTTPError for one id does not stop the iteration: a 404 is logged
    at info level, any other status is logged as an error so that the
    missing results do not go unnoticed, and processing continues with
    the next id.
    """
    # a for loop calls iter() itself, so lists and generators both work
    for tweet_id in tweet_ids:
        if hasattr(tweet_id, "strip"):
            tweet_id = tweet_id.strip()
        log.info("retrieving retweets of %s", tweet_id)
        url = "https://api.twitter.com/1.1/statuses/retweets/{}.json".format(
            tweet_id
        )
        try:
            resp = self.get(url, params={"count": 100}, allow_404=True)
            for tweet in resp.json():
                yield tweet
        except requests.exceptions.HTTPError as e:
            if e.response.status_code == 404:
                log.info("can't get tweets for non-existent tweet: %s", tweet_id)
            else:
                # previously dropped without a trace
                log.error("unable to retrieve retweets of %s: %s", tweet_id, e)
sample(self, event=None, record_keepalive=False)
Returns a small random sample of all public statuses. The Tweets returned by the default access level are the same, so if two different clients connect to this endpoint, they will see the same Tweets.
If a threading.Event is provided for event and the event is set, the sample will be interrupted.
Source code in twarc/client.py
def sample(self, event=None, record_keepalive=False):
    """
    Returns a small random sample of all public statuses. The Tweets
    returned by the default access level are the same, so if two different
    clients connect to this endpoint, they will see the same Tweets.

    If a threading.Event is provided for event and the event is set,
    the sample will be interrupted.

    Empty lines received from the stream are keep-alives; they are logged
    and, when record_keepalive is true, yielded as the string
    "keep-alive". Every other line is decoded as JSON and yielded; lines
    that fail to decode are logged and skipped.

    After an error the stream is reconnected following a sleep that grows
    with the number of consecutive errors. Once that number equals
    self.http_errors (when non-zero) the error is re-raised.
    """
    url = "https://stream.twitter.com/1.1/statuses/sample.json"
    params = {"stall_warning": True}
    headers = {"accept-encoding": "deflate, gzip"}
    errors = 0
    while True:
        try:
            log.info("connecting to sample stream")
            resp = self.post(url, params, headers=headers, stream=True)
            # a successful connection resets the consecutive error count
            errors = 0
            for line in resp.iter_lines(chunk_size=512):
                if event and event.is_set():
                    log.info("stopping sample")
                    # Explicitly close response
                    resp.close()
                    return
                # iter_lines yields bytes, which never compare equal to
                # "", so test for emptiness instead of equality
                if not line:
                    log.info("keep-alive")
                    if record_keepalive:
                        yield "keep-alive"
                    continue
                try:
                    yield json.loads(line.decode())
                except Exception as e:
                    log.error("json parse error: %s - %s", e, line)
        except requests.exceptions.HTTPError as e:
            errors += 1
            log.error("caught http error %s on %s try", e, errors)
            if self.http_errors and errors == self.http_errors:
                log.warning("too many errors")
                raise e
            # a 420 gets a much longer back-off than other statuses
            if e.response.status_code == 420:
                if interruptible_sleep(errors * 60, event):
                    log.info("stopping sample")
                    return
            else:
                if interruptible_sleep(errors * 5, event):
                    log.info("stopping sample")
                    return
        except Exception as e:
            errors += 1
            log.error("caught exception %s on %s try", e, errors)
            # NOTE(review): this uses http_errors although the instance
            # also carries connection_errors - confirm which is intended
            if self.http_errors and errors == self.http_errors:
                log.warning("too many errors")
                raise e
            if interruptible_sleep(errors, event):
                log.info("stopping sample")
                return
timeline(self, user_id=None, screen_name=None, max_id=None, since_id=None, max_pages=None)
Returns a collection of the most recent tweets posted by the user indicated by the user_id or screen_name parameter. Provide a user_id or screen_name.
Source code in twarc/client.py
def timeline(
    self, user_id=None, screen_name=None, max_id=None, since_id=None, max_pages=None
):
    """
    Returns a collection of the most recent tweets posted
    by the user indicated by the user_id or screen_name parameter.
    Provide a user_id or screen_name. When neither is given the
    statuses/home_timeline endpoint is requested instead.

    Tweets are requested 200 at a time, walking backwards through the
    timeline, until an empty page is returned, the tweet with id
    since_id is reached, or max_pages pages have been retrieved.

    Raises ValueError when both user_id and screen_name are passed. A
    404 or 401 response is logged as a warning and ends the iteration;
    any other HTTPError is re-raised.
    """
    if user_id and screen_name:
        raise ValueError("only user_id or screen_name may be passed")

    # Strip if screen_name is prefixed with '@'
    if screen_name:
        screen_name = screen_name.lstrip("@")
    user_ref = screen_name or str(user_id)
    id_type = "screen_name" if screen_name else "user_id"
    log.info("starting user timeline for user %s", user_ref)

    params = {"count": 200}
    if screen_name or user_id:
        url = "https://api.twitter.com/1.1/statuses/user_timeline.json"
        params[id_type] = user_ref
    else:
        # no user was given, so no user parameter is sent (previously a
        # literal user_id of "None" was included in the request)
        url = "https://api.twitter.com/1.1/statuses/home_timeline.json"
    params["include_ext_alt_text"] = "true"

    retrieved_pages = 0
    reached_end = False

    while True:
        if since_id:
            # Make the since_id inclusive, so we can avoid retrieving
            # an empty page of results in some cases
            params["since_id"] = str(int(since_id) - 1)
        if max_id:
            params["max_id"] = max_id

        try:
            resp = self.get(url, params=params, allow_404=True)
            retrieved_pages += 1
        except requests.exceptions.HTTPError as e:
            if e.response.status_code == 404:
                log.warning("no timeline available for %s", user_ref)
                break
            elif e.response.status_code == 401:
                log.warning("protected account %s", user_ref)
                break
            raise e

        statuses = resp.json()
        if not statuses:
            log.info("no new tweets matching %s", params)
            break

        for status in statuses:
            # We've certainly reached the end of new results
            if since_id is not None and status["id_str"] == str(since_id):
                reached_end = True
                break
            # If you request an invalid user_id, you may still get
            # results so need to check.
            if not user_id or user_ref == status.get("user", {}).get("id_str"):
                yield status

        if reached_end:
            log.info("no new tweets matching %s", params)
            break

        if max_pages is not None and retrieved_pages == max_pages:
            log.info("reached max page limit for %s", params)
            break

        # the next page ends just below the last tweet seen on this one
        max_id = str(int(status["id_str"]) - 1)
trends_available(self)
Returns a list of regions for which Twitter tracks trends.
Source code in twarc/client.py
def trends_available(self):
    """
    Returns a list of regions for which Twitter tracks trends.

    The decoded JSON of the trends/available endpoint is returned as is.
    Any exception raised by the request propagates to the caller.
    """
    url = "https://api.twitter.com/1.1/trends/available.json"
    # no try/except: catching an error only to re-raise it adds nothing
    return self.get(url).json()
trends_closest(self, lat, lon)
Returns the closest regions for the supplied lat/lon.
Source code in twarc/client.py
def trends_closest(self, lat, lon):
    """
    Returns the closest regions for the supplied lat/lon.

    lat and lon are sent as the "lat" and "long" query parameters and the
    decoded JSON response is returned as is. Any exception raised by the
    request propagates to the caller.
    """
    url = "https://api.twitter.com/1.1/trends/closest.json"
    params = {"lat": lat, "long": lon}
    # no try/except: catching an error only to re-raise it adds nothing
    return self.get(url, params=params).json()
trends_place(self, woeid, exclude=None)
Returns recent Twitter trends for the specified WOEID. If exclude == 'hashtags', Twitter will remove hashtag trends from the response.
Source code in twarc/client.py
def trends_place(self, woeid, exclude=None):
    """
    Fetch the recent Twitter trends for a WOEID and return the decoded
    JSON response.

    When exclude is given it is passed through to the API; with
    exclude == 'hashtags' Twitter removes hashtag trends from the
    response. An HTTPError raised by the request is re-raised; a 404 is
    logged first.
    """
    endpoint = "https://api.twitter.com/1.1/trends/place.json"

    query = {"id": woeid}
    if exclude:
        query["exclude"] = exclude

    try:
        resp = self.get(endpoint, params=query, allow_404=True)
    except requests.exceptions.HTTPError as e:
        if e.response.status_code == 404:
            log.info("no region matching WOEID %s", woeid)
        raise e

    return resp.json()
user_lookup(self, ids, id_type='user_id')
A generator that returns users for supplied user ids, screen_names, or an iterator of either. Use the id_type to indicate which you are supplying (user_id or screen_name).
Source code in twarc/client.py
def user_lookup(self, ids, id_type="user_id"):
    """
    Yield user objects for the supplied user ids or screen_names.

    ids may be any iterable; id_type says whether it holds "user_id" or
    "screen_name" values. Each value is converted to a string, stripped
    of surrounding whitespace and looked up in batches of 100.

    Raises RuntimeError for any other id_type. An HTTPError raised by a
    lookup is re-raised; a 404 is logged first.
    """
    if id_type != "user_id" and id_type != "screen_name":
        raise RuntimeError("id_type must be user_id or screen_name")

    if not isinstance(ids, types.GeneratorType):
        ids = iter(ids)

    # TODO: this is similar to hydrate, maybe they could share code?

    def fetch_users(batch):
        # look up one batch of ids and return the decoded response
        joined = ",".join(batch)
        log.info("looking up users %s", joined)
        endpoint = "https://api.twitter.com/1.1/users/lookup.json"
        try:
            resp = self.get(endpoint, params={id_type: joined}, allow_404=True)
        except requests.exceptions.HTTPError as e:
            if e.response.status_code == 404:
                log.warning("no users matching %s", joined)
            raise e
        return resp.json()

    pending = []
    for raw_id in ids:
        pending.append(str(raw_id).strip())
        if len(pending) != 100:
            continue
        for found in fetch_users(pending):
            yield found
        pending = []

    # whatever did not fill a complete batch
    if pending:
        for found in fetch_users(pending):
            yield found
validate_keys(self)
Validate the keys provided are authentic credentials.
Source code in twarc/client.py
def validate_keys(self):
    """
    Check that the configured credentials are accepted by the API.

    With gnip_auth the Gnip usage endpoint is requested, with app_auth
    nothing is checked and True is returned straight away, and otherwise
    account/verify_credentials is requested. Returns True when the
    request succeeds.

    Raises RuntimeError on a 401 response and re-raises any other
    HTTPError. When credentials are missing a hint is printed and
    sys.exit() is called.
    """
    if self.gnip_auth:
        required = (self.gnip_account, self.gnip_username, self.gnip_password)
        url = "https://gnip-api.twitter.com/metrics/usage/accounts/{}.json".format(
            self.gnip_account
        )
    elif self.app_auth:
        # no need to validate keys when using OAuth2 App Auth.
        return True
    else:
        required = (
            self.consumer_key,
            self.consumer_secret,
            self.access_token,
            self.access_token_secret,
        )
        url = "https://api.twitter.com/1.1/account/verify_credentials.json"

    if not all(required):
        print("Incomplete credentials provided.")
        print('Please run the command "twarc configure" to get started.')
        sys.exit()
    else:
        try:
            # Need to explicitly reconnect to confirm the current creds
            # are used in the session object.
            self.connect()
            self.get(url)
        except requests.HTTPError as e:
            if e.response.status_code == 401:
                raise RuntimeError("Invalid credentials provided.")
            raise e
        return True
handler: python