Commit ff294d3

Initial Commit: adding ClickHouseDialect
1 parent: 82f5768

20 files changed: +2697 −0 lines
pom.xml (new flink-connector-jdbc-clickhouse module): 94 additions, 0 deletions
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc-parent</artifactId>
        <version>4.0-SNAPSHOT</version>
    </parent>

    <artifactId>flink-connector-jdbc-clickhouse</artifactId>
    <name>Flink : Connectors : JDBC : ClickHouse</name>

    <packaging>jar</packaging>

    <properties>
        <clickhouse.version>0.6.2</clickhouse.version>
    </properties>

    <dependencies>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc-core</artifactId>
            <version>${project.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc-core</artifactId>
            <version>${project.version}</version>
            <type>test-jar</type>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
            <optional>true</optional>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <type>test-jar</type>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-test-utils</artifactId>
            <version>${flink.version}</version>
            <scope>test</scope>
        </dependency>

        <!-- ClickHouse -->
        <dependency>
            <groupId>com.clickhouse</groupId>
            <artifactId>clickhouse-jdbc</artifactId>
            <version>${clickhouse.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- Assertions test dependencies -->
        <dependency>
            <groupId>org.assertj</groupId>
            <artifactId>assertj-core</artifactId>
            <version>${assertj.version}</version>
            <scope>test</scope>
        </dependency>

        <!-- ClickHouse tests -->
        <dependency>
            <groupId>org.testcontainers</groupId>
            <artifactId>clickhouse</artifactId>
            <scope>test</scope>
        </dependency>

    </dependencies>

</project>
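
Note: the clickhouse-jdbc driver is declared with provided scope, so the connector module presumably does not bundle it and the job classpath has to supply it. A minimal sketch of a runtime check, using a hypothetical helper class; the driver class name is the one returned by defaultDriverName() in ClickHouseDialect further down:

// Hypothetical helper, not part of the commit: checks that the ClickHouse JDBC driver
// (com.clickhouse:clickhouse-jdbc, provided scope above) is available on the classpath.
public class ClickHouseDriverCheck {
    public static void main(String[] args) {
        try {
            Class.forName("com.clickhouse.jdbc.ClickHouseDriver");
            System.out.println("ClickHouse JDBC driver found on the classpath");
        } catch (ClassNotFoundException e) {
            System.out.println("Add com.clickhouse:clickhouse-jdbc to the job classpath");
        }
    }
}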
ClickHouseFactory.java (new file in package org.apache.flink.connector.jdbc.clickhouse.database): 50 additions, 0 deletions
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.connector.jdbc.clickhouse.database;

import org.apache.flink.annotation.Internal;
import org.apache.flink.connector.jdbc.clickhouse.database.dialect.ClickHouseDialect;
import org.apache.flink.connector.jdbc.core.database.JdbcFactory;
import org.apache.flink.connector.jdbc.core.database.catalog.JdbcCatalog;
import org.apache.flink.connector.jdbc.core.database.dialect.JdbcDialect;

/** Factory for {@link ClickHouseDialect}. */
@Internal
public class ClickHouseFactory implements JdbcFactory {
    @Override
    public boolean acceptsURL(String url) {
        return url.startsWith("jdbc:clickhouse:");
    }

    @Override
    public JdbcDialect createDialect() {
        return new ClickHouseDialect();
    }

    @Override
    public JdbcCatalog createCatalog(
            ClassLoader classLoader,
            String catalogName,
            String defaultDatabase,
            String username,
            String pwd,
            String baseUrl) {
        throw new UnsupportedOperationException("Catalog for ClickHouse is not supported yet.");
    }
}
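
The factory routes any JDBC URL starting with the jdbc:clickhouse: prefix to the new dialect, while catalog creation is explicitly unsupported. A minimal sketch of that URL routing, using a hypothetical demo class; the example URLs are illustrative placeholders, not taken from the commit:

package org.apache.flink.connector.jdbc.clickhouse.database;

import org.apache.flink.connector.jdbc.core.database.dialect.JdbcDialect;

// Hypothetical demo class, not part of the commit: shows how the factory selects the dialect by URL prefix.
public class ClickHouseFactoryDemo {
    public static void main(String[] args) {
        ClickHouseFactory factory = new ClickHouseFactory();

        // Only the "jdbc:clickhouse:" prefix matters; host and port below are placeholders.
        System.out.println(factory.acceptsURL("jdbc:clickhouse://localhost:8123/default")); // true
        System.out.println(factory.acceptsURL("jdbc:mysql://localhost:3306/db"));           // false

        JdbcDialect dialect = factory.createDialect();
        System.out.println(dialect.dialectName()); // ClickHouse
    }
}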
ClickHouseDialect.java (new file in package org.apache.flink.connector.jdbc.clickhouse.database.dialect): 139 additions, 0 deletions
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.connector.jdbc.clickhouse.database.dialect;

import org.apache.flink.annotation.Internal;
import org.apache.flink.connector.jdbc.core.database.dialect.AbstractDialect;
import org.apache.flink.connector.jdbc.core.database.dialect.JdbcDialectConverter;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.RowType;

import java.util.Arrays;
import java.util.EnumSet;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

import static java.lang.String.format;

/** JDBC dialect for ClickHouse. */
@Internal
public class ClickHouseDialect extends AbstractDialect {

    private static final long serialVersionUID = 1L;

    // MAX/MIN precision of the TIMESTAMP type according to the ClickHouse docs:
    // https://clickhouse.com/docs/sql-reference/data-types/datetime64
    private static final int MAX_TIMESTAMP_PRECISION = 9;
    private static final int MIN_TIMESTAMP_PRECISION = 0;

    // MAX/MIN precision of the DECIMAL type according to the ClickHouse docs:
    // https://clickhouse.com/docs/sql-reference/data-types/decimal
    private static final int MAX_DECIMAL_PRECISION = 76;
    private static final int MIN_DECIMAL_PRECISION = 1;

    @Override
    public JdbcDialectConverter getRowConverter(RowType rowType) {
        return new ClickHouseDialectConverter(rowType);
    }

    @Override
    public String getLimitClause(long limit) {
        return "LIMIT " + limit;
    }

    @Override
    public Optional<String> defaultDriverName() {
        return Optional.of("com.clickhouse.jdbc.ClickHouseDriver");
    }

    @Override
    public String dialectName() {
        return "ClickHouse";
    }

    @Override
    public String quoteIdentifier(String identifier) {
        return identifier;
    }

    // ClickHouse does not support upsert statements.
    // Instead, you can create a table with the ReplacingMergeTree engine:
    // https://clickhouse.com/docs/engines/table-engines/mergetree-family/replacingmergetree
    @Override
    public Optional<String> getUpsertStatement(
            String tableName, String[] fieldNames, String[] uniqueKeyFields) {
        return Optional.empty();
    }

    // The ClickHouse primary-key field cannot be updated via SQL.
    @Override
    public String getUpdateStatement(
            String tableName, String[] fieldNames, String[] conditionFields) {
        String setClause =
                Arrays.stream(fieldNames)
                        .filter(item -> !Arrays.asList(conditionFields).contains(item))
                        .map(f -> format("%s = :%s", quoteIdentifier(f), f))
                        .collect(Collectors.joining(", "));
        String conditionClause =
                Arrays.stream(conditionFields)
                        .map(f -> format("%s = :%s", quoteIdentifier(f), f))
                        .collect(Collectors.joining(" AND "));
        return "UPDATE "
                + quoteIdentifier(tableName)
                + " SET "
                + setClause
                + " WHERE "
                + conditionClause;
    }

    @Override
    public Optional<Range> decimalPrecisionRange() {
        return Optional.of(Range.of(MIN_DECIMAL_PRECISION, MAX_DECIMAL_PRECISION));
    }

    @Override
    public Optional<Range> timestampPrecisionRange() {
        return Optional.of(Range.of(MIN_TIMESTAMP_PRECISION, MAX_TIMESTAMP_PRECISION));
    }

    @Override
    public Set<LogicalTypeRoot> supportedTypes() {
        // The data types supported by ClickHouse are listed at:
        // https://clickhouse.com/docs/sql-reference/data-types
        return EnumSet.of(
                LogicalTypeRoot.CHAR,
                LogicalTypeRoot.VARCHAR,
                LogicalTypeRoot.BOOLEAN,
                LogicalTypeRoot.DECIMAL,
                LogicalTypeRoot.TINYINT,
                LogicalTypeRoot.SMALLINT,
                LogicalTypeRoot.INTEGER,
                LogicalTypeRoot.BIGINT,
                LogicalTypeRoot.FLOAT,
                LogicalTypeRoot.DOUBLE,
                LogicalTypeRoot.DATE,
                LogicalTypeRoot.MAP,
                LogicalTypeRoot.ARRAY,
                LogicalTypeRoot.TIME_WITHOUT_TIME_ZONE,
                LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE,
                LogicalTypeRoot.TIMESTAMP_WITH_TIME_ZONE);
    }
}
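
Because ClickHouse has no native upsert, getUpsertStatement returns empty and only plain INSERT and UPDATE statements are generated by this dialect. A minimal sketch of what getUpdateStatement produces, using a hypothetical demo class; the table and field names are illustrative placeholders:

package org.apache.flink.connector.jdbc.clickhouse.database.dialect;

// Hypothetical demo class, not part of the commit: prints the UPDATE statement built by the dialect.
public class ClickHouseDialectUpdateDemo {
    public static void main(String[] args) {
        ClickHouseDialect dialect = new ClickHouseDialect();

        // Table and field names below are placeholders chosen for illustration.
        String update =
                dialect.getUpdateStatement(
                        "user_profile",
                        new String[] {"id", "name", "age"},
                        new String[] {"id"});

        // Condition fields are excluded from the SET clause and used in WHERE:
        // UPDATE user_profile SET name = :name, age = :age WHERE id = :id
        System.out.println(update);
    }
}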
