评论

Python3对接微信支付V3版核心代码分享

声明:此示例仅支持直连商户

所需库

WeiXinPayV3/requirement.txt

Flask==1.0.2
pyOpenSSL==20.0.1
cryptography==3.4.7
requests==2.25.1

核心API对接

WeiXinPayV3/api.py

import datetime
import json
import os
import random
import string
import time
import OpenSSL
import base64
import requests
from urllib.parse import urlparse
from cryptography.hazmat.primitives.ciphers.aead import AESGCM


# V3版支付
class WxPayV3(object):
    basedir = os.path.abspath(os.path.dirname(__file__))  # 获取当前目录

    def __init__(self, ignore_resp_sign=False):
        """
        :param ignore_resp_sign: 是否忽略应答验签(用于第一次缓存证书)
        """
        self.app_id = '{app_id}'
        self.mch_id = '{mch_id}'
        self.api_v3_key = '{api_v3_key}'
        self.serial_no = '{serial_no}'  # 证书序列号(微信支付商户平台获取)
        self.ignore_resp_sign = ignore_resp_sign
        self.certificates_path = self.basedir + '/certificates.json'

    # 支付签名
    @staticmethod
    def sign(s):
        # 这里的key为示例
        api_client_key = ("-----BEGIN PRIVATE KEY-----\n"
                          "MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQDAPM45XP7PC6qe\n"
                          "MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQDAPM45XP7PC6qe\n"
                          "Kkh/7UGUD/hxkPwNB2PKAms=\n"
                          "-----END PRIVATE KEY-----")
        pkey = OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, api_client_key)
        signature = base64.b64encode(OpenSSL.crypto.sign(pkey=pkey, data=s.encode(), digest='SHA256'))
        return signature.decode()

    # 请求签名
    def _auth(self, req):
        data = req.method + '\n'
        parsed = urlparse(req.url)
        data += parsed.path
        if parsed.query:
            data += "?" + parsed.query
        data += '\n'
        timestamp = str(int(time.time()))
        data += timestamp + '\n'
        nonce_str = ''.join(random.sample(string.ascii_letters + string.digits, 32))
        data += nonce_str + '\n'
        if req.data:
            data += req.data
        data += '\n'
        signature = self.sign(data)
        authorization = ('WECHATPAY2-SHA256-RSA2048 '
                         'mchid="{0}",nonce_str="{1}",'
                         'signature="{2}",timestamp="{3}",'
                         'serial_no="{4}"').format(self.mch_id,
                                                   nonce_str,
                                                   signature,
                                                   timestamp,
                                                   self.serial_no)
        req.headers["Authorization"] = authorization
        req.headers['Content-Type'] = 'application/json'
        req.headers['Accept'] = 'application/json'
        req.headers['User-Agent'] = 'requests ' + requests.__version__
        r = req.prepare()
        s = requests.Session()
        resp = s.send(r, timeout=2)
        # 验签
        if not self.ignore_resp_sign:
            nonce = resp.headers.get('Wechatpay-Nonce')
            signature = resp.headers.get('Wechatpay-Signature')
            serial = resp.headers.get('Wechatpay-Serial')
            timestamp = resp.headers.get('Wechatpay-Timestamp')
            body = resp.text
            cer = self.get_certificate_by_serial_no(serial)
            ret = self.resp_sign(timestamp=timestamp,
                                 nonce=nonce,
                                 body=body,
                                 cer=cer,
                                 signature=signature)
            assert ret is None, "resp sign error"
        return resp

    # Native下单
    def native_pay(self, total, out_trade_no, attach=None):
        url = 'https://api.mch.weixin.qq.com/v3/pay/transactions/native'
        notify_url = 'https://pay.weixin.com/notify'
        data = dict(appid=self.app_id,
                    mchid=self.mch_id,
                    description="测试native支付",
                    notify_url=notify_url,
                    out_trade_no=out_trade_no,
                    amount=dict(total=total,
                                currency='CNY'))
        if attach is not None:
            data['attach'] = attach
        req = requests.Request(method="POST", url=url, data=json.dumps(data))
        response = self._auth(req=req)
        result = response.json()
        return result['code_url']

    @staticmethod
    def gen_out_trade_no(trade_type="jsapi"):
        out_trade_no = "wx" + trade_type + datetime.datetime.now().strftime("%Y%m%d%H%M%S")
        out_trade_no += str(random.randint(1000000, 9999999))
        return out_trade_no

    # JSAPI下单
    def jsapi_pay(self, total, out_trade_no, openid, attach=None):
        url = 'https://api.mch.weixin.qq.com/v3/pay/transactions/jsapi'
        notify_url = 'https://pay.weixin.com/notify'
        data = dict(appid=self.app_id,
                    mchid=self.mch_id,
                    description="测试jsapi支付",
                    notify_url=notify_url,
                    out_trade_no=out_trade_no,
                    amount=dict(total=total,
                                currency='CNY'),
                    payer=dict(openid=openid))
        if attach is not None:
            data['attach'] = attach
        req = requests.Request(method="POST", url=url, data=json.dumps(data))
        response = self._auth(req=req)
        result = response.json()
        return result['prepay_id']

    # 查询订单
    def query_order(self, out_trade_no=None, transaction_id=None):
        if out_trade_no is None and transaction_id is None:
            raise ValueError('Param Error')
        if out_trade_no is not None:
            url = 'https://api.mch.weixin.qq.com/v3/pay/transactions/out-trade-no/{0}'.format(out_trade_no)
        else:
            url = 'https://api.mch.weixin.qq.com/v3/pay/transactions/id/{0}'.format(out_trade_no)
        url += '?' + 'mchid=' + self.mch_id
        req = requests.Request(method="GET", url=url)
        response = self._auth(req=req)
        result = response.json()
        return result

    # 获取微信平台证书并缓存
    def get_certificates_from_wx(self):
        # TODO 这里实现非常简陋,鉴于V3版本证书会动态更新建议使用中控服务统一获取和安全存储证书(惰性更新也可)
        url = 'https://api.mch.weixin.qq.com/v3/certificates'
        req = requests.Request(method="GET", url=url)
        response = self._auth(req=req)
        result = response.json()
        with open(self.certificates_path, 'w+') as f:
            f.write(json.dumps(result['data']))
        return result

    def get_certificate_by_serial_no(self, serial_no):
        if not os.path.exists(self.certificates_path):
            raise FileNotFoundError('请先下载证书进行缓存')
        with open(self.certificates_path, 'r') as f:
            content = f.read()
        certificates = json.loads(content)
        nonce, ciphertext, associated_data = None, None, None
        for certificate in certificates:
            if certificate['serial_no'] == serial_no:
                nonce = certificate['encrypt_certificate']['nonce']
                ciphertext = certificate['encrypt_certificate']['ciphertext']
                associated_data = certificate['encrypt_certificate']['associated_data']
                break
        if ciphertext is None:
            raise ValueError('certificate not found')
        cer = self.decrypt_aes_gcm(nonce=nonce,
                                   ciphertext=ciphertext,
                                   associated_data=associated_data)
        return cer

    def decrypt_aes_gcm(self, nonce, ciphertext, associated_data):
        aes_gcm = AESGCM(self.api_v3_key.encode())
        plaintext = aes_gcm.decrypt(nonce=nonce.encode(),
                                    associated_data=associated_data.encode(),
                                    data=base64.b64decode(ciphertext))
        return plaintext

    # 应答验签
    @staticmethod
    def resp_sign(timestamp, nonce, body, cer, signature):
        cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, cer)
        s = '{0}\n{1}\n{2}\n'.format(timestamp, nonce, body)
        signature = base64.b64decode(signature)
        try:
            OpenSSL.crypto.verify(cert=cert, signature=signature, data=s, digest='SHA256')
        except Exception as e:
            return e.__str__()
        return None

    # JsApi签名(未验证)
    def jsapi_sign(self, prepay_id):
        timestamp = int(time.time())
        nonce_str = ''.join(random.sample(string.ascii_letters + string.digits, 32))
        package = "prepay_id=" + prepay_id
        sign_type = 'RSA'
        s = "{0}\n{1}\n{2}\n{3}\n".format(self.app_id, timestamp, nonce_str, package)
        pay_sign = self.sign(s)
        obj = dict(app_id=self.app_id,
                   timestamp=timestamp,
                   nonce_str=nonce_str,
                   package=package,
                   sign_type=sign_type,
                   pay_sign=pay_sign)
        return obj

