精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

定時任務(wù)實現(xiàn)原理詳解

開發(fā) 前端
所謂最小堆方案,正如我們上面所說的,每當(dāng)有新任務(wù)加入的時候,會把需要即將要執(zhí)行的任務(wù)排到前面,同時會有一個線程不斷的輪詢判斷,如果當(dāng)前某個任務(wù)已經(jīng)到達(dá)執(zhí)行時間點,就會立即執(zhí)行,具體實現(xiàn)代表就是 JDK 中的 Timer 定時器!

一、摘要

在很多業(yè)務(wù)的系統(tǒng)中,我們常常需要定時的執(zhí)行一些任務(wù),例如定時發(fā)短信、定時變更數(shù)據(jù)、定時發(fā)起促銷活動等等。

本文會重點分析下單機的定時任務(wù)實現(xiàn)原理以及優(yōu)缺點,分布式框架的實現(xiàn)原理會在后續(xù)文章中進(jìn)行分析。

從單機角度,定時任務(wù)實現(xiàn)主要有以下 3 種方案:

  • while + sleep 組合
  • 最小堆實現(xiàn)
  • 時間輪實現(xiàn)

二、while+sleep組合

while+sleep 方案,簡單的說,就是定義一個線程,然后 while 循環(huán),通過 sleep 延遲時間來達(dá)到周期性調(diào)度任務(wù)。

簡單示例如下:

