about summary refs log tree commit diff stats
diff options
context:
space:
mode:
author ivanmorozov <ivanmorozov@yandex-team.com> 2023-11-29 13:26:02 +0300
committer ivanmorozov <ivanmorozov@yandex-team.com> 2023-11-29 14:58:59 +0300
commit 10247a5dc67d1216fb8033107cc97a5864d3b8c9 (patch)
tree 118963f2155d7887c6e7ed4f0d3658cc5c8e57aa
parent f0965a0dfb017878d5aa66400f1cd8ff6460e1a8 (diff)
download ydb-10247a5dc67d1216fb8033107cc97a5864d3b8c9.tar.gz
KIKIMR-20316: fix incorrect loader usage for column restore
-rw-r--r-- ydb/core/formats/arrow/common/CMakeLists.darwin-arm64.txt 1
-rw-r--r-- ydb/core/formats/arrow/common/CMakeLists.darwin-x86_64.txt 1
-rw-r--r-- ydb/core/formats/arrow/common/CMakeLists.linux-aarch64.txt 1
-rw-r--r-- ydb/core/formats/arrow/common/CMakeLists.linux-x86_64.txt 1
-rw-r--r-- ydb/core/formats/arrow/common/CMakeLists.windows-x86_64.txt 1
-rw-r--r-- ydb/core/formats/arrow/common/validation.cpp 5
-rw-r--r-- ydb/core/formats/arrow/common/validation.h 8
-rw-r--r-- ydb/core/formats/arrow/common/ya.make 1
-rw-r--r-- ydb/core/tx/columnshard/engines/changes/compaction/column_cursor.cpp 10
-rw-r--r-- ydb/core/tx/columnshard/engines/changes/compaction/column_cursor.h 5
-rw-r--r-- ydb/core/tx/columnshard/engines/changes/general_compaction.cpp 17
-rw-r--r-- ydb/core/tx/columnshard/engines/column_engine.h 8
-rw-r--r-- ydb/core/tx/columnshard/engines/scheme/column_features.h 9
-rw-r--r-- ydb/core/tx/columnshard/engines/scheme/index_info.h 7
14 files changed, 55 insertions, 20 deletions
diff --git a/ydb/core/formats/arrow/common/CMakeLists.darwin-arm64.txt b/ydb/core/formats/arrow/common/CMakeLists.darwin-arm64.txt
index 757b729018..f26f834bd6 100644
--- a/ydb/core/formats/arrow/common/CMakeLists.darwin-arm64.txt
+++ b/ydb/core/formats/arrow/common/CMakeLists.darwin-arm64.txt
@@ -12,6 +12,7 @@ target_link_libraries(formats-arrow-common PUBLIC
contrib-libs-cxxsupp
yutil
libs-apache-arrow
+ cpp-actors-core
)
target_sources(formats-arrow-common PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/common/validation.cpp
diff --git a/ydb/core/formats/arrow/common/CMakeLists.darwin-x86_64.txt b/ydb/core/formats/arrow/common/CMakeLists.darwin-x86_64.txt
index 757b729018..f26f834bd6 100644
--- a/ydb/core/formats/arrow/common/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/formats/arrow/common/CMakeLists.darwin-x86_64.txt
@@ -12,6 +12,7 @@ target_link_libraries(formats-arrow-common PUBLIC
contrib-libs-cxxsupp
yutil
libs-apache-arrow
+ cpp-actors-core
)
target_sources(formats-arrow-common PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/common/validation.cpp
diff --git a/ydb/core/formats/arrow/common/CMakeLists.linux-aarch64.txt b/ydb/core/formats/arrow/common/CMakeLists.linux-aarch64.txt
index c351a86aff..15e24a461d 100644
--- a/ydb/core/formats/arrow/common/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/formats/arrow/common/CMakeLists.linux-aarch64.txt
@@ -13,6 +13,7 @@ target_link_libraries(formats-arrow-common PUBLIC
contrib-libs-cxxsupp
yutil
libs-apache-arrow
+ cpp-actors-core
)
target_sources(formats-arrow-common PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/common/validation.cpp
diff --git a/ydb/core/formats/arrow/common/CMakeLists.linux-x86_64.txt b/ydb/core/formats/arrow/common/CMakeLists.linux-x86_64.txt
index c351a86aff..15e24a461d 100644
--- a/ydb/core/formats/arrow/common/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/formats/arrow/common/CMakeLists.linux-x86_64.txt
@@ -13,6 +13,7 @@ target_link_libraries(formats-arrow-common PUBLIC
contrib-libs-cxxsupp
yutil
libs-apache-arrow
+ cpp-actors-core
)
target_sources(formats-arrow-common PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/common/validation.cpp
diff --git a/ydb/core/formats/arrow/common/CMakeLists.windows-x86_64.txt b/ydb/core/formats/arrow/common/CMakeLists.windows-x86_64.txt
index 757b729018..f26f834bd6 100644
--- a/ydb/core/formats/arrow/common/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/formats/arrow/common/CMakeLists.windows-x86_64.txt
@@ -12,6 +12,7 @@ target_link_libraries(formats-arrow-common PUBLIC
contrib-libs-cxxsupp
yutil
libs-apache-arrow
+ cpp-actors-core
)
target_sources(formats-arrow-common PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/common/validation.cpp
diff --git a/ydb/core/formats/arrow/common/validation.cpp b/ydb/core/formats/arrow/common/validation.cpp
index ca069631e7..7cc69cdc27 100644
--- a/ydb/core/formats/arrow/common/validation.cpp
+++ b/ydb/core/formats/arrow/common/validation.cpp
@@ -1,5 +1,10 @@
#include "validation.h"
+#include <library/cpp/actors/core/log.h>
namespace NKikimr::NArrow {
+void TStatusValidator::Validate(const arrow::Status& status) {
+ AFL_VERIFY(status.ok())("problem", status.ToString().c_str());
+}
+
}
diff --git a/ydb/core/formats/arrow/common/validation.h b/ydb/core/formats/arrow/common/validation.h
index 59f4364cc9..f71f18ece5 100644
--- a/ydb/core/formats/arrow/common/validation.h
+++ b/ydb/core/formats/arrow/common/validation.h
@@ -8,19 +8,17 @@ namespace NKikimr::NArrow {
class TStatusValidator {
public:
- static void Validate(const arrow::Status& status) {
- Y_ABORT_UNLESS(status.ok(), "%s", status.ToString().c_str());
- }
+ static void Validate(const arrow::Status& status);
template <class T>
static T GetValid(const arrow::Result<T>& result) {
- Y_ABORT_UNLESS(result.ok(), "%s", result.status().ToString().c_str());
+ Validate(result.status());
return *result;
}
template <class T>
static T GetValid(arrow::Result<T>&& result) {
- Y_ABORT_UNLESS(result.ok(), "%s", result.status().ToString().c_str());
+ Validate(result.status());
return std::move(*result);
}
};
diff --git a/ydb/core/formats/arrow/common/ya.make b/ydb/core/formats/arrow/common/ya.make
index bf6f79458a..cafa60e9b4 100644
--- a/ydb/core/formats/arrow/common/ya.make
+++ b/ydb/core/formats/arrow/common/ya.make
@@ -2,6 +2,7 @@ LIBRARY()
PEERDIR(
contrib/libs/apache/arrow
+ library/cpp/actors/core
)
SRCS(
diff --git a/ydb/core/tx/columnshard/engines/changes/compaction/column_cursor.cpp b/ydb/core/tx/columnshard/engines/changes/compaction/column_cursor.cpp
index 95179bc534..77d6cf592c 100644
--- a/ydb/core/tx/columnshard/engines/changes/compaction/column_cursor.cpp
+++ b/ydb/core/tx/columnshard/engines/changes/compaction/column_cursor.cpp
@@ -8,7 +8,7 @@ bool TPortionColumnCursor::Fetch(TMergedColumn& column) {
Y_ABORT_UNLESS(RecordIndexStart);
ui32 currentStartPortionIdx = *RecordIndexStart;
ui32 currentFinishPortionIdx = RecordIndexFinish;
-
+// NActors::TLogContextGuard lg(NActors::TLogContextBuilder::Build()("portion_id", PortionId));
while (currentStartPortionIdx - ChunkRecordIndexStartPosition >= CurrentColumnChunk->GetMeta().GetNumRowsVerified()) {
if (!NextChunk()) {
return false;
@@ -18,11 +18,11 @@ bool TPortionColumnCursor::Fetch(TMergedColumn& column) {
ui32 currentStart = currentStartPortionIdx - ChunkRecordIndexStartPosition;
while (currentFinishPortionIdx - ChunkRecordIndexStartPosition >= CurrentColumnChunk->GetMeta().GetNumRowsVerified()) {
const ui32 currentFinish = CurrentColumnChunk->GetMeta().GetNumRowsVerified();
- if (currentStart == 0) {
- column.AppendBlob(CurrentBlobChunk->GetData(), *CurrentColumnChunk);
- } else {
+// if (currentStart == 0) {
+// column.AppendBlob(CurrentBlobChunk->GetData(), *CurrentColumnChunk);
+// } else {
column.AppendSlice(GetCurrentArray(), currentStart, currentFinish - currentStart);
- }
+// }
currentStart = 0;
if (!NextChunk()) {
return false;
diff --git a/ydb/core/tx/columnshard/engines/changes/compaction/column_cursor.h b/ydb/core/tx/columnshard/engines/changes/compaction/column_cursor.h
index a631ceb24d..330deaf936 100644
--- a/ydb/core/tx/columnshard/engines/changes/compaction/column_cursor.h
+++ b/ydb/core/tx/columnshard/engines/changes/compaction/column_cursor.h
@@ -20,6 +20,7 @@ private:
const TColumnRecord* CurrentColumnChunk = nullptr;
std::shared_ptr<arrow::Array> CurrentArray;
std::shared_ptr<TColumnLoader> ColumnLoader;
+ const ui64 PortionId;
const std::shared_ptr<arrow::Array>& GetCurrentArray();
@@ -36,11 +37,13 @@ public:
bool Fetch(TMergedColumn& column);
- TPortionColumnCursor(const std::vector<IPortionColumnChunk::TPtr>& columnChunks, const std::vector<const TColumnRecord*>& records, const std::shared_ptr<TColumnLoader> loader)
+ TPortionColumnCursor(const std::vector<IPortionColumnChunk::TPtr>& columnChunks, const std::vector<const TColumnRecord*>& records, const std::shared_ptr<TColumnLoader>& loader, const ui64 portionId)
: BlobChunks(columnChunks)
, ColumnChunks(records)
, ColumnLoader(loader)
+ , PortionId(portionId)
{
+ Y_UNUSED(PortionId);
Y_ABORT_UNLESS(BlobChunks.size());
Y_ABORT_UNLESS(ColumnChunks.size() == BlobChunks.size());
CurrentBlobChunk = BlobChunks.front();
diff --git a/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp b/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp
index e165bb59e8..7da321a4aa 100644
--- a/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp
+++ b/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp
@@ -18,16 +18,12 @@ namespace NKikimr::NOlap::NCompaction {
TConclusionStatus TGeneralCompactColumnEngineChanges::DoConstructBlobs(TConstructionContext& context) noexcept {
std::vector<TPortionInfoWithBlobs> portions = TPortionInfoWithBlobs::RestorePortions(SwitchedPortions, Blobs);
Blobs.clear();
- std::optional<TSnapshot> maxSnapshot;
i64 portionsSize = 0;
i64 portionsCount = 0;
i64 insertedPortionsSize = 0;
i64 compactedPortionsSize = 0;
i64 otherPortionsSize = 0;
for (auto&& i : SwitchedPortions) {
- if (!maxSnapshot || *maxSnapshot < i.GetMinSnapshot()) {
- maxSnapshot = i.GetMinSnapshot();
- }
if (i.GetMeta().GetProduced() == TPortionMeta::EProduced::INSERTED) {
insertedPortionsSize += i.GetBlobBytes();
} else if (i.GetMeta().GetProduced() == TPortionMeta::EProduced::SPLIT_COMPACTED) {
@@ -40,7 +36,6 @@ TConclusionStatus TGeneralCompactColumnEngineChanges::DoConstructBlobs(TConstruc
}
NChanges::TGeneralCompactionCounters::OnPortionsKind(insertedPortionsSize, compactedPortionsSize, otherPortionsSize);
NChanges::TGeneralCompactionCounters::OnRepackPortions(portionsCount, portionsSize);
- Y_ABORT_UNLESS(maxSnapshot);
static const TString portionIdFieldName = "$$__portion_id";
static const TString portionRecordIndexFieldName = "$$__portion_record_idx";
@@ -90,18 +85,24 @@ TConclusionStatus TGeneralCompactColumnEngineChanges::DoConstructBlobs(TConstruc
std::vector<std::map<std::string, std::vector<TColumnPortionResult>>> chunkGroups;
chunkGroups.resize(batchResults.size());
+// Cerr << context.SchemaVersions.DebugString() << Endl;
for (auto&& f : resultSchema->GetSchema()->fields()) {
+ NActors::TLogContextGuard logGuard(NActors::TLogContextBuilder::Build()("field_name", f->name()));
const ui32 columnId = resultSchema->GetColumnId(f->name());
auto columnInfo = stats->GetColumnInfo(columnId);
Y_ABORT_UNLESS(columnInfo);
std::vector<TPortionColumnCursor> cursors;
- auto loader = resultSchema->GetColumnLoader(f->name());
+// Cerr << f->name() << Endl;
for (auto&& p : portions) {
+// Cerr << p.GetPortionInfo().DebugString() << Endl;
+ auto dataSchema = context.SchemaVersions.GetSchema(p.GetPortionInfo().GetMinSnapshot());
+ auto loader = dataSchema->GetColumnLoader(f->name());
+// Cerr << "loader: " << loader->DebugString() << Endl;
std::vector<const TColumnRecord*> records;
std::vector<IPortionColumnChunk::TPtr> chunks;
p.ExtractColumnChunks(columnId, records, chunks);
- cursors.emplace_back(TPortionColumnCursor(chunks, records, loader));
+ cursors.emplace_back(TPortionColumnCursor(chunks, records, loader, p.GetPortionInfo().GetPortionId()));
}
ui32 batchesRecordsCount = 0;
@@ -182,7 +183,7 @@ TConclusionStatus TGeneralCompactColumnEngineChanges::DoConstructBlobs(TConstruc
TGeneralSerializedSlice slice(std::move(i));
auto b = batchResult->Slice(recordIdx, slice.GetRecordsCount());
std::vector<std::vector<IPortionColumnChunk::TPtr>> chunksByBlobs = slice.GroupChunksByBlobs();
- AppendedPortions.emplace_back(TPortionInfoWithBlobs::BuildByBlobs(chunksByBlobs, nullptr, GranuleMeta->GetPathId(), *maxSnapshot, SaverContext.GetStorageOperator()));
+ AppendedPortions.emplace_back(TPortionInfoWithBlobs::BuildByBlobs(chunksByBlobs, nullptr, GranuleMeta->GetPathId(), resultSchema->GetSnapshot(), SaverContext.GetStorageOperator()));
NArrow::TFirstLastSpecialKeys primaryKeys(slice.GetFirstLastPKBatch(resultSchema->GetIndexInfo().GetReplaceKey()));
NArrow::TMinMaxSpecialKeys snapshotKeys(b, TIndexInfo::ArrowSchemaSnapshot());
AppendedPortions.back().GetPortionInfo().AddMetadata(*resultSchema, primaryKeys, snapshotKeys, SaverContext.GetTierName());
diff --git a/ydb/core/tx/columnshard/engines/column_engine.h b/ydb/core/tx/columnshard/engines/column_engine.h
index 1cb1f87a21..d074e1510d 100644
--- a/ydb/core/tx/columnshard/engines/column_engine.h
+++ b/ydb/core/tx/columnshard/engines/column_engine.h
@@ -292,6 +292,14 @@ class TVersionedIndex {
std::map<ui64, ISnapshotSchema::TPtr> SnapshotByVersion;
ui64 LastSchemaVersion = 0;
public:
+ TString DebugString() const {
+ TStringBuilder sb;
+ for (auto&& i : Snapshots) {
+ sb << i.first << ":" << i.second->DebugString() << ";";
+ }
+ return sb;
+ }
+
ISnapshotSchema::TPtr GetSchema(const ui64 version) const {
auto it = SnapshotByVersion.find(version);
return it == SnapshotByVersion.end() ? nullptr : it->second;
diff --git a/ydb/core/tx/columnshard/engines/scheme/column_features.h b/ydb/core/tx/columnshard/engines/scheme/column_features.h
index ee99be18a0..d2f0b21b77 100644
--- a/ydb/core/tx/columnshard/engines/scheme/column_features.h
+++ b/ydb/core/tx/columnshard/engines/scheme/column_features.h
@@ -136,6 +136,15 @@ private:
: ColumnId(columnId) {
}
public:
+
+ TString DebugString() const {
+ TStringBuilder sb;
+ sb << "compression=" << (Compression ? Compression->DebugString() : "NO") << ";";
+ sb << "encoding=" << (DictionaryEncoding ? DictionaryEncoding->DebugString() : "NO") << ";";
+ sb << "loader=" << (Loader ? Loader->DebugString() : "NO") << ";";
+ return sb;
+ }
+
NArrow::NTransformation::ITransformer::TPtr GetSaveTransformer() const;
static std::optional<TColumnFeatures> BuildFromProto(const NKikimrSchemeOp::TOlapColumnDescription& columnInfo, const TIndexInfo& indexInfo);
static TColumnFeatures BuildFromIndexInfo(const ui32 columnId, const TIndexInfo& indexInfo);
diff --git a/ydb/core/tx/columnshard/engines/scheme/index_info.h b/ydb/core/tx/columnshard/engines/scheme/index_info.h
index a9aaf7d087..a50e3dbeef 100644
--- a/ydb/core/tx/columnshard/engines/scheme/index_info.h
+++ b/ydb/core/tx/columnshard/engines/scheme/index_info.h
@@ -49,11 +49,16 @@ public:
};
TString DebugString() const {
- return TStringBuilder() << "("
+ TStringBuilder sb;
+ sb << "("
<< "id=" << Id << ";"
<< "version=" << Version << ";"
<< "name=" << Name << ";"
<< ")";
+ for (auto&& i : ColumnFeatures) {
+ sb << GetColumnName(i.first) << ":" << i.second.DebugString() << ";";
+ }
+ return sb;
}
/// Appends the special columns to the batch.