精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

ElasticSearch - 批量更新bulk死鎖問題排查

開發 后端
升級ES客戶端版本到7.6正式版,后續版本通過將異常重試任務線程池和flush任務線程池進行了物理隔離,從而避免了線程池的競爭,但是需要考慮版本兼容性。

一、問題系統介紹

1. 監聽商品變更MQ消息,查詢商品最新的信息,調用BulkProcessor批量更新ES集群中的商品字段信息;

2. 由于商品數據非常多,所以將商品數據存儲到ES集群上,整個ES集群共劃分了256個分片,并根據商品的三級類目ID進行分片路由。

比如一個SKU的商品名稱發生變化,我們就會收到這個SKU的變更MQ消息,然后再去查詢商品接口,將商品的最新名稱查詢回來,再根據這個SKU的三級分類ID進行路由,找到對應的ES集群分片,然后更新商品名稱字段信息。

由于商品變更MQ消息量巨大,為了提升更新ES的性能,防止出現MQ消息積壓問題,所以本系統使用了BulkProcessor進行批量異步更新。

ES客戶端版本如下:

<dependency>
            <artifactId>elasticsearch-rest-client</artifactId>
            <groupId>org.elasticsearch.client</groupId>
            <version>6.5.3</version>
        </dependency>

BulkProcessor配置偽代碼如下:

//在這里調用build()方法構造bulkProcessor,在底層實際上是用了bulk的異步操作
        this.fullDataBulkProcessor = BulkProcessor.builder((request, bulkListener) ->
                fullDataEsClient.getClient().bulkAsync(request, RequestOptions.DEFAULT, bulkListener), listener)
                // 1000條數據請求執行一次bulk
                .setBulkActions(1000)
                // 5mb的數據刷新一次bulk
                .setBulkSize(new ByteSizeValue(5L, ByteSizeUnit.MB))
                // 并發請求數量, 0不并發, 1并發允許執行
                .setConcurrentRequests(1)
                // 固定1s必須刷新一次
                .setFlushInterval(TimeValue.timeValueSeconds(1L))
                // 重試5次,間隔1s
                .setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L), 5))
                .build();

二、問題怎么發現的

1. 618大促開始后,由于商品變更MQ消息非常頻繁,MQ消息每天的消息量更是達到了日常的數倍,而且好多商品還變更了三級類目ID;

2. 系統在更新這些三級類目ID發生變化的SKU商品信息時,根據修改后的三級類目ID路由后的分片更新商品信息時發生了錯誤,并且重試了5次,依然沒有成功;

3. 因為在新路由的分片上沒有這個商品的索引信息,這些更新請求永遠也不會執行成功,系統的日志文件中也記錄了大量的異常重試日志。

4. 商品變更MQ消息也開始出現了積壓報警,MQ消息的消費速度明顯趕不上生產速度。

5. 觀察MQ消息消費者的UMP監控數據,發現消費性能很平穩,沒有明顯波動,但是調用次數會在系統消費MQ一段時間后出現斷崖式下降,由原來的每分鐘幾萬調用量逐漸下降到個位數。

6. 在重啟應用后,系統又開始消費,UMP監控調用次數恢復到正常水平,但是系統運行一段時間后,還是會出現消費暫停問題,仿佛所有消費線程都被暫停了一樣。

三、排查問題的詳細過程

首先找一臺暫停消費MQ消息的容器,查看應用進程ID,使用jstack命令dump應用進程的整個線程堆棧信息,將導出的線程堆棧信息打包上傳到 https://fastthread.io/ 進行線程狀態分析。分析報告如下:

通過分析報告發現有124個處于BLOCKED狀態的線程,然后可以點擊查看各線程的詳細堆棧信息,堆棧信息如下:

連續查看多個線程的詳細堆棧信息,MQ消費線程都是在waiting to lock <0x00000005eb781b10> (a
org.elasticsearch.action.bulk.BulkProcessor),然后根據
0x00000005eb781b10去搜索發現,這個對象鎖正在被另外一個線程占用,占用線程堆棧信息如下:

這個線程狀態此時正處于WAITING狀態,通過線程名稱發現,該線程應該是ES客戶端內部線程。正是該線程搶占了業務線程的鎖,然后又在等待其他條件觸發該線程執行,所以導致了所有的MQ消費業務線程一直無法獲取BulkProcessor內部的鎖,導致出現了消費暫停問題。

