linux互斥锁

java中的几种锁:synchronized,ReentrantLock,ReentrantReadWriteLock已基本可以满足编程需求,但其粒度都太大,同一时刻只有一个线程能进入同步块,这对于某些高并发的场景并不适用。本文实现了一个基于KEY(主键)的互斥锁,具有更细的粒度,

java中的几种锁:synchronized,ReentrantLock,ReentrantReadWriteLock已基本可以满足编程需求,但其粒度都太大,同一时刻只有一个线程能进入同步块,这对于某些高并发的场景并不适用。本文实现了一个基于KEY(主键)的互斥锁,具有更细的粒度,在缓存或其他基于KEY的场景中有很大的用处。下面将讲解这个锁的设计和实现

设想这么一个场景:转账

privateint[] accounts;// 账户数组,其索引为账户ID,内容为金额

publicbooleantransfer(intfrom,intto,intmoney) {

if(accounts[from] 

returnfalse;

accounts[from] -= money;

accounts[to] += money;

returntrue;

}

从from中转出金额到to中。可能同时会有很多个线程同时调用这个转账方法,为保证原子性,保证金额不会出错,必须为这个方法加个锁,防止对共享变量accounts的并发修改。

加锁后的代码如下:

privateint[] accounts;// 账户数组,其索引为账户ID,内容为金额

privateLock lock =newReentrantLock();

publicbooleantransfer(intfrom,intto,intmoney) {

lock.lock();

try{

if(accounts[from] 

returnfalse;

accounts[from] -= money;

accounts[to] += money;

returntrue;

} finally{

lock.unlock();

}

}

好了,加锁后这个代码就能保证金额不出错了。但问题又出现了,一次只能执行一个转账过程!意思就是A给B转账的时候,C要给D转账也得等A给B转完了才能开始转。这就有点扯蛋了,就像只有一个柜台,所有人必须排队等前面的处理完了才能到自己,效率太低。

解决这种情况有一个方案:A给B转账的时候只锁定A和B的账户,使其转账期间不能再有其他针对A和B账户的操作,但其他账户的操作可以并行发生。类似于如下场景:

publicbooleantransfer(intfrom,intto,intmoney) {

lock.lock(from, to);

try{

if(accounts[from] 

returnfalse;

accounts[from] -= money;

accounts[to] += money;

returntrue;

} finally{

lock.unlock(from, to);

}

}

但很显然,JAVA并没有为我们提供这样的锁(也有可能是我没找到。。。)

于是,就在这样的需求下我花了整一天来实现了这个锁——KeyLock(代码量很短,但多线程的东西真的很让人头疼)

不同于synchronized等锁,KeyLock是对所需处理的数据的KEY(主键)进行加锁,只要是对不同key操作,其就可以并行处理,大大提高了线程的并行度(最后有几个锁的对比测试)

总结下就是:对相同KEY操作的线程互斥,对不同KEY操作的线程可以并行

KeyLock有如下几个特性:

1、细粒度,高并行性

2、可重入

3、公平锁

4、加锁开销比ReentrantLock大,适用于处理耗时长、key范围大的场景

KeyLock代码如下(注释很少,因为我也不知道该怎么写清楚,能看懂就看,懒得看的直接用就行):

publicclassKeyLock {

// 保存所有锁定的KEY及其信号量

privatefinalConcurrentMap map =newConcurrentHashMap();

// 保存每个线程锁定的KEY及其锁定计数

privatefinalThreadLocal> local =newThreadLocal>() {

@Override

protectedMap initialValue() {

returnnewHashMap();

}

};

/**

* 锁定key,其他等待此key的线程将进入等待,直到调用{@link #unlock(K)}

* 使用hashcode和equals来判断key是否相同,因此key必须实现{@link #hashCode()}和

* {@link #equals(Object)}方法

*

* @param key

*/

publicvoidlock(K key) {

if(key ==null)

return;

LockInfo info = local.get().get(key);

if(info ==null) {

Semaphore current = newSemaphore(1);

current.acquireUninterruptibly();

Semaphore previous = map.put(key, current);

if(previous !=null)

previous.acquireUninterruptibly();

local.get().put(key, newLockInfo(current));

} else{

info.lockCount++;

}

}

/**

* 释放key,唤醒其他等待此key的线程

* @param key

*/

publicvoidunlock(K key) {

if(key ==null)

return;

LockInfo info = local.get().get(key);

if(info !=null&& --info.lockCount ==0) {

info.current.release();

map.remove(key, info.current);

local.get().remove(key);

}

}

/**

* 锁定多个key

* 建议在调用此方法前先对keys进行排序,使用相同的锁定顺序,防止死锁发生

* @param keys

*/

publicvoidlock(K[] keys) {

if(keys ==null)

return;

for(K key : keys) {

lock(key);

}

}

/**

* 释放多个key

* @param keys

*/

publicvoidunlock(K[] keys) {

if(keys ==null)

return;

for(K key : keys) {

unlock(key);

}

}

privatestaticclassLockInfo {

privatefinalSemaphore current;

privateintlockCount;

privateLockInfo(Semaphore current) {

this.current = current;

this.lockCount =1;

}

}

}

KeyLock使用示例:

privateint[] accounts;

privateKeyLock lock =newKeyLock();

publicbooleantransfer(intfrom,intto,intmoney) {

Integer[] keys = newInteger[] {from, to};

Arrays.sort(keys); //对多个key进行排序,保证锁定顺序防止死锁

lock.lock(keys);

try{

//处理不同的from和to的线程都可进入此同步块

if(accounts[from] 

returnfalse;

accounts[from] -= money;

accounts[to] += money;

returntrue;

} finally{

lock.unlock(keys);

}

}

好,工具有了,接下来就是测试了,为了测出并行度,我把转账过程延长了,加了个sleep(2),使每个转账过程至少要花2毫秒(这只是个demo,真实环境下对数据库操作也很费时)。

测试代码如下:

//场景:多线程并发转账

publicclassTest {

privatefinalint[] account;// 账户数组,其索引为账户ID,内容为金额

publicTest(intcount,intmoney) {

account = newint[count];

Arrays.fill(account, money);

}

booleantransfer(intfrom,intto,intmoney) {

if(account[from] 

returnfalse;

account[from] -= money;

try{

Thread.sleep(2);

} catch(Exception e) {

}

account[to] += money;

returntrue;

}

intgetAmount() {

intresult =0;

for(intm : account)

result += m;

returnresult;

}

publicstaticvoidmain(String[] args)throwsException {

intcount =100;//账户个数

intmoney =10000;//账户初始金额

intthreadNum =8;//转账线程数

intnumber =10000;//转账次数

intmaxMoney =1000;//随机转账最大金额

Test test = newTest(count, money);

//不加锁

//      Runner runner = test.new NonLockRunner(maxMoney, number);

//加synchronized锁

//      Runner runner = test.new SynchronizedRunner(maxMoney, number);

//加ReentrantLock锁

//      Runner runner = test.new ReentrantLockRunner(maxMoney, number);

//加KeyLock锁

Runner runner = test.newKeyLockRunner(maxMoney, number);

Thread[] threads = newThread[threadNum];

for(inti =0; i 

threads[i] = newThread(runner,"thread-"+ i);

longbegin = System.currentTimeMillis();

for(Thread t : threads)

t.start();

for(Thread t : threads)

t.join();

longtime = System.currentTimeMillis() - begin;

System.out.println("类型:"+ runner.getClass().getSimpleName());

System.out.printf("耗时:%dms\n", time);

System.out.printf("初始总金额:%d\n", count * money);

System.out.printf("终止总金额:%d\n", test.getAmount());

}

// 转账任务

abstractclassRunnerimplementsRunnable {

finalintmaxMoney;

finalintnumber;

privatefinalRandom random =newRandom();

privatefinalAtomicInteger count =newAtomicInteger();

Runner(intmaxMoney,intnumber) {

this.maxMoney = maxMoney;

this.number = number;

}

@Override

publicvoidrun() {

while(count.getAndIncrement() 

intfrom = random.nextInt(account.length);

intto;

while((to = random.nextInt(account.length)) == from)

;

intmoney = random.nextInt(maxMoney);

doTransfer(from, to, money);

}

}

abstractvoiddoTransfer(intfrom,intto,intmoney);

}

// 不加锁的转账

classNonLockRunnerextendsRunner {

NonLockRunner(intmaxMoney,intnumber) {

super(maxMoney, number);

}

@Override

voiddoTransfer(intfrom,intto,intmoney) {

transfer(from, to, money);

}

}

// synchronized的转账

classSynchronizedRunnerextendsRunner {

SynchronizedRunner(intmaxMoney,intnumber) {

super(maxMoney, number);

}

@Override

synchronizedvoiddoTransfer(intfrom,intto,intmoney) {

transfer(from, to, money);

}

}

// ReentrantLock的转账

classReentrantLockRunnerextendsRunner {

privatefinalReentrantLock lock =newReentrantLock();

ReentrantLockRunner(intmaxMoney,intnumber) {

super(maxMoney, number);

}

@Override

voiddoTransfer(intfrom,intto,intmoney) {

lock.lock();

try{

transfer(from, to, money);

} finally{

lock.unlock();

}

}

}

// KeyLock的转账

classKeyLockRunnerextendsRunner {

privatefinalKeyLock lock =newKeyLock();

KeyLockRunner(intmaxMoney,intnumber) {

super(maxMoney, number);

}

@Override

voiddoTransfer(intfrom,intto,intmoney) {

Integer[] keys = newInteger[] {from, to};

Arrays.sort(keys);

lock.lock(keys);

try{

transfer(from, to, money);

} finally{

lock.unlock(keys);

}

}

}

}

最最重要的

测试结果

(8线程对100个账户随机转账总共10000次):

类型:NonLockRunner(不加锁)

耗时:2482ms

初始总金额:1000000

终止总金额:998906(无法保证原子性)

类型:SynchronizedRunner(加synchronized锁)

耗时:20872ms

初始总金额:1000000

终止总金额:1000000

类型:ReentrantLockRunner(加ReentrantLock锁)

耗时:21588ms

初始总金额:1000000

终止总金额:1000000

类型:KeyLockRunner(加KeyLock锁)

耗时:2831ms        初始总金额:1000000        终止总金额:1000000

知秋君
上一篇 2024-08-24 09:02
下一篇 2024-08-24 08:36

相关推荐