"""
Copyright 2025 Beijing Volcano Engine Technology Co., Ltd. All Rights Reserved.
SPDX-license-identifier: BSD-3-Clause

Migrated from @volcengine/openapi Signer (AWS SigV4 compatible)
Reference: https://www.volcengine.com/docs/6348/69828
"""

import hashlib
import hmac
import json
from datetime import datetime, timezone
from urllib.parse import quote

# Algorithm label used in the string-to-sign and the Authorization header.
_ALGORITHM = "HMAC-SHA256"
# Last component of the credential scope and last step of key derivation.
_SCOPE_TERMINATOR = "request"
# Lowercase header names that must never be part of the signature: the
# Authorization header carries the signature itself, so signing a stale copy
# (e.g. when a request is re-signed) yields a signature that cannot verify.
_UNSIGNED_HEADERS = frozenset({"authorization"})


def _sha256_hex(data: bytes) -> str:
    """Return the lowercase hex SHA-256 digest of *data*."""
    return hashlib.sha256(data).hexdigest()


def _hmac_sha256(key: bytes, data: str) -> bytes:
    """Return the raw HMAC-SHA256 of the UTF-8 encoding of *data* under *key*."""
    return hmac.new(key, data.encode("utf-8"), hashlib.sha256).digest()


def _get_signing_key(secret_key: str, date_str: str, region: str, service: str) -> bytes:
    """Derive the per-day, per-region, per-service signing key.

    The chain is HMAC(HMAC(HMAC(HMAC(secret, date), region), service), "request").
    Unlike AWS SigV4 (which prepends "AWS4" to the secret), the Volcengine
    signing documentation starts the chain from the raw secret key, so no
    prefix is added here.
    """
    k_date = _hmac_sha256(secret_key.encode("utf-8"), date_str)
    k_region = _hmac_sha256(k_date, region)
    k_service = _hmac_sha256(k_region, service)
    return _hmac_sha256(k_service, _SCOPE_TERMINATOR)


def _set_header(headers: dict, name: str, value: str) -> None:
    """Set *name* in *headers*, replacing any key that differs only by case.

    HTTP header names are case-insensitive; leaving e.g. both "x-date" and
    "X-Date" in the mapping would send (and sign) two conflicting headers.
    """
    lowered = name.lower()
    stale = [key for key in headers if key != name and key.lower() == lowered]
    for key in stale:
        del headers[key]
    headers[name] = value


def _canonicalize_headers(headers: dict) -> tuple:
    """Return ``(canonical_headers, signed_headers)`` for *headers*.

    ``canonical_headers`` is one ``name:value\\n`` entry per signed header,
    with lowercase names sorted ascending and surrounding whitespace trimmed
    from values. ``signed_headers`` is the same names joined with ``;``.
    Headers listed in ``_UNSIGNED_HEADERS`` are skipped. If two keys differ
    only by case, the first one in iteration order is used.
    """
    by_lower_name = {}
    for name, value in headers.items():
        lowered = name.lower()
        if lowered in _UNSIGNED_HEADERS:
            continue
        by_lower_name.setdefault(lowered, str(value).strip())
    names = sorted(by_lower_name)
    canonical = "".join(f"{name}:{by_lower_name[name]}\n" for name in names)
    return canonical, ";".join(names)


def _canonicalize_query(params: dict) -> str:
    """Return the sorted, fully percent-encoded ``k=v&k=v`` query string."""
    return "&".join(
        f"{quote(str(key), safe='')}={quote(str(value), safe='')}"
        for key, value in sorted(params.items())
    )


class Signer:
    """
    Signs requests to Volcengine OpenAPI using AWS SigV4-compatible signing.
    """

    def __init__(self, request_data: dict, service: str):
        """
        request_data: {
            region: str,      # defaults to "cn-north-1"
            method: str,      # defaults to "POST"; upper-cased
            params: dict,     # query params (Action, Version, ...)
            headers: dict,    # mutated in place by add_authorization
            body: dict,
        }
        service: e.g. "rtc"
        """
        self.region = request_data.get("region", "cn-north-1")
        self.method = request_data.get("method", "POST").upper()
        self.params = request_data.get("params", {})
        self.headers = request_data.get("headers", {})
        self.body = request_data.get("body", {})
        self.service = service

    def add_authorization(self, account_config: dict) -> None:
        """
        Computes and injects Authorization, X-Date and X-Content-Sha256
        headers into self.headers (the dict is modified in place).

        account_config: { accessKeyId: str, secretKey: str }

        The body hash covers the compact JSON serialization of self.body
        (``separators=(",", ":")``, ``ensure_ascii=False``, UTF-8), so callers
        should transmit that same serialization. The method may be called
        repeatedly to re-sign: a previously injected Authorization header is
        excluded from the signature and then replaced.

        Raises KeyError if account_config lacks accessKeyId or secretKey.
        """
        access_key = account_config["accessKeyId"]
        secret_key = account_config["secretKey"]

        now = datetime.now(timezone.utc)
        date_str = now.strftime("%Y%m%d")
        datetime_str = now.strftime("%Y%m%dT%H%M%SZ")

        payload = json.dumps(self.body, separators=(",", ":"), ensure_ascii=False)
        body_hash = _sha256_hex(payload.encode("utf-8"))

        # These two headers are part of the signed set, so they must be in
        # place before the canonical headers are built.
        _set_header(self.headers, "X-Date", datetime_str)
        _set_header(self.headers, "X-Content-Sha256", body_hash)

        canonical_headers, signed_headers_str = _canonicalize_headers(self.headers)
        canonical_qs = _canonicalize_query(self.params)

        # canonical_headers already ends with "\n"; joining with "\n" yields
        # the blank line that separates it from the signed-header list.
        canonical_request = "\n".join([
            self.method,
            "/",
            canonical_qs,
            canonical_headers,
            signed_headers_str,
            body_hash,
        ])

        credential_scope = f"{date_str}/{self.region}/{self.service}/{_SCOPE_TERMINATOR}"
        string_to_sign = "\n".join([
            _ALGORITHM,
            datetime_str,
            credential_scope,
            _sha256_hex(canonical_request.encode("utf-8")),
        ])

        signing_key = _get_signing_key(secret_key, date_str, self.region, self.service)
        signature = hmac.new(
            signing_key, string_to_sign.encode("utf-8"), hashlib.sha256
        ).hexdigest()

        _set_header(
            self.headers,
            "Authorization",
            f"{_ALGORITHM} Credential={access_key}/{credential_scope}, "
            f"SignedHeaders={signed_headers_str}, "
            f"Signature={signature}",
        )