Regression using Flink 1.20 relating to timestamp precision #143

@davidradl

Description

I have a test case

CREATE TABLE `source_1`
(
    `customerId`             STRING,
    `param_string_date_time` TIMESTAMP_LTZ(9),
    `ts`                     TIMESTAMP(3),
    WATERMARK FOR `ts` AS `ts` - INTERVAL '1' MINUTE
)
WITH (
    'connector' = 'filesystem',
    'path' = '/Users/davidradley/temp/regressionissue.txt',
    'format' = 'json'
);

with the file contents:

{ "orderId": 1, "customerId": "1", "param_string_date_time": "2022-11-20 00:00:00.000Z", "ts": "2020-04-15 08:05:00.000" }
{ "orderId": 2, "customerId": "2", "param_string_date_time": "2022-11-21 00:00:00.000Z", "ts": "2020-04-15 08:07:00.000" }
{ "orderId": 3, "customerId": "3", "param_string_date_time": "2022-11-22 00:00:00.000Z", "ts": "2020-04-15 08:09:00.000" }
{ "orderId": 4, "customerId": "4", "param_string_date_time": "2022-11-23 00:00:00.000Z", "ts": "2020-04-15 08:11:00.000" }

I then create a view

CREATE TEMPORARY VIEW `api_1_source_1__API` AS
SELECT *, PROCTIME() AS `proc_time`
FROM `source_1`; 

and a lookup table

CREATE TEMPORARY TABLE `api_1_lookup__API`
(
    `customerId`           STRING,
    `param_string_date_time` TIMESTAMP_LTZ(6)
)
WITH (
    'connector' = 'rest-lookup',
    'url' = 'http://localhost:8089/api1',
    'format' = 'json',
    'asyncPolling' = 'false',
    'lookup-method' = 'GET',
    'gid.connector.http.source.lookup.header.Content-Type' = 'application/json',
    'gid.connector.http.source.lookup.header.Origin' = '*',
    'gid.connector.http.source.lookup.header.X-Content-Type-Options' = 'nosniff',
    'gid.connector.http.source.lookup.request.timeout' = '30',
    'gid.connector.http.source.lookup.request.thread-pool.size' = '8',
    'gid.connector.http.source.lookup.response.thread-pool.size' = '4'
);

and then run a lookup join

SELECT
    `api_1_source_1__API`.`customerId` AS `customerId`,
    `api_1_lookup__API`.`param_string_date_time`  AS `param_string_date_time`
FROM `api_1_source_1__API`
JOIN `api_1_lookup__API` FOR SYSTEM_TIME AS OF `api_1_source_1__API`.`proc_time` ON
    `api_1_lookup__API`.`customerId`=`api_1_source_1__API`.`customerId` AND
    `api_1_lookup__API`.`param_string_date_time`=`api_1_source_1__API`.`param_string_date_time`;

At 1.19.1, the lookup request that is sent has 2 query parameters: customerId and param_string_date_time.
At 1.20, the request has only 1 query parameter: customerId.

This is caused by a Flink PR which introduces logic to perform comparisons at the maximum precision. The implication is that the table planner ends up seeing TIMESTAMP(6) and TIMESTAMP(9) as the two types in CommonPhysicalLookupJoin#analyzeLookupKeys. It therefore decides that this equality is not going to be a lookup key and instead treats it as a join condition.

Unfortunately, the HTTP connector currently does not look for join conditions, so we lose this condition entirely.

JDBC connectors doing a lookup support join conditions, so there the lookup join succeeds.
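
As a user-side mitigation (not a connector fix), it might be possible to align the precisions on the probe side so that the planner again recognizes both equalities as lookup keys. A rough, untested sketch, assuming an explicit CAST to TIMESTAMP_LTZ(6) in the view is acceptable for the use case:

CREATE TEMPORARY VIEW `api_1_source_1__API` AS
SELECT
    `customerId`,
    -- cast the probe-side column down to the lookup table's precision,
    -- so both sides of the equality are TIMESTAMP_LTZ(6)
    CAST(`param_string_date_time` AS TIMESTAMP_LTZ(6)) AS `param_string_date_time`,
    `ts`,
    PROCTIME() AS `proc_time`
FROM `source_1`;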

some thoughts / observations:

  • the HTTP lookup connector could look for equality join conditions using logic similar to the JDBC connector, and then proceed with such an equality join condition as if it were a lookup key.
  • alternatively, the lookup connector could be more explicit about which conditions should be treated as lookup keys and which as join conditions; this would allow non-equality join conditions (see the sketch after this list) to be handled in the connector as a new capability.
  • unfortunately, applyFilters comes from the SupportsFilterPushDown interface, which is only driven for scan sources.
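
For illustration of the second point, a non-equality condition in the lookup join would look something like this (a hypothetical example, not something the connector handles today):

SELECT
    s.`customerId`,
    l.`param_string_date_time`
FROM `api_1_source_1__API` AS s
JOIN `api_1_lookup__API` FOR SYSTEM_TIME AS OF s.`proc_time` AS l
    ON  l.`customerId` = s.`customerId`                           -- equality: usable as a lookup key
    AND l.`param_string_date_time` > s.`param_string_date_time`;  -- non-equality: would need explicit handling in the connector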

@grzegorz8 @kzarzycki WDYT?
