Background
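I have recently been working on real-time stream processing, and while coding I needed to dump the data written to Redis. After weighing the options, I decided to process the data with Spark Core and dump the results to HDFS. In the Spark program's HDFS utility class, I obtained a FileSystem manually and closed it once the HDFS operations were done: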
// Imports added for readability; DateTime is assumed to be Joda-Time (the original omits its imports)
import java.io.File

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.joda.time.DateTime
import org.slf4j.LoggerFactory

object HdfsHandler {
  @transient val logger = LoggerFactory.getLogger(this.getClass)
  val config: Configuration = new Configuration()

  /**
   * Get the storage path for current batch data.
   *
   * @param dateTime DateTime
   * @return storage path in String
   */
  def getCurrentPath(dateTime: DateTime): String = {
    var fs: FileSystem = null
    try {
      fs = FileSystem.get(config)
    } catch {
      case e: Exception =>
        logger.error("Get fs:FileSystem error!", e)
    }
    val dateDir: String = getDateDir(dateTime)
    val hour = dateTime.toString(AppConfig.HDFS_DIR_DATE_HOUR_FORMATTER)
    val pathStr = dateDir
      .concat(File.separator)
      .concat(hour)
    try {
      // Clear out the directory this batch will be written to
      delHdfsFile(fs, pathStr)
    } catch {
      case e: Exception =>
        logger.error("Failed to delete files from the directory this batch will be stored in!", e)
    } finally {
      // The "standard" release step: this close() is what later triggers the exception
      if (fs != null) fs.close()
    }
    pathStr
  }

  // .......

  /**
   * Delete file on HDFS.
   *
   * @param fs FileSystem
   * @param elem string path
   */
  private def delHdfsFile(fs: FileSystem, elem: String): Unit = {
    try {
      val path = new Path(elem)
      if (fs.exists(path)) {
        fs.delete(path, true) // recursive delete
      }
    } catch {
      case e: Exception =>
        // Rethrow with context so the caller can log it
        throw new Exception("Delete HDFS files Error! codeLocation: HdfsHandler.delHdfsFile", e)
    }
  }
}
This is the standard way to release a resource, yet it threw a big exception:
18/07/10 15:42:18 ERROR LiveListenerBus: Listener EventLoggingListener threw an exception
java.io.IOException: Filesystem closed
    at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:798)
    at org.apache.hadoop.hdfs.DFSOutputStream.flushOrSync(DFSOutputStream.java:1985)
    at org.apache.hadoop.hdfs.DFSOutputStream.hflush(DFSOutputStream.java:1946)
    at org.apache.hadoop.fs.FSDataOutputStream.hflush(FSDataOutputStream.java:130)
    at org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$3.apply(EventLoggingListener.scala:140)
    at org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$3.apply(EventLoggingListener.scala:140)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.EventLoggingListener.logEvent(EventLoggingListener.scala:140)
    at org.apache.spark.scheduler.EventLoggingListener.onJobStart(EventLoggingListener.scala:163)
    at org.apache.spark.scheduler.SparkListenerBus$class.doPostEvent(SparkListenerBus.scala:37)
    at org.apache.spark.scheduler.LiveListenerBus.doPostEvent(LiveListenerBus.scala:36)
    at org.apache.spark.scheduler.LiveListenerBus.doPostEvent(LiveListenerBus.scala:36)
    at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:63)
    at org.apache.spark.scheduler.LiveListenerBus.postToAll(LiveListenerBus.scala:36)
    at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(LiveListenerBus.scala:94)
    at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(LiveListenerBus.scala:79)
    at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(LiveListenerBus.scala:79)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
    at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(LiveListenerBus.scala:78)
    at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1245)
    at org.apache.spark.scheduler.LiveListenerBus$$anon$1.run(LiveListenerBus.scala:77)
Solution
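The Spark program runs in Spark-on-YARN mode, and in this setup Spark writes its job event log to HDFS through EventLoggingListener. FileSystem.get(config) does not create a new object: it returns an instance from Hadoop's process-wide FileSystem cache, so EventLoggingListener and HdfsHandler end up sharing the same FileSystem. Once HdfsHandler calls fs.close(), the listener's next hflush fails the DFSClient.checkOpen check and throws "Filesystem closed". Removing fs.close() makes everything work normally again.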
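A minimal sketch of two safe ways to write the delete step, assuming hadoop-client is on the classpath (as it is in any Spark-on-YARN job). HdfsCleanup, deleteIfExists and deleteIfExistsPrivate are hypothetical names used only for illustration. The first variant follows the fix above: take the cached instance from FileSystem.get and never close it. The second is an alternative that the original post does not use: FileSystem.newInstance bypasses the cache and returns a private instance, so closing it does not affect the one EventLoggingListener holds.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Hypothetical helper illustrating the fix; not the original HdfsHandler.
object HdfsCleanup {
  private val conf = new Configuration()

  // Variant 1 (the post's fix): FileSystem.get returns the process-wide cached
  // instance, which Spark's EventLoggingListener may also be using. Do not close it.
  def deleteIfExists(pathStr: String): Boolean = {
    val fs = FileSystem.get(conf)
    val path = new Path(pathStr)
    fs.exists(path) && fs.delete(path, true) // true = recursive
  }

  // Variant 2 (alternative, not from the post): newInstance skips the cache,
  // so this instance is owned by the caller alone and must be closed by it.
  def deleteIfExistsPrivate(pathStr: String): Boolean = {
    val fs = FileSystem.newInstance(conf)
    try {
      val path = new Path(pathStr)
      fs.exists(path) && fs.delete(path, true)
    } finally {
      fs.close() // closes only this private instance, not the shared one
    }
  }
}
```

With variant 1 the shared instance stays open for the life of the driver; Hadoop's shutdown hook closes cached FileSystem instances when the JVM exits, so no explicit close is needed. Variant 2 costs a new client per call, so it suits infrequent operations such as a once-per-batch directory cleanup.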
When reposting, please credit: 雪后西塘 » Listener EventLoggingListener threw an exception java.io.IOException