Skip to content

twarc.Client

Twarc

Twarc allows you to retrieve data from the Twitter API. Each method is an iterator that runs to completion, and handles rate limiting so that it will go to sleep when Twitter tells it to, and wake back up when it is able to retrieve data from the API again.

__init__(self, consumer_key=None, consumer_secret=None, access_token=None, access_token_secret=None, connection_errors=0, http_errors=0, config=None, profile='', protected=False, tweet_mode='extended', app_auth=False, validate_keys=True, gnip_auth=False, gnip_username=None, gnip_password=None, gnip_account=None) special

Instantiate a Twarc instance. If keys aren't set we'll try to discover them in the environment or a supplied profile. If no profile is indicated the first section of the config files will be used.

Source code in twarc/client.py
def __init__(
    self,
    consumer_key=None,
    consumer_secret=None,
    access_token=None,
    access_token_secret=None,
    connection_errors=0,
    http_errors=0,
    config=None,
    profile="",
    protected=False,
    tweet_mode="extended",
    app_auth=False,
    validate_keys=True,
    gnip_auth=False,
    gnip_username=None,
    gnip_password=None,
    gnip_account=None,
):
    """
    Create a Twarc client.

    Every argument is stored on the instance under the same name. When no
    config is given, the value returned by default_config() is used.
    get_keys() is then called to fill in credentials that were not passed
    explicitly (from the environment or a supplied profile; if no profile
    is indicated the first section of the config files will be used).
    Finally, unless validate_keys is false, validate_keys() is called.
    """

    # fixed client state
    self.api_version = "1.1"
    self.client = None
    self.last_response = None

    # OAuth credentials
    self.consumer_key, self.consumer_secret = consumer_key, consumer_secret
    self.access_token = access_token
    self.access_token_secret = access_token_secret

    # Gnip credentials
    self.gnip_auth = gnip_auth
    self.gnip_username, self.gnip_password = gnip_username, gnip_password
    self.gnip_account = gnip_account

    # behaviour switches and error thresholds
    self.app_auth = app_auth
    self.protected = protected
    self.tweet_mode = tweet_mode
    self.profile = profile
    self.connection_errors = connection_errors
    self.http_errors = http_errors

    # only fall back to the default config location when none was supplied
    self.config = config if config else self.default_config()

    self.get_keys()

    if validate_keys:
        self.validate_keys()

dehydrate(self, iterator)

Pass in an iterator of tweets' JSON and get back an iterator of the IDs of each tweet.

Source code in twarc/client.py
def dehydrate(self, iterator):
    """
    Convert an iterator of JSON-encoded tweets into an iterator of tweet
    ids.

    Each item is decoded with json.loads and its "id_str" value is
    yielded. An item that raises (for example invalid JSON or a missing
    "id_str" key) is logged as an error and skipped.
    """
    for raw_tweet in iterator:
        try:
            tweet = json.loads(raw_tweet)
            yield tweet["id_str"]
        except Exception as err:
            log.error("uhoh: %s\n" % err)

follower_ids(self, user, max_pages=None)

Returns Twitter user id lists for the specified user's followers. A user can be specified using their screen_name or user_id.

Source code in twarc/client.py
def follower_ids(self, user, max_pages=None):
    """
    Yield the ids of the accounts following the specified user.

    user can be a screen_name or a user_id; a leading "@" is stripped.
    A value made up only of digits is sent as user_id, anything else as
    screen_name.

    Pages are requested from followers/ids.json by following next_cursor
    until it is 0. If max_pages is given, iteration stops once that many
    pages have been retrieved.

    Each id is passed through str_type before being yielded. An HTTPError
    raised by the request is re-raised; a 404 is logged first.
    """
    user = str(user).lstrip("@")
    url = "https://api.twitter.com/1.1/followers/ids.json"

    if re.match(r"^\d+$", user):
        params = {"user_id": user, "cursor": -1}
    else:
        params = {"screen_name": user, "cursor": -1}

    retrieved_pages = 0

    while params["cursor"] != 0:
        try:
            resp = self.get(url, params=params, allow_404=True)
            retrieved_pages += 1
        except requests.exceptions.HTTPError as e:
            if e.response.status_code == 404:
                # log the value that was looked up; `user` is the only
                # identifier available in this method
                log.info("no users matching %s", user)
            raise
        user_ids = resp.json()
        for user_id in user_ids["ids"]:
            yield str_type(user_id)
        # the cursor for the next page comes from the response itself
        params["cursor"] = user_ids["next_cursor"]

        if max_pages is not None and retrieved_pages == max_pages:
            log.info("reached max follower page limit for %s", params)
            break

