class Command(BaseCommand):
    """Management command that renders ``generated/insert_queries.py``.

    For every model in ``MODEL_MAP`` it builds an ``INSERT ... ON CONFLICT (id)``
    statement plus the ordered list of model attributes that feed its
    placeholders, then renders both through ``templates/insert_queries.txt.j2``.
    """

    help = """
    Dynamically generates the insert queries for all models, storing them in a specific file.
    Gives priority to the existing data in the database.

    This command should not be executed in runtime.
    """

    def add_arguments(self, parser):
        # Add custom command arguments here
        pass

    @staticmethod
    def _build_insert_props(table_name, model):
        """Return the updateable attribute names and the upsert query of one model.

        Args:
            table_name: Name interpolated into the SQL as the target table.
            model: Django model class whose concrete fields are inspected.

        Returns:
            A dict with ``"updateable_model_fields"`` (attribute names, in
            placeholder order) and ``"query"`` (the SQL text).
        """
        model_attnames: list[str] = []
        db_columns: list[str] = []
        conflict_ops: list[tuple[str, str]] = []

        for field in model._meta.fields:
            # Generated columns are computed by the database and cannot be inserted.
            if field.generated:
                continue

            # ``attname`` / ``column`` already account for the "_id" suffix of
            # relations and for ``db_column`` overrides, for every relation
            # type (not only ForeignKey).
            column = field.column
            model_attnames.append(field.attname)
            db_columns.append(column)

            # Fields that are never null don't need to have conflict clauses
            # (except _timestamp, which always keeps the greatest value).
            if column == "_timestamp":
                conflict_ops.append((column, "GREATEST"))
            elif field.null:
                conflict_ops.append((column, "COALESCE"))

        column_lines = ",".join(f"\n                {column}" for column in db_columns)
        placeholders = ", ".join(["%s"] * len(db_columns))

        if conflict_ops:
            assignments = ",".join(
                f"\n                {column} = {op}({table_name}.{column}, EXCLUDED.{column})"
                for column, op in conflict_ops
            )
            conflict_action = f"DO UPDATE SET{assignments}"
        else:
            # "DO UPDATE SET" with no assignments is not valid SQL.
            conflict_action = "DO NOTHING"

        query = f"""
            INSERT INTO {table_name} ({column_lines}
            )
            VALUES (
                {placeholders}
            )
            ON CONFLICT (id)
            {conflict_action};
        """

        return {"updateable_model_fields": model_attnames, "query": query}

    def handle(self, *args, **options):
        """Build the queries for all mapped models and write the generated module."""
        var_insert_queries = {
            table_name: self._build_insert_props(table_name, model)
            for table_name, model in MODEL_MAP.items()
        }

        commands_dir = os.path.dirname(__file__)

        # Read the template file
        template_path = os.path.join(commands_dir, "templates", "insert_queries.txt.j2")
        with open(template_path, "r", encoding="utf-8") as template_file:
            template_content = template_file.read()

        # Render the template with the variables
        template = Template(template_content)
        rendered_content = template.render(
            timestamp=datetime.now(),
            checkouts=var_insert_queries["checkouts"],
            issues=var_insert_queries["issues"],
            builds=var_insert_queries["builds"],
            tests=var_insert_queries["tests"],
            incidents=var_insert_queries["incidents"],
        )

        # Write the result to a Python file
        output_path = os.path.join(commands_dir, "generated", "insert_queries.py")
        os.makedirs(os.path.dirname(output_path), exist_ok=True)
        with open(output_path, "w", encoding="utf-8") as output_file:
            output_file.write(rendered_content)

        self.stdout.write(
            self.style.SUCCESS(
                f"Successfully generated insert queries at {output_path}"
            )
        )
