diff --git a/.github/workflows/pull_request_workflow.yaml b/.github/workflows/pull_request_workflow.yaml
index 48d2054..c93fe35 100644
--- a/.github/workflows/pull_request_workflow.yaml
+++ b/.github/workflows/pull_request_workflow.yaml
@@ -9,7 +9,7 @@ on:
 
 jobs:
   # Enforces that only `develop` and hotfix branches can merge into `main`
-  # Source: https://stackoverflow.com/questions/71120146/
+  # Source: https://stackoverflow.com/questions/71120146/
   check_branch:
     runs-on: ubuntu-latest
     steps:
@@ -55,6 +55,10 @@ jobs:
         uses: actions/setup-python@v6
         with:
           python-version: ${{ matrix.python-version }}
+
+      - name: Install system dependencies
+        run: |
+          sudo apt-get update && sudo apt-get install -y libkrb5-dev
 
       - name: Install dependencies
         run: |
diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index c57b58c..99def29 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -8,16 +8,22 @@ repos:
           ^.github/
           )
         exclude_types: [markdown, yaml, toml]
+
       - id: end-of-file-fixer
         exclude: |
           (?x)(
           ^.github/
           )
         exclude_types: [markdown, yaml, toml]
+
+      - id: check-docstring-first
+      - id: check-yaml
+      - id: check-toml
+      - id: debug-statements
+      - id: requirements-txt-fixer
 
   - repo: https://github.com/astral-sh/ruff-pre-commit
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 06b39de..36aae17 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -13,7 +13,9 @@ and this project adheres to [semantic versioning](https://semver.org/spec/v2.0.0
 - Utility functions and their unit tests.
 - Matrix A* functions and their unit tests.
 - Match score functions and their unit tests.
-- New unit test for:
+- Run script, `main.py`, with updates to work on Spark 3.5.1.
+- Configs template, with updates to reflect updated `main.py`.
+- New unit tests:
   - `cartesian_join_dataframes` in `tests/utils/utils.py`.
   - `create_spark_session` in `tests/utils/test_utils_create_spark_session.py`.
   - `get_input_variables` in `tests/utils/utils.py`.
@@ -21,10 +23,10 @@ and this project adheres to [semantic versioning](https://semver.org/spec/v2.0.0
 
 ### Changed
 
-- Dependabot updates including:
+- Dependabot:
   - In GitHub Actions, bump actions/checkout from v4 to v5.
   - In GitHub Actions, bump actions/setup-python from v5 to v6.
-- Function `create_spark_session` in `scalelink/utils/utils.py`, to make it less verbose.
+- Function `create_spark_session` in `scalelink/utils/utils.py`, to make it less verbose and make it work with Spark 3.5.1.
 - Code contribution guidelines, to clarify that we are only accepting contributions from ONSdigital users currently.
 - Dependabot config, so that version updates are targeted to `develop` not `main`.
 
@@ -33,8 +35,9 @@ and this project adheres to [semantic versioning](https://semver.org/spec/v2.0.0
 
 ### Fixed
 
 - GitHub Action that increments release version - fixed typos.
-- Unit test for `cartesian_join_dataframes` in `tests/utils/utils.py`.
-- Unit test for `get_s` in `tests/utils/utils.py`.
+- Unit tests:
+  - `cartesian_join_dataframes` in `tests/utils/utils.py`.
+  - `get_s` in `tests/utils/utils.py`.
 
 ### Removed
diff --git a/configs_template.ini b/configs_template.ini
new file mode 100755
index 0000000..c949073
--- /dev/null
+++ b/configs_template.ini
@@ -0,0 +1,28 @@
+[run_spec]
+# Session size can be s, m, l or xl
+spark_session_size =
+
+[filepaths]
+# df1_path, df2_path and df_candidates_path must point to parquet files
+# checkpoint_path must point to a unique location as it will be deleted at the end of the run
+bucket_name =
+ssl_file =
+df1_path =
+df2_path =
+df_candidates_path =
+checkpoint_path =
+output_path =
+
+[variables]
+df1_id =
+df2_id =
+linkage_vars =
+df1_suffix =
+df2_suffix =
+
+[cutpoints]
+day_cutpoints =
+month_cutpoints =
+year_cutpoints =
+sex_cutpoints =
+surname_cutpoints =
diff --git a/scalelink/main.py b/scalelink/main.py
new file mode 100755
index 0000000..4a8763e
--- /dev/null
+++ b/scalelink/main.py
@@ -0,0 +1,124 @@
+"""
+Run script for Scalelink.
+"""
+
+import boto3
+import raz_client
+from rdsa_utils.cdp.helpers.s3_utils import delete_folder
+
+from scalelink.indicator_matrix import indicator_matrix as im
+from scalelink.match_scores import match_scores as ms
+from scalelink.matrix_a_star import matrix_a_star as ma
+from scalelink.utils import utils as ut
+
+
+def run_scalelink(config_path="scalelink/configs.ini"):
+    """
+    Takes a path for the location of the config file. From this, runs the entire
+    scaling method as per Goldstein et al. (2017) on the specified datasets,
+    using the specified linkage variables.
+
+    Args:
+        config_path (str):
+            The filepath for the config file. The default is a file called
+            configs.ini in the scalelink folder of this repo. The contents should
+            follow the template found at configs_template.ini in the head of this repo.
+
+    Dependencies:
+        boto3, raz_client, rdsa_utils
+
+    Returns:
+        df_weights_match_scores (Spark DataFrame):
+            A dataframe either derived from the Cartesian join of the two input
+            dataframes or derived from the dataset located at df_candidates_path.
+            The columns present in this dataset are:
+            - The ID column of each dataset.
+            - Weight columns (named after the linkage variables, suffixed with
+              '_weight') containing weights for each linkage variable and row.
+            - A column called match_score which contains the sum of the weights,
+              row-wise.
+            This dataframe is also written to the output_path specified in the config
+            file.
+ """ + input_variables = ut.get_input_variables(config_path=config_path) + + spark = ut.create_spark_session( + spark_session_name="Scalelink", + spark_session_size=input_variables["spark_session_size"], + ) + + if input_variables["df_candidates_path"] == "": + df_candidate_pairs = ut.cartesian_join_dataframes( + df1_path="s3a://" + + input_variables["bucket_name"] + + input_variables["df1_path"], + df2_path="s3a://" + + input_variables["bucket_name"] + + input_variables["df2_path"], + spark=spark, + ) + else: + df_candidate_pairs = spark.read.parquet( + "s3a://" + + input_variables["bucket_name"] + + input_variables["df_candidates_path"] + ) + + input_variables = ut.get_s( + input_variables=input_variables, df_cartesian_join=df_candidate_pairs + ) + + spark.sparkContext.setCheckpointDir( + "s3a://" + input_variables["bucket_name"] + input_variables["checkpoint_path"] + ) + + df_deltas = im.get_deltas( + df_cartesian_join=df_candidate_pairs, input_variables=input_variables + ) + + df_delta_comparisons = im.compare_deltas( + df=df_deltas, + linkage_vars=input_variables["linkage_vars"], + delta_col_prefix="di_", + ) + + print("Deltas have been calculated") + + matrix_a_star = ma.get_matrix_a_star( + df_delta_comparisons=df_delta_comparisons, input_variables=input_variables + ) + + print("Matrix A* has been calculated") + + x_star_scaled_labelled = ma.get_scaled_labelled_x_star( + matrix_a_star=matrix_a_star, input_variables=input_variables + ) + + df_weights_match_scores = ms.get_match_scores( + df_deltas=df_deltas, + x_star_scaled_labelled=x_star_scaled_labelled, + input_variables=input_variables, + spark=spark, + ) + + print("Match scores have been calculated") + + df_weights_match_scores.write.mode("overwrite").parquet( + "s3a://" + input_variables["bucket_name"] + input_variables["output_path"] + ) + + print("Your linked dataset has been written to:", input_variables["output_path"]) + + client = boto3.client("s3") + raz_client.configure_ranger_raz(client, ssl_file=input_variables["ssl_file"]) + delete_folder( + client, input_variables["bucket_name"], input_variables["checkpoint_path"] + ) + + print("Your checkpoint files have been tidied up") + + return df_weights_match_scores + + +if __name__ == "__main__": + run_scalelink() diff --git a/scalelink/utils/utils.py b/scalelink/utils/utils.py index 3ab11e8..b2bcef6 100755 --- a/scalelink/utils/utils.py +++ b/scalelink/utils/utils.py @@ -168,7 +168,7 @@ def create_spark_session(spark_session_name, spark_session_size): session_config = session_configs[spark_session_size] spark_builder = ( SparkSession.builder.appName(spark_session_name) - .config("spark.shuffle.service.enabled", "true") + .config("spark.shuffle.service.enabled", "false") .config("spark.ui.showConsoleProgress", "false") .enableHiveSupport() ) @@ -324,19 +324,28 @@ def get_input_variables(config_path): spark_session_size (str): The specified Spark session size for this run. Can be 's', 'm', 'l' or 'xl'. + bucket_name (str): + The name of the S3 bucket where the various filepaths can be + found. Must not include the "s://" prefix. + ssl_file (str): + The path, including file name and extension, for the SSL + Certificate to be used by the Boto3 client. df1_path (str): - The filepath for df1. + The filepath for df1, excluding the S3 bucket name. df2_path (str): - The filepath for df2. + The filepath for df2, excluding the S3 bucket name. 
             df_candidates_path (str):
-                The filepath for the dataset of candidate pairs, if a
-                specific set of candidate pairs (e.g., from blocking) is to
-                be used instead of getting candidate pairs by Cartesian join
-                of df1 and df2.
+                The filepath for the dataset of candidate pairs, excluding
+                the S3 bucket name.
+                Only use if a specific set of candidate pairs (e.g. from
+                blocking) is to be used instead of getting candidate pairs
+                by Cartesian join of df1 and df2.
             checkpoint_path (str):
-                The filepath where checkpoints will be written.
+                The filepath where checkpoints will be written, excluding
+                the S3 bucket name.
             output_path (str):
-                The filepath where the linked dataset will be written.
+                The filepath where the linked dataset will be written,
+                excluding the S3 bucket name.
             df1_id (str):
                 The name of the ID variable in df1.
             df2_id (str):
@@ -380,6 +389,10 @@
 
     spark_session_size = run_spec_configs["spark_session_size"]
 
+    bucket_name = filepath_configs["bucket_name"]
+
+    ssl_file = filepath_configs["ssl_file"]
+
     df1_path = filepath_configs["df1_path"]
 
     df2_path = filepath_configs["df2_path"]
@@ -414,6 +427,8 @@
 
     input_variables = {
         "spark_session_size": spark_session_size,
+        "bucket_name": bucket_name,
+        "ssl_file": ssl_file,
         "df1_path": df1_path,
         "df2_path": df2_path,
         "df_candidates_path": df_candidates_path,
diff --git a/setup.cfg b/setup.cfg
index 2d9c513..20cde90 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -20,7 +20,12 @@ packages = find:
 python_requires = >=3.8, <3.11
 include_package_data = True
 install_requires =
-    pandas
+    boto3 >=1.34.149
+    numpy >=1.24
+    pandas >=2.0
+    pyspark >=3.5.1
+    raz_client >=1.1.0
+    rdsa-utils >=0.16.0
 
 [options.packages.find]
 where = .
@@ -33,7 +38,6 @@ dev =
     chispa >=0.9.2
     isort >=5.13.2
     pre-commit >=2.6.0
-    pyspark ==3.5.1
     pytest >=7.1.0, <8.0.0  # Temporarily pin pytest due to https://github.com/TvoroG/pytest-lazy-fixture/issues/65
     pytest-cov >=4.0.0
     ruff >=0.0.270
diff --git a/tests/utils/test_utils.py b/tests/utils/test_utils.py
index 8bb361c..41faaf3 100755
--- a/tests/utils/test_utils.py
+++ b/tests/utils/test_utils.py
@@ -241,12 +241,13 @@ def test_get_input_variables(
     test_input_config = cp.ConfigParser()
     test_input_config["run_spec"] = {"spark_session_size": "m"}
     test_input_config["filepaths"] = {
+        "bucket_name": "my_bucket",
+        "ssl_file": "my_ssl_file",
         "df1_path": "folder/subfolder/df1",
         "df2_path": "folder/subfolder/df2",
         "df_candidates_path": "folder/subfolder/df_candidates",
         "checkpoint_path": "folder/subfolder/checkpoints/",
         "output_path": "folder/subfolder/output/",
-        "hdfs_test_path": "folder/subfolder/hdfs_tests/",
     }
     test_input_config["variables"] = {
         "df1_id": "df1_id",
diff --git a/tests/utils/test_utils_create_spark_session.py b/tests/utils/test_utils_create_spark_session.py
index 6309796..cf4463d 100755
--- a/tests/utils/test_utils_create_spark_session.py
+++ b/tests/utils/test_utils_create_spark_session.py
@@ -37,7 +37,7 @@ def test_create_spark_session():
             "spark.executor.cores": "1",
             "spark.dynamicAllocation.maxExecutors": "3",
             "spark.sql.shuffle.partitions": "12",
-            "spark.shuffle.service.enabled": "true",
+            "spark.shuffle.service.enabled": "false",
             "spark.ui.showConsoleProgress": "false",
         },
         "m": {
@@ -46,7 +46,7 @@
             "spark.executor.cores": "3",
             "spark.dynamicAllocation.maxExecutors": "3",
             "spark.sql.shuffle.partitions": "18",
-            "spark.shuffle.service.enabled": "true",
+            "spark.shuffle.service.enabled": "false",
"spark.ui.showConsoleProgress": "false", }, "l": { @@ -55,7 +55,7 @@ def test_create_spark_session(): "spark.executor.cores": "5", "spark.dynamicAllocation.maxExecutors": "5", "spark.sql.shuffle.partitions": "200", - "spark.shuffle.service.enabled": "true", + "spark.shuffle.service.enabled": "false", "spark.ui.showConsoleProgress": "false", }, "xl": { @@ -64,7 +64,7 @@ def test_create_spark_session(): "spark.executor.cores": "5", "spark.dynamicAllocation.maxExecutors": "12", "spark.sql.shuffle.partitions": "240", - "spark.shuffle.service.enabled": "true", + "spark.shuffle.service.enabled": "false", "spark.ui.showConsoleProgress": "false", }, }