Client configured with deflate=true loses connection on Tornado 6 #258

@ayamnikovx

Versions

  • nsqd v1.2.1 (built w/go1.16.7)
  • Python 3.9.7
  • pynsq 0.9.0
  • tornado 6.1

Issue

The Reader constantly loses its connection to nsqd on large payloads when running on tornado 6 with the deflate option enabled. The same happens to the Writer, but much more rarely. I believe there is a bug somewhere in DeflateSocket.
The issue is not reproducible with tornado 5 or with deflate disabled.
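
As a sanity check on the payload itself, the sketch below round-trips the same DATA through a raw deflate stream using only the standard library, feeding the decompressor small chunks the way a socket would deliver partial reads. It does not use pynsq or tornado and does not reproduce the disconnect. The compression level, the raw-deflate framing (negative wbits), the chunk size, and the helper name `deflate_roundtrip` are all assumptions made for illustration, not taken from pynsq's code. If the round trip succeeds, that only suggests the payload and zlib are fine, which would be consistent with the problem sitting in the socket wrapper.

```python
import json
import zlib

# Same payload as in main.py.
DATA = json.dumps({str(x): x for x in range(10000)}).encode()


def deflate_roundtrip(payload, chunk_size=4096):
    """Compress payload as a raw deflate stream, then decompress it in
    small chunks, similar to how a socket wrapper sees partial reads.

    Hypothetical helper for illustration only; not part of pynsq.
    """
    # Negative wbits selects raw deflate (no zlib header or checksum).
    # Assumption: level 6 and raw framing, chosen for this sketch.
    compressor = zlib.compressobj(6, zlib.DEFLATED, -zlib.MAX_WBITS)
    # Z_SYNC_FLUSH emits everything compressed so far without ending
    # the stream, as a long-lived connection would.
    compressed = compressor.compress(payload)
    compressed += compressor.flush(zlib.Z_SYNC_FLUSH)

    decompressor = zlib.decompressobj(-zlib.MAX_WBITS)
    restored = bytearray()
    for start in range(0, len(compressed), chunk_size):
        chunk = compressed[start:start + chunk_size]
        restored += decompressor.decompress(chunk)
    return bytes(restored), len(compressed)


restored, compressed_size = deflate_roundtrip(DATA)
print(f'DATA size: {len(DATA)}, compressed size: {compressed_size}')
print(f'Round trip intact: {restored == DATA}')
```

Varying `chunk_size` (for example down to a few bytes) exercises different read boundaries without involving the network.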

Steps to reproduce

Running nsqd:

docker run --name lookupd -d -p 4160:4160 -p 4161:4161 nsqio/nsq /nsqlookupd
docker run --name nsqd -p 4150:4150 -p 4151:4151 nsqio/nsq /nsqd --broadcast-address=172.17.0.2 --lookupd-tcp-address=172.17.0.2:4160

Preparing virtual environment:

virtualenv -p python3.9 venv
. venv/bin/activate
pip install pynsq==0.9.0 tornado==6.1.0

Running the program (you may need to restart the script a few times before the issue appears):

python main.py
# Output:
Received 1 messages
ERROR:nsq.client:[127.0.0.1:4150:test:test] ERROR: ConnectionClosedError('Stream is closed')
WARNING:nsq.reader:[127.0.0.1:4150:test:test] connection closed

main.py:

from nsq import Reader, Writer
import tornado.ioloop
from tornado import gen
import json


# crafting some non-trivial payload
DATA = json.dumps({str(x): x for x in range(10000)}).encode()
print(f'DATA size: {len(DATA)}')
NSQD_HOST = '127.0.0.1'
NSQD_PORT = 4150
USE_DEFLATE = True


@gen.coroutine
def producer():
    writer = Writer(
        nsqd_tcp_addresses=[f'{NSQD_HOST}:{NSQD_PORT}'],
        deflate=USE_DEFLATE,
    )

    # waiting for connection
    while not writer.conns:
        yield gen.sleep(.1)

    # filling the queue
    for _ in range(10):
        writer.pub('test', DATA)

    cnt = 0
    while True:
        writer.pub('test', DATA)
        cnt += 1
        print(f'Sent {cnt} messages')
        yield gen.sleep(1)


@gen.coroutine
def consumer():
    # waiting for the queue to fill
    yield gen.sleep(1)

    cnt = 0

    def handler(msg):
        msg.finish()
        nonlocal cnt
        cnt += 1
        print(f'Received {cnt} messages')

    _ = Reader(
        topic='test',
        channel='test',
        message_handler=handler,
        nsqd_tcp_addresses=[f'{NSQD_HOST}:{NSQD_PORT}'],
        deflate=USE_DEFLATE,
    )

    while True:
        yield gen.sleep(1)


if __name__ == '__main__':
    loop = tornado.ioloop.IOLoop.current()
    loop.add_callback(producer)
    loop.add_callback(consumer)
    try:
        loop.start()
    except KeyboardInterrupt:
        pass
