aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorabcdef <akotov@ydb.tech>2022-12-15 13:17:20 +0300
committerabcdef <akotov@ydb.tech>2022-12-15 13:17:20 +0300
commit079f43fbda3b5c26b969b5a84ce5933697b6a00a (patch)
tree6a6270dafd4b37be9502f3eb47d046ec79f70513
parent34ed13a68befbfbd4e21088c20d6019311d7202d (diff)
downloadydb-079f43fbda3b5c26b969b5a84ce5933697b6a00a.tar.gz
-rw-r--r--ydb/core/persqueue/events/internal.h76
-rw-r--r--ydb/core/persqueue/key.h3
-rw-r--r--ydb/core/persqueue/partition.cpp320
-rw-r--r--ydb/core/persqueue/partition.h46
-rw-r--r--ydb/core/persqueue/ut/user_action_processor_ut.cpp524
-rw-r--r--ydb/core/protos/pqconfig.proto5
6 files changed, 884 insertions, 90 deletions
diff --git a/ydb/core/persqueue/events/internal.h b/ydb/core/persqueue/events/internal.h
index 6d60b347d52..451b3d00e55 100644
--- a/ydb/core/persqueue/events/internal.h
+++ b/ydb/core/persqueue/events/internal.h
@@ -118,6 +118,11 @@ struct TEvPQ {
EvRequestPartitionStatus,
EvReaderEventArrived,
EvMetering,
+ EvTxCalcPredicate,
+ EvTxCalcPredicateResult,
+ EvTxCommit,
+ EvTxCommitDone,
+ EvTxRollback,
EvEnd
};
@@ -655,6 +660,77 @@ struct TEvPQ {
NPQ::EMeteringJson Type;
ui64 Quantity;
};
+
+ struct TEvTxCalcPredicate : public TEventLocal<TEvTxCalcPredicate, EvTxCalcPredicate> {
+ TEvTxCalcPredicate(ui64 step, ui64 txId) :
+ Step(step),
+ TxId(txId)
+ {
+ }
+
+ void AddOperation(TString consumer, ui64 begin, ui64 end) {
+ NKikimrPQ::TPartitionOperation operation;
+ operation.SetBegin(begin);
+ operation.SetEnd(end);
+ operation.SetConsumer(std::move(consumer));
+
+ Operations.push_back(std::move(operation));
+ }
+
+ ui64 Step;
+ ui64 TxId;
+ TVector<NKikimrPQ::TPartitionOperation> Operations;
+ };
+
+ struct TEvTxCalcPredicateResult : public TEventLocal<TEvTxCalcPredicateResult, EvTxCalcPredicateResult> {
+ TEvTxCalcPredicateResult(ui64 step, ui64 txId, ui32 partition, bool predicate) :
+ Step(step),
+ TxId(txId),
+ Partition(partition),
+ Predicate(predicate)
+ {
+ }
+
+ ui64 Step;
+ ui64 TxId;
+ ui32 Partition;
+ bool Predicate = false;
+ };
+
+ struct TEvTxCommit : public TEventLocal<TEvTxCommit, EvTxCommit> {
+ TEvTxCommit(ui64 step, ui64 txId) :
+ Step(step),
+ TxId(txId)
+ {
+ }
+
+ ui64 Step;
+ ui64 TxId;
+ };
+
+ struct TEvTxCommitDone : public TEventLocal<TEvTxCommitDone, EvTxCommitDone> {
+ TEvTxCommitDone(ui64 step, ui64 txId, ui32 partition) :
+ Step(step),
+ TxId(txId),
+ Partition(partition)
+ {
+ }
+
+ ui64 Step;
+ ui64 TxId;
+ ui32 Partition;
+ };
+
+ struct TEvTxRollback : public TEventLocal<TEvTxRollback, EvTxRollback> {
+ TEvTxRollback(ui64 step, ui64 txId) :
+ Step(step),
+ TxId(txId)
+ {
+ }
+
+ ui64 Step;
+ ui64 TxId;
+ };
};
} //NKikimr
diff --git a/ydb/core/persqueue/key.h b/ydb/core/persqueue/key.h
index 356c0cf6df5..760a5ccbcec 100644
--- a/ydb/core/persqueue/key.h
+++ b/ydb/core/persqueue/key.h
@@ -16,7 +16,8 @@ public:
TypeInfo = 'm',
TypeData = 'd',
TypeTmpData = 'x',
- TypeMeta = 'i'
+ TypeMeta = 'i',
+ TypeTxMeta = 'I'
};
enum EMark : char {
diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp
index e85ce509bcf..4ccef73eda1 100644
--- a/ydb/core/persqueue/partition.cpp
+++ b/ydb/core/persqueue/partition.cpp
@@ -37,6 +37,17 @@ static const ui32 MAX_USERS = 1000;
static const ui64 SET_OFFSET_COOKIE = 1;
static const ui32 MAX_TXS = 1000;
+auto GetStepAndTxId(ui64 step, ui64 txId)
+{
+ return std::make_pair(step, txId);
+}
+
+template<class E>
+auto GetStepAndTxId(const E& event)
+{
+ return GetStepAndTxId(event.Step, event.TxId);
+}
+
struct TPartition::THasDataReq {
ui64 Num;
ui64 Offset;
@@ -392,10 +403,17 @@ void RequestInfoRange(const TActorContext& ctx, const TActorId& dst, ui32 partit
}
void RequestMetaRead(const TActorContext& ctx, const TActorId& dst, ui32 partition) {
+ auto addKey = [](NKikimrClient::TKeyValueRequest& request, TKeyPrefix::EType type, ui32 partition) {
+ auto read = request.AddCmdRead();
+ TKeyPrefix key{type, partition};
+ read->SetKey(key.Data(), key.Size());
+ };
+
THolder<TEvKeyValue::TEvRequest> request(new TEvKeyValue::TEvRequest);
- auto read = request->Record.AddCmdRead();
- TKeyPrefix key{TKeyPrefix::TypeMeta, partition};
- read->SetKey(key.Data(), key.Size());
+
+ addKey(request->Record, TKeyPrefix::TypeMeta, partition);
+ addKey(request->Record, TKeyPrefix::TypeTxMeta, partition);
+
ctx.Send(dst, request.Release());
}
@@ -460,7 +478,8 @@ void TPartition::FillReadFromTimestamps(const NKikimrPQ::TPQTabletConfig& config
TPartition::TPartition(ui64 tabletId, ui32 partition, const TActorId& tablet, const TActorId& blobCache,
const NPersQueue::TTopicConverterPtr& topicConverter, bool isLocalDC, TString dcId, bool isServerless,
const NKikimrPQ::TPQTabletConfig& config, const TTabletCountersBase& counters,
- bool newPartition)
+ bool newPartition,
+ TVector<TTransaction> distrTxs)
: TabletID(tabletId)
, Partition(partition)
, Config(config)
@@ -516,6 +535,12 @@ TPartition::TPartition(ui64 tabletId, ui32 partition, const TActorId& tablet, co
}
TabletCounters.Populate(counters);
+
+ if (!distrTxs.empty()) {
+ std::move(distrTxs.begin(), distrTxs.end(),
+ std::back_inserter(DistrTxs));
+ TxInProgress = DistrTxs.front().Predicate.Defined();
+ }
}
void TPartition::HandleMonitoring(TEvPQ::TEvMonRequest::TPtr& ev, const TActorContext& ctx) {
@@ -1392,21 +1417,14 @@ void TPartition::HandleGetDiskStatus(const NKikimrClient::TResponse& response, c
RequestMetaRead(ctx, Tablet, Partition);
}
-void TPartition::HandleMetaRead(const NKikimrClient::TKeyValueResponse::TReadResult& response, const TActorContext& ctx) {
- NKikimrPQ::TPartitionMeta meta;
- switch (response.GetStatus()) {
- case NKikimrProto::OK: {
- bool res = meta.ParseFromString(response.GetValue());
- Y_VERIFY(res);
- /* Bring back later, when switch to 21-2 will be unable
- StartOffset = meta.GetStartOffset();
- EndOffset = meta.GetEndOffset();
- if (StartOffset == EndOffset) {
- NewHead.Offset = Head.Offset = EndOffset;
- }
- */
+void TPartition::HandleMetaRead(const NKikimrClient::TResponse& response, const TActorContext& ctx)
+{
+ auto handleReadResult = [&](const NKikimrClient::TKeyValueResponse::TReadResult& response,
+ auto&& action) {
+ switch (response.GetStatus()) {
+ case NKikimrProto::OK:
+ action(response);
break;
- }
case NKikimrProto::NODATA:
break;
case NKikimrProto::ERROR:
@@ -1419,14 +1437,41 @@ void TPartition::HandleMetaRead(const NKikimrClient::TKeyValueResponse::TReadRes
default:
Cerr << "ERROR " << response.GetStatus() << "\n";
Y_FAIL("bad status");
+ };
+ };
+
+ auto loadMeta = [](const NKikimrClient::TKeyValueResponse::TReadResult& response) {
+ NKikimrPQ::TPartitionMeta meta;
+ bool res = meta.ParseFromString(response.GetValue());
+ Y_VERIFY(res);
+ /* Bring back later, when switch to 21-2 will be unable
+ StartOffset = meta.GetStartOffset();
+ EndOffset = meta.GetEndOffset();
+ if (StartOffset == EndOffset) {
+ NewHead.Offset = Head.Offset = EndOffset;
+ }
+ */
};
+ handleReadResult(response.GetReadResult(0), loadMeta);
+
+ auto loadTxMeta = [this](const NKikimrClient::TKeyValueResponse::TReadResult& response) {
+ NKikimrPQ::TPartitionTxMeta meta;
+ bool res = meta.ParseFromString(response.GetValue());
+ Y_VERIFY(res);
+
+ if (meta.HasPlanStep()) {
+ PlanStep = meta.GetPlanStep();
+ }
+ if (meta.HasTxId()) {
+ TxId = meta.GetTxId();
+ }
+ };
+ handleReadResult(response.GetReadResult(1), loadTxMeta);
InitState = WaitInfoRange;
RequestInfoRange(ctx, Tablet, Partition, "");
}
-
-
void TPartition::HandleInfoRangeRead(const NKikimrClient::TKeyValueResponse::TReadRangeResult& range, const TActorContext& ctx) {
//megaqc check here all results
Y_VERIFY(range.HasStatus());
@@ -1686,8 +1731,8 @@ void TPartition::HandleOnInit(TEvKeyValue::TEvResponse::TPtr& ev, const TActorCo
HandleGetDiskStatus(response, ctx);
break;
case WaitMetaRead:
- Y_VERIFY(response.ReadResultSize() == 1);
- HandleMetaRead(response.GetReadResult(0), ctx);
+ Y_VERIFY(response.ReadResultSize() == 2);
+ HandleMetaRead(response, ctx);
break;
case WaitInfoRange:
Y_VERIFY(response.ReadRangeResultSize() == 1);
@@ -2269,6 +2314,77 @@ void TPartition::Handle(TEvPersQueue::TEvProposeTransaction::TPtr& ev, const TAc
ProcessTxsAndUserActs(ctx);
}
+void TPartition::Handle(TEvPQ::TEvTxCalcPredicate::TPtr& ev, const TActorContext& ctx)
+{
+ AddDistrTx(ev->Release());
+
+ ProcessTxsAndUserActs(ctx);
+}
+
+void TPartition::Handle(TEvPQ::TEvTxCommit::TPtr& ev, const TActorContext& ctx)
+{
+ if (PlanStep.Defined() && TxId.Defined()) {
+ if (GetStepAndTxId(*ev->Get()) <= GetStepAndTxId(*PlanStep, *TxId)) {
+ ctx.Send(Tablet, MakeCommitDone(ev->Get()->Step, ev->Get()->TxId).Release());
+ return;
+ }
+ }
+
+ Y_VERIFY(TxInProgress);
+
+ Y_VERIFY(!DistrTxs.empty());
+ TTransaction& t = DistrTxs.front();
+
+ Y_VERIFY(GetStepAndTxId(*ev->Get()) == GetStepAndTxId(*t.Tx));
+ Y_VERIFY(t.Predicate.Defined() && *t.Predicate);
+
+ for (auto& operation : t.Tx->Operations) {
+ TUserInfo& userInfo = GetOrCreatePendingUser(operation.GetConsumer(), ctx);
+
+ Y_VERIFY(userInfo.Offset == (i64)operation.GetBegin());
+
+ userInfo.Offset = operation.GetEnd();
+ userInfo.Session = "";
+ }
+
+ PlanStep = t.Tx->Step;
+ TxId = t.Tx->TxId;
+ TxIdHasChanged = true;
+
+ ScheduleReplyCommitDone(t.Tx->Step, t.Tx->TxId);
+
+ RemoveDistrTx();
+ TxInProgress = false;
+
+ ContinueProcessTxsAndUserActs(ctx);
+}
+
+void TPartition::Handle(TEvPQ::TEvTxRollback::TPtr& ev, const TActorContext& ctx)
+{
+ if (PlanStep.Defined() && TxId.Defined()) {
+ if (GetStepAndTxId(*ev->Get()) <= GetStepAndTxId(*PlanStep, *TxId)) {
+ return;
+ }
+ }
+
+ Y_VERIFY(TxInProgress);
+
+ Y_VERIFY(!DistrTxs.empty());
+ TTransaction& t = DistrTxs.front();
+
+ Y_VERIFY(GetStepAndTxId(*ev->Get()) == GetStepAndTxId(*t.Tx));
+ Y_VERIFY(t.Predicate.Defined());
+
+ PlanStep = t.Tx->Step;
+ TxId = t.Tx->TxId;
+ TxIdHasChanged = true;
+
+ RemoveDistrTx();
+ TxInProgress = false;
+
+ ContinueProcessTxsAndUserActs(ctx);
+}
+
void TPartition::Handle(TEvPQ::TEvSetClientInfo::TPtr& ev, const TActorContext& ctx) {
if (size_t count = GetUserActCount(ev->Get()->ClientId); count > MAX_USER_ACTS) {
TabletCounters.Cumulative()[COUNTER_PQ_SET_CLIENT_OFFSET_ERROR].Increment(1);
@@ -3604,9 +3720,17 @@ void TPartition::HandleSetOffsetResponse(ui64 cookie, const TActorContext& ctx)
AffectedUsers.clear();
UsersInfoWriteInProgress = false;
+
+ TxIdHasChanged = false;
+
ProcessTxsAndUserActs(ctx);
}
+void TPartition::AddDistrTx(TSimpleSharedPtr<TEvPQ::TEvTxCalcPredicate> event)
+{
+ DistrTxs.emplace_back(std::move(event));
+}
+
void TPartition::AddImmediateTx(TSimpleSharedPtr<TEvPersQueue::TEvProposeTransaction> tx)
{
ImmediateTxs.push_back(std::move(tx));
@@ -3651,7 +3775,7 @@ size_t TPartition::GetUserActCount(const TString& consumer) const
void TPartition::ProcessTxsAndUserActs(const TActorContext& ctx)
{
- if (UsersInfoWriteInProgress || (ImmediateTxs.empty() && UserActs.empty())) {
+ if (UsersInfoWriteInProgress || (ImmediateTxs.empty() && UserActs.empty() && DistrTxs.empty()) || TxInProgress) {
return;
}
@@ -3659,35 +3783,101 @@ void TPartition::ProcessTxsAndUserActs(const TActorContext& ctx)
Y_VERIFY(Replies.empty());
Y_VERIFY(AffectedUsers.empty());
+ ContinueProcessTxsAndUserActs(ctx);
+}
+
+void TPartition::ContinueProcessTxsAndUserActs(const TActorContext& ctx)
+{
+ if (!DistrTxs.empty()) {
+ ProcessDistrTxs(ctx);
+
+ if (!DistrTxs.empty()) {
+ return;
+ }
+ }
+
ProcessUserActs(ctx);
ProcessImmediateTxs(ctx);
THolder<TEvKeyValue::TEvRequest> request(new TEvKeyValue::TEvRequest);
-
request->Record.SetCookie(SET_OFFSET_COOKIE);
- for (auto& user : AffectedUsers) {
- TKeyPrefix ikey(TKeyPrefix::TypeInfo, Partition, TKeyPrefix::MarkUser);
- ikey.Append(user.c_str(), user.size());
- TKeyPrefix ikeyDeprecated(TKeyPrefix::TypeInfo, Partition, TKeyPrefix::MarkUserDeprecated);
- ikeyDeprecated.Append(user.c_str(), user.size());
-
- if (TUserInfo* userInfo = GetPendingUserIfExists(user)) {
- AddCmdWrite(request->Record,
- ikey, ikeyDeprecated,
- userInfo->Offset, userInfo->Generation, userInfo->Step, userInfo->Session,
- userInfo->ReadOffsetRewindSum,
- userInfo->ReadRuleGeneration);
- } else {
- AddCmdDeleteRange(request->Record,
- ikey, ikeyDeprecated);
- }
+ if (TxIdHasChanged) {
+ AddCmdWriteTxMeta(request->Record,
+ *PlanStep, *TxId);
}
+ AddCmdWriteUserInfos(request->Record);
ctx.Send(Tablet, request.Release());
UsersInfoWriteInProgress = true;
}
+void TPartition::RemoveDistrTx()
+{
+ Y_VERIFY(!DistrTxs.empty());
+
+ DistrTxs.pop_front();
+}
+
+void TPartition::ProcessDistrTxs(const TActorContext& ctx)
+{
+ Y_VERIFY(!TxInProgress);
+
+ while (!TxInProgress && !DistrTxs.empty()) {
+ ProcessDistrTx(ctx);
+ }
+}
+
+void TPartition::ProcessDistrTx(const TActorContext& ctx)
+{
+ Y_VERIFY(!TxInProgress);
+
+ Y_VERIFY(!DistrTxs.empty());
+ TTransaction& t = DistrTxs.front();
+
+ bool predicate = true;
+
+ for (auto& operation : t.Tx->Operations) {
+ const TString& consumer = operation.GetConsumer();
+
+ if (!UsersInfoStorage.GetIfExists(consumer)) {
+ predicate = false;
+ break;
+ }
+
+ bool isAffectedConsumer = AffectedUsers.contains(consumer);
+ TUserInfo& userInfo = GetOrCreatePendingUser(consumer, ctx);
+
+ if (operation.GetBegin() > operation.GetEnd()) {
+ // BAD_REQUEST
+ predicate = false;
+ } else if (userInfo.Offset != (i64)operation.GetBegin()) {
+ // ABORTED
+ predicate = false;
+ } else if (operation.GetEnd() > EndOffset) {
+ // BAD_REQUEST
+ predicate = false;
+ }
+
+ if (!predicate) {
+ if (!isAffectedConsumer) {
+ AffectedUsers.erase(consumer);
+ }
+ break;
+ }
+ }
+
+ t.Predicate = predicate;
+
+ auto response = MakeHolder<TEvPQ::TEvTxCalcPredicateResult>(t.Tx->Step,
+ t.Tx->TxId,
+ Partition,
+ predicate);
+ ctx.Send(Tablet, response.Release());
+
+ TxInProgress = true;
+}
+
void TPartition::ProcessImmediateTxs(const TActorContext& ctx)
{
Y_VERIFY(!UsersInfoWriteInProgress);
@@ -3973,6 +4163,12 @@ void TPartition::ScheduleReplyPropose(const NKikimrPQ::TEvProposeTransaction& ev
statusCode).Release());
}
+void TPartition::ScheduleReplyCommitDone(ui64 step, ui64 txId)
+{
+ Replies.emplace_back(Tablet,
+ MakeCommitDone(step, txId).Release());
+}
+
void TPartition::AddCmdDeleteRange(NKikimrClient::TKeyValueRequest& request,
const TKeyPrefix& ikey, const TKeyPrefix& ikeyDeprecated)
{
@@ -4026,6 +4222,45 @@ void TPartition::AddCmdWrite(NKikimrClient::TKeyValueRequest& request,
write->SetStorageChannel(NKikimrClient::TKeyValueRequest::INLINE);
}
+void TPartition::AddCmdWriteTxMeta(NKikimrClient::TKeyValueRequest& request,
+ ui64 step, ui64 txId)
+{
+ TKeyPrefix ikey(TKeyPrefix::TypeTxMeta, Partition);
+
+ NKikimrPQ::TPartitionTxMeta meta;
+ meta.SetPlanStep(step);
+ meta.SetTxId(txId);
+
+ TString out;
+ Y_PROTOBUF_SUPPRESS_NODISCARD meta.SerializeToString(&out);
+
+ auto write = request.AddCmdWrite();
+ write->SetKey(ikey.Data(), ikey.Size());
+ write->SetValue(out.c_str(), out.size());
+ write->SetStorageChannel(NKikimrClient::TKeyValueRequest::INLINE);
+}
+
+void TPartition::AddCmdWriteUserInfos(NKikimrClient::TKeyValueRequest& request)
+{
+ for (auto& user : AffectedUsers) {
+ TKeyPrefix ikey(TKeyPrefix::TypeInfo, Partition, TKeyPrefix::MarkUser);
+ ikey.Append(user.c_str(), user.size());
+ TKeyPrefix ikeyDeprecated(TKeyPrefix::TypeInfo, Partition, TKeyPrefix::MarkUserDeprecated);
+ ikeyDeprecated.Append(user.c_str(), user.size());
+
+ if (TUserInfo* userInfo = GetPendingUserIfExists(user)) {
+ AddCmdWrite(request,
+ ikey, ikeyDeprecated,
+ userInfo->Offset, userInfo->Generation, userInfo->Step, userInfo->Session,
+ userInfo->ReadOffsetRewindSum,
+ userInfo->ReadRuleGeneration);
+ } else {
+ AddCmdDeleteRange(request,
+ ikey, ikeyDeprecated);
+ }
+ }
+}
+
TUserInfo& TPartition::GetOrCreatePendingUser(const TString& user,
const TActorContext& ctx,
TMaybe<ui64> readRuleGeneration)
@@ -4125,6 +4360,11 @@ THolder<TEvPersQueue::TEvProposeTransactionResult> TPartition::MakeReplyPropose(
return response;
}
+THolder<TEvPQ::TEvTxCommitDone> TPartition::MakeCommitDone(ui64 step, ui64 txId)
+{
+ return MakeHolder<TEvPQ::TEvTxCommitDone>(step, txId, Partition);
+}
+
void TPartition::ScheduleUpdateAvailableSize(const TActorContext& ctx) {
ctx.Schedule(UPDATE_AVAIL_SIZE_INTERVAL, new TEvPQ::TEvUpdateAvailableSize());
}
diff --git a/ydb/core/persqueue/partition.h b/ydb/core/persqueue/partition.h
index 10d88373a80..1c0ed6d6b88 100644
--- a/ydb/core/persqueue/partition.h
+++ b/ydb/core/persqueue/partition.h
@@ -36,6 +36,19 @@ void CalcTopicWriteQuotaParams(const NKikimrPQ::TPQConfig& pqConfig,
class TKeyLevel;
struct TMirrorerInfo;
+struct TTransaction {
+ explicit TTransaction(TSimpleSharedPtr<TEvPQ::TEvTxCalcPredicate> tx,
+ TMaybe<bool> predicate = Nothing()) :
+ Tx(tx),
+ Predicate(predicate)
+ {
+ Y_VERIFY(Tx);
+ }
+
+ TSimpleSharedPtr<TEvPQ::TEvTxCalcPredicate> Tx;
+ TMaybe<bool> Predicate;
+};
+
class TPartition : public TActorBootstrapped<TPartition> {
private:
static const ui32 MAX_ERRORS_COUNT_TO_STORE = 10;
@@ -91,6 +104,9 @@ private:
void Handle(TEvPQ::TEvUpdateWriteTimestamp::TPtr& ev, const TActorContext& ctx);
void Handle(TEvPersQueue::TEvHasDataInfo::TPtr& ev, const TActorContext& ctx);
void Handle(TEvPersQueue::TEvProposeTransaction::TPtr& ev, const TActorContext& ctx);
+ void Handle(TEvPQ::TEvTxCalcPredicate::TPtr& ev, const TActorContext& ctx);
+ void Handle(TEvPQ::TEvTxCommit::TPtr& ev, const TActorContext& ctx);
+ void Handle(TEvPQ::TEvTxRollback::TPtr& ev, const TActorContext& ctx);
void Handle(TEvPersQueue::TEvReportPartitionError::TPtr& ev, const TActorContext& ctx);
void Handle(TEvQuota::TEvClearance::TPtr& ev, const TActorContext& ctx);
void Handle(TEvents::TEvPoisonPill::TPtr& ev, const TActorContext& ctx);
@@ -98,7 +114,7 @@ private:
void HandleDataRead(const NKikimrClient::TResponse& range, const TActorContext& ctx);
void HandleGetDiskStatus(const NKikimrClient::TResponse& res, const TActorContext& ctx);
void HandleInfoRangeRead(const NKikimrClient::TKeyValueResponse::TReadRangeResult& range, const TActorContext& ctx);
- void HandleMetaRead(const NKikimrClient::TKeyValueResponse::TReadResult& response, const TActorContext& ctx);
+ void HandleMetaRead(const NKikimrClient::TResponse& response, const TActorContext& ctx);
void HandleMonitoring(TEvPQ::TEvMonRequest::TPtr& ev, const TActorContext& ctx);
void HandleOnIdle(TEvPQ::TEvDeregisterMessageGroup::TPtr& ev, const TActorContext& ctx);
void HandleOnIdle(TEvPQ::TEvRegisterMessageGroup::TPtr& ev, const TActorContext& ctx);
@@ -176,6 +192,12 @@ private:
ui64 GetUsedStorage(const TActorContext& ctx);
void ProcessTxsAndUserActs(const TActorContext& ctx);
+ void ContinueProcessTxsAndUserActs(const TActorContext& ctx);
+
+ void AddDistrTx(TSimpleSharedPtr<TEvPQ::TEvTxCalcPredicate> event);
+ void RemoveDistrTx();
+ void ProcessDistrTxs(const TActorContext& ctx);
+ void ProcessDistrTx(const TActorContext& ctx);
void AddImmediateTx(TSimpleSharedPtr<TEvPersQueue::TEvProposeTransaction> event);
void RemoveImmediateTx();
@@ -203,12 +225,16 @@ private:
const TString& error);
void ScheduleReplyPropose(const NKikimrPQ::TEvProposeTransaction& event,
NKikimrPQ::TEvProposeTransactionResult::EStatus statusCode);
+ void ScheduleReplyCommitDone(ui64 step, ui64 txId);
void AddCmdWrite(NKikimrClient::TKeyValueRequest& request,
const TKeyPrefix& ikey, const TKeyPrefix& ikeyDeprecated,
ui64 offset, ui32 gen, ui32 step, const TString& session,
ui64 readOffsetRewindSum,
ui64 readRuleGeneration);
+ void AddCmdWriteTxMeta(NKikimrClient::TKeyValueRequest& request,
+ ui64 step, ui64 txId);
+ void AddCmdWriteUserInfos(NKikimrClient::TKeyValueRequest& request);
void AddCmdDeleteRange(NKikimrClient::TKeyValueRequest& request,
const TKeyPrefix& ikey, const TKeyPrefix& ikeyDeprecated);
@@ -224,6 +250,7 @@ private:
const TString& error);
THolder<TEvPersQueue::TEvProposeTransactionResult> MakeReplyPropose(const NKikimrPQ::TEvProposeTransaction& event,
NKikimrPQ::TEvProposeTransactionResult::EStatus statusCode);
+ THolder<TEvPQ::TEvTxCommitDone> MakeCommitDone(ui64 step, ui64 txId);
public:
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
@@ -233,7 +260,8 @@ public:
TPartition(ui64 tabletId, ui32 partition, const TActorId& tablet, const TActorId& blobCache,
const NPersQueue::TTopicConverterPtr& topicConverter, bool isLocalDC, TString dcId, bool isServerless,
const NKikimrPQ::TPQTabletConfig& config, const TTabletCountersBase& counters,
- bool newPartition = false);
+ bool newPartition = false,
+ TVector<TTransaction> distrTxs = {});
void Bootstrap(const TActorContext& ctx);
@@ -329,6 +357,9 @@ private:
HFuncTraced(TEvPQ::TEvDeregisterMessageGroup, HandleOnIdle);
HFuncTraced(TEvPQ::TEvSplitMessageGroup, HandleOnIdle);
HFuncTraced(TEvPersQueue::TEvProposeTransaction, Handle);
+ HFuncTraced(TEvPQ::TEvTxCalcPredicate, Handle);
+ HFuncTraced(TEvPQ::TEvTxCommit, Handle);
+ HFuncTraced(TEvPQ::TEvTxRollback, Handle);
default:
LOG_ERROR_S(ctx, NKikimrServices::PERSQUEUE, "Unexpected " << EventStr("StateIdle", ev));
@@ -378,6 +409,9 @@ private:
HFuncTraced(TEvPQ::TEvDeregisterMessageGroup, HandleOnWrite);
HFuncTraced(TEvPQ::TEvSplitMessageGroup, HandleOnWrite);
HFuncTraced(TEvPersQueue::TEvProposeTransaction, Handle);
+ HFuncTraced(TEvPQ::TEvTxCalcPredicate, Handle);
+ HFuncTraced(TEvPQ::TEvTxCommit, Handle);
+ HFuncTraced(TEvPQ::TEvTxRollback, Handle);
default:
LOG_ERROR_S(ctx, NKikimrServices::PERSQUEUE, "Unexpected " << EventStr("StateWrite", ev));
@@ -390,7 +424,8 @@ private:
WaitInfoRange,
WaitDataRange,
WaitDataRead,
- WaitMetaRead
+ WaitMetaRead,
+ WaitTxInfo
};
ui64 TabletID;
@@ -452,11 +487,16 @@ private:
//
std::deque<TSimpleSharedPtr<TEvPQ::TEvSetClientInfo>> UserActs;
std::deque<TSimpleSharedPtr<TEvPersQueue::TEvProposeTransaction>> ImmediateTxs;
+ std::deque<TTransaction> DistrTxs;
THashMap<TString, size_t> UserActCount;
THashMap<TString, TUserInfo> PendingUsersInfo;
TVector<std::pair<TActorId, std::unique_ptr<IEventBase>>> Replies;
THashSet<TString> AffectedUsers;
bool UsersInfoWriteInProgress = false;
+ bool TxInProgress = false;
+ TMaybe<ui64> PlanStep;
+ TMaybe<ui64> TxId;
+ bool TxIdHasChanged = false;
//
//
//
diff --git a/ydb/core/persqueue/ut/user_action_processor_ut.cpp b/ydb/core/persqueue/ut/user_action_processor_ut.cpp
index 5ea3c4b821e..275d163724d 100644
--- a/ydb/core/persqueue/ut/user_action_processor_ut.cpp
+++ b/ydb/core/persqueue/ut/user_action_processor_ut.cpp
@@ -23,6 +23,29 @@ namespace NKikimr::NPQ {
Y_UNIT_TEST_SUITE(TUserActionProcessorTests) {
+namespace NHelpers {
+
+struct TCreatePartitionParams {
+ ui32 Partition = 1;
+ ui64 Begin = 0;
+ ui64 End = 0;
+ TMaybe<ui64> PlanStep;
+ TMaybe<ui64> TxId;
+ TVector<TTransaction> Transactions;
+};
+
+struct TCreateConsumerParams {
+ TString Consumer;
+ ui64 Offset = 0;
+ ui32 Generation = 0;
+ ui32 Step = 0;
+ TString Session;
+ ui64 OffsetRewindSum = 0;
+ ui64 ReadRuleGeneration = 0;
+};
+
+}
+
class TUserActionProcessorFixture : public NUnitTest::TBaseFixture {
protected:
struct TUserInfoMatcher {
@@ -32,6 +55,13 @@ protected:
TMaybe<ui32> Step;
};
+ struct TCmdWriteMatcher {
+ TMaybe<size_t> Count;
+ TMaybe<ui64> PlanStep;
+ TMaybe<ui64> TxId;
+ THashMap<size_t, TUserInfoMatcher> UserInfos;
+ };
+
struct TProxyResponseMatcher {
TMaybe<ui64> Cookie;
TMaybe<NMsgBusProxy::EResponseStatus> Status;
@@ -50,11 +80,31 @@ protected:
TMaybe<NKikimrPQ::TEvProposeTransactionResult::EStatus> Status;
};
+ struct TCalcPredicateMatcher {
+ TMaybe<ui64> Step;
+ TMaybe<ui64> TxId;
+ TMaybe<ui32> Partition;
+ TMaybe<bool> Predicate;
+ };
+
+ struct TCommitTxDoneMatcher {
+ TMaybe<ui64> Step;
+ TMaybe<ui64> TxId;
+ TMaybe<ui32> Partition;
+ };
+
+ using TCreatePartitionParams = NHelpers::TCreatePartitionParams;
+ using TCreateConsumerParams = NHelpers::TCreateConsumerParams;
+
void SetUp(NUnitTest::TTestContext&) override;
void TearDown(NUnitTest::TTestContext&) override;
- void CreatePartitionActor(ui32 partition = 1, bool newPartition = true);
- void CreatePartition(ui32 partition = 1, ui64 begin = 0, ui64 end = 0);
+ void CreatePartitionActor(ui32 partition,
+ const TVector<TCreateConsumerParams>& consumers,
+ bool newPartition,
+ TVector<TTransaction> txs);
+ void CreatePartition(const TCreatePartitionParams& params = {},
+ const TVector<TCreateConsumerParams>& consumers = {});
void CreateSession(const TString& clientId,
const TString& sessionId,
@@ -77,8 +127,7 @@ protected:
const TString& sessionId);
void SendGetOffset(ui64 cookie,
const TString& clientId);
- void WaitCmdWrite(size_t count,
- const THashMap<size_t, TUserInfoMatcher>& matchers = {});
+ void WaitCmdWrite(const TCmdWriteMatcher& matcher = {});
void SendCmdWriteResponse(NMsgBusProxy::EResponseStatus status);
void WaitProxyResponse(const TProxyResponseMatcher &matcher = {});
void WaitErrorResponse(const TErrorMatcher& matcher = {});
@@ -86,9 +135,10 @@ protected:
void WaitDiskStatusRequest();
void SendDiskStatusResponse();
void WaitMetaReadRequest();
- void SendMetaReadResponse();
+ void SendMetaReadResponse(TMaybe<ui64> step, TMaybe<ui64> txId);
void WaitInfoRangeRequest();
- void SendInfoRangeResponse();
+ void SendInfoRangeResponse(ui32 partition,
+ const TVector<TCreateConsumerParams>& consumers);
void WaitDataRangeRequest();
void SendDataRangeResponse(ui64 begin, ui64 end);
void WaitDataReadRequest();
@@ -102,6 +152,22 @@ protected:
ui64 txId);
void WaitProposeTransactionResponse(const TProposeTransactionResponseMatcher& matcher = {});
+ void SendCalcPredicate(ui64 step,
+ ui64 txId,
+ const TString& consumer,
+ ui64 begin,
+ ui64 end);
+ void WaitCalcPredicateResult(const TCalcPredicateMatcher& matcher = {});
+
+ void SendCommitTx(ui64 step, ui64 txId);
+ void SendRollbackTx(ui64 step, ui64 txId);
+ void WaitCommitTxDone(const TCommitTxDoneMatcher& matcher = {});
+
+ TTransaction MakeTransaction(ui64 step, ui64 txId,
+ TString consumer,
+ ui64 begin, ui64 end,
+ TMaybe<bool> predicate = Nothing());
+
private:
TMaybe<TTestContext> Ctx;
TMaybe<TFinalizer> Finalizer;
@@ -122,7 +188,10 @@ void TUserActionProcessorFixture::TearDown(NUnitTest::TTestContext&)
{
}
-void TUserActionProcessorFixture::CreatePartitionActor(ui32 id, bool newPartition)
+void TUserActionProcessorFixture::CreatePartitionActor(ui32 id,
+ const TVector<TCreateConsumerParams>& consumers,
+ bool newPartition,
+ TVector<TTransaction> txs)
{
using TKeyValueCounters = TProtobufTabletCounters<
NKeyValue::ESimpleCounters_descriptor,
@@ -147,6 +216,10 @@ void TUserActionProcessorFixture::CreatePartitionActor(ui32 id, bool newPartitio
NPersQueue::TTopicConverterPtr topicConverter;
NKikimrPQ::TPQTabletConfig config;
+ for (auto& c : consumers) {
+ config.AddReadRules(c.Consumer);
+ }
+
config.SetTopicName("rt3.dc1--account--topic");
config.SetTopicPath("/Root/PQ/rt3.dc1--account--topic");
config.SetFederationAccount("account");
@@ -165,28 +238,30 @@ void TUserActionProcessorFixture::CreatePartitionActor(ui32 id, bool newPartitio
false,
config,
*tabletCounters,
- newPartition);
+ newPartition,
+ std::move(txs));
ActorId = Ctx->Runtime->Register(actor);
}
-void TUserActionProcessorFixture::CreatePartition(ui32 id, ui64 begin, ui64 end)
+void TUserActionProcessorFixture::CreatePartition(const TCreatePartitionParams& params,
+ const TVector<TCreateConsumerParams>& consumers)
{
- if ((begin == 0) && (end == 0)) {
- CreatePartitionActor(id, true);
+ if ((params.Begin == 0) && (params.End == 0)) {
+ CreatePartitionActor(params.Partition, consumers, true, {});
} else {
- CreatePartitionActor(id, false);
+ CreatePartitionActor(params.Partition, consumers, false, params.Transactions);
WaitDiskStatusRequest();
SendDiskStatusResponse();
WaitMetaReadRequest();
- SendMetaReadResponse();
+ SendMetaReadResponse(params.PlanStep, params.TxId);
WaitInfoRangeRequest();
- SendInfoRangeResponse();
+ SendInfoRangeResponse(params.Partition, consumers);
WaitDataRangeRequest();
- SendDataRangeResponse(begin, end);
+ SendDataRangeResponse(params.Begin, params.End);
}
}
@@ -196,7 +271,7 @@ void TUserActionProcessorFixture::CreateSession(const TString& clientId,
ui64 cookie)
{
SendCreateSession(cookie,clientId,sessionId, generation, step);
- WaitCmdWrite(2, {{0, {.Session = sessionId, .Offset = 0}}});
+ WaitCmdWrite({.Count=2, .UserInfos={{0, {.Session = sessionId, .Offset = 0}}}});
SendCmdWriteResponse(NMsgBusProxy::MSTATUS_OK);
WaitProxyResponse({.Cookie = cookie});
}
@@ -208,7 +283,7 @@ void TUserActionProcessorFixture::SetOffset(const TString& clientId,
ui64 cookie)
{
SendSetOffset(cookie, clientId, offset, sessionId);
- WaitCmdWrite(2, {{0, {.Session = sessionId, .Offset = (expected ? *expected : offset)}}});
+ WaitCmdWrite({.Count=2, .UserInfos={{0, {.Session = sessionId, .Offset = (expected ? *expected : offset)}}}});
SendCmdWriteResponse(NMsgBusProxy::MSTATUS_OK);
WaitProxyResponse({.Cookie = cookie});
}
@@ -251,36 +326,51 @@ void TUserActionProcessorFixture::SendGetOffset(ui64 cookie,
Ctx->Runtime->SingleSys()->Send(new IEventHandle(ActorId, Ctx->Edge, event.Release()));
}
-void TUserActionProcessorFixture::WaitCmdWrite(size_t count,
- const THashMap<size_t, TUserInfoMatcher>& matchers)
+void TUserActionProcessorFixture::WaitCmdWrite(const TCmdWriteMatcher& matcher)
{
auto event = Ctx->Runtime->GrabEdgeEvent<TEvKeyValue::TEvRequest>();
UNIT_ASSERT(event != nullptr);
UNIT_ASSERT_VALUES_EQUAL(event->Record.GetCookie(), 1); // SET_OFFSET_COOKIE
- UNIT_ASSERT_VALUES_EQUAL(event->Record.CmdWriteSize(), count);
- for (auto& [index, matcher] : matchers) {
- UNIT_ASSERT(index < count);
+ if (matcher.Count.Defined()) {
+ UNIT_ASSERT_VALUES_EQUAL(*matcher.Count, event->Record.CmdWriteSize());
+ }
+ if (matcher.PlanStep.Defined()) {
+ NKikimrPQ::TPartitionTxMeta meta;
+ UNIT_ASSERT(meta.ParseFromString(event->Record.GetCmdWrite(0).GetValue()));
+
+ UNIT_ASSERT_VALUES_EQUAL(*matcher.PlanStep, meta.GetPlanStep());
+ }
+ if (matcher.TxId.Defined()) {
+ NKikimrPQ::TPartitionTxMeta meta;
+ UNIT_ASSERT(meta.ParseFromString(event->Record.GetCmdWrite(0).GetValue()));
+
+ UNIT_ASSERT_VALUES_EQUAL(*matcher.TxId, meta.GetTxId());
+ }
+ for (auto& [index, userInfo] : matcher.UserInfos) {
+ if (matcher.Count.Defined()) {
+ UNIT_ASSERT(index < *matcher.Count);
+ }
NKikimrPQ::TUserInfo ud;
UNIT_ASSERT(ud.ParseFromString(event->Record.GetCmdWrite(index).GetValue()));
- if (matcher.Session) {
+ if (userInfo.Session) {
UNIT_ASSERT(ud.HasSession());
- UNIT_ASSERT_VALUES_EQUAL(*matcher.Session, ud.GetSession());
+ UNIT_ASSERT_VALUES_EQUAL(*userInfo.Session, ud.GetSession());
}
- if (matcher.Generation) {
+ if (userInfo.Generation) {
UNIT_ASSERT(ud.HasGeneration());
- UNIT_ASSERT_VALUES_EQUAL(*matcher.Generation, ud.GetGeneration());
+ UNIT_ASSERT_VALUES_EQUAL(*userInfo.Generation, ud.GetGeneration());
}
- if (matcher.Step) {
+ if (userInfo.Step) {
UNIT_ASSERT(ud.HasStep());
- UNIT_ASSERT_VALUES_EQUAL(*matcher.Step, ud.GetStep());
+ UNIT_ASSERT_VALUES_EQUAL(*userInfo.Step, ud.GetStep());
}
- if (matcher.Offset) {
+ if (userInfo.Offset) {
UNIT_ASSERT(ud.HasOffset());
- UNIT_ASSERT_VALUES_EQUAL(*matcher.Offset, ud.GetOffset());
+ UNIT_ASSERT_VALUES_EQUAL(*userInfo.Offset, ud.GetOffset());
}
}
}
@@ -363,17 +453,43 @@ void TUserActionProcessorFixture::WaitMetaReadRequest()
auto event = Ctx->Runtime->GrabEdgeEvent<TEvKeyValue::TEvRequest>();
UNIT_ASSERT(event != nullptr);
- UNIT_ASSERT_VALUES_EQUAL(event->Record.CmdReadSize(), 1);
+ UNIT_ASSERT_VALUES_EQUAL(event->Record.CmdReadSize(), 2);
}
-void TUserActionProcessorFixture::SendMetaReadResponse()
+void TUserActionProcessorFixture::SendMetaReadResponse(TMaybe<ui64> step, TMaybe<ui64> txId)
{
auto event = MakeHolder<TEvKeyValue::TEvResponse>();
event->Record.SetStatus(NMsgBusProxy::MSTATUS_OK);
+ //
+ // NKikimrPQ::TPartitionMeta
+ //
auto read = event->Record.AddReadResult();
read->SetStatus(NKikimrProto::NODATA);
+ //
+ // NKikimrPQ::TPartitionTxMeta
+ //
+ read = event->Record.AddReadResult();
+ if (step.Defined() || txId.Defined()) {
+ NKikimrPQ::TPartitionTxMeta meta;
+
+ if (step.Defined()) {
+ meta.SetPlanStep(*step);
+ }
+ if (txId.Defined()) {
+ meta.SetTxId(*txId);
+ }
+
+ TString out;
+ Y_PROTOBUF_SUPPRESS_NODISCARD meta.SerializeToString(&out);
+
+ read->SetStatus(NKikimrProto::OK);
+ read->SetValue(out);
+ } else {
+ read->SetStatus(NKikimrProto::NODATA);
+ }
+
Ctx->Runtime->SingleSys()->Send(new IEventHandle(ActorId, Ctx->Edge, event.Release()));
}
@@ -385,13 +501,39 @@ void TUserActionProcessorFixture::WaitInfoRangeRequest()
UNIT_ASSERT_VALUES_EQUAL(event->Record.CmdReadRangeSize(), 1);
}
-void TUserActionProcessorFixture::SendInfoRangeResponse()
+void TUserActionProcessorFixture::SendInfoRangeResponse(ui32 partition,
+ const TVector<TCreateConsumerParams>& consumers)
{
auto event = MakeHolder<TEvKeyValue::TEvResponse>();
event->Record.SetStatus(NMsgBusProxy::MSTATUS_OK);
auto read = event->Record.AddReadRangeResult();
- read->SetStatus(NKikimrProto::NODATA);
+ if (consumers.empty()) {
+ read->SetStatus(NKikimrProto::NODATA);
+ } else {
+ read->SetStatus(NKikimrProto::OK);
+
+ for (auto& c : consumers) {
+ auto pair = read->AddPair();
+ pair->SetStatus(NKikimrProto::OK);
+
+ NPQ::TKeyPrefix key(NPQ::TKeyPrefix::TypeInfo, partition, NPQ::TKeyPrefix::MarkUser);
+ key.Append(c.Consumer.data(), c.Consumer.size());
+ pair->SetKey(key.Data(), key.Size());
+
+ NKikimrPQ::TUserInfo userInfo;
+ userInfo.SetOffset(c.Offset);
+ userInfo.SetGeneration(c.Generation);
+ userInfo.SetStep(c.Step);
+ userInfo.SetSession(c.Session);
+ userInfo.SetOffsetRewindSum(c.OffsetRewindSum);
+ userInfo.SetReadRuleGeneration(c.ReadRuleGeneration);
+
+ TString out;
+ Y_PROTOBUF_SUPPRESS_NODISCARD userInfo.SerializeToString(&out);
+ pair->SetValue(out);
+ }
+ }
Ctx->Runtime->SingleSys()->Send(new IEventHandle(ActorId, Ctx->Edge, event.Release()));
}
@@ -462,13 +604,83 @@ void TUserActionProcessorFixture::WaitProposeTransactionResponse(const TProposeT
}
}
+void TUserActionProcessorFixture::SendCalcPredicate(ui64 step,
+ ui64 txId,
+ const TString& consumer,
+ ui64 begin,
+ ui64 end)
+{
+ auto event = MakeHolder<TEvPQ::TEvTxCalcPredicate>(step, txId);
+ event->AddOperation(consumer, begin, end);
+
+ Ctx->Runtime->SingleSys()->Send(new IEventHandle(ActorId, Ctx->Edge, event.Release()));
+}
+
+void TUserActionProcessorFixture::WaitCalcPredicateResult(const TCalcPredicateMatcher& matcher)
+{
+ auto event = Ctx->Runtime->GrabEdgeEvent<TEvPQ::TEvTxCalcPredicateResult>();
+ UNIT_ASSERT(event != nullptr);
+
+ if (matcher.Step) {
+ UNIT_ASSERT_VALUES_EQUAL(*matcher.Step, event->Step);
+ }
+ if (matcher.TxId) {
+ UNIT_ASSERT_VALUES_EQUAL(*matcher.TxId, event->TxId);
+ }
+ if (matcher.Partition) {
+ UNIT_ASSERT_VALUES_EQUAL(*matcher.Partition, event->Partition);
+ }
+ if (matcher.Predicate) {
+ UNIT_ASSERT_VALUES_EQUAL(*matcher.Predicate, event->Predicate);
+ }
+}
+
+void TUserActionProcessorFixture::SendCommitTx(ui64 step, ui64 txId)
+{
+ auto event = MakeHolder<TEvPQ::TEvTxCommit>(step, txId);
+ Ctx->Runtime->SingleSys()->Send(new IEventHandle(ActorId, Ctx->Edge, event.Release()));
+}
+
+void TUserActionProcessorFixture::SendRollbackTx(ui64 step, ui64 txId)
+{
+ auto event = MakeHolder<TEvPQ::TEvTxRollback>(step, txId);
+ Ctx->Runtime->SingleSys()->Send(new IEventHandle(ActorId, Ctx->Edge, event.Release()));
+}
+
+void TUserActionProcessorFixture::WaitCommitTxDone(const TCommitTxDoneMatcher& matcher)
+{
+ auto event = Ctx->Runtime->GrabEdgeEvent<TEvPQ::TEvTxCommitDone>();
+ UNIT_ASSERT(event != nullptr);
+
+ if (matcher.Step) {
+ UNIT_ASSERT_VALUES_EQUAL(*matcher.Step, event->Step);
+ }
+ if (matcher.TxId) {
+ UNIT_ASSERT_VALUES_EQUAL(*matcher.TxId, event->TxId);
+ }
+ if (matcher.Partition) {
+ UNIT_ASSERT_VALUES_EQUAL(*matcher.Partition, event->Partition);
+ }
+}
+
+TTransaction TUserActionProcessorFixture::MakeTransaction(ui64 step, ui64 txId,
+ TString consumer,
+ ui64 begin, ui64 end,
+ TMaybe<bool> predicate)
+{
+ auto event = MakeSimpleShared<TEvPQ::TEvTxCalcPredicate>(step, txId);
+ event->AddOperation(std::move(consumer), begin, end);
+
+ return TTransaction(event, predicate);
+}
+
Y_UNIT_TEST_F(Batching, TUserActionProcessorFixture)
{
CreatePartition();
SendCreateSession(4, "client-1", "session-id-1", 2, 3);
- WaitCmdWrite(2, {{0, {.Session = "session-id-1", .Offset=0, .Generation=2, .Step=3}}});
+ WaitCmdWrite({.Count=2, .UserInfos={{0, {.Session = "session-id-1", .Offset=0, .Generation=2, .Step=3}}}});
SendCreateSession(5, "client-2", "session-id-2", 4, 5);
SendCreateSession(6, "client-3", "session-id-3", 6, 7);
@@ -477,10 +689,10 @@ Y_UNIT_TEST_F(Batching, TUserActionProcessorFixture)
WaitProxyResponse({.Cookie=4});
- WaitCmdWrite(4, {
+ WaitCmdWrite({.Count=4, .UserInfos={
{0, {.Session = "session-id-2", .Offset=0, .Generation=4, .Step=5}},
{2, {.Session = "session-id-3", .Offset=0, .Generation=6, .Step=7}}
- });
+ }});
SendSetOffset(7, "client-1", 0, "session-id-1");
SendCreateSession(8, "client-1", "session-id-2", 8, 9);
@@ -493,9 +705,9 @@ Y_UNIT_TEST_F(Batching, TUserActionProcessorFixture)
WaitProxyResponse({.Cookie=5});
WaitProxyResponse({.Cookie=6});
- WaitCmdWrite(2, {
+ WaitCmdWrite({.Count=2, .UserInfos={
{0, {.Session = "session-id-2", .Offset=0, .Generation=8, .Step=9}},
- });
+ }});
SendCmdWriteResponse(NMsgBusProxy::MSTATUS_OK);
@@ -514,7 +726,7 @@ Y_UNIT_TEST_F(SetOffset, TUserActionProcessorFixture)
const TString client = "client";
const TString session = "session";
- CreatePartition(partition, begin, end);
+ CreatePartition({.Partition=partition, .Begin=begin, .End=end});
//
// create session
@@ -525,7 +737,7 @@ Y_UNIT_TEST_F(SetOffset, TUserActionProcessorFixture)
// regular commit (5 <= end)
//
SendSetOffset(1, client, 5, session);
- WaitCmdWrite(2, {{0, {.Session=session, .Offset=5}}});
+ WaitCmdWrite({.Count=2, .UserInfos={{0, {.Session=session, .Offset=5}}}});
SendCmdWriteResponse(NMsgBusProxy::MSTATUS_OK);
WaitProxyResponse({.Cookie=1, .Status=NMsgBusProxy::MSTATUS_OK});
@@ -539,7 +751,7 @@ Y_UNIT_TEST_F(SetOffset, TUserActionProcessorFixture)
// commit to back (1 < 5)
//
SendSetOffset(3, client, 1, session);
- WaitCmdWrite(2, {{0, {.Session=session, .Offset=5}}});
+ WaitCmdWrite({.Count=2, .UserInfos={{0, {.Session=session, .Offset=5}}}});
SendCmdWriteResponse(NMsgBusProxy::MSTATUS_OK);
WaitProxyResponse({.Cookie=3, .Status=NMsgBusProxy::MSTATUS_OK});
@@ -553,7 +765,7 @@ Y_UNIT_TEST_F(SetOffset, TUserActionProcessorFixture)
// commit to future (13 > end)
//
SendSetOffset(5, client, 13, session);
- WaitCmdWrite(2, {{0, {.Session=session, .Offset=end}}});
+ WaitCmdWrite({.Count=2, .UserInfos={{0, {.Session=session, .Offset=end}}}});
SendCmdWriteResponse(NMsgBusProxy::MSTATUS_OK);
WaitProxyResponse({.Cookie=5, .Status=NMsgBusProxy::MSTATUS_OK});
}
@@ -566,7 +778,7 @@ Y_UNIT_TEST_F(CommitOffsetRanges, TUserActionProcessorFixture)
const TString client = "client";
const TString session = "session";
- CreatePartition(partition, begin, end);
+ CreatePartition({.Partition=partition, .Begin=begin, .End=end});
//
// create session
@@ -579,7 +791,7 @@ Y_UNIT_TEST_F(CommitOffsetRanges, TUserActionProcessorFixture)
"topic-path",
true,
1);
- WaitCmdWrite(2, {{0, {.Session="", .Offset=2}}});
+ WaitCmdWrite({.Count=2, .UserInfos={{0, {.Session="", .Offset=2}}}});
SendProposeTransactionRequest(partition,
2, 0, // begin > end
@@ -615,7 +827,7 @@ Y_UNIT_TEST_F(CommitOffsetRanges, TUserActionProcessorFixture)
SendCmdWriteResponse(NMsgBusProxy::MSTATUS_OK);
WaitProposeTransactionResponse({.TxId=1, .Status=NKikimrPQ::TEvProposeTransactionResult::COMPLETE});
- WaitCmdWrite(2, {{0, {.Session="", .Offset=4}}});
+ WaitCmdWrite({.Count=2, .UserInfos={{0, {.Session="", .Offset=4}}}});
SendCmdWriteResponse(NMsgBusProxy::MSTATUS_OK);
WaitProposeTransactionResponse({.TxId=2, .Status=NKikimrPQ::TEvProposeTransactionResult::BAD_REQUEST});
@@ -628,6 +840,226 @@ Y_UNIT_TEST_F(CommitOffsetRanges, TUserActionProcessorFixture)
WaitProxyResponse({.Cookie=6, .Offset=4});
}
+Y_UNIT_TEST_F(CorrectRange_Commit, TUserActionProcessorFixture)
+{
+ const ui32 partition = 3;
+ const ui64 begin = 0;
+ const ui64 end = 10;
+ const TString client = "client";
+ const TString session = "session";
+
+ const ui64 step = 12345;
+ const ui64 txId = 67890;
+
+ CreatePartition({.Partition=partition, .Begin=begin, .End=end, .PlanStep=step, .TxId=10000});
+ CreateSession(client, session);
+
+ SendCalcPredicate(step, txId, client, 0, 2);
+ WaitCalcPredicateResult({.Step=step, .TxId=txId, .Partition=partition, .Predicate=true});
+
+ SendCommitTx(step, txId);
+
+ WaitCmdWrite({.Count=3, .PlanStep=step, .TxId=txId, .UserInfos={{1, {.Session="", .Offset=2}}}});
+ SendCmdWriteResponse(NMsgBusProxy::MSTATUS_OK);
+
+ WaitCommitTxDone({.TxId=txId, .Partition=partition});
+}
+
+Y_UNIT_TEST_F(CorrectRange_Multiple_Transactions, TUserActionProcessorFixture)
+{
+ const ui32 partition = 3;
+ const ui64 begin = 0;
+ const ui64 end = 10;
+ const TString client = "client";
+ const TString session = "session";
+
+ const ui64 step = 12345;
+ const ui64 txId_1 = 67890;
+ const ui64 txId_2 = 67891;
+ const ui64 txId_3 = 67892;
+
+ CreatePartition({.Partition=partition, .Begin=begin, .End=end, .PlanStep=step, .TxId=10000});
+ CreateSession(client, session);
+
+ SendCalcPredicate(step, txId_1, client, 0, 1);
+ WaitCalcPredicateResult({.Step=step, .TxId=txId_1, .Partition=partition, .Predicate=true});
+
+ SendCalcPredicate(step, txId_2, client, 0, 2);
+ SendCalcPredicate(step, txId_3, client, 0, 2);
+
+ SendCommitTx(step, txId_1);
+
+ WaitCalcPredicateResult({.Step=step, .TxId=txId_2, .Partition=partition, .Predicate=false});
+ SendRollbackTx(step, txId_2);
+
+ WaitCalcPredicateResult({.Step=step, .TxId=txId_3, .Partition=partition, .Predicate=false});
+ SendRollbackTx(step, txId_3);
+
+ WaitCmdWrite({.Count=3, .PlanStep=step, .TxId=txId_3, .UserInfos={{1, {.Session="", .Offset=1}}}});
+ SendCmdWriteResponse(NMsgBusProxy::MSTATUS_OK);
+
+ WaitCommitTxDone({.TxId=txId_1, .Partition=partition});
+}
+
+Y_UNIT_TEST_F(CorrectRange_Multiple_Consumers, TUserActionProcessorFixture)
+{
+ const ui32 partition = 3;
+ const ui64 begin = 0;
+ const ui64 end = 10;
+
+ const ui64 step = 12345;
+ const ui64 txId = 67890;
+
+ CreatePartition({.Partition=partition, .Begin=begin, .End=end});
+ CreateSession("client-1", "session-1");
+ CreateSession("client-2", "session-2");
+
+ SendSetOffset(1, "client-1", 3, "session-1");
+ SendCalcPredicate(step, txId, "client-2", 0, 1);
+ SendSetOffset(2, "client-1", 6, "session-1");
+
+ WaitCmdWrite({.Count=2, .UserInfos={{0, {.Session="session-1", .Offset=3}}}});
+ SendCmdWriteResponse(NMsgBusProxy::MSTATUS_OK);
+
+ WaitProxyResponse({.Cookie=1, .Status=NMsgBusProxy::MSTATUS_OK});
+
+ WaitCalcPredicateResult({.Step=step, .TxId=txId, .Partition=partition, .Predicate=true});
+ SendCommitTx(step, txId);
+
+ WaitCmdWrite({.Count=5, .UserInfos={
+ {1, {.Session="", .Offset=1}},
+ {3, {.Session="session-1", .Offset=6}}
+ }});
+}
+
+Y_UNIT_TEST_F(OldPlanStep, TUserActionProcessorFixture)
+{
+ const ui32 partition = 3;
+ const ui64 begin = 0;
+ const ui64 end = 10;
+
+ const ui64 step = 12345;
+ const ui64 txId = 67890;
+
+ CreatePartition({.Partition=partition, .Begin=begin, .End=end, .PlanStep=99999, .TxId=55555});
+
+ SendCommitTx(step, txId);
+ WaitCommitTxDone({.TxId=txId, .Partition=partition});
+}
+
+Y_UNIT_TEST_F(AfterRestart_1, TUserActionProcessorFixture)
+{
+ const ui32 partition = 3;
+ const ui64 begin = 0;
+ const ui64 end = 10;
+ const TString consumer = "client";
+ const TString session = "session";
+
+ const ui64 step = 12345;
+
+ TVector<TTransaction> txs;
+ txs.push_back(MakeTransaction(step, 11111, consumer, 0, 2, true));
+ txs.push_back(MakeTransaction(step, 22222, consumer, 2, 4));
+
+ CreatePartition({.Partition=partition,
+ .Begin=begin,
+ .End=end,
+ .PlanStep=step, .TxId=10000,
+ .Transactions=std::move(txs)},
+ {{.Consumer=consumer, .Offset=0, .Session=session}});
+
+ SendCommitTx(step, 11111);
+
+ WaitCalcPredicateResult({.Step=step, .TxId=22222, .Partition=partition, .Predicate=true});
+ SendCommitTx(step, 22222);
+
+ WaitCmdWrite({.Count=3, .PlanStep=step, .TxId=22222, .UserInfos={{1, {.Session="", .Offset=4}}}});
+}
+
+Y_UNIT_TEST_F(AfterRestart_2, TUserActionProcessorFixture)
+{
+ const ui32 partition = 3;
+ const ui64 begin = 0;
+ const ui64 end = 10;
+ const TString consumer = "client";
+ const TString session = "session";
+
+ const ui64 step = 12345;
+
+ TVector<TTransaction> txs;
+ txs.push_back(MakeTransaction(step, 11111, consumer, 0, 2));
+ txs.push_back(MakeTransaction(step, 22222, consumer, 2, 4));
+
+ CreatePartition({.Partition=partition,
+ .Begin=begin,
+ .End=end,
+ .PlanStep=step, .TxId=10000,
+ .Transactions=std::move(txs)},
+ {{.Consumer=consumer, .Offset=0, .Session=session}});
+
+ WaitCalcPredicateResult({.Step=step, .TxId=11111, .Partition=partition, .Predicate=true});
+}
+
+Y_UNIT_TEST_F(IncorrectRange, TUserActionProcessorFixture)
+{
+ const ui32 partition = 3;
+ const ui64 begin = 0;
+ const ui64 end = 10;
+ const TString client = "client";
+ const TString session = "session";
+
+ const ui64 step = 12345;
+ ui64 txId = 67890;
+
+ CreatePartition({.Partition=partition, .Begin=begin, .End=end});
+ CreateSession(client, session);
+
+ SendCalcPredicate(step, txId, client, 4, 2);
+ WaitCalcPredicateResult({.Step=step, .TxId=txId, .Partition=partition, .Predicate=false});
+ SendRollbackTx(step, txId);
+
+ WaitCmdWrite({.Count=1, .PlanStep=step, .TxId=txId});
+ SendCmdWriteResponse(NMsgBusProxy::MSTATUS_OK);
+
+ ++txId;
+
+ SendCalcPredicate(step, txId, client, 2, 4);
+ WaitCalcPredicateResult({.Step=step, .TxId=txId, .Partition=partition, .Predicate=false});
+ SendRollbackTx(step, txId);
+
+ WaitCmdWrite({.Count=1, .PlanStep=step, .TxId=txId});
+ SendCmdWriteResponse(NMsgBusProxy::MSTATUS_OK);
+
+ ++txId;
+
+ SendCalcPredicate(step, txId, client, 0, 11);
+ WaitCalcPredicateResult({.Step=step, .TxId=txId, .Partition=partition, .Predicate=false});
+}
+
+Y_UNIT_TEST_F(CorrectRange_Rollback, TUserActionProcessorFixture)
+{
+ const ui32 partition = 3;
+ const ui64 begin = 0;
+ const ui64 end = 10;
+ const TString client = "client";
+ const TString session = "session";
+
+ const ui64 step = 12345;
+ const ui64 txId_1 = 67890;
+ const ui64 txId_2 = 67891;
+
+ CreatePartition({.Partition=partition, .Begin=begin, .End=end});
+ CreateSession(client, session);
+
+ SendCalcPredicate(step, txId_1, client, 0, 2);
+ WaitCalcPredicateResult({.Step=step, .TxId=txId_1, .Partition=partition, .Predicate=true});
+
+ SendCalcPredicate(step, txId_2, client, 0, 5);
+ SendRollbackTx(step, txId_1);
+
+ WaitCalcPredicateResult({.Step=step, .TxId=txId_2, .Partition=partition, .Predicate=true});
+}
+
} // Y_UNIT_TEST_SUITE(TUserActionProcessorTests)
} // namespace NKikimr::NPQ
diff --git a/ydb/core/protos/pqconfig.proto b/ydb/core/protos/pqconfig.proto
index 2345a6b759b..23545a9a76c 100644
--- a/ydb/core/protos/pqconfig.proto
+++ b/ydb/core/protos/pqconfig.proto
@@ -20,6 +20,11 @@ message TPartitionMeta {
optional uint64 EndOffset = 2;
}
+message TPartitionTxMeta {
+ optional uint64 PlanStep = 1;
+ optional uint64 TxId = 2;
+}
+
message TPQConfig {
optional uint32 ACLRetryTimeoutSec = 1 [default = 300];
optional uint32 BalancerMetadataRetryTimeoutSec = 2 [default = 240];