friend_ids(self, user, max_pages=None)

Returns Twitter user id lists for the specified user's friend. A user can be specified using their screen_name or user_id.

Source code in twarc/client.py
def friend_ids(self, user, max_pages=None):
    """
    Yield the ids of the accounts the specified user follows (their
    "friends").

    user can be a screen_name or a user_id; a leading "@" is stripped and
    an all-digit value is sent as user_id, anything else as screen_name.
    Pages are fetched by following next_cursor until it is 0, or until
    max_pages pages have been retrieved when max_pages is given. An
    HTTPError from the request is re-raised; a 404 is logged first.
    """
    url = "https://api.twitter.com/1.1/friends/ids.json"
    user = str(user).lstrip("@")

    # an all-digit value is treated as a numeric user id
    id_field = "user_id" if re.match(r"^\d+$", user) else "screen_name"
    params = {id_field: user, "cursor": -1}

    pages_seen = 0
    while params["cursor"] != 0:
        try:
            resp = self.get(url, params=params, allow_404=True)
            pages_seen += 1
        except requests.exceptions.HTTPError as err:
            if err.response.status_code == 404:
                log.error("no users matching %s", user)
            raise err

        page = resp.json()
        for friend_id in page["ids"]:
            yield str_type(friend_id)
        params["cursor"] = page["next_cursor"]

        if max_pages is not None and pages_seen == max_pages:
            log.info("reached max friend page limit for %s", params)
            break

get_keys(self)

Get the Twitter API keys. Order of precedence is command line, environment, config file. The keys are stored on the client instance; the method itself does not return a value.

Source code in twarc/client.py
def get_keys(self):
    """
    Fill in any Twitter API / Gnip credentials that are still unset.

    Order of precedence is: values already on the instance (e.g. from the
    command line), then environment variables, then the config file.
    load_config() is only called when a config is set and the credentials
    needed for the active auth mode (Gnip or OAuth) are still incomplete
    after consulting the environment.

    The credentials are stored on the instance; nothing is returned.
    """
    env_fallbacks = (
        ("consumer_key", "CONSUMER_KEY"),
        ("consumer_secret", "CONSUMER_SECRET"),
        ("access_token", "ACCESS_TOKEN"),
        ("access_token_secret", "ACCESS_TOKEN_SECRET"),
        ("gnip_username", "GNIP_USERNAME"),
        ("gnip_password", "GNIP_PASSWORD"),
        ("gnip_account", "GNIP_ACCOUNT"),
    )
    for attr, env_name in env_fallbacks:
        if not getattr(self, attr):
            setattr(self, attr, os.environ.get(env_name))

    if not self.config:
        return

    # which credentials matter depends on the auth mode in use
    if self.gnip_auth:
        required = (self.gnip_username, self.gnip_password, self.gnip_account)
    else:
        required = (
            self.consumer_key,
            self.consumer_secret,
            self.access_token,
            self.access_token_secret,
        )

    if not all(required):
        self.load_config()

hydrate(self, iterator, trim_user=False)

Pass in an iterator of tweet ids and get back an iterator for the decoded JSON for each corresponding tweet.

Source code in twarc/client.py
def hydrate(self, iterator, trim_user=False):
    """
    Pass in an iterator of tweet ids and get back an iterator for the
    decoded JSON for each corresponding tweet.

    Ids are converted to strings and stripped of surrounding whitespace
    (so lines read from a file can be passed directly); blank entries are
    skipped. Ids are looked up in batches of up to 100 via a POST to
    statuses/lookup.json, and the tweets of every batch, including the
    final partial one, are yielded sorted by their "id_str".

    trim_user is passed through unchanged as the trim_user request
    parameter.
    """
    url = "https://api.twitter.com/1.1/statuses/lookup.json"

    def lookup(batch):
        # one request per batch; shared by full batches and the remainder
        log.info("hydrating %s ids", len(batch))
        resp = self.post(
            url,
            data={
                "id": ",".join(batch),
                "include_ext_alt_text": "true",
                "include_entities": "true",
                "trim_user": trim_user,
            },
        )
        return sorted(resp.json(), key=lambda t: t["id_str"])

    batch = []

    # lookup 100 tweets at a time
    for tweet_id in iterator:
        tweet_id = str(tweet_id).strip()  # remove new line if present
        if not tweet_id:
            # a blank line would otherwise put an empty id in the request
            continue
        batch.append(tweet_id)
        if len(batch) == 100:
            for tweet in lookup(batch):
                yield tweet
            batch = []

    # hydrate any remaining ones
    if batch:
        for tweet in lookup(batch):
            yield tweet

