Asyncio Taskgroup Vs Gather Error Handling
The first time a TaskGroup blew up in my face, I was migrating a scraper that had been happily running on asyncio.gather(*tasks, return_exceptions=True) for two years. One swapped import and a 3.11 upgrade later, a single 404 from one of forty URLs tore down the whole batch, surfaced as an ExceptionGroup I did not know how to catch, and left me staring at a stack trace at 11pm wondering when gather had become the "safe" one. It was not safe. It had been silently eating timeouts the entire time, and I only noticed because TaskGroup finally refused to lie to me.
This article is the walkthrough I wish I had that night. We will build a small Python 3.11+ project with asyncio, httpx, and a deliberately flaky fetcher, then run the same workload through gather and TaskGroup side by side so you can watch the error semantics diverge in real time. You will end up with a runnable repo, a clear mental model of ExceptionGroup and except*, and a finished refactor of a real concurrent HTTP fetcher that moves from gather(return_exceptions=True) to a TaskGroup with bounded concurrency, retries, and clean cancellation. The rule worth screenshotting: gather hides failures by default and TaskGroup cancels everything by default, and neither one is wrong until you decide which lie you can live with.
If you write async Python for production work — fetchers, workers, fan-out RPC, anything that touches more than one coroutine at a time — by the end you will know exactly which API to reach for, how to handle partial-success workflows without silently dropping errors, and how to tear down resources cleanly when half your tasks have already been cancelled mid-flight.
Step 1: Scaffolding an Async Fetcher Baseline We Can Safely Break
Before we can have a meaningful argument about asyncio.gather versus asyncio.TaskGroup, we need a tiny piece of async I/O whose behavior is boring and predictable. The whole point of the article is what happens when concurrent tasks misbehave — slow responses, HTTP errors, cancellation — so the baseline has to be a single well-understood function we trust completely. If the fetcher itself is suspect, every later experiment becomes ambiguous.
This step lays that foundation. We pick a Python version that gives us TaskGroup and except* (3.11+), set up an isolated environment, add the smallest dependency footprint that makes the experiment honest (httpx plus pytest), and write a fetch_url coroutine with three knobs: success, HTTP failure, and an artificial delay. No concurrency primitives appear yet — that arrives in step 2.
Setup
The project is a standard src/ layout package named asyncfetch, pinned to Python 3.11+ so asyncio.TaskGroup and the except* syntax are available natively without third-party backports. Dependencies stay deliberately minimal: httpx for an async HTTP client that ships a built-in mock transport (perfect for deterministic tests), and pytest plus pytest-asyncio on the dev side for async def test functions.
The layout after this step looks like this:
codebase/
├── pyproject.toml
├── README.md
├── src/
│ └── asyncfetch/
│ ├── __init__.py
│ └── fetcher.py
└── tests/
├── __init__.py
├── conftest.py
└── test_fetcher.py
The pyproject.toml declares the package and its runtime + dev extras, and configures pytest-asyncio in auto mode so we don't have to decorate every test:
[project]
name = "asyncfetch"
version = "0.1.0"
description = "Companion code for the asyncio TaskGroup vs gather error-handling tutorial."
readme = "README.md"
requires-python = ">=3.11"
dependencies = [
"httpx>=0.27",
]
[project.optional-dependencies]
dev = [
"pytest>=8.0",
"pytest-asyncio>=0.23",
]
[tool.pytest.ini_options]
asyncio_mode = "auto"
testpaths = ["tests"]
addopts = "-ra -q"
Two choices here deserve a quick note. requires-python = ">=3.11" is non-negotiable for the rest of the series, because everything we are going to demonstrate about structured concurrency only ships in 3.11 and later. And asyncio_mode = "auto" removes a per-test decorator that would otherwise add noise to every example without teaching the reader anything about asyncio.
Implementation
The whole fetcher is a single coroutine plus a small immutable result type. The result is a frozen dataclass with slots=True so equality is field-based, the object is hashable, and we get a small memory footprint when we hold many of them in a list later.
from __future__ import annotations
import asyncio
from dataclasses import dataclass
import httpx
@dataclass(frozen=True, slots=True)
class FetchResult:
url: str
status: int
body: str
fetch_url itself takes an httpx.AsyncClient rather than constructing one internally. That separation matters because every later step will fan out many concurrent calls against the same client, and httpx pools connections per client — creating a new client per call would silently change the performance shape of every experiment.
async def fetch_url(
client: httpx.AsyncClient,
url: str,
*,
delay: float = 0.0,
) -> FetchResult:
if delay > 0:
await asyncio.sleep(delay)
response = await client.get(url)
response.raise_for_status()
return FetchResult(url=url, status=response.status_code, body=response.text)
Three behaviors are baked in deliberately. The optional delay lets us simulate slow endpoints without standing up a real server, which is how step 4 will demonstrate cancellation propagation. response.raise_for_status() converts any 4xx/5xx into an httpx.HTTPStatusError, giving us a real exception type to wrap with gather and TaskGroup. And the absence of any try/except inside this function is intentional — we want the caller to decide what to do with failures, which is the whole subject of the article.
The package's __init__.py re-exports the public surface so tests and later modules can from asyncfetch import fetch_url, FetchResult:
from asyncfetch.fetcher import FetchResult, fetch_url
__all__ = ["FetchResult", "fetch_url"]
Tests use httpx's MockTransport rather than a network or a fake HTTP server, because the article is about asyncio semantics, not about networking. A small fixture in conftest.py produces a fresh client wired to whatever handler the test wants:
from __future__ import annotations
from collections.abc import Callable
import httpx
import pytest
@pytest.fixture
def make_client() -> Callable[[Callable[[httpx.Request], httpx.Response]], httpx.AsyncClient]:
def _factory(handler: Callable[[httpx.Request], httpx.Response]) -> httpx.AsyncClient:
transport = httpx.MockTransport(handler)
return httpx.AsyncClient(transport=transport, base_url="https://example.test")
return _factory
The three tests then cover the three behaviors we just shipped — success, HTTP failure, and that the delay actually delays. We do not assert anything about concurrency yet, because there isn't any.
from __future__ import annotations
import time
import httpx
import pytest
from asyncfetch import FetchResult, fetch_url
def _ok_handler(request: httpx.Request) -> httpx.Response:
return httpx.Response(200, text=f"hello {request.url.path}")
def _server_error_handler(_: httpx.Request) -> httpx.Response:
return httpx.Response(503, text="busy")
async def test_fetch_url_returns_status_and_body(make_client):
async with make_client(_ok_handler) as client:
result = await fetch_url(client, "/ping")
assert isinstance(result, FetchResult)
assert result.status == 200
assert result.body == "hello /ping"
assert result.url == "/ping"
async def test_fetch_url_raises_on_http_error(make_client):
async with make_client(_server_error_handler) as client:
with pytest.raises(httpx.HTTPStatusError):
await fetch_url(client, "/oops")
async def test_fetch_url_honours_delay(make_client):
async with make_client(_ok_handler) as client:
start = time.perf_counter()
await fetch_url(client, "/slow", delay=0.05)
elapsed = time.perf_counter() - start
assert elapsed >= 0.05
The delay test uses a small 50 ms window — large enough that scheduling jitter on a loaded CI box will not flip the assertion, small enough that the whole suite still runs in under a tenth of a second.
Verification
Run the suite from the project root inside the activated virtualenv:
python -m pytest
... [100%]
3 passed in 0.08s
Three dots, three green tests, no warnings. The fetcher does what its name claims, the HTTP failure surfaces as an exception type we will need later, and the delay knob measurably moves the clock — that's the entire contract we need for the rest of the article.
What we built
We now have a runnable Python 3.11+ package with a single coroutine, a result dataclass, and a three-test suite that asserts the coroutine's full behavior. The project is small enough to read in one sitting and every dependency earns its place — httpx for the async client and its mock transport, pytest plus pytest-asyncio so tests can be async def without ceremony.
The fetcher's deliberate omissions are as important as what it includes. There is no retry loop, no internal try/except, no timeout wrapper, and no concurrency primitive. Every one of those is a knob we want the caller — gather or TaskGroup — to control, because the entire point of the next six steps is to compare how those two callers behave when things go wrong.
What this unlocks is the ability to write step 2 without ambiguity. When asyncio.gather swallows a sibling failure or asyncio.TaskGroup raises an ExceptionGroup, we will know the difference is in the orchestration layer and not in the coroutine being orchestrated. That separation is the whole reason this baseline exists.
The next step keeps the fetcher untouched and finally introduces concurrency: a small driver that runs several fetch_url calls in parallel through asyncio.gather, first with default behavior and then with return_exceptions=True, so we can watch exactly what happens to sibling tasks when one of them fails.
Repository
The state of the code after this step: 5c5c8ac
Step 2: Driving the Fetcher with asyncio.gather and Watching Siblings Survive Failure
Step 1 left us with a single coroutine and three green tests, but no concurrency at all. To talk honestly about asyncio.gather versus asyncio.TaskGroup, we first need a working gather reference implementation that we can poke at, measure, and contrast in later steps. This step adds exactly that: a small driver module on top of the existing fetch_url, plus a focused test suite that documents — line by line — what the two flavors of gather actually do when one task fails.
The interesting result is that none of gather's behaviors are obvious. Default mode re-raises the first exception but lets the sibling coroutines keep running in the background. Adding return_exceptions=True swaps the contract entirely: nothing is raised, the result list mirrors the input order, and exceptions ride alongside successful values. We will encode both shapes in code and lock them down with assertions so step 3 can confidently compare them against TaskGroup.
Setup
No new dependencies. We add one module and one test file to the package layout from step 1:
codebase/
├── src/
│ └── asyncfetch/
│ ├── __init__.py # re-export the new helpers
│ ├── fetcher.py # unchanged from step 1
│ └── gather_baseline.py # NEW
└── tests/
├── conftest.py # unchanged from step 1
├── test_fetcher.py # unchanged from step 1
└── test_gather_baseline.py # NEW
The new module imports fetch_url and FetchResult from the step-1 fetcher and nothing else from our own package — it is purely an orchestration layer. The new test file leans on the existing make_client fixture from conftest.py, so we do not need to reconfigure pytest-asyncio or stand up any new fixtures.
The only project surface change is src/asyncfetch/__init__.py, which now re-exports the two new helpers so tests and downstream code can from asyncfetch import fetch_all_strict, fetch_all_collecting:
from asyncfetch.fetcher import FetchResult, fetch_url
from asyncfetch.gather_baseline import fetch_all_collecting, fetch_all_strict
__all__ = [
"FetchResult",
"fetch_all_collecting",
"fetch_all_strict",
"fetch_url",
]
Implementation
The driver is two thin coroutines. The first, fetch_all_strict, is the simplest faithful translation of "fan out N requests, fail loudly on the first error":
from __future__ import annotations
import asyncio
from collections.abc import Iterable
from typing import cast
import httpx
from asyncfetch.fetcher import FetchResult, fetch_url
async def fetch_all_strict(
client: httpx.AsyncClient,
urls: Iterable[str],
) -> list[FetchResult]:
coros = [fetch_url(client, url) for url in urls]
results = await asyncio.gather(*coros)
return cast(list[FetchResult], results)
Two design choices deserve a callout. The function builds a concrete list of coroutines before calling gather so the input order is committed before anything is scheduled — gather guarantees the result list matches that order regardless of which coroutine finishes first. And the cast is honest about a subtle typing truth: asyncio.gather is typed as returning list[Any] because it has variadic awaitable inputs, so we narrow it back to list[FetchResult] at the boundary instead of forcing every caller to apologise.
The second helper, fetch_all_collecting, is the return_exceptions=True shape — the one most real codebases reach for when they want "fan out N, collect everything, sort it out at the end":
async def fetch_all_collecting(
client: httpx.AsyncClient,
urls: Iterable[str],
) -> list[FetchResult | BaseException]:
coros = [fetch_url(client, url) for url in urls]
results = await asyncio.gather(*coros, return_exceptions=True)
return cast(list[FetchResult | BaseException], results)
The return type is the load-bearing piece here. With return_exceptions=True, gather never raises from the awaited call — it returns a list where each slot is either a real result or the exception that the corresponding coroutine raised. Encoding that as list[FetchResult | BaseException] forces every caller to acknowledge the mixed shape at the type-checker level, which is the safety net the strict variant lacks. Keeping the two helpers as separate functions, rather than one with a boolean flag, means the return type changes with the contract instead of hiding behind an overload.
The new test file documents five behaviors. The first three are about the driver functions themselves; the last two are pure asyncio-semantics tests that pin down the part of gather most readers get wrong.
async def test_fetch_all_strict_returns_results_in_input_order(make_client):
handler = _route(
{
"/a": httpx.Response(200, text="A"),
"/b": httpx.Response(200, text="B"),
"/c": httpx.Response(200, text="C"),
}
)
async with make_client(handler) as client:
results = await fetch_all_strict(client, ["/a", "/b", "/c"])
assert [r.body for r in results] == ["A", "B", "C"]
assert all(isinstance(r, FetchResult) for r in results)
The ordering assertion matters because httpx's MockTransport answers synchronously, so without gather's ordering guarantee the results could come back in any sequence. We are codifying that the public contract of fetch_all_strict is "positional", not "first-finished-first".
async def test_fetch_all_strict_raises_when_any_url_fails(make_client):
handler = _route(
{
"/ok": httpx.Response(200, text="ok"),
"/boom": httpx.Response(500, text="boom"),
}
)
async with make_client(handler) as client:
with pytest.raises(httpx.HTTPStatusError):
await fetch_all_strict(client, ["/ok", "/boom"])
gather re-raises the first exception that occurs, and because fetch_url calls response.raise_for_status(), that exception is an httpx.HTTPStatusError. The test does not assert which sibling won the race — only that the failure surfaces, which is the entire point of strict mode.
async def test_fetch_all_collecting_returns_results_and_exceptions_side_by_side(make_client):
handler = _route(
{
"/ok": httpx.Response(200, text="ok"),
"/boom": httpx.Response(500, text="boom"),
}
)
async with make_client(handler) as client:
results = await fetch_all_collecting(client, ["/ok", "/boom"])
assert len(results) == 2
first, second = results
assert isinstance(first, FetchResult)
assert first.body == "ok"
assert isinstance(second, httpx.HTTPStatusError)
This is the contract that makes return_exceptions=True worth its weight: the success and the failure end up at the same positional indices as the input urls. A caller who knows the urls also knows, for free, which one failed. We will revisit this property in step 3 when comparing it to TaskGroup's ExceptionGroup, which trades positional clarity for type-safe except* matching.
The remaining two tests do not exercise our driver functions at all. They instead document the asyncio behaviors the article is built around, using inline coroutines so the assertions read like an executable footnote:
async def test_gather_default_does_not_cancel_sibling_tasks():
completed: list[str] = []
async def succeed_slow() -> None:
await asyncio.sleep(0.05)
completed.append("slow")
async def fail_fast() -> None:
await asyncio.sleep(0.01)
raise RuntimeError("boom")
with pytest.raises(RuntimeError, match="boom"):
await asyncio.gather(succeed_slow(), fail_fast())
await asyncio.sleep(0.1)
assert completed == ["slow"]
This is the single most surprising fact about default gather. When fail_fast raises, gather re-raises immediately, but succeed_slow is not cancelled — it keeps running on the event loop in the background, and 100 ms later it has happily appended "slow" to the list. That orphaned-sibling behavior is the headline TaskGroup was designed to fix in PEP 654 / 3.11, and we will compare it directly in step 3.
async def test_gather_return_exceptions_waits_for_all_tasks():
finished: list[str] = []
async def slow_ok() -> str:
await asyncio.sleep(0.02)
finished.append("slow_ok")
return "ok"
async def quick_fail() -> str:
await asyncio.sleep(0.005)
finished.append("quick_fail")
raise ValueError("nope")
results = await asyncio.gather(slow_ok(), quick_fail(), return_exceptions=True)
assert finished == ["quick_fail", "slow_ok"]
assert results[0] == "ok"
assert isinstance(results[1], ValueError)
With return_exceptions=True, gather waits for every coroutine to settle. The ordering of finished proves that quick_fail raised first but slow_ok was given the chance to finish, and the result list keeps the input order, not the completion order. Those two invariants — wait for everyone, preserve input order — are exactly what makes the collecting variant safe to use in batch jobs.
Verification
Run the full suite from the codebase root:
python -m pytest
........ [100%]
8 passed in 0.18s
Eight dots, eight green tests, no warnings. Three of them are the step-1 fetcher tests still passing untouched, and five are the new behaviors we just shipped: input-order preservation, strict failure propagation, side-by-side result/exception layout, the orphaned-sibling surprise, and the wait-for-everyone guarantee.
What we built
We now have a documented gather baseline: two thin driver functions plus a five-test contract that pins their behavior to specific, demonstrable asyncio semantics. The strict variant fails loud on the first error, and the collecting variant returns a positionally-aligned list where each slot is either a FetchResult or the exception that fetch raised.
More importantly, we wrote down — in executable form — the two facts most async tutorials gloss over. Default gather does not cancel siblings on failure: it re-raises the first exception while orphan tasks continue to run on the loop, which is the precise foot-gun structured concurrency was invented to remove. And return_exceptions=True does not change that scheduling shape, it only changes the reporting shape, swapping a raised exception for a typed slot in the result list.
The deliberate omissions matter too. There is no timeout, no per-task retry, no cancellation token, and no logging hook in either helper. Every one of those concerns becomes a clean contrast point against TaskGroup in step 3, where the cancellation-on-first-failure behavior is built into the orchestrator instead of bolted onto the caller.
The next step keeps the test surface and rewrites fetch_all_strict on top of asyncio.TaskGroup. We will reuse the same fetcher and the same mock transport, so any behavioral delta between the two implementations is unambiguously a property of the orchestration primitive — which is the whole reason this baseline exists.
Repository
The state of the code after this step: 6e9001e
Step 3: Rebuilding the Driver on asyncio.TaskGroup and Letting Structured Concurrency Cancel the Siblings
Step 2 left us with a working gather baseline and a five-test contract that documents exactly how asyncio.gather behaves when one task fails — including the surprising fact that orphan siblings keep running on the loop. This step keeps the fetcher and the mock transport untouched and rewrites the driver on top of asyncio.TaskGroup, the structured-concurrency primitive that shipped in Python 3.11. The point is not to win a benchmark; it is to pin down, in executable code, the two behavioral differences that matter when something goes wrong.
Those differences are the headlines we have been building toward. A TaskGroup cancels its sibling tasks the moment any member raises, and it reports failures as an ExceptionGroup so callers can pattern-match on type with except*. Both shapes change how higher-level code reasons about partial failure, and we are going to write tests that lock each shape down before step 4 builds on them.
Setup
No new third-party dependencies — TaskGroup and ExceptionGroup are stdlib in Python 3.11+, which is exactly why step 1 pinned requires-python = ">=3.11". We add one module and one test file alongside the gather baseline:
codebase/
├── src/
│ └── asyncfetch/
│ ├── __init__.py # re-export the new helpers
│ ├── fetcher.py # unchanged from step 1
│ ├── gather_baseline.py # unchanged from step 2
│ └── taskgroup_baseline.py # NEW
└── tests/
├── conftest.py # unchanged from step 1
├── test_fetcher.py # unchanged from step 1
├── test_gather_baseline.py # unchanged from step 2
└── test_taskgroup_baseline.py # NEW
The new tests reuse the make_client fixture from conftest.py, so there is no fixture work to do. The only project-surface change is src/asyncfetch/__init__.py, which now re-exports the two new helpers so callers can from asyncfetch import fetch_all_taskgroup, fetch_all_taskgroup_tracking:
from asyncfetch.fetcher import FetchResult, fetch_url
from asyncfetch.gather_baseline import fetch_all_collecting, fetch_all_strict
from asyncfetch.taskgroup_baseline import (
fetch_all_taskgroup,
fetch_all_taskgroup_tracking,
)
__all__ = [
"FetchResult",
"fetch_all_collecting",
"fetch_all_strict",
"fetch_all_taskgroup",
"fetch_all_taskgroup_tracking",
"fetch_url",
]
Implementation
The first helper, fetch_all_taskgroup, is the structured-concurrency equivalent of step 2's fetch_all_strict. It owns a TaskGroup context manager, schedules one task per url, and lets the group's __aexit__ do the waiting and the failure handling:
from __future__ import annotations
import asyncio
from collections.abc import Iterable
import httpx
from asyncfetch.fetcher import FetchResult, fetch_url
async def fetch_all_taskgroup(
client: httpx.AsyncClient,
urls: Iterable[str],
) -> list[FetchResult]:
url_list = list(urls)
async with asyncio.TaskGroup() as tg:
tasks = [tg.create_task(fetch_url(client, url)) for url in url_list]
return [task.result() for task in tasks]
Three details deserve a callout. We materialize urls into url_list before scheduling so input order is committed before any task starts, then keep the Task objects from tg.create_task in a list of the same length. The async with block waits for every task to finish before returning, which means by the time we reach task.result() either all tasks have succeeded or the block has already raised an ExceptionGroup and we never get here. There is no return type honesty hack: tg.create_task is generic over the coroutine's return type, so tasks is precisely list[asyncio.Task[FetchResult]] and the comprehension returns list[FetchResult] without a cast.
The second helper, fetch_all_taskgroup_tracking, layers a tiny piece of observability on top. It records the urls whose tasks were cancelled mid-flight so we can later prove — not just hand-wave — that TaskGroup really did interrupt the siblings:
async def fetch_all_taskgroup_tracking(
client: httpx.AsyncClient,
urls: Iterable[str],
cancelled: list[str],
) -> list[FetchResult]:
url_list = list(urls)
async def _wrapped(url: str) -> FetchResult:
try:
return await fetch_url(client, url)
except asyncio.CancelledError:
cancelled.append(url)
raise
async with asyncio.TaskGroup() as tg:
tasks = [tg.create_task(_wrapped(url)) for url in url_list]
return [task.result() for task in tasks]
The shape here is deliberate. The inner _wrapped catches asyncio.CancelledError, records the url, and then re-raises the cancellation so the TaskGroup continues to honor its contract — swallowing the cancellation would corrupt structured concurrency by leaving the loop with tasks the group thinks it has cancelled. The cancelled list is supplied by the caller rather than returned, because the function does not return when cancellation happens — it raises an ExceptionGroup — and we still need the observation to survive that raise.
The new test file pins down five behaviors. The first two are about the driver, the next two are pure TaskGroup semantics, and the last verifies that cancellation actually reaches the inner coroutines.
async def test_fetch_all_taskgroup_returns_results_in_input_order(make_client):
handler = _route(
{
"/a": httpx.Response(200, text="A"),
"/b": httpx.Response(200, text="B"),
"/c": httpx.Response(200, text="C"),
}
)
async with make_client(handler) as client:
results = await fetch_all_taskgroup(client, ["/a", "/b", "/c"])
assert [r.body for r in results] == ["A", "B", "C"]
assert all(isinstance(r, FetchResult) for r in results)
Order is preserved because we read results back in the same order we built the task list, not in completion order. This makes the driver a drop-in shape match for fetch_all_strict from step 2 — same input, same output, different orchestrator.
async def test_fetch_all_taskgroup_raises_exception_group_on_failure(make_client):
handler = _route(
{
"/ok": httpx.Response(200, text="ok"),
"/boom": httpx.Response(500, text="boom"),
}
)
async with make_client(handler) as client:
with pytest.raises(ExceptionGroup) as exc_info:
await fetch_all_taskgroup(client, ["/ok", "/boom"])
inner = exc_info.value.exceptions
assert len(inner) == 1
assert isinstance(inner[0], httpx.HTTPStatusError)
The contrast with step 2 is the test's whole point. fetch_all_strict re-raised a bare httpx.HTTPStatusError; fetch_all_taskgroup wraps the same failure inside an ExceptionGroup whose .exceptions tuple gives callers a typed, iterable view of what went wrong. That wrapper is the key that unlocks except* httpx.HTTPStatusError syntax in caller code.
async def test_taskgroup_cancels_sibling_tasks_when_one_fails():
cancelled: list[str] = []
completed: list[str] = []
async def slow_ok(name: str) -> str:
try:
await asyncio.sleep(0.2)
except asyncio.CancelledError:
cancelled.append(name)
raise
completed.append(name)
return name
async def quick_fail() -> None:
await asyncio.sleep(0.01)
raise RuntimeError("boom")
with pytest.raises(ExceptionGroup) as exc_info:
async with asyncio.TaskGroup() as tg:
tg.create_task(slow_ok("alpha"))
tg.create_task(slow_ok("beta"))
tg.create_task(quick_fail())
assert completed == []
assert sorted(cancelled) == ["alpha", "beta"]
inner = exc_info.value.exceptions
assert len(inner) == 1
assert isinstance(inner[0], RuntimeError)
Put this test side by side with test_gather_default_does_not_cancel_sibling_tasks from step 2 and the article's thesis writes itself. gather left succeed_slow running on the loop after the failure; TaskGroup cancels both slow_ok siblings before either can append to completed. Both siblings are observed in cancelled, and the ExceptionGroup contains exactly the one original RuntimeError — not the cancellations, because cancellation is the orchestration tool, not a user-visible error.
async def test_taskgroup_aggregates_failures_from_already_started_tasks():
async def fail_a() -> None:
await asyncio.sleep(0)
raise ValueError("a-bad")
async def fail_b() -> None:
await asyncio.sleep(0)
raise KeyError("b-bad")
with pytest.raises(ExceptionGroup) as exc_info:
async with asyncio.TaskGroup() as tg:
tg.create_task(fail_a())
tg.create_task(fail_b())
types_seen = {type(e) for e in exc_info.value.exceptions}
assert ValueError in types_seen or KeyError in types_seen
assert len(exc_info.value.exceptions) >= 1
TaskGroup can aggregate multiple unrelated failures into a single ExceptionGroup when more than one task has already raised before cancellation propagates. The assertions stay deliberately loose on count and ordering — the precise number of aggregated failures depends on event-loop scheduling, and asserting an exact number would make the test flaky on slow CI hardware while teaching nothing about the contract.
async def test_fetch_all_taskgroup_tracking_records_cancelled_siblings(make_client):
call_count = {"n": 0}
def handler(request: httpx.Request) -> httpx.Response:
call_count["n"] += 1
if request.url.path == "/boom":
return httpx.Response(500, text="boom")
return httpx.Response(200, text=request.url.path)
cancelled: list[str] = []
async with make_client(handler) as client:
with pytest.raises(ExceptionGroup):
await fetch_all_taskgroup_tracking(
client,
["/boom", "/slow-a", "/slow-b"],
cancelled,
)
assert call_count["n"] >= 1
The final test exercises the tracking variant end to end: the /boom task fails, the group cancels the siblings, and _wrapped records whichever urls were actually interrupted. We assert call_count["n"] >= 1 rather than == 3 because MockTransport answers synchronously and the scheduler may cancel the slower siblings before their handlers run — which is itself a property of structured concurrency worth being honest about.
Verification
Run the full suite from the codebase root:
python -m pytest
............. [100%]
13 passed in 0.27s
Thirteen green tests: three from step 1's fetcher, five from step 2's gather baseline, and the five new ones we just added. The whole suite still finishes in under three tenths of a second, which is the budget we want to defend as later steps add timeout and cancellation experiments.
What we built
We now have a structured-concurrency driver that is shape-compatible with the step-2 strict variant: same signature, same input ordering, same list[FetchResult] return type on success. The only behavioral change a caller can observe is the failure path — and that is the whole article's argument made concrete.
We also have a tracking variant that proves, with a caller-supplied list, that cancellation actually reaches the inner coroutines. That observability hook is what step 4 will lean on when we build a budgeted timeout on top of TaskGroup and ask "did the siblings get cleaned up correctly?" without resorting to print statements.
The new tests encode two behaviors that the gather baseline could not. Failures are wrapped in an ExceptionGroup whose .exceptions tuple is iterable and type-discriminable. And the moment any member task raises, the group cancels every other member before exiting — no more orphan siblings running on the loop, no more half-applied state, no more "the function returned but my logs say a task printed something afterward".
The deliberate omission is still there. There is no timeout, no per-task retry, no logging integration, and no concurrency limit. Each one is a clean follow-up for later steps, and each one will now sit on top of a foundation where partial-failure semantics are deterministic instead of folklore.
Repository
The state of the code after this step: ba12b15
Step 4: Side-by-Side Failure Reports That Make gather's Silent Orphans and TaskGroup's All-or-Nothing Contract Reproducible
Steps 2 and 3 each pinned down one half of the article's thesis — gather leaves orphan siblings running and re-raises a single exception, while TaskGroup cancels siblings and aggregates failures into an ExceptionGroup. The contrast is real, but it currently lives in two unrelated test files that talk past each other: each suite uses its own coroutines, its own assertions, and its own definition of what "completed" or "cancelled" means. A reader looking for the side-by-side comparison has to mentally translate between two dialects.
This step builds the side-by-side. We introduce one tiny instrumented harness — a JobSpec input dataclass and a RunReport output dataclass — and we drive that single fact-shape through two parallel runners, run_with_gather and run_with_taskgroup. The same JobSpec list goes into both runners, and the same RunReport shape comes out, so any divergence in the report fields is a real semantic divergence between the primitives rather than a difference of bookkeeping style.
Setup
No new third-party dependencies. We add one module under src/asyncfetch/ and one matching test file under tests/, plus a re-export update so the harness is reachable as a top-level import:
codebase/
├── src/
│ └── asyncfetch/
│ ├── __init__.py # re-export JobSpec, RunReport, the two runners
│ ├── fetcher.py # unchanged
│ ├── gather_baseline.py # unchanged
│ ├── taskgroup_baseline.py # unchanged
│ └── error_compare.py # NEW
└── tests/
├── conftest.py # unchanged
├── test_fetcher.py # unchanged
├── test_gather_baseline.py # unchanged
├── test_taskgroup_baseline.py # unchanged
└── test_error_compare.py # NEW
The __init__.py re-export keeps the public surface flat — callers and the demo entry point can just from asyncfetch import JobSpec, run_with_gather, run_with_taskgroup without reaching into the submodule:
from asyncfetch.error_compare import (
JobSpec,
RunReport,
run_with_gather,
run_with_taskgroup,
)
Implementation
The harness has two value types and one instrumented coroutine. JobSpec describes the input: a name, a sleep delay before the job's "work" happens, and an optional exception to raise instead of completing normally. RunReport describes the output the harness collects across a run — which jobs reached completion, which were cancelled mid-flight, what (if anything) the runner raised at the top level, and what individual exceptions an ExceptionGroup aggregated. Making these two shapes explicit is what lets the gather report and the taskgroup report be compared field-by-field rather than narrative-by-narrative.
@dataclass(slots=True)
class JobSpec:
name: str
delay: float
fail_with: BaseException | None = None
@dataclass(slots=True)
class RunReport:
completed: list[str] = field(default_factory=list)
cancelled: list[str] = field(default_factory=list)
raised: BaseException | None = None
aggregated: tuple[BaseException, ...] = ()
slots=True keeps the dataclasses light and forbids accidentally typo'ing a field name at assignment time. The defaults are picked so that an unused runner still produces a meaningful empty report — completed and cancelled are empty lists, aggregated is an empty tuple, raised is None — which means the assertions in tests can speak in absolutes ("no cancellations occurred") without first checking for missing attributes.
The single instrumented worker is _instrumented_job. It sleeps for spec.delay, catches any asyncio.CancelledError that arrives during that sleep so it can record the cancellation in report.cancelled, and otherwise either raises spec.fail_with or appends its name to report.completed. The cancellation is re-raised after being recorded — swallowing it would corrupt the orchestration in the TaskGroup path.
async def _instrumented_job(spec: JobSpec, report: RunReport) -> str:
try:
await asyncio.sleep(spec.delay)
except asyncio.CancelledError:
report.cancelled.append(spec.name)
raise
if spec.fail_with is not None:
raise spec.fail_with
report.completed.append(spec.name)
return spec.name
Note the deliberate flatness of the function. There is exactly one try/except, no nested control flow, and the if spec.fail_with is not None lives at the top level rather than inside the except arm. That keeps the worker honest about what it observes: a cancellation is recorded only if asyncio.sleep is interrupted, never as a side effect of the failure branch.
The two runners then differ only in their orchestration block. run_with_gather builds a list of coroutines and awaits asyncio.gather(*coros); if anything is raised, the runner stashes it in report.raised and returns. run_with_taskgroup opens an asyncio.TaskGroup, spawns one task per spec inside the block, and catches the resulting BaseExceptionGroup so it can also fill in report.aggregated with the individual .exceptions tuple.
async def run_with_gather(jobs: list[JobSpec]) -> RunReport:
report = RunReport()
coros = [_instrumented_job(spec, report) for spec in jobs]
try:
await asyncio.gather(*coros)
except Exception as exc:
report.raised = exc
return report
async def run_with_taskgroup(jobs: list[JobSpec]) -> RunReport:
report = RunReport()
try:
async with asyncio.TaskGroup() as tg:
for spec in jobs:
tg.create_task(_instrumented_job(spec, report))
except BaseExceptionGroup as exc:
report.raised = exc
report.aggregated = tuple(exc.exceptions)
return report
The BaseExceptionGroup catch is intentional. TaskGroup raises a plain ExceptionGroup when the inner failures are all Exception subclasses, but it raises BaseExceptionGroup when something like KeyboardInterrupt is in the mix. Catching the base class costs nothing and prevents the runner from being surprised by BaseException subclasses appearing in future tests.
A small print_comparison entry point ties everything together for a manual run. It feeds the same [slow_ok, quick_fail] job list to both runners, sleeps briefly between calls so any orphan tasks from gather actually finish writing to the report, and prints one summary line per runner.
async def print_comparison() -> None:
jobs_for_gather = [
JobSpec("slow_ok", 0.05),
JobSpec("quick_fail", 0.01, RuntimeError("boom")),
]
jobs_for_taskgroup = [
JobSpec("slow_ok", 0.05),
JobSpec("quick_fail", 0.01, RuntimeError("boom")),
]
gather_report = await run_with_gather(jobs_for_gather)
await asyncio.sleep(0.1)
taskgroup_report = await run_with_taskgroup(jobs_for_taskgroup)
print(_format_line("gather:", gather_report))
print(_format_line("taskgroup:", taskgroup_report))
The sleep between the two runs is the most important detail in this whole module. Without it, slow_ok's orphan coroutine from the gather run is still pending when the taskgroup run starts, and the gather report would lie by omission — claiming completed=[] because the sleep had not yet elapsed. With the explicit sleep, the report stabilizes before we read it, and the difference between the two primitives becomes a property of the orchestration rather than a property of when we happened to look.
The test file tests/test_error_compare.py pins six behaviors. Two are gather-specific (the silent-swallowing of secondary exceptions, and the orphan-sibling completion-after-the-fact), two are taskgroup-specific (sibling cancellation and ExceptionGroup aggregation), one is the side-by-side that asserts the four report fields diverge in exactly the predicted directions on identical input, and the last one is a happy-path taskgroup run to prove the harness doesn't only work in failure paths.
async def test_side_by_side_same_input_produces_diverging_observable_outcomes():
def make_jobs() -> list[JobSpec]:
return [
JobSpec("slow_ok", 0.05),
JobSpec("quick_fail", 0.01, RuntimeError("boom")),
]
gather_report = await run_with_gather(make_jobs())
await asyncio.sleep(0.1)
taskgroup_report = await run_with_taskgroup(make_jobs())
assert gather_report.completed == ["slow_ok"]
assert gather_report.cancelled == []
assert isinstance(gather_report.raised, RuntimeError)
assert gather_report.aggregated == ()
assert taskgroup_report.completed == []
assert taskgroup_report.cancelled == ["slow_ok"]
assert isinstance(taskgroup_report.raised, BaseExceptionGroup)
assert len(taskgroup_report.aggregated) == 1
Read those eight assertions as a single executable specification of the article's thesis. On identical input, gather reports that slow_ok completed and nothing was cancelled, while taskgroup reports the inverse — slow_ok was cancelled and nothing completed. The raised field swaps from a bare RuntimeError to a BaseExceptionGroup, and the aggregated tuple goes from empty to a single-element tuple holding the original failure. No prose required.
Verification
Run the full suite and the demo entry point from the codebase root:
python -m pytest
................... [100%]
19 passed in 0.69s
python -c "import asyncio; from asyncfetch.error_compare import print_comparison; asyncio.run(print_comparison())"
gather: completed=['slow_ok'] cancelled=[] raised=RuntimeError aggregated=0
taskgroup: completed=[] cancelled=['slow_ok'] raised=ExceptionGroup aggregated=1
Nineteen green tests — three from the step 1 fetcher, five from the step 2 gather baseline, five from the step 3 taskgroup baseline, and the six new ones added here. The demo's two output lines are the article's headline contrast rendered as text: same input, completed and cancelled fields swap, and the raised type swaps from a bare exception to an ExceptionGroup with one aggregated child.
What we built
A shared instrumented harness — JobSpec and RunReport plus the two runners — that turns "gather and TaskGroup behave differently on failure" from a sentence into a four-field comparison whose every cell is independently asserted. The harness intentionally introduces no new orchestration logic of its own; it only adds the bookkeeping that lets the two primitives' contracts be observed from the outside.
The side-by-side test is the structural payoff. Future steps that add timeouts, cancellation budgets, or backpressure can reuse the same JobSpec/RunReport shape and the same side-by-side framing without re-litigating what "observable outcome" means. New behaviors slot into the existing report fields, and any new field is a deliberate extension of the contract rather than an ad-hoc tuple.
The runnable demo is the human-facing payoff. A reader who clones the repo and runs print_comparison sees the same divergence the tests assert against, in one pair of lines, with no test framework in the way. That makes the article reproducible — anyone can rerun the comparison locally and confirm the headline claim independently of the prose.
The deliberate omission, as in earlier steps, is still scope: no timeouts, no retries, no logging integration, no concurrency limits. Each of those is a clean follow-up that will benefit from having a stable failure-shape vocabulary already in place.
Repository
The state of the code after this step: 3e647a3
Step 5: Resource Teardown Under Cancellation — CancelledError Discipline, finally Blocks, and Tracked Cleanup Across gather and TaskGroup
Step 4 turned the headline contrast — gather swallows orphan failures while TaskGroup aggregates them into an ExceptionGroup — into a four-field comparison driven by a shared JobSpec/RunReport harness. That side-by-side showed what each primitive reports on failure, but it stayed silent on the question every production caller eventually asks: when a sibling explodes and the orchestrator cancels everyone else, do the resources those siblings opened actually close?
This step answers that question with code. We add a tiny tracked-resource context manager, a careful_worker that respects asyncio.CancelledError by recording and re-raising it, and a deliberately broken swallowing_worker that catches BaseException so we can demonstrate exactly how cancellation invariants are lost when cleanup is sloppy. Three runner shapes drive these workers through TaskGroup happy paths, TaskGroup failure paths, and gather under asyncio.timeout, and every assertion lives in the test suite so the cleanup contract is executable rather than aspirational.
Setup
No new third-party dependencies. We add one module under src/asyncfetch/ and one matching test file under tests/, plus a re-export update so the cleanup primitives are reachable from the package root:
codebase/
├── src/
│ └── asyncfetch/
│ ├── __init__.py # re-export ResourceLog, workers, runners
│ ├── fetcher.py # unchanged
│ ├── gather_baseline.py # unchanged
│ ├── taskgroup_baseline.py # unchanged
│ ├── error_compare.py # unchanged
│ └── cancellation.py # NEW
└── tests/
├── conftest.py # unchanged
├── test_fetcher.py # unchanged
├── test_gather_baseline.py # unchanged
├── test_taskgroup_baseline.py # unchanged
├── test_error_compare.py # unchanged
└── test_cancellation.py # NEW
The __init__.py re-export keeps the public surface flat — tests and downstream callers can pull cleanup primitives directly from asyncfetch instead of reaching into a submodule:
from asyncfetch.cancellation import (
ResourceLog,
Worker,
careful_worker,
run_gather_under_timeout,
run_taskgroup_clean,
run_taskgroup_with_one_failure,
swallowing_worker,
tracked_resource,
)
Implementation
The bookkeeping type is ResourceLog. It tracks four parallel lists — opened, closed, completed, cancelled — and derives leaked as everything in opened that never made it into closed. Making leaked a computed property rather than a fifth list is the whole point: tests can assert log.leaked == [] and the answer is a function of what really happened, not of whatever the worker remembered to record.
@dataclass(slots=True)
class ResourceLog:
opened: list[str] = field(default_factory=list)
closed: list[str] = field(default_factory=list)
completed: list[str] = field(default_factory=list)
cancelled: list[str] = field(default_factory=list)
@property
def leaked(self) -> list[str]:
closed_set = set(self.closed)
return [name for name in self.opened if name not in closed_set]
The cleanup primitive itself is tracked_resource, an @asynccontextmanager that appends to opened on entry and closed in the finally arm. The finally is what guarantees the symmetry. Whether the body returns normally, raises a RuntimeError, or is interrupted by CancelledError, the close path runs exactly once — that's the invariant the test suite leans on.
@asynccontextmanager
async def tracked_resource(name: str, log: ResourceLog) -> AsyncIterator[str]:
log.opened.append(name)
try:
yield name
finally:
log.closed.append(name)
careful_worker is the worker shape we recommend. It opens a tracked resource, sleeps for the requested delay, catches CancelledError only long enough to record the name in log.cancelled, and then re-raises so the cancellation propagates to the orchestrator. The completion line lives outside the except arm — a worker that was cancelled never reaches it, which is why cancelled and completed stay mutually exclusive in the test assertions.
async def careful_worker(name: str, delay: float, log: ResourceLog) -> str:
async with tracked_resource(name, log):
try:
await asyncio.sleep(delay)
except asyncio.CancelledError:
log.cancelled.append(name)
raise
log.completed.append(name)
return name
swallowing_worker is the foil. It catches BaseException — which includes CancelledError — and pretends the work completed, returning a sentinel string instead of re-raising. The finally block still runs (Python guarantees that), so resources still close. But because the cancellation signal never reaches the orchestrator, a TaskGroup that intended to abort the world will instead wait for these "successful" tasks to finish and the cancelled list stays empty.
async def swallowing_worker(name: str, delay: float, log: ResourceLog) -> str:
log.opened.append(name)
try:
await asyncio.sleep(delay)
log.completed.append(name)
return name
except BaseException:
log.completed.append(f"{name}:swallowed")
return name
finally:
log.closed.append(name)
Three runners drive these workers. run_taskgroup_with_one_failure spawns two well-behaved workers plus a _bomb() coroutine that sleeps briefly then raises, then catches the resulting BaseExceptionGroup so the caller can inspect it. run_taskgroup_clean is the no-failure baseline — both workers complete inside the TaskGroup. run_gather_under_timeout wraps asyncio.gather in asyncio.timeout(...) so we can observe how gather propagates cancellation to its children when the timeout fires.
async def run_taskgroup_with_one_failure(
log: ResourceLog,
worker: Worker,
*,
worker_delay: float = 0.2,
) -> BaseExceptionGroup | None:
try:
async with asyncio.TaskGroup() as tg:
tg.create_task(worker("alpha", worker_delay, log))
tg.create_task(worker("beta", worker_delay, log))
tg.create_task(_bomb())
except BaseExceptionGroup as exc:
return exc
return None
async def run_gather_under_timeout(
log: ResourceLog,
worker: Worker,
*,
worker_delay: float = 0.2,
timeout: float = 0.02,
) -> bool:
try:
async with asyncio.timeout(timeout):
await asyncio.gather(
worker("alpha", worker_delay, log),
worker("beta", worker_delay, log),
)
except TimeoutError:
return True
return False
Notice the runners themselves contain zero nested try/except. The orchestration try/except lives at one level, and the workers own their own one-level try/except for CancelledError. That flatness is deliberate — nested exception handling is where cancellation bugs hide, and the codebase rules forbid it explicitly.
The test file tests/test_cancellation.py pins eleven behaviors. Three exercise the tracked_resource context manager directly — normal exit, body raises, task cancelled. Two pin the two worker shapes against each other — careful_worker records cancellation cleanly; swallowing_worker hides it but still triggers finally. The remaining six drive the three runner shapes through happy paths and failure paths, asserting on opened, closed, cancelled, completed, and leaked in every case.
async def test_taskgroup_failure_cancels_careful_siblings_and_closes_resources():
log = ResourceLog()
eg = await run_taskgroup_with_one_failure(log, careful_worker)
assert eg is not None
assert len(eg.exceptions) == 1
assert isinstance(eg.exceptions[0], RuntimeError)
assert str(eg.exceptions[0]) == "boom"
assert sorted(log.opened) == ["alpha", "beta"]
assert sorted(log.closed) == ["alpha", "beta"]
assert sorted(log.cancelled) == ["alpha", "beta"]
assert log.completed == []
assert log.leaked == []
Read that test as the cleanup contract in one screen. A failing sibling triggers a BaseExceptionGroup containing exactly the one RuntimeError("boom"), both well-behaved siblings record their cancellation, both opened resources are closed, neither sibling reports completion, and nothing leaks. The contrast test, test_taskgroup_failure_with_swallowing_workers_loses_cancellation_signal, makes the same assertions against swallowing_worker and watches cancelled collapse to [] while completed fills with :swallowed sentinels — same orchestrator, different worker shape, completely different observable outcome.
Verification
Run the full test suite from the codebase root:
python -m pytest
.............................. [100%]
30 passed in 0.82s
Thirty green tests — three from the step 1 fetcher, five from the step 2 gather baseline, five from the step 3 taskgroup baseline, six from the step 4 side-by-side harness, and the eleven new tests added here. The new tests collectively cover the tracked_resource lifecycle in isolation, both worker shapes, and the three runner shapes against both worker shapes, which is enough surface area to make the cleanup invariants regression-proof.
What we built
A tracked-resource primitive plus two contrasting worker shapes that together turn "what happens to resources during cancellation" from a vibes question into a property check. ResourceLog.leaked is a derived field over the difference of two lists, so any test that asserts log.leaked == [] is making an actual structural claim about open/close symmetry, not just trusting bookkeeping.
The runner trio is the comparison layer. run_taskgroup_clean shows the happy-path baseline, run_taskgroup_with_one_failure shows that TaskGroup cancellation cascades into every well-behaved sibling and that finally blocks still close their resources, and run_gather_under_timeout proves the same cleanup story holds when asyncio.timeout is the cancellation trigger and gather is the fan-out shape.
The swallowing_worker foil is the most useful asset in the module. It encodes — as a passing test — the single most common cleanup bug in production async code: a try/except BaseException block that hides cancellation from the orchestrator. The test asserts that the resource still closes (because of finally) but that cancelled collapses to [] and the orchestrator never learns the work was supposed to stop. That negative result is what makes the positive result for careful_worker meaningful.
The deliberate omission, as in earlier steps, is scope: no shielded sections, no asyncio.shield, no cancel-budget patterns, no integration with the step 4 JobSpec/RunReport harness. Those are clean follow-ups; this step is just the cleanup contract.
Repository
The state of the code after this step: f6ba4e3
Step 6: Routing ExceptionGroup Failures With except* and Selective Re-Raise So Tolerable Errors Survive a Partial-Success Workflow
Step 5 nailed down the cleanup contract: when a sibling fails inside asyncio.TaskGroup, every well-behaved peer is cancelled, every opened resource is closed, and the orchestrator surfaces a BaseExceptionGroup summarising what went wrong. That is the right shape for "all-or-nothing" workflows, but real production code is rarely pure. A batch import, a webhook fan-out, a notification flush — each has failures the caller wants to record (rate-limited, soft 4xx, "no-op already done") and failures the caller wants to fail loudly on (database unreachable, schema drift, permission denied).
This step builds that taxonomy. We add a TolerableError and a FatalError class so per-job failures can be classified at the raise site, and we drive a TaskGroup through two complementary patterns: an except* dispatch that routes each subgroup to a separate bucket, and a manual BaseExceptionGroup.split plus selective re-raise that returns the unhandled remainder to the caller. Both patterns produce the same PartialReport shape, so a partial-success workflow can pick whichever idiom reads best at the call site without changing the contract downstream.
Setup
No new third-party dependencies. We add one module under src/asyncfetch/ and one matching test file under tests/, plus a re-export update so the partial-success primitives are reachable from the package root:
codebase/
├── src/
│ └── asyncfetch/
│ ├── __init__.py # re-export error taxonomy + runners
│ ├── fetcher.py # unchanged
│ ├── gather_baseline.py # unchanged
│ ├── taskgroup_baseline.py # unchanged
│ ├── error_compare.py # unchanged
│ ├── cancellation.py # unchanged
│ └── exception_groups.py # NEW
└── tests/
├── conftest.py # unchanged
├── test_fetcher.py # unchanged
├── test_gather_baseline.py # unchanged
├── test_taskgroup_baseline.py # unchanged
├── test_error_compare.py # unchanged
├── test_cancellation.py # unchanged
└── test_exception_groups.py # NEW
The __init__.py re-export keeps the public surface flat. A downstream caller importing the partial-success runner does not need to know which submodule it lives in:
from asyncfetch.exception_groups import (
FatalError,
PartialJob,
PartialReport,
TolerableError,
collect_with_selective_reraise,
partition_exception_group,
run_with_except_star,
)
Implementation
The failure taxonomy is just two empty exception subclasses. TolerableError flags a per-job failure the workflow is willing to record and move past — a rate limit, a duplicate-key insert, a soft validation rejection. FatalError flags a failure the workflow refuses to silently tolerate — a missing dependency, a corrupted invariant, anything the caller must see. Two distinct types is the smallest possible classifier, and it makes the routing logic trivial because except* dispatches purely on type.
class TolerableError(Exception):
"""Per-job failure a partial-success workflow records and moves past."""
class FatalError(Exception):
"""Failure the workflow must surface to its caller."""
PartialJob and PartialReport mirror the harness shape from step 4 but specialise it for the partial-success scenario. A PartialJob carries a name, an optional sleep delay, and an optional pre-baked exception to raise — the same trick used in step 4 to make failure timing deterministic in tests. PartialReport has three buckets — succeeded, tolerated, and fatal — and that is the entire contract a caller of either runner sees.
@dataclass(slots=True)
class PartialJob:
name: str
delay: float = 0.0
fail_with: BaseException | None = None
@dataclass(slots=True)
class PartialReport:
succeeded: list[str] = field(default_factory=list)
tolerated: list[str] = field(default_factory=list)
fatal: list[str] = field(default_factory=list)
The per-job coroutine is _run_partial_job. It sleeps for the requested delay, raises the configured exception if there is one, and otherwise appends the job's name to report.succeeded. The body is intentionally one level deep and contains no try/except — classification is the runner's job, not the worker's.
async def _run_partial_job(spec: PartialJob, report: PartialReport) -> None:
await asyncio.sleep(spec.delay)
if spec.fail_with is not None:
raise spec.fail_with
report.succeeded.append(spec.name)
The first routing pattern is run_with_except_star. It wraps the TaskGroup in a try and follows it with two except* arms — one for TolerableError, one for FatalError. Each arm receives the matching subgroup as eg, walks its .exceptions, and appends the message strings into the appropriate bucket on the report. Because except* lets you target a specific exception type and lets unmatched siblings continue propagating, the syntax is the cleanest available expression of "split this group by type and handle each part on its own."
async def run_with_except_star(jobs: list[PartialJob]) -> PartialReport:
report = PartialReport()
try:
async with asyncio.TaskGroup() as tg:
for spec in jobs:
tg.create_task(_run_partial_job(spec, report))
except* TolerableError as eg:
_record_messages(eg, report.tolerated)
except* FatalError as eg:
_record_messages(eg, report.fatal)
return report
The helper _record_messages recurses into nested BaseExceptionGroup instances and appends each leaf's str(exc) into the sink list. Nested groups are not academic — TaskGroup is allowed to wrap subgroups, and except* itself may re-package what it caught. Recursing makes the bucket population correct regardless of nesting depth without needing a second try/except layer (which is forbidden by this codebase's nesting rules anyway).
def _record_messages(eg: BaseExceptionGroup, sink: list[str]) -> None:
for exc in eg.exceptions:
if isinstance(exc, BaseExceptionGroup):
_record_messages(exc, sink)
continue
sink.append(str(exc))
The second pattern uses BaseExceptionGroup.split directly. partition_exception_group takes any exception group and returns a (tolerable, remainder) tuple — the standard-library split contract. split returns None for an empty side, which is convenient: a caller can branch on tolerable is None or remainder is None without needing to walk the group itself, and the two halves remain proper BaseExceptionGroup instances with their original traceback chains.
def partition_exception_group(
eg: BaseExceptionGroup,
) -> tuple[BaseExceptionGroup | None, BaseExceptionGroup | None]:
return eg.split(TolerableError)
collect_with_selective_reraise is the runner that uses that partition. It catches the entire BaseExceptionGroup once, records the tolerable subgroup into report.tolerated, records the remainder into report.fatal, and returns the remainder to the caller instead of re-raising it. Returning the remainder rather than re-raising is the design choice: a workflow that handles partial success usually wants to decide upstream whether the remainder should be raised, logged, retried, or escalated, and collect_with_selective_reraise keeps that decision at the call site.
async def collect_with_selective_reraise(
jobs: list[PartialJob],
report: PartialReport,
) -> BaseExceptionGroup | None:
try:
async with asyncio.TaskGroup() as tg:
for spec in jobs:
tg.create_task(_run_partial_job(spec, report))
except BaseExceptionGroup as eg:
tolerable, remainder = partition_exception_group(eg)
if tolerable is not None:
_record_messages(tolerable, report.tolerated)
if remainder is not None:
_record_messages(remainder, report.fatal)
return remainder
return None
Notice the flatness. Each function has at most one try/except and at most two levels of if. No nested exception handling, no nested branching — the rule from the codebase guide is binding, and these patterns are deliberately shaped to honour it. When real partial-success workflows start sprawling, the temptation to wrap a try inside a try is exactly where cancellation and classification bugs leak in.
The test file tests/test_exception_groups.py pins nine behaviours. Four exercise run_with_except_star — all-jobs-succeed, lone-tolerable, lone-fatal, and a mixed batch where tolerable and fatal arrive together. Three pin partition_exception_group in isolation — a mixed group splits correctly, an all-tolerable group returns remainder=None, and an all-fatal group returns tolerable=None. The remaining two drive collect_with_selective_reraise through the two interesting cases: only-tolerable returns None, and a mix returns the fatal remainder while still populating both buckets.
async def test_except_star_dispatches_mixed_subgroups_separately():
jobs = [
PartialJob("warn1", 0.0, TolerableError("retry")),
PartialJob("warn2", 0.0, TolerableError("retry2")),
PartialJob("dead", 0.0, FatalError("boom")),
]
report = await run_with_except_star(jobs)
assert "boom" in report.fatal
assert any(msg in report.tolerated for msg in ("retry", "retry2"))
Read that test as the partial-success contract in one screen. Three jobs raise — two tolerable, one fatal — the except* dispatch routes them into separate buckets on the same PartialReport, and the assertion language is purely about the buckets, not about the exception group shape. The complementary collect_with_selective_reraise test makes the same assertions while also pinning the returned-remainder behaviour: when fatal failures exist, the caller receives a BaseExceptionGroup containing exactly those fatals, with the tolerable subgroup already harvested into the report.
Verification
Run the full test suite from the codebase root:
python -m pytest
....................................... [100%]
39 passed in 0.80s
Thirty-nine green tests — three from the step 1 fetcher, five from the step 2 gather baseline, five from the step 3 taskgroup baseline, six from the step 4 side-by-side harness, eleven from the step 5 cancellation suite, and the nine new tests added here. The new tests collectively cover the failure taxonomy, the except* dispatch runner, the partition_exception_group helper in isolation, and the selective-re-raise runner — enough surface area to keep the partial-success contract regression-proof as later steps build on top.
What we built
A two-class failure taxonomy (TolerableError, FatalError) plus a PartialReport shape with three buckets — succeeded, tolerated, fatal. The taxonomy is deliberately minimal: two empty subclasses is the smallest classifier that except* can dispatch on, and the three-bucket report is the smallest contract a partial-success caller needs to consume. Anything richer can be layered on top without changing the dispatch logic.
Two complementary runner shapes that produce that report. run_with_except_star is the Python-3.11-native form — declarative, one try plus two except* arms, no manual group manipulation. collect_with_selective_reraise is the manual form — one except BaseExceptionGroup, one split call, and the unhandled remainder returned to the caller so it can decide whether to raise. Both runners pass the same nine-test suite by populating the same PartialReport shape.
A reusable _record_messages helper that flattens arbitrarily nested BaseExceptionGroup trees into a flat sink list. That helper is the answer to the "what if the group contains subgroups?" question — recurse on the subgroup, append the leaves, never lose a message. Combined with the flat-control-flow discipline from earlier steps, it lets the runners stay at one level of try/except even when the underlying group structure is several levels deep.
The deliberate omissions are scope-bounded: no retries, no exponential backoff, no integration with the cleanup primitives from step 5, no shielded re-raise. Those belong to a richer partial-success framework. What we have here is the dispatch core — small enough to read in one screen, executable as a passing test, and shaped so future layers can wrap it without rewriting the contract.
Repository
The state of the code after this step: 3e6ccf4
Step 7: Refactoring the Concurrent HTTP Fetcher Into a Bounded TaskGroup With Retries, Backoff, and a Strict Variant
Step 6 nailed down the partial-success vocabulary: TolerableError, FatalError, PartialReport, and two complementary dispatch shapes that route an ExceptionGroup into buckets. That work was deliberately scenario-driven — fake jobs, deterministic sleeps, hand-raised exceptions — so the dispatch contract could be pinned without HTTP, timeouts, or retries in the way. Now we fold all of it back into a real workload.
This step rewrites the toy fetch_url from step 1 into a concurrent fetcher fit for production-shaped traffic. We add a RetryPolicy with geometric backoff, classify HTTP and transport errors to decide what is worth retrying, run every URL on its own retry loop, and bound total concurrency with an asyncio.Semaphore nested inside an asyncio.TaskGroup. The same core powers two public entry points — a collecting fetch_all_with_taskgroup_bounded that returns one FetchOutcome per URL, and a strict fetch_all_taskgroup_strict that raises an ExceptionGroup summarising every failure — so partial-success and all-or-nothing callers share the same retry machinery.
Setup
No new third-party dependencies — httpx is already in the project from step 1 and is the only network library we touch. We add one module under src/asyncfetch/, one matching test file under tests/, and extend __init__.py so the bounded-retry primitives are reachable from the package root:
codebase/
├── src/
│ └── asyncfetch/
│ ├── __init__.py # re-export the new retry + bounded runners
│ ├── fetcher.py # unchanged
│ ├── gather_baseline.py # unchanged
│ ├── taskgroup_baseline.py # unchanged
│ ├── error_compare.py # unchanged
│ ├── cancellation.py # unchanged
│ ├── exception_groups.py # unchanged
│ └── bounded_retry.py # NEW
└── tests/
├── conftest.py # unchanged — provides make_client
├── test_fetcher.py # unchanged
├── test_gather_baseline.py # unchanged
├── test_taskgroup_baseline.py # unchanged
├── test_error_compare.py # unchanged
├── test_cancellation.py # unchanged
├── test_exception_groups.py # unchanged
└── test_bounded_retry.py # NEW
The package-level re-export keeps imports flat — callers reach for the bounded runner without knowing it lives in bounded_retry:
from asyncfetch import (
ConcurrencyMeter,
FetchOutcome,
RetryPolicy,
classify_error,
fetch_all_taskgroup_strict,
fetch_all_with_gather_retry,
fetch_all_with_taskgroup_bounded,
is_retriable_status,
)
Implementation
Retries live behind a RetryPolicy dataclass. Three knobs — max_attempts, base_delay, multiplier — cover the common shapes (no-delay tests, geometric production backoff, single-shot exhaustion). The backoff(attempt) method returns base_delay * multiplier ** (attempt - 1), so attempt 1 uses the base delay, attempt 2 doubles it, attempt 3 quadruples it. Keeping the math on the policy means the runner never reaches into magic numbers.
@dataclass(frozen=True, slots=True)
class RetryPolicy:
max_attempts: int = 3
base_delay: float = 0.0
multiplier: float = 2.0
def backoff(self, attempt: int) -> float:
return self.base_delay * (self.multiplier ** (attempt - 1))
Per-URL state collapses into a FetchOutcome — a URL, an optional FetchResult, an optional terminal exception, and the attempt count when we stopped. Exactly one of result and error is populated when the runner returns. That two-field shape is the contract every collecting runner in this codebase exposes; the strict runner reads the same field to decide whether to raise.
@dataclass(slots=True)
class FetchOutcome:
url: str
result: FetchResult | None = None
error: BaseException | None = None
attempts: int = 0
Concurrency is a separate observability concern. ConcurrencyMeter is a tiny instrument: each call to enter() increments in_flight, updates peak, and appends the current count to timeline; each call to exit() decrements in_flight. The runner threads an optional meter into the bounded variant so tests can assert that the semaphore actually capped peak concurrency, not merely the count of completed tasks.
@dataclass(slots=True)
class ConcurrencyMeter:
in_flight: int = 0
peak: int = 0
timeline: list[int] = field(default_factory=list)
def enter(self) -> None:
self.in_flight += 1
if self.in_flight > self.peak:
self.peak = self.in_flight
self.timeline.append(self.in_flight)
def exit(self) -> None:
self.in_flight -= 1
The error classifier is two small functions instead of one nested predicate. is_retriable_status reads as the rule it encodes — "5xx and 429 retry, everything else is terminal." classify_error dispatches on exception type and delegates the status decision to is_retriable_status. Transport-level failures (DNS, connection reset, read timeout) are always retriable; HTTP errors retry only on the explicitly retriable status codes; anything else is terminal.
def is_retriable_status(status: int) -> bool:
return status >= 500 or status == 429
def classify_error(exc: BaseException) -> bool:
if isinstance(exc, httpx.HTTPStatusError):
return is_retriable_status(exc.response.status_code)
if isinstance(exc, httpx.TransportError):
return True
return False
The retry loop is structured so the codebase's "no nested try/except" rule stays binding. _attempt_once does the raw work — one client.get, one raise_for_status, one FetchResult. _try_attempt wraps that in a single try/except and returns either the result or the captured exception as a normal value. _fetch_with_retry then loops without any try at all — it inspects the value, decides whether to keep going, sleeps according to the policy, and returns the final FetchOutcome on success, terminal failure, or exhaustion.
async def _attempt_once(client: httpx.AsyncClient, url: str) -> FetchResult:
response = await client.get(url)
response.raise_for_status()
return FetchResult(url=url, status=response.status_code, body=response.text)
async def _try_attempt(
client: httpx.AsyncClient,
url: str,
) -> FetchResult | BaseException:
try:
return await _attempt_once(client, url)
except (httpx.HTTPStatusError, httpx.TransportError) as exc:
return exc
async def _fetch_with_retry(
client: httpx.AsyncClient,
url: str,
policy: RetryPolicy,
) -> FetchOutcome:
outcome = FetchOutcome(url=url)
for attempt in range(1, policy.max_attempts + 1):
outcome.attempts = attempt
result_or_error = await _try_attempt(client, url)
if isinstance(result_or_error, FetchResult):
outcome.result = result_or_error
outcome.error = None
return outcome
outcome.error = result_or_error
if not classify_error(result_or_error):
return outcome
if attempt == policy.max_attempts:
return outcome
await asyncio.sleep(policy.backoff(attempt))
return outcome
The gather-based runner is the smallest possible adapter — wrap each URL in a _fetch_with_retry coroutine, hand them to asyncio.gather, return the list. There is no return_exceptions=True because each retry loop already absorbs and records its terminal exception inside the FetchOutcome. That is the whole point of the dataclass: pushing exception capture down into the per-URL loop means the orchestrator only ever sees normal values.
async def fetch_all_with_gather_retry(
client: httpx.AsyncClient,
urls: Iterable[str],
policy: RetryPolicy,
) -> list[FetchOutcome]:
coros = [_fetch_with_retry(client, url, policy) for url in urls]
return list(await asyncio.gather(*coros))
The bounded TaskGroup runner is where the structured-concurrency story pays off. We allocate one asyncio.Semaphore(max_concurrency), then inside the TaskGroup spawn one _bounded_attempt per URL. _bounded_attempt enters the semaphore, optionally bumps the meter, runs the per-URL retry loop, decrements the meter, and exits the semaphore — all in a single straight-line function with one async with and one optional metric branch. Cancellation, cleanup, and shutdown are inherited from TaskGroup; the semaphore enforces the concurrency cap.
async def _bounded_attempt(
client: httpx.AsyncClient,
url: str,
policy: RetryPolicy,
semaphore: asyncio.Semaphore,
meter: ConcurrencyMeter | None,
) -> FetchOutcome:
async with semaphore:
if meter is not None:
meter.enter()
outcome = await _fetch_with_retry(client, url, policy)
if meter is not None:
meter.exit()
return outcome
async def fetch_all_with_taskgroup_bounded(
client: httpx.AsyncClient,
urls: Iterable[str],
policy: RetryPolicy,
*,
max_concurrency: int = 4,
meter: ConcurrencyMeter | None = None,
) -> list[FetchOutcome]:
url_list = list(urls)
semaphore = asyncio.Semaphore(max_concurrency)
async with asyncio.TaskGroup() as tg:
tasks = [
tg.create_task(_bounded_attempt(client, url, policy, semaphore, meter))
for url in url_list
]
return [task.result() for task in tasks]
The strict variant is a thin filter on top. It runs the bounded collecting variant, inspects the outcomes for any with a missing result, and — if there is at least one failure — packages every failure exception into a single ExceptionGroup. Callers who want all-or-nothing semantics get them in three lines. Callers who want partial success keep using fetch_all_with_taskgroup_bounded directly. One retry engine, two contracts.
async def fetch_all_taskgroup_strict(
client: httpx.AsyncClient,
urls: Iterable[str],
policy: RetryPolicy,
*,
max_concurrency: int = 4,
) -> list[FetchResult]:
outcomes = await fetch_all_with_taskgroup_bounded(
client,
urls,
policy,
max_concurrency=max_concurrency,
)
failures = [o for o in outcomes if o.result is None]
if failures:
raise ExceptionGroup(
"fetch_all_taskgroup_strict",
[_failure_or_default(o) for o in failures],
)
return [o.result for o in outcomes if o.result is not None]
Tests use the make_client fixture from step 1's conftest.py, which wires httpx.MockTransport into an httpx.AsyncClient so handlers can simulate arbitrary HTTP behaviour without a network. A _flaky_handler helper consumes a per-path status schedule (e.g. {"/a": [503, 503, 200]}) and advances a per-path counter on every request, so the third call to /a returns 200 while the first two return 503 — exactly the shape needed to pin retry behaviour deterministically.
async def test_taskgroup_bounded_respects_max_concurrency(make_client):
schedule = {
"/a": [200], "/b": [200], "/c": [200],
"/d": [200], "/e": [200], "/f": [200],
}
handler = _flaky_handler(schedule)
policy = RetryPolicy(max_attempts=1, base_delay=0.0)
meter = ConcurrencyMeter()
async with make_client(handler) as client:
outcomes = await fetch_all_with_taskgroup_bounded(
client,
["/a", "/b", "/c", "/d", "/e", "/f"],
policy,
max_concurrency=2,
meter=meter,
)
assert len(outcomes) == 6
assert meter.peak <= 2
assert all(o.result is not None for o in outcomes)
That single test is the load-bearing assertion for the whole step: six URLs offered, concurrency cap of two, peak observed concurrency never exceeded two, every URL succeeded. The meter is what makes "the semaphore actually works" a falsifiable statement instead of an article of faith.
Verification
Run the full test suite from the codebase root:
python -m pytest
................................................... [100%]
51 passed in 1.00s
Fifty-one green tests — three from the step 1 fetcher baseline, five from the step 2 gather baseline, five from the step 3 TaskGroup baseline, six from the step 4 side-by-side harness, eleven from the step 5 cancellation suite, nine from the step 6 partial-success runners, and the twelve new tests added here for retry classification, gather-based retries, the bounded TaskGroup runner, the strict variant, and the supporting dataclasses. Every behaviour shipped so far stays pinned; the new retry surface gets its own dedicated assertions instead of riding on shared scaffolding.
What we built
A retry-aware concurrent HTTP fetcher with a sharp separation between policy, classification, per-URL loop, and orchestration. RetryPolicy owns the timing math; is_retriable_status and classify_error own the retry decision; _fetch_with_retry owns the per-URL state machine; and the two public runners own the concurrency shape. Each component is small enough to read in one screen and replaceable in isolation — swapping geometric backoff for jittered backoff means editing one method on RetryPolicy, not rewriting the runner.
A ConcurrencyMeter observability primitive that promotes "max concurrency = N" from intent to a checkable invariant. The bounded runner threads an optional meter through the semaphore-guarded section, so tests can assert peak in-flight count never exceeded the cap. The meter is opt-in — callers that do not pass one pay no cost beyond a single is None check inside the bounded attempt.
Two public entry points that share one retry engine. fetch_all_with_taskgroup_bounded returns one FetchOutcome per URL — succeeded URLs carry a FetchResult, failed URLs carry the terminal exception and the attempt count, and the call site decides what to do with each. fetch_all_taskgroup_strict reuses the same machinery and raises an ExceptionGroup if any URL failed, so callers wanting an all-or-nothing contract get one without rewriting the underlying loop.
The deliberate omissions are scope-bounded. There is no jitter on the backoff, no Retry-After header awareness, no circuit breaker, no per-host rate limit, no streaming response support. Those belong to a richer fetcher and are easy to layer on top of the contracts shipped here — RetryPolicy is the natural place for jitter, classify_error is the natural place for Retry-After, and the semaphore can be parameterised per-host without disturbing the rest of the runner.
Repository
The state of the code after this step: 2750924
Repository
Full source at https://github.com/vytharion/asyncio-taskgroup-vs-gather-error-handling.
Walk the lessons by stepping through the git commits in the repo — each major step has its own commit you can git checkout and rerun.