精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

Spring Boot + MQTT:消息交互輕松實現

網絡 網絡管理
MQTT(Message Queuing Telemetry Transport)是一種輕量級的消息傳輸協議,專為資源受限的設備和低帶寬、高延遲的網絡環境設計。在物聯網(IoT)領域,MQTT因其低開銷和高效能而被廣泛應用。

介紹

MQTT(Message Queuing Telemetry Transport)是一種輕量級的消息傳輸協議,專為資源受限的設備和低帶寬、高延遲的網絡環境設計。在物聯網(IoT)領域,MQTT因其低開銷和高效能而被廣泛應用。

圖片圖片

使用

安裝說明

https://docs.emqx.com/zh/emqx/v4.3/getting-started/install.html#zip-%E5%8E%8B%E7%BC%A9%E5%8C%85%E5%AE%89%E8%A3%85-linux%E3%80%81macos%E3%80%81windows

圖片圖片

默認賬號:admin/public,登錄后進入首頁:

圖片圖片

依賴引入

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
</dependency>

配置

# MQTT配置
mqtt:
  broker-url: tcp://localhost:1883 # MQTT代理服務器地址
  client-id: yian-mqtt-client # 客戶端ID,必須唯一
  username: admin # 用戶名(如果需要)
  password: 1qazxsw2 # 密碼(如果需要)
  topics:
    topic1: test/topic_1
    topic2: test/topic_2
    topic3: test/topic_3
  qos: 1 # 消息質量等級(0、1或2)
  automatic-reconnect: true# 自動重連
  clean-session: true# 是否清除會話
  connection-timeout: 5000 # 連接超時時間(秒)
  keep-alive-interval: 30 # 保持連接時間(秒)
  pool-config:
    core-size: 8
    max-size: 32
    queue-capacity: 1024
    thread-name-prefix: mqtt-worker-
@Data
@Component
@ConfigurationProperties(prefix = "mqtt")
public class MqttProperties {
    /**
     * MQTT Broker地址
     */
    private String brokerUrl;

    /**
     * 客戶端ID
     */
    private String clientId;

    /**
     * 用戶名
     */
    private String username;

    /**
     * 密碼
     */
    private String password;

    /**
     * QoS級別 (0/1/2)
     */
    private int qos = 1;
    /**
     * 自動重連
     */
    private boolean automaticReconnect = false;
    /**
     * 清理會話
     */
    private boolean cleanSession = false;

    /**
     * 連接超時時間(ms)
     */
    private int connectionTimeout = 5000;

    /**
     * 保持連接間隔(秒)
     */
    private int keepAliveInterval = 30;

    /**
     * 主題配置
     */
    private Topics topics = new Topics();
    /**
     * 線程相關參數
     */
    private PoolConfig poolConfig = new PoolConfig();

    @Data
    public static class PoolConfig {
        private int coreSize = 8;
        private int maxSize = 16;
        private int queueCapacity = 1000;
        private String threadPrefix = "mqtt-worker-";
    }

    @Data
    public static class Topics {
        /**
         * topic1
         */
        private String topic1;

        /**
         * topic2
         */
        private String topic2;

        /**
         * topic3
         */
        private String topic3;

    }

    // 需要顯式聲明空構造器
    public MqttProperties() {
    }
}

MQTT配置類

@Slf4j
@Configuration
public class MqttConfig {
    private static final List<String> DEFAULT_TOPICS = Collections.singletonList("defaultTopic");

    private final MqttProperties mqttProperties;
    private final MqttCallback mqttCallback;

    public MqttConfig(MqttProperties mqttProperties, MqttCallback mqttCallback) {
        this.mqttProperties = mqttProperties;
        this.mqttCallback = mqttCallback;
    }

    @Bean
    public MqttClient mqttClient() throws MqttException {
        MqttClient client = createMqttClient();
        MqttConnectOptions options = buildMqttConnectOptions();

        try {
            client.connect(options);
            log.info("MQTT連接成功,Broker地址: {}", mqttProperties.getBrokerUrl());
            subscribeTopics(client);
        } catch (MqttException e) {
            log.error("MQTT連接異常: {},錯誤碼: {}", e.getMessage(), e.getReasonCode(), e);
            throw new RuntimeException("MQTT連接失敗", e);
        }

        client.setCallback(mqttCallback);
        return client;
    }

    private MqttClient createMqttClient() throws MqttException {
        return new MqttClient(
                mqttProperties.getBrokerUrl(),
                generateClientId(),
                new MemoryPersistence()
        );
    }

