diff --git a/.source b/.source index 6654e716114..7c606c0a774 160000 --- a/.source +++ b/.source @@ -1 +1 @@ -Subproject commit 6654e716114c36258ce9a5fb39ad175d0099e086 +Subproject commit 7c606c0a77410b87bf0e0f198b1fed93db771d92 diff --git a/apps/api/migrations/clickhouse-migrations/2_add_workflow_id_to_schema.sql b/apps/api/migrations/clickhouse-migrations/2_add_workflow_id_to_schema.sql new file mode 100644 index 00000000000..15299bb786f --- /dev/null +++ b/apps/api/migrations/clickhouse-migrations/2_add_workflow_id_to_schema.sql @@ -0,0 +1,10 @@ +-- Add workflow_id column to step_runs table +-- This column stores the workflow template ID for each step execution +ALTER TABLE step_runs +ADD COLUMN IF NOT EXISTS workflow_id String DEFAULT ''; + + +-- Add workflow_id column to traces table +-- This column stores the workflow template ID for each trace +ALTER TABLE traces +ADD COLUMN IF NOT EXISTS workflow_id String DEFAULT ''; diff --git a/apps/api/package.json b/apps/api/package.json index 11452bbe232..efefc9c0c67 100644 --- a/apps/api/package.json +++ b/apps/api/package.json @@ -32,7 +32,7 @@ "test:e2e:novu-v0": "cross-env TS_NODE_COMPILER_OPTIONS='{\"strictNullChecks\": false}' NODE_ENV=test mocha --timeout 15000 --retries 3 --grep '#novu-v0' --require ts-node/register --exit --file e2e/setup.ts src/**/*.e2e{,-ee}.ts", "test:e2e:novu-v2": "cross-env NODE_ENV=test CI_EE_TEST=true CLERK_ENABLED=true NODE_OPTIONS=--max_old_space_size=8192 mocha --timeout 30000 --retries 3 --grep '#novu-v2' --require ./swc-register.js --exit --file e2e/setup.ts 'src/**/*.e2e{,-ee}.ts' 'e2e/enterprise/**/*.e2e.ts'", "migration": "cross-env NODE_ENV=local MIGRATION=true ts-node --transpileOnly", - "clickhouse:migrate": "clickhouse-migrations migrate --host=http://localhost:8123 --user=default --password= --db=novu-local --migrations-home=./migrations/clickhouse-migrations", + "clickhouse:migrate:local": "clickhouse-migrations migrate --host=http://localhost:8123 --user=default --password= --db=novu-local --migrations-home=./migrations/clickhouse-migrations", "clickhouse:migrate:prod": "clickhouse-migrations migrate --migrations-home=./migrations/clickhouse-migrations", "link:submodules": "pnpm link ../../enterprise/packages/auth && pnpm link ../../enterprise/packages/translation && pnpm link ../../enterprise/packages/billing", "admin:remove-user-account": "cross-env NODE_ENV=local MIGRATION=true ts-node --transpileOnly ./admin/remove-user-account.ts", diff --git a/apps/api/src/app/activity/usecases/build-active-subscribers-chart/build-active-subscribers-chart.command.ts b/apps/api/src/app/activity/usecases/build-active-subscribers-chart/build-active-subscribers-chart.command.ts index 1c216b5cab1..134cfa109a3 100644 --- a/apps/api/src/app/activity/usecases/build-active-subscribers-chart/build-active-subscribers-chart.command.ts +++ b/apps/api/src/app/activity/usecases/build-active-subscribers-chart/build-active-subscribers-chart.command.ts @@ -1,5 +1,5 @@ import { EnvironmentCommand } from '@novu/application-generic'; -import { IsDate, IsDefined } from 'class-validator'; +import { IsArray, IsDate, IsDefined, IsOptional, IsString } from 'class-validator'; export class BuildActiveSubscribersChartCommand extends EnvironmentCommand { @IsDate() @@ -9,4 +9,9 @@ export class BuildActiveSubscribersChartCommand extends EnvironmentCommand { @IsDate() @IsDefined() endDate: Date; + + @IsOptional() + @IsArray() + @IsString({ each: true }) + workflowIds?: string[]; } diff --git a/apps/api/src/app/activity/usecases/build-active-subscribers-chart/build-active-subscribers-chart.usecase.ts b/apps/api/src/app/activity/usecases/build-active-subscribers-chart/build-active-subscribers-chart.usecase.ts index 0961165ff59..bd887e40478 100644 --- a/apps/api/src/app/activity/usecases/build-active-subscribers-chart/build-active-subscribers-chart.usecase.ts +++ b/apps/api/src/app/activity/usecases/build-active-subscribers-chart/build-active-subscribers-chart.usecase.ts @@ -14,7 +14,7 @@ export class BuildActiveSubscribersChart { @InstrumentUsecase() async execute(command: BuildActiveSubscribersChartCommand): Promise { - const { environmentId, organizationId, startDate, endDate } = command; + const { environmentId, organizationId, startDate, endDate, workflowIds } = command; // Calculate previous period dates const periodDuration = endDate.getTime() - startDate.getTime(); @@ -27,7 +27,8 @@ export class BuildActiveSubscribersChart { startDate, endDate, previousStartDate, - previousEndDate + previousEndDate, + workflowIds ); return { diff --git a/apps/api/src/app/activity/usecases/build-active-subscribers-trend-chart/build-active-subscribers-trend-chart.command.ts b/apps/api/src/app/activity/usecases/build-active-subscribers-trend-chart/build-active-subscribers-trend-chart.command.ts index b93dd4c16ba..81b47d80042 100644 --- a/apps/api/src/app/activity/usecases/build-active-subscribers-trend-chart/build-active-subscribers-trend-chart.command.ts +++ b/apps/api/src/app/activity/usecases/build-active-subscribers-trend-chart/build-active-subscribers-trend-chart.command.ts @@ -1,5 +1,5 @@ import { EnvironmentCommand } from '@novu/application-generic'; -import { IsDate, IsDefined } from 'class-validator'; +import { IsArray, IsDate, IsDefined, IsOptional, IsString } from 'class-validator'; export class BuildActiveSubscribersTrendChartCommand extends EnvironmentCommand { @IsDate() @@ -9,4 +9,9 @@ export class BuildActiveSubscribersTrendChartCommand extends EnvironmentCommand @IsDate() @IsDefined() endDate: Date; + + @IsOptional() + @IsArray() + @IsString({ each: true }) + workflowIds?: string[]; } diff --git a/apps/api/src/app/activity/usecases/build-active-subscribers-trend-chart/build-active-subscribers-trend-chart.usecase.ts b/apps/api/src/app/activity/usecases/build-active-subscribers-trend-chart/build-active-subscribers-trend-chart.usecase.ts index 43c0010990e..828e72334f3 100644 --- a/apps/api/src/app/activity/usecases/build-active-subscribers-trend-chart/build-active-subscribers-trend-chart.usecase.ts +++ b/apps/api/src/app/activity/usecases/build-active-subscribers-trend-chart/build-active-subscribers-trend-chart.usecase.ts @@ -14,13 +14,14 @@ export class BuildActiveSubscribersTrendChart { @InstrumentUsecase() async execute(command: BuildActiveSubscribersTrendChartCommand): Promise { - const { environmentId, organizationId, startDate, endDate } = command; + const { environmentId, organizationId, startDate, endDate, workflowIds } = command; const activeSubscribers = await this.workflowRunRepository.getActiveSubscribersTrendData( environmentId, organizationId, startDate, - endDate + endDate, + workflowIds ); const chartDataMap = new Map(); diff --git a/apps/api/src/app/activity/usecases/build-avg-messages-per-subscriber-chart/build-avg-messages-per-subscriber-chart.command.ts b/apps/api/src/app/activity/usecases/build-avg-messages-per-subscriber-chart/build-avg-messages-per-subscriber-chart.command.ts index 9383ed8777c..63020c4594d 100644 --- a/apps/api/src/app/activity/usecases/build-avg-messages-per-subscriber-chart/build-avg-messages-per-subscriber-chart.command.ts +++ b/apps/api/src/app/activity/usecases/build-avg-messages-per-subscriber-chart/build-avg-messages-per-subscriber-chart.command.ts @@ -1,5 +1,5 @@ import { EnvironmentCommand } from '@novu/application-generic'; -import { IsDate, IsDefined } from 'class-validator'; +import { IsArray, IsDate, IsDefined, IsOptional, IsString } from 'class-validator'; export class BuildAvgMessagesPerSubscriberChartCommand extends EnvironmentCommand { @IsDate() @@ -9,4 +9,9 @@ export class BuildAvgMessagesPerSubscriberChartCommand extends EnvironmentComman @IsDate() @IsDefined() endDate: Date; + + @IsOptional() + @IsArray() + @IsString({ each: true }) + workflowIds?: string[]; } diff --git a/apps/api/src/app/activity/usecases/build-avg-messages-per-subscriber-chart/build-avg-messages-per-subscriber-chart.usecase.ts b/apps/api/src/app/activity/usecases/build-avg-messages-per-subscriber-chart/build-avg-messages-per-subscriber-chart.usecase.ts index 5961022970d..4d7a1c16eef 100644 --- a/apps/api/src/app/activity/usecases/build-avg-messages-per-subscriber-chart/build-avg-messages-per-subscriber-chart.usecase.ts +++ b/apps/api/src/app/activity/usecases/build-avg-messages-per-subscriber-chart/build-avg-messages-per-subscriber-chart.usecase.ts @@ -14,7 +14,7 @@ export class BuildAvgMessagesPerSubscriberChart { @InstrumentUsecase() async execute(command: BuildAvgMessagesPerSubscriberChartCommand): Promise { - const { environmentId, organizationId, startDate, endDate } = command; + const { environmentId, organizationId, startDate, endDate, workflowIds } = command; // Calculate previous period dates const periodDuration = endDate.getTime() - startDate.getTime(); @@ -27,7 +27,8 @@ export class BuildAvgMessagesPerSubscriberChart { startDate, endDate, previousStartDate, - previousEndDate + previousEndDate, + workflowIds ); return { diff --git a/apps/api/src/app/activity/usecases/build-delivery-trend-chart/build-delivery-trend-chart.command.ts b/apps/api/src/app/activity/usecases/build-delivery-trend-chart/build-delivery-trend-chart.command.ts index 0176ecd4fb0..23f72ef1001 100644 --- a/apps/api/src/app/activity/usecases/build-delivery-trend-chart/build-delivery-trend-chart.command.ts +++ b/apps/api/src/app/activity/usecases/build-delivery-trend-chart/build-delivery-trend-chart.command.ts @@ -1,5 +1,5 @@ import { EnvironmentCommand } from '@novu/application-generic'; -import { IsDate, IsDefined } from 'class-validator'; +import { IsArray, IsDate, IsDefined, IsOptional, IsString } from 'class-validator'; export class BuildDeliveryTrendChartCommand extends EnvironmentCommand { @IsDate() @@ -9,4 +9,9 @@ export class BuildDeliveryTrendChartCommand extends EnvironmentCommand { @IsDate() @IsDefined() endDate: Date; + + @IsOptional() + @IsArray() + @IsString({ each: true }) + workflowIds?: string[]; } diff --git a/apps/api/src/app/activity/usecases/build-delivery-trend-chart/build-delivery-trend-chart.usecase.ts b/apps/api/src/app/activity/usecases/build-delivery-trend-chart/build-delivery-trend-chart.usecase.ts index b29c7f10a38..9eaaef288f3 100644 --- a/apps/api/src/app/activity/usecases/build-delivery-trend-chart/build-delivery-trend-chart.usecase.ts +++ b/apps/api/src/app/activity/usecases/build-delivery-trend-chart/build-delivery-trend-chart.usecase.ts @@ -14,13 +14,14 @@ export class BuildDeliveryTrendChart { @InstrumentUsecase() async execute(command: BuildDeliveryTrendChartCommand): Promise { - const { environmentId, organizationId, startDate, endDate } = command; + const { environmentId, organizationId, startDate, endDate, workflowIds } = command; const stepRuns = await this.stepRunRepository.getDeliveryTrendData( environmentId, organizationId, startDate, - endDate + endDate, + workflowIds ); const chartDataMap = new Map>(); diff --git a/apps/api/src/app/activity/usecases/build-interaction-trend-chart/build-interaction-trend-chart.command.ts b/apps/api/src/app/activity/usecases/build-interaction-trend-chart/build-interaction-trend-chart.command.ts index 6c54edb02df..59263410553 100644 --- a/apps/api/src/app/activity/usecases/build-interaction-trend-chart/build-interaction-trend-chart.command.ts +++ b/apps/api/src/app/activity/usecases/build-interaction-trend-chart/build-interaction-trend-chart.command.ts @@ -1,5 +1,5 @@ import { EnvironmentCommand } from '@novu/application-generic'; -import { IsDate, IsDefined } from 'class-validator'; +import { IsArray, IsDate, IsDefined, IsOptional, IsString } from 'class-validator'; export class BuildInteractionTrendChartCommand extends EnvironmentCommand { @IsDate() @@ -9,4 +9,9 @@ export class BuildInteractionTrendChartCommand extends EnvironmentCommand { @IsDate() @IsDefined() endDate: Date; + + @IsOptional() + @IsArray() + @IsString({ each: true }) + workflowIds?: string[]; } diff --git a/apps/api/src/app/activity/usecases/build-interaction-trend-chart/build-interaction-trend-chart.usecase.ts b/apps/api/src/app/activity/usecases/build-interaction-trend-chart/build-interaction-trend-chart.usecase.ts index abfa0b86972..449fbeddade 100644 --- a/apps/api/src/app/activity/usecases/build-interaction-trend-chart/build-interaction-trend-chart.usecase.ts +++ b/apps/api/src/app/activity/usecases/build-interaction-trend-chart/build-interaction-trend-chart.usecase.ts @@ -14,13 +14,14 @@ export class BuildInteractionTrendChart { @InstrumentUsecase() async execute(command: BuildInteractionTrendChartCommand): Promise { - const { environmentId, organizationId, startDate, endDate } = command; + const { environmentId, organizationId, startDate, endDate, workflowIds } = command; const traces = await this.traceLogRepository.getInteractionTrendData( environmentId, organizationId, startDate, - endDate + endDate, + workflowIds ); const chartDataMap = new Map>(); diff --git a/apps/api/src/app/activity/usecases/build-messages-delivered-chart/build-messages-delivered-chart.command.ts b/apps/api/src/app/activity/usecases/build-messages-delivered-chart/build-messages-delivered-chart.command.ts index 91f4dc4919c..f4720825438 100644 --- a/apps/api/src/app/activity/usecases/build-messages-delivered-chart/build-messages-delivered-chart.command.ts +++ b/apps/api/src/app/activity/usecases/build-messages-delivered-chart/build-messages-delivered-chart.command.ts @@ -1,5 +1,5 @@ import { EnvironmentCommand } from '@novu/application-generic'; -import { IsDate, IsDefined } from 'class-validator'; +import { IsArray, IsDate, IsDefined, IsOptional, IsString } from 'class-validator'; export class BuildMessagesDeliveredChartCommand extends EnvironmentCommand { @IsDate() @@ -9,4 +9,9 @@ export class BuildMessagesDeliveredChartCommand extends EnvironmentCommand { @IsDate() @IsDefined() endDate: Date; + + @IsOptional() + @IsArray() + @IsString({ each: true }) + workflowIds?: string[]; } diff --git a/apps/api/src/app/activity/usecases/build-messages-delivered-chart/build-messages-delivered-chart.usecase.ts b/apps/api/src/app/activity/usecases/build-messages-delivered-chart/build-messages-delivered-chart.usecase.ts index 1b8d9a34d5a..bd1fe21bf49 100644 --- a/apps/api/src/app/activity/usecases/build-messages-delivered-chart/build-messages-delivered-chart.usecase.ts +++ b/apps/api/src/app/activity/usecases/build-messages-delivered-chart/build-messages-delivered-chart.usecase.ts @@ -14,7 +14,7 @@ export class BuildMessagesDeliveredChart { @InstrumentUsecase() async execute(command: BuildMessagesDeliveredChartCommand): Promise { - const { environmentId, organizationId, startDate, endDate } = command; + const { environmentId, organizationId, startDate, endDate, workflowIds } = command; // Calculate previous period dates const periodDuration = endDate.getTime() - startDate.getTime(); @@ -27,7 +27,8 @@ export class BuildMessagesDeliveredChart { startDate, endDate, previousStartDate, - previousEndDate + previousEndDate, + workflowIds ); return { diff --git a/apps/api/src/app/activity/usecases/build-provider-by-volume-chart/build-provider-by-volume-chart.command.ts b/apps/api/src/app/activity/usecases/build-provider-by-volume-chart/build-provider-by-volume-chart.command.ts index 446ae7184f4..1b2d2d74cb1 100644 --- a/apps/api/src/app/activity/usecases/build-provider-by-volume-chart/build-provider-by-volume-chart.command.ts +++ b/apps/api/src/app/activity/usecases/build-provider-by-volume-chart/build-provider-by-volume-chart.command.ts @@ -1,5 +1,5 @@ import { EnvironmentCommand } from '@novu/application-generic'; -import { IsDate, IsDefined } from 'class-validator'; +import { IsArray, IsDate, IsDefined, IsOptional, IsString } from 'class-validator'; export class BuildProviderByVolumeChartCommand extends EnvironmentCommand { @IsDate() @@ -9,4 +9,9 @@ export class BuildProviderByVolumeChartCommand extends EnvironmentCommand { @IsDate() @IsDefined() endDate: Date; + + @IsOptional() + @IsArray() + @IsString({ each: true }) + workflowIds?: string[]; } diff --git a/apps/api/src/app/activity/usecases/build-provider-by-volume-chart/build-provider-by-volume-chart.usecase.ts b/apps/api/src/app/activity/usecases/build-provider-by-volume-chart/build-provider-by-volume-chart.usecase.ts index c50cc401cdb..356c521000d 100644 --- a/apps/api/src/app/activity/usecases/build-provider-by-volume-chart/build-provider-by-volume-chart.usecase.ts +++ b/apps/api/src/app/activity/usecases/build-provider-by-volume-chart/build-provider-by-volume-chart.usecase.ts @@ -14,13 +14,14 @@ export class BuildProviderByVolumeChart { @InstrumentUsecase() async execute(command: BuildProviderByVolumeChartCommand): Promise { - const { environmentId, organizationId, startDate, endDate } = command; + const { environmentId, organizationId, startDate, endDate, workflowIds } = command; const providerData = await this.stepRunRepository.getProviderVolumeData( environmentId, organizationId, startDate, - endDate + endDate, + workflowIds ); return providerData.map((dataPoint) => ({ diff --git a/apps/api/src/app/activity/usecases/build-total-interactions-chart/build-total-interactions-chart.command.ts b/apps/api/src/app/activity/usecases/build-total-interactions-chart/build-total-interactions-chart.command.ts index 2439e76d597..eeb2b1464ea 100644 --- a/apps/api/src/app/activity/usecases/build-total-interactions-chart/build-total-interactions-chart.command.ts +++ b/apps/api/src/app/activity/usecases/build-total-interactions-chart/build-total-interactions-chart.command.ts @@ -1,5 +1,5 @@ import { EnvironmentCommand } from '@novu/application-generic'; -import { IsDate, IsDefined } from 'class-validator'; +import { IsArray, IsDate, IsDefined, IsOptional, IsString } from 'class-validator'; export class BuildTotalInteractionsChartCommand extends EnvironmentCommand { @IsDate() @@ -9,4 +9,9 @@ export class BuildTotalInteractionsChartCommand extends EnvironmentCommand { @IsDate() @IsDefined() endDate: Date; + + @IsOptional() + @IsArray() + @IsString({ each: true }) + workflowIds?: string[]; } diff --git a/apps/api/src/app/activity/usecases/build-total-interactions-chart/build-total-interactions-chart.usecase.ts b/apps/api/src/app/activity/usecases/build-total-interactions-chart/build-total-interactions-chart.usecase.ts index 6385f2bd0c0..ec7df2d271a 100644 --- a/apps/api/src/app/activity/usecases/build-total-interactions-chart/build-total-interactions-chart.usecase.ts +++ b/apps/api/src/app/activity/usecases/build-total-interactions-chart/build-total-interactions-chart.usecase.ts @@ -14,7 +14,7 @@ export class BuildTotalInteractionsChart { @InstrumentUsecase() async execute(command: BuildTotalInteractionsChartCommand): Promise { - const { environmentId, organizationId, startDate, endDate } = command; + const { environmentId, organizationId, startDate, endDate, workflowIds } = command; // Calculate previous period dates const periodDuration = endDate.getTime() - startDate.getTime(); @@ -27,7 +27,8 @@ export class BuildTotalInteractionsChart { startDate, endDate, previousStartDate, - previousEndDate + previousEndDate, + workflowIds ); return { diff --git a/apps/api/src/app/activity/usecases/build-workflow-by-volume-chart/build-workflow-by-volume-chart.command.ts b/apps/api/src/app/activity/usecases/build-workflow-by-volume-chart/build-workflow-by-volume-chart.command.ts index e6c1e16acbe..7911a5d8b15 100644 --- a/apps/api/src/app/activity/usecases/build-workflow-by-volume-chart/build-workflow-by-volume-chart.command.ts +++ b/apps/api/src/app/activity/usecases/build-workflow-by-volume-chart/build-workflow-by-volume-chart.command.ts @@ -1,5 +1,5 @@ import { EnvironmentCommand } from '@novu/application-generic'; -import { IsDate, IsDefined } from 'class-validator'; +import { IsArray, IsDate, IsDefined, IsOptional, IsString } from 'class-validator'; export class BuildWorkflowByVolumeChartCommand extends EnvironmentCommand { @IsDate() @@ -9,4 +9,9 @@ export class BuildWorkflowByVolumeChartCommand extends EnvironmentCommand { @IsDate() @IsDefined() endDate: Date; + + @IsOptional() + @IsArray() + @IsString({ each: true }) + workflowIds?: string[]; } diff --git a/apps/api/src/app/activity/usecases/build-workflow-by-volume-chart/build-workflow-by-volume-chart.usecase.ts b/apps/api/src/app/activity/usecases/build-workflow-by-volume-chart/build-workflow-by-volume-chart.usecase.ts index 56bfafeb8e1..487d9f8c46d 100644 --- a/apps/api/src/app/activity/usecases/build-workflow-by-volume-chart/build-workflow-by-volume-chart.usecase.ts +++ b/apps/api/src/app/activity/usecases/build-workflow-by-volume-chart/build-workflow-by-volume-chart.usecase.ts @@ -14,13 +14,14 @@ export class BuildWorkflowByVolumeChart { @InstrumentUsecase() async execute(command: BuildWorkflowByVolumeChartCommand): Promise { - const { environmentId, organizationId, startDate, endDate } = command; + const { environmentId, organizationId, startDate, endDate, workflowIds } = command; const workflowRuns = await this.workflowRunRepository.getWorkflowVolumeData( environmentId, organizationId, startDate, - endDate + endDate, + workflowIds ); const chartData: WorkflowVolumeDataPointDto[] = workflowRuns.map((workflowRun) => ({ diff --git a/apps/api/src/app/activity/usecases/build-workflow-runs-metric-chart/build-workflow-runs-metric-chart.command.ts b/apps/api/src/app/activity/usecases/build-workflow-runs-metric-chart/build-workflow-runs-metric-chart.command.ts index 6b01a63ac49..79c6e58c7b2 100644 --- a/apps/api/src/app/activity/usecases/build-workflow-runs-metric-chart/build-workflow-runs-metric-chart.command.ts +++ b/apps/api/src/app/activity/usecases/build-workflow-runs-metric-chart/build-workflow-runs-metric-chart.command.ts @@ -1,5 +1,5 @@ import { EnvironmentCommand } from '@novu/application-generic'; -import { IsDate, IsDefined } from 'class-validator'; +import { IsArray, IsDate, IsDefined, IsOptional, IsString } from 'class-validator'; export class BuildWorkflowRunsMetricChartCommand extends EnvironmentCommand { @IsDate() @@ -9,4 +9,9 @@ export class BuildWorkflowRunsMetricChartCommand extends EnvironmentCommand { @IsDate() @IsDefined() endDate: Date; + + @IsOptional() + @IsArray() + @IsString({ each: true }) + workflowIds?: string[]; } diff --git a/apps/api/src/app/activity/usecases/build-workflow-runs-metric-chart/build-workflow-runs-metric-chart.usecase.ts b/apps/api/src/app/activity/usecases/build-workflow-runs-metric-chart/build-workflow-runs-metric-chart.usecase.ts index 4b9c2016af9..2921294dca0 100644 --- a/apps/api/src/app/activity/usecases/build-workflow-runs-metric-chart/build-workflow-runs-metric-chart.usecase.ts +++ b/apps/api/src/app/activity/usecases/build-workflow-runs-metric-chart/build-workflow-runs-metric-chart.usecase.ts @@ -14,7 +14,7 @@ export class BuildWorkflowRunsMetricChart { @InstrumentUsecase() async execute(command: BuildWorkflowRunsMetricChartCommand): Promise { - const { environmentId, organizationId, startDate, endDate } = command; + const { environmentId, organizationId, startDate, endDate, workflowIds } = command; // Calculate previous period dates const periodDuration = endDate.getTime() - startDate.getTime(); @@ -27,7 +27,8 @@ export class BuildWorkflowRunsMetricChart { startDate, endDate, previousStartDate, - previousEndDate + previousEndDate, + workflowIds ); return { diff --git a/apps/api/src/app/activity/usecases/build-workflow-runs-trend-chart/build-workflow-runs-trend-chart.command.ts b/apps/api/src/app/activity/usecases/build-workflow-runs-trend-chart/build-workflow-runs-trend-chart.command.ts index ade128d2242..03a29dce35b 100644 --- a/apps/api/src/app/activity/usecases/build-workflow-runs-trend-chart/build-workflow-runs-trend-chart.command.ts +++ b/apps/api/src/app/activity/usecases/build-workflow-runs-trend-chart/build-workflow-runs-trend-chart.command.ts @@ -1,5 +1,5 @@ import { EnvironmentCommand } from '@novu/application-generic'; -import { IsDate, IsDefined } from 'class-validator'; +import { IsArray, IsDate, IsDefined, IsOptional, IsString } from 'class-validator'; export class BuildWorkflowRunsTrendChartCommand extends EnvironmentCommand { @IsDate() @@ -9,4 +9,9 @@ export class BuildWorkflowRunsTrendChartCommand extends EnvironmentCommand { @IsDate() @IsDefined() endDate: Date; + + @IsOptional() + @IsArray() + @IsString({ each: true }) + workflowIds?: string[]; } diff --git a/apps/api/src/app/activity/usecases/build-workflow-runs-trend-chart/build-workflow-runs-trend-chart.usecase.ts b/apps/api/src/app/activity/usecases/build-workflow-runs-trend-chart/build-workflow-runs-trend-chart.usecase.ts index 146268611a9..31e660275f2 100644 --- a/apps/api/src/app/activity/usecases/build-workflow-runs-trend-chart/build-workflow-runs-trend-chart.usecase.ts +++ b/apps/api/src/app/activity/usecases/build-workflow-runs-trend-chart/build-workflow-runs-trend-chart.usecase.ts @@ -14,13 +14,14 @@ export class BuildWorkflowRunsTrendChart { @InstrumentUsecase() async execute(command: BuildWorkflowRunsTrendChartCommand): Promise { - const { environmentId, organizationId, startDate, endDate } = command; + const { environmentId, organizationId, startDate, endDate, workflowIds } = command; const workflowRuns = await this.workflowRunRepository.getWorkflowRunsTrendData( environmentId, organizationId, startDate, - endDate + endDate, + workflowIds ); const chartDataMap = new Map>(); @@ -32,7 +33,7 @@ export class BuildWorkflowRunsTrendChart { dateKey, new Map([ ['pending', 0], // remove backward compatibility after data renews nv-6562 - ['processing', 0], + ['processing', 0], ['success', 0], // remove backward compatibility after data renews nv-6562 ['completed', 0], ['error', 0], @@ -48,7 +49,7 @@ export class BuildWorkflowRunsTrendChart { const statusMap = chartDataMap.get(date); if (statusMap?.has(status)) { const currentCount = statusMap.get(status) || 0; - statusMap.set(status, currentCount + parseInt(workflowRun.count, 10)); + statusMap.set(status, currentCount + parseInt(workflowRun.count, 10)); } } diff --git a/apps/api/src/app/activity/usecases/get-charts/get-charts.usecase.ts b/apps/api/src/app/activity/usecases/get-charts/get-charts.usecase.ts index 8652327bed8..e789368dd6d 100644 --- a/apps/api/src/app/activity/usecases/get-charts/get-charts.usecase.ts +++ b/apps/api/src/app/activity/usecases/get-charts/get-charts.usecase.ts @@ -132,6 +132,7 @@ export class GetCharts { organizationId, startDate, endDate, + workflowIds, }) ), }); @@ -146,6 +147,7 @@ export class GetCharts { organizationId, startDate, endDate, + workflowIds, }) ), }); @@ -160,6 +162,7 @@ export class GetCharts { organizationId, startDate, endDate, + workflowIds, }) ), }); @@ -174,6 +177,7 @@ export class GetCharts { organizationId, startDate, endDate, + workflowIds, }) ), }); @@ -188,6 +192,7 @@ export class GetCharts { organizationId, startDate, endDate, + workflowIds, }) ), }); @@ -202,6 +207,7 @@ export class GetCharts { organizationId, startDate, endDate, + workflowIds, }) ), }); @@ -216,6 +222,7 @@ export class GetCharts { organizationId, startDate, endDate, + workflowIds, }) ), }); @@ -230,6 +237,7 @@ export class GetCharts { organizationId, startDate, endDate, + workflowIds, }) ), }); @@ -261,6 +269,7 @@ export class GetCharts { organizationId, startDate, endDate, + workflowIds, }) ), }); @@ -275,6 +284,7 @@ export class GetCharts { organizationId, startDate, endDate, + workflowIds, }) ), }); @@ -289,6 +299,7 @@ export class GetCharts { organizationId, startDate, endDate, + workflowIds, }) ), }); diff --git a/apps/api/src/app/events/usecases/cancel-delayed/cancel-delayed.usecase.ts b/apps/api/src/app/events/usecases/cancel-delayed/cancel-delayed.usecase.ts index f6490f7f7e1..b1d005e9b7a 100644 --- a/apps/api/src/app/events/usecases/cancel-delayed/cancel-delayed.usecase.ts +++ b/apps/api/src/app/events/usecases/cancel-delayed/cancel-delayed.usecase.ts @@ -163,6 +163,7 @@ export class CancelDelayed { step_run_type: this.mapStepTypeEnumToStepType(job.type) || undefined, workflow_run_identifier: job.identifier || '', _notificationId: job._notificationId, + workflow_id: job._templateId, })); await this.messageInteractionService.trace( diff --git a/apps/api/src/app/events/usecases/parse-event-request/parse-event-request.usecase.ts b/apps/api/src/app/events/usecases/parse-event-request/parse-event-request.usecase.ts index 90e17bd8819..8a879f812f6 100644 --- a/apps/api/src/app/events/usecases/parse-event-request/parse-event-request.usecase.ts +++ b/apps/api/src/app/events/usecases/parse-event-request/parse-event-request.usecase.ts @@ -79,14 +79,14 @@ export class ParseEventRequest { const discoveredWorkflow = await this.queryDiscoverWorkflow(command); if (!discoveredWorkflow) { - await this.createRequestTrace( + await this.createRequestTrace({ requestId, command, - 'request_workflow_not_found', + eventType: 'request_workflow_not_found', transactionId, - 'error', - 'Bridge workflow not found' - ); + status: 'error', + message: 'Bridge workflow not found', + }); throw new UnprocessableEntityException('workflow_not_found'); } @@ -106,14 +106,14 @@ export class ParseEventRequest { })); if (!template) { - await this.createRequestTrace( + await this.createRequestTrace({ requestId, command, - 'request_workflow_not_found', + eventType: 'request_workflow_not_found', transactionId, - 'error', - 'Notification template not found' - ); + status: 'error', + message: 'Notification template not found', + }); throw new UnprocessableEntityException('workflow_not_found'); } @@ -127,15 +127,15 @@ export class ParseEventRequest { command.payload = validatedPayload; } catch (error) { if (error instanceof PayloadValidationException) { - await this.createRequestTrace( + await this.createRequestTrace({ requestId, command, - 'request_payload_validation_failed', + eventType: 'request_payload_validation_failed', transactionId, - 'error', - 'Payload validation failed', - { validationErrors: error.message, payload: command.payload } - ); + status: 'error', + message: 'Payload validation failed', + rawData: { validationErrors: error.message, payload: command.payload }, + }); } throw error; } @@ -222,35 +222,43 @@ export class ParseEventRequest { return result; } catch (error) { - // Trace: Request failed - await this.createRequestTrace( + await this.createRequestTrace({ requestId, command, - 'request_failed', + eventType: 'request_failed', transactionId, - 'error', - `Request processing failed: ${error.message}`, - { error: error.message, stack: error.stack } - ); + status: 'error', + message: `Request processing failed: ${error.message}`, + rawData: { error: error.message, stack: error.stack }, + }); throw error; } } - private async createRequestTrace( - requestId: string | undefined, - command: ParseEventRequestCommand, - eventType: EventType, - transactionId: string, - status: 'success' | 'error' = 'success', - message?: string, - rawData?: any - ): Promise { + private async createRequestTrace({ + requestId, + command, + eventType, + transactionId, + status = 'success', + message, + rawData, + }: { + requestId: string | undefined; + command: ParseEventRequestCommand; + eventType: EventType; + transactionId: string; + status?: 'success' | 'error'; + message?: string; + rawData?: any; + }): Promise { if (!requestId) { this.logger.warn( { command, eventType, transactionId, status, message, rawData }, 'Request trace skipped, no request ID found' ); + return; } @@ -270,6 +278,7 @@ export class ParseEventRequest { entity_type: 'request', entity_id: requestId, workflow_run_identifier: command.identifier, + workflow_id: command.workflow?._id || '', }; await this.traceLogRepository.createRequest([traceData]); @@ -341,15 +350,15 @@ export class ParseEventRequest { * otherwise we should continue with the valid recipients. */ if (!validRecipients && !isDryRun) { - await this.createRequestTrace( + await this.createRequestTrace({ requestId, command, - 'request_invalid_recipients', + eventType: 'request_invalid_recipients', transactionId, - 'error', - 'All recipients are invalid', - { invalidRecipients } - ); + status: 'error', + message: 'All recipients are invalid', + rawData: { invalidRecipients }, + }); return { acknowledged: true, diff --git a/apps/api/src/app/inbox/usecases/delete-many-notifications/delete-many-notifications.usecase.ts b/apps/api/src/app/inbox/usecases/delete-many-notifications/delete-many-notifications.usecase.ts index 71f62380550..cf659074bfa 100644 --- a/apps/api/src/app/inbox/usecases/delete-many-notifications/delete-many-notifications.usecase.ts +++ b/apps/api/src/app/inbox/usecases/delete-many-notifications/delete-many-notifications.usecase.ts @@ -216,5 +216,6 @@ function createTraceLog({ step_run_type: message.channel as StepType, workflow_run_identifier: '', _notificationId: message._notificationId, + workflow_id: message._templateId, }; } diff --git a/apps/api/src/app/inbox/usecases/mark-many-notifications-as/mark-many-notifications-as.usecase.ts b/apps/api/src/app/inbox/usecases/mark-many-notifications-as/mark-many-notifications-as.usecase.ts index 3a9ab2bf752..7438c816fa4 100644 --- a/apps/api/src/app/inbox/usecases/mark-many-notifications-as/mark-many-notifications-as.usecase.ts +++ b/apps/api/src/app/inbox/usecases/mark-many-notifications-as/mark-many-notifications-as.usecase.ts @@ -263,5 +263,6 @@ function createTraceLog({ step_run_type: message.channel as StepType, workflow_run_identifier: '', _notificationId: message._notificationId, + workflow_id: message._templateId, }; } diff --git a/apps/api/src/app/inbox/usecases/mark-notifications-as-seen/mark-notifications-as-seen.usecase.ts b/apps/api/src/app/inbox/usecases/mark-notifications-as-seen/mark-notifications-as-seen.usecase.ts index b89eef0c217..52302e0baa4 100644 --- a/apps/api/src/app/inbox/usecases/mark-notifications-as-seen/mark-notifications-as-seen.usecase.ts +++ b/apps/api/src/app/inbox/usecases/mark-notifications-as-seen/mark-notifications-as-seen.usecase.ts @@ -275,6 +275,7 @@ export class MarkNotificationsAsSeen { step_run_type: message.channel as StepType, workflow_run_identifier: '', _notificationId: message._notificationId, + workflow_id: message._templateId, }; } } diff --git a/apps/api/src/app/widgets/usecases/mark-message-as/mark-message-as.usecase.ts b/apps/api/src/app/widgets/usecases/mark-message-as/mark-message-as.usecase.ts index 374a81b86ce..08eb7ccd33d 100644 --- a/apps/api/src/app/widgets/usecases/mark-message-as/mark-message-as.usecase.ts +++ b/apps/api/src/app/widgets/usecases/mark-message-as/mark-message-as.usecase.ts @@ -146,6 +146,7 @@ export class MarkMessageAs { step_run_type: message.channel as StepType, workflow_run_identifier: '', _notificationId: message._notificationId, + workflow_id: message._templateId, }); } } diff --git a/apps/dashboard/src/pages/analytics.tsx b/apps/dashboard/src/pages/analytics.tsx index 1dc109810c8..5c6762e81f0 100644 --- a/apps/dashboard/src/pages/analytics.tsx +++ b/apps/dashboard/src/pages/analytics.tsx @@ -1,8 +1,8 @@ import { useOrganization } from '@clerk/clerk-react'; -import { EnvironmentTypeEnum } from '@novu/shared'; +import { EnvironmentTypeEnum, FeatureFlagsKeysEnum } from '@novu/shared'; import { CalendarIcon } from 'lucide-react'; import { motion } from 'motion/react'; -import { useEffect } from 'react'; +import { useEffect, useState } from 'react'; import { useSearchParams } from 'react-router-dom'; import { type ActiveSubscribersTrendDataPoint, @@ -28,8 +28,10 @@ import { Badge } from '../components/primitives/badge'; import { FacetedFormFilter } from '../components/primitives/form/faceted-filter/facated-form-filter'; import { InlineToast } from '../components/primitives/inline-toast'; import { useEnvironment } from '../context/environment/hooks'; +import { useFeatureFlag } from '../hooks/use-feature-flag'; import { useFetchCharts } from '../hooks/use-fetch-charts'; import { useFetchSubscription } from '../hooks/use-fetch-subscription'; +import { useFetchWorkflows } from '../hooks/use-fetch-workflows'; import { useTelemetry } from '../hooks/use-telemetry'; import { TelemetryEvent } from '../utils/telemetry'; @@ -39,6 +41,7 @@ export function AnalyticsPage() { const { subscription } = useFetchSubscription(); const { currentEnvironment, switchEnvironment, oppositeEnvironment } = useEnvironment(); const [searchParams] = useSearchParams(); + const isWorkflowFilterEnabled = useFeatureFlag(FeatureFlagsKeysEnum.IS_ANALYTICS_WORKFLOW_FILTER_ENABLED); const isDevMockMode = searchParams.get('dev_mock_date') === 'true'; @@ -48,6 +51,9 @@ export function AnalyticsPage() { upgradeCtaIcon: AnalyticsUpgradeCtaIcon, }); + const [selectedWorkflows, setSelectedWorkflows] = useState([]); + const { data: workflowTemplates } = useFetchWorkflows({ limit: 100 }); + // Define report types for each section const metricsReportTypes = [ ReportTypeEnum.MESSAGES_DELIVERED, @@ -68,6 +74,7 @@ export function AnalyticsPage() { const { charts: metricsCharts, isLoading: isMetricsLoading } = useFetchCharts({ reportType: metricsReportTypes, createdAtGte: chartsDateRange.createdAtGte, + workflowIds: selectedWorkflows.length > 0 ? selectedWorkflows : undefined, enabled: true, refetchInterval: CHART_CONFIG.refetchInterval, staleTime: CHART_CONFIG.staleTime, @@ -81,6 +88,7 @@ export function AnalyticsPage() { } = useFetchCharts({ reportType: chartsReportTypes, createdAtGte: chartsDateRange.createdAtGte, + workflowIds: selectedWorkflows.length > 0 ? selectedWorkflows : undefined, enabled: true, refetchInterval: CHART_CONFIG.refetchInterval, staleTime: CHART_CONFIG.staleTime, @@ -110,7 +118,7 @@ export function AnalyticsPage() { } > - + setSelectedDateRange(values[0])} icon={CalendarIcon} /> + {isWorkflowFilterEnabled && ( + ({ + label: workflow.name, + value: workflow._id, + })) || [] + } + selected={selectedWorkflows} + onSelect={(values) => setSelectedWorkflows(values)} + /> + )}
diff --git a/apps/worker/src/app/workflow/usecases/subscriber-job-bound/subscriber-job-bound.usecase.ts b/apps/worker/src/app/workflow/usecases/subscriber-job-bound/subscriber-job-bound.usecase.ts index b565d38b40e..54162f8243d 100644 --- a/apps/worker/src/app/workflow/usecases/subscriber-job-bound/subscriber-job-bound.usecase.ts +++ b/apps/worker/src/app/workflow/usecases/subscriber-job-bound/subscriber-job-bound.usecase.ts @@ -344,6 +344,7 @@ export class SubscriberJobBound { entity_type: 'request', entity_id: command.requestId, workflow_run_identifier: command.identifier, + workflow_id: command.templateId, }; await this.traceLogRepository.createRequest([traceData]); diff --git a/libs/application-generic/src/services/analytic-logs/step-run/step-run.repository.ts b/libs/application-generic/src/services/analytic-logs/step-run/step-run.repository.ts index a5e8a92e620..8ce54c5ee44 100644 --- a/libs/application-generic/src/services/analytic-logs/step-run/step-run.repository.ts +++ b/libs/application-generic/src/services/analytic-logs/step-run/step-run.repository.ts @@ -166,8 +166,12 @@ export class StepRunRepository extends LogRepository> { + const workflowFilter = + workflowIds && workflowIds.length > 0 ? `AND workflow_id IN {workflowIds:Array(String)}` : ''; + // Use ClickHouse aggregation query to get counts by date and step_type const query = ` SELECT @@ -182,17 +186,22 @@ export class StepRunRepository extends LogRepository = { environmentId, organizationId, startDate: LogRepository.formatDateTime64(startDate), endDate: LogRepository.formatDateTime64(endDate), }; + if (workflowIds && workflowIds.length > 0) { + params.workflowIds = workflowIds; + } + const result = await this.clickhouseService.query<{ date: string; step_type: string; @@ -211,8 +220,12 @@ export class StepRunRepository extends LogRepository { + const workflowFilter = + workflowIds && workflowIds.length > 0 ? `AND workflow_id IN {workflowIds:Array(String)}` : ''; + // Query for current period const currentPeriodQuery = ` SELECT count(*) as count @@ -224,6 +237,7 @@ export class StepRunRepository extends LogRepository = { environmentId, organizationId, }; + if (workflowIds && workflowIds.length > 0) { + baseParams.workflowIds = workflowIds; + } + const [currentResult, previousResult] = await Promise.all([ this.clickhouseService.query<{ count: string }>({ query: currentPeriodQuery, @@ -278,8 +297,12 @@ export class StepRunRepository extends LogRepository { + const workflowFilter = + workflowIds && workflowIds.length > 0 ? `AND workflow_id IN {workflowIds:Array(String)}` : ''; + // Query for current period average const currentPeriodQuery = ` SELECT @@ -293,6 +316,7 @@ export class StepRunRepository extends LogRepository = { environmentId, organizationId, }; + if (workflowIds && workflowIds.length > 0) { + baseParams.workflowIds = workflowIds; + } + const [currentResult, previousResult] = await Promise.all([ this.clickhouseService.query<{ total_step_runs: string; unique_subscribers: string }>({ query: currentPeriodQuery, @@ -353,8 +382,12 @@ export class StepRunRepository extends LogRepository> { + const workflowFilter = + workflowIds && workflowIds.length > 0 ? `AND workflow_id IN {workflowIds:Array(String)}` : ''; + const query = ` SELECT provider_id, @@ -367,18 +400,23 @@ export class StepRunRepository extends LogRepository = { environmentId, organizationId, startDate: LogRepository.formatDateTime64(startDate), endDate: LogRepository.formatDateTime64(endDate), }; + if (workflowIds && workflowIds.length > 0) { + params.workflowIds = workflowIds; + } + const result = await this.clickhouseService.query<{ provider_id: string; count: string; @@ -402,6 +440,7 @@ export class StepRunRepository extends LogRepository> { + const workflowFilter = + workflowIds && workflowIds.length > 0 ? `AND traces.workflow_id IN {workflowIds:Array(String)}` : ''; + const query = ` SELECT toDate(traces.created_at) as date, @@ -117,17 +121,22 @@ export class TraceLogRepository extends LogRepository= {startDate:DateTime64(3)} AND traces.created_at <= {endDate:DateTime64(3)} AND traces.event_type IN ('message_seen', 'message_read', 'message_snoozed', 'message_archived') + ${workflowFilter} GROUP BY date, traces.event_type ORDER BY date, traces.event_type `; - const params = { + const params: Record = { environmentId, organizationId, startDate: LogRepository.formatDateTime64(startDate), endDate: LogRepository.formatDateTime64(endDate), }; + if (workflowIds && workflowIds.length > 0) { + params.workflowIds = workflowIds; + } + const result = await this.clickhouseService.query<{ date: string; event_type: string; @@ -146,8 +155,12 @@ export class TraceLogRepository extends LogRepository { + const workflowFilter = + workflowIds && workflowIds.length > 0 ? `AND workflow_id IN {workflowIds:Array(String)}` : ''; + const currentQuery = ` SELECT count(*) as count FROM traces @@ -158,6 +171,7 @@ export class TraceLogRepository extends LogRepository = { environmentId, organizationId, startDate: LogRepository.formatDateTime64(startDate), endDate: LogRepository.formatDateTime64(endDate), }; - const previousParams = { + const previousParams: Record = { environmentId, organizationId, previousStartDate: LogRepository.formatDateTime64(previousStartDate), previousEndDate: LogRepository.formatDateTime64(previousEndDate), }; + if (workflowIds && workflowIds.length > 0) { + currentParams.workflowIds = workflowIds; + previousParams.workflowIds = workflowIds; + } + const [currentResult, previousResult] = await Promise.all([ this.clickhouseService.query<{ count: string }>({ query: currentQuery, diff --git a/libs/application-generic/src/services/analytic-logs/trace-log/trace-log.schema.ts b/libs/application-generic/src/services/analytic-logs/trace-log/trace-log.schema.ts index 991cc06de53..8d8b12a432c 100644 --- a/libs/application-generic/src/services/analytic-logs/trace-log/trace-log.schema.ts +++ b/libs/application-generic/src/services/analytic-logs/trace-log/trace-log.schema.ts @@ -1,5 +1,4 @@ import { - CHArray, CHDateTime64, CHLowCardinality, CHNullable, @@ -43,6 +42,7 @@ const schemaDefinition = { // Workflow run metadata workflow_run_identifier: { type: CHString('') }, // default value is empty string + workflow_id: { type: CHString('') }, // Maps to NotificationTemplateEntity._id }; export const ORDER_BY: (keyof typeof schemaDefinition)[] = [ diff --git a/libs/application-generic/src/services/analytic-logs/workflow-run/workflow-run.repository.ts b/libs/application-generic/src/services/analytic-logs/workflow-run/workflow-run.repository.ts index cf57d5e6163..c1d67f08a5e 100644 --- a/libs/application-generic/src/services/analytic-logs/workflow-run/workflow-run.repository.ts +++ b/libs/application-generic/src/services/analytic-logs/workflow-run/workflow-run.repository.ts @@ -501,8 +501,12 @@ export class WorkflowRunRepository extends LogRepository> { + const workflowFilter = + workflowIds && workflowIds.length > 0 ? 'AND workflow_id IN {workflowIds:Array(String)}' : ''; + const query = ` SELECT workflow_name, @@ -513,18 +517,23 @@ export class WorkflowRunRepository extends LogRepository= {startDate:DateTime64(3)} AND created_at <= {endDate:DateTime64(3)} + ${workflowFilter} GROUP BY workflow_name ORDER BY count DESC LIMIT 5 `; - const params = { + const params: Record = { environmentId, organizationId, startDate: LogRepository.formatDateTime64(startDate), endDate: LogRepository.formatDateTime64(endDate), }; + if (workflowIds && workflowIds.length > 0) { + params.workflowIds = workflowIds; + } + const result = await this.clickhouseService.query<{ workflow_name: string; count: string; @@ -542,8 +551,12 @@ export class WorkflowRunRepository extends LogRepository { + const workflowFilter = + workflowIds && workflowIds.length > 0 ? 'AND workflow_id IN {workflowIds:Array(String)}' : ''; + // Query for current period const currentPeriodQuery = ` SELECT count(DISTINCT external_subscriber_id) as count @@ -553,6 +566,7 @@ export class WorkflowRunRepository extends LogRepository= {startDate:DateTime64(3)} AND created_at <= {endDate:DateTime64(3)} + ${workflowFilter} `; // Query for previous period @@ -564,13 +578,18 @@ export class WorkflowRunRepository extends LogRepository= {previousStartDate:DateTime64(3)} AND created_at <= {previousEndDate:DateTime64(3)} + ${workflowFilter} `; - const baseParams = { + const baseParams: Record = { environmentId, organizationId, }; + if (workflowIds && workflowIds.length > 0) { + baseParams.workflowIds = workflowIds; + } + const [currentResult, previousResult] = await Promise.all([ this.clickhouseService.query<{ count: string }>({ query: currentPeriodQuery, @@ -605,8 +624,12 @@ export class WorkflowRunRepository extends LogRepository { + const workflowFilter = + workflowIds && workflowIds.length > 0 ? 'AND workflow_id IN {workflowIds:Array(String)}' : ''; + // Query for current period const currentPeriodQuery = ` SELECT count(*) as count @@ -616,6 +639,7 @@ export class WorkflowRunRepository extends LogRepository= {startDate:DateTime64(3)} AND created_at <= {endDate:DateTime64(3)} + ${workflowFilter} `; // Query for previous period @@ -627,13 +651,18 @@ export class WorkflowRunRepository extends LogRepository= {previousStartDate:DateTime64(3)} AND created_at <= {previousEndDate:DateTime64(3)} + ${workflowFilter} `; - const baseParams = { + const baseParams: Record = { environmentId, organizationId, }; + if (workflowIds && workflowIds.length > 0) { + baseParams.workflowIds = workflowIds; + } + const [currentResult, previousResult] = await Promise.all([ this.clickhouseService.query<{ count: string }>({ query: currentPeriodQuery, @@ -666,8 +695,12 @@ export class WorkflowRunRepository extends LogRepository> { + const workflowFilter = + workflowIds && workflowIds.length > 0 ? 'AND workflow_id IN {workflowIds:Array(String)}' : ''; + const query = ` SELECT toDate(created_at) as date, @@ -679,17 +712,22 @@ export class WorkflowRunRepository extends LogRepository= {startDate:DateTime64(3)} AND created_at <= {endDate:DateTime64(3)} + ${workflowFilter} GROUP BY date, status ORDER BY date, status `; - const params = { + const params: Record = { environmentId, organizationId, startDate: LogRepository.formatDateTime64(startDate), endDate: LogRepository.formatDateTime64(endDate), }; + if (workflowIds && workflowIds.length > 0) { + params.workflowIds = workflowIds; + } + const result = await this.clickhouseService.query<{ date: string; status: string; @@ -706,8 +744,12 @@ export class WorkflowRunRepository extends LogRepository> { + const workflowFilter = + workflowIds && workflowIds.length > 0 ? 'AND workflow_id IN {workflowIds:Array(String)}' : ''; + const query = ` SELECT toDate(created_at) as date, @@ -718,17 +760,22 @@ export class WorkflowRunRepository extends LogRepository= {startDate:DateTime64(3)} AND created_at <= {endDate:DateTime64(3)} + ${workflowFilter} GROUP BY date ORDER BY date `; - const params = { + const params: Record = { environmentId, organizationId, startDate: LogRepository.formatDateTime64(startDate), endDate: LogRepository.formatDateTime64(endDate), }; + if (workflowIds && workflowIds.length > 0) { + params.workflowIds = workflowIds; + } + const result = await this.clickhouseService.query<{ date: string; count: string; diff --git a/libs/application-generic/src/services/message-interaction.service.ts b/libs/application-generic/src/services/message-interaction.service.ts index f44f71e19c1..fd4fe97bdc5 100644 --- a/libs/application-generic/src/services/message-interaction.service.ts +++ b/libs/application-generic/src/services/message-interaction.service.ts @@ -50,6 +50,7 @@ export class MessageInteractionService { raw_data: trace.raw_data, status: trace.status, workflow_run_identifier: trace.workflow_run_identifier, + workflow_id: trace.workflow_id, }) satisfies Omit ) ); diff --git a/libs/application-generic/src/usecases/create-execution-details/create-execution-details.usecase.ts b/libs/application-generic/src/usecases/create-execution-details/create-execution-details.usecase.ts index 248e6c5f0b6..e98f860a468 100644 --- a/libs/application-generic/src/usecases/create-execution-details/create-execution-details.usecase.ts +++ b/libs/application-generic/src/usecases/create-execution-details/create-execution-details.usecase.ts @@ -182,6 +182,7 @@ export class CreateExecutionDetails { entity_id: command.jobId, step_run_type: command.channel as StepType, workflow_run_identifier: command.workflowRunIdentifier, + workflow_id: command.notificationTemplateId, }; await this.traceLogRepository.createStepRun([traceData]); diff --git a/libs/application-generic/src/usecases/trigger-broadcast/trigger-broadcast.usecase.ts b/libs/application-generic/src/usecases/trigger-broadcast/trigger-broadcast.usecase.ts index 1f000f4dede..c30fa0da057 100644 --- a/libs/application-generic/src/usecases/trigger-broadcast/trigger-broadcast.usecase.ts +++ b/libs/application-generic/src/usecases/trigger-broadcast/trigger-broadcast.usecase.ts @@ -125,6 +125,7 @@ export class TriggerBroadcast extends TriggerBase { entity_type: 'request', entity_id: command.requestId, workflow_run_identifier: command.template.triggers[0].identifier, + workflow_id: command.template._id, }; await this.traceLogRepository.createRequest([traceData]); diff --git a/libs/application-generic/src/usecases/trigger-event/trigger-event.usecase.ts b/libs/application-generic/src/usecases/trigger-event/trigger-event.usecase.ts index fc319ec2c1f..2074e3c13c4 100644 --- a/libs/application-generic/src/usecases/trigger-event/trigger-event.usecase.ts +++ b/libs/application-generic/src/usecases/trigger-event/trigger-event.usecase.ts @@ -55,10 +55,29 @@ export class TriggerEvent { @InstrumentUsecase() async execute(command: TriggerEventCommand) { - await this.createWorkflowTrace(command, 'workflow_execution_started', 'success', 'Workflow execution started'); + let storedWorkflow: NotificationTemplateEntity | null = null; try { - const mappedCommand = await this.getMappedCommand(command); + if (!command.bridgeWorkflow) { + storedWorkflow = await this.getAndUpdateWorkflowById({ + environmentId: command.environmentId, + triggerIdentifier: command.identifier, + payload: command.payload, + organizationId: command.organizationId, + userId: command.userId, + }); + } + + const mappedCommand = await this.getMappedCommand(command, storedWorkflow?._id); + + await this.createWorkflowTrace({ + command, + eventType: 'workflow_execution_started', + status: 'success', + message: 'Workflow execution started', + workflowId: storedWorkflow?._id, + }); + const { environmentId, identifier, organizationId, userId } = mappedCommand; const environment = await this.environmentRepository.findOne({ @@ -87,25 +106,15 @@ export class TriggerEvent { }, }); - let storedWorkflow: NotificationTemplateEntity | null = null; - if (!command.bridgeWorkflow) { - storedWorkflow = await this.getAndUpdateWorkflowById({ - environmentId: mappedCommand.environmentId, - triggerIdentifier: mappedCommand.identifier, - payload: mappedCommand.payload, - organizationId: mappedCommand.organizationId, - userId: mappedCommand.userId, - }); - } - if (!storedWorkflow && !command.bridgeWorkflow) { - await this.createWorkflowTrace( + await this.createWorkflowTrace({ command, - 'workflow_template_not_found', - 'error', - 'Notification template could not be found', - { identifier: mappedCommand.identifier } - ); + eventType: 'workflow_template_not_found', + status: 'error', + message: 'Notification template could not be found', + rawData: { identifier: mappedCommand.identifier }, + workflowId: storedWorkflow?._id, + }); throw new BadRequestException('Notification template could not be found'); } @@ -120,13 +129,14 @@ export class TriggerEvent { ); if (!tenantProcessed) { - await this.createWorkflowTrace( + await this.createWorkflowTrace({ command, - 'workflow_tenant_processing_failed', - 'warning', - 'Tenant processing failed', - { tenantIdentifier: mappedCommand.tenant.identifier } - ); + eventType: 'workflow_tenant_processing_failed', + status: 'warning', + message: 'Tenant processing failed', + rawData: { tenantIdentifier: mappedCommand.tenant.identifier }, + workflowId: storedWorkflow?._id, + }); Logger.warn( `Tenant with identifier ${JSON.stringify( mappedCommand.tenant.identifier @@ -147,13 +157,14 @@ export class TriggerEvent { this.buildCommand(environmentId, organizationId, mappedCommand.actor) ); } catch (error: any) { - await this.createWorkflowTrace( + await this.createWorkflowTrace({ command, - 'workflow_actor_processing_failed', - 'error', - 'Actor processing failed', - { error: error.message, stack: error.stack } - ); + eventType: 'workflow_actor_processing_failed', + status: 'error', + message: 'Actor processing failed', + rawData: { error: error.message, stack: error.stack }, + workflowId: storedWorkflow?._id, + }); throw error; } } @@ -196,13 +207,14 @@ export class TriggerEvent { } } catch (e) { const error = e as Error; - await this.createWorkflowTrace( + await this.createWorkflowTrace({ command, - 'workflow_execution_failed', - 'error', - `Workflow execution failed: ${error.message}`, - { error: error.message, stack: error.stack } - ); + eventType: 'workflow_execution_failed', + status: 'error', + message: `Workflow execution failed: ${error.message}`, + rawData: { error: error.message, stack: error.stack }, + workflowId: storedWorkflow?._id, + }); Logger.error( { @@ -219,7 +231,7 @@ export class TriggerEvent { } } - private async getMappedCommand(command: TriggerEventCommand) { + private async getMappedCommand(command: TriggerEventCommand, workflowId: string) { const isContextEnabled = await this.featureFlagsService.getFlag({ key: FeatureFlagsKeysEnum.IS_CONTEXT_ENABLED, defaultValue: false, @@ -232,17 +244,20 @@ export class TriggerEvent { ...command, tenant: this.mapTenant(command.tenant), actor: this.mapActor(command.actor), - ...(isContextEnabled && { contextKeys: await this.resolveContextKeys(command) }), + ...(isContextEnabled && { contextKeys: await this.resolveContextKeys(command, workflowId) }), }; } - private async createWorkflowTrace( - command: TriggerEventCommand, - eventType: EventType, - status: 'success' | 'error' | 'warning' = 'success', - message?: string, - rawData?: any - ): Promise { + private async createWorkflowTrace(params: { + command: TriggerEventCommand; + eventType: EventType; + status?: 'success' | 'error' | 'warning'; + message?: string; + rawData?: any; + workflowId?: string; + }): Promise { + const { command, eventType, status = 'success', message, rawData, workflowId } = params; + if (!command.requestId) { return; } @@ -263,6 +278,7 @@ export class TriggerEvent { entity_type: 'request', entity_id: command.requestId, workflow_run_identifier: command.identifier, + workflow_id: workflowId || '', }; await this.traceLogRepository.createRequest([traceData]); @@ -374,7 +390,7 @@ export class TriggerEvent { return subscriber; } - private async resolveContextKeys(command: TriggerEventCommand): Promise { + private async resolveContextKeys(command: TriggerEventCommand, workflowId: string): Promise { if (!command.context) { return []; } @@ -386,14 +402,21 @@ export class TriggerEvent { command.context ); - this.createWorkflowTrace(command, 'workflow_context_resolution_completed', 'success', 'Context resolved', { - context: contexts.map((context) => ({ - id: context.id, - type: context.type, - data: context.data, - createdAt: context.createdAt, - updatedAt: context.updatedAt, - })), + this.createWorkflowTrace({ + command, + eventType: 'workflow_context_resolution_completed', + status: 'success', + message: 'Context resolved', + rawData: { + context: contexts.map((context) => ({ + id: context.id, + type: context.type, + data: context.data, + createdAt: context.createdAt, + updatedAt: context.updatedAt, + })), + }, + workflowId, }); return contexts.map((context) => context.key); @@ -410,8 +433,13 @@ export class TriggerEvent { ); if (error instanceof BadRequestException) { - this.createWorkflowTrace(command, 'workflow_context_resolution_failed', 'error', 'Context resolution failed', { - context: command.context, + this.createWorkflowTrace({ + command, + eventType: 'workflow_context_resolution_failed', + status: 'error', + message: 'Context resolution failed', + rawData: { context: command.context }, + workflowId, }); } throw new BadRequestException( diff --git a/libs/application-generic/src/usecases/trigger-multicast/trigger-multicast.usecase.ts b/libs/application-generic/src/usecases/trigger-multicast/trigger-multicast.usecase.ts index b570d3a19c6..8c7917ea4d5 100644 --- a/libs/application-generic/src/usecases/trigger-multicast/trigger-multicast.usecase.ts +++ b/libs/application-generic/src/usecases/trigger-multicast/trigger-multicast.usecase.ts @@ -173,6 +173,7 @@ export class TriggerMulticast extends TriggerBase { entity_type: 'request', entity_id: command.requestId, workflow_run_identifier: command.template.triggers[0].identifier, + workflow_id: command.template._id, }; await this.traceLogRepository.createRequest([traceData]); diff --git a/libs/dal/src/repositories/message/message.entity.ts b/libs/dal/src/repositories/message/message.entity.ts index 63f8f5fdef5..d18c4a1a823 100644 --- a/libs/dal/src/repositories/message/message.entity.ts +++ b/libs/dal/src/repositories/message/message.entity.ts @@ -23,6 +23,7 @@ export type MessageChannelData