-
Notifications
You must be signed in to change notification settings - Fork 127
Open
Labels
Description
Versions
- nsqd v1.2.1 (built w/go1.16.7)
- Python 3.9.7
- pynsq 0.9.0
- tornado 6.1
Issue
Reader constantly loses connection with nsqd on large payloads when running on tornado 6 with enabled deflate option. This also happens to Writer but quite rarely. I believe there is a bug somewhere in DeflateSocket.
The issue isn't reproducible with tornado 5 or with disabled deflate.
Steps to reproduce
Running nsqd:
docker run --name lookupd -d -p 4160:4160 -p 4161:4161 nsqio/nsq /nsqlookupd
docker run --name nsqd -p 4150:4150 -p 4151:4151 nsqio/nsq /nsqd --broadcast-address=172.17.0.2 --l
ookupd-tcp-address=172.17.0.2:4160
Preparing virtual environment:
virtualenv -p python3.9 venv
. venv/bin/activate
pip install pynsq==0.9.0 tornado==6.1.0
Running the program (sometimes you should restart the script a few times to run into the issue):
python main.py
# Output:
Received 1 messages
ERROR:nsq.client:[127.0.0.1:4150:test:test] ERROR: ConnectionClosedError('Stream is closed')
WARNING:nsq.reader:[127.0.0.1:4150:test:test] connection closed
main.py:
from nsq import Reader, Writer
import tornado.ioloop
from tornado import gen
import json
# crafting some non-trivial payload
DATA = json.dumps({str(x): x for x in range(10000)}).encode()
print(f'DATA size: {len(DATA)}')
NSQD_HOST = '127.0.0.1'
NSQD_PORT = 4150
USE_DEFLATE = True
@gen.coroutine
def producer():
writer = Writer(
nsqd_tcp_addresses=[f'{NSQD_HOST}:{NSQD_PORT}'],
deflate=USE_DEFLATE,
)
# waiting for connection
while not writer.conns:
yield gen.sleep(.1)
# filling the queue
for _ in range(10):
writer.pub('test', DATA)
cnt = 0
while True:
writer.pub('test', DATA)
cnt += 1
print(f'Sent {cnt} messages')
yield gen.sleep(1)
@gen.coroutine
def consumer():
# waiting for the queue to fill
yield gen.sleep(1)
cnt = 0
def handler(msg):
msg.finish()
nonlocal cnt
cnt += 1
print(f'Received {cnt} messages')
_ = Reader(
topic='test',
channel='test',
message_handler=handler,
nsqd_tcp_addresses=[f'{NSQD_HOST}:{NSQD_PORT}'],
deflate=USE_DEFLATE,
)
while True:
yield gen.sleep(1)
if __name__ == '__main__':
loop = tornado.ioloop.IOLoop.current()
loop.add_callback(producer)
loop.add_callback(consumer)
try:
loop.start()
except KeyboardInterrupt:
pass