public static void main(String[] args) {
    final long timeInterval = 5000;
    new Thread(new Runnable() {
        @Override
        public void run() {
            while (true) {
                System.out.println(Thread.currentThread().getName() + "每隔5秒執(zhí)行一次");
                try {
                    Thread.sleep(timeInterval);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }).start();
}

實現(xiàn)上非常簡單,如果我們想在創(chuàng)建一個每隔3秒鐘執(zhí)行一次任務(wù),怎么辦呢?

同樣的,也可以在創(chuàng)建一個線程,然后間隔性的調(diào)度方法;但是如果創(chuàng)建了大量這種類型的線程,這個時候會發(fā)現(xiàn)大量的定時任務(wù)線程在調(diào)度切換時性能消耗會非常大,而且整體效率低!

面對這種在情況,大佬們也想到了,于是想出了用一個線程將所有的定時任務(wù)存起來,事先排好序,按照一定的規(guī)則來調(diào)度,這樣不就可以極大的減少每個線程的切換消耗嗎?

正因此,JDK 中的 Timer 定時器由此誕生了!

三、最小堆實現(xiàn)

所謂最小堆方案,正如我們上面所說的,每當(dāng)有新任務(wù)加入的時候,會把需要即將要執(zhí)行的任務(wù)排到前面,同時會有一個線程不斷的輪詢判斷,如果當(dāng)前某個任務(wù)已經(jīng)到達(dá)執(zhí)行時間點,就會立即執(zhí)行,具體實現(xiàn)代表就是 JDK 中的 Timer 定時器!

3.1、Timer

首先我們來一個簡單的 Timer 定時器例子

public static void main(String[] args) {
    Timer timer = new Timer();
    //每隔1秒調(diào)用一次
    timer.schedule(new TimerTask() {
        @Override
        public void run() {
            System.out.println("test1");
        }
    }, 1000, 1000);
    //每隔3秒調(diào)用一次
    timer.schedule(new TimerTask() {
        @Override
        public void run() {
            System.out.println("test2");
        }
    }, 3000, 3000);

}

實現(xiàn)上,好像跟我們上面介紹的 while+sleep 方案差不多,同樣也是起一個TimerTask線程任務(wù),只不過共用一個Timer調(diào)度器。

下面我們一起來打開源碼看看里面到底有些啥!

  • 進(jìn)入Timer.schedule()方法

從方法上可以看出,這里主要做參數(shù)驗證,其中TimerTask是一個線程任務(wù),delay表示延遲多久執(zhí)行(單位毫秒),period表示多久執(zhí)行一次(單位毫秒)

public void schedule(TimerTask task, long delay, long period) {
    if (delay < 0)
        throw new IllegalArgumentException("Negative delay.");
    if (period <= 0)
        throw new IllegalArgumentException("Non-positive period.");
    sched(task, System.currentTimeMillis()+delay, -period);
}
  • 接著看sched()方法

這步操作中,可以很清晰的看到,在同步代碼塊里,會將task對象加入到queue

private void sched(TimerTask task, long time, long period) {
    if (time < 0)
        throw new IllegalArgumentException("Illegal execution time.");

    // Constrain value of period sufficiently to prevent numeric
    // overflow while still being effectively infinitely large.
    if (Math.abs(period) > (Long.MAX_VALUE >> 1))
        period >>= 1;

    synchronized(queue) {
        if (!thread.newTasksMayBeScheduled)
            throw new IllegalStateException("Timer already cancelled.");

        synchronized(task.lock) {
            if (task.state != TimerTask.VIRGIN)
                throw new IllegalStateException(
                    "Task already scheduled or cancelled");
            task.nextExecutionTime = time;
            task.period = period;
            task.state = TimerTask.SCHEDULED;
        }

        queue.add(task);
        if (queue.getMin() == task)
            queue.notify();
    }
}
  • 我們繼續(xù)來看queue對象

任務(wù)會將入到TaskQueue隊列中,同時在Timer初始化階段會將TaskQueue作為參數(shù)傳入到TimerThread線程中,并且起到線程

public class Timer {
    
    private final TaskQueue queue = new TaskQueue();

    private final TimerThread thread = new TimerThread(queue);

    public Timer() {
        this("Timer-" + serialNumber());
    }

    public Timer(String name) {
        thread.setName(name);
        thread.start();
    }

    //...
}
  • 而TaskQueue其實是一個最小堆的數(shù)據(jù)實體類,源碼如下

每當(dāng)有新元素加入的時候,會對原來的數(shù)組進(jìn)行重排,會將即將要執(zhí)行的任務(wù)排在數(shù)組的前面

class TaskQueue {
    
    private TimerTask[] queue = new TimerTask[128];


    private int size = 0;

    void add(TimerTask task) {
        // Grow backing store if necessary
        if (size + 1 == queue.length)
            queue = Arrays.copyOf(queue, 2*queue.length);

        queue[++size] = task;
        fixUp(size);
    }

    private void fixUp(int k) {
        while (k > 1) {
            int j = k >> 1;
            if (queue[j].nextExecutionTime <= queue[k].nextExecutionTime)
                break;
            TimerTask tmp = queue[j];
            queue[j] = queue[k];
            queue[k] = tmp;
            k = j;
        }
    }
    
    //....
}
  • 最后我們來看看TimerThread

TimerThread其實就是一個任務(wù)調(diào)度線程,首先從TaskQueue里面獲取排在最前面的任務(wù),然后判斷它是否到達(dá)任務(wù)執(zhí)行時間點,如果已到達(dá),就會立刻執(zhí)行任務(wù)

class TimerThread extends Thread {

    boolean newTasksMayBeScheduled = true;

    private TaskQueue queue;

    TimerThread(TaskQueue queue) {
        this.queue = queue;
    }

    public void run() {
        try {
            mainLoop();
        } finally {
            // Someone killed this Thread, behave as if Timer cancelled
            synchronized(queue) {
                newTasksMayBeScheduled = false;
                queue.clear();  // Eliminate obsolete references
            }
        }
    }

    /**
     * The main timer loop.  (See class comment.)
     */
    private void mainLoop() {
        while (true) {
            try {
                TimerTask task;
                boolean taskFired;
                synchronized(queue) {
                    // Wait for queue to become non-empty
                    while (queue.isEmpty() && newTasksMayBeScheduled)
                        queue.wait();
                    if (queue.isEmpty())
                        break; // Queue is empty and will forever remain; die

                    // Queue nonempty; look at first evt and do the right thing
                    long currentTime, executionTime;
                    task = queue.getMin();
                    synchronized(task.lock) {
                        if (task.state == TimerTask.CANCELLED) {
                            queue.removeMin();
                            continue;  // No action required, poll queue again
                        }
                        currentTime = System.currentTimeMillis();
                        executionTime = task.nextExecutionTime;
                        if (taskFired = (executionTime<=currentTime)) {
                            if (task.period == 0) { // Non-repeating, remove
                                queue.removeMin();
                                task.state = TimerTask.EXECUTED;
                            } else { // Repeating task, reschedule
                                queue.rescheduleMin(
                                  task.period<0 ? currentTime   - task.period
                                                : executionTime + task.period);
                            }
                        }
                    }
                    if (!taskFired) // Task hasn't yet fired; wait
                        queue.wait(executionTime - currentTime);
                }
                if (taskFired)  // Task fired; run it, holding no locks
                    task.run();
            } catch(InterruptedException e) {
            }
        }
    }
}

總結(jié)這個利用最小堆實現(xiàn)的方案,相比 while + sleep 方案,多了一個線程來管理所有的任務(wù),優(yōu)點就是減少了線程之間的性能開銷,提升了執(zhí)行效率;但是同樣也帶來的了一些缺點,整體的新加任務(wù)寫入效率變成了 O(log(n))。

同時,細(xì)心的發(fā)現(xiàn),這個方案還有以下幾個缺點:

  • 串行阻塞:調(diào)度線程只有一個,長任務(wù)會阻塞短任務(wù)的執(zhí)行,例如,A任務(wù)跑了一分鐘,B任務(wù)至少需要等1分鐘才能跑
  • 容錯能力差:沒有異常處理能力,一旦一個任務(wù)執(zhí)行故障,后續(xù)任務(wù)都無法執(zhí)行

3.2、ScheduledThreadPoolExecutor

鑒于 Timer 的上述缺陷,從 Java 5 開始,推出了基于線程池設(shè)計的 ScheduledThreadPoolExecutor 。

圖片圖片

其設(shè)計思想是,每一個被調(diào)度的任務(wù)都會由線程池來管理執(zhí)行,因此任務(wù)是并發(fā)執(zhí)行的,相互之間不會受到干擾。需要注意的是,只有當(dāng)任務(wù)的執(zhí)行時間到來時,ScheduledThreadPoolExecutor 才會真正啟動一個線程,其余時間 ScheduledThreadPoolExecutor 都是在輪詢?nèi)蝿?wù)的狀態(tài)。

簡單的使用示例:

public static void main(String[] args) {
    ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(3);
    //啟動1秒之后,每隔1秒執(zhí)行一次
    executor.scheduleAtFixedRate((new Runnable() {
        @Override
        public void run() {
            System.out.println("test3");
        }
    }),1,1, TimeUnit.SECONDS);
    //啟動1秒之后,每隔3秒執(zhí)行一次
    executor.scheduleAtFixedRate((new Runnable() {
        @Override
        public void run() {
            System.out.println("test4");
        }
    }),1,3, TimeUnit.SECONDS);
}

同樣的,我們首先打開源碼,看看里面到底做了啥

  • 進(jìn)入scheduleAtFixedRate()方法

首先是校驗基本參數(shù),然后將任務(wù)作為封裝到ScheduledFutureTask線程中,ScheduledFutureTask繼承自RunnableScheduledFuture,并作為參數(shù)調(diào)用delayedExecute()方法進(jìn)行預(yù)處理

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                              long initialDelay,
                                              long period,
                                              TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    if (period <= 0)
        throw new IllegalArgumentException();
    ScheduledFutureTask<Void> sft =
        new ScheduledFutureTask<Void>(command,
                                      null,
                                      triggerTime(initialDelay, unit),
                                      unit.toNanos(period));
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    sft.outerTask = t;
    delayedExecute(t);
    return t;
}
  • 繼續(xù)看delayedExecute()方法

