Java并发编程和高并发解决方案

您所在的位置:网站首页 java并发编程与高并发解决方案 Java并发编程和高并发解决方案

Java并发编程和高并发解决方案

2023-03-21 01:29| 来源: 网络整理| 查看: 265

目录 图示思维导图:并发和高并发基础概念并发基础1.CPU多级缓存(缓存一致性,乱序执行优化)2.Java内存模型3.并发的优势和风险CPU多级缓存CPU多级缓存-乱序执行优化java内存模型(java memory model,JMM)并发的优势与风险 线程安全性1.线程安全性概念2.线程安全性体现的三个方面线程安全性线程安全体现在以下三个方面原子性原子性——Atomic包原子性——锁原子性——对比 可见性可见性——synchronized可见性——volatile 有序性有序性——happens-before原则 安全发布对象1.发布与逸出2.四种方法发布与逸出安全发布对象四种方法 线程安全策略1.不可变对象2.线程封闭3.线程不安全类与写法4.同步容器5.并发容器6.安全共享策略不可变对象线程封闭性线程不安全类与写法线程安全——同步容器(在多线程环境下不推荐使用)线程安全——并发容器J.U.C(java.util.concurrent)(在多线程环境下推荐使用)安全共享对象策略——总结 J.U.C之AQS(AbstractQueuedSynchronizer)1.AQS介绍2.AQS-CountDownLatch3.AQS-Semaphore4.AQS-CyclicBarrier5.AQS-ReentrantLock与锁AQS介绍CountDownLatchSemaphoreCyclicBarrierReentrantLockReentrantReadWriteLockStampedLockCondition J.U.C组件扩展1.FutureTask2.Fork/Join3.BlockingQueueFutureTaskFork/Join框架BlockingQueue——阻塞队列 线程调度-线程池1.线程池概念和特点2.线程池状态3.线程池-相关类4.线程池-相关方法5.线程池合理配置线程池概念和特点线程池状态线程池-相关类线程池-相关方法线程池合理配置 多线程并发扩展1.死锁2.并发最佳实践3.Spring与线程安全4.HashMap与ConcurrentHashMap解析5.多线程并发与线程安全总结死锁并发最佳实践Spring与线程安全HashMap与ConcurrentHashMap 高并发之扩容思路高并发之缓存思路1.缓存-特征,场景及组件2.缓存-Redis3.高并发场景下缓存常见问题缓存-特征,场景及组件高并发场景下缓存常见问题 高并发之消息队列1.消息队列特性2.为什么需要消息队列3.消息队列技术 高并发之应用拆分1.拆分原则2.拆分思考拆分原则拆分思考 高并发之应用限流1.概念2.限流算法概念限流算法 高并发之服务降级与服务熔断高并发之数据库切库分库分表高并发之高可用手段

图示思维导图:

在这里插入图片描述 在这里插入图片描述 在这里插入图片描述 在这里插入图片描述

并发和高并发基础概念

并发 同时拥有两个或多个线程,如果程序在单核处理器上运行,多个线程将交替的换入或者换出内存,这些线程是同时“存在”的,每个线程都处于执行过程中的某个状态,如果运行在多核处理器上,此时,程序中的每个线程都将分配到一个处理器核上,因此可以同时运行。

高并发 高并发(High Concurrency)是互联网分布式系统架构设计中必须考虑的因素之一,它通常是指,通过设计保证系统能够同时并行处理很多请求。

总结:

并发:多个线程操作相同的资源,保证线程安全,合理使用资源 高并发:服务能同时处理很多请求,提高程序性能(更多的考虑技术手段)

并发基础 1.CPU多级缓存(缓存一致性,乱序执行优化) 2.Java内存模型 3.并发的优势和风险 CPU多级缓存

CPU 多级缓存:

主存和cpu通过主线连接,CPU缓存在主存和CPU之间,缓存的出现可以减少CPU读取共享主存的次数

为什么需要CPU cache:

CPU的频率太快了,快到主存跟不上,这样在处理器时钟周期内,CPU常常需要等待主存,浪费资源。所以cache的出现,是为了缓解CPU和内存之间速度不匹配问题(结构:cpu -> cache -> memery).

