diff options
author | chertus <azuikov@ydb.tech> | 2023-03-06 12:58:46 +0300 |
---|---|---|
committer | chertus <azuikov@ydb.tech> | 2023-03-06 12:58:46 +0300 |
commit | 5e173e098fd5ea097c89978a38210a2d446a8ab4 (patch) | |
tree | 3c7423934b7bc1a438336ddf49ef5fe66c68c4c1 | |
parent | 91a970e57eca134bea57b1522398bee21f7c63d9 (diff) | |
download | ydb-5e173e098fd5ea097c89978a38210a2d446a8ab4.tar.gz |
Try to fix the UNDETERMINED status problem in LongTx commits
-rw-r--r-- | ydb/core/grpc_services/rpc_long_tx.cpp | 35 | ||||
-rw-r--r-- | ydb/core/protos/tx_columnshard.proto | 6 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard.cpp | 7 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard.h | 53 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard__progress_tx.cpp | 74 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard__propose_transaction.cpp | 16 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard__write.cpp | 22 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard_impl.cpp | 2 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard_impl.h | 6 | ||||
-rw-r--r-- | ydb/core/tx/long_tx_service/commit_impl.cpp | 159 |
10 files changed, 302 insertions, 78 deletions
diff --git a/ydb/core/grpc_services/rpc_long_tx.cpp b/ydb/core/grpc_services/rpc_long_tx.cpp index e511f837c3..be07f4066d 100644 --- a/ydb/core/grpc_services/rpc_long_tx.cpp +++ b/ydb/core/grpc_services/rpc_long_tx.cpp @@ -540,51 +540,20 @@ private: } } - // Expects NKikimrTxColumnShard::EResultStatus - static Ydb::StatusIds::StatusCode ConvertToYdbStatus(ui32 columnShardStatus) { - switch (columnShardStatus) { - case NKikimrTxColumnShard::UNSPECIFIED: - return Ydb::StatusIds::STATUS_CODE_UNSPECIFIED; - - case NKikimrTxColumnShard::PREPARED: - case NKikimrTxColumnShard::SUCCESS: - return Ydb::StatusIds::SUCCESS; - - case NKikimrTxColumnShard::ABORTED: - return Ydb::StatusIds::ABORTED; - - case NKikimrTxColumnShard::ERROR: - return Ydb::StatusIds::GENERIC_ERROR; - - case NKikimrTxColumnShard::TIMEOUT: - return Ydb::StatusIds::TIMEOUT; - - case NKikimrTxColumnShard::SCHEMA_ERROR: - case NKikimrTxColumnShard::SCHEMA_CHANGED: - return Ydb::StatusIds::SCHEME_ERROR; - - case NKikimrTxColumnShard::OVERLOADED: - return Ydb::StatusIds::OVERLOADED; - - default: - return Ydb::StatusIds::GENERIC_ERROR; - } - } - void Handle(TEvColumnShard::TEvWriteResult::TPtr& ev) { auto gProfile = ActorSpan.StartStackTimeGuard("WriteResult"); const auto* msg = ev->Get(); ui64 shardId = msg->Record.GetOrigin(); Y_VERIFY(WaitShards.count(shardId) || ShardsWrites.count(shardId)); - auto status = msg->Record.GetStatus(); + const auto status = (NKikimrTxColumnShard::EResultStatus)msg->Record.GetStatus(); if (status == NKikimrTxColumnShard::OVERLOADED) { if (RetryWriteRequest(shardId)) { return; } } if (status != NKikimrTxColumnShard::SUCCESS) { - auto ydbStatus = ConvertToYdbStatus(status); + auto ydbStatus = NColumnShard::ConvertToYdbStatus(status); return ReplyError(ydbStatus, TStringBuilder() << "Cannot write data into shard " << shardId << " in longTx " << LongTxId.ToString()); } diff --git a/ydb/core/protos/tx_columnshard.proto b/ydb/core/protos/tx_columnshard.proto index 
89369c992f..e239807efe 100644 --- a/ydb/core/protos/tx_columnshard.proto +++ b/ydb/core/protos/tx_columnshard.proto @@ -134,6 +134,12 @@ message TEvProposeTransaction { optional uint64 Flags = 7; } +message TEvCheckPlannedTransaction { + optional NActorsProto.TActorId Source = 1; + optional uint64 Step = 2; + optional uint64 TxId = 3; +} + message TEvProposeTransactionResult { optional EResultStatus Status = 1; optional ETransactionKind TxKind = 2; diff --git a/ydb/core/tx/columnshard/columnshard.cpp b/ydb/core/tx/columnshard/columnshard.cpp index 444164d573..44a82e1826 100644 --- a/ydb/core/tx/columnshard/columnshard.cpp +++ b/ydb/core/tx/columnshard/columnshard.cpp @@ -247,8 +247,15 @@ ui64 TColumnShard::MemoryUsage() const { PathsToDrop.size() * sizeof(ui64) + Ttl.PathsCount() * sizeof(TTtl::TDescription) + SchemaPresets.size() * sizeof(TSchemaPreset) + + BasicTxInfo.size() * sizeof(TBasicTxInfo) + + DeadlineQueue.size() * sizeof(TDeadlineQueueItem) + + (PlanQueue.size() + RunningQueue.size()) * sizeof(TPlanQueueItem) + + ScanTxInFlight.size() * (sizeof(ui64) + sizeof(TInstant)) + AltersInFlight.size() * sizeof(TAlterMeta) + CommitsInFlight.size() * sizeof(TCommitMeta) + + LongTxWrites.size() * (sizeof(TWriteId) + sizeof(TLongTxWriteInfo)) + + LongTxWritesByUniqueId.size() * (sizeof(TULID) + sizeof(void*)) + + (WaitingReads.size() + WaitingScans.size()) * (sizeof(TRowVersion) + sizeof(void*)) + TabletCounters->Simple()[COUNTER_PREPARED_RECORDS].Get() * sizeof(NOlap::TInsertedData) + TabletCounters->Simple()[COUNTER_COMMITTED_RECORDS].Get() * sizeof(NOlap::TInsertedData); if (PrimaryIndex) { diff --git a/ydb/core/tx/columnshard/columnshard.h b/ydb/core/tx/columnshard/columnshard.h index 3e56dcd02c..bebf58bd1e 100644 --- a/ydb/core/tx/columnshard/columnshard.h +++ b/ydb/core/tx/columnshard/columnshard.h @@ -15,6 +15,36 @@ namespace NKikimr { namespace NColumnShard { class TBlobGroupSelector; + +inline Ydb::StatusIds::StatusCode 
ConvertToYdbStatus(NKikimrTxColumnShard::EResultStatus columnShardStatus) { + switch (columnShardStatus) { + case NKikimrTxColumnShard::UNSPECIFIED: + return Ydb::StatusIds::STATUS_CODE_UNSPECIFIED; + + case NKikimrTxColumnShard::PREPARED: + case NKikimrTxColumnShard::SUCCESS: + return Ydb::StatusIds::SUCCESS; + + case NKikimrTxColumnShard::ABORTED: + return Ydb::StatusIds::ABORTED; + + case NKikimrTxColumnShard::ERROR: + return Ydb::StatusIds::GENERIC_ERROR; + + case NKikimrTxColumnShard::TIMEOUT: + return Ydb::StatusIds::TIMEOUT; + + case NKikimrTxColumnShard::SCHEMA_ERROR: + case NKikimrTxColumnShard::SCHEMA_CHANGED: + return Ydb::StatusIds::SCHEME_ERROR; + + case NKikimrTxColumnShard::OVERLOADED: + return Ydb::StatusIds::OVERLOADED; + + default: + return Ydb::StatusIds::GENERIC_ERROR; + } +} } struct TEvColumnShard { @@ -26,6 +56,7 @@ struct TEvColumnShard { EvNotifyTxCompletionResult, EvReadBlobRanges, EvReadBlobRangesResult, + EvCheckPlannedTransaction, EvWrite = EvProposeTransaction + 256, EvRead, @@ -75,6 +106,24 @@ struct TEvColumnShard { } }; + struct TEvCheckPlannedTransaction + : public TEventPB<TEvCheckPlannedTransaction, + NKikimrTxColumnShard::TEvCheckPlannedTransaction, + EvCheckPlannedTransaction> + { + TEvCheckPlannedTransaction() = default; + + TEvCheckPlannedTransaction(const TActorId& source, ui64 planStep, ui64 txId) { + ActorIdToProto(source, Record.MutableSource()); + Record.SetStep(planStep); + Record.SetTxId(txId); + } + + TActorId GetSource() const { + return ActorIdFromProto(Record.GetSource()); + } + }; + struct TEvCancelTransactionProposal : public TEventPB<TEvCancelTransactionProposal, NKikimrTxColumnShard::TEvCancelTransactionProposal, @@ -276,6 +325,10 @@ inline auto& Proto(TEvColumnShard::TEvProposeTransaction* ev) { return ev->Record; } +inline auto& Proto(TEvColumnShard::TEvCheckPlannedTransaction* ev) { + return ev->Record; +} + inline auto& Proto(TEvColumnShard::TEvProposeTransactionResult* ev) { return ev->Record; } diff --git 
a/ydb/core/tx/columnshard/columnshard__progress_tx.cpp b/ydb/core/tx/columnshard/columnshard__progress_tx.cpp index 34f78c7eb5..65c03dbb97 100644 --- a/ydb/core/tx/columnshard/columnshard__progress_tx.cpp +++ b/ydb/core/tx/columnshard/columnshard__progress_tx.cpp @@ -18,6 +18,23 @@ private: { } }; + struct TResultEvent { + TBasicTxInfo TxInfo; + NKikimrTxColumnShard::EResultStatus Status; + + TResultEvent(TBasicTxInfo&& txInfo, NKikimrTxColumnShard::EResultStatus status) + : TxInfo(std::move(txInfo)) + , Status(status) + {} + + THolder<IEventBase> MakeEvent(ui64 tabletId) const { + auto result = MakeHolder<TEvColumnShard::TEvProposeTransactionResult>( + tabletId, TxInfo.TxKind, TxInfo.TxId, Status); + result->Record.SetStep(TxInfo.PlanStep); + return result; + } + }; + enum class ETriggerActivities { NONE, POST_INSERT, @@ -62,14 +79,15 @@ public: } // Process a single transaction at the front of the queue - if (Self->PlanQueue) { + if (!Self->PlanQueue.empty()) { ui64 step; ui64 txId; { - auto it = Self->PlanQueue.begin(); - step = it->Step; - txId = it->TxId; - Self->PlanQueue.erase(it); + auto node = Self->PlanQueue.extract(Self->PlanQueue.begin()); + auto& item = node.value(); + step = item.Step; + txId = item.TxId; + Self->RunningQueue.emplace(std::move(item)); } auto& txInfo = Self->BasicTxInfo.at(txId); @@ -150,11 +168,8 @@ public: } // Currently transactions never fail and there are no dependencies between them - auto txKind = txInfo.TxKind; - auto status = NKikimrTxColumnShard::SUCCESS; - auto result = MakeHolder<TEvColumnShard::TEvProposeTransactionResult>(Self->TabletID(), txKind, txId, status); - result->Record.SetStep(step); - TxEvents.emplace_back(txInfo.Source, txInfo.Cookie, std::move(result)); + txInfo.PlanStep = step; + TxResults.emplace_back(TResultEvent(std::move(txInfo), NKikimrTxColumnShard::SUCCESS)); Self->BasicTxInfo.erase(txId); Schema::EraseTxInfo(db, txId); @@ -163,7 +178,7 @@ public: } Self->ProgressTxInFlight = false; - if 
(Self->PlanQueue) { + if (!Self->PlanQueue.empty()) { Self->EnqueueProgressTx(ctx); } return true; @@ -176,6 +191,14 @@ public: ctx.Send(rec.Target, rec.Event.Release(), 0, rec.Cookie); } + for (auto& res : TxResults) { + TPlanQueueItem txItem(res.TxInfo.PlanStep, res.TxInfo.TxId); + Self->RunningQueue.erase(txItem); + + auto event = res.MakeEvent(Self->TabletID()); + ctx.Send(res.TxInfo.Source, event.Release(), 0, res.TxInfo.Cookie); + } + Self->UpdateBlobMangerCounters(); if (Self->BlobManager->CanCollectGarbage()) { Self->Execute(Self->CreateTxRunGc(), ctx); @@ -195,7 +218,8 @@ public: } private: - TVector<TEvent> TxEvents; + std::vector<TResultEvent> TxResults; + std::vector<TEvent> TxEvents; ETriggerActivities Trigger{ETriggerActivities::NONE}; }; @@ -206,4 +230,30 @@ void TColumnShard::EnqueueProgressTx(const TActorContext& ctx) { } } +void TColumnShard::Handle(TEvColumnShard::TEvCheckPlannedTransaction::TPtr& ev, const TActorContext& ctx) { + auto& record = Proto(ev->Get()); + ui64 step = record.GetStep(); + ui64 txId = record.GetTxId(); + LOG_S_DEBUG("CheckTransaction planStep " << step << " txId " << txId << " at tablet " << TabletID()); + + TPlanQueueItem frontTx(LastPlannedStep, 0); + if (!RunningQueue.empty()) { + frontTx = TPlanQueueItem(RunningQueue.begin()->Step, RunningQueue.begin()->TxId); + } else if (!PlanQueue.empty()) { + frontTx = TPlanQueueItem(PlanQueue.begin()->Step, PlanQueue.begin()->TxId); + } + + bool finished = step < frontTx.Step || (step == frontTx.Step && txId < frontTx.TxId); + if (finished) { + auto txKind = NKikimrTxColumnShard::ETransactionKind::TX_KIND_COMMIT; + auto status = NKikimrTxColumnShard::SUCCESS; + auto result = MakeHolder<TEvColumnShard::TEvProposeTransactionResult>(TabletID(), txKind, txId, status); + result->Record.SetStep(step); + + ctx.Send(ev->Get()->GetSource(), result.Release()); + } + + // For now do not return result for not finished tx. 
It would be sent in TTxProgressTx::Complete() +} + } diff --git a/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp b/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp index 68b166657c..de3bf81635 100644 --- a/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp +++ b/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp @@ -117,8 +117,20 @@ bool TTxProposeTransaction::Execute(TTransactionContext& txc, const TActorContex } case NKikimrTxColumnShard::TX_KIND_COMMIT: { if (Self->CommitsInFlight.contains(txId)) { - statusMessage = TStringBuilder() - << "Commit TxId# " << txId << " has already been proposed"; + LOG_S_DEBUG("TTxProposeTransaction CommitTx (retry) TxId " << txId << " at tablet " << Self->TabletID()); + + Y_VERIFY(Self->BasicTxInfo.contains(txId)); + const auto& txInfo = Self->BasicTxInfo[txId]; + + if (txInfo.Source != Ev->Get()->GetSource() || txInfo.Cookie != Ev->Cookie) { + statusMessage = TStringBuilder() + << "Another commit TxId# " << txId << " has already been proposed"; + break; + } + + maxStep = txInfo.MaxStep; + minStep = maxStep - Self->MaxCommitTxDelay.MilliSeconds(); // TODO: improve this code + status = NKikimrTxColumnShard::EResultStatus::PREPARED; break; } diff --git a/ydb/core/tx/columnshard/columnshard__write.cpp b/ydb/core/tx/columnshard/columnshard__write.cpp index b2bfa0d18d..d73a747f8a 100644 --- a/ydb/core/tx/columnshard/columnshard__write.cpp +++ b/ydb/core/tx/columnshard/columnshard__write.cpp @@ -45,11 +45,13 @@ bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) { ui32 status = NKikimrTxColumnShard::EResultStatus::SUCCESS; auto& logoBlobId = Ev->Get()->BlobId; auto putStatus = Ev->Get()->PutStatus; + Y_VERIFY(putStatus == NKikimrProto::OK); + Y_VERIFY(logoBlobId.IsValid()); bool ok = false; if (!Self->PrimaryIndex || !Self->IsTableWritable(tableId)) { status = NKikimrTxColumnShard::EResultStatus::SCHEMA_ERROR; - } else if (putStatus == NKikimrProto::OK && 
logoBlobId.IsValid()) { + } else { if (record.HasLongTxId()) { Y_VERIFY(metaShard == 0); auto longTxId = NLongTxService::TLongTxId::FromProto(record.GetLongTxId()); @@ -102,12 +104,6 @@ bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) { // Return EResultStatus::SUCCESS for dups Self->IncCounter(COUNTER_WRITE_DUPLICATE); } - } else if (putStatus == NKikimrProto::TIMEOUT) { - status = NKikimrTxColumnShard::EResultStatus::TIMEOUT; - } else if (putStatus == NKikimrProto::TRYLATER) { - status = NKikimrTxColumnShard::EResultStatus::OVERLOADED; - } else { - status = NKikimrTxColumnShard::EResultStatus::ERROR; } if (status != NKikimrTxColumnShard::EResultStatus::SUCCESS) { @@ -140,15 +136,16 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex ui64 metaShard = record.GetTxInitiator(); ui64 writeId = record.GetWriteId(); TString dedupId = record.GetDedupId(); + auto putStatus = ev->Get()->PutStatus; bool isWritable = IsTableWritable(tableId); bool error = data.empty() || data.size() > TLimits::MAX_BLOB_SIZE || !PrimaryIndex || !isWritable; - bool errorReturned = (ev->Get()->PutStatus != NKikimrProto::OK) && (ev->Get()->PutStatus != NKikimrProto::UNKNOWN); + bool errorReturned = (putStatus != NKikimrProto::OK) && (putStatus != NKikimrProto::UNKNOWN); bool isOutOfSpace = IsAnyChannelYellowStop(); if (error || errorReturned) { - LOG_S_WARN("Write (fail) " << data.size() << " bytes into pathId " << tableId - << ", status " << ev->Get()->PutStatus + LOG_S_NOTICE("Write (fail) " << data.size() << " bytes into pathId " << tableId + << ", status " << putStatus << (PrimaryIndex? "": ", no index") << (isWritable? 
"": ", ro") << " at tablet " << TabletID()); @@ -156,8 +153,10 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex auto errCode = NKikimrTxColumnShard::EResultStatus::ERROR; if (errorReturned) { - if (ev->Get()->PutStatus == NKikimrProto::TIMEOUT) { + if (putStatus == NKikimrProto::TIMEOUT || putStatus == NKikimrProto::DEADLINE) { errCode = NKikimrTxColumnShard::EResultStatus::TIMEOUT; + } else if (putStatus == NKikimrProto::TRYLATER || putStatus == NKikimrProto::OUT_OF_SPACE) { + errCode = NKikimrTxColumnShard::EResultStatus::OVERLOADED; } --WritesInFly; // write failed } @@ -171,6 +170,7 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex << (writeId? (" writeId " + ToString(writeId)).c_str() : "") << " at tablet " << TabletID()); --WritesInFly; // write successed + Y_VERIFY(putStatus == NKikimrProto::OK); Execute(new TTxWrite(this, ev), ctx); } else if (isOutOfSpace || InsertTable->IsOverloaded(tableId) || ShardOverloaded()) { IncCounter(COUNTER_WRITE_FAIL); diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp index 80c3fc5f81..47e6990c39 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.cpp +++ b/ydb/core/tx/columnshard/columnshard_impl.cpp @@ -205,7 +205,7 @@ void TColumnShard::RescheduleWaitingReads() { } TRowVersion TColumnShard::GetMaxReadVersion() const { - if (PlanQueue) { + if (!PlanQueue.empty()) { // We may only read just before the first transaction in the queue auto it = PlanQueue.begin(); return TRowVersion(it->Step, it->TxId).Prev(); diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h index b88e5fd071..ab0b1af3f0 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.h +++ b/ydb/core/tx/columnshard/columnshard_impl.h @@ -109,6 +109,7 @@ class TColumnShard void Handle(TEvTabletPipe::TEvServerConnected::TPtr& ev, const TActorContext& ctx); void 
Handle(TEvTabletPipe::TEvServerDisconnected::TPtr& ev, const TActorContext& ctx); void Handle(TEvColumnShard::TEvProposeTransaction::TPtr& ev, const TActorContext& ctx); + void Handle(TEvColumnShard::TEvCheckPlannedTransaction::TPtr& ev, const TActorContext& ctx); void Handle(TEvColumnShard::TEvCancelTransactionProposal::TPtr& ev, const TActorContext& ctx); void Handle(TEvColumnShard::TEvNotifyTxCompletion::TPtr& ev, const TActorContext& ctx); void Handle(TEvTxProcessing::TEvPlanStep::TPtr& ev, const TActorContext& ctx); @@ -212,6 +213,7 @@ protected: HFunc(TEvTabletPipe::TEvServerConnected, Handle); HFunc(TEvTabletPipe::TEvServerDisconnected, Handle); HFunc(TEvColumnShard::TEvProposeTransaction, Handle); + HFunc(TEvColumnShard::TEvCheckPlannedTransaction, Handle); HFunc(TEvColumnShard::TEvCancelTransactionProposal, Handle); HFunc(TEvColumnShard::TEvNotifyTxCompletion, Handle); HFunc(TEvColumnShard::TEvScan, Handle); @@ -369,10 +371,10 @@ private: THashMap<ui64, TBasicTxInfo> BasicTxInfo; TSet<TDeadlineQueueItem> DeadlineQueue; - TSet<TPlanQueueItem> PlanQueue; + std::set<TPlanQueueItem> PlanQueue; + std::set<TPlanQueueItem> RunningQueue; bool ProgressTxInFlight = false; THashMap<ui64, TInstant> ScanTxInFlight; - THashMap<ui64, TAlterMeta> AltersInFlight; THashMap<ui64, TCommitMeta> CommitsInFlight; // key is TxId from propose THashMap<ui32, TSchemaPreset> SchemaPresets; diff --git a/ydb/core/tx/long_tx_service/commit_impl.cpp b/ydb/core/tx/long_tx_service/commit_impl.cpp index 3a2872bad0..91e082f962 100644 --- a/ydb/core/tx/long_tx_service/commit_impl.cpp +++ b/ydb/core/tx/long_tx_service/commit_impl.cpp @@ -3,6 +3,7 @@ #include <ydb/core/base/tablet_pipecache.h> #include <ydb/core/tx/tx_proxy/proxy.h> #include <ydb/core/tx/columnshard/columnshard.h> +#include <ydb/core/actorlib_impl/long_timer.h> #include <library/cpp/actors/core/actor_bootstrapped.h> @@ -14,6 +15,15 @@ namespace NKikimr { namespace NLongTxService { + struct TRetryData { + static constexpr ui32 
MaxPrepareRetriesPerShard = 100; // ~30 sec + static constexpr ui32 MaxPlanRetriesPerShard = 1000; // ~5 min + static constexpr ui32 RetryDelayMs = 300; + + ui64 WriteId = 0; + TString TxBody; + ui32 NumRetries = 0; + }; class TLongTxServiceActor::TCommitActor : public TActorBootstrapped<TCommitActor> { public: @@ -64,7 +74,7 @@ namespace NLongTxService { const auto* msg = ev->Get(); TxId = msg->TxId; Services = msg->Services; - LogPrefix = TStringBuilder() << LogPrefix << " TxId# " << TxId; + LogPrefix = TStringBuilder() << LogPrefix << " TxId# " << TxId << " "; TXLOG_DEBUG("Allocated TxId"); PrepareTransaction(); } @@ -79,15 +89,40 @@ namespace NLongTxService { TString txBody; Y_VERIFY(tx.SerializeToString(&txBody)); - TXLOG_DEBUG("Sending TEvProposeTransaction to ColumnShard# " << tabletId << " WriteId# " << writeId); - SendToTablet(tabletId, MakeHolder<TEvColumnShard::TEvProposeTransaction>( + WaitingShards.emplace(tabletId, TRetryData{writeId, txBody, 0}); + SendPrepareTransaction(tabletId); + } + Become(&TThis::StatePrepare); + } + + bool SendPrepareTransaction(ui64 tabletId, bool delayed = false) { + auto it = WaitingShards.find(tabletId); + if (it == WaitingShards.end()) { + return false; + } + + auto& data = it->second; + if (delayed) { + if (data.NumRetries >= TRetryData::MaxPrepareRetriesPerShard) { + return false; + } + ++data.NumRetries; + if (ToRetry.empty()) { + TimeoutTimerActorId = CreateLongTimer(TDuration::MilliSeconds(TRetryData::RetryDelayMs), + new IEventHandle(this->SelfId(), this->SelfId(), new TEvents::TEvWakeup())); + } + ToRetry.insert(tabletId); + return true; + } + + TXLOG_DEBUG("Sending TEvProposeTransaction to ColumnShard# " << tabletId << " WriteId# " << data.WriteId); + + SendToTablet(tabletId, MakeHolder<TEvColumnShard::TEvProposeTransaction>( NKikimrTxColumnShard::TX_KIND_COMMIT, SelfId(), TxId, - std::move(txBody))); - WaitingShards.insert(tabletId); - } - Become(&TThis::StatePrepare); + data.TxBody)); + return true; } 
STFUNC(StatePrepare) { @@ -95,6 +130,7 @@ namespace NLongTxService { switch (ev->GetTypeRewrite()) { hFunc(TEvColumnShard::TEvProposeTransactionResult, HandlePrepare); hFunc(TEvPipeCache::TEvDeliveryProblem, HandlePrepare); + CFunc(TEvents::TSystem::Wakeup, HandlePrepareTimeout); } } @@ -126,7 +162,7 @@ namespace NLongTxService { void HandlePrepare(TEvColumnShard::TEvProposeTransactionResult::TPtr& ev) { const auto* msg = ev->Get(); const ui64 tabletId = msg->Record.GetOrigin(); - const auto status = msg->Record.GetStatus(); + const NKikimrTxColumnShard::EResultStatus status = msg->Record.GetStatus(); TXLOG_DEBUG("Received TEvProposeTransactionResult from" << " ColumnShard# " << tabletId @@ -155,9 +191,13 @@ namespace NLongTxService { // Cancel transaction, since we didn't plan it CancelProposal(); + auto ydbStatus = NColumnShard::ConvertToYdbStatus(status); NYql::TIssues issues; - issues.AddIssue("TODO: need mapping from shard errors to api errors"); - return FinishWithError(Ydb::StatusIds::GENERIC_ERROR, std::move(issues)); + issues.AddIssue(TStringBuilder() << "Cannot prepare transaction at shard " << tabletId); + if (msg->Record.HasStatusMessage()) { + issues.AddIssue(msg->Record.GetStatusMessage()); + } + return FinishWithError(ydbStatus, std::move(issues)); } } } @@ -165,11 +205,21 @@ namespace NLongTxService { void HandlePrepare(TEvPipeCache::TEvDeliveryProblem::TPtr& ev) { const auto* msg = ev->Get(); const ui64 tabletId = msg->TabletId; + Y_VERIFY(tabletId != SelectedCoordinator); TXLOG_DEBUG("Delivery problem" << " TabletId# " << tabletId << " NotDelivered# " << msg->NotDelivered); + if (!WaitingShards.count(tabletId)) { + return; + } + + // Delayed retry + if (SendPrepareTransaction(tabletId, true)) { + return; + } + // Cancel transaction, since we didn't plan it CancelProposal(); @@ -178,10 +228,19 @@ namespace NLongTxService { return FinishWithError(Ydb::StatusIds::UNAVAILABLE, std::move(issues)); } + void HandlePrepareTimeout(const TActorContext& 
/*ctx*/) { + TimeoutTimerActorId = {}; + for (ui64 tabletId : ToRetry) { + SendPrepareTransaction(tabletId); + } + ToRetry.clear(); + } + private: void PlanTransaction() { Y_VERIFY(SelectedCoordinator); Y_VERIFY(WaitingShards.empty()); + ToRetry.clear(); auto req = MakeHolder<TEvTxProxy::TEvProposeTransaction>( SelectedCoordinator, TxId, 0, MinStep, MaxStep); @@ -193,7 +252,7 @@ namespace NLongTxService { auto* x = reqAffectedSet->Add(); x->SetTabletId(tabletId); x->SetFlags(/* write */ 2); - WaitingShards.insert(tabletId); + WaitingShards.emplace(tabletId, TRetryData{}); } TXLOG_DEBUG("Sending TEvProposeTransaction to SelectedCoordinator# " << SelectedCoordinator); @@ -207,6 +266,7 @@ namespace NLongTxService { hFunc(TEvTxProxy::TEvProposeTransactionStatus, HandlePlan); hFunc(TEvColumnShard::TEvProposeTransactionResult, HandlePlan); hFunc(TEvPipeCache::TEvDeliveryProblem, HandlePlan); + CFunc(TEvents::TSystem::Wakeup, HandlePlanTimeout); } } @@ -267,10 +327,23 @@ namespace NLongTxService { return Finish(); } + case NKikimrTxColumnShard::OUTDATED: { + if (!WaitingShards.count(tabletId)) { + return; + } + NYql::TIssues issues; + issues.AddIssue(TStringBuilder() << "Shard " << tabletId << " has no info about tx " << TxId); + return FinishWithError(Ydb::StatusIds::UNDETERMINED, std::move(issues)); + } + default: { + auto ydbStatus = NColumnShard::ConvertToYdbStatus(status); NYql::TIssues issues; - issues.AddIssue("TODO: need mapping from shard errors to api errors"); - return FinishWithError(Ydb::StatusIds::GENERIC_ERROR, std::move(issues)); + issues.AddIssue(TStringBuilder() << "Cannot plan transaction at shard " << tabletId); + if (msg->Record.HasStatusMessage()) { + issues.AddIssue(msg->Record.GetStatusMessage()); + } + return FinishWithError(ydbStatus, std::move(issues)); } } } @@ -296,9 +369,17 @@ namespace NLongTxService { issues.AddIssue("Coordinator not available, transaction was not committed"); return FinishWithError(Ydb::StatusIds::UNAVAILABLE, 
std::move(issues)); } - } else if (!WaitingShards.contains(tabletId)) { - // We are not waiting for results from this shard - return; + } else { + if (!PlanStep) { + // Waiting for PlanStep. Do not break transaction. + return; + } + // It's planned, not completed. We could check TEvProposeTransactionResult by PlanStep. + // - tx is completed if PlanStep:TxId at tablet is greater then ours + // - tx is not completed otherwise. Keep waiting TEvProposeTransactionResult for it. + if (SendCheckPlannedTransaction(tabletId, true)) { + return; + } } NYql::TIssues issues; @@ -306,6 +387,48 @@ namespace NLongTxService { return FinishWithError(Ydb::StatusIds::UNDETERMINED, std::move(issues)); } + void HandlePlanTimeout(const TActorContext& /*ctx*/) { + TimeoutTimerActorId = {}; + for (ui64 tabletId : ToRetry) { + SendCheckPlannedTransaction(tabletId); + } + ToRetry.clear(); + } + + bool SendCheckPlannedTransaction(ui64 tabletId, bool delayed = false) { + Y_VERIFY(PlanStep); + + if (delayed) { + auto it = WaitingShards.find(tabletId); + if (it == WaitingShards.end()) { + // We are not waiting for results from this shard + return true; + } + + auto& numRetries = it->second.NumRetries; + if (numRetries >= TRetryData::MaxPlanRetriesPerShard) { + return false; + } + ++numRetries; + + if (ToRetry.empty()) { + TimeoutTimerActorId = CreateLongTimer(TDuration::MilliSeconds(TRetryData::RetryDelayMs), + new IEventHandle(this->SelfId(), this->SelfId(), new TEvents::TEvWakeup())); + } + ToRetry.insert(tabletId); + return true; + } + + TXLOG_DEBUG("Ask TEvProposeTransactionResult from ColumnShard# " << tabletId + << " for PlanStep# " << PlanStep << " TxId# " << TxId); + + SendToTablet(tabletId, MakeHolder<TEvColumnShard::TEvCheckPlannedTransaction>( + SelfId(), + PlanStep, + TxId)); + return true; + } + private: void CancelProposal() { for (const auto& pr : Params.ColumnShardWrites) { @@ -336,11 +459,13 @@ namespace NLongTxService { TString LogPrefix; ui64 TxId = 0; 
NTxProxy::TTxProxyServices Services; - THashSet<ui64> WaitingShards; + THashMap<ui64, TRetryData> WaitingShards; ui64 SelectedCoordinator = 0; ui64 MinStep = 0; ui64 MaxStep = Max<ui64>(); ui64 PlanStep = 0; + THashSet<ui64> ToRetry; + TActorId TimeoutTimerActorId; }; void TLongTxServiceActor::StartCommitActor(TTransaction& tx) { |