aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorabcdef <akotov@ydb.tech>2023-05-18 09:05:33 +0300
committerabcdef <akotov@ydb.tech>2023-05-18 09:05:33 +0300
commitf6237db2054554be8c0799a5a39a58c00701a3a8 (patch)
tree6de9e42969a65568779054c2ec6ffbab8c7beeae
parent26fb4330a3aaf4e8ee67e0ebf130fedbd8f8bb5b (diff)
downloadydb-f6237db2054554be8c0799a5a39a58c00701a3a8.tar.gz
the PQ tablet sends the TEvProposedTransactionResult response directly
Исправлена ошибка с отправкой `TEvProposeTransactionResult`. В некоторых случаях оно отправлялось через pipe. По соглашению ответные сообщения должны отправляться напрямую. Для гарантии доставки ответного сообщения при рестартах таблетки отправившей запрос используются `TEvProposeTransactionAttach` и `TEvProposeTransactionAttachResult`.
-rw-r--r--ydb/core/kqp/executer_actor/kqp_data_executer.cpp2
-rw-r--r--ydb/core/persqueue/events/global.h4
-rw-r--r--ydb/core/persqueue/partition.cpp4
-rw-r--r--ydb/core/persqueue/pq_impl.cpp97
-rw-r--r--ydb/core/persqueue/pq_impl.h6
-rw-r--r--ydb/core/persqueue/transaction.cpp43
-rw-r--r--ydb/core/persqueue/transaction.h1
-rw-r--r--ydb/core/persqueue/ut/partition_ut.cpp2
-rw-r--r--ydb/core/persqueue/ut/pqtablet_ut.cpp16
-rw-r--r--ydb/core/protos/counters_schemeshard.proto2
-rw-r--r--ydb/core/protos/pqconfig.proto12
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation.cpp22
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_common.cpp172
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_common.h20
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_part.h2
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_impl.cpp27
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_impl.h2
17 files changed, 195 insertions, 239 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
index 1d07df03c15..afac59d7681 100644
--- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
@@ -2232,7 +2232,7 @@ private:
}
transaction.SetImmediate(ImmediateTx);
- ActorIdToProto(SelfId(), ev->Record.MutableActor());
+ ActorIdToProto(SelfId(), ev->Record.MutableSourceActor());
ev->Record.MutableData()->Swap(&transaction);
ev->Record.SetTxId(TxId);
diff --git a/ydb/core/persqueue/events/global.h b/ydb/core/persqueue/events/global.h
index 6caf7858267..9b0e97ef120 100644
--- a/ydb/core/persqueue/events/global.h
+++ b/ydb/core/persqueue/events/global.h
@@ -6,6 +6,7 @@
#include <library/cpp/actors/core/actorid.h>
#include <ydb/core/base/blobstorage.h>
#include <ydb/core/protos/msgbus.pb.h>
+#include <ydb/core/tx/datashard/datashard.h>
#include <ydb/public/api/protos/draft/persqueue_common.pb.h>
namespace NKikimr {
@@ -241,5 +242,8 @@ struct TEvPersQueue {
struct TEvPeriodicTopicStats : public TEventPB<TEvPeriodicTopicStats, NKikimrPQ::TEvPeriodicTopicStats, EvPeriodicTopicStats> {
};
+
+ using TEvProposeTransactionAttach = TEvDataShard::TEvProposeTransactionAttach;
+ using TEvProposeTransactionAttachResult = TEvDataShard::TEvProposeTransactionAttachResult;
};
} //NKikimr
diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp
index 8e4b8affb6f..7b29a9c202b 100644
--- a/ydb/core/persqueue/partition.cpp
+++ b/ydb/core/persqueue/partition.cpp
@@ -76,7 +76,7 @@ void TPartition::ReplyPropose(const TActorContext& ctx,
const NKikimrPQ::TEvProposeTransaction& event,
NKikimrPQ::TEvProposeTransactionResult::EStatus statusCode)
{
- ctx.Send(ActorIdFromProto(event.GetActor()),
+ ctx.Send(ActorIdFromProto(event.GetSourceActor()),
MakeReplyPropose(event, statusCode).Release());
}
@@ -1998,7 +1998,7 @@ void TPartition::ScheduleReplyError(const ui64 dst,
void TPartition::ScheduleReplyPropose(const NKikimrPQ::TEvProposeTransaction& event,
NKikimrPQ::TEvProposeTransactionResult::EStatus statusCode)
{
- Replies.emplace_back(ActorIdFromProto(event.GetActor()),
+ Replies.emplace_back(ActorIdFromProto(event.GetSourceActor()),
MakeReplyPropose(event,
statusCode).Release());
}
diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp
index b285e24565f..1c7611d320b 100644
--- a/ydb/core/persqueue/pq_impl.cpp
+++ b/ydb/core/persqueue/pq_impl.cpp
@@ -30,8 +30,6 @@ static constexpr ui32 CACHE_SIZE = 100_MB;
static constexpr ui32 MAX_BYTES = 25_MB;
static constexpr ui32 MAX_SOURCE_ID_LENGTH = 10_KB;
-constexpr const auto InvalidTabletId = Max<ui64>();
-
struct TPartitionInfo {
TPartitionInfo(const TActorId& actor, TMaybe<TPartitionKeyRange>&& keyRange,
const bool initDone, const TTabletCountersBase& baseline)
@@ -600,21 +598,6 @@ struct TPersQueue::TReplyToActor {
TEventBasePtr Event;
};
-struct TPersQueue::TReplyToPipe {
- using TEventBasePtr = std::unique_ptr<IEventBase>;
-
- TReplyToPipe(ui64 tabletId, ui64 txId, TEventBasePtr event) :
- TabletId(tabletId),
- TxId(txId),
- Event(std::move(event))
- {
- }
-
- ui64 TabletId;
- ui64 TxId;
- TEventBasePtr Event;
-};
-
void TPersQueue::ReplyError(const TActorContext& ctx, const ui64 responseCookie, NPersQueue::NErrorCode::EErrorCode errorCode, const TString& error)
{
ReplyPersQueueError(
@@ -2412,7 +2395,7 @@ void TPersQueue::Handle(TEvPersQueue::TEvProposeTransaction::TPtr& ev, const TAc
HandleConfigTransaction(ev->Release(), ctx);
break;
case NKikimrPQ::TEvProposeTransaction::TXBODY_NOT_SET:
- SendProposeTransactionAbort(ActorIdFromProto(event.GetActor()),
+ SendProposeTransactionAbort(ActorIdFromProto(event.GetSourceActor()),
event.GetTxId(),
ctx);
break;
@@ -2446,7 +2429,7 @@ void TPersQueue::HandleDataTransaction(TAutoPtr<TEvPersQueue::TEvProposeTransact
}
if (TabletState != NKikimrPQ::ENormal) {
- SendProposeTransactionAbort(ActorIdFromProto(event.GetActor()),
+ SendProposeTransactionAbort(ActorIdFromProto(event.GetSourceActor()),
event.GetTxId(),
ctx);
return;
@@ -2683,26 +2666,26 @@ void TPersQueue::ProcessProposeTransactionQueue(const TActorContext& ctx)
EvProposeTransactionQueue.pop_front();
const NKikimrPQ::TEvProposeTransaction& event = front->Record;
-
TDistributedTransaction& tx = Txs[event.GetTxId()];
+
switch (tx.State) {
- case NKikimrPQ::TTransaction::UNKNOWN: {
+ case NKikimrPQ::TTransaction::UNKNOWN:
tx.OnProposeTransaction(event, GetAllowedStep(),
TabletID());
CheckTxState(ctx, tx);
break;
- }
- case NKikimrPQ::TTransaction::PREPARING: {
- break;
- }
- case NKikimrPQ::TTransaction::PREPARED: {
+ case NKikimrPQ::TTransaction::PREPARING:
+ case NKikimrPQ::TTransaction::PREPARED:
+ //
+ // the sender re-sent the TEvProposeTransaction. the actor ID could have changed. you need to
+ // update the ID in the transaction and schedule a response to the new actor
+ //
+ tx.SourceActor = ActorIdFromProto(event.GetSourceActor());
ScheduleProposeTransactionResult(tx);
break;
- }
- default: {
+ default:
Y_FAIL();
}
- }
}
}
@@ -2752,10 +2735,6 @@ void TPersQueue::ProcessPlanStepQueue(const TActorContext& ctx)
" Transaction already planned for step " << tx.Step <<
", Step: " << step <<
", TxId: " << txId);
-
- if (tx.State > NKikimrPQ::TTransaction::PLANNING) {
- SendEvProposeTransactionResult(ctx, tx);
- }
}
} else {
LOG_WARN_S(ctx, NKikimrServices::PERSQUEUE,
@@ -2861,11 +2840,7 @@ void TPersQueue::ScheduleProposeTransactionResult(const TDistributedTransaction&
event->Record.MutableDomainCoordinators()->CopyFrom(ProcessingParams->GetCoordinators());
}
- if (tx.SourceTablet != InvalidTabletId) {
- RepliesToPipe.emplace_back(tx.SourceTablet, tx.TxId, std::move(event));
- } else {
- RepliesToActor.emplace_back(tx.SourceActor, std::move(event));
- }
+ RepliesToActor.emplace_back(tx.SourceActor, std::move(event));
}
void TPersQueue::SchedulePlanStepAck(ui64 step,
@@ -2992,11 +2967,8 @@ void TPersQueue::SendEvProposeTransactionResult(const TActorContext& ctx,
result->Record.SetTxId(tx.TxId);
result->Record.SetStep(tx.Step);
- if (tx.SourceTablet != InvalidTabletId) {
- SendToPipe(tx.SourceTablet, tx, std::move(result), ctx);
- } else {
- ctx.Send(tx.SourceActor, std::move(result));
- }
+
+ ctx.Send(tx.SourceActor, std::move(result));
}
void TPersQueue::SendToPipe(ui64 tabletId,
@@ -3232,28 +3204,12 @@ void TPersQueue::DeleteTx(TDistributedTransaction& tx)
void TPersQueue::SendReplies(const TActorContext& ctx)
{
- SendRepliesToActors(ctx);
- SendRepliesToPipes(ctx);
-}
-
-void TPersQueue::SendRepliesToActors(const TActorContext& ctx)
-{
for (auto& [actorId, event] : RepliesToActor) {
ctx.Send(actorId, event.release());
}
RepliesToActor.clear();
}
-void TPersQueue::SendRepliesToPipes(const TActorContext& ctx)
-{
- for (auto& [tabletId, txId, event] : RepliesToPipe) {
- auto* tx = GetTransaction(ctx, txId);
- Y_VERIFY(tx);
- SendToPipe(tabletId, *tx, std::move(event), ctx);
- }
- RepliesToPipe.clear();
-}
-
void TPersQueue::CheckChangedTxStates(const TActorContext& ctx)
{
for (ui64 txId : ChangedTxs) {
@@ -3460,6 +3416,28 @@ void TPersQueue::Handle(TEvPQ::TEvSubDomainStatus::TPtr& ev, const TActorContext
}
}
+void TPersQueue::Handle(TEvPersQueue::TEvProposeTransactionAttach::TPtr &ev, const TActorContext &ctx)
+{
+ const ui64 txId = ev->Get()->Record.GetTxId();
+ NKikimrProto::EReplyStatus status = NKikimrProto::NODATA;
+
+ auto tx = GetTransaction(ctx, txId);
+ if (tx) {
+ //
+ // the actor's ID could have changed from the moment he sent the TEvProposeTransaction. you need to
+ // update the actor ID in the transaction
+ //
+ // if the transaction has progressed beyond WAIT_RS, then a response has been sent to the sender
+ //
+ tx->SourceActor = ev->Sender;
+ if (tx->State <= NKikimrPQ::TTransaction::WAIT_RS) {
+ status = NKikimrProto::OK;
+ }
+ }
+
+ ctx.Send(ev->Sender, new TEvPersQueue::TEvProposeTransactionAttachResult(TabletID(), txId, status), 0, ev->Cookie);
+}
+
bool TPersQueue::HandleHook(STFUNC_SIG)
{
SetActivityType(NKikimrServices::TActivity::PERSQUEUE_ACTOR);
@@ -3497,6 +3475,7 @@ bool TPersQueue::HandleHook(STFUNC_SIG)
HFuncTraced(TEvPQ::TEvProposePartitionConfigResult, Handle);
HFuncTraced(TEvPQ::TEvTxCommitDone, Handle);
HFuncTraced(TEvPQ::TEvSubDomainStatus, Handle);
+ HFuncTraced(TEvPersQueue::TEvProposeTransactionAttach, Handle);
default:
return false;
}
diff --git a/ydb/core/persqueue/pq_impl.h b/ydb/core/persqueue/pq_impl.h
index 912a9c4f680..0836d6261d1 100644
--- a/ydb/core/persqueue/pq_impl.h
+++ b/ydb/core/persqueue/pq_impl.h
@@ -239,10 +239,8 @@ private:
bool WriteTxsInProgress = false;
struct TReplyToActor;
- struct TReplyToPipe;
TVector<TReplyToActor> RepliesToActor;
- TVector<TReplyToPipe> RepliesToPipe;
TIntrusivePtr<NTabletPipe::TBoundedClientCacheConfig> PipeClientCacheConfig;
THolder<NTabletPipe::IClientCache> PipeClientCache;
@@ -289,8 +287,6 @@ private:
void DeleteTx(TDistributedTransaction& tx);
void SendReplies(const TActorContext& ctx);
- void SendRepliesToActors(const TActorContext& ctx);
- void SendRepliesToPipes(const TActorContext& ctx);
void CheckChangedTxStates(const TActorContext& ctx);
bool AllTransactionsHaveBeenProcessed() const;
@@ -355,6 +351,8 @@ private:
void InitProcessingParams(const TActorContext& ctx);
TMaybe<NKikimrSubDomains::TProcessingParams> ProcessingParams;
+
+ void Handle(TEvPersQueue::TEvProposeTransactionAttach::TPtr& ev, const TActorContext& ctx);
};
diff --git a/ydb/core/persqueue/transaction.cpp b/ydb/core/persqueue/transaction.cpp
index 1454c66de13..771f1c52fbf 100644
--- a/ydb/core/persqueue/transaction.cpp
+++ b/ydb/core/persqueue/transaction.cpp
@@ -34,12 +34,8 @@ TDistributedTransaction::TDistributedTransaction(const NKikimrPQ::TTransaction&
tx.GetAggrPredicate() ? NKikimrTx::TReadSetData::DECISION_COMMIT : NKikimrTx::TReadSetData::DECISION_ABORT;
}
- if (tx.HasTablet()) {
- SourceTablet = tx.GetTablet();
- } else {
- Y_VERIFY(tx.HasActor());
- SourceActor = ActorIdFromProto(tx.GetActor());
- }
+ Y_VERIFY(tx.HasSourceActor());
+ SourceActor = ActorIdFromProto(tx.GetSourceActor());
}
void TDistributedTransaction::InitDataTransaction(const NKikimrPQ::TTransaction& tx)
@@ -66,10 +62,6 @@ void TDistributedTransaction::InitPartitions(const google::protobuf::RepeatedPtr
void TDistributedTransaction::InitConfigTransaction(const NKikimrPQ::TTransaction& tx)
{
- Y_VERIFY(tx.HasSchemeShardId());
-
- Receivers.insert(tx.GetSchemeShardId());
-
TabletConfig = tx.GetTabletConfig();
BootstrapConfig = tx.GetBootstrapConfig();
@@ -96,7 +88,6 @@ void TDistributedTransaction::OnProposeTransaction(const NKikimrPQ::TEvProposeTr
ui64 extractTabletId)
{
Y_VERIFY(event.GetTxBodyCase() != NKikimrPQ::TEvProposeTransaction::TXBODY_NOT_SET);
- Y_VERIFY(event.GetSourceCase() != NKikimrPQ::TEvProposeTransaction::SOURCE_NOT_SET);
Y_VERIFY(TxId == Max<ui64>());
TxId = event.GetTxId();
@@ -118,18 +109,8 @@ void TDistributedTransaction::OnProposeTransaction(const NKikimrPQ::TEvProposeTr
Y_FAIL_S("unknown TxBody case");
}
- switch (event.GetSourceCase()) {
- case NKikimrPQ::TEvProposeTransaction::kActor:
- Y_VERIFY(event.HasActor());
- SourceActor = ActorIdFromProto(event.GetActor());
- break;
- case NKikimrPQ::TEvProposeTransaction::kTablet:
- Y_VERIFY(event.HasTablet());
- SourceTablet = event.GetTablet();
- break;
- default:
- Y_FAIL_S("unknown Source case");
- }
+ Y_VERIFY(event.HasSourceActor());
+ SourceActor = ActorIdFromProto(event.GetSourceActor());
}
void TDistributedTransaction::OnProposeTransaction(const NKikimrPQ::TDataTransaction& txBody,
@@ -159,12 +140,8 @@ void TDistributedTransaction::OnProposeTransaction(const NKikimrPQ::TDataTransac
void TDistributedTransaction::OnProposeTransaction(const NKikimrPQ::TConfigTransaction& txBody)
{
- Y_VERIFY(txBody.HasSchemeShardId());
-
Kind = NKikimrPQ::TTransaction::KIND_CONFIG;
- Receivers.insert(txBody.GetSchemeShardId());
-
TabletConfig = txBody.GetTabletConfig();
BootstrapConfig = txBody.GetBootstrapConfig();
@@ -313,12 +290,8 @@ void TDistributedTransaction::AddCmdWrite(NKikimrClient::TKeyValueRequest& reque
Y_FAIL_S("unknown transaction type");
}
- if (SourceTablet != Max<ui64>()) {
- tx.SetTablet(SourceTablet);
- } else {
- Y_VERIFY(SourceActor != TActorId());
- ActorIdToProto(SourceActor, tx.MutableActor());
- }
+ Y_VERIFY(SourceActor != TActorId());
+ ActorIdToProto(SourceActor, tx.MutableSourceActor());
TString value;
Y_VERIFY(tx.SerializeToString(&value));
@@ -347,10 +320,6 @@ void TDistributedTransaction::AddCmdWriteDataTx(NKikimrPQ::TTransaction& tx)
void TDistributedTransaction::AddCmdWriteConfigTx(NKikimrPQ::TTransaction& tx)
{
- Y_VERIFY(Receivers.size() == 1);
-
- tx.SetSchemeShardId(*Receivers.begin());
-
*tx.MutableTabletConfig() = TabletConfig;
*tx.MutableBootstrapConfig() = BootstrapConfig;
}
diff --git a/ydb/core/persqueue/transaction.h b/ydb/core/persqueue/transaction.h
index 8e08f29ad46..5ffeb1704b0 100644
--- a/ydb/core/persqueue/transaction.h
+++ b/ydb/core/persqueue/transaction.h
@@ -51,7 +51,6 @@ struct TDistributedTransaction {
EDecision SelfDecision = NKikimrTx::TReadSetData::DECISION_UNKNOWN;
EDecision ParticipantsDecision = NKikimrTx::TReadSetData::DECISION_UNKNOWN;
NActors::TActorId SourceActor; // отправитель TEvProposeTransaction
- ui64 SourceTablet = Max<ui64>();
THashSet<ui32> Partitions; // список участвующих партиций
size_t PartitionRepliesCount = 0;
diff --git a/ydb/core/persqueue/ut/partition_ut.cpp b/ydb/core/persqueue/ut/partition_ut.cpp
index 16df7691622..870c70d8f2a 100644
--- a/ydb/core/persqueue/ut/partition_ut.cpp
+++ b/ydb/core/persqueue/ut/partition_ut.cpp
@@ -731,7 +731,7 @@ void TPartitionFixture::SendProposeTransactionRequest(ui32 partition,
{
auto event = MakeHolder<TEvPersQueue::TEvProposeTransaction>();
- ActorIdToProto(Ctx->Edge, event->Record.MutableActor());
+ ActorIdToProto(Ctx->Edge, event->Record.MutableSourceActor());
auto* body = event->Record.MutableData();
auto* operation = body->MutableOperations()->Add();
operation->SetPartitionId(partition);
diff --git a/ydb/core/persqueue/ut/pqtablet_ut.cpp b/ydb/core/persqueue/ut/pqtablet_ut.cpp
index 1013ccfd26f..0bbb72e1654 100644
--- a/ydb/core/persqueue/ut/pqtablet_ut.cpp
+++ b/ydb/core/persqueue/ut/pqtablet_ut.cpp
@@ -38,7 +38,6 @@ struct TTxOperation {
struct TConfigParams {
TMaybe<NKikimrPQ::TPQTabletConfig> Tablet;
TMaybe<NKikimrPQ::TBootstrapConfig> Bootstrap;
- ui64 SchemeShardId = 0;
};
struct TProposeTransactionParams {
@@ -216,7 +215,7 @@ void TPQTabletFixture::SendProposeTransactionRequest(const TProposeTransactionPa
auto event = MakeHolder<TEvPersQueue::TEvProposeTransaction>();
THashSet<ui32> partitions;
- ActorIdToProto(Ctx->Edge, event->Record.MutableActor());
+ ActorIdToProto(Ctx->Edge, event->Record.MutableSourceActor());
event->Record.SetTxId(params.TxId);
if (params.Configs) {
@@ -224,9 +223,6 @@ void TPQTabletFixture::SendProposeTransactionRequest(const TProposeTransactionPa
// TxBody.Config
//
auto* body = event->Record.MutableConfig();
- if (params.Configs->SchemeShardId) {
- body->SetSchemeShardId(params.Configs->SchemeShardId);
- }
if (params.Configs->Tablet.Defined()) {
*body->MutableTabletConfig() = *params.Configs->Tablet;
}
@@ -750,7 +746,6 @@ Y_UNIT_TEST_F(DropTablet_Before_Write, TPQTabletFixture)
Y_UNIT_TEST_F(UpdateConfig_1, TPQTabletFixture)
{
- NHelpers::TPQTabletMock* schemeshard = CreatePQTabletMock(22222);
PQTabletPrepare({.partitions=2}, {}, *Ctx);
const ui64 txId = 67890;
@@ -765,7 +760,6 @@ Y_UNIT_TEST_F(UpdateConfig_1, TPQTabletFixture)
.Configs=NHelpers::TConfigParams{
.Tablet=tabletConfig,
.Bootstrap=NHelpers::MakeBootstrapConfig(),
- .SchemeShardId = 22222
}});
WaitProposeTransactionResponse({.TxId=txId,
.Status=NKikimrPQ::TEvProposeTransactionResult::PREPARED});
@@ -775,16 +769,12 @@ Y_UNIT_TEST_F(UpdateConfig_1, TPQTabletFixture)
WaitPlanStepAck({.Step=100, .TxIds={txId}});
WaitPlanStepAccepted({.Step=100});
- WaitReadSet(*schemeshard, {.Step=100, .TxId=txId, .Source=Ctx->TabletId, .Target=22222, .Decision=NKikimrTx::TReadSetData::DECISION_COMMIT, .Producer=Ctx->TabletId});
- schemeshard->SendReadSetAck(*Ctx->Runtime, {.Step=100, .TxId=txId, .Source=Ctx->TabletId});
-
WaitProposeTransactionResponse({.TxId=txId,
.Status=NKikimrPQ::TEvProposeTransactionResult::COMPLETE});
}
Y_UNIT_TEST_F(UpdateConfig_2, TPQTabletFixture)
{
- NHelpers::TPQTabletMock* schemeshard = CreatePQTabletMock(22222);
PQTabletPrepare({.partitions=2}, {}, *Ctx);
const ui64 txId_2 = 67891;
@@ -801,7 +791,6 @@ Y_UNIT_TEST_F(UpdateConfig_2, TPQTabletFixture)
.Configs=NHelpers::TConfigParams{
.Tablet=tabletConfig,
.Bootstrap=NHelpers::MakeBootstrapConfig(),
- .SchemeShardId = 22222
}});
SendProposeTransactionRequest({.TxId=txId_3,
.TxOps={
@@ -819,9 +808,6 @@ Y_UNIT_TEST_F(UpdateConfig_2, TPQTabletFixture)
WaitPlanStepAck({.Step=100, .TxIds={txId_2, txId_3}});
WaitPlanStepAccepted({.Step=100});
- WaitReadSet(*schemeshard, {.Step=100, .TxId=txId_2, .Source=Ctx->TabletId, .Target=22222, .Decision=NKikimrTx::TReadSetData::DECISION_COMMIT, .Producer=Ctx->TabletId});
- schemeshard->SendReadSetAck(*Ctx->Runtime, {.Step=100, .TxId=txId_2, .Source=Ctx->TabletId});
-
WaitProposeTransactionResponse({.TxId=txId_2,
.Status=NKikimrPQ::TEvProposeTransactionResult::COMPLETE});
WaitProposeTransactionResponse({.TxId=txId_3,
diff --git a/ydb/core/protos/counters_schemeshard.proto b/ydb/core/protos/counters_schemeshard.proto
index dba8eea5d9c..5b6843dfda8 100644
--- a/ydb/core/protos/counters_schemeshard.proto
+++ b/ydb/core/protos/counters_schemeshard.proto
@@ -520,5 +520,5 @@ enum ETxTypes {
TXTYPE_CDC_STREAM_SCAN_PROGRESS = 82 [(TxTypeOpts) = {Name: "TxCdcStreamScanProgress"}];
TXTYPE_PERSQUEUE_PROPOSE_RESULT = 83 [(TxTypeOpts) = {Name: "TxPersQueueProposeResult"}];
- TXTYPE_READSETDATA = 84 [(TxTypeOpts) = {Name: "TxReadSetData"}];
+ TXTYPE_PERSQUEUE_PROPOSE_ATTACH_RESULT = 84 [(TxTypeOpts) = {Name: "TxProposeTransactionAttachResult"}];
}
diff --git a/ydb/core/protos/pqconfig.proto b/ydb/core/protos/pqconfig.proto
index b9b438290f3..66745b10d04 100644
--- a/ydb/core/protos/pqconfig.proto
+++ b/ydb/core/protos/pqconfig.proto
@@ -820,16 +820,12 @@ message TDataTransaction {
}
message TConfigTransaction {
- optional uint64 SchemeShardId = 1;
optional TPQTabletConfig TabletConfig = 2;
optional TBootstrapConfig BootstrapConfig = 3;
}
message TEvProposeTransaction {
- oneof Source {
- NActorsProto.TActorId Actor = 1;
- uint64 Tablet = 5;
- }
+ optional NActorsProto.TActorId SourceActor = 1;
optional uint64 TxId = 2;
oneof TxBody {
TDataTransaction Data = 3;
@@ -920,15 +916,11 @@ message TTransaction {
//
optional TPQTabletConfig TabletConfig = 12;
optional TBootstrapConfig BootstrapConfig = 13;
- optional uint64 SchemeShardId = 16;
//
// получатель результата
//
- oneof Source {
- NActorsProto.TActorId Actor = 14;
- uint64 Tablet = 15;
- }
+ optional NActorsProto.TActorId SourceActor = 14;
};
message TTabletTxInfo {
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation.cpp b/ydb/core/tx/schemeshard/schemeshard__operation.cpp
index 5797ec4b014..3ade64ef64a 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation.cpp
@@ -383,28 +383,6 @@ void OutOfScopeEventHandler<TEvDataShard::TEvSchemaChanged>(const TEvDataShard::
context.OnComplete.Send(ackTo, event.Release());
}
-//
-// PQ tablet sends TEvTxProcessing::TEvReadSet and is waiting for a response. If there is no response, the
-// transaction for changing the config will not be completed. As a result, the execution of other transactions
-// in the queue will stop on the tablet side of the PQ
-//
-template <>
-void OutOfScopeEventHandler<TEvTxProcessing::TEvReadSet>(const TEvTxProcessing::TEvReadSet::TPtr& ev, TOperationContext& context)
-{
- const auto txId = ev->Get()->Record.GetTxId();
- LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
- "TTxOperationReply<" << ev->GetTypeName() << "> execute"
- << ", at schemeshard: " << context.SS->TabletID()
- << ", send out-of-scope reply, for txId " << txId
- );
-
- const TActorId ackTo = ev->Sender;
- const TTabletId ssId = context.SS->SelfTabletId();
-
- auto ack = std::make_unique<TEvTxProcessing::TEvReadSetAck>(*ev->Get(), ui64(ssId));
- context.OnComplete.Send(ackTo, ack.release());
-}
-
template <class TEvType>
struct TTxTypeFrom;
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_common.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_common.cpp
index 9b266eb70f0..1d6b4428e17 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_common.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_common.cpp
@@ -595,9 +595,9 @@ bool CollectProposeTransactionResults(const TOperationId& operationId,
return CollectProposeTxResults(ev, operationId, context, prepared, toString);
}
-bool CollectSchemaChanged(const TOperationId& operationId,
- const TEvPersQueue::TEvProposeTransactionResult::TPtr& ev,
- TOperationContext& context)
+bool CollectPQConfigChanged(const TOperationId& operationId,
+ const TEvPersQueue::TEvProposeTransactionResult::TPtr& ev,
+ TOperationContext& context)
{
Y_VERIFY(context.SS->FindTx(operationId));
TTxState& txState = *context.SS->FindTx(operationId);
@@ -615,7 +615,7 @@ bool CollectSchemaChanged(const TOperationId& operationId,
txState.ShardsInProgress.erase(shardIdx);
LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
- "CollectSchemaChanged accept TEvPersQueue::TEvProposeTransactionResult"
+ "CollectPQConfigChanged accept TEvPersQueue::TEvProposeTransactionResult"
<< ", operationId: " << operationId
<< ", shardIdx: " << shardIdx
<< ", shard: " << shardId
@@ -628,6 +628,57 @@ bool CollectSchemaChanged(const TOperationId& operationId,
return txState.ShardsInProgress.empty();
}
+bool CollectPQConfigChanged(const TOperationId& operationId,
+ const TEvPersQueue::TEvProposeTransactionAttachResult::TPtr& ev,
+ TOperationContext& context)
+{
+ Y_VERIFY(context.SS->FindTx(operationId));
+ TTxState& txState = *context.SS->FindTx(operationId);
+
+ //
+ // The PQ tablet can perform a transaction and send a TEvProposeTransactionResult(COMPLETE) response.
+ // The SchemeShard tablet can restart at this point. After restarting at the TPropose step, it will
+ // send the TEvProposeTransactionAttach message to the PQ tablets. If the NODATA status is specified in
+ // the response TEvProposeTransactionAttachResult, then the PQ tablet has already completed the transaction.
+ // Otherwise, she continues to execute the transaction
+ //
+
+ const auto& evRecord = ev->Get()->Record;
+ if (evRecord.GetStatus() != NKikimrProto::NODATA) {
+ //
+ // If the PQ tablet returned something other than NODATA, then it continues to execute the transaction
+ //
+ return txState.ShardsInProgress.empty();
+ }
+
+ //
+ // Otherwise, she has already completed the transaction and has forgotten about it. Then we can
+ // remove PQ tablet from the list of shards
+ //
+
+ const auto ssId = context.SS->SelfTabletId();
+ const TTabletId shardId(evRecord.GetTabletId());
+
+ const auto shardIdx = context.SS->MustGetShardIdx(shardId);
+ Y_VERIFY(context.SS->ShardInfos.contains(shardIdx));
+
+ Y_VERIFY(txState.State == TTxState::Propose);
+
+ txState.ShardsInProgress.erase(shardIdx);
+
+ LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
+ "CollectPQConfigChanged accept TEvPersQueue::TEvProposeTransactionAttachResult"
+ << ", operationId: " << operationId
+ << ", shardIdx: " << shardIdx
+ << ", shard: " << shardId
+ << ", left await: " << txState.ShardsInProgress.size()
+ << ", txState.State: " << TTxState::StateName(txState.State)
+ << ", txState.ReadyForNotifications: " << txState.ReadyForNotifications
+ << ", at schemeshard: " << ssId);
+
+ return txState.ShardsInProgress.empty();
+}
+
bool TConfigureParts::HandleReply(TEvPersQueue::TEvProposeTransactionResult::TPtr& ev, TOperationContext& context)
{
const TTabletId ssId = context.SS->SelfTabletId();
@@ -649,13 +700,11 @@ THolder<TEvPersQueue::TEvProposeTransaction> TConfigureParts::MakeEvProposeTrans
const TString& databaseId,
const TString& databasePath,
TTxState::ETxType txType,
- TTabletId ssId,
const TOperationContext& context)
{
auto event = MakeHolder<TEvPersQueue::TEvProposeTransaction>();
event->Record.SetTxId(ui64(txId));
- event->Record.SetTablet(ui64(ssId));
- event->Record.MutableConfig()->SetSchemeShardId(ui64(ssId));
+ ActorIdToProto(context.SS->SelfId(), event->Record.MutableSourceActor());
MakePQTabletConfig(*event->Record.MutableConfig()->MutableTabletConfig(),
pqGroup,
@@ -723,29 +772,29 @@ bool TPropose::HandleReply(TEvPersQueue::TEvProposeTransactionResult::TPtr& ev,
<< ", at schemeshard: " << ssId
<< " message# " << evRecord.ShortDebugString());
- const bool collected = CollectSchemaChanged(OperationId, ev, context);
+ const bool collected = CollectPQConfigChanged(OperationId, ev, context);
LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
DebugHint() << " HandleReply TEvProposeTransactionResult"
- << " CollectSchemaChanged: " << (collected ? "true" : "false"));
+ << " CollectPQConfigChanged: " << (collected ? "true" : "false"));
return TryPersistState(context);
}
-
-bool TPropose::HandleReply(TEvTxProcessing::TEvReadSet::TPtr& ev, TOperationContext& context)
+
+bool TPropose::HandleReply(TEvPersQueue::TEvProposeTransactionAttachResult::TPtr& ev, TOperationContext& context)
{
- const ui64 tabletId = ev->Get()->Record.GetTabletProducer();
- if (auto p = ReadSetAcks.find(tabletId); p != ReadSetAcks.end()) {
- TReadSetAck& ack = p->second;
+ const auto ssId = context.SS->SelfTabletId();
+ const auto& evRecord = ev->Get()->Record;
- if (!ack.Event) {
- ++ReadSetCount;
- }
+ LOG_INFO_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
+ DebugHint() << " HandleReply TEvProposeTransactionAttachResult"
+ << " triggers early"
+ << ", at schemeshard: " << ssId
+ << " message# " << evRecord.ShortDebugString());
- ack.Event =
- MakeHolder<TEvTxProcessing::TEvReadSetAck>(*ev->Get(),
- ui64(context.SS->SelfTabletId()));
- ack.Receiver = ev->Sender;
- }
+ const bool collected = CollectPQConfigChanged(OperationId, ev, context);
+ LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
+ DebugHint() << " HandleReply TEvProposeTransactionAttachResult"
+ << " CollectPQConfigChanged: " << (collected ? "true" : "false"));
return TryPersistState(context);
}
@@ -754,9 +803,6 @@ void TPropose::PrepareShards(TTxState& txState, TSet<TTabletId>& shardSet, TOper
{
txState.UpdateShardsInProgress();
- ReadSetAcks.clear();
- ReadSetCount = 0;
-
for (const auto& shard : txState.Shards) {
const TShardIdx idx = shard.Idx;
//
@@ -769,68 +815,86 @@ void TPropose::PrepareShards(TTxState& txState, TSet<TTabletId>& shardSet, TOper
shardSet.insert(tablet);
- auto& ack = ReadSetAcks[ui64(tablet)];
- ack.Event = nullptr;
- ack.Receiver = TActorId();
+ //
+ // By this point, the SchemeShard tablet could restart and the actor ID changed. Therefore, we send
+ // the TEvProposeTransactionAttach message to the PQ tablets so that they recognize the new recipient
+ //
+ SendEvProposeTransactionAttach(idx, tablet, context);
} else {
txState.ShardsInProgress.erase(idx);
}
}
}
-bool TPropose::TryPersistState(TOperationContext& context)
+void TPropose::SendEvProposeTransactionAttach(TShardIdx shard, TTabletId tablet,
+ TOperationContext& context)
{
- if (ReadSetCount < ReadSetAcks.size()) {
- return false;
- }
-
- TTxState* txState = context.SS->FindTx(OperationId);
- Y_VERIFY(txState);
+ auto event =
+ MakeHolder<TEvPersQueue::TEvProposeTransactionAttach>(ui64(tablet),
+ ui64(OperationId.GetTxId()));
+ context.OnComplete.BindMsgToPipe(OperationId, tablet, shard, event.Release());
+}
- if (!txState->ShardsInProgress.empty()) {
+bool TPropose::CanPersistState(const TTxState& txState,
+ TOperationContext& context)
+{
+ if (!txState.ShardsInProgress.empty()) {
+ LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
+ DebugHint() << " can't persist state: " <<
+ "ShardsInProgress is not empty, remain: " << txState.ShardsInProgress.size());
return false;
}
- const TPathId pathId = txState->TargetPathId;
- const TPathElement::TPtr path = context.SS->PathsById.at(pathId);
+ PathId = txState.TargetPathId;
+ Path = context.SS->PathsById.at(PathId);
- if (path->StepCreated == InvalidStepId) {
+ if (Path->StepCreated == InvalidStepId) {
+ LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
+ DebugHint() << " can't persist state: " <<
+ "StepCreated is invalid");
return false;
}
+ return true;
+}
+
+void TPropose::PersistState(const TTxState& txState,
+ TOperationContext& context) const
+{
NIceDb::TNiceDb db(context.GetDB());
- if (txState->TxType == TTxState::TxCreatePQGroup || txState->TxType == TTxState::TxAllocatePQ) {
- auto parentDir = context.SS->PathsById.at(path->ParentPathId);
+ if (txState.TxType == TTxState::TxCreatePQGroup || txState.TxType == TTxState::TxAllocatePQ) {
+ auto parentDir = context.SS->PathsById.at(Path->ParentPathId);
++parentDir->DirAlterVersion;
context.SS->PersistPathDirAlterVersion(db, parentDir);
context.SS->ClearDescribePathCaches(parentDir);
context.OnComplete.PublishToSchemeBoard(OperationId, parentDir->PathId);
}
- context.SS->ClearDescribePathCaches(path);
- context.OnComplete.PublishToSchemeBoard(OperationId, pathId);
+ context.SS->ClearDescribePathCaches(Path);
+ context.OnComplete.PublishToSchemeBoard(OperationId, PathId);
- TTopicInfo::TPtr pqGroup = context.SS->Topics[pathId];
+ TTopicInfo::TPtr pqGroup = context.SS->Topics[PathId];
pqGroup->FinishAlter();
- context.SS->PersistPersQueueGroup(db, pathId, pqGroup);
- context.SS->PersistRemovePersQueueGroupAlter(db, pathId);
+ context.SS->PersistPersQueueGroup(db, PathId, pqGroup);
+ context.SS->PersistRemovePersQueueGroupAlter(db, PathId);
context.SS->ChangeTxState(db, OperationId, TTxState::Done);
-
- SendEvReadSetAck(context);
-
- return true;
}
-void TPropose::SendEvReadSetAck(TOperationContext& context)
+bool TPropose::TryPersistState(TOperationContext& context)
{
- for (auto& [_, ack] : ReadSetAcks) {
- Y_VERIFY(ack.Event);
+ TTxState* txState = context.SS->FindTx(OperationId);
+ Y_VERIFY(txState);
- context.Ctx.Send(ack.Receiver, ack.Event.Release());
+ if (!CanPersistState(*txState, context)) {
+ return false;
}
+
+ PersistState(*txState, context);
+
+ return true;
}
}
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_common.h b/ydb/core/tx/schemeshard/schemeshard__operation_common.h
index b9d2b83e5a3..c0addf8c929 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_common.h
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_common.h
@@ -686,7 +686,6 @@ public:
databaseId,
databasePath,
txState->TxType,
- ssId,
context);
} else {
event = MakeEvUpdateConfig(OperationId.GetTxId(),
@@ -851,7 +850,6 @@ private:
const TString& databaseId,
const TString& databasePath,
TTxState::ETxType txType,
- TTabletId ssId,
const TOperationContext& context);
static THolder<TEvPersQueue::TEvUpdateConfig>
MakeEvUpdateConfig(TTxId txId,
@@ -885,7 +883,7 @@ public:
}
bool HandleReply(TEvPersQueue::TEvProposeTransactionResult::TPtr& ev, TOperationContext& context) override;
- bool HandleReply(TEvTxProcessing::TEvReadSet::TPtr& ev, TOperationContext& context) override;
+ bool HandleReply(TEvPersQueue::TEvProposeTransactionAttachResult::TPtr& ev, TOperationContext& context) override;
bool HandleReply(TEvPrivate::TEvOperationPlan::TPtr& ev, TOperationContext& context) override {
TStepId step = TStepId(ev->Get()->StepId);
@@ -945,18 +943,18 @@ public:
}
private:
+ bool CanPersistState(const TTxState& txState,
+ TOperationContext& context);
+ void PersistState(const TTxState& txState,
+ TOperationContext& context) const;
bool TryPersistState(TOperationContext& context);
- void SendEvReadSetAck(TOperationContext& context);
+ void SendEvProposeTransactionAttach(TShardIdx shard, TTabletId tablet,
+ TOperationContext& context);
void PrepareShards(TTxState& txState, TSet<TTabletId>& shardSet, TOperationContext& context);
- struct TReadSetAck {
- THolder<TEvTxProcessing::TEvReadSetAck> Event;
- TActorId Receiver;
- };
-
- THashMap<ui64, TReadSetAck> ReadSetAcks;
- size_t ReadSetCount = 0;
+ TPathId PathId;
+ TPathElement::TPtr Path;
};
} // NPQState
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_part.h b/ydb/core/tx/schemeshard/schemeshard__operation_part.h
index 432d3e68643..441d19b6b60 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_part.h
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_part.h
@@ -75,7 +75,7 @@
action(TEvPrivate::TEvPrivate::TEvCompletePublication, NSchemeShard::TXTYPE_NOTIFY_OPERATION_COMPLETE_PUBLICATION) \
action(TEvPrivate::TEvPrivate::TEvCompleteBarrier, NSchemeShard::TXTYPE_NOTIFY_OPERATION_COMPLETE_BARRIER) \
\
- action(TEvTxProcessing::TEvReadSet, NSchemeShard::TXTYPE_READSETDATA)
+ action(TEvPersQueue::TEvProposeTransactionAttachResult, NSchemeShard::TXTYPE_PERSQUEUE_PROPOSE_ATTACH_RESULT)
namespace NKikimr {
diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp
index cacaa80aa3e..0ca6dabc396 100644
--- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp
@@ -4381,7 +4381,7 @@ void TSchemeShard::StateWork(STFUNC_SIG) {
HFuncTraced(TEvSchemeShard::TEvLogin, Handle);
- HFuncTraced(TEvTxProcessing::TEvReadSet, Handle);
+ HFuncTraced(TEvPersQueue::TEvProposeTransactionAttachResult, Handle);
default:
if (!HandleDefaultEvents(ev, SelfId())) {
@@ -4949,36 +4949,25 @@ void TSchemeShard::Handle(TEvPrivate::TEvProgressOperation::TPtr &ev, const TAct
Execute(CreateTxOperationProgress(TOperationId(txId, ev->Get()->TxPartId)), ctx);
}
-void TSchemeShard::Handle(TEvTxProcessing::TEvReadSet::TPtr& ev, const TActorContext& ctx)
+void TSchemeShard::Handle(TEvPersQueue::TEvProposeTransactionAttachResult::TPtr& ev, const TActorContext& ctx)
{
- auto sendReadSetAck = [&]() {
- auto ack = std::make_unique<TEvTxProcessing::TEvReadSetAck>(*ev->Get(), TabletID());
- ctx.Send(ev->Sender, ack.release());
- };
-
const auto txId = TTxId(ev->Get()->Record.GetTxId());
if (!Operations.contains(txId)) {
LOG_WARN_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
- "Got TEvTxProcessing::TEvReadSet"
- << " for unknown txId " << txId
- << " message " << ev->Get()->Record.ShortDebugString());
-
- sendReadSetAck();
-
+ "Got TEvPersQueue::TEvProposeTransactionAttachResult"
+ << " for unknown txId: " << txId
+ << " message: " << ev->Get()->Record.ShortDebugString());
return;
}
- const TTabletId tabletId(ev->Get()->Record.GetTabletSource());
- const TSubTxId partId = Operations.at(txId)->FindRelatedPartByTabletId(tabletId, ctx);
+ auto tabletId = TTabletId(ev->Get()->Record.GetTabletId());
+ TSubTxId partId = Operations.at(txId)->FindRelatedPartByTabletId(tabletId, ctx);
if (partId == InvalidSubTxId) {
LOG_WARN_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
- "Got TEvProposeTransactionResult but partId is unknown"
+ "Got TEvPersQueue::TEvProposeTransactionAttachResult but partId is unknown"
<< ", for txId: " << txId
<< ", tabletId: " << tabletId
<< ", at schemeshard: " << TabletID());
-
- sendReadSetAck();
-
return;
}
diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.h b/ydb/core/tx/schemeshard/schemeshard_impl.h
index 70efa73fc33..a96d2218087 100644
--- a/ydb/core/tx/schemeshard/schemeshard_impl.h
+++ b/ydb/core/tx/schemeshard/schemeshard_impl.h
@@ -930,7 +930,7 @@ public:
void Handle(TEvPrivate::TEvProgressOperation::TPtr &ev, const TActorContext &ctx);
- void Handle(TEvTxProcessing::TEvReadSet::TPtr& ev, const TActorContext& ctx);
+ void Handle(TEvPersQueue::TEvProposeTransactionAttachResult::TPtr& ev, const TActorContext& ctx);
void Handle(TEvTabletPipe::TEvClientConnected::TPtr &ev, const TActorContext &ctx);
void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr &ev, const TActorContext &ctx);