From ce86c6ef43e497545124f48ee668a09da2f0a12a Mon Sep 17 00:00:00 2001 From: jsonbailey Date: Wed, 24 Jun 2026 17:10:46 -0500 Subject: [PATCH] fix: Correct pool ownership and close pooled connections synchronously _HttpClientImpl owned the wrong pool: the ownership flag was inverted in the 2023 ConnectStrategy refactor (c516f1f8), so close() cleared a caller-supplied pool instead of one it created. On urllib3 2.x, PoolManager.clear() no longer closes connections synchronously (sockets close at GC via weakref.finalize), so a caller's streaming connection lingered until garbage collection. - Restore conventional ownership: close only a pool we created (params.pool is None), matching the async client's existing behavior. - Close each HTTPConnectionPool synchronously before clear() so the TCP FIN is sent immediately instead of at GC. - Log close errors at debug instead of swallowing them. Adds regression tests covering both halves of the ownership contract. --- ld_eventsource/http.py | 12 ++++++- .../testing/test_http_connect_strategy.py | 33 +++++++++++++++++++ 2 files changed, 44 insertions(+), 1 deletion(-) diff --git a/ld_eventsource/http.py b/ld_eventsource/http.py index 1058993..2b94809 100644 --- a/ld_eventsource/http.py +++ b/ld_eventsource/http.py @@ -57,7 +57,7 @@ class _HttpClientImpl: def __init__(self, params: _HttpConnectParams, logger: Logger): self.__params = params self.__pool = params.pool or PoolManager() - self.__should_close_pool = params.pool is not None + self.__should_close_pool = params.pool is None self.__logger = logger def connect(self, last_event_id: Optional[str]) -> Tuple[Iterator[bytes], Callable, Dict[str, Any]]: @@ -125,4 +125,14 @@ def close(): def close(self): if self.__should_close_pool: + # Close pooled connections (sends the TCP FIN) before dropping the pool. + # PoolManager.clear() alone drops the pool dict without closing the + # underlying sockets, leaving the connection open until garbage collection. + for key in list(self.__pool.pools.keys()): + connection_pool = self.__pool.pools.get(key) + if connection_pool is not None: + try: + connection_pool.close() + except Exception: + self.__logger.debug("Error closing connection pool", exc_info=True) self.__pool.clear() diff --git a/ld_eventsource/testing/test_http_connect_strategy.py b/ld_eventsource/testing/test_http_connect_strategy.py index 64a5832..815dfec 100644 --- a/ld_eventsource/testing/test_http_connect_strategy.py +++ b/ld_eventsource/testing/test_http_connect_strategy.py @@ -1,5 +1,7 @@ import logging +from unittest import mock +from urllib3 import PoolManager from urllib3.exceptions import ProtocolError from ld_eventsource import * @@ -239,3 +241,34 @@ def test_fault_exposes_headers_from_http_error(): assert fault.headers is not None assert fault.headers.get('Retry-After') == '60' assert fault.headers.get('X-Error-Code') == 'SERVICE_UNAVAILABLE' + + +def test_close_leaves_caller_supplied_pool_open(): + # The caller owns the lifecycle of a pool it supplies, so close() must not + # tear it down. (Regression: this ownership flag was inverted, causing the + # client to clear() a caller-supplied pool out from under the caller.) + pool = mock.MagicMock(spec=PoolManager) + client = ConnectStrategy.http("http://test", pool=pool).create_client(logger()) + + client.close() + + pool.clear.assert_not_called() + + +def test_close_closes_pool_it_created(): + # When the client creates its own pool (no pool supplied), close() must close + # the pooled connections synchronously -- sending the TCP FIN now -- rather + # than leaving the sockets open until garbage collection. PoolManager.clear() + # alone does not close them on urllib3 2.x. + connection_pool = mock.Mock() + created_pool = mock.MagicMock() + created_pool.pools.keys.return_value = ['poolkey'] + created_pool.pools.get.return_value = connection_pool + + with mock.patch('ld_eventsource.http.PoolManager', return_value=created_pool): + client = ConnectStrategy.http("http://test").create_client(logger()) + + client.close() + + connection_pool.close.assert_called_once() + created_pool.clear.assert_called_once()