diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index c3e26a2..1897261 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -22,7 +22,16 @@ jobs:
       fail-fast: false
       matrix:
         os: [ubuntu-latest, macos-latest, windows-latest]
-        python-version: ["3.8", "3.9", "3.10", "3.11", "3.12", "3.13", "3.14"]
+        python-version: [
+          ["3.8", "py38"],
+          ["3.9", "py39"],
+          ["3.10", "py310"],
+          ["3.11", "py311"],
+          ["3.12", "py312"],
+          ["3.13", "py313"],
+          ["3.14", "py314"],
+          ["3.14t", "py314t"],
+        ]
     steps:
     - name: Set git to use LF on Windows
       if: runner.os == 'Windows'
@@ -34,12 +43,12 @@ jobs:
         submodules: recursive
     - uses: actions/setup-python@v6
       with:
-        python-version: ${{ matrix.python-version }}
+        python-version: ${{ matrix.python-version[0] }}
         allow-prereleases: true
     - name: Run tests
       run: |
        python -m pip install tox
-        tox --skip-missing-interpreters
+        tox -e ${{ matrix.python-version[1] }}

  package-sdist:
    runs-on: ubuntu-latest
@@ -80,21 +89,21 @@ jobs:
    #   run: choco install vcpython27 -f -y
     - name: Install QEMU
       if: runner.os == 'Linux'
-      uses: docker/setup-qemu-action@v1
+      uses: docker/setup-qemu-action@v3
       with:
         platforms: all
     - name: Build wheels
       run: python -m cibuildwheel --output-dir wheelhouse
       env:
-        CIBW_BUILD: cp38-* pp*-*
+        CIBW_BUILD: "cp38-* pp*-* cp314t-*"
         CIBW_SKIP: "*musllinux*"
-        CIBW_ENABLE: pypy
+        CIBW_ENABLE: pypy cpython-freethreading
         CIBW_ARCHS_LINUX: auto aarch64
         CIBW_BEFORE_BUILD_LINUX: yum install -y libffi-devel
     - uses: actions/upload-artifact@v4
       with:
