当前位置: 首页 > >

flink 窗口统计数据输出_带你走进Flink流计算的世界

发布时间:





随着互联网企业从粗放式运营向精细化运营过渡,随时随地的查看业务数据,灵活指导运营决策,成为业务人员的高频需求。数聚力定位于数据化运营工具,满足了数据如影随形的理念。在数据的采集、清洗、配置、可视化、分析、报警、见解等各重要环节上,提供便捷的解决方案,使得从“接数”到“看数”的体验获得提升,也让业务人员从繁重的业务报表之中解放出来,能够聚焦于运营决策之上,产出更多的业务价值。


数聚力的不断发展壮大,上报的数据越来越多,进行的统计和计算也越发复杂,这时就需要快速、高效、和开发稳定的流计算。为了满足这部分需求,数聚力引入了具有高吞吐、低延迟、高可靠和精确计算等特性的数据计算引擎Apache Flink。这篇文章主要分享关于Flink在数聚力使用过程中的一些心得体会,及描述一次按照业务发生时间对流数据统计的过程,希望能对大家有所帮助。


统 计 流 程





所有流计算统计的流程都是:


接入数据源进行多次数据转换操作(过滤、拆分、聚合计算等)计算结果的存储

其中数据源可以是多个,数据转换的节点处理完数据可以发送到一个和多个下一个节点继续处理数据。Flink程序构建的基本单元是stream和transformation(DataSet实质上也是stream)。stream是一个中间结果数据,transformation对数据的加工和操作,该操作以一个或多个stream为输入,计算输出一个或多个stream为结果,最后可以sink来存储数据。






包括数据源,每一次发射出来的数据结果都通过DataStream来传递给下一级继续处理。每一个Transformation要有2步:


处理数据将处理完的数据发射出去
Flink 的 数 据 源

Flink提供数据源只需要实现SourceFunction接口即可。SourceFunction有一个抽象实现类RichParallelSourceFunction。继承该实现类实现3个方法,既可以自定义Source


public void open(Configuration parameters) //初始化时调用,可以初始化一些参数
public void run(SourceContext ctx)//发送数据,在该方法里调用ctx的collect方法将数据发射出去。下面例子是每20秒发送一个Order类型的实体:






Flink 的数据转换操作

Flink针对于不同的场景提供了不同的解决方案,减少了用户去关注处理过程中的一些逻辑和效率问题。常见的操作有下面这些:


“map”是做一些映射,比如我们把两个字符串合并成一个字符串,把一个字符串拆成两个或者三个字符串“flatMap”是把一个记录拆分成两条、三条、甚至是四条记录,例如把一个字符串分割成一个字符数组“Filter”类似于过滤“keyBy”等效于SQL里的group by“aggregate”是一个聚合操作,如计数、求和、求*均等“reduce”类似于MapReduce里的reduce“join”操作等同数据库里面的join“connect”实现把两个流连成一个流

常见的操作有filter、map、flatMap、keyBy(分组)、aggregate(聚合)等,也可以在官方文档找到其他相关的Function,具体的使用方式后面的例子中会体现。


窗 口

流数据的计算可以把连续不断的数据按照一定的规则拆分成大量的片段,在片段内进行统计和计算。比如可以把一小时内的数据保存到一个小的数据库表里,然后对这部分数据进行计算和统计,这时流计算是提供自动切割的一种机制-窗口。常见的窗口有:


以时间为单位的Time Window,例如:每1秒钟、每1个小时等以数据的数量为单位的Count Window,例如:每一百个元素

Flink给我们提供了一些通用的时间窗口模型。


1、Tumbling Windows(不重叠的)


数据流中的每一条数据仅属于一个窗口。每一个都有固定的大小,同时窗口间彼此之间不会出现重叠的部分。如果指定一个大小为5分钟的tumbling窗口,那么每5分钟便会启动一个窗口,如下图所示:






2、Sliding Windows(重叠的)


