最新消息:天气越来越冷,记得加一件厚衣裳

Flink IDEA中执行的WebUI

Flink w3sun 3856浏览 0评论

Flink程序在Debug的过程中为了方便看到代码执行详细和相关指标,支持ExecutionEnvironment创建带有UI的LocalEnvironment:

ExecutionEnvironment environmentWithWebUI = ExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

但是程序在IDEA真正执行起来以后如果打开http://127.0.0.1:8081/#/overview并不能看到真正的Dashboard,以一个WordCount示例代码为例:

package com.w3sun.flink.api;

import org.apache.flink.api.common.accumulators.LongCounter;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.guava18.com.google.common.base.Strings;
import org.apache.flink.util.Collector;

/**
 * @author: w3sun
 * @date: 2018/10/1 09:12
 * @description:
 */
public class LambdaAPI {
  public static void main(String[] args) throws Exception {
    ParameterTool params = ParameterTool.fromArgs(args);
    String input = params.get("input", "input");
    ExecutionEnvironment environmentWithWebUI =
        ExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
    DataSource<String> source = environmentWithWebUI.readTextFile(input);
    source.flatMap(new RichFlatMapFunction<String, WordAndCount>() {
      private static final long serialVersionUID = 8745781956542998958L;
      private LongCounter numberLines = new LongCounter();

      @Override
      public void open(Configuration parameters) throws Exception {
        getRuntimeContext().addAccumulator("num-lines", this.numberLines);
      }

      @Override
      public void flatMap(String line, Collector<WordAndCount> out) throws Exception {
        String[] words = line.split("\\s+");
        for (String word : words) {
          if (!Strings.isNullOrEmpty(word)) {
            numberLines.add(1L);
            out.collect(new WordAndCount(word, 1L));
          }
        }
      }
    }).returns(TypeInformation.of(WordAndCount.class)).print();
  }

  public static class WordAndCount {
    private String word;
    private Long count;

    public WordAndCount() {
      //empty the constructor.
    }

    public WordAndCount(String word, Long count) {
      this.word = word;
      this.count = count;
    }

    public String getWord() {
      return word;
    }

    public void setWord(String word) {
      this.word = word;
    }

    public Long getCount() {
      return count;
    }

    public void setCount(Long count) {
      this.count = count;
    }

    @Override
    public String toString() {
      return "WordAndCount{" +
          "word='" + word + '\'' +
          ", count=" + count +
          '}';
    }
  }
}

用户可以在org.apache.flink.api.java.ExecutionEnvironment#createLocalEnvironmentWithWebUI中找到封装了WebUI相关的设置信息:

@PublicEvolving
  public static ExecutionEnvironment createLocalEnvironmentWithWebUI(Configuration conf) {
    Preconditions.checkNotNull(conf, "conf");
    conf.setBoolean("local.start-webserver", true);
    if (!conf.contains(RestOptions.PORT)) {
      conf.setInteger(RestOptions.PORT, (Integer)RestOptions.PORT.defaultValue());
    }

继续挖掘到org.apache.flink.configuration.RestOptions发现:

/**
 * The port that the server listens on / the client connects to.
 */
public static final ConfigOption<Integer> PORT =
 key("rest.port")
  .defaultValue(8081)
  .withDeprecatedKeys("web.port")
  .withDescription("The port that the server listens on / the client connects to.");

默认端口为8081,但是当用户打开http://127.0.0.1:8081/#/overview,出现以前错误:

这是因为缺少了flink-web jar包,将相关依赖添加进pom.xml即可:

<dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-runtime-web_2.11</artifactId>
   <version>${flink.version}</version>
</dependency>

转载请注明:雪后西塘 » Flink IDEA中执行的WebUI

发表我的评论
取消评论

表情

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址