前言
Flink程序在Debug的过程中为了方便看到代码执行详细和相关指标,支持ExecutionEnvironment创建带有UI的LocalEnvironment:
1 2 |
ExecutionEnvironment environmentWithWebUI = ExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration()); |
但是程序在IDEA真正执行起来以后如果打开http://127.0.0.1:8081/#/overview并不能看到真正的Dashboard,以一个WordCount示例代码为例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 |
package com.w3sun.flink.api; import org.apache.flink.api.common.accumulators.LongCounter; import org.apache.flink.api.common.functions.RichFlatMapFunction; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.operators.DataSource; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.configuration.Configuration; import org.apache.flink.shaded.guava18.com.google.common.base.Strings; import org.apache.flink.util.Collector; /** * @author: w3sun * @date: 2018/10/1 09:12 * @description: */ public class LambdaAPI { public static void main(String[] args) throws Exception { ParameterTool params = ParameterTool.fromArgs(args); String input = params.get("input", "input"); ExecutionEnvironment environmentWithWebUI = ExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration()); DataSource<String> source = environmentWithWebUI.readTextFile(input); source.flatMap(new RichFlatMapFunction<String, WordAndCount>() { private static final long serialVersionUID = 8745781956542998958L; private LongCounter numberLines = new LongCounter(); @Override public void open(Configuration parameters) throws Exception { getRuntimeContext().addAccumulator("num-lines", this.numberLines); } @Override public void flatMap(String line, Collector<WordAndCount> out) throws Exception { String[] words = line.split("\\s+"); for (String word : words) { if (!Strings.isNullOrEmpty(word)) { numberLines.add(1L); out.collect(new WordAndCount(word, 1L)); } } } }).returns(TypeInformation.of(WordAndCount.class)).print(); } public static class WordAndCount { private String word; private Long count; public WordAndCount() { //empty the constructor. } public WordAndCount(String word, Long count) { this.word = word; this.count = count; } public String getWord() { return word; } public void setWord(String word) { this.word = word; } public Long getCount() { return count; } public void setCount(Long count) { this.count = count; } @Override public String toString() { return "WordAndCount{" + "word='" + word + '\'' + ", count=" + count + '}'; } } } |
用户可以在org.apache.flink.api.java.ExecutionEnvironment#createLocalEnvironmentWithWebUI中找到封装了WebUI相关的设置信息:
1 2 3 4 5 6 7 |
@PublicEvolving public static ExecutionEnvironment createLocalEnvironmentWithWebUI(Configuration conf) { Preconditions.checkNotNull(conf, "conf"); conf.setBoolean("local.start-webserver", true); if (!conf.contains(RestOptions.PORT)) { conf.setInteger(RestOptions.PORT, (Integer)RestOptions.PORT.defaultValue()); } |
继续挖掘到org.apache.flink.configuration.RestOptions发现:
1 2 3 4 5 6 7 8 |
/** * The port that the server listens on / the client connects to. */ public static final ConfigOption<Integer> PORT = key("rest.port") .defaultValue(8081) .withDeprecatedKeys("web.port") .withDescription("The port that the server listens on / the client connects to."); |
默认端口为8081,但是当用户打开http://127.0.0.1:8081/#/overview,出现以前错误:
这是因为缺少了flink-web jar包,将相关依赖添加进pom.xml即可:
1 2 3 4 5 |
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-runtime-web_2.11</artifactId> <version>${flink.version}</version> </dependency> |
© 著作权归作者所有
文章评论(0)