This commit is contained in:
Daniel Qian
2015-01-21 19:02:47 +08:00
parent 4accbe6ec2
commit 7184711301
8 changed files with 54 additions and 44 deletions

View File

@@ -6,7 +6,7 @@ package me.chanjar.weixin.common.util;
* 微信服务器在五秒内收不到响应会断掉连接并且重新发起请求总共重试三次 * 微信服务器在五秒内收不到响应会断掉连接并且重新发起请求总共重试三次
* </pre> * </pre>
*/ */
public interface WxMsgIdDuplicateChecker { public interface WxMessageDuplicateChecker {
/** /**
* 检查消息ID是否重复 * 检查消息ID是否重复

View File

@@ -10,7 +10,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
* 将每个消息id保存在内存里每隔5秒清理已经过期的消息id每个消息id的过期时间是15秒 * 将每个消息id保存在内存里每隔5秒清理已经过期的消息id每个消息id的过期时间是15秒
* </pre> * </pre>
*/ */
public class WxMsgIdMemoryDuplicateChecker implements WxMsgIdDuplicateChecker { public class WxMessageInMemoryDuplicateChecker implements WxMessageDuplicateChecker {
/** /**
* 一个消息ID在内存的过期时间15秒 * 一个消息ID在内存的过期时间15秒
@@ -39,7 +39,7 @@ public class WxMsgIdMemoryDuplicateChecker implements WxMsgIdDuplicateChecker {
* 每隔多少周期检查消息ID是否过期5秒 * 每隔多少周期检查消息ID是否过期5秒
* </pre> * </pre>
*/ */
public WxMsgIdMemoryDuplicateChecker() { public WxMessageInMemoryDuplicateChecker() {
this.timeToLive = 15 * 1000l; this.timeToLive = 15 * 1000l;
this.clearPeriod = 5 * 1000l; this.clearPeriod = 5 * 1000l;
} }
@@ -49,7 +49,7 @@ public class WxMsgIdMemoryDuplicateChecker implements WxMsgIdDuplicateChecker {
* @param timeToLive 一个消息ID在内存的过期时间毫秒 * @param timeToLive 一个消息ID在内存的过期时间毫秒
* @param clearPeriod 每隔多少周期检查消息ID是否过期毫秒 * @param clearPeriod 每隔多少周期检查消息ID是否过期毫秒
*/ */
public WxMsgIdMemoryDuplicateChecker(Long timeToLive, Long clearPeriod) { public WxMessageInMemoryDuplicateChecker(Long timeToLive, Long clearPeriod) {
this.timeToLive = timeToLive; this.timeToLive = timeToLive;
this.clearPeriod = clearPeriod; this.clearPeriod = clearPeriod;
} }

View File

@@ -4,11 +4,11 @@ import org.testng.Assert;
import org.testng.annotations.Test; import org.testng.annotations.Test;
@Test @Test
public class WxMsgIdMemoryDuplicateCheckerTest { public class WxMessageInMemoryDuplicateCheckerTest {
public void test() throws InterruptedException { public void test() throws InterruptedException {
Long[] msgIds = new Long[] { 1l, 2l, 3l, 4l, 5l, 6l, 7l, 8l }; Long[] msgIds = new Long[] { 1l, 2l, 3l, 4l, 5l, 6l, 7l, 8l };
WxMsgIdMemoryDuplicateChecker checker = new WxMsgIdMemoryDuplicateChecker(2000l, 1000l); WxMessageInMemoryDuplicateChecker checker = new WxMessageInMemoryDuplicateChecker(2000l, 1000l);
// 第一次检查 // 第一次检查
for (Long msgId : msgIds) { for (Long msgId : msgIds) {

View File

@@ -7,7 +7,7 @@
<class name="me.chanjar.weixin.common.bean.WxErrorTest"/> <class name="me.chanjar.weixin.common.bean.WxErrorTest"/>
<class name="me.chanjar.weixin.common.bean.WxMenuTest"/> <class name="me.chanjar.weixin.common.bean.WxMenuTest"/>
<class name="me.chanjar.weixin.common.util.crypto.WxCryptUtilTest"/> <class name="me.chanjar.weixin.common.util.crypto.WxCryptUtilTest"/>
<class name="me.chanjar.weixin.common.util.WxMsgIdMemoryDuplicateCheckerTest"/> <class name="me.chanjar.weixin.common.util.WxMessageInMemoryDuplicateCheckerTest"/>
</classes> </classes>
</test> </test>
</suite> </suite>

View File

@@ -37,11 +37,11 @@ public class WxCpInMemoryConfigStorage implements WxCpConfigStorage {
this.expiresTime = 0; this.expiresTime = 0;
} }
public void updateAccessToken(WxAccessToken accessToken) { public synchronized void updateAccessToken(WxAccessToken accessToken) {
updateAccessToken(accessToken.getAccessToken(), accessToken.getExpiresIn()); updateAccessToken(accessToken.getAccessToken(), accessToken.getExpiresIn());
} }
public void updateAccessToken(String accessToken, int expiresInSeconds) { public synchronized void updateAccessToken(String accessToken, int expiresInSeconds) {
this.accessToken = accessToken; this.accessToken = accessToken;
this.expiresTime = System.currentTimeMillis() + (expiresInSeconds - 200) * 1000l; this.expiresTime = System.currentTimeMillis() + (expiresInSeconds - 200) * 1000l;
} }

View File

@@ -1,7 +1,7 @@
package me.chanjar.weixin.cp.api; package me.chanjar.weixin.cp.api;
import me.chanjar.weixin.common.util.WxMsgIdDuplicateChecker; import me.chanjar.weixin.common.util.WxMessageDuplicateChecker;
import me.chanjar.weixin.common.util.WxMsgIdMemoryDuplicateChecker; import me.chanjar.weixin.common.util.WxMessageInMemoryDuplicateChecker;
import me.chanjar.weixin.cp.bean.WxCpXmlMessage; import me.chanjar.weixin.cp.bean.WxCpXmlMessage;
import me.chanjar.weixin.cp.bean.WxCpXmlOutMessage; import me.chanjar.weixin.cp.bean.WxCpXmlOutMessage;
@@ -43,7 +43,7 @@ import java.util.regex.Pattern;
*/ */
public class WxCpMessageRouter { public class WxCpMessageRouter {
private static final int DEFAULT_THREAD_POOL_SIZE = 20; private static final int DEFAULT_THREAD_POOL_SIZE = 100;
private final List<Rule> rules = new ArrayList<Rule>(); private final List<Rule> rules = new ArrayList<Rule>();
@@ -51,18 +51,12 @@ public class WxCpMessageRouter {
private ExecutorService executorService; private ExecutorService executorService;
private WxMsgIdDuplicateChecker wxMsgIdDuplicateChecker; private WxMessageDuplicateChecker wxMessageDuplicateChecker;
public WxCpMessageRouter(WxCpService wxCpService) { public WxCpMessageRouter(WxCpService wxCpService) {
this.wxCpService = wxCpService; this.wxCpService = wxCpService;
this.executorService = Executors.newFixedThreadPool(DEFAULT_THREAD_POOL_SIZE); this.executorService = Executors.newFixedThreadPool(DEFAULT_THREAD_POOL_SIZE);
this.wxMsgIdDuplicateChecker = new WxMsgIdMemoryDuplicateChecker(); this.wxMessageDuplicateChecker = new WxMessageInMemoryDuplicateChecker();
}
public WxCpMessageRouter(WxCpService wxMpService, int threadPoolSize) {
this.wxCpService = wxMpService;
this.executorService = Executors.newFixedThreadPool(threadPoolSize);
this.wxMsgIdDuplicateChecker = new WxMsgIdMemoryDuplicateChecker();
} }
/** /**
@@ -75,10 +69,10 @@ public class WxCpMessageRouter {
/** /**
* 设置自定义的WxMsgIdDuplicateChecker * 设置自定义的WxMsgIdDuplicateChecker
* @param wxMsgIdDuplicateChecker * @param wxMessageDuplicateChecker
*/ */
public void setWxMsgIdDuplicateChecker(WxMsgIdDuplicateChecker wxMsgIdDuplicateChecker) { public void setWxMessageDuplicateChecker(WxMessageDuplicateChecker wxMessageDuplicateChecker) {
this.wxMsgIdDuplicateChecker = wxMsgIdDuplicateChecker; this.wxMessageDuplicateChecker = wxMessageDuplicateChecker;
} }
/** /**
@@ -94,7 +88,7 @@ public class WxCpMessageRouter {
* @param wxMessage * @param wxMessage
*/ */
public WxCpXmlOutMessage route(final WxCpXmlMessage wxMessage) { public WxCpXmlOutMessage route(final WxCpXmlMessage wxMessage) {
if (wxMsgIdDuplicateChecker.isDuplicate(wxMessage.getMsgId())) { if (wxMessageDuplicateChecker.isDuplicate(wxMessage.getMsgId())) {
// 如果是重复消息,那么就不做处理 // 如果是重复消息,那么就不做处理
return null; return null;
} }

View File

@@ -152,7 +152,6 @@ public class WxMpInMemoryConfigStorage implements WxMpConfigStorage {
this.http_proxy_password = http_proxy_password; this.http_proxy_password = http_proxy_password;
} }
@Override @Override
public String toString() { public String toString() {
return "WxMpInMemoryConfigStorage{" + return "WxMpInMemoryConfigStorage{" +
@@ -166,6 +165,8 @@ public class WxMpInMemoryConfigStorage implements WxMpConfigStorage {
", http_proxy_port=" + http_proxy_port + ", http_proxy_port=" + http_proxy_port +
", http_proxy_username='" + http_proxy_username + '\'' + ", http_proxy_username='" + http_proxy_username + '\'' +
", http_proxy_password='" + http_proxy_password + '\'' + ", http_proxy_password='" + http_proxy_password + '\'' +
", jsapiTicket='" + jsapiTicket + '\'' +
", jsapiTicketExpiresTime='" + jsapiTicketExpiresTime + '\'' +
'}'; '}';
} }

View File

@@ -4,14 +4,13 @@ import me.chanjar.weixin.common.session.InternalSession;
import me.chanjar.weixin.common.session.SessionManagerImpl; import me.chanjar.weixin.common.session.SessionManagerImpl;
import me.chanjar.weixin.common.session.WxSession; import me.chanjar.weixin.common.session.WxSession;
import me.chanjar.weixin.common.session.WxSessionManager; import me.chanjar.weixin.common.session.WxSessionManager;
import me.chanjar.weixin.common.util.WxMsgIdDuplicateChecker; import me.chanjar.weixin.common.util.WxMessageDuplicateChecker;
import me.chanjar.weixin.common.util.WxMsgIdMemoryDuplicateChecker; import me.chanjar.weixin.common.util.WxMessageInMemoryDuplicateChecker;
import me.chanjar.weixin.mp.bean.WxMpXmlMessage; import me.chanjar.weixin.mp.bean.WxMpXmlMessage;
import me.chanjar.weixin.mp.bean.WxMpXmlOutMessage; import me.chanjar.weixin.mp.bean.WxMpXmlOutMessage;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import javax.swing.text.StyledEditorKit;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
@@ -52,7 +51,7 @@ public class WxMpMessageRouter {
protected final Logger log = LoggerFactory.getLogger(WxMpMessageRouter.class); protected final Logger log = LoggerFactory.getLogger(WxMpMessageRouter.class);
private static final int DEFAULT_THREAD_POOL_SIZE = 20; private static final int DEFAULT_THREAD_POOL_SIZE = 100;
private final List<Rule> rules = new ArrayList<Rule>(); private final List<Rule> rules = new ArrayList<Rule>();
@@ -60,24 +59,22 @@ public class WxMpMessageRouter {
private ExecutorService executorService; private ExecutorService executorService;
private WxMsgIdDuplicateChecker wxMsgIdDuplicateChecker; private WxMessageDuplicateChecker wxMessageDuplicateChecker;
protected WxSessionManager sessionManager = new SessionManagerImpl(); private WxSessionManager sessionManager;
public WxMpMessageRouter(WxMpService wxMpService) { public WxMpMessageRouter(WxMpService wxMpService) {
this.wxMpService = wxMpService; this.wxMpService = wxMpService;
this.executorService = Executors.newFixedThreadPool(DEFAULT_THREAD_POOL_SIZE); this.executorService = Executors.newFixedThreadPool(DEFAULT_THREAD_POOL_SIZE);
this.wxMsgIdDuplicateChecker = new WxMsgIdMemoryDuplicateChecker(); this.wxMessageDuplicateChecker = new WxMessageInMemoryDuplicateChecker();
} this.sessionManager = new SessionManagerImpl();
public WxMpMessageRouter(WxMpService wxMpService, int threadPoolSize) {
this.wxMpService = wxMpService;
this.executorService = Executors.newFixedThreadPool(threadPoolSize);
this.wxMsgIdDuplicateChecker = new WxMsgIdMemoryDuplicateChecker();
} }
/** /**
* 设置自定义的ExecutorService * <pre>
* 设置自定义的 {@link ExecutorService}
* 如果不调用该方法,默认使用 Executors.newFixedThreadPool(100)
* </pre>
* @param executorService * @param executorService
*/ */
public void setExecutorService(ExecutorService executorService) { public void setExecutorService(ExecutorService executorService) {
@@ -85,11 +82,25 @@ public class WxMpMessageRouter {
} }
/** /**
* 设置自定义的WxMsgIdDuplicateChecker * <pre>
* @param wxMsgIdDuplicateChecker * 设置自定义的 {@link me.chanjar.weixin.common.util.WxMessageDuplicateChecker}
* 如果不调用该方法,默认使用 {@link me.chanjar.weixin.common.util.WxMessageInMemoryDuplicateChecker}
* </pre>
* @param wxMessageDuplicateChecker
*/ */
public void setWxMsgIdDuplicateChecker(WxMsgIdDuplicateChecker wxMsgIdDuplicateChecker) { public void setWxMessageDuplicateChecker(WxMessageDuplicateChecker wxMessageDuplicateChecker) {
this.wxMsgIdDuplicateChecker = wxMsgIdDuplicateChecker; this.wxMessageDuplicateChecker = wxMessageDuplicateChecker;
}
/**
* <pre>
* 设置自定义的{@link me.chanjar.weixin.common.session.WxSessionManager}
* 如果不调用该方法,默认使用 {@linke SessionManagerImpl}
* </pre>
* @param sessionManager
*/
public void setSessionManager(WxSessionManager sessionManager) {
this.sessionManager = sessionManager;
} }
/** /**
@@ -105,7 +116,7 @@ public class WxMpMessageRouter {
* @param wxMessage * @param wxMessage
*/ */
public WxMpXmlOutMessage route(final WxMpXmlMessage wxMessage) { public WxMpXmlOutMessage route(final WxMpXmlMessage wxMessage) {
if (wxMsgIdDuplicateChecker.isDuplicate(wxMessage.getMsgId())) { if (wxMessageDuplicateChecker.isDuplicate(wxMessage.getMsgId())) {
// 如果是重复消息,那么就不做处理 // 如果是重复消息,那么就不做处理
return null; return null;
} }
@@ -167,6 +178,10 @@ public class WxMpMessageRouter {
return res; return res;
} }
/**
* 对session的访问结束
* @param wxMessage
*/
protected void sessionEndAccess(WxMpXmlMessage wxMessage) { protected void sessionEndAccess(WxMpXmlMessage wxMessage) {
WxSession session = sessionManager.getSession(wxMessage.getFromUserName(), false); WxSession session = sessionManager.getSession(wxMessage.getFromUserName(), false);
if (session != null) { if (session != null) {