并发编程之一分布式与高并发之并发编程的发展

您所在的位置:网站首页 并发和分布式程序设计原理 并发编程之一分布式与高并发之并发编程的发展

并发编程之一分布式与高并发之并发编程的发展

2023-12-28 21:26| 来源: 网络整理| 查看: 265

分布式与高并发之并发编程的发展 目标(了解线程) 了解并发和并发的价值了解线程java中的线程的实现多线程的基本原理线程的启动和停止 并发 高并发 当前系统能够同时承载的并发数 一般通过两个值来衡量当前系统的并发数 TPS:每秒处理的事务的数量 QPS:每秒处理的查询的数量如何支撑高并发? 核心点:硬件资源 CPU,核心数:代表了当前程序同时并行的任务数量 内存,用于IO性能的提高,储存热点数据等等 磁盘,用高效读写的磁盘提升性能 网卡,万兆千兆的网卡提升数据传输速度 …何合理的利用资源呢? 如一个软件需要用到 CPU -->线程资源 8核,同时可以运行8个线程 IO --> 数据库的交互 --> 数据刷到磁盘, 优化点:内存/缓存/异步刷盘策略…;数据库分库分表,分布式缓存、分布式消息中间件 单节点遇到瓶颈,采用多个计算机组成一个分布式计算机 最终的收益:都是为了QPS、TPS的提升 qps 1000 --> 10000 最终的目的:配合程序一起提升系统的系统 多线程技术 什么是线程 java程序 -> .java源文件(磁盘) ->JVM(编译) .class -> main方法运行这个程序(加载到程内存中) -->产生一个进程 CPU来执行这个进程中的指令 假设进程中,有一个从磁盘加载一个文件保存到数据库的操作 涉及到CPU的运算效率,和磁盘的IO效率 CPU的效率 > 磁盘的效率 CPU会产生阻塞 CPU是计算机中的核心资源(CPU阻塞会造成CPU资源的浪费) 如何提升CPU的利用率? 思考:当某个进程因为IO阻塞时,能否切换到其他的进程来执行呢? 多道程序设计:多进程,让多个进程同时加载到内存,进程之间是相互隔离不影响。 分时系统:利用CPU的时间片的切换,当某个进程阻塞时可以切换到其他进程执行

线程 --> 即轻量级进程 一个进程中 -> 可以存在N个线程 1.加载磁盘 -> 将耗时的操作分配给线程执行 2.保存到数据库 3… 拓展概念: 并行:同一个时刻有多少个线程可以同时运行 并发:这个系统能够最高支撑处理的访问是多少

