diff options
author | nsofya <nsofya@ydb.tech> | 2024-01-29 11:12:57 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-01-29 11:12:57 +0300 |
commit | db4298118bb377fe654bca629af1f5bb019f4d37 (patch) | |
tree | d089909d8ec05813b0de05c68e9918fc4ed4e5a8 | |
parent | eecc71dc9207f1843120577c55a6197041245d31 (diff) | |
download | ydb-db4298118bb377fe654bca629af1f5bb019f4d37.tar.gz |
Add TxOperators (#1365)
* Create special classes for distribute commit logics
* Correct diff
* Add missed event
18 files changed, 575 insertions, 539 deletions
diff --git a/ydb/core/tx/columnshard/columnshard.cpp b/ydb/core/tx/columnshard/columnshard.cpp index ac3f521b8b..6923c4f357 100644 --- a/ydb/core/tx/columnshard/columnshard.cpp +++ b/ydb/core/tx/columnshard/columnshard.cpp @@ -255,8 +255,6 @@ ui64 TColumnShard::MemoryUsage() const { ui64 memory = ProgressTxController->GetMemoryUsage() + ScanTxInFlight.size() * (sizeof(ui64) + sizeof(TInstant)) + - AltersInFlight.size() * sizeof(TAlterMeta) + - CommitsInFlight.size() * sizeof(TCommitMeta) + LongTxWrites.size() * (sizeof(TWriteId) + sizeof(TLongTxWriteInfo)) + LongTxWritesByUniqueId.size() * (sizeof(TULID) + sizeof(void*)) + (WaitingScans.size()) * (sizeof(NOlap::TSnapshot) + sizeof(void*)) + diff --git a/ydb/core/tx/columnshard/columnshard__init.cpp b/ydb/core/tx/columnshard/columnshard__init.cpp index 8173719281..3e11ed9e92 100644 --- a/ydb/core/tx/columnshard/columnshard__init.cpp +++ b/ydb/core/tx/columnshard/columnshard__init.cpp @@ -38,8 +38,6 @@ void TTxInit::SetDefaults() { Self->LastPlannedTxId = 0; Self->OwnerPathId = 0; Self->OwnerPath.clear(); - Self->AltersInFlight.clear(); - Self->CommitsInFlight.clear(); Self->LongTxWrites.clear(); Self->LongTxWritesByUniqueId.clear(); } @@ -180,18 +178,6 @@ bool TTxInit::ReadEverything(TTransactionContext& txc, const TActorContext& ctx) } } - { - TMemoryProfileGuard g("TTxInit/CommitsInFlight"); - for (const auto& pr : Self->CommitsInFlight) { - ui64 txId = pr.first; - for (TWriteId writeId : pr.second.WriteIds) { - Y_ABORT_UNLESS(Self->LongTxWrites.contains(writeId), - "TTxInit at %" PRIu64 " : Commit %" PRIu64 " references local write %" PRIu64 " that doesn't exist", - Self->TabletID(), txId, writeId); - Self->AddLongTxWrite(writeId, txId); - } - } - } Self->UpdateInsertTableCounters(); Self->UpdateIndexCounters(); Self->UpdateResourceMetrics(ctx, {}); @@ -219,6 +205,7 @@ bool TTxInit::Execute(TTransactionContext& txc, const TActorContext& ctx) { } void TTxInit::Complete(const TActorContext& ctx) { + Self->ProgressTxController->OnTabletInit(); Self->SwitchToWork(ctx); } @@ -377,31 +364,4 @@ void TColumnShard::Handle(TEvPrivate::TEvNormalizerResult::TPtr& ev, const TActo Execute(new TTxApplyNormalizer(this, ev->Get()->GetChanges()), ctx); } -bool TColumnShard::LoadTx(const ui64 txId, const NKikimrTxColumnShard::ETransactionKind& txKind, const TString& txBody) { - switch (txKind) { - case NKikimrTxColumnShard::TX_KIND_SCHEMA: { - TColumnShard::TAlterMeta meta; - Y_ABORT_UNLESS(meta.Body.ParseFromString(txBody)); - AltersInFlight.emplace(txId, std::move(meta)); - break; - } - case NKikimrTxColumnShard::TX_KIND_COMMIT: { - NKikimrTxColumnShard::TCommitTxBody body; - Y_ABORT_UNLESS(body.ParseFromString(txBody)); - - TColumnShard::TCommitMeta meta; - for (auto& id : body.GetWriteIds()) { - meta.AddWriteId(TWriteId{id}); - } - - CommitsInFlight.emplace(txId, std::move(meta)); - break; - } - default: { - Y_ABORT("Unsupported TxKind stored in the TxInfo table"); - } - } - return true; -} - } diff --git a/ydb/core/tx/columnshard/columnshard__notify_tx_completion.cpp b/ydb/core/tx/columnshard/columnshard__notify_tx_completion.cpp index 31672ce1bc..cc498125c3 100644 --- a/ydb/core/tx/columnshard/columnshard__notify_tx_completion.cpp +++ b/ydb/core/tx/columnshard/columnshard__notify_tx_completion.cpp @@ -14,12 +14,11 @@ public: LOG_S_DEBUG("TTxNotifyTxCompletion.Execute at tablet " << Self->TabletID()); const ui64 txId = Ev->Get()->Record.GetTxId(); - - if (Self->AltersInFlight.contains(txId)) { - Self->AltersInFlight[txId].NotifySubscribers.insert(Ev->Sender); + auto txOperator = Self->ProgressTxController->GetTxOperator(txId); + if (txOperator) { + txOperator->RegisterSubscriber(Ev->Sender); return true; } - Result.reset(new TEvColumnShard::TEvNotifyTxCompletionResult(Self->TabletID(), txId)); return true; } diff --git a/ydb/core/tx/columnshard/columnshard__progress_tx.cpp b/ydb/core/tx/columnshard/columnshard__progress_tx.cpp index d8450d124a..42597d4927 100644 --- a/ydb/core/tx/columnshard/columnshard__progress_tx.cpp +++ b/ydb/core/tx/columnshard/columnshard__progress_tx.cpp @@ -7,55 +7,6 @@ namespace NKikimr::NColumnShard { class TColumnShard::TTxProgressTx : public TTransactionBase<TColumnShard> { -private: - struct TEvent { - TActorId Target; - ui64 Cookie; - THolder<IEventBase> Event; - - TEvent(TActorId target, ui64 cookie, THolder<IEventBase> event) - : Target(target) - , Cookie(cookie) - , Event(std::move(event)) - { } - }; - - struct TResultEvent { - TTxController::TBasicTxInfo TxInfo; - NKikimrTxColumnShard::EResultStatus Status; - - TResultEvent(TTxController::TBasicTxInfo&& txInfo, NKikimrTxColumnShard::EResultStatus status) - : TxInfo(std::move(txInfo)) - , Status(status) - {} - - std::unique_ptr<IEventBase> MakeEvent(ui64 tabletId) const { - if (TxInfo.TxKind == NKikimrTxColumnShard::TX_KIND_COMMIT_WRITE) { - auto result = NEvents::TDataEvents::TEvWriteResult::BuildCommited(tabletId, TxInfo.TxId); - return result; - } else { - auto result = std::make_unique<TEvColumnShard::TEvProposeTransactionResult>( - tabletId, TxInfo.TxKind, TxInfo.TxId, Status); - result->Record.SetStep(TxInfo.PlanStep); - return result; - } - } - }; - - enum class ETriggerActivities { - NONE, - POST_INSERT, - POST_SCHEMA - }; - - TStringBuilder TxPrefix() const { - return TStringBuilder() << "TxProgressTx[" << ToString(TabletTxNo) << "] "; - } - - TString TxSuffix() const { - return TStringBuilder() << " at tablet " << Self->TabletID(); - } - public: TTxProgressTx(TColumnShard* self) : TTransactionBase(self) @@ -81,60 +32,8 @@ public: ui64 step = plannedItem->PlanStep; ui64 txId = plannedItem->TxId; - TTxController::TBasicTxInfo txInfo = *plannedItem; - switch (txInfo.TxKind) { - case NKikimrTxColumnShard::TX_KIND_SCHEMA: - { - auto& meta = Self->AltersInFlight.at(txId); - Self->RunSchemaTx(meta.Body, NOlap::TSnapshot(step, txId), txc); - Self->ProtectSchemaSeqNo(meta.Body.GetSeqNo(), txc); - for (TActorId subscriber : meta.NotifySubscribers) { - TxEvents.emplace_back(subscriber, 0, - MakeHolder<TEvColumnShard::TEvNotifyTxCompletionResult>(Self->TabletID(), txId)); - } - Self->AltersInFlight.erase(txId); - Trigger = ETriggerActivities::POST_SCHEMA; - break; - } - case NKikimrTxColumnShard::TX_KIND_COMMIT: { - const auto& meta = Self->CommitsInFlight.at(txId); - - TBlobGroupSelector dsGroupSelector(Self->Info()); - NOlap::TDbWrapper dbTable(txc.DB, &dsGroupSelector); - - auto pathExists = [&](ui64 pathId) { - return Self->TablesManager.HasTable(pathId); - }; - - auto counters = Self->InsertTable->Commit(dbTable, step, txId, meta.WriteIds, - pathExists); - Self->IncCounter(COUNTER_BLOBS_COMMITTED, counters.Rows); - Self->IncCounter(COUNTER_BYTES_COMMITTED, counters.Bytes); - Self->IncCounter(COUNTER_RAW_BYTES_COMMITTED, counters.RawBytes); - - NIceDb::TNiceDb db(txc.DB); - for (TWriteId writeId : meta.WriteIds) { - Self->RemoveLongTxWrite(db, writeId, txId); - } - Self->CommitsInFlight.erase(txId); - Self->UpdateInsertTableCounters(); - Trigger = ETriggerActivities::POST_INSERT; - break; - } - case NKikimrTxColumnShard::TX_KIND_COMMIT_WRITE: { - NOlap::TSnapshot snapshot(step, txId); - Y_ABORT_UNLESS(Self->OperationsManager->CommitTransaction(*Self, txId, txc, snapshot)); - Trigger = ETriggerActivities::POST_INSERT; - break; - } - default: { - Y_ABORT("Unexpected TxKind"); - } - } - - // Currently transactions never fail and there are no dependencies between them - TxResults.emplace_back(TResultEvent(std::move(txInfo), NKikimrTxColumnShard::SUCCESS)); - + TxOperator = Self->ProgressTxController->GetVerifiedTxOperator(txId); + AFL_VERIFY(TxOperator->Progress(*Self, NOlap::TSnapshot(step, txId), txc)); Self->ProgressTxController->FinishPlannedTx(txId, txc); Self->RescheduleWaitingReads(); } @@ -148,24 +47,14 @@ public: void Complete(const TActorContext& ctx) override { NActors::TLogContextGuard logGuard = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID())("tx_state", "complete"); - - for (auto& rec : TxEvents) { - ctx.Send(rec.Target, rec.Event.Release(), 0, rec.Cookie); - } - - for (auto& res : TxResults) { - Self->ProgressTxController->CompleteRunningTx(TTxController::TPlanQueueItem(res.TxInfo.PlanStep, res.TxInfo.TxId)); - - auto event = res.MakeEvent(Self->TabletID()); - ctx.Send(res.TxInfo.Source, event.release(), 0, res.TxInfo.Cookie); + if (TxOperator) { + TxOperator->Complete(*Self, ctx); } Self->SetupIndexation(); } private: - std::vector<TResultEvent> TxResults; - std::vector<TEvent> TxEvents; - ETriggerActivities Trigger{ETriggerActivities::NONE}; + TTxController::ITransactionOperatior::TPtr TxOperator; const ui32 TabletTxNo; }; diff --git a/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp b/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp index 945b4f76b5..4f04f60ff8 100644 --- a/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp +++ b/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp @@ -32,6 +32,9 @@ private: TString TxSuffix() const { return TStringBuilder() << " at tablet " << Self->TabletID(); } + + void ConstructResult(TTxController::TProposeResult& proposeResult, const TTxController::TBasicTxInfo& txInfo); + TTxController::TProposeResult ProposeTtlDeprecated(const TString& txBody); }; @@ -46,257 +49,132 @@ bool TTxProposeTransaction::Execute(TTransactionContext& txc, const TActorContex auto& record = Proto(Ev->Get()); auto txKind = record.GetTxKind(); - //ui64 ssId = record.GetSchemeShardId(); ui64 txId = record.GetTxId(); auto& txBody = record.GetTxBody(); - auto status = NKikimrTxColumnShard::EResultStatus::ERROR; - TString statusMessage; - - ui64 minStep = 0; - ui64 maxStep = Max<ui64>(); - - switch (txKind) { - case NKikimrTxColumnShard::TX_KIND_SCHEMA: { - TColumnShard::TAlterMeta meta; - if (!meta.Body.ParseFromString(txBody)) { - statusMessage = TStringBuilder() - << "Schema TxId# " << txId << " cannot be parsed"; - status = NKikimrTxColumnShard::EResultStatus::SCHEMA_ERROR; - break; - } - - NOlap::ISnapshotSchema::TPtr currentSchema; - if (Self->TablesManager.HasPrimaryIndex()) { - currentSchema = Self->TablesManager.GetPrimaryIndexSafe().GetVersionedIndex().GetLastSchema(); - } - - // Invalid body generated at a newer SchemeShard - if (!meta.Validate(currentSchema)) { - statusMessage = TStringBuilder() - << "Schema TxId# " << txId << " cannot be proposed"; - status = NKikimrTxColumnShard::EResultStatus::SCHEMA_ERROR; - break; - } - - Y_ABORT_UNLESS(record.HasSchemeShardId()); - if (Self->CurrentSchemeShardId == 0) { - Self->CurrentSchemeShardId = record.GetSchemeShardId(); - Schema::SaveSpecialValue(db, Schema::EValueIds::CurrentSchemeShardId, Self->CurrentSchemeShardId); - } else { - Y_ABORT_UNLESS(Self->CurrentSchemeShardId == record.GetSchemeShardId()); - } - - auto seqNo = SeqNoFromProto(meta.Body.GetSeqNo()); - auto lastSeqNo = Self->LastSchemaSeqNo; - - // Check if proposal is outdated - if (seqNo < lastSeqNo) { - status = NKikimrTxColumnShard::SCHEMA_CHANGED; - statusMessage = TStringBuilder() - << "Ignoring outdated schema tx proposal at tablet " - << Self->TabletID() - << " txId " << txId - << " ssId " << Self->CurrentSchemeShardId - << " seqNo " << seqNo - << " lastSeqNo " << lastSeqNo; - LOG_S_INFO(TxPrefix() << statusMessage << TxSuffix()); - break; - } - - Self->UpdateSchemaSeqNo(seqNo, txc); - - // FIXME: current tests don't provide processing params! - // Y_DEBUG_ABORT_UNLESS(record.HasProcessingParams()); - if (!Self->ProcessingParams && record.HasProcessingParams()) { - Self->ProcessingParams.emplace().CopyFrom(record.GetProcessingParams()); - Schema::SaveSpecialProtoValue(db, Schema::EValueIds::ProcessingParams, *Self->ProcessingParams); - } - - // Always persist the latest metadata, this may include an updated seqno - Self->ProgressTxController->RegisterTx(txId, txKind, txBody, Ev->Get()->GetSource(), Ev->Cookie, txc); - - if (!Self->AltersInFlight.contains(txId)) { - Self->AltersInFlight.emplace(txId, std::move(meta)); - } else { - auto& existing = Self->AltersInFlight.at(txId); - existing.Body = std::move(meta.Body); - } - - LOG_S_DEBUG(TxPrefix() << "schema txId " << txId << TxSuffix()); - - status = NKikimrTxColumnShard::EResultStatus::PREPARED; - break; + + if (txKind == NKikimrTxColumnShard::TX_KIND_TTL) { + auto proposeResult = ProposeTtlDeprecated(txBody); + Result = std::make_unique<TEvColumnShard::TEvProposeTransactionResult>(Self->TabletID(), txKind, txId, proposeResult.GetStatus(), proposeResult.GetStatusMessage()); + return true; + } + + if (!Self->ProcessingParams && record.HasProcessingParams()) { + Self->ProcessingParams.emplace().CopyFrom(record.GetProcessingParams()); + Schema::SaveSpecialProtoValue(db, Schema::EValueIds::ProcessingParams, *Self->ProcessingParams); + } + + if (record.HasSchemeShardId()) { + if (Self->CurrentSchemeShardId == 0) { + Self->CurrentSchemeShardId = record.GetSchemeShardId(); + Schema::SaveSpecialValue(db, Schema::EValueIds::CurrentSchemeShardId, Self->CurrentSchemeShardId); + } else { + Y_ABORT_UNLESS(Self->CurrentSchemeShardId == record.GetSchemeShardId()); + } + } + + TTxController::TBasicTxInfo fakeTxInfo; + fakeTxInfo.TxId = txId; + fakeTxInfo.TxKind = txKind; + + auto txOperator = TTxController::ITransactionOperatior::TFactory::MakeHolder(txKind, fakeTxInfo); + if (!txOperator || !txOperator->Parse(txBody)) { + TTxController::TProposeResult proposeResult(NKikimrTxColumnShard::EResultStatus::ERROR, TStringBuilder() << "Error processing commit TxId# " << txId + << (txOperator ? ". Parsing error " : ". Unknown operator for txKind")); + ConstructResult(proposeResult, fakeTxInfo); + return true; + } + + auto txInfoPtr = Self->ProgressTxController->GetTxInfo(txId); + if (!!txInfoPtr) { + if (txInfoPtr->Source != Ev->Get()->GetSource() || txInfoPtr->Cookie != Ev->Cookie) { + TTxController::TProposeResult proposeResult(NKikimrTxColumnShard::EResultStatus::ERROR, TStringBuilder() << "Another commit TxId# " << txId << " has already been proposed"); + ConstructResult(proposeResult, fakeTxInfo); + } + TTxController::TProposeResult proposeResult; + ConstructResult(proposeResult, *txInfoPtr); + } else { + auto proposeResult = txOperator->Propose(*Self, txc, false); + if (!!proposeResult) { + const auto& txInfo = txOperator->TxWithDeadline() ? Self->ProgressTxController->RegisterTxWithDeadline(txId, txKind, txBody, Ev->Get()->GetSource(), Ev->Cookie, txc) + : Self->ProgressTxController->RegisterTx(txId, txKind, txBody, Ev->Get()->GetSource(), Ev->Cookie, txc); + + ConstructResult(proposeResult, txInfo); + } else { + ConstructResult(proposeResult, fakeTxInfo); + } + } + AFL_VERIFY(!!Result); + return true; +} + +TTxController::TProposeResult TTxProposeTransaction::ProposeTtlDeprecated(const TString& txBody) { + /// @note There's no tx guaranties now. For now TX_KIND_TTL is used to trigger TTL in tests only. + /// In future we could trigger TTL outside of tablet. Then we need real tx with complete notification. + // TODO: make real tx: save and progress with tablets restart support + + NKikimrTxColumnShard::TTtlTxBody ttlBody; + if (!ttlBody.ParseFromString(txBody)) { + return TTxController::TProposeResult(NKikimrTxColumnShard::EResultStatus::SCHEMA_ERROR, "TTL tx cannot be parsed"); + } + + // If no paths trigger schema defined TTL + THashMap<ui64, NOlap::TTiering> pathTtls; + if (!ttlBody.GetPathIds().empty()) { + auto unixTime = TInstant::Seconds(ttlBody.GetUnixTimeSeconds()); + if (!unixTime) { + return TTxController::TProposeResult(NKikimrTxColumnShard::EResultStatus::SCHEMA_ERROR, "TTL tx wrong timestamp"); + } + + TString columnName = ttlBody.GetTtlColumnName(); + if (columnName.empty()) { + return TTxController::TProposeResult(NKikimrTxColumnShard::EResultStatus::SCHEMA_ERROR, "TTL tx wrong TTL column ''"); } - case NKikimrTxColumnShard::TX_KIND_COMMIT: { - if (Self->CommitsInFlight.contains(txId)) { - LOG_S_DEBUG(TxPrefix() << "CommitTx (retry) TxId " << txId << TxSuffix()); - - auto txInfoPtr = Self->ProgressTxController->GetTxInfo(txId); - Y_ABORT_UNLESS(txInfoPtr); - - if (txInfoPtr->Source != Ev->Get()->GetSource() || txInfoPtr->Cookie != Ev->Cookie) { - statusMessage = TStringBuilder() - << "Another commit TxId# " << txId << " has already been proposed"; - break; - } - - maxStep = txInfoPtr->MaxStep; - minStep = txInfoPtr->MinStep; - status = NKikimrTxColumnShard::EResultStatus::PREPARED; - break; - } - - NKikimrTxColumnShard::TCommitTxBody body; - if (!body.ParseFromString(txBody)) { - statusMessage = TStringBuilder() - << "Commit TxId# " << txId << " cannot be parsed"; - break; - } - - if (body.GetWriteIds().empty()) { - statusMessage = TStringBuilder() - << "Commit TxId# " << txId << " has an empty list of write ids"; - break; - } - - if (body.GetTxInitiator() == 0) { - // When initiator is 0, this means it's a local write id - // Check that all write ids actually exist - bool failed = false; - for (ui64 writeId : body.GetWriteIds()) { - if (!Self->LongTxWrites.contains(TWriteId{writeId})) { - statusMessage = TStringBuilder() - << "Commit TxId# " << txId << " references WriteId# " << writeId - << " that no longer exists"; - failed = true; - break; - } - auto& lw = Self->LongTxWrites[TWriteId{writeId}]; - if (lw.PreparedTxId != 0) { - statusMessage = TStringBuilder() - << "Commit TxId# " << txId << " references WriteId# " << writeId - << " that is already locked by TxId# " << lw.PreparedTxId; - failed = true; - break; - } - } - if (failed) { - break; - } - } - - TColumnShard::TCommitMeta meta; - for (ui64 wId : body.GetWriteIds()) { - TWriteId writeId{wId}; - meta.AddWriteId(writeId); - Self->AddLongTxWrite(writeId, txId); - } - - const auto& txInfo = Self->ProgressTxController->RegisterTxWithDeadline(txId, txKind, txBody, Ev->Get()->GetSource(), Ev->Cookie, txc); - minStep = txInfo.MinStep; - maxStep = txInfo.MaxStep; - - Self->CommitsInFlight.emplace(txId, std::move(meta)); - - LOG_S_DEBUG(TxPrefix() << "CommitTx txId " << txId << TxSuffix()); - - status = NKikimrTxColumnShard::EResultStatus::PREPARED; - break; + + if (!Self->TablesManager.HasPrimaryIndex()) { + return TTxController::TProposeResult(NKikimrTxColumnShard::EResultStatus::SCHEMA_ERROR, "No primary index for TTL"); } - case NKikimrTxColumnShard::TX_KIND_TTL: { - /// @note There's no tx guaranties now. For now TX_KIND_TTL is used to trigger TTL in tests only. - /// In future we could trigger TTL outside of tablet. Then we need real tx with complete notification. - // TODO: make real tx: save and progress with tablets restart support - - NKikimrTxColumnShard::TTtlTxBody ttlBody; - if (!ttlBody.ParseFromString(txBody)) { - statusMessage = "TTL tx cannot be parsed"; - status = NKikimrTxColumnShard::EResultStatus::SCHEMA_ERROR; - break; - } - - // If no paths trigger schema defined TTL - THashMap<ui64, NOlap::TTiering> pathTtls; - if (!ttlBody.GetPathIds().empty()) { - auto unixTime = TInstant::Seconds(ttlBody.GetUnixTimeSeconds()); - if (!unixTime) { - statusMessage = "TTL tx wrong timestamp"; - status = NKikimrTxColumnShard::EResultStatus::SCHEMA_ERROR; - break; - } - - TString columnName = ttlBody.GetTtlColumnName(); - if (columnName.empty()) { - statusMessage = "TTL tx wrong TTL column ''"; - status = NKikimrTxColumnShard::EResultStatus::SCHEMA_ERROR; - break; - } - - if (!Self->TablesManager.HasPrimaryIndex()) { - statusMessage = "No primary index for TTL"; - status = NKikimrTxColumnShard::EResultStatus::SCHEMA_ERROR; - break; - } - - auto schema = Self->TablesManager.GetPrimaryIndexSafe().GetVersionedIndex().GetLastSchema()->GetSchema(); - auto ttlColumn = schema->GetFieldByName(columnName); - if (!ttlColumn) { - statusMessage = "TTL tx wrong TTL column '" + columnName + "'"; - status = NKikimrTxColumnShard::EResultStatus::SCHEMA_ERROR; - break; - } - - if (statusMessage.empty()) { - const TInstant now = TlsActivationContext ? AppData()->TimeProvider->Now() : TInstant::Now(); - for (ui64 pathId : ttlBody.GetPathIds()) { - NOlap::TTiering tiering; - tiering.Ttl = NOlap::TTierInfo::MakeTtl(now - unixTime, columnName); - pathTtls.emplace(pathId, std::move(tiering)); - } - } - } - - if (statusMessage.empty()) { - if (Self->SetupTtl(pathTtls, true)) { - status = NKikimrTxColumnShard::EResultStatus::SUCCESS; - } else { - statusMessage = "TTL not started"; - } - } - - break; + + auto schema = Self->TablesManager.GetPrimaryIndexSafe().GetVersionedIndex().GetLastSchema()->GetSchema(); + auto ttlColumn = schema->GetFieldByName(columnName); + if (!ttlColumn) { + return TTxController::TProposeResult(NKikimrTxColumnShard::EResultStatus::SCHEMA_ERROR, "TTL tx wrong TTL column '" + columnName + "'"); } - default: { - statusMessage = TStringBuilder() - << "Unsupported TxKind# " << ui32(txKind) << " TxId# " << txId; + + const TInstant now = TlsActivationContext ? AppData()->TimeProvider->Now() : TInstant::Now(); + for (ui64 pathId : ttlBody.GetPathIds()) { + NOlap::TTiering tiering; + tiering.Ttl = NOlap::TTierInfo::MakeTtl(now - unixTime, columnName); + pathTtls.emplace(pathId, std::move(tiering)); } } + if (!Self->SetupTtl(pathTtls, true)) { + return TTxController::TProposeResult(NKikimrTxColumnShard::EResultStatus::SCHEMA_ERROR, "TTL not started"); + } - Result = std::make_unique<TEvColumnShard::TEvProposeTransactionResult>(Self->TabletID(), txKind, txId, status, statusMessage); + return TTxController::TProposeResult(); +} - if (status == NKikimrTxColumnShard::EResultStatus::PREPARED) { +void TTxProposeTransaction::ConstructResult(TTxController::TProposeResult& proposeResult, const TTxController::TBasicTxInfo& txInfo) { + Result = std::make_unique<TEvColumnShard::TEvProposeTransactionResult>(Self->TabletID(), txInfo.TxKind, txInfo.TxId, proposeResult.GetStatus(), proposeResult.GetStatusMessage()); + if (proposeResult.GetStatus() == NKikimrTxColumnShard::EResultStatus::PREPARED) { Self->IncCounter(COUNTER_PREPARE_SUCCESS); - Result->Record.SetMinStep(minStep); - Result->Record.SetMaxStep(maxStep); + Result->Record.SetMinStep(txInfo.MinStep); + Result->Record.SetMaxStep(txInfo.MaxStep); if (Self->ProcessingParams) { Result->Record.MutableDomainCoordinators()->CopyFrom(Self->ProcessingParams->GetCoordinators()); } - } else if (status == NKikimrTxColumnShard::EResultStatus::SUCCESS) { + } else if (proposeResult.GetStatus() == NKikimrTxColumnShard::EResultStatus::SUCCESS) { Self->IncCounter(COUNTER_PREPARE_SUCCESS); } else { Self->IncCounter(COUNTER_PREPARE_ERROR); - LOG_S_INFO(TxPrefix() << "error txId " << txId << " " << statusMessage << TxSuffix()); + LOG_S_INFO(TxPrefix() << "error txId " << txInfo.TxId << " " << proposeResult.GetStatusMessage() << TxSuffix()); } - return true; } void TTxProposeTransaction::Complete(const TActorContext& ctx) { Y_ABORT_UNLESS(Ev); Y_ABORT_UNLESS(Result); - LOG_S_DEBUG(TxPrefix() << "complete" << TxSuffix()); - ctx.Send(Ev->Get()->GetSource(), Result.release()); - Self->TryRegisterMediatorTimeCast(); } diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp index 6869cbef8c..5d5df25e8f 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.cpp +++ b/ydb/core/tx/columnshard/columnshard_impl.cpp @@ -48,92 +48,6 @@ NTabletPipe::TClientConfig GetPipeClientConfig() { return config; } -bool ValidateTableSchema(const NKikimrSchemeOp::TColumnTableSchema& schema) { - namespace NTypeIds = NScheme::NTypeIds; - - static const THashSet<NScheme::TTypeId> supportedTypes = { - NTypeIds::Timestamp, - NTypeIds::Int8, - NTypeIds::Int16, - NTypeIds::Int32, - NTypeIds::Int64, - NTypeIds::Uint8, - NTypeIds::Uint16, - NTypeIds::Uint32, - NTypeIds::Uint64, - NTypeIds::Date, - NTypeIds::Datetime, - //NTypeIds::Interval, - //NTypeIds::Float, - //NTypeIds::Double, - NTypeIds::String, - NTypeIds::Utf8 - }; - - if (!schema.HasEngine() || - schema.GetEngine() != NKikimrSchemeOp::EColumnTableEngine::COLUMN_ENGINE_REPLACING_TIMESERIES) { - return false; - } - - if (!schema.KeyColumnNamesSize()) { - return false; - } - - TString firstKeyColumn = schema.GetKeyColumnNames()[0]; - THashSet<TString> keyColumns(schema.GetKeyColumnNames().begin(), schema.GetKeyColumnNames().end()); - - for (const NKikimrSchemeOp::TOlapColumnDescription& column : schema.GetColumns()) { - TString name = column.GetName(); - /* - if (column.GetNotNull() && keyColumns.contains(name)) { - return false; - } - */ - if (name == firstKeyColumn && !supportedTypes.contains(column.GetTypeId())) { - return false; - } - keyColumns.erase(name); - } - - if (!keyColumns.empty()) { - return false; - } - return true; -} - -bool ValidateTablePreset(const NKikimrSchemeOp::TColumnTableSchemaPreset& preset) { - if (preset.HasName() && preset.GetName() != "default") { - return false; - } - return ValidateTableSchema(preset.GetSchema()); -} - -} - -bool TColumnShard::TAlterMeta::Validate(const NOlap::ISnapshotSchema::TPtr& /*schema*/) const { - switch (Body.TxBody_case()) { - case NKikimrTxColumnShard::TSchemaTxBody::kInitShard: - break; - case NKikimrTxColumnShard::TSchemaTxBody::kEnsureTables: - for (auto& table : Body.GetEnsureTables().GetTables()) { - if (table.HasSchemaPreset() && !ValidateTablePreset(table.GetSchemaPreset())) { - return false; - } - if (table.HasSchema() && !ValidateTableSchema(table.GetSchema())) { - return false; - } - // TODO: validate TtlSettings - } - break; - case NKikimrTxColumnShard::TSchemaTxBody::kAlterTable: - return true; - case NKikimrTxColumnShard::TSchemaTxBody::kAlterStore: - return true; - case NKikimrTxColumnShard::TSchemaTxBody::kDropTable: - case NKikimrTxColumnShard::TSchemaTxBody::TXBODY_NOT_SET: - break; - } - return true; } class TColumnShard::TStoragesManager: public NOlap::IStoragesManager { @@ -374,42 +288,6 @@ bool TColumnShard::RemoveLongTxWrite(NIceDb::TNiceDb& db, TWriteId writeId, ui64 return false; } -bool TColumnShard::AbortTx(const ui64 txId, const NKikimrTxColumnShard::ETransactionKind& txKind, NTabletFlatExecutor::TTransactionContext& txc) { - switch (txKind) { - case NKikimrTxColumnShard::TX_KIND_SCHEMA: { - AltersInFlight.erase(txId); - break; - } - case NKikimrTxColumnShard::TX_KIND_COMMIT: { - NIceDb::TNiceDb db(txc.DB); - if (auto* meta = CommitsInFlight.FindPtr(txId)) { - for (TWriteId writeId : meta->WriteIds) { - // TODO: we probably need to have more complex - // logic in the future, when there are multiple - // inflight commits for the same writeId. - RemoveLongTxWrite(db, writeId, txId); - } - TBlobGroupSelector dsGroupSelector(Info()); - NOlap::TDbWrapper dbTable(txc.DB, &dsGroupSelector); - InsertTable->Abort(dbTable, meta->WriteIds); - - CommitsInFlight.erase(txId); - } - break; - } - case NKikimrTxColumnShard::TX_KIND_COMMIT_WRITE: { - if (!OperationsManager->AbortTransaction(*this, txId, txc)) { - return false; - } - break; - } - default: { - Y_ABORT("Unsupported TxKind"); - } - } - return true; -} - void TColumnShard::TryAbortWrites(NIceDb::TNiceDb& db, NOlap::TDbWrapper& dbTable, THashSet<TWriteId>&& writesToAbort) { std::vector<TWriteId> failedAborts; for (auto& writeId : writesToAbort) { diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h index 933112a594..00e72583c2 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.h +++ b/ydb/core/tx/columnshard/columnshard_impl.h @@ -8,7 +8,7 @@ #include "columnshard_private_events.h" #include "blob_manager.h" #include "tables_manager.h" -#include "tx_controller.h" +#include "transactions/tx_controller.h" #include "inflight_request_tracker.h" #include "counters/columnshard.h" #include "resource_subscriber/counters.h" @@ -131,6 +131,10 @@ class TColumnShard friend class TOperationsManager; friend class TWriteOperation; + friend class TSchemaTransactionOperator; + friend class TLongTxTransactionOperator; + friend class TEvWriteTransactionOperator; + class TTxProgressTx; class TTxProposeCancel; // proto @@ -281,21 +285,6 @@ private: std::unique_ptr<TTxController> ProgressTxController; std::unique_ptr<TOperationsManager> OperationsManager; - struct TAlterMeta { - NKikimrTxColumnShard::TSchemaTxBody Body; - THashSet<TActorId> NotifySubscribers; - - bool Validate(const NOlap::ISnapshotSchema::TPtr& schema) const; - }; - - struct TCommitMeta { - THashSet<TWriteId> WriteIds; - - void AddWriteId(TWriteId id) { - WriteIds.insert(id); - } - }; - using TSchemaPreset = TSchemaPreset; using TTableInfo = TTableInfo; @@ -414,8 +403,6 @@ private: bool ProgressTxInFlight = false; THashMap<ui64, TInstant> ScanTxInFlight; - THashMap<ui64, TAlterMeta> AltersInFlight; - THashMap<ui64, TCommitMeta> CommitsInFlight; // key is TxId from propose THashMap<TWriteId, TLongTxWriteInfo> LongTxWrites; using TPartsForLTXShard = THashMap<ui32, TLongTxWriteInfo*>; THashMap<TULID, TPartsForLTXShard> LongTxWritesByUniqueId; @@ -441,8 +428,6 @@ private: void AddLongTxWrite(TWriteId writeId, ui64 txId); void LoadLongTxWrite(TWriteId writeId, const ui32 writePartId, const NLongTxService::TLongTxId& longTxId); bool RemoveLongTxWrite(NIceDb::TNiceDb& db, TWriteId writeId, ui64 txId = 0); - bool AbortTx(const ui64 txId, const NKikimrTxColumnShard::ETransactionKind& txKind, NTabletFlatExecutor::TTransactionContext& txc); - bool LoadTx(const ui64 txId, const NKikimrTxColumnShard::ETransactionKind& txKind, const TString& txBody); void TryAbortWrites(NIceDb::TNiceDb& db, NOlap::TDbWrapper& dbTable, THashSet<TWriteId>&& writesToAbort); TWriteId BuildNextWriteId(NTabletFlatExecutor::TTransactionContext& txc); diff --git a/ydb/core/tx/columnshard/transactions/operators/ev_write.cpp b/ydb/core/tx/columnshard/transactions/operators/ev_write.cpp new file mode 100644 index 0000000000..927cfabadc --- /dev/null +++ b/ydb/core/tx/columnshard/transactions/operators/ev_write.cpp @@ -0,0 +1,3 @@ +#include "ev_write.h" + +namespace NKikimr::NColumnShard {} diff --git a/ydb/core/tx/columnshard/transactions/operators/ev_write.h b/ydb/core/tx/columnshard/transactions/operators/ev_write.h new file mode 100644 index 0000000000..b31f381df5 --- /dev/null +++ b/ydb/core/tx/columnshard/transactions/operators/ev_write.h @@ -0,0 +1,38 @@ +#pragma once + +#include <ydb/core/tx/columnshard/columnshard_impl.h> + +namespace NKikimr::NColumnShard { + + class TEvWriteTransactionOperator : public TTxController::ITransactionOperatior { + using TBase = TTxController::ITransactionOperatior; + using TProposeResult = TTxController::TProposeResult; + static inline auto Registrator = TFactory::TRegistrator<TEvWriteTransactionOperator>(NKikimrTxColumnShard::TX_KIND_COMMIT_WRITE); + public: + using TBase::TBase; + + virtual bool Parse(const TString& data) override { + Y_UNUSED(data); + return true; + } + + TProposeResult Propose(TColumnShard& /*owner*/, NTabletFlatExecutor::TTransactionContext& /*txc*/, bool /*proposed*/) const override { + return TProposeResult(); + } + + virtual bool Progress(TColumnShard& owner, const NOlap::TSnapshot& version, NTabletFlatExecutor::TTransactionContext& txc) override { + return owner.OperationsManager->CommitTransaction(owner, GetTxId(), txc, version); + } + + virtual bool Complete(TColumnShard& owner, const TActorContext& ctx) override { + auto result = NEvents::TDataEvents::TEvWriteResult::BuildCommited(owner.TabletID(), GetTxId()); + ctx.Send(TxInfo.Source, result.release(), 0, TxInfo.Cookie); + return true; + } + + virtual bool Abort(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc) override { + return owner.OperationsManager->AbortTransaction(owner, GetTxId(), txc); + } + }; + +} diff --git a/ydb/core/tx/columnshard/transactions/operators/long_tx_write.cpp b/ydb/core/tx/columnshard/transactions/operators/long_tx_write.cpp new file mode 100644 index 0000000000..e10c1dc4a7 --- /dev/null +++ b/ydb/core/tx/columnshard/transactions/operators/long_tx_write.cpp @@ -0,0 +1,3 @@ +#include "long_tx_write.h" + +namespace NKikimr::NColumnShard {} diff --git a/ydb/core/tx/columnshard/transactions/operators/long_tx_write.h b/ydb/core/tx/columnshard/transactions/operators/long_tx_write.h new file mode 100644 index 0000000000..7ef86171f3 --- /dev/null +++ b/ydb/core/tx/columnshard/transactions/operators/long_tx_write.h @@ -0,0 +1,103 @@ +#pragma once + +#include <ydb/core/tx/columnshard/columnshard_impl.h> + +namespace NKikimr::NColumnShard { + + class TLongTxTransactionOperator : public TTxController::ITransactionOperatior { + using TBase = TTxController::ITransactionOperatior; + using TProposeResult = TTxController::TProposeResult; + static inline auto Registrator = TFactory::TRegistrator<TLongTxTransactionOperator>(NKikimrTxColumnShard::TX_KIND_COMMIT); + public: + using TBase::TBase; + + bool Parse(const TString& data) override { + NKikimrTxColumnShard::TCommitTxBody commitTxBody; + if (!commitTxBody.ParseFromString(data)) { + return false; + } + + for (auto& id : commitTxBody.GetWriteIds()) { + WriteIds.insert(TWriteId{id}); + } + return true; + } + + void OnTabletInit(TColumnShard& owner) override { + for (auto&& writeId : WriteIds) { + Y_ABORT_UNLESS(owner.LongTxWrites.contains(writeId), "TTxInit at %" PRIu64 " : Commit %" PRIu64 " references local write %" PRIu64 " that doesn't exist", + owner.TabletID(), GetTxId(), (ui64)writeId); + owner.AddLongTxWrite(writeId, GetTxId()); + } + } + + TProposeResult Propose(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& /*txc*/, bool /*proposed*/) const override { + if (WriteIds.empty()) { + return TProposeResult(NKikimrTxColumnShard::EResultStatus::ERROR, + TStringBuilder() << "Commit TxId# " << GetTxId() << " has an empty list of write ids"); + } + + for (auto&& writeId : WriteIds) { + if (!owner.LongTxWrites.contains(writeId)) { + return TProposeResult(NKikimrTxColumnShard::EResultStatus::ERROR, + TStringBuilder() << "Commit TxId# " << GetTxId() << " references WriteId# " << (ui64)writeId << " that no longer exists"); + } + auto& lw = owner.LongTxWrites[writeId]; + if (lw.PreparedTxId != 0) { + return TProposeResult(NKikimrTxColumnShard::EResultStatus::ERROR, + TStringBuilder() << "Commit TxId# " << GetTxId() << " references WriteId# " << (ui64)writeId << " that is already locked by TxId# " << lw.PreparedTxId); + } + } + + for (auto&& writeId : WriteIds) { + owner.AddLongTxWrite(writeId, GetTxId()); + } + return TProposeResult();; + } + + bool Progress(TColumnShard& owner, const NOlap::TSnapshot& version, NTabletFlatExecutor::TTransactionContext& txc) override { + TBlobGroupSelector dsGroupSelector(owner.Info()); + NOlap::TDbWrapper dbTable(txc.DB, &dsGroupSelector); + + auto pathExists = [&](ui64 pathId) { + return owner.TablesManager.HasTable(pathId); + }; + + auto counters = owner.InsertTable->Commit(dbTable, version.GetPlanStep(), version.GetTxId(), WriteIds, + pathExists); + + owner.IncCounter(COUNTER_BLOBS_COMMITTED, counters.Rows); + owner.IncCounter(COUNTER_BYTES_COMMITTED, counters.Bytes); + owner.IncCounter(COUNTER_RAW_BYTES_COMMITTED, counters.RawBytes); + + NIceDb::TNiceDb db(txc.DB); + for (TWriteId writeId : WriteIds) { + owner.RemoveLongTxWrite(db, writeId, GetTxId()); + } + owner.UpdateInsertTableCounters(); + return true; + } + + bool Complete(TColumnShard& owner, const TActorContext& ctx) override { + auto result = std::make_unique<TEvColumnShard::TEvProposeTransactionResult>( + owner.TabletID(), TxInfo.TxKind, GetTxId(), NKikimrTxColumnShard::SUCCESS); + result->Record.SetStep(TxInfo.PlanStep); + ctx.Send(TxInfo.Source, result.release(), 0, TxInfo.Cookie); + return true; + } + + bool Abort(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc) override { + NIceDb::TNiceDb db(txc.DB); + for (TWriteId writeId : WriteIds) { + owner.RemoveLongTxWrite(db, writeId, GetTxId()); + } + TBlobGroupSelector dsGroupSelector(owner.Info()); + NOlap::TDbWrapper dbTable(txc.DB, &dsGroupSelector); + owner.InsertTable->Abort(dbTable, WriteIds); + return true; + } + + private: + THashSet<TWriteId> WriteIds; + }; +} diff --git a/ydb/core/tx/columnshard/transactions/operators/schema.cpp b/ydb/core/tx/columnshard/transactions/operators/schema.cpp new file mode 100644 index 0000000000..b9979fd754 --- /dev/null +++ b/ydb/core/tx/columnshard/transactions/operators/schema.cpp @@ -0,0 +1,3 @@ +#include "schema.h" + +namespace NKikimr::NColumnShard {} diff --git a/ydb/core/tx/columnshard/transactions/operators/schema.h b/ydb/core/tx/columnshard/transactions/operators/schema.h new file mode 100644 index 0000000000..78940db370 --- /dev/null +++ b/ydb/core/tx/columnshard/transactions/operators/schema.h @@ -0,0 +1,159 @@ +#pragma once + +#include <ydb/core/tx/columnshard/columnshard_impl.h> + +namespace NKikimr::NColumnShard { + + class TSchemaTransactionOperator : public TTxController::ITransactionOperatior { + using TBase = TTxController::ITransactionOperatior; + using TProposeResult = TTxController::TProposeResult; + static inline auto Registrator = TFactory::TRegistrator<TSchemaTransactionOperator>(NKikimrTxColumnShard::TX_KIND_SCHEMA); + public: + using TBase::TBase; + + virtual bool Parse(const TString& data) override { + if (!SchemaTxBody.ParseFromString(data)) { + return false; + } + return true; + } + + bool TxWithDeadline() const override { + return false; + } + + TProposeResult Propose(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc, bool /*proposed*/) const override { + switch (SchemaTxBody.TxBody_case()) { + case NKikimrTxColumnShard::TSchemaTxBody::kInitShard: + break; + case NKikimrTxColumnShard::TSchemaTxBody::kEnsureTables: + for (auto& table : SchemaTxBody.GetEnsureTables().GetTables()) { + if (table.HasSchemaPreset() && !ValidateTablePreset(table.GetSchemaPreset())) { + return TProposeResult(NKikimrTxColumnShard::EResultStatus::ERROR, "Invalid schema"); + } + if (table.HasSchema() && !ValidateTableSchema(table.GetSchema())) { + return TProposeResult(NKikimrTxColumnShard::EResultStatus::ERROR, "Invalid schema"); + } + } + break; + case NKikimrTxColumnShard::TSchemaTxBody::kAlterTable: + case NKikimrTxColumnShard::TSchemaTxBody::kAlterStore: + case NKikimrTxColumnShard::TSchemaTxBody::kDropTable: + case NKikimrTxColumnShard::TSchemaTxBody::TXBODY_NOT_SET: + break; + } + + auto seqNo = SeqNoFromProto(SchemaTxBody.GetSeqNo()); + auto lastSeqNo = owner.LastSchemaSeqNo; + + // Check if proposal is outdated + if (seqNo < lastSeqNo) { + auto errorMessage = TStringBuilder() + << "Ignoring outdated schema tx proposal at tablet " + << owner.TabletID() + << " txId " << GetTxId() + << " ssId " << owner.CurrentSchemeShardId + << " seqNo " << seqNo + << " lastSeqNo " << lastSeqNo; + return TProposeResult(NKikimrTxColumnShard::EResultStatus::SCHEMA_CHANGED, errorMessage); + } + + owner.UpdateSchemaSeqNo(seqNo, txc); + return TProposeResult(); + } + + virtual bool Progress(TColumnShard& owner, const NOlap::TSnapshot& version, NTabletFlatExecutor::TTransactionContext& txc) override { + owner.RunSchemaTx(SchemaTxBody, version, txc); + owner.ProtectSchemaSeqNo(SchemaTxBody.GetSeqNo(), txc); + return true; + } + + virtual bool Complete(TColumnShard& owner, const TActorContext& ctx) override { + for (TActorId subscriber : NotifySubscribers) { + auto event = MakeHolder<TEvColumnShard::TEvNotifyTxCompletionResult>(owner.TabletID(), GetTxId()); + ctx.Send(subscriber, event.Release(), 0, 0); + } + + auto result = std::make_unique<TEvColumnShard::TEvProposeTransactionResult>( + owner.TabletID(), TxInfo.TxKind, TxInfo.TxId, NKikimrTxColumnShard::SUCCESS); + result->Record.SetStep(TxInfo.PlanStep); + ctx.Send(TxInfo.Source, result.release(), 0, TxInfo.Cookie); + return true; + } + + virtual bool Abort(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc) override { + Y_UNUSED(owner, txc); + return true; + } + + virtual void RegisterSubscriber(const TActorId& actorId) override { + NotifySubscribers.insert(actorId); + } + + private: + bool ValidateTableSchema(const NKikimrSchemeOp::TColumnTableSchema& schema) const { + namespace NTypeIds = NScheme::NTypeIds; + + static const THashSet<NScheme::TTypeId> supportedTypes = { + NTypeIds::Timestamp, + NTypeIds::Int8, + NTypeIds::Int16, + NTypeIds::Int32, + NTypeIds::Int64, + NTypeIds::Uint8, + NTypeIds::Uint16, + NTypeIds::Uint32, + NTypeIds::Uint64, + NTypeIds::Date, + NTypeIds::Datetime, + //NTypeIds::Interval, + //NTypeIds::Float, + //NTypeIds::Double, + NTypeIds::String, + NTypeIds::Utf8 + }; + + if (!schema.HasEngine() || + schema.GetEngine() != NKikimrSchemeOp::EColumnTableEngine::COLUMN_ENGINE_REPLACING_TIMESERIES) { + return false; + } + + if (!schema.KeyColumnNamesSize()) { + return false; + } + + TString firstKeyColumn = schema.GetKeyColumnNames()[0]; + THashSet<TString> keyColumns(schema.GetKeyColumnNames().begin(), schema.GetKeyColumnNames().end()); + + for (const NKikimrSchemeOp::TOlapColumnDescription& column : schema.GetColumns()) { + TString name = column.GetName(); + /* + if (column.GetNotNull() && keyColumns.contains(name)) { + return false; + } + */ + if (name == firstKeyColumn && !supportedTypes.contains(column.GetTypeId())) { + return false; + } + keyColumns.erase(name); + } + + if (!keyColumns.empty()) { + return false; + } + return true; + } + + bool ValidateTablePreset(const NKikimrSchemeOp::TColumnTableSchemaPreset& preset) const { + if (preset.HasName() && preset.GetName() != "default") { + return false; + } + return ValidateTableSchema(preset.GetSchema()); + } + + private: + NKikimrTxColumnShard::TSchemaTxBody SchemaTxBody; + THashSet<TActorId> NotifySubscribers; + }; + +} diff --git a/ydb/core/tx/columnshard/transactions/operators/ya.make b/ydb/core/tx/columnshard/transactions/operators/ya.make new file mode 100644 index 0000000000..449ec29a13 --- /dev/null +++ b/ydb/core/tx/columnshard/transactions/operators/ya.make @@ -0,0 +1,13 @@ +LIBRARY() + +SRCS( + GLOBAL schema.cpp + GLOBAL long_tx_write.cpp + GLOBAL ev_write.cpp +) + +PEERDIR( + ydb/core/tx/columnshard/transactions +) + +END() diff --git a/ydb/core/tx/columnshard/tx_controller.cpp b/ydb/core/tx/columnshard/transactions/tx_controller.cpp index b2943fd462..1137456d3d 100644 --- a/ydb/core/tx/columnshard/tx_controller.cpp +++ b/ydb/core/tx/columnshard/transactions/tx_controller.cpp @@ -1,5 +1,5 @@ #include "tx_controller.h" -#include "columnshard_impl.h" +#include <ydb/core/tx/columnshard/columnshard_impl.h> namespace NKikimr::NColumnShard { @@ -64,7 +64,10 @@ bool TTxController::Load(NTabletFlatExecutor::TTransactionContext& txc) { } const TString txBody = rowset.GetValue<Schema::TxInfo::TxBody>(); - Y_ABORT_UNLESS(Owner.LoadTx(txId, txInfo.TxKind, txBody)); + ITransactionOperatior::TPtr txOperator(ITransactionOperatior::TFactory::Construct(txInfo.TxKind, txInfo)); + Y_ABORT_UNLESS(!!txOperator); + Y_ABORT_UNLESS(txOperator->Parse(txBody)); + Operators[txId] = txOperator; if (!rowset.Next()) { return false; @@ -73,6 +76,20 @@ bool TTxController::Load(NTabletFlatExecutor::TTransactionContext& txc) { return true; } +TTxController::ITransactionOperatior::TPtr TTxController::GetTxOperator(const ui64 txId) { + auto it = Operators.find(txId); + if(it == Operators.end()) { + return nullptr; + } + return it->second; +} + +TTxController::ITransactionOperatior::TPtr TTxController::GetVerifiedTxOperator(const ui64 txId) { + auto it = Operators.find(txId); + AFL_VERIFY(it != Operators.end())("tx_id", txId); + return it->second; +} + const TTxController::TBasicTxInfo& TTxController::RegisterTx(const ui64 txId, const NKikimrTxColumnShard::ETransactionKind& txKind, const TString& txBody, const TActorId& source, const ui64 cookie, NTabletFlatExecutor::TTransactionContext& txc) { NIceDb::TNiceDb db(txc.DB); @@ -81,6 +98,12 @@ const TTxController::TBasicTxInfo& TTxController::RegisterTx(const ui64 txId, co txInfo.TxKind = txKind; txInfo.Source = source; txInfo.Cookie = cookie; + + ITransactionOperatior::TPtr txOperator(ITransactionOperatior::TFactory::Construct(txInfo.TxKind, txInfo)); + Y_ABORT_UNLESS(!!txOperator); + Y_ABORT_UNLESS(txOperator->Parse(txBody)); + Operators[txId] = txOperator; + Schema::SaveTxInfo(db, txInfo.TxId, txInfo.TxKind, txBody, Max<ui64>(), txInfo.Source, txInfo.Cookie); return txInfo; } @@ -95,6 +118,12 @@ const TTxController::TBasicTxInfo& TTxController::RegisterTxWithDeadline(const u txInfo.Cookie = cookie; txInfo.MinStep = GetAllowedStep(); txInfo.MaxStep = txInfo.MinStep + MaxCommitTxDelay.MilliSeconds(); + + ITransactionOperatior::TPtr txOperator(ITransactionOperatior::TFactory::Construct(txInfo.TxKind, txInfo)); + Y_ABORT_UNLESS(!!txOperator); + Y_ABORT_UNLESS(txOperator->Parse(txBody)); + Operators[txId] = txOperator; + Schema::SaveTxInfo(db, txInfo.TxId, txInfo.TxKind, txBody, txInfo.MaxStep, txInfo.Source, txInfo.Cookie); DeadlineQueue.emplace(txInfo.MaxStep, txId); return txInfo; @@ -106,11 +135,16 @@ bool TTxController::AbortTx(const ui64 txId, NTabletFlatExecutor::TTransactionCo return true; } Y_ABORT_UNLESS(it->second.PlanStep == 0); - Owner.AbortTx(txId, it->second.TxKind, txc); + + auto opIt = Operators.find(txId); + Y_ABORT_UNLESS(opIt != Operators.end()); + opIt->second->Abort(Owner, txc); + if (it->second.MaxStep != Max<ui64>()) { DeadlineQueue.erase(TPlanQueueItem(it->second.MaxStep, txId)); } BasicTxInfo.erase(it); + Operators.erase(txId); NIceDb::TNiceDb db(txc.DB); Schema::EraseTxInfo(db, txId); return true; @@ -125,11 +159,16 @@ bool TTxController::CancelTx(const ui64 txId, NTabletFlatExecutor::TTransactionC // Cannot cancel planned transaction return false; } - Owner.AbortTx(txId, it->second.TxKind, txc); + + auto opIt = Operators.find(txId); + Y_ABORT_UNLESS(opIt != Operators.end()); + opIt->second->Abort(Owner, txc); + if (it->second.MaxStep != Max<ui64>()) { DeadlineQueue.erase(TPlanQueueItem(it->second.MaxStep, txId)); } BasicTxInfo.erase(it); + Operators.erase(txId); NIceDb::TNiceDb db(txc.DB); Schema::EraseTxInfo(db, txId); return true; @@ -152,6 +191,7 @@ void TTxController::FinishPlannedTx(const ui64 txId, NTabletFlatExecutor::TTrans NIceDb::TNiceDb db(txc.DB); BasicTxInfo.erase(txId); + Operators.erase(txId); Schema::EraseTxInfo(db, txId); } @@ -217,4 +257,15 @@ TTxController::EPlanResult TTxController::PlanTx(const ui64 planStep, const ui64 return EPlanResult::AlreadyPlanned; } +void TTxController::OnTabletInit() { + for (auto&& txOperator : Operators) { + txOperator.second->OnTabletInit(Owner); + } +} + +} + +template <> +void Out<NKikimrTxColumnShard::ETransactionKind>(IOutputStream& out, TTypeTraits<NKikimrTxColumnShard::ETransactionKind>::TFuncParam txKind) { + out << (ui64) txKind; } diff --git a/ydb/core/tx/columnshard/tx_controller.h b/ydb/core/tx/columnshard/transactions/tx_controller.h index cadeaf7ccb..fb38fae9d3 100644 --- a/ydb/core/tx/columnshard/tx_controller.h +++ b/ydb/core/tx/columnshard/transactions/tx_controller.h @@ -1,6 +1,6 @@ #pragma once -#include "columnshard_schema.h" +#include <ydb/core/tx/columnshard/columnshard_schema.h> #include <ydb/core/tablet_flat/tablet_flat_executed.h> #include <ydb/core/tx/data_events/events.h> @@ -36,6 +36,54 @@ public: NKikimrTxColumnShard::ETransactionKind TxKind; }; + class TProposeResult { + YDB_READONLY(NKikimrTxColumnShard::EResultStatus, Status, NKikimrTxColumnShard::EResultStatus::PREPARED); + YDB_READONLY_DEF(TString, StatusMessage); + public: + TProposeResult() = default; + TProposeResult(NKikimrTxColumnShard::EResultStatus status, const TString& statusMessage) + : Status(status) + , StatusMessage(statusMessage) + {} + + bool operator!() const { + return Status != NKikimrTxColumnShard::EResultStatus::PREPARED; + } + }; + + class ITransactionOperatior { + protected: + TBasicTxInfo TxInfo; + public: + using TPtr = std::shared_ptr<ITransactionOperatior>; + using TFactory = NObjectFactory::TParametrizedObjectFactory<ITransactionOperatior, NKikimrTxColumnShard::ETransactionKind, TBasicTxInfo>; + + ITransactionOperatior(const TBasicTxInfo& txInfo) + : TxInfo(txInfo) + {} + + ui64 GetTxId() const { + return TxInfo.TxId; + } + + virtual ~ITransactionOperatior() {} + + virtual bool TxWithDeadline() const { + return true; + } + + virtual bool Parse(const TString& data) = 0; + virtual TProposeResult Propose(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc, bool proposed) const = 0; + + virtual bool Progress(TColumnShard& owner, const NOlap::TSnapshot& version, NTabletFlatExecutor::TTransactionContext& txc) = 0; + virtual bool Abort(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc) = 0; + virtual bool Complete(TColumnShard& owner, const TActorContext& ctx) = 0; + virtual void RegisterSubscriber(const TActorId&) { + AFL_VERIFY(false)("message", "Not implemented"); + }; + virtual void OnTabletInit(TColumnShard& /*owner*/) {} + }; + private: const TDuration MaxCommitTxDelay = TDuration::Seconds(30); TColumnShard& Owner; @@ -44,6 +92,8 @@ private: std::set<TPlanQueueItem> PlanQueue; std::set<TPlanQueueItem> RunningQueue; + THashMap<ui64, ITransactionOperatior::TPtr> Operators; + private: ui64 GetAllowedStep() const; bool AbortTx(const ui64 txId, NTabletFlatExecutor::TTransactionContext& txc); @@ -51,6 +101,9 @@ private: public: TTxController(TColumnShard& owner); + ITransactionOperatior::TPtr GetTxOperator(const ui64 txId); + ITransactionOperatior::TPtr GetVerifiedTxOperator(const ui64 txId); + ui64 GetMemoryUsage() const; bool HaveOutdatedTxs() const; @@ -79,6 +132,8 @@ public: }; EPlanResult PlanTx(const ui64 planStep, const ui64 txId, NTabletFlatExecutor::TTransactionContext& txc); + void OnTabletInit(); }; } + diff --git a/ydb/core/tx/columnshard/transactions/ya.make b/ydb/core/tx/columnshard/transactions/ya.make new file mode 100644 index 0000000000..17f70379bb --- /dev/null +++ b/ydb/core/tx/columnshard/transactions/ya.make @@ -0,0 +1,20 @@ +LIBRARY() + +SRCS( + tx_controller.cpp +) + +PEERDIR( + ydb/core/tablet_flat + ydb/core/tx/data_events +) + +IF (OS_WINDOWS) + CFLAGS( + -DKIKIMR_DISABLE_S3_OPS + ) +ENDIF() + +YQL_LAST_ABI_VERSION() + +END() diff --git a/ydb/core/tx/columnshard/ya.make b/ydb/core/tx/columnshard/ya.make index 04443739ee..32c6f82418 100644 --- a/ydb/core/tx/columnshard/ya.make +++ b/ydb/core/tx/columnshard/ya.make @@ -27,7 +27,6 @@ SRCS( defs.cpp write_actor.cpp tables_manager.cpp - tx_controller.cpp ) GENERATE_ENUM_SERIALIZATION(columnshard.h) @@ -49,6 +48,8 @@ PEERDIR( ydb/core/tx/columnshard/common ydb/core/tx/columnshard/splitter ydb/core/tx/columnshard/operations + ydb/core/tx/columnshard/transactions + ydb/core/tx/columnshard/transactions/operators ydb/core/tx/columnshard/blobs_reader ydb/core/tx/columnshard/blobs_action ydb/core/tx/columnshard/resource_subscriber |