diff options
author | ivanmorozov333 <ivanmorozov@ydb.tech> | 2024-09-29 07:11:51 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-09-29 07:11:51 +0300 |
commit | e8a4121de1a15f1baef5c2ef6300817b6abcf44c (patch) | |
tree | 1e15d1a4f1e0056b396c3ad6fae579b92e02c7c2 | |
parent | 06c6c6616b5a7ed6174bfe16d24228b741e4242e (diff) | |
download | ydb-e8a4121de1a15f1baef5c2ef6300817b6abcf44c.tar.gz |
accessors actualization (#9857)
-rw-r--r-- | ydb/core/formats/arrow/accessor/abstract/constructor.h | 32 | ||||
-rw-r--r-- | ydb/core/formats/arrow/accessor/plain/constructor.cpp | 11 | ||||
-rw-r--r-- | ydb/core/formats/arrow/accessor/plain/constructor.h | 7 | ||||
-rw-r--r-- | ydb/core/formats/arrow/accessor/sparsed/accessor.h | 5 | ||||
-rw-r--r-- | ydb/core/formats/arrow/accessor/sparsed/constructor.cpp | 6 | ||||
-rw-r--r-- | ydb/core/formats/arrow/accessor/sparsed/constructor.h | 7 | ||||
-rw-r--r-- | ydb/core/formats/arrow/save_load/loader.cpp | 21 | ||||
-rw-r--r-- | ydb/core/formats/arrow/save_load/loader.h | 13 | ||||
-rw-r--r-- | ydb/core/kqp/ut/olap/sparsed_ut.cpp | 55 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/scheme/column/info.cpp | 8 |
10 files changed, 144 insertions, 21 deletions
diff --git a/ydb/core/formats/arrow/accessor/abstract/constructor.h b/ydb/core/formats/arrow/accessor/abstract/constructor.h index aa99260e097..8cbef70a2a4 100644 --- a/ydb/core/formats/arrow/accessor/abstract/constructor.h +++ b/ydb/core/formats/arrow/accessor/abstract/constructor.h @@ -25,10 +25,23 @@ private: virtual TString DoDebugString() const { return ""; } + virtual bool DoIsEqualWithSameTypeTo(const IConstructor& item) const = 0; + virtual std::shared_ptr<arrow::RecordBatch> DoConstruct( + const std::shared_ptr<IChunkedArray>& columnData, const TChunkConstructionData& externalInfo) const = 0; public: virtual ~IConstructor() = default; + std::shared_ptr<arrow::RecordBatch> Construct( + const std::shared_ptr<IChunkedArray>& columnData, const TChunkConstructionData& externalInfo) const { + AFL_VERIFY(columnData); + return DoConstruct(columnData, externalInfo); + } + + bool IsEqualWithSameTypeTo(const IConstructor& item) const { + return DoIsEqualWithSameTypeTo(item); + } + TString DebugString() const { return TStringBuilder() << GetClassName() << ":" << DoDebugString(); } @@ -65,10 +78,27 @@ public: class TConstructorContainer: public NBackgroundTasks::TInterfaceProtoContainer<IConstructor> { private: using TBase = NBackgroundTasks::TInterfaceProtoContainer<IConstructor>; - public: using TBase::TBase; + bool IsEqualTo(const TConstructorContainer& item) const { + if (!GetObjectPtr() && !item.GetObjectPtr()) { + return true; + } else if (!!GetObjectPtr() && !!item.GetObjectPtr()) { + if (GetObjectPtr()->GetClassName() != item.GetObjectPtr()->GetClassName()) { + return false; + } + return GetObjectPtr()->IsEqualWithSameTypeTo(*item.GetObjectPtr()); + } else { + return false; + } + } + + std::shared_ptr<arrow::RecordBatch> Construct(const std::shared_ptr<IChunkedArray>& batch, const TChunkConstructionData& externalInfo) const { + AFL_VERIFY(!!GetObjectPtr()); + return GetObjectPtr()->Construct(batch, externalInfo); + } + static TConstructorContainer GetDefaultConstructor(); }; diff --git a/ydb/core/formats/arrow/accessor/plain/constructor.cpp b/ydb/core/formats/arrow/accessor/plain/constructor.cpp index 3ecf41502b3..2a2b4657596 100644 --- a/ydb/core/formats/arrow/accessor/plain/constructor.cpp +++ b/ydb/core/formats/arrow/accessor/plain/constructor.cpp @@ -2,8 +2,11 @@ #include "constructor.h" #include <ydb/library/formats/arrow/accessor/abstract/accessor.h> +#include <ydb/library/formats/arrow/arrow_helpers.h> #include <ydb/library/formats/arrow/simple_arrays_cache.h> + #include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h> +#include <contrib/libs/apache/arrow/cpp/src/arrow/table.h> namespace NKikimr::NArrow::NAccessor::NPlain { @@ -30,4 +33,12 @@ std::shared_ptr<arrow::Schema> TConstructor::DoGetExpectedSchema(const std::shar return std::make_shared<arrow::Schema>(arrow::FieldVector({ resultColumn })); } +std::shared_ptr<arrow::RecordBatch> TConstructor::DoConstruct( + const std::shared_ptr<IChunkedArray>& columnData, const TChunkConstructionData& externalInfo) const { + auto chunked = columnData->GetChunkedArray(); + auto schema = std::make_shared<arrow::Schema>(arrow::FieldVector({ std::make_shared<arrow::Field>("val", externalInfo.GetColumnType()) })); + auto table = arrow::Table::Make(schema, { chunked }, columnData->GetRecordsCount()); + return NArrow::ToBatch(table, true); +} + } // namespace NKikimr::NArrow::NAccessor::NPlain diff --git a/ydb/core/formats/arrow/accessor/plain/constructor.h b/ydb/core/formats/arrow/accessor/plain/constructor.h index 57c366689eb..243c1e47dec 100644 --- a/ydb/core/formats/arrow/accessor/plain/constructor.h +++ b/ydb/core/formats/arrow/accessor/plain/constructor.h @@ -12,6 +12,13 @@ public: private: static inline auto Registrator = TFactory::TRegistrator<TConstructor>(GetClassNameStatic()); + + virtual bool DoIsEqualWithSameTypeTo(const IConstructor& /*item*/) const override { + return true; + } + + virtual std::shared_ptr<arrow::RecordBatch> DoConstruct( + const std::shared_ptr<IChunkedArray>& columnData, const TChunkConstructionData& externalInfo) const override; virtual TConclusion<std::shared_ptr<NArrow::NAccessor::IChunkedArray>> DoConstruct( const std::shared_ptr<arrow::RecordBatch>& originalData, const TChunkConstructionData& externalInfo) const override; virtual NKikimrArrowAccessorProto::TConstructor DoSerializeToProto() const override; diff --git a/ydb/core/formats/arrow/accessor/sparsed/accessor.h b/ydb/core/formats/arrow/accessor/sparsed/accessor.h index 04022496223..07a1a2465ce 100644 --- a/ydb/core/formats/arrow/accessor/sparsed/accessor.h +++ b/ydb/core/formats/arrow/accessor/sparsed/accessor.h @@ -153,6 +153,11 @@ public: return chunk.GetScalar(index - chunk.GetStartPosition()); } + std::shared_ptr<arrow::RecordBatch> GetRecordBatchVerified() const { + AFL_VERIFY(Records.size() == 1)("size", Records.size()); + return Records.front().GetRecords(); + } + const TSparsedArrayChunk& GetSparsedChunk(const ui64 position) const { const auto pred = [](const ui64 position, const TSparsedArrayChunk& item) { return position < item.GetStartPosition(); diff --git a/ydb/core/formats/arrow/accessor/sparsed/constructor.cpp b/ydb/core/formats/arrow/accessor/sparsed/constructor.cpp index e3f45cd7532..2962636c367 100644 --- a/ydb/core/formats/arrow/accessor/sparsed/constructor.cpp +++ b/ydb/core/formats/arrow/accessor/sparsed/constructor.cpp @@ -31,4 +31,10 @@ bool TConstructor::DoDeserializeFromProto(const NKikimrArrowAccessorProto::TCons return true; } +std::shared_ptr<arrow::RecordBatch> TConstructor::DoConstruct( + const std::shared_ptr<IChunkedArray>& columnData, const TChunkConstructionData& externalInfo) const { + NArrow::NAccessor::TSparsedArray sparsed(*columnData, externalInfo.GetDefaultValue()); + return sparsed.GetRecordBatchVerified(); +} + } // namespace NKikimr::NArrow::NAccessor::NSparsed diff --git a/ydb/core/formats/arrow/accessor/sparsed/constructor.h b/ydb/core/formats/arrow/accessor/sparsed/constructor.h index 0ccf5efdd70..a85b8918eaf 100644 --- a/ydb/core/formats/arrow/accessor/sparsed/constructor.h +++ b/ydb/core/formats/arrow/accessor/sparsed/constructor.h @@ -12,6 +12,13 @@ public: private: static inline auto Registrator = TFactory::TRegistrator<TConstructor>(GetClassNameStatic()); + + virtual bool DoIsEqualWithSameTypeTo(const IConstructor& /*item*/) const override { + return true; + } + virtual std::shared_ptr<arrow::RecordBatch> DoConstruct( + const std::shared_ptr<IChunkedArray>& columnData, const TChunkConstructionData& externalInfo) const override; + virtual TConclusion<std::shared_ptr<NArrow::NAccessor::IChunkedArray>> DoConstruct( const std::shared_ptr<arrow::RecordBatch>& originalData, const TChunkConstructionData& externalInfo) const override; virtual NKikimrArrowAccessorProto::TConstructor DoSerializeToProto() const override; diff --git a/ydb/core/formats/arrow/save_load/loader.cpp b/ydb/core/formats/arrow/save_load/loader.cpp index c9328f751d4..33685100bd1 100644 --- a/ydb/core/formats/arrow/save_load/loader.cpp +++ b/ydb/core/formats/arrow/save_load/loader.cpp @@ -51,9 +51,13 @@ std::shared_ptr<arrow::RecordBatch> TColumnLoader::ApplyRawVerified(const TStrin return TStatusValidator::GetValid(Apply(data)); } +TChunkConstructionData TColumnLoader::BuildAccessorContext(const ui32 recordsCount) const { + return TChunkConstructionData(recordsCount, DefaultValue, ResultField->type()); +} + std::shared_ptr<IChunkedArray> TColumnLoader::ApplyVerified(const TString& dataStr, const ui32 recordsCount) const { auto data = TStatusValidator::GetValid(Apply(dataStr)); - return BuildAccessor(data, TChunkConstructionData(recordsCount, DefaultValue, ResultField->type())); + return BuildAccessor(data, BuildAccessorContext(recordsCount)); } std::shared_ptr<IChunkedArray> TColumnLoader::BuildAccessor( @@ -65,4 +69,19 @@ std::shared_ptr<NKikimr::NArrow::NAccessor::IChunkedArray> TColumnLoader::BuildD return AccessorConstructor->ConstructDefault(TChunkConstructionData(recordsCount, DefaultValue, ResultField->type())).DetachResult(); } +bool TColumnLoader::IsEqualTo(const TColumnLoader& item) const { + if (!!Transformer != !!item.Transformer) { + return false; + } else if (!!Transformer && !Transformer->IsEqualTo(*item.Transformer)) { + return false; + } + if (!Serializer.IsEqualTo(item.Serializer)) { + return false; + } + if (!AccessorConstructor.IsEqualTo(item.AccessorConstructor)) { + return false; + } + return true; +} + } // namespace NKikimr::NArrow::NAccessor diff --git a/ydb/core/formats/arrow/save_load/loader.h b/ydb/core/formats/arrow/save_load/loader.h index 2d3119ac3fa..eb5a2740ef9 100644 --- a/ydb/core/formats/arrow/save_load/loader.h +++ b/ydb/core/formats/arrow/save_load/loader.h @@ -25,17 +25,7 @@ private: public: std::shared_ptr<IChunkedArray> BuildDefaultAccessor(const ui32 recordsCount) const; - bool IsEqualTo(const TColumnLoader& item) const { - if (!!Transformer != !!item.Transformer) { - return false; - } else if (!!Transformer && !Transformer->IsEqualTo(*item.Transformer)) { - return false; - } - if (!Serializer.IsEqualTo(item.Serializer)) { - return false; - } - return true; - } + bool IsEqualTo(const TColumnLoader& item) const; TString DebugString() const; @@ -49,6 +39,7 @@ public: const std::shared_ptr<arrow::Field>& GetField() const; + TChunkConstructionData BuildAccessorContext(const ui32 recordsCount) const; std::shared_ptr<IChunkedArray> ApplyVerified(const TString& data, const ui32 expectedRecordsCount) const; std::shared_ptr<arrow::RecordBatch> ApplyRawVerified(const TString& data) const; }; diff --git a/ydb/core/kqp/ut/olap/sparsed_ut.cpp b/ydb/core/kqp/ut/olap/sparsed_ut.cpp index 73b75f2cc53..bdd895e8e24 100644 --- a/ydb/core/kqp/ut/olap/sparsed_ut.cpp +++ b/ydb/core/kqp/ut/olap/sparsed_ut.cpp @@ -8,6 +8,7 @@ #include <ydb/core/tx/columnshard/hooks/testing/controller.h> #include <ydb/core/base/tablet_pipecache.h> #include <ydb/core/wrappers/fake_storage.h> +#include <ydb/core/tx/columnshard/blobs_action/common/const.h> namespace NKikimr::NKqp { @@ -21,13 +22,12 @@ Y_UNIT_TEST_SUITE(KqpOlapSparsed) { const TString StoreName; ui32 MultiColumnRepCount = 100; static const ui32 SKIP_GROUPS = 7; - const TVector<TString> FIELD_NAMES{"utf", "int", "uint", "float", "double"}; + const TVector<TString> FIELD_NAMES{ "utf", "int", "uint", "float", "double" }; public: TSparsedDataTest(const TString& storeName) : Kikimr(Settings) , CSController(NKikimr::NYDBTest::TControllers::RegisterCSControllerGuard<NKikimr::NYDBTest::NColumnShard::TController>()) - , StoreName(storeName) - { + , StoreName(storeName) { } @@ -79,7 +79,7 @@ Y_UNIT_TEST_SUITE(KqpOlapSparsed) { Fill(&counts[0], &counts[FIELD_NAMES.size() * groupsCount], 0); - for (auto& row: rows) { + for (auto& row : rows) { auto incCounts = [&](ui32 i, const TString& column) { if (*NYdb::TValueParser(row.at(column)).GetOptionalBool()) { counts[i]++; @@ -94,7 +94,7 @@ Y_UNIT_TEST_SUITE(KqpOlapSparsed) { incCounts(ind++, "def_float" + grStr); incCounts(ind++, "def_double" + grStr); } - } + } } void CheckAllFieldsTable(bool firstCall, ui32 countExpectation, ui32* defCountStart) { @@ -169,7 +169,7 @@ Y_UNIT_TEST_SUITE(KqpOlapSparsed) { NArrow::NConstruction::TStringPoolFiller sPool(1000, 52, "abcde", frq); helper.FillTable(sPool, shiftKff, 10000); }, - [&](bool firstCall) { + [&](bool firstCall) { CheckTable("field", "'abcde'", firstCall, countExpectation, defCountStart); }); } @@ -181,7 +181,7 @@ Y_UNIT_TEST_SUITE(KqpOlapSparsed) { TTypedLocalHelper helper("Utf8", Kikimr); helper.FillMultiColumnTable(MultiColumnRepCount, shiftKff, 10000); }, - [&](bool firstCall) { + [&](bool firstCall) { CheckAllFieldsTable(firstCall, countExpectation, defCountStart); }); } @@ -302,6 +302,47 @@ Y_UNIT_TEST_SUITE(KqpOlapSparsed) { TSparsedDataTest test(""); test.Execute(); } + + Y_UNIT_TEST(AccessorActualization) { + auto settings = TKikimrSettings().SetWithSampleTables(false); + TKikimrRunner kikimr(settings); + + auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NYDBTest::NColumnShard::TController>(); + csController->SetOverridePeriodicWakeupActivationPeriod(TDuration::Seconds(1)); + csController->SetOverrideLagForCompactionBeforeTierings(TDuration::Seconds(1)); + csController->SetOverrideReduceMemoryIntervalLimit(1LLU << 30); + + TLocalHelper helper(kikimr); + helper.SetOptionalStorageId(NOlap::NBlobOperations::TGlobal::DefaultStorageId); + helper.CreateTestOlapTable(); + auto tableClient = kikimr.GetTableClient(); + + auto session = tableClient.CreateSession().GetValueSync().GetSession(); + Tests::NCommon::TLoggerInit(kikimr).SetComponents({ NKikimrServices::TX_COLUMNSHARD }, "CS").SetPriority(NActors::NLog::PRI_DEBUG).Initialize(); + + WriteTestData(kikimr, "/Root/olapStore/olapTable", 1000000, 300000000, 10000); + csController->WaitIndexation(TDuration::Seconds(3)); + + { + auto result = session.ExecuteSchemeQuery("ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=ALTER_COLUMN, NAME=uid, `DATA_ACCESSOR_CONSTRUCTOR.CLASS_NAME`=`SPARSED`)").GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + } + + { + auto result = session.ExecuteSchemeQuery("ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_OPTIONS, SCHEME_NEED_ACTUALIZATION=`true`)").GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + } + csController->WaitActualization(TDuration::Seconds(5)); + { + auto it = tableClient.StreamExecuteScanQuery(R"( + --!syntax_v1 + SELECT count(uid) FROM `/Root/olapStore/olapTable` + )").GetValueSync(); + UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString()); + Cerr << StreamResultToYson(it) << Endl; + } + } + } } // namespace diff --git a/ydb/core/tx/columnshard/engines/scheme/column/info.cpp b/ydb/core/tx/columnshard/engines/scheme/column/info.cpp index f139bc63bcd..c8e95fdb1db 100644 --- a/ydb/core/tx/columnshard/engines/scheme/column/info.cpp +++ b/ydb/core/tx/columnshard/engines/scheme/column/info.cpp @@ -91,7 +91,13 @@ std::vector<std::shared_ptr<NKikimr::NOlap::IPortionDataChunk>> TSimpleColumnInf } std::vector<std::shared_ptr<IPortionDataChunk>> result; for (auto&& s : source) { - auto data = sourceColumnFeatures.Loader->ApplyRawVerified(s->GetData()); + std::shared_ptr<arrow::RecordBatch> data; + if (!DataAccessorConstructor.IsEqualTo(sourceColumnFeatures.DataAccessorConstructor)) { + auto chunkedArray = sourceColumnFeatures.Loader->ApplyVerified(s->GetData(), s->GetRecordsCountVerified()); + data = DataAccessorConstructor.Construct(chunkedArray, Loader->BuildAccessorContext(s->GetRecordsCountVerified())); + } else { + data = sourceColumnFeatures.Loader->ApplyRawVerified(s->GetData()); + } result.emplace_back(s->CopyWithAnotherBlob(GetColumnSaver().Apply(data), *this)); } return result; |