Describe the bug
Our FastStream NATS implementation has produced a steady stream of errors ever since we started using it:
Error in NATS: nats: 'Authentication Timeout'
Error in NATS: nats: connection closed
Error in NATS: Connection lost
Error setting key value: nats: no response from stream
Error in NATS: [Errno 104] Connection reset by peer
How to reproduce
Our NATSService, used by our regular FastAPI app:
import asyncio
from enum import Enum
import logging
from opentelemetry import trace
from typing import List, TypeVar
from common.schemas.user import GetMeUserResponse
from faststream.nats import NatsBroker, StorageType, RetentionPolicy, JStream
from nats.js.api import KeyValueConfig
from nats.js.errors import KeyNotFoundError
from pydantic import AwareDatetime, RootModel
import sentry_sdk
from common.config import get_sabo_event_categories
from common.schemas.event import GetEventResponse
from common.schemas.feed import GetFeedItemResponse
from async_lru import alru_cache

T = TypeVar("T")

MOMO_WORKER_DEFAULT = JStream(
    name="momo-worker-default",
    retention=RetentionPolicy.WORK_QUEUE,
    max_age=60 * 60 * 1 * 3,  # 3 hours
    declare=True,
    storage=StorageType.MEMORY,
    allow_direct=False,
)


class NATSBuckets(Enum):
    MOMO_ORGANISATION_EVENTS = "momo-organisation-events"
    MOMO_ORGANISATION_FEEDS = "momo-organisation-feeds"
    MOMO_ORGANISATION_MEMBERS = "momo-organisation-members"


class NATSService:
    def __init__(self, broker: NatsBroker):
        self.broker = broker

    async def init(self):
        """
        Create a KV store for each bucket.
        """
        try:
            tasks = []
            for bucket in NATSBuckets:
                tasks.append(asyncio.create_task(self.broker.stream.create_key_value(KeyValueConfig(
                    bucket=bucket.value,
                    ttl=60 * 60 * 24 * 7,  # 1 week
                    storage=StorageType.MEMORY,
                ))))
            await asyncio.gather(*tasks)
        except Exception as e:
            logging.error(f"Error creating key value stores: {e}")

    @alru_cache(maxsize=16)
    async def _key_value(self, bucket):
        return await self.broker.stream.key_value(bucket)

    async def get_key_value(self, bucket: str, key: str):
        with trace.get_tracer_provider().get_tracer("nats").start_as_current_span("get key value") as span:
            span.set_attribute("bucket", bucket)
            span.set_attribute("key", key)
            try:
                return await (await self._key_value(bucket)).get(key)
            except KeyNotFoundError:
                return None
            except Exception as e:
                logging.error(f"Error getting key value: {e}")
                sentry_sdk.capture_exception(e)
                return None

    async def set_key_value(self, bucket: str, key: str, value: bytes):
        with trace.get_tracer_provider().get_tracer("nats").start_as_current_span("set key value") as span:
            span.set_attribute("bucket", bucket)
            span.set_attribute("key", key)
            try:
                await (await self._key_value(bucket)).put(key, value)
            except Exception as e:
                logging.error(f"Error setting key value: {e}")
                sentry_sdk.capture_exception(e)

And here is our Dishka setup for the NATS worker service that handles the NATS messages; the broker is used by the NATSService and the router by the FastAPI FastStream integration:
import logging
from typing import Tuple

import sentry_sdk
from dishka import AsyncContainer, Provider, Scope, make_async_container, provide
from faststream.nats import NatsBroker
from faststream.nats.fastapi import NatsRouter
from faststream.nats.opentelemetry import NatsTelemetryMiddleware
from opentelemetry import trace

from common.config import get_config
from common.services.nats_service import NATSService
# DBProvider, FastStreamProvider and ServiceProvider are our own providers (imports omitted here).


class NATSProvider(Provider):
    def __init__(self, broker: NatsBroker):
        super().__init__(scope=Scope.APP)
        self.broker = broker

    @provide(scope=Scope.APP)
    async def nats_service(self) -> NATSService:
        client = await self.broker.connect()
        nats_service = NATSService(self.broker)
        await nats_service.init()
        return nats_service


def _make_nats_broker() -> Tuple[NatsBroker, NatsRouter]:
    async def error_cb(e):
        logging.error(f"Error in NATS: {e}")
        sentry_sdk.capture_exception(e)

    tracer_provider = trace.get_tracer_provider()
    _broker = NatsBroker(
        get_config().NATS_SERVER_URL,
        user_credentials="./nats-production-momo-api.creds",
        error_cb=error_cb,
        allow_reconnect=True,
        middlewares=(
            NatsTelemetryMiddleware(
                tracer_provider=tracer_provider,
            ),
        ),
    )
    _router = NatsRouter(
        get_config().NATS_SERVER_URL,
        user_credentials="./nats-production-momo-api.creds",
        error_cb=error_cb,
        middlewares=(
            NatsTelemetryMiddleware(
                tracer_provider=tracer_provider,
            ),
        ),
    )
    return _broker, _router


def make_container() -> Tuple[AsyncContainer, NatsBroker, NatsRouter]:
    nats_broker, nats_router = _make_nats_broker()
    container = make_async_container(
        DBProvider(),
        FastStreamProvider(),
        NATSProvider(broker=nats_broker),
        ServiceProvider(),
    )
    return container, nats_broker, nats_router

And lastly, the worker app:
import asyncio
import random
from typing import Annotated
from fastapi import FastAPI
from fastapi.responses import ORJSONResponse
from faststream.nats import DeliverPolicy, PullSub
from dishka.integrations.faststream import inject, FromDishka
from api.auth import firebase_initialization
from common.config import get_config
from common.exceptions.handlers import init_exception_handlers
from common.middleware import init_middleware
from common.observability import init_observability
from common.services.nats_service import MOMO_WORKER_DEFAULT
from common.services.user_service import UserService
from di import make_container
from dishka.integrations.faststream import setup_dishka as setup_faststream_ioc
from dishka.integrations.fastapi import setup_dishka as setup_fastapi_ioc
from common.schemas.worker import UserEmailSummarySendPayload

#
# DI container
#
container, _, nats_router = make_container()

#
# FastStream setup
#
setup_faststream_ioc(
    container,
    nats_router,
    finalize_container=False,
)

#
# FastAPI setup
#
_fastapi_app = FastAPI(
    title="momo-worker",
    lifespan=nats_router.lifespan_context,
    docs_url=get_config().swagger_url(),
    redoc_url=get_config().redoc_url(),
    debug=get_config().DEBUG,
    default_response_class=ORJSONResponse,
)
setup_fastapi_ioc(container, _fastapi_app)
init_observability("momo-worker", _fastapi_app)
init_exception_handlers(_fastapi_app)
init_middleware(_fastapi_app)
firebase_initialization()

# NOTE: subject: ENTITY.DOMAIN.ACTION
# e.g. user.email.summary.send
# e.g. organisation.feed.reset
# e.g. organisation.event.created
# e.g. organisation.event.deleted
# e.g. organisation.event.started
# e.g. organisation.event.ended
# e.g. organisation.member.added

#
# Handlers
#
@nats_router.subscriber(
    "scheduled.bimonthly.user.email.summary.send",
    durable="bimonthly-user-email-summary-send",
    stream=MOMO_WORKER_DEFAULT,
    max_workers=10,
    deliver_policy=DeliverPolicy.ALL,
    pull_sub=PullSub(batch_size=10),
)
@inject
async def handler(user_service: Annotated[UserService, FromDishka()]):
    users = await user_service.get_all_users()
    tasks = []
    for user in users:
        tasks.append(asyncio.create_task(
            nats_router.broker.publish(
                UserEmailSummarySendPayload(user_id=user.uid),
                "user.email.summary.send",
            )
        ))
    await asyncio.gather(*tasks)


@nats_router.subscriber(
    "user.email.summary.send",
    durable="user-email-summary-send",
    stream=MOMO_WORKER_DEFAULT,
    max_workers=10,
    deliver_policy=DeliverPolicy.ALL,
    pull_sub=PullSub(batch_size=10),
)
@inject
async def handler(msg: UserEmailSummarySendPayload, user_service: Annotated[UserService, FromDishka()]):
    print(f"Sending email to {msg.user_id}")
    try:
        user = await user_service.get_user(msg.user_id)
    except Exception:
        pass
    await asyncio.sleep(random.randint(1, 5))
    return msg


#
# Add routers
#
_fastapi_app.include_router(nats_router)
app = _fastapi_app

Expected behavior
We expect our NATSService and FastStream app to maintain a stable connection to the NATS server (hosted on Scaleway), without these errors, and to reconnect automatically, which is the default behaviour.
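For reference, this is a minimal sketch of what we are considering in order to make the reconnect lifecycle visible in our logs. It assumes NatsBroker forwards these keyword arguments (disconnected_cb, reconnected_cb, closed_cb, max_reconnect_attempts, reconnect_time_wait) to nats-py's connect, the same way error_cb and allow_reconnect appear to be forwarded; the callback names are ours:

import logging

from faststream.nats import NatsBroker
from common.config import get_config

async def disconnected_cb():
    # Fired when the TCP connection drops; the client should now be retrying.
    logging.warning("NATS disconnected")

async def reconnected_cb():
    # Fired once the client has re-established the connection.
    logging.warning("NATS reconnected")

async def closed_cb():
    # Fired when the client gives up and will no longer reconnect on its own.
    logging.error("NATS connection closed permanently")

broker = NatsBroker(
    get_config().NATS_SERVER_URL,
    user_credentials="./nats-production-momo-api.creds",
    allow_reconnect=True,
    max_reconnect_attempts=-1,  # -1 should mean unlimited retries (default is 60 attempts)
    reconnect_time_wait=2,      # seconds between reconnect attempts
    disconnected_cb=disconnected_cb,
    reconnected_cb=reconnected_cb,
    closed_cb=closed_cb,
)

If closed_cb ever fires, the client has stopped reconnecting (e.g. after exhausting its reconnect attempts), which would match the "nats: connection closed" errors we see.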
Observed behavior
We observe connection errors; does this suggest that the broker is not reconnecting?
Error in NATS: nats: 'Authentication Timeout'
Error in NATS: nats: connection closed
Error in NATS: Connection lost
Error setting key value: nats: no response from stream
Error in NATS: [Errno 104] Connection reset by peer
Environment
Running FastStream 0.5.13 with CPython 3.11.6 on Darwin
Additional context
We have contacted Scaleway support.
They stated that the resource limits are:
300 MB total in streams/KV
50 streams/KV
50 consumers per stream
This is more than enough, and we are not exceeding these limits.
They also stated that we must be sure to handle automatic reconnection, but to our understanding this is already the default behaviour.
So, are we doing something fundamentally wrong in our implementation?
We are not getting these errors all the time; they seem to appear irregularly.
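The only explicit "reconnect handling" we could think of adding on top of the client's built-in reconnect is a retry around the KV/JetStream calls that fail while a reconnect is in progress, e.g. the puts that currently raise "no response from stream". A minimal sketch of that idea follows; the helper name, retry parameters and the exact exception types to catch are our guesses:

import asyncio
import logging

from nats.errors import ConnectionClosedError, TimeoutError as NatsTimeoutError
from nats.js.errors import NoStreamResponseError

async def put_with_retry(kv, key: str, value: bytes, attempts: int = 3, delay: float = 1.0):
    # Retry a KV put that fails while the client is (re)connecting.
    for attempt in range(1, attempts + 1):
        try:
            return await kv.put(key, value)
        except (NatsTimeoutError, ConnectionClosedError, NoStreamResponseError) as e:
            logging.warning(f"KV put failed (attempt {attempt}/{attempts}): {e}")
            if attempt == attempts:
                raise
            await asyncio.sleep(delay)

Would something like this be expected with FastStream, or should the built-in reconnect make it unnecessary?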