diff options
author | nsofya <nsofya@ydb.tech> | 2023-10-25 16:26:49 +0300 |
---|---|---|
committer | nsofya <nsofya@ydb.tech> | 2023-10-25 16:47:50 +0300 |
commit | 5e9517e09bd2bb7fd6a0daf598c081125a85e98d (patch) | |
tree | 6ced059184e4fd254e661f8b764aadc4cd756454 | |
parent | 509f4bccedd15798058fffbba552dfe3c76c4899 (diff) | |
download | ydb-5e9517e09bd2bb7fd6a0daf598c081125a85e98d.tar.gz |
KIKIMR-19807: PathId normalization
38 files changed, 758 insertions, 112 deletions
diff --git a/.mapping.json b/.mapping.json index 31e35d3301..b61d37e62f 100644 --- a/.mapping.json +++ b/.mapping.json @@ -5445,6 +5445,17 @@ "ydb/core/tx/columnshard/hooks/testing/CMakeLists.linux-x86_64.txt":"", "ydb/core/tx/columnshard/hooks/testing/CMakeLists.txt":"", "ydb/core/tx/columnshard/hooks/testing/CMakeLists.windows-x86_64.txt":"", + "ydb/core/tx/columnshard/normalizer/CMakeLists.txt":"", + "ydb/core/tx/columnshard/normalizer/abstract/CMakeLists.darwin-x86_64.txt":"", + "ydb/core/tx/columnshard/normalizer/abstract/CMakeLists.linux-aarch64.txt":"", + "ydb/core/tx/columnshard/normalizer/abstract/CMakeLists.linux-x86_64.txt":"", + "ydb/core/tx/columnshard/normalizer/abstract/CMakeLists.txt":"", + "ydb/core/tx/columnshard/normalizer/abstract/CMakeLists.windows-x86_64.txt":"", + "ydb/core/tx/columnshard/normalizer/granule/CMakeLists.darwin-x86_64.txt":"", + "ydb/core/tx/columnshard/normalizer/granule/CMakeLists.linux-aarch64.txt":"", + "ydb/core/tx/columnshard/normalizer/granule/CMakeLists.linux-x86_64.txt":"", + "ydb/core/tx/columnshard/normalizer/granule/CMakeLists.txt":"", + "ydb/core/tx/columnshard/normalizer/granule/CMakeLists.windows-x86_64.txt":"", "ydb/core/tx/columnshard/operations/CMakeLists.darwin-x86_64.txt":"", "ydb/core/tx/columnshard/operations/CMakeLists.linux-aarch64.txt":"", "ydb/core/tx/columnshard/operations/CMakeLists.linux-x86_64.txt":"", diff --git a/ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt index 9f1deb1cd4..98156163d9 100644 --- a/ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt @@ -12,6 +12,7 @@ add_subdirectory(common) add_subdirectory(counters) add_subdirectory(engines) add_subdirectory(hooks) +add_subdirectory(normalizer) add_subdirectory(operations) add_subdirectory(resource_subscriber) add_subdirectory(resources) @@ -57,6 +58,7 @@ target_link_libraries(core-tx-columnshard PUBLIC tx-columnshard-blobs_reader tx-columnshard-blobs_action tx-columnshard-resource_subscriber + columnshard-normalizer-granule core-tx-tiering tx-conveyor-usage tx-long_tx_service-public diff --git a/ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt index 26c0b204e9..e10d15fc23 100644 --- a/ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt +++ b/ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt @@ -12,6 +12,7 @@ add_subdirectory(common) add_subdirectory(counters) add_subdirectory(engines) add_subdirectory(hooks) +add_subdirectory(normalizer) add_subdirectory(operations) add_subdirectory(resource_subscriber) add_subdirectory(resources) @@ -58,6 +59,7 @@ target_link_libraries(core-tx-columnshard PUBLIC tx-columnshard-blobs_reader tx-columnshard-blobs_action tx-columnshard-resource_subscriber + columnshard-normalizer-granule core-tx-tiering tx-conveyor-usage tx-long_tx_service-public diff --git a/ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt index 26c0b204e9..e10d15fc23 100644 --- a/ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt +++ b/ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt @@ -12,6 +12,7 @@ add_subdirectory(common) add_subdirectory(counters) add_subdirectory(engines) add_subdirectory(hooks) +add_subdirectory(normalizer) add_subdirectory(operations) add_subdirectory(resource_subscriber) add_subdirectory(resources) @@ -58,6 +59,7 @@ target_link_libraries(core-tx-columnshard PUBLIC tx-columnshard-blobs_reader tx-columnshard-blobs_action tx-columnshard-resource_subscriber + columnshard-normalizer-granule core-tx-tiering tx-conveyor-usage tx-long_tx_service-public diff --git a/ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt index 8c97dad14a..47fb7a3a91 100644 --- a/ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt +++ b/ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt @@ -12,6 +12,7 @@ add_subdirectory(common) add_subdirectory(counters) add_subdirectory(engines) add_subdirectory(hooks) +add_subdirectory(normalizer) add_subdirectory(operations) add_subdirectory(resource_subscriber) add_subdirectory(resources) @@ -58,6 +59,7 @@ target_link_libraries(core-tx-columnshard PUBLIC tx-columnshard-blobs_reader tx-columnshard-blobs_action tx-columnshard-resource_subscriber + columnshard-normalizer-granule core-tx-tiering tx-conveyor-usage tx-long_tx_service-public diff --git a/ydb/core/tx/columnshard/blobs_reader/task.cpp b/ydb/core/tx/columnshard/blobs_reader/task.cpp index bbe9b80824..cfe95e0314 100644 --- a/ydb/core/tx/columnshard/blobs_reader/task.cpp +++ b/ydb/core/tx/columnshard/blobs_reader/task.cpp @@ -122,7 +122,7 @@ void ITask::OnDataReady() { } bool ITask::OnError(const TBlobRange& range, const IBlobsReadingAction::TErrorStatus& status) { - ACFL_DEBUG("event", "OnError")("task", DebugString()); + ACFL_DEBUG("event", "OnError")("status", status.GetStatus())("task", DebugString()); return DoOnError(range, status); } diff --git a/ydb/core/tx/columnshard/columnshard.cpp b/ydb/core/tx/columnshard/columnshard.cpp index 365d8e6d02..14a6d3f2dc 100644 --- a/ydb/core/tx/columnshard/columnshard.cpp +++ b/ydb/core/tx/columnshard/columnshard.cpp @@ -13,8 +13,7 @@ IActor* CreateColumnShard(const TActorId& tablet, TTabletStorageInfo* info) { namespace NKikimr::NColumnShard { -void TColumnShard::CleanupActors(const TActorContext& ctx) -{ +void TColumnShard::CleanupActors(const TActorContext& ctx) { ctx.Send(ResourceSubscribeActor, new TEvents::TEvPoisonPill); StoragesManager->Stop(); if (Tiers) { @@ -22,8 +21,7 @@ void TColumnShard::CleanupActors(const TActorContext& ctx) } } -void TColumnShard::BecomeBroken(const TActorContext& ctx) -{ +void TColumnShard::BecomeBroken(const TActorContext& ctx) { Become(&TThis::StateBroken); ctx.Send(Tablet(), new TEvents::TEvPoisonPill); CleanupActors(ctx); @@ -41,6 +39,12 @@ void TColumnShard::SwitchToWork(const TActorContext& ctx) { Become(&TThis::StateWork); SignalTabletActive(ctx); + AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "initialize_shard")("step", "SignalTabletActive"); + TryRegisterMediatorTimeCast(); + // Trigger progress: planned or outdated tx + EnqueueProgressTx(ctx); + EnqueueBackgroundActivities(); + ctx.Schedule(ActivationPeriod, new TEvPrivate::TEvPeriodicWakeup()); } void TColumnShard::OnActivateExecutor(const TActorContext& ctx) { diff --git a/ydb/core/tx/columnshard/columnshard__init.cpp b/ydb/core/tx/columnshard/columnshard__init.cpp index d7fc92abe3..649304e61c 100644 --- a/ydb/core/tx/columnshard/columnshard__init.cpp +++ b/ydb/core/tx/columnshard/columnshard__init.cpp @@ -2,6 +2,7 @@ #include "columnshard_ttl.h" #include "columnshard_private_events.h" #include "columnshard_schema.h" +#include "hooks/abstract/abstract.h" #include <ydb/core/tx/columnshard/blobs_action/blob_manager_db.h> #include <ydb/core/tablet/tablet_exception.h> @@ -11,9 +12,7 @@ namespace NKikimr::NColumnShard { using namespace NTabletFlatExecutor; -// TTxInit => SwitchToWork -/// Load data from local database class TTxInit : public TTransactionBase<TColumnShard> { public: TTxInit(TColumnShard* self) @@ -204,21 +203,9 @@ bool TTxInit::Execute(TTransactionContext& txc, const TActorContext& ctx) { } void TTxInit::Complete(const TActorContext& ctx) { - LOG_S_DEBUG("TTxInit.Complete at tablet " << Self->TabletID()); Self->SwitchToWork(ctx); - Self->TryRegisterMediatorTimeCast(); - - // Trigger progress: planned or outdated tx - Self->EnqueueProgressTx(ctx); - Self->EnqueueBackgroundActivities(); - - // Start periodic wakeups - ctx.Schedule(Self->ActivationPeriod, new TEvPrivate::TEvPeriodicWakeup()); } -// TTxUpdateSchema => TTxInit - -/// Update local database on tablet start class TTxUpdateSchema : public TTransactionBase<TColumnShard> { public: TTxUpdateSchema(TColumnShard* self) @@ -228,21 +215,48 @@ public: bool Execute(TTransactionContext& txc, const TActorContext& ctx) override; void Complete(const TActorContext& ctx) override; TTxType GetTxType() const override { return TXTYPE_UPDATE_SCHEMA; } + +private: + bool WaitNormalization = false; }; bool TTxUpdateSchema::Execute(TTransactionContext& txc, const TActorContext&) { - Y_UNUSED(txc); - LOG_S_DEBUG("TTxUpdateSchema.Execute at tablet " << Self->TabletID()); + NActors::TLogContextGuard gLogging = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID())("event", "initialize_shard"); + ACFL_INFO("step", "TTxUpdateSchema.Execute_Start"); + + for (auto&& normalizer : Self->Normalizers) { + if (!normalizer->NormalizationRequired()) { + ACFL_INFO("step", "TTxUpdateSchema.Execute")("skip_normalizer", normalizer->GetName()); + continue; + } + + ACFL_INFO("step", "TTxUpdateSchema.Execute")("start_normalizer", normalizer->GetName()); + NOlap::TNormalizationContext nCtx(Self->InsertTaskSubscription); + auto status = normalizer->NormalizeData(nCtx, txc); + + if (status == NOlap::ENormalizerResult::Failed) { + ACFL_INFO("step", "TTxUpdateSchema.Execute")("fail_normalizer", normalizer->GetName()); + return false; + } else if (status == NOlap::ENormalizerResult::Wait) { + ACFL_INFO("step", "TTxUpdateSchema.Execute")("start_normalizer", normalizer->GetName()); + WaitNormalization = true; + } else if (status == NOlap::ENormalizerResult::Skip) { + ACFL_INFO("step", "TTxUpdateSchema.Execute")("pass_normalizer", normalizer->GetName()); + } else { + ACFL_INFO("step", "TTxUpdateSchema.Execute")("finish_normalizer", normalizer->GetName()); + } + } + ACFL_INFO("step", "TTxUpdateSchema.Execute_Finish"); return true; } void TTxUpdateSchema::Complete(const TActorContext& ctx) { - LOG_S_DEBUG("TTxUpdateSchema.Complete at tablet " << Self->TabletID()); - Self->Execute(new TTxInit(Self), ctx); + ACFL_INFO("step", "TTxUpdateSchema.Complete"); + if (!WaitNormalization) { + Self->Execute(new TTxInit(Self), ctx); + } } -// TTxInitSchema => TTxUpdateSchema - /// Create local database on tablet start if none class TTxInitSchema : public TTransactionBase<TColumnShard> { public: @@ -258,13 +272,18 @@ public: bool TTxInitSchema::Execute(TTransactionContext& txc, const TActorContext&) { LOG_S_DEBUG("TxInitSchema.Execute at tablet " << Self->TabletID()); - bool isCreate = txc.DB.GetScheme().IsEmpty(); + const bool isFirstRun = txc.DB.GetScheme().IsEmpty(); NIceDb::TNiceDb(txc.DB).Materialize<Schema>(); - if (isCreate) { + if (isFirstRun) { txc.DB.Alter().SetExecutorAllowLogBatching(gAllowLogBatchingDefaultValue); txc.DB.Alter().SetExecutorLogFlushPeriod(TDuration::MicroSeconds(500)); txc.DB.Alter().SetExecutorCacheSize(500000); + } else { + auto localBaseModifier = NYDBTest::TControllers::GetColumnShardController()->BuildLocalBaseModifier(); + if (localBaseModifier) { + localBaseModifier->Apply(txc); + } } // Enable compression for the SmallBlobs table diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp index 3173bebe08..86908da2fb 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.cpp +++ b/ydb/core/tx/columnshard/columnshard_impl.cpp @@ -25,6 +25,9 @@ #include "resource_subscriber/counters.h" #include "blobs_reader/actor.h" +#include <ydb/core/tx/columnshard/normalizer/granule/normalizer.h> +// #include <ydb/core/tx/columnshard/normalizer/portion/normalizer.h> + namespace NKikimr::NColumnShard { // NOTE: We really want to batch log records by default in columnshards! @@ -174,6 +177,10 @@ TColumnShard::TColumnShard(TTabletStorageInfo* info, const TActorId& tablet) ETxTypes_descriptor >()); TabletCounters = TabletCountersPtr.get(); + + Normalizers.emplace_back(std::make_shared<NOlap::TGranulesNormalizer>()); + // Normalizers.emplace_back(std::make_shared<NOlap::TPortionsNormalizer>(Info())); + } void TColumnShard::OnDetach(const TActorContext& ctx) { diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h index c683b0a5c2..40bb27584c 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.h +++ b/ydb/core/tx/columnshard/columnshard_impl.h @@ -13,6 +13,7 @@ #include "counters/columnshard.h" #include "resource_subscriber/counters.h" #include "resource_subscriber/task.h" +#include "normalizer/abstract/abstract.h" #include <ydb/core/base/tablet_pipecache.h> #include <ydb/core/tablet/tablet_counters.h> @@ -409,6 +410,8 @@ private: const TIndexationCounters IndexationCounters = TIndexationCounters("Indexation"); const TIndexationCounters EvictionCounters = TIndexationCounters("Eviction"); + std::vector<NOlap::INormalizerComponent::TPtr> Normalizers; + const TCSCounters CSCounters; TWritesMonitor WritesMonitor; diff --git a/ydb/core/tx/columnshard/columnshard_ut_common.cpp b/ydb/core/tx/columnshard/columnshard_ut_common.cpp index 53e6aedbfe..c90860471b 100644 --- a/ydb/core/tx/columnshard/columnshard_ut_common.cpp +++ b/ydb/core/tx/columnshard/columnshard_ut_common.cpp @@ -411,4 +411,60 @@ namespace NKikimr::NColumnShard { indexInfo.SetAllKeys(); return indexInfo; } + + void SetupSchema(TTestBasicRuntime& runtime, TActorId& sender, ui64 pathId, + const TestTableDescription& table, TString codec) { + using namespace NTxUT; + NOlap::TSnapshot snap(10, 10); + TString txBody; + auto specials = TTestSchema::TTableSpecials().WithCodec(codec); + if (table.InStore) { + txBody = TTestSchema::CreateTableTxBody(pathId, table.Schema, table.Pk, specials); + } else { + txBody = TTestSchema::CreateStandaloneTableTxBody(pathId, table.Schema, table.Pk, specials); + } + bool ok = ProposeSchemaTx(runtime, sender, txBody, snap); + UNIT_ASSERT(ok); + + PlanSchemaTx(runtime, sender, snap); + } + + void PrepareTablet(TTestBasicRuntime& runtime, const ui64 tableId, const std::vector<std::pair<TString, NScheme::TTypeInfo>>& schema) { + using namespace NTxUT; + CreateTestBootstrapper(runtime, CreateTestTabletInfo(TTestTxConfig::TxTablet0, TTabletTypes::ColumnShard), &CreateColumnShard); + + TDispatchOptions options; + options.FinalEvents.push_back(TDispatchOptions::TFinalEventCondition(TEvTablet::EvBoot)); + runtime.DispatchEvents(options); + + TestTableDescription tableDescription; + tableDescription.Schema = schema; + tableDescription.Pk = { schema[0] }; + TActorId sender = runtime.AllocateEdgeActor(); + SetupSchema(runtime, sender, tableId, tableDescription); + } + + std::shared_ptr<arrow::RecordBatch> ReadAllAsBatch(TTestBasicRuntime& runtime, const ui64 tableId, const NOlap::TSnapshot& snapshot, const std::vector<std::pair<TString, NScheme::TTypeInfo>>& schema) { + using namespace NTxUT; + TActorId sender = runtime.AllocateEdgeActor(); + + ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, + new TEvColumnShard::TEvRead(sender, TTestTxConfig::TxTablet1, snapshot.GetPlanStep(), snapshot.GetTxId(), tableId)); + + std::vector<std::shared_ptr<arrow::RecordBatch>> batches; + while(true) { + TAutoPtr<IEventHandle> handle; + auto event = runtime.GrabEdgeEvent<TEvColumnShard::TEvReadResult>(handle); + UNIT_ASSERT(event); + auto b = event->GetArrowBatch(); + if (b) { + batches.push_back(b); + } + if (!event->HasMore()) { + break; + } + } + auto res = NArrow::CombineBatches(batches); + return res ? res : NArrow::MakeEmptyBatch(NArrow::MakeArrowSchema(schema)); + } } diff --git a/ydb/core/tx/columnshard/columnshard_ut_common.h b/ydb/core/tx/columnshard/columnshard_ut_common.h index 344933f78b..517ac70c1d 100644 --- a/ydb/core/tx/columnshard/columnshard_ut_common.h +++ b/ydb/core/tx/columnshard/columnshard_ut_common.h @@ -527,4 +527,41 @@ namespace NKikimr::NColumnShard { NOlap::TIndexInfo BuildTableInfo(const std::vector<std::pair<TString, NScheme::TTypeInfo>>& ydbSchema, const std::vector<std::pair<TString, NScheme::TTypeInfo>>& key); + + + struct TestTableDescription { + std::vector<std::pair<TString, NScheme::TTypeInfo>> Schema = NTxUT::TTestSchema::YdbSchema(); + std::vector<std::pair<TString, NScheme::TTypeInfo>> Pk = NTxUT::TTestSchema::YdbPkSchema(); + bool InStore = true; + }; + + class TArrowDataConstructor : public NKikimr::NEvents::IDataConstructor { + std::vector<std::pair<TString, NScheme::TTypeInfo>> YdbSchema; + ui64 Index; + + public: + TArrowDataConstructor(const std::vector<std::pair<TString, NScheme::TTypeInfo>>& ydbSchema, const ui64 idx) + : YdbSchema(ydbSchema) + , Index(idx) + { + } + + void Serialize(NKikimrDataEvents::TOperationData& proto) const override { + for (ui32 i = 0; i < YdbSchema.size(); ++i) { + proto.AddColumnIds(i + 1); + } + proto.MutableArrowData()->SetPayloadIndex(Index); + } + + ui64 GetSchemaVersion() const override { + return 1; + } + }; + + void SetupSchema(TTestBasicRuntime& runtime, TActorId& sender, ui64 pathId, + const TestTableDescription& table = {}, TString codec = "none"); + + void PrepareTablet(TTestBasicRuntime& runtime, const ui64 tableId, const std::vector<std::pair<TString, NScheme::TTypeInfo>>& schema); + + std::shared_ptr<arrow::RecordBatch> ReadAllAsBatch(TTestBasicRuntime& runtime, const ui64 tableId, const NOlap::TSnapshot& snapshot, const std::vector<std::pair<TString, NScheme::TTypeInfo>>& schema); } diff --git a/ydb/core/tx/columnshard/hooks/abstract/abstract.h b/ydb/core/tx/columnshard/hooks/abstract/abstract.h index 4339a6d1a6..7a40d59aa9 100644 --- a/ydb/core/tx/columnshard/hooks/abstract/abstract.h +++ b/ydb/core/tx/columnshard/hooks/abstract/abstract.h @@ -1,4 +1,7 @@ #pragma once + +#include <ydb/core/tablet_flat/tablet_flat_executor.h> + #include <ydb/services/metadata/abstract/fetcher.h> #include <ydb/core/tx/tiering/snapshot.h> @@ -31,6 +34,15 @@ enum class EOptimizerCompactionWeightControl { Force }; +class ILocalDBModifier { +public: + using TPtr = std::shared_ptr<ILocalDBModifier>; + + virtual ~ILocalDBModifier() {} + + virtual void Apply(NTabletFlatExecutor::TTransactionContext& txc) const = 0; +}; + class ICSController { private: YDB_READONLY(TAtomicCounter, OnSortingPolicyCounter, 0); @@ -89,6 +101,10 @@ public: virtual void OnTieringModified(const std::shared_ptr<NColumnShard::TTiersManager>& /*tiers*/) { } + virtual ILocalDBModifier::TPtr BuildLocalBaseModifier() const { + return nullptr; + } + virtual NMetadata::NFetcher::ISnapshot::TPtr GetFallbackTiersSnapshot() const { static std::shared_ptr<NColumnShard::NTiers::TConfigsSnapshot> result = std::make_shared<NColumnShard::NTiers::TConfigsSnapshot>(TInstant::Now()); return result; diff --git a/ydb/core/tx/columnshard/normalizer/CMakeLists.txt b/ydb/core/tx/columnshard/normalizer/CMakeLists.txt new file mode 100644 index 0000000000..7475f3a995 --- /dev/null +++ b/ydb/core/tx/columnshard/normalizer/CMakeLists.txt @@ -0,0 +1,10 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +add_subdirectory(abstract) +add_subdirectory(granule) diff --git a/ydb/core/tx/columnshard/normalizer/abstract/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/normalizer/abstract/CMakeLists.darwin-x86_64.txt new file mode 100644 index 0000000000..4a11c1f061 --- /dev/null +++ b/ydb/core/tx/columnshard/normalizer/abstract/CMakeLists.darwin-x86_64.txt @@ -0,0 +1,20 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(columnshard-normalizer-abstract) +target_link_libraries(columnshard-normalizer-abstract PUBLIC + contrib-libs-cxxsupp + yutil + ydb-core-tablet_flat + columnshard-blobs_action-abstract + tx-columnshard-resource_subscriber +) +target_sources(columnshard-normalizer-abstract PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/normalizer/abstract/abstract.cpp +) diff --git a/ydb/core/tx/columnshard/normalizer/abstract/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/normalizer/abstract/CMakeLists.linux-aarch64.txt new file mode 100644 index 0000000000..6da128405d --- /dev/null +++ b/ydb/core/tx/columnshard/normalizer/abstract/CMakeLists.linux-aarch64.txt @@ -0,0 +1,21 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(columnshard-normalizer-abstract) +target_link_libraries(columnshard-normalizer-abstract PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + ydb-core-tablet_flat + columnshard-blobs_action-abstract + tx-columnshard-resource_subscriber +) +target_sources(columnshard-normalizer-abstract PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/normalizer/abstract/abstract.cpp +) diff --git a/ydb/core/tx/columnshard/normalizer/abstract/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/normalizer/abstract/CMakeLists.linux-x86_64.txt new file mode 100644 index 0000000000..6da128405d --- /dev/null +++ b/ydb/core/tx/columnshard/normalizer/abstract/CMakeLists.linux-x86_64.txt @@ -0,0 +1,21 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(columnshard-normalizer-abstract) +target_link_libraries(columnshard-normalizer-abstract PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + ydb-core-tablet_flat + columnshard-blobs_action-abstract + tx-columnshard-resource_subscriber +) +target_sources(columnshard-normalizer-abstract PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/normalizer/abstract/abstract.cpp +) diff --git a/ydb/core/tx/columnshard/normalizer/abstract/CMakeLists.txt b/ydb/core/tx/columnshard/normalizer/abstract/CMakeLists.txt new file mode 100644 index 0000000000..f8b31df0c1 --- /dev/null +++ b/ydb/core/tx/columnshard/normalizer/abstract/CMakeLists.txt @@ -0,0 +1,17 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-aarch64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") + include(CMakeLists.darwin-x86_64.txt) +elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA) + include(CMakeLists.windows-x86_64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-x86_64.txt) +endif() diff --git a/ydb/core/tx/columnshard/normalizer/abstract/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/normalizer/abstract/CMakeLists.windows-x86_64.txt new file mode 100644 index 0000000000..4a11c1f061 --- /dev/null +++ b/ydb/core/tx/columnshard/normalizer/abstract/CMakeLists.windows-x86_64.txt @@ -0,0 +1,20 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(columnshard-normalizer-abstract) +target_link_libraries(columnshard-normalizer-abstract PUBLIC + contrib-libs-cxxsupp + yutil + ydb-core-tablet_flat + columnshard-blobs_action-abstract + tx-columnshard-resource_subscriber +) +target_sources(columnshard-normalizer-abstract PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/normalizer/abstract/abstract.cpp +) diff --git a/ydb/core/tx/columnshard/normalizer/abstract/abstract.cpp b/ydb/core/tx/columnshard/normalizer/abstract/abstract.cpp new file mode 100644 index 0000000000..6908736d95 --- /dev/null +++ b/ydb/core/tx/columnshard/normalizer/abstract/abstract.cpp @@ -0,0 +1 @@ +#include "abstract.h" diff --git a/ydb/core/tx/columnshard/normalizer/abstract/abstract.h b/ydb/core/tx/columnshard/normalizer/abstract/abstract.h new file mode 100644 index 0000000000..8e231a6ef1 --- /dev/null +++ b/ydb/core/tx/columnshard/normalizer/abstract/abstract.h @@ -0,0 +1,55 @@ +#pragma once + +#include <ydb/core/tablet_flat/tablet_flat_executor.h> +#include <ydb/library/accessor/accessor.h> + +#include <ydb/core/tx/columnshard/blobs_action/abstract/storages_manager.h> +#include <ydb/core/tx/columnshard/resource_subscriber/task.h> + +namespace NKikimr::NOlap { + + class TNormalizationContext { + YDB_READONLY_DEF(TActorId, ResourceSubscribeActor); + YDB_READONLY_DEF(TActorId, ColumnshardActor); + + const NOlap::NResourceBroker::NSubscribe::TTaskContext& InsertTaskSubscription; + std::shared_ptr<IStoragesManager> StoragesManager; + + public: + TNormalizationContext(const NOlap::NResourceBroker::NSubscribe::TTaskContext& its) + : InsertTaskSubscription(its) + {} + + IStoragesManager& GetStoragesManager() { + AFL_VERIFY(!!StoragesManager); + return *StoragesManager; + } + + const NOlap::NResourceBroker::NSubscribe::TTaskContext& GetInsertTaskSubscription() const { + return InsertTaskSubscription; + } + }; + + enum class ENormalizerResult { + Ok, + Wait, + Failed, + Skip + }; + + class INormalizerComponent { + public: + using TPtr = std::shared_ptr<INormalizerComponent>; + + virtual ~INormalizerComponent() {} + + virtual bool NormalizationRequired() const { + return false; + } + + virtual const TString& GetName() const = 0; + + virtual ENormalizerResult NormalizeData(TNormalizationContext& nCtx, NTabletFlatExecutor::TTransactionContext& txc) = 0; + }; + +} diff --git a/ydb/core/tx/columnshard/normalizer/abstract/ya.make b/ydb/core/tx/columnshard/normalizer/abstract/ya.make new file mode 100644 index 0000000000..48572e5fe0 --- /dev/null +++ b/ydb/core/tx/columnshard/normalizer/abstract/ya.make @@ -0,0 +1,13 @@ +LIBRARY() + +SRCS( + abstract.cpp +) + +PEERDIR( + ydb/core/tablet_flat + ydb/core/tx/columnshard/blobs_action/abstract + ydb/core/tx/columnshard/resource_subscriber +) + +END() diff --git a/ydb/core/tx/columnshard/normalizer/granule/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/normalizer/granule/CMakeLists.darwin-x86_64.txt new file mode 100644 index 0000000000..0b6a8a69b4 --- /dev/null +++ b/ydb/core/tx/columnshard/normalizer/granule/CMakeLists.darwin-x86_64.txt @@ -0,0 +1,18 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(columnshard-normalizer-granule) +target_link_libraries(columnshard-normalizer-granule PUBLIC + contrib-libs-cxxsupp + yutil + columnshard-normalizer-abstract +) +target_sources(columnshard-normalizer-granule PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/normalizer/granule/normalizer.cpp +) diff --git a/ydb/core/tx/columnshard/normalizer/granule/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/normalizer/granule/CMakeLists.linux-aarch64.txt new file mode 100644 index 0000000000..a2d66c12ed --- /dev/null +++ b/ydb/core/tx/columnshard/normalizer/granule/CMakeLists.linux-aarch64.txt @@ -0,0 +1,19 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(columnshard-normalizer-granule) +target_link_libraries(columnshard-normalizer-granule PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + columnshard-normalizer-abstract +) +target_sources(columnshard-normalizer-granule PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/normalizer/granule/normalizer.cpp +) diff --git a/ydb/core/tx/columnshard/normalizer/granule/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/normalizer/granule/CMakeLists.linux-x86_64.txt new file mode 100644 index 0000000000..a2d66c12ed --- /dev/null +++ b/ydb/core/tx/columnshard/normalizer/granule/CMakeLists.linux-x86_64.txt @@ -0,0 +1,19 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(columnshard-normalizer-granule) +target_link_libraries(columnshard-normalizer-granule PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + columnshard-normalizer-abstract +) +target_sources(columnshard-normalizer-granule PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/normalizer/granule/normalizer.cpp +) diff --git a/ydb/core/tx/columnshard/normalizer/granule/CMakeLists.txt b/ydb/core/tx/columnshard/normalizer/granule/CMakeLists.txt new file mode 100644 index 0000000000..f8b31df0c1 --- /dev/null +++ b/ydb/core/tx/columnshard/normalizer/granule/CMakeLists.txt @@ -0,0 +1,17 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-aarch64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") + include(CMakeLists.darwin-x86_64.txt) +elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA) + include(CMakeLists.windows-x86_64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-x86_64.txt) +endif() diff --git a/ydb/core/tx/columnshard/normalizer/granule/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/normalizer/granule/CMakeLists.windows-x86_64.txt new file mode 100644 index 0000000000..0b6a8a69b4 --- /dev/null +++ b/ydb/core/tx/columnshard/normalizer/granule/CMakeLists.windows-x86_64.txt @@ -0,0 +1,18 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(columnshard-normalizer-granule) +target_link_libraries(columnshard-normalizer-granule PUBLIC + contrib-libs-cxxsupp + yutil + columnshard-normalizer-abstract +) +target_sources(columnshard-normalizer-granule PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/normalizer/granule/normalizer.cpp +) diff --git a/ydb/core/tx/columnshard/normalizer/granule/normalizer.cpp b/ydb/core/tx/columnshard/normalizer/granule/normalizer.cpp new file mode 100644 index 0000000000..3832299efc --- /dev/null +++ b/ydb/core/tx/columnshard/normalizer/granule/normalizer.cpp @@ -0,0 +1 @@ +#include "normalizer.h" diff --git a/ydb/core/tx/columnshard/normalizer/granule/normalizer.h b/ydb/core/tx/columnshard/normalizer/granule/normalizer.h new file mode 100644 index 0000000000..7fa02c8ccb --- /dev/null +++ b/ydb/core/tx/columnshard/normalizer/granule/normalizer.h @@ -0,0 +1,114 @@ +#pragma once + +#include <ydb/core/tx/columnshard/normalizer/abstract/abstract.h> +#include <ydb/core/tx/columnshard/columnshard_schema.h> + + +namespace NKikimr::NOlap { + +class TGranulesNormalizer: public NOlap::INormalizerComponent { + struct TPortionKey { + ui64 Index = 0; + ui64 GranuleId = 0; + ui64 PlanStep = 0; + ui64 TxId = 0; + }; + + struct TUniqueId { + ui64 PortionId = 0; + ui32 Chunk = 0; + ui64 ColumnIdx = 0; + + bool operator<(const TUniqueId& other) const { + return std::make_tuple(PortionId, Chunk, ColumnIdx) < std::make_tuple(other.PortionId, other.Chunk, other.ColumnIdx); + } + }; + +public: + virtual bool NormalizationRequired() const override { + return true; + } + + virtual const TString& GetName() const override { + const static TString name = "TGranulesNormalizer"; + return name; + } + + virtual ENormalizerResult NormalizeData(TNormalizationContext& nCtx, NTabletFlatExecutor::TTransactionContext& txc) override { + Y_UNUSED(nCtx); + + using namespace NColumnShard; + NIceDb::TNiceDb db(txc.DB); + + bool ready = true; + ready = ready & Schema::Precharge<Schema::IndexColumns>(db, txc.DB.GetScheme()); + ready = ready & Schema::Precharge<Schema::IndexGranules>(db, txc.DB.GetScheme()); + if (!ready) { + return ENormalizerResult::Failed; + } + + TMap<TUniqueId, TPortionKey> portion2Key; + { + auto rowset = db.Table<Schema::IndexColumns>().Select(); + if (!rowset.IsReady()) { + return ENormalizerResult::Failed; + } + + while (!rowset.EndOfSet()) { + if (!rowset.HaveValue<Schema::IndexColumns::PathId>()) { + TUniqueId id; + TPortionKey key; + key.PlanStep = rowset.GetValue<Schema::IndexColumns::PlanStep>(); + key.TxId = rowset.GetValue<Schema::IndexColumns::TxId>(); + id.PortionId = rowset.GetValue<Schema::IndexColumns::Portion>(); + key.GranuleId = rowset.GetValue<Schema::IndexColumns::Granule>(); + id.Chunk = rowset.GetValue<Schema::IndexColumns::Chunk>(); + key.Index = rowset.GetValue<Schema::IndexColumns::Index>(); + id.ColumnIdx = rowset.GetValue<Schema::IndexColumns::ColumnIdx>(); + + portion2Key[id] = key; + } + + if (!rowset.Next()) { + return ENormalizerResult::Failed; + } + } + } + ACFL_INFO("normalizer", "TGranulesNormalizer")("message", TStringBuilder() << portion2Key.size() << " portions found"); + + if (portion2Key.empty()) { + return ENormalizerResult::Skip; + } + + THashMap<ui64, ui64> granule2Path; + { + auto rowset = db.Table<Schema::IndexGranules>().Select(); + if (!rowset.IsReady()) { + return ENormalizerResult::Failed; + } + + while (!rowset.EndOfSet()) { + ui64 pathId = rowset.GetValue<Schema::IndexGranules::PathId>(); + ui64 granuleId = rowset.GetValue<Schema::IndexGranules::Granule>(); + Y_ABORT_UNLESS(granuleId != 0); + granule2Path[granuleId] = pathId; + if (!rowset.Next()) { + return ENormalizerResult::Failed; + } + } + } + + for (auto&& [ portionId, key ] : portion2Key) { + auto granuleIt = granule2Path.find(key.GranuleId); + Y_ABORT_UNLESS(granuleIt != granule2Path.end()); + + db.Table<Schema::IndexColumns>().Key(key.Index, key.GranuleId, portionId.ColumnIdx, + key.PlanStep, key.TxId, portionId.PortionId, portionId.Chunk).Update( + NIceDb::TUpdate<Schema::IndexColumns::PathId>(granuleIt->second) + ); + } + return ENormalizerResult::Ok; + } +}; + +} diff --git a/ydb/core/tx/columnshard/normalizer/granule/ya.make b/ydb/core/tx/columnshard/normalizer/granule/ya.make new file mode 100644 index 0000000000..8a8c7e5ab0 --- /dev/null +++ b/ydb/core/tx/columnshard/normalizer/granule/ya.make @@ -0,0 +1,11 @@ +LIBRARY() + +SRCS( + normalizer.cpp +) + +PEERDIR( + ydb/core/tx/columnshard/normalizer/abstract +) + +END() diff --git a/ydb/core/tx/columnshard/ut_rw/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/ut_rw/CMakeLists.darwin-x86_64.txt index e51dec1b76..aec0c14a8a 100644 --- a/ydb/core/tx/columnshard/ut_rw/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/tx/columnshard/ut_rw/CMakeLists.darwin-x86_64.txt @@ -40,6 +40,7 @@ target_link_options(ydb-core-tx-columnshard-ut_rw PRIVATE target_sources(ydb-core-tx-columnshard-ut_rw PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_ut_common.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/ut_rw/ut_normalizer.cpp ) set_property( TARGET diff --git a/ydb/core/tx/columnshard/ut_rw/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/ut_rw/CMakeLists.linux-aarch64.txt index 2b43f87f55..8524515b05 100644 --- a/ydb/core/tx/columnshard/ut_rw/CMakeLists.linux-aarch64.txt +++ b/ydb/core/tx/columnshard/ut_rw/CMakeLists.linux-aarch64.txt @@ -43,6 +43,7 @@ target_link_options(ydb-core-tx-columnshard-ut_rw PRIVATE target_sources(ydb-core-tx-columnshard-ut_rw PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_ut_common.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/ut_rw/ut_normalizer.cpp ) set_property( TARGET diff --git a/ydb/core/tx/columnshard/ut_rw/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/ut_rw/CMakeLists.linux-x86_64.txt index 47af387c56..fb7f19246f 100644 --- a/ydb/core/tx/columnshard/ut_rw/CMakeLists.linux-x86_64.txt +++ b/ydb/core/tx/columnshard/ut_rw/CMakeLists.linux-x86_64.txt @@ -44,6 +44,7 @@ target_link_options(ydb-core-tx-columnshard-ut_rw PRIVATE target_sources(ydb-core-tx-columnshard-ut_rw PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_ut_common.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/ut_rw/ut_normalizer.cpp ) set_property( TARGET diff --git a/ydb/core/tx/columnshard/ut_rw/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/ut_rw/CMakeLists.windows-x86_64.txt index 6118745799..2ef0565f74 100644 --- a/ydb/core/tx/columnshard/ut_rw/CMakeLists.windows-x86_64.txt +++ b/ydb/core/tx/columnshard/ut_rw/CMakeLists.windows-x86_64.txt @@ -33,6 +33,7 @@ target_link_libraries(ydb-core-tx-columnshard-ut_rw PUBLIC target_sources(ydb-core-tx-columnshard-ut_rw PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_ut_common.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/ut_rw/ut_normalizer.cpp ) set_property( TARGET diff --git a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp index d952eb745e..1c2b0f39d8 100644 --- a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp +++ b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp @@ -326,28 +326,6 @@ bool CheckColumns(const TString& blob, const NKikimrTxColumnShard::TMetadata& me return CheckColumns(batch, colNames, rowsCount); } -struct TestTableDescription { - std::vector<std::pair<TString, TTypeInfo>> Schema = TTestSchema::YdbSchema(); - std::vector<std::pair<TString, TTypeInfo>> Pk = TTestSchema::YdbPkSchema(); - bool InStore = true; -}; - -void SetupSchema(TTestBasicRuntime& runtime, TActorId& sender, ui64 pathId, - const TestTableDescription& table = {}, TString codec = "none") { - NOlap::TSnapshot snap(10, 10); - TString txBody; - auto specials = TTestSchema::TTableSpecials().WithCodec(codec); - if (table.InStore) { - txBody = TTestSchema::CreateTableTxBody(pathId, table.Schema, table.Pk, specials); - } else { - txBody = TTestSchema::CreateStandaloneTableTxBody(pathId, table.Schema, table.Pk, specials); - } - bool ok = ProposeSchemaTx(runtime, sender, txBody, snap); - UNIT_ASSERT(ok); - - PlanSchemaTx(runtime, sender, snap); -} - std::vector<TString> ReadManyResults(TTestBasicRuntime& runtime, TString& schema, NKikimrTxColumnShard::TMetadata& meta, ui32 expected = 1000) { std::vector<TString> readData; @@ -1870,65 +1848,6 @@ void TestReadAggregate(const std::vector<std::pair<TString, TTypeInfo>>& ydbSche } Y_UNIT_TEST_SUITE(EvWrite) { - class TArrowData : public NKikimr::NEvents::IDataConstructor { - std::vector<std::pair<TString, TTypeInfo>> YdbSchema; - ui64 Index; - - public: - TArrowData(const std::vector<std::pair<TString, TTypeInfo>>& ydbSchema, const ui64 idx) - : YdbSchema(ydbSchema) - , Index(idx) - { - } - - void Serialize(NKikimrDataEvents::TOperationData& proto) const override { - for (ui32 i = 0; i < YdbSchema.size(); ++i) { - proto.AddColumnIds(i + 1); - } - proto.MutableArrowData()->SetPayloadIndex(Index); - } - - ui64 GetSchemaVersion() const override { - return 1; - } - }; - - void PrepareTablet(TTestBasicRuntime& runtime, const ui64 tableId, const std::vector<std::pair<TString, TTypeInfo>>& schema) { - CreateTestBootstrapper(runtime, CreateTestTabletInfo(TTestTxConfig::TxTablet0, TTabletTypes::ColumnShard), &CreateColumnShard); - - TDispatchOptions options; - options.FinalEvents.push_back(TDispatchOptions::TFinalEventCondition(TEvTablet::EvBoot)); - runtime.DispatchEvents(options); - - TestTableDescription tableDescription; - tableDescription.Schema = schema; - tableDescription.Pk = { schema[0] }; - TActorId sender = runtime.AllocateEdgeActor(); - SetupSchema(runtime, sender, tableId, tableDescription); - } - - std::shared_ptr<arrow::RecordBatch> ReadAllAsBatch(TTestBasicRuntime& runtime, const ui64 tableId, const NOlap::TSnapshot& snapshot, const std::vector<std::pair<TString, TTypeInfo>>& schema) { - TActorId sender = runtime.AllocateEdgeActor(); - - ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, - new TEvColumnShard::TEvRead(sender, TTestTxConfig::TxTablet1, snapshot.GetPlanStep(), snapshot.GetTxId(), tableId)); - - std::vector<std::shared_ptr<arrow::RecordBatch>> batches; - while(true) { - TAutoPtr<IEventHandle> handle; - auto event = runtime.GrabEdgeEvent<TEvColumnShard::TEvReadResult>(handle); - UNIT_ASSERT(event); - auto b = event->GetArrowBatch(); - if (b) { - batches.push_back(b); - } - if (!event->HasMore()) { - break; - } - } - auto res = NArrow::CombineBatches(batches); - return res ? res : NArrow::MakeEmptyBatch(NArrow::MakeArrowSchema(schema)); - } Y_UNIT_TEST(WriteInTransaction) { using namespace NArrow; @@ -1953,7 +1872,7 @@ Y_UNIT_TEST_SUITE(EvWrite) { UNIT_ASSERT(blobData.size() < TLimits::GetMaxBlobSize()); auto evWrite = std::make_unique<NKikimr::NEvents::TDataEvents::TEvWrite>(txId); - auto dataPtr = std::make_shared<TArrowData>(schema, TPayloadHelper<NKikimr::NEvents::TDataEvents::TEvWrite>(*evWrite).AddDataToPayload(std::move(blobData))); + auto dataPtr = std::make_shared<TArrowDataConstructor>(schema, TPayloadHelper<NKikimr::NEvents::TDataEvents::TEvWrite>(*evWrite).AddDataToPayload(std::move(blobData))); evWrite->AddReplaceOp(tableId, dataPtr); TActorId sender = runtime.AllocateEdgeActor(); @@ -1975,7 +1894,7 @@ Y_UNIT_TEST_SUITE(EvWrite) { UNIT_ASSERT_VALUES_EQUAL(readResult->num_rows(), 2048); } - Y_UNIT_TEST(AbotrInTransaction) { + Y_UNIT_TEST(AbortInTransaction) { using namespace NArrow; TTestBasicRuntime runtime; @@ -1998,7 +1917,7 @@ Y_UNIT_TEST_SUITE(EvWrite) { UNIT_ASSERT(blobData.size() < TLimits::GetMaxBlobSize()); auto evWrite = std::make_unique<NKikimr::NEvents::TDataEvents::TEvWrite>(txId); - auto dataPtr = std::make_shared<TArrowData>(schema, TPayloadHelper<NKikimr::NEvents::TDataEvents::TEvWrite>(*evWrite).AddDataToPayload(std::move(blobData))); + auto dataPtr = std::make_shared<TArrowDataConstructor>(schema, TPayloadHelper<NKikimr::NEvents::TDataEvents::TEvWrite>(*evWrite).AddDataToPayload(std::move(blobData))); evWrite->AddReplaceOp(tableId, dataPtr); TActorId sender = runtime.AllocateEdgeActor(); @@ -2042,7 +1961,7 @@ Y_UNIT_TEST_SUITE(EvWrite) { UNIT_ASSERT(blobData.size() > TLimits::GetMaxBlobSize()); auto evWrite = std::make_unique<NKikimr::NEvents::TDataEvents::TEvWrite>(txId); - auto dataPtr = std::make_shared<TArrowData>(schema, TPayloadHelper<NKikimr::NEvents::TDataEvents::TEvWrite>(*evWrite).AddDataToPayload(std::move(blobData))); + auto dataPtr = std::make_shared<TArrowDataConstructor>(schema, TPayloadHelper<NKikimr::NEvents::TDataEvents::TEvWrite>(*evWrite).AddDataToPayload(std::move(blobData))); evWrite->AddReplaceOp(tableId, dataPtr); TActorId sender = runtime.AllocateEdgeActor(); diff --git a/ydb/core/tx/columnshard/ut_rw/ut_normalizer.cpp b/ydb/core/tx/columnshard/ut_rw/ut_normalizer.cpp new file mode 100644 index 0000000000..45bf17c477 --- /dev/null +++ b/ydb/core/tx/columnshard/ut_rw/ut_normalizer.cpp @@ -0,0 +1,165 @@ +#include "columnshard_ut_common.h" + +#include <ydb/core/tx/columnshard/hooks/abstract/abstract.h> +#include <ydb/core/tx/columnshard/hooks/testing/controller.h> + +#include <ydb/core/tx/columnshard/operations/write_data.h> + +#include <ydb/core/formats/arrow/simple_builder/filler.h> +#include <ydb/core/formats/arrow/simple_builder/array.h> +#include <ydb/core/formats/arrow/simple_builder/batch.h> + + +namespace NKikimr { + +using namespace NColumnShard; +using namespace Tests; +using namespace NTxUT; + + +class TPrepareLocalDBController: public NKikimr::NYDBTest::NColumnShard::TController { +private: + using TBase = NKikimr::NYDBTest::ICSController; + + struct TPortionRecord { + ui64 Index = 0; + ui64 Granule = 0; + ui64 ColumnIdx = 0; + ui64 PlanStep = 0; + ui64 TxId = 0; + ui64 Portion = 0; + ui32 Chunk = 0; + ui64 XPlanStep = 0; + ui64 XTxId = 0; + TString Blob; + TString Metadata; + ui32 Offset = 0; + ui32 Size = 0; + }; + +protected: + + class TPathIdCleaner : public NYDBTest::ILocalDBModifier { + public: + virtual void Apply(NTabletFlatExecutor::TTransactionContext& txc) const override { + using namespace NColumnShard; + NIceDb::TNiceDb db(txc.DB); + + THashMap<ui64, TPortionRecord> portion2Key; + std::optional<ui64> pathId; + { + auto rowset = db.Table<Schema::IndexColumns>().Select(); + UNIT_ASSERT(rowset.IsReady()); + + while (!rowset.EndOfSet()) { + TPortionRecord key; + key.Index = rowset.GetValue<Schema::IndexColumns::Index>(); + key.Granule = rowset.GetValue<Schema::IndexColumns::Granule>(); + key.ColumnIdx = rowset.GetValue<Schema::IndexColumns::ColumnIdx>(); + key.PlanStep = rowset.GetValue<Schema::IndexColumns::PlanStep>(); + key.TxId = rowset.GetValue<Schema::IndexColumns::TxId>(); + key.Portion = rowset.GetValue<Schema::IndexColumns::Portion>(); + key.Chunk = rowset.GetValue<Schema::IndexColumns::Chunk>(); + + key.XPlanStep = rowset.GetValue<Schema::IndexColumns::XPlanStep>(); + key.XTxId = rowset.GetValue<Schema::IndexColumns::XTxId>(); + key.Blob = rowset.GetValue<Schema::IndexColumns::Blob>(); + key.Metadata = rowset.GetValue<Schema::IndexColumns::Metadata>(); + key.Offset = rowset.GetValue<Schema::IndexColumns::Offset>(); + key.Size = rowset.GetValue<Schema::IndexColumns::Size>(); + + pathId = rowset.GetValue<Schema::IndexColumns::PathId>(); + + portion2Key[key.Portion] = key; + + UNIT_ASSERT(rowset.Next()); + } + } + + UNIT_ASSERT(pathId.has_value()); + + for (auto&& [ portionId, key ] : portion2Key) { + db.Table<Schema::IndexColumns>().Key(key.Index, key.Granule, key.ColumnIdx, + key.PlanStep, key.TxId, key.Portion, key.Chunk).Delete(); + + db.Table<Schema::IndexColumns>().Key(key.Index, 1, key.ColumnIdx, + key.PlanStep, key.TxId, key.Portion, key.Chunk).Update( + NIceDb::TUpdate<Schema::IndexColumns::XPlanStep>(key.XPlanStep), + NIceDb::TUpdate<Schema::IndexColumns::XTxId>(key.XTxId), + NIceDb::TUpdate<Schema::IndexColumns::Blob>(key.Blob), + NIceDb::TUpdate<Schema::IndexColumns::Metadata>(key.Metadata), + NIceDb::TUpdate<Schema::IndexColumns::Offset>(key.Offset), + NIceDb::TUpdate<Schema::IndexColumns::Size>(key.Size), + + NIceDb::TNull<Schema::IndexColumns::PathId>() + ); + } + + db.Table<Schema::IndexGranules>().Key(0, *pathId, "1").Update( + NIceDb::TUpdate<Schema::IndexGranules::Granule>(1), + NIceDb::TUpdate<Schema::IndexGranules::PlanStep>(1), + NIceDb::TUpdate<Schema::IndexGranules::TxId>(1), + NIceDb::TUpdate<Schema::IndexGranules::Metadata>("") + ); + } + }; + +public: + NYDBTest::ILocalDBModifier::TPtr BuildLocalBaseModifier() const override { + return std::make_shared<TPathIdCleaner>(); + } +}; + +Y_UNIT_TEST_SUITE(Normalizers) { + + Y_UNIT_TEST(PathIdNormalizer) { + using namespace NArrow; + auto csControllerGuard = NYDBTest::TControllers::RegisterCSControllerGuard<TPrepareLocalDBController>(); + + TTestBasicRuntime runtime; + TTester::Setup(runtime); + + const ui64 tableId = 1; + const std::vector<std::pair<TString, TTypeInfo>> schema = { + {"key", TTypeInfo(NTypeIds::Uint64) }, + {"field", TTypeInfo(NTypeIds::Utf8) } + }; + PrepareTablet(runtime, tableId, schema); + const ui64 txId = 111; + + NConstruction::IArrayBuilder::TPtr keyColumn = std::make_shared<NConstruction::TSimpleArrayConstructor<NConstruction::TIntSeqFiller<arrow::UInt64Type>>>("key"); + NConstruction::IArrayBuilder::TPtr column = std::make_shared<NConstruction::TSimpleArrayConstructor<NConstruction::TStringPoolFiller>>( + "field", NConstruction::TStringPoolFiller(8, 100)); + + auto batch = NConstruction::TRecordBatchConstructor({ keyColumn, column }).BuildBatch(2048); + TString blobData = NArrow::SerializeBatchNoCompression(batch); + + auto evWrite = std::make_unique<NKikimr::NEvents::TDataEvents::TEvWrite>(txId); + auto dataPtr = std::make_shared<TArrowDataConstructor>(schema, TPayloadHelper<NKikimr::NEvents::TDataEvents::TEvWrite>(*evWrite).AddDataToPayload(std::move(blobData))); + evWrite->AddReplaceOp(tableId, dataPtr); + + TActorId sender = runtime.AllocateEdgeActor(); + ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, evWrite.release()); + { + TAutoPtr<NActors::IEventHandle> handle; + auto event = runtime.GrabEdgeEvent<NKikimr::NEvents::TDataEvents::TEvWriteResult>(handle); + UNIT_ASSERT(event); + UNIT_ASSERT_VALUES_EQUAL((ui64)event->Record.GetStatus(), (ui64)NKikimrDataEvents::TEvWriteResult::PREPARED); + + PlanWriteTx(runtime, sender, NOlap::TSnapshot(11, txId)); + } + + { + auto readResult = ReadAllAsBatch(runtime, tableId, NOlap::TSnapshot(11, txId), schema); + UNIT_ASSERT_VALUES_EQUAL(readResult->num_rows(), 2048); + } + RebootTablet(runtime, TTestTxConfig::TxTablet0, sender); + + { + auto readResult = ReadAllAsBatch(runtime, tableId, NOlap::TSnapshot(11, txId), schema); + UNIT_ASSERT_VALUES_EQUAL(readResult->num_rows(), 2048); + } + } +} + +} // namespace NKikimr diff --git a/ydb/core/tx/columnshard/ut_rw/ya.make b/ydb/core/tx/columnshard/ut_rw/ya.make index 2819d6f667..63c9dbed1d 100644 --- a/ydb/core/tx/columnshard/ut_rw/ya.make +++ b/ydb/core/tx/columnshard/ut_rw/ya.make @@ -31,6 +31,7 @@ YQL_LAST_ABI_VERSION() SRCS( columnshard_ut_common.cpp ut_columnshard_read_write.cpp + ut_normalizer.cpp ) END() diff --git a/ydb/core/tx/columnshard/ya.make b/ydb/core/tx/columnshard/ya.make index 87a239982e..66b50a38e1 100644 --- a/ydb/core/tx/columnshard/ya.make +++ b/ydb/core/tx/columnshard/ya.make @@ -53,6 +53,7 @@ PEERDIR( ydb/core/tx/columnshard/blobs_reader ydb/core/tx/columnshard/blobs_action ydb/core/tx/columnshard/resource_subscriber + ydb/core/tx/columnshard/normalizer/granule ydb/core/tx/tiering ydb/core/tx/conveyor/usage ydb/core/tx/long_tx_service/public |