线程的特征 异步 (用户注册 --> 异步发送email、短信)并行(依赖CPU核数) Java中如何实现线程 继承Thread类实现Runable接口Callable/Future public class ThreadDemo implements Runnable{ @Override public void run() { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("当前线程会被执行的代码"); } public static void main(String[] args) { new Thread(new ThreadDemo()).start(); //不需要等待这个程序的处理结果 new Thread(new SmsSenderTask()).start(); //不需要等待这个程序的处理结果,比如发送短信直接创建一个SmsSenderTask类就可以了 //为什么用start方法不用run方法,因为start方法是JVM层面的,等待CPU的调度就可以了 //start方法执行后会回调run方法 //run方法就只是一个实例方法的调用不能算是多线程 System.out.println("Main方法的输出结果"); } } 多线程的基本原理 (java中开启线程的执行过程) 线程的start方法,实际上底层做了很多事情,具体的实现简图如下,画得不一定工整,但是能够表达大 概意思就行。 OS调度算法有很多,比如先来先服务调度算法(FIFO)、最短优先(就是对短作业的优先调度)、时 间片轮转调度等。

在这里插入图片描述

线程的生命周期

线程从开始 --> 结束 start()启动一个线程 当前线程中的执行逻辑执行完毕后自动销毁,run()结束 线程的其他状态 --> 下方代码示例

// 线程的运行状态 public class ThreadStatusDemo { public static void main(String[] args) { new Thread(()->{ while(true){ try { TimeUnit.SECONDS.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } },"TIME_WAITING").start(); new Thread(()->{ while(true){ synchronized (ThreadStatusDemo.class){ try { ThreadStatusDemo.class.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } } },"WAITING"); new Thread(new BlockDemo(),"BLOCKED-DEMO-01").start(); new Thread(new BlockDemo(),"BLOCKED-DEMO-02").start(); } static class BlockDemo extends Thread{ @Override public void run() { synchronized (BlockDemo.class){ while(true){ try { TimeUnit.SECONDS.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } } } } } 打印: 1.命令:jps 16484 ThreadStatusDemo 8756 Jps 15212 Launcher 16012 Launcher 8892 2.命令:jstack 16484 "BLOCKED-DEMO-02" #19 prio=5 os_prio=0 tid=0x000002caa99e7000 nid=0x3b38 waiting for monitor entry [0x000000dc24efe000] java.lang.Thread.State: BLOCKED (on object monitor) at com.example.threaddemo.ThreadStatusDemo$BlockDemo.run(ThreadStatusDemo.java:40) - waiting to lock (a java.lang.Class for com.example.threaddemo.ThreadStatusDemo$BlockDemo) at java.lang.Thread.run(Thread.java:748) "BLOCKED-DEMO-01" #17 prio=5 os_prio=0 tid=0x000002caa99e5800 nid=0x14b8 waiting on condition [0x000000dc24dff000] java.lang.Thread.State: TIMED_WAITING (sleeping) at java.lang.Thread.sleep(Native Method) at java.lang.Thread.sleep(Thread.java:340) at java.util.concurrent.TimeUnit.sleep(TimeUnit.java:386) at com.example.threaddemo.ThreadStatusDemo$BlockDemo.run(ThreadStatusDemo.java:40) - locked (a java.lang.Class for com.example.threaddemo.ThreadStatusDemo$BlockDemo) at java.lang.Thread.run(Thread.java:748) //线程的运行状态 运行上述示例,打开终端命令,输入"jps"(显示当前所有Java进程pid); 根据获取到的pid, 通过jstack pid ,可以打印指定Java进程ID的堆栈信息 通过堆栈信息,可以看到线程的运行状态 线程的状态 //线程的运行状态 通过上面这段代码可以看到,线程在运行过程中,会存在几种不同的状态,一般来说,在Java中,线程 的状态一共是6种状态,分别是 NEW:初始状态,线程被构建,但是还没有调用start方法 RUNNABLED:运行状态,JAVA线程把操作系统中的就绪和运行两种状态统一称为“运行中” BLOCKED:阻塞状态,表示线程进入等待状态,也就是线程因为某种原因放弃了CPU使用权,阻塞 也分为几种情况 Ø 等待阻塞:运行的线程执行wait方法,jvm会把当前线程放入到等待队列 Ø 同步阻塞:运行的线程在获取对象的同步锁时,若该同步锁被其他线程锁占用了,那么 jvm会把当前的线程放入到锁池中 Ø 其他阻塞:运行的线程执行Thread.sleep或者t.join方法,或者发出了I/O请求时,JVM会把 当前线程设置为阻塞状态,当sleep结束、join线程终止、io处理完毕则线程恢复 WAITING: 等待状态 TIME_WAITING:超时等待状态,超时以后自动返回 TERMINATED:终止状态,表示当前线程执行完毕

在这里插入图片描述

线程的终止 我们知道Thread提供了线程的一些操作方法,比如stop、suspend等,这些方法可以终止一个线程或者 挂起一个线程,但是这些方法都不建议大家使用。原因比较简单, 举个例子,假设一个线程中,有多个任务在执行,此时,如果调用stop方法去强行中断,那么这个时候 相当于是发送一个指令告诉操作系统把这个线程结束掉,但是操作系统的这个结束动作完成不代表线程 中的任务执行完成,很可能出现线程的任务执行了一半被强制中断,最终导致数据产生问题。这种行为 类似于在linux系统中执行 kill -9类似,它是一种不安全的操作。 那么除了这种方法之外,还有什么方式可以实现线程的终止呢?要了解这个问题,我们首先需要知道, 一个线程什么情况下算是终止了。 我们分析一下下面这段代码,通过start()启动一个线程之后,本质上就是执行这个线程的run方法。 那么如果这个线程在run方法执行完之前,一直处于运行状态,直到run方法中的指令执行完毕,那么这 个线程就会被销毁。 // 普通线程的执行 public class MyThread extends Thread{ @Override public void run() { System.out.println("MyThread run()"); } } //测试类 public class ThreadTest { public static void main(String[] args) { MyThread thread1 = new MyThread(); thread1.start(); } } 在正常情况下,这个线程是不需要人为干预去结束的。如果要强制结束,只能走stop这个方法。 那在哪些情况下,线程的中断需要外部干预呢? 1.线程中存在无限循环执行,比如while(true)循环 2.线程中存在一些阻塞的操作,比如sleep、wait、join等。 存在循环的线程 假设存在如下场景,在run方法中,存在一个while循环,因为这个循环的存在使得这个run方法一直无 法运行结束,这种情况下,如何终止呢? // 存在循环的线程 public class MyThread extends Thread{ @Override public void run() { while(true){ System.out.println("MyThread.run()"); } } } //测试类 public class ThreadTest { public static void main(String[] args) { MyThread thread1 = new MyThread(); thread1.start(); } } 按照我们开发的思维来说,首先要解决的就是,while(true)这个循环,必须要有一个结束条件,其次是 要在其他地方能够修改这个结束条件让该线程感知到变化。假设我们把while(true)改成while(flag),这 个flag可以作为共享变量被外部修改,修改之后使得循环条件无法被满足,从而退出循环并且结束线 程。 这段逻辑其实非常简单,其实就是给了线程一个退出的条件,如果没有这个条件,那么线程将会一直运 行。 实际上,在Java提供了一个 interrupt 方法,这个方法就是实现线程中断操作的,它的作用和上面讲的 这个案例的作用一样。 interrupt方法 当其他线程通过调用当前线程的interrupt方法,表示向当前线程打个招呼,告诉他可以中断线程的执行 了,至于什么时候中断,取决于当前线程自己。 线程通过检查资深是否被中断来进行相应,可以通过isInterrupted()来判断是否被中断。 // 先看一个简单的例子说明 public class InterruptExample implements Runnable{ public static void main(String[] args) { Thread thread1 = new Thread(new InterruptExample()); thread1.interrupt();//发送中断信号 } @Override public void run() { //todo 业务逻辑··········· //对外提供一个信号,通过这个信号来判断这个线程是否可以结束 //当run()方法无法自动结束的情况下,去中断线程 //没有以下情况是不需要被中断的! //1.while(true){} //2.阻塞 try { InterruptExample.class.wait();//阻塞 } catch (InterruptedException e) { e.printStackTrace(); } //3. try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } } // 慢慢来看interrupt是如何使用的 public class InterruptExample implements Runnable{ //可以在外部使用标记的方式来作为中断的一种方法思路 static boolean flag = true; public static void main(String[] args) { Thread thread1 = new Thread(new InterruptExample()); // thread1.interrupt();//发送中断信号 flag = false; } @Override public void run() { //todo 业务逻辑··········· //如果让线程友好的结束,只有当前run方法中的程序知道的时候才可以 //由当前线程本身来决定中断 while(flag){ } } } //线程中提供了一个方法来获取中断的状态 public class InterruptExample implements Runnable{ public static void main(String[] args) throws InterruptedException { Thread thread1 = new Thread(new InterruptExample()); thread1.start(); //先执行5S再去触发中断 Thread.sleep(5000); thread1.interrupt();//发送中断信号 } @Override public void run() { //todo 业务逻辑··········· //如果让线程友好的结束,只有当前run方法中的程序知道的时候才可以 //由当前线程本身来决定中断 //Thread.currentThread().isInterrupted()获取中断的状态 while(!Thread.currentThread().isInterrupted()){ try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); //java中通过铺货InterruptedException 异常来告诉开发者这里可能会发生中断 //中断后如何处理,这里的选择在于开发者 //如果你想让其中断 添加下方这行代码进行中断 Thread.currentThread().interrupt();//进行中断 } System.out.println(Thread.currentThread().getName()+"--"); } } } 打印: Thread-0-- Thread-0-- Thread-0-- Thread-0-- java.lang.InterruptedException: sleep interrupted at java.lang.Thread.sleep(Native Method) at com.example.threaddemo.InterruptExample.run(InterruptExample.java:26) at java.lang.Thread.run(Thread.java:748) Thread-0-- Thread-0-- 这种方式和上边的定义 static boolean flag = true; 是一样的只是提供了一个更方便的方法 我们发现抛出异常后线程并没有终止仍然继续执行打印,这里涉及到一个复位的概念 我们平时在线程中使用的sleep、wait、join等操作,它都会抛出一个InterruptedException异常,为什么会抛出异常,是因为它在阻塞期间,必须要能够响应被其他线程发起中断请求之后的一个响应,而这个响应是通过InterruptedException来体现的。 interrupt()有两个功能 1.唤醒处于阻塞状态下的线程 2.修改中断的标记 由false --> true 在catch 住InterruptedException 异常这里 1.java中通过铺货InterruptedException 异常来告诉开发者这里可能会发生中断 2.中断后如何处理,这里的选择在于开发者 3.InterruptedException抛出来之后会恢复中断状态。(复位) 在这个异常中如果不做任何处理的话,我们是无法去中断线程的,因为当前的异常只是响应了外部对于这个线程的中断命令,同时,线程的中断状态也会复位 假如你希望他中断 直接在catch中添加 Thread.currentThread().interrupt();//进行中断 所以,InterruptedException异常的抛出并不意味着线程必须终止,而是提醒当前线程有中断的操作发 生,至于接下来怎么处理取决于线程本身,比如 1.直接捕获异常不做任何处理 2.将异常往外抛出 3.停止当前线程,并打印异常信息

isInterrupted是native修饰的方法 Hotspot源码中可以看到 官方源码下载:链接: http://hg.openjdk.java.net.

void os::interrupt(Thread* thread) { assert(Thread::current() == thread || Threads_lock->owned_by_self(), "possibility of dangling Thread pointer"); OSThread* osthread = thread->osthread(); if (!osthread->interrupted()) { osthread->set_interrupted(true); //更新中断标记 // More than one thread can get here with the same value of osthread, // resulting in multiple notifications. We do, however, want the store // to interrupted() to be visible to other threads before we execute unpark(). OrderAccess::fence(); ParkEvent * const slp = thread->_SleepEvent ; if (slp != NULL) slp->unpark() ; } // For JSR166. Unpark even if interrupt status already was set if (thread->is_Java_thread()) ((JavaThread*)thread)->parker()->unpark(); //唤醒被阻塞的方法 ParkEvent * ev = thread->_ParkEvent ; if (ev != NULL) ev->unpark() ; } 排查问题 --Thread Dump日志分析 CPU占用率很高,响应很慢CPU占用率不高,但响应很慢线程出现死锁的情况 // 演示代码 @RestController public class ThreadController { @GetMapping("/loop") public String dumpWhile(){ new Thread(new WhileThread()).start(); return "ok"; } @GetMapping("/dead") public String dumpDeadLock(){ Thread a = new ThreadRunA(); Thread b = new ThreadRunB(); a.start(); b.start(); return "ok"; } } class WhileThread implements Runnable { @Override public void run() { while (true) { System.out.println("Thread"); } } } // 运行 nohup java -jar -Dserver.port=8088 thread-demo-0.0.1-SNAPSHOT.jar > all.log & CPU占用率不高,但响应很慢 // 死锁 通过 curl http://127.0.0.1:8088/dead 演示死锁的场景

查看死锁问题的操作步骤如下:

通过 jps 命令,查看java进程的pid通过`jstack 查看线程日志

如果存在死锁情况,Thread Dump日志里面肯定会给出Found one Java-level deadlock:信息。只要 找到这个信息就可以立马定位到问题并且去解决。

Found one Java-level deadlock: ============================= "Thread-1": waiting to lock monitor 0x0000000026070c88 (object 0x00000007163b7d78, a java.lang.Integer), which is held by "Thread-0" "Thread-0": waiting to lock monitor 0x00000000260735c8 (object 0x0000000716649aa8, a java.lang.Integer), which is held by "Thread-1" CPU占用率很高,响应很慢

有的时候我们会发现CPU占用率很高,系统日志也看不出问题,那么这种情况下,我们需要去看一下运 行中的线程有没有异常。

// 死循环 执行 curl http://127.0.0.1:8088/loop 这个方法,会出现一个线程死循环的情况。 通过 top -c 动态显示进程及占用资源的排行榜,从而找到占用CPU最高的进程PID,得到的 PID=80972 PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND 80972 root 20 0 7773456 296124 12904 S 100.2 1.8 0:38.83 java 然后再定位到对应的线程, top -H -p 80972 查找到该进程中最消耗CPU的线程,得到 PID=81122 PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND 81122 root 20 0 7773456 258504 12932 R 99.8 1.6 5:56.34 java 80972 root 20 0 7773456 258504 12932 S 0.0 1.6 0:00.00 java 通过 printf “0x%x\n” 81122 命令,把对应的线程PID转化为16进制 [root@localhost test]# printf "0x%x\n" 81122 0x13ce2 截止执行这个命令 jstack 80972 | grep -A 20 0x13ce2 查看线程Dump日志,其中-A 20表示 展示20行, 80972表示进程ID, 0x13ce2表示线程ID [root@localhost test]# jstack 80972 | grep -A 20 0x13ce2 "Thread-3" #30 daemon prio=5 os_prio=0 tid=0x00007f84500ce000 nid=0x13ce2 runnable [0x00007f84a78f7000] java.lang.Thread.State: RUNNABLE at java.io.FileOutputStream.writeBytes(Native Method) at java.io.FileOutputStream.write(FileOutputStream.java:326) at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82) at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140) - locked (a java.io.BufferedOutputStream) at java.io.PrintStream.write(PrintStream.java:482) - locked (a java.io.PrintStream) at sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:221) at sun.nio.cs.StreamEncoder.implFlushBuffer(StreamEncoder.java:291) at sun.nio.cs.StreamEncoder.flushBuffer(StreamEncoder.java:104) - locked (a java.io.OutputStreamWriter) at java.io.OutputStreamWriter.flushBuffer(OutputStreamWriter.java:185) at java.io.PrintStream.newLine(PrintStream.java:546) - eliminated (a java.io.PrintStream) at java.io.PrintStream.println(PrintStream.java:807) - locked (a java.io.PrintStream) at com.example.threaddemo.WhileThread.run(ThreadController.java:33) at java.lang.Thread.run(Thread.java:748)

从上述内容可以看出,是WhileThread.run方法中,执行的逻辑导致CPU占用过高。



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3