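When debugging a Flink program, it helps to see execution details and the related metrics. For this purpose, ExecutionEnvironment can create a LocalEnvironment that comes with a web UI: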
ExecutionEnvironment environmentWithWebUI = ExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
However, once the program is actually running in IDEA, opening http://127.0.0.1:8081/#/overview does not show the real Dashboard. Take the following WordCount example:
package com.w3sun.flink.api;

import org.apache.flink.api.common.accumulators.LongCounter;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.guava18.com.google.common.base.Strings;
import org.apache.flink.util.Collector;

/**
 * @author: w3sun
 * @date: 2018/10/1 09:12
 * @description:
 */
public class LambdaAPI {
  public static void main(String[] args) throws Exception {
    // Read the input path from --input, falling back to "input".
    ParameterTool params = ParameterTool.fromArgs(args);
    String input = params.get("input", "input");

    // Local environment with the web UI enabled.
    ExecutionEnvironment environmentWithWebUI =
        ExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
    DataSource<String> source = environmentWithWebUI.readTextFile(input);

    source.flatMap(new RichFlatMapFunction<String, WordAndCount>() {
      private static final long serialVersionUID = 8745781956542998958L;
      private LongCounter numberLines = new LongCounter();

      @Override
      public void open(Configuration parameters) throws Exception {
        getRuntimeContext().addAccumulator("num-lines", this.numberLines);
      }

      @Override
      public void flatMap(String line, Collector<WordAndCount> out) throws Exception {
        // Count each input line once, so the accumulator matches its name "num-lines".
        numberLines.add(1L);
        String[] words = line.split("\\s+");
        for (String word : words) {
          if (!Strings.isNullOrEmpty(word)) {
            out.collect(new WordAndCount(word, 1L));
          }
        }
      }
    }).returns(TypeInformation.of(WordAndCount.class)).print();
  }

  public static class WordAndCount {
    private String word;
    private Long count;

    public WordAndCount() {
      //empty the constructor.
    }

    public WordAndCount(String word, Long count) {
      this.word = word;
      this.count = count;
    }

    public String getWord() {
      return word;
    }

    public void setWord(String word) {
      this.word = word;
    }

    public Long getCount() {
      return count;
    }

    public void setCount(Long count) {
      this.count = count;
    }

    @Override
    public String toString() {
      return "WordAndCount{" + "word='" + word + '\'' + ", count=" + count + '}';
    }
  }
}
The WebUI-related settings are wrapped inside org.apache.flink.api.java.ExecutionEnvironment#createLocalEnvironmentWithWebUI:
@PublicEvolving
public static ExecutionEnvironment createLocalEnvironmentWithWebUI(Configuration conf) {
  Preconditions.checkNotNull(conf, "conf");
  conf.setBoolean("local.start-webserver", true);
  if (!conf.contains(RestOptions.PORT)) {
    conf.setInteger(RestOptions.PORT, (Integer)RestOptions.PORT.defaultValue());
  }
  // ... (remainder of the method omitted)
}
Digging further into org.apache.flink.configuration.RestOptions shows:
/**
 * The port that the server listens on / the client connects to.
 */
public static final ConfigOption<Integer> PORT =
    key("rest.port")
        .defaultValue(8081)
        .withDeprecatedKeys("web.port")
        .withDescription("The port that the server listens on / the client connects to.");
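The lookup order implied by this option definition can be sketched in plain Java. This is a simplified model, not Flink's implementation: the class RestPortLookup and the method resolvePort are hypothetical names, and a Map stands in for Flink's Configuration. It only illustrates that an explicit "rest.port" wins, the deprecated "web.port" is used next, and 8081 is the fallback.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of how a port option with a deprecated key
// and a default value is resolved. A Map stands in for Flink's Configuration.
public class RestPortLookup {
  static final String KEY = "rest.port";
  static final String DEPRECATED_KEY = "web.port";
  static final int DEFAULT_PORT = 8081;

  public static int resolvePort(Map<String, Integer> conf) {
    if (conf.containsKey(KEY)) {
      return conf.get(KEY);            // current key takes precedence
    }
    if (conf.containsKey(DEPRECATED_KEY)) {
      return conf.get(DEPRECATED_KEY); // fall back to the deprecated key
    }
    return DEFAULT_PORT;               // nothing set: use the default
  }

  public static void main(String[] args) {
    Map<String, Integer> conf = new HashMap<>();
    System.out.println(resolvePort(conf)); // 8081
    conf.put("web.port", 9091);
    System.out.println(resolvePort(conf)); // 9091
    conf.put("rest.port", 8082);
    System.out.println(resolvePort(conf)); // 8082
  }
}
```

In the real API, the equivalent of the last step is setting RestOptions.PORT on the Configuration before passing it to createLocalEnvironmentWithWebUI, which then leaves the explicit value untouched.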
The default port is 8081, yet when the user opens http://127.0.0.1:8081/#/overview, the browser shows an error instead of the Dashboard.
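To look at what the local endpoint returns without a browser, a small probe using only the JDK can help. This is a rough sketch: the class WebUiProbe and the method statusOf are hypothetical names, and the default URL assumes the job is still running and listening on port 8081.

```java
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URI;

// Hypothetical helper: returns the HTTP status code for a URL,
// or -1 if the URL is invalid or nothing answers on that address.
public class WebUiProbe {
  public static int statusOf(String url) {
    try {
      HttpURLConnection conn = (HttpURLConnection) URI.create(url).toURL().openConnection();
      conn.setConnectTimeout(2000);
      conn.setReadTimeout(2000);
      conn.setRequestMethod("GET");
      try {
        return conn.getResponseCode();
      } finally {
        conn.disconnect();
      }
    } catch (IOException | IllegalArgumentException | ClassCastException e) {
      return -1; // connection refused, timeout, or malformed/non-HTTP URL
    }
  }

  public static void main(String[] args) {
    // Assumed default: the local web UI address discussed in this post.
    String url = args.length > 0 ? args[0] : "http://127.0.0.1:8081/";
    System.out.println(url + " -> " + statusOf(url));
  }
}
```

A result of -1 suggests that no server is listening at all, whereas an HTTP error status suggests the REST endpoint is up but the UI content is not being served.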
The error occurs because the Flink web jar (flink-runtime-web) is missing from the classpath. Adding the dependency to pom.xml fixes it:
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-runtime-web_2.11</artifactId>
  <version>${flink.version}</version>
</dependency>
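One way to confirm that the dependency actually reached the IDEA run classpath is to try loading a class from it. The sketch below is a plain-Java check, with WebUiClasspathCheck and isOnClasspath as hypothetical names. The default class name, org.apache.flink.runtime.webmonitor.WebSubmissionExtension, is an assumption about what flink-runtime-web ships; if your Flink version differs, pass another class name from that jar as the first argument.

```java
// Hypothetical helper: checks whether a class can be found on the classpath
// without initializing it.
public class WebUiClasspathCheck {
  public static boolean isOnClasspath(String className) {
    try {
      Class.forName(className, false, WebUiClasspathCheck.class.getClassLoader());
      return true;
    } catch (ClassNotFoundException | LinkageError e) {
      return false;
    }
  }

  public static void main(String[] args) {
    // Assumption: this class lives in flink-runtime-web; adjust for your version.
    String probe = args.length > 0
        ? args[0]
        : "org.apache.flink.runtime.webmonitor.WebSubmissionExtension";
    System.out.println(probe + (isOnClasspath(probe) ? " found" : " NOT found"));
  }
}
```

Run it from the same IDEA module as the job, so that it sees the same dependencies the job does.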
Please credit the source when reposting: 雪后西塘 » Flink WebUI when running in IDEA