-
Notifications
You must be signed in to change notification settings - Fork 165
feat(csharp/src/Drivers/Databricks): Implement telemetry configuration and exporter (phase 2.0) #3664
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
feat(csharp/src/Drivers/Databricks): Implement telemetry configuration and exporter (phase 2.0) #3664
Changes from all commits
59f8b23
a9439d6
de4e548
b2a4c58
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,82 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| using System; | ||
| using System.Collections.Generic; | ||
| using System.Diagnostics; | ||
| using System.Net.Http; | ||
| using System.Text; | ||
| using System.Text.Json; | ||
| using System.Threading; | ||
| using System.Threading.Tasks; | ||
|
|
||
| namespace Apache.Arrow.Adbc.Drivers.Databricks.Telemetry | ||
| { | ||
| internal sealed class DatabricksTelemetryExporter : IDatabricksTelemetryExporter | ||
| { | ||
| private const string TelemetryEndpoint = "/telemetry-ext"; | ||
|
|
||
| private readonly HttpClient _httpClient; | ||
| private readonly string _hostUrl; | ||
|
|
||
| public DatabricksTelemetryExporter(HttpClient httpClient, string hostUrl) | ||
| { | ||
| _httpClient = httpClient ?? throw new ArgumentNullException(nameof(httpClient)); | ||
| _hostUrl = !string.IsNullOrEmpty(hostUrl) ? hostUrl : throw new ArgumentException("Host URL cannot be null or empty.", nameof(hostUrl)); | ||
| } | ||
|
|
||
| public async Task ExportAsync( | ||
| IReadOnlyList<TelemetryMetric> metrics, | ||
| CancellationToken cancellationToken = default) | ||
| { | ||
| if (metrics == null || metrics.Count == 0) | ||
| { | ||
| return; | ||
| } | ||
|
|
||
| try | ||
| { | ||
| await SendMetricsAsync(metrics, cancellationToken); | ||
| } | ||
| catch (Exception ex) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should there be a retry mechanism for these? |
||
| { | ||
| Debug.WriteLine($"Telemetry export failed: {ex.Message}"); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should this eventually hook into tracing instead? |
||
| } | ||
| } | ||
|
|
||
| private async Task SendMetricsAsync( | ||
| IReadOnlyList<TelemetryMetric> metrics, | ||
| CancellationToken cancellationToken) | ||
| { | ||
| string fullUrl = new UriBuilder(_hostUrl) { Path = TelemetryEndpoint }.ToString(); | ||
| string json = SerializeMetrics(metrics); | ||
| var content = new StringContent(json, Encoding.UTF8, "application/json"); | ||
| var request = new HttpRequestMessage(HttpMethod.Post, fullUrl) { Content = content }; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is an unauthenticated endpoint? |
||
| HttpResponseMessage response = await _httpClient.SendAsync(request, cancellationToken); | ||
| response.EnsureSuccessStatusCode(); | ||
| } | ||
|
|
||
| private string SerializeMetrics(IReadOnlyList<TelemetryMetric> metrics) | ||
| { | ||
| var payload = new { metrics = metrics }; | ||
| return JsonSerializer.Serialize(payload, new JsonSerializerOptions | ||
| { | ||
| PropertyNamingPolicy = JsonNamingPolicy.CamelCase | ||
| }); | ||
| } | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
| @@ -0,0 +1,34 @@ | ||||||
| /* | ||||||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||||||
| * contributor license agreements. See the NOTICE file distributed with | ||||||
| * this work for additional information regarding copyright ownership. | ||||||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||||||
| * (the "License"); you may not use this file except in compliance with | ||||||
| * the License. You may obtain a copy of the License at | ||||||
| * | ||||||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||||||
| * | ||||||
| * Unless required by applicable law or agreed to in writing, software | ||||||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||||||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||||
| * See the License for the specific language governing permissions and | ||||||
| * limitations under the License. | ||||||
| */ | ||||||
|
|
||||||
| using System.Collections.Generic; | ||||||
| using System.Threading; | ||||||
| using System.Threading.Tasks; | ||||||
|
|
||||||
| namespace Apache.Arrow.Adbc.Drivers.Databricks.Telemetry | ||||||
| { | ||||||
| /// <summary> | ||||||
| /// Interface for exporting telemetry metrics to Databricks telemetry service. | ||||||
| /// Implementations must never throw exceptions. | ||||||
| /// </summary> | ||||||
| public interface IDatabricksTelemetryExporter | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
| { | ||||||
| Task ExportAsync( | ||||||
| IReadOnlyList<TelemetryMetric> metrics, | ||||||
| CancellationToken cancellationToken = default); | ||||||
| } | ||||||
| } | ||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,85 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| using System.Collections.Generic; | ||
|
|
||
| namespace Apache.Arrow.Adbc.Drivers.Databricks.Telemetry.TagDefinitions | ||
| { | ||
| /// <summary> | ||
| /// Tag definitions for Connection.Open events. | ||
| /// </summary> | ||
| internal static class ConnectionOpenEvent | ||
| { | ||
| public const string EventName = "Connection.Open"; | ||
|
|
||
| // Identity | ||
| [TelemetryTag("workspace.id", ExportScope = TagExportScope.ExportAll, Required = true, Description = "Workspace ID")] | ||
| public const string WorkspaceId = "workspace.id"; | ||
|
|
||
| [TelemetryTag("session.id", ExportScope = TagExportScope.ExportAll, Required = true, Description = "Session ID")] | ||
| public const string SessionId = "session.id"; | ||
|
|
||
| // Driver Configuration | ||
| [TelemetryTag("driver.version", ExportScope = TagExportScope.ExportAll, Description = "Driver version")] | ||
| public const string DriverVersion = "driver.version"; | ||
|
|
||
| [TelemetryTag("driver.os", ExportScope = TagExportScope.ExportAll, Description = "Operating system")] | ||
| public const string DriverOS = "driver.os"; | ||
|
|
||
| [TelemetryTag("driver.runtime", ExportScope = TagExportScope.ExportAll, Description = ".NET runtime")] | ||
| public const string DriverRuntime = "driver.runtime"; | ||
|
|
||
| // Feature Flags | ||
| [TelemetryTag("feature.cloudfetch", ExportScope = TagExportScope.ExportAll, Description = "CloudFetch enabled")] | ||
| public const string FeatureCloudFetch = "feature.cloudfetch"; | ||
|
|
||
| [TelemetryTag("feature.lz4", ExportScope = TagExportScope.ExportAll, Description = "LZ4 compression enabled")] | ||
| public const string FeatureLz4 = "feature.lz4"; | ||
|
|
||
| [TelemetryTag("feature.direct_results", ExportScope = TagExportScope.ExportAll, Description = "Direct results enabled")] | ||
| public const string FeatureDirectResults = "feature.direct_results"; | ||
|
|
||
| [TelemetryTag("feature.multiple_catalog", ExportScope = TagExportScope.ExportAll, Description = "Multiple catalog enabled")] | ||
| public const string FeatureMultipleCatalog = "feature.multiple_catalog"; | ||
|
|
||
| [TelemetryTag("feature.trace_propagation", ExportScope = TagExportScope.ExportAll, Description = "Trace propagation enabled")] | ||
| public const string FeatureTracePropagation = "feature.trace_propagation"; | ||
|
|
||
| [TelemetryTag("server.address", ExportScope = TagExportScope.ExportLocal, Description = "Server address")] | ||
| public const string ServerAddress = "server.address"; | ||
|
|
||
| /// <summary> | ||
| /// Returns tags allowed for Databricks export (privacy filter). | ||
| /// </summary> | ||
| public static IReadOnlyCollection<string> GetDatabricksExportTags() | ||
| { | ||
| return new HashSet<string> | ||
| { | ||
| WorkspaceId, | ||
| SessionId, | ||
| DriverVersion, | ||
| DriverOS, | ||
| DriverRuntime, | ||
| FeatureCloudFetch, | ||
| FeatureLz4, | ||
| FeatureDirectResults, | ||
| FeatureMultipleCatalog, | ||
| FeatureTracePropagation | ||
| }; | ||
| } | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,70 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| using System.Collections.Generic; | ||
|
|
||
| namespace Apache.Arrow.Adbc.Drivers.Databricks.Telemetry.TagDefinitions | ||
| { | ||
| /// <summary> | ||
| /// Tag definitions for Error events. | ||
| /// </summary> | ||
| internal static class ErrorEvent | ||
| { | ||
| public const string EventName = "Error"; | ||
|
|
||
| // Error Classification | ||
| [TelemetryTag("error.type", ExportScope = TagExportScope.ExportAll, Required = true, Description = "Error type")] | ||
| public const string ErrorType = "error.type"; | ||
|
|
||
| [TelemetryTag("http.status_code", ExportScope = TagExportScope.ExportAll, Description = "HTTP status code")] | ||
| public const string HttpStatusCode = "http.status_code"; | ||
|
|
||
| [TelemetryTag("db.sql_state", ExportScope = TagExportScope.ExportAll, Description = "SQL state")] | ||
| public const string DbSqlState = "db.sql_state"; | ||
|
|
||
| [TelemetryTag("error.operation", ExportScope = TagExportScope.ExportAll, Description = "Failed operation")] | ||
| public const string ErrorOperation = "error.operation"; | ||
|
|
||
| [TelemetryTag("error.retried", ExportScope = TagExportScope.ExportAll, Description = "Was retried")] | ||
| public const string ErrorRetried = "error.retried"; | ||
|
|
||
| [TelemetryTag("error.retry_count", ExportScope = TagExportScope.ExportAll, Description = "Retry count")] | ||
| public const string ErrorRetryCount = "error.retry_count"; | ||
|
|
||
| [TelemetryTag("error.message", ExportScope = TagExportScope.ExportLocal, Description = "Error message")] | ||
| public const string ErrorMessage = "error.message"; | ||
|
|
||
| [TelemetryTag("error.stack_trace", ExportScope = TagExportScope.ExportLocal, Description = "Stack trace")] | ||
| public const string ErrorStackTrace = "error.stack_trace"; | ||
|
|
||
| /// <summary> | ||
| /// Returns tags allowed for Databricks export (privacy filter). | ||
| /// </summary> | ||
| public static IReadOnlyCollection<string> GetDatabricksExportTags() | ||
| { | ||
| return new HashSet<string> | ||
| { | ||
| ErrorType, | ||
| HttpStatusCode, | ||
| DbSqlState, | ||
| ErrorOperation, | ||
| ErrorRetried, | ||
| ErrorRetryCount | ||
| }; | ||
| } | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,89 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| using System.Collections.Generic; | ||
|
|
||
| namespace Apache.Arrow.Adbc.Drivers.Databricks.Telemetry.TagDefinitions | ||
| { | ||
| /// <summary> | ||
| /// Tag definitions for Statement execution events. | ||
| /// </summary> | ||
| internal static class StatementExecutionEvent | ||
| { | ||
| public const string EventName = "Statement.Execute"; | ||
|
|
||
| // Identity | ||
| [TelemetryTag("statement.id", ExportScope = TagExportScope.ExportAll, Required = true, Description = "Statement ID")] | ||
| public const string StatementId = "statement.id"; | ||
|
|
||
| [TelemetryTag("session.id", ExportScope = TagExportScope.ExportAll, Required = true, Description = "Session ID")] | ||
| public const string SessionId = "session.id"; | ||
|
|
||
| // Result Metrics | ||
| [TelemetryTag("result.format", ExportScope = TagExportScope.ExportAll, Description = "Result format")] | ||
| public const string ResultFormat = "result.format"; | ||
|
|
||
| [TelemetryTag("result.chunk_count", ExportScope = TagExportScope.ExportAll, Description = "Chunk count")] | ||
| public const string ResultChunkCount = "result.chunk_count"; | ||
|
|
||
| [TelemetryTag("result.bytes_downloaded", ExportScope = TagExportScope.ExportAll, Description = "Bytes downloaded")] | ||
| public const string ResultBytesDownloaded = "result.bytes_downloaded"; | ||
|
|
||
| [TelemetryTag("result.compression_enabled", ExportScope = TagExportScope.ExportAll, Description = "Compression enabled")] | ||
| public const string ResultCompressionEnabled = "result.compression_enabled"; | ||
|
|
||
| [TelemetryTag("result.row_count", ExportScope = TagExportScope.ExportAll, Description = "Row count")] | ||
| public const string ResultRowCount = "result.row_count"; | ||
|
|
||
| // Polling Metrics | ||
| [TelemetryTag("poll.count", ExportScope = TagExportScope.ExportAll, Description = "Poll count")] | ||
| public const string PollCount = "poll.count"; | ||
|
|
||
| [TelemetryTag("poll.latency_ms", ExportScope = TagExportScope.ExportAll, Description = "Poll latency")] | ||
| public const string PollLatencyMs = "poll.latency_ms"; | ||
|
|
||
| // Operation Type | ||
| [TelemetryTag("db.operation", ExportScope = TagExportScope.ExportAll, Description = "Operation type")] | ||
| public const string DbOperation = "db.operation"; | ||
|
|
||
| [TelemetryTag("db.statement", ExportScope = TagExportScope.ExportLocal, Description = "SQL statement")] | ||
| public const string DbStatement = "db.statement"; | ||
|
|
||
| [TelemetryTag("db.catalog", ExportScope = TagExportScope.ExportLocal, Description = "Catalog name")] | ||
| public const string DbCatalog = "db.catalog"; | ||
|
|
||
| [TelemetryTag("db.schema", ExportScope = TagExportScope.ExportLocal, Description = "Schema name")] | ||
| public const string DbSchema = "db.schema"; | ||
|
|
||
| public static IReadOnlyCollection<string> GetDatabricksExportTags() | ||
| { | ||
| return new HashSet<string> | ||
| { | ||
| StatementId, | ||
| SessionId, | ||
| ResultFormat, | ||
| ResultChunkCount, | ||
| ResultBytesDownloaded, | ||
| ResultCompressionEnabled, | ||
| ResultRowCount, | ||
| PollCount, | ||
| PollLatencyMs, | ||
| DbOperation | ||
| }; | ||
| } | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this plug into existing Telemetry exporter framework? Does defining this single method work out of the box?
Take a look at FileExporter for example.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The single-method interface is sufficient because batching and aggregation happen in
MetricsAggregatorbefore reaching the exporter.FileExporter (OpenTelemetry-based):
Activity → OpenTelemetry SDK → BaseExporter → Local File
DatabricksTelemetryExporter (Custom aggregation):
Activity → DatabricksActivityListener → MetricsAggregator → IDatabricksTelemetryExporter → Databricks Service