JUC并发编程(四)

因为内容太多了,所以将其拆分为以下内容

参考

https://www.bilibili.com/video/BV1B7411L7tE

四大函数式接口(必须掌握)

现代程序员:Lambda表达式,链式编程,函数式接口,Stream流式计算

函数式接口:只有一个方法的接口

1
2
3
4
5
6
7
@FunctionalInterface
public interface Runnable {
public abstract void run();
}
// 超级多FunctionalInterface
// 简化编程模型,在新版本的框架层大量应用
// forEach(消费者类型的函数式接口)

代码测试

Function 函数式接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
* 函数型接口:有一个输入参数,有一个输出
* 只要是函数式接口,可以用 lambda表达式 简化
*/
public class Demo1 {
public static void main(String[] args) {

// Function<String,String> function = new Function<String,String>(){
// @Override
// public String apply(String str) {
// return str;
// }
// };

Function<String,String> function=(str)->{
return str;
};

System.out.println(function.apply("abc"));
}
}

Predicate 断言型接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
* 断言型接口:有一个输入参数,返回值只能是 布尔值!
*/
public class Demo2 {
public static void main(String[] args) {
// Predicate<String> predicate = new Predicate<String>(){
//
// @Override
// public boolean test(String str) {
// return str.isEmpty();
// }
// };

Predicate<String> predicate=(str)->{
return str.isEmpty();
};

System.out.println(predicate.test(""));
}
}

Consumer 消费型接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
* 消费性接口:只有输入,没有返回
*/
public class Demo3 {
public static void main(String[] args) {
// Consumer<String> consumer = new Consumer<String>(){
//
// @Override
// public void accept(String str) {
// System.out.println(str);
// }
// };

Consumer<String> consumer=(str)->{
System.out.println(str);
};

consumer.accept("abc");
}
}

Supplier 供给型接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
* 供给型接口:没有参数,有返回值
*/
public class Demo4 {
public static void main(String[] args) {
// Supplier<String> supplier = new Supplier<String>(){
//
// @Override
// public String get() {
// System.out.println("get()");
// return "1024";
// }
// };

Supplier<String> supplier = () -> {
System.out.println("get()");
return "1024";
};

System.out.println(supplier.get());
}
}

Stream流式计算

什么是Stream流式计算

大数据:存储 + 计算

集合、MySQL 本质就是存储

计算都应该交给流来操作!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
/**
* 题目要求:
* 现有5个用户!筛选:
* 1、ID 必须是偶数
* 2、年龄必须大于23岁
* 3、用户名转为大写字母
* 4、用户名字母倒序排列
* 5、只输出一个用户
*/
public class Test {
public static void main(String[] args) {
User u1 = new User(1, "a", 21);
User u2 = new User(2, "b", 22);
User u3 = new User(3, "c", 23);
User u4 = new User(4, "d", 24);
User u5 = new User(6, "e", 25);

// 集合负责存储
List<User> list = Arrays.asList(u1, u2, u3, u4, u5);

// 计算交给Stream
list.stream()
.filter(u -> {
return u.getId() % 2 == 0;
})
.filter(u -> {
return u.getAge() > 23;
})
.map(u -> {
return u.getName().toUpperCase();
})
.sorted((A, B) -> {
return B.compareTo(A);
})
.limit(1)
.forEach(System.out::println);
}
}

ForkJoin

什么是ForkJoin

JDK1.7,并行执行任务!提高效率,大数据量!

大数据:Map Reduce(把大任务拆分为小任务)

ForkJoin 特点:工作窃取

这里维护的的是双向队列

