Skip to content

Commit 2e861d1

Browse files
committed
wip: insert with sql
1 parent 1b3bea7 commit 2e861d1

File tree

4 files changed

+110
-9
lines changed

4 files changed

+110
-9
lines changed

backend/kernelCI_app/constants/ingester.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,3 +48,5 @@
4848
except (ValueError, TypeError):
4949
logger.warning("Invalid INGEST_QUEUE_MAXSIZE, using default 5000")
5050
INGEST_QUEUE_MAXSIZE = 5000
51+
52+
PRIO_DB = is_boolean_or_string_true(os.environ.get("PRIO_BD", "True"))

backend/kernelCI_app/management/commands/helpers/kcidbng_ingester.py

Lines changed: 72 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
INGEST_FLUSH_TIMEOUT_SEC,
1111
INGEST_QUEUE_MAXSIZE,
1212
VERBOSE,
13+
PRIO_DB,
1314
)
1415
import threading
1516
import time
@@ -21,7 +22,7 @@
2122
extract_log_excerpt,
2223
)
2324
import kcidb_io
24-
from django.db import transaction
25+
from django.db import connections, transaction
2526
from kernelCI_app.models import Issues, Checkouts, Builds, Tests, Incidents
2627

2728
from kernelCI_app.management.commands.helpers.process_submissions import (
@@ -111,23 +112,85 @@ def prepare_file_data(
111112
}
112113

113114

114-
def consume_buffer(buffer: list[TableModels], item_type: TableNames) -> None:
115+
def consume_buffer(buffer: list[TableModels], table_name: TableNames) -> None:
115116
"""
116117
Consume a buffer of items and insert them into the database.
117118
This function is called by the db_worker thread.
118119
"""
119120
if not buffer:
120121
return
121122

122-
model = MODEL_MAP[item_type]
123+
model = MODEL_MAP[table_name]
124+
125+
# Get nullable fields for coalesce
126+
# Get non-nullable but updateable fields such as timestamp for other functions
127+
128+
updateable_model_fields = []
129+
updateable_db_fields = []
130+
query_params_properties: list[tuple] = []
131+
for field in model._meta.fields:
132+
if field.generated:
133+
continue
134+
135+
field_name = (
136+
field.name + "_id"
137+
if field.get_internal_type() == "ForeignKey"
138+
else field.name
139+
)
140+
real_name = field.db_column or field_name
141+
142+
operation = "GREATEST" if real_name == "_timestamp" else "COALESCE"
143+
144+
query_params_properties.append((real_name, operation))
145+
146+
updateable_model_fields.append(field_name)
147+
updateable_db_fields.append(real_name)
148+
149+
print("🚀 ~ query_params_properties:", query_params_properties)
150+
print("🚀 ~ all_updateable_model_fields:", updateable_model_fields)
151+
print("🚀 ~ all_updateable_db_fields:", updateable_db_fields)
152+
153+
conflict_clauses = []
154+
for field, op in query_params_properties:
155+
if PRIO_DB:
156+
conflict_clauses.append(
157+
f"""
158+
{field} = {op}({table_name}.{field}, EXCLUDED.{field})"""
159+
)
160+
else:
161+
conflict_clauses.append(
162+
f"""
163+
{field} = {op}(EXCLUDED.{field}, {table_name}.{field})"""
164+
)
165+
166+
query = f"""
167+
INSERT INTO {table_name} (
168+
{', '.join(updateable_db_fields)}
169+
)
170+
VALUES (
171+
{', '.join(['%s'] * len(updateable_db_fields))}
172+
)
173+
ON CONFLICT (id)
174+
DO UPDATE SET {', '.join(conflict_clauses)};
175+
"""
176+
print("🚀 ~ query:", query)
177+
178+
params = []
179+
for obj in buffer:
180+
obj_values = []
181+
for field in updateable_model_fields:
182+
value = getattr(obj, field)
183+
if isinstance(value, (dict, list)):
184+
value = json.dumps(value)
185+
obj_values.append(value)
186+
params.append(tuple(obj_values))
187+
print("🚀 ~ params:", params)
123188

124189
t0 = time.time()
125-
model.objects.bulk_create(
126-
buffer,
127-
batch_size=INGEST_BATCH_SIZE,
128-
ignore_conflicts=True,
129-
)
130-
out("bulk_create %s: n=%d in %.3fs" % (item_type, len(buffer), time.time() - t0))
190+
with connections["default"].cursor() as cursor:
191+
cursor.executemany(query, params)
192+
193+
out("bulk_create %s: n=%d in %.3fs" % (table_name, len(buffer), time.time() - t0))
131194

132195

133196
def flush_buffers(
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
{
2+
"checkouts": [],
3+
"builds": [
4+
{
5+
"checkout_id": "ingestertest:687ac2282ce2c1874ed7adad",
6+
"id": "ingestertest:0000",
7+
"origin": "ingestertest",
8+
"comment": "pm-6.16-rc7-234-gc7de79e662b86",
9+
"start_time": "2025-07-18T21:55:31.295000+00:00",
10+
"architecture": "i386",
11+
"compiler": "clang-17",
12+
"config_name": "i386_defconfig+allmodconfig+CONFIG_FRAME_WARN=2048",
13+
"status": "PASS",
14+
"misc": {
15+
"platform": "kubernetes",
16+
"runtime": "k8s-all",
17+
"lab": "k8s-all",
18+
"job_id": "kci-687ac2d32ce2c1874ed7b174-kbuild-clang-17-i386-all-lvti1gzs",
19+
"job_context": "gke_android-kernelci-external_us-central1-c_kci-us-central1",
20+
"kernel_type": "bzimage",
21+
"maestro_viewer": "https://api.kernelci.org/viewer?node_id=687ac2d32ce2c1874ed7b174"
22+
},
23+
"output_files": [],
24+
"config_url": "https://files.kernelci.org/kbuild-clang-17-i386-allmodconfig-687ac2d32ce2c1874ed7b174/.config",
25+
"log_url": "https://files.kernelci.org/kbuild-clang-17-i386-allmodconfig-687ac2d32ce2c1874ed7b174/build.log.gz",
26+
"log_excerpt": ""
27+
}
28+
],
29+
"tests": [],
30+
"issues": [],
31+
"incidents": [],
32+
"version": { "major": 5, "minor": 3 }
33+
}

backend/kernelCI_app/models.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
"""Defines the models used in the main database.
2+
All models should have explicit id column for the ingester to work properly."""
3+
14
from django.db import models
25
from django.contrib.postgres.fields import ArrayField
36
from django.contrib.postgres.indexes import GinIndex

0 commit comments

Comments
 (0)