diff options
author | ssmike <ssmike@ydb.tech> | 2023-02-21 13:01:16 +0300 |
---|---|---|
committer | ssmike <ssmike@ydb.tech> | 2023-02-21 13:01:16 +0300 |
commit | 73db9d4327114063b8d9536190f4e21dd160c589 (patch) | |
tree | c8b8b326629c5e2d44837be4e2e73ec6cb91f0dd | |
parent | 61c8c2ee593742731a92501cace781de4d0106ee (diff) | |
download | ydb-73db9d4327114063b8d9536190f4e21dd160c589.tar.gz |
fix limits logic
-rw-r--r-- | ydb/core/kqp/runtime/kqp_read_actor.cpp | 52 | ||||
-rw-r--r-- | ydb/core/kqp/ut/opt/kqp_ne_ut.cpp | 69 |
2 files changed, 107 insertions, 14 deletions
diff --git a/ydb/core/kqp/runtime/kqp_read_actor.cpp b/ydb/core/kqp/runtime/kqp_read_actor.cpp index 2700b9d625..b747c796ee 100644 --- a/ydb/core/kqp/runtime/kqp_read_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_read_actor.cpp @@ -369,6 +369,7 @@ public: StartTableScan(); } Become(&TKqpReadActor::ReadyState); + Bootstrapped = true; } bool StartTableScan() { @@ -625,7 +626,7 @@ public: void StartRead(TShardState* state) { TMaybe<ui64> limit; if (Settings.GetItemsLimit()) { - limit = Settings.GetItemsLimit() - Min(Settings.GetItemsLimit(), RecievedRowCount); + limit = Settings.GetItemsLimit() - Min(Settings.GetItemsLimit(), ReceivedRowCount); if (*limit == 0) { return; @@ -724,7 +725,7 @@ public: Reads[id].RegisterMessage(*ev->Get()); - RecievedRowCount += ev->Get()->GetRowsCount(); + ReceivedRowCount += ev->Get()->GetRowsCount(); Results.push({Reads[id].Shard->TabletId, THolder<TEventHandle<TEvDataShard::TEvReadResult>>(ev.Release())}); CA_LOG_D(TStringBuilder() << "new data for read #" << id << " pushed"); Send(ComputeActorId, new TEvNewAsyncInputDataArrived(InputIndex)); @@ -843,12 +844,17 @@ public: return stats; } + bool LimitReached() const { + return Settings.GetItemsLimit() && ProcessedRowCount >= Settings.GetItemsLimit(); + } + i64 GetAsyncInputData( NKikimr::NMiniKQL::TUnboxedValueVector& resultVector, TMaybe<TInstant>&, bool& finished, i64 freeSpace) override { + CA_LOG_D(TStringBuilder() << " enter getasyncinputdata results size " << Results.size()); ui64 bytes = 0; while (!Results.empty()) { auto& result = Results.front(); @@ -886,17 +892,14 @@ public: return bytes; } } - CA_LOG_D(TStringBuilder() << "returned " << resultVector.size() << " rows"); + CA_LOG_D(TStringBuilder() << "returned " << resultVector.size() << " rows; processed " << ProcessedRowCount << " rows"); if (batch->size() == result.ProcessedRows) { auto& record = msg.Record; - if (Reads[id].IsLastMessage(msg)) { - Reads[id].Reset(); - ResetReads++; - } else if (!Reads[id].Finished) { + if (!Reads[id].Finished) { TMaybe<ui64> limit; if (Settings.GetItemsLimit()) { - limit = Settings.GetItemsLimit() - Min(Settings.GetItemsLimit(), RecievedRowCount); + limit = Settings.GetItemsLimit() - Min(Settings.GetItemsLimit(), ReceivedRowCount); } if (!limit || *limit > 0) { @@ -909,20 +912,26 @@ public: Send(MakePipePeNodeCacheID(false), new TEvPipeCache::TEvForward(request.Release(), state->TabletId, true), IEventHandle::FlagTrackDelivery); } else { + Reads[id].Finished = true; + } + } + + if (Reads[id].IsLastMessage(msg)) { + if (!record.GetFinished()) { auto cancel = MakeHolder<TEvDataShard::TEvReadCancel>(); cancel->Record.SetReadId(id); Send(MakePipePeNodeCacheID(false), new TEvPipeCache::TEvForward(cancel.Release(), state->TabletId), IEventHandle::FlagTrackDelivery); - Reads[id].Reset(); - ResetReads++; } + Reads[id].Reset(); + ResetReads++; } StartTableScan(); Results.pop(); - CA_LOG_D("dropping batch"); + CA_LOG_D("dropping batch for read #" << id); - if (RunningReads() == 0 || (Settings.GetItemsLimit() && ProcessedRowCount >= Settings.GetItemsLimit())) { + if (LimitReached()) { finished = true; break; } @@ -931,6 +940,19 @@ public: } } + if (RunningReads() == 0 && PendingShards.Empty() && Bootstrapped) { + finished = true; + } + + CA_LOG_D(TStringBuilder() << "returned async data" + << " processed rows " << ProcessedRowCount + << " received rows " << ReceivedRowCount + << " running reads " << RunningReads() + << " pending shards " << PendingShards.Size() + << " finished = " << finished + << " has limit " << (Settings.GetItemsLimit() != 0) + << " limit reached " << LimitReached()); + return bytes; } @@ -950,7 +972,7 @@ public: } //FIXME: use evread statistics after KIKIMR-16924 - tableStats->SetReadRows(tableStats->GetReadRows() + RecievedRowCount); + tableStats->SetReadRows(tableStats->GetReadRows() + ReceivedRowCount); tableStats->SetReadBytes(tableStats->GetReadBytes() + BytesStats.DataBytes); tableStats->SetAffectedPartitions(tableStats->GetAffectedPartitions() + InFlightShards.Size()); } @@ -1010,7 +1032,7 @@ private: TVector<NScheme::TTypeInfo> KeyColumnTypes; NMiniKQL::TBytesStatistics BytesStats; - ui64 RecievedRowCount = 0; + ui64 ReceivedRowCount = 0; ui64 ProcessedRowCount = 0; ui64 ResetReads = 0; ui64 ReadId = 0; @@ -1034,6 +1056,8 @@ private: TString LogPrefix; TTableId TableId; + bool Bootstrapped = false; + const TActorId ComputeActorId; const ui64 InputIndex; const NMiniKQL::TTypeEnvironment& TypeEnv; diff --git a/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp b/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp index bc650bb730..276dc9341c 100644 --- a/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp +++ b/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp @@ -3575,6 +3575,75 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) { } } + Y_UNIT_TEST(DqSourceLimit) { + TKikimrSettings settings; + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(true); + settings.SetDomainRoot(KikimrDefaultUtDomainRoot); + TFeatureFlags flags; + flags.SetEnablePredicateExtractForDataQueries(true); + settings.SetFeatureFlags(flags); + settings.SetAppConfig(appConfig); + + TKikimrRunner kikimr(settings); + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + + AssertSuccessResult(session.ExecuteSchemeQuery(R"( + --!syntax_v1 + CREATE TABLE `KeyValueLimit` ( + Key Uint64, + Value String, + PRIMARY KEY (Key) + ); + )").GetValueSync()); + + AssertSuccessResult(session.ExecuteDataQuery(R"( + + REPLACE INTO `KeyValueLimit` (Key, Value) VALUES + (101u, "Value1"), + (102u, "Value2"), + (103u, "Value3"), + (201u, "Value1"), + (202u, "Value2"), + (203u, "Value3"), + (301u, "Value1"), + (302u, "Value2"), + (303u, "Value3"), + (401u, "Value1"), + (402u, "Value2"), + (403u, "Value3"), + (501u, "Value1"), + (502u, "Value2"), + (503u, "Value3"), + (601u, "Value1"), + (602u, "Value2"), + (603u, "Value3"), + (701u, "Value1"), + (702u, "Value2"), + (703u, "Value3"), + (801u, "Value1"), + (802u, "Value2"), + (803u, "Value3"); + + )", TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()).GetValueSync()); + + NKikimrTxDataShard::TEvRead evread; + evread.SetMaxRowsInResult(2); + InjectRangeEvReadSettings(evread); + + NKikimrTxDataShard::TEvReadAck evreadack; + InjectRangeEvReadAckSettings(evreadack); + + { + auto result = session.ExecuteDataQuery(R"( + SELECT Key, Value FROM `/Root/KeyValueLimit` WHERE Key >= 202 ORDER BY Key LIMIT 5; + )", TTxControl::BeginTx().CommitTx()).GetValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + CompareYson(R"([[[202u];["Value2"]];[[203u];["Value3"]];[[301u];["Value1"]];[[302u];["Value2"]];[[303u];["Value3"]]])", FormatResultSetYson(result.GetResultSet(0))); + } + } + Y_UNIT_TEST(DqSourceLocksEffects) { TKikimrSettings settings; NKikimrConfig::TAppConfig appConfig; |