Skip to content

Commit 2da0e52

Browse files
committed
Set up ducklake catalog in dev environment
1 parent 724ecc9 commit 2da0e52

File tree

9 files changed

+280
-7
lines changed

9 files changed

+280
-7
lines changed

bin/check_ducklake_up

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
#!/usr/bin/env python3
2+
3+
from __future__ import annotations
4+
5+
import time
6+
7+
import duckdb
8+
9+
from ducklake_common import (
10+
attach_catalog,
11+
configure_connection,
12+
get_config,
13+
initialize_ducklake,
14+
run_smoke_check,
15+
)
16+
17+
HEALTHCHECK_ALIAS = "ducklake_dev_health"
18+
19+
20+
def check_once(config: dict[str, str]) -> None:
    """Run one DuckLake health probe on a throwaway DuckDB connection.

    The probe configures the connection without installing the extension,
    attaches the catalog under ``HEALTHCHECK_ALIAS`` and runs the smoke check.
    Any exception raised by those steps propagates to the caller; the
    connection is closed whether or not the probe succeeds.
    """
    probe = duckdb.connect()
    try:
        configure_connection(probe, config, install_extension=False)
        attach_catalog(probe, config, alias=HEALTHCHECK_ALIAS)
        run_smoke_check(probe, alias=HEALTHCHECK_ALIAS)
    finally:
        # Always release the connection, even when a step above failed.
        probe.close()
28+
29+
30+
def _report_and_recover(config: dict[str, str], failure: Exception) -> None:
    """Log a failed health check and make one attempt to (re)initialize DuckLake."""
    print(f"Awaiting DuckLake warmup... ({failure})")
    try:
        if initialize_ducklake(config, alias="ducklake_dev"):
            print("Reinitialized DuckLake catalog, retrying health check...")
    except Exception as setup_exc:  # noqa: BLE001
        # A failed recovery is only reported; the caller keeps polling.
        print(f"DuckLake initialization attempt failed ({setup_exc})")


def main() -> int:
    """Poll DuckLake until a health check passes, then return exit code 0.

    The configuration is re-read on every iteration. After each failed check
    an initialization attempt is made and the loop sleeps for one second
    before trying again. KeyboardInterrupt is not an ``Exception`` subclass,
    so it is never swallowed and ends the loop immediately.
    """
    while True:
        config = get_config()
        try:
            check_once(config)
        except Exception as exc:  # noqa: BLE001 - any failure should trigger a retry
            _report_and_recover(config, exc)
            time.sleep(1)
        else:
            print("DuckLake is up!")
            return 0


if __name__ == "__main__":
    raise SystemExit(main())

bin/ducklake_common.py

Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,162 @@
1+
#!/usr/bin/env python3
2+
3+
from __future__ import annotations
4+
5+
import os
6+
from urllib.parse import urlparse
7+
8+
import duckdb
9+
import psycopg
10+
from psycopg import sql
11+
12+
# Fallback values used for any DuckLake setting that is missing from the environment.
DEFAULTS: dict[str, str] = {
    "DUCKLAKE_CATALOG_DSN": "ducklake:postgres:dbname=ducklake_catalog host=localhost user=posthog password=posthog",
    "DUCKLAKE_DATA_BUCKET": "ducklake-dev",
    "DUCKLAKE_DATA_ENDPOINT": "http://localhost:19000",
    "DUCKLAKE_S3_ACCESS_KEY": "object_storage_root_user",
    "DUCKLAKE_S3_SECRET_KEY": "object_storage_root_password",
}


def get_config() -> dict[str, str]:
    """Return the DuckLake settings, preferring environment variables over ``DEFAULTS``.

    A variable that is unset, or set to an empty string, resolves to its default.
    """
    resolved: dict[str, str] = {}
    for name, fallback in DEFAULTS.items():
        resolved[name] = os.environ.get(name) or fallback
    return resolved
23+
24+
25+
def escape(value: str) -> str:
    """Double every single quote so *value* can be embedded in a single-quoted SQL literal."""
    return "''".join(value.split("'"))
