|
2 | 2 | import time |
3 | 3 | import uuid |
4 | 4 |
|
| 5 | +import mock |
5 | 6 | import pytest |
6 | 7 | from celery import states |
7 | 8 | from celery.result import allow_join_result |
8 | 9 | from celery.utils.log import get_task_logger |
9 | 10 | from django.core.management import call_command |
10 | 11 | from django.test import TransactionTestCase |
11 | 12 | from django_celery_results.models import TaskResult |
| 13 | +from le_utils.constants import licenses |
| 14 | +from mock import patch |
12 | 15 |
|
13 | 16 | from . import testdata |
| 17 | +from .base import StudioTestCase |
14 | 18 | from .helpers import clear_tasks |
| 19 | +from .helpers import EagerTasksTestMixin |
| 20 | +from contentcuration import models as cc |
15 | 21 | from contentcuration.celery import app |
16 | 22 |
|
17 | 23 | logger = get_task_logger(__name__) |
@@ -273,3 +279,229 @@ def test_revoke_task(self): |
273 | 279 | TaskResult.objects.get(task_id=async_result.task_id, status=states.REVOKED) |
274 | 280 | except TaskResult.DoesNotExist: |
275 | 281 | self.fail("Missing revoked task result") |
| 282 | + |
| 283 | + |
| 284 | +class AuditChannelLicensesTaskTestCase(EagerTasksTestMixin, StudioTestCase): |
| 285 | + """Tests for the audit_channel_licenses_task""" |
| 286 | + |
| 287 | + def setUp(self): |
| 288 | + super().setUp() |
| 289 | + self.setUpBase() |
| 290 | + self.channel.main_tree.published = True |
| 291 | + self.channel.main_tree.save() |
| 292 | + self.channel.version = 1 |
| 293 | + self.channel.save() |
| 294 | + |
| 295 | + @patch("contentcuration.utils.audit_channel_licenses.KolibriContentNode") |
| 296 | + @patch( |
| 297 | + "contentcuration.utils.audit_channel_licenses.using_temp_migrated_content_database" |
| 298 | + ) |
| 299 | + @patch("contentcuration.utils.audit_channel_licenses.storage.exists") |
| 300 | + def test_audit_licenses_task__no_invalid_or_special_permissions( |
| 301 | + self, mock_storage_exists, mock_using_db, mock_kolibri_node |
| 302 | + ): |
| 303 | + """Test audit task when channel has no invalid or special permissions licenses""" |
| 304 | + from contentcuration.tasks import audit_channel_licenses_task |
| 305 | + |
| 306 | + license1, _ = cc.License.objects.get_or_create(license_name="CC BY") |
| 307 | + license2, _ = cc.License.objects.get_or_create(license_name="CC BY-SA") |
| 308 | + cc.License.objects.get_or_create(license_name=licenses.SPECIAL_PERMISSIONS) |
| 309 | + node1 = testdata.node({"kind_id": "video", "title": "Video Node"}) |
| 310 | + node1.parent = self.channel.main_tree |
| 311 | + node1.license = license1 |
| 312 | + node1.save() |
| 313 | + node1.published = True |
| 314 | + node1.save() |
| 315 | + |
| 316 | + node2 = testdata.node({"kind_id": "video", "title": "Video Node 2"}) |
| 317 | + node2.parent = self.channel.main_tree |
| 318 | + node2.license = license2 |
| 319 | + node2.save() |
| 320 | + node2.published = True |
| 321 | + node2.save() |
| 322 | + |
| 323 | + mock_storage_exists.return_value = True |
| 324 | + |
| 325 | + mock_context = mock.MagicMock() |
| 326 | + mock_using_db.return_value.__enter__ = mock.Mock(return_value=mock_context) |
| 327 | + mock_using_db.return_value.__exit__ = mock.Mock(return_value=None) |
| 328 | + |
| 329 | + # Mock KolibriContentNode to return license names from the nodes we created |
| 330 | + mock_license_names_distinct = ["CC BY", "CC BY-SA"] |
| 331 | + mock_license_names_values_list = mock.Mock() |
| 332 | + mock_license_names_values_list.distinct.return_value = ( |
| 333 | + mock_license_names_distinct |
| 334 | + ) |
| 335 | + mock_license_names_exclude3 = mock.Mock() |
| 336 | + mock_license_names_exclude3.values_list.return_value = ( |
| 337 | + mock_license_names_values_list |
| 338 | + ) |
| 339 | + mock_license_names_exclude2 = mock.Mock() |
| 340 | + mock_license_names_exclude2.exclude.return_value = mock_license_names_exclude3 |
| 341 | + mock_license_names_exclude1 = mock.Mock() |
| 342 | + mock_license_names_exclude1.exclude.return_value = mock_license_names_exclude2 |
| 343 | + |
| 344 | + mock_kolibri_node.objects = mock.Mock() |
| 345 | + mock_kolibri_node.objects.exclude = mock.Mock( |
| 346 | + return_value=mock_license_names_exclude1 |
| 347 | + ) |
| 348 | + |
| 349 | + audit_channel_licenses_task.apply( |
| 350 | + kwargs={"channel_id": self.channel.id, "user_id": self.user.id} |
| 351 | + ) |
| 352 | + |
| 353 | + self.channel.refresh_from_db() |
| 354 | + version_str = str(self.channel.version) |
| 355 | + self.assertIn(version_str, self.channel.published_data) |
| 356 | + published_data_version = self.channel.published_data[version_str] |
| 357 | + |
| 358 | + self.assertIn("included_licenses", published_data_version) |
| 359 | + self.assertIsNone( |
| 360 | + published_data_version.get("community_library_invalid_licenses") |
| 361 | + ) |
| 362 | + self.assertIsNone( |
| 363 | + published_data_version.get("community_library_special_permissions") |
| 364 | + ) |
| 365 | + |
| 366 | + @patch("contentcuration.utils.audit_channel_licenses.KolibriContentNode") |
| 367 | + @patch( |
| 368 | + "contentcuration.utils.audit_channel_licenses.using_temp_migrated_content_database" |
| 369 | + ) |
| 370 | + @patch("contentcuration.utils.audit_channel_licenses.storage.exists") |
| 371 | + def test_audit_licenses_task__with_all_rights_reserved( |
| 372 | + self, mock_storage_exists, mock_using_db, mock_kolibri_node |
| 373 | + ): |
| 374 | + """Test audit task when channel has All Rights Reserved license""" |
| 375 | + from contentcuration.tasks import audit_channel_licenses_task |
| 376 | + |
| 377 | + all_rights_license, _ = cc.License.objects.get_or_create( |
| 378 | + license_name=licenses.ALL_RIGHTS_RESERVED |
| 379 | + ) |
| 380 | + |
| 381 | + mock_storage_exists.return_value = True |
| 382 | + |
| 383 | + mock_context = mock.MagicMock() |
| 384 | + mock_using_db.return_value.__enter__ = mock.Mock(return_value=mock_context) |
| 385 | + mock_using_db.return_value.__exit__ = mock.Mock(return_value=None) |
| 386 | + |
| 387 | + mock_license_names_distinct = [licenses.ALL_RIGHTS_RESERVED] |
| 388 | + mock_license_names_values_list = mock.Mock() |
| 389 | + mock_license_names_values_list.distinct.return_value = ( |
| 390 | + mock_license_names_distinct |
| 391 | + ) |
| 392 | + mock_license_names_exclude3 = mock.Mock() |
| 393 | + mock_license_names_exclude3.values_list.return_value = ( |
| 394 | + mock_license_names_values_list |
| 395 | + ) |
| 396 | + mock_license_names_exclude2 = mock.Mock() |
| 397 | + mock_license_names_exclude2.exclude.return_value = mock_license_names_exclude3 |
| 398 | + mock_license_names_exclude1 = mock.Mock() |
| 399 | + mock_license_names_exclude1.exclude.return_value = mock_license_names_exclude2 |
| 400 | + mock_license_names_base = mock.Mock() |
| 401 | + mock_license_names_base.exclude.return_value = mock_license_names_exclude1 |
| 402 | + |
| 403 | + mock_kolibri_node.objects = mock.Mock() |
| 404 | + mock_kolibri_node.objects.exclude = mock.Mock( |
| 405 | + return_value=mock_license_names_exclude1 |
| 406 | + ) |
| 407 | + |
| 408 | + audit_channel_licenses_task.apply( |
| 409 | + kwargs={"channel_id": self.channel.id, "user_id": self.user.id} |
| 410 | + ) |
| 411 | + |
| 412 | + self.channel.refresh_from_db() |
| 413 | + version_str = str(self.channel.version) |
| 414 | + published_data_version = self.channel.published_data[version_str] |
| 415 | + |
| 416 | + self.assertEqual( |
| 417 | + published_data_version.get("community_library_invalid_licenses"), |
| 418 | + [all_rights_license.id], |
| 419 | + ) |
| 420 | + |
| 421 | + @patch("contentcuration.utils.audit_channel_licenses.KolibriContentNode") |
| 422 | + @patch( |
| 423 | + "contentcuration.utils.audit_channel_licenses.using_temp_migrated_content_database" |
| 424 | + ) |
| 425 | + @patch("contentcuration.utils.audit_channel_licenses.storage.exists") |
| 426 | + def test_audit_licenses_task__with_special_permissions( |
| 427 | + self, mock_storage_exists, mock_using_db, mock_kolibri_node |
| 428 | + ): |
| 429 | + """Test audit task when channel has Special Permissions licenses""" |
| 430 | + from contentcuration.tasks import audit_channel_licenses_task |
| 431 | + |
| 432 | + special_perms_license, _ = cc.License.objects.get_or_create( |
| 433 | + license_name="Special Permissions" |
| 434 | + ) |
| 435 | + node = testdata.node({"kind_id": "video", "title": "Video Node"}) |
| 436 | + node.parent = self.channel.main_tree |
| 437 | + node.license = special_perms_license |
| 438 | + node.save() |
| 439 | + node.published = True |
| 440 | + node.save() |
| 441 | + |
| 442 | + mock_storage_exists.return_value = True |
| 443 | + |
| 444 | + mock_context = mock.MagicMock() |
| 445 | + mock_using_db.return_value.__enter__ = mock.Mock(return_value=mock_context) |
| 446 | + mock_using_db.return_value.__exit__ = mock.Mock(return_value=None) |
| 447 | + |
| 448 | + mock_license_names_distinct = [licenses.SPECIAL_PERMISSIONS] |
| 449 | + mock_license_names_values_list = mock.Mock() |
| 450 | + mock_license_names_values_list.distinct.return_value = ( |
| 451 | + mock_license_names_distinct |
| 452 | + ) |
| 453 | + mock_license_names_exclude3 = mock.Mock() |
| 454 | + mock_license_names_exclude3.values_list.return_value = ( |
| 455 | + mock_license_names_values_list |
| 456 | + ) |
| 457 | + mock_license_names_exclude2 = mock.Mock() |
| 458 | + mock_license_names_exclude2.exclude.return_value = mock_license_names_exclude3 |
| 459 | + mock_license_names_exclude1 = mock.Mock() |
| 460 | + mock_license_names_exclude1.exclude.return_value = mock_license_names_exclude2 |
| 461 | + mock_license_names_base = mock.Mock() |
| 462 | + mock_license_names_base.exclude.return_value = mock_license_names_exclude1 |
| 463 | + mock_special_perms_distinct = ["Custom permission 1", "Custom permission 2"] |
| 464 | + mock_special_perms_values_list = mock.Mock() |
| 465 | + mock_special_perms_values_list.distinct.return_value = ( |
| 466 | + mock_special_perms_distinct |
| 467 | + ) |
| 468 | + mock_special_perms_exclude3 = mock.Mock() |
| 469 | + mock_special_perms_exclude3.values_list.return_value = ( |
| 470 | + mock_special_perms_values_list |
| 471 | + ) |
| 472 | + mock_special_perms_exclude2 = mock.Mock() |
| 473 | + mock_special_perms_exclude2.exclude.return_value = mock_special_perms_exclude3 |
| 474 | + mock_special_perms_exclude1 = mock.Mock() |
| 475 | + mock_special_perms_exclude1.exclude.return_value = mock_special_perms_exclude2 |
| 476 | + mock_special_perms_filter = mock.Mock() |
| 477 | + mock_special_perms_filter.exclude.return_value = mock_special_perms_exclude1 |
| 478 | + |
| 479 | + # Set up the mock to return different querysets based on the method called |
| 480 | + mock_kolibri_node.objects = mock.Mock() |
| 481 | + mock_kolibri_node.objects.exclude = mock.Mock( |
| 482 | + return_value=mock_license_names_exclude1 |
| 483 | + ) |
| 484 | + mock_kolibri_node.objects.filter = mock.Mock( |
| 485 | + return_value=mock_special_perms_filter |
| 486 | + ) |
| 487 | + |
| 488 | + audit_channel_licenses_task.apply( |
| 489 | + kwargs={"channel_id": self.channel.id, "user_id": self.user.id} |
| 490 | + ) |
| 491 | + |
| 492 | + self.channel.refresh_from_db() |
| 493 | + version_str = str(self.channel.version) |
| 494 | + published_data_version = self.channel.published_data[version_str] |
| 495 | + |
| 496 | + special_perms = published_data_version.get( |
| 497 | + "community_library_special_permissions" |
| 498 | + ) |
| 499 | + self.assertIsNotNone(special_perms) |
| 500 | + self.assertEqual(len(special_perms), 2) |
| 501 | + |
| 502 | + from contentcuration.models import AuditedSpecialPermissionsLicense |
| 503 | + |
| 504 | + audited_licenses = AuditedSpecialPermissionsLicense.objects.filter( |
| 505 | + description__in=["Custom permission 1", "Custom permission 2"] |
| 506 | + ) |
| 507 | + self.assertEqual(audited_licenses.count(), 2) |
0 commit comments