可以很清晰的看到,當(dāng)線程池沒有關(guān)閉的時候,會通過super.getQueue().add(task)操作,將任務(wù)加入到隊列,同時調(diào)用ensurePrestart()方法做預(yù)處理

private void delayedExecute(RunnableScheduledFuture<?> task) {
    if (isShutdown())
        reject(task);
    else {
        super.getQueue().add(task);
        if (isShutdown() &&
            !canRunInCurrentRunState(task.isPeriodic()) &&
            remove(task))
            task.cancel(false);
        else
            //預(yù)處理
            ensurePrestart();
    }
}

其中super.getQueue()得到的是一個自定義的new DelayedWorkQueue()阻塞隊列,數(shù)據(jù)存儲方面也是一個最小堆結(jié)構(gòu)的隊列,這一點在初始化new ScheduledThreadPoolExecutor()的時候,可以看出!

public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
}

打開源碼可以看到,DelayedWorkQueue其實是ScheduledThreadPoolExecutor中的一個靜態(tài)內(nèi)部類,在添加的時候,會將任務(wù)加入到RunnableScheduledFuture數(shù)組中,同時線程池中的Woker線程會通過調(diào)用任務(wù)隊列中的take()方法獲取對應(yīng)的ScheduledFutureTask線程任務(wù),接著執(zhí)行對應(yīng)的任務(wù)線程

static class DelayedWorkQueue extends AbstractQueue<Runnable>
        implements BlockingQueue<Runnable> {

    private static final int INITIAL_CAPACITY = 16;
    private RunnableScheduledFuture<?>[] queue =
        new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
    private final ReentrantLock lock = new ReentrantLock();
    private int size = 0;   

    //....

    public boolean add(Runnable e) {
        return offer(e);
    }

    public boolean offer(Runnable x) {
        if (x == null)
            throw new NullPointerException();
        RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int i = size;
            if (i >= queue.length)
                grow();
            size = i + 1;
            if (i == 0) {
                queue[0] = e;
                setIndex(e, 0);
            } else {
                siftUp(i, e);
            }
            if (queue[0] == e) {
                leader = null;
                available.signal();
            }
        } finally {
            lock.unlock();
        }
        return true;
    }

    public RunnableScheduledFuture<?> take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                RunnableScheduledFuture<?> first = queue[0];
                if (first == null)
                    available.await();
                else {
                    long delay = first.getDelay(NANOSECONDS);
                    if (delay <= 0)
                        return finishPoll(first);
                    first = null; // don't retain ref while waiting
                    if (leader != null)
                        available.await();
                    else {
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            available.awaitNanos(delay);
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            if (leader == null && queue[0] != null)
                available.signal();
            lock.unlock();
        }
    }
}
  • 回到我們最開始說到的ScheduledFutureTask任務(wù)線程類,最終執(zhí)行任務(wù)的其實就是它

ScheduledFutureTask任務(wù)線程,才是真正執(zhí)行任務(wù)的線程類,只是繞了一圈,做了很多包裝,run()方法就是真正執(zhí)行定時任務(wù)的方法。

private class ScheduledFutureTask<V>
            extends FutureTask<V> implements RunnableScheduledFuture<V> {

    /** Sequence number to break ties FIFO */
    private final long sequenceNumber;

    /** The time the task is enabled to execute in nanoTime units */
    private long time;

    /**
     * Period in nanoseconds for repeating tasks.  A positive
     * value indicates fixed-rate execution.  A negative value
     * indicates fixed-delay execution.  A value of 0 indicates a
     * non-repeating task.
     */
    private final long period;

    /** The actual task to be re-enqueued by reExecutePeriodic */
    RunnableScheduledFuture<V> outerTask = this;

    /**
     * Overrides FutureTask version so as to reset/requeue if periodic.
     */
    public void run() {
        boolean periodic = isPeriodic();
        if (!canRunInCurrentRunState(periodic))
            cancel(false);
        else if (!periodic)
            ScheduledFutureTask.super.run();
        else if (ScheduledFutureTask.super.runAndReset()) {
            setNextRunTime();
            reExecutePeriodic(outerTask);
        }
    }
    
    //...
}