27+
28+
29+
def normalize_endpoint(raw_endpoint: str) -> tuple[str, bool]:
    """Split an endpoint setting into a bare ``host[:port]`` string and an SSL flag.

    A blank value falls back to the default endpoint. When the value carries a
    scheme, the network location (or, failing that, the path) is kept and SSL
    is enabled only for ``https``. Without a scheme the value is used as-is
    and SSL stays off. Trailing slashes are removed, and an empty result
    becomes ``localhost:19000``.
    """
    candidate = (raw_endpoint or "").strip() or DEFAULTS["DUCKLAKE_DATA_ENDPOINT"]

    host_part, secure = candidate, False
    if "://" in candidate:
        pieces = urlparse(candidate)
        host_part = pieces.netloc or pieces.path
        secure = pieces.scheme.lower() == "https"

    host_part = host_part.rstrip("/")
    if not host_part:
        host_part = "localhost:19000"
    return host_part, secure
44+
45+
46+
def configure_connection(
    conn: duckdb.DuckDBPyConnection,
    config: dict[str, str],
    *,
    install_extension: bool,
) -> None:
    """Load the ducklake extension and apply the S3 settings from *config* to *conn*.

    Args:
        conn: DuckDB connection to configure.
        config: Settings mapping; reads ``DUCKLAKE_DATA_ENDPOINT``,
            ``DUCKLAKE_S3_ACCESS_KEY`` and ``DUCKLAKE_S3_SECRET_KEY``.
        install_extension: When true, run ``INSTALL ducklake`` before loading it.
    """
    if install_extension:
        conn.sql("INSTALL ducklake")
    conn.sql("LOAD ducklake")

    host, secure = normalize_endpoint(config["DUCKLAKE_DATA_ENDPOINT"])
    conn.sql(f"SET s3_endpoint='{escape(host)}'")

    ssl_literal = "true" if secure else "false"
    conn.sql(f"SET s3_use_ssl={ssl_literal}")

    # NOTE(review): s3_url_style is never set here; MinIO-style endpoints commonly
    # need path-style addressing -- confirm reads/writes against the bucket work.
    access_key = escape(config["DUCKLAKE_S3_ACCESS_KEY"])
    conn.sql(f"SET s3_access_key_id='{access_key}'")

    secret_key = escape(config["DUCKLAKE_S3_SECRET_KEY"])
    conn.sql(f"SET s3_secret_access_key='{secret_key}'")
61+
62+
63+
def attach_catalog(
    conn: duckdb.DuckDBPyConnection,
    config: dict[str, str],
    *,
    alias: str = "ducklake_dev",
) -> None:
    """Attach the DuckLake catalog described by *config* to *conn* under *alias*.

    Args:
        conn: DuckDB connection that receives the ``ATTACH`` statement.
        config: Settings mapping; reads ``DUCKLAKE_DATA_BUCKET`` and
            ``DUCKLAKE_CATALOG_DSN``.
        alias: Database alias. It is interpolated unquoted, so it is validated first.

    Raises:
        ValueError: If *alias* is not alphanumeric once underscores are
            treated as letters (an empty alias is rejected too).
    """
    alias_is_safe = alias.replace("_", "a").isalnum()
    if not alias_is_safe:
        raise ValueError(f"Unsupported DuckLake alias '{alias}'")

    bucket = config["DUCKLAKE_DATA_BUCKET"].rstrip("/")
    data_path = f"s3://{bucket}/"
    dsn_literal = escape(config["DUCKLAKE_CATALOG_DSN"])
    path_literal = escape(data_path)
    conn.sql(f"ATTACH '{dsn_literal}' AS {alias} (DATA_PATH '{path_literal}')")
74+
75+
76+
def run_smoke_check(conn: duckdb.DuckDBPyConnection, *, alias: str = "ducklake_dev") -> None:
    """Check that the catalog attached as *alias* answers a ``SHOW TABLES`` query.

    Args:
        conn: DuckDB connection with the catalog already attached.
        alias: Alias the catalog was attached under.

    Raises:
        ValueError: If *alias* is not alphanumeric once underscores are
            treated as letters.
    """
    # The alias is interpolated as a bare identifier, so apply the same guard
    # attach_catalog uses before building the statement.
    if not alias.replace("_", "a").isalnum():
        raise ValueError(f"Unsupported DuckLake alias '{alias}'")

    # execute() + fetchall() runs the statement to completion. conn.sql() returns
    # a lazily evaluated relation for query statements, so discarding its result
    # would not guarantee the catalog was actually queried.
    conn.execute(f"SHOW TABLES FROM {alias}").fetchall()
