【精选】Java多线程

您所在的位置:网站首页 java线程之间传递数据 【精选】Java多线程

【精选】Java多线程

2023-11-05 04:07| 来源: 网络整理| 查看: 265

线程间的通信方式 等待唤醒机制的替代:Lock 和 Condition

wait() 和 notify() 方法,只能被同步监听锁对象来调用,否则就会报出 IllegalMonitorZStateException 的异常,那么现在问题来了,我们在上一篇提到的 Lock 机制根本就没有同步锁了,也就是没有自动获取锁和自动释放锁的概念,因为没有同步锁,也就意味着 Lock 机制不能调用 wait 和 notify 方法,我们怎么办呢?

好在 Java 5 中提供了 Lock 机制的同时也提供了用于 Lock 机制控制通信的 Condition 接口,如果大家理解了上面说到的 Object.wait() 和 Object.notify() 方法的话,那么就能很容易地理解 Condition 对象了。

它和 wait() 和 notify() 方法的作用是大致相同的,只不过后者是配合 synchronized 关键字使用的,而 Condition 是与重入锁相关联的。通过 Lock 接口(重入锁就实现了这一接口)的 newCondition() 方法可以生成一个与当前重入锁绑定的 Condition 实例。利用 Condition 对象,我们就可以让线程在合适的时间等待,或者在某一个特定的时刻得到通知,继续执行。

我们拿上面的生产者消费者来举例,修改成 Lock 和 Condition 代码如下:

publicclass ShareResource { private String name; private String gender; // 新增加一个标志位,表示共享资源是否为空,默认为 true privateboolean isEmpty = true; private Lock lock = new ReentrantLock(); private Condition condition = lock.newCondition(); /** * 模拟生产者向共享资源对象中存储数据 * * @param name * @param gender */ public void push(String name, String gender) { lock.lock(); try { while (!isEmpty) { // 当前共享资源不为空的时,则等待消费者来消费 condition.await(); } // 开始生产 this.name = name; Thread.sleep(10); this.gender = gender; // 生产结束 isEmpty = false; // 生产结束唤醒消费者来消费 condition.signalAll(); } catch (Exception ignored) { } finally { lock.unlock(); } } /** * 模拟消费者从共享资源中取出数据 */ public void popup() { lock.lock(); try { while (isEmpty) { // 为空则等着生产者进行生产 condition.await(); } // 消费开始 Thread.sleep(10); System.out.println(this.name + "-" + this.gender); // 消费结束 isEmpty = true; // 消费结束唤醒生产者去生产 condition.signalAll(); } catch (InterruptedException ignored) { } finally { lock.unlock(); } } }

在 JDK 内部,重入锁和 Condition 对象被广泛地使用,以 ArrayBlockingQueue 为例,它的 put() 方法实现如下:

/** Main lock guarding all access */ final ReentrantLock lock; /** Condition for waiting takes */ privatefinal Condition notEmpty; /** Condition for waiting puts */ privatefinal Condition notFull; // 构造函数,初始化锁以及对应的 Condition 对象 public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity while (count == items.length) // 等待队列有足够的空间 notFull.await(); enqueue(e); } finally { lock.unlock(); } } private void enqueue(E x) { // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; final Object[] items = this.items; items[putIndex] = x; if (++putIndex == items.length) putIndex = 0; count++; // 通知需要 take() 的线程,队列已有数据 notEmpty.signal(); }

同理,对应的 take() 方法实现如下:

public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) // 如果队列为空,则消费者队列要等待一个非空的信号 notEmpty.await(); return dequeue(); } finally { lock.unlock(); } } 允许多个线程同时访问:信号量(Semaphore)

以下内容摘录 or 改编自 《实战 Java 高并发程序设计》 3.1.3 节的内容

信号量为多线程协作提供了更为强大的控制方法。广义上说,信号量是对锁的扩展,无论是内部锁 synchronized 还是重入锁 ReentrantLock,一次都只允许一个线程访问一个资源,而信号量却可以指定多个线程,同时访问某一个资源。信号量主要提供了以下构造函数:

public Semaphore(int permits) public Semaphore(int permits, boolean fair) // 第二个参数可以指定是否公平

在构造信号量对象时,必须要指定信号量的准入数,即同时能申请多少个许可。当每个线程每次只申请一个许可时,这就相当于指定了同时有多少个线程可以访问某一个资源。信号量的主要逻辑如下:

public void acquire() public void acquireUninterruptibly() public boolean tryAcquire() public boolean tryAcquire(long timeout, TimeUnit unit) public void release()

