精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

Redis Pipelining 底層原理分析及實(shí)踐

開發(fā)
本文結(jié)合實(shí)踐分析了Spring Boot框架下Redis的Lettuce客戶端和Redisson客戶端對Pipeline特性的支持原理,并針對實(shí)踐過程中遇到的問題進(jìn)行了分析,可以幫助開發(fā)者了解不同客戶端對Pipeline支持原理及避免實(shí)際使用中出現(xiàn)

Redis是一種基于客戶端-服務(wù)端模型以及請求/響應(yīng)的TCP服務(wù)。在遇到批處理命令執(zhí)行時,Redis提供了Pipelining(管道)來提升批處理性能。本文結(jié)合實(shí)踐分析了Spring Boot框架下Redis的Lettuce客戶端和Redisson客戶端對Pipeline特性的支持原理,并針對實(shí)踐過程中遇到的問題進(jìn)行了分析,可以幫助開發(fā)者了解不同客戶端對Pipeline支持原理及避免實(shí)際使用中出現(xiàn)問題。

一、前言

Redis 已經(jīng)提供了像 mget 、mset 這種批量的命令,但是某些操作根本就不支持或沒有批量的操作,從而與 Redis 高性能背道而馳。為此, Redis基于管道機(jī)制,提供Redis Pipeline新特性。Redis Pipeline是一種通過一次性發(fā)送多條命令并在執(zhí)行完后一次性將結(jié)果返回,從而減少客戶端與redis的通信次數(shù)來實(shí)現(xiàn)降低往返延時時間提升操作性能的技術(shù)。目前,Redis Pipeline是被很多個版本的Redis 客戶端所支持的。 

二、Pipeline 底層原理分析

 2.1 Redis單個命令執(zhí)行基本步驟

Redis是一種基于客戶端-服務(wù)端模型以及請求/響應(yīng)的TCP服務(wù)。一次Redis客戶端發(fā)起的請求,經(jīng)過服務(wù)端的響應(yīng)后,大致會經(jīng)歷如下的步驟:

  1. 客戶端發(fā)起一個(查詢/插入)請求,并監(jiān)聽socket返回,通常情況都是阻塞模式等待Redis服務(wù)器的響應(yīng)。
  2. 服務(wù)端處理命令,并且返回處理結(jié)果給客戶端。
  3. 客戶端接收到服務(wù)的返回結(jié)果,程序從阻塞代碼處返回。


圖片

2.2 RTT 時間

Redis客戶端和服務(wù)端之間通過網(wǎng)絡(luò)連接進(jìn)行數(shù)據(jù)傳輸,數(shù)據(jù)包從客戶端到達(dá)服務(wù)器,并從服務(wù)器返回?cái)?shù)據(jù)回復(fù)客戶端的時間被稱之為RTT(Round Trip Time - 往返時間)。我們可以很容易就意識到,Redis在連續(xù)請求服務(wù)端時,如果RTT時間為250ms, 即使Redis每秒能處理100k請求,但也會因?yàn)榫W(wǎng)絡(luò)傳輸花費(fèi)大量時間,導(dǎo)致每秒最多也只能處理4個請求,導(dǎo)致整體性能的下降。

圖片


2.3 Redis Pipeline

為了提升效率,這時候Pipeline出現(xiàn)了。Pipelining不僅僅能夠降低RRT,實(shí)際上它極大的提升了單次執(zhí)行的操作數(shù)。這是因?yàn)槿绻皇褂肞ipelining,那么每次執(zhí)行單個命令,從訪問數(shù)據(jù)的結(jié)構(gòu)和服務(wù)端產(chǎn)生應(yīng)答的角度,它的成本是很低的。但是從執(zhí)行網(wǎng)絡(luò)IO的角度,它的成本其實(shí)是很高的。其中涉及到read()和write()的系統(tǒng)調(diào)用,這意味著需要從用戶態(tài)切換到內(nèi)核態(tài),而這個上下文的切換成本是巨大的。

當(dāng)使用Pipeline時,它允許多個命令的讀通過一次read()操作,多個命令的應(yīng)答使用一次write()操作,它允許客戶端可以一次發(fā)送多條命令,而不等待上一條命令執(zhí)行的結(jié)果。不僅減少了RTT,同時也減少了IO調(diào)用次數(shù)(IO調(diào)用涉及到用戶態(tài)到內(nèi)核態(tài)之間的切換),最終提升程序的執(zhí)行效率與性能。如下圖:

圖片

要支持Pipeline,其實(shí)既要服務(wù)端的支持,也要客戶端支持。對于服務(wù)端來說,所需要的是能夠處理一個客戶端通過同一個TCP連接發(fā)來的多個命令,可以理解為,這里將多個命令切分,和處理單個命令一樣,Redis就是這樣處理的。而客戶端,則是要將多個命令緩存起來,緩沖區(qū)滿了就發(fā)送,然后再寫緩沖,最后才處理Redis的應(yīng)答。

三、Pipeline 基本使用及性能比較

下面我們以給10w個set結(jié)構(gòu)分別插入一個整數(shù)值為例,分別使用jedis單個命令插入、jedis使用Pipeline模式進(jìn)行插入和redisson使用Pipeline模式進(jìn)行插入以及測試其耗時。

@Slf4j
public class RedisPipelineTestDemo {
    public static void main(String[] args) {
        //連接redis
        Jedis jedis = new Jedis("10.101.17.180", 6379);
 
        //jedis逐一給每個set新增一個value
        String zSetKey = "Pipeline-test-set";
        int size = 100000;
 
        long begin = System.currentTimeMillis();
        for (int i = 0; i < size; i++) {
            jedis.sadd(zSetKey + i, "aaa");
        }
        log.info("Jedis逐一給每個set新增一個value耗時:{}ms", (System.currentTimeMillis() - begin));
 
        //Jedis使用Pipeline模式         Pipeline Pipeline = jedis.Pipelined();
        begin = System.currentTimeMillis();
        for (int i = 0; i < size; i++) {             Pipeline.sadd(zSetKey + i, "bbb");
        }         Pipeline.sync();
        log.info("Jedis Pipeline模式耗時:{}ms", (System.currentTimeMillis() - begin));
 
        //Redisson使用Pipeline模式
        Config config = new Config();
        config.useSingleServer().setAddress("redis://10.101.17.180:6379");
        RedissonClient redisson = Redisson.create(config);
        RBatch redisBatch = redisson.createBatch();
 
        begin = System.currentTimeMillis();
        for (int i = 0; i < size; i++) {
            redisBatch.getSet(zSetKey + i).addAsync("ccc");
        }
        redisBatch.execute();
        log.info("Redisson Pipeline模式耗時:{}ms", (System.currentTimeMillis() - begin));
 
        //關(guān)閉         Pipeline.close();
        jedis.close();
        redisson.shutdown();
    }
}

測試結(jié)果如下:

Jedis逐一給每個set新增一個value耗時:162655ms

Jedis Pipeline模式耗時:504ms

Redisson Pipeline模式耗時:1399ms

我們發(fā)現(xiàn)使用Pipeline模式對應(yīng)的性能會明顯好于單個命令執(zhí)行的情況。

四、項(xiàng)目中實(shí)際應(yīng)用

