Concurrent:
创新互联建站主要从事网站建设、网站设计、网页设计、企业做网站、公司建网站等业务。立足成都服务鹤庆,10年网站建设经验,价格优惠、服务专业,欢迎来电咨询建站服务:13518219792
1. BlockingQueue( 阻塞队列 )
ArrayBlockingQueue( 指定容量,不可变 ),LinkeBlocingQueue (指定容量不可变,也可以不指定容量,默认 Integer.Max_value )
PriorityBlockingQueue ( 根据实现的接口自定义排序,只有在逐个拿取的时候才有序 )
SynchronousQueue (长度以 1 只能为 1 )

2. ConcurrentMap
ConcurrentHashMap
1.5 分桶加锁,1.8 CAS+红黑树来保证线程安全
ConcurrentNavigableMap (针对有序列表,有 map.headMap map.subMap map.tailMap , 返回有序的在取出范围的 map )
3. CountDownLatch 闭锁
CountDownLatch 以一个给定的数量初始化。 countDown() 每被调用一次,这一数 量就减一。通过调用 await() 方法之一,线程可以阻塞等待这一数量到达零。
等待一定数量的线程完成。来执行其后的程序
4. CyclicBarrier 栅栏
它能够对处理一些算法的线程实现同
步。换句话讲,它就是一个所有线程必须等待的一个栅栏,直到所有线程都到达这
里,然后所有线程才可以继续做其他事情
5. Exchanger 交换机
类表示一种两个线程可以进行互相交换对象的会和点 ,只能两个线程之间交换数据

