精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

「干掉定時(shí)任務(wù)!」Spring Boot + DelayQueue 手把手教你玩轉(zhuǎn)分布式延時(shí)隊(duì)列!

開發(fā) 前端
我們重構(gòu)并完整實(shí)現(xiàn)了一個(gè)基于 Spring Boot + DelayQueue 的分布式延時(shí)任務(wù)組件,滿足生產(chǎn)級(jí)別的三個(gè)核心要求:不丟失、不重復(fù)、準(zhǔn)時(shí)觸發(fā)。

為什么我們不再依賴定時(shí)任務(wù)

在很多分布式業(yè)務(wù)系統(tǒng)中,延時(shí)任務(wù)(例如訂單超時(shí)取消、活動(dòng)自動(dòng)結(jié)束、消息狀態(tài)回調(diào)等)是一種剛需功能。 過去,開發(fā)者通常使用 @Scheduled 或者 Quartz 等定時(shí)任務(wù)框架實(shí)現(xiàn),但它們存在幾個(gè)痛點(diǎn):

  • 任務(wù)分配不均:多節(jié)點(diǎn)部署后,容易出現(xiàn)任務(wù)重復(fù)執(zhí)行或遺漏執(zhí)行;
  • 缺乏靈活性:修改任務(wù)周期需重啟服務(wù);
  • 高可用困難:單節(jié)點(diǎn)宕機(jī)導(dǎo)致任務(wù)中斷;
  • 時(shí)間精度受限:無法保證“準(zhǔn)點(diǎn)”觸發(fā)。

本文將手把手帶你使用 Spring Boot + DelayQueue 實(shí)現(xiàn)一個(gè)高性能、高可用、可橫向擴(kuò)展的 分布式延時(shí)任務(wù)系統(tǒng)。 通過該方案,我們實(shí)現(xiàn)三大目標(biāo):

不丟失、不重復(fù)、 要準(zhǔn)時(shí)。

系統(tǒng)總體設(shè)計(jì)

整體組件架構(gòu)分為四個(gè)核心模塊

  1. 協(xié)調(diào)服務(wù)(Coordinator) 負(fù)責(zé)節(jié)點(diǎn)注冊(cè)與發(fā)現(xiàn)、心跳?;?、健康監(jiān)測(cè)與任務(wù)轉(zhuǎn)移。
  2. 任務(wù)存儲(chǔ)(TaskStorage) 負(fù)責(zé)任務(wù)數(shù)據(jù)的持久化與執(zhí)行狀態(tài)追蹤。
  3. 任務(wù)執(zhí)行器(DelayTaskExecutor) 執(zhí)行業(yè)務(wù)邏輯的回調(diào)接口。
  4. 分布式延時(shí)隊(duì)列(DistributedDelayQueue) 系統(tǒng)的統(tǒng)一入口,封裝任務(wù)分配、調(diào)度、執(zhí)行與節(jié)點(diǎn)管理。

系統(tǒng)的核心目標(biāo)與消息隊(duì)列一致:

不丟、不重、不堆積 —— 對(duì)應(yīng)延時(shí)隊(duì)列的三要素:穩(wěn)定、精確、可擴(kuò)展。

代碼實(shí)現(xiàn)

pom.xml 與 application.yml

文件:/pom.xml

<!-- 只列核心依賴,按需補(bǔ)充版本 -->
<project xmlns="http://maven.apache.org/POM/4.0.0" ...>
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.icoderoad</groupId>
  <artifactId>distributed-delay-queue</artifactId>
  <version>1.0.0</version>
  <properties>
    <java.version>17</java.version>
    <spring.boot.version>3.1.0</spring.boot.version>
  </properties>
  <dependencies>
    <!-- Spring Boot Starter -->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter</artifactId>
      <version>${spring.boot.version}</version>
    </dependency>


    <!-- Web -->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
    </dependency>


    <!-- Redis -->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>


    <!-- Redisson -->
    <dependency>
      <groupId>org.redisson</groupId>
      <artifactId>redisson-spring-boot-starter</artifactId>
      <version>3.20.0</version>
    </dependency>


    <!-- Lombok -->
    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
      <optional>true</optional>
    </dependency>


    <!-- For logging -->
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
    </dependency>


    <!-- Test -->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
      <scope>test</scope>
    </dependency>
  </dependencies>
  <build>
    <plugins>
      <!-- maven compiler etc. -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <configuration>
          <source>${java.version}</source>
          <target>${java.version}</target>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>

文件:/src/main/resources/application.yml

server:
  port: 8080


spring:
  redis:
    host: localhost
    port: 6379
    # 如需密碼請(qǐng)配置
    # password: your_password


ptc:
  delay:
    heartbeat-period: 30          # 心跳周期(s)
    pull-initial-delay: 0         # 拉取數(shù)據(jù)延時(shí)啟動(dòng)(s)
    pull-period: 600              # 拉取周期(s) 默認(rèn)10分鐘
    health-initial-delay: 120     # 健康檢查延時(shí)啟動(dòng)(s)
    health-period: 180            # 健康檢查周期(s)
    remove-initial-delay: 3600    # 已執(zhí)行記錄刪除延遲(默認(rèn)1小時(shí))
    remove-period: 3600           # 已執(zhí)行記錄刪除周期

常量與屬性

文件:/src/main/java/com/icoderoad/delayqueue/constants/DelayConstant.java

package com.icoderoad.delayqueue.constants;


/**
 * DelayQueue 相關(guān)常量
 */
public class DelayConstant {
    public static final String NODES_KEY = "ptc:nodes";
    public static final String NODE_HEARTBEAT_KEY_PREFIX = "ptc:heartbeat:";
    public static final String NODES_HEALTH_KEY = "ptc:nodes:health";
    public static final String DELAY_TASK_KEY_PREFIX = "ptc:delay:task:";
    public static final String DELAY_EXECUTED_KEY = "ptc:delay:executed";
    public static final String DELAY_EXECUTING_KEY_PREFIX = "ptc:delay:executing:";
}

文件:/src/main/java/com/icoderoad/delayqueue/config/DelayProperties.java

package com.icoderoad.delayqueue.config;


import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;


/**
 * 可配置屬性
 */
