最新消息:天气越来越冷,记得加一件厚衣裳

Flink IDEA中执行的WebUI

Flink w3sun 4683浏览 0评论

Flink程序在Debug的过程中为了方便看到代码执行详细和相关指标,支持ExecutionEnvironment创建带有UI的LocalEnvironment:

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
ExecutionEnvironment environmentWithWebUI = ExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
ExecutionEnvironment environmentWithWebUI = ExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
ExecutionEnvironment environmentWithWebUI = ExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

但是程序在IDEA真正执行起来以后如果打开http://127.0.0.1:8081/#/overview并不能看到真正的Dashboard,以一个WordCount示例代码为例:

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
package com.w3sun.flink.api;
import org.apache.flink.api.common.accumulators.LongCounter;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.guava18.com.google.common.base.Strings;
import org.apache.flink.util.Collector;
/**
* @author: w3sun
* @date: 2018/10/1 09:12
* @description:
*/
public class LambdaAPI {
public static void main(String[] args) throws Exception {
ParameterTool params = ParameterTool.fromArgs(args);
String input = params.get("input", "input");
ExecutionEnvironment environmentWithWebUI =
ExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
DataSource<String> source = environmentWithWebUI.readTextFile(input);
source.flatMap(new RichFlatMapFunction<String, WordAndCount>() {
private static final long serialVersionUID = 8745781956542998958L;
private LongCounter numberLines = new LongCounter();
@Override
public void open(Configuration parameters) throws Exception {
getRuntimeContext().addAccumulator("num-lines", this.numberLines);
}
@Override
public void flatMap(String line, Collector<WordAndCount> out) throws Exception {
String[] words = line.split("\\s+");
for (String word : words) {
if (!Strings.isNullOrEmpty(word)) {
numberLines.add(1L);
out.collect(new WordAndCount(word, 1L));
}
}
}
}).returns(TypeInformation.of(WordAndCount.class)).print();
}
public static class WordAndCount {
private String word;
private Long count;
public WordAndCount() {
//empty the constructor.
}
public WordAndCount(String word, Long count) {
this.word = word;
this.count = count;
}
public String getWord() {
return word;
}
public void setWord(String word) {
this.word = word;
}
public Long getCount() {
return count;
}
public void setCount(Long count) {
this.count = count;
}
@Override
public String toString() {
return "WordAndCount{" +
"word='" + word + '\'' +
", count=" + count +
'}';
}
}
}
package com.w3sun.flink.api; import org.apache.flink.api.common.accumulators.LongCounter; import org.apache.flink.api.common.functions.RichFlatMapFunction; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.operators.DataSource; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.configuration.Configuration; import org.apache.flink.shaded.guava18.com.google.common.base.Strings; import org.apache.flink.util.Collector; /** * @author: w3sun * @date: 2018/10/1 09:12 * @description: */ public class LambdaAPI { public static void main(String[] args) throws Exception { ParameterTool params = ParameterTool.fromArgs(args); String input = params.get("input", "input"); ExecutionEnvironment environmentWithWebUI = ExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration()); DataSource<String> source = environmentWithWebUI.readTextFile(input); source.flatMap(new RichFlatMapFunction<String, WordAndCount>() { private static final long serialVersionUID = 8745781956542998958L; private LongCounter numberLines = new LongCounter(); @Override public void open(Configuration parameters) throws Exception { getRuntimeContext().addAccumulator("num-lines", this.numberLines); } @Override public void flatMap(String line, Collector<WordAndCount> out) throws Exception { String[] words = line.split("\\s+"); for (String word : words) { if (!Strings.isNullOrEmpty(word)) { numberLines.add(1L); out.collect(new WordAndCount(word, 1L)); } } } }).returns(TypeInformation.of(WordAndCount.class)).print(); } public static class WordAndCount { private String word; private Long count; public WordAndCount() { //empty the constructor. } public WordAndCount(String word, Long count) { this.word = word; this.count = count; } public String getWord() { return word; } public void setWord(String word) { this.word = word; } public Long getCount() { return count; } public void setCount(Long count) { this.count = count; } @Override public String toString() { return "WordAndCount{" + "word='" + word + '\'' + ", count=" + count + '}'; } } }
package com.w3sun.flink.api;

import org.apache.flink.api.common.accumulators.LongCounter;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.guava18.com.google.common.base.Strings;
import org.apache.flink.util.Collector;

/**
 * @author: w3sun
 * @date: 2018/10/1 09:12
 * @description:
 */
public class LambdaAPI {
  public static void main(String[] args) throws Exception {
    ParameterTool params = ParameterTool.fromArgs(args);
    String input = params.get("input", "input");
    ExecutionEnvironment environmentWithWebUI =
        ExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
    DataSource<String> source = environmentWithWebUI.readTextFile(input);
    source.flatMap(new RichFlatMapFunction<String, WordAndCount>() {
      private static final long serialVersionUID = 8745781956542998958L;
      private LongCounter numberLines = new LongCounter();

      @Override
      public void open(Configuration parameters) throws Exception {
        getRuntimeContext().addAccumulator("num-lines", this.numberLines);
      }

      @Override
      public void flatMap(String line, Collector<WordAndCount> out) throws Exception {
        String[] words = line.split("\\s+");
        for (String word : words) {
          if (!Strings.isNullOrEmpty(word)) {
            numberLines.add(1L);
            out.collect(new WordAndCount(word, 1L));
          }
        }
      }
    }).returns(TypeInformation.of(WordAndCount.class)).print();
  }

  public static class WordAndCount {
    private String word;
    private Long count;

    public WordAndCount() {
      //empty the constructor.
    }

    public WordAndCount(String word, Long count) {
      this.word = word;
      this.count = count;
    }

    public String getWord() {
      return word;
    }

    public void setWord(String word) {
      this.word = word;
    }

    public Long getCount() {
      return count;
    }

    public void setCount(Long count) {
      this.count = count;
    }

    @Override
    public String toString() {
      return "WordAndCount{" +
          "word='" + word + '\'' +
          ", count=" + count +
          '}';
    }
  }
}

用户可以在org.apache.flink.api.java.ExecutionEnvironment#createLocalEnvironmentWithWebUI中找到封装了WebUI相关的设置信息:

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
@PublicEvolving
public static ExecutionEnvironment createLocalEnvironmentWithWebUI(Configuration conf) {
Preconditions.checkNotNull(conf, "conf");
conf.setBoolean("local.start-webserver", true);
if (!conf.contains(RestOptions.PORT)) {
conf.setInteger(RestOptions.PORT, (Integer)RestOptions.PORT.defaultValue());
}
@PublicEvolving public static ExecutionEnvironment createLocalEnvironmentWithWebUI(Configuration conf) { Preconditions.checkNotNull(conf, "conf"); conf.setBoolean("local.start-webserver", true); if (!conf.contains(RestOptions.PORT)) { conf.setInteger(RestOptions.PORT, (Integer)RestOptions.PORT.defaultValue()); }
@PublicEvolving
  public static ExecutionEnvironment createLocalEnvironmentWithWebUI(Configuration conf) {
    Preconditions.checkNotNull(conf, "conf");
    conf.setBoolean("local.start-webserver", true);
    if (!conf.contains(RestOptions.PORT)) {
      conf.setInteger(RestOptions.PORT, (Integer)RestOptions.PORT.defaultValue());
    }

继续挖掘到org.apache.flink.configuration.RestOptions发现:

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
/**
* The port that the server listens on / the client connects to.
*/
public static final ConfigOption<Integer> PORT =
key("rest.port")
.defaultValue(8081)
.withDeprecatedKeys("web.port")
.withDescription("The port that the server listens on / the client connects to.");
/** * The port that the server listens on / the client connects to. */ public static final ConfigOption<Integer> PORT = key("rest.port") .defaultValue(8081) .withDeprecatedKeys("web.port") .withDescription("The port that the server listens on / the client connects to.");
/**
 * The port that the server listens on / the client connects to.
 */
public static final ConfigOption<Integer> PORT =
 key("rest.port")
  .defaultValue(8081)
  .withDeprecatedKeys("web.port")
  .withDescription("The port that the server listens on / the client connects to.");

默认端口为8081,但是当用户打开http://127.0.0.1:8081/#/overview,出现以前错误:

这是因为缺少了flink-web jar包,将相关依赖添加进pom.xml即可:

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-runtime-web_2.11</artifactId> <version>${flink.version}</version> </dependency>
<dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-runtime-web_2.11</artifactId>
   <version>${flink.version}</version>
</dependency>

转载请注明:雪后西塘 » Flink IDEA中执行的WebUI

发表我的评论
取消评论

表情

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址