Skip to content

Commit f2c4f6d

Browse files
committed
Fixed memory leak in async/thread executors
1 parent 391f96b commit f2c4f6d

File tree

6 files changed

+30
-5
lines changed

6 files changed

+30
-5
lines changed

graphql/execution/executor.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,10 @@ def on_resolve(data):
105105
if not return_promise:
106106
exe_context.executor.wait_until_finished()
107107
return promise.get()
108+
else:
109+
clean = getattr(exe_context.executor, 'clean', None)
110+
if clean:
111+
clean()
108112

109113
return promise
110114

graphql/execution/executors/asyncio.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,9 @@ def wait_until_finished(self):
5252
self.futures = []
5353
self.loop.run_until_complete(wait(futures))
5454

55+
def clean(self):
56+
self.futures = []
57+
5558
def execute(self, fn, *args, **kwargs):
5659
result = fn(*args, **kwargs)
5760
if isinstance(result, Future) or iscoroutine(result):

graphql/execution/executors/gevent.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,13 @@ def __init__(self):
1212
self.jobs = []
1313

1414
def wait_until_finished(self):
15-
[j.join() for j in self.jobs]
1615
# gevent.joinall(self.jobs)
16+
while self.jobs:
17+
jobs = self.jobs
18+
self.jobs = []
19+
[j.join() for j in jobs]
20+
21+
def clean(self):
1722
self.jobs = []
1823

1924
def execute(self, fn, *args, **kwargs):

graphql/execution/executors/process.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,16 @@ def __init__(self):
1717
self.q = Queue()
1818

1919
def wait_until_finished(self):
20-
for _process in self.processes:
21-
_process.join()
20+
while self.processes:
21+
processes = self.processes
22+
self.processes = []
23+
[_process.join() for _process in processes]
2224
self.q.close()
2325
self.q.join_thread()
2426

27+
def clean(self):
28+
self.processes = []
29+
2530
def execute(self, fn, *args, **kwargs):
2631
promise = Promise()
2732

graphql/execution/executors/sync.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,5 +3,8 @@ class SyncExecutor(object):
33
def wait_until_finished(self):
44
pass
55

6+
def clean(self):
7+
pass
8+
69
def execute(self, fn, *args, **kwargs):
710
return fn(*args, **kwargs)

graphql/execution/executors/thread.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,13 @@ def __init__(self, pool=False):
1919
self.execute = self.execute_in_thread
2020

2121
def wait_until_finished(self):
22-
for thread in self.threads:
23-
thread.join()
22+
while self.threads:
23+
threads = self.threads
24+
self.threads = []
25+
[thread.join() for thread in threads]
26+
27+
def clean(self):
28+
self.threads = []
2429

2530
def execute_in_thread(self, fn, *args, **kwargs):
2631
promise = Promise()

0 commit comments

Comments
 (0)