diff options
author | Vitaly Stoyan <vvvv@ydb.tech> | 2024-04-22 16:26:05 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-04-22 16:26:05 +0300 |
commit | 0fea310a336608fa290c8efee866be9e83318335 (patch) | |
tree | dae4a48060d2a0a6aef627fd3f956de2a8fcf06e | |
parent | 5a8c9bf0fa1b630d1a488eb4ebf4703dce1da767 (diff) | |
download | ydb-0fea310a336608fa290c8efee866be9e83318335.tar.gz |
QPlayer storage interfaces & in-memory implementation (#3993)
11 files changed, 444 insertions, 0 deletions
diff --git a/ydb/library/yql/core/qplayer/storage/interface/ya.make b/ydb/library/yql/core/qplayer/storage/interface/ya.make new file mode 100644 index 0000000000..4b35b17c95 --- /dev/null +++ b/ydb/library/yql/core/qplayer/storage/interface/ya.make @@ -0,0 +1,11 @@ +LIBRARY() + +SRCS( + yql_qstorage.cpp +) + +PEERDIR( + library/cpp/threading/future +) + +END() diff --git a/ydb/library/yql/core/qplayer/storage/interface/yql_qstorage.cpp b/ydb/library/yql/core/qplayer/storage/interface/yql_qstorage.cpp new file mode 100644 index 0000000000..43b073e130 --- /dev/null +++ b/ydb/library/yql/core/qplayer/storage/interface/yql_qstorage.cpp @@ -0,0 +1 @@ +#include "yql_qstorage.h" diff --git a/ydb/library/yql/core/qplayer/storage/interface/yql_qstorage.h b/ydb/library/yql/core/qplayer/storage/interface/yql_qstorage.h new file mode 100644 index 0000000000..7b8cecf9fd --- /dev/null +++ b/ydb/library/yql/core/qplayer/storage/interface/yql_qstorage.h @@ -0,0 +1,127 @@ +#pragma once + +#include <library/cpp/threading/future/future.h> +#include <util/digest/numeric.h> +#include <util/generic/string.h> +#include <memory> + +namespace NYql { + +struct TQItemKey { + TString Component; + TString Label; + + bool operator==(const TQItemKey& other) const { + return Component == other.Component && Label == other.Label; + } + + bool operator<(const TQItemKey& other) const { + return Component < other.Component || Component == other.Component && Label < other.Label; + } + + size_t Hash() const { + return CombineHashes( + THash<TString>()(Component), + THash<TString>()(Label) + ); + } +}; + +struct TQItem { + TQItemKey Key; + TString Value; + + bool operator<(const TQItem& other) const { + return Key < other.Key; + } +}; + +class IQReader { +public: + virtual ~IQReader() = default; + + virtual NThreading::TFuture<TMaybe<TQItem>> Get(const TQItemKey& key) const = 0; +}; + +using IQReaderPtr = std::shared_ptr<IQReader>; + +class IQWriter { +public: + virtual ~IQWriter() = default; + + virtual NThreading::TFuture<void> Put(const TQItemKey& key, const TString& value) = 0; + virtual NThreading::TFuture<void> Commit() = 0; +}; + +using IQWriterPtr = std::shared_ptr<IQWriter>; + +class IQIterator { +public: + virtual ~IQIterator() = default; + + virtual NThreading::TFuture<TMaybe<TQItem>> Next() = 0; +}; + +using IQIteratorPtr = std::shared_ptr<IQIterator>; + +struct TQIteratorSettings { + bool DoNotLoadValue = false; + TMaybe<ui64> ItemsLimit; + TMaybe<ui64> BytesLimit; + TMaybe<ui32> ConcurrencyLimit; +}; + +class IQStorage { +public: + virtual ~IQStorage() = default; + + virtual IQWriterPtr MakeWriter(const TString& operationId) const = 0; + // readers & iterators may not see results of writer until commit + virtual IQReaderPtr MakeReader(const TString& operationId) const = 0; + virtual IQIteratorPtr MakeIterator(const TString& operationId, const TQIteratorSettings& settings) const = 0; +}; + +using IQStoragePtr = std::shared_ptr<IQStorage>; + +class TQContext { +public: + TQContext() + {} + + TQContext(IQReaderPtr reader) + : Reader_(reader) + {} + + TQContext(IQWriterPtr writer) + : Writer_(writer) + {} + + bool CanRead() const { + return Reader_ != nullptr; + } + + bool CanWrite() const { + return Writer_ != nullptr; + } + + const IQReaderPtr& GetReader() const { + return Reader_; + } + + const IQWriterPtr& GetWriter() const { + return Writer_; + } + +private: + const IQReaderPtr Reader_; + const IQWriterPtr Writer_; +}; + +} + +template <> +struct THash<NYql::TQItemKey> { + size_t operator()(const NYql::TQItemKey& value) const { + return value.Hash(); + } +}; diff --git a/ydb/library/yql/core/qplayer/storage/memory/ut/ya.make b/ydb/library/yql/core/qplayer/storage/memory/ut/ya.make new file mode 100644 index 0000000000..1a0120d389 --- /dev/null +++ b/ydb/library/yql/core/qplayer/storage/memory/ut/ya.make @@ -0,0 +1,8 @@ +UNITTEST_FOR(ydb/library/yql/core/qplayer/storage/memory) + +SRCS( + yql_qstorage_memory_ut.cpp +) + +END() + diff --git a/ydb/library/yql/core/qplayer/storage/memory/ut/yql_qstorage_memory_ut.cpp b/ydb/library/yql/core/qplayer/storage/memory/ut/yql_qstorage_memory_ut.cpp new file mode 100644 index 0000000000..0b43294251 --- /dev/null +++ b/ydb/library/yql/core/qplayer/storage/memory/ut/yql_qstorage_memory_ut.cpp @@ -0,0 +1,128 @@ +#include <ydb/library/yql/core/qplayer/storage/memory/yql_qstorage_memory.h> + +#include <library/cpp/testing/unittest/registar.h> + +using namespace NYql; + +TVector<TQItem> DrainIterator(IQIterator& iterator) { + TVector<TQItem> res; + for (;;) { + auto value = iterator.Next().GetValueSync(); + if (!value) { + break; + } + + res.emplace_back(*value); + } + + return res; +} + +Y_UNIT_TEST_SUITE(TQStorageMemoryTests) { + Y_UNIT_TEST(Empty) { + auto storage = MakeMemoryQStorage(); + auto reader = storage->MakeReader("foo"); + UNIT_ASSERT(!reader->Get({"comp", "label"}).GetValueSync().Defined()); + auto iterator = storage->MakeIterator("foo", {}); + UNIT_ASSERT(!iterator->Next().GetValueSync().Defined()); + } + + Y_UNIT_TEST(One) { + auto storage = MakeMemoryQStorage(); + auto writer = storage->MakeWriter("foo"); + writer->Put({"comp", "label"}, "value").GetValueSync(); + writer->Commit().GetValueSync(); + auto reader = storage->MakeReader("foo"); + auto value = reader->Get({"comp", "label"}).GetValueSync(); + UNIT_ASSERT(value.Defined()); + UNIT_ASSERT_VALUES_EQUAL(value->Key.Component, "comp"); + UNIT_ASSERT_VALUES_EQUAL(value->Key.Label, "label"); + UNIT_ASSERT_VALUES_EQUAL(value->Value, "value"); + auto iterator = storage->MakeIterator("foo", {}); + value = iterator->Next().GetValueSync(); + UNIT_ASSERT(value.Defined()); + UNIT_ASSERT_VALUES_EQUAL(value->Key.Component, "comp"); + UNIT_ASSERT_VALUES_EQUAL(value->Key.Label, "label"); + UNIT_ASSERT_VALUES_EQUAL(value->Value, "value"); + value = iterator->Next().GetValueSync(); + UNIT_ASSERT(!value.Defined()); + } + + Y_UNIT_TEST(IterateWithoutValue) { + auto storage = MakeMemoryQStorage(); + auto writer = storage->MakeWriter("foo"); + writer->Put({"comp", "label"}, "value").GetValueSync(); + writer->Commit().GetValueSync(); + auto reader = storage->MakeReader("foo"); + auto settings = TQIteratorSettings{}; + settings.DoNotLoadValue = true; + auto iterator = storage->MakeIterator("foo", settings); + auto value = iterator->Next().GetValueSync(); + UNIT_ASSERT(value.Defined()); + UNIT_ASSERT_VALUES_EQUAL(value->Key.Component, "comp"); + UNIT_ASSERT_VALUES_EQUAL(value->Key.Label, "label"); + UNIT_ASSERT_VALUES_EQUAL(value->Value, ""); + value = iterator->Next().GetValueSync(); + UNIT_ASSERT(!value.Defined()); + } + + Y_UNIT_TEST(ManyKeys) { + const size_t N = 10; + auto storage = MakeMemoryQStorage(); + auto writer = storage->MakeWriter("foo"); + for (size_t i = 0; i < N; ++i) { + writer->Put({"comp", "label" + ToString(i)}, "value" + ToString(i)).GetValueSync(); + } + + writer->Commit().GetValueSync(); + auto reader = storage->MakeReader("foo"); + for (size_t i = 0; i < N; ++i) { + auto value = reader->Get({"comp", "label" + ToString(i)}).GetValueSync(); + UNIT_ASSERT(value.Defined()); + UNIT_ASSERT_VALUES_EQUAL(value->Key.Component, "comp"); + UNIT_ASSERT_VALUES_EQUAL(value->Key.Label, "label" + ToString(i)); + UNIT_ASSERT_VALUES_EQUAL(value->Value, "value" + ToString(i)); + } + + auto iterator = storage->MakeIterator("foo", {}); + TVector<TQItem> res = DrainIterator(*iterator); + UNIT_ASSERT_VALUES_EQUAL(res.size(), N); + Sort(res); + for (size_t i = 0; i < N; ++i) { + UNIT_ASSERT_VALUES_EQUAL(res[i].Key.Component, "comp"); + UNIT_ASSERT_VALUES_EQUAL(res[i].Key.Label, "label" + ToString(i)); + UNIT_ASSERT_VALUES_EQUAL(res[i].Value, "value" + ToString(i)); + } + } + + Y_UNIT_TEST(InterleaveReadWrite) { + auto storage = MakeMemoryQStorage(); + auto reader = storage->MakeReader("foo"); + auto value = reader->Get({"comp", "label"}).GetValueSync(); + UNIT_ASSERT(!value.Defined()); + auto iterator1 = storage->MakeIterator("foo", {}); + value = iterator1->Next().GetValueSync(); + UNIT_ASSERT(!value.Defined()); + auto writer = storage->MakeWriter("foo"); + writer->Put({"comp", "label"}, "value").GetValueSync(); + value = reader->Get({"comp", "label"}).GetValueSync(); + UNIT_ASSERT(!value.Defined()); + auto iterator2 = storage->MakeIterator("foo", {}); + value = iterator2->Next().GetValueSync(); + UNIT_ASSERT(!value.Defined()); + writer->Commit().GetValueSync(); + value = reader->Get({"comp", "label"}).GetValueSync(); + UNIT_ASSERT(value.Defined()); + UNIT_ASSERT_VALUES_EQUAL(value->Key.Component, "comp"); + UNIT_ASSERT_VALUES_EQUAL(value->Key.Label, "label"); + UNIT_ASSERT_VALUES_EQUAL(value->Value, "value"); + auto iterator3 = storage->MakeIterator("foo", {}); + value = iterator3->Next().GetValueSync(); + UNIT_ASSERT(value.Defined()); + UNIT_ASSERT_VALUES_EQUAL(value->Key.Component, "comp"); + UNIT_ASSERT_VALUES_EQUAL(value->Key.Label, "label"); + UNIT_ASSERT_VALUES_EQUAL(value->Value, "value"); + value = iterator2->Next().GetValueSync(); + UNIT_ASSERT(!value.Defined()); + } +} diff --git a/ydb/library/yql/core/qplayer/storage/memory/ya.make b/ydb/library/yql/core/qplayer/storage/memory/ya.make new file mode 100644 index 0000000000..e5d9a7bc11 --- /dev/null +++ b/ydb/library/yql/core/qplayer/storage/memory/ya.make @@ -0,0 +1,15 @@ +LIBRARY() + +SRCS( + yql_qstorage_memory.cpp +) + +PEERDIR( + ydb/library/yql/core/qplayer/storage/interface +) + +END() + +RECURSE_FOR_TESTS( + ut +) diff --git a/ydb/library/yql/core/qplayer/storage/memory/yql_qstorage_memory.cpp b/ydb/library/yql/core/qplayer/storage/memory/yql_qstorage_memory.cpp new file mode 100644 index 0000000000..7bc35f9b5b --- /dev/null +++ b/ydb/library/yql/core/qplayer/storage/memory/yql_qstorage_memory.cpp @@ -0,0 +1,136 @@ +#include "yql_qstorage_memory.h" + +#include <util/generic/hash.h> +#include <util/system/mutex.h> + +namespace NYql { + +namespace { + +struct TOperationMap { + TMutex Mutex; + using TMap = THashMap<TQItemKey, TString>; + TMap ReadMap, WriteMap; +}; + +using TOperationMapPtr = std::shared_ptr<TOperationMap>; + +struct TState { + TMutex Mutex; + using TMap = THashMap<TString, TOperationMapPtr>; + TMap Operations; +}; + +using TStatePtr = std::shared_ptr<TState>; + +class TReader : public IQReader { +public: + TReader(const TOperationMapPtr& operation) + : Operation_(operation) + {} + + NThreading::TFuture<TMaybe<TQItem>> Get(const TQItemKey& key) const final { + with_lock(Operation_->Mutex) { + auto it = Operation_->ReadMap.find(key); + if (it == Operation_->ReadMap.end()) { + return NThreading::MakeFuture(TMaybe<TQItem>()); + } + + return NThreading::MakeFuture(TMaybe<TQItem>(TQItem({key, it->second}))); + } + } + +private: + const TOperationMapPtr Operation_; +}; + +class TWriter : public IQWriter { +public: + TWriter(const TOperationMapPtr& operation) + : Operation_(operation) + {} + + NThreading::TFuture<void> Put(const TQItemKey& key, const TString& value) final { + with_lock(Operation_->Mutex) { + Operation_->WriteMap[key] = value; + return NThreading::MakeFuture(); + } + } + + NThreading::TFuture<void> Commit() final { + with_lock(Operation_->Mutex) { + Operation_->ReadMap = Operation_->WriteMap; + return NThreading::MakeFuture(); + } + } + +private: + const TOperationMapPtr Operation_; +}; + +class TIterator : public IQIterator { +public: + TIterator(const TQIteratorSettings& settings, TOperationMap::TMap map) + : Settings_(settings) + , Map_(std::move(map)) + , It_(Map_.begin()) + {} + + NThreading::TFuture<TMaybe<TQItem>> Next() final { + if (It_ == Map_.end()) { + return NThreading::MakeFuture(TMaybe<TQItem>()); + } + + auto res =TMaybe<TQItem>({It_->first, Settings_.DoNotLoadValue ? TString() : It_->second}); + ++It_; + return NThreading::MakeFuture(res); + } + +private: + const TQIteratorSettings Settings_; + const TOperationMap::TMap Map_; + TOperationMap::TMap::const_iterator It_; +}; + +class TStorage : public IQStorage { +public: + TStorage() + : State_(std::make_shared<TState>()) + { + } + IQReaderPtr MakeReader(const TString& operationId) const final { + return std::make_shared<TReader>(GetOperation(operationId)); + } + + IQWriterPtr MakeWriter(const TString& operationId) const final { + return std::make_shared<TWriter>(GetOperation(operationId)); + } + + IQIteratorPtr MakeIterator(const TString& operationId, const TQIteratorSettings& settings) const final { + // clones the whole map for given operation id + return std::make_shared<TIterator>(settings, GetOperation(operationId)->ReadMap); + } + +private: + TOperationMapPtr GetOperation(const TString& operationId) const { + with_lock(State_->Mutex) { + auto &op = State_->Operations[operationId]; + if (!op) { + op = std::make_shared<TOperationMap>(); + } + + return op; + } + } + +private: + TStatePtr State_; +}; + +} + +IQStoragePtr MakeMemoryQStorage() { + return std::make_shared<TStorage>(); +} + +} diff --git a/ydb/library/yql/core/qplayer/storage/memory/yql_qstorage_memory.h b/ydb/library/yql/core/qplayer/storage/memory/yql_qstorage_memory.h new file mode 100644 index 0000000000..500d1987fa --- /dev/null +++ b/ydb/library/yql/core/qplayer/storage/memory/yql_qstorage_memory.h @@ -0,0 +1,8 @@ +#pragma once +#include <ydb/library/yql/core/qplayer/storage/interface/yql_qstorage.h> + +namespace NYql { + +IQStoragePtr MakeMemoryQStorage(); + +}; diff --git a/ydb/library/yql/core/qplayer/storage/ya.make b/ydb/library/yql/core/qplayer/storage/ya.make new file mode 100644 index 0000000000..9cfc306f18 --- /dev/null +++ b/ydb/library/yql/core/qplayer/storage/ya.make @@ -0,0 +1,5 @@ +RECURSE( + interface + memory +) + diff --git a/ydb/library/yql/core/qplayer/ya.make b/ydb/library/yql/core/qplayer/ya.make new file mode 100644 index 0000000000..25965070b6 --- /dev/null +++ b/ydb/library/yql/core/qplayer/ya.make @@ -0,0 +1,4 @@ +RECURSE( + storage +) + diff --git a/ydb/library/yql/core/ya.make b/ydb/library/yql/core/ya.make index 8445b45165..02f4eb87ff 100644 --- a/ydb/library/yql/core/ya.make +++ b/ydb/library/yql/core/ya.make @@ -108,6 +108,7 @@ RECURSE( file_storage issue peephole_opt + qplayer services spilling sql_types |