0
点赞
收藏
分享

微信扫一扫

redis(单机版)+jedis实现分布式锁(一)


本方案只是一个简单实现,针对 @Scheduled 定时任务使用;锁只记录一次,不支持计数。如果需要实现计数功能,需要做简单的改造。

方案使用LUA实现原子性锁定。

项目结构如下

redis(单机版)+jedis实现分布式锁(一)_同步锁

annotation:实现注解
config:主要实现SchedulingConfigurer + ScheduledAnnotationBeanPostProcessor(这个就是定时任务拦截器了)
enums:自定义的定时任务类型
lock:锁的service
processor:定时任务拦截器
properties:redis的相关属性
LockExpireObject:锁对象,名称、过期时间、类型、等等,封装为对象以后就好扩展了
RedisMethodRunnable:执行定时任务的 Runnable(执行任务前先尝试获取锁)


关键点说明

ScheduledAnnotationBeanPostProcessor + RedisMethodRunnable重写,覆盖原有的执行逻辑,加入锁的判断


代码展示:@RedisLock

package cn.kunming.kgoa.starter.schedule.lock.annotation;

import cn.kunming.kgoa.starter.schedule.lock.LockExpireObject;
import cn.kunming.kgoa.starter.schedule.lock.enums.Gran;
import org.springframework.context.annotation.Import;

import java.lang.annotation.*;
import java.util.concurrent.TimeUnit;

@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface RedisLock {

    /**
     * Name of the lock; it becomes the last segment of the Redis key.
     *
     * <p>Callers are expected to always set this so that two scheduled methods
     * with the same method name do not collide. Note that the declared default
     * is the empty string, so the compiler does not enforce it.
     */
    String lockKey() default "";

    /**
     * Free-form value associated with the lock.
     */
    String lockValue() default "加锁资源";

    /**
     * Expiry in milliseconds; the default is 120 seconds.
     *
     * @return the configured expiry
     */
    long expire() default 2 * 60 * 1000;

    /**
     * Granularity of the lock, see {@link Gran}.
     *
     * <p>Defaults to second-level granularity.
     */
    Gran granularity() default Gran.SECONDS;

    /**
     * Check value.
     */
    long fix() default 1000L;

}


SchedulerConfig:

package cn.kunming.kgoa.starter.schedule.lock.config;

import cn.kunming.kgoa.starter.redis.core.RedisUtil;
import cn.kunming.kgoa.starter.schedule.lock.lock.RedisDistributedLock;
import cn.kunming.kgoa.starter.schedule.lock.processor.RedisScheduledAnnotationBeanPostProcessor;
import cn.kunming.kgoa.starter.schedule.lock.properties.RedisProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.context.annotation.Role;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.annotation.ScheduledAnnotationBeanPostProcessor;
import org.springframework.scheduling.annotation.SchedulingConfigurer;
import org.springframework.scheduling.concurrent.ConcurrentTaskScheduler;
import org.springframework.scheduling.config.ScheduledTaskRegistrar;
import org.springframework.scheduling.config.TaskManagementConfigUtils;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

import javax.annotation.Resource;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;

@Configuration
public class SchedulerConfig implements SchedulingConfigurer {

    private Logger logger = LoggerFactory.getLogger(SchedulerConfig.class);

    /** Number of threads in the executor that runs {@code @Scheduled} tasks. */
    private int schedulerThreadCount = 5;

    /** Lock service handed to the custom scheduled-annotation post-processor. */
    @Autowired
    private RedisDistributedLock lock;

    /**
     * Executor backing the task scheduler; its {@code shutdown} method is
     * registered as the bean's destroy method.
     *
     * @return a scheduled thread pool with {@code schedulerThreadCount} threads
     */
    @Bean(destroyMethod = "shutdown")
    public ScheduledExecutorService scheduledExecutorService() {
        return new ScheduledThreadPoolExecutor(schedulerThreadCount);
    }

    /**
     * Makes the registrar run tasks on the executor declared above.
     *
     * @param taskRegistrar registrar supplied by Spring
     */
    @Override
    public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
        taskRegistrar.setTaskScheduler(new ConcurrentTaskScheduler(scheduledExecutorService()));
    }

    /**
     * Registers the Redis-aware post-processor under Spring's well-known
     * scheduled-annotation-processor bean name.
     *
     * @return the post-processor that wraps {@code @RedisLock} methods with lock handling
     */
    @Bean(name = TaskManagementConfigUtils.SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME)
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    @Qualifier
    @Primary
    public ScheduledAnnotationBeanPostProcessor scheduledAnnotationProcessor() {
        return new RedisScheduledAnnotationBeanPostProcessor(lock);
    }
}