78+
79+
80+
def _strip_postgres_prefix(raw_dsn: str) -> str:
    """Drop a leading ``ducklake:postgres:`` or ``postgres:`` marker from *raw_dsn*."""
    for prefix in ("ducklake:postgres:", "postgres:"):
        if raw_dsn.startswith(prefix):
            return raw_dsn[len(prefix) :]
    return raw_dsn


def parse_postgres_dsn(raw_dsn: str) -> dict[str, str]:
    """Parse a ``key=value`` Postgres connection string into a dict.

    Quoted values may contain whitespace (``password='my secret'``) and a
    backslash escapes the following character, including inside single
    quotes. Tokens without ``=`` are ignored. If the quoting is unbalanced the
    string is split on plain whitespace instead and surrounding quote
    characters are stripped from each value.

    Args:
        raw_dsn: Connection string, optionally prefixed with
            ``ducklake:postgres:`` or ``postgres:``. ``None`` or an empty
            string yields an empty dict.

    Returns:
        Mapping of parameter name to unquoted value.
    """
    import shlex  # local import keeps this block self-contained

    cleaned = _strip_postgres_prefix(raw_dsn or "")

    lexer = shlex.shlex(cleaned, posix=True)
    lexer.whitespace_split = True  # only whitespace separates tokens, so "k=v" stays whole
    lexer.commenters = ""  # '#' is ordinary data (e.g. in passwords), not a comment
    lexer.escapedquotes = "'\""  # honour backslash escapes inside single quotes too
    try:
        chunks = list(lexer)
        quotes_resolved = True
    except ValueError:
        # Unbalanced quote or dangling backslash: use naive whitespace splitting.
        chunks = cleaned.split()
        quotes_resolved = False

    params: dict[str, str] = {}
    for chunk in chunks:
        key, separator, value = chunk.partition("=")
        if not separator:
            continue
        params[key] = value if quotes_resolved else value.strip("'\"")
    return params
96+
97+
98+
def ensure_ducklake_catalog(config: dict[str, str]) -> None:
    """Create the Postgres database named in ``DUCKLAKE_CATALOG_DSN`` if it does not exist.

    Connects to a maintenance database (``maintenance_db`` from the DSN, else
    ``postgres``) in autocommit mode, checks ``pg_database`` for the target
    name, and creates the database and grants privileges to the DSN user when
    it is missing.

    Args:
        config: Settings mapping; reads ``DUCKLAKE_CATALOG_DSN``.

    Raises:
        ValueError: If the DSN carries no ``dbname``.
        RuntimeError: If Postgres cannot be reached (wraps ``psycopg.OperationalError``).
    """
    params = parse_postgres_dsn(config["DUCKLAKE_CATALOG_DSN"])
    target_db = params.get("dbname")
    if not target_db:
        raise ValueError("DUCKLAKE_CATALOG_DSN must include a dbname value")

    conn_kwargs = {
        "dbname": params.get("maintenance_db") or "postgres",
        "host": params.get("host", "localhost"),
        "port": int(params.get("port", "5432")),
        "user": params.get("user", "posthog"),
        "password": params.get("password", "posthog"),
        # CREATE DATABASE cannot run inside a transaction block.
        "autocommit": True,
    }

    try:
        with psycopg.connect(**conn_kwargs) as conn:
            with conn.cursor() as cur:
                cur.execute("SELECT 1 FROM pg_database WHERE datname = %s", (target_db,))
                if cur.fetchone():
                    return

                try:
                    cur.execute(sql.SQL("CREATE DATABASE {}").format(sql.Identifier(target_db)))
                except (psycopg.errors.DuplicateDatabase, psycopg.errors.UniqueViolation):
                    # Several health checks start at once; another process created the
                    # database between our existence check and this statement. The
                    # database now exists, and its creator handles the grant.
                    return

                owner = params.get("user")
                if owner:
                    cur.execute(
                        sql.SQL("GRANT ALL PRIVILEGES ON DATABASE {} TO {}").format(
                            sql.Identifier(target_db),
                            sql.Identifier(owner),
                        )
                    )
    except psycopg.OperationalError as exc:  # pragma: no cover - depends on PG state
        raise RuntimeError("Unable to ensure DuckLake catalog exists. Is Postgres running and accessible?") from exc