在實(shí)際使用過程中有這樣一個場景,很多應(yīng)用在節(jié)假日的時候需要更新應(yīng)用圖標(biāo)樣式,在運(yùn)營進(jìn)行后臺配置的時候, 可以根據(jù)圈選的用戶標(biāo)簽預(yù)先計(jì)算出單個用戶需要下發(fā)的圖標(biāo)樣式并存儲在Redis里面,從而提升性能,這里就涉及Redis的批量操作問題,業(yè)務(wù)流程如下:

圖片

為了提升Redis操作性能,我們決定使用Redis Pipelining機(jī)制進(jìn)行批量執(zhí)行。

4.1 Redis 客戶端對比

針對Java技術(shù)棧而言,目前Redis使用較多的客戶端為Jedis、Lettuce和Redisson。

圖片

目前項(xiàng)目主要是基于SpringBoot開發(fā),針對Redis,其默認(rèn)的客戶端為Lettuce,所以我們基于Lettuce客戶端進(jìn)行分析。

4.2 Spring環(huán)境下Lettuce客戶端對Pipeline的實(shí)現(xiàn)

在Spring環(huán)境下,使用Redis的Pipeline也是很簡單的。spring-data-redis提供了

StringRedisTemplate簡化了對Redis的操作,  只需要調(diào)用StringRedisTemplate的executePipelined方法就可以了,但是在參數(shù)中提供了兩種回調(diào)方式:SessionCallback和RedisCallback。

兩種使用方式如下(這里以操作set結(jié)構(gòu)為例):

RedisCallback的使用方式:

public void testRedisCallback() {
        List<Integer> ids= Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
        Integer contentId = 1;
        redisTemplate.executePipelined(new InsertPipelineExecutionA(ids, contentId));
    }
  
@AllArgsConstructor
    private static class InsertPipelineExecutionA implements RedisCallback<Void> {
  
        private final List<Integer> ids;
        private final Integer contentId;
  
        @Override
        public Void doInRedis(RedisConnection connection) DataAccessException {
            RedisSetCommands redisSetCommands = connection.setCommands();
  
            ids.forEach(id-> {
                String redisKey = "aaa:" + id;
                String value = String.valueOf(contentId);
                redisSetCommands.sAdd(redisKey.getBytes(), value.getBytes());
            });
            return null;
        }
    }

SessionCallback的使用方式:

public void testSessionCallback() {
        List<Integer> ids= Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
        Integer contentId = 1;
        redisTemplate.executePipelined(new InsertPipelineExecutionB(ids, contentId));
    }
  
@AllArgsConstructor
    private static class InsertPipelineExecutionB implements SessionCallback<Void> {
  
        private final List<Integer> ids;
        private final Integer contentId;
  
        @Override
        public <K, V> Void execute(RedisOperations<K, V> operations) throws DataAccessException {
            SetOperations<String, String> setOperations = (SetOperations<String, String>) operations.opsForSet();
            ids.forEach(id-> {
                String redisKey = "aaa:" + id;
                String value = String.valueOf(contentId);
                setOperations.add(redisKey, value);
            });
            return null;
        }
    }

4.3 RedisCallBack和SessionCallback之間的比較

1、RedisCallBack和SessionCallback都可以實(shí)現(xiàn)回調(diào),通過它們可以在同一條連接中一次執(zhí)行多個redis命令。

2、RedisCallback使用的是原生

RedisConnection,用起來比較麻煩,比如上面執(zhí)行set的add操作,key和value需要進(jìn)行轉(zhuǎn)換,可讀性差,但原生api提供的功能比較齊全。

3、SessionCalback提供了良好的封裝,可以優(yōu)先選擇使用這種回調(diào)方式。

最終的代碼實(shí)現(xiàn)如下:

public void executeB(List<Integer> userIds, Integer iconId) {
        redisTemplate.executePipelined(new InsertPipelineExecution(userIds, iconId));
}
 
 
@AllArgsConstructor
private static class InsertPipelineExecution implements SessionCallback<Void> {
 
     private final List<Integer> userIds;
     private final Integer iconId;
 
     @Override
     public <K, V> Void execute(RedisOperations<K, V> operations) throws DataAccessException {
         SetOperations<String, String> setOperations = (SetOperations<String, String>) operations.opsForSet();
         userIds.forEach(userId -> {
             String redisKey = "aaa:" + userId;
             String value = String.valueOf(iconId);
             setOperations.add(redisKey, value);
         });
         return null;
     }
}

4.4 源碼分析

那么為什么使用Pipeline方式會對性能有較大提升呢,我們現(xiàn)在從源碼入手著重分析一下:

4.4.1 Pipeline方式下獲取連接相關(guān)原理分析:

@Override
    public List<Object> executePipelined(SessionCallback<?> session, @Nullable RedisSerializer<?> resultSerializer) {
 
        Assert.isTrue(initialized, "template not initialized; call afterPropertiesSet() before using it");
        Assert.notNull(session, "Callback object must not be null");
 
        //1. 獲取對應(yīng)的Redis連接工廠
        RedisConnectionFactory factory = getRequiredConnectionFactory();
        //2. 綁定連接過程
        RedisConnectionUtils.bindConnection(factory, enableTransactionSupport);
        try {
            //3. 執(zhí)行命令流程, 這里請求參數(shù)為RedisCallback, 里面有對應(yīng)的回調(diào)操作
           return execute((RedisCallback<List<Object>>) connection -> {
                //具體的回調(diào)邏輯
                connection.openPipeline();
                boolean PipelinedClosed = false;
                try {
                    //執(zhí)行命令
                    Object result = executeSession(session);
                    if (result != null) {
                        throw new InvalidDataAccessApiUsageException(
                                "Callback cannot return a non-null value as it gets overwritten by the Pipeline");
                    }
                    List<Object> closePipeline = connection.closePipeline();      PipelinedClosed = true;
                    return deserializeMixedResults(closePipeline, resultSerializer, hashKeySerializer, hashValueSerializer);
                } finally {
                    if (!PipelinedClosed) {
                        connection.closePipeline();
                    }
                }
            });
        } finally {
            RedisConnectionUtils.unbindConnection(factory);
        }
    }

① 獲取對應(yīng)的Redis連接工廠,這里要使用Pipeline特性需要使用

LettuceConnectionFactory方式,這里獲取的連接工廠就是LettuceConnectionFactory。

② 綁定連接過程,具體指的是將當(dāng)前連接綁定到當(dāng)前線程上面, 核心方法為:doGetConnection。

public static RedisConnection doGetConnection(RedisConnectionFactory factory, boolean allowCreate, boolean bind,
            boolean enableTransactionSupport) {
 
        Assert.notNull(factory, "No RedisConnectionFactory specified");
 
        //核心類,有緩存作用,下次可以從這里獲取已經(jīng)存在的連接
        RedisConnectionHolder connHolder = (RedisConnectionHolder) TransactionSynchronizationManager.getResource(factory);
 
        //如果connHolder不為null, 則獲取已經(jīng)存在的連接, 提升性能
        if (connHolder != null) {
            if (enableTransactionSupport) {
                potentiallyRegisterTransactionSynchronisation(connHolder, factory);
            }
            return connHolder.getConnection();
        }
 
        ......
 
        //第一次獲取連接,需要從Redis連接工廠獲取連接
        RedisConnection conn = factory.getConnection();
 
        //bind = true 執(zhí)行綁定
        if (bind) {
 
            RedisConnection connectionToBind = conn;
            ......
            connHolder = new RedisConnectionHolder(connectionToBind);
 
            //綁定核心代碼: 將獲取的連接和當(dāng)前線程綁定起來
            TransactionSynchronizationManager.bindResource(factory, connHolder);
            ......
 
            return connHolder.getConnection();
        }
 
        return conn;
    }

