目录
1.锁
分布式锁,crm开发定制即分布式系统中的锁。crm开发定制随着业务发展的需要,crm开发定制原单体单机部署的系统crm开发定制被演化成分布式集群系统后,crm开发定制由于分布式系统多线程、crm开发定制多进程并且分布在不同机器上,crm开发定制这将使原单机部署情况crm开发定制下的并发控制锁策略失效,单纯的Java APIcrm开发定制并不能提供分布式锁的crm开发定制能力在单体应用中我们crm开发定制通过锁解决的是控制共crm开发定制享资源访问的问题,crm开发定制而分布式锁,crm开发定制就是解决了分布式系统中控制共享资源访问的问题。
下面主要介绍springboot集成redis实现分布式锁。
需要注意的是,分布式锁可以保证数据的一致性,但同时访问的速度也会受到影响。
2.springboot集成redis
在springboot项目中引入redis相关依赖:
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-data-redis</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.commons</groupId>
- <artifactId>commons-pool2</artifactId>
- <version>2.6.0</version>
- </dependency>
编写application.yml文件:
- spring:
- redis:
- host: 127.0.0.1 #服务器地址
- port: 6379 #端口号
- database: 0 #数据库索引(默认为0)
- timeout: 180000 #连接超时时间
- lettuce:
- pool:
- max-active: 20 #最大连接数
- max-wait: -1 #最大阻塞等待时间,-1即无限制
- max-idle: 8 #最大空闲连接数
- min-idle: 0 #最小空闲连接数
此处使用的是lettuce客户端而不是jedis客户端。Lettuce是基于Netty的事件驱动的Redis客户端,其方法调用是异步的,Lettuce的API也是线程安全的,所以多个线程可以操作单个Lettuce连接来完成各种操作,同时Lettuce也支持连接池。
编写redis配置类,实现序列化:
- package com.seven.redis.config;
-
- import com.fasterxml.jackson.annotation.JsonAutoDetect;
- import com.fasterxml.jackson.annotation.PropertyAccessor;
- import com.fasterxml.jackson.databind.ObjectMapper;
- import org.springframework.cache.annotation.EnableCaching;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import org.springframework.data.redis.connection.RedisConnectionFactory;
- import org.springframework.data.redis.core.RedisTemplate;
- import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
- import org.springframework.data.redis.serializer.StringRedisSerializer;
-
- @EnableCaching
- @Configuration
- public class RedisConfig {
-
- @Bean
- @SuppressWarnings("all")
- public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
- RedisTemplate<String, Object> template = new RedisTemplate<String, Object>();
- template.setConnectionFactory(factory);
- Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
- ObjectMapper om = new ObjectMapper();
- om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
- om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); //目前已弃用
- jackson2JsonRedisSerializer.setObjectMapper(om);
- StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
-
- // key采用String的序列化方式
- template.setKeySerializer(stringRedisSerializer);
- // hash的key也采用String的序列化方式
- template.setHashKeySerializer(stringRedisSerializer);
- // value序列化方式采用jackson
- template.setValueSerializer(jackson2JsonRedisSerializer);
- // hash的value序列化方式采用jackson
- template.setHashValueSerializer(jackson2JsonRedisSerializer);
- template.afterPropertiesSet();
-
- return template;
- }
-
-
- }
然后,我们可以通过直接导入RedisTemplate或来使用redis,
- @Resource
- private RedisTemplate redisTemplate;
或是自定义一个redisUtil工具类,重写RedisTemplate里的部分方法:
- package com.seven.redis.utils;
-
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.data.redis.core.RedisTemplate;
- import org.springframework.stereotype.Component;
- import org.springframework.util.CollectionUtils;
-
- import java.util.List;
- import java.util.Map;
- import java.util.Set;
- import java.util.concurrent.TimeUnit;
-
- @Component
- public final class RedisUtil {
-
- @Autowired
- private RedisTemplate<String, Object> redisTemplate;
-
- // =============================common============================
- /**
- * 指定缓存失效时间
- * @param key 键
- * @param time 时间(秒)
- */
- public boolean expire(String key, long time) {
- try {
- if (time > 0) {
- redisTemplate.expire(key, time, TimeUnit.SECONDS);
- }
- return true;
- } catch (Exception e) {
- e.printStackTrace();
- return false;
- }
- }
-
- /**
- * 根据key 获取过期时间
- * @param key 键 不能为null
- * @return 时间(秒) 返回0代表为永久有效
- */
- public long getExpire(String key) {
- return redisTemplate.getExpire(key, TimeUnit.SECONDS);
- }
-
-
- /**
- * 判断key是否存在
- * @param key 键
- * @return true 存在 false不存在
- */
- public boolean hasKey(String key) {
- try {
- return redisTemplate.hasKey(key);
- } catch (Exception e) {
- e.printStackTrace();
- return false;
- }
- }
-
-
- /**
- * 删除缓存
- * @param key 可以传一个值 或多个
- */
- @SuppressWarnings("unchecked")
- public void del(String... key) {
- if (key != null && key.length > 0) {
- if (key.length == 1) {
- redisTemplate.delete(key[0]);
- } else {
- redisTemplate.delete(CollectionUtils.arrayToList(key));
- }
- }
- }
-
- /**
- * set nx,上锁
- * @param key 一般设为lock
- *@param value 一般使用uuid
- *@param time 缓存时间,单位为s
- */
- public boolean setNx(String key, String value, int time){
- return Boolean.TRUE.equals(redisTemplate.opsForValue().setIfAbsent(key, value, time, TimeUnit.SECONDS));
- }
- //未指定过期时间
- public boolean setNx(String key, String value){
- return Boolean.TRUE.equals(redisTemplate.opsForValue().setIfAbsent(key, value));
- }
-
- // ============================String=============================
-
- /**
- * 普通缓存获取
- * @param key 键
- * @return 值
- */
- public Object get(String key) {
- return key == null ? null : redisTemplate.opsForValue().get(key);
- }
-
- /**
- * 普通缓存放入
- * @param key 键
- * @param value 值
- * @return true成功 false失败
- */
-
- public boolean set(String key, Object value) {
- try {
- redisTemplate.opsForValue().set(key, value);
- return true;
- } catch (Exception e) {
- e.printStackTrace();
- return false;
- }
- }
-
-
- /**
- * 普通缓存放入并设置时间
- * @param key 键
- * @param value 值
- * @param time 时间(秒) time要大于0 如果time小于等于0 将设置无限期
- * @return true成功 false 失败
- */
-
- public boolean set(String key, Object value, long time) {
- try {
- if (time > 0) {
- redisTemplate.opsForValue().set(key, value, time, TimeUnit.SECONDS);
- } else {
- set(key, value);
- }
- return true;
- } catch (Exception e) {
- e.printStackTrace();
- return false;
- }
- }
-
-
- /**
- * 递增
- * @param key 键
- * @param delta 要增加几(大于0)
- */
- public long incr(String key, long delta) {
- if (delta < 0) {
- throw new RuntimeException("递增因子必须大于0");
- }
- return redisTemplate.opsForValue().increment(key, delta);
- }
-
-
- /**
- * 递减
- * @param key 键
- * @param delta 要减少几(小于0)
- */
- public long decr(String key, long delta) {
- if (delta < 0) {
- throw new RuntimeException("递减因子必须大于0");
- }
- return redisTemplate.opsForValue().increment(key, -delta);
- }
-
-
-
- // ============================set=============================
-
- /**
- * 根据key获取Set中的所有值
- * @param key 键
- */
- public Set<Object> sGet(String key) {
- try {
- return redisTemplate.opsForSet().members(key);
- } catch (Exception e) {
- e.printStackTrace();
- return null;
- }
- }
-
-
- /**
- * 根据value从一个set中查询,是否存在
- *
- * @param key 键
- * @param value 值
- * @return true 存在 false不存在
- */
- public boolean sHasKey(String key, Object value) {
- try {
- return redisTemplate.opsForSet().isMember(key, value);
- } catch (Exception e) {
- e.printStackTrace();
- return false;
- }
- }
-
-
- /**
- * 将数据放入set缓存
- *
- * @param key 键
- * @param values 值 可以是多个
- * @return 成功个数
- */
- public long sSet(String key, Object... values) {
- try {
- return redisTemplate.opsForSet().add(key, values);
- } catch (Exception e) {
- e.printStackTrace();
- return 0;
- }
- }
-
-
- /**
- * 将set数据放入缓存
- *
- * @param key 键
- * @param time 时间(秒)
- * @param values 值 可以是多个
- * @return 成功个数
- */
- public long sSetAndTime(String key, long time, Object... values) {
- try {
- Long count = redisTemplate.opsForSet().add(key, values);
- if (time > 0)
- expire(key, time);
- return count;
- } catch (Exception e) {
- e.printStackTrace();
- return 0;
- }
- }
-
-
- /**
- * 获取set缓存的长度
- *
- * @param key 键
- */
- public long sGetSetSize(String key) {
- try {
- return redisTemplate.opsForSet().size(key);
- } catch (Exception e) {
- e.printStackTrace();
- return 0;
- }
- }
-
-
- /**
- * 移除值为value的
- *
- * @param key 键
- * @param values 值 可以是多个
- * @return 移除的个数
- */
-
- public long setRemove(String key, Object... values) {
- try {
- Long count = redisTemplate.opsForSet().remove(key, values);
- return count;
- } catch (Exception e) {
- e.printStackTrace();
- return 0;
- }
- }
-
-
- }
因本次实现分布式锁主要只使用String数据类型,固只实现了String数据类型的代码。
3.使用setnx命令实现分布式锁
在Redis中我们通常可以使用redis命令(setnx)实现分布式锁。
setnx key value 命令可以给key上锁,而解锁一般可以通过两种方法:
- 通过命令 del key 删除key
- 通过 set key value nx ex time 设置key的过期时间
对应RedisUtil工具类中的以下代码:
- /**
- * set nx,上锁
- * @param key 一般设为lock
- *@param value 一般使用uuid
- *@param time 缓存时间,单位为s
- */
- public boolean setNx(String key, String value, int time){
- return Boolean.TRUE.equals(redisTemplate.opsForValue().setIfAbsent(key, value, time, TimeUnit.SECONDS));
- }
- //未指定过期时间
- public boolean setNx(String key, String value){
- return Boolean.TRUE.equals(redisTemplate.opsForValue().setIfAbsent(key, value));
- }
在controller中编写模拟代码,代码逻辑如下:
- 设定锁lock,设置成功则对数据库、redis缓存等相关数据进行操作(下述代码中对redis中缓存的key:num进行+1操作)。锁期间,其他client无法对其进行操作。操作完成后,删除锁,其他客户端即可进行操作。
- 锁失败,对其0.1秒进行重试,重新进行上锁操作。
- package com.seven.redis.controller;
-
- import com.seven.redis.utils.RedisUtil;
- import org.springframework.util.StringUtils;
- import org.springframework.web.bind.annotation.GetMapping;
- import org.springframework.web.bind.annotation.RestController;
-
- import javax.annotation.Resource;
- import java.util.UUID;
-
- @RestController
- public class RedisController {
-
- @Resource
- private RedisUtil redisUtil;
-
- @GetMapping("/test")
- public String test(){
- //配置锁,设置随机uuid进行验证防止误删
- String uuid = UUID.randomUUID().toString();
- //设置过期时间为10s
- boolean lock = redisUtil.setNx("lock",uuid,10);
- if(lock){
- //若已经上锁
- Object value =redisUtil.get("num");
- //2.1判断num为空return
- if(StringUtils.isEmpty(value)){
- return "key is null";
- }
- //2.2有值就转成成int
- int num = Integer.parseInt(value+"");
- //2.3把redis的num加1
- redisUtil.set("num", ++num);
- //2.4释放锁,del,保证锁必须被释放-->当业务执行时间小与过期时间时需要释放锁
- if(uuid.equals((String)redisUtil.get("lock"))){
- redisUtil.del("lock");
- return "success";
- }else {
- return "fail";
- }
- }else {
- //上锁失败
- try {
- Thread.sleep(100);
- test();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- return "done";
- }
- }
上述代码中,为防止误删(即客户端a在进行操作,服务器发生卡顿,达到了key设定的过期时间,解开了锁,客户端b开始进行操作;然后在b进行操作期间,a卡顿结束,继续删锁操作,会导致误删了b的锁),设置了uuid值进行验证:
- if(uuid.equals((String)redisUtil.get("lock"))){
- redisUtil.del("lock");
- return "success";
- }
uuid一致,才可删除锁,否则,无法删除。
注意:此处删除操作缺乏原子性,可以通过lua脚本加强分布式锁的安全性。可参考以下代码,此处不进行详细叙述:
- /*使用lua脚本解锁*/
- // 定义lua 脚本
- String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
- // 使用redis执行lua执行
- DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
- redisScript.setScriptText(script);
- // 设置一下返回值类型 为Long
- // 因为删除判断的时候,返回的0,给其封装为数据类型。如果不封装那么默认返回String 类型,
- // 那么返回字符串与0 会有发生错误。
- redisScript.setResultType(Long.class);
- // 第一个要是script 脚本 ,第二个需要判断的key,第三个就是key所对应的值。
- redisTemplate.execute(redisScript, Arrays.asList(locKey), uuid);
4.使用Redission实现分布式锁
实现Redis的分布式锁,除了自己基于redis client原生api来实现之外,还可以使用开源框架:Redission。
使用redission只需要通过他的api中的lock和unlock即可完成分布式锁,对比于setnx,他的优势在于:
- redisson所有指令都通过lua脚本执行,redis支持lua脚本原子性执行
- redisson设置一个key的默认过期时间为30s,redisson中有一个watchdog看门狗的概念,它会在你获取锁之后,每隔30s/3 的时间就会执行一次定时任务,帮你把key的超时时间设为30s进行续期,知道任务执行完毕
下面对springboot使用Redission进行一次演示:
导入相关依赖:
- <!--redission相关依赖-->
- <dependency>
- <groupId>org.redisson</groupId>
- <artifactId>redisson</artifactId>
- <version>3.16.0</version>
- </dependency>
编写Redission设置类:
- package com.seven.redis.config;
-
- import org.redisson.Redisson;
- import org.redisson.api.RedissonClient;
- import org.redisson.config.Config;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
- @Configuration
- public class RedissonConfig {
-
- @Value("${spring.redis.host}")
- private String host;
-
- @Value("${spring.redis.port}")
- private String port;
-
- @Bean
- public RedissonClient getRedisson(){
-
- Config config = new Config();
- //单机模式 依次设置redis地址和密码
- config.useSingleServer().
- setAddress("redis://" + host + ":" + port);
- return Redisson.create(config);
- }
- }
Redission还支持多种连接模式,以下仅作参考:
- //主从
- Config config = new Config();
- config.useMasterSlaveServers()
- .setMasterAddress("127.0.0.1:6379")
- .addSlaveAddress("127.0.0.1:6389", "127.0.0.1:6332", "127.0.0.1:6419")
- .addSlaveAddress("127.0.0.1:6399");
- RedissonClient redisson = Redisson.create(config);
-
-
- //哨兵
- Config config = new Config();
- config.useSentinelServers()
- .setMasterName("mymaster")
- .addSentinelAddress("127.0.0.1:26389", "127.0.0.1:26379")
- .addSentinelAddress("127.0.0.1:26319");
- RedissonClient redisson = Redisson.create(config);
-
-
- //集群
- Config config = new Config();
- config.useClusterServers()
- .setScanInterval(2000) // cluster state scan interval in milliseconds
- .addNodeAddress("127.0.0.1:7000", "127.0.0.1:7001")
- .addNodeAddress("127.0.0.1:7002");
- RedissonClient redisson = Redisson.create(config);
然后我们就可以通过导入Redission使用其分布式锁:
- @Resource
- private RedissonClient redisson;
下面在controller中进行一次库存扣减使用分布式锁的演示:
- @PostMapping("/lock/test")
- public void test() {
-
- String lockKey = UUID.randomUUID().toString();
- RLock lock = redisson.getLock(lockKey); //获取锁
- try {
- lock.lock(); //上锁
- log.info("锁已开启");
- synchronized (this){
- if(redisUtil.get("product")==null){
- log.error("商品不存在!");
- }else{
- //获取当前库存
- int stock = Integer.parseInt(redisUtil.get("product").toString());
- if (stock > 0){
- int realStock = stock - 1;
- //更新库存
- redisUtil.set("product", realStock + "");
- log.info("库存当前为:" + realStock);
- }else {
- log.warn("扣减失败,库存不足!");
- }
- }
- }
- }catch (Exception e){
- log.warn("系统错误,稍后重试");
- }
- finally {
- lock.unlock(); //删除锁
- log.info("锁已关闭");
- }
- }
此处还使用了 synchronized 对线程加锁,若只是启用redission的分布式锁,可不使用。
其运行过程和java多线程下的锁类似,其运行逻辑如下:
注意:锁的范围不易过大,在业务过程中应避免死锁的发生。
5.redission分布式锁的类型
此处注意的是redission分布式锁分为很多种,上文使用的是抢占式的分布式锁。即当锁释放后,其他请求会再次对锁进行抢占,而不是根据请求先后顺序进行。
如果需要公平的分配锁,即按照请求的先后顺序分配锁,可以使用公平锁:
RLock lock = redisson.getFairLock("myLock");
锁的使用方式和抢占式锁相同。
根据业务的需要,还可以使用读写锁:
- //读写锁
- RReadWriteLock lock = redisson.getReadWriteLock("myLock");
-
-
- //写锁
- lock.writeLock();
-
- //读锁
- lock.readLock();
注意,lock.readLock() 和 lock.writeLock() 两个锁用于两个不同的方法中,对应于lock.lock()方法。
读写锁可以在写方法未完成时,保证读方法无法进行;或是两个写方法进行时,保存先后顺序,保证数据的一致性。
只有当两个读方法时,才会不发生冲突。
更多的锁的使用,可以参考redission官网,进行选择: