JUC并发编程(三)

因为内容太多了,所以将其拆分为以下内容

参考

https://www.bilibili.com/video/BV1B7411L7tE

常用辅助类(必会)

CountDownLatch

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
 * Countdown-latch demo: the main thread may only "close the door" after six
 * worker threads have each announced that they left.
 */
public class TestCountDownLatch {
    public static void main(String[] args) throws InterruptedException {
        // Number of count-downs the latch waits for.
        final int total = 6;
        CountDownLatch latch = new CountDownLatch(total);

        // Work shared by every thread: announce departure, then decrement the latch.
        Runnable leave = () -> {
            System.out.println(Thread.currentThread().getName() + " Go out");
            latch.countDown();
        };

        // Threads are named "1" .. "6".
        int id = 1;
        while (id <= total) {
            new Thread(leave, String.valueOf(id)).start();
            id++;
        }

        // Block until the count reaches zero, then carry on.
        latch.await();
        System.out.println("Close the Door");
    }
}

原理:

countDownLatch.countDown();// 数量-1

countDownLatch.await();// 等待计数器归零,再向下执行

每次有线程调用 countDown(),计数器就减 1;当计数器变为 0 时,阻塞在 countDownLatch.await() 上的线程就会被唤醒,继续向下执行!

CyclicBarrier

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
 * CyclicBarrier demo (a "counting-up" barrier): seven threads each report one
 * collected item and then wait at the barrier; once all seven have arrived the
 * barrier action prints the final message.
 */
public class TestCyclicBarrier {
    public static void main(String[] args) {
        final int parties = 7;

        // Barrier action: runs once, when the last of the parties arrives.
        Runnable onAllArrived = () -> System.out.println("召唤神龙!");
        CyclicBarrier barrier = new CyclicBarrier(parties, onAllArrived);

        for (int n = 1; n <= parties; n++) {
            // Lambdas can only capture effectively-final locals, hence the copy.
            final int count = n;
            Thread collector = new Thread(() -> {
                System.out.println(Thread.currentThread().getName() + "收集了" + count + "个龙珠");
                try {
                    // Wait here until all parties have called await().
                    barrier.await();
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }
            });
            collector.start();
        }
    }
}

Semaphore

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
 * Semaphore demo: six threads compete for three permits ("parking spots").
 * Each thread that obtains a permit holds it for two seconds and then returns it.
 */
public class TestSemaphore {
    public static void main(String[] args) {
        // Three permits: at most three threads hold a spot at any moment.
        Semaphore semaphore = new Semaphore(3);

        for (int i = 1; i <= 6; i++) {
            new Thread(() -> {
                // Acquire OUTSIDE the try/finally that releases: if acquire() is
                // interrupted no permit was obtained, and calling release() anyway
                // would add a permit and silently raise the limit above three.
                try {
                    semaphore.acquire();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve the interrupt status
                    e.printStackTrace();
                    return;
                }

                try {
                    System.out.println(Thread.currentThread().getName() + "抢到车位");
                    TimeUnit.SECONDS.sleep(2);
                    System.out.println(Thread.currentThread().getName() + "离开车位");
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve the interrupt status
                    e.printStackTrace();
                } finally {
                    // Return the permit that was definitely acquired above.
                    semaphore.release();
                }
            }).start();
        }
    }
}

原理:

semaphore.acquire();// 获得,假如已经满了,等待,等待到被释放为止!

semaphore.release();// 释放,会将当前信号量+1,然后唤醒等待的线程!

作用:多个共享资源互斥的使用!并发限流,控制最大的线程数!

读写锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
/**
 * Driver for the read/write-lock cache demo.
 *
 * <ul>
 *   <li>Write lock: exclusive.</li>
 *   <li>Read lock: shared.</li>
 *   <li>read + read may coexist; read + write and write + write may not.</li>
 * </ul>
 */
public class TestReadWriteLock {
    public static void main(String[] args) {
        // Alternative without locking, for comparison:
        // MyCache myCache = new MyCache();
        MyCacheLock cache = new MyCacheLock();

        // Five writer threads named "1".."5"; thread n stores the entry n -> n.
        for (int n = 1; n <= 5; n++) {
            String entry = String.valueOf(n);
            Thread writer = new Thread(() -> cache.put(entry, entry), entry);
            writer.start();
        }

        // Five reader threads named "1".."5"; thread n looks up key n.
        for (int n = 1; n <= 5; n++) {
            String entry = String.valueOf(n);
            Thread reader = new Thread(() -> cache.get(entry), entry);
            reader.start();
        }
    }
}

