用戶離線實時畫像融合實踐得物技術(shù)

1、引言
用戶畫像,即用戶信息標簽化,它本質(zhì)是對用戶的一種建模,能夠幫助企業(yè)快速找到精準用戶群體以及用戶需求等更為廣泛的反饋信息,在現(xiàn)如今應(yīng)用越來越廣泛。本文主要講述用戶畫像在離線、實時方面的數(shù)據(jù)鏈路處理以及基于特定場景要求如何將離線、實時畫像進行在線融合的過程。
2、背景
目前的算法畫像服務(wù)分為兩部分,一部分是離線畫像,也就是批處理計算層,依賴DataWorks每天T+1的調(diào)度處理。批處理層是通過處理所有的已有歷史數(shù)據(jù)來實現(xiàn)數(shù)據(jù)的準確性。這意味著它是基于完整的數(shù)據(jù)集來重新計算的,能夠修復(fù)數(shù)據(jù)錯誤;另一部分是實時畫像,它的數(shù)據(jù)處理依賴流式計算層Flink。根據(jù)用戶實時的行為數(shù)據(jù)進行流式處理實時更新用戶畫像。由于兩種模式提供的狀態(tài)差異,所以需要我們?yōu)榕幚砗土魈幚硖峁┎煌姆?wù)層并在這個上面做合并處理。基于此,需要我們基于離線和實時畫像進行融合處理。
整個的數(shù)據(jù)鏈路大致如下:

主要分為三部分:批處理層、流處理層、數(shù)據(jù)融合層。接下來逐一講解每層的數(shù)據(jù)鏈路處理。
3、批處理層
批處理層依賴于定時調(diào)度,基于用戶日常的行為數(shù)據(jù)通過批處理過程以精確地計算用戶的離線畫像。離線畫像一方面用作補充實時鏈路的數(shù)據(jù)問題;另一方面是當用戶冷啟動時,如何進行用戶畫像的補充,在算法側(cè)請求時能夠拿到這部分用戶的畫像。同時在離線畫像數(shù)據(jù)加工完成后,需要考慮將這部分ODPS中的離線畫像及時地更新到用戶畫像服務(wù)中。在這里我們采取懶加載的方式,將離線畫像存儲到HBASE中,后續(xù)基于用戶當天第一次啟動App時,將用戶的離線畫像進行加載,這部分懶加載流程會在下文講解。
數(shù)據(jù)鏈路如下:

主要分為兩個部分:
a、每天定時調(diào)度生成日活用戶的離線畫像T+1,導(dǎo)入HBASE中。
b、基于步驟1的完成,向HBASE中記錄一條Log,代表當天T+1的離線畫像已經(jīng)成功寫入,Log中包含當天畫像的數(shù)據(jù)量、畫像的版本號及完成時間。這里的Log實際是作為標志位,用于判斷T+1畫像的完整性,后續(xù)懶加載流程會利用當天的Log來判斷是否加載離線畫像以及加載幾次。
4、流處理層
這里的流處理層分為兩塊,一塊是實時畫像,訂閱用戶的實時行為數(shù)據(jù)進行Flink處理而來;另一塊實際是對批處理層提供的離線畫像進行處理,基于用戶的實時登錄行為懶加載離線畫像。
上文提到在批處理層將用戶離線畫像導(dǎo)入HBASE后,通過懶加載的方式將離線畫像加載到畫像融合框架。
整個懶加載流程如下:

大致分為如下幾個步驟:
a、訂閱用戶的登錄行為埋點APPSTART。
b、根據(jù)訂閱的用戶登錄行為加載HBASE中的離線畫像。這里有一點需要說明的就是上述提到HBASE中的畫像Log記錄,利用Log來判斷是否需要加載畫像。假設(shè)當天T+1的畫像已經(jīng)完整的導(dǎo)入到HBASE中,當天用戶第一次登錄時,就會Load離線畫像,同時利用Flink的State記錄當天用戶已經(jīng)加載了T+1畫像,后續(xù)用戶當天再次登錄時就不會再Load離線畫像,做到當天只加載一次T+1畫像,降低HBASE的訪問壓力;相反如果用戶當天登錄時,T+1畫像并沒有Log記錄, Load畫像時State會記錄用戶當天加載了T+2畫像,后續(xù)只有當T+1畫像完成后用戶再次登錄,才會去獲取一次最新的離線畫像,同時更改State記錄。
c、讀取標簽配置表,根據(jù)對應(yīng)標簽的配置信息將畫像的格式、類型進行轉(zhuǎn)換滿足算法側(cè)的使用。
d、將轉(zhuǎn)換后的畫像統(tǒng)一成畫像框架消費的Action格式發(fā)送到消息隊列中,供后續(xù)融合框架消費和實時畫像進行融合。
懶加載的流程整體上就是上面所述,在這里有一點補充就是步驟1中訂閱的用戶登錄埋點APPSTART。在實際中由于受到埋點上報延遲、網(wǎng)絡(luò)等一系列原因,可能會導(dǎo)致部分用戶離線畫像加載的延遲,用戶請求時離線畫像尚未加載到,造成畫像覆蓋率降低。基于此,我們通過訂閱用戶的Init數(shù)據(jù)(先于推薦流請求)作為補充觸發(fā)事件來加載離線畫像,從而進一步提升畫像覆蓋率。
另外就是Log中版本號的概念,主要是為了容錯,防止出現(xiàn)畫像數(shù)據(jù)版本當天迭代更新。我們要求每次迭代version都要對應(yīng)+1,這樣當用戶登錄時假如當天的version出現(xiàn)了變化會再次加載最新的版本畫像,從而保障用戶加載的離線畫像版本是最新的。
接下來看下實時畫像的數(shù)據(jù)鏈路,整個流程如下:

