aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authornsofya <nsofya@ydb.tech>2024-01-29 11:12:57 +0300
committerGitHub <noreply@github.com>2024-01-29 11:12:57 +0300
commitdb4298118bb377fe654bca629af1f5bb019f4d37 (patch)
treed089909d8ec05813b0de05c68e9918fc4ed4e5a8
parenteecc71dc9207f1843120577c55a6197041245d31 (diff)
downloadydb-db4298118bb377fe654bca629af1f5bb019f4d37.tar.gz
Add TxOperators (#1365)
* Create special classes for distribute commit logics * Correct diff * Add missed event
-rw-r--r--ydb/core/tx/columnshard/columnshard.cpp2
-rw-r--r--ydb/core/tx/columnshard/columnshard__init.cpp42
-rw-r--r--ydb/core/tx/columnshard/columnshard__notify_tx_completion.cpp7
-rw-r--r--ydb/core/tx/columnshard/columnshard__progress_tx.cpp121
-rw-r--r--ydb/core/tx/columnshard/columnshard__propose_transaction.cpp334
-rw-r--r--ydb/core/tx/columnshard/columnshard_impl.cpp122
-rw-r--r--ydb/core/tx/columnshard/columnshard_impl.h25
-rw-r--r--ydb/core/tx/columnshard/transactions/operators/ev_write.cpp3
-rw-r--r--ydb/core/tx/columnshard/transactions/operators/ev_write.h38
-rw-r--r--ydb/core/tx/columnshard/transactions/operators/long_tx_write.cpp3
-rw-r--r--ydb/core/tx/columnshard/transactions/operators/long_tx_write.h103
-rw-r--r--ydb/core/tx/columnshard/transactions/operators/schema.cpp3
-rw-r--r--ydb/core/tx/columnshard/transactions/operators/schema.h159
-rw-r--r--ydb/core/tx/columnshard/transactions/operators/ya.make13
-rw-r--r--ydb/core/tx/columnshard/transactions/tx_controller.cpp (renamed from ydb/core/tx/columnshard/tx_controller.cpp)59
-rw-r--r--ydb/core/tx/columnshard/transactions/tx_controller.h (renamed from ydb/core/tx/columnshard/tx_controller.h)57
-rw-r--r--ydb/core/tx/columnshard/transactions/ya.make20
-rw-r--r--ydb/core/tx/columnshard/ya.make3
18 files changed, 575 insertions, 539 deletions
diff --git a/ydb/core/tx/columnshard/columnshard.cpp b/ydb/core/tx/columnshard/columnshard.cpp
index ac3f521b8b..6923c4f357 100644
--- a/ydb/core/tx/columnshard/columnshard.cpp
+++ b/ydb/core/tx/columnshard/columnshard.cpp
@@ -255,8 +255,6 @@ ui64 TColumnShard::MemoryUsage() const {
ui64 memory =
ProgressTxController->GetMemoryUsage() +
ScanTxInFlight.size() * (sizeof(ui64) + sizeof(TInstant)) +
- AltersInFlight.size() * sizeof(TAlterMeta) +
- CommitsInFlight.size() * sizeof(TCommitMeta) +
LongTxWrites.size() * (sizeof(TWriteId) + sizeof(TLongTxWriteInfo)) +
LongTxWritesByUniqueId.size() * (sizeof(TULID) + sizeof(void*)) +
(WaitingScans.size()) * (sizeof(NOlap::TSnapshot) + sizeof(void*)) +
diff --git a/ydb/core/tx/columnshard/columnshard__init.cpp b/ydb/core/tx/columnshard/columnshard__init.cpp
index 8173719281..3e11ed9e92 100644
--- a/ydb/core/tx/columnshard/columnshard__init.cpp
+++ b/ydb/core/tx/columnshard/columnshard__init.cpp
@@ -38,8 +38,6 @@ void TTxInit::SetDefaults() {
Self->LastPlannedTxId = 0;
Self->OwnerPathId = 0;
Self->OwnerPath.clear();
- Self->AltersInFlight.clear();
- Self->CommitsInFlight.clear();
Self->LongTxWrites.clear();
Self->LongTxWritesByUniqueId.clear();
}
@@ -180,18 +178,6 @@ bool TTxInit::ReadEverything(TTransactionContext& txc, const TActorContext& ctx)
}
}
- {
- TMemoryProfileGuard g("TTxInit/CommitsInFlight");
- for (const auto& pr : Self->CommitsInFlight) {
- ui64 txId = pr.first;
- for (TWriteId writeId : pr.second.WriteIds) {
- Y_ABORT_UNLESS(Self->LongTxWrites.contains(writeId),
- "TTxInit at %" PRIu64 " : Commit %" PRIu64 " references local write %" PRIu64 " that doesn't exist",
- Self->TabletID(), txId, writeId);
- Self->AddLongTxWrite(writeId, txId);
- }
- }
- }
Self->UpdateInsertTableCounters();
Self->UpdateIndexCounters();
Self->UpdateResourceMetrics(ctx, {});
@@ -219,6 +205,7 @@ bool TTxInit::Execute(TTransactionContext& txc, const TActorContext& ctx) {
}
void TTxInit::Complete(const TActorContext& ctx) {
+ Self->ProgressTxController->OnTabletInit();
Self->SwitchToWork(ctx);
}
@@ -377,31 +364,4 @@ void TColumnShard::Handle(TEvPrivate::TEvNormalizerResult::TPtr& ev, const TActo
Execute(new TTxApplyNormalizer(this, ev->Get()->GetChanges()), ctx);
}
-bool TColumnShard::LoadTx(const ui64 txId, const NKikimrTxColumnShard::ETransactionKind& txKind, const TString& txBody) {
- switch (txKind) {
- case NKikimrTxColumnShard::TX_KIND_SCHEMA: {
- TColumnShard::TAlterMeta meta;
- Y_ABORT_UNLESS(meta.Body.ParseFromString(txBody));
- AltersInFlight.emplace(txId, std::move(meta));
- break;
- }
- case NKikimrTxColumnShard::TX_KIND_COMMIT: {
- NKikimrTxColumnShard::TCommitTxBody body;
- Y_ABORT_UNLESS(body.ParseFromString(txBody));
-
- TColumnShard::TCommitMeta meta;
- for (auto& id : body.GetWriteIds()) {
- meta.AddWriteId(TWriteId{id});
- }
-
- CommitsInFlight.emplace(txId, std::move(meta));
- break;
- }
- default: {
- Y_ABORT("Unsupported TxKind stored in the TxInfo table");
- }
- }
- return true;
-}
-
}
diff --git a/ydb/core/tx/columnshard/columnshard__notify_tx_completion.cpp b/ydb/core/tx/columnshard/columnshard__notify_tx_completion.cpp
index 31672ce1bc..cc498125c3 100644
--- a/ydb/core/tx/columnshard/columnshard__notify_tx_completion.cpp
+++ b/ydb/core/tx/columnshard/columnshard__notify_tx_completion.cpp
@@ -14,12 +14,11 @@ public:
LOG_S_DEBUG("TTxNotifyTxCompletion.Execute at tablet " << Self->TabletID());
const ui64 txId = Ev->Get()->Record.GetTxId();
-
- if (Self->AltersInFlight.contains(txId)) {
- Self->AltersInFlight[txId].NotifySubscribers.insert(Ev->Sender);
+ auto txOperator = Self->ProgressTxController->GetTxOperator(txId);
+ if (txOperator) {
+ txOperator->RegisterSubscriber(Ev->Sender);
return true;
}
-
Result.reset(new TEvColumnShard::TEvNotifyTxCompletionResult(Self->TabletID(), txId));
return true;
}
diff --git a/ydb/core/tx/columnshard/columnshard__progress_tx.cpp b/ydb/core/tx/columnshard/columnshard__progress_tx.cpp
index d8450d124a..42597d4927 100644
--- a/ydb/core/tx/columnshard/columnshard__progress_tx.cpp
+++ b/ydb/core/tx/columnshard/columnshard__progress_tx.cpp
@@ -7,55 +7,6 @@
namespace NKikimr::NColumnShard {
class TColumnShard::TTxProgressTx : public TTransactionBase<TColumnShard> {
-private:
- struct TEvent {
- TActorId Target;
- ui64 Cookie;
- THolder<IEventBase> Event;
-
- TEvent(TActorId target, ui64 cookie, THolder<IEventBase> event)
- : Target(target)
- , Cookie(cookie)
- , Event(std::move(event))
- { }
- };
-
- struct TResultEvent {
- TTxController::TBasicTxInfo TxInfo;
- NKikimrTxColumnShard::EResultStatus Status;
-
- TResultEvent(TTxController::TBasicTxInfo&& txInfo, NKikimrTxColumnShard::EResultStatus status)
- : TxInfo(std::move(txInfo))
- , Status(status)
- {}
-
- std::unique_ptr<IEventBase> MakeEvent(ui64 tabletId) const {
- if (TxInfo.TxKind == NKikimrTxColumnShard::TX_KIND_COMMIT_WRITE) {
- auto result = NEvents::TDataEvents::TEvWriteResult::BuildCommited(tabletId, TxInfo.TxId);
- return result;
- } else {
- auto result = std::make_unique<TEvColumnShard::TEvProposeTransactionResult>(
- tabletId, TxInfo.TxKind, TxInfo.TxId, Status);
- result->Record.SetStep(TxInfo.PlanStep);
- return result;
- }
- }
- };
-
- enum class ETriggerActivities {
- NONE,
- POST_INSERT,
- POST_SCHEMA
- };
-
- TStringBuilder TxPrefix() const {
- return TStringBuilder() << "TxProgressTx[" << ToString(TabletTxNo) << "] ";
- }
-
- TString TxSuffix() const {
- return TStringBuilder() << " at tablet " << Self->TabletID();
- }
-
public:
TTxProgressTx(TColumnShard* self)
: TTransactionBase(self)
@@ -81,60 +32,8 @@ public:
ui64 step = plannedItem->PlanStep;
ui64 txId = plannedItem->TxId;
- TTxController::TBasicTxInfo txInfo = *plannedItem;
- switch (txInfo.TxKind) {
- case NKikimrTxColumnShard::TX_KIND_SCHEMA:
- {
- auto& meta = Self->AltersInFlight.at(txId);
- Self->RunSchemaTx(meta.Body, NOlap::TSnapshot(step, txId), txc);
- Self->ProtectSchemaSeqNo(meta.Body.GetSeqNo(), txc);
- for (TActorId subscriber : meta.NotifySubscribers) {
- TxEvents.emplace_back(subscriber, 0,
- MakeHolder<TEvColumnShard::TEvNotifyTxCompletionResult>(Self->TabletID(), txId));
- }
- Self->AltersInFlight.erase(txId);
- Trigger = ETriggerActivities::POST_SCHEMA;
- break;
- }
- case NKikimrTxColumnShard::TX_KIND_COMMIT: {
- const auto& meta = Self->CommitsInFlight.at(txId);
-
- TBlobGroupSelector dsGroupSelector(Self->Info());
- NOlap::TDbWrapper dbTable(txc.DB, &dsGroupSelector);
-
- auto pathExists = [&](ui64 pathId) {
- return Self->TablesManager.HasTable(pathId);
- };
-
- auto counters = Self->InsertTable->Commit(dbTable, step, txId, meta.WriteIds,
- pathExists);
- Self->IncCounter(COUNTER_BLOBS_COMMITTED, counters.Rows);
- Self->IncCounter(COUNTER_BYTES_COMMITTED, counters.Bytes);
- Self->IncCounter(COUNTER_RAW_BYTES_COMMITTED, counters.RawBytes);
-
- NIceDb::TNiceDb db(txc.DB);
- for (TWriteId writeId : meta.WriteIds) {
- Self->RemoveLongTxWrite(db, writeId, txId);
- }
- Self->CommitsInFlight.erase(txId);
- Self->UpdateInsertTableCounters();
- Trigger = ETriggerActivities::POST_INSERT;
- break;
- }
- case NKikimrTxColumnShard::TX_KIND_COMMIT_WRITE: {
- NOlap::TSnapshot snapshot(step, txId);
- Y_ABORT_UNLESS(Self->OperationsManager->CommitTransaction(*Self, txId, txc, snapshot));
- Trigger = ETriggerActivities::POST_INSERT;
- break;
- }
- default: {
- Y_ABORT("Unexpected TxKind");
- }
- }
-
- // Currently transactions never fail and there are no dependencies between them
- TxResults.emplace_back(TResultEvent(std::move(txInfo), NKikimrTxColumnShard::SUCCESS));
-
+ TxOperator = Self->ProgressTxController->GetVerifiedTxOperator(txId);
+ AFL_VERIFY(TxOperator->Progress(*Self, NOlap::TSnapshot(step, txId), txc));
Self->ProgressTxController->FinishPlannedTx(txId, txc);
Self->RescheduleWaitingReads();
}
@@ -148,24 +47,14 @@ public:
void Complete(const TActorContext& ctx) override {
NActors::TLogContextGuard logGuard = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID())("tx_state", "complete");
-
- for (auto& rec : TxEvents) {
- ctx.Send(rec.Target, rec.Event.Release(), 0, rec.Cookie);
- }
-
- for (auto& res : TxResults) {
- Self->ProgressTxController->CompleteRunningTx(TTxController::TPlanQueueItem(res.TxInfo.PlanStep, res.TxInfo.TxId));
-
- auto event = res.MakeEvent(Self->TabletID());
- ctx.Send(res.TxInfo.Source, event.release(), 0, res.TxInfo.Cookie);
+ if (TxOperator) {
+ TxOperator->Complete(*Self, ctx);
}
Self->SetupIndexation();
}
private:
- std::vector<TResultEvent> TxResults;
- std::vector<TEvent> TxEvents;
- ETriggerActivities Trigger{ETriggerActivities::NONE};
+ TTxController::ITransactionOperatior::TPtr TxOperator;
const ui32 TabletTxNo;
};
diff --git a/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp b/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp
index 945b4f76b5..4f04f60ff8 100644
--- a/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp
+++ b/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp
@@ -32,6 +32,9 @@ private:
TString TxSuffix() const {
return TStringBuilder() << " at tablet " << Self->TabletID();
}
+
+ void ConstructResult(TTxController::TProposeResult& proposeResult, const TTxController::TBasicTxInfo& txInfo);
+ TTxController::TProposeResult ProposeTtlDeprecated(const TString& txBody);
};
@@ -46,257 +49,132 @@ bool TTxProposeTransaction::Execute(TTransactionContext& txc, const TActorContex
auto& record = Proto(Ev->Get());
auto txKind = record.GetTxKind();
- //ui64 ssId = record.GetSchemeShardId();
ui64 txId = record.GetTxId();
auto& txBody = record.GetTxBody();
- auto status = NKikimrTxColumnShard::EResultStatus::ERROR;
- TString statusMessage;
-
- ui64 minStep = 0;
- ui64 maxStep = Max<ui64>();
-
- switch (txKind) {
- case NKikimrTxColumnShard::TX_KIND_SCHEMA: {
- TColumnShard::TAlterMeta meta;
- if (!meta.Body.ParseFromString(txBody)) {
- statusMessage = TStringBuilder()
- << "Schema TxId# " << txId << " cannot be parsed";
- status = NKikimrTxColumnShard::EResultStatus::SCHEMA_ERROR;
- break;
- }
-
- NOlap::ISnapshotSchema::TPtr currentSchema;
- if (Self->TablesManager.HasPrimaryIndex()) {
- currentSchema = Self->TablesManager.GetPrimaryIndexSafe().GetVersionedIndex().GetLastSchema();
- }
-
- // Invalid body generated at a newer SchemeShard
- if (!meta.Validate(currentSchema)) {
- statusMessage = TStringBuilder()
- << "Schema TxId# " << txId << " cannot be proposed";
- status = NKikimrTxColumnShard::EResultStatus::SCHEMA_ERROR;
- break;
- }
-
- Y_ABORT_UNLESS(record.HasSchemeShardId());
- if (Self->CurrentSchemeShardId == 0) {
- Self->CurrentSchemeShardId = record.GetSchemeShardId();
- Schema::SaveSpecialValue(db, Schema::EValueIds::CurrentSchemeShardId, Self->CurrentSchemeShardId);
- } else {
- Y_ABORT_UNLESS(Self->CurrentSchemeShardId == record.GetSchemeShardId());
- }
-
- auto seqNo = SeqNoFromProto(meta.Body.GetSeqNo());
- auto lastSeqNo = Self->LastSchemaSeqNo;
-
- // Check if proposal is outdated
- if (seqNo < lastSeqNo) {
- status = NKikimrTxColumnShard::SCHEMA_CHANGED;
- statusMessage = TStringBuilder()
- << "Ignoring outdated schema tx proposal at tablet "
- << Self->TabletID()
- << " txId " << txId
- << " ssId " << Self->CurrentSchemeShardId
- << " seqNo " << seqNo
- << " lastSeqNo " << lastSeqNo;
- LOG_S_INFO(TxPrefix() << statusMessage << TxSuffix());
- break;
- }
-
- Self->UpdateSchemaSeqNo(seqNo, txc);
-
- // FIXME: current tests don't provide processing params!
- // Y_DEBUG_ABORT_UNLESS(record.HasProcessingParams());
- if (!Self->ProcessingParams && record.HasProcessingParams()) {
- Self->ProcessingParams.emplace().CopyFrom(record.GetProcessingParams());
- Schema::SaveSpecialProtoValue(db, Schema::EValueIds::ProcessingParams, *Self->ProcessingParams);
- }
-
- // Always persist the latest metadata, this may include an updated seqno
- Self->ProgressTxController->RegisterTx(txId, txKind, txBody, Ev->Get()->GetSource(), Ev->Cookie, txc);
-
- if (!Self->AltersInFlight.contains(txId)) {
- Self->AltersInFlight.emplace(txId, std::move(meta));
- } else {
- auto& existing = Self->AltersInFlight.at(txId);
- existing.Body = std::move(meta.Body);
- }
-
- LOG_S_DEBUG(TxPrefix() << "schema txId " << txId << TxSuffix());
-
- status = NKikimrTxColumnShard::EResultStatus::PREPARED;
- break;
+
+ if (txKind == NKikimrTxColumnShard::TX_KIND_TTL) {
+ auto proposeResult = ProposeTtlDeprecated(txBody);
+ Result = std::make_unique<TEvColumnShard::TEvProposeTransactionResult>(Self->TabletID(), txKind, txId, proposeResult.GetStatus(), proposeResult.GetStatusMessage());
+ return true;
+ }
+
+ if (!Self->ProcessingParams && record.HasProcessingParams()) {
+ Self->ProcessingParams.emplace().CopyFrom(record.GetProcessingParams());
+ Schema::SaveSpecialProtoValue(db, Schema::EValueIds::ProcessingParams, *Self->ProcessingParams);
+ }
+
+ if (record.HasSchemeShardId()) {
+ if (Self->CurrentSchemeShardId == 0) {
+ Self->CurrentSchemeShardId = record.GetSchemeShardId();
+ Schema::SaveSpecialValue(db, Schema::EValueIds::CurrentSchemeShardId, Self->CurrentSchemeShardId);
+ } else {
+ Y_ABORT_UNLESS(Self->CurrentSchemeShardId == record.GetSchemeShardId());
+ }
+ }
+
+ TTxController::TBasicTxInfo fakeTxInfo;
+ fakeTxInfo.TxId = txId;
+ fakeTxInfo.TxKind = txKind;
+
+ auto txOperator = TTxController::ITransactionOperatior::TFactory::MakeHolder(txKind, fakeTxInfo);
+ if (!txOperator || !txOperator->Parse(txBody)) {
+ TTxController::TProposeResult proposeResult(NKikimrTxColumnShard::EResultStatus::ERROR, TStringBuilder() << "Error processing commit TxId# " << txId
+ << (txOperator ? ". Parsing error " : ". Unknown operator for txKind"));
+ ConstructResult(proposeResult, fakeTxInfo);
+ return true;
+ }
+
+ auto txInfoPtr = Self->ProgressTxController->GetTxInfo(txId);
+ if (!!txInfoPtr) {
+ if (txInfoPtr->Source != Ev->Get()->GetSource() || txInfoPtr->Cookie != Ev->Cookie) {
+ TTxController::TProposeResult proposeResult(NKikimrTxColumnShard::EResultStatus::ERROR, TStringBuilder() << "Another commit TxId# " << txId << " has already been proposed");
+ ConstructResult(proposeResult, fakeTxInfo);
+ }
+ TTxController::TProposeResult proposeResult;
+ ConstructResult(proposeResult, *txInfoPtr);
+ } else {
+ auto proposeResult = txOperator->Propose(*Self, txc, false);
+ if (!!proposeResult) {
+ const auto& txInfo = txOperator->TxWithDeadline() ? Self->ProgressTxController->RegisterTxWithDeadline(txId, txKind, txBody, Ev->Get()->GetSource(), Ev->Cookie, txc)
+ : Self->ProgressTxController->RegisterTx(txId, txKind, txBody, Ev->Get()->GetSource(), Ev->Cookie, txc);
+
+ ConstructResult(proposeResult, txInfo);
+ } else {
+ ConstructResult(proposeResult, fakeTxInfo);
+ }
+ }
+ AFL_VERIFY(!!Result);
+ return true;
+}
+
+TTxController::TProposeResult TTxProposeTransaction::ProposeTtlDeprecated(const TString& txBody) {
+ /// @note There's no tx guaranties now. For now TX_KIND_TTL is used to trigger TTL in tests only.
+ /// In future we could trigger TTL outside of tablet. Then we need real tx with complete notification.
+ // TODO: make real tx: save and progress with tablets restart support
+
+ NKikimrTxColumnShard::TTtlTxBody ttlBody;
+ if (!ttlBody.ParseFromString(txBody)) {
+ return TTxController::TProposeResult(NKikimrTxColumnShard::EResultStatus::SCHEMA_ERROR, "TTL tx cannot be parsed");
+ }
+
+ // If no paths trigger schema defined TTL
+ THashMap<ui64, NOlap::TTiering> pathTtls;
+ if (!ttlBody.GetPathIds().empty()) {
+ auto unixTime = TInstant::Seconds(ttlBody.GetUnixTimeSeconds());
+ if (!unixTime) {
+ return TTxController::TProposeResult(NKikimrTxColumnShard::EResultStatus::SCHEMA_ERROR, "TTL tx wrong timestamp");
+ }
+
+ TString columnName = ttlBody.GetTtlColumnName();
+ if (columnName.empty()) {
+ return TTxController::TProposeResult(NKikimrTxColumnShard::EResultStatus::SCHEMA_ERROR, "TTL tx wrong TTL column ''");
}
- case NKikimrTxColumnShard::TX_KIND_COMMIT: {
- if (Self->CommitsInFlight.contains(txId)) {
- LOG_S_DEBUG(TxPrefix() << "CommitTx (retry) TxId " << txId << TxSuffix());
-
- auto txInfoPtr = Self->ProgressTxController->GetTxInfo(txId);
- Y_ABORT_UNLESS(txInfoPtr);
-
- if (txInfoPtr->Source != Ev->Get()->GetSource() || txInfoPtr->Cookie != Ev->Cookie) {
- statusMessage = TStringBuilder()
- << "Another commit TxId# " << txId << " has already been proposed";
- break;
- }
-
- maxStep = txInfoPtr->MaxStep;
- minStep = txInfoPtr->MinStep;
- status = NKikimrTxColumnShard::EResultStatus::PREPARED;
- break;
- }
-
- NKikimrTxColumnShard::TCommitTxBody body;
- if (!body.ParseFromString(txBody)) {
- statusMessage = TStringBuilder()
- << "Commit TxId# " << txId << " cannot be parsed";
- break;
- }
-
- if (body.GetWriteIds().empty()) {
- statusMessage = TStringBuilder()
- << "Commit TxId# " << txId << " has an empty list of write ids";
- break;
- }
-
- if (body.GetTxInitiator() == 0) {
- // When initiator is 0, this means it's a local write id
- // Check that all write ids actually exist
- bool failed = false;
- for (ui64 writeId : body.GetWriteIds()) {
- if (!Self->LongTxWrites.contains(TWriteId{writeId})) {
- statusMessage = TStringBuilder()
- << "Commit TxId# " << txId << " references WriteId# " << writeId
- << " that no longer exists";
- failed = true;
- break;
- }
- auto& lw = Self->LongTxWrites[TWriteId{writeId}];
- if (lw.PreparedTxId != 0) {
- statusMessage = TStringBuilder()
- << "Commit TxId# " << txId << " references WriteId# " << writeId
- << " that is already locked by TxId# " << lw.PreparedTxId;
- failed = true;
- break;
- }
- }
- if (failed) {
- break;
- }
- }
-
- TColumnShard::TCommitMeta meta;
- for (ui64 wId : body.GetWriteIds()) {
- TWriteId writeId{wId};
- meta.AddWriteId(writeId);
- Self->AddLongTxWrite(writeId, txId);
- }
-
- const auto& txInfo = Self->ProgressTxController->RegisterTxWithDeadline(txId, txKind, txBody, Ev->Get()->GetSource(), Ev->Cookie, txc);
- minStep = txInfo.MinStep;
- maxStep = txInfo.MaxStep;
-
- Self->CommitsInFlight.emplace(txId, std::move(meta));
-
- LOG_S_DEBUG(TxPrefix() << "CommitTx txId " << txId << TxSuffix());
-
- status = NKikimrTxColumnShard::EResultStatus::PREPARED;
- break;
+
+ if (!Self->TablesManager.HasPrimaryIndex()) {
+ return TTxController::TProposeResult(NKikimrTxColumnShard::EResultStatus::SCHEMA_ERROR, "No primary index for TTL");
}
- case NKikimrTxColumnShard::TX_KIND_TTL: {
- /// @note There's no tx guaranties now. For now TX_KIND_TTL is used to trigger TTL in tests only.
- /// In future we could trigger TTL outside of tablet. Then we need real tx with complete notification.
- // TODO: make real tx: save and progress with tablets restart support
-
- NKikimrTxColumnShard::TTtlTxBody ttlBody;
- if (!ttlBody.ParseFromString(txBody)) {
- statusMessage = "TTL tx cannot be parsed";
- status = NKikimrTxColumnShard::EResultStatus::SCHEMA_ERROR;
- break;
- }
-
- // If no paths trigger schema defined TTL
- THashMap<ui64, NOlap::TTiering> pathTtls;
- if (!ttlBody.GetPathIds().empty()) {
- auto unixTime = TInstant::Seconds(ttlBody.GetUnixTimeSeconds());
- if (!unixTime) {
- statusMessage = "TTL tx wrong timestamp";
- status = NKikimrTxColumnShard::EResultStatus::SCHEMA_ERROR;
- break;
- }
-
- TString columnName = ttlBody.GetTtlColumnName();
- if (columnName.empty()) {
- statusMessage = "TTL tx wrong TTL column ''";
- status = NKikimrTxColumnShard::EResultStatus::SCHEMA_ERROR;
- break;
- }
-
- if (!Self->TablesManager.HasPrimaryIndex()) {
- statusMessage = "No primary index for TTL";
- status = NKikimrTxColumnShard::EResultStatus::SCHEMA_ERROR;
- break;
- }
-
- auto schema = Self->TablesManager.GetPrimaryIndexSafe().GetVersionedIndex().GetLastSchema()->GetSchema();
- auto ttlColumn = schema->GetFieldByName(columnName);
- if (!ttlColumn) {
- statusMessage = "TTL tx wrong TTL column '" + columnName + "'";
- status = NKikimrTxColumnShard::EResultStatus::SCHEMA_ERROR;
- break;
- }
-
- if (statusMessage.empty()) {
- const TInstant now = TlsActivationContext ? AppData()->TimeProvider->Now() : TInstant::Now();
- for (ui64 pathId : ttlBody.GetPathIds()) {
- NOlap::TTiering tiering;
- tiering.Ttl = NOlap::TTierInfo::MakeTtl(now - unixTime, columnName);
- pathTtls.emplace(pathId, std::move(tiering));
- }
- }
- }
-
- if (statusMessage.empty()) {
- if (Self->SetupTtl(pathTtls, true)) {
- status = NKikimrTxColumnShard::EResultStatus::SUCCESS;
- } else {
- statusMessage = "TTL not started";
- }
- }
-
- break;
+
+ auto schema = Self->TablesManager.GetPrimaryIndexSafe().GetVersionedIndex().GetLastSchema()->GetSchema();
+ auto ttlColumn = schema->GetFieldByName(columnName);
+ if (!ttlColumn) {
+ return TTxController::TProposeResult(NKikimrTxColumnShard::EResultStatus::SCHEMA_ERROR, "TTL tx wrong TTL column '" + columnName + "'");
}
- default: {
- statusMessage = TStringBuilder()
- << "Unsupported TxKind# " << ui32(txKind) << " TxId# " << txId;
+
+ const TInstant now = TlsActivationContext ? AppData()->TimeProvider->Now() : TInstant::Now();
+ for (ui64 pathId : ttlBody.GetPathIds()) {
+ NOlap::TTiering tiering;
+ tiering.Ttl = NOlap::TTierInfo::MakeTtl(now - unixTime, columnName);
+ pathTtls.emplace(pathId, std::move(tiering));
}
}
+ if (!Self->SetupTtl(pathTtls, true)) {
+ return TTxController::TProposeResult(NKikimrTxColumnShard::EResultStatus::SCHEMA_ERROR, "TTL not started");
+ }
- Result = std::make_unique<TEvColumnShard::TEvProposeTransactionResult>(Self->TabletID(), txKind, txId, status, statusMessage);
+ return TTxController::TProposeResult();
+}
- if (status == NKikimrTxColumnShard::EResultStatus::PREPARED) {
+void TTxProposeTransaction::ConstructResult(TTxController::TProposeResult& proposeResult, const TTxController::TBasicTxInfo& txInfo) {
+ Result = std::make_unique<TEvColumnShard::TEvProposeTransactionResult>(Self->TabletID(), txInfo.TxKind, txInfo.TxId, proposeResult.GetStatus(), proposeResult.GetStatusMessage());
+ if (proposeResult.GetStatus() == NKikimrTxColumnShard::EResultStatus::PREPARED) {
Self->IncCounter(COUNTER_PREPARE_SUCCESS);
- Result->Record.SetMinStep(minStep);
- Result->Record.SetMaxStep(maxStep);
+ Result->Record.SetMinStep(txInfo.MinStep);
+ Result->Record.SetMaxStep(txInfo.MaxStep);
if (Self->ProcessingParams) {
Result->Record.MutableDomainCoordinators()->CopyFrom(Self->ProcessingParams->GetCoordinators());
}
- } else if (status == NKikimrTxColumnShard::EResultStatus::SUCCESS) {
+ } else if (proposeResult.GetStatus() == NKikimrTxColumnShard::EResultStatus::SUCCESS) {
Self->IncCounter(COUNTER_PREPARE_SUCCESS);
} else {
Self->IncCounter(COUNTER_PREPARE_ERROR);
- LOG_S_INFO(TxPrefix() << "error txId " << txId << " " << statusMessage << TxSuffix());
+ LOG_S_INFO(TxPrefix() << "error txId " << txInfo.TxId << " " << proposeResult.GetStatusMessage() << TxSuffix());
}
- return true;
}
void TTxProposeTransaction::Complete(const TActorContext& ctx) {
Y_ABORT_UNLESS(Ev);
Y_ABORT_UNLESS(Result);
- LOG_S_DEBUG(TxPrefix() << "complete" << TxSuffix());
-
ctx.Send(Ev->Get()->GetSource(), Result.release());
-
Self->TryRegisterMediatorTimeCast();
}
diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp
index 6869cbef8c..5d5df25e8f 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.cpp
+++ b/ydb/core/tx/columnshard/columnshard_impl.cpp
@@ -48,92 +48,6 @@ NTabletPipe::TClientConfig GetPipeClientConfig() {
return config;
}
-bool ValidateTableSchema(const NKikimrSchemeOp::TColumnTableSchema& schema) {
- namespace NTypeIds = NScheme::NTypeIds;
-
- static const THashSet<NScheme::TTypeId> supportedTypes = {
- NTypeIds::Timestamp,
- NTypeIds::Int8,
- NTypeIds::Int16,
- NTypeIds::Int32,
- NTypeIds::Int64,
- NTypeIds::Uint8,
- NTypeIds::Uint16,
- NTypeIds::Uint32,
- NTypeIds::Uint64,
- NTypeIds::Date,
- NTypeIds::Datetime,
- //NTypeIds::Interval,
- //NTypeIds::Float,
- //NTypeIds::Double,
- NTypeIds::String,
- NTypeIds::Utf8
- };
-
- if (!schema.HasEngine() ||
- schema.GetEngine() != NKikimrSchemeOp::EColumnTableEngine::COLUMN_ENGINE_REPLACING_TIMESERIES) {
- return false;
- }
-
- if (!schema.KeyColumnNamesSize()) {
- return false;
- }
-
- TString firstKeyColumn = schema.GetKeyColumnNames()[0];
- THashSet<TString> keyColumns(schema.GetKeyColumnNames().begin(), schema.GetKeyColumnNames().end());
-
- for (const NKikimrSchemeOp::TOlapColumnDescription& column : schema.GetColumns()) {
- TString name = column.GetName();
- /*
- if (column.GetNotNull() && keyColumns.contains(name)) {
- return false;
- }
- */
- if (name == firstKeyColumn && !supportedTypes.contains(column.GetTypeId())) {
- return false;
- }
- keyColumns.erase(name);
- }
-
- if (!keyColumns.empty()) {
- return false;
- }
- return true;
-}
-
-bool ValidateTablePreset(const NKikimrSchemeOp::TColumnTableSchemaPreset& preset) {
- if (preset.HasName() && preset.GetName() != "default") {
- return false;
- }
- return ValidateTableSchema(preset.GetSchema());
-}
-
-}
-
-bool TColumnShard::TAlterMeta::Validate(const NOlap::ISnapshotSchema::TPtr& /*schema*/) const {
- switch (Body.TxBody_case()) {
- case NKikimrTxColumnShard::TSchemaTxBody::kInitShard:
- break;
- case NKikimrTxColumnShard::TSchemaTxBody::kEnsureTables:
- for (auto& table : Body.GetEnsureTables().GetTables()) {
- if (table.HasSchemaPreset() && !ValidateTablePreset(table.GetSchemaPreset())) {
- return false;
- }
- if (table.HasSchema() && !ValidateTableSchema(table.GetSchema())) {
- return false;
- }
- // TODO: validate TtlSettings
- }
- break;
- case NKikimrTxColumnShard::TSchemaTxBody::kAlterTable:
- return true;
- case NKikimrTxColumnShard::TSchemaTxBody::kAlterStore:
- return true;
- case NKikimrTxColumnShard::TSchemaTxBody::kDropTable:
- case NKikimrTxColumnShard::TSchemaTxBody::TXBODY_NOT_SET:
- break;
- }
- return true;
}
class TColumnShard::TStoragesManager: public NOlap::IStoragesManager {
@@ -374,42 +288,6 @@ bool TColumnShard::RemoveLongTxWrite(NIceDb::TNiceDb& db, TWriteId writeId, ui64
return false;
}
-bool TColumnShard::AbortTx(const ui64 txId, const NKikimrTxColumnShard::ETransactionKind& txKind, NTabletFlatExecutor::TTransactionContext& txc) {
- switch (txKind) {
- case NKikimrTxColumnShard::TX_KIND_SCHEMA: {
- AltersInFlight.erase(txId);
- break;
- }
- case NKikimrTxColumnShard::TX_KIND_COMMIT: {
- NIceDb::TNiceDb db(txc.DB);
- if (auto* meta = CommitsInFlight.FindPtr(txId)) {
- for (TWriteId writeId : meta->WriteIds) {
- // TODO: we probably need to have more complex
- // logic in the future, when there are multiple
- // inflight commits for the same writeId.
- RemoveLongTxWrite(db, writeId, txId);
- }
- TBlobGroupSelector dsGroupSelector(Info());
- NOlap::TDbWrapper dbTable(txc.DB, &dsGroupSelector);
- InsertTable->Abort(dbTable, meta->WriteIds);
-
- CommitsInFlight.erase(txId);
- }
- break;
- }
- case NKikimrTxColumnShard::TX_KIND_COMMIT_WRITE: {
- if (!OperationsManager->AbortTransaction(*this, txId, txc)) {
- return false;
- }
- break;
- }
- default: {
- Y_ABORT("Unsupported TxKind");
- }
- }
- return true;
-}
-
void TColumnShard::TryAbortWrites(NIceDb::TNiceDb& db, NOlap::TDbWrapper& dbTable, THashSet<TWriteId>&& writesToAbort) {
std::vector<TWriteId> failedAborts;
for (auto& writeId : writesToAbort) {
diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h
index 933112a594..00e72583c2 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.h
+++ b/ydb/core/tx/columnshard/columnshard_impl.h
@@ -8,7 +8,7 @@
#include "columnshard_private_events.h"
#include "blob_manager.h"
#include "tables_manager.h"
-#include "tx_controller.h"
+#include "transactions/tx_controller.h"
#include "inflight_request_tracker.h"
#include "counters/columnshard.h"
#include "resource_subscriber/counters.h"
@@ -131,6 +131,10 @@ class TColumnShard
friend class TOperationsManager;
friend class TWriteOperation;
+ friend class TSchemaTransactionOperator;
+ friend class TLongTxTransactionOperator;
+ friend class TEvWriteTransactionOperator;
+
class TTxProgressTx;
class TTxProposeCancel;
// proto
@@ -281,21 +285,6 @@ private:
std::unique_ptr<TTxController> ProgressTxController;
std::unique_ptr<TOperationsManager> OperationsManager;
- struct TAlterMeta {
- NKikimrTxColumnShard::TSchemaTxBody Body;
- THashSet<TActorId> NotifySubscribers;
-
- bool Validate(const NOlap::ISnapshotSchema::TPtr& schema) const;
- };
-
- struct TCommitMeta {
- THashSet<TWriteId> WriteIds;
-
- void AddWriteId(TWriteId id) {
- WriteIds.insert(id);
- }
- };
-
using TSchemaPreset = TSchemaPreset;
using TTableInfo = TTableInfo;
@@ -414,8 +403,6 @@ private:
bool ProgressTxInFlight = false;
THashMap<ui64, TInstant> ScanTxInFlight;
- THashMap<ui64, TAlterMeta> AltersInFlight;
- THashMap<ui64, TCommitMeta> CommitsInFlight; // key is TxId from propose
THashMap<TWriteId, TLongTxWriteInfo> LongTxWrites;
using TPartsForLTXShard = THashMap<ui32, TLongTxWriteInfo*>;
THashMap<TULID, TPartsForLTXShard> LongTxWritesByUniqueId;
@@ -441,8 +428,6 @@ private:
void AddLongTxWrite(TWriteId writeId, ui64 txId);
void LoadLongTxWrite(TWriteId writeId, const ui32 writePartId, const NLongTxService::TLongTxId& longTxId);
bool RemoveLongTxWrite(NIceDb::TNiceDb& db, TWriteId writeId, ui64 txId = 0);
- bool AbortTx(const ui64 txId, const NKikimrTxColumnShard::ETransactionKind& txKind, NTabletFlatExecutor::TTransactionContext& txc);
- bool LoadTx(const ui64 txId, const NKikimrTxColumnShard::ETransactionKind& txKind, const TString& txBody);
void TryAbortWrites(NIceDb::TNiceDb& db, NOlap::TDbWrapper& dbTable, THashSet<TWriteId>&& writesToAbort);
TWriteId BuildNextWriteId(NTabletFlatExecutor::TTransactionContext& txc);
diff --git a/ydb/core/tx/columnshard/transactions/operators/ev_write.cpp b/ydb/core/tx/columnshard/transactions/operators/ev_write.cpp
new file mode 100644
index 0000000000..927cfabadc
--- /dev/null
+++ b/ydb/core/tx/columnshard/transactions/operators/ev_write.cpp
@@ -0,0 +1,3 @@
+#include "ev_write.h"
+
+namespace NKikimr::NColumnShard {}
diff --git a/ydb/core/tx/columnshard/transactions/operators/ev_write.h b/ydb/core/tx/columnshard/transactions/operators/ev_write.h
new file mode 100644
index 0000000000..b31f381df5
--- /dev/null
+++ b/ydb/core/tx/columnshard/transactions/operators/ev_write.h
@@ -0,0 +1,38 @@
+#pragma once
+
+#include <ydb/core/tx/columnshard/columnshard_impl.h>
+
+namespace NKikimr::NColumnShard {
+
+ class TEvWriteTransactionOperator : public TTxController::ITransactionOperatior {
+ using TBase = TTxController::ITransactionOperatior;
+ using TProposeResult = TTxController::TProposeResult;
+ static inline auto Registrator = TFactory::TRegistrator<TEvWriteTransactionOperator>(NKikimrTxColumnShard::TX_KIND_COMMIT_WRITE);
+ public:
+ using TBase::TBase;
+
+ virtual bool Parse(const TString& data) override {
+ Y_UNUSED(data);
+ return true;
+ }
+
+ TProposeResult Propose(TColumnShard& /*owner*/, NTabletFlatExecutor::TTransactionContext& /*txc*/, bool /*proposed*/) const override {
+ return TProposeResult();
+ }
+
+ virtual bool Progress(TColumnShard& owner, const NOlap::TSnapshot& version, NTabletFlatExecutor::TTransactionContext& txc) override {
+ return owner.OperationsManager->CommitTransaction(owner, GetTxId(), txc, version);
+ }
+
+ virtual bool Complete(TColumnShard& owner, const TActorContext& ctx) override {
+ auto result = NEvents::TDataEvents::TEvWriteResult::BuildCommited(owner.TabletID(), GetTxId());
+ ctx.Send(TxInfo.Source, result.release(), 0, TxInfo.Cookie);
+ return true;
+ }
+
+ virtual bool Abort(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc) override {
+ return owner.OperationsManager->AbortTransaction(owner, GetTxId(), txc);
+ }
+ };
+
+}
diff --git a/ydb/core/tx/columnshard/transactions/operators/long_tx_write.cpp b/ydb/core/tx/columnshard/transactions/operators/long_tx_write.cpp
new file mode 100644
index 0000000000..e10c1dc4a7
--- /dev/null
+++ b/ydb/core/tx/columnshard/transactions/operators/long_tx_write.cpp
@@ -0,0 +1,3 @@
+#include "long_tx_write.h"
+
+namespace NKikimr::NColumnShard {}
diff --git a/ydb/core/tx/columnshard/transactions/operators/long_tx_write.h b/ydb/core/tx/columnshard/transactions/operators/long_tx_write.h
new file mode 100644
index 0000000000..7ef86171f3
--- /dev/null
+++ b/ydb/core/tx/columnshard/transactions/operators/long_tx_write.h
@@ -0,0 +1,103 @@
+#pragma once
+
+#include <ydb/core/tx/columnshard/columnshard_impl.h>
+
+namespace NKikimr::NColumnShard {
+
+ class TLongTxTransactionOperator : public TTxController::ITransactionOperatior {
+ using TBase = TTxController::ITransactionOperatior;
+ using TProposeResult = TTxController::TProposeResult;
+ static inline auto Registrator = TFactory::TRegistrator<TLongTxTransactionOperator>(NKikimrTxColumnShard::TX_KIND_COMMIT);
+ public:
+ using TBase::TBase;
+
+ bool Parse(const TString& data) override {
+ NKikimrTxColumnShard::TCommitTxBody commitTxBody;
+ if (!commitTxBody.ParseFromString(data)) {
+ return false;
+ }
+
+ for (auto& id : commitTxBody.GetWriteIds()) {
+ WriteIds.insert(TWriteId{id});
+ }
+ return true;
+ }
+
+ void OnTabletInit(TColumnShard& owner) override {
+ for (auto&& writeId : WriteIds) {
+ Y_ABORT_UNLESS(owner.LongTxWrites.contains(writeId), "TTxInit at %" PRIu64 " : Commit %" PRIu64 " references local write %" PRIu64 " that doesn't exist",
+ owner.TabletID(), GetTxId(), (ui64)writeId);
+ owner.AddLongTxWrite(writeId, GetTxId());
+ }
+ }
+
+ TProposeResult Propose(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& /*txc*/, bool /*proposed*/) const override {
+ if (WriteIds.empty()) {
+ return TProposeResult(NKikimrTxColumnShard::EResultStatus::ERROR,
+ TStringBuilder() << "Commit TxId# " << GetTxId() << " has an empty list of write ids");
+ }
+
+ for (auto&& writeId : WriteIds) {
+ if (!owner.LongTxWrites.contains(writeId)) {
+ return TProposeResult(NKikimrTxColumnShard::EResultStatus::ERROR,
+ TStringBuilder() << "Commit TxId# " << GetTxId() << " references WriteId# " << (ui64)writeId << " that no longer exists");
+ }
+ auto& lw = owner.LongTxWrites[writeId];
+ if (lw.PreparedTxId != 0) {
+ return TProposeResult(NKikimrTxColumnShard::EResultStatus::ERROR,
+ TStringBuilder() << "Commit TxId# " << GetTxId() << " references WriteId# " << (ui64)writeId << " that is already locked by TxId# " << lw.PreparedTxId);
+ }
+ }
+
+ for (auto&& writeId : WriteIds) {
+ owner.AddLongTxWrite(writeId, GetTxId());
+ }
+ return TProposeResult();;
+ }
+
+ bool Progress(TColumnShard& owner, const NOlap::TSnapshot& version, NTabletFlatExecutor::TTransactionContext& txc) override {
+ TBlobGroupSelector dsGroupSelector(owner.Info());
+ NOlap::TDbWrapper dbTable(txc.DB, &dsGroupSelector);
+
+ auto pathExists = [&](ui64 pathId) {
+ return owner.TablesManager.HasTable(pathId);
+ };
+
+ auto counters = owner.InsertTable->Commit(dbTable, version.GetPlanStep(), version.GetTxId(), WriteIds,
+ pathExists);
+
+ owner.IncCounter(COUNTER_BLOBS_COMMITTED, counters.Rows);
+ owner.IncCounter(COUNTER_BYTES_COMMITTED, counters.Bytes);
+ owner.IncCounter(COUNTER_RAW_BYTES_COMMITTED, counters.RawBytes);
+
+ NIceDb::TNiceDb db(txc.DB);
+ for (TWriteId writeId : WriteIds) {
+ owner.RemoveLongTxWrite(db, writeId, GetTxId());
+ }
+ owner.UpdateInsertTableCounters();
+ return true;
+ }
+
+ bool Complete(TColumnShard& owner, const TActorContext& ctx) override {
+ auto result = std::make_unique<TEvColumnShard::TEvProposeTransactionResult>(
+ owner.TabletID(), TxInfo.TxKind, GetTxId(), NKikimrTxColumnShard::SUCCESS);
+ result->Record.SetStep(TxInfo.PlanStep);
+ ctx.Send(TxInfo.Source, result.release(), 0, TxInfo.Cookie);
+ return true;
+ }
+
+ bool Abort(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc) override {
+ NIceDb::TNiceDb db(txc.DB);
+ for (TWriteId writeId : WriteIds) {
+ owner.RemoveLongTxWrite(db, writeId, GetTxId());
+ }
+ TBlobGroupSelector dsGroupSelector(owner.Info());
+ NOlap::TDbWrapper dbTable(txc.DB, &dsGroupSelector);
+ owner.InsertTable->Abort(dbTable, WriteIds);
+ return true;
+ }
+
+ private:
+ THashSet<TWriteId> WriteIds;
+ };
+}
diff --git a/ydb/core/tx/columnshard/transactions/operators/schema.cpp b/ydb/core/tx/columnshard/transactions/operators/schema.cpp
new file mode 100644
index 0000000000..b9979fd754
--- /dev/null
+++ b/ydb/core/tx/columnshard/transactions/operators/schema.cpp
@@ -0,0 +1,3 @@
+#include "schema.h"
+
+namespace NKikimr::NColumnShard {}
diff --git a/ydb/core/tx/columnshard/transactions/operators/schema.h b/ydb/core/tx/columnshard/transactions/operators/schema.h
new file mode 100644
index 0000000000..78940db370
--- /dev/null
+++ b/ydb/core/tx/columnshard/transactions/operators/schema.h
@@ -0,0 +1,159 @@
+#pragma once
+
+#include <ydb/core/tx/columnshard/columnshard_impl.h>
+
+namespace NKikimr::NColumnShard {
+
+ class TSchemaTransactionOperator : public TTxController::ITransactionOperatior {
+ using TBase = TTxController::ITransactionOperatior;
+ using TProposeResult = TTxController::TProposeResult;
+ static inline auto Registrator = TFactory::TRegistrator<TSchemaTransactionOperator>(NKikimrTxColumnShard::TX_KIND_SCHEMA);
+ public:
+ using TBase::TBase;
+
+ virtual bool Parse(const TString& data) override {
+ if (!SchemaTxBody.ParseFromString(data)) {
+ return false;
+ }
+ return true;
+ }
+
+ bool TxWithDeadline() const override {
+ return false;
+ }
+
+ TProposeResult Propose(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc, bool /*proposed*/) const override {
+ switch (SchemaTxBody.TxBody_case()) {
+ case NKikimrTxColumnShard::TSchemaTxBody::kInitShard:
+ break;
+ case NKikimrTxColumnShard::TSchemaTxBody::kEnsureTables:
+ for (auto& table : SchemaTxBody.GetEnsureTables().GetTables()) {
+ if (table.HasSchemaPreset() && !ValidateTablePreset(table.GetSchemaPreset())) {
+ return TProposeResult(NKikimrTxColumnShard::EResultStatus::ERROR, "Invalid schema");
+ }
+ if (table.HasSchema() && !ValidateTableSchema(table.GetSchema())) {
+ return TProposeResult(NKikimrTxColumnShard::EResultStatus::ERROR, "Invalid schema");
+ }
+ }
+ break;
+ case NKikimrTxColumnShard::TSchemaTxBody::kAlterTable:
+ case NKikimrTxColumnShard::TSchemaTxBody::kAlterStore:
+ case NKikimrTxColumnShard::TSchemaTxBody::kDropTable:
+ case NKikimrTxColumnShard::TSchemaTxBody::TXBODY_NOT_SET:
+ break;
+ }
+
+ auto seqNo = SeqNoFromProto(SchemaTxBody.GetSeqNo());
+ auto lastSeqNo = owner.LastSchemaSeqNo;
+
+ // Check if proposal is outdated
+ if (seqNo < lastSeqNo) {
+ auto errorMessage = TStringBuilder()
+ << "Ignoring outdated schema tx proposal at tablet "
+ << owner.TabletID()
+ << " txId " << GetTxId()
+ << " ssId " << owner.CurrentSchemeShardId
+ << " seqNo " << seqNo
+ << " lastSeqNo " << lastSeqNo;
+ return TProposeResult(NKikimrTxColumnShard::EResultStatus::SCHEMA_CHANGED, errorMessage);
+ }
+
+ owner.UpdateSchemaSeqNo(seqNo, txc);
+ return TProposeResult();
+ }
+
+ virtual bool Progress(TColumnShard& owner, const NOlap::TSnapshot& version, NTabletFlatExecutor::TTransactionContext& txc) override {
+ owner.RunSchemaTx(SchemaTxBody, version, txc);
+ owner.ProtectSchemaSeqNo(SchemaTxBody.GetSeqNo(), txc);
+ return true;
+ }
+
+ virtual bool Complete(TColumnShard& owner, const TActorContext& ctx) override {
+ for (TActorId subscriber : NotifySubscribers) {
+ auto event = MakeHolder<TEvColumnShard::TEvNotifyTxCompletionResult>(owner.TabletID(), GetTxId());
+ ctx.Send(subscriber, event.Release(), 0, 0);
+ }
+
+ auto result = std::make_unique<TEvColumnShard::TEvProposeTransactionResult>(
+ owner.TabletID(), TxInfo.TxKind, TxInfo.TxId, NKikimrTxColumnShard::SUCCESS);
+ result->Record.SetStep(TxInfo.PlanStep);
+ ctx.Send(TxInfo.Source, result.release(), 0, TxInfo.Cookie);
+ return true;
+ }
+
+ virtual bool Abort(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc) override {
+ Y_UNUSED(owner, txc);
+ return true;
+ }
+
+ virtual void RegisterSubscriber(const TActorId& actorId) override {
+ NotifySubscribers.insert(actorId);
+ }
+
+ private:
+ bool ValidateTableSchema(const NKikimrSchemeOp::TColumnTableSchema& schema) const {
+ namespace NTypeIds = NScheme::NTypeIds;
+
+ static const THashSet<NScheme::TTypeId> supportedTypes = {
+ NTypeIds::Timestamp,
+ NTypeIds::Int8,
+ NTypeIds::Int16,
+ NTypeIds::Int32,
+ NTypeIds::Int64,
+ NTypeIds::Uint8,
+ NTypeIds::Uint16,
+ NTypeIds::Uint32,
+ NTypeIds::Uint64,
+ NTypeIds::Date,
+ NTypeIds::Datetime,
+ //NTypeIds::Interval,
+ //NTypeIds::Float,
+ //NTypeIds::Double,
+ NTypeIds::String,
+ NTypeIds::Utf8
+ };
+
+ if (!schema.HasEngine() ||
+ schema.GetEngine() != NKikimrSchemeOp::EColumnTableEngine::COLUMN_ENGINE_REPLACING_TIMESERIES) {
+ return false;
+ }
+
+ if (!schema.KeyColumnNamesSize()) {
+ return false;
+ }
+
+ TString firstKeyColumn = schema.GetKeyColumnNames()[0];
+ THashSet<TString> keyColumns(schema.GetKeyColumnNames().begin(), schema.GetKeyColumnNames().end());
+
+ for (const NKikimrSchemeOp::TOlapColumnDescription& column : schema.GetColumns()) {
+ TString name = column.GetName();
+ /*
+ if (column.GetNotNull() && keyColumns.contains(name)) {
+ return false;
+ }
+ */
+ if (name == firstKeyColumn && !supportedTypes.contains(column.GetTypeId())) {
+ return false;
+ }
+ keyColumns.erase(name);
+ }
+
+ if (!keyColumns.empty()) {
+ return false;
+ }
+ return true;
+ }
+
+ bool ValidateTablePreset(const NKikimrSchemeOp::TColumnTableSchemaPreset& preset) const {
+ if (preset.HasName() && preset.GetName() != "default") {
+ return false;
+ }
+ return ValidateTableSchema(preset.GetSchema());
+ }
+
+ private:
+ NKikimrTxColumnShard::TSchemaTxBody SchemaTxBody;
+ THashSet<TActorId> NotifySubscribers;
+ };
+
+}
diff --git a/ydb/core/tx/columnshard/transactions/operators/ya.make b/ydb/core/tx/columnshard/transactions/operators/ya.make
new file mode 100644
index 0000000000..449ec29a13
--- /dev/null
+++ b/ydb/core/tx/columnshard/transactions/operators/ya.make
@@ -0,0 +1,13 @@
+LIBRARY()
+
+SRCS(
+ GLOBAL schema.cpp
+ GLOBAL long_tx_write.cpp
+ GLOBAL ev_write.cpp
+)
+
+PEERDIR(
+ ydb/core/tx/columnshard/transactions
+)
+
+END()
diff --git a/ydb/core/tx/columnshard/tx_controller.cpp b/ydb/core/tx/columnshard/transactions/tx_controller.cpp
index b2943fd462..1137456d3d 100644
--- a/ydb/core/tx/columnshard/tx_controller.cpp
+++ b/ydb/core/tx/columnshard/transactions/tx_controller.cpp
@@ -1,5 +1,5 @@
#include "tx_controller.h"
-#include "columnshard_impl.h"
+#include <ydb/core/tx/columnshard/columnshard_impl.h>
namespace NKikimr::NColumnShard {
@@ -64,7 +64,10 @@ bool TTxController::Load(NTabletFlatExecutor::TTransactionContext& txc) {
}
const TString txBody = rowset.GetValue<Schema::TxInfo::TxBody>();
- Y_ABORT_UNLESS(Owner.LoadTx(txId, txInfo.TxKind, txBody));
+ ITransactionOperatior::TPtr txOperator(ITransactionOperatior::TFactory::Construct(txInfo.TxKind, txInfo));
+ Y_ABORT_UNLESS(!!txOperator);
+ Y_ABORT_UNLESS(txOperator->Parse(txBody));
+ Operators[txId] = txOperator;
if (!rowset.Next()) {
return false;
@@ -73,6 +76,20 @@ bool TTxController::Load(NTabletFlatExecutor::TTransactionContext& txc) {
return true;
}
+TTxController::ITransactionOperatior::TPtr TTxController::GetTxOperator(const ui64 txId) {
+ auto it = Operators.find(txId);
+ if(it == Operators.end()) {
+ return nullptr;
+ }
+ return it->second;
+}
+
+TTxController::ITransactionOperatior::TPtr TTxController::GetVerifiedTxOperator(const ui64 txId) {
+ auto it = Operators.find(txId);
+ AFL_VERIFY(it != Operators.end())("tx_id", txId);
+ return it->second;
+}
+
const TTxController::TBasicTxInfo& TTxController::RegisterTx(const ui64 txId, const NKikimrTxColumnShard::ETransactionKind& txKind, const TString& txBody, const TActorId& source, const ui64 cookie, NTabletFlatExecutor::TTransactionContext& txc) {
NIceDb::TNiceDb db(txc.DB);
@@ -81,6 +98,12 @@ const TTxController::TBasicTxInfo& TTxController::RegisterTx(const ui64 txId, co
txInfo.TxKind = txKind;
txInfo.Source = source;
txInfo.Cookie = cookie;
+
+ ITransactionOperatior::TPtr txOperator(ITransactionOperatior::TFactory::Construct(txInfo.TxKind, txInfo));
+ Y_ABORT_UNLESS(!!txOperator);
+ Y_ABORT_UNLESS(txOperator->Parse(txBody));
+ Operators[txId] = txOperator;
+
Schema::SaveTxInfo(db, txInfo.TxId, txInfo.TxKind, txBody, Max<ui64>(), txInfo.Source, txInfo.Cookie);
return txInfo;
}
@@ -95,6 +118,12 @@ const TTxController::TBasicTxInfo& TTxController::RegisterTxWithDeadline(const u
txInfo.Cookie = cookie;
txInfo.MinStep = GetAllowedStep();
txInfo.MaxStep = txInfo.MinStep + MaxCommitTxDelay.MilliSeconds();
+
+ ITransactionOperatior::TPtr txOperator(ITransactionOperatior::TFactory::Construct(txInfo.TxKind, txInfo));
+ Y_ABORT_UNLESS(!!txOperator);
+ Y_ABORT_UNLESS(txOperator->Parse(txBody));
+ Operators[txId] = txOperator;
+
Schema::SaveTxInfo(db, txInfo.TxId, txInfo.TxKind, txBody, txInfo.MaxStep, txInfo.Source, txInfo.Cookie);
DeadlineQueue.emplace(txInfo.MaxStep, txId);
return txInfo;
@@ -106,11 +135,16 @@ bool TTxController::AbortTx(const ui64 txId, NTabletFlatExecutor::TTransactionCo
return true;
}
Y_ABORT_UNLESS(it->second.PlanStep == 0);
- Owner.AbortTx(txId, it->second.TxKind, txc);
+
+ auto opIt = Operators.find(txId);
+ Y_ABORT_UNLESS(opIt != Operators.end());
+ opIt->second->Abort(Owner, txc);
+
if (it->second.MaxStep != Max<ui64>()) {
DeadlineQueue.erase(TPlanQueueItem(it->second.MaxStep, txId));
}
BasicTxInfo.erase(it);
+ Operators.erase(txId);
NIceDb::TNiceDb db(txc.DB);
Schema::EraseTxInfo(db, txId);
return true;
@@ -125,11 +159,16 @@ bool TTxController::CancelTx(const ui64 txId, NTabletFlatExecutor::TTransactionC
// Cannot cancel planned transaction
return false;
}
- Owner.AbortTx(txId, it->second.TxKind, txc);
+
+ auto opIt = Operators.find(txId);
+ Y_ABORT_UNLESS(opIt != Operators.end());
+ opIt->second->Abort(Owner, txc);
+
if (it->second.MaxStep != Max<ui64>()) {
DeadlineQueue.erase(TPlanQueueItem(it->second.MaxStep, txId));
}
BasicTxInfo.erase(it);
+ Operators.erase(txId);
NIceDb::TNiceDb db(txc.DB);
Schema::EraseTxInfo(db, txId);
return true;
@@ -152,6 +191,7 @@ void TTxController::FinishPlannedTx(const ui64 txId, NTabletFlatExecutor::TTrans
NIceDb::TNiceDb db(txc.DB);
BasicTxInfo.erase(txId);
+ Operators.erase(txId);
Schema::EraseTxInfo(db, txId);
}
@@ -217,4 +257,15 @@ TTxController::EPlanResult TTxController::PlanTx(const ui64 planStep, const ui64
return EPlanResult::AlreadyPlanned;
}
+void TTxController::OnTabletInit() {
+ for (auto&& txOperator : Operators) {
+ txOperator.second->OnTabletInit(Owner);
+ }
+}
+
+}
+
+template <>
+void Out<NKikimrTxColumnShard::ETransactionKind>(IOutputStream& out, TTypeTraits<NKikimrTxColumnShard::ETransactionKind>::TFuncParam txKind) {
+ out << (ui64) txKind;
}
diff --git a/ydb/core/tx/columnshard/tx_controller.h b/ydb/core/tx/columnshard/transactions/tx_controller.h
index cadeaf7ccb..fb38fae9d3 100644
--- a/ydb/core/tx/columnshard/tx_controller.h
+++ b/ydb/core/tx/columnshard/transactions/tx_controller.h
@@ -1,6 +1,6 @@
#pragma once
-#include "columnshard_schema.h"
+#include <ydb/core/tx/columnshard/columnshard_schema.h>
#include <ydb/core/tablet_flat/tablet_flat_executed.h>
#include <ydb/core/tx/data_events/events.h>
@@ -36,6 +36,54 @@ public:
NKikimrTxColumnShard::ETransactionKind TxKind;
};
+ class TProposeResult {
+ YDB_READONLY(NKikimrTxColumnShard::EResultStatus, Status, NKikimrTxColumnShard::EResultStatus::PREPARED);
+ YDB_READONLY_DEF(TString, StatusMessage);
+ public:
+ TProposeResult() = default;
+ TProposeResult(NKikimrTxColumnShard::EResultStatus status, const TString& statusMessage)
+ : Status(status)
+ , StatusMessage(statusMessage)
+ {}
+
+ bool operator!() const {
+ return Status != NKikimrTxColumnShard::EResultStatus::PREPARED;
+ }
+ };
+
+ class ITransactionOperatior {
+ protected:
+ TBasicTxInfo TxInfo;
+ public:
+ using TPtr = std::shared_ptr<ITransactionOperatior>;
+ using TFactory = NObjectFactory::TParametrizedObjectFactory<ITransactionOperatior, NKikimrTxColumnShard::ETransactionKind, TBasicTxInfo>;
+
+ ITransactionOperatior(const TBasicTxInfo& txInfo)
+ : TxInfo(txInfo)
+ {}
+
+ ui64 GetTxId() const {
+ return TxInfo.TxId;
+ }
+
+ virtual ~ITransactionOperatior() {}
+
+ virtual bool TxWithDeadline() const {
+ return true;
+ }
+
+ virtual bool Parse(const TString& data) = 0;
+ virtual TProposeResult Propose(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc, bool proposed) const = 0;
+
+ virtual bool Progress(TColumnShard& owner, const NOlap::TSnapshot& version, NTabletFlatExecutor::TTransactionContext& txc) = 0;
+ virtual bool Abort(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc) = 0;
+ virtual bool Complete(TColumnShard& owner, const TActorContext& ctx) = 0;
+ virtual void RegisterSubscriber(const TActorId&) {
+ AFL_VERIFY(false)("message", "Not implemented");
+ };
+ virtual void OnTabletInit(TColumnShard& /*owner*/) {}
+ };
+
private:
const TDuration MaxCommitTxDelay = TDuration::Seconds(30);
TColumnShard& Owner;
@@ -44,6 +92,8 @@ private:
std::set<TPlanQueueItem> PlanQueue;
std::set<TPlanQueueItem> RunningQueue;
+ THashMap<ui64, ITransactionOperatior::TPtr> Operators;
+
private:
ui64 GetAllowedStep() const;
bool AbortTx(const ui64 txId, NTabletFlatExecutor::TTransactionContext& txc);
@@ -51,6 +101,9 @@ private:
public:
TTxController(TColumnShard& owner);
+ ITransactionOperatior::TPtr GetTxOperator(const ui64 txId);
+ ITransactionOperatior::TPtr GetVerifiedTxOperator(const ui64 txId);
+
ui64 GetMemoryUsage() const;
bool HaveOutdatedTxs() const;
@@ -79,6 +132,8 @@ public:
};
EPlanResult PlanTx(const ui64 planStep, const ui64 txId, NTabletFlatExecutor::TTransactionContext& txc);
+ void OnTabletInit();
};
}
+
diff --git a/ydb/core/tx/columnshard/transactions/ya.make b/ydb/core/tx/columnshard/transactions/ya.make
new file mode 100644
index 0000000000..17f70379bb
--- /dev/null
+++ b/ydb/core/tx/columnshard/transactions/ya.make
@@ -0,0 +1,20 @@
+LIBRARY()
+
+SRCS(
+ tx_controller.cpp
+)
+
+PEERDIR(
+ ydb/core/tablet_flat
+ ydb/core/tx/data_events
+)
+
+IF (OS_WINDOWS)
+ CFLAGS(
+ -DKIKIMR_DISABLE_S3_OPS
+ )
+ENDIF()
+
+YQL_LAST_ABI_VERSION()
+
+END()
diff --git a/ydb/core/tx/columnshard/ya.make b/ydb/core/tx/columnshard/ya.make
index 04443739ee..32c6f82418 100644
--- a/ydb/core/tx/columnshard/ya.make
+++ b/ydb/core/tx/columnshard/ya.make
@@ -27,7 +27,6 @@ SRCS(
defs.cpp
write_actor.cpp
tables_manager.cpp
- tx_controller.cpp
)
GENERATE_ENUM_SERIALIZATION(columnshard.h)
@@ -49,6 +48,8 @@ PEERDIR(
ydb/core/tx/columnshard/common
ydb/core/tx/columnshard/splitter
ydb/core/tx/columnshard/operations
+ ydb/core/tx/columnshard/transactions
+ ydb/core/tx/columnshard/transactions/operators
ydb/core/tx/columnshard/blobs_reader
ydb/core/tx/columnshard/blobs_action
ydb/core/tx/columnshard/resource_subscriber