diff options
author | robot-piglet <[email protected]> | 2025-08-28 14:27:58 +0300 |
---|---|---|
committer | robot-piglet <[email protected]> | 2025-08-28 14:57:06 +0300 |
commit | 81d828c32c8d5477cb2f0ce5da06a1a8d9392ca3 (patch) | |
tree | 3081d566f0d5158d76e9093261344f6406fd09f7 /contrib/python/portalocker/py3/tests/test_redis.py | |
parent | 77ea11423f959e51795cc3ef36a48d808b4ffb98 (diff) |
Intermediate changes
commit_hash:d5b1af16dbe9030537a04c27eb410c88c2f496cd
Diffstat (limited to 'contrib/python/portalocker/py3/tests/test_redis.py')
-rw-r--r-- | contrib/python/portalocker/py3/tests/test_redis.py | 90 |
1 files changed, 90 insertions, 0 deletions
diff --git a/contrib/python/portalocker/py3/tests/test_redis.py b/contrib/python/portalocker/py3/tests/test_redis.py new file mode 100644 index 00000000000..694c9af17ca --- /dev/null +++ b/contrib/python/portalocker/py3/tests/test_redis.py @@ -0,0 +1,90 @@ +import _thread +import logging +import random +import time + +import pytest +from redis import client +from redis import exceptions + +import portalocker +from portalocker import redis +from portalocker import utils + +logger = logging.getLogger(__name__) + +try: + client.Redis().ping() +except (exceptions.ConnectionError, ConnectionRefusedError): + pytest.skip('Unable to connect to redis', allow_module_level=True) + + [email protected](autouse=True) +def set_redis_timeouts(monkeypatch): + monkeypatch.setattr(utils, 'DEFAULT_TIMEOUT', 0.0001) + monkeypatch.setattr(utils, 'DEFAULT_CHECK_INTERVAL', 0.0005) + monkeypatch.setattr(redis, 'DEFAULT_UNAVAILABLE_TIMEOUT', 0.01) + monkeypatch.setattr(redis, 'DEFAULT_THREAD_SLEEP_TIME', 0.001) + monkeypatch.setattr(_thread, 'interrupt_main', lambda: None) + + +def test_redis_lock(): + channel = str(random.random()) + + lock_a = redis.RedisLock(channel) + lock_a.acquire(fail_when_locked=True) + time.sleep(0.01) + + lock_b = redis.RedisLock(channel) + try: + with pytest.raises(portalocker.AlreadyLocked): + lock_b.acquire(fail_when_locked=True) + finally: + lock_a.release() + lock_a.connection.close() + + [email protected]('timeout', [None, 0, 0.001]) [email protected]('check_interval', [None, 0, 0.0005]) +def test_redis_lock_timeout(timeout, check_interval): + connection = client.Redis() + channel = str(random.random()) + lock_a = redis.RedisLock(channel) + lock_a.acquire(timeout=timeout, check_interval=check_interval) + + lock_b = redis.RedisLock(channel, connection=connection) + with pytest.raises(portalocker.AlreadyLocked): + try: + lock_b.acquire(timeout=timeout, check_interval=check_interval) + finally: + lock_a.release() + lock_a.connection.close() + + +def test_redis_lock_context(): + channel = str(random.random()) + + lock_a = redis.RedisLock(channel, fail_when_locked=True) + with lock_a: + time.sleep(0.01) + lock_b = redis.RedisLock(channel, fail_when_locked=True) + with pytest.raises(portalocker.AlreadyLocked): + with lock_b: + pass + + +def test_redis_relock(): + channel = str(random.random()) + + lock_a = redis.RedisLock(channel, fail_when_locked=True) + with lock_a: + time.sleep(0.01) + with pytest.raises(AssertionError): + lock_a.acquire() + time.sleep(0.01) + + lock_a.release() + + +if __name__ == '__main__': + test_redis_lock() |