aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorssmike <ssmike@ydb.tech>2023-11-14 02:49:56 +0300
committerssmike <ssmike@ydb.tech>2023-11-14 03:08:16 +0300
commitd09caf24ecc18787e322a4ceaad53659219d50ed (patch)
tree87b3118b3cb80600437bc60fc618843981048dca
parent669ccf043f259665d921b6e64b6f81d0c8946de0 (diff)
downloadydb-d09caf24ecc18787e322a4ceaad53659219d50ed.tar.gz
Fix sequential online reads
KIKIMR-20043
-rw-r--r--ydb/core/kqp/executer_actor/kqp_executer_impl.h5
-rw-r--r--ydb/core/kqp/runtime/kqp_read_actor.cpp13
-rw-r--r--ydb/core/kqp/ut/opt/kqp_ne_ut.cpp10
-rw-r--r--ydb/core/protos/tx_datashard.proto1
4 files changed, 23 insertions, 6 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
index 635891c0e8..786525a941 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h
+++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
@@ -894,6 +894,11 @@ protected:
settings->MutableSnapshot()->SetTxId(snapshot.TxId);
}
+
+ if (Request.IsolationLevel == NKikimrKqp::ISOLATION_LEVEL_READ_UNCOMMITTED) {
+ settings->SetAllowInconsistentReads(true);
+ }
+
shardInfo.KeyReadRanges->SerializeTo(settings);
settings->SetReverse(source.GetReverse());
settings->SetSorted(source.GetSorted());
diff --git a/ydb/core/kqp/runtime/kqp_read_actor.cpp b/ydb/core/kqp/runtime/kqp_read_actor.cpp
index 3080881079..99aa786772 100644
--- a/ydb/core/kqp/runtime/kqp_read_actor.cpp
+++ b/ydb/core/kqp/runtime/kqp_read_actor.cpp
@@ -159,9 +159,10 @@ public:
} else {
if (reverse) {
if (LastKey.empty()) {
+ size_t last = FirstUnprocessedRequest.GetOrElse(Ranges.size() - 1);
return TTableRange(
Ranges.front().From.GetCells(), Ranges.front().FromInclusive,
- Ranges[FirstUnprocessedRequest.GetOrElse(Ranges.size() - 1)].To.GetCells(), Ranges.back().ToInclusive);
+ Ranges[last].To.GetCells(), Ranges[last].ToInclusive);
} else {
return TTableRange(
Ranges.front().From.GetCells(), Ranges.front().FromInclusive,
@@ -169,8 +170,9 @@ public:
}
} else {
if (LastKey.empty()) {
+ size_t first = FirstUnprocessedRequest.GetOrElse(0);
return TTableRange(
- Ranges[FirstUnprocessedRequest.GetOrElse(0)].From.GetCells(), Ranges.front().FromInclusive,
+ Ranges[first].From.GetCells(), Ranges[first].FromInclusive,
Ranges.back().To.GetCells(), Ranges.back().ToInclusive);
} else {
return TTableRange(
@@ -410,6 +412,8 @@ public:
Settings->GetTable().GetTableId().GetSchemaVersion()
);
+ Snapshot = IKqpGateway::TKqpSnapshot(Settings->GetSnapshot().GetStep(), Settings->GetSnapshot().GetTxId());
+
if (Settings->GetUseFollowers() && !Snapshot.IsValid()) {
// reading from followers is allowed only of snapshot is not specified and
// specific flag is set. otherwise we always read from main replicas.
@@ -431,7 +435,6 @@ public:
) : nullptr));
}
Counters->ReadActorsCount->Inc();
- Snapshot = IKqpGateway::TKqpSnapshot(Settings->GetSnapshot().GetStep(), Settings->GetSnapshot().GetTxId());
if (Settings->HasMaxInFlightShards()) {
MaxInFlight = Settings->GetMaxInFlightShards();
@@ -634,8 +637,8 @@ public:
PendingShards.PushBack(state.Release());
return;
}
- } else if (!Snapshot.IsValid()) {
- return RuntimeError("inconsistent reads after shards split", NDqProto::StatusIds::UNAVAILABLE);
+ } else if (!Snapshot.IsValid() && !Settings->GetAllowInconsistentReads()) {
+ return RuntimeError("Inconsistent reads after shards split", NDqProto::StatusIds::UNAVAILABLE);
}
if (keyDesc->GetPartitions().empty()) {
diff --git a/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp b/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp
index 421bd3aa41..4eff74a2de 100644
--- a/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp
+++ b/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp
@@ -3739,7 +3739,6 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {
appConfig.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(true);
appConfig.MutableTableServiceConfig()->SetEnablePredicateExtractForDataQueries(true);
appConfig.MutableTableServiceConfig()->SetEnableSequentialReads(true);
- settings.SetDomainRoot(KikimrDefaultUtDomainRoot);
settings.SetAppConfig(appConfig);
TKikimrRunner kikimr(settings);
@@ -3760,6 +3759,15 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
CompareYson(R"([[[101u];[1]]])", FormatResultSetYson(result.GetResultSet(0)));
}
+
+ {
+ auto result = session.ExecuteDataQuery(R"(
+ SELECT Key, Data FROM `/Root/EightShard` ORDER BY Key LIMIT 1;
+ )", TTxControl::BeginTx(TTxSettings::OnlineRO(TTxOnlineSettings().AllowInconsistentReads(true))).CommitTx()).GetValueSync();
+ UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
+ CompareYson(R"([[[101u];[1]]])", FormatResultSetYson(result.GetResultSet(0)));
+ }
+
}
Y_UNIT_TEST(DqSourceLocksEffects) {
diff --git a/ydb/core/protos/tx_datashard.proto b/ydb/core/protos/tx_datashard.proto
index b672c1086e..8d1ad559b2 100644
--- a/ydb/core/protos/tx_datashard.proto
+++ b/ydb/core/protos/tx_datashard.proto
@@ -280,6 +280,7 @@ message TKqpReadRangesSourceSettings {
optional uint64 MaxInFlightShards = 16;
optional bool UseFollowers = 17 [default = false];
+ optional bool AllowInconsistentReads = 18 [default = false];
}
message TKqpTaskInfo {