diff options
author | Maxim Yurchuk <[email protected]> | 2025-06-27 14:33:23 +0000 |
---|---|---|
committer | GitHub <[email protected]> | 2025-06-27 17:33:23 +0300 |
commit | 8c006b7875e5653eb78af3e218b3f3d37b6fd277 (patch) | |
tree | bc413ffbda11fa93b6df469cdbcb18f1f7179282 | |
parent | 55572b8efc9c230651c34b260d0e7ec8c43618b3 (diff) |
Revert "Correct Scan Generation in case of retries" (#20309)
-rw-r--r-- | ydb/core/kqp/compute_actor/kqp_scan_compute_manager.cpp | 1 | ||||
-rw-r--r-- | ydb/core/kqp/compute_actor/kqp_scan_compute_manager.h | 3 | ||||
-rw-r--r-- | ydb/core/kqp/runtime/kqp_scan_fetcher_ut.cpp | 76 | ||||
-rw-r--r-- | ydb/core/kqp/runtime/ut/ya.make | 5 |
4 files changed, 2 insertions, 83 deletions
diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.cpp b/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.cpp index a727dac65cf..1b408cdf6f8 100644 --- a/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.cpp +++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.cpp @@ -9,7 +9,6 @@ TShardState::TPtr TInFlightShards::Put(TShardState&& state) { MutableStatistics(state.TabletId).MutableStatistics(0).SetStartInstant(Now()); TShardState::TPtr result = std::make_shared<TShardState>(std::move(state)); - result->Generation = 1; AFL_ENSURE(Shards.emplace(result->TabletId, result).second)("tablet_id", result->TabletId); return result; } diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.h b/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.h index 65d8462370d..c90904ab3f5 100644 --- a/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.h +++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.h @@ -60,7 +60,7 @@ public: TShardScannerInfo(const ui64 scanId, TShardState& state, const IExternalObjectsProvider& externalObjectsProvider) : ScanId(scanId) , TabletId(state.TabletId) - , Generation(state.Generation) + , Generation(++state.Generation) { const bool subscribed = std::exchange(state.SubscribedOnTablet, true); @@ -392,7 +392,6 @@ public: state->ActorId = {}; state->State = NComputeActor::EShardState::Initial; state->SubscribedOnTablet = false; - state->Generation++; auto it = ShardScanners.find(tabletId); AFL_ENSURE(it != ShardScanners.end())("tablet_id", tabletId); diff --git a/ydb/core/kqp/runtime/kqp_scan_fetcher_ut.cpp b/ydb/core/kqp/runtime/kqp_scan_fetcher_ut.cpp deleted file mode 100644 index 0a19d4c9424..00000000000 --- a/ydb/core/kqp/runtime/kqp_scan_fetcher_ut.cpp +++ /dev/null @@ -1,76 +0,0 @@ -#include <ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.h> - -#include <ydb/core/testlib/basics/appdata.h> -#include <ydb/core/testlib/actors/test_runtime.h> - -#include <library/cpp/testing/unittest/registar.h> - -Y_UNIT_TEST_SUITE(TKqpScanFetcher) { - - Y_UNIT_TEST(ScanDelayedRetry) { - - constexpr ui64 TABLET_ID = 1001001; - - NActors::TTestActorRuntime runtime; - runtime.Initialize(NKikimr::TAppPrepare().Unwrap()); - - auto pipeCache = runtime.AllocateEdgeActor(); - runtime.RegisterService(NKikimr::MakePipePerNodeCacheID(false), pipeCache); - auto scan = runtime.AllocateEdgeActor(); - auto compute = runtime.AllocateEdgeActor(); - - NKikimrKqp::TKqpSnapshot snapshot; - NYql::NDq::TComputeRuntimeSettings settings; - NKikimrTxDataShard::TKqpTransaction_TScanTaskMeta meta; - auto& read = *meta.AddReads(); - read.SetShardId(TABLET_ID); - read.AddKeyRanges(); - NKikimrConfig::TTableServiceConfig::TShardsScanningPolicy protoPolicy; - NKikimr::NKqp::TShardsScanningPolicy shardsScanningPolicy(protoPolicy); - NWilson::TTraceId traceId(0); - NKikimr::NKqp::TCPULimits cpuLimits; - NMonitoring::TDynamicCounterPtr counters = MakeIntrusive<NMonitoring::TDynamicCounters>(); - auto scanFetcher = runtime.Register(CreateKqpScanFetcher(snapshot, {compute}, meta, settings, - 0, TMaybe<ui64>(), 0, TMaybe<NKikimrDataEvents::ELockMode>(), shardsScanningPolicy, - MakeIntrusive<NKikimr::NKqp::TKqpCounters>(counters), 0, cpuLimits) - ); - runtime.EnableScheduleForActor(scanFetcher, true); - - // 1. Simulate fail - { - auto event = runtime.GrabEdgeEvent<NKikimr::TEvPipeCache::TEvForward>(TSet<NActors::TActorId>{pipeCache}); - NKikimr::TEvDataShard::TEvKqpScan* evScan = dynamic_cast<NKikimr::TEvDataShard::TEvKqpScan*>(event->Get()->Ev.get()); - UNIT_ASSERT_EQUAL(evScan->Record.GetGeneration(), 1); - } - runtime.Send(scanFetcher, pipeCache, new NKikimr::TEvPipeCache::TEvDeliveryProblem(TABLET_ID, false)); - - // 2. First fail is retried instantly, so fail again - { - auto event = runtime.GrabEdgeEvent<NKikimr::TEvPipeCache::TEvForward>(TSet<NActors::TActorId>{pipeCache}); - NKikimr::TEvDataShard::TEvKqpScan* evScan = dynamic_cast<NKikimr::TEvDataShard::TEvKqpScan*>(event->Get()->Ev.get()); - UNIT_ASSERT_EQUAL(evScan->Record.GetGeneration(), 2); - } - runtime.Send(scanFetcher, pipeCache, new NKikimr::TEvPipeCache::TEvDeliveryProblem(TABLET_ID, false)); - - // 3. Now we have 250ms until next retry, send late reply - runtime.Send(scanFetcher, scan, new NKikimr::NKqp::TEvKqpCompute::TEvScanInitActor(0, scan, 2, TABLET_ID, true)); - - // 4. Check for Fetcher failure - { - auto event = runtime.GrabEdgeEvent<NKikimr::NKqp::NScanPrivate::TEvScanExchange::TEvTerminateFromFetcher>(TSet<NActors::TActorId>{compute}, TDuration::Seconds(1)); - UNIT_ASSERT_C(!event, "Unexpected TEvTerminateFromFetcher"); - } - - // 5. Yet another retry - { - auto event = runtime.GrabEdgeEvent<NKikimr::TEvPipeCache::TEvForward>(TSet<NActors::TActorId>{pipeCache}); - NKikimr::TEvDataShard::TEvKqpScan* evScan = dynamic_cast<NKikimr::TEvDataShard::TEvKqpScan*>(event->Get()->Ev.get()); - UNIT_ASSERT_EQUAL(evScan->Record.GetGeneration(), 3); - } - runtime.Send(scanFetcher, scan, new NKikimr::NKqp::TEvKqpCompute::TEvScanInitActor(0, scan, 3, TABLET_ID, true)); - { - auto event = runtime.GrabEdgeEvent<NKikimr::NKqp::TEvKqpCompute::TEvScanDataAck>(TSet<NActors::TActorId>{scan}); - Y_UNUSED(event); - } - } -} diff --git a/ydb/core/kqp/runtime/ut/ya.make b/ydb/core/kqp/runtime/ut/ya.make index 46f49c4c208..0166c74d7f9 100644 --- a/ydb/core/kqp/runtime/ut/ya.make +++ b/ydb/core/kqp/runtime/ut/ya.make @@ -6,7 +6,7 @@ SIZE(MEDIUM) SRCS( kqp_scan_data_ut.cpp - kqp_scan_fetcher_ut.cpp + scheduler/old/kqp_compute_scheduler_ut.cpp ) @@ -17,9 +17,6 @@ PEERDIR( ydb/core/testlib/basics/pg yql/essentials/minikql/comp_nodes/llvm16 yql/essentials/public/udf/service/exception_policy - yt/yql/providers/yt/codec/codegen - yt/yql/providers/yt/comp_nodes/llvm16 - yt/yql/providers/yt/comp_nodes/dq/llvm16 ) END() |