Listener EventLoggingListener threw an exception java.io.IOException

Spark | w3sun

Background

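While working on a real-time stream computing project, I needed to dump data that had been written to Redis. After weighing the options, I decided to process the data with Spark Core and dump the results to HDFS. The utility class that the Spark program uses for HDFS operations obtains a FileSystem manually and closes it once the HDFS operations are done: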

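import java.io.File

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
// Assumption: the post omits its imports; DateTime is taken to be Joda-Time and the logger SLF4J.
import org.joda.time.DateTime
import org.slf4j.LoggerFactory

object HdfsHandler {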

  @transient val logger = LoggerFactory.getLogger(this.getClass)
  val config: Configuration = new Configuration()
  /**
    * Get the storage path for current batch data.
    *
    * @param dateTime DateTime
    * @return storage path in String
    */
  def getCurrentPath(dateTime: DateTime): String = {
    var fs: FileSystem = null
    try {
      fs = FileSystem.get(config)
    } catch {
      case e: Exception =>
        // Pass the exception to the logger so the stack trace lands in the log output.
        logger.error("Get fs:FileSystem error!", e)
    }
    val dateDir: String = getDateDir(dateTime)
    val hour = dateTime.toString(AppConfig.HDFS_DIR_DATE_HOUR_FORMATTER)
    val pathStr = dateDir
      .concat(File.separator)
      .concat(hour)
    try {
      delHdfsFile(fs, pathStr)
    } catch {
      case e: Exception =>
        logger.error("Failed to delete existing files from the target storage directory!", e)
    } finally {
      // fs is still null if FileSystem.get failed above.
      if (fs != null) fs.close()
    }
    pathStr
  }
.......
  /**
    * Delete file on HDFS.
    *
    * @param fs   FileSystem
    * @param elem string path
    */
  private def delHdfsFile(fs: FileSystem, elem: String): Unit = {
    try {
      val path = new Path(elem)
      if (fs.exists(path)) {
        fs.delete(path, true)
      }
    } catch {
      case e: Exception =>
        // Wrap and rethrow so the caller's catch block can log the failure.
        throw new RuntimeException("Delete HDFS files Error! codeLocation: HdfsHandler.delHdfsFile", e)
    }
  }
}

This looks like a standard resource-release pattern, yet it produced a lengthy exception:

18/07/10 15:42:18 ERROR LiveListenerBus: Listener EventLoggingListener threw an exception
java.io.IOException: Filesystem closed
    at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:798)
    at org.apache.hadoop.hdfs.DFSOutputStream.flushOrSync(DFSOutputStream.java:1985)
    at org.apache.hadoop.hdfs.DFSOutputStream.hflush(DFSOutputStream.java:1946)
    at org.apache.hadoop.fs.FSDataOutputStream.hflush(FSDataOutputStream.java:130)
    at org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$3.apply(EventLoggingListener.scala:140)
    at org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$3.apply(EventLoggingListener.scala:140)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.EventLoggingListener.logEvent(EventLoggingListener.scala:140)
    at org.apache.spark.scheduler.EventLoggingListener.onJobStart(EventLoggingListener.scala:163)
    at org.apache.spark.scheduler.SparkListenerBus$class.doPostEvent(SparkListenerBus.scala:37)
    at org.apache.spark.scheduler.LiveListenerBus.doPostEvent(LiveListenerBus.scala:36)
    at org.apache.spark.scheduler.LiveListenerBus.doPostEvent(LiveListenerBus.scala:36)
    at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:63)
    at org.apache.spark.scheduler.LiveListenerBus.postToAll(LiveListenerBus.scala:36)
    at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(LiveListenerBus.scala:94)
    at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(LiveListenerBus.scala:79)
    at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(LiveListenerBus.scala:79)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
    at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(LiveListenerBus.scala:78)
    at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1245)
    at org.apache.spark.scheduler.LiveListenerBus$$anon$1.run(LiveListenerBus.scala:77)

Solution

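The Spark program runs in Spark-on-YARN mode, so Spark writes its job event logs to HDFS. The event logger and the utility class share the same FileSystem instance: FileSystem.get returns a cached object, so calling fs.close() in the utility class also closes it for EventLoggingListener, whose next hflush then fails with "Filesystem closed". Removing fs.close() restores normal behavior.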
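If the utility class should still release what it opens, one option is to give it a FileSystem of its own. The following is a minimal sketch, not the original author's fix: it assumes the Hadoop client libraries are on the classpath, and the object name PrivateFsSketch and the helper deleteIfExists are hypothetical names introduced only for illustration. It relies on FileSystem.newInstance, which returns an instance that is not shared with other callers, whereas FileSystem.get returns the cached one.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Hypothetical helper object, not part of the original post.
object PrivateFsSketch {

  /** Deletes pathStr recursively using a FileSystem that only this call owns. */
  def deleteIfExists(conf: Configuration, pathStr: String): Boolean = {
    // newInstance returns an instance that no other caller holds, so closing it
    // cannot affect the cached instance used by Spark's event logger.
    val fs = FileSystem.newInstance(conf)
    try {
      val path = new Path(pathStr)
      fs.exists(path) && fs.delete(path, true)
    } finally {
      fs.close()
    }
  }

  def main(args: Array[String]): Unit = {
    val conf = new Configuration()

    // FileSystem.get hands out one cached instance per scheme, authority and user.
    val sharedA = FileSystem.get(conf)
    val sharedB = FileSystem.get(conf)
    println(s"get returns the same object: ${sharedA eq sharedB}")

    // newInstance creates a separate object that is safe to close.
    val own = FileSystem.newInstance(conf)
    println(s"newInstance returns a different object: ${!(own eq sharedA)}")
    own.close()
  }
}
```

Another route is to set the Hadoop property fs.hdfs.impl.disable.cache to true, which makes every FileSystem.get call for hdfs:// create a fresh instance. That costs a new client connection per call, so a private instance created only where it is needed is usually the lighter choice.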

Please credit the source when reposting: 雪后西塘 » Listener EventLoggingListener threw an exception java.io.IOException
