aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorchertus <azuikov@ydb.tech>2023-03-06 12:58:46 +0300
committerchertus <azuikov@ydb.tech>2023-03-06 12:58:46 +0300
commit5e173e098fd5ea097c89978a38210a2d446a8ab4 (patch)
tree3c7423934b7bc1a438336ddf49ef5fe66c68c4c1
parent91a970e57eca134bea57b1522398bee21f7c63d9 (diff)
downloadydb-5e173e098fd5ea097c89978a38210a2d446a8ab4.tar.gz
try fix UNDETERMINED problem in LongTx
-rw-r--r--ydb/core/grpc_services/rpc_long_tx.cpp35
-rw-r--r--ydb/core/protos/tx_columnshard.proto6
-rw-r--r--ydb/core/tx/columnshard/columnshard.cpp7
-rw-r--r--ydb/core/tx/columnshard/columnshard.h53
-rw-r--r--ydb/core/tx/columnshard/columnshard__progress_tx.cpp74
-rw-r--r--ydb/core/tx/columnshard/columnshard__propose_transaction.cpp16
-rw-r--r--ydb/core/tx/columnshard/columnshard__write.cpp22
-rw-r--r--ydb/core/tx/columnshard/columnshard_impl.cpp2
-rw-r--r--ydb/core/tx/columnshard/columnshard_impl.h6
-rw-r--r--ydb/core/tx/long_tx_service/commit_impl.cpp159
10 files changed, 302 insertions, 78 deletions
diff --git a/ydb/core/grpc_services/rpc_long_tx.cpp b/ydb/core/grpc_services/rpc_long_tx.cpp
index e511f837c3..be07f4066d 100644
--- a/ydb/core/grpc_services/rpc_long_tx.cpp
+++ b/ydb/core/grpc_services/rpc_long_tx.cpp
@@ -540,51 +540,20 @@ private:
}
}
- // Expects NKikimrTxColumnShard::EResultStatus
- static Ydb::StatusIds::StatusCode ConvertToYdbStatus(ui32 columnShardStatus) {
- switch (columnShardStatus) {
- case NKikimrTxColumnShard::UNSPECIFIED:
- return Ydb::StatusIds::STATUS_CODE_UNSPECIFIED;
-
- case NKikimrTxColumnShard::PREPARED:
- case NKikimrTxColumnShard::SUCCESS:
- return Ydb::StatusIds::SUCCESS;
-
- case NKikimrTxColumnShard::ABORTED:
- return Ydb::StatusIds::ABORTED;
-
- case NKikimrTxColumnShard::ERROR:
- return Ydb::StatusIds::GENERIC_ERROR;
-
- case NKikimrTxColumnShard::TIMEOUT:
- return Ydb::StatusIds::TIMEOUT;
-
- case NKikimrTxColumnShard::SCHEMA_ERROR:
- case NKikimrTxColumnShard::SCHEMA_CHANGED:
- return Ydb::StatusIds::SCHEME_ERROR;
-
- case NKikimrTxColumnShard::OVERLOADED:
- return Ydb::StatusIds::OVERLOADED;
-
- default:
- return Ydb::StatusIds::GENERIC_ERROR;
- }
- }
-
void Handle(TEvColumnShard::TEvWriteResult::TPtr& ev) {
auto gProfile = ActorSpan.StartStackTimeGuard("WriteResult");
const auto* msg = ev->Get();
ui64 shardId = msg->Record.GetOrigin();
Y_VERIFY(WaitShards.count(shardId) || ShardsWrites.count(shardId));
- auto status = msg->Record.GetStatus();
+ const auto status = (NKikimrTxColumnShard::EResultStatus)msg->Record.GetStatus();
if (status == NKikimrTxColumnShard::OVERLOADED) {
if (RetryWriteRequest(shardId)) {
return;
}
}
if (status != NKikimrTxColumnShard::SUCCESS) {
- auto ydbStatus = ConvertToYdbStatus(status);
+ auto ydbStatus = NColumnShard::ConvertToYdbStatus(status);
return ReplyError(ydbStatus,
TStringBuilder() << "Cannot write data into shard " << shardId << " in longTx " << LongTxId.ToString());
}
diff --git a/ydb/core/protos/tx_columnshard.proto b/ydb/core/protos/tx_columnshard.proto
index 89369c992f..e239807efe 100644
--- a/ydb/core/protos/tx_columnshard.proto
+++ b/ydb/core/protos/tx_columnshard.proto
@@ -134,6 +134,12 @@ message TEvProposeTransaction {
optional uint64 Flags = 7;
}
+message TEvCheckPlannedTransaction { // Request asking a ColumnShard about a transaction identified by (Step, TxId).
+ optional NActorsProto.TActorId Source = 1; // actor the shard replies to
+ optional uint64 Step = 2; // plan step of the transaction being asked about
+ optional uint64 TxId = 3; // id of the transaction being asked about
+}
+
message TEvProposeTransactionResult {
optional EResultStatus Status = 1;
optional ETransactionKind TxKind = 2;
diff --git a/ydb/core/tx/columnshard/columnshard.cpp b/ydb/core/tx/columnshard/columnshard.cpp
index 444164d573..44a82e1826 100644
--- a/ydb/core/tx/columnshard/columnshard.cpp
+++ b/ydb/core/tx/columnshard/columnshard.cpp
@@ -247,8 +247,15 @@ ui64 TColumnShard::MemoryUsage() const {
PathsToDrop.size() * sizeof(ui64) +
Ttl.PathsCount() * sizeof(TTtl::TDescription) +
SchemaPresets.size() * sizeof(TSchemaPreset) +
+ BasicTxInfo.size() * sizeof(TBasicTxInfo) +
+ DeadlineQueue.size() * sizeof(TDeadlineQueueItem) +
+ (PlanQueue.size() + RunningQueue.size()) * sizeof(TPlanQueueItem) +
+ ScanTxInFlight.size() * (sizeof(ui64) + sizeof(TInstant)) +
AltersInFlight.size() * sizeof(TAlterMeta) +
CommitsInFlight.size() * sizeof(TCommitMeta) +
+ LongTxWrites.size() * (sizeof(TWriteId) + sizeof(TLongTxWriteInfo)) +
+ LongTxWritesByUniqueId.size() * (sizeof(TULID) + sizeof(void*)) +
+ (WaitingReads.size() + WaitingScans.size()) * (sizeof(TRowVersion) + sizeof(void*)) +
TabletCounters->Simple()[COUNTER_PREPARED_RECORDS].Get() * sizeof(NOlap::TInsertedData) +
TabletCounters->Simple()[COUNTER_COMMITTED_RECORDS].Get() * sizeof(NOlap::TInsertedData);
if (PrimaryIndex) {
diff --git a/ydb/core/tx/columnshard/columnshard.h b/ydb/core/tx/columnshard/columnshard.h
index 3e56dcd02c..bebf58bd1e 100644
--- a/ydb/core/tx/columnshard/columnshard.h
+++ b/ydb/core/tx/columnshard/columnshard.h
@@ -15,6 +15,36 @@ namespace NKikimr {
namespace NColumnShard {
class TBlobGroupSelector;
+
+inline Ydb::StatusIds::StatusCode ConvertToYdbStatus(NKikimrTxColumnShard::EResultStatus columnShardStatus) { // Maps a ColumnShard result status to the YDB status code returned to API callers.
+ switch (columnShardStatus) {
+ case NKikimrTxColumnShard::UNSPECIFIED:
+ return Ydb::StatusIds::STATUS_CODE_UNSPECIFIED;
+
+ case NKikimrTxColumnShard::PREPARED: // PREPARED is reported as SUCCESS, same as SUCCESS itself
+ case NKikimrTxColumnShard::SUCCESS:
+ return Ydb::StatusIds::SUCCESS;
+
+ case NKikimrTxColumnShard::ABORTED:
+ return Ydb::StatusIds::ABORTED;
+
+ case NKikimrTxColumnShard::ERROR:
+ return Ydb::StatusIds::GENERIC_ERROR;
+
+ case NKikimrTxColumnShard::TIMEOUT:
+ return Ydb::StatusIds::TIMEOUT;
+
+ case NKikimrTxColumnShard::SCHEMA_ERROR: // both schema-related statuses collapse into SCHEME_ERROR
+ case NKikimrTxColumnShard::SCHEMA_CHANGED:
+ return Ydb::StatusIds::SCHEME_ERROR;
+
+ case NKikimrTxColumnShard::OVERLOADED:
+ return Ydb::StatusIds::OVERLOADED;
+
+ default: // any status not listed above is reported as GENERIC_ERROR
+ return Ydb::StatusIds::GENERIC_ERROR;
+ }
+}
}
struct TEvColumnShard {
@@ -26,6 +56,7 @@ struct TEvColumnShard {
EvNotifyTxCompletionResult,
EvReadBlobRanges,
EvReadBlobRangesResult,
+ EvCheckPlannedTransaction,
EvWrite = EvProposeTransaction + 256,
EvRead,
@@ -75,6 +106,24 @@ struct TEvColumnShard {
}
};
+ struct TEvCheckPlannedTransaction // Request event carrying a reply-to actor plus the (Step, TxId) of a planned transaction.
+ : public TEventPB<TEvCheckPlannedTransaction,
+ NKikimrTxColumnShard::TEvCheckPlannedTransaction,
+ EvCheckPlannedTransaction>
+ {
+ TEvCheckPlannedTransaction() = default;
+
+ TEvCheckPlannedTransaction(const TActorId& source, ui64 planStep, ui64 txId) { // source: actor to reply to; planStep/txId: transaction being asked about
+ ActorIdToProto(source, Record.MutableSource());
+ Record.SetStep(planStep);
+ Record.SetTxId(txId);
+ }
+
+ TActorId GetSource() const { // Decodes the reply-to actor stored in the record.
+ return ActorIdFromProto(Record.GetSource());
+ }
+ };
+
struct TEvCancelTransactionProposal
: public TEventPB<TEvCancelTransactionProposal,
NKikimrTxColumnShard::TEvCancelTransactionProposal,
@@ -276,6 +325,10 @@ inline auto& Proto(TEvColumnShard::TEvProposeTransaction* ev) {
return ev->Record;
}
+inline auto& Proto(TEvColumnShard::TEvCheckPlannedTransaction* ev) { // Returns the event's protobuf record by reference.
+ return ev->Record;
+}
+
inline auto& Proto(TEvColumnShard::TEvProposeTransactionResult* ev) {
return ev->Record;
}
diff --git a/ydb/core/tx/columnshard/columnshard__progress_tx.cpp b/ydb/core/tx/columnshard/columnshard__progress_tx.cpp
index 34f78c7eb5..65c03dbb97 100644
--- a/ydb/core/tx/columnshard/columnshard__progress_tx.cpp
+++ b/ydb/core/tx/columnshard/columnshard__progress_tx.cpp
@@ -18,6 +18,23 @@ private:
{ }
};
+ struct TResultEvent { // Outcome of one processed transaction; MakeEvent() turns it into the reply event.
+ TBasicTxInfo TxInfo; // moved-in tx description; MakeEvent() reads TxKind, TxId and PlanStep from it
+ NKikimrTxColumnShard::EResultStatus Status; // status to report in the reply
+
+ TResultEvent(TBasicTxInfo&& txInfo, NKikimrTxColumnShard::EResultStatus status)
+ : TxInfo(std::move(txInfo))
+ , Status(status)
+ {}
+
+ THolder<IEventBase> MakeEvent(ui64 tabletId) const { // Builds a TEvProposeTransactionResult for this tx and stamps it with TxInfo.PlanStep.
+ auto result = MakeHolder<TEvColumnShard::TEvProposeTransactionResult>(
+ tabletId, TxInfo.TxKind, TxInfo.TxId, Status);
+ result->Record.SetStep(TxInfo.PlanStep);
+ return result;
+ }
+ };
+
enum class ETriggerActivities {
NONE,
POST_INSERT,
@@ -62,14 +79,15 @@ public:
}
// Process a single transaction at the front of the queue
- if (Self->PlanQueue) {
+ if (!Self->PlanQueue.empty()) {
ui64 step;
ui64 txId;
{
- auto it = Self->PlanQueue.begin();
- step = it->Step;
- txId = it->TxId;
- Self->PlanQueue.erase(it);
+ auto node = Self->PlanQueue.extract(Self->PlanQueue.begin());
+ auto& item = node.value();
+ step = item.Step;
+ txId = item.TxId;
+ Self->RunningQueue.emplace(std::move(item));
}
auto& txInfo = Self->BasicTxInfo.at(txId);
@@ -150,11 +168,8 @@ public:
}
// Currently transactions never fail and there are no dependencies between them
- auto txKind = txInfo.TxKind;
- auto status = NKikimrTxColumnShard::SUCCESS;
- auto result = MakeHolder<TEvColumnShard::TEvProposeTransactionResult>(Self->TabletID(), txKind, txId, status);
- result->Record.SetStep(step);
- TxEvents.emplace_back(txInfo.Source, txInfo.Cookie, std::move(result));
+ txInfo.PlanStep = step;
+ TxResults.emplace_back(TResultEvent(std::move(txInfo), NKikimrTxColumnShard::SUCCESS));
Self->BasicTxInfo.erase(txId);
Schema::EraseTxInfo(db, txId);
@@ -163,7 +178,7 @@ public:
}
Self->ProgressTxInFlight = false;
- if (Self->PlanQueue) {
+ if (!Self->PlanQueue.empty()) {
Self->EnqueueProgressTx(ctx);
}
return true;
@@ -176,6 +191,14 @@ public:
ctx.Send(rec.Target, rec.Event.Release(), 0, rec.Cookie);
}
+ for (auto& res : TxResults) {
+ TPlanQueueItem txItem(res.TxInfo.PlanStep, res.TxInfo.TxId);
+ Self->RunningQueue.erase(txItem);
+
+ auto event = res.MakeEvent(Self->TabletID());
+ ctx.Send(res.TxInfo.Source, event.Release(), 0, res.TxInfo.Cookie);
+ }
+
Self->UpdateBlobMangerCounters();
if (Self->BlobManager->CanCollectGarbage()) {
Self->Execute(Self->CreateTxRunGc(), ctx);
@@ -195,7 +218,8 @@ public:
}
private:
- TVector<TEvent> TxEvents;
+ std::vector<TResultEvent> TxResults;
+ std::vector<TEvent> TxEvents;
ETriggerActivities Trigger{ETriggerActivities::NONE};
};
@@ -206,4 +230,30 @@ void TColumnShard::EnqueueProgressTx(const TActorContext& ctx) {
}
}
+void TColumnShard::Handle(TEvColumnShard::TEvCheckPlannedTransaction::TPtr& ev, const TActorContext& ctx) { // Checks whether tx (step, txId) is already behind the queue front; replies only if it is.
+ auto& record = Proto(ev->Get());
+ ui64 step = record.GetStep();
+ ui64 txId = record.GetTxId();
+ LOG_S_DEBUG("CheckTransaction planStep " << step << " txId " << txId << " at tablet " << TabletID());
+
+ TPlanQueueItem frontTx(LastPlannedStep, 0); // fallback used when both queues are empty
+ if (!RunningQueue.empty()) { // prefer the first entry of RunningQueue ...
+ frontTx = TPlanQueueItem(RunningQueue.begin()->Step, RunningQueue.begin()->TxId);
+ } else if (!PlanQueue.empty()) { // ... otherwise the first entry of PlanQueue
+ frontTx = TPlanQueueItem(PlanQueue.begin()->Step, PlanQueue.begin()->TxId);
+ }
+ // NOTE(review): with both queues empty and step == LastPlannedStep, `txId < 0` below never holds, so no reply is sent for that tx -- confirm this is intended.
+ bool finished = step < frontTx.Step || (step == frontTx.Step && txId < frontTx.TxId); // (step, txId) is strictly before frontTx
+ if (finished) {
+ auto txKind = NKikimrTxColumnShard::ETransactionKind::TX_KIND_COMMIT; // NOTE(review): kind and status are hard-coded; the tx itself is not looked up here
+ auto status = NKikimrTxColumnShard::SUCCESS;
+ auto result = MakeHolder<TEvColumnShard::TEvProposeTransactionResult>(TabletID(), txKind, txId, status);
+ result->Record.SetStep(step);
+
+ ctx.Send(ev->Get()->GetSource(), result.Release()); // goes to the actor named in the request; no cookie argument is passed
+ }
+
+ // For now no result is returned for a tx that is not finished. It would be sent in TTxProgressTx::Complete()
+}
+
}
diff --git a/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp b/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp
index 68b166657c..de3bf81635 100644
--- a/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp
+++ b/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp
@@ -117,8 +117,20 @@ bool TTxProposeTransaction::Execute(TTransactionContext& txc, const TActorContex
}
case NKikimrTxColumnShard::TX_KIND_COMMIT: {
if (Self->CommitsInFlight.contains(txId)) {
- statusMessage = TStringBuilder()
- << "Commit TxId# " << txId << " has already been proposed";
+ LOG_S_DEBUG("TTxProposeTransaction CommitTx (retry) TxId " << txId << " at tablet " << Self->TabletID());
+
+ Y_VERIFY(Self->BasicTxInfo.contains(txId));
+ const auto& txInfo = Self->BasicTxInfo[txId];
+
+ if (txInfo.Source != Ev->Get()->GetSource() || txInfo.Cookie != Ev->Cookie) {
+ statusMessage = TStringBuilder()
+ << "Another commit TxId# " << txId << " has already been proposed";
+ break;
+ }
+
+ maxStep = txInfo.MaxStep;
+ minStep = maxStep - Self->MaxCommitTxDelay.MilliSeconds(); // TODO: improve this code
+ status = NKikimrTxColumnShard::EResultStatus::PREPARED;
break;
}
diff --git a/ydb/core/tx/columnshard/columnshard__write.cpp b/ydb/core/tx/columnshard/columnshard__write.cpp
index b2bfa0d18d..d73a747f8a 100644
--- a/ydb/core/tx/columnshard/columnshard__write.cpp
+++ b/ydb/core/tx/columnshard/columnshard__write.cpp
@@ -45,11 +45,13 @@ bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) {
ui32 status = NKikimrTxColumnShard::EResultStatus::SUCCESS;
auto& logoBlobId = Ev->Get()->BlobId;
auto putStatus = Ev->Get()->PutStatus;
+ Y_VERIFY(putStatus == NKikimrProto::OK);
+ Y_VERIFY(logoBlobId.IsValid());
bool ok = false;
if (!Self->PrimaryIndex || !Self->IsTableWritable(tableId)) {
status = NKikimrTxColumnShard::EResultStatus::SCHEMA_ERROR;
- } else if (putStatus == NKikimrProto::OK && logoBlobId.IsValid()) {
+ } else {
if (record.HasLongTxId()) {
Y_VERIFY(metaShard == 0);
auto longTxId = NLongTxService::TLongTxId::FromProto(record.GetLongTxId());
@@ -102,12 +104,6 @@ bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) {
// Return EResultStatus::SUCCESS for dups
Self->IncCounter(COUNTER_WRITE_DUPLICATE);
}
- } else if (putStatus == NKikimrProto::TIMEOUT) {
- status = NKikimrTxColumnShard::EResultStatus::TIMEOUT;
- } else if (putStatus == NKikimrProto::TRYLATER) {
- status = NKikimrTxColumnShard::EResultStatus::OVERLOADED;
- } else {
- status = NKikimrTxColumnShard::EResultStatus::ERROR;
}
if (status != NKikimrTxColumnShard::EResultStatus::SUCCESS) {
@@ -140,15 +136,16 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex
ui64 metaShard = record.GetTxInitiator();
ui64 writeId = record.GetWriteId();
TString dedupId = record.GetDedupId();
+ auto putStatus = ev->Get()->PutStatus;
bool isWritable = IsTableWritable(tableId);
bool error = data.empty() || data.size() > TLimits::MAX_BLOB_SIZE || !PrimaryIndex || !isWritable;
- bool errorReturned = (ev->Get()->PutStatus != NKikimrProto::OK) && (ev->Get()->PutStatus != NKikimrProto::UNKNOWN);
+ bool errorReturned = (putStatus != NKikimrProto::OK) && (putStatus != NKikimrProto::UNKNOWN);
bool isOutOfSpace = IsAnyChannelYellowStop();
if (error || errorReturned) {
- LOG_S_WARN("Write (fail) " << data.size() << " bytes into pathId " << tableId
- << ", status " << ev->Get()->PutStatus
+ LOG_S_NOTICE("Write (fail) " << data.size() << " bytes into pathId " << tableId
+ << ", status " << putStatus
<< (PrimaryIndex? "": ", no index") << (isWritable? "": ", ro")
<< " at tablet " << TabletID());
@@ -156,8 +153,10 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex
auto errCode = NKikimrTxColumnShard::EResultStatus::ERROR;
if (errorReturned) {
- if (ev->Get()->PutStatus == NKikimrProto::TIMEOUT) {
+ if (putStatus == NKikimrProto::TIMEOUT || putStatus == NKikimrProto::DEADLINE) {
errCode = NKikimrTxColumnShard::EResultStatus::TIMEOUT;
+ } else if (putStatus == NKikimrProto::TRYLATER || putStatus == NKikimrProto::OUT_OF_SPACE) {
+ errCode = NKikimrTxColumnShard::EResultStatus::OVERLOADED;
}
--WritesInFly; // write failed
}
@@ -171,6 +170,7 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex
<< (writeId? (" writeId " + ToString(writeId)).c_str() : "") << " at tablet " << TabletID());
--WritesInFly; // write successed
+ Y_VERIFY(putStatus == NKikimrProto::OK);
Execute(new TTxWrite(this, ev), ctx);
} else if (isOutOfSpace || InsertTable->IsOverloaded(tableId) || ShardOverloaded()) {
IncCounter(COUNTER_WRITE_FAIL);
diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp
index 80c3fc5f81..47e6990c39 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.cpp
+++ b/ydb/core/tx/columnshard/columnshard_impl.cpp
@@ -205,7 +205,7 @@ void TColumnShard::RescheduleWaitingReads() {
}
TRowVersion TColumnShard::GetMaxReadVersion() const {
- if (PlanQueue) {
+ if (!PlanQueue.empty()) {
// We may only read just before the first transaction in the queue
auto it = PlanQueue.begin();
return TRowVersion(it->Step, it->TxId).Prev();
diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h
index b88e5fd071..ab0b1af3f0 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.h
+++ b/ydb/core/tx/columnshard/columnshard_impl.h
@@ -109,6 +109,7 @@ class TColumnShard
void Handle(TEvTabletPipe::TEvServerConnected::TPtr& ev, const TActorContext& ctx);
void Handle(TEvTabletPipe::TEvServerDisconnected::TPtr& ev, const TActorContext& ctx);
void Handle(TEvColumnShard::TEvProposeTransaction::TPtr& ev, const TActorContext& ctx);
+ void Handle(TEvColumnShard::TEvCheckPlannedTransaction::TPtr& ev, const TActorContext& ctx);
void Handle(TEvColumnShard::TEvCancelTransactionProposal::TPtr& ev, const TActorContext& ctx);
void Handle(TEvColumnShard::TEvNotifyTxCompletion::TPtr& ev, const TActorContext& ctx);
void Handle(TEvTxProcessing::TEvPlanStep::TPtr& ev, const TActorContext& ctx);
@@ -212,6 +213,7 @@ protected:
HFunc(TEvTabletPipe::TEvServerConnected, Handle);
HFunc(TEvTabletPipe::TEvServerDisconnected, Handle);
HFunc(TEvColumnShard::TEvProposeTransaction, Handle);
+ HFunc(TEvColumnShard::TEvCheckPlannedTransaction, Handle);
HFunc(TEvColumnShard::TEvCancelTransactionProposal, Handle);
HFunc(TEvColumnShard::TEvNotifyTxCompletion, Handle);
HFunc(TEvColumnShard::TEvScan, Handle);
@@ -369,10 +371,10 @@ private:
THashMap<ui64, TBasicTxInfo> BasicTxInfo;
TSet<TDeadlineQueueItem> DeadlineQueue;
- TSet<TPlanQueueItem> PlanQueue;
+ std::set<TPlanQueueItem> PlanQueue;
+ std::set<TPlanQueueItem> RunningQueue;
bool ProgressTxInFlight = false;
THashMap<ui64, TInstant> ScanTxInFlight;
-
THashMap<ui64, TAlterMeta> AltersInFlight;
THashMap<ui64, TCommitMeta> CommitsInFlight; // key is TxId from propose
THashMap<ui32, TSchemaPreset> SchemaPresets;
diff --git a/ydb/core/tx/long_tx_service/commit_impl.cpp b/ydb/core/tx/long_tx_service/commit_impl.cpp
index 3a2872bad0..91e082f962 100644
--- a/ydb/core/tx/long_tx_service/commit_impl.cpp
+++ b/ydb/core/tx/long_tx_service/commit_impl.cpp
@@ -3,6 +3,7 @@
#include <ydb/core/base/tablet_pipecache.h>
#include <ydb/core/tx/tx_proxy/proxy.h>
#include <ydb/core/tx/columnshard/columnshard.h>
+#include <ydb/core/actorlib_impl/long_timer.h>
#include <library/cpp/actors/core/actor_bootstrapped.h>
@@ -14,6 +15,15 @@
namespace NKikimr {
namespace NLongTxService {
+ struct TRetryData { // Per-shard retry bookkeeping; stored as the mapped value of WaitingShards.
+ static constexpr ui32 MaxPrepareRetriesPerShard = 100; // ~30 sec (100 retries x RetryDelayMs)
+ static constexpr ui32 MaxPlanRetriesPerShard = 1000; // ~5 min (1000 retries x RetryDelayMs)
+ static constexpr ui32 RetryDelayMs = 300; // delay of the retry timer, in milliseconds
+
+ ui64 WriteId = 0; // write id logged when the propose is sent to the shard
+ TString TxBody; // serialized commit tx, kept so the propose can be re-sent
+ ui32 NumRetries = 0; // delayed retries scheduled so far for this shard
+ };
class TLongTxServiceActor::TCommitActor : public TActorBootstrapped<TCommitActor> {
public:
@@ -64,7 +74,7 @@ namespace NLongTxService {
const auto* msg = ev->Get();
TxId = msg->TxId;
Services = msg->Services;
- LogPrefix = TStringBuilder() << LogPrefix << " TxId# " << TxId;
+ LogPrefix = TStringBuilder() << LogPrefix << " TxId# " << TxId << " ";
TXLOG_DEBUG("Allocated TxId");
PrepareTransaction();
}
@@ -79,15 +89,40 @@ namespace NLongTxService {
TString txBody;
Y_VERIFY(tx.SerializeToString(&txBody));
- TXLOG_DEBUG("Sending TEvProposeTransaction to ColumnShard# " << tabletId << " WriteId# " << writeId);
- SendToTablet(tabletId, MakeHolder<TEvColumnShard::TEvProposeTransaction>(
+ WaitingShards.emplace(tabletId, TRetryData{writeId, txBody, 0});
+ SendPrepareTransaction(tabletId);
+ }
+ Become(&TThis::StatePrepare);
+ }
+
+ bool SendPrepareTransaction(ui64 tabletId, bool delayed = false) {
+ auto it = WaitingShards.find(tabletId);
+ if (it == WaitingShards.end()) {
+ return false;
+ }
+
+ auto& data = it->second;
+ if (delayed) {
+ if (data.NumRetries >= TRetryData::MaxPrepareRetriesPerShard) {
+ return false;
+ }
+ ++data.NumRetries;
+ if (ToRetry.empty()) {
+ TimeoutTimerActorId = CreateLongTimer(TDuration::MilliSeconds(TRetryData::RetryDelayMs),
+ new IEventHandle(this->SelfId(), this->SelfId(), new TEvents::TEvWakeup()));
+ }
+ ToRetry.insert(tabletId);
+ return true;
+ }
+
+ TXLOG_DEBUG("Sending TEvProposeTransaction to ColumnShard# " << tabletId << " WriteId# " << data.WriteId);
+
+ SendToTablet(tabletId, MakeHolder<TEvColumnShard::TEvProposeTransaction>(
NKikimrTxColumnShard::TX_KIND_COMMIT,
SelfId(),
TxId,
- std::move(txBody)));
- WaitingShards.insert(tabletId);
- }
- Become(&TThis::StatePrepare);
+ data.TxBody));
+ return true;
}
STFUNC(StatePrepare) {
@@ -95,6 +130,7 @@ namespace NLongTxService {
switch (ev->GetTypeRewrite()) {
hFunc(TEvColumnShard::TEvProposeTransactionResult, HandlePrepare);
hFunc(TEvPipeCache::TEvDeliveryProblem, HandlePrepare);
+ CFunc(TEvents::TSystem::Wakeup, HandlePrepareTimeout);
}
}
@@ -126,7 +162,7 @@ namespace NLongTxService {
void HandlePrepare(TEvColumnShard::TEvProposeTransactionResult::TPtr& ev) {
const auto* msg = ev->Get();
const ui64 tabletId = msg->Record.GetOrigin();
- const auto status = msg->Record.GetStatus();
+ const NKikimrTxColumnShard::EResultStatus status = msg->Record.GetStatus();
TXLOG_DEBUG("Received TEvProposeTransactionResult from"
<< " ColumnShard# " << tabletId
@@ -155,9 +191,13 @@ namespace NLongTxService {
// Cancel transaction, since we didn't plan it
CancelProposal();
+ auto ydbStatus = NColumnShard::ConvertToYdbStatus(status);
NYql::TIssues issues;
- issues.AddIssue("TODO: need mapping from shard errors to api errors");
- return FinishWithError(Ydb::StatusIds::GENERIC_ERROR, std::move(issues));
+ issues.AddIssue(TStringBuilder() << "Cannot prepare transaction at shard " << tabletId);
+ if (msg->Record.HasStatusMessage()) {
+ issues.AddIssue(msg->Record.GetStatusMessage());
+ }
+ return FinishWithError(ydbStatus, std::move(issues));
}
}
}
@@ -165,11 +205,21 @@ namespace NLongTxService {
void HandlePrepare(TEvPipeCache::TEvDeliveryProblem::TPtr& ev) {
const auto* msg = ev->Get();
const ui64 tabletId = msg->TabletId;
+ Y_VERIFY(tabletId != SelectedCoordinator);
TXLOG_DEBUG("Delivery problem"
<< " TabletId# " << tabletId
<< " NotDelivered# " << msg->NotDelivered);
+ if (!WaitingShards.count(tabletId)) {
+ return;
+ }
+
+ // Delayed retry
+ if (SendPrepareTransaction(tabletId, true)) {
+ return;
+ }
+
// Cancel transaction, since we didn't plan it
CancelProposal();
@@ -178,10 +228,19 @@ namespace NLongTxService {
return FinishWithError(Ydb::StatusIds::UNAVAILABLE, std::move(issues));
}
+ void HandlePrepareTimeout(const TActorContext& /*ctx*/) { // Wakeup handler for StatePrepare: re-sends the propose to every tablet queued in ToRetry.
+ TimeoutTimerActorId = {}; // reset the stored timer id
+ for (ui64 tabletId : ToRetry) {
+ SendPrepareTransaction(tabletId); // immediate (non-delayed) send; the return value is ignored
+ }
+ ToRetry.clear();
+ }
+
private:
void PlanTransaction() {
Y_VERIFY(SelectedCoordinator);
Y_VERIFY(WaitingShards.empty());
+ ToRetry.clear();
auto req = MakeHolder<TEvTxProxy::TEvProposeTransaction>(
SelectedCoordinator, TxId, 0, MinStep, MaxStep);
@@ -193,7 +252,7 @@ namespace NLongTxService {
auto* x = reqAffectedSet->Add();
x->SetTabletId(tabletId);
x->SetFlags(/* write */ 2);
- WaitingShards.insert(tabletId);
+ WaitingShards.emplace(tabletId, TRetryData{});
}
TXLOG_DEBUG("Sending TEvProposeTransaction to SelectedCoordinator# " << SelectedCoordinator);
@@ -207,6 +266,7 @@ namespace NLongTxService {
hFunc(TEvTxProxy::TEvProposeTransactionStatus, HandlePlan);
hFunc(TEvColumnShard::TEvProposeTransactionResult, HandlePlan);
hFunc(TEvPipeCache::TEvDeliveryProblem, HandlePlan);
+ CFunc(TEvents::TSystem::Wakeup, HandlePlanTimeout);
}
}
@@ -267,10 +327,23 @@ namespace NLongTxService {
return Finish();
}
+ case NKikimrTxColumnShard::OUTDATED: {
+ if (!WaitingShards.count(tabletId)) {
+ return;
+ }
+ NYql::TIssues issues;
+ issues.AddIssue(TStringBuilder() << "Shard " << tabletId << " has no info about tx " << TxId);
+ return FinishWithError(Ydb::StatusIds::UNDETERMINED, std::move(issues));
+ }
+
default: {
+ auto ydbStatus = NColumnShard::ConvertToYdbStatus(status);
NYql::TIssues issues;
- issues.AddIssue("TODO: need mapping from shard errors to api errors");
- return FinishWithError(Ydb::StatusIds::GENERIC_ERROR, std::move(issues));
+ issues.AddIssue(TStringBuilder() << "Cannot plan transaction at shard " << tabletId);
+ if (msg->Record.HasStatusMessage()) {
+ issues.AddIssue(msg->Record.GetStatusMessage());
+ }
+ return FinishWithError(ydbStatus, std::move(issues));
}
}
}
@@ -296,9 +369,17 @@ namespace NLongTxService {
issues.AddIssue("Coordinator not available, transaction was not committed");
return FinishWithError(Ydb::StatusIds::UNAVAILABLE, std::move(issues));
}
- } else if (!WaitingShards.contains(tabletId)) {
- // We are not waiting for results from this shard
- return;
+ } else {
+ if (!PlanStep) {
+ // Waiting for PlanStep. Do not break transaction.
+ return;
+ }
+ // It's planned, not completed. We could check TEvProposeTransactionResult by PlanStep.
+ // - tx is completed if PlanStep:TxId at tablet is greater than ours
+ // - tx is not completed otherwise. Keep waiting TEvProposeTransactionResult for it.
+ if (SendCheckPlannedTransaction(tabletId, true)) {
+ return;
+ }
}
NYql::TIssues issues;
@@ -306,6 +387,48 @@ namespace NLongTxService {
return FinishWithError(Ydb::StatusIds::UNDETERMINED, std::move(issues));
}
+ void HandlePlanTimeout(const TActorContext& /*ctx*/) { // Wakeup handler registered via CFunc(TEvents::TSystem::Wakeup, ...): sends a check request to every tablet queued in ToRetry.
+ TimeoutTimerActorId = {}; // reset the stored timer id
+ for (ui64 tabletId : ToRetry) {
+ SendCheckPlannedTransaction(tabletId); // immediate (non-delayed) send; the return value is ignored
+ }
+ ToRetry.clear();
+ }
+
+ bool SendCheckPlannedTransaction(ui64 tabletId, bool delayed = false) { // Sends TEvCheckPlannedTransaction to a shard now, or (delayed) queues it for the next Wakeup; returns false only when the retry limit is reached.
+ Y_VERIFY(PlanStep); // must not be called before PlanStep is set
+
+ if (delayed) {
+ auto it = WaitingShards.find(tabletId);
+ if (it == WaitingShards.end()) {
+ // We are not waiting for results from this shard
+ return true;
+ }
+
+ auto& numRetries = it->second.NumRetries;
+ if (numRetries >= TRetryData::MaxPlanRetriesPerShard) { // retry budget for this shard is exhausted
+ return false;
+ }
+ ++numRetries;
+
+ if (ToRetry.empty()) { // start a timer only for the first queued tablet; later ones share it
+ TimeoutTimerActorId = CreateLongTimer(TDuration::MilliSeconds(TRetryData::RetryDelayMs),
+ new IEventHandle(this->SelfId(), this->SelfId(), new TEvents::TEvWakeup()));
+ }
+ ToRetry.insert(tabletId);
+ return true;
+ }
+ // NOTE(review): the immediate path does not re-check WaitingShards, so a tablet no longer in it would still be asked -- confirm this is harmless.
+ TXLOG_DEBUG("Ask TEvProposeTransactionResult from ColumnShard# " << tabletId
+ << " for PlanStep# " << PlanStep << " TxId# " << TxId);
+
+ SendToTablet(tabletId, MakeHolder<TEvColumnShard::TEvCheckPlannedTransaction>(
+ SelfId(),
+ PlanStep,
+ TxId));
+ return true;
+ }
+
private:
void CancelProposal() {
for (const auto& pr : Params.ColumnShardWrites) {
@@ -336,11 +459,13 @@ namespace NLongTxService {
TString LogPrefix;
ui64 TxId = 0;
NTxProxy::TTxProxyServices Services;
- THashSet<ui64> WaitingShards;
+ THashMap<ui64, TRetryData> WaitingShards;
ui64 SelectedCoordinator = 0;
ui64 MinStep = 0;
ui64 MaxStep = Max<ui64>();
ui64 PlanStep = 0;
+ THashSet<ui64> ToRetry;
+ TActorId TimeoutTimerActorId;
};
void TLongTxServiceActor::StartCommitActor(TTransaction& tx) {