精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

并發編程之Exchanger原理與使用

開發 前端
Exchanger是適用在兩個線程之間數據交換的并發工具類,它的作用是找到一個同步點,當兩個線程都執行到了同步點(exchange方法)之后(有一個沒有執行到就一直等待,也可以設置等待超時時間),就將自身線程的數據與對方交換。

[[356511]]

 前言

在JUC包中,除了一些常用的或者說常見的并發工具類(ReentrantLock,CountDownLatch,CyclicBarrier,Semaphore)等,還有一個不常用的線程同步器類 —— Exchanger。

Exchanger是適用在兩個線程之間數據交換的并發工具類,它的作用是找到一個同步點,當兩個線程都執行到了同步點(exchange方法)之后(有一個沒有執行到就一直等待,也可以設置等待超時時間),就將自身線程的數據與對方交換。

Exchanger 是什么?

它提供一個同步點,在這個同步點兩個線程可以交換彼此的數據。這個兩個線程通過exchange方法交換數據,如果第一個線程先執行exchange方法,它會一直等待第二個線程也執行exchange,當兩個線程都到達同步點時,這兩個線程就可以交換數據,將本線程生產出來的數據傳遞給對方。因此使用Exchanger的中斷時成對的線程使用exchange()方法,當有一對線程到達了同步點,就會進行交換數據,因此該工具類的線程對象是成對的。

線程可以在成對內配對和交換元素的同步點。每個線程在輸入exchange方法時提供一些對象,與合作者線程匹配,并在返回時接收其合作伙伴的對象。交換器可以被視為一個的雙向形式的SynchroniuzedQueue。交換器在諸如遺傳算法和管道設計的應用中可能是有用的。

一個用于兩個工作線程之間交換數據的封裝工具類,簡單說就是一個線程在完成一定事務后想與另一個線程交換數據,則第一個先拿出數據的線程會一直等待第二個線程,直到第二個線程拿著數據到來時才能彼此交換對應數據。


Exchanger 用法

  • Exchanger 泛型類型,其中V表示可交換的數據類型
  • V exchanger(V v):等待另一個線程到達此交換點(除非當前線程被中斷),然后將給定的對象傳送該線程,并接收該線程的對象。
  • V exchanger(V v, long timeout, TimeUnit unit):等待另一個線程到達此交換點(除非當前線程被中斷或超出類指定的等待時間),然后將給定的對象傳送給該線程,并接收該線程的對象。

應用場景

Exchanger可以用于遺傳算法,遺傳算法里需要選出兩個人作為交配對象,這時候會交換兩人的數據,并使用交叉規則得出2個交配結果。

Exchanger也可以用于校對工作。比如我們需要將紙制銀流通過人工的方式錄入成電子銀行流水,為了避免錯誤,采用AB崗兩人進行錄入,錄入到Excel之后,系統需要加載這兩個Excel,并對這兩個Excel數據進行校對,看看是否錄入的一致

Exchanger的典型應用場景是:一個任務在創建對象,而這些對象的生產代價很高,另一個任務在消費這些對象。通過這種方式,可以有更多的對象在被創建的同時被消費。

案例說明

