最新消息:天气越来越冷,记得加一件厚衣裳

Spark访问Aliyun OSS

Spark w3sun 3613浏览 0评论

OSS简介

阿里云对象存储OSS(Object Storage Service)是阿里云提供的海量、安全、低成本、高可靠的云存储服务。其数据设计持久性不低于99.9999999999%(12个9),服务可用性(或业务连续性)不低于99.995%。

OSS具有与平台无关的RESTful API接口,您可以在任何应用、任何时间、任何地点存储和访问任意类型的数据。用户可以使用阿里云提供的API、SDK接口或者OSS迁移工具轻松地将海量数据移入或移出阿里云OSS。数据存储到阿里云OSS以后,用户可以选择标准存储(Standard)作为移动应用、大型网站、图片分享或热点音视频的主要存储方式,也可以选择成本更低、存储期限更长的低频访问存储(Infrequent Access)、归档存储(Archive)作为不经常访问数据的存储方式。

详细介绍、试用与定价请移步:https://help.aliyun.com/product/31815.html

下载文件

组里有一个和厂商的合作项目,需要我们从对方OSS上读取一批数据,当前数据量较小可以通过client进行读取(参考《下载文件》章节),但是后续数据量可能增加,清洗逻辑也可能更复杂,最终选择使用Spark进行数据读取和处理。从阿里云OSS读取数据有两种方式,一种通过OSS SDK读取,另一种是通过JindoFS SDK读取。下面将通过具体代码展示两种数据读取方式的异同。

Spark+OSS SDK

<dependencies>
  <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_${scala.binary.version}</artifactId>
      <version>${spark.version}</version>
  </dependency>
  <dependency>
      <groupId>com.aliyun.emr</groupId>
      <artifactId>emr-core</artifactId>
      <version>1.5.0</version>
  </dependency>
  <dependency>
      <groupId>com.aliyun.oss</groupId>
      <artifactId>aliyun-sdk-oss</artifactId>
      <version>3.10.2</version>
  </dependency>
</dependencies>

需要添加emr-core和aliyun-sdk-oss依赖,否则在读取OSS数据时会提示无法找到chema为oss的路径:

Exception in thread "main" java.io.IOException: No FileSystem for scheme: oss
    at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2421)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2428)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:88)
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2467)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2449)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:367)
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:287)

直接上Demo代码:

package com.w3sun.oss

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
  * @author: w3sun
  * @date: 2020/7/8 20:45
  * @description:
  *
  */
object AliOSSApp {
  def main(args: Array[String]): Unit = {
    //填写OSS配置信息
    val endpoint = "oss-cn-beijing.aliyuncs.com"
    //填写真实id
    val accessKeyId = "accessKeyId"
    //填写真实secret
    val accessKeySecret = "accessKeySecret"
    //填写真实bucketName
    val bucketName = "bucketName"
    //填写真实objectName
    val objectName = "objectName"

    //初始化SparkConf
    val sparkConf: SparkConf = new SparkConf()
      .setAppName("AliOSSApplication")
      .setMaster("local[*]")

    //设置OSS与Spark集成参数
    sparkConf.set("spark.hadoop.fs.oss.impl", "com.aliyun.fs.oss.nat.NativeOssFileSystem")
    sparkConf.set("spark.hadoop.mapreduce.job.run-local", "true")
    sparkConf.set("spark.hadoop.fs.oss.accessKeyId", accessKeyId)
    sparkConf.set("spark.hadoop.fs.oss.accessKeySecret", accessKeySecret)

    //初始化SparkContext
    val sparkContext: SparkContext = new SparkContext(sparkConf)

    //拼接OSS上文件的存储路径
    val input: String = s"oss://${bucketName}.${endpoint}/${objectName}"

    //从OSS中读取的数据其实也是一行一行的String字符串
    val ossData: RDD[String] = sparkContext.textFile(input)
    ossData.foreachPartition(it => {
      it.foreach(println)
    })
    sparkContext.stop()
  }
}

运行结果:

20/07/08 23:08:02 INFO NativeOssFileSystem: Opening 'oss://bucketName.oss-cn-beijing.aliyuncs.com/objectName' for reading
this is a sentence for word count.
this is a sentence for word count.
this is a sentence for word count.
this is a sentence for word count.
20/07/08 23:08:03 INFO BufferReader: Closing input stream for 'objectName'.
20/07/08 23:08:03 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 1158 bytes result sent to driver
20/07/08 23:08:03 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 1462 ms on localhost (executor driver) (2/2)

  • Demo对隐私数据进行了脱敏,参考使用时对示例代码中的accessKeyId、accessKeySecret、bucketName、objectName进行真实值替换即可。
  • 本地调试运行Spark代码读写OSS数据时,需要配置SparkConf,设置spark.hadoop.mapreduce.job.run-local为true,阿里云上是通过E-MapReduce或者MaxCompute操作OSS,Spark在外网IDC机房中是否可以通过Cluster访问OSS数据由于时间原因未测试,通过测试的同学可以联系本人进行更新。

Spark+JindoFS SDK

除了通过Spark+aliyun-sdk-oss读取OSS数据外,阿里云还支持Spark+JindoFS SDK组合进行数据读取。什么是JindoFS呢?JindoFS是一种云原生的文件系统,结合OSS和本地存储,成为E-MapReduce产品的新一代存储系统,为上层计算提供了高效可靠的存储。

JindoFS 提供了块存储模式(Block)和缓存模式(Cache)的存储模式。

