Skip to content
103 changes: 102 additions & 1 deletion csharp/src/Drivers/Databricks/DatabricksConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
using Apache.Arrow.Adbc.Drivers.Apache.Spark;
using Apache.Arrow.Adbc.Drivers.Databricks.Auth;
using Apache.Arrow.Adbc.Drivers.Databricks.Reader;
using Apache.Arrow.Adbc.Tracing;
using Apache.Arrow.Ipc;
using Apache.Hive.Service.Rpc.Thrift;
using Thrift.Protocol;
Expand Down Expand Up @@ -100,6 +101,32 @@ public DatabricksConnection(IReadOnlyDictionary<string, string> properties) : ba
ValidateProperties();
}

/// <summary>
/// Records every connection property as a tag on <paramref name="activity"/>, bracketed by
/// "connection.properties.start" / "connection.properties.end" events. Values of
/// credential-bearing properties are replaced with a fixed mask so they never reach the trace.
/// </summary>
/// <param name="activity">The activity to annotate; when <c>null</c> nothing is logged.</param>
private void LogConnectionProperties(Activity? activity)
{
    if (activity == null)
    {
        return;
    }

    activity.AddEvent("connection.properties.start");

    foreach (var kvp in Properties)
    {
        string key = kvp.Key;

        // Sanitize sensitive properties - only mask actual credentials/tokens, not configuration.
        // A key that ENDS in "token" is treated as a credential (bearer / access tokens), whereas a
        // key that merely contains "token" in the middle (a token-related setting) is left readable.
        // NOTE(review): presumably this also covers the personal-access-token option - confirm
        // against the parameter constants declared by the Spark/Databricks drivers.
        bool isSensitive =
            key.IndexOf("password", StringComparison.OrdinalIgnoreCase) >= 0 ||
            key.IndexOf("secret", StringComparison.OrdinalIgnoreCase) >= 0 ||
            key.EndsWith("token", StringComparison.OrdinalIgnoreCase) ||
            key.Equals(AdbcOptions.Password, StringComparison.OrdinalIgnoreCase) ||
            key.Equals(SparkParameters.AccessToken, StringComparison.OrdinalIgnoreCase) ||
            key.Equals(DatabricksParameters.OAuthClientSecret, StringComparison.OrdinalIgnoreCase);

        // The property name itself is used as the tag key; only the value is masked.
        activity.SetTag(key, isSensitive ? "***" : kvp.Value);
    }

    activity.AddEvent("connection.properties.end");
}

public override IEnumerable<KeyValuePair<string, object?>>? GetActivitySourceTags(IReadOnlyDictionary<string, string> properties)
{
IEnumerable<KeyValuePair<string, object?>>? tags = base.GetActivitySourceTags(properties);
Expand Down Expand Up @@ -668,17 +695,34 @@ public override AdbcStatement CreateStatement()

/// <summary>
/// Builds the Thrift OpenSession request for this connection and records driver,
/// connection-property and request details on <see cref="Activity.Current"/> (when one exists).
/// </summary>
/// <returns>The populated <see cref="TOpenSessionReq"/>.</returns>
protected override TOpenSessionReq CreateSessionRequest()
{
// Log driver information at the beginning of the connection
Activity.Current?.AddEvent("connection.driver.info", [
new("driver.name", "Apache Arrow ADBC Databricks Driver"),
new("driver.version", s_assemblyVersion),
new("driver.assembly", s_assemblyName)
]);

// Log connection properties (sanitize sensitive values)
LogConnectionProperties(Activity.Current);

// Both protocol fields are set to the same V7 protocol version; multi-catalog use follows the connection setting.
var req = new TOpenSessionReq
{
Client_protocol = TProtocolVersion.SPARK_CLI_SERVICE_PROTOCOL_V7,
Client_protocol_i64 = (long)TProtocolVersion.SPARK_CLI_SERVICE_PROTOCOL_V7,
CanUseMultipleCatalogs = _enableMultipleCatalogSupport,
};

// Log OpenSession request details
Activity.Current?.AddEvent("connection.open_session_request.creating");
Activity.Current?.SetTag("connection.client_protocol", req.Client_protocol.ToString());
Activity.Current?.SetTag("connection.can_use_multiple_catalogs", _enableMultipleCatalogSupport);

// Set default namespace if available
if (_defaultNamespace != null)
{
req.InitialNamespace = _defaultNamespace;
// A missing catalog or schema name is reported as the literal "(none)" in the trace.
Activity.Current?.SetTag("connection.initial_namespace.catalog", _defaultNamespace.CatalogName ?? "(none)");
Activity.Current?.SetTag("connection.initial_namespace.schema", _defaultNamespace.SchemaName ?? "(none)");
}
req.Configuration = new Dictionary<string, string>();
// merge timestampConfig with serverSideProperties
Expand All @@ -695,31 +739,88 @@ protected override TOpenSessionReq CreateSessionRequest()
req.Configuration[property.Key] = property.Value;
}
}

