diff options
author | robot-piglet <robot-piglet@yandex-team.com> | 2023-12-04 12:08:11 +0300 |
---|---|---|
committer | robot-piglet <robot-piglet@yandex-team.com> | 2023-12-04 15:27:02 +0300 |
commit | f37c4c9349e2c8c433d52d4be744cf5dc9aa7909 (patch) | |
tree | 6b05033f48f4bf41a995f6a80a1f0c08fed62c13 | |
parent | 2a718325637e5302334b6d0a6430f63168f8dbb3 (diff) | |
download | ydb-f37c4c9349e2c8c433d52d4be744cf5dc9aa7909.tar.gz |
Intermediate changes
38 files changed, 206 insertions, 136 deletions
diff --git a/.gitignore b/.gitignore index 3ea9eaf681..e8dae2178d 100644 --- a/.gitignore +++ b/.gitignore @@ -8,8 +8,9 @@ # Unignore all files inside canondata dir !*/canondata/* -# Compiled dynamic C libraries +# C libraries *.so +*.a # Byte-compiled / optimized / DLL files __pycache__/ diff --git a/build/external_resources/ymake/public.resources.json b/build/external_resources/ymake/public.resources.json index 8564591c37..0f15409c4d 100644 --- a/build/external_resources/ymake/public.resources.json +++ b/build/external_resources/ymake/public.resources.json @@ -1,19 +1,19 @@ { "by_platform": { "darwin": { - "uri": "sbr:5443071926" + "uri": "sbr:5476908047" }, "darwin-arm64": { - "uri": "sbr:5443066470" + "uri": "sbr:5476891477" }, "linux": { - "uri": "sbr:5443068824" + "uri": "sbr:5476896849" }, "linux-aarch64": { - "uri": "sbr:5443067977" + "uri": "sbr:5476895322" }, "win32-clang-cl": { - "uri": "sbr:5443068659" + "uri": "sbr:5476896707" } } } diff --git a/build/external_resources/ymake/resources.json b/build/external_resources/ymake/resources.json index 41d5029dd6..9f55612913 100644 --- a/build/external_resources/ymake/resources.json +++ b/build/external_resources/ymake/resources.json @@ -1,19 +1,19 @@ { "by_platform": { "darwin": { - "uri": "sbr:5443051526" + "uri": "sbr:5476872784" }, "darwin-arm64": { - "uri": "sbr:5443049933" + "uri": "sbr:5476874982" }, "linux": { - "uri": "sbr:5443071824" + "uri": "sbr:5476877550" }, "linux-aarch64": { - "uri": "sbr:5443056096" + "uri": "sbr:5476883701" }, "win32-clang-cl": { - "uri": "sbr:5443077529" + "uri": "sbr:5476894202" } } } diff --git a/build/mapping.conf.json b/build/mapping.conf.json index 018bc9da7e..2d337dca71 100644 --- a/build/mapping.conf.json +++ b/build/mapping.conf.json @@ -85,11 +85,11 @@ "5433628204": "https://devtools-registry.s3.yandex.net/5433628204", "5433626802": "https://devtools-registry.s3.yandex.net/5433626802", "5433622361": "https://devtools-registry.s3.yandex.net/5433622361", - "5443071926": "https://devtools-registry.s3.yandex.net/5443071926", - "5443066470": "https://devtools-registry.s3.yandex.net/5443066470", - "5443068824": "https://devtools-registry.s3.yandex.net/5443068824", - "5443067977": "https://devtools-registry.s3.yandex.net/5443067977", - "5443068659": "https://devtools-registry.s3.yandex.net/5443068659", + "5476908047": "https://devtools-registry.s3.yandex.net/5476908047", + "5476891477": "https://devtools-registry.s3.yandex.net/5476891477", + "5476896849": "https://devtools-registry.s3.yandex.net/5476896849", + "5476895322": "https://devtools-registry.s3.yandex.net/5476895322", + "5476896707": "https://devtools-registry.s3.yandex.net/5476896707", "2980468199": "https://devtools-registry.s3.yandex.net/2980468199" }, "resources_descriptions": { @@ -177,11 +177,11 @@ "5433628204": "yexport for darwin", "5433626802": "yexport for darwin-arm64", "5433622361": "yexport for linux", - "5443071926": "ymake for darwin", - "5443066470": "ymake for darwin-arm64", - "5443068824": "ymake for linux", - "5443067977": "ymake for linux-aarch64", - "5443068659": "ymake.exe for win32-clang-cl", + "5476908047": "ymake for darwin", + "5476891477": "ymake for darwin-arm64", + "5476896849": "ymake for linux", + "5476895322": "ymake for linux-aarch64", + "5476896707": "ymake.exe for win32-clang-cl", "2980468199": "ytexec for linux" }, "tasks": {} diff --git a/build/platform/lld/ya.make b/build/platform/lld/ya.make index ddf3a9fb36..4c182c0784 100644 --- a/build/platform/lld/ya.make +++ b/build/platform/lld/ya.make @@ -2,11 +2,7 @@ RESOURCES_LIBRARY() LICENSE(Service-Prebuilt-Tool) -IF (CLANG16) - DEFAULT(LLD_VERSION 16) -ELSE() - DEFAULT(LLD_VERSION 14) -ENDIF() +DEFAULT(LLD_VERSION ${CLANG_VER}) IF (LLD_VERSION == 14) DECLARE_EXTERNAL_HOST_RESOURCES_BUNDLE_BY_JSON(LLD_ROOT lld14.json) diff --git a/contrib/go/_std_1.21/src/internal/goexperiment/exp_coverageredesign_off.go b/contrib/go/_std_1.21/src/internal/goexperiment/exp_coverageredesign_off.go new file mode 100644 index 0000000000..95d3a6c4ae --- /dev/null +++ b/contrib/go/_std_1.21/src/internal/goexperiment/exp_coverageredesign_off.go @@ -0,0 +1,9 @@ +// Code generated by mkconsts.go. DO NOT EDIT. + +//go:build !goexperiment.coverageredesign +// +build !goexperiment.coverageredesign + +package goexperiment + +const CoverageRedesign = false +const CoverageRedesignInt = 0 diff --git a/contrib/go/_std_1.21/src/internal/goexperiment/exp_coverageredesign_on.go b/contrib/go/_std_1.21/src/internal/goexperiment/exp_coverageredesign_on.go deleted file mode 100644 index 330a234f20..0000000000 --- a/contrib/go/_std_1.21/src/internal/goexperiment/exp_coverageredesign_on.go +++ /dev/null @@ -1,9 +0,0 @@ -// Code generated by mkconsts.go. DO NOT EDIT. - -//go:build goexperiment.coverageredesign -// +build goexperiment.coverageredesign - -package goexperiment - -const CoverageRedesign = true -const CoverageRedesignInt = 1 diff --git a/contrib/go/_std_1.21/src/internal/goexperiment/ya.make b/contrib/go/_std_1.21/src/internal/goexperiment/ya.make index 8b59c0307f..b50720d13d 100644 --- a/contrib/go/_std_1.21/src/internal/goexperiment/ya.make +++ b/contrib/go/_std_1.21/src/internal/goexperiment/ya.make @@ -5,7 +5,7 @@ SRCS( exp_boringcrypto_off.go exp_cacheprog_off.go exp_cgocheck2_off.go - exp_coverageredesign_on.go + exp_coverageredesign_off.go exp_fieldtrack_off.go exp_heapminimum512kib_off.go exp_loopvar_off.go diff --git a/contrib/libs/cxxsupp/libcxx/include/__memory/shared_ptr.h b/contrib/libs/cxxsupp/libcxx/include/__memory/shared_ptr.h index 3d73bfaafa..8b7256d038 100644 --- a/contrib/libs/cxxsupp/libcxx/include/__memory/shared_ptr.h +++ b/contrib/libs/cxxsupp/libcxx/include/__memory/shared_ptr.h @@ -35,7 +35,7 @@ #include <stdexcept> #include <type_traits> #include <typeinfo> -#if !defined(_LIBCPP_HAS_NO_THREADS) // !defined(_LIBCPP_HAS_NO_ATOMIC_HEADER) +#if !defined(_LIBCPP_HAS_NO_ATOMIC_HEADER) # include <atomic> #endif diff --git a/contrib/libs/cxxsupp/libcxx/include/__random/log2.h b/contrib/libs/cxxsupp/libcxx/include/__random/log2.h index f4f837764c..b077d211ce 100644 --- a/contrib/libs/cxxsupp/libcxx/include/__random/log2.h +++ b/contrib/libs/cxxsupp/libcxx/include/__random/log2.h @@ -11,7 +11,6 @@ #include <__config> #include <cstddef> -#include <limits> #include <type_traits> #if !defined(_LIBCPP_HAS_NO_PRAGMA_SYSTEM_HEADER) diff --git a/contrib/libs/cxxsupp/libcxx/include/__support/win32/limits_msvc_win32.h b/contrib/libs/cxxsupp/libcxx/include/__support/win32/limits_msvc_win32.h index 9f693d9fa4..87e4e7db66 100644 --- a/contrib/libs/cxxsupp/libcxx/include/__support/win32/limits_msvc_win32.h +++ b/contrib/libs/cxxsupp/libcxx/include/__support/win32/limits_msvc_win32.h @@ -20,6 +20,7 @@ #include <float.h> // limit constants #include <limits.h> // CHAR_BIT #include <math.h> // HUGE_VAL +#include <ymath.h> // internal MSVC header providing the needed functionality #define __CHAR_BIT__ CHAR_BIT @@ -63,8 +64,8 @@ #define __LDBL_DENORM_MIN__ 3.64519953188247460253e-4951L // __builtin replacements/workarounds -#define __builtin_huge_vall() ((long double)__builtin_huge_val()) -#define __builtin_nanl(__dummy) ((long double)__builtin_nan(__dummy)) -#define __builtin_nansl(__dummy) ((long double)__builtin_nans(__dummy)) +#define __builtin_huge_vall() _LInf._Long_double +#define __builtin_nanl(__dummmy) _LNan._Long_double +#define __builtin_nansl(__dummy) _LSnan._Long_double #endif // _LIBCPP_SUPPORT_WIN32_LIMITS_MSVC_WIN32_H diff --git a/contrib/libs/cxxsupp/libcxx/include/__tuple b/contrib/libs/cxxsupp/libcxx/include/__tuple index 38a22dbaaf..2b399f30e1 100644 --- a/contrib/libs/cxxsupp/libcxx/include/__tuple +++ b/contrib/libs/cxxsupp/libcxx/include/__tuple @@ -21,36 +21,36 @@ _LIBCPP_BEGIN_NAMESPACE_STD -template <class _Tp> class _LIBCPP_TEMPLATE_VIS tuple_size; +template <class _Tp> struct _LIBCPP_TEMPLATE_VIS tuple_size; #if !defined(_LIBCPP_CXX03_LANG) template <class _Tp, class...> using __enable_if_tuple_size_imp = _Tp; template <class _Tp> -class _LIBCPP_TEMPLATE_VIS tuple_size<__enable_if_tuple_size_imp< +struct _LIBCPP_TEMPLATE_VIS tuple_size<__enable_if_tuple_size_imp< const _Tp, typename enable_if<!is_volatile<_Tp>::value>::type, integral_constant<size_t, sizeof(tuple_size<_Tp>)>>> : public integral_constant<size_t, tuple_size<_Tp>::value> {}; template <class _Tp> -class _LIBCPP_TEMPLATE_VIS tuple_size<__enable_if_tuple_size_imp< +struct _LIBCPP_TEMPLATE_VIS tuple_size<__enable_if_tuple_size_imp< volatile _Tp, typename enable_if<!is_const<_Tp>::value>::type, integral_constant<size_t, sizeof(tuple_size<_Tp>)>>> : public integral_constant<size_t, tuple_size<_Tp>::value> {}; template <class _Tp> -class _LIBCPP_TEMPLATE_VIS tuple_size<__enable_if_tuple_size_imp< +struct _LIBCPP_TEMPLATE_VIS tuple_size<__enable_if_tuple_size_imp< const volatile _Tp, integral_constant<size_t, sizeof(tuple_size<_Tp>)>>> : public integral_constant<size_t, tuple_size<_Tp>::value> {}; #else -template <class _Tp> class _LIBCPP_TEMPLATE_VIS tuple_size<const _Tp> : public tuple_size<_Tp> {}; -template <class _Tp> class _LIBCPP_TEMPLATE_VIS tuple_size<volatile _Tp> : public tuple_size<_Tp> {}; -template <class _Tp> class _LIBCPP_TEMPLATE_VIS tuple_size<const volatile _Tp> : public tuple_size<_Tp> {}; +template <class _Tp> struct _LIBCPP_TEMPLATE_VIS tuple_size<const _Tp> : public tuple_size<_Tp> {}; +template <class _Tp> struct _LIBCPP_TEMPLATE_VIS tuple_size<volatile _Tp> : public tuple_size<_Tp> {}; +template <class _Tp> struct _LIBCPP_TEMPLATE_VIS tuple_size<const volatile _Tp> : public tuple_size<_Tp> {}; #endif template <size_t _Ip, class _Tp> struct _LIBCPP_TEMPLATE_VIS tuple_element; @@ -160,7 +160,7 @@ template <class ..._Tp> class _LIBCPP_TEMPLATE_VIS tuple; template <class... _Tp> struct __tuple_like<tuple<_Tp...> > : true_type {}; template <class ..._Tp> -class _LIBCPP_TEMPLATE_VIS tuple_size<tuple<_Tp...> > +struct _LIBCPP_TEMPLATE_VIS tuple_size<tuple<_Tp...> > : public integral_constant<size_t, sizeof...(_Tp)> { }; @@ -301,7 +301,7 @@ struct _LIBCPP_TEMPLATE_VIS tuple_element<_Ip, __tuple_types<_Types...> > template <class ..._Tp> -class _LIBCPP_TEMPLATE_VIS tuple_size<__tuple_types<_Tp...> > +struct _LIBCPP_TEMPLATE_VIS tuple_size<__tuple_types<_Tp...> > : public integral_constant<size_t, sizeof...(_Tp)> { }; diff --git a/contrib/libs/cxxsupp/libcxx/include/memory b/contrib/libs/cxxsupp/libcxx/include/memory index a495caa898..53af96afa7 100644 --- a/contrib/libs/cxxsupp/libcxx/include/memory +++ b/contrib/libs/cxxsupp/libcxx/include/memory @@ -866,7 +866,6 @@ template<size_t N, class T> #include <iosfwd> #include <new> #include <stdexcept> -#include <stlfwd> #include <tuple> #include <type_traits> #include <typeinfo> diff --git a/contrib/libs/cxxsupp/libcxx/src/support/runtime/exception_fallback.ipp b/contrib/libs/cxxsupp/libcxx/src/support/runtime/exception_fallback.ipp index ebdca7e5ee..100ee6da5e 100644 --- a/contrib/libs/cxxsupp/libcxx/src/support/runtime/exception_fallback.ipp +++ b/contrib/libs/cxxsupp/libcxx/src/support/runtime/exception_fallback.ipp @@ -8,7 +8,6 @@ //===----------------------------------------------------------------------===// #include <cstdio> -#include "../../include/atomic_support.h" namespace std { diff --git a/contrib/libs/cxxsupp/libcxx/src/support/runtime/exception_msvc.ipp b/contrib/libs/cxxsupp/libcxx/src/support/runtime/exception_msvc.ipp index 3ee2d21b68..7e36c7068a 100644 --- a/contrib/libs/cxxsupp/libcxx/src/support/runtime/exception_msvc.ipp +++ b/contrib/libs/cxxsupp/libcxx/src/support/runtime/exception_msvc.ipp @@ -14,8 +14,6 @@ #include <stdio.h> #include <stdlib.h> -#include <__config> - extern "C" { typedef void (__cdecl* terminate_handler)(); _LIBCPP_CRT_FUNC terminate_handler __cdecl set_terminate( diff --git a/library/python/testing/swag/daemon.py b/library/python/testing/swag/daemon.py index e59bedb5e7..81fd0eb97b 100644 --- a/library/python/testing/swag/daemon.py +++ b/library/python/testing/swag/daemon.py @@ -3,11 +3,12 @@ import logging import os -import sys import signal import tempfile import shutil +import six + try: from . import gdb except ValueError: @@ -66,11 +67,7 @@ class Daemon(object): self.stdinf = stdin or tempfile.NamedTemporaryFile(dir=self.cwd, prefix="stdin_", delete=False) self.cmd = command - if sys.version_info.major > 2: - _basestring = str - else: - _basestring = basestring - if isinstance(command, _basestring): + if isinstance(command, six.string_types): self.cmd = [arg for arg in command.split() if arg] self.daemon = None self.name = os.path.basename(self.cmd[0]) diff --git a/library/python/testing/swag/lib/ya.make b/library/python/testing/swag/lib/ya.make index e3e7cfa300..0567288a62 100644 --- a/library/python/testing/swag/lib/ya.make +++ b/library/python/testing/swag/lib/ya.make @@ -1,6 +1,7 @@ PY23_LIBRARY() PEERDIR( + contrib/python/six contrib/python/protobuf library/python/testing/yatest_common ) diff --git a/ydb/core/persqueue/ut/partition_chooser_ut.cpp b/ydb/core/persqueue/ut/partition_chooser_ut.cpp index 0d4a4304e1..e43f99ad85 100644 --- a/ydb/core/persqueue/ut/partition_chooser_ut.cpp +++ b/ydb/core/persqueue/ut/partition_chooser_ut.cpp @@ -99,14 +99,6 @@ Y_UNIT_TEST(TBoundaryChooser_GetTabletIdTest) { UNIT_ASSERT(!chooser.GetPartition(666)); } -Y_UNIT_TEST(TBoundaryChooser_GetRandomPartitionTest) { - auto config = CreateConfig(SMEnabled); - - NKikimr::NPQ::NPartitionChooser::TBoundaryChooser chooser(config); - auto p = chooser.GetRandomPartition(); - UNIT_ASSERT(p->PartitionId != 3); -} - Y_UNIT_TEST(THashChooserTest) { auto config = CreateConfig(SMDisabled); @@ -145,14 +137,6 @@ Y_UNIT_TEST(THashChooser_GetTabletIdTest) { UNIT_ASSERT(!chooser.GetPartition(666)); } -Y_UNIT_TEST(THashChooser_GetRandomPartitionTest) { - auto config = CreateConfig(SMDisabled); - - NKikimr::NPQ::NPartitionChooser::THashChooser chooser(config); - auto p = chooser.GetRandomPartition(); - UNIT_ASSERT(p->PartitionId != 3); -} - struct TWriteSessionMock: public NActors::TActorBootstrapped<TWriteSessionMock> { @@ -277,7 +261,6 @@ Y_UNIT_TEST(TPartitionChooserActor_SplitMergeEnabled_Test) { Y_UNIT_TEST(TPartitionChooserActor_SplitMergeDisabled_Test) { NPersQueue::TTestServer server{}; - server.CleverServer->GetRuntime()->GetAppData().PQConfig.SetRoundRobinPartitionMapping(false); server.CleverServer->GetRuntime()->GetAppData().PQConfig.SetTopicsAreFirstClassCitizen(true); server.CleverServer->GetRuntime()->GetAppData().PQConfig.SetUseSrcIdMetaMappingInFirstClass(true); diff --git a/ydb/core/persqueue/writer/partition_chooser_impl.h b/ydb/core/persqueue/writer/partition_chooser_impl.h index 4e954b02c4..c2f618d86e 100644 --- a/ydb/core/persqueue/writer/partition_chooser_impl.h +++ b/ydb/core/persqueue/writer/partition_chooser_impl.h @@ -58,7 +58,6 @@ public: const TPartitionInfo* GetPartition(const TString& sourceId) const; const TPartitionInfo* GetPartition(ui32 partitionId) const; - const TPartitionInfo* GetRandomPartition() const; private: const TString TopicName; @@ -79,7 +78,6 @@ public: const TPartitionInfo* GetPartition(const TString& sourceId) const; const TPartitionInfo* GetPartition(ui32 partitionId) const; - const TPartitionInfo* GetRandomPartition() const; private: std::vector<TPartitionInfo> Partitions; @@ -257,11 +255,6 @@ const typename TBoundaryChooser<THasher>::TPartitionInfo* TBoundaryChooser<THash return it == Partitions.end() ? nullptr : it; } -template<class THasher> -const typename TBoundaryChooser<THasher>::TPartitionInfo* TBoundaryChooser<THasher>::GetRandomPartition() const { - return &Partitions[RandomNumber<size_t>(Partitions.size())]; -} - // @@ -295,11 +288,6 @@ const typename THashChooser<THasher>::TPartitionInfo* THashChooser<THasher>::Get return it->PartitionId == partitionId ? it : nullptr; } -template<class THasher> -const typename THashChooser<THasher>::TPartitionInfo* THashChooser<THasher>::GetRandomPartition() const { - return &Partitions[RandomNumber<size_t>(Partitions.size())]; -} - // // TPartitionChooserActor @@ -740,10 +728,8 @@ std::pair<bool, const typename TPartitionChooserActor<TChooser>::TPartitionInfo* return {false, Chooser.GetPartition(PreferedPartition.value())}; } else if (pqConfig.GetTopicsAreFirstClassCitizen() && SourceId) { return {false, Chooser.GetPartition(SourceId)}; - } else if (pqConfig.GetRoundRobinPartitionMapping()) { - return {true, nullptr}; } else { - return {false, Chooser.GetRandomPartition()}; + return {true, nullptr}; } } diff --git a/ydb/core/protos/pqconfig.proto b/ydb/core/protos/pqconfig.proto index 048686e11e..fddb43eec4 100644 --- a/ydb/core/protos/pqconfig.proto +++ b/ydb/core/protos/pqconfig.proto @@ -90,7 +90,7 @@ message TPQConfig { optional uint32 RemoteClusterEnabledDelaySec = 24 [default = 300]; // 5 minutes optional uint32 CloseClientSessionWithEnabledRemotePreferredClusterDelaySec = 25 [default = 300]; // 5 minutes - optional bool RoundRobinPartitionMapping = 26 [default = true]; + reserved 26; // optional bool RoundRobinPartitionMapping = 26 [default = true]; optional string Root = 27 [default = ""]; optional string TestDatabaseRoot = 47; // For unit-tests only diff --git a/ydb/core/testlib/test_pq_client.h b/ydb/core/testlib/test_pq_client.h index b159c0249e..7448bfd6ca 100644 --- a/ydb/core/testlib/test_pq_client.h +++ b/ydb/core/testlib/test_pq_client.h @@ -25,14 +25,13 @@ using namespace NKikimr::Tests; const static ui32 PQ_DEFAULT_NODE_COUNT = 2; -inline Tests::TServerSettings PQSettings(ui16 port = 0, ui32 nodesCount = PQ_DEFAULT_NODE_COUNT, bool roundrobin = true, const TString& yql_timeout = "10", const THolder<TTempFileHandle>& netDataFile = nullptr) { +inline Tests::TServerSettings PQSettings(ui16 port = 0, ui32 nodesCount = PQ_DEFAULT_NODE_COUNT, const TString& yql_timeout = "10", const THolder<TTempFileHandle>& netDataFile = nullptr) { NKikimrPQ::TPQConfig pqConfig; NKikimrProto::TAuthConfig authConfig; authConfig.SetUseBlackBox(false); authConfig.SetUseAccessService(false); authConfig.SetUseAccessServiceTLS(false); authConfig.SetUseStaff(false); - pqConfig.SetRoundRobinPartitionMapping(roundrobin); pqConfig.SetEnabled(true); pqConfig.SetMaxReadCookies(10); @@ -72,6 +71,13 @@ inline Tests::TServerSettings PQSettings(ui16 port = 0, ui32 nodesCount = PQ_DEF return settings; } +// deprecated. +inline Tests::TServerSettings PQSettings(ui16 port, ui32 nodesCount, bool roundrobin, const TString& yql_timeout = "10", const THolder<TTempFileHandle>& netDataFile = nullptr) { + Y_UNUSED(roundrobin); + + return PQSettings(port, nodesCount, yql_timeout, netDataFile); +} + const TString TopicPrefix = "/Root/PQ/"; const static TString DEFAULT_SRC_IDS_PATH = "/Root/PQ/SourceIdMeta2"; diff --git a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp index 1c02e8c202..e370a9b9a4 100644 --- a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp +++ b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp @@ -693,7 +693,6 @@ Y_UNIT_TEST_SUITE(Cdc) { NKikimrPQ::TPQConfig pqConfig; pqConfig.SetEnabled(true); pqConfig.SetEnableProtoSourceIdInfo(true); - pqConfig.SetRoundRobinPartitionMapping(true); pqConfig.SetTopicsAreFirstClassCitizen(true); pqConfig.SetMaxReadCookies(10); pqConfig.AddClientServiceType()->SetName("data-streams"); diff --git a/ydb/public/lib/ydb_cli/common/pretty_table.cpp b/ydb/public/lib/ydb_cli/common/pretty_table.cpp index 6af8836d5b..0dc07572ee 100644 --- a/ydb/public/lib/ydb_cli/common/pretty_table.cpp +++ b/ydb/public/lib/ydb_cli/common/pretty_table.cpp @@ -61,28 +61,31 @@ bool TPrettyTable::TRow::PrintColumns(IOutputStream& o, const TVector<size_t>& w o << " │ "; } - if (const size_t width = widths.at(columnIndex)) { + if (size_t width = widths.at(columnIndex)) { const auto& column = Columns.at(columnIndex); TStringBuf data; + size_t extraBytes; size_t l = 0; for (const auto& line : column) { data = line; + extraBytes = ExtraBytes(data); while (data && l < lineNumber) { - data.Skip(width); + data.Skip(width + extraBytes); ++l; } } - - size_t extraBytes = ExtraBytes(data); + extraBytes = ExtraBytes(data); + width += extraBytes; + if (data) { - o << RightPad(data.SubStr(0, width + extraBytes), width + extraBytes); + o << RightPad(data.SubStr(0, width), width); } else { - o << RightPad(' ', width + extraBytes); + o << RightPad(' ', width); } - if (data.size() > width + extraBytes) { + if (data.size() > width) { next = true; } } diff --git a/ydb/services/persqueue_v1/persqueue_ut.cpp b/ydb/services/persqueue_v1/persqueue_ut.cpp index 874dea2e62..7d9b8f60ea 100644 --- a/ydb/services/persqueue_v1/persqueue_ut.cpp +++ b/ydb/services/persqueue_v1/persqueue_ut.cpp @@ -1509,8 +1509,8 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { UNIT_ASSERT_C(resp.server_message_case() == Ydb::Topic::StreamReadMessage::FromServer::kReadResponse, resp); } - void SetupWriteSessionImpl(bool rr) { - NPersQueue::TTestServer server{PQSettings(0, 2, rr), false}; + Y_UNIT_TEST(SetupWriteSession) { + NPersQueue::TTestServer server{PQSettings(0, 2), false}; server.ServerSettings.SetEnableSystemViews(false); server.StartServer(); @@ -1550,17 +1550,10 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { for (ui32 i = 0; i < 15*5; ++i) { ss[writer.InitSession("sid_rand_" + ToString<ui32>(i), 0, true)]++; } - if (rr) { - for (auto& s : ss) { - Cerr << "Round robin check: " << s.first << ":" << s.second << "\n"; - UNIT_ASSERT(s.second >= 4 && s.second <= 6); - } + for (auto& s : ss) { + Cerr << "Round robin check: " << s.first << ":" << s.second << "\n"; + UNIT_ASSERT(s.second >= 4 && s.second <= 6); } - } - - Y_UNIT_TEST(SetupWriteSession) { - SetupWriteSessionImpl(false); - SetupWriteSessionImpl(true); } Y_UNIT_TEST(StoreNoMoreThanXSourceIDs) { @@ -3158,7 +3151,7 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { UNIT_ASSERT(equal); }; - NPersQueue::TTestServer server(PQSettings(0, 1, true, "10"), false); + NPersQueue::TTestServer server(PQSettings(0, 1, "10"), false); auto netDataUpdated = server.PrepareNetDataFile(FormNetData()); UNIT_ASSERT(netDataUpdated); server.StartServer(); diff --git a/yt/yt/client/api/rpc_proxy/connection_impl.cpp b/yt/yt/client/api/rpc_proxy/connection_impl.cpp index aa5d4e1080..179cee1b2a 100644 --- a/yt/yt/client/api/rpc_proxy/connection_impl.cpp +++ b/yt/yt/client/api/rpc_proxy/connection_impl.cpp @@ -70,10 +70,7 @@ void ApplyProxyUrlAliasingRules(TString& url, const std::optional<THashMap<TStri { static const auto rulesFromEnv = ParseProxyUrlAliasingRules(GetEnv("YT_PROXY_URL_ALIASING_CONFIG")); - const THashMap<TString, TString>& rules = - proxyUrlAliasingRules - ? proxyUrlAliasingRules.value() - : rulesFromEnv; + const auto& rules = proxyUrlAliasingRules.value_or(rulesFromEnv); if (auto ruleIt = rules.find(url); ruleIt != rules.end()) { url = ruleIt->second; @@ -82,7 +79,6 @@ void ApplyProxyUrlAliasingRules(TString& url, const std::optional<THashMap<TStri TString NormalizeHttpProxyUrl(TString url, const std::optional<THashMap<TString, TString>>& proxyUrlAliasingRules) { - ApplyProxyUrlAliasingRules(url, proxyUrlAliasingRules); if (url.find('.') == TString::npos && diff --git a/yt/yt/core/rpc/bus/channel.cpp b/yt/yt/core/rpc/bus/channel.cpp index ec8c6fd04d..6c53708a72 100644 --- a/yt/yt/core/rpc/bus/channel.cpp +++ b/yt/yt/core/rpc/bus/channel.cpp @@ -129,6 +129,20 @@ public: Terminated_.Unsubscribe(callback); } + int GetInflightRequestCount() override + { + int requestCount = 0; + + for (auto& bucket : Buckets_) { + auto guard = ReaderGuard(bucket.Lock); + for (const auto& session : bucket.Sessions) { + requestCount += session->GetInflightRequestCount(); + } + } + + return requestCount; + } + private: class TSession; using TSessionPtr = TIntrusivePtr<TSession>; @@ -260,7 +274,6 @@ private: private: const TWeakPtr<TSession> Session_; - }; //! Directs requests sent via a channel to go through its underlying bus. @@ -294,7 +307,7 @@ private: // Mark the channel as terminated to disallow any further usage. for (auto& bucket : RequestBuckets_) { - auto guard = Guard(bucket.Lock); + auto guard = Guard(bucket); bucket.Terminated = true; @@ -385,7 +398,7 @@ private: IClientResponseHandlerPtr responseHandler; { - auto guard = Guard(bucket->Lock); + auto guard = Guard(*bucket); auto it = bucket->ActiveRequestMap.find(requestId); if (it == bucket->ActiveRequestMap.end()) { @@ -502,7 +515,7 @@ private: IClientResponseHandlerPtr responseHandler; { - auto guard = Guard(bucket->Lock); + auto guard = Guard(*bucket); if (!requestControl->IsActive(guard)) { return; @@ -542,7 +555,7 @@ private: IClientResponseHandlerPtr responseHandler; { - auto guard = Guard(bucket->Lock); + auto guard = Guard(*bucket); if (!requestControl->IsActive(guard)) { return; @@ -600,18 +613,48 @@ private: } } + int GetInflightRequestCount() + { + int requestCount = 0; + + for (auto& bucket : RequestBuckets_) { + requestCount += bucket.ActiveRequestCount.load( + std::memory_order::relaxed); + } + + return requestCount; + } + private: const TTosLevel TosLevel_; IBusPtr Bus_; std::atomic<bool> BusReady_ = false; - struct TBucket + class TBucket { - YT_DECLARE_SPIN_LOCK(NThreading::TSpinLock, Lock); + public: IBusPtr Bus; bool Terminated = false; THashMap<TRequestId, TClientRequestControlPtr> ActiveRequestMap; + std::atomic<int> ActiveRequestCount = 0; + + void Acquire() noexcept + { + Lock.Acquire(); + } + + void Release() noexcept + { + if (std::ssize(ActiveRequestMap) != ActiveRequestCount.load(std::memory_order::relaxed)) { + ActiveRequestCount.store(std::ssize(ActiveRequestMap), std::memory_order::relaxed); + } + + Lock.Release(); + } + + private: + YT_DECLARE_SPIN_LOCK(NThreading::TSpinLock, Lock); }; static constexpr size_t BucketCount = 64; @@ -687,7 +730,7 @@ private: VERIFY_THREAD_AFFINITY_ANY(); auto* bucket = GetBucketForRequest(requestId); - auto guard = Guard(bucket->Lock); + auto guard = Guard(*bucket); auto it = bucket->ActiveRequestMap.find(requestId); if (it == bucket->ActiveRequestMap.end()) { @@ -719,7 +762,7 @@ private: TClientRequestControlPtr existingRequestControl; IClientResponseHandlerPtr existingResponseHandler; { - auto guard = Guard(bucket->Lock); + auto guard = Guard(*bucket); if (!requestControl->IsActive(guard)) { return; @@ -826,7 +869,7 @@ private: TClientRequestControlPtr requestControl; IClientResponseHandlerPtr responseHandler; { - auto guard = Guard(bucket->Lock); + auto guard = Guard(*bucket); if (bucket->Terminated) { YT_LOG_WARNING("Response received via a terminated channel (RequestId: %v)", @@ -972,7 +1015,7 @@ private: TClientRequestControlPtr requestControl; IClientResponseHandlerPtr responseHandler; { - auto guard = Guard(bucket->Lock); + auto guard = Guard(*bucket); auto it = bucket->ActiveRequestMap.find(requestId); if (it == bucket->ActiveRequestMap.end()) { @@ -1120,7 +1163,8 @@ private: return TraceContext_.MakeTraceContextGuard(); } - bool IsActive(const TGuard<NThreading::TSpinLock>&) const + template <typename TLock> + bool IsActive(const TGuard<TLock>&) const { return static_cast<bool>(ResponseHandler_); } @@ -1142,12 +1186,14 @@ private: TDelayedExecutor::CancelAndClear(AcknowledgementTimeoutCookie_); } - IClientResponseHandlerPtr GetResponseHandler(const TGuard<NThreading::TSpinLock>&) + template <typename TLock> + IClientResponseHandlerPtr GetResponseHandler(const TGuard<TLock>&) { return ResponseHandler_; } - IClientResponseHandlerPtr Finalize(const TGuard<NThreading::TSpinLock>&) + template <typename TLock> + IClientResponseHandlerPtr Finalize(const TGuard<TLock>&) { TotalTime_ = ProfileComplete(); TDelayedExecutor::CancelAndClear(TimeoutCookie_); diff --git a/yt/yt/core/rpc/channel.h b/yt/yt/core/rpc/channel.h index 9ae2a664ee..8248a938d5 100644 --- a/yt/yt/core/rpc/channel.h +++ b/yt/yt/core/rpc/channel.h @@ -121,6 +121,8 @@ struct IChannel //! Raised whenever the channel is terminated. DECLARE_INTERFACE_SIGNAL(void(const TError&), Terminated); + + virtual int GetInflightRequestCount() = 0; }; DEFINE_REFCOUNTED_TYPE(IChannel) diff --git a/yt/yt/core/rpc/channel_detail.cpp b/yt/yt/core/rpc/channel_detail.cpp index 295457f947..181a779f48 100644 --- a/yt/yt/core/rpc/channel_detail.cpp +++ b/yt/yt/core/rpc/channel_detail.cpp @@ -57,6 +57,11 @@ void TChannelWrapper::UnsubscribeTerminated(const TCallback<void(const TError&)> UnderlyingChannel_->UnsubscribeTerminated(callback); } +int TChannelWrapper::GetInflightRequestCount() +{ + return UnderlyingChannel_->GetInflightRequestCount(); +} + //////////////////////////////////////////////////////////////////////////////// void TClientRequestControlThunk::SetUnderlying(IClientRequestControlPtr underlying) diff --git a/yt/yt/core/rpc/channel_detail.h b/yt/yt/core/rpc/channel_detail.h index d23d2346b9..a3ca87a5d4 100644 --- a/yt/yt/core/rpc/channel_detail.h +++ b/yt/yt/core/rpc/channel_detail.h @@ -27,6 +27,8 @@ public: void SubscribeTerminated(const TCallback<void(const TError&)>& callback) override; void UnsubscribeTerminated(const TCallback<void(const TError&)>& callback) override; + int GetInflightRequestCount() override; + protected: const IChannelPtr UnderlyingChannel_; }; diff --git a/yt/yt/core/rpc/config.cpp b/yt/yt/core/rpc/config.cpp index bb1519dafe..594b83636a 100644 --- a/yt/yt/core/rpc/config.cpp +++ b/yt/yt/core/rpc/config.cpp @@ -143,6 +143,9 @@ void TViablePeerRegistryConfig::Register(TRegistrar registrar) .GreaterThanOrEqual(0) .Default(0); + registrar.Parameter("enable_power_of_two_choices_strategy", &TThis::EnablePowerOfTwoChoicesStrategy) + .Default(false); + registrar.Postprocessor([] (TThis* config) { if (config->MinPeerCountForPriorityAwareness > config->MaxPeerCount) { THROW_ERROR_EXCEPTION( diff --git a/yt/yt/core/rpc/config.h b/yt/yt/core/rpc/config.h index 6659daf9e2..709101f7c9 100644 --- a/yt/yt/core/rpc/config.h +++ b/yt/yt/core/rpc/config.h @@ -229,6 +229,8 @@ public: //! If you want to set bigger values, you must also increase MaxPeerCount to accommodate more peers. int MinPeerCountForPriorityAwareness; + bool EnablePowerOfTwoChoicesStrategy; + REGISTER_YSON_STRUCT(TViablePeerRegistryConfig); static void Register(TRegistrar registrar); diff --git a/yt/yt/core/rpc/grpc/channel.cpp b/yt/yt/core/rpc/grpc/channel.cpp index bfbc002505..1d09a0d249 100644 --- a/yt/yt/core/rpc/grpc/channel.cpp +++ b/yt/yt/core/rpc/grpc/channel.cpp @@ -209,6 +209,11 @@ public: return EndpointAddress_; } + int GetInflightRequestCount() override + { + YT_UNIMPLEMENTED(); + } + private: const TChannelConfigPtr Config_; const TString EndpointAddress_; diff --git a/yt/yt/core/rpc/hedging_channel.cpp b/yt/yt/core/rpc/hedging_channel.cpp index a0e379af62..b97c089b49 100644 --- a/yt/yt/core/rpc/hedging_channel.cpp +++ b/yt/yt/core/rpc/hedging_channel.cpp @@ -387,6 +387,11 @@ public: BackupChannel_->UnsubscribeTerminated(callback); } + int GetInflightRequestCount() override + { + YT_UNIMPLEMENTED(); + } + private: const IChannelPtr PrimaryChannel_; const IChannelPtr BackupChannel_; diff --git a/yt/yt/core/rpc/local_channel.cpp b/yt/yt/core/rpc/local_channel.cpp index 27a41b8976..948b3793a7 100644 --- a/yt/yt/core/rpc/local_channel.cpp +++ b/yt/yt/core/rpc/local_channel.cpp @@ -128,6 +128,11 @@ public: Terminated_.Unsubscribe(callback); } + int GetInflightRequestCount() override + { + return 0; + } + private: class TSession; using TSessionPtr = TIntrusivePtr<TSession>; diff --git a/yt/yt/core/rpc/null_channel.cpp b/yt/yt/core/rpc/null_channel.cpp index 73b12055dd..46f0c82520 100644 --- a/yt/yt/core/rpc/null_channel.cpp +++ b/yt/yt/core/rpc/null_channel.cpp @@ -43,6 +43,11 @@ public: DEFINE_SIGNAL_OVERRIDE(void(const TError&), Terminated); + int GetInflightRequestCount() override + { + return 0; + } + private: const TString Address_; }; diff --git a/yt/yt/core/rpc/roaming_channel.cpp b/yt/yt/core/rpc/roaming_channel.cpp index ef3c0f9297..a1b531b586 100644 --- a/yt/yt/core/rpc/roaming_channel.cpp +++ b/yt/yt/core/rpc/roaming_channel.cpp @@ -177,9 +177,13 @@ public: void UnsubscribeTerminated(const TCallback<void(const TError&)>& /*callback*/) override { } + int GetInflightRequestCount() override + { + return 0; + } + private: const IRoamingChannelProviderPtr Provider_; - }; IChannelPtr CreateRoamingChannel(IRoamingChannelProviderPtr provider) diff --git a/yt/yt/core/rpc/unittests/viable_peer_registry_ut.cpp b/yt/yt/core/rpc/unittests/viable_peer_registry_ut.cpp index fada327808..20055b71e9 100644 --- a/yt/yt/core/rpc/unittests/viable_peer_registry_ut.cpp +++ b/yt/yt/core/rpc/unittests/viable_peer_registry_ut.cpp @@ -124,6 +124,11 @@ public: } } + int GetInflightRequestCount() override + { + return 0; + } + DEFINE_SIGNAL_OVERRIDE(void(const TError&), Terminated); private: TString Address_; diff --git a/yt/yt/core/rpc/viable_peer_registry.cpp b/yt/yt/core/rpc/viable_peer_registry.cpp index b96b81a70f..5e9e0e5d22 100644 --- a/yt/yt/core/rpc/viable_peer_registry.cpp +++ b/yt/yt/core/rpc/viable_peer_registry.cpp @@ -248,6 +248,28 @@ public: return peers; } + IChannelPtr PickChannelFromTwoRandom(const IClientRequestPtr& request) const + { + auto peers = PickRandomPeers(/*peerCount*/ 2); + const auto& channelOne = peers.front(); + const auto& channelTwo = peers.back(); + + auto getLoad = [] (const auto& channel) { + return channel.second->GetInflightRequestCount(); + }; + + const auto& theWinner = getLoad(channelOne) < getLoad(channelTwo) ? channelOne : channelTwo; + + YT_LOG_DEBUG( + "Selected a peer via the power of two choices strategy (RequestId: %v, Peer1: %v, Peer2: %v, Winner: %v)", + request ? request->GetRequestId() : TRequestId(), + channelOne.first, + channelTwo.first, + theWinner.first); + + return theWinner.second; + } + IChannelPtr PickRandomChannel( const IClientRequestPtr& request, const std::optional<THedgingChannelOptions>& hedgingOptions) const override @@ -273,6 +295,8 @@ public: request ? request->GetRequestId() : TRequestId(), primaryPeer.first, backupPeer.first); + } else if (Config_->EnablePowerOfTwoChoicesStrategy && ActivePeerToPriority_.Size() >= 2) { + return PickChannelFromTwoRandom(request); } else { auto peer = PickRandomPeers()[0]; channel = peer.second; |