Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Net.Http;
using System.Text;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;

namespace Apache.Arrow.Adbc.Drivers.Databricks.Telemetry
{
internal sealed class DatabricksTelemetryExporter : IDatabricksTelemetryExporter
{
private const string TelemetryEndpoint = "/telemetry-ext";

private readonly HttpClient _httpClient;
private readonly string _hostUrl;

public DatabricksTelemetryExporter(HttpClient httpClient, string hostUrl)
{
_httpClient = httpClient ?? throw new ArgumentNullException(nameof(httpClient));
_hostUrl = !string.IsNullOrEmpty(hostUrl) ? hostUrl : throw new ArgumentException("Host URL cannot be null or empty.", nameof(hostUrl));
}

public async Task ExportAsync(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this plug into existing Telemetry exporter framework? Does defining this single method work out of the box?
Take a look at FileExporter for example.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The single-method interface is sufficient because batching and aggregation happen in MetricsAggregator before reaching the exporter.

FileExporter (OpenTelemetry-based):
Activity → OpenTelemetry SDK → BaseExporter → Local File

DatabricksTelemetryExporter (Custom aggregation):
Activity → DatabricksActivityListener → MetricsAggregator → IDatabricksTelemetryExporter → Databricks Service

IReadOnlyList<TelemetryMetric> metrics,
CancellationToken cancellationToken = default)
{
if (metrics == null || metrics.Count == 0)
{
return;
}

try
{
await SendMetricsAsync(metrics, cancellationToken);
}
catch (Exception ex)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should there be a retry mechanism for these?

{
Debug.WriteLine($"Telemetry export failed: {ex.Message}");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this eventually hook into tracing instead?

}
}

private async Task SendMetricsAsync(
IReadOnlyList<TelemetryMetric> metrics,
CancellationToken cancellationToken)
{
string fullUrl = new UriBuilder(_hostUrl) { Path = TelemetryEndpoint }.ToString();
string json = SerializeMetrics(metrics);
var content = new StringContent(json, Encoding.UTF8, "application/json");
var request = new HttpRequestMessage(HttpMethod.Post, fullUrl) { Content = content };
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an unauthenticated endpoint?

HttpResponseMessage response = await _httpClient.SendAsync(request, cancellationToken);
response.EnsureSuccessStatusCode();
}

private string SerializeMetrics(IReadOnlyList<TelemetryMetric> metrics)
{
var payload = new { metrics = metrics };
return JsonSerializer.Serialize(payload, new JsonSerializerOptions
{
PropertyNamingPolicy = JsonNamingPolicy.CamelCase
});
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace Apache.Arrow.Adbc.Drivers.Databricks.Telemetry
{
/// <summary>
/// Interface for exporting telemetry metrics to Databricks telemetry service.
/// Implementations must never throw exceptions.
/// </summary>
public interface IDatabricksTelemetryExporter
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
public interface IDatabricksTelemetryExporter
internal interface IDatabricksTelemetryExporter

{
Task ExportAsync(
IReadOnlyList<TelemetryMetric> metrics,
CancellationToken cancellationToken = default);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

using System.Collections.Generic;

namespace Apache.Arrow.Adbc.Drivers.Databricks.Telemetry.TagDefinitions
{
/// <summary>
/// Tag definitions for Connection.Open events.
/// </summary>
internal static class ConnectionOpenEvent
{
public const string EventName = "Connection.Open";

// Identity
[TelemetryTag("workspace.id", ExportScope = TagExportScope.ExportAll, Required = true, Description = "Workspace ID")]
public const string WorkspaceId = "workspace.id";

[TelemetryTag("session.id", ExportScope = TagExportScope.ExportAll, Required = true, Description = "Session ID")]
public const string SessionId = "session.id";

// Driver Configuration
[TelemetryTag("driver.version", ExportScope = TagExportScope.ExportAll, Description = "Driver version")]
public const string DriverVersion = "driver.version";

[TelemetryTag("driver.os", ExportScope = TagExportScope.ExportAll, Description = "Operating system")]
public const string DriverOS = "driver.os";

[TelemetryTag("driver.runtime", ExportScope = TagExportScope.ExportAll, Description = ".NET runtime")]
public const string DriverRuntime = "driver.runtime";

// Feature Flags
[TelemetryTag("feature.cloudfetch", ExportScope = TagExportScope.ExportAll, Description = "CloudFetch enabled")]
public const string FeatureCloudFetch = "feature.cloudfetch";

[TelemetryTag("feature.lz4", ExportScope = TagExportScope.ExportAll, Description = "LZ4 compression enabled")]
public const string FeatureLz4 = "feature.lz4";

[TelemetryTag("feature.direct_results", ExportScope = TagExportScope.ExportAll, Description = "Direct results enabled")]
public const string FeatureDirectResults = "feature.direct_results";

[TelemetryTag("feature.multiple_catalog", ExportScope = TagExportScope.ExportAll, Description = "Multiple catalog enabled")]
public const string FeatureMultipleCatalog = "feature.multiple_catalog";

[TelemetryTag("feature.trace_propagation", ExportScope = TagExportScope.ExportAll, Description = "Trace propagation enabled")]
public const string FeatureTracePropagation = "feature.trace_propagation";

[TelemetryTag("server.address", ExportScope = TagExportScope.ExportLocal, Description = "Server address")]
public const string ServerAddress = "server.address";

/// <summary>
/// Returns tags allowed for Databricks export (privacy filter).
/// </summary>
public static IReadOnlyCollection<string> GetDatabricksExportTags()
{
return new HashSet<string>
{
WorkspaceId,
SessionId,
DriverVersion,
DriverOS,
DriverRuntime,
FeatureCloudFetch,
FeatureLz4,
FeatureDirectResults,
FeatureMultipleCatalog,
FeatureTracePropagation
};
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

using System.Collections.Generic;

namespace Apache.Arrow.Adbc.Drivers.Databricks.Telemetry.TagDefinitions
{
/// <summary>
/// Tag definitions for Error events.
/// </summary>
internal static class ErrorEvent
{
public const string EventName = "Error";

// Error Classification
[TelemetryTag("error.type", ExportScope = TagExportScope.ExportAll, Required = true, Description = "Error type")]
public const string ErrorType = "error.type";

[TelemetryTag("http.status_code", ExportScope = TagExportScope.ExportAll, Description = "HTTP status code")]
public const string HttpStatusCode = "http.status_code";

[TelemetryTag("db.sql_state", ExportScope = TagExportScope.ExportAll, Description = "SQL state")]
public const string DbSqlState = "db.sql_state";

[TelemetryTag("error.operation", ExportScope = TagExportScope.ExportAll, Description = "Failed operation")]
public const string ErrorOperation = "error.operation";

[TelemetryTag("error.retried", ExportScope = TagExportScope.ExportAll, Description = "Was retried")]
public const string ErrorRetried = "error.retried";

[TelemetryTag("error.retry_count", ExportScope = TagExportScope.ExportAll, Description = "Retry count")]
public const string ErrorRetryCount = "error.retry_count";

[TelemetryTag("error.message", ExportScope = TagExportScope.ExportLocal, Description = "Error message")]
public const string ErrorMessage = "error.message";

[TelemetryTag("error.stack_trace", ExportScope = TagExportScope.ExportLocal, Description = "Stack trace")]
public const string ErrorStackTrace = "error.stack_trace";

/// <summary>
/// Returns tags allowed for Databricks export (privacy filter).
/// </summary>
public static IReadOnlyCollection<string> GetDatabricksExportTags()
{
return new HashSet<string>
{
ErrorType,
HttpStatusCode,
DbSqlState,
ErrorOperation,
ErrorRetried,
ErrorRetryCount
};
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

using System.Collections.Generic;

namespace Apache.Arrow.Adbc.Drivers.Databricks.Telemetry.TagDefinitions
{
/// <summary>
/// Tag definitions for Statement execution events.
/// </summary>
internal static class StatementExecutionEvent
{
public const string EventName = "Statement.Execute";

// Identity
[TelemetryTag("statement.id", ExportScope = TagExportScope.ExportAll, Required = true, Description = "Statement ID")]
public const string StatementId = "statement.id";

[TelemetryTag("session.id", ExportScope = TagExportScope.ExportAll, Required = true, Description = "Session ID")]
public const string SessionId = "session.id";

// Result Metrics
[TelemetryTag("result.format", ExportScope = TagExportScope.ExportAll, Description = "Result format")]
public const string ResultFormat = "result.format";

[TelemetryTag("result.chunk_count", ExportScope = TagExportScope.ExportAll, Description = "Chunk count")]
public const string ResultChunkCount = "result.chunk_count";

[TelemetryTag("result.bytes_downloaded", ExportScope = TagExportScope.ExportAll, Description = "Bytes downloaded")]
public const string ResultBytesDownloaded = "result.bytes_downloaded";

[TelemetryTag("result.compression_enabled", ExportScope = TagExportScope.ExportAll, Description = "Compression enabled")]
public const string ResultCompressionEnabled = "result.compression_enabled";

[TelemetryTag("result.row_count", ExportScope = TagExportScope.ExportAll, Description = "Row count")]
public const string ResultRowCount = "result.row_count";

// Polling Metrics
[TelemetryTag("poll.count", ExportScope = TagExportScope.ExportAll, Description = "Poll count")]
public const string PollCount = "poll.count";

[TelemetryTag("poll.latency_ms", ExportScope = TagExportScope.ExportAll, Description = "Poll latency")]
public const string PollLatencyMs = "poll.latency_ms";

// Operation Type
[TelemetryTag("db.operation", ExportScope = TagExportScope.ExportAll, Description = "Operation type")]
public const string DbOperation = "db.operation";

[TelemetryTag("db.statement", ExportScope = TagExportScope.ExportLocal, Description = "SQL statement")]
public const string DbStatement = "db.statement";

[TelemetryTag("db.catalog", ExportScope = TagExportScope.ExportLocal, Description = "Catalog name")]
public const string DbCatalog = "db.catalog";

[TelemetryTag("db.schema", ExportScope = TagExportScope.ExportLocal, Description = "Schema name")]
public const string DbSchema = "db.schema";

public static IReadOnlyCollection<string> GetDatabricksExportTags()
{
return new HashSet<string>
{
StatementId,
SessionId,
ResultFormat,
ResultChunkCount,
ResultBytesDownloaded,
ResultCompressionEnabled,
ResultRowCount,
PollCount,
PollLatencyMs,
DbOperation
};
}
}
}
Loading
Loading