Working with Redis Elegantly in Spark

Category: Spark | Author: w3sun

Preface

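As Spark grows more popular for data processing, its range of applications keeps expanding, and so does the number of supported Sources and Sinks. Our project needs to read data from Redis and process it, so we wanted an elegant way to work with Redis from Spark. After some searching, it turned out that besides writing a custom Source, we can use the Connector published by redislabs.com. Redis Labs is a private computer software company based in Mountain View, California. It offers a database management system, marketed as "NoSQL", either as open-source software or as a cloud service. In short, it is the company that commercializes Redis, and with Spark this popular, plenty of companies want to ride the wave.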

Maven Dependencies

Without further ado, let's add the dependencies. The complete pom file is as follows:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.w3sun.data</groupId>
    <artifactId>spark-redis</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <spark.version>2.2.0</spark.version>
        <type.safe.version>1.3.0</type.safe.version>
        <joda.time.version>2.9.4</joda.time.version>
        <fast.json.version>1.2.10</fast.json.version>
        <spark.redis.version>0.3.2</spark.redis.version>
        <encoding>UTF-8</encoding>
    </properties>


    <repositories>
        <!-- list of RedisLabs repositories -->
        <repository>
            <id>SparkPackagesRepo</id>
            <url>http://dl.bintray.com/spark-packages/maven</url>
        </repository>
    </repositories>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <!--Configuration for configuration reader-->
        <dependency>
            <groupId>com.typesafe</groupId>
            <artifactId>config</artifactId>
            <version>${type.safe.version}</version>
        </dependency>

        <!--Configuration for Joda time to generate timestamp-->
        <dependency>
            <groupId>joda-time</groupId>
            <artifactId>joda-time</artifactId>
            <version>${joda.time.version}</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>${fast.json.version}</version>
        </dependency>

        <!--Configuration for Spark and Redis RDD generation and transformation-->
        <dependency>
            <groupId>RedisLabs</groupId>
            <artifactId>spark-redis</artifactId>
            <version>${spark.redis.version}</version>
        </dependency>
    </dependencies>

    <build>
        <pluginManagement>
            <plugins>
                <plugin>
                    <groupId>net.alchim31.maven</groupId>
                    <artifactId>scala-maven-plugin</artifactId>
                    <version>3.2.2</version>
                </plugin>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.5.1</version>
                </plugin>
            </plugins>
        </pluginManagement>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <phase>process-resources</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>scala-test-compile</id>
                        <phase>process-test-resources</phase>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <executions>
                    <execution>
                        <phase>compile</phase>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

Coding

With the dependencies in place, we can try a demo. For a detailed description, refer to the connector's documentation.

Write the Redis configuration to a file named application.conf so that Typesafe Config can parse it later:

redis.host = 10.10.11.8
redis.db = 0
redis.port = 6399
redis.max.total = 500
redis.pool.maxidle = 500
redis.pool.testonborrow = true
redis.pool.testonreturn = true
redis.pool.maxwaitmillis = 1000
redis.protocol.timeout = 48000
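
As a rough illustration of how Typesafe Config reads these entries, the sketch below parses a few of them from an inline string and pulls them out with typed getters. The object name ConfigPreview and the inline sample are assumptions made for illustration; in the real project the values come from application.conf via ConfigFactory.load().

```scala
import com.typesafe.config.{Config, ConfigFactory}

// Hypothetical helper object, not part of the original project.
object ConfigPreview {
  // Inline copy of a few application.conf entries so the sketch runs without the file.
  private val sample: String =
    """redis.host = 10.10.11.8
      |redis.db = 0
      |redis.port = 6399
      |redis.pool.testonborrow = true
      |""".stripMargin

  val config: Config = ConfigFactory.parseString(sample)

  def main(args: Array[String]): Unit = {
    // Dotted keys become nested paths, so typed getters can address them directly.
    val host: String = config.getString("redis.host")
    val port: Int = config.getInt("redis.port")
    val db: Int = config.getInt("redis.db")
    val testOnBorrow: Boolean = config.getBoolean("redis.pool.testonborrow")
    println(s"host=$host port=$port db=$db testOnBorrow=$testOnBorrow")
  }
}
```

The typed getters fail fast with an exception when a key is missing or has the wrong type, which tends to surface configuration mistakes before the Spark job starts.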

Java and Scala work equally well here; this article uses Scala for the test:

import java.util

import com.redislabs.provider.redis.rdd.RedisKeysRDD
import com.typesafe.config.{ConfigFactory, ConfigValue}
import org.apache.spark.{SparkConf, SparkContext}

/**
  * @author: w3sun
  * @date: 2018/08/29 11:51
  * @description:
  *
  */
object App {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
      .setMaster("local[*]")
      .setAppName(s"${this.getClass.getSimpleName}")

    // Set the Redis server information, then import the implicit conversions for working with Redis
    initRedis(sparkConf)
    import com.redislabs.provider.redis._
    val sparkContext = new SparkContext(sparkConf)
    // Get the RDD of all keys that match the given pattern
    val keysRDD: RedisKeysRDD = sparkContext.fromRedisKeyPattern("1002010011540185*", 4)
    keysRDD.foreach(println)
    sparkContext.stop()
  }

  private def initRedis(sparkConf: SparkConf): Unit = {
    import scala.collection.JavaConverters._
    ConfigFactory.load()
      .entrySet().asScala
      .foreach {
        entry: util.Map.Entry[String, ConfigValue] =>
          sparkConf.set(entry.getKey, entry.getValue.unwrapped().toString)
      }
  }
}

The output is as follows:

........
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
18/08/29 21:55:48 INFO SparkContext: Running Spark version 2.2.0
18/08/29 21:55:48 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
18/08/29 21:55:49 INFO SparkContext: Submitted application: App$
18/08/29 21:55:49 INFO SecurityManager: Changing view acls to: w3sun
18/08/29 21:55:49 INFO SecurityManager: Changing modify acls to: w3sun
18/08/29 21:55:49 INFO SecurityManager: Changing view acls groups to: 
18/08/29 21:55:49 INFO SecurityManager: Changing modify acls groups to: 
18/08/29 21:55:49 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(w3sun); groups with view permissions: Set(); users  with modify permissions: Set(w3sun); groups with modify permissions: Set()
18/08/29 21:55:49 INFO Utils: Successfully started service 'sparkDriver' on port 55679.
18/08/29 21:55:49 INFO SparkEnv: Registering MapOutputTracker
18/08/29 21:55:49 INFO SparkEnv: Registering BlockManagerMaster
18/08/29 21:55:49 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
18/08/29 21:55:49 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
18/08/29 21:55:49 INFO DiskBlockManager: Created local directory at /private/var/folders/qh/676_k89s0bsg_cpj9g7042br0000gn/T/blockmgr-9ee6c6d8-0431-4fdb-bdba-91fd49f26600
18/08/29 21:55:49 INFO MemoryStore: MemoryStore started with capacity 912.3 MB
18/08/29 21:55:49 INFO SparkEnv: Registering OutputCommitCoordinator
18/08/29 21:55:50 INFO Utils: Successfully started service 'SparkUI' on port 4040.
18/08/29 21:55:50 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://10.10.10.35:4040
18/08/29 21:55:50 INFO Executor: Starting executor ID driver on host localhost
18/08/29 21:55:50 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 55680.
18/08/29 21:55:50 INFO NettyBlockTransferService: Server created on 10.10.10.35:55680
18/08/29 21:55:50 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
18/08/29 21:55:50 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 10.10.10.35, 55680, None)
18/08/29 21:55:50 INFO BlockManagerMasterEndpoint: Registering block manager 10.10.10.35:55680 with 912.3 MB RAM, BlockManagerId(driver, 10.60.110.35, 55680, None)
18/08/29 21:55:50 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 10.10.10.35, 55680, None)
18/08/29 21:55:50 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 10.10.10.35, 55680, None)
18/08/29 21:55:51 INFO SparkContext: Starting job: foreach at App.scala:26
18/08/29 21:55:51 INFO DAGScheduler: Got job 0 (foreach at App.scala:26) with 3 output partitions
18/08/29 21:55:51 INFO DAGScheduler: Final stage: ResultStage 0 (foreach at App.scala:26)
18/08/29 21:55:51 INFO DAGScheduler: Parents of final stage: List()
18/08/29 21:55:51 INFO DAGScheduler: Missing parents: List()
18/08/29 21:55:51 INFO DAGScheduler: Submitting ResultStage 0 (RedisKeysRDD[0] at RDD at RedisRDD.scala:189), which has no missing parents
18/08/29 21:55:51 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 1864.0 B, free 912.3 MB)
18/08/29 21:55:51 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 1304.0 B, free 912.3 MB)
18/08/29 21:55:51 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 10.10.10.35:55680 (size: 1304.0 B, free: 912.3 MB)
18/08/29 21:55:51 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1006
18/08/29 21:55:51 INFO DAGScheduler: Submitting 3 missing tasks from ResultStage 0 (RedisKeysRDD[0] at RDD at RedisRDD.scala:189) (first 15 tasks are for partitions Vector(0, 1, 2))
18/08/29 21:55:51 INFO TaskSchedulerImpl: Adding task set 0.0 with 3 tasks
18/08/29 21:55:51 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, executor driver, partition 0, ANY, 5434 bytes)
18/08/29 21:55:51 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, localhost, executor driver, partition 1, ANY, 5434 bytes)
18/08/29 21:55:51 INFO TaskSetManager: Starting task 2.0 in stage 0.0 (TID 2, localhost, executor driver, partition 2, ANY, 5434 bytes)
18/08/29 21:55:51 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
18/08/29 21:55:51 INFO Executor: Running task 2.0 in stage 0.0 (TID 2)
18/08/29 21:55:51 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
1002010011540185822646
1002010011540185061520
1002010011540185971654
1002010011540185244056
18/08/29 21:55:56 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 751 bytes result sent to driver
18/08/29 21:55:56 INFO Executor: Finished task 2.0 in stage 0.0 (TID 2). 751 bytes result sent to driver
18/08/29 21:55:56 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 4721 ms on localhost (executor driver) (1/3)
18/08/29 21:55:56 INFO TaskSetManager: Finished task 2.0 in stage 0.0 (TID 2) in 4721 ms on localhost (executor driver) (2/3)
18/08/29 21:55:57 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 708 bytes result sent to driver
1002010011540185977218
18/08/29 21:55:57 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 6062 ms on localhost (executor driver) (3/3)
18/08/29 21:55:57 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
18/08/29 21:55:57 INFO DAGScheduler: ResultStage 0 (foreach at App.scala:26) finished in 6.079 s
18/08/29 21:55:57 INFO DAGScheduler: Job 0 finished: foreach at App.scala:26, took 6.331050 s
18/08/29 21:55:57 INFO SparkUI: Stopped Spark web UI at http://10.10.10.35:4040
18/08/29 21:55:57 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
18/08/29 21:55:57 INFO MemoryStore: MemoryStore cleared
18/08/29 21:55:57 INFO BlockManager: BlockManager stopped
18/08/29 21:55:57 INFO BlockManagerMaster: BlockManagerMaster stopped
18/08/29 21:55:57 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
18/08/29 21:55:57 INFO SparkContext: Successfully stopped SparkContext
18/08/29 21:55:57 INFO ShutdownHookManager: Shutdown hook called
18/08/29 21:55:57 INFO ShutdownHookManager: Deleting directory /private/var/folders/qh/676_k89s0bsg_cpj9g7042br0000gn/T/spark-1c43f998-49d6-45aa-9f8f-954ceed06b6e

Process finished with exit code 0

The output shows that five keys matched the pattern, and each of them was printed.
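
One detail of initRedis is worth noting: ConfigFactory.load() merges JVM system properties and any reference.conf files with application.conf, so iterating over its entrySet() copies far more than the Redis settings into SparkConf. A minimal sketch of a narrower variant, which copies only the entries under the redis prefix, might look like this. The names RedisSettings and redisEntries are hypothetical, and the inline configuration stands in for application.conf.

```scala
import com.typesafe.config.{Config, ConfigFactory}
import org.apache.spark.SparkConf

import scala.collection.JavaConverters._

// Hypothetical helper, not part of the original project.
object RedisSettings {
  // Collect only the entries below "redis" and restore the prefix on each key.
  def redisEntries(config: Config): Map[String, String] =
    config.getConfig("redis")
      .entrySet().asScala
      .map(entry => s"redis.${entry.getKey}" -> entry.getValue.unwrapped().toString)
      .toMap

  def main(args: Array[String]): Unit = {
    // Stand-in for ConfigFactory.load(); the extra key shows what gets filtered out.
    val config = ConfigFactory.parseString(
      """redis.host = 10.10.11.8
        |redis.port = 6399
        |redis.pool.maxidle = 500
        |unrelated.key = ignored
        |""".stripMargin)

    // loadDefaults = false keeps spark.* system properties out of this demo.
    val sparkConf = new SparkConf(false)
    redisEntries(config).foreach { case (key, value) => sparkConf.set(key, value) }
    sparkConf.getAll.sortBy(_._1).foreach { case (key, value) => println(s"$key = $value") }
  }
}
```

Keeping the filtering in a pure function makes it easy to check on its own, without starting a SparkContext.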

Tips

While debugging you will probably find that far too much is printed. To cut down on useless log output, you can set the log level manually:

import org.apache.log4j.{Level, Logger}

object App {
  // Only show warnings and errors from Spark's own loggers
  Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
  def main(args: Array[String]): Unit = {
    ......
  }
}
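
As an alternative sketch, SparkContext also exposes setLogLevel, which overrides the root log level once the context exists. Messages logged while the context is starting up are not affected, so the Logger approach is the one to reach for when those need to be silenced too. The object name QuietApp and the toy job are assumptions for illustration.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical demo object, not part of the original project.
object QuietApp {
  // Log level applied after the context has been created.
  val logLevel: String = "WARN"

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("QuietApp")
    val sparkContext = new SparkContext(sparkConf)

    // From here on, INFO messages such as task scheduling details are suppressed.
    sparkContext.setLogLevel(logLevel)

    // Toy job, only there so that something runs after the level change.
    val count = sparkContext.parallelize(1 to 10).count()
    println(s"count = $count")

    sparkContext.stop()
  }
}
```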

Back to the main topic. The source of fromRedisKeyPattern shows:

/**
    * @param keyPattern a key pattern to match, or a single key
    * @param partitionNum number of partitions
    * @return RedisKeysRDD of simple Keys stored in redis server
    */
  def fromRedisKeyPattern(keyPattern: String = "*",
                          partitionNum: Int = 3)
                         (implicit redisConfig: RedisConfig = new RedisConfig(new RedisEndpoint(sc.getConf))):
  RedisKeysRDD = {
    new RedisKeysRDD(sc, redisConfig, keyPattern, partitionNum, null)
  }

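The default key pattern is *, which matches every key much like the KEYS * operation in Redis, and the default number of partitions is 3. Both parameters can be set as needed. Of course, Spark-Redis supports far more than this, and there is plenty left to explore as you use it.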
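
To make the parameters concrete, here is a minimal sketch that names both arguments and supplies the implicit RedisConfig explicitly rather than relying on the default built from sc.getConf. The host and port are the ones from application.conf; the object name PatternDemo and the helper prefixPattern are hypothetical, and running it requires a reachable Redis server.

```scala
import com.redislabs.provider.redis._
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical demo object, not part of the original project.
object PatternDemo {
  // Build a glob pattern that matches every key starting with the given prefix.
  // Assumes the prefix itself contains no glob metacharacters such as * ? [ ].
  def prefixPattern(prefix: String): String = s"$prefix*"

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("PatternDemo")
    val sc = new SparkContext(sparkConf)

    // Explicit endpoint; this replaces the default RedisConfig derived from sc.getConf.
    implicit val redisConfig: RedisConfig =
      new RedisConfig(new RedisEndpoint("10.10.11.8", 6399))

    // Named arguments make it obvious which value is the pattern and which the partition count.
    val keysRDD = sc.fromRedisKeyPattern(
      keyPattern = prefixPattern("1002010011540185"),
      partitionNum = 4)

    keysRDD.collect().foreach(println)
    sc.stop()
  }
}
```

Passing the RedisConfig explicitly can be handy when one job has to talk to more than one Redis instance, since each call can then be given its own endpoint.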

Please credit the source when reposting: 雪后西塘 » Working with Redis Elegantly in Spark
