summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorssmike <[email protected]>2023-02-02 15:29:49 +0300
committerssmike <[email protected]>2023-02-02 15:29:49 +0300
commit2a727f613dac73e5ad1e408c0d631860db8d448a (patch)
treee77f10aa45da8571b429609d0be78f92ab1e4f6e
parent8e4c944fccd17258a1d43f14f2d02f789f4906e2 (diff)
invalidate pipecache on undelivery
-rw-r--r--ydb/core/base/tablet_pipecache.h13
-rw-r--r--ydb/core/kqp/executer_actor/kqp_data_executer.cpp2
-rw-r--r--ydb/core/kqp/executer_actor/kqp_executer_impl.h10
-rw-r--r--ydb/core/kqp/executer_actor/kqp_planner.cpp2
-rw-r--r--ydb/core/tablet/tablet_pipecache.cpp13
5 files changed, 37 insertions, 3 deletions
diff --git a/ydb/core/base/tablet_pipecache.h b/ydb/core/base/tablet_pipecache.h
index b9e246a69a1..ce56815fcff 100644
--- a/ydb/core/base/tablet_pipecache.h
+++ b/ydb/core/base/tablet_pipecache.h
@@ -14,6 +14,7 @@ struct TEvPipeCache {
EvForward = EventSpaceBegin(TKikimrEvents::ES_PIPECACHE),
EvUnlink,
EvGetTabletNode,
+ EvForcePipeReconnect,
EvDeliveryProblem = EvForward + 1 * 512,
EvGetTabletNodeResult,
@@ -66,6 +67,18 @@ struct TEvPipeCache {
};
/**
+ * Invalidate tablet node cache
+ */
+ struct TEvForcePipeReconnect : public TEventLocal<TEvForcePipeReconnect, EvForcePipeReconnect> {
+ const ui64 TabletId;
+
+ explicit TEvForcePipeReconnect(ui64 tabletId)
+ : TabletId(tabletId)
+ {
+ }
+ };
+
+ /**
* Returns node id of the given tablet id, or zero if there's a connection error
*/
struct TEvGetTabletNodeResult : public TEventLocal<TEvGetTabletNodeResult, EvGetTabletNodeResult> {
diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
index cf5791bc004..818ab7d820b 100644
--- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
@@ -2053,7 +2053,7 @@ private:
if (SubscribedNodes.emplace(nodeId).second) {
flags |= IEventHandle::FlagSubscribeOnSession;
}
- TlsActivationContext->Send(new IEventHandle(target, SelfId(), ev.Release(), flags));
+ TlsActivationContext->Send(new IEventHandle(target, SelfId(), ev.Release(), flags, nodeId));
}
// then start data tasks with known actor ids of compute tasks
diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
index 543f87b5861..43ee7a96981 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h
+++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
@@ -6,11 +6,13 @@
#include "kqp_table_resolver.h"
#include "kqp_shards_resolver.h"
+
#include <ydb/core/kqp/common/kqp_ru_calc.h>
#include <ydb/core/kqp/common/kqp_lwtrace_probes.h>
#include <ydb/core/actorlib_impl/long_timer.h>
#include <ydb/core/base/appdata.h>
+#include <ydb/core/base/tablet_pipecache.h>
#include <ydb/core/base/wilson.h>
#include <ydb/core/base/kikimr_issue.h>
#include <ydb/core/protos/tx_datashard.pb.h>
@@ -366,11 +368,19 @@ protected:
return false;
}
+ void InvalidateNode(ui64 node) {
+ for (auto tablet : ShardsOnNode[node]) {
+ auto ev = MakeHolder<TEvPipeCache::TEvForcePipeReconnect>(tablet);
+ this->Send(MakePipePeNodeCacheID(false), ev.Release());
+ }
+ }
+
void HandleUndelivered(TEvents::TEvUndelivered::TPtr& ev) {
ui32 eventType = ev->Get()->SourceType;
auto reason = ev->Get()->Reason;
switch (eventType) {
case TEvKqpNode::TEvStartKqpTasksRequest::EventType: {
+ InvalidateNode(ev->Cookie);
return InternalError(TStringBuilder()
<< "TEvKqpNode::TEvStartKqpTasksRequest lost: " << reason);
}
diff --git a/ydb/core/kqp/executer_actor/kqp_planner.cpp b/ydb/core/kqp/executer_actor/kqp_planner.cpp
index 66aed6e8dfa..6dbd70de951 100644
--- a/ydb/core/kqp/executer_actor/kqp_planner.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_planner.cpp
@@ -187,7 +187,7 @@ void TKqpPlanner::Process(const TVector<NKikimrKqp::TKqpNodeResources>& snapshot
auto target = MakeKqpNodeServiceID(nodeId);
LOG_D("Send request to kqpnode: " << target << ", node_id: " << SelfId().NodeId() << ", TxId: " << TxId);
TlsActivationContext->Send(new IEventHandle(target, ExecuterId, ev.Release(),
- CalcSendMessageFlagsForNode(target.NodeId()), 0, nullptr, KqpPlannerSpan.GetTraceId()));
+ CalcSendMessageFlagsForNode(target.NodeId()), nodeId, nullptr, KqpPlannerSpan.GetTraceId()));
++requestsCnt;
}
Y_VERIFY(ScanTasks.empty());
diff --git a/ydb/core/tablet/tablet_pipecache.cpp b/ydb/core/tablet/tablet_pipecache.cpp
index 7cf1ad15814..b21c726d22a 100644
--- a/ydb/core/tablet/tablet_pipecache.cpp
+++ b/ydb/core/tablet/tablet_pipecache.cpp
@@ -68,6 +68,8 @@ class TPipePeNodeCache : public TActor<TPipePeNodeCache> {
};
struct TTabletState {
+ bool ForceReconnect = false;
+
THashMap<TActorId, TClientState> ByClient;
THashMap<TActorId, TClientState*> ByPeer;
@@ -207,7 +209,8 @@ class TPipePeNodeCache : public TActor<TPipePeNodeCache> {
TClientState* EnsureClient(TTabletState *tabletState, ui64 tabletId) {
TClientState *clientState = nullptr;
- if (!tabletState->LastClient || Config->PipeRefreshTime && tabletState->ByClient.size() < 2 && Config->PipeRefreshTime < (TActivationContext::Now() - tabletState->LastCreated)) {
+ if (!tabletState->LastClient || tabletState->ForceReconnect || Config->PipeRefreshTime && tabletState->ByClient.size() < 2 && Config->PipeRefreshTime < (TActivationContext::Now() - tabletState->LastCreated)) {
+ tabletState->ForceReconnect = false;
// Remove current client if it is idle
if (tabletState->LastClient) {
clientState = tabletState->FindClient(tabletState->LastClient);
@@ -240,6 +243,13 @@ class TPipePeNodeCache : public TActor<TPipePeNodeCache> {
return clientState;
}
+ void Handle(TEvPipeCache::TEvForcePipeReconnect::TPtr &ev) {
+ const ui64 tablet = ev->Get()->TabletId;
+ if (auto* tabletState = ByTablet.FindPtr(tablet)) {
+ tabletState->ForceReconnect = true;
+ }
+ }
+
void Handle(TEvPipeCache::TEvGetTabletNode::TPtr &ev) {
const ui64 tablet = ev->Get()->TabletId;
@@ -449,6 +459,7 @@ public:
Y_UNUSED(ctx);
switch (ev->GetTypeRewrite()) {
hFunc(TEvPipeCache::TEvGetTabletNode, Handle);
+ hFunc(TEvPipeCache::TEvForcePipeReconnect, Handle);
hFunc(TEvPipeCache::TEvForward, Handle);
hFunc(TEvPipeCache::TEvUnlink, Handle);
hFunc(TEvTabletPipe::TEvClientConnected, Handle);