aboutsummaryrefslogtreecommitdiffstats
path: root/library/python/par_apply/__init__.py
diff options
context:
space:
mode:
authorprettyboy <prettyboy@yandex-team.com>2023-09-08 00:22:12 +0300
committerprettyboy <prettyboy@yandex-team.com>2023-09-08 00:46:04 +0300
commit3a6cd865171eed9b89bf536cd242285f8b583a91 (patch)
tree25e2756c125f7484fb118e0d5724212199662389 /library/python/par_apply/__init__.py
parent67f3f216950849664a29035458cfaa5d12a62846 (diff)
downloadydb-3a6cd865171eed9b89bf536cd242285f8b583a91.tar.gz
[build/plugins/ytest] Allow prebuilt linters for opensource
Без этого, ydb или не сможет запускать flake8 с помощью ya make. Или к ним поедет сборка flake8. Возможно последнее и не так плохо, но сейчас предлагается пока так
Diffstat (limited to 'library/python/par_apply/__init__.py')
-rw-r--r--library/python/par_apply/__init__.py114
1 files changed, 114 insertions, 0 deletions
diff --git a/library/python/par_apply/__init__.py b/library/python/par_apply/__init__.py
new file mode 100644
index 0000000000..19b89ae843
--- /dev/null
+++ b/library/python/par_apply/__init__.py
@@ -0,0 +1,114 @@
+import sys
+import threading
+import six
+
+from six.moves import queue
+
+
+def par_apply(seq, func, thr_num, join_polling=None):
+ if thr_num < 2:
+ for x in seq:
+ yield func(x)
+
+ return
+
+ in_q = queue.Queue()
+ out_q = queue.Queue()
+
+ def enumerate_blocks():
+ n = 0
+
+ for b in seq:
+ yield n, [b]
+ n += 1
+
+ yield n, None
+
+ def iter_out():
+ n = 0
+ d = {}
+
+ while True:
+ if n in d:
+ r = d[n]
+ del d[n]
+ n += 1
+
+ yield r
+ else:
+ res = out_q.get()
+
+ d[res[0]] = res
+
+ out_iter = iter_out()
+
+ def wait_block():
+ for x in out_iter:
+ return x
+
+ def iter_compressed():
+ p = 0
+
+ for n, b in enumerate_blocks():
+ in_q.put((n, b))
+
+ while n > p + (thr_num * 2):
+ p, b, c = wait_block()
+
+ if not b:
+ return
+
+ yield p, c
+
+ while True:
+ p, b, c = wait_block()
+
+ if not b:
+ return
+
+ yield p, c
+
+ def proc():
+ while True:
+ data = in_q.get()
+
+ if data is None:
+ return
+
+ n, b = data
+
+ if b:
+ try:
+ res = (func(b[0]), None)
+ except Exception:
+ res = (None, sys.exc_info())
+ else:
+ res = (None, None)
+
+ out_q.put((n, b, res))
+
+ thrs = [threading.Thread(target=proc) for i in range(0, thr_num)]
+
+ for t in thrs:
+ t.start()
+
+ try:
+ for p, c in iter_compressed():
+ res, err = c
+
+ if err:
+ six.reraise(*err)
+
+ yield res
+ finally:
+ for t in thrs:
+ in_q.put(None)
+
+ for t in thrs:
+ if join_polling is not None:
+ while True:
+ t.join(join_polling)
+ if not t.is_alive():
+ break
+ else:
+ t.join()