Commit cf7e1f6

avara1986 and P403n1x87 committed
fix(rcm): update the forking behavior in remote config (#7548)
The Remote Configuration Publisher/Subscriber System #5464 restarts all pubsub instances when the application forks (for example, [gunicorn workers](https://docs.gunicorn.org/en/stable/design.html#server-model), uwsgi workers, etc.).

![image](https://github.com/DataDog/dd-trace-py/assets/6352942/774fe838-5374-4edc-82c6-51cc563fe66e)

Dynamic Instrumentation needs to update the pubsub instance at this point because the probe mechanism should run in the child process. For that, DI needs the callback as the method of an instance of Debugger, which lives in the child process.
https://github.com/DataDog/dd-trace-py/blob/2.x/ddtrace/debugging/_debugger.py#L276

When the application forks and restarts the subscribers, this happens before the debugger updates its callback instance.

```
10348 starting subscribers
10348 restarting the debugger
10348 register callback 4429382528 <bound method Debugger._on_configuration of Debugger(status=<ServiceStatus.STOPPED: 'stopped'>)> 4406068560 shared_data 4398747792
```

This results in the registration of the callback in the child processes but the execution of callbacks using the instance of the parent process.

* Parent process: register a callback [on_configuration](https://github.com/DataDog/dd-trace-py/blob/2.x/ddtrace/debugging/_debugger.py#L654) for DI

```
94621 register callback <bound method Debugger._on_configuration of Debugger(status=<ServiceStatus.STOPPED: 'stopped'>)> PubSub Isntance id: 4363974784 Debugger._on_configuration id: 4360362576
```

* Child processes: register callbacks [on_configuration](https://github.com/DataDog/dd-trace-py/blob/2.x/ddtrace/debugging/_debugger.py#L654) for DI

```
94638 register callback <bound method Debugger._on_configuration of Debugger(status=<ServiceStatus.STOPPED: 'stopped'>)> PubSub Isntance id: 4392569856 Debugger._on_configuration id: 4364006016
94639 register callback <bound method Debugger._on_configuration of Debugger(status=<ServiceStatus.STOPPED: 'stopped'>)> PubSub Isntance id: 4392569856 Debugger._on_configuration id: 4364006016
94640 register callback <bound method Debugger._on_configuration of Debugger(status=<ServiceStatus.STOPPED: 'stopped'>)> PubSub Isntance id: 4392569856 Debugger._on_configuration id: 4364006016
```

* Child processes: exec callbacks.

```
94621 _exec_callback <bound method Debugger._on_configuration of Debugger(status=<ServiceStatus.RUNNING: 'running'>)> PubSub Isntance id: 4392569856 Debugger._on_configuration id: 4360362576
94638 _exec_callback <bound method Debugger._on_configuration of Debugger(status=<ServiceStatus.RUNNING: 'running'>)> PubSub Isntance id: 4392569856 Debugger._on_configuration id: 4360362576
94639 _exec_callback <bound method Debugger._on_configuration of Debugger(status=<ServiceStatus.RUNNING: 'running'>)> PubSub Isntance id: 4392569856 Debugger._on_configuration id: 4360362576
94640 _exec_callback <bound method Debugger._on_configuration of Debugger(status=<ServiceStatus.RUNNING: 'running'>)> PubSub Isntance id: 4392569856 Debugger._on_configuration id: 4360362576
```

As a result, we're registering callback id **4364006016** but calling **4360362576** (which is the instance of the parent process).

This PR removes the restart of publisher-subscriber instances in Remote Configuration at fork and delegates it to the products that have integration with Remote Config, namely AppSec and Dynamic Instrumentation.

- [x] Change(s) are motivated and described in the PR description.
- [x] Testing strategy is described if automated tests are not included in the PR.
- [x] Risk is outlined (performance impact, potential for breakage, maintainability, etc).
- [x] Change is maintainable (easy to change, telemetry, documentation).
- [x] [Library release note guidelines](https://ddtrace.readthedocs.io/en/stable/releasenotes.html) are followed. If no release note is required, add label `changelog/no-changelog`.
- [x] Documentation is included (in-code, generated user docs, [public corp docs](https://github.com/DataDog/documentation/)).
- [x] Backport labels are set (if [applicable](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting))
- [x] Title is accurate.
- [x] No unnecessary changes are introduced.
- [x] Description motivates each change.
- [x] Avoids breaking [API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces) changes unless absolutely necessary.
- [x] Testing strategy adequately addresses listed risk(s).
- [x] Change is maintainable (easy to change, telemetry, documentation).
- [x] Release note makes sense to a user of the library.
- [x] Reviewer has explicitly acknowledged and discussed the performance implications of this PR as reported in the benchmarks PR comment.
- [x] Backport labels are set in a manner that is consistent with the [release branch maintenance policy](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting)
- [x] If this PR touches code that signs or publishes builds or packages, or handles credentials of any kind, I've requested a review from `@DataDog/security-design-and-guidance`.
- [x] This PR doesn't touch any of that.

---------

Co-authored-by: Gabriele N. Tornetta <[email protected]>
(cherry picked from commit 1da4fc4)
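
The mismatch between the registered and executed callback ids can be reproduced in miniature without forking. The sketch below is only an illustration: `SketchDebugger` and `SketchRegistry` are hypothetical stand-ins, not ddtrace classes, and creating a second debugger object merely imitates what happens in a child process. It shows that a registry holding a bound method keeps calling the original instance until the callback is registered again.

```python
class SketchDebugger:
    """Hypothetical stand-in for a product object that owns a callback."""

    def _on_configuration(self, payload):
        # Return the instance the call actually ran on, so the caller can inspect it.
        return self


class SketchRegistry:
    """Hypothetical stand-in for a product -> callback mapping."""

    def __init__(self):
        self._callbacks = {}

    def register(self, product, callback):
        self._callbacks[product] = callback

    def dispatch(self, product, payload):
        return self._callbacks[product](payload)


registry = SketchRegistry()

# "Parent process": the callback is a method bound to the parent's instance.
parent_debugger = SketchDebugger()
registry.register("LIVE_DEBUGGER", parent_debugger._on_configuration)

# "Child process" (imitated): a new instance exists, but the registry still
# holds the bound method that points at the parent's instance.
child_debugger = SketchDebugger()
stale_target = registry.dispatch("LIVE_DEBUGGER", {})

# Registering the child's bound method again fixes the target.
registry.register("LIVE_DEBUGGER", child_debugger._on_configuration)
fresh_target = registry.dispatch("LIVE_DEBUGGER", {})

print(stale_target is parent_debugger, fresh_target is child_debugger)
```

A bound method carries a reference to its instance, which is why the order of "restart subscribers" and "update the callback" matters after a fork.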
1 parent fcf5dbb commit cf7e1f6

5 files changed: +45 -20 lines changed

ddtrace/appsec/_remoteconfiguration.py

Lines changed: 11 additions & 5 deletions
@@ -7,6 +7,7 @@
 from ddtrace.appsec._constants import PRODUCTS
 from ddtrace.appsec.utils import _appsec_rc_features_is_enabled
 from ddtrace.constants import APPSEC_ENV
+from ddtrace.internal import forksafe
 from ddtrace.internal.logger import get_logger
 from ddtrace.internal.remoteconfig._connectors import PublisherSubscriberConnector
 from ddtrace.internal.remoteconfig._publishers import RemoteConfigPublisherMergeDicts
@@ -32,6 +33,8 @@
 
 log = get_logger(__name__)
 
+APPSEC_PRODUCTS = [PRODUCTS.ASM_FEATURES, PRODUCTS.ASM, PRODUCTS.ASM_DATA, PRODUCTS.ASM_DD]
+
 
 class AppSecRC(PubSub):
     __subscriber_class__ = RemoteConfigSubscriber
@@ -43,6 +46,10 @@ def __init__(self, _preprocess_results, callback):
         self._subscriber = self.__subscriber_class__(self.__shared_data__, callback, "ASM")
 
 
+def _forksafe_appsec_rc():
+    remoteconfig_poller.start_subscribers_by_product(APPSEC_PRODUCTS)
+
+
 def enable_appsec_rc(test_tracer=None):
     # type: (Optional[Tracer]) -> None
     """Remote config will be used by ASM libraries to receive four different updates from the backend.
@@ -78,14 +85,13 @@ def enable_appsec_rc(test_tracer=None):
         remoteconfig_poller.register(PRODUCTS.ASM, asm_callback)  # Exclusion Filters & Custom Rules
         remoteconfig_poller.register(PRODUCTS.ASM_DD, asm_callback)  # DD Rules
 
+    forksafe.register(_forksafe_appsec_rc)
+
 
 def disable_appsec_rc():
     # only used to avoid data leaks between tests
-
-    remoteconfig_poller.unregister(PRODUCTS.ASM_FEATURES)
-    remoteconfig_poller.unregister(PRODUCTS.ASM_DATA)
-    remoteconfig_poller.unregister(PRODUCTS.ASM)
-    remoteconfig_poller.unregister(PRODUCTS.ASM_DD)
+    for product_name in APPSEC_PRODUCTS:
+        remoteconfig_poller.unregister(product_name)
 
 
 def _add_rules_to_list(features, feature, message, ruleset):

ddtrace/internal/remoteconfig/client.py

Lines changed: 7 additions & 0 deletions
@@ -242,6 +242,13 @@ def update_product_callback(self, product_name, callback):
             return True
         return False
 
+    def start_products(self, products_list):
+        # type: (list) -> None
+        for product_name in products_list:
+            pubsub_instance = self._products.get(product_name)
+            if pubsub_instance:
+                pubsub_instance.restart_subscriber()
+
     def unregister_product(self, product_name):
         # type: (str) -> None
         self._products.pop(product_name, None)
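
The behavior of the new `start_products` loop can be explored with a small stand-in. This is a sketch under assumptions: `CountingPubSub` and `SketchClient` are hypothetical and only mirror the loop in the hunk above; the real `restart_subscriber` is not shown in this diff. It illustrates two properties of the loop: product names that are not registered are skipped, and a pubsub instance registered under several product names (as `asm_callback` is for `PRODUCTS.ASM` and `PRODUCTS.ASM_DD` in the AppSec file) is restarted once per name.

```python
class CountingPubSub:
    """Hypothetical pubsub stand-in that only counts restarts."""

    def __init__(self):
        self.restarts = 0

    def restart_subscriber(self):
        self.restarts += 1


class SketchClient:
    """Hypothetical client stand-in holding a product -> pubsub mapping."""

    def __init__(self):
        self._products = {}

    def register_product(self, product_name, pubsub_instance):
        self._products[product_name] = pubsub_instance

    def start_products(self, products_list):
        # Same loop shape as the method added in the diff.
        for product_name in products_list:
            pubsub_instance = self._products.get(product_name)
            if pubsub_instance:
                pubsub_instance.restart_subscriber()


shared = CountingPubSub()  # one instance registered under two product names
other = CountingPubSub()   # a product that is not in the restart list

client = SketchClient()
client.register_product("ASM", shared)
client.register_product("ASM_DD", shared)
client.register_product("LIVE_DEBUGGER", other)

# "NOT_REGISTERED" is an assumed name used only to show the skip path.
client.start_products(["ASM", "ASM_DD", "NOT_REGISTERED"])

print(shared.restarts, other.restarts)
```

Whether repeated restarts of a shared instance are harmless depends on how `restart_subscriber` is implemented, which this commit does not show.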

ddtrace/internal/remoteconfig/worker.py

Lines changed: 14 additions & 13 deletions
@@ -1,4 +1,5 @@
 import os
+from typing import List
 
 from ddtrace.internal import agent
 from ddtrace.internal import atexit
@@ -80,19 +81,21 @@ def enable(self):
                 return True
 
             self.start()
-            forksafe.register(self.start_subscribers)
+            forksafe.register(self.reset_at_fork)
             atexit.register(self.disable)
             return True
         return False
 
-    def start_subscribers(self):
+    def reset_at_fork(self):
         # type: () -> None
-        """Subscribers need to be restarted when application forks"""
+        """Client Id needs to be refreshed when application forks"""
         self._enable = False
-        log.debug("[%s][P: %s] Remote Config Poller fork. Starting Pubsub services", os.getpid(), os.getppid())
+        log.debug("[%s][P: %s] Remote Config Poller fork. Refreshing state", os.getpid(), os.getppid())
         self._client.renew_id()
-        for pubsub in self._client.get_pubsubs():
-            pubsub.restart_subscriber()
+
+    def start_subscribers_by_product(self, products_list):
+        # type: (List[str]) -> None
+        self._client.start_products(products_list)
 
     def _poll_data(self, test_tracer=None):
         """Force subscribers to poll new data. This function is only used in tests"""
@@ -121,7 +124,7 @@ def disable(self, join=False):
         if self.status == ServiceStatus.STOPPED:
             return
 
-        forksafe.unregister(self.start_subscribers)
+        forksafe.unregister(self.reset_at_fork)
         atexit.unregister(self.disable)
 
         self.stop()
@@ -147,14 +150,12 @@ def register(self, product, pubsub_instance, skip_enabled=False):
         try:
             # By enabling on registration we ensure we start the RCM client only
             # if there is at least one registered product.
-            enabled = True
             if not skip_enabled:
-                enabled = self.enable()
+                self.enable()
 
-            if enabled:
-                self._client.register_product(product, pubsub_instance)
-                if not self._client.is_subscriber_running(pubsub_instance):
-                    pubsub_instance.start_subscriber()
+            self._client.register_product(product, pubsub_instance)
+            if not self._client.is_subscriber_running(pubsub_instance):
+                pubsub_instance.start_subscriber()
         except Exception:
             log.debug("error starting the RCM client", exc_info=True)
 
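
The split of responsibilities in this file (the poller's fork hook only refreshes its own state, while each product restarts its own subscribers) can be sketched as follows. Everything here is a hypothetical stand-in: `register_hook` and `run_child_hooks` imitate a fork-hook registry and are not the `forksafe` API, and `SketchPoller` only mirrors the method names from the diff. No real fork takes place; the hooks are invoked by hand to imitate the child side.

```python
import itertools

_fork_hooks = []  # hypothetical hook list, standing in for a fork-hook registry


def register_hook(hook):
    _fork_hooks.append(hook)


def run_child_hooks():
    # Imitates what would run in a child process right after a fork.
    for hook in list(_fork_hooks):
        hook()


class SketchPubSub:
    def __init__(self):
        self.restarts = 0

    def restart_subscriber(self):
        self.restarts += 1


class SketchPoller:
    _ids = itertools.count(1)

    def __init__(self):
        self.client_id = next(self._ids)
        self._products = {}

    def register(self, product, pubsub):
        self._products[product] = pubsub

    def reset_at_fork(self):
        # Generic hook: refresh poller state only, no subscriber restarts.
        self.client_id = next(self._ids)

    def start_subscribers_by_product(self, products_list):
        for name in products_list:
            pubsub = self._products.get(name)
            if pubsub:
                pubsub.restart_subscriber()


poller = SketchPoller()
asm, debugger = SketchPubSub(), SketchPubSub()
poller.register("ASM", asm)
poller.register("LIVE_DEBUGGER", debugger)
id_before_fork = poller.client_id

register_hook(poller.reset_at_fork)
# Product-owned hook: only this product's subscribers are restarted.
register_hook(lambda: poller.start_subscribers_by_product(["ASM"]))

run_child_hooks()
print(poller.client_id != id_before_fork, asm.restarts, debugger.restarts)
```

In this arrangement a product that has not registered its own hook is left untouched, so it can update its callback first and restart its subscriber afterwards.
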
Lines changed: 6 additions & 0 deletions
@@ -0,0 +1,6 @@
+---
+fixes:
+  - |
+    dynamic instrumentation: Needs to update the pubsub instance when the application forks because the probe
+    mechanism should run in the child process. For that, DI needs the callback as the method of an instance of
+    Debugger, which lives in the child process.

tests/internal/remoteconfig/test_remoteconfig.py

Lines changed: 7 additions & 2 deletions
@@ -109,7 +109,7 @@ def get_mock_encoded_msg(msg):
 
 def test_remote_config_register_auto_enable():
     # ASM_FEATURES product is enabled by default, but LIVE_DEBUGGER isn't
-    class MockPubsub:
+    class MockPubsub(PubSub):
         def stop(self, *args, **kwargs):
             pass
 
@@ -128,10 +128,15 @@ def stop(self, *args, **kwargs):
 
 def test_remote_config_register_validate_rc_disabled():
     remoteconfig_poller.disable()
+
+    class MockPubsub(PubSub):
+        def stop(self, *args, **kwargs):
+            pass
+
     assert remoteconfig_poller.status == ServiceStatus.STOPPED
 
     with override_global_config(dict(_remote_config_enabled=False)):
-        remoteconfig_poller.register("LIVE_DEBUGGER", lambda m, c: None)
+        remoteconfig_poller.register("LIVE_DEBUGGER", MockPubsub())
 
         assert remoteconfig_poller.status == ServiceStatus.STOPPED
 