diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-11-29 13:26:02 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-11-29 14:58:59 +0300 |
commit | 10247a5dc67d1216fb8033107cc97a5864d3b8c9 (patch) | |
tree | 118963f2155d7887c6e7ed4f0d3658cc5c8e57aa | |
parent | f0965a0dfb017878d5aa66400f1cd8ff6460e1a8 (diff) | |
download | ydb-10247a5dc67d1216fb8033107cc97a5864d3b8c9.tar.gz |
KIKIMR-20316: fix incorrect loader usage for column restore
14 files changed, 55 insertions, 20 deletions
diff --git a/ydb/core/formats/arrow/common/CMakeLists.darwin-arm64.txt b/ydb/core/formats/arrow/common/CMakeLists.darwin-arm64.txt index 757b729018..f26f834bd6 100644 --- a/ydb/core/formats/arrow/common/CMakeLists.darwin-arm64.txt +++ b/ydb/core/formats/arrow/common/CMakeLists.darwin-arm64.txt @@ -12,6 +12,7 @@ target_link_libraries(formats-arrow-common PUBLIC contrib-libs-cxxsupp yutil libs-apache-arrow + cpp-actors-core ) target_sources(formats-arrow-common PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/common/validation.cpp diff --git a/ydb/core/formats/arrow/common/CMakeLists.darwin-x86_64.txt b/ydb/core/formats/arrow/common/CMakeLists.darwin-x86_64.txt index 757b729018..f26f834bd6 100644 --- a/ydb/core/formats/arrow/common/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/formats/arrow/common/CMakeLists.darwin-x86_64.txt @@ -12,6 +12,7 @@ target_link_libraries(formats-arrow-common PUBLIC contrib-libs-cxxsupp yutil libs-apache-arrow + cpp-actors-core ) target_sources(formats-arrow-common PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/common/validation.cpp diff --git a/ydb/core/formats/arrow/common/CMakeLists.linux-aarch64.txt b/ydb/core/formats/arrow/common/CMakeLists.linux-aarch64.txt index c351a86aff..15e24a461d 100644 --- a/ydb/core/formats/arrow/common/CMakeLists.linux-aarch64.txt +++ b/ydb/core/formats/arrow/common/CMakeLists.linux-aarch64.txt @@ -13,6 +13,7 @@ target_link_libraries(formats-arrow-common PUBLIC contrib-libs-cxxsupp yutil libs-apache-arrow + cpp-actors-core ) target_sources(formats-arrow-common PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/common/validation.cpp diff --git a/ydb/core/formats/arrow/common/CMakeLists.linux-x86_64.txt b/ydb/core/formats/arrow/common/CMakeLists.linux-x86_64.txt index c351a86aff..15e24a461d 100644 --- a/ydb/core/formats/arrow/common/CMakeLists.linux-x86_64.txt +++ b/ydb/core/formats/arrow/common/CMakeLists.linux-x86_64.txt @@ -13,6 +13,7 @@ target_link_libraries(formats-arrow-common PUBLIC contrib-libs-cxxsupp yutil libs-apache-arrow + cpp-actors-core ) target_sources(formats-arrow-common PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/common/validation.cpp diff --git a/ydb/core/formats/arrow/common/CMakeLists.windows-x86_64.txt b/ydb/core/formats/arrow/common/CMakeLists.windows-x86_64.txt index 757b729018..f26f834bd6 100644 --- a/ydb/core/formats/arrow/common/CMakeLists.windows-x86_64.txt +++ b/ydb/core/formats/arrow/common/CMakeLists.windows-x86_64.txt @@ -12,6 +12,7 @@ target_link_libraries(formats-arrow-common PUBLIC contrib-libs-cxxsupp yutil libs-apache-arrow + cpp-actors-core ) target_sources(formats-arrow-common PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/common/validation.cpp diff --git a/ydb/core/formats/arrow/common/validation.cpp b/ydb/core/formats/arrow/common/validation.cpp index ca069631e7..7cc69cdc27 100644 --- a/ydb/core/formats/arrow/common/validation.cpp +++ b/ydb/core/formats/arrow/common/validation.cpp @@ -1,5 +1,10 @@ #include "validation.h" +#include <library/cpp/actors/core/log.h> namespace NKikimr::NArrow { +void TStatusValidator::Validate(const arrow::Status& status) { + AFL_VERIFY(status.ok())("problem", status.ToString().c_str()); +} + } diff --git a/ydb/core/formats/arrow/common/validation.h b/ydb/core/formats/arrow/common/validation.h index 59f4364cc9..f71f18ece5 100644 --- a/ydb/core/formats/arrow/common/validation.h +++ b/ydb/core/formats/arrow/common/validation.h @@ -8,19 +8,17 @@ namespace NKikimr::NArrow { class TStatusValidator { public: - static void Validate(const arrow::Status& status) { - Y_ABORT_UNLESS(status.ok(), "%s", status.ToString().c_str()); - } + static void Validate(const arrow::Status& status); template <class T> static T GetValid(const arrow::Result<T>& result) { - Y_ABORT_UNLESS(result.ok(), "%s", result.status().ToString().c_str()); + Validate(result.status()); return *result; } template <class T> static T GetValid(arrow::Result<T>&& result) { - Y_ABORT_UNLESS(result.ok(), "%s", result.status().ToString().c_str()); + Validate(result.status()); return std::move(*result); } }; diff --git a/ydb/core/formats/arrow/common/ya.make b/ydb/core/formats/arrow/common/ya.make index bf6f79458a..cafa60e9b4 100644 --- a/ydb/core/formats/arrow/common/ya.make +++ b/ydb/core/formats/arrow/common/ya.make @@ -2,6 +2,7 @@ LIBRARY() PEERDIR( contrib/libs/apache/arrow + library/cpp/actors/core ) SRCS( diff --git a/ydb/core/tx/columnshard/engines/changes/compaction/column_cursor.cpp b/ydb/core/tx/columnshard/engines/changes/compaction/column_cursor.cpp index 95179bc534..77d6cf592c 100644 --- a/ydb/core/tx/columnshard/engines/changes/compaction/column_cursor.cpp +++ b/ydb/core/tx/columnshard/engines/changes/compaction/column_cursor.cpp @@ -8,7 +8,7 @@ bool TPortionColumnCursor::Fetch(TMergedColumn& column) { Y_ABORT_UNLESS(RecordIndexStart); ui32 currentStartPortionIdx = *RecordIndexStart; ui32 currentFinishPortionIdx = RecordIndexFinish; - +// NActors::TLogContextGuard lg(NActors::TLogContextBuilder::Build()("portion_id", PortionId)); while (currentStartPortionIdx - ChunkRecordIndexStartPosition >= CurrentColumnChunk->GetMeta().GetNumRowsVerified()) { if (!NextChunk()) { return false; @@ -18,11 +18,11 @@ bool TPortionColumnCursor::Fetch(TMergedColumn& column) { ui32 currentStart = currentStartPortionIdx - ChunkRecordIndexStartPosition; while (currentFinishPortionIdx - ChunkRecordIndexStartPosition >= CurrentColumnChunk->GetMeta().GetNumRowsVerified()) { const ui32 currentFinish = CurrentColumnChunk->GetMeta().GetNumRowsVerified(); - if (currentStart == 0) { - column.AppendBlob(CurrentBlobChunk->GetData(), *CurrentColumnChunk); - } else { +// if (currentStart == 0) { +// column.AppendBlob(CurrentBlobChunk->GetData(), *CurrentColumnChunk); +// } else { column.AppendSlice(GetCurrentArray(), currentStart, currentFinish - currentStart); - } +// } currentStart = 0; if (!NextChunk()) { return false; diff --git a/ydb/core/tx/columnshard/engines/changes/compaction/column_cursor.h b/ydb/core/tx/columnshard/engines/changes/compaction/column_cursor.h index a631ceb24d..330deaf936 100644 --- a/ydb/core/tx/columnshard/engines/changes/compaction/column_cursor.h +++ b/ydb/core/tx/columnshard/engines/changes/compaction/column_cursor.h @@ -20,6 +20,7 @@ private: const TColumnRecord* CurrentColumnChunk = nullptr; std::shared_ptr<arrow::Array> CurrentArray; std::shared_ptr<TColumnLoader> ColumnLoader; + const ui64 PortionId; const std::shared_ptr<arrow::Array>& GetCurrentArray(); @@ -36,11 +37,13 @@ public: bool Fetch(TMergedColumn& column); - TPortionColumnCursor(const std::vector<IPortionColumnChunk::TPtr>& columnChunks, const std::vector<const TColumnRecord*>& records, const std::shared_ptr<TColumnLoader> loader) + TPortionColumnCursor(const std::vector<IPortionColumnChunk::TPtr>& columnChunks, const std::vector<const TColumnRecord*>& records, const std::shared_ptr<TColumnLoader>& loader, const ui64 portionId) : BlobChunks(columnChunks) , ColumnChunks(records) , ColumnLoader(loader) + , PortionId(portionId) { + Y_UNUSED(PortionId); Y_ABORT_UNLESS(BlobChunks.size()); Y_ABORT_UNLESS(ColumnChunks.size() == BlobChunks.size()); CurrentBlobChunk = BlobChunks.front(); diff --git a/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp b/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp index e165bb59e8..7da321a4aa 100644 --- a/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp +++ b/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp @@ -18,16 +18,12 @@ namespace NKikimr::NOlap::NCompaction { TConclusionStatus TGeneralCompactColumnEngineChanges::DoConstructBlobs(TConstructionContext& context) noexcept { std::vector<TPortionInfoWithBlobs> portions = TPortionInfoWithBlobs::RestorePortions(SwitchedPortions, Blobs); Blobs.clear(); - std::optional<TSnapshot> maxSnapshot; i64 portionsSize = 0; i64 portionsCount = 0; i64 insertedPortionsSize = 0; i64 compactedPortionsSize = 0; i64 otherPortionsSize = 0; for (auto&& i : SwitchedPortions) { - if (!maxSnapshot || *maxSnapshot < i.GetMinSnapshot()) { - maxSnapshot = i.GetMinSnapshot(); - } if (i.GetMeta().GetProduced() == TPortionMeta::EProduced::INSERTED) { insertedPortionsSize += i.GetBlobBytes(); } else if (i.GetMeta().GetProduced() == TPortionMeta::EProduced::SPLIT_COMPACTED) { @@ -40,7 +36,6 @@ TConclusionStatus TGeneralCompactColumnEngineChanges::DoConstructBlobs(TConstruc } NChanges::TGeneralCompactionCounters::OnPortionsKind(insertedPortionsSize, compactedPortionsSize, otherPortionsSize); NChanges::TGeneralCompactionCounters::OnRepackPortions(portionsCount, portionsSize); - Y_ABORT_UNLESS(maxSnapshot); static const TString portionIdFieldName = "$$__portion_id"; static const TString portionRecordIndexFieldName = "$$__portion_record_idx"; @@ -90,18 +85,24 @@ TConclusionStatus TGeneralCompactColumnEngineChanges::DoConstructBlobs(TConstruc std::vector<std::map<std::string, std::vector<TColumnPortionResult>>> chunkGroups; chunkGroups.resize(batchResults.size()); +// Cerr << context.SchemaVersions.DebugString() << Endl; for (auto&& f : resultSchema->GetSchema()->fields()) { + NActors::TLogContextGuard logGuard(NActors::TLogContextBuilder::Build()("field_name", f->name())); const ui32 columnId = resultSchema->GetColumnId(f->name()); auto columnInfo = stats->GetColumnInfo(columnId); Y_ABORT_UNLESS(columnInfo); std::vector<TPortionColumnCursor> cursors; - auto loader = resultSchema->GetColumnLoader(f->name()); +// Cerr << f->name() << Endl; for (auto&& p : portions) { +// Cerr << p.GetPortionInfo().DebugString() << Endl; + auto dataSchema = context.SchemaVersions.GetSchema(p.GetPortionInfo().GetMinSnapshot()); + auto loader = dataSchema->GetColumnLoader(f->name()); +// Cerr << "loader: " << loader->DebugString() << Endl; std::vector<const TColumnRecord*> records; std::vector<IPortionColumnChunk::TPtr> chunks; p.ExtractColumnChunks(columnId, records, chunks); - cursors.emplace_back(TPortionColumnCursor(chunks, records, loader)); + cursors.emplace_back(TPortionColumnCursor(chunks, records, loader, p.GetPortionInfo().GetPortionId())); } ui32 batchesRecordsCount = 0; @@ -182,7 +183,7 @@ TConclusionStatus TGeneralCompactColumnEngineChanges::DoConstructBlobs(TConstruc TGeneralSerializedSlice slice(std::move(i)); auto b = batchResult->Slice(recordIdx, slice.GetRecordsCount()); std::vector<std::vector<IPortionColumnChunk::TPtr>> chunksByBlobs = slice.GroupChunksByBlobs(); - AppendedPortions.emplace_back(TPortionInfoWithBlobs::BuildByBlobs(chunksByBlobs, nullptr, GranuleMeta->GetPathId(), *maxSnapshot, SaverContext.GetStorageOperator())); + AppendedPortions.emplace_back(TPortionInfoWithBlobs::BuildByBlobs(chunksByBlobs, nullptr, GranuleMeta->GetPathId(), resultSchema->GetSnapshot(), SaverContext.GetStorageOperator())); NArrow::TFirstLastSpecialKeys primaryKeys(slice.GetFirstLastPKBatch(resultSchema->GetIndexInfo().GetReplaceKey())); NArrow::TMinMaxSpecialKeys snapshotKeys(b, TIndexInfo::ArrowSchemaSnapshot()); AppendedPortions.back().GetPortionInfo().AddMetadata(*resultSchema, primaryKeys, snapshotKeys, SaverContext.GetTierName()); diff --git a/ydb/core/tx/columnshard/engines/column_engine.h b/ydb/core/tx/columnshard/engines/column_engine.h index 1cb1f87a21..d074e1510d 100644 --- a/ydb/core/tx/columnshard/engines/column_engine.h +++ b/ydb/core/tx/columnshard/engines/column_engine.h @@ -292,6 +292,14 @@ class TVersionedIndex { std::map<ui64, ISnapshotSchema::TPtr> SnapshotByVersion; ui64 LastSchemaVersion = 0; public: + TString DebugString() const { + TStringBuilder sb; + for (auto&& i : Snapshots) { + sb << i.first << ":" << i.second->DebugString() << ";"; + } + return sb; + } + ISnapshotSchema::TPtr GetSchema(const ui64 version) const { auto it = SnapshotByVersion.find(version); return it == SnapshotByVersion.end() ? nullptr : it->second; diff --git a/ydb/core/tx/columnshard/engines/scheme/column_features.h b/ydb/core/tx/columnshard/engines/scheme/column_features.h index ee99be18a0..d2f0b21b77 100644 --- a/ydb/core/tx/columnshard/engines/scheme/column_features.h +++ b/ydb/core/tx/columnshard/engines/scheme/column_features.h @@ -136,6 +136,15 @@ private: : ColumnId(columnId) { } public: + + TString DebugString() const { + TStringBuilder sb; + sb << "compression=" << (Compression ? Compression->DebugString() : "NO") << ";"; + sb << "encoding=" << (DictionaryEncoding ? DictionaryEncoding->DebugString() : "NO") << ";"; + sb << "loader=" << (Loader ? Loader->DebugString() : "NO") << ";"; + return sb; + } + NArrow::NTransformation::ITransformer::TPtr GetSaveTransformer() const; static std::optional<TColumnFeatures> BuildFromProto(const NKikimrSchemeOp::TOlapColumnDescription& columnInfo, const TIndexInfo& indexInfo); static TColumnFeatures BuildFromIndexInfo(const ui32 columnId, const TIndexInfo& indexInfo); diff --git a/ydb/core/tx/columnshard/engines/scheme/index_info.h b/ydb/core/tx/columnshard/engines/scheme/index_info.h index a9aaf7d087..a50e3dbeef 100644 --- a/ydb/core/tx/columnshard/engines/scheme/index_info.h +++ b/ydb/core/tx/columnshard/engines/scheme/index_info.h @@ -49,11 +49,16 @@ public: }; TString DebugString() const { - return TStringBuilder() << "(" + TStringBuilder sb; + sb << "(" << "id=" << Id << ";" << "version=" << Version << ";" << "name=" << Name << ";" << ")"; + for (auto&& i : ColumnFeatures) { + sb << GetColumnName(i.first) << ":" << i.second.DebugString() << ";"; + } + return sb; } /// Appends the special columns to the batch. |