您的足迹:首页 > SparkSQL >【原创】spark sql 基础(持续更新-宋亚飞)

【原创】spark sql 基础(持续更新-宋亚飞)

以下内容是我对apache spark官网翻译总结,如有错误之处,敬请提出,感激不尽。

SparkSQL初始化

Java API

SparkConf sparkConf = new SparkConf().setAppName("JavaSparkSQL");	JavaSparkContext ctx = new JavaSparkContext(sparkConf);	  SQLContext sqlContext = new SQLContext(ctx);

启动Spark SQL CLI

1、将hive的配置文件hive-site.xml放到$spark_home的conf/下。

2、在spark的spark-env.sh中加入如下配置

export HADOOP_CONF_DIR=/usr/local/hadoop-2.7.0/etc/hadoop

3、启动命令:./bin/spark-sql启动。

Spark-shell on yarn

命令:./bin/spark-shell --master yarn-client --executor-memory 1g --num-executors 10

注意:这里的–master必须使用yarn-client模式,如果指定yarn-cluster,则会报错:

Error: Cluster deploy mode is not applicable to Spark shells.

因为spark-shell作为一个与用户交互的命令行,必须将Driver运行在本地,而不是yarn上。

其中的参数与提交Spark应用程序到yarn上用法一样。

在RM管理界面可以看到spark-sql作为一个长服务的任务运行这yarn上。

Spark-sql on yarn

运行这个命令之前,需要把hive-site.xml放入$spark_home/conf下,把mysql-connector-java-5.1.15-bin.jar放入$spark_home/lib下。

命令:./bin/spark-sql --master yarn-client --executor-memory 1g --num-executors 10

Spark-submit whith hive提交作业

./bin/spark-submit --class testHive.SparkSQLHiveOnYarn --master yarn-cluster /tmp/sparksql.jar

--master 参数需指定为yarn-cluster才能使用yarn分布式资源。

--master 参数值为local[*]时为本地模式执行,此时不会被yarn管理。

说明一下上面使用spark-submit提交的命令:

--master yarn-cluster  //指定以yarn-cluster模式运行,关于yarn-cluster和yarn-client的区别,在之前的文章中提到过
--driver-memory 4G  //指定Driver使用的内存为4G,
//如果太小的话,会报错:Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread “Driver”
--driver-java-options “-XX:MaxPermSize=1G”   //指定Driver程序JVM参数
–files $HIVE_HOME/conf/hive-site.xml    //将Hive的配置文件添加到Driver和Executor的classpath中
--jars $HIVE_HOME/lib/mysql-connector-java-5.1.15-bin.jar,….    //将Hive依赖的jar包添加到Driver和Executor的classpath中
//需要依赖的jar包有:mysql-connector-java-5.1.15-bin.jar、datanucleus-api-jdo-3.2.6.jar、datanucleus-core-3.2.10.jar、datanucleus-rdbms-3.2.9.jar、guava-12.0.1.jar

注意:由于Driver和Executor需要访问Hive的元数据库,而Driver和Executor被分配到哪台机器上是不固定的,所以需要授权,使集群上所有机器都有操作Hive元数据库的权限。

运行Thrift JDBC/ODBC 服务

Thrift JDBC/ODBC server 的实现与hive1.2.1的hiveserver2一致。
可以使用spark或hive1.2.1中的beeline测试jdbc/odbc。

启动jdbc/odbc服务

使用以下命令,启动jdbc/odbc 服务

./sbin/start-thriftserver.sh

这个脚本接收所有bin/spark-submit的命令行操作,后面加上--hiveconf来指定hive属性。你可以运行脚本thriftserver.sh --help列出所有操纵参数。默认此服务监听localhost:10000端口,不过你可以通过环境变量覆盖此设置。

export HIVE_SERVER2_THRIFT_PORT=<listening-port>
export HIVE_SERVER2_THRIFT_BIND_HOST=<listening-host>
./sbin/start-thriftserver.sh \
--master <master-uri> \
...

或系统属性

./sbin/start-thriftserver.sh \	  
--hiveconf hive.server2.thrift.port=<listening-port> \	  
--hiveconf hive.server2.thrift.bind.host=<listening-host> \	  
--master <master-uri>	  
...

现在你可以使用beeline连接jdbc/odbc了。命令如下:

./bin/beeline

在beeline中连接jdbc/odbc服务

beeline> !connect jdbc:hive2://localhost:10000

执行上面命令后,beeline会要求输入用户名密码,在非安全模式下,只需要输入用户名即可,密码为空。在安全模式下,请参考beeline的文档

Thrift jdbc 服务还提供了通过http发送thrift RPC信息,使用以下配置开启HTTP模式。配置文件为/conf下的hive-site.xml

hive.server2.transport.mode - 设置为 http
hive.server2.thrift.http.port - 监听的HTTP端口号; 默认为 10001
hive.server2.http.endpoint - HTTP endpoint;默认为cliservice

 

以下是测试beeline在HTTP模式下连接JDBC/ODBC的服务

