diff --git a/atomizer.py b/atomizer.py index 0991395..ecdcadf 100755 --- a/atomizer.py +++ b/atomizer.py @@ -54,6 +54,8 @@ def __init__(self, loop, target, threads=3, debug=False): self.sprayer = None self.threads = int(threads) self.debug = debug + # Maximum 3 concurrent tasks per thread. + self.sync = asyncio.BoundedSemaphore(self.threads * 3) log_format = '%(threadName)10s %(name)18s: %(message)s' if debug else '%(message)s' @@ -83,6 +85,10 @@ def imap(self, port): port=port ) + def _atomize(func, username, password, sync): + func(username, password) + sync.release() + async def atomize(self, userfile, password): log = logging.getLogger('atomize') log.debug('atomizing...') @@ -91,13 +97,15 @@ async def atomize(self, userfile, password): log.debug('creating executor tasks') logging.info(print_info(f"Starting spray at {get_utc_time()} UTC")) - blocking_tasks = [ - self.loop.run_in_executor(self.executor, partial(auth_function, username=username.strip(), password=password)) - for username in userfile - ] + + pending = [] + for username in userfile: + await self.sync.acquire() + task = self.loop.run_in_executor(self.executor, partial(Atomizer._atomize, auth_function, username.strip(), password, self.sync)) + pending.append(task) log.debug('waiting for executor tasks') - await asyncio.wait(blocking_tasks) + await asyncio.wait(pending) log.debug('exiting') async def atomize_csv(self, csvreader: csv.DictReader, user_row_name='Email Address', pass_row_name='Password'):