131+
132+
133+
def initialize_ducklake(config: dict[str, str], *, alias: str = "ducklake_dev") -> bool:
    """Ensure the catalog database exists, then install, attach and smoke-test DuckLake.

    Args:
        config: Settings mapping passed through to the helper functions.
        alias: Alias under which the catalog is attached.

    Returns:
        ``True`` when the ``ATTACH`` succeeded. ``False`` when it raised a
        ``duckdb.CatalogException`` whose message mentions *alias*
        (presumably "already attached" -- confirm). Any other error propagates.
    """
    session = duckdb.connect()
    try:
        ensure_ducklake_catalog(config)
        configure_connection(session, config, install_extension=True)

        newly_attached = True
        try:
            attach_catalog(session, config, alias=alias)
        except duckdb.CatalogException as exc:
            if alias not in str(exc):
                raise
            newly_attached = False

        run_smoke_check(session, alias=alias)
        return newly_attached
    finally:
        # Close the connection on success and on failure alike.
        session.close()
150+
151+
152+
# Public API of this module, kept in alphabetical order.
__all__ = [
    "attach_catalog",
    "configure_connection",
    "ensure_ducklake_catalog",
    "escape",
    "get_config",
    "initialize_ducklake",
    "normalize_endpoint",
    "parse_postgres_dsn",
    "run_smoke_check",
]

bin/mprocs.yaml

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,15 @@
11
procs:
22
backend:
3-
shell: 'uv sync --active && bin/check_postgres_up && bin/check_kafka_clickhouse_up && ./bin/start-backend'
3+
shell: 'uv sync --active && bin/check_postgres_up && bin/check_kafka_clickhouse_up && bin/check_ducklake_up && ./bin/start-backend'
44

55
celery-worker:
6-
shell: 'uv sync --active && bin/check_postgres_up && bin/check_kafka_clickhouse_up && ./bin/start-celery worker'
6+
shell: 'uv sync --active && bin/check_postgres_up && bin/check_kafka_clickhouse_up && bin/check_ducklake_up && ./bin/start-celery worker'
77

88
celery-beat:
9-
shell: 'uv sync --active && bin/check_postgres_up && bin/check_kafka_clickhouse_up && ./bin/start-celery beat'
9+
shell: 'uv sync --active && bin/check_postgres_up && bin/check_kafka_clickhouse_up && bin/check_ducklake_up && ./bin/start-celery beat'
1010

1111
plugin-server:
12-
shell: 'bin/check_postgres_up && bin/check_kafka_clickhouse_up && ./bin/plugin-server'
12+
shell: 'bin/check_postgres_up && bin/check_kafka_clickhouse_up && bin/check_ducklake_up && ./bin/plugin-server'
1313

1414
frontend:
1515
shell: './bin/start-frontend'
@@ -20,6 +20,7 @@ procs:
2020
bin/check_kafka_clickhouse_up && \
2121
bin/check_temporal_up && \
2222
bin/check_video_deps && \
23+
bin/check_ducklake_up && \
2324
if [ -n "$TEMPORAL_WATCH" ]; then \
2425
nodemon -w common -w dags -w ee -w posthog -w products -w pyproject.toml -e py --signal SIGTERM --exec "python manage.py start_temporal_worker --task-queue development-task-queue"; \
2526
else \
@@ -30,6 +31,7 @@ procs:
3031
shell: |-
3132
bin/check_postgres_up && \
3233
bin/check_kafka_clickhouse_up && \
34+
bin/check_ducklake_up && \
3335
dagster dev --workspace $DAGSTER_HOME/workspace.yaml -p $DAGSTER_UI_PORT
3436
3537
docker-compose:
@@ -84,11 +86,11 @@ procs:
8486
8587
# This takes ~10 s
8688
migrate-postgres:
87-
shell: 'bin/check_postgres_up && python manage.py migrate'
89+
shell: 'bin/check_postgres_up && bin/check_ducklake_up && python manage.py migrate'
8890

8991
# This takes ~10 s too
9092
migrate-clickhouse:
91-
shell: 'bin/check_kafka_clickhouse_up && python manage.py migrate_clickhouse'
93+
shell: 'bin/check_kafka_clickhouse_up && bin/check_ducklake_up && python manage.py migrate_clickhouse'
9294

9395
migrate-persons-db:
9496
shell: 'bin/check_postgres_up posthog_persons && cd rust && DATABASE_URL=postgres://posthog:posthog@localhost:5432/posthog_persons sqlx migrate run --source persons_migrations'
@@ -101,12 +103,14 @@ procs:
101103
bin/check_postgres_up && \
102104
bin/check_kafka_clickhouse_up && \
103105
bin/check_dagster_graphql_up && \
106+
bin/check_ducklake_up && \
104107
./manage.py generate_demo_data
105108
autostart: false
106109