-        name: wheels-${{ matrix.os }}
+        name: wheels-${{ matrix.os }}-${{ matrix.wheel-tag }}  # NOTE(review): assumes the package-wheels matrix defines wheel-tag — no hunk here adds it; confirm
         path: ./wheelhouse/*.whl

  publish:
@@ -108,15 +117,15 @@ jobs:
         path: dist/
     - uses: actions/download-artifact@v6
       with:
-        name: wheels-windows-latest
+        pattern: wheels-windows-latest-*
         path: dist/
     - uses: actions/download-artifact@v6
       with:
-        name: wheels-macos-latest
+        pattern: wheels-macos-latest-*
         path: dist/
     - uses: actions/download-artifact@v6
       with:
-        name: wheels-ubuntu-latest
+        pattern: wheels-ubuntu-latest-*
         path: dist/
     - name: 
Publish to PyPI if: github.event_name == 'push' && startsWith(github.event.ref, 'refs/tags/') diff --git a/setup.py b/setup.py index e7176fe..73b2ef8 100644 --- a/setup.py +++ b/setup.py @@ -3,6 +3,7 @@ import re import platform import sys +import sysconfig from setuptools import find_packages, setup from setuptools.command.build_ext import build_ext @@ -74,11 +75,13 @@ def run(self): except ImportError: pass else: - class BDistWheel(wheel.bdist_wheel.bdist_wheel): - def finalize_options(self): - self.py_limited_api = "cp3{}".format(sys.version_info[1]) - wheel.bdist_wheel.bdist_wheel.finalize_options(self) - cmdclass['bdist_wheel'] = BDistWheel + # the limited API is only supported on GIL builds as of Python 3.14 + if not bool(sysconfig.get_config_var("Py_GIL_DISABLED")): + class BDistWheel(wheel.bdist_wheel.bdist_wheel): + def finalize_options(self): + self.py_limited_api = "cp3{}".format(sys.version_info[1]) + wheel.bdist_wheel.bdist_wheel.finalize_options(self) + cmdclass['bdist_wheel'] = BDistWheel setup( name="brotlicffi", @@ -122,5 +125,6 @@ def finalize_options(self): "Programming Language :: Python :: 3.12", "Programming Language :: Python :: 3.13", "Programming Language :: Python :: 3.14", + "Programming Language :: Python :: Free Threading :: 2 - Beta", ] ) diff --git a/src/brotlicffi/_api.py b/src/brotlicffi/_api.py index b7e34be..e6601f4 100644 --- a/src/brotlicffi/_api.py +++ b/src/brotlicffi/_api.py @@ -1,6 +1,7 @@ # -*- coding: utf-8 -*- import math import enum +import threading from ._brotlicffi import ffi, lib @@ -249,6 +250,7 @@ def __init__(self, quality=lib.BROTLI_DEFAULT_QUALITY, lgwin=lib.BROTLI_DEFAULT_WINDOW, lgblock=0): + self.lock = threading.RLock() enc = lib.BrotliEncoderCreateInstance( ffi.NULL, ffi.NULL, ffi.NULL ) @@ -271,28 +273,34 @@ def _compress(self, data, operation): because almost all of the code uses the exact same setup. It wouldn't have to, but it doesn't hurt at all. 
""" - # The 'algorithm' for working out how big to make this buffer is from - # the Brotli source code, brotlimodule.cc. - original_output_size = int( - math.ceil(len(data) + (len(data) >> 2) + 10240) - ) - available_out = ffi.new("size_t *") - available_out[0] = original_output_size - output_buffer = ffi.new("uint8_t []", available_out[0]) - ptr_to_output_buffer = ffi.new("uint8_t **", output_buffer) - input_size = ffi.new("size_t *", len(data)) - input_buffer = ffi.new("uint8_t []", data) - ptr_to_input_buffer = ffi.new("uint8_t **", input_buffer) - - rc = lib.BrotliEncoderCompressStream( - self._encoder, - operation, - input_size, - ptr_to_input_buffer, - available_out, - ptr_to_output_buffer, - ffi.NULL - ) + if not self.lock.acquire(blocking=False): + raise error( + "Concurrently sharing Compressor objects is not allowed") + try: + # The 'algorithm' for working out how big to make this buffer is + # from the Brotli source code, brotlimodule.cc. + original_output_size = int( + math.ceil(len(data) + (len(data) >> 2) + 10240) + ) + available_out = ffi.new("size_t *") + available_out[0] = original_output_size + output_buffer = ffi.new("uint8_t []", available_out[0]) + ptr_to_output_buffer = ffi.new("uint8_t **", output_buffer) + input_size = ffi.new("size_t *", len(data)) + input_buffer = ffi.new("uint8_t []", data) + ptr_to_input_buffer = ffi.new("uint8_t **", input_buffer) + + rc = lib.BrotliEncoderCompressStream( + self._encoder, + operation, + input_size, + ptr_to_input_buffer, + available_out, + ptr_to_output_buffer, + ffi.NULL + ) + finally: + self.lock.release() if rc != lib.BROTLI_TRUE: # pragma: no cover raise error("Error encountered compressing data.") @@ -320,11 +328,17 @@ def flush(self): will not destroy the compressor. It can be used, for example, to ensure that given chunks of content will decompress immediately. 
""" - chunks = [self._compress(b'', lib.BROTLI_OPERATION_FLUSH)] - - while lib.BrotliEncoderHasMoreOutput(self._encoder) == lib.BROTLI_TRUE: - chunks.append(self._compress(b'', lib.BROTLI_OPERATION_FLUSH)) - + if not self.lock.acquire(blocking=False): + raise error( + "Concurrently sharing Compressor objects is not allowed") + try: + chunks = [self._compress(b'', lib.BROTLI_OPERATION_FLUSH)] + + while ((lib.BrotliEncoderHasMoreOutput(self._encoder) == + lib.BROTLI_TRUE)): + chunks.append(self._compress(b'', lib.BROTLI_OPERATION_FLUSH)) + finally: + self.lock.release() return b''.join(chunks) def finish(self): @@ -333,10 +347,16 @@ def finish(self): transition the compressor to a completed state. The compressor cannot be used again after this point, and must be replaced. """ - chunks = [] - while lib.BrotliEncoderIsFinished(self._encoder) == lib.BROTLI_FALSE: - chunks.append(self._compress(b'', lib.BROTLI_OPERATION_FINISH)) - + if not self.lock.acquire(blocking=False): + raise error( + "Concurrently sharing Compressor objects is not allowed") + try: + chunks = [] + while ((lib.BrotliEncoderIsFinished(self._encoder) == + lib.BROTLI_FALSE)): + chunks.append(self._compress(b'', lib.BROTLI_OPERATION_FINISH)) + finally: + self.lock.release() return b''.join(chunks) @@ -362,6 +382,7 @@ class Decompressor(object): _unconsumed_data = None def __init__(self, dictionary=b''): + self.lock = threading.Lock() dec = lib.BrotliDecoderCreateInstance(ffi.NULL, ffi.NULL, ffi.NULL) self._decoder = ffi.gc(dec, lib.BrotliDecoderDestroyInstance) self._unconsumed_data = b'' @@ -409,6 +430,16 @@ def decompress(self, data, output_buffer_limit=None): :type output_buffer_limit: ``int`` or ``None`` :returns: A bytestring containing the decompressed data. 
""" + if not self.lock.acquire(blocking=False): + raise error( + "Concurrently sharing Decompressor instances is not allowed") + try: + chunks = self._decompress(data, output_buffer_limit) + finally: + self.lock.release() + return b''.join(chunks) + + def _decompress(self, data, output_buffer_limit): if self._unconsumed_data and data: raise error( "brotli: decoder process called with data when " @@ -486,8 +517,7 @@ def decompress(self, data, output_buffer_limit=None): else: # It's cool if we need more output, we just loop again. assert rc == lib.BROTLI_DECODER_RESULT_NEEDS_MORE_OUTPUT - - return b''.join(chunks) + return chunks process = decompress @@ -527,7 +557,15 @@ def is_finished(self): Returns ``True`` if the decompression stream is complete, ``False`` otherwise """ - return lib.BrotliDecoderIsFinished(self._decoder) == lib.BROTLI_TRUE + if not self.lock.acquire(blocking=False): + raise error( + "Concurrently sharing Decompressor instances is not allowed") + try: + ret = ( + lib.BrotliDecoderIsFinished(self._decoder) == lib.BROTLI_TRUE) + finally: + self.lock.release() + return ret def can_accept_more_data(self): """ @@ -550,8 +588,16 @@ def can_accept_more_data(self): more compressed data. 
:rtype: ``bool`` """ - if len(self._unconsumed_data) > 0: - return False - if lib.BrotliDecoderHasMoreOutput(self._decoder) == lib.BROTLI_TRUE: - return False - return True + if not self.lock.acquire(blocking=False): + raise error( + "Concurrently sharing Decompressor instances is not allowed") + try: + ret = True + if len(self._unconsumed_data) > 0: + ret = False + if ((lib.BrotliDecoderHasMoreOutput(self._decoder) == + lib.BROTLI_TRUE)): + ret = False + finally: + self.lock.release() + return ret diff --git a/test/test_multithreaded_sharing.py b/test/test_multithreaded_sharing.py new file mode 100644 index 0000000..a355c5c --- /dev/null +++ b/test/test_multithreaded_sharing.py @@ -0,0 +1,197 @@ +""" Test multithreaded sharing of Compressor and Decompressor instances + +Originally written by Eugene Kliuchnikov at Google for the brotli +Python bindings under an MIT license. + +""" +import queue +import random +import threading +import time + +import brotlicffi + + +def make_compress_input(size): + abc = [bytes([b]) for b in b"abcdefghijklmnopqrstuvwxyz"] + abc_cap = [bytes([b]) for b in b"ABCDEFGHIJKLMNOPQRSTUVWXYZ"] + num_words_by_len = [0, 25, 100, 175, 1700, 1000, 1000, 1000] + word_set = set() + rng = random.Random() + rng.seed(0x4d3d3d3) + words_by_len = [[]] + for word_len in range(1, len(num_words_by_len)): + num_words = num_words_by_len[word_len] + words = [] + for _ in range(num_words): + while True: + word = b"".join( + [rng.choice(abc_cap)] + + [rng.choice(abc) for _ in range(word_len - 1)] + ) + if word not in word_set: + word_set.add(word) + words.append(word) + break + words_by_len.append(words) + total_size = 0 + out = [] + while total_size < size: + word_len = rng.choice(range(1, len(num_words_by_len))) + word = rng.choice(words_by_len[word_len]) + total_size += len(word) + out.append(word) + return b"".join(out) + + +def _thread_compress(original, compressor, barrier, results): + barrier.wait() + compressed = compressor.process(original) + 
compressed += compressor.finish() + results.put(1) + + +def _thread_concurrent_process_compress(compressor, barrier, results): + barrier.wait() + time.sleep(.01) + try: + _ = compressor.process(b"whatever") + except brotlicffi.error: + results.put(2) + + +def _thread_concurrent_flush_compress(compressor, barrier, results): + barrier.wait() + time.sleep(.01) + try: + _ = compressor.flush() + except brotlicffi.error: + results.put(3) + + +def _thread_concurrent_finish_compress(compressor, barrier, results): + barrier.wait() + time.sleep(.01) + try: + _ = compressor.finish() + except brotlicffi.error: + results.put(4) + + +def test_compress_concurrency(): + original = make_compress_input(2 * 1024 * 1024) + compressor = brotlicffi.Compressor(quality=9) + results = queue.Queue() + barrier = threading.Barrier(4) + threads = [] + threads.append( + threading.Thread( + target=_thread_compress, + args=(original, compressor, barrier, results) + ) + ) + threads.append( + threading.Thread( + target=_thread_concurrent_process_compress, + args=(compressor, barrier, results) + ) + ) + threads.append( + threading.Thread( + target=_thread_concurrent_flush_compress, + args=(compressor, barrier, results) + ) + ) + threads.append( + threading.Thread( + target=_thread_concurrent_finish_compress, + args=(compressor, barrier, results) + ) + ) + for thread in threads: + thread.start() + for thread in threads: + thread.join() + assert sorted(list(results.queue)) == [1, 2, 3, 4] + + +def make_decompress_input(size): + compressor = brotlicffi.Compressor(quality=1) + prologue = compressor.process(b'b') + prologue += compressor.flush() + filler = compressor.process(b'c') + filler += compressor.flush() + epilogue = compressor.finish() + return b''.join( + [prologue] + [filler] * (size // len(filler)) + [epilogue]) + + +def _thread_decompress(compressed, decompressor, barrier, results): + barrier.wait() + _ = decompressor.process(compressed) + if decompressor.is_finished(): + results.put(1) + + 
+def _thread_concurrent_process(decompressor, barrier, results): + barrier.wait() + time.sleep(.001) + try: + _ = decompressor.process(b'') + except brotlicffi.error: + results.put(2) + + +def _thread_concurrent_can_accept_more_data(decompressor, barrier, results): + barrier.wait() + time.sleep(.001) + try: + _ = decompressor.can_accept_more_data() + except brotlicffi.error: + results.put(3) + + +def _thread_concurrent_is_finished(decompressor, barrier, results): + barrier.wait() + time.sleep(.001) + try: + _ = decompressor.is_finished() + except brotlicffi.error: + results.put(4) + + +def test_decompressor_concurrency(): + compressed = make_decompress_input(16 * 1024 * 1024) + decompressor = brotlicffi.Decompressor() + results = queue.Queue() + barrier = threading.Barrier(4) + threads = [] + threads.append( + threading.Thread( + target=_thread_decompress, + args=(compressed, decompressor, barrier, results) + ) + ) + threads.append( + threading.Thread( + target=_thread_concurrent_process, + args=(decompressor, barrier, results) + ) + ) + threads.append( + threading.Thread( + target=_thread_concurrent_can_accept_more_data, + args=(decompressor, barrier, results), + ) + ) + threads.append( + threading.Thread( + target=_thread_concurrent_is_finished, + args=(decompressor, barrier, results) + ) + ) + for thread in threads: + thread.start() + for thread in threads: + thread.join() + assert sorted(list(results.queue)) == [1, 2, 3, 4]