# Public API of this module: the names exported by a star-import.
__all__ = [
    "SpotifyClientCredentials",
    "SpotifyOAuth",
    "SpotifyOauthError",
    "SpotifyStateError",
    "SpotifyImplicitGrant",
    "SpotifyPKCE"
]

import base64
import html
import logging
import os
import time
import urllib.parse as urllibparse
import warnings
import webbrowser
from http.server import BaseHTTPRequestHandler, HTTPServer
from urllib.parse import parse_qsl, urlparse

import requests

from spotipy.cache_handler import CacheFileHandler, CacheHandler
from spotipy.exceptions import SpotifyOauthError, SpotifyStateError
from spotipy.util import (CLIENT_CREDS_ENV_VARS, REQUESTS_SESSION,
                          get_host_port, normalize_scope)

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)


def _make_authorization_headers(client_id, client_secret):
    auth_header = base64.b64encode(
        str(client_id + ":" + client_secret).encode("ascii")
    )
    return {"Authorization": f"Basic {auth_header.decode('ascii')}"}


def _ensure_value(value, env_key):
    """
    Resolve a credential from an explicit value or from the environment.

    Parameters:
        - value: the explicitly supplied value; used as-is when truthy
        - env_key: key into `CLIENT_CREDS_ENV_VARS` that names the
                   environment variable consulted when `value` is falsy

    Returns the resolved value.
    Raises `SpotifyOauthError` when `value` is falsy and the environment
    variable is not set.
    """
    env_name = CLIENT_CREDS_ENV_VARS[env_key]
    resolved = value if value else os.getenv(env_name)
    if resolved is not None:
        return resolved
    raise SpotifyOauthError(
        f"No {env_key}. Pass it or set a {env_name} environment variable."
    )


class SpotifyAuthBase:
    """
    Common base for the authorization managers in this module.

    Provides the HTTP session selection, the `client_id` / `client_secret` /
    `redirect_uri` properties (whose setters fall back to environment
    variables through `_ensure_value`), token-expiry and scope helpers, and
    the conversion of HTTP errors into `SpotifyOauthError`.
    """

    def __init__(self, requests_session):
        """
        Choose the object used to send HTTP requests.

        Parameters:
            - requests_session: a `requests.Session` instance is used as-is;
              any other truthy value creates a new `requests.Session`; a
              falsy value makes the `requests.api` module act as the session.
        """
        if isinstance(requests_session, requests.Session):
            self._session = requests_session
        elif requests_session:  # Build a new session.
            self._session = requests.Session()
        else:  # Use the Requests API module as a "session".
            from requests import api
            self._session = api

    def _normalize_scope(self, scope):
        """Return `scope` in normalized form (delegates to `normalize_scope`)."""
        return normalize_scope(scope)

    @property
    def client_id(self):
        """The client ID; assigning a falsy value falls back to the environment."""
        return self._client_id

    @client_id.setter
    def client_id(self, val):
        self._client_id = _ensure_value(val, "client_id")

    @property
    def client_secret(self):
        """The client secret; assigning a falsy value falls back to the environment."""
        return self._client_secret

    @client_secret.setter
    def client_secret(self, val):
        self._client_secret = _ensure_value(val, "client_secret")

    @property
    def redirect_uri(self):
        """The redirect URI; assigning a falsy value falls back to the environment."""
        return self._redirect_uri

    @redirect_uri.setter
    def redirect_uri(self, val):
        self._redirect_uri = _ensure_value(val, "redirect_uri")

    @staticmethod
    def _get_user_input(prompt):
        """Prompt the user on the console and return the line they typed."""
        # The old `raw_input` fallback was Python 2 only; on Python 3 it always
        # raised NameError and fell through to `input`.
        return input(prompt)

    @staticmethod
    def is_token_expired(token_info):
        """
        Return True when the token expires in less than 60 seconds (or has
        already expired).

        Parameters:
            - token_info: dict with an "expires_at" entry, compared against
                          the current time in whole seconds since the epoch
        """
        now = int(time.time())
        return token_info["expires_at"] - now < 60

    @staticmethod
    def _is_scope_subset(needle_scope, haystack_scope):
        """
        Return True when every scope in `needle_scope` is also present in
        `haystack_scope`.

        Both arguments are whitespace-separated scope strings; a falsy value
        (None or "") is treated as the empty set of scopes.
        """
        needle_scope = set(needle_scope.split()) if needle_scope else set()
        haystack_scope = (
            set(haystack_scope.split()) if haystack_scope else set()
        )
        return needle_scope <= haystack_scope

    def _handle_oauth_error(self, http_error):
        """
        Translate a failed token request into a `SpotifyOauthError`.

        Parameters:
            - http_error: an exception carrying the failed HTTP response in
                          its `response` attribute

        Always raises `SpotifyOauthError`, chained to `http_error`.
        """
        response = http_error.response
        try:
            error_payload = response.json()
            error = error_payload.get('error')
            error_description = error_payload.get('error_description')
        except ValueError:
            # if the response cannot be decoded into JSON (which raises a ValueError),
            # then try to decode it into text

            # if we receive an empty string (which is falsy), then replace it with `None`
            error = response.text or None
            error_description = None

        raise SpotifyOauthError(
            f'error: {error}, error_description: {error_description}',
            error=error,
            error_description=error_description
        ) from http_error

    def __del__(self):
        """Make sure the connection (pool) gets closed"""
        # `_session` may be missing if __init__ never completed, hence getattr.
        if getattr(self, "_session", None) and isinstance(self._session, REQUESTS_SESSION):
            self._session.close()


