aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--ydb/core/tablet/pipe_tracker.cpp51
-rw-r--r--ydb/core/tablet/pipe_tracker.h3
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation.h17
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_side_effects.cpp2
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__state_changed_reply.cpp24
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_impl.cpp38
6 files changed, 74 insertions, 61 deletions
diff --git a/ydb/core/tablet/pipe_tracker.cpp b/ydb/core/tablet/pipe_tracker.cpp
index cf6e41a97f..f1843a44df 100644
--- a/ydb/core/tablet/pipe_tracker.cpp
+++ b/ydb/core/tablet/pipe_tracker.cpp
@@ -6,19 +6,14 @@ std::unordered_set<ui64> TPipeTrackerBase::EmptySet;
std::unordered_set<std::pair<ui64, ui64>> TPipeTrackerBase::EmptyPairSet;
void TPipeTrackerBase::AttachTablet(ui64 txid, ui64 tabletid, ui64 cookie) {
- auto txIt = TxToTablet.find(txid);
- if (txIt == TxToTablet.end()) {
- txIt = TxToTablet.emplace(txid, std::unordered_set<std::pair<ui64, ui64>>()).first;
- }
-
- auto& tabletSet = txIt->second;
+ auto& tabletSet = TxToTablet[txid];
auto tabIt = tabletSet.find(std::make_pair(cookie, tabletid));
if (tabIt != tabletSet.end())
return;
tabletSet.insert(std::make_pair(cookie, tabletid));
TabletToTx[tabletid].insert(txid);
- TxTablets[txid].insert(tabletid);
+ TxTablets[txid][tabletid].insert(cookie);
}
bool TPipeTrackerBase::DetachTablet(ui64 txid, ui64 tabletid, ui64 cookie) {
@@ -32,21 +27,27 @@ bool TPipeTrackerBase::DetachTablet(ui64 txid, ui64 tabletid, ui64 cookie) {
return false;
tabletSet.erase(tabIt);
- auto multiIt = TxTablets.find(txid);
- Y_VERIFY(multiIt != TxTablets.end());
- auto& tablets = multiIt->second;
- auto currIt = tablets.find(tabletid);
- Y_VERIFY(currIt != tablets.end());
- auto nextIt = currIt;
- ++nextIt;
- tablets.erase(currIt);
- if (nextIt == tablets.end() || *nextIt != tabletid) {
- if (tabletSet.empty()) {
+
+ auto itTxTablets = TxTablets.find(txid);
+ Y_VERIFY(itTxTablets != TxTablets.end());
+ auto itCookies = itTxTablets->second.find(tabletid);
+ Y_VERIFY(itCookies != itTxTablets->second.end());
+ auto itCookie = itCookies->second.find(cookie);
+ Y_VERIFY(itCookie != itCookies->second.end());
+ itCookies->second.erase(itCookie);
+
+ // Cookies are empty when there are no more links between txid and tabletid
+ if (itCookies->second.empty()) {
+ itTxTablets->second.erase(itCookies);
+
+ // Check if txid has no more tablets
+ if (itTxTablets->second.empty()) {
+ Y_VERIFY(tabletSet.empty());
+ TxTablets.erase(itTxTablets);
TxToTablet.erase(txIt);
- Y_VERIFY(tablets.empty());
- TxTablets.erase(multiIt);
}
+ // Unlink txid from tabletid
auto it = TabletToTx.find(tabletid);
Y_VERIFY(it != TabletToTx.end());
it->second.erase(txid);
@@ -80,6 +81,18 @@ const std::unordered_set<std::pair<ui64, ui64> > &TPipeTrackerBase::FindTablets(
return it->second;
}
+const std::unordered_set<ui64> &TPipeTrackerBase::FindCookies(ui64 txid, ui64 tabletid) const {
+ auto it = TxTablets.find(txid);
+ if (it == TxTablets.end())
+ return EmptySet;
+
+ auto itCookies = it->second.find(tabletid);
+ if (itCookies == it->second.end())
+ return EmptySet;
+
+ return itCookies->second;
+}
+
TPipeTracker::TPipeTracker(NTabletPipe::IClientCache& clientCache)
: ClientCache(clientCache)
{
diff --git a/ydb/core/tablet/pipe_tracker.h b/ydb/core/tablet/pipe_tracker.h
index 33b90ba82b..35cefe0a6d 100644
--- a/ydb/core/tablet/pipe_tracker.h
+++ b/ydb/core/tablet/pipe_tracker.h
@@ -17,11 +17,12 @@ public:
bool IsTxAlive(ui64 txid) const;
const std::unordered_set<ui64>& FindTx(ui64 tabletid) const;
const std::unordered_set<std::pair<ui64, ui64>>& FindTablets(ui64 txid) const;
+ const std::unordered_set<ui64>& FindCookies(ui64 txid, ui64 tabletid) const;
private:
std::unordered_map<ui64, std::unordered_set<ui64>> TabletToTx; // tabletid -> txid
std::unordered_map<ui64, std::unordered_set<std::pair<ui64, ui64>>> TxToTablet; // txid -> cookie:tabletid
- std::unordered_map<ui64, std::unordered_multiset<ui64>> TxTablets; // txid -> tabletids
+ std::unordered_map<ui64, std::unordered_map<ui64, std::unordered_set<ui64>>> TxTablets; // txid -> tabletid -> cookie
static std::unordered_set<ui64> EmptySet;
static std::unordered_set<std::pair<ui64, ui64>> EmptyPairSet;
};
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation.h b/ydb/core/tx/schemeshard/schemeshard__operation.h
index ea66e3c5c8..ae7ceec9df 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation.h
+++ b/ydb/core/tx/schemeshard/schemeshard__operation.h
@@ -18,8 +18,21 @@ struct TOperation: TSimpleRefCount<TOperation> {
THashSet<TTxId> DependentOperations;
THashSet<TTxId> WaitOperations;
- using TPreSerialisedMessage = std::pair<ui32, TIntrusivePtr<TEventSerializedData>>; // ui32 it's a type
- THashMap<TTabletId, TMap<TPipeMessageId, TPreSerialisedMessage>> PipeBindedMessages; // std::pair<ui64, ui64> it's a cookie
+ struct TPreSerializedMessage {
+ ui32 Type;
+ TIntrusivePtr<TEventSerializedData> Data;
+ TOperationId OpId;
+
+ TPreSerializedMessage() = default;
+
+ TPreSerializedMessage(ui32 type, TIntrusivePtr<TEventSerializedData> data, TOperationId opId)
+ : Type(type)
+ , Data(std::move(data))
+ , OpId(opId)
+ { }
+ };
+
+ THashMap<TTabletId, TMap<TPipeMessageId, TPreSerializedMessage>> PipeBindedMessages; // std::pair<ui64, ui64> it's a cookie
THashMap<TTabletId, TSubTxId> RelationsByTabletId;
THashMap<TShardIdx, TSubTxId> RelationsByShardIdx;
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_side_effects.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_side_effects.cpp
index 9979742fc6..8c16a6c03a 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_side_effects.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_side_effects.cpp
@@ -618,7 +618,7 @@ void TSideEffects::DoBindMsg(TSchemeShard *ss, const TActorContext &ctx) {
const bool success = message->SerializeToArcadiaStream(&serializer);
Y_VERIFY(success);
TIntrusivePtr<TEventSerializedData> data = serializer.Release(message->IsExtendedFormat());
- operation->PipeBindedMessages[tablet][cookie] = TOperation::TPreSerialisedMessage(msgType, data);
+ operation->PipeBindedMessages[tablet][cookie] = TOperation::TPreSerializedMessage(msgType, data, opId);
ss->PipeClientCache->Send(ctx, ui64(tablet), msgType, data, cookie.second);
}
diff --git a/ydb/core/tx/schemeshard/schemeshard__state_changed_reply.cpp b/ydb/core/tx/schemeshard/schemeshard__state_changed_reply.cpp
index ffa6e34d50..40b1b59a8c 100644
--- a/ydb/core/tx/schemeshard/schemeshard__state_changed_reply.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__state_changed_reply.cpp
@@ -49,25 +49,19 @@ struct TSchemeShard::TTxShardStateChanged : public TSchemeShard::TRwTxBase {
TOperation::TPtr operation = Self->Operations.at(txId);
- for (auto& related: Self->PipeTracker.FindTablets(ui64(txId))) {
- ui64 pipeTrackerCookie = related.first;
- auto relatedTabletId = TTabletId(related.second);
-
- if (tabletId != relatedTabletId) {
- continue;
- }
-
+ for (ui64 pipeTrackerCookie : Self->PipeTracker.FindCookies(ui64(txId), ui64(tabletId))) {
auto opId = TOperationId(txId, pipeTrackerCookie);
SideEffects.ActivateTx(opId);
+ }
- if (!operation->PipeBindedMessages.contains(tabletId)) {
- continue;
- }
+ if (!operation->PipeBindedMessages.contains(tabletId)) {
+ continue;
+ }
- for (auto& items: operation->PipeBindedMessages.at(tabletId)) {
- auto msgCookie = items.first;
- SideEffects.UnbindMsgFromPipe(opId, tabletId, msgCookie);
- }
+ for (auto& item: operation->PipeBindedMessages.at(tabletId)) {
+ auto msgCookie = item.first;
+ auto& msg = item.second;
+ SideEffects.UnbindMsgFromPipe(msg.OpId, tabletId, msgCookie);
}
}
}
diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp
index 3777d4c25f..8b7c7b4ada 100644
--- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp
@@ -5450,39 +5450,31 @@ void TSchemeShard::RestartPipeTx(TTabletId tabletId, const TActorContext& ctx) {
}
TOperation::TPtr operation = Operations.at(txId);
- TSubTxId subTxId = operation->FindRelatedPartByTabletId(tabletId, ctx);
- for (auto related: PipeTracker.FindTablets(item)) {
- ui64 pipeTrackerCookie = related.first;
- auto relatedTabletId = TTabletId(related.second);
-
- if (tabletId != relatedTabletId) {
- continue;
- }
-
- if (!operation->PipeBindedMessages.contains(tabletId)) {
+ if (!operation->PipeBindedMessages.contains(tabletId)) {
+ for (ui64 pipeTrackerCookie : PipeTracker.FindCookies(ui64(txId), ui64(tabletId))) {
LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
"Pipe attached message is not found, ignore event"
<< ", opId:" << TOperationId(txId, pipeTrackerCookie)
<< ", tableId: " << tabletId
<< ", at schemeshardId: " << TabletID());
- continue;
}
+ continue;
+ }
- for (auto& items: operation->PipeBindedMessages.at(tabletId)) {
- TPipeMessageId msgCookie = items.first;
- TOperation::TPreSerialisedMessage& preSerialisedMessages = items.second;
+ for (auto& item: operation->PipeBindedMessages.at(tabletId)) {
+ TPipeMessageId msgCookie = item.first;
+ TOperation::TPreSerializedMessage& msg = item.second;
- LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
- "Pipe attached message is found and resent into the new pipe"
- << ", opId:" << TOperationId(txId, subTxId)
- << ", dst tableId: " << tabletId
- << ", msg type: " << preSerialisedMessages.first
- << ", msg cookie: " << msgCookie
- << ", at schemeshardId: " << TabletID());
+ LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
+ "Pipe attached message is found and resent into the new pipe"
+ << ", opId:" << msg.OpId
+ << ", dst tableId: " << tabletId
+ << ", msg type: " << msg.Type
+ << ", msg cookie: " << msgCookie
+ << ", at schemeshardId: " << TabletID());
- PipeClientCache->Send(ctx, ui64(tabletId), preSerialisedMessages.first, preSerialisedMessages.second, msgCookie.second);
- }
+ PipeClientCache->Send(ctx, ui64(tabletId), msg.Type, msg.Data, msgCookie.second);
}
}
}