与Tumbling窗口不同的是,在构建Sliding窗口时不仅需要指定窗口大小,还会指定一个窗口滑动参数(window slide parameter)来确定窗口的开始位置。因此当窗口滑动参数小于窗口大小时,窗口之间可能会出现重复的区域。例如,当你指定窗口大小为10分钟,滑动参数为5分钟时,如下图所示:






3、Session Windows (会话窗口)


当数据流中一段时间没有数据,则Session窗口会关闭。因此,Session Windows没有固定的大小,无法计算Session窗口的开始位置。事实上,Session窗口最终是通过窗口合并来实现不规则窗口的。






Flink中的时间概念

Flink中有3中不同的时间概念


1. 处理时间 Processing Time,指的是上面进行Transformation操作时,当时的系统时间。


2. 事件时间 Event Time,指的是业务发生时间,每一条业务记录上会携带一个时间戳,需要指定数据中那一个属性中获取。


在按业务发生时间统计数据时,面临一个问题,当接收数据的时间是无序的时候,我们什么时间去触发聚合计算,不可能无限制的等待。Flink引入了Watermark的概念,它是给窗口看的,是告诉窗口最长等待的时间是多久,超过这个时间的数据就抛弃不再处理。


3. 提取时间 Ingestion Time,指的是数据进入Flink当时的系统时间。


订单统计的例子

第一步:构建环境






第二步:添加数据源






其中为了实现20秒发送一次数据,通过调用setParallelism(1),将数据源的实例数设置为1(仅有1个实例);


第三步:数据校验






第四步:设置时间戳和Watermarks






前面已经设置了使用EventTime来处理数据,那么在进行时间窗口计算前必须给数据分配获取时间戳的字段,这里设置了Order的timestamp字段为EventTime,同时也设置了一个1分钟的Watermarks,表示最多等待1分钟(为了不无限的等待下去),业务发生时间超过系统时间1分钟的数据都不进行统计。


第五步:数据分组






这里设置了以Order中biz字段进行分组,就意味着所有biz相同的数据会进入到同一个时间窗口中进行计算。


第六步:指定时间窗口、聚合计算






这里设置了一个以1分钟为单位的不重叠的TumblingEventTimeWindow。然后使用OrderSumAggregator来进行聚合计算。需要注意的是如果最前面设置的是使用ProcessTime来处理数据,这里的窗口就会变成TumblingProcessTimeWinwow,前后必须是一一对应。


聚 合 计 算

上面例子中比较核心的部分就是聚合计算,也就是OrderSumAggregator。它只需实现Flink给我们提供AggregateFunction接口重写其方法即可。


ACC createAccumulator();//创建一个数据统计的容器,提供给后续操作使用。ACC add(IN in, ACC acc);//每个元素被添加进窗口的时候调用。

第一个参数是添加进窗口的元素,第二个参数是统计的容器(上面创建的那个)。


OUT getResult(ACC acc);//窗口统计事件触发时调用来返回出统计的结果。ACC merge(ACC acc1, ACC acc2);//只有在当窗口合并的时候调用,合并2个容器。

其中这个容器根据情况也可以是在内存里提供,也可以是在其他存储设备中提供。通过上面的例子就实现了按照业务时间来统计每分钟内的订单数量,订单最多可以延迟1分钟上报。但是为了等待1分钟内上报的数据,造成了数据会延迟1分钟进行统计,例如8点02分才能统计到8点到8点01分上报的数据。


为了解决这个问题,可以给window再设置一个自定义的统计触发器,这个触发器可以在整点触发统计事件(也就是调用上面的getResults方法),这样就达到了8点到8点01分这个时间段的数据,在8点01分统计一次,在8点02分再重新统计一次(加上后面1分钟上报的数据)。


总 结

在开发运维上来讲,Flink相较于其他计算引擎有比较强大和灵活的窗口api支持,而且自身提供的大量Function能覆盖日常数据转换中的大部分需求,但目前来说相关资料还是较少,学*只能依赖官网的API和阅读源码。最后,感谢大家一如既往的对数聚力产品和团队的支持!我们会加倍努力,为大家做出更好用的数据工具!



友情链接: