Preface
As Spark becomes ever more popular for data processing, its range of applications keeps growing, and so does the number of supported sources and sinks. Our project needed to read data from Redis and process it, so we wanted an elegant way to work with Redis. After some searching, it turned out that besides writing a custom source, you can also use the connector published by redislabs.com. Redis Labs is a privately held computer software company based in Mountain View, California. It provides a database management system marketed as "NoSQL", available as open-source software or as a cloud service. In other words, it is the company behind the commercial side of Redis; with Spark as hot as it is, plenty of companies want to ride the wave.
Maven Dependencies
Without further ado, let's add the dependency. The full pom.xml is as follows:
```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.w3sun.data</groupId>
    <artifactId>spark-redis</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <spark.version>2.2.0</spark.version>
        <type.safe.version>1.3.0</type.safe.version>
        <joda.time.version>2.9.4</joda.time.version>
        <fast.json.version>1.2.10</fast.json.version>
        <spark.redis.version>0.3.2</spark.redis.version>
        <encoding>UTF-8</encoding>
    </properties>

    <repositories>
        <!-- list of RedisLabs repositories -->
        <repository>
            <id>SparkPackagesRepo</id>
            <url>http://dl.bintray.com/spark-packages/maven</url>
        </repository>
    </repositories>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- Typesafe Config for configuration reading -->
        <dependency>
            <groupId>com.typesafe</groupId>
            <artifactId>config</artifactId>
            <version>${type.safe.version}</version>
        </dependency>
        <!-- Joda-Time for timestamp generation -->
        <dependency>
            <groupId>joda-time</groupId>
            <artifactId>joda-time</artifactId>
            <version>${joda.time.version}</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>${fast.json.version}</version>
        </dependency>
        <!-- Spark-Redis connector for Redis RDD generation and transformation -->
        <dependency>
            <groupId>RedisLabs</groupId>
            <artifactId>spark-redis</artifactId>
            <version>${spark.redis.version}</version>
        </dependency>
    </dependencies>

    <build>
        <pluginManagement>
            <plugins>
                <plugin>
                    <groupId>net.alchim31.maven</groupId>
                    <artifactId>scala-maven-plugin</artifactId>
                    <version>3.2.2</version>
                </plugin>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.5.1</version>
                </plugin>
            </plugins>
        </pluginManagement>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <phase>process-resources</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>scala-test-compile</id>
                        <phase>process-test-resources</phase>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <executions>
                    <execution>
                        <phase>compile</phase>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
```
Coding
With the dependency added, we can run a quick demo. For more details, see:
- https://github.com/RedisLabs/spark-redis
- https://redislabs.com/blog/getting-started-redis-apache-spark-python/
Write the Redis settings to a file named application.conf so that Typesafe Config can parse them later:
```
redis.host = 10.10.11.8
redis.db = 0
redis.port = 6399
redis.max.total = 500
redis.pool.maxidle = 500
redis.pool.testonborrow = true
redis.pool.testonreturn = true
redis.pool.maxwaitmillis = 1000
redis.protocol.timeout = 48000
```
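Before wiring the file into Spark, it is worth sanity-checking that Typesafe Config actually resolves these keys. A minimal sketch (the object name ConfigCheck is only illustrative; ConfigFactory.load() picks up application.conf from the classpath by default):

```scala
import com.typesafe.config.ConfigFactory

// Hypothetical helper used only to verify that application.conf is readable
object ConfigCheck extends App {
  private val config = ConfigFactory.load() // resolves application.conf on the classpath
  println(config.getString("redis.host"))   // 10.10.11.8
  println(config.getInt("redis.port"))      // 6399
}
```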
Java and Scala work equally well here; this article uses Scala for the test:
```scala
import java.util

import com.redislabs.provider.redis.rdd.RedisKeysRDD
import com.typesafe.config.{ConfigFactory, ConfigValue}
import org.apache.spark.{SparkConf, SparkContext}

/**
  * @author: w3sun
  * @date: 2018/08/29 11:51
  * @description:
  *
  */
object App {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
      .setMaster("local[*]")
      .setAppName(s"${this.getClass.getSimpleName}")
    // Put the Redis server settings on the SparkConf, then import the implicit conversions for Redis operations
    initRedis(sparkConf)
    import com.redislabs.provider.redis._
    val sparkContext = new SparkContext(sparkConf)
    // Build an RDD of all keys matching the given pattern
    val keysRDD: RedisKeysRDD = sparkContext.fromRedisKeyPattern("1002010011540185*", 4)
    keysRDD.foreach(println)
    sparkContext.stop()
  }

  private def initRedis(sparkConf: SparkConf): Unit = {
    import scala.collection.JavaConverters._
    ConfigFactory.load()
      .entrySet().asScala
      .foreach { entry: util.Map.Entry[String, ConfigValue] =>
        sparkConf.set(entry.getKey, entry.getValue.unwrapped().toString)
      }
  }
}
```
The output is as follows:
```
........
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
18/08/29 21:55:48 INFO SparkContext: Running Spark version 2.2.0
18/08/29 21:55:48 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
18/08/29 21:55:49 INFO SparkContext: Submitted application: App$
18/08/29 21:55:49 INFO SecurityManager: Changing view acls to: w3sun
18/08/29 21:55:49 INFO SecurityManager: Changing modify acls to: w3sun
18/08/29 21:55:49 INFO SecurityManager: Changing view acls groups to:
18/08/29 21:55:49 INFO SecurityManager: Changing modify acls groups to:
18/08/29 21:55:49 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(w3sun); groups with view permissions: Set(); users with modify permissions: Set(w3sun); groups with modify permissions: Set()
18/08/29 21:55:49 INFO Utils: Successfully started service 'sparkDriver' on port 55679.
18/08/29 21:55:49 INFO SparkEnv: Registering MapOutputTracker
18/08/29 21:55:49 INFO SparkEnv: Registering BlockManagerMaster
18/08/29 21:55:49 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
18/08/29 21:55:49 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
18/08/29 21:55:49 INFO DiskBlockManager: Created local directory at /private/var/folders/qh/676_k89s0bsg_cpj9g7042br0000gn/T/blockmgr-9ee6c6d8-0431-4fdb-bdba-91fd49f26600
18/08/29 21:55:49 INFO MemoryStore: MemoryStore started with capacity 912.3 MB
18/08/29 21:55:49 INFO SparkEnv: Registering OutputCommitCoordinator
18/08/29 21:55:50 INFO Utils: Successfully started service 'SparkUI' on port 4040.
18/08/29 21:55:50 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://10.10.10.35:4040
18/08/29 21:55:50 INFO Executor: Starting executor ID driver on host localhost
18/08/29 21:55:50 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 55680.
18/08/29 21:55:50 INFO NettyBlockTransferService: Server created on 10.10.10.35:55680
18/08/29 21:55:50 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
18/08/29 21:55:50 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 10.10.10.35, 55680, None)
18/08/29 21:55:50 INFO BlockManagerMasterEndpoint: Registering block manager 10.10.10.35:55680 with 912.3 MB RAM, BlockManagerId(driver, 10.60.110.35, 55680, None)
18/08/29 21:55:50 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 10.10.10.35, 55680, None)
18/08/29 21:55:50 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 10.10.10.35, 55680, None)
18/08/29 21:55:51 INFO SparkContext: Starting job: foreach at App.scala:26
18/08/29 21:55:51 INFO DAGScheduler: Got job 0 (foreach at App.scala:26) with 3 output partitions
18/08/29 21:55:51 INFO DAGScheduler: Final stage: ResultStage 0 (foreach at App.scala:26)
18/08/29 21:55:51 INFO DAGScheduler: Parents of final stage: List()
18/08/29 21:55:51 INFO DAGScheduler: Missing parents: List()
18/08/29 21:55:51 INFO DAGScheduler: Submitting ResultStage 0 (RedisKeysRDD[0] at RDD at RedisRDD.scala:189), which has no missing parents
18/08/29 21:55:51 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 1864.0 B, free 912.3 MB)
18/08/29 21:55:51 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 1304.0 B, free 912.3 MB)
18/08/29 21:55:51 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 10.10.10.35:55680 (size: 1304.0 B, free: 912.3 MB)
18/08/29 21:55:51 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1006
18/08/29 21:55:51 INFO DAGScheduler: Submitting 3 missing tasks from ResultStage 0 (RedisKeysRDD[0] at RDD at RedisRDD.scala:189) (first 15 tasks are for partitions Vector(0, 1, 2))
18/08/29 21:55:51 INFO TaskSchedulerImpl: Adding task set 0.0 with 3 tasks
18/08/29 21:55:51 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, executor driver, partition 0, ANY, 5434 bytes)
18/08/29 21:55:51 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, localhost, executor driver, partition 1, ANY, 5434 bytes)
18/08/29 21:55:51 INFO TaskSetManager: Starting task 2.0 in stage 0.0 (TID 2, localhost, executor driver, partition 2, ANY, 5434 bytes)
18/08/29 21:55:51 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
18/08/29 21:55:51 INFO Executor: Running task 2.0 in stage 0.0 (TID 2)
18/08/29 21:55:51 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
1002010011540185822646
1002010011540185061520
1002010011540185971654
1002010011540185244056
18/08/29 21:55:56 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 751 bytes result sent to driver
18/08/29 21:55:56 INFO Executor: Finished task 2.0 in stage 0.0 (TID 2). 751 bytes result sent to driver
18/08/29 21:55:56 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 4721 ms on localhost (executor driver) (1/3)
18/08/29 21:55:56 INFO TaskSetManager: Finished task 2.0 in stage 0.0 (TID 2) in 4721 ms on localhost (executor driver) (2/3)
18/08/29 21:55:57 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 708 bytes result sent to driver
1002010011540185977218
18/08/29 21:55:57 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 6062 ms on localhost (executor driver) (3/3)
18/08/29 21:55:57 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
18/08/29 21:55:57 INFO DAGScheduler: ResultStage 0 (foreach at App.scala:26) finished in 6.079 s
18/08/29 21:55:57 INFO DAGScheduler: Job 0 finished: foreach at App.scala:26, took 6.331050 s
18/08/29 21:55:57 INFO SparkUI: Stopped Spark web UI at http://10.10.10.35:4040
18/08/29 21:55:57 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
18/08/29 21:55:57 INFO MemoryStore: MemoryStore cleared
18/08/29 21:55:57 INFO BlockManager: BlockManager stopped
18/08/29 21:55:57 INFO BlockManagerMaster: BlockManagerMaster stopped
18/08/29 21:55:57 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
18/08/29 21:55:57 INFO SparkContext: Successfully stopped SparkContext
18/08/29 21:55:57 INFO ShutdownHookManager: Shutdown hook called
18/08/29 21:55:57 INFO ShutdownHookManager: Deleting directory /private/var/folders/qh/676_k89s0bsg_cpj9g7042br0000gn/T/spark-1c43f998-49d6-45aa-9f8f-954ceed06b6e

Process finished with exit code 0
```
The output shows that five keys matched the pattern, and each key name was printed.
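Listing key names is usually just the first step. Below is a hedged sketch of also reading the corresponding string values, assuming this version of the connector exposes getKV() on RedisKeysRDD (and, alternatively, fromRedisKV on the SparkContext) as described in the project README; the object name ValuesApp is only illustrative:

```scala
import com.redislabs.provider.redis._
import org.apache.spark.{SparkConf, SparkContext}

object ValuesApp {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("ValuesApp")
      // spark-redis reads its connection settings from the SparkConf
      .set("redis.host", "10.10.11.8")
      .set("redis.port", "6399")

    val sc = new SparkContext(sparkConf)

    // Assumption: getKV() turns the matched keys into (key, value) string pairs
    val kvRDD = sc.fromRedisKeyPattern("1002010011540185*", 4).getKV()

    // Alternative (also an assumption about this version): read values directly by pattern
    // val kvRDD = sc.fromRedisKV("1002010011540185*")

    kvRDD.collect().foreach { case (k, v) => println(s"$k -> $v") }
    sc.stop()
  }
}
```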
Tips
When debugging you will probably notice that far too much is printed. To cut down on useless log output, you can raise the log level manually:
```scala
import org.apache.log4j.{Level, Logger}

object App {
  Logger.getLogger("org.apache.spark").setLevel(Level.WARN)

  def main(args: Array[String]): Unit = {
    ......
  }
}
```
Back to the topic at hand: the source code of fromRedisKeyPattern reads:
```scala
/**
  * @param keyPattern a key pattern to match, or a single key
  * @param partitionNum number of partitions
  * @return RedisKeysRDD of simple Keys stored in redis server
  */
def fromRedisKeyPattern(keyPattern: String = "*", partitionNum: Int = 3)
                       (implicit redisConfig: RedisConfig =
                          new RedisConfig(new RedisEndpoint(sc.getConf))): RedisKeysRDD = {
  new RedisKeysRDD(sc, redisConfig, keyPattern, partitionNum, null)
}
```
As the signature shows, the default key pattern is "*" (the equivalent of the Redis KEYS * operation) and the default partition count is 3; both parameters can be configured freely. Of course, Spark-Redis supports far more than what is covered here, so keep exploring as you use it.
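For example, if the defaults do not fit, both the pattern and the partition count can be passed explicitly, and the implicit RedisConfig can be overridden so the job talks to a Redis instance other than the one set in SparkConf. A sketch under the assumption that RedisEndpoint accepts host, port, auth, dbNum and timeout parameters (the object name CustomConfigApp is only illustrative):

```scala
import com.redislabs.provider.redis._
import org.apache.spark.{SparkConf, SparkContext}

object CustomConfigApp {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[*]").setAppName("CustomConfigApp"))

    // Assumption: RedisEndpoint takes (host, port, auth, dbNum, timeout) with sensible defaults
    implicit val redisConfig: RedisConfig =
      new RedisConfig(new RedisEndpoint(host = "10.10.11.8", port = 6399, dbNum = 0, timeout = 48000))

    // Explicit pattern and partition count instead of the "*" / 3 defaults
    val keysRDD = sc.fromRedisKeyPattern("1002010011540185*", partitionNum = 8)
    println(s"matched ${keysRDD.count()} keys")

    sc.stop()
  }
}
```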