Skip to content

Commit ed72668

Browse files
committed
hoist mz_now to module level
1 parent 0dd4aff commit ed72668

File tree

9 files changed

+139
-122
lines changed

9 files changed

+139
-122
lines changed

docs/backends/materialize.qmd

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -333,13 +333,15 @@ for comprehensive recommendations.
333333
Materialize provides `mz_now()` for streaming temporal queries:
334334

335335
```python
336+
from ibis.backends.materialize.api import mz_now
337+
336338
# Get Materialize's logical timestamp
337-
current_time = con.mz_now()
339+
current_time = mz_now()
338340

339341
# Filter for recent events (idiomatic pattern)
340342
# Move operations to the right side of the comparison
341343
recent = events.filter(
342-
con.mz_now() > events.created_at + ibis.interval(days=1)
344+
mz_now() > events.created_at + ibis.interval(days=1)
343345
)
344346
```
345347

ibis/backends/materialize/__init__.py

Lines changed: 3 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,12 @@
1010
import ibis
1111
import ibis.expr.operations as ops
1212
import ibis.expr.types as ir
13-
from ibis.backends.materialize import operations as mz_ops
13+
from ibis.backends.materialize.api import mz_now
1414
from ibis.backends.postgres import Backend as PostgresBackend
1515
from ibis.backends.sql.compilers.materialize import MaterializeCompiler
1616

17+
__all__ = ("Backend", "mz_now")
18+
1719

