rtc-voice-chat/backend/security/signer.py
2026-03-30 10:39:19 +08:00

116 lines
3.6 KiB
Python

"""
Copyright 2025 Beijing Volcano Engine Technology Co., Ltd. All Rights Reserved.
SPDX-license-identifier: BSD-3-Clause
Migrated from @volcengine/openapi Signer (AWS SigV4 compatible)
Reference: https://www.volcengine.com/docs/6348/69828
"""
import hashlib
import hmac
import json
from datetime import datetime, timezone
from urllib.parse import quote
def _sha256_hex(data: bytes) -> str:
    """Return the SHA-256 digest of ``data`` as a hexadecimal string."""
    hasher = hashlib.sha256()
    hasher.update(data)
    return hasher.hexdigest()
def _hmac_sha256(key: bytes, data: str) -> bytes:
    """Return the raw HMAC-SHA256 of ``data`` (encoded as UTF-8) keyed by ``key``."""
    message = data.encode("utf-8")
    mac = hmac.new(key, msg=message, digestmod=hashlib.sha256)
    return mac.digest()
def _get_signing_key(secret_key: str, date_str: str, region: str, service: str) -> bytes:
    """Derive the signing key by chaining HMAC-SHA256 over the scope parts.

    The UTF-8 encoded ``secret_key`` seeds the chain; each step then uses the
    previous digest as the key for the next part, in the order ``date_str``,
    ``region``, ``service`` and finally the literal ``"request"``.
    """
    derived = secret_key.encode("utf-8")
    for scope_part in (date_str, region, service, "request"):
        derived = _hmac_sha256(derived, scope_part)
    return derived
class Signer:
    """
    Signs requests to Volcengine OpenAPI using AWS SigV4-compatible signing.

    The signer keeps a reference to the ``headers`` dict supplied in
    ``request_data`` and writes the ``X-Date``, ``X-Content-Sha256`` and
    ``Authorization`` headers into it in place.
    """

    # Lower-cased header names that must never be part of the signature.
    # ``Authorization`` carries the signature itself, so a value left over from
    # a previous signing pass must not be fed into the next one.
    _UNSIGNED_HEADERS = frozenset({"authorization"})

    def __init__(self, request_data: dict, service: str):
        """
        request_data: {
            region: str,    # defaults to "cn-north-1"
            method: str,    # defaults to "POST"; upper-cased before signing
            path: str,      # defaults to "/"
            params: dict,   # query parameters; defaults to {}
            headers: dict,  # updated in place by add_authorization; defaults to {}
            body: dict,     # hashed as compact JSON; defaults to {}
        }
        service: e.g. "rtc"
        """
        self.region = request_data.get("region", "cn-north-1")
        self.method = request_data.get("method", "POST").upper()
        self.path = request_data.get("path", "/")
        self.params = request_data.get("params", {})
        self.headers = request_data.get("headers", {})
        self.body = request_data.get("body", {})
        self.service = service

    def _set_header(self, name: str, value: str) -> None:
        """Set ``name`` to ``value``, replacing any entry that differs only by case."""
        lowered = name.lower()
        # HTTP header names are case-insensitive: drop e.g. a caller-supplied
        # "x-date" so the request never carries two competing values.
        for existing in [key for key in self.headers if key.lower() == lowered]:
            del self.headers[existing]
        self.headers[name] = value

    def add_authorization(self, account_config: dict):
        """
        Sign the request and store the result in ``self.headers``.

        Sets ``X-Date`` (current UTC time), ``X-Content-Sha256`` (SHA-256 of
        the compact JSON serialization of ``self.body``) and ``Authorization``.
        Every header present at signing time is signed, except
        ``Authorization`` itself, so the method can safely be called again to
        re-sign the same request.

        account_config: {
            accessKeyId: str,
            secretKey: str,
        }

        Raises:
            KeyError: if ``accessKeyId`` or ``secretKey`` is missing.
        """
        access_key = account_config["accessKeyId"]
        secret_key = account_config["secretKey"]

        now = datetime.now(timezone.utc)
        date_str = now.strftime("%Y%m%d")
        datetime_str = now.strftime("%Y%m%dT%H%M%SZ")

        # NOTE(review): the hash covers this exact compact serialization, so
        # the bytes actually sent presumably have to match it -- verify
        # against the caller.
        body_bytes = json.dumps(
            self.body, separators=(",", ":"), ensure_ascii=False
        ).encode("utf-8")
        body_hash = _sha256_hex(body_bytes)

        self._set_header("X-Date", datetime_str)
        self._set_header("X-Content-Sha256", body_hash)

        # Canonical headers: lower-cased names mapped to trimmed values, built
        # once so each header is a single dict lookup below.
        signable = {
            name.lower(): str(value).strip()
            for name, value in self.headers.items()
            if name.lower() not in self._UNSIGNED_HEADERS
        }
        signed_header_names = sorted(signable)
        canonical_headers = "".join(
            f"{name}:{signable[name]}\n" for name in signed_header_names
        )
        signed_headers_str = ";".join(signed_header_names)

        # Canonical query string: parameters sorted, then percent-encoded with
        # no characters exempted beyond quote()'s always-safe set.
        canonical_qs = "&".join(
            f"{quote(str(k), safe='')}={quote(str(v), safe='')}"
            for k, v in sorted(self.params.items())
        )

        # canonical_headers already ends with "\n"; joining with "\n" yields
        # the blank line that separates it from the signed-header list.
        canonical_request = "\n".join(
            [
                self.method,
                self.path,
                canonical_qs,
                canonical_headers,
                signed_headers_str,
                body_hash,
            ]
        )

        credential_scope = f"{date_str}/{self.region}/{self.service}/request"
        string_to_sign = "\n".join(
            [
                "HMAC-SHA256",
                datetime_str,
                credential_scope,
                _sha256_hex(canonical_request.encode("utf-8")),
            ]
        )

        signing_key = _get_signing_key(secret_key, date_str, self.region, self.service)
        signature = hmac.new(
            signing_key, string_to_sign.encode("utf-8"), hashlib.sha256
        ).hexdigest()

        self._set_header(
            "Authorization",
            f"HMAC-SHA256 Credential={access_key}/{credential_scope}, "
            f"SignedHeaders={signed_headers_str}, "
            f"Signature={signature}",
        )