diff options
author | vitalyisaev <vitalyisaev@ydb.tech> | 2023-11-30 13:26:22 +0300 |
---|---|---|
committer | vitalyisaev <vitalyisaev@ydb.tech> | 2023-11-30 15:44:45 +0300 |
commit | 0a98fece5a9b54f16afeb3a94b3eb3105e9c3962 (patch) | |
tree | 291d72dbd7e9865399f668c84d11ed86fb190bbf /library/python/par_apply/__init__.py | |
parent | cb2c8d75065e5b3c47094067cb4aa407d4813298 (diff) | |
download | ydb-0a98fece5a9b54f16afeb3a94b3eb3105e9c3962.tar.gz |
YQ Connector:Use docker-compose in integrational tests
Diffstat (limited to 'library/python/par_apply/__init__.py')
-rw-r--r-- | library/python/par_apply/__init__.py | 114 |
1 files changed, 114 insertions, 0 deletions
diff --git a/library/python/par_apply/__init__.py b/library/python/par_apply/__init__.py new file mode 100644 index 0000000000..19b89ae843 --- /dev/null +++ b/library/python/par_apply/__init__.py @@ -0,0 +1,114 @@ +import sys +import threading +import six + +from six.moves import queue + + +def par_apply(seq, func, thr_num, join_polling=None): + if thr_num < 2: + for x in seq: + yield func(x) + + return + + in_q = queue.Queue() + out_q = queue.Queue() + + def enumerate_blocks(): + n = 0 + + for b in seq: + yield n, [b] + n += 1 + + yield n, None + + def iter_out(): + n = 0 + d = {} + + while True: + if n in d: + r = d[n] + del d[n] + n += 1 + + yield r + else: + res = out_q.get() + + d[res[0]] = res + + out_iter = iter_out() + + def wait_block(): + for x in out_iter: + return x + + def iter_compressed(): + p = 0 + + for n, b in enumerate_blocks(): + in_q.put((n, b)) + + while n > p + (thr_num * 2): + p, b, c = wait_block() + + if not b: + return + + yield p, c + + while True: + p, b, c = wait_block() + + if not b: + return + + yield p, c + + def proc(): + while True: + data = in_q.get() + + if data is None: + return + + n, b = data + + if b: + try: + res = (func(b[0]), None) + except Exception: + res = (None, sys.exc_info()) + else: + res = (None, None) + + out_q.put((n, b, res)) + + thrs = [threading.Thread(target=proc) for i in range(0, thr_num)] + + for t in thrs: + t.start() + + try: + for p, c in iter_compressed(): + res, err = c + + if err: + six.reraise(*err) + + yield res + finally: + for t in thrs: + in_q.put(None) + + for t in thrs: + if join_polling is not None: + while True: + t.join(join_polling) + if not t.is_alive(): + break + else: + t.join() |