diff options
author | nsofya <nsofya@ydb.tech> | 2023-11-02 23:38:49 +0300 |
---|---|---|
committer | nsofya <nsofya@ydb.tech> | 2023-11-02 23:55:12 +0300 |
commit | 47a733af080e57d9d3d7659b2458d9b21eaf7f18 (patch) | |
tree | 95bbae7c23f4cee213fd0e443590f3e1709b84e8 | |
parent | dab4e7eff56df570d1174a06824539de1e64d22c (diff) | |
download | ydb-47a733af080e57d9d3d7659b2458d9b21eaf7f18.tar.gz |
KIKIMR-19805: Read portions data and normalize
26 files changed, 753 insertions, 326 deletions
diff --git a/ydb/core/formats/arrow/arrow_helpers.cpp b/ydb/core/formats/arrow/arrow_helpers.cpp index f9951b9432..cc554adee8 100644 --- a/ydb/core/formats/arrow/arrow_helpers.cpp +++ b/ydb/core/formats/arrow/arrow_helpers.cpp @@ -139,24 +139,37 @@ std::shared_ptr<arrow::RecordBatch> MakeEmptyBatch(const std::shared_ptr<arrow:: return arrow::RecordBatch::Make(schema, 0, columns); } -std::shared_ptr<arrow::RecordBatch> ExtractColumns(const std::shared_ptr<arrow::RecordBatch>& srcBatch, - const std::vector<TString>& columnNames) { - std::vector<std::shared_ptr<arrow::Field>> fields; - fields.reserve(columnNames.size()); - std::vector<std::shared_ptr<arrow::Array>> columns; - columns.reserve(columnNames.size()); - - auto srcSchema = srcBatch->schema(); - for (auto& name : columnNames) { - int pos = srcSchema->GetFieldIndex(name); - if (pos < 0) { - return {}; +namespace { + template <class TStringType> + std::shared_ptr<arrow::RecordBatch> ExtractColumnsImpl(const std::shared_ptr<arrow::RecordBatch>& srcBatch, + const std::vector<TStringType>& columnNames) { + std::vector<std::shared_ptr<arrow::Field>> fields; + fields.reserve(columnNames.size()); + std::vector<std::shared_ptr<arrow::Array>> columns; + columns.reserve(columnNames.size()); + + auto srcSchema = srcBatch->schema(); + for (auto& name : columnNames) { + int pos = srcSchema->GetFieldIndex(name); + if (pos < 0) { + return {}; + } + fields.push_back(srcSchema->field(pos)); + columns.push_back(srcBatch->column(pos)); } - fields.push_back(srcSchema->field(pos)); - columns.push_back(srcBatch->column(pos)); + + return arrow::RecordBatch::Make(std::make_shared<arrow::Schema>(std::move(fields)), srcBatch->num_rows(), std::move(columns)); } +} - return arrow::RecordBatch::Make(std::make_shared<arrow::Schema>(std::move(fields)), srcBatch->num_rows(), std::move(columns)); +std::shared_ptr<arrow::RecordBatch> ExtractColumns(const std::shared_ptr<arrow::RecordBatch>& srcBatch, + const std::vector<TString>& columnNames) { + return ExtractColumnsImpl(srcBatch, columnNames); +} + +std::shared_ptr<arrow::RecordBatch> ExtractColumns(const std::shared_ptr<arrow::RecordBatch>& srcBatch, + const std::vector<std::string>& columnNames) { + return ExtractColumnsImpl(srcBatch, columnNames); } std::shared_ptr<arrow::RecordBatch> ExtractColumnsValidate(const std::shared_ptr<arrow::RecordBatch>& srcBatch, diff --git a/ydb/core/formats/arrow/arrow_helpers.h b/ydb/core/formats/arrow/arrow_helpers.h index 51b0246a63..fe4bd4c3c0 100644 --- a/ydb/core/formats/arrow/arrow_helpers.h +++ b/ydb/core/formats/arrow/arrow_helpers.h @@ -57,6 +57,8 @@ std::shared_ptr<arrow::RecordBatch> MakeEmptyBatch(const std::shared_ptr<arrow:: std::shared_ptr<arrow::RecordBatch> ExtractColumns(const std::shared_ptr<arrow::RecordBatch>& srcBatch, const std::vector<TString>& columnNames); +std::shared_ptr<arrow::RecordBatch> ExtractColumns(const std::shared_ptr<arrow::RecordBatch>& srcBatch, + const std::vector<std::string>& columnNames); std::shared_ptr<arrow::RecordBatch> ExtractColumnsValidate(const std::shared_ptr<arrow::RecordBatch>& srcBatch, const std::vector<TString>& columnNames); std::shared_ptr<arrow::RecordBatch> ExtractColumns(const std::shared_ptr<arrow::RecordBatch>& srcBatch, diff --git a/ydb/core/formats/arrow/permutations.cpp b/ydb/core/formats/arrow/permutations.cpp index a6e75019f7..3abc3bb2ce 100644 --- a/ydb/core/formats/arrow/permutations.cpp +++ b/ydb/core/formats/arrow/permutations.cpp @@ -121,6 +121,7 @@ std::shared_ptr<arrow::UInt64Array> MakeFilterPermutation(const std::vector<ui64 } std::shared_ptr<arrow::RecordBatch> CopyRecords(const std::shared_ptr<arrow::RecordBatch>& source, const std::vector<ui64>& indexes) { + Y_ABORT_UNLESS(!!source); auto schema = source->schema(); std::vector<std::shared_ptr<arrow::Array>> columns; for (auto&& i : source->columns()) { diff --git a/ydb/core/tablet/resource_broker.cpp b/ydb/core/tablet/resource_broker.cpp index 9204c6410b..5583ff7186 100644 --- a/ydb/core/tablet/resource_broker.cpp +++ b/ydb/core/tablet/resource_broker.cpp @@ -1327,6 +1327,12 @@ NKikimrResourceBroker::TResourceBrokerConfig MakeDefaultConfig() queue->MutableLimit()->SetMemory(CSScanMemoryLimit); queue = config.AddQueues(); + queue->SetName("queue_cs_normalizer"); + queue->SetWeight(100); + queue->MutableLimit()->SetCpu(3); + queue->MutableLimit()->SetMemory(CSScanMemoryLimit); + + queue = config.AddQueues(); queue->SetName("queue_transaction"); queue->SetWeight(100); queue->MutableLimit()->SetCpu(4); @@ -1423,6 +1429,11 @@ NKikimrResourceBroker::TResourceBrokerConfig MakeDefaultConfig() task->SetDefaultDuration(TDuration::Minutes(10).GetValue()); task = config.AddTasks(); + task->SetName("CS::NORMALIZER"); + task->SetQueueName("queue_cs_normalizer"); + task->SetDefaultDuration(TDuration::Minutes(10).GetValue()); + + task = config.AddTasks(); task->SetName(NLocalDb::TransactionTaskName); task->SetQueueName("queue_transaction"); task->SetDefaultDuration(TDuration::Minutes(10).GetValue()); diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp index 78d722253c..b815b0926c 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.cpp +++ b/ydb/core/tx/columnshard/columnshard_impl.cpp @@ -27,7 +27,8 @@ #include <ydb/core/tx/columnshard/normalizer/granule/normalizer.h> -#include <ydb/core/tx/columnshard/normalizer/portion/normalizer.h> +#include <ydb/core/tx/columnshard/normalizer/portion/min_max.h> +#include <ydb/core/tx/columnshard/normalizer/portion/chunks.h> namespace NKikimr::NColumnShard { @@ -183,6 +184,7 @@ TColumnShard::TColumnShard(TTabletStorageInfo* info, const TActorId& tablet) NormalizerController.RegisterNormalizer(std::make_shared<NOlap::TGranulesNormalizer>()); NormalizerController.RegisterNormalizer(std::make_shared<NOlap::TChunksNormalizer>(Info())); + NormalizerController.RegisterNormalizer(std::make_shared<NOlap::TPortionsNormalizer>(Info())); } void TColumnShard::OnDetach(const TActorContext& ctx) { diff --git a/ydb/core/tx/columnshard/columnshard_ut_common.cpp b/ydb/core/tx/columnshard/columnshard_ut_common.cpp index c90860471b..a1d30787f9 100644 --- a/ydb/core/tx/columnshard/columnshard_ut_common.cpp +++ b/ydb/core/tx/columnshard/columnshard_ut_common.cpp @@ -429,7 +429,7 @@ namespace NKikimr::NColumnShard { PlanSchemaTx(runtime, sender, snap); } - void PrepareTablet(TTestBasicRuntime& runtime, const ui64 tableId, const std::vector<std::pair<TString, NScheme::TTypeInfo>>& schema) { + void PrepareTablet(TTestBasicRuntime& runtime, const ui64 tableId, const std::vector<std::pair<TString, NScheme::TTypeInfo>>& schema, const ui32 keySize) { using namespace NTxUT; CreateTestBootstrapper(runtime, CreateTestTabletInfo(TTestTxConfig::TxTablet0, TTabletTypes::ColumnShard), &CreateColumnShard); @@ -439,7 +439,11 @@ namespace NKikimr::NColumnShard { TestTableDescription tableDescription; tableDescription.Schema = schema; - tableDescription.Pk = { schema[0] }; + tableDescription.Pk = {}; + for (ui64 i = 0; i < keySize; ++i) { + Y_ABORT_UNLESS(i < schema.size()); + tableDescription.Pk.push_back(schema[i]); + } TActorId sender = runtime.AllocateEdgeActor(); SetupSchema(runtime, sender, tableId, tableDescription); } diff --git a/ydb/core/tx/columnshard/columnshard_ut_common.h b/ydb/core/tx/columnshard/columnshard_ut_common.h index 517ac70c1d..b69df6a03e 100644 --- a/ydb/core/tx/columnshard/columnshard_ut_common.h +++ b/ydb/core/tx/columnshard/columnshard_ut_common.h @@ -561,7 +561,7 @@ namespace NKikimr::NColumnShard { void SetupSchema(TTestBasicRuntime& runtime, TActorId& sender, ui64 pathId, const TestTableDescription& table = {}, TString codec = "none"); - void PrepareTablet(TTestBasicRuntime& runtime, const ui64 tableId, const std::vector<std::pair<TString, NScheme::TTypeInfo>>& schema); + void PrepareTablet(TTestBasicRuntime& runtime, const ui64 tableId, const std::vector<std::pair<TString, NScheme::TTypeInfo>>& schema, const ui32 keySize = 1); std::shared_ptr<arrow::RecordBatch> ReadAllAsBatch(TTestBasicRuntime& runtime, const ui64 tableId, const NOlap::TSnapshot& snapshot, const std::vector<std::pair<TString, NScheme::TTypeInfo>>& schema); } diff --git a/ydb/core/tx/columnshard/engines/portions/meta.cpp b/ydb/core/tx/columnshard/engines/portions/meta.cpp index 4c7576ab61..436b180a4f 100644 --- a/ydb/core/tx/columnshard/engines/portions/meta.cpp +++ b/ydb/core/tx/columnshard/engines/portions/meta.cpp @@ -43,6 +43,7 @@ bool TPortionMeta::DeserializeFromProto(const NKikimrTxColumnShard::TIndexPortio AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "DeserializeFromProto")("error", "incorrect portion meta")("meta", portionMeta.DebugString()); return false; } + Y_ABORT_UNLESS(Produced != TPortionMeta::EProduced::UNSPECIFIED); if (portionMeta.HasPrimaryKeyBorders()) { ReplaceKeyEdges = std::make_shared<NArrow::TFirstLastSpecialKeys>(portionMeta.GetPrimaryKeyBorders(), indexInfo.GetReplaceKey()); diff --git a/ydb/core/tx/columnshard/engines/portions/meta.h b/ydb/core/tx/columnshard/engines/portions/meta.h index 7e12b83b09..7acafc89a7 100644 --- a/ydb/core/tx/columnshard/engines/portions/meta.h +++ b/ydb/core/tx/columnshard/engines/portions/meta.h @@ -42,6 +42,14 @@ public: out << info.DebugString(); return out; } + + bool HasSnapshotMinMax() const { + return !!RecordSnapshotMax && !!RecordSnapshotMin; + } + + bool HasPrimaryKeyBorders() const { + return !!IndexKeyStart && !!IndexKeyEnd; + } }; class TPortionAddress { diff --git a/ydb/core/tx/columnshard/engines/portions/portion_info.cpp b/ydb/core/tx/columnshard/engines/portions/portion_info.cpp index cb1cdb9070..778174d01b 100644 --- a/ydb/core/tx/columnshard/engines/portions/portion_info.cpp +++ b/ydb/core/tx/columnshard/engines/portions/portion_info.cpp @@ -34,7 +34,6 @@ void TPortionInfo::AddMetadata(const ISnapshotSchema& snapshotSchema, const std: void TPortionInfo::AddMetadata(const ISnapshotSchema& snapshotSchema, const NArrow::TFirstLastSpecialKeys& primaryKeys, const NArrow::TMinMaxSpecialKeys& snapshotKeys, const TString& tierName) { const auto& indexInfo = snapshotSchema.GetIndexInfo(); - Meta = {}; Meta.FirstPkColumn = indexInfo.GetPKFirstColumnId(); Meta.FillBatchInfo(primaryKeys, snapshotKeys, indexInfo); Meta.SetTierName(tierName); diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.cpp index 4d48a70563..25dde64378 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/interval.cpp @@ -50,7 +50,7 @@ protected: Y_ABORT_UNLESS(ResultBatch->schema()->Equals(Context->GetResultSchema())); } if (MergingContext->GetIncludeFinish() && originalSourcesCount == 1) { - Y_ABORT_UNLESS(merger->IsEmpty()); + AFL_VERIFY(merger->IsEmpty())("merging_context_finish", MergingContext->GetFinish().DebugJson().GetStringRobust())("merger", merger->DebugString()); } } else { auto rbBuilder = std::make_shared<NIndexedReader::TRecordBatchBuilder>(Context->GetResultFields()); diff --git a/ydb/core/tx/columnshard/engines/reader/read_filter_merger.h b/ydb/core/tx/columnshard/engines/reader/read_filter_merger.h index 4361e68dd3..69e3ccb20a 100644 --- a/ydb/core/tx/columnshard/engines/reader/read_filter_merger.h +++ b/ydb/core/tx/columnshard/engines/reader/read_filter_merger.h @@ -332,6 +332,10 @@ public: return SortHeap.Size(); } + TString DebugString() const { + return TStringBuilder() << "sort_heap=" << SortHeap.DebugJson(); + } + void PutControlPoint(std::shared_ptr<TSortableBatchPosition> point); void RemoveControlPoint(); diff --git a/ydb/core/tx/columnshard/engines/scheme/abstract_scheme.cpp b/ydb/core/tx/columnshard/engines/scheme/abstract_scheme.cpp index 519a467e7d..8b16dda038 100644 --- a/ydb/core/tx/columnshard/engines/scheme/abstract_scheme.cpp +++ b/ydb/core/tx/columnshard/engines/scheme/abstract_scheme.cpp @@ -15,6 +15,15 @@ std::shared_ptr<arrow::Field> ISnapshotSchema::GetFieldByColumnId(const ui32 col return GetFieldByIndex(GetFieldIndex(columnId)); } +std::set<ui32> ISnapshotSchema::GetPkColumnsIds() const { + std::set<ui32> result; + for (auto&& field : GetSchema()->fields()) { + result.emplace(GetColumnId(field->name())); + } + return result; + +} + std::shared_ptr<arrow::RecordBatch> ISnapshotSchema::NormalizeBatch(const ISnapshotSchema& dataSchema, const std::shared_ptr<arrow::RecordBatch> batch) const { if (dataSchema.GetSnapshot() == GetSnapshot()) { return batch; diff --git a/ydb/core/tx/columnshard/engines/scheme/abstract_scheme.h b/ydb/core/tx/columnshard/engines/scheme/abstract_scheme.h index 1c954ecb7e..ab1c4eabf5 100644 --- a/ydb/core/tx/columnshard/engines/scheme/abstract_scheme.h +++ b/ydb/core/tx/columnshard/engines/scheme/abstract_scheme.h @@ -53,6 +53,8 @@ public: virtual ui64 GetVersion() const = 0; virtual ui32 GetColumnsCount() const = 0; + std::set<ui32> GetPkColumnsIds() const; + std::shared_ptr<arrow::RecordBatch> NormalizeBatch(const ISnapshotSchema& dataSchema, const std::shared_ptr<arrow::RecordBatch> batch) const; std::shared_ptr<arrow::RecordBatch> PrepareForInsert(const TString& data, const std::shared_ptr<arrow::Schema>& dataSchema) const; }; diff --git a/ydb/core/tx/columnshard/normalizer/portion/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/normalizer/portion/CMakeLists.darwin-x86_64.txt index da21c485e5..ec45390bce 100644 --- a/ydb/core/tx/columnshard/normalizer/portion/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/tx/columnshard/normalizer/portion/CMakeLists.darwin-x86_64.txt @@ -18,4 +18,6 @@ target_link_libraries(columnshard-normalizer-portion PUBLIC ) target_sources(columnshard-normalizer-portion PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/normalizer/portion/normalizer.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/normalizer/portion/min_max.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/normalizer/portion/chunks.cpp ) diff --git a/ydb/core/tx/columnshard/normalizer/portion/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/normalizer/portion/CMakeLists.linux-aarch64.txt index 10efe432e2..63547312b6 100644 --- a/ydb/core/tx/columnshard/normalizer/portion/CMakeLists.linux-aarch64.txt +++ b/ydb/core/tx/columnshard/normalizer/portion/CMakeLists.linux-aarch64.txt @@ -19,4 +19,6 @@ target_link_libraries(columnshard-normalizer-portion PUBLIC ) target_sources(columnshard-normalizer-portion PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/normalizer/portion/normalizer.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/normalizer/portion/min_max.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/normalizer/portion/chunks.cpp ) diff --git a/ydb/core/tx/columnshard/normalizer/portion/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/normalizer/portion/CMakeLists.linux-x86_64.txt index 10efe432e2..63547312b6 100644 --- a/ydb/core/tx/columnshard/normalizer/portion/CMakeLists.linux-x86_64.txt +++ b/ydb/core/tx/columnshard/normalizer/portion/CMakeLists.linux-x86_64.txt @@ -19,4 +19,6 @@ target_link_libraries(columnshard-normalizer-portion PUBLIC ) target_sources(columnshard-normalizer-portion PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/normalizer/portion/normalizer.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/normalizer/portion/min_max.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/normalizer/portion/chunks.cpp ) diff --git a/ydb/core/tx/columnshard/normalizer/portion/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/normalizer/portion/CMakeLists.windows-x86_64.txt index da21c485e5..ec45390bce 100644 --- a/ydb/core/tx/columnshard/normalizer/portion/CMakeLists.windows-x86_64.txt +++ b/ydb/core/tx/columnshard/normalizer/portion/CMakeLists.windows-x86_64.txt @@ -18,4 +18,6 @@ target_link_libraries(columnshard-normalizer-portion PUBLIC ) target_sources(columnshard-normalizer-portion PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/normalizer/portion/normalizer.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/normalizer/portion/min_max.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/normalizer/portion/chunks.cpp ) diff --git a/ydb/core/tx/columnshard/normalizer/portion/chunks.cpp b/ydb/core/tx/columnshard/normalizer/portion/chunks.cpp new file mode 100644 index 0000000000..0e053489bf --- /dev/null +++ b/ydb/core/tx/columnshard/normalizer/portion/chunks.cpp @@ -0,0 +1,159 @@ +#include "chunks.h" +#include "normalizer.h" + +#include <ydb/core/tx/columnshard/engines/portions/portion_info.h> +#include <ydb/core/tx/columnshard/tables_manager.h> +#include <ydb/core/formats/arrow/size_calcer.h> + + +namespace NKikimr::NOlap { + +class TChunksNormalizer::TNormalizerResult : public INormalizerChanges { + std::vector<TChunksNormalizer::TChunkInfo> Chunks; +public: + TNormalizerResult(std::vector<TChunksNormalizer::TChunkInfo>&& chunks) + : Chunks(std::move(chunks)) + {} + + bool Apply(NTabletFlatExecutor::TTransactionContext& txc, const TNormalizationController& /* normController */) const override { + using namespace NColumnShard; + NIceDb::TNiceDb db(txc.DB); + + for (auto&& chunkInfo : Chunks) { + + NKikimrTxColumnShard::TIndexColumnMeta metaProto = chunkInfo.GetMetaProto(); + metaProto.SetNumRows(chunkInfo.GetUpdate().GetNumRows()); + metaProto.SetRawBytes(chunkInfo.GetUpdate().GetRawBytes()); + + const auto& key = chunkInfo.GetKey(); + + db.Table<Schema::IndexColumns>().Key(key.GetIndex(), key.GetGranule(), key.GetColumnIdx(), + key.GetPlanStep(), key.GetTxId(), key.GetPortion(), key.GetChunk()).Update( + NIceDb::TUpdate<Schema::IndexColumns::Metadata>(metaProto.SerializeAsString()) + ); + } + return true; + } +}; + +class TRowsAndBytesChangesTask: public NConveyor::ITask { +public: + using TDataContainer = std::vector<TChunksNormalizer::TChunkInfo>; +private: + THashMap<NKikimr::NOlap::TBlobRange, TString> Blobs; + std::vector<TChunksNormalizer::TChunkInfo> Chunks; + TNormalizationContext NormContext; +protected: + virtual bool DoExecute() override { + for (auto&& chunkInfo : Chunks) { + const auto& blobRange = chunkInfo.GetBlobRange(); + + auto blobIt = Blobs.find(blobRange); + Y_ABORT_UNLESS(blobIt != Blobs.end()); + + auto columnLoader = chunkInfo.GetLoader(); + Y_ABORT_UNLESS(!!columnLoader); + + TPortionInfo::TAssembleBlobInfo assembleBlob(blobIt->second); + auto batch = assembleBlob.BuildRecordBatch(*columnLoader); + Y_ABORT_UNLESS(!!batch); + + chunkInfo.MutableUpdate().SetNumRows(batch->num_rows()); + chunkInfo.MutableUpdate().SetRawBytes(NArrow::GetBatchDataSize(batch)); + } + + auto changes = std::make_shared<TChunksNormalizer::TNormalizerResult>(std::move(Chunks)); + TActorContext::AsActorContext().Send(NormContext.GetColumnshardActor(), std::make_unique<NColumnShard::TEvPrivate::TEvNormalizerResult>(changes)); + return true; + } + +public: + TRowsAndBytesChangesTask(THashMap<NKikimr::NOlap::TBlobRange, TString>&& blobs, const TNormalizationContext& nCtx, std::vector<TChunksNormalizer::TChunkInfo>&& chunks, THashMap<ui64, ISnapshotSchema::TPtr>&&) + : Blobs(std::move(blobs)) + , Chunks(std::move(chunks)) + , NormContext(nCtx) + {} + + virtual TString GetTaskClassIdentifier() const override { + const static TString name = "TRowsAndBytesChangesTask"; + return name; + } + + static void FillBlobRanges(std::shared_ptr<IBlobsReadingAction> readAction, const TChunksNormalizer::TChunkInfo& chunk) { + readAction->AddRange(chunk.GetBlobRange()); + } + + static ui64 GetMemSize(const TChunksNormalizer::TChunkInfo&) { + return 10 * 1024 * 1024; + } +}; + +void TChunksNormalizer::TChunkInfo::InitSchema(const NColumnShard::TTablesManager& tm) { + Schema = tm.GetPrimaryIndexSafe().GetVersionedIndex().GetSchema(NOlap::TSnapshot(Key.GetPlanStep(), Key.GetTxId())); +} + +TConclusion<std::vector<INormalizerTask::TPtr>> TChunksNormalizer::Init(const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) { + using namespace NColumnShard; + NIceDb::TNiceDb db(txc.DB); + + bool ready = true; + ready = ready & Schema::Precharge<Schema::IndexColumns>(db, txc.DB.GetScheme()); + if (!ready) { + return TConclusionStatus::Fail("Not ready"); + } + + std::vector<TChunkInfo> chunks; + { + auto rowset = db.Table<Schema::IndexColumns>().Select(); + if (!rowset.IsReady()) { + return TConclusionStatus::Fail("Not ready"); + } + + while (!rowset.EndOfSet()) { + TKey key; + key.Load(rowset); + + TChunkInfo chunkInfo(std::move(key), rowset, &DsGroupSelector); + if (chunkInfo.NormalizationRequired()) { + chunks.emplace_back(std::move(chunkInfo)); + } + + if (!rowset.Next()) { + return TConclusionStatus::Fail("Not ready"); + } + } + } + + std::vector<INormalizerTask::TPtr> tasks; + ACFL_INFO("normalizer", "TChunksNormalizer")("message", TStringBuilder() << chunks.size() << " chunks found"); + if (chunks.empty()) { + return tasks; + } + + TTablesManager tablesManager(controller.GetStoragesManager(), 0); + if (!tablesManager.InitFromDB(db)) { + ACFL_ERROR("normalizer", "TChunksNormalizer")("error", "can't initialize tables manager"); + return TConclusionStatus::Fail("Can't load index"); + } + + std::vector<TChunkInfo> package; + package.reserve(100); + + for (auto&& chunk : chunks) { + chunk.InitSchema(tablesManager); + package.emplace_back(chunk); + if (package.size() == 100) { + std::vector<TChunkInfo> local; + local.swap(package); + tasks.emplace_back(std::make_shared<TPortionsNormalizerTask<TRowsAndBytesChangesTask>>(std::move(local))); + } + } + + if (package.size() > 0) { + tasks.emplace_back(std::make_shared<TPortionsNormalizerTask<TRowsAndBytesChangesTask>>(std::move(package))); + } + AtomicSet(ActiveTasksCount, tasks.size()); + return tasks; +} + +} diff --git a/ydb/core/tx/columnshard/normalizer/portion/chunks.h b/ydb/core/tx/columnshard/normalizer/portion/chunks.h new file mode 100644 index 0000000000..eb558674d9 --- /dev/null +++ b/ydb/core/tx/columnshard/normalizer/portion/chunks.h @@ -0,0 +1,110 @@ +#pragma once + +#include <ydb/core/tx/columnshard/normalizer/abstract/abstract.h> +#include <ydb/core/tx/columnshard/columnshard_schema.h> + +#include <ydb/core/tx/columnshard/defs.h> + + +namespace NKikimr::NColumnShard { + class TTablesManager; +} + +namespace NKikimr::NOlap { + + class TChunksNormalizer : public INormalizerComponent { + public: + class TNormalizerResult; + + class TKey { + YDB_READONLY(ui64, Index, 0); + YDB_READONLY(ui64, Granule, 0); + YDB_READONLY(ui64, ColumnIdx, 0); + YDB_READONLY(ui64, PlanStep, 0); + YDB_READONLY(ui64, TxId, 0); + YDB_READONLY(ui64, Portion, 0); + YDB_READONLY(ui64, Chunk, 0); + + public: + template <class TRowset> + void Load(TRowset& rowset) { + using namespace NColumnShard; + Index = rowset.template GetValue<Schema::IndexColumns::Index>(); + Granule = rowset.template GetValue<Schema::IndexColumns::Granule>(); + ColumnIdx = rowset.template GetValue<Schema::IndexColumns::ColumnIdx>(); + PlanStep = rowset.template GetValue<Schema::IndexColumns::PlanStep>(); + TxId = rowset.template GetValue<Schema::IndexColumns::TxId>(); + Portion = rowset.template GetValue<Schema::IndexColumns::Portion>(); + Chunk = rowset.template GetValue<Schema::IndexColumns::Chunk>(); + } + + bool operator<(const TKey& other) const { + return std::make_tuple(Portion, Chunk, ColumnIdx) < std::make_tuple(other.Portion, other.Chunk, other.ColumnIdx); + } + }; + + class TUpdate { + YDB_ACCESSOR(ui64, NumRows, 0); + YDB_ACCESSOR(ui64, RawBytes, 0); + }; + + class TChunkInfo { + YDB_READONLY_DEF(TKey, Key); + TColumnChunkLoadContext CLContext; + ISnapshotSchema::TPtr Schema; + + YDB_ACCESSOR_DEF(TUpdate, Update); + public: + template <class TSource> + TChunkInfo(TKey&& key, const TSource& rowset, const IBlobGroupSelector* dsGroupSelector) + : Key(std::move(key)) + , CLContext(rowset, dsGroupSelector) + {} + + const TBlobRange& GetBlobRange() const { + return CLContext.GetBlobRange(); + } + + const NKikimrTxColumnShard::TIndexColumnMeta& GetMetaProto() const { + return CLContext.GetMetaProto(); + } + + bool NormalizationRequired() const { + return !CLContext.GetMetaProto().HasNumRows() || !CLContext.GetMetaProto().HasRawBytes(); + } + + std::shared_ptr<TColumnLoader> GetLoader() const { + return Schema->GetColumnLoader(Key.GetColumnIdx()); + } + void InitSchema(const NColumnShard::TTablesManager& tm); + + bool operator<(const TChunkInfo& other) const { + return Key < other.Key; + } + }; + + public: + TChunksNormalizer(TTabletStorageInfo* info) + : DsGroupSelector(info) + {} + + virtual const TString& GetName() const override { + const static TString name = "TChunksNormalizer"; + return name; + } + + virtual bool WaitResult() const override { + return AtomicGet(ActiveTasksCount) > 0; + } + + void OnResultReady() override { + AtomicDecrement(ActiveTasksCount); + } + + virtual TConclusion<std::vector<INormalizerTask::TPtr>> Init(const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) override; + + private: + NColumnShard::TBlobGroupSelector DsGroupSelector; + TAtomic ActiveTasksCount = 0; + }; +} diff --git a/ydb/core/tx/columnshard/normalizer/portion/min_max.cpp b/ydb/core/tx/columnshard/normalizer/portion/min_max.cpp new file mode 100644 index 0000000000..b367c77a57 --- /dev/null +++ b/ydb/core/tx/columnshard/normalizer/portion/min_max.cpp @@ -0,0 +1,222 @@ +#include "min_max.h" +#include "normalizer.h" + +#include <ydb/core/tx/columnshard/engines/portions/portion_info.h> +#include <ydb/core/tx/columnshard/tables_manager.h> +#include <ydb/core/tx/columnshard/engines/scheme/filtered_scheme.h> + +#include <ydb/core/formats/arrow/arrow_helpers.h> + + +namespace NKikimr::NOlap { + +class TMinMaxSnapshotChangesTask: public NConveyor::ITask { +public: + using TDataContainer = std::vector<std::shared_ptr<TPortionInfo>>; +private: + THashMap<NKikimr::NOlap::TBlobRange, TString> Blobs; + TDataContainer Portions; + THashMap<ui64, ISnapshotSchema::TPtr> Schemas; + TNormalizationContext NormContext; +protected: + virtual bool DoExecute() override { + Y_ABORT_UNLESS(!Schemas.empty()); + auto pkColumnIds = Schemas.begin()->second->GetPkColumnsIds(); + pkColumnIds.insert(TIndexInfo::GetSpecialColumnIds().begin(), TIndexInfo::GetSpecialColumnIds().end()); + + for (auto&& portionInfo : Portions) { + auto blobSchema = Schemas.FindPtr(portionInfo->GetPortionId()); + THashMap<TBlobRange, TPortionInfo::TAssembleBlobInfo> blobsDataAssemble; + for (auto&& i : portionInfo->Records) { + auto blobIt = Blobs.find(i.BlobRange); + Y_ABORT_UNLESS(blobIt != Blobs.end()); + blobsDataAssemble.emplace(i.BlobRange, blobIt->second); + } + + AFL_VERIFY(!!blobSchema)("details", portionInfo->DebugString()); + auto filteredSchema = std::make_shared<TFilteredSnapshotSchema>(*blobSchema, pkColumnIds); + auto preparedBatch = portionInfo->PrepareForAssemble(**blobSchema, *filteredSchema, blobsDataAssemble); + auto batch = preparedBatch.Assemble(); + Y_ABORT_UNLESS(!!batch); + portionInfo->AddMetadata(**blobSchema, batch, portionInfo->GetMeta().GetTierName()); + } + + auto changes = std::make_shared<TPortionsNormalizer::TNormalizerResult>(std::move(Portions)); + TActorContext::AsActorContext().Send(NormContext.GetColumnshardActor(), std::make_unique<NColumnShard::TEvPrivate::TEvNormalizerResult>(changes)); + return true; + } + +public: + TMinMaxSnapshotChangesTask(THashMap<NKikimr::NOlap::TBlobRange, TString>&& blobs, const TNormalizationContext& nCtx, TDataContainer&& portions, THashMap<ui64, ISnapshotSchema::TPtr>&& schemas) + : Blobs(std::move(blobs)) + , Portions(std::move(portions)) + , Schemas(std::move(schemas)) + , NormContext(nCtx) + {} + + virtual TString GetTaskClassIdentifier() const override { + const static TString name = "TMinMaxSnapshotChangesTask"; + return name; + } + + static void FillBlobRanges(std::shared_ptr<IBlobsReadingAction> readAction, const std::shared_ptr<TPortionInfo>& portion) { + for (auto&& chunk : portion->Records) { + readAction->AddRange(chunk.BlobRange); + } + } + + static ui64 GetMemSize(const std::shared_ptr<TPortionInfo>& portion) { + return portion->GetRawBytes(); + } + + static bool CheckPortion(const TPortionInfo& portionInfo) { + if (!portionInfo.GetMeta().HasPrimaryKeyBorders() || !portionInfo.GetMeta().HasSnapshotMinMax()) { + return false; + } + return true; + } + + static std::set<ui32> GetColumnsFilter(const ISnapshotSchema::TPtr& schema) { + return schema->GetPkColumnsIds(); + } +}; + + +class TPortionsNormalizer::TNormalizerResult : public INormalizerChanges { + TMinMaxSnapshotChangesTask::TDataContainer Portions; +public: + TNormalizerResult(TMinMaxSnapshotChangesTask::TDataContainer&& portions) + : Portions(std::move(portions)) + {} + + bool Apply(NTabletFlatExecutor::TTransactionContext& txc, const TNormalizationController& /* normController */) const override { + using namespace NColumnShard; + NIceDb::TNiceDb db(txc.DB); + + for (auto&& portionInfo : Portions) { + for (auto&& chunk : portionInfo->Records) { + auto proto = portionInfo->GetMeta().SerializeToProto(chunk.ColumnId, chunk.Chunk); + if (!proto) { + continue; + } + auto rowProto = chunk.GetMeta().SerializeToProto(); + *rowProto.MutablePortionMeta() = std::move(*proto); + + db.Table<Schema::IndexColumns>().Key(0, portionInfo->GetDeprecatedGranuleId(), chunk.ColumnId, + portionInfo->GetMinSnapshot().GetPlanStep(), portionInfo->GetMinSnapshot().GetTxId(), portionInfo->GetPortion(), chunk.Chunk).Update( + NIceDb::TUpdate<Schema::IndexColumns::Metadata>(rowProto.SerializeAsString()) + ); + } + } + return true; + } +}; + +TConclusion<std::vector<INormalizerTask::TPtr>> TPortionsNormalizer::Init(const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) { + using namespace NColumnShard; + std::vector<INormalizerTask::TPtr> tasks; + + NIceDb::TNiceDb db(txc.DB); + + bool ready = true; + ready = ready & Schema::Precharge<Schema::IndexColumns>(db, txc.DB.GetScheme()); + if (!ready) { + return TConclusionStatus::Fail("Not ready"); + } + + TTablesManager tablesManager(controller.GetStoragesManager(), 0); + if (!tablesManager.InitFromDB(db)) { + ACFL_ERROR("normalizer", "TPortionsNormalizer")("error", "can't initialize tables manager"); + return TConclusionStatus::Fail("Can't load index"); + } + + if (!tablesManager.HasPrimaryIndex()) { + return tasks; + } + + THashMap<ui64, std::shared_ptr<TPortionInfo>> portions; + THashMap<ui64, ISnapshotSchema::TPtr> schemas; + auto pkColumnIds = TMinMaxSnapshotChangesTask::GetColumnsFilter(tablesManager.GetPrimaryIndexSafe().GetVersionedIndex().GetLastSchema()); + + { + auto rowset = db.Table<Schema::IndexColumns>().Select(); + if (!rowset.IsReady()) { + return TConclusionStatus::Fail("Not ready"); + } + + TSnapshot lastSnapshot(0, 0); + ISnapshotSchema::TPtr currentSchema; + auto initPortionCB = [&](const TPortionInfo& portion, const TColumnChunkLoadContext& loadContext) { + if (!currentSchema || lastSnapshot != portion.GetMinSnapshot()) { + currentSchema = tablesManager.GetPrimaryIndexSafe().GetVersionedIndex().GetSchema(portion.GetMinSnapshot()); + lastSnapshot = portion.GetMinSnapshot(); + } + + AFL_VERIFY(portion.ValidSnapshotInfo())("details", portion.DebugString()); + TColumnRecord rec(loadContext, currentSchema->GetIndexInfo()); + if (!pkColumnIds.contains(rec.ColumnId)) { + return; + } + auto it = portions.find(portion.GetPortion()); + auto portionMeta = loadContext.GetPortionMeta(); + if (it == portions.end()) { + Y_ABORT_UNLESS(portion.Records.empty()); + schemas[portion.GetPortionId()] = currentSchema; + auto portionNew = std::make_shared<TPortionInfo>(portion); + portionNew->AddRecord(currentSchema->GetIndexInfo(), rec, portionMeta); + it = portions.emplace(portion.GetPortion(), portionNew).first; + } else { + AFL_VERIFY(it->second->IsEqualWithSnapshots(portion))("self", it->second->DebugString())("item", portion.DebugString()); + it->second->AddRecord(currentSchema->GetIndexInfo(), rec, portionMeta); + } + + }; + + while (!rowset.EndOfSet()) { + TPortionInfo portion = TPortionInfo::BuildEmpty(); + auto index = rowset.GetValue<Schema::IndexColumns::Index>(); + Y_ABORT_UNLESS(index == 0); + + portion.SetPathId(rowset.GetValue<Schema::IndexColumns::PathId>()); + + portion.SetMinSnapshot(rowset.GetValue<Schema::IndexColumns::PlanStep>(), rowset.GetValue<Schema::IndexColumns::TxId>()); + portion.SetPortion(rowset.GetValue<Schema::IndexColumns::Portion>()); + portion.SetDeprecatedGranuleId(rowset.GetValue<Schema::IndexColumns::Granule>()); + portion.SetRemoveSnapshot(rowset.GetValue<Schema::IndexColumns::XPlanStep>(), rowset.GetValue<Schema::IndexColumns::XTxId>()); + + NOlap::TColumnChunkLoadContext chunkLoadContext(rowset, &DsGroupSelector); + initPortionCB(portion, chunkLoadContext); + + if (!rowset.Next()) { + return TConclusionStatus::Fail("Not ready"); + } + } + } + + std::vector<std::shared_ptr<TPortionInfo>> package; + package.reserve(100); + + ui64 brokenPortioncCount = 0; + for (auto&& portion : portions) { + if (TMinMaxSnapshotChangesTask::CheckPortion(*portion.second)) { + continue; + } + ++brokenPortioncCount; + package.emplace_back(portion.second); + if (package.size() == 100) { + std::vector<std::shared_ptr<TPortionInfo>> local; + local.swap(package); + tasks.emplace_back(std::make_shared<TPortionsNormalizerTask<TMinMaxSnapshotChangesTask>>(std::move(local), schemas)); + } + } + + if (package.size() > 0) { + tasks.emplace_back(std::make_shared<TPortionsNormalizerTask<TMinMaxSnapshotChangesTask>>(std::move(package), schemas)); + } + + AtomicSet(ActiveTasksCount, tasks.size()); + ACFL_INFO("normalizer", "TPortionsNormalizer")("message", TStringBuilder() << brokenPortioncCount << " portions found"); + return tasks; +} + +} diff --git a/ydb/core/tx/columnshard/normalizer/portion/min_max.h b/ydb/core/tx/columnshard/normalizer/portion/min_max.h new file mode 100644 index 0000000000..267a379e2b --- /dev/null +++ b/ydb/core/tx/columnshard/normalizer/portion/min_max.h @@ -0,0 +1,44 @@ +#pragma once + +#include <ydb/core/tx/columnshard/normalizer/abstract/abstract.h> +#include <ydb/core/tx/columnshard/columnshard_schema.h> + +#include <ydb/core/tx/columnshard/defs.h> + + +namespace NKikimr::NColumnShard { + class TTablesManager; +} + +namespace NKikimr::NOlap { + +class TPortionsNormalizer : public INormalizerComponent { +public: + class TNormalizerResult; + +public: + TPortionsNormalizer(TTabletStorageInfo* info) + : DsGroupSelector(info) + {} + + virtual const TString& GetName() const override { + const static TString name = "TPortionsNormalizer"; + return name; + } + + virtual bool WaitResult() const override { + return AtomicGet(ActiveTasksCount) > 0; + } + + void OnResultReady() override { + AtomicDecrement(ActiveTasksCount); + } + + virtual TConclusion<std::vector<INormalizerTask::TPtr>> Init(const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) override; + +private: + NColumnShard::TBlobGroupSelector DsGroupSelector; + TAtomic ActiveTasksCount = 0; +}; + +} diff --git a/ydb/core/tx/columnshard/normalizer/portion/normalizer.cpp b/ydb/core/tx/columnshard/normalizer/portion/normalizer.cpp index 8a76e4442c..0d02541c9a 100644 --- a/ydb/core/tx/columnshard/normalizer/portion/normalizer.cpp +++ b/ydb/core/tx/columnshard/normalizer/portion/normalizer.cpp @@ -1,206 +1,4 @@ #include "normalizer.h" - -#include <ydb/core/tx/conveyor/usage/abstract.h> -#include <ydb/core/tx/conveyor/usage/service.h> - -#include <ydb/core/tx/columnshard/blobs_reader/task.h> -#include <ydb/core/tx/columnshard/engines/portions/portion_info.h> -#include <ydb/core/tx/columnshard/tables_manager.h> -#include <ydb/core/tx/columnshard/columnshard_private_events.h> - -#include <ydb/core/formats/arrow/size_calcer.h> - - namespace NKikimr::NOlap { - -class TChunksNormalizer::TNormalizerResult : public INormalizerChanges { - std::vector<TChunksNormalizer::TChunkInfo> Chunks; -public: - TNormalizerResult(std::vector<TChunksNormalizer::TChunkInfo>&& chunks) - : Chunks(std::move(chunks)) - {} - - bool Apply(NTabletFlatExecutor::TTransactionContext& txc, const TNormalizationController& /* normController */) const override { - using namespace NColumnShard; - NIceDb::TNiceDb db(txc.DB); - - for (auto&& chunkInfo : Chunks) { - - NKikimrTxColumnShard::TIndexColumnMeta metaProto = chunkInfo.GetMetaProto(); - metaProto.SetNumRows(chunkInfo.GetUpdate().GetNumRows()); - metaProto.SetRawBytes(chunkInfo.GetUpdate().GetRawBytes()); - - const auto& key = chunkInfo.GetKey(); - - db.Table<Schema::IndexColumns>().Key(key.GetIndex(), key.GetGranule(), key.GetColumnIdx(), - key.GetPlanStep(), key.GetTxId(), key.GetPortion(), key.GetChunk()).Update( - NIceDb::TUpdate<Schema::IndexColumns::Metadata>(metaProto.SerializeAsString()) - ); - } - return true; - } -}; - -class TNormalizationChangesTask: public NConveyor::ITask { -private: - THashMap<NKikimr::NOlap::TBlobRange, TString> Blobs; - std::vector<TChunksNormalizer::TChunkInfo> Chunks; - TNormalizationContext NormContext; -protected: - virtual bool DoExecute() override { - for (auto&& chunkInfo : Chunks) { - const auto& blobRange = chunkInfo.GetBlobRange(); - - auto blobIt = Blobs.find(blobRange); - Y_ABORT_UNLESS(blobIt != Blobs.end()); - - auto columnLoader = chunkInfo.GetLoader(); - Y_ABORT_UNLESS(!!columnLoader); - - TPortionInfo::TAssembleBlobInfo assembleBlob(blobIt->second); - auto batch = assembleBlob.BuildRecordBatch(*columnLoader); - Y_ABORT_UNLESS(!!batch); - - chunkInfo.MutableUpdate().SetNumRows(batch->num_rows()); - chunkInfo.MutableUpdate().SetRawBytes(NArrow::GetBatchDataSize(batch)); - } - - auto changes = std::make_shared<TChunksNormalizer::TNormalizerResult>(std::move(Chunks)); - TActorContext::AsActorContext().Send(NormContext.GetColumnshardActor(), std::make_unique<NColumnShard::TEvPrivate::TEvNormalizerResult>(changes)); - return true; - } - -public: - TNormalizationChangesTask(THashMap<NKikimr::NOlap::TBlobRange, TString>&& blobs, const TNormalizationContext& nCtx, std::vector<TChunksNormalizer::TChunkInfo>&& chunks) - : Blobs(std::move(blobs)) - , Chunks(std::move(chunks)) - , NormContext(nCtx) - {} - - virtual TString GetTaskClassIdentifier() const override { - const static TString name = "TNormalizationChangesTask"; - return name; - } -}; - -class TReadPortionsTask: public NOlap::NBlobOperations::NRead::ITask { - private: - using TBase = NOlap::NBlobOperations::NRead::ITask; - std::vector<TChunksNormalizer::TChunkInfo> Chunks; - TNormalizationContext NormContext; - - public: - TReadPortionsTask(const TNormalizationContext& nCtx, const std::vector<std::shared_ptr<IBlobsReadingAction>>& actions, std::vector<TChunksNormalizer::TChunkInfo>&& chunks) - : TBase(actions, "CS::NORMALIZER") - , Chunks(std::move(chunks)) - , NormContext(nCtx) - { - } - - protected: - virtual void DoOnDataReady(const std::shared_ptr<NOlap::NResourceBroker::NSubscribe::TResourcesGuard>& resourcesGuard) override { - Y_UNUSED(resourcesGuard); - std::shared_ptr<NConveyor::ITask> task = std::make_shared<TNormalizationChangesTask>(std::move(ExtractBlobsData()), NormContext, std::move(Chunks)); - NConveyor::TCompServiceOperator::SendTaskToExecute(task); - } - - virtual bool DoOnError(const TBlobRange& range, const IBlobsReadingAction::TErrorStatus& status) override { - Y_UNUSED(status, range); - return false; - } - - public: - using TBase::TBase; -}; - -class TChunksNormalizerTask : public INormalizerTask { - std::vector<TChunksNormalizer::TChunkInfo> Package; -public: - TChunksNormalizerTask(std::vector<TChunksNormalizer::TChunkInfo>&& package) - : Package(std::move(package)) - {} - - void Start(const TNormalizationController& controller, const TNormalizationContext& nCtx) override { - controller.GetCounters().CountObjects(Package.size()); - auto readingAction = controller.GetStoragesManager()->GetInsertOperator()->StartReadingAction("CS::NORMALIZER"); - for (auto&& chunk : Package) { - readingAction->AddRange(chunk.GetBlobRange()); - } - std::vector<std::shared_ptr<IBlobsReadingAction>> actions = {readingAction}; - ui64 memSize = 0; - NOlap::NResourceBroker::NSubscribe::ITask::StartResourceSubscription( - nCtx.GetResourceSubscribeActor(),std::make_shared<NOlap::NBlobOperations::NRead::ITask::TReadSubscriber>( - std::make_shared<TReadPortionsTask>( nCtx, actions, std::move(Package) ), 1, memSize, "CS::NORMALIZER", controller.GetTaskSubscription())); - } -}; - -void TChunksNormalizer::TChunkInfo::InitSchema(const NColumnShard::TTablesManager& tm) { - Schema = tm.GetPrimaryIndexSafe().GetVersionedIndex().GetSchema(NOlap::TSnapshot(Key.GetPlanStep(), Key.GetTxId())); -} - -TConclusion<std::vector<INormalizerTask::TPtr>> TChunksNormalizer::Init(const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) { - using namespace NColumnShard; - NIceDb::TNiceDb db(txc.DB); - - bool ready = true; - ready = ready & Schema::Precharge<Schema::IndexColumns>(db, txc.DB.GetScheme()); - if (!ready) { - return TConclusionStatus::Fail("Not ready"); - } - - std::vector<TChunkInfo> chunks; - { - auto rowset = db.Table<Schema::IndexColumns>().Select(); - if (!rowset.IsReady()) { - return TConclusionStatus::Fail("Not ready"); - } - - while (!rowset.EndOfSet()) { - TKey key; - key.Load(rowset); - - TChunkInfo chunkInfo(std::move(key), rowset, &DsGroupSelector); - if (chunkInfo.NormalizationRequired()) { - chunks.emplace_back(std::move(chunkInfo)); - } - - if (!rowset.Next()) { - return TConclusionStatus::Fail("Not ready"); - } - } - } - - std::vector<INormalizerTask::TPtr> tasks; - ACFL_INFO("normalizer", "TChunksNormalizer")("message", TStringBuilder() << chunks.size() << " chunks found"); - if (chunks.empty()) { - return tasks; - } - - TTablesManager tablesManager(controller.GetStoragesManager(), 0); - if (!tablesManager.InitFromDB(db)) { - ACFL_ERROR("step", "TChunksNormalizer.StartNormalizer")("error", "can't initialize tables manager"); - return TConclusionStatus::Fail("Can't load index"); - } - - std::vector<TChunkInfo> package; - package.reserve(100); - - for (auto&& chunk : chunks) { - chunk.InitSchema(tablesManager); - package.emplace_back(chunk); - if (package.size() == 100) { - std::vector<TChunkInfo> local; - local.swap(package); - tasks.emplace_back(std::make_shared<TChunksNormalizerTask>(std::move(local))); - } - } - - if (package.size() > 0) { - tasks.emplace_back(std::make_shared<TChunksNormalizerTask>(std::move(package))); - } - AtomicSet(ActiveTasksCount, tasks.size()); - return tasks; -} - } diff --git a/ydb/core/tx/columnshard/normalizer/portion/normalizer.h b/ydb/core/tx/columnshard/normalizer/portion/normalizer.h index d2748c0bf2..5cb952f68a 100644 --- a/ydb/core/tx/columnshard/normalizer/portion/normalizer.h +++ b/ydb/core/tx/columnshard/normalizer/portion/normalizer.h @@ -1,110 +1,77 @@ #pragma once #include <ydb/core/tx/columnshard/normalizer/abstract/abstract.h> -#include <ydb/core/tx/columnshard/columnshard_schema.h> +#include <ydb/core/tx/columnshard/engines/scheme/abstract_scheme.h> -#include <ydb/core/tx/columnshard/defs.h> +#include <ydb/core/tx/conveyor/usage/abstract.h> +#include <ydb/core/tx/conveyor/usage/service.h> +#include <ydb/core/tx/columnshard/blobs_reader/task.h> +#include <ydb/core/tx/columnshard/columnshard_private_events.h> +#include <ydb/core/tx/columnshard/defs.h> -namespace NKikimr::NColumnShard { - class TTablesManager; -} namespace NKikimr::NOlap { - class TChunksNormalizer : public INormalizerComponent { - public: - class TNormalizerResult; - - class TKey { - YDB_READONLY(ui64, Index, 0); - YDB_READONLY(ui64, Granule, 0); - YDB_READONLY(ui64, ColumnIdx, 0); - YDB_READONLY(ui64, PlanStep, 0); - YDB_READONLY(ui64, TxId, 0); - YDB_READONLY(ui64, Portion, 0); - YDB_READONLY(ui64, Chunk, 0); - - public: - template <class TRowset> - void Load(TRowset& rowset) { - using namespace NColumnShard; - Index = rowset.template GetValue<Schema::IndexColumns::Index>(); - Granule = rowset.template GetValue<Schema::IndexColumns::Granule>(); - ColumnIdx = rowset.template GetValue<Schema::IndexColumns::ColumnIdx>(); - PlanStep = rowset.template GetValue<Schema::IndexColumns::PlanStep>(); - TxId = rowset.template GetValue<Schema::IndexColumns::TxId>(); - Portion = rowset.template GetValue<Schema::IndexColumns::Portion>(); - Chunk = rowset.template GetValue<Schema::IndexColumns::Chunk>(); - } - - bool operator<(const TKey& other) const { - return std::make_tuple(Portion, Chunk, ColumnIdx) < std::make_tuple(other.Portion, other.Chunk, other.ColumnIdx); - } - }; - - class TUpdate { - YDB_ACCESSOR(ui64, NumRows, 0); - YDB_ACCESSOR(ui64, RawBytes, 0); - }; - - class TChunkInfo { - YDB_READONLY_DEF(TKey, Key); - TColumnChunkLoadContext CLContext; - ISnapshotSchema::TPtr Schema; - - YDB_ACCESSOR_DEF(TUpdate, Update); - public: - template <class TSource> - TChunkInfo(TKey&& key, const TSource& rowset, const IBlobGroupSelector* dsGroupSelector) - : Key(std::move(key)) - , CLContext(rowset, dsGroupSelector) - {} - - const TBlobRange& GetBlobRange() const { - return CLContext.GetBlobRange(); - } - - const NKikimrTxColumnShard::TIndexColumnMeta& GetMetaProto() const { - return CLContext.GetMetaProto(); - } - - bool NormalizationRequired() const { - return !CLContext.GetMetaProto().HasNumRows() || !CLContext.GetMetaProto().HasRawBytes(); - } - - std::shared_ptr<TColumnLoader> GetLoader() const { - return Schema->GetColumnLoader(Key.GetColumnIdx()); - } - void InitSchema(const NColumnShard::TTablesManager& tm); - - bool operator<(const TChunkInfo& other) const { - return Key < other.Key; - } - }; - - public: - TChunksNormalizer(TTabletStorageInfo* info) - : DsGroupSelector(info) - {} - - virtual const TString& GetName() const override { - const static TString name = "TChunksNormalizer"; - return name; +template <class TConveyorTask> +class TReadPortionsTask: public NOlap::NBlobOperations::NRead::ITask { +private: + using TBase = NOlap::NBlobOperations::NRead::ITask; + typename TConveyorTask::TDataContainer Data; + THashMap<ui64, ISnapshotSchema::TPtr> Schemas; + TNormalizationContext NormContext; + +public: + TReadPortionsTask(const TNormalizationContext& nCtx, const std::vector<std::shared_ptr<IBlobsReadingAction>>& actions, typename TConveyorTask::TDataContainer&& data, THashMap<ui64, ISnapshotSchema::TPtr>&& schemas) + : TBase(actions, "CS::NORMALIZER") + , Data(std::move(data)) + , Schemas(std::move(schemas)) + , NormContext(nCtx) + { + } + +protected: + virtual void DoOnDataReady(const std::shared_ptr<NOlap::NResourceBroker::NSubscribe::TResourcesGuard>& resourcesGuard) override { + Y_UNUSED(resourcesGuard); + std::shared_ptr<NConveyor::ITask> task = std::make_shared<TConveyorTask>(std::move(ExtractBlobsData()), NormContext, std::move(Data), std::move(Schemas)); + NConveyor::TCompServiceOperator::SendTaskToExecute(task); + } + + virtual bool DoOnError(const TBlobRange& range, const IBlobsReadingAction::TErrorStatus& status) override { + Y_UNUSED(status, range); + return false; + } + +public: + using TBase::TBase; +}; + +template <class TConveyorTask> +class TPortionsNormalizerTask : public INormalizerTask { + typename TConveyorTask::TDataContainer Package; + THashMap<ui64, ISnapshotSchema::TPtr> Schemas; +public: + TPortionsNormalizerTask(typename TConveyorTask::TDataContainer&& package) + : Package(std::move(package)) + {} + + TPortionsNormalizerTask(typename TConveyorTask::TDataContainer&& package, const THashMap<ui64, ISnapshotSchema::TPtr>& schemas) + : Package(std::move(package)) + , Schemas(schemas) + {} + + void Start(const TNormalizationController& controller, const TNormalizationContext& nCtx) override { + controller.GetCounters().CountObjects(Package.size()); + auto readingAction = controller.GetStoragesManager()->GetInsertOperator()->StartReadingAction("CS::NORMALIZER"); + ui64 memSize = 0; + for (auto&& data : Package) { + TConveyorTask::FillBlobRanges(readingAction, data); + memSize += TConveyorTask::GetMemSize(data); } - - virtual bool WaitResult() const override { - return AtomicGet(ActiveTasksCount) > 0; - } - - void OnResultReady() override { - AtomicDecrement(ActiveTasksCount); - } - - virtual TConclusion<std::vector<INormalizerTask::TPtr>> Init(const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) override; - - private: - NColumnShard::TBlobGroupSelector DsGroupSelector; - TAtomic ActiveTasksCount = 0; - }; + std::vector<std::shared_ptr<IBlobsReadingAction>> actions = {readingAction}; + NOlap::NResourceBroker::NSubscribe::ITask::StartResourceSubscription( + nCtx.GetResourceSubscribeActor(),std::make_shared<NOlap::NBlobOperations::NRead::ITask::TReadSubscriber>( + std::make_shared<TReadPortionsTask<TConveyorTask>>( nCtx, actions, std::move(Package), std::move(Schemas) ), 1, memSize, "CS::NORMALIZER", controller.GetTaskSubscription())); + } +}; } diff --git a/ydb/core/tx/columnshard/normalizer/portion/ya.make b/ydb/core/tx/columnshard/normalizer/portion/ya.make index 0f3a2ffc81..2dd2475a19 100644 --- a/ydb/core/tx/columnshard/normalizer/portion/ya.make +++ b/ydb/core/tx/columnshard/normalizer/portion/ya.make @@ -2,6 +2,8 @@ LIBRARY() SRCS( normalizer.cpp + min_max.cpp + chunks.cpp ) PEERDIR( diff --git a/ydb/core/tx/columnshard/ut_rw/ut_normalizer.cpp b/ydb/core/tx/columnshard/ut_rw/ut_normalizer.cpp index 725469ebce..4be1264fdb 100644 --- a/ydb/core/tx/columnshard/ut_rw/ut_normalizer.cpp +++ b/ydb/core/tx/columnshard/ut_rw/ut_normalizer.cpp @@ -150,6 +150,61 @@ public: } }; +class TMinMaxCleaner : public NYDBTest::ILocalDBModifier { +public: + virtual void Apply(NTabletFlatExecutor::TTransactionContext& txc) const override { + using namespace NColumnShard; + NIceDb::TNiceDb db(txc.DB); + + std::vector<TPortionRecord> portion2Key; + std::optional<ui64> pathId; + { + auto rowset = db.Table<Schema::IndexColumns>().Select(); + UNIT_ASSERT(rowset.IsReady()); + + while (!rowset.EndOfSet()) { + TPortionRecord key; + key.Index = rowset.GetValue<Schema::IndexColumns::Index>(); + key.Granule = rowset.GetValue<Schema::IndexColumns::Granule>(); + key.ColumnIdx = rowset.GetValue<Schema::IndexColumns::ColumnIdx>(); + key.PlanStep = rowset.GetValue<Schema::IndexColumns::PlanStep>(); + key.TxId = rowset.GetValue<Schema::IndexColumns::TxId>(); + key.Portion = rowset.GetValue<Schema::IndexColumns::Portion>(); + key.Chunk = rowset.GetValue<Schema::IndexColumns::Chunk>(); + + key.XPlanStep = rowset.GetValue<Schema::IndexColumns::XPlanStep>(); + key.XTxId = rowset.GetValue<Schema::IndexColumns::XTxId>(); + key.Blob = rowset.GetValue<Schema::IndexColumns::Blob>(); + key.Metadata = rowset.GetValue<Schema::IndexColumns::Metadata>(); + key.Offset = rowset.GetValue<Schema::IndexColumns::Offset>(); + key.Size = rowset.GetValue<Schema::IndexColumns::Size>(); + + pathId = rowset.GetValue<Schema::IndexColumns::PathId>(); + + portion2Key.emplace_back(std::move(key)); + + UNIT_ASSERT(rowset.Next()); + } + } + + UNIT_ASSERT(pathId.has_value()); + + for (auto&& key: portion2Key) { + NKikimrTxColumnShard::TIndexColumnMeta metaProto; + UNIT_ASSERT(metaProto.ParseFromArray(key.Metadata.data(), key.Metadata.size())); + if (metaProto.HasPortionMeta()) { + metaProto.MutablePortionMeta()->ClearRecordSnapshotMax(); + metaProto.MutablePortionMeta()->ClearRecordSnapshotMin(); + } + + db.Table<Schema::IndexColumns>().Key(key.Index, key.Granule, key.ColumnIdx, + key.PlanStep, key.TxId, key.Portion, key.Chunk).Update( + NIceDb::TUpdate<Schema::IndexColumns::Metadata>(metaProto.SerializeAsString()) + ); + } + } +}; + template <class TLocalDBModifier> class TPrepareLocalDBController: public NKikimr::NYDBTest::NColumnShard::TController { private: @@ -172,17 +227,19 @@ Y_UNIT_TEST_SUITE(Normalizers) { const ui64 tableId = 1; const std::vector<std::pair<TString, TTypeInfo>> schema = { - {"key", TTypeInfo(NTypeIds::Uint64) }, + {"key1", TTypeInfo(NTypeIds::Uint64) }, + {"key2", TTypeInfo(NTypeIds::Uint64) }, {"field", TTypeInfo(NTypeIds::Utf8) } }; - PrepareTablet(runtime, tableId, schema); + PrepareTablet(runtime, tableId, schema, 2); const ui64 txId = 111; - NConstruction::IArrayBuilder::TPtr keyColumn = std::make_shared<NConstruction::TSimpleArrayConstructor<NConstruction::TIntSeqFiller<arrow::UInt64Type>>>("key"); + NConstruction::IArrayBuilder::TPtr key1Column = std::make_shared<NConstruction::TSimpleArrayConstructor<NConstruction::TIntSeqFiller<arrow::UInt64Type>>>("key1"); + NConstruction::IArrayBuilder::TPtr key2Column = std::make_shared<NConstruction::TSimpleArrayConstructor<NConstruction::TIntSeqFiller<arrow::UInt64Type>>>("key2"); NConstruction::IArrayBuilder::TPtr column = std::make_shared<NConstruction::TSimpleArrayConstructor<NConstruction::TStringPoolFiller>>( "field", NConstruction::TStringPoolFiller(8, 100)); - auto batch = NConstruction::TRecordBatchConstructor({ keyColumn, column }).BuildBatch(2048); + auto batch = NConstruction::TRecordBatchConstructor({ key1Column, key2Column, column }).BuildBatch(2048); TString blobData = NArrow::SerializeBatchNoCompression(batch); auto evWrite = std::make_unique<NKikimr::NEvents::TDataEvents::TEvWrite>(txId); @@ -219,6 +276,10 @@ Y_UNIT_TEST_SUITE(Normalizers) { Y_UNIT_TEST(ColumnChunkNormalizer) { TestNormalizerImpl<TColumnChunksCleaner>(); } + + Y_UNIT_TEST(MinMaxNormalizer) { + TestNormalizerImpl<TMinMaxCleaner>(); + } } } // namespace NKikimr |