list_members(self, list_id=None, slug=None, owner_screen_name=None, owner_id=None)

Returns the members of a list.

List id or (slug and (owner_screen_name or owner_id)) are required

Source code in twarc/client.py
def list_members(
    self, list_id=None, slug=None, owner_screen_name=None, owner_id=None
):
    """
    Yield the members of a list, one user at a time.

    The list is identified either by list_id, or by slug together with
    owner_screen_name or owner_id. list_id wins when both forms are given,
    and owner_screen_name wins over owner_id.

    Pages are fetched from lists/members.json by following next_cursor
    until it is 0. An HTTPError from the request is re-raised; a 404 is
    logged first.
    """
    assert list_id or (slug and (owner_screen_name or owner_id))

    # work out how the list is identified in the request
    if list_id:
        selector = {"list_id": list_id}
    elif owner_screen_name:
        selector = {"slug": slug, "owner_screen_name": owner_screen_name}
    else:
        selector = {"slug": slug, "owner_id": owner_id}

    params = {"cursor": -1}
    params.update(selector)
    url = "https://api.twitter.com/1.1/lists/members.json"

    while params["cursor"] != 0:
        try:
            resp = self.get(url, params=params, allow_404=True)
        except requests.exceptions.HTTPError as err:
            if err.response.status_code == 404:
                log.error("no matching list")
            raise err

        page = resp.json()
        for member in page["users"]:
            yield member
        params["cursor"] = page["next_cursor"]

oembed(self, tweet_url, **params)

Returns the oEmbed JSON for a tweet. The JSON includes an html key that contains the HTML for the embed. You can pass in parameters that correspond to the parameters that Twitter's statuses/oembed endpoint supports. For example:

o = client.oembed('https://twitter.com/biz/status/21', theme='dark')

Source code in twarc/client.py
def oembed(self, tweet_url, **params):
    """
    Fetch the oEmbed JSON for a tweet and return it decoded.

    The JSON includes an html key that contains the HTML for the embed.
    Any extra keyword arguments are sent as query parameters, so you can
    pass the parameters that Twitter's statuses/oembed endpoint supports.
    For example:

    o = client.oembed('https://twitter.com/biz/status/21', theme='dark')
    """
    log.info("generating embedding for tweet %s", tweet_url)

    # the tweet's URL travels as the "url" query parameter
    params["url"] = tweet_url
    return self.get("https://publish.twitter.com/oembed", params=params).json()

Search using the Premium Search API. You will need to pass in a query, a product (30day or fullarchive) and an environment to use. Optionally you can pass in a from_date and to_date to limit the search using datetime objects. If you would like to set max_results you can, or you can accept the maximum results (500). If using a sandbox environment you will want to set sandbox=True to lower the max_results to 100. The limit option will cause your search to finish once it has returned that number of tweets (0 means no limit).

