diff options
author | ivanmorozov333 <ivanmorozov@ydb.tech> | 2025-02-28 14:23:08 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-02-28 14:23:08 +0300 |
commit | caf2adefb93a6b62bdaa42a7161f00e794e241c6 (patch) | |
tree | b3e9170242a544db985d96aafe78ce3d5df352f5 | |
parent | 0db71bac9c6b70b5ca8bf8e7590c30e6cf7e6df7 (diff) | |
download | ydb-caf2adefb93a6b62bdaa42a7161f00e794e241c6.tar.gz |
many sub columns request usage correction (#15086)
5 files changed, 108 insertions, 22 deletions
diff --git a/ydb/core/formats/arrow/accessor/abstract/accessor.cpp b/ydb/core/formats/arrow/accessor/abstract/accessor.cpp index d4b9c3535c..e39c3a566f 100644 --- a/ydb/core/formats/arrow/accessor/abstract/accessor.cpp +++ b/ydb/core/formats/arrow/accessor/abstract/accessor.cpp @@ -75,7 +75,7 @@ IChunkedArray::TFullDataAddress IChunkedArray::GetChunk(const std::optional<TAdd IChunkedArray::TFullChunkedArrayAddress IChunkedArray::GetArray( const std::optional<TAddressChain>& chunkCurrent, const ui64 position, const std::shared_ptr<IChunkedArray>& selfPtr) const { - AFL_VERIFY(position < GetRecordsCount()); + AFL_VERIFY(position < GetRecordsCount())("pos", position)("records_count", GetRecordsCount()); if (IsDataOwner()) { AFL_VERIFY(selfPtr); TAddressChain chain; diff --git a/ydb/core/kqp/ut/olap/json_ut.cpp b/ydb/core/kqp/ut/olap/json_ut.cpp index 6f1fe9af45..dccff1e609 100644 --- a/ydb/core/kqp/ut/olap/json_ut.cpp +++ b/ydb/core/kqp/ut/olap/json_ut.cpp @@ -102,6 +102,24 @@ Y_UNIT_TEST_SUITE(KqpOlapJson) { } }; + class TOneActualizationCommand: public ICommand { + private: + virtual TConclusionStatus DoExecute(TKikimrRunner& kikimr) override { + { + auto alterQuery = + TStringBuilder() + << "ALTER OBJECT `/Root/ColumnTable` (TYPE TABLE) SET (ACTION=UPSERT_OPTIONS, SCHEME_NEED_ACTUALIZATION=`true`);"; + auto session = kikimr.GetTableClient().CreateSession().GetValueSync().GetSession(); + auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync(); + AFL_VERIFY(alterResult.GetStatus() == NYdb::EStatus::SUCCESS)("error", alterResult.GetIssues().ToString()); + } + auto controller = NYDBTest::TControllers::GetControllerAs<NYDBTest::NColumnShard::TController>(); + AFL_VERIFY(controller); + controller->WaitActualization(TDuration::Seconds(10)); + return TConclusionStatus::Success(); + } + }; + class TOneCompactionCommand: public ICommand { private: virtual TConclusionStatus DoExecute(TKikimrRunner& /*kikimr*/) override { @@ -203,6 +221,8 @@ Y_UNIT_TEST_SUITE(KqpOlapJson) { return std::make_shared<TStopCompactionCommand>(); } else if (command.StartsWith("ONE_COMPACTION")) { return std::make_shared<TOneCompactionCommand>(); + } else if (command.StartsWith("ONE_ACTUALIZATION")) { + return std::make_shared<TOneActualizationCommand>(); } else { AFL_VERIFY(false)("command", command); return nullptr; @@ -616,7 +636,55 @@ Y_UNIT_TEST_SUITE(KqpOlapJson) { )"; TScriptVariator(script).Execute(); } - +/* + Y_UNIT_TEST(BloomIndexesVariants) { + TString script = R"( + STOP_COMPACTION + ------ + SCHEMA: + CREATE TABLE `/Root/ColumnTable` ( + Col1 Uint64 NOT NULL, + Col2 JsonDocument, + PRIMARY KEY (Col1) + ) + PARTITION BY HASH(Col1) + WITH (STORE = COLUMN, AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = $$1|2$$); + ------ + SCHEMA: + ALTER OBJECT `/Root/ColumnTable` (TYPE TABLE) SET (ACTION=UPSERT_OPTIONS, `SCAN_READER_POLICY_NAME`=`SIMPLE`) + ------ + SCHEMA: + ALTER OBJECT `/Root/ColumnTable` (TYPE TABLE) SET (ACTION=ALTER_COLUMN, NAME=Col2, `DATA_ACCESSOR_CONSTRUCTOR.CLASS_NAME`=`SUB_COLUMNS`, + `COLUMNS_LIMIT`=`$$0|1|1024$$`, `SPARSED_DETECTOR_KFF`=`$$0|10|1000$$`, `MEM_LIMIT_CHUNK`=`$$0|100|1000000$$`, `OTHERS_ALLOWED_FRACTION`=`$$0|0.5$$`) + ------ + SCHEMA: + ALTER OBJECT `/Root/ColumnTable` (TYPE TABLE) SET (ACTION=ALTER_COLUMN, NAME=Col2, `DATA_ACCESSOR_CONSTRUCTOR.CLASS_NAME`=`SUB_COLUMNS`, + `COLUMNS_LIMIT`=`$$0|1|1024$$`, `SPARSED_DETECTOR_KFF`=`$$0|10|1000$$`, `MEM_LIMIT_CHUNK`=`$$0|100|1000000$$`, `OTHERS_ALLOWED_FRACTION`=`$$0|0.5$$`) + ------ + DATA: + REPLACE INTO `/Root/ColumnTable` (Col1, Col2) VALUES(1u, JsonDocument('{"a" : "a1"}')), (2u, JsonDocument('{"a" : "a2"}')), + (3u, JsonDocument('{"b" : "b3"}')), (4u, JsonDocument('{"b" : "b4", "a" : "a4"}')) + ------ + DATA: + REPLACE INTO `/Root/ColumnTable` (Col1, Col2) VALUES(11u, JsonDocument('{"a" : "1a1"}')), (12u, JsonDocument('{"a" : "1a2"}')), + (13u, JsonDocument('{"b" : "1b3"}')), (14u, JsonDocument('{"b" : "1b4", "a" : "a4"}')) + ------ + SCHEMA: + ALTER OBJECT `/Root/ColumnTable` (TYPE TABLE) SET (ACTION=UPSERT_INDEX, NAME=a_index, TYPE=BLOOM_FILTER, + FEATURES=`{"column_names" : ["Col2"], "data_extractor" : {"class_name" : "SUB_COLUMN", "sub_column_name" : "a"}, "false_positive_probability" : 0.05}`) + ------ + DATA: + REPLACE INTO `/Root/ColumnTable` (Col1) VALUES(10u) + ------ + ONE_ACTUALIZATION + ------ + READ: SELECT * FROM `/Root/ColumnTable` WHERE JSON_VALUE(Col2, "$.a") = "1a1" ORDER BY Col1; + EXPECTED: [[11u;["{\"a\":\"1a1\"}"]]] + + )"; + TScriptVariator(script).Execute(); + } +*/ Y_UNIT_TEST(SwitchAccessorCompactionVariants) { TString script = R"( STOP_COMPACTION diff --git a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.cpp b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.cpp index c19b790244..937f049a43 100644 --- a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.cpp +++ b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.cpp @@ -181,7 +181,7 @@ TConclusion<bool> TProgramStepPrepare::DoExecuteInplace(const std::shared_ptr<ID source->GetStageData().GetTable()->Remove(i.GetColumnId()); } std::shared_ptr<IKernelFetchLogic> logic; - if (customFetchInfo->GetFullRestore()) { + if (customFetchInfo->GetFullRestore() || source->GetStageData().GetPortionAccessor().GetColumnChunksPointers(i.GetColumnId()).empty()) { logic = std::make_shared<TDefaultFetchLogic>(i.GetColumnId(), source); } else { AFL_VERIFY(customFetchInfo->GetSubColumns().size()); diff --git a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/sub_columns_fetching.h b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/sub_columns_fetching.h index b202115625..75b6f2f4bb 100644 --- a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/sub_columns_fetching.h +++ b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/sub_columns_fetching.h @@ -78,8 +78,10 @@ public: i.second.GetBlobDataVerified().size()); std::vector<NArrow::NAccessor::TDeserializeChunkedArray::TChunk> chunks = { NArrow::NAccessor::TDeserializeChunkedArray::TChunk( GetRecordsCount(), i.second.GetBlobDataVerified()) }; - const std::shared_ptr<NArrow::NAccessor::IChunkedArray> arrOriginal = deserialize - ? columnLoader->ApplyVerified(i.second.GetBlobDataVerified(), GetRecordsCount()) +// const ui32 filledRecordsCount = PartialArray->GetHeader().GetColumnStats().GetColumnRecordsCount(i.second.GetColumnIdx()); + const std::shared_ptr<NArrow::NAccessor::IChunkedArray> arrOriginal = + deserialize + ? columnLoader->ApplyVerified(i.second.GetBlobDataVerified(), GetRecordsCount()/*, filledRecordsCount*/) : std::make_shared<NArrow::NAccessor::TDeserializeChunkedArray>(GetRecordsCount(), columnLoader, std::move(chunks), true); if (applyFilter) { PartialArray->AddColumn(i.first, applyFilter->Apply(arrOriginal)); @@ -124,14 +126,12 @@ public: // "others", PartialArray->GetHeader().GetOtherStats().DebugJson().GetStringRobust()); } - void InitPartialReader( - const ui32 columnId, const ui32 positionStart, const std::shared_ptr<NArrow::NAccessor::TAccessorsCollection>& resources) { + void InitPartialReader(const std::shared_ptr<NArrow::NAccessor::IChunkedArray>& accessor) { AFL_VERIFY(!HeaderRange); AFL_VERIFY(!PartialArray); - auto columnAccessor = resources->GetAccessorVerified(columnId); - auto partialArray = columnAccessor->GetArraySlow(positionStart, columnAccessor); - AFL_VERIFY(partialArray.GetArray()->GetType() == NArrow::NAccessor::IChunkedArray::EType::SubColumnsPartialArray); - PartialArray = std::static_pointer_cast<NArrow::NAccessor::TSubColumnsPartialArray>(partialArray.GetArray()); + AFL_VERIFY(accessor); + AFL_VERIFY(accessor->GetType() == NArrow::NAccessor::IChunkedArray::EType::SubColumnsPartialArray)("type", accessor->GetType()); + PartialArray = std::static_pointer_cast<NArrow::NAccessor::TSubColumnsPartialArray>(accessor); } TColumnChunkRestoreInfo(const TBlobRange& fullChunkRange, const NArrow::NAccessor::TChunkConstructionData& chunkExternalInfo) @@ -179,8 +179,10 @@ private: } Resources->AddVerified(GetColumnId(), compositeBuilder.Finish(), true); } else { + ui32 pos = 0; for (auto&& i : ColumnChunks) { - i.Finish(Resources->GetAppliedFilter(), Source); + i.Finish(std::make_shared<NArrow::TColumnFilter>(Resources->GetAppliedFilter()->Slice(pos, i.GetRecordsCount())), Source); + pos += i.GetRecordsCount(); } } } @@ -238,23 +240,36 @@ private: auto itFilter = cFilter.GetIterator(false, Source->GetRecordsCount()); bool itFinished = false; - NeedToAddResource = !Resources->HasColumn(GetColumnId()); - ui32 posCurrent = 0; - for (auto&& c : columnChunks) { + auto accessor = Resources->GetAccessorOptional(GetColumnId()); + NeedToAddResource = !accessor; + std::vector<std::shared_ptr<NArrow::NAccessor::IChunkedArray>> chunks; + if (!NeedToAddResource) { + if (accessor->GetType() == NArrow::NAccessor::IChunkedArray::EType::CompositeChunkedArray) { + auto composite = std::static_pointer_cast<NArrow::NAccessor::TCompositeChunkedArray>(accessor); + chunks = composite->GetChunks(); + } else { + chunks.emplace_back(accessor); + } + } + ui32 resChunkIdx = 0; + for (ui32 chunkIdx = 0; chunkIdx < columnChunks.size(); ++chunkIdx) { + auto& meta = columnChunks[chunkIdx]->GetMeta(); AFL_VERIFY(!itFinished); - if (!itFilter.IsBatchForSkip(c->GetMeta().GetRecordsCount())) { - const TBlobRange range = Source->RestoreBlobRange(c->BlobRange); - ColumnChunks.emplace_back(range, ChunkExternalInfo.GetSubset(c->GetMeta().GetRecordsCount())); + if (!itFilter.IsBatchForSkip(meta.GetRecordsCount())) { + const TBlobRange range = Source->RestoreBlobRange(columnChunks[chunkIdx]->BlobRange); + ColumnChunks.emplace_back(range, ChunkExternalInfo.GetSubset(meta.GetRecordsCount())); if (!NeedToAddResource) { - ColumnChunks.back().InitPartialReader(GetColumnId(), posCurrent, Resources); + AFL_VERIFY(resChunkIdx < chunks.size())("chunks", chunks.size())("meta", columnChunks.size())("need", NeedToAddResource); + ColumnChunks.back().InitPartialReader(chunks[resChunkIdx]); + ++resChunkIdx; } ColumnChunks.back().InitReading(reading, SubColumns); } else { - ColumnChunks.emplace_back(TColumnChunkRestoreInfo::BuildEmpty(ChunkExternalInfo.GetSubset(c->GetMeta().GetRecordsCount()))); + ColumnChunks.emplace_back(TColumnChunkRestoreInfo::BuildEmpty(ChunkExternalInfo.GetSubset(meta.GetRecordsCount()))); } - itFinished = !itFilter.Next(c->GetMeta().GetRecordsCount()); - posCurrent += c->GetMeta().GetRecordsCount(); + itFinished = !itFilter.Next(meta.GetRecordsCount()); } + AFL_VERIFY(NeedToAddResource || (resChunkIdx == chunks.size())); AFL_VERIFY(itFinished)("filter", itFilter.DebugString())("count", Source->GetRecordsCount()); for (auto&& i : blobsAction.GetReadingActions()) { nextRead.Add(i); diff --git a/ydb/core/tx/columnshard/engines/scheme/index_info.cpp b/ydb/core/tx/columnshard/engines/scheme/index_info.cpp index 10861e3677..afdf9b333d 100644 --- a/ydb/core/tx/columnshard/engines/scheme/index_info.cpp +++ b/ydb/core/tx/columnshard/engines/scheme/index_info.cpp @@ -433,6 +433,9 @@ NKikimr::TConclusionStatus TIndexInfo::AppendIndex(const THashMap<ui32, std::vec AFL_VERIFY(it != Indexes.end()); auto& index = it->second; std::shared_ptr<IPortionDataChunk> chunk = index->BuildIndex(originalData, recordsCount, *this); + if (!chunk) { + return TConclusionStatus::Success(); + } auto opStorage = operators->GetOperatorVerified(index->GetStorageId()); if ((i64)chunk->GetPackedSize() > opStorage->GetBlobSplitSettings().GetMaxBlobSize()) { return TConclusionStatus::Fail("blob size for secondary data (" + ::ToString(indexId) + ":" + ::ToString(chunk->GetPackedSize()) + ":" + |