Skip to content

Commit 747c673

Browse files
authored
fix: emit regular pipeline heartbeats (HEXA-1406) (#329)
1 parent f93363c commit 747c673

16 files changed

+434
-36
lines changed

openhexa/graphql/graphql_client/__init__.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -388,6 +388,10 @@
388388
UpdateDatasetUpdateDataset,
389389
UpdateDatasetUpdateDatasetDataset,
390390
)
391+
from .update_pipeline_heartbeat import (
392+
UpdatePipelineHeartbeat,
393+
UpdatePipelineHeartbeatUpdatePipelineHeartbeat,
394+
)
391395
from .update_webapp import UpdateWebapp, UpdateWebappUpdateWebapp
392396
from .update_workspace import (
393397
UpdateWorkspace,
@@ -737,6 +741,8 @@
737741
"UpdateOrganizationMemberError",
738742
"UpdateOrganizationMemberInput",
739743
"UpdatePipelineError",
744+
"UpdatePipelineHeartbeat",
745+
"UpdatePipelineHeartbeatUpdatePipelineHeartbeat",
740746
"UpdatePipelineInput",
741747
"UpdatePipelineProgressInput",
742748
"UpdatePipelineRecipientInput",

openhexa/graphql/graphql_client/client.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,10 @@
8484
from .stop_pipeline import StopPipeline, StopPipelineStopPipeline
8585
from .update_connection import UpdateConnection, UpdateConnectionUpdateConnection
8686
from .update_dataset import UpdateDataset, UpdateDatasetUpdateDataset
87+
from .update_pipeline_heartbeat import (
88+
UpdatePipelineHeartbeat,
89+
UpdatePipelineHeartbeatUpdatePipelineHeartbeat,
90+
)
8791
from .update_webapp import UpdateWebapp, UpdateWebappUpdateWebapp
8892
from .update_workspace import UpdateWorkspace, UpdateWorkspaceUpdateWorkspace
8993
from .upgrade_pipeline_version_from_template import (
@@ -310,6 +314,29 @@ def stop_pipeline(
310314
data = self.get_data(response)
311315
return StopPipeline.model_validate(data).stop_pipeline
312316

317+
def update_pipeline_heartbeat(
318+
self, **kwargs: Any
319+
) -> UpdatePipelineHeartbeatUpdatePipelineHeartbeat:
320+
query = gql(
321+
"""
322+
mutation UpdatePipelineHeartbeat {
323+
updatePipelineHeartbeat {
324+
success
325+
errors
326+
}
327+
}
328+
"""
329+
)
330+
variables: Dict[str, object] = {}
331+
response = self.execute(
332+
query=query,
333+
operation_name="UpdatePipelineHeartbeat",
334+
variables=variables,
335+
**kwargs
336+
)
337+
data = self.get_data(response)
338+
return UpdatePipelineHeartbeat.model_validate(data).update_pipeline_heartbeat
339+
313340
def add_pipeline_recipient(
314341
self, input: CreatePipelineRecipientInput, **kwargs: Any
315342
) -> AddPipelineRecipientAddPipelineRecipient:

openhexa/graphql/graphql_client/create_pipeline_from_template_version.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,9 @@
1010

1111

1212
class CreatePipelineFromTemplateVersion(BaseModel):
13-
create_pipeline_from_template_version: "CreatePipelineFromTemplateVersionCreatePipelineFromTemplateVersion" = Field(
14-
alias="createPipelineFromTemplateVersion"
15-
)
13+
create_pipeline_from_template_version: (
14+
"CreatePipelineFromTemplateVersionCreatePipelineFromTemplateVersion"
15+
) = Field(alias="createPipelineFromTemplateVersion")
1616

1717

1818
class CreatePipelineFromTemplateVersionCreatePipelineFromTemplateVersion(BaseModel):

openhexa/graphql/graphql_client/create_pipeline_template_version.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,9 @@
1010

1111

1212
class CreatePipelineTemplateVersion(BaseModel):
13-
create_pipeline_template_version: "CreatePipelineTemplateVersionCreatePipelineTemplateVersion" = Field(
14-
alias="createPipelineTemplateVersion"
15-
)
13+
create_pipeline_template_version: (
14+
"CreatePipelineTemplateVersionCreatePipelineTemplateVersion"
15+
) = Field(alias="createPipelineTemplateVersion")
1616

1717

1818
class CreatePipelineTemplateVersionCreatePipelineTemplateVersion(BaseModel):

openhexa/graphql/graphql_client/enums.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -750,6 +750,7 @@ class UpdateTeamError(str, Enum):
750750

751751

752752
class UpdateTemplateError(str, Enum):
753+
INVALID_CONFIG = "INVALID_CONFIG"
753754
NOT_FOUND = "NOT_FOUND"
754755
PERMISSION_DENIED = "PERMISSION_DENIED"
755756

openhexa/graphql/graphql_client/input_types.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,7 @@ class CreatePipelineInput(BaseModel):
147147
)
148148
name: str
149149
notebook_path: Optional[str] = Field(alias="notebookPath", default=None)
150+
tags: Optional[List[str]] = None
150151
workspace_slug: str = Field(alias="workspaceSlug")
151152

152153

@@ -638,8 +639,12 @@ class UpdateTeamInput(BaseModel):
638639
class UpdateTemplateInput(BaseModel):
639640
config: Optional[Any] = None
640641
description: Optional[str] = None
642+
functional_type: Optional[PipelineFunctionalType] = Field(
643+
alias="functionalType", default=None
644+
)
641645
id: Any
642646
name: Optional[str] = None
647+
tags: Optional[List[str]] = None
643648

644649

645650
class UpdateTemplateVersionInput(BaseModel):
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
# Generated by ariadne-codegen
2+
# Source: openhexa/graphql/queries.graphql
3+
4+
from typing import List
5+
6+
from pydantic import Field
7+
8+
from .base_model import BaseModel
9+
from .enums import PipelineError
10+
11+
12+
class UpdatePipelineHeartbeat(BaseModel):
13+
update_pipeline_heartbeat: "UpdatePipelineHeartbeatUpdatePipelineHeartbeat" = Field(
14+
alias="updatePipelineHeartbeat"
15+
)
16+
17+
18+
class UpdatePipelineHeartbeatUpdatePipelineHeartbeat(BaseModel):
19+
success: bool
20+
errors: List[PipelineError]
21+
22+
23+
UpdatePipelineHeartbeat.model_rebuild()

openhexa/graphql/graphql_client/upgrade_pipeline_version_from_template.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,9 @@
1010

1111

1212
class UpgradePipelineVersionFromTemplate(BaseModel):
13-
upgrade_pipeline_version_from_template: "UpgradePipelineVersionFromTemplateUpgradePipelineVersionFromTemplate" = Field(
14-
alias="upgradePipelineVersionFromTemplate"
15-
)
13+
upgrade_pipeline_version_from_template: (
14+
"UpgradePipelineVersionFromTemplateUpgradePipelineVersionFromTemplate"
15+
) = Field(alias="upgradePipelineVersionFromTemplate")
1616

1717

1818
class UpgradePipelineVersionFromTemplateUpgradePipelineVersionFromTemplate(BaseModel):

openhexa/graphql/queries.graphql

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,13 @@ mutation StopPipeline($input: StopPipelineInput!) {
112112
}
113113
}
114114

115+
mutation UpdatePipelineHeartbeat {
116+
updatePipelineHeartbeat {
117+
success
118+
errors
119+
}
120+
}
121+
115122
mutation AddPipelineRecipient($input: CreatePipelineRecipientInput!) {
116123
addPipelineRecipient(input: $input) {
117124
success

openhexa/graphql/schema.generated.graphql

Lines changed: 45 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -797,6 +797,7 @@ input CreatePipelineInput {
797797
functionalType: PipelineFunctionalType
798798
name: String!
799799
notebookPath: String
800+
tags: [String!]
800801
workspaceSlug: String!
801802
}
802803

@@ -1863,7 +1864,8 @@ type File {
18631864
path: String!
18641865
size: BigInt
18651866
type: FileType!
1866-
updated: DateTime
1867+
updated: DateTime @deprecated(reason: "Use updatedAt instead")
1868+
updatedAt: DateTime
18671869
}
18681870

18691871
"""Represents a file or directory node in a flattened structure."""
@@ -2574,6 +2576,12 @@ type Mutation {
25742576
"""Updates an existing pipeline."""
25752577
updatePipeline(input: UpdatePipelineInput!): UpdatePipelineResult!
25762578

2579+
"""
2580+
Updates the heartbeat timestamp for the current pipeline run.
2581+
Must be called by an authenticated pipeline run.
2582+
"""
2583+
updatePipelineHeartbeat: UpdatePipelineHeartbeatResult!
2584+
25772585
"""Updates the progress of a pipeline."""
25782586
updatePipelineProgress(input: UpdatePipelineProgressInput!): UpdatePipelineProgressResult!
25792587

@@ -2625,6 +2633,9 @@ type Organization {
26252633
"""The contact information of the organization."""
26262634
contactInfo: String!
26272635

2636+
"""Dataset links available in the organization"""
2637+
datasetLinks(page: Int = 1, perPage: Int = 15, query: String): DatasetLinkPage!
2638+
26282639
"""Datasets available in the organization"""
26292640
datasets(page: Int = 1, perPage: Int = 15, query: String): DatasetPage!
26302641

@@ -2640,9 +2651,20 @@ type Organization {
26402651
"""The name of the organization."""
26412652
name: String!
26422653

2654+
"""
2655+
The direct invitations sent to join a specific workspace in the organization.
2656+
"""
2657+
pendingWorkspaceInvitations(page: Int, perPage: Int): WorkspaceInvitationPage!
2658+
26432659
"""The permissions the current user has in the organization."""
26442660
permissions: OrganizationPermissions!
26452661

2662+
"""Pipeline tags used within this organization."""
2663+
pipelineTags: [String!]!
2664+
2665+
"""Pipeline template tags used within this organization."""
2666+
pipelineTemplateTags: [String!]!
2667+
26462668
"""The short name of the organization."""
26472669
shortName: String
26482670

@@ -2899,6 +2921,7 @@ type PipelineParameter {
28992921
code: String!
29002922
connection: String
29012923
default: Generic
2924+
directory: String
29022925
help: String
29032926
multiple: Boolean!
29042927
name: String!
@@ -3018,10 +3041,12 @@ type PipelineTemplate {
30183041
config: String
30193042
currentVersion: PipelineTemplateVersion
30203043
description: String
3044+
functionalType: PipelineFunctionalType
30213045
id: UUID!
30223046
name: String!
30233047
permissions: PipelineTemplatePermissions!
30243048
sourcePipeline: Pipeline
3049+
tags: [Tag!]!
30253050
updatedAt: DateTime!
30263051
versions(page: Int, perPage: Int): TemplateVersionPage!
30273052
workspace: Workspace
@@ -3229,7 +3254,7 @@ enum PrepareObjectUploadError {
32293254

32303255
"""
32313256
Input for preparing to upload an object to a workspace's bucket.
3232-
The `contentType`
3257+
The `contentType`
32333258
"""
32343259
input PrepareObjectUploadInput {
32353260
contentType: String
@@ -3332,8 +3357,11 @@ type Query {
33323357
"""Retrieves a pipeline run by ID."""
33333358
pipelineRun(id: UUID!): PipelineRun
33343359

3360+
"""Retrieves a pipeline template version by ID."""
3361+
pipelineTemplateVersion(id: UUID!): PipelineTemplateVersion
3362+
33353363
"""Retrieves a page of pipeline templates."""
3336-
pipelineTemplates(page: Int = 1, perPage: Int = 15, search: String, workspaceSlug: String): PipelineTemplatePage!
3364+
pipelineTemplates(functionalType: PipelineFunctionalType, page: Int = 1, perPage: Int = 15, search: String, tags: [String!], workspaceSlug: String): PipelineTemplatePage!
33373365

33383366
"""Retrieves a pipeline version by ID."""
33393367
pipelineVersion(id: UUID!): PipelineVersion
@@ -3342,7 +3370,7 @@ type Query {
33423370
pipelines(functionalType: PipelineFunctionalType, name: String, page: Int, perPage: Int, search: String, tags: [String!], workspaceSlug: String): PipelinesPage!
33433371
searchDatabaseTables(organizationId: UUID, page: Int = 1, perPage: Int = 15, query: String!, workspaceSlugs: [String]): DatabaseTableResultPage!
33443372
searchDatasets(organizationId: UUID, page: Int = 1, perPage: Int = 15, query: String!, workspaceSlugs: [String]): DatasetResultPage!
3345-
searchFiles(organizationId: UUID, page: Int = 1, perPage: Int = 15, query: String!, workspaceSlugs: [String]): FileResultPage!
3373+
searchFiles(organizationId: UUID, page: Int = 1, perPage: Int = 15, prefix: String, query: String!, workspaceSlugs: [String]): FileResultPage!
33463374
searchPipelineTemplates(organizationId: UUID, page: Int = 1, perPage: Int = 15, query: String!, workspaceSlugs: [String]): PipelineTemplateResultPage!
33473375
searchPipelines(functionalType: PipelineFunctionalType, organizationId: UUID, page: Int = 1, perPage: Int = 15, query: String!, workspaceSlugs: [String]): PipelineResultPage!
33483376
team(id: UUID!): Team
@@ -4052,6 +4080,12 @@ enum UpdatePipelineError {
40524080
PERMISSION_DENIED
40534081
}
40544082

4083+
"""Represents the result of updating a pipeline heartbeat."""
4084+
type UpdatePipelineHeartbeatResult {
4085+
errors: [PipelineError!]!
4086+
success: Boolean!
4087+
}
4088+
40554089
"""Represents the input for updating a pipeline."""
40564090
input UpdatePipelineInput {
40574091
autoUpdateFromTemplate: Boolean
@@ -4162,6 +4196,7 @@ type UpdateTeamResult {
41624196
Enum representing the possible errors that can occur when updating a template.
41634197
"""
41644198
enum UpdateTemplateError {
4199+
INVALID_CONFIG
41654200
NOT_FOUND
41664201
PERMISSION_DENIED
41674202
}
@@ -4170,8 +4205,10 @@ enum UpdateTemplateError {
41704205
input UpdateTemplateInput {
41714206
config: JSON
41724207
description: String
4208+
functionalType: PipelineFunctionalType
41734209
id: UUID!
41744210
name: String
4211+
tags: [String!]
41754212
}
41764213

41774214
"""Represents the result of updating a template."""
@@ -4482,6 +4519,8 @@ type Workspace {
44824519
name: String!
44834520
organization: Organization
44844521
permissions: WorkspacePermissions!
4522+
pipelineTags: [String!]!
4523+
pipelineTemplateTags: [String!]!
44854524
slug: String!
44864525
updatedAt: DateTime
44874526
}
@@ -4531,6 +4570,7 @@ enum WorkspaceInvitationStatus {
45314570
type WorkspaceMembership {
45324571
createdAt: DateTime!
45334572
id: UUID!
4573+
organizationMembership: OrganizationMembership
45344574
role: WorkspaceMembershipRole!
45354575
updatedAt: DateTime
45364576
user: User!
@@ -4595,4 +4635,4 @@ type WorkspacePermissions {
45954635
launchNotebookServer: Boolean!
45964636
manageMembers: Boolean!
45974637
update: Boolean!
4598-
}
4638+
}

0 commit comments

Comments
 (0)