Source code in twarc/client.py
def premium_search(
    self,
    q,
    product,
    environment,
    from_date=None,
    to_date=None,
    max_results=None,
    sandbox=False,
    limit=0,
):
    """
    Search using the Premium Search API, yielding one tweet at a time.

    You will need to pass in a query, a product ("30day", "fullarchive"
    or "gnip_fullarchive") and the environment to use. Optionally you can
    pass in a from_date and to_date (datetime.date or datetime.datetime
    objects) to limit the search.

    max_results is the page size requested from the API; when it is not
    given it defaults to 500, or to 100 when sandbox=True. The limit
    option makes the search finish as soon as that many tweets have been
    yielded (0 means no limit). Result pages are followed through the
    "next" token in each response.

    Raises RuntimeError when neither app_auth nor gnip_auth is enabled,
    when a date argument has the wrong type, when the product is unknown,
    when the API answers 422, or when it answers with any other status
    than 200.
    """

    if not self.app_auth and not self.gnip_auth:
        raise RuntimeError(
            "This endpoint is only available with application authentication. "
            "Pass app_auth=True in Python or --app-auth on the command line."
        )

    if from_date and not isinstance(from_date, datetime.date):
        raise RuntimeError(
            "from_date must be a datetime.date or datetime.datetime object"
        )
    if to_date and not isinstance(to_date, datetime.date):
        raise RuntimeError(
            "to_date must be a datetime.date or datetime.datetime object"
        )

    if product not in ["30day", "gnip_fullarchive", "fullarchive"]:
        raise RuntimeError("Invalid Premium Search API product: {}".format(product))

    # set default max_results based on whether its sandboxed
    if max_results is None:
        max_results = 100 if sandbox else 500

    if product == "gnip_fullarchive":
        url = "https://gnip-api.twitter.com/search/fullarchive/accounts/{}/{}.json".format(
            self.gnip_account, environment
        )
    else:
        url = "https://api.twitter.com/1.1/tweets/search/{}/{}.json".format(
            product, environment
        )

    params = {
        "query": q,
        "fromDate": from_date.strftime("%Y%m%d%H%M") if from_date else None,
        "toDate": to_date.strftime("%Y%m%d%H%M") if to_date else None,
        "maxResults": max_results,
    }

    count = 0
    while True:
        resp = self.get(url, params=params)

        if resp.status_code == 422:
            raise RuntimeError(
                "Twitter API 422 response: are you using a premium search sandbox environment and forgot the --sandbox argument?"
            )
        if resp.status_code != 200:
            # Repeating the identical request in a tight loop cannot make
            # progress, so surface the unexpected status instead.
            raise RuntimeError(
                "Unexpected Twitter API response status: {}".format(
                    resp.status_code
                )
            )

        data = resp.json()
        for tweet in data["results"]:
            count += 1
            yield tweet
            if limit != 0 and count >= limit:
                return

        # no "next" token means this was the last page
        if "next" not in data:
            return
        params["next"] = data["next"]

replies(self, tweet, recursive=False, prune=())

replies returns a generator of tweets that are replies for a given tweet. It includes the original tweet. If you would like to fetch the replies to the replies use recursive=True which will do a depth-first recursive walk of the replies. It also walks up the reply chain if you supply a tweet that is itself a reply to another tweet. You can optionally supply a tuple of tweet ids to ignore during this traversal using the prune parameter.

Source code in twarc/client.py
def replies(self, tweet, recursive=False, prune=()):
    """
    replies returns a generator of tweets that are replies for a given
    tweet. It includes the original tweet. If you would like to fetch the
    replies to the replies use recursive=True which will do a depth-first
    recursive walk of the replies. It also walks up the reply chain if you
    supply a tweet that is itself a reply to another tweet. You can
    optionally supply a tuple of tweet ids to ignore during this traversal
    using the prune parameter.

    tweet is a decoded tweet dict; its "user"/"screen_name" and "id_str"
    values are read, and "in_reply_to_status_id_str" and
    "quoted_status_id_str" are consulted when recursive is true.

    With recursive=True the walk also follows a quoted tweet, if any.
    Candidate replies come from self.search() and parent / quoted tweets
    from self.tweet().
    """

    # the tweet passed in is always the first item yielded
    yield tweet

    # get replies to the tweet
    screen_name = tweet["user"]["screen_name"]
    tweet_id = tweet["id_str"]
    log.info("looking for replies to: %s", tweet_id)
    # the search matches everything addressed to the author since this
    # tweet; only direct replies to this tweet are kept below
    for reply in self.search("to:%s" % screen_name, since_id=tweet_id):

        if reply["in_reply_to_status_id_str"] != tweet_id:
            continue

        if reply["id_str"] in prune:
            log.info("ignoring pruned tweet id %s", reply["id_str"])
            continue

        log.info("found reply: %s", reply["id_str"])

        if recursive:
            # NOTE(review): this membership test looks redundant, since
            # pruned replies were already skipped by the `continue` above
            if reply["id_str"] not in prune:
                # add this tweet's id to prune so the recursive call on the
                # reply does not walk back up to this tweet
                prune = prune + (tweet_id,)
                # the recursive call yields the reply itself first
                for r in self.replies(reply, recursive, prune):
                    yield r
        else:
            yield reply

    # if this tweet is itself a reply to another tweet get it and
    # get other potential replies to it

    reply_to_id = tweet.get("in_reply_to_status_id_str")
    log.info("prune=%s", prune)
    if recursive and reply_to_id and reply_to_id not in prune:
        t = self.tweet(reply_to_id)
        if t:
            log.info("found reply-to: %s", t["id_str"])
            # prune this tweet so that walking the parent's replies does
            # not descend into it again
            prune = prune + (tweet["id_str"],)
            for r in self.replies(t, recursive=True, prune=prune):
                yield r

    # if this tweet is a quote go get that too whatever tweets it
    # may be in reply to

    quote_id = tweet.get("quoted_status_id_str")
    if recursive and quote_id and quote_id not in prune:
        t = self.tweet(quote_id)
        if t:
            log.info("found quote: %s", t["id_str"])
            # same pruning as for the parent walk above
            prune = prune + (tweet["id_str"],)
            for r in self.replies(t, recursive=True, prune=prune):
                yield r

