aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorgvit <gvit@ydb.tech>2023-03-17 17:02:44 +0300
committergvit <gvit@ydb.tech>2023-03-17 17:02:44 +0300
commit9959a42781bf641f75ef94972faa4d29126ff14c (patch)
treebaaba2fbb4c988931d5bf48b020a1c33f078f5ff
parent301dd8be69dd6a92ad204547887831fb3c50a53a (diff)
downloadydb-9959a42781bf641f75ef94972faa4d29126ff14c.tar.gz
add cookie to navigation api
-rw-r--r--ydb/core/kqp/session_actor/kqp_query_state.cpp2
-rw-r--r--ydb/core/kqp/session_actor/kqp_session_actor.cpp11
-rw-r--r--ydb/core/tx/scheme_cache/scheme_cache.h1
3 files changed, 14 insertions, 0 deletions
diff --git a/ydb/core/kqp/session_actor/kqp_query_state.cpp b/ydb/core/kqp/session_actor/kqp_query_state.cpp
index ac6b84def5..523613690b 100644
--- a/ydb/core/kqp/session_actor/kqp_query_state.cpp
+++ b/ydb/core/kqp/session_actor/kqp_query_state.cpp
@@ -83,6 +83,8 @@ std::unique_ptr<TEvTxProxySchemeCache::TEvNavigateKeySet> TKqpQueryState::BuildN
navigate->UserToken = UserToken;
}
+ navigate->Cookie = QueryId;
+
for (const auto& [tableId, _] : TableVersions) {
NSchemeCache::TSchemeCacheNavigate::TEntry entry;
entry.TableId = tableId;
diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
index e3418d9acc..06cccb80b0 100644
--- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp
+++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
@@ -423,6 +423,7 @@ public:
QueryState->TopicOperations.FillSchemeCacheNavigate(*navigate,
std::move(consumer));
navigate->UserToken = QueryState->UserToken;
+ navigate->Cookie = QueryState->QueryId;
Become(&TKqpSessionActor::TopicOpsState);
ctx.Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(navigate.release()));
@@ -441,6 +442,11 @@ public:
void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) {
const auto* response = ev->Get();
+ YQL_ENSURE(response->Request);
+ // outdated response from scheme cache.
+ // ignoring that.
+ if (response->Request->Cookie < QueryId)
+ return;
// table versions are not the same. need the query recompilation.
if (!QueryState->EnsureTableVersions(*response)) {
@@ -1911,6 +1917,11 @@ private:
}
void HandleTopicOps(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) {
+ YQL_ENSURE(ev->Get()->Request);
+ if (ev->Get()->Request->Cookie < QueryId) {
+ return;
+ }
+
NSchemeCache::TSchemeCacheNavigate* response = ev->Get()->Request.Get();
Ydb::StatusIds_StatusCode status;
diff --git a/ydb/core/tx/scheme_cache/scheme_cache.h b/ydb/core/tx/scheme_cache/scheme_cache.h
index 2f85708c15..dfe9de5247 100644
--- a/ydb/core/tx/scheme_cache/scheme_cache.h
+++ b/ydb/core/tx/scheme_cache/scheme_cache.h
@@ -282,6 +282,7 @@ struct TSchemeCacheNavigate {
TString DatabaseName;
ui64 DomainOwnerId = 0;
ui64 ErrorCount = 0;
+ ui64 Cookie = 0;
const ui64 Instant; // deprecated, used by pq
TSchemeCacheNavigate()