Introduction
As Spark keeps gaining traction in data processing, its range of applications keeps expanding, and the number of supported Sources and Sinks grows along with it. Our project needed to read data from Redis and process it, so we wanted an elegant way to work with Redis. After some searching, it turned out that besides writing a custom Source, we could use the Connector published by redislabs.com. Redis Labs is a privately held software company based in Mountain View, California. It provides a database management system, marketed as "NoSQL", available both as open-source software and as a cloud service. In short, it is the company commercializing Redis; with Spark as hot as it is, plenty of companies want to ride along.
Maven Dependencies
Without further ado, add the dependencies and get going. The full pom.xml is as follows:
```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.w3sun.data</groupId>
    <artifactId>spark-redis</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <spark.version>2.2.0</spark.version>
        <type.safe.version>1.3.0</type.safe.version>
        <joda.time.version>2.9.4</joda.time.version>
        <fast.json.version>1.2.10</fast.json.version>
        <spark.redis.version>0.3.2</spark.redis.version>
        <encoding>UTF-8</encoding>
    </properties>

    <repositories>
        <!-- list of RedisLabs repositories -->
        <repository>
            <id>SparkPackagesRepo</id>
            <url>http://dl.bintray.com/spark-packages/maven</url>
        </repository>
    </repositories>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- Typesafe Config for reading configuration -->
        <dependency>
            <groupId>com.typesafe</groupId>
            <artifactId>config</artifactId>
            <version>${type.safe.version}</version>
        </dependency>
        <!-- Joda-Time for timestamp generation -->
        <dependency>
            <groupId>joda-time</groupId>
            <artifactId>joda-time</artifactId>
            <version>${joda.time.version}</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>${fast.json.version}</version>
        </dependency>
        <!-- Spark-Redis connector for Redis RDD generation and transformation -->
        <dependency>
            <groupId>RedisLabs</groupId>
            <artifactId>spark-redis</artifactId>
            <version>${spark.redis.version}</version>
        </dependency>
    </dependencies>

    <build>
        <pluginManagement>
            <plugins>
                <plugin>
                    <groupId>net.alchim31.maven</groupId>
                    <artifactId>scala-maven-plugin</artifactId>
                    <version>3.2.2</version>
                </plugin>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.5.1</version>
                </plugin>
            </plugins>
        </pluginManagement>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <phase>process-resources</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>scala-test-compile</id>
                        <phase>process-test-resources</phase>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <executions>
                    <execution>
                        <phase>compile</phase>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
```
Coding
With the dependencies in place, we can run a small demo. For detailed documentation, see:
- https://github.com/RedisLabs/spark-redis
- https://redislabs.com/blog/getting-started-redis-apache-spark-python/
Write the Redis settings into a file named application.conf so that Typesafe Config can parse them later:
```
redis.host = 10.10.11.8
redis.db = 0
redis.port = 6399
redis.max.total = 500
redis.pool.maxidle = 500
redis.pool.testonborrow = true
redis.pool.testonreturn = true
redis.pool.maxwaitmillis = 1000
redis.protocol.timeout = 48000
```
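For reference, here is a minimal sketch of how Typesafe Config resolves these entries once application.conf is on the classpath (for example under src/main/resources); the demo below simply folds every entry into SparkConf instead of reading them one by one.

```scala
import com.typesafe.config.{Config, ConfigFactory}

// Loads application.conf (plus reference.conf, system properties, etc.) from the classpath
val config: Config = ConfigFactory.load()

// Individual keys can be read with typed getters
val host: String = config.getString("redis.host")
val port: Int = config.getInt("redis.port")
val db: Int = config.getInt("redis.db")
println(s"Redis endpoint: $host:$port, db $db")
```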
Java and Scala work the same way here; this article uses Scala for the test:
```scala
import java.util

import com.redislabs.provider.redis.rdd.RedisKeysRDD
import com.typesafe.config.{ConfigFactory, ConfigValue}
import org.apache.spark.{SparkConf, SparkContext}

/**
  * @author: w3sun
  * @date: 2018/08/29 11:51
  * @description:
  *
  */
object App {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
      .setMaster("local[*]")
      .setAppName(s"${this.getClass.getSimpleName}")
    // Copy the Redis connection settings onto SparkConf, then import the implicit
    // conversions that add the Redis operations to SparkContext
    initRedis(sparkConf)
    import com.redislabs.provider.redis._
    val sparkContext = new SparkContext(sparkConf)
    // Build an RDD of all keys matching the given pattern
    val keysRDD: RedisKeysRDD = sparkContext.fromRedisKeyPattern("1002010011540185*", 4)
    keysRDD.foreach(println)
    sparkContext.stop()
  }

  private def initRedis(sparkConf: SparkConf): Unit = {
    import scala.collection.JavaConverters._
    ConfigFactory.load()
      .entrySet().asScala
      .foreach { entry: util.Map.Entry[String, ConfigValue] =>
        sparkConf.set(entry.getKey, entry.getValue.unwrapped().toString)
      }
  }
}
```
The output looks like this:
```
........
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
18/08/29 21:55:48 INFO SparkContext: Running Spark version 2.2.0
18/08/29 21:55:48 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
18/08/29 21:55:49 INFO SparkContext: Submitted application: App$
18/08/29 21:55:49 INFO SecurityManager: Changing view acls to: w3sun
18/08/29 21:55:49 INFO SecurityManager: Changing modify acls to: w3sun
18/08/29 21:55:49 INFO SecurityManager: Changing view acls groups to:
18/08/29 21:55:49 INFO SecurityManager: Changing modify acls groups to:
18/08/29 21:55:49 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(w3sun); groups with view permissions: Set(); users with modify permissions: Set(w3sun); groups with modify permissions: Set()
18/08/29 21:55:49 INFO Utils: Successfully started service 'sparkDriver' on port 55679.
18/08/29 21:55:49 INFO SparkEnv: Registering MapOutputTracker
18/08/29 21:55:49 INFO SparkEnv: Registering BlockManagerMaster
18/08/29 21:55:49 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
18/08/29 21:55:49 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
18/08/29 21:55:49 INFO DiskBlockManager: Created local directory at /private/var/folders/qh/676_k89s0bsg_cpj9g7042br0000gn/T/blockmgr-9ee6c6d8-0431-4fdb-bdba-91fd49f26600
18/08/29 21:55:49 INFO MemoryStore: MemoryStore started with capacity 912.3 MB
18/08/29 21:55:49 INFO SparkEnv: Registering OutputCommitCoordinator
18/08/29 21:55:50 INFO Utils: Successfully started service 'SparkUI' on port 4040.
18/08/29 21:55:50 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://10.10.10.35:4040
18/08/29 21:55:50 INFO Executor: Starting executor ID driver on host localhost
18/08/29 21:55:50 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 55680.
18/08/29 21:55:50 INFO NettyBlockTransferService: Server created on 10.10.10.35:55680
18/08/29 21:55:50 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
18/08/29 21:55:50 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 10.10.10.35, 55680, None)
18/08/29 21:55:50 INFO BlockManagerMasterEndpoint: Registering block manager 10.10.10.35:55680 with 912.3 MB RAM, BlockManagerId(driver, 10.60.110.35, 55680, None)
18/08/29 21:55:50 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 10.10.10.35, 55680, None)
18/08/29 21:55:50 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 10.10.10.35, 55680, None)
18/08/29 21:55:51 INFO SparkContext: Starting job: foreach at App.scala:26
18/08/29 21:55:51 INFO DAGScheduler: Got job 0 (foreach at App.scala:26) with 3 output partitions
18/08/29 21:55:51 INFO DAGScheduler: Final stage: ResultStage 0 (foreach at App.scala:26)
18/08/29 21:55:51 INFO DAGScheduler: Parents of final stage: List()
18/08/29 21:55:51 INFO DAGScheduler: Missing parents: List()
18/08/29 21:55:51 INFO DAGScheduler: Submitting ResultStage 0 (RedisKeysRDD[0] at RDD at RedisRDD.scala:189), which has no missing parents
18/08/29 21:55:51 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 1864.0 B, free 912.3 MB)
18/08/29 21:55:51 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 1304.0 B, free 912.3 MB)
18/08/29 21:55:51 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 10.10.10.35:55680 (size: 1304.0 B, free: 912.3 MB)
18/08/29 21:55:51 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1006
18/08/29 21:55:51 INFO DAGScheduler: Submitting 3 missing tasks from ResultStage 0 (RedisKeysRDD[0] at RDD at RedisRDD.scala:189) (first 15 tasks are for partitions Vector(0, 1, 2))
18/08/29 21:55:51 INFO TaskSchedulerImpl: Adding task set 0.0 with 3 tasks
18/08/29 21:55:51 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, executor driver, partition 0, ANY, 5434 bytes)
18/08/29 21:55:51 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, localhost, executor driver, partition 1, ANY, 5434 bytes)
18/08/29 21:55:51 INFO TaskSetManager: Starting task 2.0 in stage 0.0 (TID 2, localhost, executor driver, partition 2, ANY, 5434 bytes)
18/08/29 21:55:51 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
18/08/29 21:55:51 INFO Executor: Running task 2.0 in stage 0.0 (TID 2)
18/08/29 21:55:51 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
1002010011540185822646
1002010011540185061520
1002010011540185971654
1002010011540185244056
18/08/29 21:55:56 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 751 bytes result sent to driver
18/08/29 21:55:56 INFO Executor: Finished task 2.0 in stage 0.0 (TID 2). 751 bytes result sent to driver
18/08/29 21:55:56 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 4721 ms on localhost (executor driver) (1/3)
18/08/29 21:55:56 INFO TaskSetManager: Finished task 2.0 in stage 0.0 (TID 2) in 4721 ms on localhost (executor driver) (2/3)
18/08/29 21:55:57 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 708 bytes result sent to driver
1002010011540185977218
18/08/29 21:55:57 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 6062 ms on localhost (executor driver) (3/3)
18/08/29 21:55:57 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
18/08/29 21:55:57 INFO DAGScheduler: ResultStage 0 (foreach at App.scala:26) finished in 6.079 s
18/08/29 21:55:57 INFO DAGScheduler: Job 0 finished: foreach at App.scala:26, took 6.331050 s
18/08/29 21:55:57 INFO SparkUI: Stopped Spark web UI at http://10.10.10.35:4040
18/08/29 21:55:57 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
18/08/29 21:55:57 INFO MemoryStore: MemoryStore cleared
18/08/29 21:55:57 INFO BlockManager: BlockManager stopped
18/08/29 21:55:57 INFO BlockManagerMaster: BlockManagerMaster stopped
18/08/29 21:55:57 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
18/08/29 21:55:57 INFO SparkContext: Successfully stopped SparkContext
18/08/29 21:55:57 INFO ShutdownHookManager: Shutdown hook called
18/08/29 21:55:57 INFO ShutdownHookManager: Deleting directory /private/var/folders/qh/676_k89s0bsg_cpj9g7042br0000gn/T/spark-1c43f998-49d6-45aa-9f8f-954ceed06b6e

Process finished with exit code 0
```
From the output we can see that the pattern matched five keys, each of which is printed.
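If you also need the values behind those keys, spark-redis can build a key-value RDD directly. Below is a minimal sketch under the assumption that the matched keys hold plain string values (hashes, lists, sets and sorted sets have their own fromRedisHash/fromRedisList/fromRedisSet/fromRedisZSet readers); it reuses the sparkContext and the redis.* settings from the demo above.

```scala
import com.redislabs.provider.redis._

// fromRedisKV scans the keys matching the pattern and returns an
// RDD[(String, String)] of key -> string value pairs
val kvRDD = sparkContext.fromRedisKV("1002010011540185*")
kvRDD.collect().foreach { case (key, value) =>
  println(s"$key -> $value")
}
```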
Tips
When debugging you may notice that far too much is printed. To cut down on noisy logs, you can manually raise the log level:
```scala
import org.apache.log4j.{Level, Logger}

object App {
  // Only show WARN and above from Spark's own packages
  Logger.getLogger("org.apache.spark").setLevel(Level.WARN)

  def main(args: Array[String]): Unit = {
    ......
  }
}
```
Back to the point: the source of fromRedisKeyPattern shows:
```scala
/**
  * @param keyPattern a key pattern to match, or a single key
  * @param partitionNum number of partitions
  * @return RedisKeysRDD of simple Keys stored in redis server
  */
def fromRedisKeyPattern(keyPattern: String = "*", partitionNum: Int = 3)
                       (implicit redisConfig: RedisConfig = new RedisConfig(new RedisEndpoint(sc.getConf))):
    RedisKeysRDD = {
  new RedisKeysRDD(sc, redisConfig, keyPattern, partitionNum, null)
}
```
The default key pattern corresponds to a `KEYS *` scan in Redis, and the default partition count is 3; both parameters can be configured as needed. Of course, Spark-Redis supports far more than this, so keep exploring as you use it.
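As one illustration of that flexibility, the sketch below builds a RedisConfig by hand instead of relying on the redis.* entries in SparkConf, scans keys with an explicit pattern and partition count, and writes a small string key-value RDD back with toRedisKV. The RedisEndpoint argument order shown here (host, port, auth, dbNum, timeout) follows the 0.3.x source but may differ in other versions, and the sample keys and values are made up.

```scala
import com.redislabs.provider.redis._
import org.apache.spark.{SparkConf, SparkContext}

object ExplicitConfigDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[*]").setAppName("ExplicitConfigDemo"))

    // Hand-built RedisConfig; argument order assumed from the 0.3.x source:
    // host, port, auth, dbNum, timeout
    implicit val redisConfig: RedisConfig =
      new RedisConfig(new RedisEndpoint("10.10.11.8", 6399, null, 0, 48000))

    // Scan keys with an explicit pattern and partition count
    sc.fromRedisKeyPattern("1002010011540185*", partitionNum = 8).foreach(println)

    // Write a small string key-value RDD back to Redis (hypothetical sample data)
    val kv = sc.parallelize(Seq(("demo:key1", "value1"), ("demo:key2", "value2")))
    sc.toRedisKV(kv)

    sc.stop()
  }
}
```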