diff --git a/docs/backends/materialize.qmd b/docs/backends/materialize.qmd index 9180aebb5aea..ce6e9b75039f 100644 --- a/docs/backends/materialize.qmd +++ b/docs/backends/materialize.qmd @@ -333,13 +333,15 @@ for comprehensive recommendations. Materialize provides `mz_now()` for streaming temporal queries: ```python +from ibis.backends.materialize.api import mz_now + # Get Materialize's logical timestamp -current_time = con.mz_now() +current_time = mz_now() # Filter for recent events (idiomatic pattern) # Move operations to the right side of the comparison recent = events.filter( - con.mz_now() > events.created_at + ibis.interval(days=1) + mz_now() > events.created_at + ibis.interval(days=1) ) ``` diff --git a/ibis/backends/materialize/__init__.py b/ibis/backends/materialize/__init__.py index 7df5f65030d2..343f4f56621f 100644 --- a/ibis/backends/materialize/__init__.py +++ b/ibis/backends/materialize/__init__.py @@ -10,10 +10,12 @@ import ibis import ibis.expr.operations as ops import ibis.expr.types as ir -from ibis.backends.materialize import operations as mz_ops +from ibis.backends.materialize.api import mz_now from ibis.backends.postgres import Backend as PostgresBackend from ibis.backends.sql.compilers.materialize import MaterializeCompiler +__all__ = ("Backend", "mz_now") + class Backend(PostgresBackend): """Materialize backend for Ibis. @@ -215,67 +217,6 @@ def set_cluster(self, name: str) -> None: quoted_name = sg.to_identifier(name, quoted=True).sql(self.dialect) cur.execute(f"SET cluster = {quoted_name}") - def mz_now(self) -> ir.TimestampScalar: - """Return the logical timestamp in Materialize. - - This returns Materialize's `mz_now()` function, which provides the logical - time at which the query was executed. This is different from `ibis.now()` - (PostgreSQL's `now()`) which returns the system clock time. - - Key differences from `now()`: - - Returns logical timestamp (for streaming/incremental computation) - - Can be used in temporal filters in materialized views - - Value represents query execution time in Materialize's consistency model - - Returns - ------- - TimestampScalar - An expression representing Materialize's logical timestamp - - Examples - -------- - >>> import ibis - >>> con = ibis.materialize.connect() - >>> # Get the current logical timestamp - >>> con.mz_now() - - Use in temporal filters (e.g., last 30 seconds of data): - - >>> events = con.table("events") - >>> # Best practice: Isolate mz_now() on one side of comparison - >>> recent = events.filter(con.mz_now() > events.event_ts + ibis.interval(seconds=30)) - - Compare with regular now(): - - >>> # System clock time (wall clock) - >>> ibis.now() - >>> # Logical timestamp (streaming time) - >>> con.mz_now() - - See Also - -------- - ibis.now : PostgreSQL's now() function (system clock time) - - Notes - ----- - mz_now() is fundamental to Materialize's streaming SQL model and is used - for temporal filters in materialized views to enable incremental computation. - - **Best Practice**: When using mz_now() in temporal filters, isolate it on one - side of the comparison for optimal incremental computation: - - - ✅ Good: `mz_now() > created_at + INTERVAL '1 day'` - - ❌ Bad: `mz_now() - created_at > INTERVAL '1 day'` - - This pattern enables Materialize to efficiently compute incremental updates - without reprocessing the entire dataset. - - References - ---------- - - Function documentation: https://materialize.com/docs/sql/functions/now_and_mz_now/ - - Idiomatic patterns: https://materialize.com/docs/transform-data/idiomatic-materialize-sql/#temporal-filters - """ - return mz_ops.MzNow().to_expr() def _register_in_memory_table(self, op: ops.InMemoryTable) -> None: """Register an in-memory table using COPY FROM STDIN. diff --git a/ibis/backends/materialize/api.py b/ibis/backends/materialize/api.py new file mode 100644 index 000000000000..ef4f97a23cc5 --- /dev/null +++ b/ibis/backends/materialize/api.py @@ -0,0 +1,69 @@ +"""Materialize backend API functions.""" + +from __future__ import annotations + +import ibis.expr.types as ir +from ibis.backends.materialize import operations as mz_ops + + +def mz_now() -> ir.TimestampScalar: + """Return the logical timestamp in Materialize. + + This returns Materialize's `mz_now()` function, which provides the logical + time at which the query was executed. This is different from `ibis.now()` + (PostgreSQL's `now()`) which returns the system clock time. + + Key differences from `now()`: + - Returns logical timestamp (for streaming/incremental computation) + - Can be used in temporal filters in materialized views + - Value represents query execution time in Materialize's consistency model + + Returns + ------- + TimestampScalar + An expression representing Materialize's logical timestamp + + Examples + -------- + >>> import ibis + >>> from ibis.backends.materialize.api import mz_now + >>> # Get the current logical timestamp + >>> mz_now() + + Use in temporal filters (e.g., last 30 seconds of data): + + >>> events = con.table("events") + >>> # Best practice: Isolate mz_now() on one side of comparison + >>> recent = events.filter(mz_now() > events.event_ts + ibis.interval(seconds=30)) + + Compare with regular now(): + + >>> # System clock time (wall clock) + >>> ibis.now() + >>> # Logical timestamp (streaming time) + >>> mz_now() + + See Also + -------- + ibis.now : PostgreSQL's now() function (system clock time) + + Notes + ----- + mz_now() is fundamental to Materialize's streaming SQL model and is used + for temporal filters in materialized views to enable incremental computation. + + **Best Practice**: When using mz_now() in temporal filters, isolate it on one + side of the comparison for optimal incremental computation: + + - ✅ Good: `mz_now() > created_at + INTERVAL '1 day'` + - ❌ Bad: `mz_now() - created_at > INTERVAL '1 day'` + + This pattern enables Materialize to efficiently compute incremental updates + without reprocessing the entire dataset. + + References + ---------- + - Function documentation: https://materialize.com/docs/sql/functions/now_and_mz_now/ + - Idiomatic patterns: https://materialize.com/docs/transform-data/idiomatic-materialize-sql/#temporal-filters + """ + return mz_ops.MzNow().to_expr() diff --git a/ibis/backends/materialize/tests/test_aggregate_edge_cases.py b/ibis/backends/materialize/tests/test_aggregate_edge_cases.py index 7e5e6ceaeade..08e86b3e9625 100644 --- a/ibis/backends/materialize/tests/test_aggregate_edge_cases.py +++ b/ibis/backends/materialize/tests/test_aggregate_edge_cases.py @@ -14,6 +14,7 @@ import pytest import ibis +from ibis.backends.materialize.api import mz_now @pytest.mark.usefixtures("con") @@ -285,7 +286,7 @@ def test_aggregate_with_mz_now_filter(self, con): # Aggregate with temporal filter expr = ( - t.filter(con.mz_now() > t.created_at + ibis.interval(hours=1)) + t.filter(mz_now() > t.created_at + ibis.interval(hours=1)) .group_by("category") .aggregate(total=t.value.sum()) ) @@ -306,7 +307,7 @@ def test_aggregate_with_mz_now_in_select(self, con): # Add mz_now() as a column in aggregate expr = t.group_by("category").aggregate( - total=t.value.sum(), snapshot_time=con.mz_now() + total=t.value.sum(), snapshot_time=mz_now() ) sql = con.compile(expr) diff --git a/ibis/backends/materialize/tests/test_array_edge_cases.py b/ibis/backends/materialize/tests/test_array_edge_cases.py index 2bf8ed1f197e..45ccc7d3076a 100644 --- a/ibis/backends/materialize/tests/test_array_edge_cases.py +++ b/ibis/backends/materialize/tests/test_array_edge_cases.py @@ -13,6 +13,7 @@ import pytest import ibis +from ibis.backends.materialize.api import mz_now @pytest.mark.usefixtures("con") @@ -223,8 +224,8 @@ def test_array_with_mz_now_filter(self, con): t = t.mutate(created_at=t.created_at.cast("timestamp")) # Combine array operation with temporal filter - expr = t.mutate(tag_count=t.tags.length(), current_time=con.mz_now()).filter( - con.mz_now() > t.created_at + ibis.interval(hours=1) + expr = t.mutate(tag_count=t.tags.length(), current_time=mz_now()).filter( + mz_now() > t.created_at + ibis.interval(hours=1) ) # Should compile without error @@ -248,8 +249,8 @@ def test_unnest_with_temporal_context(self, con): t = t.mutate(ts=t.ts.cast("timestamp")) # Unnest with temporal marker - expr = t.mutate(snapshot_time=con.mz_now()).select( - t.id, event=t.events.unnest(), snapshot_time=con.mz_now() + expr = t.mutate(snapshot_time=mz_now()).select( + t.id, event=t.events.unnest(), snapshot_time=mz_now() ) sql = con.compile(expr) @@ -274,7 +275,7 @@ def test_array_operations_preserve_streaming_semantics(self, con): length=t.values.length(), first_elem=t.values[0], # Add a temporal marker - query_time=con.mz_now(), + query_time=mz_now(), ) # Should compile successfully diff --git a/ibis/backends/materialize/tests/test_idiomatic_patterns.py b/ibis/backends/materialize/tests/test_idiomatic_patterns.py index d98096f6fe95..ae49572a7a83 100644 --- a/ibis/backends/materialize/tests/test_idiomatic_patterns.py +++ b/ibis/backends/materialize/tests/test_idiomatic_patterns.py @@ -10,6 +10,7 @@ from __future__ import annotations import ibis +from ibis.backends.materialize.api import mz_now class TestDistinctOnPatterns: @@ -338,7 +339,7 @@ def test_temporal_filter_idiomatic(self, con): events = events.mutate(created_at=events.created_at.cast("timestamp")) # Idiomatic: mz_now() isolated on one side - expr = events.filter(con.mz_now() > events.created_at + ibis.interval(days=1)) + expr = events.filter(mz_now() > events.created_at + ibis.interval(days=1)) sql = con.compile(expr) assert "mz_now()" in sql.lower() @@ -358,19 +359,19 @@ def test_temporal_filter_with_comparison(self, con): for op in [">", ">=", "<", "<="]: if op == ">": expr = events.filter( - con.mz_now() > events.created_at + ibis.interval(days=1) + mz_now() > events.created_at + ibis.interval(days=1) ) elif op == ">=": expr = events.filter( - con.mz_now() >= events.created_at + ibis.interval(days=1) + mz_now() >= events.created_at + ibis.interval(days=1) ) elif op == "<": expr = events.filter( - con.mz_now() < events.created_at + ibis.interval(days=1) + mz_now() < events.created_at + ibis.interval(days=1) ) else: # <= expr = events.filter( - con.mz_now() <= events.created_at + ibis.interval(days=1) + mz_now() <= events.created_at + ibis.interval(days=1) ) sql = con.compile(expr) @@ -451,8 +452,8 @@ def test_or_with_temporal_filter(self, con): # Idiomatic pattern: Each OR branch has mz_now() isolated expr = events.filter( - (con.mz_now() > events.created_at + ibis.interval(days=1)) - | (con.mz_now() > events.updated_at + ibis.interval(hours=12)) + (mz_now() > events.created_at + ibis.interval(days=1)) + | (mz_now() > events.updated_at + ibis.interval(hours=12)) ) sql = con.compile(expr) @@ -520,8 +521,8 @@ def test_union_all_with_mz_now(self, con): union_expr = events_a.union(events_b, distinct=False) # Add temporal filter using mz_now() - expr = union_expr.mutate(query_time=con.mz_now()).filter( - con.mz_now() > union_expr.created_at + ibis.interval(days=1) + expr = union_expr.mutate(query_time=mz_now()).filter( + mz_now() > union_expr.created_at + ibis.interval(days=1) ) sql = con.compile(expr) diff --git a/ibis/backends/materialize/tests/test_json_edge_cases.py b/ibis/backends/materialize/tests/test_json_edge_cases.py index 5c1fd06ffb82..3c614d04d334 100644 --- a/ibis/backends/materialize/tests/test_json_edge_cases.py +++ b/ibis/backends/materialize/tests/test_json_edge_cases.py @@ -14,6 +14,7 @@ import pytest import ibis +from ibis.backends.materialize.api import mz_now @pytest.mark.usefixtures("con") @@ -209,7 +210,7 @@ def test_json_with_mz_now(self, con): t = t.mutate(metadata=t.metadata.cast("jsonb")) # Add mz_now() to query with JSON - expr = t.mutate(query_time=con.mz_now(), created=t.metadata["created"]) + expr = t.mutate(query_time=mz_now(), created=t.metadata["created"]) sql = con.compile(expr) assert "mz_now()" in sql.lower() diff --git a/ibis/backends/materialize/tests/test_mz_now.py b/ibis/backends/materialize/tests/test_mz_now.py index 695528b154d4..7a5118eb9f21 100644 --- a/ibis/backends/materialize/tests/test_mz_now.py +++ b/ibis/backends/materialize/tests/test_mz_now.py @@ -6,6 +6,7 @@ import ibis import ibis.expr.datatypes as dt +from ibis.backends.materialize.api import mz_now from ibis.backends.materialize import operations as mz_ops @@ -36,13 +37,13 @@ class TestMzNowCompilation: def test_compile_mz_now(self, con): """Test basic mz_now() compilation.""" - expr = con.mz_now() + expr = mz_now() sql = con.compile(expr) assert "mz_now()" in sql.lower() def test_mz_now_in_select(self, con): """Test mz_now() in SELECT statement.""" - expr = ibis.memtable({"a": [1, 2, 3]}).mutate(ts=con.mz_now()) + expr = ibis.memtable({"a": [1, 2, 3]}).mutate(ts=mz_now()) sql = con.compile(expr) assert "mz_now()" in sql.lower() @@ -61,7 +62,7 @@ def test_mz_now_in_filter(self, con): t = t.mutate(event_ts=t.event_ts.cast("timestamp")) # Filter for events within 30 seconds of mz_now() - expr = t.filter(con.mz_now() <= t.event_ts + ibis.interval(seconds=30)) + expr = t.filter(mz_now() <= t.event_ts + ibis.interval(seconds=30)) sql = con.compile(expr) assert "mz_now()" in sql.lower() @@ -74,11 +75,11 @@ def test_mz_now_comparison(self, con): # Test various comparison operators exprs = [ - t.filter(con.mz_now() > t.event_ts), - t.filter(con.mz_now() >= t.event_ts), - t.filter(con.mz_now() < t.event_ts), - t.filter(con.mz_now() <= t.event_ts), - t.filter(con.mz_now() == t.event_ts), + t.filter(mz_now() > t.event_ts), + t.filter(mz_now() >= t.event_ts), + t.filter(mz_now() < t.event_ts), + t.filter(mz_now() <= t.event_ts), + t.filter(mz_now() == t.event_ts), ] for expr in exprs: @@ -87,7 +88,7 @@ def test_mz_now_comparison(self, con): def test_mz_now_arithmetic(self, con): """Test mz_now() with interval arithmetic.""" - expr = con.mz_now() - ibis.interval(days=1) + expr = mz_now() - ibis.interval(days=1) sql = con.compile(expr) assert "mz_now()" in sql.lower() @@ -100,7 +101,7 @@ class TestMzNowExecution: def test_execute_mz_now(self, con): """Test that mz_now() can be executed and returns a timestamp.""" - result = con.execute(con.mz_now()) + result = con.execute(mz_now()) # Should return a timestamp value assert result is not None @@ -112,7 +113,7 @@ def test_execute_mz_now(self, con): def test_mz_now_vs_now(self, con): """Test that mz_now() and now() return different values.""" - mz_now_result = con.execute(con.mz_now()) + mz_now_result = con.execute(mz_now()) now_result = con.execute(ibis.now()) # Both should return timestamps @@ -139,7 +140,7 @@ def test_mz_now_in_table_query(self, con): data = data.mutate(created_at=data.created_at.cast("timestamp")) # Add mz_now() as a column - result_expr = data.mutate(current_ts=con.mz_now()) + result_expr = data.mutate(current_ts=mz_now()) result = con.execute(result_expr) # Should have current_ts column @@ -165,7 +166,7 @@ def test_temporal_filter_pattern(self, con): # Recommended pattern: mz_now() > event_ts + INTERVAL # (operation on right side of comparison) - expr = data.filter(con.mz_now() > data.event_ts + ibis.interval(seconds=30)) + expr = data.filter(mz_now() > data.event_ts + ibis.interval(seconds=30)) # Should compile without error sql = con.compile(expr) @@ -179,14 +180,13 @@ def test_temporal_filter_pattern(self, con): class TestMzNowDocumentation: """Test documentation and examples in mz_now().""" - def test_mz_now_method_exists(self, con): - """Test that mz_now() method exists on backend.""" - assert hasattr(con, "mz_now") - assert callable(con.mz_now) + def test_mz_now_function_exists(self): + """Test that mz_now() function exists.""" + assert callable(mz_now) - def test_mz_now_docstring(self, con): + def test_mz_now_docstring(self): """Test that mz_now() has proper documentation.""" - doc = con.mz_now.__doc__ + doc = mz_now.__doc__ assert doc is not None # Should explain key differences from now() @@ -199,16 +199,16 @@ def test_mz_now_docstring(self, con): # Should have link to docs assert "materialize.com/docs" in doc.lower() - def test_mz_now_return_type(self, con): + def test_mz_now_return_type(self): """Test that mz_now() returns correct expression type.""" - expr = con.mz_now() + expr = mz_now() # Should return a TimestampScalar expression assert expr.type().is_timestamp() - def test_mz_now_examples_in_docstring(self, con): + def test_mz_now_examples_in_docstring(self): """Test that docstring contains usage examples.""" - doc = con.mz_now.__doc__ + doc = mz_now.__doc__ # Should have examples section assert "Examples" in doc @@ -223,7 +223,7 @@ class TestMzNowEdgeCases: def test_mz_now_multiple_calls(self, con): """Test that multiple mz_now() calls work correctly.""" - expr = ibis.memtable({"a": [1, 2]}).mutate(ts1=con.mz_now(), ts2=con.mz_now()) + expr = ibis.memtable({"a": [1, 2]}).mutate(ts1=mz_now(), ts2=mz_now()) sql = con.compile(expr) # Should have two mz_now() calls @@ -232,7 +232,7 @@ def test_mz_now_multiple_calls(self, con): def test_mz_now_with_cast(self, con): """Test mz_now() with type casting.""" # Cast to string - expr = con.mz_now().cast("string") + expr = mz_now().cast("string") sql = con.compile(expr) assert "mz_now()" in sql.lower() assert "cast" in sql.lower() @@ -243,7 +243,7 @@ def test_mz_now_in_aggregate(self, con): # Use mz_now() in aggregate - since it's scalar, just add it as a column expr = data.group_by("group").aggregate( - total=data.value.sum(), snapshot_ts=con.mz_now() + total=data.value.sum(), snapshot_ts=mz_now() ) sql = con.compile(expr) @@ -260,7 +260,7 @@ def test_mz_now_with_window_function(self, con): # Add mz_now() and window function expr = data.mutate( - ts=con.mz_now(), row_num=ibis.row_number().over(order_by=data.value) + ts=mz_now(), row_num=ibis.row_number().over(order_by=data.value) ) sql = con.compile(expr) @@ -275,7 +275,7 @@ def test_mz_now_with_join(self, con): left = left.mutate(ts=left.ts.cast("timestamp")) # Add mz_now() in join - expr = left.join(right, "id").mutate(current=con.mz_now()) + expr = left.join(right, "id").mutate(current=mz_now()) sql = con.compile(expr) assert "mz_now()" in sql.lower() @@ -287,7 +287,7 @@ def test_mz_now_with_case_when(self, con): data = data.mutate(ts=data.ts.cast("timestamp")) # Use mz_now() in case expression using ifelse - expr = data.mutate(status=ibis.ifelse(con.mz_now() > data.ts, "past", "future")) + expr = data.mutate(status=ibis.ifelse(mz_now() > data.ts, "past", "future")) sql = con.compile(expr) assert "mz_now()" in sql.lower() diff --git a/ibis/backends/materialize/tests/test_streaming_edge_cases.py b/ibis/backends/materialize/tests/test_streaming_edge_cases.py index e702c2281f6e..4069a5ded81c 100644 --- a/ibis/backends/materialize/tests/test_streaming_edge_cases.py +++ b/ibis/backends/materialize/tests/test_streaming_edge_cases.py @@ -14,6 +14,7 @@ import pytest import ibis +from ibis.backends.materialize.api import mz_now @pytest.mark.usefixtures("con") @@ -39,7 +40,7 @@ def test_mz_now_in_multiple_filters(self, con): ) # Multiple mz_now() filters - expr = t.filter((con.mz_now() > t.start_time) & (con.mz_now() < t.end_time)) + expr = t.filter((mz_now() > t.start_time) & (mz_now() < t.end_time)) sql = con.compile(expr) # Should have multiple mz_now() calls @@ -61,13 +62,13 @@ def test_mz_now_with_case_when_expressions(self, con): # Complex CASE with mz_now() using nested ifelse expr = t.mutate( status=ibis.ifelse( - con.mz_now() > t.deadline + ibis.interval(days=30), + mz_now() > t.deadline + ibis.interval(days=30), "overdue_long", ibis.ifelse( - con.mz_now() > t.deadline, + mz_now() > t.deadline, "overdue", ibis.ifelse( - con.mz_now() > t.deadline - ibis.interval(days=7), + mz_now() > t.deadline - ibis.interval(days=7), "due_soon", "active", ), @@ -101,7 +102,7 @@ def test_mz_now_in_join_condition(self, con): # Join with temporal condition expr = left.join(right, "id").filter( - con.mz_now() > left.valid_from + ibis.interval(days=1) + mz_now() > left.valid_from + ibis.interval(days=1) ) sql = con.compile(expr) @@ -129,7 +130,7 @@ def test_mz_now_with_subquery(self, con): t = t.mutate(created_at=t.created_at.cast("timestamp")) # Subquery with mz_now() - recent = t.filter(con.mz_now() > t.created_at + ibis.interval(days=1)) + recent = t.filter(mz_now() > t.created_at + ibis.interval(days=1)) # Use subquery result expr = recent.group_by("category").aggregate(total=recent.value.sum()) @@ -145,14 +146,14 @@ class TestIntervalEdgeCases: def test_interval_zero_duration(self, con): """Test interval with zero duration.""" # Zero interval - expr = con.mz_now() + ibis.interval(seconds=0) + expr = mz_now() + ibis.interval(seconds=0) sql = con.compile(expr) assert "mz_now()" in sql.lower() def test_interval_negative_duration(self, con): """Test interval with negative duration (going backwards in time).""" # Negative interval - expr = con.mz_now() - ibis.interval(days=7) + expr = mz_now() - ibis.interval(days=7) sql = con.compile(expr) assert "mz_now()" in sql.lower() assert "interval" in sql.lower() @@ -163,7 +164,7 @@ def test_interval_mixed_units(self, con): interval = ( ibis.interval(days=1) + ibis.interval(hours=2) + ibis.interval(minutes=30) ) - expr = con.mz_now() + interval + expr = mz_now() + interval sql = con.compile(expr) assert "mz_now()" in sql.lower() @@ -171,7 +172,7 @@ def test_interval_mixed_units(self, con): def test_interval_very_large(self, con): """Test interval with very large duration.""" # 10 years into the future - expr = con.mz_now() + ibis.interval(years=10) + expr = mz_now() + ibis.interval(years=10) sql = con.compile(expr) assert "mz_now()" in sql.lower() @@ -305,7 +306,7 @@ def test_window_with_mz_now(self, con): # Window function with mz_now() column expr = t.mutate( - query_time=con.mz_now(), + query_time=mz_now(), row_num=ibis.row_number().over( ibis.window(group_by="category", order_by="created_at") ), @@ -341,7 +342,7 @@ def test_join_with_temporal_filter(self, con): # Temporal join: only recent orders expr = ( orders.join(customers, "customer_id") - .filter(con.mz_now() > orders.order_date + ibis.interval(hours=1)) + .filter(mz_now() > orders.order_date + ibis.interval(hours=1)) .select(orders.order_id, customers.name, orders.order_date) )