aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authornsofya <nsofya@ydb.tech>2023-10-25 16:26:49 +0300
committernsofya <nsofya@ydb.tech>2023-10-25 16:47:50 +0300
commit5e9517e09bd2bb7fd6a0daf598c081125a85e98d (patch)
tree6ced059184e4fd254e661f8b764aadc4cd756454
parent509f4bccedd15798058fffbba552dfe3c76c4899 (diff)
downloadydb-5e9517e09bd2bb7fd6a0daf598c081125a85e98d.tar.gz
KIKIMR-19807: PathId normalization
-rw-r--r--.mapping.json11
-rw-r--r--ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt2
-rw-r--r--ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt2
-rw-r--r--ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt2
-rw-r--r--ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt2
-rw-r--r--ydb/core/tx/columnshard/blobs_reader/task.cpp2
-rw-r--r--ydb/core/tx/columnshard/columnshard.cpp12
-rw-r--r--ydb/core/tx/columnshard/columnshard__init.cpp63
-rw-r--r--ydb/core/tx/columnshard/columnshard_impl.cpp7
-rw-r--r--ydb/core/tx/columnshard/columnshard_impl.h3
-rw-r--r--ydb/core/tx/columnshard/columnshard_ut_common.cpp56
-rw-r--r--ydb/core/tx/columnshard/columnshard_ut_common.h37
-rw-r--r--ydb/core/tx/columnshard/hooks/abstract/abstract.h16
-rw-r--r--ydb/core/tx/columnshard/normalizer/CMakeLists.txt10
-rw-r--r--ydb/core/tx/columnshard/normalizer/abstract/CMakeLists.darwin-x86_64.txt20
-rw-r--r--ydb/core/tx/columnshard/normalizer/abstract/CMakeLists.linux-aarch64.txt21
-rw-r--r--ydb/core/tx/columnshard/normalizer/abstract/CMakeLists.linux-x86_64.txt21
-rw-r--r--ydb/core/tx/columnshard/normalizer/abstract/CMakeLists.txt17
-rw-r--r--ydb/core/tx/columnshard/normalizer/abstract/CMakeLists.windows-x86_64.txt20
-rw-r--r--ydb/core/tx/columnshard/normalizer/abstract/abstract.cpp1
-rw-r--r--ydb/core/tx/columnshard/normalizer/abstract/abstract.h55
-rw-r--r--ydb/core/tx/columnshard/normalizer/abstract/ya.make13
-rw-r--r--ydb/core/tx/columnshard/normalizer/granule/CMakeLists.darwin-x86_64.txt18
-rw-r--r--ydb/core/tx/columnshard/normalizer/granule/CMakeLists.linux-aarch64.txt19
-rw-r--r--ydb/core/tx/columnshard/normalizer/granule/CMakeLists.linux-x86_64.txt19
-rw-r--r--ydb/core/tx/columnshard/normalizer/granule/CMakeLists.txt17
-rw-r--r--ydb/core/tx/columnshard/normalizer/granule/CMakeLists.windows-x86_64.txt18
-rw-r--r--ydb/core/tx/columnshard/normalizer/granule/normalizer.cpp1
-rw-r--r--ydb/core/tx/columnshard/normalizer/granule/normalizer.h114
-rw-r--r--ydb/core/tx/columnshard/normalizer/granule/ya.make11
-rw-r--r--ydb/core/tx/columnshard/ut_rw/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/core/tx/columnshard/ut_rw/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/tx/columnshard/ut_rw/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/core/tx/columnshard/ut_rw/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp89
-rw-r--r--ydb/core/tx/columnshard/ut_rw/ut_normalizer.cpp165
-rw-r--r--ydb/core/tx/columnshard/ut_rw/ya.make1
-rw-r--r--ydb/core/tx/columnshard/ya.make1
38 files changed, 758 insertions, 112 deletions
diff --git a/.mapping.json b/.mapping.json
index 31e35d3301..b61d37e62f 100644
--- a/.mapping.json
+++ b/.mapping.json
@@ -5445,6 +5445,17 @@
"ydb/core/tx/columnshard/hooks/testing/CMakeLists.linux-x86_64.txt":"",
"ydb/core/tx/columnshard/hooks/testing/CMakeLists.txt":"",
"ydb/core/tx/columnshard/hooks/testing/CMakeLists.windows-x86_64.txt":"",
+ "ydb/core/tx/columnshard/normalizer/CMakeLists.txt":"",
+ "ydb/core/tx/columnshard/normalizer/abstract/CMakeLists.darwin-x86_64.txt":"",
+ "ydb/core/tx/columnshard/normalizer/abstract/CMakeLists.linux-aarch64.txt":"",
+ "ydb/core/tx/columnshard/normalizer/abstract/CMakeLists.linux-x86_64.txt":"",
+ "ydb/core/tx/columnshard/normalizer/abstract/CMakeLists.txt":"",
+ "ydb/core/tx/columnshard/normalizer/abstract/CMakeLists.windows-x86_64.txt":"",
+ "ydb/core/tx/columnshard/normalizer/granule/CMakeLists.darwin-x86_64.txt":"",
+ "ydb/core/tx/columnshard/normalizer/granule/CMakeLists.linux-aarch64.txt":"",
+ "ydb/core/tx/columnshard/normalizer/granule/CMakeLists.linux-x86_64.txt":"",
+ "ydb/core/tx/columnshard/normalizer/granule/CMakeLists.txt":"",
+ "ydb/core/tx/columnshard/normalizer/granule/CMakeLists.windows-x86_64.txt":"",
"ydb/core/tx/columnshard/operations/CMakeLists.darwin-x86_64.txt":"",
"ydb/core/tx/columnshard/operations/CMakeLists.linux-aarch64.txt":"",
"ydb/core/tx/columnshard/operations/CMakeLists.linux-x86_64.txt":"",
diff --git a/ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt
index 9f1deb1cd4..98156163d9 100644
--- a/ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt
@@ -12,6 +12,7 @@ add_subdirectory(common)
add_subdirectory(counters)
add_subdirectory(engines)
add_subdirectory(hooks)
+add_subdirectory(normalizer)
add_subdirectory(operations)
add_subdirectory(resource_subscriber)
add_subdirectory(resources)
@@ -57,6 +58,7 @@ target_link_libraries(core-tx-columnshard PUBLIC
tx-columnshard-blobs_reader
tx-columnshard-blobs_action
tx-columnshard-resource_subscriber
+ columnshard-normalizer-granule
core-tx-tiering
tx-conveyor-usage
tx-long_tx_service-public
diff --git a/ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt
index 26c0b204e9..e10d15fc23 100644
--- a/ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt
@@ -12,6 +12,7 @@ add_subdirectory(common)
add_subdirectory(counters)
add_subdirectory(engines)
add_subdirectory(hooks)
+add_subdirectory(normalizer)
add_subdirectory(operations)
add_subdirectory(resource_subscriber)
add_subdirectory(resources)
@@ -58,6 +59,7 @@ target_link_libraries(core-tx-columnshard PUBLIC
tx-columnshard-blobs_reader
tx-columnshard-blobs_action
tx-columnshard-resource_subscriber
+ columnshard-normalizer-granule
core-tx-tiering
tx-conveyor-usage
tx-long_tx_service-public
diff --git a/ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt
index 26c0b204e9..e10d15fc23 100644
--- a/ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt
@@ -12,6 +12,7 @@ add_subdirectory(common)
add_subdirectory(counters)
add_subdirectory(engines)
add_subdirectory(hooks)
+add_subdirectory(normalizer)
add_subdirectory(operations)
add_subdirectory(resource_subscriber)
add_subdirectory(resources)
@@ -58,6 +59,7 @@ target_link_libraries(core-tx-columnshard PUBLIC
tx-columnshard-blobs_reader
tx-columnshard-blobs_action
tx-columnshard-resource_subscriber
+ columnshard-normalizer-granule
core-tx-tiering
tx-conveyor-usage
tx-long_tx_service-public
diff --git a/ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt
index 8c97dad14a..47fb7a3a91 100644
--- a/ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt
@@ -12,6 +12,7 @@ add_subdirectory(common)
add_subdirectory(counters)
add_subdirectory(engines)
add_subdirectory(hooks)
+add_subdirectory(normalizer)
add_subdirectory(operations)
add_subdirectory(resource_subscriber)
add_subdirectory(resources)
@@ -58,6 +59,7 @@ target_link_libraries(core-tx-columnshard PUBLIC
tx-columnshard-blobs_reader
tx-columnshard-blobs_action
tx-columnshard-resource_subscriber
+ columnshard-normalizer-granule
core-tx-tiering
tx-conveyor-usage
tx-long_tx_service-public
diff --git a/ydb/core/tx/columnshard/blobs_reader/task.cpp b/ydb/core/tx/columnshard/blobs_reader/task.cpp
index bbe9b80824..cfe95e0314 100644
--- a/ydb/core/tx/columnshard/blobs_reader/task.cpp
+++ b/ydb/core/tx/columnshard/blobs_reader/task.cpp
@@ -122,7 +122,7 @@ void ITask::OnDataReady() {
}
bool ITask::OnError(const TBlobRange& range, const IBlobsReadingAction::TErrorStatus& status) {
- ACFL_DEBUG("event", "OnError")("task", DebugString());
+ ACFL_DEBUG("event", "OnError")("status", status.GetStatus())("task", DebugString());
return DoOnError(range, status);
}
diff --git a/ydb/core/tx/columnshard/columnshard.cpp b/ydb/core/tx/columnshard/columnshard.cpp
index 365d8e6d02..14a6d3f2dc 100644
--- a/ydb/core/tx/columnshard/columnshard.cpp
+++ b/ydb/core/tx/columnshard/columnshard.cpp
@@ -13,8 +13,7 @@ IActor* CreateColumnShard(const TActorId& tablet, TTabletStorageInfo* info) {
namespace NKikimr::NColumnShard {
-void TColumnShard::CleanupActors(const TActorContext& ctx)
-{
+void TColumnShard::CleanupActors(const TActorContext& ctx) {
ctx.Send(ResourceSubscribeActor, new TEvents::TEvPoisonPill);
StoragesManager->Stop();
if (Tiers) {
@@ -22,8 +21,7 @@ void TColumnShard::CleanupActors(const TActorContext& ctx)
}
}
-void TColumnShard::BecomeBroken(const TActorContext& ctx)
-{
+void TColumnShard::BecomeBroken(const TActorContext& ctx) {
Become(&TThis::StateBroken);
ctx.Send(Tablet(), new TEvents::TEvPoisonPill);
CleanupActors(ctx);
@@ -41,6 +39,12 @@ void TColumnShard::SwitchToWork(const TActorContext& ctx) {
Become(&TThis::StateWork);
SignalTabletActive(ctx);
+ AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "initialize_shard")("step", "SignalTabletActive");
+ TryRegisterMediatorTimeCast();
+ // Trigger progress: planned or outdated tx
+ EnqueueProgressTx(ctx);
+ EnqueueBackgroundActivities();
+ ctx.Schedule(ActivationPeriod, new TEvPrivate::TEvPeriodicWakeup());
}
void TColumnShard::OnActivateExecutor(const TActorContext& ctx) {
diff --git a/ydb/core/tx/columnshard/columnshard__init.cpp b/ydb/core/tx/columnshard/columnshard__init.cpp
index d7fc92abe3..649304e61c 100644
--- a/ydb/core/tx/columnshard/columnshard__init.cpp
+++ b/ydb/core/tx/columnshard/columnshard__init.cpp
@@ -2,6 +2,7 @@
#include "columnshard_ttl.h"
#include "columnshard_private_events.h"
#include "columnshard_schema.h"
+#include "hooks/abstract/abstract.h"
#include <ydb/core/tx/columnshard/blobs_action/blob_manager_db.h>
#include <ydb/core/tablet/tablet_exception.h>
@@ -11,9 +12,7 @@ namespace NKikimr::NColumnShard {
using namespace NTabletFlatExecutor;
-// TTxInit => SwitchToWork
-/// Load data from local database
class TTxInit : public TTransactionBase<TColumnShard> {
public:
TTxInit(TColumnShard* self)
@@ -204,21 +203,9 @@ bool TTxInit::Execute(TTransactionContext& txc, const TActorContext& ctx) {
}
void TTxInit::Complete(const TActorContext& ctx) {
- LOG_S_DEBUG("TTxInit.Complete at tablet " << Self->TabletID());
Self->SwitchToWork(ctx);
- Self->TryRegisterMediatorTimeCast();
-
- // Trigger progress: planned or outdated tx
- Self->EnqueueProgressTx(ctx);
- Self->EnqueueBackgroundActivities();
-
- // Start periodic wakeups
- ctx.Schedule(Self->ActivationPeriod, new TEvPrivate::TEvPeriodicWakeup());
}
-// TTxUpdateSchema => TTxInit
-
-/// Update local database on tablet start
class TTxUpdateSchema : public TTransactionBase<TColumnShard> {
public:
TTxUpdateSchema(TColumnShard* self)
@@ -228,21 +215,48 @@ public:
bool Execute(TTransactionContext& txc, const TActorContext& ctx) override;
void Complete(const TActorContext& ctx) override;
TTxType GetTxType() const override { return TXTYPE_UPDATE_SCHEMA; }
+
+private:
+ bool WaitNormalization = false;
};
bool TTxUpdateSchema::Execute(TTransactionContext& txc, const TActorContext&) {
- Y_UNUSED(txc);
- LOG_S_DEBUG("TTxUpdateSchema.Execute at tablet " << Self->TabletID());
+ NActors::TLogContextGuard gLogging = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID())("event", "initialize_shard");
+ ACFL_INFO("step", "TTxUpdateSchema.Execute_Start");
+
+ for (auto&& normalizer : Self->Normalizers) {
+ if (!normalizer->NormalizationRequired()) {
+ ACFL_INFO("step", "TTxUpdateSchema.Execute")("skip_normalizer", normalizer->GetName());
+ continue;
+ }
+
+ ACFL_INFO("step", "TTxUpdateSchema.Execute")("start_normalizer", normalizer->GetName());
+ NOlap::TNormalizationContext nCtx(Self->InsertTaskSubscription);
+ auto status = normalizer->NormalizeData(nCtx, txc);
+
+ if (status == NOlap::ENormalizerResult::Failed) {
+ ACFL_INFO("step", "TTxUpdateSchema.Execute")("fail_normalizer", normalizer->GetName());
+ return false;
+ } else if (status == NOlap::ENormalizerResult::Wait) {
+ ACFL_INFO("step", "TTxUpdateSchema.Execute")("start_normalizer", normalizer->GetName());
+ WaitNormalization = true;
+ } else if (status == NOlap::ENormalizerResult::Skip) {
+ ACFL_INFO("step", "TTxUpdateSchema.Execute")("pass_normalizer", normalizer->GetName());
+ } else {
+ ACFL_INFO("step", "TTxUpdateSchema.Execute")("finish_normalizer", normalizer->GetName());
+ }
+ }
+ ACFL_INFO("step", "TTxUpdateSchema.Execute_Finish");
return true;
}
void TTxUpdateSchema::Complete(const TActorContext& ctx) {
- LOG_S_DEBUG("TTxUpdateSchema.Complete at tablet " << Self->TabletID());
- Self->Execute(new TTxInit(Self), ctx);
+ ACFL_INFO("step", "TTxUpdateSchema.Complete");
+ if (!WaitNormalization) {
+ Self->Execute(new TTxInit(Self), ctx);
+ }
}
-// TTxInitSchema => TTxUpdateSchema
-
/// Create local database on tablet start if none
class TTxInitSchema : public TTransactionBase<TColumnShard> {
public:
@@ -258,13 +272,18 @@ public:
bool TTxInitSchema::Execute(TTransactionContext& txc, const TActorContext&) {
LOG_S_DEBUG("TxInitSchema.Execute at tablet " << Self->TabletID());
- bool isCreate = txc.DB.GetScheme().IsEmpty();
+ const bool isFirstRun = txc.DB.GetScheme().IsEmpty();
NIceDb::TNiceDb(txc.DB).Materialize<Schema>();
- if (isCreate) {
+ if (isFirstRun) {
txc.DB.Alter().SetExecutorAllowLogBatching(gAllowLogBatchingDefaultValue);
txc.DB.Alter().SetExecutorLogFlushPeriod(TDuration::MicroSeconds(500));
txc.DB.Alter().SetExecutorCacheSize(500000);
+ } else {
+ auto localBaseModifier = NYDBTest::TControllers::GetColumnShardController()->BuildLocalBaseModifier();
+ if (localBaseModifier) {
+ localBaseModifier->Apply(txc);
+ }
}
// Enable compression for the SmallBlobs table
diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp
index 3173bebe08..86908da2fb 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.cpp
+++ b/ydb/core/tx/columnshard/columnshard_impl.cpp
@@ -25,6 +25,9 @@
#include "resource_subscriber/counters.h"
#include "blobs_reader/actor.h"
+#include <ydb/core/tx/columnshard/normalizer/granule/normalizer.h>
+// #include <ydb/core/tx/columnshard/normalizer/portion/normalizer.h>
+
namespace NKikimr::NColumnShard {
// NOTE: We really want to batch log records by default in columnshards!
@@ -174,6 +177,10 @@ TColumnShard::TColumnShard(TTabletStorageInfo* info, const TActorId& tablet)
ETxTypes_descriptor
>());
TabletCounters = TabletCountersPtr.get();
+
+ Normalizers.emplace_back(std::make_shared<NOlap::TGranulesNormalizer>());
+ // Normalizers.emplace_back(std::make_shared<NOlap::TPortionsNormalizer>(Info()));
+
}
void TColumnShard::OnDetach(const TActorContext& ctx) {
diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h
index c683b0a5c2..40bb27584c 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.h
+++ b/ydb/core/tx/columnshard/columnshard_impl.h
@@ -13,6 +13,7 @@
#include "counters/columnshard.h"
#include "resource_subscriber/counters.h"
#include "resource_subscriber/task.h"
+#include "normalizer/abstract/abstract.h"
#include <ydb/core/base/tablet_pipecache.h>
#include <ydb/core/tablet/tablet_counters.h>
@@ -409,6 +410,8 @@ private:
const TIndexationCounters IndexationCounters = TIndexationCounters("Indexation");
const TIndexationCounters EvictionCounters = TIndexationCounters("Eviction");
+ std::vector<NOlap::INormalizerComponent::TPtr> Normalizers;
+
const TCSCounters CSCounters;
TWritesMonitor WritesMonitor;
diff --git a/ydb/core/tx/columnshard/columnshard_ut_common.cpp b/ydb/core/tx/columnshard/columnshard_ut_common.cpp
index 53e6aedbfe..c90860471b 100644
--- a/ydb/core/tx/columnshard/columnshard_ut_common.cpp
+++ b/ydb/core/tx/columnshard/columnshard_ut_common.cpp
@@ -411,4 +411,60 @@ namespace NKikimr::NColumnShard {
indexInfo.SetAllKeys();
return indexInfo;
}
+
+ void SetupSchema(TTestBasicRuntime& runtime, TActorId& sender, ui64 pathId,
+ const TestTableDescription& table, TString codec) {
+ using namespace NTxUT;
+ NOlap::TSnapshot snap(10, 10);
+ TString txBody;
+ auto specials = TTestSchema::TTableSpecials().WithCodec(codec);
+ if (table.InStore) {
+ txBody = TTestSchema::CreateTableTxBody(pathId, table.Schema, table.Pk, specials);
+ } else {
+ txBody = TTestSchema::CreateStandaloneTableTxBody(pathId, table.Schema, table.Pk, specials);
+ }
+ bool ok = ProposeSchemaTx(runtime, sender, txBody, snap);
+ UNIT_ASSERT(ok);
+
+ PlanSchemaTx(runtime, sender, snap);
+ }
+
+ void PrepareTablet(TTestBasicRuntime& runtime, const ui64 tableId, const std::vector<std::pair<TString, NScheme::TTypeInfo>>& schema) {
+ using namespace NTxUT;
+ CreateTestBootstrapper(runtime, CreateTestTabletInfo(TTestTxConfig::TxTablet0, TTabletTypes::ColumnShard), &CreateColumnShard);
+
+ TDispatchOptions options;
+ options.FinalEvents.push_back(TDispatchOptions::TFinalEventCondition(TEvTablet::EvBoot));
+ runtime.DispatchEvents(options);
+
+ TestTableDescription tableDescription;
+ tableDescription.Schema = schema;
+ tableDescription.Pk = { schema[0] };
+ TActorId sender = runtime.AllocateEdgeActor();
+ SetupSchema(runtime, sender, tableId, tableDescription);
+ }
+
+ std::shared_ptr<arrow::RecordBatch> ReadAllAsBatch(TTestBasicRuntime& runtime, const ui64 tableId, const NOlap::TSnapshot& snapshot, const std::vector<std::pair<TString, NScheme::TTypeInfo>>& schema) {
+ using namespace NTxUT;
+ TActorId sender = runtime.AllocateEdgeActor();
+
+ ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender,
+ new TEvColumnShard::TEvRead(sender, TTestTxConfig::TxTablet1, snapshot.GetPlanStep(), snapshot.GetTxId(), tableId));
+
+ std::vector<std::shared_ptr<arrow::RecordBatch>> batches;
+ while(true) {
+ TAutoPtr<IEventHandle> handle;
+ auto event = runtime.GrabEdgeEvent<TEvColumnShard::TEvReadResult>(handle);
+ UNIT_ASSERT(event);
+ auto b = event->GetArrowBatch();
+ if (b) {
+ batches.push_back(b);
+ }
+ if (!event->HasMore()) {
+ break;
+ }
+ }
+ auto res = NArrow::CombineBatches(batches);
+ return res ? res : NArrow::MakeEmptyBatch(NArrow::MakeArrowSchema(schema));
+ }
}
diff --git a/ydb/core/tx/columnshard/columnshard_ut_common.h b/ydb/core/tx/columnshard/columnshard_ut_common.h
index 344933f78b..517ac70c1d 100644
--- a/ydb/core/tx/columnshard/columnshard_ut_common.h
+++ b/ydb/core/tx/columnshard/columnshard_ut_common.h
@@ -527,4 +527,41 @@ namespace NKikimr::NColumnShard {
NOlap::TIndexInfo BuildTableInfo(const std::vector<std::pair<TString, NScheme::TTypeInfo>>& ydbSchema,
const std::vector<std::pair<TString, NScheme::TTypeInfo>>& key);
+
+
+ struct TestTableDescription {
+ std::vector<std::pair<TString, NScheme::TTypeInfo>> Schema = NTxUT::TTestSchema::YdbSchema();
+ std::vector<std::pair<TString, NScheme::TTypeInfo>> Pk = NTxUT::TTestSchema::YdbPkSchema();
+ bool InStore = true;
+ };
+
+ class TArrowDataConstructor : public NKikimr::NEvents::IDataConstructor {
+ std::vector<std::pair<TString, NScheme::TTypeInfo>> YdbSchema;
+ ui64 Index;
+
+ public:
+ TArrowDataConstructor(const std::vector<std::pair<TString, NScheme::TTypeInfo>>& ydbSchema, const ui64 idx)
+ : YdbSchema(ydbSchema)
+ , Index(idx)
+ {
+ }
+
+ void Serialize(NKikimrDataEvents::TOperationData& proto) const override {
+ for (ui32 i = 0; i < YdbSchema.size(); ++i) {
+ proto.AddColumnIds(i + 1);
+ }
+ proto.MutableArrowData()->SetPayloadIndex(Index);
+ }
+
+ ui64 GetSchemaVersion() const override {
+ return 1;
+ }
+ };
+
+ void SetupSchema(TTestBasicRuntime& runtime, TActorId& sender, ui64 pathId,
+ const TestTableDescription& table = {}, TString codec = "none");
+
+ void PrepareTablet(TTestBasicRuntime& runtime, const ui64 tableId, const std::vector<std::pair<TString, NScheme::TTypeInfo>>& schema);
+
+ std::shared_ptr<arrow::RecordBatch> ReadAllAsBatch(TTestBasicRuntime& runtime, const ui64 tableId, const NOlap::TSnapshot& snapshot, const std::vector<std::pair<TString, NScheme::TTypeInfo>>& schema);
}
diff --git a/ydb/core/tx/columnshard/hooks/abstract/abstract.h b/ydb/core/tx/columnshard/hooks/abstract/abstract.h
index 4339a6d1a6..7a40d59aa9 100644
--- a/ydb/core/tx/columnshard/hooks/abstract/abstract.h
+++ b/ydb/core/tx/columnshard/hooks/abstract/abstract.h
@@ -1,4 +1,7 @@
#pragma once
+
+#include <ydb/core/tablet_flat/tablet_flat_executor.h>
+
#include <ydb/services/metadata/abstract/fetcher.h>
#include <ydb/core/tx/tiering/snapshot.h>
@@ -31,6 +34,15 @@ enum class EOptimizerCompactionWeightControl {
Force
};
+class ILocalDBModifier {
+public:
+ using TPtr = std::shared_ptr<ILocalDBModifier>;
+
+ virtual ~ILocalDBModifier() {}
+
+ virtual void Apply(NTabletFlatExecutor::TTransactionContext& txc) const = 0;
+};
+
class ICSController {
private:
YDB_READONLY(TAtomicCounter, OnSortingPolicyCounter, 0);
@@ -89,6 +101,10 @@ public:
virtual void OnTieringModified(const std::shared_ptr<NColumnShard::TTiersManager>& /*tiers*/) {
}
+ virtual ILocalDBModifier::TPtr BuildLocalBaseModifier() const {
+ return nullptr;
+ }
+
virtual NMetadata::NFetcher::ISnapshot::TPtr GetFallbackTiersSnapshot() const {
static std::shared_ptr<NColumnShard::NTiers::TConfigsSnapshot> result = std::make_shared<NColumnShard::NTiers::TConfigsSnapshot>(TInstant::Now());
return result;
diff --git a/ydb/core/tx/columnshard/normalizer/CMakeLists.txt b/ydb/core/tx/columnshard/normalizer/CMakeLists.txt
new file mode 100644
index 0000000000..7475f3a995
--- /dev/null
+++ b/ydb/core/tx/columnshard/normalizer/CMakeLists.txt
@@ -0,0 +1,10 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+add_subdirectory(abstract)
+add_subdirectory(granule)
diff --git a/ydb/core/tx/columnshard/normalizer/abstract/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/normalizer/abstract/CMakeLists.darwin-x86_64.txt
new file mode 100644
index 0000000000..4a11c1f061
--- /dev/null
+++ b/ydb/core/tx/columnshard/normalizer/abstract/CMakeLists.darwin-x86_64.txt
@@ -0,0 +1,20 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(columnshard-normalizer-abstract)
+target_link_libraries(columnshard-normalizer-abstract PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ ydb-core-tablet_flat
+ columnshard-blobs_action-abstract
+ tx-columnshard-resource_subscriber
+)
+target_sources(columnshard-normalizer-abstract PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/normalizer/abstract/abstract.cpp
+)
diff --git a/ydb/core/tx/columnshard/normalizer/abstract/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/normalizer/abstract/CMakeLists.linux-aarch64.txt
new file mode 100644
index 0000000000..6da128405d
--- /dev/null
+++ b/ydb/core/tx/columnshard/normalizer/abstract/CMakeLists.linux-aarch64.txt
@@ -0,0 +1,21 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(columnshard-normalizer-abstract)
+target_link_libraries(columnshard-normalizer-abstract PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ ydb-core-tablet_flat
+ columnshard-blobs_action-abstract
+ tx-columnshard-resource_subscriber
+)
+target_sources(columnshard-normalizer-abstract PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/normalizer/abstract/abstract.cpp
+)
diff --git a/ydb/core/tx/columnshard/normalizer/abstract/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/normalizer/abstract/CMakeLists.linux-x86_64.txt
new file mode 100644
index 0000000000..6da128405d
--- /dev/null
+++ b/ydb/core/tx/columnshard/normalizer/abstract/CMakeLists.linux-x86_64.txt
@@ -0,0 +1,21 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(columnshard-normalizer-abstract)
+target_link_libraries(columnshard-normalizer-abstract PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ ydb-core-tablet_flat
+ columnshard-blobs_action-abstract
+ tx-columnshard-resource_subscriber
+)
+target_sources(columnshard-normalizer-abstract PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/normalizer/abstract/abstract.cpp
+)
diff --git a/ydb/core/tx/columnshard/normalizer/abstract/CMakeLists.txt b/ydb/core/tx/columnshard/normalizer/abstract/CMakeLists.txt
new file mode 100644
index 0000000000..f8b31df0c1
--- /dev/null
+++ b/ydb/core/tx/columnshard/normalizer/abstract/CMakeLists.txt
@@ -0,0 +1,17 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-aarch64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
+ include(CMakeLists.darwin-x86_64.txt)
+elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA)
+ include(CMakeLists.windows-x86_64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-x86_64.txt)
+endif()
diff --git a/ydb/core/tx/columnshard/normalizer/abstract/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/normalizer/abstract/CMakeLists.windows-x86_64.txt
new file mode 100644
index 0000000000..4a11c1f061
--- /dev/null
+++ b/ydb/core/tx/columnshard/normalizer/abstract/CMakeLists.windows-x86_64.txt
@@ -0,0 +1,20 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(columnshard-normalizer-abstract)
+target_link_libraries(columnshard-normalizer-abstract PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ ydb-core-tablet_flat
+ columnshard-blobs_action-abstract
+ tx-columnshard-resource_subscriber
+)
+target_sources(columnshard-normalizer-abstract PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/normalizer/abstract/abstract.cpp
+)
diff --git a/ydb/core/tx/columnshard/normalizer/abstract/abstract.cpp b/ydb/core/tx/columnshard/normalizer/abstract/abstract.cpp
new file mode 100644
index 0000000000..6908736d95
--- /dev/null
+++ b/ydb/core/tx/columnshard/normalizer/abstract/abstract.cpp
@@ -0,0 +1 @@
+#include "abstract.h"
diff --git a/ydb/core/tx/columnshard/normalizer/abstract/abstract.h b/ydb/core/tx/columnshard/normalizer/abstract/abstract.h
new file mode 100644
index 0000000000..8e231a6ef1
--- /dev/null
+++ b/ydb/core/tx/columnshard/normalizer/abstract/abstract.h
@@ -0,0 +1,55 @@
+#pragma once
+
+#include <ydb/core/tablet_flat/tablet_flat_executor.h>
+#include <ydb/library/accessor/accessor.h>
+
+#include <ydb/core/tx/columnshard/blobs_action/abstract/storages_manager.h>
+#include <ydb/core/tx/columnshard/resource_subscriber/task.h>
+
+namespace NKikimr::NOlap {
+
+ class TNormalizationContext {
+ YDB_READONLY_DEF(TActorId, ResourceSubscribeActor);
+ YDB_READONLY_DEF(TActorId, ColumnshardActor);
+
+ const NOlap::NResourceBroker::NSubscribe::TTaskContext& InsertTaskSubscription;
+ std::shared_ptr<IStoragesManager> StoragesManager;
+
+ public:
+ TNormalizationContext(const NOlap::NResourceBroker::NSubscribe::TTaskContext& its)
+ : InsertTaskSubscription(its)
+ {}
+
+ IStoragesManager& GetStoragesManager() {
+ AFL_VERIFY(!!StoragesManager);
+ return *StoragesManager;
+ }
+
+ const NOlap::NResourceBroker::NSubscribe::TTaskContext& GetInsertTaskSubscription() const {
+ return InsertTaskSubscription;
+ }
+ };
+
+ enum class ENormalizerResult {
+ Ok,
+ Wait,
+ Failed,
+ Skip
+ };
+
+ class INormalizerComponent {
+ public:
+ using TPtr = std::shared_ptr<INormalizerComponent>;
+
+ virtual ~INormalizerComponent() {}
+
+ virtual bool NormalizationRequired() const {
+ return false;
+ }
+
+ virtual const TString& GetName() const = 0;
+
+ virtual ENormalizerResult NormalizeData(TNormalizationContext& nCtx, NTabletFlatExecutor::TTransactionContext& txc) = 0;
+ };
+
+}
diff --git a/ydb/core/tx/columnshard/normalizer/abstract/ya.make b/ydb/core/tx/columnshard/normalizer/abstract/ya.make
new file mode 100644
index 0000000000..48572e5fe0
--- /dev/null
+++ b/ydb/core/tx/columnshard/normalizer/abstract/ya.make
@@ -0,0 +1,13 @@
+LIBRARY()
+
+SRCS(
+ abstract.cpp
+)
+
+PEERDIR(
+ ydb/core/tablet_flat
+ ydb/core/tx/columnshard/blobs_action/abstract
+ ydb/core/tx/columnshard/resource_subscriber
+)
+
+END()
diff --git a/ydb/core/tx/columnshard/normalizer/granule/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/normalizer/granule/CMakeLists.darwin-x86_64.txt
new file mode 100644
index 0000000000..0b6a8a69b4
--- /dev/null
+++ b/ydb/core/tx/columnshard/normalizer/granule/CMakeLists.darwin-x86_64.txt
@@ -0,0 +1,18 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(columnshard-normalizer-granule)
+target_link_libraries(columnshard-normalizer-granule PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ columnshard-normalizer-abstract
+)
+target_sources(columnshard-normalizer-granule PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/normalizer/granule/normalizer.cpp
+)
diff --git a/ydb/core/tx/columnshard/normalizer/granule/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/normalizer/granule/CMakeLists.linux-aarch64.txt
new file mode 100644
index 0000000000..a2d66c12ed
--- /dev/null
+++ b/ydb/core/tx/columnshard/normalizer/granule/CMakeLists.linux-aarch64.txt
@@ -0,0 +1,19 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(columnshard-normalizer-granule)
+target_link_libraries(columnshard-normalizer-granule PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ columnshard-normalizer-abstract
+)
+target_sources(columnshard-normalizer-granule PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/normalizer/granule/normalizer.cpp
+)
diff --git a/ydb/core/tx/columnshard/normalizer/granule/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/normalizer/granule/CMakeLists.linux-x86_64.txt
new file mode 100644
index 0000000000..a2d66c12ed
--- /dev/null
+++ b/ydb/core/tx/columnshard/normalizer/granule/CMakeLists.linux-x86_64.txt
@@ -0,0 +1,19 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(columnshard-normalizer-granule)
+target_link_libraries(columnshard-normalizer-granule PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ columnshard-normalizer-abstract
+)
+target_sources(columnshard-normalizer-granule PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/normalizer/granule/normalizer.cpp
+)
diff --git a/ydb/core/tx/columnshard/normalizer/granule/CMakeLists.txt b/ydb/core/tx/columnshard/normalizer/granule/CMakeLists.txt
new file mode 100644
index 0000000000..f8b31df0c1
--- /dev/null
+++ b/ydb/core/tx/columnshard/normalizer/granule/CMakeLists.txt
@@ -0,0 +1,17 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-aarch64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
+ include(CMakeLists.darwin-x86_64.txt)
+elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA)
+ include(CMakeLists.windows-x86_64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-x86_64.txt)
+endif()
diff --git a/ydb/core/tx/columnshard/normalizer/granule/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/normalizer/granule/CMakeLists.windows-x86_64.txt
new file mode 100644
index 0000000000..0b6a8a69b4
--- /dev/null
+++ b/ydb/core/tx/columnshard/normalizer/granule/CMakeLists.windows-x86_64.txt
@@ -0,0 +1,18 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(columnshard-normalizer-granule)
+target_link_libraries(columnshard-normalizer-granule PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ columnshard-normalizer-abstract
+)
+target_sources(columnshard-normalizer-granule PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/normalizer/granule/normalizer.cpp
+)
diff --git a/ydb/core/tx/columnshard/normalizer/granule/normalizer.cpp b/ydb/core/tx/columnshard/normalizer/granule/normalizer.cpp
new file mode 100644
index 0000000000..3832299efc
--- /dev/null
+++ b/ydb/core/tx/columnshard/normalizer/granule/normalizer.cpp
@@ -0,0 +1 @@
+#include "normalizer.h"
diff --git a/ydb/core/tx/columnshard/normalizer/granule/normalizer.h b/ydb/core/tx/columnshard/normalizer/granule/normalizer.h
new file mode 100644
index 0000000000..7fa02c8ccb
--- /dev/null
+++ b/ydb/core/tx/columnshard/normalizer/granule/normalizer.h
@@ -0,0 +1,114 @@
+#pragma once
+
+#include <ydb/core/tx/columnshard/normalizer/abstract/abstract.h>
+#include <ydb/core/tx/columnshard/columnshard_schema.h>
+
+
+namespace NKikimr::NOlap {
+
+class TGranulesNormalizer: public NOlap::INormalizerComponent {
+ struct TPortionKey {
+ ui64 Index = 0;
+ ui64 GranuleId = 0;
+ ui64 PlanStep = 0;
+ ui64 TxId = 0;
+ };
+
+ struct TUniqueId {
+ ui64 PortionId = 0;
+ ui32 Chunk = 0;
+ ui64 ColumnIdx = 0;
+
+ bool operator<(const TUniqueId& other) const {
+ return std::make_tuple(PortionId, Chunk, ColumnIdx) < std::make_tuple(other.PortionId, other.Chunk, other.ColumnIdx);
+ }
+ };
+
+public:
+ virtual bool NormalizationRequired() const override {
+ return true;
+ }
+
+ virtual const TString& GetName() const override {
+ const static TString name = "TGranulesNormalizer";
+ return name;
+ }
+
+ virtual ENormalizerResult NormalizeData(TNormalizationContext& nCtx, NTabletFlatExecutor::TTransactionContext& txc) override {
+ Y_UNUSED(nCtx);
+
+ using namespace NColumnShard;
+ NIceDb::TNiceDb db(txc.DB);
+
+ bool ready = true;
+ ready = ready & Schema::Precharge<Schema::IndexColumns>(db, txc.DB.GetScheme());
+ ready = ready & Schema::Precharge<Schema::IndexGranules>(db, txc.DB.GetScheme());
+ if (!ready) {
+ return ENormalizerResult::Failed;
+ }
+
+ TMap<TUniqueId, TPortionKey> portion2Key;
+ {
+ auto rowset = db.Table<Schema::IndexColumns>().Select();
+ if (!rowset.IsReady()) {
+ return ENormalizerResult::Failed;
+ }
+
+ while (!rowset.EndOfSet()) {
+ if (!rowset.HaveValue<Schema::IndexColumns::PathId>()) {
+ TUniqueId id;
+ TPortionKey key;
+ key.PlanStep = rowset.GetValue<Schema::IndexColumns::PlanStep>();
+ key.TxId = rowset.GetValue<Schema::IndexColumns::TxId>();
+ id.PortionId = rowset.GetValue<Schema::IndexColumns::Portion>();
+ key.GranuleId = rowset.GetValue<Schema::IndexColumns::Granule>();
+ id.Chunk = rowset.GetValue<Schema::IndexColumns::Chunk>();
+ key.Index = rowset.GetValue<Schema::IndexColumns::Index>();
+ id.ColumnIdx = rowset.GetValue<Schema::IndexColumns::ColumnIdx>();
+
+ portion2Key[id] = key;
+ }
+
+ if (!rowset.Next()) {
+ return ENormalizerResult::Failed;
+ }
+ }
+ }
+ ACFL_INFO("normalizer", "TGranulesNormalizer")("message", TStringBuilder() << portion2Key.size() << " portions found");
+
+ if (portion2Key.empty()) {
+ return ENormalizerResult::Skip;
+ }
+
+ THashMap<ui64, ui64> granule2Path;
+ {
+ auto rowset = db.Table<Schema::IndexGranules>().Select();
+ if (!rowset.IsReady()) {
+ return ENormalizerResult::Failed;
+ }
+
+ while (!rowset.EndOfSet()) {
+ ui64 pathId = rowset.GetValue<Schema::IndexGranules::PathId>();
+ ui64 granuleId = rowset.GetValue<Schema::IndexGranules::Granule>();
+ Y_ABORT_UNLESS(granuleId != 0);
+ granule2Path[granuleId] = pathId;
+ if (!rowset.Next()) {
+ return ENormalizerResult::Failed;
+ }
+ }
+ }
+
+ for (auto&& [ portionId, key ] : portion2Key) {
+ auto granuleIt = granule2Path.find(key.GranuleId);
+ Y_ABORT_UNLESS(granuleIt != granule2Path.end());
+
+ db.Table<Schema::IndexColumns>().Key(key.Index, key.GranuleId, portionId.ColumnIdx,
+ key.PlanStep, key.TxId, portionId.PortionId, portionId.Chunk).Update(
+ NIceDb::TUpdate<Schema::IndexColumns::PathId>(granuleIt->second)
+ );
+ }
+ return ENormalizerResult::Ok;
+ }
+};
+
+}
diff --git a/ydb/core/tx/columnshard/normalizer/granule/ya.make b/ydb/core/tx/columnshard/normalizer/granule/ya.make
new file mode 100644
index 0000000000..8a8c7e5ab0
--- /dev/null
+++ b/ydb/core/tx/columnshard/normalizer/granule/ya.make
@@ -0,0 +1,11 @@
+LIBRARY()
+
+SRCS(
+ normalizer.cpp
+)
+
+PEERDIR(
+ ydb/core/tx/columnshard/normalizer/abstract
+)
+
+END()
diff --git a/ydb/core/tx/columnshard/ut_rw/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/ut_rw/CMakeLists.darwin-x86_64.txt
index e51dec1b76..aec0c14a8a 100644
--- a/ydb/core/tx/columnshard/ut_rw/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/tx/columnshard/ut_rw/CMakeLists.darwin-x86_64.txt
@@ -40,6 +40,7 @@ target_link_options(ydb-core-tx-columnshard-ut_rw PRIVATE
target_sources(ydb-core-tx-columnshard-ut_rw PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_ut_common.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/ut_rw/ut_normalizer.cpp
)
set_property(
TARGET
diff --git a/ydb/core/tx/columnshard/ut_rw/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/ut_rw/CMakeLists.linux-aarch64.txt
index 2b43f87f55..8524515b05 100644
--- a/ydb/core/tx/columnshard/ut_rw/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/tx/columnshard/ut_rw/CMakeLists.linux-aarch64.txt
@@ -43,6 +43,7 @@ target_link_options(ydb-core-tx-columnshard-ut_rw PRIVATE
target_sources(ydb-core-tx-columnshard-ut_rw PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_ut_common.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/ut_rw/ut_normalizer.cpp
)
set_property(
TARGET
diff --git a/ydb/core/tx/columnshard/ut_rw/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/ut_rw/CMakeLists.linux-x86_64.txt
index 47af387c56..fb7f19246f 100644
--- a/ydb/core/tx/columnshard/ut_rw/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/tx/columnshard/ut_rw/CMakeLists.linux-x86_64.txt
@@ -44,6 +44,7 @@ target_link_options(ydb-core-tx-columnshard-ut_rw PRIVATE
target_sources(ydb-core-tx-columnshard-ut_rw PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_ut_common.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/ut_rw/ut_normalizer.cpp
)
set_property(
TARGET
diff --git a/ydb/core/tx/columnshard/ut_rw/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/ut_rw/CMakeLists.windows-x86_64.txt
index 6118745799..2ef0565f74 100644
--- a/ydb/core/tx/columnshard/ut_rw/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/tx/columnshard/ut_rw/CMakeLists.windows-x86_64.txt
@@ -33,6 +33,7 @@ target_link_libraries(ydb-core-tx-columnshard-ut_rw PUBLIC
target_sources(ydb-core-tx-columnshard-ut_rw PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_ut_common.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/ut_rw/ut_normalizer.cpp
)
set_property(
TARGET
diff --git a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp
index d952eb745e..1c2b0f39d8 100644
--- a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp
+++ b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp
@@ -326,28 +326,6 @@ bool CheckColumns(const TString& blob, const NKikimrTxColumnShard::TMetadata& me
return CheckColumns(batch, colNames, rowsCount);
}
-struct TestTableDescription {
- std::vector<std::pair<TString, TTypeInfo>> Schema = TTestSchema::YdbSchema();
- std::vector<std::pair<TString, TTypeInfo>> Pk = TTestSchema::YdbPkSchema();
- bool InStore = true;
-};
-
-void SetupSchema(TTestBasicRuntime& runtime, TActorId& sender, ui64 pathId,
- const TestTableDescription& table = {}, TString codec = "none") {
- NOlap::TSnapshot snap(10, 10);
- TString txBody;
- auto specials = TTestSchema::TTableSpecials().WithCodec(codec);
- if (table.InStore) {
- txBody = TTestSchema::CreateTableTxBody(pathId, table.Schema, table.Pk, specials);
- } else {
- txBody = TTestSchema::CreateStandaloneTableTxBody(pathId, table.Schema, table.Pk, specials);
- }
- bool ok = ProposeSchemaTx(runtime, sender, txBody, snap);
- UNIT_ASSERT(ok);
-
- PlanSchemaTx(runtime, sender, snap);
-}
-
std::vector<TString> ReadManyResults(TTestBasicRuntime& runtime, TString& schema,
NKikimrTxColumnShard::TMetadata& meta, ui32 expected = 1000) {
std::vector<TString> readData;
@@ -1870,65 +1848,6 @@ void TestReadAggregate(const std::vector<std::pair<TString, TTypeInfo>>& ydbSche
}
Y_UNIT_TEST_SUITE(EvWrite) {
- class TArrowData : public NKikimr::NEvents::IDataConstructor {
- std::vector<std::pair<TString, TTypeInfo>> YdbSchema;
- ui64 Index;
-
- public:
- TArrowData(const std::vector<std::pair<TString, TTypeInfo>>& ydbSchema, const ui64 idx)
- : YdbSchema(ydbSchema)
- , Index(idx)
- {
- }
-
- void Serialize(NKikimrDataEvents::TOperationData& proto) const override {
- for (ui32 i = 0; i < YdbSchema.size(); ++i) {
- proto.AddColumnIds(i + 1);
- }
- proto.MutableArrowData()->SetPayloadIndex(Index);
- }
-
- ui64 GetSchemaVersion() const override {
- return 1;
- }
- };
-
- void PrepareTablet(TTestBasicRuntime& runtime, const ui64 tableId, const std::vector<std::pair<TString, TTypeInfo>>& schema) {
- CreateTestBootstrapper(runtime, CreateTestTabletInfo(TTestTxConfig::TxTablet0, TTabletTypes::ColumnShard), &CreateColumnShard);
-
- TDispatchOptions options;
- options.FinalEvents.push_back(TDispatchOptions::TFinalEventCondition(TEvTablet::EvBoot));
- runtime.DispatchEvents(options);
-
- TestTableDescription tableDescription;
- tableDescription.Schema = schema;
- tableDescription.Pk = { schema[0] };
- TActorId sender = runtime.AllocateEdgeActor();
- SetupSchema(runtime, sender, tableId, tableDescription);
- }
-
- std::shared_ptr<arrow::RecordBatch> ReadAllAsBatch(TTestBasicRuntime& runtime, const ui64 tableId, const NOlap::TSnapshot& snapshot, const std::vector<std::pair<TString, TTypeInfo>>& schema) {
- TActorId sender = runtime.AllocateEdgeActor();
-
- ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender,
- new TEvColumnShard::TEvRead(sender, TTestTxConfig::TxTablet1, snapshot.GetPlanStep(), snapshot.GetTxId(), tableId));
-
- std::vector<std::shared_ptr<arrow::RecordBatch>> batches;
- while(true) {
- TAutoPtr<IEventHandle> handle;
- auto event = runtime.GrabEdgeEvent<TEvColumnShard::TEvReadResult>(handle);
- UNIT_ASSERT(event);
- auto b = event->GetArrowBatch();
- if (b) {
- batches.push_back(b);
- }
- if (!event->HasMore()) {
- break;
- }
- }
- auto res = NArrow::CombineBatches(batches);
- return res ? res : NArrow::MakeEmptyBatch(NArrow::MakeArrowSchema(schema));
- }
Y_UNIT_TEST(WriteInTransaction) {
using namespace NArrow;
@@ -1953,7 +1872,7 @@ Y_UNIT_TEST_SUITE(EvWrite) {
UNIT_ASSERT(blobData.size() < TLimits::GetMaxBlobSize());
auto evWrite = std::make_unique<NKikimr::NEvents::TDataEvents::TEvWrite>(txId);
- auto dataPtr = std::make_shared<TArrowData>(schema, TPayloadHelper<NKikimr::NEvents::TDataEvents::TEvWrite>(*evWrite).AddDataToPayload(std::move(blobData)));
+ auto dataPtr = std::make_shared<TArrowDataConstructor>(schema, TPayloadHelper<NKikimr::NEvents::TDataEvents::TEvWrite>(*evWrite).AddDataToPayload(std::move(blobData)));
evWrite->AddReplaceOp(tableId, dataPtr);
TActorId sender = runtime.AllocateEdgeActor();
@@ -1975,7 +1894,7 @@ Y_UNIT_TEST_SUITE(EvWrite) {
UNIT_ASSERT_VALUES_EQUAL(readResult->num_rows(), 2048);
}
- Y_UNIT_TEST(AbotrInTransaction) {
+ Y_UNIT_TEST(AbortInTransaction) {
using namespace NArrow;
TTestBasicRuntime runtime;
@@ -1998,7 +1917,7 @@ Y_UNIT_TEST_SUITE(EvWrite) {
UNIT_ASSERT(blobData.size() < TLimits::GetMaxBlobSize());
auto evWrite = std::make_unique<NKikimr::NEvents::TDataEvents::TEvWrite>(txId);
- auto dataPtr = std::make_shared<TArrowData>(schema, TPayloadHelper<NKikimr::NEvents::TDataEvents::TEvWrite>(*evWrite).AddDataToPayload(std::move(blobData)));
+ auto dataPtr = std::make_shared<TArrowDataConstructor>(schema, TPayloadHelper<NKikimr::NEvents::TDataEvents::TEvWrite>(*evWrite).AddDataToPayload(std::move(blobData)));
evWrite->AddReplaceOp(tableId, dataPtr);
TActorId sender = runtime.AllocateEdgeActor();
@@ -2042,7 +1961,7 @@ Y_UNIT_TEST_SUITE(EvWrite) {
UNIT_ASSERT(blobData.size() > TLimits::GetMaxBlobSize());
auto evWrite = std::make_unique<NKikimr::NEvents::TDataEvents::TEvWrite>(txId);
- auto dataPtr = std::make_shared<TArrowData>(schema, TPayloadHelper<NKikimr::NEvents::TDataEvents::TEvWrite>(*evWrite).AddDataToPayload(std::move(blobData)));
+ auto dataPtr = std::make_shared<TArrowDataConstructor>(schema, TPayloadHelper<NKikimr::NEvents::TDataEvents::TEvWrite>(*evWrite).AddDataToPayload(std::move(blobData)));
evWrite->AddReplaceOp(tableId, dataPtr);
TActorId sender = runtime.AllocateEdgeActor();
diff --git a/ydb/core/tx/columnshard/ut_rw/ut_normalizer.cpp b/ydb/core/tx/columnshard/ut_rw/ut_normalizer.cpp
new file mode 100644
index 0000000000..45bf17c477
--- /dev/null
+++ b/ydb/core/tx/columnshard/ut_rw/ut_normalizer.cpp
@@ -0,0 +1,165 @@
+#include "columnshard_ut_common.h"
+
+#include <ydb/core/tx/columnshard/hooks/abstract/abstract.h>
+#include <ydb/core/tx/columnshard/hooks/testing/controller.h>
+
+#include <ydb/core/tx/columnshard/operations/write_data.h>
+
+#include <ydb/core/formats/arrow/simple_builder/filler.h>
+#include <ydb/core/formats/arrow/simple_builder/array.h>
+#include <ydb/core/formats/arrow/simple_builder/batch.h>
+
+
+namespace NKikimr {
+
+using namespace NColumnShard;
+using namespace Tests;
+using namespace NTxUT;
+
+
+class TPrepareLocalDBController: public NKikimr::NYDBTest::NColumnShard::TController {
+private:
+ using TBase = NKikimr::NYDBTest::ICSController;
+
+ struct TPortionRecord {
+ ui64 Index = 0;
+ ui64 Granule = 0;
+ ui64 ColumnIdx = 0;
+ ui64 PlanStep = 0;
+ ui64 TxId = 0;
+ ui64 Portion = 0;
+ ui32 Chunk = 0;
+ ui64 XPlanStep = 0;
+ ui64 XTxId = 0;
+ TString Blob;
+ TString Metadata;
+ ui32 Offset = 0;
+ ui32 Size = 0;
+ };
+
+protected:
+
+ class TPathIdCleaner : public NYDBTest::ILocalDBModifier {
+ public:
+ virtual void Apply(NTabletFlatExecutor::TTransactionContext& txc) const override {
+ using namespace NColumnShard;
+ NIceDb::TNiceDb db(txc.DB);
+
+ THashMap<ui64, TPortionRecord> portion2Key;
+ std::optional<ui64> pathId;
+ {
+ auto rowset = db.Table<Schema::IndexColumns>().Select();
+ UNIT_ASSERT(rowset.IsReady());
+
+ while (!rowset.EndOfSet()) {
+ TPortionRecord key;
+ key.Index = rowset.GetValue<Schema::IndexColumns::Index>();
+ key.Granule = rowset.GetValue<Schema::IndexColumns::Granule>();
+ key.ColumnIdx = rowset.GetValue<Schema::IndexColumns::ColumnIdx>();
+ key.PlanStep = rowset.GetValue<Schema::IndexColumns::PlanStep>();
+ key.TxId = rowset.GetValue<Schema::IndexColumns::TxId>();
+ key.Portion = rowset.GetValue<Schema::IndexColumns::Portion>();
+ key.Chunk = rowset.GetValue<Schema::IndexColumns::Chunk>();
+
+ key.XPlanStep = rowset.GetValue<Schema::IndexColumns::XPlanStep>();
+ key.XTxId = rowset.GetValue<Schema::IndexColumns::XTxId>();
+ key.Blob = rowset.GetValue<Schema::IndexColumns::Blob>();
+ key.Metadata = rowset.GetValue<Schema::IndexColumns::Metadata>();
+ key.Offset = rowset.GetValue<Schema::IndexColumns::Offset>();
+ key.Size = rowset.GetValue<Schema::IndexColumns::Size>();
+
+ pathId = rowset.GetValue<Schema::IndexColumns::PathId>();
+
+ portion2Key[key.Portion] = key;
+
+ UNIT_ASSERT(rowset.Next());
+ }
+ }
+
+ UNIT_ASSERT(pathId.has_value());
+
+ for (auto&& [ portionId, key ] : portion2Key) {
+ db.Table<Schema::IndexColumns>().Key(key.Index, key.Granule, key.ColumnIdx,
+ key.PlanStep, key.TxId, key.Portion, key.Chunk).Delete();
+
+ db.Table<Schema::IndexColumns>().Key(key.Index, 1, key.ColumnIdx,
+ key.PlanStep, key.TxId, key.Portion, key.Chunk).Update(
+ NIceDb::TUpdate<Schema::IndexColumns::XPlanStep>(key.XPlanStep),
+ NIceDb::TUpdate<Schema::IndexColumns::XTxId>(key.XTxId),
+ NIceDb::TUpdate<Schema::IndexColumns::Blob>(key.Blob),
+ NIceDb::TUpdate<Schema::IndexColumns::Metadata>(key.Metadata),
+ NIceDb::TUpdate<Schema::IndexColumns::Offset>(key.Offset),
+ NIceDb::TUpdate<Schema::IndexColumns::Size>(key.Size),
+
+ NIceDb::TNull<Schema::IndexColumns::PathId>()
+ );
+ }
+
+ db.Table<Schema::IndexGranules>().Key(0, *pathId, "1").Update(
+ NIceDb::TUpdate<Schema::IndexGranules::Granule>(1),
+ NIceDb::TUpdate<Schema::IndexGranules::PlanStep>(1),
+ NIceDb::TUpdate<Schema::IndexGranules::TxId>(1),
+ NIceDb::TUpdate<Schema::IndexGranules::Metadata>("")
+ );
+ }
+ };
+
+public:
+ NYDBTest::ILocalDBModifier::TPtr BuildLocalBaseModifier() const override {
+ return std::make_shared<TPathIdCleaner>();
+ }
+};
+
+Y_UNIT_TEST_SUITE(Normalizers) {
+
+ Y_UNIT_TEST(PathIdNormalizer) {
+ using namespace NArrow;
+ auto csControllerGuard = NYDBTest::TControllers::RegisterCSControllerGuard<TPrepareLocalDBController>();
+
+ TTestBasicRuntime runtime;
+ TTester::Setup(runtime);
+
+ const ui64 tableId = 1;
+ const std::vector<std::pair<TString, TTypeInfo>> schema = {
+ {"key", TTypeInfo(NTypeIds::Uint64) },
+ {"field", TTypeInfo(NTypeIds::Utf8) }
+ };
+ PrepareTablet(runtime, tableId, schema);
+ const ui64 txId = 111;
+
+ NConstruction::IArrayBuilder::TPtr keyColumn = std::make_shared<NConstruction::TSimpleArrayConstructor<NConstruction::TIntSeqFiller<arrow::UInt64Type>>>("key");
+ NConstruction::IArrayBuilder::TPtr column = std::make_shared<NConstruction::TSimpleArrayConstructor<NConstruction::TStringPoolFiller>>(
+ "field", NConstruction::TStringPoolFiller(8, 100));
+
+ auto batch = NConstruction::TRecordBatchConstructor({ keyColumn, column }).BuildBatch(2048);
+ TString blobData = NArrow::SerializeBatchNoCompression(batch);
+
+ auto evWrite = std::make_unique<NKikimr::NEvents::TDataEvents::TEvWrite>(txId);
+ auto dataPtr = std::make_shared<TArrowDataConstructor>(schema, TPayloadHelper<NKikimr::NEvents::TDataEvents::TEvWrite>(*evWrite).AddDataToPayload(std::move(blobData)));
+ evWrite->AddReplaceOp(tableId, dataPtr);
+
+ TActorId sender = runtime.AllocateEdgeActor();
+ ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, evWrite.release());
+ {
+ TAutoPtr<NActors::IEventHandle> handle;
+ auto event = runtime.GrabEdgeEvent<NKikimr::NEvents::TDataEvents::TEvWriteResult>(handle);
+ UNIT_ASSERT(event);
+ UNIT_ASSERT_VALUES_EQUAL((ui64)event->Record.GetStatus(), (ui64)NKikimrDataEvents::TEvWriteResult::PREPARED);
+
+ PlanWriteTx(runtime, sender, NOlap::TSnapshot(11, txId));
+ }
+
+ {
+ auto readResult = ReadAllAsBatch(runtime, tableId, NOlap::TSnapshot(11, txId), schema);
+ UNIT_ASSERT_VALUES_EQUAL(readResult->num_rows(), 2048);
+ }
+ RebootTablet(runtime, TTestTxConfig::TxTablet0, sender);
+
+ {
+ auto readResult = ReadAllAsBatch(runtime, tableId, NOlap::TSnapshot(11, txId), schema);
+ UNIT_ASSERT_VALUES_EQUAL(readResult->num_rows(), 2048);
+ }
+ }
+}
+
+} // namespace NKikimr
diff --git a/ydb/core/tx/columnshard/ut_rw/ya.make b/ydb/core/tx/columnshard/ut_rw/ya.make
index 2819d6f667..63c9dbed1d 100644
--- a/ydb/core/tx/columnshard/ut_rw/ya.make
+++ b/ydb/core/tx/columnshard/ut_rw/ya.make
@@ -31,6 +31,7 @@ YQL_LAST_ABI_VERSION()
SRCS(
columnshard_ut_common.cpp
ut_columnshard_read_write.cpp
+ ut_normalizer.cpp
)
END()
diff --git a/ydb/core/tx/columnshard/ya.make b/ydb/core/tx/columnshard/ya.make
index 87a239982e..66b50a38e1 100644
--- a/ydb/core/tx/columnshard/ya.make
+++ b/ydb/core/tx/columnshard/ya.make
@@ -53,6 +53,7 @@ PEERDIR(
ydb/core/tx/columnshard/blobs_reader
ydb/core/tx/columnshard/blobs_action
ydb/core/tx/columnshard/resource_subscriber
+ ydb/core/tx/columnshard/normalizer/granule
ydb/core/tx/tiering
ydb/core/tx/conveyor/usage
ydb/core/tx/long_tx_service/public