JUC并发编程(二)

因为内容太多了,所以将其拆分为以下内容

参考

https://www.bilibili.com/video/BV1B7411L7tE

生产者消费者问题

面试:单例模式,排序算法,生产者消费者问题,死锁

生产者消费者问题 Synchronized

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
/**
 * Two-thread producer/consumer demo: thread "A" increments the shared
 * counter ten times while thread "B" decrements it ten times.
 */
public class A {
    public static void main(String[] args) {
        Data shared = new Data();

        // Each worker performs ten rounds; an interruption is reported and the loop goes on.
        Runnable producer = () -> {
            for (int round = 0; round < 10; round++) {
                try {
                    shared.increment();
                } catch (InterruptedException ex) {
                    ex.printStackTrace();
                }
            }
        };
        Runnable consumer = () -> {
            for (int round = 0; round < 10; round++) {
                try {
                    shared.decrement();
                } catch (InterruptedException ex) {
                    ex.printStackTrace();
                }
            }
        };

        Thread producerThread = new Thread(producer, "A");
        producerThread.start();
        Thread consumerThread = new Thread(consumer, "B");
        consumerThread.start();
    }
}
/**
 * Shared counter guarded by the object's intrinsic lock. A producer may only
 * raise it when it is 0, a consumer may only lower it when it is not 0.
 *
 * NOTE(review): both guards are a single {@code if}, so the condition is not
 * re-checked after {@code wait()} returns. The surrounding article uses this
 * version to demonstrate the problem that appears with more than two threads.
 */
class Data {
    private int number = 0;

    /** Waits (once) if the counter is non-zero, then increments, prints and wakes all waiters. */
    public synchronized void increment() throws InterruptedException {
        if (number != 0) {
            wait();
        }
        number += 1;
        announce();
        notifyAll();
    }

    /** Waits (once) if the counter is zero, then decrements, prints and wakes all waiters. */
    public synchronized void decrement() throws InterruptedException {
        if (number == 0) {
            wait();
        }
        number -= 1;
        announce();
        notifyAll();
    }

    /** Prints "threadName=>value"; only called while the monitor is held. */
    private void announce() {
        String who = Thread.currentThread().getName();
        System.out.println(who + "=>" + number);
    }
}

问题:当存在 A、B、C、D 共 4 个线程时,上面用 if 判断的写法会出现虚假唤醒——线程被唤醒后不会重新检查条件,number 可能变成 2 或负数。

解决办法:把 wait() 外层的 if 判断改为 while 循环,线程被唤醒后会重新检查条件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
/**
 * Four-thread producer/consumer demo: "A" and "C" increment the shared
 * counter, "B" and "D" decrement it, ten rounds each.
 */
public class A {
    public static void main(String[] args) {
        Data shared = new Data();

        // Ten increments; an interruption is reported and the loop continues.
        Runnable producer = () -> {
            for (int round = 0; round < 10; round++) {
                try {
                    shared.increment();
                } catch (InterruptedException ex) {
                    ex.printStackTrace();
                }
            }
        };
        // Ten decrements; an interruption is reported and the loop continues.
        Runnable consumer = () -> {
            for (int round = 0; round < 10; round++) {
                try {
                    shared.decrement();
                } catch (InterruptedException ex) {
                    ex.printStackTrace();
                }
            }
        };

        spawn(producer, "A");
        spawn(consumer, "B");
        spawn(producer, "C");
        spawn(consumer, "D");
    }

    /** Creates a thread with the given name for the job and starts it right away. */
    private static void spawn(Runnable job, String name) {
        new Thread(job, name).start();
    }
}
/**
 * Shared counter guarded by the object's intrinsic lock. Both operations
 * re-check their condition in a loop every time {@code wait()} returns.
 */
class Data {
    private int number = 0;

    /** Blocks while the counter is non-zero, then increments it and publishes the change. */
    public synchronized void increment() throws InterruptedException {
        while (number != 0) {
            wait();
        }
        ++number;
        publish();
    }

    /** Blocks while the counter is zero, then decrements it and publishes the change. */
    public synchronized void decrement() throws InterruptedException {
        while (number == 0) {
            wait();
        }
        --number;
        publish();
    }

    /** Prints "threadName=>value" and wakes every waiting thread; called with the monitor held. */
    private void publish() {
        System.out.println(Thread.currentThread().getName() + "=>" + number);
        notifyAll();
    }
}

JUC版的生产者消费者问题

通过 Lock 的 newCondition() 获取 Condition,用 await()/signalAll() 代替 wait()/notifyAll()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
/**
 * Four-thread producer/consumer demo on top of {@code Data2}: "A" and "C"
 * increment, "B" and "D" decrement, ten rounds each.
 */
public class B {
    public static void main(String[] args) {
        Data2 shared = new Data2();

        Runnable producer = () -> {
            for (int round = 0; round < 10; round++) {
                try {
                    shared.increment();
                } catch (InterruptedException ex) {
                    ex.printStackTrace();
                }
            }
        };
        Runnable consumer = () -> {
            for (int round = 0; round < 10; round++) {
                try {
                    shared.decrement();
                } catch (InterruptedException ex) {
                    ex.printStackTrace();
                }
            }
        };

        // Even positions produce, odd positions consume: A(+), B(-), C(+), D(-).
        String[] names = {"A", "B", "C", "D"};
        for (int k = 0; k < names.length; k++) {
            Runnable job = (k % 2 == 0) ? producer : consumer;
            new Thread(job, names[k]).start();
        }
    }
}

