精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

1.6萬字+28張圖盤點11種延遲任務(wù)的實現(xiàn)方式

數(shù)據(jù)庫 Redis
Redisson他是Redis的兒子(Redis son),基于Redis實現(xiàn)了非常多的功能,其中最常使用的就是Redis分布式鎖的實現(xiàn),但是除了實現(xiàn)Redis分布式鎖之外,它還實現(xiàn)了延遲隊列的功能。

大家好,我是三友~~

延遲任務(wù)在我們?nèi)粘I钪斜容^常見,比如訂單支付超時取消訂單功能,又比如自動確定收貨的功能等等。

所以本篇文章就來從實現(xiàn)到原理來盤點延遲任務(wù)的11種實現(xiàn)方式,這些方式并沒有絕對的好壞之分,只是適用場景的不大相同。

圖片圖片

DelayQueue

DelayQueue是JDK提供的api,是一個延遲隊列。

圖片圖片

DelayQueue泛型參數(shù)得實現(xiàn)Delayed接口,Delayed繼承了Comparable接口。

圖片圖片

getDelay方法返回這個任務(wù)還剩多久時間可以執(zhí)行,小于0的時候說明可以這個延遲任務(wù)到了執(zhí)行的時間了。

compareTo這個是對任務(wù)排序的,保證最先到延遲時間的任務(wù)排到隊列的頭。

來個demo

@Getter
public class SanYouTask implements Delayed {

    private final String taskContent;

    private final Long triggerTime;

    public SanYouTask(String taskContent, Long delayTime) {
        this.taskContent = taskContent;
        this.triggerTime = System.currentTimeMillis() + delayTime * 1000;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(triggerTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed o) {
        return this.triggerTime.compareTo(((SanYouTask) o).triggerTime);
    }

}

SanYouTask實現(xiàn)了Delayed接口,構(gòu)造參數(shù)

  • taskContent:延遲任務(wù)的具體的內(nèi)容
  • delayTime:延遲時間,秒為單位
測試
@Slf4j
public class DelayQueueDemo {

    public static void main(String[] args) {
        DelayQueue<SanYouTask> sanYouTaskDelayQueue = new DelayQueue<>();

        new Thread(() -> {
            while (true) {
                try {
                    SanYouTask sanYouTask = sanYouTaskDelayQueue.take();
                    log.info("獲取到延遲任務(wù):{}", sanYouTask.getTaskContent());
                } catch (Exception e) {
                }
            }
        }).start();

        log.info("提交延遲任務(wù)");
        sanYouTaskDelayQueue.offer(new SanYouTask("三友的java日記5s", 5L));
        sanYouTaskDelayQueue.offer(new SanYouTask("三友的java日記3s", 3L));
        sanYouTaskDelayQueue.offer(new SanYouTask("三友的java日記8s", 8L));
    }
}

開啟一個線程從DelayQueue中獲取任務(wù),然后提交了三個任務(wù),延遲時間分為別5s,3s,8s。

測試結(jié)果:

圖片圖片

成功實現(xiàn)了延遲任務(wù)。

實現(xiàn)原理

圖片圖片

offer方法在提交任務(wù)的時候,會通過根據(jù)compareTo的實現(xiàn)對任務(wù)進行排序,將最先需要被執(zhí)行的任務(wù)放到隊列頭。

take方法獲取任務(wù)的時候,會拿到隊列頭部的元素,也就是隊列中最早需要被執(zhí)行的任務(wù),通過getDelay返回值判斷任務(wù)是否需要被立刻執(zhí)行,如果需要的話,就返回任務(wù),如果不需要就會等待這個任務(wù)到延遲時間的剩余時間,當時間到了就會將任務(wù)返回。

Timer

Timer也是JDK提供的api

先來demo

@Slf4j
public class TimerDemo {

    public static void main(String[] args) {
        Timer timer = new Timer();
        
        log.info("提交延遲任務(wù)");
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                log.info("執(zhí)行延遲任務(wù)");
            }
        }, 5000);
    }

}

通過schedule提交一個延遲時間為5s的延遲任務(wù)

圖片圖片

實現(xiàn)原理

提交的任務(wù)是一個TimerTask

public abstract class TimerTask implements Runnable {
    //忽略其它屬性
    
    long nextExecutionTime;
}

TimerTask內(nèi)部有一個nextExecutionTime屬性,代表下一次任務(wù)執(zhí)行的時間,在提交任務(wù)的時候會計算出nextExecutionTime值。

Timer內(nèi)部有一個TaskQueue對象,用來保存TimerTask任務(wù)的,會根據(jù)nextExecutionTime來排序,保證能夠快速獲取到最早需要被執(zhí)行的延遲任務(wù)。

在Timer內(nèi)部還有一個執(zhí)行任務(wù)的線程TimerThread,這個線程就跟DelayQueue demo中開啟的線程作用是一樣的,用來執(zhí)行到了延遲時間的任務(wù)。

所以總的來看,Timer有點像整體封裝了DelayQueue demo中的所有東西,讓用起來簡單點。

雖然Timer用起來比較簡單,但是在阿里規(guī)范中是不推薦使用的,主要是有以下幾點原因:

  • Timer使用單線程來處理任務(wù),長時間運行的任務(wù)會導(dǎo)致其他任務(wù)的延時處理
  • Timer沒有對運行時異常進行處理,一旦某個任務(wù)觸發(fā)運行時異常,會導(dǎo)致整個Timer崩潰,不安全

ScheduledThreadPoolExecutor

由于Timer在使用上有一定的問題,所以在JDK1.5版本的時候提供了ScheduledThreadPoolExecutor,這個跟Timer的作用差不多,并且他們的方法的命名都是差不多的,但是ScheduledThreadPoolExecutor解決了單線程和異常崩潰等問題。

來個demo

@Slf4j
public class ScheduledThreadPoolExecutorDemo {

