Preface
As Spark becomes ever more popular in the data-processing world, its range of applications keeps expanding, and the number of supported sources and sinks keeps growing with it. A project of mine needed to read data from Redis and process it, so I wanted an elegant way to operate on Redis from Spark. After some searching I found that, besides writing a custom source, you can also use the connector published by redislabs.com. Redis Labs is a privately held software company based in Mountain View, California; it offers a database management system, distributed both as open-source software and as a cloud service, marketed as "NoSQL". In short, it is the company commercializing Redis, and with Spark as hot as it is right now, many companies want to ride that wave.
Maven Dependencies
Enough talk; let's add the dependencies and get started. The full pom.xml is as follows:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.w3sun.data</groupId>
<artifactId>spark-redis</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<spark.version>2.2.0</spark.version>
<type.safe.version>1.3.0</type.safe.version>
<joda.time.version>2.9.4</joda.time.version>
<fast.json.version>1.2.10</fast.json.version>
<spark.redis.version>0.3.2</spark.redis.version>
<encoding>UTF-8</encoding>
</properties>
<repositories>
<!-- list of RedisLabs repositories -->
<repository>
<id>SparkPackagesRepo</id>
<url>http://dl.bintray.com/spark-packages/maven</url>
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- Typesafe Config for reading configuration files -->
<dependency>
<groupId>com.typesafe</groupId>
<artifactId>config</artifactId>
<version>${type.safe.version}</version>
</dependency>
<!-- Joda-Time for timestamp generation -->
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<version>${joda.time.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>${fast.json.version}</version>
</dependency>
<!-- spark-redis connector for generating and transforming RDDs backed by Redis -->
<dependency>
<groupId>RedisLabs</groupId>
<artifactId>spark-redis</artifactId>
<version>${spark.redis.version}</version>
</dependency>
</dependencies>
<build>
<pluginManagement>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.5.1</version>
</plugin>
</plugins>
</pluginManagement>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<executions>
<execution>
<id>scala-compile-first</id>
<phase>process-resources</phase>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>scala-test-compile</id>
<phase>process-test-resources</phase>
<goals>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<executions>
<execution>
<phase>compile</phase>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Code
Once the dependencies are in place, we can try a demo. For detailed documentation, see:
- https://github.com/RedisLabs/spark-redis
- https://redislabs.com/blog/getting-started-redis-apache-spark-python/
Put the Redis configuration into a file named application.conf so that Typesafe Config can parse it later:
redis.host = 10.10.11.8
redis.db = 0
redis.port = 6399
redis.max.total = 500
redis.pool.maxidle = 500
redis.pool.testonborrow = true
redis.pool.testonreturn = true
redis.pool.maxwaitmillis = 1000
redis.protocol.timeout = 48000
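If you would rather not go through Typesafe Config, the same keys can also be set directly on SparkConf, since spark-redis ultimately reads the connection info from the SparkContext configuration (see the new RedisEndpoint(sc.getConf) default in the source shown later). A minimal sketch using the sample values above:

import org.apache.spark.SparkConf

// Minimal sketch: hand the Redis connection info to spark-redis via SparkConf
// instead of loading it from application.conf (values are the samples above)
val sparkConf = new SparkConf()
  .setMaster("local[*]")
  .setAppName("SparkRedisDemo")
  .set("redis.host", "10.10.11.8")
  .set("redis.port", "6399")
  .set("redis.db", "0")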
Java and Scala work the same way here; this article uses Scala for the test:
import java.util

import com.redislabs.provider.redis.rdd.RedisKeysRDD
import com.typesafe.config.{ConfigFactory, ConfigValue}
import org.apache.spark.{SparkConf, SparkContext}

/**
 * @author: w3sun
 * @date: 2018/08/29 11:51
 * @description:
 *
 */
object App {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
      .setMaster("local[*]")
      .setAppName(s"${this.getClass.getSimpleName}")
    // Set the Redis server info, then import the implicit conversions for operating on Redis
    initRedis(sparkConf)
    import com.redislabs.provider.redis._
    val sparkContext = new SparkContext(sparkConf)
    // Build an RDD of all keys matching the given pattern
    val keysRDD: RedisKeysRDD = sparkContext.fromRedisKeyPattern("1002010011540185*", 4)
    keysRDD.foreach(println)
    sparkContext.stop()
  }

  private def initRedis(sparkConf: SparkConf): Unit = {
    import scala.collection.JavaConverters._
    ConfigFactory.load()
      .entrySet().asScala
      .foreach { entry: util.Map.Entry[String, ConfigValue] =>
        sparkConf.set(entry.getKey, entry.getValue.unwrapped().toString)
      }
  }
}
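Note that keysRDD.foreach(println) runs on the executors. In local[*] mode, as in this test, the keys still show up in the driver console (see the output below), but on a real cluster they would land in the executor logs instead. To inspect them on the driver, pull back a bounded sample first, for example:

// Bring a bounded sample of keys back to the driver before printing them
keysRDD.take(20).foreach(println)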
The output is as follows:
........
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
18/08/29 21:55:48 INFO SparkContext: Running Spark version 2.2.0
18/08/29 21:55:48 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
18/08/29 21:55:49 INFO SparkContext: Submitted application: App$
18/08/29 21:55:49 INFO SecurityManager: Changing view acls to: w3sun
18/08/29 21:55:49 INFO SecurityManager: Changing modify acls to: w3sun
18/08/29 21:55:49 INFO SecurityManager: Changing view acls groups to:
18/08/29 21:55:49 INFO SecurityManager: Changing modify acls groups to:
18/08/29 21:55:49 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(w3sun); groups with view permissions: Set(); users with modify permissions: Set(w3sun); groups with modify permissions: Set()
18/08/29 21:55:49 INFO Utils: Successfully started service 'sparkDriver' on port 55679.
18/08/29 21:55:49 INFO SparkEnv: Registering MapOutputTracker
18/08/29 21:55:49 INFO SparkEnv: Registering BlockManagerMaster
18/08/29 21:55:49 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
18/08/29 21:55:49 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
18/08/29 21:55:49 INFO DiskBlockManager: Created local directory at /private/var/folders/qh/676_k89s0bsg_cpj9g7042br0000gn/T/blockmgr-9ee6c6d8-0431-4fdb-bdba-91fd49f26600
18/08/29 21:55:49 INFO MemoryStore: MemoryStore started with capacity 912.3 MB
18/08/29 21:55:49 INFO SparkEnv: Registering OutputCommitCoordinator
18/08/29 21:55:50 INFO Utils: Successfully started service 'SparkUI' on port 4040.
18/08/29 21:55:50 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://10.10.10.35:4040
18/08/29 21:55:50 INFO Executor: Starting executor ID driver on host localhost
18/08/29 21:55:50 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 55680.
18/08/29 21:55:50 INFO NettyBlockTransferService: Server created on 10.10.10.35:55680
18/08/29 21:55:50 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
18/08/29 21:55:50 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 10.10.10.35, 55680, None)
18/08/29 21:55:50 INFO BlockManagerMasterEndpoint: Registering block manager 10.10.10.35:55680 with 912.3 MB RAM, BlockManagerId(driver, 10.60.110.35, 55680, None)
18/08/29 21:55:50 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 10.10.10.35, 55680, None)
18/08/29 21:55:50 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 10.10.10.35, 55680, None)
18/08/29 21:55:51 INFO SparkContext: Starting job: foreach at App.scala:26
18/08/29 21:55:51 INFO DAGScheduler: Got job 0 (foreach at App.scala:26) with 3 output partitions
18/08/29 21:55:51 INFO DAGScheduler: Final stage: ResultStage 0 (foreach at App.scala:26)
18/08/29 21:55:51 INFO DAGScheduler: Parents of final stage: List()
18/08/29 21:55:51 INFO DAGScheduler: Missing parents: List()
18/08/29 21:55:51 INFO DAGScheduler: Submitting ResultStage 0 (RedisKeysRDD[0] at RDD at RedisRDD.scala:189), which has no missing parents
18/08/29 21:55:51 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 1864.0 B, free 912.3 MB)
18/08/29 21:55:51 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 1304.0 B, free 912.3 MB)
18/08/29 21:55:51 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 10.10.10.35:55680 (size: 1304.0 B, free: 912.3 MB)
18/08/29 21:55:51 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1006
18/08/29 21:55:51 INFO DAGScheduler: Submitting 3 missing tasks from ResultStage 0 (RedisKeysRDD[0] at RDD at RedisRDD.scala:189) (first 15 tasks are for partitions Vector(0, 1, 2))
18/08/29 21:55:51 INFO TaskSchedulerImpl: Adding task set 0.0 with 3 tasks
18/08/29 21:55:51 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, executor driver, partition 0, ANY, 5434 bytes)
18/08/29 21:55:51 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, localhost, executor driver, partition 1, ANY, 5434 bytes)
18/08/29 21:55:51 INFO TaskSetManager: Starting task 2.0 in stage 0.0 (TID 2, localhost, executor driver, partition 2, ANY, 5434 bytes)
18/08/29 21:55:51 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
18/08/29 21:55:51 INFO Executor: Running task 2.0 in stage 0.0 (TID 2)
18/08/29 21:55:51 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
1002010011540185822646
1002010011540185061520
1002010011540185971654
1002010011540185244056
18/08/29 21:55:56 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 751 bytes result sent to driver
18/08/29 21:55:56 INFO Executor: Finished task 2.0 in stage 0.0 (TID 2). 751 bytes result sent to driver
18/08/29 21:55:56 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 4721 ms on localhost (executor driver) (1/3)
18/08/29 21:55:56 INFO TaskSetManager: Finished task 2.0 in stage 0.0 (TID 2) in 4721 ms on localhost (executor driver) (2/3)
18/08/29 21:55:57 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 708 bytes result sent to driver
1002010011540185977218
18/08/29 21:55:57 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 6062 ms on localhost (executor driver) (3/3)
18/08/29 21:55:57 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
18/08/29 21:55:57 INFO DAGScheduler: ResultStage 0 (foreach at App.scala:26) finished in 6.079 s
18/08/29 21:55:57 INFO DAGScheduler: Job 0 finished: foreach at App.scala:26, took 6.331050 s
18/08/29 21:55:57 INFO SparkUI: Stopped Spark web UI at http://10.10.10.35:4040
18/08/29 21:55:57 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
18/08/29 21:55:57 INFO MemoryStore: MemoryStore cleared
18/08/29 21:55:57 INFO BlockManager: BlockManager stopped
18/08/29 21:55:57 INFO BlockManagerMaster: BlockManagerMaster stopped
18/08/29 21:55:57 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
18/08/29 21:55:57 INFO SparkContext: Successfully stopped SparkContext
18/08/29 21:55:57 INFO ShutdownHookManager: Shutdown hook called
18/08/29 21:55:57 INFO ShutdownHookManager: Deleting directory /private/var/folders/qh/676_k89s0bsg_cpj9g7042br0000gn/T/spark-1c43f998-49d6-45aa-9f8f-954ceed06b6e
Process finished with exit code 0
The output shows that five keys matched the pattern, and each of them was printed.
Tips
When debugging you will probably notice that far too much is logged. To cut down on the noise, you can adjust the log level manually:
import org.apache.log4j.{Level, Logger}

object App {
  Logger.getLogger("org.apache.spark").setLevel(Level.WARN)

  def main(args: Array[String]): Unit = {
    ......
  }
}
Back to the main topic: the source of fromRedisKeyPattern shows the following:
/**
 * @param keyPattern a key pattern to match, or a single key
 * @param partitionNum number of partitions
 * @return RedisKeysRDD of simple Keys stored in redis server
 */
def fromRedisKeyPattern(keyPattern: String = "*",
                        partitionNum: Int = 3)
                       (implicit redisConfig: RedisConfig = new RedisConfig(new RedisEndpoint(sc.getConf))):
  RedisKeysRDD = {
  new RedisKeysRDD(sc, redisConfig, keyPattern, partitionNum, null)
}
The default key pattern is "*", which corresponds to a KEYS * operation in Redis, and the default number of partitions is 3; both parameters can be configured freely. Of course, Spark-Redis supports far more than this, and it is well worth exploring as you use it.
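As a sketch of what else is available (based on the project's README; the key pattern and values below are made up for illustration, and sparkContext and the implicit import are reused from the demo above), both parameters can be overridden per call, and the same implicit import also provides read/write helpers such as fromRedisKV and toRedisKV:

// Narrow the pattern and raise the parallelism explicitly
val keys = sparkContext.fromRedisKeyPattern(keyPattern = "1002010011540185*", partitionNum = 8)

// Read matching string keys and their values as an RDD[(String, String)]
val kvRDD = sparkContext.fromRedisKV("demo:*")
kvRDD.take(10).foreach { case (k, v) => println(s"$k -> $v") }

// Write an RDD of (key, value) pairs back into Redis as plain strings
val toWrite = sparkContext.parallelize(Seq(("demo:greeting", "hello"), ("demo:answer", "42")))
sparkContext.toRedisKV(toWrite)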