+ +# flake8: noqa: E501 # Ignores long lines for better readability + +INSERT_QUERIES = { + "checkouts": { + "updateable_model_fields": [ + "field_timestamp", + "id", + "origin", + "tree_name", + "git_repository_url", + "git_commit_hash", + "git_commit_name", + "git_repository_branch", + "patchset_files", + "patchset_hash", + "message_id", + "comment", + "start_time", + "log_url", + "log_excerpt", + "valid", + "misc", + "git_commit_message", + "git_repository_branch_tip", + "git_commit_tags", + "origin_builds_finish_time", + "origin_tests_finish_time", + ], + "query": """ + INSERT INTO checkouts ( + _timestamp, + id, + origin, + tree_name, + git_repository_url, + git_commit_hash, + git_commit_name, + git_repository_branch, + patchset_files, + patchset_hash, + message_id, + comment, + start_time, + log_url, + log_excerpt, + valid, + misc, + git_commit_message, + git_repository_branch_tip, + git_commit_tags, + origin_builds_finish_time, + origin_tests_finish_time + ) + VALUES ( + %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s + ) + ON CONFLICT (id) + DO UPDATE SET + _timestamp = GREATEST(checkouts._timestamp, EXCLUDED._timestamp), + tree_name = COALESCE(checkouts.tree_name, EXCLUDED.tree_name), + git_repository_url = COALESCE(checkouts.git_repository_url, EXCLUDED.git_repository_url), + git_commit_hash = COALESCE(checkouts.git_commit_hash, EXCLUDED.git_commit_hash), + git_commit_name = COALESCE(checkouts.git_commit_name, EXCLUDED.git_commit_name), + git_repository_branch = COALESCE(checkouts.git_repository_branch, EXCLUDED.git_repository_branch), + patchset_files = COALESCE(checkouts.patchset_files, EXCLUDED.patchset_files), + patchset_hash = COALESCE(checkouts.patchset_hash, EXCLUDED.patchset_hash), + message_id = COALESCE(checkouts.message_id, EXCLUDED.message_id), + comment = COALESCE(checkouts.comment, EXCLUDED.comment), + start_time = COALESCE(checkouts.start_time, EXCLUDED.start_time), + log_url = 
COALESCE(checkouts.log_url, EXCLUDED.log_url), + log_excerpt = COALESCE(checkouts.log_excerpt, EXCLUDED.log_excerpt), + valid = COALESCE(checkouts.valid, EXCLUDED.valid), + misc = COALESCE(checkouts.misc, EXCLUDED.misc), + git_commit_message = COALESCE(checkouts.git_commit_message, EXCLUDED.git_commit_message), + git_repository_branch_tip = COALESCE(checkouts.git_repository_branch_tip, EXCLUDED.git_repository_branch_tip), + git_commit_tags = COALESCE(checkouts.git_commit_tags, EXCLUDED.git_commit_tags), + origin_builds_finish_time = COALESCE(checkouts.origin_builds_finish_time, EXCLUDED.origin_builds_finish_time), + origin_tests_finish_time = COALESCE(checkouts.origin_tests_finish_time, EXCLUDED.origin_tests_finish_time); + """, + }, + "issues": { + "updateable_model_fields": [ + "field_timestamp", + "id", + "version", + "origin", + "report_url", + "report_subject", + "culprit_code", + "culprit_tool", + "culprit_harness", + "comment", + "misc", + "categories", + ], + "query": """ + INSERT INTO issues ( + _timestamp, + id, + version, + origin, + report_url, + report_subject, + culprit_code, + culprit_tool, + culprit_harness, + comment, + misc, + categories + ) + VALUES ( + %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s + ) + ON CONFLICT (id) + DO UPDATE SET + _timestamp = GREATEST(issues._timestamp, EXCLUDED._timestamp), + report_url = COALESCE(issues.report_url, EXCLUDED.report_url), + report_subject = COALESCE(issues.report_subject, EXCLUDED.report_subject), + culprit_code = COALESCE(issues.culprit_code, EXCLUDED.culprit_code), + culprit_tool = COALESCE(issues.culprit_tool, EXCLUDED.culprit_tool), + culprit_harness = COALESCE(issues.culprit_harness, EXCLUDED.culprit_harness), + comment = COALESCE(issues.comment, EXCLUDED.comment), + misc = COALESCE(issues.misc, EXCLUDED.misc), + categories = COALESCE(issues.categories, EXCLUDED.categories); + """, + }, + "builds": { + "updateable_model_fields": [ + "field_timestamp", + "checkout_id", + "id", + "origin", + 
"comment", + "start_time", + "duration", + "architecture", + "command", + "compiler", + "input_files", + "output_files", + "config_name", + "config_url", + "log_url", + "log_excerpt", + "misc", + "status", + ], + "query": """ + INSERT INTO builds ( + _timestamp, + checkout_id, + id, + origin, + comment, + start_time, + duration, + architecture, + command, + compiler, + input_files, + output_files, + config_name, + config_url, + log_url, + log_excerpt, + misc, + status + ) + VALUES ( + %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s + ) + ON CONFLICT (id) + DO UPDATE SET + _timestamp = GREATEST(builds._timestamp, EXCLUDED._timestamp), + comment = COALESCE(builds.comment, EXCLUDED.comment), + start_time = COALESCE(builds.start_time, EXCLUDED.start_time), + duration = COALESCE(builds.duration, EXCLUDED.duration), + architecture = COALESCE(builds.architecture, EXCLUDED.architecture), + command = COALESCE(builds.command, EXCLUDED.command), + compiler = COALESCE(builds.compiler, EXCLUDED.compiler), + input_files = COALESCE(builds.input_files, EXCLUDED.input_files), + output_files = COALESCE(builds.output_files, EXCLUDED.output_files), + config_name = COALESCE(builds.config_name, EXCLUDED.config_name), + config_url = COALESCE(builds.config_url, EXCLUDED.config_url), + log_url = COALESCE(builds.log_url, EXCLUDED.log_url), + log_excerpt = COALESCE(builds.log_excerpt, EXCLUDED.log_excerpt), + misc = COALESCE(builds.misc, EXCLUDED.misc), + status = COALESCE(builds.status, EXCLUDED.status); + """, + }, + "tests": { + "updateable_model_fields": [ + "field_timestamp", + "build_id", + "id", + "origin", + "environment_comment", + "environment_misc", + "path", + "comment", + "log_url", + "log_excerpt", + "status", + "start_time", + "duration", + "output_files", + "misc", + "number_value", + "environment_compatible", + "number_prefix", + "number_unit", + "input_files", + ], + "query": """ + INSERT INTO tests ( + _timestamp, + build_id, + id, + origin, + 
environment_comment, + environment_misc, + path, + comment, + log_url, + log_excerpt, + status, + start_time, + duration, + output_files, + misc, + number_value, + environment_compatible, + number_prefix, + number_unit, + input_files + ) + VALUES ( + %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s + ) + ON CONFLICT (id) + DO UPDATE SET + _timestamp = GREATEST(tests._timestamp, EXCLUDED._timestamp), + environment_comment = COALESCE(tests.environment_comment, EXCLUDED.environment_comment), + environment_misc = COALESCE(tests.environment_misc, EXCLUDED.environment_misc), + path = COALESCE(tests.path, EXCLUDED.path), + comment = COALESCE(tests.comment, EXCLUDED.comment), + log_url = COALESCE(tests.log_url, EXCLUDED.log_url), + log_excerpt = COALESCE(tests.log_excerpt, EXCLUDED.log_excerpt), + status = COALESCE(tests.status, EXCLUDED.status), + start_time = COALESCE(tests.start_time, EXCLUDED.start_time), + duration = COALESCE(tests.duration, EXCLUDED.duration), + output_files = COALESCE(tests.output_files, EXCLUDED.output_files), + misc = COALESCE(tests.misc, EXCLUDED.misc), + number_value = COALESCE(tests.number_value, EXCLUDED.number_value), + environment_compatible = COALESCE(tests.environment_compatible, EXCLUDED.environment_compatible), + number_prefix = COALESCE(tests.number_prefix, EXCLUDED.number_prefix), + number_unit = COALESCE(tests.number_unit, EXCLUDED.number_unit), + input_files = COALESCE(tests.input_files, EXCLUDED.input_files); + """, + }, + "incidents": { + "updateable_model_fields": [ + "field_timestamp", + "id", + "origin", + "issue_id", + "issue_version", + "build_id", + "test_id", + "present", + "comment", + "misc", + ], + "query": """ + INSERT INTO incidents ( + _timestamp, + id, + origin, + issue_id, + issue_version, + build_id, + test_id, + present, + comment, + misc + ) + VALUES ( + %s, %s, %s, %s, %s, %s, %s, %s, %s, %s + ) + ON CONFLICT (id) + DO UPDATE SET + _timestamp = GREATEST(incidents._timestamp, 
def consume_buffer(buffer: list[TableModels], table_name: TableNames) -> None:
    """
    Consume a buffer of items and insert them into the database.
    This function is called by the db_worker thread.

    The rows are written with the pre-generated query stored in
    ``INSERT_QUERIES[table_name]`` through ``cursor.executemany`` on the
    "default" connection. An empty buffer is a no-op.

    Args:
        buffer: Model instances to insert; assumed to all belong to the
            model behind ``table_name`` (TODO confirm against callers).
        table_name: Key of ``INSERT_QUERIES`` selecting the query to run.

    Raises:
        KeyError: If ``table_name`` has no entry in ``INSERT_QUERIES``.
    """
    if not buffer:
        return

    insert_props = INSERT_QUERIES[table_name]
    updateable_model_fields = insert_props["updateable_model_fields"]
    query = insert_props["query"]

    # Which fields are JSON depends only on the model, so resolve it once per
    # buffer instead of once per object and field.
    model_meta = buffer[0]._meta
    json_fields = {
        field
        for field in updateable_model_fields
        if model_meta.get_field(field).get_internal_type() == "JSONField"
    }

    def _to_param(obj, field):
        """Return the attribute value, serialized to a JSON string for JSON fields."""
        value = getattr(obj, field)
        if value is not None and field in json_fields:
            return json.dumps(value)
        return value

    params = [
        tuple(_to_param(obj, field) for field in updateable_model_fields)
        for obj in buffer
    ]

    t0 = time.time()
    with connections["default"].cursor() as cursor:
        cursor.executemany(query, params)

    # NOTE(review): the label still says "bulk_create" although the insert now
    # goes through executemany; kept unchanged in case log consumers match on it.
    out("bulk_create %s: n=%d in %.3fs" % (table_name, len(buffer), time.time() - t0))
+ +# flake8: noqa: E501 # Ignores long lines for better readability + +INSERT_QUERIES = { + "checkouts": { + "updateable_model_fields": [ + {% for field in checkouts["updateable_model_fields"] -%} + "{{ field }}"{% if not loop.last %}, + {% endif %} + {%- endfor %}, + ], + "query": """{{checkouts["query"]}}""", + }, + "issues": { + "updateable_model_fields": [ + {% for field in issues["updateable_model_fields"] -%} + "{{ field }}"{% if not loop.last %}, + {% endif %} + {%- endfor %}, + ], + "query": """{{issues["query"]}}""", + }, + "builds": { + "updateable_model_fields": [ + {% for field in builds["updateable_model_fields"] -%} + "{{ field }}"{% if not loop.last %}, + {% endif %} + {%- endfor %}, + ], + "query": """{{builds["query"]}}""", + }, + "tests": { + "updateable_model_fields": [ + {% for field in tests["updateable_model_fields"] -%} + "{{ field }}"{% if not loop.last %}, + {% endif %} + {%- endfor %}, + ], + "query": """{{tests["query"]}}""", + }, + "incidents": { + "updateable_model_fields": [ + {% for field in incidents["updateable_model_fields"] -%} + "{{ field }}"{% if not loop.last %}, + {% endif %} + {%- endfor %}, + ], + "query": """{{incidents["query"]}}""", + }, +} + diff --git a/backend/kernelCI_app/models.py b/backend/kernelCI_app/models.py index d75e1cb06..9fa963e4f 100644 --- a/backend/kernelCI_app/models.py +++ b/backend/kernelCI_app/models.py @@ -1,3 +1,6 @@ +"""Defines the models used in the main database. 
+All models should have explicit id column for the ingester to work properly.""" + from django.db import models from django.contrib.postgres.fields import ArrayField from django.contrib.postgres.indexes import GinIndex diff --git a/backend/kernelCI_app/tests/unitTests/commands/monitorSubmissions/kcidbng_ingester_test.py b/backend/kernelCI_app/tests/unitTests/commands/monitorSubmissions/kcidbng_ingester_test.py index 394a9639a..c721b86a9 100644 --- a/backend/kernelCI_app/tests/unitTests/commands/monitorSubmissions/kcidbng_ingester_test.py +++ b/backend/kernelCI_app/tests/unitTests/commands/monitorSubmissions/kcidbng_ingester_test.py @@ -212,30 +212,29 @@ class TestConsumeBuffer: # Test cases: # - buffer with items # - empty buffer + # - trying to insert in an invalid table @patch( "kernelCI_app.management.commands.helpers.kcidbng_ingester.INGEST_BATCH_SIZE", INGEST_BATCH_SIZE_MOCK, ) @patch("kernelCI_app.management.commands.helpers.kcidbng_ingester.out") + @patch("kernelCI_app.management.commands.helpers.kcidbng_ingester.connections") @patch("time.time", side_effect=TIME_MOCK) - def test_consume_buffer_with_items(self, mock_time, mock_out): + def test_consume_buffer_with_items(self, mock_time, mock_connections, mock_out): """Test consume_buffer with items in buffer.""" - mock_model = MagicMock() + table_name = "issues" + # TODO: test with a real example mock_buffer = [MagicMock(), MagicMock()] + mock_cursor = MagicMock() + mock_connections["default"].cursor.return_value.__enter__.return_value = ( + mock_cursor + ) - with patch( - "kernelCI_app.management.commands.helpers.kcidbng_ingester.MODEL_MAP", - {"issues": mock_model}, - ): - consume_buffer(mock_buffer, "issues") + consume_buffer(mock_buffer, table_name) assert mock_time.call_count == 2 - mock_model.objects.bulk_create.assert_called_once_with( - mock_buffer, - batch_size=INGEST_BATCH_SIZE_MOCK, - ignore_conflicts=True, - ) + mock_cursor.executemany.assert_called_once() mock_out.assert_called_once() 
@patch("kernelCI_app.management.commands.helpers.kcidbng_ingester.out") @@ -244,16 +243,18 @@ def test_consume_buffer_empty_buffer(self, mock_time, mock_out): """Test consume_buffer with empty buffer.""" mock_model = MagicMock() - with patch( - "kernelCI_app.management.commands.helpers.kcidbng_ingester.MODEL_MAP", - {"issues": mock_model}, - ): - consume_buffer([], "issues") + consume_buffer([], "issues") mock_model.objects.bulk_create.assert_not_called() mock_time.assert_not_called() mock_out.assert_not_called() + def test_consume_buffer_wrong_table(self): + """Test consume_buffer with invalid table name raises KeyError.""" + with pytest.raises(KeyError): + mock_model = MagicMock() + consume_buffer([mock_model], "another") + class TestFlushBuffers: """Test cases for flush_buffers function.""" @@ -416,7 +417,7 @@ def test_flush_buffers_with_db_error( assert mock_time.call_count == 2 mock_atomic.assert_called_once() mock_logger.error.assert_called_once_with( - "Error during bulk_create flush: %s", mock_consume.side_effect + "Error during buffer flush: %s", mock_consume.side_effect ) mock_aggregate.assert_not_called()