精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

Spring Boot 中 MQTT 的高級用法:從可靠性到精細化管控

開發 前端
MQTT的核心優勢之一是通過QoS(Quality of Service)級別保障消息可靠性,但不同QoS的實現邏輯與資源消耗差異顯著。在項目中,需結合業務場景選擇合適的QoS,并處理好消息重發、重復消息去重等問題。

引言

在基礎的MQTT集成(發布/訂閱)之上,實際生產場景往往對消息可靠性、連接穩定性、權限安全性、消息處理效率有更高要求。

本文將圍繞Spring Boot環境,深入講解MQTT的高級用法,包括QoS級別實戰、遺囑消息、持久化配置、主題過濾與權限控制、消息積壓處理等。

一、消息可靠性進階:QoS 級別實戰與消息重發機制

MQTT的核心優勢之一是通過QoS(Quality of Service)級別保障消息可靠性,但不同QoS的實現邏輯與資源消耗差異顯著。在項目中,需結合業務場景選擇合適的QoS,并處理好消息重發、重復消息去重等問題。

1.1 QoS 級別深度解析(對比與場景選型)

MQTT定義3級QoS,需根據 “數據重要性”“網絡穩定性”“資源成本” 綜合選型:

QoS 級別

核心邏輯

適用場景

資源消耗

QoS 0(最多一次)

客戶端發送消息后不等待確認,消息可能丟失或重復

非關鍵數據(如設備心跳、實時日志)

最低(無重發、無確認)

QoS 1(至少一次)

客戶端發送消息后等待服務端確認(PUBACK),未確認則重發;消息可能重復

關鍵數據但允許重復處理(如傳感器數據、訂單狀態通知)

中等(需存儲消息、處理重發)

QoS 2(恰好一次)

客戶端與服務端通過“四步握手”(PUBLISH→PUBREC→PUBREL→PUBCOMP)確保消息僅送達一次

核心數據(如支付指令、設備控制指令)

最高(需雙向確認、存儲消息狀態)

1.2 Spring Boot 中 QoS 2 的實現與重復消息處理

QoS 2雖能保障恰好一次,但服務端或客戶端異常時仍可能出現重復消息(如網絡延遲導致的重發)。需在項目中通過消息ID去重機制解決。

(1)QoS 2 的發布與訂閱配置

// 1. 發布端:使用QoS 2發布消息(核心是設置setQos(2))
@Service
public class HighReliabilityPublisher {
    @Resource
    private MqttClient mqttClient;
    // 存儲已處理的消息ID(用于去重,key:消息ID,value:處理時間)
    private final ConcurrentHashMap<Integer, Long> processedMsgIds = new ConcurrentHashMap<>();
    // 消息ID過期時間(如5分鐘,避免內存溢出)
    private static final long MSG_ID_EXPIRE_TIME = 5 * 60 * 1000;

    public void publishWithQos2(String topic, String payload) throws MqttException {
        if (!mqttClient.isConnected()) {
            mqttClient.reconnect();
        }
        MqttMessage message = new MqttMessage(payload.getBytes());
        message.setQos(2); // 關鍵:設置QoS 2
        // 發布消息并獲取消息ID(用于后續去重)
        IMqttDeliveryToken token = mqttClient.publish(topic, message);
        token.waitForCompletion(3000); // 等待發布完成(超時3秒)
        System.out.println("QoS 2消息發布成功,消息ID:" + token.getMessageId());
    }

    // 2. 訂閱端:處理QoS 2消息并去重
    @PostConstruct
    public void subscribeWithQos2() throws MqttException {
        mqttClient.setCallback(new MqttCallbackExtended() { // 用MqttCallbackExtended增強回調
            @Override
            public void connectComplete(boolean reconnect, String serverURI) {
                // 連接完成回調(重連后可重新訂閱)
                try {
                    mqttClient.subscribe("critical/control/#", 2); // 訂閱QoS 2
                } catch (MqttException e) {
                    e.printStackTrace();
                }
            }

            @Override
            public void messageArrived(String topic, MqttMessage message) throws Exception {
                int msgId = message.getId(); // 獲取消息ID(QoS 1/2才有)
                // 1. 先清理過期的消息ID(避免內存泄漏)
                long currentTime = System.currentTimeMillis();
                processedMsgIds.entrySet().removeIf(entry -> currentTime - entry.getValue() > MSG_ID_EXPIRE_TIME);
                
                // 2. 檢查消息ID是否已處理(去重)
                if (processedMsgIds.containsKey(msgId)) {
                    System.out.println("重復消息,已忽略,消息ID:" + msgId);
                    return;
                }

                // 3. 處理消息(如設備控制指令)
                String payload = new String(message.getPayload());
                System.out.println("處理QoS 2消息,ID:" + msgId + ",內容:" + payload);
                // 模擬處理邏輯(如控制工業設備停機)
                processControlCommand(payload);

                // 4. 標記消息ID為已處理
                processedMsgIds.put(msgId, currentTime);
            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken token) {}

            @Override
            public void connectionLost(Throwable cause) {}
        });
    }

    private void processControlCommand(String payload) {
        // 實際業務邏輯:如解析指令、調用設備API
    }
}

(2)關鍵注意點

  • 消息ID的作用:QoS 1/2的消息會攜帶唯一ID(message.getId()),是去重的核心依據;QoS 0無消息ID,無法去重。
  • 內存與過期策略:使用ConcurrentHashMap存儲已處理的消息ID,并定期清理過期數據(如5分鐘),避免內存溢出。
  • 回調選擇:使用MqttCallbackExtended替代基礎的MqttCallback,新增connectComplete回調,支持重連后自動重新訂閱。

二、連接穩定性增強:遺囑消息、自動重連與斷線恢復

在物聯網、工業監控等場景中,設備或客戶端可能因網絡波動斷開連接。需通過遺囑消息(Last Will and Testament) 、自動重連、斷線后消息恢復等機制,保障系統穩定性。

2.1 遺囑消息:異常斷開的 “狀態通知”

當客戶端異常斷開(如斷電、網絡中斷),服務端會自動向預設的遺囑主題發布消息,通知其他訂閱者該客戶端的離線狀態。

(1)Spring Boot 中配置遺囑消息

@Configuration
public class MqttAdvancedConfig {
    @Value("${spring.mqtt.broker-url}")
    private String brokerUrl;
    @Value("${spring.mqtt.client-id}")
    private String clientId;

    @Bean
    public MqttClient mqttClient() throws MqttException {
        MqttConnectOptions options = new MqttConnectOptions();
        options.setUserName("admin");
        options.setPassword("123456".toCharArray());
        options.setAutomaticReconnect(true); // 開啟自動重連
        options.setConnectionTimeout(3000);
        options.setKeepAliveInterval(60);

        // 核心:配置遺囑消息
        String willTopic = "client/status/" + clientId; // 遺囑主題(包含客戶端ID,便于識別)
        String willPayload = "{\"clientId\":\"" + clientId + "\",\"status\":\"offline\"}"; // 遺囑內容(離線狀態)
        options.setWill(willTopic, willPayload.getBytes(), 1, false); // QoS 1,不保留消息

        // 禁用Clean Session(關鍵:確保斷線后重連能恢復訂閱和未處理消息)
        options.setCleanSession(false);

        MqttClient client = new MqttClient(brokerUrl, clientId, new MemoryPersistence());
        client.setCallback(new MqttCallbackExtended() {
            @Override
            public void connectComplete(boolean reconnect, String serverURI) {
                if (reconnect) {
                    System.out.println("客戶端重連成功,服務器地址:" + serverURI);
                    // 重連后發布“在線”狀態(與遺囑消息的“離線”對應)
                    try {
                        String onlinePayload = "{\"clientId\":\"" + clientId + "\",\"status\":\"online\"}";
                        client.publish(willTopic, onlinePayload.getBytes(), 1, false);
                    } catch (MqttException e) {
                        e.printStackTrace();
                    }
                } else {
                    System.out.println("客戶端首次連接成功");
                }
            }

            @Override
            public void messageArrived(String topic, MqttMessage message) throws Exception {
                // 處理消息
            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken token) {}

            @Override
            public void connectionLost(Throwable cause) {
                System.out.println("客戶端連接丟失:" + cause.getMessage());
            }
        });

        client.connect(options);
        // 首次連接成功后發布“在線”狀態
        String onlinePayload = "{\"clientId\":\"" + clientId + "\",\"status\":\"online\"}";
        client.publish(willTopic, onlinePayload.getBytes(), 1, false);
        return client;
    }
}

(2)遺囑消息的核心配置參數

  • setWill(topic, payload, qos, retained):
  • topic:遺囑主題(需明確,如client/status/device-001);
  • payload:遺囑內容(JSON格式便于解析,包含客戶端ID、狀態等);
  • qos:遺囑消息的QoS級別(建議1,確保通知可靠);
  • retained:是否保留遺囑消息(建議false,避免新訂閱者誤判狀態)。
  • setCleanSession(false):禁用 “清除會話”,服務端會保留客戶端的訂閱關系和未處理的QoS 1/2消息,重連后可恢復。