class SpotifyClientCredentials(SpotifyAuthBase):
    """
    Implements the Client Credentials Flow for Spotify's OAuth implementation.
    """
    OAUTH_TOKEN_URL = "https://accounts.spotify.com/api/token"

    def __init__(
        self,
        client_id=None,
        client_secret=None,
        proxies=None,
        requests_session=True,
        requests_timeout=None,
        cache_handler=None
    ):
        """
        Creates a Client Credentials Flow Manager.

        The Client Credentials flow is used in server-to-server authentication.
        Only endpoints that do not access user information can be accessed.
        This means that endpoints that require authorization scopes cannot be accessed.
        The advantage, however, of this authorization flow is that it does not require any
        user interaction

        You can either provide a client_id and client_secret to the
        constructor or set SPOTIPY_CLIENT_ID and SPOTIPY_CLIENT_SECRET
        environment variables

        Parameters:
             * client_id: Must be supplied or set as environment variable
             * client_secret: Must be supplied or set as environment variable
             * proxies: Optional, proxy for the requests library to route through
             * requests_session: A Requests session
             * requests_timeout: Optional, tell Requests to stop waiting for a response after
                                 a given number of seconds
             * cache_handler: An instance of the `CacheHandler` class to handle
                              getting and saving cached authorization tokens.
                              Optional, will otherwise use a default
                              `CacheFileHandler()`.

        """

        super().__init__(requests_session)

        self.client_id = client_id
        self.client_secret = client_secret
        self.proxies = proxies
        self.requests_timeout = requests_timeout
        if cache_handler:
            assert isinstance(cache_handler, CacheHandler), \
                "cache_handler must be a subclass of CacheHandler: " + str(type(cache_handler)) \
                + " != " + str(CacheHandler)
            self.cache_handler = cache_handler
        else:
            self.cache_handler = CacheFileHandler()

    def get_access_token(self, as_dict=True, check_cache=True):
        """
        If a valid (non-expired) access token is available from the cache
        handler, returns it. Else fetches a new token, saves it through the
        cache handler and returns it.

            Parameters:
            - as_dict: (deprecated) a boolean indicating if returning the access token
                as a token_info dictionary, otherwise it will be returned
                as a string.
            - check_cache: if false, skip the cache lookup and always request
                a new token.
        """
        if as_dict:
            warnings.warn(
                "You're using 'as_dict = True'. "
                "get_access_token will return the token string directly in future "
                "versions. Please adjust your code accordingly, or use "
                "get_cached_token instead.",
                DeprecationWarning,
                stacklevel=2,
            )

        if check_cache:
            token_info = self.cache_handler.get_cached_token()
            if token_info and not self.is_token_expired(token_info):
                return token_info if as_dict else token_info["access_token"]

        token_info = self._request_access_token()
        token_info = self._add_custom_values_to_token_info(token_info)
        self.cache_handler.save_token_to_cache(token_info)
        return token_info if as_dict else token_info["access_token"]

    def _request_access_token(self):
        """
        Gets client credentials access token

        Returns the decoded JSON body of the token response.
        Raises `SpotifyOauthError` (via `_handle_oauth_error`) when the token
        endpoint answers with an HTTP error status.
        """
        payload = {"grant_type": "client_credentials"}

        headers = _make_authorization_headers(
            self.client_id, self.client_secret
        )

        # NOTE(review): this debug message includes the Authorization header,
        # i.e. the base64-encoded client ID and client secret.
        logger.debug(f"Sending POST request to {self.OAUTH_TOKEN_URL} with Headers: "
                     f"{headers} and Body: {payload}")

        try:
            response = self._session.post(
                self.OAUTH_TOKEN_URL,
                data=payload,
                headers=headers,
                verify=True,
                proxies=self.proxies,
                timeout=self.requests_timeout,
            )
            response.raise_for_status()
            token_info = response.json()
            return token_info
        except requests.exceptions.HTTPError as http_error:
            self._handle_oauth_error(http_error)

    def _add_custom_values_to_token_info(self, token_info):
        """
        Store some values that aren't directly provided by a Web API
        response.

        Adds "expires_at" (current time in seconds plus "expires_in") to
        `token_info` in place and returns the same dict.
        """
        token_info["expires_at"] = int(time.time()) + token_info["expires_in"]
        return token_info


