def upload_file_stream(user_id: str, filename: str, data: bytes, content_type: str) -> Dict[str, str]:
    """Upload ``data`` to object storage under a unique per-user key.

    The driver is selected by the ``STORAGE_DRIVER`` environment variable
    (case-insensitive, defaults to ``LOCAL``). When it is ``COS`` the payload
    is written to Tencent Cloud COS.

    NOTE(review): the original description also promises a local-directory
    fallback (``STORAGE_LOCAL_DIR``, default ``/tmp/saas_uploads``) for any
    other driver value, but no such branch is visible in this part of the
    file -- confirm where it lives.

    COS credentials are read from ``COS_SECRET_ID`` / ``COS_SECRET_KEY``. If
    either is missing, temporary credentials (including a session token) are
    requested from the WXCloud auth endpoint on a best-effort basis.

    Args:
        user_id: Owner of the upload; becomes a path segment of the object key.
        filename: Original file name; passed through ``_safe_filename``.
        data: Raw bytes to store.
        content_type: MIME type stored with the object; an empty value falls
            back to ``application/octet-stream``.

    Returns:
        For the COS driver, a dict with:

        - ``key``: the object key, ``uploads/<user_id>/<ts>-<id>-<name>``.
        - ``url``: ``COS_BASE_URL`` joined with the key when that variable is
          set, otherwise the default ``<bucket>.cos.<region>.myqcloud.com``
          address. Presumably only reachable without a signature when the
          bucket allows public reads -- verify against the bucket ACL.
        - ``file_id``: ``cloud://<env>.<bucket>/<key>`` when
          ``WX_CLOUD_ENV_ID`` is set, otherwise ``None``.
        - ``signed_url``: a pre-signed GET URL requested with a 3600 second
          lifetime.

    Raises:
        RuntimeError: If the bucket, region or credentials cannot be resolved,
            or if the ``cos-python-sdk-v5`` package is not installed.
    """
    driver = (os.getenv("STORAGE_DRIVER") or "LOCAL").upper()
    ts = int(time.time())
    fid = uuid.uuid4().hex[:12]
    fname = _safe_filename(filename)
    # Timestamp plus random suffix keeps keys unique even for identical names.
    key = f"uploads/{user_id}/{ts}-{fid}-{fname}"
    if driver == "COS":
        bucket = os.getenv("COS_BUCKET")
        region = os.getenv("COS_REGION")
        base_url = os.getenv("COS_BASE_URL")
        secret_id = os.getenv("COS_SECRET_ID")
        secret_key = os.getenv("COS_SECRET_KEY")
        token = None
        if not (secret_id and secret_key):
            # No static key pair configured: try the WXCloud endpoint that
            # hands out temporary credentials. This stays best-effort; a
            # failure is reported by the configuration check below.
            try:
                with urllib_request.urlopen("http://api.weixin.qq.com/_/cos/getauth", timeout=3) as resp:
                    if resp.status == 200:
                        auth_data = json.loads(resp.read().decode('utf-8'))
                        secret_id = auth_data.get("TmpSecretId")
                        secret_key = auth_data.get("TmpSecretKey")
                        token = auth_data.get("Token")
            except Exception as exc:
                # Do not abort here, but leave a trace of why the fetch failed.
                print(f"[DEBUG] WXCloud COS auth request failed: {exc!r}")
        if not all([secret_id, secret_key, bucket, region]):
            print(f"[DEBUG] Missing COS Config: bucket={bucket}, region={region}, has_secret_id={bool(secret_id)}, has_secret_key={bool(secret_key)}")
            raise RuntimeError("COS config missing: COS_BUCKET|COS_REGION is required. COS_SECRET_ID|COS_SECRET_KEY is required unless in WXCloud environment.")
        try:
            # Imported lazily so the SDK is only required for the COS driver.
            from qcloud_cos import CosConfig, CosS3Client
        except ImportError as err:
            raise RuntimeError("Missing dependency: cos-python-sdk-v5. Please `pip install cos-python-sdk-v5`") from err
        config = CosConfig(Region=region, SecretId=secret_id, SecretKey=secret_key, Token=token)
        client = CosS3Client(config)
        client.put_object(
            Bucket=bucket,
            Body=data,
            Key=key,
            ContentType=content_type or "application/octet-stream",
        )
        if base_url:
            url = f"{base_url.rstrip('/')}/{key}"
        else:
            url = f"https://{bucket}.cos.{region}.myqcloud.com/{key}"
        env_id = os.getenv("WX_CLOUD_ENV_ID")
        file_id = f"cloud://{env_id}.{bucket}/{key}" if env_id else None
        # A signature made with temporary credentials is only accepted when
        # the session token travels with the request. A browser cannot add
        # headers, so the token has to be a signed query parameter.
        # NOTE(review): the link presumably also stops working once the
        # temporary credentials themselves expire -- confirm their lifetime.
        sign_params = {"x-cos-security-token": token} if token else {}
        signed_url = client.get_presigned_url(
            Method='GET',
            Bucket=bucket,
            Key=key,
            Expired=3600,
            Params=sign_params,
        )
        # The signed URL is deliberately not printed: anyone holding it can
        # read the object until it expires.
        return {"key": key, "url": url, "file_id": file_id, "signed_url": signed_url}
返回的signed_url,直接用浏览器打开报错