精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

聊一聊時間輪的實現

開發 后端
在netty 和kafka 這兩種優秀的中間件中,都有時間輪的實現。文章最后,我們模擬kafka 中scala 的代碼實現java版的時間輪。

[[414553]]

上一篇我們講了定時器的幾種實現,分析了在大數據量高并發的場景下這幾種實現方式就有點力不從心了,從而引出時間輪這種數據結構。在netty 和kafka 這兩種優秀的中間件中,都有時間輪的實現。文章最后,我們模擬kafka 中scala 的代碼實現java版的時間輪。

Netty 的時間輪實現

接口定義

Netty 的實現自定義了一個超時器的接口io.netty.util.Timer,其方法如下:

  1. public interface Timer 
  2.     //新增一個延時任務,入參為定時任務TimerTask,和對應的延遲時間 
  3.     Timeout newTimeout(TimerTask task, long delay, TimeUnit unit); 
  4.     //停止時間輪的運行,并且返回所有未被觸發的延時任務 
  5.     Set < Timeout > stop(); 
  6. public interface Timeout 
  7.     Timer timer(); 
  8.     TimerTask task(); 
  9.     boolean isExpired(); 
  10.     boolean isCancelled(); 
  11.     boolean cancel(); 

Timeout接口是對延遲任務的一個封裝,其接口方法說明其實現內部需要維持該延遲任務的狀態。后續我們分析其實現內部代碼時可以更容易的看到。

Timer接口有唯一實現HashedWheelTimer。首先來看其構造方法,如下:

  1. public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection, long maxPendingTimeouts) 
  2.     //省略代碼,省略參數非空檢查內容。 
  3.     wheel = createWheel(ticksPerWheel); 
  4.     mask = wheel.length - 1; 
  5.     //省略代碼,省略槽位時間范圍檢查,避免溢出以及小于 1 毫秒。 
  6.     workerThread = threadFactory.newThread(worker); 
  7.     //省略代碼,省略資源泄漏追蹤設置以及時間輪實例個數檢查 

mask 的設計和HashMap一樣,通過限制數組的大小為2的次方,利用位運算來替代取模運算,提高性能。

構建循環數組

首先是方法createWheel,用于創建時間輪的核心數據結構,循環數組。來看下其方法內容

  1. private static HashedWheelBucket[] createWheel(int ticksPerWheel) 
  2.     //省略代碼,確認 ticksPerWheel 處于正確的區間 
  3.     //將 ticksPerWheel 規范化為 2 的次方冪大小。 
  4.     ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel); 
  5.     HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel]; 
  6.     for(int i = 0; i < wheel.length; i++) 
  7.     { 
  8.         wheel[i] = new HashedWheelBucket(); 
  9.     } 
  10.     return wheel; 

數組的長度為 2 的次方冪方便進行求商和取余計算。

HashedWheelBucket內部存儲著由HashedWheelTimeout節點構成的雙向鏈表,并且存儲著鏈表的頭節點和尾結點,方便于任務的提取和插入。

新增延遲任務

方法HashedWheelTimer#newTimeout用于新增延遲任務,下面來看下代碼:

  1. public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) 
  2.     //省略代碼,用于參數檢查 
  3.     start(); 
  4.     long deadline = System.nanoTime() + unit.toNanos(delay) - startTime; 
  5.     if(delay > 0 && deadline < 0) 
  6.     { 
  7.         deadline = Long.MAX_VALUE; 
  8.     } 
  9.     HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline); 
  10.     timeouts.add(timeout); 
  11.     return timeout; 

可以看到任務并沒有直接添加到時間輪中,而是先入了一個 mpsc 隊列,我簡單說下 mpsc【多生產者單一消費者隊列】 是 JCTools 中的并發隊列,用在多個生產者可同時訪問隊列,但只有一個消費者會訪問隊列的情況。,采用這個模式主要出于提升并發性能考慮,因為這個隊列只有線程workerThread會進行任務提取操作。