class SpotifyOAuth(SpotifyAuthBase):
    """
    Implements Authorization Code Flow for Spotify's OAuth implementation.
    """
    OAUTH_AUTHORIZE_URL = "https://accounts.spotify.com/authorize"
    OAUTH_TOKEN_URL = "https://accounts.spotify.com/api/token"

    def __init__(
            self,
            client_id=None,
            client_secret=None,
            redirect_uri=None,
            state=None,
            scope=None,
            cache_path=None,
            username=None,
            proxies=None,
            show_dialog=False,
            requests_session=True,
            requests_timeout=None,
            open_browser=True,
            cache_handler=None
    ):
        """
        Creates a SpotifyOAuth object

        Parameters:
             * client_id: Must be supplied or set as environment variable
             * client_secret: Must be supplied or set as environment variable
             * redirect_uri: Must be supplied or set as environment variable
             * state: Optional. When set, it is sent with the authorization
                      request and the state received in the redirect is
                      compared against it; a mismatch raises
                      `SpotifyStateError`.
             * scope: Optional, either a list of scopes or comma separated string of scopes.
                      e.g, "playlist-read-private,playlist-read-collaborative"
             * cache_path: (deprecated) Optional, will otherwise be generated
                           (takes precedence over `username`)
             * username: (deprecated) Optional or set as environment variable
                         (will set `cache_path` to `.cache-{username}`)
             * proxies: Optional, proxy for the requests library to route through
             * show_dialog: Optional, interpreted as boolean
             * requests_session: A Requests session
             * requests_timeout: Optional, tell Requests to stop waiting for a response after
                                 a given number of seconds
             * open_browser: Optional, whether the web browser should be opened to
                             authorize a user
             * cache_handler: An instance of the `CacheHandler` class to handle
                              getting and saving cached authorization tokens.
                              Optional, will otherwise use `CacheFileHandler`.
                              (takes precedence over `cache_path` and `username`)
        """

        super().__init__(requests_session)

        self.client_id = client_id
        self.client_secret = client_secret
        self.redirect_uri = redirect_uri
        self.state = state
        self.scope = self._normalize_scope(scope)
        if username or cache_path:
            warnings.warn("Specifying cache_path or username as arguments to SpotifyOAuth " +
                          "will be deprecated. Instead, please create a CacheFileHandler " +
                          "instance with the desired cache_path and username and pass it " +
                          "to SpotifyOAuth as the cache_handler. For example:\n\n" +
                          "\tfrom spotipy.oauth2 import CacheFileHandler\n" +
                          "\thandler = CacheFileHandler(cache_path=cache_path, " +
                          "username=username)\n" +
                          "\tsp = spotipy.SpotifyOAuth(client_id, client_secret, " +
                          "redirect_uri," +
                          " cache_handler=handler)",
                          DeprecationWarning
                          )
            if cache_handler:
                warnings.warn("A cache_handler has been specified along with a cache_path or " +
                              "username. The cache_path and username arguments will be ignored.")
        if cache_handler:
            assert isinstance(cache_handler, CacheHandler), \
                "cache_handler must be a subclass of CacheHandler: " + str(type(cache_handler)) \
                + " != " + str(CacheHandler)
            self.cache_handler = cache_handler
        else:
            # No handler supplied: fall back to a file cache, taking the
            # username from the environment when it was not passed in.
            username = (username or os.getenv(CLIENT_CREDS_ENV_VARS["client_username"]))
            self.cache_handler = CacheFileHandler(
                username=username,
                cache_path=cache_path
            )
        self.proxies = proxies
        self.requests_timeout = requests_timeout
        self.show_dialog = show_dialog
        self.open_browser = open_browser

    def validate_token(self, token_info):
        """
        Check a cached token against this manager's scope and expiry.

        Parameters:
            - token_info: a token_info dict, or None

        Returns None when `token_info` is None, has no "scope" entry, or does
        not cover every scope this manager requests. Otherwise returns the
        token, refreshed first through `refresh_access_token` if expired.
        """
        if token_info is None:
            return None

        # if scopes don't match, then bail
        if "scope" not in token_info or not self._is_scope_subset(
                self.scope, token_info["scope"]
        ):
            return None

        if self.is_token_expired(token_info):
            token_info = self.refresh_access_token(
                token_info["refresh_token"]
            )

        return token_info

    def get_authorize_url(self, state=None):
        """ Gets the URL to use to authorize this app

            Parameters:
                - state: optional state to send; defaults to the state given
                         to the constructor
        """
        payload = {
            "client_id": self.client_id,
            "response_type": "code",
            "redirect_uri": self.redirect_uri,
        }
        if self.scope:
            payload["scope"] = self.scope
        if state is None:
            state = self.state
        if state is not None:
            payload["state"] = state
        if self.show_dialog:
            payload["show_dialog"] = True

        urlparams = urllibparse.urlencode(payload)

        return f"{self.OAUTH_AUTHORIZE_URL}?{urlparams}"

    def parse_response_code(self, url):
        """ Parse the response code in the given response url

            Parameters:
                - url - the response url

            Returns the "code" query parameter, or `url` unchanged when the
            url carries no "code" parameter.
        """
        _, code = self.parse_auth_response_url(url)
        if code is None:
            return url
        else:
            return code

    @staticmethod
    def parse_auth_response_url(url):
        """ Extract the state and code from a redirect url

            Parameters:
                - url - the url the user was redirected to

            Returns a `(state, code)` tuple; either item is None when the
            matching query parameter is absent.
            Raises `SpotifyOauthError` when the query contains an "error"
            parameter.
        """
        query_s = urlparse(url).query
        form = dict(parse_qsl(query_s))
        if "error" in form:
            raise SpotifyOauthError(f"Received error from auth server: {form['error']}",
                                    error=form["error"])
        return tuple(form.get(param) for param in ["state", "code"])

    def _make_authorization_headers(self):
        """Return the Basic auth header for this manager's client ID and secret."""
        return _make_authorization_headers(self.client_id, self.client_secret)

    def _open_auth_url(self):
        """Open the authorization URL in the web browser, logging the outcome."""
        auth_url = self.get_authorize_url()
        try:
            webbrowser.open(auth_url)
            logger.info(f"Opened {auth_url} in your browser")
        except webbrowser.Error:
            logger.error(f"Please navigate here: {auth_url}")

    def _get_auth_response_interactive(self, open_browser=False):
        """
        Obtain the authorization code by asking the user to paste the URL
        they were redirected to.

        Parameters:
            - open_browser: whether to open the authorization URL in the
                            browser; otherwise the URL is shown in the prompt

        Returns the authorization code parsed from the pasted URL.
        Raises `SpotifyStateError` when a state was configured and the pasted
        URL carries a different one.
        """
        if open_browser:
            self._open_auth_url()
            prompt = "Enter the URL you were redirected to: "
        else:
            url = self.get_authorize_url()
            prompt = (
                f"Go to the following URL: {url}\n"
                "Enter the URL you were redirected to: "
            )
        response = self._get_user_input(prompt)
        # Resolved through `self` so that a subclass override is honored.
        state, code = self.parse_auth_response_url(response)
        if self.state is not None and self.state != state:
            raise SpotifyStateError(self.state, state)
        return code

    def _get_auth_response_local_server(self, redirect_port):
        """
        Obtain the authorization code through a local HTTP server that
        receives the redirect.

        Parameters:
            - redirect_port: port for the local server to listen on

        Returns the authorization code captured by the server.
        Raises the error recorded by the server, `SpotifyStateError` on a
        state mismatch, or `SpotifyOauthError` when no code was received.
        """
        server = start_local_http_server(redirect_port)
        self._open_auth_url()
        server.handle_request()

        if server.error is not None:
            raise server.error
        elif self.state is not None and server.state != self.state:
            raise SpotifyStateError(self.state, server.state)
        elif server.auth_code is not None:
            return server.auth_code
        else:
            raise SpotifyOauthError("Server listening on localhost has not been accessed")

    def get_auth_response(self, open_browser=None):
        """
        Run the user-facing authorization step and return the authorization
        code.

        A local HTTP server is used when the browser may be opened and the
        redirect URI is an http loopback address with a port; otherwise the
        user is asked to paste the redirect URL.

        Parameters:
            - open_browser: overrides the constructor's `open_browser` when
                            not None
        """
        logger.info('User authentication requires interaction with your '
                    'web browser. Once you enter your credentials and '
                    'give authorization, you will be redirected to '
                    'a url.  Paste that url you were directed to to '
                    'complete the authorization.')

        redirect_info = urlparse(self.redirect_uri)
        redirect_host, redirect_port = get_host_port(redirect_info.netloc)

        if redirect_host == 'localhost':
            logger.warning(
                "Using 'localhost' as a redirect URI is being deprecated. "
                "Use a loopback IP address such as 127.0.0.1 "
                "to ensure your app remains functional.")

        if redirect_info.scheme == "http" and redirect_host not in ("127.0.0.1", "localhost"):
            logger.warning(
                "Redirect URIs using HTTP are being deprecated. "
                "To ensure your app remains functional, use HTTPS instead.")

        if open_browser is None:
            open_browser = self.open_browser

        if (
                open_browser
                and redirect_host in ("127.0.0.1", "localhost")
                and redirect_info.scheme == "http"
        ):
            # Only start a local http server if a port is specified
            if redirect_port:
                return self._get_auth_response_local_server(redirect_port)
            else:
                logger.warning(f'Using `{redirect_host}` as redirect URI without a port. '
                               f'Specify a port (e.g. `{redirect_host}:8080`) to allow '
                               'automatic retrieval of authentication code '
                               'instead of having to copy and paste '
                               'the URL your browser is redirected to.')

        return self._get_auth_response_interactive(open_browser=open_browser)

    def get_authorization_code(self, response=None):
        """
        Return the authorization code, parsed from `response` when given,
        otherwise obtained through `get_auth_response`.
        """
        if response:
            return self.parse_response_code(response)
        return self.get_auth_response()

    def get_access_token(self, code=None, as_dict=True, check_cache=True):
        """ Gets the access token for the app given the code

            Parameters:
                - code: the response code
                - as_dict: (deprecated) a boolean indicating if returning the access token
                            as a token_info dictionary, otherwise it will be returned
                            as a string.
                - check_cache: if true, a valid cached token is returned
                            (refreshed if expired) instead of exchanging
                            the code.
        """
        if as_dict:
            warnings.warn(
                "You're using 'as_dict = True'. "
                "get_access_token will return the token string directly in future "
                "versions. Please adjust your code accordingly, or use "
                "get_cached_token instead.",
                DeprecationWarning,
                stacklevel=2,
            )
        if check_cache:
            token_info = self.validate_token(self.cache_handler.get_cached_token())
            if token_info is not None:
                # validate_token() already refreshes an expired token; this
                # second check only fires if the refreshed token is itself
                # within the expiry margin.
                if self.is_token_expired(token_info):
                    token_info = self.refresh_access_token(
                        token_info["refresh_token"]
                    )
                return token_info if as_dict else token_info["access_token"]

        payload = {
            "redirect_uri": self.redirect_uri,
            "code": code or self.get_auth_response(),
            "grant_type": "authorization_code",
        }
        if self.scope:
            payload["scope"] = self.scope
        if self.state:
            payload["state"] = self.state

        headers = self._make_authorization_headers()

        # NOTE(review): this debug message includes the Authorization header
        # and the authorization code.
        logger.debug(f"Sending POST request to {self.OAUTH_TOKEN_URL} with Headers: "
                     f"{headers} and Body: {payload}")

        try:
            response = self._session.post(
                self.OAUTH_TOKEN_URL,
                data=payload,
                headers=headers,
                verify=True,
                proxies=self.proxies,
                timeout=self.requests_timeout,
            )
            response.raise_for_status()
            token_info = response.json()
            token_info = self._add_custom_values_to_token_info(token_info)
            self.cache_handler.save_token_to_cache(token_info)
            return token_info if as_dict else token_info["access_token"]
        except requests.exceptions.HTTPError as http_error:
            self._handle_oauth_error(http_error)

    def refresh_access_token(self, refresh_token):
        """ Exchange a refresh token for a new access token

            Parameters:
                - refresh_token: the refresh token to present

            Returns the new token_info dict, which is also saved through the
            cache handler. When the response omits a refresh token, the one
            passed in is kept.
            Raises `SpotifyOauthError` (via `_handle_oauth_error`) on an HTTP
            error status.
        """
        payload = {
            "refresh_token": refresh_token,
            "grant_type": "refresh_token",
        }

        headers = self._make_authorization_headers()

        # NOTE(review): this debug message includes the Authorization header
        # and the refresh token.
        logger.debug(f"Sending POST request to {self.OAUTH_TOKEN_URL} with Headers: "
                     f"{headers} and Body: {payload}")

        try:
            response = self._session.post(
                self.OAUTH_TOKEN_URL,
                data=payload,
                headers=headers,
                proxies=self.proxies,
                timeout=self.requests_timeout,
            )
            response.raise_for_status()
            token_info = response.json()
            token_info = self._add_custom_values_to_token_info(token_info)
            if "refresh_token" not in token_info:
                token_info["refresh_token"] = refresh_token
            self.cache_handler.save_token_to_cache(token_info)
            return token_info
        except requests.exceptions.HTTPError as http_error:
            self._handle_oauth_error(http_error)

    def _add_custom_values_to_token_info(self, token_info):
        """
        Store some values that aren't directly provided by a Web API
        response.

        Sets "expires_at" (current time in seconds plus "expires_in") and
        overwrites "scope" with this manager's scope, in place; returns the
        same dict.
        """
        token_info["expires_at"] = int(time.time()) + token_info["expires_in"]
        token_info["scope"] = self.scope
        return token_info

    def get_cached_token(self):
        """ Gets the cached token for the app

            .. deprecated::
            This method is deprecated and may be removed in a future version.
        """
        warnings.warn("Calling get_cached_token directly on the SpotifyOAuth object will be " +
                      "deprecated. Instead, please specify a CacheFileHandler instance as " +
                      "the cache_handler in SpotifyOAuth and use the CacheFileHandler's " +
                      "get_cached_token method. You can replace:\n\tsp.get_cached_token()" +
                      "\n\nWith:\n\tsp.validate_token(sp.cache_handler.get_cached_token())",
                      DeprecationWarning
                      )
        return self.validate_token(self.cache_handler.get_cached_token())

    def _save_token_info(self, token_info):
        """ Saves `token_info` through the cache handler

            .. deprecated::
            Use the cache handler's `save_token_to_cache` method instead.
        """
        warnings.warn("Calling _save_token_info directly on the SpotifyOAuth object will be " +
                      "deprecated. Instead, please specify a CacheFileHandler instance as " +
                      "the cache_handler in SpotifyOAuth and use the CacheFileHandler's " +
                      "save_token_to_cache method.",
                      DeprecationWarning
                      )
        self.cache_handler.save_token_to_cache(token_info)
        return None