2.2 自動重連與斷線恢復

通過MqttConnectOptions.setAutomaticReconnect(true)開啟自動重連,但需注意:

  • 重連觸發條件:網絡恢復、服務端重啟后,客戶端會自動嘗試重連(默認重試間隔遞增,如 1 秒、2 秒、4 秒...);
  • 重連后的操作:在connectComplete回調中處理重連后的邏輯(如重新發布在線狀態、重新訂閱動態主題);
  • 消息恢復:僅當setCleanSession(false)且QoS≥1時,重連后服務端會補發客戶端斷線期間未確認的消息。

三、主題精細化管控:通配符、動態訂閱與權限控制

MQTT 的主題支持通配符,結合Spring Boot的權限管理,可實現 “按角色訂閱主題”“動態創建主題” 等精細化管控需求。

3.1 主題通配符實戰(單層 / 多層匹配)

MQTT支持兩種通配符,需在訂閱時合理使用:

  • +:單層通配符(匹配一個層級,如home/+/status可匹配home/lighting/status、home/aircon/status);
  • #:多層通配符(匹配所有子層級,需放在最后,如home/#可匹配home/lighting/status、home/aircon/control)。

(1)Spring Boot 中使用通配符訂閱

@Service
public class TopicWildcardService {
    @Resource
    private MqttClient mqttClient;

    // 訂閱“家庭所有設備的狀態”(多層通配符#)
    public void subscribeHomeAllStatus() throws MqttException {
        String topic = "home/#";
        mqttClient.subscribe(topic, 1, (topic1, message) -> {
            // 根據不同子主題處理消息
            if (topic1.startsWith("home/lighting/")) {
                processLightingMessage(new String(message.getPayload()));
            } elseif (topic1.startsWith("home/aircon/")) {
                processAirconMessage(new String(message.getPayload()));
            }
        });
        System.out.println("已訂閱家庭所有設備主題:" + topic);
    }

    // 訂閱“所有房間的燈光狀態”(單層通配符+)
    public void subscribeAllRoomLightingStatus() throws MqttException {
        String topic = "home/+/lighting/status"; // 匹配home/bedroom/lighting/status、home/living/lighting/status
        mqttClient.subscribe(topic, 1, (topic1, message) -> {
            // 解析房間名稱(從主題中提取,如“bedroom”)
            String room = topic1.split("/")[1];
            String payload = new String(message.getPayload());
            System.out.println("房間【" + room + "】燈光狀態:" + payload);
        });
        System.out.println("已訂閱所有房間燈光狀態主題:" + topic);
    }

    private void processLightingMessage(String payload) {}
    private void processAirconMessage(String payload) {}
}

3.2 動態訂閱與主題權限控制

在多租戶、多角色系統中,需根據用戶角色動態分配訂閱權限(如 “管理員可訂閱所有設備,普通用戶僅訂閱自己的設備”)。

(1)基于 Spring Security 的主題權限控制

@Service
public class DynamicSubscriptionService {
    @Resource
    private MqttClient mqttClient;
    @Resource
    private UserDetailsService userDetailsService;

    // 根據用戶角色動態訂閱主題
    public void subscribeByUserRole(String username) throws MqttException {
        // 1. 獲取用戶角色(如ADMIN、USER)
        UserDetails user = userDetailsService.loadUserByUsername(username);
        Set<String> roles = user.getAuthorities().stream()
                .map(GrantedAuthority::getAuthority)
                .collect(Collectors.toSet());

        // 2. 根據角色訂閱不同主題
        if (roles.contains("ROLE_ADMIN")) {
            // 管理員:訂閱所有設備主題
            mqttClient.subscribe("device/#", 1);
            System.out.println("管理員【" + username + "】已訂閱所有設備主題");
        } elseif (roles.contains("ROLE_USER")) {
            // 普通用戶:僅訂閱自己的設備(主題包含用戶名,如“device/user123/+/status”)
            String userTopic = "device/" + username + "/+/status";
            mqttClient.subscribe(userTopic, 1);
            System.out.println("用戶【" + username + "】已訂閱個人設備主題:" + userTopic);
        }
    }

