Advanced Spark: Spark SQL

What ?

  • sparksql
    • in-memory columnar storage
    • bytecode generation (codegen, CG)
  • hive on spark
    • Spark does the execution underneath
    • Hive handles storage and SQL parsing/optimization
  • spark on hive
    • Spark handles SQL optimization and execution
    • Hive handles storage

<!-- more -->

Spark SQL data sources

  • json
  • jdbc
  • hive
  • hbase

Predicate pushdown: filter conditions are pushed down to the data source, so less data is scanned and transferred to Spark.
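
A minimal sketch (the Parquet path is illustrative; it assumes a SQLContext like the ones set up below): explain() prints the physical plan, where pushed-down filters appear next to the data source scan.

    // The age > 19 predicate can be evaluated inside the Parquet reader itself,
    // so rows are filtered before they ever reach Spark.
    val df = sqlContext.read.parquet("./sparksql/parquet")
    df.filter(df.col("age").gt(19)).explain()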

Why ?

Where?

How?

DataFrame

  • Creating a DataFrame from a JSON file

    val conf = new SparkConf()
    conf.setMaster("local").setAppName("jsonfile")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val df = sqlContext.read.json("sparksql/json")
    //val df1 = sqlContext.read.format("json").load("sparksql/json")
    df.show()
    df.printSchema()
    //select * from table
    df.select(df.col("name")).show()
    //select name from table where age>19
    df.select(df.col("name"), df.col("age")).where(df.col("age").gt(19)).show()
    //select count(*) from table group by age
    df.groupBy(df.col("age")).count().show()
    /**
     * Register a temporary table
     */
    df.registerTempTable("jtable")
    val result = sqlContext.sql("select * from jtable")
    result.show()
    sc.stop()

  • Creating a DataFrame from an RDD of JSON strings (a minimal sketch follows)
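
    A minimal sketch (the records are illustrative; it assumes sc and sqlContext are set up as in the previous example): each element of the RDD is one JSON string, and read.json infers the schema from the data.

    val jsonRDD = sc.parallelize(Array(
      """{"name":"zhangsan","age":20}""",
      """{"name":"lisi","age":19}"""
    ))
    val df = sqlContext.read.json(jsonRDD)   // schema is inferred from the JSON strings
    df.show()
    df.registerTempTable("jsonTable")
    sqlContext.sql("select name from jsonTable where age > 19").show()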

  • Creating a DataFrame via reflection

    SparkConf conf = new SparkConf();
    conf.setMaster("local").setAppName("RDD");
    JavaSparkContext sc = new JavaSparkContext(conf);
    SQLContext sqlContext = new SQLContext(sc);
    JavaRDD<String> lineRDD = sc.textFile("sparksql/person.txt");
    JavaRDD<Person> personRDD = lineRDD.map(new Function<String, Person>() {
        private static final long serialVersionUID = 1L;
        @Override
        public Person call(String s) throws Exception {
            Person p = new Person();
            p.setId(s.split(",")[0]);
            p.setName(s.split(",")[1]);
            p.setAge(Integer.valueOf(s.split(",")[2]));
            return p;
        }
    });
    // Infer the schema from the Person JavaBean via reflection
    DataFrame df = sqlContext.createDataFrame(personRDD, Person.class);
    df.show();
    sc.stop();

  • Creating a DataFrame dynamically with an explicit schema

    • java

      SparkConf conf = new SparkConf();
      conf.setMaster("local").setAppName("rddStruct");
      JavaSparkContext sc = new JavaSparkContext(conf);
      SQLContext sqlContext = new SQLContext(sc);
      JavaRDD<String> lineRDD = sc.textFile("./sparksql/person.txt");
      /**
       * Convert to an RDD of Row
       */
      JavaRDD<Row> rowRDD = lineRDD.map(new Function<String, Row>() {
          private static final long serialVersionUID = 1L;
          @Override
          public Row call(String s) throws Exception {
              return RowFactory.create(
                  String.valueOf(s.split(",")[0]),
                  String.valueOf(s.split(",")[1]),
                  Integer.valueOf(s.split(",")[2])
              );
          }
      });
      /**
       * Build the DataFrame metadata dynamically; the fields here can come from a
       * string or from an external database.
       */
      List<StructField> asList = Arrays.asList(
          DataTypes.createStructField("id", DataTypes.StringType, true),
          DataTypes.createStructField("name", DataTypes.StringType, true),
          DataTypes.createStructField("age", DataTypes.IntegerType, true)
      );
      StructType schema = DataTypes.createStructType(asList);
      DataFrame df = sqlContext.createDataFrame(rowRDD, schema);
      df.show();
      sc.stop();

    • scala

      val conf = new SparkConf()
      conf.setMaster("local").setAppName("rddStruct")
      val sc = new SparkContext(conf)
      val sqlContext = new SQLContext(sc)
      val lineRDD = sc.textFile("./sparksql/person.txt")
      val rowRDD = lineRDD.map { x =>
        val split = x.split(",")
        RowFactory.create(split(0), split(1), Integer.valueOf(split(2)))
      }
      val schema = StructType(List(
        StructField("id", StringType, true),
        StructField("name", StringType, true),
        StructField("age", IntegerType, true)
      ))
      val df = sqlContext.createDataFrame(rowRDD, schema)
      df.show()
      df.printSchema()
      sc.stop()

  • Creating a DataFrame from a JDBC source (MySQL as an example)

    • scala
      val conf = new SparkConf()
      conf.setMaster("local").setAppName("mysql")
      val sc = new SparkContext(conf)
      val sqlContext = new SQLContext(sc)
      val reader = sqlContext.read.format("jdbc")
      reader.option("url", "jdbc:mysql://192.168.179.4:3306/spark")
      reader.option("driver", "com.mysql.jdbc.Driver")
      reader.option("user", "root")
      reader.option("password", "123456")
      reader.option("dbtable", "score")
      val score = reader.load()
      score.show()
      score.registerTempTable("score")
      // The "person" temp table is assumed to have been registered the same way from another JDBC table
      val result = sqlContext.sql("select person.id,person.name,score.score from person,score where person.name = score.name")
      result.show()
      sc.stop()
  • Creating a DataFrame from Parquet files

    • scala
      val conf = new SparkConf()
      conf.setMaster("local").setAppName("parquet")
      val sc = new SparkContext(conf)
      val sqlContext = new SQLContext(sc)
      val jsonRDD = sc.textFile("sparksql/json")
      val df = sqlContext.read.json(jsonRDD)
      df.show()
      /**
       * Save the DataFrame as Parquet files (two equivalent ways)
       */
      df.write.mode(SaveMode.Overwrite).format("parquet").save("./sparksql/parquet")
      df.write.mode(SaveMode.Overwrite).parquet("./sparksql/parquet")
      /**
       * Read the Parquet files back (two equivalent ways)
       */
      var result = sqlContext.read.parquet("./sparksql/parquet")
      result = sqlContext.read.format("parquet").load("./sparksql/parquet")
      result.show()
      sc.stop()
  • Loading data from Hive into a DataFrame

    • scala
      val conf = new SparkConf()
      conf.setAppName("HiveSource")
      val sc = new SparkContext(conf)
      /**
       * HiveContext is a subclass of SQLContext.
       */
      val hiveContext = new HiveContext(sc)
      hiveContext.sql("use spark")
      hiveContext.sql("drop table if exists student_infos")
      hiveContext.sql("create table if not exists student_infos (name string,age int) row format delimited fields terminated by '\t'")
      hiveContext.sql("load data local inpath '/root/test/student_infos' into table student_infos")

      hiveContext.sql("drop table if exists student_scores")
      hiveContext.sql("create table if not exists student_scores (name string,score int) row format delimited fields terminated by '\t'")
      hiveContext.sql("load data local inpath '/root/test/student_scores' into table student_scores")

      val df = hiveContext.sql("select si.name,si.age,ss.score from student_infos si,student_scores ss where si.name = ss.name")
      hiveContext.sql("drop table if exists good_student_infos")
      /**
       * Write the result into a Hive table
       */
      df.write.mode(SaveMode.Overwrite).saveAsTable("good_student_infos")

      sc.stop()
  • Saving a DataFrame with the existing APIs

    • jdbc

      /**
       * Write the data into a MySQL table
       */
      val properties = new Properties()
      properties.setProperty("user", "root")
      properties.setProperty("password", "1234")
      result.write.mode(SaveMode.Append).jdbc("jdbc:mysql://192.168.100.111:3306/spark", "result", properties)

    • parquet

      /**
       * Save the DataFrame as Parquet files
       */
      df.write.mode(SaveMode.Overwrite).format("parquet").save("data/parquet")
      df.write.mode(SaveMode.Overwrite).parquet("data/parquet")

    • hive

      /**
       * Write the result into a Hive table
       */
      df.write.mode(SaveMode.Overwrite).saveAsTable("good_student_infos")

Testing the cases in Java where objects fail to (de)serialize:

  1. Deserialization fails when the serialVersionUID of the class does not match the one recorded in the serialized data.

  2. If a subclass implements Serializable but its parent class does not, the parent's fields are not serialized and come back as null after deserialization. Note: if the parent class implements Serializable and the subclass does not, the subclass can still be serialized normally.

  3. Fields marked with the transient keyword are not serialized (see the sketch after this list).

  4. Static fields are not serialized: they belong to the class rather than to any instance, so they are not part of an object's serialized state.
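
A minimal Scala sketch of point 3 (class and field names are illustrative): Scala's @transient maps to Java's transient keyword, so the marked field is skipped during serialization and comes back as null after the round trip.

    import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

    class User(val name: String, @transient val password: String) extends Serializable

    object SerializationDemo {
      def main(args: Array[String]): Unit = {
        val bytes = new ByteArrayOutputStream()
        val out = new ObjectOutputStream(bytes)
        out.writeObject(new User("zhangsan", "secret"))
        out.close()

        val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
        val restored = in.readObject().asInstanceOf[User]
        println(restored.name)     // zhangsan
        println(restored.password) // null -- the transient field was not serialized
      }
    }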

spark on hive

  • Create the file hive-site.xml under spark-1.6.0/conf in the Spark client installation:

  • Configure the Hive metastore URI:

    <configuration>
      <property>
        <name>hive.metastore.uris</name>
        <value>thrift://node1:9083</value>
      </property>
    </configuration>

  • Start the Hive metastore service: hive --service metastore

  • Start the ZooKeeper cluster and the HDFS cluster (a usage sketch follows these steps).
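
A minimal sketch of querying Hive once the configuration above is in place (it assumes the metastore service is running and that the "spark" database from the earlier examples exists):

    val conf = new SparkConf().setAppName("sparkOnHive")
    val sc = new SparkContext(conf)
    // HiveContext talks to the metastore configured in hive-site.xml
    val hiveContext = new HiveContext(sc)
    hiveContext.sql("show databases").show()
    hiveContext.sql("use spark")
    hiveContext.sql("show tables").show()
    sc.stop()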

User-defined functions

  • UDF

    val conf = new SparkConf()
    conf.setMaster("local").setAppName("udf")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc);
    val rdd = sc.makeRDD(Array("zhansan", "lisi", "wangwu"))
    val rowRDD = rdd.map { x =>
      RowFactory.create(x)
    }
    val schema = DataTypes.createStructType(Array(StructField("name", StringType, true)))
    val df = sqlContext.createDataFrame(rowRDD, schema)
    df.registerTempTable("user")
    // sqlContext.udf.register("StrLen",(s : String)=>{s.length()})
    // sqlContext.sql("select name ,StrLen(name) as length from user").show
    sqlContext.udf.register("StrLen", (s: String, i: Int) => {
    s.length() + i
    })
    sqlContext.sql("select name ,StrLen(name,10) as length from user").show
    sc.stop()

  • UDAF

    class MyUDAF extends UserDefinedAggregateFunction {
      // Type of the data held in the aggregation buffer
      def bufferSchema: StructType = {
        DataTypes.createStructType(Array(DataTypes.createStructField("aaa", IntegerType, true)))
      }

      // Return type of the function
      def dataType: DataType = {
        DataTypes.IntegerType
      }

      // Whether the function always returns the same result for the same input
      def deterministic: Boolean = {
        true
      }

      // Returns the final aggregated value; its type must match dataType
      def evaluate(buffer: Row): Any = {
        buffer.getAs[Int](0)
      }

      // Initializes the buffer for each group
      def initialize(buffer: MutableAggregationBuffer): Unit = {
        buffer(0) = 0
      }

      // Type of the input data
      def inputSchema: StructType = {
        DataTypes.createStructType(Array(DataTypes.createStructField("input", StringType, true)))
      }

      // Merges the partial aggregates computed on different nodes
      def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
        buffer1(0) = buffer1.getAs[Int](0) + buffer2.getAs[Int](0)
      }

      // Updates the group's aggregate each time a new row comes in
      def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
        buffer(0) = buffer.getAs[Int](0) + 1
      }
    }

    object UDAF {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
        conf.setMaster("local").setAppName("udaf")
        val sc = new SparkContext(conf)
        val sqlContext = new SQLContext(sc)
        val rdd = sc.makeRDD(Array("zhangsan", "lisi", "wangwu", "zhangsan", "lisi"))
        val rowRDD = rdd.map { x =>
          RowFactory.create(x)
        }

        val schema = DataTypes.createStructType(Array(DataTypes.createStructField("name", StringType, true)))
        val df = sqlContext.createDataFrame(rowRDD, schema)
        df.show()
        df.registerTempTable("user")

        /**
         * Register the UDAF
         */
        sqlContext.udf.register("StringCount", new MyUDAF())
        sqlContext.sql("select name ,StringCount(name) from user group by name").show()
        sc.stop()
      }
    }

  • Window functions

    val conf = new SparkConf()
    conf.setAppName("windowfun")
    val sc = new SparkContext(conf)
    val hiveContext = new HiveContext(sc)
    hiveContext.sql("use spark")
    hiveContext.sql("drop table if exists sales")
    hiveContext.sql("create table if not exists sales (riqi string,leibie string,jine Int) "
      + "row format delimited fields terminated by '\t'")
    hiveContext.sql("load data local inpath '/root/test/sales' into table sales")
    /**
     * Window function syntax:
     * row_number() over (partition by XXX order by XXX)
     */
    val result = hiveContext.sql("select riqi,leibie,jine "
      + "from ("
      + "select riqi,leibie,jine,"
      + "row_number() over (partition by leibie order by jine desc) rank "
      + "from sales) t "
      + "where t.rank<=3")
    result.show()
    sc.stop()

Optimization

  • Read only the data columns you actually need
  • Use an optimal data storage format (a columnar format such as Parquet)
  • Watch memory usage (see the sketch below)
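
A minimal sketch tying the three points above together (the path is illustrative; it assumes a SQLContext as in the examples above):

    // Columnar source: only the requested columns are read from disk
    val df = sqlContext.read.parquet("./sparksql/parquet")
    val slim = df.select("name", "age")  // column pruning: keep only the needed columns
    slim.cache()                         // keep reused data in memory
    slim.filter(slim.col("age").gt(19)).show()
    slim.unpersist()                     // release the memory when done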