Gran:

package cn.kunming.kgoa.starter.schedule.lock.enums;

import lombok.Getter;

/**
* 定时任务粒度
*/
/**
 * Granularity of a scheduled task. Each constant carries a numeric code, a
 * short name, a tolerance in milliseconds and a human-readable label.
 */
@Getter
public enum Gran {

    SECONDS(1, "seconds", 1000L, "秒级"),
    MINUTES(2, "minutes", 60000L, "分钟级"),
    // NOTE(review): "hourse" looks like a typo for "hours", but the name is
    // concatenated into the Redis key, so changing it changes the key.
    HOURS(3, "hourse", 3600000L, "小时级"),
    DAYS(4, "days", 86400000L, "天级"),
    ;

    /** Numeric identifier of the granularity. */
    private final Integer code;

    /** Short name of the granularity. */
    private final String name;

    /** Human-readable label. */
    private final String msg;

    /** Tolerance, in milliseconds. */
    private final Long fix;

    Gran(Integer granCode, String granName, Long toleranceMillis, String label) {
        this.code = granCode;
        this.name = granName;
        this.fix = toleranceMillis;
        this.msg = label;
    }

    /**
     * Compares this constant's code with the given one. This is an overload,
     * not an override of {@link Object#equals(Object)}.
     *
     * @param code code to compare against; {@code null} never matches
     * @return {@code true} when the codes are numerically equal
     */
    public boolean equals(Integer code) {
        return code != null && this.code.intValue() == code.intValue();
    }
}


RedisDistributedLock:

package cn.kunming.kgoa.starter.schedule.lock.lock;

import cn.kunming.kgoa.starter.redis.core.RedisUtil;
import cn.kunming.kgoa.starter.schedule.lock.LockExpireObject;
import cn.kunming.kgoa.starter.schedule.lock.annotation.RedisLock;
import cn.kunming.kgoa.starter.schedule.lock.enums.Gran;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.env.Environment;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.commands.JedisCommands;
import redis.clients.jedis.params.SetParams;

import java.util.ArrayList;
import java.util.List;