/**
 * Shared counter coordinated with an explicit {@link Lock} and a single
 * {@link Condition} instead of {@code synchronized}/{@code wait}/{@code notifyAll}.
 *
 * <p>A producer may only raise the counter when it is 0; a consumer may only
 * lower it when it is not 0. Both re-check their condition in a loop after
 * every wake-up.
 */
class Data2 {
    private int number = 0;

    final Lock lock = new ReentrantLock();

    // condition.await() / condition.signalAll() replace wait() / notifyAll().
    final Condition condition = lock.newCondition();

    /**
     * Blocks while the counter is non-zero, then increments it, prints
     * "threadName=>value" and signals all waiters.
     *
     * @throws InterruptedException if the calling thread is interrupted while
     *         waiting; the counter is left unchanged and the lock is released
     */
    public void increment() throws InterruptedException {
        lock.lock();
        try {
            while (number != 0) {
                condition.await();
            }
            number++;
            System.out.println(Thread.currentThread().getName() + "=>" + number);
            condition.signalAll();
        } finally {
            // No catch here: the interrupt is the caller's to handle, as the signature promises.
            lock.unlock();
        }
    }

    /**
     * Blocks while the counter is zero, then decrements it, prints
     * "threadName=>value" and signals all waiters.
     *
     * @throws InterruptedException if the calling thread is interrupted while
     *         waiting; the counter is left unchanged and the lock is released
     */
    public void decrement() throws InterruptedException {
        lock.lock();
        try {
            while (number == 0) {
                condition.await();
            }
            number--;
            System.out.println(Thread.currentThread().getName() + "=>" + number);
            condition.signalAll();
        } finally {
            lock.unlock();
        }
    }
}

Condition 精准的通知和唤醒线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
/**
 * Ordered-printing demo: threads "A", "B" and "C" each call their own
 * {@code Data3} print method ten times.
 */
public class C {
    public static void main(String[] args) {
        Data3 shared = new Data3();
        launch(shared::printfA, "A");
        launch(shared::printfB, "B");
        launch(shared::printfC, "C");
    }

    /** Starts a thread with the given name that runs the step ten times in a row. */
    private static void launch(Runnable step, String name) {
        Runnable tenRounds = () -> {
            for (int round = 0; round < 10; round++) {
                step.run();
            }
        };
        new Thread(tenRounds, name).start();
    }
}

/**
 * Turn-based printer: one lock with three conditions lets exactly one of
 * printfA / printfB / printfC proceed at a time, in the order A, B, C, A, ...
 *
 * <p>{@code number} records whose turn it is (1 = A, 2 = B, 3 = C). Each
 * method waits on its own condition until it is its turn, hands the turn to
 * the next method and signals only that method's condition.
 */
class Data3 {
    private int number = 1;
    private Lock lock = new ReentrantLock();
    Condition condition1 = lock.newCondition();
    Condition condition2 = lock.newCondition();
    Condition condition3 = lock.newCondition();

    /** Waits for turn 1, prints "threadName=>AAAAA" and passes the turn to B. */
    public void printfA() {
        takeTurn(1, 2, condition1, condition2, "AAAAA");
    }

    /** Waits for turn 2, prints "threadName=>BBBBB" and passes the turn to C. */
    public void printfB() {
        takeTurn(2, 3, condition2, condition3, "BBBBB");
    }

    /** Waits for turn 3, prints "threadName=>CCCCC" and passes the turn back to A. */
    public void printfC() {
        takeTurn(3, 1, condition3, condition1, "CCCCC");
    }

    /**
     * Shared implementation of one turn.
     *
     * <p>If the calling thread is interrupted while waiting, nothing is printed,
     * the turn is not advanced, and the thread's interrupt status is restored so
     * the caller can still observe the interruption.
     *
     * @param turn     value of {@code number} this caller waits for
     * @param nextTurn value stored into {@code number} once this caller ran
     * @param mine     condition this caller waits on
     * @param next     condition signalled to wake the next caller
     * @param label    text printed after "threadName=>"
     */
    private void takeTurn(int turn, int nextTurn, Condition mine, Condition next, String label) {
        lock.lock();
        try {
            while (number != turn) {
                mine.await();
            }
            number = nextTurn;
            System.out.println(Thread.currentThread().getName() + "=>" + label);
            next.signal();
        } catch (InterruptedException e) {
            // await() cleared the flag when it threw; set it again instead of swallowing it.
            Thread.currentThread().interrupt();
        } finally {
            lock.unlock();
        }
    }
}

并发下集合不安全

java.util.ConcurrentModificationException 并发修改异常