里面有個核心類RedisConnectionHolder,我們看一下

RedisConnectionHolder connHolder = 

(RedisConnectionHolder) 

TransactionSynchronizationManager.getResource(factory);

@Nullable
    public static Object getResource(Object key) {
        Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
        Object value = doGetResource(actualKey);
        if (value != null && logger.isTraceEnabled()) {
            logger.trace("Retrieved value [" + value + "] for key [" + actualKey + "] bound to thread [" +
                    Thread.currentThread().getName() + "]");
        }
        return value;
    }

里面有一個核心方法doGetResource

(actualKey),大家很容易猜測這里涉及到一個map結(jié)構(gòu),如果我們看源碼,也確實(shí)是這樣一個結(jié)構(gòu)。

@Nullable
    private static Object doGetResource(Object actualKey) {
        Map<Object, Object> map = resources.get();
        if (map == null) {
            return null;
        }
        Object value = map.get(actualKey);
        // Transparently remove ResourceHolder that was marked as void...
        if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) {
            map.remove(actualKey);
            // Remove entire ThreadLocal if empty...
            if (map.isEmpty()) {
                resources.remove();
            }
            value = null;
        }
        return value;
    }

resources是一個ThreadLocal類型,這里會涉及到根據(jù)RedisConnectionFactory獲取到連接connection的邏輯,如果下一次是同一個actualKey,那么就直接使用已經(jīng)存在的連接,而不需要新建一個連接。第一次這里map為null,就直接返回了,然后回到doGetConnection方法,由于這里bind為true,我們會執(zhí)行TransactionSynchronizationManager.bindResource(factory, connHolder);,也就是將連接和當(dāng)前線程綁定了起來。

public static void bindResource(Object key, Object value) throws IllegalStateException {
        Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
        Assert.notNull(value, "Value must not be null");
        Map<Object, Object> map = resources.get();
        // set ThreadLocal Map if none found
        if (map == null) {
            map = new HashMap<>();
            resources.set(map);
        }
        Object oldValue = map.put(actualKey, value);
        ......
    }

③ 我們回到executePipelined,在獲取到連接工廠,將連接和當(dāng)前線程綁定起來以后,就開始需要正式去執(zhí)行命令了, 這里會調(diào)用execute方法

@Override
@Nullable
public <T> T execute(RedisCallback<T> action) {
    return execute(action, isExposeConnection());
}

這里我們注意到execute方法的入?yún)镽edisCallback<T>action,RedisCallback對應(yīng)的doInRedis操作如下,這里在后面的調(diào)用過程中會涉及到回調(diào)。

connection.openPipeline();
boolean PipelinedClosed = false;
try {
    Object result = executeSession(session);
    if (result != null) {
        throw new InvalidDataAccessApiUsageException(
                "Callback cannot return a non-null value as it gets overwritten by the Pipeline");
    }
    List<Object> closePipeline = connection.closePipeline();  PipelinedClosed = true;
    return deserializeMixedResults(closePipeline, resultSerializer, hashKeySerializer, hashValueSerializer);
} finally {
    if (!PipelinedClosed) {
        connection.closePipeline();
    }
}

我們再來看execute(action, 

isExposeConnection())方法,這里最終會調(diào)用

<T>execute(RedisCallback<T>action, boolean exposeConnection, boolean Pipeline)方法。

@Nullable
public <T> T execute(RedisCallback<T> action, boolean exposeConnection, boolean Pipeline) {
 
    Assert.isTrue(initialized, "template not initialized; call afterPropertiesSet() before using it");
    Assert.notNull(action, "Callback object must not be null");
 
    //獲取對應(yīng)的連接工廠
    RedisConnectionFactory factory = getRequiredConnectionFactory();
    RedisConnection conn = null;
    try {
        if (enableTransactionSupport) {
            // only bind resources in case of potential transaction synchronization
            conn = RedisConnectionUtils.bindConnection(factory, enableTransactionSupport);
        } else {
            //獲取對應(yīng)的連接(enableTransactionSupport=false)   
            conn = RedisConnectionUtils.getConnection(factory);
        }
 
        boolean existingConnection = TransactionSynchronizationManager.hasResource(factory);
 
        RedisConnection connToUse = preProcessConnection(conn, existingConnection);
 
        boolean PipelineStatus = connToUse.isPipelined();
        if (Pipeline && !PipelineStatus) {
            connToUse.openPipeline();
        }
 
        RedisConnection connToExpose = (exposeConnection ? connToUse : createRedisConnectionProxy(connToUse));
         
        //核心方法,這里就開始執(zhí)行回調(diào)操作
        T result = action.doInRedis(connToExpose);
 
        // close Pipeline
        if (Pipeline && !PipelineStatus) {
            connToUse.closePipeline();
        }
 
        // TODO: any other connection processing?
        return postProcessResult(result, connToUse, existingConnection);
    } finally {
        RedisConnectionUtils.releaseConnection(conn, factory, enableTransactionSupport);
    }
}

我們看到這里最開始也是獲取對應(yīng)的連接工廠,然后獲取對應(yīng)的連接

(enableTransactionSupport=false),具體調(diào)用是

RedisConnectionUtils.getConnection(factory)方法,最終會調(diào)用

RedisConnection doGetConnection(RedisConnectionFactory factory, boolean allowCreate, boolean bind, boolean enableTransactionSupport),此時bind為false

public static RedisConnection doGetConnection(RedisConnectionFactory factory, boolean allowCreate, boolean bind,
        boolean enableTransactionSupport) {
 
    Assert.notNull(factory, "No RedisConnectionFactory specified");
 
    //直接獲取與當(dāng)前線程綁定的Redis連接
    RedisConnectionHolder connHolder = (RedisConnectionHolder) TransactionSynchronizationManager.getResource(factory);
 
    if (connHolder != null) {
        if (enableTransactionSupport) {
            potentiallyRegisterTransactionSynchronisation(connHolder, factory);
        }
        return connHolder.getConnection();
    }
 
    ......
 
    return conn;
}

前面我們分析過一次,這里調(diào)用

RedisConnectionHolder connHolder = 

(RedisConnectionHolder) TransactionSynchronizationManager.getResource(factory);會獲取到之前和當(dāng)前線程綁定的Redis,而不會新創(chuàng)建一個連接。

然后會去執(zhí)行T result = action.

doInRedis(connToExpose),這里的action為RedisCallback,執(zhí)行doInRedis為:

//開啟Pipeline功能
connection.openPipeline();
boolean PipelinedClosed = false;
try {
    //執(zhí)行Redis命令
    Object result = executeSession(session);
    if (result != null) {
        throw new InvalidDataAccessApiUsageException(
                "Callback cannot return a non-null value as it gets overwritten by the Pipeline");
    }
    List<Object> closePipeline = connection.closePipeline();  PipelinedClosed = true;
    return deserializeMixedResults(closePipeline, resultSerializer, hashKeySerializer, hashValueSerializer);
} finally {
    if (!PipelinedClosed) {
        connection.closePipeline();
    }
}