大致分為如下幾個步驟:
1)Flink訂閱用戶行為數(shù)據(jù),根據(jù)畫像具體的業(yè)務(wù)要求處理行為數(shù)據(jù)。
2)將處理后的行為數(shù)據(jù)構(gòu)建畫像框架統(tǒng)一的Action算子發(fā)送到Kafka中。Action中包含標簽名稱、標簽值、標簽對應(yīng)的處理算子、行為時間等相關(guān)信息。
3)畫像框架消費Action信息,根據(jù)配置的信息做對應(yīng)的算子類型處理。比如map、List、String等一系列類型處理。
4)將處理后的實時畫像寫入Redis。 離線畫像的懶加載流程和實時畫像處理流程大致如上,最終目的是要按照框架Action格式發(fā)送到Kafka中供畫像框架融合使用,達到離線和實時畫像的合并。
5、畫像融合層
基于批處理層和流處理層的畫像數(shù)據(jù),我們需要將離線畫像和實時畫像進行融合處理。

首先需要明確的一點就是離線、實時畫像的數(shù)據(jù)格式一定要統(tǒng)一,否則談不上融合。同時在數(shù)據(jù)處理的口徑上也是要統(tǒng)一的,這樣做的好處是校驗數(shù)據(jù)時容易追溯、定位問題。
那如何進行畫像融合呢?這里以具體的標簽舉例。假如標簽a是用戶的點擊行為序列List,序列中包含用戶點擊商品cspuId、用戶行為時間、商品推薦渠道等信息。標簽a的數(shù)據(jù)格式如下:
在畫像配置表中,我們首先會配置標簽a的相關(guān)信息,比如sizeLimt為1000,排序字段為et,按照cspuId、et兩個字段去重等等信息。
在實時畫像層,我們知道用戶實時的點擊行為會產(chǎn)生實時的點擊畫像數(shù)據(jù),假設(shè)產(chǎn)生的實時畫像數(shù)據(jù)如下:
基于這個實時畫像數(shù)據(jù)我們會構(gòu)建統(tǒng)一的Action格式算子,實時的標簽a配置的處理算子是 list.rpush,代表將針對a標簽進行List的add操作。
在懶加載層,加載到的離線標簽a的數(shù)據(jù)格式如下:
基于這個離線畫像我們也會構(gòu)建統(tǒng)一的Action格式算子,離線標簽a配置的處理算子是 list.rpushl,代表對a標簽進行List的addAll操作。
畫像融合框架消費Action消息隊列時,由于TTL的原因,假設(shè)Redis中用戶的a標簽數(shù)據(jù)已經(jīng)清空,在用戶冷啟動時畫像框架會根據(jù)消費到的離線標簽數(shù)據(jù)及對應(yīng)的操作算子將a標簽數(shù)據(jù)補充完整。與此同時用戶后續(xù)產(chǎn)生了上述實時的畫像,同樣道理根據(jù)對應(yīng)的操作算子將實時畫像add到標簽a中,當然會根據(jù)標簽a的配置信息比如大小,排序字段等取最近的sizeLimit畫像。
另外比如用戶的a標簽中數(shù)據(jù)已經(jīng)有歷史累積了,這時候離線畫像可以用作數(shù)據(jù)修復(fù)。畫像融合框架拿到離線畫像會結(jié)合已經(jīng)存在的a標簽數(shù)據(jù)進行去重,按照et排序等一系列操作,補充實時鏈路可能出現(xiàn)的數(shù)據(jù)丟失問題,最終得到完整的上述a標簽數(shù)據(jù)。
考慮到不同類型標簽的操作差異,畫像融合框架會根據(jù)需求定制不同的操作算子,這樣可以很靈活地處理算法側(cè)不同的標簽需求。
基于此,通過簡單的標簽舉例,能夠了解整個畫像融合的過程。當然實際中還有更多細節(jié)問題可以后續(xù)進一步分享。
6、總結(jié)
整個離線、實時畫像的融合鏈路整體上如上所述。從數(shù)據(jù)準備、數(shù)據(jù)加工、數(shù)據(jù)融合到最終提供完整畫像,實際上類似于Lambda架構(gòu)。當然在批處理層,考慮到不同業(yè)務(wù)域?qū)+1日活畫像完整性的要求,我們采用了不同的處理方式。比如直接將這部分日活畫像寫到Redis中而不是通過懶加載方式去更新,這樣可以讓算法側(cè)自身去結(jié)合實際場景融合使用。另外一點就是在批處理層是否能夠進一步優(yōu)化,降低維護成本,比如HBASE的中間存儲,目前也在探索基于每天生成離線畫像的snapshot,直接從ODPS進行Load使用,也是在進一步探索如何充分利用離線畫像的同時降低開發(fā)成本。



