CPU cache有什么意义:

1)时间局部性:如果某个数据被访问,name在不久的将来它很可能被再次访问。 2)空间局部性:如果某个数据被访问,那么与它相邻的数据很快也可能被访问

CPU多级缓存-缓存一致性(MESI)

MESI分别代表cache数据的四种状态,这四种状态可以相互转换 缓存四种操作:local read、local write、remote read、remote write

CPU多级缓存-乱序执行优化

乱序执行优化

处理器为提高运算速度而做出违背代码原有顺序的优化

java内存模型(java memory model,JMM)

在这里插入图片描述 在这里插入图片描述 Java内存模型抽象结构图 在这里插入图片描述

java内存模型-同步八种操作

lock(锁定):作用于主内存的变量,把一个变量标识为一条线程独占状态 unlock(解锁):作用于主内存变脸个,把一个处于锁定状态的变量释放出来,释放后的变量才可以被其他线程锁定 read(读取):作用于主内存变量,把一个变量值从主内存传输到线程的工作内存中,以便随后的load动作使用 load(载入):作用于工作内存的变量,它把read操作从主内存中得到的变量值放入工作内存的变量副本中 use(使用):作用于工作内存的变量,把工作内存中的一个变量值传递给执行引擎 assign(赋值):作用于工作内存的变量,它把一个从执行引擎接收到的值赋值给工作内存的变量 store(存储):作用于工作内存的变量,把工作内存中的一个变量的值传送到主内存中,以遍随后的write的操作 write(写入):作用于主内存的变量,它把store操作从工作内存中的一个变量的值传送到主内存的变量中

java内存模型-同步规则

1,如果要把一个变量从主内存中复制到工作内存,就需要按顺序的执行read和load操作,如果把变量从工作内存中同步回主内存,就需要按顺序的执行store和write操作。但java内存模型只要求上述操作必须按顺序执行,而没有保证必须是连续执行

2,不允许read和load、store和write操作之一单独出现

3,不允许一个线程丢弃它的最近assign的操作,即变量在工作内存中改变了之后必须同步到主内存中。

4,不允许一个线程无原因的(没发生过任何assign操作)把数据从工作内存同步回主内存中

5,一个新的变量只能在主内存中诞生,不允许在工作内存中直接使用一个未被初始化(load或assign)的变量。即就是对一个变量实施use和store操作之前,必须先执行过了assign和load操作

6,一个变量在同一时刻只允许一条线程对其进行lock操作,但lock操作可以被同一条线程重复执行多次,多次执行lock后,只有执行相同次数的unlock操作,变量才会被解锁。lock和unlock必须成对出现

7,如果对一个变量执行lock操作,将会清空工作内存中此变量的值,在执行引擎使用这个变量之前需要重新执行load或assign操作初始化变量的值

8,如果一个变量实现没有被lock操作锁定,怎不允许对它执行unlock操作,也不允许去unlock一个被其他线程锁定的变量

9,对一个变量执行unlock操作之前,必须先把此变量同步到主内存中(执行store和write操作)

Java内存模型-同步操作与规则 在这里插入图片描述

并发的优势与风险

优势

1,速度:同时处理多个请求,响应更快;复杂的操作可以分成多个进程同时进行 2,设计:程序设计在某些情况下更简单,也可以更多的选择 3,资源利用:CPU能够在等待IO的时候做一些其他的事情

风险

1,安全性:多个线程共享数据时可能会产生于期望不相符的结果 2,活跃性:某个操作无法继续进行下去时,就会发生活跃性问题。比如死锁、饥饿等问题 3,性能:线程过多时会使得CPU频繁切换,调度时间增多;同步机制;消耗过多内存

线程安全性 1.线程安全性概念 2.线程安全性体现的三个方面 线程安全性

当多个线程访问某个类时,不管运行时环境采用何种调度方式或者这些线程将如何交替执行,并且在主调代码中不需要任何额外的同步或协同,这个类都能表现出正确的行为,那么就称这个类时线程安全的。

线程安全体现在以下三个方面