    public static void main(String[] args) {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(2, new ThreadPoolExecutor.CallerRunsPolicy());

        log.info("提交延遲任務(wù)");
        executor.schedule(() -> log.info("執(zhí)行延遲任務(wù)"), 5, TimeUnit.SECONDS);
    }

}

結(jié)果

圖片圖片

實現(xiàn)原理

ScheduledThreadPoolExecutor繼承了ThreadPoolExecutor,也就是繼承了線程池,所以可以有很多個線程來執(zhí)行任務(wù)。

ScheduledThreadPoolExecutor在構(gòu)造的時候會傳入一個DelayedWorkQueue阻塞隊列,所以線程池內(nèi)部的阻塞隊列是DelayedWorkQueue。

圖片圖片

在提交延遲任務(wù)的時候,任務(wù)會被封裝一個任務(wù)會被封裝成ScheduledFutureTask對象,然后放到DelayedWorkQueue阻塞隊列中。

ScheduledFutureTaskScheduledFutureTask

ScheduledFutureTask實現(xiàn)了前面提到的Delayed接口,所以其實可以猜到DelayedWorkQueue會根據(jù)ScheduledFutureTask對于Delayed接口的實現(xiàn)來排序,所以線程能夠獲取到最早到延遲時間的任務(wù)。

當線程從DelayedWorkQueue中獲取到需要執(zhí)行的任務(wù)之后就會執(zhí)行任務(wù)。

RocketMQ

RocketMQ是阿里開源的一款消息中間件,實現(xiàn)了延遲消息的功能,如果有對RocketMQ不熟悉的小伙伴可以看一下我之前寫的RocketMQ保姆級教程和RocketMQ消息短暫而又精彩的一生 這兩篇文章。

RocketMQ延遲消息的延遲時間默認有18個等級。

圖片圖片

當發(fā)送消息的時候只需要指定延遲等級即可。如果這18個等級的延遲時間不符和你的要求,可以修改RocketMQ服務(wù)端的配置文件。

來個demo

依賴

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.1</version>
  
<!--web依賴-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
    <version>2.2.5.RELEASE</version>
</dependency>

配置文件

rocketmq:
  name-server: 192.168.200.144:9876 #服務(wù)器ip:nameServer端口
  producer:
    group: sanyouProducer

controller類,通過DefaultMQProducer發(fā)送延遲消息到sanyouDelayTaskTopic這個topic,延遲等級為2,也就是延遲時間為5s的意思。

@RestController
@Slf4j
public class RocketMQDelayTaskController {

    @Resource
    private DefaultMQProducer producer;

    @GetMapping("/rocketmq/add")
    public void addTask(@RequestParam("task") String task) throws Exception {
        Message msg = new Message("sanyouDelayTaskTopic", "TagA", task.getBytes(RemotingHelper.DEFAULT_CHARSET));
        msg.setDelayTimeLevel(2);
        // 發(fā)送消息并得到消息的發(fā)送結(jié)果,然后打印
        log.info("提交延遲任務(wù)");
        producer.send(msg);
    }

}

創(chuàng)建一個消費者,監(jiān)聽sanyouDelayTaskTopic的消息。

@Component
@RocketMQMessageListener(consumerGroup = "sanyouConsumer", topic = "sanyouDelayTaskTopic")
@Slf4j
public class SanYouDelayTaskTopicListener implements RocketMQListener<String> {

    @Override
    public void onMessage(String msg) {
        log.info("獲取到延遲任務(wù):{}", msg);
    }

}

啟動應(yīng)用,瀏覽器輸入以下鏈接添加任務(wù)

http://localhost:8080/rocketmq/add?task=sanyou

測試結(jié)果:

圖片圖片

實現(xiàn)原理

圖片圖片

生產(chǎn)者發(fā)送延遲消息之后,RocketMQ服務(wù)端在接收到消息之后,會去根據(jù)延遲級別是否大于0來判斷是否是延遲消息

  • 如果不大于0,說明不是延遲消息,那就會將消息保存到指定的topic中
  • 如果大于0,說明是延遲消息,此時RocketMQ會進行一波偷梁換柱的操作,將消息的topic改成SCHEDULE_TOPIC_XXXX中,XXXX不是占位符,然后存儲。

在BocketMQ內(nèi)部有一個延遲任務(wù),相當于是一個定時任務(wù),這個任務(wù)就會獲取SCHEDULE_TOPIC_XXXX中的消息,判斷消息是否到了延遲時間,如果到了,那么就會將消息的topic存儲到原來真正的topic(拿我們的例子來說就是sanyouDelayTaskTopic)中,之后消費者就可以從真正的topic中獲取到消息了。

圖片圖片

定時任務(wù)

RocketMQ這種實現(xiàn)方式相比于前面提到的三種更加可靠,因為前面提到的三種任務(wù)內(nèi)容都是存在內(nèi)存的,服務(wù)器重啟任務(wù)就丟了,如果要實現(xiàn)任務(wù)不丟還得自己實現(xiàn)邏輯,但是RocketMQ消息有持久化機制,能夠保證任務(wù)不丟失。

RabbitMQ

RabbitMQ也是一款消息中間件,通過RabbitMQ的死信隊列也可以是先延遲任務(wù)的功能。

demo

引入RabbitMQ的依賴

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <version>2.2.5.RELEASE</version>
</dependency>

配置文件

spring:
  rabbitmq:
    host: 192.168.200.144 #服務(wù)器ip
    port: 5672
    virtual-host: /

RabbitMQ死信隊列的配置類,后面說原理的時候會介紹干啥的。

@Configuration
public class RabbitMQConfiguration {
    
    @Bean
    public DirectExchange sanyouDirectExchangee() {
        return new DirectExchange("sanyouDirectExchangee");
    }

    @Bean
    public Queue sanyouQueue() {
        return QueueBuilder
                //指定隊列名稱,并持久化
                .durable("sanyouQueue")
                //設(shè)置隊列的超時時間為5秒,也就是延遲任務(wù)的時間
                .ttl(5000)
                //指定死信交換機
                .deadLetterExchange("sanyouDelayTaskExchangee")
                .build();
    }