ForkJoin

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class Test {
public static void main(String[] args) throws ExecutionException, InterruptedException {
test1();
test2();
test3();
}

public static void test1() {
long start = System.currentTimeMillis();

Long sum = 0L;
for (Long i = 1L; i <= 1000_0000L; i++) {
sum += i;
}

long end = System.currentTimeMillis();
System.out.println("test1结果:" + sum + "耗时:" + (end - start));
}

public static void test2() throws ExecutionException, InterruptedException {
long start = System.currentTimeMillis();

ForkJoinPool forkJoinPool = new ForkJoinPool();
ForkJoinTask<Long> task = new ForkJoinDemo(1L, 1000_0000L);
ForkJoinTask<Long> submit = forkJoinPool.submit(task);// 提交任务
Long sum = submit.get();

long end = System.currentTimeMillis();
System.out.println("test2结果:" + sum + "耗时:" + (end - start));
}

public static void test3() {
long start = System.currentTimeMillis();

long sum = LongStream.rangeClosed(0L, 1000_0000L).parallel().reduce(0, Long::sum);

long end = System.currentTimeMillis();
System.out.println("test3结果:" + sum + "耗时:" + (end - start));
}
}

Demo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/**
* 使用 ForkJoin
* 1、ForkJoinPool 通过它来执行
* 2、计算任务 ForkJoinPool.execute(ForkJoinTask<?> task)
* 3、计算类继承
*/
public class ForkJoinDemo extends RecursiveTask<Long> {

private Long start;
private Long end;
private Long temp = 10000L;

public ForkJoinDemo(Long start, Long end) {
this.start = start;
this.end = end;
}

@Override
protected Long compute() {
if (end - start < temp) {
Long sum = 0L;
for (Long i = start; i <= end; i++) {
sum += i;
}
return sum;
} else {
long middle = (start + end) / 2;
ForkJoinDemo task1 = new ForkJoinDemo(start, middle);
task1.fork(); //拆分任务,把任务压入线程队列
ForkJoinDemo task2 = new ForkJoinDemo(middle + 1, end);
task2.fork();
return task1.join() + task2.join();
}
}
}

异步回调

Future 设计初衷,对将来某个事件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
/**
* 异步调用:CompletableFuture
* 异步执行
* 成功回调
* 失败回调
*/
public class Demo1 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 没有返回值的 runAsync 异步回调
// CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
//
// try {
// TimeUnit.SECONDS.sleep(2);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// System.out.println(Thread.currentThread().getName() + " runAsync=>Void");
// });
//
// System.out.println("abc");
//
// completableFuture.get();

// 有返回值的supplyAsync 异步回调
CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + " supplyAsync=>Integer");
int i=10/0;
return 1024;
});

System.out.println(completableFuture1.whenComplete((t, u) -> {
System.out.println("t=>" + t);// 正常的返回值
System.out.println("u=>" + u);// 错误的信息:java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
}).exceptionally((e) -> {
System.out.println(e.getMessage());
return 233;
}).get());
}
}

JMM

请你谈谈对 Volatile 的理解

Volatile 是 java 虚拟机提供的轻量级同步机制

  1. 保证可见性
  2. 不保证原子性
  3. 禁止指令重排

什么是JMM

JMM:java内存模型,不存在的东西,概念!约定!

关于JMM的一些同步约定:

  1. 线程解锁前,必须把共享变量立即刷回主存。
  2. 线程加锁前,必须读取主存中的最新值到工作内存中!
  3. 加锁和解锁是统一把锁

线程:工作内存主存

内存交互操作有8种,虚拟机实现必须保证每一个操作都是原子的,不可在分的(对于double和long类型的变量来说,load、store、read和write操作在某些平台上允许例外)

    • lock (锁定):作用于主内存的变量,把一个变量标识为线程独占状态
    • unlock (解锁):作用于主内存的变量,它把一个处于锁定状态的变量释放出来,释放后的变量才可以被其他线程锁定
    • read (读取):作用于主内存变量,它把一个变量的值从主内存传输到线程的工作内存中,以便随后的load动作使用
    • load (载入):作用于工作内存的变量,它把read操作从主存中变量放入工作内存中
    • use (使用):作用于工作内存中的变量,它把工作内存中的变量传输给执行引擎,每当虚拟机遇到一个需要使用到变量的值,就会使用到这个指令
    • assign (赋值):作用于工作内存中的变量,它把一个从执行引擎中接受到的值放入工作内存的变量副本中
    • store (存储):作用于主内存中的变量,它把一个从工作内存中一个变量的值传送到主内存中,以便后续的write使用
    • write  (写入):作用于主内存中的变量,它把store操作从工作内存中得到的变量的值放入主内存的变量中

