aboutsummaryrefslogtreecommitdiffstats
path: root/library/python/par_apply
diff options
context:
space:
mode:
authorvitalyisaev <vitalyisaev@ydb.tech>2023-11-30 13:26:22 +0300
committervitalyisaev <vitalyisaev@ydb.tech>2023-11-30 15:44:45 +0300
commit0a98fece5a9b54f16afeb3a94b3eb3105e9c3962 (patch)
tree291d72dbd7e9865399f668c84d11ed86fb190bbf /library/python/par_apply
parentcb2c8d75065e5b3c47094067cb4aa407d4813298 (diff)
downloadydb-0a98fece5a9b54f16afeb3a94b3eb3105e9c3962.tar.gz
YQ Connector:Use docker-compose in integrational tests
Diffstat (limited to 'library/python/par_apply')
-rw-r--r--library/python/par_apply/__init__.py114
-rw-r--r--library/python/par_apply/ya.make11
2 files changed, 125 insertions, 0 deletions
diff --git a/library/python/par_apply/__init__.py b/library/python/par_apply/__init__.py
new file mode 100644
index 0000000000..19b89ae843
--- /dev/null
+++ b/library/python/par_apply/__init__.py
@@ -0,0 +1,114 @@
+import sys
+import threading
+import six
+
+from six.moves import queue
+
+
+def par_apply(seq, func, thr_num, join_polling=None):
+ if thr_num < 2:
+ for x in seq:
+ yield func(x)
+
+ return
+
+ in_q = queue.Queue()
+ out_q = queue.Queue()
+
+ def enumerate_blocks():
+ n = 0
+
+ for b in seq:
+ yield n, [b]
+ n += 1
+
+ yield n, None
+
+ def iter_out():
+ n = 0
+ d = {}
+
+ while True:
+ if n in d:
+ r = d[n]
+ del d[n]
+ n += 1
+
+ yield r
+ else:
+ res = out_q.get()
+
+ d[res[0]] = res
+
+ out_iter = iter_out()
+
+ def wait_block():
+ for x in out_iter:
+ return x
+
+ def iter_compressed():
+ p = 0
+
+ for n, b in enumerate_blocks():
+ in_q.put((n, b))
+
+ while n > p + (thr_num * 2):
+ p, b, c = wait_block()
+
+ if not b:
+ return
+
+ yield p, c
+
+ while True:
+ p, b, c = wait_block()
+
+ if not b:
+ return
+
+ yield p, c
+
+ def proc():
+ while True:
+ data = in_q.get()
+
+ if data is None:
+ return
+
+ n, b = data
+
+ if b:
+ try:
+ res = (func(b[0]), None)
+ except Exception:
+ res = (None, sys.exc_info())
+ else:
+ res = (None, None)
+
+ out_q.put((n, b, res))
+
+ thrs = [threading.Thread(target=proc) for i in range(0, thr_num)]
+
+ for t in thrs:
+ t.start()
+
+ try:
+ for p, c in iter_compressed():
+ res, err = c
+
+ if err:
+ six.reraise(*err)
+
+ yield res
+ finally:
+ for t in thrs:
+ in_q.put(None)
+
+ for t in thrs:
+ if join_polling is not None:
+ while True:
+ t.join(join_polling)
+ if not t.is_alive():
+ break
+ else:
+ t.join()
diff --git a/library/python/par_apply/ya.make b/library/python/par_apply/ya.make
new file mode 100644
index 0000000000..b14592ab79
--- /dev/null
+++ b/library/python/par_apply/ya.make
@@ -0,0 +1,11 @@
+PY23_LIBRARY()
+
+PEERDIR(
+ contrib/python/six
+)
+
+PY_SRCS(
+ __init__.py
+)
+
+END()