精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

PyFlink場景案例 - PyFlink實現CDN日志實時分析

企業動態
CDN將源站資源緩存至遍布全球的加速節點上,當終端用戶請求獲取該資源時,無需回源,系統自動調用離終端用戶最近的CDN節點上已緩存的資源,那么如何進行實時日志分析呢?

CDN 日志實時分析綜述

CDN將源站資源緩存至遍布全球的加速節點上,當終端用戶請求獲取該資源時,無需回源,系統自動調用離終端用戶最近的CDN節點上已緩存的資源,那么如何進行實時日志分析呢?

架構

CDN日志的解析一般有一個通用的架構模式,就是首先要將各個邊緣節點的日志數據進行采集,一般會采集到消息隊列,然后將消息隊列和實時計算集群進行集成進行實時的日志分析,最后將分析的結果寫到存儲系統里面。那么我今天的案例將架構實例化,消息隊列采用Kafka,實時計算采用Flink,最終將數據存儲到MySql中。如下圖所示:

需求說明

阿里云實際的CDN日志數據結構如下(可能會不斷豐富字段信息):

為了介紹方便,我們將實際的統計需求進行簡化,示例將從CDN訪問日志中,根據IP解析出其所屬的地區,統計指標:

  • 按地區統計資源訪問量
  • 按地區統計資源下載總量
  • 按地區統計資源平均下載速度

CDN實時日志分析UDF定義

這里我們需要定義一個 ip_to_province()的UDF,輸入是ip地址,輸出是地區名字字符串。UDF的輸入類型是一個字符串,輸出類型也是一個字符串。同時我們會用到地理區域查詢服務

(http://whois.pconline.com.cn/ipJson.jsp?ip=27.184.139.25),大家在自己的生產環境要替換為可靠的地域查詢服務。

  1. import re 
  2. import json 
  3. from pyflink.table import DataTypes 
  4. from pyflink.table.udf import udf 
  5. from urllib.parse import quote_plus 
  6. from urllib.request import urlopen 
  7.  
  8. @udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING()) 
  9. def ip_to_province(ip): 
  10.    """ 
  11.    format: 
  12.        { 
  13.        'ip': '27.184.139.25', 
  14.        'pro': '河北省', 
  15.        'proCode': '130000', 
  16.        'city': '石家莊市', 
  17.        'cityCode': '130100', 
  18.        'region': '靈壽縣', 
  19.        'regionCode': '130126', 
  20.        'addr': '河北省石家莊市靈壽縣 電信', 
  21.        'regionNames': '', 
  22.        'err': '' 
  23.        } 
  24.    """ 
  25.    try: 
  26.        urlobj = urlopen( \ 
  27.         'http://whois.pconline.com.cn/ipJson.jsp?ip=%s' % quote_plus(ip)) 
  28.        data = str(urlobj.read(), "gbk") 
  29.        pos = re.search("{[^{}]+\}", data).span() 
  30.        geo_data = json.loads(data[pos[0]:pos[1]]) 
  31.        if geo_data['pro']: 
  32.            return geo_data['pro'] 
  33.        else: 
  34.            return geo_data['err'] 
  35.    except: 
  36.        return "UnKnow" 

數據讀取和結果寫入定義

按照通用的作業結構,需要定義Source connector來讀取Kafka數據,定義Sink connector來將計算結果存儲到MySQL。最后是編寫統計邏輯。在這特別說明一下,在PyFlink中也支持SQL DDL的編寫,我們用一個簡單的DDL描述,就完成了Source Connector的開發。其中connector.type填寫kafka。SinkConnector也一樣,用一行DDL描述即可,其中connector.type填寫jdbc。

1. Kafka 數據源讀取DDL定義

數據統計需求我們只選取核心的字段,比如:uuid,表示唯一的日志標示,client_ip表示訪問來源,request_time表示資源下載耗時,response_size表示資源數據大小。Kafka數據字段進行簡化如下:

