diff options
| author | ssmike <[email protected]> | 2023-02-02 15:29:49 +0300 | 
|---|---|---|
| committer | ssmike <[email protected]> | 2023-02-02 15:29:49 +0300 | 
| commit | 2a727f613dac73e5ad1e408c0d631860db8d448a (patch) | |
| tree | e77f10aa45da8571b429609d0be78f92ab1e4f6e | |
| parent | 8e4c944fccd17258a1d43f14f2d02f789f4906e2 (diff) | |
invalidate pipecache on undelivery
| -rw-r--r-- | ydb/core/base/tablet_pipecache.h | 13 | ||||
| -rw-r--r-- | ydb/core/kqp/executer_actor/kqp_data_executer.cpp | 2 | ||||
| -rw-r--r-- | ydb/core/kqp/executer_actor/kqp_executer_impl.h | 10 | ||||
| -rw-r--r-- | ydb/core/kqp/executer_actor/kqp_planner.cpp | 2 | ||||
| -rw-r--r-- | ydb/core/tablet/tablet_pipecache.cpp | 13 | 
5 files changed, 37 insertions, 3 deletions
diff --git a/ydb/core/base/tablet_pipecache.h b/ydb/core/base/tablet_pipecache.h index b9e246a69a1..ce56815fcff 100644 --- a/ydb/core/base/tablet_pipecache.h +++ b/ydb/core/base/tablet_pipecache.h @@ -14,6 +14,7 @@ struct TEvPipeCache {          EvForward = EventSpaceBegin(TKikimrEvents::ES_PIPECACHE),          EvUnlink,          EvGetTabletNode, +        EvForcePipeReconnect,          EvDeliveryProblem = EvForward + 1 * 512,          EvGetTabletNodeResult, @@ -66,6 +67,18 @@ struct TEvPipeCache {      };      /** +     * Invalidate tablet node cache +     */ +    struct TEvForcePipeReconnect : public TEventLocal<TEvForcePipeReconnect, EvForcePipeReconnect> { +        const ui64 TabletId; + +        explicit TEvForcePipeReconnect(ui64 tabletId)  +            : TabletId(tabletId) +        { +        } +    }; + +    /**       * Returns node id of the given tablet id, or zero if there's a connection error       */      struct TEvGetTabletNodeResult : public TEventLocal<TEvGetTabletNodeResult, EvGetTabletNodeResult> { diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index cf5791bc004..818ab7d820b 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -2053,7 +2053,7 @@ private:              if (SubscribedNodes.emplace(nodeId).second) {                  flags |= IEventHandle::FlagSubscribeOnSession;              } -            TlsActivationContext->Send(new IEventHandle(target, SelfId(), ev.Release(), flags)); +            TlsActivationContext->Send(new IEventHandle(target, SelfId(), ev.Release(), flags, nodeId));          }          // then start data tasks with known actor ids of compute tasks diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h index 543f87b5861..43ee7a96981 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h @@ -6,11 +6,13 @@  #include "kqp_table_resolver.h"  #include "kqp_shards_resolver.h" +  #include <ydb/core/kqp/common/kqp_ru_calc.h>  #include <ydb/core/kqp/common/kqp_lwtrace_probes.h>  #include <ydb/core/actorlib_impl/long_timer.h>  #include <ydb/core/base/appdata.h> +#include <ydb/core/base/tablet_pipecache.h>  #include <ydb/core/base/wilson.h>  #include <ydb/core/base/kikimr_issue.h>  #include <ydb/core/protos/tx_datashard.pb.h> @@ -366,11 +368,19 @@ protected:          return false;      } +    void InvalidateNode(ui64 node) { +        for (auto tablet : ShardsOnNode[node]) { +            auto ev = MakeHolder<TEvPipeCache::TEvForcePipeReconnect>(tablet); +            this->Send(MakePipePeNodeCacheID(false), ev.Release()); +        } +    } +      void HandleUndelivered(TEvents::TEvUndelivered::TPtr& ev) {          ui32 eventType = ev->Get()->SourceType;          auto reason = ev->Get()->Reason;          switch (eventType) {              case TEvKqpNode::TEvStartKqpTasksRequest::EventType: { +                InvalidateNode(ev->Cookie);                  return InternalError(TStringBuilder()                      << "TEvKqpNode::TEvStartKqpTasksRequest lost: " << reason);              } diff --git a/ydb/core/kqp/executer_actor/kqp_planner.cpp b/ydb/core/kqp/executer_actor/kqp_planner.cpp index 66aed6e8dfa..6dbd70de951 100644 --- a/ydb/core/kqp/executer_actor/kqp_planner.cpp +++ b/ydb/core/kqp/executer_actor/kqp_planner.cpp @@ -187,7 +187,7 @@ void TKqpPlanner::Process(const TVector<NKikimrKqp::TKqpNodeResources>& snapshot              auto target = MakeKqpNodeServiceID(nodeId);              LOG_D("Send request to kqpnode: " << target << ", node_id: " << SelfId().NodeId() << ", TxId: " << TxId);              TlsActivationContext->Send(new IEventHandle(target, ExecuterId, ev.Release(), -                CalcSendMessageFlagsForNode(target.NodeId()), 0, nullptr, KqpPlannerSpan.GetTraceId())); +                CalcSendMessageFlagsForNode(target.NodeId()), nodeId, nullptr, KqpPlannerSpan.GetTraceId()));              ++requestsCnt;          }          Y_VERIFY(ScanTasks.empty()); diff --git a/ydb/core/tablet/tablet_pipecache.cpp b/ydb/core/tablet/tablet_pipecache.cpp index 7cf1ad15814..b21c726d22a 100644 --- a/ydb/core/tablet/tablet_pipecache.cpp +++ b/ydb/core/tablet/tablet_pipecache.cpp @@ -68,6 +68,8 @@ class TPipePeNodeCache : public TActor<TPipePeNodeCache> {      };      struct TTabletState { +        bool ForceReconnect = false; +          THashMap<TActorId, TClientState> ByClient;          THashMap<TActorId, TClientState*> ByPeer; @@ -207,7 +209,8 @@ class TPipePeNodeCache : public TActor<TPipePeNodeCache> {      TClientState* EnsureClient(TTabletState *tabletState, ui64 tabletId) {          TClientState *clientState = nullptr; -        if (!tabletState->LastClient || Config->PipeRefreshTime && tabletState->ByClient.size() < 2 && Config->PipeRefreshTime < (TActivationContext::Now() - tabletState->LastCreated)) { +        if (!tabletState->LastClient || tabletState->ForceReconnect || Config->PipeRefreshTime && tabletState->ByClient.size() < 2 && Config->PipeRefreshTime < (TActivationContext::Now() - tabletState->LastCreated)) { +            tabletState->ForceReconnect = false;              // Remove current client if it is idle              if (tabletState->LastClient) {                  clientState = tabletState->FindClient(tabletState->LastClient); @@ -240,6 +243,13 @@ class TPipePeNodeCache : public TActor<TPipePeNodeCache> {          return clientState;      } +    void Handle(TEvPipeCache::TEvForcePipeReconnect::TPtr &ev) { +        const ui64 tablet = ev->Get()->TabletId; +        if (auto* tabletState = ByTablet.FindPtr(tablet)) { +            tabletState->ForceReconnect = true; +        } +    } +      void Handle(TEvPipeCache::TEvGetTabletNode::TPtr &ev) {          const ui64 tablet = ev->Get()->TabletId; @@ -449,6 +459,7 @@ public:          Y_UNUSED(ctx);          switch (ev->GetTypeRewrite()) {              hFunc(TEvPipeCache::TEvGetTabletNode, Handle); +            hFunc(TEvPipeCache::TEvForcePipeReconnect, Handle);              hFunc(TEvPipeCache::TEvForward, Handle);              hFunc(TEvPipeCache::TEvUnlink, Handle);              hFunc(TEvTabletPipe::TEvClientConnected, Handle);  | 
