最新消息:天气越来越冷,记得加一件厚衣裳

Listener EventLoggingListener threw an exception java.io.IOException

Spark w3sun 5745浏览 0评论

问题背景

最近在做实时流计算,Coding过程中需要对写入到Redis中的数据进行Dump,经过综合考量决定采用Spark Core进行数据操作,并将结果数据Dump到HDFS上。Spark程序操作HDFS的工具类中,手动获取了FileSystem,执行完HDFS相关操作后对FileSystem进行close:

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
object HdfsHandler {
@transient val logger = LoggerFactory.getLogger(this.getClass)
val config: Configuration = new Configuration()
/**
* Get the storage path for current batch data.
*
* @param dateTime DateTime
* @return storage path in String
*/
def getCurrentPath(dateTime: DateTime): String = {
var fs: FileSystem = null
try {
fs = FileSystem.get(config)
} catch {
case e: Exception => {
logger.error("Get fs:FileSystem error!")
e.printStackTrace()
}
}
val dateDir: String = getDateDir(dateTime)
val hour = dateTime.toString(AppConfig.HDFS_DIR_DATE_HOUR_FORMATTER)
val pathStr = dateDir
.concat(File.separator)
.concat(hour)
try {
delHdfsFile(fs, pathStr)
} catch {
case e: Exception => {
logger.error("Delete files from the directory to which the data will be storaged to failed!")
e.printStackTrace()
}
}finally{
fs.close()
}
pathStr
}
.......
/**
* Delete file on HDFS.
*
* @param fs FileSystem
* @param elem string path
*/
private def delHdfsFile(fs: FileSystem, elem: String): Unit = {
try {
val path = new Path(elem)
if (fs.exists(path)) {
fs.delete(path, true)
}
} catch {
case e: Exception => {
e.printStackTrace()
new Throwable("Delete HDFS files Error! codeLocation: HdfsHandler.delHdfsFile")
}
}
}
}
object HdfsHandler { @transient val logger = LoggerFactory.getLogger(this.getClass) val config: Configuration = new Configuration() /** * Get the storage path for current batch data. * * @param dateTime DateTime * @return storage path in String */ def getCurrentPath(dateTime: DateTime): String = { var fs: FileSystem = null try { fs = FileSystem.get(config) } catch { case e: Exception => { logger.error("Get fs:FileSystem error!") e.printStackTrace() } } val dateDir: String = getDateDir(dateTime) val hour = dateTime.toString(AppConfig.HDFS_DIR_DATE_HOUR_FORMATTER) val pathStr = dateDir .concat(File.separator) .concat(hour) try { delHdfsFile(fs, pathStr) } catch { case e: Exception => { logger.error("Delete files from the directory to which the data will be storaged to failed!") e.printStackTrace() } }finally{ fs.close() } pathStr } ....... /** * Delete file on HDFS. * * @param fs FileSystem * @param elem string path */ private def delHdfsFile(fs: FileSystem, elem: String): Unit = { try { val path = new Path(elem) if (fs.exists(path)) { fs.delete(path, true) } } catch { case e: Exception => { e.printStackTrace() new Throwable("Delete HDFS files Error! codeLocation: HdfsHandler.delHdfsFile") } } } }
object HdfsHandler {

  @transient val logger = LoggerFactory.getLogger(this.getClass)
  val config: Configuration = new Configuration()
  /**
    * Get the storage path for current batch data.
    *
    * @param dateTime DateTime
    * @return storage path in String
    */
  def getCurrentPath(dateTime: DateTime): String = {
    var fs: FileSystem = null
    try {
      fs = FileSystem.get(config)
    } catch {
      case e: Exception => {
        logger.error("Get fs:FileSystem error!")
        e.printStackTrace()
      }
    }
    val dateDir: String = getDateDir(dateTime)
    val hour = dateTime.toString(AppConfig.HDFS_DIR_DATE_HOUR_FORMATTER)
    val pathStr = dateDir
      .concat(File.separator)
      .concat(hour)
    try {
      delHdfsFile(fs, pathStr)
    } catch {
      case e: Exception => {
        logger.error("Delete files from the directory to which the data will be storaged to failed!")
        e.printStackTrace()
      }
    }finally{
        fs.close()
   }
    pathStr
  }
.......
  /**
    * Delete file on HDFS.
    *
    * @param fs   FileSystem
    * @param elem string path
    */
  private def delHdfsFile(fs: FileSystem, elem: String): Unit = {
    try {
      val path = new Path(elem)
      if (fs.exists(path)) {
        fs.delete(path, true)
      }
    } catch {
      case e: Exception => {
        e.printStackTrace()
        new Throwable("Delete HDFS files Error! codeLocation: HdfsHandler.delHdfsFile")
      }
    }
  }
}

