Merged
Changes from 10 commits
10 changes: 5 additions & 5 deletions .github/workflows/python.yml
@@ -30,17 +30,17 @@ jobs:
with:
python-version: ${{ matrix.python-version }}
- name: setup poetry
uses: abatilo/actions-poetry@v2.0.0
uses: abatilo/actions-poetry@v3.0.2
with:
poetry-version: 1.4.2
poetry-version: 2.1.3
- name: install dependencies
run: make --silent install
- name: lint
run: make --silent pre-commit
test:
strategy:
matrix:
python-version: [ "3.7", "3.8", "3.9", "3.10"]
python-version: [ "3.10"]
os-name: [ ubuntu-22.04 ]
runs-on: ${{ matrix.os-name }}
steps:
@@ -49,9 +49,9 @@ jobs:
with:
python-version: ${{ matrix.python-version }}
- name: setup poetry
uses: abatilo/actions-poetry@v2.1.6
uses: abatilo/actions-poetry@v3.0.2
with:
poetry-version: 1.4.2
poetry-version: 2.1.3
- name: install dependencies
run: make --silent install
- name: test
2 changes: 1 addition & 1 deletion pramen-py/.env.ci
@@ -34,5 +34,5 @@ export PRAMENPY_SPARK_CONFIG='spark.master=local,
spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog,
spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension,
spark.sql.session.timeZone=Africa/Johannesburg,
spark.jars.packages=io.delta:delta-core_2.12:1.0.1,
spark.jars.packages=io.delta:delta-spark_2.12:3.3.1,
spark.jars.repositories=https://maven-central.storage-download.googleapis.com/maven2/'
2 changes: 1 addition & 1 deletion pramen-py/.env.local
@@ -34,6 +34,6 @@ export PRAMENPY_SPARK_CONFIG='spark.master=local,
spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog,
spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension,
spark.sql.session.timeZone=Africa/Johannesburg,
spark.jars.packages=io.delta:delta-core_2.12:1.0.1,
spark.jars.packages=io.delta:delta-spark_2.12:3.3.1,
spark.jars.repositories=https://maven-central.storage-download.googleapis.com/maven2/,
spark.sql.shuffle.partitions=1'
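
Both env files now pull Delta Lake through the io.delta:delta-spark_2.12 coordinate (the artifact that replaced delta-core as of Delta Lake 3.x). As a rough illustration, not part of the diff, the same settings map onto a local SparkSession along these lines, assuming pyspark is installed and the package can be resolved from the configured repository:

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .master("local")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    )
    .config("spark.jars.packages", "io.delta:delta-spark_2.12:3.3.1")
    .config("spark.sql.session.timeZone", "Africa/Johannesburg")
    .getOrCreate()
)

# Quick smoke test that the Delta data source is wired up.
spark.range(3).write.format("delta").mode("overwrite").save("/tmp/delta_smoke_test")
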
2 changes: 1 addition & 1 deletion pramen-py/Makefile
@@ -21,7 +21,7 @@ endif
install: .poetry_installed

.poetry_installed:
poetry install
poetry install --with test
touch $@

.env:
680 changes: 282 additions & 398 deletions pramen-py/poetry.lock

Large diffs are not rendered by default.

80 changes: 42 additions & 38 deletions pramen-py/pyproject.toml
@@ -14,7 +14,7 @@

[tool.black]
line-length = 79
target-version = ["py36"]
target-version = ["py310"]

[tool.isort]
atomic = true
@@ -84,65 +84,69 @@ minversion = "6.0.0"
addopts = "-vv -s --tb=native -n auto"
testpaths = "tests/"

[tool.poetry]
[project]
name = "pramen-py"
version = "1.11.1"
description = "Pramen transformations written in python"
authors = [
"Artem Zhukov <[email protected]>",
"Valerii Khalimendik <[email protected]>",
"Jiří Filip <[email protected]>"
{ name = "Artem Zhukov", email = "[email protected]" },
{ name = "Valerii Khalimendik", email = "[email protected]" },
{ name = "Jiří Filip", email = "[email protected]" }
]
maintainers = [
"Artem Zhukov <[email protected]>",
"Valerii Khalimendik <[email protected]>",
"Jiří Filip <[email protected]>"
{ name = "Artem Zhukov", email = "[email protected]" },
{ name = "Valerii Khalimendik", email = "[email protected]" },
{ name = "Jiří Filip", email = "[email protected]" }
]
readme = "README.md"
repository = "https://github.com/AbsaOSS/pramen"
packages = [
{ include = "pramen_py", from = "src" },
{ include = "pramen_py/py.typed", from = "src" },
{ include = "transformations" },
]
keywords = ["paramen", "pyspark", "transformations", "metastore"]
include = [
".env.example",
"tests/resources/real_config.yaml",
]

[tool.poetry.plugins]
[tool.poetry.plugins."pytest11"]
"pramen_py" = "pramen_py.test_utils.plugin"
requires-python = ">=3.9,<4.0"

dependencies = [
"click>=8.0.3",
"attrs>=21.4.0",
"rich>=11.1.0",
"contextvars>=2.4",
"environs>=9.5.0",
"PyYAML>=6.0",
"cattrs>=1.0.0; python_version<'3.7'",
"cattrs>=22.1.0; python_version>='3.7'",
"pyspark==3.5.7",
"loguru>=0.6.0",
"typing-extensions>=4.1.1",
"pyhocon>=0.3.59"
]

[tool.poetry.scripts]
[project.entry-points.pytest11]
pramen_py = "pramen_py.test_utils.plugin"

[project.scripts]
pramen-py = "pramen_py.app.cli:main"

[tool.poetry.dependencies]
python = ">=3.6.8,<4.0"
click = "^8.0.3"
attrs = "^21.4.0"
rich = "^11.1.0"
contextvars = "^2.4"
environs = "^9.5.0"
PyYAML = "^6.0"
cattrs = [
{version="1.0.0", python = ">=3.6.8,<3.7"},
{version="^22.1.0", python = ">=3.7,<4.0"},
[tool.poetry]
packages = [
{ include = "pramen_py", from = "src" },
{ include = "pramen_py/py.typed", from = "src" },
{ include = "transformations" },
]
pyspark = "3.1.3"
loguru = "^0.6.0"
pytest = "6.2.5"
pytest-asyncio = "0.16"
pytest-cov = "2.12.1"

[tool.poetry.group.test.dependencies]
pytest = "^6.2.5"
pytest-asyncio = "^0.16"
pytest-cov = "^2.12.1"
types-PyYAML = "^6.0.4"
chispa = "^0.9.2"
pytest-sugar = "^0.9.4"
pytest-mock = "3.6.1"
typing-extensions = "^4.1.1"
pyhocon = "^0.3.59"
pytest-mock = "^3.6.1"
setuptools = "^67.6.1"

[tool.poetry.dev-dependencies]
[tool.poetry.group.dev.dependencies]
neovim = "^0.3.1"
pdbpp = "^0.10.3"
isort = "^5.10.1"
@@ -154,5 +158,5 @@ pytest-xdist = "^2.5.0"
pyspark-stubs = "2.3.0.post2"

[build-system]
requires = ["poetry>=1.2.0"]
requires = ["poetry>=2.1.3"]
build-backend = "poetry.masonry.api"
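
With the package metadata moved to the PEP 621 [project] table, the pytest plugin and the CLI command are still published as standard entry points ([project.entry-points.pytest11] and [project.scripts]). A small check, illustrative rather than part of the PR, of how an installed pramen-py exposes them on Python 3.10+:

from importlib.metadata import entry_points

# Declared under [project.entry-points.pytest11] above.
for ep in entry_points().select(group="pytest11"):
    print("pytest plugin:", ep.name, "->", ep.value)

# Declared under [project.scripts] above.
for ep in entry_points().select(group="console_scripts"):
    if ep.name == "pramen-py":
        print("cli entry point:", ep.value)  # pramen_py.app.cli:main
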
11 changes: 9 additions & 2 deletions pramen-py/src/pramen_py/utils/__init__.py
@@ -121,7 +121,14 @@ def wrapper(
*args: object,
**kwargs: object,
) -> OPS_RET:
loop = asyncio.get_event_loop()
try:
# Try to get the running event loop (Python 3.7+)
loop = asyncio.get_running_loop()
except RuntimeError:
# No running loop, create a new one
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

return loop.run_until_complete(f(*args, **kwargs))
Comment on lines +124 to 132

⚠️ Potential issue | 🔴 Critical

Critical: run_until_complete() will fail if a loop is already running.

If asyncio.get_running_loop() succeeds (line 126), it means the event loop is already running. Calling loop.run_until_complete() (line 132) on an already-running loop raises RuntimeError: This event loop is already running. This breaks the decorator when called from async contexts.
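
A minimal reproduction of the failure mode (illustrative, not part of the original review comment):

import asyncio

async def main() -> None:
    loop = asyncio.get_running_loop()
    coro = asyncio.sleep(0)
    try:
        loop.run_until_complete(coro)  # what the decorator would attempt
    except RuntimeError as exc:
        print(exc)                     # "This event loop is already running"
    finally:
        coro.close()                   # avoid the "never awaited" warning

asyncio.run(main())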

The recommended fix depends on the intended use case:

Option 1: Prohibit calling from async contexts (simpler, matches typical sync-wrapper semantics):

     @wraps(f)
     def wrapper(
         *args: object,
         **kwargs: object,
     ) -> OPS_RET:
-        try:
-            # Try to get the running event loop (Python 3.7+)
-            loop = asyncio.get_running_loop()
-        except RuntimeError:
-            # No running loop, create a new one
-            loop = asyncio.new_event_loop()
-            asyncio.set_event_loop(loop)
-
-        return loop.run_until_complete(f(*args, **kwargs))
+        # Refuse to run from inside an already-running event loop
+        try:
+            asyncio.get_running_loop()
+        except RuntimeError:
+            pass  # no running loop, safe to proceed
+        else:
+            raise RuntimeError(
+                f"{f.__name__} cannot be called from an async context. "
+                "Call the async function directly instead."
+            )
+
+        loop = asyncio.new_event_loop()
+        asyncio.set_event_loop(loop)
+        try:
+            return loop.run_until_complete(f(*args, **kwargs))
+        finally:
+            loop.close()

Option 2: Support both sync and async contexts (more complex):

     @wraps(f)
     def wrapper(
         *args: object,
         **kwargs: object,
     ) -> OPS_RET:
         try:
-            # Try to get the running event loop (Python 3.7+)
-            loop = asyncio.get_running_loop()
+            asyncio.get_running_loop()
         except RuntimeError:
             # No running loop, create a new one
             loop = asyncio.new_event_loop()
             asyncio.set_event_loop(loop)
-
-        return loop.run_until_complete(f(*args, **kwargs))
+            try:
+                return loop.run_until_complete(f(*args, **kwargs))
+            finally:
+                loop.close()
+
+        # A loop is already running in this thread: run the coroutine on its
+        # own event loop in a worker thread instead of nesting loops.
+        # Note: this blocks the calling loop until the coroutine finishes and
+        # requires `from concurrent.futures import ThreadPoolExecutor`.
+        with ThreadPoolExecutor(max_workers=1) as pool:
+            return pool.submit(asyncio.run, f(*args, **kwargs)).result()

Recommendation: Use Option 1 unless there's a specific requirement to support calls from async contexts. In either case, close any newly created loop in a finally block to prevent resource leaks.
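
For completeness, a small usage sketch of the sync-wrapper pattern. This is illustrative only: run_sync and fetch_value are hypothetical names, and asyncio.run stands in for the new-loop-plus-run_until_complete sequence used in the PR.

import asyncio
from functools import wraps

def run_sync(f):
    # Simplified stand-in for the decorator discussed above.
    @wraps(f)
    def wrapper(*args, **kwargs):
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            return asyncio.run(f(*args, **kwargs))  # no loop running: safe
        raise RuntimeError(f"{f.__name__} cannot be called from an async context")
    return wrapper

@run_sync
async def fetch_value() -> int:
    await asyncio.sleep(0)
    return 42

print(fetch_value())  # 42 from plain sync code; raises inside a running loop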

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In pramen-py/src/pramen_py/utils/__init__.py around lines 124 to 132, the
decorator calls loop.run_until_complete() even when asyncio.get_running_loop()
succeeds which raises RuntimeError in async contexts; change behavior to
explicitly prohibit use from an already-running event loop by detecting a
running loop and raising a clear RuntimeError (or a custom exception)
instructing callers to not use the decorator from async code, and when you
create a new event loop wrap run_until_complete in try/finally to close the loop
(and restore/set_event_loop as needed) to avoid resource leaks.


return wrapper
@@ -151,7 +158,7 @@ def get_or_create_spark_session(
if force_recreate:
logger.info("Force recreating a spark session")
spark = (
SparkSession.getActiveSession()
SparkSession.getActiveSession() # type: ignore
if callable(getattr(SparkSession, "getActiveSession", None))
else SparkSession.getOrCreate() # type: ignore
)