java线程 |
您所在的位置:网站首页 › java线程实现原理 › java线程 |
CAS
CAS简介
我们在学习多线程期间,看到最多的例子就是累加器,代码如下: public class Test { long count = 0; void add10K() { int idx = 0; while(idx++ < 10000) { count += 1; } } } 复制代码上面的代码并非线程安全的,问题主要出现在count变量的不可见性和count+=1并非原子性操作,之前的解决方式都是使用Synchronized或者java sdk加锁的方式来保证原子性,使用volatile来保证可见性。 在java线程-Lock详解&AQS原理详解一文中,发现大量使用了自旋+CAS操作,来保证线程安全性,这是一种无锁的操作,性能会比加锁的方式高很多。 我们先了解一下什么是CAS: CAS指的是先检查再更新这类复合操作,英文翻译有很多种:Compare And Set、Compare And Swap或者Check And Set。 CAS指令包含3个参数: 共享变量的内存地址A 用于比较的值B 共享变量的新值CCAS的检查更新逻辑如下: if A == B # 当 A = B的时候才能将A的值更新为C A = c 复制代码 原子CAS指令了解了CAS的基本概念以后,我们来看一下如何通过自旋 + CAS操作来实现一个线程安全的累加器。 代码如下: class SimulatedCAS{ volatile int count; // 实现count+=1 addOne(){ do { newValue = count+1; }while(count != cas(count,newValue) } // 模拟实现CAS,仅用来帮助理解 int cas( int expect, int newValue){ // 读目前count的值 int curValue = count; // 比较目前count值是否==期望值 if(curValue == expect){ // 如果是,则更新count的值 count= newValue; } // 返回写入前的值 return curValue; } } 复制代码但如果只是单纯的CAS操作,还是会出现线程安全问题,有可能出现这种情况,多个线程均检测到curvalue == expect,然后先后将count置为newValue,导致出现线程安全问题。 如果给cas方法加上synchronized修饰,虽说能解决上面说的问题,但是无法通过无锁的方式来解决原子性的问题,但如果cas操作本身就是一个原子性操作,那就解决了原子性操作。java作为一个高级编程语言,内部是提供了原子性的CAS操作的方法,那就是Unsafe类里的CAS操作。 Unsafe不过先来个番外,介绍Unsafe类的具体用法。 Unsafe是位于sun.misc包下的一个类,主要提供一些用于执行低级别、不安全操作的方法,如直接访问系统内存资源、自主管理内存资源等,这些方法在提升Java运行效率、增强Java语言底层资源操作能力方面起到了很大的作用。但由于Unsafe类使Java语言拥有了类似C语言指针一样操作内存空间的能力,这无疑也增加了程序发生相关指针问题的风险。在程序中过度、不正确使用Unsafe类会使得程序出错的概率变大,使得Java这种安全的语言变得不再“安全”,因此对Unsafe的使用一定要慎重。 Unsafe使用方法使用Unsafe的方式如下: // 可以直接调用Unsafe.getUnsafe()静态方法获取Unsafe对象。 public static Unsafe getUnsafe() { Class caller = Reflection.getCallerClass(); if (!VM.isSystemDomainLoader(caller.getClassLoader())) // 这是 throw new SecurityException("Unsafe"); return theUnsafe; } 复制代码不过直接在我们app代码里调用Unsafe.getUnsafe(),会抛出SecurityException("Unsafe")异常,这是因为JVM会判断当前类是否由Bootstrap classLoader加载,如果不是的话那么就会抛出一个SecurityException异常。也就是说,只有启动类加载器加载的类才能够调用Unsafe类中的方法,来防止这些方法在不可信的代码中被调用。 那么,为什么要对Unsafe类进行这么谨慎的使用限制呢,说到底,还是因为它实现的功能过于底层,例如直接进行内存操作、绕过jvm的安全检查创建对象等等。 那如果就想要在我们的app代码里想要使用这个类,那该如何使用的呢?这里提供两种方法: 方法一:利用反射,获取到Unsafe#theUnsafe变量 private static Unsafe reflectGetUnsafe() { try { Field field = Unsafe.class.getDeclaredField("theUnsafe"); field.setAccessible(true); return (Unsafe) field.get(null); } catch (Exception e) { log.error(e.getMessage(), e); return null; } } 复制代码方法二:通过Java命令行命令-Xbootclasspath/a把调用Unsafe相关方法的类A所在jar包路径追加到默认的bootstrap路径中,使得A被引导类加载器加载,从而通过Unsafe.getUnsafe方法安全的获取Unsafe实例。 // java -Xbootclasspath/a: ${path} # 其中path为调用Unsafe相关方法的类所在jar包路径. // java -Xbootclasspath:$JAVA_HOME/jre/lib/rt.jar:./A.jar foo.bar.MyApp public class A{ private static final Unsafe unsafe = Unsafe.getUnsafe(); // ... } 复制代码 Unsafe主要功能
不过这里我们主要介绍一下CAS操作,其余操作请参考美团技术团队写的文章 硬件指令X86提供的CAS指令为cmpxchg指令。指令格式如下: cmpxchg [目标操作数], [源操作数] 复制代码 目标操作数位于寄存器或者内存中,用于存储变量的当前值C(currentValue) 源操作数位于寄存器中,用于存储变量的更新值N(NewValue) 隐藏的操作数位于AX寄存器中,在指令中没有明确指出,用于存储变量的期望值E(ExpectedValue)在单核计算机上cmpxchg是原子操作,因为指令是CPU的最小单元,指令执行过程中不可中断。 但是在多核CPU上,cmpxchg就不是非原子操作,多个线程可以在多核CPU上并行执行cmpxchg指令,为了在多核CPU上还能保障cmpxchg的原子性,需要在cmpxchg指令前加上LOCK前缀。 LOCK cmpxchg [目标操作数], [源操作数] 复制代码 Unsafe-CAS但由于cmpxchg指令是硬件指令,java并不能直接调用,所以Unsafe类提供了三个的native方法,对CAS操作进行封装 /** * CAS * @param o 包含要修改field的对象 * @param offset 对象中某field的偏移量 * @param expected 期望值 * @param update 更新值 * @return true | false */ public final native boolean compareAndSwapObject(Object o, long offset, Object expected, Object update); public final native boolean compareAndSwapInt(Object o, long offset, int expected,int update); public final native boolean compareAndSwapLong(Object o, long offset, long expected, long update); 复制代码对应的C++代码如下: UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapObject(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jobject e_h, jobject x_h)) UnsafeWrapper("Unsafe_CompareAndSwapObject"); oop x = JNIHandles::resolve(x_h); oop e = JNIHandles::resolve(e_h); oop p = JNIHandles::resolve(obj); HeapWord* addr = (HeapWord *)index_oop_from_field_offset_long(p, offset); oop res = oopDesc::atomic_compare_exchange_oop(x, addr, e, true); jboolean success = (res == e); if (success) update_barrier_set((void*)addr, x); return success; UNSAFE_END UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x)) UnsafeWrapper("Unsafe_CompareAndSwapInt"); oop p = JNIHandles::resolve(obj); jint* addr = (jint *) index_oop_from_field_offset_long(p, offset); return (jint)(Atomic::cmpxchg(x, addr, e)) == e; UNSAFE_END UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapLong(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jlong e, jlong x)) UnsafeWrapper("Unsafe_CompareAndSwapLong"); Handle p (THREAD, JNIHandles::resolve(obj)); jlong* addr = (jlong*)(index_oop_from_field_offset_long(p(), offset)); if (VM_Version::supports_cx8()) return (jlong)(Atomic::cmpxchg(x, addr, e)) == e; else { jboolean success = false; ObjectLocker ol(p, THREAD); if (*addr == e) { *addr = x; success = true; } return success; } UNSAFE_END 复制代码通过对JVM源码的阅读,发现其实底层都是调用Atomic::cmpxchg这种汇编指令完成的。x86里的汇编指令如下:参考mail.openjdk.org/pipermail/h… inline jbyte Atomic::cmpxchg(jbyte exchange_value, volatile jbyte* dest, jbyte compare_value) { int mp = os::is_MP(); jbyte result; __asm__ volatile (LOCK_IF_MP(%4) "cmpxchgb %1,(%3)" : "=a" (result) : "q" (exchange_value), "a" (compare_value), "r" (dest), "r" (mp) : "cc", "memory"); return result; } 复制代码 CAS失败处理如果多个线程竞争执行CAS,那么只会有一个线程会执行成功,其他执行失败的线程又该如何处理?主要的方案有两种: 方案一:synchronized和Lock,通过加锁的方式 public void increment_lock(){ synchronized(this){ value ++; } } 复制代码但是这种方式默认是通过悲观锁来实现的,可能出现等待资源而阻塞线程导致内核态到用户态的上下文切换,带来性能损耗 方案二:自旋+原子性cas操作 private static final Unsafe unsafe = Unsafe.getUnsafe(); private static final long valueOffset; private volatile int value; static { try { valueOffset = unsafe.objectFieldOffset(Accumulator.class.getDeclaredField("value")); } catch (Exception ex) { throw new Error(ex); } } public void increment_cas(){ boolean success = false; while(!success){ int oldValue = value; int newValue = oldValue + 1; success = unsafe.compareAndSwapInt(this, valueOffset, oldValue, newValue); } } 复制代码方案一和方案二的优缺点 加锁自旋+CAS优点处于阻塞状态的线程不会占用CPU时间片,不会浪费CPU资源基于乐观锁实现,循环执行CAS,不需要阻塞线程,不会出现线程阻塞,不会出现内核态和用户态的上下文切换缺点通过悲观锁来实现的,可能出现等待资源而阻塞线程导致内核态到用户态的上下文切换,带来性能损耗如果线程一直在运行,会浪费CPU资源 ABA问题什么是ABA问题?例如有2个线程同时对同一个值(初始值为A)进行CAS操作,这三个线程如下 线程1,期望值为A,欲更新的值为B 线程2,期望值为A,欲更新的值为B 线程3,期望值为B,欲更新的值为A 线程1抢先获得CPU时间片,而线程2因为其他原因阻塞了。 线程1取值与期望的A值比较,发现相等然后将值更新为B。 然后这个时候出现了线程3,线程3取值与期望的值B比较,发现相等则将值更新为A。 此时线程2从阻塞中恢复,并且获得了CPU时间片,这时候线程2取值与期望的值A比较,发现相等则将值更新为B,虽然线程也完成了操作,但是线程2并不知道值已经经过了A->B->A的变化过程。大部分情况下我们还是不需要关心ABA问题,但是有些场景就不能忽视ABA问题。 ABA的解决方案一般是加上版本号或者时间戳。 原子类CAS在java里运用最常见的就是AQS里的操作和原子类了,但对于普通程序猿来说,最常见的还是原子类的使用,大家都会使用AtomicInteger#incrementAndGet来实现自增。这个章节就是主要来讨论一下原子类的实现。 java原子类的实现都在java.util.concurrent.atomic包下,基本是通过自旋+原子CAS操作实现的,不会出现安全问题。 根据处理的数据类型,可以大致分为5类: 重点说明一下基本类型原子类,引用类型原子类以及累加器 基本类型原子类基本类型原子类的实现基本相同,我们以AtomicInteger为例,深入学习一下基本类型原子类。 public class AtomicInteger extends Number implements java.io.Serializable { private static final Unsafe unsafe = Unsafe.getUnsafe(); private static final long valueOffset; static { try { valueOffset = unsafe.objectFieldOffset (AtomicInteger.class.getDeclaredField("value")); } catch (Exception ex) { throw new Error(ex); } } private volatile int value; public AtomicInteger(int initialValue) { value = initialValue; } public AtomicInteger() { } // 其余API getAndIncrement() //原子化i++ getAndDecrement() //原子化的i-- incrementAndGet() //原子化的++i decrementAndGet() //原子化的--i //当前值+=delta,返回+=前的值 getAndAdd(delta) //当前值+=delta,返回+=后的值 addAndGet(delta) //CAS操作,返回是否成功 compareAndSet(expect, update) //以下四个方法 //新值可以通过传入func函数来计算 getAndUpdate(func) updateAndGet(func) getAndAccumulate(x,func) accumulateAndGet(x,func) } 复制代码其中compareAndSet方法是CAS标准函数,如果value值等于expect,那么就将value更新为update值,并返回true。 getAndAdd和addAndGet方法类似,addAndGet只不过是在getAndAdd返回值的基础上再加上一个delta再返回。 getAndIncrement和incrementAndGet方法,相当于getAndAdd和addAndGet方法中的delta=1; getAndDecrement和decrementAndGet方法,相当于getAndAdd和addAndGet方法中的delta=-1; public final int getAndAdd(int delta) { return unsafe.getAndAddInt(this, valueOffset, delta); } public final int addAndGet(int delta) { return unsafe.getAndAddInt(this, valueOffset, delta) + delta; } public final int getAndIncrement() { return unsafe.getAndAddInt(this, valueOffset, 1); } public final int incrementAndGet() { return unsafe.getAndAddInt(this, valueOffset, 1) + 1; } public final int getAndDecrement() { return unsafe.getAndAddInt(this, valueOffset, -1); } public final int decrementAndGet() { return unsafe.getAndAddInt(this, valueOffset, -1) - 1; } // unsafe.getAndAddInt public final int getAndAddInt(Object o, long offset, int delta) { int oldValue; // 使用自旋+CAS,保证getAndAddInt总是可以将value的值增加delta do { oldValue = this.getIntVolatile(o, offset); } while(!this.compareAndSwapInt(o, offset, oldValue, oldValue + delta)); return oldValue; } 复制代码 引用类型原子类AtomicReference与AtomicInteger的实现方式类似,只不过引用类型原子类使用的CAS操作是sun.misc.Unsafe#compareAndSwapObject。 public class AtomicReference implements java.io.Serializable { private static final Unsafe unsafe = Unsafe.getUnsafe(); private static final long valueOffset; static { try { valueOffset = unsafe.objectFieldOffset (AtomicReference.class.getDeclaredField("value")); } catch (Exception ex) { throw new Error(ex); } } private volatile V value; public AtomicReference(V initialValue) { value = initialValue; } public AtomicReference() { } public final boolean compareAndSet(V expect, V update) { return unsafe.compareAndSwapObject(this, valueOffset, expect, update); } } 复制代码我们在学习CAS操作的时候,提到过ABA问题,在引用类型原子类中AtomicStampedReference和AtomicMarkableReference就是为了解决ABA问题而存在的。 AtomicStampedReference相比于AtomicReference多了一个int类型的stamp版本戳,它将stamp和引用封装成一个新的Pair对象,在Pair对象上执行CAS。即便引用对象存在ABA问题,但是Stamp单调递增,stamp不会存在ABA问题,所以两者组成的Pair对象,也就不会存在ABA问题。 public class AtomicStampedReference { private static class Pair { final T reference; final int stamp; private Pair(T reference, int stamp) { this.reference = reference; this.stamp = stamp; } static Pair of(T reference, int stamp) { return new Pair(reference, stamp); } } private volatile Pair pair; public AtomicStampedReference(V initialRef, int initialStamp) { pair = Pair.of(initialRef, initialStamp); } public boolean compareAndSet(V expectedReference, V newReference, int expectedStamp, int newStamp) { Pair current = pair; return expectedReference == current.reference && expectedStamp == current.stamp && ((newReference == current.reference && newStamp == current.stamp) || casPair(current, Pair.of(newReference, newStamp))); } private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe(); private static final long pairOffset = objectFieldOffset(UNSAFE, "pair", AtomicStampedReference.class); private boolean casPair(Pair cmp, Pair val) { return UNSAFE.compareAndSwapObject(this, pairOffset, cmp, val); } } 复制代码AtomicMarkableReference和AtomicStampedReference的区别在于AtomicMarkableReference使用的是boolean类型的mark是否改变,来判断reference是否有被更改过。 数组类型原子类AtomicIntegerArray中的原子操作跟AtomicInteger中的原子操作一一对应,只不过多了下标而已。 public final boolean compareAndSet(int i, int expect, int update) { return compareAndSetRaw(checkedByteOffset(i), expect, update); } private boolean compareAndSetRaw(long offset, int expect, int update) { return unsafe.compareAndSwapInt(array, offset, expect, update); } 复制代码 对象属性更新器如果某个类的属性没有提供合适的原子操作,那么我们可以使用对象属性更新器来对其进行原子操作,但是属性必须要被volatile修饰,否则会报异常。 参考代码: AtomicIntegerFieldUpdaterImpl(final Class tclass, final String fieldName, final Class caller) { final Field field; final int modifiers; try { field = AccessController.doPrivileged( new PrivilegedExceptionAction() { public Field run() throws NoSuchFieldException { return tclass.getDeclaredField(fieldName); } }); modifiers = field.getModifiers(); sun.reflect.misc.ReflectUtil.ensureMemberAccess( caller, tclass, null, modifiers); ClassLoader cl = tclass.getClassLoader(); ClassLoader ccl = caller.getClassLoader(); if ((ccl != null) && (ccl != cl) && ((cl == null) || !isAncestor(cl, ccl))) { sun.reflect.misc.ReflectUtil.checkPackageAccess(tclass); } } catch (PrivilegedActionException pae) { throw new RuntimeException(pae.getException()); } catch (Exception ex) { throw new RuntimeException(ex); } if (field.getType() != int.class) throw new IllegalArgumentException("Must be integer type"); if (!Modifier.isVolatile(modifiers)) throw new IllegalArgumentException("Must be volatile type"); this.cclass = (Modifier.isProtected(modifiers) && tclass.isAssignableFrom(caller) && !isSamePackage(tclass, caller)) ? caller : tclass; this.tclass = tclass; this.offset = U.objectFieldOffset(field); } 复制代码 累加器针对累加这种特殊的业务场景,JUC提供了专门的LongAdder累加器,它比AtomicLong原子类性能更高,在高并发的情况下,多线程同时执行add()函数,AtomicLong会因为大量线程而不断自旋导致性能下降,但是LongAdder却能保持高性能。 其底层原理比较复杂,涉及到数据分片,哈希优化,去伪共享,非精确求和等各种优化手段。 java.util.concurrent.atomic.Striped64是所有累加器的父类,目前JUC里的所有累加器都会继承它。 abstract class Striped64 extends Number { @sun.misc.Contended static final class Cell { volatile long value; Cell(long x) { value = x; } final boolean cas(long cmp, long val) { return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val); } // Unsafe mechanics private static final sun.misc.Unsafe UNSAFE; private static final long valueOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class ak = Cell.class; valueOffset = UNSAFE.objectFieldOffset (ak.getDeclaredField("value")); } catch (Exception e) { throw new Error(e); } } } static final int NCPU = Runtime.getRuntime().availableProcessors(); transient volatile Cell[] cells; transient volatile long base; transient volatile int cellsBusy; Striped64() { } final boolean casBase(long cmp, long val) { return UNSAFE.compareAndSwapLong(this, BASE, cmp, val); } final boolean casCellsBusy() { return UNSAFE.compareAndSwapInt(this, CELLSBUSY, 0, 1); } static final int getProbe() { return UNSAFE.getInt(Thread.currentThread(), PROBE); } // Unsafe mechanics private static final sun.misc.Unsafe UNSAFE; private static final long BASE; private static final long CELLSBUSY; private static final long PROBE; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class sk = Striped64.class; BASE = UNSAFE.objectFieldOffset (sk.getDeclaredField("base")); CELLSBUSY = UNSAFE.objectFieldOffset (sk.getDeclaredField("cellsBusy")); Class tk = Thread.class; PROBE = UNSAFE.objectFieldOffset (tk.getDeclaredField("threadLocalRandomProbe")); } catch (Exception e) { throw new Error(e); } } } 复制代码Striped64里主要有四个变量 cells cells数组保存多个累加变量 Cell只包含一个成员变量value,以及一个操作value的cas()函数。 cells数组支持动态扩容,cells数组的长度必须是2的幂次方,每次扩容都会增加为原来数组长度的2倍。 最开始初始化为null,当第一次出现线程竞争执行add()函数的时候,才会被创建NCPU JVM最大可用CPU核数。 当cells数组长度大于等于NCPU的最小2的幂次方时,cells数组就不再扩容。这是因为同时执行累加操作的线程数不可能大于cpu的核数。base base是一个比较特殊的累加变量。 用来有效避免执行复杂的分片累加逻辑。cellBusy cellBusy用来实现锁,类似ReentrantLock中的state字段,cellBusy初始化为0,多个线程通过CAS竞争更新cellBusy,谁先将cellBusy设置为1,谁就持有了锁。 用来保证多个线程同时创建cells数组,创建cells数组中的cell对象,以及对cells数组进行动态扩容。 数据分片在高并发的情况下,AtomicLong性能不高的主要原因是,多线程同时CAS更新一个变量,但是LongAdder会将一个累加变量分解成多个。多线程同时执行累加操作时,不同的线程对不同的累加变量进行操作,线程之间互不影响,这样就避免了一个线程等待另一个线程操作完成之后再操作。 我们以add()函数为例,了解一下LongAdder的实现原理 // LongAdder#add() public void add(long x) { Cell[] as; long b, v; int m; Cell a; // 初始状态下,cells==null,第一个条件一定是false,此时会通过casBase方法,以CAS的方式更新base值,且只有当cas失败时,才会走到if中,再执行分片累加的逻辑(将新增值累加到cells数组中)。在低并发的情况下,使用base可以有效避免执行复杂的分片累加逻辑。 if ((as = cells) != null || !casBase(b = base, b + x)) { boolean uncontended = true; if (as == null || (m = as.length - 1) < 0 || (a = as[getProbe() & m]) == null || !(uncontended = a.cas(v = a.value, v + x))) longAccumulate(x, null, uncontended); } } 复制代码实现流程如下: 其中比较重要的是getProbe()函数,这是一个哈希函数,如果getProbe() & m = k,那么,当前线程会通过CAS将新增值x累加到cells[k]的value变量上。 另外一个函数就是longAccumulate,这是java.util.concurrent.atomic.Striped64#longAccumulate里的函数,看上述流程图,会有三种情况走到这一步。 cells数组为null,或者cells.length == 0; cells[getProbe() & m] == null; cas更新cells[getProbe() & m]对象的value值失败; final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) { int h; if ((h = getProbe()) == 0) { ThreadLocalRandom.current(); // force initialization h = getProbe(); wasUncontended = true; } boolean collide = false; // True if last slot nonempty // 自旋 for (;;) { Cell[] as; Cell a; int n; long v; if ((as = cells) != null && (n = as.length) > 0) { if ((a = as[(n - 1) & h]) == null) { if (cellsBusy == 0) { // Try to attach new Cell Cell r = new Cell(x); // Optimistically create if (cellsBusy == 0 && casCellsBusy()) { boolean created = false; try { // Recheck under lock Cell[] rs; int m, j; if ((rs = cells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) { rs[j] = r; created = true; } } finally { cellsBusy = 0; } if (created) break; continue; // Slot is now non-empty } } collide = false; } else if (!wasUncontended) // CAS already known to fail wasUncontended = true; // Continue after rehash else if (a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) break; else if (n >= NCPU || cells != as) collide = false; // At max size or stale else if (!collide) collide = true; else if (cellsBusy == 0 && casCellsBusy()) { try { if (cells == as) { // Expand table unless stale Cell[] rs = new Cell[n > 17; probe ^= probe |
今日新闻 |
推荐新闻 |
CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3 |