Self-Hosted Spark + Tencent Cloud COS Data Source

Step 1: build the Hadoop plugin jar (hadoop-cos-2.7.2) provided by Tencent Cloud.

```
git clone https://github.com/tencentyun/cos-java-sdk-hadoop-v4
cd cos-java-sdk-hadoop-v4
```

Run the following command to build, then grab cos_api-4.2.jar from the target directory:

```
mvn clean package -Dmaven.test.skip=true
```
```
git clone https://github.com/tencentyun/hadoop-cosn-v4
cd hadoop-cosn-v4
```

Because cosn depends on the SDK, copy the cos_api-4.2.jar built in the previous step into src/main/resources, then run the following command to build and grab hadoop-cos-2.7.2.jar from the target directory:

```
mvn clean package -Dmaven.test.skip=true
```
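
To sanity-check that both jars actually ended up on your classpath, you can resolve their main classes by name. A minimal sketch: the filesystem class name comes from the fs.cosn.impl setting used later in this post, while com.qcloud.cos.COSClient is my assumption for the v4 SDK's entry class, so adjust it if your build differs.

```
object CosClasspathCheck {
  def main(args: Array[String]): Unit = {
    // Throws ClassNotFoundException if hadoop-cos-2.7.2.jar is missing.
    Class.forName("org.apache.hadoop.fs.cosnative.NativeCosFileSystem")
    // Assumed entry class of cos_api-4.2.jar; adjust if your SDK build differs.
    Class.forName("com.qcloud.cos.COSClient")
    println("Both COS jars are on the classpath.")
  }
}
```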

Now set up your own Spark project; I'm using Spark 2.2.0 here.
Either add the hadoop-cos-2.7.2.jar you just built to the project manually, or publish it to your own private Maven repository (e.g. with mvn install:install-file or mvn deploy:deploy-file) and pull it in as a dependency. I use my own private repository here:


```
<dependency>
    <groupId>com.qcloud.hadoop-cos</groupId>
    <artifactId>hadoop-cos</artifactId>
    <version>2.7.2</version>
</dependency>

<dependency>
    <groupId>com.qcloud</groupId>
    <artifactId>cos_api</artifactId>
    <version>4.2</version>
</dependency>
```
Write a Spark test class, CosTest:
```
import org.apache.commons.cli.CommandLine
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// Logging, AppUtils, and options are in-house helpers (not shown) that handle
// logging, command-line parsing, and the SparkSession lifecycle.
object CosTest extends Logging {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("CosTest")
    conf.setMaster("local[1]")
      .set("spark.executor.memory", "8g")
      .set("spark.local.dir", "spark_dir")
      .set("spark.broadcast.compress", "true")
      .set("spark.rdd.compress", "true")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      // The spark.hadoop.* prefix passes these settings through to the Hadoop
      // Configuration, which is where the cosn:// filesystem looks them up.
      .set("spark.hadoop.fs.cos.userinfo.appid", "1252448703")
      .set("spark.hadoop.fs.cos.userinfo.secretId", "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx")
      .set("spark.hadoop.fs.cos.userinfo.secretKey", "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx")
      .set("spark.hadoop.fs.cosn.impl", "org.apache.hadoop.fs.cosnative.NativeCosFileSystem")
      .set("spark.hadoop.fs.cos.userinfo.region", "tj")
    AppUtils.execute((cmd: CommandLine) => {
      (true, "CosTest test")
    })(() => conf)(f = (cmd: CommandLine, spark: SparkSession) => {
      val rdd = spark.sparkContext.textFile("cosn://bucketName/***.csv")
      val count = rdd.count()
      rdd.foreach(println)
      println(s"count: $count")
    })(args, CosTest.getClass.getSimpleName, options)
  }
}
```
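
If you don't have an AppUtils-style wrapper, the same fs.cos.* settings work on a plain SparkSession as well. A minimal standalone sketch, with placeholder credentials and a hypothetical bucket/path:

```
import org.apache.spark.sql.SparkSession

object CosTestMinimal {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("CosTestMinimal")
      .master("local[1]")
      // spark.hadoop.* settings are copied into the Hadoop Configuration,
      // where the cosn:// filesystem picks up its implementation and credentials.
      .config("spark.hadoop.fs.cosn.impl", "org.apache.hadoop.fs.cosnative.NativeCosFileSystem")
      .config("spark.hadoop.fs.cos.userinfo.appid", "<your-appid>")
      .config("spark.hadoop.fs.cos.userinfo.secretId", "<your-secretId>")
      .config("spark.hadoop.fs.cos.userinfo.secretKey", "<your-secretKey>")
      .config("spark.hadoop.fs.cos.userinfo.region", "tj")
      .getOrCreate()

    // bucketName and the path are placeholders; point this at a real object.
    val rdd = spark.sparkContext.textFile("cosn://bucketName/path/to/file.csv")
    println(s"count: ${rdd.count()}")
    spark.stop()
  }
}
```

Because the settings ride in through the spark.hadoop.* prefix, the same code works unchanged when you later switch the master from local[1] to a real cluster.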
The rest is yours to explore. Have fun!