    @Bean
    public Binding sanyouQueueBinding() {
        return BindingBuilder.bind(sanyouQueue()).to(sanyouDirectExchangee()).with("");
    }

    @Bean
    public DirectExchange sanyouDelayTaskExchange() {
        return new DirectExchange("sanyouDelayTaskExchangee");
    }

    @Bean
    public Queue sanyouDelayTaskQueue() {
        return QueueBuilder
                //指定隊列名稱,并持久化
                .durable("sanyouDelayTaskQueue")
                .build();
    }

    @Bean
    public Binding sanyouDelayTaskQueueBinding() {
        return BindingBuilder.bind(sanyouDelayTaskQueue()).to(sanyouDelayTaskExchange()).with("");
    }

}

RabbitMQDelayTaskController用來發(fā)送消息,這里沒指定延遲時間,是因為在聲明隊列的時候指定了延遲時間為5s

@RestController
@Slf4j
public class RabbitMQDelayTaskController {

    @Resource
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/rabbitmq/add")
    public void addTask(@RequestParam("task") String task) throws Exception {
        // 消息ID,需要封裝到CorrelationData中
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        log.info("提交延遲任務(wù)");
        // 發(fā)送消息
        rabbitTemplate.convertAndSend("sanyouDirectExchangee", "", task, correlationData);
    }

}

啟動應(yīng)用,瀏覽器輸入以下鏈接添加任務(wù)。

http://localhost:8080/rabbitmq/add?task=sanyou

測試結(jié)果,成功實現(xiàn)5s的延遲任務(wù)。

實現(xiàn)原理

圖片圖片

整個工作流程如下:

  • 消息發(fā)送的時候會將消息發(fā)送到sanyouDirectExchange這個交換機上
  • 由于sanyouDirectExchange綁定了sanyouQueue,所以消息會被路由到sanyouQueue這個隊列上
  • 由于sanyouQueue沒有消費者消費消息,并且又設(shè)置了5s的過期時間,所以當消息過期之后,消息就被放到綁定的sanyouDelayTaskExchange死信交換機中
  • 消息到達sanyouDelayTaskExchange交換機后,由于跟sanyouDelayTaskQueue進行了綁定,所以消息就被路由到sanyouDelayTaskQueue中,消費者就能從sanyouDelayTaskQueue中拿到消息了

上面說的隊列與交換機的綁定關(guān)系,就是上面的配置類所干的事。

其實從這個單從消息流轉(zhuǎn)的角度可以看出,RabbitMQ跟RocketMQ實現(xiàn)有相似之處。

消息最開始都并沒有放到最終消費者消費的隊列中,而都是放到一個中間隊列中,等消息到了過期時間或者說是延遲時間,消息就會被放到最終的隊列供消費者消息。

只不過RabbitMQ需要你顯示的手動指定消息所在的中間隊列,而RocketMQ是在內(nèi)部已經(jīng)做好了這塊邏輯。

除了基于RabbitMQ的死信隊列來做,RabbitMQ官方還提供了延時插件,也可以實現(xiàn)延遲消息的功能,這個插件的大致原理也跟上面說的一樣,延時消息會被先保存在一個中間的地方,叫做Mnesia,然后有一個定時任務(wù)去查詢最近需要被投遞的消息,將其投遞到目標隊列中。

監(jiān)聽Redis過期key

在Redis中,有個發(fā)布訂閱的機制。

圖片圖片

生產(chǎn)者在消息發(fā)送時需要到指定發(fā)送到哪個channel上,消費者訂閱這個channel就能獲取到消息。圖中channel理解成MQ中的topic。

并且在Redis中,有很多默認的channel,只不過向這些channel發(fā)送消息的生產(chǎn)者不是我們寫的代碼,而是Redis本身。這里面就有這么一個channel叫做__keyevent@<db>__:expired,db是指Redis數(shù)據(jù)庫的序號。

當某個Redis的key過期之后,Redis內(nèi)部會發(fā)布一個事件到__keyevent@<db>__:expired這個channel上,只要監(jiān)聽這個事件,那么就可以獲取到過期的key。

所以基于監(jiān)聽Redis過期key實現(xiàn)延遲任務(wù)的原理如下:

  • 將延遲任務(wù)作為key,過期時間設(shè)置為延遲時間
  • 監(jiān)聽__keyevent@<db>__:expired這個channel,那么一旦延遲任務(wù)到了過期時間(延遲時間),那么就可以獲取到這個任務(wù)

來個demo

Spring已經(jīng)實現(xiàn)了監(jiān)聽__keyevent@*__:expired這個channel這個功能,__keyevent@*__:expired中的*代表通配符的意思,監(jiān)聽所有的數(shù)據(jù)庫。

所以demo寫起來就很簡單了,只需4步即可

依賴

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
    <version>2.2.5.RELEASE</version>
</dependency>

配置文件

spring:
  redis:
    host: 192.168.200.144
    port: 6379

配置類

@Configuration
public class RedisConfiguration {

    @Bean
    public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory connectionFactory) {
        RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();
        redisMessageListenerContainer.setConnectionFactory(connectionFactory);
        return redisMessageListenerContainer;
    }

    @Bean
    public KeyExpirationEventMessageListener redisKeyExpirationListener(RedisMessageListenerContainer redisMessageListenerContainer) {
        return new KeyExpirationEventMessageListener(redisMessageListenerContainer);
    }

}

KeyExpirationEventMessageListener實現(xiàn)了對__keyevent@*__:expiredchannel的監(jiān)聽。

圖片圖片

當KeyExpirationEventMessageListener收到Redis發(fā)布的過期Key的消息的時候,會發(fā)布RedisKeyExpiredEvent事件。

圖片圖片