/**
 * Custom cache backed by a plain HashMap. No locking is performed here, so the
 * log lines of concurrent callers may interleave.
 */
class MyCache {
    private volatile Map<String, Object> map = new HashMap<>();

    /**
     * Stores {@code value} under {@code key}, printing a line before and after the write.
     *
     * @param key   entry key
     * @param value entry value
     */
    public void put(String key, Object value) {
        final String who = Thread.currentThread().getName();
        System.out.println(who + "写入" + key);
        map.put(key, value);
        System.out.println(who + "写入OK");
    }

    /**
     * Looks up {@code key}, printing a line before and after the read.
     * The value read is not returned.
     *
     * @param key entry key
     */
    public void get(String key) {
        final String who = Thread.currentThread().getName();
        System.out.println(who + "读出" + key);
        map.get(key);
        System.out.println(who + "读出OK");
    }
}

/**
 * Custom cache guarded by a ReentrantReadWriteLock: writes take the write lock,
 * reads take the read lock.
 */
class MyCacheLock {
    private volatile Map<String, Object> map = new HashMap<>();
    private ReadWriteLock rwLock = new ReentrantReadWriteLock();

    /**
     * Stores {@code value} under {@code key} while holding the write lock,
     * printing a line before and after the write.
     *
     * @param key   entry key
     * @param value entry value
     */
    public void put(String key, Object value) {
        rwLock.writeLock().lock();
        try {
            final String who = Thread.currentThread().getName();
            System.out.println(who + "写入" + key);
            map.put(key, value);
            System.out.println(who + "写入OK");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Always release, even if the body failed.
            rwLock.writeLock().unlock();
        }
    }

    /**
     * Looks up {@code key} while holding the read lock, printing a line before
     * and after the read. The value read is not returned.
     *
     * @param key entry key
     */
    public void get(String key) {
        rwLock.readLock().lock();
        try {
            final String who = Thread.currentThread().getName();
            System.out.println(who + "读出" + key);
            map.get(key);
            System.out.println(who + "读出OK");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Always release, even if the body failed.
            rwLock.readLock().unlock();
        }
    }
}

阻塞队列(BlockingQueue)

什么情况下需要使用阻塞队列,多线程并发处理,线程池!

学会使用队列

添加、移除

四组API

方式 抛出异常 有返回值(不抛出异常) 阻塞等待 超时等待
添加 add() offer() put() offer(e, timeout, unit)
移除 remove() poll() take() poll(timeout, unit)
检测队首元素 element() peek() - -

抛出异常

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
 * Demonstrates the exception-throwing group of queue operations
 * ({@code add}, {@code element}, {@code remove}) on a queue of capacity 3.
 */
public static void test1() {
    // Bounded queue with capacity 3; typed to avoid raw-type warnings.
    ArrayBlockingQueue<String> arrayBlockingQueue = new ArrayBlockingQueue<>(3);
    System.out.println(arrayBlockingQueue.add("a"));
    System.out.println(arrayBlockingQueue.add("b"));
    System.out.println(arrayBlockingQueue.add("c"));

    System.out.println("=====");
    // Inspect the head of the queue without removing it.
    System.out.println(arrayBlockingQueue.element());

    // Adding to a FULL queue throws java.lang.IllegalStateException: Queue full
    // System.out.println(arrayBlockingQueue.add("d"));

    System.out.println(arrayBlockingQueue.remove());
    System.out.println(arrayBlockingQueue.remove());
    System.out.println(arrayBlockingQueue.remove());

    // Removing from an EMPTY queue throws java.util.NoSuchElementException
    // System.out.println(arrayBlockingQueue.remove());
}

有返回值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
 * Demonstrates the value-returning group of queue operations
 * ({@code offer}, {@code poll}): failure is reported through the return value
 * rather than an exception.
 */