1,原子性:提供了互斥访问,同一时刻只能有一个线程来对他进行操作 2,可见性:一个线程对主内存的修改可以及时的被其他线程观察到 3,有序性:一个线程观察其他线程中的指令执行顺序,由于指令重排序的存在,该观察结果一般杂乱无序

原子性 原子性——Atomic包

1,AtomicXxxx:CAS、Unsafe.compareAndSwapInt 2,AtomicLong,LongAdder 3.AtomicReference,AtomicReferenceFieldUpdater

理解:

AtomicXxxx类中方法incrementAndGet()是重点, incrementAndGet方法中调用unsafe.getAndAddInt(), getAndAddInt方法中主题是do-while语句,while语句中调用compareAndSwapInt(var1, var2, var5, var5 + var4), compareAndSwapInt方法就是CAS核心:

在死循环内,不断尝试修改目标值,直到修改成功,如果竞争不激烈,修改成功率很高,否则失败概率很高,性能会受到影响

jdk8中新增LongAdder,它和AtomicLong比较 优点:性能好,在处理高并发情况下统计优先使用LongAdder LongAddr原理?

AtomicReference、AtomicReferenceFieldUpdater原子性更新字段(字段要求volatile修饰,并且是非static)

AtomicStampReference:CAS的ABA问题 ABA问题:变量已经被修改了,但是最终的值和原来的一样,那么如何区分是否被修改过呢,用版本号解决

AtomicBoolean可以让某些代码只执行一次

原子性——锁

synchronized:依赖JVM,作用对象的作用范围内 Lock:依赖特殊的CPU指令,代码实现,ReentrantLock

synchronized

1,修饰代码块:同步代码块,大括号括起来的代码,作用于调用的对象 2,修饰方法:同步方法,整个方法,作用于调用的对象 3,修饰静态方法:整个静态方法,作用于所有对象 4,修饰类:括号括起来的部分,作用于所有对象

Lock

依赖特殊的CPU指令,代码实现,ReentrantLock

原子性——对比

synchronized:不可中断锁,适合竞争不激烈,可读性好 Lock:可中断锁,多样化同步,竞争激烈时能维持常态 Atomic:竞争激烈时能维持常态,比Lock性能好,只能同步一个值

可见性

导致共享变量在线程间不可见的原因:

1 线程交叉执行 2 重排序结合线程交叉执行 3 共享变量更新后的值没有在工作内存与主内存间及时更新

可见性——synchronized

JMM关于synchronized的两条规定: 1,线程解锁前,必须把共享变量的最新值刷新到主内存 2,线程加锁时,将清空工作内存中共享变量的值,从而使用共享变量时需要从主内存中重新读取最新的值(注意,加锁和解锁是同一把锁)

可见性——volatile

通过加入内存屏障和禁止重排序优化来实现

1 对volatile变量写操作时,会在写操作后加入一条store屏障指令,将本地内存中的共享变量值刷新到主内存 2 随volatile变量度操作时,会在读操作前加入一条load屏障指令,从主内存中读取共享变量 图示: 在这里插入图片描述 在这里插入图片描述

使用volatile修饰变量,无法保证线程安全

volatile适合修饰状态标识量 在这里插入图片描述

有序性

java内存模型中,允许编译器和处理器对指令进行重排序,但是重排序过程不会影响到单线程程序的执行,却会影响到多线程并发执行的正确性

有序性——happens-before原则

1 程序次序规则:一个线程内,按照代码顺序,书写在前面的操作先行发生于书写在后面的操作 注:在单线程中,看起来是这样的,虚拟机可能会对代码进行指令重排序,虽然重排序了,但是运行结果在单线程中和指令书写顺序是一致的,事实上,这条规则是用来保证程序单在单线程中执行结果的正确性,无法保证程序在多线程中的正确性. 2 锁定规则:一个unlock操作先行发生于后面对同一个锁的lock操作 3 volatile变量规则:对一个变量的写操作先行发生于后面对这个变量的读操作 4 传递规则:如果操作A先行发生于操作B,而操作B又先行发生于操作C,则可以得出操作A先行发生于操作C 前四条规则比较重要 5 线程启动规则:Thread对象的start()方法先行发生于次线程的每一个动作 6 线程中断规则:对线程interrupt()方法的调用先行发生于被中断线程的代码监测到中断事件的发生 7 线程终结规则:线程中所有的操作都先行发生于线程的终止检测,我们可以通过Thread.join()方法结束、Thread.isAlive()的返回值手段检测到线程已经终止执行 8 对象终结规则:一个对象的初始化完成先行发生于他的finalize()方法的开始

