Shuffle核心概念、Shuffle調(diào)優(yōu)及故障排除
Spark調(diào)優(yōu)之Shuffle調(diào)優(yōu)
本節(jié)開(kāi)始先講解Shuffle核心概念;然后針對(duì)HashShuffle、SortShuffle進(jìn)行調(diào)優(yōu);接下來(lái)對(duì)map端、reduce端調(diào)優(yōu);再針對(duì)Spark中的數(shù)據(jù)傾斜問(wèn)題進(jìn)行剖析及調(diào)優(yōu);最后是Spark運(yùn)行過(guò)程中的故障排除。
一、Shuffle的核心概念
1. ShuffleMapStage與ResultStage
ShuffleMapStage與ResultStage
在劃分stage時(shí),最后一個(gè)stage稱(chēng)為FinalStage,它本質(zhì)上是一個(gè)ResultStage對(duì)象,前面的所有stage被稱(chēng)為ShuffleMapStage。
ShuffleMapStage的結(jié)束伴隨著shuffle文件的寫(xiě)磁盤(pán)。
ResultStage基本上對(duì)應(yīng)代碼中的action算子,即將一個(gè)函數(shù)應(yīng)用在RDD的各個(gè)partition的數(shù)據(jù)集上,意味著一個(gè)job的運(yùn)行結(jié)束。
2. Shuffle中的任務(wù)個(gè)數(shù)
我們知道,Spark Shuffle分為map階段和reduce階段,或者稱(chēng)之為ShuffleRead階段和ShuffleWrite階段,那么對(duì)于一次Shuffle,map過(guò)程和reduce過(guò)程都會(huì)由若干個(gè)task來(lái)執(zhí)行,那么map task和reduce task的數(shù)量是如何確定的呢?
假設(shè)Spark任務(wù)從HDFS中讀取數(shù)據(jù),那么初始RDD分區(qū)個(gè)數(shù)由該文件的split個(gè)數(shù)決定,也就是一個(gè)split對(duì)應(yīng)生成的RDD的一個(gè)partition,我們假設(shè)初始partition個(gè)數(shù)為N。
初始RDD經(jīng)過(guò)一系列算子計(jì)算后(假設(shè)沒(méi)有執(zhí)行repartition和coalesce算子進(jìn)行重分區(qū),則分區(qū)個(gè)數(shù)不變,仍為N,如果經(jīng)過(guò)重分區(qū)算子,那么分區(qū)個(gè)數(shù)變?yōu)镸),我們假設(shè)分區(qū)個(gè)數(shù)不變,當(dāng)執(zhí)行到Shuffle操作時(shí),map端的task個(gè)數(shù)和partition個(gè)數(shù)一致,即map task為N個(gè)。
reduce端的stage默認(rèn)取spark.default.parallelism這個(gè)配置項(xiàng)的值作為分區(qū)數(shù),如果沒(méi)有配置,則以map端的最后一個(gè)RDD的分區(qū)數(shù)作為其分區(qū)數(shù)(也就是N),那么分區(qū)數(shù)就決定了reduce端的task的個(gè)數(shù)。
3. reduce端數(shù)據(jù)的讀取
根據(jù)stage的劃分我們知道,map端task和reduce端task不在相同的stage中,map task位于ShuffleMapStage,reduce task位于ResultStage,map task會(huì)先執(zhí)行,那么后執(zhí)行的reduce task如何知道從哪里去拉取map task落盤(pán)后的數(shù)據(jù)呢?
reduce端的數(shù)據(jù)拉取過(guò)程如下:
map task 執(zhí)行完畢后會(huì)將計(jì)算狀態(tài)以及磁盤(pán)小文件位置等信息封裝到MapStatus對(duì)象中,然后由本進(jìn)程中的MapOutPutTrackerWorker對(duì)象將mapStatus對(duì)象發(fā)送給Driver進(jìn)程的MapOutPutTrackerMaster對(duì)象;在reduce task開(kāi)始執(zhí)行之前會(huì)先讓本進(jìn)程中的MapOutputTrackerWorker向Driver進(jìn)程中的MapoutPutTrakcerMaster發(fā)動(dòng)請(qǐng)求,請(qǐng)求磁盤(pán)小文件位置信息;當(dāng)所有的Map task執(zhí)行完畢后,Driver進(jìn)程中的MapOutPutTrackerMaster就掌握了所有的磁盤(pán)小文件的位置信息。此時(shí)MapOutPutTrackerMaster會(huì)告訴MapOutPutTrackerWorker磁盤(pán)小文件的位置信息;完成之前的操作之后,由BlockTransforService去Executor0所在的節(jié)點(diǎn)拉數(shù)據(jù),默認(rèn)會(huì)啟動(dòng)五個(gè)子線程。每次拉取的數(shù)據(jù)量不能超過(guò)48M(reduce task每次最多拉取48M數(shù)據(jù),將拉來(lái)的數(shù)據(jù)存儲(chǔ)到Executor內(nèi)存的20%內(nèi)存中)。
二、HashShuffle解析
以下的討論都假設(shè)每個(gè)Executor有1個(gè)cpu core。
1. 未經(jīng)優(yōu)化的HashShuffleManager
shuffle write階段,主要就是在一個(gè)stage結(jié)束計(jì)算之后,為了下一個(gè)stage可以執(zhí)行shuffle類(lèi)的算子(比如reduceByKey),而將每個(gè)task處理的數(shù)據(jù)按key進(jìn)行“劃分”。所謂“劃分”,就是對(duì)相同的key執(zhí)行hash算法,從而將相同key都寫(xiě)入同一個(gè)磁盤(pán)文件中,而每一個(gè)磁盤(pán)文件都只屬于下游stage的一個(gè)task。在將數(shù)據(jù)寫(xiě)入磁盤(pán)之前,會(huì)先將數(shù)據(jù)寫(xiě)入內(nèi)存緩沖中,當(dāng)內(nèi)存緩沖填滿之后,才會(huì)溢寫(xiě)到磁盤(pán)文件中去。
下一個(gè)stage的task有多少個(gè),當(dāng)前stage的每個(gè)task就要?jiǎng)?chuàng)建多少份磁盤(pán)文件。比如下一個(gè)stage總共有100個(gè)task,那么當(dāng)前stage的每個(gè)task都要?jiǎng)?chuàng)建100份磁盤(pán)文件。如果當(dāng)前stage有50個(gè)task,總共有10個(gè)Executor,每個(gè)Executor執(zhí)行5個(gè)task,那么每個(gè)Executor上總共就要?jiǎng)?chuàng)建500個(gè)磁盤(pán)文件,所有Executor上會(huì)創(chuàng)建5000個(gè)磁盤(pán)文件。由此可見(jiàn),未經(jīng)優(yōu)化的shuffle write操作所產(chǎn)生的磁盤(pán)文件的數(shù)量是極其驚人的。
shuffle read階段,通常就是一個(gè)stage剛開(kāi)始時(shí)要做的事情。此時(shí)該stage的每一個(gè)task就需要將上一個(gè)stage的計(jì)算結(jié)果中的所有相同key,從各個(gè)節(jié)點(diǎn)上通過(guò)網(wǎng)絡(luò)都拉取到自己所在的節(jié)點(diǎn)上,然后進(jìn)行key的聚合或連接等操作。由于shuffle write的過(guò)程中,map task給下游stage的每個(gè)reduce task都創(chuàng)建了一個(gè)磁盤(pán)文件,因此shuffle read的過(guò)程中,每個(gè)reduce task只要從上游stage的所有map task所在節(jié)點(diǎn)上,拉取屬于自己的那一個(gè)磁盤(pán)文件即可。
shuffle read的拉取過(guò)程是一邊拉取一邊進(jìn)行聚合的。每個(gè)shuffle read task都會(huì)有一個(gè)自己的buffer緩沖,每次都只能拉取與buffer緩沖相同大小的數(shù)據(jù),然后通過(guò)內(nèi)存中的一個(gè)Map進(jìn)行聚合等操作。聚合完一批數(shù)據(jù)后,再拉取下一批數(shù)據(jù),并放到buffer緩沖中進(jìn)行聚合操作。以此類(lèi)推,直到最后將所有數(shù)據(jù)到拉取完,并得到最終的結(jié)果。
未優(yōu)化的HashShuffleManager工作原理如下圖所示:
未優(yōu)化的HashShuffleManager工作原理2. 優(yōu)化后的HashShuffleManager
為了優(yōu)化HashShuffleManager我們可以設(shè)置一個(gè)參數(shù):spark.shuffle.consolidateFiles,該參數(shù)默認(rèn)值為false,將其設(shè)置為true即可開(kāi)啟優(yōu)化機(jī)制,通常來(lái)說(shuō),如果我們使用HashShuffleManager,那么都建議開(kāi)啟這個(gè)選項(xiàng)。
開(kāi)啟consolidate機(jī)制之后,在shuffle write過(guò)程中,task就不是為下游stage的每個(gè)task創(chuàng)建一個(gè)磁盤(pán)文件了,此時(shí)會(huì)出現(xiàn)shuffleFileGroup的概念,每個(gè)shuffleFileGroup會(huì)對(duì)應(yīng)一批磁盤(pán)文件,磁盤(pán)文件的數(shù)量與下游stage的task數(shù)量是相同的。一個(gè)Executor上有多少個(gè)cpu core,就可以并行執(zhí)行多少個(gè)task。而第一批并行執(zhí)行的每個(gè)task都會(huì)創(chuàng)建一個(gè)shuffleFileGroup,并將數(shù)據(jù)寫(xiě)入對(duì)應(yīng)的磁盤(pán)文件內(nèi)。
當(dāng)Executor的cpu core執(zhí)行完一批task,接著執(zhí)行下一批task時(shí),下一批task就會(huì)復(fù)用之前已有的shuffleFileGroup,包括其中的磁盤(pán)文件,也就是說(shuō),此時(shí)task會(huì)將數(shù)據(jù)寫(xiě)入已有的磁盤(pán)文件中,而不會(huì)寫(xiě)入新的磁盤(pán)文件中。因此,consolidate機(jī)制允許不同的task復(fù)用同一批磁盤(pán)文件,這樣就可以有效將多個(gè)task的磁盤(pán)文件進(jìn)行一定程度上的合并,從而大幅度減少磁盤(pán)文件的數(shù)量,進(jìn)而提升shuffle write的性能。
假設(shè)第二個(gè)stage有100個(gè)task,第一個(gè)stage有50個(gè)task,總共還是有10個(gè)Executor(Executor CPU個(gè)數(shù)為1),每個(gè)Executor執(zhí)行5個(gè)task。那么原本使用未經(jīng)優(yōu)化的HashShuffleManager時(shí),每個(gè)Executor會(huì)產(chǎn)生500個(gè)磁盤(pán)文件,所有Executor會(huì)產(chǎn)生5000個(gè)磁盤(pán)文件的。但是此時(shí)經(jīng)過(guò)優(yōu)化之后,每個(gè)Executor創(chuàng)建的磁盤(pán)文件的數(shù)量的計(jì)算公式為:cpu core的數(shù)量 * 下一個(gè)stage的task數(shù)量,也就是說(shuō),每個(gè)Executor此時(shí)只會(huì)創(chuàng)建100個(gè)磁盤(pán)文件,所有Executor只會(huì)創(chuàng)建1000個(gè)磁盤(pán)文件。
優(yōu)化后的HashShuffleManager工作原理如下圖所示:
優(yōu)化后的HashShuffleManager工作原理