beeline> !connect jdbc:hive2://<host>:<port>/<database>?hive.server2.transport.mode=http;hive.server2.thrift.http.path=<http_endpoint>

核心类

DataFrame

此类为spark1.3.0及以后版本提供,在org.apache.spark.sql包下,实现了java.io.Serializable接口。

DataFrame类简化了以前使用RDD的操作。举个例子,有时我们需要用到一些额外的结构化数据(比如做IP和地址的映射),通常这样的数据会存在MySQL,而访问的方式有两种:一是每个worker远程去检索数据库,弊端是耗费额外的网络I/O资源;二是使用JdbcRDD的API转化为RDD格式,然后编写繁复的函数去实现检索,显然要写更多的代码。而现在,Spark提供了一种新的选择,一行代码就能实现从MySQL到DataFrame的转化,并且支持SQL查询。

使用:SQLContext sqlContext = ...  // An existing SQLContextDataFrame df =  	sqlContext.sql("SELECT * FROM table") 

数据格式和来源

现代的应用程序通常需要收集和分析来自各种不同数据源的数据,而DataFrame与生俱来就支持读取最流行的格式,包括JSON文件、Parquet文件和Hive表格。DataFrame还支持从多种类型的文件系统中读取,比如本地文件系统、分布式文件系统(HDFS)以及云存储(S3)。同时,配合JDBC,它还可以读取外部关系型数据库系统。此外,通过Spark SQL的外部数据源(external data sources) API,DataFrames可以更广泛地支持任何第三方数据格式和数据源。值得一提的是,当下的第三方扩展已经包含Avro、CSV、ElasticSearch和Cassandra。

DataFrame和RDD区别

升级到spark1.3.0以后,最大的改变是SchemaRDD已经改名为DataFrame。主要原因是DataFrame不再从RDD直接继承。RDDS应该专注于提供他们自己的实现,但DataFrame任然可以通过.rdd方式转换为RDDs。

在scala中,为了兼容性,scala提供了一alias用于从SchemaRDD转换为DataFrame。

以下是DataFrame和RDD的主要区别。

图片1.png

支持的数据类型

Json格式

如:scala> val df = sqlContext.jsonFile(“/path/to/your/jsonfile”) 

----此方式在spark1.4.0中已废弃。替换为:

DataFrame df = sqlContext.read().json("examples/src/main/resources/people.json"); 

parquet格式

val people = sqlContext.read.parquet("...")  // in Scala 	DataFrame people = sqlContext.read().parquet("...")  // in Java 

DataFrame方法

// 输出表结构 	
df.printSchema() 
// 选择所有年龄大于21岁的人,只保留name字段 	
df.filter(df(“age”) > 21).select(“name”).show() 
// 选择name,并把age字段自增 	df.select(“name”, df(“age”) + 1).show()
 // 按年龄分组计数 	
df.groupBy(“age”).count().show() 
// 左联表(注意是3个等号!) 	
df.join(df2, df(“name”) === df2(“name”), “left”).show() 


ToDF

Returns a new DataFrame with columns renamed.  	This can be quite convenient in conversion from a RDD 	 of tuples into a DataFrame with meaningful names. For example: 	   val rdd: RDD[(Int, String)] = ... 	   rdd.toDF()  // this implicit conversion creates a DataFrame with column name _1 and _2 	   rdd.toDF("id", "name")  // this creates a DataFrame with column name "id" and "name" 

虚拟表

将DataFrame对象转为虚拟表

df.registerTempTable(“people”) 	sqlContext.sql(“select age, count(*) from people group by age”).show() 

其实,以上语句就等同于

df.groupBy("age").count().show() 

DataFrameReader

此类主要实现将从外部存储系统加载数据如:本地文件系统、分布式文件系统(HDFS)、key-value存储、云存储(S3)、etc。同时,配合JDBC,它还可以读取外部关系型数据库系统。此外,通过Spark SQL的外部数据源(external data sources) API,DataFrames可以更广泛地支持任何第三方数据格式和数据源。值得一提的是,当下的第三方扩展已经包含Avro、CSV、ElasticSearch和Cassandra。

Since spark1.4.0

MySQL

除了JSON之外,DataFrame现在已经能支持MySQL、Hive、HDFS、PostgreSQL等外部数据源,而对关系数据库的读取,是通过jdbc实现的。

对于不同的关系数据库,必须在SPARK_CLASSPATH变量中加入对应connector的jar包,比如希望连接MySQL的话应该这么启动spark-shell:

SPARK_CLASSPATH=mysql-connector-java-x.x.x-bin.jar spark-shell 

下面要将一个MySQL表转化为DataFrame对象:

