aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorrobot-piglet <robot-piglet@yandex-team.com>2023-12-05 13:21:52 +0300
committerrobot-piglet <robot-piglet@yandex-team.com>2023-12-05 13:59:38 +0300
commitc2c74635dbd451dfd2e13842854a412a4c43dc32 (patch)
tree75ef8263b117706d2e3dabc03011412ba3d4a305
parente69e63ed814792594791cf49bb976c9338bab02f (diff)
downloadydb-c2c74635dbd451dfd2e13842854a412a4c43dc32.tar.gz
Intermediate changes
-rw-r--r--contrib/python/boto3/py3/.dist-info/METADATA4
-rw-r--r--contrib/python/boto3/py3/boto3/__init__.py2
-rw-r--r--contrib/python/boto3/py3/ya.make2
-rw-r--r--contrib/python/pyasn1/py3/.dist-info/METADATA3
-rw-r--r--contrib/python/pyasn1/py3/pyasn1/__init__.py2
-rw-r--r--contrib/python/pyasn1/py3/pyasn1/codec/ber/decoder.py74
-rw-r--r--contrib/python/pyasn1/py3/tests/codec/ber/test_decoder.py126
-rw-r--r--contrib/python/pyasn1/py3/ya.make2
-rw-r--r--ydb/core/driver_lib/cli_utils/cli_cmds_server.cpp44
-rw-r--r--ydb/core/fq/libs/compute/ydb/status_tracker_actor.cpp25
-rw-r--r--ydb/core/kqp/common/events/query.h9
-rw-r--r--ydb/core/kqp/executer_actor/kqp_executer_impl.h19
-rw-r--r--ydb/core/kqp/executer_actor/kqp_executer_stats.cpp297
-rw-r--r--ydb/core/kqp/executer_actor/kqp_executer_stats.h87
-rw-r--r--ydb/core/kqp/gateway/kqp_gateway.h1
-rw-r--r--ydb/core/kqp/proxy_service/kqp_proxy_service.cpp29
-rw-r--r--ydb/core/kqp/proxy_service/kqp_script_executions.cpp57
-rw-r--r--ydb/core/kqp/proxy_service/kqp_script_executions.h1
-rw-r--r--ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp29
-rw-r--r--ydb/core/kqp/session_actor/kqp_query_state.h4
-rw-r--r--ydb/core/kqp/session_actor/kqp_session_actor.cpp26
-rw-r--r--ydb/core/protos/config.proto1
-rw-r--r--ydb/core/protos/console_config.proto14
-rw-r--r--ydb/core/protos/kqp.proto4
-rw-r--r--ydb/library/yql/sql/pg/pg_sql.cpp4
-rw-r--r--ydb/library/yql/sql/pg/pg_sql_ut.cpp14
-rw-r--r--ydb/tests/tools/fq_runner/kikimr_runner.py1
27 files changed, 820 insertions, 61 deletions
diff --git a/contrib/python/boto3/py3/.dist-info/METADATA b/contrib/python/boto3/py3/.dist-info/METADATA
index 1a0391e3b6..25169cdc26 100644
--- a/contrib/python/boto3/py3/.dist-info/METADATA
+++ b/contrib/python/boto3/py3/.dist-info/METADATA
@@ -1,6 +1,6 @@
Metadata-Version: 2.1
Name: boto3
-Version: 1.29.3
+Version: 1.29.4
Summary: The AWS SDK for Python
Home-page: https://github.com/boto/boto3
Author: Amazon Web Services
@@ -24,7 +24,7 @@ Classifier: Programming Language :: Python :: 3.12
Requires-Python: >= 3.7
License-File: LICENSE
License-File: NOTICE
-Requires-Dist: botocore (<1.33.0,>=1.32.3)
+Requires-Dist: botocore (<1.33.0,>=1.32.4)
Requires-Dist: jmespath (<2.0.0,>=0.7.1)
Requires-Dist: s3transfer (<0.8.0,>=0.7.0)
Provides-Extra: crt
diff --git a/contrib/python/boto3/py3/boto3/__init__.py b/contrib/python/boto3/py3/boto3/__init__.py
index cc10896a61..15c034de24 100644
--- a/contrib/python/boto3/py3/boto3/__init__.py
+++ b/contrib/python/boto3/py3/boto3/__init__.py
@@ -17,7 +17,7 @@ from boto3.compat import _warn_deprecated_python
from boto3.session import Session
__author__ = 'Amazon Web Services'
-__version__ = '1.29.3'
+__version__ = '1.29.4'
# The default Boto3 session; autoloaded when needed.
diff --git a/contrib/python/boto3/py3/ya.make b/contrib/python/boto3/py3/ya.make
index e5448d3a86..709c02709a 100644
--- a/contrib/python/boto3/py3/ya.make
+++ b/contrib/python/boto3/py3/ya.make
@@ -2,7 +2,7 @@
PY3_LIBRARY()
-VERSION(1.29.3)
+VERSION(1.29.4)
LICENSE(Apache-2.0)
diff --git a/contrib/python/pyasn1/py3/.dist-info/METADATA b/contrib/python/pyasn1/py3/.dist-info/METADATA
index 530fe5bf7b..1a6727cecc 100644
--- a/contrib/python/pyasn1/py3/.dist-info/METADATA
+++ b/contrib/python/pyasn1/py3/.dist-info/METADATA
@@ -1,6 +1,6 @@
Metadata-Version: 2.1
Name: pyasn1
-Version: 0.5.0
+Version: 0.5.1
Summary: Pure-Python implementation of ASN.1 types and DER/BER/CER codecs (X.208)
Home-page: https://github.com/pyasn1/pyasn1
Author: Ilya Etingof
@@ -32,6 +32,7 @@ Classifier: Programming Language :: Python :: 3.8
Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
+Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: Implementation :: CPython
Classifier: Programming Language :: Python :: Implementation :: PyPy
Classifier: Topic :: Communications
diff --git a/contrib/python/pyasn1/py3/pyasn1/__init__.py b/contrib/python/pyasn1/py3/pyasn1/__init__.py
index a979d291f2..73d47f3424 100644
--- a/contrib/python/pyasn1/py3/pyasn1/__init__.py
+++ b/contrib/python/pyasn1/py3/pyasn1/__init__.py
@@ -1,2 +1,2 @@
# https://www.python.org/dev/peps/pep-0396/
-__version__ = '0.5.0'
+__version__ = '0.5.1'
diff --git a/contrib/python/pyasn1/py3/pyasn1/codec/ber/decoder.py b/contrib/python/pyasn1/py3/pyasn1/codec/ber/decoder.py
index 070733fd28..7cc863d1c7 100644
--- a/contrib/python/pyasn1/py3/pyasn1/codec/ber/decoder.py
+++ b/contrib/python/pyasn1/py3/pyasn1/codec/ber/decoder.py
@@ -4,7 +4,10 @@
# Copyright (c) 2005-2020, Ilya Etingof <etingof@gmail.com>
# License: https://pyasn1.readthedocs.io/en/latest/license.html
#
+import io
import os
+import sys
+
from pyasn1 import debug
from pyasn1 import error
@@ -1762,7 +1765,14 @@ class SingleItemDecoder(object):
if state is stDecodeValue:
if not options.get('recursiveFlag', True) and not substrateFun: # deprecate this
- substrateFun = lambda a, b, c: (a, b[:c])
+ def substrateFun(asn1Object, _substrate, _length, _options):
+ """Legacy hack to keep the recursiveFlag=False option supported.
+
+ The decode(..., substrateFun=userCallback) option was introduced in 0.1.4 as a generalization
+ of the old recursiveFlag=False option. Users should pass their callback instead of using
+ recursiveFlag.
+ """
+ yield asn1Object
original_position = substrate.tell()
@@ -1783,9 +1793,13 @@ class SingleItemDecoder(object):
yield value
bytesRead = substrate.tell() - original_position
- if bytesRead != length:
+ if not substrateFun and bytesRead != length:
raise PyAsn1Error(
"Read %s bytes instead of expected %s." % (bytesRead, length))
+ elif substrateFun and bytesRead > length:
+ # custom substrateFun may be used for partial decoding, reading less is expected there
+ raise PyAsn1Error(
+ "Read %s bytes are more than expected %s." % (bytesRead, length))
if LOG:
LOG('codec %s yields type %s, value:\n%s\n...' % (
@@ -1959,6 +1973,27 @@ class Decoder(object):
may not be required. Most common reason for it to require is that
ASN.1 structure is encoded in *IMPLICIT* tagging mode.
+ substrateFun: :py:class:`Union[
+ Callable[[pyasn1.type.base.PyAsn1Item, bytes, int],
+ Tuple[pyasn1.type.base.PyAsn1Item, bytes]],
+ Callable[[pyasn1.type.base.PyAsn1Item, io.BytesIO, int, dict],
+ Generator[Union[pyasn1.type.base.PyAsn1Item,
+ pyasn1.error.SubstrateUnderrunError],
+ None, None]]
+ ]`
+ User callback meant to generalize special use cases like non-recursive or
+ partial decoding. A 3-arg non-streaming variant is supported for backwards
+ compatiblilty in addition to the newer 4-arg streaming variant.
+ The callback will receive the uninitialized object recovered from substrate
+ as 1st argument, the uninterpreted payload as 2nd argument, and the length
+ of the uninterpreted payload as 3rd argument. The streaming variant will
+ additionally receive the decode(..., **options) kwargs as 4th argument.
+ The non-streaming variant shall return an object that will be propagated
+ as decode() return value as 1st item, and the remainig payload for further
+ decode passes as 2nd item.
+ The streaming variant shall yield an object that will be propagated as
+ decode() return value, and leave the remaining payload in the stream.
+
Returns
-------
: :py:class:`tuple`
@@ -1997,6 +2032,31 @@ class Decoder(object):
"""
substrate = asSeekableStream(substrate)
+ if "substrateFun" in options:
+ origSubstrateFun = options["substrateFun"]
+
+ def substrateFunWrapper(asn1Object, substrate, length, options=None):
+ """Support both 0.4 and 0.5 style APIs.
+
+ substrateFun API has changed in 0.5 for use with streaming decoders. To stay backwards compatible,
+ we first try if we received a streaming user callback. If that fails,we assume we've received a
+ non-streaming v0.4 user callback and convert it for streaming on the fly
+ """
+ try:
+ substrate_gen = origSubstrateFun(asn1Object, substrate, length, options)
+ except TypeError:
+ _type, _value, traceback = sys.exc_info()
+ if traceback.tb_next:
+ # Traceback depth > 1 means TypeError from inside user provided function
+ raise
+ # invariant maintained at Decoder.__call__ entry
+ assert isinstance(substrate, io.BytesIO) # nosec assert_used
+ substrate_gen = Decoder._callSubstrateFunV4asV5(origSubstrateFun, asn1Object, substrate, length)
+ for value in substrate_gen:
+ yield value
+
+ options["substrateFun"] = substrateFunWrapper
+
streamingDecoder = cls.STREAMING_DECODER(
substrate, asn1Spec, **options)
@@ -2012,6 +2072,16 @@ class Decoder(object):
return asn1Object, tail
+ @staticmethod
+ def _callSubstrateFunV4asV5(substrateFunV4, asn1Object, substrate, length):
+ substrate_bytes = substrate.read()
+ if length == -1:
+ length = len(substrate_bytes)
+ value, nextSubstrate = substrateFunV4(asn1Object, substrate_bytes, length)
+ nbytes = substrate.write(nextSubstrate)
+ substrate.truncate()
+ substrate.seek(-nbytes, os.SEEK_CUR)
+ yield value
#: Turns BER octet stream into an ASN.1 object.
#:
diff --git a/contrib/python/pyasn1/py3/tests/codec/ber/test_decoder.py b/contrib/python/pyasn1/py3/tests/codec/ber/test_decoder.py
index 9e238cd458..35d12d0536 100644
--- a/contrib/python/pyasn1/py3/tests/codec/ber/test_decoder.py
+++ b/contrib/python/pyasn1/py3/tests/codec/ber/test_decoder.py
@@ -141,12 +141,24 @@ class BitStringDecoderTestCase(BaseTestCase):
substrateFun=lambda a, b, c, d: streaming.readFromStream(b, c)
) == (ints2octs((3, 2, 0, 169, 3, 2, 1, 138)), str2octs(''))
+ def testDefModeChunkedSubstV04(self):
+ assert decoder.decode(
+ ints2octs((35, 8, 3, 2, 0, 169, 3, 2, 1, 138)),
+ substrateFun=lambda a, b, c: (b, b[c:])
+ ) == (ints2octs((3, 2, 0, 169, 3, 2, 1, 138)), str2octs(''))
+
def testIndefModeChunkedSubst(self):
assert decoder.decode(
ints2octs((35, 128, 3, 2, 0, 169, 3, 2, 1, 138, 0, 0)),
substrateFun=lambda a, b, c, d: streaming.readFromStream(b, c)
) == (ints2octs((3, 2, 0, 169, 3, 2, 1, 138, 0, 0)), str2octs(''))
+ def testIndefModeChunkedSubstV04(self):
+ assert decoder.decode(
+ ints2octs((35, 128, 3, 2, 0, 169, 3, 2, 1, 138, 0, 0)),
+ substrateFun=lambda a, b, c: (b, b[c:])
+ ) == (ints2octs((3, 2, 0, 169, 3, 2, 1, 138, 0, 0)), str2octs(''))
+
def testTypeChecking(self):
try:
decoder.decode(ints2octs((35, 4, 2, 2, 42, 42)))
@@ -185,6 +197,13 @@ class OctetStringDecoderTestCase(BaseTestCase):
substrateFun=lambda a, b, c, d: streaming.readFromStream(b, c)
) == (ints2octs((4, 4, 81, 117, 105, 99, 4, 4, 107, 32, 98, 114, 4, 4, 111, 119, 110, 32, 4, 3, 102, 111, 120)), str2octs(''))
+ def testDefModeChunkedSubstV04(self):
+ assert decoder.decode(
+ ints2octs(
+ (36, 23, 4, 4, 81, 117, 105, 99, 4, 4, 107, 32, 98, 114, 4, 4, 111, 119, 110, 32, 4, 3, 102, 111, 120)),
+ substrateFun=lambda a, b, c: (b, b[c:])
+ ) == (ints2octs((4, 4, 81, 117, 105, 99, 4, 4, 107, 32, 98, 114, 4, 4, 111, 119, 110, 32, 4, 3, 102, 111, 120)), str2octs(''))
+
def testIndefModeChunkedSubst(self):
assert decoder.decode(
ints2octs((36, 128, 4, 4, 81, 117, 105, 99, 4, 4, 107, 32, 98, 114, 4, 4, 111, 119, 110, 32, 4, 3, 102, 111,
@@ -193,6 +212,14 @@ class OctetStringDecoderTestCase(BaseTestCase):
) == (ints2octs(
(4, 4, 81, 117, 105, 99, 4, 4, 107, 32, 98, 114, 4, 4, 111, 119, 110, 32, 4, 3, 102, 111, 120, 0, 0)), str2octs(''))
+ def testIndefModeChunkedSubstV04(self):
+ assert decoder.decode(
+ ints2octs((36, 128, 4, 4, 81, 117, 105, 99, 4, 4, 107, 32, 98, 114, 4, 4, 111, 119, 110, 32, 4, 3, 102, 111,
+ 120, 0, 0)),
+ substrateFun=lambda a, b, c: (b, b[c:])
+ ) == (ints2octs(
+ (4, 4, 81, 117, 105, 99, 4, 4, 107, 32, 98, 114, 4, 4, 111, 119, 110, 32, 4, 3, 102, 111, 120, 0, 0)), str2octs(''))
+
class ExpTaggedOctetStringDecoderTestCase(BaseTestCase):
def setUp(self):
@@ -245,6 +272,12 @@ class ExpTaggedOctetStringDecoderTestCase(BaseTestCase):
substrateFun=lambda a, b, c, d: streaming.readFromStream(b, c)
) == (ints2octs((4, 15, 81, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 32, 102, 111, 120)), str2octs(''))
+ def testDefModeSubstV04(self):
+ assert decoder.decode(
+ ints2octs((101, 17, 4, 15, 81, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 32, 102, 111, 120)),
+ substrateFun=lambda a, b, c: (b, b[c:])
+ ) == (ints2octs((4, 15, 81, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 32, 102, 111, 120)), str2octs(''))
+
def testIndefModeSubst(self):
assert decoder.decode(
ints2octs((
@@ -254,6 +287,15 @@ class ExpTaggedOctetStringDecoderTestCase(BaseTestCase):
) == (ints2octs(
(36, 128, 4, 15, 81, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 32, 102, 111, 120, 0, 0, 0, 0)), str2octs(''))
+ def testIndefModeSubstV04(self):
+ assert decoder.decode(
+ ints2octs((
+ 101, 128, 36, 128, 4, 15, 81, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 32, 102, 111, 120, 0,
+ 0, 0, 0)),
+ substrateFun=lambda a, b, c: (b, b[c:])
+ ) == (ints2octs(
+ (36, 128, 4, 15, 81, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 32, 102, 111, 120, 0, 0, 0, 0)), str2octs(''))
+
class NullDecoderTestCase(BaseTestCase):
def testNull(self):
@@ -680,6 +722,12 @@ class SequenceDecoderTestCase(BaseTestCase):
substrateFun=lambda a, b, c, d: streaming.readFromStream(b, c)
) == (ints2octs((5, 0, 4, 11, 113, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 2, 1, 1)), str2octs(''))
+ def testWithOptionalAndDefaultedDefModeSubstV04(self):
+ assert decoder.decode(
+ ints2octs((48, 18, 5, 0, 4, 11, 113, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 2, 1, 1)),
+ substrateFun=lambda a, b, c: (b, b[c:])
+ ) == (ints2octs((5, 0, 4, 11, 113, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 2, 1, 1)), str2octs(''))
+
def testWithOptionalAndDefaultedIndefModeSubst(self):
assert decoder.decode(
ints2octs((48, 128, 5, 0, 36, 128, 4, 11, 113, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 0, 0, 2, 1, 1, 0, 0)),
@@ -687,6 +735,13 @@ class SequenceDecoderTestCase(BaseTestCase):
) == (ints2octs(
(5, 0, 36, 128, 4, 11, 113, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 0, 0, 2, 1, 1, 0, 0)), str2octs(''))
+ def testWithOptionalAndDefaultedIndefModeSubstV04(self):
+ assert decoder.decode(
+ ints2octs((48, 128, 5, 0, 36, 128, 4, 11, 113, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 0, 0, 2, 1, 1, 0, 0)),
+ substrateFun=lambda a, b, c: (b, b[c:])
+ ) == (ints2octs(
+ (5, 0, 36, 128, 4, 11, 113, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 0, 0, 2, 1, 1, 0, 0)), str2octs(''))
+
def testTagFormat(self):
try:
decoder.decode(
@@ -1166,6 +1221,12 @@ class SetDecoderTestCase(BaseTestCase):
substrateFun=lambda a, b, c, d: streaming.readFromStream(b, c)
) == (ints2octs((5, 0, 4, 11, 113, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 2, 1, 1)), str2octs(''))
+ def testWithOptionalAndDefaultedDefModeSubstV04(self):
+ assert decoder.decode(
+ ints2octs((49, 18, 5, 0, 4, 11, 113, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 2, 1, 1)),
+ substrateFun=lambda a, b, c: (b, b[c:])
+ ) == (ints2octs((5, 0, 4, 11, 113, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 2, 1, 1)), str2octs(''))
+
def testWithOptionalAndDefaultedIndefModeSubst(self):
assert decoder.decode(
ints2octs((49, 128, 5, 0, 36, 128, 4, 11, 113, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 0, 0, 2, 1, 1, 0, 0)),
@@ -1173,6 +1234,13 @@ class SetDecoderTestCase(BaseTestCase):
) == (ints2octs(
(5, 0, 36, 128, 4, 11, 113, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 0, 0, 2, 1, 1, 0, 0)), str2octs(''))
+ def testWithOptionalAndDefaultedIndefModeSubstV04(self):
+ assert decoder.decode(
+ ints2octs((49, 128, 5, 0, 36, 128, 4, 11, 113, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 0, 0, 2, 1, 1, 0, 0)),
+ substrateFun=lambda a, b, c: (b, b[c:])
+ ) == (ints2octs(
+ (5, 0, 36, 128, 4, 11, 113, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 0, 0, 2, 1, 1, 0, 0)), str2octs(''))
+
def testTagFormat(self):
try:
decoder.decode(
@@ -1498,6 +1566,13 @@ class AnyDecoderTestCase(BaseTestCase):
substrateFun=lambda a, b, c, d: streaming.readFromStream(b, c)
) == (ints2octs((4, 3, 102, 111, 120)), str2octs(''))
+ def testByUntaggedSubstV04(self):
+ assert decoder.decode(
+ ints2octs((4, 3, 102, 111, 120)),
+ asn1Spec=self.s,
+ substrateFun=lambda a, b, c: (b, b[c:])
+ ) == (ints2octs((4, 3, 102, 111, 120)), str2octs(''))
+
def testTaggedExSubst(self):
assert decoder.decode(
ints2octs((164, 5, 4, 3, 102, 111, 120)),
@@ -1505,6 +1580,13 @@ class AnyDecoderTestCase(BaseTestCase):
substrateFun=lambda a, b, c, d: streaming.readFromStream(b, c)
) == (ints2octs((164, 5, 4, 3, 102, 111, 120)), str2octs(''))
+ def testTaggedExSubstV04(self):
+ assert decoder.decode(
+ ints2octs((164, 5, 4, 3, 102, 111, 120)),
+ asn1Spec=self.s,
+ substrateFun=lambda a, b, c: (b, b[c:])
+ ) == (ints2octs((164, 5, 4, 3, 102, 111, 120)), str2octs(''))
+
class EndOfOctetsTestCase(BaseTestCase):
def testUnexpectedEoo(self):
@@ -1841,6 +1923,50 @@ class CompressedFilesTestCase(BaseTestCase):
os.remove(path)
+class NonStreamingCompatibilityTestCase(BaseTestCase):
+ def setUp(self):
+ from pyasn1 import debug
+ BaseTestCase.setUp(self)
+ debug.setLogger(None) # undo logger setup from BaseTestCase to work around unrelated issue
+
+ def testPartialDecodeWithCustomSubstrateFun(self):
+ snmp_req_substrate = ints2octs((
+ 0x30, 0x22, 0x02, 0x01, 0x01, 0x04, 0x06, 0x70, 0x75, 0x62, 0x6c, 0x69, 0x63, 0xa0, 0x15, 0x02, 0x04, 0x69,
+ 0x30, 0xdb, 0xeb, 0x02, 0x01, 0x00, 0x02, 0x01, 0x00, 0x30, 0x07, 0x30, 0x05, 0x06, 0x01, 0x01, 0x05, 0x00))
+ seq, next_substrate = decoder.decode(
+ snmp_req_substrate, asn1Spec=univ.Sequence(),
+ recursiveFlag=False, substrateFun=lambda a, b, c: (a, b[:c])
+ )
+ assert seq.isSameTypeWith(univ.Sequence)
+ assert next_substrate == snmp_req_substrate[2:]
+ version, next_substrate = decoder.decode(
+ next_substrate, asn1Spec=univ.Integer(), recursiveFlag=False,
+ substrateFun=lambda a, b, c: (a, b[:c])
+ )
+ assert version == 1
+
+ def testPartialDecodeWithDefaultSubstrateFun(self):
+ substrate = ints2octs((
+ 0x04, 0x0e, 0x30, 0x0c, 0x06, 0x0a, 0x2b, 0x06, 0x01, 0x04, 0x01, 0x82, 0x37, 0x3c, 0x03, 0x02
+ ))
+ result, rest = decoder.decode(substrate, recursiveFlag=False)
+ assert result.isSameTypeWith(univ.OctetString)
+ assert rest == substrate[2:]
+
+ def testPropagateUserException(self):
+ substrate = io.BytesIO(ints2octs((0x04, 0x00)))
+
+ def userSubstrateFun(_asn1Object, _substrate, _length, _options):
+ raise TypeError("error inside user function")
+
+ try:
+ decoder.decode(substrate, asn1Spec=univ.OctetString, substrateFun=userSubstrateFun)
+ except TypeError as exc:
+ assert str(exc) == "error inside user function"
+ else:
+ raise AssertionError("decode() must not hide TypeError from inside user provided callback")
+
+
suite = unittest.TestLoader().loadTestsFromModule(sys.modules[__name__])
if __name__ == '__main__':
diff --git a/contrib/python/pyasn1/py3/ya.make b/contrib/python/pyasn1/py3/ya.make
index 772312ad0e..4e59f746fa 100644
--- a/contrib/python/pyasn1/py3/ya.make
+++ b/contrib/python/pyasn1/py3/ya.make
@@ -2,7 +2,7 @@
PY3_LIBRARY()
-VERSION(0.5.0)
+VERSION(0.5.1)
LICENSE(BSD-3-Clause)
diff --git a/ydb/core/driver_lib/cli_utils/cli_cmds_server.cpp b/ydb/core/driver_lib/cli_utils/cli_cmds_server.cpp
index b466476a18..7496bf45f3 100644
--- a/ydb/core/driver_lib/cli_utils/cli_cmds_server.cpp
+++ b/ydb/core/driver_lib/cli_utils/cli_cmds_server.cpp
@@ -325,14 +325,14 @@ protected:
}
void AddLabelToAppConfig(const TString& name, const TString& value) {
- for (auto &label : *RunConfig.AppConfig.MutableLabels()) {
+ for (auto &label : *AppConfig.MutableLabels()) {
if (label.GetName() == name) {
label.SetValue(value);
return;
}
}
- auto *label = RunConfig.AppConfig.AddLabels();
+ auto *label = AppConfig.AddLabels();
label->SetName(name);
label->SetValue(value);
}
@@ -414,7 +414,7 @@ protected:
RunConfig.Labels["dynamic"] = ToString(NodeBrokerAddresses.empty() ? "false" : "true");
for (const auto& [name, value] : RunConfig.Labels) {
- auto *label = RunConfig.AppConfig.AddLabels();
+ auto *label = AppConfig.AddLabels();
label->SetName(name);
label->SetValue(value);
}
@@ -594,7 +594,7 @@ protected:
if (!AppConfig.HasRestartsCountConfig() && RestartsCountFile)
AppConfig.MutableRestartsCountConfig()->SetRestartsCountFile(RestartsCountFile);
- // Ports and node type are always applied (event if config was loaded from CMS).
+ // Ports and node type are always applied (even if config was loaded from CMS).
if (MonitoringPort)
AppConfig.MutableMonitoringConfig()->SetMonitoringPort(MonitoringPort);
if (MonitoringAddress)
@@ -608,7 +608,7 @@ protected:
}
}
if (SqsHttpPort)
- RunConfig.AppConfig.MutableSqsConfig()->MutableHttpServerConfig()->SetPort(SqsHttpPort);
+ AppConfig.MutableSqsConfig()->MutableHttpServerConfig()->SetPort(SqsHttpPort);
if (GRpcPort) {
auto& conf = *AppConfig.MutableGRpcConfig();
conf.SetStartGRpcProxy(true);
@@ -742,13 +742,13 @@ protected:
messageBusConfig->SetTracePath(TracePath);
}
- if (RunConfig.AppConfig.HasDynamicNameserviceConfig()) {
- bool isDynamic = RunConfig.NodeId > RunConfig.AppConfig.GetDynamicNameserviceConfig().GetMaxStaticNodeId();
+ if (AppConfig.HasDynamicNameserviceConfig()) {
+ bool isDynamic = RunConfig.NodeId > AppConfig.GetDynamicNameserviceConfig().GetMaxStaticNodeId();
RunConfig.Labels["dynamic"] = ToString(isDynamic ? "true" : "false");
AddLabelToAppConfig("node_id", RunConfig.Labels["node_id"]);
}
- RunConfig.ClusterName = RunConfig.AppConfig.GetNameserviceConfig().GetClusterUUID();
+ RunConfig.ClusterName = AppConfig.GetNameserviceConfig().GetClusterUUID();
}
inline bool LoadConfigFromCMS() {
@@ -936,24 +936,6 @@ protected:
}
}
- void MaybeRegisterAndLoadConfigs()
- {
- // static node
- if (NodeBrokerAddresses.empty() && !NodeBrokerPort) {
- if (!NodeId) {
- ythrow yexception() << "Either --node [NUM|'static'] or --node-broker[-port] should be specified";
- }
-
- if (!HierarchicalCfg && RunConfig.PathToConfigCacheFile)
- LoadCachedConfigsForStaticNode();
- return;
- }
-
- RegisterDynamicNode();
- if (!HierarchicalCfg && !IgnoreCmsConfigs)
- LoadConfigForDynamicNode();
- }
-
TNodeLocation CreateNodeLocation() {
NActorsInterconnect::TNodeLocation location;
location.SetDataCenter(DataCenter);
@@ -1055,7 +1037,7 @@ protected:
}
} else {
Y_ABORT_UNLESS(NodeBrokerPort);
- for (auto &node : RunConfig.AppConfig.MutableNameserviceConfig()->GetNode()) {
+ for (auto &node : AppConfig.MutableNameserviceConfig()->GetNode()) {
addrs.emplace_back(TStringBuilder() << (NodeBrokerUseTls ? "grpcs://" : "") << node.GetHost() << ':' << NodeBrokerPort);
}
}
@@ -1111,10 +1093,10 @@ protected:
}
RunConfig.ScopeId = TKikimrScopeId(scopeId);
- auto &nsConfig = *RunConfig.AppConfig.MutableNameserviceConfig();
+ auto &nsConfig = *AppConfig.MutableNameserviceConfig();
nsConfig.ClearNode();
- auto &dnConfig = *RunConfig.AppConfig.MutableDynamicNodeConfig();
+ auto &dnConfig = *AppConfig.MutableDynamicNodeConfig();
for (auto &node : result.GetNodes()) {
if (node.NodeId == result.GetNodeId()) {
auto nodeInfo = dnConfig.MutableNodeInfo();
@@ -1223,10 +1205,10 @@ protected:
RunConfig.NodeId = result->GetNodeId();
RunConfig.ScopeId = TKikimrScopeId(result->GetScopeId());
- auto &nsConfig = *RunConfig.AppConfig.MutableNameserviceConfig();
+ auto &nsConfig = *AppConfig.MutableNameserviceConfig();
nsConfig.ClearNode();
- auto &dnConfig = *RunConfig.AppConfig.MutableDynamicNodeConfig();
+ auto &dnConfig = *AppConfig.MutableDynamicNodeConfig();
for (auto &node : result->Record().GetNodes()) {
if (node.GetNodeId() == result->GetNodeId()) {
dnConfig.MutableNodeInfo()->CopyFrom(node);
diff --git a/ydb/core/fq/libs/compute/ydb/status_tracker_actor.cpp b/ydb/core/fq/libs/compute/ydb/status_tracker_actor.cpp
index 659ae14b6e..ef628906ce 100644
--- a/ydb/core/fq/libs/compute/ydb/status_tracker_actor.cpp
+++ b/ydb/core/fq/libs/compute/ydb/status_tracker_actor.cpp
@@ -96,6 +96,11 @@ public:
void Handle(const TEvents::TEvForwardPingResponse::TPtr& ev) {
auto pingCounters = Counters.GetCounters(ERequestType::RT_PING);
pingCounters->InFly->Dec();
+
+ if (ev->Cookie) {
+ return;
+ }
+
pingCounters->LatencyMs->Collect((TInstant::Now() - StartTime).MilliSeconds());
if (ev.Get()->Get()->Success) {
pingCounters->Ok->Inc();
@@ -125,6 +130,8 @@ public:
case NYdb::NQuery::EExecStatus::Unspecified:
case NYdb::NQuery::EExecStatus::Starting:
SendGetOperation(TDuration::MilliSeconds(BackoffTimer.NextBackoffMs()));
+ QueryStats = response.QueryStats;
+ UpdateProgress();
break;
case NYdb::NQuery::EExecStatus::Aborted:
case NYdb::NQuery::EExecStatus::Canceled:
@@ -150,6 +157,19 @@ public:
Register(new TRetryActor<TEvYdbCompute::TEvGetOperationRequest, TEvYdbCompute::TEvGetOperationResponse, NYdb::TOperation::TOperationId>(Counters.GetCounters(ERequestType::RT_GET_OPERATION), delay, SelfId(), Connector, OperationId));
}
+ void UpdateProgress() {
+ auto pingCounters = Counters.GetCounters(ERequestType::RT_PING);
+ pingCounters->InFly->Inc();
+ Fq::Private::PingTaskRequest pingTaskRequest;
+ PrepareAstAndPlan(pingTaskRequest, QueryStats.query_plan(), QueryStats.query_ast());
+ try {
+ pingTaskRequest.set_statistics(GetV1StatFromV2Plan(QueryStats.query_plan()));
+ } catch(const NJson::TJsonException& ex) {
+ LOG_E("Error statistics conversion: " << ex.what());
+ }
+ Send(Pinger, new TEvents::TEvForwardPingRequest(pingTaskRequest), 0, 1);
+ }
+
void Failed() {
LOG_I("Execution status: Failed, Status: " << Status << ", StatusCode: " << NYql::NDqProto::StatusIds::StatusCode_Name(StatusCode) << " Issues: " << Issues.ToOneLineString());
auto pingCounters = Counters.GetCounters(ERequestType::RT_PING);
@@ -158,6 +178,11 @@ public:
NYql::IssuesToMessage(Issues, pingTaskRequest.mutable_issues());
pingTaskRequest.set_pending_status_code(StatusCode);
PrepareAstAndPlan(pingTaskRequest, QueryStats.query_plan(), QueryStats.query_ast());
+ try {
+ pingTaskRequest.set_statistics(GetV1StatFromV2Plan(QueryStats.query_plan()));
+ } catch(const NJson::TJsonException& ex) {
+ LOG_E("Error statistics conversion: " << ex.what());
+ }
Send(Pinger, new TEvents::TEvForwardPingRequest(pingTaskRequest));
}
diff --git a/ydb/core/kqp/common/events/query.h b/ydb/core/kqp/common/events/query.h
index d365551e19..2f269d85f3 100644
--- a/ydb/core/kqp/common/events/query.h
+++ b/ydb/core/kqp/common/events/query.h
@@ -274,6 +274,14 @@ public:
return UserRequestContext;
}
+ void SetProgressStatsPeriod(TDuration progressStatsPeriod) {
+ ProgressStatsPeriod = progressStatsPeriod;
+ }
+
+ TDuration GetProgressStatsPeriod() const {
+ return ProgressStatsPeriod;
+ }
+
mutable NKikimrKqp::TEvQueryRequest Record;
private:
@@ -301,6 +309,7 @@ private:
TDuration CancelAfter;
const ::Ydb::Query::Syntax Syntax = Ydb::Query::Syntax::SYNTAX_UNSPECIFIED;
TIntrusivePtr<TUserRequestContext> UserRequestContext;
+ TDuration ProgressStatsPeriod;
};
struct TEvDataQueryStreamPart: public TEventPB<TEvDataQueryStreamPart,
diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
index a3fb572a8b..aeec263682 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h
+++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
@@ -23,6 +23,7 @@
#include <ydb/core/kqp/common/kqp_yql.h>
#include <ydb/core/kqp/common/kqp_user_request_context.h>
#include <ydb/core/kqp/federated_query/kqp_federated_query_actors.h>
+#include <ydb/core/kqp/opt/kqp_query_plan.h>
#include <ydb/core/grpc_services/local_rate_limiter.h>
#include <ydb/services/metadata/secret/fetcher.h>
@@ -243,6 +244,23 @@ protected:
<< ", state: " << NYql::NDqProto::EComputeState_Name((NYql::NDqProto::EComputeState) state.GetState())
<< ", stats: " << state.GetStats());
+ if (Stats && state.HasStats() && Request.ProgressStatsPeriod) {
+ Stats->UpdateTaskStats(taskId, state.GetStats());
+ auto now = TInstant::Now();
+ if (LastProgressStats + Request.ProgressStatsPeriod <= now) {
+ auto progress = MakeHolder<TEvKqpExecuter::TEvExecuterProgress>();
+ auto& execStats = *progress->Record.MutableQueryStats()->AddExecutions();
+ Stats->ExportExecStats(execStats);
+ for (ui32 txId = 0; txId < Request.Transactions.size(); ++txId) {
+ const auto& tx = Request.Transactions[txId].Body;
+ auto planWithStats = AddExecStatsToTxPlan(tx->GetPlan(), execStats);
+ execStats.AddTxPlansWithStats(planWithStats);
+ }
+ this->Send(Target, progress.Release());
+ LastProgressStats = now;
+ }
+ }
+
switch (state.GetState()) {
case NYql::NDqProto::COMPUTE_STATE_UNKNOWN: {
YQL_ENSURE(false, "unexpected state from " << computeActor << ", task: " << taskId);
@@ -1670,6 +1688,7 @@ protected:
const TIntrusiveConstPtr<NACLib::TUserToken> UserToken;
TKqpRequestCounters::TPtr Counters;
std::shared_ptr<TQueryExecutionStats> Stats;
+ TInstant LastProgressStats;
TInstant StartTime;
TMaybe<TInstant> Deadline;
TMaybe<TInstant> CancelAt;
diff --git a/ydb/core/kqp/executer_actor/kqp_executer_stats.cpp b/ydb/core/kqp/executer_actor/kqp_executer_stats.cpp
index 584ea11e4f..55393ba34d 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer_stats.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_executer_stats.cpp
@@ -6,6 +6,154 @@ namespace NKikimr::NKqp {
using namespace NYql;
using namespace NYql::NDq;
+void TAsyncStats::Resize(ui32 taskCount) {
+ Bytes.resize(taskCount);
+ Rows.resize(taskCount);
+ Chunks.resize(taskCount);
+ Splits.resize(taskCount);
+ FirstMessageMs.resize(taskCount);
+ PauseMessageMs.resize(taskCount);
+ ResumeMessageMs.resize(taskCount);
+ LastMessageMs.resize(taskCount);
+ WaitTimeUs.resize(taskCount);
+ WaitPeriods.resize(taskCount);
+ ActiveTimeUs.resize(taskCount);
+}
+
+void TAsyncBufferStats::Resize(ui32 taskCount) {
+ Ingress.Resize(taskCount);
+ Push.Resize(taskCount);
+ Pop.Resize(taskCount);
+ Egress.Resize(taskCount);
+}
+
+void TTableStats::Resize(ui32 taskCount) {
+ ReadRows.resize(taskCount);
+ ReadBytes.resize(taskCount);
+ WriteRows.resize(taskCount);
+ WriteBytes.resize(taskCount);
+ EraseRows.resize(taskCount);
+ EraseBytes.resize(taskCount);
+ AffectedPartitions.resize(taskCount);
+}
+
+void TStageExecutionStats::Resize(ui32 taskCount) {
+ CpuTimeUs.resize(taskCount);
+ SourceCpuTimeUs.resize(taskCount);
+
+ InputRows.resize(taskCount);
+ InputBytes.resize(taskCount);
+ OutputRows.resize(taskCount);
+ OutputBytes.resize(taskCount);
+
+ FirstRowTimeMs.resize(taskCount);
+ FinishTimeMs.resize(taskCount);
+ StartTimeMs.resize(taskCount);
+
+ for (auto& p : Ingress) p.second.Resize(taskCount);
+ for (auto& p : Egress) p.second.Resize(taskCount);
+ for (auto& p : Input) p.second.Resize(taskCount);
+ for (auto& p : Output) p.second.Resize(taskCount);
+
+ MaxMemoryUsage.resize(taskCount);
+}
+
+void TStageExecutionStats::UpdateAsyncStats(i32 index, TAsyncStats& aggrAsyncStats, const NYql::NDqProto::TDqAsyncBufferStats& asyncStats) {
+ aggrAsyncStats.Bytes[index] = asyncStats.GetBytes();
+ aggrAsyncStats.Rows[index] = asyncStats.GetRows();
+ aggrAsyncStats.Chunks[index] = asyncStats.GetChunks();
+ aggrAsyncStats.Splits[index] = asyncStats.GetSplits();
+
+ auto firstMessageMs = asyncStats.GetFirstMessageMs();
+ aggrAsyncStats.FirstMessageMs[index] = firstMessageMs;
+ aggrAsyncStats.PauseMessageMs[index] = asyncStats.GetPauseMessageMs();
+ aggrAsyncStats.ResumeMessageMs[index] = asyncStats.GetResumeMessageMs();
+ auto lastMessageMs = asyncStats.GetLastMessageMs();
+ aggrAsyncStats.LastMessageMs[index] = lastMessageMs;
+ aggrAsyncStats.WaitTimeUs[index] = asyncStats.GetWaitTimeUs();
+ aggrAsyncStats.WaitPeriods[index] = asyncStats.GetWaitPeriods();
+ if (firstMessageMs && lastMessageMs > firstMessageMs) {
+ aggrAsyncStats.ActiveTimeUs[index] = lastMessageMs - firstMessageMs;
+ }
+}
+
+void TStageExecutionStats::UpdateStats(const NYql::NDqProto::TDqTaskStats& taskStats, ui64 maxMemoryUsage) {
+ auto taskId = taskStats.GetTaskId();
+ auto it = Task2Index.find(taskId);
+
+ ui32 taskCount = Task2Index.size();
+
+ ui32 index;
+ if (it == Task2Index.end()) {
+ index = taskCount++;
+ Task2Index.emplace(taskId, index);
+ Resize(taskCount);
+ } else {
+ index = it->second;
+ }
+
+ CpuTimeUs[index] = taskStats.GetCpuTimeUs();
+ SourceCpuTimeUs[index] = taskStats.GetSourceCpuTimeUs();
+
+ InputRows[index] = taskStats.GetInputRows();
+ InputBytes[index] = taskStats.GetInputBytes();
+ OutputRows[index] = taskStats.GetOutputRows();
+ OutputBytes[index] = taskStats.GetOutputBytes();
+
+ StartTimeMs[index] = taskStats.GetStartTimeMs(); // to be reviewed
+ FirstRowTimeMs[index] = taskStats.GetFirstRowTimeMs(); // to be reviewed
+ FinishTimeMs[index] = taskStats.GetFinishTimeMs(); // to be reviewed
+
+ for (auto& tableStat : taskStats.GetTables()) {
+ auto tablePath = tableStat.GetTablePath();
+ auto [it, inserted] = Tables.try_emplace(tablePath, taskCount);
+ auto& aggrTableStats = it->second;
+ aggrTableStats.ReadRows[index] = tableStat.GetReadRows();
+ aggrTableStats.ReadBytes[index] = tableStat.GetReadBytes();
+ aggrTableStats.WriteRows[index] = tableStat.GetWriteRows();
+ aggrTableStats.WriteBytes[index] = tableStat.GetWriteBytes();
+ aggrTableStats.EraseRows[index] = tableStat.GetEraseRows();
+ aggrTableStats.EraseBytes[index] = tableStat.GetEraseBytes();
+ aggrTableStats.AffectedPartitions[index] = tableStat.GetAffectedPartitions();
+ }
+
+ for (auto& sourceStat : taskStats.GetSources()) {
+ auto ingressName = sourceStat.GetIngressName();
+ auto [it, inserted] = Ingress.try_emplace(ingressName, taskCount);
+ auto& asyncBufferStats = it->second;
+ UpdateAsyncStats(index, asyncBufferStats.Ingress, sourceStat.GetIngress());
+ UpdateAsyncStats(index, asyncBufferStats.Push, sourceStat.GetPush());
+ UpdateAsyncStats(index, asyncBufferStats.Pop, sourceStat.GetPop());
+ }
+
+ for (auto& inputChannelStat : taskStats.GetInputChannels()) {
+ auto stageId = inputChannelStat.GetSrcStageId();
+ auto [it, inserted] = Input.try_emplace(stageId, taskCount);
+ auto& asyncBufferStats = it->second;
+ UpdateAsyncStats(index, asyncBufferStats.Push, inputChannelStat.GetPush());
+ UpdateAsyncStats(index, asyncBufferStats.Pop, inputChannelStat.GetPop());
+ }
+
+ for (auto& outputChannelStat : taskStats.GetOutputChannels()) {
+ auto stageId = outputChannelStat.GetDstStageId();
+ auto [it, inserted] = Output.try_emplace(stageId, taskCount);
+ auto& asyncBufferStats = it->second;
+ UpdateAsyncStats(index, asyncBufferStats.Push, outputChannelStat.GetPush());
+ UpdateAsyncStats(index, asyncBufferStats.Pop, outputChannelStat.GetPop());
+ }
+
+ for (auto& sinkStat : taskStats.GetSinks()) {
+ auto egressName = sinkStat.GetEgressName();
+ auto [it, inserted] = Egress.try_emplace(egressName, taskCount);
+ auto& asyncBufferStats = it->second;
+ UpdateAsyncStats(index, asyncBufferStats.Push, sinkStat.GetPush());
+ UpdateAsyncStats(index, asyncBufferStats.Pop, sinkStat.GetPop());
+ UpdateAsyncStats(index, asyncBufferStats.Ingress, sinkStat.GetEgress());
+ }
+
+ MaxMemoryUsage[index] = maxMemoryUsage;
+}
+
namespace {
TTableStat operator - (const TTableStat& l, const TTableStat& r) {
@@ -82,11 +230,9 @@ void UpdateMinMax(NDqProto::TDqStatsMinMax* minMax, ui64 value) noexcept {
minMax->SetMax(std::max(minMax->GetMax(), value));
}
-NDqProto::TDqStageStats* GetOrCreateStageStats(const NYql::NDqProto::TDqTaskStats& taskStats,
+NDqProto::TDqStageStats* GetOrCreateStageStats(const NYql::NDq::TStageId& stageId,
const TKqpTasksGraph& tasksGraph, NDqProto::TDqExecutionStats& execStats)
{
- auto& task = tasksGraph.GetTask(taskStats.GetTaskId());
- auto& stageId = task.StageId;
auto& stageInfo = tasksGraph.GetStageInfo(stageId);
auto& stageProto = stageInfo.Meta.Tx.Body->GetStages(stageId.StageId);
@@ -103,6 +249,13 @@ NDqProto::TDqStageStats* GetOrCreateStageStats(const NYql::NDqProto::TDqTaskStat
return newStage;
}
+NDqProto::TDqStageStats* GetOrCreateStageStats(const NYql::NDqProto::TDqTaskStats& taskStats,
+ const TKqpTasksGraph& tasksGraph, NDqProto::TDqExecutionStats& execStats)
+{
+ auto& task = tasksGraph.GetTask(taskStats.GetTaskId());
+ return GetOrCreateStageStats(task.StageId, tasksGraph, execStats);
+}
+
NDqProto::TDqTableAggrStats* GetOrCreateTableAggrStats(NDqProto::TDqStageStats* stage, const TString& tablePath) {
for(auto& table : *stage->MutableTables()) {
if (table.GetTablePath() == tablePath) {
@@ -411,6 +564,144 @@ void TQueryExecutionStats::AddDatashardStats(NYql::NDqProto::TDqComputeActorStat
}
}
+void TQueryExecutionStats::UpdateTaskStats(ui64 taskId, const NYql::NDqProto::TDqComputeActorStats& stats) {
+ Y_ASSERT(stats.GetTasks().size() == 1);
+ const NYql::NDqProto::TDqTaskStats& taskStats = stats.GetTasks(0);
+ Y_ASSERT(taskStats.GetTaskId() == taskId);
+ auto stageId = taskStats.GetStageId();
+ auto [it, inserted] = StageStats.try_emplace(stageId);
+ if (inserted) {
+ it->second.StageId = TasksGraph->GetTask(taskStats.GetTaskId()).StageId;
+ }
+ it->second.UpdateStats(taskStats, stats.GetMaxMemoryUsage());
+}
+
+void ExportAggStats(std::vector<ui64>& data, NYql::NDqProto::TDqStatsMinMax& stats) {
+ ui64 count = 0;
+ ui64 min = 0;
+ ui64 max = 0;
+ for (auto d : data) {
+ if (d) {
+ if (count) {
+ if (min > d) min = d;
+ if (max < d) max = d;
+ } else {
+ min = max = d;
+ }
+ count++;
+ }
+ }
+ if (count) {
+ stats.SetMin(min);
+ stats.SetMax(max);
+ }
+}
+
+void ExportAggStats(std::vector<ui64>& data, NYql::NDqProto::TDqStatsAggr& stats) {
+ ui64 count = 0;
+ ui64 min = 0;
+ ui64 max = 0;
+ ui64 sum = 0;
+ for (auto d : data) {
+ if (d) {
+ if (count) {
+ if (min > d) min = d;
+ if (max < d) max = d;
+ } else {
+ min = max = d;
+ }
+ sum += d;
+ count++;
+ }
+ }
+ if (count) {
+ stats.SetMin(min);
+ stats.SetMax(max);
+ stats.SetSum(sum);
+ stats.SetCnt(count);
+ }
+}
+
+ui64 ExportAggStats(std::vector<ui64>& data) {
+ ui64 sum = 0;
+ for (auto d : data) {
+ sum += d;
+ }
+ return sum;
+}
+
+void ExportAggAsyncStats(TAsyncStats& data, NYql::NDqProto::TDqAsyncStatsAggr& stats) {
+ ExportAggStats(data.Bytes, *stats.MutableBytes());
+ ExportAggStats(data.Rows, *stats.MutableRows());
+ ExportAggStats(data.Chunks, *stats.MutableChunks());
+ ExportAggStats(data.Splits, *stats.MutableSplits());
+ ExportAggStats(data.FirstMessageMs, *stats.MutableFirstMessageMs());
+ ExportAggStats(data.PauseMessageMs, *stats.MutablePauseMessageMs());
+ ExportAggStats(data.ResumeMessageMs, *stats.MutableResumeMessageMs());
+ ExportAggStats(data.LastMessageMs, *stats.MutableLastMessageMs());
+ ExportAggStats(data.WaitTimeUs, *stats.MutableWaitTimeUs());
+ ExportAggStats(data.WaitPeriods, *stats.MutableWaitPeriods());
+ ExportAggStats(data.ActiveTimeUs, *stats.MutableActiveTimeUs());
+}
+
+void ExportAggAsyncBufferStats(TAsyncBufferStats& data, NYql::NDqProto::TDqAsyncBufferStatsAggr& stats) {
+ ExportAggAsyncStats(data.Ingress, *stats.MutableIngress());
+ ExportAggAsyncStats(data.Push, *stats.MutablePush());
+ ExportAggAsyncStats(data.Pop, *stats.MutablePop());
+ ExportAggAsyncStats(data.Egress, *stats.MutableEgress());
+}
+
+void TQueryExecutionStats::ExportExecStats(NYql::NDqProto::TDqExecutionStats& stats) {
+
+ THashMap<ui32, NDqProto::TDqStageStats*> protoStages;
+ for (auto& [stageId, stagetype] : TasksGraph->GetStagesInfo()) {
+ protoStages.emplace(stageId.StageId, GetOrCreateStageStats(stageId, *TasksGraph, stats));
+ }
+
+ for (auto& p : StageStats) {
+ auto& stageStats = *protoStages[p.second.StageId.StageId];
+ stageStats.SetTotalTasksCount(p.second.Task2Index.size());
+
+ ExportAggStats(p.second.CpuTimeUs, *stageStats.MutableCpuTimeUs());
+ ExportAggStats(p.second.SourceCpuTimeUs, *stageStats.MutableSourceCpuTimeUs());
+
+ ExportAggStats(p.second.InputRows, *stageStats.MutableInputRows());
+ ExportAggStats(p.second.InputBytes, *stageStats.MutableInputBytes());
+ ExportAggStats(p.second.OutputRows, *stageStats.MutableOutputRows());
+ ExportAggStats(p.second.OutputBytes, *stageStats.MutableOutputBytes());
+
+ ExportAggStats(p.second.StartTimeMs, *stageStats.MutableStartTimeMs()); // to be reviewed
+ ExportAggStats(p.second.FirstRowTimeMs, *stageStats.MutableFirstRowTimeMs()); // to be reviewed
+ ExportAggStats(p.second.FinishTimeMs, *stageStats.MutableFinishTimeMs()); // to be reviewed
+
+ stageStats.SetDurationUs((stageStats.GetFinishTimeMs().GetMax() - stageStats.GetStartTimeMs().GetMin()) * 1'000);
+
+ for (auto& p2 : p.second.Tables) {
+ auto& table = *stageStats.AddTables();
+ table.SetTablePath(p2.first);
+ ExportAggStats(p2.second.ReadRows, *table.MutableReadRows());
+ ExportAggStats(p2.second.ReadBytes, *table.MutableReadBytes());
+ ExportAggStats(p2.second.WriteRows, *table.MutableWriteRows());
+ ExportAggStats(p2.second.WriteBytes, *table.MutableWriteBytes());
+ ExportAggStats(p2.second.EraseRows, *table.MutableEraseRows());
+ ExportAggStats(p2.second.EraseBytes, *table.MutableEraseBytes());
+ table.SetAffectedPartitions(ExportAggStats(p2.second.AffectedPartitions));
+ }
+ for (auto& p2 : p.second.Ingress) {
+ ExportAggAsyncBufferStats(p2.second, (*stageStats.MutableIngress())[p2.first]);
+ }
+ for (auto& p2 : p.second.Input) {
+ ExportAggAsyncBufferStats(p2.second, (*stageStats.MutableInput())[p2.first]);
+ }
+ for (auto& p2 : p.second.Output) {
+ ExportAggAsyncBufferStats(p2.second, (*stageStats.MutableOutput())[p2.first]);
+ }
+ for (auto& p2 : p.second.Egress) {
+ ExportAggAsyncBufferStats(p2.second, (*stageStats.MutableEgress())[p2.first]);
+ }
+ }
+}
+
void TQueryExecutionStats::Finish() {
// Cerr << (TStringBuilder() << "-- finish: executerTime: " << ExecuterCpuTime.MicroSeconds() << Endl);
diff --git a/ydb/core/kqp/executer_actor/kqp_executer_stats.h b/ydb/core/kqp/executer_actor/kqp_executer_stats.h
index 63348022b1..14f1de9b8e 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer_stats.h
+++ b/ydb/core/kqp/executer_actor/kqp_executer_stats.h
@@ -12,10 +12,94 @@ NYql::NDqProto::EDqStatsMode GetDqStatsModeShard(Ydb::Table::QueryStatsCollectio
bool CollectFullStats(Ydb::Table::QueryStatsCollection::Mode statsMode);
bool CollectProfileStats(Ydb::Table::QueryStatsCollection::Mode statsMode);
+struct TAsyncStats {
+ // Data
+ std::vector<ui64> Bytes;
+ std::vector<ui64> Rows;
+ std::vector<ui64> Chunks;
+ std::vector<ui64> Splits;
+ // Time
+ std::vector<ui64> FirstMessageMs;
+ std::vector<ui64> PauseMessageMs;
+ std::vector<ui64> ResumeMessageMs;
+ std::vector<ui64> LastMessageMs;
+ std::vector<ui64> WaitTimeUs;
+ std::vector<ui64> WaitPeriods;
+ std::vector<ui64> ActiveTimeUs;
+
+ void Resize(ui32 taskCount);
+};
+
+struct TAsyncBufferStats {
+
+ TAsyncBufferStats() = default;
+ TAsyncBufferStats(ui32 taskCount) {
+ Resize(taskCount);
+ }
+
+ TAsyncStats Ingress;
+ TAsyncStats Push;
+ TAsyncStats Pop;
+ TAsyncStats Egress;
+
+ void Resize(ui32 taskCount);
+};
+
+struct TTableStats {
+
+ TTableStats() = default;
+ TTableStats(ui32 taskCount) {
+ Resize(taskCount);
+ }
+
+ std::vector<ui64> ReadRows;
+ std::vector<ui64> ReadBytes;
+ std::vector<ui64> WriteRows;
+ std::vector<ui64> WriteBytes;
+ std::vector<ui64> EraseRows;
+ std::vector<ui64> EraseBytes;
+
+ std::vector<ui64> AffectedPartitions;
+
+ void Resize(ui32 taskCount);
+};
+
+struct TStageExecutionStats {
+
+ NYql::NDq::TStageId StageId;
+
+ std::map<ui32, ui32> Task2Index;
+
+ std::vector<ui64> CpuTimeUs;
+ std::vector<ui64> SourceCpuTimeUs;
+
+ std::vector<ui64> InputRows;
+ std::vector<ui64> InputBytes;
+ std::vector<ui64> OutputRows;
+ std::vector<ui64> OutputBytes;
+
+ std::vector<ui64> FirstRowTimeMs;
+ std::vector<ui64> FinishTimeMs;
+ std::vector<ui64> StartTimeMs;
+
+ std::map<TString, TTableStats> Tables;
+ std::map<TString, TAsyncBufferStats> Ingress;
+ std::map<TString, TAsyncBufferStats> Egress;
+ std::map<ui32, TAsyncBufferStats> Input;
+ std::map<ui32, TAsyncBufferStats> Output;
+
+ std::vector<ui64> MaxMemoryUsage;
+
+ void Resize(ui32 taskCount);
+ void UpdateAsyncStats(i32 index, TAsyncStats& aggrAsyncStats, const NYql::NDqProto::TDqAsyncBufferStats& asyncStats);
+ void UpdateStats(const NYql::NDqProto::TDqTaskStats& taskStats, ui64 maxMemoryUsage);
+};
+
struct TQueryExecutionStats {
private:
std::map<ui32, std::map<ui32, ui32>> ShardsCountByNode;
std::map<ui32, bool> UseLlvmByStageId;
+ std::map<ui32, TStageExecutionStats> StageStats;
public:
const Ydb::Table::QueryStatsCollection::Mode StatsMode;
const TKqpTasksGraph* const TasksGraph = nullptr;
@@ -70,6 +154,9 @@ public:
TDuration collectLongTaskStatsTimeout = TDuration::Max()
);
+ void UpdateTaskStats(ui64 taskId, const NYql::NDqProto::TDqComputeActorStats& stats);
+ void ExportExecStats(NYql::NDqProto::TDqExecutionStats& stats);
+
void Finish();
private:
diff --git a/ydb/core/kqp/gateway/kqp_gateway.h b/ydb/core/kqp/gateway/kqp_gateway.h
index 320e00e958..68f69929b0 100644
--- a/ydb/core/kqp/gateway/kqp_gateway.h
+++ b/ydb/core/kqp/gateway/kqp_gateway.h
@@ -139,6 +139,7 @@ public:
ui64 MkqlMemoryLimit = 0; // old engine compatibility
ui64 PerShardKeysSizeLimitBytes = 0;
Ydb::Table::QueryStatsCollection::Mode StatsMode = Ydb::Table::QueryStatsCollection::STATS_COLLECTION_NONE;
+ TDuration ProgressStatsPeriod;
TKqpSnapshot Snapshot = TKqpSnapshot();
NKikimrKqp::EIsolationLevel IsolationLevel = NKikimrKqp::ISOLATION_LEVEL_UNDEFINED;
TMaybe<NKikimrKqp::TRlPath> RlPath;
diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp
index 32b2fe4ce1..cef3ffa7a3 100644
--- a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp
+++ b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp
@@ -14,6 +14,7 @@
#include <ydb/core/kqp/common/kqp_lwtrace_probes.h>
#include <ydb/core/kqp/common/kqp_timeouts.h>
#include <ydb/core/kqp/compile_service/kqp_compile_service.h>
+#include <ydb/core/kqp/executer_actor/kqp_executer.h>
#include <ydb/core/kqp/session_actor/kqp_worker_common.h>
#include <ydb/core/kqp/node_service/kqp_node_service.h>
#include <ydb/library/yql/dq/actors/spilling/spilling_file.h>
@@ -606,6 +607,11 @@ public:
}
const TString& sessionId = ev->Get()->GetSessionId();
+
+ if (!ev->Get()->GetUserRequestContext()) {
+ ev->Get()->SetUserRequestContext(MakeIntrusive<TUserRequestContext>(traceId, database, sessionId));
+ }
+
const TKqpSessionInfo* sessionInfo = LocalSessions->FindPtr(sessionId);
auto dbCounters = sessionInfo ? sessionInfo->DbCounters : nullptr;
if (!dbCounters) {
@@ -650,11 +656,11 @@ public:
if (cancelAfter) {
timerDuration = Min(timerDuration, cancelAfter);
}
- KQP_PROXY_LOG_D(TKqpRequestInfo(traceId, sessionId) << "TEvQueryRequest, set timer for: " << timerDuration << " timeout: " << timeout << " cancelAfter: " << cancelAfter);
+ KQP_PROXY_LOG_D("Ctx: " << *ev->Get()->GetUserRequestContext() << ". TEvQueryRequest, set timer for: " << timerDuration
+ << " timeout: " << timeout << " cancelAfter: " << cancelAfter
+ << ". " << "Send request to target, requestId: " << requestId << ", targetId: " << targetId);
auto status = timerDuration == cancelAfter ? NYql::NDqProto::StatusIds::CANCELLED : NYql::NDqProto::StatusIds::TIMEOUT;
StartQueryTimeout(requestId, timerDuration, status);
- KQP_PROXY_LOG_D("Sent request to target, requestId: " << requestId
- << ", targetId: " << targetId << ", sessionId: " << sessionId);
Send(targetId, ev->Release().Release(), IEventHandle::FlagTrackDelivery, requestId);
}
@@ -838,6 +844,22 @@ public:
PendingRequests.Erase(requestId);
}
+ void ForwardProgress(TEvKqpExecuter::TEvExecuterProgress::TPtr& ev) {
+ ui64 requestId = ev->Cookie;
+
+ auto proxyRequest = PendingRequests.FindPtr(requestId);
+ if (!proxyRequest) {
+ KQP_PROXY_LOG_E("Unknown sender for proxy response, requestId: " << requestId);
+ return;
+ }
+
+ Send(proxyRequest->Sender, ev->Release().Release(), 0, proxyRequest->SenderCookie);
+
+ TKqpRequestInfo requestInfo(proxyRequest->TraceId);
+ KQP_PROXY_LOG_D(requestInfo << "Forwarded response to sender actor, requestId: " << requestId
+ << ", sender: " << proxyRequest->Sender << ", selfId: " << SelfId() << ", source: " << ev->Sender);
+ }
+
void LookupPeerProxyData() {
if (!SelfDataCenterId || BoardLookupActor || AppData()->TenantName.empty()) {
return;
@@ -1246,6 +1268,7 @@ public:
hFunc(TEvKqp::TEvScriptRequest, Handle);
hFunc(TEvKqp::TEvCloseSessionRequest, Handle);
hFunc(TEvKqp::TEvQueryResponse, ForwardEvent);
+ hFunc(TEvKqpExecuter::TEvExecuterProgress, ForwardProgress);
hFunc(TEvKqp::TEvProcessResponse, Handle);
hFunc(TEvKqp::TEvCreateSessionRequest, Handle);
hFunc(TEvKqp::TEvPingSessionRequest, Handle);
diff --git a/ydb/core/kqp/proxy_service/kqp_script_executions.cpp b/ydb/core/kqp/proxy_service/kqp_script_executions.cpp
index 16101529d4..2d18aeb97b 100644
--- a/ydb/core/kqp/proxy_service/kqp_script_executions.cpp
+++ b/ydb/core/kqp/proxy_service/kqp_script_executions.cpp
@@ -1735,6 +1735,11 @@ public:
)
void Handle(TEvSaveScriptResultMetaFinished::TPtr& ev) {
+ if (ev->Get()->Status == Ydb::StatusIds::ABORTED) {
+ Register(new TSaveScriptExecutionResultMetaQuery(Database, ExecutionId, SerializedMetas));
+ return;
+ }
+
Send(ev->Forward(ReplyActorId));
PassAway();
}
@@ -2698,6 +2703,54 @@ private:
NYql::TIssues OperationIssues;
};
+class TScriptProgressActor : public TQueryBase {
+public:
+ TScriptProgressActor(const TString& database, const TString& executionId, const TString& queryPlan, const TString&)
+ : Database(database), ExecutionId(executionId), QueryPlan(queryPlan)
+ {
+ KQP_PROXY_LOG_D(queryPlan);
+ }
+
+ void OnRunQuery() override {
+ TString sql = R"(
+ -- TScriptProgressActor::OnRunQuery
+ DECLARE $execution_id AS Text;
+ DECLARE $database AS Text;
+ DECLARE $plan AS JsonDocument;
+
+ UPSERT INTO `.metadata/script_executions` (execution_id, database, plan)
+ VALUES ($execution_id, $database, $plan);
+ )";
+
+ NYdb::TParamsBuilder params;
+ params
+ .AddParam("$execution_id")
+ .Utf8(ExecutionId)
+ .Build()
+ .AddParam("$database")
+ .Utf8(Database)
+ .Build()
+ .AddParam("$plan")
+ .JsonDocument(QueryPlan)
+ .Build();
+
+ RunDataQuery(sql, &params);
+ }
+
+ void OnQueryResult() override {
+ Finish();
+ }
+
+ void OnFinish(Ydb::StatusIds::StatusCode, NYql::TIssues&&) override {
+ }
+
+private:
+ TString Database;
+ TString ExecutionId;
+ TString QueryPlan;
+};
+
+
} // anonymous namespace
NActors::IActor* CreateScriptExecutionCreatorActor(TEvKqp::TEvScriptRequest::TPtr&& ev, const NKikimrConfig::TQueryServiceConfig& queryServiceConfig, TIntrusivePtr<TKqpCounters> counters, TDuration maxRunTime) {
@@ -2752,6 +2805,10 @@ NActors::IActor* CreateScriptFinalizationFinisherActor(const TString& executionI
return new TScriptFinalizationFinisherActor(executionId, database, operationStatus, std::move(operationIssues));
}
+NActors::IActor* CreateScriptProgressActor(const TString& executionId, const TString& database, const TString& queryPlan, const TString& queryStats) {
+ return new TScriptProgressActor(database, executionId, queryPlan, queryStats);
+}
+
namespace NPrivate {
NActors::IActor* CreateCreateScriptOperationQueryActor(const TString& executionId, const NActors::TActorId& runScriptActorId, const NKikimrKqp::TEvQueryRequest& record, TDuration operationTtl, TDuration resultsTtl, TDuration leaseDuration) {
diff --git a/ydb/core/kqp/proxy_service/kqp_script_executions.h b/ydb/core/kqp/proxy_service/kqp_script_executions.h
index 0ca77c17fd..5781046a1d 100644
--- a/ydb/core/kqp/proxy_service/kqp_script_executions.h
+++ b/ydb/core/kqp/proxy_service/kqp_script_executions.h
@@ -35,5 +35,6 @@ NActors::IActor* CreateGetScriptExecutionResultActor(const NActors::TActorId& re
NActors::IActor* CreateSaveScriptExternalEffectActor(TEvSaveScriptExternalEffectRequest::TPtr ev);
NActors::IActor* CreateSaveScriptFinalStatusActor(TEvScriptFinalizeRequest::TPtr ev);
NActors::IActor* CreateScriptFinalizationFinisherActor(const TString& executionId, const TString& database, std::optional<Ydb::StatusIds::StatusCode> operationStatus, NYql::TIssues operationIssues);
+NActors::IActor* CreateScriptProgressActor(const TString& executionId, const TString& database, const TString& queryPlan, const TString& queryStats);
} // namespace NKikimr::NKqp
diff --git a/ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp b/ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp
index eb2c4f9748..71c15790a6 100644
--- a/ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp
+++ b/ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp
@@ -20,11 +20,11 @@
#include <forward_list>
-#define LOG_T(stream) LOG_TRACE_S(NActors::TActivationContext::AsActorContext(), NKikimrServices::KQP_EXECUTER, SelfId() << " " << stream);
-#define LOG_D(stream) LOG_DEBUG_S(NActors::TActivationContext::AsActorContext(), NKikimrServices::KQP_EXECUTER, SelfId() << " " << stream);
-#define LOG_I(stream) LOG_INFO_S(NActors::TActivationContext::AsActorContext(), NKikimrServices::KQP_EXECUTER, SelfId() << " " << stream);
-#define LOG_W(stream) LOG_WARN_S(NActors::TActivationContext::AsActorContext(), NKikimrServices::KQP_EXECUTER, SelfId() << " " << stream);
-#define LOG_E(stream) LOG_ERROR_S(NActors::TActivationContext::AsActorContext(), NKikimrServices::KQP_EXECUTER, SelfId() << " " << stream);
+#define LOG_T(stream) LOG_TRACE_S(NActors::TActivationContext::AsActorContext(), NKikimrServices::KQP_EXECUTER, "TRunScriptActor " << SelfId() << ". " << "Ctx: " << *UserRequestContext << ". " << stream);
+#define LOG_D(stream) LOG_DEBUG_S(NActors::TActivationContext::AsActorContext(), NKikimrServices::KQP_EXECUTER, "TRunScriptActor " << SelfId() << ". " << "Ctx: " << *UserRequestContext << ". " << stream);
+#define LOG_I(stream) LOG_INFO_S (NActors::TActivationContext::AsActorContext(), NKikimrServices::KQP_EXECUTER, "TRunScriptActor " << SelfId() << ". " << "Ctx: " << *UserRequestContext << ". " << stream);
+#define LOG_W(stream) LOG_WARN_S (NActors::TActivationContext::AsActorContext(), NKikimrServices::KQP_EXECUTER, "TRunScriptActor " << SelfId() << ". " << "Ctx: " << *UserRequestContext << ". " << stream);
+#define LOG_E(stream) LOG_ERROR_S(NActors::TActivationContext::AsActorContext(), NKikimrServices::KQP_EXECUTER, "TRunScriptActor " << SelfId() << ". " << "Ctx: " << *UserRequestContext << ". " << stream);
namespace NKikimr::NKqp {
@@ -67,7 +67,9 @@ public:
, ResultsTtl(resultsTtl)
, QueryServiceConfig(queryServiceConfig)
, Counters(counters)
- {}
+ {
+ UserRequestContext = MakeIntrusive<TUserRequestContext>(Request.GetTraceId(), Database, "", ExecutionId, Request.GetTraceId());
+ }
static constexpr char ActorName[] = "KQP_RUN_SCRIPT_ACTOR";
@@ -80,6 +82,7 @@ private:
hFunc(NActors::TEvents::TEvWakeup, Handle);
hFunc(NActors::TEvents::TEvPoison, Handle);
hFunc(TEvKqpExecuter::TEvStreamData, Handle);
+ hFunc(TEvKqpExecuter::TEvExecuterProgress, Handle);
hFunc(TEvKqp::TEvQueryResponse, Handle);
hFunc(TEvKqp::TEvCreateSessionResponse, Handle);
IgnoreFunc(TEvKqp::TEvCloseSessionResponse);
@@ -116,6 +119,7 @@ private:
}
} else {
SessionId = resp.GetResponse().GetSessionId();
+ UserRequestContext->SessionId = SessionId;
if (RunState == ERunState::Running) {
Start();
@@ -139,10 +143,14 @@ private:
auto ev = MakeHolder<TEvKqp::TEvQueryRequest>();
ev->Record = Request;
ev->Record.MutableRequest()->SetSessionId(SessionId);
- ev->SetUserRequestContext(MakeIntrusive<TUserRequestContext>(Request.GetTraceId(), Database, SessionId, ExecutionId, Request.GetTraceId()));
+ ev->SetUserRequestContext(UserRequestContext);
+ if (ev->Record.GetRequest().GetCollectStats() >= Ydb::Table::QueryStatsCollection::STATS_COLLECTION_FULL) {
+ ev->SetProgressStatsPeriod(TDuration::MilliSeconds(QueryServiceConfig.GetProgressStatsPeriodMs()));
+ }
NActors::ActorIdToProto(SelfId(), ev->Record.MutableRequestActorId());
+ LOG_I("Start Script Execution");
SendToKqpProxy(std::move(ev));
}
@@ -376,6 +384,12 @@ private:
);
}
+ void Handle(TEvKqpExecuter::TEvExecuterProgress::TPtr& ev) {
+ Register(
+ CreateScriptProgressActor(ExecutionId, Database, ev->Get()->Record.GetQueryPlan(), "")
+ );
+ }
+
void Handle(TEvKqp::TEvQueryResponse::TPtr& ev) {
if (RunState != ERunState::Running) {
return;
@@ -582,6 +596,7 @@ private:
std::optional<TString> QueryPlan;
std::optional<TString> QueryAst;
std::optional<NKqpProto::TKqpStatsQuery> QueryStats;
+ TIntrusivePtr<TUserRequestContext> UserRequestContext;
};
} // namespace
diff --git a/ydb/core/kqp/session_actor/kqp_query_state.h b/ydb/core/kqp/session_actor/kqp_query_state.h
index 68fa21228e..60970b2588 100644
--- a/ydb/core/kqp/session_actor/kqp_query_state.h
+++ b/ydb/core/kqp/session_actor/kqp_query_state.h
@@ -439,6 +439,10 @@ public:
return RequestEv->GetCollectDiagnostics();
}
+ TDuration GetProgressStatsPeriod() {
+ return RequestEv->GetProgressStatsPeriod();
+ }
+
//// Topic ops ////
void AddOffsetsToTransaction();
bool TryMergeTopicOffsets(const NTopic::TTopicOperations &operations, TString& message);
diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
index 3855bd9258..3fd7c1e63f 100644
--- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp
+++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
@@ -754,6 +754,7 @@ public:
}
request.StatsMode = queryState->GetStatsMode();
+ request.ProgressStatsPeriod = queryState->GetProgressStatsPeriod();
}
const auto& limits = GetQueryLimits(Settings);
@@ -1142,6 +1143,28 @@ public:
ProcessExecuterResult(ev->Get());
}
+ void HandleExecute(TEvKqpExecuter::TEvExecuterProgress::TPtr& ev) {
+ if (QueryState && QueryState->RequestActorId) {
+ if (ExecuterId != ev->Sender) {
+ return;
+ }
+
+ if (QueryState->ReportStats()) {
+ if (QueryState->GetStatsMode() >= Ydb::Table::QueryStatsCollection::STATS_COLLECTION_FULL) {
+ NKqpProto::TKqpStatsQuery& stats = *ev->Get()->Record.MutableQueryStats();
+ NKqpProto::TKqpStatsQuery executionStats;
+ executionStats.Swap(&stats);
+ stats = QueryState->Stats;
+ stats.MutableExecutions()->MergeFrom(executionStats.GetExecutions());
+ ev->Get()->Record.SetQueryPlan(SerializeAnalyzePlan(stats));
+ }
+ }
+
+ LOG_D("Forwarded TEvExecuterProgress to " << QueryState->RequestActorId);
+ Send(QueryState->RequestActorId, ev->Release().Release(), 0, QueryState->ProxyRequestId);
+ }
+ }
+
std::optional<TKqpTempTablesState::TTempTableInfo> GetTemporaryTableInfo(TKqpPhyTxHolder::TConstPtr tx) {
if (!tx) {
return std::nullopt;
@@ -1965,6 +1988,7 @@ public:
// forgotten messages from previous aborted request
hFunc(TEvKqp::TEvCompileResponse, HandleNoop);
hFunc(TEvKqpExecuter::TEvTxResponse, HandleNoop);
+ hFunc(TEvKqpExecuter::TEvExecuterProgress, HandleNoop)
hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, HandleNoop);
hFunc(TEvents::TEvUndelivered, HandleNoop);
// message from KQP proxy in case of our reply just after kqp proxy timer tick
@@ -1992,6 +2016,7 @@ public:
hFunc(TEvTxUserProxy::TEvAllocateTxIdResult, Handle);
hFunc(TEvKqpExecuter::TEvTxResponse, HandleExecute);
+ hFunc(TEvKqpExecuter::TEvExecuterProgress, HandleExecute)
hFunc(TEvKqpExecuter::TEvStreamData, HandleExecute);
hFunc(TEvKqpExecuter::TEvStreamDataAck, HandleExecute);
@@ -2045,6 +2070,7 @@ public:
// always come from WorkerActor
hFunc(TEvKqp::TEvCloseSessionResponse, HandleCleanup);
hFunc(TEvKqp::TEvQueryResponse, HandleNoop);
+ hFunc(TEvKqpExecuter::TEvExecuterProgress, HandleNoop)
default:
UnexpectedEvent("CleanupState", ev);
}
diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto
index 29a08e0c98..5dc2e109b5 100644
--- a/ydb/core/protos/config.proto
+++ b/ydb/core/protos/config.proto
@@ -1040,6 +1040,7 @@ message TQueryServiceConfig {
optional bool MdbTransformHost = 10;
optional NYql.TGenericGatewayConfig Generic = 11;
optional TFinalizeScriptServiceConfig FinalizeScriptServiceConfig = 12;
+ optional uint64 ProgressStatsPeriodMs = 14 [default = 0]; // 0 = disabled
}
// Config describes immediate controls and allows
diff --git a/ydb/core/protos/console_config.proto b/ydb/core/protos/console_config.proto
index 9e48803547..61daa66137 100644
--- a/ydb/core/protos/console_config.proto
+++ b/ydb/core/protos/console_config.proto
@@ -88,6 +88,7 @@ message TConfigItem {
MemoryLogConfigItem = 19;
GRpcConfigItem = 20;
DynamicNameserviceConfigItem = 22;
+
DynamicNodeConfigItem = 24;
CmsConfigItem = 25;
FeatureFlagsItem = 26;
@@ -99,6 +100,7 @@ message TConfigItem {
ConfigsDispatcherConfigItem = 33;
TableProfilesConfigItem = 34;
KeyConfigItem = 35;
+ PDiskKeyConfigItem = 51;
NodeBrokerConfigItem = 36;
TableServiceConfigItem = 37;
SharedCacheConfigItem = 38;
@@ -111,20 +113,24 @@ message TConfigItem {
MeteringConfigItem = 45;
HiveConfigItem = 46;
DataShardConfigItem = 49;
- FederatedQueryConfigItem = 50;
- PDiskKeyConfigItem = 51;
+ FederatedQueryConfigItem = 58;
CompactionConfigItem = 52;
HttpProxyConfigItem = 53;
SchemeShardConfigItem = 54;
- ClientCertificateAuthorizationConfigItem = 55;
+ TracingConfigItem = 55;
+ FailureInjectionConfigItem = 56;
+ PublicHttpConfigItem = 57;
MetadataProviderConfigItem = 59;
BackgroundTasksConfigItem = 60;
+ AuditConfigItem = 61;
+ ClientCertificateAuthorizationItem = 62;
ExternalIndexConfigItem = 63;
YamlConfigEnabledItem = 64;
ScanConveyorConfigItem = 65;
ColumnShardConfigItem = 66;
+ LocalPgWireConfigItem = 69;
AwsCompatibilityConfigItem = 70;
- KafkaProxyConfig = 71;
+ KafkaProxyConfigItem = 71;
CompConveyorConfigItem = 72;
QueryServiceConfigItem = 73;
InsertConveyorConfigItem = 74;
diff --git a/ydb/core/protos/kqp.proto b/ydb/core/protos/kqp.proto
index 1b42944578..30e6c740aa 100644
--- a/ydb/core/protos/kqp.proto
+++ b/ydb/core/protos/kqp.proto
@@ -432,8 +432,12 @@ message TEvExecuterStreamProfile {
optional NYql.NDqProto.TDqExecutionStats Profile = 2;
};
+// 1. Executer fills progress stats from it's own execution
+// 2. Session adds stats from early finished executions and builds complete plan
message TEvExecuterProgress {
optional NActorsProto.TActorId ExecuterActorId = 1;
+ optional string QueryPlan = 2;
+ optional NKqpProto.TKqpStatsQuery QueryStats = 3;
};
message TKqpProxyNodeResources {
diff --git a/ydb/library/yql/sql/pg/pg_sql.cpp b/ydb/library/yql/sql/pg/pg_sql.cpp
index 3e8a1acb61..d2969e621d 100644
--- a/ydb/library/yql/sql/pg/pg_sql.cpp
+++ b/ydb/library/yql/sql/pg/pg_sql.cpp
@@ -2394,7 +2394,7 @@ public:
return {};
}
- const auto cluster = !schemaname.Empty() ? schemaname : Settings.DefaultCluster;
+ const auto cluster = !schemaname.Empty() && schemaname != "public" ? schemaname : Settings.DefaultCluster;
const auto sinkOrSource = BuildClusterSinkOrSourceExpression(isSink, cluster);
const auto key = BuildTableKeyExpression(relname, isScheme);
return {sinkOrSource, key};
@@ -2421,7 +2421,7 @@ public:
return {};
}
- const auto cluster = !schemaname.Empty() ? schemaname : Settings.DefaultCluster;
+ const auto cluster = !schemaname.Empty() && schemaname != "public" ? schemaname : Settings.DefaultCluster;
const auto sinkOrSource = BuildClusterSinkOrSourceExpression(true, cluster);
const auto key = BuildPgObjectExpression(objectName, pgObjectType);
return {sinkOrSource, key};
diff --git a/ydb/library/yql/sql/pg/pg_sql_ut.cpp b/ydb/library/yql/sql/pg/pg_sql_ut.cpp
index ce021378e3..fe86027f85 100644
--- a/ydb/library/yql/sql/pg/pg_sql_ut.cpp
+++ b/ydb/library/yql/sql/pg/pg_sql_ut.cpp
@@ -377,12 +377,22 @@ Y_UNIT_TEST_SUITE(PgSqlParsingOnly) {
}
Y_UNIT_TEST(DropTableUnknownClusterStmt) {
- auto res = PgSqlToYql("drop table if exists public.t");
+ auto res = PgSqlToYql("drop table if exists pub.t");
UNIT_ASSERT(!res.IsOk());
UNIT_ASSERT_EQUAL(res.Issues.Size(), 1);
auto issue = *(res.Issues.begin());
- UNIT_ASSERT_C(issue.GetMessage().find("Unknown cluster: public") != TString::npos, res.Issues.ToString());
+ UNIT_ASSERT_C(issue.GetMessage().find("Unknown cluster: pub") != TString::npos, res.Issues.ToString());
+ }
+
+ Y_UNIT_TEST(PublicSchemeRemove) {
+ auto res = PgSqlToYql("DROP TABLE IF EXISTS public.t; CREATE TABLE public.t(id INT PRIMARY KEY, foo INT);\
+INSERT INTO public.t VALUES(1, 2);\
+UPDATE public.t SET foo = 3 WHERE id == 1;\
+DELETE FROM public.t WHERE id == 1;\
+SELECT COUNT(*) FROM public.t;");
+ UNIT_ASSERT(res.IsOk());
+ UNIT_ASSERT(res.Root->ToString().find("public") == TString::npos);
}
Y_UNIT_TEST(UpdateStmt) {
diff --git a/ydb/tests/tools/fq_runner/kikimr_runner.py b/ydb/tests/tools/fq_runner/kikimr_runner.py
index a7ab18fd0b..a14fcdf4cd 100644
--- a/ydb/tests/tools/fq_runner/kikimr_runner.py
+++ b/ydb/tests/tools/fq_runner/kikimr_runner.py
@@ -600,6 +600,7 @@ class StreamingOverKikimr(object):
self.wd = yatest.common.output_path("yq_" + self.uuid)
os.mkdir(self.wd)
self.fill_config()
+ self.compute_plane.qs_config['progress_stats_period_ms'] = 1
driver_config = ydb.DriverConfig(os.getenv("YDB_ENDPOINT"), os.getenv("YDB_DATABASE"))
self.driver = ydb.Driver(driver_config)
try: