diff options
author | robot-piglet <robot-piglet@yandex-team.com> | 2024-02-01 13:12:06 +0300 |
---|---|---|
committer | Alexander Smirnov <alex@ydb.tech> | 2024-02-09 19:16:51 +0300 |
commit | 48899fc467679556c588087b21e4d7b75d87254f (patch) | |
tree | 644ddbdedb401f6f192066a36562a3557ccc7336 | |
parent | 15b635970831875460d193dfe28f639b453e105b (diff) | |
download | ydb-48899fc467679556c588087b21e4d7b75d87254f.tar.gz |
Intermediate changes
-rw-r--r-- | yt/yt/client/api/rpc_proxy/helpers.cpp | 18 | ||||
-rw-r--r-- | yt/yt/client/api/rpc_proxy/helpers.h | 3 | ||||
-rw-r--r-- | yt/yt/client/table_client/row_buffer.cpp | 38 | ||||
-rw-r--r-- | yt/yt/client/table_client/row_buffer.h | 18 | ||||
-rw-r--r-- | yt/yt/core/misc/memory_usage_tracker.cpp | 30 | ||||
-rw-r--r-- | yt/yt/core/misc/memory_usage_tracker.h | 2 | ||||
-rw-r--r-- | yt/yt/core/rpc/public.h | 1 |
7 files changed, 96 insertions, 14 deletions
diff --git a/yt/yt/client/api/rpc_proxy/helpers.cpp b/yt/yt/client/api/rpc_proxy/helpers.cpp index 819a478b41..68739781a6 100644 --- a/yt/yt/client/api/rpc_proxy/helpers.cpp +++ b/yt/yt/client/api/rpc_proxy/helpers.cpp @@ -1842,7 +1842,8 @@ auto ReadRows<TVersionedRow>(IWireProtocolReader* reader, const TTableSchema& sc template <class TRow> TIntrusivePtr<NApi::IRowset<TRow>> DeserializeRowset( const NProto::TRowsetDescriptor& descriptor, - const TSharedRef& data) + const TSharedRef& data, + NTableClient::TRowBufferPtr rowBuffer) { if (descriptor.rowset_format() != NApi::NRpcProxy::NProto::RF_YT_WIRE) { THROW_ERROR_EXCEPTION("Unsupported rowset format %Qv", @@ -1855,8 +1856,12 @@ TIntrusivePtr<NApi::IRowset<TRow>> DeserializeRowset( TRowsetTraits<TRow>::Kind, NApi::NRpcProxy::NProto::RF_YT_WIRE); - struct TDeserializedRowsetTag { }; - auto reader = CreateWireProtocolReader(data, New<TRowBuffer>(TDeserializedRowsetTag())); + if (!rowBuffer) { + struct TDeserializedRowsetTag { }; + rowBuffer = New<TRowBuffer>(TDeserializedRowsetTag()); + } + + auto reader = CreateWireProtocolReader(data, std::move(rowBuffer)); auto schema = DeserializeRowsetSchema(descriptor); auto rows = ReadRows<TRow>(reader.get(), *schema); @@ -1866,10 +1871,13 @@ TIntrusivePtr<NApi::IRowset<TRow>> DeserializeRowset( // Instantiate templates. template NApi::IUnversionedRowsetPtr DeserializeRowset( const NProto::TRowsetDescriptor& descriptor, - const TSharedRef& data); + const TSharedRef& data, + NTableClient::TRowBufferPtr buffer = nullptr); + template NApi::IVersionedRowsetPtr DeserializeRowset( const NProto::TRowsetDescriptor& descriptor, - const TSharedRef& data); + const TSharedRef& data, + NTableClient::TRowBufferPtr buffer = nullptr); //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/client/api/rpc_proxy/helpers.h b/yt/yt/client/api/rpc_proxy/helpers.h index 0eb4b3594c..4b01c4ae11 100644 --- a/yt/yt/client/api/rpc_proxy/helpers.h +++ b/yt/yt/client/api/rpc_proxy/helpers.h @@ -280,7 +280,8 @@ std::vector<TSharedRef> SerializeRowset( template <class TRow> TIntrusivePtr<NApi::IRowset<TRow>> DeserializeRowset( const NProto::TRowsetDescriptor& descriptor, - const TSharedRef& data); + const TSharedRef& data, + NTableClient::TRowBufferPtr buffer = nullptr); std::vector<TSharedRef> SerializeRowset( const NTableClient::TTableSchema& schema, diff --git a/yt/yt/client/table_client/row_buffer.cpp b/yt/yt/client/table_client/row_buffer.cpp index 38036bd80d..dfa72af0a5 100644 --- a/yt/yt/client/table_client/row_buffer.cpp +++ b/yt/yt/client/table_client/row_buffer.cpp @@ -15,7 +15,9 @@ TChunkedMemoryPool* TRowBuffer::GetPool() TMutableUnversionedRow TRowBuffer::AllocateUnversioned(int valueCount) { - return TMutableUnversionedRow::Allocate(&Pool_, valueCount); + auto result = TMutableUnversionedRow::Allocate(&Pool_, valueCount); + ValidateNoOverflow(); + return result; } TMutableVersionedRow TRowBuffer::AllocateVersioned( @@ -24,12 +26,14 @@ TMutableVersionedRow TRowBuffer::AllocateVersioned( int writeTimestampCount, int deleteTimestampCount) { - return TMutableVersionedRow::Allocate( + auto result = TMutableVersionedRow::Allocate( &Pool_, keyCount, valueCount, writeTimestampCount, deleteTimestampCount); + ValidateNoOverflow(); + return result; } void TRowBuffer::CaptureValue(TUnversionedValue* value) @@ -39,6 +43,8 @@ void TRowBuffer::CaptureValue(TUnversionedValue* value) memcpy(dst, value->Data.String, value->Length); value->Data.String = dst; } + + ValidateNoOverflow(); } TVersionedValue TRowBuffer::CaptureValue(const TVersionedValue& value) @@ -89,6 +95,8 @@ TMutableUnversionedRow TRowBuffer::CaptureRow(TUnversionedValueRange values, boo } } + ValidateNoOverflow(); + return capturedRow; } @@ -152,6 +160,8 @@ TMutableUnversionedRow TRowBuffer::CaptureAndPermuteRow( capturedRow[valueCount++] = *addend; } + ValidateNoOverflow(); + return capturedRow; } @@ -176,6 +186,8 @@ TMutableVersionedRow TRowBuffer::CaptureRow(TVersionedRow row, bool captureValue CaptureValues(capturedRow); } + ValidateNoOverflow(); + return capturedRow; } @@ -284,12 +296,15 @@ TMutableVersionedRow TRowBuffer::CaptureAndPermuteRow( } } + ValidateNoOverflow(); + return capturedRow; } void TRowBuffer::Absorb(TRowBuffer&& other) { Pool_.Absorb(std::move(other.Pool_)); + ValidateNoOverflow(); } i64 TRowBuffer::GetSize() const @@ -304,14 +319,33 @@ i64 TRowBuffer::GetCapacity() const void TRowBuffer::Clear() { + MemoryGuard_.reset(); Pool_.Clear(); } void TRowBuffer::Purge() { + MemoryGuard_.reset(); Pool_.Purge(); } +void TRowBuffer::ValidateNoOverflow() +{ + if (!MemoryTracker_) { + return; + } + + auto capacity = Pool_.GetCapacity(); + + if (!MemoryGuard_) { + MemoryGuard_ = TMemoryUsageTrackerGuard::TryAcquire(MemoryTracker_, capacity) + .ValueOrThrow(); + } else { + MemoryGuard_->TrySetSize(capacity) + .ThrowOnError(); + } +} + //////////////////////////////////////////////////////////////////////////////// } // namespace NYT::NTableClient diff --git a/yt/yt/client/table_client/row_buffer.h b/yt/yt/client/table_client/row_buffer.h index 94e3605027..4597fa69c2 100644 --- a/yt/yt/client/table_client/row_buffer.h +++ b/yt/yt/client/table_client/row_buffer.h @@ -6,6 +6,8 @@ #include <library/cpp/yt/memory/chunked_memory_pool.h> +#include <yt/yt/core/misc/memory_usage_tracker.h> + namespace NYT::NTableClient { //////////////////////////////////////////////////////////////////////////////// @@ -24,29 +26,35 @@ public: TRowBuffer( TRefCountedTypeCookie tagCookie, IMemoryChunkProviderPtr chunkProvider, - size_t startChunkSize = TChunkedMemoryPool::DefaultStartChunkSize) + size_t startChunkSize = TChunkedMemoryPool::DefaultStartChunkSize, + IMemoryUsageTrackerPtr tracker = nullptr) : Pool_( tagCookie, std::move(chunkProvider), startChunkSize) + , MemoryTracker_(std::move(tracker)) { } template <class TTag = TDefaultRowBufferPoolTag> explicit TRowBuffer( TTag = TDefaultRowBufferPoolTag(), - size_t startChunkSize = TChunkedMemoryPool::DefaultStartChunkSize) + size_t startChunkSize = TChunkedMemoryPool::DefaultStartChunkSize, + IMemoryUsageTrackerPtr tracker = nullptr) : Pool_( TTag(), startChunkSize) + , MemoryTracker_(std::move(tracker)) { } template <class TTag> TRowBuffer( TTag, - IMemoryChunkProviderPtr chunkProvider) + IMemoryChunkProviderPtr chunkProvider, + IMemoryUsageTrackerPtr tracker = nullptr) : Pool_( GetRefCountedTypeCookie<TTag>(), std::move(chunkProvider)) + , MemoryTracker_(std::move(tracker)) { } TChunkedMemoryPool* GetPool(); @@ -106,6 +114,10 @@ public: private: TChunkedMemoryPool Pool_; + IMemoryUsageTrackerPtr MemoryTracker_; + std::optional<TMemoryUsageTrackerGuard> MemoryGuard_; + + void ValidateNoOverflow(); }; DEFINE_REFCOUNTED_TYPE(TRowBuffer) diff --git a/yt/yt/core/misc/memory_usage_tracker.cpp b/yt/yt/core/misc/memory_usage_tracker.cpp index 7ae0d4d1fc..d968e25fec 100644 --- a/yt/yt/core/misc/memory_usage_tracker.cpp +++ b/yt/yt/core/misc/memory_usage_tracker.cpp @@ -96,8 +96,11 @@ TErrorOr<TMemoryUsageTrackerGuard> TMemoryUsageTrackerGuard::TryAcquire( i64 size, i64 granularity) { + if (!tracker) { + return {}; + } + YT_VERIFY(size >= 0); - YT_VERIFY(tracker); auto error = tracker->TryAcquire(size); if (!error.IsOK()) { @@ -142,20 +145,41 @@ i64 TMemoryUsageTrackerGuard::GetSize() const void TMemoryUsageTrackerGuard::SetSize(i64 size) { + auto ignoredError = SetSizeGeneric(size, [&] (i64 delta) { + Tracker_->Acquire(delta); + return TError{}; + }); + + Y_UNUSED(ignoredError); +} + +TError TMemoryUsageTrackerGuard::TrySetSize(i64 size) +{ + return SetSizeGeneric(size, [&] (i64 delta) { + return Tracker_->TryAcquire(delta); + }); +} + +TError TMemoryUsageTrackerGuard::SetSizeGeneric(i64 size, auto acquirer) +{ if (!Tracker_) { - return; + return {}; } YT_VERIFY(size >= 0); Size_ = size; if (std::abs(Size_ - AcquiredSize_) >= Granularity_) { if (Size_ > AcquiredSize_) { - Tracker_->Acquire(Size_ - AcquiredSize_); + if (auto result = acquirer(Size_ - AcquiredSize_); !result.IsOK()) { + return result; + } } else { Tracker_->Release(AcquiredSize_ - Size_); } AcquiredSize_ = Size_; } + + return {}; } void TMemoryUsageTrackerGuard::IncrementSize(i64 sizeDelta) diff --git a/yt/yt/core/misc/memory_usage_tracker.h b/yt/yt/core/misc/memory_usage_tracker.h index 3c13b75320..61248faaa8 100644 --- a/yt/yt/core/misc/memory_usage_tracker.h +++ b/yt/yt/core/misc/memory_usage_tracker.h @@ -54,6 +54,7 @@ public: i64 GetSize() const; void SetSize(i64 size); + TError TrySetSize(i64 size); void IncrementSize(i64 sizeDelta); TMemoryUsageTrackerGuard TransferMemory(i64 size); @@ -64,6 +65,7 @@ private: i64 Granularity_ = 0; void MoveFrom(TMemoryUsageTrackerGuard&& other); + TError SetSizeGeneric(i64 size, auto acquirer); }; //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/core/rpc/public.h b/yt/yt/core/rpc/public.h index 5548fa0f22..f544383487 100644 --- a/yt/yt/core/rpc/public.h +++ b/yt/yt/core/rpc/public.h @@ -184,6 +184,7 @@ YT_DEFINE_ERROR_ENUM( ((Overloaded) (118)) // The server is currently overloaded and unable to handle additional requests. // The client should try to reduce their request rate until the server has had a chance to recover. ((SslError) (static_cast<int>(NBus::EErrorCode::SslError))) + ((MemoryOverflow) (120)) ); DEFINE_ENUM(EMessageFormat, |