Skip to content

gh-134173: Optimize concurrent.futures→asyncio state transfer with atomic snapshot #134174

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
May 18, 2025

Follow Lee on X/Twitter - Father, Husband, Serial builder creating AI, crypto, games & web tools. We are friends :) AI Will Come To Life!

Check out: eBank.nz (Art Generator) | Netwrck.com (AI Tools) | Text-Generator.io (AI API) | BitBank.nz (Crypto AI) | ReadingTime (Kids Reading) | RewordGame | BigMultiplayerChess | WebFiddle | How.nz | Helix AI Assistant

Conversation

bdraco
Copy link
Contributor

@bdraco bdraco commented May 18, 2025

👋 from PyCon @ Pittsburgh

This PR significantly improves performance when transferring future state from concurrent.futures.Future to asyncio.Future, a common operation when dispatching executor jobs in asyncio applications.

The current _copy_future_state implementation requires multiple method calls and lock acquisitions to retrieve the source future's state:

  1. done() - acquires lock to check state
  2. cancelled() - acquires lock again
  3. exception() - acquires lock to get exception
  4. result() - acquires lock to get result

Each method call involves thread synchronization overhead, making this operation a bottleneck for high-frequency executor dispatches.

Our use case involves dispatching a large number of small executor jobs from asyncio to a thread pool. These jobs typically involve open or stat on files that are already cached by the OS, so the actual I/O returns almost instantly. However, we still have to offload them to avoid blocking the event loop, since there's no reliable way to determine in advance whether a read will hit the cache.

As a result, the majority of the overhead isn't from the I/O itself, but from the cost of scheduling. Most of the time is spent copying future state, which involves locking. This PR reduces that overhead, which has a meaningful impact at scale.

Add a new _get_snapshot() method to concurrent.futures.Future that atomically retrieves all state information in a single lock acquisition:

  • Returns tuple: (done, cancelled, result, exception)
  • Uses optimized fast path for already-finished futures (no lock needed)
  • Provides atomic state capture for other states

The _copy_future_state function in asyncio now uses this snapshot method to retrieve the state from the concurrent.future.Future

Benchmark results show dramatic improvements for the common case:

This optimization particularly benefits applications that:

  • Dispatch many small executor jobs (e.g., filesystem operations, DNS lookups)

  • Use thread pools for I/O-bound operations in asyncio

  • Have high frequency of executor task completion

  • Adds _get_snapshot() to concurrent.futures.Future for atomic state retrieval

  • Updates _copy_future_state() to use the snapshot method

  • Maintains full backwards compatibility with existing code

  • Minimal code changes with focused optimization

These show consistent 4.4x+ speedup for the critical concurrent.futures→asyncio path.

=== 1. Benchmarking concurrent.futures -> asyncio ===
Writing benchmark scripts...

=== Benchmarking concurrent.futures -> asyncio ===
Running original...
concurrent_to_asyncio: Mean +- std dev: 977 ns +- 13 ns
Running optimized...
concurrent_to_asyncio: Mean +- std dev: 222 ns +- 3 ns

Comparison:
Mean +- std dev: [concurrent_original] 977 ns +- 13 ns -> [concurrent_optimized] 222 ns +- 3 ns: 4.40x faster

Cleaning up...
import pyperf
import concurrent.futures
import asyncio
import subprocess
import os
import sys

