
A look at the split operation of Flink's DataStream

This article takes a look at the split operation of Flink's DataStream.

Example

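import java.util.ArrayList;
import java.util.List;

import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.SplitStream;

// someDataStream is an existing DataStream<Integer>
SplitStream<Integer> split = someDataStream.split(new OutputSelector<Integer>() {
    @Override
    public Iterable<String> select(Integer value) {
        // tag each element with exactly one output name
        List<String> output = new ArrayList<String>();
        if (value % 2 == 0) {
            output.add("even");
        } else {
            output.add("odd");
        }
        return output;
    }
});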
 
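  • This example splits the dataStream into two logical streams: elements for which select returns "even" belong to the outputName even, all other elements to the outputName odd.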
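The routing decision in this example can be reproduced without Flink. The following is a minimal plain-Java sketch that assumes only the JDK: SplitExampleModel, select and route are hypothetical names, and the local loop merely stands in for what the running job does per element. It applies the same even/odd logic to a small list and groups each value under every name the selector returns.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical, Flink-free model of the even/odd split in the example above.
class SplitExampleModel {

    // Same logic as the OutputSelector in the example: tag each value "even" or "odd".
    static Iterable<String> select(Integer value) {
        List<String> output = new ArrayList<>();
        if (value % 2 == 0) {
            output.add("even");
        } else {
            output.add("odd");
        }
        return output;
    }

    // Group every value under each name the selector returns for it.
    static Map<String, List<Integer>> route(List<Integer> values,
                                            Function<Integer, Iterable<String>> selector) {
        Map<String, List<Integer>> byName = new LinkedHashMap<>();
        for (Integer v : values) {
            for (String name : selector.apply(v)) {
                byName.computeIfAbsent(name, k -> new ArrayList<>()).add(v);
            }
        }
        return byName;
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> result =
                route(List.of(1, 2, 3, 4, 5, 6), SplitExampleModel::select);
        System.out.println(result); // {odd=[1, 3, 5], even=[2, 4, 6]}
    }
}
```

In the Flink job itself, the two groups correspond to split.select("even") and split.select("odd") on the SplitStream.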

DataStream.split

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/DataStream.java

@Public
public class DataStream<T> {

    //......

    public SplitStream<T> split(OutputSelector<T> outputSelector) {
        return new SplitStream<>(this, clean(outputSelector));
    }

    //......
}
 
  • DataStream.split takes an OutputSelector, passes it through clean(...), and then creates and returns a SplitStream over the current stream.
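It may help to stress that split does no work on the data at this point: per the source above, it only builds a new SplitStream (backed by a SplitTransformation) from the current stream and the cleaned selector. The plain-Java sketch below illustrates that laziness under stated assumptions: LazySplitDemo, ModelStream, ModelSplitStream and run are hypothetical names, and run stands in for executing the job. The counter shows that the selector is not called when split is invoked, only when elements are processed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

class LazySplitDemo {
    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        ModelSplitStream<Integer> split = new ModelStream<>(List.of(1, 2, 3))
                .split(v -> {
                    calls.incrementAndGet();
                    return List.of(v % 2 == 0 ? "even" : "odd");
                });
        System.out.println(calls.get());       // 0: split alone evaluates nothing
        System.out.println(split.run("even")); // [2]
        System.out.println(calls.get());       // 3: one selector call per element
    }
}

// Hypothetical model: a stream is only a description of work, like a Flink transformation.
class ModelStream<T> {
    final List<T> source; // stands in for the upstream transformation

    ModelStream(List<T> source) {
        this.source = source;
    }

    // Like DataStream.split: wraps this stream and the selector, processes nothing.
    ModelSplitStream<T> split(Function<T, Iterable<String>> selector) {
        return new ModelSplitStream<>(this, selector);
    }
}

class ModelSplitStream<T> {
    final ModelStream<T> input;
    final Function<T, Iterable<String>> selector;

    ModelSplitStream(ModelStream<T> input, Function<T, Iterable<String>> selector) {
        this.input = input;
        this.selector = selector;
    }

    // Hypothetical stand-in for job execution: only here is the selector applied.
    List<T> run(String name) {
        List<T> out = new ArrayList<>();
        for (T value : input.source) {
            for (String n : selector.apply(value)) {
                if (n.equals(name)) {
                    out.add(value);
                    break;
                }
            }
        }
        return out;
    }
}
```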

OutputSelector

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/collector/selector/OutputSelector.java

@PublicEvolving
public interface OutputSelector<OUT> extends Serializable {

    Iterable<String> select(OUT value);

}
 
  • OutputSelector defines a single select method, which tags an element with its outputNames.
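Two properties of this interface matter in practice. select returns an Iterable<String>, so a single element can be tagged with several outputNames, and the interface extends Serializable, which lets the selector be serialized with the job and shipped to the task instances that run it. The sketch below illustrates both with JDK classes only; NamedSelector is a hypothetical mirror of OutputSelector (not the Flink type), and SelectorDemo, multiNameSelector and roundTrip are illustrative names. A lambda whose target is a Serializable functional interface is itself serializable, which the ObjectOutputStream round trip checks.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

class SelectorDemo {

    // One element may receive several names: every value is tagged "all",
    // plus "even" or "odd".
    static NamedSelector<Integer> multiNameSelector() {
        return value -> {
            List<String> names = new ArrayList<>();
            names.add("all");
            names.add(value % 2 == 0 ? "even" : "odd");
            return names;
        };
    }

    // Serialize and deserialize the selector, roughly as happens when a job is deployed.
    @SuppressWarnings("unchecked")
    static <T> NamedSelector<T> roundTrip(NamedSelector<T> selector)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(selector);
        }
        try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (NamedSelector<T>) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        NamedSelector<Integer> copy = roundTrip(multiNameSelector());
        System.out.println(copy.select(4)); // [all, even]
    }
}

// Hypothetical stand-in with the same shape as Flink's OutputSelector<OUT>.
interface NamedSelector<OUT> extends Serializable {
    Iterable<String> select(OUT value);
}
```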

SplitStream

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/SplitStream.java

@PublicEvolving
public class SplitStream<OUT> extends DataStream<OUT> {

    protected SplitStream(DataStream<OUT> dataStream, OutputSelector<OUT> outputSelector) {
        super(dataStream.getExecutionEnvironment(), new SplitTransformation<OUT>(dataStream.getTransformation(), outputSelector));
    }

    public DataStream<OUT> select(String... outputNames) {
        return selectOutput(outputNames);
    }

    private DataStream<OUT> selectOutput(String[] outputNames) {
        for (String outName : outputNames) {
            if (outName == null) {
                throw new RuntimeException("Selected names must not be null");
            }
        }

        SelectTransformation<OUT> selectTransform = new SelectTransformation<OUT>(this.getTransformation(), Lists.newArrayList(outputNames));
        return new DataStream<OUT>(this.getExecutionEnvironment(), selectTransform);
    }

}
 
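  • SplitStream extends DataStream and defines a select method for picking the split-out stream(s) by outputName. select rejects null names and then creates a SelectTransformation from the current transformation and the list of names.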
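The select path is small enough to mirror directly. The following sketch, again Flink-free and built from hypothetical classes (SelectModelDemo, SelectModel, SelectStep), reproduces its two steps: reject null names, then record the input together with the list of names. The receives helper goes beyond the excerpt and encodes an assumption about the runtime behaviour, not something shown above: a stream selected with several names receives every element tagged with at least one of them, and each element only once.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

class SelectModelDemo {
    public static void main(String[] args) {
        Function<Integer, Iterable<String>> evenOdd = v -> List.of(v % 2 == 0 ? "even" : "odd");
        List<Integer> input = List.of(1, 2, 3, 4);

        SelectStep<Integer> evens = SelectModel.select("split", "even");
        SelectStep<Integer> both = SelectModel.select("split", "even", "odd");
        System.out.println(evens.receives(input, evenOdd)); // [2, 4]
        System.out.println(both.receives(input, evenOdd));  // [1, 2, 3, 4]
    }
}

class SelectModel {
    // Mirrors SplitStream.selectOutput: reject null names, then record input plus names.
    static <T> SelectStep<T> select(Object splitInput, String... outputNames) {
        for (String outName : outputNames) {
            if (outName == null) {
                throw new RuntimeException("Selected names must not be null");
            }
        }
        return new SelectStep<>(splitInput, new ArrayList<>(Arrays.asList(outputNames)));
    }
}

// Hypothetical record of a select step, analogous to SelectTransformation.
class SelectStep<T> {
    final Object input;               // the upstream split step
    final List<String> selectedNames; // names this downstream stream accepts

    SelectStep(Object input, List<String> selectedNames) {
        this.input = input;
        this.selectedNames = selectedNames;
    }

    // Assumed runtime rule: deliver an element if any of its names is selected.
    // The outer loop visits each element once, so it is delivered at most once.
    List<T> receives(List<T> elements, Function<T, Iterable<String>> selector) {
        List<T> out = new ArrayList<>();
        for (T element : elements) {
            for (String name : selector.apply(element)) {
                if (selectedNames.contains(name)) {
                    out.add(element);
                    break;
                }
            }
        }
        return out;
    }
}
```

In an actual job the corresponding calls are split.select("even") for one stream and split.select("even", "odd") for a stream that accepts both names.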

StreamGraphGenerator

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java

@Internal
public class StreamGraphGenerator {

    //......

    private Collection<Integer> transform(StreamTransformation<?> transform) {

        if (alreadyTransformed.containsKey(transform)) {
            return alreadyTransformed.get(transform);
        }

        LOG.debug("Transforming " + transform);

        if (transform.getMaxParallelism() <= 0) {

            // if the max parallelism hasn't been set, then first use the job wide max parallelism
            // from the ExecutionConfig.
            int globalMaxParallelismFromConfig = env.getConfig().getMaxParallelism();
            if (globalMaxParallelismFromConfig > 0) {
                transform.setMaxParallelism(globalMaxParallelismFromConfig);
            }
        }

        // call at least once to trigger exceptions about MissingTypeInfo
        transform.getOutputType();

        Collection<Integer> transformedIds;
        if (transform instanceof OneInputTransformation<?, ?>) {
            transformedIds = transformOneInputTransform((OneInputTransformation<?, ?>) transform);
        } else if (transform instanceof TwoInputTransformation<?, ?, ?>) {
            transformedIds = transformTwoInputTransform((TwoInputTransformation<?, ?, ?>) transform);
        } else if (transform instanceof SourceTransformation<?>) {
            transformedIds = transformSource((SourceTransformation<?>) transform);
        } else if (transform instanceof SinkTransformation<?>) {
            transformedIds = transformSink((SinkTransformation<?>) transform);
        } else if (transform instanceof UnionTransformation<?>) {
            transformedIds = transformUnion((UnionTransformation<?>) transform);
        } else if (transform instanceof SplitTransformation<?>) {
            transformedIds = transformSplit((SplitTransformation<?>) transform);
        } else if (transform instanceof SelectTransformation<?>) {
            transformedIds = transformSelect((SelectTransformation<?>) transform);
        } else if (transform instanceof FeedbackTransformation<?>) {
            transformedIds = transformFeedback((FeedbackTransformation<?>) transform);
        } else if (transform instanceof CoFeedbackTransformation<?>) {
            transformedIds = transformCoFeedback((CoFeedbackTransformation<?>) transform);
        } else if (transform instanceof PartitionTransformation<?>) {
            transformedIds = transformPartition((PartitionTransformation<?>) transform);
        } else if (transform instanceof SideOutputTransformation<?>) {
            transformedIds = transformSideOutput((SideOutputTransformation<?>) transform);
        } else {
            throw new IllegalStateException("Unknown transformation: " + transform);
        }

        // need this check because the iterate transformation adds itself before
        // transforming the feedback edges
        if (!alreadyTransformed.containsKey(transform)) {
            alreadyTransformed.put(transform, transformedIds);
        }

        if (transform.getBufferTimeout() >= 0) {
            streamGraph.setBufferTimeout(transform.getId(), transform.getBufferTimeout());
        }
        if (transform.getUid() != null) {
            streamGraph.setTransformationUID(transform.getId(), transform.getUid());
        }
        if (transform.getUserProvidedNodeHash() != null) {
            streamGraph.setTransformationUserHash(transform.getId(), transform.getUserProvidedNodeHash());
        }

        if (transform.getMinResources() != null && transform.getPreferredResources() != null) {
            streamGraph.setResources(transform.getId(), transform.getMinResources(), transform.getPreferredResources());
        }

        return transformedIds;
    }

    private <T> Collection<Integer> transformSelect(SelectTransformation<T> select) {
        StreamTransformation<T> input = select.getInput();
        Collection<Integer> resultIds = transform(input);

        // the recursive transform might have already transformed this
        if (alreadyTransformed.containsKey(select)) {
            return alreadyTransformed.get(select);
        }

        List<Integer> virtualResultIds = new ArrayList<>();

        for (int inputId : resultIds) {
            int virtualId = StreamTransformation.getNewNodeId();
            streamGraph.addVirtualSelectNode(inputId, virtualId, select.getSelectedNames());
            virtualResultIds.add(virtualId);
        }
        return virtualResultIds;
    }

    private <T> Collection<Integer> transformSplit(SplitTransformation<T> split) {

        StreamTransformation<T> input = split.getInput();
        Collection<Integer> resultIds = transform(input);

        // the recursive transform call might have transformed this already
        if (alreadyTransformed.containsKey(split)) {
            return alreadyTransformed.get(split);
        }

        for (int inputId : resultIds) {
            streamGraph.addOutputSelector(inputId, split.getOutputSelector());
        }

        return resultIds;
    }

    //......
}
 
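  • In StreamGraphGenerator, the transform method dispatches SelectTransformation and SplitTransformation to dedicated handlers, transformSelect and transformSplit.
  • transformSelect first transforms its input, then, for each resulting input id, allocates a new virtual node id and registers it via addVirtualSelectNode together with select.getSelectedNames().
  • transformSplit first transforms its input, then calls addOutputSelector with split.getOutputSelector() for each resulting input id and returns those ids unchanged, so the split itself creates no new node.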
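In other words, split adds no node to the StreamGraph; it only attaches its OutputSelector to the node(s) its input became, while each select allocates a virtual node id that remembers the upstream id and the selected names. Based on StreamGraph.addEdgeInternal (not part of the excerpt above, so treat this as an assumption worth checking against the source), when a downstream operator is later connected to such a virtual id, the edge is redirected to the real upstream node and carries the selected names. The sketch below models only that bookkeeping; MiniStreamGraph, MiniStreamGraphDemo and their methods are hypothetical and not Flink's API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class MiniStreamGraphDemo {
    public static void main(String[] args) {
        MiniStreamGraph graph = new MiniStreamGraph();
        int source = graph.addNode();                        // real node 1
        int split = graph.split(source, "evenOddSelector");  // still 1: no new node
        int evenView = graph.select(split, List.of("even")); // virtual node 2
        int sink = graph.addNode();                          // real node 3
        graph.addEdge(evenView, sink);
        MiniStreamGraph.Edge edge = graph.edges.get(0);
        System.out.println(edge.source + " -> " + edge.target + " " + edge.selectedNames); // 1 -> 3 [even]
    }
}

// Hypothetical, heavily simplified model of the StreamGraph bookkeeping for split/select.
class MiniStreamGraph {

    // An edge between two real nodes, labelled with the output names it accepts.
    static final class Edge {
        final int source;
        final int target;
        final List<String> selectedNames;

        Edge(int source, int target, List<String> selectedNames) {
            this.source = source;
            this.target = target;
            this.selectedNames = selectedNames;
        }
    }

    private int nextId = 1; // real and virtual ids share one counter
    final Map<Integer, List<Object>> outputSelectors = new HashMap<>();
    final Map<Integer, Integer> virtualToUpstream = new HashMap<>();
    final Map<Integer, List<String>> virtualNames = new HashMap<>();
    final List<Edge> edges = new ArrayList<>();

    int addNode() {
        return nextId++;
    }

    // Like transformSplit: attach the selector to the input node, create no node.
    int split(int inputId, Object selector) {
        outputSelectors.computeIfAbsent(inputId, k -> new ArrayList<>()).add(selector);
        return inputId;
    }

    // Like transformSelect + addVirtualSelectNode: a fresh id remembering input and names.
    int select(int inputId, List<String> names) {
        int virtualId = nextId++;
        virtualToUpstream.put(virtualId, inputId);
        virtualNames.put(virtualId, names);
        return virtualId;
    }

    // Edges from a virtual id are redirected to the real upstream node;
    // the first non-empty list of selected names found on the way is kept.
    void addEdge(int upstreamId, int downstreamId) {
        List<String> names = List.of();
        while (virtualToUpstream.containsKey(upstreamId)) {
            if (names.isEmpty()) {
                names = virtualNames.get(upstreamId);
            }
            upstreamId = virtualToUpstream.get(upstreamId);
        }
        edges.add(new Edge(upstreamId, downstreamId, names));
    }
}
```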

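Summary

  • DataStream's split operation takes an OutputSelector and then creates and returns a SplitStream.
  • OutputSelector defines the select method, which tags each element with its outputNames.
  • SplitStream extends DataStream and defines a select method for choosing the split-out stream(s) by outputName.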

doc

  • DataStream Transformations

Source: 智云一二三科技

Title: A look at the split operation of Flink's DataStream

URL: https://www.zhihuclub.com/79090.shtml

About the author: 智云科技
