diff --git a/csharp/src/Drivers/Apache/Hive2/HiveServer2Connection.cs b/csharp/src/Drivers/Apache/Hive2/HiveServer2Connection.cs
index fddb356f10..08506b3bf3 100644
--- a/csharp/src/Drivers/Apache/Hive2/HiveServer2Connection.cs
+++ b/csharp/src/Drivers/Apache/Hive2/HiveServer2Connection.cs
@@ -343,7 +343,7 @@ internal TCLIService.IAsync Client
 
         internal async Task OpenAsync()
         {
-            await this.TraceActivity(async activity =>
+            await this.TraceActivityAsync(async activity =>
             {
                 CancellationToken cancellationToken = ApacheUtility.GetCancellationToken(ConnectTimeoutMilliseconds, ApacheUtility.TimeUnit.Milliseconds);
                 try
diff --git a/csharp/src/Drivers/Databricks/DatabricksConnection.cs b/csharp/src/Drivers/Databricks/DatabricksConnection.cs
index d6480d214f..ccf0845d8a 100644
--- a/csharp/src/Drivers/Databricks/DatabricksConnection.cs
+++ b/csharp/src/Drivers/Databricks/DatabricksConnection.cs
@@ -31,6 +31,7 @@
 using Apache.Arrow.Adbc.Drivers.Apache.Spark;
 using Apache.Arrow.Adbc.Drivers.Databricks.Auth;
 using Apache.Arrow.Adbc.Drivers.Databricks.Reader;
+using Apache.Arrow.Adbc.Tracing;
 using Apache.Arrow.Ipc;
 using Apache.Hive.Service.Rpc.Thrift;
 using Thrift.Protocol;
@@ -100,6 +101,28 @@ public DatabricksConnection(IReadOnlyDictionary<string, string> properties) : ba
             ValidateProperties();
         }
 
+        private void LogConnectionProperties(Activity? activity)
+        {
+            if (activity == null) return;
+
+            foreach (var kvp in Properties)
+            {
+                string key = kvp.Key;
+                string value = kvp.Value;
+
+                // Sanitize sensitive properties - only mask actual credentials/tokens, not configuration
+                bool isSensitive = key.IndexOf("password", StringComparison.OrdinalIgnoreCase) >= 0 ||
+                                   key.IndexOf("secret", StringComparison.OrdinalIgnoreCase) >= 0 ||
+                                   key.Equals(AdbcOptions.Password, StringComparison.OrdinalIgnoreCase) ||
+                                   key.Equals(SparkParameters.AccessToken, StringComparison.OrdinalIgnoreCase) ||
+                                   key.Equals(DatabricksParameters.OAuthClientSecret, StringComparison.OrdinalIgnoreCase);
+
+                string logValue = isSensitive ? "***" : value;
+
+                activity.SetTag(key, logValue);
+            }
+        }
+
         public override IEnumerable<KeyValuePair<string, object?>>? GetActivitySourceTags(IReadOnlyDictionary<string, string> properties)
         {
             IEnumerable<KeyValuePair<string, object?>>? tags = base.GetActivitySourceTags(properties);
@@ -696,58 +719,129 @@ public override AdbcStatement CreateStatement()
 
         protected override TOpenSessionReq CreateSessionRequest()
         {
-            var req = new TOpenSessionReq
+            return this.TraceActivity(activity =>
             {
-                Client_protocol = TProtocolVersion.SPARK_CLI_SERVICE_PROTOCOL_V7,
-                Client_protocol_i64 = (long)TProtocolVersion.SPARK_CLI_SERVICE_PROTOCOL_V7,
-                CanUseMultipleCatalogs = _enableMultipleCatalogSupport,
-            };
+                // Log driver information at the beginning of the connection
+                activity?.AddEvent("connection.driver.info", [
+                    new("driver.name", "Apache Arrow ADBC Databricks Driver"),
+                    new("driver.version", s_assemblyVersion),
+                    new("driver.assembly", s_assemblyName)
+                ]);
 
-            // Set default namespace if available
-            if (_defaultNamespace != null)
-            {
-                req.InitialNamespace = _defaultNamespace;
-            }
-            req.Configuration = new Dictionary<string, string>();
-            // merge timestampConfig with serverSideProperties
-            foreach (var kvp in timestampConfig)
-            {
-                req.Configuration[kvp.Key] = kvp.Value;
-            }
-            // If not using queries to set server-side properties, include them in Configuration
-            if (!_applySSPWithQueries)
-            {
-                var serverSideProperties = GetServerSideProperties();
-                foreach (var property in serverSideProperties)
+                // Log connection properties (sanitize sensitive values)
+                LogConnectionProperties(activity);
+
+                var req = new TOpenSessionReq
                 {
-                    req.Configuration[property.Key] = property.Value;
+                    Client_protocol = TProtocolVersion.SPARK_CLI_SERVICE_PROTOCOL_V7,
+                    Client_protocol_i64 = (long)TProtocolVersion.SPARK_CLI_SERVICE_PROTOCOL_V7,
+                    CanUseMultipleCatalogs = _enableMultipleCatalogSupport,
+                };
+
+                // Log OpenSession request details
+                activity?.SetTag("connection.client_protocol", req.Client_protocol.ToString());
+
+                // Set default namespace if available
+                if (_defaultNamespace != null)
+                {
+                    req.InitialNamespace = _defaultNamespace;
+                    activity?.SetTag("connection.initial_namespace.catalog", _defaultNamespace.CatalogName ?? "(none)");
+                    activity?.SetTag("connection.initial_namespace.schema", _defaultNamespace.SchemaName ?? "(none)");
                 }
-            }
-            return req;
+                req.Configuration = new Dictionary<string, string>();
+                // merge timestampConfig with serverSideProperties
+                foreach (var kvp in timestampConfig)
+                {
+                    req.Configuration[kvp.Key] = kvp.Value;
+                }
+                // If not using queries to set server-side properties, include them in Configuration
+                if (!_applySSPWithQueries)
+                {
+                    var serverSideProperties = GetServerSideProperties();
+                    foreach (var property in serverSideProperties)
+                    {
+                        req.Configuration[property.Key] = property.Value;
+                    }
+                }
+
+                activity?.SetTag("connection.configuration_count", req.Configuration.Count);
+
+                return req;
+            });
         }
 
         protected override async Task HandleOpenSessionResponse(TOpenSessionResp? session, Activity? activity = default)
         {
+            await base.HandleOpenSessionResponse(session, activity);
+
             if (session != null)
             {
                 var version = session.ServerProtocolVersion;
+
+                // Log server protocol version
+                activity?.SetTag("connection.server_protocol_version", version.ToString());
+
+                // Validate it's a Databricks server
                 if (!FeatureVersionNegotiator.IsDatabricksProtocolVersion(version))
                 {
+                    activity?.SetTag("error.type", "InvalidServerProtocol");
+                    activity?.SetTag("error.message", "Non-Databricks server detected");
                     throw new DatabricksException("Attempted to use databricks driver with a non-databricks server");
                 }
-                _enablePKFK = _enablePKFK && FeatureVersionNegotiator.SupportsPKFK(version);
+
+                // Log protocol version capabilities (what the server supports)
+                bool protocolSupportsPKFK = FeatureVersionNegotiator.SupportsPKFK(version);
+                bool protocolSupportsDescTableExtended = FeatureVersionNegotiator.SupportsDESCTableExtended(version);
+
+                activity?.SetTag("connection.protocol.supports_pk_fk", protocolSupportsPKFK);
+                activity?.SetTag("connection.protocol.supports_desc_table_extended", protocolSupportsDescTableExtended);
+
+                // Apply protocol constraints to user settings
+                bool pkfkBefore = _enablePKFK;
+                _enablePKFK = _enablePKFK && protocolSupportsPKFK;
+
+                if (pkfkBefore && !_enablePKFK)
+                {
+                    activity?.SetTag("connection.feature_downgrade.pk_fk", true);
+                    activity?.SetTag("connection.feature_downgrade.pk_fk.reason", "Protocol version does not support PK/FK");
+                }
+
+                // Handle multiple catalog support from server response
                 _enableMultipleCatalogSupport = session.__isset.canUseMultipleCatalogs ? session.CanUseMultipleCatalogs : false;
+
+                // Log final feature flags as tags
+                activity?.SetTag("connection.feature.enable_pk_fk", _enablePKFK);
+                activity?.SetTag("connection.feature.enable_multiple_catalog_support", _enableMultipleCatalogSupport);
+                activity?.SetTag("connection.feature.enable_direct_results", _enableDirectResults);
+                activity?.SetTag("connection.feature.use_cloud_fetch", _useCloudFetch);
+                activity?.SetTag("connection.feature.use_desc_table_extended", _useDescTableExtended);
+                activity?.SetTag("connection.feature.enable_run_async_in_thrift_op", _runAsyncInThrift);
+
+                // Handle default namespace
                 if (session.__isset.initialNamespace)
                 {
                     _defaultNamespace = session.InitialNamespace;
+                    activity?.AddEvent("connection.namespace.set_from_server", [
+                        new("catalog", _defaultNamespace.CatalogName ?? "(none)"),
+                        new("schema", _defaultNamespace.SchemaName ?? "(none)")
+                    ]);
                 }
                 else if (_defaultNamespace != null && !string.IsNullOrEmpty(_defaultNamespace.SchemaName))
                 {
                     // catalog in namespace is introduced when SET CATALOG is introduced, so we don't need to fallback
                     // server version is too old. Explicitly set the schema using queries
+                    activity?.AddEvent("connection.namespace.fallback_to_use_schema", [
+                        new("schema_name", _defaultNamespace.SchemaName),
+                        new("reason", "Server does not support initialNamespace in OpenSessionResp")
+                    ]);
                     await SetSchema(_defaultNamespace.SchemaName);
                 }
+
+            }
+            else
+            {
+                activity?.SetTag("error.type", "NullSessionResponse");
             }
         }
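
Note (illustrative sketch, not part of this diff): the connection.* tags and events added above go through System.Diagnostics activities, so they can be inspected from a test or host application with a plain ActivityListener. The source-name filter below is an assumption; substitute the driver's actual ActivitySource name.

using System;
using System.Diagnostics;

internal static class TraceTagDemo
{
    public static void Main()
    {
        // Subscribe to any ActivitySource whose name mentions "Databricks" (assumed name filter).
        using var listener = new ActivityListener
        {
            ShouldListenTo = source => source.Name.Contains("Databricks", StringComparison.OrdinalIgnoreCase),
            Sample = (ref ActivityCreationOptions<ActivityContext> options) => ActivitySamplingResult.AllData,
            ActivityStopped = activity =>
            {
                // TagObjects includes non-string values (bools, counts) set via SetTag,
                // e.g. connection.server_protocol_version or connection.feature.enable_pk_fk.
                foreach (var tag in activity.TagObjects)
                {
                    Console.WriteLine($"{activity.OperationName}: {tag.Key} = {tag.Value}");
                }
            }
        };
        ActivitySource.AddActivityListener(listener);

        // ... open a DatabricksConnection here; the tags set in CreateSessionRequest and
        // HandleOpenSessionResponse are printed when each traced activity stops.
    }
}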