@Component
public class RedisDistributedLock {

private Logger logger = LoggerFactory.getLogger(RedisDistributedLock.class);

// @Autowired
private Jedis jedis;

@Value("${spring.redis.host}")
private String redisHost;

@Value("${spring.redis.port}")
private Integer redisPort;

@Value("${spring.redis.password}")
private String redisPwd;

@Autowired
private Environment environment;

@Autowired
private final RedisUtil redisUtil;

private final String defaultAppName = "default_app_name";

private final String redisKeyStart = "distributed_lock:";

public static final String UNLOCK_LUA;

static {
StringBuilder sb = new StringBuilder();
sb.append("if redis.call(\"get\",KEYS[1]) == ARGV[1] ");
sb.append("then ");
sb.append(" return redis.call(\"del\",KEYS[1]) ");
sb.append("else ");
sb.append(" return 0 ");
sb.append("end ");
UNLOCK_LUA = sb.toString();
}

public RedisDistributedLock (RedisUtil redisUtil){
this.redisUtil = redisUtil;
}

public boolean setLock(RedisLock lockAnno, LockExpireObject lockObject) {
try {
getJedis();
String key = buildRedisKey(lockAnno, lockAnno.granularity());
SetParams params = SetParams.setParams().nx().px(lockAnno.granularity().getFix());
String result = jedis.set(key, lockAnno.lockKey(), params);
return "OK".equals(result);
} catch (Exception e) {
logger.error("set redis occured an exception", e);
}
return false;
}

public boolean releaseLock(RedisLock lockAnno, LockExpireObject lockObject) {
// 释放锁的时候,有可能因为持锁之后方法执行时间大于锁的有效期,此时有可能已经被另外一个线程持有锁,所以不能直接删除
try {
String key = buildRedisKey(lockAnno, lockAnno.granularity());
/**
* 如果当前进程和锁中的ID不一样,说明不是同一个进程
* 只有锁中的进程可以释放锁
*/
String uniqueId = getLock(lockAnno, lockObject);
if(null == uniqueId || !uniqueId.equals(lockObject.getUniqueId())){
logger.info("不能释放锁");
return false;
}
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
Object result = jedis.eval(script, 1, key, lockObject.getUniqueId());
logger.info("可以:" + result);
return true;
} catch (Exception e) {
logger.error("release lock occured an exception", e);
} finally {
closeJedis();
}
return false;
}

public String getLock(RedisLock lockAnno, LockExpireObject lockObject){
String key = buildRedisKey(lockAnno, lockAnno.granularity());
String uniqueId = jedis.get(key);
return uniqueId;
}

/**
* 使用Lua脚本,脚本中使用setnex+expire命令进行加锁操作
*/
public boolean setLuaLock(RedisLock lockAnno, LockExpireObject lockObject) {
getJedis();
String uniqueId = getLock(lockAnno, lockObject);
// 当前锁存在,设置失败
if(null != uniqueId && !"".equals(uniqueId)){

return false;
}

String luaScripts = "if redis.call('setnx',KEYS[1],ARGV[1]) == 1 then " +
"redis.call('expire',KEYS[1],ARGV[2]) return 1 else return 0 end";
List<String> keys = new ArrayList<>();
keys.add(buildRedisKey(lockAnno, lockAnno.granularity()));
List<String> values = new ArrayList<>();
values.add(lockObject.getUniqueId());
values.add(String.valueOf(lockAnno.granularity().getFix()/1000));
Object result = jedis.eval(luaScripts, keys, values);
// logger.info("进程【" + lockObject.getUniqueId() + "】存储:" + lockObject.getUniqueId());
// logger.info("进程【" + lockObject.getUniqueId() + "】执行结果:" + result);
if(!result.equals(1L)){
return false;
}

/**
* 设置完成锁后,需要检查一下,有没有被其他锁覆盖
* lua存在原子性,使用队列进行操作
* 后面的进程依然会执行,导致覆盖前面的进程
* 所以需要检查一下有没有被覆盖(是否是最后一个/或者可执行)
*/
uniqueId = getLock(lockAnno, lockObject);
if(!lockObject.getUniqueId().equals(uniqueId)){

return false;
}

return true;
}

/**
* 构建key
*/
private String buildRedisKey(RedisLock lockName, Gran lockType) {
String key = redisKeyStart;
key += getCurAppName() + ":";
key += lockType.getName() + ":";
key += lockName.lockKey();
return key;
}

/**
* 获取当前应用名称
*/
private String getCurAppName(){
String name = environment.getProperty("spring.application.name");
if(null == name){
return defaultAppName;
}
return name;
}

public void getJedis(){
// 这里使用同步进程锁,增加一道安全策略
synchronized (environment){
JedisPoolConfig config = new JedisPoolConfig();
config.setTestOnBorrow(true);
config.setTestOnReturn(true);
JedisPool jedis;
if(null == redisPwd || "".equals(redisPwd)){
jedis = new JedisPool(config, redisHost, redisPort, 6000);
}else{
jedis = new JedisPool(config, redisHost, redisPort, 6000, redisPwd);
}
this.jedis = jedis.getResource();
}

}

public void closeJedis(){
if(null != jedis){
jedis.disconnect();
}
}

}


RedisScheduledAnnotationBeanPostProcessor:

package cn.kunming.kgoa.starter.schedule.lock.processor;

import cn.kunming.kgoa.starter.schedule.lock.RedisMethodRunnable;
import cn.kunming.kgoa.starter.schedule.lock.annotation.RedisLock;
import cn.kunming.kgoa.starter.schedule.lock.lock.RedisDistributedLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.aop.support.AopUtils;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.scheduling.annotation.ScheduledAnnotationBeanPostProcessor;
import org.springframework.scheduling.config.*;
import org.springframework.scheduling.support.CronTrigger;
import org.springframework.scheduling.support.ScheduledMethodRunnable;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
import org.springframework.util.StringValueResolver;

import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.time.Duration;
import java.util.*;

/**
 * Variant of {@link ScheduledAnnotationBeanPostProcessor} that wraps methods
 * annotated with {@link RedisLock} in a {@link RedisMethodRunnable}, so the
 * task body only runs when the Redis lock could be taken. Methods without the
 * annotation are scheduled with a plain {@link ScheduledMethodRunnable}.
 *
 * <p>The parent keeps its registrar and task map in private fields; they are
 * read reflectively here so that tasks registered by this class end up in the
 * same structures the parent manages.
 */
public class RedisScheduledAnnotationBeanPostProcessor extends ScheduledAnnotationBeanPostProcessor {