def write_benchmark_scripts():
    """Write individual benchmark scripts for each scenario."""

    # Common helper code
    common_imports = '''
import pyperf
import concurrent.futures
import asyncio

def _convert_future_exc(exc):
    exc_class = type(exc)
    if exc_class is concurrent.futures.CancelledError:
        return asyncio.CancelledError(*exc.args)
    elif exc_class is concurrent.futures.TimeoutError:
        return asyncio.TimeoutError(*exc.args)
    elif exc_class is concurrent.futures.InvalidStateError:
        return asyncio.InvalidStateError(*exc.args)
    else:
        return exc
'''

    # Optimization patch code
    optimization_patch = '''
FINISHED = concurrent.futures._base.FINISHED
CANCELLED = concurrent.futures._base.CANCELLED
CANCELLED_AND_NOTIFIED = concurrent.futures._base.CANCELLED_AND_NOTIFIED

def _get_snapshot_implementation(self):
    """Get a snapshot of the future's current state."""
    # Fast path: check if already finished without lock
    if self._state == FINISHED:
        return True, False, self._result, self._exception

    # Need lock for other states since they can change
    with self._condition:
        if self._state == FINISHED:
            return True, False, self._result, self._exception
        if self._state in {CANCELLED, CANCELLED_AND_NOTIFIED}:
            return True, True, None, None
        return False, False, None, None

concurrent.futures.Future._get_snapshot = _get_snapshot_implementation
'''

    # Original copy implementation (for concurrent.futures.Future)
    original_copy = '''
def copy_future_original(source, dest):
    """Original implementation using individual method calls."""
    if dest.cancelled():
        return

    if hasattr(source, 'done'):
        assert source.done()

    if source.cancelled():
        dest.cancel()
    else:
        exception = source.exception()
        if exception is not None:
            dest.set_exception(_convert_future_exc(exception))
        else:
            result = source.result()
            dest.set_result(result)
'''

    # Optimized copy implementation (matches current code)
    optimized_copy = '''
def copy_future_optimized(source, dest):
    """Optimized implementation using _get_snapshot."""
    if dest.cancelled():
        return
    assert not dest.done()
    done, cancelled, result, exception = source._get_snapshot()
    assert done
    if cancelled:
        dest.cancel()
    elif exception is not None:
        dest.set_exception(_convert_future_exc(exception))
    else:
        dest.set_result(result)
'''

    # 1. concurrent.futures -> asyncio (original)
    with open('bench_concurrent_to_asyncio_original.py', 'w') as f:
        f.write(common_imports + original_copy + '''
source = concurrent.futures.Future()
source.set_result(42)
loop = asyncio.new_event_loop()

def task():
    """Single copy operation benchmark."""
    dest = asyncio.Future(loop=loop)
    copy_future_original(source, dest)
    dest.cancel()

runner = pyperf.Runner()
runner.bench_func('concurrent_to_asyncio', task)
''')

    # 2. concurrent.futures -> asyncio (optimized)
    with open('bench_concurrent_to_asyncio_optimized.py', 'w') as f:
        f.write(common_imports + optimization_patch + optimized_copy + '''
source = concurrent.futures.Future()
source.set_result(42)
loop = asyncio.new_event_loop()

def task():
    """Single copy operation benchmark."""
    dest = asyncio.Future(loop=loop)
    copy_future_optimized(source, dest)
    dest.cancel()

runner = pyperf.Runner()
runner.bench_func('concurrent_to_asyncio', task)
''')

def run_benchmarks():
    """Run all benchmarks and compare results."""
    print("Writing benchmark scripts...")
    write_benchmark_scripts()

    # Clean up old results
    for f in ['concurrent_original.json', 'concurrent_optimized.json']:
        if os.path.exists(f):
            os.remove(f)

    print("\n=== Benchmarking concurrent.futures -> asyncio ===")
    print("Running original...")
    subprocess.run([sys.executable, 'bench_concurrent_to_asyncio_original.py',
                   '-o', 'concurrent_original.json', '--quiet'])

    print("Running optimized...")
    subprocess.run([sys.executable, 'bench_concurrent_to_asyncio_optimized.py',
                   '-o', 'concurrent_optimized.json', '--quiet'])

    print("\nComparison:")
    subprocess.run([sys.executable, '-m', 'pyperf', 'compare_to',
                   'concurrent_original.json', 'concurrent_optimized.json'])

    # Clean up
    print("\nCleaning up...")
    for f in ['bench_concurrent_to_asyncio_original.py',
              'bench_concurrent_to_asyncio_optimized.py']:
        if os.path.exists(f):
            os.remove(f)

    print("\n=== Summary ===")
    print("concurrent.futures -> asyncio: Should show significant speedup with _get_snapshot")

if __name__ == "__main__":
    run_benchmarks()

This PR significantly improves performance when transferring future state from `concurrent.futures.Future` to `asyncio.Future`, a common operation when dispatching executor jobs in asyncio applications.

The current `_copy_future_state` implementation requires multiple method calls and lock acquisitions to retrieve the source future's state:
1. `done()` - acquires lock to check state
2. `cancelled()` - acquires lock again
3. `exception()` - acquires lock to get exception
4. `result()` - acquires lock to get result

Each method call involves thread synchronization overhead, making this operation a bottleneck for high-frequency executor dispatches.

Our use case involves dispatching a large number of small executor jobs from `asyncio` to a thread pool. These jobs typically involve `open` or `stat` on files that are already cached by the OS, so the actual I/O returns almost instantly. However, we still have to offload them to avoid blocking the event loop, since there's no reliable way to determine in advance whether a read will hit the cache.

As a result, the majority of the overhead isn't from the I/O itself, but from the cost of scheduling. Most of the time is spent copying future state, which involves locking. This PR reduces that overhead, which has a meaningful impact at scale.

Add a new `_get_snapshot()` method to `concurrent.futures.Future` that atomically retrieves all state information in a single lock acquisition:
- Returns tuple: `(done, cancelled, result, exception)`
- Uses optimized fast path for already-finished futures (no lock needed)
- Provides atomic state capture for other states

The `_copy_future_state` function in `asyncio` now uses this snapshot method when available, falling back to the traditional approach for backwards compatibility.