3.3、小結(jié)

ScheduledExecutorService 相比 Timer 定時器,完美的解決上面說到的 Timer 存在的兩個缺點!

在單體應(yīng)用里面,使用 ScheduledExecutorService 可以解決大部分需要使用定時任務(wù)的業(yè)務(wù)需求!

但是這是否意味著它是最佳的解決方案呢?

我們發(fā)現(xiàn)線程池中 ScheduledExecutorService 的排序容器跟 Timer 一樣,都是采用最小堆的存儲結(jié)構(gòu),新任務(wù)加入排序效率是O(log(n)),執(zhí)行取任務(wù)是O(1)。

這里的寫入排序效率其實是有空間可提升的,有可能優(yōu)化到O(1)的時間復(fù)雜度,也就是我們下面要介紹的時間輪實現(xiàn)!

四、時間輪實現(xiàn)

所謂時間輪(RingBuffer)實現(xiàn),從數(shù)據(jù)結(jié)構(gòu)上看,簡單的說就是循環(huán)隊列,從名稱上看可能感覺很抽象。

它其實就是一個環(huán)形的數(shù)組,如圖所示,假設(shè)我們創(chuàng)建了一個長度為 8 的時間輪。

插入、取值流程:

1.當(dāng)我們需要新建一個 1s 延時任務(wù)的時候,則只需要將它放到下標(biāo)為 1 的那個槽中,2、3、...、7也同樣如此。

2.而如果是新建一個 10s 的延時任務(wù),則需要將它放到下標(biāo)為 2 的槽中,但同時需要記錄它所對應(yīng)的圈數(shù),也就是 1 圈,不然就和 2 秒的延時消息重復(fù)了

3.當(dāng)創(chuàng)建一個 21s 的延時任務(wù)時,它所在的位置就在下標(biāo)為 5 的槽中,同樣的需要為他加上圈數(shù)為 2,依次類推...

因此,總結(jié)起來有兩個核心的變量:

  • 數(shù)組下標(biāo):表示某個任務(wù)延遲時間,從數(shù)據(jù)操作上對執(zhí)行時間點進(jìn)行取余
  • 圈數(shù):表示需要循環(huán)圈數(shù)

通過這張圖可以更直觀的理解!

當(dāng)我們需要取出延時任務(wù)時,只需要每秒往下移動這個指針,然后取出該位置的所有任務(wù)即可,取任務(wù)的時間消耗為O(1)。

當(dāng)我們需要插入任務(wù)式,也只需要計算出對應(yīng)的下表和圈數(shù),即可將任務(wù)插入到對應(yīng)的數(shù)組位置中,插入任務(wù)的時間消耗為O(1)。

如果時間輪的槽比較少,會導(dǎo)致某一個槽上的任務(wù)非常多,那么效率也比較低,這就和 HashMap 的 hash 沖突是一樣的,因此在設(shè)計槽的時候不能太大也不能太小。

4.1、代碼實現(xiàn)

  • 首先創(chuàng)建一個RingBufferWheel時間輪定時任務(wù)管理器
public class RingBufferWheel {

    private Logger logger = LoggerFactory.getLogger(RingBufferWheel.class);


    /**
     * default ring buffer size
     */
    private static final int STATIC_RING_SIZE = 64;

    private Object[] ringBuffer;

    private int bufferSize;

    /**
     * business thread pool
     */
    private ExecutorService executorService;

    private volatile int size = 0;

    /***
     * task stop sign
     */
    private volatile boolean stop = false;

    /**
     * task start sign
     */
    private volatile AtomicBoolean start = new AtomicBoolean(false);

    /**
     * total tick times
     */
    private AtomicInteger tick = new AtomicInteger();

    private Lock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();

    private AtomicInteger taskId = new AtomicInteger();
    private Map<Integer, Task> taskMap = new ConcurrentHashMap<>(16);

    /**
     * Create a new delay task ring buffer by default size
     *
     * @param executorService the business thread pool
     */
    public RingBufferWheel(ExecutorService executorService) {
        this.executorService = executorService;
        this.bufferSize = STATIC_RING_SIZE;
        this.ringBuffer = new Object[bufferSize];
    }


    /**
     * Create a new delay task ring buffer by custom buffer size
     *
     * @param executorService the business thread pool
     * @param bufferSize      custom buffer size
     */
    public RingBufferWheel(ExecutorService executorService, int bufferSize) {
        this(executorService);

        if (!powerOf2(bufferSize)) {
            throw new RuntimeException("bufferSize=[" + bufferSize + "] must be a power of 2");
        }
        this.bufferSize = bufferSize;
        this.ringBuffer = new Object[bufferSize];
    }

    /**
     * Add a task into the ring buffer(thread safe)
     *
     * @param task business task extends {@link Task}
     */
    public int addTask(Task task) {
        int key = task.getKey();
        int id;

        try {
            lock.lock();
            int index = mod(key, bufferSize);
            task.setIndex(index);
            Set<Task> tasks = get(index);

            int cycleNum = cycleNum(key, bufferSize);
            if (tasks != null) {
                task.setCycleNum(cycleNum);
                tasks.add(task);
            } else {
                task.setIndex(index);
                task.setCycleNum(cycleNum);
                Set<Task> sets = new HashSet<>();
                sets.add(task);
                put(key, sets);
            }
            id = taskId.incrementAndGet();
            task.setTaskId(id);
            taskMap.put(id, task);
            size++;
        } finally {
            lock.unlock();
        }

        start();

        return id;
    }