但是這個線程elasticsearch[scheduler][T#1]為啥不能執行? 它是什么時候啟動的? 又有什么作用?

就需要我們對BulkProcessor進行深入分析,由于BulkProcessor是通過builder模塊進行創建的,所以深入builder源碼,了解一下BulkProcessor的創建過程。

public static Builder builder(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, Listener listener) {
        Objects.requireNonNull(consumer, "consumer");
        Objects.requireNonNull(listener, "listener");
        final ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = Scheduler.initScheduler(Settings.EMPTY);
        return new Builder(consumer, listener,
                (delay, executor, command) -> scheduledThreadPoolExecutor.schedule(command, delay.millis(), TimeUnit.MILLISECONDS),
                () -> Scheduler.terminate(scheduledThreadPoolExecutor, 10, TimeUnit.SECONDS));
    }

內部創建了一個時間調度執行線程池,線程命名規則和上述持有鎖的線程名稱相似,具體代碼如下:

static ScheduledThreadPoolExecutor initScheduler(Settings settings) {
        ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1,
                EsExecutors.daemonThreadFactory(settings, "scheduler"), new EsAbortPolicy());
        scheduler.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
        scheduler.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
        scheduler.setRemoveOnCancelPolicy(true);
        return scheduler;
    }

最后在build方法內部執行了BulkProcessor的內部有參構造方法,在構造方法內部啟動了一個周期性執行的flushing任務,代碼如下

BulkProcessor(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, BackoffPolicy backoffPolicy, Listener listener,
                  int concurrentRequests, int bulkActions, ByteSizeValue bulkSize, @Nullable TimeValue flushInterval,
                  Scheduler scheduler, Runnable onClose) {
        this.bulkActions = bulkActions;
        this.bulkSize = bulkSize.getBytes();
        this.bulkRequest = new BulkRequest();
        this.scheduler = scheduler;
        this.bulkRequestHandler = new BulkRequestHandler(consumer, backoffPolicy, listener, scheduler, concurrentRequests);
        // Start period flushing task after everything is setup
        this.cancellableFlushTask = startFlushTask(flushInterval, scheduler);
        this.onClose = onClose;
    }
private Scheduler.Cancellable startFlushTask(TimeValue flushInterval, Scheduler scheduler) {
        if (flushInterval == null) {
            return new Scheduler.Cancellable() {
                @Override
                public void cancel() {}

                @Override
                public boolean isCancelled() {
                    return true;
                }
            };
        }
        final Runnable flushRunnable = scheduler.preserveContext(new Flush());
        return scheduler.scheduleWithFixedDelay(flushRunnable, flushInterval, ThreadPool.Names.GENERIC);
    }
class Flush implements Runnable {

        @Override
        public void run() {
            synchronized (BulkProcessor.this) {
                if (closed) {
                    return;
                }
                if (bulkRequest.numberOfActions() == 0) {
                    return;
                }
                execute();
            }
        }
    }

通過源代碼發現,該flush任務就是在創建BulkProcessor對象時設置的固定時間flush邏輯,當setFlushInterval方法參數生效,就會啟動一個后臺定時flush任務。flush間隔,由setFlushInterval方法參數定義。該flush任務在運行期間,也會搶占BulkProcessor對象鎖,搶到鎖后,才會執行execute方法。具體的方法調用關系源代碼如下:

/**
     * Adds the data from the bytes to be processed by the bulk processor
     */
    public synchronized BulkProcessor add(BytesReference data, @Nullable String defaultIndex, @Nullable String defaultType,
                                          @Nullable String defaultPipeline, @Nullable Object payload, XContentType xContentType) throws Exception {
        bulkRequest.add(data, defaultIndex, defaultType, null, null, null, defaultPipeline, payload, true, xContentType);
        executeIfNeeded();
        return this;
    }

    private void executeIfNeeded() {
        ensureOpen();
        if (!isOverTheLimit()) {
            return;
        }
        execute();
    }

    // (currently) needs to be executed under a lock
    private void execute() {
        final BulkRequest bulkRequest = this.bulkRequest;
        final long executionId = executionIdGen.incrementAndGet();

        this.bulkRequest = new BulkRequest();
        this.bulkRequestHandler.execute(bulkRequest, executionId);
    }

