Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion csharp/src/Drivers/Apache/Hive2/HiveServer2Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ internal TCLIService.IAsync Client

internal async Task OpenAsync()
{
await this.TraceActivity(async activity =>
await this.TraceActivityAsync(async activity =>
{
CancellationToken cancellationToken = ApacheUtility.GetCancellationToken(ConnectTimeoutMilliseconds, ApacheUtility.TimeUnit.Milliseconds);
try
Expand Down
144 changes: 119 additions & 25 deletions csharp/src/Drivers/Databricks/DatabricksConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
using Apache.Arrow.Adbc.Drivers.Apache.Spark;
using Apache.Arrow.Adbc.Drivers.Databricks.Auth;
using Apache.Arrow.Adbc.Drivers.Databricks.Reader;
using Apache.Arrow.Adbc.Tracing;
using Apache.Arrow.Ipc;
using Apache.Hive.Service.Rpc.Thrift;
using Thrift.Protocol;
Expand Down Expand Up @@ -100,6 +101,28 @@ public DatabricksConnection(IReadOnlyDictionary<string, string> properties) : ba
ValidateProperties();
}

private void LogConnectionProperties(Activity? activity)
{
if (activity == null) return;

foreach (var kvp in Properties)
{
string key = kvp.Key;
string value = kvp.Value;

// Sanitize sensitive properties - only mask actual credentials/tokens, not configuration
bool isSensitive = key.IndexOf("password", StringComparison.OrdinalIgnoreCase) >= 0 ||
key.IndexOf("secret", StringComparison.OrdinalIgnoreCase) >= 0 ||
key.Equals(AdbcOptions.Password, StringComparison.OrdinalIgnoreCase) ||
key.Equals(SparkParameters.AccessToken, StringComparison.OrdinalIgnoreCase) ||
key.Equals(DatabricksParameters.OAuthClientSecret, StringComparison.OrdinalIgnoreCase);

string logValue = isSensitive ? "***" : value;

activity.SetTag(key, logValue);
}
}

public override IEnumerable<KeyValuePair<string, object?>>? GetActivitySourceTags(IReadOnlyDictionary<string, string> properties)
{
IEnumerable<KeyValuePair<string, object?>>? tags = base.GetActivitySourceTags(properties);
Expand Down Expand Up @@ -696,58 +719,129 @@ public override AdbcStatement CreateStatement()

protected override TOpenSessionReq CreateSessionRequest()
{
var req = new TOpenSessionReq
return this.TraceActivity(activity =>
{
Client_protocol = TProtocolVersion.SPARK_CLI_SERVICE_PROTOCOL_V7,
Client_protocol_i64 = (long)TProtocolVersion.SPARK_CLI_SERVICE_PROTOCOL_V7,
CanUseMultipleCatalogs = _enableMultipleCatalogSupport,
};
// Log driver information at the beginning of the connection
activity?.AddEvent("connection.driver.info", [
new("driver.name", "Apache Arrow ADBC Databricks Driver"),
new("driver.version", s_assemblyVersion),
new("driver.assembly", s_assemblyName)
]);

// Set default namespace if available
if (_defaultNamespace != null)
{
req.InitialNamespace = _defaultNamespace;
}
req.Configuration = new Dictionary<string, string>();
// merge timestampConfig with serverSideProperties
foreach (var kvp in timestampConfig)
{
req.Configuration[kvp.Key] = kvp.Value;
}
// If not using queries to set server-side properties, include them in Configuration
if (!_applySSPWithQueries)
{
var serverSideProperties = GetServerSideProperties();
foreach (var property in serverSideProperties)
// Log connection properties (sanitize sensitive values)
LogConnectionProperties(activity);

var req = new TOpenSessionReq
{
req.Configuration[property.Key] = property.Value;
Client_protocol = TProtocolVersion.SPARK_CLI_SERVICE_PROTOCOL_V7,
Client_protocol_i64 = (long)TProtocolVersion.SPARK_CLI_SERVICE_PROTOCOL_V7,
CanUseMultipleCatalogs = _enableMultipleCatalogSupport,
};

// Log OpenSession request details
activity?.SetTag("connection.client_protocol", req.Client_protocol.ToString());

// Set default namespace if available
if (_defaultNamespace != null)
{
req.InitialNamespace = _defaultNamespace;
activity?.SetTag("connection.initial_namespace.catalog", _defaultNamespace.CatalogName ?? "(none)");
activity?.SetTag("connection.initial_namespace.schema", _defaultNamespace.SchemaName ?? "(none)");
}
}
return req;
req.Configuration = new Dictionary<string, string>();
// merge timestampConfig with serverSideProperties
foreach (var kvp in timestampConfig)
{
req.Configuration[kvp.Key] = kvp.Value;
}
// If not using queries to set server-side properties, include them in Configuration
if (!_applySSPWithQueries)
{
var serverSideProperties = GetServerSideProperties();
foreach (var property in serverSideProperties)
{
req.Configuration[property.Key] = property.Value;
}
}

activity?.SetTag("connection.configuration_count", req.Configuration.Count);

return req;
});
}

protected override async Task HandleOpenSessionResponse(TOpenSessionResp? session, Activity? activity = default)
{

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: remove extra blank line

await base.HandleOpenSessionResponse(session, activity);

if (session != null)
{
var version = session.ServerProtocolVersion;

// Log server protocol version
activity?.SetTag("connection.server_protocol_version", version.ToString());

// Validate it's a Databricks server
if (!FeatureVersionNegotiator.IsDatabricksProtocolVersion(version))
{
activity?.SetTag("error.type", "InvalidServerProtocol");
activity?.SetTag("error.message", "Non-Databricks server detected");
throw new DatabricksException("Attempted to use databricks driver with a non-databricks server");
}
_enablePKFK = _enablePKFK && FeatureVersionNegotiator.SupportsPKFK(version);

// Log protocol version capabilities (what the server supports)
bool protocolSupportsPKFK = FeatureVersionNegotiator.SupportsPKFK(version);
bool protocolSupportsDescTableExtended = FeatureVersionNegotiator.SupportsDESCTableExtended(version);

activity?.SetTag("connection.protocol.supports_pk_fk", protocolSupportsPKFK);
activity?.SetTag("connection.protocol.supports_desc_table_extended", protocolSupportsDescTableExtended);

// Apply protocol constraints to user settings
bool pkfkBefore = _enablePKFK;
_enablePKFK = _enablePKFK && protocolSupportsPKFK;

if (pkfkBefore && !_enablePKFK)
{
activity?.SetTag("connection.feature_downgrade.pk_fk", true);
activity?.SetTag("connection.feature_downgrade.pk_fk.reason", "Protocol version does not support PK/FK");
}

// Handle multiple catalog support from server response
_enableMultipleCatalogSupport = session.__isset.canUseMultipleCatalogs ? session.CanUseMultipleCatalogs : false;

// Log final feature flags as tags
activity?.SetTag("connection.feature.enable_pk_fk", _enablePKFK);
activity?.SetTag("connection.feature.enable_multiple_catalog_support", _enableMultipleCatalogSupport);
activity?.SetTag("connection.feature.enable_direct_results", _enableDirectResults);
activity?.SetTag("connection.feature.use_cloud_fetch", _useCloudFetch);
activity?.SetTag("connection.feature.use_desc_table_extended", _useDescTableExtended);
activity?.SetTag("connection.feature.enable_run_async_in_thrift_op", _runAsyncInThrift);

// Handle default namespace
if (session.__isset.initialNamespace)
{
_defaultNamespace = session.InitialNamespace;
activity?.AddEvent("connection.namespace.set_from_server", [
new("catalog", _defaultNamespace.CatalogName ?? "(none)"),
new("schema", _defaultNamespace.SchemaName ?? "(none)")
]);
}
else if (_defaultNamespace != null && !string.IsNullOrEmpty(_defaultNamespace.SchemaName))
{
// catalog in namespace is introduced when SET CATALOG is introduced, so we don't need to fallback
// server version is too old. Explicitly set the schema using queries
activity?.AddEvent("connection.namespace.fallback_to_use_schema", [
new("schema_name", _defaultNamespace.SchemaName),
new("reason", "Server does not support initialNamespace in OpenSessionResp")
]);
await SetSchema(_defaultNamespace.SchemaName);
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: remove extra blank line

}
else
{
activity?.SetTag("error.type", "NullSessionResponse");
}
}

Expand Down
Loading