基于Spark的數(shù)據(jù)分析實(shí)踐
三、SparkSQL
Spark 從 1.3 版本開始原有 SchemaRDD 的基礎(chǔ)上提供了類似Pandas DataFrame API。新的DataFrame API不僅可以大幅度降低普通開發(fā)者的學(xué)習(xí)門檻,同時(shí)還支持Scala、Java與Python三種語言。更重要的是,由于脫胎自SchemaRDD,DataFrame天然適用于分布式大數(shù)據(jù)場(chǎng)景。
一般的數(shù)據(jù)處理步驟:讀入數(shù)據(jù) -> 對(duì)數(shù)據(jù)進(jìn)行處理 -> 分析結(jié)果 -> 寫入結(jié)果
SparkSQL 結(jié)構(gòu)化數(shù)據(jù)
處理結(jié)構(gòu)化數(shù)據(jù)(如 CSV,JSON,Parquet 等);
把已經(jīng)結(jié)構(gòu)化數(shù)據(jù)抽象成 DataFrame (HiveTable);
非結(jié)構(gòu)化數(shù)據(jù)通過 RDD.map.filter 轉(zhuǎn)換成結(jié)構(gòu)化進(jìn)行處理;
按照列式數(shù)據(jù)庫,只加載非結(jié)構(gòu)化中可結(jié)構(gòu)化的部分列(Hbase,MongoDB);
處理非結(jié)構(gòu)化數(shù)據(jù),不能簡(jiǎn)單的用 DataFrame 裝載。而是要用 SparkRDD 把數(shù)據(jù)讀入,在通過一系列的 Transformer Method 把非結(jié)構(gòu)化的數(shù)據(jù)加工為結(jié)構(gòu)化,或者過濾到不合法的數(shù)據(jù)。
SparkSQL DataFrame
SparkSQL 中一切都是 DataFrame,all in DataFrame. DataFrame是一種以RDD為基礎(chǔ)的分布式數(shù)據(jù)集,類似于傳統(tǒng)數(shù)據(jù)庫中的二維表格。DataFrame與RDD的主要區(qū)別在于,前者帶有schema元信息,即DataFrame所表示的二維表數(shù)據(jù)集的每一列都帶有名稱和類型。如果熟悉 Python Pandas 庫中的 DataFrame 結(jié)構(gòu),則會(huì)對(duì) SparkSQL DataFrame 概念非常熟悉。
TextFile DataFrame
import.org.a(chǎn)pache.spark.sql._//定義數(shù)據(jù)的列名稱和類型valdt=StructType(List(id:String,name:String,gender:String,age:Int))
//導(dǎo)入user_info.csv文件并指定分隔符vallines = sc.textFile("/path/user_info.csv").map(_.split(","))
//將表結(jié)構(gòu)和數(shù)據(jù)關(guān)聯(lián)起來,把讀入的數(shù)據(jù)user.csv映射成行,構(gòu)成數(shù)據(jù)集valrowRDD = lines.map(x=>Row(x(0),x(1),x(2),x(3).toInt))
//通過SparkSession.createDataFrame()創(chuàng)建表,并且數(shù)據(jù)表表頭val df= spark.createDataFrame(rowRDD, dt)
可左右滑動(dòng)查看代碼
讀取規(guī)則數(shù)據(jù)文件作為DataFrame
SparkSession.Builder builder = SparkSession.builder()Builder.setMaster("local").setAppName("TestSparkSQLApp")SparkSession spark = builder.getOrCreate();SQLContext sqlContext = spark.sqlContext();
# 讀取 JSON 數(shù)據(jù),path 可為文件或者目錄valdf=sqlContext.read().json(path);
# 讀取 HadoopParquet 文件vardf=sqlContext.read().parquet(path);
# 讀取 HadoopORC 文件vardf=sqlContext.read().orc(path);
可左右滑動(dòng)查看代碼
JSON 文件為每行一個(gè) JSON 對(duì)象的文件類型,行尾無須逗號(hào)。文件頭也無須[]指定為數(shù)組;SparkSQL 讀取是只是按照每行一條 JSON Record序列化;
Parquet文件
Configurationconfig = new Configuration();ParquetFileReaderreader = ParquetFileReader.open( HadoopInputFile.fromPath(new Path("hdfs:///path/file.parquet"),conf));Map<String, String>schema = reader.getFileMetaData().getKeyValueMetaData();String allFields= schema.get("org.a(chǎn)pache.spark.sql.parquet.row.metadata");
可左右滑動(dòng)查看代碼
allFiedls 的值就是各字段的名稱和具體的類型,整體是一個(gè)json格式進(jìn)行展示。
讀取 Hive 表作為 DataFrame
Spark2 API 推薦通過 SparkSession.Builder 的 Builder 模式創(chuàng)建 SparkContext。 Builder.getOrCreate() 用于創(chuàng)建 SparkSession,SparkSession 是 SparkContext 的封裝。
在Spark1.6中有兩個(gè)核心組件SQLcontext和HiveContext。SQLContext 用于處理在 SparkSQL 中動(dòng)態(tài)注冊(cè)的表,HiveContext 用于處理 Hive 中的表。
從Spark2.0以上的版本開始,spark是使用全新的SparkSession接口代替Spark1.6中的SQLcontext和HiveContext。SQLContext.sql 即可執(zhí)行 Hive 中的表,也可執(zhí)行內(nèi)部注冊(cè)的表;
在需要執(zhí)行 Hive 表時(shí),只需要在 SparkSession.Builder 中開啟 Hive 支持即可(enableHiveSupport())。
SparkSession.Builder builder = SparkSession.builder().enableHiveSupport();SparkSession spark = builder.getOrCreate();SQLContext sqlContext = spark.sqlContext();
可左右滑動(dòng)查看代碼
// db 指 Hive 庫中的數(shù)據(jù)庫名,如果不寫默認(rèn)為 default
// tableName 指 hive 庫的數(shù)據(jù)表名
sqlContext.sql(“select * from db.tableName”)
可左右滑動(dòng)查看代碼
SparkSQL ThriftServer
//首先打開 Hive 的 Metastore服務(wù)
hive$bin/hive –-service metastore –p 8093
可左右滑動(dòng)查看代碼
//把 Spark 的相關(guān) jar 上傳到hadoophdfs指定目錄,用于指定sparkonyarn的依賴 jar
spark$hadoop fs –put jars/*.jar /lib/spark2
可左右滑動(dòng)查看代碼
// 啟動(dòng) spark thriftserver 服務(wù)
spark$ sbin/start-thriftserver.sh --master yarn-client --driver-memory 1G --conf spark.yarn.jars=hdfs:///lib/spark2/*.jar
可左右滑動(dòng)查看代碼
當(dāng)hdfs 上傳了spark 依賴 jar 時(shí),通過spark.yarn.jars 可看到日志 spark 無須每個(gè)job 都上傳jar,可節(jié)省啟動(dòng)時(shí)間
19/06/1114:08:26 INFO Client: Source and destination file systems are the same. Notcopying hdfs://localhost:9000/lib/spark2/snappy-java-1.0.5.jar19/06/1114:08:26 INFO Client: Source and destination file systems are the same. Notcopying hdfs://localhost:9000/lib/spark2/snappy-java-1.1.7.3.jar
可左右滑動(dòng)查看代碼
//通過 spark bin 下的 beeline 工具,可以連接到 spark ThriftServer(SparkOnHive)
bin/beeline -u jdbc:hive2://ip:10000/default -n hadoop
可左右滑動(dòng)查看代碼
-u 是指定 beeline 的執(zhí)行驅(qū)動(dòng)地址;
-n 是指定登陸到 spark Session 上的用戶名稱;
Beeline 還支持傳入-e 可傳入一行 SQL,
-e <query> query that should be executed
也可通過 –f 指定一個(gè) SQL File,內(nèi)部可用逗號(hào)分隔的多個(gè) SQL(存儲(chǔ)過程)
-f <exec file> script file that should be executed
SparkSQL Beeline 的執(zhí)行效果展示
SparkSQL ThriftServer
對(duì)于 SparkSQL ThriftServer 服務(wù),每個(gè)登陸的用戶都有創(chuàng)建的 SparkSession,并且執(zhí)行的對(duì)個(gè) SQL 會(huì)通過時(shí)間順序列表展示。
SparkSQL ThriftServer 服務(wù)可用于其他支持的數(shù)據(jù)庫工具創(chuàng)建查詢,也用于第三方的 BI 工具,如 tableau。
四、SparkSQL Flow
SparkSQL Flow 是以 SparkSQL 為基礎(chǔ),開發(fā)的統(tǒng)一的基于 XML 配置化的可執(zhí)行一連串的 SQL 操作,這一連串的 SQL 操作定義為一個(gè) Flow。下文開始 SparkSQL Flow 的介紹:
SparkSQL Flow 是基于 SparkSQL 開發(fā)的一種基于 XML 配置化的 SQL 數(shù)據(jù)流轉(zhuǎn)處理模型。該模型簡(jiǎn)化了 SparkSQL 、Spark RDD的開發(fā),并且降低開發(fā)了難度,適合了解數(shù)據(jù)業(yè)務(wù)但無法駕馭大數(shù)據(jù)以及 Spark 技術(shù)的開發(fā)者。
一個(gè)由普元技術(shù)部提供的基于 SparkSQL 的開發(fā)模型;
一個(gè)可二次定制開發(fā)的大數(shù)據(jù)開發(fā)框架,提供了靈活的可擴(kuò)展 API;
一個(gè)提供了 對(duì)文件,數(shù)據(jù)庫,NoSQL 等統(tǒng)一的數(shù)據(jù)開發(fā)視界語義;
基于 SQL 的開發(fā)語言和 XML 的模板配置,支持 Spark UDF 的擴(kuò)展管理;
支持基于 Spark Standlone,Yarn,Mesos 資源管理平臺(tái);
支持開源、華為、星環(huán)等平臺(tái)統(tǒng)一認(rèn)證。
SparkSQL Flow 適合的場(chǎng)景:
批量 ETL;
非實(shí)時(shí)分析服務(wù);
SparkSQL Flow XML 概覽
Properties 內(nèi)定義一組變量,可用于宏替換;
Methods 內(nèi)可注冊(cè) udf 和 udaf 兩種函數(shù);
Prepare 內(nèi)可定義前置 SQL,用于執(zhí)行 source 前的 sql 操作;
Sources 內(nèi)定義一個(gè)到多個(gè)數(shù)據(jù)表視圖;
Transformer 內(nèi)可定義 0 到多個(gè)基于 SQL 的數(shù)據(jù)轉(zhuǎn)換操作(支持 join);
Targets 用于定義 1 到多個(gè)數(shù)據(jù)輸出;
After 可定義 0到多個(gè)任務(wù)日志;
如你所見,source 的 type 參數(shù)用于區(qū)分 source 的類型,source 支持的種類直接決定SparkSQL Flow 的數(shù)據(jù)源加載廣度;并且,根據(jù) type 不同,source 也需要配置不同的參數(shù),如數(shù)據(jù)庫還需要 driver,url,user和 password 參數(shù)。
Transformer 是基于 source 定的數(shù)據(jù)視圖可執(zhí)行的一組轉(zhuǎn)換 SQL,該 SQL 符合 SparkSQL 的語法(SQL99)。Transform 的 SQL 的執(zhí)行結(jié)果被作為中間表命名為 table_name 指定的值。
Targets 為定義輸出,table_name 的值需在 source 或者 Transformer 中定義。

發(fā)表評(píng)論
請(qǐng)輸入評(píng)論內(nèi)容...
請(qǐng)輸入評(píng)論/評(píng)論長(zhǎng)度6~500個(gè)字
最新活動(dòng)更多
-
6月20日立即下載>> 【白皮書】精準(zhǔn)測(cè)量 安全高效——福祿克光伏行業(yè)解決方案
-
7月3日立即報(bào)名>> 【在線會(huì)議】英飛凌新一代智能照明方案賦能綠色建筑與工業(yè)互聯(lián)
-
7月22-29日立即報(bào)名>> 【線下論壇】第三屆安富利汽車生態(tài)圈峰會(huì)
-
7.30-8.1火熱報(bào)名中>> 全數(shù)會(huì)2025(第六屆)機(jī)器人及智能工廠展
-
7月31日免費(fèi)預(yù)約>> OFweek 2025具身機(jī)器人動(dòng)力電池技術(shù)應(yīng)用大會(huì)
-
免費(fèi)參會(huì)立即報(bào)名>> 7月30日- 8月1日 2025全數(shù)會(huì)工業(yè)芯片與傳感儀表展
推薦專題
- 1 AI 眼鏡讓百萬 APP「集體失業(yè)」?
- 2 大廠紛紛入局,百度、阿里、字節(jié)搶奪Agent話語權(quán)
- 3 深度報(bào)告|中國(guó)AI產(chǎn)業(yè)正在崛起成全球力量,市場(chǎng)潛力和關(guān)鍵挑戰(zhàn)有哪些?
- 4 上海跑出80億超級(jí)獨(dú)角獸:獲上市公司戰(zhàn)投,干人形機(jī)器人
- 5 國(guó)家數(shù)據(jù)局局長(zhǎng)劉烈宏調(diào)研格創(chuàng)東智
- 6 下一代入口之戰(zhàn):大廠為何紛紛押注智能體?
- 7 百億AI芯片訂單,瘋狂傾銷中東?
- 8 Robotaxi新消息密集釋放,量產(chǎn)元年誰在領(lǐng)跑?
- 9 格斗大賽出圈!人形機(jī)器人致命短板曝光:頭腦過于簡(jiǎn)單
- 10 一文看懂視覺語言動(dòng)作模型(VLA)及其應(yīng)用