The Problem
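I've recently been working on real-time stream processing. While coding, I needed to dump the data written to Redis. After weighing the options, I decided to process the data with Spark Core and dump the results to HDFS. In the Spark program's HDFS utility class, I obtained a FileSystem manually and closed it once the HDFS operations were finished: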
```scala
import java.io.File

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
// Assumed imports: DateTime is taken to be Joda-Time and the logger SLF4J.
import org.joda.time.DateTime
import org.slf4j.LoggerFactory

object HdfsHandler {

  @transient val logger = LoggerFactory.getLogger(this.getClass)
  val config: Configuration = new Configuration()

  /**
    * Get the storage path for current batch data.
    *
    * @param dateTime DateTime
    * @return storage path in String
    */
  def getCurrentPath(dateTime: DateTime): String = {
    var fs: FileSystem = null
    try {
      fs = FileSystem.get(config)
    } catch {
      case e: Exception =>
        logger.error("Get fs:FileSystem error!")
        e.printStackTrace()
    }

    val dateDir: String = getDateDir(dateTime)
    val hour = dateTime.toString(AppConfig.HDFS_DIR_DATE_HOUR_FORMATTER)
    val pathStr = dateDir
      .concat(File.separator)
      .concat(hour)

    try {
      delHdfsFile(fs, pathStr)
    } catch {
      case e: Exception =>
        logger.error("Failed to delete files from the directory where the data will be stored!")
        e.printStackTrace()
    } finally {
      // Closes the FileSystem returned by FileSystem.get; this is the call
      // that triggers the exception described below.
      if (fs != null) fs.close()
    }
    pathStr
  }

  // .......

  /**
    * Delete file on HDFS.
    *
    * @param fs   FileSystem
    * @param elem string path
    */
  private def delHdfsFile(fs: FileSystem, elem: String): Unit = {
    try {
      val path = new Path(elem)
      if (fs.exists(path)) {
        fs.delete(path, true)
      }
    } catch {
      case e: Exception =>
        // Rethrow so the caller's catch block sees the failure
        // (a bare `new Throwable(...)` is created and then discarded).
        throw new RuntimeException("Delete HDFS files Error! codeLocation: HdfsHandler.delHdfsFile", e)
    }
  }
}
```
This is a standard resource-release pattern, yet the job threw a lengthy exception:
```
18/07/10 15:42:18 ERROR LiveListenerBus: Listener EventLoggingListener threw an exception
java.io.IOException: Filesystem closed
	at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:798)
	at org.apache.hadoop.hdfs.DFSOutputStream.flushOrSync(DFSOutputStream.java:1985)
	at org.apache.hadoop.hdfs.DFSOutputStream.hflush(DFSOutputStream.java:1946)
	at org.apache.hadoop.fs.FSDataOutputStream.hflush(FSDataOutputStream.java:130)
	at org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$3.apply(EventLoggingListener.scala:140)
	at org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$3.apply(EventLoggingListener.scala:140)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.EventLoggingListener.logEvent(EventLoggingListener.scala:140)
	at org.apache.spark.scheduler.EventLoggingListener.onJobStart(EventLoggingListener.scala:163)
	at org.apache.spark.scheduler.SparkListenerBus$class.doPostEvent(SparkListenerBus.scala:37)
	at org.apache.spark.scheduler.LiveListenerBus.doPostEvent(LiveListenerBus.scala:36)
	at org.apache.spark.scheduler.LiveListenerBus.doPostEvent(LiveListenerBus.scala:36)
	at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:63)
	at org.apache.spark.scheduler.LiveListenerBus.postToAll(LiveListenerBus.scala:36)
	at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(LiveListenerBus.scala:94)
	at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(LiveListenerBus.scala:79)
	at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(LiveListenerBus.scala:79)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
	at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(LiveListenerBus.scala:78)
	at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1245)
	at org.apache.spark.scheduler.LiveListenerBus$$anon$1.run(LiveListenerBus.scala:77)
```
Solution
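The Spark program runs in Spark-on-YARN mode, and in this setup Spark writes the job's event logs to HDFS (the EventLoggingListener in the stack trace above). FileSystem.get() does not create a new object on every call: it returns an instance cached by scheme, authority, and user and shared within the JVM. So Spark's event logger and our utility class end up using the same FileSystem, and once our code closes it, the event logger's next write fails with "Filesystem closed". Removing fs.close() makes everything work normally again.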
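A minimal sketch of two ways to avoid the problem, assuming hadoop-common is on the classpath. HdfsCleanup, deleteWithSharedFs, and deleteWithPrivateFs are hypothetical names used only for illustration; they are not part of the original HdfsHandler. The first helper matches the fix above: it uses the cached instance and never closes it. The second asks Hadoop for a private, uncached instance via FileSystem.newInstance, which can be closed without affecting Spark's event logger.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Hypothetical helper object for illustration; not the original HdfsHandler.
object HdfsCleanup {

  // Option 1: use the cached FileSystem and never close it.
  // path.getFileSystem(conf) goes through FileSystem.get, which returns the
  // instance cached by scheme, authority and user; Spark's EventLoggingListener
  // may be writing through this very object.
  def deleteWithSharedFs(conf: Configuration, pathStr: String): Boolean = {
    val path = new Path(pathStr)
    val fs = path.getFileSystem(conf)
    // No fs.close() here: closing would close it for every other user too.
    fs.exists(path) && fs.delete(path, true)
  }

  // Option 2: ask for a private, uncached instance, which is safe to close.
  def deleteWithPrivateFs(conf: Configuration, pathStr: String): Boolean = {
    val path = new Path(pathStr)
    val fs = FileSystem.newInstance(path.toUri, conf)
    try {
      fs.exists(path) && fs.delete(path, true)
    } finally {
      fs.close() // releases only this private instance; the cached one stays open
    }
  }
}
```

Option 1 is the simplest and is what the fix above amounts to. Option 2 fits cases where the caller really wants to release resources, at the cost of creating a fresh FileSystem (and, for HDFS, a fresh client) on every call.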