// Only the number of configuration entries is traced here, not their keys or values.
Activity.Current?.SetTag("connection.configuration_count", req.Configuration.Count);
Activity.Current?.AddEvent("connection.open_session_request.created");

return req;
}

/// <summary>
/// Processes the server's OpenSession response: delegates to the base handler, verifies the
/// server speaks a Databricks protocol version, reconciles the user's feature settings with what
/// the negotiated protocol supports, adopts (or falls back to explicitly setting) the default
/// namespace, and records each step on <paramref name="activity"/>.
/// </summary>
/// <param name="session">The OpenSession response; when <c>null</c> only an error tag and event are recorded.</param>
/// <param name="activity">Optional tracing activity that receives the tags and events.</param>
/// <exception cref="DatabricksException">
/// Thrown when the server protocol version is not a Databricks protocol version.
/// </exception>
protected override async Task HandleOpenSessionResponse(TOpenSessionResp? session, Activity? activity = default)
{
    // NOTE(review): it was reported during review that activity operations performed after the
    // awaited Client.OpenSession call may not appear in the serialized trace log; the cause is
    // unconfirmed - verify that the tags/events below actually reach the log.
    activity?.AddEvent("connection.open_session_response.received");
    await base.HandleOpenSessionResponse(session, activity);

    if (session != null)
    {
        var version = session.ServerProtocolVersion;

        // Log server protocol version
        activity?.SetTag("connection.server_protocol_version", version.ToString());

        // Validate it's a Databricks server
        if (!FeatureVersionNegotiator.IsDatabricksProtocolVersion(version))
        {
            activity?.SetTag("error.type", "InvalidServerProtocol");
            activity?.SetTag("error.message", "Non-Databricks server detected");
            throw new DatabricksException("Attempted to use databricks driver with a non-databricks server");
        }

        // Log protocol version capabilities (what the server supports)
        bool protocolSupportsPKFK = FeatureVersionNegotiator.SupportsPKFK(version);
        bool protocolSupportsDescTableExtended = FeatureVersionNegotiator.SupportsDESCTableExtended(version);

        activity?.SetTag("connection.protocol.supports_pk_fk", protocolSupportsPKFK);
        activity?.SetTag("connection.protocol.supports_desc_table_extended", protocolSupportsDescTableExtended);

        // Apply protocol constraints to user settings. The user's setting must be captured BEFORE
        // the constraint is applied; otherwise the downgrade below can never be detected.
        bool pkfkBefore = _enablePKFK;
        _enablePKFK = _enablePKFK && protocolSupportsPKFK;

        if (pkfkBefore && !_enablePKFK)
        {
            activity?.SetTag("connection.feature_downgrade.pk_fk", true);
            activity?.SetTag("connection.feature_downgrade.pk_fk.reason", "Protocol version does not support PK/FK");
        }

        // Handle multiple catalog support from server response
        _enableMultipleCatalogSupport = session.__isset.canUseMultipleCatalogs ? session.CanUseMultipleCatalogs : false;

        // Log final feature flags as tags
        activity?.SetTag("connection.feature.enable_pk_fk", _enablePKFK);
        activity?.SetTag("connection.feature.enable_multiple_catalog_support", _enableMultipleCatalogSupport);
        activity?.SetTag("connection.feature.enable_direct_results", _enableDirectResults);
        activity?.SetTag("connection.feature.use_cloud_fetch", _useCloudFetch);
        activity?.SetTag("connection.feature.use_desc_table_extended", _useDescTableExtended);
        activity?.SetTag("connection.feature.enable_run_async_in_thrift_op", _runAsyncInThrift);

        // Handle default namespace
        if (session.__isset.initialNamespace)
        {
            _defaultNamespace = session.InitialNamespace;
            activity?.AddEvent("connection.namespace.set_from_server", [
                new("catalog", _defaultNamespace.CatalogName ?? "(none)"),
                new("schema", _defaultNamespace.SchemaName ?? "(none)")
            ]);
        }
        else if (_defaultNamespace != null && !string.IsNullOrEmpty(_defaultNamespace.SchemaName))
        {
            // catalog in namespace is introduced when SET CATALOG is introduced, so we don't need to fallback
            // server version is too old. Explicitly set the schema using queries
            activity?.AddEvent("connection.namespace.fallback_to_use_schema", [
                new("schema_name", _defaultNamespace.SchemaName),
                new("reason", "Server does not support initialNamespace in OpenSessionResp")
            ]);
            await SetSchema(_defaultNamespace.SchemaName);
        }

        activity?.AddEvent("connection.open_session_response.completed");
    }
    else
    {
        activity?.SetTag("error.type", "NullSessionResponse");
        activity?.AddEvent("connection.open_session_response.null");
    }
}

Expand Down
Loading