diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml new file mode 100644 index 0000000..296555a --- /dev/null +++ b/.github/workflows/test.yml @@ -0,0 +1,28 @@ +name: Run tests + +on: [push, pull_request] + +jobs: + tests: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + - name: Set up Python + uses: actions/setup-python@v1 + with: + python-version: '3.x' + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install -r requirements.txt \ + && pip install pytest pytest-coverage + - name: Run tests + run: | + pytest --cov-report xml --cov-report term:skip-covered --cov iap_auth/ tests/ + - name: "Upload coverage to Codecov" + uses: codecov/codecov-action@v1 + with: + files: ./coverage.xml + name: codecov-umbrella + fail_ci_if_error: true + verbose: true diff --git a/README.md b/README.md index 4d720ff..ab3d2a9 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,7 @@ # Google IAP authentication ![Upload Python Package](https://github.com/lmtani/iap-auth/workflows/Upload%20Python%20Package/badge.svg?branch=master) +[![codecov](https://codecov.io/gh/lmtani/iap-auth/branch/master/graph/badge.svg)](https://codecov.io/gh/lmtani/iap-auth) + This module contains a helper class to make requests to an app running behind a Google Identity-Aware Proxy. The code was obtained from the Google [Programmatic authentication](https://cloud.google.com/iap/docs/authentication-howto#iap_make_request-python) document. @@ -11,20 +13,41 @@ pip install iap-auth ### Usage +#### With application default credentials + +If running outside Google Cloud Platform you need to specify env var GOOGLE_APPLICATION_CREDENTIALS to point to your authorized service account. + ```python from iap_auth import IapClient -IAM_SCOPE = 'https://www.googleapis.com/auth/iam' -OAUTH_TOKEN_URI = 'https://www.googleapis.com/oauth2/v4/token' CLIENT_ID = '.apps.googleusercontent.com' URL = 'https://your-iap-protected-website.com.br' METHOD = 'GET' kwargs = {} -cli = IapClient(OAUTH_TOKEN_URI, IAM_SCOPE) -resp = cli.make_iap_request(URL, CLIENT_ID, method=METHOD, **kwargs) +client = IapClient(CLIENT_ID) +resp = client.make_iap_request(URL, method=METHOD, **kwargs) # resp is a requests.Response object. ``` -> If running outside Google Cloud Platform you need to specify env var GOOGLE_APPLICATION_CREDENTIALS to point to your authorized service account. +#### Authenticating a user account + +This way users do not need to have a service account or Google SDK installed. You'll need to [create an OAuth 2.0 client ID](https://cloud.google.com/iap/docs/authentication-howto#authenticating_from_a_desktop_app) and then use this lib as follows: + +```python +from iap_auth.user_client import UserAuth, UserIapClient + +OAUTH_ID = ".googleusercontent.com" +OAUTH_SECRET = "z6..desktop-app-oauth-secret..Ys1" +KEY_PATH = "/where/to/store/your/user-credentials.json" +IAP_OAUTH_ID = '.apps.googleusercontent.com' + +URL = 'https://your-iap-protected-website.com.br' + +user_auth = UserAuth(OAUTH_ID, OAUTH_SECRET, KEY_PATH) +client = UserIapClient(user_auth, IAP_OAUTH_ID) +resp = client.make_iap_request(URL, method=METHOD) + +# resp is a requests.Response object. +``` diff --git a/iap_auth/__init__.py b/iap_auth/__init__.py index 5a8ba89..ac84af9 100644 --- a/iap_auth/__init__.py +++ b/iap_auth/__init__.py @@ -1,3 +1,4 @@ from .iap_client import IapClient +from .user_client import UserIapClient __version__ = "0.1.5" diff --git a/iap_auth/iap_client.py b/iap_auth/iap_client.py index 885205e..5bbb043 100644 --- a/iap_auth/iap_client.py +++ b/iap_auth/iap_client.py @@ -1,83 +1,35 @@ -import jwt -import time import requests -import google.auth -import google.auth.iam -import google.auth.app_engine -import google.oauth2.credentials -import google.oauth2.service_account -import requests_toolbelt.adapters.appengine -import google.auth.compute_engine.credentials +from typing import Optional, Dict, Any from google.auth.transport.requests import Request +from google.oauth2 import id_token + +from .util import is_token_valid class IapClient: """ - Helper to make requests to applications behind a IAP. + Helper to make requests to applications behind a IAP. This class requires + a Service Account from GCP environment or pointing a secret.json using + GOOGLE_APPLICATION_CREDENTIALS environment variable. + + For more details: + https://cloud.google.com/iap/docs/authentication-howto#authenticating_from_a_service_account Args: - oauth_token_uri: Google Token endpoint. - iam_scope: Google scope required (iam) + oauth_id: Oauth server client id. """ + decoded_token: Dict[str, Any] = {} - def __init__(self, oauth_token_uri, iam_scope): - self.OAUTH_TOKEN_URI = oauth_token_uri - self.IAM_SCOPE = iam_scope + def __init__(self, oauth_id): + self._oauth_id = oauth_id self._iap_token = None - def make_iap_request(self, url, client_id, method="GET", **kwargs): - """Makes a request to an application protected by Identity-Aware Proxy. - - Args: - url: The Identity-Aware Proxy-protected URL to fetch. - client_id: The client ID used by Identity-Aware Proxy. - method: The request method to use - ('GET', 'OPTIONS', 'HEAD', 'POST', 'PUT', 'PATCH', 'DELETE') - **kwargs: Any of the parameters defined for the request function: - https://github.com/requests/requests/blob/master/requests/api.py - If no timeout is provided, it is set to 90 by default. - - Returns: - A requests.Response object or raises exception if credential is not of a Service Account - """ + def make_iap_request(self, url, method="GET", **kwargs): if "timeout" not in kwargs: kwargs["timeout"] = 90 - bootstrap_credentials, _ = google.auth.default(scopes=[self.IAM_SCOPE]) - - if isinstance(bootstrap_credentials, google.oauth2.credentials.Credentials): - raise Exception("make_iap_request is only supported for service " "accounts.") - elif isinstance(bootstrap_credentials, google.auth.app_engine.Credentials): - requests_toolbelt.adapters.appengine.monkeypatch() - - bootstrap_credentials.refresh(Request()) - signer_email = bootstrap_credentials.service_account_email - if isinstance(bootstrap_credentials, google.auth.compute_engine.credentials.Credentials): - signer = google.auth.iam.Signer(Request(), bootstrap_credentials, signer_email) - else: - signer = bootstrap_credentials.signer - - service_account_credentials = google.oauth2.service_account.Credentials( - signer, signer_email, token_uri=self.OAUTH_TOKEN_URI, additional_claims={"target_audience": client_id} - ) - - if not self._is_token_valid(): - self._iap_token = self._get_google_open_id_connect_token(service_account_credentials) + if not is_token_valid(self.decoded_token.get("exp")): + self._iap_token = id_token.fetch_id_token(Request(), self._oauth_id) + self.decoded_token = id_token.verify_oauth2_token(self._iap_token, Request(), self._oauth_id) return requests.request(method, url, headers={"Authorization": "Bearer {}".format(self._iap_token)}, **kwargs) - - def _get_google_open_id_connect_token(self, service_account_credentials): - service_account_jwt = service_account_credentials._make_authorization_grant_assertion() - request = google.auth.transport.requests.Request() - body = {"assertion": service_account_jwt, "grant_type": google.oauth2._client._JWT_GRANT_TYPE} - token_response = google.oauth2._client._token_endpoint_request(request, self.OAUTH_TOKEN_URI, body) - - return token_response["id_token"] - - def _is_token_valid(self): - if not self._iap_token: - return False - decoded = jwt.decode(self._iap_token, verify=False) - if decoded["exp"] < time.time(): - return False - return True diff --git a/iap_auth/user_client.py b/iap_auth/user_client.py new file mode 100644 index 0000000..412106b --- /dev/null +++ b/iap_auth/user_client.py @@ -0,0 +1,154 @@ +import json +import logging +import webbrowser +from dataclasses import dataclass +from datetime import datetime +from pathlib import Path +from typing import Optional + +import requests + + +@dataclass +class Token: + access_token: str + expires_in: int + scope: str + token_type: str + id_token: str + expires_at: float + + @staticmethod + def from_dict(data: dict): + at = data["expires_in"] + datetime.now().timestamp() + return Token(**data, expires_at=at) + + def is_token_valid(self): + if self.expires_at > datetime.now().timestamp(): + return True + return False + + +@dataclass +class UserCredentials: + access_token: str + expires_in: int + refresh_token: str + scope: str + token_type: str + id_token: str + + @staticmethod + def from_dict(data: dict): + at = data["expires_in"] + datetime.now().timestamp() + return Token(**data, expires_at=at) + + def is_token_valid(self): + if self.expires_at > datetime.now().timestamp(): + return True + return False + + +def http_request(method, url, **kwargs): + return requests.request(method, url, **kwargs) + + +class UserAuth: + """ + For more details: + https://cloud.google.com/iap/docs/authentication-howto#authenticating_from_a_desktop_app + """ + + oauth2_token_endpoint = "https://oauth2.googleapis.com/token" + user_credentials = None + + def __init__(self, desktop_oauth_id, desktop_oauth_secret, credentials): + self.desktop_oauth_id = desktop_oauth_id + self.desktop_oauth_secret = desktop_oauth_secret + self.store_path = credentials + + def obtain_user_credentials(self) -> UserCredentials: + if Path(self.store_path).is_file(): + return self._load_stored_credentials() + else: + return self._ask_user_to_login() + + def _load_stored_credentials(self): + with open(self.store_path) as fh: + return UserCredentials(**json.load(fh)) + + def _ask_user_to_login(self): + code = self._ask_for_authorization_code() + user_credentials = self._get_credentials(code) + self._store_user_credentials(user_credentials) + return UserCredentials(**user_credentials) + + def _ask_for_authorization_code(self) -> str: + logging.debug("Getting authorization code.") + resource = "https://accounts.google.com/o/oauth2/v2/auth" + default_params = ( + "&response_type=code&scope=openid%20email&access_type=offline&redirect_uri=urn:ietf:wg:oauth:2.0:oob" + ) + + url = f"{resource}?client_id={self.desktop_oauth_id}{default_params}" + webbrowser.open_new(url) + code = input("Please, login with your Google Account and paste here your authorization code: ") + return code + + def _get_credentials(self, code: str): + logging.debug("Getting user credentials.") + data = dict( + client_id=self.desktop_oauth_id, + client_secret=self.desktop_oauth_secret, + code=code, + redirect_uri="urn:ietf:wg:oauth:2.0:oob", + grant_type="authorization_code", + ) + resp = http_request( + "POST", self.oauth2_token_endpoint, data=json.dumps(data), headers={"Content-Type": "application/json"} + ) + resp.raise_for_status() + return resp.json() + + def _store_user_credentials(self, user_credentials): + with open(self.store_path, "w") as fh: + json.dump(user_credentials, fh, indent=4) + logging.info("User credentials stored at '%s'", self.store_path) + + +class UserIapClient: + oauth2_token_endpoint = "https://oauth2.googleapis.com/token" + _access_token: Optional[Token] = None + + def __init__(self, user_auth: UserAuth, target_audience: str): + self._user_auth = user_auth + self._target_audience = target_audience + + def make_iap_request(self, url, method="GET", **kwargs): + user_credentials = self._user_auth.obtain_user_credentials() + access_token = self._get_id_token_for_target_audience(user_credentials.refresh_token, self._target_audience) + return http_request( + method, url, headers={"Authorization": "Bearer {}".format(access_token.id_token)}, **kwargs + ) + + def _get_id_token_for_target_audience(self, refresh_token: str, audience: str): + if self._access_token and self._access_token.is_token_valid(): + return self._access_token + + logging.debug("Getting id_token for requested audience.") + data = self._prepare_request_body(refresh_token, audience) + resp = http_request( + "POST", self.oauth2_token_endpoint, data=json.dumps(data), headers={"Content-Type": "application/json"} + ) + resp.raise_for_status() + self._access_token = Token.from_dict(resp.json()) + return self._access_token + + def _prepare_request_body(self, refresh_token, audience): + return dict( + client_id=self._user_auth.desktop_oauth_id, + client_secret=self._user_auth.desktop_oauth_secret, + refresh_token=refresh_token, + grant_type="refresh_token", + audience=audience, + ) diff --git a/iap_auth/util.py b/iap_auth/util.py new file mode 100644 index 0000000..c34c7c9 --- /dev/null +++ b/iap_auth/util.py @@ -0,0 +1,9 @@ +from datetime import datetime + + +def is_token_valid(expires_at: float): + print(expires_at) + print(datetime.now().timestamp()) + if expires_at and expires_at > datetime.now().timestamp(): + return True + return False diff --git a/pyproject.toml b/pyproject.toml index 55ec8d7..13146d9 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,2 +1,5 @@ [tool.black] line-length = 120 + +[tool.isort] +sections = ['FUTURE', 'STDLIB', 'THIRDPARTY', 'FIRSTPARTY', 'LOCALFOLDER'] diff --git a/requirements.txt b/requirements.txt index 9422bd6..60a8cd6 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,2 @@ -requests_toolbelt==0.9.1 -google-auth-oauthlib==0.4.1 -PyJWT==1.7.1 +google-auth==1.29.0 +requests==2.25.1 diff --git a/setup.py b/setup.py index 18dcd48..fceae52 100644 --- a/setup.py +++ b/setup.py @@ -5,7 +5,7 @@ setup( name='iap-auth', - version='??VERSION??', + version='0.2.1a1', description='Performs authentication for Google Cloud Identity Aware Proxy from a service account', long_description=long_description, long_description_content_type="text/markdown", @@ -14,6 +14,6 @@ author_email='ltaniguti@gmail.com', license='unlicense', packages=['iap_auth'], - install_requires=['requests_toolbelt', 'google-auth-oauthlib'], + install_requires=['google-auth', 'requests'], zip_safe=False ) diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..5f0e248 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,17 @@ +from pytest import fixture + +from iap_auth.user_client import UserIapClient, UserAuth + + +@fixture(name="user_auth", scope="function") +def _user_auth() -> UserAuth: + fake_oauth_id = "this-is-oauth_client_id" + fake_oauth_secret = "this-is-oauth_client_secret" + credentials_path = "./credentials.json" + return UserAuth(fake_oauth_id, fake_oauth_secret, credentials_path) + + +@fixture() +def user_iap_client(user_auth) -> UserIapClient: + fake_audience = "to-this-audience" + return UserIapClient(user_auth, fake_audience) diff --git a/tests/test_user_auth.py b/tests/test_user_auth.py new file mode 100644 index 0000000..c44a96b --- /dev/null +++ b/tests/test_user_auth.py @@ -0,0 +1,138 @@ +from datetime import datetime +from pathlib import Path +from unittest import mock + +from pytest import MonkeyPatch + +from iap_auth.user_client import Token, UserAuth, UserCredentials, UserIapClient + + +class MockResponse: + def __init__(self, json_data, status_code): + self.json_data = json_data + self.status_code = status_code + + def json(self): + return self.json_data + + def raise_for_status(self): + pass + + +@mock.patch("requests.request") +@mock.patch.object(Path, "is_file", return_value=True) +def test_user_auth_should_negotiate_new_token_if_existing_one_is_expired( + path_mock, requests, user_iap_client: UserIapClient, user_auth: UserAuth, monkeypatch: MonkeyPatch +): + # Setup + monkeypatch.setattr( + user_auth, + "_load_stored_credentials", + lambda: UserCredentials( + access_token="aaaasss", + scope="iam", + token_type="refresh", + refresh_token="aaaa", + expires_in=600, + id_token="bbbb", + ), + ) + + monkeypatch.setattr( + user_iap_client, + "_access_token", + Token( + id_token="id-token-aaaaaaaaaaaa", + token_type="Bearer", + access_token="access-token-aaaa", + expires_in=600, + expires_at=datetime.now().timestamp() - 50, + scope="iam", + ), + ) + + json_data = { + "access_token": "not-used-in-iap", + "expires_in": 10, + "scope": "iam", + "token_type": "Bearer", + "id_token": "user-token", + } + requests.return_value = MockResponse(json_data, 200) + + # Call + user_iap_client.make_iap_request("https://google.com.br") + + # Assert + expected_id_token_for_audience_call = mock.call( + "POST", + "https://oauth2.googleapis.com/token", + data='{"client_id": "this-is-oauth_client_id", "client_secret": "this-is-oauth_client_secret", "refresh_token": "aaaa", "grant_type": "refresh_token", "audience": "to-this-audience"}', + headers={"Content-Type": "application/json"}, + ) + expected_iap_protected_resource_call = mock.call( + "GET", "https://google.com.br", headers={"Authorization": "Bearer user-token"} + ) + calls = [expected_id_token_for_audience_call, expected_iap_protected_resource_call] + requests.assert_has_calls(calls, any_order=False) + + +@mock.patch("builtins.input", return_value="user/authorization-code") +@mock.patch("webbrowser.open_new") +@mock.patch("requests.request") +@mock.patch.object(Path, "is_file", return_value=False) +def test_user_auth_should_ask_for_user_credentials_if_no_one_found_in_path( + path_mock, + requests, + webbrowser, + input_mock, + user_iap_client: UserIapClient, + user_auth: UserAuth, + monkeypatch: MonkeyPatch, +): + # Setup + + json_data1 = { + "access_token": "not-used-in-iap", + "expires_in": 600, + "scope": "iam", + "token_type": "refresh", + "refresh_token": "this-allows-reuse-this-credential", + "id_token": "user-token", + } + json_data2 = { + "access_token": "not-used-in-iap", + "expires_in": 600, + "scope": "iam", + "token_type": "Bearer", + "id_token": "user-token2", + } + desired_return = {"resp": "google.com.br-response"} + requests.side_effect = [ + MockResponse(json_data1, 200), + MockResponse(json_data2, 200), + MockResponse(desired_return, 200), + ] + + # Call + user_iap_client.make_iap_request("https://google.com.br") + + # Assert + expected_authorization_call = mock.call( + "POST", + "https://oauth2.googleapis.com/token", + data='{"client_id": "this-is-oauth_client_id", "client_secret": "this-is-oauth_client_secret", "code": "user/authorization-code", "redirect_uri": "urn:ietf:wg:oauth:2.0:oob", "grant_type": "authorization_code"}', + headers={"Content-Type": "application/json"}, + ) + expected_id_token_for_audience_call = mock.call( + "POST", + "https://oauth2.googleapis.com/token", + data='{"client_id": "this-is-oauth_client_id", "client_secret": "this-is-oauth_client_secret", "refresh_token": "this-allows-reuse-this-credential", "grant_type": "refresh_token", "audience": "to-this-audience"}', + headers={"Content-Type": "application/json"}, + ) + expected_iap_protected_resource_call = mock.call( + "GET", "https://google.com.br", headers={"Authorization": "Bearer user-token2"} + ) + calls = [expected_authorization_call, expected_id_token_for_audience_call, expected_iap_protected_resource_call] + + requests.assert_has_calls(calls, any_order=False)