aboutsummaryrefslogtreecommitdiffstats
path: root/yt
diff options
context:
space:
mode:
authorAlexander Smirnov <alex@ydb.tech>2025-02-13 18:38:05 +0000
committerAlexander Smirnov <alex@ydb.tech>2025-02-13 18:38:05 +0000
commit28180f60aec6dcb2b662b6417c90226553ebe2dc (patch)
tree9ca4d2b0ea989b075f60d2746159e891c1aa77f7 /yt
parent09744cf9fbdd1cd31f648b5fabc8a9ed09875e3b (diff)
parent36161988ade9e56ec69a44ba4ff084ede6e44ee7 (diff)
downloadydb-28180f60aec6dcb2b662b6417c90226553ebe2dc.tar.gz
Merge pull request #14512 from ydb-platform/merge-libs-250213-0050
Diffstat (limited to 'yt')
-rw-r--r--yt/cpp/mapreduce/client/operation.cpp6
-rw-r--r--yt/cpp/mapreduce/interface/operation.h6
-rw-r--r--yt/yql/providers/yt/lib/mkql_helpers/mkql_helpers.cpp6
-rw-r--r--yt/yql/providers/yt/lib/res_pull/table_limiter.cpp7
-rw-r--r--yt/yql/providers/yt/provider/yql_yt_dq_hybrid.cpp42
-rw-r--r--yt/yql/providers/yt/provider/yql_yt_helpers.cpp25
-rw-r--r--yt/yql/providers/yt/provider/yql_yt_optimize.cpp4
-rw-r--r--yt/yql/tests/sql/suites/limit/dynamic_limit_offset_overflow.sql14
-rw-r--r--yt/yql/tests/sql/suites/select/sample_limit_recordindex.cfg2
-rw-r--r--yt/yql/tests/sql/suites/select/sample_limit_recordindex.sql12
-rw-r--r--yt/yt/client/kafka/requests.cpp4
-rw-r--r--yt/yt/client/kafka/requests.h8
-rw-r--r--yt/yt/core/bus/client.h5
-rw-r--r--yt/yt/core/bus/server.h8
-rw-r--r--yt/yt/core/bus/tcp/client.cpp10
-rw-r--r--yt/yt/core/bus/tcp/config.cpp14
-rw-r--r--yt/yt/core/bus/tcp/config.h41
-rw-r--r--yt/yt/core/bus/tcp/connection.cpp16
-rw-r--r--yt/yt/core/bus/tcp/connection.h4
-rw-r--r--yt/yt/core/bus/tcp/public.h6
-rw-r--r--yt/yt/core/bus/tcp/server.cpp34
-rw-r--r--yt/yt/core/concurrency/arcadia_interop-inl.h68
-rw-r--r--yt/yt/core/concurrency/arcadia_interop.h23
-rw-r--r--yt/yt/core/concurrency/unittests/arcadia_interop_ut.cpp137
-rw-r--r--yt/yt/core/concurrency/unittests/ya.make2
25 files changed, 473 insertions, 31 deletions
diff --git a/yt/cpp/mapreduce/client/operation.cpp b/yt/cpp/mapreduce/client/operation.cpp
index c8cbe5b644..76a55b436f 100644
--- a/yt/cpp/mapreduce/client/operation.cpp
+++ b/yt/cpp/mapreduce/client/operation.cpp
@@ -879,6 +879,12 @@ void BuildCommonOperationPart(
if (baseSpec.MaxFailedJobCount_.Defined()) {
(*specNode)["max_failed_job_count"] = *baseSpec.MaxFailedJobCount_;
}
+ if (baseSpec.Description_.Defined()) {
+ (*specNode)["description"] = *baseSpec.Description_;
+ }
+ if (baseSpec.Annotations_.Defined()) {
+ (*specNode)["annotations"] = *baseSpec.Annotations_;
+ }
}
template <typename TSpec>
diff --git a/yt/cpp/mapreduce/interface/operation.h b/yt/cpp/mapreduce/interface/operation.h
index 218ead0572..c1beb5375d 100644
--- a/yt/cpp/mapreduce/interface/operation.h
+++ b/yt/cpp/mapreduce/interface/operation.h
@@ -542,6 +542,12 @@ struct TOperationSpecBase
/// How many jobs can fail before operation is failed.
FLUENT_FIELD_OPTION(ui64, MaxFailedJobCount);
+
+ // Arbitrary structured information related to the operation.
+ FLUENT_FIELD_OPTION(TNode, Annotations);
+
+ // Similar to Annotations, shown on the operation page. Recommends concise, human-readable entries to prevent clutter.
+ FLUENT_FIELD_OPTION(TNode, Description);
};
///
diff --git a/yt/yql/providers/yt/lib/mkql_helpers/mkql_helpers.cpp b/yt/yql/providers/yt/lib/mkql_helpers/mkql_helpers.cpp
index bc307c3178..7916821ca4 100644
--- a/yt/yql/providers/yt/lib/mkql_helpers/mkql_helpers.cpp
+++ b/yt/yql/providers/yt/lib/mkql_helpers/mkql_helpers.cpp
@@ -69,7 +69,11 @@ void TRecordsRange::Fill(const TExprNode& settingsNode) {
if (settingName != TStringBuf("take") && settingName != TStringBuf("skip")) {
continue;
}
- YQL_ENSURE(setting->Child(1)->IsCallable("Uint64"));
+ if (!setting->Child(1)->IsCallable("Uint64")) {
+ Offset.Clear();
+ Limit.Clear();
+ return;
+ }
if (!UpdateRecordsRange(*this, settingName, NYql::FromString<ui64>(*setting->Child(1)->Child(0), NUdf::EDataSlot::Uint64))) {
break;
}
diff --git a/yt/yql/providers/yt/lib/res_pull/table_limiter.cpp b/yt/yql/providers/yt/lib/res_pull/table_limiter.cpp
index 98731da58f..ddd4781ed6 100644
--- a/yt/yql/providers/yt/lib/res_pull/table_limiter.cpp
+++ b/yt/yql/providers/yt/lib/res_pull/table_limiter.cpp
@@ -8,11 +8,16 @@ namespace NYql {
TTableLimiter::TTableLimiter(const TRecordsRange& range)
: Start(range.Offset.GetOrElse(0ULL))
- , End(range.Limit.Defined() ? Start + *range.Limit : Max())
, Current(0ULL)
, TableStart(0ULL)
, TableEnd(Max())
{
+ const auto limit = range.Limit.GetOrElse(Max());
+ if (limit > Max<ui64>() - Start) {
+ End = Max();
+ } else {
+ End = Start + limit;
+ }
}
bool TTableLimiter::NextTable(ui64 recordCount) {
diff --git a/yt/yql/providers/yt/provider/yql_yt_dq_hybrid.cpp b/yt/yql/providers/yt/provider/yql_yt_dq_hybrid.cpp
index 0c8c9fdf6c..b3233e37f1 100644
--- a/yt/yql/providers/yt/provider/yql_yt_dq_hybrid.cpp
+++ b/yt/yql/providers/yt/provider/yql_yt_dq_hybrid.cpp
@@ -279,31 +279,41 @@ private:
.Build()
.Done();
- TExprNode::TPtr limit;
- if (const auto& limitNode = NYql::GetSetting(sort.Settings().Ref(), EYtSettingType::Limit)) {
- limit = GetLimitExpr(limitNode, ctx);
- }
-
+ TExprNode::TPtr work;
auto [direct, selector] = GetOutputSortSettings(sort, ctx);
- auto work = direct && selector ?
- limit ?
- Build<TCoTopSort>(ctx, sort.Pos())
+ if (direct && selector) {
+ // Don't use runtime limit for TopSort - it may have max<ui64>() value, which cause TopSort to fail
+ TMaybe<ui64> limit = GetLimit(sort.Settings().Ref());
+ work = limit
+ ? Build<TCoTopSort>(ctx, sort.Pos())
.Input(input)
- .Count(std::move(limit))
+ .Count<TCoUint64>()
+ .Literal()
+ .Value(ToString(*limit), TNodeFlags::Default)
+ .Build()
+ .Build()
.SortDirections(std::move(direct))
.KeySelectorLambda(std::move(selector))
- .Done().Ptr():
- Build<TCoSort>(ctx, sort.Pos())
+ .Done().Ptr()
+ : Build<TCoSort>(ctx, sort.Pos())
.Input(input)
.SortDirections(std::move(direct))
.KeySelectorLambda(std::move(selector))
- .Done().Ptr():
- limit ?
- Build<TCoTake>(ctx, sort.Pos())
+ .Done().Ptr()
+ ;
+ } else {
+ TExprNode::TPtr limit;
+ if (const auto& limitNode = NYql::GetSetting(sort.Settings().Ref(), EYtSettingType::Limit)) {
+ limit = GetLimitExpr(limitNode, ctx);
+ }
+
+ work = limit
+ ? Build<TCoTake>(ctx, sort.Pos())
.Input(input)
.Count(std::move(limit))
- .Done().Ptr():
- input.Ptr();
+ .Done().Ptr()
+ : input.Ptr();
+ }
auto settings = NYql::AddSetting(sort.Settings().Ref(), EYtSettingType::NoDq, {}, ctx);
auto operation = ctx.ChangeChild(sort.Ref(), TYtTransientOpBase::idx_Settings, std::move(settings));
diff --git a/yt/yql/providers/yt/provider/yql_yt_helpers.cpp b/yt/yql/providers/yt/provider/yql_yt_helpers.cpp
index 97b9fdeda9..d8b1f20823 100644
--- a/yt/yql/providers/yt/provider/yql_yt_helpers.cpp
+++ b/yt/yql/providers/yt/provider/yql_yt_helpers.cpp
@@ -905,7 +905,30 @@ TExprNode::TPtr GetLimitExpr(const TExprNode::TPtr& limitSetting, TExprContext&
}
if (skip) {
- limitValues.push_back(ctx.NewCallable(child->Pos(), "+", { take, skip }));
+ auto uintMax = ctx.Builder(child->Pos())
+ .Callable("Uint64")
+ .Atom(0, ToString(Max<ui64>()), TNodeFlags::Default)
+ .Seal()
+ .Build();
+ limitValues.push_back(
+ ctx.Builder(child->Pos())
+ .Callable("If")
+ .Callable(0, ">")
+ .Add(0, take)
+ .Callable(1, "-")
+ .Add(0, uintMax)
+ .Add(1, skip)
+ .Seal()
+ .Seal()
+ .Add(1, uintMax)
+ .Callable(2, "+")
+ .Add(0, take)
+ .Add(1, skip)
+ .Seal()
+ .Seal()
+ .Build()
+ );
+
} else {
limitValues.push_back(take);
}
diff --git a/yt/yql/providers/yt/provider/yql_yt_optimize.cpp b/yt/yql/providers/yt/provider/yql_yt_optimize.cpp
index 0137b343eb..cd0b5e2ade 100644
--- a/yt/yql/providers/yt/provider/yql_yt_optimize.cpp
+++ b/yt/yql/providers/yt/provider/yql_yt_optimize.cpp
@@ -73,7 +73,7 @@ TMaybeNode<TYtSection> MaterializeSectionIfRequired(TExprBase world, TYtSection
.Paths()
.Add(path)
.Build()
- .Settings(NYql::RemoveSetting(section.Settings().Ref(), EYtSettingType::Sample, ctx))
+ .Settings(NYql::RemoveSettings(section.Settings().Ref(), EYtSettingType::Sample | EYtSettingType::SysColumns, ctx))
.Done();
}
@@ -89,7 +89,7 @@ TMaybeNode<TYtSection> UpdateSectionWithRange(TExprBase world, TYtSection sectio
TVector<TYtPath> skippedPaths;
if (auto limiter = TTableLimiter(range)) {
if (auto materialized = MaterializeSectionIfRequired(world, section, dataSink, outRowSpec, keepSortness,
- {NYql::KeepOnlySettings(section.Settings().Ref(), EYtSettingType::Take | EYtSettingType::Skip | EYtSettingType::SysColumns, ctx)}, state, ctx))
+ {NYql::KeepOnlySettings(section.Settings().Ref(), EYtSettingType::Take | EYtSettingType::Skip, ctx)}, state, ctx))
{
if (!allowMaterialize || state->Types->EvaluationInProgress) {
// Keep section as is
diff --git a/yt/yql/tests/sql/suites/limit/dynamic_limit_offset_overflow.sql b/yt/yql/tests/sql/suites/limit/dynamic_limit_offset_overflow.sql
new file mode 100644
index 0000000000..5452aceb1c
--- /dev/null
+++ b/yt/yql/tests/sql/suites/limit/dynamic_limit_offset_overflow.sql
@@ -0,0 +1,14 @@
+-- YQL-19579
+-- Check that offset + limit don't overflow max uin64
+use plato;
+
+$limit = -1;
+$offset = 2;
+$limit = if($limit >= 0, cast($limit as uint64));
+$offset = if($offset >= 0, cast($offset as uint64));
+
+$i = select distinct key from Input;
+
+select * from $i order by key
+limit $limit offset $offset;
+
diff --git a/yt/yql/tests/sql/suites/select/sample_limit_recordindex.cfg b/yt/yql/tests/sql/suites/select/sample_limit_recordindex.cfg
new file mode 100644
index 0000000000..6c06cba116
--- /dev/null
+++ b/yt/yql/tests/sql/suites/select/sample_limit_recordindex.cfg
@@ -0,0 +1,2 @@
+in Input input1100.txt
+
diff --git a/yt/yql/tests/sql/suites/select/sample_limit_recordindex.sql b/yt/yql/tests/sql/suites/select/sample_limit_recordindex.sql
new file mode 100644
index 0000000000..220fbfa060
--- /dev/null
+++ b/yt/yql/tests/sql/suites/select/sample_limit_recordindex.sql
@@ -0,0 +1,12 @@
+/* custom check: len(yt_res_yson[0][b'Write'][0][b'Data']) <= 5 */
+USE plato;
+
+SELECT
+ key,
+ subkey,
+ TableRecordIndex() AS index
+FROM
+ Input
+SAMPLE 1.0 / 5
+LIMIT 5
+;
diff --git a/yt/yt/client/kafka/requests.cpp b/yt/yt/client/kafka/requests.cpp
index e5ac8928f8..f81e97040d 100644
--- a/yt/yt/client/kafka/requests.cpp
+++ b/yt/yt/client/kafka/requests.cpp
@@ -601,8 +601,10 @@ void TRspFetch::Serialize(IKafkaProtocolWriter* writer, int apiVersion) const
////////////////////////////////////////////////////////////////////////////////
-void TReqSaslHandshake::Deserialize(IKafkaProtocolReader* reader, int /*apiVersion*/)
+void TReqSaslHandshake::Deserialize(IKafkaProtocolReader* reader, int apiVersion)
{
+ ApiVersion = apiVersion;
+
Mechanism = reader->ReadString();
}
diff --git a/yt/yt/client/kafka/requests.h b/yt/yt/client/kafka/requests.h
index e8c4342354..04435ffcaf 100644
--- a/yt/yt/client/kafka/requests.h
+++ b/yt/yt/client/kafka/requests.h
@@ -120,6 +120,13 @@ struct TRecord
////////////////////////////////////////////////////////////////////////////////
+struct TReqBase
+{
+ int ApiVersion = 0;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
struct TReqApiVersions
{
static constexpr ERequestType RequestType = ERequestType::ApiVersions;
@@ -504,6 +511,7 @@ struct TRspFetch
////////////////////////////////////////////////////////////////////////////////
struct TReqSaslHandshake
+ : public TReqBase
{
static constexpr ERequestType RequestType = ERequestType::SaslHandshake;
diff --git a/yt/yt/core/bus/client.h b/yt/yt/core/bus/client.h
index fa12665fc2..1b9ca361f5 100644
--- a/yt/yt/core/bus/client.h
+++ b/yt/yt/core/bus/client.h
@@ -2,6 +2,8 @@
#include "public.h"
+#include <yt/yt/core/bus/tcp/public.h>
+
#include <yt/yt/core/ytree/public.h>
namespace NYT::NBus {
@@ -28,6 +30,9 @@ struct IBusClient
//! Typically used for constructing errors.
virtual const NYTree::IAttributeDictionary& GetEndpointAttributes() const = 0;
+ //! Apply new dynamic config.
+ virtual void OnDynamicConfigChanged(const NBus::TBusClientDynamicConfigPtr& config) = 0;
+
//! Creates a new bus.
/*!
* The bus will point to the address supplied during construction.
diff --git a/yt/yt/core/bus/server.h b/yt/yt/core/bus/server.h
index dd7a35bc13..d217935c36 100644
--- a/yt/yt/core/bus/server.h
+++ b/yt/yt/core/bus/server.h
@@ -2,6 +2,8 @@
#include "public.h"
+#include <yt/yt/core/bus/tcp/public.h>
+
#include <yt/yt/core/actions/future.h>
namespace NYT::NBus {
@@ -22,6 +24,12 @@ struct IBusServer
*/
virtual void Start(IMessageHandlerPtr handler) = 0;
+ //! Apply new dynamic config.
+ /*
+ * \param config New config.
+ */
+ virtual void OnDynamicConfigChanged(const NBus::TBusServerDynamicConfigPtr& config) = 0;
+
//! Asynchronously stops the listener.
/*!
* After this call the instance is no longer usable.
diff --git a/yt/yt/core/bus/tcp/client.cpp b/yt/yt/core/bus/tcp/client.cpp
index 5dd0117bee..81db62b1ab 100644
--- a/yt/yt/core/bus/tcp/client.cpp
+++ b/yt/yt/core/bus/tcp/client.cpp
@@ -160,6 +160,11 @@ public:
return *EndpointAttributes_;
}
+ void OnDynamicConfigChanged(const NBus::TBusClientDynamicConfigPtr& config) override
+ {
+ DynamicConfig_.Store(config);
+ }
+
IBusPtr CreateBus(IMessageHandlerPtr handler, const TCreateBusOptions& options) override
{
YT_ASSERT_THREAD_AFFINITY_ANY();
@@ -181,7 +186,6 @@ public:
.EndMap());
auto poller = TTcpDispatcher::TImpl::Get()->GetXferPoller();
-
auto connection = New<TTcpConnection>(
Config_,
EConnectionType::Client,
@@ -196,7 +200,8 @@ public:
std::move(handler),
std::move(poller),
PacketTranscoderFactory_,
- MemoryUsageTracker_);
+ MemoryUsageTracker_,
+ DynamicConfig_.Acquire()->NeedRejectConnectionDueMemoryOvercommit);
connection->Start();
return New<TTcpClientBusProxy>(std::move(connection));
@@ -204,6 +209,7 @@ public:
private:
const TBusClientConfigPtr Config_;
+ TAtomicIntrusivePtr<TBusClientDynamicConfig> DynamicConfig_{New<TBusClientDynamicConfig>()};
IPacketTranscoderFactory* const PacketTranscoderFactory_;
const IMemoryUsageTrackerPtr MemoryUsageTracker_;
const std::string EndpointDescription_;
diff --git a/yt/yt/core/bus/tcp/config.cpp b/yt/yt/core/bus/tcp/config.cpp
index f67c6fba6c..809a2d7413 100644
--- a/yt/yt/core/bus/tcp/config.cpp
+++ b/yt/yt/core/bus/tcp/config.cpp
@@ -126,6 +126,9 @@ TBusServerConfigPtr TBusServerConfig::CreateUds(const std::string& socketPath)
return config;
}
+void TBusServerDynamicConfig::Register(TRegistrar /*registrar*/)
+{ }
+
////////////////////////////////////////////////////////////////////////////////
void TBusConfig::Register(TRegistrar registrar)
@@ -170,6 +173,14 @@ void TBusConfig::Register(TRegistrar registrar)
////////////////////////////////////////////////////////////////////////////////
+void TBusDynamicConfig::Register(TRegistrar registrar)
+{
+ registrar.Parameter("need_reject_connection_due_memory_overcommit", &TThis::NeedRejectConnectionDueMemoryOvercommit)
+ .Default(false);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
void TBusClientConfig::Register(TRegistrar registrar)
{
registrar.Parameter("address", &TThis::Address)
@@ -198,6 +209,9 @@ TBusClientConfigPtr TBusClientConfig::CreateUds(const std::string& socketPath)
return config;
}
+void TBusClientDynamicConfig::Register(TRegistrar /*registrar*/)
+{ }
+
////////////////////////////////////////////////////////////////////////////////
} // namespace NYT::NBus
diff --git a/yt/yt/core/bus/tcp/config.h b/yt/yt/core/bus/tcp/config.h
index 491612fabb..d0a5bde08d 100644
--- a/yt/yt/core/bus/tcp/config.h
+++ b/yt/yt/core/bus/tcp/config.h
@@ -130,6 +130,21 @@ DEFINE_REFCOUNTED_TYPE(TBusConfig)
////////////////////////////////////////////////////////////////////////////////
+class TBusDynamicConfig
+ : public NYTree::TYsonStruct
+{
+public:
+ bool NeedRejectConnectionDueMemoryOvercommit;
+
+ REGISTER_YSON_STRUCT(TBusDynamicConfig);
+
+ static void Register(TRegistrar registrar);
+};
+
+DEFINE_REFCOUNTED_TYPE(TBusDynamicConfig)
+
+////////////////////////////////////////////////////////////////////////////////
+
class TBusServerConfig
: public TBusConfig
{
@@ -151,6 +166,19 @@ DEFINE_REFCOUNTED_TYPE(TBusServerConfig)
////////////////////////////////////////////////////////////////////////////////
+class TBusServerDynamicConfig
+ : public TBusDynamicConfig
+{
+public:
+ REGISTER_YSON_STRUCT(TBusServerDynamicConfig);
+
+ static void Register(TRegistrar registrar);
+};
+
+DEFINE_REFCOUNTED_TYPE(TBusServerDynamicConfig)
+
+////////////////////////////////////////////////////////////////////////////////
+
class TBusClientConfig
: public TBusConfig
{
@@ -170,5 +198,18 @@ DEFINE_REFCOUNTED_TYPE(TBusClientConfig)
////////////////////////////////////////////////////////////////////////////////
+class TBusClientDynamicConfig
+ : public TBusDynamicConfig
+{
+public:
+ REGISTER_YSON_STRUCT(TBusClientDynamicConfig);
+
+ static void Register(TRegistrar registrar);
+};
+
+DEFINE_REFCOUNTED_TYPE(TBusClientDynamicConfig)
+
+////////////////////////////////////////////////////////////////////////////////
+
} // namespace NYT::NBus
diff --git a/yt/yt/core/bus/tcp/connection.cpp b/yt/yt/core/bus/tcp/connection.cpp
index a357e9199d..792cc3d0c4 100644
--- a/yt/yt/core/bus/tcp/connection.cpp
+++ b/yt/yt/core/bus/tcp/connection.cpp
@@ -114,7 +114,8 @@ TTcpConnection::TTcpConnection(
IMessageHandlerPtr handler,
IPollerPtr poller,
IPacketTranscoderFactory* packetTranscoderFactory,
- IMemoryUsageTrackerPtr memoryUsageTracker)
+ IMemoryUsageTrackerPtr memoryUsageTracker,
+ bool needRejectConnectionDueMemoryOvercommit)
: Config_(std::move(config))
, ConnectionType_(connectionType)
, Id_(id)
@@ -139,6 +140,7 @@ TTcpConnection::TTcpConnection(
, EncryptionMode_(Config_->EncryptionMode)
, VerificationMode_(Config_->VerificationMode)
, MemoryUsageTracker_(std::move(memoryUsageTracker))
+ , NeedRejectConnectionDueMemoryOvercommit_(needRejectConnectionDueMemoryOvercommit)
{ }
TTcpConnection::~TTcpConnection()
@@ -596,9 +598,15 @@ void TTcpConnection::InitBuffers()
ConnectionType_ == EConnectionType::Server
? GetRefCountedTypeCookie<TTcpServerConnectionWriteBufferTag>()
: GetRefCountedTypeCookie<TTcpClientConnectionWriteBufferTag>());
- trackedBlob
- .TryReserve(WriteBufferSize)
- .ThrowOnError();
+
+ if (NeedRejectConnectionDueMemoryOvercommit_) {
+ trackedBlob
+ .TryReserve(WriteBufferSize)
+ .ThrowOnError();
+ } else {
+ trackedBlob.Reserve(WriteBufferSize);
+ }
+
WriteBuffers_.push_back(std::move(trackedBlob));
}
diff --git a/yt/yt/core/bus/tcp/connection.h b/yt/yt/core/bus/tcp/connection.h
index bc520f331a..229d244828 100644
--- a/yt/yt/core/bus/tcp/connection.h
+++ b/yt/yt/core/bus/tcp/connection.h
@@ -88,7 +88,8 @@ public:
IMessageHandlerPtr handler,
NConcurrency::IPollerPtr poller,
IPacketTranscoderFactory* packetTranscoderFactory,
- IMemoryUsageTrackerPtr memoryUsageTracker);
+ IMemoryUsageTrackerPtr memoryUsageTracker,
+ bool needRejectConnectionDueMemoryOvercommit);
~TTcpConnection();
@@ -279,6 +280,7 @@ private:
const EVerificationMode VerificationMode_;
const IMemoryUsageTrackerPtr MemoryUsageTracker_;
+ const bool NeedRejectConnectionDueMemoryOvercommit_;
NYTree::IAttributeDictionaryPtr PeerAttributes_;
diff --git a/yt/yt/core/bus/tcp/public.h b/yt/yt/core/bus/tcp/public.h
index 0c86109c1a..aa86f46c4b 100644
--- a/yt/yt/core/bus/tcp/public.h
+++ b/yt/yt/core/bus/tcp/public.h
@@ -14,9 +14,15 @@ using TBusNetworkCountersPtr = TIntrusivePtr<TBusNetworkCounters>;
DECLARE_REFCOUNTED_CLASS(TMultiplexingBandConfig)
DECLARE_REFCOUNTED_CLASS(TTcpDispatcherConfig)
DECLARE_REFCOUNTED_CLASS(TTcpDispatcherDynamicConfig)
+
DECLARE_REFCOUNTED_CLASS(TBusConfig)
+DECLARE_REFCOUNTED_CLASS(TBusDynamicConfig)
+
DECLARE_REFCOUNTED_CLASS(TBusServerConfig)
+DECLARE_REFCOUNTED_CLASS(TBusServerDynamicConfig)
+
DECLARE_REFCOUNTED_CLASS(TBusClientConfig)
+DECLARE_REFCOUNTED_CLASS(TBusClientDynamicConfig)
struct IPacketTranscoderFactory;
diff --git a/yt/yt/core/bus/tcp/server.cpp b/yt/yt/core/bus/tcp/server.cpp
index a9fcf0c8a0..2802f2d5d6 100644
--- a/yt/yt/core/bus/tcp/server.cpp
+++ b/yt/yt/core/bus/tcp/server.cpp
@@ -78,6 +78,13 @@ public:
YT_LOG_INFO("Bus server started");
}
+ void OnDynamicConfigChanged(const NBus::TBusServerDynamicConfigPtr& config)
+ {
+ YT_VERIFY(config);
+
+ DynamicConfig_.Store(config);
+ }
+
TFuture<void> Stop()
{
YT_LOG_INFO("Stopping Bus server");
@@ -122,6 +129,7 @@ public:
protected:
const TBusServerConfigPtr Config_;
+ TAtomicIntrusivePtr<TBusServerDynamicConfig> DynamicConfig_{New<TBusServerDynamicConfig>()};
const IPollerPtr Poller_;
const IMessageHandlerPtr Handler_;
IPacketTranscoderFactory* const PacketTranscoderFactory_;
@@ -254,7 +262,6 @@ protected:
.EndMap());
auto poller = TTcpDispatcher::TImpl::Get()->GetXferPoller();
-
auto connection = New<TTcpConnection>(
Config_,
EConnectionType::Server,
@@ -269,7 +276,8 @@ protected:
Handler_,
std::move(poller),
PacketTranscoderFactory_,
- MemoryUsageTracker_);
+ MemoryUsageTracker_,
+ DynamicConfig_.Acquire()->NeedRejectConnectionDueMemoryOvercommit);
{
auto guard = WriterGuard(ConnectionsSpinLock_);
@@ -435,6 +443,18 @@ public:
Server_.Store(server);
server->Start();
+ server->OnDynamicConfigChanged(DynamicConfig_.Acquire());
+ }
+
+ void OnDynamicConfigChanged(const NBus::TBusServerDynamicConfigPtr& config) final
+ {
+ YT_VERIFY(config);
+
+ DynamicConfig_.Store(config);
+
+ if (auto server = Server_.Acquire()) {
+ server->OnDynamicConfigChanged(config);
+ }
}
TFuture<void> Stop() final
@@ -448,6 +468,7 @@ public:
private:
const TBusServerConfigPtr Config_;
+ TAtomicIntrusivePtr<TBusServerDynamicConfig> DynamicConfig_{New<TBusServerDynamicConfig>()};
IPacketTranscoderFactory* const PacketTranscoderFactory_;
const IMemoryUsageTrackerPtr MemoryUsageTracker_;
@@ -521,6 +542,15 @@ public:
}
}
+ void OnDynamicConfigChanged(const NBus::TBusServerDynamicConfigPtr& config) final
+ {
+ YT_VERIFY(config);
+
+ for (const auto& server : Servers_) {
+ server->OnDynamicConfigChanged(config);
+ }
+ }
+
TFuture<void> Stop() final
{
if (Config_->EnableLocalBypass && Config_->Port) {
diff --git a/yt/yt/core/concurrency/arcadia_interop-inl.h b/yt/yt/core/concurrency/arcadia_interop-inl.h
new file mode 100644
index 0000000000..57dc19c66d
--- /dev/null
+++ b/yt/yt/core/concurrency/arcadia_interop-inl.h
@@ -0,0 +1,68 @@
+#ifndef ARCADIA_INTEROP_INL_H_
+#error "Direct inclusion of this file is not allowed, include async_batcher.h"
+// For the sake of sane code completion.
+#include "arcadia_interop.h"
+#endif
+#undef ARCADIA_INTEROP_INL_H_
+
+#include <yt/yt/core/actions/future.h>
+
+#include <library/cpp/threading/future/core/future.h>
+
+namespace NYT::NConcurrency {
+
+////////////////////////////////////////////////////////////////////////////////
+
+template <class T>
+::NThreading::TFuture<T> ToArcadiaFuture(const TFuture<T>& future)
+{
+ auto promise = ::NThreading::NewPromise<T>();
+ auto wrappedFuture = promise.GetFuture();
+
+ future
+ .Subscribe(BIND([promise = std::move(promise)] (const TErrorOr<T>& valueOrError) mutable {
+ try {
+ if constexpr (std::is_same_v<T, void>) {
+ valueOrError
+ .ThrowOnError();
+ promise.TrySetValue();
+ } else {
+ auto value = valueOrError
+ .ValueOrThrow();
+ promise.TrySetValue(std::move(value));
+ }
+ } catch (...) {
+ promise.TrySetException(std::current_exception());
+ }
+ }));
+
+ return wrappedFuture;
+}
+
+template <class T>
+TFuture<T> FromArcadiaFuture(const ::NThreading::TFuture<T>& future)
+{
+ auto promise = NewPromise<T>();
+ auto wrappedFuture = promise.ToFuture();
+
+ future
+ .Subscribe([promise = std::move(promise)](::NThreading::TFuture<T> future) {
+ YT_ASSERT(future.HasValue() || future.HasException());
+ try {
+ if constexpr (std::is_void_v<T>) {
+ future.TryRethrow();
+ promise.TrySet();
+ } else {
+ promise.TrySet(future.ExtractValueSync());
+ }
+ } catch (const std::exception& e) {
+ promise.TrySet(NYT::TError(e));
+ }
+ });
+
+ return wrappedFuture;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NConcurrency
diff --git a/yt/yt/core/concurrency/arcadia_interop.h b/yt/yt/core/concurrency/arcadia_interop.h
new file mode 100644
index 0000000000..d46ebb8c00
--- /dev/null
+++ b/yt/yt/core/concurrency/arcadia_interop.h
@@ -0,0 +1,23 @@
+#pragma once
+
+#include <yt/yt/core/actions/future.h>
+
+#include <library/cpp/threading/future/core/future.h>
+
+namespace NYT::NConcurrency {
+
+////////////////////////////////////////////////////////////////////////////////
+
+template <class T>
+::NThreading::TFuture<T> ToArcadiaFuture(const TFuture<T>& future);
+
+template <class T>
+TFuture<T> FromArcadiaFuture(const ::NThreading::TFuture<T>& future);
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NConcurrency
+
+#define ARCADIA_INTEROP_INL_H_
+#include "arcadia_interop-inl.h"
+#undef ARCADIA_INTEROP_INL_H_
diff --git a/yt/yt/core/concurrency/unittests/arcadia_interop_ut.cpp b/yt/yt/core/concurrency/unittests/arcadia_interop_ut.cpp
new file mode 100644
index 0000000000..a987ebbca2
--- /dev/null
+++ b/yt/yt/core/concurrency/unittests/arcadia_interop_ut.cpp
@@ -0,0 +1,137 @@
+#include <yt/yt/core/test_framework/framework.h>
+
+#include <yt/yt/core/concurrency/arcadia_interop.h>
+
+namespace NYT::NConcurrency {
+namespace {
+
+////////////////////////////////////////////////////////////////////////////////
+
+TEST(TFutureInteropTest, FromArcadiaFutureWithValue1)
+{
+ auto future = FromArcadiaFuture(::NThreading::MakeFuture<int>(1));
+ ASSERT_TRUE(future.IsSet());
+ EXPECT_EQ(1, future.Get().ValueOrThrow());
+}
+
+TEST(TFutureInteropTest, FromArcadiaFutureWithValue2)
+{
+ ::testing::TProbeState state;
+ auto promise = ::NThreading::NewPromise<::testing::TProbe>();
+ auto future = FromArcadiaFuture(promise.GetFuture());
+ EXPECT_TRUE(!future.IsSet());
+ promise.SetValue(::testing::TProbe(&state));
+ ASSERT_TRUE(future.IsSet());
+ EXPECT_TRUE(future.Get().ValueOrThrow().IsValid());
+ EXPECT_THAT(state, ::testing::HasCopyMoveCounts(0, 3));
+}
+
+TEST(TFutureInteropTest, FromArcadiaFutureWithError1)
+{
+ auto promise = ::NThreading::NewPromise<int>();
+ promise.SetException("error");
+ auto future = FromArcadiaFuture(promise.GetFuture());
+ ASSERT_TRUE(future.IsSet());
+ EXPECT_THROW_MESSAGE_HAS_SUBSTR(future.Get().ThrowOnError(), std::exception, "error");
+}
+
+TEST(TFutureInteropTest, FromArcadiaFutureWithError2)
+{
+ auto promise = ::NThreading::NewPromise<int>();
+ auto future = FromArcadiaFuture(promise.GetFuture());
+ EXPECT_TRUE(!future.IsSet());
+ promise.SetException("error");
+ ASSERT_TRUE(future.IsSet());
+ EXPECT_THROW_MESSAGE_HAS_SUBSTR(future.Get().ThrowOnError(), std::exception, "error");
+}
+
+TEST(TFutureInteropTest, FromArcadiaFutureVoid1)
+{
+ auto promise = ::NThreading::NewPromise<void>();
+ auto future = FromArcadiaFuture(promise.GetFuture());
+ EXPECT_TRUE(!future.IsSet());
+ promise.SetValue();
+ ASSERT_TRUE(future.IsSet());
+ EXPECT_NO_THROW(future.Get().ThrowOnError());
+}
+
+TEST(TFutureInteropTest, FromArcadiaFutureVoid2)
+{
+ auto promise = ::NThreading::NewPromise<void>();
+ auto future = FromArcadiaFuture(promise.GetFuture());
+ EXPECT_TRUE(!future.IsSet());
+ promise.SetException("error");
+ ASSERT_TRUE(future.IsSet());
+ ASSERT_THROW_MESSAGE_HAS_SUBSTR(future.Get().ThrowOnError(), std::exception, "error");
+}
+
+TEST(TFutureInteropTest, FromArcadiaFutureCancel)
+{
+ auto promise = ::NThreading::NewPromise<void>();
+ auto future = FromArcadiaFuture(promise.GetFuture());
+ EXPECT_TRUE(!future.IsSet());
+ future.Cancel(TError("canceled"));
+ promise.SetValue();
+ ASSERT_THROW_MESSAGE_HAS_SUBSTR(future.Get().ThrowOnError(), std::exception, "canceled");
+}
+
+TEST(TFutureInteropTest, ToArcadiaFutureWithValue)
+{
+ ::testing::TProbeState state;
+ auto promise = NewPromise<::testing::TProbe>();
+ auto future = ToArcadiaFuture(promise.ToFuture());
+ EXPECT_FALSE(future.HasValue());
+ promise.Set(::testing::TProbe(&state));
+ ASSERT_TRUE(future.HasValue());
+ EXPECT_TRUE(future.GetValue().IsValid());
+ EXPECT_THAT(state, ::testing::HasCopyMoveCounts(1, 2));
+}
+
+TEST(TFutureInteropTest, ToArcadiaFutureWithError1)
+{
+ ::testing::TProbeState state;
+ auto promise = NewPromise<::testing::TProbe>();
+ auto future = ToArcadiaFuture(promise.ToFuture());
+ EXPECT_FALSE(future.HasValue());
+ promise.Set(TError("error"));
+ ASSERT_TRUE(future.HasException());
+ EXPECT_THROW_MESSAGE_HAS_SUBSTR(future.TryRethrow(), std::exception, "error");
+ EXPECT_THAT(state, ::testing::HasCopyMoveCounts(0, 0));
+}
+
+TEST(TFutureInteropTest, ToArcadiaFutureCanceled)
+{
+ ::testing::TProbeState state;
+ auto promise = NewPromise<::testing::TProbe>();
+ auto future = ToArcadiaFuture(promise.ToFuture());
+ EXPECT_FALSE(future.HasValue());
+
+ promise.ToFuture().Cancel(TError("canceled"));
+ ASSERT_TRUE(future.HasException());
+ EXPECT_THROW_MESSAGE_HAS_SUBSTR(future.TryRethrow(), std::exception, "canceled");
+ EXPECT_THAT(state, ::testing::HasCopyMoveCounts(0, 0));
+}
+
+TEST(TFutureInteropTest, ToArcadiaFutureVoid1)
+{
+ auto promise = NewPromise<void>();
+ auto future = ToArcadiaFuture(promise.ToFuture());
+ EXPECT_FALSE(future.HasValue());
+ promise.Set();
+ EXPECT_TRUE(future.HasValue());
+}
+
+TEST(TFutureInteropTest, ToArcadiaFutureVoid2)
+{
+ auto promise = NewPromise<void>();
+ auto future = ToArcadiaFuture(promise.ToFuture());
+ EXPECT_FALSE(future.HasValue());
+ promise.Set(TError("error"));
+ EXPECT_TRUE(future.HasException());
+ EXPECT_THROW_MESSAGE_HAS_SUBSTR(future.TryRethrow(), std::exception, "error");
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace
+} // namespace NYT:::NConcurrency
diff --git a/yt/yt/core/concurrency/unittests/ya.make b/yt/yt/core/concurrency/unittests/ya.make
index 1d59b915ee..f78e63ebce 100644
--- a/yt/yt/core/concurrency/unittests/ya.make
+++ b/yt/yt/core/concurrency/unittests/ya.make
@@ -5,6 +5,7 @@ INCLUDE(${ARCADIA_ROOT}/yt/ya_cpp.make.inc)
PROTO_NAMESPACE(yt)
SRCS(
+ arcadia_interop_ut.cpp
async_barrier_ut.cpp
async_looper_ut.cpp
async_rw_lock_ut.cpp
@@ -47,6 +48,7 @@ PEERDIR(
yt/yt/core/test_framework
library/cpp/json/yson
+ library/cpp/threading/future
)
REQUIREMENTS(