理解:

一个线程观察其他线程指令执行顺序,由于重排序的存在,观察结果一般是无序的,如果两个操作执行顺序无法从happens-before原则推导出来,那么他们就不能保证有序性,虚拟机可以随意的对他们重排序

安全发布对象 1.发布与逸出 2.四种方法 发布与逸出

发布对象

使一个对象能够被当前范围之外的代码所使用

对象逸出

一种错误的发布。当一个对象还没有构造完成时,就使它被其他线程所见

安全发布对象四种方法

1 在静态初始化函数中初始化一个对象引用 2 将对象的引用保存到volatile类型域或者AtomicReference对象中 3 将对象的引用保存到某个正确构造对象的final类型域中 4 将对象的引用保存到一个由锁保护的域中

理解

私有构造函数,单例对象,静态工厂方法获取对象

以单例模式为例

懒汉模式:单例实例在第一次使用时进行创建(线程不安全)

懒汉模式也可以实现线程安全,给getInstance方法添加synchronized关键字(不推荐,因为性能不好)

双重同步锁单例模式:双重监测机制,在方法内部加synchronized关键字(不是线程安全的)

原因是,创建对象是,分为以下三个步骤:

1) memory = allocate() 分配对象的内存空间

2)ctorInstance() 初始化对象

3)instance = memory() 设置instance指向刚分配的内存

由于JVM和cpu优化,可能会发生指令重排:

1) memory = allocate() 分配对象的内存空间

3) instance = memory() 设置instance指向刚分配的内存

2) ctorInstance() 初始化对象

当以上面这种指令执行时,线程A执行到3 instance = memory() 设置instance指向刚分配的内存 这一步时,线程B执行if(instance == null)这段代码,此时instance != null,线程B直接return instance,导致对象没有初始化完毕就返回

解决办法就是限制对象创建时进行指令重排,volatile+双重监测机制->禁止指令重排引起非线程安全

饿汉模式:单例实例在类装载时进行创建(线程安全)

枚举模式:线程安全

线程安全策略 1.不可变对象 2.线程封闭 3.线程不安全类与写法 4.同步容器 5.并发容器 6.安全共享策略 不可变对象

不可变对象需要满足的条件:

对象创建以后其状态就不能修改 对象所有域都是final类型 对象是正确创建的(在对象创建期间,this引用没有逸出)

final关键字定义不可变对象

修饰类、方法、变量 修饰类:不能被继承 修饰方法:1.锁定方法不被继承类修改 2.效率 修饰变量:基本数据类型,数值不可变;引用类型变量,不能再指向另外一个对象,因此容易引起线程安全问题

其他实现不可变对象

Collections.unmodifiableXXX:Collection、List、Set、Map(线程安全) Guava:ImmutableXXX:Collection、List、Set、Map

线程封闭性

线程封闭

把对象封装到一个线程里,只有这个线程可以看到该对象,那么就算该对象不是线程安全的,也不会出现任何线程安全方面的问题。实现线程封闭的方法:

线程封闭方法

1 Ad-hoc线程封闭:程序控制实现,最糟糕,忽略 2 堆栈封闭:局部变量,无并发问题 3 threadLocal是线程安全的,做到了线程封闭

ThreadLocal内部维护了一个map,map的key是每个线程的名称,map的值是要封闭的对象,每一个线程中的对象都对应者一个map中的值

线程封闭的应用场景: 数据库连接jdbc的Connection对象

线程不安全类与写法

字符串

StringBuilder:线程不安全 StringBuffer:线程安全

时间转换

SimpleDateFormat:线程不安全 JodaTime:线程安全

集合

ArrayList,HashSet,HashMap等Collections:线程不安全

编程注意:

if(condition(a)){handle(a)}; 不是线程安全的,因为这条判断语句不是原子性的,如果有线程共享这条代码,则会出现并发问题,解决方案是想办法这这段代码是原子性的(加锁)

线程安全——同步容器(在多线程环境下不推荐使用)

ArrayList -> Vector, Stack(Vector中的方法使用synchronized修饰过,Stack继承Vector) HashMap -> HashTable(key、value不能为null,HashTable使用synchronized修饰方法) Collections.synchronizedXXX(List、Set、Map)

同步容器不完全是线程安全的 编程注意:如果使用foreach或者iterator遍历集合时,尽量不要对集合进行修改操作

线程安全——并发容器J.U.C(java.util.concurrent)(在多线程环境下推荐使用)

ArrayList -> CopyOnWriteArrayList:

相比ArrayList,CopyOnWriteArrayList是线程安全的,写操作时复制,即当有新元素添加到CopyOnWriteArrayList时,先从原有的数组里拷贝一份出来,然后在新的数组上写操作,写完之后再将原来的数组指向新的数组,CopyOnWriteArrayList整个操作都是在锁(ReentrantLock锁)的保护下进行的,这么做主要是避免在多线程并发做add操作时复制出多个副本出来,把数据搞乱了。第一个缺点是做写操作时,需要拷贝数组,就会消耗内存,如果元素内容比较多会导致youngGC或者是fullGc;第二个缺点是不能用于实时读的场景,比如拷贝数组、新增元素都需要时间,所以调用一个set操作后,读取到的数据可能还是旧的,虽然CopyOnWriteArrayList能够做到最终的一致性,但是没法满足实时性要求,因此CopyOnWriteArrayList更适合读多写少的场景 CopyOnWriteArrayList设计思想:1读写分离 2最终一致性 3使用时另外开辟空间解决并发冲突

HashSet -> CopyOnWriteArraySet

CopyOnWriteArraySet:底层实现是CopyOnWriteArrayList

TreeSet -> ConcurrentSkipListSet ConcurrentSkipListSet:和TreeSet 一样支持自然排序,基于map集合,但是批量操作不是线程安全的

HashMap -> ConcurrentHashMap :不允许空值,针对读操作做了大量的优化,具有特别高的并发性

TreeMap -> ConcurrentSkipListMap :内部使用SkipList跳表结构实现的,key是有序的,支持更高的并发

安全共享对象策略——总结

1 线程限制:一个呗线程限制的对象,由线程独占,并且只能被占有它的线程修改 2 共享只读:一个共享只读的对象,在没有额外的同步情况下,可以被多个线程并发访问,但是任何线程都不能修改它 3 线程安全对象:一个线程安全的对象或容器,在内部通过同步机制来保证线程安全,所以其他线程无序额外的同步就可以通过公共接口随意访问它 4 被守护对象:被守护对象只能通过获取特定的锁来访问

J.U.C之AQS(AbstractQueuedSynchronizer) 1.AQS介绍 2.AQS-CountDownLatch 3.AQS-Semaphore 4.AQS-CyclicBarrier 5.AQS-ReentrantLock与锁 AQS介绍

1 使用Node实现FIFO队列,可以用于构建锁或者其他同步装置的基础框架 2 利用了int类型表示状态 3 使用方法是继承 4 子类通过继承并通过实现它的方法管理器状态{acquire和release}的方法操纵状态 5 可以同时实现排它锁和共享锁模式(独占、共享)

AQS实现原理

?

AQS同步组件 1 CountDownLatch:闭锁,通过计数来保证线程是否需要一直阻塞 2 Semaphore:控制同一时间并发线程的数目 3 CyclicBarrier:和CountDownLatch相似,都能阻阻塞线程 4 ReentrantLock 5 Condition 6 FutureTask

CountDownLatch

CountDownLatch是一个同步辅助类,应用场景:并行运算,所有线程执行完毕才可执行

在这里插入图片描述 代码示例1:

import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; class Scratch{ private final static int threadCount = 50; public static void main(String[] args) throws Exception{ ExecutorService executorService = Executors.newCachedThreadPool(); final CountDownLatch countDownLatch = new CountDownLatch(threadCount); for (int i=0;i{ try { test(threadNum); } catch (Exception e) { System.out.println("exception:"+e); }finally { countDownLatch.countDown(); } }); } countDownLatch.await();// countDownLatch减为0时才会继续下面流程 System.out.println("run finish"); executorService.shutdown(); } private static void test(int threadNum) throws Exception{ Thread.sleep(100); System.out.println("run:"+threadNum); Thread.sleep(100); } }

代码示例2: await方法可以设定指定等待时间,超过这个时间久不再等待

public static void main(String[] args) throws Exception{ ExecutorService executorService = Executors.newCachedThreadPool(); final CountDownLatch countDownLatch = new CountDownLatch(threadCount); for (int i=0;i{ try { test(threadNum); } catch (Exception e) { System.out.println("exception:"+e); }finally { countDownLatch.countDown(); } }); } countDownLatch.await(10, TimeUnit.MILLISECONDS);//超过这个时间久不再等待 System.out.println("run finish"); executorService.shutdown(); } Semaphore

Semaphore可以很容易控制某个资源可同时访问的线程个数, 和CountDownLatch使用有些类似,提供acquire和release两个方法, acquire是获取一个许可,如果没有就等待, release是在操作完成后释放许可出来。 Semaphore维护了当前访问的线程的个数,提供同步机制来控制同时访问的个数,Semaphore可以实现有限大小的链表,重入锁(如ReentrantLock)也可以实现这个功能,但是实现上比较复杂。

Semaphore使用场景:适用于仅能提供有限资源,如数据库连接数

代码示例1:

import java.time.LocalDate; import java.util.concurrent.*; class Scratch{ private final static int threadCount = 20; public static void main(String[] args) throws Exception{ ExecutorService executorService = Executors.newCachedThreadPool(); final Semaphore semaphore = new Semaphore(3); for (int i=0;i{ try { semaphore.acquire(); test(threadNum); semaphore.release(); } catch (Exception e) { System.out.println("exception:"+e); } }); } executorService.shutdown(); } private static void test(int threadNum) throws Exception{ System.out.println("time:"+ LocalDate.now() +"-"+threadNum); Thread.sleep(1000); } }

代码示例2:

public static void main(String[] args) throws Exception{ ExecutorService executorService = Executors.newCachedThreadPool(); final Semaphore semaphore = new Semaphore(3); for (int i=0;i{ try { if (semaphore.tryAcquire()){ //尝试获取一个许可 test(threadNum); semaphore.release(); // 释放一个许可 } } catch (Exception e) { System.out.println("exception:"+e); } }); } executorService.shutdown(); }

代码示例3

public static void main(String[] args) throws Exception{ ExecutorService executorService = Executors.newCachedThreadPool(); final Semaphore semaphore = new Semaphore(3); for (int i=0;i{ try { if (semaphore.tryAcquire(1,5000,TimeUnit.MICROSECONDS)){ test(threadNum); semaphore.release(); } } catch (Exception e) { System.out.println("exception:"+e); } }); } executorService.shutdown(); } CyclicBarrier

在这里插入图片描述

与CountDownLatch相似,都是通过计数器实现,当某个线程调用await方法,该线程就进入等待状态,且计数器进行加1操作,当计数器的值达到设置的初始值,进入await等待的线程会被唤醒,继续执行他们后续的操作。由于CyclicBarrier在释放等待线程后可以重用,所以又称循环屏障。使用场景和CountDownLatch相似,可用于并发运算。

CyclicBarrier和CountDownLatch区别:

1 CountDownLatch计数器只能使用一次,CyclicBarrier的计数器可以使用reset方法重置循环使用 2 CountDownLatch主要是视线1个或n个线程需要等待其他线程完成某项操作才能继续往下执行,CyclicBarrier主要是实现多个线程之间相互等待知道所有线程都满足了条件之后才能继续执行后续的操作,CyclicBarrier能处理更复杂的场景

代码示例:

import java.util.concurrent.*; class Scratch{ private final static int threadCount = 10; private static CyclicBarrier barrier = new CyclicBarrier(5); public static void main(String[] args) throws Exception{ ExecutorService executorService = Executors.newCachedThreadPool(); for (int i=0;i{ try { test(threadNum); } catch (Exception e) { System.out.println("exception:"+e); } }); } executorService.shutdown(); } private static void test(int threadNum) throws Exception{ Thread.sleep(1000); System.out.println(threadNum+" is ready"); barrier.await(); System.out.println(threadNum+" continue"); } } ReentrantLock

reentrantLock(可重入锁)和synchronized区别

1 可重入性:同一线程可以重入获得相同的锁,计数器加1,释放锁计数器减1 synchronized也是可重入锁 2 锁的实现:synchronized依赖jvm实现(操作系统级别的实现),reentrantLock是jdk实现的(用户自己编程实现) 3 性能区别:synchronized在优化前性能比reentrantLock差,优化后性能有了恨到提升,相同条件下优先使用synchronized 4 功能区别:1)便利性方面,synchronized使用简单,reentrantLock需要手工加锁和释放锁2)锁的细粒度和灵活度方面,reentrantLock优于synchronized 5 reentrantlock独有的功能:1)可指定是公平锁还是非公平锁,synchronized只能是非公平锁 2)提供了一个Condition类,可以分组唤醒需要唤醒的线程 3)能够提供中断等待锁的线程机制,lock.lockInterruptibly()

