為了解決分散式資料庫下,複雜的 SQL(如全局性的排序、分組、join、子查詢,特別是非均衡欄位的這些邏輯操作)難以實現的問題;在有了一些分散式資料庫和 Hadoop 實際應用經驗的基礎上,對比兩者的優點和不足,加上自己的一些提煉和思考, 設計了一套綜合兩者的系統,利用兩者的優點, 補充兩者的不足。具體的說, 使用資料庫水平分割的思想實現數據存儲,使用 MapReduce的思想實現 SQL 計算。
這裡的資料庫水平分割的意思是只分庫不分表,對於不同數量級別的表,分庫的數量可以不一樣,例如 1 億的數據量分 10 個分庫,10 億的分 50 個分庫。對於使用 MapReduce的思想實現計算 ; 對於一個需求,轉換成一個或多個有依賴關係的SQL,其中的每個SQL分解成一個或多個 MapReduce任務,每個 MapReduce任務又包含 mapsql、洗牌(shuffle)、reducesql,這個過程可以理解為類似 hive,區別是連 MapReduce任務中的 map 和 reduce 操作也是通過 SQL 實現, 而非 Hadoop 中的 map 和 reduce 操作.
這是基本的 MapReduce的思想,但是在 Hadoop 的生態圈中, 第一代的 MapReduce將結果存儲於磁碟,第二代的 MapReduce根據內存使用情況將結果存儲於內存或磁碟,類比一下用資料庫來存儲,那麼 MapReduce 的結果就是存儲在表中,而資料庫的緩存機制天然支持根據內存情況決定存儲在內存還是磁碟 ; 另外,Hadoop 生態圈中, 計算模型也並非一種,這裡的 MapReduce的計算思想,可以用類似 spark 的 RDD 迭代計算方式來替代 ; 本系統還是基於 MapReduce來說明的。
架構
根據以上的思想, 系統的架構如下:
沒有代理節點有代理節點
模塊說明
關於系統中的模塊,由於和絕大部分的分散式系統類似,這裡僅做簡要說明:
名稱 | 說明 |
協調節點,也叫代理節點(proxy node) | 實現常用的資料庫客戶端和服務端協議,接收客戶端請求,並且將請求轉換成執行計劃,獲取執行結果,發送給客戶端 |
客戶端(client) | 發送請求和接收執行結果的;如果是沒有協調節點,那麼客戶端也負責協調節點的工作 |
資料庫節點(db node) | 用於存儲實際的數據,運行接收的mapsql和reducesql |
主控機(master) | 是主進程,管理各個模塊和元數據 |
元資料庫(meta database) | 存儲系統的元數據的地方 |
無代理節點的時候,客戶端擔負著比較大的工作,包括:發送請求、解析 SQL、生成執行計劃、申請資源、安排執行、獲取結果等;有代理節點的時候,代理節點擔負著接受請求、解析 SQL、生成執行計劃、申請資源、安排執行、返回結果給客戶端等大部分責任,另外代理節點提供支持外部協議的介面,如 mysql 的 c/s 協議,使用 mysql 的命令行可以直接連接進來執行 SQL,整個系統就像普通的 mysql server 一樣。
應用架構
實際應用環境可能是正式環境一套, 正式備份環境一套, 線下環境一套, 可以按照如下的架構進行部署。
基本概念 說明
下面針對架構中的一些概念做些說明
概念 | 說明 |
分散式表 | 類似關係型表的概念,只不過數據是分佈在不同的資料庫節點上,通過某個欄位將數據水平分割到不同的分庫的表中 |
分散式表的分割欄位,也叫均衡欄位 | 存儲數據的時候決定將數據插入分散式表的某個分庫的依據欄位,如常用的用戶id |
分散式表的分割方法,也叫均衡策略 | 存儲的時候決定如何根據分割欄位將數據插入分散式表的方法,如列表,範圍,取模hash |
計算的洗牌欄位 | 類似存儲數據時候的分割欄位,MapReduce計算的時候,將數據插入reduce端資料庫表中所依據的欄位;是通過分析SQL得到 |
計算的洗牌方法 | 類似存儲的時候,在MapReduce計算的時候,決定如何根據洗牌欄位將數據插入reduce端資料庫表中的方法 |
任務樹,也叫階段樹 | 根據客戶端輸入的SQL,進行分析得到的執行計劃 |
任務節點,也叫階段(stage) | 是任務樹的某個節點,其實就是MapReduce任務;包含map,洗牌和reduce過程 |
下面說明常用的增刪改查如何執行, 特別是查詢操作
增刪改操作
當插入數據的時候,根據均衡欄位和均衡策略將記錄插入到對應的資料庫節點中。
當更新數據的時候,需要根據均衡策略判斷數據更新前的和更新后的資料庫節點是否變化:如果沒有變化,直接更新;如果有變化,在更新前的資料庫節點中刪除老數據,在更新后的資料庫節點中插入新數據。
當刪除數據的時候,根據均衡策略在相應的資料庫節點中刪除。
這三種變更數據的操作,只要涉及到多個節點的數據變更,都需要使用分散式事務保證一致性、原子性等事務特性。
查詢操作
查詢操作的原理類似 hive,大家可以對比來理解 ; 為了方便解釋查詢操作, 首先來說明階段樹和階段的結構,如下圖所示:
階段樹階段
查詢步驟
結合上面的圖, 查詢操作的具體過程如下:
將輸入 SQL 經過詞法、語法、語義分析,集合表結構信息和數據分佈信息,生成包含多個階段(簡稱 stage)的執行計劃,這些階段具有一定的依賴關係,形成多輸入單輸出的任務樹。
每個階段包括兩種 SQL,稱為 mapsql 和 reducesql,另外每個階段包括三個操作,map、數據洗牌和 reduce;map 和 reduce 分別執行 mapsql 和 reducesql。
先在不同的資料庫節點中執行 map 操作,map 操作執行 mapsql,它的輸入是每個資料庫節點上的表內部的數據,輸出根據某個欄位按照一定的規則進行分割,放到不同的結果集中,結果集作為數據洗牌的輸入。
然後執行數據洗牌的過程,將不同結果集拷貝到不同的將要執行 reduce 的資料庫節點上。
在不同的資料庫節點中執行 reduce 操作,reduce 操作執行 reducesql;
最後返回結果。
例子
由於系統核心在於存儲和計算, 下面對存儲和計算相關的概念舉例說明
均衡策略
舉例說明均衡策略,基本信息如下:表名字:tab_user_login表描述:用於存儲用戶登錄信息節點數:4,分為 0、1、2、3
欄位 | 欄位類型 | 描述 |
u_id | int | 用戶id |
login_ip | varchar | 用戶登錄ip |
login_province | varchar | 登錄省份 |
login_dt | timestamp | 用戶登錄時間 |
舉例說下如下的幾種策略:
登錄省份 | 節點id |
北京 | 0 |
廣東 | 1 |
黑龍江 | 2 |
湖南 | 3 |
……. | ……. |
河南 | 0 |
浙江 | 1 |
遼寧 | 2 |
四川 | 3 |
用戶id%4 | 節點id |
0 | 0 |
1 | 1 |
2 | 2 |
3 | 3 |
用戶id範圍 | 節點id |
0<=value<2500w | 0 |
2500w <=value<5000w | 1 |
5000w<=value<7500w | 2 |
7500w<=value<1億 | 3 |
(u_id/10000) % 4 | 節點id |
0 | 0 |
1 | 1 |
2 | 2 |
3 | 3 |
舉例說明查詢操作,基本信息如下:
欄位 | 欄位類型 | 欄位描述 |
u_id | Int | 用戶id |
u_name | Varchar | 用戶姓名 |
u_reg_dt | Timestamp | 用戶註冊時間 |
u_addr | Varchar | 用戶地址 |
u_age | Int | 用戶年齡 |
用戶登錄表 tab_login_info 的結構如下:
欄位 | 欄位類型 | 欄位描述 |
u_id | Int | 用戶id |
login_ip | Int | 登錄ip |
login_dt | Timestamp | 登錄時間 |
login_product | Varchar | 登錄到哪個產品中 |
排序的關鍵點是節點之間存在大小關係,大的 key 或者 key 範圍放到節點 id 大的節點上,然後在節點上排序,獲取數據的時候根據節點 id 大小依次獲取。
以如下 sql 為例,某一註冊時間範圍內的用戶信息,按照年齡和 id 排序:
select * from tab_user_info t where u_reg_dt>=? and u_reg_dt<=? order by u_id
執行計劃可能為:
Map:
Shuffle:
執行完成之後,這種情況下由於需要按照 u_id 進行數據洗牌,所以各個存儲節點上需要按照 u_id 進行劃分。例如有 N 個計算節點,那麼按照(最大 u_id- 最小 u_id)/N 平均劃分,將不同存儲節點上的同一範圍的 u_id,劃分到同一個計算節點上即可(這裡的計算節點存在大小關係)。
Reduce:
select * from tab_user_info t order by u_id
關鍵點和排序類似,節點之間存在大小關係,大的 key 或者 key 範圍放到節點 id 大的節點上,然後在節點上分組聚合,獲取數據的時候根據節點 id 大小依次獲取。
以如下 sql 為例,某一註冊時間範圍內的用戶,按照年齡分組,計算每個分組內的用戶數:
select age,count(u_id) v from tab_user_info t where u_reg_dt>=? and u_reg_dt<=? group by age
執行計劃可能為:
Map:
Shuffle:
執行完成之後,這種情況下由於需要按照 age 進行數據洗牌,考慮到 age 的唯一值比較少,所以數據洗牌可以將所有的記錄拷貝到同一個計算節點上。
Reduce:
select age,sum(v) from t where group by age
連接
首先明確 join 的欄位類型為數字類型和字元串類型,其他類型如日期可以轉換為這兩種。數字類型的排序很簡單,字元串類型的數據排序需要確定規則,類似 mysql 中的 collation,比較常用的是按照 unicode 編碼順序,按照實際存儲節點的大小等;其次 join 的方式有等值 join 和非等值 join;以如下常用且比較簡單的情況為例。
以如下 sql 為例,某一註冊時間範圍內的用戶的所有登錄信息:
select t1.u_id,t1.u_name,t2.login_product
from tab_user_info t1 join tab_login_info t2
on (t1.u_id=t2.u_id and t1.u_reg_dt>=? and t1.u_reg_dt<=?)
執行計劃可能為:
Map:
由於是 join,所有的表都要進行查詢操作,並且為每張表打上自己的標籤,具體實施的時候可以加個表名字欄位,在所有存儲節點上執行
select u_id,u_name from tab_user_info t where u_reg_dt>=? and t1.u_reg_dt<=?
select u_id, login_product from tab_login_info t
Shuffle:這種情況下由於需要按照 u_id 進行數據洗牌,考慮到 u_id 的唯一值比較多,所以各個存儲節點上需要按照 u_id 進行劃分,例如有 N 個計算節點,那麼按照(最大 u_id- 最小 u_id)/N 平均劃分,將不同存儲節點上的同一範圍的 u_id,劃分到同一個計算節點上。
Reduce:
select t1.u_id,t1.u_name,t2.login_product
from tab_user_info t1 join tab_login_info t2
on (t1.u_id=t2.u_id)
子查詢
由於子查詢可以分解成具有依賴關係的不包含子查詢的 SQL,所以生成的執行計劃,就是多個 SQL 的執行計劃按照一定的依賴關係進行依次執行。
與已有系統的區別和優點
相比 hdfs 來說,數據的分佈是有規則的,hdfs 需要啟動之後執行命令去查詢文件具體在什麼節點上;元數據的較小,記錄規則即可,管理成本較低,在啟動速度方面很快。
數據是放在資料庫中的,可以很好的使用索引和資料庫本身的緩存機制,大大提高數據查詢的效率,特別是在大量數據的情況下,利用索引查詢返回少量的數據。
數據可以進行刪除和修改,這在基於 hdfs 的系統中一般比較麻煩和低效。
在計算方面,和 MapReduce 或者其他的分散式計算框架(如 spark)並沒有本質的區別(需要進行 shuffle)。但是由於數據的分佈是有規則的,在有些地方可以做的更好,在分散式全文索引體現。
由於線上系統一般使用資料庫作為最終的存儲位置,而把資料庫同步到 hdfs 中是比較麻煩的,並且對於有刪除和更新的情況,同步數據麻煩低效,速度較慢;相比之下,這個方案可以使用資料庫本身提供的鏡像複製功能來同步,基本沒有額外的麻煩和低效的工作。
基於以上,可以把線上系統(主系統)和線下的數據分析挖掘(從系統)做成統一的方案, 參見應用架構圖。
最後列舉一些應用場景
應用場景 | 說明 |
線上資料庫 | 適用於數據量大、併發大、需要分庫分表的情況,並且能兼容各種 SQL,這是最直接且比較合適的場景 |
數據分析 | 由於系統解決了分散式資料庫情況下的複雜 SQL 的執行問題,這也非常容易理解的 |
機器學習 | 機器學習的邏輯也可以通過 SQL 來實現 |
搜索引擎 | 通過設計三張分散式表,文檔表,單詞表,語料庫表 ; 文檔表使用文檔 id 作為均衡欄位,單詞表使用單詞 id 作為均衡欄位,語料庫表使用單詞 id 作為均衡欄位,結合搜索引擎的思想,抓取,分析,存儲 (索引),搜索等步驟,將其中的存儲 (索引),搜索移植到分散式資料庫上即可 ; 其實就是換了存儲,傳統的搜索引擎使用倒排表存儲,如 Lucene,現在使用分散式資料庫 |
流計算 | 類似 storm,spark streaming,要實現流計算,需要添加其他的組件,如開放服務埠,定時進行 SQL 計算,為了在速度要求比較高,非精確計算的場景查,可以實現類似布隆過濾器來實現唯一值的邏輯計算,而不需要每次使用 SQL 掃描大量數據,只需要每次到布隆過濾查詢一次即可 |
作者介紹
江和慧,目前就職稅友軟體,曾經任職網易,專註數據處理領域MySQL、Hadoop,分散式資料庫