class SpotifyPKCE(SpotifyAuthBase):
    """ Implements PKCE Authorization Flow for client apps

    This auth manager enables *user and non-user* endpoints with only
    a client ID, redirect URI, and username. When the app requests
    an access token for the first time, the user is prompted to
    authorize the new client app. After authorizing the app, the client
    app is then given both access and refresh tokens. This is the
    preferred way of authorizing a mobile/desktop client.

    """

    OAUTH_AUTHORIZE_URL = "https://accounts.spotify.com/authorize"
    OAUTH_TOKEN_URL = "https://accounts.spotify.com/api/token"

    def __init__(self,
                 client_id=None,
                 redirect_uri=None,
                 state=None,
                 scope=None,
                 cache_path=None,
                 username=None,
                 proxies=None,
                 requests_timeout=None,
                 requests_session=True,
                 open_browser=True,
                 cache_handler=None):
        """
        Creates Auth Manager with the PKCE Auth flow.

        Parameters:
             * client_id: Must be supplied or set as environment variable
             * redirect_uri: Must be supplied or set as environment variable
             * state: Optional. When set, it is sent with the authorization
                      request and the state received in the redirect is
                      compared against it; a mismatch raises
                      `SpotifyStateError`.
             * scope: Optional, either a list of scopes or comma separated string of scopes.
                      e.g, "playlist-read-private,playlist-read-collaborative"
             * cache_path: (deprecated) Optional, will otherwise be generated
                           (takes precedence over `username`)
             * username: (deprecated) Optional or set as environment variable
                         (will set `cache_path` to `.cache-{username}`)
             * proxies: Optional, proxy for the requests library to route through
             * requests_timeout: Optional, tell Requests to stop waiting for a response after
                                 a given number of seconds
             * requests_session: A Requests session
             * open_browser: Optional, whether the web browser should be opened to
                             authorize a user
             * cache_handler: An instance of the `CacheHandler` class to handle
                              getting and saving cached authorization tokens.
                              Optional, will otherwise use `CacheFileHandler`.
                              (takes precedence over `cache_path` and `username`)
        """

        super().__init__(requests_session)
        self.client_id = client_id
        self.redirect_uri = redirect_uri
        self.state = state
        self.scope = self._normalize_scope(scope)
        if username or cache_path:
            # The example in this message shows SpotifyPKCE and its own
            # arguments (this flow takes no client secret).
            warnings.warn("Specifying cache_path or username as arguments to SpotifyPKCE " +
                          "will be deprecated. Instead, please create a CacheFileHandler " +
                          "instance with the desired cache_path and username and pass it " +
                          "to SpotifyPKCE as the cache_handler. For example:\n\n" +
                          "\tfrom spotipy.oauth2 import CacheFileHandler\n" +
                          "\thandler = CacheFileHandler(cache_path=cache_path, " +
                          "username=username)\n" +
                          "\tsp = spotipy.SpotifyPKCE(client_id, " +
                          "redirect_uri, cache_handler=handler)",
                          DeprecationWarning
                          )
            if cache_handler:
                warnings.warn("A cache_handler has been specified along with a cache_path or " +
                              "username. The cache_path and username arguments will be ignored.")
        if cache_handler:
            assert issubclass(type(cache_handler), CacheHandler), \
                "type(cache_handler): " + str(type(cache_handler)) + " != " + str(CacheHandler)
            self.cache_handler = cache_handler
        else:
            # No handler supplied: fall back to a file cache, taking the
            # username from the environment when it was not passed in.
            username = (username or os.getenv(CLIENT_CREDS_ENV_VARS["client_username"]))
            self.cache_handler = CacheFileHandler(
                username=username,
                cache_path=cache_path
            )
        self.proxies = proxies
        self.requests_timeout = requests_timeout

        self._code_challenge_method = "S256"  # Spotify requires SHA256
        # PKCE handshake values start unset.
        self.code_verifier = None
        self.code_challenge = None
        self.authorization_code = None
        self.open_browser = open_browser

    def _get_code_verifier(self):
        """ Spotify PCKE code verifier - See step 1 of the reference guide below
        Reference:
        https://developer.spotify.com/documentation/general/guides/authorization-guide/#authorization-code-flow-with-proof-key-for-code-exchange-pkce
        """
        # Range (33,96) is used to select between 44-128 base64 characters for the
        # next operation. The range looks weird because base64 is 6 bytes
        import random
        length = random.randint(33, 96)

        # The seeded length generates between a 44 and 128 base64 characters encoded string
        import secrets
        return secrets.token_urlsafe(length)

    def _get_code_challenge(self):
        """ Spotify PCKE code challenge - See step 1 of the reference guide below
        Reference:
        https://developer.spotify.com/documentation/general/guides/authorization-guide/#authorization-code-flow-with-proof-key-for-code-exchange-pkce
        """
        import base64
        import hashlib
        code_challenge_digest = hashlib.sha256(self.code_verifier.encode('utf-8')).digest()
        code_challenge = base64.urlsafe_b64encode(code_challenge_digest).decode('utf-8')
        return code_challenge.replace('=', '')

    def get_authorize_url(self, state=None):
        """ Gets the URL to use to authorize this app """
        if not self.code_challenge:
            self.get_pkce_handshake_parameters()
        payload = {
            "client_id": self.client_id,
            "response_type": "code",
            "redirect_uri": self.redirect_uri,
            "code_challenge_method": self._code_challenge_method,
            "code_challenge": self.code_challenge
        }
        if self.scope:
            payload["scope"] = self.scope
        if state is None:
            state = self.state
        if state is not None:
            payload["state"] = state
        urlparams = urllibparse.urlencode(payload)
        return f"{self.OAUTH_AUTHORIZE_URL}?{urlparams}"

    def _open_auth_url(self, state=None):
        """
        Open the authorization URL in the web browser, logging the outcome.

        Parameters:
            - state: optional state forwarded to `get_authorize_url`
        """
        target = self.get_authorize_url(state)
        try:
            webbrowser.open(target)
        except webbrowser.Error:
            # Could not launch a browser: leave the URL in the log instead.
            logger.error(f"Please navigate here: {target}")
        else:
            logger.info(f"Opened {target} in your browser")

    def _get_auth_response(self, open_browser=None):
        """
        Run the user-facing authorization step and return the authorization
        code.

        A local HTTP server is used when the browser may be opened and the
        redirect URI is an http loopback address with a port; otherwise the
        user is asked to paste the redirect URL.

        Parameters:
            - open_browser: overrides the constructor's `open_browser` when
                            not None
        """
        logger.info('User authentication requires interaction with your '
                    'web browser. Once you enter your credentials and '
                    'give authorization, you will be redirected to '
                    'a url.  Paste that url you were directed to to '
                    'complete the authorization.')

        parsed_redirect = urlparse(self.redirect_uri)
        host, port = get_host_port(parsed_redirect.netloc)
        is_http = parsed_redirect.scheme == "http"
        is_loopback = host in ("127.0.0.1", "localhost")

        if open_browser is None:
            open_browser = self.open_browser

        if host == 'localhost':
            logger.warning(
                "Using 'localhost' as a redirect URI is being deprecated. "
                "Use a loopback IP address such as 127.0.0.1 "
                "to ensure your app remains functional.")

        if is_http and not is_loopback:
            logger.warning(
                "Redirect URIs using HTTP are being deprecated. "
                "To ensure your app remains functional, use HTTPS instead.")

        if open_browser and is_loopback and is_http:
            # Only start a local http server if a port is specified
            if redirect_port := port:
                return self._get_auth_response_local_server(redirect_port)
            logger.warning(f'Using `{host}` as redirect URI without a port. '
                           f'Specify a port (e.g. `{host}:8080`) to allow '
                           'automatic retrieval of authentication code '
                           'instead of having to copy and paste '
                           'the URL your browser is redirected to.')

        # Fallback: have the user paste the redirect URL by hand.
        return self._get_auth_response_interactive(open_browser=open_browser)

    def _get_auth_response_local_server(self, redirect_port):
        """ Capture the authorization redirect with a temporary local server

            Starts a local HTTP server on redirect_port, opens the
            authorization URL and handles exactly one request.

            Parameters:
                - redirect_port - port the local server listens on

            Returns the authorization code recorded by the server.

            Raises SpotifyOauthError if the server recorded an error or no
            authorization code, and SpotifyStateError if the returned state
            does not match self.state.
        """
        server = start_local_http_server(redirect_port)
        try:
            self._open_auth_url()
            server.handle_request()
        finally:
            # Always release the listening socket, even when opening the
            # browser or handling the request fails.
            server.server_close()

        # Report an OAuth error before looking at the state: the request
        # handler does not record a state when parsing the redirect failed,
        # so checking the state first would hide the real error.
        if server.error is not None:
            raise SpotifyOauthError(f"Received error from OAuth server: {server.error}")

        # The attribute is missing when no GET request was processed.
        received_state = getattr(server, "state", None)
        if self.state is not None and received_state != self.state:
            raise SpotifyStateError(self.state, received_state)

        if server.auth_code is not None:
            return server.auth_code
        raise SpotifyOauthError("Server listening on localhost has not been accessed")

    def _get_auth_response_interactive(self, open_browser=False):
        if open_browser or self.open_browser:
            self._open_auth_url()
            prompt = "Enter the URL you were redirected to: "
        else:
            url = self.get_authorize_url()
            prompt = (f"Go to the following URL: {url}\n"
                      f"Enter the URL you were redirected to: ")
        response = self._get_user_input(prompt)
        state, code = self.parse_auth_response_url(response)
        if self.state is not None and self.state != state:
            raise SpotifyStateError(self.state, state)
        return code

    def get_authorization_code(self, response=None):
        """ Gets the authorization code

            Parameters:
                - response - an already obtained redirect url; when it is
                             empty or omitted the user is taken through
                             the interactive authorization instead
        """
        if not response:
            return self._get_auth_response()
        return self.parse_response_code(response)

    def validate_token(self, token_info):
        """ Check a token against the requested scope and refresh it if needed

            Parameters:
                - token_info - token dictionary, or None

            Returns None when there is no token, when it carries no scope
            or when the requested scope is not a subset of the token's
            scope. An expired token is replaced by the result of
            refresh_access_token; otherwise token_info is returned as is.
        """
        if token_info is None or "scope" not in token_info:
            return None

        # if scopes don't match, then bail
        if not self._is_scope_subset(self.scope, token_info["scope"]):
            return None

        if not self.is_token_expired(token_info):
            return token_info
        return self.refresh_access_token(token_info["refresh_token"])

    def _add_custom_values_to_token_info(self, token_info):
        """
        Store some values that aren't directly provided by a Web API
        response.
        """
        token_info["expires_at"] = int(time.time()) + token_info["expires_in"]
        return token_info

    def get_pkce_handshake_parameters(self):
        """ Create and store a new code verifier and code challenge

            The verifier is stored on the instance before the challenge
            is requested.
        """
        verifier = self._get_code_verifier()
        self.code_verifier = verifier
        challenge = self._get_code_challenge()
        self.code_challenge = challenge

    def get_access_token(self, code=None, check_cache=True):
        """ Gets the access token for the app

            If the code is not given and no cached token is used, an
            authentication window will be shown to the user to get a new
            code.

            Parameters:
                - code - the response code from authentication
                - check_cache - if true, checks for a locally stored token
                                before requesting a new token

            Returns the access token string. A newly obtained token is
            saved through the cache handler. HTTP errors from the token
            endpoint are passed to self._handle_oauth_error.
        """

        if check_cache:
            token_info = self.validate_token(self.cache_handler.get_cached_token())
            if token_info is not None:
                if self.is_token_expired(token_info):
                    token_info = self.refresh_access_token(
                        token_info["refresh_token"]
                    )
                return token_info["access_token"]

        if self.code_verifier is None or self.code_challenge is None:
            self.get_pkce_handshake_parameters()

        payload = {
            "client_id": self.client_id,
            "grant_type": "authorization_code",
            "code": code or self.get_authorization_code(),
            "redirect_uri": self.redirect_uri,
            "code_verifier": self.code_verifier
        }

        headers = {"Content-Type": "application/x-www-form-urlencoded"}

        # The authorization code and the PKCE verifier are credentials:
        # keep them out of the log output.
        secret_keys = ("code", "code_verifier")
        loggable_payload = {
            key: "<redacted>" if key in secret_keys else value
            for key, value in payload.items()
        }
        logger.debug(f"Sending POST request to {self.OAUTH_TOKEN_URL} with Headers: "
                     f"{headers} and Body: {loggable_payload}")

        try:
            response = self._session.post(
                self.OAUTH_TOKEN_URL,
                data=payload,
                headers=headers,
                verify=True,
                proxies=self.proxies,
                timeout=self.requests_timeout,
            )
            response.raise_for_status()
            token_info = response.json()
            token_info = self._add_custom_values_to_token_info(token_info)
            self.cache_handler.save_token_to_cache(token_info)
            return token_info["access_token"]
        except requests.exceptions.HTTPError as http_error:
            self._handle_oauth_error(http_error)

    def refresh_access_token(self, refresh_token):
        """ Exchange a refresh token for a new access token

            Parameters:
                - refresh_token - the refresh token of the current token

            Returns the new token info dictionary, which is also saved
            through the cache handler. When the response contains no
            refresh token, the one passed in is kept. HTTP errors from the
            token endpoint are passed to self._handle_oauth_error.
        """
        payload = {
            "refresh_token": refresh_token,
            "grant_type": "refresh_token",
            "client_id": self.client_id,
        }

        headers = {"Content-Type": "application/x-www-form-urlencoded"}

        # The refresh token is a long-lived credential: keep it out of the
        # log output.
        loggable_payload = {
            key: "<redacted>" if key == "refresh_token" else value
            for key, value in payload.items()
        }
        logger.debug(f"Sending POST request to {self.OAUTH_TOKEN_URL} with Headers: "
                     f"{headers} and Body: {loggable_payload}")

        try:
            response = self._session.post(
                self.OAUTH_TOKEN_URL,
                data=payload,
                headers=headers,
                proxies=self.proxies,
                timeout=self.requests_timeout,
            )
            response.raise_for_status()
            token_info = response.json()
            token_info = self._add_custom_values_to_token_info(token_info)
            if "refresh_token" not in token_info:
                token_info["refresh_token"] = refresh_token
            self.cache_handler.save_token_to_cache(token_info)
            return token_info
        except requests.exceptions.HTTPError as http_error:
            self._handle_oauth_error(http_error)

    def parse_response_code(self, url):
        """ Parse the response code in the given response url

            Parameters:
                - url - the response url

            Returns the code found in the url, or the url itself when
            no code could be parsed from it.
        """
        _state, code = self.parse_auth_response_url(url)
        return url if code is None else code

    @staticmethod
    def parse_auth_response_url(url):
        """ Parse a redirect url via SpotifyOAuth.parse_auth_response_url

            Parameters:
                - url - the url the user was redirected to
        """
        parsed = SpotifyOAuth.parse_auth_response_url(url)
        return parsed

    def get_cached_token(self):
        """ Gets the validated cached token for the app

            Emits a DeprecationWarning, then returns the result of passing
            the cache handler's token through validate_token.
        """
        warnings.warn("Calling get_cached_token directly on the SpotifyPKCE object will be "
                      "deprecated. Instead, please specify a CacheFileHandler instance as "
                      "the cache_handler in SpotifyPKCE and use the CacheFileHandler's "
                      "get_cached_token method. You can replace:\n\tsp.get_cached_token()"
                      "\n\nWith:\n\tsp.validate_token(sp.cache_handler.get_cached_token())",
                      DeprecationWarning,
                      # Attribute the warning to the caller, not to this method.
                      stacklevel=2
                      )
        return self.validate_token(self.cache_handler.get_cached_token())

    def _save_token_info(self, token_info):
        warnings.warn("Calling _save_token_info directly on the SpotifyOAuth object will be " +
                      "deprecated. Instead, please specify a CacheFileHandler instance as " +
                      "the cache_handler in SpotifyOAuth and use the CacheFileHandler's " +
                      "save_token_to_cache method.",
                      DeprecationWarning
                      )
        self.cache_handler.save_token_to_cache(token_info)
        return None