    /**
     * Cancel task by taskId
     * @param id unique id through {@link #addTask(Task)}
     * @return
     */
    public boolean cancel(int id) {

        boolean flag = false;
        Set<Task> tempTask = new HashSet<>();

        try {
            lock.lock();
            Task task = taskMap.get(id);
            if (task == null) {
                return false;
            }

            Set<Task> tasks = get(task.getIndex());
            for (Task tk : tasks) {
                if (tk.getKey() == task.getKey() && tk.getCycleNum() == task.getCycleNum()) {
                    size--;
                    flag = true;
                    taskMap.remove(id);
                } else {
                    tempTask.add(tk);
                }

            }
            //update origin data
            ringBuffer[task.getIndex()] = tempTask;
        } finally {
            lock.unlock();
        }

        return flag;
    }

    /**
     * Thread safe
     *
     * @return the size of ring buffer
     */
    public int taskSize() {
        return size;
    }

    /**
     * Same with method {@link #taskSize}
     * @return
     */
    public int taskMapSize(){
        return taskMap.size();
    }

    /**
     * Start background thread to consumer wheel timer, it will always run until you call method {@link #stop}
     */
    public void start() {
        if (!start.get()) {

            if (start.compareAndSet(start.get(), true)) {
                logger.info("Delay task is starting");
                Thread job = new Thread(new TriggerJob());
                job.setName("consumer RingBuffer thread");
                job.start();
                start.set(true);
            }

        }
    }

    /**
     * Stop consumer ring buffer thread
     *
     * @param force True will force close consumer thread and discard all pending tasks
     *              otherwise the consumer thread waits for all tasks to completes before closing.
     */
    public void stop(boolean force) {
        if (force) {
            logger.info("Delay task is forced stop");
            stop = true;
            executorService.shutdownNow();
        } else {
            logger.info("Delay task is stopping");
            if (taskSize() > 0) {
                try {
                    lock.lock();
                    condition.await();
                    stop = true;
                } catch (InterruptedException e) {
                    logger.error("InterruptedException", e);
                } finally {
                    lock.unlock();
                }
            }
            executorService.shutdown();
        }


    }


    private Set<Task> get(int index) {
        return (Set<Task>) ringBuffer[index];
    }

    private void put(int key, Set<Task> tasks) {
        int index = mod(key, bufferSize);
        ringBuffer[index] = tasks;
    }

    /**
     * Remove and get task list.
     * @param key
     * @return task list
     */
    private Set<Task> remove(int key) {
        Set<Task> tempTask = new HashSet<>();
        Set<Task> result = new HashSet<>();

        Set<Task> tasks = (Set<Task>) ringBuffer[key];
        if (tasks == null) {
            return result;
        }

        for (Task task : tasks) {
            if (task.getCycleNum() == 0) {
                result.add(task);

                size2Notify();
            } else {
                // decrement 1 cycle number and update origin data
                task.setCycleNum(task.getCycleNum() - 1);
                tempTask.add(task);
            }
            // remove task, and free the memory.
            taskMap.remove(task.getTaskId());
        }

        //update origin data
        ringBuffer[key] = tempTask;

        return result;
    }

    private void size2Notify() {
        try {
            lock.lock();
            size--;
            if (size == 0) {
                condition.signal();
            }
        } finally {
            lock.unlock();
        }
    }

    private boolean powerOf2(int target) {
        if (target < 0) {
            return false;
        }
        int value = target & (target - 1);
        if (value != 0) {
            return false;
        }

        return true;
    }

    private int mod(int target, int mod) {
        // equals target % mod
        target = target + tick.get();
        return target & (mod - 1);
    }

    private int cycleNum(int target, int mod) {
        //equals target/mod
        return target >> Integer.bitCount(mod - 1);
    }

    /**
     * An abstract class used to implement business.
     */
    public abstract static class Task extends Thread {

        private int index;

        private int cycleNum;

        private int key;

        /**
         * The unique ID of the task
         */
        private int taskId ;

        @Override
        public void run() {
        }

        public int getKey() {
            return key;
        }

        /**
         *
         * @param key Delay time(seconds)
         */
        public void setKey(int key) {
            this.key = key;
        }

        public int getCycleNum() {
            return cycleNum;
        }

        private void setCycleNum(int cycleNum) {
            this.cycleNum = cycleNum;
        }

        public int getIndex() {
            return index;
        }

        private void setIndex(int index) {
            this.index = index;
        }

        public int getTaskId() {
            return taskId;
        }

        public void setTaskId(int taskId) {
            this.taskId = taskId;
        }
    }


    private class TriggerJob implements Runnable {

