您的足迹:首页 > Hbase >Spark读取Hbase中的数据

Spark读取Hbase中的数据

 大家可能都知道很熟悉Spark的两种常见的数据读取方式(存放到RDD中):(1)、调用parallelize函数直接从集合中获取数据,并存入RDD中;Java版本如下:

JavaRDD<Integer> myRDD = sc.parallelize(Arrays.asList(1,2,3));

Scala版本如下:

val myRDD= sc.parallelize(List(1,2,3))

这种方式很简单,很容易就可以将一个集合中的数据变成RDD的初始化值;更常见的是(2)、从文本中读取数据到RDD中,这个文本可以是纯文本文件、可以是sequence文件;可以存放在本地(file://)、可以存放在HDFS(hdfs://)上,还可以存放在S3上。其实对文件来说,Spark支持Hadoop所支持的所有文件类型和文件存放位置。Java版如下:

/////////////////////////////////////////////////////////////////////
 User: 过往记忆
 Date: 14-6-29
 Time: 23:59
 bolg: http://www.iteblog.com
 本文地址:http://www.iteblog.com/archives/1051
 过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货
 过往记忆博客微信公共帐号:iteblog_hadoop
/////////////////////////////////////////////////////////////////////
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

SparkConf conf = new SparkConf().setAppName("Simple Application");
JavaSparkContext sc = new JavaSparkContext(conf);
sc.addFile("wyp.data");
JavaRDD<String> lines = sc.textFile(SparkFiles.get("wyp.data"));

Scala版本如下:

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

val conf = new SparkConf().setAppName("Simple Application")
val sc = new SparkContext(conf)
sc.addFile("spam.data")
val inFile = sc.textFile(SparkFiles.get("spam.data"))

在实际情况下,我们需要的数据可能不是简单的存放在HDFS文本中,我们需要的数据可能就存放在Hbase中,那么我们如何用Spark来读取Hbase中的数据呢?本文的所有测试是基于Hadoop 2.2.0、Hbase 0.98.2、Spark 0.9.1,不同版本可能代码的编写有点不同。本文只是简单地用Spark来读取Hbase中的数据,如果需要对Hbase进行更强的操作,本文可能不能帮你。话不多说,Spark操作Hbase的核心的Java版本代码如下:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.util.Base64;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

/////////////////////////////////////////////////////////////////////
 User: 过往记忆
 Date: 14-6-29
 Time: 23:59
 bolg: http://www.iteblog.com
 本文地址:http://www.iteblog.com/archives/1051
 过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货
 过往记忆博客微信公共帐号:iteblog_hadoop
/////////////////////////////////////////////////////////////////////

JavaSparkContext sc = new JavaSparkContext(master, "hbaseTest",
                System.getenv("SPARK_HOME"), System.getenv("JARS"));

Configuration conf = HBaseConfiguration.create();
Scan scan = new Scan();
scan.addFamily(Bytes.toBytes("cf"));
scan.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("airName"));

try {
        String tableName = "flight_wap_order_log";
        conf.set(TableInputFormat.INPUT_TABLE, tableName);
        ClientProtos.Scan proto = ProtobufUtil.toScan(scan);
        String ScanToString = Base64.encodeBytes(proto.toByteArray());
        conf.set(TableInputFormat.SCAN, ScanToString);

        JavaPairRDD<ImmutableBytesWritable, Result> myRDD = 
                sc.newAPIHadoopRDD(conf,  TableInputFormat.class, 
                ImmutableBytesWritable.class, Result.class);

catch (Exception e) {
            e.printStackTrace();
}

这样本段代码段是从Hbase表名为flight_wap_order_log的数据库中读取cf列簇上的airName一列的数据,这样我们就可以对myRDD进行相应的操作:

System.out.println(myRDD.count());

本段代码需要在pom.xml文件加入以下依赖:

<dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>0.9.1</version>
</dependency>

<dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase</artifactId>
        <version>0.98.2-hadoop2</version>
</dependency>

<dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>0.98.2-hadoop2</version>
</dependency>

<dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-common</artifactId>
        <version>0.98.2-hadoop2</version>
</dependency>

<dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-server</artifactId>
        <version>0.98.2-hadoop2</version>
</dependency>

Scala版如下:

import org.apache.spark._
import org.apache.spark.rdd.NewHadoopRDD
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
import org.apache.hadoop.hbase.client.HBaseAdmin
import org.apache.hadoop.hbase.mapreduce.TableInputFormat

/////////////////////////////////////////////////////////////////////
 User: 过往记忆
 Date: 14-6-29
 Time: 23:59
 bolg: http://www.iteblog.com
 本文地址:http://www.iteblog.com/archives/1051
 过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货
 过往记忆博客微信公共帐号:iteblog_hadoop
/////////////////////////////////////////////////////////////////////

object HBaseTest {
  def main(args: Array[String]) {
    val sc = new SparkContext(args(0), "HBaseTest",
      System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))

    val conf = HBaseConfiguration.create()
    conf.set(TableInputFormat.INPUT_TABLE, args(1))

    val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], 
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result])

    hBaseRDD.count()

    System.exit(0)
  }
}

我们需要在加入如下依赖:

libraryDependencies ++= Seq(
        "org.apache.spark" % "spark-core_2.10" % "0.9.1",
        "org.apache.hbase" % "hbase" % "0.98.2-hadoop2",
        "org.apache.hbase" % "hbase-client" % "0.98.2-hadoop2",
        "org.apache.hbase" % "hbase-common" % "0.98.2-hadoop2",
        "org.apache.hbase" % "hbase-server" % "0.98.2-hadoop2"
)

在测试的时候,需要配置好Hbase、Hadoop环境,否则程序会出现问题,特别是让程序找到Hbase-site.xml配置文件。

完整代码:

package com.iteblog.spark;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.util.Base64;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Serializable;
import scala.Tuple2;

import java.io.IOException;
import java.util.List;

/**
 * User: iteblog
 * Date: 14-6-27
 * Time: 下午5:18
 *blog: http://www.iteblog.com
 *
 * Usage: bin/spark-submit --master yarn-cluster --class com.iteblog.spark.SparkFromHbase
 * --jars /home/q/hbase/hbase-0.96.0-hadoop2/lib/htrace-core-2.01.jar,
 * /home/q/hbase/hbase-0.96.0-hadoop2/lib/hbase-common-0.96.0-hadoop2.jar,
 * /home/q/hbase/hbase-0.96.0-hadoop2/lib/hbase-client-0.96.0-hadoop2.jar,
 * /home/q/hbase/hbase-0.96.0-hadoop2/lib/hbase-protocol-0.96.0-hadoop2.jar,
 * /home/q/hbase/hbase-0.96.0-hadoop2/lib/hbase-server-0.96.0-hadoop2.jar
 * ./spark_2.10-1.0.jar
 */
public class SparkFromHbase implements Serializable {

    /**
     * copy from org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil
     *
     * @param scan
     * @return
     * @throws IOException
     */
    String convertScanToString(Scan scan) throws IOException {
        ClientProtos.Scan proto = ProtobufUtil.toScan(scan);
        return Base64.encodeBytes(proto.toByteArray());
    }

    public void start() {
        SparkConf sparkConf = new SparkConf();
        JavaSparkContext sc = new JavaSparkContext(sparkConf);


        Configuration conf = HBaseConfiguration.create();

        Scan scan = new Scan();
        //scan.setStartRow(Bytes.toBytes("195861-1035177490"));
        //scan.setStopRow(Bytes.toBytes("195861-1072173147"));
        scan.addFamily(Bytes.toBytes("cf"));
        scan.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("col_1"));

        try {

            String tableName = "wyp";
            conf.set(TableInputFormat.INPUT_TABLE, tableName);
            conf.set(TableInputFormat.SCAN, convertScanToString(scan));


            JavaPairRDD<ImmutableBytesWritable, Result> hBaseRDD = sc.newAPIHadoopRDD(conf,
                    TableInputFormat.class, ImmutableBytesWritable.class,
                    Result.class);

            JavaPairRDD<String, Integer> levels = hBaseRDD.mapToPair(
                    new PairFunction<Tuple2<ImmutableBytesWritable, Result>, String, Integer>() {
                        @Override
                        public Tuple2<String, Integer> call(Tuple2<ImmutableBytesWritable, Result> immutableBytesWritableResultTuple2) throws Exception {
                            byte[] o = immutableBytesWritableResultTuple2._2().getValue(Bytes.toBytes("cf"), Bytes.toBytes("col_1"));
                            if (o != null) {
                                return new Tuple2<String, Integer>(new String(o), 1);
                            }
                            return null;
                        }
                    });

            JavaPairRDD<String, Integer> counts = levels.reduceByKey(
                    new Function2<Integer, Integer, Integer>() {
                        @Override
                        public Integer call(Integer i1, Integer i2) {
                            return i1 + i2;
                        }
                    });

            List<Tuple2<String, Integer>> output = counts.collect();
            for (Tuple2 tuple : output) {
                System.out.println(tuple._1() + ": " + tuple._2());
            }

            sc.stop();

        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        new SparkFromHbase().start();
        System.exit(0);
    }
}
本博客所有文章如无特别注明均为原创。作者:数据为王复制或转载请以超链接形式注明转自 数据为王
原文地址《Spark读取Hbase中的数据

相关推荐


  • blogger

发表评论

路人甲 表情
看不清楚?点图切换 Ctrl+Enter快速提交

网友评论(0)