diff options
author | robot-piglet <robot-piglet@yandex-team.com> | 2023-12-05 13:21:52 +0300 |
---|---|---|
committer | robot-piglet <robot-piglet@yandex-team.com> | 2023-12-05 13:59:38 +0300 |
commit | c2c74635dbd451dfd2e13842854a412a4c43dc32 (patch) | |
tree | 75ef8263b117706d2e3dabc03011412ba3d4a305 | |
parent | e69e63ed814792594791cf49bb976c9338bab02f (diff) | |
download | ydb-c2c74635dbd451dfd2e13842854a412a4c43dc32.tar.gz |
Intermediate changes
27 files changed, 820 insertions, 61 deletions
diff --git a/contrib/python/boto3/py3/.dist-info/METADATA b/contrib/python/boto3/py3/.dist-info/METADATA index 1a0391e3b6..25169cdc26 100644 --- a/contrib/python/boto3/py3/.dist-info/METADATA +++ b/contrib/python/boto3/py3/.dist-info/METADATA @@ -1,6 +1,6 @@ Metadata-Version: 2.1 Name: boto3 -Version: 1.29.3 +Version: 1.29.4 Summary: The AWS SDK for Python Home-page: https://github.com/boto/boto3 Author: Amazon Web Services @@ -24,7 +24,7 @@ Classifier: Programming Language :: Python :: 3.12 Requires-Python: >= 3.7 License-File: LICENSE License-File: NOTICE -Requires-Dist: botocore (<1.33.0,>=1.32.3) +Requires-Dist: botocore (<1.33.0,>=1.32.4) Requires-Dist: jmespath (<2.0.0,>=0.7.1) Requires-Dist: s3transfer (<0.8.0,>=0.7.0) Provides-Extra: crt diff --git a/contrib/python/boto3/py3/boto3/__init__.py b/contrib/python/boto3/py3/boto3/__init__.py index cc10896a61..15c034de24 100644 --- a/contrib/python/boto3/py3/boto3/__init__.py +++ b/contrib/python/boto3/py3/boto3/__init__.py @@ -17,7 +17,7 @@ from boto3.compat import _warn_deprecated_python from boto3.session import Session __author__ = 'Amazon Web Services' -__version__ = '1.29.3' +__version__ = '1.29.4' # The default Boto3 session; autoloaded when needed. diff --git a/contrib/python/boto3/py3/ya.make b/contrib/python/boto3/py3/ya.make index e5448d3a86..709c02709a 100644 --- a/contrib/python/boto3/py3/ya.make +++ b/contrib/python/boto3/py3/ya.make @@ -2,7 +2,7 @@ PY3_LIBRARY() -VERSION(1.29.3) +VERSION(1.29.4) LICENSE(Apache-2.0) diff --git a/contrib/python/pyasn1/py3/.dist-info/METADATA b/contrib/python/pyasn1/py3/.dist-info/METADATA index 530fe5bf7b..1a6727cecc 100644 --- a/contrib/python/pyasn1/py3/.dist-info/METADATA +++ b/contrib/python/pyasn1/py3/.dist-info/METADATA @@ -1,6 +1,6 @@ Metadata-Version: 2.1 Name: pyasn1 -Version: 0.5.0 +Version: 0.5.1 Summary: Pure-Python implementation of ASN.1 types and DER/BER/CER codecs (X.208) Home-page: https://github.com/pyasn1/pyasn1 Author: Ilya Etingof @@ -32,6 +32,7 @@ Classifier: Programming Language :: Python :: 3.8 Classifier: Programming Language :: Python :: 3.9 Classifier: Programming Language :: Python :: 3.10 Classifier: Programming Language :: Python :: 3.11 +Classifier: Programming Language :: Python :: 3.12 Classifier: Programming Language :: Python :: Implementation :: CPython Classifier: Programming Language :: Python :: Implementation :: PyPy Classifier: Topic :: Communications diff --git a/contrib/python/pyasn1/py3/pyasn1/__init__.py b/contrib/python/pyasn1/py3/pyasn1/__init__.py index a979d291f2..73d47f3424 100644 --- a/contrib/python/pyasn1/py3/pyasn1/__init__.py +++ b/contrib/python/pyasn1/py3/pyasn1/__init__.py @@ -1,2 +1,2 @@ # https://www.python.org/dev/peps/pep-0396/ -__version__ = '0.5.0' +__version__ = '0.5.1' diff --git a/contrib/python/pyasn1/py3/pyasn1/codec/ber/decoder.py b/contrib/python/pyasn1/py3/pyasn1/codec/ber/decoder.py index 070733fd28..7cc863d1c7 100644 --- a/contrib/python/pyasn1/py3/pyasn1/codec/ber/decoder.py +++ b/contrib/python/pyasn1/py3/pyasn1/codec/ber/decoder.py @@ -4,7 +4,10 @@ # Copyright (c) 2005-2020, Ilya Etingof <etingof@gmail.com> # License: https://pyasn1.readthedocs.io/en/latest/license.html # +import io import os +import sys + from pyasn1 import debug from pyasn1 import error @@ -1762,7 +1765,14 @@ class SingleItemDecoder(object): if state is stDecodeValue: if not options.get('recursiveFlag', True) and not substrateFun: # deprecate this - substrateFun = lambda a, b, c: (a, b[:c]) + def substrateFun(asn1Object, _substrate, _length, _options): + """Legacy hack to keep the recursiveFlag=False option supported. + + The decode(..., substrateFun=userCallback) option was introduced in 0.1.4 as a generalization + of the old recursiveFlag=False option. Users should pass their callback instead of using + recursiveFlag. + """ + yield asn1Object original_position = substrate.tell() @@ -1783,9 +1793,13 @@ class SingleItemDecoder(object): yield value bytesRead = substrate.tell() - original_position - if bytesRead != length: + if not substrateFun and bytesRead != length: raise PyAsn1Error( "Read %s bytes instead of expected %s." % (bytesRead, length)) + elif substrateFun and bytesRead > length: + # custom substrateFun may be used for partial decoding, reading less is expected there + raise PyAsn1Error( + "Read %s bytes are more than expected %s." % (bytesRead, length)) if LOG: LOG('codec %s yields type %s, value:\n%s\n...' % ( @@ -1959,6 +1973,27 @@ class Decoder(object): may not be required. Most common reason for it to require is that ASN.1 structure is encoded in *IMPLICIT* tagging mode. + substrateFun: :py:class:`Union[ + Callable[[pyasn1.type.base.PyAsn1Item, bytes, int], + Tuple[pyasn1.type.base.PyAsn1Item, bytes]], + Callable[[pyasn1.type.base.PyAsn1Item, io.BytesIO, int, dict], + Generator[Union[pyasn1.type.base.PyAsn1Item, + pyasn1.error.SubstrateUnderrunError], + None, None]] + ]` + User callback meant to generalize special use cases like non-recursive or + partial decoding. A 3-arg non-streaming variant is supported for backwards + compatiblilty in addition to the newer 4-arg streaming variant. + The callback will receive the uninitialized object recovered from substrate + as 1st argument, the uninterpreted payload as 2nd argument, and the length + of the uninterpreted payload as 3rd argument. The streaming variant will + additionally receive the decode(..., **options) kwargs as 4th argument. + The non-streaming variant shall return an object that will be propagated + as decode() return value as 1st item, and the remainig payload for further + decode passes as 2nd item. + The streaming variant shall yield an object that will be propagated as + decode() return value, and leave the remaining payload in the stream. + Returns ------- : :py:class:`tuple` @@ -1997,6 +2032,31 @@ class Decoder(object): """ substrate = asSeekableStream(substrate) + if "substrateFun" in options: + origSubstrateFun = options["substrateFun"] + + def substrateFunWrapper(asn1Object, substrate, length, options=None): + """Support both 0.4 and 0.5 style APIs. + + substrateFun API has changed in 0.5 for use with streaming decoders. To stay backwards compatible, + we first try if we received a streaming user callback. If that fails,we assume we've received a + non-streaming v0.4 user callback and convert it for streaming on the fly + """ + try: + substrate_gen = origSubstrateFun(asn1Object, substrate, length, options) + except TypeError: + _type, _value, traceback = sys.exc_info() + if traceback.tb_next: + # Traceback depth > 1 means TypeError from inside user provided function + raise + # invariant maintained at Decoder.__call__ entry + assert isinstance(substrate, io.BytesIO) # nosec assert_used + substrate_gen = Decoder._callSubstrateFunV4asV5(origSubstrateFun, asn1Object, substrate, length) + for value in substrate_gen: + yield value + + options["substrateFun"] = substrateFunWrapper + streamingDecoder = cls.STREAMING_DECODER( substrate, asn1Spec, **options) @@ -2012,6 +2072,16 @@ class Decoder(object): return asn1Object, tail + @staticmethod + def _callSubstrateFunV4asV5(substrateFunV4, asn1Object, substrate, length): + substrate_bytes = substrate.read() + if length == -1: + length = len(substrate_bytes) + value, nextSubstrate = substrateFunV4(asn1Object, substrate_bytes, length) + nbytes = substrate.write(nextSubstrate) + substrate.truncate() + substrate.seek(-nbytes, os.SEEK_CUR) + yield value #: Turns BER octet stream into an ASN.1 object. #: diff --git a/contrib/python/pyasn1/py3/tests/codec/ber/test_decoder.py b/contrib/python/pyasn1/py3/tests/codec/ber/test_decoder.py index 9e238cd458..35d12d0536 100644 --- a/contrib/python/pyasn1/py3/tests/codec/ber/test_decoder.py +++ b/contrib/python/pyasn1/py3/tests/codec/ber/test_decoder.py @@ -141,12 +141,24 @@ class BitStringDecoderTestCase(BaseTestCase): substrateFun=lambda a, b, c, d: streaming.readFromStream(b, c) ) == (ints2octs((3, 2, 0, 169, 3, 2, 1, 138)), str2octs('')) + def testDefModeChunkedSubstV04(self): + assert decoder.decode( + ints2octs((35, 8, 3, 2, 0, 169, 3, 2, 1, 138)), + substrateFun=lambda a, b, c: (b, b[c:]) + ) == (ints2octs((3, 2, 0, 169, 3, 2, 1, 138)), str2octs('')) + def testIndefModeChunkedSubst(self): assert decoder.decode( ints2octs((35, 128, 3, 2, 0, 169, 3, 2, 1, 138, 0, 0)), substrateFun=lambda a, b, c, d: streaming.readFromStream(b, c) ) == (ints2octs((3, 2, 0, 169, 3, 2, 1, 138, 0, 0)), str2octs('')) + def testIndefModeChunkedSubstV04(self): + assert decoder.decode( + ints2octs((35, 128, 3, 2, 0, 169, 3, 2, 1, 138, 0, 0)), + substrateFun=lambda a, b, c: (b, b[c:]) + ) == (ints2octs((3, 2, 0, 169, 3, 2, 1, 138, 0, 0)), str2octs('')) + def testTypeChecking(self): try: decoder.decode(ints2octs((35, 4, 2, 2, 42, 42))) @@ -185,6 +197,13 @@ class OctetStringDecoderTestCase(BaseTestCase): substrateFun=lambda a, b, c, d: streaming.readFromStream(b, c) ) == (ints2octs((4, 4, 81, 117, 105, 99, 4, 4, 107, 32, 98, 114, 4, 4, 111, 119, 110, 32, 4, 3, 102, 111, 120)), str2octs('')) + def testDefModeChunkedSubstV04(self): + assert decoder.decode( + ints2octs( + (36, 23, 4, 4, 81, 117, 105, 99, 4, 4, 107, 32, 98, 114, 4, 4, 111, 119, 110, 32, 4, 3, 102, 111, 120)), + substrateFun=lambda a, b, c: (b, b[c:]) + ) == (ints2octs((4, 4, 81, 117, 105, 99, 4, 4, 107, 32, 98, 114, 4, 4, 111, 119, 110, 32, 4, 3, 102, 111, 120)), str2octs('')) + def testIndefModeChunkedSubst(self): assert decoder.decode( ints2octs((36, 128, 4, 4, 81, 117, 105, 99, 4, 4, 107, 32, 98, 114, 4, 4, 111, 119, 110, 32, 4, 3, 102, 111, @@ -193,6 +212,14 @@ class OctetStringDecoderTestCase(BaseTestCase): ) == (ints2octs( (4, 4, 81, 117, 105, 99, 4, 4, 107, 32, 98, 114, 4, 4, 111, 119, 110, 32, 4, 3, 102, 111, 120, 0, 0)), str2octs('')) + def testIndefModeChunkedSubstV04(self): + assert decoder.decode( + ints2octs((36, 128, 4, 4, 81, 117, 105, 99, 4, 4, 107, 32, 98, 114, 4, 4, 111, 119, 110, 32, 4, 3, 102, 111, + 120, 0, 0)), + substrateFun=lambda a, b, c: (b, b[c:]) + ) == (ints2octs( + (4, 4, 81, 117, 105, 99, 4, 4, 107, 32, 98, 114, 4, 4, 111, 119, 110, 32, 4, 3, 102, 111, 120, 0, 0)), str2octs('')) + class ExpTaggedOctetStringDecoderTestCase(BaseTestCase): def setUp(self): @@ -245,6 +272,12 @@ class ExpTaggedOctetStringDecoderTestCase(BaseTestCase): substrateFun=lambda a, b, c, d: streaming.readFromStream(b, c) ) == (ints2octs((4, 15, 81, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 32, 102, 111, 120)), str2octs('')) + def testDefModeSubstV04(self): + assert decoder.decode( + ints2octs((101, 17, 4, 15, 81, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 32, 102, 111, 120)), + substrateFun=lambda a, b, c: (b, b[c:]) + ) == (ints2octs((4, 15, 81, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 32, 102, 111, 120)), str2octs('')) + def testIndefModeSubst(self): assert decoder.decode( ints2octs(( @@ -254,6 +287,15 @@ class ExpTaggedOctetStringDecoderTestCase(BaseTestCase): ) == (ints2octs( (36, 128, 4, 15, 81, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 32, 102, 111, 120, 0, 0, 0, 0)), str2octs('')) + def testIndefModeSubstV04(self): + assert decoder.decode( + ints2octs(( + 101, 128, 36, 128, 4, 15, 81, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 32, 102, 111, 120, 0, + 0, 0, 0)), + substrateFun=lambda a, b, c: (b, b[c:]) + ) == (ints2octs( + (36, 128, 4, 15, 81, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 32, 102, 111, 120, 0, 0, 0, 0)), str2octs('')) + class NullDecoderTestCase(BaseTestCase): def testNull(self): @@ -680,6 +722,12 @@ class SequenceDecoderTestCase(BaseTestCase): substrateFun=lambda a, b, c, d: streaming.readFromStream(b, c) ) == (ints2octs((5, 0, 4, 11, 113, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 2, 1, 1)), str2octs('')) + def testWithOptionalAndDefaultedDefModeSubstV04(self): + assert decoder.decode( + ints2octs((48, 18, 5, 0, 4, 11, 113, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 2, 1, 1)), + substrateFun=lambda a, b, c: (b, b[c:]) + ) == (ints2octs((5, 0, 4, 11, 113, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 2, 1, 1)), str2octs('')) + def testWithOptionalAndDefaultedIndefModeSubst(self): assert decoder.decode( ints2octs((48, 128, 5, 0, 36, 128, 4, 11, 113, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 0, 0, 2, 1, 1, 0, 0)), @@ -687,6 +735,13 @@ class SequenceDecoderTestCase(BaseTestCase): ) == (ints2octs( (5, 0, 36, 128, 4, 11, 113, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 0, 0, 2, 1, 1, 0, 0)), str2octs('')) + def testWithOptionalAndDefaultedIndefModeSubstV04(self): + assert decoder.decode( + ints2octs((48, 128, 5, 0, 36, 128, 4, 11, 113, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 0, 0, 2, 1, 1, 0, 0)), + substrateFun=lambda a, b, c: (b, b[c:]) + ) == (ints2octs( + (5, 0, 36, 128, 4, 11, 113, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 0, 0, 2, 1, 1, 0, 0)), str2octs('')) + def testTagFormat(self): try: decoder.decode( @@ -1166,6 +1221,12 @@ class SetDecoderTestCase(BaseTestCase): substrateFun=lambda a, b, c, d: streaming.readFromStream(b, c) ) == (ints2octs((5, 0, 4, 11, 113, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 2, 1, 1)), str2octs('')) + def testWithOptionalAndDefaultedDefModeSubstV04(self): + assert decoder.decode( + ints2octs((49, 18, 5, 0, 4, 11, 113, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 2, 1, 1)), + substrateFun=lambda a, b, c: (b, b[c:]) + ) == (ints2octs((5, 0, 4, 11, 113, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 2, 1, 1)), str2octs('')) + def testWithOptionalAndDefaultedIndefModeSubst(self): assert decoder.decode( ints2octs((49, 128, 5, 0, 36, 128, 4, 11, 113, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 0, 0, 2, 1, 1, 0, 0)), @@ -1173,6 +1234,13 @@ class SetDecoderTestCase(BaseTestCase): ) == (ints2octs( (5, 0, 36, 128, 4, 11, 113, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 0, 0, 2, 1, 1, 0, 0)), str2octs('')) + def testWithOptionalAndDefaultedIndefModeSubstV04(self): + assert decoder.decode( + ints2octs((49, 128, 5, 0, 36, 128, 4, 11, 113, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 0, 0, 2, 1, 1, 0, 0)), + substrateFun=lambda a, b, c: (b, b[c:]) + ) == (ints2octs( + (5, 0, 36, 128, 4, 11, 113, 117, 105, 99, 107, 32, 98, 114, 111, 119, 110, 0, 0, 2, 1, 1, 0, 0)), str2octs('')) + def testTagFormat(self): try: decoder.decode( @@ -1498,6 +1566,13 @@ class AnyDecoderTestCase(BaseTestCase): substrateFun=lambda a, b, c, d: streaming.readFromStream(b, c) ) == (ints2octs((4, 3, 102, 111, 120)), str2octs('')) + def testByUntaggedSubstV04(self): + assert decoder.decode( + ints2octs((4, 3, 102, 111, 120)), + asn1Spec=self.s, + substrateFun=lambda a, b, c: (b, b[c:]) + ) == (ints2octs((4, 3, 102, 111, 120)), str2octs('')) + def testTaggedExSubst(self): assert decoder.decode( ints2octs((164, 5, 4, 3, 102, 111, 120)), @@ -1505,6 +1580,13 @@ class AnyDecoderTestCase(BaseTestCase): substrateFun=lambda a, b, c, d: streaming.readFromStream(b, c) ) == (ints2octs((164, 5, 4, 3, 102, 111, 120)), str2octs('')) + def testTaggedExSubstV04(self): + assert decoder.decode( + ints2octs((164, 5, 4, 3, 102, 111, 120)), + asn1Spec=self.s, + substrateFun=lambda a, b, c: (b, b[c:]) + ) == (ints2octs((164, 5, 4, 3, 102, 111, 120)), str2octs('')) + class EndOfOctetsTestCase(BaseTestCase): def testUnexpectedEoo(self): @@ -1841,6 +1923,50 @@ class CompressedFilesTestCase(BaseTestCase): os.remove(path) +class NonStreamingCompatibilityTestCase(BaseTestCase): + def setUp(self): + from pyasn1 import debug + BaseTestCase.setUp(self) + debug.setLogger(None) # undo logger setup from BaseTestCase to work around unrelated issue + + def testPartialDecodeWithCustomSubstrateFun(self): + snmp_req_substrate = ints2octs(( + 0x30, 0x22, 0x02, 0x01, 0x01, 0x04, 0x06, 0x70, 0x75, 0x62, 0x6c, 0x69, 0x63, 0xa0, 0x15, 0x02, 0x04, 0x69, + 0x30, 0xdb, 0xeb, 0x02, 0x01, 0x00, 0x02, 0x01, 0x00, 0x30, 0x07, 0x30, 0x05, 0x06, 0x01, 0x01, 0x05, 0x00)) + seq, next_substrate = decoder.decode( + snmp_req_substrate, asn1Spec=univ.Sequence(), + recursiveFlag=False, substrateFun=lambda a, b, c: (a, b[:c]) + ) + assert seq.isSameTypeWith(univ.Sequence) + assert next_substrate == snmp_req_substrate[2:] + version, next_substrate = decoder.decode( + next_substrate, asn1Spec=univ.Integer(), recursiveFlag=False, + substrateFun=lambda a, b, c: (a, b[:c]) + ) + assert version == 1 + + def testPartialDecodeWithDefaultSubstrateFun(self): + substrate = ints2octs(( + 0x04, 0x0e, 0x30, 0x0c, 0x06, 0x0a, 0x2b, 0x06, 0x01, 0x04, 0x01, 0x82, 0x37, 0x3c, 0x03, 0x02 + )) + result, rest = decoder.decode(substrate, recursiveFlag=False) + assert result.isSameTypeWith(univ.OctetString) + assert rest == substrate[2:] + + def testPropagateUserException(self): + substrate = io.BytesIO(ints2octs((0x04, 0x00))) + + def userSubstrateFun(_asn1Object, _substrate, _length, _options): + raise TypeError("error inside user function") + + try: + decoder.decode(substrate, asn1Spec=univ.OctetString, substrateFun=userSubstrateFun) + except TypeError as exc: + assert str(exc) == "error inside user function" + else: + raise AssertionError("decode() must not hide TypeError from inside user provided callback") + + suite = unittest.TestLoader().loadTestsFromModule(sys.modules[__name__]) if __name__ == '__main__': diff --git a/contrib/python/pyasn1/py3/ya.make b/contrib/python/pyasn1/py3/ya.make index 772312ad0e..4e59f746fa 100644 --- a/contrib/python/pyasn1/py3/ya.make +++ b/contrib/python/pyasn1/py3/ya.make @@ -2,7 +2,7 @@ PY3_LIBRARY() -VERSION(0.5.0) +VERSION(0.5.1) LICENSE(BSD-3-Clause) diff --git a/ydb/core/driver_lib/cli_utils/cli_cmds_server.cpp b/ydb/core/driver_lib/cli_utils/cli_cmds_server.cpp index b466476a18..7496bf45f3 100644 --- a/ydb/core/driver_lib/cli_utils/cli_cmds_server.cpp +++ b/ydb/core/driver_lib/cli_utils/cli_cmds_server.cpp @@ -325,14 +325,14 @@ protected: } void AddLabelToAppConfig(const TString& name, const TString& value) { - for (auto &label : *RunConfig.AppConfig.MutableLabels()) { + for (auto &label : *AppConfig.MutableLabels()) { if (label.GetName() == name) { label.SetValue(value); return; } } - auto *label = RunConfig.AppConfig.AddLabels(); + auto *label = AppConfig.AddLabels(); label->SetName(name); label->SetValue(value); } @@ -414,7 +414,7 @@ protected: RunConfig.Labels["dynamic"] = ToString(NodeBrokerAddresses.empty() ? "false" : "true"); for (const auto& [name, value] : RunConfig.Labels) { - auto *label = RunConfig.AppConfig.AddLabels(); + auto *label = AppConfig.AddLabels(); label->SetName(name); label->SetValue(value); } @@ -594,7 +594,7 @@ protected: if (!AppConfig.HasRestartsCountConfig() && RestartsCountFile) AppConfig.MutableRestartsCountConfig()->SetRestartsCountFile(RestartsCountFile); - // Ports and node type are always applied (event if config was loaded from CMS). + // Ports and node type are always applied (even if config was loaded from CMS). if (MonitoringPort) AppConfig.MutableMonitoringConfig()->SetMonitoringPort(MonitoringPort); if (MonitoringAddress) @@ -608,7 +608,7 @@ protected: } } if (SqsHttpPort) - RunConfig.AppConfig.MutableSqsConfig()->MutableHttpServerConfig()->SetPort(SqsHttpPort); + AppConfig.MutableSqsConfig()->MutableHttpServerConfig()->SetPort(SqsHttpPort); if (GRpcPort) { auto& conf = *AppConfig.MutableGRpcConfig(); conf.SetStartGRpcProxy(true); @@ -742,13 +742,13 @@ protected: messageBusConfig->SetTracePath(TracePath); } - if (RunConfig.AppConfig.HasDynamicNameserviceConfig()) { - bool isDynamic = RunConfig.NodeId > RunConfig.AppConfig.GetDynamicNameserviceConfig().GetMaxStaticNodeId(); + if (AppConfig.HasDynamicNameserviceConfig()) { + bool isDynamic = RunConfig.NodeId > AppConfig.GetDynamicNameserviceConfig().GetMaxStaticNodeId(); RunConfig.Labels["dynamic"] = ToString(isDynamic ? "true" : "false"); AddLabelToAppConfig("node_id", RunConfig.Labels["node_id"]); } - RunConfig.ClusterName = RunConfig.AppConfig.GetNameserviceConfig().GetClusterUUID(); + RunConfig.ClusterName = AppConfig.GetNameserviceConfig().GetClusterUUID(); } inline bool LoadConfigFromCMS() { @@ -936,24 +936,6 @@ protected: } } - void MaybeRegisterAndLoadConfigs() - { - // static node - if (NodeBrokerAddresses.empty() && !NodeBrokerPort) { - if (!NodeId) { - ythrow yexception() << "Either --node [NUM|'static'] or --node-broker[-port] should be specified"; - } - - if (!HierarchicalCfg && RunConfig.PathToConfigCacheFile) - LoadCachedConfigsForStaticNode(); - return; - } - - RegisterDynamicNode(); - if (!HierarchicalCfg && !IgnoreCmsConfigs) - LoadConfigForDynamicNode(); - } - TNodeLocation CreateNodeLocation() { NActorsInterconnect::TNodeLocation location; location.SetDataCenter(DataCenter); @@ -1055,7 +1037,7 @@ protected: } } else { Y_ABORT_UNLESS(NodeBrokerPort); - for (auto &node : RunConfig.AppConfig.MutableNameserviceConfig()->GetNode()) { + for (auto &node : AppConfig.MutableNameserviceConfig()->GetNode()) { addrs.emplace_back(TStringBuilder() << (NodeBrokerUseTls ? "grpcs://" : "") << node.GetHost() << ':' << NodeBrokerPort); } } @@ -1111,10 +1093,10 @@ protected: } RunConfig.ScopeId = TKikimrScopeId(scopeId); - auto &nsConfig = *RunConfig.AppConfig.MutableNameserviceConfig(); + auto &nsConfig = *AppConfig.MutableNameserviceConfig(); nsConfig.ClearNode(); - auto &dnConfig = *RunConfig.AppConfig.MutableDynamicNodeConfig(); + auto &dnConfig = *AppConfig.MutableDynamicNodeConfig(); for (auto &node : result.GetNodes()) { if (node.NodeId == result.GetNodeId()) { auto nodeInfo = dnConfig.MutableNodeInfo(); @@ -1223,10 +1205,10 @@ protected: RunConfig.NodeId = result->GetNodeId(); RunConfig.ScopeId = TKikimrScopeId(result->GetScopeId()); - auto &nsConfig = *RunConfig.AppConfig.MutableNameserviceConfig(); + auto &nsConfig = *AppConfig.MutableNameserviceConfig(); nsConfig.ClearNode(); - auto &dnConfig = *RunConfig.AppConfig.MutableDynamicNodeConfig(); + auto &dnConfig = *AppConfig.MutableDynamicNodeConfig(); for (auto &node : result->Record().GetNodes()) { if (node.GetNodeId() == result->GetNodeId()) { dnConfig.MutableNodeInfo()->CopyFrom(node); diff --git a/ydb/core/fq/libs/compute/ydb/status_tracker_actor.cpp b/ydb/core/fq/libs/compute/ydb/status_tracker_actor.cpp index 659ae14b6e..ef628906ce 100644 --- a/ydb/core/fq/libs/compute/ydb/status_tracker_actor.cpp +++ b/ydb/core/fq/libs/compute/ydb/status_tracker_actor.cpp @@ -96,6 +96,11 @@ public: void Handle(const TEvents::TEvForwardPingResponse::TPtr& ev) { auto pingCounters = Counters.GetCounters(ERequestType::RT_PING); pingCounters->InFly->Dec(); + + if (ev->Cookie) { + return; + } + pingCounters->LatencyMs->Collect((TInstant::Now() - StartTime).MilliSeconds()); if (ev.Get()->Get()->Success) { pingCounters->Ok->Inc(); @@ -125,6 +130,8 @@ public: case NYdb::NQuery::EExecStatus::Unspecified: case NYdb::NQuery::EExecStatus::Starting: SendGetOperation(TDuration::MilliSeconds(BackoffTimer.NextBackoffMs())); + QueryStats = response.QueryStats; + UpdateProgress(); break; case NYdb::NQuery::EExecStatus::Aborted: case NYdb::NQuery::EExecStatus::Canceled: @@ -150,6 +157,19 @@ public: Register(new TRetryActor<TEvYdbCompute::TEvGetOperationRequest, TEvYdbCompute::TEvGetOperationResponse, NYdb::TOperation::TOperationId>(Counters.GetCounters(ERequestType::RT_GET_OPERATION), delay, SelfId(), Connector, OperationId)); } + void UpdateProgress() { + auto pingCounters = Counters.GetCounters(ERequestType::RT_PING); + pingCounters->InFly->Inc(); + Fq::Private::PingTaskRequest pingTaskRequest; + PrepareAstAndPlan(pingTaskRequest, QueryStats.query_plan(), QueryStats.query_ast()); + try { + pingTaskRequest.set_statistics(GetV1StatFromV2Plan(QueryStats.query_plan())); + } catch(const NJson::TJsonException& ex) { + LOG_E("Error statistics conversion: " << ex.what()); + } + Send(Pinger, new TEvents::TEvForwardPingRequest(pingTaskRequest), 0, 1); + } + void Failed() { LOG_I("Execution status: Failed, Status: " << Status << ", StatusCode: " << NYql::NDqProto::StatusIds::StatusCode_Name(StatusCode) << " Issues: " << Issues.ToOneLineString()); auto pingCounters = Counters.GetCounters(ERequestType::RT_PING); @@ -158,6 +178,11 @@ public: NYql::IssuesToMessage(Issues, pingTaskRequest.mutable_issues()); pingTaskRequest.set_pending_status_code(StatusCode); PrepareAstAndPlan(pingTaskRequest, QueryStats.query_plan(), QueryStats.query_ast()); + try { + pingTaskRequest.set_statistics(GetV1StatFromV2Plan(QueryStats.query_plan())); + } catch(const NJson::TJsonException& ex) { + LOG_E("Error statistics conversion: " << ex.what()); + } Send(Pinger, new TEvents::TEvForwardPingRequest(pingTaskRequest)); } diff --git a/ydb/core/kqp/common/events/query.h b/ydb/core/kqp/common/events/query.h index d365551e19..2f269d85f3 100644 --- a/ydb/core/kqp/common/events/query.h +++ b/ydb/core/kqp/common/events/query.h @@ -274,6 +274,14 @@ public: return UserRequestContext; } + void SetProgressStatsPeriod(TDuration progressStatsPeriod) { + ProgressStatsPeriod = progressStatsPeriod; + } + + TDuration GetProgressStatsPeriod() const { + return ProgressStatsPeriod; + } + mutable NKikimrKqp::TEvQueryRequest Record; private: @@ -301,6 +309,7 @@ private: TDuration CancelAfter; const ::Ydb::Query::Syntax Syntax = Ydb::Query::Syntax::SYNTAX_UNSPECIFIED; TIntrusivePtr<TUserRequestContext> UserRequestContext; + TDuration ProgressStatsPeriod; }; struct TEvDataQueryStreamPart: public TEventPB<TEvDataQueryStreamPart, diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h index a3fb572a8b..aeec263682 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h @@ -23,6 +23,7 @@ #include <ydb/core/kqp/common/kqp_yql.h> #include <ydb/core/kqp/common/kqp_user_request_context.h> #include <ydb/core/kqp/federated_query/kqp_federated_query_actors.h> +#include <ydb/core/kqp/opt/kqp_query_plan.h> #include <ydb/core/grpc_services/local_rate_limiter.h> #include <ydb/services/metadata/secret/fetcher.h> @@ -243,6 +244,23 @@ protected: << ", state: " << NYql::NDqProto::EComputeState_Name((NYql::NDqProto::EComputeState) state.GetState()) << ", stats: " << state.GetStats()); + if (Stats && state.HasStats() && Request.ProgressStatsPeriod) { + Stats->UpdateTaskStats(taskId, state.GetStats()); + auto now = TInstant::Now(); + if (LastProgressStats + Request.ProgressStatsPeriod <= now) { + auto progress = MakeHolder<TEvKqpExecuter::TEvExecuterProgress>(); + auto& execStats = *progress->Record.MutableQueryStats()->AddExecutions(); + Stats->ExportExecStats(execStats); + for (ui32 txId = 0; txId < Request.Transactions.size(); ++txId) { + const auto& tx = Request.Transactions[txId].Body; + auto planWithStats = AddExecStatsToTxPlan(tx->GetPlan(), execStats); + execStats.AddTxPlansWithStats(planWithStats); + } + this->Send(Target, progress.Release()); + LastProgressStats = now; + } + } + switch (state.GetState()) { case NYql::NDqProto::COMPUTE_STATE_UNKNOWN: { YQL_ENSURE(false, "unexpected state from " << computeActor << ", task: " << taskId); @@ -1670,6 +1688,7 @@ protected: const TIntrusiveConstPtr<NACLib::TUserToken> UserToken; TKqpRequestCounters::TPtr Counters; std::shared_ptr<TQueryExecutionStats> Stats; + TInstant LastProgressStats; TInstant StartTime; TMaybe<TInstant> Deadline; TMaybe<TInstant> CancelAt; diff --git a/ydb/core/kqp/executer_actor/kqp_executer_stats.cpp b/ydb/core/kqp/executer_actor/kqp_executer_stats.cpp index 584ea11e4f..55393ba34d 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_stats.cpp +++ b/ydb/core/kqp/executer_actor/kqp_executer_stats.cpp @@ -6,6 +6,154 @@ namespace NKikimr::NKqp { using namespace NYql; using namespace NYql::NDq; +void TAsyncStats::Resize(ui32 taskCount) { + Bytes.resize(taskCount); + Rows.resize(taskCount); + Chunks.resize(taskCount); + Splits.resize(taskCount); + FirstMessageMs.resize(taskCount); + PauseMessageMs.resize(taskCount); + ResumeMessageMs.resize(taskCount); + LastMessageMs.resize(taskCount); + WaitTimeUs.resize(taskCount); + WaitPeriods.resize(taskCount); + ActiveTimeUs.resize(taskCount); +} + +void TAsyncBufferStats::Resize(ui32 taskCount) { + Ingress.Resize(taskCount); + Push.Resize(taskCount); + Pop.Resize(taskCount); + Egress.Resize(taskCount); +} + +void TTableStats::Resize(ui32 taskCount) { + ReadRows.resize(taskCount); + ReadBytes.resize(taskCount); + WriteRows.resize(taskCount); + WriteBytes.resize(taskCount); + EraseRows.resize(taskCount); + EraseBytes.resize(taskCount); + AffectedPartitions.resize(taskCount); +} + +void TStageExecutionStats::Resize(ui32 taskCount) { + CpuTimeUs.resize(taskCount); + SourceCpuTimeUs.resize(taskCount); + + InputRows.resize(taskCount); + InputBytes.resize(taskCount); + OutputRows.resize(taskCount); + OutputBytes.resize(taskCount); + + FirstRowTimeMs.resize(taskCount); + FinishTimeMs.resize(taskCount); + StartTimeMs.resize(taskCount); + + for (auto& p : Ingress) p.second.Resize(taskCount); + for (auto& p : Egress) p.second.Resize(taskCount); + for (auto& p : Input) p.second.Resize(taskCount); + for (auto& p : Output) p.second.Resize(taskCount); + + MaxMemoryUsage.resize(taskCount); +} + +void TStageExecutionStats::UpdateAsyncStats(i32 index, TAsyncStats& aggrAsyncStats, const NYql::NDqProto::TDqAsyncBufferStats& asyncStats) { + aggrAsyncStats.Bytes[index] = asyncStats.GetBytes(); + aggrAsyncStats.Rows[index] = asyncStats.GetRows(); + aggrAsyncStats.Chunks[index] = asyncStats.GetChunks(); + aggrAsyncStats.Splits[index] = asyncStats.GetSplits(); + + auto firstMessageMs = asyncStats.GetFirstMessageMs(); + aggrAsyncStats.FirstMessageMs[index] = firstMessageMs; + aggrAsyncStats.PauseMessageMs[index] = asyncStats.GetPauseMessageMs(); + aggrAsyncStats.ResumeMessageMs[index] = asyncStats.GetResumeMessageMs(); + auto lastMessageMs = asyncStats.GetLastMessageMs(); + aggrAsyncStats.LastMessageMs[index] = lastMessageMs; + aggrAsyncStats.WaitTimeUs[index] = asyncStats.GetWaitTimeUs(); + aggrAsyncStats.WaitPeriods[index] = asyncStats.GetWaitPeriods(); + if (firstMessageMs && lastMessageMs > firstMessageMs) { + aggrAsyncStats.ActiveTimeUs[index] = lastMessageMs - firstMessageMs; + } +} + +void TStageExecutionStats::UpdateStats(const NYql::NDqProto::TDqTaskStats& taskStats, ui64 maxMemoryUsage) { + auto taskId = taskStats.GetTaskId(); + auto it = Task2Index.find(taskId); + + ui32 taskCount = Task2Index.size(); + + ui32 index; + if (it == Task2Index.end()) { + index = taskCount++; + Task2Index.emplace(taskId, index); + Resize(taskCount); + } else { + index = it->second; + } + + CpuTimeUs[index] = taskStats.GetCpuTimeUs(); + SourceCpuTimeUs[index] = taskStats.GetSourceCpuTimeUs(); + + InputRows[index] = taskStats.GetInputRows(); + InputBytes[index] = taskStats.GetInputBytes(); + OutputRows[index] = taskStats.GetOutputRows(); + OutputBytes[index] = taskStats.GetOutputBytes(); + + StartTimeMs[index] = taskStats.GetStartTimeMs(); // to be reviewed + FirstRowTimeMs[index] = taskStats.GetFirstRowTimeMs(); // to be reviewed + FinishTimeMs[index] = taskStats.GetFinishTimeMs(); // to be reviewed + + for (auto& tableStat : taskStats.GetTables()) { + auto tablePath = tableStat.GetTablePath(); + auto [it, inserted] = Tables.try_emplace(tablePath, taskCount); + auto& aggrTableStats = it->second; + aggrTableStats.ReadRows[index] = tableStat.GetReadRows(); + aggrTableStats.ReadBytes[index] = tableStat.GetReadBytes(); + aggrTableStats.WriteRows[index] = tableStat.GetWriteRows(); + aggrTableStats.WriteBytes[index] = tableStat.GetWriteBytes(); + aggrTableStats.EraseRows[index] = tableStat.GetEraseRows(); + aggrTableStats.EraseBytes[index] = tableStat.GetEraseBytes(); + aggrTableStats.AffectedPartitions[index] = tableStat.GetAffectedPartitions(); + } + + for (auto& sourceStat : taskStats.GetSources()) { + auto ingressName = sourceStat.GetIngressName(); + auto [it, inserted] = Ingress.try_emplace(ingressName, taskCount); + auto& asyncBufferStats = it->second; + UpdateAsyncStats(index, asyncBufferStats.Ingress, sourceStat.GetIngress()); + UpdateAsyncStats(index, asyncBufferStats.Push, sourceStat.GetPush()); + UpdateAsyncStats(index, asyncBufferStats.Pop, sourceStat.GetPop()); + } + + for (auto& inputChannelStat : taskStats.GetInputChannels()) { + auto stageId = inputChannelStat.GetSrcStageId(); + auto [it, inserted] = Input.try_emplace(stageId, taskCount); + auto& asyncBufferStats = it->second; + UpdateAsyncStats(index, asyncBufferStats.Push, inputChannelStat.GetPush()); + UpdateAsyncStats(index, asyncBufferStats.Pop, inputChannelStat.GetPop()); + } + + for (auto& outputChannelStat : taskStats.GetOutputChannels()) { + auto stageId = outputChannelStat.GetDstStageId(); + auto [it, inserted] = Output.try_emplace(stageId, taskCount); + auto& asyncBufferStats = it->second; + UpdateAsyncStats(index, asyncBufferStats.Push, outputChannelStat.GetPush()); + UpdateAsyncStats(index, asyncBufferStats.Pop, outputChannelStat.GetPop()); + } + + for (auto& sinkStat : taskStats.GetSinks()) { + auto egressName = sinkStat.GetEgressName(); + auto [it, inserted] = Egress.try_emplace(egressName, taskCount); + auto& asyncBufferStats = it->second; + UpdateAsyncStats(index, asyncBufferStats.Push, sinkStat.GetPush()); + UpdateAsyncStats(index, asyncBufferStats.Pop, sinkStat.GetPop()); + UpdateAsyncStats(index, asyncBufferStats.Ingress, sinkStat.GetEgress()); + } + + MaxMemoryUsage[index] = maxMemoryUsage; +} + namespace { TTableStat operator - (const TTableStat& l, const TTableStat& r) { @@ -82,11 +230,9 @@ void UpdateMinMax(NDqProto::TDqStatsMinMax* minMax, ui64 value) noexcept { minMax->SetMax(std::max(minMax->GetMax(), value)); } -NDqProto::TDqStageStats* GetOrCreateStageStats(const NYql::NDqProto::TDqTaskStats& taskStats, +NDqProto::TDqStageStats* GetOrCreateStageStats(const NYql::NDq::TStageId& stageId, const TKqpTasksGraph& tasksGraph, NDqProto::TDqExecutionStats& execStats) { - auto& task = tasksGraph.GetTask(taskStats.GetTaskId()); - auto& stageId = task.StageId; auto& stageInfo = tasksGraph.GetStageInfo(stageId); auto& stageProto = stageInfo.Meta.Tx.Body->GetStages(stageId.StageId); @@ -103,6 +249,13 @@ NDqProto::TDqStageStats* GetOrCreateStageStats(const NYql::NDqProto::TDqTaskStat return newStage; } +NDqProto::TDqStageStats* GetOrCreateStageStats(const NYql::NDqProto::TDqTaskStats& taskStats, + const TKqpTasksGraph& tasksGraph, NDqProto::TDqExecutionStats& execStats) +{ + auto& task = tasksGraph.GetTask(taskStats.GetTaskId()); + return GetOrCreateStageStats(task.StageId, tasksGraph, execStats); +} + NDqProto::TDqTableAggrStats* GetOrCreateTableAggrStats(NDqProto::TDqStageStats* stage, const TString& tablePath) { for(auto& table : *stage->MutableTables()) { if (table.GetTablePath() == tablePath) { @@ -411,6 +564,144 @@ void TQueryExecutionStats::AddDatashardStats(NYql::NDqProto::TDqComputeActorStat } } +void TQueryExecutionStats::UpdateTaskStats(ui64 taskId, const NYql::NDqProto::TDqComputeActorStats& stats) { + Y_ASSERT(stats.GetTasks().size() == 1); + const NYql::NDqProto::TDqTaskStats& taskStats = stats.GetTasks(0); + Y_ASSERT(taskStats.GetTaskId() == taskId); + auto stageId = taskStats.GetStageId(); + auto [it, inserted] = StageStats.try_emplace(stageId); + if (inserted) { + it->second.StageId = TasksGraph->GetTask(taskStats.GetTaskId()).StageId; + } + it->second.UpdateStats(taskStats, stats.GetMaxMemoryUsage()); +} + +void ExportAggStats(std::vector<ui64>& data, NYql::NDqProto::TDqStatsMinMax& stats) { + ui64 count = 0; + ui64 min = 0; + ui64 max = 0; + for (auto d : data) { + if (d) { + if (count) { + if (min > d) min = d; + if (max < d) max = d; + } else { + min = max = d; + } + count++; + } + } + if (count) { + stats.SetMin(min); + stats.SetMax(max); + } +} + +void ExportAggStats(std::vector<ui64>& data, NYql::NDqProto::TDqStatsAggr& stats) { + ui64 count = 0; + ui64 min = 0; + ui64 max = 0; + ui64 sum = 0; + for (auto d : data) { + if (d) { + if (count) { + if (min > d) min = d; + if (max < d) max = d; + } else { + min = max = d; + } + sum += d; + count++; + } + } + if (count) { + stats.SetMin(min); + stats.SetMax(max); + stats.SetSum(sum); + stats.SetCnt(count); + } +} + +ui64 ExportAggStats(std::vector<ui64>& data) { + ui64 sum = 0; + for (auto d : data) { + sum += d; + } + return sum; +} + +void ExportAggAsyncStats(TAsyncStats& data, NYql::NDqProto::TDqAsyncStatsAggr& stats) { + ExportAggStats(data.Bytes, *stats.MutableBytes()); + ExportAggStats(data.Rows, *stats.MutableRows()); + ExportAggStats(data.Chunks, *stats.MutableChunks()); + ExportAggStats(data.Splits, *stats.MutableSplits()); + ExportAggStats(data.FirstMessageMs, *stats.MutableFirstMessageMs()); + ExportAggStats(data.PauseMessageMs, *stats.MutablePauseMessageMs()); + ExportAggStats(data.ResumeMessageMs, *stats.MutableResumeMessageMs()); + ExportAggStats(data.LastMessageMs, *stats.MutableLastMessageMs()); + ExportAggStats(data.WaitTimeUs, *stats.MutableWaitTimeUs()); + ExportAggStats(data.WaitPeriods, *stats.MutableWaitPeriods()); + ExportAggStats(data.ActiveTimeUs, *stats.MutableActiveTimeUs()); +} + +void ExportAggAsyncBufferStats(TAsyncBufferStats& data, NYql::NDqProto::TDqAsyncBufferStatsAggr& stats) { + ExportAggAsyncStats(data.Ingress, *stats.MutableIngress()); + ExportAggAsyncStats(data.Push, *stats.MutablePush()); + ExportAggAsyncStats(data.Pop, *stats.MutablePop()); + ExportAggAsyncStats(data.Egress, *stats.MutableEgress()); +} + +void TQueryExecutionStats::ExportExecStats(NYql::NDqProto::TDqExecutionStats& stats) { + + THashMap<ui32, NDqProto::TDqStageStats*> protoStages; + for (auto& [stageId, stagetype] : TasksGraph->GetStagesInfo()) { + protoStages.emplace(stageId.StageId, GetOrCreateStageStats(stageId, *TasksGraph, stats)); + } + + for (auto& p : StageStats) { + auto& stageStats = *protoStages[p.second.StageId.StageId]; + stageStats.SetTotalTasksCount(p.second.Task2Index.size()); + + ExportAggStats(p.second.CpuTimeUs, *stageStats.MutableCpuTimeUs()); + ExportAggStats(p.second.SourceCpuTimeUs, *stageStats.MutableSourceCpuTimeUs()); + + ExportAggStats(p.second.InputRows, *stageStats.MutableInputRows()); + ExportAggStats(p.second.InputBytes, *stageStats.MutableInputBytes()); + ExportAggStats(p.second.OutputRows, *stageStats.MutableOutputRows()); + ExportAggStats(p.second.OutputBytes, *stageStats.MutableOutputBytes()); + + ExportAggStats(p.second.StartTimeMs, *stageStats.MutableStartTimeMs()); // to be reviewed + ExportAggStats(p.second.FirstRowTimeMs, *stageStats.MutableFirstRowTimeMs()); // to be reviewed + ExportAggStats(p.second.FinishTimeMs, *stageStats.MutableFinishTimeMs()); // to be reviewed + + stageStats.SetDurationUs((stageStats.GetFinishTimeMs().GetMax() - stageStats.GetStartTimeMs().GetMin()) * 1'000); + + for (auto& p2 : p.second.Tables) { + auto& table = *stageStats.AddTables(); + table.SetTablePath(p2.first); + ExportAggStats(p2.second.ReadRows, *table.MutableReadRows()); + ExportAggStats(p2.second.ReadBytes, *table.MutableReadBytes()); + ExportAggStats(p2.second.WriteRows, *table.MutableWriteRows()); + ExportAggStats(p2.second.WriteBytes, *table.MutableWriteBytes()); + ExportAggStats(p2.second.EraseRows, *table.MutableEraseRows()); + ExportAggStats(p2.second.EraseBytes, *table.MutableEraseBytes()); + table.SetAffectedPartitions(ExportAggStats(p2.second.AffectedPartitions)); + } + for (auto& p2 : p.second.Ingress) { + ExportAggAsyncBufferStats(p2.second, (*stageStats.MutableIngress())[p2.first]); + } + for (auto& p2 : p.second.Input) { + ExportAggAsyncBufferStats(p2.second, (*stageStats.MutableInput())[p2.first]); + } + for (auto& p2 : p.second.Output) { + ExportAggAsyncBufferStats(p2.second, (*stageStats.MutableOutput())[p2.first]); + } + for (auto& p2 : p.second.Egress) { + ExportAggAsyncBufferStats(p2.second, (*stageStats.MutableEgress())[p2.first]); + } + } +} + void TQueryExecutionStats::Finish() { // Cerr << (TStringBuilder() << "-- finish: executerTime: " << ExecuterCpuTime.MicroSeconds() << Endl); diff --git a/ydb/core/kqp/executer_actor/kqp_executer_stats.h b/ydb/core/kqp/executer_actor/kqp_executer_stats.h index 63348022b1..14f1de9b8e 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_stats.h +++ b/ydb/core/kqp/executer_actor/kqp_executer_stats.h @@ -12,10 +12,94 @@ NYql::NDqProto::EDqStatsMode GetDqStatsModeShard(Ydb::Table::QueryStatsCollectio bool CollectFullStats(Ydb::Table::QueryStatsCollection::Mode statsMode); bool CollectProfileStats(Ydb::Table::QueryStatsCollection::Mode statsMode); +struct TAsyncStats { + // Data + std::vector<ui64> Bytes; + std::vector<ui64> Rows; + std::vector<ui64> Chunks; + std::vector<ui64> Splits; + // Time + std::vector<ui64> FirstMessageMs; + std::vector<ui64> PauseMessageMs; + std::vector<ui64> ResumeMessageMs; + std::vector<ui64> LastMessageMs; + std::vector<ui64> WaitTimeUs; + std::vector<ui64> WaitPeriods; + std::vector<ui64> ActiveTimeUs; + + void Resize(ui32 taskCount); +}; + +struct TAsyncBufferStats { + + TAsyncBufferStats() = default; + TAsyncBufferStats(ui32 taskCount) { + Resize(taskCount); + } + + TAsyncStats Ingress; + TAsyncStats Push; + TAsyncStats Pop; + TAsyncStats Egress; + + void Resize(ui32 taskCount); +}; + +struct TTableStats { + + TTableStats() = default; + TTableStats(ui32 taskCount) { + Resize(taskCount); + } + + std::vector<ui64> ReadRows; + std::vector<ui64> ReadBytes; + std::vector<ui64> WriteRows; + std::vector<ui64> WriteBytes; + std::vector<ui64> EraseRows; + std::vector<ui64> EraseBytes; + + std::vector<ui64> AffectedPartitions; + + void Resize(ui32 taskCount); +}; + +struct TStageExecutionStats { + + NYql::NDq::TStageId StageId; + + std::map<ui32, ui32> Task2Index; + + std::vector<ui64> CpuTimeUs; + std::vector<ui64> SourceCpuTimeUs; + + std::vector<ui64> InputRows; + std::vector<ui64> InputBytes; + std::vector<ui64> OutputRows; + std::vector<ui64> OutputBytes; + + std::vector<ui64> FirstRowTimeMs; + std::vector<ui64> FinishTimeMs; + std::vector<ui64> StartTimeMs; + + std::map<TString, TTableStats> Tables; + std::map<TString, TAsyncBufferStats> Ingress; + std::map<TString, TAsyncBufferStats> Egress; + std::map<ui32, TAsyncBufferStats> Input; + std::map<ui32, TAsyncBufferStats> Output; + + std::vector<ui64> MaxMemoryUsage; + + void Resize(ui32 taskCount); + void UpdateAsyncStats(i32 index, TAsyncStats& aggrAsyncStats, const NYql::NDqProto::TDqAsyncBufferStats& asyncStats); + void UpdateStats(const NYql::NDqProto::TDqTaskStats& taskStats, ui64 maxMemoryUsage); +}; + struct TQueryExecutionStats { private: std::map<ui32, std::map<ui32, ui32>> ShardsCountByNode; std::map<ui32, bool> UseLlvmByStageId; + std::map<ui32, TStageExecutionStats> StageStats; public: const Ydb::Table::QueryStatsCollection::Mode StatsMode; const TKqpTasksGraph* const TasksGraph = nullptr; @@ -70,6 +154,9 @@ public: TDuration collectLongTaskStatsTimeout = TDuration::Max() ); + void UpdateTaskStats(ui64 taskId, const NYql::NDqProto::TDqComputeActorStats& stats); + void ExportExecStats(NYql::NDqProto::TDqExecutionStats& stats); + void Finish(); private: diff --git a/ydb/core/kqp/gateway/kqp_gateway.h b/ydb/core/kqp/gateway/kqp_gateway.h index 320e00e958..68f69929b0 100644 --- a/ydb/core/kqp/gateway/kqp_gateway.h +++ b/ydb/core/kqp/gateway/kqp_gateway.h @@ -139,6 +139,7 @@ public: ui64 MkqlMemoryLimit = 0; // old engine compatibility ui64 PerShardKeysSizeLimitBytes = 0; Ydb::Table::QueryStatsCollection::Mode StatsMode = Ydb::Table::QueryStatsCollection::STATS_COLLECTION_NONE; + TDuration ProgressStatsPeriod; TKqpSnapshot Snapshot = TKqpSnapshot(); NKikimrKqp::EIsolationLevel IsolationLevel = NKikimrKqp::ISOLATION_LEVEL_UNDEFINED; TMaybe<NKikimrKqp::TRlPath> RlPath; diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp index 32b2fe4ce1..cef3ffa7a3 100644 --- a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp +++ b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp @@ -14,6 +14,7 @@ #include <ydb/core/kqp/common/kqp_lwtrace_probes.h> #include <ydb/core/kqp/common/kqp_timeouts.h> #include <ydb/core/kqp/compile_service/kqp_compile_service.h> +#include <ydb/core/kqp/executer_actor/kqp_executer.h> #include <ydb/core/kqp/session_actor/kqp_worker_common.h> #include <ydb/core/kqp/node_service/kqp_node_service.h> #include <ydb/library/yql/dq/actors/spilling/spilling_file.h> @@ -606,6 +607,11 @@ public: } const TString& sessionId = ev->Get()->GetSessionId(); + + if (!ev->Get()->GetUserRequestContext()) { + ev->Get()->SetUserRequestContext(MakeIntrusive<TUserRequestContext>(traceId, database, sessionId)); + } + const TKqpSessionInfo* sessionInfo = LocalSessions->FindPtr(sessionId); auto dbCounters = sessionInfo ? sessionInfo->DbCounters : nullptr; if (!dbCounters) { @@ -650,11 +656,11 @@ public: if (cancelAfter) { timerDuration = Min(timerDuration, cancelAfter); } - KQP_PROXY_LOG_D(TKqpRequestInfo(traceId, sessionId) << "TEvQueryRequest, set timer for: " << timerDuration << " timeout: " << timeout << " cancelAfter: " << cancelAfter); + KQP_PROXY_LOG_D("Ctx: " << *ev->Get()->GetUserRequestContext() << ". TEvQueryRequest, set timer for: " << timerDuration + << " timeout: " << timeout << " cancelAfter: " << cancelAfter + << ". " << "Send request to target, requestId: " << requestId << ", targetId: " << targetId); auto status = timerDuration == cancelAfter ? NYql::NDqProto::StatusIds::CANCELLED : NYql::NDqProto::StatusIds::TIMEOUT; StartQueryTimeout(requestId, timerDuration, status); - KQP_PROXY_LOG_D("Sent request to target, requestId: " << requestId - << ", targetId: " << targetId << ", sessionId: " << sessionId); Send(targetId, ev->Release().Release(), IEventHandle::FlagTrackDelivery, requestId); } @@ -838,6 +844,22 @@ public: PendingRequests.Erase(requestId); } + void ForwardProgress(TEvKqpExecuter::TEvExecuterProgress::TPtr& ev) { + ui64 requestId = ev->Cookie; + + auto proxyRequest = PendingRequests.FindPtr(requestId); + if (!proxyRequest) { + KQP_PROXY_LOG_E("Unknown sender for proxy response, requestId: " << requestId); + return; + } + + Send(proxyRequest->Sender, ev->Release().Release(), 0, proxyRequest->SenderCookie); + + TKqpRequestInfo requestInfo(proxyRequest->TraceId); + KQP_PROXY_LOG_D(requestInfo << "Forwarded response to sender actor, requestId: " << requestId + << ", sender: " << proxyRequest->Sender << ", selfId: " << SelfId() << ", source: " << ev->Sender); + } + void LookupPeerProxyData() { if (!SelfDataCenterId || BoardLookupActor || AppData()->TenantName.empty()) { return; @@ -1246,6 +1268,7 @@ public: hFunc(TEvKqp::TEvScriptRequest, Handle); hFunc(TEvKqp::TEvCloseSessionRequest, Handle); hFunc(TEvKqp::TEvQueryResponse, ForwardEvent); + hFunc(TEvKqpExecuter::TEvExecuterProgress, ForwardProgress); hFunc(TEvKqp::TEvProcessResponse, Handle); hFunc(TEvKqp::TEvCreateSessionRequest, Handle); hFunc(TEvKqp::TEvPingSessionRequest, Handle); diff --git a/ydb/core/kqp/proxy_service/kqp_script_executions.cpp b/ydb/core/kqp/proxy_service/kqp_script_executions.cpp index 16101529d4..2d18aeb97b 100644 --- a/ydb/core/kqp/proxy_service/kqp_script_executions.cpp +++ b/ydb/core/kqp/proxy_service/kqp_script_executions.cpp @@ -1735,6 +1735,11 @@ public: ) void Handle(TEvSaveScriptResultMetaFinished::TPtr& ev) { + if (ev->Get()->Status == Ydb::StatusIds::ABORTED) { + Register(new TSaveScriptExecutionResultMetaQuery(Database, ExecutionId, SerializedMetas)); + return; + } + Send(ev->Forward(ReplyActorId)); PassAway(); } @@ -2698,6 +2703,54 @@ private: NYql::TIssues OperationIssues; }; +class TScriptProgressActor : public TQueryBase { +public: + TScriptProgressActor(const TString& database, const TString& executionId, const TString& queryPlan, const TString&) + : Database(database), ExecutionId(executionId), QueryPlan(queryPlan) + { + KQP_PROXY_LOG_D(queryPlan); + } + + void OnRunQuery() override { + TString sql = R"( + -- TScriptProgressActor::OnRunQuery + DECLARE $execution_id AS Text; + DECLARE $database AS Text; + DECLARE $plan AS JsonDocument; + + UPSERT INTO `.metadata/script_executions` (execution_id, database, plan) + VALUES ($execution_id, $database, $plan); + )"; + + NYdb::TParamsBuilder params; + params + .AddParam("$execution_id") + .Utf8(ExecutionId) + .Build() + .AddParam("$database") + .Utf8(Database) + .Build() + .AddParam("$plan") + .JsonDocument(QueryPlan) + .Build(); + + RunDataQuery(sql, ¶ms); + } + + void OnQueryResult() override { + Finish(); + } + + void OnFinish(Ydb::StatusIds::StatusCode, NYql::TIssues&&) override { + } + +private: + TString Database; + TString ExecutionId; + TString QueryPlan; +}; + + } // anonymous namespace NActors::IActor* CreateScriptExecutionCreatorActor(TEvKqp::TEvScriptRequest::TPtr&& ev, const NKikimrConfig::TQueryServiceConfig& queryServiceConfig, TIntrusivePtr<TKqpCounters> counters, TDuration maxRunTime) { @@ -2752,6 +2805,10 @@ NActors::IActor* CreateScriptFinalizationFinisherActor(const TString& executionI return new TScriptFinalizationFinisherActor(executionId, database, operationStatus, std::move(operationIssues)); } +NActors::IActor* CreateScriptProgressActor(const TString& executionId, const TString& database, const TString& queryPlan, const TString& queryStats) { + return new TScriptProgressActor(database, executionId, queryPlan, queryStats); +} + namespace NPrivate { NActors::IActor* CreateCreateScriptOperationQueryActor(const TString& executionId, const NActors::TActorId& runScriptActorId, const NKikimrKqp::TEvQueryRequest& record, TDuration operationTtl, TDuration resultsTtl, TDuration leaseDuration) { diff --git a/ydb/core/kqp/proxy_service/kqp_script_executions.h b/ydb/core/kqp/proxy_service/kqp_script_executions.h index 0ca77c17fd..5781046a1d 100644 --- a/ydb/core/kqp/proxy_service/kqp_script_executions.h +++ b/ydb/core/kqp/proxy_service/kqp_script_executions.h @@ -35,5 +35,6 @@ NActors::IActor* CreateGetScriptExecutionResultActor(const NActors::TActorId& re NActors::IActor* CreateSaveScriptExternalEffectActor(TEvSaveScriptExternalEffectRequest::TPtr ev); NActors::IActor* CreateSaveScriptFinalStatusActor(TEvScriptFinalizeRequest::TPtr ev); NActors::IActor* CreateScriptFinalizationFinisherActor(const TString& executionId, const TString& database, std::optional<Ydb::StatusIds::StatusCode> operationStatus, NYql::TIssues operationIssues); +NActors::IActor* CreateScriptProgressActor(const TString& executionId, const TString& database, const TString& queryPlan, const TString& queryStats); } // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp b/ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp index eb2c4f9748..71c15790a6 100644 --- a/ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp +++ b/ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp @@ -20,11 +20,11 @@ #include <forward_list> -#define LOG_T(stream) LOG_TRACE_S(NActors::TActivationContext::AsActorContext(), NKikimrServices::KQP_EXECUTER, SelfId() << " " << stream); -#define LOG_D(stream) LOG_DEBUG_S(NActors::TActivationContext::AsActorContext(), NKikimrServices::KQP_EXECUTER, SelfId() << " " << stream); -#define LOG_I(stream) LOG_INFO_S(NActors::TActivationContext::AsActorContext(), NKikimrServices::KQP_EXECUTER, SelfId() << " " << stream); -#define LOG_W(stream) LOG_WARN_S(NActors::TActivationContext::AsActorContext(), NKikimrServices::KQP_EXECUTER, SelfId() << " " << stream); -#define LOG_E(stream) LOG_ERROR_S(NActors::TActivationContext::AsActorContext(), NKikimrServices::KQP_EXECUTER, SelfId() << " " << stream); +#define LOG_T(stream) LOG_TRACE_S(NActors::TActivationContext::AsActorContext(), NKikimrServices::KQP_EXECUTER, "TRunScriptActor " << SelfId() << ". " << "Ctx: " << *UserRequestContext << ". " << stream); +#define LOG_D(stream) LOG_DEBUG_S(NActors::TActivationContext::AsActorContext(), NKikimrServices::KQP_EXECUTER, "TRunScriptActor " << SelfId() << ". " << "Ctx: " << *UserRequestContext << ". " << stream); +#define LOG_I(stream) LOG_INFO_S (NActors::TActivationContext::AsActorContext(), NKikimrServices::KQP_EXECUTER, "TRunScriptActor " << SelfId() << ". " << "Ctx: " << *UserRequestContext << ". " << stream); +#define LOG_W(stream) LOG_WARN_S (NActors::TActivationContext::AsActorContext(), NKikimrServices::KQP_EXECUTER, "TRunScriptActor " << SelfId() << ". " << "Ctx: " << *UserRequestContext << ". " << stream); +#define LOG_E(stream) LOG_ERROR_S(NActors::TActivationContext::AsActorContext(), NKikimrServices::KQP_EXECUTER, "TRunScriptActor " << SelfId() << ". " << "Ctx: " << *UserRequestContext << ". " << stream); namespace NKikimr::NKqp { @@ -67,7 +67,9 @@ public: , ResultsTtl(resultsTtl) , QueryServiceConfig(queryServiceConfig) , Counters(counters) - {} + { + UserRequestContext = MakeIntrusive<TUserRequestContext>(Request.GetTraceId(), Database, "", ExecutionId, Request.GetTraceId()); + } static constexpr char ActorName[] = "KQP_RUN_SCRIPT_ACTOR"; @@ -80,6 +82,7 @@ private: hFunc(NActors::TEvents::TEvWakeup, Handle); hFunc(NActors::TEvents::TEvPoison, Handle); hFunc(TEvKqpExecuter::TEvStreamData, Handle); + hFunc(TEvKqpExecuter::TEvExecuterProgress, Handle); hFunc(TEvKqp::TEvQueryResponse, Handle); hFunc(TEvKqp::TEvCreateSessionResponse, Handle); IgnoreFunc(TEvKqp::TEvCloseSessionResponse); @@ -116,6 +119,7 @@ private: } } else { SessionId = resp.GetResponse().GetSessionId(); + UserRequestContext->SessionId = SessionId; if (RunState == ERunState::Running) { Start(); @@ -139,10 +143,14 @@ private: auto ev = MakeHolder<TEvKqp::TEvQueryRequest>(); ev->Record = Request; ev->Record.MutableRequest()->SetSessionId(SessionId); - ev->SetUserRequestContext(MakeIntrusive<TUserRequestContext>(Request.GetTraceId(), Database, SessionId, ExecutionId, Request.GetTraceId())); + ev->SetUserRequestContext(UserRequestContext); + if (ev->Record.GetRequest().GetCollectStats() >= Ydb::Table::QueryStatsCollection::STATS_COLLECTION_FULL) { + ev->SetProgressStatsPeriod(TDuration::MilliSeconds(QueryServiceConfig.GetProgressStatsPeriodMs())); + } NActors::ActorIdToProto(SelfId(), ev->Record.MutableRequestActorId()); + LOG_I("Start Script Execution"); SendToKqpProxy(std::move(ev)); } @@ -376,6 +384,12 @@ private: ); } + void Handle(TEvKqpExecuter::TEvExecuterProgress::TPtr& ev) { + Register( + CreateScriptProgressActor(ExecutionId, Database, ev->Get()->Record.GetQueryPlan(), "") + ); + } + void Handle(TEvKqp::TEvQueryResponse::TPtr& ev) { if (RunState != ERunState::Running) { return; @@ -582,6 +596,7 @@ private: std::optional<TString> QueryPlan; std::optional<TString> QueryAst; std::optional<NKqpProto::TKqpStatsQuery> QueryStats; + TIntrusivePtr<TUserRequestContext> UserRequestContext; }; } // namespace diff --git a/ydb/core/kqp/session_actor/kqp_query_state.h b/ydb/core/kqp/session_actor/kqp_query_state.h index 68fa21228e..60970b2588 100644 --- a/ydb/core/kqp/session_actor/kqp_query_state.h +++ b/ydb/core/kqp/session_actor/kqp_query_state.h @@ -439,6 +439,10 @@ public: return RequestEv->GetCollectDiagnostics(); } + TDuration GetProgressStatsPeriod() { + return RequestEv->GetProgressStatsPeriod(); + } + //// Topic ops //// void AddOffsetsToTransaction(); bool TryMergeTopicOffsets(const NTopic::TTopicOperations &operations, TString& message); diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index 3855bd9258..3fd7c1e63f 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -754,6 +754,7 @@ public: } request.StatsMode = queryState->GetStatsMode(); + request.ProgressStatsPeriod = queryState->GetProgressStatsPeriod(); } const auto& limits = GetQueryLimits(Settings); @@ -1142,6 +1143,28 @@ public: ProcessExecuterResult(ev->Get()); } + void HandleExecute(TEvKqpExecuter::TEvExecuterProgress::TPtr& ev) { + if (QueryState && QueryState->RequestActorId) { + if (ExecuterId != ev->Sender) { + return; + } + + if (QueryState->ReportStats()) { + if (QueryState->GetStatsMode() >= Ydb::Table::QueryStatsCollection::STATS_COLLECTION_FULL) { + NKqpProto::TKqpStatsQuery& stats = *ev->Get()->Record.MutableQueryStats(); + NKqpProto::TKqpStatsQuery executionStats; + executionStats.Swap(&stats); + stats = QueryState->Stats; + stats.MutableExecutions()->MergeFrom(executionStats.GetExecutions()); + ev->Get()->Record.SetQueryPlan(SerializeAnalyzePlan(stats)); + } + } + + LOG_D("Forwarded TEvExecuterProgress to " << QueryState->RequestActorId); + Send(QueryState->RequestActorId, ev->Release().Release(), 0, QueryState->ProxyRequestId); + } + } + std::optional<TKqpTempTablesState::TTempTableInfo> GetTemporaryTableInfo(TKqpPhyTxHolder::TConstPtr tx) { if (!tx) { return std::nullopt; @@ -1965,6 +1988,7 @@ public: // forgotten messages from previous aborted request hFunc(TEvKqp::TEvCompileResponse, HandleNoop); hFunc(TEvKqpExecuter::TEvTxResponse, HandleNoop); + hFunc(TEvKqpExecuter::TEvExecuterProgress, HandleNoop) hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, HandleNoop); hFunc(TEvents::TEvUndelivered, HandleNoop); // message from KQP proxy in case of our reply just after kqp proxy timer tick @@ -1992,6 +2016,7 @@ public: hFunc(TEvTxUserProxy::TEvAllocateTxIdResult, Handle); hFunc(TEvKqpExecuter::TEvTxResponse, HandleExecute); + hFunc(TEvKqpExecuter::TEvExecuterProgress, HandleExecute) hFunc(TEvKqpExecuter::TEvStreamData, HandleExecute); hFunc(TEvKqpExecuter::TEvStreamDataAck, HandleExecute); @@ -2045,6 +2070,7 @@ public: // always come from WorkerActor hFunc(TEvKqp::TEvCloseSessionResponse, HandleCleanup); hFunc(TEvKqp::TEvQueryResponse, HandleNoop); + hFunc(TEvKqpExecuter::TEvExecuterProgress, HandleNoop) default: UnexpectedEvent("CleanupState", ev); } diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index 29a08e0c98..5dc2e109b5 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -1040,6 +1040,7 @@ message TQueryServiceConfig { optional bool MdbTransformHost = 10; optional NYql.TGenericGatewayConfig Generic = 11; optional TFinalizeScriptServiceConfig FinalizeScriptServiceConfig = 12; + optional uint64 ProgressStatsPeriodMs = 14 [default = 0]; // 0 = disabled } // Config describes immediate controls and allows diff --git a/ydb/core/protos/console_config.proto b/ydb/core/protos/console_config.proto index 9e48803547..61daa66137 100644 --- a/ydb/core/protos/console_config.proto +++ b/ydb/core/protos/console_config.proto @@ -88,6 +88,7 @@ message TConfigItem { MemoryLogConfigItem = 19; GRpcConfigItem = 20; DynamicNameserviceConfigItem = 22; + DynamicNodeConfigItem = 24; CmsConfigItem = 25; FeatureFlagsItem = 26; @@ -99,6 +100,7 @@ message TConfigItem { ConfigsDispatcherConfigItem = 33; TableProfilesConfigItem = 34; KeyConfigItem = 35; + PDiskKeyConfigItem = 51; NodeBrokerConfigItem = 36; TableServiceConfigItem = 37; SharedCacheConfigItem = 38; @@ -111,20 +113,24 @@ message TConfigItem { MeteringConfigItem = 45; HiveConfigItem = 46; DataShardConfigItem = 49; - FederatedQueryConfigItem = 50; - PDiskKeyConfigItem = 51; + FederatedQueryConfigItem = 58; CompactionConfigItem = 52; HttpProxyConfigItem = 53; SchemeShardConfigItem = 54; - ClientCertificateAuthorizationConfigItem = 55; + TracingConfigItem = 55; + FailureInjectionConfigItem = 56; + PublicHttpConfigItem = 57; MetadataProviderConfigItem = 59; BackgroundTasksConfigItem = 60; + AuditConfigItem = 61; + ClientCertificateAuthorizationItem = 62; ExternalIndexConfigItem = 63; YamlConfigEnabledItem = 64; ScanConveyorConfigItem = 65; ColumnShardConfigItem = 66; + LocalPgWireConfigItem = 69; AwsCompatibilityConfigItem = 70; - KafkaProxyConfig = 71; + KafkaProxyConfigItem = 71; CompConveyorConfigItem = 72; QueryServiceConfigItem = 73; InsertConveyorConfigItem = 74; diff --git a/ydb/core/protos/kqp.proto b/ydb/core/protos/kqp.proto index 1b42944578..30e6c740aa 100644 --- a/ydb/core/protos/kqp.proto +++ b/ydb/core/protos/kqp.proto @@ -432,8 +432,12 @@ message TEvExecuterStreamProfile { optional NYql.NDqProto.TDqExecutionStats Profile = 2; }; +// 1. Executer fills progress stats from it's own execution +// 2. Session adds stats from early finished executions and builds complete plan message TEvExecuterProgress { optional NActorsProto.TActorId ExecuterActorId = 1; + optional string QueryPlan = 2; + optional NKqpProto.TKqpStatsQuery QueryStats = 3; }; message TKqpProxyNodeResources { diff --git a/ydb/library/yql/sql/pg/pg_sql.cpp b/ydb/library/yql/sql/pg/pg_sql.cpp index 3e8a1acb61..d2969e621d 100644 --- a/ydb/library/yql/sql/pg/pg_sql.cpp +++ b/ydb/library/yql/sql/pg/pg_sql.cpp @@ -2394,7 +2394,7 @@ public: return {}; } - const auto cluster = !schemaname.Empty() ? schemaname : Settings.DefaultCluster; + const auto cluster = !schemaname.Empty() && schemaname != "public" ? schemaname : Settings.DefaultCluster; const auto sinkOrSource = BuildClusterSinkOrSourceExpression(isSink, cluster); const auto key = BuildTableKeyExpression(relname, isScheme); return {sinkOrSource, key}; @@ -2421,7 +2421,7 @@ public: return {}; } - const auto cluster = !schemaname.Empty() ? schemaname : Settings.DefaultCluster; + const auto cluster = !schemaname.Empty() && schemaname != "public" ? schemaname : Settings.DefaultCluster; const auto sinkOrSource = BuildClusterSinkOrSourceExpression(true, cluster); const auto key = BuildPgObjectExpression(objectName, pgObjectType); return {sinkOrSource, key}; diff --git a/ydb/library/yql/sql/pg/pg_sql_ut.cpp b/ydb/library/yql/sql/pg/pg_sql_ut.cpp index ce021378e3..fe86027f85 100644 --- a/ydb/library/yql/sql/pg/pg_sql_ut.cpp +++ b/ydb/library/yql/sql/pg/pg_sql_ut.cpp @@ -377,12 +377,22 @@ Y_UNIT_TEST_SUITE(PgSqlParsingOnly) { } Y_UNIT_TEST(DropTableUnknownClusterStmt) { - auto res = PgSqlToYql("drop table if exists public.t"); + auto res = PgSqlToYql("drop table if exists pub.t"); UNIT_ASSERT(!res.IsOk()); UNIT_ASSERT_EQUAL(res.Issues.Size(), 1); auto issue = *(res.Issues.begin()); - UNIT_ASSERT_C(issue.GetMessage().find("Unknown cluster: public") != TString::npos, res.Issues.ToString()); + UNIT_ASSERT_C(issue.GetMessage().find("Unknown cluster: pub") != TString::npos, res.Issues.ToString()); + } + + Y_UNIT_TEST(PublicSchemeRemove) { + auto res = PgSqlToYql("DROP TABLE IF EXISTS public.t; CREATE TABLE public.t(id INT PRIMARY KEY, foo INT);\ +INSERT INTO public.t VALUES(1, 2);\ +UPDATE public.t SET foo = 3 WHERE id == 1;\ +DELETE FROM public.t WHERE id == 1;\ +SELECT COUNT(*) FROM public.t;"); + UNIT_ASSERT(res.IsOk()); + UNIT_ASSERT(res.Root->ToString().find("public") == TString::npos); } Y_UNIT_TEST(UpdateStmt) { diff --git a/ydb/tests/tools/fq_runner/kikimr_runner.py b/ydb/tests/tools/fq_runner/kikimr_runner.py index a7ab18fd0b..a14fcdf4cd 100644 --- a/ydb/tests/tools/fq_runner/kikimr_runner.py +++ b/ydb/tests/tools/fq_runner/kikimr_runner.py @@ -600,6 +600,7 @@ class StreamingOverKikimr(object): self.wd = yatest.common.output_path("yq_" + self.uuid) os.mkdir(self.wd) self.fill_config() + self.compute_plane.qs_config['progress_stats_period_ms'] = 1 driver_config = ydb.DriverConfig(os.getenv("YDB_ENDPOINT"), os.getenv("YDB_DATABASE")) self.driver = ydb.Driver(driver_config) try: |