aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorrobot-piglet <robot-piglet@yandex-team.com>2023-12-04 12:08:11 +0300
committerrobot-piglet <robot-piglet@yandex-team.com>2023-12-04 15:27:02 +0300
commitf37c4c9349e2c8c433d52d4be744cf5dc9aa7909 (patch)
tree6b05033f48f4bf41a995f6a80a1f0c08fed62c13
parent2a718325637e5302334b6d0a6430f63168f8dbb3 (diff)
downloadydb-f37c4c9349e2c8c433d52d4be744cf5dc9aa7909.tar.gz
Intermediate changes
-rw-r--r--.gitignore3
-rw-r--r--build/external_resources/ymake/public.resources.json10
-rw-r--r--build/external_resources/ymake/resources.json10
-rw-r--r--build/mapping.conf.json20
-rw-r--r--build/platform/lld/ya.make6
-rw-r--r--contrib/go/_std_1.21/src/internal/goexperiment/exp_coverageredesign_off.go9
-rw-r--r--contrib/go/_std_1.21/src/internal/goexperiment/exp_coverageredesign_on.go9
-rw-r--r--contrib/go/_std_1.21/src/internal/goexperiment/ya.make2
-rw-r--r--contrib/libs/cxxsupp/libcxx/include/__memory/shared_ptr.h2
-rw-r--r--contrib/libs/cxxsupp/libcxx/include/__random/log2.h1
-rw-r--r--contrib/libs/cxxsupp/libcxx/include/__support/win32/limits_msvc_win32.h7
-rw-r--r--contrib/libs/cxxsupp/libcxx/include/__tuple18
-rw-r--r--contrib/libs/cxxsupp/libcxx/include/memory1
-rw-r--r--contrib/libs/cxxsupp/libcxx/src/support/runtime/exception_fallback.ipp1
-rw-r--r--contrib/libs/cxxsupp/libcxx/src/support/runtime/exception_msvc.ipp2
-rw-r--r--library/python/testing/swag/daemon.py9
-rw-r--r--library/python/testing/swag/lib/ya.make1
-rw-r--r--ydb/core/persqueue/ut/partition_chooser_ut.cpp17
-rw-r--r--ydb/core/persqueue/writer/partition_chooser_impl.h16
-rw-r--r--ydb/core/protos/pqconfig.proto2
-rw-r--r--ydb/core/testlib/test_pq_client.h10
-rw-r--r--ydb/core/tx/datashard/datashard_ut_change_exchange.cpp1
-rw-r--r--ydb/public/lib/ydb_cli/common/pretty_table.cpp17
-rw-r--r--ydb/services/persqueue_v1/persqueue_ut.cpp19
-rw-r--r--yt/yt/client/api/rpc_proxy/connection_impl.cpp6
-rw-r--r--yt/yt/core/rpc/bus/channel.cpp74
-rw-r--r--yt/yt/core/rpc/channel.h2
-rw-r--r--yt/yt/core/rpc/channel_detail.cpp5
-rw-r--r--yt/yt/core/rpc/channel_detail.h2
-rw-r--r--yt/yt/core/rpc/config.cpp3
-rw-r--r--yt/yt/core/rpc/config.h2
-rw-r--r--yt/yt/core/rpc/grpc/channel.cpp5
-rw-r--r--yt/yt/core/rpc/hedging_channel.cpp5
-rw-r--r--yt/yt/core/rpc/local_channel.cpp5
-rw-r--r--yt/yt/core/rpc/null_channel.cpp5
-rw-r--r--yt/yt/core/rpc/roaming_channel.cpp6
-rw-r--r--yt/yt/core/rpc/unittests/viable_peer_registry_ut.cpp5
-rw-r--r--yt/yt/core/rpc/viable_peer_registry.cpp24
38 files changed, 206 insertions, 136 deletions
diff --git a/.gitignore b/.gitignore
index 3ea9eaf681..e8dae2178d 100644
--- a/.gitignore
+++ b/.gitignore
@@ -8,8 +8,9 @@
# Unignore all files inside canondata dir
!*/canondata/*
-# Compiled dynamic C libraries
+# C libraries
*.so
+*.a
# Byte-compiled / optimized / DLL files
__pycache__/
diff --git a/build/external_resources/ymake/public.resources.json b/build/external_resources/ymake/public.resources.json
index 8564591c37..0f15409c4d 100644
--- a/build/external_resources/ymake/public.resources.json
+++ b/build/external_resources/ymake/public.resources.json
@@ -1,19 +1,19 @@
{
"by_platform": {
"darwin": {
- "uri": "sbr:5443071926"
+ "uri": "sbr:5476908047"
},
"darwin-arm64": {
- "uri": "sbr:5443066470"
+ "uri": "sbr:5476891477"
},
"linux": {
- "uri": "sbr:5443068824"
+ "uri": "sbr:5476896849"
},
"linux-aarch64": {
- "uri": "sbr:5443067977"
+ "uri": "sbr:5476895322"
},
"win32-clang-cl": {
- "uri": "sbr:5443068659"
+ "uri": "sbr:5476896707"
}
}
}
diff --git a/build/external_resources/ymake/resources.json b/build/external_resources/ymake/resources.json
index 41d5029dd6..9f55612913 100644
--- a/build/external_resources/ymake/resources.json
+++ b/build/external_resources/ymake/resources.json
@@ -1,19 +1,19 @@
{
"by_platform": {
"darwin": {
- "uri": "sbr:5443051526"
+ "uri": "sbr:5476872784"
},
"darwin-arm64": {
- "uri": "sbr:5443049933"
+ "uri": "sbr:5476874982"
},
"linux": {
- "uri": "sbr:5443071824"
+ "uri": "sbr:5476877550"
},
"linux-aarch64": {
- "uri": "sbr:5443056096"
+ "uri": "sbr:5476883701"
},
"win32-clang-cl": {
- "uri": "sbr:5443077529"
+ "uri": "sbr:5476894202"
}
}
}
diff --git a/build/mapping.conf.json b/build/mapping.conf.json
index 018bc9da7e..2d337dca71 100644
--- a/build/mapping.conf.json
+++ b/build/mapping.conf.json
@@ -85,11 +85,11 @@
"5433628204": "https://devtools-registry.s3.yandex.net/5433628204",
"5433626802": "https://devtools-registry.s3.yandex.net/5433626802",
"5433622361": "https://devtools-registry.s3.yandex.net/5433622361",
- "5443071926": "https://devtools-registry.s3.yandex.net/5443071926",
- "5443066470": "https://devtools-registry.s3.yandex.net/5443066470",
- "5443068824": "https://devtools-registry.s3.yandex.net/5443068824",
- "5443067977": "https://devtools-registry.s3.yandex.net/5443067977",
- "5443068659": "https://devtools-registry.s3.yandex.net/5443068659",
+ "5476908047": "https://devtools-registry.s3.yandex.net/5476908047",
+ "5476891477": "https://devtools-registry.s3.yandex.net/5476891477",
+ "5476896849": "https://devtools-registry.s3.yandex.net/5476896849",
+ "5476895322": "https://devtools-registry.s3.yandex.net/5476895322",
+ "5476896707": "https://devtools-registry.s3.yandex.net/5476896707",
"2980468199": "https://devtools-registry.s3.yandex.net/2980468199"
},
"resources_descriptions": {
@@ -177,11 +177,11 @@
"5433628204": "yexport for darwin",
"5433626802": "yexport for darwin-arm64",
"5433622361": "yexport for linux",
- "5443071926": "ymake for darwin",
- "5443066470": "ymake for darwin-arm64",
- "5443068824": "ymake for linux",
- "5443067977": "ymake for linux-aarch64",
- "5443068659": "ymake.exe for win32-clang-cl",
+ "5476908047": "ymake for darwin",
+ "5476891477": "ymake for darwin-arm64",
+ "5476896849": "ymake for linux",
+ "5476895322": "ymake for linux-aarch64",
+ "5476896707": "ymake.exe for win32-clang-cl",
"2980468199": "ytexec for linux"
},
"tasks": {}
diff --git a/build/platform/lld/ya.make b/build/platform/lld/ya.make
index ddf3a9fb36..4c182c0784 100644
--- a/build/platform/lld/ya.make
+++ b/build/platform/lld/ya.make
@@ -2,11 +2,7 @@ RESOURCES_LIBRARY()
LICENSE(Service-Prebuilt-Tool)
-IF (CLANG16)
- DEFAULT(LLD_VERSION 16)
-ELSE()
- DEFAULT(LLD_VERSION 14)
-ENDIF()
+DEFAULT(LLD_VERSION ${CLANG_VER})
IF (LLD_VERSION == 14)
DECLARE_EXTERNAL_HOST_RESOURCES_BUNDLE_BY_JSON(LLD_ROOT lld14.json)
diff --git a/contrib/go/_std_1.21/src/internal/goexperiment/exp_coverageredesign_off.go b/contrib/go/_std_1.21/src/internal/goexperiment/exp_coverageredesign_off.go
new file mode 100644
index 0000000000..95d3a6c4ae
--- /dev/null
+++ b/contrib/go/_std_1.21/src/internal/goexperiment/exp_coverageredesign_off.go
@@ -0,0 +1,9 @@
+// Code generated by mkconsts.go. DO NOT EDIT.
+
+//go:build !goexperiment.coverageredesign
+// +build !goexperiment.coverageredesign
+
+package goexperiment
+
+const CoverageRedesign = false
+const CoverageRedesignInt = 0
diff --git a/contrib/go/_std_1.21/src/internal/goexperiment/exp_coverageredesign_on.go b/contrib/go/_std_1.21/src/internal/goexperiment/exp_coverageredesign_on.go
deleted file mode 100644
index 330a234f20..0000000000
--- a/contrib/go/_std_1.21/src/internal/goexperiment/exp_coverageredesign_on.go
+++ /dev/null
@@ -1,9 +0,0 @@
-// Code generated by mkconsts.go. DO NOT EDIT.
-
-//go:build goexperiment.coverageredesign
-// +build goexperiment.coverageredesign
-
-package goexperiment
-
-const CoverageRedesign = true
-const CoverageRedesignInt = 1
diff --git a/contrib/go/_std_1.21/src/internal/goexperiment/ya.make b/contrib/go/_std_1.21/src/internal/goexperiment/ya.make
index 8b59c0307f..b50720d13d 100644
--- a/contrib/go/_std_1.21/src/internal/goexperiment/ya.make
+++ b/contrib/go/_std_1.21/src/internal/goexperiment/ya.make
@@ -5,7 +5,7 @@ SRCS(
exp_boringcrypto_off.go
exp_cacheprog_off.go
exp_cgocheck2_off.go
- exp_coverageredesign_on.go
+ exp_coverageredesign_off.go
exp_fieldtrack_off.go
exp_heapminimum512kib_off.go
exp_loopvar_off.go
diff --git a/contrib/libs/cxxsupp/libcxx/include/__memory/shared_ptr.h b/contrib/libs/cxxsupp/libcxx/include/__memory/shared_ptr.h
index 3d73bfaafa..8b7256d038 100644
--- a/contrib/libs/cxxsupp/libcxx/include/__memory/shared_ptr.h
+++ b/contrib/libs/cxxsupp/libcxx/include/__memory/shared_ptr.h
@@ -35,7 +35,7 @@
#include <stdexcept>
#include <type_traits>
#include <typeinfo>
-#if !defined(_LIBCPP_HAS_NO_THREADS) // !defined(_LIBCPP_HAS_NO_ATOMIC_HEADER)
+#if !defined(_LIBCPP_HAS_NO_ATOMIC_HEADER)
# include <atomic>
#endif
diff --git a/contrib/libs/cxxsupp/libcxx/include/__random/log2.h b/contrib/libs/cxxsupp/libcxx/include/__random/log2.h
index f4f837764c..b077d211ce 100644
--- a/contrib/libs/cxxsupp/libcxx/include/__random/log2.h
+++ b/contrib/libs/cxxsupp/libcxx/include/__random/log2.h
@@ -11,7 +11,6 @@
#include <__config>
#include <cstddef>
-#include <limits>
#include <type_traits>
#if !defined(_LIBCPP_HAS_NO_PRAGMA_SYSTEM_HEADER)
diff --git a/contrib/libs/cxxsupp/libcxx/include/__support/win32/limits_msvc_win32.h b/contrib/libs/cxxsupp/libcxx/include/__support/win32/limits_msvc_win32.h
index 9f693d9fa4..87e4e7db66 100644
--- a/contrib/libs/cxxsupp/libcxx/include/__support/win32/limits_msvc_win32.h
+++ b/contrib/libs/cxxsupp/libcxx/include/__support/win32/limits_msvc_win32.h
@@ -20,6 +20,7 @@
#include <float.h> // limit constants
#include <limits.h> // CHAR_BIT
#include <math.h> // HUGE_VAL
+#include <ymath.h> // internal MSVC header providing the needed functionality
#define __CHAR_BIT__ CHAR_BIT
@@ -63,8 +64,8 @@
#define __LDBL_DENORM_MIN__ 3.64519953188247460253e-4951L
// __builtin replacements/workarounds
-#define __builtin_huge_vall() ((long double)__builtin_huge_val())
-#define __builtin_nanl(__dummy) ((long double)__builtin_nan(__dummy))
-#define __builtin_nansl(__dummy) ((long double)__builtin_nans(__dummy))
+#define __builtin_huge_vall() _LInf._Long_double
+#define __builtin_nanl(__dummmy) _LNan._Long_double
+#define __builtin_nansl(__dummy) _LSnan._Long_double
#endif // _LIBCPP_SUPPORT_WIN32_LIMITS_MSVC_WIN32_H
diff --git a/contrib/libs/cxxsupp/libcxx/include/__tuple b/contrib/libs/cxxsupp/libcxx/include/__tuple
index 38a22dbaaf..2b399f30e1 100644
--- a/contrib/libs/cxxsupp/libcxx/include/__tuple
+++ b/contrib/libs/cxxsupp/libcxx/include/__tuple
@@ -21,36 +21,36 @@
_LIBCPP_BEGIN_NAMESPACE_STD
-template <class _Tp> class _LIBCPP_TEMPLATE_VIS tuple_size;
+template <class _Tp> struct _LIBCPP_TEMPLATE_VIS tuple_size;
#if !defined(_LIBCPP_CXX03_LANG)
template <class _Tp, class...>
using __enable_if_tuple_size_imp = _Tp;
template <class _Tp>
-class _LIBCPP_TEMPLATE_VIS tuple_size<__enable_if_tuple_size_imp<
+struct _LIBCPP_TEMPLATE_VIS tuple_size<__enable_if_tuple_size_imp<
const _Tp,
typename enable_if<!is_volatile<_Tp>::value>::type,
integral_constant<size_t, sizeof(tuple_size<_Tp>)>>>
: public integral_constant<size_t, tuple_size<_Tp>::value> {};
template <class _Tp>
-class _LIBCPP_TEMPLATE_VIS tuple_size<__enable_if_tuple_size_imp<
+struct _LIBCPP_TEMPLATE_VIS tuple_size<__enable_if_tuple_size_imp<
volatile _Tp,
typename enable_if<!is_const<_Tp>::value>::type,
integral_constant<size_t, sizeof(tuple_size<_Tp>)>>>
: public integral_constant<size_t, tuple_size<_Tp>::value> {};
template <class _Tp>
-class _LIBCPP_TEMPLATE_VIS tuple_size<__enable_if_tuple_size_imp<
+struct _LIBCPP_TEMPLATE_VIS tuple_size<__enable_if_tuple_size_imp<
const volatile _Tp,
integral_constant<size_t, sizeof(tuple_size<_Tp>)>>>
: public integral_constant<size_t, tuple_size<_Tp>::value> {};
#else
-template <class _Tp> class _LIBCPP_TEMPLATE_VIS tuple_size<const _Tp> : public tuple_size<_Tp> {};
-template <class _Tp> class _LIBCPP_TEMPLATE_VIS tuple_size<volatile _Tp> : public tuple_size<_Tp> {};
-template <class _Tp> class _LIBCPP_TEMPLATE_VIS tuple_size<const volatile _Tp> : public tuple_size<_Tp> {};
+template <class _Tp> struct _LIBCPP_TEMPLATE_VIS tuple_size<const _Tp> : public tuple_size<_Tp> {};
+template <class _Tp> struct _LIBCPP_TEMPLATE_VIS tuple_size<volatile _Tp> : public tuple_size<_Tp> {};
+template <class _Tp> struct _LIBCPP_TEMPLATE_VIS tuple_size<const volatile _Tp> : public tuple_size<_Tp> {};
#endif
template <size_t _Ip, class _Tp> struct _LIBCPP_TEMPLATE_VIS tuple_element;
@@ -160,7 +160,7 @@ template <class ..._Tp> class _LIBCPP_TEMPLATE_VIS tuple;
template <class... _Tp> struct __tuple_like<tuple<_Tp...> > : true_type {};
template <class ..._Tp>
-class _LIBCPP_TEMPLATE_VIS tuple_size<tuple<_Tp...> >
+struct _LIBCPP_TEMPLATE_VIS tuple_size<tuple<_Tp...> >
: public integral_constant<size_t, sizeof...(_Tp)>
{
};
@@ -301,7 +301,7 @@ struct _LIBCPP_TEMPLATE_VIS tuple_element<_Ip, __tuple_types<_Types...> >
template <class ..._Tp>
-class _LIBCPP_TEMPLATE_VIS tuple_size<__tuple_types<_Tp...> >
+struct _LIBCPP_TEMPLATE_VIS tuple_size<__tuple_types<_Tp...> >
: public integral_constant<size_t, sizeof...(_Tp)>
{
};
diff --git a/contrib/libs/cxxsupp/libcxx/include/memory b/contrib/libs/cxxsupp/libcxx/include/memory
index a495caa898..53af96afa7 100644
--- a/contrib/libs/cxxsupp/libcxx/include/memory
+++ b/contrib/libs/cxxsupp/libcxx/include/memory
@@ -866,7 +866,6 @@ template<size_t N, class T>
#include <iosfwd>
#include <new>
#include <stdexcept>
-#include <stlfwd>
#include <tuple>
#include <type_traits>
#include <typeinfo>
diff --git a/contrib/libs/cxxsupp/libcxx/src/support/runtime/exception_fallback.ipp b/contrib/libs/cxxsupp/libcxx/src/support/runtime/exception_fallback.ipp
index ebdca7e5ee..100ee6da5e 100644
--- a/contrib/libs/cxxsupp/libcxx/src/support/runtime/exception_fallback.ipp
+++ b/contrib/libs/cxxsupp/libcxx/src/support/runtime/exception_fallback.ipp
@@ -8,7 +8,6 @@
//===----------------------------------------------------------------------===//
#include <cstdio>
-#include "../../include/atomic_support.h"
namespace std {
diff --git a/contrib/libs/cxxsupp/libcxx/src/support/runtime/exception_msvc.ipp b/contrib/libs/cxxsupp/libcxx/src/support/runtime/exception_msvc.ipp
index 3ee2d21b68..7e36c7068a 100644
--- a/contrib/libs/cxxsupp/libcxx/src/support/runtime/exception_msvc.ipp
+++ b/contrib/libs/cxxsupp/libcxx/src/support/runtime/exception_msvc.ipp
@@ -14,8 +14,6 @@
#include <stdio.h>
#include <stdlib.h>
-#include <__config>
-
extern "C" {
typedef void (__cdecl* terminate_handler)();
_LIBCPP_CRT_FUNC terminate_handler __cdecl set_terminate(
diff --git a/library/python/testing/swag/daemon.py b/library/python/testing/swag/daemon.py
index e59bedb5e7..81fd0eb97b 100644
--- a/library/python/testing/swag/daemon.py
+++ b/library/python/testing/swag/daemon.py
@@ -3,11 +3,12 @@
import logging
import os
-import sys
import signal
import tempfile
import shutil
+import six
+
try:
from . import gdb
except ValueError:
@@ -66,11 +67,7 @@ class Daemon(object):
self.stdinf = stdin or tempfile.NamedTemporaryFile(dir=self.cwd, prefix="stdin_", delete=False)
self.cmd = command
- if sys.version_info.major > 2:
- _basestring = str
- else:
- _basestring = basestring
- if isinstance(command, _basestring):
+ if isinstance(command, six.string_types):
self.cmd = [arg for arg in command.split() if arg]
self.daemon = None
self.name = os.path.basename(self.cmd[0])
diff --git a/library/python/testing/swag/lib/ya.make b/library/python/testing/swag/lib/ya.make
index e3e7cfa300..0567288a62 100644
--- a/library/python/testing/swag/lib/ya.make
+++ b/library/python/testing/swag/lib/ya.make
@@ -1,6 +1,7 @@
PY23_LIBRARY()
PEERDIR(
+ contrib/python/six
contrib/python/protobuf
library/python/testing/yatest_common
)
diff --git a/ydb/core/persqueue/ut/partition_chooser_ut.cpp b/ydb/core/persqueue/ut/partition_chooser_ut.cpp
index 0d4a4304e1..e43f99ad85 100644
--- a/ydb/core/persqueue/ut/partition_chooser_ut.cpp
+++ b/ydb/core/persqueue/ut/partition_chooser_ut.cpp
@@ -99,14 +99,6 @@ Y_UNIT_TEST(TBoundaryChooser_GetTabletIdTest) {
UNIT_ASSERT(!chooser.GetPartition(666));
}
-Y_UNIT_TEST(TBoundaryChooser_GetRandomPartitionTest) {
- auto config = CreateConfig(SMEnabled);
-
- NKikimr::NPQ::NPartitionChooser::TBoundaryChooser chooser(config);
- auto p = chooser.GetRandomPartition();
- UNIT_ASSERT(p->PartitionId != 3);
-}
-
Y_UNIT_TEST(THashChooserTest) {
auto config = CreateConfig(SMDisabled);
@@ -145,14 +137,6 @@ Y_UNIT_TEST(THashChooser_GetTabletIdTest) {
UNIT_ASSERT(!chooser.GetPartition(666));
}
-Y_UNIT_TEST(THashChooser_GetRandomPartitionTest) {
- auto config = CreateConfig(SMDisabled);
-
- NKikimr::NPQ::NPartitionChooser::THashChooser chooser(config);
- auto p = chooser.GetRandomPartition();
- UNIT_ASSERT(p->PartitionId != 3);
-}
-
struct TWriteSessionMock: public NActors::TActorBootstrapped<TWriteSessionMock> {
@@ -277,7 +261,6 @@ Y_UNIT_TEST(TPartitionChooserActor_SplitMergeEnabled_Test) {
Y_UNIT_TEST(TPartitionChooserActor_SplitMergeDisabled_Test) {
NPersQueue::TTestServer server{};
- server.CleverServer->GetRuntime()->GetAppData().PQConfig.SetRoundRobinPartitionMapping(false);
server.CleverServer->GetRuntime()->GetAppData().PQConfig.SetTopicsAreFirstClassCitizen(true);
server.CleverServer->GetRuntime()->GetAppData().PQConfig.SetUseSrcIdMetaMappingInFirstClass(true);
diff --git a/ydb/core/persqueue/writer/partition_chooser_impl.h b/ydb/core/persqueue/writer/partition_chooser_impl.h
index 4e954b02c4..c2f618d86e 100644
--- a/ydb/core/persqueue/writer/partition_chooser_impl.h
+++ b/ydb/core/persqueue/writer/partition_chooser_impl.h
@@ -58,7 +58,6 @@ public:
const TPartitionInfo* GetPartition(const TString& sourceId) const;
const TPartitionInfo* GetPartition(ui32 partitionId) const;
- const TPartitionInfo* GetRandomPartition() const;
private:
const TString TopicName;
@@ -79,7 +78,6 @@ public:
const TPartitionInfo* GetPartition(const TString& sourceId) const;
const TPartitionInfo* GetPartition(ui32 partitionId) const;
- const TPartitionInfo* GetRandomPartition() const;
private:
std::vector<TPartitionInfo> Partitions;
@@ -257,11 +255,6 @@ const typename TBoundaryChooser<THasher>::TPartitionInfo* TBoundaryChooser<THash
return it == Partitions.end() ? nullptr : it;
}
-template<class THasher>
-const typename TBoundaryChooser<THasher>::TPartitionInfo* TBoundaryChooser<THasher>::GetRandomPartition() const {
- return &Partitions[RandomNumber<size_t>(Partitions.size())];
-}
-
//
@@ -295,11 +288,6 @@ const typename THashChooser<THasher>::TPartitionInfo* THashChooser<THasher>::Get
return it->PartitionId == partitionId ? it : nullptr;
}
-template<class THasher>
-const typename THashChooser<THasher>::TPartitionInfo* THashChooser<THasher>::GetRandomPartition() const {
- return &Partitions[RandomNumber<size_t>(Partitions.size())];
-}
-
//
// TPartitionChooserActor
@@ -740,10 +728,8 @@ std::pair<bool, const typename TPartitionChooserActor<TChooser>::TPartitionInfo*
return {false, Chooser.GetPartition(PreferedPartition.value())};
} else if (pqConfig.GetTopicsAreFirstClassCitizen() && SourceId) {
return {false, Chooser.GetPartition(SourceId)};
- } else if (pqConfig.GetRoundRobinPartitionMapping()) {
- return {true, nullptr};
} else {
- return {false, Chooser.GetRandomPartition()};
+ return {true, nullptr};
}
}
diff --git a/ydb/core/protos/pqconfig.proto b/ydb/core/protos/pqconfig.proto
index 048686e11e..fddb43eec4 100644
--- a/ydb/core/protos/pqconfig.proto
+++ b/ydb/core/protos/pqconfig.proto
@@ -90,7 +90,7 @@ message TPQConfig {
optional uint32 RemoteClusterEnabledDelaySec = 24 [default = 300]; // 5 minutes
optional uint32 CloseClientSessionWithEnabledRemotePreferredClusterDelaySec = 25 [default = 300]; // 5 minutes
- optional bool RoundRobinPartitionMapping = 26 [default = true];
+ reserved 26; // optional bool RoundRobinPartitionMapping = 26 [default = true];
optional string Root = 27 [default = ""];
optional string TestDatabaseRoot = 47; // For unit-tests only
diff --git a/ydb/core/testlib/test_pq_client.h b/ydb/core/testlib/test_pq_client.h
index b159c0249e..7448bfd6ca 100644
--- a/ydb/core/testlib/test_pq_client.h
+++ b/ydb/core/testlib/test_pq_client.h
@@ -25,14 +25,13 @@ using namespace NKikimr::Tests;
const static ui32 PQ_DEFAULT_NODE_COUNT = 2;
-inline Tests::TServerSettings PQSettings(ui16 port = 0, ui32 nodesCount = PQ_DEFAULT_NODE_COUNT, bool roundrobin = true, const TString& yql_timeout = "10", const THolder<TTempFileHandle>& netDataFile = nullptr) {
+inline Tests::TServerSettings PQSettings(ui16 port = 0, ui32 nodesCount = PQ_DEFAULT_NODE_COUNT, const TString& yql_timeout = "10", const THolder<TTempFileHandle>& netDataFile = nullptr) {
NKikimrPQ::TPQConfig pqConfig;
NKikimrProto::TAuthConfig authConfig;
authConfig.SetUseBlackBox(false);
authConfig.SetUseAccessService(false);
authConfig.SetUseAccessServiceTLS(false);
authConfig.SetUseStaff(false);
- pqConfig.SetRoundRobinPartitionMapping(roundrobin);
pqConfig.SetEnabled(true);
pqConfig.SetMaxReadCookies(10);
@@ -72,6 +71,13 @@ inline Tests::TServerSettings PQSettings(ui16 port = 0, ui32 nodesCount = PQ_DEF
return settings;
}
+// deprecated.
+inline Tests::TServerSettings PQSettings(ui16 port, ui32 nodesCount, bool roundrobin, const TString& yql_timeout = "10", const THolder<TTempFileHandle>& netDataFile = nullptr) {
+ Y_UNUSED(roundrobin);
+
+ return PQSettings(port, nodesCount, yql_timeout, netDataFile);
+}
+
const TString TopicPrefix = "/Root/PQ/";
const static TString DEFAULT_SRC_IDS_PATH = "/Root/PQ/SourceIdMeta2";
diff --git a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
index 1c02e8c202..e370a9b9a4 100644
--- a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
@@ -693,7 +693,6 @@ Y_UNIT_TEST_SUITE(Cdc) {
NKikimrPQ::TPQConfig pqConfig;
pqConfig.SetEnabled(true);
pqConfig.SetEnableProtoSourceIdInfo(true);
- pqConfig.SetRoundRobinPartitionMapping(true);
pqConfig.SetTopicsAreFirstClassCitizen(true);
pqConfig.SetMaxReadCookies(10);
pqConfig.AddClientServiceType()->SetName("data-streams");
diff --git a/ydb/public/lib/ydb_cli/common/pretty_table.cpp b/ydb/public/lib/ydb_cli/common/pretty_table.cpp
index 6af8836d5b..0dc07572ee 100644
--- a/ydb/public/lib/ydb_cli/common/pretty_table.cpp
+++ b/ydb/public/lib/ydb_cli/common/pretty_table.cpp
@@ -61,28 +61,31 @@ bool TPrettyTable::TRow::PrintColumns(IOutputStream& o, const TVector<size_t>& w
o << " │ ";
}
- if (const size_t width = widths.at(columnIndex)) {
+ if (size_t width = widths.at(columnIndex)) {
const auto& column = Columns.at(columnIndex);
TStringBuf data;
+ size_t extraBytes;
size_t l = 0;
for (const auto& line : column) {
data = line;
+ extraBytes = ExtraBytes(data);
while (data && l < lineNumber) {
- data.Skip(width);
+ data.Skip(width + extraBytes);
++l;
}
}
-
- size_t extraBytes = ExtraBytes(data);
+ extraBytes = ExtraBytes(data);
+ width += extraBytes;
+
if (data) {
- o << RightPad(data.SubStr(0, width + extraBytes), width + extraBytes);
+ o << RightPad(data.SubStr(0, width), width);
} else {
- o << RightPad(' ', width + extraBytes);
+ o << RightPad(' ', width);
}
- if (data.size() > width + extraBytes) {
+ if (data.size() > width) {
next = true;
}
}
diff --git a/ydb/services/persqueue_v1/persqueue_ut.cpp b/ydb/services/persqueue_v1/persqueue_ut.cpp
index 874dea2e62..7d9b8f60ea 100644
--- a/ydb/services/persqueue_v1/persqueue_ut.cpp
+++ b/ydb/services/persqueue_v1/persqueue_ut.cpp
@@ -1509,8 +1509,8 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
UNIT_ASSERT_C(resp.server_message_case() == Ydb::Topic::StreamReadMessage::FromServer::kReadResponse, resp);
}
- void SetupWriteSessionImpl(bool rr) {
- NPersQueue::TTestServer server{PQSettings(0, 2, rr), false};
+ Y_UNIT_TEST(SetupWriteSession) {
+ NPersQueue::TTestServer server{PQSettings(0, 2), false};
server.ServerSettings.SetEnableSystemViews(false);
server.StartServer();
@@ -1550,17 +1550,10 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
for (ui32 i = 0; i < 15*5; ++i) {
ss[writer.InitSession("sid_rand_" + ToString<ui32>(i), 0, true)]++;
}
- if (rr) {
- for (auto& s : ss) {
- Cerr << "Round robin check: " << s.first << ":" << s.second << "\n";
- UNIT_ASSERT(s.second >= 4 && s.second <= 6);
- }
+ for (auto& s : ss) {
+ Cerr << "Round robin check: " << s.first << ":" << s.second << "\n";
+ UNIT_ASSERT(s.second >= 4 && s.second <= 6);
}
- }
-
- Y_UNIT_TEST(SetupWriteSession) {
- SetupWriteSessionImpl(false);
- SetupWriteSessionImpl(true);
}
Y_UNIT_TEST(StoreNoMoreThanXSourceIDs) {
@@ -3158,7 +3151,7 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
UNIT_ASSERT(equal);
};
- NPersQueue::TTestServer server(PQSettings(0, 1, true, "10"), false);
+ NPersQueue::TTestServer server(PQSettings(0, 1, "10"), false);
auto netDataUpdated = server.PrepareNetDataFile(FormNetData());
UNIT_ASSERT(netDataUpdated);
server.StartServer();
diff --git a/yt/yt/client/api/rpc_proxy/connection_impl.cpp b/yt/yt/client/api/rpc_proxy/connection_impl.cpp
index aa5d4e1080..179cee1b2a 100644
--- a/yt/yt/client/api/rpc_proxy/connection_impl.cpp
+++ b/yt/yt/client/api/rpc_proxy/connection_impl.cpp
@@ -70,10 +70,7 @@ void ApplyProxyUrlAliasingRules(TString& url, const std::optional<THashMap<TStri
{
static const auto rulesFromEnv = ParseProxyUrlAliasingRules(GetEnv("YT_PROXY_URL_ALIASING_CONFIG"));
- const THashMap<TString, TString>& rules =
- proxyUrlAliasingRules
- ? proxyUrlAliasingRules.value()
- : rulesFromEnv;
+ const auto& rules = proxyUrlAliasingRules.value_or(rulesFromEnv);
if (auto ruleIt = rules.find(url); ruleIt != rules.end()) {
url = ruleIt->second;
@@ -82,7 +79,6 @@ void ApplyProxyUrlAliasingRules(TString& url, const std::optional<THashMap<TStri
TString NormalizeHttpProxyUrl(TString url, const std::optional<THashMap<TString, TString>>& proxyUrlAliasingRules)
{
-
ApplyProxyUrlAliasingRules(url, proxyUrlAliasingRules);
if (url.find('.') == TString::npos &&
diff --git a/yt/yt/core/rpc/bus/channel.cpp b/yt/yt/core/rpc/bus/channel.cpp
index ec8c6fd04d..6c53708a72 100644
--- a/yt/yt/core/rpc/bus/channel.cpp
+++ b/yt/yt/core/rpc/bus/channel.cpp
@@ -129,6 +129,20 @@ public:
Terminated_.Unsubscribe(callback);
}
+ int GetInflightRequestCount() override
+ {
+ int requestCount = 0;
+
+ for (auto& bucket : Buckets_) {
+ auto guard = ReaderGuard(bucket.Lock);
+ for (const auto& session : bucket.Sessions) {
+ requestCount += session->GetInflightRequestCount();
+ }
+ }
+
+ return requestCount;
+ }
+
private:
class TSession;
using TSessionPtr = TIntrusivePtr<TSession>;
@@ -260,7 +274,6 @@ private:
private:
const TWeakPtr<TSession> Session_;
-
};
//! Directs requests sent via a channel to go through its underlying bus.
@@ -294,7 +307,7 @@ private:
// Mark the channel as terminated to disallow any further usage.
for (auto& bucket : RequestBuckets_) {
- auto guard = Guard(bucket.Lock);
+ auto guard = Guard(bucket);
bucket.Terminated = true;
@@ -385,7 +398,7 @@ private:
IClientResponseHandlerPtr responseHandler;
{
- auto guard = Guard(bucket->Lock);
+ auto guard = Guard(*bucket);
auto it = bucket->ActiveRequestMap.find(requestId);
if (it == bucket->ActiveRequestMap.end()) {
@@ -502,7 +515,7 @@ private:
IClientResponseHandlerPtr responseHandler;
{
- auto guard = Guard(bucket->Lock);
+ auto guard = Guard(*bucket);
if (!requestControl->IsActive(guard)) {
return;
@@ -542,7 +555,7 @@ private:
IClientResponseHandlerPtr responseHandler;
{
- auto guard = Guard(bucket->Lock);
+ auto guard = Guard(*bucket);
if (!requestControl->IsActive(guard)) {
return;
@@ -600,18 +613,48 @@ private:
}
}
+ int GetInflightRequestCount()
+ {
+ int requestCount = 0;
+
+ for (auto& bucket : RequestBuckets_) {
+ requestCount += bucket.ActiveRequestCount.load(
+ std::memory_order::relaxed);
+ }
+
+ return requestCount;
+ }
+
private:
const TTosLevel TosLevel_;
IBusPtr Bus_;
std::atomic<bool> BusReady_ = false;
- struct TBucket
+ class TBucket
{
- YT_DECLARE_SPIN_LOCK(NThreading::TSpinLock, Lock);
+ public:
IBusPtr Bus;
bool Terminated = false;
THashMap<TRequestId, TClientRequestControlPtr> ActiveRequestMap;
+ std::atomic<int> ActiveRequestCount = 0;
+
+ void Acquire() noexcept
+ {
+ Lock.Acquire();
+ }
+
+ void Release() noexcept
+ {
+ if (std::ssize(ActiveRequestMap) != ActiveRequestCount.load(std::memory_order::relaxed)) {
+ ActiveRequestCount.store(std::ssize(ActiveRequestMap), std::memory_order::relaxed);
+ }
+
+ Lock.Release();
+ }
+
+ private:
+ YT_DECLARE_SPIN_LOCK(NThreading::TSpinLock, Lock);
};
static constexpr size_t BucketCount = 64;
@@ -687,7 +730,7 @@ private:
VERIFY_THREAD_AFFINITY_ANY();
auto* bucket = GetBucketForRequest(requestId);
- auto guard = Guard(bucket->Lock);
+ auto guard = Guard(*bucket);
auto it = bucket->ActiveRequestMap.find(requestId);
if (it == bucket->ActiveRequestMap.end()) {
@@ -719,7 +762,7 @@ private:
TClientRequestControlPtr existingRequestControl;
IClientResponseHandlerPtr existingResponseHandler;
{
- auto guard = Guard(bucket->Lock);
+ auto guard = Guard(*bucket);
if (!requestControl->IsActive(guard)) {
return;
@@ -826,7 +869,7 @@ private:
TClientRequestControlPtr requestControl;
IClientResponseHandlerPtr responseHandler;
{
- auto guard = Guard(bucket->Lock);
+ auto guard = Guard(*bucket);
if (bucket->Terminated) {
YT_LOG_WARNING("Response received via a terminated channel (RequestId: %v)",
@@ -972,7 +1015,7 @@ private:
TClientRequestControlPtr requestControl;
IClientResponseHandlerPtr responseHandler;
{
- auto guard = Guard(bucket->Lock);
+ auto guard = Guard(*bucket);
auto it = bucket->ActiveRequestMap.find(requestId);
if (it == bucket->ActiveRequestMap.end()) {
@@ -1120,7 +1163,8 @@ private:
return TraceContext_.MakeTraceContextGuard();
}
- bool IsActive(const TGuard<NThreading::TSpinLock>&) const
+ template <typename TLock>
+ bool IsActive(const TGuard<TLock>&) const
{
return static_cast<bool>(ResponseHandler_);
}
@@ -1142,12 +1186,14 @@ private:
TDelayedExecutor::CancelAndClear(AcknowledgementTimeoutCookie_);
}
- IClientResponseHandlerPtr GetResponseHandler(const TGuard<NThreading::TSpinLock>&)
+ template <typename TLock>
+ IClientResponseHandlerPtr GetResponseHandler(const TGuard<TLock>&)
{
return ResponseHandler_;
}
- IClientResponseHandlerPtr Finalize(const TGuard<NThreading::TSpinLock>&)
+ template <typename TLock>
+ IClientResponseHandlerPtr Finalize(const TGuard<TLock>&)
{
TotalTime_ = ProfileComplete();
TDelayedExecutor::CancelAndClear(TimeoutCookie_);
diff --git a/yt/yt/core/rpc/channel.h b/yt/yt/core/rpc/channel.h
index 9ae2a664ee..8248a938d5 100644
--- a/yt/yt/core/rpc/channel.h
+++ b/yt/yt/core/rpc/channel.h
@@ -121,6 +121,8 @@ struct IChannel
//! Raised whenever the channel is terminated.
DECLARE_INTERFACE_SIGNAL(void(const TError&), Terminated);
+
+ virtual int GetInflightRequestCount() = 0;
};
DEFINE_REFCOUNTED_TYPE(IChannel)
diff --git a/yt/yt/core/rpc/channel_detail.cpp b/yt/yt/core/rpc/channel_detail.cpp
index 295457f947..181a779f48 100644
--- a/yt/yt/core/rpc/channel_detail.cpp
+++ b/yt/yt/core/rpc/channel_detail.cpp
@@ -57,6 +57,11 @@ void TChannelWrapper::UnsubscribeTerminated(const TCallback<void(const TError&)>
UnderlyingChannel_->UnsubscribeTerminated(callback);
}
+int TChannelWrapper::GetInflightRequestCount()
+{
+ return UnderlyingChannel_->GetInflightRequestCount();
+}
+
////////////////////////////////////////////////////////////////////////////////
void TClientRequestControlThunk::SetUnderlying(IClientRequestControlPtr underlying)
diff --git a/yt/yt/core/rpc/channel_detail.h b/yt/yt/core/rpc/channel_detail.h
index d23d2346b9..a3ca87a5d4 100644
--- a/yt/yt/core/rpc/channel_detail.h
+++ b/yt/yt/core/rpc/channel_detail.h
@@ -27,6 +27,8 @@ public:
void SubscribeTerminated(const TCallback<void(const TError&)>& callback) override;
void UnsubscribeTerminated(const TCallback<void(const TError&)>& callback) override;
+ int GetInflightRequestCount() override;
+
protected:
const IChannelPtr UnderlyingChannel_;
};
diff --git a/yt/yt/core/rpc/config.cpp b/yt/yt/core/rpc/config.cpp
index bb1519dafe..594b83636a 100644
--- a/yt/yt/core/rpc/config.cpp
+++ b/yt/yt/core/rpc/config.cpp
@@ -143,6 +143,9 @@ void TViablePeerRegistryConfig::Register(TRegistrar registrar)
.GreaterThanOrEqual(0)
.Default(0);
+ registrar.Parameter("enable_power_of_two_choices_strategy", &TThis::EnablePowerOfTwoChoicesStrategy)
+ .Default(false);
+
registrar.Postprocessor([] (TThis* config) {
if (config->MinPeerCountForPriorityAwareness > config->MaxPeerCount) {
THROW_ERROR_EXCEPTION(
diff --git a/yt/yt/core/rpc/config.h b/yt/yt/core/rpc/config.h
index 6659daf9e2..709101f7c9 100644
--- a/yt/yt/core/rpc/config.h
+++ b/yt/yt/core/rpc/config.h
@@ -229,6 +229,8 @@ public:
//! If you want to set bigger values, you must also increase MaxPeerCount to accommodate more peers.
int MinPeerCountForPriorityAwareness;
+ bool EnablePowerOfTwoChoicesStrategy;
+
REGISTER_YSON_STRUCT(TViablePeerRegistryConfig);
static void Register(TRegistrar registrar);
diff --git a/yt/yt/core/rpc/grpc/channel.cpp b/yt/yt/core/rpc/grpc/channel.cpp
index bfbc002505..1d09a0d249 100644
--- a/yt/yt/core/rpc/grpc/channel.cpp
+++ b/yt/yt/core/rpc/grpc/channel.cpp
@@ -209,6 +209,11 @@ public:
return EndpointAddress_;
}
+ int GetInflightRequestCount() override
+ {
+ YT_UNIMPLEMENTED();
+ }
+
private:
const TChannelConfigPtr Config_;
const TString EndpointAddress_;
diff --git a/yt/yt/core/rpc/hedging_channel.cpp b/yt/yt/core/rpc/hedging_channel.cpp
index a0e379af62..b97c089b49 100644
--- a/yt/yt/core/rpc/hedging_channel.cpp
+++ b/yt/yt/core/rpc/hedging_channel.cpp
@@ -387,6 +387,11 @@ public:
BackupChannel_->UnsubscribeTerminated(callback);
}
+ int GetInflightRequestCount() override
+ {
+ YT_UNIMPLEMENTED();
+ }
+
private:
const IChannelPtr PrimaryChannel_;
const IChannelPtr BackupChannel_;
diff --git a/yt/yt/core/rpc/local_channel.cpp b/yt/yt/core/rpc/local_channel.cpp
index 27a41b8976..948b3793a7 100644
--- a/yt/yt/core/rpc/local_channel.cpp
+++ b/yt/yt/core/rpc/local_channel.cpp
@@ -128,6 +128,11 @@ public:
Terminated_.Unsubscribe(callback);
}
+ int GetInflightRequestCount() override
+ {
+ return 0;
+ }
+
private:
class TSession;
using TSessionPtr = TIntrusivePtr<TSession>;
diff --git a/yt/yt/core/rpc/null_channel.cpp b/yt/yt/core/rpc/null_channel.cpp
index 73b12055dd..46f0c82520 100644
--- a/yt/yt/core/rpc/null_channel.cpp
+++ b/yt/yt/core/rpc/null_channel.cpp
@@ -43,6 +43,11 @@ public:
DEFINE_SIGNAL_OVERRIDE(void(const TError&), Terminated);
+ int GetInflightRequestCount() override
+ {
+ return 0;
+ }
+
private:
const TString Address_;
};
diff --git a/yt/yt/core/rpc/roaming_channel.cpp b/yt/yt/core/rpc/roaming_channel.cpp
index ef3c0f9297..a1b531b586 100644
--- a/yt/yt/core/rpc/roaming_channel.cpp
+++ b/yt/yt/core/rpc/roaming_channel.cpp
@@ -177,9 +177,13 @@ public:
void UnsubscribeTerminated(const TCallback<void(const TError&)>& /*callback*/) override
{ }
+ int GetInflightRequestCount() override
+ {
+ return 0;
+ }
+
private:
const IRoamingChannelProviderPtr Provider_;
-
};
IChannelPtr CreateRoamingChannel(IRoamingChannelProviderPtr provider)
diff --git a/yt/yt/core/rpc/unittests/viable_peer_registry_ut.cpp b/yt/yt/core/rpc/unittests/viable_peer_registry_ut.cpp
index fada327808..20055b71e9 100644
--- a/yt/yt/core/rpc/unittests/viable_peer_registry_ut.cpp
+++ b/yt/yt/core/rpc/unittests/viable_peer_registry_ut.cpp
@@ -124,6 +124,11 @@ public:
}
}
+ int GetInflightRequestCount() override
+ {
+ return 0;
+ }
+
DEFINE_SIGNAL_OVERRIDE(void(const TError&), Terminated);
private:
TString Address_;
diff --git a/yt/yt/core/rpc/viable_peer_registry.cpp b/yt/yt/core/rpc/viable_peer_registry.cpp
index b96b81a70f..5e9e0e5d22 100644
--- a/yt/yt/core/rpc/viable_peer_registry.cpp
+++ b/yt/yt/core/rpc/viable_peer_registry.cpp
@@ -248,6 +248,28 @@ public:
return peers;
}
+ IChannelPtr PickChannelFromTwoRandom(const IClientRequestPtr& request) const
+ {
+ auto peers = PickRandomPeers(/*peerCount*/ 2);
+ const auto& channelOne = peers.front();
+ const auto& channelTwo = peers.back();
+
+ auto getLoad = [] (const auto& channel) {
+ return channel.second->GetInflightRequestCount();
+ };
+
+ const auto& theWinner = getLoad(channelOne) < getLoad(channelTwo) ? channelOne : channelTwo;
+
+ YT_LOG_DEBUG(
+ "Selected a peer via the power of two choices strategy (RequestId: %v, Peer1: %v, Peer2: %v, Winner: %v)",
+ request ? request->GetRequestId() : TRequestId(),
+ channelOne.first,
+ channelTwo.first,
+ theWinner.first);
+
+ return theWinner.second;
+ }
+
IChannelPtr PickRandomChannel(
const IClientRequestPtr& request,
const std::optional<THedgingChannelOptions>& hedgingOptions) const override
@@ -273,6 +295,8 @@ public:
request ? request->GetRequestId() : TRequestId(),
primaryPeer.first,
backupPeer.first);
+ } else if (Config_->EnablePowerOfTwoChoicesStrategy && ActivePeerToPriority_.Size() >= 2) {
+ return PickChannelFromTwoRandom(request);
} else {
auto peer = PickRandomPeers()[0];
channel = peer.second;