@@ -88,8 +88,18 @@ def handle_request(self, request: Request) -> Response:
8888 with Trace ("http2.send_connection_init" , request , kwargs ):
8989 self ._send_connection_init (** kwargs )
9090 self ._sent_connection_init = True
91- max_streams = self ._h2_state .local_settings .max_concurrent_streams
92- self ._max_streams_semaphore = Semaphore (max_streams )
91+
92+ # Initially start with just 1 until the remote server provides
93+ # its max_concurrent_streams value
94+ self ._max_streams = 1
95+
96+ local_settings_max_streams = (
97+ self ._h2_state .local_settings .max_concurrent_streams
98+ )
99+ self ._max_streams_semaphore = Semaphore (local_settings_max_streams )
100+
101+ for _ in range (local_settings_max_streams - self ._max_streams ):
102+ self ._max_streams_semaphore .acquire ()
93103
94104 self ._max_streams_semaphore .acquire ()
95105
@@ -280,6 +290,13 @@ def _receive_events(
280290 if stream_id is None or not self ._events .get (stream_id ):
281291 events = self ._read_incoming_data (request )
282292 for event in events :
293+ if isinstance (event , h2 .events .RemoteSettingsChanged ):
294+ with Trace (
295+ "http2.receive_remote_settings" , request
296+ ) as trace :
297+ self ._receive_remote_settings_change (event )
298+ trace .return_value = event
299+
283300 event_stream_id = getattr (event , "stream_id" , 0 )
284301
285302 # The ConnectionTerminatedEvent applies to the entire connection,
@@ -293,6 +310,23 @@ def _receive_events(
293310
294311 self ._write_outgoing_data (request )
295312
313+ def _receive_remote_settings_change (self , event : h2 .events .Event ) -> None :
314+ max_concurrent_streams = event .changed_settings .get (
315+ h2 .settings .SettingCodes .MAX_CONCURRENT_STREAMS
316+ )
317+ if max_concurrent_streams :
318+ new_max_streams = min (
319+ max_concurrent_streams .new_value ,
320+ self ._h2_state .local_settings .max_concurrent_streams ,
321+ )
322+ if new_max_streams and new_max_streams != self ._max_streams :
323+ while new_max_streams > self ._max_streams :
324+ self ._max_streams_semaphore .release ()
325+ self ._max_streams += 1
326+ while new_max_streams < self ._max_streams :
327+ self ._max_streams_semaphore .acquire ()
328+ self ._max_streams -= 1
329+
296330 def _response_closed (self , stream_id : int ) -> None :
297331 self ._max_streams_semaphore .release ()
298332 del self ._events [stream_id ]
0 commit comments