工作線程如何執行

  1. public void run() 
  2.     {//代碼塊① 
  3.         startTime = System.nanoTime(); 
  4.         if(startTime == 0) 
  5.         { 
  6.             //使用startTime==0 作為線程進入工作狀態模式標識,因此這里重新賦值為1 
  7.             startTime = 1; 
  8.         } 
  9.         //通知外部初始化工作線程的線程,工作線程已經啟動完畢 
  10.         startTimeInitialized.countDown(); 
  11.     } 
  12.     {//代碼塊② 
  13.         do { 
  14.             final long deadline = waitForNextTick(); 
  15.             if(deadline > 0) 
  16.             { 
  17.                 int idx = (int)(tick & mask); 
  18.                 processCancelledTasks(); 
  19.                 HashedWheelBucket bucket = wheel[idx]; 
  20.                 transferTimeoutsToBuckets(); 
  21.                 bucket.expireTimeouts(deadline); 
  22.                 tick++; 
  23.             } 
  24.         } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED); 
  25.     } 
  26.     {//代碼塊③ 
  27.         for(HashedWheelBucket bucket: wheel) 
  28.         { 
  29.             bucket.clearTimeouts(unprocessedTimeouts); 
  30.         } 
  31.         for(;;) 
  32.         { 
  33.             HashedWheelTimeout timeout = timeouts.poll(); 
  34.             if(timeout == null
  35.             { 
  36.                 break; 
  37.             } 
  38.             if(!timeout.isCancelled()) 
  39.             { 
  40.                 unprocessedTimeouts.add(timeout); 
  41.             } 
  42.         } 
  43.         processCancelledTasks(); 
  44.     } 

看 waitForNextTick,是如何得到下一次執行時間的。

  1. private long waitForNextTick() 
  2.     long deadline = tickDuration * (tick + 1);//計算下一次需要檢查的時間 
  3.     for(;;) 
  4.     { 
  5.         final long currentTime = System.nanoTime() - startTime; 
  6.         long sleepTimeMs = (deadline - currentTime + 999999) / 1000000; 
  7.         if(sleepTimeMs <= 0)//說明時間已經到了 
  8.         { 
  9.             if(currentTime == Long.MIN_VALUE) 
  10.             { 
  11.                 return -Long.MAX_VALUE; 
  12.             } 
  13.             else 
  14.             { 
  15.                 return currentTime; 
  16.             } 
  17.         } 
  18.         //windows 下有bug  sleep 必須是10 的倍數 
  19.         if(PlatformDependent.isWindows()) 
  20.         { 
  21.             sleepTimeMs = sleepTimeMs / 10 * 10; 
  22.         } 
  23.         try 
  24.         { 
  25.             Thread.sleep(sleepTimeMs);// 等待時間到來 
  26.         } 
  27.         catch(InterruptedException ignored) 
  28.         { 
  29.             if(WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) 
  30.             { 
  31.                 return Long.MIN_VALUE; 
  32.             } 
  33.         } 
  34.     } 

簡單的說就是通過 tickDuration 和此時已經滴答的次數算出下一次需要檢查的時間,時候未到就sleep等著。

任務如何入槽的。

  1. private void transferTimeoutsToBuckets() { 
  2.             //最多處理100000 怕任務延遲 
  3.             for(int i = 0; i < 100000; ++i) { 
  4.                 //從隊列里面拿出任務呢 
  5.                 HashedWheelTimer.HashedWheelTimeout timeout = (HashedWheelTimer.HashedWheelTimeout)HashedWheelTimer.this.timeouts.poll(); 
  6.                 if (timeout == null) { 
  7.                     break; 
  8.                 } 
  9.  
  10.                 if (timeout.state() != 1) { 
  11.                     long calculated = timeout.deadline / HashedWheelTimer.this.tickDuration; 
  12.                     //計算排在第幾輪 
  13.                     timeout.remainingRounds = (calculated - this.tick) / (long)HashedWheelTimer.this.wheel.length; 
  14.                     long ticks = Math.max(calculated, this.tick); 
  15.                     //計算放在哪個槽中 
  16.                     int stopIndex = (int)(ticks & (long)HashedWheelTimer.this.mask); 
  17.                     HashedWheelTimer.HashedWheelBucket bucket = HashedWheelTimer.this.wheel[stopIndex]; 
  18.                     //入槽,就是鏈表入隊列 
  19.                     bucket.addTimeout(timeout); 
  20.                 } 
  21.             } 
  22.  
  23.         } 

如何執行的

  1. public void expireTimeouts(long deadline) { 
  2.             HashedWheelTimer.HashedWheelTimeout next
  3.             //拿到槽的鏈表頭部 
  4.             for(HashedWheelTimer.HashedWheelTimeout timeout = this.head; timeout != null; timeout = next) { 
  5.                 boolean remove = false
  6.                 if (timeout.remainingRounds <= 0L) {//如果到這輪l  
  7.                     if (timeout.deadline > deadline) { 
  8.                         throw new IllegalStateException(String.format("timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline)); 
  9.                     } 
  10.  
  11.                     timeout.expire();//執行 
  12.                     remove = true
  13.                 } else if (timeout.isCancelled()) { 
  14.                     remove = true
  15.                 } else { 
  16.                     --timeout.remainingRounds;//輪數-1 
  17.                 } 
  18.  
  19.                 next = timeout.next;//繼續下一任務 
  20.                 if (remove) { 
  21.                     this.remove(timeout);//移除完成的任務 
  22.                 } 
  23.             } 
  24.         } 

就是通過輪數和時間雙重判斷,執行完了移除任務。

小結一下

總體上看 Netty 的實現就是上文說的時間輪通過輪數的實現,完全一致。可以看出時間精度由 TickDuration 把控,并且工作線程的除了處理執行到時的任務還做了其他操作,因此任務不一定會被精準的執行。

而且任務的執行如果不是新起一個線程,或者將任務扔到線程池執行,那么耗時的任務會阻塞下個任務的執行。

并且會有很多無用的 tick 推進,例如 TickDuration 為1秒,此時就一個延遲350秒的任務,那就是有349次無用的操作。出現空推。

但是從另一面來看,如果任務都執行很快(當然你也可以異步執行),并且任務數很多,通過分批執行,并且增刪任務的時間復雜度都是O(1)來說。時間輪還是比通過優先隊列實現的延時任務來的合適些。

Kafka 中的時間輪

上面我們說到 Kafka 中的時間輪是多層次時間輪實現,總的而言實現和上述說的思路一致。不過細節有些不同,并且做了點優化。

先看看添加任務的方法。在添加的時候就設置任務執行的絕對時間。

Kafka 中的時間輪

上面我們說到 Kafka 中的時間輪是多層次時間輪實現,總的而言實現和上述說的思路一致。不過細節有些不同,并且做了點優化。

先看看添加任務的方法。在添加的時候就設置任務執行的絕對時間。

  1. def add(timerTaskEntry: TimerTaskEntry): Boolean = { 
  2.     val expiration = timerTaskEntry.expirationMs 
  3.  
  4.     if (timerTaskEntry.cancelled) { 
  5.       // Cancelled 
  6.       false 
  7.     } else if (expiration < currentTime + tickMs) { 
  8.       // 如果已經到期 返回false 
  9.       // Already expired 
  10.       false 
  11.     } else if (expiration < currentTime + interval) {//如果在本層范圍內 
  12.       // Put in its own bucket 
  13.       val virtualId = expiration / tickMs 
  14.       val bucket = buckets((virtualId % wheelSize.toLong).toInt)//計算槽位 
  15.       bucket.add(timerTaskEntry)//添加到槽內雙向鏈表中 
  16.  
  17.       // Set the bucket expiration time 
  18.       if (bucket.setExpiration(virtualId * tickMs)) {//更新槽時間 
  19.         // The bucket needs to be enqueued because it was an expired bucket 
  20.         // We only need to enqueue the bucket when its expiration time has changed, i.e. the wheel has advanced 
  21.         // and the previous buckets gets reused; further calls to set the expiration within the same wheel cycle 
  22.         // will pass in the same value and hence return false, thus the bucket with the same expiration will not 
  23.         // be enqueued multiple times. 
  24.         queue.offer(bucket)//將槽加入DelayQueue,由DelayQueue來推進執行 
  25.       } 
  26.       true 
  27.     } else { 
  28.       //如果超過本層能表示的延遲時間,則將任務添加到上層。這里看到上層是按需創建的。 
  29.       // Out of the interval. Put it into the parent timer 
  30.       if (overflowWheel == null) addOverflowWheel() 
  31.       overflowWheel.add(timerTaskEntry) 
  32.     } 
  33.   } 

那么時間輪是如何推動的呢?Netty 中是通過固定的時間間隔掃描,時候未到就等待來進行時間輪的推動。上面我們分析到這樣會有空推進的情況。

而 Kafka 就利用了空間換時間的思想,通過 DelayQueue,來保存每個槽,通過每個槽的過期時間排序。這樣擁有最早需要執行任務的槽會有優先獲取。如果時候未到,那么 delayQueue.poll 就會阻塞著,這樣就不會有空推進的情況發送。

我們來看下推進的方法。

  1. def advanceClock(timeoutMs: Long): Boolean = { 
  2. //從延遲隊列中獲取槽 
  3.     var bucket = delayQueue.poll(timeoutMs, TimeUnit.MILLISECONDS) 
  4.     if (bucket != null) { 
  5.       writeLock.lock() 
  6.       try { 
  7.         while (bucket != null) { 
  8.           // 更新每層時間輪的currentTime 
  9.           timingWheel.advanceClock(bucket.getExpiration()) 
  10.           //因為更新了currentTime,進行一波任務的重新插入,來實現任務時間輪的降級 
  11.           bucket.flush(reinsert) 
  12.           //獲取下一個槽 
  13.           bucket = delayQueue.poll() 
  14.         } 
  15.       } finally { 
  16.         writeLock.unlock() 
  17.       } 
  18.       true 
  19.     } else { 
  20.       false 
  21.     } 
  22.   } 
  23.    
  24.  // Try to advance the clock 
  25.   def advanceClock(timeMs: Long): Unit = { 
  26.     if (timeMs >= currentTime + tickMs) { 
  27.      // 必須是tickMs 整數倍 
  28.       currentTime = timeMs - (timeMs % tickMs) 
  29.       //推動上層時間輪也更新currentTime 
  30.       // Try to advance the clock of the overflow wheel if present 
  31.       if (overflowWheel != null) overflowWheel.advanceClock(currentTime) 
  32.     } 
  33.   } 

從上面的 add 方法我們知道每次對比都是根據expiration < currentTime + interval 來進行對比的,而advanceClock 就是用來推進更新 currentTime 的。

小結一下

Kafka 用了多層次時間輪來實現,并且是按需創建時間輪,采用任務的絕對時間來判斷延期,并且對于每個槽(槽內存放的也是任務的雙向鏈表)都會維護一個過期時間,利用 DelayQueue 來對每個槽的過期時間排序,來進行時間的推進,防止空推進的存在。

每次推進都會更新 currentTime 為當前時間戳,當然做了點微調使得 currentTime 是 tickMs 的整數倍。并且每次推進都會把能降級的任務重新插入降級。

可以看到這里的 DelayQueue 的元素是每個槽,而不是任務,因此數量就少很多了,這應該是權衡了對于槽操作的延時隊列的時間復雜度與空推進的影響。

模擬kafka的時間輪實現java版

定時器

  1. public class Timer { 
  2.  
  3.     /** 
  4.      * 底層時間輪 
  5.      */ 
  6.     private TimeWheel timeWheel; 
  7.  
  8.     /** 
  9.      * 一個Timer只有一個delayQueue 
  10.      */ 
  11.     private DelayQueue<TimerTaskList> delayQueue = new DelayQueue<>(); 
  12.  
  13.     /** 
  14.      * 過期任務執行線程 
  15.      */ 
  16.     private ExecutorService workerThreadPool; 
  17.  
  18.     /** 
  19.      * 輪詢delayQueue獲取過期任務線程 
  20.      */ 
  21.     private ExecutorService bossThreadPool; 
  22.  
  23.     /** 
  24.      * 構造函數 
  25.      */ 
  26.     public Timer() { 
  27.         timeWheel = new TimeWheel(1000, 2, System.currentTimeMillis(), delayQueue); 
  28.         workerThreadPool = Executors.newFixedThreadPool(100); 
  29.         bossThreadPool = Executors.newFixedThreadPool(1); 
  30.         //20ms獲取一次過期任務 
  31.         bossThreadPool.submit(() -> { 
  32.             while (true) { 
  33.                 this.advanceClock(1000); 
  34.             } 
  35.         }); 
  36.     } 
  37.  
  38.     /** 
  39.      * 添加任務 
  40.      */ 
  41.     public void addTask(TimerTask timerTask) { 
  42.         //添加失敗任務直接執行 
  43.         if (!timeWheel.addTask(timerTask)) { 
  44.             workerThreadPool.submit(timerTask.getTask()); 
  45.         } 
  46.     } 
  47.  
  48.     /** 
  49.      * 獲取過期任務 
  50.      */ 
  51.     private void advanceClock(long timeout) { 
  52.         try { 
  53.             TimerTaskList timerTaskList = delayQueue.poll(timeout, TimeUnit.MILLISECONDS); 
  54.             if (timerTaskList != null) { 
  55.  
  56.                 //推進時間 
  57.                 timeWheel.advanceClock(timerTaskList.getExpiration()); 
  58.                 //執行過期任務(包含降級操作) 
  59.                 timerTaskList.flush(this::addTask); 
  60.             } 
  61.         } catch (Exception e) { 
  62.             e.printStackTrace(); 
  63.         } 
  64.     } 

任務

  1. public class TimerTask { 
  2.  
  3.     /** 
  4.      * 延遲時間 
  5.      */ 
  6.     private long delayMs; 
  7.  
  8.     /** 
  9.      * 任務 
  10.      */ 
  11.     private MyThread task; 
  12.  
  13.     /** 
  14.      * 時間槽 
  15.      */ 
  16.     protected TimerTaskList timerTaskList; 
  17.  
  18.     /** 
  19.      * 下一個節點 
  20.      */ 
  21.     protected TimerTask next
  22.  
  23.     /** 
  24.      * 上一個節點 
  25.      */ 
  26.     protected TimerTask pre; 
  27.  
  28.     /** 
  29.      * 描述 
  30.      */ 
  31.     public String desc
  32.  
  33.     public TimerTask(long delayMs, MyThread task) { 
  34.         this.delayMs = System.currentTimeMillis() + delayMs; 
  35.         this.task = task; 
  36.         this.timerTaskList = null
  37.         this.next = null
  38.         this.pre = null
  39.     } 
  40.  
  41.     public MyThread getTask() { 
  42.         return task; 
  43.     } 
  44.  
  45.     public long getDelayMs() { 
  46.         return delayMs; 
  47.     } 
  48.  
  49.     @Override 
  50.     public String toString() { 
  51.         return desc
  52.     } 

時間槽

  1. public class TimerTaskList implements Delayed { 
  2.  
  3.     /** 
  4.      * 過期時間 
  5.      */ 
  6.     private AtomicLong expiration = new AtomicLong(-1L); 
  7.  
  8.     /** 
  9.      * 根節點 
  10.      */ 
  11.     private TimerTask root = new TimerTask(-1L, null); 
  12.  
  13.     { 
  14.         root.pre = root; 
  15.         root.next = root; 
  16.     } 
  17.  
  18.     /** 
  19.      * 設置過期時間 
  20.      */ 
  21.     public boolean setExpiration(long expire) { 
  22.         return expiration.getAndSet(expire) != expire; 
  23.     } 
  24.  
  25.     /** 
  26.      * 獲取過期時間 
  27.      */ 
  28.     public long getExpiration() { 
  29.         return expiration.get(); 
  30.     } 
  31.  
  32.     /** 
  33.      * 新增任務 
  34.      */ 
  35.     public void addTask(TimerTask timerTask) { 
  36.         synchronized (this) { 
  37.             if (timerTask.timerTaskList == null) { 
  38.                 timerTask.timerTaskList = this; 
  39.                 TimerTask tail = root.pre; 
  40.                 timerTask.next = root; 
  41.                 timerTask.pre = tail; 
  42.                 tail.next = timerTask; 
  43.                 root.pre = timerTask; 
  44.             } 
  45.         } 
  46.     } 
  47.  
  48.     /** 
  49.      * 移除任務 
  50.      */ 
  51.     public void removeTask(TimerTask timerTask) { 
  52.         synchronized (this) { 
  53.             if (timerTask.timerTaskList.equals(this)) { 
  54.                 timerTask.next.pre = timerTask.pre; 
  55.                 timerTask.pre.next = timerTask.next
  56.                 timerTask.timerTaskList = null
  57.                 timerTask.next = null
  58.                 timerTask.pre = null
  59.             } 
  60.         } 
  61.     } 
  62.  
  63.     /** 
  64.      * 重新分配 
  65.      */ 
  66.     public synchronized void flush(Consumer<TimerTask> flush) { 
  67.         TimerTask timerTask = root.next
  68.         while (!timerTask.equals(root)) { 
  69.             this.removeTask(timerTask); 
  70.             flush.accept(timerTask); 
  71.             timerTask = root.next
  72.         } 
  73.         expiration.set(-1L); 
  74.     } 
  75.  
  76.     @Override 
  77.     public long getDelay(TimeUnit unit) { 
  78.         return Math.max(0, unit.convert(expiration.get() - System.currentTimeMillis(), TimeUnit.MILLISECONDS)); 
  79.     } 
  80.  
  81.     @Override 
  82.     public int compareTo(Delayed o) { 
  83.         if (o instanceof TimerTaskList) { 
  84.             return Long.compare(expiration.get(), ((TimerTaskList) o).expiration.get()); 
  85.         } 
  86.         return 0; 
  87.     } 

時間輪

  1. public class TimeWheel { 
  2.  
  3.     /** 
  4.      * 一個時間槽的范圍 
  5.      */ 
  6.     private long tickMs; 
  7.  
  8.     /** 
  9.      * 時間輪大小 
  10.      */ 
  11.     private int wheelSize; 
  12.  
  13.     /** 
  14.      * 時間跨度 
  15.      */ 
  16.     private long interval; 
  17.  
  18.     /** 
  19.      * 時間槽 
  20.      */ 
  21.     private TimerTaskList[] timerTaskLists; 
  22.  
  23.     /** 
  24.      * 當前時間 
  25.      */ 
  26.     private long currentTime; 
  27.  
  28.     /** 
  29.      * 上層時間輪 
  30.      */ 
  31.     private volatile TimeWheel overflowWheel; 
  32.  
  33.     /** 
  34.      * 一個Timer只有一個delayQueue 
  35.      */ 
  36.     private DelayQueue<TimerTaskList> delayQueue; 
  37.  
  38.     public TimeWheel(long tickMs, int wheelSize, long currentTime, DelayQueue<TimerTaskList> delayQueue) { 
  39.         this.currentTime = currentTime; 
  40.         this.tickMs = tickMs; 
  41.         this.wheelSize = wheelSize; 
  42.         this.interval = tickMs * wheelSize; 
  43.         this.timerTaskLists = new TimerTaskList[wheelSize]; 
  44.         //currentTime為tickMs的整數倍 這里做取整操作 
  45.         this.currentTime = currentTime - (currentTime % tickMs); 
  46.         this.delayQueue = delayQueue; 
  47.         for (int i = 0; i < wheelSize; i++) { 
  48.             timerTaskLists[i] = new TimerTaskList(); 
  49.         } 
  50.     } 
  51.  
  52.     /** 
  53.      * 創建或者獲取上層時間輪 
  54.      */ 
  55.     private TimeWheel getOverflowWheel() { 
  56.         if (overflowWheel == null) { 
  57.             synchronized (this) { 
  58.                 if (overflowWheel == null) { 
  59.                     overflowWheel = new TimeWheel(interval, wheelSize, currentTime, delayQueue); 
  60.                 } 
  61.             } 
  62.         } 
  63.         return overflowWheel; 
  64.     } 
  65.  
  66.     /** 
  67.      * 添加任務到時間輪 
  68.      */ 
  69.     public boolean addTask(TimerTask timerTask) { 
  70.         long expiration = timerTask.getDelayMs(); 
  71.         //過期任務直接執行 
  72.         if (expiration < currentTime + tickMs) { 
  73.             return false
  74.         } else if (expiration < currentTime + interval) { 
  75.             //當前時間輪可以容納該任務 加入時間槽 
  76.             Long virtualId = expiration / tickMs; 
  77.             int index = (int) (virtualId % wheelSize); 
  78.             System.out.println("tickMs:" + tickMs + "------index:" + index + "------expiration:" + expiration); 
  79.             TimerTaskList timerTaskList = timerTaskLists[index]; 
  80.             timerTaskList.addTask(timerTask); 
  81.             if (timerTaskList.setExpiration(virtualId * tickMs)) { 
  82.                 //添加到delayQueue中 
  83.                 delayQueue.offer(timerTaskList); 
  84.             } 
  85.         } else { 
  86.             //放到上一層的時間輪 
  87.             TimeWheel timeWheel = getOverflowWheel(); 
  88.             timeWheel.addTask(timerTask); 
  89.         } 
  90.         return true
  91.     } 
  92.  
  93.     /** 
  94.      * 推進時間 
  95.      */ 
  96.     public void advanceClock(long timestamp) { 
  97.         if (timestamp >= currentTime + tickMs) { 
  98.             currentTime = timestamp - (timestamp % tickMs); 
  99.             if (overflowWheel != null) { 
  100.                 //推進上層時間輪時間 
  101.                 System.out.println("推進上層時間輪時間 time="+System.currentTimeMillis()); 
  102.                 this.getOverflowWheel().advanceClock(timestamp); 
  103.             } 
  104.         } 
  105.     } 

我們來模擬一個請求,超時和不超時的情況

首先定義一個Mythread 類,用于設置任務超時的值。

  1. public class MyThread implements Runnable{ 
  2.     CompletableFuture<String> cf; 
  3.     public MyThread(CompletableFuture<String>  cf){ 
  4.         this.cf = cf; 
  5.     } 
  6.     public void run(){ 
  7.         if (!cf.isDone()) { 
  8.             cf.complete("超時"); 
  9.         } 
  10.     } 

模擬超時

  1. public static void main(String[] args) throws Exception{ 
  2.         Timer timer = new Timer(); 
  3.         CompletableFuture<String> base =CompletableFuture.supplyAsync(()->{ 
  4.             try { 
  5.                 Thread.sleep(3000); 
  6.             } catch (InterruptedException e) { 
  7.                 e.printStackTrace(); 
  8.             } 
  9.             return  "正常返回"
  10.         }); 
  11.         TimerTask timerTask2 = new TimerTask(1000, new MyThread(base)); 
  12.         timer.addTask(timerTask2); 
  13.         System.out.println("base.get==="+base.get()); 
  14.     } 

模擬正常返回

  1. public static void main(String[] args) throws Exception{ 
  2.         Timer timer = new Timer(); 
  3.         CompletableFuture<String> base =CompletableFuture.supplyAsync(()->{ 
  4.             try { 
  5.                 Thread.sleep(300); 
  6.             } catch (InterruptedException e) { 
  7.                 e.printStackTrace(); 
  8.             } 
  9.             return  "正常返回"
  10.         }); 
  11.         TimerTask timerTask2 = new TimerTask(2000, new MyThread(base)); 
  12.         timer.addTask(timerTask2); 
  13.         System.out.println("base.get==="+base.get()); 
  14.     } 

本文轉載自微信公眾號「小汪哥寫代碼」,可以通過以下二維碼關注。轉載本文請聯系小汪哥寫代碼公眾號。

 

責任編輯:武曉燕 來源: 小汪哥寫代碼
相關推薦

2023-09-27 09:04:50

2023-07-06 13:56:14

微軟Skype

2022-04-13 18:01:39

CSS組件技巧

2020-09-08 06:54:29

Java Gradle語言

2021-01-28 22:31:33

分組密碼算法

2020-05-22 08:16:07

PONGPONXG-PON

2023-09-22 17:36:37

2018-06-07 13:17:12

契約測試單元測試API測試

2025-02-18 00:00:05

vue后端權限

2023-09-27 16:39:38

2024-10-28 21:02:36

消息框應用程序

2021-12-06 09:43:01

鏈表節點函數

2021-03-01 18:37:15

MySQL存儲數據

2021-07-16 11:48:26

模型 .NET微軟

2023-09-20 23:01:03

Twitter算法

2024-09-12 10:06:21

2024-03-11 07:46:40

React優先級隊列二叉堆

2022-07-06 14:16:19

Python數據函數

2021-02-06 08:34:49

函數memoize文檔

2021-01-29 08:32:21

數據結構數組
點贊
收藏

51CTO技術棧公眾號

亚洲天堂狠狠干| 欧美一区二区三区思思人| 亚洲精品在线视频| 永久免费在线看片视频| 黄色污污网站在线观看| av毛片精品| 久久精品国产网站| 亚洲人成电影网站色| 精品少妇一区二区三区在线| 国产精品无码白浆高潮| 精品国产一区二区三区久久久樱花 | 国产手机av在线| 国产一区二区电影在线观看| 亚洲成人精品一区| 91在线播放视频| 亚洲综合视频网站| 国产激情欧美| 国产精品视频在线看| 日韩美女主播视频| 瑟瑟视频在线观看| 暖暖成人免费视频| 久久久99精品久久| 国产精品白丝jk喷水视频一区 | 日韩精品三级| 成人免费一区二区三区在线观看| 国产精品香蕉国产| 色偷偷男人天堂| 国产精品69xx| 一区二区在线影院| 欧美精品乱码久久久久久| 亚洲v国产v在线观看| 最近中文字幕av| 欧美一级淫片| 欧美酷刑日本凌虐凌虐| 免费毛片小视频| 青青草免费在线视频| 久久久蜜桃一区二区人| 亚洲一级一级97网| 午夜激情av在线| 污污网站在线看| 99精品偷自拍| 国产精品久久久久久av下载红粉 | 国产精品第十页| 精品国产91洋老外米糕| 妞干网在线观看视频| 亚洲 国产 欧美 日韩| 久久久综合网| 久久精品精品电影网| 制服.丝袜.亚洲.中文.综合懂| 精精国产xxxx视频在线中文版| zzijzzij亚洲日本少妇熟睡| 国产a∨精品一区二区三区不卡| 久久高清免费视频| 精品国精品国产自在久国产应用| 日韩电影中文字幕一区| 亚洲精品自拍网| 97人人做人人爽香蕉精品| 色哟哟精品一区| 国产成人三级视频| 免费在线观看av片| 99国产精品久久久久久久久久 | 欧洲成人午夜免费大片| 日韩不卡av在线| 91国内精品白嫩初高生| 91久久精品午夜一区二区| 少妇高潮大叫好爽喷水| 国产区在线观看| 久久久精品国产99久久精品芒果| 久久久久综合一区二区三区| 136福利视频导航| 国产欧美亚洲一区| 久色乳综合思思在线视频| 欧美图片一区二区| 一区二区三区视频免费视频观看网站 | 伊人伊成久久人综合网小说 | 成人精品小蝌蚪| 国产精品高潮视频| 伊人成人在线观看| 国产日韩一区二区三区在线播放| 欧美亚洲另类视频| 久久久久久久久艹| 日韩三级在线| 亚洲人成在线电影| 日本在线观看网址| 欧美视频福利| www.日韩免费| 日韩av在线天堂网| 国产一区二区不卡视频在线观看| 国产高潮流白浆| 国产精品超碰| 91精品国产麻豆| 色综合久久久无码中文字幕波多| 国产精品18hdxxxⅹ在线| 亚洲女人被黑人巨大进入| 欧美一级大片免费看| 欧美wwwwww| 精品久久五月天| 天天做天天干天天操| 欧美日韩精品一区二区三区视频| 午夜精品久久久久久久久久久| 中文字幕99| 999国产在线视频| 久久综合久久99| 久久www免费人成精品| 国产在线视频网站| 久久精品无码一区二区三区| 中文字幕乱码一区二区三区| 黑人玩欧美人三根一起进| 一本一道综合狠狠老| 午夜免费福利网站| 国产精品久久久久久久久久辛辛 | 一区二区自拍偷拍| 成人av一区二区三区| 亚洲精品在线视频观看| 国产视频福利在线| 久久久精品欧美丰满| 日韩精品福利片午夜免费观看| 国产区在线观看| 欧美性高跟鞋xxxxhd| 欧美二区在线视频| 99er精品视频| 欧美一区二区人人喊爽| 91国模少妇一区二区三区| 欧美日韩国产成人精品| 国产精品三级美女白浆呻吟| 中文字幕在线观看你懂的| 99久久伊人精品| 波多野结衣与黑人| 91九色porn在线资源| 婷婷开心久久网| 情侣黄网站免费看| 电影在线观看一区二区| 日韩av中文字幕在线播放| av激情在线观看| 麻豆91在线播放免费| 久久涩涩网站| 九色porny丨国产首页在线| 狠狠色狠狠色综合日日小说| 一本色道无码道dvd在线观看| 日本成人片在线| 6080日韩午夜伦伦午夜伦| 免费啪视频在线观看| 色老板在线视频一区二区| 亚洲丝袜一区在线| 欧产日产国产69| 极品美女销魂一区二区三区| 成人欧美一区二区三区在线观看 | 中文字幕亚洲无线码在线一区| 日本免费网站视频| 日本欧美大码aⅴ在线播放| 成人有码视频在线播放| 后入内射欧美99二区视频| 久久综合久久久久88| 美女日批免费视频| 久久精品黄色| 日韩av网址在线| 日韩特黄一级片| 久久国产精品99久久久久久老狼 | 中文字幕乱码人妻综合二区三区 | 97超碰免费观看| 精品国产欧美| 日韩av网址在线观看| 国产做受高潮漫动| 久久影院午夜片一区| av天堂永久资源网| 欧美限制电影| 成人写真福利网| av观看在线| 在线免费精品视频| 亚洲午夜精品在线观看| 国产伦精品一区二区三区千人斩 | 中文字幕激情视频| 亚洲欧美一区二区三区国产精品| 日韩国产一级片| 激情小说一区| 久久精品国产一区| www.五月婷婷| 国产精品免费网站在线观看| 国产乱子伦农村叉叉叉| 综合国产视频| 韩国三级日本三级少妇99| 亚洲一区二区精品视频| 精品久久久久久国产91| 中文字幕天堂网| 999久久久91| 国产91成人在在线播放| 国产香蕉视频在线看| 欧美日韩国产精品成人| 国产精品无码一区二区三区| 丝袜美腿高跟呻吟高潮一区| 亚洲三区视频| 国产精品亚洲一区二区三区在线观看| 在线看欧美日韩| 亚洲精品视频91| 亚洲色图在线看| 一本色道久久hezyo无码| 麻豆久久婷婷| 少妇久久久久久被弄到高潮| 中文字幕亚洲影视| 97久久天天综合色天天综合色hd| 日本不卡1234视频| 久久影视电视剧免费网站| 天堂91在线| 精品久久久视频| 亚洲精品久久久久久国| 另类专区欧美蜜桃臀第一页| 久草视频国产在线| 国产精品传媒| 成人女保姆的销魂服务| 免费看av不卡| 国产小视频91| 秋霞视频一区二区| 五月天欧美精品| 亚洲综合图片一区| 久久麻豆一区二区| av在线播放网址| 国产精品久久国产愉拍| 一区二区三区日韩视频| 成人在线视频国产| 国产97免费视| av在线私库| 久久99热精品这里久久精品| 狠狠人妻久久久久久综合麻豆| 欧美午夜精品一区二区蜜桃 | 99精品黄色片免费大全| 午夜一级免费视频| 全部av―极品视觉盛宴亚洲| 性欧美大战久久久久久久| 亚洲一区二区三区| 亚洲午夜精品久久| 郴州新闻综合频道在线直播| 精品999在线观看| 天堂在线中文网官网| 亚洲欧洲一区二区三区在线观看| 亚洲精品一区二区口爆| 91精品国产欧美一区二区18| 中文字幕日韩国产| 欧美小视频在线| 日产亚洲一区二区三区| 亚洲一区二区三区四区的 | 黄色av网址在线免费观看| 亚洲精品福利在线观看| 国产又大又粗又爽| 亚瑟在线精品视频| 69精品久久久| 日本一区二区三区高清不卡| 日本特黄在线观看| 一本色道精品久久一区二区三区| 水蜜桃亚洲精品| 精品视频亚洲| 亚洲a∨一区二区三区| 欧美一区二区三区高清视频| 日韩av一级大片| 视频精品国内| 超碰97在线人人| 成人一区福利| 国产精品电影在线观看| 欧美色片在线观看| 国产精品免费一区| 99久久99九九99九九九| 91久久国产综合久久蜜月精品| 秋霞一区二区| 国产日韩欧美亚洲一区| 蜜桃精品噜噜噜成人av| 2020国产精品久久精品不卡| 欧美日本三级| 国内视频一区| 日本精品黄色| 99久re热视频精品98| 亚洲调教视频在线观看| 欧美色图色综合| 日韩电影在线免费观看| 日本中文字幕亚洲| 六月天综合网| 91高清国产视频| 久久天堂成人| 亚洲美女爱爱视频| 国产成人一区在线| 在线观看免费的av| 国产精品1区二区.| 三级黄色片网站| 成人性生交大片免费看视频在线| 91亚洲免费视频| 国产精品自拍三区| 国产探花在线看| 波波电影院一区二区三区| 亚洲精品国产91| 99re6这里只有精品视频在线观看| 在线不卡av电影| 亚洲老司机在线| 免费三级在线观看| 精品欧美激情精品一区| 中文字字幕在线中文乱码| 精品播放一区二区| 91网页在线观看| 久久久久久久久久久久久久久久久久av| 日本电影全部在线观看网站视频| 欧美日韩福利视频| 深夜视频一区二区| 国产伦理久久久| 国产精品黄网站| 亚洲精品9999| 国产精品嫩草99av在线| 一级做a爱视频| 国产视频一区二区在线观看| 亚洲永久无码7777kkk| 亚洲欧美综合另类在线卡通| 青青青国产在线| 欧美日韩国产高清视频| 久草免费在线| 2019中文在线观看| 97精品资源在线观看| 欧美国产一区二区在线| 欧美日韩一区二区三区四区在线观看 | 日韩av片电影专区| 国偷自产视频一区二区久| av中文字幕av| 麻豆精品国产传媒mv男同| 中文字幕免费看| 精品日韩中文字幕| 亚洲精品久久久久avwww潮水| 少妇久久久久久| 色视频在线免费观看| 日韩在线视频网站| 成人爱爱网址| 精品视频在线观看| 亚洲高清av| 午夜精品久久久内射近拍高清 | 成人国内精品久久久久一区| 你微笑时很美电视剧整集高清不卡| 免费人成在线观看视频播放| 国产一区二区三区黄视频| 欧美一级片在线免费观看| 国产精品理论在线观看| 波多野结衣高清视频| 欧美老女人在线| 天堂资源在线中文| 国产日本欧美在线观看| 亚洲天堂av资源在线观看| 黄色网zhan| 国产一区二区三区免费看| 顶级黑人搡bbw搡bbbb搡| 欧美日韩1区2区| 日本在线观看视频| 成人激情视频在线| 亚洲五月综合| 中文字幕永久免费| 亚洲最新在线观看| 91porny九色| 国产亚洲精品一区二区| 精品网站在线| 神马欧美一区二区| 美女一区二区视频| 亚洲婷婷在线观看| 亚洲va国产天堂va久久en| 三级网站在线看| 久久精品影视伊人网| www.久久久久爱免| 伊人网在线免费| 99热这里都是精品| 日本a级c片免费看三区| 中文精品99久久国产香蕉| 亚洲精品伊人| 精品久久久无码人妻字幂| av激情综合网| 无码日韩精品一区二区| 色婷婷av一区二区三区久久| 成人在线视频区| 久久久久久久久久久99| 久久久久久**毛片大全| 一级特黄aaa大片| 欧美华人在线视频| 北岛玲精品视频在线观看| 路边理发店露脸熟妇泻火| 不卡的av中国片| 老熟妇一区二区三区啪啪| 久久中文字幕在线| 欧美成人午夜77777| 日本va中文字幕| 久久亚洲综合色一区二区三区| 中文在线字幕免费观| 九九热这里只有精品6| 色综合久久中文| 911福利视频| 亚洲1区2区3区视频| 国产日本在线| 99久久综合狠狠综合久久止| 久久亚洲风情| 久草网站在线观看| 91麻豆精品91久久久久同性| 成年人视频免费在线播放| 欧美午夜精品久久久久免费视| 精品一区二区三区免费视频| 日本熟妇色xxxxx日本免费看| 中文字幕久久久av一区| 国偷自产av一区二区三区| 日本在线播放一区二区| 黑人欧美xxxx| 先锋成人av| 视频一区国产精品|