本来是标准资源释放流程,但是却抛出了一个大大的异常:

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
18/07/10 15:42:18 ERROR LiveListenerBus: Listener EventLoggingListener threw an exception
java.io.IOException: Filesystem closed
at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:798)
at org.apache.hadoop.hdfs.DFSOutputStream.flushOrSync(DFSOutputStream.java:1985)
at org.apache.hadoop.hdfs.DFSOutputStream.hflush(DFSOutputStream.java:1946)
at org.apache.hadoop.fs.FSDataOutputStream.hflush(FSDataOutputStream.java:130)
at org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$3.apply(EventLoggingListener.scala:140)
at org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$3.apply(EventLoggingListener.scala:140)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.EventLoggingListener.logEvent(EventLoggingListener.scala:140)
at org.apache.spark.scheduler.EventLoggingListener.onJobStart(EventLoggingListener.scala:163)
at org.apache.spark.scheduler.SparkListenerBus$class.doPostEvent(SparkListenerBus.scala:37)
at org.apache.spark.scheduler.LiveListenerBus.doPostEvent(LiveListenerBus.scala:36)
at org.apache.spark.scheduler.LiveListenerBus.doPostEvent(LiveListenerBus.scala:36)
at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:63)
at org.apache.spark.scheduler.LiveListenerBus.postToAll(LiveListenerBus.scala:36)
at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(LiveListenerBus.scala:94)
at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(LiveListenerBus.scala:79)
at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(LiveListenerBus.scala:79)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(LiveListenerBus.scala:78)
at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1245)
at org.apache.spark.scheduler.LiveListenerBus$$anon$1.run(LiveListenerBus.scala:77)
18/07/10 15:42:18 ERROR LiveListenerBus: Listener EventLoggingListener threw an exception java.io.IOException: Filesystem closed at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:798) at org.apache.hadoop.hdfs.DFSOutputStream.flushOrSync(DFSOutputStream.java:1985) at org.apache.hadoop.hdfs.DFSOutputStream.hflush(DFSOutputStream.java:1946) at org.apache.hadoop.fs.FSDataOutputStream.hflush(FSDataOutputStream.java:130) at org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$3.apply(EventLoggingListener.scala:140) at org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$3.apply(EventLoggingListener.scala:140) at scala.Option.foreach(Option.scala:257) at org.apache.spark.scheduler.EventLoggingListener.logEvent(EventLoggingListener.scala:140) at org.apache.spark.scheduler.EventLoggingListener.onJobStart(EventLoggingListener.scala:163) at org.apache.spark.scheduler.SparkListenerBus$class.doPostEvent(SparkListenerBus.scala:37) at org.apache.spark.scheduler.LiveListenerBus.doPostEvent(LiveListenerBus.scala:36) at org.apache.spark.scheduler.LiveListenerBus.doPostEvent(LiveListenerBus.scala:36) at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:63) at org.apache.spark.scheduler.LiveListenerBus.postToAll(LiveListenerBus.scala:36) at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(LiveListenerBus.scala:94) at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(LiveListenerBus.scala:79) at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(LiveListenerBus.scala:79) at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(LiveListenerBus.scala:78) at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1245) at org.apache.spark.scheduler.LiveListenerBus$$anon$1.run(LiveListenerBus.scala:77)
18/07/10 15:42:18 ERROR LiveListenerBus: Listener EventLoggingListener threw an exception
java.io.IOException: Filesystem closed
    at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:798)
    at org.apache.hadoop.hdfs.DFSOutputStream.flushOrSync(DFSOutputStream.java:1985)
    at org.apache.hadoop.hdfs.DFSOutputStream.hflush(DFSOutputStream.java:1946)
    at org.apache.hadoop.fs.FSDataOutputStream.hflush(FSDataOutputStream.java:130)
    at org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$3.apply(EventLoggingListener.scala:140)
    at org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$3.apply(EventLoggingListener.scala:140)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.EventLoggingListener.logEvent(EventLoggingListener.scala:140)
    at org.apache.spark.scheduler.EventLoggingListener.onJobStart(EventLoggingListener.scala:163)
    at org.apache.spark.scheduler.SparkListenerBus$class.doPostEvent(SparkListenerBus.scala:37)
    at org.apache.spark.scheduler.LiveListenerBus.doPostEvent(LiveListenerBus.scala:36)
    at org.apache.spark.scheduler.LiveListenerBus.doPostEvent(LiveListenerBus.scala:36)
    at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:63)
    at org.apache.spark.scheduler.LiveListenerBus.postToAll(LiveListenerBus.scala:36)
    at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(LiveListenerBus.scala:94)
    at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(LiveListenerBus.scala:79)
    at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(LiveListenerBus.scala:79)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
    at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(LiveListenerBus.scala:78)
    at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1245)
    at org.apache.spark.scheduler.LiveListenerBus$$anon$1.run(LiveListenerBus.scala:77)

解决方法

由于Spark程序是以Spark-On-Yarn模式运行,Spark会把job日志是写在HDFS上。他们使用的是同一个FileSystem,从而导致异常。将fs.close()去掉就会恢复正常。

转载请注明:雪后西塘 » Listener EventLoggingListener threw an exception java.io.IOException

发表我的评论
取消评论

表情

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址