# Program to demonstrate using async-I/O and thread-pools to improve # throughput on blocking I/O calls. # # Sample output (4 workers, 50 messages): # 2021-06-17 15:42:56,433 sending msgs: starting # 2021-06-17 15:42:56,434 call to send msg 1/50 # 2021-06-17 15:42:56,434 sending msg: 0 # 2021-06-17 15:42:56,434 call to send msg 2/50 # 2021-06-17 15:42:56,434 sending msg: 1 # 2021-06-17 15:42:56,434 call to send msg 3/50 # 2021-06-17 15:42:56,434 sending msg: 2 # 2021-06-17 15:42:56,434 call to send msg 4/50 # 2021-06-17 15:42:56,435 sending msg: 3 # 2021-06-17 15:42:56,435 call to send msg 5/50 # 2021-06-17 15:42:56,435 call to send msg 6/50 # 2021-06-17 15:42:56,435 call to send msg 7/50 # 2021-06-17 15:42:56,435 call to send msg 8/50 # 2021-06-17 15:42:56,435 call to send msg 9/50 # 2021-06-17 15:42:56,435 call to send msg 10/50 # 2021-06-17 15:42:56,435 call to send msg 11/50 # 2021-06-17 15:42:56,435 call to send msg 12/50 # 2021-06-17 15:42:56,435 call to send msg 13/50 # 2021-06-17 15:42:56,435 call to send msg 14/50 # 2021-06-17 15:42:56,435 call to send msg 15/50 # 2021-06-17 15:42:56,435 call to send msg 16/50 # 2021-06-17 15:42:56,436 call to send msg 17/50 # 2021-06-17 15:42:56,436 call to send msg 18/50 # 2021-06-17 15:42:56,436 call to send msg 19/50 # 2021-06-17 15:42:56,436 call to send msg 20/50 # 2021-06-17 15:42:56,436 call to send msg 21/50 # 2021-06-17 15:42:56,436 call to send msg 22/50 # 2021-06-17 15:42:56,436 call to send msg 23/50 # 2021-06-17 15:42:56,436 call to send msg 24/50 # 2021-06-17 15:42:56,436 call to send msg 25/50 # 2021-06-17 15:42:56,436 call to send msg 26/50 # 2021-06-17 15:42:56,436 call to send msg 27/50 # 2021-06-17 15:42:56,436 call to send msg 28/50 # 2021-06-17 15:42:56,436 call to send msg 29/50 # 2021-06-17 15:42:56,436 call to send msg 30/50 # 2021-06-17 15:42:56,436 call to send msg 31/50 # 2021-06-17 15:42:56,436 call to send msg 32/50 # 2021-06-17 15:42:56,436 call to send msg 33/50 # 2021-06-17 15:42:56,436 call to send msg 34/50 # 2021-06-17 15:42:56,436 call to send msg 35/50 # 2021-06-17 15:42:56,437 call to send msg 36/50 # 2021-06-17 15:42:56,437 call to send msg 37/50 # 2021-06-17 15:42:56,437 call to send msg 38/50 # 2021-06-17 15:42:56,437 call to send msg 39/50 # 2021-06-17 15:42:56,437 call to send msg 40/50 # 2021-06-17 15:42:56,437 call to send msg 41/50 # 2021-06-17 15:42:56,437 call to send msg 42/50 # 2021-06-17 15:42:56,437 call to send msg 43/50 # 2021-06-17 15:42:56,437 call to send msg 44/50 # 2021-06-17 15:42:56,437 call to send msg 45/50 # 2021-06-17 15:42:56,437 call to send msg 46/50 # 2021-06-17 15:42:56,437 call to send msg 47/50 # 2021-06-17 15:42:56,437 call to send msg 48/50 # 2021-06-17 15:42:56,437 call to send msg 49/50 # 2021-06-17 15:42:56,437 call to send msg 50/50 # 2021-06-17 15:42:57,439 sending msg: 4 # 2021-06-17 15:42:57,439 sending msg: 5 # 2021-06-17 15:42:57,440 sending msg: 6 # 2021-06-17 15:42:57,440 sending msg: 7 # 2021-06-17 15:42:57,440 sending msg failed: send failed on msg: 1 # 2021-06-17 15:42:58,440 sending msg: 8 # 2021-06-17 15:42:58,440 sending msg: 9 # 2021-06-17 15:42:58,441 sending msg: 10 # 2021-06-17 15:42:58,441 sending msg: 11 # 2021-06-17 15:42:58,441 sending msg failed: send failed on msg: 7 # 2021-06-17 15:42:59,443 sending msg: 12 # 2021-06-17 15:42:59,443 sending msg: 13 # 2021-06-17 15:42:59,444 sending msg: 14 # 2021-06-17 15:42:59,444 sending msg: 15 # 2021-06-17 15:43:00,445 sending msg: 16 # 2021-06-17 15:43:00,445 sending msg: 17 # 2021-06-17 15:43:00,446 sending msg: 18 # 2021-06-17 15:43:00,446 sending msg: 19 # 2021-06-17 15:43:01,447 sending msg: 20 # 2021-06-17 15:43:01,447 sending msg: 21 # 2021-06-17 15:43:01,448 sending msg: 22 # 2021-06-17 15:43:01,448 sending msg: 23 # 2021-06-17 15:43:02,449 sending msg: 24 # 2021-06-17 15:43:02,452 sending msg: 25 # 2021-06-17 15:43:02,452 sending msg: 26 # 2021-06-17 15:43:02,453 sending msg: 27 # 2021-06-17 15:43:03,453 sending msg: 28 # 2021-06-17 15:43:03,456 sending msg: 29 # 2021-06-17 15:43:03,456 sending msg: 30 # 2021-06-17 15:43:03,457 sending msg: 31 # 2021-06-17 15:43:04,456 sending msg: 32 # 2021-06-17 15:43:04,458 sending msg: 33 # 2021-06-17 15:43:04,459 sending msg: 34 # 2021-06-17 15:43:04,459 sending msg: 35 # 2021-06-17 15:43:05,459 sending msg: 36 # 2021-06-17 15:43:05,459 sending msg: 37 # 2021-06-17 15:43:05,464 sending msg: 38 # 2021-06-17 15:43:05,464 sending msg: 39 # 2021-06-17 15:43:06,462 sending msg: 40 # 2021-06-17 15:43:06,462 sending msg: 41 # 2021-06-17 15:43:06,467 sending msg: 42 # 2021-06-17 15:43:06,469 sending msg: 43 # 2021-06-17 15:43:07,465 sending msg: 44 # 2021-06-17 15:43:07,465 sending msg: 45 # 2021-06-17 15:43:07,471 sending msg: 46 # 2021-06-17 15:43:07,471 sending msg: 47 # 2021-06-17 15:43:08,470 sending msg: 48 # 2021-06-17 15:43:08,470 sending msg: 49 # 2021-06-17 15:43:09,474 sending msgs: finished import logging import time import concurrent.futures import asyncio import functools import random num_msgs = 50 num_workers = 4 failure_pct = 10 def send_message(msg): logging.info('sending msg: %s', msg) # this simulates slowness time.sleep(1) if random.randint(0, 99) % failure_pct == msg: # this simulates failure raise IOError(f'send failed on msg: {msg}') async def post_messages(pool, event_loop, msgs): def after_send(fut): e = fut.exception() if e is not None: logging.warning('sending msg failed: %s', e) futures = [] for i, msg in enumerate(msgs): logging.info('call to send msg %d/%d', i+1, len(msgs)) # note task to run future = event_loop.run_in_executor(pool, functools.partial(send_message, msg)) future.add_done_callback(after_send) futures.append(future) # post all messages, our concurrency is only limited by # no. of available threads in the thread-pool await asyncio.wait(futures) def send_messages_quickly_method1(msgs): with concurrent.futures.ThreadPoolExecutor(max_workers=num_workers) as pool: # for any async-I/O, we always need an event-processing loop event_loop = asyncio.get_event_loop() logging.info('sending msgs: starting') event_loop.run_until_complete(post_messages(pool, event_loop, msgs)) logging.info('sending msgs: finished') if __name__ == '__main__': logging.basicConfig(format='%(asctime)s %(message)s', level=logging.INFO) random.seed(42) msgs = list(range(num_msgs)) send_messages_quickly_method1(msgs)