JindoFS 采用了本地存储和OSS的异构多备份机制,Storage Service提供了数据存储能力,首先使用OSS作为存储后端,保证数据的高可靠性,同时利用本地存储实现冗余备份,利用本地的备份可以加速数据读取;另外,JindoFS 的元数据通过本地服务Namespace Service管理,从而保证了元数据操作的性能(和HDFS元数据操作性能相似)。

JindoFS SDK是一个简单易用面向Hadoop/Spark生态的OSS客户端,为阿里云OSS提供高度优化的Hadoop FileSystem实现。通过它用户可以

  • 访问OSS(作为 OSS 客户端)
  • 访问JindoFS Cache模式集群
  • 访问JindoFS Block模式集群

即使使用JindoFS SDK(只支持Linux与Mac OS)仅仅作为OSS客户端,相对于Hadoop社区OSS客户端实现用户还可以获得更好的性能和阿里云E-MapReduce产品技术团队更专业的支持。目前支持的Hadoop版本包括Hadoop 2.7+和Hadoop 3.x,详情参见:《JindoFS SDK 使用》,其中给出了JindoFS SDK和Hadoop-OSS-SDK性能对比测试,感兴趣的读者可以参考官方文档。以下将给出Spark+JindoFS SDK读取OSS数据的具体代码,首先添加pom依赖:

<dependencies>
  <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_${scala.binary.version}</artifactId>
      <version>${spark.version}</version>
      <scope>${jar.scope}</scope>
  </dependency>
  <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <version>2.8.5</version>
  </dependency>
</dependencies>

JindoFS SDK依赖既可以通过pom方式引入,也可以单独新建一个lib文件夹引入 jindofs-sdk-x.x.x.jar ,本文采用后者。

通过JindoFS SDK请求数据时访问的URI与第一种方式略有不同,SparkConf参数也需要进行相应调整:

oss://{accessKeyId}:{accessKeySecret}@{bucketName}.{endpoint}/{objectName}
package com.w3sun.oss

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
  * @author: w3sun
  * @date: 2020/7/8 21:45
  * @description:
  *
  */
object AliOSSApp {
  def main(args: Array[String]): Unit = {
    val endpoint = "endpoint"
    val accessKeyId = "accessKeyId"
    val accessKeySecret = "accessKeySecret"
    val bucketName = "bucketName"
    val objectName = "objectName"

    //初始化SparkConf
    val sparkConf: SparkConf = new SparkConf()
      .setAppName("AliOSSApplication")
      .setMaster("local[*]")

    //设置OSS与Spark集成参数
    sparkConf.set("fs.AbstractFileSystem.oss.impl", "com.aliyun.emr.fs.oss.OSS")
    sparkConf.set("fs.oss.impl", "com.aliyun.emr.fs.oss.JindoOssFileSystem")
    
    //初始化SparkContext
    val sparkContext: SparkContext = new SparkContext(sparkConf)

    //拼接OSS上文件的存储路径, oss://<ak>:<secret>@<bucket>.<endpoint>/
    val input: String = s"oss://${accessKeyId}:${accessKeySecret}@${bucketName}.${endpoint}/${objectName}"
    //从OSS中读取的数据其实也是一行一行的String字符串
    val ossData: RDD[String] = sparkContext.textFile(input)
    val samples: Array[String] = ossData.take(2)
    samples.toBuffer.foreach(println)
    sparkContext.stop()
  }
}

运行结果:

20/07/08 23:18:03 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 1006 bytes result sent to driver
20/07/08 23:18:03 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 16592 ms on localhost (executor driver) (1/2)
this is a sentence for word count.
this is a sentence for word count.
20/07/08 23:18:03 INFO FsStats: cmd=download, src=oss://bucketName.endpoint/objectName, dst=null, size=3050859, parameter=byteReaded:2002283, time-in-ms=16144, version=2.7.1
20/07/08 23:18:03 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 1006 bytes result sent to driver
20/07/08 23:18:03 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 16659 ms on localhost (executor driver) (2/2)
20/07/08 23:18:03 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
20/07/08 23:18:03 INFO DAGScheduler: ResultStage 0 (foreachPartition at AliOSSApp.scala:42) finished in 16.866 s
20/07/08 23:18:03 INFO DAGScheduler: Job 0 finished: foreachPartition at AliOSSApp.scala:42, took 17.087912 s

比较遗憾的是IDC中的服务器如果不具备外网访问权限是无法读取Aliyun OSS上存储的数据的,在三台联网机器上运行可以读取相应内容并完成计算:

可能遇到的问题

1.Mac OS

dyld: lazy symbol binding failed: Symbol not found: ____chkstk_darwin
Referenced from: /private/var/folders/qh/676_k89s0bsg_cpj9g7042br0000gn/T/libjboot-ed376a31ece9bc6a_20200623_141838.dylib (which was built for Mac OS X 10.15)
Expected in: /usr/lib/libSystem.B.dylib
dyld: Symbol not found: ____chkstk_darwin
Referenced from: /private/var/folders/qh/676_k89s0bsg_cpj9g7042br0000gn/T/libjboot-ed376a31ece9bc6a_20200623_141838.dylib (which was built for Mac OS X 10.15)
Expected in: /usr/lib/libSystem.B.dylib

解决方法:系统升级到10.15+

2.Linux(CentOS)

invalid target release: 1.8.0_171

解决方法:Settings->Build,Execution,Deployment->Compiler->Java Compiler->Project bytecode version设置成JDK版本

参考资料

https://help.aliyun.com/document_detail/164694.html
https://help.aliyun.com/document_detail/28118.html
https://github.com/aliyun/aliyun-emapreduce-sdk

转载请注明:雪后西塘 » Spark访问Aliyun OSS

发表我的评论
取消评论

表情

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址