        @Override
        public void run() {
            int index = 0;
            while (!stop) {
                try {
                    Set<Task> tasks = remove(index);
                    for (Task task : tasks) {
                        executorService.submit(task);
                    }

                    if (++index > bufferSize - 1) {
                        index = 0;
                    }

                    //Total tick number of records
                    tick.incrementAndGet();
                    TimeUnit.SECONDS.sleep(1);

                } catch (Exception e) {
                    logger.error("Exception", e);
                }

            }

            logger.info("Delay task has stopped");
        }
    }
}
  • 接著,編寫一個客戶端,測試客戶端
public static void main(String[] args) {
    RingBufferWheel ringBufferWheel = new RingBufferWheel( Executors.newFixedThreadPool(2));
    for (int i = 0; i < 3; i++) {
        RingBufferWheel.Task job = new Job();
        job.setKey(i);
        ringBufferWheel.addTask(job);
    }
}

public static class Job extends RingBufferWheel.Task{
    @Override
    public void run() {
        System.out.println("test5");
    }
}

運行結(jié)果:

test5
test5
test5

如果要周期性執(zhí)行任務(wù),可以在任務(wù)執(zhí)行完成之后,再重新加入到時間輪中。

詳細(xì)源碼分析地址:點擊這里獲取

4.2、應(yīng)用

時間輪的應(yīng)用還是非常廣的,例如在 Disruptor 項目中就運用到了 RingBuffer,還有Netty中的HashedWheelTimer工具原理也差不多等等,有興趣的同學(xué),可以閱讀一下官方對應(yīng)的源碼!

五、小結(jié)

本文主要圍繞單體應(yīng)用中的定時任務(wù)原理進(jìn)行分析,可能也有理解不對的地方,歡迎評論區(qū)留言!

六、參考

1、https://www.jianshu.com/p/84d9db1b1def

2、https://crossoverjie.top/2019/09/27/algorithm

責(zé)任編輯:武曉燕 來源: 潘志的研發(fā)筆記
相關(guān)推薦

2020-12-21 07:31:23

實現(xiàn)單機JDK

2024-05-31 13:07:29

.NET Core定時任務(wù)編程

2021-06-30 07:19:34

SpringBoot定時任務(wù)

2024-09-20 05:49:04

SpringBoot后端

2024-10-15 16:41:35

2024-11-04 16:01:01

2024-07-17 17:54:28

XXLJob分布式任務(wù)Java

2019-12-25 15:10:00

MySQL事件數(shù)據(jù)庫

2023-12-19 08:09:06

Python定時任務(wù)Cron表達(dá)式

2020-12-10 07:00:38

編程線程池定時任務(wù)

2024-05-13 09:49:30

.NETQuartz庫Cron表達(dá)式

2017-08-16 16:41:04

JavaSpringBoot定時任務(wù)

2024-01-22 08:53:00

策略任務(wù)RocketMQ

2021-11-22 12:35:40

Python命令定時任務(wù)

2024-02-26 11:12:33

定時任務(wù)線程

2024-01-31 08:38:57

Python定時任務(wù)函數(shù)

2009-10-28 10:05:29

Ubuntucrontab定時任務(wù)

2012-02-07 13:31:14

SpringJava

2010-03-10 15:47:58

crontab定時任務(wù)

2024-02-28 09:54:07

線程池配置
點贊
收藏

51CTO技術(shù)棧公眾號

