search
尋找貓咪~QQ 地點 桃園市桃園區 Taoyuan , Taoyuan

10 億級海量數據運算下,Apache Spark 的四個技術應用實踐

【51CTO.com原創稿件】2013 年,蘇寧大數據團隊以 Hadoop 生態系統為核心構建了整套大數據平台,為整個蘇寧集團所有業務團隊提供大數據的存儲以及計算能力。

在蘇寧中台供應鏈計算等應用場景下,我們基於 Apache Spark 來構建整套零售核心數據計算與分析平台,解決海量數據離線和在線計算時效和性能問題。

本文主要介紹 Apache Spark 如何實現蘇寧中台商品價格信息的 TB 級別複雜業務數據處理運算,以及其中碰到的問題和解決方案。

蘇寧大數據平台和整體框架結構

蘇寧大數據平台的整體架構以開源的基礎平台為主,輔助以自研的組件。

圖 1:蘇寧大數據平台架構

綜合商品價格運算系統中 Spark 的應用

在整個綜合商品價格運算系統中,我們對供應鏈等數據進行了整合,生成了目前全部可售商品的價格庫存等數據。

該數據的整合涉及到多個外圍系統的數據整合和業務的執行。在該項目中,我們運用 Spark 技術來解決海量數據抽取、海量數據運算的問題。

整體流程可以描述為:

  • 使用 Spark 從上游系統的 DB2、MySQL 生產環境備庫中抽取全量數據。
  • 使用 Spark 進行數據的關聯和聚合,將各個源頭數據加工轉換成計算所需要的數據維度。
  • 運用 Spark 的 Map 進行全量數據的運算轉換。
  • 存儲結果到 HDFS 中,並且在 Hive 表中建立外部表映射到 HDFS 目錄。

下面講述 Apache Spark 的四個技術應用實踐。

使用 DataFrame 實現異構資料庫海量數據抽取

數據處理的規劃:由於上游系統的數據,尚未同步到大數據存儲系統中(HDFS,Hive,HBase),項目需要獨立進行數據的 ETL 工作。

這些上游系統的數據存在以下特點:

  • 數據量較大:兩個主要數據源頭數據量在 10 億級。
  • 存儲介質不同:DB2,MySQL,Hive。
  • 存儲的分佈不同:業務庫有 10 個庫 1000 張表,也有 5 個庫 100 張表,和不分表等存儲結構,分庫的規則有取表名後綴的模,有取表名後綴的區間等。
  • 在系統需求上,又需要將整個運算任務壓縮到 1 小時之內。我們最終採用的方案是使用 SparkSQL 的 JDBC 介面直接進行數據的抽取和計算,相當於將數據 ETL 和數據的業務處理放置在一個程序中。
  • 相對於 Sqoop,這樣的解決方案是輕量級的,1000 個 DataFrame 的 Load 要比 1000 個 Sqoop 任務的資源消耗要低很多,以及調度開銷的消耗也少很多。
  • 便於數據業務代碼,業務針對動態表的切換,可以將讀取當前表編號的模塊直接嵌入到 Spark 的代碼中進行。

對於 DataFrame 在載入數據前的資料庫 Schema 性能問題,有了一個較好的優化方案,以下是優化前和優化后,DataFrame 在創建過程中的流程:

圖 3 :優化前 DataFrame 創建流程

圖 4 :優化后的 DataFrame 創建流程

在使用該方案后,任務 DataFrame 的 Load 數據時間從原先的接近 30 分鐘縮短為 5 分鐘以內。

使用 SparkSQL 結合 ZipPartition 實現多層次多維度數據關聯和優化

電商和互聯網的運營一般會涉及到數據維度的擴散,為了簡化運營端的操作難度和提高數據提供方的性能,一般會使用維度擴散的方案,將上游運營系統的數據進行數據的擴散,放置到下游數據使用方存儲中。

在進行維度擴散時經常會有數據層次的問題,數據需要多層的關聯和層次不同的數據需要關聯的情況。例如,上游運營的數據為全國、地區、城市維度並且有優先順序,下游數據服務方的維度統一為城市維度。

圖 5:多層關聯示意圖

在應用系統中,一般的做法是使用點關聯查詢,從最高優先順序的維度進行關聯,關聯到則返回,關聯不到則繼續向下一個優先順序進行關聯,直到最終結果。