验签及支付demo(这里以Flask为例)

WeiXinPayV3/web.py

import json

from flask import Flask, request, jsonify, abort

from api import WxPayV3

app = Flask(__name__)


# 支付结果通知
@app.route('/notify', methods=['POST'])
def notify():
    nonce = request.headers.get('Wechatpay-Nonce')
    signature = request.headers.get('Wechatpay-Signature')
    serial = request.headers.get('Wechatpay-Serial')
    timestamp = request.headers.get('Wechatpay-Timestamp')
    body = request.get_data(as_text=True)
    pay_v3 = WxPayV3()
    cer = pay_v3.get_certificate_by_serial_no(serial)
    # 支付结果通知签名验证
    ret = pay_v3.resp_sign(timestamp=timestamp,
                           nonce=nonce,
                           body=body,
                           cer=cer,
                           signature=signature)
    if ret is not None:  # 返回非空则为验签失败
        return abort(400)
    req_id = request.json.get('id')
    create_time = request.json.get('create_time')
    resource_type = request.json.get('resource_type')
    event_type = request.json.get('event_type')
    summary = request.json.get('summary')
    resource = request.json.get('resource')
    # 解密数据
    plaintext = pay_v3.decrypt_aes_gcm(nonce=resource['nonce'],
                                       ciphertext=resource['ciphertext'],
                                       associated_data=resource['associated_data'])
    transaction_data = json.loads(plaintext)
    transaction_id = transaction_data['transaction_id']
    out_trade_no = transaction_data['out_trade_no']
    trade_type = transaction_data['trade_type']
    trade_state = transaction_data['trade_state']
    total = transaction_data['amount']['total']
    if event_type == 'TRANSACTION.SUCCESS' and trade_state == "SUCCESS":  # 支付成功
        pass
    elif event_type == 'REFUND.SUCCESS' and trade_state == "SUCCESS":  # 退款成功
        pass
    return jsonify(code="SUCCESS", message="成功")