    // 動態發布主題(僅允許發布自己的設備控制指令)
    public void publishByUserRole(String username, String deviceId, String command) throws MqttException {
        // 校驗權限:僅允許發布自己設備的控制主題
        String allowedTopic = "device/" + username + "/" + deviceId + "/control";
        MqttMessage message = new MqttMessage(command.getBytes());
        message.setQos(2); // 控制指令用QoS 2確保可靠
        mqttClient.publish(allowedTopic, message);
        System.out.println("用戶【" + username + "】向設備【" + deviceId + "】發送指令:" + command);
    }
}

(2)服務端權限配合(以 Mosquitto 為例)

客戶端權限控制需服務端配合,如Mosquitto可通過acl_file配置主題訪問權限:

# Mosquitto的acl.conf配置
user admin
topic write device/#
topic read device/#

user user123
topic write device/user123/+/control
topic read device/user123/+/status

客戶端需在MqttConnectOptions中設置用戶名/密碼,服務端驗證通過后才允許訂閱/發布。

四、消息處理優化:異步消費、積壓處理與批量發布

當消息量較大(如每秒數百條)時,需通過異步消費、消息積壓監控、批量發布等機制優化處理效率,避免客戶端阻塞。

4.1 異步消費消息(線程池隔離)

基礎的messageArrived回調運行在 MQTT 客戶端的線程中,若處理邏輯耗時(如數據庫寫入、API調用),會導致消息堆積。需通過線程池異步處理消息。

@Service
public class AsyncMessageHandler {
    // 1. 定義線程池(核心參數需結合業務場景調優)
    private final ExecutorService messageExecutor;

    // 構造方法:初始化線程池(避免硬編碼,支持參數化配置)
    public AsyncMessageHandler() {
        // 線程工廠:設置線程名稱,便于日志排查
        ThreadFactory threadFactory = new ThreadFactoryBuilder()
                .setNameFormat("mqtt-message-handler-%d") // 線程名格式:mqtt-message-handler-1、2...
                .setDaemon(true) // 設為守護線程:JVM退出時自動關閉,避免阻塞應用 shutdown
                .build();

        // 線程池核心參數說明:
        // corePoolSize:核心線程數(默認活躍的線程數,即使空閑也不銷毀)
        // maximumPoolSize:最大線程數(核心線程不夠時,最多再創建的線程數)
        // keepAliveTime:非核心線程空閑超時時間(超時后銷毀)
        // workQueue:任務隊列(核心線程滿時,任務暫存的隊列)
        // handler:任務拒絕策略(隊列滿+最大線程數滿時,如何處理新任務)
        this.messageExecutor = new ThreadPoolExecutor(
                8, // corePoolSize:根據CPU核心數/消息量調整(如8核CPU設為8)
                16, // maximumPoolSize:核心線程的2倍,避免線程過多導致上下文切換
                30, // keepAliveTime:30秒(非核心線程空閑30秒后銷毀)
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(2048), // 有界隊列:避免無界隊列導致內存溢出(容量設為2048)
                threadFactory,
                new ThreadPoolExecutor.CallerRunsPolicy() // 拒絕策略:調用者(MQTT客戶端線程)自己執行任務,避免消息丟失
        );
    }

    @Autowired
    private MqttClient mqttClient;

    @Autowired
    private JdbcTemplate jdbcTemplate; // 模擬耗時操作:數據庫寫入

    // 2. 初始化MQTT回調,綁定異步消費邏輯
    @PostConstruct
    public void initAsyncHandler() throws MqttException {
        // 訂閱需要異步處理的主題(如傳感器數據主題,消息量較大)
        String targetTopic = "sensor/data/#";
        mqttClient.subscribe(targetTopic, 1, (topic, message) -> {
            // 3. 將消息處理邏輯提交到線程池異步執行
            submitAsyncTask(topic, message);
        });
        System.out.println("已訂閱主題【" + targetTopic + "】,啟用異步消費(線程池隔離)");
    }

    // 封裝異步任務提交邏輯(含異常捕獲)
    private void submitAsyncTask(String topic, MqttMessage message) {
        try {
            // 提交任務到線程池
            messageExecutor.submit(() -> {
                try {
                    // 4. 實際消息處理邏輯(如解析數據、寫入數據庫)
                    processMessage(topic, message);
                } catch (Exception e) {
                    // 5. 異常捕獲:避免單個任務異常導致線程池線程銷毀
                    System.err.println("異步處理消息失敗,主題:" + topic + ",異常信息:" + e.getMessage());
                    // 可選:失敗重試(需限制重試次數,避免死循環)
                    retryFailedTask(topic, message, 3); // 最多重試3次
                }
            });
        } catch (RejectedExecutionException e) {
            // 6. 捕獲任務拒絕異常(隊列滿+最大線程數滿時觸發)
            System.err.println("線程池任務隊列已滿,無法接收新消息,主題:" + topic + ",異常信息:" + e.getMessage());
            // 可選:將消息存入本地磁盤/Redis,后續重試(避免消息丟失)
            saveFailedMessageToLocal(topic, message);
        }
    }

