Skip to content

Commit 893d725

Browse files
Chen768959morningman
authored andcommitted
[Feature] Support Doris Catalog (#55274)
The new Doris Catalog replaces the previous method of accessing external tables in remote Doris clusters via the JDBC Catalog (related: #56011). 1. It retrieves the metadata of Doris external tables through HTTP APIs. 2. The metadata for Doris external tables is more accurate, fully capturing the remote cluster tables' metadata, such as primary keys, bucketing keys, and native Doris data types. 3. The underlying implementation uses Arrow Flight SQL to communicate with the remote Doris cluster, achieving approximately 4 times higher transmission performance than the single-node JDBC Catalog. 4. It supports concurrent retrieval of Arrow response results from the remote Doris cluster, so transmission performance scales linearly as the cluster size increases.
1 parent 2f19e95 commit 893d725

File tree

48 files changed

+2862
-11
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

48 files changed

+2862
-11
lines changed

be/src/runtime/descriptors.cpp

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -341,6 +341,17 @@ std::string JdbcTableDescriptor::debug_string() const {
341341
return fmt::to_string(buf);
342342
}
343343

344+
RemoteDorisTableDescriptor::RemoteDorisTableDescriptor(const TTableDescriptor& tdesc)
345+
: TableDescriptor(tdesc) {}
346+
347+
RemoteDorisTableDescriptor::~RemoteDorisTableDescriptor() = default;
348+
349+
std::string RemoteDorisTableDescriptor::debug_string() const {
350+
std::stringstream out;
351+
out << "RemoteDorisTable(" << TableDescriptor::debug_string() << ")";
352+
return out.str();
353+
}
354+
344355
TupleDescriptor::TupleDescriptor(const TTupleDescriptor& tdesc, bool own_slots)
345356
: _id(tdesc.id),
346357
_num_materialized_slots(0),
@@ -614,6 +625,9 @@ Status DescriptorTbl::create(ObjectPool* pool, const TDescriptorTable& thrift_tb
614625
case TTableType::DICTIONARY_TABLE:
615626
desc = pool->add(new DictionaryTableDescriptor(tdesc));
616627
break;
628+
case TTableType::REMOTE_DORIS_TABLE:
629+
desc = pool->add(new RemoteDorisTableDescriptor(tdesc));
630+
break;
617631
default:
618632
DCHECK(false) << "invalid table type: " << tdesc.tableType;
619633
}

be/src/runtime/descriptors.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -324,6 +324,15 @@ class JdbcTableDescriptor : public TableDescriptor {
324324
bool _connection_pool_keep_alive;
325325
};
326326

327+
class RemoteDorisTableDescriptor : public TableDescriptor {
328+
public:
329+
RemoteDorisTableDescriptor(const TTableDescriptor& tdesc);
330+
~RemoteDorisTableDescriptor() override;
331+
std::string debug_string() const override;
332+
333+
private:
334+
};
335+
327336
class TupleDescriptor {
328337
public:
329338
TupleDescriptor(TupleDescriptor&&) = delete;
Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
#include "remote_doris_reader.h"
19+
20+
#include <iostream>
21+
#include <map>
22+
#include <memory>
23+
#include <string>
24+
25+
#include "arrow/flight/client.h"
26+
#include "arrow/flight/types.h"
27+
#include "arrow/ipc/reader.h"
28+
#include "arrow/memory_pool.h"
29+
#include "arrow/result.h"
30+
#include "arrow/status.h"
31+
#include "common/status.h"
32+
#include "runtime/descriptors.h"
33+
#include "runtime/runtime_state.h"
34+
#include "runtime/types.h"
35+
#include "util/arrow/utils.h"
36+
#include "vec/core/block.h"
37+
#include "vec/core/column_with_type_and_name.h"
38+
#include "vec/core/types.h"
39+
40+
namespace doris {
41+
class RuntimeProfile;
42+
class RuntimeState;
43+
44+
namespace vectorized {
45+
class Block;
46+
} // namespace vectorized
47+
} // namespace doris
48+
49+
namespace doris::vectorized {
50+
#include "common/compile_check_begin.h"
51+
52+
// Binds the reader to the given scan range and slot descriptors and resolves
// the time zone later passed to the Arrow-to-column conversion.
// `state` and `profile` are accepted but not used here.
RemoteDorisReader::RemoteDorisReader(const std::vector<SlotDescriptor*>& file_slot_descs,
                                     RuntimeState* state, RuntimeProfile* profile,
                                     const TFileRangeDesc& range)
        : _range(range), _file_slot_descs(file_slot_descs) {
    // Looks up TimezoneUtils::default_time_zone; the lookup result is not checked.
    TimezoneUtils::find_cctz_time_zone(TimezoneUtils::default_time_zone, _ctzz);
}
58+
59+
// Opens the Arrow Flight stream for this range.
// Returns the converted Arrow error if init_stream() fails, Status::OK() otherwise.
Status RemoteDorisReader::init_reader() {
    RETURN_DORIS_STATUS_IF_ERROR(init_stream());
    // A successful init_stream() is expected to have populated the stream reader.
    DCHECK(_stream != nullptr);
    return Status::OK();
}
64+
65+
// Pulls the next chunk from the Flight stream and appends each of its Arrow
// columns to the column of the same name in `block`.
//
// @param block      destination block; columns are looked up by the Arrow field name.
// @param read_rows  set to the number of rows in the batch just read (0 at end of stream).
// @param eof        set to true when the stream returns a chunk without data;
//                   left untouched otherwise.
// @return an error if the reader has no open stream, the stream read fails,
//         or a column cannot be converted.
Status RemoteDorisReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
    // Without a successful init_reader() there is no stream to read from.
    if (_stream == nullptr) {
        return Status::InternalError(
                "RemoteDorisReader stream is not initialized, init_reader() must succeed first");
    }

    arrow::flight::FlightStreamChunk chunk;
    RETURN_DORIS_STATUS_IF_ERROR(_stream->Next().Value(&chunk));

    // A chunk that carries no record batch is treated as the end of the stream.
    if (!chunk.data) {
        *read_rows = 0;
        *eof = true;
        return Status::OK();
    }

    // convert arrow batch to block
    const auto& batch = chunk.data;
    const int64_t num_rows = batch->num_rows();
    const int num_columns = batch->num_columns();
    for (int c = 0; c < num_columns; ++c) {
        // Keep the shared_ptr alive for the whole conversion instead of taking a
        // raw pointer out of a temporary.
        const std::shared_ptr<arrow::Array> column = batch->column(c);
        const std::string& column_name = batch->schema()->field(c)->name();

        try {
            const vectorized::ColumnWithTypeAndName& column_with_name =
                    block->get_by_name(column_name);
            RETURN_IF_ERROR(column_with_name.type->get_serde()->read_column_from_arrow(
                    column_with_name.column->assume_mutable_ref(), column.get(), 0, num_rows,
                    _ctzz));
        } catch (const Exception& e) {
            return Status::InternalError(
                    "Failed to convert from arrow to block, column_name: {}, e: {}", column_name,
                    e.what());
        }
    }

    // Report exactly the rows of this batch; do not accumulate into whatever the
    // caller passed in (the end-of-stream path above assigns as well).
    *read_rows = static_cast<size_t>(num_rows);
    return Status::OK();
}
99+
100+
// Fills `name_to_type` with one (column name -> data type) entry per slot
// descriptor this reader was constructed with. `missing_cols` is not modified.
Status RemoteDorisReader::get_columns(std::unordered_map<std::string, DataTypePtr>* name_to_type,
                                      std::unordered_set<std::string>* missing_cols) {
    for (auto it = _file_slot_descs.begin(); it != _file_slot_descs.end(); ++it) {
        SlotDescriptor* slot_desc = *it;
        // emplace() keeps an existing entry if the name is already present.
        name_to_type->emplace(slot_desc->col_name(), slot_desc->type());
    }
    return Status::OK();
}
107+
108+
// Closes the Arrow Flight client, if one was ever created.
// Returns Status::OK() when there is nothing to close, otherwise the converted
// result of FlightClient::Close().
Status RemoteDorisReader::close() {
    // The client is only created inside init_stream(); if init_reader() was never
    // called or failed before connecting, there is no client to close.
    if (_flight_client == nullptr) {
        return Status::OK();
    }
    RETURN_DORIS_STATUS_IF_ERROR(_flight_client->Close());
    return Status::OK();
}
112+
113+
// Connects to the Flight endpoint described by the range's remote_doris_params
// and starts a DoGet for its ticket. On success `_flight_client` and `_stream`
// are populated; the first failing Arrow call is returned as the error.
arrow::Status RemoteDorisReader::init_stream() {
    const auto& doris_params = _range.table_format_params.remote_doris_params;

    // Parse both inputs before opening any connection.
    ARROW_ASSIGN_OR_RAISE(auto location,
                          arrow::flight::Location::Parse(doris_params.location_uri));
    ARROW_ASSIGN_OR_RAISE(auto ticket, arrow::flight::Ticket::Deserialize(doris_params.ticket));

    ARROW_ASSIGN_OR_RAISE(_flight_client, arrow::flight::FlightClient::Connect(location));
    ARROW_ASSIGN_OR_RAISE(_stream, _flight_client->DoGet(ticket));
    return arrow::Status::OK();
}
125+
126+
#include "common/compile_check_end.h"
127+
} // namespace doris::vectorized
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
#pragma once
19+
20+
#include <arrow/flight/client.h>
21+
22+
#include <cstddef>
23+
#include <string>
24+
#include <unordered_map>
25+
#include <unordered_set>
26+
#include <vector>
27+
28+
#include "common/status.h"
29+
#include "exec/olap_common.h"
30+
#include "vec/exec/format/jni_reader.h"
31+
32+
namespace doris {
33+
class RuntimeProfile;
34+
class RuntimeState;
35+
class SlotDescriptor;
36+
namespace vectorized {
37+
class Block;
38+
} // namespace vectorized
39+
} // namespace doris
40+
41+
namespace doris::vectorized {
42+
#include "common/compile_check_begin.h"
43+
class RemoteDorisReader : public GenericReader {
44+
ENABLE_FACTORY_CREATOR(RemoteDorisReader);
45+
46+
public:
47+
RemoteDorisReader(const std::vector<SlotDescriptor*>& file_slot_descs, RuntimeState* state,
48+
RuntimeProfile* profile, const TFileRangeDesc& range);
49+
50+
~RemoteDorisReader() override = default;
51+
52+
Status init_reader();
53+
54+
Status get_next_block(Block* block, size_t* read_rows, bool* eof) override;
55+
56+
Status get_columns(std::unordered_map<std::string, DataTypePtr>* name_to_type,
57+
std::unordered_set<std::string>* missing_cols) override;
58+
59+
Status close() override;
60+
61+
private:
62+
arrow::Status init_stream();
63+
const TFileRangeDesc& _range;
64+
const std::vector<SlotDescriptor*>& _file_slot_descs;
65+
cctz::time_zone _ctzz;
66+
std::unique_ptr<arrow::flight::FlightClient> _flight_client;
67+
std::unique_ptr<arrow::flight::FlightStreamReader> _stream;
68+
};
69+
#include "common/compile_check_end.h"
70+
} // namespace doris::vectorized

be/src/vec/exec/scan/file_scanner.cpp

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@
7070
#include "vec/exec/format/table/max_compute_jni_reader.h"
7171
#include "vec/exec/format/table/paimon_jni_reader.h"
7272
#include "vec/exec/format/table/paimon_reader.h"
73+
#include "vec/exec/format/table/remote_doris_reader.h"
7374
#include "vec/exec/format/table/transactional_hive_reader.h"
7475
#include "vec/exec/format/table/trino_connector_jni_reader.h"
7576
#include "vec/exec/format/text/text_reader.h"
@@ -1136,9 +1137,17 @@ Status FileScanner::_get_next_reader() {
11361137
break;
11371138
}
11381139
case TFileFormatType::FORMAT_ARROW: {
1139-
_cur_reader = ArrowStreamReader::create_unique(_state, _profile, &_counter, *_params,
1140-
range, _file_slot_descs, _io_ctx.get());
1141-
init_status = ((ArrowStreamReader*)(_cur_reader.get()))->init_reader();
1140+
if (range.__isset.table_format_params &&
1141+
range.table_format_params.table_format_type == "remote_doris") {
1142+
_cur_reader =
1143+
RemoteDorisReader::create_unique(_file_slot_descs, _state, _profile, range);
1144+
init_status = ((RemoteDorisReader*)(_cur_reader.get()))->init_reader();
1145+
} else {
1146+
_cur_reader =
1147+
ArrowStreamReader::create_unique(_state, _profile, &_counter, *_params,
1148+
range, _file_slot_descs, _io_ctx.get());
1149+
init_status = ((ArrowStreamReader*)(_cur_reader.get()))->init_reader();
1150+
}
11421151
break;
11431152
}
11441153
default:

fe/fe-core/src/main/java/org/apache/doris/catalog/KeysType.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,8 @@ public enum KeysType {
2626
PRIMARY_KEYS,
2727
DUP_KEYS,
2828
UNIQUE_KEYS,
29-
AGG_KEYS;
29+
AGG_KEYS,
30+
UNKNOWN;
3031

3132
/**
3233
* Determine whether it is an aggregation type.

fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -398,7 +398,7 @@ enum TableType {
398398
@Deprecated ICEBERG, @Deprecated HUDI, JDBC,
399399
TABLE_VALUED_FUNCTION, HMS_EXTERNAL_TABLE, ES_EXTERNAL_TABLE, MATERIALIZED_VIEW, JDBC_EXTERNAL_TABLE,
400400
ICEBERG_EXTERNAL_TABLE, TEST_EXTERNAL_TABLE, PAIMON_EXTERNAL_TABLE, MAX_COMPUTE_EXTERNAL_TABLE,
401-
HUDI_EXTERNAL_TABLE, TRINO_CONNECTOR_EXTERNAL_TABLE, LAKESOUl_EXTERNAL_TABLE, DICTIONARY;
401+
HUDI_EXTERNAL_TABLE, TRINO_CONNECTOR_EXTERNAL_TABLE, LAKESOUl_EXTERNAL_TABLE, DICTIONARY, DORIS_EXTERNAL_TABLE;
402402

403403
public String toEngineName() {
404404
switch (this) {
@@ -475,6 +475,7 @@ public String toMysqlType() {
475475
case PAIMON_EXTERNAL_TABLE:
476476
case MATERIALIZED_VIEW:
477477
case TRINO_CONNECTOR_EXTERNAL_TABLE:
478+
case DORIS_EXTERNAL_TABLE:
478479
return "BASE TABLE";
479480
default:
480481
return null;

fe/fe-core/src/main/java/org/apache/doris/common/util/JsonUtil.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,4 +78,16 @@ public static ObjectNode parseObject(String text) {
7878
public static <T> T readValue(String text, Class<T> clazz) throws JsonProcessingException {
7979
return objectMapper.readValue(text, clazz);
8080
}
81+
82+
public static Integer safeGetAsInt(ObjectNode node, String field) {
83+
JsonNode value = node.get(field);
84+
return (value == null || value.isNull()) ? null : value.asInt();
85+
}
86+
87+
public static String convertNodeToString(JsonNode node) {
88+
if (node == null || node.isNull()) {
89+
return null;
90+
}
91+
return node.isTextual() ? node.asText() : node.toString();
92+
}
8193
}

fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogFactory.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.doris.catalog.Resource;
2222
import org.apache.doris.common.DdlException;
2323
import org.apache.doris.common.FeConstants;
24+
import org.apache.doris.datasource.doris.RemoteDorisExternalCatalog;
2425
import org.apache.doris.datasource.es.EsExternalCatalog;
2526
import org.apache.doris.datasource.hive.HMSExternalCatalog;
2627
import org.apache.doris.datasource.iceberg.IcebergExternalCatalogFactory;
@@ -108,6 +109,9 @@ private static CatalogIf createCatalog(long catalogId, String name, String resou
108109
break;
109110
case "lakesoul":
110111
throw new DdlException("Lakesoul catalog is no longer supported");
112+
case "doris":
113+
catalog = new RemoteDorisExternalCatalog(catalogId, name, resource, props, comment);
114+
break;
111115
case "test":
112116
if (!FeConstants.runningUnitTest) {
113117
throw new DdlException("test catalog is only for FE unit test");

fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.apache.doris.common.util.Util;
3939
import org.apache.doris.datasource.ExternalSchemaCache.SchemaCacheKey;
4040
import org.apache.doris.datasource.connectivity.CatalogConnectivityTestCoordinator;
41+
import org.apache.doris.datasource.doris.RemoteDorisExternalDatabase;
4142
import org.apache.doris.datasource.es.EsExternalDatabase;
4243
import org.apache.doris.datasource.hive.HMSExternalCatalog;
4344
import org.apache.doris.datasource.hive.HMSExternalDatabase;
@@ -842,6 +843,8 @@ protected ExternalDatabase<? extends ExternalTable> buildDbForInit(String remote
842843
return new PaimonExternalDatabase(this, dbId, localDbName, remoteDbName);
843844
case TRINO_CONNECTOR:
844845
return new TrinoConnectorExternalDatabase(this, dbId, localDbName, remoteDbName);
846+
case REMOTE_DORIS:
847+
return new RemoteDorisExternalDatabase(this, dbId, localDbName, remoteDbName);
845848
default:
846849
break;
847850
}

0 commit comments

Comments
 (0)