retweets(self, tweet_ids)

Retrieves up to the last 100 retweets for the provided iterator of tweet_ids.

Source code in twarc/client.py
def retweets(self, tweet_ids):
    """
    Yield up to the last 100 retweets of each tweet id in tweet_ids.

    tweet_ids may be any iterable; ids that have a strip method (e.g.
    lines read from a file) are stripped of surrounding whitespace.

    An HTTPError raised while fetching or yielding the retweets of one
    tweet is not propagated: a 404 is logged and iteration continues with
    the next id.
    """
    url_template = "https://api.twitter.com/1.1/statuses/retweets/{}.json"

    if not isinstance(tweet_ids, types.GeneratorType):
        tweet_ids = iter(tweet_ids)

    for raw_id in tweet_ids:
        tweet_id = raw_id.strip() if hasattr(raw_id, "strip") else raw_id
        log.info("retrieving retweets of %s", tweet_id)
        url = url_template.format(tweet_id)
        try:
            resp = self.get(url, params={"count": 100}, allow_404=True)
            for retweet in resp.json():
                yield retweet
        except requests.exceptions.HTTPError as err:
            if err.response.status_code == 404:
                log.info("can't get tweets for non-existent tweet: %s", tweet_id)

sample(self, event=None, record_keepalive=False)

Returns a small random sample of all public statuses. The Tweets returned by the default access level are the same, so if two different clients connect to this endpoint, they will see the same Tweets.

If a threading.Event is provided for event and the event is set, the sample will be interrupted.

Source code in twarc/client.py
def sample(self, event=None, record_keepalive=False):
    """
    Returns a small random sample of all public statuses. The Tweets
    returned by the default access level are the same, so if two different
    clients connect to this endpoint, they will see the same Tweets.

    If a threading.Event is provided for event and the event is set,
    the sample will be interrupted.

    Empty lines on the stream are treated as keep-alives: they are logged
    and skipped, or yielded as the string "keep-alive" when
    record_keepalive is true. Every other line is decoded as JSON and
    yielded; lines that fail to decode are logged and skipped.

    When the stream ends or an error occurs the connection is
    re-established, sleeping between attempts for a time that grows with
    the number of consecutive errors. If self.http_errors is non-zero,
    the error is re-raised once that many consecutive errors have
    occurred.
    """
    url = "https://stream.twitter.com/1.1/statuses/sample.json"
    # NOTE(review): Twitter's streaming docs appear to name this parameter
    # "stall_warnings" -- confirm before changing what is sent.
    params = {"stall_warning": True}
    headers = {"accept-encoding": "deflate, gzip"}
    errors = 0
    while True:
        try:
            log.info("connecting to sample stream")
            resp = self.post(url, params, headers=headers, stream=True)
            # a successful connection resets the back-off counter
            errors = 0
            for line in resp.iter_lines(chunk_size=512):
                if event and event.is_set():
                    log.info("stopping sample")
                    # Explicitly close response
                    resp.close()
                    return
                # iter_lines() yields bytes here, so a keep-alive arrives
                # as b"" and would never compare equal to the str ""
                if not line:
                    log.info("keep-alive")
                    if record_keepalive:
                        yield "keep-alive"
                    continue
                try:
                    yield json.loads(line.decode())
                except Exception as e:
                    log.error("json parse error: %s - %s", e, line)
        except requests.exceptions.HTTPError as e:
            errors += 1
            log.error("caught http error %s on %s try", e, errors)
            if self.http_errors and errors == self.http_errors:
                log.warning("too many errors")
                raise e
            # wait longer after a 420 than after other HTTP errors
            if e.response.status_code == 420:
                if interruptible_sleep(errors * 60, event):
                    log.info("stopping sample")
                    return
            else:
                if interruptible_sleep(errors * 5, event):
                    log.info("stopping sample")
                    return

        except Exception as e:
            errors += 1
            log.error("caught exception %s on %s try", e, errors)
            # NOTE(review): this branch also checks self.http_errors rather
            # than self.connection_errors -- confirm that is intended.
            if self.http_errors and errors == self.http_errors:
                log.warning("too many errors")
                raise e
            if interruptible_sleep(errors, event):
                log.info("stopping sample")
                return

