Skip to content
77 changes: 53 additions & 24 deletions macros/base/base_create_snowplow_events_this_run.sql
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@

{#
Copyright (c) 2021-present Snowplow Analytics Ltd. All rights reserved.
This program is licensed to you under the Snowplow Personal and Academic License Version 1.0,
Expand All @@ -9,6 +10,7 @@ You may obtain a copy of the Snowplow Personal and Academic License Version 1.0
{{ return(adapter.dispatch('base_create_snowplow_events_this_run', 'snowplow_utils')(sessions_this_run_table, session_identifiers, session_sql, session_timestamp, derived_tstamp_partitioned, days_late_allowed, max_session_days, app_ids, snowplow_events_database, snowplow_events_schema, snowplow_events_table, entities_or_sdes, custom_sql, allow_null_dvce_tstamps)) }}
{% endmacro %}


{% macro default__base_create_snowplow_events_this_run(sessions_this_run_table, session_identifiers, session_sql, session_timestamp, derived_tstamp_partitioned, days_late_allowed, max_session_days, app_ids, snowplow_events_database, snowplow_events_schema, snowplow_events_table, entities_or_sdes, custom_sql, allow_null_dvce_tstamps) %}
{%- set lower_limit, upper_limit = snowplow_utils.return_limits_from_model(ref(sessions_this_run_table),
'start_tstamp',
Expand Down Expand Up @@ -37,6 +39,14 @@ You may obtain a copy of the Snowplow Personal and Academic License Version 1.0
e.*

from {{ snowplow_events }} e
where
{% if var('snowplow__enable_keyhole_backfill',false) %}
e.{{ session_timestamp }} >= {{ lower_limit }}
and e.{{ session_timestamp }} <= {{ snowplow_utils.timestamp_add('day', max_session_days, upper_limit) }}
{% else %}
e.{{ session_timestamp }} >= {{ lower_limit }}
and e.{{ session_timestamp }} <= {{ upper_limit }}
{% endif %}

)

Expand All @@ -51,24 +61,30 @@ You may obtain a copy of the Snowplow Personal and Academic License Version 1.0
inner join {{ sessions_this_run }} as b
on a.session_identifier = b.session_identifier

where a.{{ session_timestamp }} <= {{ snowplow_utils.timestamp_add('day', max_session_days, 'b.start_tstamp') }}

where
{% if allow_null_dvce_tstamps %}
and coalesce(a.dvce_sent_tstamp, a.collector_tstamp) <= {{ snowplow_utils.timestamp_add('day', days_late_allowed, 'coalesce(a.dvce_created_tstamp, a.collector_tstamp)') }}
{% else %}
and a.dvce_sent_tstamp <= {{ snowplow_utils.timestamp_add('day', days_late_allowed, 'a.dvce_created_tstamp') }}
{% endif %}

and a.{{ session_timestamp }} >= {{ lower_limit }}
and a.{{ session_timestamp }} <= {{ upper_limit }}
and a.{{ session_timestamp }} >= b.start_tstamp -- deal with late loading events

{% if derived_tstamp_partitioned and target.type == 'bigquery' | as_bool() %}
and a.derived_tstamp >= {{ snowplow_utils.timestamp_add('hour', -1, lower_limit) }}
and a.derived_tstamp <= {{ upper_limit }}

{% if not var('snowplow__enable_keyhole_backfill',false) %}

a.{{ session_timestamp }} <= {{ snowplow_utils.timestamp_add('day', max_session_days, 'b.start_tstamp') }}

and a.{{ session_timestamp }} >= {{ lower_limit }}
and a.{{ session_timestamp }} <= {{ upper_limit }}
and a.{{ session_timestamp }} >= b.start_tstamp -- deal with late loading events

{% if derived_tstamp_partitioned and target.type == 'bigquery' | as_bool() %}
and a.derived_tstamp >= {{ snowplow_utils.timestamp_add('hour', -1, lower_limit) }}
and a.derived_tstamp <= {{ upper_limit }}
{% endif %}

and
{% endif %}

and {{ snowplow_utils.app_id_filter(app_ids) }}
{{ snowplow_utils.app_id_filter(app_ids) }}

qualify row_number() over (partition by a.event_id order by a.{{ session_timestamp }}, a.dvce_created_tstamp) = 1
{% endset %}
Expand Down Expand Up @@ -197,7 +213,9 @@ You may obtain a copy of the Snowplow Personal and Academic License Version 1.0
inner join {{ sessions_this_run }} as b
on a.session_identifier = b.session_identifier

where a.{{ session_timestamp }} <= {{ snowplow_utils.timestamp_add('day', max_session_days, 'b.start_tstamp') }}
where
{% if not var('snowplow__enable_keyhole_backfill',false) %}
a.{{ session_timestamp }} <= {{ snowplow_utils.timestamp_add('day', max_session_days, 'b.start_tstamp') }}
{% if allow_null_dvce_tstamps %}
and coalesce(a.dvce_sent_tstamp, a.collector_tstamp) <= {{ snowplow_utils.timestamp_add('day', days_late_allowed, 'coalesce(a.dvce_created_tstamp, a.collector_tstamp)') }}
{% else %}
Expand All @@ -206,7 +224,9 @@ You may obtain a copy of the Snowplow Personal and Academic License Version 1.0
and a.{{ session_timestamp }} >= {{ lower_limit }}
and a.{{ session_timestamp }} <= {{ upper_limit }}
and a.{{ session_timestamp }} >= b.start_tstamp -- deal with late loading events
and {{ snowplow_utils.app_id_filter(app_ids) }}
and
{% endif %}
{{ snowplow_utils.app_id_filter(app_ids) }}

)

Expand Down Expand Up @@ -284,8 +304,14 @@ You may obtain a copy of the Snowplow Personal and Academic License Version 1.0
{%- endif %}
e.*
from {{ snowplow_events }} e
WHERE e.{{ session_timestamp }} >= {{ lower_limit }}
and e.{{ session_timestamp }} <= {{ upper_limit }}
WHERE
{% if var('snowplow__enable_keyhole_backfill',false) %}
e.{{ session_timestamp }} >= {{ lower_limit }}
and e.{{ session_timestamp }} <= {{ snowplow_utils.timestamp_add('day', max_session_days, upper_limit) }}
{% else %}
e.{{ session_timestamp }} >= {{ lower_limit }}
and e.{{ session_timestamp }} <= {{ upper_limit }}
{% endif %}

),
main_logic as (
Expand All @@ -301,22 +327,25 @@ You may obtain a copy of the Snowplow Personal and Academic License Version 1.0
inner join {{ sessions_this_run }} as b
on a.session_identifier = b.session_identifier

where a.{{ session_timestamp }} <= {{ snowplow_utils.timestamp_add('day', max_session_days, 'b.start_tstamp') }}
where 1=1
{% if allow_null_dvce_tstamps %}
and coalesce(a.dvce_sent_tstamp, a.collector_tstamp) <= {{ snowplow_utils.timestamp_add('day', days_late_allowed, 'coalesce(a.dvce_created_tstamp, a.collector_tstamp)') }}
{% else %}
and a.dvce_sent_tstamp <= {{ snowplow_utils.timestamp_add('day', days_late_allowed, 'a.dvce_created_tstamp') }}
{% endif %}
and a.{{ session_timestamp }} >= {{ lower_limit }}
and a.{{ session_timestamp }} <= {{ upper_limit }}
and a.{{ session_timestamp }} >= b.start_tstamp -- deal with late loading events

{% if derived_tstamp_partitioned and target.type == 'bigquery' | as_bool() %}
and a.derived_tstamp >= {{ snowplow_utils.timestamp_add('hour', -1, lower_limit) }}
and a.derived_tstamp <= {{ upper_limit }}
{% endif %}
{% if not var('snowplow__enable_keyhole_backfill',false) %}
and a.{{ session_timestamp }} <= {{ snowplow_utils.timestamp_add('day', max_session_days, 'b.start_tstamp') }}
and a.{{ session_timestamp }} >= {{ lower_limit }}
and a.{{ session_timestamp }} <= {{ upper_limit }}
and a.{{ session_timestamp }} >= b.start_tstamp -- deal with late loading events

and {{ snowplow_utils.app_id_filter(app_ids) }}
{% if derived_tstamp_partitioned and target.type == 'bigquery' | as_bool() %}
and a.derived_tstamp >= {{ snowplow_utils.timestamp_add('hour', -1, lower_limit) }}
and a.derived_tstamp <= {{ upper_limit }}
{% endif %}
{% endif %}
and {{ snowplow_utils.app_id_filter(app_ids) }}
)
SELECT *
FROM main_logic
Expand Down
27 changes: 18 additions & 9 deletions macros/incremental_hooks/get_run_limits.sql
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
{#
Copyright (c) 2021-present Snowplow Analytics Ltd. All rights reserved.
This program is licensed to you under the Snowplow Personal and Academic License Version 1.0,
and you may not use this file except in compliance with the Snowplow Personal and Academic License Version 1.0.
You may obtain a copy of the Snowplow Personal and Academic License Version 1.0 at https://docs.snowplow.io/personal-and-academic-license-1.0/
#}
{#
  Returns the sql to calculate the lower/upper limits of the run.
  Thin dispatch entry point: looks up the 'get_run_limits' implementation in the
  'snowplow_utils' namespace via adapter.dispatch and returns whatever it returns,
  forwarding every argument unchanged and in the same order.
#}
{% macro get_run_limits(min_last_success, max_last_success, models_matched_from_manifest, has_matched_all_models, start_date) -%}
  {%- set run_limits_impl = adapter.dispatch('get_run_limits', 'snowplow_utils') -%}
  {{ return(run_limits_impl(
      min_last_success,
      max_last_success,
      models_matched_from_manifest,
      has_matched_all_models,
      start_date
  )) }}
{%- endmacro %}

{% macro default__get_run_limits(min_last_success, max_last_success, models_matched_from_manifest, has_matched_all_models, start_date) -%}

{% set start_tstamp = snowplow_utils.cast_to_tstamp(start_date) %}
{% set min_last_success = snowplow_utils.cast_to_tstamp(min_last_success) %}
Expand All @@ -15,7 +12,19 @@ You may obtain a copy of the Snowplow Personal and Academic License Version 1.0
{{ return('') }}
{% endif %}

{% if models_matched_from_manifest == 0 %}
{% if var('snowplow__enable_keyhole_backfill',false) %}
{% set start_tstamp = snowplow_utils.cast_to_tstamp(var('snowplow__keyhole_backfill_start_date')) %}
{% set max_last_success = snowplow_utils.cast_to_tstamp(var('snowplow__keyhole_backfill_end_date')) %}

{% do snowplow_utils.log_message("Snowplow: Keyhole backfill enabled") %}

{% set run_limits_query %}
select {{ start_tstamp }} as lower_limit,
least({{ max_last_success }},
{{ snowplow_utils.timestamp_add('day', var("snowplow__backfill_limit_days", 30), start_tstamp) }}) as upper_limit
{% endset %}

{% elif models_matched_from_manifest == 0 %}
{# If no snowplow models are in the manifest, start from start_tstamp #}
{% do snowplow_utils.log_message("Snowplow: No data in manifest. Processing data from start_date") %}

Expand Down Expand Up @@ -60,4 +69,4 @@ You may obtain a copy of the Snowplow Personal and Academic License Version 1.0

{{ return(run_limits_query) }}

{% endmacro %}
{% endmacro %}
23 changes: 14 additions & 9 deletions macros/incremental_hooks/return_base_new_event_limits.sql
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
{#
Copyright (c) 2021-present Snowplow Analytics Ltd. All rights reserved.
This program is licensed to you under the Snowplow Personal and Academic License Version 1.0,
and you may not use this file except in compliance with the Snowplow Personal and Academic License Version 1.0.
You may obtain a copy of the Snowplow Personal and Academic License Version 1.0 at https://docs.snowplow.io/personal-and-academic-license-1.0/
#}
{% macro return_base_new_event_limits(base_events_this_run) -%}
  {#-
    Thin dispatch entry point: looks up the 'return_base_new_event_limits'
    implementation in the 'snowplow_utils' namespace via adapter.dispatch and
    returns its result, passing base_events_this_run through untouched.
  -#}
  {%- set event_limits_impl = adapter.dispatch('return_base_new_event_limits', 'snowplow_utils') -%}
  {{ return(event_limits_impl(base_events_this_run)) }}
{%- endmacro %}

{% macro default__return_base_new_event_limits(base_events_this_run) -%}

{# In case of not execute just return empty strings to avoid hitting database #}
{% if not execute %}
Expand All @@ -16,7 +14,15 @@ You may obtain a copy of the Snowplow Personal and Academic License Version 1.0
schema=base_events_this_run.schema,
identifier=base_events_this_run.name) %}

{% if target_relation is not none %}
{% if var('snowplow__enable_keyhole_backfill',false) %}

{% set lower_limit = snowplow_utils.cast_to_tstamp(var('snowplow__keyhole_backfill_start_date')) %}
{% set upper_limit = snowplow_utils.cast_to_tstamp(var('snowplow__keyhole_backfill_end_date')) %}
{% set session_start_limit = snowplow_utils.timestamp_add('day',-var("snowplow__max_session_days", 3),snowplow_utils.cast_to_tstamp(var('snowplow__keyhole_backfill_start_date'))) %}

{{ return([lower_limit, upper_limit, session_start_limit]) }}

{% elif target_relation is not none %}

{% set limit_query %}
select
Expand All @@ -25,7 +31,6 @@ You may obtain a copy of the Snowplow Personal and Academic License Version 1.0
{{ snowplow_utils.timestamp_add('day',
-var("snowplow__max_session_days", 3),
'lower_limit') }} as session_start_limit

from {{ base_events_this_run }}
{% endset %}

Expand All @@ -51,4 +56,4 @@ You may obtain a copy of the Snowplow Personal and Academic License Version 1.0

{% endif %}

{%- endmacro %}
{%- endmacro %}
35 changes: 15 additions & 20 deletions macros/incremental_hooks/snowplow_incremental_post_hook.sql
Original file line number Diff line number Diff line change
@@ -1,28 +1,23 @@
{#
Copyright (c) 2021-present Snowplow Analytics Ltd. All rights reserved.
This program is licensed to you under the Snowplow Personal and Academic License Version 1.0,
and you may not use this file except in compliance with the Snowplow Personal and Academic License Version 1.0.
You may obtain a copy of the Snowplow Personal and Academic License Version 1.0 at https://docs.snowplow.io/personal-and-academic-license-1.0/
#}
{# post-hook for incremental runs #}
{% macro snowplow_incremental_post_hook(package_name='snowplow', incremental_manifest_table_name=none, base_events_this_run_table_name=none, session_timestamp=var('snowplow__session_timestamp', 'load_tstamp')) %}
  {#
    Post-hook for incremental runs. Renders the output of
    snowplow_utils.update_incremental_manifest_table for the enabled models of
    the package that get_successful_models reports for this run.

    Args:
      package_name: passed to get_enabled_snowplow_models; also used to look up
        the manifest relation and to build the default
        '<package_name>_base_events_this_run' ref when no explicit name is given.
      incremental_manifest_table_name: optional model name. When set it is
        resolved with ref(); otherwise
        get_incremental_manifest_table_relation(package_name) is used.
      base_events_this_run_table_name: optional model name. When set it is
        resolved with ref(); otherwise ref(package_name ~ '_base_events_this_run').
      session_timestamp: forwarded unchanged to update_incremental_manifest_table.
        Defaults to var 'snowplow__session_timestamp', falling back to 'load_tstamp'.

    When var 'snowplow__enable_keyhole_backfill' is truthy the whole body is
    skipped and the macro renders nothing.
    NOTE(review): presumably so a keyhole backfill never advances the manifest;
    confirm against the backfill design.
  #}
  {% if not var('snowplow__enable_keyhole_backfill', false) %}

    {# Every lookup lives inside the guard and runs exactly once, so the manifest update is rendered a single time. #}
    {% set enabled_snowplow_models = snowplow_utils.get_enabled_snowplow_models(package_name) -%}

    {% set successful_snowplow_models = snowplow_utils.get_successful_models(models=enabled_snowplow_models) -%}

    {%- if incremental_manifest_table_name -%}
      {%- set incremental_manifest_table = ref(incremental_manifest_table_name) -%}
    {%- else -%}
      {% set incremental_manifest_table = snowplow_utils.get_incremental_manifest_table_relation(package_name) -%}
    {%- endif -%}

    {%- if base_events_this_run_table_name -%}
      {%- set base_events_this_run_table = ref(base_events_this_run_table_name) -%}
    {%- else -%}
      {% set base_events_this_run_table = ref(package_name~'_base_events_this_run') -%}
    {%- endif -%}

    {{ snowplow_utils.update_incremental_manifest_table(incremental_manifest_table, base_events_this_run_table, successful_snowplow_models, session_timestamp) }}

  {% endif %}
{% endmacro %}
Loading