class SpotifyImplicitGrant(SpotifyAuthBase):
    """ Implements Implicit Grant Flow for client apps

    This auth manager enables *user and non-user* endpoints with only
    a client id, redirect uri, and username. The user will need to
    copy and paste a URI from the browser every hour.

    Security Warning
    -----------------
    The OAuth standard no longer recommends the Implicit Grant Flow for
    client-side code. Spotify has implemented the OAuth-suggested PKCE
    extension that removes the need for a client secret in the
    Authentication Code flow. Use the SpotifyPKCE auth manager instead
    of SpotifyImplicitGrant.

    SpotifyPKCE contains all the functionality of
    SpotifyImplicitGrant, plus automatic response retrieval and
    refreshable tokens. Only a few replacements need to be made:

    * get_auth_response()['access_token'] ->
      get_access_token(get_authorization_code())
    * get_auth_response() ->
      get_access_token(get_authorization_code()); get_cached_token()
    * parse_response_token(url)['access_token'] ->
      get_access_token(parse_response_code(url))
    * parse_response_token(url) ->
      get_access_token(parse_response_code(url)); get_cached_token()

    The security concern in the Implicit Grant flow is that the token is
    returned in the URL and can be intercepted through the browser. A
    request with an authorization code and proof of origin could not be
    easily intercepted without a compromised network.
    """
    OAUTH_AUTHORIZE_URL = "https://accounts.spotify.com/authorize"

    def __init__(self,
                 client_id=None,
                 redirect_uri=None,
                 state=None,
                 scope=None,
                 cache_path=None,
                 username=None,
                 show_dialog=False,
                 cache_handler=None):
        """ Creates Auth Manager using the Implicit Grant flow

        **See help(SpotifyImplicitGrant) for full Security Warning**

        Parameters
        ----------
        * client_id: Must be supplied or set as environment variable
        * redirect_uri: Must be supplied or set as environment variable
        * state: May be supplied; when set, the state returned in the
                 redirect URL must match it
        * scope: Optional, either a list of scopes or comma separated string of scopes.
                 e.g, "playlist-read-private,playlist-read-collaborative"
        * cache_handler: An instance of the `CacheHandler` class to handle
                              getting and saving cached authorization tokens.
                              May be supplied, will otherwise use `CacheFileHandler`.
                              (takes precedence over `cache_path` and `username`)
        * cache_path: (deprecated) May be supplied, will otherwise be generated
                      (takes precedence over `username`)
        * username: (deprecated) May be supplied or set as environment variable
                    (will set `cache_path` to `.cache-{username}`)
        * show_dialog: Interpreted as boolean
        """
        logger.warning("Spotify is deprecating the Implicit "
                       "Grant Flow for client-side code. Use the SpotifyPKCE "
                       "auth manager instead of SpotifyImplicitGrant. For "
                       "more details and a guide to switching, see "
                       "help(SpotifyImplicitGrant).")

        self.client_id = client_id
        self.redirect_uri = redirect_uri
        self.state = state
        if username or cache_path:
            # The example only uses parameters this constructor accepts
            # (there is no client_secret in the Implicit Grant flow).
            warnings.warn("Specifying cache_path or username as arguments to "
                          "SpotifyImplicitGrant will be deprecated. Instead, please create "
                          "a CacheFileHandler instance with the desired cache_path and "
                          "username and pass it to SpotifyImplicitGrant as the "
                          "cache_handler. For example:\n\n"
                          "\tfrom spotipy.oauth2 import CacheFileHandler\n"
                          "\thandler = CacheFileHandler(cache_path=cache_path, "
                          "username=username)\n"
                          "\tsp = spotipy.SpotifyImplicitGrant(client_id, "
                          "redirect_uri, cache_handler=handler)",
                          DeprecationWarning,
                          stacklevel=2
                          )
            if cache_handler:
                warnings.warn("A cache_handler has been specified along with a cache_path or "
                              "username. The cache_path and username arguments will be ignored.",
                              stacklevel=2)
        if cache_handler:
            assert issubclass(type(cache_handler), CacheHandler), \
                "type(cache_handler): " + str(type(cache_handler)) + " != " + str(CacheHandler)
            self.cache_handler = cache_handler
        else:
            username = (username or os.getenv(CLIENT_CREDS_ENV_VARS["client_username"]))
            self.cache_handler = CacheFileHandler(
                username=username,
                cache_path=cache_path
            )
        self.scope = self._normalize_scope(scope)
        self.show_dialog = show_dialog
        self._session = None  # As to not break inherited __del__

    def validate_token(self, token_info):
        """ Return token_info if it is usable, otherwise None

        A token is rejected when it is None, carries no scope, does not
        cover the requested scope, or has expired (implicit grant tokens
        cannot be refreshed).
        """
        if token_info is None:
            return None

        # if scopes don't match, then bail
        if "scope" not in token_info or not self._is_scope_subset(
                self.scope, token_info["scope"]
        ):
            return None

        if self.is_token_expired(token_info):
            return None

        return token_info

    def get_access_token(self,
                         state=None,
                         response=None,
                         check_cache=True):
        """ Gets Auth Token from cache (preferred) or user interaction

        Parameters
        ----------
        * state: May be given, overrides (without changing) self.state
        * response: URI with token, can break expiration checks
        * check_cache: Interpreted as boolean
        """
        if check_cache:
            token_info = self.validate_token(self.cache_handler.get_cached_token())
            if not (token_info is None or self.is_token_expired(token_info)):
                return token_info["access_token"]

        if response:
            token_info = self.parse_response_token(response)
        else:
            token_info = self.get_auth_response(state)
        token_info = self._add_custom_values_to_token_info(token_info)
        self.cache_handler.save_token_to_cache(token_info)

        return token_info["access_token"]

    def get_authorize_url(self, state=None):
        """ Gets the URL to use to authorize this app """
        payload = {
            "client_id": self.client_id,
            "response_type": "token",
            "redirect_uri": self.redirect_uri,
        }
        if self.scope:
            payload["scope"] = self.scope
        if state is None:
            state = self.state
        if state is not None:
            payload["state"] = state
        if self.show_dialog:
            payload["show_dialog"] = True

        urlparams = urllibparse.urlencode(payload)

        return f"{self.OAUTH_AUTHORIZE_URL}?{urlparams}"

    def parse_response_token(self, url, state=None):
        """ Parse the access token information in the given response url

        Parameters
        ----------
        * url: the URL the user was redirected to
        * state: expected state, defaults to self.state

        Raises SpotifyStateError when an expected state is set and the
        state found in the URL differs from it.
        """
        remote_state, token, t_type, exp_in = self.parse_auth_response_url(url)
        if state is None:
            state = self.state
        if state is not None and remote_state != state:
            raise SpotifyStateError(state, remote_state)
        return {"access_token": token, "token_type": t_type,
                "expires_in": exp_in, "state": state}

    @staticmethod
    def parse_auth_response_url(url):
        """ Extract (state, access_token, token_type, expires_in) from a url

        The parameters are read from the URL fragment, falling back to
        the query string and finally to the raw input. Missing parameters
        are returned as None; expires_in is converted to an int.

        Raises SpotifyOauthError when the response contains an error.
        """
        url_components = urlparse(url)
        params = url_components.fragment or url_components.query or url
        # parse_qsl splits each pair on the first '=' only and decodes
        # percent-encoding, so values containing '=' or encoded characters
        # (e.g. a state with spaces) are handled correctly. Blank values
        # are kept so an empty parameter is still reported as present.
        form = dict(parse_qsl(params, keep_blank_values=True))
        if "error" in form:
            # An error response is not guaranteed to carry a state.
            raise SpotifyOauthError(f"Received error from auth server: {form['error']}",
                                    state=form.get("state"))
        if "expires_in" in form:
            form["expires_in"] = int(form["expires_in"])
        return tuple(form.get(param) for param in ["state", "access_token",
                                                   "token_type", "expires_in"])

    def _open_auth_url(self, state=None):
        """ Open the authorization URL in the user's web browser

        If no browser could be launched, the URL is logged at error level
        so the user can still navigate to it manually.
        """
        auth_url = self.get_authorize_url(state)
        try:
            # webbrowser.open() signals most failures by returning False
            # instead of raising, so its return value has to be checked.
            opened = webbrowser.open(auth_url)
        except webbrowser.Error:
            opened = False

        if opened:
            logger.info(f"Opened {auth_url} in your browser")
        else:
            logger.error(f"Please navigate here: {auth_url}")

    def get_auth_response(self, state=None):
        """ Gets a new auth **token** with user interaction """
        logger.info('User authentication requires interaction with your '
                    'web browser. Once you enter your credentials and '
                    'give authorization, you will be redirected to '
                    'a url.  Paste that url you were directed to to '
                    'complete the authorization.')

        redirect_info = urlparse(self.redirect_uri)
        redirect_host, redirect_port = get_host_port(redirect_info.netloc)
        # Implicit Grant tokens are returned in a hash fragment
        # which is only available to the browser. Therefore, interactive
        # URL retrieval is required.
        if (redirect_host in ("127.0.0.1", "localhost")
                and redirect_info.scheme == "http" and redirect_port):
            logger.warning('Using a local redirect URI with a '
                           'port, likely expecting automatic '
                           'retrieval. Due to technical limitations, '
                           'the authentication token cannot be '
                           'automatically retrieved and must be '
                           'copied and pasted.')

        self._open_auth_url(state)
        logger.info('Paste that url you were directed to in order to '
                    'complete the authorization')
        response = SpotifyImplicitGrant._get_user_input("Enter the URL you "
                                                        "were redirected to: ")
        return self.parse_response_token(response, state)

    def _add_custom_values_to_token_info(self, token_info):
        """
        Store some values that aren't directly provided by a Web API
        response.
        """
        token_info["expires_at"] = int(time.time()) + token_info["expires_in"]
        token_info["scope"] = self.scope
        return token_info

    def get_cached_token(self):
        """ Gets the cached token for the app

            .. deprecated::
            This method is deprecated and may be removed in a future version.
        """
        warnings.warn("Calling get_cached_token directly on the SpotifyImplicitGrant "
                      "object will be deprecated. Instead, please specify a "
                      "CacheFileHandler instance as the cache_handler in SpotifyImplicitGrant "
                      "and use the CacheFileHandler's get_cached_token method. "
                      "You can replace:\n\tsp.get_cached_token()"
                      "\n\nWith:\n\tsp.validate_token(sp.cache_handler.get_cached_token())",
                      DeprecationWarning,
                      stacklevel=2
                      )
        return self.validate_token(self.cache_handler.get_cached_token())

    def _save_token_info(self, token_info):
        """ Save token_info through the cache handler (deprecated) """
        warnings.warn("Calling _save_token_info directly on the SpotifyImplicitGrant "
                      "object will be deprecated. Instead, please specify a "
                      "CacheFileHandler instance as the cache_handler in SpotifyImplicitGrant "
                      "and use the CacheFileHandler's save_token_to_cache method.",
                      DeprecationWarning,
                      stacklevel=2
                      )
        self.cache_handler.save_token_to_cache(token_info)
        return None