而上述代碼中的add方法,則是由MQ消費業務線程去調用,在該方法上同樣有一個synchronized關鍵字,所以消費MQ業務線程會和flush任務執行線程直接會存在鎖競爭關系。具體MQ消費業務線程調用偽代碼如下:

@Override
 public void upsertCommonSku(CommonSkuEntity commonSkuEntity) {
            String source = JsonUtil.toString(commonSkuEntity);
            UpdateRequest updateRequest = new UpdateRequest(Constants.INDEX_NAME_SPU, Constants.INDEX_TYPE, commonSkuEntity.getSkuId().toString());
            updateRequest.doc(source, XContentType.JSON);
            IndexRequest indexRequest = new IndexRequest(Constants.INDEX_NAME_SPU, Constants.INDEX_TYPE, commonSkuEntity.getSkuId().toString());
            indexRequest.source(source, XContentType.JSON);
            updateRequest.upsert(indexRequest);
            updateRequest.routing(commonSkuEntity.getCat3().toString());
            fullbulkProcessor.add(updateRequest);
}

通過以上對線程堆棧分析,發現所有的業務線程都在等待elasticsearch[scheduler][T#1]線程釋放BulkProcessor對象鎖,但是該線程確一直沒有釋放該對象鎖,從而出現了業務線程的死鎖問題。

結合應用日志文件中出現的大量異常重試日志,可能與BulkProcessor的異常重試策略有關,然后進一步了解BulkProcessor的異常重試代碼邏輯。由于業務線程中提交BulkRequest請求都統一提交到了BulkRequestHandler對象中的execute方法內部進行處理,代碼如下:

public final class BulkRequestHandler {
    private final Logger logger;
    private final BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer;
    private final BulkProcessor.Listener listener;
    private final Semaphore semaphore;
    private final Retry retry;
    private final int concurrentRequests;

    BulkRequestHandler(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, BackoffPolicy backoffPolicy,
                       BulkProcessor.Listener listener, Scheduler scheduler, int concurrentRequests) {
        assert concurrentRequests >= 0;
        this.logger = Loggers.getLogger(getClass());
        this.consumer = consumer;
        this.listener = listener;
        this.concurrentRequests = concurrentRequests;
        this.retry = new Retry(backoffPolicy, scheduler);
        this.semaphore = new Semaphore(concurrentRequests > 0 ? concurrentRequests : 1);
    }

    public void execute(BulkRequest bulkRequest, long executionId) {
        Runnable toRelease = () -> {};
        boolean bulkRequestSetupSuccessful = false;
        try {
            listener.beforeBulk(executionId, bulkRequest);
            semaphore.acquire();
            toRelease = semaphore::release;
            CountDownLatch latch = new CountDownLatch(1);
            retry.withBackoff(consumer, bulkRequest, new ActionListener<BulkResponse>() {
                @Override
                public void onResponse(BulkResponse response) {
                    try {
                        listener.afterBulk(executionId, bulkRequest, response);
                    } finally {
                        semaphore.release();
                        latch.countDown();
                    }
                }

                @Override
                public void onFailure(Exception e) {
                    try {
                        listener.afterBulk(executionId, bulkRequest, e);
                    } finally {
                        semaphore.release();
                        latch.countDown();
                    }
                }
            });
            bulkRequestSetupSuccessful = true;
            if (concurrentRequests == 0) {
                latch.await();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.info(() -> new ParameterizedMessage("Bulk request {} has been cancelled.", executionId), e);
            listener.afterBulk(executionId, bulkRequest, e);
        } catch (Exception e) {
            logger.warn(() -> new ParameterizedMessage("Failed to execute bulk request {}.", executionId), e);
            listener.afterBulk(executionId, bulkRequest, e);
        } finally {
            if (bulkRequestSetupSuccessful == false) {  // if we fail on client.bulk() release the semaphore
                toRelease.run();
            }
        }
    }

    boolean awaitClose(long timeout, TimeUnit unit) throws InterruptedException {
        if (semaphore.tryAcquire(this.concurrentRequests, timeout, unit)) {
            semaphore.release(this.concurrentRequests);
            return true;
        }
        return false;
    }
}

BulkRequestHandler通過構造方法初始化了一個Retry任務對象,該對象中也傳入了一個Scheduler,且該對象和flush任務中傳入的是同一個線程池,該線程池內部只維護了一個固定線程。而execute方法首先會先根據Semaphore來控制并發執行數量,該并發數量在構建BulkProcessor時通過參數指定,通過上述配置發現該值配置為1。所以每次只允許一個線程執行該方法。即MQ消費業務線程和flush任務線程,同一時間只能有一個線程可以執行。然后下面在了解一下重試任務是如何執行的,具體看如下代碼:

public void withBackoff(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, BulkRequest bulkRequest,
                            ActionListener<BulkResponse> listener) {
        RetryHandler r = new RetryHandler(backoffPolicy, consumer, listener, scheduler);
        r.execute(bulkRequest);
    }

RetryHandler內部會執行提交bulkRequest請求,同時也會監聽bulkRequest執行異常狀態,然后執行任務重試邏輯,重試代碼如下:

private void retry(BulkRequest bulkRequestForRetry) {
            assert backoff.hasNext();
            TimeValue next = backoff.next();
            logger.trace("Retry of bulk request scheduled in {} ms.", next.millis());
            Runnable command = scheduler.preserveContext(() -> this.execute(bulkRequestForRetry));
            scheduledRequestFuture = scheduler.schedule(next, ThreadPool.Names.SAME, command);
        }

RetryHandler將執行失敗的bulk請求重新交給了內部scheduler線程池去執行,通過以上代碼了解,該線程池內部只維護了一個固定線程,同時該線程池可能還會被另一個flush任務去占用執行。所以如果重試邏輯正在執行的時候,此時線程池內的唯一線程正在執行flush任務,則會阻塞重試邏輯執行,重試邏輯不能執行完成,則不會釋放Semaphore,但是由于并發數量配置的是1,所以flush任務線程需要等待其他線程釋放一個Semaphore許可后才能繼續執行。所以此處形成了循環等待,導致Semaphore和BulkProcessor對象鎖都無法釋放,從而使得所有的MQ消費業務線程都阻塞在獲取BulkProcessor鎖之前。

同時,在GitHub的ES客戶端源碼客戶端上也能搜索到類似問題,例如:
https://github.com/elastic/elasticsearch/issues/47599 ,所以更加印證了之前的猜想,就是因為bulk的不斷重試從而引發了BulkProcessor內部的死鎖問題。

四、如何解決問題

既然前邊已經了解到了問題產生的原因,所以就有了如下幾種解決方案:

1.升級ES客戶端版本到7.6正式版,后續版本通過將異常重試任務線程池和flush任務線程池進行了物理隔離,從而避免了線程池的競爭,但是需要考慮版本兼容性。

2.由于該死鎖問題是由大量異常重試邏輯引起的,可以在不影響業務邏輯的情況取消重試邏輯,該方案可以不需要升級客戶端版本,但是需要評估業務影響,執行失敗的請求可以通過其他其他方式進行業務重試。

作者:京東零售 曹志飛

來源:京東云開發者社區

責任編輯:武曉燕 來源: 今日頭條
相關推薦

2024-11-29 16:35:33

解決死鎖Java線程

2021-03-03 08:57:46

java死鎖線程

2023-02-16 08:10:40

死鎖線程

2017-10-18 15:07:21

MySQL執行死鎖

2016-10-20 15:27:18

MySQLredo死鎖

2024-06-24 09:29:15

2022-05-16 07:35:47

死鎖工具jstack

2021-08-24 08:01:15

死鎖工具多線編程

2010-04-26 11:01:26

Oracle 10g

2019-04-15 13:15:12

數據庫MySQL死鎖

2019-03-15 16:20:45

MySQL死鎖排查命令

2017-12-19 14:00:16

數據庫MySQL死鎖排查

2024-08-27 22:04:37

2024-08-14 14:20:00

2009-09-25 11:34:54

Hibernate處理Hibernate批量

2009-09-24 09:45:23

Hibernate批量

2024-06-06 08:34:33

2011-09-02 14:18:53

OracleBULK COLLECFORALL

2021-11-14 05:00:56

排查Sdk方式

2021-06-01 07:55:42

DockerEOFk8s
點贊
收藏

51CTO技術棧公眾號

一区二区欧美亚洲| 天天综合日日夜夜精品| 成人免费福利视频| 日本最新中文字幕| 神马久久av| 欧美视频一区二| 精品一区二区三区毛片| 手机看片1024国产| 日韩和欧美一区二区| 久久五月天色综合| 一区二区不卡免费视频| 少妇精品视频在线观看| 亚洲午夜精品在线| 亚洲7777| 天天射,天天干| 精品午夜久久福利影院| 欧美一级大片在线观看| 黄色录像一级片| 天堂在线精品| 欧美一区欧美二区| 日韩中文字幕免费在线| 中文字幕在线观看播放| 国产亚洲一区二区三区| 99视频免费观看蜜桃视频| 黄色av网站免费| 国产一区亚洲| 久久成人亚洲精品| 一区二区三区少妇| 天堂av一区| 欧美精品在线视频| 国产日韩成人内射视频| 午夜欧美激情| 亚洲h在线观看| 亚洲三区在线观看| 国产精品免费观看| 久久综合久久鬼色| 国产免费一区| 高清毛片aaaaaaaaa片| 极品美女销魂一区二区三区| 国产精品高潮呻吟久久av野狼 | 亚洲av片在线观看| 国产 日韩 欧美大片| 亚洲aⅴ男人的天堂在线观看| 久久久999久久久| 久久人人精品| 性欧美xxxx大乳国产app| 亚洲视频一二区| 在线免费观看成人| 福利视频在线导航| 国产区在线观看成人精品 | 三级性生活视频| 巨大黑人极品videos精品| 色婷婷久久综合| 一本色道无码道dvd在线观看| 高清毛片在线观看| 欧美丝袜第一区| 91黄色小网站| 欧美日韩五码| 欧美午夜精品理论片a级按摩| 久久久久国产精品熟女影院| 欧美成人精品三级网站| 在线观看国产精品网站| jizz18女人| 在线免费成人| 91精品国产日韩91久久久久久| www.com黄色片| 2020日本在线视频中文字幕| 亚洲成av人片一区二区| 性高湖久久久久久久久aaaaa| dy888亚洲精品一区二区三区| 国产精品大尺度| 制服国产精品| 国产在线看片| 一区二区三区四区在线免费观看| 在线观看欧美一区| 精品51国产黑色丝袜高跟鞋| 亚洲三级在线看| 免费成人深夜夜行网站视频| 亚洲婷婷噜噜| 午夜欧美大尺度福利影院在线看| 午夜免费福利小电影| 96av在线| 日韩欧美精品在线观看| 亚洲精品高清无码视频| 99久久久国产精品免费调教网站 | 国产又粗又猛视频免费| 免费的国产精品| 成人国产在线激情| 国产黄色片免费| 岛国一区二区三区| 免费av一区二区三区| 午夜精品久久久久久久99| 99麻豆久久久国产精品免费| 欧美性色黄大片人与善| 在线观看免费版| 亚洲免费观看高清完整版在线观看熊 | 国产高清精品网站| 加勒比在线一区二区三区观看| 天堂成人在线观看| 中文字幕av资源一区| 大桥未久一区二区| 超碰在线最新网址| 日本道免费精品一区二区三区| 欧美午夜aaaaaa免费视频| 精品一区二区三区在线观看视频| 精品国产91洋老外米糕| 免费亚洲电影在线| 亚洲黄色在线观看| 欧美精品日韩在线| 国产精品久久久久久久久久10秀| 欧美成人精品三级在线观看| 日本一区二区三区免费视频| 老司机久久99久久精品播放免费| 成人网在线免费看| 午夜视频在线播放| 久久久蜜桃精品| 日韩人妻精品一区二区三区| 在线天堂新版最新版在线8| 偷偷要91色婷婷| 538任你躁在线精品免费| 最新国产精品精品视频| 国产一区二区三区视频免费| 久久久久久久久久久网| 秋霞成人午夜伦在线观看| 99蜜桃在线观看免费视频网站| 国产天堂在线| 亚洲一区二区三区四区在线观看| 欧美 日韩 国产 激情| 老司机亚洲精品一区二区| 亚洲精品二三区| 国产精久久久久久| 韩国一区二区在线观看| 欧美日韩精品免费观看视一区二区 | av在线资源站| 欧美性xxxxhd| 韩国三级在线看| 91蜜臀精品国产自偷在线| 97avcom| 亚洲乱码在线观看| 一色桃子久久精品亚洲| 日韩精品一区二区三区不卡| 超碰在线成人| 久久精品国产视频| 国产裸体无遮挡| 欧美激情一区二区三区蜜桃视频| 国产午夜福利100集发布| 一区二区三区视频播放| 日韩视频免费观看| 中文字幕乱码视频| 久久久久9999亚洲精品| 国产一区二区网| 高清一区二区三区| 欧美超级免费视 在线| 国产精品丝袜黑色高跟鞋| 亚洲国产精品精华液2区45| 2022亚洲天堂| 私拍精品福利视频在线一区| 色综合视频网站| 性生活免费网站| 亚洲精品成人精品456| 亚洲天堂av一区二区| 欧美中文一区二区| 国产精品福利在线| 成年人视频在线看| 欧美三级电影一区| 亚洲色图日韩精品| 日韩精品电影一区亚洲| 一区中文字幕在线观看| 91麻豆精品| 久久久久99精品久久久久| 国产伦精品一区二区三区视频痴汉 | 午夜精品久久久久久不卡8050| 先锋资源av在线| 国产欧美日韩一级| 欧美日韩一区在线观看视频| 欧美成人ⅴideosxxxxx| 影音先锋欧美精品| 亚洲无码精品在线观看| 日韩一区在线免费观看| 国产精品熟女一区二区不卡| 自拍偷拍欧美| 欧美精品二区三区四区免费看视频| 日本在线啊啊| 亚洲欧美在线第一页| 伊人成年综合网| 国产精品视频yy9299一区| 中文字幕中文在线| 狠狠色丁香久久综合频道| 久久久亚洲综合网站| 日韩av电影资源网| 日韩一区二区三区在线播放| 国产丝袜在线视频| 亚洲风情在线资源站| 成人无码av片在线观看| 激情五月激情综合网| 国产av熟女一区二区三区| 久久久久久久久久成人| 亚洲国产aⅴ精品一区二区| 欧美激情一区二区三区在线视频观看| 午夜影院在线视频| 欧美亚州韩日在线看免费版国语版 | 国产亚洲人成a在线v网站| 久久成人综合视频| 天堂中文资源在线观看| 在线观看国产91| 久久久久人妻一区精品色欧美| 99国产精品久久久久久久久久久| 久久久久久久久久久久91| 中文字幕一区二区三区久久网站| 国产精品免费一区二区三区四区| 成人影院在线免费观看| 欧美激情精品久久久久久变态| 五月婷婷开心中文字幕| 欧美人牲a欧美精品| 国产亚洲精品久久777777| 国产亚洲欧美中文| 国产精品日日摸夜夜爽| 日本午夜精品视频在线观看 | 污污的视频在线观看| 亚洲美女av黄| 国内精品久久久久久久久久久| 欧美性生交大片免费| 外国一级黄色片| 国产欧美一区二区精品性| 中文字幕视频三区| 青青国产91久久久久久 | 91亚洲欧美| 亚洲精品黄网在线观看| av中文字幕免费| 欧美在线色视频| 国产一级做a爱片久久毛片a| 亚洲人成精品久久久久| 久久精品—区二区三区舞蹈 | 鬼打鬼之黄金道士1992林正英| 日韩一区二区三区免费视频| 欧美激情成人在线视频| 五月天激情在线| 日韩色av导航| av在线播放免费| 亚洲欧美国产视频| 日韩在线视频免费| 日韩三级视频在线看| 国产有码在线观看| 欧美精品一卡两卡| 中文字幕av网站| 在线亚洲一区观看| 天天做天天爱夜夜爽| 亚洲一区二区三区四区在线免费观看 | 国产精品九九九九| 欧美性色aⅴ视频一区日韩精品| 久久久久久久久影院| 亚洲动漫第一页| 国产主播在线播放| 亚洲成人av在线电影| 国产真实乱偷精品视频| 亚洲小说欧美激情另类| 永久久久久久久| 最新国产成人在线观看| а天堂中文在线资源| 亚洲国产精品av| 懂色av蜜桃av| 中文字幕免费不卡在线| 精品人妻一区二区三区蜜桃视频| 91麻豆国产自产在线观看| 成人h动漫精品一区| 久久综合久久综合久久综合| 国产av自拍一区| 国产精品亲子伦对白| 国产精品一区二区亚洲| 国产日韩影视精品| 免费中文字幕日韩| 亚洲精品免费在线| 麻豆疯狂做受xxxx高潮视频| 夜夜嗨av一区二区三区| 国产极品美女高潮无套嗷嗷叫酒店 | 日韩美女视频一区二区在线观看| 精品人妻久久久久一区二区三区| 日韩亚洲国产中文字幕欧美| 亚洲精品国产一区二| 亚洲国产成人精品久久久国产成人一区| 丁香六月色婷婷| 亚洲精品国产精品自产a区红杏吧 亚洲精品国产精品乱码不99按摩 亚洲精品国产精品久久清纯直播 亚洲精品国产精品国自产在线 | 欧美激情网址| 国产综合色一区二区三区| 欧美在线导航| 天堂社区 天堂综合网 天堂资源最新版 | 欧美在线高清| 国产av熟女一区二区三区 | www精品美女久久久tv| 亚洲色图欧美日韩| 久久久欧美精品sm网站| 人人干在线观看| 亚洲综合一二三区| 自拍偷拍欧美亚洲| 91久久精品一区二区三区| 91丨porny丨在线中文| 91精品国产色综合久久ai换脸 | 日韩av大片站长工具| 国产欧美va欧美va香蕉在线| 成人97精品毛片免费看| 国产精品区免费视频| 久久成人av| 久久99久久久久久| 视频一区视频二区中文| 午夜免费一级片| 99re亚洲国产精品| 日韩视频在线观看免费视频| 亚洲免费观看高清完整版在线 | 国产日产一区| 四虎精品欧美一区二区免费| 99视频一区| 亚洲欧美手机在线| 91免费视频观看| 欧美激情精品久久久久久免费| 日韩欧美在线国产| 99久久精品日本一区二区免费| 亚洲国产欧美一区二区三区同亚洲 | 久久99久久精品欧美| 熟妇人妻久久中文字幕| 中文字幕在线免费不卡| 国产精品国产三级国产专区52 | 亚洲爱情岛论坛永久| 亚洲视频网站在线观看| 视频在线观看入口黄最新永久免费国产| 欧美激情女人20p| 新片速递亚洲合集欧美合集| 91在线免费看网站| 国产a久久精品一区二区三区| 996这里只有精品| 蜜臀av性久久久久蜜臀aⅴ流畅| 免费观看黄网站| 久久免费美女视频| 国产在线视频卡一卡二| 欧美日韩精品二区第二页| 色视频在线观看免费| 久久99久久99精品中文字幕| 亚洲第一会所001| 就去色蜜桃综合| 亚洲精品黄色| 俄罗斯女人裸体性做爰| 中文字幕在线观看一区| 黄色一级片免费在线观看| 日韩禁在线播放| 日本黄色免费在线| 国产综合第一页| 亚洲手机视频| 成年人看片网站| 亚洲激情在线播放| 亚洲无码精品国产| 日韩网站在线观看| 桃子视频成人app| 美女被啪啪一区二区| 亚洲美洲欧洲综合国产一区| 无码人妻丰满熟妇啪啪网站| 亚洲视频1区2区| 亚洲国产日韩在线观看| 欧美黄色片免费观看| 视频一区日韩精品| a级黄色片免费| 免费成人美女在线观看.| 五月天精品视频| 国产精品777777在线播放| 欧美国产乱视频| 日本在线视频一区二区三区| 少妇高潮大叫好爽喷水| 国产精品69毛片高清亚洲| 国产老头老太做爰视频| 日韩丝袜美女视频| 日韩激情av| 国产尤物99| 乱码第一页成人| 少妇人妻好深好紧精品无码| 欧美色偷偷大香| 菠萝菠萝蜜在线观看| 亚洲r级在线观看| 国产一区二区中文| 亚洲黄色免费在线观看| 欧美性生活大片免费观看网址| 日本高清视频网站| 国产精品久久久久久久久久ktv| 日韩成人a**站| 伊人免费视频二| 亚洲一区二区三区四区在线| 深夜视频在线免费| 97视频在线观看网址| 欧美激情国产在线| 日韩高清一二三区| 天天色综合天天| 69视频在线观看| 亚洲在线www| 国产综合视频| 午夜精产品一区二区在线观看的| 欧美日韩在线电影| 青草影视电视剧免费播放在线观看| 国产一区二区三区黄| 天堂va蜜桃一区二区三区| 在线视频这里只有精品| 日韩美女在线视频| 欧美电影免费看| 亚洲免费视频播放|