所以我們只需要監(jiān)聽RedisKeyExpiredEvent事件就可以拿到過期消息的Key,也就是延遲消息。

對RedisKeyExpiredEvent事件的監(jiān)聽實現(xiàn)MyRedisKeyExpiredEventListener

@Component
public class MyRedisKeyExpiredEventListener implements ApplicationListener<RedisKeyExpiredEvent> {

    @Override
    public void onApplicationEvent(RedisKeyExpiredEvent event) {
        byte[] body = event.getSource();
        System.out.println("獲取到延遲消息:" + new String(body));
    }

}

代碼寫好,啟動應(yīng)用。

之后我直接通過Redis命令設(shè)置消息,就沒通過代碼發(fā)送消息了,消息的key為sanyou,值為task,值不重要,過期時間為5s

set sanyou task 

expire sanyou 5

成功獲取到延遲任務(wù)。

圖片圖片

雖然這種方式可以實現(xiàn)延遲任務(wù),但是這種方式坑比較多。

任務(wù)存在延遲

Redis過期事件的發(fā)布不是指key到了過期時間就發(fā)布,而是key到了過期時間被清除之后才會發(fā)布事件。

而Redis過期key的兩種清除策略,就是面試八股文常背的兩種:

  • 惰性清除。當這個key過期之后,訪問時,這個Key才會被清除
  • 定時清除。后臺會定期檢查一部分key,如果有key過期了,就會被清除

所以即使key到了過期時間,Redis也不一定會發(fā)送key過期事件,這就到導(dǎo)致雖然延遲任務(wù)到了延遲時間也可能獲取不到延遲任務(wù)。

丟消息太頻繁

Redis實現(xiàn)的發(fā)布訂閱模式,消息是沒有持久化機制,當消息發(fā)布到某個channel之后,如果沒有客戶端訂閱這個channel,那么這個消息就丟了,并不會像MQ一樣進行持久化,等有消費者訂閱的時候再給消費者消費。

所以說,假設(shè)服務(wù)重啟期間,某個生產(chǎn)者或者是Redis本身發(fā)布了一條消息到某個channel,由于服務(wù)重啟,沒有監(jiān)聽這個channel,那么這個消息自然就丟了。

消息消費只有廣播模式

Redis的發(fā)布訂閱模式消息消費只有廣播模式一種。

所謂的廣播模式就是多個消費者訂閱同一個channel,那么每個消費者都能消費到發(fā)布到這個channel的所有消息。

圖片圖片

如圖,生產(chǎn)者發(fā)布了一條消息,內(nèi)容為sanyou,那么兩個消費者都可以同時收到sanyou這條消息。

所以,如果通過監(jiān)聽channel來獲取延遲任務(wù),那么一旦服務(wù)實例有多個的話,還得保證消息不能重復(fù)處理,額外地增加了代碼開發(fā)量。

接收到所有key的某個事件

這個不屬于Redis發(fā)布訂閱模式的問題,而是Redis本身事件通知的問題。

當監(jiān)聽了__keyevent@<db>__:expired的channel,那么所有的Redis的key只要發(fā)生了過期事件都會被通知給消費者,不管這個key是不是消費者想接收到的。

所以如果你只想消費某一類消息的key,那么還得自行加一些標記,比如消息的key加個前綴,消費的時候判斷一下帶前綴的key就是需要消費的任務(wù)。

Redisson的RDelayedQueue

Redisson他是Redis的兒子(Redis son),基于Redis實現(xiàn)了非常多的功能,其中最常使用的就是Redis分布式鎖的實現(xiàn),但是除了實現(xiàn)Redis分布式鎖之外,它還實現(xiàn)了延遲隊列的功能。

先來個demo

引入pom。

<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.13.1</version>
</dependency>

封裝了一個RedissonDelayQueue類。

@Component
@Slf4j
public class RedissonDelayQueue {

    private RedissonClient redissonClient;

    private RDelayedQueue<String> delayQueue;
    private RBlockingQueue<String> blockingQueue;

    @PostConstruct
    public void init() {
        initDelayQueue();
        startDelayQueueConsumer();
    }

    private void initDelayQueue() {
        Config config = new Config();
        SingleServerConfig serverConfig = config.useSingleServer();
        serverConfig.setAddress("redis://localhost:6379");
        redissonClient = Redisson.create(config);

        blockingQueue = redissonClient.getBlockingQueue("SANYOU");
        delayQueue = redissonClient.getDelayedQueue(blockingQueue);
    }