    // 實際消息處理邏輯(模擬耗時操作:解析+數據庫寫入)
    private void processMessage(String topic, MqttMessage message) throws Exception {
        // 步驟1:解析消息(如傳感器ID、采集時間、數值)
        String payload = new String(message.getPayload(), "UTF-8");
        String[] topicParts = topic.split("/");
        String sensorId = topicParts[2]; // 從主題提取傳感器ID(如主題“sensor/data/sensor-001”)
        long timestamp = System.currentTimeMillis(); // 模擬采集時間

        // 步驟2:耗時操作:寫入數據庫(模擬200ms耗時)
        String sql = "INSERT INTO sensor_data (sensor_id, payload, collect_time, create_time) " +
                     "VALUES (?, ?, ?, NOW())";
        jdbcTemplate.update(sql, sensorId, payload, new java.sql.Timestamp(timestamp));

        // 日志記錄(可選:通過MDC添加追蹤ID,便于鏈路排查)
        System.out.println("異步處理完成,傳感器ID:" + sensorId + ",消息內容:" + payload + ",線程名:" + Thread.currentThread().getName());
    }

    // 失敗重試邏輯(限制重試次數)
    private void retryFailedTask(String topic, MqttMessage message, int remainingRetries) {
        if (remainingRetries <= 0) {
            System.err.println("消息重試次數耗盡,主題:" + topic + ",內容:" + new String(message.getPayload()));
            // 重試耗盡:存入失敗隊列,人工排查
            saveFailedMessageToLocal(topic, message);
            return;
        }

        try {
            // 重試間隔:指數退避(1s、2s、4s...),避免頻繁重試壓垮服務
            long retryDelay = (long) Math.pow(2, 3 - remainingRetries) * 1000;
            Thread.sleep(retryDelay);

            // 重新提交任務
            messageExecutor.submit(() -> {
                try {
                    processMessage(topic, message);
                    System.out.println("消息重試成功,剩余次數:" + (remainingRetries - 1) + ",主題:" + topic);
                } catch (Exception e) {
                    System.err.println("消息重試失敗,剩余次數:" + (remainingRetries - 1) + ",主題:" + topic);
                    retryFailedTask(topic, message, remainingRetries - 1); // 遞歸重試
                }
            });
        } catch (InterruptedException | RejectedExecutionException e) {
            System.err.println("重試任務提交失敗,剩余次數:" + (remainingRetries - 1) + ",主題:" + topic);
            retryFailedTask(topic, message, remainingRetries - 1);
        }
    }

    // 保存失敗消息到本地(模擬:實際可存入Redis/磁盤文件)
    private void saveFailedMessageToLocal(String topic, MqttMessage message) {
        // 簡化實現:實際需序列化消息,存入持久化存儲(如Redis的list)
        String failedMsg = "topic:" + topic + ", payload:" + new String(message.getPayload()) + ", timestamp:" + System.currentTimeMillis();
        System.out.println("保存失敗消息到本地:" + failedMsg);
        // 示例:jdbcTemplate.update("INSERT INTO mqtt_failed_msg (content, create_time) VALUES (?, NOW())", failedMsg);
    }

    // 7. 應用關閉時,優雅關閉線程池(避免任務丟失)
    @PreDestroy
    public void shutdownExecutor() {
        System.out.println("開始關閉MQTT消息消費線程池...");
        messageExecutor.shutdown(); // 禁止接收新任務
        try {
            // 等待已提交的任務執行完成(最多等待60秒)
            if (!messageExecutor.awaitTermination(60, TimeUnit.SECONDS)) {
                messageExecutor.shutdownNow(); // 超時未完成,強制關閉
                System.out.println("線程池強制關閉,可能存在未完成任務");
            } else {
                System.out.println("線程池優雅關閉完成");
            }
        } catch (InterruptedException e) {
            messageExecutor.shutdownNow();
            Thread.currentThread().interrupt(); // 恢復中斷狀態
        }
    }
}

責任編輯:武曉燕 來源: 一安未來
相關推薦

2011-03-18 14:38:25

云數據精細化流控

2009-07-21 10:08:42

綠色無線網絡網絡精細化建設

2010-09-01 11:32:07

無線網絡