@Data
@ConfigurationProperties(prefix = "ptc.delay")
public class DelayProperties {
    private Integer heartbeatPeriod = 30;
    private Integer pullInitialDelay = 0;
    private Integer pullPeriod = 10 * 60;
    private Integer healthInitialDelay = 2 * 60;
    private Integer healthPeriod = 3 * 60;
    private Integer removeInitialDelay = 60 * 60;
    private Integer removePeriod = 60 * 60;
}

實(shí)體類:DelayTask

文件:/src/main/java/com/icoderoad/delayqueue/core/DelayTask.java

package com.icoderoad.delayqueue.core;


import lombok.Data;
import org.springframework.lang.NonNull;


import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;


/**
 * 延時(shí)任務(wù)實(shí)體 - 實(shí)現(xiàn) JDK Delayed 接口以支持 DelayQueue
 */
@Data
public class DelayTask implements Delayed {


    private final String queueName;
    private final String taskId;
    private final long executeTime;


    public DelayTask(String queueName, String taskId, long executeTime) {
        this.queueName = queueName;
        this.taskId = taskId;
        this.executeTime = executeTime;
    }


    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(executeTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }


    @Override
    public int compareTo(@NonNull Delayed o) {
        return Long.compare(this.executeTime, ((DelayTask) o).getExecuteTime());
    }


    public String queueTaskId() {
        return this.queueName + ":" + this.taskId;
    }
}

執(zhí)行器接口

文件:/src/main/java/com/icoderoad/delayqueue/executor/DelayTaskExecutor.java

package com.icoderoad.delayqueue.executor;


import com.icoderoad.delayqueue.core.DelayTask;


/**
 * 延時(shí)任務(wù)業(yè)務(wù)執(zhí)行器接口
 */
public interface DelayTaskExecutor {


    void run(DelayTask delayedTask);


    String queueName();
}

存儲(chǔ)接口與 Redis 實(shí)現(xiàn)

文件:/src/main/java/com/icoderoad/delayqueue/storage/TaskStorage.java

package com.icoderoad.delayqueue.storage;


import com.icoderoad.delayqueue.core.DelayTask;


import java.util.List;


/**
 * 任務(wù)存儲(chǔ)接口 - 支持多種后端實(shí)現(xiàn)
 */
public interface TaskStorage {
    void addTask(DelayTask task);
    void removeTask(DelayTask task);
    List<DelayTask> listTask(String queueName, Long startTime, Long endTime);
    void addExecutedTask(DelayTask task);
    boolean isExecuted(DelayTask task);
    void removeExecutedTask(Long startTime, Long endTime);
}

文件:/src/main/java/com/icoderoad/delayqueue/storage/impl/RedisTaskStorage.java

package com.icoderoad.delayqueue.storage.impl;


import com.icoderoad.delayqueue.constants.DelayConstant;
import com.icoderoad.delayqueue.core.DelayTask;
import com.icoderoad.delayqueue.storage.TaskStorage;
import jakarta.annotation.Resource;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.ZSetOperations;
import org.springframework.util.CollectionUtils;


import java.util.ArrayList;
import java.util.List;
import java.util.Set;


/**
 * Redis 實(shí)現(xiàn) - 使用 ZSet 存儲(chǔ)任務(wù) (score = executeTime)
 */
public class RedisTaskStorage implements TaskStorage {


    @Resource
    private StringRedisTemplate stringRedisTemplate;


    @Override
    public void addTask(DelayTask task) {
        stringRedisTemplate.opsForZSet().add(DelayConstant.DELAY_TASK_KEY_PREFIX + task.getQueueName(),
                task.getTaskId(), task.getExecuteTime());
    }


    @Override
    public void removeTask(DelayTask task) {
        stringRedisTemplate.opsForZSet().remove(DelayConstant.DELAY_TASK_KEY_PREFIX + task.getQueueName(),
                task.getTaskId());
    }


    @Override
    public List<DelayTask> listTask(String queueName, Long startTime, Long endTime) {
        Set<ZSetOperations.TypedTuple<String>> tuples = stringRedisTemplate.opsForZSet()
                .rangeByScoreWithScores(DelayConstant.DELAY_TASK_KEY_PREFIX + queueName, startTime, endTime);
        List<DelayTask> result = new ArrayList<>();
        if (CollectionUtils.isEmpty(tuples)) {
            return result;
        }
        for (ZSetOperations.TypedTuple<String> t : tuples) {
            Double score = t.getScore();
            if (score == null) continue;
            result.add(new DelayTask(queueName, t.getValue(), score.longValue()));
        }
        return result;
    }


    @Override
    public void addExecutedTask(DelayTask task) {
        stringRedisTemplate.opsForZSet().add(DelayConstant.DELAY_EXECUTED_KEY, task.queueTaskId(), System.currentTimeMillis());
    }


    @Override
    public boolean isExecuted(DelayTask task) {
        Double score = stringRedisTemplate.opsForZSet().score(DelayConstant.DELAY_EXECUTED_KEY, task.queueTaskId());
        if (score == null) return false;
        return task.getExecuteTime() <= score.longValue();
    }


    @Override
    public void removeExecutedTask(Long startTime, Long endTime) {
        stringRedisTemplate.opsForZSet().removeRangeByScore(DelayConstant.DELAY_EXECUTED_KEY, startTime, endTime);
    }
}

(Coordinator)接口與 Redis 實(shí)現(xiàn)

文件:/src/main/java/com/icoderoad/delayqueue/cluster/Coordinator.java

package com.icoderoad.delayqueue.cluster;


import java.util.List;
import java.util.function.BiConsumer;


/**
 * 協(xié)調(diào)器接口:節(jié)點(diǎn)注冊(cè)、心跳、健康檢查、獲取活躍節(jié)點(diǎn)等
 */
public interface Coordinator {
    String registerNode();
    void unRegisterNode(String nodeId);
    List<String> getActiveNodes();
    void heartBeat(String nodeId);
    void checkClusterHealth(BiConsumer<List<String>, List<String>> consumer);
}

文件:/src/main/java/com/icoderoad/delayqueue/cluster/impl/RedisCoordinator.java

package com.icoderoad.delayqueue.cluster.impl;


import com.icoderoad.delayqueue.cluster.Coordinator;
import com.icoderoad.delayqueue.constants.DelayConstant;
import com.icoderoad.delayqueue.config.DelayProperties;
import jakarta.annotation.Resource;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.util.CollectionUtils;


import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;


/**
 * 基于 Redis 的協(xié)調(diào)器實(shí)現(xiàn)
 */
public class RedisCoordinator implements Coordinator {


