issue #66 对去重逻辑做了一些调整

This commit is contained in:
Daniel Qian
2015-01-22 14:06:15 +08:00
parent 3ff9d260f2
commit 3216cb54dd
5 changed files with 55 additions and 13 deletions

View File

@@ -9,10 +9,19 @@ package me.chanjar.weixin.common.util;
public interface WxMessageDuplicateChecker { public interface WxMessageDuplicateChecker {
/** /**
* 检查消息ID是否重复 * <h2>公众号的排重方式</h2>
* @param wxMsgId *
* <p>普通消息关于重试的消息排重推荐使用msgid排重。<a href="http://mp.weixin.qq.com/wiki/10/79502792eef98d6e0c6e1739da387346.html">文档参考</a>。</p>
* <p>事件消息关于重试的消息排重推荐使用FromUserName + CreateTime 排重。<a href="http://mp.weixin.qq.com/wiki/2/5baf56ce4947d35003b86a9805634b1e.html">文档参考</a></p>
*
* <h2>企业号的排重方式</h2>
*
* 官方文档完全没有写,参照公众号的方式排重。
*
* <p>或者可以采取更简单的方式如果有MsgId就用MsgId排重如果没有就用FromUserName+CreateTime排重</p>
* @param messageId messageId需要根据上面讲的方式构造
* @return 如果是重复消息返回true否则返回false * @return 如果是重复消息返回true否则返回false
*/ */
public boolean isDuplicate(Long wxMsgId); public boolean isDuplicate(String messageId);
} }

View File

@@ -25,7 +25,7 @@ public class WxMessageInMemoryDuplicateChecker implements WxMessageDuplicateChec
/** /**
* 消息id->消息时间戳的map * 消息id->消息时间戳的map
*/ */
private final ConcurrentHashMap<Long, Long> msgId2Timestamp = new ConcurrentHashMap<Long, Long>(); private final ConcurrentHashMap<String, Long> msgId2Timestamp = new ConcurrentHashMap<String, Long>();
/** /**
* 后台清理线程是否已经开启 * 后台清理线程是否已经开启
@@ -65,7 +65,7 @@ public class WxMessageInMemoryDuplicateChecker implements WxMessageDuplicateChec
while (true) { while (true) {
Thread.sleep(clearPeriod); Thread.sleep(clearPeriod);
Long now = System.currentTimeMillis(); Long now = System.currentTimeMillis();
for (Map.Entry<Long, Long> entry : msgId2Timestamp.entrySet()) { for (Map.Entry<String, Long> entry : msgId2Timestamp.entrySet()) {
if (now - entry.getValue() > timeToLive) { if (now - entry.getValue() > timeToLive) {
msgId2Timestamp.entrySet().remove(entry); msgId2Timestamp.entrySet().remove(entry);
} }
@@ -81,12 +81,12 @@ public class WxMessageInMemoryDuplicateChecker implements WxMessageDuplicateChec
} }
@Override @Override
public boolean isDuplicate(Long wxMsgId) { public boolean isDuplicate(String messageId) {
if (wxMsgId == null) { if (messageId == null) {
return false; return false;
} }
checkBackgroundProcessStarted(); checkBackgroundProcessStarted();
Long timestamp = msgId2Timestamp.putIfAbsent(wxMsgId, System.currentTimeMillis()); Long timestamp = msgId2Timestamp.putIfAbsent(messageId, System.currentTimeMillis());
if (timestamp == null) { if (timestamp == null) {
// 第一次接收到这个消息 // 第一次接收到这个消息
return false; return false;

View File

@@ -12,21 +12,21 @@ public class WxMessageInMemoryDuplicateCheckerTest {
// 第一次检查 // 第一次检查
for (Long msgId : msgIds) { for (Long msgId : msgIds) {
boolean result = checker.isDuplicate(msgId); boolean result = checker.isDuplicate(String.valueOf(msgId));
Assert.assertFalse(result); Assert.assertFalse(result);
} }
// 过1秒再检查 // 过1秒再检查
Thread.sleep(1000l); Thread.sleep(1000l);
for (Long msgId : msgIds) { for (Long msgId : msgIds) {
boolean result = checker.isDuplicate(msgId); boolean result = checker.isDuplicate(String.valueOf(msgId));
Assert.assertTrue(result); Assert.assertTrue(result);
} }
// 过1.5秒再检查 // 过1.5秒再检查
Thread.sleep(1500l); Thread.sleep(1500l);
for (Long msgId : msgIds) { for (Long msgId : msgIds) {
boolean result = checker.isDuplicate(msgId); boolean result = checker.isDuplicate(String.valueOf(msgId));
Assert.assertFalse(result); Assert.assertFalse(result);
} }

View File

@@ -115,7 +115,7 @@ public class WxCpMessageRouter {
* @param wxMessage * @param wxMessage
*/ */
public WxCpXmlOutMessage route(final WxCpXmlMessage wxMessage) { public WxCpXmlOutMessage route(final WxCpXmlMessage wxMessage) {
if (messageDuplicateChecker.isDuplicate(wxMessage.getMsgId())) { if (isDuplicateMessage(wxMessage)) {
// 如果是重复消息,那么就不做处理 // 如果是重复消息,那么就不做处理
return null; return null;
} }
@@ -177,6 +177,22 @@ public class WxCpMessageRouter {
return res; return res;
} }
protected boolean isDuplicateMessage(WxCpXmlMessage wxMessage) {
String messageId = "";
if (wxMessage.getMsgId() == null) {
messageId = wxMessage.getFromUserName() + "-" + String.valueOf(wxMessage.getCreateTime());
} else {
messageId = String.valueOf(wxMessage.getMsgId());
}
if (messageDuplicateChecker.isDuplicate(messageId)) {
return true;
}
return false;
}
/** /**
* 对session的访问结束 * 对session的访问结束
* @param wxMessage * @param wxMessage

View File

@@ -1,5 +1,6 @@
package me.chanjar.weixin.mp.api; package me.chanjar.weixin.mp.api;
import me.chanjar.weixin.common.api.WxConsts;
import me.chanjar.weixin.common.session.*; import me.chanjar.weixin.common.session.*;
import me.chanjar.weixin.common.util.WxMessageDuplicateChecker; import me.chanjar.weixin.common.util.WxMessageDuplicateChecker;
import me.chanjar.weixin.common.util.WxMessageInMemoryDuplicateChecker; import me.chanjar.weixin.common.util.WxMessageInMemoryDuplicateChecker;
@@ -113,7 +114,7 @@ public class WxMpMessageRouter {
* @param wxMessage * @param wxMessage
*/ */
public WxMpXmlOutMessage route(final WxMpXmlMessage wxMessage) { public WxMpXmlOutMessage route(final WxMpXmlMessage wxMessage) {
if (messageDuplicateChecker.isDuplicate(wxMessage.getMsgId())) { if (isDuplicateMessage(wxMessage)) {
// 如果是重复消息,那么就不做处理 // 如果是重复消息,那么就不做处理
return null; return null;
} }
@@ -175,6 +176,22 @@ public class WxMpMessageRouter {
return res; return res;
} }
protected boolean isDuplicateMessage(WxMpXmlMessage wxMessage) {
String messageId = "";
if (wxMessage.getMsgId() == null) {
messageId = wxMessage.getFromUserName() + "-" + String.valueOf(wxMessage.getCreateTime());
} else {
messageId = String.valueOf(wxMessage.getMsgId());
}
if (messageDuplicateChecker.isDuplicate(messageId)) {
return true;
}
return false;
}
/** /**
* 对session的访问结束 * 对session的访问结束
* @param wxMessage * @param wxMessage