List 不安全

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
 * Ten threads each append a short random string to one shared list and then
 * print the list.
 *
 * <p>A plain ArrayList is not safe for this; thread-safe options are:
 * <ol>
 *   <li>{@code new Vector<>()}</li>
 *   <li>{@code Collections.synchronizedList(new ArrayList<>())}</li>
 *   <li>{@code new CopyOnWriteArrayList<>()} (used here)</li>
 * </ol>
 */
public class TestList {
    public static void main(String[] args) {
        List<String> shared = new CopyOnWriteArrayList<>();

        // Threads are named "1" .. "10".
        for (int id = 1; id <= 10; id++) {
            Runnable job = () -> {
                String token = UUID.randomUUID().toString().substring(0, 5);
                shared.add(token);
                System.out.println(shared);
            };
            Thread worker = new Thread(job, String.valueOf(id));
            worker.start();
        }
    }
}

Set 不安全

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
 * Thirty threads each add a short random string to one shared set and then
 * print the set.
 *
 * <p>Thread-safe options for the set:
 * <ol>
 *   <li>{@code Collections.synchronizedSet(new HashSet<>())}</li>
 *   <li>{@code new CopyOnWriteArraySet<>()} (used here)</li>
 * </ol>
 */
public class TestSet {
    public static void main(String[] args) {
        Set<String> shared = new CopyOnWriteArraySet<>();

        // Threads are named "0" .. "29".
        for (int id = 0; id < 30; id++) {
            Runnable job = () -> {
                String token = UUID.randomUUID().toString().substring(0, 5);
                shared.add(token);
                System.out.println(shared);
            };
            Thread worker = new Thread(job, String.valueOf(id));
            worker.start();
        }
    }
}

HashSet 的底层是 HashMap:元素作为 key 存入,value 是固定的占位对象 PRESENT

1
2
3
4
5
6
// No-arg constructor: the set stores its elements in a newly created HashMap.
public HashSet() {
map = new HashMap<>();
}
// Adds e by putting it into the backing map as a key, with PRESENT as the value.
// Returns true only when put() returned null, i.e. reported no previous value for that key.
public boolean add(E e) {
return map.put(e, PRESENT)==null;
}

Map 不安全

回顾Map基本操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
* The default initial capacity - MUST be a power of two.
* (1 << 4 is 16.)
*/
static final int DEFAULT_INITIAL_CAPACITY = 1 << 4; // aka 16

/**
* The maximum capacity, used if a higher value is implicitly specified
* by either of the constructors with arguments.
* MUST be a power of two <= 1<<30.
* (1 << 30 is 1,073,741,824.)
*/
static final int MAXIMUM_CAPACITY = 1 << 30;

/**
* The load factor used when none specified in constructor.
*/
static final float DEFAULT_LOAD_FACTOR = 0.75f;

测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
 * Ten threads each put one entry (thread name -> short random string) into a
 * shared map and then print the map.
 */
public class TestMap {
    public static void main(String[] args) {
        /*
         * Is a plain HashMap used like this? No - HashMap is not used for this at work.
         * What is the default equivalent to? new HashMap<>(16, 0.75);
         * (initial capacity, load factor)
         * Thread-safe options:
         *   1. Collections.synchronizedMap(new HashMap<>());
         *   2. new ConcurrentHashMap<>();
         */
        Map<String, Object> shared = new ConcurrentHashMap<>();

        // The threads keep their default names, which serve as the map keys.
        for (int count = 0; count < 10; count++) {
            Runnable job = () -> {
                String key = Thread.currentThread().getName();
                String value = UUID.randomUUID().toString().substring(0, 5);
                shared.put(key, value);
                System.out.println(shared);
            };
            new Thread(job).start();
        }
    }
}

Callable

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
 * Runs a {@code Callable} on a thread by wrapping it in a {@link FutureTask}
 * and prints the value the task returns.
 */
public class TestCallable {
    /**
     * @throws ExecutionException   if the task itself threw an exception
     * @throws InterruptedException if the main thread is interrupted while waiting for the result
     */
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        /*
         * Thread only accepts a Runnable:
         *   new Thread(new Runnable()).start();
         * FutureTask<V> is a Runnable that wraps a Callable, so it is the adapter:
         *   new Thread(new FutureTask<V>(callable)).start();
         */
        MyThread myThread = new MyThread();
        // Parameterized instead of raw, so get() returns Integer without a cast.
        FutureTask<Integer> futureTask = new FutureTask<>(myThread);

        new Thread(futureTask, "A").start();

        Integer o = futureTask.get(); // may block until the task has finished
        System.out.println(o);
    }
}
/** Callable task that prints "Call" and yields the fixed result 1024. */
class MyThread implements Callable<Integer> {

    /** Value handed back by every invocation of {@link #call()}. */
    private static final int RESULT = 1024;

    /**
     * Prints a marker line and returns the fixed result.
     *
     * @return 1024
     */
    @Override
    public Integer call() {
        System.out.println("Call");
        return RESULT;
    }
}

细节:

  1. 有缓存:同一个 FutureTask 只会执行一次 call(),结果会被缓存
  2. get() 需要等待任务执行完成,可能会阻塞调用线程!

因为内容太多了,所以将其拆分为以下内容