翼度科技»论坛 编程开发 mysql 查看内容

MySQL Flink Watermark实现事件时间处理的关键技术

4

主题

4

帖子

12

积分

新手上路

Rank: 1

积分
12
1.概述

生活中有种场景:
车辆进入隧道,信号不好,出了隧道后,信号就正常了。
正常情况下,车辆进入隧道后,如果车辆正常,没有事故,会正常驶出隧道。
在正常的隧道行驶过程中,可能会因为信号的原因,导致数据没有像信号正常的时候那么快到达。
也就是说,这种情况下,数据出现了延迟。我们把这种延迟数据称之为迟到数据。
生活中,这种场景非常多,比如:车辆进入地下车库,手机欠费,网络抖动等。这都属于生活的正常情况。无法避免。
程序中,一般不会允许数据丢失。所以,我们程序会推出一些机制来保证迟到数据被正常处理。
Watermark就是用来保证正常迟到的数据被正确的处理。
Watermark,也叫水印,或者是水位线。用来处理一定程度下的延迟数据。

2.SQL案例-演示Watermark为零的情况
  1. #1.创建表
  2. CREATE TABLE source_table (
  3. user_id STRING,
  4. price BIGINT,
  5. `timestamp` bigint,
  6. row_time AS TO_TIMESTAMP(FROM_UNIXTIME(`timestamp`)),
  7. watermark for row_time as row_time - interval '0' second
  8. ) WITH (
  9.   'connector' = 'socket',
  10.   'hostname' = 'node1',
  11.   'port' = '9999',
  12.   'format' = 'csv'
  13. );
  14. #2.数据查询SQL
  15. select
  16. user_id,
  17. count(*) as pv,
  18. sum(price) as sum_price,
  19. UNIX_TIMESTAMP(CAST(tumble_start(row_time, interval '5' second) AS STRING)) * 1000  as window_start,
  20. UNIX_TIMESTAMP(CAST(tumble_end(row_time, interval '5' second) AS STRING)) * 1000  as window_end
  21. from source_table
  22. group by
  23.     user_id,
  24.     tumble(row_time, interval '5' second);
复制代码
3.SQL案例-演示Watermark不为零的情况

Watermark不为零,就有可能是两种情况:

  • 小于0,窗口会提前触发计算,这种情况在实际应用不存在,所以这里也不讨论
  • 大于0,窗口会延迟触发计算,延迟的时间就是我们设置的Watermark的值
这里,我们主要是讨论Watermark>0的情况。
  1. #1.创建表
  2. CREATE TABLE source_table (
  3. user_id STRING,
  4. price BIGINT,
  5. `timestamp` bigint,
  6. row_time AS TO_TIMESTAMP(FROM_UNIXTIME(`timestamp`)),
  7. watermark for row_time as row_time - interval '2' second
  8. ) WITH (
  9.   'connector' = 'socket',
  10.   'hostname' = 'node1',
  11.   'port' = '9999',
  12.   'format' = 'csv'
  13. );
  14. #2.Watermark的解释
  15. WATERMARK FOR ts AS ts - INTERVAL '2' SECOND
  16. 这里的2,表示,数据允许延迟2秒钟到达,窗口会在(正常结束+延迟时间)后触发计算
  17. #3.查询SQL
  18. select
  19. user_id,
  20. count(*) as pv,
  21. sum(price) as sum_price,
  22. UNIX_TIMESTAMP(CAST(tumble_start(row_time, interval '5' second) AS STRING)) * 1000  as window_start,
  23. UNIX_TIMESTAMP(CAST(tumble_end(row_time, interval '5' second) AS STRING)) * 1000  as window_end
  24. from source_table
  25. group by
  26.     user_id,
  27.     tumble(row_time, interval '5' second);
复制代码
到此这篇关于MySQL Flink Watermark实现事件时间处理的关键技术的文章就介绍到这了,更多相关MySQL Flink Watermark内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

来源:https://www.jb51.net/article/283703.htm
免责声明:由于采集信息均来自互联网,如果侵犯了您的权益,请联系我们【E-Mail:cb@itdo.tech】 我们会及时删除侵权内容,谢谢合作!

举报 回复 使用道具