class RequestHandler(BaseHTTPRequestHandler):
    """ Handles the redirect request sent to the local HTTP server

    The outcome of parsing the request path is stored on the server
    object (auth_code, state, error) and a small HTML page reporting the
    authentication status is sent back.
    """

    def do_GET(self):
        """ Parse the request path and reply with a status page """
        server = self.server
        server.auth_code = None
        server.error = None
        try:
            parsed = SpotifyOAuth.parse_auth_response_url(self.path)
        except SpotifyOauthError as error:
            server.error = error
        else:
            # Only a successfully parsed request records a state.
            server.state, server.auth_code = parsed

        self.send_response(200)
        self.send_header("Content-Type", "text/html")
        self.end_headers()

        if not server.auth_code and not server.error:
            self._write("<html><body><h1>Invalid request</h1></body></html>")
            return

        if server.auth_code:
            status = "successful"
        else:
            # The error text is escaped before being embedded in the page.
            status = f"failed ({html.escape(str(server.error))})"

        self._write(f"""<html>
<script>
window.close()
</script>
<body>
<h1>Authentication status: {status}</h1>
This window can be closed.
<script>
window.close()
</script>
<button class="closeButton" style="cursor: pointer" onclick="window.close();">
Close Window
</button>
</body>
</html>""")

    def _write(self, text):
        """ Send text to the client encoded as UTF-8 """
        encoded = text.encode("utf-8")
        return self.wfile.write(encoded)

    def log_message(self, format, *args):
        """ Suppress the default request logging """
        return None


def start_local_http_server(port, handler=RequestHandler):
    """ Create the local HTTP server that receives the OAuth redirect

        The server is bound to 127.0.0.1 on the given port. Its
        auth_code, auth_token_form and error attributes start as None.

        Parameters:
            - port - port to listen on
            - handler - request handler class, RequestHandler by default
    """
    bind_address = ("127.0.0.1", port)
    httpd = HTTPServer(bind_address, handler)
    # NOTE(review): this is assigned after the constructor has already run;
    # confirm it still has the intended effect on socket binding.
    httpd.allow_reuse_address = True
    for attribute in ("auth_code", "auth_token_form", "error"):
        setattr(httpd, attribute, None)
    return httpd