    private void startDelayQueueConsumer() {
        new Thread(() -> {
            while (true) {
                try {
                    String task = blockingQueue.take();
                    log.info("接收到延遲任務(wù):{}", task);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }, "SANYOU-Consumer").start();
    }

    public void offerTask(String task, long seconds) {
        log.info("添加延遲任務(wù):{} 延遲時間:{}s", task, seconds);
        delayQueue.offer(task, seconds, TimeUnit.SECONDS);
    }

}

這個類在創(chuàng)建的時候會去初始化延遲隊列,創(chuàng)建一個RedissonClient對象,之后通過RedissonClient對象獲取到RDelayedQueue和RBlockingQueue對象,傳入的隊列名字叫SANYOU,這個名字無所謂。

當延遲隊列創(chuàng)建之后,會開啟一個延遲任務(wù)的消費線程,這個線程會一直從RBlockingQueue中通過take方法阻塞獲取延遲任務(wù)。

添加任務(wù)的時候是通過RDelayedQueue的offer方法添加的。

controller類,通過接口添加任務(wù),延遲時間為5s。

@RestController
public class RedissonDelayQueueController {

    @Resource
    private RedissonDelayQueue redissonDelayQueue;

    @GetMapping("/add")
    public void addTask(@RequestParam("task") String task) {
        redissonDelayQueue.offerTask(task, 5);
    }

}

啟動項目,在瀏覽器輸入如下連接,添加任務(wù)。

http://localhost:8080/add?task=sanyou

靜靜等待5s,成功獲取到任務(wù)。

圖片圖片

實現(xiàn)原理

如下是Redisson延遲隊列的實現(xiàn)原理:

圖片圖片

SANYOU前面的前綴都是固定的,Redisson創(chuàng)建的時候會拼上前綴。

  • redisson_delay_queue_timeout:SANYOU,sorted set數(shù)據(jù)類型,存放所有延遲任務(wù),按照延遲任務(wù)的到期時間戳(提交任務(wù)時的時間戳 + 延遲時間)來排序的,所以列表的最前面的第一個元素就是整個延遲隊列中最早要被執(zhí)行的任務(wù),這個概念很重要
  • redisson_delay_queue:SANYOU,list數(shù)據(jù)類型,也是存放所有的任務(wù),但是研究下來發(fā)現(xiàn)好像沒什么用。。
  • SANYOU,list數(shù)據(jù)類型,被稱為目標隊列,這個里面存放的任務(wù)都是已經(jīng)到了延遲時間的,可以被消費者獲取的任務(wù),所以上面demo中的RBlockingQueue的take方法是從這個目標隊列中獲取到任務(wù)的
  • redisson_delay_queue_channel:SANYOU,是一個channel,用來通知客戶端開啟一個延遲任務(wù)

任務(wù)提交的時候,Redisson會將任務(wù)放到redisson_delay_queue_timeout:SANYOU中,分數(shù)就是提交任務(wù)的時間戳+延遲時間,就是延遲任務(wù)的到期時間戳。

Redisson客戶端內(nèi)部通過監(jiān)聽redisson_delay_queue_channel:SANYOU這個channel來提交一個延遲任務(wù),這個延遲任務(wù)能夠保證將redisson_delay_queue_timeout:SANYOU中到了延遲時間的任務(wù)從redisson_delay_queue_timeout:SANYOU中移除,存到SANYOU這個目標隊列中。

于是消費者就可以從SANYOU這個目標隊列獲取到延遲任務(wù)了。

所以從這可以看出,Redisson的延遲任務(wù)的實現(xiàn)跟前面說的MQ的實現(xiàn)都是殊途同歸,最開始任務(wù)放到中間的一個地方,叫做redisson_delay_queue_timeout:SANYOU,然后會開啟一個類似于定時任務(wù)的一個東西,去判斷這個中間地方的消息是否到了延遲時間,到了再放到最終的目標的隊列供消費者消費。

Redisson的這種實現(xiàn)方式比監(jiān)聽Redis過期key的實現(xiàn)方式更加可靠,因為消息都存在list和sorted set數(shù)據(jù)類型中,所以消息很少丟。

上述說的兩種Redis的方案更詳細的介紹,可以查看我之前寫的用Redis實現(xiàn)延遲隊列,我研究了兩種方案,發(fā)現(xiàn)并不簡單這篇文章。

Netty的HashedWheelTimer

先來個demo

@Slf4j
public class NettyHashedWheelTimerDemo {

    public static void main(String[] args) {
        HashedWheelTimer timer = new HashedWheelTimer(100, TimeUnit.MILLISECONDS, 8);
        timer.start();

        log.info("提交延遲任務(wù)");
        timer.newTimeout(timeout -> log.info("執(zhí)行延遲任務(wù)"), 5, TimeUnit.SECONDS);
    }

}

測試結(jié)果:

圖片圖片

實現(xiàn)原理

圖片圖片

如圖,時間輪會被分成很多格子(上述demo中的8就代表了8個格子),一個格子代表一段時間(上述demo中的100就代表一個格子是100ms),所以上述demo中,每800ms會走一圈。

當任務(wù)提交的之后,會根據(jù)任務(wù)的到期時間進行hash取模,計算出這個任務(wù)的執(zhí)行時間所在具體的格子,然后添加到這個格子中,通過如果這個格子有多個任務(wù),會用鏈表來保存。所以這個任務(wù)的添加有點像HashMap儲存元素的原理。

HashedWheelTimer內(nèi)部會開啟一個線程,輪詢每個格子,找到到了延遲時間的任務(wù),然后執(zhí)行。

由于HashedWheelTimer也是單線程來處理任務(wù),所以跟Timer一樣,長時間運行的任務(wù)會導(dǎo)致其他任務(wù)的延時處理。

前面Redisson中提到的客戶端延遲任務(wù)就是基于Netty的HashedWheelTimer實現(xiàn)的。

Hutool的SystemTimer

Hutool工具類也提供了延遲任務(wù)的實現(xiàn)SystemTimer

demo

@Slf4j
public class SystemTimerDemo {

    public static void main(String[] args) {
        SystemTimer systemTimer = new SystemTimer();
        systemTimer.start();

        log.info("提交延遲任務(wù)");
        systemTimer.addTask(new TimerTask(() -> log.info("執(zhí)行延遲任務(wù)"), 5000));
    }

}

執(zhí)行結(jié)果

圖片圖片

Hutool底層其實也用到了時間輪。

Qurtaz

Qurtaz是一款開源作業(yè)調(diào)度框架,基于Qurtaz提供的api也可以實現(xiàn)延遲任務(wù)的功能。

demo

依賴

<dependency>
    <groupId>org.quartz-scheduler</groupId>
    <artifactId>quartz</artifactId>
    <version>2.3.2</version>
</dependency>

SanYouJob實現(xiàn)Job接口,當任務(wù)到達執(zhí)行時間的時候會調(diào)用execute的實現(xiàn),從context可以獲取到任務(wù)的內(nèi)容。

@Slf4j
public class SanYouJob implements Job {
    @Override
    public void execute(JobExecutionContext context) throws JobExecutionException {
        JobDetail jobDetail = context.getJobDetail();
        JobDataMap jobDataMap = jobDetail.getJobDataMap();
        log.info("獲取到延遲任務(wù):{}", jobDataMap.get("delayTask"));
    }
}

測試類

public class QuartzDemo {

    public static void main(String[] args) throws SchedulerException, InterruptedException {
        // 1.創(chuàng)建Scheduler的工廠
        SchedulerFactory sf = new StdSchedulerFactory();
        // 2.從工廠中獲取調(diào)度器實例
        Scheduler scheduler = sf.getScheduler();

        // 6.啟動 調(diào)度器
        scheduler.start();

        // 3.創(chuàng)建JobDetail,Job類型就是上面說的SanYouJob
        JobDetail jb = JobBuilder.newJob(SanYouJob.class)
                .usingJobData("delayTask", "這是一個延遲任務(wù)")
                .build();

        // 4.創(chuàng)建Trigger
        Trigger t = TriggerBuilder.newTrigger()
                //任務(wù)的觸發(fā)時間就是延遲任務(wù)到的延遲時間
                .startAt(DateUtil.offsetSecond(new Date(), 5))
                .build();

        // 5.注冊任務(wù)和定時器
        log.info("提交延遲任務(wù)");
        scheduler.scheduleJob(jb, t);
    }
}

執(zhí)行結(jié)果:

圖片圖片

實現(xiàn)原理

核心組件

  • Job:表示一個任務(wù),execute方法的實現(xiàn)是對任務(wù)的執(zhí)行邏輯
  • JobDetail:任務(wù)的詳情,可以設(shè)置任務(wù)需要的參數(shù)等信息
  • Trigger:觸發(fā)器,是用來觸發(fā)業(yè)務(wù)的執(zhí)行,比如說指定5s后觸發(fā)任務(wù),那么任務(wù)就會在5s后觸發(fā)
  • Scheduler:調(diào)度器,內(nèi)部可以注冊多個任務(wù)和對應(yīng)任務(wù)的觸發(fā)器,之后會調(diào)度任務(wù)的執(zhí)行

圖片圖片

啟動的時候會開啟一個QuartzSchedulerThread調(diào)度線程,這個線程會去判斷任務(wù)是否到了執(zhí)行時間,到的話就將任務(wù)交給任務(wù)線程池去執(zhí)行。

無限輪詢延遲任務(wù)

無限輪詢的意思就是開啟一個線程不停的去輪詢?nèi)蝿?wù),當這些任務(wù)到達了延遲時間,那么就執(zhí)行任務(wù)。

