2020from concurrent .futures import ThreadPoolExecutor
2121from typing import TYPE_CHECKING , Dict , Optional , Tuple
2222
23+ from aws_advanced_python_wrapper .host_availability import HostAvailability
2324from aws_advanced_python_wrapper .hostinfo import HostInfo
2425from aws_advanced_python_wrapper .utils .atomic import AtomicReference
2526from aws_advanced_python_wrapper .utils .cache_map import CacheMap
2627from aws_advanced_python_wrapper .utils .messages import Messages
28+ from aws_advanced_python_wrapper .utils .rdsutils import RdsUtils
2729from aws_advanced_python_wrapper .utils .thread_safe_connection_holder import \
2830 ThreadSafeConnectionHolder
2931from aws_advanced_python_wrapper .utils .utils import LogUtils
@@ -87,6 +89,7 @@ def __init__(self, plugin_service: PluginService, topology_utils: TopologyUtils,
8789 self ._refresh_rate_nano = refresh_rate_nano
8890 self ._high_refresh_rate_nano = high_refresh_rate_nano
8991
92+ self ._rds_utils = RdsUtils ()
9093 self ._writer_host_info : AtomicReference [Optional [HostInfo ]] = AtomicReference (None )
9194 self ._monitoring_connection : ThreadSafeConnectionHolder = ThreadSafeConnectionHolder (None )
9295
@@ -300,16 +303,34 @@ def _open_any_connection_and_update_topology(self) -> Tuple[HostInfo, ...]:
300303 try :
301304 conn = self ._plugin_service .force_connect (self ._initial_host_info , self ._monitoring_properties )
302305 self ._monitoring_connection .set (conn , close_previous = False )
303- logger .debug ("ClusterTopologyMonitorImpl.OpenedMonitoringConnection" , self ._cluster_id , self ._initial_host_info .host )
306+ logger .debug ("ClusterTopologyMonitorImpl.OpenedMonitoringConnection" ,
307+ self ._cluster_id , self ._initial_host_info .host )
304308
305309 try :
306- writer_host = self ._topology_utils .get_writer_host_if_connected (
307- conn , self ._plugin_service .driver_dialect )
308- if writer_host :
310+ writer_id = self ._topology_utils .get_writer_id_if_connected (
311+ conn , self ._plugin_service .driver_dialect )
312+ if writer_id :
309313 self ._is_verified_writer_connection = True
310314 writer_verified_by_this_thread = True
311- self ._writer_host_info .set (HostInfo (writer_host , self ._initial_host_info .port ))
312- logger .debug ("ClusterTopologyMonitorImpl.WriterMonitoringConnection" , self ._cluster_id , writer_host )
315+
316+ if self ._rds_utils .is_rds_instance (self ._initial_host_info .host ):
317+ writer_host_info = self ._initial_host_info
318+ self ._writer_host_info .set (writer_host_info )
319+ else :
320+ writer_host = self ._instance_template .host .replace ("?" , writer_id )
321+ port = self ._instance_template .port \
322+ if self ._instance_template .is_port_specified () \
323+ else self ._initial_host_info .port
324+ writer_host_info = HostInfo (
325+ writer_host ,
326+ port ,
327+ HostRole .WRITER ,
328+ HostAvailability .AVAILABLE ,
329+ host_id = writer_id )
330+ self ._writer_host_info .set (writer_host_info )
331+
332+ logger .debug ("ClusterTopologyMonitorImpl.WriterMonitoringConnection" ,
333+ self ._cluster_id , writer_host_info .host )
313334 except Exception :
314335 pass
315336 except Exception :
@@ -321,7 +342,7 @@ def _open_any_connection_and_update_topology(self) -> Tuple[HostInfo, ...]:
321342 self ._ignore_new_topology_requests_end_time_nano = 0
322343 else :
323344 self ._ignore_new_topology_requests_end_time_nano = (
324- time .time_ns () + self .IGNORE_TOPOLOGY_REQUEST_NANO )
345+ time .time_ns () + self .IGNORE_TOPOLOGY_REQUEST_NANO )
325346
326347 if len (hosts ) == 0 :
327348 self ._monitoring_connection .clear ()
@@ -472,7 +493,7 @@ def __call__(self) -> None:
472493 if connection is not None :
473494 is_writer = False
474495 try :
475- is_writer = self ._monitor ._topology_utils .get_writer_host_if_connected (
496+ is_writer = self ._monitor ._topology_utils .get_writer_id_if_connected (
476497 connection , self ._monitor ._plugin_service .driver_dialect ) is not None
477498 except Exception :
478499 self ._monitor ._close_connection (connection )
0 commit comments