6. Semaphore 信号量
l
acquire()
l
release()
计数信号量由一个指定数量的
"
许可
"
初始化。每调用一次
acquire()
,一个许可会
被调用线程取走。每调用一次
release()
,一个许可会被返还给信号量。因此,在没
有任何
release()
调用时,最多有
N
个线程能够通过
acquire()
方法,
N
是该信
号量初始化时的许可的指定数量。
线程池
7. ExecutorService
(executors.newFixedThreadPool ,长任务场景,只有核心线程,没有临时线程,容纳无限多
,newCachedThreadPool (高并发短任务场景,没有核心线程,全部都是临时线程,处理任意多的线程)
,newSingleThreadPool
,newSchedulerThreadPool (有核心线程,有临时线程)
)
提交线程的方法
execute() 提交线程 没有返回值submit(Runnable) 提交线程,返Future 可以通过Future.get()得到该线程的状态但是如果该线程未执行完成,那么该方法阻塞
submit(Callable) 同上面类似,但是线程可以带有返回值
invokeAny(.....),随机选择线程执行一个
invokeAll(),自动执行所有的线程
关闭线程池: ExecutorService.shutdown();该方法不再接受线程池,等待所有线程执行完毕后,线程池结束。
ExecutorService.shutdown();立即关闭,退出任务,正在执行的线程可能会出错。
##Callable只能用线程池提交
Callable runnable :
1. 返回值
2. 异常, runnabel没有容错机制,callable有容错机制,可以将 异常抛给上层处理
3. Callable只能通过submit方法提交,runnable可以new 也可 以通过线程池提交
8. ReadWriteLock 读写锁 ,可以是公平,也可以是非公平的。该锁可以跨方法
读锁可以共享,写锁互斥
读锁 readLock().lock();
写锁 writeLock().lock();
package hgs.test;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class Test {
public static void main(String[] args) throws InterruptedException, BrokenBarrierException {
//ConcurrentMap
//BlockingQueue bq = new ArrayBlockingQueue(4);
//add(超过长度会报错) remove(没有元素会报错)
/*
bq.add("1");
bq.add("1");
bq.add("1");
bq.add("1");
bq.remove();
bq.remove();
bq.remove();
bq.remove();
*/
//offer(如果可以插入返回true 否则 false) poll(如果没有元素返回null)
//bq.offer("1");
//bq.offer("1");
//bq.offer("1");
//bq.offer("1");
//bq.offer("1");
//String flag = bq.poll();
//System.out.println(flag);
//put take 阻塞式
//bq.put("1");
//bq.put("1");
//bq.put("1");
//bq.put("1");
//bq.put("1");
//bq.put("1");
//bq.take();
//offer(o,timeout,timeunit),poll(timeout,tameunit) 等待timeout时间,然后跳过
//bq.poll(10, TimeUnit.SECONDS);
//bq.element();//检查是否为空,是的话跑出异常
//bq.peek();//阻塞
/* ConcurrentNavigableMap m = new ConcurrentSkipListMap() ;
m.put(1, "1");
m.put(2, "2");
m.put(5, "5");
m.put(4, "4");
m.put(6, "6");
m.put(5, "5");
System.out.println("head(\"5\")"+m.headMap(4));
System.out.println();
System.out.println();*/
/*Boy b1 = new Boy("b1",24);
Boy b2 = new Boy("b2",23);
Boy b3 = new Boy("b3",26);
Boy b4 = new Boy("b4",19);
PriorityBlockingQueue< Boy> pbq = new PriorityBlockingQueue(100);
pbq.add(b1);
pbq.add(b2);
pbq.add(b3);
pbq.add(b4);
for(int i =0 ;i<4;i++) {
System.out.println(pbq.take().toString());
} */
//闭锁
/*CountDownLatch cdl = new CountDownLatch(4);
for(int i = 0; i<4; i++) {
new Thread(new BoyRun(cdl)).start();
}
cdl.await();
System.out.println("全部到达。。。");*/
//栅栏
/*CyclicBarrier cb = new CyclicBarrier(4);
for(int i = 0; i<4; i++) {
new Thread(new GirlRan(cb)).start();
}
cb.await();
System.out.println("all comming.");*/
//exchanger交换器
/*Exchanger ex = new Exchanger();
ExchangerTest e1 = new ExchangerTest(ex);
ExchangerTest e2 = new ExchangerTest(ex);
new Thread(e1).start();
new Thread(e2).start(); */
//6.Semaphore 信号量
/* Semaphore s = new Semaphore(5);
for(int i = 0 ;i<9;i++) {
new SemaphoreTest(s).start();
}*/
//原始创建线程池,executors.newCacheThreadPool 的底层调用该方法
/* ExecutorService es = new
ThreadPoolExecutor(5, 10, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10),
new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println("full......");
}
});
for(int i = 0 ;i<24;i++) {
es.execute(new ThreadPoolTest());
}
es.shutdown();*/
//可重入锁 ReentrantLock 可重入读写锁ReentrantReadWriteLock
/*
ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
for (int i = 0 ;i<3;i++) {
new ReadLockTest( rwLock ).start();
}*/
ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
for (int i = 0 ;i<3;i++) {
new WriteLockTest( rwLock ).start();
}
}
}
class WriteLockTest extends Thread{
ReentrantReadWriteLock rwLock ;
public WriteLockTest(ReentrantReadWriteLock rwLock ) {
this.rwLock = rwLock;
}
@Override
public void run() {
rwLock.writeLock().lock();
System.out.println("reading......");
try {
Thread.sleep((long)(Math.random()*2000));
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("done......");
rwLock.writeLock().unlock();
}
}
class ReadLockTest extends Thread{
ReentrantReadWriteLock rwLock ;
public ReadLockTest(ReentrantReadWriteLock rwLock ) {
this.rwLock = rwLock;
}
@Override
public void run() {
rwLock.readLock().lock();
System.out.println("reading......");
try {
Thread.sleep((long)(Math.random()*2000));
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("done......");
rwLock.readLock().unlock();
}
}
class ThreadPoolTest implements Runnable{
ThreadPoolTest (){
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName());
try {
Thread.sleep((long)(Math.random()*2000));
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
class SemaphoreTest extends Thread{
Semaphore s = null;
public SemaphoreTest(Semaphore s) {
this.s = s;
}
@Override
public void run() {
try {
s.acquire();
System.out.println("aquire......");
Thread.sleep((long)(Math.random()*3000));
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("release......");
s.release();
}
}
class ExchangerTest implements Runnable{
Exchanger ex = null;
public ExchangerTest(Exchanger ex) {
this.ex = ex;
}
@Override
public void run() {
String my = Thread.currentThread().getName();
String exstr = null;
try {
exstr = ex.exchange(my);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println(my+" "+exstr);
}
}
class BoyRun implements Runnable{
CountDownLatch cdl ;
public BoyRun(CountDownLatch cdl ) {
this.cdl = cdl;
}
@Override
public void run() {
System.out.println("cdl"+" +1");
cdl.countDown();
System.out.println("cdl"+" +1--");
}
}
class GirlRan implements Runnable {
CyclicBarrier cb ;
public GirlRan(CyclicBarrier cb ) {
this.cb = cb;
}
@Override
public void run() {
System.out.println("cdl"+" +1");
try {
Thread.sleep((long)(Math.random()*5000));
cb.await();
} catch (InterruptedException | BrokenBarrierException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("cdl"+" +1--");
}
}
class Boy implements Comparable{
String name;
int age;
public Boy(String name ,int age) {
this.name = name;
this.age = age;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
public int compareTo(Boy o) {
return this.age - o.age;
}
@Override
public String toString() {
return "Boy [name=" + name + ", age=" + age + "]";
}
}
当前题目:day1javaconcurrent包一些知识点
分享路径:http://www.scyingshan.cn/article/ipchhi.html


咨询
建站咨询