    private String generateClientId() {
        return Optional.ofNullable(mqttProperties.getClientId())
                .filter(StringUtils::hasText)
                .orElseGet(() -> "CLIENT_" + System.currentTimeMillis());
    }

    private MqttConnectOptions buildMqttConnectOptions() {
        MqttConnectOptions options = new MqttConnectOptions();
        options.setAutomaticReconnect(mqttProperties.isAutomaticReconnect());
        options.setCleanSession(mqttProperties.isCleanSession());

        Optional.ofNullable(mqttProperties.getUsername())
                .filter(StringUtils::hasText)
                .ifPresent(options::setUserName);

        Optional.ofNullable(mqttProperties.getPassword())
                .filter(StringUtils::hasText)
                .map(String::toCharArray)
                .ifPresent(options::setPassword);

        options.setConnectionTimeout(mqttProperties.getConnectionTimeout());
        options.setKeepAliveInterval(mqttProperties.getKeepAliveInterval());
        return options;
    }

    private void subscribeTopics(MqttClient client) throws MqttException {
        List<String> topics = getTopicsToSubscribe();
        for (String topic : topics) {
            try {
                client.subscribe(topic, mqttProperties.getQos());
                log.info("成功訂閱主題: {}", topic);
            } catch (MqttException e) {
                log.error("訂閱主題[{}]失敗,錯誤碼: {}", topic, e.getReasonCode(), e);
                throw e;
            }
        }
    }

    private List<String> getTopicsToSubscribe() {
        return Optional.ofNullable(mqttProperties.getTopics())
                .map(t -> Arrays.asList(t.getTopic1()))
                .filter(list -> !list.contains(null))
                .orElse(DEFAULT_TOPICS);
    }

    @Bean("mqttExecutor")
    public Executor mqttExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(mqttProperties.getPoolConfig().getCoreSize());
        executor.setMaxPoolSize(mqttProperties.getPoolConfig().getMaxSize());
        executor.setQueueCapacity(mqttProperties.getPoolConfig().getQueueCapacity());
        executor.setThreadNamePrefix(mqttProperties.getPoolConfig().getThreadPrefix());
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.initialize();
        return executor;
    }
}

消息回調,處理接收的消息

@Slf4j
@Component
public class MqttMessageListener implements MqttCallback {
    private static final int MAX_RETRY_ATTEMPTS = 10;
    private static final long INITIAL_RETRY_DELAY = 1_000L;
    private static final List<String> DEFAULT_TOPICS = Collections.singletonList("defaultTopic");

    private final ScheduledExecutorService reconnectScheduler = Executors.newSingleThreadScheduledExecutor();
    private final AtomicInteger retryCounter = new AtomicInteger(0);

    private final Map<String, MessageHandler> topicHandlers = new ConcurrentHashMap<>();

    private final TestService testService;
    private final MqttProperties mqttProperties;
    @Lazy
    @Autowired
    private MqttClient mqttClient;


    public MqttMessageListener(TestService testService, MqttProperties mqttProperties) {
        this.testService = testService;
        this.mqttProperties = mqttProperties;
        initializeHandlers();
    }

    private void initializeHandlers() {
        topicHandlers.put(mqttProperties.getTopics().getTopic1(), this::handleMessage2);
    }

    @Override
    public void connectionLost(Throwable cause) {
        log.error("MQTT連接中斷,原因: {}", cause.getMessage());
        scheduleReconnect();
    }

    private synchronized void scheduleReconnect() {
        int attempt = retryCounter.incrementAndGet();
        if (attempt > MAX_RETRY_ATTEMPTS) {
            log.error("達到最大重連次數[{}],停止重連嘗試", MAX_RETRY_ATTEMPTS);
            return;
        }
        long delay = INITIAL_RETRY_DELAY * (long) Math.pow(2, attempt - 1);
        log.info("將在{}ms后嘗試第{}次重連...", delay, attempt);

        reconnectScheduler.schedule(() -> {
            try {
                if (!mqttClient.isConnected()) {
                    mqttClient.reconnect();
                }
                subscribeTopics(mqttClient);
                mqttClient.setCallback(this);
                retryCounter.set(0);
                log.info("MQTT連接恢復成功");
            } catch (MqttException e) {
                log.error("第{}次重連失敗: {}", attempt, e.getMessage(),e);
                scheduleReconnect();
            }
        }, delay, TimeUnit.MILLISECONDS);
    }