timeline(self, user_id=None, screen_name=None, max_id=None, since_id=None, max_pages=None)

Returns a collection of the most recent tweets posted by the user indicated by the user_id or screen_name parameter. Provide a user_id or screen_name.

Source code in twarc/client.py
def timeline(
    self, user_id=None, screen_name=None, max_id=None, since_id=None, max_pages=None
):
    """
    Yield the most recent tweets posted by the user indicated by the
    user_id or screen_name parameter, newest page first.

    Provide a user_id or a screen_name (a leading "@" is stripped), not
    both; passing both raises ValueError. When neither is given the
    authenticated account's home timeline is requested instead.

    since_id and max_id bound the tweet ids that are retrieved; iteration
    stops as soon as the tweet with id since_id is seen. If max_pages is
    given, at most that many pages are requested.

    A 404 (no timeline) or 401 (protected account) response is logged and
    ends the iteration; any other HTTPError is re-raised.
    """

    if user_id and screen_name:
        raise ValueError("only user_id or screen_name may be passed")

    # Strip if screen_name is prefixed with '@'
    if screen_name:
        screen_name = screen_name.lstrip("@")

    if screen_name:
        who, id_type = screen_name, "screen_name"
    elif user_id:
        who, id_type = str(user_id), "user_id"
    else:
        who, id_type = None, None

    if id_type:
        log.info("starting user timeline for user %s", who)
        url = "https://api.twitter.com/1.1/statuses/user_timeline.json"
        params = {"count": 200, id_type: who, "include_ext_alt_text": "true"}
    else:
        # no user was named, so do not send a meaningless user_id=None
        log.info("starting home timeline")
        url = "https://api.twitter.com/1.1/statuses/home_timeline.json"
        params = {"count": 200, "include_ext_alt_text": "true"}

    if since_id:
        # Make the since_id inclusive, so we can avoid retrieving
        # an empty page of results in some cases
        params["since_id"] = str(int(since_id) - 1)

    retrieved_pages = 0
    reached_end = False

    while True:
        if max_id:
            params["max_id"] = max_id

        try:
            resp = self.get(url, params=params, allow_404=True)
            retrieved_pages += 1
        except requests.exceptions.HTTPError as e:
            if e.response.status_code == 404:
                log.warning("no timeline available for %s", who)
                break
            elif e.response.status_code == 401:
                log.warning("protected account %s", who)
                break
            raise

        statuses = resp.json()

        if not statuses:
            log.info("no new tweets matching %s", params)
            break

        for status in statuses:
            # We've certainly reached the end of new results
            if since_id is not None and status["id_str"] == str(since_id):
                reached_end = True
                break
            # If you request an invalid user_id, you may still get
            # results so need to check.
            if id_type != "user_id" or who == status.get("user", {}).get("id_str"):
                yield status

        if reached_end:
            log.info("no new tweets matching %s", params)
            break

        if max_pages is not None and retrieved_pages == max_pages:
            log.info("reached max page limit for %s", params)
            break

        # continue with tweets strictly older than the last one on this page
        max_id = str(int(statuses[-1]["id_str"]) - 1)

trends_available(self)

Returns a list of regions for which Twitter tracks trends.

Source code in twarc/client.py
def trends_available(self):
    """
    Return the decoded JSON list of regions for which Twitter tracks
    trends.

    Any HTTPError raised by the request propagates to the caller
    unchanged.
    """
    return self.get("https://api.twitter.com/1.1/trends/available.json").json()

trends_closest(self, lat, lon)

Returns the closest regions for the supplied lat/lon.