代码示例:

import java.util.concurrent.*; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; class Scratch{ private final static int threadCount = 10; private static CyclicBarrier barrier = new CyclicBarrier(5); // 请求总数 public static int clientTotal = 5000; // 同时并发执行的线程数 public static int threadTotal = 200; public static int count = 0; private final static Lock lock = new ReentrantLock(); public static void main(String[] args) throws Exception { ExecutorService executorService = Executors.newCachedThreadPool(); final Semaphore semaphore = new Semaphore(threadTotal); final CountDownLatch countDownLatch = new CountDownLatch(clientTotal); for (int i = 0; i < clientTotal ; i++) { executorService.execute(() -> { try { semaphore.acquire(); add(); semaphore.release(); } catch (Exception e) { System.out.println(e); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); System.out.println("count:"+count); } private static void add() { lock.lock(); try { count++; } finally { lock.unlock(); } } } ReentrantReadWriteLock

在没有任何读写锁时,才可以取得写入锁 悲观写锁,即当所有读锁释放之后,才能加写锁,对于读多写少的程序,会引起堵塞或者死锁

代码示例:

import java.util.Map; import java.util.Set; import java.util.TreeMap; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantReadWriteLock; class Scratch{ private final Map map = new TreeMap(); private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); private final Lock readLock = lock.readLock(); private final Lock writeLock = lock.writeLock(); public Scratch.Data get(String key) { readLock.lock(); try { return map.get(key); } finally { readLock.unlock(); } } public Set getAllKeys() { readLock.lock(); try { return map.keySet(); } finally { readLock.unlock(); } } public Scratch.Data put(String key, Scratch.Data value) { writeLock.lock(); try { return map.put(key, value); } finally { readLock.unlock(); } } class Data { } } StampedLock

StampedLock

?

示例代码1