這里最開始會開啟Pipeline功能,然后執(zhí)行

Object result = executeSession(session);

private Object executeSession(SessionCallback<?> session) {
    return session.execute(this);
}

這里會調(diào)用我們自定義的execute方法

@AllArgsConstructor
private static class InsertPipelineExecution implements SessionCallback<Void> {
 
     private final List<Integer> userIds;
     private final Integer iconId;
 
     @Override
     public <K, V> Void execute(RedisOperations<K, V> operations) throws DataAccessException {
         SetOperations<String, String> setOperations = (SetOperations<String, String>) operations.opsForSet();
         userIds.forEach(userId -> {
             String redisKey = "aaa:" + userId;
             String value = String.valueOf(iconId);
             setOperations.add(redisKey, value);
         });
         return null;
     }
}

進(jìn)入到foreach循環(huán),執(zhí)行DefaultSetOperations的add方法。

@Override
public Long add(K key, V... values) {
 
    byte[] rawKey = rawKey(key);
    byte[][] rawValues = rawValues((Object[]) values);
    //這里的connection.sAdd是后續(xù)回調(diào)要執(zhí)行的方法
   return execute(connection -> connection.sAdd(rawKey, rawValues), true);
}

這里會繼續(xù)執(zhí)行redisTemplate的execute方法,里面最終會調(diào)用我們之前分析過的<T>T execute(RedisCallback<T>action, boolean exposeConnection, boolean Pipeline)方法。

@Nullable
public <T> T execute(RedisCallback<T> action, boolean exposeConnection, boolean Pipeline) {
 
    Assert.isTrue(initialized, "template not initialized; call afterPropertiesSet() before using it");
    Assert.notNull(action, "Callback object must not be null");
 
    RedisConnectionFactory factory = getRequiredConnectionFactory();
    RedisConnection conn = null;
    try {
 
        ......
        //再次執(zhí)行回調(diào)方法,這里執(zhí)行的Redis基本數(shù)據(jù)結(jié)構(gòu)對應(yīng)的操作命令
        T result = action.doInRedis(connToExpose);
         
        ......
 
        // TODO: any other connection processing?
        return postProcessResult(result, connToUse, existingConnection);
    } finally {
        RedisConnectionUtils.releaseConnection(conn, factory, enableTransactionSupport);
    }
}

這里會繼續(xù)執(zhí)行T result = 

action.doInRedis(connToExpose);,這里其實(shí)執(zhí)行的doInRedis方法為:

connection -> connection.sAdd(rawKey, rawValues)

4.4.2 Pipeline方式下執(zhí)行命令的流程分析:

① 接著上面的流程分析,這里的sAdd方法實(shí)際調(diào)用的是DefaultStringRedisConnection的sAdd方法

@Override
public Long sAdd(byte[] key, byte[]... values) {
    return convertAndReturn(delegate.sAdd(key, values), identityConverter);
}

② 這里會進(jìn)一步調(diào)用

DefaultedRedisConnection的sAdd方法

@Override
@Deprecated
default Long sAdd(byte[] key, byte[]... values) {
    return setCommands().sAdd(key, values);
}

③ 接著調(diào)用LettuceSetCommands的sAdd方法

@Override
public Long sAdd(byte[] key, byte[]... values) {
 
    Assert.notNull(key, "Key must not be null!");
    Assert.notNull(values, "Values must not be null!");
    Assert.noNullElements(values, "Values must not contain null elements!");
 
    try {
        // 如果開啟了 Pipelined 模式,獲取的是 異步連接,進(jìn)行異步操作
        if (isPipelined()) {    Pipeline(connection.newLettuceResult(getAsyncConnection().sadd(key, values)));
            return null;
        }


        if (isQueueing()) {
            transaction(connection.newLettuceResult(getAsyncConnection().sadd(key, values)));
            return null;
        }
        //常規(guī)模式下,使用的是同步操作
        return getConnection().sadd(key, values);
    } catch (Exception ex) {
        throw convertLettuceAccessException(ex);
    }
}

這里我們開啟了Pipeline, 實(shí)際會調(diào)用

Pipeline(connection.newLettuceResult(getAsyncConnection().sadd(key, values))); 也就是獲取異步連接getAsyncConnection,然后進(jìn)行異步操作sadd,而常規(guī)模式下,使用的是同步操作,所以在Pipeline模式下,執(zhí)行效率更高。

從上面的獲取連接和具體命令執(zhí)行相關(guān)源碼分析可以得出使用Lettuce客戶端Pipeline模式高效的根本原因:

  1. 普通模式下,每執(zhí)行一個命令都需要先打開一個連接,命令執(zhí)行完畢以后又需要關(guān)閉這個連接,執(zhí)行下一個命令時,又需要經(jīng)過連接打開和關(guān)閉的流程;而Pipeline的所有命令的執(zhí)行只需要經(jīng)過一次連接打開和關(guān)閉。
  2. 普通模式下命令的執(zhí)行是同步阻塞模式,而Pipeline模式下命令的執(zhí)行是異步非阻塞模式。

五、項(xiàng)目中遇到的坑

前面介紹了涉及到批量操作,可以使用Redis Pipelining機(jī)制,那是不是任何批量操作相關(guān)的場景都可以使用呢,比如list類型數(shù)據(jù)的批量移除操作,我們的代碼最開始是這么寫的:

public void deleteSet(String updateKey, Set<Integer> userIds) {
        if (CollectionUtils.isEmpty(userIds)) {
            return;
        }
 
        redisTemplate.executePipelined(new DeleteListCallBack(userIds, updateKey));
    }
 
@AllArgsConstructor
private static class DeleteListCallBack implements SessionCallback<Object> {
 
    private Set<Integer> userIds;
 
    private String updateKey;
 
    @Override
    public <K, V> Object execute(RedisOperations<K, V> operations) throws DataAccessException {
        ListOperations<String, String> listOperations = (ListOperations<String, String>) operations.opsForList();
        userIds.forEach(userId -> listOperations.remove(updateKey, 1, userId.toString()));
        return null;
    }
}

在數(shù)據(jù)量比較小的時候沒有出現(xiàn)問題,直到有一條收到了Redis的內(nèi)存和cpu利用率的告警消息,我們發(fā)現(xiàn)這么使用是有問題的,核心原因在于list的lrem操作的時間復(fù)雜度是O(N+M),其中N是list的長度, M是要移除的元素的個數(shù),而我們這里還是一個一個移除的,當(dāng)然會導(dǎo)致Redis數(shù)據(jù)積壓和cpu每秒ops升高導(dǎo)致cpu利用率飚高。也就是說,即使使用Pipeline進(jìn)行批量操作,但是由于單次操作很耗時,是會導(dǎo)致整個Redis出現(xiàn)問題的。

后面我們進(jìn)行了優(yōu)化,選用了list的ltrim命令,一次命令執(zhí)行批量remove操作:

public void deleteSet(String updateKey, Set<Integer> deviceIds) {
        if (CollectionUtils.isEmpty(deviceIds)) {
            return;
        }
 
        int maxSize = 10000;
        redisTemplate.opsForList().trim(updateKey, maxSize + 1, -1);
    }

由于ltrim本身的時間復(fù)雜度為O(M), 其中M要移除的元素的個數(shù),相比于原始方案的lrem,效率提升很多,可以不需要使用Redis Pipeline,優(yōu)化結(jié)果使得Redis內(nèi)存利用率和cpu利用率都極大程度得到緩解。


