Skip to content

Commit ae8024a

Browse files
performance optimization (#1072)
1 parent 7dbd039 commit ae8024a

37 files changed

+581
-330
lines changed

aws_advanced_python_wrapper/aurora_connection_tracker_plugin.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030

3131
from aws_advanced_python_wrapper.errors import FailoverError
3232
from aws_advanced_python_wrapper.hostinfo import HostInfo, HostRole
33+
from aws_advanced_python_wrapper.pep249_methods import DbApiMethod
3334
from aws_advanced_python_wrapper.plugin import Plugin, PluginFactory
3435
from aws_advanced_python_wrapper.utils.log import Logger
3536
from aws_advanced_python_wrapper.utils.rdsutils import RdsUtils
@@ -147,13 +148,12 @@ def _log_connection_set(self, host: str, conn_set: Optional[WeakSet]):
147148

148149

149150
class AuroraConnectionTrackerPlugin(Plugin):
150-
_SUBSCRIBED_METHODS: Set[str] = {"*"}
151151
_current_writer: Optional[HostInfo] = None
152152
_need_update_current_writer: bool = False
153153

154154
@property
155155
def subscribed_methods(self) -> Set[str]:
156-
return self._SUBSCRIBED_METHODS
156+
return self._subscribed_methods
157157

158158
def __init__(self,
159159
plugin_service: PluginService,
@@ -164,6 +164,11 @@ def __init__(self,
164164
self._props = props
165165
self._rds_utils = rds_utils
166166
self._tracker = tracker
167+
self._subscribed_methods: Set[str] = {DbApiMethod.CONNECT.method_name,
168+
DbApiMethod.CONNECTION_CLOSE.method_name,
169+
DbApiMethod.CONNECT.method_name,
170+
DbApiMethod.NOTIFY_HOST_LIST_CHANGED.method_name}
171+
self._subscribed_methods.update(self._plugin_service.network_bound_methods)
167172

168173
def connect(
169174
self,
@@ -210,5 +215,6 @@ def _get_writer(self, hosts: Tuple[HostInfo, ...]) -> Optional[HostInfo]:
210215

211216

212217
class AuroraConnectionTrackerPluginFactory(PluginFactory):
213-
def get_instance(self, plugin_service: PluginService, props: Properties) -> Plugin:
218+
@staticmethod
219+
def get_instance(plugin_service: PluginService, props: Properties) -> Plugin:
214220
return AuroraConnectionTrackerPlugin(plugin_service, props)

aws_advanced_python_wrapper/aurora_initial_connection_strategy_plugin.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -233,5 +233,6 @@ def _has_no_readers(self) -> bool:
233233

234234

235235
class AuroraInitialConnectionStrategyPluginFactory(PluginFactory):
236-
def get_instance(self, plugin_service: PluginService, props: Properties) -> Plugin:
236+
@staticmethod
237+
def get_instance(plugin_service: PluginService, props: Properties) -> Plugin:
237238
return AuroraInitialConnectionStrategyPlugin(plugin_service)

aws_advanced_python_wrapper/aws_secrets_manager_plugin.py

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
from aws_advanced_python_wrapper.plugin_service import PluginService
3333

3434
from aws_advanced_python_wrapper.errors import AwsWrapperError
35+
from aws_advanced_python_wrapper.pep249_methods import DbApiMethod
3536
from aws_advanced_python_wrapper.plugin import Plugin, PluginFactory
3637
from aws_advanced_python_wrapper.utils.log import Logger
3738
from aws_advanced_python_wrapper.utils.messages import Messages
@@ -45,7 +46,7 @@
4546

4647

4748
class AwsSecretsManagerPlugin(Plugin):
48-
_SUBSCRIBED_METHODS: Set[str] = {"connect", "force_connect"}
49+
_SUBSCRIBED_METHODS: Set[str] = {DbApiMethod.CONNECT.method_name, DbApiMethod.FORCE_CONNECT.method_name}
4950

5051
_SECRETS_ARN_PATTERN = r"^arn:aws:secretsmanager:(?P<region>[^:\n]*):[^:\n]*:([^:/\n]*[:/])?(.*)$"
5152
_ONE_YEAR_IN_SECONDS = 60 * 60 * 24 * 365
@@ -136,7 +137,8 @@ def _update_secret(self, token_expiration_ns: int, force_refetch: bool = False)
136137
"""
137138
telemetry_factory = self._plugin_service.get_telemetry_factory()
138139
context = telemetry_factory.open_telemetry_context("fetch credentials", TelemetryTraceLevel.NESTED)
139-
self._fetch_credentials_counter.inc()
140+
if self._fetch_credentials_counter is not None:
141+
self._fetch_credentials_counter.inc()
140142

141143
try:
142144
fetched: bool = False
@@ -167,11 +169,13 @@ def _update_secret(self, token_expiration_ns: int, force_refetch: bool = False)
167169

168170
return fetched
169171
except Exception as ex:
170-
context.set_success(False)
171-
context.set_exception(ex)
172+
if context is not None:
173+
context.set_success(False)
174+
context.set_exception(ex)
172175
raise ex
173176
finally:
174-
context.close_context()
177+
if context is not None:
178+
context.close_context()
175179

176180
def _fetch_latest_credentials(self):
177181
"""
@@ -228,5 +232,6 @@ def _get_rds_region(self, secret_id: str, props: Properties) -> str:
228232

229233

230234
class AwsSecretsManagerPluginFactory(PluginFactory):
231-
def get_instance(self, plugin_service: PluginService, props: Properties) -> Plugin:
235+
@staticmethod
236+
def get_instance(plugin_service: PluginService, props: Properties) -> Plugin:
232237
return AwsSecretsManagerPlugin(plugin_service, props)

aws_advanced_python_wrapper/blue_green_plugin.py

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
from aws_advanced_python_wrapper.host_availability import HostAvailability
4343
from aws_advanced_python_wrapper.hostinfo import HostInfo, HostRole
4444
from aws_advanced_python_wrapper.iam_plugin import IamAuthPlugin
45+
from aws_advanced_python_wrapper.pep249_methods import DbApiMethod
4546
from aws_advanced_python_wrapper.plugin import Plugin, PluginFactory
4647
from aws_advanced_python_wrapper.utils.atomic import AtomicInt
4748
from aws_advanced_python_wrapper.utils.concurrent import (ConcurrentDict,
@@ -471,7 +472,8 @@ def apply(
471472
"SuspendConnectRouting.SwitchoverCompleteContinueWithConnect",
472473
(time.time() - start_time_sec) * 1000))
473474
finally:
474-
telemetry_context.close_context()
475+
if telemetry_context is not None:
476+
telemetry_context.close_context()
475477

476478
# return None so that the next routing can attempt a connection
477479
return None
@@ -540,7 +542,8 @@ def apply(
540542
host_info.host,
541543
(time.time() - start_time_sec) * 1000))
542544
finally:
543-
telemetry_context.close_context()
545+
if telemetry_context is not None:
546+
telemetry_context.close_context()
544547

545548
# return None so that the next routing can attempt a connection
546549
return None
@@ -615,15 +618,16 @@ def apply(
615618
method_name,
616619
(time.time() - start_time_sec) * 1000))
617620
finally:
618-
telemetry_context.close_context()
621+
if telemetry_context is not None:
622+
telemetry_context.close_context()
619623

620624
# return empty so that the next routing can attempt a connection
621625
return ValueContainer.empty()
622626

623627

624628
class BlueGreenPlugin(Plugin):
625-
_SUBSCRIBED_METHODS: Set[str] = {"connect"}
626-
_CLOSE_METHODS: ClassVar[Set[str]] = {"Connection.close", "Cursor.close"}
629+
_SUBSCRIBED_METHODS: Set[str] = {DbApiMethod.CONNECT.method_name}
630+
_CLOSE_METHODS: ClassVar[Set[str]] = {DbApiMethod.CONNECTION_CLOSE.method_name, DbApiMethod.CURSOR_CLOSE.method_name}
627631
_status_providers: ClassVar[ConcurrentDict[str, BlueGreenStatusProvider]] = ConcurrentDict()
628632

629633
def __init__(self, plugin_service: PluginService, props: Properties):
@@ -779,7 +783,8 @@ def get_hold_time_ns(self) -> int:
779783

780784

781785
class BlueGreenPluginFactory(PluginFactory):
782-
def get_instance(self, plugin_service: PluginService, props: Properties) -> Plugin:
786+
@staticmethod
787+
def get_instance(plugin_service: PluginService, props: Properties) -> Plugin:
783788
return BlueGreenPlugin(plugin_service, props)
784789

785790

aws_advanced_python_wrapper/connect_time_plugin.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
from time import perf_counter_ns
2828
from typing import Callable, Set
2929

30+
from aws_advanced_python_wrapper.pep249_methods import DbApiMethod
3031
from aws_advanced_python_wrapper.plugin import Plugin, PluginFactory
3132
from aws_advanced_python_wrapper.utils.messages import Messages
3233

@@ -42,7 +43,7 @@ def reset_connect_time():
4243

4344
@property
4445
def subscribed_methods(self) -> Set[str]:
45-
return {"connect", "force_connect"}
46+
return {DbApiMethod.CONNECT.method_name, DbApiMethod.FORCE_CONNECT.method_name}
4647

4748
def connect(
4849
self,
@@ -65,5 +66,6 @@ def connect(
6566

6667

6768
class ConnectTimePluginFactory(PluginFactory):
68-
def get_instance(self, plugin_service: PluginService, props: Properties) -> Plugin:
69+
@staticmethod
70+
def get_instance(plugin_service: PluginService, props: Properties) -> Plugin:
6971
return ConnectTimePlugin()

aws_advanced_python_wrapper/custom_endpoint_plugin.py

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737

3838
from boto3 import Session
3939

40+
from aws_advanced_python_wrapper.pep249_methods import DbApiMethod
4041
from aws_advanced_python_wrapper.plugin import Plugin, PluginFactory
4142
from aws_advanced_python_wrapper.utils.log import Logger
4243
from aws_advanced_python_wrapper.utils.properties import WrapperProperties
@@ -194,7 +195,9 @@ def _run(self):
194195
self._custom_endpoint_host_info.host,
195196
endpoint_info,
196197
CustomEndpointMonitor._CUSTOM_ENDPOINT_INFO_EXPIRATION_NS)
197-
self._info_changed_counter.inc()
198+
199+
if self._info_changed_counter is not None:
200+
self._info_changed_counter.inc()
198201

199202
elapsed_time = perf_counter_ns() - start_ns
200203
sleep_duration = max(0, self._refresh_rate_ns - elapsed_time)
@@ -228,7 +231,7 @@ class CustomEndpointPlugin(Plugin):
228231
A plugin that analyzes custom endpoints for custom endpoint information and custom endpoint changes, such as adding
229232
or removing an instance in the custom endpoint.
230233
"""
231-
_SUBSCRIBED_METHODS: ClassVar[Set[str]] = {"connect"}
234+
_SUBSCRIBED_METHODS: ClassVar[Set[str]] = {DbApiMethod.CONNECT.method_name}
232235
_CACHE_CLEANUP_RATE_NS: ClassVar[int] = 6 * 10 ^ 10 # 1 minute
233236
_monitors: ClassVar[SlidingExpirationCacheWithCleanupThread[str, CustomEndpointMonitor]] = \
234237
SlidingExpirationCacheWithCleanupThread(_CACHE_CLEANUP_RATE_NS,
@@ -250,7 +253,7 @@ def __init__(self, plugin_service: PluginService, props: Properties):
250253
self._custom_endpoint_host_info: Optional[HostInfo] = None
251254
self._custom_endpoint_id: Optional[str] = None
252255
telemetry_factory: TelemetryFactory = self._plugin_service.get_telemetry_factory()
253-
self._wait_for_info_counter: TelemetryCounter = telemetry_factory.create_counter("customEndpoint.waitForInfo.counter")
256+
self._wait_for_info_counter: TelemetryCounter | None = telemetry_factory.create_counter("customEndpoint.waitForInfo.counter")
254257

255258
CustomEndpointPlugin._SUBSCRIBED_METHODS.update(self._plugin_service.network_bound_methods)
256259

@@ -312,7 +315,8 @@ def _wait_for_info(self, monitor: CustomEndpointMonitor):
312315
if has_info:
313316
return
314317

315-
self._wait_for_info_counter.inc()
318+
if self._wait_for_info_counter is not None:
319+
self._wait_for_info_counter.inc()
316320
host_info = cast('HostInfo', self._custom_endpoint_host_info)
317321
hostname = host_info.host
318322
logger.debug("CustomEndpointPlugin.WaitingForCustomEndpointInfo", hostname, self._wait_for_info_timeout_ms)
@@ -343,5 +347,6 @@ def execute(self, target: type, method_name: str, execute_func: Callable, *args:
343347

344348

345349
class CustomEndpointPluginFactory(PluginFactory):
346-
def get_instance(self, plugin_service: PluginService, props: Properties) -> Plugin:
350+
@staticmethod
351+
def get_instance(plugin_service: PluginService, props: Properties) -> Plugin:
347352
return CustomEndpointPlugin(plugin_service, props)

aws_advanced_python_wrapper/default_plugin.py

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,15 +31,15 @@
3131
from aws_advanced_python_wrapper.errors import AwsWrapperError
3232
from aws_advanced_python_wrapper.host_availability import HostAvailability
3333
from aws_advanced_python_wrapper.hostinfo import HostInfo, HostRole
34+
from aws_advanced_python_wrapper.pep249_methods import DbApiMethod
3435
from aws_advanced_python_wrapper.plugin import Plugin
3536
from aws_advanced_python_wrapper.utils.messages import Messages
3637
from aws_advanced_python_wrapper.utils.telemetry.telemetry import \
3738
TelemetryTraceLevel
3839

3940

4041
class DefaultPlugin(Plugin):
41-
_SUBSCRIBED_METHODS: Set[str] = {"*"}
42-
_CLOSE_METHOD = "Connection.close"
42+
_SUBSCRIBED_METHODS: Set[str] = {DbApiMethod.ALL.method_name}
4343

4444
def __init__(self, plugin_service: PluginService, connection_provider_manager: ConnectionProviderManager):
4545
self._plugin_service: PluginService = plugin_service
@@ -74,7 +74,8 @@ def _connect(
7474
database_dialect = self._plugin_service.database_dialect
7575
conn = conn_provider.connect(target_func, driver_dialect, database_dialect, host_info, props)
7676
finally:
77-
context.close_context()
77+
if context is not None:
78+
context.close_context()
7879

7980
self._plugin_service.set_availability(host_info.all_aliases, HostAvailability.AVAILABLE)
8081
self._plugin_service.update_driver_dialect(conn_provider)
@@ -106,9 +107,10 @@ def execute(self, target: object, method_name: str, execute_func: Callable, *arg
106107
try:
107108
result = self._plugin_service.driver_dialect.execute(method_name, execute_func, *args, **kwargs)
108109
finally:
109-
context.close_context()
110+
if context is not None:
111+
context.close_context()
110112

111-
if method_name != DefaultPlugin._CLOSE_METHOD and self._plugin_service.current_connection is not None:
113+
if method_name != DbApiMethod.CONNECTION_CLOSE.method_name and self._plugin_service.current_connection is not None:
112114
self._plugin_service.update_in_transaction()
113115

114116
return result

aws_advanced_python_wrapper/developer_plugin.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
from aws_advanced_python_wrapper.utils.properties import Properties
2424
from aws_advanced_python_wrapper.hostinfo import HostInfo
2525

26+
from aws_advanced_python_wrapper.pep249_methods import DbApiMethod
2627
from aws_advanced_python_wrapper.plugin import Plugin, PluginFactory
2728
from aws_advanced_python_wrapper.utils.log import Logger
2829
from aws_advanced_python_wrapper.utils.messages import Messages
@@ -76,8 +77,7 @@ def set_method_callback(method_callback: Optional[ExceptionSimulatorMethodCallba
7677

7778

7879
class DeveloperPlugin(Plugin):
79-
_ALL_METHODS: str = "*"
80-
_SUBSCRIBED_METHODS: Set[str] = {_ALL_METHODS}
80+
_SUBSCRIBED_METHODS: Set[str] = {DbApiMethod.ALL.method_name}
8181

8282
@property
8383
def subscribed_methods(self) -> Set[str]:
@@ -90,7 +90,7 @@ def execute(self, target: type, method_name: str, execute_func: Callable, *args:
9090
def raise_method_exception_if_set(
9191
self, target: type, method_name: str, execute_func: Callable, *args: Any, **kwargs: Any) -> None:
9292
if ExceptionSimulatorManager.next_method_exception is not None:
93-
if DeveloperPlugin._ALL_METHODS == ExceptionSimulatorManager.next_method_name or \
93+
if DbApiMethod.ALL.method_name == ExceptionSimulatorManager.next_method_name or \
9494
method_name == ExceptionSimulatorManager.next_method_name:
9595
self.raise_exception_on_method(ExceptionSimulatorManager.next_method_exception, method_name)
9696
elif ExceptionSimulatorManager.method_callback is not None:
@@ -158,6 +158,6 @@ def raise_exception_on_connect(self, error: Optional[Exception]) -> None:
158158

159159

160160
class DeveloperPluginFactory(PluginFactory):
161-
162-
def get_instance(self, plugin_service: PluginService, props: Properties) -> Plugin:
161+
@staticmethod
162+
def get_instance(plugin_service: PluginService, props: Properties) -> Plugin:
163163
return DeveloperPlugin()

aws_advanced_python_wrapper/driver_dialect.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
from aws_advanced_python_wrapper.driver_dialect_codes import DriverDialectCodes
2727
from aws_advanced_python_wrapper.errors import (QueryTimeoutError,
2828
UnsupportedOperationError)
29+
from aws_advanced_python_wrapper.pep249_methods import DbApiMethod
2930
from aws_advanced_python_wrapper.thread_pool_container import \
3031
ThreadPoolContainer
3132
from aws_advanced_python_wrapper.utils.decorators import timeout
@@ -40,11 +41,10 @@ class DriverDialect(ABC):
4041
Driver dialects help the driver-agnostic AWS Python Driver interface with the driver-specific functionality of the underlying Python Driver.
4142
"""
4243
_QUERY = "SELECT 1"
43-
_ALL_METHODS = "*"
4444

4545
_executor_name: ClassVar[str] = "DriverDialectExecutor"
4646
_dialect_code: str = DriverDialectCodes.GENERIC
47-
_network_bound_methods: Set[str] = {_ALL_METHODS}
47+
_network_bound_methods: Set[str] = {DbApiMethod.ALL.method_name}
4848
_read_only: bool = False
4949
_autocommit: bool = False
5050
_driver_name: str = "Generic"
@@ -130,7 +130,7 @@ def execute(
130130
*args: Any,
131131
exec_timeout: Optional[float] = None,
132132
**kwargs: Any) -> Cursor:
133-
if DriverDialect._ALL_METHODS not in self.network_bound_methods and method_name not in self.network_bound_methods:
133+
if DbApiMethod.ALL.method_name not in self.network_bound_methods and method_name not in self.network_bound_methods:
134134
return exec_func()
135135

136136
if exec_timeout is None:
@@ -161,7 +161,7 @@ def ping(self, conn: Connection) -> bool:
161161
try:
162162
with conn.cursor() as cursor:
163163
query = DriverDialect._QUERY
164-
self.execute("Cursor.execute", lambda: cursor.execute(query), query, exec_timeout=10)
164+
self.execute(DbApiMethod.CURSOR_EXECUTE.method_name, lambda: cursor.execute(query), query, exec_timeout=10)
165165
cursor.fetchone()
166166
return True
167167
except Exception:

aws_advanced_python_wrapper/execute_time_plugin.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,5 +54,6 @@ def execute(self, target: type, method_name: str, execute_func: Callable, *args:
5454

5555

5656
class ExecuteTimePluginFactory(PluginFactory):
57-
def get_instance(self, plugin_service: PluginService, props: Properties) -> Plugin:
57+
@staticmethod
58+
def get_instance(plugin_service: PluginService, props: Properties) -> Plugin:
5859
return ExecuteTimePlugin()

0 commit comments

Comments
 (0)