    private static final Logger logger = LoggerFactory.getLogger(RedisScheduledAnnotationBeanPostProcessor.class);

    protected RedisDistributedLock lock;

    private StringValueResolver embeddedValueResolver;

    private final ScheduledTaskRegistrar registrar = (ScheduledTaskRegistrar) getFieldValueFromParentClass("registrar");

    @SuppressWarnings("unchecked") // the parent declares the field with exactly this type
    private final Map<Object, Set<ScheduledTask>> scheduledTasks =
            (Map<Object, Set<ScheduledTask>>) getFieldValueFromParentClass("scheduledTasks");

    /**
     * @param lock lock service passed on to every {@link RedisMethodRunnable}
     */
    public RedisScheduledAnnotationBeanPostProcessor(RedisDistributedLock lock) {
        this.lock = lock;
    }

    /**
     * Forwards the resolver to the parent and keeps a local reference, because
     * the parent's copy is not accessible from this class.
     */
    @Override
    public void setEmbeddedValueResolver(StringValueResolver resolver) {
        super.setEmbeddedValueResolver(resolver);
        this.embeddedValueResolver = resolver;
    }

    /**
     * Registers one {@code @Scheduled} method. Only the choice of runnable is
     * specific to this class; the rest mirrors the parent implementation.
     *
     * @param scheduled the {@code @Scheduled} annotation found on the method
     * @param method    the annotated method; must take no arguments
     * @param bean      the bean that owns the method
     * @throws IllegalStateException if the annotation attributes are invalid
     * @see ScheduledAnnotationBeanPostProcessor
     */
    @Override
    protected void processScheduled(Scheduled scheduled, Method method, Object bean) {
        logger.info("processScheduled:{}", method.getName());
        try {
            Assert.isTrue(method.getParameterTypes().length == 0,
                    "Only no-arg methods may be annotated with @Scheduled");

            Method invocableMethod = AopUtils.selectInvocableMethod(method, bean.getClass());

            // Methods carrying @RedisLock run through the distributed-lock runnable.
            Runnable runnable;
            if (hasRedisScheduleAnnotation(invocableMethod)) {
                Map<String, Object> map = getRedisScheduleAnnotationValue(invocableMethod);
                runnable = new RedisMethodRunnable(bean, invocableMethod, lock, map);
            } else {
                runnable = new ScheduledMethodRunnable(bean, invocableMethod);
            }

            // Everything below is copied from the parent class.
            boolean processedSchedule = false;
            String errorMessage =
                    "Exactly one of the 'cron', 'fixedDelay(String)', or 'fixedRate(String)' attributes is required";

            Set<ScheduledTask> tasks = new LinkedHashSet<>(4);

            // Determine initial delay
            long initialDelay = scheduled.initialDelay();
            String initialDelayString = scheduled.initialDelayString();
            if (StringUtils.hasText(initialDelayString)) {
                Assert.isTrue(initialDelay < 0, "Specify 'initialDelay' or 'initialDelayString', not both");
                if (this.embeddedValueResolver != null) {
                    initialDelayString = this.embeddedValueResolver.resolveStringValue(initialDelayString);
                }
                if (StringUtils.hasLength(initialDelayString)) {
                    try {
                        initialDelay = parseDelayAsLong(initialDelayString);
                    }
                    catch (RuntimeException ex) {
                        throw new IllegalArgumentException(
                                "Invalid initialDelayString value \"" + initialDelayString + "\" - cannot parse into long");
                    }
                }
            }

            // Check cron expression
            String cron = scheduled.cron();
            if (StringUtils.hasText(cron)) {
                String zone = scheduled.zone();
                if (this.embeddedValueResolver != null) {
                    cron = this.embeddedValueResolver.resolveStringValue(cron);
                    zone = this.embeddedValueResolver.resolveStringValue(zone);
                }
                if (StringUtils.hasLength(cron)) {
                    Assert.isTrue(initialDelay == -1, "'initialDelay' not supported for cron triggers");
                    processedSchedule = true;
                    if (!Scheduled.CRON_DISABLED.equals(cron)) {
                        TimeZone timeZone;
                        if (StringUtils.hasText(zone)) {
                            timeZone = StringUtils.parseTimeZoneString(zone);
                        }
                        else {
                            timeZone = TimeZone.getDefault();
                        }
                        tasks.add(this.registrar.scheduleCronTask(new CronTask(runnable, new CronTrigger(cron, timeZone))));
                    }
                }
            }

            // At this point we don't need to differentiate between initial delay set or not anymore
            if (initialDelay < 0) {
                initialDelay = 0;
            }

            // Check fixed delay
            long fixedDelay = scheduled.fixedDelay();
            if (fixedDelay >= 0) {
                Assert.isTrue(!processedSchedule, errorMessage);
                processedSchedule = true;
                tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay)));
            }
            String fixedDelayString = scheduled.fixedDelayString();
            if (StringUtils.hasText(fixedDelayString)) {
                if (this.embeddedValueResolver != null) {
                    fixedDelayString = this.embeddedValueResolver.resolveStringValue(fixedDelayString);
                }
                if (StringUtils.hasLength(fixedDelayString)) {
                    Assert.isTrue(!processedSchedule, errorMessage);
                    processedSchedule = true;
                    try {
                        fixedDelay = parseDelayAsLong(fixedDelayString);
                    }
                    catch (RuntimeException ex) {
                        throw new IllegalArgumentException(
                                "Invalid fixedDelayString value \"" + fixedDelayString + "\" - cannot parse into long");
                    }
                    tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay)));
                }
            }

            // Check fixed rate
            long fixedRate = scheduled.fixedRate();
            if (fixedRate >= 0) {
                Assert.isTrue(!processedSchedule, errorMessage);
                processedSchedule = true;
                tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay)));
            }
            String fixedRateString = scheduled.fixedRateString();
            if (StringUtils.hasText(fixedRateString)) {
                if (this.embeddedValueResolver != null) {
                    fixedRateString = this.embeddedValueResolver.resolveStringValue(fixedRateString);
                }
                if (StringUtils.hasLength(fixedRateString)) {
                    Assert.isTrue(!processedSchedule, errorMessage);
                    processedSchedule = true;
                    try {
                        fixedRate = parseDelayAsLong(fixedRateString);
                    }
                    catch (RuntimeException ex) {
                        throw new IllegalArgumentException(
                                "Invalid fixedRateString value \"" + fixedRateString + "\" - cannot parse into long");
                    }
                    tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay)));
                }
            }

            // Check whether we had any attribute set
            Assert.isTrue(processedSchedule, errorMessage);

            // Finally register the scheduled tasks
            synchronized (this.scheduledTasks) {
                Set<ScheduledTask> regTasks = this.scheduledTasks.computeIfAbsent(bean, key -> new LinkedHashSet<>(4));
                regTasks.addAll(tasks);
            }
        }
        catch (IllegalArgumentException ex) {
            // Keep the original exception as the cause so its stack trace is not lost.
            throw new IllegalStateException(
                    "Encountered invalid @Scheduled method '" + method.getName() + "': " + ex.getMessage(), ex);
        }
    }

    /**
     * Copies the {@link RedisLock} attributes of the method into a map keyed
     * by attribute name ({@code lockKey}, {@code lockValue}, {@code expire},
     * {@code granularity}).
     */
    private Map<String, Object> getRedisScheduleAnnotationValue(Method invocableMethod) {
        Map<String, Object> map = new HashMap<>(4);
        RedisLock redisScheduled = invocableMethod.getAnnotation(RedisLock.class);
        map.put("lockKey", redisScheduled.lockKey());
        map.put("lockValue", redisScheduled.lockValue());
        map.put("expire", redisScheduled.expire());
        map.put("granularity", redisScheduled.granularity());
        return map;
    }

    /** Tells whether the method is annotated with {@link RedisLock}. */
    private boolean hasRedisScheduleAnnotation(Method invocableMethod) {
        return invocableMethod.isAnnotationPresent(RedisLock.class);
    }

    /**
     * Reads a private field declared on {@link ScheduledAnnotationBeanPostProcessor}
     * from this instance.
     *
     * @param fieldName name of the field declared on the parent class
     * @return the field's current value
     * @throws IllegalArgumentException if the field is missing or cannot be read
     */
    private Object getFieldValueFromParentClass(String fieldName) {
        try {
            Field field = ScheduledAnnotationBeanPostProcessor.class.getDeclaredField(fieldName);
            field.setAccessible(true);
            return field.get(this);
        } catch (Exception e) {
            // Pass the field name for the {} placeholder; the trailing throwable is logged with its stack trace.
            logger.error("通过反射读取ScheduledAnnotationBeanPostProcessor.{} 的时候出现错误", fieldName, e);
            throw new IllegalArgumentException(e);
        }
    }

    /**
     * Parses a delay given either as plain milliseconds or as an ISO-8601
     * duration. A value longer than one character whose first or second
     * character is {@code P}/{@code p} is treated as a duration.
     */
    private static long parseDelayAsLong(String value) throws RuntimeException {
        if (value.length() > 1 && (isP(value.charAt(0)) || isP(value.charAt(1)))) {
            return Duration.parse(value).toMillis();
        }
        return Long.parseLong(value);
    }

    private static boolean isP(char ch) {
        return ch == 'P' || ch == 'p';
    }
}