圖片

六、Redisson 對 Redis Pipeline 特性支持

在redisson官方文檔中額外特性介紹中有說到批量命令執(zhí)行這個特性, 也就是多個命令在一次網(wǎng)絡(luò)調(diào)用中集中發(fā)送,該特性是RBatch這個類支持的,從這個類的描述來看,主要是為Redis Pipeline這個特性服務(wù)的,并且主要是通過隊(duì)列和異步實(shí)現(xiàn)的。

/**
 * Interface for using Redis Pipeline feature.
 * <p>
 * All method invocations on objects got through this interface
 * are batched to separate queue and could be executed later
 * with <code>execute()</code> or <code>executeAsync()</code> methods.
 *
 *
 * @author Nikita Koksharov
 *
 */
public interface RBatch {
 
    /**
     * Returns stream instance by <code>name</code>
     *
     * @param <K> type of key
     * @param <V> type of value
     * @param name of stream
     * @return RStream object
     */
    <K, V> RStreamAsync<K, V> getStream(String name);
     
    /**
     * Returns stream instance by <code>name</code>
     * using provided <code>codec</code> for entries.
     *
     * @param <K> type of key
     * @param <V> type of value
     * @param name - name of stream
     * @param codec - codec for entry
     * @return RStream object
     */
    <K, V> RStreamAsync<K, V> getStream(String name, Codec codec);
     
    ......
    
    /**
     * Returns list instance by name.
     *
     * @param <V> type of object
     * @param name - name of object
     * @return List object
     */
    <V> RListAsync<V> getList(String name);
 
    <V> RListAsync<V> getList(String name, Codec codec);
 
    ......
 
    /**
     * Executes all operations accumulated during async methods invocations.
     * <p>
     * If cluster configuration used then operations are grouped by slot ids
     * and may be executed on different servers. Thus command execution order could be changed
     *
     * @return List with result object for each command
     * @throws RedisException in case of any error
     *
     */
    BatchResult<?> execute() throws RedisException;
 
    /**
     * Executes all operations accumulated during async methods invocations asynchronously.
     * <p>
     * In cluster configurations operations grouped by slot ids
     * so may be executed on different servers. Thus command execution order could be changed
     *
     * @return List with result object for each command
     */
    RFuture<BatchResult<?>> executeAsync();
 
    /**
     * Discard batched commands and release allocated buffers used for parameters encoding.
     */
    void discard();
 
    /**
     * Discard batched commands and release allocated buffers used for parameters encoding.
     *
     * @return void
     */
    RFuture<Void> discardAsync();
 
 
}

簡單的測試代碼如下:

@Slf4j
public class RedisPipelineTest {
    public static void main(String[] args) {
        //Redisson使用Pipeline模式
        Config config = new Config();
        config.useSingleServer().setAddress("redis://xx.xx.xx.xx:6379");
        RedissonClient redisson = Redisson.create(config);
        RBatch redisBatch = redisson.createBatch();
 
        int size = 100000;
        String zSetKey = "Pipeline-test-set";
        long begin = System.currentTimeMillis();
         
        //將命令放入隊(duì)列中
        for (int i = 0; i < size; i++) {
            redisBatch.getSet(zSetKey + i).addAsync("ccc");
        }
        //批量執(zhí)行命令
        redisBatch.execute();
        log.info("Redisson Pipeline模式耗時:{}ms", (System.currentTimeMillis() - begin));
 
        //關(guān)閉
        redisson.shutdown();
    }
}

核心方法分析:

1.建Redisson客戶端RedissonClient redisson = redisson.create(config), 該方法最終會調(diào)用Reddison的構(gòu)造方法Redisson(Config config)。

protected Redisson(Config config) {
        this.config = config;
        Config configCopy = new Config(config);
 
        connectionManager = ConfigSupport.createConnectionManager(configCopy);
        RedissonObjectBuilder objectBuilder = null;
        if (config.isReferenceEnabled()) {
            objectBuilder = new RedissonObjectBuilder(this);
        }
        //新建異步命令執(zhí)行器
      commandExecutor = new CommandSyncService(connectionManager, objectBuilder);
        //執(zhí)行刪除超時任務(wù)的定時器
      evictionScheduler = new EvictionScheduler(commandExecutor);
        writeBehindService = new WriteBehindService(commandExecutor);
}

該構(gòu)造方法中會新建異步命名執(zhí)行器CommandAsyncExecutor commandExecutor和用戶刪除超時任務(wù)的EvictionScheduler evictionScheduler。

2.創(chuàng)建RBatch實(shí)例RBatch redisBatch = redisson.createBatch(), 該方法會使用到步驟1中的commandExecutor和evictionScheduler實(shí)例對象。

@Override
public RBatch createBatch(BatchOptions options) {
    return new RedissonBatch(evictionScheduler, commandExecutor, options);
}
 
public RedissonBatch(EvictionScheduler evictionScheduler, CommandAsyncExecutor executor, BatchOptions options) {
        this.executorService = new CommandBatchService(executor, options);
        this.evictionScheduler = evictionScheduler;
}

其中的options對象會影響后面批量執(zhí)行命令的流程。

3. 異步給set集合添加元素的操作addAsync,這里會具體調(diào)用RedissonSet的addAsync方法

@Override
public RFuture<Boolean> addAsync(V e) {
    String name = getRawName(e);
    return commandExecutor.writeAsync(name, codec, RedisCommands.SADD_SINGLE, name, encode(e));
}

(1)接著調(diào)用CommandAsyncExecutor的異步寫入方法writeAsync。

@Override
public <T, R> RFuture<R> writeAsync(String key, Codec codec, RedisCommand<T> command, Object... params) {
    RPromise<R> mainPromise = createPromise();
    NodeSource source = getNodeSource(key);
    async(false, source, codec, command, params, mainPromise, false);
    return mainPromise;
}

(2) 接著調(diào)用批量命令執(zhí)行器

CommandBatchService的異步發(fā)送命令。

@Override
public <V, R> void async(boolean readOnlyMode, NodeSource nodeSource,
        Codec codec, RedisCommand<V> command, Object[] params, RPromise<R> mainPromise, boolean ignoreRedirect) {
    if (isRedisBasedQueue()) {
        boolean isReadOnly = options.getExecutionMode() == ExecutionMode.REDIS_READ_ATOMIC;
        RedisExecutor<V, R> executor = new RedisQueuedBatchExecutor<>(isReadOnly, nodeSource, codec, command, params, mainPromise,
                false, connectionManager, objectBuilder, commands, connections, options, index, executed, latch, referenceType);
        executor.execute();
    } else {
        //執(zhí)行分支
        RedisExecutor<V, R> executor = new RedisBatchExecutor<>(readOnlyMode, nodeSource, codec, command, params, mainPromise,
                false, connectionManager, objectBuilder, commands, options, index, executed, referenceType);
        executor.execute();
    }
     
}


(3) 接著調(diào)用了RedisBatchExecutor.

execute方法和BaseRedisBatchExecutor.

addBatchCommandData方法。

@Override
public void execute() {
    addBatchCommandData(params);
}
 
