Module 22 - Async Programming
Asynchronous programming lets a single Python thread handle multiple operations concurrently without blocking: while one task waits on I/O, others make progress. That makes it ideal for I/O-bound work such as network requests, file operations, and database queries.
1. Understanding Async Programming
Synchronous vs Asynchronous
Synchronous (Blocking):
```python
import time

def task1():
    print("Task 1 starting...")
    time.sleep(2)  # Blocks the entire program
    print("Task 1 done!")

def task2():
    print("Task 2 starting...")
    time.sleep(2)
    print("Task 2 done!")

# Total time: 4 seconds
task1()
task2()
```
Asynchronous (Non-blocking):
```python
import asyncio

async def task1():
    print("Task 1 starting...")
    await asyncio.sleep(2)  # Yields control while waiting
    print("Task 1 done!")

async def task2():
    print("Task 2 starting...")
    await asyncio.sleep(2)
    print("Task 2 done!")

# Total time: ~2 seconds (runs concurrently)
async def main():
    await asyncio.gather(task1(), task2())

asyncio.run(main())
```
2. Core Concepts
2.1 Coroutines
Coroutines are functions defined with `async def`. Calling one does not run its body; it returns a coroutine object, which executes only when awaited or scheduled on the event loop.
```python
import asyncio

# Define a coroutine
async def greet(name):
    print(f"Hello, {name}!")
    await asyncio.sleep(1)
    print(f"Goodbye, {name}!")

# Run the coroutine
asyncio.run(greet("Alice"))
```
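Note that calling `greet("Alice")` by itself would not print anything; it only creates a coroutine object. A minimal sketch of this common pitfall:

```python
import asyncio

async def greet(name):
    print(f"Hello, {name}!")

coro = greet("Alice")  # Nothing prints yet: this only creates a coroutine object
print(type(coro))      # <class 'coroutine'>

asyncio.run(coro)      # Only now does the body run and print
```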
2.2 The await Keyword
`await` suspends the current coroutine until the awaitable completes, handing control back to the event loop so other tasks can run in the meantime.
```python
import asyncio

async def fetch_data():
    print("Fetching data...")
    await asyncio.sleep(2)  # Simulate I/O operation
    print("Data fetched!")
    return {"data": "result"}

async def main():
    result = await fetch_data()
    print(result)

asyncio.run(main())
```
2.3 asyncio.run()
`asyncio.run()` is the entry point for an async program: it creates an event loop, runs the given coroutine to completion, and closes the loop.
```python
import asyncio

async def main():
    print("Hello")
    await asyncio.sleep(1)
    print("World")

# Run the main coroutine
asyncio.run(main())
```
3. Running Multiple Coroutines
3.1 asyncio.gather()
Run multiple coroutines concurrently and wait for all to complete.
```python
import asyncio

async def task(name, delay):
    print(f"{name} starting")
    await asyncio.sleep(delay)
    print(f"{name} done after {delay}s")
    return f"{name} result"

async def main():
    results = await asyncio.gather(
        task("Task 1", 2),
        task("Task 2", 1),
        task("Task 3", 3)
    )
    print(f"Results: {results}")

asyncio.run(main())

# Output:
# Task 1 starting
# Task 2 starting
# Task 3 starting
# Task 2 done after 1s
# Task 1 done after 2s
# Task 3 done after 3s
# Results: ['Task 1 result', 'Task 2 result', 'Task 3 result']
```
3.2 asyncio.create_task()
`asyncio.create_task()` wraps a coroutine in a Task and schedules it on the event loop right away, so the tasks below run concurrently even though they are awaited one at a time.
```python
import asyncio

async def task(name, delay):
    await asyncio.sleep(delay)
    print(f"{name} done")
    return name

async def main():
    # Tasks start running as soon as they are created
    task1 = asyncio.create_task(task("Task 1", 2))
    task2 = asyncio.create_task(task("Task 2", 1))
    task3 = asyncio.create_task(task("Task 3", 3))

    # Awaiting them one by one still takes only ~3s (the longest delay),
    # because all three are already running concurrently
    result1 = await task1
    result2 = await task2
    result3 = await task3
    print(f"Results: {result1}, {result2}, {result3}")

asyncio.run(main())
```
3.3 asyncio.wait()
Wait for multiple tasks with more control: `asyncio.wait()` returns two sets, `done` and `pending`, and its `return_when` argument (e.g. `FIRST_COMPLETED`) controls when it comes back.
```python
import asyncio

async def task(name, delay):
    await asyncio.sleep(delay)
    return f"{name} done"

async def main():
    tasks = [
        asyncio.create_task(task("Task 1", 2)),
        asyncio.create_task(task("Task 2", 1)),
        asyncio.create_task(task("Task 3", 3))
    ]
    # Wait for all to complete
    done, pending = await asyncio.wait(tasks)
    for t in done:  # 'done' is an unordered set of finished tasks
        print(t.result())

asyncio.run(main())
```
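If you want to process results as soon as each one finishes rather than waiting for all of them, `asyncio.as_completed()` is a handy alternative. A minimal sketch:

```python
import asyncio

async def task(name, delay):
    await asyncio.sleep(delay)
    return f"{name} done"

async def main():
    coros = [task("Task 1", 2), task("Task 2", 1), task("Task 3", 3)]
    # Yields awaitables in completion order, not submission order
    for finished in asyncio.as_completed(coros):
        result = await finished
        print(result)  # Task 2 done, Task 1 done, Task 3 done

asyncio.run(main())
```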
4. Async with HTTP Requests
Using aiohttp
```bash
pip install aiohttp
```
```python
import asyncio
import aiohttp

async def fetch_url(session, url):
    """Fetch a single URL"""
    async with session.get(url) as response:
        return await response.text()

async def fetch_multiple_urls(urls):
    """Fetch multiple URLs concurrently"""
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        return results

async def main():
    urls = [
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/2",
        "https://httpbin.org/delay/1"
    ]
    print("Fetching URLs...")
    results = await fetch_multiple_urls(urls)
    print(f"Fetched {len(results)} URLs")

asyncio.run(main())
# Total time: ~2 seconds (not 1+2+1=4 seconds!)
```
Real-World Example: API Scraper
```python
import asyncio
import aiohttp
from typing import List, Dict

async def fetch_user(session: aiohttp.ClientSession, user_id: int) -> Dict:
    """Fetch user data from API"""
    url = f"https://jsonplaceholder.typicode.com/users/{user_id}"
    async with session.get(url) as response:
        return await response.json()

async def fetch_all_users(user_ids: List[int]) -> List[Dict]:
    """Fetch multiple users concurrently"""
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_user(session, uid) for uid in user_ids]
        return await asyncio.gather(*tasks)

async def main():
    user_ids = [1, 2, 3, 4, 5]
    users = await fetch_all_users(user_ids)
    for user in users:
        print(f"User: {user['name']} ({user['email']})")

asyncio.run(main())
```
5. Async Context Managers
Use `async with` for async context managers, whose setup and teardown steps can themselves await I/O.
```python
import asyncio
import aiofiles  # pip install aiofiles

async def read_file(filename):
    """Read file asynchronously"""
    async with aiofiles.open(filename, 'r') as file:
        content = await file.read()
        return content

async def write_file(filename, content):
    """Write file asynchronously"""
    async with aiofiles.open(filename, 'w') as file:
        await file.write(content)

async def main():
    await write_file('test.txt', 'Hello, Async World!')
    content = await read_file('test.txt')
    print(content)

asyncio.run(main())
```
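You can also define your own async context managers, either with `__aenter__`/`__aexit__` or, more conveniently, with `contextlib.asynccontextmanager`. A minimal sketch (the `timer` name is just for illustration):

```python
import asyncio
import time
from contextlib import asynccontextmanager

@asynccontextmanager
async def timer(label):
    """Measure how long the 'async with' body takes."""
    start = time.perf_counter()
    try:
        yield  # Control passes to the body of 'async with'
    finally:
        elapsed = time.perf_counter() - start
        print(f"{label}: {elapsed:.2f}s")

async def main():
    async with timer("sleep block"):
        await asyncio.sleep(1)

asyncio.run(main())
```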
6. Async Iterators
Using async for
```python
import asyncio

class AsyncRange:
    def __init__(self, start, end):
        self.start = start
        self.end = end

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self.start >= self.end:
            raise StopAsyncIteration
        await asyncio.sleep(0.1)  # Simulate async operation
        self.start += 1
        return self.start - 1

async def main():
    async for i in AsyncRange(0, 5):
        print(i)

asyncio.run(main())
```
Async Generators
```python
import asyncio

async def async_generator(n):
    """Generate numbers asynchronously"""
    for i in range(n):
        await asyncio.sleep(0.5)
        yield i

async def main():
    async for value in async_generator(5):
        print(f"Received: {value}")

asyncio.run(main())
```
7. Error Handling
Handling Exceptions
```python
import asyncio

async def risky_task(name, should_fail=False):
    await asyncio.sleep(1)
    if should_fail:
        raise ValueError(f"{name} failed!")
    return f"{name} succeeded"

async def main():
    try:
        results = await asyncio.gather(
            risky_task("Task 1", False),
            risky_task("Task 2", True),
            risky_task("Task 3", False),
            return_exceptions=True  # Return exceptions instead of raising
        )
        for i, result in enumerate(results, 1):
            if isinstance(result, Exception):
                print(f"Task {i} error: {result}")
            else:
                print(f"Task {i}: {result}")
    except Exception as e:
        print(f"Error: {e}")

asyncio.run(main())
```
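On Python 3.11 and later, `asyncio.TaskGroup` offers structured error handling: if any task fails, the remaining tasks are cancelled and the failures are raised together as an `ExceptionGroup`. A sketch, assuming Python 3.11+:

```python
import asyncio

async def risky_task(name, should_fail=False):
    await asyncio.sleep(1)
    if should_fail:
        raise ValueError(f"{name} failed!")
    return f"{name} succeeded"

async def main():
    try:
        # Requires Python 3.11+
        async with asyncio.TaskGroup() as tg:
            tg.create_task(risky_task("Task 1"))
            tg.create_task(risky_task("Task 2", should_fail=True))
    except* ValueError as eg:  # except* unpacks the ExceptionGroup
        for exc in eg.exceptions:
            print(f"Caught: {exc}")

asyncio.run(main())
```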
Timeout Control
```python
import asyncio

async def long_task():
    await asyncio.sleep(5)
    return "Done"

async def main():
    try:
        result = await asyncio.wait_for(long_task(), timeout=2.0)
        print(result)
    except asyncio.TimeoutError:
        print("Task timed out!")

asyncio.run(main())
```
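Python 3.11 also added `asyncio.timeout()`, an async context manager that achieves the same thing and reads more naturally when several awaits share one deadline. A sketch, assuming Python 3.11+:

```python
import asyncio

async def long_task():
    await asyncio.sleep(5)
    return "Done"

async def main():
    try:
        async with asyncio.timeout(2.0):  # Python 3.11+
            result = await long_task()
            print(result)
    except TimeoutError:  # asyncio.TimeoutError is an alias of this since 3.11
        print("Task timed out!")

asyncio.run(main())
```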
8. Practical Examples
8.1 Concurrent File Processing
```python
import asyncio
import aiofiles
from pathlib import Path

async def process_file(filepath):
    """Process a single file"""
    async with aiofiles.open(filepath, 'r') as file:
        content = await file.read()
        word_count = len(content.split())
        return filepath.name, word_count

async def process_directory(directory):
    """Process all text files in directory"""
    files = Path(directory).glob('*.txt')
    tasks = [process_file(f) for f in files]
    results = await asyncio.gather(*tasks)
    print("File Analysis:")
    for filename, count in results:
        print(f"  {filename}: {count} words")

# asyncio.run(process_directory('.'))
```
8.2 Rate-Limited API Calls
```python
import asyncio
import aiohttp
from asyncio import Semaphore

async def fetch_with_limit(session, url, semaphore):
    """Fetch URL with concurrency limit"""
    async with semaphore:
        print(f"Fetching {url}")
        async with session.get(url) as response:
            return await response.text()

async def main():
    urls = ["https://httpbin.org/delay/1" for _ in range(10)]
    # Limit to 3 concurrent requests
    semaphore = Semaphore(3)
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_with_limit(session, url, semaphore) for url in urls]
        results = await asyncio.gather(*tasks)
        print(f"Fetched {len(results)} URLs")

asyncio.run(main())
```
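A Semaphore is the most general tool, but when all requests go through one aiohttp session, the connection pool itself can enforce the cap. A sketch using `aiohttp.TCPConnector(limit=...)`, reusing the httpbin URL from above:

```python
import asyncio
import aiohttp

async def main():
    # At most 3 TCP connections are open at any one time
    connector = aiohttp.TCPConnector(limit=3)
    async with aiohttp.ClientSession(connector=connector) as session:
        async def fetch(url):
            async with session.get(url) as response:
                return await response.text()

        urls = ["https://httpbin.org/delay/1" for _ in range(10)]
        results = await asyncio.gather(*(fetch(u) for u in urls))
        print(f"Fetched {len(results)} URLs")

asyncio.run(main())
```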
8.3 WebSocket Client
```python
import asyncio
import websockets  # pip install websockets

async def listen_to_websocket():
    """Connect to WebSocket and listen for messages"""
    uri = "wss://echo.websocket.org"
    async with websockets.connect(uri) as websocket:
        # Send message
        await websocket.send("Hello, Server!")
        # Receive response
        response = await websocket.recv()
        print(f"Received: {response}")

asyncio.run(listen_to_websocket())
```
9. Async Queues
`asyncio.Queue` lets producers and consumers hand items to each other without explicit locks: `await queue.put()` blocks when a bounded queue is full, and `await queue.get()` blocks when it is empty.
```python
import asyncio
from asyncio import Queue

async def producer(queue, n):
    """Produce items"""
    for i in range(n):
        await asyncio.sleep(0.5)
        await queue.put(i)
        print(f"Produced: {i}")
    await queue.put(None)  # Sentinel to signal completion

async def consumer(queue):
    """Consume items"""
    while True:
        item = await queue.get()
        if item is None:
            break
        await asyncio.sleep(0.3)
        print(f"Consumed: {item}")
        queue.task_done()

async def main():
    queue = Queue()
    await asyncio.gather(
        producer(queue, 5),
        consumer(queue)
    )

asyncio.run(main())
```
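The None sentinel works for one consumer but scales awkwardly to several. An alternative sketch using `queue.join()` with two workers (names like worker-0 are just for illustration):

```python
import asyncio

async def worker(name, queue):
    while True:
        item = await queue.get()
        await asyncio.sleep(0.3)  # Simulate processing
        print(f"{name} consumed: {item}")
        queue.task_done()         # Mark the item as processed

async def main():
    queue = asyncio.Queue()
    for i in range(5):
        queue.put_nowait(i)

    workers = [asyncio.create_task(worker(f"worker-{n}", queue)) for n in range(2)]

    await queue.join()  # Blocks until every item has been matched by task_done()
    for w in workers:
        w.cancel()      # Workers loop forever, so cancel them
    await asyncio.gather(*workers, return_exceptions=True)  # Swallow the CancelledErrors

asyncio.run(main())
```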
10. When to Use Async
Good Use Cases ✅
- I/O-bound operations: Network requests, file operations, database queries
- Web scraping: Fetching multiple URLs concurrently
- API clients: Making multiple API calls
- WebSocket connections: Real-time communication
- Concurrent database queries
Not Suitable ❌
- CPU-bound operations: Heavy calculations (use multiprocessing instead)
- Simple scripts: Overhead not worth it
- Synchronous libraries: blocking calls from sync-only libraries stall the event loop (offload them to a thread, as shown below)
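For that last case there is an escape hatch: `asyncio.to_thread()` (Python 3.9+) runs a blocking call in a worker thread so the event loop stays responsive. A minimal sketch with `time.sleep` standing in for any sync library call:

```python
import asyncio
import time

def blocking_io():
    """Stand-in for a synchronous library call (e.g., a blocking HTTP client)."""
    time.sleep(1)
    return "blocking result"

async def main():
    # blocking_io runs in a thread pool; other async work proceeds concurrently
    result, _ = await asyncio.gather(
        asyncio.to_thread(blocking_io),
        asyncio.sleep(1),
    )
    print(result)

asyncio.run(main())
```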
11. Comparison with Threading
| Feature | Asyncio | Threading |
|---|---|---|
| Concurrency | Single-threaded, cooperative | Multi-threaded, preemptive |
| Overhead | Low | Higher |
| CPU-bound | ❌ | ❌ (limited by the GIL; use multiprocessing) |
| I/O-bound | ✅✅ | ✅ |
| Complexity | Explicit switch points (await) | Implicit switches (locks, race conditions) |
| Best for | High-concurrency network I/O | Blocking I/O with sync-only libraries |
Summary
✅ async/await enables non-blocking concurrent operations
✅ asyncio.gather() runs multiple coroutines concurrently
✅ Perfect for I/O-bound tasks (network, files, databases)
✅ Use aiohttp for async HTTP requests
✅ async with for async context managers
✅ Not suitable for CPU-intensive computations
Next Steps
In Module 23, you'll learn:
- Working with REST APIs
- The `requests` library
- API authentication
- Error handling and retries
Practice Exercises
- Create an async web scraper that fetches multiple URLs concurrently
- Build a rate-limited API client with concurrency control
- Implement an async file downloader with progress tracking
- Create an async queue-based task processor
- Build a WebSocket echo server using `websockets`
Bonus challenge: create an async web crawler that:
- Starts from a seed URL
- Extracts all links from pages
- Crawls discovered pages (with depth limit)
- Respects rate limiting (max concurrent requests)
- Saves results to a database asynchronously
- Handles errors gracefully