aboutsummaryrefslogblamecommitdiffstats
path: root/library/python/filelock/ut/test_filelock.py
blob: 018acf290b1e894b525d740a82d36777eb11ad27 (plain) (tree)


























































                                                                                                
                            


                                                         
                             







                                         





                         






















































                                                                                                              
import os
import time
import logging
import multiprocessing
import tempfile
import threading

import library.python.filelock


def _acquire_lock(lock_path, out_file_path):
    with library.python.filelock.FileLock(lock_path):
        with open(out_file_path, "a") as out:
            out.write("{}:{}\n".format(os.getpid(), time.time()))
        time.sleep(2)


def test_filelock():
    temp_dir = tempfile.mkdtemp()
    lock_path = os.path.join(temp_dir, "file.lock")
    out_file_path = os.path.join(temp_dir, "out.txt")

    process_count = 5
    processes = []
    for i in range(process_count):
        process = multiprocessing.Process(target=_acquire_lock, args=(lock_path, out_file_path))
        process.start()
        processes.append(process)

    for process in processes:
        process.join()

    pids = []
    times = []
    with open(out_file_path) as out:
        content = out.read()
        logging.info("Times:\n%s", content)
        for line in content.strip().split("\n"):
            pid, time_val = line.split(":")
            pids.append(pid)
            times.append(float(time_val))

    assert len(set(pids)) == process_count
    time1 = times.pop()
    while times:
        time2 = times.pop()
        assert int(time1) - int(time2) >= 2
        time1 = time2


def test_filelock_init_acquired():
    temp_dir = tempfile.mkdtemp()
    lock_path = os.path.join(temp_dir, "file.lock")

    with library.python.filelock.FileLock(lock_path):
        sublock = library.python.filelock.FileLock(lock_path)
        del sublock


def test_concurrent_lock():
    filename = 'thread.lock'

    def lock():
        lock = library.python.filelock.FileLock(filename)
        time.sleep(1)
        assert lock.acquire()
        lock.release()
        try:
            os.unlink(filename)
        except OSError:
            pass

    threads = []
    for i in range(100):
        t = threading.Thread(target=lock)
        threads.append(t)

    for t in threads:
        t.start()

    for t in threads:
        t.join()


def test_pidfilelock():
    lock_file = 'pidfile.lock'
    # there should be no info
    lock = library.python.filelock.PidFileLock(lock_file)
    assert lock.info.pid == 0
    assert lock.info.time == 0

    with library.python.filelock.PidFileLock(lock_file) as lock:
        assert lock.info.pid == os.getpid()
        assert lock.info.time <= time.time()
        assert lock.info.time > time.time() - 2

        newlock = library.python.filelock.PidFileLock(lock_file)
        # info shouldn't require locking
        assert newlock.info.pid == os.getpid()
        assert not newlock.acquire(blocking=False)

    newlock = library.python.filelock.PidFileLock(lock_file)
    # info is still accessible
    assert newlock.info.pid == os.getpid()
    t = newlock.info.time
    # info is updated
    time.sleep(1)
    with newlock as lock:
        assert lock.info.time > t


def _try_acquire_pidlock(lock_file, out_file, lock_pid=None):
    lock = library.python.filelock.PidFileLock(lock_file)
    with open(out_file, "w") as afile:
        afile.write("1" if lock.acquire(blocking=False) else "0")

    if lock_pid is not None:
        assert lock.info.pid == lock_pid


def test_pidfilelock_multiprocessing():
    lock_file = 'mp_pidfile.lock'
    out_file = lock_file + ".out"

    # subprocess can aquire lock
    proc = multiprocessing.Process(target=_try_acquire_pidlock, args=(lock_file, out_file))
    proc.start()
    proc.join()
    with open(out_file) as afile:
        assert "1" == afile.read()

    # subprocess can't aquire lock
    with library.python.filelock.PidFileLock(lock_file) as lock:
        proc = multiprocessing.Process(target=_try_acquire_pidlock, args=(lock_file, out_file, lock.info.pid))
        proc.start()
        proc.join()
        with open(out_file) as afile:
            assert "0" == afile.read()