demo

@Slf4j
public class PollingTaskDemo {

    private static final List<DelayTask> DELAY_TASK_LIST = new CopyOnWriteArrayList<>();

    public static void main(String[] args) {
        new Thread(() -> {
            while (true) {
                try {
                    for (DelayTask delayTask : DELAY_TASK_LIST) {
                        if (delayTask.triggerTime <= System.currentTimeMillis()) {
                            log.info("處理延遲任務(wù):{}", delayTask.taskContent);
                            DELAY_TASK_LIST.remove(delayTask);
                        }
                    }
                    TimeUnit.MILLISECONDS.sleep(100);
                } catch (Exception e) {
                }
            }
        }).start();

        log.info("提交延遲任務(wù)");
        DELAY_TASK_LIST.add(new DelayTask("三友的java日記", 5L));
    }

    @Getter
    @Setter
    public static class DelayTask {

        private final String taskContent;

        private final Long triggerTime;

        public DelayTask(String taskContent, Long delayTime) {
            this.taskContent = taskContent;
            this.triggerTime = System.currentTimeMillis() + delayTime * 1000;
        }
    }

}

任務(wù)可以存在數(shù)據(jù)庫又或者是內(nèi)存,看具體的需求,這里我為了簡單就放在內(nèi)存里了。

執(zhí)行結(jié)果:

圖片圖片

這種操作簡單,但是就是效率低下,每次都得遍歷所有的任務(wù)。

最后

最后,本文所有示例代碼地址:

https://github.com/sanyou3/delay-task-demo.git
責任編輯:武曉燕 來源: 三友的java日記
相關(guān)推薦

2023-11-07 22:19:05

消息服務(wù)端care

2023-05-23 22:19:04

索引MySQL優(yōu)化

2024-09-09 23:15:55

2024-01-02 22:47:47

Nacos注冊中心節(jié)點

2023-02-27 22:03:06

數(shù)據(jù)庫內(nèi)存RocketMQ

2020-11-05 08:14:17

鏈表

2022-11-17 09:14:58

MySQL加行級鎖幻讀

2021-12-08 10:47:35

RabbitMQ 實現(xiàn)延遲

2023-10-31 12:58:00

TypeScriptJavaScript

2021-03-16 08:21:29

Spark系統(tǒng)并行

2023-03-30 08:28:57

explain關(guān)鍵字MySQL

2022-05-18 08:45:25

Nginx網(wǎng)絡(luò)代碼

2025-09-05 07:23:12

2022-05-31 09:36:18

JDKDelayQueueRedis

2025-05-08 07:19:01

2021-11-11 09:27:02

技術(shù)RedisMySQL

2021-10-18 11:58:56

負載均衡虛擬機

2024-07-19 08:34:18

2024-08-13 15:07:20

2023-01-06 08:15:58

StreamAPI接口
點贊
收藏

51CTO技術(shù)棧公眾號

