Flink程序在Debug的过程中为了方便看到代码执行详细和相关指标,支持ExecutionEnvironment创建带有UI的LocalEnvironment:
ExecutionEnvironment environmentWithWebUI = ExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
但是程序在IDEA真正执行起来以后如果打开http://127.0.0.1:8081/#/overview并不能看到真正的Dashboard,以一个WordCount示例代码为例:
package com.w3sun.flink.api;
import org.apache.flink.api.common.accumulators.LongCounter;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.guava18.com.google.common.base.Strings;
import org.apache.flink.util.Collector;
/**
* @author: w3sun
* @date: 2018/10/1 09:12
* @description:
*/
public class LambdaAPI {
public static void main(String[] args) throws Exception {
ParameterTool params = ParameterTool.fromArgs(args);
String input = params.get("input", "input");
ExecutionEnvironment environmentWithWebUI =
ExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
DataSource<String> source = environmentWithWebUI.readTextFile(input);
source.flatMap(new RichFlatMapFunction<String, WordAndCount>() {
private static final long serialVersionUID = 8745781956542998958L;
private LongCounter numberLines = new LongCounter();
@Override
public void open(Configuration parameters) throws Exception {
getRuntimeContext().addAccumulator("num-lines", this.numberLines);
}
@Override
public void flatMap(String line, Collector<WordAndCount> out) throws Exception {
String[] words = line.split("\\s+");
for (String word : words) {
if (!Strings.isNullOrEmpty(word)) {
numberLines.add(1L);
out.collect(new WordAndCount(word, 1L));
}
}
}
}).returns(TypeInformation.of(WordAndCount.class)).print();
}
public static class WordAndCount {
private String word;
private Long count;
public WordAndCount() {
//empty the constructor.
}
public WordAndCount(String word, Long count) {
this.word = word;
this.count = count;
}
public String getWord() {
return word;
}
public void setWord(String word) {
this.word = word;
}
public Long getCount() {
return count;
}
public void setCount(Long count) {
this.count = count;
}
@Override
public String toString() {
return "WordAndCount{" +
"word='" + word + '\'' +
", count=" + count +
'}';
}
}
}
用户可以在org.apache.flink.api.java.ExecutionEnvironment#createLocalEnvironmentWithWebUI中找到封装了WebUI相关的设置信息:
@PublicEvolving
public static ExecutionEnvironment createLocalEnvironmentWithWebUI(Configuration conf) {
Preconditions.checkNotNull(conf, "conf");
conf.setBoolean("local.start-webserver", true);
if (!conf.contains(RestOptions.PORT)) {
conf.setInteger(RestOptions.PORT, (Integer)RestOptions.PORT.defaultValue());
}
继续挖掘到org.apache.flink.configuration.RestOptions发现:
/**
* The port that the server listens on / the client connects to.
*/
public static final ConfigOption<Integer> PORT =
key("rest.port")
.defaultValue(8081)
.withDeprecatedKeys("web.port")
.withDescription("The port that the server listens on / the client connects to.");
默认端口为8081,但是当用户打开http://127.0.0.1:8081/#/overview,出现以前错误:
这是因为缺少了flink-web jar包,将相关依赖添加进pom.xml即可:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
转载请注明:雪后西塘 » Flink IDEA中执行的WebUI

