diff options
author | Vitaly Stoyan <vvvv@ydb.tech> | 2024-05-03 21:15:31 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-05-03 21:15:31 +0300 |
commit | b69384e250b7914e22cb16e169020580ab65acb9 (patch) | |
tree | 573aa77b5639cf43b57ae93c4cb7eb7db433fc8e | |
parent | cc8e54e00bd286829e383ba0793cf1dabda84002 (diff) | |
download | ydb-b69384e250b7914e22cb16e169020580ab65acb9.tar.gz |
Limits for qwriters (#4225)
6 files changed, 91 insertions, 49 deletions
diff --git a/ydb/library/yql/core/qplayer/storage/file/yql_qstorage_file.cpp b/ydb/library/yql/core/qplayer/storage/file/yql_qstorage_file.cpp index 46cac468af0..ddd7fa2c5c7 100644 --- a/ydb/library/yql/core/qplayer/storage/file/yql_qstorage_file.cpp +++ b/ydb/library/yql/core/qplayer/storage/file/yql_qstorage_file.cpp @@ -50,10 +50,10 @@ protected: class TBufferedWriter : public TWriterBase { public: - TBufferedWriter(TFsPath& path, TInstant writtenAt) + TBufferedWriter(TFsPath& path, TInstant writtenAt, const TQWriterSettings& settings) : TWriterBase(path, writtenAt) , Storage_(MakeMemoryQStorage()) - , Writer_(Storage_->MakeWriter("", {})) + , Writer_(Storage_->MakeWriter("", settings)) { } @@ -97,8 +97,9 @@ private: class TUnbufferedWriter : public TWriterBase { public: - TUnbufferedWriter(TFsPath& path, TInstant writtenAt) + TUnbufferedWriter(TFsPath& path, TInstant writtenAt, const TQWriterSettings& settings) : TWriterBase(path, writtenAt) + , Settings_(settings) , DataFile_(Path_.GetPath() + ".dat") { DataFile_.Write(&WrittenAt_, sizeof(WrittenAt_)); @@ -107,11 +108,21 @@ public: NThreading::TFuture<void> Put(const TQItemKey& key, const TString& value) final { with_lock(Mutex_) { Y_ENSURE(!Committed_); - if (Keys_.emplace(key).second) { - SaveString(DataFile_, key.Component, TotalBytes_, Checksum_); - SaveString(DataFile_, key.Label, TotalBytes_, Checksum_); - SaveString(DataFile_, value, TotalBytes_, Checksum_); - ++TotalItems_; + if (!Overflow_) { + if (Keys_.emplace(key).second) { + SaveString(DataFile_, key.Component, TotalBytes_, Checksum_); + SaveString(DataFile_, key.Label, TotalBytes_, Checksum_); + SaveString(DataFile_, value, TotalBytes_, Checksum_); + ++TotalItems_; + } + + if (Settings_.ItemsLimit && TotalItems_ > *Settings_.ItemsLimit) { + Overflow_ = true; + } + + if (Settings_.BytesLimit && TotalBytes_ > *Settings_.BytesLimit) { + Overflow_ = true; + } } return NThreading::MakeFuture(); @@ -120,6 +131,10 @@ public: NThreading::TFuture<void> Commit() final { with_lock(Mutex_) { + if (Overflow_) { + throw yexception() << "Overflow of qwriter"; + } + Y_ENSURE(!Committed_); Committed_ = true; DataFile_.Finish(); @@ -129,6 +144,7 @@ public: } private: + const TQWriterSettings Settings_; TMutex Mutex_; TFileOutput DataFile_; ui64 TotalItems_ = 0; @@ -136,6 +152,7 @@ private: ui64 Checksum_ = 0; THashSet<TQItemKey> Keys_; bool Committed_ = false; + bool Overflow_ = false; }; class TStorage : public IQStorage { @@ -154,23 +171,23 @@ public: auto opPath = Folder_ / operationId; auto writtenAt = writerSettings.WrittenAt.GetOrElse(Now()); if (Settings_.BufferUntilCommit) { - return std::make_shared<TBufferedWriter>(opPath, writtenAt); + return std::make_shared<TBufferedWriter>(opPath, writtenAt, writerSettings); } else { - return std::make_shared<TUnbufferedWriter>(opPath, writtenAt); + return std::make_shared<TUnbufferedWriter>(opPath, writtenAt, writerSettings); } } - IQReaderPtr MakeReader(const TString& operationId, const TQReaderSettings& settings) const final { - Y_UNUSED(settings); + IQReaderPtr MakeReader(const TString& operationId, const TQReaderSettings& readerSettings) const final { + Y_UNUSED(readerSettings); auto memory = MakeMemoryQStorage(); LoadFile(operationId, memory); return memory->MakeReader("", {}); } - IQIteratorPtr MakeIterator(const TString& operationId, const TQIteratorSettings& settings) const { + IQIteratorPtr MakeIterator(const TString& operationId, const TQIteratorSettings& iteratorSettings) const { auto memory = MakeMemoryQStorage(); LoadFile(operationId, memory); - return memory->MakeIterator("", settings); + return memory->MakeIterator("", iteratorSettings); } private: diff --git a/ydb/library/yql/core/qplayer/storage/interface/yql_qstorage.h b/ydb/library/yql/core/qplayer/storage/interface/yql_qstorage.h index 53c1bde9a49..137e9ad6391 100644 --- a/ydb/library/yql/core/qplayer/storage/interface/yql_qstorage.h +++ b/ydb/library/yql/core/qplayer/storage/interface/yql_qstorage.h @@ -67,13 +67,14 @@ using IQIteratorPtr = std::shared_ptr<IQIterator>; struct TQWriterSettings { TMaybe<TInstant> WrittenAt; + TMaybe<ui64> ItemsLimit; + TMaybe<ui64> BytesLimit; }; struct TQReaderSettings { }; struct TQIteratorSettings { - bool DoNotLoadValue = false; TMaybe<ui64> ItemsLimit; TMaybe<ui64> BytesLimit; TMaybe<ui32> ConcurrencyLimit; @@ -84,10 +85,10 @@ public: virtual ~IQStorage() = default; // it's an UB to open writer twice for the same operationId, implementations may check it - virtual IQWriterPtr MakeWriter(const TString& operationId, const TQWriterSettings& settings) const = 0; + virtual IQWriterPtr MakeWriter(const TString& operationId, const TQWriterSettings& writerSettings) const = 0; // readers & iterators may not see results of writer until commit - virtual IQReaderPtr MakeReader(const TString& operationId, const TQReaderSettings& settings) const = 0; - virtual IQIteratorPtr MakeIterator(const TString& operationId, const TQIteratorSettings& settings) const = 0; + virtual IQReaderPtr MakeReader(const TString& operationId, const TQReaderSettings& readerSettings) const = 0; + virtual IQIteratorPtr MakeIterator(const TString& operationId, const TQIteratorSettings& iteratorSettings) const = 0; }; using IQStoragePtr = std::shared_ptr<IQStorage>; diff --git a/ydb/library/yql/core/qplayer/storage/memory/yql_qstorage_memory.cpp b/ydb/library/yql/core/qplayer/storage/memory/yql_qstorage_memory.cpp index 660ac39ea4d..ae11feb3b8f 100644 --- a/ydb/library/yql/core/qplayer/storage/memory/yql_qstorage_memory.cpp +++ b/ydb/library/yql/core/qplayer/storage/memory/yql_qstorage_memory.cpp @@ -13,6 +13,8 @@ struct TOperationMap { using TMapPtr = std::shared_ptr<TMap>; TMapPtr ReadMap, WriteMap; bool Committed = false; + bool Overflow = false; + ui64 TotalBytes = 0; }; using TOperationMapPtr = std::shared_ptr<TOperationMap>; @@ -46,20 +48,38 @@ private: class TWriter : public IQWriter { public: - TWriter(const TOperationMapPtr& operation) + TWriter(const TOperationMapPtr& operation, const TQWriterSettings& settings) : Operation_(operation) + , Settings_(settings) {} NThreading::TFuture<void> Put(const TQItemKey& key, const TString& value) final { with_lock(Operation_->Mutex) { Y_ENSURE(!Operation_->Committed); - Operation_->WriteMap->emplace(key, value); + if (!Operation_->Overflow) { + if (Operation_->WriteMap->emplace(key, value).second) { + Operation_->TotalBytes += key.Component.size() + key.Label.size() + value.size(); + } + + if (Settings_.ItemsLimit && Operation_->WriteMap->size() > *Settings_.ItemsLimit) { + Operation_->Overflow = true; + } + + if (Settings_.BytesLimit && Operation_->TotalBytes > *Settings_.BytesLimit) { + Operation_->Overflow = true; + } + } + return NThreading::MakeFuture(); } } NThreading::TFuture<void> Commit() final { with_lock(Operation_->Mutex) { + if (Operation_->Overflow) { + throw yexception() << "Overflow of qwriter"; + } + Y_ENSURE(!Operation_->Committed); Operation_->ReadMap = Operation_->WriteMap; Operation_->Committed = true; @@ -69,6 +89,7 @@ public: private: const TOperationMapPtr Operation_; + const TQWriterSettings Settings_; }; class TIterator : public IQIterator { @@ -84,7 +105,7 @@ public: return NThreading::MakeFuture(TMaybe<TQItem>()); } - auto res =TMaybe<TQItem>({It_->first, Settings_.DoNotLoadValue ? TString() : It_->second}); + auto res =TMaybe<TQItem>({It_->first, It_->second}); ++It_; return NThreading::MakeFuture(res); } @@ -102,18 +123,17 @@ public: { } - IQReaderPtr MakeReader(const TString& operationId, const TQReaderSettings& settings) const final { - Y_UNUSED(settings); + IQReaderPtr MakeReader(const TString& operationId, const TQReaderSettings& readerSettings) const final { + Y_UNUSED(readerSettings); return std::make_shared<TReader>(GetOperation(operationId, false)->ReadMap); } - IQWriterPtr MakeWriter(const TString& operationId, const TQWriterSettings& settings) const final { - Y_UNUSED(settings); - return std::make_shared<TWriter>(GetOperation(operationId, true)); + IQWriterPtr MakeWriter(const TString& operationId, const TQWriterSettings& writerSettings) const final { + return std::make_shared<TWriter>(GetOperation(operationId, true), writerSettings); } - IQIteratorPtr MakeIterator(const TString& operationId, const TQIteratorSettings& settings) const final { - return std::make_shared<TIterator>(settings, GetOperation(operationId, false)->ReadMap); + IQIteratorPtr MakeIterator(const TString& operationId, const TQIteratorSettings& iteratorSettings) const final { + return std::make_shared<TIterator>(iteratorSettings, GetOperation(operationId, false)->ReadMap); } private: diff --git a/ydb/library/yql/core/qplayer/storage/ut_common/yql_qstorage_ut_common.cpp b/ydb/library/yql/core/qplayer/storage/ut_common/yql_qstorage_ut_common.cpp index 2aa502333bb..d62efc7ebfb 100644 --- a/ydb/library/yql/core/qplayer/storage/ut_common/yql_qstorage_ut_common.cpp +++ b/ydb/library/yql/core/qplayer/storage/ut_common/yql_qstorage_ut_common.cpp @@ -43,23 +43,6 @@ void QStorageTestOne_Impl(const NYql::IQStoragePtr& storage) { UNIT_ASSERT(!value.Defined()); } -void QStorageTestIterateWithoutValue_Impl(const NYql::IQStoragePtr& storage) { - auto writer = storage->MakeWriter("foo", {}); - writer->Put({"comp", "label"}, "value").GetValueSync(); - writer->Commit().GetValueSync(); - auto reader = storage->MakeReader("foo", {}); - auto settings = TQIteratorSettings{}; - settings.DoNotLoadValue = true; - auto iterator = storage->MakeIterator("foo", settings); - auto value = iterator->Next().GetValueSync(); - UNIT_ASSERT(value.Defined()); - UNIT_ASSERT_VALUES_EQUAL(value->Key.Component, "comp"); - UNIT_ASSERT_VALUES_EQUAL(value->Key.Label, "label"); - UNIT_ASSERT_VALUES_EQUAL(value->Value, ""); - value = iterator->Next().GetValueSync(); - UNIT_ASSERT(!value.Defined()); -} - void QStorageTestManyKeys_Impl(const NYql::IQStoragePtr& storage) { const size_t N = 10; auto writer = storage->MakeWriter("foo", {}); @@ -119,3 +102,22 @@ void QStorageTestInterleaveReadWrite_Impl(const NYql::IQStoragePtr& storage) { value = iterator2->Next().GetValueSync(); UNIT_ASSERT(!value.Defined()); } + +void QStorageTestLimitWriterItems_Impl(const NYql::IQStoragePtr& storage) { + TQWriterSettings settings; + settings.ItemsLimit = 1; + auto writer = storage->MakeWriter("foo", settings); + writer->Put({"comp", "label1"}, "value1").GetValueSync(); + writer->Put({"comp", "label2"}, "value2").GetValueSync(); + UNIT_ASSERT_EXCEPTION(writer->Commit().GetValueSync(), yexception); +} + +void QStorageTestLimitWriterBytes_Impl(const NYql::IQStoragePtr& storage) { + TQWriterSettings settings; + settings.BytesLimit = 7; + auto writer = storage->MakeWriter("foo", settings); + writer->Put({"comp", "label1"}, "value1").GetValueSync(); + writer->Put({"comp", "label2"}, "value2").GetValueSync(); + UNIT_ASSERT_EXCEPTION(writer->Commit().GetValueSync(), yexception); +} + diff --git a/ydb/library/yql/core/qplayer/storage/ut_common/yql_qstorage_ut_common.h b/ydb/library/yql/core/qplayer/storage/ut_common/yql_qstorage_ut_common.h index 19a0cf2179c..7c6108b3926 100644 --- a/ydb/library/yql/core/qplayer/storage/ut_common/yql_qstorage_ut_common.h +++ b/ydb/library/yql/core/qplayer/storage/ut_common/yql_qstorage_ut_common.h @@ -5,9 +5,10 @@ void QStorageTestEmpty_Impl(const NYql::IQStoragePtr& storage); void QStorageTestOne_Impl(const NYql::IQStoragePtr& storage); -void QStorageTestIterateWithoutValue_Impl(const NYql::IQStoragePtr& storage); void QStorageTestManyKeys_Impl(const NYql::IQStoragePtr& storage); void QStorageTestInterleaveReadWrite_Impl(const NYql::IQStoragePtr& storage); +void QStorageTestLimitWriterItems_Impl(const NYql::IQStoragePtr& storage); +void QStorageTestLimitWriterBytes_Impl(const NYql::IQStoragePtr& storage); #define GENERATE_ONE_TEST(NAME, FACTORY) \ Y_UNIT_TEST(NAME) { \ @@ -20,6 +21,7 @@ void QStorageTestInterleaveReadWrite_Impl(const NYql::IQStoragePtr& storage); #define GENERATE_TESTS(FACTORY)\ GENERATE_ONE_TEST(Empty, FACTORY) \ GENERATE_ONE_TEST(One, FACTORY) \ - GENERATE_ONE_TEST(IterateWithoutValue, FACTORY) \ GENERATE_ONE_TEST(ManyKeys, FACTORY) \ - GENERATE_ONE_TEST(InterleaveReadWrite, FACTORY) + GENERATE_ONE_TEST(InterleaveReadWrite, FACTORY) \ + GENERATE_ONE_TEST(LimitWriterItems, FACTORY) \ + GENERATE_ONE_TEST(LimitWriterBytes, FACTORY) diff --git a/ydb/library/yql/core/qplayer/storage/ydb/yql_qstorage_ydb.cpp b/ydb/library/yql/core/qplayer/storage/ydb/yql_qstorage_ydb.cpp index 45fc60b4148..f1d1a421838 100644 --- a/ydb/library/yql/core/qplayer/storage/ydb/yql_qstorage_ydb.cpp +++ b/ydb/library/yql/core/qplayer/storage/ydb/yql_qstorage_ydb.cpp @@ -44,7 +44,7 @@ public: : Settings_(settings) , FullOperationId_(settings.OperationIdPrefix + operationId) , Storage_(MakeMemoryQStorage()) - , Writer_(Storage_->MakeWriter("", {})) + , Writer_(Storage_->MakeWriter("", writerSettings)) , WrittenAt_(writerSettings.WrittenAt.GetOrElse(Now())) {} |