diff options
-rw-r--r-- | ydb/core/tablet/pipe_tracker.cpp | 51 | ||||
-rw-r--r-- | ydb/core/tablet/pipe_tracker.h | 3 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/schemeshard__operation.h | 17 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/schemeshard__operation_side_effects.cpp | 2 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/schemeshard__state_changed_reply.cpp | 24 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/schemeshard_impl.cpp | 38 |
6 files changed, 74 insertions, 61 deletions
diff --git a/ydb/core/tablet/pipe_tracker.cpp b/ydb/core/tablet/pipe_tracker.cpp index cf6e41a97f..f1843a44df 100644 --- a/ydb/core/tablet/pipe_tracker.cpp +++ b/ydb/core/tablet/pipe_tracker.cpp @@ -6,19 +6,14 @@ std::unordered_set<ui64> TPipeTrackerBase::EmptySet; std::unordered_set<std::pair<ui64, ui64>> TPipeTrackerBase::EmptyPairSet; void TPipeTrackerBase::AttachTablet(ui64 txid, ui64 tabletid, ui64 cookie) { - auto txIt = TxToTablet.find(txid); - if (txIt == TxToTablet.end()) { - txIt = TxToTablet.emplace(txid, std::unordered_set<std::pair<ui64, ui64>>()).first; - } - - auto& tabletSet = txIt->second; + auto& tabletSet = TxToTablet[txid]; auto tabIt = tabletSet.find(std::make_pair(cookie, tabletid)); if (tabIt != tabletSet.end()) return; tabletSet.insert(std::make_pair(cookie, tabletid)); TabletToTx[tabletid].insert(txid); - TxTablets[txid].insert(tabletid); + TxTablets[txid][tabletid].insert(cookie); } bool TPipeTrackerBase::DetachTablet(ui64 txid, ui64 tabletid, ui64 cookie) { @@ -32,21 +27,27 @@ bool TPipeTrackerBase::DetachTablet(ui64 txid, ui64 tabletid, ui64 cookie) { return false; tabletSet.erase(tabIt); - auto multiIt = TxTablets.find(txid); - Y_VERIFY(multiIt != TxTablets.end()); - auto& tablets = multiIt->second; - auto currIt = tablets.find(tabletid); - Y_VERIFY(currIt != tablets.end()); - auto nextIt = currIt; - ++nextIt; - tablets.erase(currIt); - if (nextIt == tablets.end() || *nextIt != tabletid) { - if (tabletSet.empty()) { + + auto itTxTablets = TxTablets.find(txid); + Y_VERIFY(itTxTablets != TxTablets.end()); + auto itCookies = itTxTablets->second.find(tabletid); + Y_VERIFY(itCookies != itTxTablets->second.end()); + auto itCookie = itCookies->second.find(cookie); + Y_VERIFY(itCookie != itCookies->second.end()); + itCookies->second.erase(itCookie); + + // Cookies are empty when there are no more links between txid and tabletid + if (itCookies->second.empty()) { + itTxTablets->second.erase(itCookies); + + // Check if txid has no more tablets + if (itTxTablets->second.empty()) { + Y_VERIFY(tabletSet.empty()); + TxTablets.erase(itTxTablets); TxToTablet.erase(txIt); - Y_VERIFY(tablets.empty()); - TxTablets.erase(multiIt); } + // Unlink txid from tabletid auto it = TabletToTx.find(tabletid); Y_VERIFY(it != TabletToTx.end()); it->second.erase(txid); @@ -80,6 +81,18 @@ const std::unordered_set<std::pair<ui64, ui64> > &TPipeTrackerBase::FindTablets( return it->second; } +const std::unordered_set<ui64> &TPipeTrackerBase::FindCookies(ui64 txid, ui64 tabletid) const { + auto it = TxTablets.find(txid); + if (it == TxTablets.end()) + return EmptySet; + + auto itCookies = it->second.find(tabletid); + if (itCookies == it->second.end()) + return EmptySet; + + return itCookies->second; +} + TPipeTracker::TPipeTracker(NTabletPipe::IClientCache& clientCache) : ClientCache(clientCache) { diff --git a/ydb/core/tablet/pipe_tracker.h b/ydb/core/tablet/pipe_tracker.h index 33b90ba82b..35cefe0a6d 100644 --- a/ydb/core/tablet/pipe_tracker.h +++ b/ydb/core/tablet/pipe_tracker.h @@ -17,11 +17,12 @@ public: bool IsTxAlive(ui64 txid) const; const std::unordered_set<ui64>& FindTx(ui64 tabletid) const; const std::unordered_set<std::pair<ui64, ui64>>& FindTablets(ui64 txid) const; + const std::unordered_set<ui64>& FindCookies(ui64 txid, ui64 tabletid) const; private: std::unordered_map<ui64, std::unordered_set<ui64>> TabletToTx; // tabletid -> txid std::unordered_map<ui64, std::unordered_set<std::pair<ui64, ui64>>> TxToTablet; // txid -> cookie:tabletid - std::unordered_map<ui64, std::unordered_multiset<ui64>> TxTablets; // txid -> tabletids + std::unordered_map<ui64, std::unordered_map<ui64, std::unordered_set<ui64>>> TxTablets; // txid -> tabletid -> cookie static std::unordered_set<ui64> EmptySet; static std::unordered_set<std::pair<ui64, ui64>> EmptyPairSet; }; diff --git a/ydb/core/tx/schemeshard/schemeshard__operation.h b/ydb/core/tx/schemeshard/schemeshard__operation.h index ea66e3c5c8..ae7ceec9df 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation.h +++ b/ydb/core/tx/schemeshard/schemeshard__operation.h @@ -18,8 +18,21 @@ struct TOperation: TSimpleRefCount<TOperation> { THashSet<TTxId> DependentOperations; THashSet<TTxId> WaitOperations; - using TPreSerialisedMessage = std::pair<ui32, TIntrusivePtr<TEventSerializedData>>; // ui32 it's a type - THashMap<TTabletId, TMap<TPipeMessageId, TPreSerialisedMessage>> PipeBindedMessages; // std::pair<ui64, ui64> it's a cookie + struct TPreSerializedMessage { + ui32 Type; + TIntrusivePtr<TEventSerializedData> Data; + TOperationId OpId; + + TPreSerializedMessage() = default; + + TPreSerializedMessage(ui32 type, TIntrusivePtr<TEventSerializedData> data, TOperationId opId) + : Type(type) + , Data(std::move(data)) + , OpId(opId) + { } + }; + + THashMap<TTabletId, TMap<TPipeMessageId, TPreSerializedMessage>> PipeBindedMessages; // std::pair<ui64, ui64> it's a cookie THashMap<TTabletId, TSubTxId> RelationsByTabletId; THashMap<TShardIdx, TSubTxId> RelationsByShardIdx; diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_side_effects.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_side_effects.cpp index 9979742fc6..8c16a6c03a 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_side_effects.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_side_effects.cpp @@ -618,7 +618,7 @@ void TSideEffects::DoBindMsg(TSchemeShard *ss, const TActorContext &ctx) { const bool success = message->SerializeToArcadiaStream(&serializer); Y_VERIFY(success); TIntrusivePtr<TEventSerializedData> data = serializer.Release(message->IsExtendedFormat()); - operation->PipeBindedMessages[tablet][cookie] = TOperation::TPreSerialisedMessage(msgType, data); + operation->PipeBindedMessages[tablet][cookie] = TOperation::TPreSerializedMessage(msgType, data, opId); ss->PipeClientCache->Send(ctx, ui64(tablet), msgType, data, cookie.second); } diff --git a/ydb/core/tx/schemeshard/schemeshard__state_changed_reply.cpp b/ydb/core/tx/schemeshard/schemeshard__state_changed_reply.cpp index ffa6e34d50..40b1b59a8c 100644 --- a/ydb/core/tx/schemeshard/schemeshard__state_changed_reply.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__state_changed_reply.cpp @@ -49,25 +49,19 @@ struct TSchemeShard::TTxShardStateChanged : public TSchemeShard::TRwTxBase { TOperation::TPtr operation = Self->Operations.at(txId); - for (auto& related: Self->PipeTracker.FindTablets(ui64(txId))) { - ui64 pipeTrackerCookie = related.first; - auto relatedTabletId = TTabletId(related.second); - - if (tabletId != relatedTabletId) { - continue; - } - + for (ui64 pipeTrackerCookie : Self->PipeTracker.FindCookies(ui64(txId), ui64(tabletId))) { auto opId = TOperationId(txId, pipeTrackerCookie); SideEffects.ActivateTx(opId); + } - if (!operation->PipeBindedMessages.contains(tabletId)) { - continue; - } + if (!operation->PipeBindedMessages.contains(tabletId)) { + continue; + } - for (auto& items: operation->PipeBindedMessages.at(tabletId)) { - auto msgCookie = items.first; - SideEffects.UnbindMsgFromPipe(opId, tabletId, msgCookie); - } + for (auto& item: operation->PipeBindedMessages.at(tabletId)) { + auto msgCookie = item.first; + auto& msg = item.second; + SideEffects.UnbindMsgFromPipe(msg.OpId, tabletId, msgCookie); } } } diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp index 3777d4c25f..8b7c7b4ada 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp @@ -5450,39 +5450,31 @@ void TSchemeShard::RestartPipeTx(TTabletId tabletId, const TActorContext& ctx) { } TOperation::TPtr operation = Operations.at(txId); - TSubTxId subTxId = operation->FindRelatedPartByTabletId(tabletId, ctx); - for (auto related: PipeTracker.FindTablets(item)) { - ui64 pipeTrackerCookie = related.first; - auto relatedTabletId = TTabletId(related.second); - - if (tabletId != relatedTabletId) { - continue; - } - - if (!operation->PipeBindedMessages.contains(tabletId)) { + if (!operation->PipeBindedMessages.contains(tabletId)) { + for (ui64 pipeTrackerCookie : PipeTracker.FindCookies(ui64(txId), ui64(tabletId))) { LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "Pipe attached message is not found, ignore event" << ", opId:" << TOperationId(txId, pipeTrackerCookie) << ", tableId: " << tabletId << ", at schemeshardId: " << TabletID()); - continue; } + continue; + } - for (auto& items: operation->PipeBindedMessages.at(tabletId)) { - TPipeMessageId msgCookie = items.first; - TOperation::TPreSerialisedMessage& preSerialisedMessages = items.second; + for (auto& item: operation->PipeBindedMessages.at(tabletId)) { + TPipeMessageId msgCookie = item.first; + TOperation::TPreSerializedMessage& msg = item.second; - LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - "Pipe attached message is found and resent into the new pipe" - << ", opId:" << TOperationId(txId, subTxId) - << ", dst tableId: " << tabletId - << ", msg type: " << preSerialisedMessages.first - << ", msg cookie: " << msgCookie - << ", at schemeshardId: " << TabletID()); + LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "Pipe attached message is found and resent into the new pipe" + << ", opId:" << msg.OpId + << ", dst tableId: " << tabletId + << ", msg type: " << msg.Type + << ", msg cookie: " << msgCookie + << ", at schemeshardId: " << TabletID()); - PipeClientCache->Send(ctx, ui64(tabletId), preSerialisedMessages.first, preSerialisedMessages.second, msgCookie.second); - } + PipeClientCache->Send(ctx, ui64(tabletId), msg.Type, msg.Data, msgCookie.second); } } } |