    private static final Logger logger = LoggerFactory.getLogger(RedisCoordinator.class);


    @Resource
    private StringRedisTemplate stringRedisTemplate;


    @Resource
    private RedissonClient redissonClient;


    @Resource
    private DelayProperties delayProperties;


    private final ScheduledExecutorService heartbeatExecutor = Executors.newSingleThreadScheduledExecutor();


    @Override
    public String registerNode() {
        String nodeId = UUID.randomUUID().toString().replace("-", "");
        stringRedisTemplate.opsForList().rightPush(DelayConstant.NODES_KEY, nodeId);
        setNodeHeartbeat(nodeId);
        return nodeId;
    }


    @Override
    public void unRegisterNode(String nodeId) {
        stringRedisTemplate.opsForList().remove(DelayConstant.NODES_KEY, 1, nodeId);
        stringRedisTemplate.delete(DelayConstant.NODE_HEARTBEAT_KEY_PREFIX + nodeId);
        heartbeatExecutor.shutdownNow();
    }


    @Override
    public List<String> getActiveNodes() {
        List<String> nodes = stringRedisTemplate.opsForList().range(DelayConstant.NODES_KEY, 0, -1);
        if (CollectionUtils.isEmpty(nodes)) return new ArrayList<>();
        List<String> active = new ArrayList<>();
        for (String node : nodes) {
            Boolean exist = stringRedisTemplate.hasKey(DelayConstant.NODE_HEARTBEAT_KEY_PREFIX + node);
            if (Boolean.TRUE.equals(exist)) active.add(node);
        }
        return active;
    }


    @Override
    public void heartBeat(String nodeId) {
        heartbeatExecutor.scheduleAtFixedRate(() -> setNodeHeartbeat(nodeId),
                delayProperties.getHeartbeatPeriod(), delayProperties.getHeartbeatPeriod(), TimeUnit.SECONDS);
    }


    @Override
    public void checkClusterHealth(BiConsumer<List<String>, List<String>> consumer) {
        RLock lock = redissonClient.getLock(DelayConstant.NODES_HEALTH_KEY);
        boolean locked = false;
        try {
            locked = lock.tryLock();
            if (!locked) return;


            List<String> nodes = stringRedisTemplate.opsForList().range(DelayConstant.NODES_KEY, 0, -1);
            if (CollectionUtils.isEmpty(nodes)) return;


            List<String> dead = new ArrayList<>();
            for (String node : nodes) {
                Boolean exist = stringRedisTemplate.hasKey(DelayConstant.NODE_HEARTBEAT_KEY_PREFIX + node);
                if (!Boolean.TRUE.equals(exist)) dead.add(node);
            }


            if (!dead.isEmpty()) {
                // 從列表中移除 dead 節(jié)點(diǎn)(逐個(gè)刪除以保持原子性)
                dead.forEach(node -> stringRedisTemplate.opsForList().remove(DelayConstant.NODES_KEY, 1, node));
                // callback: 讓調(diào)用者處理任務(wù)遷移
                consumer.accept(nodes, dead);
            }
        } catch (Exception e) {
            logger.error("checkClusterHealth error", e);
        } finally {
            if (locked) {
                try { lock.unlock(); } catch (Exception ignored) {}
            }
        }
    }


    private void setNodeHeartbeat(String nodeId) {
        stringRedisTemplate.opsForValue().set(DelayConstant.NODE_HEARTBEAT_KEY_PREFIX + nodeId,
                "1", delayProperties.getHeartbeatPeriod() * 2, TimeUnit.SECONDS);
    }
}

線程池包裝(PlasticeneThreadExecutor)

文件:/src/main/java/com/icoderoad/delayqueue/util/PlasticeneThreadExecutor.java

package com.icoderoad.delayqueue.util;


import java.util.concurrent.*;


/**
 * 簡易線程池包裝,作為工作線程池
 */
public class PlasticeneThreadExecutor {


    private final ThreadPoolExecutor executor;


    public PlasticeneThreadExecutor(int corePoolSize, int maximumPoolSize, long keepAliveMillis, String threadNamePrefix) {
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(10000);
        ThreadFactory factory = r -> {
            Thread t = new Thread(r);
            t.setName(threadNamePrefix + "-" + t.getId());
            t.setDaemon(false);
            return t;
        };
        executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveMillis, TimeUnit.MILLISECONDS, queue, factory,
                new ThreadPoolExecutor.CallerRunsPolicy());
    }


    public void submit(Runnable task) {
        executor.submit(task);
    }


    public void shutdown() {
        executor.shutdown();
    }
}

分布式延時(shí)隊(duì)列核心:DistributedDelayQueue

文件:/src/main/java/com/icoderoad/delayqueue/core/DistributedDelayQueue.java

這是系統(tǒng)的核心,邏輯較長 — 我將完整給出,并在方法前添加注釋以便理解。

package com.icoderoad.delayqueue.core;


import com.icoderoad.delayqueue.cluster.Coordinator;
import com.icoderoad.delayqueue.config.DelayProperties;
import com.icoderoad.delayqueue.constants.DelayConstant;
import com.icoderoad.delayqueue.executor.DelayTaskExecutor;
import com.icoderoad.delayqueue.storage.TaskStorage;
import com.icoderoad.delayqueue.util.PlasticeneThreadExecutor;
import jakarta.annotation.Resource;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.util.CollectionUtils;


import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;


/**
 * 分布式延時(shí)隊(duì)列核心實(shí)現(xiàn)
 */
public class DistributedDelayQueue implements InitializingBean, ApplicationListener<ContextClosedEvent> {


    private static final Logger logger = LoggerFactory.getLogger(DistributedDelayQueue.class);


    @Resource
    private Coordinator coordinator;


    @Resource
    private TaskStorage taskStorage;


    @Resource
    private RedissonClient redissonClient;


    @Resource
    private DelayProperties delayProperties;


    // 可選注入的業(yè)務(wù)執(zhí)行器
    @Autowired(required = false)
    private List<DelayTaskExecutor> taskExecutors;


    // 隊(duì)列名稱 -> DelayQueue
    private final ConcurrentMap<String, DelayQueue<DelayTask>> delayMap = new ConcurrentHashMap<>();


    private final Set<String> queueNameSet = new HashSet<>();
    private final Set<String> taskIdSet = new HashSet<>(); // 本節(jié)點(diǎn)已加載任務(wù) id
    private String nodeId;
    private final AtomicBoolean running = new AtomicBoolean(false);