97视频一区| 精品51国产黑色丝袜高跟鞋| 亚洲欧洲一区| av资源站一区| 91po在线观看91精品国产性色| 农村妇女精品一二区| 人人九九精品| 麻豆一区二区三| 久久久久久国产免费 | www在线免费观看视频| 高清视频一区二区| 日韩免费不卡av| 老熟妇高潮一区二区三区| 日韩精品a在线观看91| 欧美日韩一级片网站| 嫩草影院中文字幕| 国产精品影院在线| 盗摄精品av一区二区三区| 国产成人免费av| 妺妺窝人体色www在线下载| 久久av资源| 精品国产百合女同互慰| 中文字幕日韩综合| 自拍偷拍欧美视频| 不卡视频一二三四| 国产日韩欧美电影在线观看| 日本网站在线免费观看| 欧美成人自拍| 日韩精品一区二区视频| 爱情岛论坛亚洲自拍| 99久久婷婷国产综合精品首页| 成人黄色小视频在线观看| 国产精品久久久久久久9999| 永久免费看片在线播放| 天天天综合网| 亚洲视频在线观看视频| 人妻无码中文久久久久专区| 精品国产三级| 欧美日韩高清影院| 国产又粗又长又大的视频| 99色在线观看| 亚洲国产视频直播| 国产一级黄色录像片| 久久日韩视频| 中文字幕一区二区三区四区 | 三上悠亚在线一区| 欧美日韩五码| 久久精品夜夜夜夜久久| 国产三区二区一区久久| 亚洲va天堂va欧美ⅴa在线| 国产精品草草| 久久亚洲精品中文字幕冲田杏梨| 精品亚洲aⅴ无码一区二区三区| 欧美五码在线| 亚洲黄页视频免费观看| 午夜剧场免费看| 深夜成人在线| 亚洲大片一区二区三区| 国产一级做a爰片久久毛片男| 黄色大片在线播放| 最新国产精品久久精品| 精品国产福利| 色窝窝无码一区二区三区| 成人精品视频一区| 国产一区二区自拍| 免费福利在线视频| 国产一区不卡视频| 成人亚洲欧美一区二区三区| 一级特黄特色的免费大片视频| 蜜臀av一区二区| 国产在线a不卡| 91九色蝌蚪91por成人| 久久狠狠亚洲综合| 亚洲mm色国产网站| 亚洲AV无码一区二区三区性| 成人黄页毛片网站| 免费精品视频一区| 国产www.大片在线| 综合分类小说区另类春色亚洲小说欧美| 中文字幕99| 欧美另类tv| 都市激情亚洲色图| 精品久久久久久久无码 | 欧美黄在线观看| 精品网站999www| 久久精品国产亚洲av久| 999国产精品一区| 日韩成人小视频| 能看毛片的网站| 久久91在线| 一二美女精品欧洲| 日韩欧美中文字幕视频| 日韩亚洲精品在线| 国产精品久久二区| www.com在线观看| 久久久影院官网| 在线看无码的免费网站| 黄色污污视频在线观看| 欧美丝袜自拍制服另类| 国产无套精品一区二区三区| 日韩成人一级| 久久精品男人天堂| 四虎精品永久在线| 国产中文一区二区三区| 精品国产91亚洲一区二区三区www| 3d成人动漫在线| 久久久久久夜精品精品免费| 亚洲永久一区二区三区在线| free性欧美| 欧美精品18+| 欧美第一页浮力影院| 欧美黑人疯狂性受xxxxx野外| 亚洲精品一二三区| 日韩少妇内射免费播放18禁裸乳| 青春草在线免费视频| 色偷偷88欧美精品久久久| 99热这里只有精品2| 蜜桃一区二区| 欧美激情一二区| 亚洲午夜激情视频| 99久久综合精品| 影音先锋男人的网站| 二区三区不卡| 精品成人一区二区三区| 黄视频网站免费看| 日本91福利区| 免费看成人片| gogo久久| 精品美女一区二区| 少妇高潮在线观看| 久久一区二区三区超碰国产精品| 99久热re在线精品996热视频| 成人在线免费看| 欧美视频在线看| 极品白嫩的小少妇| 中国成人一区| 91精品视频观看| 永久免费在线观看视频| 色噜噜狠狠成人网p站| 人妻少妇精品视频一区二区三区| 美女午夜精品| 欧美高清视频在线| 一区二区三区精| 国产精品色哟哟网站| 久久综合伊人77777麻豆最新章节| 久久99精品久久久久久欧洲站 | 欧美野外wwwxxx| 欧美一级在线免费| 91香蕉国产线在线观看| 日韩在线不卡| 国产精品自产拍在线观| 国产农村妇女毛片精品| 国产精品丝袜黑色高跟| 日本免费观看网站| 国产精品一区2区3区| 日韩免费视频在线观看| 免费国产在线观看| 欧美私模裸体表演在线观看| 欧美亚洲色综久久精品国产| 美女在线观看视频一区二区| 亚洲一区免费看| 亚洲精品三区| 欧美日韩福利在线观看| www精品国产| 亚洲国产aⅴ天堂久久| 亚洲精品乱码久久久久久蜜桃图片| 国产综合激情| 精品国产_亚洲人成在线| 亚洲天堂电影| 在线观看久久av| 91亚洲精品国偷拍自产在线观看 | 久久电影国产免费久久电影| 99视频国产精品免费观看| 日韩另类在线| 日韩av中文字幕在线免费观看| 久久久久久久久久久久久久av| 91美女视频网站| 99免费视频观看| 天天综合精品| 91麻豆蜜桃| 男女羞羞在线观看| 一区二区三区久久精品| 国产精品久久综合青草亚洲AV| 国产精品你懂的在线| 一级日本黄色片| 制服诱惑一区二区| 亚洲精品国产一区| 中文字幕日韩高清在线| 欧美孕妇性xx| 日本中文字幕在线观看| 精品国产免费久久| 日本精品入口免费视频| 亚洲免费在线视频一区 二区| 国产精品扒开腿做爽爽爽a片唱戏| 麻豆91精品| 久99久在线| 福利精品在线| 国内成人精品视频| 成人av无码一区二区三区| 疯狂做受xxxx欧美肥白少妇| 麻豆一区在线观看| 成人久久视频在线观看| 2025韩国理伦片在线观看| 国内一区二区三区| 深田咏美在线x99av| 综合在线影院| 欧美成人免费全部| 国产毛片在线看| 日韩一区二区影院| 中文字幕免费高清网站| 一区二区三区丝袜| 国产精品久久免费观看| 成人午夜看片网址| 一二三级黄色片| 久久不射中文字幕| 欧美成人免费在线观看视频| 久久久久久久久久久妇女| 免费在线一区二区| 爱高潮www亚洲精品| 国产在线日韩在线| 日本综合视频| 欧美一级视频一区二区| 国产网红女主播精品视频| xxav国产精品美女主播| 国产天堂在线| 亚洲精品在线看| 成人午夜免费在线观看| 91精品国产色综合久久不卡蜜臀| 中文字幕xxxx| 欧美视频在线免费看| 久久久久97国产| 亚洲女同一区二区| 女性裸体视频网站| 欧美国产精品专区| 老熟妇一区二区| 91美女福利视频| 强迫凌虐淫辱の牝奴在线观看| 国产宾馆实践打屁股91| 国产xxxxhd| 国产美女在线观看一区| 粉色视频免费看| 久久综合综合久久综合| 色戒在线免费观看| 久久精品国产一区二区| 成人免费在线观看视频网站| 天堂午夜影视日韩欧美一区二区| 波多野结衣综合网| 一区二区激情| 久久久久免费看黄a片app| 亚洲国产高清一区二区三区| 韩日视频在线观看| 日韩视频在线一区二区三区| 国产人妻777人伦精品hd| 9色国产精品| 亚洲国产精品久久久久爰色欲| 一本色道久久综合一区| 欧美性久久久久| 日韩精品一区第一页| 久久国产乱子伦免费精品| 天堂成人免费av电影一区| 别急慢慢来1978如如2| 奇米精品一区二区三区四区| www.超碰97.com| 国产一区二区不卡老阿姨| 亚洲午夜久久久久久久久| 99亚偷拍自图区亚洲| 国产精品亚洲无码| 中文字幕乱码日本亚洲一区二区 | 欧美有码在线视频| 欧美大片1688网站| 成人日韩av在线| 中文字幕一区二区三区中文字幕| 国产主播一区二区三区四区| 美女毛片一区二区三区四区| 亚洲一区二区在| 精品成人在线| 999精品网站| 国精产品一区一区三区mba桃花| 女人扒开双腿让男人捅| 91在线国产福利| 欧美成人久久久免费播放| 一区二区三区四区精品在线视频 | 欧美 日本 国产| 欧美极品美女视频| 日韩欧美中文字幕视频| 色婷婷综合久久久中文一区二区| 在线免费av片| 精品久久久久av影院| 国产高清美女一级毛片久久| 欧美成人黑人xx视频免费观看| 国产美女高潮在线观看| 国产精品久久久久福利| 6080成人| 亚洲国产精品一区二区第一页| 欧美成人一区二免费视频软件| 免费日韩中文字幕| 国产高清在线观看免费不卡| 成人精品999| 亚洲黄色尤物视频| 怡红院av久久久久久久| 日韩精品一区二区三区在线播放| 免费黄色在线视频网站| 欧美—级高清免费播放| 成人免费在线观看视频| 国产午夜精品在线| 午夜激情久久| 国产午夜伦鲁鲁| 国产一区二区免费在线| 黄免费在线观看| 亚洲影院理伦片| 一卡二卡在线视频| 亚洲欧美国产一区二区三区| 欧美人动性xxxxz0oz| 国产区精品在线观看| 视频精品在线观看| 国产freexxxx性播放麻豆| 精品亚洲国产成人av制服丝袜| 国产人妻一区二区| 午夜一区二区三区在线观看| 国产乱码久久久久| 色先锋资源久久综合5566| 男女污视频在线观看| 久久6免费高清热精品| 色8久久久久| 日韩片电影在线免费观看| 1024日韩| 国产亚洲精品成人a| 亚洲欧美区自拍先锋| 久久精品国产亚洲av麻豆蜜芽| 日韩久久精品成人| 老司机深夜福利在线观看| 99蜜桃在线观看免费视频网站| 中文精品久久| 天天摸天天舔天天操| 国产精品毛片久久久久久| 国产在线一级片| 亚洲天堂2020| 欧美三级精品| 日本一区视频在线播放| 噜噜噜躁狠狠躁狠狠精品视频| 在线 丝袜 欧美 日韩 制服| 午夜国产精品影院在线观看| 亚洲精品国产精品乱码不卡| 欧美成人在线网站| 欧美成人精品一级| 天天想你在线观看完整版电影免费| 久久精品国产成人一区二区三区| 少妇久久久久久久久久| 在线欧美小视频| jizz日韩| 国产日韩欧美视频| 一区二区蜜桃| 91人人澡人人爽| 亚洲国产成人91porn| 男人天堂综合网| 91豆花精品一区| 自拍偷拍精品| 五月婷婷狠狠操| 国产精品久久久久久久第一福利| 亚洲天堂2021av| 美女av一区二区三区| 免费看日产一区二区三区| 波多野结衣av一区二区全免费观看| 成人av免费网站| 永久免费无码av网站在线观看| 亚洲图片欧洲图片av| 在线看一级片| 国产精品中出一区二区三区| 亚洲一区国产| 国产午夜福利一区| 69成人精品免费视频| 日韩av成人| 国产精品成人v| 亚洲网色网站| 亚洲欧美高清在线| 一本一道波多野结衣一区二区| 成人免费在线电影| 91亚洲一区精品| 99国内精品| 又嫩又硬又黄又爽的视频| 欧美一区二区日韩| 欧美三级网站| 亚洲成年人专区| 99久久伊人精品| 这里只有久久精品视频| 欧美老女人性视频| 偷拍亚洲精品| 国产高清999| 欧美午夜精品久久久久久久| 在线观看麻豆蜜桃| 国产在线欧美日韩| 蜜臀av国产精品久久久久| 国产一级二级三级| 一区二区成人精品| 99re热精品视频| 亚洲色图久久久| 亚洲国产美国国产综合一区二区| av大片在线播放| 国产一区二区精品免费| 久久成人av少妇免费|