- weixin-java-mp组件简介
- weixin-java-mp组件如何管理公众号账户?
- 基于JVM内存管理公众号账户会有什么问题?
- 什么是Redis的Pub/Sub模式?
- 基于Redis Pub/Sub模式解决数据不一致
- 小结
在当前的互联网环境中,微信公众号已经成为了企业与用户进行互动的重要渠道。然而,随着业务线拓展带来的公众号数据增加和粉丝数量的增长,如何有效地管理多个微信公众号账户数据成为了一个挑战。
本文将介绍一种基于weixin-java-mp组件和Redis Pub/Sub模式的高可用架构下的微信公众号账户动态管理方案。
weixin-java-mp组件简介
weixin-java-mp组件是一个基于Java语言开发的微信公众号开发工具包,是WxJava SDK在微信公众号场景的一个实现。
weixin-java-mp组件它提供了一系列的功能和方法,方便开发者快速集成和使用微信公众号的相关功能。
公众号:的数字化之路WxJava | weixin-java-mp组件核心源码剖析+access_token管理的最佳实践
微信已经提供了功能丰富的公众号管理后台,为什么还要接收用户消息及开发者事件推送呢?
沉淀公众号上的粉丝数据。
有了这些数据再可以结合具体的业务,就可以“长”出有想象力的能力,从而实现更高效、更精细化的运营。
可以实现的能力包含但不限于:
1、数据分析
2、用户画像
3、个性化推送【千人千面】
4、其他定制化功能。譬如在线客服、预约系统等
公众号:的数字化之路微信生态圈 | 想让微信公众号做到“千人千面”?试试接入第三方服务!
weixin-java-mp组件是如何管理公众号账户的?
基于JVM内存管理。
https://www.processon.com/view/65473d0a2a499a6f61d231a9
weixin-java-mp组件使用java Map结构存放公众号账户信息,使用如下所示的单体架构是可以的:
这样搭建的项目只是一个玩具项目,在实际的生产环境,还要考虑更多的东西,譬如稳定性。
基于JVM内存管理公众号账户会有什么问题?
不支持后端服务的水平扩展。一旦流量加,系统的稳定性就会降低,不可用的风险就会增加。
那么,为什么要考虑后端服务的水平扩展呢?
为了提升系统的稳定性!因为来自互联网上的流量总是无法预知的,一旦超出处理能力,系统就崩了。
对一个生产环境的企业应用而言,稳定性是衡量服务质量的一个重要指标。系统都挂了,这个IT系统就不能提供什么价值了。
如果后端服务只有一个,当出现主机宕机或服务夯死的情况,整个系统就无法正常提供服务了。
对产研团队来说,就是出现生产事故了。不管是对公司业务负责,还是作为一个技术人,这种情况怎么可能容忍!
服务的高可用程度必须提升。
什么是高可用,又如何提升系统的高可用呢?
高可用(High Availability,HA)是指在系统发生故障或异常情况时,仍能够保持服务的稳定性和可用性,确保业务系统的持续运行。
什么是高可用五个9?系统的可用时间不能少于99.999%,即在一年中最多只有53分钟的故障时间。这是软件质量中用来衡量可用性的高标准。而相对的,"四个9"(99.99%)则被视为行业平均水平的可用性。
公众号:的数字化之路WxJava | weixin-java-mp组件核心源码剖析+access_token管理的最佳实践
服务的水平扩展先来一波。
将服务水平扩展多个节点,可以有效提升系统的高可用水平。
升级后系统架构如下所示:
https://www.processon.com/view/654b8bc9833c705155bcdcb5
服务水平扩展带来的问题:数据不一致。
如果一个服务有多个实例,如果公众号账户信息的变更没有同步到其它所有的后端服务,这就出现了数据不一致。
如果一位新公众号的回调流量打到没有新数据的后端服务节点上,这个请求是无法正常处理的。如下图所求,管理员新增一个公众号账户,请求落到服务1上,服务1中有了新增的公众号账户。但服务n中并没有。
紧接着去配置新添加公众号的菜单。
如果此请求被域名解析服务调度到服务n,服务n是无法处理的,因为没有相关的配置数据。
那么,这次请求服务端就处理失败了。
要解决这种数据不一致的问题,需要一个进程间通信的能力。
也就是服务1中更新了公众号账户数据,服务2、服务3、服务n会收到通知,然后各自更新内存中的公众号账户的数据。
这是一个典型的Pub/Sub的场景。
那么,使用哪个中间件来发布账户信息变更的事件通知呢?
市面上常用的具备进程间的Pub/Sub机制的中间间有:RocketMQ、RabbitMQ、Redis。
如果系统中已经引入了重量组的消息中心RocketMQ或RabbitMQ,直接使用这些中间件就可以了。
如果业务能接受延迟,使用一个定时任务轮询数据库也可以。
当然,把公众号账户数据直接保存到Redis缓存中也能解决问题,只是weixin-mp-java组件需要改动的地方有点多,工作量大且耗时较长。时间上来不及
本次,就采用Redis的Pub/Sub机制,通过公众号账户变更的事件消息来通知水平扩展的服务刷新JVM内存中的公众号账户数据。
因为项目目前只依赖Redis、数据库,并且公众号账户变更也很低频且数据量不大,不会出现阻塞的问题,使用这个轻量级的进程间通信方案已经满足需求了。
什么是Redis的Pub/Sub机制
Pub/Sub(发布/订阅)是一种消息传递模式,它允许一个或多个订阅者监听一个特定的主题(频道),当有新的消息发布到该主题时,所有订阅者都会收到通知。
这种模式在分布式系统中非常常见,因为它可以解耦生产者和消费者之间的关系,使得系统更加灵活和可扩展。
Redis提供了一组命令可以让开发者实现“发布/订阅”(publish/subscribe)模式,
包括以下几个指令:
PUBLISH、SUBSCRIBE、UNSUBSCRIBE、PSUBSCRIBE、PUNSUBSCRIBE
Redis的Pub/Sub模式可以解决以下问题:
实时消息推送:如新闻更新、股票价格变动等。
事件驱动系统:如用户注册等事件的通知。
公众号:的数字化之路深入理解Redis的Pub/Sub模式
下面来基于Redis的Pub/Sub机制如何来解决高可用环境下不同服务之间因数据变更引发的数据不一致问题。
基于Redis pub/sub模式解决数据不一致
升级后的架构图:
https://www.processon.com/view/654b8bc9833c705155bcdcb5
维护公众号账户的页面:
图很清楚,就不做赘述,直接上代码
依赖的组件及版本号:
<dependency>
<groupId>com.github.binarywang</groupId>
<artifactId>weixin-java-mp</artifactId>
<version>4.5.6.B</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
<version>2.7.16</version>
</dependency>
API接口代码使用业界经典的三层架构。
三层架构:
Controller【表示层】
Service【业务逻辑层】
DAO【数据访问层】
公众号:的数字化之路真理大讨论:Service层的接口是不是多此一举?
核心代码如下【完整源码已上传Gitee,地址详见文末】:
Controller【表示层】: 更新公众号账户信息的后端API:
com.github.niefy.modules.wx.manage.WxAccountManageController#save
先保存,再Pub基于Redis Pub/Sub模式的公众号账户变更事件。
【想一想为什么不把Pub变更事件的代码写到Service层?】
import com.github.niefy.common.utils.R;
import com.github.niefy.modules.wx.config.EventMessageListenerContainerConfig;
import com.github.niefy.modules.wx.entity.WxAccount;
import com.github.niefy.modules.wx.service.WxAccountService;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.apache.shiro.authz.annotation.RequiresPermissions;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.web.bind.annotation.*;
import java.util.Arrays;
import java.util.List;
/**
* 公众号账号
*
* @author niefy
* @date 2020-06-17 13:56:51
*/
@RestController
@RequestMapping("/manage/wxAccount")
@Api(tags = {"公众号账号-管理后台"})
public class WxAccountManageController {
@Autowired
private WxAccountService wxAccountService;
@Autowired
private RedisTemplate<String, Object> redisTemplate;
/**
* 保存
*/
@PostMapping("/save")
@RequiresPermissions("wx:wxaccount:save")
@ApiOperation(value = "保存")
public R save(@RequestBody WxAccount wxAccount){
wxAccountService.saveOrUpdateWxAccount(wxAccount);
redisTemplate.convertAndSend(EventMessageListenerContainerConfig.WX_ACCOUNT_UPDATE, wxAccount.getAppid());
return R.ok();
}
}
Service【业务逻辑层】:
com.github.niefy.modules.wx.service.impl.WxAccountServiceImpl#saveOrUpdateWxAccount
更新公众号账户信息。
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.baomidou.mybatisplus.extension.toolkit.SqlHelper;
import com.github.niefy.common.utils.PageUtils;
import com.github.niefy.common.utils.Query;
import com.github.niefy.modules.wx.dao.WxAccountMapper;
import com.github.niefy.modules.wx.entity.WxAccount;
import com.github.niefy.modules.wx.service.WxAccountService;
import lombok.extern.slf4j.Slf4j;
import me.chanjar.weixin.common.redis.WxRedisOps;
import me.chanjar.weixin.mp.api.WxMpService;
import me.chanjar.weixin.mp.config.impl.WxMpDefaultConfigImpl;
import me.chanjar.weixin.mp.config.impl.WxMpRedisConfigImpl;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
import java.util.Collection;
import java.util.List;
import java.util.Map;
@Service("wxAccountService")
@Slf4j
public class WxAccountServiceImpl extends ServiceImpl<WxAccountMapper, WxAccount> implements WxAccountService {
@Autowired
private WxMpService wxMpService;
@Autowired
private WxRedisOps wxRedisOps;
@Override
public void loadWxMpConfigStorages(){
log.info("加载公众号配置...");
List<WxAccount> accountList = this.list();
if (accountList == null || accountList.isEmpty()) {
log.info("未读取到公众号配置,请在管理后台添加");
return;
}
log.info("加载到{}条公众号配置",accountList.size());
accountList.forEach(this::addAccountToRuntime);
log.info("公众号配置加载完成");
}
@Override
public boolean saveOrUpdateWxAccount(WxAccount entity) {
Assert.notNull(entity, "WxAccount不得为空");
boolean saveOrUpdate = saveOrUpdate(entity);
log.info(" saveOrUpdate {} appid: {} ", saveOrUpdate, entity.getAppid());
this.addAccountToRuntime(entity);
return saveOrUpdate;
}
/**
* 添加账号到当前程序,如首次添加需初始化configStorageMap
* @param entity
*/
private synchronized void addAccountToRuntime(WxAccount entity) {
String appid = entity.getAppid();
WxMpDefaultConfigImpl config = buildWxMpConfigImpl(entity);
wxMpService.addConfigStorage(appid, config);
}
private WxMpDefaultConfigImpl buildWxMpConfigImpl(WxAccount entity) {
WxMpRedisConfigImpl configStorage = new WxMpRedisConfigImpl(wxRedisOps, "v1:wx:mp");
configStorage.setAppId(entity.getAppid());
configStorage.setSecret(entity.getSecret());
configStorage.setToken(entity.getToken());
configStorage.setAesKey(entity.getAesKey());
return configStorage;
}
}
Sub基于Redis的Pub/Sub的公众号账户变更事件
com.github.niefy.modules.wx.config.EventMessageListenerContainerConfig#redisMessageListenerContainer
import com.github.niefy.modules.wx.listener.WxAccountChangedMessageListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.ThreadPoolExecutor;
/**
* @Auther: cheng.tang
* @Date: 2023/11/11
* @Description: wx-api
*/
@Configuration
public class EventMessageListenerContainerConfig {
public static final String WX_ACCOUNT_UPDATE = "event_wx_accounts_changed";
@Autowired
private WxAccountChangedMessageListener wxAccountChangedMessageListener;
@Bean
public TaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(2);
executor.setMaxPoolSize(50);
executor.setQueueCapacity(100);
executor.setKeepAliveSeconds(600);
executor.setThreadNamePrefix("rEvent-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
return executor;
}
@Bean
public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory redisConnectionFactory,
ThreadPoolTaskExecutor threadPoolTaskExecutor) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(redisConnectionFactory);
container.setTaskExecutor(threadPoolTaskExecutor);
container.addMessageListener(wxAccountChangedMessageListener, new ChannelTopic(WX_ACCOUNT_UPDATE));
return container;
}
}
对公众号账户变更事件进行响应:
com.github.niefy.modules.wx.listener.WxAccountChangedMessageListener#onMessage
import com.github.niefy.modules.wx.service.WxAccountService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Service;
/**
* @Auther: cheng.tang
* @Date: 2023/11/11
* @Description: wx-api
*/
@Service
@Slf4j
public class WxAccountChangedMessageListener implements MessageListener {
@Autowired
private WxAccountService wxAccountService;
@Override
public void onMessage(Message message, byte[] pattern) {
log.info("receiving channel {} body {} pattern {} ", new String(message.getChannel()), new String(message.getBody()), new String(pattern));
try {
wxAccountService.loadWxMpConfigStorages();
log.info("finish ");
} catch (Exception e) {
log.error("消息处理失败了 {} ", e.getMessage());
}
}
}
运行后的效果:
小结
总的来说,虽然Redis的Pub/Sub模式可能会导致一些额外的操作,但是它提供了一种轻量级的进程间通信方案,可以有效地解决高可用架构下的微信公众号账户数据更新问题。
而且,由于微信公众号账户数据的更新并不频繁,我们可以容忍这种短时间的错误。
因此,基于Redis Pub/Sub模式的高可用架构下的微信公众号账户动态管理方案是一种值得考虑的选择。
Reference
源码:https://gitee.com/baidumap/wx-api
https://www.processon.com/view/654b8bc9833c705155bcdcb5
https://mp.weixin.qq.com/s/Xu_1ZlFkJHjEOcwu1CkUrA