    private final ScheduledExecutorService loadExecutor = Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r); t.setName("ddq-load-executor"); return t;
    });
    private final ScheduledExecutorService runningExecutor = Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r); t.setName("ddq-running-executor"); return t;
    });
    private final ScheduledExecutorService healthExecutor = Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r); t.setName("ddq-health-executor"); return t;
    });
    private final ScheduledExecutorService removeExecutor = Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r); t.setName("ddq-remove-executor"); return t;
    });


    private final PlasticeneThreadExecutor workExecutor = new PlasticeneThreadExecutor(
            Runtime.getRuntime().availableProcessors() + 1,
            Runtime.getRuntime().availableProcessors() * 5,
            1000,
            "delay-consumer"
    );


    /**
     * 添加延遲任務(wù)(業(yè)務(wù)入口)
     * 1) 若該任務(wù)在下一次拉取周期內(nèi)會(huì)到期,則直接放入本地 DelayQueue 以保證準(zhǔn)時(shí);
     * 2) 同時(shí)持久化到存儲(chǔ)(Redis 等)
     */
    public void addTask(DelayTask delayTask) {
        long executeTime = delayTask.getExecuteTime();
        long afterTime = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(delayProperties.getPullPeriod());
        if (afterTime > executeTime) {
            offerTask(delayTask);
        }
        taskStorage.addTask(delayTask);
    }


    @Override
    public void afterPropertiesSet() {
        if (CollectionUtils.isEmpty(taskExecutors)) {
            // 如果沒有任何業(yè)務(wù)執(zhí)行器,組件無需初始化
            logger.warn("no DelayTaskExecutor found, DistributedDelayQueue will not start.");
            return;
        }
        // 收集隊(duì)列名
        taskExecutors.forEach(executor -> queueNameSet.add(executor.queueName()));


        // 注冊(cè)節(jié)點(diǎn)并啟動(dòng)心跳
        this.nodeId = coordinator.registerNode();
        coordinator.heartBeat(this.nodeId);


        // 啟動(dòng)工作線程(掃描本地 DelayQueues)
        running.set(true);
        runningExecutor.submit(() -> {
            while (running.get()) {
                try {
                    for (Map.Entry<String, DelayQueue<DelayTask>> entry : delayMap.entrySet()) {
                        DelayQueue<DelayTask> dq = entry.getValue();
                        DelayTask task = dq.poll(1, TimeUnit.SECONDS);
                        executeTask(task);
                    }
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    running.set(false);
                    Thread.currentThread().interrupt();
                    logger.error("DistributedDelayQueue worker interrupted", e);
                } catch (Exception e) {
                    logger.error("DistributedDelayQueue worker error", e);
                }
            }
        });


        // 啟動(dòng)定時(shí)拉取任務(wù)(拉取未來 pullPeriod 時(shí)間內(nèi)的任務(wù))
        loadExecutor.scheduleAtFixedRate(this::loadTask,
                delayProperties.getPullInitialDelay(),
                delayProperties.getPullPeriod(),
                TimeUnit.SECONDS);


        // 健康檢查:檢測(cè)下線節(jié)點(diǎn)并觸發(fā)任務(wù)遷移
        healthExecutor.scheduleWithFixedDelay(() ->
                        coordinator.checkClusterHealth(this::moveOfflineNodeTask),
                delayProperties.getHealthInitialDelay(),
                delayProperties.getHealthPeriod(),
                TimeUnit.SECONDS);


        // 定期刪除已執(zhí)行的歷史數(shù)據(jù)
        removeExecutor.scheduleAtFixedRate(() -> {
                    long endTime = System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(delayProperties.getRemovePeriod());
                    taskStorage.removeExecutedTask(0L, endTime);
                }, delayProperties.getRemoveInitialDelay(),
                delayProperties.getRemovePeriod(), TimeUnit.SECONDS);


        logger.info("DistributedDelayQueue initialized: nodeId={}", this.nodeId);
    }


    @Override
    public void onApplicationEvent(ContextClosedEvent event) {
        running.set(false);
        // 注銷節(jié)點(diǎn),優(yōu)雅下線
        try {
            coordinator.unRegisterNode(this.nodeId);
        } catch (Exception e) {
            logger.warn("unregister node failed", e);
        }
        healthExecutor.shutdownNow();
        loadExecutor.shutdownNow();
        runningExecutor.shutdownNow();
        workExecutor.shutdown();
        removeExecutor.shutdownNow();
        delayMap.clear();
        taskIdSet.clear();
        queueNameSet.clear();
        logger.info("DistributedDelayQueue destroyed: nodeId={}", this.nodeId);
    }


    /**
     * 執(zhí)行任務(wù):用線程池異步執(zhí)行,執(zhí)行前通過分布式鎖保證冪等性
     */
    private void executeTask(DelayTask task) {
        if (task == null) return;


        workExecutor.submit(() -> {
            taskIdSet.remove(task.queueTaskId());
            RLock lock = redissonClient.getLock(DelayConstant.DELAY_EXECUTING_KEY_PREFIX + task.queueTaskId());
            boolean locked = false;
            try {
                locked = lock.tryLock();
                if (!locked) return;


                // 從存儲(chǔ)中移除(刪除原始任務(wù))
                taskStorage.removeTask(task);


                // 檢查是否執(zhí)行過
                if (taskStorage.isExecuted(task)) {
                    logger.info("task already executed: {}", task.queueTaskId());
                    return;
                }


                // 先記錄執(zhí)行過
                taskStorage.addExecutedTask(task);


                // 找到對(duì)應(yīng)的 executor 并執(zhí)行
                for (DelayTaskExecutor executor : taskExecutors) {
                    if (Objects.equals(executor.queueName(), task.getQueueName())) {
                        try {
                            executor.run(task);
                        } catch (Exception e) {
                            // 執(zhí)行業(yè)務(wù)異常:可加重試、告警等機(jī)制(此處只記錄)
                            logger.error("business executor error for task: {}", task.queueTaskId(), e);
                        }
                    }
                }
                logger.info("task execute success: {}", task.queueTaskId());
            } catch (Exception e) {
                logger.error("executeTask error: {}", task.queueTaskId(), e);
            } finally {
                if (locked) {
                    try { lock.unlock(); } catch (Exception ignored) {}
                }
            }
        });
    }


    /**
     * 當(dāng)檢測(cè)到節(jié)點(diǎn)下線時(shí),將下線節(jié)點(diǎn)負(fù)責(zé)的任務(wù)遷移到本節(jié)點(diǎn)(或其它活躍節(jié)點(diǎn))
     */
    private void moveOfflineNodeTask(List<String> originNodes, List<String> deadNodes) {
        if (!running.get()) return;
        if (CollectionUtils.isEmpty(originNodes) || CollectionUtils.isEmpty(deadNodes)) return;


        long end = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(delayProperties.getPullPeriod());


        // 對(duì)于每個(gè)下線節(jié)點(diǎn),掃描各業(yè)務(wù)隊(duì)列里執(zhí)行時(shí)間在 [0, end] 的任務(wù),按照哈希判斷是否曾經(jīng)分配到該下線節(jié)點(diǎn)
        for (String deadNode : deadNodes) {
            for (String queueName : queueNameSet) {
                List<DelayTask> tasks = taskStorage.listTask(queueName, 0L, end);
                for (DelayTask t : tasks) {
                    int hash = Math.abs(t.getTaskId().hashCode());
                    int index = hash % originNodes.size();
                    if (Objects.equals(originNodes.get(index), deadNode)) {
                        offerTask(t);
                    }
                }
            }
        }
    }


    /**
     * 周期性從存儲(chǔ)中加載未來一段時(shí)間內(nèi)將到期的任務(wù),按 hash 分片分配到各節(jié)點(diǎn)
     */
    private void loadTask() {
        if (!running.get()) return;
        logger.info("DistributedDelayQueue loadTask start: nodeId={}", this.nodeId);
        List<String> activeNodes = coordinator.getActiveNodes();
        if (CollectionUtils.isEmpty(activeNodes)) {
            logger.warn("no active nodes found during loadTask");
            return;
        }


        long end = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(delayProperties.getPullPeriod());
        for (String queueName : queueNameSet) {
            List<DelayTask> tasks = taskStorage.listTask(queueName, 0L, end);
            if (tasks == null || tasks.isEmpty()) continue;
            for (DelayTask t : tasks) {
                if (canProcess(t, activeNodes)) {
                    offerTask(t);
                }
            }
        }
    }


    private Boolean canProcess(DelayTask task, List<String> activeNodes) {
        if (!running.get()) return false;
        if (taskIdSet.contains(task.queueTaskId())) return false;
        if (CollectionUtils.isEmpty(activeNodes)) return false;


        int hash = Math.abs(task.getTaskId().hashCode());
        int index = hash % activeNodes.size();
        return activeNodes.get(index).equals(this.nodeId);
    }


    private void offerTask(DelayTask task) {
        DelayQueue<DelayTask> dq = delayMap.computeIfAbsent(task.getQueueName(), k -> new DelayQueue<>());
        dq.offer(task);
        taskIdSet.add(task.queueTaskId());
        logger.info("task offered: {} to node {}", task.queueTaskId(), this.nodeId);
    }
}