Exchanger 用于兩個線程間交換數據,當然實際參與的線程可以不止兩個,測試用例如下:

  1. private static void test1() throws InterruptedException { 
  2.         Exchanger<String> exchanger = new Exchanger<>(); 
  3.         CountDownLatch countDownLatch = new CountDownLatch(5); 
  4.  
  5.         for (int i = 0; i < 5; i++) { 
  6.             new Thread(() ->  { 
  7.  
  8.                 try { 
  9.                     String origMsg = RandomStringUtils.randomNumeric(6); 
  10.  
  11.                     // 先到達的線程會在此等待,直到有一個線程跟它交換數據或者等待超時 
  12.                     String exchangeMsg = exchanger.exchange(origMsg,5, TimeUnit.SECONDS); 
  13.  
  14.                     System.out.println(Thread.currentThread().getName() + "\t origMsg:" + origMsg + "\t exchangeMsg:" + exchangeMsg); 
  15.                 } catch (InterruptedException e) { 
  16.                     e.printStackTrace(); 
  17.                 } catch (TimeoutException e) { 
  18.                     e.printStackTrace(); 
  19.                 }finally { 
  20.                     countDownLatch.countDown(); 
  21.                 } 
  22.  
  23.             },String.valueOf(i)).start(); 
  24.         } 
  25.  
  26.         countDownLatch.await(); 
  27.     } 

 第5個線程因為沒有匹配的線程而等待超時,輸出如下:

  1. 0  origMsg:524053  exchangeMsg:098544 
  2. 3  origMsg:433246  exchangeMsg:956604 
  3. 4  origMsg:098544  exchangeMsg:524053 
  4. 1  origMsg:956604  exchangeMsg:433246 
  5. java.util.concurrent.TimeoutException 
  6.  at java.util.concurrent.Exchanger.exchange(Exchanger.java:626) 
  7.  at com.nuih.juc.ExchangerDemo.lambda$test1$0(ExchangerDemo.java:37) 
  8.  at java.lang.Thread.run(Thread.java:748) 

 上述測試用例是比較簡單,可以模擬消息消費的場景來觀察Exchanger的行為,測試用例如下: 

  1. private static void test2() throws InterruptedException { 
  2.         Exchanger<String> exchanger = new Exchanger<>(); 
  3.         CountDownLatch countDownLatch = new CountDownLatch(4); 
  4.         CyclicBarrier cyclicBarrier = new CyclicBarrier(4); 
  5.  
  6.         // 生產者 
  7.         Runnable producer = new Runnable() { 
  8.             @Override 
  9.             public void run() { 
  10.                 try{ 
  11.                     cyclicBarrier.await(); 
  12.  
  13.                     for (int i = 0; i < 5; i++) { 
  14.                         String msg = RandomStringUtils.randomNumeric(6); 
  15.                         exchanger.exchange(msg,5,TimeUnit.SECONDS); 
  16.                         System.out.println(Thread.currentThread().getName() + "\t producer msg -> " + msg + " ,\t i -> " + i); 
  17.                     } 
  18.  
  19.                 }catch (Exception e){ 
  20.                     e.printStackTrace(); 
  21.                 }finally { 
  22.                     countDownLatch.countDown(); 
  23.                 } 
  24.             } 
  25.         }; 
  26.  
  27.         // 消費者 
  28.         Runnable consumer = new Runnable() { 
  29.             @Override 
  30.             public void run() { 
  31.                 try{ 
  32.                     cyclicBarrier.await(); 
  33.                     for (int i = 0; i < 5; i++) { 
  34.                         String msg = exchanger.exchange(null,5,TimeUnit.SECONDS); 
  35.                         System.out.println(Thread.currentThread().getName() + "\t consumer msg -> " + msg + ",\t" + i); 
  36.                     } 
  37.  
  38.                 }catch (Exception e){ 
  39.                     e.printStackTrace(); 
  40.                 }finally { 
  41.                     countDownLatch.countDown(); 
  42.                 } 
  43.             } 
  44.         }; 
  45.  
  46.         for (int i = 0; i < 2; i++){ 
  47.             new Thread(producer).start(); 
  48.             new Thread(consumer).start(); 
  49.         } 
  50.  
  51.         countDownLatch.await(); 
  52.     } 

 輸出如下,上面生產者和消費者線程數是一樣的,循環次數也是一樣的,但是還是出現等待超時的情形:

  1. Thread-3  consumer msg -> null, 0 
  2. Thread-1  consumer msg -> null, 0 
  3. Thread-1  consumer msg -> null, 1 
  4. Thread-2  producer msg -> 640010 ,  i -> 0 
  5. Thread-2  producer msg -> 733133 ,  i -> 1 
  6. Thread-3  consumer msg -> null, 1 
  7. Thread-3  consumer msg -> 476520, 2 
  8. Thread-1  consumer msg -> 640010, 2 
  9. Thread-1  consumer msg -> null, 3 
  10. Thread-0  producer msg -> 993414 ,  i -> 0 
  11. Thread-0  producer msg -> 292745 ,  i -> 1 
  12. Thread-2  producer msg -> 476520 ,  i -> 2 
  13. Thread-2  producer msg -> 408446 ,  i -> 3 
  14. Thread-3  consumer msg -> null, 3 
  15. Thread-1  consumer msg -> 292745, 4 
  16. Thread-2  producer msg -> 251971 ,  i -> 4 
  17. Thread-0  producer msg -> 078939 ,  i -> 2 
  18. Thread-3  consumer msg -> 251971, 4 
  19. java.util.concurrent.TimeoutException 
  20.  at java.util.concurrent.Exchanger.exchange(Exchanger.java:626) 
  21.  at com.nuih.juc.ExchangerDemo$1.run(ExchangerDemo.java:70) 
  22.  at java.lang.Thread.run(Thread.java:748) 
  23.  
  24. Process finished with exit code 0 

 這種等待超時是概率出現的,這是為啥?

因為系統調度的不均衡和Exchanger底層的大量自旋等待導致這4個線程并不是調用exchanger成功的次數并不一致。另外從輸出可以看出,消費者線程并沒有像我們想的那樣跟生產者線程一一匹配,生產者線程有時也充當來消費者線程,這是為啥?因為Exchanger匹配時完全不關注這個線程的角色,兩個線程之間的匹配完全由調度決定的,即CPU同時執行來或者緊挨著執行來兩個線程,這兩個線程就匹配成功來。

源碼分析

Exchanger 類圖 

 

其內部主要變量和方法如下:


成員屬性

  1. // ThreadLocal變量,每個線程都有之間的一個副本 
  2. private final Participant participant; 
  3. // 高并發下使用的,保存待匹配的Node實例 
  4. private volatile Node[] arena; 
  5. // 低并發下,arena未初始化時使用的保存待匹配的Node實例 
  6. private volatile Node slot; 
  7. // 初始值為0,當創建arena后被負責SEQ,用來記錄arena數組的可用最大索引, 
  8. // 會隨著并發的增大而增大直到等于最大值FULL, 
  9. // 會隨著并行的線程逐一匹配成功而減少恢復成初始值 
  10. private volatile int bound; 

 還有多個表示字段偏移量的靜態屬性,通過static代碼塊初始化,如下:

  1. // Unsafe mechanics 
  2. private static final sun.misc.Unsafe U; 
  3. private static final long BOUND; 
  4. private static final long SLOT; 
  5. private static final long MATCH; 
  6. private static final long BLOCKER; 
  7. private static final int ABASE; 
  8. static { 
  9.     int s; 
  10.     try { 
  11.         U = sun.misc.Unsafe.getUnsafe(); 
  12.         Class<?> ek = Exchanger.class; 
  13.         Class<?> nk = Node.class; 
  14.         Class<?> ak = Node[].class; 
  15.         Class<?> tk = Thread.class; 
  16.         BOUND = U.objectFieldOffset 
  17.             (ek.getDeclaredField("bound")); 
  18.         SLOT = U.objectFieldOffset 
  19.             (ek.getDeclaredField("slot")); 
  20.         MATCH = U.objectFieldOffset 
  21.             (nk.getDeclaredField("match")); 
  22.         BLOCKER = U.objectFieldOffset 
  23.             (tk.getDeclaredField("parkBlocker")); 
  24.         s = U.arrayIndexScale(ak); 
  25.         // ABASE absorbs padding in front of element 0 
  26.         ABASE = U.arrayBaseOffset(ak) + (1 << ASHIFT); 
  27.  
  28.     } catch (Exception e) { 
  29.         throw new Error(e); 
  30.     } 
  31.     if ((s & (s-1)) != 0 || s > (1 << ASHIFT)) 
  32.         throw new Error("Unsupported array scale"); 

 Exchanger 定義來多個靜態變量,如下: 

  1. // 初始化arena時使用, 1 << ASHIFT 是一個緩存行的大小,避免來不同的Node落入到同一個高速緩存行 
  2. // 這里實際是把數組容量擴大來8倍,原來索引相鄰的兩個元素,擴容后中間隔來7個元素,從元素的起始地址上看就隔來8個元素,中間的7個都是空的,為來避免原來相鄰的兩個元素都落入到同一個緩存行中 
  3. // 因為arena是對象數組,一個元素占8字節,8個就是64字節 
  4. private static final int ASHIFT = 7; 
  5. // arena 數組元素的索引最大值即255 
  6. private static final int MMASK = 0xff; 
  7. // arena 數組的最大長度即256 
  8. private static final int SEQ = MMASK + 1; 
  9. // 獲取CPU核數 
  10. private static final int NCPU = Runtime.getRuntime().availableProcessors(); 
  11. // 實際的數組長度,因為是線程兩兩配對的,所以最大長度是核數除以2 
  12. static final int FULL = (NCPU >= (MMASK << 1)) ? MMASK : NCPU >>> 1; 
  13. // 自旋等待的次數 
  14. private static final int SPINS = 1 << 10; 
  15. // 如果交換的對象是null,則返回此對象 
  16. private static final Object NULL_ITEM = new Object(); 
  17. // 如果等待超時導致交換失敗,則返回此對象 
  18. private static final Object TIMED_OUT = new Object(); 

 內部類

Exchanger類中有兩個內部類,一個Node,一個Participant。 Participant繼承了ThreadLocal并且重寫了其initialValue方法,返回一個Node對象。其定義如下:

  1. @sun.misc.Contended static final class Node { 
  2.     int index;              // Arena index 
  3.     int bound;              // Last recorded value of Exchanger.bound 
  4.     int collides;           // Number of CAS failures at current bound 
  5.     int hash;               // Pseudo-random for spins 
  6.     Object item;            // This thread's current item 
  7.     volatile Object match;  // Item provided by releasing thread 
  8.     volatile Thread parked; // Set to this thread when parked, else null 
  9.  
  10. /** The corresponding thread local class */ 
  11. static final class Participant extends ThreadLocal<Node> { 
  12.     public Node initialValue() { return new Node(); } 

 其中Contended注解是為了避免高速緩存行導致的偽共享問題

  • index用來記錄arena數組的索引
  • bound用于記錄上一次的Exchanger bound屬性
  • collides用于記錄在bound不變的情況下CAS搶占失敗的次數
  • hash是自旋等待時計算隨機數使用的
  • item表示當前線程請求交換的對象
  • match是同其它線程交換的結果,match不為null表示交換成功
  • parked為跟該Node關聯的處于休眠狀態的線程。

重要方法

exchange()方法

  1. @SuppressWarnings("unchecked"
  2. public V exchange(V x) throws InterruptedException { 
  3.     Object v; 
  4.     Object item = (x == null) ? NULL_ITEM : x; // translate null args 
  5.     if ((arena != null || // 是null就執行后面的方法 
  6.          (v = slotExchange(item, false, 0L)) == null) && 
  7.         // 如果執行slotExchange有結果就執行后面的,否則返回 
  8.         ((Thread.interrupted() || // 非中斷則執行后面的方法 
  9.           (v = arenaExchange(item, false, 0L)) == null))) 
  10.         throw new InterruptedException(); 
  11.     return (v == NULL_ITEM) ? null : (V)v; 

 exchange 方法的執行步驟:

  1. 如果執行 soltExchange 有結果就執行后面的 arenaExchange;
  2. 如果 slot 被占用,就執行 arenaExchange;
  3. 返回的數據 v 是對方線程的數據項;
  4. 總結即:如果A線程先調用,那么A的數據項存儲的 item中,則B線程的數據項存儲在 math 中;
  5. 當沒有多線程并發操作 Exchange 的時候,使用 slotExchange 就足夠了,slot 是一個 node 對象;
  6. 當出現并發了,一個 slot 就不夠了,就需要使用一個 node 數組 arena 操作了。

slotExchange()方法

slotExchange 是基于slot屬性來完成交換的,調用soltExchange方法時,如果slot屬性為null,當前線程會將slot屬性由null修改成當前線程的Node,如果修改失敗則下一次for循環走solt屬性不為null的邏輯,如果修改成功則自旋等待,自旋一定次數后通過Unsafe的park方法當當前線程休眠,可以指定休眠的時間,如果沒有指定則無限期休眠直到被喚醒;無論是因為線程中斷被喚醒,等待超時被喚醒還是其它線程unpark喚醒的,都會檢查當前線程的Node的屬性釋放為null,如果不為null說明交互成功,返回該對象;否則返回null或者TIME_OUT,在返回前會將item,match等屬性置為null,保存之前自旋時計算的hash值,方便下一次調用slotExchange。

調用slotExchange方法時,如果slot屬性不為null,則當前線程會嘗試將其修改null,如果cas修改成功,表示當前線程與slot屬性對應的線程匹配成功,會獲取slot屬性對應Node的item屬性,將當前線程交換的對象保存到slot屬性對應的Node的match屬性,然后喚醒獲取slot屬性對應Node的waiter屬性,即處理休眠狀態的線程,至此交換完成,同樣的在返回前需要將item,match等屬性置為null,保存之前自旋時計算的hash置,方便下一次調用slotExchange;如果cas修改slot屬性失敗,說明有其它線程也在搶占slot,則初始化arena屬性,下一次for循環因為arena屬性不為null,直接返回null,從而通過arenaExchange完成交換。

  1. // arena 為null是會調用此方法,返回null表示交換失敗 
  2. // item是交換的對象,timed表示是否等待指定的時間,為false表示無限期等待,ns為等待時間 
  3. private final Object slotExchange(Object item, boolean timed, long ns) { 
  4.     // 獲取當前線程關聯的participant Node 
  5.     Node p = participant.get(); 
  6.     Thread t = Thread.currentThread(); 
  7.     // 被中斷,返回null 
  8.     if (t.isInterrupted()) // preserve interrupt status so caller can recheck 
  9.         return null
  10.      
  11.     for (Node q;;) { 
  12.         if ((q = slot) != null) { // slot 不為null 
  13.             // 將slot置為null,slot對應的線程與當前線程匹配成功 
  14.             if (U.compareAndSwapObject(this, SLOT, q, null)) { 
  15.                 Object v = q.item; 
  16.                 // 保存item,即完成交互 
  17.                 q.match = item; 
  18.                 // 喚醒q對應的處于休眠狀態的線程 
  19.                 Thread w = q.parked; 
  20.                 if (w != null
  21.                     U.unpark(w); 
  22.                 return v; 
  23.             } 
  24.             // slot修改失敗,其它某個線程搶占來該slot,多個線程同時調用exchange方法會觸發此邏輯 
  25.             // bound等于0表示未初始化,此處校驗避免重復初始化 
  26.             if (NCPU > 1 && bound == 0 && 
  27.                 U.compareAndSwapInt(this, BOUND, 0, SEQ)) 
  28.                 arena = new Node[(FULL + 2) << ASHIFT]; 
  29.         } 
  30.         else if (arena != null
  31.             return null; // carena不為null,通過arenaExchange交互 
  32.         else { 
  33.             // slot和arena都為null 
  34.             p.item = item; 
  35.             // 修改slot為p,修改成功則終止循環 
  36.             if (U.compareAndSwapObject(this, SLOT, null, p)) 
  37.                 break; 
  38.             // 修改失敗則繼續for循環,將otem恢復成null 
  39.             p.item = null
  40.         } 
  41.     } 
  42.  
  43.     // 將slot修改為p后會進入此分支 
  44.     int h = p.hash; // hash初始為0 
  45.     long end = timed ? System.nanoTime() + ns : 0L; 
  46.     int spins = (NCPU > 1) ? SPINS : 1; 
  47.     Object v; 
  48.     // match保存著同其他線程交換的對象,如果不為null,說明交換成功了 
  49.     while ((v = p.match) == null) { 
  50.         // 執行自旋等待 
  51.         if (spins > 0) { 
  52.         h ^= h << 1; h ^= h >>> 3; h ^= h << 10; 
  53.         if (h == 0) 
  54.             h = SPINS | (int)t.getId(); 初始化h 
  55.         // 只有生成的h小于0時才減少spins 
  56.         else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0) 
  57.             Thread.yield(); 
  58.         } 
  59.         // slot被修改了,已經有匹配的線程,重新自旋,讀取屬性,因為是先修改slot再修改屬性的,兩者因為CPU調度的問題可能有時間差 
  60.         else if (slot != p) 
  61.             spins = SPINS; 
  62.         // 線程沒有被中斷且arena為null 
  63.         else if (!t.isInterrupted() && arena == null && 
  64.                  (!timed || (ns = end - System.nanoTime()) > 0L)) { 
  65.             U.putObject(t, BLOCKER, this); 
  66.             p.parked = t; 
  67.             if (slot == p) 
  68.                 U.park(false, ns); 
  69.             // 線程被喚醒,繼續下一次for循環 
  70.             // 如果是因為等待超時而被喚醒,下次for循環進入下沒的else if分支,返回TIMED_OUT 
  71.             p.parked = null
  72.                 U.putObject(t, BLOCKER, null); 
  73.         } 
  74.         // 將slot修改成p 
  75.         else if (U.compareAndSwapObject(this, SLOT, p, null)) { 
  76.             // timed為flase,無限期等待,因為中斷被喚醒返回null 
  77.             // timed為ture,因為超時被喚醒,返回TIMED_OUT,因為中斷被喚醒返回null 
  78.             v = timed && ns <= 0L && !t.isInterrupted() ? TIMED_OUT : null
  79.             break; 
  80.         } 
  81.     } 
  82.     // 修改match為null,item為null,保存h,下一次exchange是h就不是初始值為0了 
  83.     U.putOrderedObject(p, MATCH, null); 
  84.     // 重置 item 
  85.     p.item = null
  86.     // 保留偽隨機數,供下次種子數字 
  87.     p.hash = h; 
  88.     // 返回 
  89.     return v; 

 總結一下上面執行的邏輯:

  • Exchange 使用了對象池的技術,將對象保存在 ThreadLocal 中,這個對象(Node)封裝了數據項,線程對象等關鍵數據;
  • 第一個線程進入的時候,會將數據放到池化對象中,并賦值給 slot 的 item,并阻塞自己(通常不會立即阻塞,而是使用 yield 自旋一會兒),等待對方取值;
  • 當第二個線程進入的時候,會拿出存儲在 slot item 中的值,然后對 slot 的 match 賦值,并喚醒上次阻塞的線程;
  • 當第一個線程阻塞被喚醒后,說明對方取到值了,就獲取 slot 的 match 值,并重置 slot 的數據和池化對象的數據,并返回自己的數據;
  • 如果超時了,就返回 Time_out 對象;
  • 如果線程中斷了,就返回 null。

在該方法中,會返回 2 種結果,一是有效的 item,二是 null 要么是線程競爭使用 slot 了,創建了 arena 數組,要么是線程中斷了。

通過一副圖來看看具體邏輯 


arenaExchange() 方法

arenaExchange是基于arena屬性完成交換的,整體邏輯比較復雜,有以下幾個要點:

  • m的初始值就是0,index的初始值也是0,兩個都是大于等于0且i不大于m,當某個線程多次嘗試搶占index對應數組元素的Node都失敗的情形下則嘗試將m加1,然后搶占m加1對應的新數組元素,將其由null修改成當前線程關聯的Node,然后自旋等待匹配;如果自旋結束,沒有匹配的線程,則將m加1對應的新數組元素重新置為null,將m減1,然后再次for循環搶占其他為null的數組元素。極端并發下m會一直增加直到達到最大值FULL為止,達到FULL后只能通過for循環不斷嘗試與其他線程匹配或者搶占為null的數組元素,然后隨著并發減少,m會一直減少到0。通過這種動態調整m的方式可以避免過多的線程基于CAS修改同一個元素導致CAS失敗,提高匹配的效率,這種思想跟LongAdder的實現是一致的。
  • 只有當m等于0的時候才會通過Unsafe park方法讓線程休眠,如果不等于0,即此時存在多個并行的等待匹配的線程,則主要通過自旋的方式等待其他線程到來,這是因為交換動作本身是很快的很短暫的,通過自旋等待就可以讓多個等待的線程快速的完成匹配;只有當前只剩下一個線程的時候,此時m肯定等于0,短期內沒有匹配的線程,才會考慮通過park方法阻塞。
  1. // 搶占slot失敗后進入此方法,arena不為空     
  2. private final Object arenaExchange(Object item, boolean timed, long ns) { 
  3.     Node[] a = arena; 
  4.     Node p = participant.get(); 
  5.     // index初始為0 
  6.     for (int i = p.index;;) {                      // access slot at i 
  7.         int b, m, c; long j;                       // j is raw array offset 
  8.         // 在創建arena時,將本來的數組容量 << ASHIFT,為了避免數組元素落到了同一個高速緩存行 
  9.         // 這里獲取真實的數組元素索引時也需要 << ASHIFR 
  10.         Node q = (Node)U.getObjectVolatile(a, j = (i << ASHIFT) + ABASE); 
  11.         // 如果q不為null,則將對應的數組元素置為null,表示當前線程和該元素對應的線程匹配l 
  12.         if (q != null && U.compareAndSwapObject(a, j, q, null)) { 
  13.             Object v = q.item;                     // release 
  14.             q.match = item; // 保存item,交互成功 
  15.             Thread w = q.parked; 
  16.             if (w != null) // 喚醒等待的線程 
  17.                 U.unpark(w); 
  18.             return v; 
  19.         } 
  20.         // q為null 或者q不為null,cas搶占q失敗了 
  21.         // bound初始化時時SEQ,SEQ & MMASK 就是0,即m的初始值就0,m為0時,i肯定為0 
  22.         else if (i <= (m = (b = bound) & MMASK) && q == null) { 
  23.             p.item = item;                         // offer 
  24.             if (U.compareAndSwapObject(a, j, null, p)) { 
  25.                 long end = (timed && m == 0) ? System.nanoTime() + ns : 0L; 
  26.                 Thread t = Thread.currentThread(); // wait 
  27.                 for (int h = p.hash, spins = SPINS;;) { 
  28.                     Object v = p.match; 
  29.                     if (v != null) { 
  30.                         U.putOrderedObject(p, MATCH, null); 
  31.                         p.item = null;             // clear for next use 
  32.                         p.hash = h; 
  33.                         return v; 
  34.                     } 
  35.                     else if (spins > 0) { 
  36.                         h ^= h << 1; h ^= h >>> 3; h ^= h << 10; // xorshift 
  37.                         if (h == 0)                // initialize hash 
  38.                             h = SPINS | (int)t.getId(); 
  39.                         else if (h < 0 &&          // approx 50% true 
  40.                                  (--spins & ((SPINS >>> 1) - 1)) == 0) 
  41.                             Thread.yield();        // two yields per wait 
  42.                     } 
  43.                     else if (U.getObjectVolatile(a, j) != p) 
  44.                         spins = SPINS;       // releaser hasn't set match yet 
  45.                     else if (!t.isInterrupted() && m == 0 && 
  46.                              (!timed || 
  47.                               (ns = end - System.nanoTime()) > 0L)) { 
  48.                         U.putObject(t, BLOCKER, this); // emulate LockSupport 
  49.                         p.parked = t;              // minimize window 
  50.                         if (U.getObjectVolatile(a, j) == p) 
  51.                             U.park(false, ns); 
  52.                         p.parked = null
  53.                         U.putObject(t, BLOCKER, null); 
  54.                     } 
  55.                     else if (U.getObjectVolatile(a, j) == p && 
  56.                              U.compareAndSwapObject(a, j, p, null)) { 
  57.                         if (m != 0)                // try to shrink 
  58.                             U.compareAndSwapInt(this, BOUND, b, b + SEQ - 1); 
  59.                         p.item = null
  60.                         p.hash = h; 
  61.                         i = p.index >>>= 1;        // descend 
  62.                         if (Thread.interrupted()) 
  63.                             return null
  64.                         if (timed && m == 0 && ns <= 0L) 
  65.                             return TIMED_OUT; 
  66.                         break;                     // expired; restart 
  67.                     } 
  68.                 } 
  69.             } 
  70.             else 
  71.                 p.item = null;                     // clear offer 
  72.         } 
  73.         else { 
  74.             if (p.bound != b) {                    // stale; reset 
  75.                 p.bound = b; 
  76.                 p.collides = 0; 
  77.                 i = (i != m || m == 0) ? m : m - 1; 
  78.             } 
  79.             else if ((c = p.collides) < m || m == FULL || 
  80.                      !U.compareAndSwapInt(this, BOUND, b, b + SEQ + 1)) { 
  81.                 p.collides = c + 1; 
  82.                 i = (i == 0) ? m : i - 1;          // cyclically traverse 
  83.             } 
  84.             else 
  85.                 i = m + 1;                         // grow 
  86.             p.index = i; 
  87.         } 
  88.     } 

 總結

Exchange 和 SynchronousQueue 類似,都是通過兩個線程操作同一個對象實現數據交換,只不過就像我們開始說的,SynchronousQueue 使用的是同一個屬性,通過不同的 isData 來區分,多線程并發時,使用了隊列進行排隊。

Exchange 使用了一個對象里的兩個屬性,item 和 match,就不需要 isData 屬性了,因為在 Exchange 里面,沒有 isData 這個語義。而多線程并發時,使用數組來控制,每個線程訪問數組中不同的槽。

PS:以上代碼提交在 Github :

https://github.com/Niuh-Study/niuh-juc-final.git

PS:這里有一個技術交流群(扣扣群:1158819530),方便大家一起交流,持續學習,共同進步,有需要的可以加一下。

 

責任編輯:姜華 來源: 今日頭條
相關推薦

2020-12-03 11:15:21

CyclicBarri

2020-12-04 19:28:53

CountDownLaPhaserCyclicBarri

2020-11-30 16:01:03

Semaphore

2020-12-16 10:54:52

編程ForkJoin框架

2020-12-08 08:53:53

編程ThreadPoolE線程池

2017-09-19 14:53:37

Java并發編程并發代碼設計

2020-12-10 07:00:38

編程線程池定時任務

2020-11-13 08:42:24

Synchronize

2020-12-11 07:32:45

編程ThreadLocalJava

2012-03-09 10:44:11

Java

2025-04-25 08:00:00

volatileJava編程

2024-11-27 09:26:29

2019-11-07 09:20:29

Java線程操作系統

2021-03-10 15:59:39

JavaSynchronize并發編程

2020-12-07 09:40:19

Future&Futu編程Java

2017-01-10 13:39:57

Python線程池進程池

2014-05-08 10:39:55

Python并發編程

2016-10-21 11:04:07

JavaScript異步編程原理解析

2023-07-06 08:06:47

LockCondition公平鎖

2020-07-06 08:03:32

Java悲觀鎖樂觀鎖
點贊
收藏

51CTO技術棧公眾號

成年人黄色在线观看| 亚洲高清久久网| 亚洲 欧洲 日韩| 亚洲第一成年人网站| 亚洲欧美日本日韩| 日韩有码在线播放| 国产精品无码在线| 久久av影院| 午夜在线成人av| 亚洲欧美国产精品桃花| 亚洲欧美国产高清va在线播放| 久久国产88| 欧美成年人在线观看| 国产精品探花一区二区在线观看| 国产乱子精品一区二区在线观看| 亚洲一区在线观看视频| 色狠狠久久av五月综合|| 国内老熟妇对白hdxxxx| 日韩电影免费一区| 91av在线免费观看视频| 国产suv精品一区二区68| 妖精视频一区二区三区免费观看| 欧美一区二区在线看| 成年人小视频网站| 阿v视频在线观看| 亚洲免费资源在线播放| 日韩.欧美.亚洲| 亚洲av成人无码久久精品老人| 国产在线不卡视频| 国产精品亚洲美女av网站| 依依成人综合网| 91久久综合| 久久久久亚洲精品| 久久99久久99精品免费看小说| 自拍亚洲一区| 日韩激情第一页| 无码任你躁久久久久久老妇| 国产视频一区二| 欧美日韩成人在线| 欧美日韩亚洲自拍| 亚洲a∨精品一区二区三区导航| 亚洲电影第三页| 成人在线视频一区二区三区| 免费黄网站在线播放| 欧美一区二区在线不卡| 91猫先生在线| 丁香花在线电影小说观看| 亚洲欧美一区二区不卡| 亚洲精品免费在线看| 青青操在线视频| 91免费视频观看| 精品欧美日韩在线| 欧美视频一二区| 丁香婷婷综合激情五月色| 91九色在线观看| 99热这里只有精品66| 国产一区二区三区在线观看精品 | 极品尤物一区二区| 精品一区二区三| 中文字幕日韩高清| 人妻互换一区二区激情偷拍| 精品国产一级毛片| 综合欧美国产视频二区| 男人晚上看的视频| 欧美成人亚洲| 97人人做人人爱| 6080午夜伦理| 日韩激情在线观看| 国产日韩欧美在线播放| 国产又大又粗又硬| 国产成人av一区| 激情久久av| 黄色小视频在线免费观看| 亚洲国产精品ⅴa在线观看| 亚洲一区二区三区午夜| av在线官网| 亚洲成人av福利| 亚洲乱码中文字幕久久孕妇黑人| 欧美色999| 91精品黄色片免费大全| 91亚洲一线产区二线产区 | 国产最新精品精品你懂的| 91夜夜揉人人捏人人添红杏| 懂色av成人一区二区三区| 成人sese在线| 欧美在线视频一区二区三区| 婷婷五月在线视频| 亚洲影院免费观看| 亚洲精品中文字幕无码蜜桃| 视频欧美精品| 亚洲国产精品久久精品怡红院| 亚洲天堂久久新| 婷婷精品进入| 欧美影院久久久| 国产精品无码白浆高潮| 99精品桃花视频在线观看| 天堂精品一区二区三区| 色操视频在线| 欧美天天综合网| av不卡中文字幕| 欧美日韩一二| 性日韩欧美在线视频| 一区二区日韩在线观看| 99久久综合99久久综合网站| 亚洲国产欧美不卡在线观看| 国产福利片在线观看| 欧美日韩成人综合天天影院| 性色av蜜臀av色欲av| 重囗味另类老妇506070| 日产精品99久久久久久| 午夜精品久久久久久久99老熟妇| 国产日韩欧美精品综合| 男女超爽视频免费播放| 91麻豆精品国产综合久久久 | 国产亚洲精久久久久久| 可以在线看黄的网站| 网友自拍亚洲| 亚洲激情视频在线观看| 欧美手机在线观看| 三级欧美在线一区| 极品尤物一区二区三区| 国产黄a三级三级三级av在线看 | 欧美成人精品xxx| 999视频在线| www.日本不卡| bt天堂新版中文在线地址| 激情中国色综合| 在线播放日韩专区| 香蕉影院在线观看| 99久久综合国产精品| av一区二区三区免费观看| 国产精品久久久久久久久久辛辛 | kk眼镜猥琐国模调教系列一区二区| 亚洲一区二区三区精品动漫| 亚洲第一会所| 亚洲欧美日韩综合| 国产污污视频在线观看 | 日韩精品成人一区二区在线| 久久久水蜜桃| 亚洲女同志freevdieo| 亚洲黄在线观看| 久久精品久久国产| 岛国一区二区在线观看| 久久久久久久9| 日韩精品一级| 欧美大秀在线观看| a视频免费在线观看| 136国产福利精品导航| www.久久91| 四季av在线一区二区三区| 国产精品最新在线观看| 日本天堂在线观看| 666欧美在线视频| 日韩精品一区二区亚洲av性色 | 国产亚洲一二三区| 国产chinese精品一区二区| 日本成a人片在线观看| 欧美丝袜自拍制服另类| 黄色国产在线播放| 麻豆成人久久精品二区三区红| 亚欧洲精品在线视频免费观看| 精品123区| 中文字幕亚洲无线码a| 中文字幕码精品视频网站| 国产精品色呦呦| 午夜av中文字幕| 欧美亚韩一区| 玛丽玛丽电影原版免费观看1977| 一区二区三区电影大全| 国产亚洲精品美女久久久| 国产精品无码一区| 亚洲人成网站在线| 国产乱淫av麻豆国产免费| 亚洲三级国产| 日本午夜精品一区二区三区| 国产欧美在线观看免费| 欧美超级免费视 在线| 黄色小视频免费观看| 欧美性精品220| 天天看天天摸天天操| 成人午夜视频免费看| 无码精品国产一区二区三区免费| 欧美在线电影| 粉嫩av一区二区三区免费观看| 涩涩网在线视频| 久久久精品国产| 天天爱天天干天天操| 欧美色区777第一页| 黄色一级视频免费| 久久嫩草精品久久久精品| www.久久91| 亚洲经典在线| 91制片厂免费观看| 天天久久夜夜| 91精品综合视频| 中文字幕在线视频久| 久久精品国产一区二区电影| 日韩在线视频第一页| 欧美日韩综合不卡| 黄色一级片免费看| 最近日韩中文字幕| 久久精品一区二区免费播放| 国产中文字幕精品| 日本女优爱爱视频| 亚洲毛片网站| 91九色国产ts另类人妖| 国产精品亚洲人成在99www| 成人综合电影| 欧美亚洲黄色| 国产999视频| 爱情岛论坛亚洲品质自拍视频网站| 中文字幕精品www乱入免费视频| 国内爆初菊对白视频| 欧美电影影音先锋| 懂色av中文字幕| 欧美日韩免费看| 久青草免费视频| 成人免费在线视频观看| 国产在线观看h| av日韩在线网站| 亚洲一区二区图片| 久久精品国产99国产精品| 国产黄色特级片| 亚洲国产三级| 免费在线看黄色片| 欧美激情亚洲| 在线观看污视频| 一区二区三区四区在线观看国产日韩 | 久热精品在线观看| 亚洲人成伊人成综合网小说| 国产91在线播放九色| 中文av一区特黄| 国产一二三四区在线| 国产欧美一区二区精品性| 中文字幕一区二区人妻在线不卡| 成人激情小说网站| 99久久久无码国产精品性波多| 国产精品亚洲专一区二区三区 | 成人自拍在线| 国产成人成网站在线播放青青| 日韩精品一级| 国产精品.com| 红杏aⅴ成人免费视频| 成人动漫视频在线观看完整版 | 在线看片福利| 欧亚精品在线观看| 北岛玲heyzo一区二区| 日韩av电影手机在线观看| 成人免费看黄| 国产精品福利无圣光在线一区| 三上悠亚激情av一区二区三区| 欧美中文字幕视频在线观看| 成人免费直播| 国产精品一区二区三区久久| 香蕉久久一区| 91精品国产一区二区三区动漫| 香蕉免费一区二区三区在线观看| 成人三级在线| 日韩系列在线| 日本一区二区精品| 日韩国产在线| 好吊色视频988gao在线观看| 国内精品久久久久久久影视蜜臀 | 亚洲欧洲免费无码| 91成人精品视频| 久久精品xxx| 另类激情亚洲| 99re6在线观看| 福利视频网站一区二区三区| 黄色av网址在线观看| 久久免费国产精品| 男人的午夜天堂| 亚洲成人自拍偷拍| 波多野结衣午夜| 91麻豆精品国产自产在线| 亚洲av永久无码国产精品久久 | 精品一区二区6| 一区二区免费视频| 亚洲天堂一区在线| 欧美日韩国产首页| 人妻偷人精品一区二区三区| 国产亚洲综合久久| 丝袜中文在线| 国产精品6699| 香蕉大人久久国产成人av| 蜜桃成人免费视频| 亚洲国产精品久久久天堂| 青青青青草视频| 久久99精品久久久| 欧美深性狂猛ⅹxxx深喉| 国产免费成人在线视频| 久久久香蕉视频| 在线观看91精品国产入口| 精品区在线观看| 在线a欧美视频| 草美女在线观看| 国产欧美久久一区二区| 极品束缚调教一区二区网站| 伊人色综合久久天天五月婷| 亚洲深夜av| 亚洲丝袜在线观看| 亚洲国产精品传媒在线观看| 国产午夜精品无码| 欧美嫩在线观看| 你懂的在线免费观看| 欧美高清激情视频| 欧美xxxx性| 蜜桃久久影院| 亚洲视频观看| 不用播放器的免费av| 久久麻豆一区二区| 国产无套粉嫩白浆内谢| 欧美高清视频在线高清观看mv色露露十八 | 亚洲精品电影网| 亚洲综合伊人久久大杳蕉| 国产精品露脸自拍| 丝袜久久网站| 日韩国产一级片| 国产成人综合在线| 青花影视在线观看免费高清| 在线一区二区三区四区五区| 手机看片1024国产| 欧美精品久久一区二区| 国产专区精品| 小说区视频区图片区| 免费在线观看日韩欧美| 色噜噜日韩精品欧美一区二区| 亚洲国产成人av好男人在线观看| 国产女人高潮时对白| 精品国产欧美一区二区三区成人| 国产在线精彩视频| 国产欧美日韩综合一区在线观看| 午夜精品久久99蜜桃的功能介绍| 日本不卡一区在线| 国产精品女同互慰在线看| 久久久999久久久| 亚洲人永久免费| 成人av观看| 欧美精品一区二区三区四区五区 | 日本人视频jizz页码69| 国产色一区二区| 婷婷激情五月综合| 亚洲人成人99网站| 另类图片综合电影| 日韩欧美视频第二区| 日韩av一区二区三区四区| 人妻无码一区二区三区| 欧美视频一区二区三区…| 欧洲天堂在线观看| 国产精国产精品| 日韩欧美视频| 在线免费观看av网| 一区二区三区中文字幕在线观看| 亚洲国产日韩在线观看| 国内精品模特av私拍在线观看| 精品视频高潮| 波多野结衣家庭教师视频| 国产拍揄自揄精品视频麻豆| 中文字幕欧美人妻精品一区蜜臀 | 日韩高清国产精品| 蜜桃视频在线观看一区| 日本精品人妻无码77777| 欧美成人精品高清在线播放| аⅴ资源天堂资源库在线| 欧洲精品亚洲精品| 激情亚洲综合在线| 久久婷婷一区二区| 亚洲精品视频久久| 久久精品黄色| 日本天堂免费a| 久久亚区不卡日本| 亚洲一区二区三区高清视频| 久久综合色影院| 奇米影视777在线欧美电影观看| 久久视频这里有精品| 日本一区二区动态图| a毛片在线免费观看| 欧美亚洲另类在线| 99精品在线观看| 国产二级一片内射视频播放| 在线观看一区不卡| 亚洲第一图区| 欧美韩国日本精品一区二区三区| 麻豆极品一区二区三区| 精品肉丝脚一区二区三区| 国产亚洲精品久久久久动| 亚洲不卡视频| 国产精品涩涩涩视频网站| 亚洲男人都懂的| 韩日视频在线| 成人资源视频网站免费| 日本强好片久久久久久aaa| 妺妺窝人体色www聚色窝仙踪 | 欧美精品aⅴ在线视频| 国产v日韩v欧美v| 亚洲日本japanese丝袜| www.成人在线| 国产a级免费视频| 国产精品久久久久久久久久免费 | 国产精品高潮久久久久无|