我们先来看一下Semaphore 的定义;
Semaphore:一个计数信号量。从概念上讲,信号量维护了一个许可集。如有必要,在许可可用前会阻塞每一个 acquire(),然后再获取该许可。每个 release() 添加一个许可,从而可能释放一个正在阻塞的获取者。但是,不使用实际的许可对象,Semaphore 只对可用许可的号码进行计数,并采取相应的行动。
Semaphore 通常用于限制可以访问某些资源(物理或逻辑的)的线程数目。
我们简单的来讲,Semaphore里面存放着固定数量的许可证,假设为10,那么每个线程执行的时候,都需要先获得一个许可证,否则将进行阻塞,当10个许可证分别被10个线程所持有时,后面的线程将无法执行,全部在获取许可证这个操作时被阻塞,除非有线程执行完毕,归还许可证;
这就像极了数据库连接池,固定数量的连接被初始化好,当连接资源被线程消耗完毕时,其它线程暂时无法获得数据库连接,当其中有线程SQL操作完毕归还连接资源时,其它线程才能继续获得许可拿到线程资源;
我们来模拟一下吧!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75
| class ConnectPool{ private int size; private Connect[] connects ;
private boolean [] connectFlag; private Semaphore semaphore;
public ConnectPool(int size) { this.size = size; semaphore = new Semaphore(size,true); connects = new Connect[size]; connectFlag = new boolean[size]; initConnects(); }
private void initConnects(){ for (int i = 0; i < this.size; i++) { connects[i] = new Connect(); } }
public Connect openConnect() throws InterruptedException{ semaphore.acquire(); return getConnect(); } private synchronized Connect getConnect(){ for (int i = 0; i < connectFlag.length; i++) { if(!connectFlag[i]){ connectFlag[i] = true; return connects[i]; } } return null; }
public synchronized void releaseConnect( Connect connect ){ for (int i = 0; i < this.size; i++) { if( connect==connects[i] ){ connectFlag[i] = false; semaphore.release(); } } } }
class Connect{ private static int count = 1; private int id = count++; public Connect() { try { TimeUnit.MILLISECONDS.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("连接#"+ id +"#已与数据库建立通道!"); } @Override public String toString() { return "#"+id+"#"; }
}
|
//Semaphore是信号量,使用方式具体查看API文档,这里只做一个示范,这个示范基本和Api上演示的差不多…
//什么是信号量呢?假设我们有数据库连接池,只有固定数量的2个连接,这个数量2就是一个信号量, 每当一个线程拿到连接时,
//信号量减一,当线程将连接释放时,信号量加一,当信号量为0时,想获取连接的线程将在阻塞中等待连接被释放;
//Semaphore类保证我们的信号量的增加和减少是线程安全的,也保证在信号量为0时,线程能正确的被阻塞直到信号量大于0;
//so,我们就来模仿数据库连接池的实现吧!(注意:这并非真正数据库连接池的实现,这里的示范只是做个非常简单的例子!)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| public static void semaphore(){ final ConnectPool pool = new ConnectPool(2); ExecutorService exec = Executors.newCachedThreadPool();
for (int i = 0; i < 5; i++) { final int id = i+1; exec.execute(new Runnable() { @Override public void run() { try { System.out.println("线程"+id+"等待获取数据库连接"); Connect connect = pool.openConnect(); System.out.println("线程"+id+"已拿到数据库连接:"+connect); TimeUnit.MILLISECONDS.sleep(2000); System.out.println("线程"+id+"释放数据库连接:"+connect); pool.releaseConnect(connect); } catch (InterruptedException e) { e.printStackTrace(); }
} }); } }
|
先看看ConnectPool类,它使用initConnects()方法初始化数据库资源池大小,这里传的参数是2,意思就是只有两个数据库连接;
我们在测试方法中开启5个线程,分别调用它的openConnect()来获取连接资源以及releaseConnect()来释放连接;
在openConnect()中,我们先调用信号量的acquire()方法拿到许可证,接着在getConnect()中判断某个连接是否正在被使用中,如果没有被使用,则取到该连接并将其标识为使用中;
在releaseConnect()中,我们将标识设置为false并调用信号量的release()归还许可证,这样后面的线程就能继续取得连接了;
看一下输出*************************************************************************************
连接#1#已与数据库建立通道!
连接#2#已与数据库建立通道!
线程2等待获取数据库连接
线程1等待获取数据库连接
线程2已拿到数据库连接:#1#
线程4等待获取数据库连接
线程1已拿到数据库连接:#2#
线程3等待获取数据库连接
线程5等待获取数据库连接
线程2释放数据库连接:#1#
线程1释放数据库连接:#2#
线程4已拿到数据库连接:#1#
线程3已拿到数据库连接:#2#
线程3释放数据库连接:#2#
线程4释放数据库连接:#1#
线程5已拿到数据库连接:#1#
线程5释放数据库连接:#1#
仔细看看输出,是否是多个线程在等待获取数据库连接,当某个线程释放数据库连接时,另一个线程马上获取到接着消费呢…