# Native支付
@app.route('/pay/native', methods=['GET'])
def pay_native():
    pay_v3 = WxPayV3()
    out_trade_no = pay_v3.gen_out_trade_no(trade_type="native")
    code_url = pay_v3.native_pay(total=1, out_trade_no=out_trade_no)
    return jsonify(code_url=code_url)


# 小程序/公众号支付
@app.route('/pay/jsapi', methods=['GET'])
def pay_jsapi():
    pay_v3 = WxPayV3()
    out_trade_no = pay_v3.gen_out_trade_no(trade_type="jsapi")
    prepay_id = pay_v3.jsapi_pay(total=1,
                                 out_trade_no=out_trade_no,
                                 openid='oE6E')
    obj = pay_v3.jsapi_sign(prepay_id=prepay_id)
    return jsonify(obj=obj)


if __name__ == '__main__':
    app.run(port=9012)

提醒

1.收到微信支付异步通知需要两个条件:通知地址为HTTPS、在商户中心设置APIv3密钥。

2.对于微信支付平台的应答,需要使用平台证书来进行验签;但平台证书只能通过 获取平台证书接口下载,所以当第一次去获取证书时将ignore_resp_sign设置为True以忽略应答验签。

3.小程序/公众号支付的签名目前并未进行验证。

4.本示例中所有证书(无论是客户端证书还是微信平台证书)均不需要通过命令行OpenSSL进行转换。

最后一次编辑于  2021-06-10  
点赞 4
收藏
评论

5 个评论

  • 陈久胜
    陈久胜
    2021-06-01

    2021-06-01
    赞同 1
    回复
  • 陈刚
    陈刚
    2021-09-24

    python开发者可以使用已经封装好的“微信支付 V3 API Python SDK”

    https://github.com/minibear2021/wechatpayv3

    2021-09-24
    赞同 1
    回复
  • 祖国的花朵🌺 จุ๊บ
    祖国的花朵🌺 จุ๊บ
    2021-06-24

    请问 这是付款码支付的吗

    2021-06-24
    赞同 1
    回复
  • A_jree
    A_jree
    2021-12-24

    企业付款到银行卡,获取RSA加密公钥报错

    https://fraud.mch.weixin.qq.com/risk/getpublickey

    http.client.RemoteDisconnected: Remote end closed connection without response

    远程端关闭连接,无响应?

    已经开通企业付款到银行卡,IP已设置!为什么会这样

    2021-12-24
    赞同
    回复
  • 茗
    2021-11-04

    我这里调用显示这样的信息 不能成功拉起支付啊! 能帮我解决一下嘛 麻烦你了

    2021-11-04
    赞同
    回复
登录 后发表内容