    private void subscribeTopics(MqttClient client) throws MqttException {
        List<String> topics = getTopicsToSubscribe();
        for (String topic : topics) {
            try {
                client.subscribe(topic, mqttProperties.getQos());
                log.info("成功訂閱主題: {}", topic);
            } catch (MqttException e) {
                log.error("訂閱主題[{}]失敗,錯誤碼: {}", topic, e.getReasonCode(), e);
                throw e;
            }
        }
    }

    private List<String> getTopicsToSubscribe() {
        return Optional.ofNullable(mqttProperties.getTopics())
                .map(t -> Arrays.asList(t.getTopic1()))
                .filter(list -> !list.contains(null))
                .orElse(DEFAULT_TOPICS);
    }

    @Override
    public void messageArrived(String topic, MqttMessage message) {
        try {
            String payload = validatePayload(message.getPayload());
            log.info("收到消息 [Topic:{}][QoS:{}] {}", topic, message.getQos(), payload);

            MessageHandler handler = Optional.ofNullable(topicHandlers.get(topic))
                    .orElseThrow(() -> new MqttException(MqttException.REASON_CODE_CLIENT_EXCEPTION));

            asyncProcessMessage(() -> {
                try {
                    handler.handle(payload);
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            });

        } catch (JSONException e) {
            log.error("消息格式錯誤 [Topic:{}]: {}", topic, e.getMessage(), e);
        } catch (MqttException e) {
            log.error("消息處理失敗 [Topic:{}]: {}", topic, e.getMessage(), e);
        } catch (Exception e) {
            log.error("未知處理異常 [Topic:{}]: {}", topic, e.getMessage(), e);
        }
    }

    private String validatePayload(byte[] payload) throws JSONException {
        String json = new String(payload);
        try {
            JSON.parse(json); // 完整解析 JSON,捕獲格式異常
        } catch (JSONException e) {
            log.error("非法JSON格式: {}", json);
            throw new JSONException("非法JSON格式", e);
        }
        return json;
    }

    @Async("mqttExecutor")
    public void asyncProcessMessage(Runnable task) {
        task.run();
    }

    private void handleMessage2(String payload) {
        testService.handleMessage2(payload);
    }
    private void handleMessage3(String payload) {
        testService.handleMessage3(payload);
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        try {
            if (token.getException() != null) {
                log.error("消息投遞失敗 [MessageId:{}]", token.getMessageId(), token.getException());
            } else {
                log.info("消息投遞成功 [MessageId:{}]", token.getMessageId());
            }
        } catch (Exception e) {
            log.error("獲取投遞狀態失敗", e);
        }
    }

    @FunctionalInterface
    private interface MessageHandler {
        void handle(String payload) throws Exception;
    }
}

消息發布

@Slf4j
@Component
public class MqttPublisher {

    @Autowired
    @Lazy
    private MqttClient mqttClient;

    @Value("${mqtt.qos:1}")
    private int qosLevel;

    private final Object publishLock = new Object();

    /**
     * 發布消息
     */
    public void publishMessage(String msg, String topic) {
        try {
            synchronized (publishLock) { // 線程安全鎖
                MqttMessage message = new MqttMessage();
                message.setPayload(msg.getBytes());
                message.setQos(qosLevel);
                message.setRetained(false); // 不保留消息

                mqttClient.publish(topic, message);
                log.info("Topic:{} 響應已發送: {}", topic, msg);
            }
        } catch (MqttException e) {
            log.error("MQTT發布失敗: {}", e.getMessage());
            handlePublishFailure(msg, topic, e);
        }
    }

    /**
     * 消息失敗重試機制
     */
    private void handlePublishFailure(String msg, String topic, MqttException e) {
        try {
            if (mqttClient.isConnected()) {
                mqttClient.disconnect();
            }
            mqttClient.connect();
            publishMessage(msg, topic); // 重試發布
        } catch (MqttException ex) {
            log.error("消息重試失敗: {}", ex.getMessage(),ex);
        }
    }