2013-03-11 15:14:53

網絡虛擬化企業網絡運維IP網絡技術

2014-11-12 09:05:49

2013-05-14 13:36:01

華為校園網絡網絡架構

2010-12-28 19:50:21

可靠性產品可靠性

2023-07-05 08:36:24

2009-12-17 10:58:38

代理路由器設置步驟

2014-04-22 10:00:09

手游數據分析精細化運營

2017-11-02 14:23:04

易觀方舟數據分析

2014-05-09 10:03:09

上方花園

2009-01-04 16:50:54

2014-08-08 16:56:13

APP精細化運營

2011-06-09 17:43:57

佳能復印機

2009-12-03 09:43:54

2019-08-30 12:10:05

磁盤數據可靠性RAID

2020-12-06 14:51:23

物聯網可靠性IOT

2010-12-28 19:55:20

軟件架構可靠性

2017-10-26 13:02:11

大數據人民法院審判
點贊
收藏

51CTO技術棧公眾號

波多野结衣家庭教师在线播放| 91精品国产综合久久久久久久久| 亚洲观看黄色网| 久久uomeier| 亚洲视频在线一区观看| 国产精品国模大尺度私拍| 黄瓜视频在线免费观看| 日韩啪啪电影网| 日韩免费性生活视频播放| 91九色在线观看视频| av网站大全在线观看| 国产一区二区三区四区五区美女| 国内精品视频久久| 亚洲一级黄色录像| 懂色av一区二区| 欧美日韩精品三区| 黄色一级片播放| 免费成人黄色| 久久精品一区四区| 成人av免费电影| 中文字幕在线播放不卡| 99视频精品| 欧美成人黄色小视频| 国产熟妇搡bbbb搡bbbb| 亚洲码欧美码一区二区三区| 欧洲一区在线观看| 亚洲熟妇无码一区二区三区| 欧美jizz18hd性欧美| 2欧美一区二区三区在线观看视频| 国产女人精品视频| av大片在线免费观看| 欧美成人亚洲| 色婷婷综合久久久久中文字幕1| 亚洲精品一二三四| 91九色综合| 五月婷婷久久综合| 欧美一级爱爱视频| 黄色成人在线| 国产精品理论在线观看| 欧美日韩亚洲在线| 欧洲成人av| av在线不卡观看免费观看| 亚洲曰本av电影| 91激情在线观看| 日本午夜一本久久久综合| 538国产精品一区二区在线| 欧美在线视频第一页| 欧美gayvideo| 最近2019免费中文字幕视频三| 粉嫩av蜜桃av蜜臀av| 欧美绝顶高潮抽搐喷水合集| 精品国产污网站| 久久久精品人妻一区二区三区| 国产精品毛片无码| 91精品国产综合久久久蜜臀图片| 亚洲一级片网站| 国产精品99精品一区二区三区∴| 欧美综合一区二区三区| 亚洲 中文字幕 日韩 无码| 日韩精品av| 日韩欧美在线播放| 久久久久久久久久久久久久国产| 午夜伦理福利在线| 精品国产乱码久久久久酒店 | 国产精品久久久久久网站| 中文字幕超碰在线| 久久午夜精品一区二区| 青青久久aⅴ北条麻妃| 国产午夜精品久久久久| 日韩和欧美一区二区| 国产精品免费视频久久久| 在线免费观看一区二区| 国内久久婷婷综合| 国产91一区二区三区| 四虎永久在线观看| 久久久久久一二三区| 日本在线免费观看一区| 欧美a免费在线| 夜夜爽夜夜爽精品视频| 国精产品一区一区三区视频| 黄色成人免费网| 欧美日韩国产系列| 麻豆tv在线观看| 亚洲婷婷伊人| 久久精品视频一| 日韩成人高清视频| 日本不卡视频一二三区| 99精品欧美一区二区三区| 五月婷婷六月丁香| 国产欧美日韩综合精品一区二区| 国产成年人在线观看| 国产高清在线a视频大全| 色综合视频在线观看| 制服丝袜中文字幕第一页| 97一区二区国产好的精华液| 亚洲欧美成人网| 日韩女优一区二区| 欧美亚洲在线| 亚洲一区久久久| 毛片在线能看| 亚洲福利视频一区| 欧美女同在线观看| 欧洲亚洲一区二区三区| 日韩性生活视频| 日本视频免费在线| 狠狠色丁香婷综合久久| 麻豆精品传媒视频| 色女人在线视频| 欧美少妇性性性| 久久久久9999| 小小影院久久| 国产精品99久久久久久人| 丰满人妻一区二区三区免费视频 | 久久久久久久极品内射| 日韩精品一二三| 成人动漫在线视频| 欧美成人三区| 欧美偷拍一区二区| 黄色国产在线观看| 欧美日一区二区在线观看 | 6—12呦国产精品| 久久综合国产精品| 久久av高潮av| 国产精品国产亚洲精品| 中文日韩在线观看| 亚洲AV无码成人精品区东京热| 国产精品一区二区黑丝| 一区二区三区四区| 91看片一区| 亚洲人成网站免费播放| 日韩精品一区三区| 成人一区在线观看| 法国空姐在线观看免费| 福利一区二区三区视频在线观看| 亚洲精品在线观看www| 国产精品成人网站| 国产91丝袜在线观看| 久久久久久久免费视频| 91成人app| 久久精品电影一区二区| 中文字幕欧美色图| 国产午夜精品在线观看| 欧美亚洲国产成人| 欧美美女啪啪| 国产成人av在线| 国产在线电影| 欧美性xxxxxxxx| www成人啪啪18软件| 免费观看在线色综合| 日韩久久精品一区二区三区| 国产精品久久久久久久久免费高清 | 亚洲国产你懂的| 成人做爰69片免费| 伊人久久综合| 精品无人乱码一区二区三区的优势 | 偷拍亚洲精品| 欧美专区在线播放| 麻豆导航在线观看| 欧美亚日韩国产aⅴ精品中极品| 91精品小视频| 可以免费看不卡的av网站| 日韩电影大全在线观看| abab456成人免费网址| 一区二区三区黄色| 国产喷水福利在线视频| 亚洲综合色噜噜狠狠| 美女黄色一级视频| 老司机午夜精品视频在线观看| 视频一区二区三| 91麻豆精品| 久久久久久亚洲| 欧洲亚洲精品视频| 欧美精品第1页| 黄色一级视频免费| 91香蕉视频污| 国产高潮免费视频| 午夜日韩视频| 免费成人深夜夜行视频| 成人在线免费电影网站| 欧美伦理91i| 人成在线免费视频| 欧美日本一区二区在线观看| 久久免费视频6| 国产亚洲综合在线| 佐山爱在线视频| 免费一区视频| 日本一区二区三区四区五区六区| 啪啪激情综合网| 91精品在线国产| 热三久草你在线| 精品国产自在精品国产浪潮| 欧美一级片免费| 欧美午夜精品理论片a级按摩| 日韩一区二区三区四区在线| 国产视频911| 久久久久99人妻一区二区三区| 丝袜美腿高跟呻吟高潮一区| 欧美精品一区二区性色a+v| 理论片一区二区在线| 91精品久久久久| 678在线观看视频| 精品激情国产视频| 国产在线视频网| 亚洲精美色品网站| 国产xxxx在线观看| 在线观看免费亚洲| 日本中文字幕在线免费观看| 亚洲欧洲色图综合| 欧美图片第一页| 懂色av一区二区夜夜嗨| 污色网站在线观看| 欧美一级视频| 日韩视频在线视频| 最新欧美人z0oozo0| 亚洲国产精品日韩| 国产成人短视频在线观看| 国产精品成人一区二区三区| 性欧美video另类hd尤物| 日本午夜精品理论片a级appf发布| 一色桃子av在线| 最近2019中文免费高清视频观看www99 | 日韩久久一级片| 亚洲激情一区| 激情五月六月婷婷| 中文无码久久精品| 一区二区三视频| 精品一区二区三| 欧美精品一区二区三区四区五区| 激情小说一区| 国产精品一区二区不卡视频| 久久69av| 亚洲影视九九影院在线观看| 久久人体av| 国产精品偷伦一区二区| 欧美成a人片在线观看久| 欧美专区在线播放| 国偷自产一区二区免费视频| 欧美在线视频一区二区| 蜜桃视频www网站在线观看| 色综合久久精品亚洲国产| av在线影院| 美女国内精品自产拍在线播放| 九七电影韩国女主播在线观看| 久久激情五月丁香伊人| 国产原厂视频在线观看| 久久九九免费视频| a级片国产精品自在拍在线播放| 超碰91人人草人人干| caopen在线视频| 精品中文字幕在线| 国产蜜臀一区二区打屁股调教| 九九热这里只有精品免费看| 婷婷丁香在线| 高清欧美性猛交xxxx| 免费毛片b在线观看| 欧美在线一区二区视频| 偷拍视频一区二区三区| 国产精品自拍偷拍视频| 91视频亚洲| 官网99热精品| 亚欧日韩另类中文欧美| 欧美精品成人一区二区在线观看| 国产伦精品一区二区三区视频| 亚洲v欧美v另类v综合v日韩v| 国产精品精品| 欧美这里只有精品| 久久福利精品| 亚洲色图 在线视频| 国产资源在线一区| 精品国产aⅴ一区二区三区东京热 久久久久99人妻一区二区三区 | 天堂中文在线资源| 亚洲欧美另类人妖| 日本视频在线观看| 欧美俄罗斯乱妇| 自由日本语热亚洲人| 国产精品一区二区久久| 欧美国产亚洲精品| 精品中文字幕人| 欧美成免费一区二区视频| xxxxxx在线观看| 另类天堂av| 色偷偷中文字幕| 26uuu色噜噜精品一区| 国产福利在线导航| 亚洲一区二区三区在线播放| 精品久久久久久久久久久久久久久久久久 | 爽爽爽爽爽爽爽成人免费观看| а√中文在线8| 欧美一级淫片播放口| 日韩黄色三级在线观看| 国产高清一区视频| 成人无号精品一区二区三区| 女人色极品影院| 日韩不卡手机在线v区| 韩国三级与黑人| 国产欧美视频在线观看| 久久久久久国产精品视频| 欧美色偷偷大香| 少妇喷水在线观看| 大胆欧美人体视频| 免费欧美电影| 精品国产乱码久久久久软件| 婷婷综合久久| 精品国产成人av在线免| 国产91富婆露脸刺激对白| 国产精品酒店视频| 天天综合天天综合色| 国产剧情久久久| 在线看日韩av| 天堂资源在线| 国产一区二区免费电影| 亚洲乱码在线| 亚洲最大综合网| 久久日韩粉嫩一区二区三区| 九九免费精品视频| 欧美久久久久免费| 免费黄网站在线观看| 久久乐国产精品| 亚洲啊v在线免费视频| 制服诱惑一区| 日韩av网站在线观看| 波多野结衣福利| 精品久久香蕉国产线看观看亚洲 | 色777狠狠综合秋免鲁丝| 在线视频cao| 国产精品区一区| 欧美日韩国内| 手机精品视频在线| 国产精品家庭影院| 中文字幕+乱码+中文乱码91| 亚洲精选中文字幕| 一区二区精品伦理...| 国模精品一区二区三区| 在线成人h网| 成熟妇人a片免费看网站| 亚洲一级二级三级| 亚洲精品国偷拍自产在线观看蜜桃| 久久大大胆人体| 国产精品免费精品自在线观看| 一区二区免费电影| 麻豆精品一二三| 激情五月激情综合| 欧美日韩和欧美的一区二区| 色哟哟免费在线观看| 国产精品吴梦梦| 视频在线不卡免费观看| 日本国产一级片| 一区二区三区四区中文字幕| 精品国产av一区二区| 欧美大片免费看| silk一区二区三区精品视频| 欧美日韩福利在线| 99久久综合精品| 最新中文字幕一区| 一道本无吗dⅴd在线播放一区| 最新日韩一区| 欧美日韩视频免费在线观看| 国产一区不卡视频| 久久久久久久久久一区二区三区| 精品蜜桃在线看| 在线高清av| 视频在线精品一区| 国产尤物一区二区在线| 久久久国产精品黄毛片| 日韩av影片在线观看| **在线精品| 最新精品视频| 成人高清视频在线| 亚洲精品男人的天堂| 一本一本久久a久久精品牛牛影视| 免费日韩成人| 日本黄大片在线观看| 久久久噜噜噜久噜久久综合| 亚洲专区在线播放| 久久久久久久久国产| 国产一区二区三区四区| 五月六月丁香婷婷| 亚洲va欧美va人人爽午夜| 高清国产福利在线观看| 91在线中文字幕| 国产精品一页| 9999热视频| 精品视频在线播放色网色视频| 欧美视频在线视频精品| 国产精品一色哟哟| 欧美国产1区2区| 黄色片网站免费在线观看| 国产精品久久久久久久久久久久久久 | 欧美欧美黄在线二区| 一级黄色片在线免费观看| 欧美日韩亚洲91| 麻豆av在线导航| 久久精品aaaaaa毛片| 国内精品自线一区二区三区视频| 久久久久久久久久免费视频| 久久久精品久久| 国产精品美女久久久久久不卡| 国模大尺度视频| 欧美日韩国产一区二区三区地区| 三级中文字幕在线观看|