implements CheckpointedFunction {
+ private static final String USERNAME = "user";
+ private static final String PASSWORD = "password";
+
+ private static final Logger log = LoggerFactory.getLogger(ClickHouseAppendSinkFunction.class);
+ private static final long serialVersionUID = 1L;
+
+ private Connection connection;
+ private BalancedClickhouseDataSource dataSource;
+ private PreparedStatement pstat;
+
+ private String address;
+ private String username;
+ private String password;
+
+ private String prepareStatement;
+ private Integer batchSize;
+ private Long commitPadding;
+
+ private Integer retries;
+ private Long retryInterval;
+
+ private Boolean ignoreInsertError;
+
+ private Integer currentSize;
+ private Long lastExecuteTime;
+
+ public ClickHouseAppendSinkFunction(String address, String username, String password, String prepareStatement, Integer batchSize, Long commitPadding, Integer retries, Long retryInterval, Boolean ignoreInsertError) {
+ this.address = address;
+ this.username = username;
+ this.password = password;
+ this.prepareStatement = prepareStatement;
+ this.batchSize = batchSize;
+ this.commitPadding = commitPadding;
+ this.retries = retries;
+ this.retryInterval = retryInterval;
+ this.ignoreInsertError = ignoreInsertError;
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ Properties properties = new Properties();
+ properties.setProperty(USERNAME, username);
+ properties.setProperty(PASSWORD, password);
+ ClickHouseProperties clickHouseProperties = new ClickHouseProperties(properties);
+ dataSource = new BalancedClickhouseDataSource(address, clickHouseProperties);
+ connection = dataSource.getConnection();
+ pstat = connection.prepareStatement(prepareStatement);
+ lastExecuteTime = System.currentTimeMillis();
+ currentSize = 0;
+
+ }
+
+ @Override
+ public void invoke(Row value, Context context) throws Exception {
+ for (int i = 0; i < value.getArity(); i++) {
+ pstat.setObject(i + 1, value.getField(i));
+ }
+ pstat.addBatch();
+ currentSize++;
+ if (currentSize >= batchSize || (System.currentTimeMillis() - lastExecuteTime) > commitPadding) {
+ try {
+ doExecuteRetries(retries, retryInterval);
+ } catch (Exception e) {
+ log.error("clickhouse-insert-error ( maxRetries:" + retries + " , retryInterval : " + retryInterval + " millisecond )" + e.getMessage());
+ } finally {
+ pstat.clearBatch();
+ currentSize = 0;
+ lastExecuteTime = System.currentTimeMillis();
+ }
+ }
+ }
+
+ public void doExecuteRetries(int count, long retryInterval) throws Exception {
+
+ int retrySize = 0;
+ Exception resultException = null;
+ for (int i = 0; i < count; i++) {
+ try {
+ pstat.executeBatch();
+ break;
+ } catch (Exception e) {
+ retrySize++;
+ resultException = e;
+ }
+ try {
+ Thread.sleep(retryInterval);
+ } catch (InterruptedException e) {
+ log.error("clickhouse retry interval exception : ",e);
+ }
+ }
+ if (retrySize == count && !ignoreInsertError) {
+ throw resultException;
+ }
+ }
+
+ @Override
+ public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
+ doExecuteRetries(retries, retryInterval);
+ }
+
+ @Override
+ public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
+
+ }
+
+}
diff --git a/flink-connector-clickhouse/src/main/java/com/apache/flink/streaming/connectors/clickhouse/ClickHouseTableSink.java b/flink-connector-clickhouse/src/main/java/com/apache/flink/streaming/connectors/clickhouse/ClickHouseTableSink.java
new file mode 100644
index 00000000..8550bbfa
--- /dev/null
+++ b/flink-connector-clickhouse/src/main/java/com/apache/flink/streaming/connectors/clickhouse/ClickHouseTableSink.java
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.apache.flink.streaming.connectors.clickhouse;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.sinks.AppendStreamTableSink;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.utils.TableConnectorUtils;
+import org.apache.flink.types.Row;
+
+import java.util.Arrays;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * An at-least-once Table sink for ClickHouse.
+ *
+ * The mechanisms of Flink guarantees delivering messages at-least-once to this sink (if
+ * checkpointing is enabled). However, one common use case is to run idempotent queries
+ * (e.g., REPLACE or INSERT OVERWRITE) to upsert into the database and
+ * achieve exactly-once semantic.
+ */
+public class ClickHouseTableSink implements AppendStreamTableSink {
+
+ private static final Integer BATCH_SIZE_DEFAULT = 5000;
+ private static final Long COMMIT_PADDING_DEFAULT = 5000L;
+ private static final Integer RETRIES_DEFAULT = 3;
+ private static final Long RETRY_INTERVAL_DEFAULT = 3000L;
+ private static final Boolean IGNORE_INSERT_ERROR = false;
+ private String address;
+ private String username;
+ private String password;
+ private String database;
+ private String table;
+ private TableSchema schema;
+ private Integer batchSize;
+ private Long commitPadding;
+ private Integer retries;
+ private Long retryInterval;
+ private Boolean ignoreInsertError;
+
+
+ public ClickHouseTableSink(String address, String username, String password, String database, String table, TableSchema schema, Integer batchSize, Long commitPadding, Integer retries, Long retryInterval, Boolean ignoreInsertError) {
+ this.address = address;
+ this.username = username;
+ this.password = password;
+ this.database = database;
+ this.table = table;
+ this.schema = schema;
+ this.batchSize = batchSize;
+ this.commitPadding = commitPadding;
+ this.retries = retries;
+ this.retryInterval = retryInterval;
+ this.ignoreInsertError = ignoreInsertError;
+
+ }
+
+ /**
+ *
+ * @return ClickHouseAppendSinkFunction
+ */
+ private ClickHouseAppendSinkFunction initSink() {
+ String prepareStatement = createPrepareStatement(schema, database, table);
+ return new ClickHouseAppendSinkFunction(address, username, password, prepareStatement, batchSize, commitPadding, retries, retryInterval, ignoreInsertError);
+ }
+
+ @Override
+ public TableSink configure(String[] strings, TypeInformation>[] typeInformations) {
+
+ ClickHouseTableSink copy;
+ try {
+ copy = new ClickHouseTableSink(address, username, password, database, table, schema, batchSize, commitPadding, retries, retryInterval, ignoreInsertError);
+ } catch (Exception e) {
+ throw new RuntimeException("ClickHouseTableSink configure exception : ",e);
+ }
+
+ return copy;
+ }
+
+ @Override
+ public String[] getFieldNames() {
+ return schema.getFieldNames();
+ }
+
+ @Override
+ public TypeInformation>[] getFieldTypes() {
+ return schema.getFieldTypes();
+ }
+
+ @Override
+ public DataStreamSink> consumeDataStream(DataStream dataStream) {
+ return dataStream.addSink(initSink())
+ .setParallelism(dataStream.getParallelism())
+ .name(TableConnectorUtils.generateRuntimeName(this.getClass(), schema.getFieldNames()));
+ }
+
+ @Override
+ public DataType getConsumedDataType() {
+ return schema.toRowDataType();
+ }
+
+
+ @Override
+ public TypeInformation getOutputType() {
+ return new RowTypeInfo(schema.getFieldTypes(), schema.getFieldNames());
+ }
+
+
+ @Override
+ public TableSchema getTableSchema() {
+ return schema;
+ }
+
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+
+ public String createPrepareStatement(TableSchema tableSchema, String database, String table) {
+ String[] fieldNames = tableSchema.getFieldNames();
+ String columns = String.join(",", fieldNames);
+ String questionMarks = Arrays.stream(fieldNames)
+ .map(field -> "?")
+ .reduce((left,right) -> left+","+right)
+ .get();
+ StringBuilder builder = new StringBuilder("insert into ");
+ builder.append(database).append(".")
+ .append(table).append(" ( ")
+ .append(columns).append(" ) values ( ").append(questionMarks).append(" ) ");
+ return builder.toString();
+
+ }
+
+ public static class Builder {
+
+ private String address;
+ private String username;
+ private String password;
+ private String database;
+ private String table;
+ private TableSchema schema;
+ private Integer batchSize = BATCH_SIZE_DEFAULT;
+ private Long commitPadding = COMMIT_PADDING_DEFAULT;
+
+ private Integer retries = RETRIES_DEFAULT;
+ private Long retryInterval = RETRY_INTERVAL_DEFAULT;
+ private Boolean ignoreInsertError = IGNORE_INSERT_ERROR;
+
+ public Builder setAddress(String address) {
+ this.address = address;
+ return this;
+ }
+
+ public Builder setUsername(String username) {
+ this.username = username;
+ return this;
+ }
+
+ public Builder setPassword(String password) {
+ this.password = password;
+ return this;
+ }
+
+ public Builder setSchema(TableSchema schema) {
+ this.schema = schema;
+ return this;
+ }
+
+ public Builder setDatabase(String database) {
+ this.database = database;
+ return this;
+ }
+
+ public Builder setTable(String table) {
+ this.table = table;
+ return this;
+ }
+
+ public Builder setBatchSize(Integer batchSize) {
+ this.batchSize = batchSize;
+ return this;
+ }
+
+ public Builder setCommitPadding(Long commitPadding) {
+ this.commitPadding = commitPadding;
+ return this;
+ }
+
+ public Builder setRetries(Integer retries) {
+ this.retries = retries;
+ return this;
+
+ }
+
+ public Builder setRetryInterval(Long retryInterval) {
+ this.retryInterval = retryInterval;
+ return this;
+
+ }
+
+ public Builder setIgnoreInsertError(Boolean ignoreInsertError) {
+ this.ignoreInsertError = ignoreInsertError;
+ return this;
+ }
+
+ public ClickHouseTableSink builder() {
+ checkNotNull(address, "No address supplied.");
+ checkNotNull(username, "No username supplied.");
+ checkNotNull(password, "No password supplied.");
+ checkNotNull(database, "No database supplied.");
+ checkNotNull(table, "No table supplied.");
+ checkNotNull(schema, "No table-schema supplied.");
+ return new ClickHouseTableSink(address, username, password, database, table, schema, batchSize, commitPadding, retries, retryInterval, ignoreInsertError);
+ }
+
+
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ ClickHouseTableSink that = (ClickHouseTableSink) o;
+
+ if (!address.equals(that.address)) return false;
+ if (!username.equals(that.username)) return false;
+ if (!password.equals(that.password)) return false;
+ if (!database.equals(that.database)) return false;
+ if (!table.equals(that.table)) return false;
+ if (!schema.equals(that.schema)) return false;
+ if (!batchSize.equals(that.batchSize)) return false;
+ if (!commitPadding.equals(that.commitPadding)) return false;
+ if (!retries.equals(that.retries)) return false;
+ if (!retryInterval.equals(that.retryInterval)) return false;
+ return ignoreInsertError.equals(that.ignoreInsertError);
+ }
+
+ @Override
+ public int hashCode() {
+ int result = address.hashCode();
+ result = 31 * result + username.hashCode();
+ result = 31 * result + password.hashCode();
+ result = 31 * result + database.hashCode();
+ result = 31 * result + table.hashCode();
+ result = 31 * result + schema.hashCode();
+ result = 31 * result + batchSize.hashCode();
+ result = 31 * result + commitPadding.hashCode();
+ result = 31 * result + retries.hashCode();
+ result = 31 * result + retryInterval.hashCode();
+ result = 31 * result + ignoreInsertError.hashCode();
+ return result;
+ }
+}
diff --git a/flink-connector-clickhouse/src/main/java/com/apache/flink/streaming/connectors/clickhouse/ClickHouseTableSourceSinkFactory.java b/flink-connector-clickhouse/src/main/java/com/apache/flink/streaming/connectors/clickhouse/ClickHouseTableSourceSinkFactory.java
new file mode 100644
index 00000000..5bc5f0e4
--- /dev/null
+++ b/flink-connector-clickhouse/src/main/java/com/apache/flink/streaming/connectors/clickhouse/ClickHouseTableSourceSinkFactory.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.apache.flink.streaming.connectors.clickhouse;
+
+import com.apache.flink.table.descriptors.ClickHouseValidator;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.descriptors.SchemaValidator;
+import org.apache.flink.table.factories.StreamTableSinkFactory;
+import org.apache.flink.table.sinks.StreamTableSink;
+import org.apache.flink.table.utils.TableSchemaUtils;
+import org.apache.flink.types.Row;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static com.apache.flink.table.descriptors.ClickHouseValidator.*;
+import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION;
+import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;
+import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_VERSION;
+import static org.apache.flink.table.descriptors.DescriptorProperties.*;
+import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK;
+import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_DATA_TYPE;
+import static org.apache.flink.table.descriptors.Schema.*;
+
+
+/**
+ * Factory for creating configured instances of {@link ClickHouseTableSink} .
+ */
+public class ClickHouseTableSourceSinkFactory implements StreamTableSinkFactory {
+
+ @Override
+ public Map requiredContext() {
+ Map context = new HashMap<>();
+ context.put(CONNECTOR_TYPE, CONNECTOR_TYPE_VALUE_CLICKHOUSE);
+ context.put(CONNECTOR_VERSION, "1");
+ context.put(CONNECTOR_PROPERTY_VERSION, "1");
+ return context;
+ }
+
+ @Override
+ public List supportedProperties() {
+ List properties = new ArrayList<>();
+ properties.add(CONNECTOR_ADRESS);
+ properties.add(CONNECTOR_DATABASE);
+ properties.add(CONNECTOR_TABLE);
+ properties.add(CONNECTOR_USERNAME);
+ properties.add(CONNECTOR_PASSWORD);
+
+ properties.add(CONNECTOR_COMMIT_BATCH_SIZE);
+ properties.add(CONNECTOR_COMMIT_PADDING);
+
+ properties.add(CONNECTOR_COMMIT_RETRY_ATTEMPTS);
+ properties.add(CONNECTOR_COMMIT_RETRY_INTERVAL);
+
+ properties.add(CONNECTOR_COMMIT_IGNORE_ERROR);
+
+ // schema
+ properties.add(SCHEMA + ".#." + SCHEMA_DATA_TYPE);
+ properties.add(SCHEMA + ".#." + SCHEMA_TYPE);
+ properties.add(SCHEMA + ".#." + SCHEMA_NAME);
+
+ // watermark
+ properties.add(SCHEMA + "." + WATERMARK + ".#." + WATERMARK_ROWTIME);
+ properties.add(SCHEMA + "." + WATERMARK + ".#." + WATERMARK_STRATEGY_EXPR);
+ properties.add(SCHEMA + "." + WATERMARK + ".#." + WATERMARK_STRATEGY_DATA_TYPE);
+
+ return properties;
+
+ }
+
+ @Override
+ public StreamTableSink createStreamTableSink(Map map) {
+ DescriptorProperties descriptorProperties = getValidatedPropertities(map);
+ TableSchema schema = TableSchemaUtils.getPhysicalSchema(descriptorProperties.getTableSchema(SCHEMA));
+ final ClickHouseTableSink.Builder builder = ClickHouseTableSink.builder()
+ .setSchema(schema);
+
+ descriptorProperties.getOptionalString(CONNECTOR_ADRESS).ifPresent(builder::setAddress);
+ descriptorProperties.getOptionalString(CONNECTOR_DATABASE).ifPresent(builder::setDatabase);
+ descriptorProperties.getOptionalString(CONNECTOR_TABLE).ifPresent(builder::setTable);
+ descriptorProperties.getOptionalString(CONNECTOR_USERNAME).ifPresent(builder::setUsername);
+ descriptorProperties.getOptionalString(CONNECTOR_PASSWORD).ifPresent(builder::setPassword);
+ descriptorProperties.getOptionalInt(CONNECTOR_COMMIT_BATCH_SIZE).ifPresent(builder::setBatchSize);
+ descriptorProperties.getOptionalLong(CONNECTOR_COMMIT_PADDING).ifPresent(builder::setCommitPadding);
+ descriptorProperties.getOptionalInt(CONNECTOR_COMMIT_RETRY_ATTEMPTS).ifPresent(builder::setRetries);
+ descriptorProperties.getOptionalLong(CONNECTOR_COMMIT_RETRY_INTERVAL).ifPresent(builder::setRetryInterval);
+ descriptorProperties.getOptionalBoolean(CONNECTOR_COMMIT_IGNORE_ERROR).ifPresent(builder::setIgnoreInsertError);
+
+ return builder.builder();
+ }
+
+ private DescriptorProperties getValidatedPropertities(Map properties) {
+ final DescriptorProperties descriptorProperties = new DescriptorProperties(true);
+ descriptorProperties.putProperties(properties);
+
+ new SchemaValidator(true, false, false).validate(descriptorProperties);
+ new ClickHouseValidator().validate(descriptorProperties);
+
+ return descriptorProperties;
+
+ }
+}
diff --git a/flink-connector-clickhouse/src/main/java/com/apache/flink/table/descriptors/ClickHouse.java b/flink-connector-clickhouse/src/main/java/com/apache/flink/table/descriptors/ClickHouse.java
new file mode 100644
index 00000000..0283ef78
--- /dev/null
+++ b/flink-connector-clickhouse/src/main/java/com/apache/flink/table/descriptors/ClickHouse.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.apache.flink.table.descriptors;
+
+import org.apache.flink.table.descriptors.ConnectorDescriptor;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+
+import java.util.Map;
+
+import static com.apache.flink.table.descriptors.ClickHouseValidator.*;
+
+/**
+ * Connector descriptor for ClickHouse
+ */
+public class ClickHouse extends ConnectorDescriptor {
+
+ private DescriptorProperties properties = new DescriptorProperties();
+
+ public ClickHouse() {
+ super(CONNECTOR_TYPE_VALUE_CLICKHOUSE, 1, false);
+ }
+
+ /**
+ * Set the ClickHouse version to be used .Required.
+ *
+ * @param version ClickHouse version . E.g. , "1.0.0"
+ * @return
+ */
+ public ClickHouse version(String version) {
+ properties.putString(CONNECTOR_VERSION, version);
+ return this;
+ }
+ /**
+ * Set the ClickHouse address to connect the ClickHouse cluster .Required.
+ *
+ * @param address ClickHouse address. E.g. , "jdbc:clickhouse://localhost:8123"
+ * @return
+ */
+ public ClickHouse address(String address) {
+ properties.putString(CONNECTOR_ADRESS, address);
+ return this;
+ }
+
+ public ClickHouse database(String database) {
+ properties.putString(CONNECTOR_DATABASE, database);
+ return this;
+ }
+
+ public ClickHouse table(String table) {
+ properties.putString(CONNECTOR_TABLE, table);
+ return this;
+ }
+
+ public ClickHouse username(String username) {
+ properties.putString(CONNECTOR_USERNAME, username);
+ return this;
+ }
+
+ public ClickHouse password(String password) {
+ properties.putString(CONNECTOR_PASSWORD, password);
+ return this;
+ }
+
+ public ClickHouse batchSize(Integer batchSize) {
+ properties.putInt(CONNECTOR_COMMIT_BATCH_SIZE, batchSize);
+ return this;
+ }
+
+ public ClickHouse padding(Long padding) {
+ properties.putLong(CONNECTOR_COMMIT_PADDING, padding);
+ return this;
+ }
+
+ public ClickHouse retryAttempts(Integer attempts) {
+ properties.putInt(CONNECTOR_COMMIT_RETRY_ATTEMPTS, attempts);
+ return this;
+ }
+
+ public ClickHouse retryInterval(Long interval) {
+ properties.putLong(CONNECTOR_COMMIT_RETRY_INTERVAL, interval);
+ return this;
+ }
+
+ public ClickHouse ignoreError(Boolean ignore) {
+ properties.putBoolean(CONNECTOR_COMMIT_IGNORE_ERROR, ignore);
+ return this;
+ }
+
+ @Override
+ protected Map toConnectorProperties() {
+ return properties.asMap();
+ }
+}
diff --git a/flink-connector-clickhouse/src/main/java/com/apache/flink/table/descriptors/ClickHouseValidator.java b/flink-connector-clickhouse/src/main/java/com/apache/flink/table/descriptors/ClickHouseValidator.java
new file mode 100644
index 00000000..002235a3
--- /dev/null
+++ b/flink-connector-clickhouse/src/main/java/com/apache/flink/table/descriptors/ClickHouseValidator.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.apache.flink.table.descriptors;
+
+import org.apache.flink.table.descriptors.ConnectorDescriptorValidator;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+
+/**
+ * The validator for ClickHouse
+ */
+
+public class ClickHouseValidator extends ConnectorDescriptorValidator {
+
+ public static final String CONNECTOR_TYPE_VALUE_CLICKHOUSE = "clickhouse";
+ public static final String CONNECTOR_ADRESS = "connector.address";
+ public static final String CONNECTOR_DATABASE = "connector.database";
+ public static final String CONNECTOR_TABLE = "connector.table";
+ public static final String CONNECTOR_USERNAME = "connector.username";
+ public static final String CONNECTOR_PASSWORD = "connector.password";
+ public static final String CONNECTOR_COMMIT_BATCH_SIZE = "connector.commit.batch.size";
+ public static final String CONNECTOR_COMMIT_PADDING = "connector.commit.padding";
+ public static final String CONNECTOR_COMMIT_RETRY_ATTEMPTS = "connector.commit.retry.attempts";
+ public static final String CONNECTOR_COMMIT_RETRY_INTERVAL = "connector.commit.retry.interval";
+ public static final String CONNECTOR_COMMIT_IGNORE_ERROR = "connector.commit.ignore.error";
+
+ @Override
+ public void validate(DescriptorProperties properties) {
+ super.validate(properties);
+ validateComonProperties(properties);
+ }
+
+ private void validateComonProperties(DescriptorProperties properties) {
+
+ properties.validateString(CONNECTOR_ADRESS, false, 1);
+ properties.validateString(CONNECTOR_DATABASE, false, 1);
+ properties.validateString(CONNECTOR_TABLE, false, 1);
+ properties.validateString(CONNECTOR_USERNAME, false,1);
+ properties.validateString(CONNECTOR_PASSWORD, false,1);
+ properties.validateInt(CONNECTOR_COMMIT_BATCH_SIZE, true);
+ properties.validateString(CONNECTOR_USERNAME, true);
+ properties.validateInt(CONNECTOR_COMMIT_PADDING, true);
+ }
+}
diff --git a/flink-connector-clickhouse/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory b/flink-connector-clickhouse/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
new file mode 100644
index 00000000..c94ff048
--- /dev/null
+++ b/flink-connector-clickhouse/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
@@ -0,0 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+com.apache.flink.streaming.connectors.clickhouse.ClickHouseTableSourceSinkFactory
+
diff --git a/flink-connector-clickhouse/src/test/java/com/apache/flink/streaming/connectors/clickhouse/ClickHouseTableSourceSinkFactoryTest.java b/flink-connector-clickhouse/src/test/java/com/apache/flink/streaming/connectors/clickhouse/ClickHouseTableSourceSinkFactoryTest.java
new file mode 100644
index 00000000..75c5c9b2
--- /dev/null
+++ b/flink-connector-clickhouse/src/test/java/com/apache/flink/streaming/connectors/clickhouse/ClickHouseTableSourceSinkFactoryTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.apache.flink.streaming.connectors.clickhouse;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.descriptors.Schema;
+import org.apache.flink.table.utils.TableSchemaUtils;
+import org.junit.Test;
+
+import java.util.Map;
+
+import static com.apache.flink.table.descriptors.ClickHouseValidator.*;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA;
+import static org.junit.Assert.assertEquals;
+
+public class ClickHouseTableSourceSinkFactoryTest {
+
+
+ @Test
+ public void testCreateStreamTableSink() throws Exception {
+ String connectorVersion = "1";
+ String connectorAddress = "jdbc:clickhouse://localhost:8123/default";
+ String connectorDatabase = "qtt";
+ String connectorTable = "insert_test";
+ String connectorUserName = "admin";
+ String connectorPassWord = "admin";
+ String connectorCommitBatchSize = "1";
+ String connectorCommitPadding = "1";
+ String connectorCommitRetryAttempts = "3";
+ String connectorCommitRetryInterval = "3000";
+ String connectorCommitIgnoreError = "false";
+
+ DescriptorProperties properties = new DescriptorProperties();
+ properties.putString(CONNECTOR_TYPE, CONNECTOR_TYPE_VALUE_CLICKHOUSE);
+ properties.putString(CONNECTOR_VERSION, connectorVersion);
+ properties.putString(CONNECTOR_ADRESS, connectorAddress);
+ properties.putString(CONNECTOR_DATABASE, connectorDatabase);
+ properties.putString(CONNECTOR_TABLE, connectorTable);
+ properties.putString(CONNECTOR_USERNAME, connectorUserName);
+ properties.putString(CONNECTOR_PASSWORD, connectorPassWord);
+ properties.putString(CONNECTOR_COMMIT_BATCH_SIZE, connectorCommitBatchSize);
+ properties.putString(CONNECTOR_COMMIT_PADDING, connectorCommitPadding);
+ properties.putString(CONNECTOR_COMMIT_RETRY_ATTEMPTS, connectorCommitRetryAttempts);
+ properties.putString(CONNECTOR_COMMIT_RETRY_INTERVAL, connectorCommitRetryInterval);
+ properties.putString(CONNECTOR_COMMIT_IGNORE_ERROR, connectorCommitIgnoreError);
+
+ Schema schema = new Schema().field("s", DataTypes.STRING()).field("d", DataTypes.BIGINT());
+ Map stringStringMap = schema.toProperties();
+
+ properties.putProperties(stringStringMap);
+
+ ClickHouseTableSourceSinkFactory factory = new ClickHouseTableSourceSinkFactory();
+ ClickHouseTableSink actualClickHouseTableSink = (ClickHouseTableSink) factory.createStreamTableSink(properties.asMap());
+
+ TableSchema tableSchema = TableSchemaUtils.getPhysicalSchema(properties.getTableSchema(SCHEMA));
+ ClickHouseTableSink expectedClickHouseTableSink = new ClickHouseTableSink(connectorAddress,
+ connectorUserName,
+ connectorPassWord,
+ connectorDatabase,
+ connectorTable,
+ tableSchema,
+ Integer.valueOf(connectorCommitBatchSize),
+ Long.valueOf(connectorCommitPadding),
+ Integer.valueOf(connectorCommitRetryAttempts),
+ Long.valueOf(connectorCommitRetryInterval),
+ Boolean.valueOf(connectorCommitIgnoreError)
+ );
+
+ assertEquals(expectedClickHouseTableSink, actualClickHouseTableSink);
+ }
+
+
+}
diff --git a/flink-connector-clickhouse/src/test/java/com/apache/flink/table/descriptors/ClickHouseTest.java b/flink-connector-clickhouse/src/test/java/com/apache/flink/table/descriptors/ClickHouseTest.java
new file mode 100644
index 00000000..70bc68a5
--- /dev/null
+++ b/flink-connector-clickhouse/src/test/java/com/apache/flink/table/descriptors/ClickHouseTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.apache.flink.table.descriptors;
+
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static com.apache.flink.table.descriptors.ClickHouseValidator.*;
+import static org.junit.Assert.assertEquals;
+
+public class ClickHouseTest {
+
+ @Test
+ public void testToConnectorProperties() throws Exception {
+ String connectorVersion = "1";
+ String connectorAddress = "jdbc:clickhouse://localhost:8123/default";
+ String connectorDatabase = "qtt";
+ String connectorTable = "insert_test";
+ String connectorUserName = "admin";
+ String connectorPassWord = "admin";
+ String connectorCommitBatchSize = "1";
+ String connectorCommitPadding = "2000";
+ String connectorCommitRetryAttempts = "3";
+ String connectorCommitRetryInterval = "3000";
+ String connectorCommitIgnoreError = "false";
+ HashMap expectedMap = new HashMap<>();
+ expectedMap.put(CONNECTOR_VERSION, connectorVersion);
+ expectedMap.put(CONNECTOR_ADRESS, connectorAddress);
+ expectedMap.put(CONNECTOR_DATABASE, connectorDatabase);
+ expectedMap.put(CONNECTOR_TABLE, connectorTable);
+ expectedMap.put(CONNECTOR_USERNAME, connectorUserName);
+ expectedMap.put(CONNECTOR_PASSWORD, connectorPassWord);
+ expectedMap.put(CONNECTOR_COMMIT_BATCH_SIZE, connectorCommitBatchSize);
+ expectedMap.put(CONNECTOR_COMMIT_PADDING, connectorCommitPadding);
+ expectedMap.put(CONNECTOR_COMMIT_RETRY_ATTEMPTS, connectorCommitRetryAttempts);
+ expectedMap.put(CONNECTOR_COMMIT_RETRY_INTERVAL, connectorCommitRetryInterval);
+ expectedMap.put(CONNECTOR_COMMIT_IGNORE_ERROR, connectorCommitIgnoreError);
+
+ ClickHouse clickhouse = new ClickHouse()
+ .version(connectorVersion)
+ .address(connectorAddress)
+ .database(connectorDatabase)
+ .table(connectorTable)
+ .username(connectorUserName)
+ .password(connectorPassWord)
+ .batchSize(Integer.valueOf(connectorCommitBatchSize))
+ .padding(Long.valueOf(connectorCommitPadding))
+ .retryAttempts(Integer.valueOf(connectorCommitRetryAttempts))
+ .retryInterval(Long.valueOf(connectorCommitRetryInterval))
+ .ignoreError(Boolean.valueOf(connectorCommitIgnoreError));
+ Map actualMap = clickhouse.toConnectorProperties();
+
+ assertEquals(expectedMap, actualMap);
+
+
+ }
+
+}
diff --git a/pom.xml b/pom.xml
index 7e83d852..ac8b568c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -71,6 +71,7 @@
+ flink-connector-clickhouse
flink-connector-activemq
flink-connector-akka
flink-connector-flume
@@ -98,6 +99,9 @@
1.12.2
+
+ 0.1.50
+
5.4.1
3.0.5