aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorEvgeniy Ivanov <eivanov89@ydb.tech>2024-11-29 16:51:43 +0100
committerGitHub <noreply@github.com>2024-11-29 16:51:43 +0100
commit8fde16d3cdda23a3097b8bad7a4f2084c9069530 (patch)
treee9ab30c050a05f0bdf5bae63230ad91027f304bd
parent8edc4ee5dc2b45c7c924a069d85c0c3939d0a5fd (diff)
downloadydb-8fde16d3cdda23a3097b8bad7a4f2084c9069530.tar.gz
remove obsolete invalidation from kqp (#11965)
-rw-r--r--ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp4
-rw-r--r--ydb/core/kqp/runtime/kqp_read_actor.cpp4
-rw-r--r--ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp1
-rw-r--r--ydb/core/kqp/runtime/kqp_write_actor.cpp23
4 files changed, 11 insertions, 21 deletions
diff --git a/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp b/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp
index c17289b35f..0c04670325 100644
--- a/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp
+++ b/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp
@@ -251,9 +251,6 @@ void TKqpScanFetcherActor::HandleExecute(TEvTxProxySchemeCache::TEvResolveKeySet
for (const auto& x : request->ResultSet) {
if ((ui32)x.Status < (ui32)NSchemeCache::TSchemeCacheRequest::EStatus::OkScheme) {
- // invalidate table
- Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvInvalidateTable(ScanDataMeta.TableId, {}));
-
switch (x.Status) {
case NSchemeCache::TSchemeCacheRequest::EStatus::PathErrorNotExist:
statusCode = NDqProto::StatusIds::SCHEME_ERROR;
@@ -633,7 +630,6 @@ void TKqpScanFetcherActor::ResolveShard(TShardState& state) {
auto request = MakeHolder<NSchemeCache::TSchemeCacheRequest>();
request->ResultSet.emplace_back(std::move(keyDesc));
- Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvInvalidateTable(ScanDataMeta.TableId, {}));
Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvResolveKeySet(request));
}
diff --git a/ydb/core/kqp/runtime/kqp_read_actor.cpp b/ydb/core/kqp/runtime/kqp_read_actor.cpp
index b30875968b..42d6075e64 100644
--- a/ydb/core/kqp/runtime/kqp_read_actor.cpp
+++ b/ydb/core/kqp/runtime/kqp_read_actor.cpp
@@ -520,7 +520,6 @@ public:
ReadActorStateSpan = NWilson::TSpan(TWilsonKqp::ReadActorShardsResolve, ReadActorSpan.GetTraceId(),
"WaitForShardsResolve", NWilson::EFlags::AUTO_END);
- Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvInvalidateTable(TableId, {}));
Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvResolveKeySet(request));
}
@@ -544,9 +543,6 @@ public:
for (const auto& x : request->ResultSet) {
if ((ui32)x.Status < (ui32)NSchemeCache::TSchemeCacheRequest::EStatus::OkScheme) {
- // invalidate table
- Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvInvalidateTable(TableId, {}));
-
switch (x.Status) {
case NSchemeCache::TSchemeCacheRequest::EStatus::PathErrorNotExist:
statusCode = NDqProto::StatusIds::SCHEME_ERROR;
diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
index 60225ab253..694f5159f3 100644
--- a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
+++ b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
@@ -586,7 +586,6 @@ private:
LookupActorStateSpan = NWilson::TSpan(TWilsonKqp::LookupActorShardsResolve, LookupActorSpan.GetTraceId(),
"WaitForShardsResolve", NWilson::EFlags::AUTO_END);
- Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvInvalidateTable(StreamLookupWorker->GetTableId(), {}));
Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvResolveKeySet(request));
SchemeCacheRequestTimeoutTimer = CreateLongTimer(TlsActivationContext->AsActorContext(), SchemeCacheRequestTimeout,
diff --git a/ydb/core/kqp/runtime/kqp_write_actor.cpp b/ydb/core/kqp/runtime/kqp_write_actor.cpp
index 0bf58bd317..e52512c9df 100644
--- a/ydb/core/kqp/runtime/kqp_write_actor.cpp
+++ b/ydb/core/kqp/runtime/kqp_write_actor.cpp
@@ -60,7 +60,7 @@ namespace {
evWrite->Record.SetTxId(txId);
auto* protoLocks = evWrite->Record.MutableLocks();
protoLocks->SetOp(NKikimrDataEvents::TKqpLocks::Commit);
-
+
const auto prepareSettings = txManager->GetPrepareTransactionInfo();
if (!prepareSettings.ArbiterColumnShard) {
for (const ui64 sendingShardId : prepareSettings.SendingShards) {
@@ -360,7 +360,7 @@ public:
CA_LOG_D("Plan resolve with delay " << CalculateNextAttemptDelay(MessageSettings, ResolveAttempts));
TlsActivationContext->Schedule(
CalculateNextAttemptDelay(MessageSettings, ResolveAttempts),
- new IEventHandle(SelfId(), SelfId(), new TEvPrivate::TEvResolveRequestPlanned{}, 0, 0));
+ new IEventHandle(SelfId(), SelfId(), new TEvPrivate::TEvResolveRequestPlanned{}, 0, 0));
}
void Handle(TEvPrivate::TEvResolveRequestPlanned::TPtr&) {
@@ -395,7 +395,6 @@ public:
TableWriteActorStateSpan = NWilson::TSpan(TWilsonKqp::TableWriteActorTableNavigate, TableWriteActorSpan.GetTraceId(),
"WaitForShardsResolve", NWilson::EFlags::AUTO_END);
- Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvInvalidateTable(TableId, {}), 0, 0, TableWriteActorStateSpan.GetTraceId());
Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(request), 0, 0, TableWriteActorStateSpan.GetTraceId());
}
@@ -496,7 +495,7 @@ public:
}()
<< ", Cookie=" << ev->Cookie);
-
+
switch (ev->Get()->GetStatus()) {
case NKikimrDataEvents::TEvWriteResult::STATUS_UNSPECIFIED: {
@@ -581,7 +580,7 @@ public:
NYql::NDqProto::StatusIds::UNAVAILABLE,
getIssues());
return;
- }
+ }
case NKikimrDataEvents::TEvWriteResult::STATUS_OVERLOADED: {
CA_LOG_W("Got OVERLOADED for table `"
<< SchemeEntry->TableId.PathId.ToString() << "`."
@@ -808,7 +807,7 @@ public:
? NKikimrDataEvents::TEvWrite::MODE_VOLATILE_PREPARE
: NKikimrDataEvents::TEvWrite::MODE_PREPARE)
: NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE);
-
+
if (isImmediateCommit) {
const auto locks = TxManager->GetLocks(shardId);
if (!locks.empty()) {
@@ -944,7 +943,7 @@ public:
if (TableWriteActorSpan) {
TableWriteActorSpan.EndError(message);
}
-
+
Callbacks->OnError(message, statusCode, subIssues);
}
@@ -1359,7 +1358,7 @@ public:
} else {
token = *ev->Get()->Token;
}
-
+
auto& queue = DataQueues[token.TableId];
queue.emplace();
auto& message = queue.back();
@@ -1372,7 +1371,7 @@ public:
ev->Get()->Data = nullptr;
ev->Get()->Alloc = nullptr;
-
+
Process();
}
@@ -1672,7 +1671,7 @@ public:
queue.pop();
}
}
-
+
for (auto& [_, info] : WriteInfos) {
if (info.WriteTableActor) {
info.WriteTableActor->Terminate();
@@ -1846,7 +1845,7 @@ public:
NYql::NDqProto::StatusIds::UNAVAILABLE,
getIssues());
return;
- }
+ }
case NKikimrDataEvents::TEvWriteResult::STATUS_OVERLOADED: {
CA_LOG_W("Got OVERLOADED for table ."
<< " ShardID=" << ev->Get()->Record.GetOrigin() << ","
@@ -2023,7 +2022,7 @@ public:
BufferWriteActorState.EndError(message);
BufferWriteActor.EndError(message);
CA_LOG_E(message << ". statusCode=" << NYql::NDqProto::StatusIds_StatusCode_Name(statusCode) << ". subIssues=" << subIssues.ToString() << ". sessionActorId=" << SessionActorId << ". isRollback=" << (State == EState::ROLLINGBACK));
-
+
Y_ABORT_UNLESS(!HasError);
HasError = true;
if (State != EState::ROLLINGBACK) {