When Cache.async_fetch is called with the same URL multiple times concurrently, the coroutines step on each other while writing to a temporary file. The symptom is a FileNotFoundError raised from os.replace():
| os.replace(tmp, output_file)
| FileNotFoundError: [Errno 2] No such file or directory: ...
It looks like this happens because the temporary file name is shared between coroutines, so one coroutine's os.replace() moves the file out from under the others.
I think this needs either a locking mechanism or a safer way to generate unique temporary filenames.
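To illustrate the second option, here is a minimal sketch, assuming the current code writes to a fixed "<output>.tmp" path and then renames it into place. The helper name atomic_write and its parameters are hypothetical, not honesty's actual API; the point is just that tempfile.mkstemp in the destination directory gives each coroutine its own temp file, so os.replace() stays an atomic same-filesystem rename and can't hit a missing file:

    import os
    import tempfile
    from pathlib import Path


    def atomic_write(output_file: Path, data: bytes) -> None:
        """Write data to output_file atomically, even with concurrent writers."""
        # Create a uniquely named temp file in the same directory as the target,
        # so os.replace() below is an atomic rename on the same filesystem.
        fd, tmp = tempfile.mkstemp(dir=output_file.parent, prefix=output_file.name + ".")
        try:
            with os.fdopen(fd, "wb") as f:
                f.write(data)
            # Last writer wins; concurrent callers no longer share a temp path.
            os.replace(tmp, output_file)
        except BaseException:
            # Clean up the temp file if anything failed before the rename.
            try:
                os.unlink(tmp)
            except FileNotFoundError:
                pass
            raise

The locking alternative would be a per-URL asyncio.Lock around the download, which would also avoid fetching the same wheel ten times; the unique-temp-name approach is just the smaller change.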
Repro script:
import asyncio
import logging
from pathlib import Path

from honesty.cache import Cache


async def main() -> None:
    """
    Fetch the same Python wheel ten times in parallel using honesty's cache.async_fetch function.

    This is a torture test to stress the honesty cache system.
    """
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)

    # Use a libcst wheel as an example
    package_name = "libcst"
    wheel_url = "https://files.pythonhosted.org/packages/b7/31/39c110eb66d5fd7cc4891cf55192a358a6be8b8f6ac0e2eb709850104456/libcst-1.8.0-cp313-cp313t-manylinux_2_28_armv7l.manylinux_2_31_armv7l.whl"

    logger.info(f"Starting parallel fetch of {package_name} wheel from {wheel_url}")

    # Create a cache with a fresh index
    async with Cache(
        fresh_index=True,
    ) as cache:
        # Use a task group to run 10 fetch tasks in parallel
        tasks = []
        async with asyncio.TaskGroup() as tg:
            for i in range(10):
                task = tg.create_task(fetch_wheel(cache, package_name, wheel_url, i))
                tasks.append(task)

        # Collect results after all tasks have completed
        results = [task.result() for task in tasks]

        # Check whether all fetches were successful
        successful = all(result is not None for result in results)
        logger.info(f"All fetches {'successful' if successful else 'failed'}")


async def fetch_wheel(
    cache: Cache, package_name: str, wheel_url: str, index: int
) -> Path:
    """Fetch a wheel and return the path to the downloaded file."""
    logger = logging.getLogger(__name__)
    logger.info(f"Starting fetch {index} for {package_name}")
    try:
        wheel_file = await cache.async_fetch(package_name, wheel_url)
        logger.info(f"Fetch {index} completed: {wheel_file}")
        return wheel_file
    except Exception as e:
        logger.error(f"Fetch {index} failed: {e}")
        raise


if __name__ == "__main__":
    asyncio.run(main())