但是在 Spark 中,我們是需要運用類似於資料庫關聯的模式解決該問題。

使用最高優先順序進行左關聯,然後過濾出未關聯到的數據,再依次將未關聯到的數據進行下面優先順序的關聯,直到生成結果。可以表示為:

  • DataSet LeftJoin DimensionA => DataSetA
  • DataSetA Filter(A.Field == NULL) => DataSetToJoinB
  • DataSetA Filter(A.Field != NULL) => DataSetAFinal
  • DataSetToJoinB LeftJoin DimensionB => DataSetB
  • DataSetFinal = DataSetAFinal UNION DataSetBFinal UNION DataSetCFinal

這裡就出現一個問題:

  • 同樣的數據被多次使用,這裡從技術上可以採用 Cache 的方法應對。
  • 在使用 Cache 對數據緩存的方法上,假如最高優先順序的數據少,則實際上大量的數據都會需要 Cache 並且落到最後一層。

針對可能緩存多次數據的問題,我們嘗試了另外一種方法,全部進行左關聯,並且帶上優先順序。最終,我們使用 Group By 的方法對優先順序進行了處理,可以表示為:

  • DataSet LeftJoin DimensionA with A => DataSetA
  • DataSet LeftJoin DimensionB with B => DataSetB
  • DataSet LeftJoin DimensionC with C => DataSetC
  • DataSetA UNION DataSetB UINION DataSetC => DataSetToGroupBy
  • DataSetFinal = (DataSetToGroupBy GroupBy Dimension).ApplyPriority

這裡方案就只需要進行一次的數據 Cache。

最終我們根據兩種方案對實際測試的結果進行取捨,該部分的優化和一般的資料庫優化一樣,都需要考慮到實際的數據關聯的情況和業務要求,獲取最優化的方案。已達到最優的任務運行效益。

通過并行控制 DAG,優化執行時間

在進行複雜業務的處理過程中,我們發現有部分數據未進行分表分庫,並且數據量相對較大(約 20 億),這部分數據的載入效率直接影響了後面的整個效率。

例如,我們正常使用 128 核進行運算,但一旦運行到該步驟,則變成了單個核進行運行,時間長達 5 分鐘。而 5 分鐘對於系統運算有嚴格時間區間要求的業務需求是非常嚴峻的。

針對該問題,我們使用 Driver 端并行提交任務的方法進行解決。這是根據我們的任務模式所決定的,Driver 需要大量的時間建立數據的整個流程(4000 多張表的 DataFrame),並在最終存儲結果 action 代碼執行之前就進行數據的載入。

具體操作流程是:

  • 將該部分數據標記為 Cache。
  • 執行 countSync 直接進行數據載入。
  • 在必須用到該步驟的流程進行 Get 命令阻塞 Driver 主線程,確保數據載入後進行後續的操作。
  • 後續步驟直接使用已經緩存的數據進行運算。

該方案可以用下圖表示:

圖 6:并行優化示意圖

計算結果是通過并行的載入后,我們將整個的流程縮短了 5 分鐘。我們也在其他對於運行時間有嚴格要求的項目中使用該方法對於業務流程中需要獨立計算,資源佔用小,但是耗時較長的模塊進行了優化,都一定程度上縮短了所需要的時間。

Spark 的 ClassLoader 所帶來的問題和解決方案

該問題出現的原因是公司對運維和研發的數據隔離有明確的要求,代碼和研發的配置中不允許出現生產環境的資料庫配置。

原先的配置都是由運維在 Websphere,Wildfly 等中間件中,通過 JNDI 的方法配置給實際的應用程序。

但是項目中又需要使用 JDBC 去直接連接生產環境的資料庫,這樣就帶來了資料庫連接的問題。

我們採用自定義的 JDBC 封裝原先的 JDBC,讓外層的 JDBC 以 Token 的方式獲取實際的資料庫連接,並且由實際的 JDBC 進行操作。

在這種需求下,我們在處理過程中考慮到目前分散式協調組件的壓力,將資料庫的 Token 封裝到 Jar 包,使用 ClassLoader 去讀取 Token 數據。

正常情況下,我們在 Spark 端使用到的 Classloader 順序來載入 Token 文件:

ExtClassLoader-> AppClassLoader -> MutableURLClassLoader.

