diff options
author | Iuliia Sidorina <yulia@ydb.tech> | 2024-03-06 12:05:28 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-03-06 12:05:28 +0100 |
commit | 6a40243a1fd7b6b39723f4b66f9e6fe7bd522f3f (patch) | |
tree | c32a999120f5748d7ef087e11407cdb073154a1d | |
parent | aa54674913bc678e0eb785d0728bd28e5f0eb8fd (diff) | |
download | ydb-6a40243a1fd7b6b39723f4b66f9e6fe7bd522f3f.tar.gz |
Revert "fix(kqp): allow inconsistent reads for stream lookup" (#2491)
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_data_executer.cpp | 1 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp | 11 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_tasks_graph.h | 1 | ||||
-rw-r--r-- | ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp | 15 | ||||
-rw-r--r-- | ydb/core/kqp/ut/opt/kqp_ne_ut.cpp | 42 | ||||
-rw-r--r-- | ydb/core/kqp/ut/opt/kqp_sqlin_ut.cpp | 12 | ||||
-rw-r--r-- | ydb/core/protos/kqp.proto | 1 | ||||
-rw-r--r-- | ydb/core/protos/table_service_config.proto | 2 |
8 files changed, 29 insertions, 56 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index fca2d846cb..c448248e65 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -1822,7 +1822,6 @@ private: case NKikimrKqp::ISOLATION_LEVEL_READ_UNCOMMITTED: YQL_ENSURE(ReadOnlyTx); YQL_ENSURE(!VolatileTx); - TasksGraph.GetMeta().AllowInconsistentReads = true; ImmediateTx = true; break; diff --git a/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp b/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp index 3a137ae767..773b366857 100644 --- a/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp +++ b/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp @@ -1104,14 +1104,9 @@ void FillInputDesc(const TKqpTasksGraph& tasksGraph, NYql::NDqProto::TTaskInput& transformProto->SetOutputType(input.Transform->OutputType); if (input.Meta.StreamLookupSettings) { YQL_ENSURE(input.Meta.StreamLookupSettings); - if (snapshot.IsValid()) { - input.Meta.StreamLookupSettings->MutableSnapshot()->SetStep(snapshot.Step); - input.Meta.StreamLookupSettings->MutableSnapshot()->SetTxId(snapshot.TxId); - } else { - YQL_ENSURE(tasksGraph.GetMeta().AllowInconsistentReads, "Expected valid snapshot or enabled inconsistent read mode"); - input.Meta.StreamLookupSettings->SetAllowInconsistentReads(true); - } - + YQL_ENSURE(snapshot.IsValid(), "stream lookup cannot be performed without the snapshot."); + input.Meta.StreamLookupSettings->MutableSnapshot()->SetStep(snapshot.Step); + input.Meta.StreamLookupSettings->MutableSnapshot()->SetTxId(snapshot.TxId); if (lockTxId) { input.Meta.StreamLookupSettings->SetLockTxId(*lockTxId); } diff --git a/ydb/core/kqp/executer_actor/kqp_tasks_graph.h b/ydb/core/kqp/executer_actor/kqp_tasks_graph.h index adcde4d1c0..0e80e71e1d 100644 --- a/ydb/core/kqp/executer_actor/kqp_tasks_graph.h +++ b/ydb/core/kqp/executer_actor/kqp_tasks_graph.h @@ -94,7 +94,6 @@ struct TGraphMeta { std::unordered_map<ui64, TActorId> ResultChannelProxies; TActorId ExecuterId; bool UseFollowers = false; - bool AllowInconsistentReads = false; TIntrusivePtr<TProtoArenaHolder> Arena; TString Database; NKikimrConfig::TTableServiceConfig::EChannelTransportVersion ChannelTransportVersion; diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp index 60fdc99c30..b51d0d3b7f 100644 --- a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp @@ -35,7 +35,6 @@ public: , TypeEnv(args.TypeEnv) , Alloc(args.Alloc) , Snapshot(settings.GetSnapshot().GetStep(), settings.GetSnapshot().GetTxId()) - , AllowInconsistentReads(settings.GetAllowInconsistentReads()) , LockTxId(settings.HasLockTxId() ? settings.GetLockTxId() : TMaybe<ui64>()) , SchemeCacheRequestTimeout(SCHEME_CACHE_REQUEST_TIMEOUT) , StreamLookupWorker(CreateStreamLookupWorker(std::move(settings), args.TypeEnv, args.HolderFactory, args.InputDesc)) @@ -274,10 +273,6 @@ private: Locks.push_back(lock); } - if (!Snapshot.IsValid()) { - Snapshot = IKqpGateway::TKqpSnapshot(record.GetSnapshot().GetStep(), record.GetSnapshot().GetTxId()); - } - Counters->DataShardIteratorMessages->Inc(); if (record.GetStatus().GetCode() != Ydb::StatusIds::SUCCESS) { Counters->DataShardIteratorFails->Inc(); @@ -393,12 +388,9 @@ private: TReadState read(record.GetReadId(), shardId); - if (Snapshot.IsValid()) { - record.MutableSnapshot()->SetStep(Snapshot.Step); - record.MutableSnapshot()->SetTxId(Snapshot.TxId); - } else { - YQL_ENSURE(AllowInconsistentReads, "Expected valid snapshot or enabled inconsistent read mode"); - } + YQL_ENSURE(Snapshot.IsValid(), "Invalid snapshot value"); + record.MutableSnapshot()->SetStep(Snapshot.Step); + record.MutableSnapshot()->SetTxId(Snapshot.TxId); if (LockTxId && BrokenLocks.empty()) { record.SetLockTxId(*LockTxId); @@ -504,7 +496,6 @@ private: const NMiniKQL::TTypeEnvironment& TypeEnv; std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc; IKqpGateway::TKqpSnapshot Snapshot; - const bool AllowInconsistentReads; const TMaybe<ui64> LockTxId; std::unordered_map<ui64, TReadState> Reads; std::unordered_map<ui64, TShardState> ReadsPerShard; diff --git a/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp b/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp index 08a14be8dd..debf78c593 100644 --- a/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp +++ b/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp @@ -2107,38 +2107,20 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) { kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::KQP_EXECUTER, NActors::NLog::PRI_INFO); - { - auto result = session.ExecuteDataQuery(R"( - SELECT Value1, Value2, Key FROM `/Root/TwoShard` WHERE Value2 != 0 ORDER BY Key DESC; - )", TTxControl::BeginTx(TTxSettings::OnlineRO(TTxOnlineSettings().AllowInconsistentReads(true))).CommitTx()) - .ExtractValueSync(); - AssertSuccessResult(result); - - CompareYson(R"( - [ - [["BigThree"];[1];[4000000003u]]; - [["BigOne"];[-1];[4000000001u]]; - [["Three"];[1];[3u]]; - [["One"];[-1];[1u]] - ] - )", FormatResultSetYson(result.GetResultSet(0))); - } - - { // stream lookup query - auto result = session.ExecuteDataQuery(R"( - $list = SELECT Key FROM `/Root/TwoShard`; - SELECT Value, Key FROM `/Root/KeyValue` WHERE Key IN $list ORDER BY Key; - )", TTxControl::BeginTx(TTxSettings::OnlineRO(TTxOnlineSettings().AllowInconsistentReads(true))).CommitTx()) + auto result = session.ExecuteDataQuery(R"( + SELECT Value1, Value2, Key FROM `/Root/TwoShard` WHERE Value2 != 0 ORDER BY Key DESC; + )", TTxControl::BeginTx(TTxSettings::OnlineRO(TTxOnlineSettings().AllowInconsistentReads(true))).CommitTx()) .ExtractValueSync(); - AssertSuccessResult(result); + AssertSuccessResult(result); - CompareYson(R"( - [ - [["One"];[1u]]; - [["Two"];[2u]] - ] - )", FormatResultSetYson(result.GetResultSet(0))); - } + CompareYson(R"( + [ + [["BigThree"];[1];[4000000003u]]; + [["BigOne"];[-1];[4000000001u]]; + [["Three"];[1];[3u]]; + [["One"];[-1];[1u]] + ] + )", FormatResultSetYson(result.GetResultSet(0))); } Y_UNIT_TEST(StaleRO) { diff --git a/ydb/core/kqp/ut/opt/kqp_sqlin_ut.cpp b/ydb/core/kqp/ut/opt/kqp_sqlin_ut.cpp index 15297781bb..dce33af039 100644 --- a/ydb/core/kqp/ut/opt/kqp_sqlin_ut.cpp +++ b/ydb/core/kqp/ut/opt/kqp_sqlin_ut.cpp @@ -1016,7 +1016,11 @@ Y_UNIT_TEST_SUITE(KqpSqlIn) { CompareYson(R"([[[1u];["One"]]])", FormatResultSetYson(result.GetResultSet(0))); const Ydb::TableStats::QueryStats stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); - UNIT_ASSERT_EQUAL_C(2, stats.query_phases_size(), stats.DebugString()) + if (serverSettings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) { + UNIT_ASSERT_EQUAL_C(1, stats.query_phases_size(), stats.DebugString()); + } else { + UNIT_ASSERT_EQUAL_C(2, stats.query_phases_size(), stats.DebugString()); + } } // complex (tuple) key @@ -1054,7 +1058,11 @@ Y_UNIT_TEST_SUITE(KqpSqlIn) { CompareYson(R"([[[3500u];["None"];[1u];["Anna"]]])", FormatResultSetYson(result.GetResultSet(0))); const Ydb::TableStats::QueryStats stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); - UNIT_ASSERT_EQUAL_C(2, stats.query_phases_size(), stats.DebugString()); + if (serverSettings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) { + UNIT_ASSERT_EQUAL_C(1, stats.query_phases_size(), stats.DebugString()); + } else { + UNIT_ASSERT_EQUAL_C(2, stats.query_phases_size(), stats.DebugString()); + } } } } diff --git a/ydb/core/protos/kqp.proto b/ydb/core/protos/kqp.proto index e87afd4a75..a4780aab0e 100644 --- a/ydb/core/protos/kqp.proto +++ b/ydb/core/protos/kqp.proto @@ -669,7 +669,6 @@ message TKqpStreamLookupSettings { optional bool ImmediateTx = 6; repeated string LookupKeyColumns = 7; optional NKqpProto.EStreamLookupStrategy LookupStrategy = 8; - optional bool AllowInconsistentReads = 9 [default = false]; } message TKqpSequencerSettings { diff --git a/ydb/core/protos/table_service_config.proto b/ydb/core/protos/table_service_config.proto index cedb699a45..39e97bf607 100644 --- a/ydb/core/protos/table_service_config.proto +++ b/ydb/core/protos/table_service_config.proto @@ -224,7 +224,7 @@ message TTableServiceConfig { optional uint64 SessionIdleDurationSeconds = 28 [default = 600]; optional TAggregationConfig AggregationConfig = 29; optional bool EnableKqpScanQueryStreamLookup = 30 [default = true]; - optional bool EnableKqpDataQueryStreamLookup = 31 [default = true]; + optional bool EnableKqpDataQueryStreamLookup = 31 [default = false]; optional TExecuterRetriesConfig ExecuterRetriesConfig = 32; reserved 33; // optional bool EnableKqpDataQueryStreamPointLookup = 33 [default = false]; optional bool EnablePublishKqpProxyByRM = 34 [default = true]; |