diff --git a/csharp/CloudFetch-Memory-Analysis.md b/csharp/CloudFetch-Memory-Analysis.md
new file mode 100644
index 0000000000..18e78852de
--- /dev/null
+++ b/csharp/CloudFetch-Memory-Analysis.md
@@ -0,0 +1,132 @@
+# CloudFetch Memory Buffer Size Analysis
+
+## 🚨 **Root Cause of Memory Hang**
+
+**Problem:** Memory acquisition hangs because the pipeline requests more memory than the buffer makes available.
+
+## 📊 **Current Configuration Analysis**
+
+### **Concurrent Download Settings:**
+- **Parallel Downloads:** 10 (increased from 3)
+- **Download Queue:** 20 files
+- **Result Queue:** 50 files
+- **Memory Buffer:** ~~200MB~~ → **500MB** (FIXED)
+
+### **File Size Analysis** (based on user logs):
+- **Typical File Size:** ~21MB per file
+- **File Format:** LZ4-compressed Arrow files
+- **Decompression Ratio:** ~2-3x expansion (21MB compressed → ~50MB decompressed)
+
+## 🧮 **Memory Requirement Calculations**
+
+### **Scenario 1: Compressed Files Only**
+```
+10 concurrent downloads × 21MB per file = 210MB minimum
++ 10% safety buffer = 231MB needed
+```
+**Result:** 200MB was insufficient → **DEADLOCK** ❌
+
+### **Scenario 2: Decompressed Files (LZ4)**
+```
+10 concurrent downloads × 50MB decompressed = 500MB minimum
++ 20% safety buffer = 600MB needed
+```
+
+### **Scenario 3: Mixed State (Realistic)**
+```
+- 5 files downloading (compressed): 5 × 21MB = 105MB
+- 5 files decompressed (buffered): 5 × 50MB = 250MB
+Total: 355MB + safety buffer = ~400MB needed
+```
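+
+The same arithmetic can be written down directly. The sketch below is illustrative only (it is not driver code); the file size, expansion ratio, and safety margin are the assumptions taken from the log analysis above:
+
+```csharp
+using System;
+
+// Minimal sketch of the sizing arithmetic above, assuming ~21MB compressed
+// files, ~2.5x LZ4 expansion, and a 20% safety margin (all hypothetical
+// parameters taken from this analysis, not constants from the driver).
+class BufferSizing
+{
+    static int RecommendedBufferMb(
+        int parallelDownloads = 10,
+        double compressedFileMb = 21,
+        double expansionRatio = 2.5,
+        double safetyMargin = 0.20)
+    {
+        // Worst case: every in-flight file is held in decompressed form.
+        double workingSetMb = parallelDownloads * compressedFileMb * expansionRatio;
+        return (int)Math.Ceiling(workingSetMb * (1 + safetyMargin));
+    }
+
+    static void Main() => Console.WriteLine(RecommendedBufferMb()); // prints 630
+}
+```
+
+A result of 630MB lands in the same range as the Scenario 2 and Scenario 3 estimates above.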
+## ⚡ **Optimized Configuration**
+
+### **New Memory Buffer Size: 500MB**
+**Reasoning:**
+- **Minimum Required:** 210MB (compressed-only) to 500MB (fully decompressed)
+- **Safety Buffer:** 20% for memory fragmentation and temporary objects
+- **Performance Buffer:** additional headroom for efficient pipeline flow
+- **Total:** 500MB provides a good balance between memory usage and performance
+
+### **Alternative Configurations:**
+
+| Scenario | Concurrent Downloads | File Size | Memory Needed | Recommended Buffer |
+|----------|---------------------|-----------|---------------|-------------------|
+| **Conservative** | 5 | 21MB | 105MB | 200MB |
+| **Balanced** | 10 | 21MB | 210MB | **500MB** ✅ |
+| **Aggressive** | 15 | 21MB | 315MB | 750MB |
+| **With LZ4** | 10 | 50MB (decompressed) | 500MB | 1000MB |
+
+## 🔧 **Implementation Changes**
+
+### **1. Fixed Default Memory Buffer:**
+```csharp
+// CloudFetchDownloadManager.cs
+private const int DefaultMemoryBufferSizeMB = 500; // Was 200MB
+```
+
+### **2. Added Memory Debugging:**
+```csharp
+// CloudFetchMemoryBufferManager.cs
+WriteMemoryDebug($"MEMORY-REQUEST: Requesting {size/1024/1024:F1}MB, Current: {UsedMemory/1024/1024:F1}MB / {_maxMemory/1024/1024:F1}MB");
+```
+
+### **3. Configuration Override Available:**
+```
+// Connection string parameter
+adbc.databricks.cloudfetch.memory_buffer_size_mb=500
+```
+
+## 🎯 **Expected Results After Fix**
+
+### **Before (200MB):**
+```
+[MEMORY-REQUEST] Requesting 21.0MB, Current: 189.0MB / 200.0MB
+[MEMORY-BLOCKED] Attempt #100 - Still waiting for 21.0MB - MEMORY PRESSURE!
+```
+
+### **After (500MB):**
+```
+[MEMORY-REQUEST] Requesting 21.0MB, Current: 210.0MB / 500.0MB
+[MEMORY-ACQUIRED] Successfully acquired 21.0MB after 0 attempts, New Total: 231.0MB / 500.0MB
+```
+
+## 🔍 **Monitoring & Validation**
+
+### **Memory Debug Log Location:**
+```
+%APPDATA%/adbc-memory-debug.log
+```
+
+### **Key Metrics to Monitor:**
+- **Acquisition Success Rate:** should be near 100%
+- **Memory Utilization:** should stay below 80% (400MB of 500MB)
+- **Acquisition Latency:** should be immediate (0 attempts)
+- **Release Pattern:** memory should be freed as files are processed
+
+## 🚀 **Performance Impact**
+
+### **Memory vs Performance Tradeoff:**
+- **200MB:** frequent blocking, poor concurrency
+- **500MB:** smooth pipeline flow, full 10-thread utilization
+- **1000MB:** minimal blocking, but a higher memory footprint
+
+### **Recommended Settings for Different Use Cases:**
+
+#### **PowerBI (Memory Constrained):**
+```
+adbc.databricks.cloudfetch.memory_buffer_size_mb=300
+adbc.databricks.cloudfetch.parallel_downloads=6
+```
+
+#### **Server Applications (High Performance):**
+```
+adbc.databricks.cloudfetch.memory_buffer_size_mb=750
+adbc.databricks.cloudfetch.parallel_downloads=12
+```
+
+#### **Development/Testing:**
+```
+adbc.databricks.cloudfetch.memory_buffer_size_mb=500  # Default
+adbc.databricks.cloudfetch.parallel_downloads=10      # Default
+```
diff --git a/csharp/PowerBI-Logging-Guide.md b/csharp/PowerBI-Logging-Guide.md
new file mode 100644
index 0000000000..1a89b15a23
--- /dev/null
+++ b/csharp/PowerBI-Logging-Guide.md
@@ -0,0 +1,101 @@
+# PowerBI ADBC Driver Logging Guide
+
+This guide shows how to enable comprehensive logging when using the ADBC Databricks driver with PowerBI.
+
+## ⚡ Quick Start
+
+**Logging is now automatically enabled** - no connection string changes needed!
+
+**Log location:** `%LOCALAPPDATA%\Apache.Arrow.Adbc\Traces\`
+**Contains:** HTTP requests, SQL execution, CloudFetch downloads, TCP connections, memory usage, errors
+
+## 🎯 OpenTelemetry Structured Tracing (Built-in)
+
+### Connection String Configuration
+**No additional configuration needed!** Logging is automatically active.
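+
+If you want to confirm that traces are actually being produced, the following snippet is a quick, hypothetical check (it is not part of the driver) that finds the newest trace file in the default Windows location documented below:
+
+```csharp
+using System;
+using System.IO;
+using System.Linq;
+
+// Locate the newest ADBC trace file, assuming the default Windows
+// trace directory; adjust the path for macOS/Linux as listed below.
+class TraceCheck
+{
+    static void Main()
+    {
+        string traceDir = Path.Combine(
+            Environment.GetFolderPath(Environment.SpecialFolder.LocalApplicationData),
+            "Apache.Arrow.Adbc", "Traces");
+
+        FileInfo? newest = Directory.Exists(traceDir)
+            ? new DirectoryInfo(traceDir).GetFiles("*.log")
+                .OrderByDescending(f => f.LastWriteTimeUtc)
+                .FirstOrDefault()
+            : null;
+
+        Console.WriteLine(newest is null
+            ? $"No trace files found in {traceDir}"
+            : $"Newest trace file: {newest.FullName}");
+    }
+}
+```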
+Your standard PowerBI connection string:
+```
+Server=your-server.com;
+Port=443;
+UID=token;
+PWD=your-access-token
+```
+
+### Log File Location
+Logs are automatically written to:
+- **Windows**: `%LOCALAPPDATA%\Apache.Arrow.Adbc\Traces\`
+- **macOS**: `$HOME/Library/Application Support/Apache.Arrow.Adbc/Traces/`
+- **Linux**: `$HOME/.local/share/Apache.Arrow.Adbc/Traces/`
+
+### Log File Format
+Files are named: `apache.arrow.adbc.drivers.databricks-YYYY-MM-DD-HH-mm-ss-fff-processid.log`
+
+Example: `apache.arrow.adbc.drivers.databricks-2025-08-20-14-30-15-123-12345.log`
+
+### Example PowerBI Connection Strings
+
+#### Standard Connection:
+```
+Server=your-server.com;
+Port=443;
+UID=token;
+PWD=your-token
+```
+
+#### With CloudFetch Performance Tuning:
+```
+Server=your-server.com;
+Port=443;
+UID=token;
+PWD=your-token;
+adbc.databricks.use_cloud_fetch=true;
+adbc.databricks.cloudfetch.parallel_downloads=10;
+adbc.databricks.cloudfetch.memory_buffer_size_mb=700
+```
+
+## 📄 Log Content
+
+### OpenTelemetry Traces Include:
+- **HTTP Requests**: request/response details, headers, timing
+- **SQL Execution**: query planning, execution traces, result processing
+- **Connection Lifecycle**: connection establishment, authentication, teardown
+- **CloudFetch Operations**: download queue management, TCP connection counts, memory buffer usage
+- **Performance Metrics**: download timing, thread assignments, ServicePoint connection limits
+- **Error Analysis**: stack traces, exception details, failure context
+
+## 🔍 Troubleshooting
+
+### If logs aren't appearing:
+1. **Check permissions**: Ensure PowerBI can write to `%LOCALAPPDATA%\Apache.Arrow.Adbc\Traces\` (a quick write test is sketched below)
+2. **Check Windows Event Log**: Look for ADBC-related errors
+3. **Verify directory exists**: Create the Traces directory manually if needed
+4. **Check PowerBI version**: Ensure you're using a compatible ADBC driver version
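+
+To reproduce the driver's own startup write test by hand (see `TestDirectoryPermissions` in `DatabricksConnection.cs` further down in this diff), a few lines of C# are enough; the path is the documented default and the probe file name is arbitrary:
+
+```csharp
+using System;
+using System.IO;
+
+// Mirror of the driver's directory write test: create the Traces
+// directory if missing, then write and delete a small probe file.
+string traceDir = Path.Combine(
+    Environment.GetFolderPath(Environment.SpecialFolder.LocalApplicationData),
+    "Apache.Arrow.Adbc", "Traces");
+
+Directory.CreateDirectory(traceDir);               // no-op if it already exists
+string probe = Path.Combine(traceDir, "adbc-test-write.tmp");
+File.WriteAllText(probe, "test");                  // throws if the directory is not writable
+File.Delete(probe);
+Console.WriteLine($"Directory permissions OK: {traceDir}");
+```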
+
+### Custom log file location:
+You can customize the log location by setting environment variables:
+```
+ADBC_TRACE_LOCATION=C:\your\custom\path\
+```
+
+## 📊 Log Analysis for CloudFetch Issues
+
+Look for these trace patterns:
+
+### Connection Degradation:
+```json
+{"message": "ServicePoint ConnectionLimit OK: 10 (needed 10)", "level": "Information"}
+{"message": "SLOW ADD File took 8000ms (QUEUE FULL)", "level": "Warning"}
+```
+
+### Memory Pressure:
+```json
+{"message": "File acquired 680MB memory", "level": "Information"}
+{"message": "Memory usage: 680MB / 700MB (97% used)", "level": "Warning"}
+```
+
+### Performance Analysis:
+```json
+{"message": "Thread 15 downloading at 14:30:15.123", "level": "Information"}
+{"message": "Queue count: ~45/50 (near capacity)", "level": "Information"}
+```
diff --git a/csharp/TestCloudFetchTracing.cs b/csharp/TestCloudFetchTracing.cs
new file mode 100644
index 0000000000..41eecde534
--- /dev/null
+++ b/csharp/TestCloudFetchTracing.cs
@@ -0,0 +1,121 @@
+/*
+ * Test program to verify CloudFetchDownloader and CloudFetchReader tracing
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Collections.Concurrent;
+using System.Diagnostics;
+using System.Net.Http;
+using System.Threading;
+using Apache.Arrow.Adbc.Drivers.Databricks;
+using Apache.Arrow.Adbc.Drivers.Databricks.Reader.CloudFetch;
+using Apache.Arrow.Adbc.Tracing;
+using Moq;
+
+namespace TestCloudFetchTracing
+{
+    class Program
+    {
+        static void Main(string[] args)
+        {
+            Console.WriteLine("=== CloudFetch Tracing Verification ===");
+
+            // Check environment
+            var tracesExporter = Environment.GetEnvironmentVariable("OTEL_TRACES_EXPORTER");
+            Console.WriteLine($"OTEL_TRACES_EXPORTER = '{tracesExporter ?? "(not set)"}'");
"(not set)"}'"); + + if (tracesExporter != "adbcfile") + { + Console.WriteLine("โš ๏ธ WARNING: Expected OTEL_TRACES_EXPORTER=adbcfile"); + Console.WriteLine(" Please set: export OTEL_TRACES_EXPORTER=adbcfile"); + Console.WriteLine(" Or in PowerShell: $env:OTEL_TRACES_EXPORTER='adbcfile'"); + Console.WriteLine(); + } + + try + { + Console.WriteLine("=== Testing DatabricksConnection Auto-Tracing ==="); + + // Create DatabricksConnection - this triggers DatabricksConnection's static constructor + var connectionProperties = new Dictionary + { + ["adbc.databricks.hostname"] = "test.databricks.com", + ["adbc.databricks.path"] = "/sql/1.0/endpoints/test", + ["adbc.databricks.token"] = "fake-token-for-testing" + }; + + Console.WriteLine("Creating DatabricksConnection to trigger auto-initialization..."); + + var connection = new DatabricksConnection(connectionProperties); + Console.WriteLine($"โœ… DatabricksConnection created with assembly: {connection.AssemblyName}"); + Console.WriteLine("โœ… DatabricksConnection static constructor should have auto-initialized TracerProvider"); + + Console.WriteLine("\n=== Testing CloudFetch Components Use Connection Tracing ==="); + + // Create CloudFetchDownloader - should work with global TracerProvider + var downloadQueue = new BlockingCollection(10); + var resultQueue = new BlockingCollection(10); + var mockMemoryManager = new Mock(); + var httpClient = new HttpClient(); + var mockResultFetcher = new Mock(); + + var downloader = new CloudFetchDownloader( + downloadQueue, + resultQueue, + mockMemoryManager.Object, + httpClient, + mockResultFetcher.Object, + 3, // maxParallelDownloads + false // isLz4Compressed + ); + + // Verify downloader forces TracerProvider initialization and uses matching assembly name + Console.WriteLine($"CloudFetchDownloader.AssemblyName: '{downloader.AssemblyName}'"); + Console.WriteLine($"CloudFetchDownloader.Trace.ActivitySourceName: '{downloader.Trace.ActivitySourceName}'"); + Console.WriteLine("โœ… CloudFetchDownloader forces DatabricksConnection static constructor to run first"); + Console.WriteLine("โœ… TracerProvider initialized before ActivitySource creation = activities captured!"); + + Console.WriteLine("\n=== Testing Activity Creation ==="); + + // Test creating an activity - this should be captured by the TracerProvider + downloader.TraceActivity(activity => + { + if (activity != null) + { + activity.SetTag("test.cloudfetch.downloader", "activity_test"); + activity.AddEvent("Test event for CloudFetchDownloader"); + Console.WriteLine($"โœ… Activity created: {activity.DisplayName}"); + Console.WriteLine($" Activity ID: {activity.Id}"); + Console.WriteLine($" Activity Source: {activity.Source.Name}"); + } + else + { + Console.WriteLine("โŒ Activity is null - TracerProvider may not be listening"); + } + }, "TestActivity"); + + // Clean up + downloader.Dispose(); + httpClient.Dispose(); + connection.Dispose(); + + Console.WriteLine("\n=== Check Output Files ==="); + Console.WriteLine("If the DatabricksConnection auto-tracing is working correctly, you should see:"); + Console.WriteLine("1. Activity data in trace files (look for .log files in ~/Library/Application Support/Apache.Arrow.Adbc/Traces/)"); + Console.WriteLine("2. The test activity with tag 'test.cloudfetch.downloader'"); + Console.WriteLine("3. All Databricks activities (DatabricksConnection, CloudFetchDownloader, CloudFetchReader)"); + Console.WriteLine("4. Future CloudFetch activities from real usage automatically captured"); + Console.WriteLine("5. 
+
+                // Clean up
+                downloader.Dispose();
+                httpClient.Dispose();
+                connection.Dispose();
+
+                Console.WriteLine("\n=== Check Output Files ===");
+                Console.WriteLine("If the DatabricksConnection auto-tracing is working correctly, you should see:");
+                Console.WriteLine("1. Activity data in trace files (look for .log files in ~/Library/Application Support/Apache.Arrow.Adbc/Traces/)");
+                Console.WriteLine("2. The test activity with tag 'test.cloudfetch.downloader'");
+                Console.WriteLine("3. All Databricks activities (DatabricksConnection, CloudFetchDownloader, CloudFetchReader)");
+                Console.WriteLine("4. Future CloudFetch activities from real usage automatically captured");
+                Console.WriteLine("5. Silent operation - ExportersBuilder handles everything in the background");
+
+            }
+            catch (Exception ex)
+            {
+                Console.WriteLine($"❌ Error during tracing test: {ex.Message}");
+                Console.WriteLine($"Stack trace: {ex.StackTrace}");
+            }
+
+            Console.WriteLine("\n=== Test Complete ===");
+        }
+    }
+}
diff --git a/csharp/TestCloudFetchTracing.csproj b/csharp/TestCloudFetchTracing.csproj
new file mode 100644
index 0000000000..e49fdd9cd7
--- /dev/null
+++ b/csharp/TestCloudFetchTracing.csproj
@@ -0,0 +1,17 @@
+<Project Sdk="Microsoft.NET.Sdk">
+
+  <PropertyGroup>
+    <OutputType>Exe</OutputType>
+    <TargetFramework>net8.0</TargetFramework>
+    <Nullable>enable</Nullable>
+  </PropertyGroup>
+
+</Project>
diff --git a/csharp/src/Drivers/Databricks/Apache.Arrow.Adbc.Drivers.Databricks.csproj b/csharp/src/Drivers/Databricks/Apache.Arrow.Adbc.Drivers.Databricks.csproj
index 626957491f..f7d6d22cd4 100644
--- a/csharp/src/Drivers/Databricks/Apache.Arrow.Adbc.Drivers.Databricks.csproj
+++ b/csharp/src/Drivers/Databricks/Apache.Arrow.Adbc.Drivers.Databricks.csproj
@@ -16,5 +16,6 @@
+
diff --git a/csharp/src/Drivers/Databricks/DatabricksConnection.cs b/csharp/src/Drivers/Databricks/DatabricksConnection.cs
index 2739197e8f..035e256913 100644
--- a/csharp/src/Drivers/Databricks/DatabricksConnection.cs
+++ b/csharp/src/Drivers/Databricks/DatabricksConnection.cs
@@ -19,8 +19,12 @@
 using System.Collections.Generic;
 using System.Diagnostics;
 using System.Linq;
+using System.Net;
 using System.Net.Http;
 using System.Net.Http.Headers;
+#if NET5_0_OR_GREATER
+using System.Net.Sockets;
+#endif
 using System.Threading;
 using System.Threading.Tasks;
 using Apache.Arrow.Adbc.Drivers.Apache;
@@ -29,8 +33,10 @@
 using Apache.Arrow.Adbc.Drivers.Apache.Spark;
 using Apache.Arrow.Adbc.Drivers.Databricks.Auth;
 using Apache.Arrow.Adbc.Drivers.Databricks.Reader;
+using Apache.Arrow.Adbc.Telemetry.Traces.Exporters;
 using Apache.Arrow.Ipc;
 using Apache.Hive.Service.Rpc.Thrift;
+using OpenTelemetry.Trace;
 using Thrift.Protocol;

 namespace Apache.Arrow.Adbc.Drivers.Databricks
@@ -40,6 +46,126 @@ internal class DatabricksConnection : SparkHttpConnection
         internal static new readonly string s_assemblyName = ApacheUtility.GetAssemblyName(typeof(DatabricksConnection));
         internal static new readonly string s_assemblyVersion = ApacheUtility.GetAssemblyVersion(typeof(DatabricksConnection));

+        // SHARED ActivitySource for all CloudFetch components to use the same instance as TracerProvider
+        internal static readonly ActivitySource s_sharedActivitySource = new ActivitySource(s_assemblyName, s_assemblyVersion);
+
+        private static TracerProvider? s_tracerProvider;
+
+        static DatabricksConnection()
+        {
+            // Auto-initialize TracerProvider for Databricks using ExportersBuilder
+            try
+            {
+                var builder = ExportersBuilder.Build(s_assemblyName, s_assemblyVersion, addDefaultExporters: true).Build();
+                // TEMPORARILY HARDCODE adbcfile for debugging
+                s_tracerProvider = builder.Activate("adbcfile", out string?
exporterName); + + // Write debug info to file for PowerBI scenarios + var debugMessage = $"[{DateTime.UtcNow:yyyy-MM-dd HH:mm:ss}] DATABRICKS-TRACING: TracerProvider initialized: {s_tracerProvider != null}, Exporter: {exporterName}"; + var assemblyMessage = $"[{DateTime.UtcNow:yyyy-MM-dd HH:mm:ss}] DATABRICKS-TRACING: TracerProvider listening for AssemblyName: '{s_assemblyName}', Version: '{s_assemblyVersion}'"; + + // Show user where to look for trace files + var traceLocation = System.IO.Path.Combine( + Environment.GetFolderPath(Environment.SpecialFolder.LocalApplicationData), + "Apache.Arrow.Adbc", + "Traces" + ); + var tracePattern = $"{s_assemblyName}-trace-*.log"; + var locationMessage = $"[{DateTime.UtcNow:yyyy-MM-dd HH:mm:ss}] DATABRICKS-TRACING: OpenTelemetry trace files should be at: '{traceLocation}' with pattern '{tracePattern}'"; + + // Test directory permissions + var permissionMessage = TestDirectoryPermissions(traceLocation); + + WriteDebugToFile(debugMessage); + WriteDebugToFile(assemblyMessage); + WriteDebugToFile(locationMessage); + WriteDebugToFile(permissionMessage); + + // Test TracerProvider with simple activity right after initialization + TestTracerProviderExport(); + } + catch (Exception ex) + { + var errorMessage = $"[{DateTime.UtcNow:yyyy-MM-dd HH:mm:ss}] DATABRICKS-TRACING: Failed to initialize TracerProvider: {ex.Message}\nStack: {ex.StackTrace}"; + WriteDebugToFile(errorMessage); + // Don't throw - tracing is optional + } + } + + private static void WriteDebugToFile(string message) + { + try + { + var debugFile = System.IO.Path.Combine( + Environment.GetFolderPath(Environment.SpecialFolder.LocalApplicationData), + "adbc-databricks-debug.log" + ); + System.IO.File.AppendAllText(debugFile, message + Environment.NewLine); + } + catch + { + // Ignore file write errors + } + } + + private static string TestDirectoryPermissions(string traceLocation) + { + try + { + var timestamp = DateTime.UtcNow.ToString("yyyy-MM-dd HH:mm:ss"); + + // Test directory creation + if (!System.IO.Directory.Exists(traceLocation)) + { + System.IO.Directory.CreateDirectory(traceLocation); + return $"[{timestamp}] DATABRICKS-TRACING: Created trace directory: '{traceLocation}'"; + } + + // Test file write permissions + var testFile = System.IO.Path.Combine(traceLocation, "adbc-test-write.tmp"); + System.IO.File.WriteAllText(testFile, "test"); + System.IO.File.Delete(testFile); + + return $"[{timestamp}] DATABRICKS-TRACING: Directory permissions OK: '{traceLocation}'"; + } + catch (Exception ex) + { + return $"[{DateTime.UtcNow:yyyy-MM-dd HH:mm:ss}] DATABRICKS-TRACING: Directory permission ERROR: '{traceLocation}' - {ex.Message}"; + } + } + + private static void TestTracerProviderExport() + { + try + { + var timestamp = DateTime.UtcNow.ToString("yyyy-MM-dd HH:mm:ss"); + WriteDebugToFile($"[{timestamp}] DATABRICKS-TRACING: Testing TracerProvider export with simple activity..."); + + // Use the shared ActivitySource that TracerProvider is listening for + using var activity = s_sharedActivitySource.StartActivity("DatabricksConnection-Test"); + + if (activity != null) + { + WriteDebugToFile($"[{timestamp}] DATABRICKS-TRACING: Test activity created: ID='{activity.Id}', Source='{activity.Source?.Name}'"); + activity.SetTag("test.source", "DatabricksConnection"); + activity.SetTag("test.timestamp", timestamp); + //activity.AddEvent("DatabricksConnection TracerProvider test event"); + activity.SetStatus(System.Diagnostics.ActivityStatusCode.Ok); + WriteDebugToFile($"[{timestamp}] 
DATABRICKS-TRACING: Test activity completed with status: {activity.Status}"); + } + else + { + WriteDebugToFile($"[{timestamp}] DATABRICKS-TRACING: Test activity was NULL - TracerProvider not listening!"); + } + + WriteDebugToFile($"[{timestamp}] DATABRICKS-TRACING: TracerProvider test completed - should be exported within 5 seconds"); + } + catch (Exception ex) + { + WriteDebugToFile($"[{DateTime.UtcNow:yyyy-MM-dd HH:mm:ss}] DATABRICKS-TRACING: TracerProvider test FAILED: {ex.Message}"); + } + } + private bool _applySSPWithQueries = false; private bool _enableDirectResults = true; private bool _enableMultipleCatalogSupport = true; @@ -445,10 +571,50 @@ internal override IArrowArrayStream NewReader(T statement, Schema schema, IRe isLz4Compressed = metadataResp.Lz4Compressed; } - HttpClient httpClient = new HttpClient(HiveServer2TlsImpl.NewHttpClientHandler(TlsOptions, _proxyConfigurator)); + // Create HttpClient with optimized connection limits for CloudFetch concurrent downloads + HttpClientHandler handler = HiveServer2TlsImpl.NewHttpClientHandler(TlsOptions, _proxyConfigurator); + ConfigureHttpClientForConcurrency(handler, 15); // Allow 15+ connections to handle 10 concurrent downloads with headroom + HttpClient httpClient = new HttpClient(handler); return new DatabricksCompositeReader(databricksStatement, schema, response, isLz4Compressed, httpClient); } + /// + /// Configures HttpClient handler for high concurrency CloudFetch downloads + /// + /// The HttpClientHandler to configure + /// Minimum number of concurrent connections to support + private static void ConfigureHttpClientForConcurrency(HttpClientHandler handler, int minConnections) + { + + +#if NETFRAMEWORK + // For .NET Framework, configure ServicePointManager and per-host ServicePoint limits + try + { + // Set global default connection limit + int currentDefault = System.Net.ServicePointManager.DefaultConnectionLimit; + if (currentDefault < minConnections) + { + System.Net.ServicePointManager.DefaultConnectionLimit = minConnections; + WriteDebugToFile($"[{DateTime.UtcNow:yyyy-MM-dd HH:mm:ss}] HTTP-CLIENT: ServicePointManager.DefaultConnectionLimit updated from {currentDefault} to {minConnections}"); + } + else + { + WriteDebugToFile($"[{DateTime.UtcNow:yyyy-MM-dd HH:mm:ss}] HTTP-CLIENT: ServicePointManager.DefaultConnectionLimit already sufficient: {currentDefault}"); + } + } + catch (Exception ex) + { + WriteDebugToFile($"[{DateTime.UtcNow:yyyy-MM-dd HH:mm:ss}] HTTP-CLIENT: Failed to set ServicePointManager.DefaultConnectionLimit: {ex.Message}"); + } +#endif + +#if !NET5_0_OR_GREATER || NETFRAMEWORK + // For .NET Standard 2.0 / .NET Framework, also try reflection for per-server limits + WriteDebugToFile($"[{DateTime.UtcNow:yyyy-MM-dd HH:mm:ss}] HTTP-CLIENT: CloudFetch HttpClient configured for {minConnections}+ concurrent connections"); +#endif + } + internal override SchemaParser SchemaParser => new DatabricksSchemaParser(); public override AdbcStatement CreateStatement() @@ -721,6 +887,8 @@ private string GetHost() return CatalogName; } + + protected override void Dispose(bool disposing) { if (disposing) diff --git a/csharp/src/Drivers/Databricks/DatabricksParameters.cs b/csharp/src/Drivers/Databricks/DatabricksParameters.cs index abb7cfd316..5d4ea474d1 100644 --- a/csharp/src/Drivers/Databricks/DatabricksParameters.cs +++ b/csharp/src/Drivers/Databricks/DatabricksParameters.cs @@ -109,7 +109,7 @@ public class DatabricksParameters : SparkParameters /// /// Maximum number of parallel downloads for CloudFetch operations. 
-        /// Default value is 3 if not specified.
+        /// Default value is 5 if not specified.
         /// </summary>
         public const string CloudFetchParallelDownloads = "adbc.databricks.cloudfetch.parallel_downloads";

@@ -121,10 +121,18 @@ public class DatabricksParameters : SparkParameters
         /// <summary>
         /// Maximum memory buffer size in MB for CloudFetch prefetched files.
-        /// Default value is 200MB if not specified.
+        /// Default value is 500MB if not specified.
         /// </summary>
         public const string CloudFetchMemoryBufferSize = "adbc.databricks.cloudfetch.memory_buffer_size_mb";

+        /// <summary>
+        /// CloudFetch result queue size - number of downloaded files that can be queued while waiting for batch aggregation.
+        /// Default value is 50 if not specified.
+        /// Increase if downstream processing is slow (e.g., PowerBI), decrease to save memory.
+        /// Queue items are removed immediately when consumed for batch building, not held until full batch completion.
+        /// </summary>
+        public const string CloudFetchResultQueueSize = "adbc.databricks.cloudfetch.result_queue_size";
+
         /// <summary>
         /// Whether CloudFetch prefetch functionality is enabled.
         /// Default value is true if not specified.
diff --git a/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchDownloadManager.cs b/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchDownloadManager.cs
index a7a98648f7..74ef43c562 100644
--- a/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchDownloadManager.cs
+++ b/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchDownloadManager.cs
@@ -30,16 +30,20 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks.Reader.CloudFetch
     /// </summary>
     internal sealed class CloudFetchDownloadManager : ICloudFetchDownloadManager
     {
-        // Default values
-        private const int DefaultParallelDownloads = 3;
+        // Default values - optimized for high performance
+        private const int DefaultParallelDownloads = 5; // Increased from the previous default of 3 for better concurrency
         private const int DefaultPrefetchCount = 2;
-        private const int DefaultMemoryBufferSizeMB = 200;
+        private const int DefaultMemoryBufferSizeMB = 500; // Increased from 200MB to 500MB to handle concurrent downloads + LZ4 decompression
         private const bool DefaultPrefetchEnabled = true;
         private const int DefaultFetchBatchSize = 2000000;
         private const int DefaultTimeoutMinutes = 5;
         private const int DefaultMaxUrlRefreshAttempts = 3;
         private const int DefaultUrlExpirationBufferSeconds = 60;

+        // Queue size constants for optimal performance
+        private const int DefaultDownloadQueueSize = 10; // Fixed size for download queue
+        private const int DefaultResultQueueSize = 50;   // Conservative default for batch aggregation (configurable via the CloudFetchResultQueueSize parameter)
+
         private readonly IHiveServer2Statement _statement;
         private readonly Schema _schema;
         private readonly bool _isLz4Compressed;
@@ -116,6 +120,20 @@ public CloudFetchDownloadManager(
             }
         }

+            // Parse result queue size
+            int resultQueueSize = DefaultResultQueueSize;
+            if (connectionProps.TryGetValue(DatabricksParameters.CloudFetchResultQueueSize, out string? resultQueueSizeStr))
+            {
+                if (int.TryParse(resultQueueSizeStr, out int parsedResultQueueSize) && parsedResultQueueSize > 0)
+                {
+                    resultQueueSize = parsedResultQueueSize;
+                }
+                else
+                {
+                    throw new ArgumentException($"Invalid value for {DatabricksParameters.CloudFetchResultQueueSize}: {resultQueueSizeStr}. Expected a positive integer.");
+                }
+            }
+
             // Parse max retries
             int maxRetries = 3;
             if (connectionProps.TryGetValue(DatabricksParameters.CloudFetchMaxRetries, out string?
maxRetriesStr)) @@ -189,9 +207,11 @@ public CloudFetchDownloadManager( // Initialize the memory manager _memoryManager = new CloudFetchMemoryBufferManager(memoryBufferSizeMB); - // Initialize the queues with bounded capacity - _downloadQueue = new BlockingCollection(new ConcurrentQueue(), prefetchCount * 2); - _resultQueue = new BlockingCollection(new ConcurrentQueue(), prefetchCount * 2); + // Initialize the queues with optimized fixed capacity for high throughput + _downloadQueue = new BlockingCollection(new ConcurrentQueue(), DefaultDownloadQueueSize); + _resultQueue = new BlockingCollection(new ConcurrentQueue(), resultQueueSize); + + System.Diagnostics.Debug.WriteLine($"CloudFetchDownloadManager: Initialized with download_queue={DefaultDownloadQueueSize}, result_queue={resultQueueSize}, memory_buffer={memoryBufferSizeMB}MB, parallel_downloads={parallelDownloads}"); _httpClient = httpClient; _httpClient.Timeout = TimeSpan.FromMinutes(timeoutMinutes); @@ -245,8 +265,8 @@ internal CloudFetchDownloadManager( // Create empty collections for the test _memoryManager = new CloudFetchMemoryBufferManager(DefaultMemoryBufferSizeMB); - _downloadQueue = new BlockingCollection(new ConcurrentQueue(), 10); - _resultQueue = new BlockingCollection(new ConcurrentQueue(), 10); + _downloadQueue = new BlockingCollection(new ConcurrentQueue(), DefaultDownloadQueueSize); + _resultQueue = new BlockingCollection(new ConcurrentQueue(), DefaultResultQueueSize); _httpClient = new HttpClient(); } @@ -283,16 +303,24 @@ public async Task StartAsync() throw new InvalidOperationException("Download manager is already started."); } + var startTimestamp = DateTime.UtcNow.ToString("yyyy-MM-dd HH:mm:ss.fff"); + System.Diagnostics.Debug.WriteLine($"๐Ÿš€ [{startTimestamp}] CloudFetchDownloadManager: StartAsync called - About to start producer pipeline"); + System.Diagnostics.Debug.WriteLine($"๐Ÿ“Š [{startTimestamp}] CloudFetchDownloadManager: Pipeline startup - download_queue={_downloadQueue.Count}, result_queue={_resultQueue.Count}"); + // Create a new cancellation token source _cancellationTokenSource = new CancellationTokenSource(); // Start the result fetcher + System.Diagnostics.Debug.WriteLine($"๐Ÿ”„ [{startTimestamp}] CloudFetchDownloadManager: Starting result fetcher..."); await _resultFetcher.StartAsync(_cancellationTokenSource.Token).ConfigureAwait(false); // Start the downloader + System.Diagnostics.Debug.WriteLine($"๐Ÿ”„ [{startTimestamp}] CloudFetchDownloadManager: Starting downloader..."); await _downloader.StartAsync(_cancellationTokenSource.Token).ConfigureAwait(false); _isStarted = true; + var completeTimestamp = DateTime.UtcNow.ToString("yyyy-MM-dd HH:mm:ss.fff"); + System.Diagnostics.Debug.WriteLine($"โœ… [{completeTimestamp}] CloudFetchDownloadManager: StartAsync completed - Producer pipeline is now running and will populate result queue"); } /// diff --git a/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchDownloader.cs b/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchDownloader.cs index ba65cf3cf7..1cdc2997f3 100644 --- a/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchDownloader.cs +++ b/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchDownloader.cs @@ -22,6 +22,8 @@ using System.Net.Http; using System.Threading; using System.Threading.Tasks; +using Apache.Arrow.Adbc.Drivers.Apache; +using Apache.Arrow.Adbc.Drivers.Databricks; using K4os.Compression.LZ4.Streams; namespace Apache.Arrow.Adbc.Drivers.Databricks.Reader.CloudFetch @@ -49,6 +51,33 @@ internal sealed 
class CloudFetchDownloader : ICloudFetchDownloader private Exception? _error; private readonly object _errorLock = new object(); + // Instance tracking for debugging + private static int s_instanceCounter = 0; + private readonly int _instanceId; + + #region Debug Helpers + + private void WriteCloudFetchDebug(string message) + { + try + { + + var debugFile = System.IO.Path.Combine( + Environment.GetFolderPath(Environment.SpecialFolder.LocalApplicationData), + "adbc-cloudfetch-debug.log" + ); + var timestamped = $"[{DateTime.UtcNow:yyyy-MM-dd HH:mm:ss}] [INSTANCE-{_instanceId}] {message}"; + System.IO.File.AppendAllText(debugFile, timestamped + Environment.NewLine); + + } + catch + { + // Ignore file write errors + } + } + + #endregion + /// /// Initializes a new instance of the class. /// @@ -89,6 +118,12 @@ public CloudFetchDownloader( _urlExpirationBufferSeconds = urlExpirationBufferSeconds > 0 ? urlExpirationBufferSeconds : throw new ArgumentOutOfRangeException(nameof(urlExpirationBufferSeconds)); _downloadSemaphore = new SemaphoreSlim(_maxParallelDownloads, _maxParallelDownloads); _isCompleted = false; + + // Initialize instance tracking + _instanceId = System.Threading.Interlocked.Increment(ref s_instanceCounter); + + WriteCloudFetchDebug($"CloudFetchDownloader constructor: Instance #{_instanceId} initialized with {_maxParallelDownloads} parallel downloads"); + WriteCloudFetchDebug($"CloudFetchDownloader constructor: Instance #{_instanceId} - Semaphore created with initial={_maxParallelDownloads}, max={_maxParallelDownloads}, current_available={_downloadSemaphore.CurrentCount}"); } /// @@ -148,6 +183,32 @@ public async Task StopAsync() /// public async Task GetNextDownloadedFileAsync(CancellationToken cancellationToken) { + var queueCountBefore = _resultQueue.Count; + var takeTimestamp = DateTime.UtcNow.ToString("yyyy-MM-dd HH:mm:ss.fff"); + WriteCloudFetchDebug($"๐Ÿ”„ [{takeTimestamp}] [INSTANCE-{_instanceId}] CloudFetchDownloader.GetNextDownloadedFileAsync called - queue_count_before_take={queueCountBefore}, queue_capacity={_resultQueue.BoundedCapacity}"); + + // CRITICAL DEBUGGING: Check producer status when consumer is waiting + WriteCloudFetchDebug($"๐Ÿ•ต๏ธ [INSTANCE-{_instanceId}] PRODUCER DEBUG - download_queue_count={_downloadQueue.Count}, download_queue_completed={_downloadQueue.IsCompleted}"); + WriteCloudFetchDebug($"๐Ÿ•ต๏ธ [INSTANCE-{_instanceId}] PRODUCER DEBUG - result_queue_count={_resultQueue.Count}, result_queue_completed={_resultQueue.IsCompleted}"); + WriteCloudFetchDebug($"๐Ÿ•ต๏ธ [INSTANCE-{_instanceId}] PRODUCER DEBUG - downloader_completed={_isCompleted}, has_error={HasError}"); + if (HasError && _error != null) + { + WriteCloudFetchDebug($"๐Ÿ•ต๏ธ [INSTANCE-{_instanceId}] PRODUCER DEBUG - error_message={_error.Message}"); + } + + // LIFECYCLE ANALYSIS: Diagnose completion state + if (_isCompleted && _resultQueue.Count == 0) + { + if (_resultQueue.IsCompleted) + { + WriteCloudFetchDebug($"โœ… [INSTANCE-{_instanceId}] LIFECYCLE: Downloader completed AND result queue completed - future calls should return null immediately"); + } + else + { + WriteCloudFetchDebug($"๐Ÿšจ [INSTANCE-{_instanceId}] LIFECYCLE BUG: Downloader completed BUT result queue NOT completed - this causes hangs!"); + } + } + try { // Check if there's an error before trying to take from the queue @@ -156,37 +217,74 @@ public async Task StopAsync() throw new AdbcException("Error in download process", _error ?? 
new Exception("Unknown error")); } - // Try to take the next result from the queue + var takeStartTimestamp = DateTime.UtcNow.ToString("yyyy-MM-dd HH:mm:ss.fff"); + WriteCloudFetchDebug($"๐Ÿ“ค [{takeStartTimestamp}] [INSTANCE-{_instanceId}] CloudFetchDownloader: About to TAKE item from result queue - queue_count={_resultQueue.Count}, queue_completed={_resultQueue.IsCompleted}"); + + if (_resultQueue.Count == 0) + { + WriteCloudFetchDebug($"โณ [{takeStartTimestamp}] [INSTANCE-{_instanceId}] *** CONSUMER BLOCKING: Queue is empty - Take() will wait until producer adds items ***"); + WriteCloudFetchDebug($"๐Ÿ’ญ [INSTANCE-{_instanceId}] CONSUMER BLOCKING ANALYSIS:"); + WriteCloudFetchDebug($" ๐Ÿ“ฅ Download Queue: count={_downloadQueue.Count}, completed={_downloadQueue.IsCompleted}"); + WriteCloudFetchDebug($" ๐Ÿ“ค Result Queue: count={_resultQueue.Count}, completed={_resultQueue.IsCompleted}"); + WriteCloudFetchDebug($" ๐Ÿ“Š Downloader: completed={_isCompleted}, has_error={HasError}"); + WriteCloudFetchDebug($" ๐Ÿ”ง Semaphore: available={_downloadSemaphore.CurrentCount}, max={_maxParallelDownloads}"); + + if (_downloadQueue.IsCompleted && _resultQueue.Count == 0 && !_resultQueue.IsCompleted) + { + WriteCloudFetchDebug($"๐Ÿšจ [INSTANCE-{_instanceId}] POTENTIAL DEADLOCK: Download queue completed but result queue not completed and empty!"); + WriteCloudFetchDebug($"๐Ÿšจ [INSTANCE-{_instanceId}] This suggests EndOfResultsGuard was never added to result queue!"); + } + } + + // Try to take the next result from the queue - THIS IS WHERE ITEMS ARE REMOVED FROM QUEUE IDownloadResult result = await Task.Run(() => _resultQueue.Take(cancellationToken), cancellationToken); + var queueCountAfter = _resultQueue.Count; + var takeCompleteTimestamp = DateTime.UtcNow.ToString("yyyy-MM-dd HH:mm:ss.fff"); + WriteCloudFetchDebug($"โœ… [{takeCompleteTimestamp}] [INSTANCE-{_instanceId}] CloudFetchDownloader: ITEM SUCCESSFULLY TAKEN from result queue - queue_before={queueCountBefore}, queue_after={queueCountAfter}, items_removed={(queueCountBefore - queueCountAfter)}, size={result.Size}"); + + if (queueCountBefore == 0) + { + WriteCloudFetchDebug($"๐ŸŽฏ [{takeCompleteTimestamp}] [INSTANCE-{_instanceId}] *** CONSUMER UNBLOCKED: Producer added item to empty queue - consumer can now proceed ***"); + } + // Check if this is the end of results guard if (result == EndOfResultsGuard.Instance) { _isCompleted = true; + WriteCloudFetchDebug("CloudFetchDownloader GetNext: โšช EndOfResultsGuard.Instance encountered - signaling end of results to CloudFetchReader"); + WriteCloudFetchDebug("CloudFetchDownloader GetNext: โšช CloudFetchReader will now return any accumulated batch buffer as final aggregated batch"); + + // CRITICAL FIX: Mark result queue as completed so subsequent calls return immediately + WriteCloudFetchDebug("CloudFetchDownloader GetNext: ๐Ÿ”’ Marking result queue as COMPLETED - future calls will return null immediately"); + _resultQueue.CompleteAdding(); + return null; } + WriteCloudFetchDebug($"๐ŸŽฏ CloudFetchDownloader: Returning item to CloudFetchReader for BATCH BUILDING - size={result.Size}, queue_remaining={_resultQueue.Count}"); + WriteCloudFetchDebug($"๐Ÿ“Š CloudFetchDownloader: *** PROOF: Item was REMOVED from result queue IMMEDIATELY when consumed for batch aggregation ***"); return result; } catch (OperationCanceledException) { - // Cancellation was requested + WriteCloudFetchDebug("CloudFetchDownloader GetNext: Operation cancelled"); return null; } catch (InvalidOperationException) when 
(_resultQueue.IsCompleted) { - // Queue is completed and empty _isCompleted = true; + WriteCloudFetchDebug("CloudFetchDownloader GetNext: โœ… Queue completed and empty - no more results available"); return null; } catch (AdbcException) { - // Re-throw AdbcExceptions (these are our own errors) + WriteCloudFetchDebug("CloudFetchDownloader GetNext: ADBC error occurred"); throw; } catch (Exception ex) { - // If there's an error, set the error state and propagate it + WriteCloudFetchDebug($"CloudFetchDownloader GetNext: Exception - {ex.Message}"); SetError(ex); throw; } @@ -196,6 +294,31 @@ private async Task DownloadFilesAsync(CancellationToken cancellationToken) { await Task.Yield(); + var startTimestamp = DateTime.UtcNow.ToString("yyyy-MM-dd HH:mm:ss.fff"); + WriteCloudFetchDebug($"๐Ÿš€ [{startTimestamp}] CloudFetchDownloader.DownloadFilesAsync STARTED"); + WriteCloudFetchDebug($"๐Ÿ“Š [{startTimestamp}] CloudFetchDownloader: PIPELINE ENTRY - download_queue={_downloadQueue.Count}, result_queue={_resultQueue.Count}, semaphore_available={_downloadSemaphore.CurrentCount}"); + WriteCloudFetchDebug($"๐Ÿ“Š [{startTimestamp}] CloudFetchDownloader: Instance #{_instanceId} - PIPELINE STATUS CHECK - max_parallel={_maxParallelDownloads}, memory_max={_memoryManager.MaxMemory / 1024 / 1024}MB, memory_used={_memoryManager.UsedMemory / 1024 / 1024}MB"); + WriteCloudFetchDebug($"๐ŸŽฏ [{startTimestamp}] CloudFetchDownloader: *** PRODUCER PIPELINE STARTING - Will add items to result queue for consumer to take ***"); + + try + { + WriteCloudFetchDebug($"CloudFetchDownloader: About to call DownloadFilesInternalAsync - download_queue={_downloadQueue.Count}"); + await DownloadFilesInternalAsync(cancellationToken); + WriteCloudFetchDebug($"CloudFetchDownloader: DownloadFilesInternalAsync completed - download_queue={_downloadQueue.Count}, result_queue={_resultQueue.Count}"); + + WriteCloudFetchDebug("CloudFetchDownloader Download: Files download completed successfully"); + WriteCloudFetchDebug($"CloudFetchDownloader: Batch completed - semaphore_available={_downloadSemaphore.CurrentCount}, max_parallel={_maxParallelDownloads}, download_queue={_downloadQueue.Count}, result_queue={_resultQueue.Count}"); + } + catch (Exception ex) + { + WriteCloudFetchDebug($"CloudFetchDownloader Download: Exception - {ex.Message}"); + throw; + } + } + + private async Task DownloadFilesInternalAsync(CancellationToken cancellationToken) + { + int totalFiles = 0; int successfulDownloads = 0; int failedDownloads = 0; @@ -209,9 +332,13 @@ private async Task DownloadFilesAsync(CancellationToken cancellationToken) var downloadTaskCompletionSource = new TaskCompletionSource(); // Process items from the download queue until it's completed + WriteCloudFetchDebug($"CloudFetchDownloader Internal: About to start consuming download queue - queue_count={_downloadQueue.Count}, queue_completed={_downloadQueue.IsCompleted}"); + WriteCloudFetchDebug($"๐Ÿ” CloudFetchDownloader Internal: PRODUCER LOOP STARTING - This will iterate until download queue is completed"); + foreach (var downloadResult in _downloadQueue.GetConsumingEnumerable(cancellationToken)) { totalFiles++; + WriteCloudFetchDebug($"CloudFetchDownloader Internal: Processing file #{totalFiles} - download_queue={_downloadQueue.Count}, result_queue={_resultQueue.Count}"); // Check if there's an error before processing more downloads if (HasError) @@ -233,7 +360,7 @@ private async Task DownloadFilesAsync(CancellationToken cancellationToken) } catch (Exception ex) { - Trace.TraceWarning($"Error 
waiting for downloads to complete: {ex.Message}"); + WriteCloudFetchDebug($"CloudFetchDownloader: Warning - Error waiting for downloads to complete: {ex.Message}"); // Don't set error here, as individual download tasks will handle their own errors } } @@ -242,7 +369,10 @@ private async Task DownloadFilesAsync(CancellationToken cancellationToken) if (!HasError) { // Add the guard to the result queue to signal the end of results + WriteCloudFetchDebug("CloudFetchDownloader Internal: โšช All downloads completed successfully - adding EndOfResultsGuard.Instance to result queue"); + WriteCloudFetchDebug($"CloudFetchDownloader Internal: โšช Result queue has {_resultQueue.Count} items before adding EndOfResultsGuard"); _resultQueue.Add(EndOfResultsGuard.Instance, cancellationToken); + WriteCloudFetchDebug("CloudFetchDownloader Internal: โšช EndOfResultsGuard.Instance added - CloudFetchReader will handle final batch aggregation"); _isCompleted = true; } break; @@ -257,32 +387,79 @@ private async Task DownloadFilesAsync(CancellationToken cancellationToken) { // Update the download result with the refreshed link downloadResult.UpdateWithRefreshedLink(refreshedLink); - Trace.TraceInformation($"Updated URL for file at offset {refreshedLink.StartRowOffset} before download"); + System.Diagnostics.Trace.TraceInformation($"Updated URL for file at offset {refreshedLink.StartRowOffset} before download"); } } // Acquire a download slot + int taskId = totalFiles; // Use file number as unique task ID + WriteCloudFetchDebug($"CloudFetchDownloader Internal: [TASK-{taskId}] About to wait for semaphore - available={_downloadSemaphore.CurrentCount}, max={_maxParallelDownloads}, active_tasks={downloadTasks.Count}"); + + // CRITICAL: Add debugging around the WaitAsync call + var semaphoreBefore = _downloadSemaphore.CurrentCount; + WriteCloudFetchDebug($"CloudFetchDownloader Internal: [TASK-{taskId}] Semaphore BEFORE WaitAsync: available={semaphoreBefore}"); + await _downloadSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false); + var semaphoreAfter = _downloadSemaphore.CurrentCount; + WriteCloudFetchDebug($"CloudFetchDownloader Internal: [TASK-{taskId}] Got semaphore slot - BEFORE: available={semaphoreBefore}, AFTER: available={semaphoreAfter}, active_tasks={downloadTasks.Count}"); + + // Sanity check: After WaitAsync, available count should be 1 less than before + if (semaphoreAfter != semaphoreBefore - 1) + { + WriteCloudFetchDebug($"CloudFetchDownloader Internal: [TASK-{taskId}] โš ๏ธ SEMAPHORE INCONSISTENCY: Expected available={semaphoreBefore - 1}, got available={semaphoreAfter}"); + } + + WriteCloudFetchDebug($"CloudFetchDownloader Internal: [TASK-{taskId}] Starting download task - active_tasks={downloadTasks.Count}"); + // Start the download task - Task downloadTask = DownloadFileAsync(downloadResult, cancellationToken) - .ContinueWith(t => + WriteCloudFetchDebug($"CloudFetchDownloader Internal: [TASK-{taskId}] About to start DownloadFileAsync"); + Task downloadTask = DownloadFileAsync(downloadResult, cancellationToken); + + // CRITICAL: Add task to dictionary BEFORE ContinueWith to avoid race condition + downloadTasks[downloadTask] = downloadResult; + WriteCloudFetchDebug($"CloudFetchDownloader Internal: [TASK-{taskId}] Added task to dictionary BEFORE continuation - active_tasks={downloadTasks.Count}"); + + // Now add the continuation + downloadTask = downloadTask.ContinueWith(t => { - // Release the download slot - _downloadSemaphore.Release(); + WriteCloudFetchDebug($"CloudFetchDownloader Internal: 
[TASK-{taskId}] Download task completed - Status={t.Status}, IsFaulted={t.IsFaulted}, IsCanceled={t.IsCanceled}"); + + // CRITICAL: Add debugging around the Release call + var semaphoreBeforeRelease = _downloadSemaphore.CurrentCount; + WriteCloudFetchDebug($"CloudFetchDownloader Internal: [TASK-{taskId}] Semaphore BEFORE Release: available={semaphoreBeforeRelease}, max={_maxParallelDownloads}"); - // Remove the task from the dictionary - downloadTasks.TryRemove(t, out _); + var releasedCount = _downloadSemaphore.Release(); + + var semaphoreAfterRelease = _downloadSemaphore.CurrentCount; + WriteCloudFetchDebug($"CloudFetchDownloader Internal: [TASK-{taskId}] RELEASED semaphore slot - BEFORE: available={semaphoreBeforeRelease}, AFTER: available={semaphoreAfterRelease}, released_count={releasedCount}, max_count={_maxParallelDownloads}"); + + // CRITICAL: If released_count is 0, that means we're over-releasing! + if (releasedCount == 0) + { + WriteCloudFetchDebug($"CloudFetchDownloader Internal: [TASK-{taskId}] โš ๏ธ WARNING: Release() returned 0 - semaphore was already at maximum! BEFORE={semaphoreBeforeRelease}, AFTER={semaphoreAfterRelease}"); + } + + // Sanity check: After Release, available count should be 1 more than before (unless we were already at max) + if (releasedCount > 0 && semaphoreAfterRelease != semaphoreBeforeRelease + 1) + { + WriteCloudFetchDebug($"CloudFetchDownloader Internal: [TASK-{taskId}] โš ๏ธ RELEASE INCONSISTENCY: Expected available={semaphoreBeforeRelease + 1}, got available={semaphoreAfterRelease}"); + } + + // Remove the ORIGINAL task from the dictionary (not the continuation task) + var originalTask = t; // t is the original DownloadFileAsync task + bool removed = downloadTasks.TryRemove(originalTask, out _); + WriteCloudFetchDebug($"CloudFetchDownloader Internal: [TASK-{taskId}] Removed ORIGINAL task from dictionary - removed={removed}, active_tasks={downloadTasks.Count}"); // Handle any exceptions if (t.IsFaulted) { Exception ex = t.Exception?.InnerException ?? 
new Exception("Unknown error"); - Trace.TraceError($"Download failed for file {SanitizeUrl(downloadResult.Link.FileLink)}: {ex.Message}"); + WriteCloudFetchDebug($"CloudFetchDownloader: [TASK-{taskId}] Error - Download failed for file {SanitizeUrl(downloadResult.Link.FileLink)}: {ex.Message}"); // Set the download as failed downloadResult.SetFailed(ex); - failedDownloads++; + Interlocked.Increment(ref failedDownloads); // Set the error state to stop the download process SetError(ex); @@ -290,18 +467,49 @@ private async Task DownloadFilesAsync(CancellationToken cancellationToken) // Signal that we should stop processing downloads downloadTaskCompletionSource.TrySetException(ex); } - else if (!t.IsFaulted && !t.IsCanceled) + else if (t.IsCompleted && !t.IsFaulted && !t.IsCanceled) { - successfulDownloads++; - totalBytes += downloadResult.Size; + Interlocked.Increment(ref successfulDownloads); + Interlocked.Add(ref totalBytes, downloadResult.Size); + WriteCloudFetchDebug($"CloudFetchDownloader Internal: [TASK-{taskId}] Download successful - total_success={successfulDownloads}, total_bytes={totalBytes / 1024.0 / 1024.0:F1}MB"); } - }, cancellationToken); + else if (t.IsCanceled) + { + WriteCloudFetchDebug($"CloudFetchDownloader Internal: [TASK-{taskId}] Download was canceled"); + } + }, TaskContinuationOptions.ExecuteSynchronously); - // Add the task to the dictionary - downloadTasks[downloadTask] = downloadResult; + // CRITICAL: Add to result queue in ORDER to maintain sequence (before download completes) + // But use TryAdd with timeout to prevent deadlock when queue is full + var queueCountBeforeAdd = _resultQueue.Count; + WriteCloudFetchDebug($"๐Ÿ“ฅ CloudFetchDownloader Internal: [TASK-{taskId}] About to ADD item to result queue - queue_count_before={queueCountBeforeAdd}, queue_capacity={_resultQueue.BoundedCapacity}, item_size={downloadResult.Size}"); - // Add the result to the result queue add the result here to assure the download sequence. - _resultQueue.Add(downloadResult, cancellationToken); + if (!_resultQueue.TryAdd(downloadResult, 100, cancellationToken)) + { + // If we can't add within 100ms, implement backpressure by pausing + WriteCloudFetchDebug($"CloudFetchDownloader Internal: [TASK-{taskId}] โš ๏ธ RESULT QUEUE FULL! 
(capacity={_resultQueue.BoundedCapacity}) - This is blocking TCP connection #{taskId}"); + + // Wait a bit longer and try again + await Task.Delay(500, cancellationToken); + if (!_resultQueue.TryAdd(downloadResult, 1000, cancellationToken)) + { + WriteCloudFetchDebug($"CloudFetchDownloader Internal: [TASK-{taskId}] โš ๏ธ RESULT QUEUE STILL FULL after 1500ms - TCP connection #{taskId} will BLOCK here until consumer processes results"); + // Force add with blocking - but now we know why it's blocking + _resultQueue.Add(downloadResult, cancellationToken); + WriteCloudFetchDebug($"CloudFetchDownloader Internal: [TASK-{taskId}] Result queue blockage resolved - TCP connection #{taskId} can now continue"); + } + else + { + WriteCloudFetchDebug($"CloudFetchDownloader Internal: [TASK-{taskId}] Result queue unblocked after 1500ms - TCP connection #{taskId} proceeding"); + } + } + else + { + var queueCountAfterAdd = _resultQueue.Count; + var addTimestamp = DateTime.UtcNow.ToString("yyyy-MM-dd HH:mm:ss.fff"); + WriteCloudFetchDebug($"โœ… [{addTimestamp}] CloudFetchDownloader Internal: [TASK-{taskId}] ITEM SUCCESSFULLY ADDED to result queue - queue_before={queueCountBeforeAdd}, queue_after={queueCountAfterAdd}, items_added={(queueCountAfterAdd - queueCountBeforeAdd)}"); + WriteCloudFetchDebug($"๐Ÿ“Š [{addTimestamp}] CloudFetchDownloader Internal: [TASK-{taskId}] *** PRODUCER: Item available for consumption *** - result_queue_count={_resultQueue.Count}"); + } // If there's an error, stop processing more downloads if (HasError) @@ -309,29 +517,41 @@ private async Task DownloadFilesAsync(CancellationToken cancellationToken) break; } } + + // CRITICAL: Log why the producer loop ended + WriteCloudFetchDebug($"๐Ÿ” CloudFetchDownloader Internal: PRODUCER LOOP ENDED - total_files_processed={totalFiles}"); + WriteCloudFetchDebug($"๐Ÿ” CloudFetchDownloader Internal: FINAL STATE - download_queue_count={_downloadQueue.Count}, download_queue_completed={_downloadQueue.IsCompleted}"); + WriteCloudFetchDebug($"๐Ÿ” CloudFetchDownloader Internal: FINAL STATE - result_queue_count={_resultQueue.Count}, result_queue_completed={_resultQueue.IsCompleted}"); + WriteCloudFetchDebug($"๐Ÿ” CloudFetchDownloader Internal: FINAL STATE - has_error={HasError}, is_completed={_isCompleted}"); } catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) { // Expected when cancellation is requested - Trace.TraceInformation("Download process was cancelled"); + WriteCloudFetchDebug("CloudFetchDownloader: Download process was cancelled"); } catch (Exception ex) { - Trace.TraceError($"Error in download loop: {ex.Message}"); + WriteCloudFetchDebug($"CloudFetchDownloader: Error in download loop: {ex.Message}"); SetError(ex); } finally { overallStopwatch.Stop(); - Trace.TraceInformation( - $"Download process completed. 
Total files: {totalFiles}, Successful: {successfulDownloads}, " + - $"Failed: {failedDownloads}, Total size: {totalBytes / 1024.0 / 1024.0:F2} MB, Total time: {overallStopwatch.ElapsedMilliseconds / 1000.0:F2} sec"); + WriteCloudFetchDebug($"CloudFetchDownloader: Download process completed - TotalFiles={totalFiles}, SuccessfulDownloads={successfulDownloads}, FailedDownloads={failedDownloads}, TotalSizeMB={totalBytes / 1024.0 / 1024.0:F2}, TotalTimeSec={overallStopwatch.ElapsedMilliseconds / 1000.0:F2}"); - // If there's an error, add the error to the result queue + // If there's an error, add the error to the result queue and complete it if (HasError) { CompleteWithError(); + WriteCloudFetchDebug("CloudFetchDownloader Internal: ๐Ÿ”’ Marking result queue as COMPLETED due to error"); + _resultQueue.CompleteAdding(); + } + else if (!_resultQueue.IsCompleted) + { + // Ensure result queue is completed in normal scenarios (should already be done by EndOfResultsGuard processing) + WriteCloudFetchDebug("CloudFetchDownloader Internal: ๐Ÿ”’ Ensuring result queue is COMPLETED (backup completion)"); + _resultQueue.CompleteAdding(); } } } @@ -345,74 +565,131 @@ private async Task DownloadFileAsync(IDownloadResult downloadResult, Cancellatio // Use the size directly from the download result long size = downloadResult.Size; + WriteCloudFetchDebug($"CloudFetchDownloader: ENTERED DownloadFileAsync for {sanitizedUrl}, expected size: {size / 1024.0:F2} KB"); + // Create a stopwatch to track download time var stopwatch = Stopwatch.StartNew(); // Log download start - Trace.TraceInformation($"Starting download of file {sanitizedUrl}, expected size: {size / 1024.0:F2} KB"); + System.Diagnostics.Trace.TraceInformation($"Starting download of file {sanitizedUrl}, expected size: {size / 1024.0:F2} KB"); // Acquire memory before downloading - await _memoryManager.AcquireMemoryAsync(size, cancellationToken).ConfigureAwait(false); + WriteCloudFetchDebug($"CloudFetchDownloader: About to acquire memory for {sanitizedUrl}, size: {size / 1024.0:F2} KB"); + + // SUSPECT #1: Memory acquisition might be hanging + WriteCloudFetchDebug($"CloudFetchDownloader: MEMORY STATE BEFORE ACQUIRE for {sanitizedUrl} - Requesting: {size / 1024.0 / 1024.0:F1}MB, Current: {_memoryManager.UsedMemory / 1024.0 / 1024.0:F1}MB, Max: {_memoryManager.MaxMemory / 1024.0 / 1024.0:F1}MB, Available: {(_memoryManager.MaxMemory - _memoryManager.UsedMemory) / 1024.0 / 1024.0:F1}MB"); + + var memoryTask = _memoryManager.AcquireMemoryAsync(size, cancellationToken); + var memoryDelay = Task.Delay(5000); // 5 second timeout for debugging + var completedTask = await Task.WhenAny(memoryTask, memoryDelay); + + if (completedTask == memoryDelay) + { + WriteCloudFetchDebug($"CloudFetchDownloader: MEMORY ACQUISITION HANGING for {sanitizedUrl} - Requesting: {size / 1024.0 / 1024.0:F1}MB, Used: {_memoryManager.UsedMemory / 1024.0 / 1024.0:F1}MB, Max: {_memoryManager.MaxMemory / 1024.0 / 1024.0:F1}MB"); + WriteCloudFetchDebug($"CloudFetchDownloader: โš ๏ธ MEMORY PRESSURE! 
Need {size / 1024.0 / 1024.0:F1}MB but only {(_memoryManager.MaxMemory - _memoryManager.UsedMemory) / 1024.0 / 1024.0:F1}MB available - this is likely the problem!"); + + // Still wait for it to complete, but now we know it's slow + await memoryTask.ConfigureAwait(false); + WriteCloudFetchDebug($"CloudFetchDownloader: Memory finally acquired for {sanitizedUrl} after timeout - New Used: {_memoryManager.UsedMemory / 1024.0 / 1024.0:F1}MB"); + } + else + { + await memoryTask.ConfigureAwait(false); + WriteCloudFetchDebug($"CloudFetchDownloader: Memory acquired quickly for {sanitizedUrl} - New Used: {_memoryManager.UsedMemory / 1024.0 / 1024.0:F1}MB"); + } // Retry logic for downloading files for (int retry = 0; retry < _maxRetries; retry++) { try { - // Download the file directly - using HttpResponseMessage response = await _httpClient.GetAsync( - url, - HttpCompletionOption.ResponseHeadersRead, - cancellationToken).ConfigureAwait(false); - - // Check if the response indicates an expired URL (typically 403 or 401) - if (response.StatusCode == System.Net.HttpStatusCode.Forbidden || - response.StatusCode == System.Net.HttpStatusCode.Unauthorized) + WriteCloudFetchDebug($"CloudFetchDownloader: Starting HTTP request for {sanitizedUrl} (attempt {retry + 1}/{_maxRetries})"); + + // SUSPECT #2: HTTP request might be hanging + var httpTask = _httpClient.GetAsync(url, HttpCompletionOption.ResponseHeadersRead, cancellationToken); + var httpDelay = Task.Delay(10000); // 10 second timeout for debugging + var httpCompletedTask = await Task.WhenAny(httpTask, httpDelay); + + HttpResponseMessage response; + if (httpCompletedTask == httpDelay) + { + WriteCloudFetchDebug($"CloudFetchDownloader: HTTP REQUEST HANGING for {sanitizedUrl} - this is likely the problem!"); + response = await httpTask.ConfigureAwait(false); + WriteCloudFetchDebug($"CloudFetchDownloader: HTTP request finally completed for {sanitizedUrl} after timeout"); + } + else { - // If we've already tried refreshing too many times, fail - if (downloadResult.RefreshAttempts >= _maxUrlRefreshAttempts) + response = await httpTask.ConfigureAwait(false); + WriteCloudFetchDebug($"CloudFetchDownloader: HTTP response received quickly for {sanitizedUrl} - Status: {response.StatusCode}"); + } + + using (response) + { + // Check if the response indicates an expired URL (typically 403 or 401) + if (response.StatusCode == System.Net.HttpStatusCode.Forbidden || + response.StatusCode == System.Net.HttpStatusCode.Unauthorized) { - throw new InvalidOperationException($"Failed to download file after {downloadResult.RefreshAttempts} URL refresh attempts."); + // If we've already tried refreshing too many times, fail + if (downloadResult.RefreshAttempts >= _maxUrlRefreshAttempts) + { + throw new InvalidOperationException($"Failed to download file after {downloadResult.RefreshAttempts} URL refresh attempts."); + } + + // Try to refresh the URL + var refreshedLink = await _resultFetcher.GetUrlAsync(downloadResult.Link.StartRowOffset, cancellationToken); + if (refreshedLink != null) + { + // Update the download result with the refreshed link + downloadResult.UpdateWithRefreshedLink(refreshedLink); + url = refreshedLink.FileLink; + sanitizedUrl = SanitizeUrl(url); + + System.Diagnostics.Trace.TraceInformation($"URL for file at offset {refreshedLink.StartRowOffset} was refreshed after expired URL response"); + + // Continue to the next retry attempt with the refreshed URL + continue; + } + else + { + // If refresh failed, throw an exception + throw new 
InvalidOperationException("Failed to refresh expired URL."); + } } - // Try to refresh the URL - var refreshedLink = await _resultFetcher.GetUrlAsync(downloadResult.Link.StartRowOffset, cancellationToken); - if (refreshedLink != null) + response.EnsureSuccessStatusCode(); + + // Log the download size if available from response headers + long? contentLength = response.Content.Headers.ContentLength; + if (contentLength.HasValue && contentLength.Value > 0) { - // Update the download result with the refreshed link - downloadResult.UpdateWithRefreshedLink(refreshedLink); - url = refreshedLink.FileLink; - sanitizedUrl = SanitizeUrl(url); + System.Diagnostics.Trace.TraceInformation($"Actual file size for {sanitizedUrl}: {contentLength.Value / 1024.0 / 1024.0:F2} MB"); + } - Trace.TraceInformation($"URL for file at offset {refreshedLink.StartRowOffset} was refreshed after expired URL response"); + // SUSPECT #3: Data reading might be hanging + WriteCloudFetchDebug($"CloudFetchDownloader: Starting to read response data for {sanitizedUrl}"); + var readTask = response.Content.ReadAsByteArrayAsync(); + var readDelay = Task.Delay(15000); // 15 second timeout for debugging + var readCompletedTask = await Task.WhenAny(readTask, readDelay); - // Continue to the next retry attempt with the refreshed URL - continue; + if (readCompletedTask == readDelay) + { + WriteCloudFetchDebug($"CloudFetchDownloader: DATA READING HANGING for {sanitizedUrl} - this is likely the problem!"); + fileData = await readTask.ConfigureAwait(false); + WriteCloudFetchDebug($"CloudFetchDownloader: Data finally read for {sanitizedUrl} after timeout - size: {fileData.Length / 1024.0:F2} KB"); } else { - // If refresh failed, throw an exception - throw new InvalidOperationException("Failed to refresh expired URL."); + fileData = await readTask.ConfigureAwait(false); + WriteCloudFetchDebug($"CloudFetchDownloader: Data read quickly for {sanitizedUrl} - size: {fileData.Length / 1024.0:F2} KB"); } - } - - response.EnsureSuccessStatusCode(); - // Log the download size if available from response headers - long? contentLength = response.Content.Headers.ContentLength; - if (contentLength.HasValue && contentLength.Value > 0) - { - Trace.TraceInformation($"Actual file size for {sanitizedUrl}: {contentLength.Value / 1024.0 / 1024.0:F2} MB"); + break; // Success, exit retry loop } - - // Read the file data - fileData = await response.Content.ReadAsByteArrayAsync().ConfigureAwait(false); - break; // Success, exit retry loop } catch (Exception ex) when (retry < _maxRetries - 1 && !cancellationToken.IsCancellationRequested) { // Log the error and retry - Trace.TraceError($"Error downloading file {SanitizeUrl(url)} (attempt {retry + 1}/{_maxRetries}): {ex.Message}"); + WriteCloudFetchDebug($"CloudFetchDownloader: Error downloading file {SanitizeUrl(url)} (attempt {retry + 1}/{_maxRetries}): {ex.Message}"); await Task.Delay(_retryDelayMs * (retry + 1), cancellationToken).ConfigureAwait(false); } @@ -421,10 +698,12 @@ private async Task DownloadFileAsync(IDownloadResult downloadResult, Cancellatio if (fileData == null) { stopwatch.Stop(); - Trace.TraceError($"Failed to download file {sanitizedUrl} after {_maxRetries} attempts. Elapsed time: {stopwatch.ElapsedMilliseconds} ms"); + System.Diagnostics.Trace.TraceError($"Failed to download file {sanitizedUrl} after {_maxRetries} attempts. 
Elapsed time: {stopwatch.ElapsedMilliseconds} ms"); // Release the memory we acquired _memoryManager.ReleaseMemory(size); + + // DO NOT RELEASE SEMAPHORE HERE - it will be released in ContinueWith throw new InvalidOperationException($"Failed to download file from {url} after {_maxRetries} attempts."); } @@ -437,27 +716,51 @@ private async Task DownloadFileAsync(IDownloadResult downloadResult, Cancellatio { try { + WriteCloudFetchDebug($"CloudFetchDownloader: Starting LZ4 decompression for {sanitizedUrl}"); var decompressStopwatch = Stopwatch.StartNew(); dataStream = new MemoryStream(); + + // SUSPECT #4: LZ4 decompression might be hanging using (var inputStream = new MemoryStream(fileData)) using (var decompressor = LZ4Stream.Decode(inputStream)) { - await decompressor.CopyToAsync(dataStream, 81920, cancellationToken).ConfigureAwait(false); + var decompressTask = decompressor.CopyToAsync(dataStream, 81920, cancellationToken); + var decompressDelay = Task.Delay(10000); // 10 second timeout for debugging + var decompressCompletedTask = await Task.WhenAny(decompressTask, decompressDelay); + + if (decompressCompletedTask == decompressDelay) + { + WriteCloudFetchDebug($"CloudFetchDownloader: LZ4 DECOMPRESSION HANGING for {sanitizedUrl} - this is likely the problem!"); + await decompressTask.ConfigureAwait(false); + WriteCloudFetchDebug($"CloudFetchDownloader: LZ4 decompression finally completed for {sanitizedUrl} after timeout"); + } + else + { + await decompressTask.ConfigureAwait(false); + WriteCloudFetchDebug($"CloudFetchDownloader: LZ4 decompression completed quickly for {sanitizedUrl}"); + } } dataStream.Position = 0; decompressStopwatch.Stop(); - Trace.TraceInformation($"Decompressed file {sanitizedUrl} in {decompressStopwatch.ElapsedMilliseconds} ms. Compressed size: {actualSize / 1024.0:F2} KB, Decompressed size: {dataStream.Length / 1024.0:F2} KB"); + var compressedSize = actualSize; + var decompressedSize = dataStream.Length; + + System.Diagnostics.Trace.TraceInformation($"Decompressed file {sanitizedUrl} in {decompressStopwatch.ElapsedMilliseconds} ms. Compressed size: {compressedSize / 1024.0:F2} KB, Decompressed size: {decompressedSize / 1024.0:F2} KB"); + WriteCloudFetchDebug($"๐Ÿ“Š CloudFetchDownloader: ACTUAL DECOMPRESSION STATS - Compressed: {compressedSize:N0} bytes ({compressedSize / 1024.0 / 1024.0:F1}MB) โ†’ Decompressed: {decompressedSize:N0} bytes ({decompressedSize / 1024.0 / 1024.0:F1}MB)"); + WriteCloudFetchDebug($"๐Ÿ“Š CloudFetchDownloader: COMPRESSION RATIO - {(double)decompressedSize / compressedSize:F1}x expansion ({100.0 * (decompressedSize - compressedSize) / compressedSize:F1}% larger after decompression)"); actualSize = dataStream.Length; } catch (Exception ex) { stopwatch.Stop(); - Trace.TraceError($"Error decompressing data for file {sanitizedUrl}: {ex.Message}. Elapsed time: {stopwatch.ElapsedMilliseconds} ms"); + System.Diagnostics.Trace.TraceError($"Error decompressing data for file {sanitizedUrl}: {ex.Message}. Elapsed time: {stopwatch.ElapsedMilliseconds} ms"); // Release the memory we acquired _memoryManager.ReleaseMemory(size); + + // DO NOT RELEASE SEMAPHORE HERE - it will be released in ContinueWith throw new InvalidOperationException($"Error decompressing data: {ex.Message}", ex); } } @@ -468,10 +771,13 @@ private async Task DownloadFileAsync(IDownloadResult downloadResult, Cancellatio // Stop the stopwatch and log download completion stopwatch.Stop(); - Trace.TraceInformation($"Completed download of file {sanitizedUrl}. 
Size: {actualSize / 1024.0:F2} KB, Latency: {stopwatch.ElapsedMilliseconds} ms, Throughput: {(actualSize / 1024.0 / 1024.0) / (stopwatch.ElapsedMilliseconds / 1000.0):F2} MB/s"); + WriteCloudFetchDebug($"CloudFetchDownloader: COMPLETED DownloadFileAsync for {sanitizedUrl}. Size: {actualSize / 1024.0:F2} KB, Latency: {stopwatch.ElapsedMilliseconds} ms, Throughput: {(actualSize / 1024.0 / 1024.0) / (stopwatch.ElapsedMilliseconds / 1000.0):F2} MB/s"); + System.Diagnostics.Trace.TraceInformation($"Completed download of file {sanitizedUrl}. Size: {actualSize / 1024.0:F2} KB, Latency: {stopwatch.ElapsedMilliseconds} ms, Throughput: {(actualSize / 1024.0 / 1024.0) / (stopwatch.ElapsedMilliseconds / 1000.0):F2} MB/s"); - // Set the download as completed with the original size - downloadResult.SetCompleted(dataStream, size); + // Set the download as completed with the ACTUAL decompressed size (critical for accurate batch aggregation) + downloadResult.SetCompleted(dataStream, actualSize); + WriteCloudFetchDebug($"CloudFetchDownloader: SetCompleted called for {sanitizedUrl} - DownloadFileAsync FINISHED"); + WriteCloudFetchDebug($"CloudFetchDownloader: MEMORY STATE AFTER DOWNLOAD for {sanitizedUrl} - Used: {_memoryManager.UsedMemory / 1024.0 / 1024.0:F1}MB, Max: {_memoryManager.MaxMemory / 1024.0 / 1024.0:F1}MB (Note: Memory will be released when file is consumed)"); } private void SetError(Exception ex) @@ -480,7 +786,7 @@ private void SetError(Exception ex) { if (_error == null) { - Trace.TraceError($"Setting error state: {ex.Message}"); + WriteCloudFetchDebug($"CloudFetchDownloader: Setting error state: {ex.Message}"); _error = ex; } } @@ -498,7 +804,7 @@ private void CompleteWithError() } catch (Exception ex) { - Trace.TraceError($"Error completing with error: {ex.Message}"); + System.Diagnostics.Trace.TraceError($"Error completing with error: {ex.Message}"); } } diff --git a/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchMemoryBufferManager.cs b/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchMemoryBufferManager.cs index 7fe7dc2c88..eb04df92a8 100644 --- a/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchMemoryBufferManager.cs +++ b/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchMemoryBufferManager.cs @@ -26,7 +26,7 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks.Reader.CloudFetch /// internal sealed class CloudFetchMemoryBufferManager : ICloudFetchMemoryBufferManager { - private const int DefaultMemoryBufferSizeMB = 200; + private const int DefaultMemoryBufferSizeMB = 1000; // Updated to match CloudFetchDownloadManager default private readonly long _maxMemory; private long _usedMemory; private readonly SemaphoreSlim _memorySemaphore; @@ -96,14 +96,26 @@ public async Task AcquireMemoryAsync(long size, CancellationToken cancellationTo throw new ArgumentOutOfRangeException(nameof(size), $"Requested size ({size} bytes) exceeds maximum memory ({_maxMemory} bytes)."); } + // Add detailed memory debugging + WriteMemoryDebug($"MEMORY-REQUEST: Requesting {size / 1024.0 / 1024.0:F1}MB, Current: {UsedMemory / 1024.0 / 1024.0:F1}MB / {_maxMemory / 1024.0 / 1024.0:F1}MB"); + + int attemptCount = 0; while (!cancellationToken.IsCancellationRequested) { // Try to acquire memory without blocking if (TryAcquireMemory(size)) { + WriteMemoryDebug($"MEMORY-ACQUIRED: Successfully acquired {size / 1024.0 / 1024.0:F1}MB after {attemptCount} attempts, New Total: {UsedMemory / 1024.0 / 1024.0:F1}MB / {_maxMemory / 1024.0 / 1024.0:F1}MB"); return; } + // Log memory pressure every 
100 attempts (every ~1 second) + attemptCount++; + if (attemptCount % 100 == 0) + { + WriteMemoryDebug($"MEMORY-BLOCKED: Attempt #{attemptCount} - Still waiting for {size / 1024.0 / 1024.0:F1}MB, Current: {UsedMemory / 1024.0 / 1024.0:F1}MB / {_maxMemory / 1024.0 / 1024.0:F1}MB - MEMORY PRESSURE!"); + } + // If we couldn't acquire memory, wait for some to be released await Task.Delay(10, cancellationToken).ConfigureAwait(false); } @@ -123,6 +135,8 @@ public void ReleaseMemory(long size) // Release memory long newValue = Interlocked.Add(ref _usedMemory, -size); + WriteMemoryDebug($"MEMORY-RELEASED: Released {size / 1024.0 / 1024.0:F1}MB, New Total: {newValue / 1024.0 / 1024.0:F1}MB / {_maxMemory / 1024.0 / 1024.0:F1}MB"); + // Ensure we don't go negative if (newValue < 0) { @@ -131,5 +145,29 @@ public void ReleaseMemory(long size) throw new InvalidOperationException("Memory buffer manager released more memory than was acquired."); } } + + private void WriteMemoryDebug(string message) + { + try + { + var timestamp = DateTime.UtcNow.ToString("yyyy-MM-dd HH:mm:ss"); + var logPath = System.IO.Path.Combine( + Environment.GetFolderPath(Environment.SpecialFolder.ApplicationData), + "adbc-memory-debug.log"); + + // Ensure directory exists + var logDir = System.IO.Path.GetDirectoryName(logPath); + if (!string.IsNullOrEmpty(logDir) && !System.IO.Directory.Exists(logDir)) + { + System.IO.Directory.CreateDirectory(logDir); + } + + System.IO.File.AppendAllText(logPath, $"[{timestamp}] {message}{Environment.NewLine}"); + } + catch + { + // If file logging fails, ignore it to prevent crashes + } + } } } diff --git a/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchReader.cs b/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchReader.cs index 8c17308332..01d50c63e8 100644 --- a/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchReader.cs +++ b/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchReader.cs @@ -16,13 +16,17 @@ */ using System; +using System.Collections.Generic; using System.Diagnostics; +using System.Linq; using System.Net.Http; using System.Threading; using System.Threading.Tasks; using Apache.Arrow.Adbc.Drivers.Apache.Hive2; using Apache.Arrow.Adbc.Tracing; using Apache.Arrow.Ipc; +using Apache.Arrow.Types; + using Apache.Hive.Service.Rpc.Thrift; namespace Apache.Arrow.Adbc.Drivers.Databricks.Reader.CloudFetch @@ -38,6 +42,178 @@ internal sealed class CloudFetchReader : BaseDatabricksReader private IDownloadResult? 
currentDownloadResult; private bool isPrefetchEnabled; + // Tracing counters + private int _readCallCount = 0; + private int _totalBatchesReturned = 0; + private long _totalRowsReturned = 0; + private long _totalBytesRead = 0; + private long _totalFilesDownloaded = 0; // Track total files downloaded across all calls + private long _totalFileDataProcessed = 0; // Track total file data processed (downloaded file sizes) + + // Multi-file batch aggregation + private readonly long _batchSizeLimitBytes; + private readonly List<RecordBatch> _batchBuffer = new List<RecordBatch>(); + private long _currentBatchBufferSize = 0; + + #region Debug Helpers + + private static void WriteCloudFetchDebug(string message) + { + try + { + + var debugFile = System.IO.Path.Combine( + Environment.GetFolderPath(Environment.SpecialFolder.LocalApplicationData), + "adbc-cloudfetch-debug.log" + ); + var timestamped = $"[{DateTime.UtcNow:yyyy-MM-dd HH:mm:ss}] {message}"; + System.IO.File.AppendAllText(debugFile, timestamped + Environment.NewLine); + + } + catch + { + // Ignore file write errors + } + } + + #endregion + + #region Multi-File Batch Aggregation Helpers + + // Estimation helpers below size individual RecordBatch objects for aggregation decisions; actual downloaded file sizes are tracked separately for diagnostics + + /// <summary> + /// Concatenates multiple RecordBatch objects into a single batch. + /// Uses Apache Arrow's built-in concatenation functionality. + /// </summary> + private RecordBatch? ConcatenateBatches(List<RecordBatch> batches) + { + if (batches.Count == 0) return null; + if (batches.Count == 1) return batches[0]; + + try + { + WriteCloudFetchDebug($"CloudFetchReader: Concatenating {batches.Count} batches using Arrow's built-in functionality"); + + // Calculate total rows for validation + long totalRows = batches.Sum(b => (long)b.Length); + WriteCloudFetchDebug($"CloudFetchReader: Total rows to concatenate: {totalRows:N0}"); + + // Get the schema from the first batch (all batches should have the same schema) + var schema = batches[0].Schema; + + // Create lists to hold concatenated arrays for each column + var concatenatedArrays = new IArrowArray[schema.FieldsList.Count]; + + for (int columnIndex = 0; columnIndex < schema.FieldsList.Count; columnIndex++) + { + // Collect all arrays for this column from all batches + var columnArrays = batches.Select(batch => batch.Column(columnIndex)).ToArray(); + + // Use ArrowArrayConcatenator to concatenate arrays for this column + concatenatedArrays[columnIndex] = ArrowArrayConcatenator.Concatenate(columnArrays); + } + + // Create the final concatenated RecordBatch + var concatenatedBatch = new RecordBatch(schema, concatenatedArrays, (int)totalRows); + + WriteCloudFetchDebug($"CloudFetchReader: Successfully concatenated {batches.Count} batches into one batch with {concatenatedBatch.Length} rows"); + + return concatenatedBatch; + } + catch (Exception ex) + { + WriteCloudFetchDebug($"CloudFetchReader: Failed to concatenate {batches.Count} batches: {ex.Message}"); + WriteCloudFetchDebug($"CloudFetchReader: Falling back to returning first batch to avoid data loss"); + + // Fallback: return first batch if concatenation fails + // This preserves at least some data rather than losing everything + return batches[0]; + } + } + + /// <summary> + /// Clears the batch buffer and resets size tracking. + /// </summary> + private void ClearBatchBuffer() + { + foreach (var batch in _batchBuffer) + { + batch?.Dispose(); + } + _batchBuffer.Clear(); + _currentBatchBufferSize = 0; + } + + /// <summary> + /// Estimates the memory size of a RecordBatch in bytes (decompressed size). + /// </summary> + private long EstimateRecordBatchSize(RecordBatch batch) + { + long totalSize = 0; + + // Estimate based on column count, row count, and average bytes per value + for (int i = 0; i < batch.ColumnCount; i++) + { + var array = batch.Column(i); + totalSize += EstimateArraySize(array); + } + + return totalSize; + } + + /// <summary> + /// Estimates the memory size of an IArrowArray in bytes. + /// </summary> + private long EstimateArraySize(IArrowArray array) + { + // Base overhead per array + long size = 64; // Base object overhead + + // Add data buffer sizes based on array type and length + switch (array.Data.DataType.TypeId) + { + case ArrowTypeId.Int8: + size += array.Length * 1; + break; + case ArrowTypeId.Int16: + size += array.Length * 2; + break; + case ArrowTypeId.Int32: + case ArrowTypeId.Float: + size += array.Length * 4; + break; + case ArrowTypeId.Int64: + case ArrowTypeId.Double: + case ArrowTypeId.Timestamp: + size += array.Length * 8; + break; + case ArrowTypeId.String: + case ArrowTypeId.Binary: + // For string/binary, estimate average string length + // This is rough - could be more accurate by examining actual data + size += array.Length * 32; // Assume average 32 bytes per string + break; + case ArrowTypeId.Boolean: + size += (array.Length + 7) / 8; // Packed bits + break; + default: + // Default estimate for unknown types + size += array.Length * 8; + break; + } + + // Add validity bitmap if present (1 bit per element) + if (array.Data.NullCount > 0) + { + size += (array.Length + 7) / 8; + } + + return size; + } + + #endregion + /// <summary> /// Initializes a new instance of the <see cref="CloudFetchReader"/> class. /// </summary> @@ -53,8 +229,43 @@ public CloudFetchReader( HttpClient httpClient) : base(statement, schema, response, isLz4Compressed) { - // Check if prefetch is enabled + WriteCloudFetchDebug($"CloudFetchReader constructor called - CloudFetch is being used! (ReadCallCount initialized to: {_readCallCount})"); + WriteCloudFetchDebug($"๐Ÿ—ƒ๏ธ CloudFetchReader: File download counters initialized - TotalFilesDownloaded: {_totalFilesDownloaded}, TotalFileDataProcessed: {_totalFileDataProcessed}"); + + // Initialize batch size limit (default 20MB, configurable) var connectionProps = statement.Connection.Properties; + const long DefaultBatchSizeLimitBytes = 20L * 1024 * 1024; // 20MB default for decompressed RecordBatch data + _batchSizeLimitBytes = DefaultBatchSizeLimitBytes; + + if (connectionProps.TryGetValue("cloudfetch.batch.size.limit.mb", out string? batchSizeLimitStr)) + { + if (long.TryParse(batchSizeLimitStr, out long batchSizeLimitMB) && batchSizeLimitMB > 0) + { + _batchSizeLimitBytes = batchSizeLimitMB * 1024 * 1024; + WriteCloudFetchDebug($"CloudFetchReader: Custom batch size limit configured: {batchSizeLimitMB}MB ({_batchSizeLimitBytes:N0} bytes)"); + } + } + + WriteCloudFetchDebug($"CloudFetchReader: Multi-file batch aggregation enabled with size limit: {_batchSizeLimitBytes / (1024 * 1024)}MB"); + + // CRITICAL FIX: Test using shared ActivitySource instead of TracingReader + WriteCloudFetchDebug("CloudFetchReader: Testing SHARED ActivitySource that TracerProvider listens for..."); + using var testActivity = DatabricksConnection.s_sharedActivitySource.StartActivity("CloudFetchReader-TracerTest"); + WriteCloudFetchDebug($"CloudFetchReader: SHARED ActivitySource test activity created: {(testActivity != null ? 
"YES" : "NO")}"); + if (testActivity != null) + { + WriteCloudFetchDebug($"CloudFetchReader: SHARED ActivitySource Test Activity ID='{testActivity.Id}', Source='{testActivity.Source?.Name}'"); + testActivity.SetTag("test.message", "CloudFetchReader constructor test via SHARED ActivitySource"); + testActivity.SetTag("test.component", "CloudFetchReader"); + testActivity.SetTag("test.timestamp", DateTime.UtcNow.ToString("yyyy-MM-dd HH:mm:ss")); + testActivity.AddEvent(new ActivityEvent("CloudFetchReader constructor test event")); + testActivity.SetStatus(System.Diagnostics.ActivityStatusCode.Ok); + WriteCloudFetchDebug($"CloudFetchReader: SHARED ActivitySource test activity status set to: {testActivity.Status}"); + } + WriteCloudFetchDebug("CloudFetchReader: SHARED ActivitySource test activity completed - should appear in OpenTelemetry trace file!"); + + // Check if prefetch is enabled + //var connectionProps = statement.Connection.Properties; isPrefetchEnabled = true; // Default to true if (connectionProps.TryGetValue(DatabricksParameters.CloudFetchPrefetchEnabled, out string? prefetchEnabledStr)) { @@ -90,23 +301,147 @@ public CloudFetchReader( /// The next record batch, or null if there are no more batches. public override async ValueTask ReadNextRecordBatchAsync(CancellationToken cancellationToken = default) { - return await this.TraceActivityAsync(async _ => + var callNumber = _readCallCount + 1; // Preview the call number before increment + WriteCloudFetchDebug($"CloudFetchReader.ReadNextRecordBatchAsync called - CALL #{callNumber}"); + + // FIXED: Use shared ActivitySource that TracerProvider is listening for + using var manualActivity = DatabricksConnection.s_sharedActivitySource.StartActivity("ReadNextRecordBatchAsync"); + WriteCloudFetchDebug($"CloudFetchReader: SHARED Activity {(manualActivity != null ? 
"CREATED" : "NULL")}"); + if (manualActivity != null) + { + WriteCloudFetchDebug($"CloudFetchReader SHARED Activity Details: ID='{manualActivity.Id}', Name='{manualActivity.DisplayName}', Source='{manualActivity.Source?.Name}', Status='{manualActivity.Status}'"); + } + + try { + // Increment read call counter + var currentCallNumber = System.Threading.Interlocked.Increment(ref _readCallCount); + var timestamp = DateTime.UtcNow.ToString("yyyy-MM-dd HH:mm:ss.fff"); + WriteCloudFetchDebug($"๐Ÿš€ [{timestamp}] CloudFetchReader: EXTERNAL CALL #{currentCallNumber} STARTED (this may internally consume multiple queue items for batch aggregation)"); + + // Set initial activity tags (only if activity exists) + manualActivity?.SetTag("cloudfetch.reader.call_number", currentCallNumber); + manualActivity?.SetTag("cloudfetch.reader.total_calls_so_far", currentCallNumber); + manualActivity?.SetTag("cloudfetch.reader.total_batches_returned_so_far", _totalBatchesReturned); + manualActivity?.SetTag("cloudfetch.reader.total_rows_returned_so_far", _totalRowsReturned); + manualActivity?.SetTag("cloudfetch.reader.total_bytes_read_so_far", _totalBytesRead); + manualActivity?.SetTag("cloudfetch.reader.total_files_downloaded_so_far", _totalFilesDownloaded); + manualActivity?.SetTag("cloudfetch.reader.total_file_data_processed_so_far", _totalFileDataProcessed); + ThrowIfDisposed(); + int internalLoopIteration = 0; + int filesConsumedThisCall = 0; + int batchesProcessedThisCall = 0; + WriteCloudFetchDebug($"๐Ÿ”„ CloudFetchReader: EXTERNAL CALL #{currentCallNumber} - Starting internal loop to collect files until {_batchSizeLimitBytes / 1024 / 1024}MB limit"); + while (true) { + internalLoopIteration++; + var loopTimestamp = DateTime.UtcNow.ToString("yyyy-MM-dd HH:mm:ss.fff"); + WriteCloudFetchDebug($"๐Ÿ”„ [{loopTimestamp}] CloudFetchReader: EXTERNAL CALL #{currentCallNumber} - INTERNAL LOOP ITERATION #{internalLoopIteration} - current_reader={(this.currentReader != null ? "EXISTS" : "NULL")}, buffer_count={_batchBuffer.Count}"); // If we have a current reader, try to read the next batch if (this.currentReader != null) { + WriteCloudFetchDebug($"CloudFetchReader: INTERNAL LOOP #{internalLoopIteration} - Reading from EXISTING stream (this.currentReader != null)"); + manualActivity?.AddEvent("cloudfetch.reader.reading_from_current_stream"); RecordBatch? next = await this.currentReader.ReadNextRecordBatchAsync(cancellationToken); if (next != null) { - return next; + // Calculate batch metrics + var batchRowCount = next.Length; + var actualDecompressedFileSize = this.currentDownloadResult?.Size ?? 0; + + // CRITICAL FIX: Use ACTUAL decompressed file size for comparison, but estimate individual RecordBatch size + // Note: We still need to estimate individual RecordBatch size since we're aggregating RecordBatch objects, not files + var estimatedRecordBatchSize = EstimateRecordBatchSize(next); + batchesProcessedThisCall++; + WriteCloudFetchDebug($"๐Ÿ“Š CloudFetchReader: Got batch #{batchesProcessedThisCall} with {batchRowCount:N0} rows"); + WriteCloudFetchDebug($"๐Ÿ“Š CloudFetchReader: TOTAL FILE size (ACTUAL decompressed): {actualDecompressedFileSize:N0} bytes ({actualDecompressedFileSize / 1024.0 / 1024.0:F1}MB)"); + WriteCloudFetchDebug($"๐Ÿ“Š CloudFetchReader: SINGLE RecordBatch size (estimated): {estimatedRecordBatchSize:N0} bytes ({estimatedRecordBatchSize / 1024.0 / 1024.0:F1}MB)"); + WriteCloudFetchDebug($"๐Ÿ“Š CloudFetchReader: File contains approximately {(actualDecompressedFileSize > 0 ? 
(double)actualDecompressedFileSize / estimatedRecordBatchSize : 0):F1} RecordBatch objects"); + WriteCloudFetchDebug($"๐Ÿ“Š CloudFetchReader: โœ… ACTUAL vs ESTIMATED - File: {actualDecompressedFileSize:N0} bytes, This RecordBatch: {estimatedRecordBatchSize:N0} bytes"); + + // Add to batch buffer for aggregation until we reach configured size limit + var bufferCountBefore = _batchBuffer.Count; + var bufferSizeBefore = _currentBatchBufferSize; + var bufferTimestamp = DateTime.UtcNow.ToString("yyyy-MM-dd HH:mm:ss.fff"); + WriteCloudFetchDebug($"๐Ÿ”„ [{bufferTimestamp}] CloudFetchReader: EXTERNAL CALL #{currentCallNumber} - BATCH #{batchesProcessedThisCall} - About to ADD RecordBatch to batch buffer - buffer_count_before={bufferCountBefore}, buffer_size_before={bufferSizeBefore:N0} bytes"); + + _batchBuffer.Add(next); + _currentBatchBufferSize += estimatedRecordBatchSize; // FIXED: Use estimated RecordBatch size for individual batch aggregation + + var bufferCountAfter = _batchBuffer.Count; + var bufferSizeAfter = _currentBatchBufferSize; + WriteCloudFetchDebug($"โœ… CloudFetchReader: RecordBatch ADDED to batch buffer - buffer_before={bufferCountBefore}, buffer_after={bufferCountAfter}, size_before={bufferSizeBefore:N0}, size_after={bufferSizeAfter:N0} bytes"); + WriteCloudFetchDebug($"๐Ÿ“Š CloudFetchReader: *** PROOF: Result queue item was CONSUMED and stored in batch buffer for aggregation ***"); + WriteCloudFetchDebug($"CloudFetchReader: Buffer now has {_batchBuffer.Count} batches, {_currentBatchBufferSize:N0} bytes (limit: {_batchSizeLimitBytes:N0} bytes)"); + + // Check if we've reached the size limit + WriteCloudFetchDebug($"๐ŸŽฏ CloudFetchReader: Size limit check - current: {_currentBatchBufferSize:N0} bytes vs limit: {_batchSizeLimitBytes:N0} bytes, exceeded: {(_currentBatchBufferSize >= _batchSizeLimitBytes)}"); + if (_currentBatchBufferSize >= _batchSizeLimitBytes) + { + var concatTimestamp = DateTime.UtcNow.ToString("yyyy-MM-dd HH:mm:ss.fff"); + WriteCloudFetchDebug($"๐Ÿ”— [{concatTimestamp}] CloudFetchReader: EXTERNAL CALL #{currentCallNumber} - Size limit reached! 
Starting CONCATENATION of {_batchBuffer.Count} batches (this may cause result queue backup)"); + + // Create aggregated batch from all buffered batches + var aggregatedBatch = ConcatenateBatches(_batchBuffer); + + if (aggregatedBatch != null) + { + var aggregatedRowCount = aggregatedBatch.Length; + var completionTimestamp = DateTime.UtcNow.ToString("yyyy-MM-dd HH:mm:ss.fff"); + + WriteCloudFetchDebug($"โœ… [{completionTimestamp}] CloudFetchReader: EXTERNAL CALL #{currentCallNumber} COMPLETED โœ… | Aggregated {_batchBuffer.Count} files into {aggregatedRowCount:N0} rows from {_currentBatchBufferSize:N0} bytes"); + WriteCloudFetchDebug($"๐Ÿ“Š CloudFetchReader: EXTERNAL CALL #{currentCallNumber} FINAL STATISTICS: files_consumed_from_queue={filesConsumedThisCall}, batches_processed={batchesProcessedThisCall}, internal_loop_iterations={internalLoopIteration}"); + WriteCloudFetchDebug($"๐Ÿ—ƒ๏ธ CloudFetchReader: EXTERNAL CALL #{currentCallNumber} GLOBAL IMPACT: total_files_downloaded_so_far={_totalFilesDownloaded:N0}, total_file_data_processed_so_far={_totalFileDataProcessed / 1024.0 / 1024.0:F1}MB"); + + // Update counters for the aggregated batch + System.Threading.Interlocked.Increment(ref _totalBatchesReturned); + System.Threading.Interlocked.Add(ref _totalRowsReturned, aggregatedRowCount); + System.Threading.Interlocked.Add(ref _totalBytesRead, _currentBatchBufferSize); // This is now total file sizes + + // Add aggregated batch activity tags + manualActivity?.SetTag("cloudfetch.reader.result", "aggregated_batch_returned"); + manualActivity?.SetTag("cloudfetch.reader.aggregated_batch_row_count", aggregatedRowCount); + manualActivity?.SetTag("cloudfetch.reader.aggregated_from_files", _batchBuffer.Count); + manualActivity?.SetTag("cloudfetch.reader.aggregated_file_size_bytes", _currentBatchBufferSize); + manualActivity?.SetTag("cloudfetch.reader.batch_column_count", aggregatedBatch.ColumnCount); + manualActivity?.SetTag("cloudfetch.reader.new_total_batches", _totalBatchesReturned); + manualActivity?.SetTag("cloudfetch.reader.new_total_rows", _totalRowsReturned); + manualActivity?.SetTag("cloudfetch.reader.new_total_bytes", _totalBytesRead); + + manualActivity?.AddEvent($"cloudfetch.reader.aggregated_batch_returned: {aggregatedRowCount:N0} rows from {_batchBuffer.Count} files ({_currentBatchBufferSize:N0} bytes)"); + + // Clear buffer for next aggregation + ClearBatchBuffer(); + + WriteCloudFetchDebug($"CloudFetchReader: Batch aggregation COMPLETED - returning {aggregatedRowCount:N0} rows to consumer, result queue should now start draining faster"); + + // Set activity status + if (manualActivity != null) + { + manualActivity.SetStatus(System.Diagnostics.ActivityStatusCode.Ok); + } + + return aggregatedBatch; + } + } + + // Size limit not reached yet, continue collecting more batches + WriteCloudFetchDebug($"๐Ÿ“ˆ CloudFetchReader: Size limit NOT reached ({_currentBatchBufferSize:N0} < {_batchSizeLimitBytes:N0} bytes), continuing to collect more batches..."); + WriteCloudFetchDebug($"๐Ÿ”„ CloudFetchReader: EXTERNAL CALL #{currentCallNumber} - Will continue internal loop to get NEXT file from queue"); + continue; } else { + WriteCloudFetchDebug($"๐Ÿ’€ CloudFetchReader: INTERNAL LOOP #{internalLoopIteration} - Current stream EXHAUSTED (returned null) - disposing current reader"); + WriteCloudFetchDebug($"๐Ÿ“Š CloudFetchReader: *** File #{filesConsumedThisCall} contained {batchesProcessedThisCall} RecordBatch objects - will need to get NEXT file from result queue ***"); + 
WriteCloudFetchDebug($"๐Ÿ”„ CloudFetchReader: EXTERNAL CALL #{currentCallNumber} - File #{filesConsumedThisCall} processing complete, buffer_size={_currentBatchBufferSize:N0}, limit={_batchSizeLimitBytes:N0}"); + manualActivity?.AddEvent("cloudfetch.reader.current_stream_exhausted"); + // Clean up the current reader and download result + WriteCloudFetchDebug($"๐Ÿงน CloudFetchReader: INTERNAL LOOP #{internalLoopIteration} - Disposing current reader and download result"); this.currentReader.Dispose(); this.currentReader = null; @@ -115,57 +450,241 @@ public CloudFetchReader( this.currentDownloadResult.Dispose(); this.currentDownloadResult = null; } + + WriteCloudFetchDebug($"โœ… CloudFetchReader: INTERNAL LOOP #{internalLoopIteration} - Cleanup complete, currentReader=NULL, currentDownloadResult=NULL"); + WriteCloudFetchDebug($"๐Ÿค” CloudFetchReader: EXTERNAL CALL #{currentCallNumber} - Should we continue or return? buffer_size={_currentBatchBufferSize:N0} vs limit={_batchSizeLimitBytes:N0}"); + + // CRITICAL: Check if we have accumulated data in buffer AND have reached size limit + if (_batchBuffer.Count > 0 && _currentBatchBufferSize >= _batchSizeLimitBytes) + { + WriteCloudFetchDebug($"๐Ÿ”— CloudFetchReader: Stream exhausted AND size limit reached - we have {_batchBuffer.Count} batches in buffer ({_currentBatchBufferSize:N0} bytes >= {_batchSizeLimitBytes:N0}) - returning aggregated batch"); + + // Create aggregated batch from all buffered batches + var aggregatedBatch = ConcatenateBatches(_batchBuffer); + + if (aggregatedBatch != null) + { + var aggregatedRowCount = aggregatedBatch.Length; + + WriteCloudFetchDebug($"CloudFetchReader: CALL #{currentCallNumber} COMPLETED โœ… | Aggregated {_batchBuffer.Count} remaining files into {aggregatedRowCount:N0} rows from {_currentBatchBufferSize:N0} bytes"); + + // Update counters for the aggregated batch + System.Threading.Interlocked.Increment(ref _totalBatchesReturned); + System.Threading.Interlocked.Add(ref _totalRowsReturned, aggregatedRowCount); + System.Threading.Interlocked.Add(ref _totalBytesRead, _currentBatchBufferSize); + + // Add aggregated batch activity tags + manualActivity?.SetTag("cloudfetch.reader.result", "final_aggregated_batch_returned"); + manualActivity?.SetTag("cloudfetch.reader.aggregated_batch_row_count", aggregatedRowCount); + manualActivity?.SetTag("cloudfetch.reader.aggregated_from_files", _batchBuffer.Count); + manualActivity?.SetTag("cloudfetch.reader.aggregated_file_size_bytes", _currentBatchBufferSize); + manualActivity?.SetTag("cloudfetch.reader.batch_column_count", aggregatedBatch.ColumnCount); + manualActivity?.SetTag("cloudfetch.reader.new_total_batches", _totalBatchesReturned); + manualActivity?.SetTag("cloudfetch.reader.new_total_rows", _totalRowsReturned); + manualActivity?.SetTag("cloudfetch.reader.new_total_bytes", _totalBytesRead); + + manualActivity?.AddEvent($"cloudfetch.reader.final_aggregated_batch_returned: {aggregatedRowCount:N0} rows from {_batchBuffer.Count} files ({_currentBatchBufferSize:N0} bytes)"); + + // Clear buffer for next aggregation + ClearBatchBuffer(); + + // Set activity status + if (manualActivity != null) + { + manualActivity.SetStatus(System.Diagnostics.ActivityStatusCode.Ok); + } + + return aggregatedBatch; + } + else + { + // If concatenation failed, clear buffer and continue + WriteCloudFetchDebug("CloudFetchReader: Failed to concatenate remaining batches, clearing buffer"); + ClearBatchBuffer(); + } + } + else if (_batchBuffer.Count > 0) + { + WriteCloudFetchDebug($"๐Ÿ”„ 
CloudFetchReader: Stream exhausted but size limit NOT reached ({_currentBatchBufferSize:N0} < {_batchSizeLimitBytes:N0}) - continuing to get NEXT file from queue"); + WriteCloudFetchDebug($"๐Ÿ“Š CloudFetchReader: Current progress: {_batchBuffer.Count} batches, {_currentBatchBufferSize:N0} bytes - need {_batchSizeLimitBytes - _currentBatchBufferSize:N0} more bytes"); + // Continue the loop to get the next file + } + else + { + WriteCloudFetchDebug($"๐Ÿคท CloudFetchReader: Stream exhausted with empty buffer - continuing to get NEXT file from queue"); + // Continue the loop to get the next file + } } } // If we don't have a current reader, get the next downloaded file if (this.downloadManager != null) { + WriteCloudFetchDebug($"๐Ÿ”„ CloudFetchReader: INTERNAL LOOP #{internalLoopIteration} - NO current reader, need to get NEXT file from result queue"); // Start the download manager if it's not already started if (!this.isPrefetchEnabled) { + manualActivity?.SetTag("cloudfetch.reader.error", "prefetch_disabled"); throw new InvalidOperationException("Prefetch must be enabled."); } try { - // Get the next downloaded file + WriteCloudFetchDebug("CloudFetchReader: Requesting next downloaded file from manager"); + manualActivity?.AddEvent("cloudfetch.reader.requesting_next_downloaded_file"); + + // Get the next downloaded file - THIS IS WHERE QUEUE ITEMS ARE CONSUMED FOR BATCH BUILDING + var internalTimestamp = DateTime.UtcNow.ToString("yyyy-MM-dd HH:mm:ss.fff"); + WriteCloudFetchDebug($"๐Ÿ”„ [{internalTimestamp}] CloudFetchReader: EXTERNAL CALL #{currentCallNumber} - INTERNAL LOOP requesting next file from result queue for BATCH AGGREGATION"); this.currentDownloadResult = await this.downloadManager.GetNextDownloadedFileAsync(cancellationToken); + if (this.currentDownloadResult == null) { + WriteCloudFetchDebug($"๐Ÿ“Š CloudFetchReader: *** PROOF: Item was CONSUMED from result queue for batch building immediately (null = EndOfResultsGuard) ***"); + WriteCloudFetchDebug($"CloudFetchReader: CALL #{currentCallNumber} - No more files available (EndOfResultsGuard.Instance was encountered)"); + + // Check if we have any buffered batches to return as final aggregated batch + if (_batchBuffer.Count > 0) + { + WriteCloudFetchDebug($"CloudFetchReader: โšช FINAL BATCH AGGREGATION - Returning final aggregated batch from {_batchBuffer.Count} buffered files ({_currentBatchBufferSize:N0} bytes) - PREVENTS DATA LOSS!"); + WriteCloudFetchDebug($"CloudFetchReader: โšช This handles the case where we have < {_batchSizeLimitBytes / 1024 / 1024}MB remaining data at the end of results"); + + // Return the final aggregated batch + var finalBatch = ConcatenateBatches(_batchBuffer); + if (finalBatch != null) + { + var finalRowCount = finalBatch.Length; + + WriteCloudFetchDebug($"CloudFetchReader: CALL #{currentCallNumber} COMPLETED โœ… | Final aggregated batch: {finalRowCount:N0} rows from {_batchBuffer.Count} files ({_currentBatchBufferSize:N0} bytes)"); + + // Update counters + System.Threading.Interlocked.Increment(ref _totalBatchesReturned); + System.Threading.Interlocked.Add(ref _totalRowsReturned, finalRowCount); + System.Threading.Interlocked.Add(ref _totalBytesRead, _currentBatchBufferSize); // This is now total file sizes + + // Add final batch activity tags + manualActivity?.SetTag("cloudfetch.reader.result", "final_aggregated_batch_returned"); + manualActivity?.SetTag("cloudfetch.reader.final_batch_row_count", finalRowCount); + manualActivity?.SetTag("cloudfetch.reader.final_aggregated_from_files", 
_batchBuffer.Count); + manualActivity?.SetTag("cloudfetch.reader.final_aggregated_file_size_bytes", _currentBatchBufferSize); + manualActivity?.SetTag("cloudfetch.reader.batch_column_count", finalBatch.ColumnCount); + manualActivity?.SetTag("cloudfetch.reader.new_total_batches", _totalBatchesReturned); + manualActivity?.SetTag("cloudfetch.reader.new_total_rows", _totalRowsReturned); + manualActivity?.SetTag("cloudfetch.reader.new_total_bytes", _totalBytesRead); + + manualActivity?.AddEvent($"cloudfetch.reader.final_batch_returned: {finalRowCount:N0} rows from {_batchBuffer.Count} final files ({_currentBatchBufferSize:N0} bytes)"); + + // Clear buffer + ClearBatchBuffer(); + + // Set activity status + if (manualActivity != null) + { + manualActivity.SetStatus(System.Diagnostics.ActivityStatusCode.Ok); + } + + return finalBatch; + } + } + + WriteCloudFetchDebug($"CloudFetchReader: END OF RESULTS ๐Ÿ"); + WriteCloudFetchDebug($"๐Ÿ† CloudFetchReader: FINAL SUMMARY - Total ReadNextRecordBatchAsync calls: {currentCallNumber}, Total batches returned: {_totalBatchesReturned}, Total rows: {_totalRowsReturned:N0}"); + WriteCloudFetchDebug($"๐Ÿ—ƒ๏ธ CloudFetchReader: FINAL FILE STATISTICS - Total files downloaded: {_totalFilesDownloaded:N0}, Total file data processed: {_totalFileDataProcessed:N0} bytes ({_totalFileDataProcessed / 1024.0 / 1024.0:F1}MB)"); + WriteCloudFetchDebug($"๐Ÿ“Š CloudFetchReader: FILE PROCESSING EFFICIENCY - Average downloaded file size: {(_totalFilesDownloaded > 0 ? _totalFileDataProcessed / _totalFilesDownloaded / 1024.0 / 1024.0 : 0):F1}MB, Total RecordBatch data aggregated: {_totalBytesRead:N0} bytes ({_totalBytesRead / 1024.0 / 1024.0:F1}MB)"); + manualActivity?.SetTag("cloudfetch.reader.result", "no_more_files"); + manualActivity?.AddEvent("cloudfetch.reader.end_of_results"); + manualActivity?.SetTag("cloudfetch.reader.final_call_count", currentCallNumber); + manualActivity?.SetTag("cloudfetch.reader.final_total_batches", _totalBatchesReturned); + manualActivity?.SetTag("cloudfetch.reader.final_total_rows", _totalRowsReturned); + manualActivity?.SetTag("cloudfetch.reader.final_total_bytes", _totalBytesRead); + manualActivity?.SetTag("cloudfetch.reader.final_total_files_downloaded", _totalFilesDownloaded); + manualActivity?.SetTag("cloudfetch.reader.final_total_file_data_processed", _totalFileDataProcessed); + + // Set final activity status + if (manualActivity != null) + { + WriteCloudFetchDebug($"CloudFetchReader: Setting final activity status to OK"); + manualActivity.SetStatus(System.Diagnostics.ActivityStatusCode.Ok); + WriteCloudFetchDebug($"CloudFetchReader: Final activity status: {manualActivity.Status}"); + } + + WriteCloudFetchDebug($"CloudFetchReader: Attempting to trigger OpenTelemetry export by sleeping 100ms..."); + await System.Threading.Tasks.Task.Delay(100, cancellationToken); + this.downloadManager.Dispose(); this.downloadManager = null; // No more files return null; } + filesConsumedThisCall++; + var totalFilesDownloaded = System.Threading.Interlocked.Increment(ref _totalFilesDownloaded); + var totalFileDataProcessed = System.Threading.Interlocked.Add(ref _totalFileDataProcessed, this.currentDownloadResult.Size); + + WriteCloudFetchDebug($"๐ŸŽฏ CloudFetchReader: FILE #{filesConsumedThisCall} RECEIVED for BATCH BUILDING - size={this.currentDownloadResult.Size:N0} bytes ({this.currentDownloadResult.Size / 1024.0 / 1024.0:F1}MB downloaded)"); + WriteCloudFetchDebug($"๐Ÿ“Š CloudFetchReader: *** PROOF: Item #{filesConsumedThisCall} was CONSUMED from result 
queue for batch building immediately (received non-null item) ***"); + WriteCloudFetchDebug($"๐Ÿ“Š CloudFetchReader: EXTERNAL CALL #{currentCallNumber} statistics so far: files_consumed={filesConsumedThisCall}, batches_processed={batchesProcessedThisCall}, buffer_count={_batchBuffer.Count}"); + WriteCloudFetchDebug($"๐Ÿ—ƒ๏ธ CloudFetchReader: GLOBAL FILE STATISTICS - Total files downloaded: {totalFilesDownloaded:N0}, Total file data processed: {totalFileDataProcessed:N0} bytes ({totalFileDataProcessed / 1024.0 / 1024.0:F1}MB)"); + manualActivity?.AddEvent("cloudfetch.reader.new_download_received"); + manualActivity?.SetTag("cloudfetch.reader.download_size", this.currentDownloadResult.Size); + await this.currentDownloadResult.DownloadCompletedTask; // Create a new reader for the downloaded file try { + manualActivity?.AddEvent("cloudfetch.reader.creating_arrow_stream_reader"); this.currentReader = new ArrowStreamReader(this.currentDownloadResult.DataStream); + manualActivity?.AddEvent("cloudfetch.reader.arrow_stream_reader_created"); continue; } catch (Exception ex) { + manualActivity?.SetTag("cloudfetch.reader.error", "arrow_reader_creation_failed"); + manualActivity?.SetTag("cloudfetch.reader.error_message", ex.Message); + manualActivity?.AddEvent($"cloudfetch.reader.error: Failed to create Arrow reader - {ex.Message}"); + Debug.WriteLine($"Error creating Arrow reader: {ex.Message}"); this.currentDownloadResult.Dispose(); this.currentDownloadResult = null; throw; } } - catch (Exception ex) + catch (Exception ex) when (!(ex is InvalidOperationException && ex.Message.Contains("Arrow reader"))) { + manualActivity?.SetTag("cloudfetch.reader.error", "download_failed"); + manualActivity?.SetTag("cloudfetch.reader.error_message", ex.Message); + manualActivity?.AddEvent($"cloudfetch.reader.error: Failed to get downloaded file - {ex.Message}"); + Debug.WriteLine($"Error getting next downloaded file: {ex.Message}"); throw; } } // If we get here, there are no more files + manualActivity?.SetTag("cloudfetch.reader.result", "no_download_manager"); + manualActivity?.AddEvent("cloudfetch.reader.no_more_results"); return null; } - }); + } + catch (Exception ex) + { + WriteCloudFetchDebug($"CloudFetchReader: Exception in call #{callNumber}: {ex.Message}"); + manualActivity?.SetStatus(System.Diagnostics.ActivityStatusCode.Error); + manualActivity?.SetTag("cloudfetch.reader.exception", ex.Message); + throw; + } + finally + { + if (manualActivity != null) + { + WriteCloudFetchDebug($"CloudFetchReader: Activity completed with status: {manualActivity.Status}"); + if (manualActivity.Status == System.Diagnostics.ActivityStatusCode.Unset) + { + manualActivity.SetStatus(System.Diagnostics.ActivityStatusCode.Ok); + } + } + } } protected override void Dispose(bool disposing) @@ -187,6 +706,13 @@ protected override void Dispose(bool disposing) this.downloadManager.Dispose(); this.downloadManager = null; } + + // Clean up batch buffer + if (disposing) + { + ClearBatchBuffer(); + } + base.Dispose(disposing); } } diff --git a/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchResultFetcher.cs b/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchResultFetcher.cs index acbd2589b3..6be82badc8 100644 --- a/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchResultFetcher.cs +++ b/csharp/src/Drivers/Databricks/Reader/CloudFetch/CloudFetchResultFetcher.cs @@ -19,6 +19,7 @@ using System.Collections.Concurrent; using System.Collections.Generic; using System.Diagnostics; +using System.IO; using System.Linq; 
using System.Threading; using System.Threading.Tasks; @@ -48,6 +49,27 @@ internal class CloudFetchResultFetcher : ICloudFetchResultFetcher private Task? _fetchTask; private CancellationTokenSource? _cancellationTokenSource; private Exception? _error; + + #region Debug Helpers + + private void WriteCloudFetchDebug(string message) + { + try + { + var debugFile = System.IO.Path.Combine( + Environment.GetFolderPath(Environment.SpecialFolder.LocalApplicationData), + "adbc-cloudfetch-debug.log" + ); + var timestamped = $"[{DateTime.UtcNow:yyyy-MM-dd HH:mm:ss}] CloudFetchResultFetcher: {message}"; + System.IO.File.AppendAllText(debugFile, timestamped + Environment.NewLine); + } + catch + { + // Ignore any debug logging errors to avoid interfering with main functionality + } + } + + #endregion private long _batchSize; /// @@ -96,8 +118,11 @@ public CloudFetchResultFetcher( /// public async Task StartAsync(CancellationToken cancellationToken) { + WriteCloudFetchDebug($"๐ŸŽฌ CloudFetchResultFetcher.StartAsync CALLED - About to start URL fetching process"); + if (_fetchTask != null) { + WriteCloudFetchDebug($"โŒ CloudFetchResultFetcher.StartAsync: Fetcher is already running!"); throw new InvalidOperationException("Fetcher is already running."); } @@ -108,11 +133,17 @@ public async Task StartAsync(CancellationToken cancellationToken) _error = null; _urlsByOffset.Clear(); + WriteCloudFetchDebug($"๐Ÿ”ง CloudFetchResultFetcher.StartAsync: State reset, about to create fetch task"); + _cancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); _fetchTask = FetchResultsAsync(_cancellationTokenSource.Token); + WriteCloudFetchDebug($"โœ… CloudFetchResultFetcher.StartAsync: Fetch task created, yielding control"); + // Wait for the fetch task to start await Task.Yield(); + + WriteCloudFetchDebug($"๐Ÿš€ CloudFetchResultFetcher.StartAsync: COMPLETED - Fetch task should now be running"); } /// @@ -190,7 +221,7 @@ public async Task StopAsync() } } - Trace.TraceWarning($"Failed to fetch URL for offset {offset}"); + System.Diagnostics.Trace.TraceWarning($"Failed to fetch URL for offset {offset}"); return null; } finally @@ -218,6 +249,12 @@ public void ClearCache() private async Task FetchResultsAsync(CancellationToken cancellationToken) { + int totalLinksRetrieved = 0; + int fetchIteration = 0; + + WriteCloudFetchDebug($"๐Ÿš€ CloudFetchResultFetcher.FetchResultsAsync STARTED - Will retrieve cloud file links from server"); + WriteCloudFetchDebug($"๐Ÿ“Š CloudFetchResultFetcher: Initial state - has_more_results={_hasMoreResults}, is_completed={_isCompleted}"); + try { // Process direct results first, if available @@ -225,26 +262,52 @@ private async Task FetchResultsAsync(CancellationToken cancellationToken) && directResults!.ResultSet?.Results?.ResultLinks?.Count > 0) || _initialResults?.Results?.ResultLinks?.Count > 0) { + // Count links from direct results and initial results + int directResultLinks = directResults?.ResultSet?.Results?.ResultLinks?.Count ?? 0; + int initialResultLinks = _initialResults?.Results?.ResultLinks?.Count ?? 
0; + int directLinksTotal = directResultLinks + initialResultLinks; + totalLinksRetrieved += directLinksTotal; + + WriteCloudFetchDebug($"๐Ÿ”— DIRECT RESULTS PROCESSING - Found {directLinksTotal} cloud file links (direct: {directResultLinks}, initial: {initialResultLinks})"); + WriteCloudFetchDebug($"๐Ÿ“Š TOTAL LINKS SO FAR: {totalLinksRetrieved}"); + // Yield execution so the download queue doesn't get blocked before downloader is started await Task.Yield(); ProcessDirectResultsAsync(cancellationToken); } // Continue fetching as needed + WriteCloudFetchDebug($"๐Ÿ”„ CloudFetchResultFetcher: About to enter main fetch loop - has_more_results={_hasMoreResults}, cancellation_requested={cancellationToken.IsCancellationRequested}"); while (_hasMoreResults && !cancellationToken.IsCancellationRequested) { + fetchIteration++; + int downloadQueueCountBefore = _downloadQueue.Count; + try { + WriteCloudFetchDebug($"๐Ÿ”„ ITERATION #{fetchIteration} - About to fetch next result batch from server"); + WriteCloudFetchDebug($"๐Ÿ“Š Download queue count before fetch: {downloadQueueCountBefore}"); + // Fetch more results from the server await FetchNextResultBatchAsync(null, cancellationToken).ConfigureAwait(false); + + int downloadQueueCountAfter = _downloadQueue.Count; + int newLinksThisIteration = downloadQueueCountAfter - downloadQueueCountBefore; + totalLinksRetrieved += newLinksThisIteration; + + WriteCloudFetchDebug($"โœ… ITERATION #{fetchIteration} COMPLETED - Added {newLinksThisIteration} new cloud file links"); + WriteCloudFetchDebug($"๐Ÿ“Š Download queue count after fetch: {downloadQueueCountAfter}"); + WriteCloudFetchDebug($"๐Ÿ—ƒ๏ธ TOTAL LINKS RETRIEVED SO FAR: {totalLinksRetrieved} (across {fetchIteration} fetch iterations)"); } catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) { + WriteCloudFetchDebug($"โŒ ITERATION #{fetchIteration} CANCELLED - Operation was cancelled"); // Expected when cancellation is requested break; } catch (Exception ex) { + WriteCloudFetchDebug($"โŒ ITERATION #{fetchIteration} FAILED - Error fetching results: {ex.Message}"); Debug.WriteLine($"Error fetching results: {ex.Message}"); _error = ex; _hasMoreResults = false; @@ -252,12 +315,29 @@ private async Task FetchResultsAsync(CancellationToken cancellationToken) } } + // Log final summary before adding end guard + WriteCloudFetchDebug($"๐Ÿ FETCH PROCESS COMPLETED"); + WriteCloudFetchDebug($"๐Ÿ† FINAL SUMMARY - Total fetch iterations: {fetchIteration}, Total cloud file links retrieved: {totalLinksRetrieved}"); + if (fetchIteration > 0) + { + double averageLinksPerIteration = (double)totalLinksRetrieved / fetchIteration; + WriteCloudFetchDebug($"๐Ÿ“Š EFFICIENCY - Average links per iteration: {averageLinksPerIteration:F1}"); + } + // Add the end of results guard to the queue + WriteCloudFetchDebug($"๐Ÿ”’ Adding EndOfResultsGuard to download queue"); _downloadQueue.Add(EndOfResultsGuard.Instance, cancellationToken); + + // CRITICAL FIX: Mark download queue as completed after adding EndOfResultsGuard + WriteCloudFetchDebug($"๐Ÿ”’ Marking download queue as COMPLETED - no more URLs will be fetched"); + _downloadQueue.CompleteAdding(); _isCompleted = true; } catch (Exception ex) { + WriteCloudFetchDebug($"๐Ÿ’ฅ UNHANDLED ERROR in fetcher: {ex.Message}"); + WriteCloudFetchDebug($"๐Ÿ† ERROR SUMMARY - Completed {fetchIteration} fetch iterations, Retrieved {totalLinksRetrieved} cloud file links before error"); + Debug.WriteLine($"Unhandled error in fetcher: {ex.Message}"); _error = ex; 
_hasMoreResults = false; @@ -266,17 +346,32 @@ private async Task FetchResultsAsync(CancellationToken cancellationToken) // Add the end of results guard to the queue even in case of error try { + WriteCloudFetchDebug($"๐Ÿ”’ Adding EndOfResultsGuard to download queue after error"); _downloadQueue.TryAdd(EndOfResultsGuard.Instance, 0); + + WriteCloudFetchDebug($"๐Ÿ”’ Marking download queue as COMPLETED after error"); + _downloadQueue.CompleteAdding(); } - catch (Exception) + catch (Exception completionEx) { - // Ignore any errors when adding the guard in case of error + WriteCloudFetchDebug($"โŒ Failed to add EndOfResultsGuard or complete download queue after error: {completionEx.Message}"); + // Still try to mark the queue as completed + try + { + _downloadQueue.CompleteAdding(); + } + catch (Exception) + { + WriteCloudFetchDebug($"โŒ Failed to complete download queue after error - this may cause hangs"); + } } } } private async Task FetchNextResultBatchAsync(long? offset, CancellationToken cancellationToken) { + WriteCloudFetchDebug($"๐ŸŒ FetchNextResultBatchAsync STARTED - offset={offset}, start_offset={_startOffset}"); + // Create fetch request TFetchResultsReq request = new TFetchResultsReq(_response.OperationHandle!, TFetchOrientation.FETCH_NEXT, _batchSize); @@ -287,6 +382,8 @@ private async Task FetchNextResultBatchAsync(long? offset, CancellationToken can request.StartRowOffset = startOffset; } + WriteCloudFetchDebug($"๐ŸŒ FetchNextResultBatchAsync: About to call Thrift Client.FetchResults - timeout={_statement.QueryTimeoutSeconds}s"); + // Fetch results TFetchResultsResp response; try @@ -296,15 +393,20 @@ private async Task FetchNextResultBatchAsync(long? offset, CancellationToken can using var timeoutTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(_statement.QueryTimeoutSeconds)); using var combinedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, timeoutTokenSource.Token); + WriteCloudFetchDebug($"๐ŸŒ FetchNextResultBatchAsync: CALLING Thrift _statement.Client.FetchResults() - THIS MAY HANG"); response = await _statement.Client.FetchResults(request, combinedTokenSource.Token).ConfigureAwait(false); + WriteCloudFetchDebug($"๐ŸŒ FetchNextResultBatchAsync: Thrift Client.FetchResults COMPLETED successfully"); } catch (Exception ex) { + WriteCloudFetchDebug($"โŒ FetchNextResultBatchAsync: Error fetching results from server: {ex.Message}"); Debug.WriteLine($"Error fetching results from server: {ex.Message}"); _hasMoreResults = false; throw; } + WriteCloudFetchDebug($"๐ŸŒ FetchNextResultBatchAsync: Processing response - checking for result links"); + // Check if we have URL-based results if (response.Results.__isset.resultLinks && response.Results.ResultLinks != null && @@ -336,12 +438,16 @@ private async Task FetchNextResultBatchAsync(long? 
offset, CancellationToken can // Update whether there are more results _hasMoreResults = response.HasMoreRows; + WriteCloudFetchDebug($"โœ… FetchNextResultBatchAsync: Added {resultLinks.Count} result links, has_more_results={_hasMoreResults}"); } else { // No more results _hasMoreResults = false; + WriteCloudFetchDebug($"๐Ÿ FetchNextResultBatchAsync: No more result links found, has_more_results={_hasMoreResults}"); } + + WriteCloudFetchDebug($"๐ŸŒ FetchNextResultBatchAsync COMPLETED - has_more_results={_hasMoreResults}"); } private void ProcessDirectResultsAsync(CancellationToken cancellationToken) diff --git a/csharp/src/Telemetry/Traces/Exporters/FileExporter/FileExporterExtensions.cs b/csharp/src/Telemetry/Traces/Exporters/FileExporter/FileExporterExtensions.cs index b35ca43f33..210dbd1c90 100644 --- a/csharp/src/Telemetry/Traces/Exporters/FileExporter/FileExporterExtensions.cs +++ b/csharp/src/Telemetry/Traces/Exporters/FileExporter/FileExporterExtensions.cs @@ -87,6 +87,7 @@ public static OpenTelemetry.Trace.TracerProviderBuilder AddAdbcFileExporter( if (FileExporter.TryCreate(options, out FileExporter? fileExporter)) { // Only add a new processor if there isn't already one listening for the source/location. + // Use default BatchActivityExportProcessor for now - focus on diagnosing TracerProvider listener issue first return builder.AddProcessor(_ => new BatchActivityExportProcessor(fileExporter!)); } return builder; @@ -129,6 +130,7 @@ public static OpenTelemetry.Trace.TracerProviderBuilder AddAdbcFileExporter( if (FileExporter.TryCreate(fileBaseName, traceLocation, maxTraceFileSizeKb.Value, maxTraceFiles.Value, out FileExporter? fileExporter)) { // Only add a new processor if there isn't already one listening for the source/location. + // Use default BatchActivityExportProcessor for now - focus on diagnosing TracerProvider listener issue first return builder.AddProcessor(_ => new BatchActivityExportProcessor(fileExporter!)); } return builder; diff --git a/csharp/src/Telemetry/Traces/Exporters/readme.md b/csharp/src/Telemetry/Traces/Exporters/readme.md index 8b743c7e71..21384cb6ba 100644 --- a/csharp/src/Telemetry/Traces/Exporters/readme.md +++ b/csharp/src/Telemetry/Traces/Exporters/readme.md @@ -49,6 +49,6 @@ The following exporters are supported: | Exporter | Description | | --- | --- | | `otlp` | Exports traces to an OpenTelemetry Collector or directly to an Open Telemetry Line Protocol (OTLP) endpoint. | -| `file` | Exports traces to rotating files in a folder. | +| `adbcfile` | Exports traces to rotating files in a folder. | | `console` | Exports traces to the console output. | | `none` | Disables trace exporting. | diff --git a/csharp/test/Drivers/Databricks/E2E/CloudFetchE2ETest.cs b/csharp/test/Drivers/Databricks/E2E/CloudFetchE2ETest.cs index 31bd2e87df..7530c957e8 100644 --- a/csharp/test/Drivers/Databricks/E2E/CloudFetchE2ETest.cs +++ b/csharp/test/Drivers/Databricks/E2E/CloudFetchE2ETest.cs @@ -46,7 +46,7 @@ public static IEnumerable TestCases() yield return new object[] { smallQuery, 1000, true, false }; yield return new object[] { smallQuery, 1000, false, false }; - string largeQuery = $"SELECT * FROM main.tpcds_sf10_delta.catalog_sales LIMIT 1000000"; + string largeQuery = $"SELECT * FROM main.tpcds_sf10_delta.catalog_sales"; yield return new object[] { largeQuery, 1000000, true, true }; yield return new object[] { largeQuery, 1000000, false, true }; yield return new object[] { largeQuery, 1000000, true, false };
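
## **Appendix: Sketches of the Patterns Used in This Patch**

The notes below are reviewer sketches, not part of the patch itself: each restates one pattern the diff uses as a small self-contained example. Helper and type names are illustrative unless they appear in the diff.

### **Task.WhenAny Watchdog (Suspects #1-#4)**

The four "SUSPECT" sites in `CloudFetchDownloader.DownloadFileAsync` all instrument a potentially hanging await the same way: race the real task against a `Task.Delay`, log if the delay wins, then await the real task anyway so no result or exception is lost. A minimal sketch of that pattern, with an assumed helper name and log delegate (not part of the driver):

```csharp
using System;
using System.Threading.Tasks;

static class HangDiagnostics
{
    // Awaits `work`, logging if it is still running after `timeout`.
    // The real task is always awaited to completion, so results and
    // exceptions propagate exactly as they would without the watchdog.
    public static async Task<T> AwaitWithWatchdogAsync<T>(
        Task<T> work, TimeSpan timeout, string label, Action<string> log)
    {
        Task delay = Task.Delay(timeout);
        if (await Task.WhenAny(work, delay).ConfigureAwait(false) == delay)
        {
            log($"{label}: still running after {timeout.TotalSeconds:F0}s - possible hang");
        }
        return await work.ConfigureAwait(false);
    }
}
```

With a helper like this, each suspect site reduces to one call, e.g. `fileData = await HangDiagnostics.AwaitWithWatchdogAsync(response.Content.ReadAsByteArrayAsync(), TimeSpan.FromSeconds(15), "DATA READING", WriteCloudFetchDebug);`. Note the delay task is never cancelled; as in the diff, the stray timer is accepted as debug-only overhead.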
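### **Non-Blocking Memory Reservation**

`AcquireMemoryAsync` polls `TryAcquireMemory` every 10 ms and logs pressure every ~100 attempts (roughly once per second). The diff does not show `TryAcquireMemory` itself; the sketch below is an assumed lock-free implementation, consistent with the `Interlocked.Add(ref _usedMemory, -size)` seen in `ReleaseMemory`:

```csharp
using System.Threading;

// Assumed shape of the memory budget behind TryAcquireMemory/ReleaseMemory
// (the real TryAcquireMemory is not shown in the diff).
internal sealed class MemoryBudget
{
    private readonly long _maxMemory;
    private long _usedMemory;

    public MemoryBudget(long maxBytes) => _maxMemory = maxBytes;

    // Reserve `size` bytes without blocking; false means over budget,
    // and the caller retries after a short Task.Delay, as in the diff.
    public bool TryAcquire(long size)
    {
        while (true)
        {
            long current = Interlocked.Read(ref _usedMemory);
            if (current + size > _maxMemory)
                return false;
            // Commit atomically; if another thread raced us, re-read and retry.
            if (Interlocked.CompareExchange(ref _usedMemory, current + size, current) == current)
                return true;
        }
    }

    public void Release(long size) => Interlocked.Add(ref _usedMemory, -size);
}
```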
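### **Configuring the Batch Aggregation Limit**

The reader aggregates decompressed `RecordBatch` objects until an estimated size limit is reached (20MB by default in this patch). The limit is read from the connection property key shown below; whether a full connection string requires an `adbc.databricks.` prefix for this key is not shown in the diff, so treat this as an assumed example:

```
# Raise the aggregation target from the 20MB default to 100MB
cloudfetch.batch.size.limit.mb=100
```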
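### **Sentinel + CompleteAdding Queue Shutdown**

Several "CRITICAL FIX" sites pair the `EndOfResultsGuard` sentinel with `BlockingCollection<T>.CompleteAdding()` so a consumer can never block forever on a queue that will receive no more items: the sentinel signals a normal end of stream, while `CompleteAdding` guarantees any late take or enumeration unblocks even on error paths. A self-contained sketch of that protocol (names are illustrative):

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

class QueueShutdownDemo
{
    // Sentinel object, playing the role of EndOfResultsGuard.Instance.
    static readonly object EndOfResults = new object();

    static async Task Main()
    {
        var queue = new BlockingCollection<object>(boundedCapacity: 20);

        var producer = Task.Run(() =>
        {
            for (int i = 0; i < 5; i++) queue.Add($"file-{i}");
            queue.Add(EndOfResults);  // normal end-of-stream marker for consumers
            queue.CompleteAdding();   // unblocks any consumer still waiting
        });

        foreach (var item in queue.GetConsumingEnumerable())
        {
            if (ReferenceEquals(item, EndOfResults)) break; // clean shutdown
            Console.WriteLine($"processing {item}");
        }

        await producer;
    }
}
```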
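### **What the Size Estimate Actually Computes**

`EstimateRecordBatchSize` is a width-based heuristic, not a measurement. For example, under its assumptions a 1,000,000-row batch with one `Int64` column and one `String` column is estimated at 1,000,000 × (8 + 32) = 40,000,000 bytes (~38MB), regardless of the real string payload, since every string is assumed to average 32 bytes. That is why the debug logging cross-checks the estimate against the actual decompressed file size reported by the downloader.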