如何用Redisson实现一个读写锁?

wen java案例 63

本文目录导读:

如何用Redisson实现一个读写锁?

  1. 基础实现
  2. 高级功能实现
  3. 实际应用场景
  4. 完整的测试示例
  5. 重要特性总结

我来详细介绍如何使用Redisson实现读写锁(ReadWriteLock)。

基础实现

Maven依赖

<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.20.0</version>
</dependency>

基本使用示例

import org.redisson.Redisson;
import org.redisson.api.RReadWriteLock;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
/**
 * Minimal demonstration of a Redisson distributed read/write lock.
 *
 * <p>Connects to a single Redis server and exposes one operation guarded by the
 * read (shared) lock and one guarded by the write (exclusive) lock, both taken
 * from the lock named {@code "myReadWriteLock"}.
 */
public class ReadWriteLockExample {
    private final RedissonClient redissonClient;
    private final RReadWriteLock readWriteLock;

    /**
     * Builds the Redisson client for the single server at
     * {@code redis://127.0.0.1:6379} and obtains the read/write lock.
     */
    public ReadWriteLockExample() {
        // 1. Build the configuration
        Config config = new Config();
        config.useSingleServer()
              .setAddress("redis://127.0.0.1:6379")
              .setPassword("password");
        // 2. Create the Redisson client
        redissonClient = Redisson.create(config);
        // 3. Obtain the read/write lock
        readWriteLock = redissonClient.getReadWriteLock("myReadWriteLock");
    }

    /**
     * Read operation, guarded by the shared read lock.
     *
     * <p>If the thread is interrupted while working, the interrupt flag is
     * restored and the lock is still released.
     */
    public void readOperation() {
        RLock readLock = readWriteLock.readLock();
        // Acquire BEFORE the try block: if lock() itself fails, the finally
        // block must not try to unlock a lock this thread never held.
        readLock.lock();
        try {
            System.out.println("获取读锁成功,线程:" + Thread.currentThread().getName());
            // Simulated read work
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            readLock.unlock();
            System.out.println("释放读锁");
        }
    }

    /**
     * Write operation, guarded by the exclusive write lock.
     *
     * <p>If the thread is interrupted while working, the interrupt flag is
     * restored and the lock is still released.
     */
    public void writeOperation() {
        RLock writeLock = readWriteLock.writeLock();
        // Acquire BEFORE the try block, for the same reason as in readOperation().
        writeLock.lock();
        try {
            System.out.println("获取写锁成功,线程:" + Thread.currentThread().getName());
            // Simulated write work
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            writeLock.unlock();
            System.out.println("释放写锁");
        }
    }

    /**
     * Shuts down the underlying Redisson client. Call once when this example
     * object is no longer needed.
     */
    public void shutdown() {
        redissonClient.shutdown();
    }
}

高级功能实现

带超时的读写锁

import java.util.concurrent.TimeUnit;
/**
 * Demonstrates timed acquisition of a Redisson read/write lock and the
 * difference between a fixed lease time and watchdog-based auto-renewal.
 */
public class ReadWriteLockWithTimeout {
    private final RedissonClient redissonClient;
    private final RReadWriteLock readWriteLock;

    /**
     * Builds the Redisson client for the single server at
     * {@code redis://127.0.0.1:6379} and obtains the lock named {@code "timeoutLock"}.
     */
    public ReadWriteLockWithTimeout() {
        Config config = new Config();
        config.useSingleServer().setAddress("redis://127.0.0.1:6379");
        redissonClient = Redisson.create(config);
        readWriteLock = redissonClient.getReadWriteLock("timeoutLock");
    }

