
Module 22 - Async Programming

Asynchronous programming allows Python to handle multiple operations concurrently without blocking, making it ideal for I/O-bound tasks like network requests, file operations, and database queries.


1. Understanding Async Programming

Synchronous vs Asynchronous

Synchronous (Blocking):

import time

def task1():
    print("Task 1 starting...")
    time.sleep(2)
    print("Task 1 done!")

def task2():
    print("Task 2 starting...")
    time.sleep(2)
    print("Task 2 done!")

# Total time: 4 seconds
task1()
task2()

Asynchronous (Non-blocking):

import asyncio

async def task1():
    print("Task 1 starting...")
    await asyncio.sleep(2)
    print("Task 1 done!")

async def task2():
    print("Task 2 starting...")
    await asyncio.sleep(2)
    print("Task 2 done!")

# Total time: ~2 seconds (runs concurrently)
async def main():
    await asyncio.gather(task1(), task2())

asyncio.run(main())

2. Core Concepts

2.1 Coroutines

Coroutines are functions defined with async def. Calling a coroutine function does not run its body; it returns a coroutine object that must be awaited or scheduled on the event loop.

import asyncio

# Define a coroutine
async def greet(name):
    print(f"Hello, {name}!")
    await asyncio.sleep(1)
    print(f"Goodbye, {name}!")

# Run the coroutine
asyncio.run(greet("Alice"))
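
A minimal demonstration of that pitfall: calling greet() only builds a coroutine object, and nothing runs until it is awaited or handed to asyncio.run().

import asyncio

async def greet(name):
    print(f"Hello, {name}!")

coro = greet("Alice")  # Nothing runs yet: this only creates a coroutine object
print(coro)            # <coroutine object greet at 0x...>
asyncio.run(coro)      # Now the body executes and prints the greeting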

2.2 The await Keyword

await suspends the current coroutine until the awaitable completes, freeing the event loop to run other tasks in the meantime.

import asyncio

async def fetch_data():
    print("Fetching data...")
    await asyncio.sleep(2)  # Simulate I/O operation
    print("Data fetched!")
    return {"data": "result"}

async def main():
    result = await fetch_data()
    print(result)

asyncio.run(main())

2.3 asyncio.run()

asyncio.run() is the entry point for an async program: it creates an event loop, runs the given coroutine to completion, and closes the loop.

import asyncio

async def main():
    print("Hello")
    await asyncio.sleep(1)
    print("World")

# Run the main coroutine
asyncio.run(main())

3. Running Multiple Coroutines

3.1 asyncio.gather()

Run multiple coroutines concurrently and wait for all to complete.

import asyncio

async def task(name, delay):
    print(f"{name} starting")
    await asyncio.sleep(delay)
    print(f"{name} done after {delay}s")
    return f"{name} result"

async def main():
    results = await asyncio.gather(
        task("Task 1", 2),
        task("Task 2", 1),
        task("Task 3", 3)
    )
    print(f"Results: {results}")

asyncio.run(main())

# Output:
# Task 1 starting
# Task 2 starting
# Task 3 starting
# Task 2 done after 1s
# Task 1 done after 2s
# Task 3 done after 3s
# Results: ['Task 1 result', 'Task 2 result', 'Task 3 result']

3.2 asyncio.create_task()

asyncio.create_task() wraps a coroutine in a Task and schedules it immediately; the tasks below run concurrently as soon as the event loop gets control, and each await merely collects a result.

import asyncio

async def task(name, delay):
    await asyncio.sleep(delay)
    print(f"{name} done")
    return name

async def main():
    # Create tasks
    task1 = asyncio.create_task(task("Task 1", 2))
    task2 = asyncio.create_task(task("Task 2", 1))
    task3 = asyncio.create_task(task("Task 3", 3))

    # Wait for all tasks
    result1 = await task1
    result2 = await task2
    result3 = await task3

    print(f"Results: {result1}, {result2}, {result3}")

asyncio.run(main())

3.3 asyncio.wait()

asyncio.wait() waits for multiple tasks with more control than gather(), e.g. via its return_when argument (see the sketch after this example).

import asyncio

async def task(name, delay):
    await asyncio.sleep(delay)
    return f"{name} done"

async def main():
    tasks = [
        asyncio.create_task(task("Task 1", 2)),
        asyncio.create_task(task("Task 2", 1)),
        asyncio.create_task(task("Task 3", 3))
    ]

    # Wait for all to complete
    done, pending = await asyncio.wait(tasks)

    for t in done:
        print(t.result())

asyncio.run(main())
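
For instance, return_when=asyncio.FIRST_COMPLETED makes wait() return as soon as any task finishes. A minimal sketch:

import asyncio

async def task(name, delay):
    await asyncio.sleep(delay)
    return f"{name} done"

async def main():
    tasks = [
        asyncio.create_task(task("Task 1", 2)),
        asyncio.create_task(task("Task 2", 1)),
    ]

    # Return as soon as the first task finishes; the rest keep running
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)

    for t in done:
        print(t.result())  # "Task 2 done" finishes first

    for t in pending:
        t.cancel()         # Cancel whatever is still running

asyncio.run(main())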

4. Async with HTTP Requests

Using aiohttp

Install it first: pip install aiohttp

import asyncio
import aiohttp

async def fetch_url(session, url):
    """Fetch a single URL"""
    async with session.get(url) as response:
        return await response.text()

async def fetch_multiple_urls(urls):
    """Fetch multiple URLs concurrently"""
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        return results

async def main():
    urls = [
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/2",
        "https://httpbin.org/delay/1"
    ]

    print("Fetching URLs...")
    results = await fetch_multiple_urls(urls)
    print(f"Fetched {len(results)} URLs")

asyncio.run(main())
# Total time: ~2 seconds (not 1+2+1=4 seconds!)

Real-World Example: API Scraper

import asyncio
import aiohttp
from typing import List, Dict

async def fetch_user(session: aiohttp.ClientSession, user_id: int) -> Dict:
    """Fetch user data from API"""
    url = f"https://jsonplaceholder.typicode.com/users/{user_id}"
    async with session.get(url) as response:
        return await response.json()

async def fetch_all_users(user_ids: List[int]) -> List[Dict]:
    """Fetch multiple users concurrently"""
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_user(session, uid) for uid in user_ids]
        return await asyncio.gather(*tasks)

async def main():
    user_ids = [1, 2, 3, 4, 5]
    users = await fetch_all_users(user_ids)

    for user in users:
        print(f"User: {user['name']} ({user['email']})")

asyncio.run(main())

5. Async Context Managers

Use async with for async context managers, which can await during setup and teardown.

import asyncio
import aiofiles  # pip install aiofiles

async def read_file(filename):
    """Read file asynchronously"""
    async with aiofiles.open(filename, 'r') as file:
        content = await file.read()
        return content

async def write_file(filename, content):
    """Write file asynchronously"""
    async with aiofiles.open(filename, 'w') as file:
        await file.write(content)

async def main():
    await write_file('test.txt', 'Hello, Async World!')
    content = await read_file('test.txt')
    print(content)

asyncio.run(main())
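
You can also write your own async context managers. Here is a minimal sketch using contextlib.asynccontextmanager; the connection setup and teardown are simulated with sleeps, purely to illustrate the pattern:

import asyncio
from contextlib import asynccontextmanager

@asynccontextmanager
async def open_connection(host):
    print(f"Connecting to {host}...")
    await asyncio.sleep(0.1)           # Simulate async setup
    try:
        yield f"connection-to-{host}"  # Value bound by 'async with ... as'
    finally:
        await asyncio.sleep(0.1)       # Simulate async teardown
        print("Connection closed")

async def main():
    async with open_connection("example.com") as conn:
        print(f"Using {conn}")

asyncio.run(main())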

6. Async Iterators

Using async for

import asyncio

class AsyncRange:
    def __init__(self, start, end):
        self.start = start
        self.end = end

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self.start >= self.end:
            raise StopAsyncIteration
        await asyncio.sleep(0.1)  # Simulate async operation
        self.start += 1
        return self.start - 1

async def main():
    async for i in AsyncRange(0, 5):
        print(i)

asyncio.run(main())

Async Generators

import asyncio

async def async_generator(n):
    """Generate numbers asynchronously"""
    for i in range(n):
        await asyncio.sleep(0.5)
        yield i

async def main():
    async for value in async_generator(5):
        print(f"Received: {value}")

asyncio.run(main())

7. Error Handling

Handling Exceptions

import asyncio

async def risky_task(name, should_fail=False):
    await asyncio.sleep(1)
    if should_fail:
        raise ValueError(f"{name} failed!")
    return f"{name} succeeded"

async def main():
    try:
        results = await asyncio.gather(
            risky_task("Task 1", False),
            risky_task("Task 2", True),
            risky_task("Task 3", False),
            return_exceptions=True  # Return exceptions instead of raising
        )

        for i, result in enumerate(results, 1):
            if isinstance(result, Exception):
                print(f"Task {i} error: {result}")
            else:
                print(f"Task {i}: {result}")

    except Exception as e:
        print(f"Error: {e}")

asyncio.run(main())

Timeout Control

import asyncio

async def long_task():
    await asyncio.sleep(5)
    return "Done"

async def main():
    try:
        result = await asyncio.wait_for(long_task(), timeout=2.0)
        print(result)
    except asyncio.TimeoutError:
        print("Task timed out!")

asyncio.run(main())
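
On Python 3.11+, asyncio.timeout() offers a context-manager alternative to wait_for() (and asyncio.TimeoutError became an alias of the built-in TimeoutError):

import asyncio

async def long_task():
    await asyncio.sleep(5)
    return "Done"

async def main():
    try:
        # Everything inside the block must finish within 2 seconds
        async with asyncio.timeout(2.0):
            result = await long_task()
            print(result)
    except TimeoutError:
        print("Task timed out!")

asyncio.run(main())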

8. Practical Examples

8.1 Concurrent File Processing

import asyncio
import aiofiles
from pathlib import Path

async def process_file(filepath):
    """Process a single file"""
    async with aiofiles.open(filepath, 'r') as file:
        content = await file.read()
    word_count = len(content.split())
    return filepath.name, word_count

async def process_directory(directory):
    """Process all text files in directory"""
    files = Path(directory).glob('*.txt')
    tasks = [process_file(f) for f in files]
    results = await asyncio.gather(*tasks)

    print("File Analysis:")
    for filename, count in results:
        print(f"  {filename}: {count} words")

# asyncio.run(process_directory('.'))

8.2 Rate-Limited API Calls

import asyncio
import aiohttp
from asyncio import Semaphore

async def fetch_with_limit(session, url, semaphore):
    """Fetch URL with concurrency limit"""
    async with semaphore:
        print(f"Fetching {url}")
        async with session.get(url) as response:
            return await response.text()

async def main():
    urls = ["https://httpbin.org/delay/1" for _ in range(10)]

    # Limit to 3 concurrent requests
    semaphore = Semaphore(3)

    async with aiohttp.ClientSession() as session:
        tasks = [fetch_with_limit(session, url, semaphore) for url in urls]
        results = await asyncio.gather(*tasks)

    print(f"Fetched {len(results)} URLs")

asyncio.run(main())

8.3 WebSocket Client

import asyncio
import websockets  # pip install websockets

async def listen_to_websocket():
    """Connect to WebSocket and listen for messages"""
    uri = "wss://echo.websocket.org"

    async with websockets.connect(uri) as websocket:
        # Send message
        await websocket.send("Hello, Server!")

        # Receive response
        response = await websocket.recv()
        print(f"Received: {response}")

asyncio.run(listen_to_websocket())

9. Async Queues

import asyncio
from asyncio import Queue

async def producer(queue, n):
    """Produce items"""
    for i in range(n):
        await asyncio.sleep(0.5)
        await queue.put(i)
        print(f"Produced: {i}")
    await queue.put(None)  # Signal completion

async def consumer(queue):
    """Consume items"""
    while True:
        item = await queue.get()
        if item is None:
            break
        await asyncio.sleep(0.3)
        print(f"Consumed: {item}")
        queue.task_done()

async def main():
    queue = Queue()

    await asyncio.gather(
        producer(queue, 5),
        consumer(queue)
    )

asyncio.run(main())
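
The task_done() calls above pair with Queue.join(), which blocks until every queued item has been marked done. That combination supports multiple consumers without a sentinel value; a minimal sketch:

import asyncio

async def worker(name, queue):
    while True:
        item = await queue.get()
        await asyncio.sleep(0.2)       # Simulate work
        print(f"{name} processed {item}")
        queue.task_done()              # One queued item fully handled

async def main():
    queue = asyncio.Queue()
    for i in range(6):
        queue.put_nowait(i)

    workers = [asyncio.create_task(worker(f"worker-{n}", queue)) for n in range(3)]

    await queue.join()                 # Wait until every item is task_done()'d
    for w in workers:
        w.cancel()                     # Workers loop forever; stop them once done

asyncio.run(main())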

10. When to Use Async

Good Use Cases ✅

  • I/O-bound operations: Network requests, file operations, database queries
  • Web scraping: Fetching multiple URLs concurrently
  • API clients: Making multiple API calls
  • WebSocket connections: Real-time communication
  • Concurrent database queries

Not Suitable ❌

  • CPU-bound operations: Heavy calculations (use multiprocessing instead)
  • Simple scripts: The overhead is not worth it
  • Synchronous libraries: When libraries don't support async (though blocking calls can be bridged; see the sketch below)
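
For the synchronous-library case, asyncio.to_thread() (Python 3.9+) can push a blocking call onto a worker thread so the event loop stays responsive. A minimal sketch, using time.sleep() as a stand-in for any blocking call:

import asyncio
import time

def blocking_call(n):
    time.sleep(1)  # Stand-in for a blocking library call
    return n * 2

async def main():
    # Each blocking call runs in a worker thread; the event loop stays free
    results = await asyncio.gather(
        asyncio.to_thread(blocking_call, 1),
        asyncio.to_thread(blocking_call, 2),
        asyncio.to_thread(blocking_call, 3),
    )
    print(results)  # [2, 4, 6] after ~1 second, not ~3

asyncio.run(main())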

11. Comparison with Threading

Feature     | asyncio          | Threading
----------- | ---------------- | ----------------------------------
Concurrency | Single-threaded  | Multi-threaded
Overhead    | Low              | Higher
CPU-bound   | ❌               | ❌ (limited by the GIL)
I/O-bound   | ✅               | ✅
Complexity  | Explicit (await) | Implicit (locks, race conditions)
Best for    | Network I/O      | Blocking I/O in synchronous libraries

Summary

✅ async/await enables non-blocking concurrent operations
✅ asyncio.gather() runs multiple coroutines concurrently
✅ Perfect for I/O-bound tasks (network, files, databases)
✅ Use aiohttp for async HTTP requests
✅ Use async with for async context managers
❌ Not suitable for CPU-intensive computations


Next Steps

In Module 23, you'll learn:

  • Working with REST APIs
  • The requests library
  • API authentication
  • Error handling and retries

Practice Exercises

  1. Create an async web scraper that fetches multiple URLs concurrently
  2. Build a rate-limited API client with concurrency control
  3. Implement an async file downloader with progress tracking
  4. Create an async queue-based task processor
  5. Build a WebSocket echo server using websockets

Challenge

Create an async web crawler that:

  • Starts from a seed URL
  • Extracts all links from pages
  • Crawls discovered pages (with depth limit)
  • Respects rate limiting (max concurrent requests)
  • Saves results to a database asynchronously
  • Handles errors gracefully