acquire() 方法尝试获得一个准入的许可。若无法获得,则线程会等待,直到有线程释放一个许可或者当前线程被中断。 acquireUninterruptibly() 方法和 acquire() 方法类似,但是不响应中断。 tryAcquire() 尝试获得一个许可,如果成功则返回 true,失败则返回 false,它不会进行等待,立即返回。 release() 用于在线程访问资源结束后,释放一个许可,以使其他等待许可的线程可以进行资源访问。

在 JDK 的官方 Javadoc 中,就有一个有关信号量使用的简单实例,有兴趣的读者可以自行去翻阅一下,这里给出一个更傻瓜化的例子:

publicclass SemapDemo implements Runnable { final Semaphore semaphore = new Semaphore(5); @Override public void run() { try { semaphore.acquire(); // 模拟耗时操作 Thread.sleep(2000); System.out.println(Thread.currentThread().getId() + ":done!"); semaphore.release(); } catch (InterruptedException ignore) { } } public static void main(String[] args) { ExecutorService executorService = Executors.newFixedThreadPool(20); final SemapDemo demo = new SemapDemo(); for (int i = 0; i public static void main(String[] args) throws InterruptedException { Thread previous = Thread.currentThread(); for (int i = 0; i private Thread thread; public Domino(Thread thread) { this.thread = thread; } @Override public void run() { try { thread.join(); } catch (InterruptedException ignore) { } System.out.println(Thread.currentThread().getName() + " terminate. "); } } }

运行程序,可以看到下列输出:

main terminate. 0 terminate. 1 terminate. 2 terminate. 3 terminate. 4 terminate. 5 terminate. 6 terminate. 7 terminate. 8 terminate. 9 terminate.

说明每个线程终止的前提都是前驱线程的终止,每个线程等待前驱线程结束后,才从 join() 方法中返回,这里涉及了等待/ 通知机制,在 JDK 的源码中,我们可以看到 join() 的方法如下:

lic final synchronized void join(long millis) throws InterruptedException { long base = System.currentTimeMillis(); long now = 0; if (millis // 条件不满足则继续等待 while (isAlive()) { wait(0); } // 条件符合则返回 } else { while (isAlive()) { long delay = millis - now; if (delay Thread thread = new Thread(() -> { while (!isOver) { } System.out.println("线程已感知到 isOver 置为 true,线程正常返回!"); }); thread.start(); Thread.sleep(500); isOver = true; System.out.println("isOver 已置为 true"); }

我们开启了一个主线程和一个子线程,我们期望子线程能够感知到 isOver 变量的变化以结束掉死循环正常返回,但是运行程序却发现并不是像我们期望的那样发生,子线程一直处在了死循环的状态!

为什么会这样呢?

Java 内存模型

关于这一点,我们有几点需要说明,首先需要搞懂 Java 的内存模型: 在这里插入图片描述 Java 虚拟机规范中试图定义一种 Java 内存模型(Java Memory Model, JMM)来屏蔽掉各层硬件和操作系统的内存访问差异,以实现让 Java 程序在各种平台下都能达到一致的内存访问效果。

Java 内存模型规定了所有的变量都存储在主内存(Main Memory)中。每条线程还有自己的工作内存(Working Memory),线程的工作内存中保存了被该线程使用到的变量的主内存副本拷贝,线程对变量的所有操作(读取、赋值等)都必须在主内存中进行,而不能直接读写主内存中的变量。不同的线程之间也无法直接访问对方工作内存中的变量,线程间的变量值的传递均需要通过主内存来完成,线程、主内存、工作内存三者的关系如上图。

那么不同的线程之间是如何通信的呢?

在共享内存的并发模型里,线程之间共享程序的公共状态,线程之间通过写-读内存中的公共状态来隐式进行通信,典型的共享内存通信方式就是通过共享对象进行通信。 在这里插入图片描述 例如上图线程 A 与 线程 B 之间如果要通信的话,那么就必须经历下面两个步骤:

首先,线程 A 把本地内存 A 更新过的共享变量刷新到主内存中去 然后,线程 B 到主内存中去读取线程 A 之前更新过的共享变量 在这里插入图片描述 在消息传递的并发模型里,线程之间没有公共状态,线程之间必须通过明确的发送消息来显式进行通信,在 Java 中典型的消息传递方式就是 wait() 和 notify()。

说回刚才出现的问题,就很容易理解了:每个线程都有独占的内存区域,如操作栈、本地变量表等。线程本地保存了引用变量在堆内存中的副本,线程对变量的所有操作都在本地内存区域中进行,执行结束后再同步到堆内存中去。也就是说,我们在主线程中修改的 isOver 的值并没有被子线程读取到(没有被刷入主内存),也就造成了子线程对于 isOver 变量不可见。

解决方法也很简单,只需要在 isOver 变量前加入 volatile 关键字就可以了,这是因为加入了 volatile 修饰的变量允许直接与主内存交互,进行读写操作,保证可见性。

可以查阅之前写的博客:指令重排/ happen-before 原则

volatile 不保证原子性 可以查阅之前的写的博客:demo验证volatile原子性

保证原子性:synchronized

Java 中任何一个对象都有一个唯一与之关联的锁,这样的锁作为该对象的一系列标志位存储在对象信息的头部。Java 对象头里的 Mark Word 里默认的存放的对象的 Hashcode/ 分代年龄和锁标记位。32 为 JVM Mark Word 默认存储结构如下: 在这里插入图片描述 Java SE 1.6 中,锁一共有 4 种状态,级别从低到高依次是:无锁状态、偏向锁状态、轻量级锁状态和重量级锁状态,这几个状态会随着竞争情况逐渐升级。锁可以升级但不能降级,意味着偏向锁升级成轻量级锁后不能降级成偏向锁。这种锁升级却不能降级的策略,目的是为了提高获得锁和释放锁的效率。

偏向锁

HotSpot 的作者经过研究发现,大多数情况下,锁不仅不存在多线程竞争,而且总是由同一线程多次获得,为了让线程获得锁的代价更低而引入了偏向锁。

偏向锁的获取:当一个线程访问同步块并获取锁时,会在对象头和栈帧中的锁记录里存储锁偏向的线程 ID,以后该线程在进入和退出同步块时不需要进行 CAS 操作来加锁和解锁,只需简单地测试一下对象头的 Mark Word 里是否存储着指向当前线程的偏向锁。如果测试成功,表示线程已经获得了锁。如果测试失败,则需要再测试一下 Mark Word 中偏向锁的标识是否设置成 1(表示当前是偏向锁),如果没有设置,则使用 CAS 竞争锁;如果设置了,则尝试使用 CAS 将对象头的偏向锁指向当前线程。

偏向锁的撤销:偏向锁使用了一种等到竞争出现才释放锁的机制,所以当其他线程尝试竞争偏向锁时,持有偏向锁的线程才会释放锁。

下图线程 1 展示了偏向锁获取的过程,线程 2 展示了偏向锁撤销的过程。 在这里插入图片描述

轻量级锁和自旋锁

如果偏向锁失败,虚拟机并不会立即挂起线程。它还会使用一种称为轻量级锁的优化手段。

线程在执行同步块之前,JVM 会先在当前线程的栈桢中创建用于存储锁记录的空间,并将对象头中的 Mark Word 复制到锁记录中,官方称为 Displaced Mark Word。然后线程尝试使用 CAS 将对象头中的 Mark Word 替换为指向锁记录的指针。如果成功,当前线程获得锁,如果失败,表示其他线程竞争锁,当前线程便尝试使用自旋(自己执行几个空循环再进行尝试)来获取锁。

轻量级解锁时,会使用原子的 CAS 操作将 Displaced Mark Word 替换回到对象头,如果成功,则表示没有竞争发生。如果失败,表示当前锁存在竞争,锁就会膨胀成重量级锁。下图是两个线程同时争夺锁,导致锁膨胀的流程图。

在这里插入图片描述

几种锁的比较

下图就简单概括了一下几种锁的比较: 在这里插入图片描述

每人一支笔:ThreadLocal

除了控制资源的访问外,我们还可以通过增加资源来保证所有对象的线程安全。比如,让 100 个人填写个人信息表,如果只有一支笔,那么大家就得挨个写,对于管理人员来说,必须保证大家不会去哄抢这仅存的一支笔,否则,谁也填不完。从另外一个角度出发,我们可以干脆就准备 100 支笔,那么所有人都可以各自为营,很快就能完成表格的填写工作。

如果说锁是使用第一种思路,那么 ThreadLocal 就是使用第二种思路了。

当使用 ThreadLocal 维护变量时,其为每个使用该变量的线程提供独立的变量副本,所以每一个线程都可以独立的改变自己的副本,而不会影响其他线程对应的副本。

建议阅读之前写的博客:ThreadLocal介绍以及内存泄漏问题



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3