aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov333 <ivanmorozov@ydb.tech>2025-02-28 14:23:08 +0300
committerGitHub <noreply@github.com>2025-02-28 14:23:08 +0300
commitcaf2adefb93a6b62bdaa42a7161f00e794e241c6 (patch)
treeb3e9170242a544db985d96aafe78ce3d5df352f5
parent0db71bac9c6b70b5ca8bf8e7590c30e6cf7e6df7 (diff)
downloadydb-caf2adefb93a6b62bdaa42a7161f00e794e241c6.tar.gz
many sub columns request usage correction (#15086)
-rw-r--r--ydb/core/formats/arrow/accessor/abstract/accessor.cpp2
-rw-r--r--ydb/core/kqp/ut/olap/json_ut.cpp70
-rw-r--r--ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.cpp2
-rw-r--r--ydb/core/tx/columnshard/engines/reader/common_reader/iterator/sub_columns_fetching.h53
-rw-r--r--ydb/core/tx/columnshard/engines/scheme/index_info.cpp3
5 files changed, 108 insertions, 22 deletions
diff --git a/ydb/core/formats/arrow/accessor/abstract/accessor.cpp b/ydb/core/formats/arrow/accessor/abstract/accessor.cpp
index d4b9c3535c..e39c3a566f 100644
--- a/ydb/core/formats/arrow/accessor/abstract/accessor.cpp
+++ b/ydb/core/formats/arrow/accessor/abstract/accessor.cpp
@@ -75,7 +75,7 @@ IChunkedArray::TFullDataAddress IChunkedArray::GetChunk(const std::optional<TAdd
IChunkedArray::TFullChunkedArrayAddress IChunkedArray::GetArray(
const std::optional<TAddressChain>& chunkCurrent, const ui64 position, const std::shared_ptr<IChunkedArray>& selfPtr) const {
- AFL_VERIFY(position < GetRecordsCount());
+ AFL_VERIFY(position < GetRecordsCount())("pos", position)("records_count", GetRecordsCount());
if (IsDataOwner()) {
AFL_VERIFY(selfPtr);
TAddressChain chain;
diff --git a/ydb/core/kqp/ut/olap/json_ut.cpp b/ydb/core/kqp/ut/olap/json_ut.cpp
index 6f1fe9af45..dccff1e609 100644
--- a/ydb/core/kqp/ut/olap/json_ut.cpp
+++ b/ydb/core/kqp/ut/olap/json_ut.cpp
@@ -102,6 +102,24 @@ Y_UNIT_TEST_SUITE(KqpOlapJson) {
}
};
+ class TOneActualizationCommand: public ICommand {
+ private:
+ virtual TConclusionStatus DoExecute(TKikimrRunner& kikimr) override {
+ {
+ auto alterQuery =
+ TStringBuilder()
+ << "ALTER OBJECT `/Root/ColumnTable` (TYPE TABLE) SET (ACTION=UPSERT_OPTIONS, SCHEME_NEED_ACTUALIZATION=`true`);";
+ auto session = kikimr.GetTableClient().CreateSession().GetValueSync().GetSession();
+ auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync();
+ AFL_VERIFY(alterResult.GetStatus() == NYdb::EStatus::SUCCESS)("error", alterResult.GetIssues().ToString());
+ }
+ auto controller = NYDBTest::TControllers::GetControllerAs<NYDBTest::NColumnShard::TController>();
+ AFL_VERIFY(controller);
+ controller->WaitActualization(TDuration::Seconds(10));
+ return TConclusionStatus::Success();
+ }
+ };
+
class TOneCompactionCommand: public ICommand {
private:
virtual TConclusionStatus DoExecute(TKikimrRunner& /*kikimr*/) override {
@@ -203,6 +221,8 @@ Y_UNIT_TEST_SUITE(KqpOlapJson) {
return std::make_shared<TStopCompactionCommand>();
} else if (command.StartsWith("ONE_COMPACTION")) {
return std::make_shared<TOneCompactionCommand>();
+ } else if (command.StartsWith("ONE_ACTUALIZATION")) {
+ return std::make_shared<TOneActualizationCommand>();
} else {
AFL_VERIFY(false)("command", command);
return nullptr;
@@ -616,7 +636,55 @@ Y_UNIT_TEST_SUITE(KqpOlapJson) {
)";
TScriptVariator(script).Execute();
}
-
+/*
+ Y_UNIT_TEST(BloomIndexesVariants) {
+ TString script = R"(
+ STOP_COMPACTION
+ ------
+ SCHEMA:
+ CREATE TABLE `/Root/ColumnTable` (
+ Col1 Uint64 NOT NULL,
+ Col2 JsonDocument,
+ PRIMARY KEY (Col1)
+ )
+ PARTITION BY HASH(Col1)
+ WITH (STORE = COLUMN, AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = $$1|2$$);
+ ------
+ SCHEMA:
+ ALTER OBJECT `/Root/ColumnTable` (TYPE TABLE) SET (ACTION=UPSERT_OPTIONS, `SCAN_READER_POLICY_NAME`=`SIMPLE`)
+ ------
+ SCHEMA:
+ ALTER OBJECT `/Root/ColumnTable` (TYPE TABLE) SET (ACTION=ALTER_COLUMN, NAME=Col2, `DATA_ACCESSOR_CONSTRUCTOR.CLASS_NAME`=`SUB_COLUMNS`,
+ `COLUMNS_LIMIT`=`$$0|1|1024$$`, `SPARSED_DETECTOR_KFF`=`$$0|10|1000$$`, `MEM_LIMIT_CHUNK`=`$$0|100|1000000$$`, `OTHERS_ALLOWED_FRACTION`=`$$0|0.5$$`)
+ ------
+ SCHEMA:
+ ALTER OBJECT `/Root/ColumnTable` (TYPE TABLE) SET (ACTION=ALTER_COLUMN, NAME=Col2, `DATA_ACCESSOR_CONSTRUCTOR.CLASS_NAME`=`SUB_COLUMNS`,
+ `COLUMNS_LIMIT`=`$$0|1|1024$$`, `SPARSED_DETECTOR_KFF`=`$$0|10|1000$$`, `MEM_LIMIT_CHUNK`=`$$0|100|1000000$$`, `OTHERS_ALLOWED_FRACTION`=`$$0|0.5$$`)
+ ------
+ DATA:
+ REPLACE INTO `/Root/ColumnTable` (Col1, Col2) VALUES(1u, JsonDocument('{"a" : "a1"}')), (2u, JsonDocument('{"a" : "a2"}')),
+ (3u, JsonDocument('{"b" : "b3"}')), (4u, JsonDocument('{"b" : "b4", "a" : "a4"}'))
+ ------
+ DATA:
+ REPLACE INTO `/Root/ColumnTable` (Col1, Col2) VALUES(11u, JsonDocument('{"a" : "1a1"}')), (12u, JsonDocument('{"a" : "1a2"}')),
+ (13u, JsonDocument('{"b" : "1b3"}')), (14u, JsonDocument('{"b" : "1b4", "a" : "a4"}'))
+ ------
+ SCHEMA:
+ ALTER OBJECT `/Root/ColumnTable` (TYPE TABLE) SET (ACTION=UPSERT_INDEX, NAME=a_index, TYPE=BLOOM_FILTER,
+ FEATURES=`{"column_names" : ["Col2"], "data_extractor" : {"class_name" : "SUB_COLUMN", "sub_column_name" : "a"}, "false_positive_probability" : 0.05}`)
+ ------
+ DATA:
+ REPLACE INTO `/Root/ColumnTable` (Col1) VALUES(10u)
+ ------
+ ONE_ACTUALIZATION
+ ------
+ READ: SELECT * FROM `/Root/ColumnTable` WHERE JSON_VALUE(Col2, "$.a") = "1a1" ORDER BY Col1;
+ EXPECTED: [[11u;["{\"a\":\"1a1\"}"]]]
+
+ )";
+ TScriptVariator(script).Execute();
+ }
+*/
Y_UNIT_TEST(SwitchAccessorCompactionVariants) {
TString script = R"(
STOP_COMPACTION
diff --git a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.cpp b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.cpp
index c19b790244..937f049a43 100644
--- a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.cpp
@@ -181,7 +181,7 @@ TConclusion<bool> TProgramStepPrepare::DoExecuteInplace(const std::shared_ptr<ID
source->GetStageData().GetTable()->Remove(i.GetColumnId());
}
std::shared_ptr<IKernelFetchLogic> logic;
- if (customFetchInfo->GetFullRestore()) {
+ if (customFetchInfo->GetFullRestore() || source->GetStageData().GetPortionAccessor().GetColumnChunksPointers(i.GetColumnId()).empty()) {
logic = std::make_shared<TDefaultFetchLogic>(i.GetColumnId(), source);
} else {
AFL_VERIFY(customFetchInfo->GetSubColumns().size());
diff --git a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/sub_columns_fetching.h b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/sub_columns_fetching.h
index b202115625..75b6f2f4bb 100644
--- a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/sub_columns_fetching.h
+++ b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/sub_columns_fetching.h
@@ -78,8 +78,10 @@ public:
i.second.GetBlobDataVerified().size());
std::vector<NArrow::NAccessor::TDeserializeChunkedArray::TChunk> chunks = { NArrow::NAccessor::TDeserializeChunkedArray::TChunk(
GetRecordsCount(), i.second.GetBlobDataVerified()) };
- const std::shared_ptr<NArrow::NAccessor::IChunkedArray> arrOriginal = deserialize
- ? columnLoader->ApplyVerified(i.second.GetBlobDataVerified(), GetRecordsCount())
+// const ui32 filledRecordsCount = PartialArray->GetHeader().GetColumnStats().GetColumnRecordsCount(i.second.GetColumnIdx());
+ const std::shared_ptr<NArrow::NAccessor::IChunkedArray> arrOriginal =
+ deserialize
+ ? columnLoader->ApplyVerified(i.second.GetBlobDataVerified(), GetRecordsCount()/*, filledRecordsCount*/)
: std::make_shared<NArrow::NAccessor::TDeserializeChunkedArray>(GetRecordsCount(), columnLoader, std::move(chunks), true);
if (applyFilter) {
PartialArray->AddColumn(i.first, applyFilter->Apply(arrOriginal));
@@ -124,14 +126,12 @@ public:
// "others", PartialArray->GetHeader().GetOtherStats().DebugJson().GetStringRobust());
}
- void InitPartialReader(
- const ui32 columnId, const ui32 positionStart, const std::shared_ptr<NArrow::NAccessor::TAccessorsCollection>& resources) {
+ void InitPartialReader(const std::shared_ptr<NArrow::NAccessor::IChunkedArray>& accessor) {
AFL_VERIFY(!HeaderRange);
AFL_VERIFY(!PartialArray);
- auto columnAccessor = resources->GetAccessorVerified(columnId);
- auto partialArray = columnAccessor->GetArraySlow(positionStart, columnAccessor);
- AFL_VERIFY(partialArray.GetArray()->GetType() == NArrow::NAccessor::IChunkedArray::EType::SubColumnsPartialArray);
- PartialArray = std::static_pointer_cast<NArrow::NAccessor::TSubColumnsPartialArray>(partialArray.GetArray());
+ AFL_VERIFY(accessor);
+ AFL_VERIFY(accessor->GetType() == NArrow::NAccessor::IChunkedArray::EType::SubColumnsPartialArray)("type", accessor->GetType());
+ PartialArray = std::static_pointer_cast<NArrow::NAccessor::TSubColumnsPartialArray>(accessor);
}
TColumnChunkRestoreInfo(const TBlobRange& fullChunkRange, const NArrow::NAccessor::TChunkConstructionData& chunkExternalInfo)
@@ -179,8 +179,10 @@ private:
}
Resources->AddVerified(GetColumnId(), compositeBuilder.Finish(), true);
} else {
+ ui32 pos = 0;
for (auto&& i : ColumnChunks) {
- i.Finish(Resources->GetAppliedFilter(), Source);
+ i.Finish(std::make_shared<NArrow::TColumnFilter>(Resources->GetAppliedFilter()->Slice(pos, i.GetRecordsCount())), Source);
+ pos += i.GetRecordsCount();
}
}
}
@@ -238,23 +240,36 @@ private:
auto itFilter = cFilter.GetIterator(false, Source->GetRecordsCount());
bool itFinished = false;
- NeedToAddResource = !Resources->HasColumn(GetColumnId());
- ui32 posCurrent = 0;
- for (auto&& c : columnChunks) {
+ auto accessor = Resources->GetAccessorOptional(GetColumnId());
+ NeedToAddResource = !accessor;
+ std::vector<std::shared_ptr<NArrow::NAccessor::IChunkedArray>> chunks;
+ if (!NeedToAddResource) {
+ if (accessor->GetType() == NArrow::NAccessor::IChunkedArray::EType::CompositeChunkedArray) {
+ auto composite = std::static_pointer_cast<NArrow::NAccessor::TCompositeChunkedArray>(accessor);
+ chunks = composite->GetChunks();
+ } else {
+ chunks.emplace_back(accessor);
+ }
+ }
+ ui32 resChunkIdx = 0;
+ for (ui32 chunkIdx = 0; chunkIdx < columnChunks.size(); ++chunkIdx) {
+ auto& meta = columnChunks[chunkIdx]->GetMeta();
AFL_VERIFY(!itFinished);
- if (!itFilter.IsBatchForSkip(c->GetMeta().GetRecordsCount())) {
- const TBlobRange range = Source->RestoreBlobRange(c->BlobRange);
- ColumnChunks.emplace_back(range, ChunkExternalInfo.GetSubset(c->GetMeta().GetRecordsCount()));
+ if (!itFilter.IsBatchForSkip(meta.GetRecordsCount())) {
+ const TBlobRange range = Source->RestoreBlobRange(columnChunks[chunkIdx]->BlobRange);
+ ColumnChunks.emplace_back(range, ChunkExternalInfo.GetSubset(meta.GetRecordsCount()));
if (!NeedToAddResource) {
- ColumnChunks.back().InitPartialReader(GetColumnId(), posCurrent, Resources);
+ AFL_VERIFY(resChunkIdx < chunks.size())("chunks", chunks.size())("meta", columnChunks.size())("need", NeedToAddResource);
+ ColumnChunks.back().InitPartialReader(chunks[resChunkIdx]);
+ ++resChunkIdx;
}
ColumnChunks.back().InitReading(reading, SubColumns);
} else {
- ColumnChunks.emplace_back(TColumnChunkRestoreInfo::BuildEmpty(ChunkExternalInfo.GetSubset(c->GetMeta().GetRecordsCount())));
+ ColumnChunks.emplace_back(TColumnChunkRestoreInfo::BuildEmpty(ChunkExternalInfo.GetSubset(meta.GetRecordsCount())));
}
- itFinished = !itFilter.Next(c->GetMeta().GetRecordsCount());
- posCurrent += c->GetMeta().GetRecordsCount();
+ itFinished = !itFilter.Next(meta.GetRecordsCount());
}
+ AFL_VERIFY(NeedToAddResource || (resChunkIdx == chunks.size()));
AFL_VERIFY(itFinished)("filter", itFilter.DebugString())("count", Source->GetRecordsCount());
for (auto&& i : blobsAction.GetReadingActions()) {
nextRead.Add(i);
diff --git a/ydb/core/tx/columnshard/engines/scheme/index_info.cpp b/ydb/core/tx/columnshard/engines/scheme/index_info.cpp
index 10861e3677..afdf9b333d 100644
--- a/ydb/core/tx/columnshard/engines/scheme/index_info.cpp
+++ b/ydb/core/tx/columnshard/engines/scheme/index_info.cpp
@@ -433,6 +433,9 @@ NKikimr::TConclusionStatus TIndexInfo::AppendIndex(const THashMap<ui32, std::vec
AFL_VERIFY(it != Indexes.end());
auto& index = it->second;
std::shared_ptr<IPortionDataChunk> chunk = index->BuildIndex(originalData, recordsCount, *this);
+ if (!chunk) {
+ return TConclusionStatus::Success();
+ }
auto opStorage = operators->GetOperatorVerified(index->GetStorageId());
if ((i64)chunk->GetPackedSize() > opStorage->GetBlobSplitSettings().GetMaxBlobSize()) {
return TConclusionStatus::Fail("blob size for secondary data (" + ::ToString(indexId) + ":" + ::ToString(chunk->GetPackedSize()) + ":" +