protected final void addBatchCommandData(Object[] batchParams) {
    MasterSlaveEntry msEntry = getEntry(source);
    Entry entry = commands.get(msEntry);
    if (entry == null) {
        entry = new Entry();
        Entry oldEntry = commands.putIfAbsent(msEntry, entry);
        if (oldEntry != null) {
            entry = oldEntry;
        }
    }
 
    if (!readOnlyMode) {
        entry.setReadOnlyMode(false);
    }
 
    Codec codecToUse = getCodec(codec);
    BatchCommandData<V, R> commandData = new BatchCommandData<V, R>(mainPromise, codecToUse, command, batchParams, index.incrementAndGet());
    entry.getCommands().add(commandData);
}

這里的commands以主節(jié)點(diǎn)為KEY,以待發(fā)送命令隊(duì)列列表為VALUE(Entry),保存一個MAP.然后會把命令都添加到entry的commands命令隊(duì)列中, Entry結(jié)構(gòu)如下面代碼所示。

public static class Entry {
 
    Deque<BatchCommandData<?, ?>> commands = new LinkedBlockingDeque<>();
    volatile boolean readOnlyMode = true;
 
    public Deque<BatchCommandData<?, ?>> getCommands() {
        return commands;
    }
 
    public void setReadOnlyMode(boolean readOnlyMode) {
        this.readOnlyMode = readOnlyMode;
    }
 
    public boolean isReadOnlyMode() {
        return readOnlyMode;
    }
     
 
    public void clearErrors() {
        for (BatchCommandData<?, ?> commandEntry : commands) {
            commandEntry.clearError();
        }
    }
 
}

4. 批量執(zhí)行命令redisBatch.execute(),這里會最終調(diào)用CommandBatchService的executeAsync方法,該方法完整代碼如下,我們下面來逐一進(jìn)行拆解。

public RFuture<BatchResult<?>> executeAsync() {
         
        ......
 
        RPromise<BatchResult<?>> promise = new RedissonPromise<>();
        RPromise<Void> voidPromise = new RedissonPromise<Void>();
        if (this.options.isSkipResult()
                && this.options.getSyncSlaves() == 0) {
            ......
        } else {
            //這里是對異步執(zhí)行結(jié)果進(jìn)行處理,可以先忽略, 后面會詳細(xì)講,先關(guān)注批量執(zhí)行命令的邏輯
            voidPromise.onComplete((res, ex) -> {
                ......
            });
        }
 
        AtomicInteger slots = new AtomicInteger(commands.size());
 
        ......
         
        //真正執(zhí)行的代碼入口,批量執(zhí)行命令
        for (Map.Entry<MasterSlaveEntry, Entry> e : commands.entrySet()) {
            RedisCommonBatchExecutor executor = new RedisCommonBatchExecutor(new NodeSource(e.getKey()), voidPromise,
                                                    connectionManager, this.options, e.getValue(), slots, referenceType);
            executor.execute();
        }
        return promise;
    }

里面會用到我們在3.3步驟所生成的commands實(shí)例。

(1)接著調(diào)用了基類RedisExecutor的execute方法

public void execute() {
         
        ......
 
        connectionFuture.onComplete((connection, e) -> {
            if (connectionFuture.isCancelled()) {
                connectionManager.getShutdownLatch().release();
                return;
            }
 
            if (!connectionFuture.isSuccess()) {
                connectionManager.getShutdownLatch().release();
                exception = convertException(connectionFuture);
                return;
            }
 
            //調(diào)用RedisCommonBatchExecutor的sendCommand方法, 里面會將多個命令放到一個List<CommandData<?, ?>> list列表里面
        sendCommand(attemptPromise, connection);
 
            writeFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    checkWriteFuture(writeFuture, attemptPromise, connection);
                }
            });
        });
 
        ......
    }

(2)接著調(diào)用

RedisCommonBatchExecutor的sendCommand方法,里面會將多個命令放到一個List<commanddata> list列表里面。

@Override
    protected void sendCommand(RPromise<Void> attemptPromise, RedisConnection connection) {
        boolean isAtomic = options.getExecutionMode() != ExecutionMode.IN_MEMORY;
        boolean isQueued = options.getExecutionMode() == ExecutionMode.REDIS_READ_ATOMIC
                                || options.getExecutionMode() == ExecutionMode.REDIS_WRITE_ATOMIC;
 
        //將多個命令放到一個List<CommandData<?, ?>> list列表里面
      List<CommandData<?, ?>> list = new ArrayList<>(entry.getCommands().size());
        if (source.getRedirect() == Redirect.ASK) {
            RPromise<Void> promise = new RedissonPromise<Void>();
            list.add(new CommandData<Void, Void>(promise, StringCodec.INSTANCE, RedisCommands.ASKING, new Object[] {}));
        }
        for (CommandData<?, ?> c : entry.getCommands()) {
            if ((c.getPromise().isCancelled() || c.getPromise().isSuccess())
                    && !isWaitCommand(c)
                        && !isAtomic) {
                // skip command
                continue;
            }
            list.add(c);
        }
         
        ......
        //調(diào)用RedisConnection的send方法,將命令一次性發(fā)到Redis服務(wù)器端
      writeFuture = connection.send(new CommandsData(attemptPromise, list, options.isSkipResult(), isAtomic, isQueued, options.getSyncSlaves() > 0));
    }

(3)接著調(diào)用RedisConnection的send方法,通過Netty通信發(fā)送命令到Redis服務(wù)器端執(zhí)行,這里也驗(yàn)證了Redisson客戶端底層是采用Netty進(jìn)行通信的。

public ChannelFuture send(CommandsData data) {
        return channel.writeAndFlush(data);
}

5. 接收返回結(jié)果,這里主要是監(jiān)聽事件是否完成,然后組裝返回結(jié)果, 核心方法是步驟4提到的CommandBatchService的executeAsync方法,里面會對返回結(jié)果進(jìn)行監(jiān)聽和處理, 核心代碼如下:

public RFuture<BatchResult<?>> executeAsync() {
    ......
     
    RPromise<BatchResult<?>> promise = new RedissonPromise<>();
    RPromise<Void> voidPromise = new RedissonPromise<Void>();
    if (this.options.isSkipResult()
            && this.options.getSyncSlaves() == 0) {
        ......
    } else {
        voidPromise.onComplete((res, ex) -> {
            //對返回結(jié)果的處理
            executed.set(true);
            ......
            List<Object> responses = new ArrayList<Object>(entries.size());
            int syncedSlaves = 0;
            for (BatchCommandData<?, ?> commandEntry : entries) {
                if (isWaitCommand(commandEntry)) {
                    syncedSlaves = (Integer) commandEntry.getPromise().getNow();
                } else if (!commandEntry.getCommand().getName().equals(RedisCommands.MULTI.getName())
                        && !commandEntry.getCommand().getName().equals(RedisCommands.EXEC.getName())
                        && !this.options.isSkipResult()) {
                     
                    ......
                    //獲取單個命令的執(zhí)行結(jié)果
                    Object entryResult = commandEntry.getPromise().getNow();
                    ......
                    //將單個命令執(zhí)行結(jié)果放到List中
                    responses.add(entryResult);
                }
            }
             
            BatchResult<Object> result = new BatchResult<Object>(responses, syncedSlaves);
            promise.trySuccess(result);
            ......
        });
    }
 
    ......
    return promise;
}

這里會把單個命令的執(zhí)行結(jié)果放到responses里面,最終返回RPromise<batchresult>promise。

