java中的几种锁:synchronized,ReentrantLock,ReentrantReadWriteLock已基本可以满足编程需求,但其粒度都太大,同一时刻只有一个线程能进入同步块,这对于某些高并发的场景并不适用。本文实现了一个基于KEY(主键)的互斥锁,具有更细的粒度,在缓存或其他基于KEY的场景中有很大的用处。下面将讲解这个锁的设计和实现
设想这么一个场景:转账
privateint[] accounts;// 账户数组,其索引为账户ID,内容为金额
publicbooleantransfer(intfrom,intto,intmoney) {
if(accounts[from]
returnfalse;
accounts[from] -= money;
accounts[to] += money;
returntrue;
}
从from中转出金额到to中。可能同时会有很多个线程同时调用这个转账方法,为保证原子性,保证金额不会出错,必须为这个方法加个锁,防止对共享变量accounts的并发修改。
加锁后的代码如下:
privateint[] accounts;// 账户数组,其索引为账户ID,内容为金额
privateLock lock =newReentrantLock();
publicbooleantransfer(intfrom,intto,intmoney) {
lock.lock();
try{
if(accounts[from]
returnfalse;
accounts[from] -= money;
accounts[to] += money;
returntrue;
} finally{
lock.unlock();
}
}
好了,加锁后这个代码就能保证金额不出错了。但问题又出现了,一次只能执行一个转账过程!意思就是A给B转账的时候,C要给D转账也得等A给B转完了才能开始转。这就有点扯蛋了,就像只有一个柜台,所有人必须排队等前面的处理完了才能到自己,效率太低。
解决这种情况有一个方案:A给B转账的时候只锁定A和B的账户,使其转账期间不能再有其他针对A和B账户的操作,但其他账户的操作可以并行发生。类似于如下场景:
publicbooleantransfer(intfrom,intto,intmoney) {
lock.lock(from, to);
try{
if(accounts[from]
returnfalse;
accounts[from] -= money;
accounts[to] += money;
returntrue;
} finally{
lock.unlock(from, to);
}
}
但很显然,JAVA并没有为我们提供这样的锁(也有可能是我没找到。。。)
于是,就在这样的需求下我花了整一天来实现了这个锁——KeyLock(代码量很短,但多线程的东西真的很让人头疼)
不同于synchronized等锁,KeyLock是对所需处理的数据的KEY(主键)进行加锁,只要是对不同key操作,其就可以并行处理,大大提高了线程的并行度(最后有几个锁的对比测试)
总结下就是:对相同KEY操作的线程互斥,对不同KEY操作的线程可以并行
KeyLock有如下几个特性:
1、细粒度,高并行性
2、可重入
3、公平锁
4、加锁开销比ReentrantLock大,适用于处理耗时长、key范围大的场景
KeyLock代码如下(注释很少,因为我也不知道该怎么写清楚,能看懂就看,懒得看的直接用就行):
publicclassKeyLock {
// 保存所有锁定的KEY及其信号量
privatefinalConcurrentMap map =newConcurrentHashMap();
// 保存每个线程锁定的KEY及其锁定计数
privatefinalThreadLocal> local =newThreadLocal>() {
@Override
protectedMap initialValue() {
returnnewHashMap();
}
};
/**
* 锁定key,其他等待此key的线程将进入等待,直到调用{@link #unlock(K)}
* 使用hashcode和equals来判断key是否相同,因此key必须实现{@link #hashCode()}和
* {@link #equals(Object)}方法
*
* @param key
*/
publicvoidlock(K key) {
if(key ==null)
return;
LockInfo info = local.get().get(key);
if(info ==null) {
Semaphore current = newSemaphore(1);
current.acquireUninterruptibly();
Semaphore previous = map.put(key, current);
if(previous !=null)
previous.acquireUninterruptibly();
local.get().put(key, newLockInfo(current));
} else{
info.lockCount++;
}
}
/**
* 释放key,唤醒其他等待此key的线程
* @param key
*/
publicvoidunlock(K key) {
if(key ==null)
return;
LockInfo info = local.get().get(key);
if(info !=null&& --info.lockCount ==0) {
info.current.release();
map.remove(key, info.current);
local.get().remove(key);
}
}
/**
* 锁定多个key
* 建议在调用此方法前先对keys进行排序,使用相同的锁定顺序,防止死锁发生
* @param keys
*/
publicvoidlock(K[] keys) {
if(keys ==null)
return;
for(K key : keys) {
lock(key);
}
}
/**
* 释放多个key
* @param keys
*/
publicvoidunlock(K[] keys) {
if(keys ==null)
return;
for(K key : keys) {
unlock(key);
}
}
privatestaticclassLockInfo {
privatefinalSemaphore current;
privateintlockCount;
privateLockInfo(Semaphore current) {
this.current = current;
this.lockCount =1;
}
}
}
KeyLock使用示例:
privateint[] accounts;
privateKeyLock lock =newKeyLock();
publicbooleantransfer(intfrom,intto,intmoney) {
Integer[] keys = newInteger[] {from, to};
Arrays.sort(keys); //对多个key进行排序,保证锁定顺序防止死锁
lock.lock(keys);
try{
//处理不同的from和to的线程都可进入此同步块
if(accounts[from]
returnfalse;
accounts[from] -= money;
accounts[to] += money;
returntrue;
} finally{
lock.unlock(keys);
}
}
好,工具有了,接下来就是测试了,为了测出并行度,我把转账过程延长了,加了个sleep(2),使每个转账过程至少要花2毫秒(这只是个demo,真实环境下对数据库操作也很费时)。
测试代码如下:
//场景:多线程并发转账
publicclassTest {
privatefinalint[] account;// 账户数组,其索引为账户ID,内容为金额
publicTest(intcount,intmoney) {
account = newint[count];
Arrays.fill(account, money);
}
booleantransfer(intfrom,intto,intmoney) {
if(account[from]
returnfalse;
account[from] -= money;
try{
Thread.sleep(2);
} catch(Exception e) {
}
account[to] += money;
returntrue;
}
intgetAmount() {
intresult =0;
for(intm : account)
result += m;
returnresult;
}
publicstaticvoidmain(String[] args)throwsException {
intcount =100;//账户个数
intmoney =10000;//账户初始金额
intthreadNum =8;//转账线程数
intnumber =10000;//转账次数
intmaxMoney =1000;//随机转账最大金额
Test test = newTest(count, money);
//不加锁
// Runner runner = test.new NonLockRunner(maxMoney, number);
//加synchronized锁
// Runner runner = test.new SynchronizedRunner(maxMoney, number);
//加ReentrantLock锁
// Runner runner = test.new ReentrantLockRunner(maxMoney, number);
//加KeyLock锁
Runner runner = test.newKeyLockRunner(maxMoney, number);
Thread[] threads = newThread[threadNum];
for(inti =0; i
threads[i] = newThread(runner,"thread-"+ i);
longbegin = System.currentTimeMillis();
for(Thread t : threads)
t.start();
for(Thread t : threads)
t.join();
longtime = System.currentTimeMillis() - begin;
System.out.println("类型:"+ runner.getClass().getSimpleName());
System.out.printf("耗时:%dms\n", time);
System.out.printf("初始总金额:%d\n", count * money);
System.out.printf("终止总金额:%d\n", test.getAmount());
}
// 转账任务
abstractclassRunnerimplementsRunnable {
finalintmaxMoney;
finalintnumber;
privatefinalRandom random =newRandom();
privatefinalAtomicInteger count =newAtomicInteger();
Runner(intmaxMoney,intnumber) {
this.maxMoney = maxMoney;
this.number = number;
}
@Override
publicvoidrun() {
while(count.getAndIncrement()
intfrom = random.nextInt(account.length);
intto;
while((to = random.nextInt(account.length)) == from)
;
intmoney = random.nextInt(maxMoney);
doTransfer(from, to, money);
}
}
abstractvoiddoTransfer(intfrom,intto,intmoney);
}
// 不加锁的转账
classNonLockRunnerextendsRunner {
NonLockRunner(intmaxMoney,intnumber) {
super(maxMoney, number);
}
@Override
voiddoTransfer(intfrom,intto,intmoney) {
transfer(from, to, money);
}
}
// synchronized的转账
classSynchronizedRunnerextendsRunner {
SynchronizedRunner(intmaxMoney,intnumber) {
super(maxMoney, number);
}
@Override
synchronizedvoiddoTransfer(intfrom,intto,intmoney) {
transfer(from, to, money);
}
}
// ReentrantLock的转账
classReentrantLockRunnerextendsRunner {
privatefinalReentrantLock lock =newReentrantLock();
ReentrantLockRunner(intmaxMoney,intnumber) {
super(maxMoney, number);
}
@Override
voiddoTransfer(intfrom,intto,intmoney) {
lock.lock();
try{
transfer(from, to, money);
} finally{
lock.unlock();
}
}
}
// KeyLock的转账
classKeyLockRunnerextendsRunner {
privatefinalKeyLock lock =newKeyLock();
KeyLockRunner(intmaxMoney,intnumber) {
super(maxMoney, number);
}
@Override
voiddoTransfer(intfrom,intto,intmoney) {
Integer[] keys = newInteger[] {from, to};
Arrays.sort(keys);
lock.lock(keys);
try{
transfer(from, to, money);
} finally{
lock.unlock(keys);
}
}
}
}
最最重要的
测试结果
:
(8线程对100个账户随机转账总共10000次):
类型:NonLockRunner(不加锁)
耗时:2482ms
初始总金额:1000000
终止总金额:998906(无法保证原子性)
类型:SynchronizedRunner(加synchronized锁)
耗时:20872ms
初始总金额:1000000
终止总金额:1000000
类型:ReentrantLockRunner(加ReentrantLock锁)
耗时:21588ms
初始总金额:1000000
终止总金额:1000000
类型:KeyLockRunner(加KeyLock锁)
耗时:2831ms 初始总金额:1000000 终止总金额:1000000