diff options
author | Evgeniy Ivanov <eivanov89@ydb.tech> | 2024-11-29 16:51:43 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-11-29 16:51:43 +0100 |
commit | 8fde16d3cdda23a3097b8bad7a4f2084c9069530 (patch) | |
tree | e9ab30c050a05f0bdf5bae63230ad91027f304bd | |
parent | 8edc4ee5dc2b45c7c924a069d85c0c3939d0a5fd (diff) | |
download | ydb-8fde16d3cdda23a3097b8bad7a4f2084c9069530.tar.gz |
remove obsolete invalidation from kqp (#11965)
-rw-r--r-- | ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp | 4 | ||||
-rw-r--r-- | ydb/core/kqp/runtime/kqp_read_actor.cpp | 4 | ||||
-rw-r--r-- | ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp | 1 | ||||
-rw-r--r-- | ydb/core/kqp/runtime/kqp_write_actor.cpp | 23 |
4 files changed, 11 insertions, 21 deletions
diff --git a/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp b/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp index c17289b35f..0c04670325 100644 --- a/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp +++ b/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp @@ -251,9 +251,6 @@ void TKqpScanFetcherActor::HandleExecute(TEvTxProxySchemeCache::TEvResolveKeySet for (const auto& x : request->ResultSet) { if ((ui32)x.Status < (ui32)NSchemeCache::TSchemeCacheRequest::EStatus::OkScheme) { - // invalidate table - Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvInvalidateTable(ScanDataMeta.TableId, {})); - switch (x.Status) { case NSchemeCache::TSchemeCacheRequest::EStatus::PathErrorNotExist: statusCode = NDqProto::StatusIds::SCHEME_ERROR; @@ -633,7 +630,6 @@ void TKqpScanFetcherActor::ResolveShard(TShardState& state) { auto request = MakeHolder<NSchemeCache::TSchemeCacheRequest>(); request->ResultSet.emplace_back(std::move(keyDesc)); - Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvInvalidateTable(ScanDataMeta.TableId, {})); Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvResolveKeySet(request)); } diff --git a/ydb/core/kqp/runtime/kqp_read_actor.cpp b/ydb/core/kqp/runtime/kqp_read_actor.cpp index b30875968b..42d6075e64 100644 --- a/ydb/core/kqp/runtime/kqp_read_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_read_actor.cpp @@ -520,7 +520,6 @@ public: ReadActorStateSpan = NWilson::TSpan(TWilsonKqp::ReadActorShardsResolve, ReadActorSpan.GetTraceId(), "WaitForShardsResolve", NWilson::EFlags::AUTO_END); - Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvInvalidateTable(TableId, {})); Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvResolveKeySet(request)); } @@ -544,9 +543,6 @@ public: for (const auto& x : request->ResultSet) { if ((ui32)x.Status < (ui32)NSchemeCache::TSchemeCacheRequest::EStatus::OkScheme) { - // invalidate table - Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvInvalidateTable(TableId, {})); - switch (x.Status) { case NSchemeCache::TSchemeCacheRequest::EStatus::PathErrorNotExist: statusCode = NDqProto::StatusIds::SCHEME_ERROR; diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp index 60225ab253..694f5159f3 100644 --- a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp @@ -586,7 +586,6 @@ private: LookupActorStateSpan = NWilson::TSpan(TWilsonKqp::LookupActorShardsResolve, LookupActorSpan.GetTraceId(), "WaitForShardsResolve", NWilson::EFlags::AUTO_END); - Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvInvalidateTable(StreamLookupWorker->GetTableId(), {})); Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvResolveKeySet(request)); SchemeCacheRequestTimeoutTimer = CreateLongTimer(TlsActivationContext->AsActorContext(), SchemeCacheRequestTimeout, diff --git a/ydb/core/kqp/runtime/kqp_write_actor.cpp b/ydb/core/kqp/runtime/kqp_write_actor.cpp index 0bf58bd317..e52512c9df 100644 --- a/ydb/core/kqp/runtime/kqp_write_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_write_actor.cpp @@ -60,7 +60,7 @@ namespace { evWrite->Record.SetTxId(txId); auto* protoLocks = evWrite->Record.MutableLocks(); protoLocks->SetOp(NKikimrDataEvents::TKqpLocks::Commit); - + const auto prepareSettings = txManager->GetPrepareTransactionInfo(); if (!prepareSettings.ArbiterColumnShard) { for (const ui64 sendingShardId : prepareSettings.SendingShards) { @@ -360,7 +360,7 @@ public: CA_LOG_D("Plan resolve with delay " << CalculateNextAttemptDelay(MessageSettings, ResolveAttempts)); TlsActivationContext->Schedule( CalculateNextAttemptDelay(MessageSettings, ResolveAttempts), - new IEventHandle(SelfId(), SelfId(), new TEvPrivate::TEvResolveRequestPlanned{}, 0, 0)); + new IEventHandle(SelfId(), SelfId(), new TEvPrivate::TEvResolveRequestPlanned{}, 0, 0)); } void Handle(TEvPrivate::TEvResolveRequestPlanned::TPtr&) { @@ -395,7 +395,6 @@ public: TableWriteActorStateSpan = NWilson::TSpan(TWilsonKqp::TableWriteActorTableNavigate, TableWriteActorSpan.GetTraceId(), "WaitForShardsResolve", NWilson::EFlags::AUTO_END); - Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvInvalidateTable(TableId, {}), 0, 0, TableWriteActorStateSpan.GetTraceId()); Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(request), 0, 0, TableWriteActorStateSpan.GetTraceId()); } @@ -496,7 +495,7 @@ public: }() << ", Cookie=" << ev->Cookie); - + switch (ev->Get()->GetStatus()) { case NKikimrDataEvents::TEvWriteResult::STATUS_UNSPECIFIED: { @@ -581,7 +580,7 @@ public: NYql::NDqProto::StatusIds::UNAVAILABLE, getIssues()); return; - } + } case NKikimrDataEvents::TEvWriteResult::STATUS_OVERLOADED: { CA_LOG_W("Got OVERLOADED for table `" << SchemeEntry->TableId.PathId.ToString() << "`." @@ -808,7 +807,7 @@ public: ? NKikimrDataEvents::TEvWrite::MODE_VOLATILE_PREPARE : NKikimrDataEvents::TEvWrite::MODE_PREPARE) : NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE); - + if (isImmediateCommit) { const auto locks = TxManager->GetLocks(shardId); if (!locks.empty()) { @@ -944,7 +943,7 @@ public: if (TableWriteActorSpan) { TableWriteActorSpan.EndError(message); } - + Callbacks->OnError(message, statusCode, subIssues); } @@ -1359,7 +1358,7 @@ public: } else { token = *ev->Get()->Token; } - + auto& queue = DataQueues[token.TableId]; queue.emplace(); auto& message = queue.back(); @@ -1372,7 +1371,7 @@ public: ev->Get()->Data = nullptr; ev->Get()->Alloc = nullptr; - + Process(); } @@ -1672,7 +1671,7 @@ public: queue.pop(); } } - + for (auto& [_, info] : WriteInfos) { if (info.WriteTableActor) { info.WriteTableActor->Terminate(); @@ -1846,7 +1845,7 @@ public: NYql::NDqProto::StatusIds::UNAVAILABLE, getIssues()); return; - } + } case NKikimrDataEvents::TEvWriteResult::STATUS_OVERLOADED: { CA_LOG_W("Got OVERLOADED for table ." << " ShardID=" << ev->Get()->Record.GetOrigin() << "," @@ -2023,7 +2022,7 @@ public: BufferWriteActorState.EndError(message); BufferWriteActor.EndError(message); CA_LOG_E(message << ". statusCode=" << NYql::NDqProto::StatusIds_StatusCode_Name(statusCode) << ". subIssues=" << subIssues.ToString() << ". sessionActorId=" << SessionActorId << ". isRollback=" << (State == EState::ROLLINGBACK)); - + Y_ABORT_UNLESS(!HasError); HasError = true; if (State != EState::ROLLINGBACK) { |