aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorrobot-piglet <robot-piglet@yandex-team.com>2024-02-01 13:12:06 +0300
committerAlexander Smirnov <alex@ydb.tech>2024-02-09 19:16:51 +0300
commit48899fc467679556c588087b21e4d7b75d87254f (patch)
tree644ddbdedb401f6f192066a36562a3557ccc7336
parent15b635970831875460d193dfe28f639b453e105b (diff)
downloadydb-48899fc467679556c588087b21e4d7b75d87254f.tar.gz
Intermediate changes
-rw-r--r--yt/yt/client/api/rpc_proxy/helpers.cpp18
-rw-r--r--yt/yt/client/api/rpc_proxy/helpers.h3
-rw-r--r--yt/yt/client/table_client/row_buffer.cpp38
-rw-r--r--yt/yt/client/table_client/row_buffer.h18
-rw-r--r--yt/yt/core/misc/memory_usage_tracker.cpp30
-rw-r--r--yt/yt/core/misc/memory_usage_tracker.h2
-rw-r--r--yt/yt/core/rpc/public.h1
7 files changed, 96 insertions, 14 deletions
diff --git a/yt/yt/client/api/rpc_proxy/helpers.cpp b/yt/yt/client/api/rpc_proxy/helpers.cpp
index 819a478b41..68739781a6 100644
--- a/yt/yt/client/api/rpc_proxy/helpers.cpp
+++ b/yt/yt/client/api/rpc_proxy/helpers.cpp
@@ -1842,7 +1842,8 @@ auto ReadRows<TVersionedRow>(IWireProtocolReader* reader, const TTableSchema& sc
template <class TRow>
TIntrusivePtr<NApi::IRowset<TRow>> DeserializeRowset(
const NProto::TRowsetDescriptor& descriptor,
- const TSharedRef& data)
+ const TSharedRef& data,
+ NTableClient::TRowBufferPtr rowBuffer)
{
if (descriptor.rowset_format() != NApi::NRpcProxy::NProto::RF_YT_WIRE) {
THROW_ERROR_EXCEPTION("Unsupported rowset format %Qv",
@@ -1855,8 +1856,12 @@ TIntrusivePtr<NApi::IRowset<TRow>> DeserializeRowset(
TRowsetTraits<TRow>::Kind,
NApi::NRpcProxy::NProto::RF_YT_WIRE);
- struct TDeserializedRowsetTag { };
- auto reader = CreateWireProtocolReader(data, New<TRowBuffer>(TDeserializedRowsetTag()));
+ if (!rowBuffer) {
+ struct TDeserializedRowsetTag { };
+ rowBuffer = New<TRowBuffer>(TDeserializedRowsetTag());
+ }
+
+ auto reader = CreateWireProtocolReader(data, std::move(rowBuffer));
auto schema = DeserializeRowsetSchema(descriptor);
auto rows = ReadRows<TRow>(reader.get(), *schema);
@@ -1866,10 +1871,13 @@ TIntrusivePtr<NApi::IRowset<TRow>> DeserializeRowset(
// Instantiate templates.
template NApi::IUnversionedRowsetPtr DeserializeRowset(
const NProto::TRowsetDescriptor& descriptor,
- const TSharedRef& data);
+ const TSharedRef& data,
+ NTableClient::TRowBufferPtr buffer = nullptr);
+
template NApi::IVersionedRowsetPtr DeserializeRowset(
const NProto::TRowsetDescriptor& descriptor,
- const TSharedRef& data);
+ const TSharedRef& data,
+ NTableClient::TRowBufferPtr buffer = nullptr);
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/client/api/rpc_proxy/helpers.h b/yt/yt/client/api/rpc_proxy/helpers.h
index 0eb4b3594c..4b01c4ae11 100644
--- a/yt/yt/client/api/rpc_proxy/helpers.h
+++ b/yt/yt/client/api/rpc_proxy/helpers.h
@@ -280,7 +280,8 @@ std::vector<TSharedRef> SerializeRowset(
template <class TRow>
TIntrusivePtr<NApi::IRowset<TRow>> DeserializeRowset(
const NProto::TRowsetDescriptor& descriptor,
- const TSharedRef& data);
+ const TSharedRef& data,
+ NTableClient::TRowBufferPtr buffer = nullptr);
std::vector<TSharedRef> SerializeRowset(
const NTableClient::TTableSchema& schema,
diff --git a/yt/yt/client/table_client/row_buffer.cpp b/yt/yt/client/table_client/row_buffer.cpp
index 38036bd80d..dfa72af0a5 100644
--- a/yt/yt/client/table_client/row_buffer.cpp
+++ b/yt/yt/client/table_client/row_buffer.cpp
@@ -15,7 +15,9 @@ TChunkedMemoryPool* TRowBuffer::GetPool()
TMutableUnversionedRow TRowBuffer::AllocateUnversioned(int valueCount)
{
- return TMutableUnversionedRow::Allocate(&Pool_, valueCount);
+ auto result = TMutableUnversionedRow::Allocate(&Pool_, valueCount);
+ ValidateNoOverflow();
+ return result;
}
TMutableVersionedRow TRowBuffer::AllocateVersioned(
@@ -24,12 +26,14 @@ TMutableVersionedRow TRowBuffer::AllocateVersioned(
int writeTimestampCount,
int deleteTimestampCount)
{
- return TMutableVersionedRow::Allocate(
+ auto result = TMutableVersionedRow::Allocate(
&Pool_,
keyCount,
valueCount,
writeTimestampCount,
deleteTimestampCount);
+ ValidateNoOverflow();
+ return result;
}
void TRowBuffer::CaptureValue(TUnversionedValue* value)
@@ -39,6 +43,8 @@ void TRowBuffer::CaptureValue(TUnversionedValue* value)
memcpy(dst, value->Data.String, value->Length);
value->Data.String = dst;
}
+
+ ValidateNoOverflow();
}
TVersionedValue TRowBuffer::CaptureValue(const TVersionedValue& value)
@@ -89,6 +95,8 @@ TMutableUnversionedRow TRowBuffer::CaptureRow(TUnversionedValueRange values, boo
}
}
+ ValidateNoOverflow();
+
return capturedRow;
}
@@ -152,6 +160,8 @@ TMutableUnversionedRow TRowBuffer::CaptureAndPermuteRow(
capturedRow[valueCount++] = *addend;
}
+ ValidateNoOverflow();
+
return capturedRow;
}
@@ -176,6 +186,8 @@ TMutableVersionedRow TRowBuffer::CaptureRow(TVersionedRow row, bool captureValue
CaptureValues(capturedRow);
}
+ ValidateNoOverflow();
+
return capturedRow;
}
@@ -284,12 +296,15 @@ TMutableVersionedRow TRowBuffer::CaptureAndPermuteRow(
}
}
+ ValidateNoOverflow();
+
return capturedRow;
}
void TRowBuffer::Absorb(TRowBuffer&& other)
{
Pool_.Absorb(std::move(other.Pool_));
+ ValidateNoOverflow();
}
i64 TRowBuffer::GetSize() const
@@ -304,14 +319,33 @@ i64 TRowBuffer::GetCapacity() const
void TRowBuffer::Clear()
{
+ MemoryGuard_.reset();
Pool_.Clear();
}
void TRowBuffer::Purge()
{
+ MemoryGuard_.reset();
Pool_.Purge();
}
+void TRowBuffer::ValidateNoOverflow()
+{
+ if (!MemoryTracker_) {
+ return;
+ }
+
+ auto capacity = Pool_.GetCapacity();
+
+ if (!MemoryGuard_) {
+ MemoryGuard_ = TMemoryUsageTrackerGuard::TryAcquire(MemoryTracker_, capacity)
+ .ValueOrThrow();
+ } else {
+ MemoryGuard_->TrySetSize(capacity)
+ .ThrowOnError();
+ }
+}
+
////////////////////////////////////////////////////////////////////////////////
} // namespace NYT::NTableClient
diff --git a/yt/yt/client/table_client/row_buffer.h b/yt/yt/client/table_client/row_buffer.h
index 94e3605027..4597fa69c2 100644
--- a/yt/yt/client/table_client/row_buffer.h
+++ b/yt/yt/client/table_client/row_buffer.h
@@ -6,6 +6,8 @@
#include <library/cpp/yt/memory/chunked_memory_pool.h>
+#include <yt/yt/core/misc/memory_usage_tracker.h>
+
namespace NYT::NTableClient {
////////////////////////////////////////////////////////////////////////////////
@@ -24,29 +26,35 @@ public:
TRowBuffer(
TRefCountedTypeCookie tagCookie,
IMemoryChunkProviderPtr chunkProvider,
- size_t startChunkSize = TChunkedMemoryPool::DefaultStartChunkSize)
+ size_t startChunkSize = TChunkedMemoryPool::DefaultStartChunkSize,
+ IMemoryUsageTrackerPtr tracker = nullptr)
: Pool_(
tagCookie,
std::move(chunkProvider),
startChunkSize)
+ , MemoryTracker_(std::move(tracker))
{ }
template <class TTag = TDefaultRowBufferPoolTag>
explicit TRowBuffer(
TTag = TDefaultRowBufferPoolTag(),
- size_t startChunkSize = TChunkedMemoryPool::DefaultStartChunkSize)
+ size_t startChunkSize = TChunkedMemoryPool::DefaultStartChunkSize,
+ IMemoryUsageTrackerPtr tracker = nullptr)
: Pool_(
TTag(),
startChunkSize)
+ , MemoryTracker_(std::move(tracker))
{ }
template <class TTag>
TRowBuffer(
TTag,
- IMemoryChunkProviderPtr chunkProvider)
+ IMemoryChunkProviderPtr chunkProvider,
+ IMemoryUsageTrackerPtr tracker = nullptr)
: Pool_(
GetRefCountedTypeCookie<TTag>(),
std::move(chunkProvider))
+ , MemoryTracker_(std::move(tracker))
{ }
TChunkedMemoryPool* GetPool();
@@ -106,6 +114,10 @@ public:
private:
TChunkedMemoryPool Pool_;
+ IMemoryUsageTrackerPtr MemoryTracker_;
+ std::optional<TMemoryUsageTrackerGuard> MemoryGuard_;
+
+ void ValidateNoOverflow();
};
DEFINE_REFCOUNTED_TYPE(TRowBuffer)
diff --git a/yt/yt/core/misc/memory_usage_tracker.cpp b/yt/yt/core/misc/memory_usage_tracker.cpp
index 7ae0d4d1fc..d968e25fec 100644
--- a/yt/yt/core/misc/memory_usage_tracker.cpp
+++ b/yt/yt/core/misc/memory_usage_tracker.cpp
@@ -96,8 +96,11 @@ TErrorOr<TMemoryUsageTrackerGuard> TMemoryUsageTrackerGuard::TryAcquire(
i64 size,
i64 granularity)
{
+ if (!tracker) {
+ return {};
+ }
+
YT_VERIFY(size >= 0);
- YT_VERIFY(tracker);
auto error = tracker->TryAcquire(size);
if (!error.IsOK()) {
@@ -142,20 +145,41 @@ i64 TMemoryUsageTrackerGuard::GetSize() const
void TMemoryUsageTrackerGuard::SetSize(i64 size)
{
+ auto ignoredError = SetSizeGeneric(size, [&] (i64 delta) {
+ Tracker_->Acquire(delta);
+ return TError{};
+ });
+
+ Y_UNUSED(ignoredError);
+}
+
+TError TMemoryUsageTrackerGuard::TrySetSize(i64 size)
+{
+ return SetSizeGeneric(size, [&] (i64 delta) {
+ return Tracker_->TryAcquire(delta);
+ });
+}
+
+TError TMemoryUsageTrackerGuard::SetSizeGeneric(i64 size, auto acquirer)
+{
if (!Tracker_) {
- return;
+ return {};
}
YT_VERIFY(size >= 0);
Size_ = size;
if (std::abs(Size_ - AcquiredSize_) >= Granularity_) {
if (Size_ > AcquiredSize_) {
- Tracker_->Acquire(Size_ - AcquiredSize_);
+ if (auto result = acquirer(Size_ - AcquiredSize_); !result.IsOK()) {
+ return result;
+ }
} else {
Tracker_->Release(AcquiredSize_ - Size_);
}
AcquiredSize_ = Size_;
}
+
+ return {};
}
void TMemoryUsageTrackerGuard::IncrementSize(i64 sizeDelta)
diff --git a/yt/yt/core/misc/memory_usage_tracker.h b/yt/yt/core/misc/memory_usage_tracker.h
index 3c13b75320..61248faaa8 100644
--- a/yt/yt/core/misc/memory_usage_tracker.h
+++ b/yt/yt/core/misc/memory_usage_tracker.h
@@ -54,6 +54,7 @@ public:
i64 GetSize() const;
void SetSize(i64 size);
+ TError TrySetSize(i64 size);
void IncrementSize(i64 sizeDelta);
TMemoryUsageTrackerGuard TransferMemory(i64 size);
@@ -64,6 +65,7 @@ private:
i64 Granularity_ = 0;
void MoveFrom(TMemoryUsageTrackerGuard&& other);
+ TError SetSizeGeneric(i64 size, auto acquirer);
};
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/core/rpc/public.h b/yt/yt/core/rpc/public.h
index 5548fa0f22..f544383487 100644
--- a/yt/yt/core/rpc/public.h
+++ b/yt/yt/core/rpc/public.h
@@ -184,6 +184,7 @@ YT_DEFINE_ERROR_ENUM(
((Overloaded) (118)) // The server is currently overloaded and unable to handle additional requests.
// The client should try to reduce their request rate until the server has had a chance to recover.
((SslError) (static_cast<int>(NBus::EErrorCode::SslError)))
+ ((MemoryOverflow) (120))
);
DEFINE_ENUM(EMessageFormat,