aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorIuliia Sidorina <yulia@ydb.tech>2024-03-06 12:05:28 +0100
committerGitHub <noreply@github.com>2024-03-06 12:05:28 +0100
commit6a40243a1fd7b6b39723f4b66f9e6fe7bd522f3f (patch)
treec32a999120f5748d7ef087e11407cdb073154a1d
parentaa54674913bc678e0eb785d0728bd28e5f0eb8fd (diff)
downloadydb-6a40243a1fd7b6b39723f4b66f9e6fe7bd522f3f.tar.gz
Revert "fix(kqp): allow inconsistent reads for stream lookup" (#2491)
-rw-r--r--ydb/core/kqp/executer_actor/kqp_data_executer.cpp1
-rw-r--r--ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp11
-rw-r--r--ydb/core/kqp/executer_actor/kqp_tasks_graph.h1
-rw-r--r--ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp15
-rw-r--r--ydb/core/kqp/ut/opt/kqp_ne_ut.cpp42
-rw-r--r--ydb/core/kqp/ut/opt/kqp_sqlin_ut.cpp12
-rw-r--r--ydb/core/protos/kqp.proto1
-rw-r--r--ydb/core/protos/table_service_config.proto2
8 files changed, 29 insertions, 56 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
index fca2d846cb..c448248e65 100644
--- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
@@ -1822,7 +1822,6 @@ private:
case NKikimrKqp::ISOLATION_LEVEL_READ_UNCOMMITTED:
YQL_ENSURE(ReadOnlyTx);
YQL_ENSURE(!VolatileTx);
- TasksGraph.GetMeta().AllowInconsistentReads = true;
ImmediateTx = true;
break;
diff --git a/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp b/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp
index 3a137ae767..773b366857 100644
--- a/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp
@@ -1104,14 +1104,9 @@ void FillInputDesc(const TKqpTasksGraph& tasksGraph, NYql::NDqProto::TTaskInput&
transformProto->SetOutputType(input.Transform->OutputType);
if (input.Meta.StreamLookupSettings) {
YQL_ENSURE(input.Meta.StreamLookupSettings);
- if (snapshot.IsValid()) {
- input.Meta.StreamLookupSettings->MutableSnapshot()->SetStep(snapshot.Step);
- input.Meta.StreamLookupSettings->MutableSnapshot()->SetTxId(snapshot.TxId);
- } else {
- YQL_ENSURE(tasksGraph.GetMeta().AllowInconsistentReads, "Expected valid snapshot or enabled inconsistent read mode");
- input.Meta.StreamLookupSettings->SetAllowInconsistentReads(true);
- }
-
+ YQL_ENSURE(snapshot.IsValid(), "stream lookup cannot be performed without the snapshot.");
+ input.Meta.StreamLookupSettings->MutableSnapshot()->SetStep(snapshot.Step);
+ input.Meta.StreamLookupSettings->MutableSnapshot()->SetTxId(snapshot.TxId);
if (lockTxId) {
input.Meta.StreamLookupSettings->SetLockTxId(*lockTxId);
}
diff --git a/ydb/core/kqp/executer_actor/kqp_tasks_graph.h b/ydb/core/kqp/executer_actor/kqp_tasks_graph.h
index adcde4d1c0..0e80e71e1d 100644
--- a/ydb/core/kqp/executer_actor/kqp_tasks_graph.h
+++ b/ydb/core/kqp/executer_actor/kqp_tasks_graph.h
@@ -94,7 +94,6 @@ struct TGraphMeta {
std::unordered_map<ui64, TActorId> ResultChannelProxies;
TActorId ExecuterId;
bool UseFollowers = false;
- bool AllowInconsistentReads = false;
TIntrusivePtr<TProtoArenaHolder> Arena;
TString Database;
NKikimrConfig::TTableServiceConfig::EChannelTransportVersion ChannelTransportVersion;
diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
index 60fdc99c30..b51d0d3b7f 100644
--- a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
+++ b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
@@ -35,7 +35,6 @@ public:
, TypeEnv(args.TypeEnv)
, Alloc(args.Alloc)
, Snapshot(settings.GetSnapshot().GetStep(), settings.GetSnapshot().GetTxId())
- , AllowInconsistentReads(settings.GetAllowInconsistentReads())
, LockTxId(settings.HasLockTxId() ? settings.GetLockTxId() : TMaybe<ui64>())
, SchemeCacheRequestTimeout(SCHEME_CACHE_REQUEST_TIMEOUT)
, StreamLookupWorker(CreateStreamLookupWorker(std::move(settings), args.TypeEnv, args.HolderFactory, args.InputDesc))
@@ -274,10 +273,6 @@ private:
Locks.push_back(lock);
}
- if (!Snapshot.IsValid()) {
- Snapshot = IKqpGateway::TKqpSnapshot(record.GetSnapshot().GetStep(), record.GetSnapshot().GetTxId());
- }
-
Counters->DataShardIteratorMessages->Inc();
if (record.GetStatus().GetCode() != Ydb::StatusIds::SUCCESS) {
Counters->DataShardIteratorFails->Inc();
@@ -393,12 +388,9 @@ private:
TReadState read(record.GetReadId(), shardId);
- if (Snapshot.IsValid()) {
- record.MutableSnapshot()->SetStep(Snapshot.Step);
- record.MutableSnapshot()->SetTxId(Snapshot.TxId);
- } else {
- YQL_ENSURE(AllowInconsistentReads, "Expected valid snapshot or enabled inconsistent read mode");
- }
+ YQL_ENSURE(Snapshot.IsValid(), "Invalid snapshot value");
+ record.MutableSnapshot()->SetStep(Snapshot.Step);
+ record.MutableSnapshot()->SetTxId(Snapshot.TxId);
if (LockTxId && BrokenLocks.empty()) {
record.SetLockTxId(*LockTxId);
@@ -504,7 +496,6 @@ private:
const NMiniKQL::TTypeEnvironment& TypeEnv;
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc;
IKqpGateway::TKqpSnapshot Snapshot;
- const bool AllowInconsistentReads;
const TMaybe<ui64> LockTxId;
std::unordered_map<ui64, TReadState> Reads;
std::unordered_map<ui64, TShardState> ReadsPerShard;
diff --git a/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp b/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp
index 08a14be8dd..debf78c593 100644
--- a/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp
+++ b/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp
@@ -2107,38 +2107,20 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {
kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::KQP_EXECUTER, NActors::NLog::PRI_INFO);
- {
- auto result = session.ExecuteDataQuery(R"(
- SELECT Value1, Value2, Key FROM `/Root/TwoShard` WHERE Value2 != 0 ORDER BY Key DESC;
- )", TTxControl::BeginTx(TTxSettings::OnlineRO(TTxOnlineSettings().AllowInconsistentReads(true))).CommitTx())
- .ExtractValueSync();
- AssertSuccessResult(result);
-
- CompareYson(R"(
- [
- [["BigThree"];[1];[4000000003u]];
- [["BigOne"];[-1];[4000000001u]];
- [["Three"];[1];[3u]];
- [["One"];[-1];[1u]]
- ]
- )", FormatResultSetYson(result.GetResultSet(0)));
- }
-
- { // stream lookup query
- auto result = session.ExecuteDataQuery(R"(
- $list = SELECT Key FROM `/Root/TwoShard`;
- SELECT Value, Key FROM `/Root/KeyValue` WHERE Key IN $list ORDER BY Key;
- )", TTxControl::BeginTx(TTxSettings::OnlineRO(TTxOnlineSettings().AllowInconsistentReads(true))).CommitTx())
+ auto result = session.ExecuteDataQuery(R"(
+ SELECT Value1, Value2, Key FROM `/Root/TwoShard` WHERE Value2 != 0 ORDER BY Key DESC;
+ )", TTxControl::BeginTx(TTxSettings::OnlineRO(TTxOnlineSettings().AllowInconsistentReads(true))).CommitTx())
.ExtractValueSync();
- AssertSuccessResult(result);
+ AssertSuccessResult(result);
- CompareYson(R"(
- [
- [["One"];[1u]];
- [["Two"];[2u]]
- ]
- )", FormatResultSetYson(result.GetResultSet(0)));
- }
+ CompareYson(R"(
+ [
+ [["BigThree"];[1];[4000000003u]];
+ [["BigOne"];[-1];[4000000001u]];
+ [["Three"];[1];[3u]];
+ [["One"];[-1];[1u]]
+ ]
+ )", FormatResultSetYson(result.GetResultSet(0)));
}
Y_UNIT_TEST(StaleRO) {
diff --git a/ydb/core/kqp/ut/opt/kqp_sqlin_ut.cpp b/ydb/core/kqp/ut/opt/kqp_sqlin_ut.cpp
index 15297781bb..dce33af039 100644
--- a/ydb/core/kqp/ut/opt/kqp_sqlin_ut.cpp
+++ b/ydb/core/kqp/ut/opt/kqp_sqlin_ut.cpp
@@ -1016,7 +1016,11 @@ Y_UNIT_TEST_SUITE(KqpSqlIn) {
CompareYson(R"([[[1u];["One"]]])", FormatResultSetYson(result.GetResultSet(0)));
const Ydb::TableStats::QueryStats stats = NYdb::TProtoAccessor::GetProto(*result.GetStats());
- UNIT_ASSERT_EQUAL_C(2, stats.query_phases_size(), stats.DebugString())
+ if (serverSettings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) {
+ UNIT_ASSERT_EQUAL_C(1, stats.query_phases_size(), stats.DebugString());
+ } else {
+ UNIT_ASSERT_EQUAL_C(2, stats.query_phases_size(), stats.DebugString());
+ }
}
// complex (tuple) key
@@ -1054,7 +1058,11 @@ Y_UNIT_TEST_SUITE(KqpSqlIn) {
CompareYson(R"([[[3500u];["None"];[1u];["Anna"]]])", FormatResultSetYson(result.GetResultSet(0)));
const Ydb::TableStats::QueryStats stats = NYdb::TProtoAccessor::GetProto(*result.GetStats());
- UNIT_ASSERT_EQUAL_C(2, stats.query_phases_size(), stats.DebugString());
+ if (serverSettings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) {
+ UNIT_ASSERT_EQUAL_C(1, stats.query_phases_size(), stats.DebugString());
+ } else {
+ UNIT_ASSERT_EQUAL_C(2, stats.query_phases_size(), stats.DebugString());
+ }
}
}
}
diff --git a/ydb/core/protos/kqp.proto b/ydb/core/protos/kqp.proto
index e87afd4a75..a4780aab0e 100644
--- a/ydb/core/protos/kqp.proto
+++ b/ydb/core/protos/kqp.proto
@@ -669,7 +669,6 @@ message TKqpStreamLookupSettings {
optional bool ImmediateTx = 6;
repeated string LookupKeyColumns = 7;
optional NKqpProto.EStreamLookupStrategy LookupStrategy = 8;
- optional bool AllowInconsistentReads = 9 [default = false];
}
message TKqpSequencerSettings {
diff --git a/ydb/core/protos/table_service_config.proto b/ydb/core/protos/table_service_config.proto
index cedb699a45..39e97bf607 100644
--- a/ydb/core/protos/table_service_config.proto
+++ b/ydb/core/protos/table_service_config.proto
@@ -224,7 +224,7 @@ message TTableServiceConfig {
optional uint64 SessionIdleDurationSeconds = 28 [default = 600];
optional TAggregationConfig AggregationConfig = 29;
optional bool EnableKqpScanQueryStreamLookup = 30 [default = true];
- optional bool EnableKqpDataQueryStreamLookup = 31 [default = true];
+ optional bool EnableKqpDataQueryStreamLookup = 31 [default = false];
optional TExecuterRetriesConfig ExecuterRetriesConfig = 32;
reserved 33; // optional bool EnableKqpDataQueryStreamPointLookup = 33 [default = false];
optional bool EnablePublishKqpProxyByRM = 34 [default = true];