小示例
定制设计在说到锁的时候,定制设计大家第一反应想到的应该是用redis定制设计实现分布式锁。
定制设计那么就来看一下下面的几个代码(定制设计如果不想看示例,定制设计可以直接跳到正文)
#示例1
public class RedisLock { @Autowired private RedisTemplate<String, Object> redisTemplate; private static final String REQUEST_CONTAINER = "TEST_LOCK"; /** * 定制设计锁的过期时间可设置的长一些(定制设计业务执行完就释放了),定制设计至少要比预计的业务执行时间长(定制设计防止业务没执行完就释放了锁) */ private static final long EXPIRE_TIME = 300L; public void lock(String lockName) { String key = REQUEST_CONTAINER + lockName; boolean aBoolean = redisTemplate.opsForValue().setIfAbsent(key,1); if (aBoolean) { try { log.info("====================进入锁:{}==================",key); // 定制设计这里设置锁的过期时间和finally代码块都是为了防止死锁(JVM 崩溃,操作系统死掉等,finally是不会执行的) redisTemplate.expire(key,EXPIRE_TIME, TimeUnit.SECONDS); /*已进入业务*/ // ********业务逻辑处理中****** // log.info("处理业务中"); // *******业务逻辑处理结束****** // }catch (Exception e) { log.info("出现了点小问题-{}", e.getMessage()); } finally { // 业务异常,手动释放锁 redisTemplate.delete(key); log.info("------------redis锁释放成功-----------"); } }else { log.info("获取不到锁"); } }}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
问题:在工作中,我发现是有人会和上面一样实现分布式锁的,那存在一个问题,当我们执行完setIfAbsent操作,设置了锁,但是在执行expire之前,宕机了或者服务器挂掉了,这个锁就没法释放了。
#示例2
public class RedisLock { @Autowired private RedisTemplate<String, Object> redisTemplate; private static final String REQUEST_CONTAINER = "TEST_LOCK"; /** * 锁的过期时间可设置的长一些(业务执行完就释放了),至少要比预计的业务执行时间长(防止业务没执行完就释放了锁) */ private static final long EXPIRE_TIME = 300L; public void lock(String lockName) { String key = REQUEST_CONTAINER + lockName; // 每个人进来先要进行加锁,key值为"good_lock",一般可以用实际业务中的key String value = UUID.randomUUID().toString().replace("-",""); try { boolean aBoolean = redisTemplate.opsForValue().setIfAbsent(REQUEST_CONTAINER, value,EXPIRE_TIME,TimeUnit.SECONDS); if (aBoolean) { log.info("====================进入锁:{}==================",key); // 这里设置锁的过期时间和finally代码块都是为了防止死锁(JVM 崩溃,操作系统死掉等,finally是不会执行的) redisTemplate.expire(key,EXPIRE_TIME, TimeUnit.SECONDS); /*已进入业务*/ // ********业务逻辑处理中****** // log.info("处理业务中"); // *******业务逻辑处理结束****** // }else { log.info("获取不到锁"); } }catch (Exception e) { log.info("出现了点小问题-{}", e.getMessage()); } finally { // 谁加的锁,谁才能删除!!!! if(redisTemplate.opsForValue().get(REQUEST_CONTAINER).equals(value)){ redisTemplate.delete(REQUEST_CONTAINER); } } }}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
注: redis在2.1.3和之后的版本里,才可以同时设置key和过期时间
问题: 上面的代码是更完善的一个版本,但同样存在两个问题:
1、finally块的判断和del删除操作不是原子操作,并发的时候也会出问题
2、若业务逻辑复杂,不能在锁过期时间之内完成操作,锁过期了,其他线程就进来了,这也是一个很严重的问题
针对这两个问题,是有解决办法
1、用lua脚本保证finally块的判断和del删除是原子性的
2、通过额外的定时任务完成锁续命的逻辑
重点来了: 说了这么多,大家应该觉得这个分布式锁好麻烦啊,要注意的东西太多,那就看一下Redission如何实现的分布式锁吧
正文—Redission实现分布式锁
1、引入依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <dependency> <groupId>org.redisson</groupId> <artifactId>redisson-spring-boot-starter</artifactId> <version>3.12.5</version> </dependency>
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
2、application.yml配置
spring: redis: host: 127.0.0.1 password: redis123 port: 6379 timeout: 0
- 1
- 2
- 3
- 4
- 5
- 6
3、Redission配置类
@Configurationpublic class RedissonConfig { @Value("${spring.redis.host}") private String host; @Value("${spring.redis.port}") private String port; @Value("${spring.redis.password}") private String redisPassword; @Bean public RedissonClient getRedisson(){ Config config = new Config(); //单机模式 依次设置redis地址和密码 config.useSingleServer(). setAddress("redis://" + host + ":" + port). setPassword(redisPassword); return Redisson.create(config); }}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
4、Redission分布式锁对应业务类
@RestController@RequestMapping("/redisLock")public class RedisLockController { @Resource private StringRedisTemplate stringRedisTemplate; @Resource private RedissonClient redisson; private static final String REDIS_KEY = "redis_oa_01"; private static final int MAX_SIZE = 1000; /** * 初始化库存 */ @PostMapping("/init") public void init() { stringRedisTemplate.opsForValue().set(REDIS_KEY, String.valueOf(MAX_SIZE)); } /** * 扣库存业务 */ @PostMapping("/exportInventory") public void exportInventory() { String lockKey = "product001"; RLock lock = redisson.getLock(lockKey); try { lock.lock(); int s = Integer.parseInt(Objects.requireNonNull(stringRedisTemplate.opsForValue().get(REDIS_KEY))); s--; System.out.printf("1号服务:库存当前为:" + s + "\"); stringRedisTemplate.opsForValue().set(REDIS_KEY, String.valueOf(s)); }catch (Exception e){} finally { lock.unlock(); } }
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
以上操作分别写两份,并启动两个服务,模拟分布式的情况
现在我们可以下载一个jmeter,进行压测,看看我们的代码抗不抗揍
jmeter的使用教程,参考一下这个:
jmeter操作配置
Idea打印执行结果
我设置的是1000个库存,由两个服务去执行扣除,3s,执行1000次,没有一个重复的,还是抗揍的吧。