Skip to content

Commit f408fbf

Browse files
committed
feat: change ingester data insertion policy
Now allows for overwrites of null data Closes #1552
1 parent da32f90 commit f408fbf

File tree

3 files changed

+170
-16
lines changed

3 files changed

+170
-16
lines changed

backend/kernelCI_app/management/commands/helpers/kcidbng_ingester.py

Lines changed: 78 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
extract_log_excerpt,
2222
)
2323
import kcidb_io
24-
from django.db import transaction
24+
from django.db import connections, transaction
2525
from kernelCI_app.models import Issues, Checkouts, Builds, Tests, Incidents
2626

2727
from kernelCI_app.management.commands.helpers.process_submissions import (
@@ -111,23 +111,92 @@ def prepare_file_data(
111111
}
112112

113113

114-
def consume_buffer(buffer: list[TableModels], item_type: TableNames) -> None:
114+
def _generate_model_insert_query(
115+
table_name: TableNames, model: TableModels
116+
) -> tuple[list[str], str]:
117+
"""
118+
Dynamically generates the insert query for any model.
119+
This function should only be used inside a transaction context.
120+
121+
Gives priority to the existing data in the database.
122+
123+
Returns a list of which model properties can be updated and the insert query.
124+
"""
125+
updateable_model_fields: list[str] = []
126+
updateable_db_fields: list[str] = []
127+
query_params_properties: list[tuple[str, str]] = []
128+
129+
for field in model._meta.fields:
130+
if field.generated:
131+
continue
132+
133+
field_name = (
134+
field.name + "_id"
135+
if field.get_internal_type() == "ForeignKey"
136+
else field.name
137+
)
138+
real_name = field.db_column or field_name
139+
operation = "GREATEST" if real_name == "_timestamp" else "COALESCE"
140+
141+
query_params_properties.append((real_name, operation))
142+
updateable_model_fields.append(field_name)
143+
updateable_db_fields.append(real_name)
144+
145+
conflict_clauses = []
146+
for field, op in query_params_properties:
147+
conflict_clauses.append(
148+
f"""
149+
{field} = {op}({table_name}.{field}, EXCLUDED.{field})"""
150+
)
151+
152+
query = f"""
153+
INSERT INTO {table_name} (
154+
{', '.join(updateable_db_fields)}
155+
)
156+
VALUES (
157+
{', '.join(['%s'] * len(updateable_db_fields))}
158+
)
159+
ON CONFLICT (id)
160+
DO UPDATE SET {', '.join(conflict_clauses)};
161+
"""
162+
163+
return updateable_model_fields, query
164+
165+
166+
def consume_buffer(buffer: list[TableModels], table_name: TableNames) -> None:
115167
"""
116168
Consume a buffer of items and insert them into the database.
117169
This function is called by the db_worker thread.
118170
"""
119171
if not buffer:
120172
return
121173

122-
model = MODEL_MAP[item_type]
174+
try:
175+
model = MODEL_MAP[table_name]
176+
except KeyError:
177+
out(
178+
"Unknown table '%s' passed to consume_buffer. Valid tables: %s"
179+
% (table_name, str(", ".join(MODEL_MAP.keys())))
180+
)
181+
raise
182+
183+
updateable_model_fields, query = _generate_model_insert_query(table_name, model)
184+
185+
params = []
186+
for obj in buffer:
187+
obj_values = []
188+
for field in updateable_model_fields:
189+
value = getattr(obj, field)
190+
if isinstance(value, (dict, list)):
191+
value = json.dumps(value)
192+
obj_values.append(value)
193+
params.append(tuple(obj_values))
123194

124195
t0 = time.time()
125-
model.objects.bulk_create(
126-
buffer,
127-
batch_size=INGEST_BATCH_SIZE,
128-
ignore_conflicts=True,
129-
)
130-
out("bulk_create %s: n=%d in %.3fs" % (item_type, len(buffer), time.time() - t0))
196+
with connections["default"].cursor() as cursor:
197+
cursor.executemany(query, params)
198+
199+
out("bulk_create %s: n=%d in %.3fs" % (table_name, len(buffer), time.time() - t0))
131200

132201

133202
def flush_buffers(

backend/kernelCI_app/models.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
"""Defines the models used in the main database.
2+
All models should have explicit id column for the ingester to work properly."""
3+
14
from django.db import models
25
from django.contrib.postgres.fields import ArrayField
36
from django.contrib.postgres.indexes import GinIndex

backend/kernelCI_app/tests/unitTests/commands/monitorSubmissions/kcidbng_ingester_test.py

Lines changed: 89 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,61 @@ def test_prepare_file_data_file_error(self, mock_file_open, mock_logger):
206206
mock_file_open.assert_called_once()
207207

208208

209+
class TestGenerateInsertQuery:
210+
def test_generate_model_insert_query_success(self):
211+
"""Test _generate_model_insert_query."""
212+
from kernelCI_app.management.commands.helpers.kcidbng_ingester import (
213+
_generate_model_insert_query,
214+
)
215+
216+
mock_model = MagicMock()
217+
218+
mock_field_timestamp = MagicMock()
219+
mock_field_timestamp.name = "field_timestamp"
220+
mock_field_timestamp.db_column = "_timestamp"
221+
mock_field_timestamp.generated = False
222+
mock_field_timestamp.get_internal_type.return_value = "DateTimeField"
223+
224+
mock_field_comment = MagicMock()
225+
mock_field_comment.name = "comment"
226+
mock_field_comment.db_column = None
227+
mock_field_comment.generated = False
228+
mock_field_comment.get_internal_type.return_value = "CharField"
229+
230+
mock_field3 = MagicMock()
231+
mock_field3.name = "checkout"
232+
mock_field3.db_column = None
233+
mock_field3.generated = False
234+
mock_field3.get_internal_type.return_value = "ForeignKey"
235+
236+
mock_field4 = MagicMock()
237+
mock_field4.name = "series"
238+
mock_field4.generated = True
239+
240+
mock_model._meta.fields = [
241+
mock_field_timestamp,
242+
mock_field_comment,
243+
mock_field3,
244+
mock_field4,
245+
]
246+
247+
with patch(
248+
"kernelCI_app.management.commands.helpers.kcidbng_ingester.MODEL_MAP",
249+
{"issues": mock_model},
250+
):
251+
updateable_fields, query = _generate_model_insert_query(
252+
"issues", mock_model
253+
)
254+
255+
assert updateable_fields == ["field_timestamp", "comment", "checkout_id"]
256+
assert "INSERT INTO issues" in query
257+
assert "_timestamp, comment, checkout_id" in query
258+
assert "GREATEST(issues._timestamp, EXCLUDED._timestamp)" in query
259+
assert "COALESCE(issues.comment, EXCLUDED.comment)" in query
260+
assert "COALESCE(issues.checkout_id, EXCLUDED.checkout_id)" in query
261+
assert "series" not in query
262+
263+
209264
class TestConsumeBuffer:
210265
"""Test cases for consume_buffer function."""
211266

@@ -218,24 +273,45 @@ class TestConsumeBuffer:
218273
INGEST_BATCH_SIZE_MOCK,
219274
)
220275
@patch("kernelCI_app.management.commands.helpers.kcidbng_ingester.out")
276+
@patch(
277+
"kernelCI_app.management.commands.helpers.kcidbng_ingester._generate_model_insert_query"
278+
)
279+
@patch("kernelCI_app.management.commands.helpers.kcidbng_ingester.connections")
221280
@patch("time.time", side_effect=TIME_MOCK)
222-
def test_consume_buffer_with_items(self, mock_time, mock_out):
281+
def test_consume_buffer_with_items(
282+
self, mock_time, mock_connections, mock_generate_query, mock_out
283+
):
223284
"""Test consume_buffer with items in buffer."""
285+
table_name = "issues"
224286
mock_model = MagicMock()
225287
mock_buffer = [MagicMock(), MagicMock()]
288+
mock_generate_query.return_value = (
289+
["_timestamp", "other_field"],
290+
"""
291+
INSERT INTO issues (
292+
_timestamp, other_field
293+
)
294+
VALUES (
295+
%s, %s
296+
)
297+
ON CONFLICT (id)
298+
DO UPDATE SET
299+
GREATEST(issues._timestamp, EXCLUDED._timestamp),
300+
COALESCE(issues.other_field, EXCLUDED.other_field);""",
301+
)
302+
mock_cursor = MagicMock()
303+
mock_connections["default"].cursor.return_value.__enter__.return_value = (
304+
mock_cursor
305+
)
226306

227307
with patch(
228308
"kernelCI_app.management.commands.helpers.kcidbng_ingester.MODEL_MAP",
229309
{"issues": mock_model},
230310
):
231-
consume_buffer(mock_buffer, "issues")
311+
consume_buffer(mock_buffer, table_name)
232312

233313
assert mock_time.call_count == 2
234-
mock_model.objects.bulk_create.assert_called_once_with(
235-
mock_buffer,
236-
batch_size=INGEST_BATCH_SIZE_MOCK,
237-
ignore_conflicts=True,
238-
)
314+
mock_cursor.executemany.assert_called_once()
239315
mock_out.assert_called_once()
240316

241317
@patch("kernelCI_app.management.commands.helpers.kcidbng_ingester.out")
@@ -254,6 +330,12 @@ def test_consume_buffer_empty_buffer(self, mock_time, mock_out):
254330
mock_time.assert_not_called()
255331
mock_out.assert_not_called()
256332

333+
def test_consume_buffer_wrong_table(self):
334+
"""Test consume_buffer with invalid table name raises KeyError."""
335+
with pytest.raises(KeyError):
336+
mock_model = MagicMock()
337+
consume_buffer([mock_model], "another")
338+
257339

258340
class TestFlushBuffers:
259341
"""Test cases for flush_buffers function."""

0 commit comments

Comments
 (0)