aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorrobot-piglet <robot-piglet@yandex-team.com>2024-03-04 06:54:39 +0300
committerrobot-piglet <robot-piglet@yandex-team.com>2024-03-04 07:04:36 +0300
commit70b4b5b300258bf4741c132670b9922ff8805525 (patch)
treec7881e1b47321ec6e490cfa33dc1e340c05fc664
parent319caa1b455e129f55369bed14b87eb74101cae6 (diff)
downloadydb-70b4b5b300258bf4741c132670b9922ff8805525.tar.gz
Intermediate changes
-rw-r--r--yt/yt/core/rpc/service_detail.cpp3
-rw-r--r--yt/yt/core/yson/async_writer.cpp2
-rw-r--r--yt/yt/core/ytree/virtual.cpp272
-rw-r--r--yt/yt/core/ytree/virtual.h19
-rw-r--r--yt/yt/library/tracing/example/main.cpp2
5 files changed, 198 insertions, 100 deletions
diff --git a/yt/yt/core/rpc/service_detail.cpp b/yt/yt/core/rpc/service_detail.cpp
index 0b896c8bf4..cce7b8f625 100644
--- a/yt/yt/core/rpc/service_detail.cpp
+++ b/yt/yt/core/rpc/service_detail.cpp
@@ -1475,7 +1475,8 @@ void TRequestQueue::ScheduleRequestsFromQueue()
// NB: Racy, may lead to overcommit in concurrency semaphore and request bytes throttler.
auto concurrencyLimit = RuntimeInfo_->ConcurrencyLimit.GetDynamicLimit();
- while (QueueSize_.load() > 0 && Concurrency_.load() < concurrencyLimit) {
+ auto concurrencyByteLimit = RuntimeInfo_->ConcurrencyByteLimit.GetDynamicByteLimit();
+ while (QueueSize_.load() > 0 && Concurrency_.load() < concurrencyLimit && ConcurrencyByte_.load() < concurrencyByteLimit) {
if (AreThrottlersOverdrafted()) {
SubscribeToThrottlers();
return;
diff --git a/yt/yt/core/yson/async_writer.cpp b/yt/yt/core/yson/async_writer.cpp
index d5dd646145..13ae1115e2 100644
--- a/yt/yt/core/yson/async_writer.cpp
+++ b/yt/yt/core/yson/async_writer.cpp
@@ -94,7 +94,7 @@ void TAsyncYsonWriter::OnRaw(TFuture<TYsonString> asyncStr)
flushedSize->fetch_add(ysonStr.AsStringBuf().size(), std::memory_order::relaxed);
return TSegment{
ysonStr,
- ysonStr.GetType() == EYsonType::Node && (!topLevel || type != EYsonType::Node)
+ ysonStr.GetType() == EYsonType::Node && (!topLevel || type != EYsonType::Node),
};
})));
}
diff --git a/yt/yt/core/ytree/virtual.cpp b/yt/yt/core/ytree/virtual.cpp
index 0d331fd764..9a1c475cfe 100644
--- a/yt/yt/core/ytree/virtual.cpp
+++ b/yt/yt/core/ytree/virtual.cpp
@@ -7,10 +7,9 @@
#include "ypath_service.h"
#include <yt/yt/core/yson/tokenizer.h>
-#include <yt/yt/core/yson/async_writer.h>
+#include <yt/yt/core/yson/writer.h>
#include <yt/yt/core/ypath/tokenizer.h>
-#include <yt/yt/core/yson/writer.h>
#include <util/generic/hash.h>
@@ -20,6 +19,7 @@ using namespace NRpc;
using namespace NYson;
using namespace NYTree;
using namespace NYPath;
+using namespace NConcurrency;
using NYT::FromProto;
@@ -29,16 +29,73 @@ using TAsyncYsonWriterPtr = TIntrusivePtr<TAsyncYsonWriter>;
namespace {
-template <class TContextPtr>
-void ReplyFromAsyncYsonWriter(TAsyncYsonWriterPtr writer, TContextPtr context)
+std::vector<std::pair<i64, i64>> SplitIntoBatches(i64 count, i64 batchSize)
{
+ YT_VERIFY(batchSize >= 1);
+
+ std::vector<std::pair<i64, i64>> result;
+ result.reserve((count + batchSize - 1) / batchSize);
+ i64 start = 0;
+ while (start < count) {
+ result.emplace_back(start, std::min(start + batchSize, count));
+ start += batchSize;
+ }
+ return result;
+}
- BIND([writer = std::move(writer)] {
+template <class TWriteItems>
+void ExecuteBatchRead(
+ std::optional<TVirtualCompositeNodeReadOffloadParams> offloadParams,
+ const TAsyncYsonWriterPtr& writer,
+ EYsonType ysonFragmentType,
+ i64 keyCount,
+ const TWriteItems& writeItems)
+{
+ if (offloadParams) {
+ auto batchIndexRanges = SplitIntoBatches(keyCount, offloadParams->BatchSize);
+
+ std::vector<TAsyncYsonWriterPtr> batchWriters;
+ batchWriters.reserve(batchIndexRanges.size());
+
+ std::vector<TFuture<void>> batchFutures;
+ batchFutures.reserve(batchIndexRanges.size());
+
+ for (int index = 0; index < std::ssize(batchIndexRanges); ++index) {
+ auto batchWriter = New<TAsyncYsonWriter>(ysonFragmentType);
+ batchWriters.push_back(batchWriter);
+ auto batchFuture = BIND([writeItems, batchWriter, batchIndexRange = batchIndexRanges[index]] {
+ writeItems(batchIndexRange, batchWriter);
+ })
+ .AsyncVia(NRpc::TDispatcher::Get()->GetHeavyInvoker())
+ .Run();
+ batchFutures.push_back(std::move(batchFuture));
+ }
+
+ // NB: Must wait for all futures to become set to ensure all lambdas above have finished accessing the state.
+ auto results = WaitForWithStrategy(AllSet(std::move(batchFutures)), offloadParams->WaitForStrategy)
+ .ValueOrThrow();
+ for (const auto& result : results) {
+ result.ThrowOnError();
+ }
+ for (const auto& batchWriter : batchWriters) {
+ writer->OnRaw(batchWriter->Finish());
+ }
+ } else {
+ writeItems(std::pair(0, keyCount), writer);
+ }
+}
+
+template <class TContext>
+void ReplyFromAsyncYsonWriter(
+ const TAsyncYsonWriterPtr& writer,
+ const TIntrusivePtr<TContext>& context)
+{
+ BIND([=] {
return writer->Finish();
})
.AsyncVia(NRpc::TDispatcher::Get()->GetHeavyInvoker())
.Run()
- .Subscribe(BIND([context = std::move(context)] (const TErrorOr<TYsonString>& resultOrError) {
+ .Subscribe(BIND([=] (const TErrorOr<TYsonString>& resultOrError) {
if (resultOrError.IsOK()) {
auto* response = &context->Response();
response->set_value(resultOrError.Value().ToString());
@@ -51,14 +108,17 @@ void ReplyFromAsyncYsonWriter(TAsyncYsonWriterPtr writer, TContextPtr context)
} // namespace
-TVirtualMapBase::TVirtualMapBase()
- : TVirtualMapBase(/*owningNode*/ nullptr)
-{ }
+////////////////////////////////////////////////////////////////////////////////
TVirtualMapBase::TVirtualMapBase(INodePtr owningNode)
: OwningNode_(std::move(owningNode))
{ }
+std::optional<TVirtualCompositeNodeReadOffloadParams> TVirtualMapBase::GetReadOffloadParams() const
+{
+ return {};
+}
+
bool TVirtualMapBase::DoInvoke(const IYPathServiceContextPtr& context)
{
DISPATCH_YPATH_SERVICE_METHOD(Get);
@@ -129,33 +189,43 @@ void TVirtualMapBase::GetSelf(
writer->OnBeginMap();
- if (attributeFilter) {
- for (const auto& key : keys) {
- if (auto service = FindItemService(key)) {
- writer->OnKeyedItem(key);
- if (Opaque_) {
- service->WriteAttributes(writer.Get(), attributeFilter, /*stable*/ false);
- writer->OnEntity();
- } else {
- auto asyncResult = AsyncYPathGet(service, "", attributeFilter);
- writer->OnRaw(asyncResult);
+ ExecuteBatchRead(
+ GetReadOffloadParams(),
+ writer,
+ EYsonType::MapFragment,
+ std::ssize(keys),
+ [&] (std::pair<i64, i64> keyIndexRange, const TAsyncYsonWriterPtr& writer) {
+ if (attributeFilter) {
+ for (i64 index = keyIndexRange.first; index < keyIndexRange.second; ++index) {
+ const auto& key = keys[index];
+ if (auto service = FindItemService(key)) {
+ writer->OnKeyedItem(key);
+ if (Opaque_) {
+ service->WriteAttributes(writer.Get(), attributeFilter, /*stable*/ false);
+ writer->OnEntity();
+ } else {
+ auto asyncResult = AsyncYPathGet(service, TYPath(), attributeFilter);
+ writer->OnRaw(asyncResult);
+ }
+ }
}
- }
- }
- } else {
- for (const auto& key : keys) {
- if (Opaque_) {
- writer->OnKeyedItem(key);
- writer->OnEntity();
} else {
- if (auto service = FindItemService(key)) {
- writer->OnKeyedItem(key);
- auto asyncResult = AsyncYPathGet(service, "");
- writer->OnRaw(asyncResult);
+ for (i64 index = keyIndexRange.first; index < keyIndexRange.second; ++index) {
+ const auto& key = keys[index];
+ if (Opaque_) {
+ writer->OnKeyedItem(key);
+ writer->OnEntity();
+ } else {
+ if (auto service = FindItemService(key)) {
+ writer->OnKeyedItem(key);
+ auto asyncResult = AsyncYPathGet(service, TYPath());
+ writer->OnRaw(asyncResult);
+ }
+ }
}
}
- }
- }
+ });
+
writer->OnEndMap();
ReplyFromAsyncYsonWriter(std::move(writer), context);
@@ -191,20 +261,31 @@ void TVirtualMapBase::ListSelf(
}
writer->OnBeginList();
- if (attributeFilter) {
- for (const auto& key : keys) {
- if (auto service = FindItemService(key)) {
- writer->OnListItem();
- service->WriteAttributes(writer.Get(), attributeFilter, /*stable*/ false);
- writer->OnStringScalar(key);
+
+ ExecuteBatchRead(
+ GetReadOffloadParams(),
+ writer,
+ EYsonType::MapFragment,
+ std::ssize(keys),
+ [&] (auto keyIndexRange, const TAsyncYsonWriterPtr& writer) {
+ if (attributeFilter) {
+ for (i64 index = keyIndexRange.first; index < keyIndexRange.second; ++index) {
+ const auto& key = keys[index];
+ if (auto service = FindItemService(key)) {
+ writer->OnListItem();
+ service->WriteAttributes(writer.Get(), attributeFilter, /*stable*/ false);
+ writer->OnStringScalar(key);
+ }
+ }
+ } else {
+ for (i64 index = keyIndexRange.first; index < keyIndexRange.second; ++index) {
+ const auto& key = keys[index];
+ writer->OnListItem();
+ writer->OnStringScalar(key);
+ }
}
- }
- } else {
- for (const auto& key : keys) {
- writer->OnListItem();
- writer->OnStringScalar(key);
- }
- }
+ });
+
writer->OnEndList();
ReplyFromAsyncYsonWriter(std::move(writer), context);
@@ -332,7 +413,6 @@ public:
private:
THashMap<TString, IYPathServicePtr> Services_;
THashMap<TInternedAttributeKey, TYsonCallback> Attributes_;
-
};
////////////////////////////////////////////////////////////////////////////////
@@ -341,8 +421,7 @@ TCompositeMapService::TCompositeMapService()
: Impl_(New<TImpl>())
{ }
-TCompositeMapService::~TCompositeMapService()
-{ }
+TCompositeMapService::~TCompositeMapService() = default;
std::vector<TString> TCompositeMapService::GetKeys(i64 limit) const
{
@@ -458,6 +537,11 @@ INodePtr CreateVirtualNode(IYPathServicePtr service)
////////////////////////////////////////////////////////////////////////////////
+std::optional<TVirtualCompositeNodeReadOffloadParams> TVirtualListBase::GetReadOffloadParams() const
+{
+ return {};
+}
+
bool TVirtualListBase::DoInvoke(const IYPathServiceContextPtr& context)
{
DISPATCH_YPATH_SERVICE_METHOD(Get);
@@ -493,7 +577,7 @@ IYPathService::TResolveResult TVirtualListBase::ResolveRecursive(
void TVirtualListBase::GetSelf(
TReqGet* request,
- TRspGet* response,
+ TRspGet* /*response*/,
const TCtxGetPtr& context)
{
YT_ASSERT(!NYson::TTokenizer(GetRequestTargetYPath(context->RequestHeader())).ParseNext());
@@ -512,63 +596,63 @@ void TVirtualListBase::GetSelf(
i64 size = GetSize();
- TAsyncYsonWriter writer;
+ auto writer = New<TAsyncYsonWriter>();
// NB: we do not want empty attributes (<>) to appear in the result in order to comply
// with current behaviour for some paths (like //sys/scheduler/orchid/scheduler/operations).
if (limit < size) {
- writer.OnBeginAttributes();
- writer.OnKeyedItem("incomplete");
- writer.OnBooleanScalar(true);
- writer.OnEndAttributes();
+ writer->OnBeginAttributes();
+ writer->OnKeyedItem("incomplete");
+ writer->OnBooleanScalar(true);
+ writer->OnEndAttributes();
}
- writer.OnBeginList();
-
- if (attributeFilter) {
- for (int index = 0; index < limit && index < size; ++index) {
- auto service = FindItemService(index);
- writer.OnListItem();
- if (service) {
- service->WriteAttributes(&writer, attributeFilter, false);
- if (Opaque_) {
- writer.OnEntity();
- } else {
- auto asyncResult = AsyncYPathGet(service, "");
- writer.OnRaw(asyncResult);
+ i64 count = std::min(size, limit);
+
+ writer->OnBeginList();
+
+ ExecuteBatchRead(
+ GetReadOffloadParams(),
+ writer,
+ EYsonType::ListFragment,
+ count,
+ [&] (std::pair<i64, i64> keyIndexRange, const TAsyncYsonWriterPtr& writer) {
+ if (attributeFilter) {
+ for (i64 index = keyIndexRange.first; index < keyIndexRange.second; ++index) {
+ writer->OnListItem();
+ if (auto service = FindItemService(index)) {
+ service->WriteAttributes(writer.Get(), attributeFilter, false);
+ if (Opaque_) {
+ writer->OnEntity();
+ } else {
+ auto asyncResult = AsyncYPathGet(service, TYPath());
+ writer->OnRaw(asyncResult);
+ }
+ } else {
+ writer->OnEntity();
+ }
}
} else {
- writer.OnEntity();
- }
- }
- } else {
- for (int index = 0; index < limit && index < size; ++index) {
- writer.OnListItem();
- if (Opaque_) {
- writer.OnEntity();
- } else {
- if (auto service = FindItemService(index)) {
- writer.OnListItem();
- auto asyncResult = AsyncYPathGet(service, "");
- writer.OnRaw(asyncResult);
- } else {
- writer.OnEntity();
+ for (i64 index = keyIndexRange.first; index < keyIndexRange.second; ++index) {
+ writer->OnListItem();
+ if (Opaque_) {
+ writer->OnEntity();
+ } else {
+ if (auto service = FindItemService(index)) {
+ writer->OnListItem();
+ auto asyncResult = AsyncYPathGet(service, TYPath());
+ writer->OnRaw(asyncResult);
+ } else {
+ writer->OnEntity();
+ }
+ }
}
}
- }
- }
+ });
- writer.OnEndList();
+ writer->OnEndList();
- writer.Finish()
- .Subscribe(BIND([=] (const TErrorOr<TYsonString>& resultOrError) {
- if (resultOrError.IsOK()) {
- response->set_value(resultOrError.Value().ToString());
- context->Reply();
- } else {
- context->Reply(resultOrError);
- }
- }));
+ ReplyFromAsyncYsonWriter(std::move(writer), context);
}
void TVirtualListBase::ListSystemAttributes(std::vector<TAttributeDescriptor>* descriptors)
diff --git a/yt/yt/core/ytree/virtual.h b/yt/yt/core/ytree/virtual.h
index 66ac03e531..74a88f0dc5 100644
--- a/yt/yt/core/ytree/virtual.h
+++ b/yt/yt/core/ytree/virtual.h
@@ -4,11 +4,20 @@
#include "ypath_detail.h"
#include <yt/yt/core/yson/producer.h>
+#include <yt/yt/core/yson/async_writer.h>
namespace NYT::NYTree {
////////////////////////////////////////////////////////////////////////////////
+struct TVirtualCompositeNodeReadOffloadParams
+{
+ NConcurrency::EWaitForStrategy WaitForStrategy = NConcurrency::EWaitForStrategy::WaitFor;
+ i64 BatchSize = 10'000;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
class TVirtualMapBase
: public TSupportsAttributes
, public ISystemAttributeProvider
@@ -17,10 +26,12 @@ public:
DEFINE_BYVAL_RW_PROPERTY(bool, Opaque, true);
protected:
- TVirtualMapBase();
- explicit TVirtualMapBase(INodePtr owningNode);
+ explicit TVirtualMapBase(INodePtr owningNode = nullptr);
+
+ virtual std::optional<TVirtualCompositeNodeReadOffloadParams> GetReadOffloadParams() const;
virtual std::vector<TString> GetKeys(i64 limit = std::numeric_limits<i64>::max()) const = 0;
+
virtual i64 GetSize() const = 0;
virtual IYPathServicePtr FindItemService(TStringBuf key) const = 0;
@@ -60,7 +71,6 @@ class TCompositeMapService
{
public:
TCompositeMapService();
-
~TCompositeMapService();
std::vector<TString> GetKeys(i64 limit = std::numeric_limits<i64>::max()) const override;
@@ -93,7 +103,10 @@ public:
DEFINE_BYVAL_RW_PROPERTY(bool, Opaque, true);
protected:
+ virtual std::optional<TVirtualCompositeNodeReadOffloadParams> GetReadOffloadParams() const;
+
virtual i64 GetSize() const = 0;
+
virtual IYPathServicePtr FindItemService(int index) const = 0;
bool DoInvoke(const IYPathServiceContextPtr& context) override;
diff --git a/yt/yt/library/tracing/example/main.cpp b/yt/yt/library/tracing/example/main.cpp
index d2bbcb1972..9f2fabcf69 100644
--- a/yt/yt/library/tracing/example/main.cpp
+++ b/yt/yt/library/tracing/example/main.cpp
@@ -113,7 +113,7 @@ int main(int argc, char* argv[])
throw yexception() << usage;
}
- auto config = New<NTracing::TJaegerTracerConfig>();
+ static auto config = New<NTracing::TJaegerTracerConfig>();
config->CollectorChannelConfig = New<NRpc::NGrpc::TChannelConfig>();
config->CollectorChannelConfig->Address = argv[1];