From 5ca08c3d9bed7508a3287d4c944a08098223c211 Mon Sep 17 00:00:00 2001 From: Nathan Goldbaum Date: Wed, 26 Nov 2025 09:55:35 -0700 Subject: [PATCH 01/22] attempt at free-threaded support --- .github/workflows/ci.yml | 10 +++++++--- setup.py | 13 ++++++++----- 2 files changed, 15 insertions(+), 8 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index c3e26a2..4c05c30 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -22,7 +22,7 @@ jobs: fail-fast: false matrix: os: [ubuntu-latest, macos-latest, windows-latest] - python-version: ["3.8", "3.9", "3.10", "3.11", "3.12", "3.13", "3.14"] + python-version: ["3.8", "3.9", "3.10", "3.11", "3.12", "3.13", "3.14", "3.14t"] steps: - name: Set git to use LF on Windows if: runner.os == 'Windows' @@ -65,6 +65,10 @@ jobs: strategy: matrix: os: [ubuntu-latest, macos-latest, windows-latest] + python-version: [ + ["3.x", "cp38-* pp*-*"], + ["3.14t", "cp314t-*"], + ] steps: - uses: actions/checkout@v5 @@ -72,7 +76,7 @@ jobs: submodules: recursive - uses: actions/setup-python@v6 with: - python-version: 3.x + python-version: ${{ matrix.python-version[0] }} - name: Install cibuildwheel run: python -m pip install cibuildwheel # - name: Install Visual C++ for Python 2.7 @@ -87,7 +91,7 @@ jobs: - name: Build wheels run: python -m cibuildwheel --output-dir wheelhouse env: - CIBW_BUILD: cp38-* pp*-* + CIBW_BUILD: ${{ matrix.python-version[1] }} CIBW_SKIP: "*musllinux*" CIBW_ENABLE: pypy CIBW_ARCHS_LINUX: auto aarch64 diff --git a/setup.py b/setup.py index e7176fe..f46e453 100644 --- a/setup.py +++ b/setup.py @@ -3,6 +3,7 @@ import re import platform import sys +import sysconfig from setuptools import find_packages, setup from setuptools.command.build_ext import build_ext @@ -74,11 +75,13 @@ def run(self): except ImportError: pass else: - class BDistWheel(wheel.bdist_wheel.bdist_wheel): - def finalize_options(self): - self.py_limited_api = "cp3{}".format(sys.version_info[1]) - wheel.bdist_wheel.bdist_wheel.finalize_options(self) - cmdclass['bdist_wheel'] = BDistWheel + # the limited API is only supported on GIL builds as of Python 3.14 + if not bool(sysconfig.get_config_var("Py_GIL_DISABLED")): + class BDistWheel(wheel.bdist_wheel.bdist_wheel): + def finalize_options(self): + self.py_limited_api = "cp3{}".format(sys.version_info[1]) + wheel.bdist_wheel.bdist_wheel.finalize_options(self) + cmdclass['bdist_wheel'] = BDistWheel setup( name="brotlicffi", From 3206dd9985acc01b6494f46a9f6391dec3fe485c Mon Sep 17 00:00:00 2001 From: Nathan Goldbaum Date: Wed, 26 Nov 2025 10:57:50 -0700 Subject: [PATCH 02/22] update out-of-date action --- .github/workflows/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 4c05c30..bc99196 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -84,7 +84,7 @@ jobs: # run: choco install vcpython27 -f -y - name: Install QEMU if: runner.os == 'Linux' - uses: docker/setup-qemu-action@v1 + uses: docker/setup-qemu-action@v3 with: platforms: all From 110418d2e5e2204bbec79e281b3d665de6d68eec Mon Sep 17 00:00:00 2001 From: Nathan Goldbaum Date: Wed, 26 Nov 2025 12:15:50 -0700 Subject: [PATCH 03/22] make wheel artifact names unique --- .github/workflows/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index bc99196..868c99f 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -98,7 +98,7 @@ jobs: CIBW_BEFORE_BUILD_LINUX: yum install -y libffi-devel - uses: actions/upload-artifact@v4 with: - name: wheels-${{ matrix.os }} + name: wheels-${{ matrix.os }}-${{ matrix.python-version[0] }} path: ./wheelhouse/*.whl publish: From 7fb4b991a35e41caf4dc49cafcb07aec7e963b62 Mon Sep 17 00:00:00 2001 From: Nathan Goldbaum Date: Wed, 26 Nov 2025 12:42:00 -0700 Subject: [PATCH 04/22] add a shared compressor test --- test/test_multithreaded_sharing.py | 106 +++++++++++++++++++++++++++++ 1 file changed, 106 insertions(+) create mode 100644 test/test_multithreaded_sharing.py diff --git a/test/test_multithreaded_sharing.py b/test/test_multithreaded_sharing.py new file mode 100644 index 0000000..8cf5126 --- /dev/null +++ b/test/test_multithreaded_sharing.py @@ -0,0 +1,106 @@ +""" Test multithreaded sharing of Compressor and Decompressor instances + +Originally written by Eugene Kliuchnikov at Google for the brotli +Python bindings under an MIT license. + +""" +import queue +import random +import threading +import time + +import brotlicffi + + +def make_input(size): + abc = [bytes([b]) for b in b"abcdefghijklmnopqrstuvwxyz"] + abc_cap = [bytes([b]) for b in b"ABCDEFGHIJKLMNOPQRSTUVWXYZ"] + num_words_by_len = [0, 25, 100, 175, 1700, 1000, 1000, 1000] + word_set = set() + rng = random.Random() + rng.seed(2025) + words_by_len = [[]] + for word_len in range(1, len(num_words_by_len)): + num_words = num_words_by_len[word_len] + words = [] + for _ in range(num_words): + while True: + word = b"".join( + [rng.choice(abc_cap)] + + [rng.choice(abc) for _ in range(word_len - 1)] + ) + if word not in word_set: + word_set.add(word) + words.append(word) + break + words_by_len.append(words) + total_size = 0 + out = [] + while total_size < size: + word_len = rng.choice(range(1, len(num_words_by_len))) + word = rng.choice(words_by_len[word_len]) + total_size += len(word) + out.append(word) + return b"".join(out) + + +def _thread_compress(original, compressor, results): + compressed = compressor.process(original) + compressed += compressor.finish() + results.put(1) + + +def _thread_concurrent_process(compressor, results): + time.sleep(0.01) + try: + _ = compressor.process(b"whatever") + except brotlicffi.error: + results.put(2) + + +def _thread_concurrent_flush(compressor, results): + time.sleep(0.02) + try: + _ = compressor.flush() + except brotlicffi.error: + results.put(3) + + +def _thread_concurrent_finish(compressor, results): + time.sleep(0.03) + try: + _ = compressor.finish() + except brotlicffi.error: + results.put(4) + + +def test_concurrency(): + original = make_input(2 * 1024 * 1024) + compressor = brotlicffi.Compressor(quality=9) + results = queue.Queue() + threads = [] + threads.append( + threading.Thread( + target=_thread_compress, args=(original, compressor, results) + ) + ) + threads.append( + threading.Thread( + target=_thread_concurrent_process, args=(compressor, results) + ) + ) + threads.append( + threading.Thread( + target=_thread_concurrent_flush, args=(compressor, results) + ) + ) + threads.append( + threading.Thread( + target=_thread_concurrent_finish, args=(compressor, results) + ) + ) + for thread in threads: + thread.start() + for thread in threads: + thread.join() + assert sorted(list(results.queue)) == [1, 2, 3, 4] From c336a4ba795bc449114de6c0a6ec7353c4e4cbbf Mon Sep 17 00:00:00 2001 From: Nathan Goldbaum Date: Wed, 26 Nov 2025 13:34:42 -0700 Subject: [PATCH 05/22] prevent multithreaded sharing of Compressor and Decompressor instances --- src/brotlicffi/_api.py | 184 ++++++++++++++++++++++++----------------- 1 file changed, 106 insertions(+), 78 deletions(-) diff --git a/src/brotlicffi/_api.py b/src/brotlicffi/_api.py index b7e34be..8ba5521 100644 --- a/src/brotlicffi/_api.py +++ b/src/brotlicffi/_api.py @@ -1,6 +1,7 @@ # -*- coding: utf-8 -*- import math import enum +import threading from ._brotlicffi import ffi, lib @@ -249,6 +250,7 @@ def __init__(self, quality=lib.BROTLI_DEFAULT_QUALITY, lgwin=lib.BROTLI_DEFAULT_WINDOW, lgblock=0): + self.lock = threading.Lock() enc = lib.BrotliEncoderCreateInstance( ffi.NULL, ffi.NULL, ffi.NULL ) @@ -284,15 +286,21 @@ def _compress(self, data, operation): input_buffer = ffi.new("uint8_t []", data) ptr_to_input_buffer = ffi.new("uint8_t **", input_buffer) - rc = lib.BrotliEncoderCompressStream( - self._encoder, - operation, - input_size, - ptr_to_input_buffer, - available_out, - ptr_to_output_buffer, - ffi.NULL - ) + if not self.lock.acquire(blocking=False): + raise error( + "Concurrently sharing Compressor objects is not allowed") + try: + rc = lib.BrotliEncoderCompressStream( + self._encoder, + operation, + input_size, + ptr_to_input_buffer, + available_out, + ptr_to_output_buffer, + ffi.NULL + ) + finally: + self.lock.release() if rc != lib.BROTLI_TRUE: # pragma: no cover raise error("Error encountered compressing data.") @@ -362,6 +370,7 @@ class Decompressor(object): _unconsumed_data = None def __init__(self, dictionary=b''): + self.lock = threading.Lock() dec = lib.BrotliDecoderCreateInstance(ffi.NULL, ffi.NULL, ffi.NULL) self._decoder = ffi.gc(dec, lib.BrotliDecoderDestroyInstance) self._unconsumed_data = b'' @@ -420,73 +429,78 @@ def decompress(self, data, output_buffer_limit=None): if output_buffer_limit is not None and output_buffer_limit <= 0: return b'' - # Use unconsumed data if available, use new data otherwise. - if self._unconsumed_data: - input_data = self._unconsumed_data - self._unconsumed_data = b'' - else: - input_data = data + if not self.lock.acquire(blocking=False): + raise error( + "Concurrently sharing Decompressor instances is not allowed") + try: + # Use unconsumed data if available, use new data otherwise. + if self._unconsumed_data: + input_data = self._unconsumed_data + self._unconsumed_data = b'' + else: + input_data = data - chunks = [] - chunks_len = 0 - - available_in = ffi.new("size_t *", len(input_data)) - in_buffer = ffi.new("uint8_t[]", input_data) - next_in = ffi.new("uint8_t **", in_buffer) - - while True: - buffer_size = self._calculate_buffer_size( - input_data_len=len(input_data), - output_buffer_limit=output_buffer_limit, - chunks_len=chunks_len, - chunks_num=len(chunks), - ) + chunks = [] + chunks_len = 0 - available_out = ffi.new("size_t *", buffer_size) - out_buffer = ffi.new("uint8_t[]", buffer_size) - next_out = ffi.new("uint8_t **", out_buffer) - - rc = lib.BrotliDecoderDecompressStream(self._decoder, - available_in, - next_in, - available_out, - next_out, - ffi.NULL) - - # First, check for errors. - if rc == lib.BROTLI_DECODER_RESULT_ERROR: - error_code = lib.BrotliDecoderGetErrorCode(self._decoder) - error_message = lib.BrotliDecoderErrorString(error_code) - raise error( - b"Decompression error: %s" % ffi.string(error_message) - ) + available_in = ffi.new("size_t *", len(input_data)) + in_buffer = ffi.new("uint8_t[]", input_data) + next_in = ffi.new("uint8_t **", in_buffer) - # Next, copy the result out. - chunk = ffi.buffer(out_buffer, buffer_size - available_out[0])[:] - chunks.append(chunk) - chunks_len += len(chunk) - - # Save any unconsumed input for the next call. - if available_in[0] > 0: - remaining_input = ffi.buffer(next_in[0], available_in[0])[:] - self._unconsumed_data = remaining_input - - # Check if we've reached the output limit. - if ( - output_buffer_limit is not None - and chunks_len >= output_buffer_limit - ): - break - - if rc == lib.BROTLI_DECODER_RESULT_NEEDS_MORE_INPUT: - assert available_in[0] == 0 - break - elif rc == lib.BROTLI_DECODER_RESULT_SUCCESS: - break - else: - # It's cool if we need more output, we just loop again. - assert rc == lib.BROTLI_DECODER_RESULT_NEEDS_MORE_OUTPUT + while True: + buffer_size = self._calculate_buffer_size( + input_data_len=len(input_data), + output_buffer_limit=output_buffer_limit, + chunks_len=chunks_len, + chunks_num=len(chunks), + ) + available_out = ffi.new("size_t *", buffer_size) + out_buffer = ffi.new("uint8_t[]", buffer_size) + next_out = ffi.new("uint8_t **", out_buffer) + + rc = lib.BrotliDecoderDecompressStream(self._decoder, + available_in, + next_in, + available_out, + next_out, + ffi.NULL) + + # First, check for errors. + if rc == lib.BROTLI_DECODER_RESULT_ERROR: + error_code = lib.BrotliDecoderGetErrorCode(self._decoder) + error_message = lib.BrotliDecoderErrorString(error_code) + raise error( + b"Decompression error: %s" % ffi.string(error_message) + ) + + # Next, copy the result out. + chunk = ffi.buffer(out_buffer, buffer_size - available_out[0])[:] + chunks.append(chunk) + chunks_len += len(chunk) + + # Save any unconsumed input for the next call. + if available_in[0] > 0: + remaining_input = ffi.buffer(next_in[0], available_in[0])[:] + self._unconsumed_data = remaining_input + + # Check if we've reached the output limit. + if ( + output_buffer_limit is not None + and chunks_len >= output_buffer_limit + ): + break + + if rc == lib.BROTLI_DECODER_RESULT_NEEDS_MORE_INPUT: + assert available_in[0] == 0 + break + elif rc == lib.BROTLI_DECODER_RESULT_SUCCESS: + break + else: + # It's cool if we need more output, we just loop again. + assert rc == lib.BROTLI_DECODER_RESULT_NEEDS_MORE_OUTPUT + finally: + self.lock.release() return b''.join(chunks) process = decompress @@ -527,7 +541,14 @@ def is_finished(self): Returns ``True`` if the decompression stream is complete, ``False`` otherwise """ - return lib.BrotliDecoderIsFinished(self._decoder) == lib.BROTLI_TRUE + if not self.lock.acquire(blocking=False): + raise error( + "Concurrently sharing Decompressor instances is not allowed") + try: + return ( + lib.BrotliDecoderIsFinished(self._decoder) == lib.BROTLI_TRUE) + finally: + self.lock.release() def can_accept_more_data(self): """ @@ -550,8 +571,15 @@ def can_accept_more_data(self): more compressed data. :rtype: ``bool`` """ - if len(self._unconsumed_data) > 0: - return False - if lib.BrotliDecoderHasMoreOutput(self._decoder) == lib.BROTLI_TRUE: - return False - return True + if not self.lock.acquire(blocking=False): + raise error( + "Concurrently sharing Decompressor instances is not allowed") + try: + ret = True + if len(self._unconsumed_data) > 0: + ret = False + if lib.BrotliDecoderHasMoreOutput(self._decoder) == lib.BROTLI_TRUE: + ret = False + finally: + self.lock.release() + return ret From 583d633b9814a8f0e3519ab900979406fc58bf9d Mon Sep 17 00:00:00 2001 From: Nathan Goldbaum Date: Wed, 26 Nov 2025 13:34:54 -0700 Subject: [PATCH 06/22] add decompressor test --- test/test_multithreaded_sharing.py | 190 +++++++++++++++++++---------- 1 file changed, 127 insertions(+), 63 deletions(-) diff --git a/test/test_multithreaded_sharing.py b/test/test_multithreaded_sharing.py index 8cf5126..8c7b13f 100644 --- a/test/test_multithreaded_sharing.py +++ b/test/test_multithreaded_sharing.py @@ -12,69 +12,64 @@ import brotlicffi -def make_input(size): - abc = [bytes([b]) for b in b"abcdefghijklmnopqrstuvwxyz"] - abc_cap = [bytes([b]) for b in b"ABCDEFGHIJKLMNOPQRSTUVWXYZ"] - num_words_by_len = [0, 25, 100, 175, 1700, 1000, 1000, 1000] - word_set = set() - rng = random.Random() - rng.seed(2025) - words_by_len = [[]] - for word_len in range(1, len(num_words_by_len)): - num_words = num_words_by_len[word_len] - words = [] - for _ in range(num_words): - while True: - word = b"".join( - [rng.choice(abc_cap)] - + [rng.choice(abc) for _ in range(word_len - 1)] - ) - if word not in word_set: - word_set.add(word) - words.append(word) - break - words_by_len.append(words) - total_size = 0 - out = [] - while total_size < size: - word_len = rng.choice(range(1, len(num_words_by_len))) - word = rng.choice(words_by_len[word_len]) - total_size += len(word) - out.append(word) - return b"".join(out) - - -def _thread_compress(original, compressor, results): - compressed = compressor.process(original) - compressed += compressor.finish() - results.put(1) - - -def _thread_concurrent_process(compressor, results): - time.sleep(0.01) - try: - _ = compressor.process(b"whatever") - except brotlicffi.error: - results.put(2) - - -def _thread_concurrent_flush(compressor, results): - time.sleep(0.02) - try: - _ = compressor.flush() - except brotlicffi.error: - results.put(3) - - -def _thread_concurrent_finish(compressor, results): - time.sleep(0.03) - try: - _ = compressor.finish() - except brotlicffi.error: - results.put(4) - - -def test_concurrency(): +def test_compress_concurrency(): + def make_input(size): + abc = [bytes([b]) for b in b"abcdefghijklmnopqrstuvwxyz"] + abc_cap = [bytes([b]) for b in b"ABCDEFGHIJKLMNOPQRSTUVWXYZ"] + num_words_by_len = [0, 25, 100, 175, 1700, 1000, 1000, 1000] + word_set = set() + rng = random.Random() + rng.seed(0x4d3d3d3) + words_by_len = [[]] + for word_len in range(1, len(num_words_by_len)): + num_words = num_words_by_len[word_len] + words = [] + for _ in range(num_words): + while True: + word = b"".join( + [rng.choice(abc_cap)] + + [rng.choice(abc) for _ in range(word_len - 1)] + ) + if word not in word_set: + word_set.add(word) + words.append(word) + break + words_by_len.append(words) + total_size = 0 + out = [] + while total_size < size: + word_len = rng.choice(range(1, len(num_words_by_len))) + word = rng.choice(words_by_len[word_len]) + total_size += len(word) + out.append(word) + return b"".join(out) + + def _thread_compress(original, compressor, results): + compressed = compressor.process(original) + compressed += compressor.finish() + results.put(1) + + def _thread_concurrent_process(compressor, results): + time.sleep(0.001) + try: + _ = compressor.process(b"whatever") + except brotlicffi.error: + results.put(2) + + def _thread_concurrent_flush(compressor, results): + time.sleep(0.002) + try: + _ = compressor.flush() + except brotlicffi.error: + results.put(3) + + def _thread_concurrent_finish(compressor, results): + time.sleep(0.003) + try: + _ = compressor.finish() + except brotlicffi.error: + results.put(4) + original = make_input(2 * 1024 * 1024) compressor = brotlicffi.Compressor(quality=9) results = queue.Queue() @@ -104,3 +99,72 @@ def test_concurrency(): for thread in threads: thread.join() assert sorted(list(results.queue)) == [1, 2, 3, 4] + + +def test_decompressor_concurrency(): + def make_input(size): + compressor = brotlicffi.Compressor(quality=1) + prologue = compressor.process(b'b') + prologue += compressor.flush() + filler = compressor.process(b'c') + filler += compressor.flush() + epilogue = compressor.finish() + return b''.join( + [prologue] + [filler] * (size // len(filler)) + [epilogue]) + + def _thread_decompress(compressed, decompressor, results): + _ = decompressor.process(compressed) + if decompressor.is_finished(): + results.put(1) + + def _thread_concurrent_process(decompressor, results): + time.sleep(0.001) + try: + _ = decompressor.process(b'') + except brotlicffi.error: + results.put(2) + + def _thread_concurrent_can_accept_more_data(decompressor, results): + time.sleep(0.002) + try: + _ = decompressor.can_accept_more_data() + except brotlicffi.error: + results.put(3) + + def _thread_concurrent_is_finished(decompressor, results): + time.sleep(0.03) + try: + _ = decompressor.is_finished() + except brotlicffi.error: + results.put(4) + + compressed = make_input(16 * 1024 * 1024) + decompressor = brotlicffi.Decompressor() + results = queue.Queue() + threads = [] + threads.append( + threading.Thread( + target=_thread_decompress, args=(compressed, decompressor, results) + ) + ) + threads.append( + threading.Thread( + target=_thread_concurrent_process, args=(decompressor, results) + ) + ) + threads.append( + threading.Thread( + target=_thread_concurrent_can_accept_more_data, + args=(decompressor, results), + ) + ) + threads.append( + threading.Thread( + target=_thread_concurrent_is_finished, args=(decompressor, results) + ) + ) + for thread in threads: + thread.start() + for thread in threads: + thread.join() + assert sorted(list(results.queue)) == [1, 2, 3, 4] From 8f65ef0e3c2fb59532691aee78a47296debc81dc Mon Sep 17 00:00:00 2001 From: Nathan Goldbaum Date: Wed, 26 Nov 2025 13:37:33 -0700 Subject: [PATCH 07/22] use a glob pattern to download wheel artifacts --- .github/workflows/ci.yml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 868c99f..1ceaa1b 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -112,15 +112,15 @@ jobs: path: dist/ - uses: actions/download-artifact@v6 with: - name: wheels-windows-latest + pattern: wheels-windows-latest-* path: dist/ - uses: actions/download-artifact@v6 with: - name: wheels-macos-latest + pattern: wheels-macos-latest-* path: dist/ - uses: actions/download-artifact@v6 with: - name: wheels-ubuntu-latest + pattern: wheels-ubuntu-latest-* path: dist/ - name: Publish to PyPI if: github.event_name == 'push' && startsWith(github.event.ref, 'refs/tags/') From 29a3c0b7eae376a03e726d57900fd7d15a1693eb Mon Sep 17 00:00:00 2001 From: Nathan Goldbaum Date: Wed, 26 Nov 2025 13:46:37 -0700 Subject: [PATCH 08/22] appease linter --- src/brotlicffi/_api.py | 135 ++++++++++--------- test/test_multithreaded_sharing.py | 209 +++++++++++++++-------------- 2 files changed, 181 insertions(+), 163 deletions(-) diff --git a/src/brotlicffi/_api.py b/src/brotlicffi/_api.py index 8ba5521..30ad7fe 100644 --- a/src/brotlicffi/_api.py +++ b/src/brotlicffi/_api.py @@ -433,75 +433,79 @@ def decompress(self, data, output_buffer_limit=None): raise error( "Concurrently sharing Decompressor instances is not allowed") try: - # Use unconsumed data if available, use new data otherwise. - if self._unconsumed_data: - input_data = self._unconsumed_data - self._unconsumed_data = b'' - else: - input_data = data + chunks = self._decompress(data, output_buffer_limit) + finally: + self.lock.release() + return b''.join(chunks) - chunks = [] - chunks_len = 0 + def _decompress(self, data, output_buffer_limit): + # Use unconsumed data if available, use new data otherwise. + if self._unconsumed_data: + input_data = self._unconsumed_data + self._unconsumed_data = b'' + else: + input_data = data - available_in = ffi.new("size_t *", len(input_data)) - in_buffer = ffi.new("uint8_t[]", input_data) - next_in = ffi.new("uint8_t **", in_buffer) + chunks = [] + chunks_len = 0 + + available_in = ffi.new("size_t *", len(input_data)) + in_buffer = ffi.new("uint8_t[]", input_data) + next_in = ffi.new("uint8_t **", in_buffer) + + while True: + buffer_size = self._calculate_buffer_size( + input_data_len=len(input_data), + output_buffer_limit=output_buffer_limit, + chunks_len=chunks_len, + chunks_num=len(chunks), + ) - while True: - buffer_size = self._calculate_buffer_size( - input_data_len=len(input_data), - output_buffer_limit=output_buffer_limit, - chunks_len=chunks_len, - chunks_num=len(chunks), + available_out = ffi.new("size_t *", buffer_size) + out_buffer = ffi.new("uint8_t[]", buffer_size) + next_out = ffi.new("uint8_t **", out_buffer) + + rc = lib.BrotliDecoderDecompressStream(self._decoder, + available_in, + next_in, + available_out, + next_out, + ffi.NULL) + + # First, check for errors. + if rc == lib.BROTLI_DECODER_RESULT_ERROR: + error_code = lib.BrotliDecoderGetErrorCode(self._decoder) + error_message = lib.BrotliDecoderErrorString(error_code) + raise error( + b"Decompression error: %s" % ffi.string(error_message) ) - available_out = ffi.new("size_t *", buffer_size) - out_buffer = ffi.new("uint8_t[]", buffer_size) - next_out = ffi.new("uint8_t **", out_buffer) - - rc = lib.BrotliDecoderDecompressStream(self._decoder, - available_in, - next_in, - available_out, - next_out, - ffi.NULL) - - # First, check for errors. - if rc == lib.BROTLI_DECODER_RESULT_ERROR: - error_code = lib.BrotliDecoderGetErrorCode(self._decoder) - error_message = lib.BrotliDecoderErrorString(error_code) - raise error( - b"Decompression error: %s" % ffi.string(error_message) - ) - - # Next, copy the result out. - chunk = ffi.buffer(out_buffer, buffer_size - available_out[0])[:] - chunks.append(chunk) - chunks_len += len(chunk) - - # Save any unconsumed input for the next call. - if available_in[0] > 0: - remaining_input = ffi.buffer(next_in[0], available_in[0])[:] - self._unconsumed_data = remaining_input - - # Check if we've reached the output limit. - if ( - output_buffer_limit is not None - and chunks_len >= output_buffer_limit - ): - break - - if rc == lib.BROTLI_DECODER_RESULT_NEEDS_MORE_INPUT: - assert available_in[0] == 0 - break - elif rc == lib.BROTLI_DECODER_RESULT_SUCCESS: - break - else: - # It's cool if we need more output, we just loop again. - assert rc == lib.BROTLI_DECODER_RESULT_NEEDS_MORE_OUTPUT - finally: - self.lock.release() - return b''.join(chunks) + # Next, copy the result out. + chunk = ffi.buffer(out_buffer, buffer_size - available_out[0])[:] + chunks.append(chunk) + chunks_len += len(chunk) + + # Save any unconsumed input for the next call. + if available_in[0] > 0: + remaining_input = ffi.buffer(next_in[0], available_in[0])[:] + self._unconsumed_data = remaining_input + + # Check if we've reached the output limit. + if ( + output_buffer_limit is not None + and chunks_len >= output_buffer_limit + ): + break + + if rc == lib.BROTLI_DECODER_RESULT_NEEDS_MORE_INPUT: + assert available_in[0] == 0 + break + elif rc == lib.BROTLI_DECODER_RESULT_SUCCESS: + break + else: + # It's cool if we need more output, we just loop again. + assert rc == lib.BROTLI_DECODER_RESULT_NEEDS_MORE_OUTPUT + return chunks process = decompress @@ -578,7 +582,8 @@ def can_accept_more_data(self): ret = True if len(self._unconsumed_data) > 0: ret = False - if lib.BrotliDecoderHasMoreOutput(self._decoder) == lib.BROTLI_TRUE: + if ((lib.BrotliDecoderHasMoreOutput(self._decoder) == + lib.BROTLI_TRUE)): ret = False finally: self.lock.release() diff --git a/test/test_multithreaded_sharing.py b/test/test_multithreaded_sharing.py index 8c7b13f..966b25b 100644 --- a/test/test_multithreaded_sharing.py +++ b/test/test_multithreaded_sharing.py @@ -12,65 +12,70 @@ import brotlicffi -def test_compress_concurrency(): - def make_input(size): - abc = [bytes([b]) for b in b"abcdefghijklmnopqrstuvwxyz"] - abc_cap = [bytes([b]) for b in b"ABCDEFGHIJKLMNOPQRSTUVWXYZ"] - num_words_by_len = [0, 25, 100, 175, 1700, 1000, 1000, 1000] - word_set = set() - rng = random.Random() - rng.seed(0x4d3d3d3) - words_by_len = [[]] - for word_len in range(1, len(num_words_by_len)): - num_words = num_words_by_len[word_len] - words = [] - for _ in range(num_words): - while True: - word = b"".join( - [rng.choice(abc_cap)] - + [rng.choice(abc) for _ in range(word_len - 1)] - ) - if word not in word_set: - word_set.add(word) - words.append(word) - break - words_by_len.append(words) - total_size = 0 - out = [] - while total_size < size: - word_len = rng.choice(range(1, len(num_words_by_len))) - word = rng.choice(words_by_len[word_len]) - total_size += len(word) - out.append(word) - return b"".join(out) - - def _thread_compress(original, compressor, results): - compressed = compressor.process(original) - compressed += compressor.finish() - results.put(1) +def make_compress_input(size): + abc = [bytes([b]) for b in b"abcdefghijklmnopqrstuvwxyz"] + abc_cap = [bytes([b]) for b in b"ABCDEFGHIJKLMNOPQRSTUVWXYZ"] + num_words_by_len = [0, 25, 100, 175, 1700, 1000, 1000, 1000] + word_set = set() + rng = random.Random() + rng.seed(0x4d3d3d3) + words_by_len = [[]] + for word_len in range(1, len(num_words_by_len)): + num_words = num_words_by_len[word_len] + words = [] + for _ in range(num_words): + while True: + word = b"".join( + [rng.choice(abc_cap)] + + [rng.choice(abc) for _ in range(word_len - 1)] + ) + if word not in word_set: + word_set.add(word) + words.append(word) + break + words_by_len.append(words) + total_size = 0 + out = [] + while total_size < size: + word_len = rng.choice(range(1, len(num_words_by_len))) + word = rng.choice(words_by_len[word_len]) + total_size += len(word) + out.append(word) + return b"".join(out) + + +def _thread_compress(original, compressor, results): + compressed = compressor.process(original) + compressed += compressor.finish() + results.put(1) + + +def _thread_concurrent_process_compress(compressor, results): + time.sleep(0.001) + try: + _ = compressor.process(b"whatever") + except brotlicffi.error: + results.put(2) + + +def _thread_concurrent_flush_compress(compressor, results): + time.sleep(0.002) + try: + _ = compressor.flush() + except brotlicffi.error: + results.put(3) + + +def _thread_concurrent_finish_compress(compressor, results): + time.sleep(0.003) + try: + _ = compressor.finish() + except brotlicffi.error: + results.put(4) - def _thread_concurrent_process(compressor, results): - time.sleep(0.001) - try: - _ = compressor.process(b"whatever") - except brotlicffi.error: - results.put(2) - - def _thread_concurrent_flush(compressor, results): - time.sleep(0.002) - try: - _ = compressor.flush() - except brotlicffi.error: - results.put(3) - - def _thread_concurrent_finish(compressor, results): - time.sleep(0.003) - try: - _ = compressor.finish() - except brotlicffi.error: - results.put(4) - - original = make_input(2 * 1024 * 1024) + +def test_compress_concurrency(): + original = make_compress_input(2 * 1024 * 1024) compressor = brotlicffi.Compressor(quality=9) results = queue.Queue() threads = [] @@ -81,17 +86,20 @@ def _thread_concurrent_finish(compressor, results): ) threads.append( threading.Thread( - target=_thread_concurrent_process, args=(compressor, results) + target=_thread_concurrent_process_compress, + args=(compressor, results) ) ) threads.append( threading.Thread( - target=_thread_concurrent_flush, args=(compressor, results) + target=_thread_concurrent_flush_compress, + args=(compressor, results) ) ) threads.append( threading.Thread( - target=_thread_concurrent_finish, args=(compressor, results) + target=_thread_concurrent_finish_compress, + args=(compressor, results) ) ) for thread in threads: @@ -101,44 +109,49 @@ def _thread_concurrent_finish(compressor, results): assert sorted(list(results.queue)) == [1, 2, 3, 4] +def make_decompress_input(size): + compressor = brotlicffi.Compressor(quality=1) + prologue = compressor.process(b'b') + prologue += compressor.flush() + filler = compressor.process(b'c') + filler += compressor.flush() + epilogue = compressor.finish() + return b''.join( + [prologue] + [filler] * (size // len(filler)) + [epilogue]) + + +def _thread_decompress(compressed, decompressor, results): + _ = decompressor.process(compressed) + if decompressor.is_finished(): + results.put(1) + + +def _thread_concurrent_process(decompressor, results): + time.sleep(0.001) + try: + _ = decompressor.process(b'') + except brotlicffi.error: + results.put(2) + + +def _thread_concurrent_can_accept_more_data(decompressor, results): + time.sleep(0.002) + try: + _ = decompressor.can_accept_more_data() + except brotlicffi.error: + results.put(3) + + +def _thread_concurrent_is_finished(decompressor, results): + time.sleep(0.03) + try: + _ = decompressor.is_finished() + except brotlicffi.error: + results.put(4) + + def test_decompressor_concurrency(): - def make_input(size): - compressor = brotlicffi.Compressor(quality=1) - prologue = compressor.process(b'b') - prologue += compressor.flush() - filler = compressor.process(b'c') - filler += compressor.flush() - epilogue = compressor.finish() - return b''.join( - [prologue] + [filler] * (size // len(filler)) + [epilogue]) - - def _thread_decompress(compressed, decompressor, results): - _ = decompressor.process(compressed) - if decompressor.is_finished(): - results.put(1) - - def _thread_concurrent_process(decompressor, results): - time.sleep(0.001) - try: - _ = decompressor.process(b'') - except brotlicffi.error: - results.put(2) - - def _thread_concurrent_can_accept_more_data(decompressor, results): - time.sleep(0.002) - try: - _ = decompressor.can_accept_more_data() - except brotlicffi.error: - results.put(3) - - def _thread_concurrent_is_finished(decompressor, results): - time.sleep(0.03) - try: - _ = decompressor.is_finished() - except brotlicffi.error: - results.put(4) - - compressed = make_input(16 * 1024 * 1024) + compressed = make_decompress_input(16 * 1024 * 1024) decompressor = brotlicffi.Decompressor() results = queue.Queue() threads = [] From 087c9491d7b98023af3e01e8186eb0cb4e04f93b Mon Sep 17 00:00:00 2001 From: Nathan Goldbaum Date: Wed, 26 Nov 2025 14:04:04 -0700 Subject: [PATCH 09/22] explicitly run a tox environment --- .github/workflows/ci.yml | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 1ceaa1b..5a40564 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -22,7 +22,16 @@ jobs: fail-fast: false matrix: os: [ubuntu-latest, macos-latest, windows-latest] - python-version: ["3.8", "3.9", "3.10", "3.11", "3.12", "3.13", "3.14", "3.14t"] + python-version: [ + ["3.8", "py38"], + ["3.9", "py39"], + ["3.10", "py310"], + ["3.11", "py311"], + ["3.12", "py312"], + ["3.13", "py313"], + ["3.14", "py314"], + ["3.14t", "py314t"], + ] steps: - name: Set git to use LF on Windows if: runner.os == 'Windows' @@ -34,12 +43,12 @@ jobs: submodules: recursive - uses: actions/setup-python@v6 with: - python-version: ${{ matrix.python-version }} + python-version: ${{ matrix.python-version[0] }} allow-prereleases: true - name: Run tests run: | python -m pip install tox - tox --skip-missing-interpreters + tox -e ${{ matrix.python-version[1] }} package-sdist: runs-on: ubuntu-latest From 1c250a2d3df08f45e3a9f46fcb05fcb86d4d3286 Mon Sep 17 00:00:00 2001 From: Nathan Goldbaum Date: Wed, 26 Nov 2025 14:12:23 -0700 Subject: [PATCH 10/22] adjust test timing --- test/test_multithreaded_sharing.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/test/test_multithreaded_sharing.py b/test/test_multithreaded_sharing.py index 966b25b..c8576c1 100644 --- a/test/test_multithreaded_sharing.py +++ b/test/test_multithreaded_sharing.py @@ -51,7 +51,7 @@ def _thread_compress(original, compressor, results): def _thread_concurrent_process_compress(compressor, results): - time.sleep(0.001) + time.sleep(0.01) try: _ = compressor.process(b"whatever") except brotlicffi.error: @@ -59,7 +59,7 @@ def _thread_concurrent_process_compress(compressor, results): def _thread_concurrent_flush_compress(compressor, results): - time.sleep(0.002) + time.sleep(0.02) try: _ = compressor.flush() except brotlicffi.error: @@ -67,7 +67,7 @@ def _thread_concurrent_flush_compress(compressor, results): def _thread_concurrent_finish_compress(compressor, results): - time.sleep(0.003) + time.sleep(0.03) try: _ = compressor.finish() except brotlicffi.error: @@ -127,7 +127,7 @@ def _thread_decompress(compressed, decompressor, results): def _thread_concurrent_process(decompressor, results): - time.sleep(0.001) + time.sleep(0.01) try: _ = decompressor.process(b'') except brotlicffi.error: @@ -135,7 +135,7 @@ def _thread_concurrent_process(decompressor, results): def _thread_concurrent_can_accept_more_data(decompressor, results): - time.sleep(0.002) + time.sleep(0.02) try: _ = decompressor.can_accept_more_data() except brotlicffi.error: From d73408bd98d689145393b63a658daac0350a8cdd Mon Sep 17 00:00:00 2001 From: Nathan Goldbaum Date: Wed, 26 Nov 2025 14:17:46 -0700 Subject: [PATCH 11/22] build pypy wheels in parallel with abi3 wheels --- .github/workflows/ci.yml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 5a40564..36ba049 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -75,7 +75,8 @@ jobs: matrix: os: [ubuntu-latest, macos-latest, windows-latest] python-version: [ - ["3.x", "cp38-* pp*-*"], + ["3.x", "cp38-*"], + ["3.x", "pp*-*"], ["3.14t", "cp314t-*"], ] From 5cd5e970576bb1fc33f15be2d9fbeb16ee4cc4c2 Mon Sep 17 00:00:00 2001 From: Nathan Goldbaum Date: Wed, 26 Nov 2025 14:26:48 -0700 Subject: [PATCH 12/22] fix upload artifact name --- .github/workflows/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 36ba049..e62dbc7 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -108,7 +108,7 @@ jobs: CIBW_BEFORE_BUILD_LINUX: yum install -y libffi-devel - uses: actions/upload-artifact@v4 with: - name: wheels-${{ matrix.os }}-${{ matrix.python-version[0] }} + name: wheels-${{ matrix.os }}-${{ matrix.python-version[0] }}-${{ matrix.python-version[1] }} path: ./wheelhouse/*.whl publish: From 3c5337b642e5d8e1a8b467d3bcb411876c6644d5 Mon Sep 17 00:00:00 2001 From: Nathan Goldbaum Date: Wed, 26 Nov 2025 14:30:34 -0700 Subject: [PATCH 13/22] tweak upload artifact fix --- .github/workflows/ci.yml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index e62dbc7..9f3f33a 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -75,9 +75,9 @@ jobs: matrix: os: [ubuntu-latest, macos-latest, windows-latest] python-version: [ - ["3.x", "cp38-*"], - ["3.x", "pp*-*"], - ["3.14t", "cp314t-*"], + ["3.x", "cp38"], + ["3.x", "pp311"], + ["3.14t", "cp314t"], ] steps: @@ -101,7 +101,7 @@ jobs: - name: Build wheels run: python -m cibuildwheel --output-dir wheelhouse env: - CIBW_BUILD: ${{ matrix.python-version[1] }} + CIBW_BUILD: ${{ matrix.python-version[1] }}-* CIBW_SKIP: "*musllinux*" CIBW_ENABLE: pypy CIBW_ARCHS_LINUX: auto aarch64 From 3e768bae3c8a24f829200c382d563dbb5f22492c Mon Sep 17 00:00:00 2001 From: Nathan Goldbaum Date: Wed, 26 Nov 2025 14:34:26 -0700 Subject: [PATCH 14/22] simplify matrix --- .github/workflows/ci.yml | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 9f3f33a..1897261 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -74,11 +74,6 @@ jobs: strategy: matrix: os: [ubuntu-latest, macos-latest, windows-latest] - python-version: [ - ["3.x", "cp38"], - ["3.x", "pp311"], - ["3.14t", "cp314t"], - ] steps: - uses: actions/checkout@v5 @@ -86,7 +81,7 @@ jobs: submodules: recursive - uses: actions/setup-python@v6 with: - python-version: ${{ matrix.python-version[0] }} + python-version: 3.x - name: Install cibuildwheel run: python -m pip install cibuildwheel # - name: Install Visual C++ for Python 2.7 @@ -101,14 +96,14 @@ jobs: - name: Build wheels run: python -m cibuildwheel --output-dir wheelhouse env: - CIBW_BUILD: ${{ matrix.python-version[1] }}-* + CIBW_BUILD: "cp38-* pp*-* cp314t-*" CIBW_SKIP: "*musllinux*" CIBW_ENABLE: pypy CIBW_ARCHS_LINUX: auto aarch64 CIBW_BEFORE_BUILD_LINUX: yum install -y libffi-devel - uses: actions/upload-artifact@v4 with: - name: wheels-${{ matrix.os }}-${{ matrix.python-version[0] }}-${{ matrix.python-version[1] }} + name: wheels-${{ matrix.os }}-${{ matrix.wheel-tag }} path: ./wheelhouse/*.whl publish: From dd6c59c66ba1e4ae08ccd15a7641817709bb163b Mon Sep 17 00:00:00 2001 From: Nathan Goldbaum Date: Wed, 26 Nov 2025 14:44:38 -0700 Subject: [PATCH 15/22] don't depend on timing --- test/test_multithreaded_sharing.py | 21 ++++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/test/test_multithreaded_sharing.py b/test/test_multithreaded_sharing.py index c8576c1..edfe16d 100644 --- a/test/test_multithreaded_sharing.py +++ b/test/test_multithreaded_sharing.py @@ -7,7 +7,6 @@ import queue import random import threading -import time import brotlicffi @@ -44,14 +43,18 @@ def make_compress_input(size): return b"".join(out) +compress_started = threading.Event() + + def _thread_compress(original, compressor, results): + compress_started.set() compressed = compressor.process(original) compressed += compressor.finish() results.put(1) def _thread_concurrent_process_compress(compressor, results): - time.sleep(0.01) + compress_started.wait() try: _ = compressor.process(b"whatever") except brotlicffi.error: @@ -59,7 +62,7 @@ def _thread_concurrent_process_compress(compressor, results): def _thread_concurrent_flush_compress(compressor, results): - time.sleep(0.02) + compress_started.wait() try: _ = compressor.flush() except brotlicffi.error: @@ -67,7 +70,7 @@ def _thread_concurrent_flush_compress(compressor, results): def _thread_concurrent_finish_compress(compressor, results): - time.sleep(0.03) + compress_started.wait() try: _ = compressor.finish() except brotlicffi.error: @@ -120,14 +123,18 @@ def make_decompress_input(size): [prologue] + [filler] * (size // len(filler)) + [epilogue]) +decompress_started = threading.Event() + + def _thread_decompress(compressed, decompressor, results): + decompress_started.set() _ = decompressor.process(compressed) if decompressor.is_finished(): results.put(1) def _thread_concurrent_process(decompressor, results): - time.sleep(0.01) + decompress_started.wait() try: _ = decompressor.process(b'') except brotlicffi.error: @@ -135,7 +142,7 @@ def _thread_concurrent_process(decompressor, results): def _thread_concurrent_can_accept_more_data(decompressor, results): - time.sleep(0.02) + decompress_started.wait() try: _ = decompressor.can_accept_more_data() except brotlicffi.error: @@ -143,7 +150,7 @@ def _thread_concurrent_can_accept_more_data(decompressor, results): def _thread_concurrent_is_finished(decompressor, results): - time.sleep(0.03) + decompress_started.wait() try: _ = decompressor.is_finished() except brotlicffi.error: From bf935bef502ab97cff2f2429ba86c6de038422b1 Mon Sep 17 00:00:00 2001 From: Nathan Goldbaum Date: Wed, 26 Nov 2025 14:52:19 -0700 Subject: [PATCH 16/22] adjust Compressor locking --- src/brotlicffi/_api.py | 56 +++++++++++++++++++++++++----------------- 1 file changed, 34 insertions(+), 22 deletions(-) diff --git a/src/brotlicffi/_api.py b/src/brotlicffi/_api.py index 30ad7fe..5c7d995 100644 --- a/src/brotlicffi/_api.py +++ b/src/brotlicffi/_api.py @@ -250,7 +250,7 @@ def __init__(self, quality=lib.BROTLI_DEFAULT_QUALITY, lgwin=lib.BROTLI_DEFAULT_WINDOW, lgblock=0): - self.lock = threading.Lock() + self.lock = threading.RLock() enc = lib.BrotliEncoderCreateInstance( ffi.NULL, ffi.NULL, ffi.NULL ) @@ -273,23 +273,23 @@ def _compress(self, data, operation): because almost all of the code uses the exact same setup. It wouldn't have to, but it doesn't hurt at all. """ - # The 'algorithm' for working out how big to make this buffer is from - # the Brotli source code, brotlimodule.cc. - original_output_size = int( - math.ceil(len(data) + (len(data) >> 2) + 10240) - ) - available_out = ffi.new("size_t *") - available_out[0] = original_output_size - output_buffer = ffi.new("uint8_t []", available_out[0]) - ptr_to_output_buffer = ffi.new("uint8_t **", output_buffer) - input_size = ffi.new("size_t *", len(data)) - input_buffer = ffi.new("uint8_t []", data) - ptr_to_input_buffer = ffi.new("uint8_t **", input_buffer) - if not self.lock.acquire(blocking=False): raise error( "Concurrently sharing Compressor objects is not allowed") try: + # The 'algorithm' for working out how big to make this buffer is + # from the Brotli source code, brotlimodule.cc. + original_output_size = int( + math.ceil(len(data) + (len(data) >> 2) + 10240) + ) + available_out = ffi.new("size_t *") + available_out[0] = original_output_size + output_buffer = ffi.new("uint8_t []", available_out[0]) + ptr_to_output_buffer = ffi.new("uint8_t **", output_buffer) + input_size = ffi.new("size_t *", len(data)) + input_buffer = ffi.new("uint8_t []", data) + ptr_to_input_buffer = ffi.new("uint8_t **", input_buffer) + rc = lib.BrotliEncoderCompressStream( self._encoder, operation, @@ -328,11 +328,17 @@ def flush(self): will not destroy the compressor. It can be used, for example, to ensure that given chunks of content will decompress immediately. """ - chunks = [self._compress(b'', lib.BROTLI_OPERATION_FLUSH)] - - while lib.BrotliEncoderHasMoreOutput(self._encoder) == lib.BROTLI_TRUE: - chunks.append(self._compress(b'', lib.BROTLI_OPERATION_FLUSH)) + if not self.lock.acquire(blocking=False): + raise error( + "Concurrently sharing Compressor objects is not allowed") + try: + chunks = [self._compress(b'', lib.BROTLI_OPERATION_FLUSH)] + while ((lib.BrotliEncoderHasMoreOutput(self._encoder) == + lib.BROTLI_TRUE)): + chunks.append(self._compress(b'', lib.BROTLI_OPERATION_FLUSH)) + finally: + self.lock.release() return b''.join(chunks) def finish(self): @@ -341,10 +347,16 @@ def finish(self): transition the compressor to a completed state. The compressor cannot be used again after this point, and must be replaced. """ - chunks = [] - while lib.BrotliEncoderIsFinished(self._encoder) == lib.BROTLI_FALSE: - chunks.append(self._compress(b'', lib.BROTLI_OPERATION_FINISH)) - + if not self.lock.acquire(blocking=False): + raise error( + "Concurrently sharing Compressor objects is not allowed") + try: + chunks = [] + while ((lib.BrotliEncoderIsFinished(self._encoder) == + lib.BROTLI_FALSE)): + chunks.append(self._compress(b'', lib.BROTLI_OPERATION_FINISH)) + finally: + self.lock.release() return b''.join(chunks) From afb4700359e2c7197e8d7414fc4b66de8e21657a Mon Sep 17 00:00:00 2001 From: Nathan Goldbaum Date: Wed, 26 Nov 2025 14:57:51 -0700 Subject: [PATCH 17/22] adjust locking once more --- src/brotlicffi/_api.py | 21 ++++++++++----------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/src/brotlicffi/_api.py b/src/brotlicffi/_api.py index 5c7d995..607ceb7 100644 --- a/src/brotlicffi/_api.py +++ b/src/brotlicffi/_api.py @@ -430,17 +430,6 @@ def decompress(self, data, output_buffer_limit=None): :type output_buffer_limit: ``int`` or ``None`` :returns: A bytestring containing the decompressed data. """ - if self._unconsumed_data and data: - raise error( - "brotli: decoder process called with data when " - "'can_accept_more_data()' is False" - ) - - # We should avoid operations on the `self._unconsumed_data` if no data - # is to be processed. - if output_buffer_limit is not None and output_buffer_limit <= 0: - return b'' - if not self.lock.acquire(blocking=False): raise error( "Concurrently sharing Decompressor instances is not allowed") @@ -451,6 +440,16 @@ def decompress(self, data, output_buffer_limit=None): return b''.join(chunks) def _decompress(self, data, output_buffer_limit): + if self._unconsumed_data and data: + raise error( + "brotli: decoder process called with data when " + "'can_accept_more_data()' is False" + ) + + # We should avoid operations on the `self._unconsumed_data` if no data + # is to be processed. + if output_buffer_limit is not None and output_buffer_limit <= 0: + return b'' # Use unconsumed data if available, use new data otherwise. if self._unconsumed_data: input_data = self._unconsumed_data From d3c56640d79c85f4de27c57745e14c65732d48d4 Mon Sep 17 00:00:00 2001 From: Nathan Goldbaum Date: Wed, 26 Nov 2025 15:06:54 -0700 Subject: [PATCH 18/22] adjust timing again --- test/test_multithreaded_sharing.py | 67 +++++++++++++++++------------- 1 file changed, 37 insertions(+), 30 deletions(-) diff --git a/test/test_multithreaded_sharing.py b/test/test_multithreaded_sharing.py index edfe16d..aa05151 100644 --- a/test/test_multithreaded_sharing.py +++ b/test/test_multithreaded_sharing.py @@ -7,6 +7,7 @@ import queue import random import threading +import time import brotlicffi @@ -43,34 +44,34 @@ def make_compress_input(size): return b"".join(out) -compress_started = threading.Event() - - -def _thread_compress(original, compressor, results): - compress_started.set() +def _thread_compress(original, compressor, barrier, results): + barrier.wait() compressed = compressor.process(original) compressed += compressor.finish() results.put(1) -def _thread_concurrent_process_compress(compressor, results): - compress_started.wait() +def _thread_concurrent_process_compress(compressor, barrier, results): + barrier.wait() + time.sleep(.001) try: _ = compressor.process(b"whatever") except brotlicffi.error: results.put(2) -def _thread_concurrent_flush_compress(compressor, results): - compress_started.wait() +def _thread_concurrent_flush_compress(compressor, barrier, results): + barrier.wait() + time.sleep(.001) try: _ = compressor.flush() except brotlicffi.error: results.put(3) -def _thread_concurrent_finish_compress(compressor, results): - compress_started.wait() +def _thread_concurrent_finish_compress(compressor, barrier, results): + barrier.wait() + time.sleep(.001) try: _ = compressor.finish() except brotlicffi.error: @@ -81,28 +82,30 @@ def test_compress_concurrency(): original = make_compress_input(2 * 1024 * 1024) compressor = brotlicffi.Compressor(quality=9) results = queue.Queue() + barrier = threading.Barrier(4) threads = [] threads.append( threading.Thread( - target=_thread_compress, args=(original, compressor, results) + target=_thread_compress, + args=(original, compressor, barrier, results) ) ) threads.append( threading.Thread( target=_thread_concurrent_process_compress, - args=(compressor, results) + args=(compressor, barrier, results) ) ) threads.append( threading.Thread( target=_thread_concurrent_flush_compress, - args=(compressor, results) + args=(compressor, barrier, results) ) ) threads.append( threading.Thread( target=_thread_concurrent_finish_compress, - args=(compressor, results) + args=(compressor, barrier, results) ) ) for thread in threads: @@ -123,34 +126,34 @@ def make_decompress_input(size): [prologue] + [filler] * (size // len(filler)) + [epilogue]) -decompress_started = threading.Event() - - -def _thread_decompress(compressed, decompressor, results): - decompress_started.set() +def _thread_decompress(compressed, decompressor, barrier, results): + barrier.wait() _ = decompressor.process(compressed) if decompressor.is_finished(): results.put(1) -def _thread_concurrent_process(decompressor, results): - decompress_started.wait() +def _thread_concurrent_process(decompressor, barrier, results): + barrier.wait() + time.sleep(.001) try: _ = decompressor.process(b'') except brotlicffi.error: results.put(2) -def _thread_concurrent_can_accept_more_data(decompressor, results): - decompress_started.wait() +def _thread_concurrent_can_accept_more_data(decompressor, barrier, results): + barrier.wait() + time.sleep(.001) try: _ = decompressor.can_accept_more_data() except brotlicffi.error: results.put(3) -def _thread_concurrent_is_finished(decompressor, results): - decompress_started.wait() +def _thread_concurrent_is_finished(decompressor, barrier, results): + barrier.wait() + time.sleep(.001) try: _ = decompressor.is_finished() except brotlicffi.error: @@ -161,26 +164,30 @@ def test_decompressor_concurrency(): compressed = make_decompress_input(16 * 1024 * 1024) decompressor = brotlicffi.Decompressor() results = queue.Queue() + barrier = threading.Barrier(4) threads = [] threads.append( threading.Thread( - target=_thread_decompress, args=(compressed, decompressor, results) + target=_thread_decompress, + args=(compressed, decompressor, barrier, results) ) ) threads.append( threading.Thread( - target=_thread_concurrent_process, args=(decompressor, results) + target=_thread_concurrent_process, + args=(decompressor, barrier, results) ) ) threads.append( threading.Thread( target=_thread_concurrent_can_accept_more_data, - args=(decompressor, results), + args=(decompressor, barrier, results), ) ) threads.append( threading.Thread( - target=_thread_concurrent_is_finished, args=(decompressor, results) + target=_thread_concurrent_is_finished, + args=(decompressor, barrier, results) ) ) for thread in threads: From 6b2d9c7ee3ed1a8bf3a7ad96c3349d1e095321a6 Mon Sep 17 00:00:00 2001 From: Nathan Goldbaum Date: Wed, 26 Nov 2025 15:31:39 -0700 Subject: [PATCH 19/22] add trove classifier --- setup.py | 1 + 1 file changed, 1 insertion(+) diff --git a/setup.py b/setup.py index f46e453..73b2ef8 100644 --- a/setup.py +++ b/setup.py @@ -125,5 +125,6 @@ def finalize_options(self): "Programming Language :: Python :: 3.12", "Programming Language :: Python :: 3.13", "Programming Language :: Python :: 3.14", + "Programming Language :: Python :: Free Threading :: 2 - Beta", ] ) From 40b590fd369d363ce78a6fe46a8cfd159e71f533 Mon Sep 17 00:00:00 2001 From: Nathan Goldbaum Date: Wed, 26 Nov 2025 15:39:38 -0700 Subject: [PATCH 20/22] small tweaks based on reviewing the PR diff --- src/brotlicffi/_api.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/brotlicffi/_api.py b/src/brotlicffi/_api.py index 607ceb7..9670d73 100644 --- a/src/brotlicffi/_api.py +++ b/src/brotlicffi/_api.py @@ -503,8 +503,8 @@ def _decompress(self, data, output_buffer_limit): # Check if we've reached the output limit. if ( - output_buffer_limit is not None - and chunks_len >= output_buffer_limit + output_buffer_limit is not None + and chunks_len >= output_buffer_limit ): break @@ -560,10 +560,11 @@ def is_finished(self): raise error( "Concurrently sharing Decompressor instances is not allowed") try: - return ( + ret = ( lib.BrotliDecoderIsFinished(self._decoder) == lib.BROTLI_TRUE) finally: self.lock.release() + return ret def can_accept_more_data(self): """ From e78fb2b1d81b54663e7b9c849a6de3aade72c93f Mon Sep 17 00:00:00 2001 From: Nathan Goldbaum Date: Wed, 26 Nov 2025 15:42:09 -0700 Subject: [PATCH 21/22] revert whitespace-only change --- src/brotlicffi/_api.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/brotlicffi/_api.py b/src/brotlicffi/_api.py index 9670d73..e6601f4 100644 --- a/src/brotlicffi/_api.py +++ b/src/brotlicffi/_api.py @@ -450,6 +450,7 @@ def _decompress(self, data, output_buffer_limit): # is to be processed. if output_buffer_limit is not None and output_buffer_limit <= 0: return b'' + # Use unconsumed data if available, use new data otherwise. if self._unconsumed_data: input_data = self._unconsumed_data From 7bd490bc53337e0080a12cf4184a28351bedfeaa Mon Sep 17 00:00:00 2001 From: Nathan Goldbaum Date: Wed, 26 Nov 2025 15:47:07 -0700 Subject: [PATCH 22/22] adjust timing again --- test/test_multithreaded_sharing.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/test/test_multithreaded_sharing.py b/test/test_multithreaded_sharing.py index aa05151..a355c5c 100644 --- a/test/test_multithreaded_sharing.py +++ b/test/test_multithreaded_sharing.py @@ -53,7 +53,7 @@ def _thread_compress(original, compressor, barrier, results): def _thread_concurrent_process_compress(compressor, barrier, results): barrier.wait() - time.sleep(.001) + time.sleep(.01) try: _ = compressor.process(b"whatever") except brotlicffi.error: @@ -62,7 +62,7 @@ def _thread_concurrent_process_compress(compressor, barrier, results): def _thread_concurrent_flush_compress(compressor, barrier, results): barrier.wait() - time.sleep(.001) + time.sleep(.01) try: _ = compressor.flush() except brotlicffi.error: @@ -71,7 +71,7 @@ def _thread_concurrent_flush_compress(compressor, barrier, results): def _thread_concurrent_finish_compress(compressor, barrier, results): barrier.wait() - time.sleep(.001) + time.sleep(.01) try: _ = compressor.finish() except brotlicffi.error: