Skip to content

Commit 54e37d5

Browse files
avara1986P403n1x87
authored andcommitted
fix(rcm): update the forking behavior in remote config (#7548)
# Context The Remote Configuration Publisher/Subscriber System #5464 restarts all pubsub instances when the application forks (for example, [gunicorn workers](https://docs.gunicorn.org/en/stable/design.html#server-model), uwsgi workers, etc.). ![image](https://github.com/DataDog/dd-trace-py/assets/6352942/774fe838-5374-4edc-82c6-51cc563fe66e) Dynamic Instrumentation needs to update the pubsub instance at this point because the probe mechanism should run in the child process. For that, DI needs the callback as the method of an instance of Debugger, which lives in the child process. https://github.com/DataDog/dd-trace-py/blob/2.x/ddtrace/debugging/_debugger.py#L276 # Problem description When the application forks and restarts the subscribers, this happens before the debugger updates its callback instance. ``` 10348 starting subscribers 10348 restarting the debugger 10348 register callback 4429382528 <bound method Debugger._on_configuration of Debugger(status=<ServiceStatus.STOPPED: 'stopped'>)> 4406068560 shared_data 4398747792 ``` This results in the registration of the callback in the child processes but the execution of callbacks using the instance of the parent process. * Parent process: register a callback [on_configuration](https://github.com/DataDog/dd-trace-py/blob/2.x/ddtrace/debugging/_debugger.py#L654) for DI ``` 94621 register callback <bound method Debugger._on_configuration of Debugger(status=<ServiceStatus.STOPPED: 'stopped'>)> PubSub Isntance id: 4363974784 Debugger._on_configuration id: 4360362576 ``` * Child processes: register callbacks [on_configuration](https://github.com/DataDog/dd-trace-py/blob/2.x/ddtrace/debugging/_debugger.py#L654) for DI ``` 94638 register callback <bound method Debugger._on_configuration of Debugger(status=<ServiceStatus.STOPPED: 'stopped'>)> PubSub Isntance id: 4392569856 Debugger._on_configuration id: 4364006016 94639 register callback <bound method Debugger._on_configuration of Debugger(status=<ServiceStatus.STOPPED: 'stopped'>)> PubSub Isntance id: 4392569856 Debugger._on_configuration id: 4364006016 94640 register callback <bound method Debugger._on_configuration of Debugger(status=<ServiceStatus.STOPPED: 'stopped'>)> PubSub Isntance id: 4392569856 Debugger._on_configuration id: 4364006016 ``` * Child processes: exec callbacks. ``` 94621 _exec_callback <bound method Debugger._on_configuration of Debugger(status=<ServiceStatus.RUNNING: 'running'>)> PubSub Isntance id: 4392569856 Debugger._on_configuration id: 4360362576 94638 _exec_callback <bound method Debugger._on_configuration of Debugger(status=<ServiceStatus.RUNNING: 'running'>)> PubSub Isntance id: 4392569856 Debugger._on_configuration id: 4360362576 94639 _exec_callback <bound method Debugger._on_configuration of Debugger(status=<ServiceStatus.RUNNING: 'running'>)> PubSub Isntance id: 4392569856 Debugger._on_configuration id: 4360362576 94640 _exec_callback <bound method Debugger._on_configuration of Debugger(status=<ServiceStatus.RUNNING: 'running'>)> PubSub Isntance id: 4392569856 Debugger._on_configuration id: 4360362576 ``` As a result, we're registering callback id **4364006016** but calling **4360362576** (which is the instance of the parent process). # PR Description This PR removes the restart of publisher-subscriber instances in Remote Configuration at fork and delegates it to the products that have integration with Remote Config, namely AppSec and Dynamic Instrumentation. ## Checklist - [x] Change(s) are motivated and described in the PR description. - [x] Testing strategy is described if automated tests are not included in the PR. - [x] Risk is outlined (performance impact, potential for breakage, maintainability, etc). - [x] Change is maintainable (easy to change, telemetry, documentation). - [x] [Library release note guidelines](https://ddtrace.readthedocs.io/en/stable/releasenotes.html) are followed. If no release note is required, add label `changelog/no-changelog`. - [x] Documentation is included (in-code, generated user docs, [public corp docs](https://github.com/DataDog/documentation/)). - [x] Backport labels are set (if [applicable](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting)) ## Reviewer Checklist - [x] Title is accurate. - [x] No unnecessary changes are introduced. - [x] Description motivates each change. - [x] Avoids breaking [API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces) changes unless absolutely necessary. - [x] Testing strategy adequately addresses listed risk(s). - [x] Change is maintainable (easy to change, telemetry, documentation). - [x] Release note makes sense to a user of the library. - [x] Reviewer has explicitly acknowledged and discussed the performance implications of this PR as reported in the benchmarks PR comment. - [x] Backport labels are set in a manner that is consistent with the [release branch maintenance policy](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting) - [x] If this PR touches code that signs or publishes builds or packages, or handles credentials of any kind, I've requested a review from `@DataDog/security-design-and-guidance`. - [x] This PR doesn't touch any of that. --------- Co-authored-by: Gabriele N. Tornetta <[email protected]> (cherry picked from commit 1da4fc4)
1 parent db48b01 commit 54e37d5

File tree

5 files changed

+45
-20
lines changed

5 files changed

+45
-20
lines changed

ddtrace/appsec/_remoteconfiguration.py

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
from ddtrace.appsec._constants import PRODUCTS
1414
from ddtrace.appsec._utils import _appsec_rc_features_is_enabled
1515
from ddtrace.constants import APPSEC_ENV
16+
from ddtrace.internal import forksafe
1617
from ddtrace.internal.logger import get_logger
1718
from ddtrace.internal.remoteconfig._connectors import PublisherSubscriberConnector
1819
from ddtrace.internal.remoteconfig._publishers import RemoteConfigPublisherMergeDicts
@@ -25,6 +26,8 @@
2526

2627
log = get_logger(__name__)
2728

29+
APPSEC_PRODUCTS = [PRODUCTS.ASM_FEATURES, PRODUCTS.ASM, PRODUCTS.ASM_DATA, PRODUCTS.ASM_DD]
30+
2831

2932
class AppSecRC(PubSub):
3033
__subscriber_class__ = RemoteConfigSubscriber
@@ -36,6 +39,10 @@ def __init__(self, _preprocess_results, callback):
3639
self._subscriber = self.__subscriber_class__(self.__shared_data__, callback, "ASM")
3740

3841

42+
def _forksafe_appsec_rc():
43+
remoteconfig_poller.start_subscribers_by_product(APPSEC_PRODUCTS)
44+
45+
3946
def enable_appsec_rc(test_tracer: Optional[Tracer] = None) -> None:
4047
"""Remote config will be used by ASM libraries to receive four different updates from the backend.
4148
Each update has it’s own product:
@@ -70,14 +77,13 @@ def enable_appsec_rc(test_tracer: Optional[Tracer] = None) -> None:
7077
remoteconfig_poller.register(PRODUCTS.ASM, asm_callback) # Exclusion Filters & Custom Rules
7178
remoteconfig_poller.register(PRODUCTS.ASM_DD, asm_callback) # DD Rules
7279

80+
forksafe.register(_forksafe_appsec_rc)
81+
7382

7483
def disable_appsec_rc():
7584
# only used to avoid data leaks between tests
76-
77-
remoteconfig_poller.unregister(PRODUCTS.ASM_FEATURES)
78-
remoteconfig_poller.unregister(PRODUCTS.ASM_DATA)
79-
remoteconfig_poller.unregister(PRODUCTS.ASM)
80-
remoteconfig_poller.unregister(PRODUCTS.ASM_DD)
85+
for product_name in APPSEC_PRODUCTS:
86+
remoteconfig_poller.unregister(product_name)
8187

8288

8389
def _add_rules_to_list(features: Mapping[str, Any], feature: str, message: str, ruleset: Dict[str, Any]) -> None:

ddtrace/internal/remoteconfig/client.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -244,6 +244,13 @@ def update_product_callback(self, product_name, callback):
244244
return True
245245
return False
246246

247+
def start_products(self, products_list):
248+
# type: (list) -> None
249+
for product_name in products_list:
250+
pubsub_instance = self._products.get(product_name)
251+
if pubsub_instance:
252+
pubsub_instance.restart_subscriber()
253+
247254
def unregister_product(self, product_name):
248255
# type: (str) -> None
249256
self._products.pop(product_name, None)

ddtrace/internal/remoteconfig/worker.py

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import os
2+
from typing import List
23

34
from ddtrace.internal import agent
45
from ddtrace.internal import atexit
@@ -80,19 +81,21 @@ def enable(self):
8081
return True
8182

8283
self.start()
83-
forksafe.register(self.start_subscribers)
84+
forksafe.register(self.reset_at_fork)
8485
atexit.register(self.disable)
8586
return True
8687
return False
8788

88-
def start_subscribers(self):
89+
def reset_at_fork(self):
8990
# type: () -> None
90-
"""Subscribers need to be restarted when application forks"""
91+
"""Client Id needs to be refreshed when application forks"""
9192
self._enable = False
92-
log.debug("[%s][P: %s] Remote Config Poller fork. Starting Pubsub services", os.getpid(), os.getppid())
93+
log.debug("[%s][P: %s] Remote Config Poller fork. Refreshing state", os.getpid(), os.getppid())
9394
self._client.renew_id()
94-
for pubsub in self._client.get_pubsubs():
95-
pubsub.restart_subscriber()
95+
96+
def start_subscribers_by_product(self, products_list):
97+
# type: (List[str]) -> None
98+
self._client.start_products(products_list)
9699

97100
def _poll_data(self, test_tracer=None):
98101
"""Force subscribers to poll new data. This function is only used in tests"""
@@ -121,7 +124,7 @@ def disable(self, join=False):
121124
if self.status == ServiceStatus.STOPPED:
122125
return
123126

124-
forksafe.unregister(self.start_subscribers)
127+
forksafe.unregister(self.reset_at_fork)
125128
atexit.unregister(self.disable)
126129

127130
self.stop(join=join)
@@ -147,14 +150,12 @@ def register(self, product, pubsub_instance, skip_enabled=False):
147150
try:
148151
# By enabling on registration we ensure we start the RCM client only
149152
# if there is at least one registered product.
150-
enabled = True
151153
if not skip_enabled:
152-
enabled = self.enable()
154+
self.enable()
153155

154-
if enabled:
155-
self._client.register_product(product, pubsub_instance)
156-
if not self._client.is_subscriber_running(pubsub_instance):
157-
pubsub_instance.start_subscriber()
156+
self._client.register_product(product, pubsub_instance)
157+
if not self._client.is_subscriber_running(pubsub_instance):
158+
pubsub_instance.start_subscriber()
158159
except Exception:
159160
log.debug("error starting the RCM client", exc_info=True)
160161

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
fixes:
3+
- |
4+
dynamic instrumentation: Needs to update the pubsub instance when the application forks because the probe
5+
mechanism should run in the child process. For that, DI needs the callback as the method of an instance of
6+
Debugger, which lives in the child process.

tests/internal/remoteconfig/test_remoteconfig.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ def get_mock_encoded_msg_with_signed_errors(msg, path, signed_errors):
146146

147147
def test_remote_config_register_auto_enable(remote_config_worker):
148148
# ASM_FEATURES product is enabled by default, but LIVE_DEBUGGER isn't
149-
class MockPubsub:
149+
class MockPubsub(PubSub):
150150
def stop(self, *args, **kwargs):
151151
pass
152152

@@ -165,10 +165,15 @@ def stop(self, *args, **kwargs):
165165

166166
def test_remote_config_register_validate_rc_disabled(remote_config_worker):
167167
remoteconfig_poller.disable()
168+
169+
class MockPubsub(PubSub):
170+
def stop(self, *args, **kwargs):
171+
pass
172+
168173
assert remoteconfig_poller.status == ServiceStatus.STOPPED
169174

170175
with override_global_config(dict(_remote_config_enabled=False)):
171-
remoteconfig_poller.register("LIVE_DEBUGGER", lambda m, c: None)
176+
remoteconfig_poller.register("LIVE_DEBUGGER", MockPubsub())
172177

173178
assert remoteconfig_poller.status == ServiceStatus.STOPPED
174179

0 commit comments

Comments
 (0)