亚洲v天堂v手机在线| 色黄网站在线观看| 久久99精品国产.久久久久久| 久久精品一偷一偷国产| 成年女人免费视频| 一区在线影院| 亚洲一区二区精品3399| 欧美二区三区在线| www.激情五月| 日本中文字幕一区二区视频 | 日韩欧美精品中文字幕| 一级二级三级欧美| 天堂v视频永久在线播放| 免费在线看成人av| 91精品91久久久久久| 一级片一级片一级片| 国产剧情在线观看一区| 精品久久久久一区| 向日葵污视频在线观看| 国产在线观看www| 中文字幕亚洲综合久久菠萝蜜| 精品国产乱码久久久久软件 | 尤物国产精品| 欧美日韩激情视频一区二区三区| 国产福利一区二区三区视频| 国产精品免费在线免费| 精品人妻一区二区色欲产成人| 最新国产精品久久久| 亚洲色图欧美制服丝袜另类第一页| 欧美xxxx黑人| www一区二区三区| 一本色道亚洲精品aⅴ| 欧美无砖专区免费| 牛牛精品在线| 亚洲欧美日韩国产成人精品影院 | 国产精品成人99一区无码| 热久久久久久| 欧美偷拍一区二区| 超碰在线97免费| 免费看av不卡| 欧美性极品xxxx娇小| 六月婷婷在线视频| 国产精品蜜臀| 亚洲一区在线视频| av动漫在线免费观看| 99热国产在线| 亚洲理论在线观看| 成人区一区二区| 97超碰资源站在线观看| 亚洲精选视频免费看| 一区二区三区四区欧美日韩| 91在线导航| 国产欧美精品一区二区色综合朱莉| 久久久久高清| 日韩欧美电影在线观看| 337p粉嫩大胆色噜噜噜噜亚洲| 国产一区二区三区免费不卡| 日韩在线观看视频一区二区三区| 成人黄色网址在线观看| 国产伦精品一区二区三区在线| 亚洲乱码在线观看| 成人av资源在线观看| 精品久久久久亚洲| 男女av在线| 亚洲国产成人自拍| 9999在线观看| 手机av在线播放| 无码av中文一区二区三区桃花岛| 久久久久久久久久久99| 天天综合网天天| 欧美日韩你懂得| 国模大尺度视频| 国产无遮挡裸体免费久久| 日韩成人高清在线| 久久久视频6r| 一区二区三区在线电影| 久久青草福利网站| 五月天婷婷导航| 激情综合网最新| av在线不卡观看| 四虎影视2018在线播放alocalhost| 26uuu久久综合| 亚洲高清精品中出| 日韩另类在线| 欧美午夜电影在线| 国产精品久久久久久久av福利| 国产一区二区三区免费在线| 亚洲激情成人网| 精品一区二区在线观看视频| 国内久久视频| 国产精品久久久久国产a级| 91亚洲国产成人久久精品麻豆| 国产a区久久久| 日韩视频专区| 97天天综合网| 欧美日韩精品免费| 37p粉嫩大胆色噜噜噜| 91精品一区二区三区综合| 久久男人的天堂| 在线视频1卡二卡三卡| 粉嫩绯色av一区二区在线观看| 日本在线观看不卡| 91老司机福利在线| 欧美亚洲动漫制服丝袜| 国产精品久久久久久亚洲色| 成人精品影院| 国自在线精品视频| 97国产成人无码精品久久久| 91在线一区二区| 国产精品88久久久久久妇女| 欧洲亚洲两性| 欧美精品一区在线观看| 日韩免费av一区| 免费视频一区| 国产精品毛片一区视频| 老司机av在线免费看| 一本大道久久a久久综合婷婷| 97中文字幕在线观看| 久久亚洲精品中文字幕蜜潮电影| 91国内在线视频| 国产福利资源在线| 国产精品国产三级国产有无不卡| 91猫先生在线| 精品人人人人| 久久91精品国产91久久久| 91久久久久国产一区二区| 久久久不卡影院| 97国产精东麻豆人妻电影| 欧美久久一区二区三区| 精品国产一区二区三区久久狼5月| 亚洲熟妇无码乱子av电影| 成人午夜激情影院| 国产欧美123| 国产精品视频一区视频二区| 这里只有精品在线播放| 免费一级a毛片| 久久日韩精品一区二区五区| 欧美一区二区中文字幕| 懂色av一区二区| 久久久噜噜噜久久| 后进极品白嫩翘臀在线视频| 亚洲综合另类小说| 亚洲妇女无套内射精| 欧美69视频| 国产精品国色综合久久| 美足av综合网| 亚洲国产高潮在线观看| 激情五月色婷婷| 成人福利一区| 男人操女人的视频在线观看欧美| 国产日韩一区二区| 黄色在线观看www| 国产丝袜视频一区| 久久久精品视频网站| 久久精品人人做人人综合| 日韩欧美黄色大片| 欧美视频网址| 国产美女精彩久久| 午夜老司机在线观看| 欧美精品乱码久久久久久按摩| 波多野结衣喷潮| 黄色小说综合网站| 男人天堂成人网| 999久久精品| 2019av中文字幕| 你懂的视频在线播放| 欧洲av一区二区嗯嗯嗯啊| 影音先锋男人看片资源| 狠狠色丁香久久婷婷综合丁香| 宅男一区二区三区| 视频一区在线| 欧美一级bbbbb性bbbb喷潮片| 日本福利片高清在线观看| 欧洲亚洲国产日韩| 一起操在线播放| 成人午夜伦理影院| 国产精品亚洲二区在线观看| 日韩免费一区| 成人黄动漫网站免费| 啊啊啊久久久| 国产亚洲精品久久久久久牛牛| 91精东传媒理伦片在线观看| 亚洲一区二区在线观看视频| 午夜理伦三级做爰电影| 精品中文av资源站在线观看| 国产精品久久久久9999爆乳| 久久av导航| 亚洲曰本av电影| 偷拍自拍在线看| 久久午夜a级毛片| 亚洲色欧美另类| 欧美麻豆精品久久久久久| 91看片在线播放| 国产精品白丝在线| 三级视频网站在线观看| 美洲天堂一区二卡三卡四卡视频| 男人添女荫道口喷水视频| 精品久久久久久久久久久下田| 不卡一区二区三区视频| 欧美成a人片在线观看久| 九九热r在线视频精品| 蜜桃视频在线播放| 欧美成人高清电影在线| 国产精品成人久久久| 亚洲va欧美va人人爽| 乱老熟女一区二区三区| www一区二区| 日韩高清一二三区| 老司机精品视频一区二区三区| 91九色在线观看视频| 亚洲乱码电影| 亚洲精品一卡二卡三卡四卡| 日韩精品丝袜美腿| 69堂成人精品视频免费| 国产激情欧美| 欧美中文字幕精品| 丰满诱人av在线播放| 久久亚洲精品网站| 91在线观看| 亚洲人成网在线播放| 免费观看国产视频| 欧美va亚洲va香蕉在线| 国产一区二区女内射| 91福利社在线观看| 免费的毛片视频| 精品高清一区二区三区| 国产精品第一页在线观看| 亚洲欧美偷拍三级| 久久嫩草捆绑紧缚| 国产精品久久午夜夜伦鲁鲁| 91精品人妻一区二区| 99国产精品久久久久| 国模无码视频一区| 国产成人av在线影院| 久久aaaa片一区二区| 国产综合色产在线精品| 在线观看免费av网址| 日韩av电影一区| 天堂中文视频在线| 日韩二区三区在线观看| 成人在线观看a| 久久综合亚州| www.色就是色| 蜜桃av一区二区在线观看| 三级视频中文字幕| 美腿丝袜亚洲色图| 久久久福利影院| 国产麻豆精品在线| 少妇伦子伦精品无吗| 成人激情小说乱人伦| 亚洲精品在线视频免费观看| 99精品欧美一区二区三区小说| 丰满岳乱妇一区二区| 91视频精品在这里| 亚洲永久精品ww.7491进入| 国产亚洲1区2区3区| 中文字幕欧美激情极品| 国产精品久久综合| 欧美日韩偷拍视频| 亚洲成在线观看| 国产又黄又粗又爽| 欧美日韩免费高清一区色橹橹| 亚洲综合网av| 日韩欧美国产综合| 婷婷久久久久久| 国产亚洲一区二区在线| 欧美jizz18性欧美| 色综合天天狠天天透天天伊人 | 中文字幕一区二区三区在线不卡| 一级性生活免费视频| 亚洲一区精品在线| 麻豆精品久久久久久久99蜜桃| 欧美午夜在线观看| 国产模特av私拍大尺度| 亚洲福利视频网站| 成年人在线观看| 九色精品美女在线| 亚洲天堂手机| 91免费国产网站| 日韩深夜影院| 不卡中文字幕在线| 国产精品人人爽人人做我的可爱 | 国产91对白在线观看九色| 99久久久久久久久久| 18成人在线视频| 天天操天天摸天天干| 欧美日韩精品欧美日韩精品一综合| 亚洲毛片欧洲毛片国产一品色| 一区二区欧美久久| 韩国日本一区| 国产在线精品播放| 西野翔中文久久精品国产| 美女黄色片网站| 三级欧美韩日大片在线看| 亚洲成人福利视频| 国产精品免费看片| 国产情侣在线视频| 欧美一级国产精品| 成人网视频在线观看| 97国产精品久久| 美女国产精品久久久| 日韩av一区二区三区在线| 亚洲天堂偷拍| 亚洲精品在线视频播放| 久久综合给合久久狠狠狠97色69| 日韩成人短视频| 欧美日韩在线一区二区| 五月婷婷六月丁香| 欧美日本在线视频中文字字幕| jizz久久久久久| 欧美日韩一区二区三区在线观看免| 欧美精品国产一区| 欧美性受xxxxxx黑人xyx性爽| 国产三级一区二区三区| 日韩av一二三区| 日韩欧美激情一区| 国产在线观看免费麻豆| 国产精品丝袜白浆摸在线| 亚洲人成精品久久久 | 蓝色福利精品导航| 国产精久久一区二区三区| 精品久久久久久中文字幕一区奶水| 精品区在线观看| 日韩在线播放视频| 久久人人视频| 亚洲精品一区二| 日韩av在线免费观看不卡| 在线免费观看日韩av| 天天操天天色综合| 视频一区 中文字幕| 国模极品一区二区三区| 18国产精品| 乱熟女高潮一区二区在线| 国产一区美女在线| 欧美黑人性猛交xxx| 7777精品伊人久久久大香线蕉完整版| a视频网址在线观看| 国产精品人成电影在线观看| 精品国产91| 国产wwwxx| 国产精品久99| 国产偷拍一区二区| 精品中文字幕在线观看| 欧美黄视频在线观看| 菠萝蜜视频在线观看入口| 成人毛片在线观看| 亚洲黄色三级视频| 亚洲男人av电影| 97久久香蕉国产线看观看| 欧美在线播放一区| 男女性色大片免费观看一区二区 | 少妇毛片一区二区三区| 欧美丝袜一区二区三区| 国产在线视频福利| 国产精品美女av| 亚洲女同中文字幕| 人妻激情偷乱频一区二区三区| 亚洲成人激情综合网| 亚洲欧美色视频| 国产精品com| 欧美高清在线| 麻豆av免费看| 欧美午夜www高清视频| 在线视频自拍| 91在线看www| 亚洲精品美女| 国产aⅴ激情无码久久久无码| 欧美日韩性生活| 尤物视频在线看| 美女黄毛**国产精品啪啪| 日韩成人av影视| 激情综合网五月天| 亚洲男人天堂九九视频| 日韩美女在线| 精品无码国产一区二区三区av| 久久久亚洲国产美女国产盗摄| 中文字幕永久在线观看| 美女av一区二区| 男男gay无套免费视频欧美 | 91香蕉视频mp4| 中文字幕你懂的| 欧美激情久久久| gogogo高清在线观看一区二区| 波多野结衣中文字幕在线播放| 精品久久久视频| 成人福利在线观看视频| 久久国产精品99久久久久久丝袜| 蜜臀av性久久久久蜜臀aⅴ| 欧美色图亚洲天堂| 亚洲男人天堂视频| 一区二区在线视频观看| 一区二区在线播放视频| 亚洲一区国产视频| 日本在线观看网站| 免费一区二区三区在在线视频| 国内精品免费**视频| 高清乱码免费看污| 欧美激情一级精品国产| 日韩av久操| 美女洗澡无遮挡|