diff options
| author | ssmike <[email protected]> | 2023-02-02 15:29:49 +0300 |
|---|---|---|
| committer | ssmike <[email protected]> | 2023-02-02 15:29:49 +0300 |
| commit | 2a727f613dac73e5ad1e408c0d631860db8d448a (patch) | |
| tree | e77f10aa45da8571b429609d0be78f92ab1e4f6e | |
| parent | 8e4c944fccd17258a1d43f14f2d02f789f4906e2 (diff) | |
invalidate pipecache on undelivery
| -rw-r--r-- | ydb/core/base/tablet_pipecache.h | 13 | ||||
| -rw-r--r-- | ydb/core/kqp/executer_actor/kqp_data_executer.cpp | 2 | ||||
| -rw-r--r-- | ydb/core/kqp/executer_actor/kqp_executer_impl.h | 10 | ||||
| -rw-r--r-- | ydb/core/kqp/executer_actor/kqp_planner.cpp | 2 | ||||
| -rw-r--r-- | ydb/core/tablet/tablet_pipecache.cpp | 13 |
5 files changed, 37 insertions, 3 deletions
diff --git a/ydb/core/base/tablet_pipecache.h b/ydb/core/base/tablet_pipecache.h index b9e246a69a1..ce56815fcff 100644 --- a/ydb/core/base/tablet_pipecache.h +++ b/ydb/core/base/tablet_pipecache.h @@ -14,6 +14,7 @@ struct TEvPipeCache { EvForward = EventSpaceBegin(TKikimrEvents::ES_PIPECACHE), EvUnlink, EvGetTabletNode, + EvForcePipeReconnect, EvDeliveryProblem = EvForward + 1 * 512, EvGetTabletNodeResult, @@ -66,6 +67,18 @@ struct TEvPipeCache { }; /** + * Invalidate tablet node cache + */ + struct TEvForcePipeReconnect : public TEventLocal<TEvForcePipeReconnect, EvForcePipeReconnect> { + const ui64 TabletId; + + explicit TEvForcePipeReconnect(ui64 tabletId) + : TabletId(tabletId) + { + } + }; + + /** * Returns node id of the given tablet id, or zero if there's a connection error */ struct TEvGetTabletNodeResult : public TEventLocal<TEvGetTabletNodeResult, EvGetTabletNodeResult> { diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index cf5791bc004..818ab7d820b 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -2053,7 +2053,7 @@ private: if (SubscribedNodes.emplace(nodeId).second) { flags |= IEventHandle::FlagSubscribeOnSession; } - TlsActivationContext->Send(new IEventHandle(target, SelfId(), ev.Release(), flags)); + TlsActivationContext->Send(new IEventHandle(target, SelfId(), ev.Release(), flags, nodeId)); } // then start data tasks with known actor ids of compute tasks diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h index 543f87b5861..43ee7a96981 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h @@ -6,11 +6,13 @@ #include "kqp_table_resolver.h" #include "kqp_shards_resolver.h" + #include <ydb/core/kqp/common/kqp_ru_calc.h> #include <ydb/core/kqp/common/kqp_lwtrace_probes.h> #include <ydb/core/actorlib_impl/long_timer.h> #include <ydb/core/base/appdata.h> +#include <ydb/core/base/tablet_pipecache.h> #include <ydb/core/base/wilson.h> #include <ydb/core/base/kikimr_issue.h> #include <ydb/core/protos/tx_datashard.pb.h> @@ -366,11 +368,19 @@ protected: return false; } + void InvalidateNode(ui64 node) { + for (auto tablet : ShardsOnNode[node]) { + auto ev = MakeHolder<TEvPipeCache::TEvForcePipeReconnect>(tablet); + this->Send(MakePipePeNodeCacheID(false), ev.Release()); + } + } + void HandleUndelivered(TEvents::TEvUndelivered::TPtr& ev) { ui32 eventType = ev->Get()->SourceType; auto reason = ev->Get()->Reason; switch (eventType) { case TEvKqpNode::TEvStartKqpTasksRequest::EventType: { + InvalidateNode(ev->Cookie); return InternalError(TStringBuilder() << "TEvKqpNode::TEvStartKqpTasksRequest lost: " << reason); } diff --git a/ydb/core/kqp/executer_actor/kqp_planner.cpp b/ydb/core/kqp/executer_actor/kqp_planner.cpp index 66aed6e8dfa..6dbd70de951 100644 --- a/ydb/core/kqp/executer_actor/kqp_planner.cpp +++ b/ydb/core/kqp/executer_actor/kqp_planner.cpp @@ -187,7 +187,7 @@ void TKqpPlanner::Process(const TVector<NKikimrKqp::TKqpNodeResources>& snapshot auto target = MakeKqpNodeServiceID(nodeId); LOG_D("Send request to kqpnode: " << target << ", node_id: " << SelfId().NodeId() << ", TxId: " << TxId); TlsActivationContext->Send(new IEventHandle(target, ExecuterId, ev.Release(), - CalcSendMessageFlagsForNode(target.NodeId()), 0, nullptr, KqpPlannerSpan.GetTraceId())); + CalcSendMessageFlagsForNode(target.NodeId()), nodeId, nullptr, KqpPlannerSpan.GetTraceId())); ++requestsCnt; } Y_VERIFY(ScanTasks.empty()); diff --git a/ydb/core/tablet/tablet_pipecache.cpp b/ydb/core/tablet/tablet_pipecache.cpp index 7cf1ad15814..b21c726d22a 100644 --- a/ydb/core/tablet/tablet_pipecache.cpp +++ b/ydb/core/tablet/tablet_pipecache.cpp @@ -68,6 +68,8 @@ class TPipePeNodeCache : public TActor<TPipePeNodeCache> { }; struct TTabletState { + bool ForceReconnect = false; + THashMap<TActorId, TClientState> ByClient; THashMap<TActorId, TClientState*> ByPeer; @@ -207,7 +209,8 @@ class TPipePeNodeCache : public TActor<TPipePeNodeCache> { TClientState* EnsureClient(TTabletState *tabletState, ui64 tabletId) { TClientState *clientState = nullptr; - if (!tabletState->LastClient || Config->PipeRefreshTime && tabletState->ByClient.size() < 2 && Config->PipeRefreshTime < (TActivationContext::Now() - tabletState->LastCreated)) { + if (!tabletState->LastClient || tabletState->ForceReconnect || Config->PipeRefreshTime && tabletState->ByClient.size() < 2 && Config->PipeRefreshTime < (TActivationContext::Now() - tabletState->LastCreated)) { + tabletState->ForceReconnect = false; // Remove current client if it is idle if (tabletState->LastClient) { clientState = tabletState->FindClient(tabletState->LastClient); @@ -240,6 +243,13 @@ class TPipePeNodeCache : public TActor<TPipePeNodeCache> { return clientState; } + void Handle(TEvPipeCache::TEvForcePipeReconnect::TPtr &ev) { + const ui64 tablet = ev->Get()->TabletId; + if (auto* tabletState = ByTablet.FindPtr(tablet)) { + tabletState->ForceReconnect = true; + } + } + void Handle(TEvPipeCache::TEvGetTabletNode::TPtr &ev) { const ui64 tablet = ev->Get()->TabletId; @@ -449,6 +459,7 @@ public: Y_UNUSED(ctx); switch (ev->GetTypeRewrite()) { hFunc(TEvPipeCache::TEvGetTabletNode, Handle); + hFunc(TEvPipeCache::TEvForcePipeReconnect, Handle); hFunc(TEvPipeCache::TEvForward, Handle); hFunc(TEvPipeCache::TEvUnlink, Handle); hFunc(TEvTabletPipe::TEvClientConnected, Handle); |
