issue #69 添加Session的支持

This commit is contained in:
Daniel Qian
2015-01-21 19:58:40 +08:00
parent 7184711301
commit d18b66c38d
19 changed files with 320 additions and 69 deletions

View File

@@ -1,5 +1,6 @@
package me.chanjar.weixin.cp.api;
import me.chanjar.weixin.common.session.WxSessionManager;
import me.chanjar.weixin.cp.bean.WxCpXmlMessage;
import me.chanjar.weixin.cp.bean.WxCpXmlOutMessage;
@@ -7,18 +8,21 @@ import java.util.Map;
/**
* 处理微信推送消息的处理器接口
* @author Daniel Qian
*
* @author Daniel Qian
*/
public interface WxCpMessageHandler {
/**
*
* @param wxMessage
* @param context 上下文如果handler或interceptor之间有信息要传递可以用这个
* @param context 上下文如果handler或interceptor之间有信息要传递可以用这个
* @param wxCpService
* @param sessionManager
* @return xml格式的消息如果在异步规则里处理的话可以返回null
*/
public WxCpXmlOutMessage handle(WxCpXmlMessage wxMessage, Map<String, Object> context, WxCpService wxCpService);
public WxCpXmlOutMessage handle(WxCpXmlMessage wxMessage,
Map<String, Object> context,
WxCpService wxCpService,
WxSessionManager sessionManager);
}

View File

@@ -1,23 +1,29 @@
package me.chanjar.weixin.cp.api;
import me.chanjar.weixin.common.session.WxSessionManager;
import me.chanjar.weixin.cp.bean.WxCpXmlMessage;
import java.util.Map;
/**
* 微信消息拦截器,可以用来做验证
* @author Daniel Qian
*
* @author Daniel Qian
*/
public interface WxCpMessageInterceptor {
/**
* 拦截微信消息
*
* @param wxMessage
* @param context 上下文如果handler或interceptor之间有信息要传递可以用这个
* @param context 上下文如果handler或interceptor之间有信息要传递可以用这个
* @param wxCpService
* @return true代表OKfalse代表不OK
* @param sessionManager
* @return true代表OKfalse代表不OK
*/
public boolean intercept(WxCpXmlMessage wxMessage, Map<String, Object> context, WxCpService wxCpService);
public boolean intercept(WxCpXmlMessage wxMessage,
Map<String, Object> context,
WxCpService wxCpService,
WxSessionManager sessionManager);
}

View File