Source code in twarc/client.py
def trends_closest(self, lat, lon):
    """
    Return the decoded JSON describing the closest trend regions for the
    supplied lat/lon.

    lat and lon are sent as the "lat" and "long" query parameters. Any
    HTTPError raised by the request propagates to the caller unchanged.
    """
    location = {"lat": lat, "long": lon}
    resp = self.get(
        "https://api.twitter.com/1.1/trends/closest.json", params=location
    )
    return resp.json()

trends_place(self, woeid, exclude=None)

Returns recent Twitter trends for the specified WOEID. If exclude == 'hashtags', Twitter will remove hashtag trends from the response.

Source code in twarc/client.py
def trends_place(self, woeid, exclude=None):
    """
    Return the decoded JSON of recent Twitter trends for the given WOEID.

    When exclude is set it is sent as the "exclude" query parameter; with
    exclude == 'hashtags', Twitter will remove hashtag trends from the
    response. An HTTPError from the request is re-raised; a 404 is logged
    first.
    """
    url = "https://api.twitter.com/1.1/trends/place.json"
    params = {"id": woeid, "exclude": exclude} if exclude else {"id": woeid}

    try:
        resp = self.get(url, params=params, allow_404=True)
    except requests.exceptions.HTTPError as err:
        if err.response.status_code == 404:
            log.info("no region matching WOEID %s", woeid)
        raise err
    return resp.json()

user_lookup(self, ids, id_type='user_id')

A generator that returns users for supplied user ids, screen_names, or an iterator of user_ids of either. Use the id_type to indicate which you are supplying (user_id or screen_name)

Source code in twarc/client.py
def user_lookup(self, ids, id_type="user_id"):
    """
    A generator that yields the user objects for the supplied ids.

    ids is an iterable of user ids or of screen names; use id_type
    ("user_id" or "screen_name") to say which you are supplying. Any
    other id_type raises RuntimeError. Each id is converted to a string
    and stripped of surrounding whitespace.

    Users are looked up in batches of up to 100 via users/lookup.json. An
    HTTPError from a request is re-raised; a 404 is logged first.
    """

    if id_type not in ("user_id", "screen_name"):
        raise RuntimeError("id_type must be user_id or screen_name")

    url = "https://api.twitter.com/1.1/users/lookup.json"

    # TODO: this is similar to hydrate, maybe they could share code?

    def fetch(batch):
        joined = ",".join(batch)
        log.info("looking up users %s", joined)
        try:
            resp = self.get(url, params={id_type: joined}, allow_404=True)
        except requests.exceptions.HTTPError as err:
            if err.response.status_code == 404:
                log.warning("no users matching %s", joined)
            raise err
        return resp.json()

    if not isinstance(ids, types.GeneratorType):
        ids = iter(ids)

    pending = []
    for raw_id in ids:
        pending.append(str(raw_id).strip())
        if len(pending) == 100:
            for found in fetch(pending):
                yield found
            pending = []

    # look up whatever is left over after the last full batch
    if pending:
        for found in fetch(pending):
            yield found

validate_keys(self)

Validate the keys provided are authentic credentials.

Source code in twarc/client.py
def validate_keys(self):
    """
    Check that the configured credentials are accepted by the API.

    With gnip_auth the Gnip usage endpoint for gnip_account is requested;
    with app_auth nothing is checked and True is returned immediately;
    otherwise account/verify_credentials.json is requested.

    Returns True when the request succeeds. Raises RuntimeError when the
    API answers 401, and re-raises any other HTTPError. If the credentials
    needed for the active auth mode are incomplete, a hint is printed and
    the process exits via sys.exit().
    """
    if self.gnip_auth:
        url = "https://gnip-api.twitter.com/metrics/usage/accounts/{}.json".format(
            self.gnip_account
        )
        required = (self.gnip_account, self.gnip_username, self.gnip_password)
    elif self.app_auth:
        # no need to validate keys when using OAuth2 App Auth.
        return True
    else:
        url = "https://api.twitter.com/1.1/account/verify_credentials.json"
        required = (
            self.consumer_key,
            self.consumer_secret,
            self.access_token,
            self.access_token_secret,
        )

    if not all(required):
        print("Incomplete credentials provided.")
        print('Please run the command "twarc configure" to get started.')
        sys.exit()

    try:
        # Need to explicitly reconnect to confirm the current creds
        # are used in the session object.
        self.connect()
        self.get(url)
        return True
    except requests.HTTPError as err:
        if err.response.status_code == 401:
            raise RuntimeError("Invalid credentials provided.")
        else:
            raise err

handler: python