
Flink SQL Time Zones


TIMESTAMP vs TIMESTAMP_LTZ


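The TIMESTAMP type

  1. TIMESTAMP(p) is shorthand for TIMESTAMP(p) WITHOUT TIME ZONE. The precision p ranges from 0 to 9 and defaults to 6.
  2. TIMESTAMP describes a timestamp consisting of year, month, day, hour, minute, second and fractional seconds.
  3. A TIMESTAMP can be specified with a string literal, for example: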
Flink SQL> SELECT TIMESTAMP '1970-01-01 00:00:04.001';
+-------------------------+
| 1970-01-01 00:00:04.001 |
+-------------------------+
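As a rough analogy outside Flink, a zone-less timestamp behaves like a naive datetime: it holds calendar fields only. The snippet below is a plain-Python sketch of that idea, not Flink code; the variable names are illustrative.

```python
from datetime import datetime

# Parse the same literal used in the Flink SQL example.
literal = "1970-01-01 00:00:04.001"
ts = datetime.strptime(literal, "%Y-%m-%d %H:%M:%S.%f")

# A naive datetime carries calendar fields only, with no zone or offset.
print(ts.tzinfo)       # None
print(ts.microsecond)  # 1000, i.e. 1 millisecond

# Formatting it back gives the same text, whatever time zone is configured.
rendered = ts.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
print(rendered)
```

Because no zone is attached, nothing needs converting when the value is displayed.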

  

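The TIMESTAMP_LTZ type

  • TIMESTAMP_LTZ(p) is shorthand for TIMESTAMP(p) WITH LOCAL TIME ZONE. The precision p ranges from 0 to 9 and defaults to 6.
  • TIMESTAMP_LTZ describes an absolute point in time on the timeline. It stores the milliseconds since the epoch in a long and the nanoseconds within the millisecond in an int. The epoch is Java's standard epoch, 1970-01-01T00:00:00Z. For computation and display, every TIMESTAMP_LTZ value is interpreted in the time zone configured for the session.
  • TIMESTAMP_LTZ has no string literal form, so it cannot be specified with a string. It can be derived from a long epoch value (for example, one produced in Java by System.currentTimeMillis()).
  • TIMESTAMP_LTZ can be used for cross-time-zone computation because it is an absolute point in time based on the epoch: a value such as the 4001 milliseconds in the example below denotes the same instant in every time zone.
  • Background: at any given moment, System.currentTimeMillis() returns the same value on every machine in the world (for instance, 4001 milliseconds). This is what absolute time means.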
Flink SQL> CREATE VIEW T1 AS SELECT TO_TIMESTAMP_LTZ(4001, 3);

Flink SQL> SET 'table.local-time-zone' = 'UTC';
Flink SQL> SELECT * FROM T1;
+---------------------------+
| TO_TIMESTAMP_LTZ(4001, 3) |
+---------------------------+
|   1970-01-01 00:00:04.001 |
+---------------------------+

Flink SQL> SET 'table.local-time-zone' = 'Asia/Shanghai';
Flink SQL> SELECT * FROM T1;
+---------------------------+
| TO_TIMESTAMP_LTZ(4001, 3) |
+---------------------------+
|   1970-01-01 08:00:04.001 |
+---------------------------+
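The same behavior can be reproduced outside Flink. The following is a minimal Python sketch, not Flink code: render_ltz is a hypothetical helper, and a fixed UTC+8 offset stands in for Asia/Shanghai so that the example needs no time zone database.

```python
from datetime import datetime, timedelta, timezone

UTC = timezone.utc
# Assumption: a fixed UTC+8 offset stands in for Asia/Shanghai.
SHANGHAI = timezone(timedelta(hours=8))

EPOCH = datetime(1970, 1, 1, tzinfo=UTC)


def render_ltz(epoch_millis: int, session_zone: timezone) -> str:
    """Format an absolute instant (epoch milliseconds) in the given zone."""
    instant = EPOCH + timedelta(milliseconds=epoch_millis)
    local = instant.astimezone(session_zone)
    return local.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]


print(render_ltz(4001, UTC))       # 1970-01-01 00:00:04.001
print(render_ltz(4001, SHANGHAI))  # 1970-01-01 08:00:04.001
```

The stored value (4001) never changes; only its rendering depends on the zone passed in, which mirrors the role of table.local-time-zone.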

  

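Setting the time zone

In Scala:

import java.time.ZoneId
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

val envSetting = EnvironmentSettings.inStreamingMode()
val tEnv = TableEnvironment.create(envSetting)

// Set the session time zone to UTC
tEnv.getConfig.setLocalTimeZone(ZoneId.of("UTC"))

// Set the session time zone to Asia/Shanghai
tEnv.getConfig.setLocalTimeZone(ZoneId.of("Asia/Shanghai"))

// Set the session time zone to America/Los_Angeles
tEnv.getConfig.setLocalTimeZone(ZoneId.of("America/Los_Angeles"))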

  

In the SQL Client:

-- Set the session time zone to UTC
Flink SQL> SET 'table.local-time-zone' = 'UTC';

-- Set the session time zone to Asia/Shanghai
Flink SQL> SET 'table.local-time-zone' = 'Asia/Shanghai';

-- Set the session time zone to America/Los_Angeles
Flink SQL> SET 'table.local-time-zone' = 'America/Los_Angeles';

  

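Processing time and time zones

Flink SQL defines a processing-time attribute with the function PROCTIME(), whose return type is TIMESTAMP_LTZ.

Before Flink 1.13, PROCTIME() returned a TIMESTAMP holding the UTC time. For example, when the wall clock in Shanghai showed 2021-03-01 12:00:00, PROCTIME() displayed 2021-03-01 04:00:00, which is wrong. Flink 1.13 fixed this by returning TIMESTAMP_LTZ, so users no longer need to handle the time zone themselves.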
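The 8-hour discrepancy in that example can be reproduced with a small Python sketch. It only illustrates the arithmetic and is not how Flink is implemented; the fixed UTC+8 offset for Asia/Shanghai is an assumption made to keep the snippet self-contained.

```python
from datetime import datetime, timedelta, timezone

# Assumption: a fixed UTC+8 offset stands in for Asia/Shanghai.
SHANGHAI = timezone(timedelta(hours=8))

# Wall clock in Shanghai at the moment used in the example.
wall_clock = datetime(2021, 3, 1, 12, 0, 0, tzinfo=SHANGHAI)

# Old behavior: convert the instant to UTC, then drop the zone, so the
# value reads like a local time that is 8 hours behind.
as_utc_timestamp = wall_clock.astimezone(timezone.utc).replace(tzinfo=None)
print(as_utc_timestamp)  # 2021-03-01 04:00:00

# TIMESTAMP_LTZ-style behavior: keep the instant, render it in the session zone.
as_session_time = wall_clock.astimezone(SHANGHAI).strftime("%Y-%m-%d %H:%M:%S")
print(as_session_time)  # 2021-03-01 12:00:00
```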

Flink SQL> SET 'table.local-time-zone' = 'UTC';
Flink SQL> SELECT PROCTIME();
+-------------------------+
|              PROCTIME() |
+-------------------------+
| 2021-04-15 14:48:31.387 |
+-------------------------+

Flink SQL> SET 'table.local-time-zone' = 'Asia/Shanghai';
Flink SQL> SELECT PROCTIME();
+-------------------------+
|              PROCTIME() |
+-------------------------+
| 2021-04-15 22:48:31.387 |
+-------------------------+

Flink SQL> CREATE TABLE MyTable1 (
                  item STRING,
                  price DOUBLE,
                  proctime AS PROCTIME()
            ) WITH (
                'connector' = 'socket',
                'hostname' = '127.0.0.1',
                'port' = '9999',
                'format' = 'csv'
           );

Flink SQL> CREATE VIEW MyView3 AS
            SELECT
                TUMBLE_START(proctime, INTERVAL '10' MINUTES) AS window_start,
                TUMBLE_END(proctime, INTERVAL '10' MINUTES) AS window_end,
                TUMBLE_PROCTIME(proctime, INTERVAL '10' MINUTES) AS window_proctime,
                item,
                MAX(price) AS max_price
            FROM MyTable1
                GROUP BY TUMBLE(proctime, INTERVAL '10' MINUTES), item;

Flink SQL> DESC MyView3;
+-----------------+-----------------------------+-------+-----+--------+-----------+
|            name |                        type |  null | key | extras | watermark |
+-----------------+-----------------------------+-------+-----+--------+-----------+
|    window_start |                TIMESTAMP(3) | false |     |        |           |
|      window_end |                TIMESTAMP(3) | false |     |        |           |
| window_proctime | TIMESTAMP_LTZ(3) *PROCTIME* | false |     |        |           |
|            item |                      STRING |  true |     |        |           |
|       max_price |                      DOUBLE |  true |     |        |           |
+-----------------+-----------------------------+-------+-----+--------+-----------+

  

Run the following command in a terminal to write data into MyTable1:

 > nc -lk 9999
A,1.1
B,1.2
A,1.8
B,2.5
C,3.8
  
Flink SQL> SET 'table.local-time-zone' = 'UTC';
Flink SQL> SELECT * FROM MyView3;
+-------------------------+-------------------------+-------------------------+------+-----------+
|            window_start |              window_end |         window_proctime | item | max_price |
+-------------------------+-------------------------+-------------------------+------+-----------+
| 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:10:00.005 |    A |       1.8 |
| 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:10:00.007 |    B |       2.5 |
| 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:10:00.007 |    C |       3.8 |
+-------------------------+-------------------------+-------------------------+------+-----------+

Flink SQL> SET 'table.local-time-zone' = 'Asia/Shanghai';
Flink SQL> SELECT * FROM MyView3;

Compared with the result computed in the UTC time zone, the window start, window end and window processing time computed in the Asia/Shanghai time zone are different.

+-------------------------+-------------------------+-------------------------+------+-----------+
|            window_start |              window_end |         window_proctime | item | max_price |
+-------------------------+-------------------------+-------------------------+------+-----------+
| 2021-04-15 22:00:00.000 | 2021-04-15 22:10:00.000 | 2021-04-15 22:10:00.005 |    A |       1.8 |
| 2021-04-15 22:00:00.000 | 2021-04-15 22:10:00.000 | 2021-04-15 22:10:00.007 |    B |       2.5 |
| 2021-04-15 22:00:00.000 | 2021-04-15 22:10:00.000 | 2021-04-15 22:10:00.007 |    C |       3.8 |
+-------------------------+-------------------------+-------------------------+------+-----------+
  

Note that processing-time windows are non-deterministic: every run produces different windows and different aggregation results.
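To see why the same window shows up as 14:00 in one session and 22:00 in another, the window bounds can be worked out on the epoch value and only rendered in a zone afterwards. This is a simplified Python sketch, not Flink's window assigner: the helper names, the sample instant and the fixed UTC+8 offset are assumptions. It covers 10-minute windows only; larger windows such as one day would also need the zone offset for alignment, which this sketch does not attempt.

```python
from datetime import datetime, timedelta, timezone

UTC = timezone.utc
SHANGHAI = timezone(timedelta(hours=8))  # assumption: fixed offset for Asia/Shanghai
EPOCH = datetime(1970, 1, 1, tzinfo=UTC)
WINDOW_MILLIS = 10 * 60 * 1000  # 10-minute tumbling window


def tumble_bounds(epoch_millis: int) -> tuple:
    """Return (start, end) of the 10-minute window containing the instant."""
    start = epoch_millis - epoch_millis % WINDOW_MILLIS
    return start, start + WINDOW_MILLIS


def render(epoch_millis: int, zone: timezone) -> str:
    local = (EPOCH + timedelta(milliseconds=epoch_millis)).astimezone(zone)
    return local.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]


# Hypothetical processing time: 2021-04-15 14:05:00 UTC.
start, end = tumble_bounds(1618495500000)
for zone in (UTC, SHANGHAI):
    print(render(start, zone), "|", render(end, zone))
```

The two printed lines describe the same window; only the displayed wall-clock values differ by the 8-hour offset.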

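Event time and time zones


Event-time attribute on TIMESTAMP

If the time in the source represents year-month-day-hour-minute-second, usually as a string without a time zone such as 2020-04-15 20:13:40.564, it is recommended to define the event-time attribute on a TIMESTAMP column.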

Flink SQL> CREATE TABLE MyTable2 (
                  item STRING,
                  price DOUBLE,
                  ts TIMESTAMP(3), -- TIMESTAMP data type
                  WATERMARK FOR ts AS ts - INTERVAL '10' SECOND
            ) WITH (
                'connector' = 'socket',
                'hostname' = '127.0.0.1',
                'port' = '9999',
                'format' = 'csv'
           );

Flink SQL> CREATE VIEW MyView4 AS
            SELECT
                TUMBLE_START(ts, INTERVAL '10' MINUTES) AS window_start,
                TUMBLE_END(ts, INTERVAL '10' MINUTES) AS window_end,
                TUMBLE_ROWTIME(ts, INTERVAL '10' MINUTES) AS window_rowtime,
                item,
                MAX(price) AS max_price
            FROM MyTable2
                GROUP BY TUMBLE(ts, INTERVAL '10' MINUTES), item;

Flink SQL> DESC MyView4;
+----------------+------------------------+------+-----+--------+-----------+
|           name |                   type | null | key | extras | watermark |
+----------------+------------------------+------+-----+--------+-----------+
|   window_start |           TIMESTAMP(3) | true |     |        |           |
|     window_end |           TIMESTAMP(3) | true |     |        |           |
| window_rowtime | TIMESTAMP(3) *ROWTIME* | true |     |        |           |
|           item |                 STRING | true |     |        |           |
|      max_price |                 DOUBLE | true |     |        |           |
+----------------+------------------------+------+-----+--------+-----------+

  

Run the following command in a terminal to write data into MyTable2:

 > nc -lk 9999
A,1.1,2021-04-15 14:01:00
B,1.2,2021-04-15 14:02:00
A,1.8,2021-04-15 14:03:00
B,2.5,2021-04-15 14:04:00
C,3.8,2021-04-15 14:05:00
C,3.8,2021-04-15 14:11:00

Flink SQL> SET 'table.local-time-zone' = 'UTC';
Flink SQL> SELECT * FROM MyView4;
+-------------------------+-------------------------+-------------------------+------+-----------+
|            window_start |              window_end |          window_rowtime | item | max_price |
+-------------------------+-------------------------+-------------------------+------+-----------+
| 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:09:59.999 |    A |       1.8 |
| 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:09:59.999 |    B |       2.5 |
| 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:09:59.999 |    C |       3.8 |
+-------------------------+-------------------------+-------------------------+------+-----------+

Flink SQL> SET 'table.local-time-zone' = 'Asia/Shanghai';
Flink SQL> SELECT * FROM MyView4;

Compared with the result computed in the UTC time zone, the window start, window end and window rowtime computed in the Asia/Shanghai time zone are the same.

+-------------------------+-------------------------+-------------------------+------+-----------+
|            window_start |              window_end |          window_rowtime | item | max_price |
+-------------------------+-------------------------+-------------------------+------+-----------+
| 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:09:59.999 |    A |       1.8 |
| 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:09:59.999 |    B |       2.5 |
| 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:09:59.999 |    C |       3.8 |
+-------------------------+-------------------------+-------------------------+------+-----------+
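The result is identical in both sessions because zone-less timestamps are grouped on their calendar fields alone, so no time zone conversion is involved. The Python sketch below replays the MyTable2 input under that assumption. It is an illustration rather than Flink's implementation: it ignores watermarks, and window_start is a hypothetical helper.

```python
from datetime import datetime, timedelta

# The rows written to MyTable2 in the example.
rows = [
    ("A", 1.1, "2021-04-15 14:01:00"),
    ("B", 1.2, "2021-04-15 14:02:00"),
    ("A", 1.8, "2021-04-15 14:03:00"),
    ("B", 2.5, "2021-04-15 14:04:00"),
    ("C", 3.8, "2021-04-15 14:05:00"),
    ("C", 3.8, "2021-04-15 14:11:00"),
]
WINDOW = timedelta(minutes=10)


def window_start(ts: datetime) -> datetime:
    """Floor a naive timestamp to its 10-minute boundary (no zone involved)."""
    return ts.replace(minute=ts.minute - ts.minute % 10, second=0, microsecond=0)


# MAX(price) per (window, item).
max_price = {}
for item, price, text in rows:
    ts = datetime.strptime(text, "%Y-%m-%d %H:%M:%S")
    key = (window_start(ts), item)
    max_price[key] = max(price, max_price.get(key, price))

for (start, item), price in sorted(max_price.items()):
    # The window rowtime is the last millisecond inside the window.
    rowtime = start + WINDOW - timedelta(milliseconds=1)
    print(start, start + WINDOW, rowtime.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3], item, price)
```

The sketch also lists a 14:10 window for the last C row. Flink would emit that window only once the watermark passes its end, which is why it is absent from the query output shown here.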




Event-time attribute on TIMESTAMP_LTZ

If the time in the source data is an epoch time, usually a long value such as 1618989564564, it is recommended to define the event-time attribute on a TIMESTAMP_LTZ column.

Flink SQL> CREATE TABLE MyTable3 (
                  item STRING,
                  price DOUBLE,
                  ts BIGINT, -- long time value in epoch milliseconds
                  ts_ltz AS TO_TIMESTAMP_LTZ(ts, 3),
                  WATERMARK FOR ts_ltz AS ts_ltz - INTERVAL '10' SECOND
            ) WITH (
                'connector' = 'socket',
                'hostname' = '127.0.0.1',
                'port' = '9999',
                'format' = 'csv'
           );






Flink SQL> CREATE VIEW MyView5 AS
            SELECT
                TUMBLE_START(ts_ltz, INTERVAL '10' MINUTES) AS window_start,
                TUMBLE_END(ts_ltz, INTERVAL '10' MINUTES) AS window_end,
                TUMBLE_ROWTIME(ts_ltz, INTERVAL '10' MINUTES) AS window_rowtime,
                item,
                MAX(price) AS max_price
            FROM MyTable3
                GROUP BY TUMBLE(ts_ltz, INTERVAL '10' MINUTES), item;






Flink SQL> DESC MyView5;
+----------------+----------------------------+-------+-----+--------+-----------+
|           name |                       type |  null | key | extras | watermark |
+----------------+----------------------------+-------+-----+--------+-----------+
|   window_start |               TIMESTAMP(3) | false |     |        |           |
|     window_end |               TIMESTAMP(3) | false |     |        |           |
| window_rowtime | TIMESTAMP_LTZ(3) *ROWTIME* |  true |     |        |           |
|           item |                     STRING |  true |     |        |           |
|      max_price |                     DOUBLE |  true |     |        |           |
+----------------+----------------------------+-------+-----+--------+-----------+




The input data for MyTable3 is:

A,1.1,1618495260000  # The corresponding UTC timestamp is 2021-04-15 14:01:00
B,1.2,1618495320000  # The corresponding UTC timestamp is 2021-04-15 14:02:00
A,1.8,1618495380000  # The corresponding UTC timestamp is 2021-04-15 14:03:00
B,2.5,1618495440000  # The corresponding UTC timestamp is 2021-04-15 14:04:00
C,3.8,1618495500000  # The corresponding UTC timestamp is 2021-04-15 14:05:00
C,3.8,1618495860000  # The corresponding UTC timestamp is 2021-04-15 14:11:00
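The UTC timestamps given in the comments can be checked with a few lines of Python. This is a standalone sketch and utc_text is a hypothetical helper name.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def utc_text(epoch_millis: int) -> str:
    """Render epoch milliseconds as a UTC wall-clock string."""
    return (EPOCH + timedelta(milliseconds=epoch_millis)).strftime("%Y-%m-%d %H:%M:%S")


# The ts values from the MyTable3 input listing.
samples = [
    1618495260000,
    1618495320000,
    1618495380000,
    1618495440000,
    1618495500000,
    1618495860000,
]
for millis in samples:
    print(millis, utc_text(millis))
```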

Flink SQL> SET 'table.local-time-zone' = 'UTC';
Flink SQL> SELECT * FROM MyView5;
+-------------------------+-------------------------+-------------------------+------+-----------+
|            window_start |              window_end |          window_rowtime | item | max_price |
+-------------------------+-------------------------+-------------------------+------+-----------+
| 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:09:59.999 |    A |       1.8 |
| 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:09:59.999 |    B |       2.5 |
| 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:09:59.999 |    C |       3.8 |
+-------------------------+-------------------------+-------------------------+------+-----------+

Flink SQL> SET 'table.local-time-zone' = 'Asia/Shanghai';
Flink SQL> SELECT * FROM MyView5;

Compared with the result computed in the UTC time zone, the window start, window end and window rowtime computed in the Asia/Shanghai time zone are different.

+-------------------------+-------------------------+-------------------------+------+-----------+
|            window_start |              window_end |          window_rowtime | item | max_price |
+-------------------------+-------------------------+-------------------------+------+-----------+
| 2021-04-15 22:00:00.000 | 2021-04-15 22:10:00.000 | 2021-04-15 22:09:59.999 |    A |       1.8 |
| 2021-04-15 22:00:00.000 | 2021-04-15 22:10:00.000 | 2021-04-15 22:09:59.999 |    B |       2.5 |
| 2021-04-15 22:00:00.000 | 2021-04-15 22:10:00.000 | 2021-04-15 22:09:59.999 |    C |       3.8 |
+-------------------------+-------------------------+-------------------------+------+-----------+


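Differences between Batch mode and Streaming mode

For the following functions:

  • LOCALTIME
  • LOCALTIMESTAMP
  • CURRENT_DATE
  • CURRENT_TIME
  • CURRENT_TIMESTAMP
  • NOW()

Flink evaluates them differently depending on the execution mode:

  • In Streaming mode, they are evaluated once for every record.
  • In Batch mode, they are evaluated only once, when the query starts, and every record uses the same result.

The following time functions are evaluated once for every record in both Streaming mode and Batch mode:

  • CURRENT_ROW_TIMESTAMP()
  • PROCTIME()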
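The difference between the two evaluation strategies can be illustrated with a toy model. The Python sketch below is not Flink code; make_clock, run_streaming and run_batch are hypothetical names, and the clock is a deterministic counter standing in for a function such as NOW().

```python
from itertools import count


def make_clock(start: int = 1000, step: int = 5):
    """Hypothetical clock: every call returns a later millisecond value."""
    ticks = count(start, step)
    return lambda: next(ticks)


def run_streaming(records, now):
    # Streaming mode: the time function is evaluated for every record.
    return [(record, now()) for record in records]


def run_batch(records, now):
    # Batch mode: evaluated once at query start, then reused for all records.
    query_start = now()
    return [(record, query_start) for record in records]


records = ["A", "B", "C"]
print(run_streaming(records, make_clock()))  # [('A', 1000), ('B', 1005), ('C', 1010)]
print(run_batch(records, make_clock()))      # [('A', 1000), ('B', 1000), ('C', 1000)]
```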

 
  

Source: 智云一二三科技

Article title: Flink SQL Time Zones

Article URL: https://www.zhihuclub.com/189184.shtml

About the author: 智云科技
