summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMaxim Yurchuk <[email protected]>2025-06-27 14:33:23 +0000
committerGitHub <[email protected]>2025-06-27 17:33:23 +0300
commit8c006b7875e5653eb78af3e218b3f3d37b6fd277 (patch)
treebc413ffbda11fa93b6df469cdbcb18f1f7179282
parent55572b8efc9c230651c34b260d0e7ec8c43618b3 (diff)
Revert "Correct Scan Generation in case of retries" (#20309)
-rw-r--r--ydb/core/kqp/compute_actor/kqp_scan_compute_manager.cpp1
-rw-r--r--ydb/core/kqp/compute_actor/kqp_scan_compute_manager.h3
-rw-r--r--ydb/core/kqp/runtime/kqp_scan_fetcher_ut.cpp76
-rw-r--r--ydb/core/kqp/runtime/ut/ya.make5
4 files changed, 2 insertions, 83 deletions
diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.cpp b/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.cpp
index a727dac65cf..1b408cdf6f8 100644
--- a/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.cpp
+++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.cpp
@@ -9,7 +9,6 @@ TShardState::TPtr TInFlightShards::Put(TShardState&& state) {
MutableStatistics(state.TabletId).MutableStatistics(0).SetStartInstant(Now());
TShardState::TPtr result = std::make_shared<TShardState>(std::move(state));
- result->Generation = 1;
AFL_ENSURE(Shards.emplace(result->TabletId, result).second)("tablet_id", result->TabletId);
return result;
}
diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.h b/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.h
index 65d8462370d..c90904ab3f5 100644
--- a/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.h
+++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.h
@@ -60,7 +60,7 @@ public:
TShardScannerInfo(const ui64 scanId, TShardState& state, const IExternalObjectsProvider& externalObjectsProvider)
: ScanId(scanId)
, TabletId(state.TabletId)
- , Generation(state.Generation)
+ , Generation(++state.Generation)
{
const bool subscribed = std::exchange(state.SubscribedOnTablet, true);
@@ -392,7 +392,6 @@ public:
state->ActorId = {};
state->State = NComputeActor::EShardState::Initial;
state->SubscribedOnTablet = false;
- state->Generation++;
auto it = ShardScanners.find(tabletId);
AFL_ENSURE(it != ShardScanners.end())("tablet_id", tabletId);
diff --git a/ydb/core/kqp/runtime/kqp_scan_fetcher_ut.cpp b/ydb/core/kqp/runtime/kqp_scan_fetcher_ut.cpp
deleted file mode 100644
index 0a19d4c9424..00000000000
--- a/ydb/core/kqp/runtime/kqp_scan_fetcher_ut.cpp
+++ /dev/null
@@ -1,76 +0,0 @@
-#include <ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.h>
-
-#include <ydb/core/testlib/basics/appdata.h>
-#include <ydb/core/testlib/actors/test_runtime.h>
-
-#include <library/cpp/testing/unittest/registar.h>
-
-Y_UNIT_TEST_SUITE(TKqpScanFetcher) {
-
- Y_UNIT_TEST(ScanDelayedRetry) {
-
- constexpr ui64 TABLET_ID = 1001001;
-
- NActors::TTestActorRuntime runtime;
- runtime.Initialize(NKikimr::TAppPrepare().Unwrap());
-
- auto pipeCache = runtime.AllocateEdgeActor();
- runtime.RegisterService(NKikimr::MakePipePerNodeCacheID(false), pipeCache);
- auto scan = runtime.AllocateEdgeActor();
- auto compute = runtime.AllocateEdgeActor();
-
- NKikimrKqp::TKqpSnapshot snapshot;
- NYql::NDq::TComputeRuntimeSettings settings;
- NKikimrTxDataShard::TKqpTransaction_TScanTaskMeta meta;
- auto& read = *meta.AddReads();
- read.SetShardId(TABLET_ID);
- read.AddKeyRanges();
- NKikimrConfig::TTableServiceConfig::TShardsScanningPolicy protoPolicy;
- NKikimr::NKqp::TShardsScanningPolicy shardsScanningPolicy(protoPolicy);
- NWilson::TTraceId traceId(0);
- NKikimr::NKqp::TCPULimits cpuLimits;
- NMonitoring::TDynamicCounterPtr counters = MakeIntrusive<NMonitoring::TDynamicCounters>();
- auto scanFetcher = runtime.Register(CreateKqpScanFetcher(snapshot, {compute}, meta, settings,
- 0, TMaybe<ui64>(), 0, TMaybe<NKikimrDataEvents::ELockMode>(), shardsScanningPolicy,
- MakeIntrusive<NKikimr::NKqp::TKqpCounters>(counters), 0, cpuLimits)
- );
- runtime.EnableScheduleForActor(scanFetcher, true);
-
- // 1. Simulate fail
- {
- auto event = runtime.GrabEdgeEvent<NKikimr::TEvPipeCache::TEvForward>(TSet<NActors::TActorId>{pipeCache});
- NKikimr::TEvDataShard::TEvKqpScan* evScan = dynamic_cast<NKikimr::TEvDataShard::TEvKqpScan*>(event->Get()->Ev.get());
- UNIT_ASSERT_EQUAL(evScan->Record.GetGeneration(), 1);
- }
- runtime.Send(scanFetcher, pipeCache, new NKikimr::TEvPipeCache::TEvDeliveryProblem(TABLET_ID, false));
-
- // 2. First fail is retried instantly, so fail again
- {
- auto event = runtime.GrabEdgeEvent<NKikimr::TEvPipeCache::TEvForward>(TSet<NActors::TActorId>{pipeCache});
- NKikimr::TEvDataShard::TEvKqpScan* evScan = dynamic_cast<NKikimr::TEvDataShard::TEvKqpScan*>(event->Get()->Ev.get());
- UNIT_ASSERT_EQUAL(evScan->Record.GetGeneration(), 2);
- }
- runtime.Send(scanFetcher, pipeCache, new NKikimr::TEvPipeCache::TEvDeliveryProblem(TABLET_ID, false));
-
- // 3. Now we have 250ms until next retry, send late reply
- runtime.Send(scanFetcher, scan, new NKikimr::NKqp::TEvKqpCompute::TEvScanInitActor(0, scan, 2, TABLET_ID, true));
-
- // 4. Check for Fetcher failure
- {
- auto event = runtime.GrabEdgeEvent<NKikimr::NKqp::NScanPrivate::TEvScanExchange::TEvTerminateFromFetcher>(TSet<NActors::TActorId>{compute}, TDuration::Seconds(1));
- UNIT_ASSERT_C(!event, "Unexpected TEvTerminateFromFetcher");
- }
-
- // 5. Yet another retry
- {
- auto event = runtime.GrabEdgeEvent<NKikimr::TEvPipeCache::TEvForward>(TSet<NActors::TActorId>{pipeCache});
- NKikimr::TEvDataShard::TEvKqpScan* evScan = dynamic_cast<NKikimr::TEvDataShard::TEvKqpScan*>(event->Get()->Ev.get());
- UNIT_ASSERT_EQUAL(evScan->Record.GetGeneration(), 3);
- }
- runtime.Send(scanFetcher, scan, new NKikimr::NKqp::TEvKqpCompute::TEvScanInitActor(0, scan, 3, TABLET_ID, true));
- {
- auto event = runtime.GrabEdgeEvent<NKikimr::NKqp::TEvKqpCompute::TEvScanDataAck>(TSet<NActors::TActorId>{scan});
- Y_UNUSED(event);
- }
- }
-}
diff --git a/ydb/core/kqp/runtime/ut/ya.make b/ydb/core/kqp/runtime/ut/ya.make
index 46f49c4c208..0166c74d7f9 100644
--- a/ydb/core/kqp/runtime/ut/ya.make
+++ b/ydb/core/kqp/runtime/ut/ya.make
@@ -6,7 +6,7 @@ SIZE(MEDIUM)
SRCS(
kqp_scan_data_ut.cpp
- kqp_scan_fetcher_ut.cpp
+
scheduler/old/kqp_compute_scheduler_ut.cpp
)
@@ -17,9 +17,6 @@ PEERDIR(
ydb/core/testlib/basics/pg
yql/essentials/minikql/comp_nodes/llvm16
yql/essentials/public/udf/service/exception_policy
- yt/yql/providers/yt/codec/codegen
- yt/yql/providers/yt/comp_nodes/llvm16
- yt/yql/providers/yt/comp_nodes/dq/llvm16
)
END()