From 185df6e16d1eabf982a9b9eeab42b1ff2c2ab541 Mon Sep 17 00:00:00 2001 From: heiheihei <36981492+biubiubiu3971@users.noreply.github.com> Date: Sun, 12 Mar 2023 20:55:47 +0800 Subject: [PATCH] =?UTF-8?q?:art:=20=E6=B7=BB=E5=8A=A0redis=E5=BE=AE?= =?UTF-8?q?=E4=BF=A1=E6=B6=88=E6=81=AF=E9=87=8D=E5=A4=8D=E6=A3=80=E6=9F=A5?= =?UTF-8?q?=E7=9B=B8=E5=85=B3=E7=B1=BB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../api/WxMessageInRedisDuplicateChecker.java | 47 +++++++++++++++ .../WxMessageInRedisDuplicateCheckerTest.java | 57 +++++++++++++++++++ .../weixin/mp/api/WxMpMessageRouter.java | 5 ++ 3 files changed, 109 insertions(+) create mode 100644 weixin-java-common/src/main/java/me/chanjar/weixin/common/api/WxMessageInRedisDuplicateChecker.java create mode 100644 weixin-java-common/src/test/java/me/chanjar/weixin/common/api/WxMessageInRedisDuplicateCheckerTest.java diff --git a/weixin-java-common/src/main/java/me/chanjar/weixin/common/api/WxMessageInRedisDuplicateChecker.java b/weixin-java-common/src/main/java/me/chanjar/weixin/common/api/WxMessageInRedisDuplicateChecker.java new file mode 100644 index 000000000..88c5e9a4e --- /dev/null +++ b/weixin-java-common/src/main/java/me/chanjar/weixin/common/api/WxMessageInRedisDuplicateChecker.java @@ -0,0 +1,47 @@ +package me.chanjar.weixin.common.api; + +import lombok.RequiredArgsConstructor; +import org.redisson.api.RBucket; +import org.redisson.api.RedissonClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.TimeUnit; + +/** + * 利用redis检查消息是否重复 + * + */ +@RequiredArgsConstructor +public class WxMessageInRedisDuplicateChecker implements WxMessageDuplicateChecker { + + /** + * 过期时间 + */ + private int expire = 10; + + private final Logger log = LoggerFactory.getLogger(getClass()); + + private final RedissonClient redissonClient; + + /** + * messageId是否重复 + * + * @param messageId messageId + * @return 是否 + */ + @Override + public boolean isDuplicate(String messageId) { + RBucket r = redissonClient.getBucket("wx:message:duplicate:check:" + messageId); + boolean setSuccess = r.trySet("1", expire, TimeUnit.SECONDS); + return !setSuccess; + } + + public int getExpire() { + return expire; + } + + public void setExpire(int expire) { + this.expire = expire; + } +} diff --git a/weixin-java-common/src/test/java/me/chanjar/weixin/common/api/WxMessageInRedisDuplicateCheckerTest.java b/weixin-java-common/src/test/java/me/chanjar/weixin/common/api/WxMessageInRedisDuplicateCheckerTest.java new file mode 100644 index 000000000..382618862 --- /dev/null +++ b/weixin-java-common/src/test/java/me/chanjar/weixin/common/api/WxMessageInRedisDuplicateCheckerTest.java @@ -0,0 +1,57 @@ +package me.chanjar.weixin.common.api; + +import org.redisson.Redisson; +import org.redisson.api.RedissonClient; +import org.redisson.config.Config; +import org.redisson.config.TransportMode; +import org.testng.annotations.BeforeTest; +import org.testng.annotations.Test; + +import java.util.concurrent.TimeUnit; + +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; + +@Test +public class WxMessageInRedisDuplicateCheckerTest { + + private RedissonClient redissonClient; + + @BeforeTest + public void init() { + Config config = new Config(); + config.useSingleServer().setAddress("redis://127.0.0.1:6379"); + config.setTransportMode(TransportMode.NIO); + this.redissonClient = Redisson.create(config); + checker = new WxMessageInRedisDuplicateChecker(redissonClient); + checker.setExpire(2); + } + + private WxMessageInRedisDuplicateChecker checker; + + public void test() throws InterruptedException { + Long[] msgIds = new Long[]{1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L}; + + // 第一次检查 + for (Long msgId : msgIds) { + boolean result = checker.isDuplicate(String.valueOf(msgId)); + assertFalse(result); + } + + // 过1秒再检查 + TimeUnit.SECONDS.sleep(1); + for (Long msgId : msgIds) { + boolean result = checker.isDuplicate(String.valueOf(msgId)); + assertTrue(result); + } + + // 过1.5秒再检查 + TimeUnit.MILLISECONDS.sleep(1500L); + for (Long msgId : msgIds) { + boolean result = checker.isDuplicate(String.valueOf(msgId)); + assertFalse(result); + } + + } + +} diff --git a/weixin-java-mp/src/main/java/me/chanjar/weixin/mp/api/WxMpMessageRouter.java b/weixin-java-mp/src/main/java/me/chanjar/weixin/mp/api/WxMpMessageRouter.java index e55e49909..6d8ab429f 100644 --- a/weixin-java-mp/src/main/java/me/chanjar/weixin/mp/api/WxMpMessageRouter.java +++ b/weixin-java-mp/src/main/java/me/chanjar/weixin/mp/api/WxMpMessageRouter.java @@ -14,10 +14,12 @@ import me.chanjar.weixin.common.session.WxSessionManager; import me.chanjar.weixin.common.util.LogExceptionHandler; import me.chanjar.weixin.mp.bean.message.WxMpXmlMessage; import me.chanjar.weixin.mp.bean.message.WxMpXmlOutMessage; +import me.chanjar.weixin.mp.util.WxMpConfigStorageHolder; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.xml.ws.Holder; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -208,11 +210,14 @@ public class WxMpMessageRouter { WxMpXmlOutMessage res = null; final List> futures = new ArrayList<>(); + String appId = WxMpConfigStorageHolder.get(); for (final WxMpMessageRouterRule rule : matchRules) { // 返回最后一个非异步的rule的执行结果 if (rule.isAsync()) { futures.add( this.executorService.submit(() -> { + //传入父线程的appId + this.wxMpService.switchoverTo(appId); rule.service(wxMessage, context, mpService, WxMpMessageRouter.this.sessionManager, WxMpMessageRouter.this.exceptionHandler); }) );