Benchmark results show dramatic improvements for the common case:
- **concurrent.futures→asyncio transfer: 4.12x faster**
- asyncio→asyncio transfer: Slightly slower (1.05x) due to hasattr check (I couldn't find any places where this actually happens though as it looks like `_chain_future` the only entry point to `_copy_future_state` and it is always called with `concurrent.futures.Future`)

This optimization particularly benefits applications that:
- Dispatch many small executor jobs (e.g., filesystem operations, DNS lookups)
- Use thread pools for I/O-bound operations in asyncio
- Have high frequency of executor task completion

- Adds `_get_snapshot()` to `concurrent.futures.Future` for atomic state retrieval
- Updates `_copy_future_state()` to prefer snapshot method when available
- Maintains full backwards compatibility with existing code
- Minimal code changes with focused optimization

These show consistent 4x+ speedup for the critical concurrent.futures→asyncio path.
```
=== 1. Benchmarking concurrent.futures -> asyncio ===
Running original...
concurrent_to_asyncio: Mean +- std dev: 986 ns +- 16 ns
Running optimized...
concurrent_to_asyncio: Mean +- std dev: 239 ns +- 4 ns

Comparison:
Mean +- std dev: [concurrent_original] 986 ns +- 16 ns -> [concurrent_optimized] 239 ns +- 4 ns: 4.12x faster

=== 2. Benchmarking asyncio -> asyncio ===
Running original...
asyncio_to_asyncio: Mean +- std dev: 221 ns +- 4 ns
Running optimized...
asyncio_to_asyncio: Mean +- std dev: 232 ns +- 4 ns

Comparison:
Mean +- std dev: [asyncio_original] 221 ns +- 4 ns -> [asyncio_optimized] 232 ns +- 4 ns: 1.05x slower

Cleaning up...
```

```python
import pyperf
import concurrent.futures
import asyncio
import subprocess
import os
import sys

def write_benchmark_scripts():
    """Write individual benchmark scripts for each scenario."""

    # Common helper code
    common_imports = '''
import pyperf
import concurrent.futures
import asyncio

def _convert_future_exc(exc):
    exc_class = type(exc)
    if exc_class is concurrent.futures.CancelledError:
        return asyncio.CancelledError(*exc.args)
    elif exc_class is concurrent.futures.TimeoutError:
        return asyncio.TimeoutError(*exc.args)
    elif exc_class is concurrent.futures.InvalidStateError:
        return asyncio.InvalidStateError(*exc.args)
    else:
        return exc
'''

    # Optimization patch code
    optimization_patch = '''
FINISHED = concurrent.futures._base.FINISHED
CANCELLED = concurrent.futures._base.CANCELLED
CANCELLED_AND_NOTIFIED = concurrent.futures._base.CANCELLED_AND_NOTIFIED

def _get_snapshot_implementation(self):
    """Get a snapshot of the future's current state."""
    # Fast path: check if already finished without lock
    if self._state == FINISHED:
        return True, False, self._result, self._exception

    # Need lock for other states since they can change
    with self._condition:
        if self._state == FINISHED:
            return True, False, self._result, self._exception
        if self._state in {CANCELLED, CANCELLED_AND_NOTIFIED}:
            return True, True, None, None
        return False, False, None, None

concurrent.futures.Future._get_snapshot = _get_snapshot_implementation
'''

    # Original copy implementation
    original_copy = '''
def copy_future_original(source, dest):
    """Original implementation using individual method calls."""
    if dest.cancelled():
        return

    if hasattr(source, 'done'):
        assert source.done()

    if source.cancelled():
        dest.cancel()
    else:
        exception = source.exception()
        if exception is not None:
            dest.set_exception(_convert_future_exc(exception))
        else:
            result = source.result()
            dest.set_result(result)
'''

    # Optimized copy implementation
    optimized_copy = '''
def copy_future_optimized(source, dest):
    """Optimized implementation using _get_snapshot when available."""
    if dest.cancelled():
        return

    # Use _get_snapshot for futures that support it
    if hasattr(source, '_get_snapshot'):
        done, cancelled, result, exception = source._get_snapshot()
        assert done
        if cancelled:
            dest.cancel()
        elif exception is not None:
            dest.set_exception(_convert_future_exc(exception))
        else:
            dest.set_result(result)
        return

    # Traditional fallback for asyncio.Future
    if hasattr(source, 'done'):
        assert source.done()

    if source.cancelled():
        dest.cancel()
    else:
        exception = source.exception()
        if exception is not None:
            dest.set_exception(_convert_future_exc(exception))
        else:
            result = source.result()
            dest.set_result(result)
'''

    # 1. concurrent.futures -> asyncio (original)
    with open('bench_concurrent_to_asyncio_original.py', 'w') as f:
        f.write(common_imports + original_copy + '''
source = concurrent.futures.Future()
source.set_result(42)
loop = asyncio.new_event_loop()

def task():
    """Single copy operation benchmark."""
    dest = asyncio.Future(loop=loop)
    copy_future_original(source, dest)
    dest.cancel()

runner = pyperf.Runner()
runner.bench_func('concurrent_to_asyncio', task)
''')

    # 2. concurrent.futures -> asyncio (optimized)
    with open('bench_concurrent_to_asyncio_optimized.py', 'w') as f:
        f.write(common_imports + optimization_patch + optimized_copy + '''
source = concurrent.futures.Future()
source.set_result(42)
loop = asyncio.new_event_loop()

def task():
    """Single copy operation benchmark."""
    dest = asyncio.Future(loop=loop)
    copy_future_optimized(source, dest)
    dest.cancel()

runner = pyperf.Runner()
runner.bench_func('concurrent_to_asyncio', task)
''')

    # 3. asyncio -> asyncio (original)
    with open('bench_asyncio_to_asyncio_original.py', 'w') as f:
        f.write(common_imports + original_copy + '''
loop = asyncio.new_event_loop()
source = asyncio.Future(loop=loop)
source.set_result(42)

def task():
    """Single copy operation benchmark."""
    dest = asyncio.Future(loop=loop)
    copy_future_original(source, dest)
    dest.cancel()

runner = pyperf.Runner()
runner.bench_func('asyncio_to_asyncio', task)
''')

    # 4. asyncio -> asyncio (optimized - should use fallback)
    with open('bench_asyncio_to_asyncio_optimized.py', 'w') as f:
        f.write(common_imports + optimization_patch + optimized_copy + '''
loop = asyncio.new_event_loop()
source = asyncio.Future(loop=loop)
source.set_result(42)

def task():
    """Single copy operation benchmark."""
    dest = asyncio.Future(loop=loop)
    copy_future_optimized(source, dest)
    dest.cancel()

runner = pyperf.Runner()
runner.bench_func('asyncio_to_asyncio', task)
''')

def run_benchmarks():
    """Run all benchmarks and compare results."""
    print("Writing benchmark scripts...")
    write_benchmark_scripts()

    # Clean up old results
    for f in ['concurrent_original.json', 'concurrent_optimized.json',
              'asyncio_original.json', 'asyncio_optimized.json']:
        if os.path.exists(f):
            os.remove(f)

    print("\n=== 1. Benchmarking concurrent.futures -> asyncio ===")
    print("Running original...")
    subprocess.run([sys.executable, 'bench_concurrent_to_asyncio_original.py',
                   '-o', 'concurrent_original.json', '--quiet'])

    print("Running optimized...")
    subprocess.run([sys.executable, 'bench_concurrent_to_asyncio_optimized.py',
                   '-o', 'concurrent_optimized.json', '--quiet'])

    print("\nComparison:")
    subprocess.run([sys.executable, '-m', 'pyperf', 'compare_to',
                   'concurrent_original.json', 'concurrent_optimized.json'])

    print("\n=== 2. Benchmarking asyncio -> asyncio ===")
    print("Running original...")
    subprocess.run([sys.executable, 'bench_asyncio_to_asyncio_original.py',
                   '-o', 'asyncio_original.json', '--quiet'])

    print("Running optimized...")
    subprocess.run([sys.executable, 'bench_asyncio_to_asyncio_optimized.py',
                   '-o', 'asyncio_optimized.json', '--quiet'])

    print("\nComparison:")
    subprocess.run([sys.executable, '-m', 'pyperf', 'compare_to',
                   'asyncio_original.json', 'asyncio_optimized.json'])

    # Clean up
    print("\nCleaning up...")
    for f in ['bench_concurrent_to_asyncio_original.py',
              'bench_concurrent_to_asyncio_optimized.py',
              'bench_asyncio_to_asyncio_original.py',
              'bench_asyncio_to_asyncio_optimized.py']:
        if os.path.exists(f):
            os.remove(f)

    print("\n=== Summary ===")
    print("concurrent.futures -> asyncio: Should show significant speedup")
    print("asyncio -> asyncio: Should show no regression (fallback path)")

if __name__ == "__main__":
    run_benchmarks()
```
@bdraco bdraco changed the title GH-134173: Optimize concurrent.futures→asyncio state transfer with atomic snapshot gh-134173: Optimize concurrent.futures→asyncio state transfer with atomic snapshot May 18, 2025
kumaraditya303
kumaraditya303
kumaraditya303
@kumaraditya303 kumaraditya303 merged commit 53da1e8 into python:main May 18, 2025
43 checks passed
@bdraco
Copy link
Contributor Author

bdraco commented May 18, 2025

Thanks!

@bdraco bdraco deleted the thread_to_asyncio_slow branch May 18, 2025 17:29
bdraco added a commit to home-assistant/docker-base that referenced this pull request May 18, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

Follow Lee on X/Twitter - Father, Husband, Serial builder creating AI, crypto, games & web tools. We are friends :) AI Will Come To Life!

Check out: eBank.nz (Art Generator) | Netwrck.com (AI Tools) | Text-Generator.io (AI API) | BitBank.nz (Crypto AI) | ReadingTime (Kids Reading) | RewordGame | BigMultiplayerChess | WebFiddle | How.nz | Helix AI Assistant

2 participants

Follow Lee on X/Twitter - Father, Husband, Serial builder creating AI, crypto, games & web tools. We are friends :) AI Will Come To Life!

Check out: eBank.nz (Art Generator) | Netwrck.com (AI Tools) | Text-Generator.io (AI API) | BitBank.nz (Crypto AI) | ReadingTime (Kids Reading) | RewordGame | BigMultiplayerChess | WebFiddle | How.nz | Helix AI Assistant