public static void test2() {
    // Bounded queue with capacity 3; typed to avoid raw-type warnings.
    ArrayBlockingQueue<String> arrayBlockingQueue = new ArrayBlockingQueue<>(3);
    System.out.println(arrayBlockingQueue.offer("a"));
    System.out.println(arrayBlockingQueue.offer("b"));
    System.out.println(arrayBlockingQueue.offer("c"));
    // Queue is full: offer returns false instead of throwing.
    System.out.println(arrayBlockingQueue.offer("d"));

    System.out.println(arrayBlockingQueue.poll());
    System.out.println(arrayBlockingQueue.poll());
    System.out.println(arrayBlockingQueue.poll());
    // Queue is empty: poll returns null instead of throwing.
    System.out.println(arrayBlockingQueue.poll());
}

阻塞等待

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
 * Demonstrates the blocking group of queue operations ({@code put}, {@code take}).
 *
 * <p>Note: the last {@code take()} is issued on an empty queue on purpose, so this
 * method blocks there and does not return unless the calling thread is interrupted.
 *
 * @throws InterruptedException if the thread is interrupted while blocked
 */
public static void test3() throws InterruptedException {
    // Bounded queue with capacity 3; typed to avoid raw-type warnings.
    ArrayBlockingQueue<String> arrayBlockingQueue = new ArrayBlockingQueue<>(3);

    arrayBlockingQueue.put("a");
    arrayBlockingQueue.put("b");
    arrayBlockingQueue.put("c");

    System.out.println("=====");

    // A fourth put on the full queue would block until space becomes available.
    // arrayBlockingQueue.put("d");

    System.out.println(arrayBlockingQueue.take());
    System.out.println(arrayBlockingQueue.take());
    System.out.println(arrayBlockingQueue.take());

    // The queue is empty now: this take blocks until an element is added.
    System.out.println(arrayBlockingQueue.take());
}

超时等待

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
 * Demonstrates the timed group of queue operations: {@code offer} and
 * {@code poll} with a timeout wait for a bounded time and then give up.
 *
 * @throws InterruptedException if the thread is interrupted while waiting
 */
public static void test4() throws InterruptedException {
    // Bounded queue with capacity 3; typed to avoid raw-type warnings.
    ArrayBlockingQueue<String> arrayBlockingQueue = new ArrayBlockingQueue<>(3);
    System.out.println(arrayBlockingQueue.offer("a"));
    System.out.println(arrayBlockingQueue.offer("b"));
    System.out.println(arrayBlockingQueue.offer("c"));
    // Queue is full: waits up to 2 seconds for space, then returns false.
    System.out.println(arrayBlockingQueue.offer("d", 2, TimeUnit.SECONDS));

    System.out.println("=====");

    System.out.println(arrayBlockingQueue.poll());
    System.out.println(arrayBlockingQueue.poll());
    System.out.println(arrayBlockingQueue.poll());
    // Queue is empty: waits up to 2 seconds for an element, then returns null.
    System.out.println(arrayBlockingQueue.poll(2, TimeUnit.SECONDS));
}

SynchronousQueue 同步队列

