diff --git a/dev/checkstyle.xml b/dev/checkstyle.xml index 7a0558cd..35ce2b2b 100644 --- a/dev/checkstyle.xml +++ b/dev/checkstyle.xml @@ -18,7 +18,6 @@ - + + 4.0.0 + + + org.apache.bahir + bahir-flink-parent_2.11 + 1.1-SNAPSHOT + ../pom.xml + + + flink-connector-clickhouse_${scala.binary.version} + flink-connector-clickhouse + jar + + + + org.apache.flink + flink-streaming-java_${scala.binary.version} + ${flink.version} + provided + + + org.apache.flink + flink-streaming-scala_${scala.binary.version} + ${flink.version} + provided + + + org.apache.flink + flink-table-api-java-bridge_${scala.binary.version} + ${flink.version} + provided + + + + ru.yandex.clickhouse + clickhouse-jdbc + ${clickhouse.version} + + + + org.slf4j + slf4j-log4j12 + ${slf4j.version} + runtime + + + log4j + log4j + ${log4j.version} + runtime + + + + junit + junit + 4.11 + test + + + + + + + + org.apache.maven.plugins + maven-shade-plugin + + + package + + shade + + + false + + + junit:junit + + + *:* + + + + + + + + + + diff --git a/flink-connector-clickhouse/src/main/java/com/apache/flink/streaming/connectors/clickhouse/ClickHouseAppendSinkFunction.java b/flink-connector-clickhouse/src/main/java/com/apache/flink/streaming/connectors/clickhouse/ClickHouseAppendSinkFunction.java new file mode 100644 index 00000000..28c4f650 --- /dev/null +++ b/flink-connector-clickhouse/src/main/java/com/apache/flink/streaming/connectors/clickhouse/ClickHouseAppendSinkFunction.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.apache.flink.streaming.connectors.clickhouse; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.types.Row; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import ru.yandex.clickhouse.BalancedClickhouseDataSource; +import ru.yandex.clickhouse.settings.ClickHouseProperties; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.util.Properties; + +public class ClickHouseAppendSinkFunction extends RichSinkFunction implements CheckpointedFunction { + private static final String USERNAME = "user"; + private static final String PASSWORD = "password"; + + private static final Logger log = LoggerFactory.getLogger(ClickHouseAppendSinkFunction.class); + private static final long serialVersionUID = 1L; + + private Connection connection; + private BalancedClickhouseDataSource dataSource; + private PreparedStatement pstat; + + private String address; + private String username; + private String password; + + private String prepareStatement; + private Integer batchSize; + private Long commitPadding; + + private Integer retries; + private Long retryInterval; + + private Boolean ignoreInsertError; + + private Integer currentSize; + private Long lastExecuteTime; + + public ClickHouseAppendSinkFunction(String address, String username, String password, String prepareStatement, Integer batchSize, Long commitPadding, Integer retries, Long retryInterval, Boolean ignoreInsertError) { + this.address = address; + this.username = username; + this.password = password; + this.prepareStatement = prepareStatement; + this.batchSize = batchSize; + this.commitPadding = commitPadding; + this.retries = retries; + this.retryInterval = retryInterval; + this.ignoreInsertError = ignoreInsertError; + } + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + Properties properties = new Properties(); + properties.setProperty(USERNAME, username); + properties.setProperty(PASSWORD, password); + ClickHouseProperties clickHouseProperties = new ClickHouseProperties(properties); + dataSource = new BalancedClickhouseDataSource(address, clickHouseProperties); + connection = dataSource.getConnection(); + pstat = connection.prepareStatement(prepareStatement); + lastExecuteTime = System.currentTimeMillis(); + currentSize = 0; + + } + + @Override + public void invoke(Row value, Context context) throws Exception { + for (int i = 0; i < value.getArity(); i++) { + pstat.setObject(i + 1, value.getField(i)); + } + pstat.addBatch(); + currentSize++; + if (currentSize >= batchSize || (System.currentTimeMillis() - lastExecuteTime) > commitPadding) { + try { + doExecuteRetries(retries, retryInterval); + } catch (Exception e) { + log.error("clickhouse-insert-error ( maxRetries:" + retries + " , retryInterval : " + retryInterval + " millisecond )" + e.getMessage()); + } finally { + pstat.clearBatch(); + currentSize = 0; + lastExecuteTime = System.currentTimeMillis(); + } + } + } + + public void doExecuteRetries(int count, long retryInterval) throws Exception { + + int retrySize = 0; + Exception resultException = null; + for (int i = 0; i < count; i++) { + try { + pstat.executeBatch(); + break; + } catch (Exception e) { + retrySize++; + resultException = e; + } + try { + Thread.sleep(retryInterval); + } catch (InterruptedException e) { + log.error("clickhouse retry interval exception : ",e); + } + } + if (retrySize == count && !ignoreInsertError) { + throw resultException; + } + } + + @Override + public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception { + doExecuteRetries(retries, retryInterval); + } + + @Override + public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception { + + } + +} diff --git a/flink-connector-clickhouse/src/main/java/com/apache/flink/streaming/connectors/clickhouse/ClickHouseTableSink.java b/flink-connector-clickhouse/src/main/java/com/apache/flink/streaming/connectors/clickhouse/ClickHouseTableSink.java new file mode 100644 index 00000000..8550bbfa --- /dev/null +++ b/flink-connector-clickhouse/src/main/java/com/apache/flink/streaming/connectors/clickhouse/ClickHouseTableSink.java @@ -0,0 +1,274 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.apache.flink.streaming.connectors.clickhouse; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSink; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.sinks.AppendStreamTableSink; +import org.apache.flink.table.sinks.TableSink; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.utils.TableConnectorUtils; +import org.apache.flink.types.Row; + +import java.util.Arrays; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * An at-least-once Table sink for ClickHouse. + * + *

