diff options
author | ssmike <ssmike@ydb.tech> | 2023-11-14 02:49:56 +0300 |
---|---|---|
committer | ssmike <ssmike@ydb.tech> | 2023-11-14 03:08:16 +0300 |
commit | d09caf24ecc18787e322a4ceaad53659219d50ed (patch) | |
tree | 87b3118b3cb80600437bc60fc618843981048dca | |
parent | 669ccf043f259665d921b6e64b6f81d0c8946de0 (diff) | |
download | ydb-d09caf24ecc18787e322a4ceaad53659219d50ed.tar.gz |
Fix sequential online reads
KIKIMR-20043
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_executer_impl.h | 5 | ||||
-rw-r--r-- | ydb/core/kqp/runtime/kqp_read_actor.cpp | 13 | ||||
-rw-r--r-- | ydb/core/kqp/ut/opt/kqp_ne_ut.cpp | 10 | ||||
-rw-r--r-- | ydb/core/protos/tx_datashard.proto | 1 |
4 files changed, 23 insertions, 6 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h index 635891c0e8..786525a941 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h @@ -894,6 +894,11 @@ protected: settings->MutableSnapshot()->SetTxId(snapshot.TxId); } + + if (Request.IsolationLevel == NKikimrKqp::ISOLATION_LEVEL_READ_UNCOMMITTED) { + settings->SetAllowInconsistentReads(true); + } + shardInfo.KeyReadRanges->SerializeTo(settings); settings->SetReverse(source.GetReverse()); settings->SetSorted(source.GetSorted()); diff --git a/ydb/core/kqp/runtime/kqp_read_actor.cpp b/ydb/core/kqp/runtime/kqp_read_actor.cpp index 3080881079..99aa786772 100644 --- a/ydb/core/kqp/runtime/kqp_read_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_read_actor.cpp @@ -159,9 +159,10 @@ public: } else { if (reverse) { if (LastKey.empty()) { + size_t last = FirstUnprocessedRequest.GetOrElse(Ranges.size() - 1); return TTableRange( Ranges.front().From.GetCells(), Ranges.front().FromInclusive, - Ranges[FirstUnprocessedRequest.GetOrElse(Ranges.size() - 1)].To.GetCells(), Ranges.back().ToInclusive); + Ranges[last].To.GetCells(), Ranges[last].ToInclusive); } else { return TTableRange( Ranges.front().From.GetCells(), Ranges.front().FromInclusive, @@ -169,8 +170,9 @@ public: } } else { if (LastKey.empty()) { + size_t first = FirstUnprocessedRequest.GetOrElse(0); return TTableRange( - Ranges[FirstUnprocessedRequest.GetOrElse(0)].From.GetCells(), Ranges.front().FromInclusive, + Ranges[first].From.GetCells(), Ranges[first].FromInclusive, Ranges.back().To.GetCells(), Ranges.back().ToInclusive); } else { return TTableRange( @@ -410,6 +412,8 @@ public: Settings->GetTable().GetTableId().GetSchemaVersion() ); + Snapshot = IKqpGateway::TKqpSnapshot(Settings->GetSnapshot().GetStep(), Settings->GetSnapshot().GetTxId()); + if (Settings->GetUseFollowers() && !Snapshot.IsValid()) { // reading from followers is allowed only of snapshot is not specified and // specific flag is set. otherwise we always read from main replicas. @@ -431,7 +435,6 @@ public: ) : nullptr)); } Counters->ReadActorsCount->Inc(); - Snapshot = IKqpGateway::TKqpSnapshot(Settings->GetSnapshot().GetStep(), Settings->GetSnapshot().GetTxId()); if (Settings->HasMaxInFlightShards()) { MaxInFlight = Settings->GetMaxInFlightShards(); @@ -634,8 +637,8 @@ public: PendingShards.PushBack(state.Release()); return; } - } else if (!Snapshot.IsValid()) { - return RuntimeError("inconsistent reads after shards split", NDqProto::StatusIds::UNAVAILABLE); + } else if (!Snapshot.IsValid() && !Settings->GetAllowInconsistentReads()) { + return RuntimeError("Inconsistent reads after shards split", NDqProto::StatusIds::UNAVAILABLE); } if (keyDesc->GetPartitions().empty()) { diff --git a/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp b/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp index 421bd3aa41..4eff74a2de 100644 --- a/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp +++ b/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp @@ -3739,7 +3739,6 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) { appConfig.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(true); appConfig.MutableTableServiceConfig()->SetEnablePredicateExtractForDataQueries(true); appConfig.MutableTableServiceConfig()->SetEnableSequentialReads(true); - settings.SetDomainRoot(KikimrDefaultUtDomainRoot); settings.SetAppConfig(appConfig); TKikimrRunner kikimr(settings); @@ -3760,6 +3759,15 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) { UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); CompareYson(R"([[[101u];[1]]])", FormatResultSetYson(result.GetResultSet(0))); } + + { + auto result = session.ExecuteDataQuery(R"( + SELECT Key, Data FROM `/Root/EightShard` ORDER BY Key LIMIT 1; + )", TTxControl::BeginTx(TTxSettings::OnlineRO(TTxOnlineSettings().AllowInconsistentReads(true))).CommitTx()).GetValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + CompareYson(R"([[[101u];[1]]])", FormatResultSetYson(result.GetResultSet(0))); + } + } Y_UNIT_TEST(DqSourceLocksEffects) { diff --git a/ydb/core/protos/tx_datashard.proto b/ydb/core/protos/tx_datashard.proto index b672c1086e..8d1ad559b2 100644 --- a/ydb/core/protos/tx_datashard.proto +++ b/ydb/core/protos/tx_datashard.proto @@ -280,6 +280,7 @@ message TKqpReadRangesSourceSettings { optional uint64 MaxInFlightShards = 16; optional bool UseFollowers = 17 [default = false]; + optional bool AllowInconsistentReads = 18 [default = false]; } message TKqpTaskInfo { |