diff options
author | robot-piglet <robot-piglet@yandex-team.com> | 2024-03-04 06:54:39 +0300 |
---|---|---|
committer | robot-piglet <robot-piglet@yandex-team.com> | 2024-03-04 07:04:36 +0300 |
commit | 70b4b5b300258bf4741c132670b9922ff8805525 (patch) | |
tree | c7881e1b47321ec6e490cfa33dc1e340c05fc664 | |
parent | 319caa1b455e129f55369bed14b87eb74101cae6 (diff) | |
download | ydb-70b4b5b300258bf4741c132670b9922ff8805525.tar.gz |
Intermediate changes
-rw-r--r-- | yt/yt/core/rpc/service_detail.cpp | 3 | ||||
-rw-r--r-- | yt/yt/core/yson/async_writer.cpp | 2 | ||||
-rw-r--r-- | yt/yt/core/ytree/virtual.cpp | 272 | ||||
-rw-r--r-- | yt/yt/core/ytree/virtual.h | 19 | ||||
-rw-r--r-- | yt/yt/library/tracing/example/main.cpp | 2 |
5 files changed, 198 insertions, 100 deletions
diff --git a/yt/yt/core/rpc/service_detail.cpp b/yt/yt/core/rpc/service_detail.cpp index 0b896c8bf4..cce7b8f625 100644 --- a/yt/yt/core/rpc/service_detail.cpp +++ b/yt/yt/core/rpc/service_detail.cpp @@ -1475,7 +1475,8 @@ void TRequestQueue::ScheduleRequestsFromQueue() // NB: Racy, may lead to overcommit in concurrency semaphore and request bytes throttler. auto concurrencyLimit = RuntimeInfo_->ConcurrencyLimit.GetDynamicLimit(); - while (QueueSize_.load() > 0 && Concurrency_.load() < concurrencyLimit) { + auto concurrencyByteLimit = RuntimeInfo_->ConcurrencyByteLimit.GetDynamicByteLimit(); + while (QueueSize_.load() > 0 && Concurrency_.load() < concurrencyLimit && ConcurrencyByte_.load() < concurrencyByteLimit) { if (AreThrottlersOverdrafted()) { SubscribeToThrottlers(); return; diff --git a/yt/yt/core/yson/async_writer.cpp b/yt/yt/core/yson/async_writer.cpp index d5dd646145..13ae1115e2 100644 --- a/yt/yt/core/yson/async_writer.cpp +++ b/yt/yt/core/yson/async_writer.cpp @@ -94,7 +94,7 @@ void TAsyncYsonWriter::OnRaw(TFuture<TYsonString> asyncStr) flushedSize->fetch_add(ysonStr.AsStringBuf().size(), std::memory_order::relaxed); return TSegment{ ysonStr, - ysonStr.GetType() == EYsonType::Node && (!topLevel || type != EYsonType::Node) + ysonStr.GetType() == EYsonType::Node && (!topLevel || type != EYsonType::Node), }; }))); } diff --git a/yt/yt/core/ytree/virtual.cpp b/yt/yt/core/ytree/virtual.cpp index 0d331fd764..9a1c475cfe 100644 --- a/yt/yt/core/ytree/virtual.cpp +++ b/yt/yt/core/ytree/virtual.cpp @@ -7,10 +7,9 @@ #include "ypath_service.h" #include <yt/yt/core/yson/tokenizer.h> -#include <yt/yt/core/yson/async_writer.h> +#include <yt/yt/core/yson/writer.h> #include <yt/yt/core/ypath/tokenizer.h> -#include <yt/yt/core/yson/writer.h> #include <util/generic/hash.h> @@ -20,6 +19,7 @@ using namespace NRpc; using namespace NYson; using namespace NYTree; using namespace NYPath; +using namespace NConcurrency; using NYT::FromProto; @@ -29,16 +29,73 @@ using TAsyncYsonWriterPtr = TIntrusivePtr<TAsyncYsonWriter>; namespace { -template <class TContextPtr> -void ReplyFromAsyncYsonWriter(TAsyncYsonWriterPtr writer, TContextPtr context) +std::vector<std::pair<i64, i64>> SplitIntoBatches(i64 count, i64 batchSize) { + YT_VERIFY(batchSize >= 1); + + std::vector<std::pair<i64, i64>> result; + result.reserve((count + batchSize - 1) / batchSize); + i64 start = 0; + while (start < count) { + result.emplace_back(start, std::min(start + batchSize, count)); + start += batchSize; + } + return result; +} - BIND([writer = std::move(writer)] { +template <class TWriteItems> +void ExecuteBatchRead( + std::optional<TVirtualCompositeNodeReadOffloadParams> offloadParams, + const TAsyncYsonWriterPtr& writer, + EYsonType ysonFragmentType, + i64 keyCount, + const TWriteItems& writeItems) +{ + if (offloadParams) { + auto batchIndexRanges = SplitIntoBatches(keyCount, offloadParams->BatchSize); + + std::vector<TAsyncYsonWriterPtr> batchWriters; + batchWriters.reserve(batchIndexRanges.size()); + + std::vector<TFuture<void>> batchFutures; + batchFutures.reserve(batchIndexRanges.size()); + + for (int index = 0; index < std::ssize(batchIndexRanges); ++index) { + auto batchWriter = New<TAsyncYsonWriter>(ysonFragmentType); + batchWriters.push_back(batchWriter); + auto batchFuture = BIND([writeItems, batchWriter, batchIndexRange = batchIndexRanges[index]] { + writeItems(batchIndexRange, batchWriter); + }) + .AsyncVia(NRpc::TDispatcher::Get()->GetHeavyInvoker()) + .Run(); + batchFutures.push_back(std::move(batchFuture)); + } + + // NB: Must wait for all futures to become set to ensure all lambdas above have finished accessing the state. + auto results = WaitForWithStrategy(AllSet(std::move(batchFutures)), offloadParams->WaitForStrategy) + .ValueOrThrow(); + for (const auto& result : results) { + result.ThrowOnError(); + } + for (const auto& batchWriter : batchWriters) { + writer->OnRaw(batchWriter->Finish()); + } + } else { + writeItems(std::pair(0, keyCount), writer); + } +} + +template <class TContext> +void ReplyFromAsyncYsonWriter( + const TAsyncYsonWriterPtr& writer, + const TIntrusivePtr<TContext>& context) +{ + BIND([=] { return writer->Finish(); }) .AsyncVia(NRpc::TDispatcher::Get()->GetHeavyInvoker()) .Run() - .Subscribe(BIND([context = std::move(context)] (const TErrorOr<TYsonString>& resultOrError) { + .Subscribe(BIND([=] (const TErrorOr<TYsonString>& resultOrError) { if (resultOrError.IsOK()) { auto* response = &context->Response(); response->set_value(resultOrError.Value().ToString()); @@ -51,14 +108,17 @@ void ReplyFromAsyncYsonWriter(TAsyncYsonWriterPtr writer, TContextPtr context) } // namespace -TVirtualMapBase::TVirtualMapBase() - : TVirtualMapBase(/*owningNode*/ nullptr) -{ } +//////////////////////////////////////////////////////////////////////////////// TVirtualMapBase::TVirtualMapBase(INodePtr owningNode) : OwningNode_(std::move(owningNode)) { } +std::optional<TVirtualCompositeNodeReadOffloadParams> TVirtualMapBase::GetReadOffloadParams() const +{ + return {}; +} + bool TVirtualMapBase::DoInvoke(const IYPathServiceContextPtr& context) { DISPATCH_YPATH_SERVICE_METHOD(Get); @@ -129,33 +189,43 @@ void TVirtualMapBase::GetSelf( writer->OnBeginMap(); - if (attributeFilter) { - for (const auto& key : keys) { - if (auto service = FindItemService(key)) { - writer->OnKeyedItem(key); - if (Opaque_) { - service->WriteAttributes(writer.Get(), attributeFilter, /*stable*/ false); - writer->OnEntity(); - } else { - auto asyncResult = AsyncYPathGet(service, "", attributeFilter); - writer->OnRaw(asyncResult); + ExecuteBatchRead( + GetReadOffloadParams(), + writer, + EYsonType::MapFragment, + std::ssize(keys), + [&] (std::pair<i64, i64> keyIndexRange, const TAsyncYsonWriterPtr& writer) { + if (attributeFilter) { + for (i64 index = keyIndexRange.first; index < keyIndexRange.second; ++index) { + const auto& key = keys[index]; + if (auto service = FindItemService(key)) { + writer->OnKeyedItem(key); + if (Opaque_) { + service->WriteAttributes(writer.Get(), attributeFilter, /*stable*/ false); + writer->OnEntity(); + } else { + auto asyncResult = AsyncYPathGet(service, TYPath(), attributeFilter); + writer->OnRaw(asyncResult); + } + } } - } - } - } else { - for (const auto& key : keys) { - if (Opaque_) { - writer->OnKeyedItem(key); - writer->OnEntity(); } else { - if (auto service = FindItemService(key)) { - writer->OnKeyedItem(key); - auto asyncResult = AsyncYPathGet(service, ""); - writer->OnRaw(asyncResult); + for (i64 index = keyIndexRange.first; index < keyIndexRange.second; ++index) { + const auto& key = keys[index]; + if (Opaque_) { + writer->OnKeyedItem(key); + writer->OnEntity(); + } else { + if (auto service = FindItemService(key)) { + writer->OnKeyedItem(key); + auto asyncResult = AsyncYPathGet(service, TYPath()); + writer->OnRaw(asyncResult); + } + } } } - } - } + }); + writer->OnEndMap(); ReplyFromAsyncYsonWriter(std::move(writer), context); @@ -191,20 +261,31 @@ void TVirtualMapBase::ListSelf( } writer->OnBeginList(); - if (attributeFilter) { - for (const auto& key : keys) { - if (auto service = FindItemService(key)) { - writer->OnListItem(); - service->WriteAttributes(writer.Get(), attributeFilter, /*stable*/ false); - writer->OnStringScalar(key); + + ExecuteBatchRead( + GetReadOffloadParams(), + writer, + EYsonType::MapFragment, + std::ssize(keys), + [&] (auto keyIndexRange, const TAsyncYsonWriterPtr& writer) { + if (attributeFilter) { + for (i64 index = keyIndexRange.first; index < keyIndexRange.second; ++index) { + const auto& key = keys[index]; + if (auto service = FindItemService(key)) { + writer->OnListItem(); + service->WriteAttributes(writer.Get(), attributeFilter, /*stable*/ false); + writer->OnStringScalar(key); + } + } + } else { + for (i64 index = keyIndexRange.first; index < keyIndexRange.second; ++index) { + const auto& key = keys[index]; + writer->OnListItem(); + writer->OnStringScalar(key); + } } - } - } else { - for (const auto& key : keys) { - writer->OnListItem(); - writer->OnStringScalar(key); - } - } + }); + writer->OnEndList(); ReplyFromAsyncYsonWriter(std::move(writer), context); @@ -332,7 +413,6 @@ public: private: THashMap<TString, IYPathServicePtr> Services_; THashMap<TInternedAttributeKey, TYsonCallback> Attributes_; - }; //////////////////////////////////////////////////////////////////////////////// @@ -341,8 +421,7 @@ TCompositeMapService::TCompositeMapService() : Impl_(New<TImpl>()) { } -TCompositeMapService::~TCompositeMapService() -{ } +TCompositeMapService::~TCompositeMapService() = default; std::vector<TString> TCompositeMapService::GetKeys(i64 limit) const { @@ -458,6 +537,11 @@ INodePtr CreateVirtualNode(IYPathServicePtr service) //////////////////////////////////////////////////////////////////////////////// +std::optional<TVirtualCompositeNodeReadOffloadParams> TVirtualListBase::GetReadOffloadParams() const +{ + return {}; +} + bool TVirtualListBase::DoInvoke(const IYPathServiceContextPtr& context) { DISPATCH_YPATH_SERVICE_METHOD(Get); @@ -493,7 +577,7 @@ IYPathService::TResolveResult TVirtualListBase::ResolveRecursive( void TVirtualListBase::GetSelf( TReqGet* request, - TRspGet* response, + TRspGet* /*response*/, const TCtxGetPtr& context) { YT_ASSERT(!NYson::TTokenizer(GetRequestTargetYPath(context->RequestHeader())).ParseNext()); @@ -512,63 +596,63 @@ void TVirtualListBase::GetSelf( i64 size = GetSize(); - TAsyncYsonWriter writer; + auto writer = New<TAsyncYsonWriter>(); // NB: we do not want empty attributes (<>) to appear in the result in order to comply // with current behaviour for some paths (like //sys/scheduler/orchid/scheduler/operations). if (limit < size) { - writer.OnBeginAttributes(); - writer.OnKeyedItem("incomplete"); - writer.OnBooleanScalar(true); - writer.OnEndAttributes(); + writer->OnBeginAttributes(); + writer->OnKeyedItem("incomplete"); + writer->OnBooleanScalar(true); + writer->OnEndAttributes(); } - writer.OnBeginList(); - - if (attributeFilter) { - for (int index = 0; index < limit && index < size; ++index) { - auto service = FindItemService(index); - writer.OnListItem(); - if (service) { - service->WriteAttributes(&writer, attributeFilter, false); - if (Opaque_) { - writer.OnEntity(); - } else { - auto asyncResult = AsyncYPathGet(service, ""); - writer.OnRaw(asyncResult); + i64 count = std::min(size, limit); + + writer->OnBeginList(); + + ExecuteBatchRead( + GetReadOffloadParams(), + writer, + EYsonType::ListFragment, + count, + [&] (std::pair<i64, i64> keyIndexRange, const TAsyncYsonWriterPtr& writer) { + if (attributeFilter) { + for (i64 index = keyIndexRange.first; index < keyIndexRange.second; ++index) { + writer->OnListItem(); + if (auto service = FindItemService(index)) { + service->WriteAttributes(writer.Get(), attributeFilter, false); + if (Opaque_) { + writer->OnEntity(); + } else { + auto asyncResult = AsyncYPathGet(service, TYPath()); + writer->OnRaw(asyncResult); + } + } else { + writer->OnEntity(); + } } } else { - writer.OnEntity(); - } - } - } else { - for (int index = 0; index < limit && index < size; ++index) { - writer.OnListItem(); - if (Opaque_) { - writer.OnEntity(); - } else { - if (auto service = FindItemService(index)) { - writer.OnListItem(); - auto asyncResult = AsyncYPathGet(service, ""); - writer.OnRaw(asyncResult); - } else { - writer.OnEntity(); + for (i64 index = keyIndexRange.first; index < keyIndexRange.second; ++index) { + writer->OnListItem(); + if (Opaque_) { + writer->OnEntity(); + } else { + if (auto service = FindItemService(index)) { + writer->OnListItem(); + auto asyncResult = AsyncYPathGet(service, TYPath()); + writer->OnRaw(asyncResult); + } else { + writer->OnEntity(); + } + } } } - } - } + }); - writer.OnEndList(); + writer->OnEndList(); - writer.Finish() - .Subscribe(BIND([=] (const TErrorOr<TYsonString>& resultOrError) { - if (resultOrError.IsOK()) { - response->set_value(resultOrError.Value().ToString()); - context->Reply(); - } else { - context->Reply(resultOrError); - } - })); + ReplyFromAsyncYsonWriter(std::move(writer), context); } void TVirtualListBase::ListSystemAttributes(std::vector<TAttributeDescriptor>* descriptors) diff --git a/yt/yt/core/ytree/virtual.h b/yt/yt/core/ytree/virtual.h index 66ac03e531..74a88f0dc5 100644 --- a/yt/yt/core/ytree/virtual.h +++ b/yt/yt/core/ytree/virtual.h @@ -4,11 +4,20 @@ #include "ypath_detail.h" #include <yt/yt/core/yson/producer.h> +#include <yt/yt/core/yson/async_writer.h> namespace NYT::NYTree { //////////////////////////////////////////////////////////////////////////////// +struct TVirtualCompositeNodeReadOffloadParams +{ + NConcurrency::EWaitForStrategy WaitForStrategy = NConcurrency::EWaitForStrategy::WaitFor; + i64 BatchSize = 10'000; +}; + +//////////////////////////////////////////////////////////////////////////////// + class TVirtualMapBase : public TSupportsAttributes , public ISystemAttributeProvider @@ -17,10 +26,12 @@ public: DEFINE_BYVAL_RW_PROPERTY(bool, Opaque, true); protected: - TVirtualMapBase(); - explicit TVirtualMapBase(INodePtr owningNode); + explicit TVirtualMapBase(INodePtr owningNode = nullptr); + + virtual std::optional<TVirtualCompositeNodeReadOffloadParams> GetReadOffloadParams() const; virtual std::vector<TString> GetKeys(i64 limit = std::numeric_limits<i64>::max()) const = 0; + virtual i64 GetSize() const = 0; virtual IYPathServicePtr FindItemService(TStringBuf key) const = 0; @@ -60,7 +71,6 @@ class TCompositeMapService { public: TCompositeMapService(); - ~TCompositeMapService(); std::vector<TString> GetKeys(i64 limit = std::numeric_limits<i64>::max()) const override; @@ -93,7 +103,10 @@ public: DEFINE_BYVAL_RW_PROPERTY(bool, Opaque, true); protected: + virtual std::optional<TVirtualCompositeNodeReadOffloadParams> GetReadOffloadParams() const; + virtual i64 GetSize() const = 0; + virtual IYPathServicePtr FindItemService(int index) const = 0; bool DoInvoke(const IYPathServiceContextPtr& context) override; diff --git a/yt/yt/library/tracing/example/main.cpp b/yt/yt/library/tracing/example/main.cpp index d2bbcb1972..9f2fabcf69 100644 --- a/yt/yt/library/tracing/example/main.cpp +++ b/yt/yt/library/tracing/example/main.cpp @@ -113,7 +113,7 @@ int main(int argc, char* argv[]) throw yexception() << usage; } - auto config = New<NTracing::TJaegerTracerConfig>(); + static auto config = New<NTracing::TJaegerTracerConfig>(); config->CollectorChannelConfig = New<NRpc::NGrpc::TChannelConfig>(); config->CollectorChannelConfig->Address = argv[1]; |