1820
class Backend(PostgresBackend):
1921
"""Materialize backend for Ibis.
@@ -215,67 +217,6 @@ def set_cluster(self, name: str) -> None:
215217
quoted_name = sg.to_identifier(name, quoted=True).sql(self.dialect)
216218
cur.execute(f"SET cluster = {quoted_name}")
217219

218-
def mz_now(self) -> ir.TimestampScalar:
219-
"""Return the logical timestamp in Materialize.
220-
221-
This returns Materialize's `mz_now()` function, which provides the logical
222-
time at which the query was executed. This is different from `ibis.now()`
223-
(PostgreSQL's `now()`) which returns the system clock time.
224-
225-
Key differences from `now()`:
226-
- Returns logical timestamp (for streaming/incremental computation)
227-
- Can be used in temporal filters in materialized views
228-
- Value represents query execution time in Materialize's consistency model
229-
230-
Returns
231-
-------
232-
TimestampScalar
233-
An expression representing Materialize's logical timestamp
234-
235-
Examples
236-
--------
237-
>>> import ibis
238-
>>> con = ibis.materialize.connect()
239-
>>> # Get the current logical timestamp
240-
>>> con.mz_now()
241-
242-
Use in temporal filters (e.g., last 30 seconds of data):
243-
244-
>>> events = con.table("events")
245-
>>> # Best practice: Isolate mz_now() on one side of comparison
246-
>>> recent = events.filter(con.mz_now() > events.event_ts + ibis.interval(seconds=30))
247-
248-
Compare with regular now():
249-
250-
>>> # System clock time (wall clock)
251-
>>> ibis.now()
252-
>>> # Logical timestamp (streaming time)
253-
>>> con.mz_now()
254-
255-
See Also
256-
--------
257-
ibis.now : PostgreSQL's now() function (system clock time)
258-
259-
Notes
260-
-----
261-
mz_now() is fundamental to Materialize's streaming SQL model and is used
262-
for temporal filters in materialized views to enable incremental computation.
263-
264-
**Best Practice**: When using mz_now() in temporal filters, isolate it on one
265-
side of the comparison for optimal incremental computation:
266-
267-
- ✅ Good: `mz_now() > created_at + INTERVAL '1 day'`
268-
- ❌ Bad: `mz_now() - created_at > INTERVAL '1 day'`
269-
270-
This pattern enables Materialize to efficiently compute incremental updates
271-
without reprocessing the entire dataset.
272-
273-
References
274-
----------
275-
- Function documentation: https://materialize.com/docs/sql/functions/now_and_mz_now/
276-
- Idiomatic patterns: https://materialize.com/docs/transform-data/idiomatic-materialize-sql/#temporal-filters
277-
"""
278-
return mz_ops.MzNow().to_expr()
279220

280221
def _register_in_memory_table(self, op: ops.InMemoryTable) -> None:
281222
"""Register an in-memory table using COPY FROM STDIN.

ibis/backends/materialize/api.py

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
"""Materialize backend API functions."""
2+
3+
from __future__ import annotations
4+
5+
import ibis.expr.types as ir
6+
from ibis.backends.materialize import operations as mz_ops
7+
8+
9+
def mz_now() -> ir.TimestampScalar:
10+
"""Return the logical timestamp in Materialize.
11+
12+
This returns Materialize's `mz_now()` function, which provides the logical
13+
time at which the query was executed. This is different from `ibis.now()`
14+
(PostgreSQL's `now()`) which returns the system clock time.
15+
16+
Key differences from `now()`:
17+
- Returns logical timestamp (for streaming/incremental computation)
18+
- Can be used in temporal filters in materialized views
19+
- Value represents query execution time in Materialize's consistency model
20+
21+
Returns
22+
-------
23+
TimestampScalar
24+
An expression representing Materialize's logical timestamp
25+
26+
Examples
27+
--------
28+
>>> import ibis
29+
>>> from ibis.backends.materialize.api import mz_now
30+
>>> # Get the current logical timestamp
31+
>>> mz_now()
32+
33+
Use in temporal filters (e.g., last 30 seconds of data):
34+
35+
>>> events = con.table("events")
36+
>>> # Best practice: Isolate mz_now() on one side of comparison
37+
>>> recent = events.filter(mz_now() > events.event_ts + ibis.interval(seconds=30))
38+
39+
Compare with regular now():
40+
41+
>>> # System clock time (wall clock)
42+
>>> ibis.now()
43+
>>> # Logical timestamp (streaming time)
44+
>>> mz_now()
45+
46+
See Also
47+
--------
48+
ibis.now : PostgreSQL's now() function (system clock time)
49+
50+
Notes
51+
-----
52+
mz_now() is fundamental to Materialize's streaming SQL model and is used
53+
for temporal filters in materialized views to enable incremental computation.
54+
55+
**Best Practice**: When using mz_now() in temporal filters, isolate it on one
56+
side of the comparison for optimal incremental computation:
57+
58+
- ✅ Good: `mz_now() > created_at + INTERVAL '1 day'`
59+
- ❌ Bad: `mz_now() - created_at > INTERVAL '1 day'`
60+
61+
This pattern enables Materialize to efficiently compute incremental updates
62+
without reprocessing the entire dataset.
63+
64+
References
65+
----------
66+
- Function documentation: https://materialize.com/docs/sql/functions/now_and_mz_now/
67+
- Idiomatic patterns: https://materialize.com/docs/transform-data/idiomatic-materialize-sql/#temporal-filters
68+
"""
69+
return mz_ops.MzNow().to_expr()

ibis/backends/materialize/tests/test_aggregate_edge_cases.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import pytest
1515

1616
import ibis
17+
from ibis.backends.materialize.api import mz_now
1718

1819

1920
@pytest.mark.usefixtures("con")
@@ -285,7 +286,7 @@ def test_aggregate_with_mz_now_filter(self, con):
285286

286287
# Aggregate with temporal filter
287288
expr = (
288-
t.filter(con.mz_now() > t.created_at + ibis.interval(hours=1))
289+
t.filter(mz_now() > t.created_at + ibis.interval(hours=1))
289290
.group_by("category")
290291
.aggregate(total=t.value.sum())
291292
)
@@ -306,7 +307,7 @@ def test_aggregate_with_mz_now_in_select(self, con):
306307

307308
# Add mz_now() as a column in aggregate
308309
expr = t.group_by("category").aggregate(
309-
total=t.value.sum(), snapshot_time=con.mz_now()
310+
total=t.value.sum(), snapshot_time=mz_now()
310311
)
311312

312313
sql = con.compile(expr)

ibis/backends/materialize/tests/test_array_edge_cases.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import pytest
1414

1515
import ibis
16+
from ibis.backends.materialize.api import mz_now
1617

1718

1819
@pytest.mark.usefixtures("con")
@@ -223,8 +224,8 @@ def test_array_with_mz_now_filter(self, con):
223224
t = t.mutate(created_at=t.created_at.cast("timestamp"))
224225

225226
# Combine array operation with temporal filter
226-
expr = t.mutate(tag_count=t.tags.length(), current_time=con.mz_now()).filter(
227-
con.mz_now() > t.created_at + ibis.interval(hours=1)
227+
expr = t.mutate(tag_count=t.tags.length(), current_time=mz_now()).filter(
228+
mz_now() > t.created_at + ibis.interval(hours=1)
228229
)
229230

230231
# Should compile without error
@@ -248,8 +249,8 @@ def test_unnest_with_temporal_context(self, con):
248249
t = t.mutate(ts=t.ts.cast("timestamp"))
249250

250251
# Unnest with temporal marker
251-
expr = t.mutate(snapshot_time=con.mz_now()).select(
252-
t.id, event=t.events.unnest(), snapshot_time=con.mz_now()
252+
expr = t.mutate(snapshot_time=mz_now()).select(
253+
t.id, event=t.events.unnest(), snapshot_time=mz_now()
253254
)
254255

255256
sql = con.compile(expr)
@@ -274,7 +275,7 @@ def test_array_operations_preserve_streaming_semantics(self, con):
274275
length=t.values.length(),
275276
first_elem=t.values[0],
276277
# Add a temporal marker
277-
query_time=con.mz_now(),
278+
query_time=mz_now(),
278279
)
279280

280281
# Should compile successfully

ibis/backends/materialize/tests/test_idiomatic_patterns.py

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
from __future__ import annotations
1111

1212
import ibis
13+
from ibis.backends.materialize.api import mz_now
1314

1415

1516
class TestDistinctOnPatterns:
@@ -338,7 +339,7 @@ def test_temporal_filter_idiomatic(self, con):
338339
events = events.mutate(created_at=events.created_at.cast("timestamp"))
339340

340341
# Idiomatic: mz_now() isolated on one side
341-
expr = events.filter(con.mz_now() > events.created_at + ibis.interval(days=1))
342+
expr = events.filter(mz_now() > events.created_at + ibis.interval(days=1))
342343
sql = con.compile(expr)
343344

344345
assert "mz_now()" in sql.lower()
@@ -358,19 +359,19 @@ def test_temporal_filter_with_comparison(self, con):
358359
for op in [">", ">=", "<", "<="]:
359360
if op == ">":
360361
expr = events.filter(
361-
con.mz_now() > events.created_at + ibis.interval(days=1)
362+
mz_now() > events.created_at + ibis.interval(days=1)
362363
)
363364
elif op == ">=":
364365
expr = events.filter(
365-
con.mz_now() >= events.created_at + ibis.interval(days=1)
366+
mz_now() >= events.created_at + ibis.interval(days=1)
366367
)
367368
elif op == "<":
368369
expr = events.filter(
369-
con.mz_now() < events.created_at + ibis.interval(days=1)
370+
mz_now() < events.created_at + ibis.interval(days=1)
370371
)
371372
else: # <=
372373
expr = events.filter(
373-
con.mz_now() <= events.created_at + ibis.interval(days=1)
374+
mz_now() <= events.created_at + ibis.interval(days=1)
374375
)
375376

376377
sql = con.compile(expr)
@@ -451,8 +452,8 @@ def test_or_with_temporal_filter(self, con):
451452

452453
# Idiomatic pattern: Each OR branch has mz_now() isolated
453454
expr = events.filter(
454-
(con.mz_now() > events.created_at + ibis.interval(days=1))
455-
| (con.mz_now() > events.updated_at + ibis.interval(hours=12))
455+
(mz_now() > events.created_at + ibis.interval(days=1))
456+
| (mz_now() > events.updated_at + ibis.interval(hours=12))
456457
)
457458

458459
sql = con.compile(expr)
@@ -520,8 +521,8 @@ def test_union_all_with_mz_now(self, con):
520521
union_expr = events_a.union(events_b, distinct=False)
521522

522523
# Add temporal filter using mz_now()
523-
expr = union_expr.mutate(query_time=con.mz_now()).filter(
524-
con.mz_now() > union_expr.created_at + ibis.interval(days=1)
524+
expr = union_expr.mutate(query_time=mz_now()).filter(
525+
mz_now() > union_expr.created_at + ibis.interval(days=1)
525526
)
526527

527528
sql = con.compile(expr)

ibis/backends/materialize/tests/test_json_edge_cases.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import pytest
1515

1616
import ibis
17+
from ibis.backends.materialize.api import mz_now
1718

1819

1920
@pytest.mark.usefixtures("con")
@@ -209,7 +210,7 @@ def test_json_with_mz_now(self, con):
209210
t = t.mutate(metadata=t.metadata.cast("jsonb"))
210211

211212
# Add mz_now() to query with JSON
212-
expr = t.mutate(query_time=con.mz_now(), created=t.metadata["created"])
213+
expr = t.mutate(query_time=mz_now(), created=t.metadata["created"])
213214

214215
sql = con.compile(expr)
215216
assert "mz_now()" in sql.lower()

0 commit comments

Comments
 (0)