mirror of
https://gitee.com/binary/weixin-java-tools.git
synced 2025-05-06 05:37:48 +08:00
🎨 #1592 实现简单的redis分布式锁 RedisTemplateSimpleDistributedLock
This commit is contained in:
parent
0a2e4d8aea
commit
2446fa626b
@ -1,11 +1,12 @@
|
|||||||
package me.chanjar.weixin.common.redis;
|
package me.chanjar.weixin.common.redis;
|
||||||
|
|
||||||
|
import lombok.NonNull;
|
||||||
import lombok.RequiredArgsConstructor;
|
import lombok.RequiredArgsConstructor;
|
||||||
|
import me.chanjar.weixin.common.util.locks.RedisTemplateSimpleDistributedLock;
|
||||||
import org.springframework.data.redis.core.StringRedisTemplate;
|
import org.springframework.data.redis.core.StringRedisTemplate;
|
||||||
|
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.locks.Lock;
|
import java.util.concurrent.locks.Lock;
|
||||||
import java.util.concurrent.locks.ReentrantLock;
|
|
||||||
|
|
||||||
@RequiredArgsConstructor
|
@RequiredArgsConstructor
|
||||||
public class RedisTemplateWxRedisOps implements WxRedisOps {
|
public class RedisTemplateWxRedisOps implements WxRedisOps {
|
||||||
@ -37,7 +38,7 @@ public class RedisTemplateWxRedisOps implements WxRedisOps {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Lock getLock(String key) {
|
public Lock getLock(@NonNull String key) {
|
||||||
return new ReentrantLock();
|
return new RedisTemplateSimpleDistributedLock(redisTemplate, key, 60 * 1000);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,126 @@
|
|||||||
|
package me.chanjar.weixin.common.util.locks;
|
||||||
|
|
||||||
|
import lombok.Getter;
|
||||||
|
import lombok.NonNull;
|
||||||
|
import org.jetbrains.annotations.NotNull;
|
||||||
|
import org.springframework.dao.DataAccessException;
|
||||||
|
import org.springframework.data.redis.connection.RedisConnection;
|
||||||
|
import org.springframework.data.redis.connection.RedisStringCommands;
|
||||||
|
import org.springframework.data.redis.core.RedisCallback;
|
||||||
|
import org.springframework.data.redis.core.StringRedisTemplate;
|
||||||
|
import org.springframework.data.redis.core.script.DefaultRedisScript;
|
||||||
|
import org.springframework.data.redis.core.script.RedisScript;
|
||||||
|
import org.springframework.data.redis.core.types.Expiration;
|
||||||
|
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.UUID;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.locks.Condition;
|
||||||
|
import java.util.concurrent.locks.Lock;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 实现简单的redis分布式锁, 支持重入, 不是红锁
|
||||||
|
*
|
||||||
|
* @see <a href="https://redis.io/topics/distlock">reids distlock</a>
|
||||||
|
*/
|
||||||
|
public class RedisTemplateSimpleDistributedLock implements Lock {
|
||||||
|
|
||||||
|
@Getter
|
||||||
|
private final StringRedisTemplate redisTemplate;
|
||||||
|
@Getter
|
||||||
|
private final String key;
|
||||||
|
@Getter
|
||||||
|
private final int leaseMilliseconds;
|
||||||
|
|
||||||
|
private final ThreadLocal<String> valueThreadLocal = new ThreadLocal<>();
|
||||||
|
|
||||||
|
public RedisTemplateSimpleDistributedLock(@NonNull StringRedisTemplate redisTemplate, int leaseMilliseconds) {
|
||||||
|
this(redisTemplate, "lock:" + UUID.randomUUID().toString(), leaseMilliseconds);
|
||||||
|
}
|
||||||
|
|
||||||
|
public RedisTemplateSimpleDistributedLock(@NonNull StringRedisTemplate redisTemplate, @NonNull String key, int leaseMilliseconds) {
|
||||||
|
if (leaseMilliseconds <= 0) {
|
||||||
|
throw new IllegalArgumentException("Parameter 'leaseMilliseconds' must grate then 0: " + leaseMilliseconds);
|
||||||
|
}
|
||||||
|
this.redisTemplate = redisTemplate;
|
||||||
|
this.key = key;
|
||||||
|
this.leaseMilliseconds = leaseMilliseconds;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void lock() {
|
||||||
|
while (!tryLock()) {
|
||||||
|
try {
|
||||||
|
Thread.sleep(1000);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
// Ignore
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void lockInterruptibly() throws InterruptedException {
|
||||||
|
while (!tryLock()) {
|
||||||
|
Thread.sleep(1000);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean tryLock() {
|
||||||
|
String value = valueThreadLocal.get();
|
||||||
|
if (value == null || value.length() == 0) {
|
||||||
|
value = UUID.randomUUID().toString();
|
||||||
|
valueThreadLocal.set(value);
|
||||||
|
}
|
||||||
|
final byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
|
||||||
|
final byte[] valueBytes = value.getBytes(StandardCharsets.UTF_8);
|
||||||
|
List<Object> redisResults = redisTemplate.executePipelined(new RedisCallback<String>() {
|
||||||
|
@Override
|
||||||
|
public String doInRedis(RedisConnection connection) throws DataAccessException {
|
||||||
|
connection.set(keyBytes, valueBytes, Expiration.milliseconds(leaseMilliseconds), RedisStringCommands.SetOption.SET_IF_ABSENT);
|
||||||
|
connection.get(keyBytes);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
Object currentLockSecret = redisResults.size() > 1 ? redisResults.get(1) : redisResults.get(0);
|
||||||
|
return currentLockSecret != null && currentLockSecret.toString().equals(value);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean tryLock(long time, @NotNull TimeUnit unit) throws InterruptedException {
|
||||||
|
long waitMs = unit.toMillis(time);
|
||||||
|
boolean locked = tryLock();
|
||||||
|
while (!locked && waitMs > 0) {
|
||||||
|
long sleep = waitMs < 1000 ? waitMs : 1000;
|
||||||
|
Thread.sleep(sleep);
|
||||||
|
waitMs -= sleep;
|
||||||
|
locked = tryLock();
|
||||||
|
}
|
||||||
|
return locked;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void unlock() {
|
||||||
|
if (valueThreadLocal.get() != null) {
|
||||||
|
// 提示: 必须指定returnType, 类型: 此处必须为Long, 不能是Integer
|
||||||
|
RedisScript<Long> script = new DefaultRedisScript("if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end", Long.class);
|
||||||
|
redisTemplate.execute(script, Arrays.asList(key), valueThreadLocal.get());
|
||||||
|
valueThreadLocal.remove();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Condition newCondition() {
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 获取当前锁的值
|
||||||
|
* return 返回null意味着没有加锁, 但是返回非null值并不以为着当前加锁成功(redis中key可能自动过期)
|
||||||
|
*/
|
||||||
|
public String getLockSecretValue() {
|
||||||
|
return valueThreadLocal.get();
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,79 @@
|
|||||||
|
package me.chanjar.weixin.common.util.locks;
|
||||||
|
|
||||||
|
import lombok.SneakyThrows;
|
||||||
|
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
|
||||||
|
import org.springframework.data.redis.core.StringRedisTemplate;
|
||||||
|
import org.testng.annotations.BeforeTest;
|
||||||
|
import org.testng.annotations.Test;
|
||||||
|
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
|
import static org.testng.Assert.*;
|
||||||
|
|
||||||
|
@Test(enabled = false)
|
||||||
|
public class RedisTemplateSimpleDistributedLockTest {
|
||||||
|
|
||||||
|
RedisTemplateSimpleDistributedLock redisLock;
|
||||||
|
|
||||||
|
StringRedisTemplate redisTemplate;
|
||||||
|
|
||||||
|
AtomicInteger lockCurrentExecuteCounter;
|
||||||
|
|
||||||
|
@BeforeTest
|
||||||
|
public void init() {
|
||||||
|
JedisConnectionFactory connectionFactory = new JedisConnectionFactory();
|
||||||
|
connectionFactory.setHostName("127.0.0.1");
|
||||||
|
connectionFactory.setPort(6379);
|
||||||
|
connectionFactory.afterPropertiesSet();
|
||||||
|
StringRedisTemplate redisTemplate = new StringRedisTemplate(connectionFactory);
|
||||||
|
this.redisTemplate = redisTemplate;
|
||||||
|
this.redisLock = new RedisTemplateSimpleDistributedLock(redisTemplate, 60000);
|
||||||
|
this.lockCurrentExecuteCounter = new AtomicInteger(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(description = "多线程测试锁排他性")
|
||||||
|
public void testLockExclusive() throws InterruptedException {
|
||||||
|
int threadSize = 100;
|
||||||
|
final CountDownLatch startLatch = new CountDownLatch(threadSize);
|
||||||
|
final CountDownLatch endLatch = new CountDownLatch(threadSize);
|
||||||
|
|
||||||
|
for (int i = 0; i < threadSize; i++) {
|
||||||
|
new Thread(new Runnable() {
|
||||||
|
@SneakyThrows
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
startLatch.await();
|
||||||
|
|
||||||
|
redisLock.lock();
|
||||||
|
assertEquals(lockCurrentExecuteCounter.incrementAndGet(), 1, "临界区同时只能有一个线程执行");
|
||||||
|
lockCurrentExecuteCounter.decrementAndGet();
|
||||||
|
redisLock.unlock();
|
||||||
|
|
||||||
|
endLatch.countDown();
|
||||||
|
}
|
||||||
|
}).start();
|
||||||
|
startLatch.countDown();
|
||||||
|
}
|
||||||
|
endLatch.await();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testTryLock() throws InterruptedException {
|
||||||
|
assertTrue(redisLock.tryLock(3, TimeUnit.SECONDS), "第一次加锁应该成功");
|
||||||
|
assertNotNull(redisLock.getLockSecretValue());
|
||||||
|
String redisValue = this.redisTemplate.opsForValue().get(redisLock.getKey());
|
||||||
|
assertEquals(redisValue, redisLock.getLockSecretValue());
|
||||||
|
|
||||||
|
redisLock.unlock();
|
||||||
|
assertNull(redisLock.getLockSecretValue());
|
||||||
|
redisValue = this.redisTemplate.opsForValue().get(redisLock.getKey());
|
||||||
|
assertNull(redisValue, "释放锁后key会被删除");
|
||||||
|
|
||||||
|
redisLock.unlock();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
|
|
@ -1,10 +1,10 @@
|
|||||||
package com.github.binarywang.wxpay.bean.result;
|
package com.github.binarywang.wxpay.bean.result;
|
||||||
|
|
||||||
import org.testng.*;
|
|
||||||
import org.testng.annotations.*;
|
|
||||||
|
|
||||||
import com.thoughtworks.xstream.XStream;
|
import com.thoughtworks.xstream.XStream;
|
||||||
import me.chanjar.weixin.common.util.xml.XStreamInitializer;
|
import me.chanjar.weixin.common.util.xml.XStreamInitializer;
|
||||||
|
import org.testng.Assert;
|
||||||
|
import org.testng.annotations.BeforeTest;
|
||||||
|
import org.testng.annotations.Test;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The type Wx pay send redpack result test.
|
* The type Wx pay send redpack result test.
|
||||||
@ -68,6 +68,6 @@ public class WxPaySendRedpackResultTest {
|
|||||||
Assert.assertEquals("FAIL", wxMpRedpackResult.getReturnCode());
|
Assert.assertEquals("FAIL", wxMpRedpackResult.getReturnCode());
|
||||||
Assert.assertEquals("FAIL", wxMpRedpackResult.getResultCode());
|
Assert.assertEquals("FAIL", wxMpRedpackResult.getResultCode());
|
||||||
Assert.assertEquals("onqOjjmM1tad-3ROpncN-yUfa6uI", wxMpRedpackResult.getReOpenid());
|
Assert.assertEquals("onqOjjmM1tad-3ROpncN-yUfa6uI", wxMpRedpackResult.getReOpenid());
|
||||||
Assert.assertEquals(1, wxMpRedpackResult.getTotalAmount());
|
Assert.assertEquals(Integer.valueOf(1), wxMpRedpackResult.getTotalAmount());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user