blob: 018acf290b1e894b525d740a82d36777eb11ad27 (
plain) (
tree)
|
|
import os
import time
import logging
import multiprocessing
import tempfile
import threading
import library.python.filelock
def _acquire_lock(lock_path, out_file_path):
with library.python.filelock.FileLock(lock_path):
with open(out_file_path, "a") as out:
out.write("{}:{}\n".format(os.getpid(), time.time()))
time.sleep(2)
def test_filelock():
temp_dir = tempfile.mkdtemp()
lock_path = os.path.join(temp_dir, "file.lock")
out_file_path = os.path.join(temp_dir, "out.txt")
process_count = 5
processes = []
for i in range(process_count):
process = multiprocessing.Process(target=_acquire_lock, args=(lock_path, out_file_path))
process.start()
processes.append(process)
for process in processes:
process.join()
pids = []
times = []
with open(out_file_path) as out:
content = out.read()
logging.info("Times:\n%s", content)
for line in content.strip().split("\n"):
pid, time_val = line.split(":")
pids.append(pid)
times.append(float(time_val))
assert len(set(pids)) == process_count
time1 = times.pop()
while times:
time2 = times.pop()
assert int(time1) - int(time2) >= 2
time1 = time2
def test_filelock_init_acquired():
temp_dir = tempfile.mkdtemp()
lock_path = os.path.join(temp_dir, "file.lock")
with library.python.filelock.FileLock(lock_path):
sublock = library.python.filelock.FileLock(lock_path)
del sublock
def test_concurrent_lock():
filename = 'thread.lock'
def lock():
lock = library.python.filelock.FileLock(filename)
time.sleep(1)
assert lock.acquire()
lock.release()
try:
os.unlink(filename)
except OSError:
pass
threads = []
for i in range(100):
t = threading.Thread(target=lock)
threads.append(t)
for t in threads:
t.start()
for t in threads:
t.join()
def test_pidfilelock():
lock_file = 'pidfile.lock'
# there should be no info
lock = library.python.filelock.PidFileLock(lock_file)
assert lock.info.pid == 0
assert lock.info.time == 0
with library.python.filelock.PidFileLock(lock_file) as lock:
assert lock.info.pid == os.getpid()
assert lock.info.time <= time.time()
assert lock.info.time > time.time() - 2
newlock = library.python.filelock.PidFileLock(lock_file)
# info shouldn't require locking
assert newlock.info.pid == os.getpid()
assert not newlock.acquire(blocking=False)
newlock = library.python.filelock.PidFileLock(lock_file)
# info is still accessible
assert newlock.info.pid == os.getpid()
t = newlock.info.time
# info is updated
time.sleep(1)
with newlock as lock:
assert lock.info.time > t
def _try_acquire_pidlock(lock_file, out_file, lock_pid=None):
lock = library.python.filelock.PidFileLock(lock_file)
with open(out_file, "w") as afile:
afile.write("1" if lock.acquire(blocking=False) else "0")
if lock_pid is not None:
assert lock.info.pid == lock_pid
def test_pidfilelock_multiprocessing():
lock_file = 'mp_pidfile.lock'
out_file = lock_file + ".out"
# subprocess can aquire lock
proc = multiprocessing.Process(target=_try_acquire_pidlock, args=(lock_file, out_file))
proc.start()
proc.join()
with open(out_file) as afile:
assert "1" == afile.read()
# subprocess can't aquire lock
with library.python.filelock.PidFileLock(lock_file) as lock:
proc = multiprocessing.Process(target=_try_acquire_pidlock, args=(lock_file, out_file, lock.info.pid))
proc.start()
proc.join()
with open(out_file) as afile:
assert "0" == afile.read()
|