首页 技术 正文
技术 2022年11月10日
0 收藏 461 点赞 4,567 浏览 5076 个字

窗口（window）是 Flink 流处理的核心。根据需求选择合适的窗口类型后，它会把传入的原始数据流切分成多个“桶”（bucket），所有计算都在窗口内进行。

下面是 Flink 自带的示例程序 TopSpeedWindowing.java：

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.delta.DeltaFunction;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.DeltaTrigger;import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.TimeUnit;/**
* An example of grouped stream windowing where different eviction and trigger
* policies can be used. A source fetches events from cars every 100 msec
* containing their id, their current speed (kmh), overall elapsed distance (m)
* and a timestamp. The streaming example triggers the top speed of each car
* every x meters elapsed for the last y seconds.
*/
public class TopSpeedWindowing { // *************************************************************************
// PROGRAM
// ************************************************************************* public static void main(String[] args) throws Exception { final ParameterTool params = ParameterTool.fromArgs(args); final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.getConfig().setGlobalJobParameters(params); @SuppressWarnings({"rawtypes", "serial"})
DataStream<Tuple4<Integer, Integer, Double, Long>> carData;
if (params.has("input")) {
carData = env.readTextFile(params.get("input")).map(new ParseCarData());
} else {
System.out.println("Executing TopSpeedWindowing example with default input data set.");
System.out.println("Use --input to specify file input.");
carData = env.addSource(CarSource.create(2));
} int evictionSec = 10;
double triggerMeters = 50;
DataStream<Tuple4<Integer, Integer, Double, Long>> topSpeeds = carData
.assignTimestampsAndWatermarks(new CarTimestamp()) //1
.keyBy(0)
.window(GlobalWindows.create()) //2
.evictor(TimeEvictor.of(Time.of(evictionSec, TimeUnit.SECONDS))) //3
.trigger(DeltaTrigger.of(triggerMeters,
new DeltaFunction<Tuple4<Integer, Integer, Double, Long>>() {//4
private static final long serialVersionUID = 1L; @Override
public double getDelta(
Tuple4<Integer, Integer, Double, Long> oldDataPoint,
Tuple4<Integer, Integer, Double, Long> newDataPoint) {
return newDataPoint.f2 - oldDataPoint.f2;
}
}, carData.getType().createSerializer(env.getConfig())))//4
.maxBy(1); if (params.has("output")) {
topSpeeds.writeAsText(params.get("output"));
} else {
System.out.println("Printing result to stdout. Use --output to specify output path.");
topSpeeds.print();
} env.execute("CarTopSpeedWindowingExample");
} // *************************************************************************
// USER FUNCTIONS
// ************************************************************************* private static class CarSource implements SourceFunction<Tuple4<Integer, Integer, Double, Long>> { private static final long serialVersionUID = 1L;
private Integer[] speeds;
private Double[] distances; private Random rand = new Random(); private volatile boolean isRunning = true; private CarSource(int numOfCars) {
speeds = new Integer[numOfCars];
distances = new Double[numOfCars];
Arrays.fill(speeds, 50);
Arrays.fill(distances, 0d);
} public static CarSource create(int cars) {
return new CarSource(cars);
} @Override
public void run(SourceContext<Tuple4<Integer, Integer, Double, Long>> ctx) throws Exception { while (isRunning) {
Thread.sleep(100);
for (int carId = 0; carId < speeds.length; carId++) {
if (rand.nextBoolean()) {
speeds[carId] = Math.min(100, speeds[carId] + 5);
} else {
speeds[carId] = Math.max(0, speeds[carId] - 5);
}
distances[carId] += speeds[carId] / 3.6d;
Tuple4<Integer, Integer, Double, Long> record = new Tuple4<>(carId,
speeds[carId], distances[carId], System.currentTimeMillis());
ctx.collect(record);
}
}
} @Override
public void cancel() {
isRunning = false;
}
} private static class ParseCarData extends RichMapFunction<String, Tuple4<Integer, Integer, Double, Long>> {
private static final long serialVersionUID = 1L; @Override
public Tuple4<Integer, Integer, Double, Long> map(String record) {
String rawData = record.substring(1, record.length() - 1);
String[] data = rawData.split(",");
return new Tuple4<>(Integer.valueOf(data[0]), Integer.valueOf(data[1]), Double.valueOf(data[2]), Long.valueOf(data[3]));
}
} private static class CarTimestamp extends AscendingTimestampExtractor<Tuple4<Integer, Integer, Double, Long>> {
private static final long serialVersionUID = 1L; @Override
public long extractAscendingTimestamp(Tuple4<Integer, Integer, Double, Long> element) {
return element.f3;
}
}}

其中,

1. 为数据分配时间戳并生成水位线（Watermark）。这部分在上一篇文章《flink中的时间戳如何使用？—Watermark使用及原理》中已经介绍过，本篇不再赘述。

2. 窗口类型：窗口分配器（Window Assigner）定义如何把数据流中的元素分配到一个或多个窗口。本例在 keyBy 之后使用 GlobalWindows，每个 key 的所有元素都进入同一个全局窗口。其层次结构如下：

flink window实例分析

Evictor（驱逐器）：用于从窗口中剔除数据。本例使用 TimeEvictor，只保留最近 evictionSec（示例中为 10）秒内的元素。其层次结构如下：

flink window实例分析

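3. Trigger（窗口触发器）：决定窗口何时触发计算。本例使用 DeltaTrigger，由自定义的 DeltaFunction 计算新旧两条记录的行驶距离差（字段 f2），差值超过 triggerMeters（示例中为 50 米）时触发。其层次结构如下：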

flink window实例分析

4. 窗口函数（Window Function）：定义对窗口内数据的计算逻辑。本例为 maxBy(1)，即输出窗口内速度（第 2 个字段）最大的那条记录。其层次结构如下：

flink window实例分析

参考资料

【1】https://www.jianshu.com/p/5302b48ca19b

相关推荐
python开发_常用的python模块及安装方法
adodb:我们领导推荐的数据库连接组件bsddb3:BerkeleyDB的连接组件Cheetah-1.0:我比较喜欢这个版本的cheeta…
日期:2022-11-24 点赞:878 阅读:9,492
Educational Codeforces Round 11 C. Hard Process 二分
C. Hard Process题目连接:http://www.codeforces.com/contest/660/problem/CDes…
日期:2022-11-24 点赞:807 阅读:5,907
下载Ubuntn 17.04 内核源代码
zengkefu@server1:/usr/src$ uname -aLinux server1 4.10.0-19-generic #21…
日期:2022-11-24 点赞:569 阅读:6,740
可用Active Desktop Calendar V7.86 注册码序列号
可用Active Desktop Calendar V7.86 注册码序列号Name: www.greendown.cn Code: &nb…
日期:2022-11-24 点赞:733 阅读:6,494
Android调用系统相机、自定义相机、处理大图片
Android调用系统相机和自定义相机实例本博文主要是介绍了android上使用相机进行拍照并显示的两种方式,并且由于涉及到要把拍到的照片显…
日期:2022-11-24 点赞:512 阅读:8,132
Struts的使用
一、Struts2的获取  Struts的官方网站为:http://struts.apache.org/  下载完Struts2的jar包,…
日期:2022-11-24 点赞:671 阅读:5,295