没有容量,进去一个元素,必须等待取出后,才能再存

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class SynchronousQueueDemo {
    /**
     * Synchronous-queue demo: thread T1 puts three elements, thread T2 takes
     * them at three-second intervals. After a put(), the element has to be
     * taken before the next put() can go through.
     */
    public static void main(String[] args) {
        BlockingQueue<String> handoff = new SynchronousQueue<>();

        // Producer: announces each element, then hands it over.
        Thread producer = new Thread(() -> {
            try {
                for (String item : new String[] {"1", "2", "3"}) {
                    System.out.println(Thread.currentThread().getName() + " put " + item);
                    handoff.put(item);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "T1");
        producer.start();

        // Consumer: waits three seconds before each take.
        Thread consumer = new Thread(() -> {
            try {
                for (int round = 0; round < 3; round++) {
                    TimeUnit.SECONDS.sleep(3);
                    System.out.println(Thread.currentThread().getName() + "=>" + handoff.take());
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "T2");
        consumer.start();
    }
}

线程池(重点)

池化技术

程序运行的本质:占用系统资源!优化资源的使用 => 池化技术

好处:

  1. 降低资源消耗
  2. 提高响应速度
  3. 方便管理

线程复用,可以控制最大并发数,管理线程

线程池三大方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
 * Demonstrates the three Executors factory methods by submitting 100 small
 * tasks; each task prints the name of the pool thread that ran it.
 */
public class Demo1 {
    public static void main(String[] args) {
        // ExecutorService threadPool = Executors.newSingleThreadExecutor(); // a single worker thread
        // ExecutorService threadPool = Executors.newFixedThreadPool(5);     // a fixed number of threads
        ExecutorService threadPool = Executors.newCachedThreadPool();        // grows and shrinks on demand

        try {
            for (int i = 0; i < 100; i++) {
                threadPool.execute(() -> {
                    System.out.println(Thread.currentThread().getName() + " ok");
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Always shut the pool down so its worker threads can terminate.
            threadPool.shutdown();
        }
    }
}

七大参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));java
}
// Quoted from the JDK's Executors class: core and maximum pool size are both
// nThreads, keep-alive is 0 ms, and tasks wait in a LinkedBlockingQueue created
// without a capacity argument.
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
// Quoted from the JDK's Executors class: core pool size 0, maximum pool size
// Integer.MAX_VALUE, 60 s keep-alive, and a SynchronousQueue as the work queue.
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
// In essence, all three factory methods above create a ThreadPoolExecutor via the constructor shown below.

// Quoted from the JDK's ThreadPoolExecutor class: the seven-parameter constructor.
// It validates the arguments and then stores them in the corresponding fields.
public ThreadPoolExecutor(int corePoolSize,                  // core pool size
                          int maximumPoolSize,               // maximum pool size
                          long keepAliveTime,                // how long an idle thread is kept before being released
                          TimeUnit unit,                     // unit of keepAliveTime
                          BlockingQueue<Runnable> workQueue, // blocking queue holding waiting tasks
                          ThreadFactory threadFactory,       // creates the worker threads; usually left unchanged
                          RejectedExecutionHandler handler) { // rejection policy
    // Reject nonsensical sizes and negative keep-alive.
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
            null :
            AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    // Keep-alive is stored in nanoseconds regardless of the unit passed in.
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

手动创建线程池 + 四种拒绝策略

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/**
 * Creates a ThreadPoolExecutor by hand (2 core threads, 5 maximum, a queue of 3)
 * and submits nine tasks to it, one more than the pool can hold at once.
 */
public class Demo1 {
    public static void main(String[] args) {
        /*
         * The four built-in rejection policies:
         *   ThreadPoolExecutor.AbortPolicy         - pool is full and more work arrives: refuse it and throw
         *   ThreadPoolExecutor.CallerRunsPolicy    - send the task back to where it came from (the submitter)
         *   ThreadPoolExecutor.DiscardPolicy       - queue is full: drop the task, no exception
         *   ThreadPoolExecutor.DiscardOldestPolicy - queue is full: drop the oldest queued task and retry,
         *                                            no exception either
         */
        ExecutorService pool = new ThreadPoolExecutor(
                2,                                   // core pool size
                5,                                   // maximum pool size
                3,                                   // keep-alive time ...
                TimeUnit.SECONDS,                    // ... in seconds
                new LinkedBlockingQueue<>(3),        // bounded work queue
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.DiscardOldestPolicy());

        Runnable task = () -> System.out.println(Thread.currentThread().getName() + " ok");

        try {
            // Maximum load = maximum pool size + queue capacity.
            // Going beyond it with AbortPolicy raises java.util.concurrent.RejectedExecutionException.
            int submitted = 0;
            while (submitted < 9) {
                pool.execute(task);
                submitted++;
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            pool.shutdown();
        }
    }
}

小结

了解 CPU密集型,IO密集型

1
2
3
4
5
6
7
8
9
10
/**
 * Prints the number of processors available to the JVM, the reference figure
 * for the sizing rules below.
 */
public static void main(String[] args) {
    /*
     * How should the maximum number of threads be chosen?
     * 1. CPU-bound: use the number of cores, which keeps CPU efficiency highest.
     * 2. IO-bound:  count the threads in the program that are heavy on IO and
     *               use a number greater than that.
     */
    int cores = Runtime.getRuntime().availableProcessors();
    System.out.println(cores);
}

因为内容太多了,所以将其拆分为以下内容