JMM对这八种指令的使用,制定了如下规则:

    • 不允许read和load、store和write操作之一单独出现。即使用了read必须load,使用了store必须write
    • 不允许线程丢弃他最近的assign操作,即工作变量的数据改变了之后,必须告知主存
    • 不允许一个线程将没有assign的数据从工作内存同步回主内存
    • 一个新的变量必须在主内存中诞生,不允许工作内存直接使用一个未被初始化的变量。就是怼变量实施use、store操作之前,必须经过assign和load操作
    • 一个变量同一时间只有一个线程能对其进行lock。多次lock后,必须执行相同次数的unlock才能解锁
    • 如果对一个变量进行lock操作,会清空所有工作内存中此变量的值,在执行引擎使用这个变量前,必须重新load或assign操作初始化变量的值
    • 如果一个变量没有被lock,就不能对其进行unlock操作。也不能unlock一个被其他线程锁住的变量
    • 对一个变量进行unlock操作之前,必须把此变量同步回主内存

  JMM对这八种操作规则和对volatile的一些特殊规则就能确定哪里操作是线程安全,哪些操作是线程不安全的了。但是这些规则实在复杂,很难在实践中直接分析。所以一般我们也不会通过上述规则进行分析。更多的时候,使用java的happen-before规则来进行分析。

Volatile

1、保证可见性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class JMMDemo {
/**
* 不加 volatile 程序进入死循环
* 加 volatile 可以保证可见性
*/
private volatile static int num = 0;

public static void main(String[] args) {

new Thread(() -> {
while (num == 0) {
}
}).start();

try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
num = 1;
System.out.println(num);
}
}

2、不保证原子性

原子性:不可分割

线程在执行时,不能被打扰,要么成功,要么失败。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class JMMDemo2 {
/**
* volatile 不保证原子性
*/
private volatile static int num=0;

public static void add(){
num++;
}
public static void main(String[] args) {

// 理论结果为 20000
for (int i = 0; i < 20; i++) {
new Thread(()->{
for (int j = 0; j < 1000; j++) {
add();
}
}).start();
}
while (Thread.activeCount()>2){
Thread.yield();
}
System.out.println(Thread.currentThread().getName()+" "+num);
}
}

不加 Lock 和 synchronized ,怎样保证原子性

使用原子类,解决原子性问题

原子类

java.util.concurrent.atomic

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class JMMDemo2 {
/**
* volatile 不保证原子性
* AtomicInteger 原子类的 Integer
*/
private volatile static AtomicInteger num=new AtomicInteger();

public synchronized static void add(){
// num++; // 不是原子性操作
num.getAndIncrement(); // AtomicInteger + 1 方法 CAS
}
public static void main(String[] args) {

// 理论结果为 20000
for (int i = 0; i < 20; i++) {
new Thread(()->{
for (int j = 0; j < 1000; j++) {
add();
}
}).start();
}
while (Thread.activeCount()>2){
Thread.yield();
}
System.out.println(Thread.currentThread().getName()+" "+num);
}
}

这些类的底层都直接与操作系统挂钩!在内存中修改!Unsafe类是一个很特殊的存在

3、指令重排

什么是 指令重排:计算机并不是按你写的那样去执行的

源代码 ---> 编译器优化重排 ---> 指令并行也可能重排 ---> 内存系统也会重排 ---> 执行

处理机在进行指令重排时,考虑:数据之间的依赖性!

Volatile可以避免指令重排:

内存屏障,CPU指令,作用:

1、保证特定操作的执行顺序!

2、可以保证某些变量内存可见性!(利用这些特性 Volatile 实现了可见性)

因为内容太多了,所以将其拆分为以下内容