What?
- Spark SQL
  - in-memory columnar storage
  - bytecode generation (CG)
- Hive on Spark
  - Spark handles the underlying execution
  - Hive handles storage and SQL parsing/optimization
- Spark on Hive
  - Spark handles SQL optimization and execution
  - Hive handles storage
<!-- more -->
Spark SQL data sources
- JSON
- JDBC
- Hive
- HBase

Predicate pushdown: filter conditions are pushed down into the data source so that less data is read and shipped (see the sketch below).
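A minimal sketch of observing predicate pushdown, assuming the Parquet directory `./sparksql/parquet` written later in this post; for a columnar source like Parquet, the physical plan printed by `explain()` should list the pushed-down filter (e.g. a `PushedFilters` entry).

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val conf = new SparkConf().setMaster("local").setAppName("pushdown")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)

// With a columnar source such as Parquet, the age > 19 predicate can be
// pushed down into the scan, so fewer rows/columns are read from disk.
val df = sqlContext.read.parquet("./sparksql/parquet")
df.filter(df.col("age").gt(19)).explain()

sc.stop()
```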
Why?
Where?
How?
DataFrame
- Create a DataFrame from a JSON file

```scala
val conf = new SparkConf()
conf.setMaster("local").setAppName("jsonfile")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val df = sqlContext.read.json("sparksql/json")
//val df1 = sqlContext.read.format("json").load("sparksql/json")
df.show()
df.printSchema()
//select * from table
df.select(df.col("name")).show()
//select name from table where age>19
df.select(df.col("name"),df.col("age")).where(df.col("age").gt(19)).show()
//select count(*) from table group by age
df.groupBy(df.col("age")).count().show()
/**
 * Register a temporary table
 */
df.registerTempTable("jtable")
val result = sqlContext.sql("select * from jtable")
result.show()
sc.stop()
```
- Create a DataFrame from a JSON-format RDD (see the sketch below)
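This bullet has no snippet of its own in the notes; a minimal sketch, reusing the `sqlContext.read.json(jsonRDD)` call that also appears in the Parquet example further down (the inline sample records are made up):

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val conf = new SparkConf().setMaster("local").setAppName("jsonrdd")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)

// Each element of the RDD is a complete JSON document in string form.
val jsonRDD = sc.parallelize(Seq(
  """{"name":"zhangsan","age":20}""",
  """{"name":"lisi","age":18}"""
))
val df = sqlContext.read.json(jsonRDD)
df.show()
df.printSchema()

sc.stop()
```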
- Create a DataFrame via reflection (Java)

```java
SparkConf conf = new SparkConf();
conf.setMaster("local").setAppName("RDD");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sc);
JavaRDD<String> lineRDD = sc.textFile("sparksql/person.txt");
JavaRDD<Person> personRDD = lineRDD.map(new Function<String, Person>() {

    private static final long serialVersionUID = 1L;

    public Person call(String s) throws Exception {
        Person p = new Person();
        p.setId(s.split(",")[0]);
        p.setName(s.split(",")[1]);
        p.setAge(Integer.valueOf(s.split(",")[2]));
        return p;
    }
});
// Build the DataFrame from the Person bean; column names are taken from its getters.
DataFrame df = sqlContext.createDataFrame(personRDD, Person.class);
df.show();
sc.stop();
```
- Create a DataFrame dynamically
  - Java

```java
SparkConf conf = new SparkConf();
conf.setMaster("local").setAppName("rddStruct");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sc);
JavaRDD<String> lineRDD = sc.textFile("./sparksql/person.txt");
/**
 * Convert to an RDD of Row
 */
JavaRDD<Row> rowRDD = lineRDD.map(new Function<String, Row>() {

    private static final long serialVersionUID = 1L;

    @Override
    public Row call(String s) throws Exception {
        return RowFactory.create(
            String.valueOf(s.split(",")[0]),
            String.valueOf(s.split(",")[1]),
            Integer.valueOf(s.split(",")[2])
        );
    }
});
/**
 * Build the DataFrame's schema dynamically; the fields could just as well come
 * from a string or from an external database.
 */
List<StructField> asList = Arrays.asList(
    DataTypes.createStructField("id", DataTypes.StringType, true),
    DataTypes.createStructField("name", DataTypes.StringType, true),
    DataTypes.createStructField("age", DataTypes.IntegerType, true)
);
StructType schema = DataTypes.createStructType(asList);
DataFrame df = sqlContext.createDataFrame(rowRDD, schema);
df.show();
sc.stop();
```
  - Scala

```scala
val conf = new SparkConf()
conf.setMaster("local").setAppName("rddStruct")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val lineRDD = sc.textFile("./sparksql/person.txt")
val rowRDD = lineRDD.map { x => {
  val split = x.split(",")
  RowFactory.create(split(0), split(1), Integer.valueOf(split(2)))
}}
val schema = StructType(List(
  StructField("id", StringType, true),
  StructField("name", StringType, true),
  StructField("age", IntegerType, true)
))
val df = sqlContext.createDataFrame(rowRDD, schema)
df.show()
df.printSchema()
sc.stop()
```
- Create a DataFrame from JDBC data (MySQL as the example)
  - Scala

```scala
val conf = new SparkConf()
conf.setMaster("local").setAppName("mysql")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val reader = sqlContext.read.format("jdbc")
reader.option("url", "jdbc:mysql://192.168.179.4:3306/spark")
reader.option("driver", "com.mysql.jdbc.Driver")
reader.option("user", "root")
reader.option("password", "123456")
reader.option("dbtable", "score")
val score = reader.load()
score.show()
score.registerTempTable("score")
// Note: the join below also needs a "person" table, loaded and registered the same way.
val result = sqlContext.sql("select person.id,person.name,score.score from person,score where person.name = score.name")
result.show()
```
- Create a DataFrame from a Parquet file
  - Scala

```scala
val conf = new SparkConf()
conf.setMaster("local").setAppName("parquet")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val jsonRDD = sc.textFile("sparksql/json")
val df = sqlContext.read.json(jsonRDD)
df.show()
/**
 * Save the DataFrame as a Parquet file
 */
df.write.mode(SaveMode.Overwrite).format("parquet").save("./sparksql/parquet")
df.write.mode(SaveMode.Overwrite).parquet("./sparksql/parquet")
/**
 * Read the Parquet file back
 */
var result = sqlContext.read.parquet("./sparksql/parquet")
result = sqlContext.read.format("parquet").load("./sparksql/parquet")
result.show()
sc.stop()
```
- Load data from Hive into a DataFrame
  - Scala

```scala
val conf = new SparkConf()
conf.setAppName("HiveSource")
val sc = new SparkContext(conf)
/**
 * HiveContext is a subclass of SQLContext.
 */
val hiveContext = new HiveContext(sc)
hiveContext.sql("use spark")
hiveContext.sql("drop table if exists student_infos")
hiveContext.sql("create table if not exists student_infos (name string,age int) row format delimited fields terminated by '\t'")
hiveContext.sql("load data local inpath '/root/test/student_infos' into table student_infos")
hiveContext.sql("drop table if exists student_scores")
hiveContext.sql("create table if not exists student_scores (name string,score int) row format delimited fields terminated by '\t'")
hiveContext.sql("load data local inpath '/root/test/student_scores' into table student_scores")
val df = hiveContext.sql("select si.name,si.age,ss.score from student_infos si,student_scores ss where si.name = ss.name")
hiveContext.sql("drop table if exists good_student_infos")
/**
 * Write the result into a Hive table
 */
df.write.mode(SaveMode.Overwrite).saveAsTable("good_student_infos")
sc.stop()
```
Persist a DataFrame with the existing APIs
- JDBC

```scala
/**
 * Write the data into a MySQL table
 */
val properties = new Properties()
properties.setProperty("user", "root")
properties.setProperty("password", "1234")
result.write.mode(SaveMode.Append).jdbc("jdbc:mysql://192.168.100.111:3306/spark", "result", properties)
```
- Parquet

```scala
/**
 * Save the DataFrame as a Parquet file
 */
df.write.mode(SaveMode.Overwrite).format("parquet").save("data/parquet")
df.write.mode(SaveMode.Overwrite).parquet("data/parquet")
```
- Hive

```scala
/**
 * Write the result into a Hive table
 */
df.write.mode(SaveMode.Overwrite).saveAsTable("good_student_infos")
```
Cases in Java where data does not get serialized, worth testing (a sketch of the transient case follows the list):
- If the serialVersionUID recorded in the serialized data no longer matches the class, deserialization fails.
- If a subclass implements Serializable but its parent class does not, the parent's fields are not serialized; after deserialization they are re-initialized by the parent's no-arg constructor, so they come back as null/defaults. Note: if the parent implements Serializable and the subclass does not, the subclass can still be serialized normally.
- Fields marked with the transient keyword are not serialized.
- Static fields are not serialized: they belong to the class rather than to any object, so they are not part of an object's serialized state.
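A minimal sketch of the transient case, written in Scala for consistency with the rest of these notes (the `User` class and its fields are hypothetical): after a serialize/deserialize round trip, the `@transient` field comes back as null.

```scala
import java.io._

// Hypothetical example class: name is serialized, password is marked @transient.
class User(val name: String) extends Serializable {
  @transient var password: String = _
}

object SerializationDemo {
  def main(args: Array[String]): Unit = {
    val original = new User("zhangsan")
    original.password = "secret"

    // Serialize to an in-memory byte array ...
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(original)
    out.close()

    // ... and deserialize it again.
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    val copy = in.readObject().asInstanceOf[User]

    println(copy.name)     // zhangsan -- regular field survives
    println(copy.password) // null     -- @transient field is skipped
  }
}
```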
Spark on Hive
- In the Spark client installation, create hive-site.xml under spark-1.6.0/conf:
- Configure the Hive metastore URI:

```xml
<configuration>
    <property>
        <name>hive.metastore.uris</name>
        <value>thrift://node1:9083</value>
    </property>
</configuration>
```

- Start the Hive metastore service:

```
hive --service metastore
```

- Start the ZooKeeper cluster and the HDFS cluster.
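Once the metastore service is up, the Hive tables should be visible from the Spark side; a minimal sketch (the database and table names follow the earlier Hive example and are otherwise assumptions):

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

val conf = new SparkConf().setAppName("sparkOnHive")
val sc = new SparkContext(conf)
val hiveContext = new HiveContext(sc)

// Databases and tables come from the Hive metastore configured in
// hive-site.xml; Spark handles the SQL optimization and execution.
hiveContext.sql("show databases").show()
hiveContext.sql("use spark")
hiveContext.sql("show tables").show()
hiveContext.sql("select * from student_infos limit 10").show()

sc.stop()
```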
Custom functions
- UDF

```scala
val conf = new SparkConf()
conf.setMaster("local").setAppName("udf")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val rdd = sc.makeRDD(Array("zhansan", "lisi", "wangwu"))
val rowRDD = rdd.map { x => {
  RowFactory.create(x)
}}
val schema = DataTypes.createStructType(Array(StructField("name", StringType, true)))
val df = sqlContext.createDataFrame(rowRDD, schema)
df.registerTempTable("user")
// sqlContext.udf.register("StrLen", (s: String) => { s.length() })
// sqlContext.sql("select name, StrLen(name) as length from user").show
sqlContext.udf.register("StrLen", (s: String, i: Int) => {
  s.length() + i
})
sqlContext.sql("select name, StrLen(name,10) as length from user").show
sc.stop()
```
- UDAF

```scala
class MyUDAF extends UserDefinedAggregateFunction {
  // Schema of the intermediate aggregation buffer
  def bufferSchema: StructType = {
    DataTypes.createStructType(Array(DataTypes.createStructField("aaa", IntegerType, true)))
  }
  // Type of the final return value
  def dataType: DataType = {
    DataTypes.IntegerType
  }
  def deterministic: Boolean = {
    true
  }
  // Returns the final aggregated value; must match dataType
  def evaluate(buffer: Row): Any = {
    buffer.getAs[Int](0)
  }
  // Initializes the aggregation value for each group
  def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = 0
  }
  // Type of the input data
  def inputSchema: StructType = {
    DataTypes.createStructType(Array(DataTypes.createStructField("input", StringType, true)))
  }
  // Merges the partial aggregation values computed on different nodes
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getAs[Int](0) + buffer2.getAs[Int](0)
  }
  // Updates the group's aggregation value as each new row arrives
  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    buffer(0) = buffer.getAs[Int](0) + 1
  }
}

object UDAF {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local").setAppName("udaf")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val rdd = sc.makeRDD(Array("zhangsan", "lisi", "wangwu", "zhangsan", "lisi"))
    val rowRDD = rdd.map { x => {
      RowFactory.create(x)
    }}
    val schema = DataTypes.createStructType(Array(DataTypes.createStructField("name", StringType, true)))
    val df = sqlContext.createDataFrame(rowRDD, schema)
    df.show()
    df.registerTempTable("user")
    /**
     * Register the UDAF
     */
    sqlContext.udf.register("StringCount", new MyUDAF())
    sqlContext.sql("select name, StringCount(name) from user group by name").show()
    sc.stop()
  }
}
```
- Window functions

```scala
val conf = new SparkConf()
conf.setAppName("windowfun")
val sc = new SparkContext(conf)
val hiveContext = new HiveContext(sc)
hiveContext.sql("use spark")
hiveContext.sql("drop table if exists sales")
hiveContext.sql("create table if not exists sales (riqi string,leibie string,jine Int) "
  + "row format delimited fields terminated by '\t'")
hiveContext.sql("load data local inpath '/root/test/sales' into table sales")
/**
 * Window function syntax:
 * row_number() over (partition by XXX order by XXX)
 */
val result = hiveContext.sql("select riqi,leibie,jine " + "from ("
  + "select riqi,leibie,jine,"
  + "row_number() over (partition by leibie order by jine desc) rank "
  + "from sales) t "
  + "where t.rank<=3")
result.show()
sc.stop()
```
Optimization
- Select only the data columns you need
- Use the optimal storage format (e.g. a columnar format such as Parquet)
- Make good use of memory, e.g. caching hot tables (see the sketch below)
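A minimal sketch tying the three points together, assuming the person/Parquet data used earlier in this post: read a columnar format, select only the needed columns, and cache the table in Spark's in-memory columnar store.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val conf = new SparkConf().setMaster("local").setAppName("tuning")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)

// Columnar storage: Parquet reads only the requested columns from disk.
val df = sqlContext.read.parquet("./sparksql/parquet")

// Keep only the columns that are actually needed downstream.
val slim = df.select(df.col("name"), df.col("age"))
slim.registerTempTable("person")

// Cache the table in the in-memory columnar store for repeated queries.
sqlContext.cacheTable("person")
sqlContext.sql("select name from person where age > 19").show()
sqlContext.uncacheTable("person")

sc.stop()
```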