從上面的分析來看,Redisson客戶端對Redis Pipeline的支持也是從多個命令在一次網(wǎng)絡(luò)通信中執(zhí)行和異步處理來實(shí)現(xiàn)的。

七、總結(jié)

Redis提供了Pipelining進(jìn)行批量操作的高級特性,極大地提高了部分?jǐn)?shù)據(jù)類型沒有批量執(zhí)行命令導(dǎo)致的執(zhí)行耗時而引起的性能問題,但是我們在使用的過程中需要考慮Pipeline操作中單個命令執(zhí)行的耗時問題,否則帶來的效果可能適得其反。最后擴(kuò)展分析了Redisson客戶端對Redis Pipeline特性的支持原理,可以與Lettuce客戶端對Redis Pipeline支持原理進(jìn)行比較,加深Pipeline在不同Redis客戶端實(shí)現(xiàn)方式的理解。

責(zé)任編輯:龐桂玉 來源: vivo互聯(lián)網(wǎng)技術(shù)
相關(guān)推薦

2019-10-16 16:33:41

Docker架構(gòu)語言

2024-03-07 07:47:04

代碼塊Monitor

2017-10-20 15:25:17

DockerOpenStack Cvolume

2018-09-11 09:33:49

Redis高可用架構(gòu)

2018-04-09 12:25:11

2021-08-09 11:15:28

MybatisJavaSpring

2024-05-10 11:35:22

Redis延時隊(duì)列數(shù)據(jù)庫

2021-06-28 06:45:06

內(nèi)存溢出內(nèi)存泄露JavaScript

2017-05-25 09:45:35

2020-04-27 07:13:37

Nginx底層進(jìn)程

2024-07-07 21:49:22

2022-05-31 08:04:03

Redis高可用集群

2023-11-06 18:37:23

虛擬線程編寫

2021-10-04 21:11:18

Redis混合持久化

2023-05-31 08:39:04

redis事件驅(qū)動

2020-11-05 11:14:29

Docker底層原理

2023-01-04 07:54:03

HashMap底層JDK

2024-01-05 09:00:00

SpringMVC軟件

2009-07-24 13:54:39

MVVM模式

2020-04-21 22:59:50

Redis搭建選舉
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號