LockExpireObject:

package cn.kunming.kgoa.starter.schedule.lock;

import lombok.Data;

import java.io.Serializable;

/**
 * Descriptor of one lock attempt: when it was created, a display name and the
 * unique id that identifies the owner.
 */
@Data
public class LockExpireObject implements Serializable {
    private static final long serialVersionUID = 1L;

    /** Creation timestamp. */
    public Long createTime;

    /** Name of the lock attempt. */
    public String name;

    /**
     * Unique id of the owner.
     */
    private String UniqueId;

    /**
     * Factory shortcut for the no-arg constructor.
     *
     * @return a new, empty descriptor
     */
    public static LockExpireObject buildOne() {
        LockExpireObject fresh = new LockExpireObject();
        return fresh;
    }
}


RedisMethodRunnable:

package cn.kunming.kgoa.starter.schedule.lock;

import cn.kunming.kgoa.starter.schedule.lock.annotation.RedisLock;
import cn.kunming.kgoa.starter.schedule.lock.lock.RedisDistributedLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.support.ScheduledMethodRunnable;

import java.lang.reflect.Method;
import java.net.InetAddress;
import java.util.Map;
import java.util.UUID;

/**
 * Runnable for a {@code @Scheduled} method that is also annotated with
 * {@link RedisLock}: the method body only runs when the Redis lock could be
 * acquired for this invocation.
 */