Val jdbcDF = sqlContext.load(“jdbc”, Map(“url” -> “jdbc:mysql://localhost:3306/your_database?user=your_user&password=your_password”, “dbtable” -> “your_table”)) 

Hive

Spark提供了一个HiveContext的上下文,此类是SQLContext的子类,但从作用上来说,sqlContext也支持Hive数据源。需要在部署Spark的时候加入Hive选项,并把已有的hive-site.xml文件移动到$SPARK_HOME/conf路径下,我们就可以直接用Spark查询包含已有元数据的Hive表了:

sqlContext.sql(“select count(*) from hive_people”).show() 

持久表saveAsTable

当使用hivecontext时,DataFrame也可以使用命令saveAsTable保存为持久表。不同于registerTempTable命令,saveAsTable会持久化DataFrame里的数据,并在hivemetastore里创建一个指向链接。只要你连接的是同一个metastore,即使spark程序重启了,持久表也会一直存在。在sqlContext中,可以使用表名称的方式调用持久表的方法。

默认情况下,saveAsTable 会自动创建自己的“管理表”来存储持久表的元数据,当表删除时,管理表也将管理的元数据链接删除。

表分区

表分区是像hive这样的系统优化常用的方法。在分区表中,数据通常存在不同的目录中。每一个列值是一个分区目录。Parquet 格式的数据可以自动发现和自动推断分区信息。

如下:

path

└── to

    └── table

        ├── gender=male

        │   ├── ...

        │   │

        │   ├── country=US

        │   │   └── data.parquet

        │   ├── country=CN

        │   │   └── data.parquet

        │   └── ...

        └── gender=female

            ├── ...

            │

            ├── country=US

            │   └── data.parquet

            ├── country=CN

            │   └── data.parquet

            └── ...

Hive table

//scis an existing JavaSparkContext.HiveContext sqlContext = 	  new org.apache.spark.sql.hive.HiveContext(sc.sc); 	sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)"); 	sqlContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src"); 	// Queries are expressed in HiveQL.Row[] results = sqlContext.sql("FROM src SELECT key, value").collect(); 

支持的hive特性

Spark SQL 支持的hive特性如下:

l Hive query statements, including:

1、SELECT 	
2、GROUP BY 	
3、ORDER BY 	
4、CLUSTER BY 	
5、SORT BY 

l All Hive operators, including:

1、Relational operators (=, ⇔, ==, <>, <, >, >=, <=, etc) 	
2、Arithmetic operators (+, -, *, /, %, etc) 	
3、Logical operators (AND, &&, OR, ||, etc) 	
4、Complex type constructors 	
5、Mathematical functions (sign, ln, cos, etc) 	
6、String functions (instr, length, printf, etc) 

l User defined functions (UDF)

l User defined aggregation functions (UDAF)

l User defined serialization formats (SerDes)

l Window functions

l Joins

1、JOIN 
2、{LEFT|RIGHT|FULL} OUTER JOIN 
3、LEFT SEMI JOIN 	
4、CROSS JOIN 

l Unions

l Sub-queries

SELECT col FROM ( SELECT a + b AS col from t1) t2

l Sampling

l Explain

l Partitioned tables including dynamic partition insertion

l View

l All Hive DDL Functions, including:

1、CREATE TABLE 	
2、CREATE TABLE AS SELECT 	
3、ALTER TABLE 

l Most Hive Data types, including:

1、TINYINT 	
2、SMALLINT 	
3、INT 	
4、BIGINT 	
5、BOOLEAN 	
6、FLOAT 	
7、DOUBLE 	
8、STRING 	
9、BINARY 	
10、TIMESTAMP 	
11、DATE 	
12、ARRAY<> 	
13、MAP<> 	
14、STRUCT<> 

不支持的hive特性

    1、桶buckets,桶是hive表分区的一种方式,目前为止,spark sql还不支持。 
    2、Hive查询元数据,不用启动作业,而sparksql查询元数据时,仍然要启动一个作业来查询。
    3、Hive支持索引,而spark sql则暂时不支持。
    4、块级位图索引和虚拟列(用于构建索引),这些sparksql还不支持。 
    5、Hive中可以自动为joins和groupbys确定reduces数量。而在sparksql中则需要手动设置并行度。设置SET spark.sql.shuffle.partitions=[num_tasks]; 
    6、数据倾斜标识,spark sql不支持hive的数据倾斜标识。
    7、Hive支持加入STREAMTABLE提示,spark SQL不支持STREAMTABLE 提示。 
    8、查询结果合并多个小文件:如果结果输出包含多个小文件,hive可以选择小文件合并到更少的大文件,以避免HDFS元数据文件过大。Spark SQL不支持。 
    9、UNION type 
    10、Unique join 
    11、Column statistics collecting:不支持列信息统计,仅支持
    12、populating the sizeInBytes field of the hive metastore.
    13、Hive Input/Output Formats
1、CLI文件格式化:当结果在CLI显示时,SparkSQL仅支持TextOutputFormat. 	
2、Hadoop archive 

 

 

本博客所有文章如无特别注明均为原创。作者:数据为王复制或转载请以超链接形式注明转自 数据为王
原文地址《【原创】spark sql 基础(持续更新-宋亚飞)

相关推荐


  • blogger

发表评论

路人甲 表情
看不清楚?点图切换 Ctrl+Enter快速提交

网友评论(0)