import time
from urllib.parse import urljoin
import requests
from requests import Session
import jwt
import uuid
from datetime import datetime, timedelta, timezone
class SEAPIClient:
def __init__(
self,
tenant: str,
auth_url: str,
api_identifier: str,
facade_url: str,
client_id: str,
client_secret: str = None,
private_key: str = None,
timeout: int = 30,
):
self._tenant = tenant
self._client_id = client_id
self._client_secret = client_secret
self._private_key = private_key
self._auth_url = auth_url
self._api_identifier = api_identifier
self._facade_url = facade_url
self._session = Session()
self._auth_token = None
self._token_expiry = None
self._timeout = timeout
# Validate that we have either client_secret or private_key
if not client_secret and not private_key:
raise ValueError("Either client_secret or private_key must be provided")
if client_secret and private_key:
raise ValueError(
"Only one of client_secret or private_key should be provided"
)
def _get_auth_token(self):
# 5 min expiration buffer
if not self._auth_token or (
self._token_expiry and time.time() > self._token_expiry - 300
):
self._auth_token = self._fetch_new_auth_token()
return self._auth_token
def _create_jwt_assertion(self):
"""Create a JWT assertion for the private key flow"""
if not self._private_key:
raise ValueError("Private key is required for JWT assertion flow")
now = datetime.now(timezone.utc)
expiration = now + timedelta(minutes=5)
audience = f"{self._auth_url}/"
payload = {
"iss": self._client_id,
"sub": self._client_id,
"aud": audience,
"jti": str(uuid.uuid4()),
"exp": int(expiration.timestamp()),
"iat": int(now.timestamp()),
}
return jwt.encode(payload, self._private_key, algorithm="RS256")
def _fetch_new_auth_token(self):
url = urljoin(self._auth_url, "/oauth/token")
headers = {"Content-Type": "application/x-www-form-urlencoded"}
if self._private_key:
# Private key flow - use JWT Bearer assertion
client_assertion = self._create_jwt_assertion()
data = {
"grant_type": "client_credentials",
"client_id": self._client_id,
"client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
"client_assertion": client_assertion,
"tenant": self._tenant,
"audience": self._api_identifier,
}
else:
# Client credentials flow
data = {
"client_id": self._client_id,
"client_secret": self._client_secret,
"audience": self._api_identifier,
"grant_type": "client_credentials",
"tenant": self._tenant,
}
response = requests.post(headers=headers, url=url, data=data, timeout=60)
if response.status_code != 200:
raise Exception(
f"Failed to get auth token: {response.status_code} - {response.text}"
)
token = response.json()["access_token"]
decoded_token = jwt.decode(token, options={"verify_signature": False})
self._token_expiry = decoded_token["exp"]
return f"Bearer {token}"
def _get_url(self):
protocol, base_url = self._facade_url.split("://")
return f"{protocol}://{self._tenant}.{base_url}"
def _create_headers(
self,
content_type: str = "application/json",
version: str = None,
):
headers = {"Content-Type": content_type}
if version:
headers["X-API-Version"] = version
headers["Authorization"] = self._get_auth_token()
return headers
def get(self, endpoint: str, version: str = None):
url = urljoin(self._get_url(), endpoint)
headers = self._create_headers(version=version)
print(url)
print(headers)
response = self._session.get(headers=headers, url=url, timeout=self._timeout)
return response
def post(self, endpoint: str, data: dict, version: str = None):
url = urljoin(self._get_url(), endpoint)
headers = self._create_headers(version=version)
response = self._session.post(
headers=headers, url=url, json=data, timeout=self._timeout
)
return response
def patch(self, endpoint: str, data: dict, version: str = None):
url = urljoin(self._get_url(), endpoint)
headers = self._create_headers(version=version)
response = self._session.patch(
headers=headers, url=url, json=data, timeout=self._timeout
)
return response
def put(self, endpoint: str, data: dict, version: str = None):
url = urljoin(self._get_url(), endpoint)
headers = self._create_headers(version=version)
response = self._session.put(
headers=headers, url=url, json=data, timeout=self._timeout
)
return response
def delete(self, endpoint: str, version: str = None):
url = urljoin(self._get_url(), endpoint)
headers = self._create_headers(version=version)
response = self._session.delete(headers=headers, url=url, timeout=self._timeout)
return response