@@ -1,16 +1,24 @@
package me.chanjar.weixin.cp.api;
import me.chanjar.weixin.common.session.InternalSession;
import me.chanjar.weixin.common.session.WxSession;
import me.chanjar.weixin.common.session.InMemorySessionManager;
import me.chanjar.weixin.common.session.WxSessionManager;
import me.chanjar.weixin.common.util.WxMessageDuplicateChecker;
import me.chanjar.weixin.common.util.WxMessageInMemoryDuplicateChecker;
import me.chanjar.weixin.cp.bean.WxCpXmlMessage;
import me.chanjar.weixin.cp.bean.WxCpXmlOutMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.regex.Pattern;
/**
@@ -43,6 +51,8 @@ import java.util.regex.Pattern;
*/
public class WxCpMessageRouter {
protected final Logger log = LoggerFactory.getLogger(WxCpMessageRouter.class);
private static final int DEFAULT_THREAD_POOL_SIZE = 100;
private final List<Rule> rules = new ArrayList<Rule>();
@@ -51,16 +61,22 @@ public class WxCpMessageRouter {
private ExecutorService executorService;
private WxMessageDuplicateChecker wxMessageDuplicateChecker;
private WxMessageDuplicateChecker messageDuplicateChecker;
private WxSessionManager sessionManager;
public WxCpMessageRouter(WxCpService wxCpService) {
this.wxCpService = wxCpService;
this.executorService = Executors.newFixedThreadPool(DEFAULT_THREAD_POOL_SIZE);
this.wxMessageDuplicateChecker = new WxMessageInMemoryDuplicateChecker();
this.messageDuplicateChecker = new WxMessageInMemoryDuplicateChecker();
this.sessionManager = new InMemorySessionManager();
}
/**
* 设置自定义的ExecutorService
* <pre>
* 设置自定义的 {@link ExecutorService}
* 如果不调用该方法,默认使用 Executors.newFixedThreadPool(100)
* </pre>
* @param executorService
*/
public void setExecutorService(ExecutorService executorService) {
@@ -68,11 +84,25 @@ public class WxCpMessageRouter {
}
/**
* 设置自定义的WxMsgIdDuplicateChecker
* @param wxMessageDuplicateChecker
* <pre>
* 设置自定义的 {@link me.chanjar.weixin.common.util.WxMessageDuplicateChecker}
* 如果不调用该方法,默认使用 {@link me.chanjar.weixin.common.util.WxMessageInMemoryDuplicateChecker}
* </pre>
* @param messageDuplicateChecker
*/
public void setWxMessageDuplicateChecker(WxMessageDuplicateChecker wxMessageDuplicateChecker) {
this.wxMessageDuplicateChecker = wxMessageDuplicateChecker;
public void setMessageDuplicateChecker(WxMessageDuplicateChecker messageDuplicateChecker) {
this.messageDuplicateChecker = messageDuplicateChecker;
}
/**
* <pre>
* 设置自定义的{@link me.chanjar.weixin.common.session.WxSessionManager}
* 如果不调用该方法,默认使用 {@linke SessionManagerImpl}
* </pre>
* @param sessionManager
*/
public void setSessionManager(WxSessionManager sessionManager) {
this.sessionManager = sessionManager;
}
/**
@@ -80,7 +110,7 @@ public class WxCpMessageRouter {
* @return
*/
public Rule rule() {
return new Rule(this, wxCpService);
return new Rule(this, wxCpService, sessionManager);
}
/**
@@ -88,7 +118,7 @@ public class WxCpMessageRouter {
* @param wxMessage
*/
public WxCpXmlOutMessage route(final WxCpXmlMessage wxMessage) {
if (wxMessageDuplicateChecker.isDuplicate(wxMessage.getMsgId())) {
if (messageDuplicateChecker.isDuplicate(wxMessage.getMsgId())) {
// 如果是重复消息,那么就不做处理
return null;
}
@@ -109,27 +139,66 @@ public class WxCpMessageRouter {
}
WxCpXmlOutMessage res = null;
final List<Future> futures = new ArrayList<Future>();
for (final Rule rule : matchRules) {
// 返回最后一个非异步的rule的执行结果
if(rule.async) {
executorService.submit(new Runnable() {
public void run() {
rule.service(wxMessage);
}
});
futures.add(
executorService.submit(new Runnable() {
public void run() {
rule.service(wxMessage);
}
})
);
} else {
res = rule.service(wxMessage);
}
}
// 告诉session它已经用不着了
if (futures.size() > 0) {
executorService.submit(new Runnable() {
@Override
public void run() {
for (Future future : futures) {
try {
future.get();
} catch (InterruptedException e) {
log.error("Error happened when wait task finish", e);
} catch (ExecutionException e) {
log.error("Error happened when wait task finish", e);
}
}
// 在这里session再也不会被使用了
sessionEndAccess(wxMessage);
}
});
} else {
// 在这里session再也不会被使用了
sessionEndAccess(wxMessage);
}
return res;
}
/**
* 对session的访问结束
* @param wxMessage
*/
protected void sessionEndAccess(WxCpXmlMessage wxMessage) {
WxSession session = sessionManager.getSession(wxMessage.getFromUserName(), false);
if (session != null) {
((InternalSession) session).endAccess();
}
}
public static class Rule {
private final WxCpMessageRouter routerBuilder;
private final WxCpService wxCpService;
private final WxSessionManager sessionManager;
private boolean async = true;
private String fromUser;
@@ -152,9 +221,10 @@ public class WxCpMessageRouter {
private List<WxCpMessageInterceptor> interceptors = new ArrayList<WxCpMessageInterceptor>();
protected Rule(WxCpMessageRouter routerBuilder, WxCpService wxCpService) {
protected Rule(WxCpMessageRouter routerBuilder, WxCpService wxCpService, WxSessionManager sessionManager) {
this.routerBuilder = routerBuilder;
this.wxCpService = wxCpService;
this.sessionManager = sessionManager;
}
/**
@@ -332,7 +402,7 @@ public class WxCpMessageRouter {
Map<String, Object> context = new HashMap<String, Object>();
// 如果拦截器不通过
for (WxCpMessageInterceptor interceptor : this.interceptors) {
if (!interceptor.intercept(wxMessage, context, wxCpService)) {
if (!interceptor.intercept(wxMessage, context, wxCpService, sessionManager)) {
return null;
}
}
@@ -341,7 +411,7 @@ public class WxCpMessageRouter {
WxCpXmlOutMessage res = null;
for (WxCpMessageHandler handler : this.handlers) {
// 返回最后handler的结果
res = handler.handle(wxMessage, context, wxCpService);
res = handler.handle(wxMessage, context, wxCpService, sessionManager);
}
return res;
}

View File

@@ -392,7 +392,7 @@ public interface WxCpService {
/**
* <pre>
* 设置WxSessionManager只有当需要使用个性化的WxSessionManager的时候才需要调用此方法
* WxCpService默认使用的是{@link me.chanjar.weixin.common.session.SessionManagerImpl}
* WxCpService默认使用的是{@link me.chanjar.weixin.common.session.InMemorySessionManager}
* </pre>
* @param sessionManager
*/

View File

@@ -12,7 +12,7 @@ import me.chanjar.weixin.common.bean.WxMenu;
import me.chanjar.weixin.common.bean.result.WxError;
import me.chanjar.weixin.common.bean.result.WxMediaUploadResult;
import me.chanjar.weixin.common.exception.WxErrorException;
import me.chanjar.weixin.common.session.SessionManagerImpl;
import me.chanjar.weixin.common.session.InMemorySessionManager;
import me.chanjar.weixin.common.session.WxSession;
import me.chanjar.weixin.common.session.WxSessionManager;
import me.chanjar.weixin.common.util.StringUtils;
@@ -46,7 +46,6 @@ import java.io.InputStream;
import java.io.StringReader;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
public class WxCpServiceImpl implements WxCpService {
@@ -67,7 +66,7 @@ public class WxCpServiceImpl implements WxCpService {
private int maxRetryTimes = 5;
protected WxSessionManager sessionManager = new SessionManagerImpl();
protected WxSessionManager sessionManager = new InMemorySessionManager();
public boolean checkSignature(String msgSignature, String timestamp, String nonce, String data) {
try {

View File

@@ -1,6 +1,7 @@
package me.chanjar.weixin.cp.api;
import me.chanjar.weixin.common.api.WxConsts;
import me.chanjar.weixin.common.session.WxSessionManager;
import me.chanjar.weixin.cp.bean.WxCpXmlMessage;
import me.chanjar.weixin.cp.bean.WxCpXmlOutMessage;
import org.testng.Assert;
@@ -67,7 +68,8 @@ public class WxCpMessageRouterTest {
final WxCpMessageRouter router = new WxCpMessageRouter(null);
router.rule().handler(new WxCpMessageHandler() {
@Override
public WxCpXmlOutMessage handle(WxCpXmlMessage wxMessage, Map<String, Object> context, WxCpService wxCpService) {
public WxCpXmlOutMessage handle(WxCpXmlMessage wxMessage, Map<String, Object> context, WxCpService wxCpService,
WxSessionManager sessionManager) {
return null;
}
}).end();
@@ -149,7 +151,8 @@ public class WxCpMessageRouterTest {
}
@Override
public WxCpXmlOutMessage handle(WxCpXmlMessage wxMessage, Map<String, Object> context, WxCpService wxCpService) {
public WxCpXmlOutMessage handle(WxCpXmlMessage wxMessage, Map<String, Object> context, WxCpService wxCpService,
WxSessionManager sessionManager) {
sb.append(this.echoStr).append(',');
return null;
}

View File

@@ -1,5 +1,6 @@
package me.chanjar.weixin.cp.demo;
import me.chanjar.weixin.common.session.WxSessionManager;
import me.chanjar.weixin.cp.api.*;
import me.chanjar.weixin.cp.bean.WxCpXmlMessage;
import me.chanjar.weixin.cp.bean.WxCpXmlOutMessage;
@@ -46,7 +47,7 @@ public class WxCpDemoServer {
WxCpMessageHandler handler = new WxCpMessageHandler() {
@Override
public WxCpXmlOutMessage handle(WxCpXmlMessage wxMessage, Map<String, Object> context,
WxCpService wxCpService) {
WxCpService wxCpService, WxSessionManager sessionManager) {
WxCpXmlOutTextMessage m = WxCpXmlOutMessage
.TEXT()
.content("测试加密消息")
@@ -60,7 +61,7 @@ public class WxCpDemoServer {
WxCpMessageHandler oauth2handler = new WxCpMessageHandler() {
@Override
public WxCpXmlOutMessage handle(WxCpXmlMessage wxMessage, Map<String, Object> context,
WxCpService wxCpService) {
WxCpService wxCpService, WxSessionManager sessionManager) {
String href = "<a href=\"" + wxCpService.oauth2buildAuthorizationUrl(null)
+ "\">测试oauth2</a>";
return WxCpXmlOutMessage