diff options
author | abcdef <akotov@ydb.tech> | 2022-12-15 13:17:20 +0300 |
---|---|---|
committer | abcdef <akotov@ydb.tech> | 2022-12-15 13:17:20 +0300 |
commit | 079f43fbda3b5c26b969b5a84ce5933697b6a00a (patch) | |
tree | 6a6270dafd4b37be9502f3eb47d046ec79f70513 | |
parent | 34ed13a68befbfbd4e21088c20d6019311d7202d (diff) | |
download | ydb-079f43fbda3b5c26b969b5a84ce5933697b6a00a.tar.gz |
-rw-r--r-- | ydb/core/persqueue/events/internal.h | 76 | ||||
-rw-r--r-- | ydb/core/persqueue/key.h | 3 | ||||
-rw-r--r-- | ydb/core/persqueue/partition.cpp | 320 | ||||
-rw-r--r-- | ydb/core/persqueue/partition.h | 46 | ||||
-rw-r--r-- | ydb/core/persqueue/ut/user_action_processor_ut.cpp | 524 | ||||
-rw-r--r-- | ydb/core/protos/pqconfig.proto | 5 |
6 files changed, 884 insertions, 90 deletions
diff --git a/ydb/core/persqueue/events/internal.h b/ydb/core/persqueue/events/internal.h index 6d60b347d52..451b3d00e55 100644 --- a/ydb/core/persqueue/events/internal.h +++ b/ydb/core/persqueue/events/internal.h @@ -118,6 +118,11 @@ struct TEvPQ { EvRequestPartitionStatus, EvReaderEventArrived, EvMetering, + EvTxCalcPredicate, + EvTxCalcPredicateResult, + EvTxCommit, + EvTxCommitDone, + EvTxRollback, EvEnd }; @@ -655,6 +660,77 @@ struct TEvPQ { NPQ::EMeteringJson Type; ui64 Quantity; }; + + struct TEvTxCalcPredicate : public TEventLocal<TEvTxCalcPredicate, EvTxCalcPredicate> { + TEvTxCalcPredicate(ui64 step, ui64 txId) : + Step(step), + TxId(txId) + { + } + + void AddOperation(TString consumer, ui64 begin, ui64 end) { + NKikimrPQ::TPartitionOperation operation; + operation.SetBegin(begin); + operation.SetEnd(end); + operation.SetConsumer(std::move(consumer)); + + Operations.push_back(std::move(operation)); + } + + ui64 Step; + ui64 TxId; + TVector<NKikimrPQ::TPartitionOperation> Operations; + }; + + struct TEvTxCalcPredicateResult : public TEventLocal<TEvTxCalcPredicateResult, EvTxCalcPredicateResult> { + TEvTxCalcPredicateResult(ui64 step, ui64 txId, ui32 partition, bool predicate) : + Step(step), + TxId(txId), + Partition(partition), + Predicate(predicate) + { + } + + ui64 Step; + ui64 TxId; + ui32 Partition; + bool Predicate = false; + }; + + struct TEvTxCommit : public TEventLocal<TEvTxCommit, EvTxCommit> { + TEvTxCommit(ui64 step, ui64 txId) : + Step(step), + TxId(txId) + { + } + + ui64 Step; + ui64 TxId; + }; + + struct TEvTxCommitDone : public TEventLocal<TEvTxCommitDone, EvTxCommitDone> { + TEvTxCommitDone(ui64 step, ui64 txId, ui32 partition) : + Step(step), + TxId(txId), + Partition(partition) + { + } + + ui64 Step; + ui64 TxId; + ui32 Partition; + }; + + struct TEvTxRollback : public TEventLocal<TEvTxRollback, EvTxRollback> { + TEvTxRollback(ui64 step, ui64 txId) : + Step(step), + TxId(txId) + { + } + + ui64 Step; + ui64 TxId; + }; }; } //NKikimr diff --git a/ydb/core/persqueue/key.h b/ydb/core/persqueue/key.h index 356c0cf6df5..760a5ccbcec 100644 --- a/ydb/core/persqueue/key.h +++ b/ydb/core/persqueue/key.h @@ -16,7 +16,8 @@ public: TypeInfo = 'm', TypeData = 'd', TypeTmpData = 'x', - TypeMeta = 'i' + TypeMeta = 'i', + TypeTxMeta = 'I' }; enum EMark : char { diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp index e85ce509bcf..4ccef73eda1 100644 --- a/ydb/core/persqueue/partition.cpp +++ b/ydb/core/persqueue/partition.cpp @@ -37,6 +37,17 @@ static const ui32 MAX_USERS = 1000; static const ui64 SET_OFFSET_COOKIE = 1; static const ui32 MAX_TXS = 1000; +auto GetStepAndTxId(ui64 step, ui64 txId) +{ + return std::make_pair(step, txId); +} + +template<class E> +auto GetStepAndTxId(const E& event) +{ + return GetStepAndTxId(event.Step, event.TxId); +} + struct TPartition::THasDataReq { ui64 Num; ui64 Offset; @@ -392,10 +403,17 @@ void RequestInfoRange(const TActorContext& ctx, const TActorId& dst, ui32 partit } void RequestMetaRead(const TActorContext& ctx, const TActorId& dst, ui32 partition) { + auto addKey = [](NKikimrClient::TKeyValueRequest& request, TKeyPrefix::EType type, ui32 partition) { + auto read = request.AddCmdRead(); + TKeyPrefix key{type, partition}; + read->SetKey(key.Data(), key.Size()); + }; + THolder<TEvKeyValue::TEvRequest> request(new TEvKeyValue::TEvRequest); - auto read = request->Record.AddCmdRead(); - TKeyPrefix key{TKeyPrefix::TypeMeta, partition}; - read->SetKey(key.Data(), key.Size()); + + addKey(request->Record, TKeyPrefix::TypeMeta, partition); + addKey(request->Record, TKeyPrefix::TypeTxMeta, partition); + ctx.Send(dst, request.Release()); } @@ -460,7 +478,8 @@ void TPartition::FillReadFromTimestamps(const NKikimrPQ::TPQTabletConfig& config TPartition::TPartition(ui64 tabletId, ui32 partition, const TActorId& tablet, const TActorId& blobCache, const NPersQueue::TTopicConverterPtr& topicConverter, bool isLocalDC, TString dcId, bool isServerless, const NKikimrPQ::TPQTabletConfig& config, const TTabletCountersBase& counters, - bool newPartition) + bool newPartition, + TVector<TTransaction> distrTxs) : TabletID(tabletId) , Partition(partition) , Config(config) @@ -516,6 +535,12 @@ TPartition::TPartition(ui64 tabletId, ui32 partition, const TActorId& tablet, co } TabletCounters.Populate(counters); + + if (!distrTxs.empty()) { + std::move(distrTxs.begin(), distrTxs.end(), + std::back_inserter(DistrTxs)); + TxInProgress = DistrTxs.front().Predicate.Defined(); + } } void TPartition::HandleMonitoring(TEvPQ::TEvMonRequest::TPtr& ev, const TActorContext& ctx) { @@ -1392,21 +1417,14 @@ void TPartition::HandleGetDiskStatus(const NKikimrClient::TResponse& response, c RequestMetaRead(ctx, Tablet, Partition); } -void TPartition::HandleMetaRead(const NKikimrClient::TKeyValueResponse::TReadResult& response, const TActorContext& ctx) { - NKikimrPQ::TPartitionMeta meta; - switch (response.GetStatus()) { - case NKikimrProto::OK: { - bool res = meta.ParseFromString(response.GetValue()); - Y_VERIFY(res); - /* Bring back later, when switch to 21-2 will be unable - StartOffset = meta.GetStartOffset(); - EndOffset = meta.GetEndOffset(); - if (StartOffset == EndOffset) { - NewHead.Offset = Head.Offset = EndOffset; - } - */ +void TPartition::HandleMetaRead(const NKikimrClient::TResponse& response, const TActorContext& ctx) +{ + auto handleReadResult = [&](const NKikimrClient::TKeyValueResponse::TReadResult& response, + auto&& action) { + switch (response.GetStatus()) { + case NKikimrProto::OK: + action(response); break; - } case NKikimrProto::NODATA: break; case NKikimrProto::ERROR: @@ -1419,14 +1437,41 @@ void TPartition::HandleMetaRead(const NKikimrClient::TKeyValueResponse::TReadRes default: Cerr << "ERROR " << response.GetStatus() << "\n"; Y_FAIL("bad status"); + }; + }; + + auto loadMeta = [](const NKikimrClient::TKeyValueResponse::TReadResult& response) { + NKikimrPQ::TPartitionMeta meta; + bool res = meta.ParseFromString(response.GetValue()); + Y_VERIFY(res); + /* Bring back later, when switch to 21-2 will be unable + StartOffset = meta.GetStartOffset(); + EndOffset = meta.GetEndOffset(); + if (StartOffset == EndOffset) { + NewHead.Offset = Head.Offset = EndOffset; + } + */ }; + handleReadResult(response.GetReadResult(0), loadMeta); + + auto loadTxMeta = [this](const NKikimrClient::TKeyValueResponse::TReadResult& response) { + NKikimrPQ::TPartitionTxMeta meta; + bool res = meta.ParseFromString(response.GetValue()); + Y_VERIFY(res); + + if (meta.HasPlanStep()) { + PlanStep = meta.GetPlanStep(); + } + if (meta.HasTxId()) { + TxId = meta.GetTxId(); + } + }; + handleReadResult(response.GetReadResult(1), loadTxMeta); InitState = WaitInfoRange; RequestInfoRange(ctx, Tablet, Partition, ""); } - - void TPartition::HandleInfoRangeRead(const NKikimrClient::TKeyValueResponse::TReadRangeResult& range, const TActorContext& ctx) { //megaqc check here all results Y_VERIFY(range.HasStatus()); @@ -1686,8 +1731,8 @@ void TPartition::HandleOnInit(TEvKeyValue::TEvResponse::TPtr& ev, const TActorCo HandleGetDiskStatus(response, ctx); break; case WaitMetaRead: - Y_VERIFY(response.ReadResultSize() == 1); - HandleMetaRead(response.GetReadResult(0), ctx); + Y_VERIFY(response.ReadResultSize() == 2); + HandleMetaRead(response, ctx); break; case WaitInfoRange: Y_VERIFY(response.ReadRangeResultSize() == 1); @@ -2269,6 +2314,77 @@ void TPartition::Handle(TEvPersQueue::TEvProposeTransaction::TPtr& ev, const TAc ProcessTxsAndUserActs(ctx); } +void TPartition::Handle(TEvPQ::TEvTxCalcPredicate::TPtr& ev, const TActorContext& ctx) +{ + AddDistrTx(ev->Release()); + + ProcessTxsAndUserActs(ctx); +} + +void TPartition::Handle(TEvPQ::TEvTxCommit::TPtr& ev, const TActorContext& ctx) +{ + if (PlanStep.Defined() && TxId.Defined()) { + if (GetStepAndTxId(*ev->Get()) <= GetStepAndTxId(*PlanStep, *TxId)) { + ctx.Send(Tablet, MakeCommitDone(ev->Get()->Step, ev->Get()->TxId).Release()); + return; + } + } + + Y_VERIFY(TxInProgress); + + Y_VERIFY(!DistrTxs.empty()); + TTransaction& t = DistrTxs.front(); + + Y_VERIFY(GetStepAndTxId(*ev->Get()) == GetStepAndTxId(*t.Tx)); + Y_VERIFY(t.Predicate.Defined() && *t.Predicate); + + for (auto& operation : t.Tx->Operations) { + TUserInfo& userInfo = GetOrCreatePendingUser(operation.GetConsumer(), ctx); + + Y_VERIFY(userInfo.Offset == (i64)operation.GetBegin()); + + userInfo.Offset = operation.GetEnd(); + userInfo.Session = ""; + } + + PlanStep = t.Tx->Step; + TxId = t.Tx->TxId; + TxIdHasChanged = true; + + ScheduleReplyCommitDone(t.Tx->Step, t.Tx->TxId); + + RemoveDistrTx(); + TxInProgress = false; + + ContinueProcessTxsAndUserActs(ctx); +} + +void TPartition::Handle(TEvPQ::TEvTxRollback::TPtr& ev, const TActorContext& ctx) +{ + if (PlanStep.Defined() && TxId.Defined()) { + if (GetStepAndTxId(*ev->Get()) <= GetStepAndTxId(*PlanStep, *TxId)) { + return; + } + } + + Y_VERIFY(TxInProgress); + + Y_VERIFY(!DistrTxs.empty()); + TTransaction& t = DistrTxs.front(); + + Y_VERIFY(GetStepAndTxId(*ev->Get()) == GetStepAndTxId(*t.Tx)); + Y_VERIFY(t.Predicate.Defined()); + + PlanStep = t.Tx->Step; + TxId = t.Tx->TxId; + TxIdHasChanged = true; + + RemoveDistrTx(); + TxInProgress = false; + + ContinueProcessTxsAndUserActs(ctx); +} + void TPartition::Handle(TEvPQ::TEvSetClientInfo::TPtr& ev, const TActorContext& ctx) { if (size_t count = GetUserActCount(ev->Get()->ClientId); count > MAX_USER_ACTS) { TabletCounters.Cumulative()[COUNTER_PQ_SET_CLIENT_OFFSET_ERROR].Increment(1); @@ -3604,9 +3720,17 @@ void TPartition::HandleSetOffsetResponse(ui64 cookie, const TActorContext& ctx) AffectedUsers.clear(); UsersInfoWriteInProgress = false; + + TxIdHasChanged = false; + ProcessTxsAndUserActs(ctx); } +void TPartition::AddDistrTx(TSimpleSharedPtr<TEvPQ::TEvTxCalcPredicate> event) +{ + DistrTxs.emplace_back(std::move(event)); +} + void TPartition::AddImmediateTx(TSimpleSharedPtr<TEvPersQueue::TEvProposeTransaction> tx) { ImmediateTxs.push_back(std::move(tx)); @@ -3651,7 +3775,7 @@ size_t TPartition::GetUserActCount(const TString& consumer) const void TPartition::ProcessTxsAndUserActs(const TActorContext& ctx) { - if (UsersInfoWriteInProgress || (ImmediateTxs.empty() && UserActs.empty())) { + if (UsersInfoWriteInProgress || (ImmediateTxs.empty() && UserActs.empty() && DistrTxs.empty()) || TxInProgress) { return; } @@ -3659,35 +3783,101 @@ void TPartition::ProcessTxsAndUserActs(const TActorContext& ctx) Y_VERIFY(Replies.empty()); Y_VERIFY(AffectedUsers.empty()); + ContinueProcessTxsAndUserActs(ctx); +} + +void TPartition::ContinueProcessTxsAndUserActs(const TActorContext& ctx) +{ + if (!DistrTxs.empty()) { + ProcessDistrTxs(ctx); + + if (!DistrTxs.empty()) { + return; + } + } + ProcessUserActs(ctx); ProcessImmediateTxs(ctx); THolder<TEvKeyValue::TEvRequest> request(new TEvKeyValue::TEvRequest); - request->Record.SetCookie(SET_OFFSET_COOKIE); - for (auto& user : AffectedUsers) { - TKeyPrefix ikey(TKeyPrefix::TypeInfo, Partition, TKeyPrefix::MarkUser); - ikey.Append(user.c_str(), user.size()); - TKeyPrefix ikeyDeprecated(TKeyPrefix::TypeInfo, Partition, TKeyPrefix::MarkUserDeprecated); - ikeyDeprecated.Append(user.c_str(), user.size()); - - if (TUserInfo* userInfo = GetPendingUserIfExists(user)) { - AddCmdWrite(request->Record, - ikey, ikeyDeprecated, - userInfo->Offset, userInfo->Generation, userInfo->Step, userInfo->Session, - userInfo->ReadOffsetRewindSum, - userInfo->ReadRuleGeneration); - } else { - AddCmdDeleteRange(request->Record, - ikey, ikeyDeprecated); - } + if (TxIdHasChanged) { + AddCmdWriteTxMeta(request->Record, + *PlanStep, *TxId); } + AddCmdWriteUserInfos(request->Record); ctx.Send(Tablet, request.Release()); UsersInfoWriteInProgress = true; } +void TPartition::RemoveDistrTx() +{ + Y_VERIFY(!DistrTxs.empty()); + + DistrTxs.pop_front(); +} + +void TPartition::ProcessDistrTxs(const TActorContext& ctx) +{ + Y_VERIFY(!TxInProgress); + + while (!TxInProgress && !DistrTxs.empty()) { + ProcessDistrTx(ctx); + } +} + +void TPartition::ProcessDistrTx(const TActorContext& ctx) +{ + Y_VERIFY(!TxInProgress); + + Y_VERIFY(!DistrTxs.empty()); + TTransaction& t = DistrTxs.front(); + + bool predicate = true; + + for (auto& operation : t.Tx->Operations) { + const TString& consumer = operation.GetConsumer(); + + if (!UsersInfoStorage.GetIfExists(consumer)) { + predicate = false; + break; + } + + bool isAffectedConsumer = AffectedUsers.contains(consumer); + TUserInfo& userInfo = GetOrCreatePendingUser(consumer, ctx); + + if (operation.GetBegin() > operation.GetEnd()) { + // BAD_REQUEST + predicate = false; + } else if (userInfo.Offset != (i64)operation.GetBegin()) { + // ABORTED + predicate = false; + } else if (operation.GetEnd() > EndOffset) { + // BAD_REQUEST + predicate = false; + } + + if (!predicate) { + if (!isAffectedConsumer) { + AffectedUsers.erase(consumer); + } + break; + } + } + + t.Predicate = predicate; + + auto response = MakeHolder<TEvPQ::TEvTxCalcPredicateResult>(t.Tx->Step, + t.Tx->TxId, + Partition, + predicate); + ctx.Send(Tablet, response.Release()); + + TxInProgress = true; +} + void TPartition::ProcessImmediateTxs(const TActorContext& ctx) { Y_VERIFY(!UsersInfoWriteInProgress); @@ -3973,6 +4163,12 @@ void TPartition::ScheduleReplyPropose(const NKikimrPQ::TEvProposeTransaction& ev statusCode).Release()); } +void TPartition::ScheduleReplyCommitDone(ui64 step, ui64 txId) +{ + Replies.emplace_back(Tablet, + MakeCommitDone(step, txId).Release()); +} + void TPartition::AddCmdDeleteRange(NKikimrClient::TKeyValueRequest& request, const TKeyPrefix& ikey, const TKeyPrefix& ikeyDeprecated) { @@ -4026,6 +4222,45 @@ void TPartition::AddCmdWrite(NKikimrClient::TKeyValueRequest& request, write->SetStorageChannel(NKikimrClient::TKeyValueRequest::INLINE); } +void TPartition::AddCmdWriteTxMeta(NKikimrClient::TKeyValueRequest& request, + ui64 step, ui64 txId) +{ + TKeyPrefix ikey(TKeyPrefix::TypeTxMeta, Partition); + + NKikimrPQ::TPartitionTxMeta meta; + meta.SetPlanStep(step); + meta.SetTxId(txId); + + TString out; + Y_PROTOBUF_SUPPRESS_NODISCARD meta.SerializeToString(&out); + + auto write = request.AddCmdWrite(); + write->SetKey(ikey.Data(), ikey.Size()); + write->SetValue(out.c_str(), out.size()); + write->SetStorageChannel(NKikimrClient::TKeyValueRequest::INLINE); +} + +void TPartition::AddCmdWriteUserInfos(NKikimrClient::TKeyValueRequest& request) +{ + for (auto& user : AffectedUsers) { + TKeyPrefix ikey(TKeyPrefix::TypeInfo, Partition, TKeyPrefix::MarkUser); + ikey.Append(user.c_str(), user.size()); + TKeyPrefix ikeyDeprecated(TKeyPrefix::TypeInfo, Partition, TKeyPrefix::MarkUserDeprecated); + ikeyDeprecated.Append(user.c_str(), user.size()); + + if (TUserInfo* userInfo = GetPendingUserIfExists(user)) { + AddCmdWrite(request, + ikey, ikeyDeprecated, + userInfo->Offset, userInfo->Generation, userInfo->Step, userInfo->Session, + userInfo->ReadOffsetRewindSum, + userInfo->ReadRuleGeneration); + } else { + AddCmdDeleteRange(request, + ikey, ikeyDeprecated); + } + } +} + TUserInfo& TPartition::GetOrCreatePendingUser(const TString& user, const TActorContext& ctx, TMaybe<ui64> readRuleGeneration) @@ -4125,6 +4360,11 @@ THolder<TEvPersQueue::TEvProposeTransactionResult> TPartition::MakeReplyPropose( return response; } +THolder<TEvPQ::TEvTxCommitDone> TPartition::MakeCommitDone(ui64 step, ui64 txId) +{ + return MakeHolder<TEvPQ::TEvTxCommitDone>(step, txId, Partition); +} + void TPartition::ScheduleUpdateAvailableSize(const TActorContext& ctx) { ctx.Schedule(UPDATE_AVAIL_SIZE_INTERVAL, new TEvPQ::TEvUpdateAvailableSize()); } diff --git a/ydb/core/persqueue/partition.h b/ydb/core/persqueue/partition.h index 10d88373a80..1c0ed6d6b88 100644 --- a/ydb/core/persqueue/partition.h +++ b/ydb/core/persqueue/partition.h @@ -36,6 +36,19 @@ void CalcTopicWriteQuotaParams(const NKikimrPQ::TPQConfig& pqConfig, class TKeyLevel; struct TMirrorerInfo; +struct TTransaction { + explicit TTransaction(TSimpleSharedPtr<TEvPQ::TEvTxCalcPredicate> tx, + TMaybe<bool> predicate = Nothing()) : + Tx(tx), + Predicate(predicate) + { + Y_VERIFY(Tx); + } + + TSimpleSharedPtr<TEvPQ::TEvTxCalcPredicate> Tx; + TMaybe<bool> Predicate; +}; + class TPartition : public TActorBootstrapped<TPartition> { private: static const ui32 MAX_ERRORS_COUNT_TO_STORE = 10; @@ -91,6 +104,9 @@ private: void Handle(TEvPQ::TEvUpdateWriteTimestamp::TPtr& ev, const TActorContext& ctx); void Handle(TEvPersQueue::TEvHasDataInfo::TPtr& ev, const TActorContext& ctx); void Handle(TEvPersQueue::TEvProposeTransaction::TPtr& ev, const TActorContext& ctx); + void Handle(TEvPQ::TEvTxCalcPredicate::TPtr& ev, const TActorContext& ctx); + void Handle(TEvPQ::TEvTxCommit::TPtr& ev, const TActorContext& ctx); + void Handle(TEvPQ::TEvTxRollback::TPtr& ev, const TActorContext& ctx); void Handle(TEvPersQueue::TEvReportPartitionError::TPtr& ev, const TActorContext& ctx); void Handle(TEvQuota::TEvClearance::TPtr& ev, const TActorContext& ctx); void Handle(TEvents::TEvPoisonPill::TPtr& ev, const TActorContext& ctx); @@ -98,7 +114,7 @@ private: void HandleDataRead(const NKikimrClient::TResponse& range, const TActorContext& ctx); void HandleGetDiskStatus(const NKikimrClient::TResponse& res, const TActorContext& ctx); void HandleInfoRangeRead(const NKikimrClient::TKeyValueResponse::TReadRangeResult& range, const TActorContext& ctx); - void HandleMetaRead(const NKikimrClient::TKeyValueResponse::TReadResult& response, const TActorContext& ctx); + void HandleMetaRead(const NKikimrClient::TResponse& response, const TActorContext& ctx); void HandleMonitoring(TEvPQ::TEvMonRequest::TPtr& ev, const TActorContext& ctx); void HandleOnIdle(TEvPQ::TEvDeregisterMessageGroup::TPtr& ev, const TActorContext& ctx); void HandleOnIdle(TEvPQ::TEvRegisterMessageGroup::TPtr& ev, const TActorContext& ctx); @@ -176,6 +192,12 @@ private: ui64 GetUsedStorage(const TActorContext& ctx); void ProcessTxsAndUserActs(const TActorContext& ctx); + void ContinueProcessTxsAndUserActs(const TActorContext& ctx); + + void AddDistrTx(TSimpleSharedPtr<TEvPQ::TEvTxCalcPredicate> event); + void RemoveDistrTx(); + void ProcessDistrTxs(const TActorContext& ctx); + void ProcessDistrTx(const TActorContext& ctx); void AddImmediateTx(TSimpleSharedPtr<TEvPersQueue::TEvProposeTransaction> event); void RemoveImmediateTx(); @@ -203,12 +225,16 @@ private: const TString& error); void ScheduleReplyPropose(const NKikimrPQ::TEvProposeTransaction& event, NKikimrPQ::TEvProposeTransactionResult::EStatus statusCode); + void ScheduleReplyCommitDone(ui64 step, ui64 txId); void AddCmdWrite(NKikimrClient::TKeyValueRequest& request, const TKeyPrefix& ikey, const TKeyPrefix& ikeyDeprecated, ui64 offset, ui32 gen, ui32 step, const TString& session, ui64 readOffsetRewindSum, ui64 readRuleGeneration); + void AddCmdWriteTxMeta(NKikimrClient::TKeyValueRequest& request, + ui64 step, ui64 txId); + void AddCmdWriteUserInfos(NKikimrClient::TKeyValueRequest& request); void AddCmdDeleteRange(NKikimrClient::TKeyValueRequest& request, const TKeyPrefix& ikey, const TKeyPrefix& ikeyDeprecated); @@ -224,6 +250,7 @@ private: const TString& error); THolder<TEvPersQueue::TEvProposeTransactionResult> MakeReplyPropose(const NKikimrPQ::TEvProposeTransaction& event, NKikimrPQ::TEvProposeTransactionResult::EStatus statusCode); + THolder<TEvPQ::TEvTxCommitDone> MakeCommitDone(ui64 step, ui64 txId); public: static constexpr NKikimrServices::TActivity::EType ActorActivityType() { @@ -233,7 +260,8 @@ public: TPartition(ui64 tabletId, ui32 partition, const TActorId& tablet, const TActorId& blobCache, const NPersQueue::TTopicConverterPtr& topicConverter, bool isLocalDC, TString dcId, bool isServerless, const NKikimrPQ::TPQTabletConfig& config, const TTabletCountersBase& counters, - bool newPartition = false); + bool newPartition = false, + TVector<TTransaction> distrTxs = {}); void Bootstrap(const TActorContext& ctx); @@ -329,6 +357,9 @@ private: HFuncTraced(TEvPQ::TEvDeregisterMessageGroup, HandleOnIdle); HFuncTraced(TEvPQ::TEvSplitMessageGroup, HandleOnIdle); HFuncTraced(TEvPersQueue::TEvProposeTransaction, Handle); + HFuncTraced(TEvPQ::TEvTxCalcPredicate, Handle); + HFuncTraced(TEvPQ::TEvTxCommit, Handle); + HFuncTraced(TEvPQ::TEvTxRollback, Handle); default: LOG_ERROR_S(ctx, NKikimrServices::PERSQUEUE, "Unexpected " << EventStr("StateIdle", ev)); @@ -378,6 +409,9 @@ private: HFuncTraced(TEvPQ::TEvDeregisterMessageGroup, HandleOnWrite); HFuncTraced(TEvPQ::TEvSplitMessageGroup, HandleOnWrite); HFuncTraced(TEvPersQueue::TEvProposeTransaction, Handle); + HFuncTraced(TEvPQ::TEvTxCalcPredicate, Handle); + HFuncTraced(TEvPQ::TEvTxCommit, Handle); + HFuncTraced(TEvPQ::TEvTxRollback, Handle); default: LOG_ERROR_S(ctx, NKikimrServices::PERSQUEUE, "Unexpected " << EventStr("StateWrite", ev)); @@ -390,7 +424,8 @@ private: WaitInfoRange, WaitDataRange, WaitDataRead, - WaitMetaRead + WaitMetaRead, + WaitTxInfo }; ui64 TabletID; @@ -452,11 +487,16 @@ private: // std::deque<TSimpleSharedPtr<TEvPQ::TEvSetClientInfo>> UserActs; std::deque<TSimpleSharedPtr<TEvPersQueue::TEvProposeTransaction>> ImmediateTxs; + std::deque<TTransaction> DistrTxs; THashMap<TString, size_t> UserActCount; THashMap<TString, TUserInfo> PendingUsersInfo; TVector<std::pair<TActorId, std::unique_ptr<IEventBase>>> Replies; THashSet<TString> AffectedUsers; bool UsersInfoWriteInProgress = false; + bool TxInProgress = false; + TMaybe<ui64> PlanStep; + TMaybe<ui64> TxId; + bool TxIdHasChanged = false; // // // diff --git a/ydb/core/persqueue/ut/user_action_processor_ut.cpp b/ydb/core/persqueue/ut/user_action_processor_ut.cpp index 5ea3c4b821e..275d163724d 100644 --- a/ydb/core/persqueue/ut/user_action_processor_ut.cpp +++ b/ydb/core/persqueue/ut/user_action_processor_ut.cpp @@ -23,6 +23,29 @@ namespace NKikimr::NPQ { Y_UNIT_TEST_SUITE(TUserActionProcessorTests) { +namespace NHelpers { + +struct TCreatePartitionParams { + ui32 Partition = 1; + ui64 Begin = 0; + ui64 End = 0; + TMaybe<ui64> PlanStep; + TMaybe<ui64> TxId; + TVector<TTransaction> Transactions; +}; + +struct TCreateConsumerParams { + TString Consumer; + ui64 Offset = 0; + ui32 Generation = 0; + ui32 Step = 0; + TString Session; + ui64 OffsetRewindSum = 0; + ui64 ReadRuleGeneration = 0; +}; + +} + class TUserActionProcessorFixture : public NUnitTest::TBaseFixture { protected: struct TUserInfoMatcher { @@ -32,6 +55,13 @@ protected: TMaybe<ui32> Step; }; + struct TCmdWriteMatcher { + TMaybe<size_t> Count; + TMaybe<ui64> PlanStep; + TMaybe<ui64> TxId; + THashMap<size_t, TUserInfoMatcher> UserInfos; + }; + struct TProxyResponseMatcher { TMaybe<ui64> Cookie; TMaybe<NMsgBusProxy::EResponseStatus> Status; @@ -50,11 +80,31 @@ protected: TMaybe<NKikimrPQ::TEvProposeTransactionResult::EStatus> Status; }; + struct TCalcPredicateMatcher { + TMaybe<ui64> Step; + TMaybe<ui64> TxId; + TMaybe<ui32> Partition; + TMaybe<bool> Predicate; + }; + + struct TCommitTxDoneMatcher { + TMaybe<ui64> Step; + TMaybe<ui64> TxId; + TMaybe<ui32> Partition; + }; + + using TCreatePartitionParams = NHelpers::TCreatePartitionParams; + using TCreateConsumerParams = NHelpers::TCreateConsumerParams; + void SetUp(NUnitTest::TTestContext&) override; void TearDown(NUnitTest::TTestContext&) override; - void CreatePartitionActor(ui32 partition = 1, bool newPartition = true); - void CreatePartition(ui32 partition = 1, ui64 begin = 0, ui64 end = 0); + void CreatePartitionActor(ui32 partition, + const TVector<TCreateConsumerParams>& consumers, + bool newPartition, + TVector<TTransaction> txs); + void CreatePartition(const TCreatePartitionParams& params = {}, + const TVector<TCreateConsumerParams>& consumers = {}); void CreateSession(const TString& clientId, const TString& sessionId, @@ -77,8 +127,7 @@ protected: const TString& sessionId); void SendGetOffset(ui64 cookie, const TString& clientId); - void WaitCmdWrite(size_t count, - const THashMap<size_t, TUserInfoMatcher>& matchers = {}); + void WaitCmdWrite(const TCmdWriteMatcher& matcher = {}); void SendCmdWriteResponse(NMsgBusProxy::EResponseStatus status); void WaitProxyResponse(const TProxyResponseMatcher &matcher = {}); void WaitErrorResponse(const TErrorMatcher& matcher = {}); @@ -86,9 +135,10 @@ protected: void WaitDiskStatusRequest(); void SendDiskStatusResponse(); void WaitMetaReadRequest(); - void SendMetaReadResponse(); + void SendMetaReadResponse(TMaybe<ui64> step, TMaybe<ui64> txId); void WaitInfoRangeRequest(); - void SendInfoRangeResponse(); + void SendInfoRangeResponse(ui32 partition, + const TVector<TCreateConsumerParams>& consumers); void WaitDataRangeRequest(); void SendDataRangeResponse(ui64 begin, ui64 end); void WaitDataReadRequest(); @@ -102,6 +152,22 @@ protected: ui64 txId); void WaitProposeTransactionResponse(const TProposeTransactionResponseMatcher& matcher = {}); + void SendCalcPredicate(ui64 step, + ui64 txId, + const TString& consumer, + ui64 begin, + ui64 end); + void WaitCalcPredicateResult(const TCalcPredicateMatcher& matcher = {}); + + void SendCommitTx(ui64 step, ui64 txId); + void SendRollbackTx(ui64 step, ui64 txId); + void WaitCommitTxDone(const TCommitTxDoneMatcher& matcher = {}); + + TTransaction MakeTransaction(ui64 step, ui64 txId, + TString consumer, + ui64 begin, ui64 end, + TMaybe<bool> predicate = Nothing()); + private: TMaybe<TTestContext> Ctx; TMaybe<TFinalizer> Finalizer; @@ -122,7 +188,10 @@ void TUserActionProcessorFixture::TearDown(NUnitTest::TTestContext&) { } -void TUserActionProcessorFixture::CreatePartitionActor(ui32 id, bool newPartition) +void TUserActionProcessorFixture::CreatePartitionActor(ui32 id, + const TVector<TCreateConsumerParams>& consumers, + bool newPartition, + TVector<TTransaction> txs) { using TKeyValueCounters = TProtobufTabletCounters< NKeyValue::ESimpleCounters_descriptor, @@ -147,6 +216,10 @@ void TUserActionProcessorFixture::CreatePartitionActor(ui32 id, bool newPartitio NPersQueue::TTopicConverterPtr topicConverter; NKikimrPQ::TPQTabletConfig config; + for (auto& c : consumers) { + config.AddReadRules(c.Consumer); + } + config.SetTopicName("rt3.dc1--account--topic"); config.SetTopicPath("/Root/PQ/rt3.dc1--account--topic"); config.SetFederationAccount("account"); @@ -165,28 +238,30 @@ void TUserActionProcessorFixture::CreatePartitionActor(ui32 id, bool newPartitio false, config, *tabletCounters, - newPartition); + newPartition, + std::move(txs)); ActorId = Ctx->Runtime->Register(actor); } -void TUserActionProcessorFixture::CreatePartition(ui32 id, ui64 begin, ui64 end) +void TUserActionProcessorFixture::CreatePartition(const TCreatePartitionParams& params, + const TVector<TCreateConsumerParams>& consumers) { - if ((begin == 0) && (end == 0)) { - CreatePartitionActor(id, true); + if ((params.Begin == 0) && (params.End == 0)) { + CreatePartitionActor(params.Partition, consumers, true, {}); } else { - CreatePartitionActor(id, false); + CreatePartitionActor(params.Partition, consumers, false, params.Transactions); WaitDiskStatusRequest(); SendDiskStatusResponse(); WaitMetaReadRequest(); - SendMetaReadResponse(); + SendMetaReadResponse(params.PlanStep, params.TxId); WaitInfoRangeRequest(); - SendInfoRangeResponse(); + SendInfoRangeResponse(params.Partition, consumers); WaitDataRangeRequest(); - SendDataRangeResponse(begin, end); + SendDataRangeResponse(params.Begin, params.End); } } @@ -196,7 +271,7 @@ void TUserActionProcessorFixture::CreateSession(const TString& clientId, ui64 cookie) { SendCreateSession(cookie,clientId,sessionId, generation, step); - WaitCmdWrite(2, {{0, {.Session = sessionId, .Offset = 0}}}); + WaitCmdWrite({.Count=2, .UserInfos={{0, {.Session = sessionId, .Offset = 0}}}}); SendCmdWriteResponse(NMsgBusProxy::MSTATUS_OK); WaitProxyResponse({.Cookie = cookie}); } @@ -208,7 +283,7 @@ void TUserActionProcessorFixture::SetOffset(const TString& clientId, ui64 cookie) { SendSetOffset(cookie, clientId, offset, sessionId); - WaitCmdWrite(2, {{0, {.Session = sessionId, .Offset = (expected ? *expected : offset)}}}); + WaitCmdWrite({.Count=2, .UserInfos={{0, {.Session = sessionId, .Offset = (expected ? *expected : offset)}}}}); SendCmdWriteResponse(NMsgBusProxy::MSTATUS_OK); WaitProxyResponse({.Cookie = cookie}); } @@ -251,36 +326,51 @@ void TUserActionProcessorFixture::SendGetOffset(ui64 cookie, Ctx->Runtime->SingleSys()->Send(new IEventHandle(ActorId, Ctx->Edge, event.Release())); } -void TUserActionProcessorFixture::WaitCmdWrite(size_t count, - const THashMap<size_t, TUserInfoMatcher>& matchers) +void TUserActionProcessorFixture::WaitCmdWrite(const TCmdWriteMatcher& matcher) { auto event = Ctx->Runtime->GrabEdgeEvent<TEvKeyValue::TEvRequest>(); UNIT_ASSERT(event != nullptr); UNIT_ASSERT_VALUES_EQUAL(event->Record.GetCookie(), 1); // SET_OFFSET_COOKIE - UNIT_ASSERT_VALUES_EQUAL(event->Record.CmdWriteSize(), count); - for (auto& [index, matcher] : matchers) { - UNIT_ASSERT(index < count); + if (matcher.Count.Defined()) { + UNIT_ASSERT_VALUES_EQUAL(*matcher.Count, event->Record.CmdWriteSize()); + } + if (matcher.PlanStep.Defined()) { + NKikimrPQ::TPartitionTxMeta meta; + UNIT_ASSERT(meta.ParseFromString(event->Record.GetCmdWrite(0).GetValue())); + + UNIT_ASSERT_VALUES_EQUAL(*matcher.PlanStep, meta.GetPlanStep()); + } + if (matcher.TxId.Defined()) { + NKikimrPQ::TPartitionTxMeta meta; + UNIT_ASSERT(meta.ParseFromString(event->Record.GetCmdWrite(0).GetValue())); + + UNIT_ASSERT_VALUES_EQUAL(*matcher.TxId, meta.GetTxId()); + } + for (auto& [index, userInfo] : matcher.UserInfos) { + if (matcher.Count.Defined()) { + UNIT_ASSERT(index < *matcher.Count); + } NKikimrPQ::TUserInfo ud; UNIT_ASSERT(ud.ParseFromString(event->Record.GetCmdWrite(index).GetValue())); - if (matcher.Session) { + if (userInfo.Session) { UNIT_ASSERT(ud.HasSession()); - UNIT_ASSERT_VALUES_EQUAL(*matcher.Session, ud.GetSession()); + UNIT_ASSERT_VALUES_EQUAL(*userInfo.Session, ud.GetSession()); } - if (matcher.Generation) { + if (userInfo.Generation) { UNIT_ASSERT(ud.HasGeneration()); - UNIT_ASSERT_VALUES_EQUAL(*matcher.Generation, ud.GetGeneration()); + UNIT_ASSERT_VALUES_EQUAL(*userInfo.Generation, ud.GetGeneration()); } - if (matcher.Step) { + if (userInfo.Step) { UNIT_ASSERT(ud.HasStep()); - UNIT_ASSERT_VALUES_EQUAL(*matcher.Step, ud.GetStep()); + UNIT_ASSERT_VALUES_EQUAL(*userInfo.Step, ud.GetStep()); } - if (matcher.Offset) { + if (userInfo.Offset) { UNIT_ASSERT(ud.HasOffset()); - UNIT_ASSERT_VALUES_EQUAL(*matcher.Offset, ud.GetOffset()); + UNIT_ASSERT_VALUES_EQUAL(*userInfo.Offset, ud.GetOffset()); } } } @@ -363,17 +453,43 @@ void TUserActionProcessorFixture::WaitMetaReadRequest() auto event = Ctx->Runtime->GrabEdgeEvent<TEvKeyValue::TEvRequest>(); UNIT_ASSERT(event != nullptr); - UNIT_ASSERT_VALUES_EQUAL(event->Record.CmdReadSize(), 1); + UNIT_ASSERT_VALUES_EQUAL(event->Record.CmdReadSize(), 2); } -void TUserActionProcessorFixture::SendMetaReadResponse() +void TUserActionProcessorFixture::SendMetaReadResponse(TMaybe<ui64> step, TMaybe<ui64> txId) { auto event = MakeHolder<TEvKeyValue::TEvResponse>(); event->Record.SetStatus(NMsgBusProxy::MSTATUS_OK); + // + // NKikimrPQ::TPartitionMeta + // auto read = event->Record.AddReadResult(); read->SetStatus(NKikimrProto::NODATA); + // + // NKikimrPQ::TPartitionTxMeta + // + read = event->Record.AddReadResult(); + if (step.Defined() || txId.Defined()) { + NKikimrPQ::TPartitionTxMeta meta; + + if (step.Defined()) { + meta.SetPlanStep(*step); + } + if (txId.Defined()) { + meta.SetTxId(*txId); + } + + TString out; + Y_PROTOBUF_SUPPRESS_NODISCARD meta.SerializeToString(&out); + + read->SetStatus(NKikimrProto::OK); + read->SetValue(out); + } else { + read->SetStatus(NKikimrProto::NODATA); + } + Ctx->Runtime->SingleSys()->Send(new IEventHandle(ActorId, Ctx->Edge, event.Release())); } @@ -385,13 +501,39 @@ void TUserActionProcessorFixture::WaitInfoRangeRequest() UNIT_ASSERT_VALUES_EQUAL(event->Record.CmdReadRangeSize(), 1); } -void TUserActionProcessorFixture::SendInfoRangeResponse() +void TUserActionProcessorFixture::SendInfoRangeResponse(ui32 partition, + const TVector<TCreateConsumerParams>& consumers) { auto event = MakeHolder<TEvKeyValue::TEvResponse>(); event->Record.SetStatus(NMsgBusProxy::MSTATUS_OK); auto read = event->Record.AddReadRangeResult(); - read->SetStatus(NKikimrProto::NODATA); + if (consumers.empty()) { + read->SetStatus(NKikimrProto::NODATA); + } else { + read->SetStatus(NKikimrProto::OK); + + for (auto& c : consumers) { + auto pair = read->AddPair(); + pair->SetStatus(NKikimrProto::OK); + + NPQ::TKeyPrefix key(NPQ::TKeyPrefix::TypeInfo, partition, NPQ::TKeyPrefix::MarkUser); + key.Append(c.Consumer.data(), c.Consumer.size()); + pair->SetKey(key.Data(), key.Size()); + + NKikimrPQ::TUserInfo userInfo; + userInfo.SetOffset(c.Offset); + userInfo.SetGeneration(c.Generation); + userInfo.SetStep(c.Step); + userInfo.SetSession(c.Session); + userInfo.SetOffsetRewindSum(c.OffsetRewindSum); + userInfo.SetReadRuleGeneration(c.ReadRuleGeneration); + + TString out; + Y_PROTOBUF_SUPPRESS_NODISCARD userInfo.SerializeToString(&out); + pair->SetValue(out); + } + } Ctx->Runtime->SingleSys()->Send(new IEventHandle(ActorId, Ctx->Edge, event.Release())); } @@ -462,13 +604,83 @@ void TUserActionProcessorFixture::WaitProposeTransactionResponse(const TProposeT } } +void TUserActionProcessorFixture::SendCalcPredicate(ui64 step, + ui64 txId, + const TString& consumer, + ui64 begin, + ui64 end) +{ + auto event = MakeHolder<TEvPQ::TEvTxCalcPredicate>(step, txId); + event->AddOperation(consumer, begin, end); + + Ctx->Runtime->SingleSys()->Send(new IEventHandle(ActorId, Ctx->Edge, event.Release())); +} + +void TUserActionProcessorFixture::WaitCalcPredicateResult(const TCalcPredicateMatcher& matcher) +{ + auto event = Ctx->Runtime->GrabEdgeEvent<TEvPQ::TEvTxCalcPredicateResult>(); + UNIT_ASSERT(event != nullptr); + + if (matcher.Step) { + UNIT_ASSERT_VALUES_EQUAL(*matcher.Step, event->Step); + } + if (matcher.TxId) { + UNIT_ASSERT_VALUES_EQUAL(*matcher.TxId, event->TxId); + } + if (matcher.Partition) { + UNIT_ASSERT_VALUES_EQUAL(*matcher.Partition, event->Partition); + } + if (matcher.Predicate) { + UNIT_ASSERT_VALUES_EQUAL(*matcher.Predicate, event->Predicate); + } +} + +void TUserActionProcessorFixture::SendCommitTx(ui64 step, ui64 txId) +{ + auto event = MakeHolder<TEvPQ::TEvTxCommit>(step, txId); + Ctx->Runtime->SingleSys()->Send(new IEventHandle(ActorId, Ctx->Edge, event.Release())); +} + +void TUserActionProcessorFixture::SendRollbackTx(ui64 step, ui64 txId) +{ + auto event = MakeHolder<TEvPQ::TEvTxRollback>(step, txId); + Ctx->Runtime->SingleSys()->Send(new IEventHandle(ActorId, Ctx->Edge, event.Release())); +} + +void TUserActionProcessorFixture::WaitCommitTxDone(const TCommitTxDoneMatcher& matcher) +{ + auto event = Ctx->Runtime->GrabEdgeEvent<TEvPQ::TEvTxCommitDone>(); + UNIT_ASSERT(event != nullptr); + + if (matcher.Step) { + UNIT_ASSERT_VALUES_EQUAL(*matcher.Step, event->Step); + } + if (matcher.TxId) { + UNIT_ASSERT_VALUES_EQUAL(*matcher.TxId, event->TxId); + } + if (matcher.Partition) { + UNIT_ASSERT_VALUES_EQUAL(*matcher.Partition, event->Partition); + } +} + +TTransaction TUserActionProcessorFixture::MakeTransaction(ui64 step, ui64 txId, + TString consumer, + ui64 begin, ui64 end, + TMaybe<bool> predicate) +{ + auto event = MakeSimpleShared<TEvPQ::TEvTxCalcPredicate>(step, txId); + event->AddOperation(std::move(consumer), begin, end); + + return TTransaction(event, predicate); +} + Y_UNIT_TEST_F(Batching, TUserActionProcessorFixture) { CreatePartition(); SendCreateSession(4, "client-1", "session-id-1", 2, 3); - WaitCmdWrite(2, {{0, {.Session = "session-id-1", .Offset=0, .Generation=2, .Step=3}}}); + WaitCmdWrite({.Count=2, .UserInfos={{0, {.Session = "session-id-1", .Offset=0, .Generation=2, .Step=3}}}}); SendCreateSession(5, "client-2", "session-id-2", 4, 5); SendCreateSession(6, "client-3", "session-id-3", 6, 7); @@ -477,10 +689,10 @@ Y_UNIT_TEST_F(Batching, TUserActionProcessorFixture) WaitProxyResponse({.Cookie=4}); - WaitCmdWrite(4, { + WaitCmdWrite({.Count=4, .UserInfos={ {0, {.Session = "session-id-2", .Offset=0, .Generation=4, .Step=5}}, {2, {.Session = "session-id-3", .Offset=0, .Generation=6, .Step=7}} - }); + }}); SendSetOffset(7, "client-1", 0, "session-id-1"); SendCreateSession(8, "client-1", "session-id-2", 8, 9); @@ -493,9 +705,9 @@ Y_UNIT_TEST_F(Batching, TUserActionProcessorFixture) WaitProxyResponse({.Cookie=5}); WaitProxyResponse({.Cookie=6}); - WaitCmdWrite(2, { + WaitCmdWrite({.Count=2, .UserInfos={ {0, {.Session = "session-id-2", .Offset=0, .Generation=8, .Step=9}}, - }); + }}); SendCmdWriteResponse(NMsgBusProxy::MSTATUS_OK); @@ -514,7 +726,7 @@ Y_UNIT_TEST_F(SetOffset, TUserActionProcessorFixture) const TString client = "client"; const TString session = "session"; - CreatePartition(partition, begin, end); + CreatePartition({.Partition=partition, .Begin=begin, .End=end}); // // create session @@ -525,7 +737,7 @@ Y_UNIT_TEST_F(SetOffset, TUserActionProcessorFixture) // regular commit (5 <= end) // SendSetOffset(1, client, 5, session); - WaitCmdWrite(2, {{0, {.Session=session, .Offset=5}}}); + WaitCmdWrite({.Count=2, .UserInfos={{0, {.Session=session, .Offset=5}}}}); SendCmdWriteResponse(NMsgBusProxy::MSTATUS_OK); WaitProxyResponse({.Cookie=1, .Status=NMsgBusProxy::MSTATUS_OK}); @@ -539,7 +751,7 @@ Y_UNIT_TEST_F(SetOffset, TUserActionProcessorFixture) // commit to back (1 < 5) // SendSetOffset(3, client, 1, session); - WaitCmdWrite(2, {{0, {.Session=session, .Offset=5}}}); + WaitCmdWrite({.Count=2, .UserInfos={{0, {.Session=session, .Offset=5}}}}); SendCmdWriteResponse(NMsgBusProxy::MSTATUS_OK); WaitProxyResponse({.Cookie=3, .Status=NMsgBusProxy::MSTATUS_OK}); @@ -553,7 +765,7 @@ Y_UNIT_TEST_F(SetOffset, TUserActionProcessorFixture) // commit to future (13 > end) // SendSetOffset(5, client, 13, session); - WaitCmdWrite(2, {{0, {.Session=session, .Offset=end}}}); + WaitCmdWrite({.Count=2, .UserInfos={{0, {.Session=session, .Offset=end}}}}); SendCmdWriteResponse(NMsgBusProxy::MSTATUS_OK); WaitProxyResponse({.Cookie=5, .Status=NMsgBusProxy::MSTATUS_OK}); } @@ -566,7 +778,7 @@ Y_UNIT_TEST_F(CommitOffsetRanges, TUserActionProcessorFixture) const TString client = "client"; const TString session = "session"; - CreatePartition(partition, begin, end); + CreatePartition({.Partition=partition, .Begin=begin, .End=end}); // // create session @@ -579,7 +791,7 @@ Y_UNIT_TEST_F(CommitOffsetRanges, TUserActionProcessorFixture) "topic-path", true, 1); - WaitCmdWrite(2, {{0, {.Session="", .Offset=2}}}); + WaitCmdWrite({.Count=2, .UserInfos={{0, {.Session="", .Offset=2}}}}); SendProposeTransactionRequest(partition, 2, 0, // begin > end @@ -615,7 +827,7 @@ Y_UNIT_TEST_F(CommitOffsetRanges, TUserActionProcessorFixture) SendCmdWriteResponse(NMsgBusProxy::MSTATUS_OK); WaitProposeTransactionResponse({.TxId=1, .Status=NKikimrPQ::TEvProposeTransactionResult::COMPLETE}); - WaitCmdWrite(2, {{0, {.Session="", .Offset=4}}}); + WaitCmdWrite({.Count=2, .UserInfos={{0, {.Session="", .Offset=4}}}}); SendCmdWriteResponse(NMsgBusProxy::MSTATUS_OK); WaitProposeTransactionResponse({.TxId=2, .Status=NKikimrPQ::TEvProposeTransactionResult::BAD_REQUEST}); @@ -628,6 +840,226 @@ Y_UNIT_TEST_F(CommitOffsetRanges, TUserActionProcessorFixture) WaitProxyResponse({.Cookie=6, .Offset=4}); } +Y_UNIT_TEST_F(CorrectRange_Commit, TUserActionProcessorFixture) +{ + const ui32 partition = 3; + const ui64 begin = 0; + const ui64 end = 10; + const TString client = "client"; + const TString session = "session"; + + const ui64 step = 12345; + const ui64 txId = 67890; + + CreatePartition({.Partition=partition, .Begin=begin, .End=end, .PlanStep=step, .TxId=10000}); + CreateSession(client, session); + + SendCalcPredicate(step, txId, client, 0, 2); + WaitCalcPredicateResult({.Step=step, .TxId=txId, .Partition=partition, .Predicate=true}); + + SendCommitTx(step, txId); + + WaitCmdWrite({.Count=3, .PlanStep=step, .TxId=txId, .UserInfos={{1, {.Session="", .Offset=2}}}}); + SendCmdWriteResponse(NMsgBusProxy::MSTATUS_OK); + + WaitCommitTxDone({.TxId=txId, .Partition=partition}); +} + +Y_UNIT_TEST_F(CorrectRange_Multiple_Transactions, TUserActionProcessorFixture) +{ + const ui32 partition = 3; + const ui64 begin = 0; + const ui64 end = 10; + const TString client = "client"; + const TString session = "session"; + + const ui64 step = 12345; + const ui64 txId_1 = 67890; + const ui64 txId_2 = 67891; + const ui64 txId_3 = 67892; + + CreatePartition({.Partition=partition, .Begin=begin, .End=end, .PlanStep=step, .TxId=10000}); + CreateSession(client, session); + + SendCalcPredicate(step, txId_1, client, 0, 1); + WaitCalcPredicateResult({.Step=step, .TxId=txId_1, .Partition=partition, .Predicate=true}); + + SendCalcPredicate(step, txId_2, client, 0, 2); + SendCalcPredicate(step, txId_3, client, 0, 2); + + SendCommitTx(step, txId_1); + + WaitCalcPredicateResult({.Step=step, .TxId=txId_2, .Partition=partition, .Predicate=false}); + SendRollbackTx(step, txId_2); + + WaitCalcPredicateResult({.Step=step, .TxId=txId_3, .Partition=partition, .Predicate=false}); + SendRollbackTx(step, txId_3); + + WaitCmdWrite({.Count=3, .PlanStep=step, .TxId=txId_3, .UserInfos={{1, {.Session="", .Offset=1}}}}); + SendCmdWriteResponse(NMsgBusProxy::MSTATUS_OK); + + WaitCommitTxDone({.TxId=txId_1, .Partition=partition}); +} + +Y_UNIT_TEST_F(CorrectRange_Multiple_Consumers, TUserActionProcessorFixture) +{ + const ui32 partition = 3; + const ui64 begin = 0; + const ui64 end = 10; + + const ui64 step = 12345; + const ui64 txId = 67890; + + CreatePartition({.Partition=partition, .Begin=begin, .End=end}); + CreateSession("client-1", "session-1"); + CreateSession("client-2", "session-2"); + + SendSetOffset(1, "client-1", 3, "session-1"); + SendCalcPredicate(step, txId, "client-2", 0, 1); + SendSetOffset(2, "client-1", 6, "session-1"); + + WaitCmdWrite({.Count=2, .UserInfos={{0, {.Session="session-1", .Offset=3}}}}); + SendCmdWriteResponse(NMsgBusProxy::MSTATUS_OK); + + WaitProxyResponse({.Cookie=1, .Status=NMsgBusProxy::MSTATUS_OK}); + + WaitCalcPredicateResult({.Step=step, .TxId=txId, .Partition=partition, .Predicate=true}); + SendCommitTx(step, txId); + + WaitCmdWrite({.Count=5, .UserInfos={ + {1, {.Session="", .Offset=1}}, + {3, {.Session="session-1", .Offset=6}} + }}); +} + +Y_UNIT_TEST_F(OldPlanStep, TUserActionProcessorFixture) +{ + const ui32 partition = 3; + const ui64 begin = 0; + const ui64 end = 10; + + const ui64 step = 12345; + const ui64 txId = 67890; + + CreatePartition({.Partition=partition, .Begin=begin, .End=end, .PlanStep=99999, .TxId=55555}); + + SendCommitTx(step, txId); + WaitCommitTxDone({.TxId=txId, .Partition=partition}); +} + +Y_UNIT_TEST_F(AfterRestart_1, TUserActionProcessorFixture) +{ + const ui32 partition = 3; + const ui64 begin = 0; + const ui64 end = 10; + const TString consumer = "client"; + const TString session = "session"; + + const ui64 step = 12345; + + TVector<TTransaction> txs; + txs.push_back(MakeTransaction(step, 11111, consumer, 0, 2, true)); + txs.push_back(MakeTransaction(step, 22222, consumer, 2, 4)); + + CreatePartition({.Partition=partition, + .Begin=begin, + .End=end, + .PlanStep=step, .TxId=10000, + .Transactions=std::move(txs)}, + {{.Consumer=consumer, .Offset=0, .Session=session}}); + + SendCommitTx(step, 11111); + + WaitCalcPredicateResult({.Step=step, .TxId=22222, .Partition=partition, .Predicate=true}); + SendCommitTx(step, 22222); + + WaitCmdWrite({.Count=3, .PlanStep=step, .TxId=22222, .UserInfos={{1, {.Session="", .Offset=4}}}}); +} + +Y_UNIT_TEST_F(AfterRestart_2, TUserActionProcessorFixture) +{ + const ui32 partition = 3; + const ui64 begin = 0; + const ui64 end = 10; + const TString consumer = "client"; + const TString session = "session"; + + const ui64 step = 12345; + + TVector<TTransaction> txs; + txs.push_back(MakeTransaction(step, 11111, consumer, 0, 2)); + txs.push_back(MakeTransaction(step, 22222, consumer, 2, 4)); + + CreatePartition({.Partition=partition, + .Begin=begin, + .End=end, + .PlanStep=step, .TxId=10000, + .Transactions=std::move(txs)}, + {{.Consumer=consumer, .Offset=0, .Session=session}}); + + WaitCalcPredicateResult({.Step=step, .TxId=11111, .Partition=partition, .Predicate=true}); +} + +Y_UNIT_TEST_F(IncorrectRange, TUserActionProcessorFixture) +{ + const ui32 partition = 3; + const ui64 begin = 0; + const ui64 end = 10; + const TString client = "client"; + const TString session = "session"; + + const ui64 step = 12345; + ui64 txId = 67890; + + CreatePartition({.Partition=partition, .Begin=begin, .End=end}); + CreateSession(client, session); + + SendCalcPredicate(step, txId, client, 4, 2); + WaitCalcPredicateResult({.Step=step, .TxId=txId, .Partition=partition, .Predicate=false}); + SendRollbackTx(step, txId); + + WaitCmdWrite({.Count=1, .PlanStep=step, .TxId=txId}); + SendCmdWriteResponse(NMsgBusProxy::MSTATUS_OK); + + ++txId; + + SendCalcPredicate(step, txId, client, 2, 4); + WaitCalcPredicateResult({.Step=step, .TxId=txId, .Partition=partition, .Predicate=false}); + SendRollbackTx(step, txId); + + WaitCmdWrite({.Count=1, .PlanStep=step, .TxId=txId}); + SendCmdWriteResponse(NMsgBusProxy::MSTATUS_OK); + + ++txId; + + SendCalcPredicate(step, txId, client, 0, 11); + WaitCalcPredicateResult({.Step=step, .TxId=txId, .Partition=partition, .Predicate=false}); +} + +Y_UNIT_TEST_F(CorrectRange_Rollback, TUserActionProcessorFixture) +{ + const ui32 partition = 3; + const ui64 begin = 0; + const ui64 end = 10; + const TString client = "client"; + const TString session = "session"; + + const ui64 step = 12345; + const ui64 txId_1 = 67890; + const ui64 txId_2 = 67891; + + CreatePartition({.Partition=partition, .Begin=begin, .End=end}); + CreateSession(client, session); + + SendCalcPredicate(step, txId_1, client, 0, 2); + WaitCalcPredicateResult({.Step=step, .TxId=txId_1, .Partition=partition, .Predicate=true}); + + SendCalcPredicate(step, txId_2, client, 0, 5); + SendRollbackTx(step, txId_1); + + WaitCalcPredicateResult({.Step=step, .TxId=txId_2, .Partition=partition, .Predicate=true}); +} + } // Y_UNIT_TEST_SUITE(TUserActionProcessorTests) } // namespace NKikimr::NPQ diff --git a/ydb/core/protos/pqconfig.proto b/ydb/core/protos/pqconfig.proto index 2345a6b759b..23545a9a76c 100644 --- a/ydb/core/protos/pqconfig.proto +++ b/ydb/core/protos/pqconfig.proto @@ -20,6 +20,11 @@ message TPartitionMeta { optional uint64 EndOffset = 2; } +message TPartitionTxMeta { + optional uint64 PlanStep = 1; + optional uint64 TxId = 2; +} + message TPQConfig { optional uint32 ACLRetryTimeoutSec = 1 [default = 300]; optional uint32 BalancerMetadataRetryTimeoutSec = 2 [default = 240]; |