import json
import random
import string
import time
from base64 import encodebytes
import requests
from Cryptodome.PublicKey import RSA
from Crypto.Hash import SHA256
from Crypto.Signature import PKCS1_v1_5
# Path of the RSA key file that sign_v3() opens and loads to sign requests
# (the placeholder text means "certificate path").
KEY_PATH = "证书路径"
# Value emitted as ``mchid`` in the Authorization header built by
# authorization() (the placeholder text means "merchant number").
mch_id = "商户号"
# Value emitted as ``serial_no`` in the Authorization header built by
# authorization() (the placeholder text means "merchant serial number").
serial_No = "商户序列号"
def nonce_str(length=30):
    """Return a random nonce made of ASCII letters and digits.

    The nonce becomes part of a signed request, so it is drawn from the
    operating system's cryptographically secure generator (``secrets``)
    rather than the predictable ``random`` module.

    Args:
        length: Number of characters to generate. Zero or a negative value
            yields an empty string.

    Returns:
        A string of ``length`` characters taken from ``[A-Za-z0-9]``.
    """
    # Function-scope import keeps this block self-contained.
    import secrets

    alphabet = string.ascii_letters + string.digits
    return "".join(secrets.choice(alphabet) for _ in range(length))
def sign_v3(method, url, timestamp, nonce_str, body):
    """Sign one request and return the signature as single-line base64 text.

    The signed message is the five arguments in order, each followed by a
    newline character (so the message also ends with a newline). It is
    UTF-8 encoded, hashed with SHA-256 and signed with PKCS#1 v1.5 using the
    RSA key read from ``KEY_PATH``.

    Args:
        method: HTTP method string.
        url: URL portion to sign, as supplied by the caller.
        timestamp: Timestamp string.
        nonce_str: Random nonce string.
        body: Request body string (may be empty).

    Returns:
        The base64-encoded signature with all newlines removed.
    """
    fields = (method, url, timestamp, nonce_str, body)
    # Every field, including the last one, is terminated by "\n".
    payload = "".join(field + "\n" for field in fields)

    with open(KEY_PATH, mode='r') as key_file:
        private_key = RSA.importKey(key_file.read())

    digest = SHA256.new(payload.encode("utf-8"))
    raw_signature = PKCS1_v1_5.new(private_key).sign(digest)

    # encodebytes() inserts line breaks; strip them to get a single line.
    encoded = encodebytes(raw_signature).decode("utf8")
    return encoded.replace("\n", "")
def authorization(sign, nonce_str, timestamp):
    """
    Assemble the value of the ``Authorization`` request header for the API.

    The result is the scheme name ``WECHATPAY2-SHA256-RSA2048`` followed by
    comma-separated ``key="value"`` pairs: ``mchid`` and ``serial_no`` come
    from the module-level ``mch_id`` and ``serial_No``; the remaining values
    come from the arguments.
    """
    pairs = (
        ("mchid", mch_id),
        ("nonce_str", nonce_str),
        ("timestamp", timestamp),
        ("serial_no", serial_No),
        ("signature", sign),
    )
    rendered = ",".join(f'{key}="{value}"' for key, value in pairs)
    return "WECHATPAY2-SHA256-RSA2048 " + rendered
def get_authorization(method: str, url: str, body: str = ""):
    """Build the complete ``Authorization`` header value for one request.

    A fresh timestamp (current Unix time in whole seconds) and nonce are
    generated, the request is signed with ``sign_v3`` and the pieces are
    combined by ``authorization``.

    Args:
        method: HTTP method string, e.g. ``"POST"``.
        url: Absolute request URL. Only its path, plus ``?query`` when a
            query string is present, takes part in the signature; scheme,
            host and fragment are ignored, so any API host is accepted.
        body: Request body text covered by the signature. Defaults to an
            empty string.

    Returns:
        The header value returned by ``authorization``.
    """
    # Function-scope import keeps this block self-contained.
    from urllib.parse import urlsplit

    timestamp = str(int(time.time()))
    nonce = nonce_str()

    # Parse the URL instead of splitting on one hard-coded host name, which
    # raised IndexError for every other host.
    parts = urlsplit(url)
    sign_url = parts.path
    if parts.query:
        sign_url += "?" + parts.query

    signature = sign_v3(method, sign_url, timestamp, nonce, body)
    return authorization(signature, nonce, timestamp)
# Example: upload one image to the merchant-service image endpoint.
# ``file_path`` is the image on disk; ``file_name`` is the name reported to
# the API both in the ``meta`` JSON and in the multipart file part.
file_path = "图片路径"
file_name = "图片名字"
url = "https://api.mch.weixin.qq.com/v3/merchant-service/" \
      "images/upload"

# Read the image once and close the handle straight away; the same bytes are
# used for the digest and for the upload, so nothing is left open.
with open(file_path, "rb") as f:
    file_bytes = f.read()
data_sha256 = SHA256.new(file_bytes).hexdigest()

meta = {"filename": file_name, "sha256": data_sha256}
# Serialize once so the signed text and the transmitted text are identical.
meta_json = json.dumps(meta)
aut = get_authorization("POST", url, meta_json)

# Content-Type is deliberately not set here: requests generates the
# multipart header including its boundary parameter, and a hand-written
# "multipart/form-data" value would be sent without that boundary.
headers = {
    "Authorization": aut,
}
body = {
    "meta": meta_json,
}
# Use the real file name (not the literal "file_name") so the file part
# agrees with the "filename" declared in ``meta``.
files = [("file", (file_name, file_bytes, "image/png"))]
response = requests.post(url, data=body, files=files, headers=headers)