然而,在集群運行時,我們在 Driver 主線程中的 ClassLoader 是 AppClassLoader,而它無法讀取到 http 的 jar 包裡面的 token 文件。

我們進行了簡單的方案,將當期線程中的 ClassLoader 替換成為當期業務代碼類的 ClassLoader。

Spark 應用的實踐總結

我們在價格運算系統中使用了 SOA 模式中的 Aggregate Reporting 模式,將多個產品線的數據進行了整合,提供了一個業務聚合的數據集市。

在模塊設計上,我們將數據的抽取和數據的運算集成到一起,這樣在帶來效率和便利的同時也必然帶來模塊耦合。

從目前的實踐中,可以得出幾個結論:

  • 使用 JDBC 抽取大量數據表直接進行計算是可行的,尤其針對分表分庫和可以進行并行抽取的資料庫。但是有幾點需要特別關註:

①DataFrame 在大量創建時,對帶來的 Driver 端時間消耗需要進行優化,減少數據流轉的時間。

②數據源中一旦有任何一個出現失敗,整個任務就可能需要重新運算,這個是將 ETL 和運算放到一個模塊帶來的問題。

③DataFrame 大量并行抽取時,對資料庫的 IO 壓力是比較大的,Spark 難以對這部分的并行度進行控制。

需要根據實際情況考慮使用合併抽取(Union多張表),或者進行任務拆分。任務拆分可以解決抽取時并行度和計算時并行度和資源要求不一樣的問題。

  • ZipPartition 是數據關聯的最終解決方案,通過 SparkSQL 和 RDD 的 JOIN 時維度層次不一樣,或者其他難以 JOIN 的問題都可以通過這個方法解決。

但由於 ZipPartition 是 RDD 底層的操作,開發人員幾乎完全控制 Worker 的關聯模型,它的性能調優顯得尤其重要。

而對於有層次結構的關聯,使用 JOIN – FILTER –UNION 和 JOIN – UNION –GROUPBY,則需要分析實際數據的情況,再進行取捨。

  • 在 Driver 端進行并行可以解決部分計算時間的問題,但需要滿足幾個條件:

①可以并行的時間點,必須是在集群相對空閑的時候,例如:Driver 端在進行初始化 RDD 的時候,在編排大型 DAG 的時候。

②可并行的數據必須在最終使用之前確認載入到 Cache。

③不應該對過於大的數據集進行并行載入操作。

  • Spark 在 ClassLoader 層面進行了變動,將配置文件封裝在 jar 包中有可能無法讀取,正確的做法是將配置信息使用同一配置管理,例如基於 Zookeeper 配置。

蘇寧未來的開發方向

蘇寧在 IT 的系統架構方面未來會根據各種不同的應用需求使用異構化的數據存儲,在數據架構層面逐漸向 DataLake+Big DataWarehousing 的模式發展。

數據服務必須具備以下能力:

  • 數據治理能力:各個系統通過統一的規範進行數據的業務分類,數據輸出。其他系統和分析人員可以使用統一的數據規範運用數據。

數據需要能夠支持快速的輸出,並且保持與業務系統實時同步。

  • 數據分析能力:在統一數據治理的基礎上進行數據的分析,挖掘數據對於運營系統和銷售系統的價值。

我們將使用 Apache Spark 平台結合 Hadoop 生態圈的其他工具進行開發,逐步形成以 Spark,Storm 等為引擎的一體化數據處理分析平台,提升整個蘇寧的數據運用能力。

話題討論

當前大數據架構最火熱的莫過於分散式計算架構 Hadoop 和流數據處理框架 Spark/Storm 這兩類。

網上逐漸有一種聲音說 Hadoop 的日子已經快到頭了,真是這樣嗎?未來大數據架構究竟該走向何方呢? 歡迎大家直接在底部參與評論!

王卓偉

蘇寧雲商 IT 總部高級技術經理

擁有多年 IT 平台研發和管理工作經驗,先後在聯創,焦點,蘇寧等大型互聯網和 IT 企業工作,現主要承擔蘇寧易購價格數據分析和實時處理優化提升,負責價格搜索響應提升系統,在大數據和數據分析上有多年的實戰經驗,專註 Java,大數據,SOA 等技術領域。



熱門推薦

本文由 yidianzixun 提供 原文連結

寵物協尋 相信 終究能找到回家的路
寫了7763篇文章,獲得2次喜歡
留言回覆
回覆
精彩推薦