發(fā)表評(píng)論
請(qǐng)輸入評(píng)論內(nèi)容...
請(qǐng)輸入評(píng)論/評(píng)論長(zhǎng)度6~500個(gè)字
最新活動(dòng)更多
-
6月20日立即下載>> 【白皮書(shū)】精準(zhǔn)測(cè)量 安全高效——福祿克光伏行業(yè)解決方案
-
7月3日立即報(bào)名>> 【在線會(huì)議】英飛凌新一代智能照明方案賦能綠色建筑與工業(yè)互聯(lián)
-
7月22-29日立即報(bào)名>> 【線下論壇】第三屆安富利汽車(chē)生態(tài)圈峰會(huì)
-
7.30-8.1火熱報(bào)名中>> 全數(shù)會(huì)2025(第六屆)機(jī)器人及智能工廠展
-
7月31日免費(fèi)預(yù)約>> OFweek 2025具身機(jī)器人動(dòng)力電池技術(shù)應(yīng)用大會(huì)
-
免費(fèi)參會(huì)立即報(bào)名>> 7月30日- 8月1日 2025全數(shù)會(huì)工業(yè)芯片與傳感儀表展
推薦專(zhuān)題
- 1 AI 眼鏡讓百萬(wàn) APP「集體失業(yè)」?
- 2 大廠紛紛入局,百度、阿里、字節(jié)搶奪Agent話語(yǔ)權(quán)
- 3 深度報(bào)告|中國(guó)AI產(chǎn)業(yè)正在崛起成全球力量,市場(chǎng)潛力和關(guān)鍵挑戰(zhàn)有哪些?
- 4 上海跑出80億超級(jí)獨(dú)角獸:獲上市公司戰(zhàn)投,干人形機(jī)器人
- 5 國(guó)家數(shù)據(jù)局局長(zhǎng)劉烈宏調(diào)研格創(chuàng)東智
- 6 下一代入口之戰(zhàn):大廠為何紛紛押注智能體?
- 7 百億AI芯片訂單,瘋狂傾銷(xiāo)中東?
- 8 Robotaxi新消息密集釋放,量產(chǎn)元年誰(shuí)在領(lǐng)跑?
- 9 一文看懂視覺(jué)語(yǔ)言動(dòng)作模型(VLA)及其應(yīng)用
- 10 格斗大賽出圈!人形機(jī)器人致命短板曝光:頭腦過(guò)于簡(jiǎn)單