
twarc.Client

Twarc

Twarc allows you to retrieve data from the Twitter API. Each method is an iterator that runs to completion, and handles rate limiting so that it will go to sleep when Twitter tells it to, and wake back up when it is able to retrieve data from the API again.
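
For example, here is a minimal sketch of creating a client and iterating over a search. The credential values and the query are placeholders; you can also omit the keys entirely and let twarc discover them as described under __init__ below.

from twarc import Twarc

# Placeholder credentials; substitute your own keys, or omit them and let
# twarc discover keys from the environment or a ~/.twarc config file.
t = Twarc(
    consumer_key="...",
    consumer_secret="...",
    access_token="...",
    access_token_secret="...",
)

# Each method is a generator; rate limiting is handled for you.
for tweet in t.search("ferguson"):
    # with the default tweet_mode="extended" the text is in "full_text"
    print(tweet["id_str"], tweet["full_text"])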

Source code in twarc/client.py
class Twarc(object):
    """
    Twarc allows you to retrieve data from the Twitter API. Each method
    is an iterator that runs to completion, and handles rate limiting so
    that it will go to sleep when Twitter tells it to, and wake back up
    when it is able to retrieve data from the API again.
    """

    def __init__(
        self,
        consumer_key=None,
        consumer_secret=None,
        access_token=None,
        access_token_secret=None,
        connection_errors=0,
        http_errors=0,
        config=None,
        profile="",
        protected=False,
        tweet_mode="extended",
        app_auth=False,
        validate_keys=True,
        gnip_auth=False,
        gnip_username=None,
        gnip_password=None,
        gnip_account=None,
    ):
        """
        Instantiate a Twarc instance. If keys aren't set we'll try to
        discover them in the environment or a supplied profile. If no
        profile is indicated the first section of the config files will
        be used.
        """

        self.api_version = "1.1"
        self.consumer_key = consumer_key
        self.consumer_secret = consumer_secret
        self.access_token = access_token
        self.access_token_secret = access_token_secret
        self.connection_errors = connection_errors
        self.http_errors = http_errors
        self.profile = profile
        self.client = None
        self.last_response = None
        self.tweet_mode = tweet_mode
        self.protected = protected
        self.app_auth = app_auth
        self.gnip_auth = gnip_auth
        self.gnip_username = gnip_username
        self.gnip_password = gnip_password
        self.gnip_account = gnip_account

        if config:
            self.config = config
        else:
            self.config = self.default_config()

        self.get_keys()

        if validate_keys:
            self.validate_keys()

    @filter_protected
    def search(
        self,
        q,
        max_id=None,
        since_id=None,
        lang=None,
        result_type="recent",
        geocode=None,
        max_pages=None,
    ):
        """
        Pass in a query with optional max_id, since_id, lang, geocode, or
        max_pages, and get back an iterator for decoded tweets. Defaults to
        recent (i.e. not mixed, the API default, or popular) tweets.
        """
        url = "https://api.twitter.com/1.1/search/tweets.json"
        params = {
            "count": 100,
            "q": q,
            "include_ext_alt_text": "true",
            "include_entities": "true",
        }

        if lang is not None:
            params["lang"] = lang
        if geocode is not None:
            params["geocode"] = geocode
        if since_id:
            # Make the since_id inclusive, so we can avoid retrieving
            # an empty page of results in some cases
            params["since_id"] = str(int(since_id) - 1)

        if result_type in ["mixed", "recent", "popular"]:
            params["result_type"] = result_type
        else:
            params["result_type"] = "recent"

        retrieved_pages = 0
        reached_end = False

        while True:

            # note: max_id changes as results are retrieved
            if max_id:
                params["max_id"] = max_id

            resp = self.get(url, params=params)

            retrieved_pages += 1
            statuses = resp.json()["statuses"]

            if len(statuses) == 0:
                log.info("no new tweets matching %s", params)
                break

            for status in statuses:
                # We've certainly reached the end of new results
                if since_id is not None and status["id_str"] == str(since_id):
                    reached_end = True
                    break

                yield status

            if reached_end:
                log.info("no new tweets matching %s", params)
                break

            if max_pages is not None and retrieved_pages == max_pages:
                log.info("reached max page limit for %s", params)
                break

            max_id = str(int(status["id_str"]) - 1)

    def premium_search(
        self,
        q,
        product,
        environment,
        from_date=None,
        to_date=None,
        max_results=None,
        sandbox=False,
        limit=0,
    ):
        """
        Search using the Premium Search API. You will need to pass in a query,
        a product (30day or fullarchive) and an environment to use. Optionally
        you can pass in a from_date and to_date to limit the search using
        datetime objects. If you would like to set max_results you can, or
        you can accept the maximum results (500). If using a sandbox
        environment you will want to set sandbox=True to lower the max_results
        to 100. The limit option will cause your search to finish after it has
        returned more than that number of tweets (0 means no limit).
        """

        if not self.app_auth and not self.gnip_auth:
            raise RuntimeError(
                "This endpoint is only available with application authentication. "
                "Pass app_auth=True in Python or --app-auth on the command line."
            )

        if from_date and not isinstance(from_date, datetime.date):
            raise RuntimeError(
                "from_date must be a datetime.date or datetime.datetime object"
            )
        if to_date and not isinstance(to_date, datetime.date):
            raise RuntimeError(
                "to_date must be a datetime.date or datetime.datetime object"
            )

        if product not in ["30day", "gnip_fullarchive", "fullarchive"]:
            raise RuntimeError("Invalid Premium Search API product: {}".format(product))

        # set default max_results based on whether it's sandboxed
        if max_results is None:
            if sandbox:
                max_results = 100
            else:
                max_results = 500

        if product == "gnip_fullarchive":
            url = "https://gnip-api.twitter.com/search/fullarchive/accounts/{}/{}.json".format(
                self.gnip_account, environment
            )
        else:
            url = "https://api.twitter.com/1.1/tweets/search/{}/{}.json".format(
                product, environment
            )

        params = {
            "query": q,
            "fromDate": from_date.strftime("%Y%m%d%H%M") if from_date else None,
            "toDate": to_date.strftime("%Y%m%d%H%M") if to_date else None,
            "maxResults": max_results,
        }

        count = 0
        stop = False
        while not stop:
            resp = self.get(url, params=params)
            if resp.status_code == 200:
                data = resp.json()
                for tweet in data["results"]:
                    count += 1
                    yield tweet
                    if limit != 0 and count >= limit:
                        stop = True
                        break
                if "next" in data:
                    params["next"] = data["next"]
                else:
                    stop = True
            elif resp.status_code == 422:
                raise RuntimeError(
                    "Twitter API 422 response: are you using a premium search sandbox environment and forgot the --sandbox argument?"
                )

    def timeline(
        self, user_id=None, screen_name=None, max_id=None, since_id=None, max_pages=None
    ):
        """
        Returns a collection of the most recent tweets posted
        by the user indicated by the user_id or screen_name parameter.
        Provide a user_id or screen_name.
        """

        if user_id and screen_name:
            raise ValueError("only user_id or screen_name may be passed")

        # Strip if screen_name is prefixed with '@'
        if screen_name:
            screen_name = screen_name.lstrip("@")
        id = screen_name or str(user_id)
        id_type = "screen_name" if screen_name else "user_id"
        log.info("starting user timeline for user %s", id)

        if screen_name or user_id:
            url = "https://api.twitter.com/1.1/statuses/user_timeline.json"
        else:
            url = "https://api.twitter.com/1.1/statuses/home_timeline.json"

        params = {"count": 200, id_type: id, "include_ext_alt_text": "true"}

        retrieved_pages = 0
        reached_end = False

        while True:
            if since_id:
                # Make the since_id inclusive, so we can avoid retrieving
                # an empty page of results in some cases
                params["since_id"] = str(int(since_id) - 1)
            if max_id:
                params["max_id"] = max_id

            try:
                resp = self.get(url, params=params, allow_404=True)
                retrieved_pages += 1
            except requests.exceptions.HTTPError as e:
                if e.response.status_code == 404:
                    log.warn("no timeline available for %s", id)
                    break
                elif e.response.status_code == 401:
                    log.warn("protected account %s", id)
                    break
                raise e

            statuses = resp.json()

            if len(statuses) == 0:
                log.info("no new tweets matching %s", params)
                break

            for status in statuses:
                # We've certainly reached the end of new results
                if since_id is not None and status["id_str"] == str(since_id):
                    reached_end = True
                    break
                # If you request an invalid user_id, you may still get
                # results so need to check.
                if not user_id or id == status.get("user", {}).get("id_str"):
                    yield status

            if reached_end:
                log.info("no new tweets matching %s", params)
                break

            if max_pages is not None and retrieved_pages == max_pages:
                log.info("reached max page limit for %s", params)
                break

            max_id = str(int(status["id_str"]) - 1)

    def user_lookup(self, ids, id_type="user_id"):
        """
        A generator that returns users for a supplied iterator of user ids or screen_names.
        Use the id_type to indicate which you are supplying (user_id or screen_name).
        """

        if isinstance(ids, str):
            raise TypeError("ids must be an iterable other than a string")

        if id_type not in ["user_id", "screen_name"]:
            raise RuntimeError("id_type must be user_id or screen_name")

        if not isinstance(ids, types.GeneratorType):
            ids = iter(ids)

        # TODO: this is similar to hydrate, maybe they could share code?

        lookup_ids = []

        def do_lookup():
            ids_str = ",".join(lookup_ids)
            log.info("looking up users %s", ids_str)
            url = "https://api.twitter.com/1.1/users/lookup.json"
            params = {id_type: ids_str}
            try:
                resp = self.get(url, params=params, allow_404=True)
            except requests.exceptions.HTTPError as e:
                if e.response.status_code == 404:
                    log.warning("no users matching %s", ids_str)
                raise e
            return resp.json()

        for id in ids:
            lookup_ids.append(str(id).strip())
            if len(lookup_ids) == 100:
                for u in do_lookup():
                    yield u
                lookup_ids = []

        if len(lookup_ids) > 0:
            for u in do_lookup():
                yield u

    def follower_ids(self, user, max_pages=None):
        """
        Returns Twitter user ids for the specified user's followers.
        A user can be specified using their screen_name or user_id.
        """
        user = str(user)
        user = user.lstrip("@")
        url = "https://api.twitter.com/1.1/followers/ids.json"

        if re.match(r"^\d+$", user):
            params = {"user_id": user, "cursor": -1}
        else:
            params = {"screen_name": user, "cursor": -1}

        retrieved_pages = 0

        while params["cursor"] != 0:
            try:
                resp = self.get(url, params=params, allow_404=True)
                retrieved_pages += 1
            except requests.exceptions.HTTPError as e:
                if e.response.status_code == 404:
                    log.info("no users matching %s", user)
                raise e
            user_ids = resp.json()
            for user_id in user_ids["ids"]:
                yield str_type(user_id)
            params["cursor"] = user_ids["next_cursor"]

            if max_pages is not None and retrieved_pages == max_pages:
                log.info("reached max follower page limit for %s", params)
                break

    def friend_ids(self, user, max_pages=None):
        """
        Returns Twitter user ids for the specified user's friends. A user
        can be specified using their screen_name or user_id.
        """
        user = str(user)
        user = user.lstrip("@")
        url = "https://api.twitter.com/1.1/friends/ids.json"

        if re.match(r"^\d+$", user):
            params = {"user_id": user, "cursor": -1}
        else:
            params = {"screen_name": user, "cursor": -1}

        retrieved_pages = 0

        while params["cursor"] != 0:
            try:
                resp = self.get(url, params=params, allow_404=True)
                retrieved_pages += 1
            except requests.exceptions.HTTPError as e:
                if e.response.status_code == 404:
                    log.error("no users matching %s", user)
                raise e

            user_ids = resp.json()
            for user_id in user_ids["ids"]:
                yield str_type(user_id)
            params["cursor"] = user_ids["next_cursor"]

            if max_pages is not None and retrieved_pages == max_pages:
                log.info("reached max friend page limit for %s", params)
                break

    @filter_protected
    def filter(
        self,
        track=None,
        follow=None,
        locations=None,
        lang=[],
        event=None,
        record_keepalive=False,
    ):
        """
        Returns an iterator for tweets that match a given filter track from
        the livestream of tweets happening right now.

        If a threading.Event is provided for event and the event is set,
        the filter will be interrupted.
        """
        if locations is not None:
            if type(locations) == list:
                locations = ",".join(locations)
            locations = locations.replace("\\", "")

        url = "https://stream.twitter.com/1.1/statuses/filter.json"
        params = {"stall_warning": True, "include_ext_alt_text": True}
        if track:
            params["track"] = track
        if follow:
            params["follow"] = follow
        if locations:
            params["locations"] = locations
        if lang:
            # should be a list, but just in case
            if isinstance(lang, list):
                params["language"] = ",".join(lang)
            else:
                params["language"] = lang
        headers = {"accept-encoding": "deflate, gzip"}
        errors = 0
        while True:
            try:
                log.info("connecting to filter stream for %s", params)
                resp = self.post(url, params, headers=headers, stream=True)
                errors = 0
                for line in resp.iter_lines(chunk_size=1024):
                    if event and event.is_set():
                        log.info("stopping filter")
                        # Explicitly close response
                        resp.close()
                        return
                    if not line:
                        log.info("keep-alive")
                        if record_keepalive:
                            yield "keep-alive"
                        continue
                    try:
                        yield json.loads(line.decode())
                    except Exception as e:
                        log.error("json parse error: %s - %s", e, line)
            except requests.exceptions.HTTPError as e:
                errors += 1
                log.error("caught http error %s on %s try", e, errors)
                if self.http_errors and errors == self.http_errors:
                    log.warning("too many errors")
                    raise e
                if e.response.status_code == 420:
                    if interruptible_sleep(errors * 60, event):
                        log.info("stopping filter")
                        return
                else:
                    if interruptible_sleep(errors * 5, event):
                        log.info("stopping filter")
                        return
            except Exception as e:
                errors += 1
                log.error("caught exception %s on %s try", e, errors)
                if self.http_errors and errors == self.http_errors:
                    log.warning("too many exceptions")
                    raise e
                log.error(e)
                if interruptible_sleep(errors, event):
                    log.info("stopping filter")
                    return

    def sample(self, event=None, record_keepalive=False):
        """
        Returns a small random sample of all public statuses. The Tweets
        returned by the default access level are the same, so if two different
        clients connect to this endpoint, they will see the same Tweets.

        If a threading.Event is provided for event and the event is set,
        the sample will be interrupted.
        """
        url = "https://stream.twitter.com/1.1/statuses/sample.json"
        params = {"stall_warning": True}
        headers = {"accept-encoding": "deflate, gzip"}
        errors = 0
        while True:
            try:
                log.info("connecting to sample stream")
                resp = self.post(url, params, headers=headers, stream=True)
                errors = 0
                for line in resp.iter_lines(chunk_size=512):
                    if event and event.is_set():
                        log.info("stopping sample")
                        # Explicitly close response
                        resp.close()
                        return
                    if not line:  # keep-alive lines arrive as empty bytes
                        log.info("keep-alive")
                        if record_keepalive:
                            yield "keep-alive"
                        continue
                    try:
                        yield json.loads(line.decode())
                    except Exception as e:
                        log.error("json parse error: %s - %s", e, line)
            except requests.exceptions.HTTPError as e:
                errors += 1
                log.error("caught http error %s on %s try", e, errors)
                if self.http_errors and errors == self.http_errors:
                    log.warning("too many errors")
                    raise e
                if e.response.status_code == 420:
                    if interruptible_sleep(errors * 60, event):
                        log.info("stopping filter")
                        return
                else:
                    if interruptible_sleep(errors * 5, event):
                        log.info("stopping filter")
                        return

            except Exception as e:
                errors += 1
                log.error("caught exception %s on %s try", e, errors)
                if self.http_errors and errors == self.http_errors:
                    log.warning("too many errors")
                    raise e
                if interruptible_sleep(errors, event):
                    log.info("stopping filter")
                    return

    def dehydrate(self, iterator):
        """
        Pass in an iterator of tweets' JSON and get back an iterator of the
        IDs of each tweet.
        """
        for line in iterator:
            try:
                yield json.loads(line)["id_str"]
            except Exception as e:
                log.error("uhoh: %s\n" % e)

    def hydrate(self, iterator, trim_user=False):
        """
        Pass in an iterator of tweet ids and get back an iterator for the
        decoded JSON for each corresponding tweet.
        """
        ids = []
        url = "https://api.twitter.com/1.1/statuses/lookup.json"

        # lookup 100 tweets at a time
        for tweet_id in iterator:
            tweet_id = str(tweet_id)
            tweet_id = tweet_id.strip()  # remove new line if present
            ids.append(tweet_id)
            if len(ids) == 100:
                log.info("hydrating %s ids", len(ids))
                resp = self.post(
                    url,
                    data={
                        "id": ",".join(ids),
                        "include_ext_alt_text": "true",
                        "include_entities": "true",
                        "trim_user": trim_user,
                    },
                )
                tweets = resp.json()
                tweets.sort(key=lambda t: t["id_str"])
                for tweet in tweets:
                    yield tweet
                ids = []

        # hydrate any remaining ones
        if len(ids) > 0:
            log.info("hydrating %s", ids)
            resp = self.post(
                url,
                data={
                    "id": ",".join(ids),
                    "include_ext_alt_text": "true",
                    "include_entities": "true",
                    "trim_user": trim_user,
                },
            )
            for tweet in resp.json():
                yield tweet

    def tweet(self, tweet_id):
        try:
            return next(self.hydrate([tweet_id]))
        except StopIteration:
            return []

    def retweets(self, tweet_ids):
        """
        Retrieves up to the last 100 retweets for the provided iterator of tweet_ids.
        """
        if not isinstance(tweet_ids, types.GeneratorType):
            tweet_ids = iter(tweet_ids)

        for tweet_id in tweet_ids:
            if hasattr(tweet_id, "strip"):
                tweet_id = tweet_id.strip()
            log.info("retrieving retweets of %s", tweet_id)
            url = "https://api.twitter.com/1.1/statuses/retweets/" "{}.json".format(
                tweet_id
            )
            try:
                resp = self.get(url, params={"count": 100}, allow_404=True)
                for tweet in resp.json():
                    yield tweet
            except requests.exceptions.HTTPError as e:
                if e.response.status_code == 404:
                    log.info("can't get tweets for non-existent tweet: %s", tweet_id)

    def trends_available(self):
        """
        Returns a list of regions for which Twitter tracks trends.
        """
        url = "https://api.twitter.com/1.1/trends/available.json"
        try:
            resp = self.get(url)
        except requests.exceptions.HTTPError as e:
            raise e
        return resp.json()

    def trends_place(self, woeid, exclude=None):
        """
        Returns recent Twitter trends for the specified WOEID. If
        exclude == 'hashtags', Twitter will remove hashtag trends from the
        response.
        """
        url = "https://api.twitter.com/1.1/trends/place.json"
        params = {"id": woeid}
        if exclude:
            params["exclude"] = exclude
        try:
            resp = self.get(url, params=params, allow_404=True)
        except requests.exceptions.HTTPError as e:
            if e.response.status_code == 404:
                log.info("no region matching WOEID %s", woeid)
            raise e
        return resp.json()

    def trends_closest(self, lat, lon):
        """
        Returns the closest regions for the supplied lat/lon.
        """
        url = "https://api.twitter.com/1.1/trends/closest.json"
        params = {"lat": lat, "long": lon}
        try:
            resp = self.get(url, params=params)
        except requests.exceptions.HTTPError as e:
            raise e
        return resp.json()

    def replies(self, tweet, recursive=False, prune=()):
        """
        replies returns a generator of tweets that are replies to a given
        tweet. It includes the original tweet. If you would like to fetch the
        replies to the replies, use recursive=True, which will do a depth-first
        recursive walk of the replies. It will also walk up the reply chain if
        you supply a tweet that is itself a reply to another tweet. You can
        optionally supply a tuple of tweet ids to ignore during this traversal
        using the prune parameter.
        """

        yield tweet

        # get replies to the tweet
        screen_name = tweet["user"]["screen_name"]
        tweet_id = tweet["id_str"]
        log.info("looking for replies to: %s", tweet_id)
        for reply in self.search("to:%s" % screen_name, since_id=tweet_id):

            if reply["in_reply_to_status_id_str"] != tweet_id:
                continue

            if reply["id_str"] in prune:
                log.info("ignoring pruned tweet id %s", reply["id_str"])
                continue

            log.info("found reply: %s", reply["id_str"])

            if recursive:
                if reply["id_str"] not in prune:
                    prune = prune + (tweet_id,)
                    for r in self.replies(reply, recursive, prune):
                        yield r
            else:
                yield reply

        # if this tweet is itself a reply to another tweet get it and
        # get other potential replies to it

        reply_to_id = tweet.get("in_reply_to_status_id_str")
        log.info("prune=%s", prune)
        if recursive and reply_to_id and reply_to_id not in prune:
            t = self.tweet(reply_to_id)
            if t:
                log.info("found reply-to: %s", t["id_str"])
                prune = prune + (tweet["id_str"],)
                for r in self.replies(t, recursive=True, prune=prune):
                    yield r

        # if this tweet is a quote go get that too whatever tweets it
        # may be in reply to

        quote_id = tweet.get("quoted_status_id_str")
        if recursive and quote_id and quote_id not in prune:
            t = self.tweet(quote_id)
            if t:
                log.info("found quote: %s", t["id_str"])
                prune = prune + (tweet["id_str"],)
                for r in self.replies(t, recursive=True, prune=prune):
                    yield r

    def list_members(
        self, list_id=None, slug=None, owner_screen_name=None, owner_id=None
    ):
        """
        Returns the members of a list.

        List id or (slug and (owner_screen_name or owner_id)) are required
        """
        assert list_id or (slug and (owner_screen_name or owner_id))
        url = "https://api.twitter.com/1.1/lists/members.json"
        params = {"cursor": -1}
        if list_id:
            params["list_id"] = list_id
        else:
            params["slug"] = slug
            if owner_screen_name:
                params["owner_screen_name"] = owner_screen_name
            else:
                params["owner_id"] = owner_id

        while params["cursor"] != 0:
            try:
                resp = self.get(url, params=params, allow_404=True)
            except requests.exceptions.HTTPError as e:
                if e.response.status_code == 404:
                    log.error("no matching list")
                raise e

            users = resp.json()
            for user in users["users"]:
                yield user
            params["cursor"] = users["next_cursor"]

    def oembed(self, tweet_url, **params):
        """
        Returns the oEmbed JSON for a tweet. The JSON includes an html
        key that contains the HTML for the embed. You can pass in
        parameters that correspond to the parameters that Twitter's
        statuses/oembed endpoint supports. For example:

        o = client.oembed('https://twitter.com/biz/status/21', theme='dark')
        """
        log.info("generating embedding for tweet %s", tweet_url)
        url = "https://publish.twitter.com/oembed"

        params["url"] = tweet_url
        resp = self.get(url, params=params)

        return resp.json()

    @rate_limit
    @catch_conn_reset
    @catch_timeout
    @catch_gzip_errors
    def get(self, *args, **kwargs):
        if not self.client:
            self.connect()

        # set default tweet_mode; only used for non-premium/non-gnip endpoints
        if self.is_standard_v1(args[0]):
            if "params" not in kwargs:
                kwargs["params"] = {"tweet_mode": self.tweet_mode}
            else:
                kwargs["params"]["tweet_mode"] = self.tweet_mode

        # Pass allow 404 to not retry on 404
        allow_404 = kwargs.pop("allow_404", False)
        connection_error_count = kwargs.pop("connection_error_count", 0)
        try:
            log.info("getting %s %s", args, kwargs)
            r = self.last_response = self.client.get(
                *args, timeout=(3.05, 31), **kwargs
            )
            # this has been noticed, believe it or not
            # https://github.com/edsu/twarc/issues/75
            if r.status_code == 404 and not allow_404:
                log.warning("404 from Twitter API! trying again")
                time.sleep(1)
                r = self.get(*args, **kwargs)
            return r
        except (ssl.SSLError, ConnectionError, ProtocolError) as e:
            connection_error_count += 1
            log.error("caught connection error %s on %s try", e, connection_error_count)
            if (
                self.connection_errors
                and connection_error_count == self.connection_errors
            ):
                log.error("received too many connection errors")
                raise e
            else:
                self.connect()
                kwargs["connection_error_count"] = connection_error_count
                kwargs["allow_404"] = allow_404
                return self.get(*args, **kwargs)

    @rate_limit
    @catch_conn_reset
    @catch_timeout
    @catch_gzip_errors
    def post(self, *args, **kwargs):
        if not self.client:
            self.connect()

        if "data" in kwargs:
            kwargs["data"]["tweet_mode"] = self.tweet_mode

        connection_error_count = kwargs.pop("connection_error_count", 0)
        try:
            log.info("posting %s %s", args, kwargs)
            self.last_response = self.client.post(*args, timeout=(3.05, 31), **kwargs)
            return self.last_response
        except (ssl.SSLError, ConnectionError, ProtocolError) as e:
            connection_error_count += 1
            log.error("caught connection error %s on %s try", e, connection_error_count)
            if (
                self.connection_errors
                and connection_error_count == self.connection_errors
            ):
                log.error("received too many connection errors")
                raise e
            else:
                self.connect()
                kwargs["connection_error_count"] = connection_error_count
                return self.post(*args, **kwargs)

    @catch_timeout
    def connect(self):
        """
        Sets up the HTTP session to talk to Twitter. If one is active it is
        closed and another one is opened.
        """
        if self.gnip_auth and not (
            self.gnip_username and self.gnip_password and self.gnip_account
        ):
            raise RuntimeError("MissingKeys")
        elif not self.gnip_auth and not (
            self.consumer_key
            and self.consumer_secret
            and self.access_token
            and self.access_token_secret
        ):
            raise RuntimeError("MissingKeys")

        if self.client:
            log.info("closing existing http session")
            self.client.close()
        if self.last_response:
            log.info("closing last response")
            self.last_response.close()
        log.info("creating http session")

        if self.gnip_auth:
            logging.info("creating basic user authentication for gnip")
            s = requests.Session()
            s.auth = (self.gnip_username, self.gnip_password)
            self.client = s
        elif not self.app_auth:
            logging.info("creating OAuth1 user authentication")
            self.client = OAuth1Session(
                client_key=self.consumer_key,
                client_secret=self.consumer_secret,
                resource_owner_key=self.access_token,
                resource_owner_secret=self.access_token_secret,
            )
        else:
            logging.info("creating OAuth2 app authentication")
            client = BackendApplicationClient(client_id=self.consumer_key)
            oauth = OAuth2Session(client=client)
            token = oauth.fetch_token(
                token_url="https://api.twitter.com/oauth2/token",
                client_id=self.consumer_key,
                client_secret=self.consumer_secret,
            )
            self.client = oauth

        if self.client:
            self.client.headers.update({"User-Agent": user_agent})

    def get_keys(self):
        """
        Get the Twitter API keys. Order of precedence is command line,
        environment, config file. Return True if all the keys were found
        and False if not.
        """
        env = os.environ.get
        if not self.consumer_key:
            self.consumer_key = env("CONSUMER_KEY")
        if not self.consumer_secret:
            self.consumer_secret = env("CONSUMER_SECRET")
        if not self.access_token:
            self.access_token = env("ACCESS_TOKEN")
        if not self.access_token_secret:
            self.access_token_secret = env("ACCESS_TOKEN_SECRET")
        if not self.gnip_username:
            self.gnip_username = env("GNIP_USERNAME")
        if not self.gnip_password:
            self.gnip_password = env("GNIP_PASSWORD")
        if not self.gnip_account:
            self.gnip_account = env("GNIP_ACCOUNT")

        if self.config:
            if self.gnip_auth and not (
                self.gnip_username and self.gnip_password and self.gnip_account
            ):
                self.load_config()
            elif not self.gnip_auth and not (
                self.consumer_key
                and self.consumer_secret
                and self.access_token
                and self.access_token_secret
            ):
                self.load_config()

    def validate_keys(self):
        """
        Validate the keys provided are authentic credentials.
        """
        if self.gnip_auth:
            url = "https://gnip-api.twitter.com/metrics/usage/accounts/{}.json".format(
                self.gnip_account
            )

            keys_present = (
                self.gnip_account and self.gnip_username and self.gnip_password
            )
        elif self.app_auth:
            # no need to validate keys when using OAuth2 App Auth.
            return True
        else:
            url = "https://api.twitter.com/1.1/account/verify_credentials.json"

            keys_present = (
                self.consumer_key
                and self.consumer_secret
                and self.access_token
                and self.access_token_secret
            )

        if keys_present:
            try:
                # Need to explicitly reconnect to confirm the current creds
                # are used in the session object.
                self.connect()
                self.get(url)
                return True
            except requests.HTTPError as e:
                if e.response.status_code == 401:
                    raise RuntimeError("Invalid credentials provided.")
                else:
                    raise e
        else:
            print("Incomplete credentials provided.")
            print('Please run the command "twarc configure" to get started.')
            sys.exit()

    def load_config(self):
        path = self.config
        profile = self.profile
        log.info("loading %s profile from config %s", profile, path)

        if not path or not os.path.isfile(path):
            return {}

        config = configparser.ConfigParser()
        config.read(self.config)

        if len(config.sections()) >= 1 and not profile:
            profile = config.sections()[0]

        data = {}
        keys = (
            ["gnip_username", "gnip_password", "gnip_account"]
            if self.gnip_auth
            else [
                "access_token",
                "access_token_secret",
                "consumer_key",
                "consumer_secret",
            ]
        )
        for key in keys:
            try:
                setattr(self, key, config.get(profile, key))
            except configparser.NoSectionError:
                sys.exit("no such profile %s in %s" % (profile, path))
            except configparser.NoOptionError:
                sys.exit("missing %s from profile %s in %s" % (key, profile, path))
        return data

    def save_config(self, profile):
        if not self.config:
            return
        config = configparser.ConfigParser()
        config.read(self.config)

        if config.has_section(profile):
            config.remove_section(profile)

        config.add_section(profile)
        if self.gnip_auth:
            config.set(profile, "gnip_username", self.gnip_username)
            config.set(profile, "gnip_password", self.gnip_password)
            config.set(profile, "gnip_account", self.gnip_account)
        else:
            config.set(profile, "consumer_key", self.consumer_key)
            config.set(profile, "consumer_secret", self.consumer_secret)
            config.set(profile, "access_token", self.access_token)
            config.set(profile, "access_token_secret", self.access_token_secret)
        with open(self.config, "w") as config_file:
            config.write(config_file)

        return config

    def configure(self):
        print(
            "\nTwarc needs to know a few things before it can talk to Twitter on your behalf.\n"
        )

        reuse = False
        if self.consumer_key and self.consumer_secret:
            print(
                "You already have these application keys in your config %s\n"
                % self.config
            )
            print("consumer key: %s" % self.consumer_key)
            print("consumer secret: %s" % self.consumer_secret)
            reuse = get_input(
                "\nWould you like to use those for your new profile? [y/n] "
            )
            reuse = reuse.lower() == "y"

        if not reuse:
            print(
                "\nPlease enter your Twitter application credentials from apps.twitter.com:\n"
            )

            self.consumer_key = get_input("consumer key: ")
            self.consumer_secret = get_input("consumer secret: ")

        answered = False
        while not answered:
            print(
                "\nHow would you like twarc to obtain your user keys?\n\n1) generate access keys by visiting Twitter\n2) manually enter your access token and secret\n"
            )
            answer = get_input("Please enter your choice [1/2] ")
            if answer == "1":
                answered = True
                generate = True
            elif answer == "2":
                answered = True
                generate = False

        if generate:
            request_token_url = "https://api.twitter.com/oauth/request_token"
            oauth = OAuth1(self.consumer_key, client_secret=self.consumer_secret)
            r = requests.post(url=request_token_url, auth=oauth)

            credentials = parse_qs(r.text)
            if not credentials:
                print("\nError: invalid credentials.")
                print(
                    "Please check that you are copying and pasting correctly and try again.\n"
                )
                return

            resource_owner_key = credentials.get("oauth_token")[0]
            resource_owner_secret = credentials.get("oauth_token_secret")[0]

            base_authorization_url = "https://api.twitter.com/oauth/authorize"
            authorize_url = (
                base_authorization_url + "?oauth_token=" + resource_owner_key
            )
            print(
                "\nPlease log into Twitter and visit this URL in your browser:\n%s"
                % authorize_url
            )
            verifier = get_input(
                "\nAfter you have authorized the application please enter the displayed PIN: "
            )

            access_token_url = "https://api.twitter.com/oauth/access_token"
            oauth = OAuth1(
                self.consumer_key,
                client_secret=self.consumer_secret,
                resource_owner_key=resource_owner_key,
                resource_owner_secret=resource_owner_secret,
                verifier=verifier,
            )
            r = requests.post(url=access_token_url, auth=oauth)
            credentials = parse_qs(r.text)

            if not credentials:
                print("\nError: invalid PIN")
                print(
                    "Please check that you entered the PIN correctly and try again.\n"
                )
                return

            self.access_token = resource_owner_key = credentials.get("oauth_token")[0]
            self.access_token_secret = credentials.get("oauth_token_secret")[0]

            screen_name = credentials.get("screen_name")[0]
        else:
            self.access_token = get_input("Enter your Access Token: ")
            self.access_token_secret = get_input("Enter your Access Token Secret: ")
            screen_name = "default"

        config = self.save_config(screen_name)
        print(
            "\nThe credentials for %s have been saved to your configuration file at %s"
            % (screen_name, self.config)
        )
        print("\n✨ ✨ ✨  Happy twarcing! ✨ ✨ ✨\n")

        if len(config.sections()) > 1:
            print(
                "Note: you have multiple profiles in %s so in order to use %s you will use --profile\n"
                % (self.config, screen_name)
            )

    def default_config(self):
        return os.path.join(os.path.expanduser("~"), ".twarc")

    def is_standard_v1(self, url):
        result = True
        if url.startswith("https://gnip-api.twitter.com"):
            result = False
        elif url.startswith("https://api.twitter.com/1.1/tweets/search/30day"):
            result = False
        elif url.startswith("https://api.twitter.com/1.1/tweets/search/fullarchive"):
            result = False
        return result

__init__(self, consumer_key=None, consumer_secret=None, access_token=None, access_token_secret=None, connection_errors=0, http_errors=0, config=None, profile='', protected=False, tweet_mode='extended', app_auth=False, validate_keys=True, gnip_auth=False, gnip_username=None, gnip_password=None, gnip_account=None) special

Instantiate a Twarc instance. If keys aren't set, we'll try to discover them in the environment or a supplied profile. If no profile is indicated, the first section of the config file will be used.
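
As a quick sketch (the key strings below are placeholders), the client can be constructed with explicit keys, with no arguments at all, or with app-only authentication:

from twarc import Twarc

# Explicit keys (placeholder values):
t = Twarc("CONSUMER_KEY", "CONSUMER_SECRET", "ACCESS_TOKEN", "ACCESS_TOKEN_SECRET")

# Or pass nothing and let get_keys() find credentials in the environment
# or in the first profile of the ~/.twarc config file:
t = Twarc()

# App-only (OAuth2 bearer token) authentication, needed for premium search:
t = Twarc("CONSUMER_KEY", "CONSUMER_SECRET", "ACCESS_TOKEN", "ACCESS_TOKEN_SECRET", app_auth=True)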

Source code in twarc/client.py
def __init__(
    self,
    consumer_key=None,
    consumer_secret=None,
    access_token=None,
    access_token_secret=None,
    connection_errors=0,
    http_errors=0,
    config=None,
    profile="",
    protected=False,
    tweet_mode="extended",
    app_auth=False,
    validate_keys=True,
    gnip_auth=False,
    gnip_username=None,
    gnip_password=None,
    gnip_account=None,
):
    """
    Instantiate a Twarc instance. If keys aren't set we'll try to
    discover them in the environment or a supplied profile. If no
    profile is indicated the first section of the config files will
    be used.
    """

    self.api_version = "1.1"
    self.consumer_key = consumer_key
    self.consumer_secret = consumer_secret
    self.access_token = access_token
    self.access_token_secret = access_token_secret
    self.connection_errors = connection_errors
    self.http_errors = http_errors
    self.profile = profile
    self.client = None
    self.last_response = None
    self.tweet_mode = tweet_mode
    self.protected = protected
    self.app_auth = app_auth
    self.gnip_auth = gnip_auth
    self.gnip_username = gnip_username
    self.gnip_password = gnip_password
    self.gnip_account = gnip_account

    if config:
        self.config = config
    else:
        self.config = self.default_config()

    self.get_keys()

    if validate_keys:
        self.validate_keys()

dehydrate(self, iterator)

Pass in an iterator of tweets' JSON and get back an iterator of the IDs of each tweet.
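
A short sketch, assuming a file of line-oriented tweet JSON (the file names are just examples):

from twarc import Twarc

t = Twarc()  # keys discovered from the environment or config file

with open("tweets.jsonl") as infile, open("ids.txt", "w") as outfile:
    for tweet_id in t.dehydrate(infile):
        outfile.write(tweet_id + "\n")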

Source code in twarc/client.py
def dehydrate(self, iterator):
    """
    Pass in an iterator of tweets' JSON and get back an iterator of the
    IDs of each tweet.
    """
    for line in iterator:
        try:
            yield json.loads(line)["id_str"]
        except Exception as e:
            log.error("uhoh: %s\n" % e)

follower_ids(self, user, max_pages=None)

Returns Twitter user ids for the specified user's followers. A user can be specified using their screen_name or user_id.
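
For example, a sketch of collecting follower ids (the screen name is just an example); each yielded id is a string:

from twarc import Twarc

t = Twarc()

follower_ids = []
for user_id in t.follower_ids("jack", max_pages=1):
    follower_ids.append(user_id)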

Source code in twarc/client.py
def follower_ids(self, user, max_pages=None):
    """
    Returns Twitter user ids for the specified user's followers.
    A user can be specified using their screen_name or user_id.
    """
    user = str(user)
    user = user.lstrip("@")
    url = "https://api.twitter.com/1.1/followers/ids.json"

    if re.match(r"^\d+$", user):
        params = {"user_id": user, "cursor": -1}
    else:
        params = {"screen_name": user, "cursor": -1}

    retrieved_pages = 0

    while params["cursor"] != 0:
        try:
            resp = self.get(url, params=params, allow_404=True)
            retrieved_pages += 1
        except requests.exceptions.HTTPError as e:
            if e.response.status_code == 404:
                log.info("no users matching %s", user)
            raise e
        user_ids = resp.json()
        for user_id in user_ids["ids"]:
            yield str_type(user_id)
        params["cursor"] = user_ids["next_cursor"]

        if max_pages is not None and retrieved_pages == max_pages:
            log.info("reached max follower page limit for %s", params)
            break

friend_ids(self, user, max_pages=None)

Returns Twitter user ids for the specified user's friends. A user can be specified using their screen_name or user_id.
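
A sketch along the same lines as follower_ids (the account is just an example); either a screen name or a numeric user id can be passed:

from twarc import Twarc

t = Twarc()

for user_id in t.friend_ids("jack", max_pages=2):
    print(user_id)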

Source code in twarc/client.py
def friend_ids(self, user, max_pages=None):
    """
    Returns Twitter user ids for the specified user's friends. A user
    can be specified using their screen_name or user_id.
    """
    user = str(user)
    user = user.lstrip("@")
    url = "https://api.twitter.com/1.1/friends/ids.json"

    if re.match(r"^\d+$", user):
        params = {"user_id": user, "cursor": -1}
    else:
        params = {"screen_name": user, "cursor": -1}

    retrieved_pages = 0

    while params["cursor"] != 0:
        try:
            resp = self.get(url, params=params, allow_404=True)
            retrieved_pages += 1
        except requests.exceptions.HTTPError as e:
            if e.response.status_code == 404:
                log.error("no users matching %s", user)
            raise e

        user_ids = resp.json()
        for user_id in user_ids["ids"]:
            yield str_type(user_id)
        params["cursor"] = user_ids["next_cursor"]

        if max_pages is not None and retrieved_pages == max_pages:
            log.info("reached max friend page limit for %s", params)
            break

get_keys(self)

Get the Twitter API keys. Order of precedence is command line, environment, config file. Return True if all the keys were found and False if not.
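
Because of this fallback, a sketch like the following lets Twarc() be constructed without arguments. The values are placeholders; the variable names are the ones get_keys looks for in the environment:

import os

from twarc import Twarc

# Placeholder values for the environment variables get_keys() reads.
os.environ["CONSUMER_KEY"] = "..."
os.environ["CONSUMER_SECRET"] = "..."
os.environ["ACCESS_TOKEN"] = "..."
os.environ["ACCESS_TOKEN_SECRET"] = "..."

t = Twarc()  # keys discovered from the environment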

Source code in twarc/client.py
def get_keys(self):
    """
    Get the Twitter API keys. Order of precedence is command line,
    environment, config file. Return True if all the keys were found
    and False if not.
    """
    env = os.environ.get
    if not self.consumer_key:
        self.consumer_key = env("CONSUMER_KEY")
    if not self.consumer_secret:
        self.consumer_secret = env("CONSUMER_SECRET")
    if not self.access_token:
        self.access_token = env("ACCESS_TOKEN")
    if not self.access_token_secret:
        self.access_token_secret = env("ACCESS_TOKEN_SECRET")
    if not self.gnip_username:
        self.gnip_username = env("GNIP_USERNAME")
    if not self.gnip_password:
        self.gnip_password = env("GNIP_PASSWORD")
    if not self.gnip_account:
        self.gnip_account = env("GNIP_ACCOUNT")

    if self.config:
        if self.gnip_auth and not (
            self.gnip_username and self.gnip_password and self.gnip_account
        ):
            self.load_config()
        elif not self.gnip_auth and not (
            self.consumer_key
            and self.consumer_secret
            and self.access_token
            and self.access_token_secret
        ):
            self.load_config()

hydrate(self, iterator, trim_user=False)

Pass in an iterator of tweet ids and get back an iterator for the decoded JSON for each corresponding tweet.
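
A sketch, assuming ids.txt holds one tweet id per line (the file names are just examples):

import json

from twarc import Twarc

t = Twarc()

with open("ids.txt") as infile, open("tweets.jsonl", "w") as outfile:
    for tweet in t.hydrate(infile):
        outfile.write(json.dumps(tweet) + "\n")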

Source code in twarc/client.py
def hydrate(self, iterator, trim_user=False):
    """
    Pass in an iterator of tweet ids and get back an iterator for the
    decoded JSON for each corresponding tweet.
    """
    ids = []
    url = "https://api.twitter.com/1.1/statuses/lookup.json"

    # lookup 100 tweets at a time
    for tweet_id in iterator:
        tweet_id = str(tweet_id)
        tweet_id = tweet_id.strip()  # remove new line if present
        ids.append(tweet_id)
        if len(ids) == 100:
            log.info("hydrating %s ids", len(ids))
            resp = self.post(
                url,
                data={
                    "id": ",".join(ids),
                    "include_ext_alt_text": "true",
                    "include_entities": "true",
                    "trim_user": trim_user,
                },
            )
            tweets = resp.json()
            tweets.sort(key=lambda t: t["id_str"])
            for tweet in tweets:
                yield tweet
            ids = []

    # hydrate any remaining ones
    if len(ids) > 0:
        log.info("hydrating %s", ids)
        resp = self.post(
            url,
            data={
                "id": ",".join(ids),
                "include_ext_alt_text": "true",
                "include_entities": "true",
                "trim_user": trim_user,
            },
        )
        for tweet in resp.json():
            yield tweet

list_members(self, list_id=None, slug=None, owner_screen_name=None, owner_id=None)

Returns the members of a list.

List id or (slug and (owner_screen_name or owner_id)) are required
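
A sketch showing both ways of identifying the list (the list id, slug and owner below are placeholders):

from twarc import Twarc

t = Twarc()

# By numeric list id:
for user in t.list_members(list_id=1234567890):
    print(user["screen_name"])

# Or by slug plus owner:
for user in t.list_members(slug="my-list", owner_screen_name="example_owner"):
    print(user["screen_name"])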

Source code in twarc/client.py
def list_members(
    self, list_id=None, slug=None, owner_screen_name=None, owner_id=None
):
    """
    Returns the members of a list.

    List id or (slug and (owner_screen_name or owner_id)) are required
    """
    assert list_id or (slug and (owner_screen_name or owner_id))
    url = "https://api.twitter.com/1.1/lists/members.json"
    params = {"cursor": -1}
    if list_id:
        params["list_id"] = list_id
    else:
        params["slug"] = slug
        if owner_screen_name:
            params["owner_screen_name"] = owner_screen_name
        else:
            params["owner_id"] = owner_id

    while params["cursor"] != 0:
        try:
            resp = self.get(url, params=params, allow_404=True)
        except requests.exceptions.HTTPError as e:
            if e.response.status_code == 404:
                log.error("no matching list")
            raise e

        users = resp.json()
        for user in users["users"]:
            yield user
        params["cursor"] = users["next_cursor"]

oembed(self, tweet_url, **params)

Returns the oEmbed JSON for a tweet. The JSON includes an html key that contains the HTML for the embed. You can pass in parameters that correspond to the parameters that Twitter's statuses/oembed endpoint supports. For example:

o = client.oembed('https://twitter.com/biz/status/21', theme='dark')
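
The returned dictionary can then be used directly; a short sketch that prints the embeddable HTML:

from twarc import Twarc

t = Twarc()
o = t.oembed("https://twitter.com/biz/status/21", theme="dark")
print(o["html"])  # the ready-to-embed HTML snippet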

Source code in twarc/client.py
def oembed(self, tweet_url, **params):
    """
    Returns the oEmbed JSON for a tweet. The JSON includes an html
    key that contains the HTML for the embed. You can pass in
    parameters that correspond to the parameters that Twitter's
    statuses/oembed endpoint supports. For example:

    o = client.oembed('https://twitter.com/biz/status/21', theme='dark')
    """
    log.info("generating embedding for tweet %s", tweet_url)
    url = "https://publish.twitter.com/oembed"

    params["url"] = tweet_url
    resp = self.get(url, params=params)

    return resp.json()

Search using the Premium Search API. You will need to pass in a query, a product (30day or fullarchive) and an environment to use. Optionally you can pass in a from_date and to_date to limit the search using datetime objects. If you would like to set max_results you can, or you can accept the maximum results (500). If using a sandbox environment you will want to set sandbox=True to lower the max_results to 100. The limit option will cause your search to finish after it has returned more than that number of tweets (0 means no limit).
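
A sketch of a full-archive premium search. The environment label "dev", the query and the dates are placeholders for whatever you have configured, and app_auth=True (or gnip_auth) is required as noted below:

import datetime

from twarc import Twarc

t = Twarc(app_auth=True)  # keys discovered from the environment or config file

for tweet in t.premium_search(
    q="from:jack",
    product="fullarchive",
    environment="dev",                        # your premium environment label
    from_date=datetime.datetime(2021, 1, 1),
    to_date=datetime.datetime(2021, 1, 2),
    sandbox=True,                             # caps maxResults at 100
    limit=500,                                # stop after roughly 500 tweets
):
    print(tweet["id_str"])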

Source code in twarc/client.py
def premium_search(
    self,
    q,
    product,
    environment,
    from_date=None,
    to_date=None,
    max_results=None,
    sandbox=False,
    limit=0,
):
    """
    Search using the Premium Search API. You will need to pass in a query,
    a product (30day or fullarchive) and an environment to use. Optionally
    you can pass in a from_date and to_date to limit the search using
    datetime objects. If you would like to set max_results you can, or
    you can accept the maximum results (500). If using a sandbox
    environment you will want to set sandbox=True to lower the max_results
    to 100. The limit option will cause your search to finish after it has
    returned more than that number of tweets (0 means no limit).
    """

    if not self.app_auth and not self.gnip_auth:
        raise RuntimeError(
            "This endpoint is only available with application authentication. "
            "Pass app_auth=True in Python or --app-auth on the command line."
        )

    if from_date and not isinstance(from_date, datetime.date):
        raise RuntimeError(
            "from_date must be a datetime.date or datetime.datetime object"
        )
    if to_date and not isinstance(to_date, datetime.date):
        raise RuntimeError(
            "to_date must be a datetime.date or datetime.datetime object"
        )

    if product not in ["30day", "gnip_fullarchive", "fullarchive"]:
        raise RuntimeError("Invalid Premium Search API product: {}".format(product))

    # set default max_results based on whether it's sandboxed
    if max_results is None:
        if sandbox:
            max_results = 100
        else:
            max_results = 500

    if product == "gnip_fullarchive":
        url = "https://gnip-api.twitter.com/search/fullarchive/accounts/{}/{}.json".format(
            self.gnip_account, environment
        )
    else:
        url = "https://api.twitter.com/1.1/tweets/search/{}/{}.json".format(
            product, environment
        )

    params = {
        "query": q,
        "fromDate": from_date.strftime("%Y%m%d%H%M") if from_date else None,
        "toDate": to_date.strftime("%Y%m%d%H%M") if to_date else None,
        "maxResults": max_results,
    }

    count = 0
    stop = False
    while not stop:
        resp = self.get(url, params=params)
        if resp.status_code == 200:
            data = resp.json()
            for tweet in data["results"]:
                count += 1
                yield tweet
                if limit != 0 and count >= limit:
                    stop = True
                    break
            if "next" in data:
                params["next"] = data["next"]
            else:
                stop = True
        elif resp.status_code == 422:
            raise RuntimeError(
                "Twitter API 422 response: are you using a premium search sandbox environment and forgot the --sandbox argument?"
            )

replies(self, tweet, recursive=False, prune=())

replies returns a generator of tweets that are replies to a given tweet. It includes the original tweet. If you would like to fetch the replies to the replies, use recursive=True, which will do a depth-first recursive walk of the replies. It will also walk up the reply chain if you supply a tweet that is itself a reply to another tweet. You can optionally supply a tuple of tweet ids to ignore during this traversal using the prune parameter.
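
A rough usage sketch, assuming credentials are already configured; the search query is only a placeholder way of obtaining a seed tweet:

from twarc import Twarc

t = Twarc()

# grab any tweet to use as the starting point of the thread walk
seed = next(t.search("ferguson"))

for reply in t.replies(seed, recursive=True):
    print(reply["id_str"], reply.get("in_reply_to_status_id_str"))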

Source code in twarc/client.py
def replies(self, tweet, recursive=False, prune=()):
    """
    replies returns a generator of tweets that are replies for a given
    tweet. It includes the original tweet. If you would like to fetch the
    replies to the replies use recursive=True which will do a depth-first
    recursive walk of the replies. It also walk up the reply chain if you
    supply a tweet that is itself a reply to another tweet. You can
    optionally supply a tuple of tweet ids to ignore during this traversal
    using the prune parameter.
    """

    yield tweet

    # get replies to the tweet
    screen_name = tweet["user"]["screen_name"]
    tweet_id = tweet["id_str"]
    log.info("looking for replies to: %s", tweet_id)
    for reply in self.search("to:%s" % screen_name, since_id=tweet_id):

        if reply["in_reply_to_status_id_str"] != tweet_id:
            continue

        if reply["id_str"] in prune:
            log.info("ignoring pruned tweet id %s", reply["id_str"])
            continue

        log.info("found reply: %s", reply["id_str"])

        if recursive:
            if reply["id_str"] not in prune:
                prune = prune + (tweet_id,)
                for r in self.replies(reply, recursive, prune):
                    yield r
        else:
            yield reply

    # if this tweet is itself a reply to another tweet get it and
    # get other potential replies to it

    reply_to_id = tweet.get("in_reply_to_status_id_str")
    log.info("prune=%s", prune)
    if recursive and reply_to_id and reply_to_id not in prune:
        t = self.tweet(reply_to_id)
        if t:
            log.info("found reply-to: %s", t["id_str"])
            prune = prune + (tweet["id_str"],)
            for r in self.replies(t, recursive=True, prune=prune):
                yield r

    # if this tweet is a quote go get that too whatever tweets it
    # may be in reply to

    quote_id = tweet.get("quoted_status_id_str")
    if recursive and quote_id and quote_id not in prune:
        t = self.tweet(quote_id)
        if t:
            log.info("found quote: %s", t["id_str"])
            prune = prune + (tweet["id_str"],)
            for r in self.replies(t, recursive=True, prune=prune):
                yield r

retweets(self, tweet_ids)

Retrieves up to the last 100 retweets for each tweet id in the provided iterator of tweet_ids.
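
A short sketch (the tweet ids are placeholders; any iterable of ids, such as lines read from a file, will work):

from twarc import Twarc

t = Twarc()

for rt in t.retweets(["20", "1138505981460193280"]):
    print(rt["id_str"], "is a retweet of", rt["retweeted_status"]["id_str"])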

Source code in twarc/client.py
def retweets(self, tweet_ids):
    """
    Retrieves up to the last 100 retweets for each tweet id in the
    provided iterator of tweet_ids.
    """
    if not isinstance(tweet_ids, types.GeneratorType):
        tweet_ids = iter(tweet_ids)

    for tweet_id in tweet_ids:
        if hasattr(tweet_id, "strip"):
            tweet_id = tweet_id.strip()
        log.info("retrieving retweets of %s", tweet_id)
        url = "https://api.twitter.com/1.1/statuses/retweets/" "{}.json".format(
            tweet_id
        )
        try:
            resp = self.get(url, params={"count": 100}, allow_404=True)
            for tweet in resp.json():
                yield tweet
        except requests.exceptions.HTTPError as e:
            if e.response.status_code == 404:
                log.info("can't get tweets for non-existent tweet: %s", tweet_id)

sample(self, event=None, record_keepalive=False)

Returns a small random sample of all public statuses. The Tweets returned by the default access level are the same, so if two different clients connect to this endpoint, they will see the same Tweets.

If a threading.Event is provided for event and the event is set, the sample will be interrupted.
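
A minimal sketch of consuming the stream, with a threading.Event that another thread can set to stop it (credentials are assumed to be configured):

import threading
from twarc import Twarc

t = Twarc()
stop = threading.Event()  # call stop.set() elsewhere to end the stream

for tweet in t.sample(event=stop):
    print(tweet.get("id_str"))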

Source code in twarc/client.py
def sample(self, event=None, record_keepalive=False):
    """
    Returns a small random sample of all public statuses. The Tweets
    returned by the default access level are the same, so if two different
    clients connect to this endpoint, they will see the same Tweets.

    If a threading.Event is provided for event and the event is set,
    the sample will be interrupted.
    """
    url = "https://stream.twitter.com/1.1/statuses/sample.json"
    params = {"stall_warning": True}
    headers = {"accept-encoding": "deflate, gzip"}
    errors = 0
    while True:
        try:
            log.info("connecting to sample stream")
            resp = self.post(url, params, headers=headers, stream=True)
            errors = 0
            for line in resp.iter_lines(chunk_size=512):
                if event and event.is_set():
                    log.info("stopping sample")
                    # Explicitly close response
                    resp.close()
                    return
                if line == "":
                    log.info("keep-alive")
                    if record_keepalive:
                        yield "keep-alive"
                    continue
                try:
                    yield json.loads(line.decode())
                except Exception as e:
                    log.error("json parse error: %s - %s", e, line)
        except requests.exceptions.HTTPError as e:
            errors += 1
            log.error("caught http error %s on %s try", e, errors)
            if self.http_errors and errors == self.http_errors:
                log.warning("too many errors")
                raise e
            if e.response.status_code == 420:
                if interruptible_sleep(errors * 60, event):
                    log.info("stopping filter")
                    return
            else:
                if interruptible_sleep(errors * 5, event):
                    log.info("stopping filter")
                    return

        except Exception as e:
            errors += 1
            log.error("caught exception %s on %s try", e, errors)
            if self.http_errors and errors == self.http_errors:
                log.warning("too many errors")
                raise e
            if interruptible_sleep(errors, event):
                log.info("stopping filter")
                return

timeline(self, user_id=None, screen_name=None, max_id=None, since_id=None, max_pages=None)

Returns a collection of the most recent tweets posted by the user indicated by the user_id or screen_name parameter. Provide either a user_id or a screen_name, not both.
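
For example (the screen name is a placeholder, and max_pages keeps the sketch bounded):

from twarc import Twarc

t = Twarc()

# at 200 tweets per page this fetches at most ~400 recent tweets
for tweet in t.timeline(screen_name="jack", max_pages=2):
    print(tweet["id_str"], tweet.get("full_text", tweet.get("text")))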

Source code in twarc/client.py
def timeline(
    self, user_id=None, screen_name=None, max_id=None, since_id=None, max_pages=None
):
    """
    Returns a collection of the most recent tweets posted
    by the user indicated by the user_id or screen_name parameter.
    Provide a user_id or screen_name.
    """

    if user_id and screen_name:
        raise ValueError("only user_id or screen_name may be passed")

    # Strip if screen_name is prefixed with '@'
    if screen_name:
        screen_name = screen_name.lstrip("@")
    id = screen_name or str(user_id)
    id_type = "screen_name" if screen_name else "user_id"
    log.info("starting user timeline for user %s", id)

    if screen_name or user_id:
        url = "https://api.twitter.com/1.1/statuses/user_timeline.json"
    else:
        url = "https://api.twitter.com/1.1/statuses/home_timeline.json"

    params = {"count": 200, id_type: id, "include_ext_alt_text": "true"}

    retrieved_pages = 0
    reached_end = False

    while True:
        if since_id:
            # Make the since_id inclusive, so we can avoid retrieving
            # an empty page of results in some cases
            params["since_id"] = str(int(since_id) - 1)
        if max_id:
            params["max_id"] = max_id

        try:
            resp = self.get(url, params=params, allow_404=True)
            retrieved_pages += 1
        except requests.exceptions.HTTPError as e:
            if e.response.status_code == 404:
                log.warn("no timeline available for %s", id)
                break
            elif e.response.status_code == 401:
                log.warn("protected account %s", id)
                break
            raise e

        statuses = resp.json()

        if len(statuses) == 0:
            log.info("no new tweets matching %s", params)
            break

        for status in statuses:
            # We've certainly reached the end of new results
            if since_id is not None and status["id_str"] == str(since_id):
                reached_end = True
                break
            # If you request an invalid user_id, you may still get
            # results so need to check.
            if not user_id or id == status.get("user", {}).get("id_str"):
                yield status

        if reached_end:
            log.info("no new tweets matching %s", params)
            break

        if max_pages is not None and retrieved_pages == max_pages:
            log.info("reached max page limit for %s", params)
            break

        max_id = str(int(status["id_str"]) - 1)

trends_available(self)

Returns a list of regions for which Twitter tracks trends.
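
A quick sketch of listing the regions and their WOEIDs (credentials are assumed to be configured):

from twarc import Twarc

t = Twarc()

for place in t.trends_available():
    print(place["woeid"], place["name"], place["country"])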

Source code in twarc/client.py
def trends_available(self):
    """
    Returns a list of regions for which Twitter tracks trends.
    """
    url = "https://api.twitter.com/1.1/trends/available.json"
    try:
        resp = self.get(url)
    except requests.exceptions.HTTPError as e:
        raise e
    return resp.json()

trends_closest(self, lat, lon)

Returns the closest regions for the supplied lat/lon.
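
For example (credentials are assumed, and the coordinates are placeholders, roughly central London):

from twarc import Twarc

t = Twarc()

for place in t.trends_closest(51.51, -0.13):
    print(place["woeid"], place["name"])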

Source code in twarc/client.py
def trends_closest(self, lat, lon):
    """
    Returns the closest regions for the supplied lat/lon.
    """
    url = "https://api.twitter.com/1.1/trends/closest.json"
    params = {"lat": lat, "long": lon}
    try:
        resp = self.get(url, params=params)
    except requests.exceptions.HTTPError as e:
        raise e
    return resp.json()

trends_place(self, woeid, exclude=None)

Returns recent Twitter trends for the specified WOEID. If exclude == 'hashtags', Twitter will remove hashtag trends from the response.
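
For example, fetching worldwide trends while excluding hashtags (WOEID 1 is the global identifier):

from twarc import Twarc

t = Twarc()

for result in t.trends_place(1, exclude="hashtags"):
    for trend in result["trends"]:
        print(trend["name"], trend.get("tweet_volume"))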

Source code in twarc/client.py
def trends_place(self, woeid, exclude=None):
    """
    Returns recent Twitter trends for the specified WOEID. If
    exclude == 'hashtags', Twitter will remove hashtag trends from the
    response.
    """
    url = "https://api.twitter.com/1.1/trends/place.json"
    params = {"id": woeid}
    if exclude:
        params["exclude"] = exclude
    try:
        resp = self.get(url, params=params, allow_404=True)
    except requests.exceptions.HTTPError as e:
        if e.response.status_code == 404:
            log.info("no region matching WOEID %s", woeid)
        raise e
    return resp.json()

user_lookup(self, ids, id_type='user_id')

A generator that returns users for a supplied iterator of user ids or screen names. Use id_type to indicate which you are supplying (user_id or screen_name).
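
A short sketch (the screen names are placeholders; ids are batched into requests of 100 behind the scenes):

from twarc import Twarc

t = Twarc()

for user in t.user_lookup(["jack", "twitter"], id_type="screen_name"):
    print(user["id_str"], user["screen_name"])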

Source code in twarc/client.py
def user_lookup(self, ids, id_type="user_id"):
    """
    A generator that returns users for supplied iterator of user ids or screen_names.
    Use the id_type to indicate which you are supplying (user_id or screen_name).
    """

    if isinstance(ids, str):
        raise TypeError("ids must be an iterable other than a string")

    if id_type not in ["user_id", "screen_name"]:
        raise RuntimeError("id_type must be user_id or screen_name")

    if not isinstance(ids, types.GeneratorType):
        ids = iter(ids)

    # TODO: this is similar to hydrate, maybe they could share code?

    lookup_ids = []

    def do_lookup():
        ids_str = ",".join(lookup_ids)
        log.info("looking up users %s", ids_str)
        url = "https://api.twitter.com/1.1/users/lookup.json"
        params = {id_type: ids_str}
        try:
            resp = self.get(url, params=params, allow_404=True)
        except requests.exceptions.HTTPError as e:
            if e.response.status_code == 404:
                log.warning("no users matching %s", ids_str)
            raise e
        return resp.json()

    for id in ids:
        lookup_ids.append(str(id).strip())
        if len(lookup_ids) == 100:
            for u in do_lookup():
                yield u
            lookup_ids = []

    if len(lookup_ids) > 0:
        for u in do_lookup():
            yield u

validate_keys(self)

Validate that the provided keys are authentic credentials.
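
This is called automatically from the constructor unless validate_keys=False is passed; a sketch of invoking it explicitly:

from twarc import Twarc

t = Twarc(validate_keys=False)  # skip validation at construction time
t.validate_keys()               # True on success, RuntimeError on bad keys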

Source code in twarc/client.py
def validate_keys(self):
    """
    Validate the keys provided are authentic credentials.
    """
    if self.gnip_auth:
        url = "https://gnip-api.twitter.com/metrics/usage/accounts/{}.json".format(
            self.gnip_account
        )

        keys_present = (
            self.gnip_account and self.gnip_username and self.gnip_password
        )
    elif self.app_auth:
        # no need to validate keys when using OAuth2 App Auth.
        return True
    else:
        url = "https://api.twitter.com/1.1/account/verify_credentials.json"

        keys_present = (
            self.consumer_key
            and self.consumer_secret
            and self.access_token
            and self.access_token_secret
        )

    if keys_present:
        try:
            # Need to explicitly reconnect to confirm the current creds
            # are used in the session object.
            self.connect()
            self.get(url)
            return True
        except requests.HTTPError as e:
            if e.response.status_code == 401:
                raise RuntimeError("Invalid credentials provided.")
            else:
                raise e
    else:
        print("Incomplete credentials provided.")
        print('Please run the command "twarc configure" to get started.')
        sys.exit()
