aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVitaly Stoyan <vvvv@ydb.tech>2024-04-22 16:26:05 +0300
committerGitHub <noreply@github.com>2024-04-22 16:26:05 +0300
commit0fea310a336608fa290c8efee866be9e83318335 (patch)
treedae4a48060d2a0a6aef627fd3f956de2a8fcf06e
parent5a8c9bf0fa1b630d1a488eb4ebf4703dce1da767 (diff)
downloadydb-0fea310a336608fa290c8efee866be9e83318335.tar.gz
QPlayer storage interfaces & in-memory implementation (#3993)
-rw-r--r--ydb/library/yql/core/qplayer/storage/interface/ya.make11
-rw-r--r--ydb/library/yql/core/qplayer/storage/interface/yql_qstorage.cpp1
-rw-r--r--ydb/library/yql/core/qplayer/storage/interface/yql_qstorage.h127
-rw-r--r--ydb/library/yql/core/qplayer/storage/memory/ut/ya.make8
-rw-r--r--ydb/library/yql/core/qplayer/storage/memory/ut/yql_qstorage_memory_ut.cpp128
-rw-r--r--ydb/library/yql/core/qplayer/storage/memory/ya.make15
-rw-r--r--ydb/library/yql/core/qplayer/storage/memory/yql_qstorage_memory.cpp136
-rw-r--r--ydb/library/yql/core/qplayer/storage/memory/yql_qstorage_memory.h8
-rw-r--r--ydb/library/yql/core/qplayer/storage/ya.make5
-rw-r--r--ydb/library/yql/core/qplayer/ya.make4
-rw-r--r--ydb/library/yql/core/ya.make1
11 files changed, 444 insertions, 0 deletions
diff --git a/ydb/library/yql/core/qplayer/storage/interface/ya.make b/ydb/library/yql/core/qplayer/storage/interface/ya.make
new file mode 100644
index 0000000000..4b35b17c95
--- /dev/null
+++ b/ydb/library/yql/core/qplayer/storage/interface/ya.make
@@ -0,0 +1,11 @@
+LIBRARY()
+
+SRCS(
+ yql_qstorage.cpp
+)
+
+PEERDIR(
+ library/cpp/threading/future
+)
+
+END()
diff --git a/ydb/library/yql/core/qplayer/storage/interface/yql_qstorage.cpp b/ydb/library/yql/core/qplayer/storage/interface/yql_qstorage.cpp
new file mode 100644
index 0000000000..43b073e130
--- /dev/null
+++ b/ydb/library/yql/core/qplayer/storage/interface/yql_qstorage.cpp
@@ -0,0 +1 @@
+#include "yql_qstorage.h"
diff --git a/ydb/library/yql/core/qplayer/storage/interface/yql_qstorage.h b/ydb/library/yql/core/qplayer/storage/interface/yql_qstorage.h
new file mode 100644
index 0000000000..7b8cecf9fd
--- /dev/null
+++ b/ydb/library/yql/core/qplayer/storage/interface/yql_qstorage.h
@@ -0,0 +1,127 @@
+#pragma once
+
+#include <library/cpp/threading/future/future.h>
+#include <util/digest/numeric.h>
+#include <util/generic/string.h>
+#include <memory>
+
+namespace NYql {
+
+struct TQItemKey {
+ TString Component;
+ TString Label;
+
+ bool operator==(const TQItemKey& other) const {
+ return Component == other.Component && Label == other.Label;
+ }
+
+ bool operator<(const TQItemKey& other) const {
+ return Component < other.Component || Component == other.Component && Label < other.Label;
+ }
+
+ size_t Hash() const {
+ return CombineHashes(
+ THash<TString>()(Component),
+ THash<TString>()(Label)
+ );
+ }
+};
+
+struct TQItem {
+ TQItemKey Key;
+ TString Value;
+
+ bool operator<(const TQItem& other) const {
+ return Key < other.Key;
+ }
+};
+
+class IQReader {
+public:
+ virtual ~IQReader() = default;
+
+ virtual NThreading::TFuture<TMaybe<TQItem>> Get(const TQItemKey& key) const = 0;
+};
+
+using IQReaderPtr = std::shared_ptr<IQReader>;
+
+class IQWriter {
+public:
+ virtual ~IQWriter() = default;
+
+ virtual NThreading::TFuture<void> Put(const TQItemKey& key, const TString& value) = 0;
+ virtual NThreading::TFuture<void> Commit() = 0;
+};
+
+using IQWriterPtr = std::shared_ptr<IQWriter>;
+
+class IQIterator {
+public:
+ virtual ~IQIterator() = default;
+
+ virtual NThreading::TFuture<TMaybe<TQItem>> Next() = 0;
+};
+
+using IQIteratorPtr = std::shared_ptr<IQIterator>;
+
+struct TQIteratorSettings {
+ bool DoNotLoadValue = false;
+ TMaybe<ui64> ItemsLimit;
+ TMaybe<ui64> BytesLimit;
+ TMaybe<ui32> ConcurrencyLimit;
+};
+
+class IQStorage {
+public:
+ virtual ~IQStorage() = default;
+
+ virtual IQWriterPtr MakeWriter(const TString& operationId) const = 0;
+ // readers & iterators may not see results of writer until commit
+ virtual IQReaderPtr MakeReader(const TString& operationId) const = 0;
+ virtual IQIteratorPtr MakeIterator(const TString& operationId, const TQIteratorSettings& settings) const = 0;
+};
+
+using IQStoragePtr = std::shared_ptr<IQStorage>;
+
+class TQContext {
+public:
+ TQContext()
+ {}
+
+ TQContext(IQReaderPtr reader)
+ : Reader_(reader)
+ {}
+
+ TQContext(IQWriterPtr writer)
+ : Writer_(writer)
+ {}
+
+ bool CanRead() const {
+ return Reader_ != nullptr;
+ }
+
+ bool CanWrite() const {
+ return Writer_ != nullptr;
+ }
+
+ const IQReaderPtr& GetReader() const {
+ return Reader_;
+ }
+
+ const IQWriterPtr& GetWriter() const {
+ return Writer_;
+ }
+
+private:
+ const IQReaderPtr Reader_;
+ const IQWriterPtr Writer_;
+};
+
+}
+
+template <>
+struct THash<NYql::TQItemKey> {
+ size_t operator()(const NYql::TQItemKey& value) const {
+ return value.Hash();
+ }
+};
diff --git a/ydb/library/yql/core/qplayer/storage/memory/ut/ya.make b/ydb/library/yql/core/qplayer/storage/memory/ut/ya.make
new file mode 100644
index 0000000000..1a0120d389
--- /dev/null
+++ b/ydb/library/yql/core/qplayer/storage/memory/ut/ya.make
@@ -0,0 +1,8 @@
+UNITTEST_FOR(ydb/library/yql/core/qplayer/storage/memory)
+
+SRCS(
+ yql_qstorage_memory_ut.cpp
+)
+
+END()
+
diff --git a/ydb/library/yql/core/qplayer/storage/memory/ut/yql_qstorage_memory_ut.cpp b/ydb/library/yql/core/qplayer/storage/memory/ut/yql_qstorage_memory_ut.cpp
new file mode 100644
index 0000000000..0b43294251
--- /dev/null
+++ b/ydb/library/yql/core/qplayer/storage/memory/ut/yql_qstorage_memory_ut.cpp
@@ -0,0 +1,128 @@
+#include <ydb/library/yql/core/qplayer/storage/memory/yql_qstorage_memory.h>
+
+#include <library/cpp/testing/unittest/registar.h>
+
+using namespace NYql;
+
+TVector<TQItem> DrainIterator(IQIterator& iterator) {
+ TVector<TQItem> res;
+ for (;;) {
+ auto value = iterator.Next().GetValueSync();
+ if (!value) {
+ break;
+ }
+
+ res.emplace_back(*value);
+ }
+
+ return res;
+}
+
+Y_UNIT_TEST_SUITE(TQStorageMemoryTests) {
+ Y_UNIT_TEST(Empty) {
+ auto storage = MakeMemoryQStorage();
+ auto reader = storage->MakeReader("foo");
+ UNIT_ASSERT(!reader->Get({"comp", "label"}).GetValueSync().Defined());
+ auto iterator = storage->MakeIterator("foo", {});
+ UNIT_ASSERT(!iterator->Next().GetValueSync().Defined());
+ }
+
+ Y_UNIT_TEST(One) {
+ auto storage = MakeMemoryQStorage();
+ auto writer = storage->MakeWriter("foo");
+ writer->Put({"comp", "label"}, "value").GetValueSync();
+ writer->Commit().GetValueSync();
+ auto reader = storage->MakeReader("foo");
+ auto value = reader->Get({"comp", "label"}).GetValueSync();
+ UNIT_ASSERT(value.Defined());
+ UNIT_ASSERT_VALUES_EQUAL(value->Key.Component, "comp");
+ UNIT_ASSERT_VALUES_EQUAL(value->Key.Label, "label");
+ UNIT_ASSERT_VALUES_EQUAL(value->Value, "value");
+ auto iterator = storage->MakeIterator("foo", {});
+ value = iterator->Next().GetValueSync();
+ UNIT_ASSERT(value.Defined());
+ UNIT_ASSERT_VALUES_EQUAL(value->Key.Component, "comp");
+ UNIT_ASSERT_VALUES_EQUAL(value->Key.Label, "label");
+ UNIT_ASSERT_VALUES_EQUAL(value->Value, "value");
+ value = iterator->Next().GetValueSync();
+ UNIT_ASSERT(!value.Defined());
+ }
+
+ Y_UNIT_TEST(IterateWithoutValue) {
+ auto storage = MakeMemoryQStorage();
+ auto writer = storage->MakeWriter("foo");
+ writer->Put({"comp", "label"}, "value").GetValueSync();
+ writer->Commit().GetValueSync();
+ auto reader = storage->MakeReader("foo");
+ auto settings = TQIteratorSettings{};
+ settings.DoNotLoadValue = true;
+ auto iterator = storage->MakeIterator("foo", settings);
+ auto value = iterator->Next().GetValueSync();
+ UNIT_ASSERT(value.Defined());
+ UNIT_ASSERT_VALUES_EQUAL(value->Key.Component, "comp");
+ UNIT_ASSERT_VALUES_EQUAL(value->Key.Label, "label");
+ UNIT_ASSERT_VALUES_EQUAL(value->Value, "");
+ value = iterator->Next().GetValueSync();
+ UNIT_ASSERT(!value.Defined());
+ }
+
+ Y_UNIT_TEST(ManyKeys) {
+ const size_t N = 10;
+ auto storage = MakeMemoryQStorage();
+ auto writer = storage->MakeWriter("foo");
+ for (size_t i = 0; i < N; ++i) {
+ writer->Put({"comp", "label" + ToString(i)}, "value" + ToString(i)).GetValueSync();
+ }
+
+ writer->Commit().GetValueSync();
+ auto reader = storage->MakeReader("foo");
+ for (size_t i = 0; i < N; ++i) {
+ auto value = reader->Get({"comp", "label" + ToString(i)}).GetValueSync();
+ UNIT_ASSERT(value.Defined());
+ UNIT_ASSERT_VALUES_EQUAL(value->Key.Component, "comp");
+ UNIT_ASSERT_VALUES_EQUAL(value->Key.Label, "label" + ToString(i));
+ UNIT_ASSERT_VALUES_EQUAL(value->Value, "value" + ToString(i));
+ }
+
+ auto iterator = storage->MakeIterator("foo", {});
+ TVector<TQItem> res = DrainIterator(*iterator);
+ UNIT_ASSERT_VALUES_EQUAL(res.size(), N);
+ Sort(res);
+ for (size_t i = 0; i < N; ++i) {
+ UNIT_ASSERT_VALUES_EQUAL(res[i].Key.Component, "comp");
+ UNIT_ASSERT_VALUES_EQUAL(res[i].Key.Label, "label" + ToString(i));
+ UNIT_ASSERT_VALUES_EQUAL(res[i].Value, "value" + ToString(i));
+ }
+ }
+
+ Y_UNIT_TEST(InterleaveReadWrite) {
+ auto storage = MakeMemoryQStorage();
+ auto reader = storage->MakeReader("foo");
+ auto value = reader->Get({"comp", "label"}).GetValueSync();
+ UNIT_ASSERT(!value.Defined());
+ auto iterator1 = storage->MakeIterator("foo", {});
+ value = iterator1->Next().GetValueSync();
+ UNIT_ASSERT(!value.Defined());
+ auto writer = storage->MakeWriter("foo");
+ writer->Put({"comp", "label"}, "value").GetValueSync();
+ value = reader->Get({"comp", "label"}).GetValueSync();
+ UNIT_ASSERT(!value.Defined());
+ auto iterator2 = storage->MakeIterator("foo", {});
+ value = iterator2->Next().GetValueSync();
+ UNIT_ASSERT(!value.Defined());
+ writer->Commit().GetValueSync();
+ value = reader->Get({"comp", "label"}).GetValueSync();
+ UNIT_ASSERT(value.Defined());
+ UNIT_ASSERT_VALUES_EQUAL(value->Key.Component, "comp");
+ UNIT_ASSERT_VALUES_EQUAL(value->Key.Label, "label");
+ UNIT_ASSERT_VALUES_EQUAL(value->Value, "value");
+ auto iterator3 = storage->MakeIterator("foo", {});
+ value = iterator3->Next().GetValueSync();
+ UNIT_ASSERT(value.Defined());
+ UNIT_ASSERT_VALUES_EQUAL(value->Key.Component, "comp");
+ UNIT_ASSERT_VALUES_EQUAL(value->Key.Label, "label");
+ UNIT_ASSERT_VALUES_EQUAL(value->Value, "value");
+ value = iterator2->Next().GetValueSync();
+ UNIT_ASSERT(!value.Defined());
+ }
+}
diff --git a/ydb/library/yql/core/qplayer/storage/memory/ya.make b/ydb/library/yql/core/qplayer/storage/memory/ya.make
new file mode 100644
index 0000000000..e5d9a7bc11
--- /dev/null
+++ b/ydb/library/yql/core/qplayer/storage/memory/ya.make
@@ -0,0 +1,15 @@
+LIBRARY()
+
+SRCS(
+ yql_qstorage_memory.cpp
+)
+
+PEERDIR(
+ ydb/library/yql/core/qplayer/storage/interface
+)
+
+END()
+
+RECURSE_FOR_TESTS(
+ ut
+)
diff --git a/ydb/library/yql/core/qplayer/storage/memory/yql_qstorage_memory.cpp b/ydb/library/yql/core/qplayer/storage/memory/yql_qstorage_memory.cpp
new file mode 100644
index 0000000000..7bc35f9b5b
--- /dev/null
+++ b/ydb/library/yql/core/qplayer/storage/memory/yql_qstorage_memory.cpp
@@ -0,0 +1,136 @@
+#include "yql_qstorage_memory.h"
+
+#include <util/generic/hash.h>
+#include <util/system/mutex.h>
+
+namespace NYql {
+
+namespace {
+
+struct TOperationMap {
+ TMutex Mutex;
+ using TMap = THashMap<TQItemKey, TString>;
+ TMap ReadMap, WriteMap;
+};
+
+using TOperationMapPtr = std::shared_ptr<TOperationMap>;
+
+struct TState {
+ TMutex Mutex;
+ using TMap = THashMap<TString, TOperationMapPtr>;
+ TMap Operations;
+};
+
+using TStatePtr = std::shared_ptr<TState>;
+
+class TReader : public IQReader {
+public:
+ TReader(const TOperationMapPtr& operation)
+ : Operation_(operation)
+ {}
+
+ NThreading::TFuture<TMaybe<TQItem>> Get(const TQItemKey& key) const final {
+ with_lock(Operation_->Mutex) {
+ auto it = Operation_->ReadMap.find(key);
+ if (it == Operation_->ReadMap.end()) {
+ return NThreading::MakeFuture(TMaybe<TQItem>());
+ }
+
+ return NThreading::MakeFuture(TMaybe<TQItem>(TQItem({key, it->second})));
+ }
+ }
+
+private:
+ const TOperationMapPtr Operation_;
+};
+
+class TWriter : public IQWriter {
+public:
+ TWriter(const TOperationMapPtr& operation)
+ : Operation_(operation)
+ {}
+
+ NThreading::TFuture<void> Put(const TQItemKey& key, const TString& value) final {
+ with_lock(Operation_->Mutex) {
+ Operation_->WriteMap[key] = value;
+ return NThreading::MakeFuture();
+ }
+ }
+
+ NThreading::TFuture<void> Commit() final {
+ with_lock(Operation_->Mutex) {
+ Operation_->ReadMap = Operation_->WriteMap;
+ return NThreading::MakeFuture();
+ }
+ }
+
+private:
+ const TOperationMapPtr Operation_;
+};
+
+class TIterator : public IQIterator {
+public:
+ TIterator(const TQIteratorSettings& settings, TOperationMap::TMap map)
+ : Settings_(settings)
+ , Map_(std::move(map))
+ , It_(Map_.begin())
+ {}
+
+ NThreading::TFuture<TMaybe<TQItem>> Next() final {
+ if (It_ == Map_.end()) {
+ return NThreading::MakeFuture(TMaybe<TQItem>());
+ }
+
+ auto res =TMaybe<TQItem>({It_->first, Settings_.DoNotLoadValue ? TString() : It_->second});
+ ++It_;
+ return NThreading::MakeFuture(res);
+ }
+
+private:
+ const TQIteratorSettings Settings_;
+ const TOperationMap::TMap Map_;
+ TOperationMap::TMap::const_iterator It_;
+};
+
+class TStorage : public IQStorage {
+public:
+ TStorage()
+ : State_(std::make_shared<TState>())
+ {
+ }
+ IQReaderPtr MakeReader(const TString& operationId) const final {
+ return std::make_shared<TReader>(GetOperation(operationId));
+ }
+
+ IQWriterPtr MakeWriter(const TString& operationId) const final {
+ return std::make_shared<TWriter>(GetOperation(operationId));
+ }
+
+ IQIteratorPtr MakeIterator(const TString& operationId, const TQIteratorSettings& settings) const final {
+ // clones the whole map for given operation id
+ return std::make_shared<TIterator>(settings, GetOperation(operationId)->ReadMap);
+ }
+
+private:
+ TOperationMapPtr GetOperation(const TString& operationId) const {
+ with_lock(State_->Mutex) {
+ auto &op = State_->Operations[operationId];
+ if (!op) {
+ op = std::make_shared<TOperationMap>();
+ }
+
+ return op;
+ }
+ }
+
+private:
+ TStatePtr State_;
+};
+
+}
+
+IQStoragePtr MakeMemoryQStorage() {
+ return std::make_shared<TStorage>();
+}
+
+}
diff --git a/ydb/library/yql/core/qplayer/storage/memory/yql_qstorage_memory.h b/ydb/library/yql/core/qplayer/storage/memory/yql_qstorage_memory.h
new file mode 100644
index 0000000000..500d1987fa
--- /dev/null
+++ b/ydb/library/yql/core/qplayer/storage/memory/yql_qstorage_memory.h
@@ -0,0 +1,8 @@
+#pragma once
+#include <ydb/library/yql/core/qplayer/storage/interface/yql_qstorage.h>
+
+namespace NYql {
+
+IQStoragePtr MakeMemoryQStorage();
+
+};
diff --git a/ydb/library/yql/core/qplayer/storage/ya.make b/ydb/library/yql/core/qplayer/storage/ya.make
new file mode 100644
index 0000000000..9cfc306f18
--- /dev/null
+++ b/ydb/library/yql/core/qplayer/storage/ya.make
@@ -0,0 +1,5 @@
+RECURSE(
+ interface
+ memory
+)
+
diff --git a/ydb/library/yql/core/qplayer/ya.make b/ydb/library/yql/core/qplayer/ya.make
new file mode 100644
index 0000000000..25965070b6
--- /dev/null
+++ b/ydb/library/yql/core/qplayer/ya.make
@@ -0,0 +1,4 @@
+RECURSE(
+ storage
+)
+
diff --git a/ydb/library/yql/core/ya.make b/ydb/library/yql/core/ya.make
index 8445b45165..02f4eb87ff 100644
--- a/ydb/library/yql/core/ya.make
+++ b/ydb/library/yql/core/ya.make
@@ -108,6 +108,7 @@ RECURSE(
file_storage
issue
peephole_opt
+ qplayer
services
spilling
sql_types