自動(dòng)配置類(Spring Boot 自動(dòng)注冊(cè) Bean)

文件:/src/main/java/com/icoderoad/delayqueue/autoconfig/DelayAutoConfiguration.java

package com.icoderoad.delayqueue.autoconfig;


import com.icoderoad.delayqueue.cluster.Coordinator;
import com.icoderoad.delayqueue.cluster.impl.RedisCoordinator;
import com.icoderoad.delayqueue.config.DelayProperties;
import com.icoderoad.delayqueue.core.DistributedDelayQueue;
import com.icoderoad.delayqueue.storage.TaskStorage;
import com.icoderoad.delayqueue.storage.impl.RedisTaskStorage;
import org.redisson.api.RedissonClient;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


/**
 * 自動(dòng)配置:當(dāng)用戶未提供自定義實(shí)現(xiàn)時(shí),使用默認(rèn) Redis 實(shí)現(xiàn)
 */
@Configuration
@EnableConfigurationProperties(DelayProperties.class)
public class DelayAutoConfiguration {


    @Bean
    @ConditionalOnMissingBean(Coordinator.class)
    public Coordinator coordinator() {
        return new RedisCoordinator();
    }


    @Bean
    @ConditionalOnMissingBean(TaskStorage.class)
    public TaskStorage taskStorage() {
        return new RedisTaskStorage();
    }


    @Bean
    public DistributedDelayQueue distributedDelayQueue() {
        return new DistributedDelayQueue();
    }
}

業(yè)務(wù)示例:訂單延時(shí)任務(wù) Executor

文件:/src/main/java/com/icoderoad/delayqueue/example/OrderTaskExecutor.java

package com.icoderoad.delayqueue.example;


import com.icoderoad.delayqueue.core.DelayTask;
import com.icoderoad.delayqueue.executor.DelayTaskExecutor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;


/**
 * 訂單業(yè)務(wù)延時(shí)任務(wù)示例
 */
@Component
@Slf4j
public class OrderTaskExecutor implements DelayTaskExecutor {


    @Override
    public void run(DelayTask delayedTask) {
        // 在這里實(shí)現(xiàn)訂單超時(shí)處理等業(yè)務(wù)
        log.info("OrderTaskExecutor running task: {}", delayedTask);
        // TODO: 調(diào)用訂單服務(wù),取消未支付訂單等
    }


    @Override
    public String queueName() {
        return "order";
    }
}

控制器示例:添加延時(shí)任務(wù)接口

文件:/src/main/java/com/icoderoad/delayqueue/example/DelayController.java

package com.icoderoad.delayqueue.example;


import com.icoderoad.delayqueue.core.DelayTask;
import com.icoderoad.delayqueue.core.DistributedDelayQueue;
import jakarta.annotation.Resource;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;


/**
 * 簡單示例接口:用于添加延時(shí)任務(wù)
 */
@RestController
@RequestMapping("/delay")
public class DelayController {


    @Resource
    private DistributedDelayQueue distributedDelayQueue;


    @PostMapping("/add-order")
    public String addOrderDelayTask(String id, int minutes) {
        long executeTime = System.currentTimeMillis() + minutes * 60L * 1000L;
        DelayTask task = new DelayTask("order", id, executeTime);
        distributedDelayQueue.addTask(task);
        return "ok";
    }
}

啟動(dòng)類

文件:/src/main/java/com/icoderoad/delayqueue/DelayQueueApplication.java

package com.icoderoad.delayqueue;


import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;


