Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions docs/backends/materialize.qmd
Original file line number Diff line number Diff line change
Expand Up @@ -333,13 +333,15 @@ for comprehensive recommendations.
Materialize provides `mz_now()` for streaming temporal queries:

```python
from ibis.backends.materialize.api import mz_now

# Get Materialize's logical timestamp
current_time = con.mz_now()
current_time = mz_now()

# Filter for recent events (idiomatic pattern)
# Move operations to the right side of the comparison
recent = events.filter(
con.mz_now() > events.created_at + ibis.interval(days=1)
mz_now() > events.created_at + ibis.interval(days=1)
)
```

Expand Down
65 changes: 3 additions & 62 deletions ibis/backends/materialize/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@
import ibis
import ibis.expr.operations as ops
import ibis.expr.types as ir
from ibis.backends.materialize import operations as mz_ops
from ibis.backends.materialize.api import mz_now
from ibis.backends.postgres import Backend as PostgresBackend
from ibis.backends.sql.compilers.materialize import MaterializeCompiler

__all__ = ("Backend", "mz_now")


class Backend(PostgresBackend):
"""Materialize backend for Ibis.
Expand Down Expand Up @@ -215,67 +217,6 @@ def set_cluster(self, name: str) -> None:
quoted_name = sg.to_identifier(name, quoted=True).sql(self.dialect)
cur.execute(f"SET cluster = {quoted_name}")

def mz_now(self) -> ir.TimestampScalar:
"""Return the logical timestamp in Materialize.
This returns Materialize's `mz_now()` function, which provides the logical
time at which the query was executed. This is different from `ibis.now()`
(PostgreSQL's `now()`) which returns the system clock time.
Key differences from `now()`:
- Returns logical timestamp (for streaming/incremental computation)
- Can be used in temporal filters in materialized views
- Value represents query execution time in Materialize's consistency model
Returns
-------
TimestampScalar
An expression representing Materialize's logical timestamp
Examples
--------
>>> import ibis
>>> con = ibis.materialize.connect()
>>> # Get the current logical timestamp
>>> con.mz_now()
Use in temporal filters (e.g., last 30 seconds of data):
>>> events = con.table("events")
>>> # Best practice: Isolate mz_now() on one side of comparison
>>> recent = events.filter(con.mz_now() > events.event_ts + ibis.interval(seconds=30))
Compare with regular now():
>>> # System clock time (wall clock)
>>> ibis.now()
>>> # Logical timestamp (streaming time)
>>> con.mz_now()
See Also
--------
ibis.now : PostgreSQL's now() function (system clock time)
Notes
-----
mz_now() is fundamental to Materialize's streaming SQL model and is used
for temporal filters in materialized views to enable incremental computation.
**Best Practice**: When using mz_now() in temporal filters, isolate it on one
side of the comparison for optimal incremental computation:
- ✅ Good: `mz_now() > created_at + INTERVAL '1 day'`
- ❌ Bad: `mz_now() - created_at > INTERVAL '1 day'`
This pattern enables Materialize to efficiently compute incremental updates
without reprocessing the entire dataset.
References
----------
- Function documentation: https://materialize.com/docs/sql/functions/now_and_mz_now/
- Idiomatic patterns: https://materialize.com/docs/transform-data/idiomatic-materialize-sql/#temporal-filters
"""
return mz_ops.MzNow().to_expr()

def _register_in_memory_table(self, op: ops.InMemoryTable) -> None:
"""Register an in-memory table using COPY FROM STDIN.
Expand Down
69 changes: 69 additions & 0 deletions ibis/backends/materialize/api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
"""Materialize backend API functions."""

from __future__ import annotations

import ibis.expr.types as ir
from ibis.backends.materialize import operations as mz_ops


def mz_now() -> ir.TimestampScalar:
"""Return the logical timestamp in Materialize.

This returns Materialize's `mz_now()` function, which provides the logical
time at which the query was executed. This is different from `ibis.now()`
(PostgreSQL's `now()`) which returns the system clock time.

Key differences from `now()`:
- Returns logical timestamp (for streaming/incremental computation)
- Can be used in temporal filters in materialized views
- Value represents query execution time in Materialize's consistency model

Returns
-------
TimestampScalar
An expression representing Materialize's logical timestamp

Examples
--------
>>> import ibis
>>> from ibis.backends.materialize.api import mz_now
>>> # Get the current logical timestamp
>>> mz_now()

Use in temporal filters (e.g., last 30 seconds of data):

>>> events = con.table("events")
>>> # Best practice: Isolate mz_now() on one side of comparison
>>> recent = events.filter(mz_now() > events.event_ts + ibis.interval(seconds=30))

Compare with regular now():

>>> # System clock time (wall clock)
>>> ibis.now()
>>> # Logical timestamp (streaming time)
>>> mz_now()

See Also
--------
ibis.now : PostgreSQL's now() function (system clock time)

Notes
-----
mz_now() is fundamental to Materialize's streaming SQL model and is used
for temporal filters in materialized views to enable incremental computation.

**Best Practice**: When using mz_now() in temporal filters, isolate it on one
side of the comparison for optimal incremental computation:

- ✅ Good: `mz_now() > created_at + INTERVAL '1 day'`
- ❌ Bad: `mz_now() - created_at > INTERVAL '1 day'`

This pattern enables Materialize to efficiently compute incremental updates
without reprocessing the entire dataset.

References
----------
- Function documentation: https://materialize.com/docs/sql/functions/now_and_mz_now/
- Idiomatic patterns: https://materialize.com/docs/transform-data/idiomatic-materialize-sql/#temporal-filters
"""
return mz_ops.MzNow().to_expr()
5 changes: 3 additions & 2 deletions ibis/backends/materialize/tests/test_aggregate_edge_cases.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import pytest

import ibis
from ibis.backends.materialize.api import mz_now


@pytest.mark.usefixtures("con")
Expand Down Expand Up @@ -285,7 +286,7 @@ def test_aggregate_with_mz_now_filter(self, con):

# Aggregate with temporal filter
expr = (
t.filter(con.mz_now() > t.created_at + ibis.interval(hours=1))
t.filter(mz_now() > t.created_at + ibis.interval(hours=1))
.group_by("category")
.aggregate(total=t.value.sum())
)
Expand All @@ -306,7 +307,7 @@ def test_aggregate_with_mz_now_in_select(self, con):

# Add mz_now() as a column in aggregate
expr = t.group_by("category").aggregate(
total=t.value.sum(), snapshot_time=con.mz_now()
total=t.value.sum(), snapshot_time=mz_now()
)

sql = con.compile(expr)
Expand Down
11 changes: 6 additions & 5 deletions ibis/backends/materialize/tests/test_array_edge_cases.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import pytest

import ibis
from ibis.backends.materialize.api import mz_now


@pytest.mark.usefixtures("con")
Expand Down Expand Up @@ -223,8 +224,8 @@ def test_array_with_mz_now_filter(self, con):
t = t.mutate(created_at=t.created_at.cast("timestamp"))

# Combine array operation with temporal filter
expr = t.mutate(tag_count=t.tags.length(), current_time=con.mz_now()).filter(
con.mz_now() > t.created_at + ibis.interval(hours=1)
expr = t.mutate(tag_count=t.tags.length(), current_time=mz_now()).filter(
mz_now() > t.created_at + ibis.interval(hours=1)
)

# Should compile without error
Expand All @@ -248,8 +249,8 @@ def test_unnest_with_temporal_context(self, con):
t = t.mutate(ts=t.ts.cast("timestamp"))

# Unnest with temporal marker
expr = t.mutate(snapshot_time=con.mz_now()).select(
t.id, event=t.events.unnest(), snapshot_time=con.mz_now()
expr = t.mutate(snapshot_time=mz_now()).select(
t.id, event=t.events.unnest(), snapshot_time=mz_now()
)

sql = con.compile(expr)
Expand All @@ -274,7 +275,7 @@ def test_array_operations_preserve_streaming_semantics(self, con):
length=t.values.length(),
first_elem=t.values[0],
# Add a temporal marker
query_time=con.mz_now(),
query_time=mz_now(),
)

# Should compile successfully
Expand Down
19 changes: 10 additions & 9 deletions ibis/backends/materialize/tests/test_idiomatic_patterns.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from __future__ import annotations

import ibis
from ibis.backends.materialize.api import mz_now


class TestDistinctOnPatterns:
Expand Down Expand Up @@ -338,7 +339,7 @@ def test_temporal_filter_idiomatic(self, con):
events = events.mutate(created_at=events.created_at.cast("timestamp"))

# Idiomatic: mz_now() isolated on one side
expr = events.filter(con.mz_now() > events.created_at + ibis.interval(days=1))
expr = events.filter(mz_now() > events.created_at + ibis.interval(days=1))
sql = con.compile(expr)

assert "mz_now()" in sql.lower()
Expand All @@ -358,19 +359,19 @@ def test_temporal_filter_with_comparison(self, con):
for op in [">", ">=", "<", "<="]:
if op == ">":
expr = events.filter(
con.mz_now() > events.created_at + ibis.interval(days=1)
mz_now() > events.created_at + ibis.interval(days=1)
)
elif op == ">=":
expr = events.filter(
con.mz_now() >= events.created_at + ibis.interval(days=1)
mz_now() >= events.created_at + ibis.interval(days=1)
)
elif op == "<":
expr = events.filter(
con.mz_now() < events.created_at + ibis.interval(days=1)
mz_now() < events.created_at + ibis.interval(days=1)
)
else: # <=
expr = events.filter(
con.mz_now() <= events.created_at + ibis.interval(days=1)
mz_now() <= events.created_at + ibis.interval(days=1)
)

sql = con.compile(expr)
Expand Down Expand Up @@ -451,8 +452,8 @@ def test_or_with_temporal_filter(self, con):

# Idiomatic pattern: Each OR branch has mz_now() isolated
expr = events.filter(
(con.mz_now() > events.created_at + ibis.interval(days=1))
| (con.mz_now() > events.updated_at + ibis.interval(hours=12))
(mz_now() > events.created_at + ibis.interval(days=1))
| (mz_now() > events.updated_at + ibis.interval(hours=12))
)

sql = con.compile(expr)
Expand Down Expand Up @@ -520,8 +521,8 @@ def test_union_all_with_mz_now(self, con):
union_expr = events_a.union(events_b, distinct=False)

# Add temporal filter using mz_now()
expr = union_expr.mutate(query_time=con.mz_now()).filter(
con.mz_now() > union_expr.created_at + ibis.interval(days=1)
expr = union_expr.mutate(query_time=mz_now()).filter(
mz_now() > union_expr.created_at + ibis.interval(days=1)
)

sql = con.compile(expr)
Expand Down
3 changes: 2 additions & 1 deletion ibis/backends/materialize/tests/test_json_edge_cases.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import pytest

import ibis
from ibis.backends.materialize.api import mz_now


@pytest.mark.usefixtures("con")
Expand Down Expand Up @@ -209,7 +210,7 @@ def test_json_with_mz_now(self, con):
t = t.mutate(metadata=t.metadata.cast("jsonb"))

# Add mz_now() to query with JSON
expr = t.mutate(query_time=con.mz_now(), created=t.metadata["created"])
expr = t.mutate(query_time=mz_now(), created=t.metadata["created"])

sql = con.compile(expr)
assert "mz_now()" in sql.lower()
Expand Down
Loading
Loading