    /**
     * 消息持久化存儲(QoS 1保障)
     */
    private void saveFailedMessage(JSONObject msg) {
        // 實現數據庫存儲邏輯(此處需補充DAO操作)
        log.warn("持久化失敗消息: {}", msg);
    }
}

測試

啟動程序:

圖片圖片

查看主題:

圖片圖片

發布消息:

圖片圖片

責任編輯:武曉燕 來源: 一安未來
相關推薦

2024-10-11 11:32:22

Spring6RSocket服務

2025-05-29 01:33:00

微服務架構系統

2025-04-09 02:02:00

Spring框架開發

2021-08-04 10:22:27

鴻蒙HarmonyOS應用

2023-10-15 22:40:25

插件JIB

2020-04-23 15:59:04

SpringKafka集群

2024-08-09 08:52:26

2021-09-03 06:46:34

Spring 6pring Boot 項目

2021-09-15 09:02:20

Spring 6Spring BootJava

2022-07-13 08:36:57

MQ架構設計模式

2023-10-18 15:25:29

數據源數據庫

2025-02-08 08:16:16

2025-02-17 00:00:45

接口支付寶沙箱

2024-09-12 14:50:08

2024-10-25 08:41:18

消息隊列RedisList

2022-09-22 13:28:34

Redis分布式鎖

2022-09-29 08:28:57

SpringRedis分布式

2024-10-17 11:24:04

2024-01-04 18:01:55

高并發SpringBoot

2025-07-01 01:00:00

Spring消息系統Redis
點贊
收藏

51CTO技術棧公眾號

国产精品视频在| 久操手机在线视频| 怡红院av久久久久久久| 国产一区二区欧美| 欧美精品xxxxbbbb| 久久天天东北熟女毛茸茸| 高清乱码毛片入口| 老司机精品导航| 中文字幕欧美日韩| 中国男女全黄大片| 成人美女视频| 亚洲欧洲另类国产综合| 国产伦精品一区二区三区照片| www.国产高清| 人人狠狠综合久久亚洲婷| 日韩亚洲欧美一区| 日本在线视频www| 久操免费在线| 91视频免费播放| 国产一区玩具在线观看| 在线免费观看毛片| 久久一区二区三区喷水| 精品美女在线观看| 超碰超碰在线观看| www.超碰在线| 亚洲色欲色欲www| 国产高清一区视频| 日本天堂网在线| 成人婷婷网色偷偷亚洲男人的天堂| 6080yy午夜一二三区久久| 欧美v在线观看| 最爽无遮挡行房视频在线| 久久亚洲一级片| 91久久国产精品91久久性色| 日韩无码精品一区二区三区| 欧美国产美女| 精品在线欧美视频| 亚洲 自拍 另类 欧美 丝袜| 99久久综合国产精品二区| 亚洲成人av免费| 99热都是精品| 3d成人动漫在线| 91在线一区二区三区| 亚洲最大的免费| 老熟妇一区二区三区啪啪| 中文高清一区| 欧美高清videos高潮hd| 国产色无码精品视频国产| 久久99国内| 亚洲韩国欧洲国产日产av| 免费不卡av网站| 欧美爱爱视频| 欧美在线短视频| 乱妇乱女熟妇熟女网站| 在线不卡日本v二区707| 国产精品久久久久aaaa| 午夜精品一区二区三区四区| 日韩一区免费视频| 国产成人精品免费视频网站| 91精品视频网站| 国产又粗又大又爽| 另类小说视频一区二区| 国产精品女人久久久久久| 成人公开免费视频| 久久精品日产第一区二区| 国产最新精品视频| 欧美精品成人久久| 国产精品啊v在线| 欧美大尺度激情区在线播放 | 成a人片亚洲日本久久| 亚洲综合精品一区二区| 国产99对白在线播放| 国产高清不卡二三区| 亚洲最大福利视频| 亚洲欧美另类一区| proumb性欧美在线观看| 国内精品视频在线播放| 色视频在线观看免费| 久久综合网色—综合色88| 欧美亚州在线观看| 国产精品毛片一区二区三区四区| 国产欧美一区二区精品性色超碰| 亚洲欧美日韩精品久久久 | 在线观看日韩羞羞视频| 国产原厂视频在线观看| 亚洲男人的天堂一区二区| 欧美 亚洲 视频| caoporn视频在线| 同产精品九九九| 日韩精品无码一区二区三区免费| 日本综合视频| 3d成人h动漫网站入口| 日本性生活一级片| 日韩一级电影| 中文字幕亚洲在线| 唐朝av高清盛宴| 中文精品在线| 国产免费一区二区三区在线观看 | 欧美成人vps| 波多野结衣先锋影音| 成人激情在线| 欧美国产日韩免费| 老熟妇仑乱一区二区av| 久久99精品国产| 国产九色91| 92国产在线视频| 亚洲线精品一区二区三区八戒| 欧美日韩在线视频一区二区三区| 亚洲精品第一| 日韩国产欧美区| 亚洲精品久久久久久国| 亚洲精选成人| 国产欧美一区二区三区在线| 黄色aaa毛片| 中文字幕乱码久久午夜不卡| 黄色成人在线免费观看| 99re久久| 亚洲精品国产美女| 你懂得在线观看| 国产伦理一区| 亚洲综合色av| 国产youjizz在线| 亚洲国产一区在线观看| 成人日韩在线视频| 天堂俺去俺来也www久久婷婷| 久久精品小视频| 久久久久久久久久久影院| 国产一区二区0| 日韩精品第一页| 波多野结衣中文在线| 欧美高清hd18日本| 精品无码在线观看| 免费在线亚洲欧美| 国产精品久久久久av福利动漫| 日韩毛片久久久| 色94色欧美sute亚洲线路一久| 精品人妻伦一二三区久| 一区二区电影| 国产精品一区二区三区在线播放 | 免费一级做a爰片久久毛片潮| 欧美日韩国产欧| 成人黄色av播放免费| 久久精品国产亚洲a∨麻豆| 亚洲电影中文字幕在线观看| 超碰在线超碰在线| 911久久香蕉国产线看观看| 国产精品久久久久久久久久久久 | 日本一本在线免费福利| 欧美剧在线免费观看网站| 久久美女免费视频| 亚洲一区图片| 精品一区二区三区国产| 99在线视频影院| 欧美xxxx老人做受| 欧美日韩国产精品综合| 国产精品中文欧美| 国产香蕉一区二区三区| 国产日韩在线观看视频| www.亚洲天堂| 国产又粗又黄又爽视频| 中文字幕一区二区5566日韩| 色国产在线视频| 青青草91久久久久久久久| 国产99视频精品免视看7| 黄网在线观看| 色偷偷久久人人79超碰人人澡| 久久中文字幕人妻| 久久精品免费| 色姑娘综合av| 日本免费成人| 美日韩在线视频| 成人免费视频国产| 欧美日韩国产在线| 成都免费高清电影| av毛片在线播放| 狠狠干成人综合网| 成人免费在线看片| 波多野结衣在线播放| 亚洲国产成人av在线| 影音先锋在线国产| 中文av一区特黄| 亚洲精品第三页| 91成人网在线观看| 国产99在线免费| 国产欧洲在线| 亚洲深夜福利在线| 一级特黄特色的免费大片视频| 亚洲人妖av一区二区| 欧美性受xxxx黒人xyx性爽| 欧美日韩1区| 农村寡妇一区二区三区| 国产一区二区三区影视| 久久视频在线视频| 六月丁香色婷婷| 欧美性猛交xxxx乱大交| 影音先锋男人看片资源| 国产精品一区二区果冻传媒| 日本韩国欧美在线观看| 精品国产中文字幕第一页| 成人免费看黄网站| av女在线播放| 最近2019中文字幕大全第二页 | 国产在线日韩在线| 成人在线免费观看黄色| 亚洲午夜av电影| jlzzjlzz亚洲女人18| 色综合色综合色综合| 欧美在线视频第一页| 91亚洲男人天堂| 欧美激情第3页| 99国产精品视频免费观看一公开 | 国产精品99久久免费黑人人妻| 婷婷激情图片久久| 精品综合在线| 蜜桃精品一区二区三区| 国产91在线播放精品91| 羞羞视频在线观看免费| 亚洲网站视频福利| 好吊视频一二三区| 欧美日韩国产在线观看| 日韩欧美亚洲视频| 亚洲视频一区二区在线观看| 好吊日免费视频| 国产精品99久久久久久久女警| 国产黄色一级网站| 欧美freesex交免费视频| 欧美日韩免费高清| 福利片一区二区| 成人黄色短视频在线观看| 欧美xx视频| 性欧美亚洲xxxx乳在线观看| 巨大荫蒂视频欧美大片| 亚洲性无码av在线| 视频在线观看你懂的| 精品国产乱码久久久久久1区2区| 一级片在线免费观看视频| 一本色道a无线码一区v| 日本一级黄色录像| 亚洲综合免费观看高清完整版| 国产黄色录像视频| 久久久美女毛片 | 久久午夜老司机| 国产chinesehd精品露脸| 久久99精品视频| 国产理论在线播放| 媚黑女一区二区| 成人综合视频在线| 亚洲区第一页| 99色这里只有精品| 一区三区视频| 国产精品一色哟哟| 国内精品美女在线观看| 欧美日韩一区二区三区电影| 国产精品精品| 伊人久久大香线蕉av一区| 成人激情视频| 亚洲欧洲精品一区二区三区波多野1战4 | 国产免费av高清在线| 亚洲精品720p| 婷婷综合激情网| 亚洲精品99久久久久| 国产综合无码一区二区色蜜蜜| 日韩免费看网站| 精品国产一级片| 日韩一区二区免费电影| av一级黄色片| 亚洲成人1234| 日本天堂在线| 亚洲欧美制服中文字幕| 国产乱视频在线观看| 尤物精品国产第一福利三区| 成人免费在线电影| 久久精品国产成人精品| 亚洲大胆人体大胆做受1| 久久久久久久久国产精品| 爱情岛亚洲播放路线| 91精品国产乱码久久久久久蜜臀| 新版的欧美在线视频| 日本久久亚洲电影| 深夜视频一区二区| 91在线视频精品| 国产精品主播在线观看| 欧洲高清一区二区| 久久亚洲在线| 日韩免费在线观看av| 一区二区毛片| 免费涩涩18网站入口| 国内久久精品视频| 中文字幕在线视频播放| 国产亚洲精品福利| 亚洲女人久久久| 亚洲成av人片一区二区| 蜜臀尤物一区二区三区直播| 91麻豆精品国产自产在线| 国模私拍视频在线| 亚洲视频在线免费看| 国产黄色在线观看| 91精品国产色综合| 久久亚洲精品人成综合网| av一区二区三区在线观看| 偷拍精品福利视频导航| 亚洲人体一区| 亚洲大胆视频| 一本岛在线视频| av色综合久久天堂av综合| 少妇精品无码一区二区免费视频| 亚洲免费av在线| 福利网址在线观看| 日韩欧美一级精品久久| 精品无人乱码| 欧美激情中文字幕在线| 18岁成人毛片| 蜜臀av性久久久久蜜臀aⅴ流畅 | 婷婷色一区二区三区| 一区二区免费看| jizz国产在线| 精品处破学生在线二十三| av网站在线免费观看| 欧美肥婆姓交大片| 国外成人福利视频| 精品网站在线看| 亚洲成人三区| 免费看污黄网站| 91亚洲精品一区二区乱码| 天天天天天天天天操| 91国偷自产一区二区开放时间| 免费看av毛片| 免费91在线视频| 成人精品高清在线视频| 美脚丝袜一区二区三区在线观看| 欧美在线三级| www.国产视频.com| 国产视频一区在线观看| 国产成人在线观看网站| 日韩精品影音先锋| 久做在线视频免费观看| 国产精品直播网红| 欧美美乳视频| 精品中文字幕av| 成人激情视频网站| 成人免费毛片东京热| 51精品秘密在线观看| 欧美日韩在线看片| 国产精品亚洲网站| 日本不卡高清| jizz欧美激情18| 久久亚洲精精品中文字幕早川悠里| 久久久久久久极品内射| 日韩欧美国产不卡| 在线三级电影| 亚洲一区制服诱惑| 在线看片不卡| aaaaaaaa毛片| 一区二区三区精品在线| 国产chinasex对白videos麻豆| 久热爱精品视频线路一| 国产精品视频一区二区三区| 天天爱天天做天天操| 激情综合五月婷婷| 999精品视频在线观看播放| 69精品人人人人| 26uuu亚洲电影在线观看| 亚洲综合中文字幕68页| 欧美日韩国产色综合一二三四| 任你躁av一区二区三区| 亚洲成人激情av| 日韩一级免费毛片| 欧美怡红院视频一区二区三区| 亚洲人挤奶视频| 男人搞女人网站| 中文字幕中文字幕在线一区 | 91免费版网站在线观看| 欧美激情91| 久久久午夜精品福利内容| 黑人巨大精品欧美一区二区| 加勒比一区二区三区在线| 国产精品欧美日韩久久| 天天做天天爱天天综合网2021| 日韩精品xxx| 午夜精品久久久久久久| 秋霞av在线| 国产剧情久久久久久| 牛牛国产精品| 国产精品无码在线| 色偷偷久久一区二区三区| 日本免费在线观看| 97夜夜澡人人双人人人喊| 99成人在线| 中国美女黄色一级片| 精品国产乱码久久久久久闺蜜| 欧美日韩精品免费观看视完整| 伊人精品久久久久7777| 成人午夜看片网址| 中文字幕日韩免费| 久久精品国产欧美激情| 精品深夜福利视频| www.99在线| 亚洲成人动漫在线观看| 高清毛片在线看| 成人女人免费毛片|