public class RedisMethodRunnable extends ScheduledMethodRunnable {

    private static final Logger logger = LoggerFactory.getLogger(RedisMethodRunnable.class);

    private String name;

    private Method method;

    private final static String lockKey = "redisLockKey";

    private Map<String, Object> map;

    private RedisDistributedLock lock;

    /**
     * @param target bean that owns the scheduled method
     * @param method the scheduled method, expected to carry {@link RedisLock}
     * @param lock   lock service used to acquire and release the lock
     * @param map    attribute values of the method's {@link RedisLock} annotation
     */
    public RedisMethodRunnable(Object target, Method method, RedisDistributedLock lock, Map<String, Object> map) {
        super(target, method);
        this.lock = lock;
        this.map = map;
        this.method = method;
        this.name = method.getName();
    }

    /**
     * Builds a lock descriptor with a fresh random id, tries to take the lock
     * and, on success, runs the scheduled method. Any failure is logged with
     * its stack trace and rethrown to the scheduler.
     */
    @Override
    public void run() {
        RedisLock lockAnnotation = method.getAnnotation(RedisLock.class);
        // Build the lock descriptor for this invocation.
        LockExpireObject lockObject = new LockExpireObject();
        lockObject.setCreateTime(System.currentTimeMillis());
        lockObject.setUniqueId(UUID.randomUUID().toString());
        lockObject.setName(lockAnnotation.lockKey() + "_" + lockObject.getUniqueId());

        try {
            logger.info("定时任务:" + lockAnnotation.lockKey());
            if (!lock.setLuaLock(lockAnnotation, lockObject)) {
                logger.info("不能");
                return;
            }
            logger.info("能--加锁");
            // Run the scheduled method.
            super.run();
        } catch (Throwable e) {
            logger.error(e.getMessage(), e);
            throw e;
        } finally {
            // Always called, even when the lock was not acquired, exactly as
            // before. There is deliberately no `return` here: returning from
            // `finally` would discard the exception rethrown above.
            lock.releaseLock(lockAnnotation, lockObject);
        }
    }

}


注解使用:

// Example usage: a cron-scheduled method guarded by a Redis lock named "push_msg_1"
// with day-level granularity. The cron expression has six fields
// (second minute hour day-of-month month day-of-week).
@RedisLock(lockKey = "push_msg_1", granularity = Gran.DAYS)
@Scheduled(cron = "0 50 8 * * ? ")
private void pushMsg1(){
pushMsgService.pushDayHotMsg();
log.info("pushMsg");
}





举报

相关推荐

0 条评论