/**
 * 啟動(dòng)類
 */
@SpringBootApplication(scanBasePackages = "com.icoderoad")
public class DelayQueueApplication {


    public static void main(String[] args) {
        SpringApplication.run(DelayQueueApplication.class, args);
    }
}

部署與驗(yàn)證

  1. 準(zhǔn)備依賴:確保 Redis 可用,Redisson 配置正確(可在 application.yml 中配置 spring.redis,Redisson 自動(dòng)從 Spring Redis 連接中讀取或使用 Redisson 配置)。
  2. 啟動(dòng)兩個(gè)實(shí)例(或多個(gè)):
  • 在不同端口啟動(dòng)兩份服務(wù)(例如 --server.port=8080 和 --server.port=8081),觀察日志會(huì)打印注冊(cè)節(jié)點(diǎn)、心跳、拉取任務(wù)、任務(wù)分配等信息。
  1. 添加任務(wù):
  • 通過 POST /delay/add-order?id=order001&minutes=3 添加一個(gè) 3 分鐘后執(zhí)行任務(wù)。
  • 若任務(wù)的 executeTime 在下一次拉取周期內(nèi),會(huì)被立即放入本地 DelayQueue 以保證準(zhǔn)時(shí)觸發(fā)。
  1. 驗(yàn)證不重復(fù):
  • 當(dāng)兩個(gè)節(jié)點(diǎn)都加載了同一任務(wù)時(shí),executeTask 通過 Redisson 分布式鎖(DELAY_EXECUTING_KEY_PREFIX + queueTaskId)保證只有一個(gè)節(jié)點(diǎn)能拿到鎖并執(zhí)行。
  • executed 集合記錄執(zhí)行時(shí)間,若任務(wù)再次入隊(duì)(原因是業(yè)務(wù)修改執(zhí)行時(shí)間等),會(huì)根據(jù)記錄決定是否需要再次執(zhí)行。
  1. 節(jié)點(diǎn)下線重平衡:
  • 啟動(dòng)健康檢查后,如果某節(jié)點(diǎn)宕機(jī),其他節(jié)點(diǎn)會(huì)檢測(cè)到并將下線節(jié)點(diǎn)負(fù)責(zé)的任務(wù)遷移到自己(哈希分配判斷)。
  1. 生產(chǎn)事項(xiàng):
  • 為避免 Redis 單點(diǎn),使用 Redis 集群或主從 + 哨兵方案。
  • 可在 DelayAutoConfiguration 中替換 TaskStorage 為 MySQL / RocksDB 的實(shí)現(xiàn)以滿足海量數(shù)據(jù)場(chǎng)景。
  • 增加監(jiān)控(Prometheus)與告警(釘釘、企業(yè)微信)以便運(yùn)維。

結(jié)論與展望

我們重構(gòu)并完整實(shí)現(xiàn)了一個(gè)基于 Spring Boot + DelayQueue 的分布式延時(shí)任務(wù)組件,滿足生產(chǎn)級(jí)別的三個(gè)核心要求:不丟失、不重復(fù)、準(zhǔn)時(shí)觸發(fā)。實(shí)現(xiàn)要點(diǎn)包括:

  • 本地 DelayQueue + 周期性拉取:既保證實(shí)時(shí)性,也避免內(nèi)存堆積。
  • 持久化存儲(chǔ)(Redis ZSet):保證任務(wù)不會(huì)因服務(wù)重啟而丟失。
  • 協(xié)調(diào)服務(wù) + 健康檢查:保證節(jié)點(diǎn)注冊(cè)、心跳續(xù)期與任務(wù)自動(dòng)遷移。
  • 分布式鎖與執(zhí)行記錄:保證同一任務(wù)不會(huì)被多個(gè)節(jié)點(diǎn)重復(fù)執(zhí)行,并支持冪等校驗(yàn)。
  • 優(yōu)雅啟停與資源回收:支持線上擴(kuò)縮容和節(jié)點(diǎn)替換。

后續(xù)可做方向

  1. 監(jiān)控與可視化(高優(yōu)先級(jí))
  • 集成 Prometheus + Grafana,采集:待處理任務(wù)數(shù)、執(zhí)行速率、失敗率、節(jié)點(diǎn)數(shù)、延遲分布。
  • 開發(fā)簡單的任務(wù)管理 UI(Thymeleaf + Bootstrap),支持手動(dòng)重試、取消與詳情查看。
  1. 存儲(chǔ)層擴(kuò)展(中優(yōu)先級(jí))
  • 為海量任務(wù)提供分表 / 分區(qū)策略或持久化到 RocksDB、MySQL 分庫等方案。
  • 支持任務(wù)分頁查詢、歷史保留策略。
  1. 高級(jí)調(diào)度策略(中/低優(yōu)先級(jí))
  • 支持優(yōu)先級(jí)隊(duì)列、延時(shí)任務(wù)批量分發(fā)、延遲任務(wù)合并與分片算法優(yōu)化。
  1. 故障演練(高優(yōu)先級(jí))
  • 定期進(jìn)行 Chaos 測(cè)試(節(jié)點(diǎn)離線、網(wǎng)絡(luò)抖動(dòng)、Redis 宕機(jī))以驗(yàn)證邊界場(chǎng)景處理。
責(zé)任編輯:武曉燕 來源: 路條編程
相關(guān)推薦

2023-03-27 08:28:57

spring代碼,starter

2020-07-09 08:59:52

if else模板Service

2022-03-23 11:45:39

Quartz數(shù)據(jù)庫節(jié)點(diǎn)

2018-05-22 15:30:30

Python網(wǎng)絡(luò)爬蟲分布式爬蟲

2021-03-05 08:52:00

Celery在Windows分布式

2025-11-04 01:21:00

Spring分布式數(shù)據(jù)

2025-07-28 01:12:00

2025-02-19 08:00:00

SpringBootOllamaDeepSeek

2021-06-29 12:27:19

Spring BootCAS 登錄

2025-08-01 08:47:45

2024-02-19 00:00:00

分布式定時(shí)任務(wù)框架

2022-08-09 08:40:37

框架分布式定時(shí)任務(wù)

2025-08-27 00:00:00

ClaudeCodeAI生成工具

2018-05-09 09:44:51

Java分布式系統(tǒng)

2022-03-28 07:51:25

分布式定時(shí)任務(wù)

2011-05-03 15:59:00

黑盒打印機(jī)

