2
2
import sqlite3
3
3
import threading
4
4
import logging
5
+ import time
5
6
from _error import Timeout
6
7
from filelock ._api import AcquireReturnProxy , BaseFileLock
7
8
from typing import Literal , Any
15
16
# systems. Use even a lower value to be safe. This 2 bln milliseconds is about 23 days.
16
17
_MAX_SQLITE_TIMEOUT_MS = 2_000_000_000 - 1
17
18
18
- def timeout_for_sqlite (timeout : float = - 1 , blocking : bool = True ) -> int :
19
+ def timeout_for_sqlite (timeout : float , blocking : bool , already_waited : float ) -> int :
19
20
if blocking is False :
20
21
return 0
22
+
21
23
if timeout == - 1 :
22
24
return _MAX_SQLITE_TIMEOUT_MS
25
+
23
26
if timeout < 0 :
24
27
raise ValueError ("timeout must be a non-negative number or -1" )
25
28
29
+ if timeout > 0 :
30
+ timeout = timeout - already_waited
31
+ if timeout < 0 :
32
+ timeout = 0
33
+
26
34
assert timeout >= 0
35
+
27
36
timeout_ms = int (timeout * 1000 )
28
37
if timeout_ms > _MAX_SQLITE_TIMEOUT_MS or timeout_ms < 0 :
29
38
_LOGGER .warning ("timeout %s is too large for SQLite, using %s ms instead" , timeout , _MAX_SQLITE_TIMEOUT_MS )
@@ -97,16 +106,22 @@ def __init__(
97
106
def acquire_read (self , timeout : float = - 1 , blocking : bool = True ) -> AcquireReturnProxy :
98
107
"""Acquire a read lock. If a lock is already held, it must be a read lock.
99
108
Upgrading from read to write is prohibited."""
109
+
110
+ # Attempt to re-enter already held lock.
100
111
with self ._internal_lock :
101
112
if self ._lock_level > 0 :
102
113
# Must already be in read mode.
103
114
if self ._current_mode != "read" :
104
- raise RuntimeError ("Cannot acquire read lock when a write lock is held (no upgrade allowed)" )
115
+ raise RuntimeError (
116
+ f"Cannot acquire read lock on { self .lock_file } (lock id: { id (self )} ): "
117
+ "already holding a write lock (downgrade not allowed)"
118
+ )
105
119
self ._lock_level += 1
106
120
return AcquireReturnProxy (lock = self )
107
121
108
122
timeout_ms = timeout_for_sqlite (timeout , blocking )
109
123
124
+ start_time = time .perf_counter ()
110
125
# Acquire the transaction lock so that the (possibly blocking) SQLite work
111
126
# happens without conflicting with other threads' transaction work.
112
127
if not self ._transaction_lock .acquire (blocking , timeout ):
@@ -115,11 +130,16 @@ def acquire_read(self, timeout: float = -1, blocking: bool = True) -> AcquireRet
115
130
# Double-check: another thread might have completed acquisition meanwhile.
116
131
with self ._internal_lock :
117
132
if self ._lock_level > 0 :
118
- # Must already be in read mode.
119
133
if self ._current_mode != "read" :
120
- raise RuntimeError ("Cannot acquire read lock when a write lock is held (no upgrade allowed)" )
134
+ raise RuntimeError (
135
+ f"Cannot acquire read lock on { self .lock_file } (lock id: { id (self )} ): "
136
+ "already holding a write lock (downgrade not allowed)"
137
+ )
121
138
self ._lock_level += 1
122
139
return AcquireReturnProxy (lock = self )
140
+
141
+ waited = time .perf_counter () - start_time
142
+ timeout_ms = timeout_for_sqlite (timeout , blocking , waited )
123
143
124
144
self .con .execute ('PRAGMA busy_timeout=?;' , (timeout_ms ,))
125
145
self .con .execute ('BEGIN TRANSACTION;' )
@@ -143,25 +163,38 @@ def acquire_read(self, timeout: float = -1, blocking: bool = True) -> AcquireRet
143
163
def acquire_write (self , timeout : float = - 1 , blocking : bool = True ) -> AcquireReturnProxy :
144
164
"""Acquire a write lock. If a lock is already held, it must be a write lock.
145
165
Upgrading from read to write is prohibited."""
166
+
167
+ # Attempt to re-enter already held lock.
146
168
with self ._internal_lock :
147
169
if self ._lock_level > 0 :
148
170
if self ._current_mode != "write" :
149
- raise RuntimeError ("Cannot acquire write lock: already holding a read lock (no upgrade allowed)" )
171
+ raise RuntimeError (
172
+ f"Cannot acquire write lock on { self .lock_file } (lock id: { id (self )} ): "
173
+ "already holding a read lock (upgrade not allowed)"
174
+ )
150
175
self ._lock_level += 1
151
176
return AcquireReturnProxy (lock = self )
152
177
153
- timeout_ms = timeout_for_sqlite (timeout , blocking )
178
+ start_time = time .perf_counter ()
179
+ # Acquire the transaction lock so that the (possibly blocking) SQLite work
180
+ # happens without conflicting with other threads' transaction work.
154
181
if not self ._transaction_lock .acquire (blocking , timeout ):
155
182
raise Timeout (self .lock_file )
156
183
try :
157
184
# Double-check: another thread might have completed acquisition meanwhile.
158
185
with self ._internal_lock :
159
186
if self ._lock_level > 0 :
160
187
if self ._current_mode != "write" :
161
- raise RuntimeError ("Cannot acquire write lock: already holding a read lock (no upgrade allowed)" )
188
+ raise RuntimeError (
189
+ f"Cannot acquire write lock on { self .lock_file } (lock id: { id (self )} ): "
190
+ "already holding a read lock (upgrade not allowed)"
191
+ )
162
192
self ._lock_level += 1
163
193
return AcquireReturnProxy (lock = self )
164
194
195
+ waited = time .perf_counter () - start_time
196
+ timeout_ms = timeout_for_sqlite (timeout , blocking , waited )
197
+
165
198
self .con .execute ('PRAGMA busy_timeout=?;' , (timeout_ms ,))
166
199
self .con .execute ('BEGIN EXCLUSIVE TRANSACTION;' )
167
200
@@ -183,7 +216,7 @@ def release(self, force: bool = False) -> None:
183
216
if self ._lock_level == 0 :
184
217
if force :
185
218
return
186
- raise RuntimeError ("Cannot release a lock that is not held" )
219
+ raise RuntimeError (f "Cannot release a lock on { self . lock_file } (lock id: { id ( self ) } ) that is not held" )
187
220
if force :
188
221
self ._lock_level = 0
189
222
else :
0 commit comments