Skip to content
This repository was archived by the owner on Feb 16, 2024. It is now read-only.

Commit f02ee07

Browse files
liufangliangpyscala
authored andcommitted
[BAHIR-234] Add ClickHouse Connector for Flink
Implement Streaming ClickHouseSink,support Flink Table API & Flink SQL for ClickHouse connector
1 parent b618a77 commit f02ee07

File tree

11 files changed

+1006
-1
lines changed

11 files changed

+1006
-1
lines changed

dev/checkstyle.xml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
<!DOCTYPE module PUBLIC
1919
"-//Puppy Crawl//DTD Check Configuration 1.3//EN"
2020
"http://www.puppycrawl.com/dtds/configuration_1_3.dtd">
21-
2221
<!--
2322
2423
Checkstyle configuration based on the Google coding conventions from:

flink-connector-clickhouse/pom.xml

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<!--
3+
Licensed to the Apache Software Foundation (ASF) under one
4+
or more contributor license agreements. See the NOTICE file
5+
distributed with this work for additional information
6+
regarding copyright ownership. The ASF licenses this file
7+
to you under the Apache License, Version 2.0 (the
8+
"License"); you may not use this file except in compliance
9+
with the License. You may obtain a copy of the License at
10+
11+
http://www.apache.org/licenses/LICENSE-2.0
12+
13+
Unless required by applicable law or agreed to in writing,
14+
software distributed under the License is distributed on an
15+
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16+
KIND, either express or implied. See the License for the
17+
specific language governing permissions and limitations
18+
under the License.
19+
-->
20+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
21+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
22+
<modelVersion>4.0.0</modelVersion>
23+
24+
<parent>
25+
<groupId>org.apache.bahir</groupId>
26+
<artifactId>bahir-flink-parent_2.11</artifactId>
27+
<version>1.1-SNAPSHOT</version>
28+
<relativePath>../pom.xml</relativePath>
29+
</parent>
30+
31+
<artifactId>flink-connector-clickhouse_${scala.binary.version}</artifactId>
32+
<name>flink-connector-clickhouse</name>
33+
<packaging>jar</packaging>
34+
35+
<dependencies>
36+
<dependency>
37+
<groupId>org.apache.flink</groupId>
38+
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
39+
<version>${flink.version}</version>
40+
<scope>provided</scope>
41+
</dependency>
42+
<dependency>
43+
<groupId>org.apache.flink</groupId>
44+
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
45+
<version>${flink.version}</version>
46+
<scope>provided</scope>
47+
</dependency>
48+
<dependency>
49+
<groupId>org.apache.flink</groupId>
50+
<artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
51+
<version>${flink.version}</version>
52+
<scope>provided</scope>
53+
</dependency>
54+
55+
<dependency>
56+
<groupId>ru.yandex.clickhouse</groupId>
57+
<artifactId>clickhouse-jdbc</artifactId>
58+
<version>${clickhouse.version}</version>
59+
</dependency>
60+
61+
<dependency>
62+
<groupId>org.slf4j</groupId>
63+
<artifactId>slf4j-log4j12</artifactId>
64+
<version>1.7.7</version>
65+
<scope>runtime</scope>
66+
</dependency>
67+
<dependency>
68+
<groupId>log4j</groupId>
69+
<artifactId>log4j</artifactId>
70+
<version>1.2.17</version>
71+
<scope>runtime</scope>
72+
</dependency>
73+
74+
<dependency>
75+
<groupId>junit</groupId>
76+
<artifactId>junit</artifactId>
77+
<version>4.11</version>
78+
<scope>test</scope>
79+
</dependency>
80+
</dependencies>
81+
82+
<build>
83+
<pluginManagement>
84+
<plugins>
85+
<plugin>
86+
<groupId>org.apache.maven.plugins</groupId>
87+
<artifactId>maven-shade-plugin</artifactId>
88+
<executions>
89+
<execution>
90+
<phase>package</phase>
91+
<goals>
92+
<goal>shade</goal>
93+
</goals>
94+
<configuration>
95+
<shadeTestJar>false</shadeTestJar>
96+
<artifactSet>
97+
<excludes>
98+
<exclude>junit:junit</exclude>
99+
</excludes>
100+
<includes>
101+
<include>*:*</include>
102+
</includes>
103+
</artifactSet>
104+
</configuration>
105+
</execution>
106+
</executions>
107+
</plugin>
108+
</plugins>
109+
</pluginManagement>
110+
</build>
111+
</project>
Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package com.apache.flink.streaming.connectors.clickhouse;
18+
import org.apache.flink.configuration.Configuration;
19+
import org.apache.flink.runtime.state.FunctionInitializationContext;
20+
import org.apache.flink.runtime.state.FunctionSnapshotContext;
21+
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
22+
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
23+
import org.apache.flink.types.Row;
24+
import org.slf4j.Logger;
25+
import org.slf4j.LoggerFactory;
26+
import ru.yandex.clickhouse.BalancedClickhouseDataSource;
27+
import ru.yandex.clickhouse.settings.ClickHouseProperties;
28+
29+
import java.sql.Connection;
30+
import java.sql.PreparedStatement;
31+
import java.util.Properties;
32+
33+
public class ClickHouseAppendSinkFunction extends RichSinkFunction<Row> implements CheckpointedFunction {
34+
private static final String USERNAME = "user";
35+
private static final String PASSWORD = "password";
36+
37+
private static final Logger log = LoggerFactory.getLogger(ClickHouseAppendSinkFunction.class);
38+
private static final long serialVersionUID = 1L;
39+
40+
private Connection connection;
41+
private BalancedClickhouseDataSource dataSource;
42+
private PreparedStatement pstat;
43+
44+
private String address;
45+
private String username;
46+
private String password;
47+
48+
private String prepareStatement;
49+
private Integer batchSize;
50+
private Long commitPadding;
51+
52+
private Integer retries;
53+
private Long retryInterval;
54+
55+
private Boolean ignoreInsertError;
56+
57+
private Integer currentSize;
58+
private Long lastExecuteTime;
59+
60+
public ClickHouseAppendSinkFunction(String address, String username, String password, String prepareStatement, Integer batchSize, Long commitPadding, Integer retries, Long retryInterval, Boolean ignoreInsertError) {
61+
this.address = address;
62+
this.username = username;
63+
this.password = password;
64+
this.prepareStatement = prepareStatement;
65+
this.batchSize = batchSize;
66+
this.commitPadding = commitPadding;
67+
this.retries = retries;
68+
this.retryInterval = retryInterval;
69+
this.ignoreInsertError = ignoreInsertError;
70+
}
71+
72+
@Override
73+
public void open(Configuration parameters) throws Exception {
74+
super.open(parameters);
75+
Properties properties = new Properties();
76+
properties.setProperty(USERNAME, username);
77+
properties.setProperty(PASSWORD, password);
78+
ClickHouseProperties clickHouseProperties = new ClickHouseProperties(properties);
79+
dataSource = new BalancedClickhouseDataSource(address, clickHouseProperties);
80+
connection = dataSource.getConnection();
81+
pstat = connection.prepareStatement(prepareStatement);
82+
lastExecuteTime = System.currentTimeMillis();
83+
currentSize = 0;
84+
85+
}
86+
87+
@Override
88+
public void invoke(Row value, Context context) throws Exception {
89+
for (int i = 0; i < value.getArity(); i++) {
90+
pstat.setObject(i + 1, value.getField(i));
91+
}
92+
pstat.addBatch();
93+
currentSize++;
94+
if (currentSize >= batchSize || (System.currentTimeMillis() - lastExecuteTime) > commitPadding) {
95+
try {
96+
doExecuteRetries(retries, retryInterval);
97+
} catch (Exception e) {
98+
log.error("clickhouse-insert-error ( maxRetries:" + retries + " , retryInterval : " + retryInterval + " millisecond )" + e.getMessage());
99+
} finally {
100+
pstat.clearBatch();
101+
currentSize = 0;
102+
lastExecuteTime = System.currentTimeMillis();
103+
}
104+
}
105+
}
106+
107+
public void doExecuteRetries(int count, long retryInterval) throws Exception {
108+
109+
int retrySize = 0;
110+
Exception resultException = null;
111+
for (int i = 0; i < count; i++) {
112+
try {
113+
pstat.executeBatch();
114+
break;
115+
} catch (Exception e) {
116+
retrySize++;
117+
resultException = e;
118+
}
119+
try {
120+
Thread.sleep(retryInterval);
121+
} catch (InterruptedException e) {
122+
log.error("clickhouse retry interval exception : ",e);
123+
}
124+
}
125+
if (retrySize == count && !ignoreInsertError) {
126+
throw resultException;
127+
}
128+
}
129+
130+
@Override
131+
public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
132+
doExecuteRetries(retries, retryInterval);
133+
}
134+
135+
@Override
136+
public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
137+
138+
}
139+
140+
}

0 commit comments

Comments
 (0)