aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVitaly Stoyan <vvvv@ydb.tech>2024-05-03 21:15:31 +0300
committerGitHub <noreply@github.com>2024-05-03 21:15:31 +0300
commitb69384e250b7914e22cb16e169020580ab65acb9 (patch)
tree573aa77b5639cf43b57ae93c4cb7eb7db433fc8e
parentcc8e54e00bd286829e383ba0793cf1dabda84002 (diff)
downloadydb-b69384e250b7914e22cb16e169020580ab65acb9.tar.gz
Limits for qwriters (#4225)
-rw-r--r--ydb/library/yql/core/qplayer/storage/file/yql_qstorage_file.cpp45
-rw-r--r--ydb/library/yql/core/qplayer/storage/interface/yql_qstorage.h9
-rw-r--r--ydb/library/yql/core/qplayer/storage/memory/yql_qstorage_memory.cpp40
-rw-r--r--ydb/library/yql/core/qplayer/storage/ut_common/yql_qstorage_ut_common.cpp36
-rw-r--r--ydb/library/yql/core/qplayer/storage/ut_common/yql_qstorage_ut_common.h8
-rw-r--r--ydb/library/yql/core/qplayer/storage/ydb/yql_qstorage_ydb.cpp2
6 files changed, 91 insertions, 49 deletions
diff --git a/ydb/library/yql/core/qplayer/storage/file/yql_qstorage_file.cpp b/ydb/library/yql/core/qplayer/storage/file/yql_qstorage_file.cpp
index 46cac468af0..ddd7fa2c5c7 100644
--- a/ydb/library/yql/core/qplayer/storage/file/yql_qstorage_file.cpp
+++ b/ydb/library/yql/core/qplayer/storage/file/yql_qstorage_file.cpp
@@ -50,10 +50,10 @@ protected:
class TBufferedWriter : public TWriterBase {
public:
- TBufferedWriter(TFsPath& path, TInstant writtenAt)
+ TBufferedWriter(TFsPath& path, TInstant writtenAt, const TQWriterSettings& settings)
: TWriterBase(path, writtenAt)
, Storage_(MakeMemoryQStorage())
- , Writer_(Storage_->MakeWriter("", {}))
+ , Writer_(Storage_->MakeWriter("", settings))
{
}
@@ -97,8 +97,9 @@ private:
class TUnbufferedWriter : public TWriterBase {
public:
- TUnbufferedWriter(TFsPath& path, TInstant writtenAt)
+ TUnbufferedWriter(TFsPath& path, TInstant writtenAt, const TQWriterSettings& settings)
: TWriterBase(path, writtenAt)
+ , Settings_(settings)
, DataFile_(Path_.GetPath() + ".dat")
{
DataFile_.Write(&WrittenAt_, sizeof(WrittenAt_));
@@ -107,11 +108,21 @@ public:
NThreading::TFuture<void> Put(const TQItemKey& key, const TString& value) final {
with_lock(Mutex_) {
Y_ENSURE(!Committed_);
- if (Keys_.emplace(key).second) {
- SaveString(DataFile_, key.Component, TotalBytes_, Checksum_);
- SaveString(DataFile_, key.Label, TotalBytes_, Checksum_);
- SaveString(DataFile_, value, TotalBytes_, Checksum_);
- ++TotalItems_;
+ if (!Overflow_) {
+ if (Keys_.emplace(key).second) {
+ SaveString(DataFile_, key.Component, TotalBytes_, Checksum_);
+ SaveString(DataFile_, key.Label, TotalBytes_, Checksum_);
+ SaveString(DataFile_, value, TotalBytes_, Checksum_);
+ ++TotalItems_;
+ }
+
+ if (Settings_.ItemsLimit && TotalItems_ > *Settings_.ItemsLimit) {
+ Overflow_ = true;
+ }
+
+ if (Settings_.BytesLimit && TotalBytes_ > *Settings_.BytesLimit) {
+ Overflow_ = true;
+ }
}
return NThreading::MakeFuture();
@@ -120,6 +131,10 @@ public:
NThreading::TFuture<void> Commit() final {
with_lock(Mutex_) {
+ if (Overflow_) {
+ throw yexception() << "Overflow of qwriter";
+ }
+
Y_ENSURE(!Committed_);
Committed_ = true;
DataFile_.Finish();
@@ -129,6 +144,7 @@ public:
}
private:
+ const TQWriterSettings Settings_;
TMutex Mutex_;
TFileOutput DataFile_;
ui64 TotalItems_ = 0;
@@ -136,6 +152,7 @@ private:
ui64 Checksum_ = 0;
THashSet<TQItemKey> Keys_;
bool Committed_ = false;
+ bool Overflow_ = false;
};
class TStorage : public IQStorage {
@@ -154,23 +171,23 @@ public:
auto opPath = Folder_ / operationId;
auto writtenAt = writerSettings.WrittenAt.GetOrElse(Now());
if (Settings_.BufferUntilCommit) {
- return std::make_shared<TBufferedWriter>(opPath, writtenAt);
+ return std::make_shared<TBufferedWriter>(opPath, writtenAt, writerSettings);
} else {
- return std::make_shared<TUnbufferedWriter>(opPath, writtenAt);
+ return std::make_shared<TUnbufferedWriter>(opPath, writtenAt, writerSettings);
}
}
- IQReaderPtr MakeReader(const TString& operationId, const TQReaderSettings& settings) const final {
- Y_UNUSED(settings);
+ IQReaderPtr MakeReader(const TString& operationId, const TQReaderSettings& readerSettings) const final {
+ Y_UNUSED(readerSettings);
auto memory = MakeMemoryQStorage();
LoadFile(operationId, memory);
return memory->MakeReader("", {});
}
- IQIteratorPtr MakeIterator(const TString& operationId, const TQIteratorSettings& settings) const {
+ IQIteratorPtr MakeIterator(const TString& operationId, const TQIteratorSettings& iteratorSettings) const {
auto memory = MakeMemoryQStorage();
LoadFile(operationId, memory);
- return memory->MakeIterator("", settings);
+ return memory->MakeIterator("", iteratorSettings);
}
private:
diff --git a/ydb/library/yql/core/qplayer/storage/interface/yql_qstorage.h b/ydb/library/yql/core/qplayer/storage/interface/yql_qstorage.h
index 53c1bde9a49..137e9ad6391 100644
--- a/ydb/library/yql/core/qplayer/storage/interface/yql_qstorage.h
+++ b/ydb/library/yql/core/qplayer/storage/interface/yql_qstorage.h
@@ -67,13 +67,14 @@ using IQIteratorPtr = std::shared_ptr<IQIterator>;
struct TQWriterSettings {
TMaybe<TInstant> WrittenAt;
+ TMaybe<ui64> ItemsLimit;
+ TMaybe<ui64> BytesLimit;
};
struct TQReaderSettings {
};
struct TQIteratorSettings {
- bool DoNotLoadValue = false;
TMaybe<ui64> ItemsLimit;
TMaybe<ui64> BytesLimit;
TMaybe<ui32> ConcurrencyLimit;
@@ -84,10 +85,10 @@ public:
virtual ~IQStorage() = default;
// it's an UB to open writer twice for the same operationId, implementations may check it
- virtual IQWriterPtr MakeWriter(const TString& operationId, const TQWriterSettings& settings) const = 0;
+ virtual IQWriterPtr MakeWriter(const TString& operationId, const TQWriterSettings& writerSettings) const = 0;
// readers & iterators may not see results of writer until commit
- virtual IQReaderPtr MakeReader(const TString& operationId, const TQReaderSettings& settings) const = 0;
- virtual IQIteratorPtr MakeIterator(const TString& operationId, const TQIteratorSettings& settings) const = 0;
+ virtual IQReaderPtr MakeReader(const TString& operationId, const TQReaderSettings& readerSettings) const = 0;
+ virtual IQIteratorPtr MakeIterator(const TString& operationId, const TQIteratorSettings& iteratorSettings) const = 0;
};
using IQStoragePtr = std::shared_ptr<IQStorage>;
diff --git a/ydb/library/yql/core/qplayer/storage/memory/yql_qstorage_memory.cpp b/ydb/library/yql/core/qplayer/storage/memory/yql_qstorage_memory.cpp
index 660ac39ea4d..ae11feb3b8f 100644
--- a/ydb/library/yql/core/qplayer/storage/memory/yql_qstorage_memory.cpp
+++ b/ydb/library/yql/core/qplayer/storage/memory/yql_qstorage_memory.cpp
@@ -13,6 +13,8 @@ struct TOperationMap {
using TMapPtr = std::shared_ptr<TMap>;
TMapPtr ReadMap, WriteMap;
bool Committed = false;
+ bool Overflow = false;
+ ui64 TotalBytes = 0;
};
using TOperationMapPtr = std::shared_ptr<TOperationMap>;
@@ -46,20 +48,38 @@ private:
class TWriter : public IQWriter {
public:
- TWriter(const TOperationMapPtr& operation)
+ TWriter(const TOperationMapPtr& operation, const TQWriterSettings& settings)
: Operation_(operation)
+ , Settings_(settings)
{}
NThreading::TFuture<void> Put(const TQItemKey& key, const TString& value) final {
with_lock(Operation_->Mutex) {
Y_ENSURE(!Operation_->Committed);
- Operation_->WriteMap->emplace(key, value);
+ if (!Operation_->Overflow) {
+ if (Operation_->WriteMap->emplace(key, value).second) {
+ Operation_->TotalBytes += key.Component.size() + key.Label.size() + value.size();
+ }
+
+ if (Settings_.ItemsLimit && Operation_->WriteMap->size() > *Settings_.ItemsLimit) {
+ Operation_->Overflow = true;
+ }
+
+ if (Settings_.BytesLimit && Operation_->TotalBytes > *Settings_.BytesLimit) {
+ Operation_->Overflow = true;
+ }
+ }
+
return NThreading::MakeFuture();
}
}
NThreading::TFuture<void> Commit() final {
with_lock(Operation_->Mutex) {
+ if (Operation_->Overflow) {
+ throw yexception() << "Overflow of qwriter";
+ }
+
Y_ENSURE(!Operation_->Committed);
Operation_->ReadMap = Operation_->WriteMap;
Operation_->Committed = true;
@@ -69,6 +89,7 @@ public:
private:
const TOperationMapPtr Operation_;
+ const TQWriterSettings Settings_;
};
class TIterator : public IQIterator {
@@ -84,7 +105,7 @@ public:
return NThreading::MakeFuture(TMaybe<TQItem>());
}
- auto res =TMaybe<TQItem>({It_->first, Settings_.DoNotLoadValue ? TString() : It_->second});
+ auto res =TMaybe<TQItem>({It_->first, It_->second});
++It_;
return NThreading::MakeFuture(res);
}
@@ -102,18 +123,17 @@ public:
{
}
- IQReaderPtr MakeReader(const TString& operationId, const TQReaderSettings& settings) const final {
- Y_UNUSED(settings);
+ IQReaderPtr MakeReader(const TString& operationId, const TQReaderSettings& readerSettings) const final {
+ Y_UNUSED(readerSettings);
return std::make_shared<TReader>(GetOperation(operationId, false)->ReadMap);
}
- IQWriterPtr MakeWriter(const TString& operationId, const TQWriterSettings& settings) const final {
- Y_UNUSED(settings);
- return std::make_shared<TWriter>(GetOperation(operationId, true));
+ IQWriterPtr MakeWriter(const TString& operationId, const TQWriterSettings& writerSettings) const final {
+ return std::make_shared<TWriter>(GetOperation(operationId, true), writerSettings);
}
- IQIteratorPtr MakeIterator(const TString& operationId, const TQIteratorSettings& settings) const final {
- return std::make_shared<TIterator>(settings, GetOperation(operationId, false)->ReadMap);
+ IQIteratorPtr MakeIterator(const TString& operationId, const TQIteratorSettings& iteratorSettings) const final {
+ return std::make_shared<TIterator>(iteratorSettings, GetOperation(operationId, false)->ReadMap);
}
private:
diff --git a/ydb/library/yql/core/qplayer/storage/ut_common/yql_qstorage_ut_common.cpp b/ydb/library/yql/core/qplayer/storage/ut_common/yql_qstorage_ut_common.cpp
index 2aa502333bb..d62efc7ebfb 100644
--- a/ydb/library/yql/core/qplayer/storage/ut_common/yql_qstorage_ut_common.cpp
+++ b/ydb/library/yql/core/qplayer/storage/ut_common/yql_qstorage_ut_common.cpp
@@ -43,23 +43,6 @@ void QStorageTestOne_Impl(const NYql::IQStoragePtr& storage) {
UNIT_ASSERT(!value.Defined());
}
-void QStorageTestIterateWithoutValue_Impl(const NYql::IQStoragePtr& storage) {
- auto writer = storage->MakeWriter("foo", {});
- writer->Put({"comp", "label"}, "value").GetValueSync();
- writer->Commit().GetValueSync();
- auto reader = storage->MakeReader("foo", {});
- auto settings = TQIteratorSettings{};
- settings.DoNotLoadValue = true;
- auto iterator = storage->MakeIterator("foo", settings);
- auto value = iterator->Next().GetValueSync();
- UNIT_ASSERT(value.Defined());
- UNIT_ASSERT_VALUES_EQUAL(value->Key.Component, "comp");
- UNIT_ASSERT_VALUES_EQUAL(value->Key.Label, "label");
- UNIT_ASSERT_VALUES_EQUAL(value->Value, "");
- value = iterator->Next().GetValueSync();
- UNIT_ASSERT(!value.Defined());
-}
-
void QStorageTestManyKeys_Impl(const NYql::IQStoragePtr& storage) {
const size_t N = 10;
auto writer = storage->MakeWriter("foo", {});
@@ -119,3 +102,22 @@ void QStorageTestInterleaveReadWrite_Impl(const NYql::IQStoragePtr& storage) {
value = iterator2->Next().GetValueSync();
UNIT_ASSERT(!value.Defined());
}
+
+void QStorageTestLimitWriterItems_Impl(const NYql::IQStoragePtr& storage) {
+ TQWriterSettings settings;
+ settings.ItemsLimit = 1;
+ auto writer = storage->MakeWriter("foo", settings);
+ writer->Put({"comp", "label1"}, "value1").GetValueSync();
+ writer->Put({"comp", "label2"}, "value2").GetValueSync();
+ UNIT_ASSERT_EXCEPTION(writer->Commit().GetValueSync(), yexception);
+}
+
+void QStorageTestLimitWriterBytes_Impl(const NYql::IQStoragePtr& storage) {
+ TQWriterSettings settings;
+ settings.BytesLimit = 7;
+ auto writer = storage->MakeWriter("foo", settings);
+ writer->Put({"comp", "label1"}, "value1").GetValueSync();
+ writer->Put({"comp", "label2"}, "value2").GetValueSync();
+ UNIT_ASSERT_EXCEPTION(writer->Commit().GetValueSync(), yexception);
+}
+
diff --git a/ydb/library/yql/core/qplayer/storage/ut_common/yql_qstorage_ut_common.h b/ydb/library/yql/core/qplayer/storage/ut_common/yql_qstorage_ut_common.h
index 19a0cf2179c..7c6108b3926 100644
--- a/ydb/library/yql/core/qplayer/storage/ut_common/yql_qstorage_ut_common.h
+++ b/ydb/library/yql/core/qplayer/storage/ut_common/yql_qstorage_ut_common.h
@@ -5,9 +5,10 @@
void QStorageTestEmpty_Impl(const NYql::IQStoragePtr& storage);
void QStorageTestOne_Impl(const NYql::IQStoragePtr& storage);
-void QStorageTestIterateWithoutValue_Impl(const NYql::IQStoragePtr& storage);
void QStorageTestManyKeys_Impl(const NYql::IQStoragePtr& storage);
void QStorageTestInterleaveReadWrite_Impl(const NYql::IQStoragePtr& storage);
+void QStorageTestLimitWriterItems_Impl(const NYql::IQStoragePtr& storage);
+void QStorageTestLimitWriterBytes_Impl(const NYql::IQStoragePtr& storage);
#define GENERATE_ONE_TEST(NAME, FACTORY) \
Y_UNIT_TEST(NAME) { \
@@ -20,6 +21,7 @@ void QStorageTestInterleaveReadWrite_Impl(const NYql::IQStoragePtr& storage);
#define GENERATE_TESTS(FACTORY)\
GENERATE_ONE_TEST(Empty, FACTORY) \
GENERATE_ONE_TEST(One, FACTORY) \
- GENERATE_ONE_TEST(IterateWithoutValue, FACTORY) \
GENERATE_ONE_TEST(ManyKeys, FACTORY) \
- GENERATE_ONE_TEST(InterleaveReadWrite, FACTORY)
+ GENERATE_ONE_TEST(InterleaveReadWrite, FACTORY) \
+ GENERATE_ONE_TEST(LimitWriterItems, FACTORY) \
+ GENERATE_ONE_TEST(LimitWriterBytes, FACTORY)
diff --git a/ydb/library/yql/core/qplayer/storage/ydb/yql_qstorage_ydb.cpp b/ydb/library/yql/core/qplayer/storage/ydb/yql_qstorage_ydb.cpp
index 45fc60b4148..f1d1a421838 100644
--- a/ydb/library/yql/core/qplayer/storage/ydb/yql_qstorage_ydb.cpp
+++ b/ydb/library/yql/core/qplayer/storage/ydb/yql_qstorage_ydb.cpp
@@ -44,7 +44,7 @@ public:
: Settings_(settings)
, FullOperationId_(settings.OperationIdPrefix + operationId)
, Storage_(MakeMemoryQStorage())
- , Writer_(Storage_->MakeWriter("", {}))
+ , Writer_(Storage_->MakeWriter("", writerSettings))
, WrittenAt_(writerSettings.WrittenAt.GetOrElse(Now()))
{}