DDL定義如下:

  1. kafka_source_ddl = ""
  2. CREATE TABLE cdn_access_log ( 
  3.  uuid VARCHAR, 
  4.  client_ip VARCHAR, 
  5.  request_time BIGINT, 
  6.  response_size BIGINT, 
  7.  uri VARCHAR 
  8. ) WITH ( 
  9.  'connector.type' = 'kafka', 
  10.  'connector.version' = 'universal', 
  11.  'connector.topic' = 'access_log', 
  12.  'connector.properties.zookeeper.connect' = 'localhost:2181', 
  13.  'connector.properties.bootstrap.servers' = 'localhost:9092', 
  14.  'format.type' = 'csv', 
  15.  'format.ignore-parse-errors' = 'true' 
  16. """ 

2. MySql 數據寫入DDL定義

其中我們發現我們需求是按地區分組,但是原始日志里面并沒有地區的字段信息,所以我們需要定義一個Python UDF 更具 client_ip 來查詢對應的地區。所以我們對應的MySqL統計結果表如下:

DDL定義如下:

  1. mysql_sink_ddl = ""
  2. CREATE TABLE cdn_access_statistic ( 
  3.  province VARCHAR, 
  4.  access_count BIGINT, 
  5.  total_download BIGINT, 
  6.  download_speed DOUBLE 
  7. ) WITH ( 
  8.  'connector.type' = 'jdbc', 
  9.  'connector.url' = 'jdbc:mysql://localhost:3306/Flink', 
  10.  'connector.table' = 'access_statistic', 
  11.  'connector.username' = 'root', 
  12.  'connector.password' = '123456', 
  13.  'connector.write.flush.interval' = '1s' 
  14. """ 

核心統計邏輯

我們首先要將client_ip轉換為地區名字,然后在做數據統計,如下核心統計邏輯:

  1. # 核心的統計邏輯 
  2. t_env.from_path("cdn_access_log")\ 
  3.    .select("uuid, " 
  4.            "ip_to_province(client_ip) as province, " # IP 轉換為地區名稱 
  5.            "response_size, request_time")\ 
  6.    .group_by("province")\ 
  7.    .select( # 計算訪問量 
  8.            "province, count(uuid) as access_count, "  
  9.            # 計算下載總量  
  10.            "sum(response_size) as total_download,  "  
  11.            # 計算下載速度 
  12.            "sum(response_size) * 1.0 / sum(request_time) as download_speed") \ 
  13.    .insert_into("cdn_access_statistic") 

完整的作業代碼

我們整體看一遍完整代碼結構,首先是核心依賴的導入,然后是我們需要創建一個ENV,并設置采用的planner(目前Flink支持Flink和blink兩套planner)建議大家采用 blink planner。 接下來將我們剛才描述的kafka和mysql的ddl進行表的注冊。再將Python UDF進行注冊,這里特別提醒一點,UDF所依賴的其他文件也可以在API里面進行制定,這樣在job提交時候會一起提交到集群。然后是核心的統計邏輯,最后調用executre提交作業。這樣一個實際的CDN日志實時分析的作業就開發完成了。具體代碼如下:

  1. import os 
  2.  
  3. from pyFlink.datastream import StreamExecutionEnvironment 
  4. from pyflink.table import StreamTableEnvironment, EnvironmentSettings 
  5. from enjoyment.cdn.cdn_udf import ip_to_province 
  6. from enjoyment.cdn.cdn_connector_ddl import kafka_source_ddl, mysql_sink_ddl 
  7.  
  8. # 創建Table Environment, 并選擇使用的Planner 
  9. env = StreamExecutionEnvironment.get_execution_environment() 
  10. t_env = StreamTableEnvironment.create( 
  11.    env, 
  12.    environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build()) 
  13.  
  14. # 創建Kafka數據源表 
  15. t_env.sql_update(kafka_source_ddl) 
  16. # 創建MySql結果表 
  17. t_env.sql_update(mysql_sink_ddl) 
  18.  
  19. # 注冊IP轉換地區名稱的UDF 
  20. t_env.register_function("ip_to_province", ip_to_province) 
  21.  
  22. # 添加依賴的Python文件 
  23. t_env.add_Python_file( 
  24.     os.path.dirname(os.path.abspath(__file__)) + "/enjoyment/cdn/cdn_udf.py") 
  25. t_env.add_Python_file(os.path.dirname( 
  26.     os.path.abspath(__file__)) + "/enjoyment/cdn/cdn_connector_ddl.py") 
  27.  
  28. # 核心的統計邏輯 
  29. t_env.from_path("cdn_access_log")\ 
  30.    .select("uuid, " 
  31.            "ip_to_province(client_ip) as province, " # IP 轉換為地區名稱 
  32.            "response_size, request_time")\ 
  33.    .group_by("province")\ 
  34.    .select( # 計算訪問量 
  35.            "province, count(uuid) as access_count, "  
  36.            # 計算下載總量  
  37.            "sum(response_size) as total_download,  "  
  38.            # 計算下載速度 
  39.            "sum(response_size) * 1.0 / sum(request_time) as download_speed") \ 
  40.    .insert_into("cdn_access_statistic") 
  41.  
  42. # 執行作業 
  43. t_env.execute("pyFlink_parse_cdn_log") 

環境搭建(MacOS)

1. 安裝MySQL

  1. $  brew install mysql 
  2. Updating Homebrew... 
  3. ==> Auto-updated Homebrew! 
  4. ... 
  5. ... 
  6. MySQL is configured to only allow connections from localhost by default 
  7.  
  8. To connect run: 
  9.     mysql -uroot 
  10.  
  11. To have launchd start mysql now and restart at login: 
  12.   brew services start mysql 
  13. Or, if you don't want/need a background service you can just run: 
  14.   mysql.server start 

如果沒有安裝brew,執行以下指令安裝:

  1. $ /usr/bin/ruby -e "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/master/install)" 

安裝完成后,執行安全配置,將root用戶的密碼設置為JxXX&&l2j2#:

  1. $ mysql_secure_installation 

啟動mysql服務:

  1. $ brew services start mysql 
  2.  
  3. ==> Tapping homebrew/services 
  4. Cloning into '/usr/local/Homebrew/Library/Taps/homebrew/homebrew-services'... 
  5. ... 
  6. ... 
  7. ==> Successfully started `mysql` (label: homebrew.mxcl.mysql) 

登錄mysql并創建flink數據庫和cdn_access_statistic表,并確認開放端口號為默認的3306:

  1. $ mysql -uroot -p 
  2. Enter password: JxXX&&l2j2# 
  3. mysql> use flink; 
  4. Database changed 
  5. mysql> CREATE TABLE cdn_access_statistic( 
  6.     ->   province VARCHAR(255) PRIMARY KEY, 
  7.     ->   access_count BIGINT, 
  8.     ->   total_download BIGINT, 
  9.     ->   download_speed DOUBLE 
  10.     -> ) ENGINE=INNODB CHARSET=utf8mb4
  11. Query OK, 0 rows affected (0.01 sec) 
  12. exit 

這樣mysql的環境就準備好了。

2. 安裝Kafka

在Mac系統安裝Kafka也非常方便,如下:

  1. brew install kafka 
  2. Updating Homebrew... 
  3. ==> Installing dependencies for kafka: zookeeper 
  4. ... 
  5. ... 
  6. ==> Summary 
  7. /usr/local/Cellar/zookeeper/3.5.7: 394 files, 11.3MB 
  8. ==> Installing kafka 
  9. ... 
  10. ... 
  11. ==> kafka 
  12. To have launchd start kafka now and restart at login: 
  13.   brew services start kafka 
  14. Or, if you don't want/need a background service you can just run: 
  15.   zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties & kafka-server-start /usr/local/etc/kafka/server.properties 

安裝完成后執行以下指令啟動zookeeper和kafka:

  1. $ cd /usr/local/opt/kafka/libexec/bin/ 
  2. $ ./zookeeper-server-start.sh ../config/zookeeper.properties & 
  3. $ (回車) 
  4. $ ./kafka-server-start.sh ../config/server.properties & 
  5. $ (回車) 

然后在kafka中創建名為cdn_access_log的topic:

  1. $ kafka-topics --create --replication-factor 1 --partitions 1 --topic cdn_access_log --zookeeper localhost:2181 
  2. ... 
  3. Created topic cdn_access_log. 

這樣kafka的環境也準備好了.

3. 安裝PyFlink

確保使用Python3.5+的版本,檢驗如下:

  1. $ python --version 
  2. Python 3.7.6 

說過pip install安裝PyFlink

  1. python -m pip install apache-flink==1.10.0 
  2. ... 
  3. ... 
  4. Successfully installed apache-beam-2.15.0 dill-0.2.9 pyarrow-0.14.1 

檢查我們是否成功安裝了PyFlink和依賴的Beam,如下:

  1. $ python -m pip list |grep apache- 
  2. apache-beam                   2.15.0                  
  3. apache-flink                  1.10.0 

如上顯示說明已經完成安裝。

4. Copy使用的Connector的JAR包

Flink默認是沒有打包connector的,所以我們需要下載各個connector所需的jar包并放入PyFlink的lib目錄。首先拿到PyFlink的lib目錄的路徑:

  1. PYFLINK_LIB=`python -c "import pyflink;import os;print(os.path.dirname(os.path.abspath(pyflink.__file__))+'/lib')"` 
  2. $ echo $PYFLINK_LIB 
  3. /usr/local/lib/python3.7/site-packages/pyflink/lib 

然后想需要的jar包下載到lib目錄中去:

  1. $ cd $PYFLINK_LIB 
  2. $ curl -O https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/1.10.0/flink-sql-connector-kafka_2.11-1.10.0.jar 
  3. $ curl -O https://repo1.maven.org/maven2/org/apache/flink/flink-jdbc_2.11/1.10.0/flink-jdbc_2.11-1.10.0.jar 
  4. $ curl -O https://repo1.maven.org/maven2/org/apache/flink/flink-csv/1.10.0/flink-csv-1.10.0-sql-jar.jar 
  5. $ curl -O https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.19/mysql-connector-java-8.0.19.jar 

最終lib的JARs如下:

  1. $ ls -la 
  2. total 310448 
  3. drwxr-xr-x  12 jincheng.sunjc  admin        384  3 27 15:48 . 
  4. drwxr-xr-x  28 jincheng.sunjc  admin        896  3 24 11:44 .. 
  5. -rw-r--r--   1 jincheng.sunjc  admin      36695  3 27 15:48 flink-csv-1.10.0-sql-jar.jar 
  6. -rw-r--r--   1 jincheng.sunjc  admin  110055308  2 27 13:33 flink-dist_2.11-1.10.0.jar 
  7. -rw-r--r--   1 jincheng.sunjc  admin      89695  3 27 15:48 flink-jdbc_2.11-1.10.0.jar 
  8. -rw-r--r--   1 jincheng.sunjc  admin    2885638  3 27 15:49 flink-sql-connector-kafka_2.11-1.10.0.jar 
  9. -rw-r--r--   1 jincheng.sunjc  admin   22520058  2 27 13:33 flink-table-blink_2.11-1.10.0.jar 
  10. -rw-r--r--   1 jincheng.sunjc  admin   19301237  2 27 13:33 flink-table_2.11-1.10.0.jar 
  11. -rw-r--r--   1 jincheng.sunjc  admin     489884  2 27 13:33 log4j-1.2.17.jar 
  12. -rw-r--r--   1 jincheng.sunjc  admin    2356711  3 27 15:48 mysql-connector-java-8.0.19.jar 
  13. -rw-r--r--   1 jincheng.sunjc  admin      20275 12  5 17:01 pyflink-demo-connector-0.1.jar 
  14. -rw-r--r--   1 jincheng.sunjc  admin       9931  2 27 13:33 slf4j-log4j12-1.7.15.jar 

這樣所有的環境就準備好了。

本地運行作業

  • 啟動本地集群:
  1. $PYFLINK_LIB/../bin/start-cluster.sh local 

啟動成功后可以打開web界面(默認8081端口,我更改為4000)

  • 提交作業

示例涉及到如下三個py文件:

我們在作業文件所在目錄進行執行:

  1. $ $PYFLINK_LIB/../bin/flink run -m localhost:4000 -py cdn_demo.py 
  2. Job has been submitted with JobID e71de2db44fd6cfb9bae93bd04ee2ec9 

查看控制臺如下:

目前我們已經成功的將作業啟動起來了,接下來我們向Kafka里面進行數據的寫入。

Mock 數據

我們對CDN數據進行Mock,并進行統計可視化,工具代碼請下載。

進行源碼編譯

  1. $ python setup.py sdist 
  2. $ sudo python -m pip install dist/* 

運行工具

  1. $ start_dashboard.py  
  2. --------------------------------------environment config-------------------------------------- 
  3. Target kafka port: localhost:9092 
  4. Target kafka topic: cdn_access_log 
  5. Target mysql://localhost:3306/flink 
  6. Target mysql table: cdn_access_statistic 
  7. Listen at: http://localhost:64731 
  8. ---------------------------------------------------------------------------------------------- 
  9. To change above environment config, edit this file: /usr/local/bin/start_dashboard.py 

打開可視化界面: (根據命令行提示的 Listen at:后面的地址)

小結

本篇是場景案例系列,開篇分享了老子教導我們要學會放空自己,空杯才能注水。全篇圍繞阿里云CDN日志實時分析需求進行展開,描述了任何利用PyFlink解決實際業務問題。最后又為大家提供了示例的數據的Mock工具和可視化統計頁面,希望對大家有所幫助。

作者介紹

本人孫金城,淘寶花名“金竹”,阿里巴巴高級技術專家。2011年加入阿里,在2016年開始ASF社區貢獻,目前是 ASF Member, PMC member of @ApacheFlink and a Committer for @ApacheFlink, @ApacheBeam, @ApacheIoTDB。

【本文為51CTO專欄作者“金竹”原創稿件,轉載請聯系原作者】

戳這里,看該作者更多好文 

 

責任編輯:趙寧寧 來源: 51CTO專欄
相關推薦

2021-08-30 10:19:05

PyFlink 開發環境Zeppelin No

2020-05-15 10:28:04

實時分析客戶需求CIO

2022-07-12 10:38:25

分布式框架

2014-01-22 11:22:44

華為HANA一體機FusionCube大數據分析

2016-06-13 14:38:46

開源Skydive

2016-11-29 09:27:22

Apache SparDashboard構建

2016-10-31 19:19:20

實時分析

2016-09-17 00:12:46

2022-07-14 15:08:21

SQL數據驅動NoSQL

2018-09-19 10:01:39

MSSQL列存儲實時分析

2020-04-09 11:08:30

PyFlinkJAR依賴

2016-08-31 14:41:31

大數據實時分析算法分類

2023-10-31 15:40:12

2021-06-07 10:20:26

實時分析IT領導者CIO

2019-08-19 14:24:39

數據分析Spark操作

2012-02-21 10:25:35

SAPHANA實時分析

2018-12-18 15:21:22

海量數據Oracle

2016-11-09 15:23:44

2024-06-06 08:58:08

大數據SQLAPI

2024-06-04 14:10:00

FlinkSQL窗口大數據
點贊
收藏

51CTO技術棧公眾號

国产视频二区在线观看| 亚洲日本韩国在线| 亚洲免费看片| 综合欧美一区二区三区| 91蜜桃网站免费观看| 国产在线视频你懂的| 国产成人精品一区二区免费看京 | 亚洲成人久久一区| 热久久精品免费视频| 成人片在线看| 国产亚洲综合av| 超碰在线97av| 在线观看免费黄色小视频| 欧美三级在线| 在线观看国产成人av片| 日韩女优在线视频| 成人交换视频| 亚洲成人手机在线| 一区二区在线观看网站| 天堂在线视频观看| 国产一区二区毛片| 久久久噜久噜久久综合| 特级西西人体高清大胆| 欧美偷窥清纯综合图区| 6080日韩午夜伦伦午夜伦| 黑人糟蹋人妻hd中文字幕| 91香蕉在线观看| 国产精品美女久久久久久2018| 国产一区自拍视频| 99久久精品免费看国产交换| 日韩成人一区二区| 性欧美长视频免费观看不卡| www.99re7| 日韩激情免费| 亚洲亚裔videos黑人hd| 99久久免费看精品国产一区| 清纯唯美激情亚洲| 欧美剧情片在线观看| 免费看a级黄色片| 都市激情亚洲一区| 欧美性xxxxxxxxx| 国产69精品久久久久999小说| 在线三级中文| 亚洲欧美日本在线| 永久免费在线看片视频| 日本高清视频在线观看| 中文字幕欧美三区| 亚洲春色在线视频| 波多野结衣在线网站| 国产亚洲制服色| 欧美激情论坛| 黄色片在线免费看| 久久久久国产精品人| 欧美极品一区| 黄色大片在线免费观看| 久久女同互慰一区二区三区| 免费成人看片网址| 男女污污视频在线观看| 91蝌蚪porny成人天涯| 精品一区二区三区国产| 欧美色18zzzzxxxxx| 久久婷婷国产综合精品青草 | 亚洲 国产 欧美 日韩| 99免费精品视频| 久久精品日产第一区二区三区| 色香蕉在线视频| 久久亚洲精华国产精华液 | 欧美女v视频| 国产性天天综合网| 五月天亚洲综合| 看女生喷水的网站在线观看| 亚洲免费伊人电影| www.日本少妇| 欧美人体一区二区三区| 欧美日韩午夜在线视频| 日日夜夜精品视频免费观看| 97青娱国产盛宴精品视频| 亚洲国产欧美一区二区丝袜黑人 | www.五月天激情| 成人看片黄a免费看在线| 国产一区二区三区高清视频| 欧美男男同志| 亚洲品质自拍视频| 老太脱裤子让老头玩xxxxx| 日韩av大片站长工具| 欧美日韩精品一区二区三区蜜桃 | 国产91精品久久久| 国产suv精品一区二区33| 久久国产成人午夜av影院| 91亚色免费| 青青操视频在线| 国产精品国产自产拍高清av| 中文字幕在线成人| 波多野结衣久久久久| 狠狠色丁香久久综合频道| 奇米四色中文综合久久| 91美女精品网站| www.亚洲在线| 在线观看精品视频| 乱馆动漫1~6集在线观看| 欧美日韩亚洲综合| 大黑人交xxx极品hd| 围产精品久久久久久久| 91精品国产高清自在线| 国产精品久久久久久免费| 不卡电影一区二区三区| 一区二区av| 在线视频超级| 欧美不卡一区二区三区四区| 黄免费在线观看| 在线欧美福利| 91精品国产综合久久香蕉的用户体验| 免费国产羞羞网站视频| 国产精品久久影院| 黄色a级片免费| 8x国产一区二区三区精品推荐| 这里只有精品视频| 国产情侣自拍av| 国产成人综合视频| 一区二区三区四区五区视频| 一区二区三区短视频| 日韩精品一区在线| 五月天激情丁香| 美女国产一区二区| 清纯唯美一区二区三区| 青草av在线| 日韩网站在线看片你懂的| 国产又粗又猛又爽又黄的视频四季| 国产一区成人| 国产午夜精品一区| 国产精品69xx| 日韩视频中午一区| 欧美国产日韩在线观看成人 | 18欧美乱大交hd1984| 农村妇女精品一二区| 久久综合五月婷婷| 午夜精品99久久免费| 精品人妻少妇AV无码专区| 中文字幕亚洲欧美在线不卡| 国产又大又黄又粗的视频| 中文字幕伦av一区二区邻居| 91成人在线观看国产| 国产 日韩 欧美 精品| 亚洲一区在线免费观看| 中文国产在线观看| 亚洲五月综合| 99国产视频| 日本在线视频www鲁啊鲁| 欧美本精品男人aⅴ天堂| 欧美成人精品激情在线视频| 国产美女娇喘av呻吟久久| 佐佐木明希av| 在线视频亚洲欧美中文| 久久久久这里只有精品| 午夜av免费观看| 欧美性高潮在线| 亚洲第一综合网| 久久99久久99| 欧美日韩午夜爽爽| 国内精品偷拍| 欧美制服第一页| 在线日本中文字幕| 91精品国产色综合久久不卡蜜臀 | 日韩毛片无码永久免费看| 日韩国产欧美在线观看| 亚洲成人a**址| 亚洲毛片在线免费| 国内精品久久久久久久久| 婷婷亚洲一区二区三区| 欧美自拍偷拍一区| 五月综合色婷婷| 成人h动漫精品一区二区| 国产精品无码av在线播放| 欧美日韩高清| 亚洲自拍小视频免费观看| segui88久久综合9999| 亚洲日韩中文字幕在线播放| 亚洲一区二区人妻| 一二三区精品福利视频| 亚洲自拍偷拍一区二区| 久久99精品视频| 无码粉嫩虎白一线天在线观看 | 亚洲视频第一页| 97精品人妻一区二区三区| 亚洲一区二区三区四区不卡| 超碰97人人干| 国产一区二区三区久久久| 欧美三级在线观看视频| 成人同人动漫免费观看 | 在线看成人短视频| 成人h猎奇视频网站| 福利在线免费视频| 日韩最新av在线| 亚洲aⅴ在线观看| 欧美日韩国产免费一区二区 | 国产又黄又粗视频| 国产酒店精品激情| 国产日韩一区二区在线观看| 亚洲一区在线| 日本在线一区| 国产色噜噜噜91在线精品| 国产精品网站视频| 国产理论在线| 久久亚洲精品中文字幕冲田杏梨| 性xxxxbbbb| 日韩一区二区三区观看| 国内av在线播放| 午夜精品国产更新| 亚洲av无码一区二区三区在线| 久久综合成人精品亚洲另类欧美 | 91干在线观看| 日韩免费影院| 丝袜亚洲欧美日韩综合| 牛牛澡牛牛爽一区二区| 精品少妇一区二区三区在线视频 | 亚洲国产欧美一区二区丝袜黑人 | 精品久久毛片| 日本欧美精品在线| 99色在线观看| 欧美高清视频一区二区| 免费高清完整在线观看| 国产一区二区激情| 你懂的在线看| 日韩激情av在线播放| 国 产 黄 色 大 片| 日韩精品一区国产麻豆| 精品国产亚洲AV| 777xxx欧美| 91午夜交换视频| 欧美日韩精品专区| 在线免费看av的网站| 在线看日韩精品电影| 福利网址在线观看| 欧美视频一二三| 成人在线免费看视频| 偷窥少妇高潮呻吟av久久免费| 久久免费公开视频| 艳妇臀荡乳欲伦亚洲一区| 欧美高清视频一区二区三区| 亚洲欧美精品午睡沙发| 五月综合色婷婷| 亚洲欧美色综合| 久草网站在线观看| 亚洲激情在线播放| 久久精品99久久久久久| 亚洲激情自拍视频| 久久久久99精品| 性欧美疯狂xxxxbbbb| 成年人免费看毛片| 欧美性xxxxx极品娇小| 久久久精品毛片| 欧美视频在线一区| 91麻豆国产在线| 日韩女优av电影| 色呦呦免费观看| 亚洲免费一级电影| 国产粉嫩一区二区三区在线观看| 国产亚洲免费的视频看| 在线观看a视频| 九九热精品视频国产| 国产盗摄一区二区| 2019中文字幕免费视频| 欧洲一级精品| 91久久精品一区| 国产成人夜色高潮福利影视| 韩国成人一区| 成人黄色小视频| www.亚洲一区二区| 国产一区二区中文| 东京热加勒比无码少妇| 麻豆精品久久精品色综合| 午夜影院免费观看视频| 成人av免费在线播放| 人人妻人人澡人人爽人人精品| 日本一区二区在线不卡| 91视频综合网| 精品福利在线视频| 中文字幕乱码人妻无码久久| 日韩午夜中文字幕| 九色在线观看| 久久99精品国产99久久6尤物| www.色在线| 国产精品日韩在线观看| 91久久精品无嫩草影院| 免费精品视频一区二区三区| 97精品在线| www国产精品内射老熟女| 蜜臂av日日欢夜夜爽一区| 国产艳妇疯狂做爰视频| 国产日韩欧美制服另类| 久久国产精品二区| 欧美在线影院一区二区| 亚洲奶汁xxxx哺乳期| 影音先锋欧美精品| 毛片在线导航| 国产一区二区在线播放| 全国精品免费看| 18视频在线观看娇喘| 裸体一区二区| 蜜臀视频在线观看| 国产精品欧美极品| 欧美a∨亚洲欧美亚洲| 欧美一区二区三区四区五区 | 国产亚洲精品福利| 久久久久99精品成人片毛片| 欧美怡红院视频| 日本福利午夜视频在线| 欧美俄罗斯性视频| 欧美与亚洲与日本直播| 狠狠色狠狠色综合人人| 亚洲精品在线观看91| 国产精品拍拍拍| 99久久国产综合色|国产精品| 中文字幕人妻一区二| 欧美四级电影在线观看| 四虎在线免费观看| 久久久噜噜噜久噜久久| 精品国产伦一区二区三区观看说明| 欧美日韩国产三区| 亚洲免费黄色| 亚洲精品乱码久久久久久蜜桃欧美| 国产精品国产自产拍高清av| 久久精品99北条麻妃| 精品视频在线播放免| av在线理伦电影| av成人观看| 欧美激情成人在线| 亚洲黄色av片| 中文字幕亚洲电影| 在线观看色网站| 少妇高潮久久久久久潘金莲| 天天综合网站| 日韩激情视频| 视频在线观看一区二区三区| 免费黄色在线视频| 欧美日韩亚洲网| 深夜影院在线观看| 91chinesevideo永久地址| 欧美日韩一区二区三区在线电影 | 日本sm残虐另类| 婷婷色一区二区三区| 欧美一a一片一级一片| 国产综合视频一区二区三区免费| 国产69久久精品成人| 中文字幕精品影院| 中文字幕天天干| 国产精品久久久久久久久久免费看| 中文字幕观看视频| 日韩在线视频网| 精品中文在线| 嫩草影院中文字幕| 99久久精品免费看国产| 久久久久久久久影院| 亚洲人成电影在线| 精品久久福利| 成人手机在线播放| 成人不卡免费av| 手机在线看片1024| 正在播放亚洲1区| 精品一区二区三区在线观看视频| 国产在线无码精品| 成人妖精视频yjsp地址| 欧美啪啪小视频| 国产一区二区三区视频在线观看| 91在线亚洲| 黄色一级片av| 91尤物视频在线观看| 无码日韩精品一区二区| 日韩在线观看成人| 这里视频有精品| 虎白女粉嫩尤物福利视频| 国产精品美女视频| 亚洲美女福利视频| 国产mv免费观看入口亚洲| 97精品国产| 精品国产一区在线| 欧美亚洲综合色| 亚洲区欧洲区| 欧美激情www| 国产精品白丝av| 国产黄网在线观看| 欧美理论片在线观看| 日韩精品丝袜美腿| 天堂av2020| 欧美视频在线视频| 国产午夜精品久久久久免费视| 国产视频一区二区三区四区| 美腿丝袜亚洲一区| 日韩免费在线视频观看| 最近2019中文字幕大全第二页| a看欧美黄色女同性恋| 国产成人手机视频| 性感美女极品91精品| 视频免费一区| 狠狠色综合色区| 国产一区视频在线看| 亚洲综合久久网| 欧美精品久久久久久久| 四虎国产精品免费观看| 成年人的黄色片|