2011-01-10 14:41:26

2025-05-07 00:31:30

2021-07-14 09:00:00

JavaFX開發(fā)應(yīng)用

2019-11-12 10:50:13

Spring BootstarterJava
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)

国产精品永久免费观看| 18欧美亚洲精品| 欧美精品18videos性欧美| 精品国产人妻一区二区三区| 松下纱荣子在线观看| 国产日产欧美精品一区二区三区| 成人在线免费观看视视频| 国产亚洲精品久久久久久无几年桃 | 国产成a人亚洲精v品无码| 影音先锋一区| 最近2019中文字幕一页二页| 在线播放av网址| 日韩免费小视频| 亚洲自拍偷拍网站| 久久久久天天天天| 国产特级黄色片| 日产欧产美韩系列久久99| 欧美老女人性生活| 国内精品久久国产| 国产高清中文字幕| 天天av综合| 亚洲摸下面视频| 麻豆传媒在线看| 在线国产成人影院| 午夜精品福利一区二区三区蜜桃| 亚洲制服欧美久久| 亚洲色图21p| 国产一区二区三区久久悠悠色av| 欧洲精品毛片网站| 亚洲国产综合久久| 欧美ab在线视频| 日韩在线精品一区| 国产高清一区二区三区四区| 91精品丝袜国产高跟在线| 欧美日韩精品一区二区三区蜜桃| 免费黄色日本网站| 丰满诱人av在线播放| 欧美韩国一区二区| 欧美极品jizzhd欧美| 刘亦菲久久免费一区二区| 狠狠色丁香婷综合久久| 国产精品亚洲自拍| 69视频免费看| 蜜乳av另类精品一区二区| 久久人91精品久久久久久不卡| 中文字幕在线有码| 久久视频精品| 中文字幕亚洲一区在线观看| 中文字幕国产专区| 亚洲另类春色校园小说| 亚洲国产精品久久久久| 超碰在线超碰在线| 日韩三级一区| 欧美二区三区的天堂| 中文字幕在线综合| 色8久久久久| 欧美日韩国产综合一区二区三区| 一级在线免费视频| 97人人做人人爽香蕉精品| 欧美在线免费观看视频| 国产又黄又猛又粗| 欧美黄页在线免费观看| 欧美日韩免费在线视频| 手机看片一级片| 亚洲欧洲二区| 日韩一区二区三区四区五区六区| 免费人成视频在线播放| 亚洲国产高清在线观看| 精品久久免费看| 中文字幕乱码一区| 亚洲免费专区| 最近2019年日本中文免费字幕 | 日本一区二区高清不卡| 中文字幕亚洲综合久久筱田步美| 久久国产高清视频| 欧美日韩国产欧| 91黄色8090| 午夜一级黄色片| 韩国成人福利片在线播放| 91天堂在线视频| 婷婷在线观看视频| 久久久不卡影院| 中文字幕久久综合| 丁香高清在线观看完整电影视频 | 久久成人免费观看| 欧美xxx性| 91精品欧美久久久久久动漫| 高清中文字幕mv的电影| 欧美**字幕| 久久久久北条麻妃免费看| 久久精品久久国产| 久久性天堂网| 亚洲japanese制服美女| 天天干天天操av| 日本一二三不卡| 免费cad大片在线观看| 这里有精品可以观看| 欧美日韩午夜影院| 国产国语老龄妇女a片| 国产一区二区三区电影在线观看| 超碰97人人做人人爱少妇| 一级片中文字幕| 国产美女在线精品| 欧美一卡2卡3卡4卡无卡免费观看水多多| 视频免费一区| 亚洲第一激情av| 女同激情久久av久久| 秋霞蜜臀av久久电影网免费| www.欧美免费| 亚洲av中文无码乱人伦在线视色| 国产精品一区二区在线观看不卡| 欧美一区二区三区四区夜夜大片| 中文字幕资源网在线观看| 色天使色偷偷av一区二区| 国产美女视频免费看| 影视先锋久久| 97在线观看视频国产| 91精品国产色综合久久不8| 91香蕉视频污| 18禁裸男晨勃露j毛免费观看 | 国产一区二区高清视频| 黄色av免费在线| 日本韩国欧美在线| 亚洲中文字幕一区| 韩国自拍一区| 91超碰rencao97精品| 日本高清在线观看wwwww色| 欧美日韩激情小视频| 佐佐木明希电影| 天天做天天爱天天综合网| 国产精品www| 精品一二三区视频| 日韩欧美亚洲成人| yy1111111| 一区二区三区国产在线| 国产精品.com| 蜜桃传媒在线观看免费进入| 欧美一区二区播放| 精品国产大片大片大片| 精品电影在线| 麻豆视频一区| 欧美日韩福利电影| 国产亲伦免费视频播放| 国产精品久久久久久久久搜平片 | 久久国产精品精品国产色婷婷| 国产精品久久麻豆| 欧美精三区欧美精三区| 极品尤物一区二区| 麻豆精品新av中文字幕| 翔田千里亚洲一二三区| 久久久www免费人成黑人精品| 日本ー区在线视频| 精品免费在线观看| 波多野结衣一二三区| 在线观看一区视频| 国产精品三区www17con| 99re6在线精品视频免费播放| 精品国产精品一区二区夜夜嗨| 欧美成人免费看| 大白屁股一区二区视频| 久艹视频在线免费观看| 国产三级精品三级在线观看国产| 97国产精品免费视频| 午夜在线观看视频18| 色综合婷婷久久| 欧美激情视频二区| 国产在线视频精品一区| 天堂а√在线中文在线| 成人18夜夜网深夜福利网| 97视频com| 麻豆导航在线观看| 欧美日韩国产中文| 青青草原在线免费观看视频| 成a人片亚洲日本久久| 国产成人黄色片| 成人免费在线播放| 91夜夜揉人人捏人人添红杏| 成年网站在线视频网站| 日韩精品免费在线| 中文字幕一区二区在线视频| 亚洲精品亚洲人成人网 | 波多野结衣在线观看一区| 亚洲国产精品av| 国产精品二区视频| 美女国产精品| 男同互操gay射视频在线看| 亚洲日本视频在线| 日韩av电影在线播放| 老司机99精品99| 亚洲国产婷婷香蕉久久久久久 | 欧美gv在线观看| 亚洲欧美日韩成人| av免费观看网址| 精品久久久久久久久中文字幕| av免费播放网站| 成人午夜激情片| 成年网站在线播放| 在线亚洲成人| 51xx午夜影福利| 国产成人短视频在线观看| 亚洲a∨日韩av高清在线观看| 欧美伦理91| 久久影院免费观看| 欧美扣逼视频| 精品久久久久久久久久久久久久久久久| 天码人妻一区二区三区在线看| 亚洲精品视频在线看| 国产人妻一区二区| 国v精品久久久网| 艹b视频在线观看| 亚洲永久在线| 17c丨国产丨精品视频| 日韩成人精品一区| 欧美精品欧美精品| 91久久精品无嫩草影院| 日韩免费黄色av| jizzjizz中国精品麻豆| 久久精品国产亚洲一区二区| 九色在线播放| 日韩福利视频在线观看| 成人h动漫精品一区二区无码| 欧美亚洲日本国产| av大片免费观看| 亚洲成人av在线电影| 丁香花五月激情| 国产精品天天看| 在线免费看黄视频| 91影院在线观看| 国产一线在线观看| 国产黄人亚洲片| 国产毛片久久久久久| 久久精品国产久精国产爱| 成人久久久久久久久| av成人激情| 九一国产精品视频| 亚洲电影av| 精品成在人线av无码免费看| 欧美欧美天天天天操| 黄色网址在线免费看| 色偷偷综合网| 一区二区视频在线播放| 成人在线国产| 亚洲国产精品日韩| 成人无号精品一区二区三区| 色综合影院在线观看| 凹凸成人精品亚洲精品密奴| 欧美日韩一区在线视频| 深爱激情综合| 视频一区不卡| 日韩在线观看| 性欧美18一19内谢| 欧美1区2区| 欧美一级视频在线播放| 91久久亚洲| 国产最新免费视频| 久久人人精品| 欧美黄色性生活| 国内精品久久久久影院色| 性久久久久久久久久久久久久| 国产一区二区三区免费播放| 亚洲成a人片在线www| av电影在线观看完整版一区二区| 一出一进一爽一粗一大视频| 久久综合五月天婷婷伊人| 精品人妻无码一区二区三区换脸| 欧美国产丝袜视频| 欧美三级黄色大片| 亚洲在线成人精品| 成年人免费高清视频| 在线日韩一区二区| 国产精品一品二区三区的使用体验| 欧美一区二区三区四区高清 | 电影一区二区三区久久免费观看| 5g影院天天爽成人免费下载| 精品欧美午夜寂寞影院| 欧美日韩精品中文字幕一区二区| 欧美一区2区| 日韩人妻一区二区三区蜜桃视频| 1024成人| 一区二区三区 欧美| 国产在线播精品第三| 艳妇乳肉豪妇荡乳xxx| 国产日韩av一区二区| 国产精品久久久久久久精| 亚洲成人免费av| 午夜视频网站在线观看| 日韩无一区二区| 日韩porn| 久久躁狠狠躁夜夜爽| 精品丝袜在线| 成人免费观看a| 欧美黑人做爰爽爽爽| 亚洲综合网中心| 亚洲欧洲午夜| 一区二区三区四区毛片| a在线欧美一区| 亚洲伦理一区二区三区| 欧美日韩国产精品| 国产理论片在线观看| 亚洲精品小视频| 污污视频在线看| 国产精品亚洲一区二区三区| 久久视频在线观看| 中文字幕一区二区三区有限公司| 99热这里只有精品8| 毛片毛片毛片毛| 久久久精品一品道一区| 久久久久成人精品无码| 欧美日韩一区二区三区免费看 | 久久久久久久久久久久电影| 欧美日韩免费观看一区| 国产精品av久久久久久麻豆网| 亚洲免费看av| wwww国产精品欧美| 国产亚洲精品码| 欧美高清性hdvideosex| 国产永久av在线| 欧美亚洲视频在线观看| 亚洲一区 二区| 色中文字幕在线观看| 日本视频中文字幕一区二区三区| 日韩aaaaa| 亚洲综合色区另类av| 91成人国产综合久久精品| 国产一区二区三区丝袜| 五月天国产在线| 国产高清在线一区| 午夜欧美精品| 日韩av影视大全| 国产精品成人免费| 亚洲黄网在线观看| 亚洲第一页在线| av有码在线观看| 国产精成人品localhost| 欧美日韩p片| 亚洲少妇一区二区| 亚洲乱码国产乱码精品精可以看| 91亚洲欧美激情| 少妇av一区二区三区| 99久久久国产精品免费调教网站| 欧美精品一区二区视频| 六月丁香综合| 成年人免费观看视频网站| 欧美日韩国产中文精品字幕自在自线| 丰满岳乱妇国产精品一区| 九九热精品视频国产| 亚洲日本va中文字幕| 97在线国产视频| 95精品视频在线| 69成人免费视频| 亚洲人成啪啪网站| 色综合一本到久久亚洲91| 色婷婷精品国产一区二区三区| 青青草国产精品97视觉盛宴| 99久久久无码国产精品不卡| 欧美日韩精品一区二区在线播放| 免费在线观看黄| 91在线观看免费网站| 欧美天堂亚洲电影院在线观看 | 99免费视频观看| 国产女主播一区| 国产又粗又长又黄| 欧美日韩福利视频| 欧美sss在线视频| 99re在线视频免费观看| 国产嫩草影院久久久久| 6—12呦国产精品| 欧美日韩成人精品| 欧亚精品一区| 91n.com在线观看| 亚洲同性同志一二三专区| 精品国产av一区二区| 午夜美女久久久久爽久久| 九九在线精品| 亚洲免费黄色网| 亚洲午夜久久久久中文字幕久| 亚洲色大成网站www| 国产精品自产拍在线观看| 欧美a级在线| 91av在线免费| 欧美日韩精品一区二区三区蜜桃 | 国产精品日韩在线播放| 夜间精品视频| 网站免费在线观看| 欧美色偷偷大香| av中文在线资源库| 性欧美精品一区二区三区在线播放 | 欧美午夜电影在线播放| yellow91字幕网在线| 久久精品午夜一区二区福利| 美女视频黄a大片欧美| 国产一级生活片| 伊人激情综合网| 99久久免费精品国产72精品九九| 无码无遮挡又大又爽又黄的视频| 亚洲欧洲在线观看av| 午夜视频在线播放| 亚洲在线免费视频| 日韩av网站免费在线| 国产一级淫片a|