亚洲欧洲一区| 伊人亚洲精品| 久久久亚洲国产美女国产盗摄| 日韩av电影手机在线观看| 国产精品20p| 国产精品一区二区三区www| 亚洲福利视频一区| 五月天亚洲综合| 亚洲精品18在线观看| 日韩不卡一区二区三区 | 亚洲午夜激情影院| aa国产成人| 国产区在线观看成人精品| 亚洲一区美女视频在线观看免费| 日本网站在线播放| 香蕉综合视频| 亚洲乱码一区二区| 伊人久久久久久久久| 91亚洲视频| 欧美色视频日本高清在线观看| 伊人色综合影院| 日韩欧美亚洲系列| 国内精品视频666| 日韩av电影院| 久久久午夜影院| 影音先锋日韩在线| 中文字幕亚洲欧美在线| 粉嫩av懂色av蜜臀av分享| 精品一区二区三区免费看| 91国产丝袜在线播放| 亚洲中文字幕无码专区| 日韩伦理av| 亚洲欧美视频在线观看| 亚洲成人18| 久久电影中文字幕| 99久久精品免费精品国产| 亚洲伊人一本大道中文字幕| 亚洲天堂视频在线| 久久一区二区三区超碰国产精品| 国内精品久久久久久久| 人妻人人澡人人添人人爽| 日本在线电影一区二区三区| 亚洲精品自拍视频| 亚洲国产果冻传媒av在线观看| 免费看一区二区三区| 欧美精品在线一区二区三区| 日本久久久久久久久久久久| 成人免费直播| 一本一道综合狠狠老| 欧美 日韩 国产在线观看| av毛片午夜不卡高**水| 亚洲韩国精品一区| 日本大片免费看| 免费在线观看av电影| 一区二区理论电影在线观看| 亚洲天堂第一区| 最爽无遮挡行房视频在线| 亚洲免费高清视频在线| 黄色一级片网址| av黄色在线| 亚洲一区二区三区四区在线观看| 特级西西人体www高清大胆| 超碰最新在线| 亚洲资源中文字幕| 国产黄页在线观看| www.com.cn成人| 欧美性受xxxx| 日本中文字幕观看| 日本一区精品视频| 精品久久久久久久人人人人传媒| 亚洲最大视频网| 老牛国内精品亚洲成av人片| 日韩精品免费视频| 素人fc2av清纯18岁| 国产一区二区电影在线观看| 中文字幕欧美日韩| 日本福利片在线观看| 国产精品v亚洲精品v日韩精品| 色综合久久悠悠| 精品欧美一区二区三区免费观看| 久久亚洲电影| 成人激情视频在线| 神马久久久久久久久久| 不卡大黄网站免费看| 欧洲精品在线一区| 国产一区久久精品| 狠狠综合久久av一区二区小说 | 亚洲成人xxx| www.中文字幕av| 国产电影一区二区在线观看| 欧美激情中文网| 国产污视频网站| 国内精品国产三级国产a久久| 成人羞羞视频免费| 久久久久久青草| 亚洲欧美一区二区三区极速播放| 国产一区二区网| 欧美成人福利| 日韩av网址在线观看| 五月天免费网站| 亚洲精品欧美| 91在线观看免费网站| 午夜在线视频免费| 亚洲欧美一区二区久久| 麻豆av免费在线| 亚洲图色一区二区三区| 亚洲午夜未删减在线观看| 欧美三级黄色大片| 免费日韩精品中文字幕视频在线| 91精品国产一区二区三区动漫| 视频一区二区三区国产| 亚洲日本成人在线观看| 成人精品视频一区二区| 136福利精品导航| 中文字幕在线视频日韩| 国产精品一区二区6| 国产麻豆9l精品三级站| 日本中文不卡| 热色播在线视频| 欧美岛国在线观看| 午夜国产福利视频| 久久精品女人| 久久人人97超碰人人澡爱香蕉| 综合图区亚洲| 欧美久久久久免费| youjizz亚洲女人| 国产一区二区你懂的| 丁香五月网久久综合| av基地在线| 色菇凉天天综合网| 国产偷人妻精品一区| 国产精品a级| 91手机在线视频| 九色porny丨首页在线| 欧美日韩一区成人| 国产在线综合视频| 视频一区二区中文字幕| 韩国精品一区二区三区六区色诱| 羞羞污视频在线观看| 制服丝袜中文字幕一区| 精品少妇一区二区三区密爱| 免费日本视频一区| 日韩三级电影网站| 美女色狠狠久久| 亚洲一二三在线| 在线免费观看av网址| 久久精品视频一区| 毛葺葺老太做受视频| 国模精品一区| 国产精品高潮粉嫩av| 国产一区二区影视| 在线观看一区二区精品视频| 99久久精品免费视频 | 亚洲综合123| 亚洲影视一区| 国产精品12| www.九色在线| 亚洲欧洲av一区二区| 日本黄色一级视频| 欧美国产精品久久| 17c国产在线| 欧美在线1区| 国产福利久久| 自拍网站在线观看| 一区二区三区黄色| 国产精品国产av| 亚洲一区二区三区自拍| 一区二区不卡免费视频| 男女男精品视频| 经典三级在线视频| 91成人福利| 欧美孕妇性xx| 婷婷在线视频观看| 日韩免费一区二区| 精品成人av一区二区在线播放| 日本一区二区三区四区| 在线观看免费不卡av| 亚洲午夜视频| 日韩av影视| 视频一区在线| 日韩av色在线| a级网站在线播放| 日韩风俗一区 二区| 中文字幕在线播放av| 亚洲精品视频免费观看| av无码av天天av天天爽| 极品美女销魂一区二区三区 | 91色在线看| 国产一区二区三区四区福利| 国产黄色片av| 色老汉一区二区三区| 国产97免费视频| 91丝袜呻吟高潮美腿白嫩在线观看| 91最新在线观看| 伊人蜜桃色噜噜激情综合| 日韩精品欧美在线| 中文字幕日韩在线| 国产精品国产三级国产aⅴ浪潮| 超碰在线观看免费| 国产亚洲欧美另类中文| 亚洲精品国偷拍自产在线观看蜜桃 | 秋霞av亚洲一区二区三| 青青青在线观看视频| 日韩av在线中文字幕| 国内精品久久国产| 欧美综合影院| 欧美亚洲国产日韩2020| 久草免费在线| 亚洲欧美成人一区二区在线电影| 国产乱码精品一区二区三区精东| 欧美日韩一区二区三区| 少妇aaaaa| 国产精品天干天干在观线| 国产原创剧情av| 国产一区二区久久| 国产三级日本三级在线播放 | 国产精品-区区久久久狼| 亚洲欧洲美洲一区二区三区| 欧美精品尤物在线| 久本草在线中文字幕亚洲| 91啪国产在线| 国产精品美女午夜爽爽| 日本韩国在线不卡| √8天堂资源地址中文在线| 美女精品视频一区| 午夜视频在线观看网站| 亚洲免费一在线| 婷婷在线观看视频| 日韩一级片在线播放| 国产一区二区在线视频观看| 欧美无乱码久久久免费午夜一区| 欧美一级视频免费观看| 亚洲国产成人va在线观看天堂| 精品人妻伦九区久久aaa片| 国产精品免费人成网站| 男人舔女人下部高潮全视频| 国产亚洲精品福利| 国产精品成人免费一区久久羞羞| 蜜臀精品一区二区三区在线观看 | 久久久国产成人精品| 成人性生交大片免费看午夜| 亚洲人成在线观看| 美国成人毛片| 亚洲欧美变态国产另类| 经典三级在线| 亚洲日韩中文字幕| 国产免费av在线| 中文字幕久久亚洲| 日本中文在线| 久久天天躁狠狠躁夜夜躁| 免费av毛片在线看| 理论片在线不卡免费观看| 日本福利在线| 欧美成人一区在线| 日本电影在线观看| 97国产精品人人爽人人做| sis001亚洲原创区| 18性欧美xxxⅹ性满足| 亚洲欧洲高清| 国产精品久久久久av免费| 欧美成a人片免费观看久久五月天| 国产欧美婷婷中文| 久久亚洲精精品中文字幕| 99电影在线观看| 久久夜色精品国产噜噜av小说| 好吊色欧美一区二区三区四区| 日韩最新在线| 亚欧精品在线| 欧美三区不卡| www黄色av| 久久激情五月婷婷| 国产精品日日摸夜夜爽| 99热在这里有精品免费| 五月婷六月丁香| 亚洲人成小说网站色在线| 中日韩精品视频在线观看| 色悠久久久久综合欧美99| 91丨九色丨丰满| 亚洲变态欧美另类捆绑| 国产专区在线| 欧美精品一二区| 亚洲啊v在线| 成人久久久久久| 精品亚洲精品| 亚洲欧美日韩精品综合在线观看| 欧美va亚洲va日韩∨a综合色| 国产精品久久中文字幕| 热久久一区二区| 欧美一级片在线免费观看| 国产视频一区在线观看| 婷婷在线精品视频| 一本一道综合狠狠老| 国产黄a三级三级三级| 日韩精品免费综合视频在线播放| 日本中文字幕电影在线免费观看| 97精品国产97久久久久久免费 | 欧美三级乱人伦电影| 亚洲xxx在线| 中文字幕成人在线| 草美女在线观看| 成人在线激情视频| 亚洲va久久| 国产一区二区片| 日本视频一区二区三区| 一级欧美一级日韩片| 成人欧美一区二区三区白人| 日韩av大片在线观看| 欧美一区二区三区思思人| 国产乱理伦片a级在线观看| 久久久久久久久久久网站| 欧洲亚洲精品久久久久| 欧美精品亚洲精品| 91久久在线| 免费黄频在线观看| 久久精品一区八戒影视| 日本免费一二三区| 91精品国产高清一区二区三区| 国产有码在线| 97超视频免费观看| 97久久综合精品久久久综合| 伊人久久99| 美国一区二区三区在线播放 | 少妇又色又爽又黄的视频| 久久精品视频在线播放| 91亚洲视频| 日本成人黄色免费看| 亚洲神马久久| av电影中文字幕| 亚洲免费观看高清在线观看| 国产又粗又黄又爽| 在线观看欧美www| japanese23hdxxxx日韩| 加勒比在线一区二区三区观看| 欧美视频不卡| 日本泡妞xxxx免费视频软件| 亚洲伦在线观看| 国产叼嘿视频在线观看| 久久天天躁狠狠躁夜夜爽蜜月| 日韩av一级| 神马影院一区二区三区| 日韩精品一二三| 国产精品久久久久无码av色戒| 疯狂欧美牲乱大交777| 少妇精品视频一区二区| 国产综合在线视频| 六月丁香久久丫| 女性女同性aⅴ免费观女性恋| av中文一区二区三区| 国产又黄又爽又色| 亚洲欧美一区二区三区四区 | 成人高清免费在线| 91天堂在线观看| 欧美精品不卡| yjizz视频| 日韩欧中文字幕| porn亚洲| 91九色国产视频| 亚洲欧美伊人| 超碰caoprom| 欧美午夜xxx| 91电影在线播放| 国产综合香蕉五月婷在线| 欧美影院一区| 老熟妇精品一区二区三区| 日韩欧美国产骚| 日本在线看片免费人成视1000| 91精品视频免费观看| 亚洲第一精品影视| 自拍偷拍亚洲天堂| 欧美日免费三级在线| 中文字幕中文字幕在线中高清免费版 | 91麻豆福利精品推荐| 少妇一级淫片日本| 久久精品国产久精国产一老狼| 在线日韩成人| 久久黄色免费看| 综合色天天鬼久久鬼色| 成人无码一区二区三区| 日韩美女激情视频| 亚洲91中文字幕无线码三区| 女性生殖扒开酷刑vk| 色狠狠av一区二区三区| 性欧美videoshd高清| 老司机精品福利在线观看| 精品在线免费观看| 免费在线观看黄网站| 在线电影欧美日韩一区二区私密| 另类视频一区二区三区| 人妻精品无码一区二区三区| 国产欧美日韩另类视频免费观看| 国产chinasex对白videos麻豆| 538国产精品视频一区二区| 999国产精品视频| 日本一级片在线播放| 欧美女孩性生活视频| zzzwww在线看片免费| 麻豆中文字幕在线观看| 91亚洲国产成人精品一区二三| 亚洲视频在线免费播放| 97超级碰碰碰久久久|