|
13 | 13 | import com.altinity.clickhouse.sink.connector.db.operations.ClickHouseAlterTable; |
14 | 14 | import com.altinity.clickhouse.sink.connector.executor.ClickHouseBatchExecutor; |
15 | 15 | import com.altinity.clickhouse.sink.connector.executor.ClickHouseBatchRunnable; |
| 16 | +import com.altinity.clickhouse.sink.connector.executor.ClickHouseBatchWriter; |
16 | 17 | import com.altinity.clickhouse.sink.connector.model.ClickHouseStruct; |
17 | 18 | import com.altinity.clickhouse.sink.connector.model.DBCredentials; |
18 | 19 | import com.clickhouse.jdbc.ClickHouseConnection; |
|
29 | 30 | import org.apache.kafka.connect.source.SourceRecord; |
30 | 31 | import org.apache.logging.log4j.LogManager; |
31 | 32 | import org.apache.logging.log4j.Logger; |
| 33 | +import org.checkerframework.checker.units.qual.C; |
32 | 34 | import org.json.simple.JSONArray; |
33 | 35 | import org.json.simple.JSONObject; |
34 | 36 | import org.json.simple.parser.ParseException; |
@@ -81,6 +83,8 @@ public class DebeziumChangeEventCapture { |
81 | 83 | // Keep one clickhouse connection. |
82 | 84 | private ClickHouseConnection conn; |
83 | 85 |
|
| 86 | + ClickHouseBatchWriter singleThreadedWriter; |
| 87 | + |
84 | 88 | public DebeziumChangeEventCapture() { |
85 | 89 | singleThreadDebeziumEventExecutor = Executors.newFixedThreadPool(1); |
86 | 90 | } |
@@ -596,7 +600,7 @@ public void handleBatch(List<ChangeEvent<SourceRecord, SourceRecord>> list, |
596 | 600 |
|
597 | 601 |
|
598 | 602 | if(batch.size() > 0) { |
599 | | - appendToRecords(batch); |
| 603 | + appendToRecords(batch, config); |
600 | 604 | } |
601 | 605 | } |
602 | 606 | }); |
@@ -775,23 +779,33 @@ DBCredentials parseDBConfiguration(ClickHouseSinkConnectorConfig config) { |
775 | 779 | */ |
776 | 780 | private void setupProcessingThread(ClickHouseSinkConnectorConfig config) { |
777 | 781 |
|
778 | | - // Setup separate thread to read messages from shared buffer. |
779 | | - // this.records = new ConcurrentLinkedQueue<>(); |
780 | | - //this.runnable = new ClickHouseBatchRunnable(this.records, config, new HashMap()); |
781 | | - ThreadFactory namedThreadFactory = |
782 | | - new ThreadFactoryBuilder().setNameFormat("Sink Connector thread-pool-%d").build(); |
783 | | - this.executor = new ClickHouseBatchExecutor(config.getInt(ClickHouseSinkConnectorConfigVariables.THREAD_POOL_SIZE.toString()), namedThreadFactory); |
784 | | - for(int i = 0; i < config.getInt(ClickHouseSinkConnectorConfigVariables.THREAD_POOL_SIZE.toString()); i++) { |
785 | | - this.executor.scheduleAtFixedRate(new ClickHouseBatchRunnable(this.records, config, new HashMap()), 0, |
786 | | - config.getLong(ClickHouseSinkConnectorConfigVariables.BUFFER_FLUSH_TIME.toString()), TimeUnit.MILLISECONDS); |
| 782 | + if(config.getBoolean(ClickHouseSinkConnectorConfigVariables.SINGLE_THREADED.toString())) { |
| 783 | + log.info("********* Running in Single Threaded mode *********"); |
| 784 | + singleThreadedWriter = new ClickHouseBatchWriter(config, new HashMap()); |
787 | 785 | } |
| 786 | + |
| 787 | + ThreadFactory namedThreadFactory = |
| 788 | + new ThreadFactoryBuilder().setNameFormat("Sink Connector thread-pool-%d").build(); |
| 789 | + this.executor = new ClickHouseBatchExecutor(config.getInt(ClickHouseSinkConnectorConfigVariables.THREAD_POOL_SIZE.toString()), namedThreadFactory); |
| 790 | + for (int i = 0; i < config.getInt(ClickHouseSinkConnectorConfigVariables.THREAD_POOL_SIZE.toString()); i++) { |
| 791 | + this.executor.scheduleAtFixedRate(new ClickHouseBatchRunnable(this.records, config, new HashMap()), 0, |
| 792 | + config.getLong(ClickHouseSinkConnectorConfigVariables.BUFFER_FLUSH_TIME.toString()), TimeUnit.MILLISECONDS); |
| 793 | + } |
| 794 | + |
788 | 795 | //this.executor.scheduleAtFixedRate(this.runnable, 0, config.getLong(ClickHouseSinkConnectorConfigVariables.BUFFER_FLUSH_TIME.toString()), TimeUnit.MILLISECONDS); |
789 | 796 | } |
790 | 797 |
|
791 | | - private void appendToRecords(List<ClickHouseStruct> convertedRecords) { |
| 798 | + private void appendToRecords(List<ClickHouseStruct> convertedRecords, ClickHouseSinkConnectorConfig config) { |
792 | 799 |
|
793 | | - synchronized (this.records) { |
794 | | - this.records.add(convertedRecords); |
| 800 | + // If config is set to single threaded. |
| 801 | + if(config.getBoolean(ClickHouseSinkConnectorConfigVariables.SINGLE_THREADED.toString())) { |
| 802 | + singleThreadedWriter.persistRecords(convertedRecords); |
| 803 | + |
| 804 | + } else { |
| 805 | + |
| 806 | + synchronized (this.records) { |
| 807 | + this.records.add(convertedRecords); |
| 808 | + } |
795 | 809 | } |
796 | 810 |
|
797 | 811 |
|
|
0 commit comments