aboutsummaryrefslogtreecommitdiffstats
path: root/library/python/filelock/ut/lib/test_filelock.py
blob: a5624f827c3ec9fa8982e2f3131057ab8a882c60 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
import os
import time
import logging
import multiprocessing
import tempfile
import threading 

import library.python.filelock


def _acquire_lock(lock_path, out_file_path):
    with library.python.filelock.FileLock(lock_path):
        with open(out_file_path, "a") as out:
            out.write("{}:{}\n".format(os.getpid(), time.time()))
        time.sleep(2)


def test_filelock():
    temp_dir = tempfile.mkdtemp()
    lock_path = os.path.join(temp_dir, "file.lock")
    out_file_path = os.path.join(temp_dir, "out.txt")

    process_count = 5
    processes = []
    for i in range(process_count):
        process = multiprocessing.Process(target=_acquire_lock, args=(lock_path, out_file_path))
        process.start()
        processes.append(process)

    for process in processes:
        process.join()

    pids = []
    times = []
    with open(out_file_path) as out:
        content = out.read()
        logging.info("Times:\n%s", content)
        for line in content.strip().split("\n"):
            pid, time_val = line.split(":")
            pids.append(pid)
            times.append(float(time_val))

    assert len(set(pids)) == process_count
    time1 = times.pop()
    while times:
        time2 = times.pop()
        assert int(time1) - int(time2) >= 2
        time1 = time2


def test_filelock_init_acquired(): 
    temp_dir = tempfile.mkdtemp()
    lock_path = os.path.join(temp_dir, "file.lock")

    with library.python.filelock.FileLock(lock_path):
        sublock = library.python.filelock.FileLock(lock_path)
        del sublock
 
 
def test_concurrent_lock(): 
    filename = 'con.lock' 
 
    def lock(): 
        l = library.python.filelock.FileLock(filename) 
        time.sleep(1) 
        l.acquire() 
        l.release() 
        try: 
            os.unlink(filename) 
        except OSError: 
            pass 
 
    threads = [] 
    for i in range(100): 
        t = threading.Thread(target=lock) 
        t.daemon = True 
        threads.append(t) 
 
    for t in threads: 
        t.start() 
 
    for t in threads: 
        t.join()