107110
sync-feature-flags:
108111
shell: |-
109112
bin/check_postgres_up && \
113+
bin/check_ducklake_up && \
110114
./manage.py sync_feature_flags
111115
autostart: false
112116

bin/start

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,23 @@ if [ -f .env ]; then
4646
set +o allexport
4747
fi
4848

49+
# DuckLake
50+
export DUCKLAKE_CATALOG_DSN=${DUCKLAKE_CATALOG_DSN:-"ducklake:postgres:dbname=ducklake_catalog host=localhost user=posthog password=posthog"}
51+
export DUCKLAKE_DATA_BUCKET=${DUCKLAKE_DATA_BUCKET:-ducklake-dev}
52+
export DUCKLAKE_DATA_ENDPOINT=${DUCKLAKE_DATA_ENDPOINT:-"http://localhost:19000"}
53+
export DUCKLAKE_S3_ACCESS_KEY=${DUCKLAKE_S3_ACCESS_KEY:-object_storage_root_user}
54+
export DUCKLAKE_S3_SECRET_KEY=${DUCKLAKE_S3_SECRET_KEY:-object_storage_root_password}
55+
56+
if ! python3 - <<'PY' >/dev/null 2>&1
57+
import duckdb # noqa: F401
58+
PY
59+
then
60+
echo "DuckLake requires the duckdb Python package."
61+
echo "Run 'uv sync --active' (or activate Flox) before executing hogli start."
62+
exit 1
63+
fi
64+
65+
# Geo files
4966
./bin/download-mmdb
5067

5168
if ! command -v mprocs &>/dev/null; then

common/hogli/manifest.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -453,6 +453,10 @@ tools:
453453
bin_script: sync-storage
454454
description: 'TODO: add description for sync-storage'
455455
hidden: true
456+
tool:check_ducklake_up:
457+
bin_script: check_ducklake_up
458+
description: Healthcheck for Ducklake with self-healing
459+
hidden: true
456460
utilities:
457461
utilities:posthog-worktree:
458462
bin_script: posthog-worktree

docker-compose.base.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ services:
170170
MINIO_ROOT_USER: object_storage_root_user
171171
MINIO_ROOT_PASSWORD: object_storage_root_password
172172
entrypoint: sh
173-
command: -c 'mkdir -p /data/posthog && minio server --address ":19000" --console-address ":19001" /data' # create the 'posthog' bucket before starting the service
173+
command: -c 'mkdir -p /data/posthog /data/ducklake-dev && minio server --address ":19000" --console-address ":19001" /data' # create the 'posthog' and 'ducklake-dev' buckets before starting the service
174174

175175
maildev:
176176
image: maildev/maildev:2.0.5
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
#!/bin/bash
# Ensure the 'ducklake_catalog' Postgres database exists, creating it and
# granting privileges to $POSTGRES_USER when it is missing.
# Requires POSTGRES_USER in the environment; 'set -u' aborts if it is unset.

set -e
set -u

echo "Checking if database 'ducklake_catalog' exists..."
DB_EXISTS=$(psql -U "$POSTGRES_USER" -tAc "SELECT 1 FROM pg_database WHERE datname='ducklake_catalog'")

if [ -z "$DB_EXISTS" ]; then
    echo "Creating database 'ducklake_catalog'..."
    psql -U "$POSTGRES_USER" -c "CREATE DATABASE ducklake_catalog;"
    # Quote the role name: an unquoted identifier is case-folded and cannot
    # contain characters such as '-', so the GRANT would fail for such users.
    psql -U "$POSTGRES_USER" -c "GRANT ALL PRIVILEGES ON DATABASE ducklake_catalog TO \"$POSTGRES_USER\";"
    echo "Database 'ducklake_catalog' created successfully"
else
    echo "Database 'ducklake_catalog' already exists"
fi

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,7 @@ dependencies = [
179179
"cachetools>=5.5.2",
180180
"textcase>=0.4.5",
181181
"gitpython>=3.1.44",
182+
"duckdb>=1.3.0",
182183
]
183184

184185
[dependency-groups]

uv.lock

Lines changed: 16 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)