Because there is a lot of material, it has been split into the following parts.
Reference
https://www.bilibili.com/video/BV1B7411L7tE
Common helper classes (must know)
CountDownLatch
    import java.util.concurrent.CountDownLatch;

    public class TestCountDownLatch {
        public static void main(String[] args) throws InterruptedException {
            // The counter starts at 6: one count per thread that has to finish
            CountDownLatch countDownLatch = new CountDownLatch(6);

            for (int i = 1; i <= 6; i++) {
                new Thread(() -> {
                    System.out.println(Thread.currentThread().getName() + " Go out");
                    countDownLatch.countDown(); // count - 1
                }, String.valueOf(i)).start();
            }

            countDownLatch.await(); // block until the count reaches zero
            System.out.println("Close the Door");
        }
    }
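How it works:
countDownLatch.countDown(); // decrement the count by 1
countDownLatch.await(); // wait for the counter to reach zero, then continue
Each call to countDown() decrements the count by 1. Once the counter reaches 0, any thread blocked in countDownLatch.await() is woken up and continues.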
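To make the counting behaviour concrete, here is a minimal single-threaded sketch. The class name `LatchTimeoutSketch` and the helper `awaitBriefly` are made up for illustration. It uses the timed overload `await(long, TimeUnit)`, which returns `false` if the count has not reached zero before the timeout, so the example never hangs.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class LatchTimeoutSketch {
    // Hypothetical helper: true only if the latch reached zero within 100 ms.
    public static boolean awaitBriefly(CountDownLatch latch) {
        try {
            return latch.await(100, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return false;
        }
    }

    public static void main(String[] args) {
        CountDownLatch latch = new CountDownLatch(2);

        latch.countDown();                       // count: 2 -> 1
        System.out.println(latch.getCount());    // 1
        System.out.println(awaitBriefly(latch)); // false: the count is still 1

        latch.countDown();                       // count: 1 -> 0
        System.out.println(awaitBriefly(latch)); // true: returns immediately
    }
}
```

Once the count is zero it stays zero; a CountDownLatch cannot be reset.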
CyclicBarrier
    import java.util.concurrent.BrokenBarrierException;
    import java.util.concurrent.CyclicBarrier;

    public class TestCyclicBarrier {
        public static void main(String[] args) {
            // The barrier action runs once all 7 threads have called await()
            CyclicBarrier cyclicBarrier = new CyclicBarrier(7, () -> {
                System.out.println("Summon the dragon!");
            });

            for (int i = 1; i <= 7; i++) {
                final int temp = i; // a lambda can only capture effectively final variables
                new Thread(() -> {
                    System.out.println(Thread.currentThread().getName() + " collected dragon ball " + temp);
                    try {
                        cyclicBarrier.await(); // wait until all 7 threads have arrived
                    } catch (InterruptedException | BrokenBarrierException e) {
                        e.printStackTrace();
                    }
                }).start();
            }
        }
    }
Semaphore
    import java.util.concurrent.Semaphore;
    import java.util.concurrent.TimeUnit;

    public class TestSemaphore {
        public static void main(String[] args) {
            // 3 permits = 3 parking spaces
            Semaphore semaphore = new Semaphore(3);

            for (int i = 1; i <= 6; i++) {
                new Thread(() -> {
                    try {
                        semaphore.acquire(); // take a permit, waiting if none is free
                        try {
                            System.out.println(Thread.currentThread().getName() + " got a parking space");
                            TimeUnit.SECONDS.sleep(2);
                            System.out.println(Thread.currentThread().getName() + " left the parking space");
                        } finally {
                            // release only after a successful acquire()
                            semaphore.release();
                        }
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }).start();
            }
        }
    }
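How it works:
semaphore.acquire(); // acquire a permit; if none is available, wait until one is released
semaphore.release(); // release a permit: the permit count goes up by 1 and a waiting thread is woken up
Uses: mutually exclusive access to a limited set of shared resources, and rate limiting under concurrency by capping how many threads run at once.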
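The permit accounting can be observed without starting any threads. The sketch below is illustrative only (the class `SemaphorePermitsSketch` and the helper `grantedOutOf` are invented names). It uses `tryAcquire()`, the non-blocking counterpart of `acquire()`, so a full semaphore yields `false` instead of waiting.

```java
import java.util.concurrent.Semaphore;

public class SemaphorePermitsSketch {
    // Hypothetical helper: tries to take a permit `attempts` times without
    // waiting and returns how many attempts succeeded.
    public static int grantedOutOf(Semaphore semaphore, int attempts) {
        int granted = 0;
        for (int i = 0; i < attempts; i++) {
            if (semaphore.tryAcquire()) {
                granted++;
            }
        }
        return granted;
    }

    public static void main(String[] args) {
        Semaphore semaphore = new Semaphore(3);

        System.out.println(grantedOutOf(semaphore, 6));   // 3: only 3 permits exist
        System.out.println(semaphore.availablePermits()); // 0
        semaphore.release();                              // permit count + 1
        System.out.println(semaphore.availablePermits()); // 1
    }
}
```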
Read-write lock

    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.locks.ReadWriteLock;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    public class TestReadWriteLock {
        public static void main(String[] args) {
            MyCacheLock myCache = new MyCacheLock();

            // 5 writer threads
            for (int i = 1; i <= 5; i++) {
                final int temp = i;
                new Thread(() -> {
                    myCache.put(temp + "", temp + "");
                }, String.valueOf(i)).start();
            }

            // 5 reader threads
            for (int i = 1; i <= 5; i++) {
                final int temp = i;
                new Thread(() -> {
                    myCache.get(temp + "");
                }, String.valueOf(i)).start();
            }
        }
    }

    // Cache without locking: output from different writers can interleave
    class MyCache {
        private volatile Map<String, Object> map = new HashMap<>();

        public void put(String key, Object value) {
            System.out.println(Thread.currentThread().getName() + " writing " + key);
            map.put(key, value);
            System.out.println(Thread.currentThread().getName() + " write OK");
        }

        public void get(String key) {
            System.out.println(Thread.currentThread().getName() + " reading " + key);
            Object o = map.get(key);
            System.out.println(Thread.currentThread().getName() + " read OK");
        }
    }

    // Cache guarded by a ReadWriteLock: one writer at a time, many readers at once
    class MyCacheLock {
        private volatile Map<String, Object> map = new HashMap<>();
        private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();

        // Write: only one thread may hold the write lock
        public void put(String key, Object value) {
            readWriteLock.writeLock().lock();
            try {
                System.out.println(Thread.currentThread().getName() + " writing " + key);
                map.put(key, value);
                System.out.println(Thread.currentThread().getName() + " write OK");
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                readWriteLock.writeLock().unlock();
            }
        }

        // Read: any number of threads may hold the read lock together
        public void get(String key) {
            readWriteLock.readLock().lock();
            try {
                System.out.println(Thread.currentThread().getName() + " reading " + key);
                Object o = map.get(key);
                System.out.println(Thread.currentThread().getName() + " read OK");
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                readWriteLock.readLock().unlock();
            }
        }
    }
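Blocking queue (BlockingQueue)
When do you need a blocking queue? For concurrent processing across multiple threads, and in thread pools.
Learn to use the queue
Adding and removing
Four groups of APIs

| Operation | Throws an exception | Returns a value | Blocks and waits | Waits with a timeout |
| --- | --- | --- | --- | --- |
| Add | add() | offer() | put() | offer(,,) |
| Remove | remove() | poll() | take() | poll(,) |
| Inspect the head | element() | peek() | | |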
Throws an exception

    import java.util.concurrent.ArrayBlockingQueue;

    // Group 1: add() / element() / remove() throw an exception on failure
    public static void test1() {
        // Capacity of 3
        ArrayBlockingQueue<String> arrayBlockingQueue = new ArrayBlockingQueue<>(3);
        System.out.println(arrayBlockingQueue.add("a")); // true
        System.out.println(arrayBlockingQueue.add("b")); // true
        System.out.println(arrayBlockingQueue.add("c")); // true

        System.out.println("=====");
        System.out.println(arrayBlockingQueue.element()); // head of the queue: a

        System.out.println(arrayBlockingQueue.remove()); // a
        System.out.println(arrayBlockingQueue.remove()); // b
        System.out.println(arrayBlockingQueue.remove()); // c
    }
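The method above stays within the queue's capacity, so no exception is actually raised. As a small illustrative sketch (the class and method names are invented for this example), the following shows what happens at the boundaries: `add()` on a full queue throws `IllegalStateException`, and `remove()` on an empty queue throws `NoSuchElementException`.

```java
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;

public class QueueExceptionSketch {
    // Adds a second element to a queue of capacity 1 and reports the exception type.
    public static String addToFull() {
        ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        queue.add("a");
        try {
            queue.add("b"); // the queue is already full
            return "no exception";
        } catch (IllegalStateException e) {
            return e.getClass().getSimpleName();
        }
    }

    // Removes from an empty queue and reports the exception type.
    public static String removeFromEmpty() {
        ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        try {
            queue.remove(); // nothing to remove
            return "no exception";
        } catch (NoSuchElementException e) {
            return e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        System.out.println(addToFull());       // IllegalStateException
        System.out.println(removeFromEmpty()); // NoSuchElementException
    }
}
```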
Returns a value

    import java.util.concurrent.ArrayBlockingQueue;

    // Group 2: offer() / poll() report failure through the return value
    public static void test2() {
        ArrayBlockingQueue<String> arrayBlockingQueue = new ArrayBlockingQueue<>(3);
        System.out.println(arrayBlockingQueue.offer("a")); // true
        System.out.println(arrayBlockingQueue.offer("b")); // true
        System.out.println(arrayBlockingQueue.offer("c")); // true
        System.out.println(arrayBlockingQueue.offer("d")); // false: the queue is full

        System.out.println(arrayBlockingQueue.poll()); // a
        System.out.println(arrayBlockingQueue.poll()); // b
        System.out.println(arrayBlockingQueue.poll()); // c
        System.out.println(arrayBlockingQueue.poll()); // null: the queue is empty
    }
Blocks and waits

    import java.util.concurrent.ArrayBlockingQueue;

    // Group 3: put() / take() block until the operation can proceed
    public static void test3() throws InterruptedException {
        ArrayBlockingQueue<String> arrayBlockingQueue = new ArrayBlockingQueue<>(3);

        arrayBlockingQueue.put("a");
        arrayBlockingQueue.put("b");
        arrayBlockingQueue.put("c");

        System.out.println("=====");

        System.out.println(arrayBlockingQueue.take()); // a
        System.out.println(arrayBlockingQueue.take()); // b
        System.out.println(arrayBlockingQueue.take()); // c

        // The queue is now empty, so this call blocks until another thread puts an element
        System.out.println(arrayBlockingQueue.take());
    }
Waits with a timeout

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.TimeUnit;

    // Group 4: offer(e, timeout, unit) / poll(timeout, unit) give up after the timeout
    public static void test4() throws InterruptedException {
        ArrayBlockingQueue<String> arrayBlockingQueue = new ArrayBlockingQueue<>(3);
        System.out.println(arrayBlockingQueue.offer("a")); // true
        System.out.println(arrayBlockingQueue.offer("b")); // true
        System.out.println(arrayBlockingQueue.offer("c")); // true
        // The queue is full: wait up to 2 seconds, then return false
        System.out.println(arrayBlockingQueue.offer("d", 2, TimeUnit.SECONDS));

        System.out.println("=====");

        System.out.println(arrayBlockingQueue.poll()); // a
        System.out.println(arrayBlockingQueue.poll()); // b
        System.out.println(arrayBlockingQueue.poll()); // c
        // The queue is empty: wait up to 2 seconds, then return null
        System.out.println(arrayBlockingQueue.poll(2, TimeUnit.SECONDS));
    }
SynchronousQueue (synchronous queue)
It has no capacity: once an element is put in, it must be taken out before another one can be put.

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.SynchronousQueue;
    import java.util.concurrent.TimeUnit;

    public class SynchronousQueueDemo {

        public static void main(String[] args) {
            BlockingQueue<String> blockingQueue = new SynchronousQueue<>();

            // T1: each put() blocks until T2 takes the element
            new Thread(() -> {
                try {
                    System.out.println(Thread.currentThread().getName() + " put 1");
                    blockingQueue.put("1");
                    System.out.println(Thread.currentThread().getName() + " put 2");
                    blockingQueue.put("2");
                    System.out.println(Thread.currentThread().getName() + " put 3");
                    blockingQueue.put("3");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }, "T1").start();

            // T2: takes one element every 3 seconds
            new Thread(() -> {
                try {
                    TimeUnit.SECONDS.sleep(3);
                    System.out.println(Thread.currentThread().getName() + "=>" + blockingQueue.take());
                    TimeUnit.SECONDS.sleep(3);
                    System.out.println(Thread.currentThread().getName() + "=>" + blockingQueue.take());
                    TimeUnit.SECONDS.sleep(3);
                    System.out.println(Thread.currentThread().getName() + "=>" + blockingQueue.take());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }, "T2").start();
        }
    }
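The "no capacity" property can also be checked directly. In this rough sketch (class and method names are invented for illustration), a non-blocking `offer()` fails when no consumer is waiting and `size()` stays at 0, while a `put()` succeeds as soon as another thread calls `take()`.

```java
import java.util.concurrent.SynchronousQueue;

public class SynchronousQueueCapacitySketch {
    // offer() does not wait, so without a waiting consumer it returns false.
    public static boolean offerWithoutConsumer() {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        return queue.offer("1");
    }

    // Hands one element from this thread to a consumer thread and returns what was received.
    public static String handOff() {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        String[] received = new String[1];
        Thread consumer = new Thread(() -> {
            try {
                received[0] = queue.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        try {
            queue.put("1");  // blocks until the consumer takes the element
            consumer.join(); // after join(), received[0] is visible to this thread
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received[0];
    }

    public static void main(String[] args) {
        System.out.println(offerWithoutConsumer());            // false
        System.out.println(new SynchronousQueue<>().size());   // 0: it never holds elements
        System.out.println(handOff());                         // 1
    }
}
```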
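Thread pools (key topic)
Pooling
Running a program fundamentally means occupying system resources. Optimizing how those resources are used leads to pooling.
Benefits:
- Lower resource consumption
- Faster response
- Easier management
Threads are reused, the maximum concurrency can be capped, and threads are managed in one place.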
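Thread reuse is easy to observe. The sketch below is only an illustration (`ThreadReuseSketch` and `workerNames` are invented names): it runs several tasks on a small fixed pool and collects the names of the threads that executed them. However many tasks are submitted, the number of distinct worker threads never exceeds the pool size.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ThreadReuseSketch {
    // Runs `tasks` tasks on a pool of `poolSize` threads and returns the
    // distinct names of the worker threads that executed them.
    public static Set<String> workerNames(int poolSize, int tasks) {
        Set<String> names = ConcurrentHashMap.newKeySet(); // thread-safe set
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> names.add(Thread.currentThread().getName()));
            }
        } finally {
            pool.shutdown(); // stop accepting tasks; already submitted tasks still run
        }
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS); // wait for the tasks to finish
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return names;
    }

    public static void main(String[] args) {
        // 10 tasks, but at most 2 distinct worker threads
        System.out.println(workerNames(2, 10));
    }
}
```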
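The three factory methods

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class Demo1 {
        public static void main(String[] args) {
            // The three factory methods:
            // Executors.newSingleThreadExecutor();  // a single thread
            // Executors.newFixedThreadPool(5);      // a fixed number of threads
            ExecutorService threadPool = Executors.newCachedThreadPool(); // grows and shrinks on demand

            try {
                for (int i = 0; i < 100; i++) {
                    // Use the pool to run the task instead of creating a thread by hand
                    threadPool.execute(() -> {
                        System.out.println(Thread.currentThread().getName() + " ok");
                    });
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                // Shut the pool down once all tasks have been submitted
                threadPool.shutdown();
            }
        }
    }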
The seven parameters

    // JDK source: all three factory methods delegate to ThreadPoolExecutor
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

    public ThreadPoolExecutor(int corePoolSize,                  // core pool size
                              int maximumPoolSize,               // maximum pool size
                              long keepAliveTime,                // idle threads are released after this timeout
                              TimeUnit unit,                     // unit of the timeout
                              BlockingQueue<Runnable> workQueue, // blocking queue
                              ThreadFactory threadFactory,       // thread factory that creates threads; usually left alone
                              RejectedExecutionHandler handler) { // rejection policy
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
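The argument checks at the top of the constructor can be exercised directly. The following sketch is illustrative (`PoolParametersSketch` and `accepts` are invented names). It uses the five-argument `ThreadPoolExecutor` constructor, which fills in the default thread factory and rejection handler, and then reads the configured values back through the pool's getters.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolParametersSketch {
    // Returns true if the constructor accepts the given sizes,
    // false if it throws IllegalArgumentException.
    public static boolean accepts(int corePoolSize, int maximumPoolSize) {
        try {
            ThreadPoolExecutor pool = new ThreadPoolExecutor(
                    corePoolSize, maximumPoolSize,
                    3, TimeUnit.SECONDS,
                    new LinkedBlockingQueue<>(3));
            pool.shutdown();
            return true;
        } catch (IllegalArgumentException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 5, 3, TimeUnit.SECONDS, new LinkedBlockingQueue<>(3));
        System.out.println(pool.getCorePoolSize());                // 2
        System.out.println(pool.getMaximumPoolSize());             // 5
        System.out.println(pool.getKeepAliveTime(TimeUnit.SECONDS)); // 3
        System.out.println(pool.getQueue().remainingCapacity());   // 3
        pool.shutdown();

        System.out.println(accepts(2, 5)); // true
        System.out.println(accepts(5, 2)); // false: maximumPoolSize < corePoolSize
        System.out.println(accepts(0, 0)); // false: maximumPoolSize <= 0
    }
}
```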
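Creating a thread pool manually + the four rejection policies

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    // The four built-in rejection policies:
    //   ThreadPoolExecutor.AbortPolicy         - throws RejectedExecutionException
    //   ThreadPoolExecutor.CallerRunsPolicy    - runs the task in the submitting thread
    //   ThreadPoolExecutor.DiscardPolicy       - silently drops the new task
    //   ThreadPoolExecutor.DiscardOldestPolicy - drops the oldest queued task, then retries
    public class Demo1 {
        public static void main(String[] args) {

            ExecutorService threadPool = new ThreadPoolExecutor(
                    2,                              // core pool size
                    5,                              // maximum pool size
                    3,                              // keep-alive time
                    TimeUnit.SECONDS,               // unit of the keep-alive time
                    new LinkedBlockingQueue<>(3),   // work queue with capacity 3
                    Executors.defaultThreadFactory(),
                    new ThreadPoolExecutor.DiscardOldestPolicy());

            try {
                // Maximum load = maximum pool size + queue capacity = 5 + 3 = 8
                for (int i = 1; i <= 9; i++) {
                    threadPool.execute(() -> {
                        System.out.println(Thread.currentThread().getName() + " ok");
                    });
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                threadPool.shutdown();
            }
        }
    }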
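To see a rejection policy actually fire, the tasks have to keep every worker busy. The sketch below is an illustration under that assumption (`RejectionSketch` and `countRejected` are invented names). It uses the same sizes as above (core 2, maximum 5, queue capacity 3) with `AbortPolicy`, and every task blocks on a latch until all submissions are done. The pool can then hold 5 running + 3 queued = 8 tasks, and each task beyond that is rejected with `RejectedExecutionException`.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class RejectionSketch {
    // Submits `tasks` blocking tasks and returns how many of them were rejected.
    public static int countRejected(int tasks) {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 5, 3, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(3),
                new ThreadPoolExecutor.AbortPolicy());
        int rejected = 0;
        try {
            for (int i = 0; i < tasks; i++) {
                try {
                    pool.execute(() -> {
                        try {
                            release.await(); // keep the worker busy until released
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                } catch (RejectedExecutionException e) {
                    rejected++; // AbortPolicy: pool and queue are both full
                }
            }
        } finally {
            release.countDown(); // let all accepted tasks finish
            pool.shutdown();
        }
        return rejected;
    }

    public static void main(String[] args) {
        System.out.println(countRejected(8));  // 0: exactly fills 5 threads + 3 queue slots
        System.out.println(countRejected(9));  // 1
        System.out.println(countRejected(10)); // 2
    }
}
```

Swapping in `DiscardPolicy` or `DiscardOldestPolicy` would drop tasks without throwing, so this counter would stay at 0 even though some tasks never run.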
Summary
Understand the difference between CPU-bound and IO-bound workloads.

    public static void main(String[] args) {
        // Number of processors available to the JVM
        System.out.println(Runtime.getRuntime().availableProcessors());
    }
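One way the core count might feed into pool sizing is sketched below. The sizing rules are a commonly cited rule of thumb and an assumption here, not something stated in these notes: for CPU-bound work, cap the pool at the number of cores; for IO-bound work, allow roughly twice as many threads as there are IO-heavy tasks. The class and method names are invented for illustration.

```java
public class PoolSizingSketch {
    // Assumed rule of thumb for CPU-bound work: one thread per available core.
    public static int cpuBoundSize() {
        return Runtime.getRuntime().availableProcessors();
    }

    // Assumed rule of thumb for IO-bound work: about twice the number of IO-heavy tasks.
    public static int ioBoundSize(int ioHeavyTasks) {
        return ioHeavyTasks * 2;
    }

    public static void main(String[] args) {
        System.out.println("CPU-bound maximumPoolSize: " + cpuBoundSize());
        System.out.println("IO-bound maximumPoolSize for 15 IO-heavy tasks: " + ioBoundSize(15));
    }
}
```

Either value would be passed as `maximumPoolSize` when constructing a `ThreadPoolExecutor`; real workloads usually need measurement to pick a good number.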