    /**
     * Tries to acquire the read lock.
     *
     * @param waitTime  maximum time to wait for the lock, in milliseconds
     * @param leaseTime time after which the lock is released automatically, in milliseconds
     * @return {@code true} if the lock was acquired; {@code false} if the wait
     *         timed out or the thread was interrupted (interrupt flag is restored)
     */
    public boolean tryReadLock(long waitTime, long leaseTime) {
        RLock readLock = readWriteLock.readLock();
        try {
            return readLock.tryLock(waitTime, leaseTime, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /**
     * Tries to acquire the write lock.
     *
     * @param waitTime  maximum time to wait for the lock, in milliseconds
     * @param leaseTime time after which the lock is released automatically, in milliseconds
     * @return {@code true} if the lock was acquired; {@code false} if the wait
     *         timed out or the thread was interrupted (interrupt flag is restored)
     */
    public boolean tryWriteLock(long waitTime, long leaseTime) {
        RLock writeLock = readWriteLock.writeLock();
        try {
            return writeLock.tryLock(waitTime, leaseTime, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /**
     * Releases the read lock taken via {@link #tryReadLock(long, long)}.
     * Does nothing if the current thread no longer holds it (for example
     * because the lease time already elapsed).
     */
    public void unlockRead() {
        RLock readLock = readWriteLock.readLock();
        if (readLock.isHeldByCurrentThread()) {
            readLock.unlock();
        }
    }

    /**
     * Releases the write lock taken via {@link #tryWriteLock(long, long)}.
     * Does nothing if the current thread no longer holds it (for example
     * because the lease time already elapsed).
     */
    public void unlockWrite() {
        RLock writeLock = readWriteLock.writeLock();
        if (writeLock.isHeldByCurrentThread()) {
            writeLock.unlock();
        }
    }

    /**
     * Holds the write lock for longer than the default watchdog timeout to
     * demonstrate automatic renewal.
     *
     * <p>The lock is acquired WITHOUT an explicit lease time. Passing a lease
     * time (e.g. {@code lock(30, TimeUnit.SECONDS)}) disables the watchdog: the
     * lock would expire after 30 seconds and the later {@code unlock()} would
     * throw {@link IllegalMonitorStateException}.
     */
    public void autoRenewLock() {
        RLock writeLock = readWriteLock.writeLock();
        // No lease time: the watchdog keeps extending the lock while this client is alive.
        writeLock.lock();
        try {
            // Simulated long-running work that outlasts the default 30-second timeout
            Thread.sleep(35000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            writeLock.unlock();
        }
    }
}

实际应用场景

缓存读写场景

import java.util.concurrent.ConcurrentHashMap;
/**
 * Two-level cache (in-process map in front of Redis buckets) whose reads and
 * writes are coordinated by the distributed read/write lock {@code "cacheLock"}.
 */
public class CacheReadWriteService {
    private final RedissonClient redissonClient;
    private final RReadWriteLock cacheLock;
    private final ConcurrentHashMap<String, Object> localCache = new ConcurrentHashMap<>();

    /**
     * Builds the Redisson client for the single server at
     * {@code redis://127.0.0.1:6379} and obtains the cache lock.
     */
    public CacheReadWriteService() {
        Config config = new Config();
        config.useSingleServer().setAddress("redis://127.0.0.1:6379");
        redissonClient = Redisson.create(config);
        cacheLock = redissonClient.getReadWriteLock("cacheLock");
    }

    /**
     * Reads a value under the read lock: local cache first, then Redis. A value
     * found in Redis is copied into the local cache.
     *
     * @param key cache key; must not be {@code null}
     * @return the cached value, or {@code null} if neither level has it
     * @throws NullPointerException if {@code key} is {@code null}
     */
    public Object readFromCache(String key) {
        // Fail fast before taking the distributed lock; ConcurrentHashMap rejects null keys anyway.
        if (key == null) {
            throw new NullPointerException("key must not be null");
        }
        RLock readLock = cacheLock.readLock();
        // 10-second lease: the lock is released automatically if this holder stalls.
        readLock.lock(10, TimeUnit.SECONDS);
        try {
            // Level 1: local in-process cache
            Object value = localCache.get(key);
            if (value != null) {
                return value;
            }
            // Level 2: Redis
            RBucket<Object> bucket = redissonClient.getBucket(key);
            value = bucket.get();
            // Populate the local cache for subsequent reads
            if (value != null) {
                localCache.put(key, value);
            }
            return value;
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Writes a value under the write lock to Redis and the local cache, then
     * publishes the key on the {@code "cacheUpdateTopic"} topic.
     *
     * @param key      cache key; must not be {@code null}
     * @param newValue value to store; must not be {@code null}
     * @throws NullPointerException if {@code key} or {@code newValue} is {@code null}
     */
    public void updateCache(String key, Object newValue) {
        // Validate BEFORE touching Redis: ConcurrentHashMap.put rejects nulls, and
        // failing after bucket.set() would leave Redis and the local cache out of sync.
        if (key == null) {
            throw new NullPointerException("key must not be null");
        }
        if (newValue == null) {
            throw new NullPointerException("newValue must not be null");
        }
        RLock writeLock = cacheLock.writeLock();
        writeLock.lock(10, TimeUnit.SECONDS);
        try {
            // Update Redis
            RBucket<Object> bucket = redissonClient.getBucket(key);
            bucket.set(newValue);
            // Update the local cache
            localCache.put(key, newValue);
            // Publish a cache-update event carrying the key
            RTopic topic = redissonClient.getTopic("cacheUpdateTopic");
            topic.publish(key);
        } finally {
            writeLock.unlock();
        }
    }
}

库存管理场景

/**
 * Inventory access coordinated by the distributed read/write lock
 * {@code "inventoryLock"}: queries take the read lock, deductions take the
 * write lock.
 */
public class InventoryManager {
    private final RedissonClient redissonClient;
    private final RReadWriteLock inventoryLock;

    /**
     * Builds the Redisson client for the single server at
     * {@code redis://127.0.0.1:6379} and obtains the inventory lock.
     */
    public InventoryManager() {
        Config config = new Config();
        config.useSingleServer().setAddress("redis://127.0.0.1:6379");
        redissonClient = Redisson.create(config);
        inventoryLock = redissonClient.getReadWriteLock("inventoryLock");
    }

    /**
     * Returns the current stock for a product (read operation).
     *
     * @param productId product identifier, used to build the bucket name {@code "inventory_" + productId}
     * @return the stored stock, or {@code 0} if no value is stored
     */
    public int getInventory(String productId) {
        RLock readLock = inventoryLock.readLock();
        readLock.lock();
        try {
            RBucket<Integer> inventoryBucket = redissonClient.getBucket("inventory_" + productId);
            // Read once: a second get() costs another round trip and could return
            // null (then fail on unboxing) if the key vanished in between.
            Integer currentInventory = inventoryBucket.get();
            return currentInventory != null ? currentInventory : 0;
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Deducts stock for a product (write operation) and appends an entry to
     * the list {@code "inventory_log_" + productId}.
     *
     * @param productId product identifier
     * @param quantity  amount to deduct; must not be negative
     * @return {@code true} if the deduction was applied; {@code false} if no
     *         stock is stored or it is less than {@code quantity}
     * @throws IllegalArgumentException if {@code quantity} is negative
     */
    public boolean deductInventory(String productId, int quantity) {
        // A negative quantity would pass the "enough stock" check and INCREASE the stock.
        if (quantity < 0) {
            throw new IllegalArgumentException("quantity must not be negative: " + quantity);
        }
        RLock writeLock = inventoryLock.writeLock();
        writeLock.lock();
        try {
            RBucket<Integer> inventoryBucket = redissonClient.getBucket("inventory_" + productId);
            Integer currentInventory = inventoryBucket.get();
            if (currentInventory == null || currentInventory < quantity) {
                return false;
            }
            int remaining = currentInventory - quantity;
            inventoryBucket.set(remaining);
            // Record the inventory change
            RList<Object> logList = redissonClient.getList("inventory_log_" + productId);
            logList.add("Deduct: " + quantity + ", remaining: " + remaining);
            return true;
        } finally {
            writeLock.unlock();
        }
    }
}

完整的测试示例

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Runs concurrent readers and writers against {@code ReadWriteLockExample}
 * and prints how many operations of each kind completed.
 */
public class ReadWriteLockTest {
    private static final int READ_THREADS = 10;
    private static final int WRITE_THREADS = 3;
    private static final AtomicInteger readCount = new AtomicInteger(0);
    private static final AtomicInteger writeCount = new AtomicInteger(0);

    /**
     * Starts {@code READ_THREADS} reader tasks and {@code WRITE_THREADS} writer
     * tasks, waits for all of them, then prints the completion counters.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while waiting
     */
    public static void main(String[] args) throws InterruptedException {
        ReadWriteLockExample example = new ReadWriteLockExample();
        int totalThreads = READ_THREADS + WRITE_THREADS;
        CountDownLatch latch = new CountDownLatch(totalThreads);
        ExecutorService executor = Executors.newFixedThreadPool(totalThreads);
        try {
            // Start the reader tasks
            for (int i = 0; i < READ_THREADS; i++) {
                executor.execute(() -> {
                    try {
                        example.readOperation();
                        readCount.incrementAndGet();
                    } finally {
                        latch.countDown();
                    }
                });
            }
            // Start the writer tasks
            for (int i = 0; i < WRITE_THREADS; i++) {
                executor.execute(() -> {
                    try {
                        example.writeOperation();
                        writeCount.incrementAndGet();
                    } finally {
                        latch.countDown();
                    }
                });
            }
            latch.await();
        } finally {
            // Always shut the pool down, even if await() was interrupted;
            // otherwise its worker threads are never released.
            executor.shutdown();
        }
        System.out.println("读操作完成: " + readCount.get());
        System.out.println("写操作完成: " + writeCount.get());
    }
}

重要特性总结

特性说明

  1. 读锁(共享锁):多个线程可以同时获取读锁
  2. 写锁(排他锁):写锁是独占的,只有在没有任何其他线程持有读锁或写锁时才能获取
  3. 锁降级:持有写锁的线程可以继续获取读锁(锁降级);Redisson不支持从读锁升级到写锁,持有读锁时再申请写锁会一直阻塞
  4. 自动续期:未指定leaseTime时看门狗机制生效,锁的默认超时为30秒(lockWatchdogTimeout),持有期间会自动续期;显式指定leaseTime后不会自动续期
  5. 可重入性:同一个线程可以多次获取同一个锁

最佳实践

// 1. 设置合理的超时时间
readWriteLock.readLock().lock(10, TimeUnit.SECONDS);
// 2. 在finally块中释放锁
finally {
    if (readWriteLock.readLock().isHeldByCurrentThread()) {
        readWriteLock.readLock().unlock();
    }
}
// 3. 使用try-finally确保释放
writeLock.lock();
try {
    // 业务逻辑
} finally {
    writeLock.unlock();
}

Redisson的读写锁实现提供了完整的分布式锁功能,适用于读多写少的场景,可以有效提高系统并发性能。

抱歉,评论功能暂时关闭!