aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authornsofya <nsofya@ydb.tech>2023-11-02 23:38:49 +0300
committernsofya <nsofya@ydb.tech>2023-11-02 23:55:12 +0300
commit47a733af080e57d9d3d7659b2458d9b21eaf7f18 (patch)
tree95bbae7c23f4cee213fd0e443590f3e1709b84e8
parentdab4e7eff56df570d1174a06824539de1e64d22c (diff)
downloadydb-47a733af080e57d9d3d7659b2458d9b21eaf7f18.tar.gz
KIKIMR-19805: Read portions data and normalize
-rw-r--r--ydb/core/formats/arrow/arrow_helpers.cpp43
-rw-r--r--ydb/core/formats/arrow/arrow_helpers.h2
-rw-r--r--ydb/core/formats/arrow/permutations.cpp1
-rw-r--r--ydb/core/tablet/resource_broker.cpp11
-rw-r--r--ydb/core/tx/columnshard/columnshard_impl.cpp4
-rw-r--r--ydb/core/tx/columnshard/columnshard_ut_common.cpp8
-rw-r--r--ydb/core/tx/columnshard/columnshard_ut_common.h2
-rw-r--r--ydb/core/tx/columnshard/engines/portions/meta.cpp1
-rw-r--r--ydb/core/tx/columnshard/engines/portions/meta.h8
-rw-r--r--ydb/core/tx/columnshard/engines/portions/portion_info.cpp1
-rw-r--r--ydb/core/tx/columnshard/engines/reader/plain_reader/interval.cpp2
-rw-r--r--ydb/core/tx/columnshard/engines/reader/read_filter_merger.h4
-rw-r--r--ydb/core/tx/columnshard/engines/scheme/abstract_scheme.cpp9
-rw-r--r--ydb/core/tx/columnshard/engines/scheme/abstract_scheme.h2
-rw-r--r--ydb/core/tx/columnshard/normalizer/portion/CMakeLists.darwin-x86_64.txt2
-rw-r--r--ydb/core/tx/columnshard/normalizer/portion/CMakeLists.linux-aarch64.txt2
-rw-r--r--ydb/core/tx/columnshard/normalizer/portion/CMakeLists.linux-x86_64.txt2
-rw-r--r--ydb/core/tx/columnshard/normalizer/portion/CMakeLists.windows-x86_64.txt2
-rw-r--r--ydb/core/tx/columnshard/normalizer/portion/chunks.cpp159
-rw-r--r--ydb/core/tx/columnshard/normalizer/portion/chunks.h110
-rw-r--r--ydb/core/tx/columnshard/normalizer/portion/min_max.cpp222
-rw-r--r--ydb/core/tx/columnshard/normalizer/portion/min_max.h44
-rw-r--r--ydb/core/tx/columnshard/normalizer/portion/normalizer.cpp202
-rw-r--r--ydb/core/tx/columnshard/normalizer/portion/normalizer.h165
-rw-r--r--ydb/core/tx/columnshard/normalizer/portion/ya.make2
-rw-r--r--ydb/core/tx/columnshard/ut_rw/ut_normalizer.cpp69
26 files changed, 753 insertions, 326 deletions
diff --git a/ydb/core/formats/arrow/arrow_helpers.cpp b/ydb/core/formats/arrow/arrow_helpers.cpp
index f9951b9432..cc554adee8 100644
--- a/ydb/core/formats/arrow/arrow_helpers.cpp
+++ b/ydb/core/formats/arrow/arrow_helpers.cpp
@@ -139,24 +139,37 @@ std::shared_ptr<arrow::RecordBatch> MakeEmptyBatch(const std::shared_ptr<arrow::
return arrow::RecordBatch::Make(schema, 0, columns);
}
-std::shared_ptr<arrow::RecordBatch> ExtractColumns(const std::shared_ptr<arrow::RecordBatch>& srcBatch,
- const std::vector<TString>& columnNames) {
- std::vector<std::shared_ptr<arrow::Field>> fields;
- fields.reserve(columnNames.size());
- std::vector<std::shared_ptr<arrow::Array>> columns;
- columns.reserve(columnNames.size());
-
- auto srcSchema = srcBatch->schema();
- for (auto& name : columnNames) {
- int pos = srcSchema->GetFieldIndex(name);
- if (pos < 0) {
- return {};
+namespace {
+ template <class TStringType>
+ std::shared_ptr<arrow::RecordBatch> ExtractColumnsImpl(const std::shared_ptr<arrow::RecordBatch>& srcBatch,
+ const std::vector<TStringType>& columnNames) {
+ std::vector<std::shared_ptr<arrow::Field>> fields;
+ fields.reserve(columnNames.size());
+ std::vector<std::shared_ptr<arrow::Array>> columns;
+ columns.reserve(columnNames.size());
+
+ auto srcSchema = srcBatch->schema();
+ for (auto& name : columnNames) {
+ int pos = srcSchema->GetFieldIndex(name);
+ if (pos < 0) {
+ return {};
+ }
+ fields.push_back(srcSchema->field(pos));
+ columns.push_back(srcBatch->column(pos));
}
- fields.push_back(srcSchema->field(pos));
- columns.push_back(srcBatch->column(pos));
+
+ return arrow::RecordBatch::Make(std::make_shared<arrow::Schema>(std::move(fields)), srcBatch->num_rows(), std::move(columns));
}
+}
- return arrow::RecordBatch::Make(std::make_shared<arrow::Schema>(std::move(fields)), srcBatch->num_rows(), std::move(columns));
+std::shared_ptr<arrow::RecordBatch> ExtractColumns(const std::shared_ptr<arrow::RecordBatch>& srcBatch,
+ const std::vector<TString>& columnNames) {
+ return ExtractColumnsImpl(srcBatch, columnNames);
+}
+
+std::shared_ptr<arrow::RecordBatch> ExtractColumns(const std::shared_ptr<arrow::RecordBatch>& srcBatch,
+ const std::vector<std::string>& columnNames) {
+ return ExtractColumnsImpl(srcBatch, columnNames);
}
std::shared_ptr<arrow::RecordBatch> ExtractColumnsValidate(const std::shared_ptr<arrow::RecordBatch>& srcBatch,
diff --git a/ydb/core/formats/arrow/arrow_helpers.h b/ydb/core/formats/arrow/arrow_helpers.h
index 51b0246a63..fe4bd4c3c0 100644
--- a/ydb/core/formats/arrow/arrow_helpers.h
+++ b/ydb/core/formats/arrow/arrow_helpers.h
@@ -57,6 +57,8 @@ std::shared_ptr<arrow::RecordBatch> MakeEmptyBatch(const std::shared_ptr<arrow::
std::shared_ptr<arrow::RecordBatch> ExtractColumns(const std::shared_ptr<arrow::RecordBatch>& srcBatch,
const std::vector<TString>& columnNames);
+std::shared_ptr<arrow::RecordBatch> ExtractColumns(const std::shared_ptr<arrow::RecordBatch>& srcBatch,
+ const std::vector<std::string>& columnNames);
std::shared_ptr<arrow::RecordBatch> ExtractColumnsValidate(const std::shared_ptr<arrow::RecordBatch>& srcBatch,
const std::vector<TString>& columnNames);
std::shared_ptr<arrow::RecordBatch> ExtractColumns(const std::shared_ptr<arrow::RecordBatch>& srcBatch,
diff --git a/ydb/core/formats/arrow/permutations.cpp b/ydb/core/formats/arrow/permutations.cpp
index a6e75019f7..3abc3bb2ce 100644
--- a/ydb/core/formats/arrow/permutations.cpp
+++ b/ydb/core/formats/arrow/permutations.cpp
@@ -121,6 +121,7 @@ std::shared_ptr<arrow::UInt64Array> MakeFilterPermutation(const std::vector<ui64
}
std::shared_ptr<arrow::RecordBatch> CopyRecords(const std::shared_ptr<arrow::RecordBatch>& source, const std::vector<ui64>& indexes) {
+ Y_ABORT_UNLESS(!!source);
auto schema = source->schema();
std::vector<std::shared_ptr<arrow::Array>> columns;
for (auto&& i : source->columns()) {
diff --git a/ydb/core/tablet/resource_broker.cpp b/ydb/core/tablet/resource_broker.cpp
index 9204c6410b..5583ff7186 100644
--- a/ydb/core/tablet/resource_broker.cpp
+++ b/ydb/core/tablet/resource_broker.cpp
@@ -1327,6 +1327,12 @@ NKikimrResourceBroker::TResourceBrokerConfig MakeDefaultConfig()
queue->MutableLimit()->SetMemory(CSScanMemoryLimit);
queue = config.AddQueues();
+ queue->SetName("queue_cs_normalizer");
+ queue->SetWeight(100);
+ queue->MutableLimit()->SetCpu(3);
+ queue->MutableLimit()->SetMemory(CSScanMemoryLimit);
+
+ queue = config.AddQueues();
queue->SetName("queue_transaction");
queue->SetWeight(100);
queue->MutableLimit()->SetCpu(4);
@@ -1423,6 +1429,11 @@ NKikimrResourceBroker::TResourceBrokerConfig MakeDefaultConfig()
task->SetDefaultDuration(TDuration::Minutes(10).GetValue());
task = config.AddTasks();
+ task->SetName("CS::NORMALIZER");
+ task->SetQueueName("queue_cs_normalizer");
+ task->SetDefaultDuration(TDuration::Minutes(10).GetValue());
+
+ task = config.AddTasks();
task->SetName(NLocalDb::TransactionTaskName);
task->SetQueueName("queue_transaction");
task->SetDefaultDuration(TDuration::Minutes(10).GetValue());
diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp
index 78d722253c..b815b0926c 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.cpp
+++ b/ydb/core/tx/columnshard/columnshard_impl.cpp
@@ -27,7 +27,8 @@
#include <ydb/core/tx/columnshard/normalizer/granule/normalizer.h>
-#include <ydb/core/tx/columnshard/normalizer/portion/normalizer.h>
+#include <ydb/core/tx/columnshard/normalizer/portion/min_max.h>
+#include <ydb/core/tx/columnshard/normalizer/portion/chunks.h>
namespace NKikimr::NColumnShard {
@@ -183,6 +184,7 @@ TColumnShard::TColumnShard(TTabletStorageInfo* info, const TActorId& tablet)
NormalizerController.RegisterNormalizer(std::make_shared<NOlap::TGranulesNormalizer>());
NormalizerController.RegisterNormalizer(std::make_shared<NOlap::TChunksNormalizer>(Info()));
+ NormalizerController.RegisterNormalizer(std::make_shared<NOlap::TPortionsNormalizer>(Info()));
}
void TColumnShard::OnDetach(const TActorContext& ctx) {
diff --git a/ydb/core/tx/columnshard/columnshard_ut_common.cpp b/ydb/core/tx/columnshard/columnshard_ut_common.cpp
index c90860471b..a1d30787f9 100644
--- a/ydb/core/tx/columnshard/columnshard_ut_common.cpp
+++ b/ydb/core/tx/columnshard/columnshard_ut_common.cpp
@@ -429,7 +429,7 @@ namespace NKikimr::NColumnShard {
PlanSchemaTx(runtime, sender, snap);
}
- void PrepareTablet(TTestBasicRuntime& runtime, const ui64 tableId, const std::vector<std::pair<TString, NScheme::TTypeInfo>>& schema) {
+ void PrepareTablet(TTestBasicRuntime& runtime, const ui64 tableId, const std::vector<std::pair<TString, NScheme::TTypeInfo>>& schema, const ui32 keySize) {
using namespace NTxUT;
CreateTestBootstrapper(runtime, CreateTestTabletInfo(TTestTxConfig::TxTablet0, TTabletTypes::ColumnShard), &CreateColumnShard);
@@ -439,7 +439,11 @@ namespace NKikimr::NColumnShard {
TestTableDescription tableDescription;
tableDescription.Schema = schema;
- tableDescription.Pk = { schema[0] };
+ tableDescription.Pk = {};
+ for (ui64 i = 0; i < keySize; ++i) {
+ Y_ABORT_UNLESS(i < schema.size());
+ tableDescription.Pk.push_back(schema[i]);
+ }
TActorId sender = runtime.AllocateEdgeActor();
SetupSchema(runtime, sender, tableId, tableDescription);
}
diff --git a/ydb/core/tx/columnshard/columnshard_ut_common.h b/ydb/core/tx/columnshard/columnshard_ut_common.h
index 517ac70c1d..b69df6a03e 100644
--- a/ydb/core/tx/columnshard/columnshard_ut_common.h
+++ b/ydb/core/tx/columnshard/columnshard_ut_common.h
@@ -561,7 +561,7 @@ namespace NKikimr::NColumnShard {
void SetupSchema(TTestBasicRuntime& runtime, TActorId& sender, ui64 pathId,
const TestTableDescription& table = {}, TString codec = "none");
- void PrepareTablet(TTestBasicRuntime& runtime, const ui64 tableId, const std::vector<std::pair<TString, NScheme::TTypeInfo>>& schema);
+ void PrepareTablet(TTestBasicRuntime& runtime, const ui64 tableId, const std::vector<std::pair<TString, NScheme::TTypeInfo>>& schema, const ui32 keySize = 1);
std::shared_ptr<arrow::RecordBatch> ReadAllAsBatch(TTestBasicRuntime& runtime, const ui64 tableId, const NOlap::TSnapshot& snapshot, const std::vector<std::pair<TString, NScheme::TTypeInfo>>& schema);
}
diff --git a/ydb/core/tx/columnshard/engines/portions/meta.cpp b/ydb/core/tx/columnshard/engines/portions/meta.cpp
index 4c7576ab61..436b180a4f 100644
--- a/ydb/core/tx/columnshard/engines/portions/meta.cpp
+++ b/ydb/core/tx/columnshard/engines/portions/meta.cpp
@@ -43,6 +43,7 @@ bool TPortionMeta::DeserializeFromProto(const NKikimrTxColumnShard::TIndexPortio
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "DeserializeFromProto")("error", "incorrect portion meta")("meta", portionMeta.DebugString());
return false;
}
+ Y_ABORT_UNLESS(Produced != TPortionMeta::EProduced::UNSPECIFIED);
if (portionMeta.HasPrimaryKeyBorders()) {
ReplaceKeyEdges = std::make_shared<NArrow::TFirstLastSpecialKeys>(portionMeta.GetPrimaryKeyBorders(), indexInfo.GetReplaceKey());
diff --git a/ydb/core/tx/columnshard/engines/portions/meta.h b/ydb/core/tx/columnshard/engines/portions/meta.h
index 7e12b83b09..7acafc89a7 100644
--- a/ydb/core/tx/columnshard/engines/portions/meta.h
+++ b/ydb/core/tx/columnshard/engines/portions/meta.h
@@ -42,6 +42,14 @@ public:
out << info.DebugString();
return out;
}
+
+ bool HasSnapshotMinMax() const {
+ return !!RecordSnapshotMax && !!RecordSnapshotMin;
+ }
+
+ bool HasPrimaryKeyBorders() const {
+ return !!IndexKeyStart && !!IndexKeyEnd;
+ }
};
class TPortionAddress {
diff --git a/ydb/core/tx/columnshard/engines/portions/portion_info.cpp b/ydb/core/tx/columnshard/engines/portions/portion_info.cpp
index cb1cdb9070..778174d01b 100644
--- a/ydb/core/tx/columnshard/engines/portions/portion_info.cpp
+++ b/ydb/core/tx/columnshard/engines/portions/portion_info.cpp
@@ -34,7 +34,6 @@ void TPortionInfo::AddMetadata(const ISnapshotSchema& snapshotSchema, const std:
void TPortionInfo::AddMetadata(const ISnapshotSchema& snapshotSchema, const NArrow::TFirstLastSpecialKeys& primaryKeys, const NArrow::TMinMaxSpecialKeys& snapshotKeys, const TString& tierName) {
const auto& indexInfo = snapshotSchema.GetIndexInfo();
- Meta = {};
Meta.FirstPkColumn = indexInfo.GetPKFirstColumnId();
Meta.FillBatchInfo(primaryKeys, snapshotKeys, indexInfo);
Meta.SetTierName(tierName);
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.cpp
index 4d48a70563..25dde64378 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.cpp
@@ -50,7 +50,7 @@ protected:
Y_ABORT_UNLESS(ResultBatch->schema()->Equals(Context->GetResultSchema()));
}
if (MergingContext->GetIncludeFinish() && originalSourcesCount == 1) {
- Y_ABORT_UNLESS(merger->IsEmpty());
+ AFL_VERIFY(merger->IsEmpty())("merging_context_finish", MergingContext->GetFinish().DebugJson().GetStringRobust())("merger", merger->DebugString());
}
} else {
auto rbBuilder = std::make_shared<NIndexedReader::TRecordBatchBuilder>(Context->GetResultFields());
diff --git a/ydb/core/tx/columnshard/engines/reader/read_filter_merger.h b/ydb/core/tx/columnshard/engines/reader/read_filter_merger.h
index 4361e68dd3..69e3ccb20a 100644
--- a/ydb/core/tx/columnshard/engines/reader/read_filter_merger.h
+++ b/ydb/core/tx/columnshard/engines/reader/read_filter_merger.h
@@ -332,6 +332,10 @@ public:
return SortHeap.Size();
}
+ TString DebugString() const {
+ return TStringBuilder() << "sort_heap=" << SortHeap.DebugJson();
+ }
+
void PutControlPoint(std::shared_ptr<TSortableBatchPosition> point);
void RemoveControlPoint();
diff --git a/ydb/core/tx/columnshard/engines/scheme/abstract_scheme.cpp b/ydb/core/tx/columnshard/engines/scheme/abstract_scheme.cpp
index 519a467e7d..8b16dda038 100644
--- a/ydb/core/tx/columnshard/engines/scheme/abstract_scheme.cpp
+++ b/ydb/core/tx/columnshard/engines/scheme/abstract_scheme.cpp
@@ -15,6 +15,15 @@ std::shared_ptr<arrow::Field> ISnapshotSchema::GetFieldByColumnId(const ui32 col
return GetFieldByIndex(GetFieldIndex(columnId));
}
+std::set<ui32> ISnapshotSchema::GetPkColumnsIds() const {
+ std::set<ui32> result;
+ for (auto&& field : GetSchema()->fields()) {
+ result.emplace(GetColumnId(field->name()));
+ }
+ return result;
+
+}
+
std::shared_ptr<arrow::RecordBatch> ISnapshotSchema::NormalizeBatch(const ISnapshotSchema& dataSchema, const std::shared_ptr<arrow::RecordBatch> batch) const {
if (dataSchema.GetSnapshot() == GetSnapshot()) {
return batch;
diff --git a/ydb/core/tx/columnshard/engines/scheme/abstract_scheme.h b/ydb/core/tx/columnshard/engines/scheme/abstract_scheme.h
index 1c954ecb7e..ab1c4eabf5 100644
--- a/ydb/core/tx/columnshard/engines/scheme/abstract_scheme.h
+++ b/ydb/core/tx/columnshard/engines/scheme/abstract_scheme.h
@@ -53,6 +53,8 @@ public:
virtual ui64 GetVersion() const = 0;
virtual ui32 GetColumnsCount() const = 0;
+ std::set<ui32> GetPkColumnsIds() const;
+
std::shared_ptr<arrow::RecordBatch> NormalizeBatch(const ISnapshotSchema& dataSchema, const std::shared_ptr<arrow::RecordBatch> batch) const;
std::shared_ptr<arrow::RecordBatch> PrepareForInsert(const TString& data, const std::shared_ptr<arrow::Schema>& dataSchema) const;
};
diff --git a/ydb/core/tx/columnshard/normalizer/portion/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/normalizer/portion/CMakeLists.darwin-x86_64.txt
index da21c485e5..ec45390bce 100644
--- a/ydb/core/tx/columnshard/normalizer/portion/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/tx/columnshard/normalizer/portion/CMakeLists.darwin-x86_64.txt
@@ -18,4 +18,6 @@ target_link_libraries(columnshard-normalizer-portion PUBLIC
)
target_sources(columnshard-normalizer-portion PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/normalizer/portion/normalizer.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/normalizer/portion/min_max.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/normalizer/portion/chunks.cpp
)
diff --git a/ydb/core/tx/columnshard/normalizer/portion/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/normalizer/portion/CMakeLists.linux-aarch64.txt
index 10efe432e2..63547312b6 100644
--- a/ydb/core/tx/columnshard/normalizer/portion/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/tx/columnshard/normalizer/portion/CMakeLists.linux-aarch64.txt
@@ -19,4 +19,6 @@ target_link_libraries(columnshard-normalizer-portion PUBLIC
)
target_sources(columnshard-normalizer-portion PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/normalizer/portion/normalizer.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/normalizer/portion/min_max.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/normalizer/portion/chunks.cpp
)
diff --git a/ydb/core/tx/columnshard/normalizer/portion/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/normalizer/portion/CMakeLists.linux-x86_64.txt
index 10efe432e2..63547312b6 100644
--- a/ydb/core/tx/columnshard/normalizer/portion/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/tx/columnshard/normalizer/portion/CMakeLists.linux-x86_64.txt
@@ -19,4 +19,6 @@ target_link_libraries(columnshard-normalizer-portion PUBLIC
)
target_sources(columnshard-normalizer-portion PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/normalizer/portion/normalizer.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/normalizer/portion/min_max.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/normalizer/portion/chunks.cpp
)
diff --git a/ydb/core/tx/columnshard/normalizer/portion/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/normalizer/portion/CMakeLists.windows-x86_64.txt
index da21c485e5..ec45390bce 100644
--- a/ydb/core/tx/columnshard/normalizer/portion/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/tx/columnshard/normalizer/portion/CMakeLists.windows-x86_64.txt
@@ -18,4 +18,6 @@ target_link_libraries(columnshard-normalizer-portion PUBLIC
)
target_sources(columnshard-normalizer-portion PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/normalizer/portion/normalizer.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/normalizer/portion/min_max.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/normalizer/portion/chunks.cpp
)
diff --git a/ydb/core/tx/columnshard/normalizer/portion/chunks.cpp b/ydb/core/tx/columnshard/normalizer/portion/chunks.cpp
new file mode 100644
index 0000000000..0e053489bf
--- /dev/null
+++ b/ydb/core/tx/columnshard/normalizer/portion/chunks.cpp
@@ -0,0 +1,159 @@
+#include "chunks.h"
+#include "normalizer.h"
+
+#include <ydb/core/tx/columnshard/engines/portions/portion_info.h>
+#include <ydb/core/tx/columnshard/tables_manager.h>
+#include <ydb/core/formats/arrow/size_calcer.h>
+
+
+namespace NKikimr::NOlap {
+
+class TChunksNormalizer::TNormalizerResult : public INormalizerChanges {
+ std::vector<TChunksNormalizer::TChunkInfo> Chunks;
+public:
+ TNormalizerResult(std::vector<TChunksNormalizer::TChunkInfo>&& chunks)
+ : Chunks(std::move(chunks))
+ {}
+
+ bool Apply(NTabletFlatExecutor::TTransactionContext& txc, const TNormalizationController& /* normController */) const override {
+ using namespace NColumnShard;
+ NIceDb::TNiceDb db(txc.DB);
+
+ for (auto&& chunkInfo : Chunks) {
+
+ NKikimrTxColumnShard::TIndexColumnMeta metaProto = chunkInfo.GetMetaProto();
+ metaProto.SetNumRows(chunkInfo.GetUpdate().GetNumRows());
+ metaProto.SetRawBytes(chunkInfo.GetUpdate().GetRawBytes());
+
+ const auto& key = chunkInfo.GetKey();
+
+ db.Table<Schema::IndexColumns>().Key(key.GetIndex(), key.GetGranule(), key.GetColumnIdx(),
+ key.GetPlanStep(), key.GetTxId(), key.GetPortion(), key.GetChunk()).Update(
+ NIceDb::TUpdate<Schema::IndexColumns::Metadata>(metaProto.SerializeAsString())
+ );
+ }
+ return true;
+ }
+};
+
+class TRowsAndBytesChangesTask: public NConveyor::ITask {
+public:
+ using TDataContainer = std::vector<TChunksNormalizer::TChunkInfo>;
+private:
+ THashMap<NKikimr::NOlap::TBlobRange, TString> Blobs;
+ std::vector<TChunksNormalizer::TChunkInfo> Chunks;
+ TNormalizationContext NormContext;
+protected:
+ virtual bool DoExecute() override {
+ for (auto&& chunkInfo : Chunks) {
+ const auto& blobRange = chunkInfo.GetBlobRange();
+
+ auto blobIt = Blobs.find(blobRange);
+ Y_ABORT_UNLESS(blobIt != Blobs.end());
+
+ auto columnLoader = chunkInfo.GetLoader();
+ Y_ABORT_UNLESS(!!columnLoader);
+
+ TPortionInfo::TAssembleBlobInfo assembleBlob(blobIt->second);
+ auto batch = assembleBlob.BuildRecordBatch(*columnLoader);
+ Y_ABORT_UNLESS(!!batch);
+
+ chunkInfo.MutableUpdate().SetNumRows(batch->num_rows());
+ chunkInfo.MutableUpdate().SetRawBytes(NArrow::GetBatchDataSize(batch));
+ }
+
+ auto changes = std::make_shared<TChunksNormalizer::TNormalizerResult>(std::move(Chunks));
+ TActorContext::AsActorContext().Send(NormContext.GetColumnshardActor(), std::make_unique<NColumnShard::TEvPrivate::TEvNormalizerResult>(changes));
+ return true;
+ }
+
+public:
+ TRowsAndBytesChangesTask(THashMap<NKikimr::NOlap::TBlobRange, TString>&& blobs, const TNormalizationContext& nCtx, std::vector<TChunksNormalizer::TChunkInfo>&& chunks, THashMap<ui64, ISnapshotSchema::TPtr>&&)
+ : Blobs(std::move(blobs))
+ , Chunks(std::move(chunks))
+ , NormContext(nCtx)
+ {}
+
+ virtual TString GetTaskClassIdentifier() const override {
+ const static TString name = "TRowsAndBytesChangesTask";
+ return name;
+ }
+
+ static void FillBlobRanges(std::shared_ptr<IBlobsReadingAction> readAction, const TChunksNormalizer::TChunkInfo& chunk) {
+ readAction->AddRange(chunk.GetBlobRange());
+ }
+
+ static ui64 GetMemSize(const TChunksNormalizer::TChunkInfo&) {
+ return 10 * 1024 * 1024;
+ }
+};
+
+void TChunksNormalizer::TChunkInfo::InitSchema(const NColumnShard::TTablesManager& tm) {
+ Schema = tm.GetPrimaryIndexSafe().GetVersionedIndex().GetSchema(NOlap::TSnapshot(Key.GetPlanStep(), Key.GetTxId()));
+}
+
+TConclusion<std::vector<INormalizerTask::TPtr>> TChunksNormalizer::Init(const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) {
+ using namespace NColumnShard;
+ NIceDb::TNiceDb db(txc.DB);
+
+ bool ready = true;
+ ready = ready & Schema::Precharge<Schema::IndexColumns>(db, txc.DB.GetScheme());
+ if (!ready) {
+ return TConclusionStatus::Fail("Not ready");
+ }
+
+ std::vector<TChunkInfo> chunks;
+ {
+ auto rowset = db.Table<Schema::IndexColumns>().Select();
+ if (!rowset.IsReady()) {
+ return TConclusionStatus::Fail("Not ready");
+ }
+
+ while (!rowset.EndOfSet()) {
+ TKey key;
+ key.Load(rowset);
+
+ TChunkInfo chunkInfo(std::move(key), rowset, &DsGroupSelector);
+ if (chunkInfo.NormalizationRequired()) {
+ chunks.emplace_back(std::move(chunkInfo));
+ }
+
+ if (!rowset.Next()) {
+ return TConclusionStatus::Fail("Not ready");
+ }
+ }
+ }
+
+ std::vector<INormalizerTask::TPtr> tasks;
+ ACFL_INFO("normalizer", "TChunksNormalizer")("message", TStringBuilder() << chunks.size() << " chunks found");
+ if (chunks.empty()) {
+ return tasks;
+ }
+
+ TTablesManager tablesManager(controller.GetStoragesManager(), 0);
+ if (!tablesManager.InitFromDB(db)) {
+ ACFL_ERROR("normalizer", "TChunksNormalizer")("error", "can't initialize tables manager");
+ return TConclusionStatus::Fail("Can't load index");
+ }
+
+ std::vector<TChunkInfo> package;
+ package.reserve(100);
+
+ for (auto&& chunk : chunks) {
+ chunk.InitSchema(tablesManager);
+ package.emplace_back(chunk);
+ if (package.size() == 100) {
+ std::vector<TChunkInfo> local;
+ local.swap(package);
+ tasks.emplace_back(std::make_shared<TPortionsNormalizerTask<TRowsAndBytesChangesTask>>(std::move(local)));
+ }
+ }
+
+ if (package.size() > 0) {
+ tasks.emplace_back(std::make_shared<TPortionsNormalizerTask<TRowsAndBytesChangesTask>>(std::move(package)));
+ }
+ AtomicSet(ActiveTasksCount, tasks.size());
+ return tasks;
+}
+
+}
diff --git a/ydb/core/tx/columnshard/normalizer/portion/chunks.h b/ydb/core/tx/columnshard/normalizer/portion/chunks.h
new file mode 100644
index 0000000000..eb558674d9
--- /dev/null
+++ b/ydb/core/tx/columnshard/normalizer/portion/chunks.h
@@ -0,0 +1,110 @@
+#pragma once
+
+#include <ydb/core/tx/columnshard/normalizer/abstract/abstract.h>
+#include <ydb/core/tx/columnshard/columnshard_schema.h>
+
+#include <ydb/core/tx/columnshard/defs.h>
+
+
+namespace NKikimr::NColumnShard {
+ class TTablesManager;
+}
+
+namespace NKikimr::NOlap {
+
+ class TChunksNormalizer : public INormalizerComponent {
+ public:
+ class TNormalizerResult;
+
+ class TKey {
+ YDB_READONLY(ui64, Index, 0);
+ YDB_READONLY(ui64, Granule, 0);
+ YDB_READONLY(ui64, ColumnIdx, 0);
+ YDB_READONLY(ui64, PlanStep, 0);
+ YDB_READONLY(ui64, TxId, 0);
+ YDB_READONLY(ui64, Portion, 0);
+ YDB_READONLY(ui64, Chunk, 0);
+
+ public:
+ template <class TRowset>
+ void Load(TRowset& rowset) {
+ using namespace NColumnShard;
+ Index = rowset.template GetValue<Schema::IndexColumns::Index>();
+ Granule = rowset.template GetValue<Schema::IndexColumns::Granule>();
+ ColumnIdx = rowset.template GetValue<Schema::IndexColumns::ColumnIdx>();
+ PlanStep = rowset.template GetValue<Schema::IndexColumns::PlanStep>();
+ TxId = rowset.template GetValue<Schema::IndexColumns::TxId>();
+ Portion = rowset.template GetValue<Schema::IndexColumns::Portion>();
+ Chunk = rowset.template GetValue<Schema::IndexColumns::Chunk>();
+ }
+
+ bool operator<(const TKey& other) const {
+ return std::make_tuple(Portion, Chunk, ColumnIdx) < std::make_tuple(other.Portion, other.Chunk, other.ColumnIdx);
+ }
+ };
+
+ class TUpdate {
+ YDB_ACCESSOR(ui64, NumRows, 0);
+ YDB_ACCESSOR(ui64, RawBytes, 0);
+ };
+
+ class TChunkInfo {
+ YDB_READONLY_DEF(TKey, Key);
+ TColumnChunkLoadContext CLContext;
+ ISnapshotSchema::TPtr Schema;
+
+ YDB_ACCESSOR_DEF(TUpdate, Update);
+ public:
+ template <class TSource>
+ TChunkInfo(TKey&& key, const TSource& rowset, const IBlobGroupSelector* dsGroupSelector)
+ : Key(std::move(key))
+ , CLContext(rowset, dsGroupSelector)
+ {}
+
+ const TBlobRange& GetBlobRange() const {
+ return CLContext.GetBlobRange();
+ }
+
+ const NKikimrTxColumnShard::TIndexColumnMeta& GetMetaProto() const {
+ return CLContext.GetMetaProto();
+ }
+
+ bool NormalizationRequired() const {
+ return !CLContext.GetMetaProto().HasNumRows() || !CLContext.GetMetaProto().HasRawBytes();
+ }
+
+ std::shared_ptr<TColumnLoader> GetLoader() const {
+ return Schema->GetColumnLoader(Key.GetColumnIdx());
+ }
+ void InitSchema(const NColumnShard::TTablesManager& tm);
+
+ bool operator<(const TChunkInfo& other) const {
+ return Key < other.Key;
+ }
+ };
+
+ public:
+ TChunksNormalizer(TTabletStorageInfo* info)
+ : DsGroupSelector(info)
+ {}
+
+ virtual const TString& GetName() const override {
+ const static TString name = "TChunksNormalizer";
+ return name;
+ }
+
+ virtual bool WaitResult() const override {
+ return AtomicGet(ActiveTasksCount) > 0;
+ }
+
+ void OnResultReady() override {
+ AtomicDecrement(ActiveTasksCount);
+ }
+
+ virtual TConclusion<std::vector<INormalizerTask::TPtr>> Init(const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) override;
+
+ private:
+ NColumnShard::TBlobGroupSelector DsGroupSelector;
+ TAtomic ActiveTasksCount = 0;
+ };
+}
diff --git a/ydb/core/tx/columnshard/normalizer/portion/min_max.cpp b/ydb/core/tx/columnshard/normalizer/portion/min_max.cpp
new file mode 100644
index 0000000000..b367c77a57
--- /dev/null
+++ b/ydb/core/tx/columnshard/normalizer/portion/min_max.cpp
@@ -0,0 +1,222 @@
+#include "min_max.h"
+#include "normalizer.h"
+
+#include <ydb/core/tx/columnshard/engines/portions/portion_info.h>
+#include <ydb/core/tx/columnshard/tables_manager.h>
+#include <ydb/core/tx/columnshard/engines/scheme/filtered_scheme.h>
+
+#include <ydb/core/formats/arrow/arrow_helpers.h>
+
+
+namespace NKikimr::NOlap {
+
+class TMinMaxSnapshotChangesTask: public NConveyor::ITask {
+public:
+ using TDataContainer = std::vector<std::shared_ptr<TPortionInfo>>;
+private:
+ THashMap<NKikimr::NOlap::TBlobRange, TString> Blobs;
+ TDataContainer Portions;
+ THashMap<ui64, ISnapshotSchema::TPtr> Schemas;
+ TNormalizationContext NormContext;
+protected:
+ virtual bool DoExecute() override {
+ Y_ABORT_UNLESS(!Schemas.empty());
+ auto pkColumnIds = Schemas.begin()->second->GetPkColumnsIds();
+ pkColumnIds.insert(TIndexInfo::GetSpecialColumnIds().begin(), TIndexInfo::GetSpecialColumnIds().end());
+
+ for (auto&& portionInfo : Portions) {
+ auto blobSchema = Schemas.FindPtr(portionInfo->GetPortionId());
+ THashMap<TBlobRange, TPortionInfo::TAssembleBlobInfo> blobsDataAssemble;
+ for (auto&& i : portionInfo->Records) {
+ auto blobIt = Blobs.find(i.BlobRange);
+ Y_ABORT_UNLESS(blobIt != Blobs.end());
+ blobsDataAssemble.emplace(i.BlobRange, blobIt->second);
+ }
+
+ AFL_VERIFY(!!blobSchema)("details", portionInfo->DebugString());
+ auto filteredSchema = std::make_shared<TFilteredSnapshotSchema>(*blobSchema, pkColumnIds);
+ auto preparedBatch = portionInfo->PrepareForAssemble(**blobSchema, *filteredSchema, blobsDataAssemble);
+ auto batch = preparedBatch.Assemble();
+ Y_ABORT_UNLESS(!!batch);
+ portionInfo->AddMetadata(**blobSchema, batch, portionInfo->GetMeta().GetTierName());
+ }
+
+ auto changes = std::make_shared<TPortionsNormalizer::TNormalizerResult>(std::move(Portions));
+ TActorContext::AsActorContext().Send(NormContext.GetColumnshardActor(), std::make_unique<NColumnShard::TEvPrivate::TEvNormalizerResult>(changes));
+ return true;
+ }
+
+public:
+ TMinMaxSnapshotChangesTask(THashMap<NKikimr::NOlap::TBlobRange, TString>&& blobs, const TNormalizationContext& nCtx, TDataContainer&& portions, THashMap<ui64, ISnapshotSchema::TPtr>&& schemas)
+ : Blobs(std::move(blobs))
+ , Portions(std::move(portions))
+ , Schemas(std::move(schemas))
+ , NormContext(nCtx)
+ {}
+
+ virtual TString GetTaskClassIdentifier() const override {
+ const static TString name = "TMinMaxSnapshotChangesTask";
+ return name;
+ }
+
+ static void FillBlobRanges(std::shared_ptr<IBlobsReadingAction> readAction, const std::shared_ptr<TPortionInfo>& portion) {
+ for (auto&& chunk : portion->Records) {
+ readAction->AddRange(chunk.BlobRange);
+ }
+ }
+
+ static ui64 GetMemSize(const std::shared_ptr<TPortionInfo>& portion) {
+ return portion->GetRawBytes();
+ }
+
+ static bool CheckPortion(const TPortionInfo& portionInfo) {
+ if (!portionInfo.GetMeta().HasPrimaryKeyBorders() || !portionInfo.GetMeta().HasSnapshotMinMax()) {
+ return false;
+ }
+ return true;
+ }
+
+ static std::set<ui32> GetColumnsFilter(const ISnapshotSchema::TPtr& schema) {
+ return schema->GetPkColumnsIds();
+ }
+};
+
+
+class TPortionsNormalizer::TNormalizerResult : public INormalizerChanges {
+ TMinMaxSnapshotChangesTask::TDataContainer Portions;
+public:
+ TNormalizerResult(TMinMaxSnapshotChangesTask::TDataContainer&& portions)
+ : Portions(std::move(portions))
+ {}
+
+ bool Apply(NTabletFlatExecutor::TTransactionContext& txc, const TNormalizationController& /* normController */) const override {
+ using namespace NColumnShard;
+ NIceDb::TNiceDb db(txc.DB);
+
+ for (auto&& portionInfo : Portions) {
+ for (auto&& chunk : portionInfo->Records) {
+ auto proto = portionInfo->GetMeta().SerializeToProto(chunk.ColumnId, chunk.Chunk);
+ if (!proto) {
+ continue;
+ }
+ auto rowProto = chunk.GetMeta().SerializeToProto();
+ *rowProto.MutablePortionMeta() = std::move(*proto);
+
+ db.Table<Schema::IndexColumns>().Key(0, portionInfo->GetDeprecatedGranuleId(), chunk.ColumnId,
+ portionInfo->GetMinSnapshot().GetPlanStep(), portionInfo->GetMinSnapshot().GetTxId(), portionInfo->GetPortion(), chunk.Chunk).Update(
+ NIceDb::TUpdate<Schema::IndexColumns::Metadata>(rowProto.SerializeAsString())
+ );
+ }
+ }
+ return true;
+ }
+};
+
+TConclusion<std::vector<INormalizerTask::TPtr>> TPortionsNormalizer::Init(const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) {
+ using namespace NColumnShard;
+ std::vector<INormalizerTask::TPtr> tasks;
+
+ NIceDb::TNiceDb db(txc.DB);
+
+ bool ready = true;
+ ready = ready & Schema::Precharge<Schema::IndexColumns>(db, txc.DB.GetScheme());
+ if (!ready) {
+ return TConclusionStatus::Fail("Not ready");
+ }
+
+ TTablesManager tablesManager(controller.GetStoragesManager(), 0);
+ if (!tablesManager.InitFromDB(db)) {
+ ACFL_ERROR("normalizer", "TPortionsNormalizer")("error", "can't initialize tables manager");
+ return TConclusionStatus::Fail("Can't load index");
+ }
+
+ if (!tablesManager.HasPrimaryIndex()) {
+ return tasks;
+ }
+
+ THashMap<ui64, std::shared_ptr<TPortionInfo>> portions;
+ THashMap<ui64, ISnapshotSchema::TPtr> schemas;
+ auto pkColumnIds = TMinMaxSnapshotChangesTask::GetColumnsFilter(tablesManager.GetPrimaryIndexSafe().GetVersionedIndex().GetLastSchema());
+
+ {
+ auto rowset = db.Table<Schema::IndexColumns>().Select();
+ if (!rowset.IsReady()) {
+ return TConclusionStatus::Fail("Not ready");
+ }
+
+ TSnapshot lastSnapshot(0, 0);
+ ISnapshotSchema::TPtr currentSchema;
+ auto initPortionCB = [&](const TPortionInfo& portion, const TColumnChunkLoadContext& loadContext) {
+ if (!currentSchema || lastSnapshot != portion.GetMinSnapshot()) {
+ currentSchema = tablesManager.GetPrimaryIndexSafe().GetVersionedIndex().GetSchema(portion.GetMinSnapshot());
+ lastSnapshot = portion.GetMinSnapshot();
+ }
+
+ AFL_VERIFY(portion.ValidSnapshotInfo())("details", portion.DebugString());
+ TColumnRecord rec(loadContext, currentSchema->GetIndexInfo());
+ if (!pkColumnIds.contains(rec.ColumnId)) {
+ return;
+ }
+ auto it = portions.find(portion.GetPortion());
+ auto portionMeta = loadContext.GetPortionMeta();
+ if (it == portions.end()) {
+ Y_ABORT_UNLESS(portion.Records.empty());
+ schemas[portion.GetPortionId()] = currentSchema;
+ auto portionNew = std::make_shared<TPortionInfo>(portion);
+ portionNew->AddRecord(currentSchema->GetIndexInfo(), rec, portionMeta);
+ it = portions.emplace(portion.GetPortion(), portionNew).first;
+ } else {
+ AFL_VERIFY(it->second->IsEqualWithSnapshots(portion))("self", it->second->DebugString())("item", portion.DebugString());
+ it->second->AddRecord(currentSchema->GetIndexInfo(), rec, portionMeta);
+ }
+
+ };
+
+ while (!rowset.EndOfSet()) {
+ TPortionInfo portion = TPortionInfo::BuildEmpty();
+ auto index = rowset.GetValue<Schema::IndexColumns::Index>();
+ Y_ABORT_UNLESS(index == 0);
+
+ portion.SetPathId(rowset.GetValue<Schema::IndexColumns::PathId>());
+
+ portion.SetMinSnapshot(rowset.GetValue<Schema::IndexColumns::PlanStep>(), rowset.GetValue<Schema::IndexColumns::TxId>());
+ portion.SetPortion(rowset.GetValue<Schema::IndexColumns::Portion>());
+ portion.SetDeprecatedGranuleId(rowset.GetValue<Schema::IndexColumns::Granule>());
+ portion.SetRemoveSnapshot(rowset.GetValue<Schema::IndexColumns::XPlanStep>(), rowset.GetValue<Schema::IndexColumns::XTxId>());
+
+ NOlap::TColumnChunkLoadContext chunkLoadContext(rowset, &DsGroupSelector);
+ initPortionCB(portion, chunkLoadContext);
+
+ if (!rowset.Next()) {
+ return TConclusionStatus::Fail("Not ready");
+ }
+ }
+ }
+
+ std::vector<std::shared_ptr<TPortionInfo>> package;
+ package.reserve(100);
+
+ ui64 brokenPortionCount = 0;
+ for (auto&& portion : portions) {
+ if (TMinMaxSnapshotChangesTask::CheckPortion(*portion.second)) {
+ continue;
+ }
+ ++brokenPortionCount;
+ package.emplace_back(portion.second);
+ if (package.size() == 100) {
+ std::vector<std::shared_ptr<TPortionInfo>> local;
+ local.swap(package);
+ tasks.emplace_back(std::make_shared<TPortionsNormalizerTask<TMinMaxSnapshotChangesTask>>(std::move(local), schemas));
+ }
+ }
+
+ if (package.size() > 0) {
+ tasks.emplace_back(std::make_shared<TPortionsNormalizerTask<TMinMaxSnapshotChangesTask>>(std::move(package), schemas));
+ }
+
+ AtomicSet(ActiveTasksCount, tasks.size());
+ ACFL_INFO("normalizer", "TPortionsNormalizer")("message", TStringBuilder() << brokenPortionCount << " portions found");
+ return tasks;
+}
+
+}
diff --git a/ydb/core/tx/columnshard/normalizer/portion/min_max.h b/ydb/core/tx/columnshard/normalizer/portion/min_max.h
new file mode 100644
index 0000000000..267a379e2b
--- /dev/null
+++ b/ydb/core/tx/columnshard/normalizer/portion/min_max.h
@@ -0,0 +1,44 @@
+#pragma once
+
+#include <ydb/core/tx/columnshard/normalizer/abstract/abstract.h>
+#include <ydb/core/tx/columnshard/columnshard_schema.h>
+
+#include <ydb/core/tx/columnshard/defs.h>
+
+
+namespace NKikimr::NColumnShard {
+ class TTablesManager;
+}
+
+namespace NKikimr::NOlap {
+
+class TPortionsNormalizer : public INormalizerComponent {
+public:
+ class TNormalizerResult;
+
+public:
+ TPortionsNormalizer(TTabletStorageInfo* info)
+ : DsGroupSelector(info)
+ {}
+
+ virtual const TString& GetName() const override {
+ const static TString name = "TPortionsNormalizer";
+ return name;
+ }
+
+ virtual bool WaitResult() const override {
+ return AtomicGet(ActiveTasksCount) > 0;
+ }
+
+ void OnResultReady() override {
+ AtomicDecrement(ActiveTasksCount);
+ }
+
+ virtual TConclusion<std::vector<INormalizerTask::TPtr>> Init(const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) override;
+
+private:
+ NColumnShard::TBlobGroupSelector DsGroupSelector;
+ TAtomic ActiveTasksCount = 0;
+};
+
+}
diff --git a/ydb/core/tx/columnshard/normalizer/portion/normalizer.cpp b/ydb/core/tx/columnshard/normalizer/portion/normalizer.cpp
index 8a76e4442c..0d02541c9a 100644
--- a/ydb/core/tx/columnshard/normalizer/portion/normalizer.cpp
+++ b/ydb/core/tx/columnshard/normalizer/portion/normalizer.cpp
@@ -1,206 +1,4 @@
#include "normalizer.h"
-
-#include <ydb/core/tx/conveyor/usage/abstract.h>
-#include <ydb/core/tx/conveyor/usage/service.h>
-
-#include <ydb/core/tx/columnshard/blobs_reader/task.h>
-#include <ydb/core/tx/columnshard/engines/portions/portion_info.h>
-#include <ydb/core/tx/columnshard/tables_manager.h>
-#include <ydb/core/tx/columnshard/columnshard_private_events.h>
-
-#include <ydb/core/formats/arrow/size_calcer.h>
-
-
namespace NKikimr::NOlap {
-
-class TChunksNormalizer::TNormalizerResult : public INormalizerChanges {
- std::vector<TChunksNormalizer::TChunkInfo> Chunks;
-public:
- TNormalizerResult(std::vector<TChunksNormalizer::TChunkInfo>&& chunks)
- : Chunks(std::move(chunks))
- {}
-
- bool Apply(NTabletFlatExecutor::TTransactionContext& txc, const TNormalizationController& /* normController */) const override {
- using namespace NColumnShard;
- NIceDb::TNiceDb db(txc.DB);
-
- for (auto&& chunkInfo : Chunks) {
-
- NKikimrTxColumnShard::TIndexColumnMeta metaProto = chunkInfo.GetMetaProto();
- metaProto.SetNumRows(chunkInfo.GetUpdate().GetNumRows());
- metaProto.SetRawBytes(chunkInfo.GetUpdate().GetRawBytes());
-
- const auto& key = chunkInfo.GetKey();
-
- db.Table<Schema::IndexColumns>().Key(key.GetIndex(), key.GetGranule(), key.GetColumnIdx(),
- key.GetPlanStep(), key.GetTxId(), key.GetPortion(), key.GetChunk()).Update(
- NIceDb::TUpdate<Schema::IndexColumns::Metadata>(metaProto.SerializeAsString())
- );
- }
- return true;
- }
-};
-
-class TNormalizationChangesTask: public NConveyor::ITask {
-private:
- THashMap<NKikimr::NOlap::TBlobRange, TString> Blobs;
- std::vector<TChunksNormalizer::TChunkInfo> Chunks;
- TNormalizationContext NormContext;
-protected:
- virtual bool DoExecute() override {
- for (auto&& chunkInfo : Chunks) {
- const auto& blobRange = chunkInfo.GetBlobRange();
-
- auto blobIt = Blobs.find(blobRange);
- Y_ABORT_UNLESS(blobIt != Blobs.end());
-
- auto columnLoader = chunkInfo.GetLoader();
- Y_ABORT_UNLESS(!!columnLoader);
-
- TPortionInfo::TAssembleBlobInfo assembleBlob(blobIt->second);
- auto batch = assembleBlob.BuildRecordBatch(*columnLoader);
- Y_ABORT_UNLESS(!!batch);
-
- chunkInfo.MutableUpdate().SetNumRows(batch->num_rows());
- chunkInfo.MutableUpdate().SetRawBytes(NArrow::GetBatchDataSize(batch));
- }
-
- auto changes = std::make_shared<TChunksNormalizer::TNormalizerResult>(std::move(Chunks));
- TActorContext::AsActorContext().Send(NormContext.GetColumnshardActor(), std::make_unique<NColumnShard::TEvPrivate::TEvNormalizerResult>(changes));
- return true;
- }
-
-public:
- TNormalizationChangesTask(THashMap<NKikimr::NOlap::TBlobRange, TString>&& blobs, const TNormalizationContext& nCtx, std::vector<TChunksNormalizer::TChunkInfo>&& chunks)
- : Blobs(std::move(blobs))
- , Chunks(std::move(chunks))
- , NormContext(nCtx)
- {}
-
- virtual TString GetTaskClassIdentifier() const override {
- const static TString name = "TNormalizationChangesTask";
- return name;
- }
-};
-
-class TReadPortionsTask: public NOlap::NBlobOperations::NRead::ITask {
- private:
- using TBase = NOlap::NBlobOperations::NRead::ITask;
- std::vector<TChunksNormalizer::TChunkInfo> Chunks;
- TNormalizationContext NormContext;
-
- public:
- TReadPortionsTask(const TNormalizationContext& nCtx, const std::vector<std::shared_ptr<IBlobsReadingAction>>& actions, std::vector<TChunksNormalizer::TChunkInfo>&& chunks)
- : TBase(actions, "CS::NORMALIZER")
- , Chunks(std::move(chunks))
- , NormContext(nCtx)
- {
- }
-
- protected:
- virtual void DoOnDataReady(const std::shared_ptr<NOlap::NResourceBroker::NSubscribe::TResourcesGuard>& resourcesGuard) override {
- Y_UNUSED(resourcesGuard);
- std::shared_ptr<NConveyor::ITask> task = std::make_shared<TNormalizationChangesTask>(std::move(ExtractBlobsData()), NormContext, std::move(Chunks));
- NConveyor::TCompServiceOperator::SendTaskToExecute(task);
- }
-
- virtual bool DoOnError(const TBlobRange& range, const IBlobsReadingAction::TErrorStatus& status) override {
- Y_UNUSED(status, range);
- return false;
- }
-
- public:
- using TBase::TBase;
-};
-
-class TChunksNormalizerTask : public INormalizerTask {
- std::vector<TChunksNormalizer::TChunkInfo> Package;
-public:
- TChunksNormalizerTask(std::vector<TChunksNormalizer::TChunkInfo>&& package)
- : Package(std::move(package))
- {}
-
- void Start(const TNormalizationController& controller, const TNormalizationContext& nCtx) override {
- controller.GetCounters().CountObjects(Package.size());
- auto readingAction = controller.GetStoragesManager()->GetInsertOperator()->StartReadingAction("CS::NORMALIZER");
- for (auto&& chunk : Package) {
- readingAction->AddRange(chunk.GetBlobRange());
- }
- std::vector<std::shared_ptr<IBlobsReadingAction>> actions = {readingAction};
- ui64 memSize = 0;
- NOlap::NResourceBroker::NSubscribe::ITask::StartResourceSubscription(
- nCtx.GetResourceSubscribeActor(),std::make_shared<NOlap::NBlobOperations::NRead::ITask::TReadSubscriber>(
- std::make_shared<TReadPortionsTask>( nCtx, actions, std::move(Package) ), 1, memSize, "CS::NORMALIZER", controller.GetTaskSubscription()));
- }
-};
-
-void TChunksNormalizer::TChunkInfo::InitSchema(const NColumnShard::TTablesManager& tm) {
- Schema = tm.GetPrimaryIndexSafe().GetVersionedIndex().GetSchema(NOlap::TSnapshot(Key.GetPlanStep(), Key.GetTxId()));
-}
-
-TConclusion<std::vector<INormalizerTask::TPtr>> TChunksNormalizer::Init(const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) {
- using namespace NColumnShard;
- NIceDb::TNiceDb db(txc.DB);
-
- bool ready = true;
- ready = ready & Schema::Precharge<Schema::IndexColumns>(db, txc.DB.GetScheme());
- if (!ready) {
- return TConclusionStatus::Fail("Not ready");
- }
-
- std::vector<TChunkInfo> chunks;
- {
- auto rowset = db.Table<Schema::IndexColumns>().Select();
- if (!rowset.IsReady()) {
- return TConclusionStatus::Fail("Not ready");
- }
-
- while (!rowset.EndOfSet()) {
- TKey key;
- key.Load(rowset);
-
- TChunkInfo chunkInfo(std::move(key), rowset, &DsGroupSelector);
- if (chunkInfo.NormalizationRequired()) {
- chunks.emplace_back(std::move(chunkInfo));
- }
-
- if (!rowset.Next()) {
- return TConclusionStatus::Fail("Not ready");
- }
- }
- }
-
- std::vector<INormalizerTask::TPtr> tasks;
- ACFL_INFO("normalizer", "TChunksNormalizer")("message", TStringBuilder() << chunks.size() << " chunks found");
- if (chunks.empty()) {
- return tasks;
- }
-
- TTablesManager tablesManager(controller.GetStoragesManager(), 0);
- if (!tablesManager.InitFromDB(db)) {
- ACFL_ERROR("step", "TChunksNormalizer.StartNormalizer")("error", "can't initialize tables manager");
- return TConclusionStatus::Fail("Can't load index");
- }
-
- std::vector<TChunkInfo> package;
- package.reserve(100);
-
- for (auto&& chunk : chunks) {
- chunk.InitSchema(tablesManager);
- package.emplace_back(chunk);
- if (package.size() == 100) {
- std::vector<TChunkInfo> local;
- local.swap(package);
- tasks.emplace_back(std::make_shared<TChunksNormalizerTask>(std::move(local)));
- }
- }
-
- if (package.size() > 0) {
- tasks.emplace_back(std::make_shared<TChunksNormalizerTask>(std::move(package)));
- }
- AtomicSet(ActiveTasksCount, tasks.size());
- return tasks;
-}
-
}
diff --git a/ydb/core/tx/columnshard/normalizer/portion/normalizer.h b/ydb/core/tx/columnshard/normalizer/portion/normalizer.h
index d2748c0bf2..5cb952f68a 100644
--- a/ydb/core/tx/columnshard/normalizer/portion/normalizer.h
+++ b/ydb/core/tx/columnshard/normalizer/portion/normalizer.h
@@ -1,110 +1,77 @@
#pragma once
#include <ydb/core/tx/columnshard/normalizer/abstract/abstract.h>
-#include <ydb/core/tx/columnshard/columnshard_schema.h>
+#include <ydb/core/tx/columnshard/engines/scheme/abstract_scheme.h>
-#include <ydb/core/tx/columnshard/defs.h>
+#include <ydb/core/tx/conveyor/usage/abstract.h>
+#include <ydb/core/tx/conveyor/usage/service.h>
+#include <ydb/core/tx/columnshard/blobs_reader/task.h>
+#include <ydb/core/tx/columnshard/columnshard_private_events.h>
+#include <ydb/core/tx/columnshard/defs.h>
-namespace NKikimr::NColumnShard {
- class TTablesManager;
-}
namespace NKikimr::NOlap {
- class TChunksNormalizer : public INormalizerComponent {
- public:
- class TNormalizerResult;
-
- class TKey {
- YDB_READONLY(ui64, Index, 0);
- YDB_READONLY(ui64, Granule, 0);
- YDB_READONLY(ui64, ColumnIdx, 0);
- YDB_READONLY(ui64, PlanStep, 0);
- YDB_READONLY(ui64, TxId, 0);
- YDB_READONLY(ui64, Portion, 0);
- YDB_READONLY(ui64, Chunk, 0);
-
- public:
- template <class TRowset>
- void Load(TRowset& rowset) {
- using namespace NColumnShard;
- Index = rowset.template GetValue<Schema::IndexColumns::Index>();
- Granule = rowset.template GetValue<Schema::IndexColumns::Granule>();
- ColumnIdx = rowset.template GetValue<Schema::IndexColumns::ColumnIdx>();
- PlanStep = rowset.template GetValue<Schema::IndexColumns::PlanStep>();
- TxId = rowset.template GetValue<Schema::IndexColumns::TxId>();
- Portion = rowset.template GetValue<Schema::IndexColumns::Portion>();
- Chunk = rowset.template GetValue<Schema::IndexColumns::Chunk>();
- }
-
- bool operator<(const TKey& other) const {
- return std::make_tuple(Portion, Chunk, ColumnIdx) < std::make_tuple(other.Portion, other.Chunk, other.ColumnIdx);
- }
- };
-
- class TUpdate {
- YDB_ACCESSOR(ui64, NumRows, 0);
- YDB_ACCESSOR(ui64, RawBytes, 0);
- };
-
- class TChunkInfo {
- YDB_READONLY_DEF(TKey, Key);
- TColumnChunkLoadContext CLContext;
- ISnapshotSchema::TPtr Schema;
-
- YDB_ACCESSOR_DEF(TUpdate, Update);
- public:
- template <class TSource>
- TChunkInfo(TKey&& key, const TSource& rowset, const IBlobGroupSelector* dsGroupSelector)
- : Key(std::move(key))
- , CLContext(rowset, dsGroupSelector)
- {}
-
- const TBlobRange& GetBlobRange() const {
- return CLContext.GetBlobRange();
- }
-
- const NKikimrTxColumnShard::TIndexColumnMeta& GetMetaProto() const {
- return CLContext.GetMetaProto();
- }
-
- bool NormalizationRequired() const {
- return !CLContext.GetMetaProto().HasNumRows() || !CLContext.GetMetaProto().HasRawBytes();
- }
-
- std::shared_ptr<TColumnLoader> GetLoader() const {
- return Schema->GetColumnLoader(Key.GetColumnIdx());
- }
- void InitSchema(const NColumnShard::TTablesManager& tm);
-
- bool operator<(const TChunkInfo& other) const {
- return Key < other.Key;
- }
- };
-
- public:
- TChunksNormalizer(TTabletStorageInfo* info)
- : DsGroupSelector(info)
- {}
-
- virtual const TString& GetName() const override {
- const static TString name = "TChunksNormalizer";
- return name;
+template <class TConveyorTask>
+class TReadPortionsTask: public NOlap::NBlobOperations::NRead::ITask {
+private:
+ using TBase = NOlap::NBlobOperations::NRead::ITask;
+ typename TConveyorTask::TDataContainer Data;
+ THashMap<ui64, ISnapshotSchema::TPtr> Schemas;
+ TNormalizationContext NormContext;
+
+public:
+ TReadPortionsTask(const TNormalizationContext& nCtx, const std::vector<std::shared_ptr<IBlobsReadingAction>>& actions, typename TConveyorTask::TDataContainer&& data, THashMap<ui64, ISnapshotSchema::TPtr>&& schemas)
+ : TBase(actions, "CS::NORMALIZER")
+ , Data(std::move(data))
+ , Schemas(std::move(schemas))
+ , NormContext(nCtx)
+ {
+ }
+
+protected:
+ virtual void DoOnDataReady(const std::shared_ptr<NOlap::NResourceBroker::NSubscribe::TResourcesGuard>& resourcesGuard) override {
+ Y_UNUSED(resourcesGuard);
+ std::shared_ptr<NConveyor::ITask> task = std::make_shared<TConveyorTask>(std::move(ExtractBlobsData()), NormContext, std::move(Data), std::move(Schemas));
+ NConveyor::TCompServiceOperator::SendTaskToExecute(task);
+ }
+
+ virtual bool DoOnError(const TBlobRange& range, const IBlobsReadingAction::TErrorStatus& status) override {
+ Y_UNUSED(status, range);
+ return false;
+ }
+
+public:
+ using TBase::TBase;
+};
+
+template <class TConveyorTask>
+class TPortionsNormalizerTask : public INormalizerTask {
+ typename TConveyorTask::TDataContainer Package;
+ THashMap<ui64, ISnapshotSchema::TPtr> Schemas;
+public:
+ TPortionsNormalizerTask(typename TConveyorTask::TDataContainer&& package)
+ : Package(std::move(package))
+ {}
+
+ TPortionsNormalizerTask(typename TConveyorTask::TDataContainer&& package, const THashMap<ui64, ISnapshotSchema::TPtr>& schemas)
+ : Package(std::move(package))
+ , Schemas(schemas)
+ {}
+
+ void Start(const TNormalizationController& controller, const TNormalizationContext& nCtx) override {
+ controller.GetCounters().CountObjects(Package.size());
+ auto readingAction = controller.GetStoragesManager()->GetInsertOperator()->StartReadingAction("CS::NORMALIZER");
+ ui64 memSize = 0;
+ for (auto&& data : Package) {
+ TConveyorTask::FillBlobRanges(readingAction, data);
+ memSize += TConveyorTask::GetMemSize(data);
}
-
- virtual bool WaitResult() const override {
- return AtomicGet(ActiveTasksCount) > 0;
- }
-
- void OnResultReady() override {
- AtomicDecrement(ActiveTasksCount);
- }
-
- virtual TConclusion<std::vector<INormalizerTask::TPtr>> Init(const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) override;
-
- private:
- NColumnShard::TBlobGroupSelector DsGroupSelector;
- TAtomic ActiveTasksCount = 0;
- };
+ std::vector<std::shared_ptr<IBlobsReadingAction>> actions = {readingAction};
+ NOlap::NResourceBroker::NSubscribe::ITask::StartResourceSubscription(
+ nCtx.GetResourceSubscribeActor(),std::make_shared<NOlap::NBlobOperations::NRead::ITask::TReadSubscriber>(
+ std::make_shared<TReadPortionsTask<TConveyorTask>>( nCtx, actions, std::move(Package), std::move(Schemas) ), 1, memSize, "CS::NORMALIZER", controller.GetTaskSubscription()));
+ }
+};
}
diff --git a/ydb/core/tx/columnshard/normalizer/portion/ya.make b/ydb/core/tx/columnshard/normalizer/portion/ya.make
index 0f3a2ffc81..2dd2475a19 100644
--- a/ydb/core/tx/columnshard/normalizer/portion/ya.make
+++ b/ydb/core/tx/columnshard/normalizer/portion/ya.make
@@ -2,6 +2,8 @@ LIBRARY()
SRCS(
normalizer.cpp
+ min_max.cpp
+ chunks.cpp
)
PEERDIR(
diff --git a/ydb/core/tx/columnshard/ut_rw/ut_normalizer.cpp b/ydb/core/tx/columnshard/ut_rw/ut_normalizer.cpp
index 725469ebce..4be1264fdb 100644
--- a/ydb/core/tx/columnshard/ut_rw/ut_normalizer.cpp
+++ b/ydb/core/tx/columnshard/ut_rw/ut_normalizer.cpp
@@ -150,6 +150,61 @@ public:
}
};
+class TMinMaxCleaner : public NYDBTest::ILocalDBModifier {
+public:
+ virtual void Apply(NTabletFlatExecutor::TTransactionContext& txc) const override {
+ using namespace NColumnShard;
+ NIceDb::TNiceDb db(txc.DB);
+
+ std::vector<TPortionRecord> portion2Key;
+ std::optional<ui64> pathId;
+ {
+ auto rowset = db.Table<Schema::IndexColumns>().Select();
+ UNIT_ASSERT(rowset.IsReady());
+
+ while (!rowset.EndOfSet()) {
+ TPortionRecord key;
+ key.Index = rowset.GetValue<Schema::IndexColumns::Index>();
+ key.Granule = rowset.GetValue<Schema::IndexColumns::Granule>();
+ key.ColumnIdx = rowset.GetValue<Schema::IndexColumns::ColumnIdx>();
+ key.PlanStep = rowset.GetValue<Schema::IndexColumns::PlanStep>();
+ key.TxId = rowset.GetValue<Schema::IndexColumns::TxId>();
+ key.Portion = rowset.GetValue<Schema::IndexColumns::Portion>();
+ key.Chunk = rowset.GetValue<Schema::IndexColumns::Chunk>();
+
+ key.XPlanStep = rowset.GetValue<Schema::IndexColumns::XPlanStep>();
+ key.XTxId = rowset.GetValue<Schema::IndexColumns::XTxId>();
+ key.Blob = rowset.GetValue<Schema::IndexColumns::Blob>();
+ key.Metadata = rowset.GetValue<Schema::IndexColumns::Metadata>();
+ key.Offset = rowset.GetValue<Schema::IndexColumns::Offset>();
+ key.Size = rowset.GetValue<Schema::IndexColumns::Size>();
+
+ pathId = rowset.GetValue<Schema::IndexColumns::PathId>();
+
+ portion2Key.emplace_back(std::move(key));
+
+ UNIT_ASSERT(rowset.Next());
+ }
+ }
+
+ UNIT_ASSERT(pathId.has_value());
+
+ for (auto&& key: portion2Key) {
+ NKikimrTxColumnShard::TIndexColumnMeta metaProto;
+ UNIT_ASSERT(metaProto.ParseFromArray(key.Metadata.data(), key.Metadata.size()));
+ if (metaProto.HasPortionMeta()) {
+ metaProto.MutablePortionMeta()->ClearRecordSnapshotMax();
+ metaProto.MutablePortionMeta()->ClearRecordSnapshotMin();
+ }
+
+ db.Table<Schema::IndexColumns>().Key(key.Index, key.Granule, key.ColumnIdx,
+ key.PlanStep, key.TxId, key.Portion, key.Chunk).Update(
+ NIceDb::TUpdate<Schema::IndexColumns::Metadata>(metaProto.SerializeAsString())
+ );
+ }
+ }
+};
+
template <class TLocalDBModifier>
class TPrepareLocalDBController: public NKikimr::NYDBTest::NColumnShard::TController {
private:
@@ -172,17 +227,19 @@ Y_UNIT_TEST_SUITE(Normalizers) {
const ui64 tableId = 1;
const std::vector<std::pair<TString, TTypeInfo>> schema = {
- {"key", TTypeInfo(NTypeIds::Uint64) },
+ {"key1", TTypeInfo(NTypeIds::Uint64) },
+ {"key2", TTypeInfo(NTypeIds::Uint64) },
{"field", TTypeInfo(NTypeIds::Utf8) }
};
- PrepareTablet(runtime, tableId, schema);
+ PrepareTablet(runtime, tableId, schema, 2);
const ui64 txId = 111;
- NConstruction::IArrayBuilder::TPtr keyColumn = std::make_shared<NConstruction::TSimpleArrayConstructor<NConstruction::TIntSeqFiller<arrow::UInt64Type>>>("key");
+ NConstruction::IArrayBuilder::TPtr key1Column = std::make_shared<NConstruction::TSimpleArrayConstructor<NConstruction::TIntSeqFiller<arrow::UInt64Type>>>("key1");
+ NConstruction::IArrayBuilder::TPtr key2Column = std::make_shared<NConstruction::TSimpleArrayConstructor<NConstruction::TIntSeqFiller<arrow::UInt64Type>>>("key2");
NConstruction::IArrayBuilder::TPtr column = std::make_shared<NConstruction::TSimpleArrayConstructor<NConstruction::TStringPoolFiller>>(
"field", NConstruction::TStringPoolFiller(8, 100));
- auto batch = NConstruction::TRecordBatchConstructor({ keyColumn, column }).BuildBatch(2048);
+ auto batch = NConstruction::TRecordBatchConstructor({ key1Column, key2Column, column }).BuildBatch(2048);
TString blobData = NArrow::SerializeBatchNoCompression(batch);
auto evWrite = std::make_unique<NKikimr::NEvents::TDataEvents::TEvWrite>(txId);
@@ -219,6 +276,10 @@ Y_UNIT_TEST_SUITE(Normalizers) {
Y_UNIT_TEST(ColumnChunkNormalizer) {
TestNormalizerImpl<TColumnChunksCleaner>();
}
+
+ Y_UNIT_TEST(MinMaxNormalizer) {
+ TestNormalizerImpl<TMinMaxCleaner>();
+ }
}
} // namespace NKikimr