所需库
WeiXinPayV3/requirement.txt
Flask==1.0.2
pyOpenSSL==20.0.1
cryptography==3.4.7
requests==2.25.1
核心API对接
WeiXinPayV3/api.py
import datetime
import json
import os
import random
import string
import time
import OpenSSL
import base64
import requests
from urllib.parse import urlparse
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
# V3版支付
class WxPayV3(object):
basedir = os.path.abspath(os.path.dirname(__file__)) # 获取当前目录
def __init__(self, ignore_resp_sign=False):
"""
:param ignore_resp_sign: 是否忽略应答验签(用于第一次缓存证书)
"""
self.app_id = '{app_id}'
self.mch_id = '{mch_id}'
self.api_v3_key = '{api_v3_key}'
self.serial_no = '{serial_no}' # 证书序列号(微信支付商户平台获取)
self.ignore_resp_sign = ignore_resp_sign
self.certificates_path = self.basedir + '/certificates.json'
# 支付签名
@staticmethod
def sign(s):
# 这里的key为示例
api_client_key = ("-----BEGIN PRIVATE KEY-----\n"
"MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQDAPM45XP7PC6qe\n"
"MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQDAPM45XP7PC6qe\n"
"Kkh/7UGUD/hxkPwNB2PKAms=\n"
"-----END PRIVATE KEY-----")
pkey = OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, api_client_key)
signature = base64.b64encode(OpenSSL.crypto.sign(pkey=pkey, data=s.encode(), digest='SHA256'))
return signature.decode()
# 请求签名
def _auth(self, req):
data = req.method + '\n'
parsed = urlparse(req.url)
data += parsed.path
if parsed.query:
data += "?" + parsed.query
data += '\n'
timestamp = str(int(time.time()))
data += timestamp + '\n'
nonce_str = ''.join(random.sample(string.ascii_letters + string.digits, 32))
data += nonce_str + '\n'
if req.data:
data += req.data
data += '\n'
signature = self.sign(data)
authorization = ('WECHATPAY2-SHA256-RSA2048 '
'mchid="{0}",nonce_str="{1}",'
'signature="{2}",timestamp="{3}",'
'serial_no="{4}"').format(self.mch_id,
nonce_str,
signature,
timestamp,
self.serial_no)
req.headers["Authorization"] = authorization
req.headers['Content-Type'] = 'application/json'
req.headers['Accept'] = 'application/json'
req.headers['User-Agent'] = 'requests ' + requests.__version__
r = req.prepare()
s = requests.Session()
resp = s.send(r, timeout=2)
# 验签
if not self.ignore_resp_sign:
nonce = resp.headers.get('Wechatpay-Nonce')
signature = resp.headers.get('Wechatpay-Signature')
serial = resp.headers.get('Wechatpay-Serial')
timestamp = resp.headers.get('Wechatpay-Timestamp')
body = resp.text
cer = self.get_certificate_by_serial_no(serial)
ret = self.resp_sign(timestamp=timestamp,
nonce=nonce,
body=body,
cer=cer,
signature=signature)
assert ret is None, "resp sign error"
return resp
# Native下单
def native_pay(self, total, out_trade_no, attach=None):
url = 'https://api.mch.weixin.qq.com/v3/pay/transactions/native'
notify_url = 'https://pay.weixin.com/notify'
data = dict(appid=self.app_id,
mchid=self.mch_id,
description="测试native支付",
notify_url=notify_url,
out_trade_no=out_trade_no,
amount=dict(total=total,
currency='CNY'))
if attach is not None:
data['attach'] = attach
req = requests.Request(method="POST", url=url, data=json.dumps(data))
response = self._auth(req=req)
result = response.json()
return result['code_url']
@staticmethod
def gen_out_trade_no(trade_type="jsapi"):
out_trade_no = "wx" + trade_type + datetime.datetime.now().strftime("%Y%m%d%H%M%S")
out_trade_no += str(random.randint(1000000, 9999999))
return out_trade_no
# JSAPI下单
def jsapi_pay(self, total, out_trade_no, openid, attach=None):
url = 'https://api.mch.weixin.qq.com/v3/pay/transactions/jsapi'
notify_url = 'https://pay.weixin.com/notify'
data = dict(appid=self.app_id,
mchid=self.mch_id,
description="测试jsapi支付",
notify_url=notify_url,
out_trade_no=out_trade_no,
amount=dict(total=total,
currency='CNY'),
payer=dict(openid=openid))
if attach is not None:
data['attach'] = attach
req = requests.Request(method="POST", url=url, data=json.dumps(data))
response = self._auth(req=req)
result = response.json()
return result['prepay_id']
# 查询订单
def query_order(self, out_trade_no=None, transaction_id=None):
if out_trade_no is None and transaction_id is None:
raise ValueError('Param Error')
if out_trade_no is not None:
url = 'https://api.mch.weixin.qq.com/v3/pay/transactions/out-trade-no/{0}'.format(out_trade_no)
else:
url = 'https://api.mch.weixin.qq.com/v3/pay/transactions/id/{0}'.format(out_trade_no)
url += '?' + 'mchid=' + self.mch_id
req = requests.Request(method="GET", url=url)
response = self._auth(req=req)
result = response.json()
return result
# 获取微信平台证书并缓存
def get_certificates_from_wx(self):
# TODO 这里实现非常简陋,鉴于V3版本证书会动态更新建议使用中控服务统一获取和安全存储证书(惰性更新也可)
url = 'https://api.mch.weixin.qq.com/v3/certificates'
req = requests.Request(method="GET", url=url)
response = self._auth(req=req)
result = response.json()
with open(self.certificates_path, 'w+') as f:
f.write(json.dumps(result['data']))
return result
def get_certificate_by_serial_no(self, serial_no):
if not os.path.exists(self.certificates_path):
raise FileNotFoundError('请先下载证书进行缓存')
with open(self.certificates_path, 'r') as f:
content = f.read()
certificates = json.loads(content)
nonce, ciphertext, associated_data = None, None, None
for certificate in certificates:
if certificate['serial_no'] == serial_no:
nonce = certificate['encrypt_certificate']['nonce']
ciphertext = certificate['encrypt_certificate']['ciphertext']
associated_data = certificate['encrypt_certificate']['associated_data']
break
if ciphertext is None:
raise ValueError('certificate not found')
cer = self.decrypt_aes_gcm(nonce=nonce,
ciphertext=ciphertext,
associated_data=associated_data)
return cer
def decrypt_aes_gcm(self, nonce, ciphertext, associated_data):
aes_gcm = AESGCM(self.api_v3_key.encode())
plaintext = aes_gcm.decrypt(nonce=nonce.encode(),
associated_data=associated_data.encode(),
data=base64.b64decode(ciphertext))
return plaintext
# 应答验签
@staticmethod
def resp_sign(timestamp, nonce, body, cer, signature):
cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, cer)
s = '{0}\n{1}\n{2}\n'.format(timestamp, nonce, body)
signature = base64.b64decode(signature)
try:
OpenSSL.crypto.verify(cert=cert, signature=signature, data=s, digest='SHA256')
except Exception as e:
return e.__str__()
return None
# JsApi签名(未验证)
def jsapi_sign(self, prepay_id):
timestamp = int(time.time())
nonce_str = ''.join(random.sample(string.ascii_letters + string.digits, 32))
package = "prepay_id=" + prepay_id
sign_type = 'RSA'
s = "{0}\n{1}\n{2}\n{3}\n".format(self.app_id, timestamp, nonce_str, package)
pay_sign = self.sign(s)
obj = dict(app_id=self.app_id,
timestamp=timestamp,
nonce_str=nonce_str,
package=package,
sign_type=sign_type,
pay_sign=pay_sign)
return obj
验签及支付demo(这里以Flask为例)
WeiXinPayV3/web.py
import json
from flask import Flask, request, jsonify, abort
from api import WxPayV3
app = Flask(__name__)
# 支付结果通知
@app.route('/notify', methods=['POST'])
def notify():
nonce = request.headers.get('Wechatpay-Nonce')
signature = request.headers.get('Wechatpay-Signature')
serial = request.headers.get('Wechatpay-Serial')
timestamp = request.headers.get('Wechatpay-Timestamp')
body = request.get_data(as_text=True)
pay_v3 = WxPayV3()
cer = pay_v3.get_certificate_by_serial_no(serial)
# 支付结果通知签名验证
ret = pay_v3.resp_sign(timestamp=timestamp,
nonce=nonce,
body=body,
cer=cer,
signature=signature)
if ret is not None: # 返回非空则为验签失败
return abort(400)
req_id = request.json.get('id')
create_time = request.json.get('create_time')
resource_type = request.json.get('resource_type')
event_type = request.json.get('event_type')
summary = request.json.get('summary')
resource = request.json.get('resource')
# 解密数据
plaintext = pay_v3.decrypt_aes_gcm(nonce=resource['nonce'],
ciphertext=resource['ciphertext'],
associated_data=resource['associated_data'])
transaction_data = json.loads(plaintext)
transaction_id = transaction_data['transaction_id']
out_trade_no = transaction_data['out_trade_no']
trade_type = transaction_data['trade_type']
trade_state = transaction_data['trade_state']
total = transaction_data['amount']['total']
if event_type == 'TRANSACTION.SUCCESS' and trade_state == "SUCCESS": # 支付成功
pass
elif event_type == 'REFUND.SUCCESS' and trade_state == "SUCCESS": # 退款成功
pass
return jsonify(code="SUCCESS", message="成功")
# Native支付
@app.route('/pay/native', methods=['GET'])
def pay_native():
pay_v3 = WxPayV3()
out_trade_no = pay_v3.gen_out_trade_no(trade_type="native")
code_url = pay_v3.native_pay(total=1, out_trade_no=out_trade_no)
return jsonify(code_url=code_url)
# 小程序/公众号支付
@app.route('/pay/jsapi', methods=['GET'])
def pay_jsapi():
pay_v3 = WxPayV3()
out_trade_no = pay_v3.gen_out_trade_no(trade_type="jsapi")
prepay_id = pay_v3.jsapi_pay(total=1,
out_trade_no=out_trade_no,
openid='oE6E')
obj = pay_v3.jsapi_sign(prepay_id=prepay_id)
return jsonify(obj=obj)
if __name__ == '__main__':
app.run(port=9012)
提醒
1.收到微信支付异步通知需要两个条件:通知地址为HTTPS、在商户中心设置APIv3密钥。
2.对于微信支付平台的应答,需要使用平台证书来进行验签;但平台证书只能通过 获取平台证书接口下载,所以当第一次去获取证书时将ignore_resp_sign设置为True以忽略应答验签。
3.小程序/公众号支付的签名目前并未进行验证。
4.本示例中所有证书(无论是客户端证书还是微信平台证书)均不需要通过命令行OpenSSL进行转换。
赞
python开发者可以使用已经封装好的“微信支付 V3 API Python SDK”
https://github.com/minibear2021/wechatpayv3
请问 这是付款码支付的吗
企业付款到银行卡,获取RSA加密公钥报错
https://fraud.mch.weixin.qq.com/risk/getpublickey
http.client.RemoteDisconnected: Remote end closed connection without response
远程端关闭连接,无响应?
已经开通企业付款到银行卡,IP已设置!为什么会这样
我这里调用显示这样的信息 不能成功拉起支付啊! 能帮我解决一下嘛 麻烦你了