The mechanisms of Flink guarantees delivering messages at-least-once to this sink (if + * checkpointing is enabled). However, one common use case is to run idempotent queries + * (e.g., REPLACE or INSERT OVERWRITE) to upsert into the database and + * achieve exactly-once semantic.

+ */ +public class ClickHouseTableSink implements AppendStreamTableSink { + + private static final Integer BATCH_SIZE_DEFAULT = 5000; + private static final Long COMMIT_PADDING_DEFAULT = 5000L; + private static final Integer RETRIES_DEFAULT = 3; + private static final Long RETRY_INTERVAL_DEFAULT = 3000L; + private static final Boolean IGNORE_INSERT_ERROR = false; + private String address; + private String username; + private String password; + private String database; + private String table; + private TableSchema schema; + private Integer batchSize; + private Long commitPadding; + private Integer retries; + private Long retryInterval; + private Boolean ignoreInsertError; + + + public ClickHouseTableSink(String address, String username, String password, String database, String table, TableSchema schema, Integer batchSize, Long commitPadding, Integer retries, Long retryInterval, Boolean ignoreInsertError) { + this.address = address; + this.username = username; + this.password = password; + this.database = database; + this.table = table; + this.schema = schema; + this.batchSize = batchSize; + this.commitPadding = commitPadding; + this.retries = retries; + this.retryInterval = retryInterval; + this.ignoreInsertError = ignoreInsertError; + + } + + /** + * + * @return ClickHouseAppendSinkFunction + */ + private ClickHouseAppendSinkFunction initSink() { + String prepareStatement = createPrepareStatement(schema, database, table); + return new ClickHouseAppendSinkFunction(address, username, password, prepareStatement, batchSize, commitPadding, retries, retryInterval, ignoreInsertError); + } + + @Override + public TableSink configure(String[] strings, TypeInformation[] typeInformations) { + + ClickHouseTableSink copy; + try { + copy = new ClickHouseTableSink(address, username, password, database, table, schema, batchSize, commitPadding, retries, retryInterval, ignoreInsertError); + } catch (Exception e) { + throw new RuntimeException("ClickHouseTableSink configure exception : ",e); + } + + return copy; + } + + @Override + public String[] getFieldNames() { + return schema.getFieldNames(); + } + + @Override + public TypeInformation[] getFieldTypes() { + return schema.getFieldTypes(); + } + + @Override + public DataStreamSink consumeDataStream(DataStream dataStream) { + return dataStream.addSink(initSink()) + .setParallelism(dataStream.getParallelism()) + .name(TableConnectorUtils.generateRuntimeName(this.getClass(), schema.getFieldNames())); + } + + @Override + public DataType getConsumedDataType() { + return schema.toRowDataType(); + } + + + @Override + public TypeInformation getOutputType() { + return new RowTypeInfo(schema.getFieldTypes(), schema.getFieldNames()); + } + + + @Override + public TableSchema getTableSchema() { + return schema; + } + + + public static Builder builder() { + return new Builder(); + } + + + public String createPrepareStatement(TableSchema tableSchema, String database, String table) { + String[] fieldNames = tableSchema.getFieldNames(); + String columns = String.join(",", fieldNames); + String questionMarks = Arrays.stream(fieldNames) + .map(field -> "?") + .reduce((left,right) -> left+","+right) + .get(); + StringBuilder builder = new StringBuilder("insert into "); + builder.append(database).append(".") + .append(table).append(" ( ") + .append(columns).append(" ) values ( ").append(questionMarks).append(" ) "); + return builder.toString(); + + } + + public static class Builder { + + private String address; + private String username; + private String password; + private String database; + private String table; + private TableSchema schema; + private Integer batchSize = BATCH_SIZE_DEFAULT; + private Long commitPadding = COMMIT_PADDING_DEFAULT; + + private Integer retries = RETRIES_DEFAULT; + private Long retryInterval = RETRY_INTERVAL_DEFAULT; + private Boolean ignoreInsertError = IGNORE_INSERT_ERROR; + + public Builder setAddress(String address) { + this.address = address; + return this; + } + + public Builder setUsername(String username) { + this.username = username; + return this; + } + + public Builder setPassword(String password) { + this.password = password; + return this; + } + + public Builder setSchema(TableSchema schema) { + this.schema = schema; + return this; + } + + public Builder setDatabase(String database) { + this.database = database; + return this; + } + + public Builder setTable(String table) { + this.table = table; + return this; + } + + public Builder setBatchSize(Integer batchSize) { + this.batchSize = batchSize; + return this; + } + + public Builder setCommitPadding(Long commitPadding) { + this.commitPadding = commitPadding; + return this; + } + + public Builder setRetries(Integer retries) { + this.retries = retries; + return this; + + } + + public Builder setRetryInterval(Long retryInterval) { + this.retryInterval = retryInterval; + return this; + + } + + public Builder setIgnoreInsertError(Boolean ignoreInsertError) { + this.ignoreInsertError = ignoreInsertError; + return this; + } + + public ClickHouseTableSink builder() { + checkNotNull(address, "No address supplied."); + checkNotNull(username, "No username supplied."); + checkNotNull(password, "No password supplied."); + checkNotNull(database, "No database supplied."); + checkNotNull(table, "No table supplied."); + checkNotNull(schema, "No table-schema supplied."); + return new ClickHouseTableSink(address, username, password, database, table, schema, batchSize, commitPadding, retries, retryInterval, ignoreInsertError); + } + + + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + ClickHouseTableSink that = (ClickHouseTableSink) o; + + if (!address.equals(that.address)) return false; + if (!username.equals(that.username)) return false; + if (!password.equals(that.password)) return false; + if (!database.equals(that.database)) return false; + if (!table.equals(that.table)) return false; + if (!schema.equals(that.schema)) return false; + if (!batchSize.equals(that.batchSize)) return false; + if (!commitPadding.equals(that.commitPadding)) return false; + if (!retries.equals(that.retries)) return false; + if (!retryInterval.equals(that.retryInterval)) return false; + return ignoreInsertError.equals(that.ignoreInsertError); + } + + @Override + public int hashCode() { + int result = address.hashCode(); + result = 31 * result + username.hashCode(); + result = 31 * result + password.hashCode(); + result = 31 * result + database.hashCode(); + result = 31 * result + table.hashCode(); + result = 31 * result + schema.hashCode(); + result = 31 * result + batchSize.hashCode(); + result = 31 * result + commitPadding.hashCode(); + result = 31 * result + retries.hashCode(); + result = 31 * result + retryInterval.hashCode(); + result = 31 * result + ignoreInsertError.hashCode(); + return result; + } +} diff --git a/flink-connector-clickhouse/src/main/java/com/apache/flink/streaming/connectors/clickhouse/ClickHouseTableSourceSinkFactory.java b/flink-connector-clickhouse/src/main/java/com/apache/flink/streaming/connectors/clickhouse/ClickHouseTableSourceSinkFactory.java new file mode 100644 index 00000000..5bc5f0e4 --- /dev/null +++ b/flink-connector-clickhouse/src/main/java/com/apache/flink/streaming/connectors/clickhouse/ClickHouseTableSourceSinkFactory.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.apache.flink.streaming.connectors.clickhouse; + +import com.apache.flink.table.descriptors.ClickHouseValidator; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.descriptors.DescriptorProperties; +import org.apache.flink.table.descriptors.SchemaValidator; +import org.apache.flink.table.factories.StreamTableSinkFactory; +import org.apache.flink.table.sinks.StreamTableSink; +import org.apache.flink.table.utils.TableSchemaUtils; +import org.apache.flink.types.Row; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static com.apache.flink.table.descriptors.ClickHouseValidator.*; +import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION; +import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE; +import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_VERSION; +import static org.apache.flink.table.descriptors.DescriptorProperties.*; +import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK; +import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_DATA_TYPE; +import static org.apache.flink.table.descriptors.Schema.*; + + +/** + * Factory for creating configured instances of {@link ClickHouseTableSink} . + */ +public class ClickHouseTableSourceSinkFactory implements StreamTableSinkFactory { + + @Override + public Map requiredContext() { + Map context = new HashMap<>(); + context.put(CONNECTOR_TYPE, CONNECTOR_TYPE_VALUE_CLICKHOUSE); + context.put(CONNECTOR_VERSION, "1"); + context.put(CONNECTOR_PROPERTY_VERSION, "1"); + return context; + } + + @Override + public List supportedProperties() { + List properties = new ArrayList<>(); + properties.add(CONNECTOR_ADRESS); + properties.add(CONNECTOR_DATABASE); + properties.add(CONNECTOR_TABLE); + properties.add(CONNECTOR_USERNAME); + properties.add(CONNECTOR_PASSWORD); + + properties.add(CONNECTOR_COMMIT_BATCH_SIZE); + properties.add(CONNECTOR_COMMIT_PADDING); + + properties.add(CONNECTOR_COMMIT_RETRY_ATTEMPTS); + properties.add(CONNECTOR_COMMIT_RETRY_INTERVAL); + + properties.add(CONNECTOR_COMMIT_IGNORE_ERROR); + + // schema + properties.add(SCHEMA + ".#." + SCHEMA_DATA_TYPE); + properties.add(SCHEMA + ".#." + SCHEMA_TYPE); + properties.add(SCHEMA + ".#." + SCHEMA_NAME); + + // watermark + properties.add(SCHEMA + "." + WATERMARK + ".#." + WATERMARK_ROWTIME); + properties.add(SCHEMA + "." + WATERMARK + ".#." + WATERMARK_STRATEGY_EXPR); + properties.add(SCHEMA + "." + WATERMARK + ".#." + WATERMARK_STRATEGY_DATA_TYPE); + + return properties; + + } + + @Override + public StreamTableSink createStreamTableSink(Map map) { + DescriptorProperties descriptorProperties = getValidatedPropertities(map); + TableSchema schema = TableSchemaUtils.getPhysicalSchema(descriptorProperties.getTableSchema(SCHEMA)); + final ClickHouseTableSink.Builder builder = ClickHouseTableSink.builder() + .setSchema(schema); + + descriptorProperties.getOptionalString(CONNECTOR_ADRESS).ifPresent(builder::setAddress); + descriptorProperties.getOptionalString(CONNECTOR_DATABASE).ifPresent(builder::setDatabase); + descriptorProperties.getOptionalString(CONNECTOR_TABLE).ifPresent(builder::setTable); + descriptorProperties.getOptionalString(CONNECTOR_USERNAME).ifPresent(builder::setUsername); + descriptorProperties.getOptionalString(CONNECTOR_PASSWORD).ifPresent(builder::setPassword); + descriptorProperties.getOptionalInt(CONNECTOR_COMMIT_BATCH_SIZE).ifPresent(builder::setBatchSize); + descriptorProperties.getOptionalLong(CONNECTOR_COMMIT_PADDING).ifPresent(builder::setCommitPadding); + descriptorProperties.getOptionalInt(CONNECTOR_COMMIT_RETRY_ATTEMPTS).ifPresent(builder::setRetries); + descriptorProperties.getOptionalLong(CONNECTOR_COMMIT_RETRY_INTERVAL).ifPresent(builder::setRetryInterval); + descriptorProperties.getOptionalBoolean(CONNECTOR_COMMIT_IGNORE_ERROR).ifPresent(builder::setIgnoreInsertError); + + return builder.builder(); + } + + private DescriptorProperties getValidatedPropertities(Map properties) { + final DescriptorProperties descriptorProperties = new DescriptorProperties(true); + descriptorProperties.putProperties(properties); + + new SchemaValidator(true, false, false).validate(descriptorProperties); + new ClickHouseValidator().validate(descriptorProperties); + + return descriptorProperties; + + } +} diff --git a/flink-connector-clickhouse/src/main/java/com/apache/flink/table/descriptors/ClickHouse.java b/flink-connector-clickhouse/src/main/java/com/apache/flink/table/descriptors/ClickHouse.java new file mode 100644 index 00000000..0283ef78 --- /dev/null +++ b/flink-connector-clickhouse/src/main/java/com/apache/flink/table/descriptors/ClickHouse.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.apache.flink.table.descriptors; + +import org.apache.flink.table.descriptors.ConnectorDescriptor; +import org.apache.flink.table.descriptors.DescriptorProperties; + +import java.util.Map; + +import static com.apache.flink.table.descriptors.ClickHouseValidator.*; + +/** + * Connector descriptor for ClickHouse + */ +public class ClickHouse extends ConnectorDescriptor { + + private DescriptorProperties properties = new DescriptorProperties(); + + public ClickHouse() { + super(CONNECTOR_TYPE_VALUE_CLICKHOUSE, 1, false); + } + + /** + * Set the ClickHouse version to be used .Required. + * + * @param version ClickHouse version . E.g. , "1.0.0" + * @return + */ + public ClickHouse version(String version) { + properties.putString(CONNECTOR_VERSION, version); + return this; + } + /** + * Set the ClickHouse address to connect the ClickHouse cluster .Required. + * + * @param address ClickHouse address. E.g. , "jdbc:clickhouse://localhost:8123" + * @return + */ + public ClickHouse address(String address) { + properties.putString(CONNECTOR_ADRESS, address); + return this; + } + + public ClickHouse database(String database) { + properties.putString(CONNECTOR_DATABASE, database); + return this; + } + + public ClickHouse table(String table) { + properties.putString(CONNECTOR_TABLE, table); + return this; + } + + public ClickHouse username(String username) { + properties.putString(CONNECTOR_USERNAME, username); + return this; + } + + public ClickHouse password(String password) { + properties.putString(CONNECTOR_PASSWORD, password); + return this; + } + + public ClickHouse batchSize(Integer batchSize) { + properties.putInt(CONNECTOR_COMMIT_BATCH_SIZE, batchSize); + return this; + } + + public ClickHouse padding(Long padding) { + properties.putLong(CONNECTOR_COMMIT_PADDING, padding); + return this; + } + + public ClickHouse retryAttempts(Integer attempts) { + properties.putInt(CONNECTOR_COMMIT_RETRY_ATTEMPTS, attempts); + return this; + } + + public ClickHouse retryInterval(Long interval) { + properties.putLong(CONNECTOR_COMMIT_RETRY_INTERVAL, interval); + return this; + } + + public ClickHouse ignoreError(Boolean ignore) { + properties.putBoolean(CONNECTOR_COMMIT_IGNORE_ERROR, ignore); + return this; + } + + @Override + protected Map toConnectorProperties() { + return properties.asMap(); + } +} diff --git a/flink-connector-clickhouse/src/main/java/com/apache/flink/table/descriptors/ClickHouseValidator.java b/flink-connector-clickhouse/src/main/java/com/apache/flink/table/descriptors/ClickHouseValidator.java new file mode 100644 index 00000000..002235a3 --- /dev/null +++ b/flink-connector-clickhouse/src/main/java/com/apache/flink/table/descriptors/ClickHouseValidator.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.apache.flink.table.descriptors; + +import org.apache.flink.table.descriptors.ConnectorDescriptorValidator; +import org.apache.flink.table.descriptors.DescriptorProperties; + +/** + * The validator for ClickHouse + */ + +public class ClickHouseValidator extends ConnectorDescriptorValidator { + + public static final String CONNECTOR_TYPE_VALUE_CLICKHOUSE = "clickhouse"; + public static final String CONNECTOR_ADRESS = "connector.address"; + public static final String CONNECTOR_DATABASE = "connector.database"; + public static final String CONNECTOR_TABLE = "connector.table"; + public static final String CONNECTOR_USERNAME = "connector.username"; + public static final String CONNECTOR_PASSWORD = "connector.password"; + public static final String CONNECTOR_COMMIT_BATCH_SIZE = "connector.commit.batch.size"; + public static final String CONNECTOR_COMMIT_PADDING = "connector.commit.padding"; + public static final String CONNECTOR_COMMIT_RETRY_ATTEMPTS = "connector.commit.retry.attempts"; + public static final String CONNECTOR_COMMIT_RETRY_INTERVAL = "connector.commit.retry.interval"; + public static final String CONNECTOR_COMMIT_IGNORE_ERROR = "connector.commit.ignore.error"; + + @Override + public void validate(DescriptorProperties properties) { + super.validate(properties); + validateComonProperties(properties); + } + + private void validateComonProperties(DescriptorProperties properties) { + + properties.validateString(CONNECTOR_ADRESS, false, 1); + properties.validateString(CONNECTOR_DATABASE, false, 1); + properties.validateString(CONNECTOR_TABLE, false, 1); + properties.validateString(CONNECTOR_USERNAME, false,1); + properties.validateString(CONNECTOR_PASSWORD, false,1); + properties.validateInt(CONNECTOR_COMMIT_BATCH_SIZE, true); + properties.validateString(CONNECTOR_USERNAME, true); + properties.validateInt(CONNECTOR_COMMIT_PADDING, true); + } +} diff --git a/flink-connector-clickhouse/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory b/flink-connector-clickhouse/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory new file mode 100644 index 00000000..c94ff048 --- /dev/null +++ b/flink-connector-clickhouse/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory @@ -0,0 +1,17 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +com.apache.flink.streaming.connectors.clickhouse.ClickHouseTableSourceSinkFactory + diff --git a/flink-connector-clickhouse/src/test/java/com/apache/flink/streaming/connectors/clickhouse/ClickHouseTableSourceSinkFactoryTest.java b/flink-connector-clickhouse/src/test/java/com/apache/flink/streaming/connectors/clickhouse/ClickHouseTableSourceSinkFactoryTest.java new file mode 100644 index 00000000..75c5c9b2 --- /dev/null +++ b/flink-connector-clickhouse/src/test/java/com/apache/flink/streaming/connectors/clickhouse/ClickHouseTableSourceSinkFactoryTest.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.apache.flink.streaming.connectors.clickhouse; + +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.descriptors.DescriptorProperties; +import org.apache.flink.table.descriptors.Schema; +import org.apache.flink.table.utils.TableSchemaUtils; +import org.junit.Test; + +import java.util.Map; + +import static com.apache.flink.table.descriptors.ClickHouseValidator.*; +import static org.apache.flink.table.descriptors.Schema.SCHEMA; +import static org.junit.Assert.assertEquals; + +public class ClickHouseTableSourceSinkFactoryTest { + + + @Test + public void testCreateStreamTableSink() throws Exception { + String connectorVersion = "1"; + String connectorAddress = "jdbc:clickhouse://localhost:8123/default"; + String connectorDatabase = "qtt"; + String connectorTable = "insert_test"; + String connectorUserName = "admin"; + String connectorPassWord = "admin"; + String connectorCommitBatchSize = "1"; + String connectorCommitPadding = "1"; + String connectorCommitRetryAttempts = "3"; + String connectorCommitRetryInterval = "3000"; + String connectorCommitIgnoreError = "false"; + + DescriptorProperties properties = new DescriptorProperties(); + properties.putString(CONNECTOR_TYPE, CONNECTOR_TYPE_VALUE_CLICKHOUSE); + properties.putString(CONNECTOR_VERSION, connectorVersion); + properties.putString(CONNECTOR_ADRESS, connectorAddress); + properties.putString(CONNECTOR_DATABASE, connectorDatabase); + properties.putString(CONNECTOR_TABLE, connectorTable); + properties.putString(CONNECTOR_USERNAME, connectorUserName); + properties.putString(CONNECTOR_PASSWORD, connectorPassWord); + properties.putString(CONNECTOR_COMMIT_BATCH_SIZE, connectorCommitBatchSize); + properties.putString(CONNECTOR_COMMIT_PADDING, connectorCommitPadding); + properties.putString(CONNECTOR_COMMIT_RETRY_ATTEMPTS, connectorCommitRetryAttempts); + properties.putString(CONNECTOR_COMMIT_RETRY_INTERVAL, connectorCommitRetryInterval); + properties.putString(CONNECTOR_COMMIT_IGNORE_ERROR, connectorCommitIgnoreError); + + Schema schema = new Schema().field("s", DataTypes.STRING()).field("d", DataTypes.BIGINT()); + Map stringStringMap = schema.toProperties(); + + properties.putProperties(stringStringMap); + + ClickHouseTableSourceSinkFactory factory = new ClickHouseTableSourceSinkFactory(); + ClickHouseTableSink actualClickHouseTableSink = (ClickHouseTableSink) factory.createStreamTableSink(properties.asMap()); + + TableSchema tableSchema = TableSchemaUtils.getPhysicalSchema(properties.getTableSchema(SCHEMA)); + ClickHouseTableSink expectedClickHouseTableSink = new ClickHouseTableSink(connectorAddress, + connectorUserName, + connectorPassWord, + connectorDatabase, + connectorTable, + tableSchema, + Integer.valueOf(connectorCommitBatchSize), + Long.valueOf(connectorCommitPadding), + Integer.valueOf(connectorCommitRetryAttempts), + Long.valueOf(connectorCommitRetryInterval), + Boolean.valueOf(connectorCommitIgnoreError) + ); + + assertEquals(expectedClickHouseTableSink, actualClickHouseTableSink); + } + + +} diff --git a/flink-connector-clickhouse/src/test/java/com/apache/flink/table/descriptors/ClickHouseTest.java b/flink-connector-clickhouse/src/test/java/com/apache/flink/table/descriptors/ClickHouseTest.java new file mode 100644 index 00000000..70bc68a5 --- /dev/null +++ b/flink-connector-clickhouse/src/test/java/com/apache/flink/table/descriptors/ClickHouseTest.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.apache.flink.table.descriptors; + +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; + +import static com.apache.flink.table.descriptors.ClickHouseValidator.*; +import static org.junit.Assert.assertEquals; + +public class ClickHouseTest { + + @Test + public void testToConnectorProperties() throws Exception { + String connectorVersion = "1"; + String connectorAddress = "jdbc:clickhouse://localhost:8123/default"; + String connectorDatabase = "qtt"; + String connectorTable = "insert_test"; + String connectorUserName = "admin"; + String connectorPassWord = "admin"; + String connectorCommitBatchSize = "1"; + String connectorCommitPadding = "2000"; + String connectorCommitRetryAttempts = "3"; + String connectorCommitRetryInterval = "3000"; + String connectorCommitIgnoreError = "false"; + HashMap expectedMap = new HashMap<>(); + expectedMap.put(CONNECTOR_VERSION, connectorVersion); + expectedMap.put(CONNECTOR_ADRESS, connectorAddress); + expectedMap.put(CONNECTOR_DATABASE, connectorDatabase); + expectedMap.put(CONNECTOR_TABLE, connectorTable); + expectedMap.put(CONNECTOR_USERNAME, connectorUserName); + expectedMap.put(CONNECTOR_PASSWORD, connectorPassWord); + expectedMap.put(CONNECTOR_COMMIT_BATCH_SIZE, connectorCommitBatchSize); + expectedMap.put(CONNECTOR_COMMIT_PADDING, connectorCommitPadding); + expectedMap.put(CONNECTOR_COMMIT_RETRY_ATTEMPTS, connectorCommitRetryAttempts); + expectedMap.put(CONNECTOR_COMMIT_RETRY_INTERVAL, connectorCommitRetryInterval); + expectedMap.put(CONNECTOR_COMMIT_IGNORE_ERROR, connectorCommitIgnoreError); + + ClickHouse clickhouse = new ClickHouse() + .version(connectorVersion) + .address(connectorAddress) + .database(connectorDatabase) + .table(connectorTable) + .username(connectorUserName) + .password(connectorPassWord) + .batchSize(Integer.valueOf(connectorCommitBatchSize)) + .padding(Long.valueOf(connectorCommitPadding)) + .retryAttempts(Integer.valueOf(connectorCommitRetryAttempts)) + .retryInterval(Long.valueOf(connectorCommitRetryInterval)) + .ignoreError(Boolean.valueOf(connectorCommitIgnoreError)); + Map actualMap = clickhouse.toConnectorProperties(); + + assertEquals(expectedMap, actualMap); + + + } + +} diff --git a/pom.xml b/pom.xml index 7e83d852..ac8b568c 100644 --- a/pom.xml +++ b/pom.xml @@ -71,6 +71,7 @@ + flink-connector-clickhouse flink-connector-activemq flink-connector-akka flink-connector-flume @@ -98,6 +99,9 @@ 1.12.2 + + 0.1.50 + 5.4.1 3.0.5