import java.util.concurrent.locks.StampedLock; class Scratch{ class Point { private double x, y; private final StampedLock sl = new StampedLock(); void move(double deltaX, double deltaY) { // an exclusively locked method long stamp = sl.writeLock(); try { x += deltaX; y += deltaY; } finally { sl.unlockWrite(stamp); } } //下面看看乐观读锁案例 double distanceFromOrigin() { // A read-only method long stamp = sl.tryOptimisticRead(); //获得一个乐观读锁 double currentX = x, currentY = y; //将两个字段读入本地局部变量 if (!sl.validate(stamp)) { //检查发出乐观读锁后同时是否有其他写锁发生? stamp = sl.readLock(); //如果没有,我们再次获得一个读悲观锁 try { currentX = x; // 将两个字段读入本地局部变量 currentY = y; // 将两个字段读入本地局部变量 } finally { sl.unlockRead(stamp); } } return Math.sqrt(currentX * currentX + currentY * currentY); } //下面是悲观读锁案例 void moveIfAtOrigin(double newX, double newY) { // upgrade // Could instead start with optimistic, not read mode long stamp = sl.readLock(); try { while (x == 0.0 && y == 0.0) { //循环,检查当前状态是否符合 long ws = sl.tryConvertToWriteLock(stamp); //将读锁转为写锁 if (ws != 0L) { //这是确认转为写锁是否成功 stamp = ws; //如果成功 替换票据 x = newX; //进行状态改变 y = newY; //进行状态改变 break; } else { //如果不能成功转换为写锁 sl.unlockRead(stamp); //我们显式释放读锁 stamp = sl.writeLock(); //显式直接进行写锁 然后再通过循环再试 } } } finally { sl.unlock(stamp); //释放读锁或写锁 } } } }

代码示例2

import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; import java.util.concurrent.locks.StampedLock; class Scratch{ // 请求总数 public static int clientTotal = 5000; // 同时并发执行的线程数 public static int threadTotal = 200; public static int count = 0; private final static StampedLock lock = new StampedLock(); public static void main(String[] args) throws Exception { ExecutorService executorService = Executors.newCachedThreadPool(); final Semaphore semaphore = new Semaphore(threadTotal); final CountDownLatch countDownLatch = new CountDownLatch(clientTotal); for (int i = 0; i < clientTotal ; i++) { executorService.execute(() -> { try { semaphore.acquire(); add(); semaphore.release(); } catch (Exception e) { log.error("exception", e); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); log.info("count:{}", count); } private static void add() { long stamp = lock.writeLock(); try { count++; } finally { lock.unlock(stamp); } } } Condition

多线程建协调通信的工具类

代码示例:

import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; class Scratch{ public static void main(String[] args) { ReentrantLock reentrantLock = new ReentrantLock(); Condition condition = reentrantLock.newCondition(); new Thread(() -> { try { reentrantLock.lock(); log.info("wait signal"); // 1 condition.await(); } catch (InterruptedException e) { e.printStackTrace(); } log.info("get signal"); // 4 reentrantLock.unlock(); }).start(); new Thread(() -> { reentrantLock.lock(); log.info("get lock"); // 2 try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } condition.signalAll(); log.info("send signal ~ "); // 3 reentrantLock.unlock(); }).start(); } } J.U.C组件扩展 1.FutureTask 2.Fork/Join 3.BlockingQueue FutureTask

Callable与Runnable接口对比 Future接口,可以得到任务的返回值 FutureTask父类是RunnableFuture,RunnableFuture继承了Runnable和Future两个接口

使用场景

假设又一个很费事逻辑,需要计算,并且返回这个值,同时这个值又不是马上需要, 那么就可以使用这个组合,用另外一个线程去计算返回值, 而当前线程在使用这个返回值之前可以做其他操作,等到需要这个返回值时,再通过Future得到

Future代码示例1:

import lombok.extern.slf4j.Slf4j; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @Slf4j public class FutureExample { static class MyCallable implements Callable { @Override public String call() throws Exception { log.info("do something in callable"); Thread.sleep(5000); return "Done"; } } public static void main(String[] args) throws Exception { ExecutorService executorService = Executors.newCachedThreadPool(); Future future = executorService.submit(new MyCallable()); log.info("do something in main"); Thread.sleep(1000); String result = future.get(); log.info("result:{}", result); } }

FutureTask示例代码2:

@Slf4j public class FutureTaskExample { public static void main(String[] args) throws Exception { FutureTask futureTask = new FutureTask(new Callable() { @Override public String call() throws Exception { log.info("do something in callable"); Thread.sleep(5000); return "Done"; } }); new Thread(futureTask).start(); log.info("do something in main"); Thread.sleep(1000); String result = futureTask.get(); log.info("result:{}", result); } } Fork/Join框架

将大人物切分成多个小任务并行执行,最后将结果汇总,思想和mapreduce类似。采用工作窃取算法,充分利用线程并行计算 在这里插入图片描述

代码示例

public class ForkJoinTaskExample extends RecursiveTask { public static final int threshold = 2; private int start; private int end; public ForkJoinTaskExample(int start, int end) { this.start = start; this.end = end; } @Override protected Integer compute() { int sum = 0; //如果任务足够小就计算任务 boolean canCompute = (end - start)


【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3