108 lines
3.4 KiB
Python
108 lines
3.4 KiB
Python
import hashlib
|
|
import hmac
|
|
import json
|
|
from datetime import datetime, timezone
|
|
from urllib.parse import quote
|
|
|
|
|
|
def _sha256_hex(data: bytes) -> str:
|
|
return hashlib.sha256(data).hexdigest()
|
|
|
|
|
|
def _hmac_sha256(key: bytes, data: str) -> bytes:
|
|
return hmac.new(key, data.encode("utf-8"), hashlib.sha256).digest()
|
|
|
|
|
|
def _get_signing_key(secret_key: str, date_str: str, region: str, service: str) -> bytes:
|
|
k_date = _hmac_sha256(secret_key.encode("utf-8"), date_str)
|
|
k_region = _hmac_sha256(k_date, region)
|
|
k_service = _hmac_sha256(k_region, service)
|
|
k_signing = _hmac_sha256(k_service, "request")
|
|
return k_signing
|
|
|
|
|
|
class Signer:
|
|
"""
|
|
Signs requests to Volcengine OpenAPI using AWS SigV4-compatible signing.
|
|
"""
|
|
|
|
def __init__(self, request_data: dict, service: str):
|
|
"""
|
|
request_data: {
|
|
region: str,
|
|
method: str,
|
|
params: dict,
|
|
headers: dict,
|
|
body: dict,
|
|
}
|
|
service: e.g. "rtc"
|
|
"""
|
|
self.region = request_data.get("region", "cn-north-1")
|
|
self.method = request_data.get("method", "POST").upper()
|
|
self.path = request_data.get("path", "/")
|
|
self.params = request_data.get("params", {})
|
|
self.headers = request_data.get("headers", {})
|
|
self.body = request_data.get("body", {})
|
|
self.service = service
|
|
|
|
def add_authorization(self, account_config: dict):
|
|
access_key = account_config["accessKeyId"]
|
|
secret_key = account_config["secretKey"]
|
|
|
|
now = datetime.now(timezone.utc)
|
|
date_str = now.strftime("%Y%m%d")
|
|
datetime_str = now.strftime("%Y%m%dT%H%M%SZ")
|
|
|
|
self.headers["X-Date"] = datetime_str
|
|
self.headers["X-Content-Sha256"] = _sha256_hex(
|
|
json.dumps(self.body, separators=(",", ":"), ensure_ascii=False).encode(
|
|
"utf-8"
|
|
)
|
|
)
|
|
|
|
signed_header_names = sorted(k.lower() for k in self.headers)
|
|
canonical_headers = "".join(
|
|
f"{k}:{self.headers[next(h for h in self.headers if h.lower() == k)]}\n"
|
|
for k in signed_header_names
|
|
)
|
|
signed_headers_str = ";".join(signed_header_names)
|
|
|
|
sorted_params = sorted(self.params.items())
|
|
canonical_qs = "&".join(
|
|
f"{quote(str(k), safe='')}={quote(str(v), safe='')}"
|
|
for k, v in sorted_params
|
|
)
|
|
|
|
body_hash = self.headers["X-Content-Sha256"]
|
|
canonical_request = "\n".join(
|
|
[
|
|
self.method,
|
|
self.path,
|
|
canonical_qs,
|
|
canonical_headers,
|
|
signed_headers_str,
|
|
body_hash,
|
|
]
|
|
)
|
|
|
|
credential_scope = f"{date_str}/{self.region}/{self.service}/request"
|
|
string_to_sign = "\n".join(
|
|
[
|
|
"HMAC-SHA256",
|
|
datetime_str,
|
|
credential_scope,
|
|
_sha256_hex(canonical_request.encode("utf-8")),
|
|
]
|
|
)
|
|
|
|
signing_key = _get_signing_key(secret_key, date_str, self.region, self.service)
|
|
signature = hmac.new(
|
|
signing_key, string_to_sign.encode("utf-8"), hashlib.sha256
|
|
).hexdigest()
|
|
|
|
self.headers["Authorization"] = (
|
|
f"HMAC-SHA256 Credential={access_key}/{credential_scope}, "
|
|
f"SignedHeaders={signed_headers_str}, "
|
|
f"Signature={signature}"
|
|
)
|