aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorssmike <ssmike@ydb.tech>2023-02-21 13:01:16 +0300
committerssmike <ssmike@ydb.tech>2023-02-21 13:01:16 +0300
commit73db9d4327114063b8d9536190f4e21dd160c589 (patch)
treec8b8b326629c5e2d44837be4e2e73ec6cb91f0dd
parent61c8c2ee593742731a92501cace781de4d0106ee (diff)
downloadydb-73db9d4327114063b8d9536190f4e21dd160c589.tar.gz
fix limits logic
-rw-r--r--ydb/core/kqp/runtime/kqp_read_actor.cpp52
-rw-r--r--ydb/core/kqp/ut/opt/kqp_ne_ut.cpp69
2 files changed, 107 insertions, 14 deletions
diff --git a/ydb/core/kqp/runtime/kqp_read_actor.cpp b/ydb/core/kqp/runtime/kqp_read_actor.cpp
index 2700b9d625..b747c796ee 100644
--- a/ydb/core/kqp/runtime/kqp_read_actor.cpp
+++ b/ydb/core/kqp/runtime/kqp_read_actor.cpp
@@ -369,6 +369,7 @@ public:
StartTableScan();
}
Become(&TKqpReadActor::ReadyState);
+ Bootstrapped = true;
}
bool StartTableScan() {
@@ -625,7 +626,7 @@ public:
void StartRead(TShardState* state) {
TMaybe<ui64> limit;
if (Settings.GetItemsLimit()) {
- limit = Settings.GetItemsLimit() - Min(Settings.GetItemsLimit(), RecievedRowCount);
+ limit = Settings.GetItemsLimit() - Min(Settings.GetItemsLimit(), ReceivedRowCount);
if (*limit == 0) {
return;
@@ -724,7 +725,7 @@ public:
Reads[id].RegisterMessage(*ev->Get());
- RecievedRowCount += ev->Get()->GetRowsCount();
+ ReceivedRowCount += ev->Get()->GetRowsCount();
Results.push({Reads[id].Shard->TabletId, THolder<TEventHandle<TEvDataShard::TEvReadResult>>(ev.Release())});
CA_LOG_D(TStringBuilder() << "new data for read #" << id << " pushed");
Send(ComputeActorId, new TEvNewAsyncInputDataArrived(InputIndex));
@@ -843,12 +844,17 @@ public:
return stats;
}
+ bool LimitReached() const {
+ return Settings.GetItemsLimit() && ProcessedRowCount >= Settings.GetItemsLimit();
+ }
+
i64 GetAsyncInputData(
NKikimr::NMiniKQL::TUnboxedValueVector& resultVector,
TMaybe<TInstant>&,
bool& finished,
i64 freeSpace) override
{
+ CA_LOG_D(TStringBuilder() << " enter getasyncinputdata results size " << Results.size());
ui64 bytes = 0;
while (!Results.empty()) {
auto& result = Results.front();
@@ -886,17 +892,14 @@ public:
return bytes;
}
}
- CA_LOG_D(TStringBuilder() << "returned " << resultVector.size() << " rows");
+ CA_LOG_D(TStringBuilder() << "returned " << resultVector.size() << " rows; processed " << ProcessedRowCount << " rows");
if (batch->size() == result.ProcessedRows) {
auto& record = msg.Record;
- if (Reads[id].IsLastMessage(msg)) {
- Reads[id].Reset();
- ResetReads++;
- } else if (!Reads[id].Finished) {
+ if (!Reads[id].Finished) {
TMaybe<ui64> limit;
if (Settings.GetItemsLimit()) {
- limit = Settings.GetItemsLimit() - Min(Settings.GetItemsLimit(), RecievedRowCount);
+ limit = Settings.GetItemsLimit() - Min(Settings.GetItemsLimit(), ReceivedRowCount);
}
if (!limit || *limit > 0) {
@@ -909,20 +912,26 @@ public:
Send(MakePipePeNodeCacheID(false), new TEvPipeCache::TEvForward(request.Release(), state->TabletId, true),
IEventHandle::FlagTrackDelivery);
} else {
+ Reads[id].Finished = true;
+ }
+ }
+
+ if (Reads[id].IsLastMessage(msg)) {
+ if (!record.GetFinished()) {
auto cancel = MakeHolder<TEvDataShard::TEvReadCancel>();
cancel->Record.SetReadId(id);
Send(MakePipePeNodeCacheID(false), new TEvPipeCache::TEvForward(cancel.Release(), state->TabletId), IEventHandle::FlagTrackDelivery);
- Reads[id].Reset();
- ResetReads++;
}
+ Reads[id].Reset();
+ ResetReads++;
}
StartTableScan();
Results.pop();
- CA_LOG_D("dropping batch");
+ CA_LOG_D("dropping batch for read #" << id);
- if (RunningReads() == 0 || (Settings.GetItemsLimit() && ProcessedRowCount >= Settings.GetItemsLimit())) {
+ if (LimitReached()) {
finished = true;
break;
}
@@ -931,6 +940,19 @@ public:
}
}
+ if (RunningReads() == 0 && PendingShards.Empty() && Bootstrapped) {
+ finished = true;
+ }
+
+ CA_LOG_D(TStringBuilder() << "returned async data"
+ << " processed rows " << ProcessedRowCount
+ << " received rows " << ReceivedRowCount
+ << " running reads " << RunningReads()
+ << " pending shards " << PendingShards.Size()
+ << " finished = " << finished
+ << " has limit " << (Settings.GetItemsLimit() != 0)
+ << " limit reached " << LimitReached());
+
return bytes;
}
@@ -950,7 +972,7 @@ public:
}
//FIXME: use evread statistics after KIKIMR-16924
- tableStats->SetReadRows(tableStats->GetReadRows() + RecievedRowCount);
+ tableStats->SetReadRows(tableStats->GetReadRows() + ReceivedRowCount);
tableStats->SetReadBytes(tableStats->GetReadBytes() + BytesStats.DataBytes);
tableStats->SetAffectedPartitions(tableStats->GetAffectedPartitions() + InFlightShards.Size());
}
@@ -1010,7 +1032,7 @@ private:
TVector<NScheme::TTypeInfo> KeyColumnTypes;
NMiniKQL::TBytesStatistics BytesStats;
- ui64 RecievedRowCount = 0;
+ ui64 ReceivedRowCount = 0;
ui64 ProcessedRowCount = 0;
ui64 ResetReads = 0;
ui64 ReadId = 0;
@@ -1034,6 +1056,8 @@ private:
TString LogPrefix;
TTableId TableId;
+ bool Bootstrapped = false;
+
const TActorId ComputeActorId;
const ui64 InputIndex;
const NMiniKQL::TTypeEnvironment& TypeEnv;
diff --git a/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp b/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp
index bc650bb730..276dc9341c 100644
--- a/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp
+++ b/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp
@@ -3575,6 +3575,75 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {
}
}
+ Y_UNIT_TEST(DqSourceLimit) {
+ TKikimrSettings settings;
+ NKikimrConfig::TAppConfig appConfig;
+ appConfig.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(true);
+ settings.SetDomainRoot(KikimrDefaultUtDomainRoot);
+ TFeatureFlags flags;
+ flags.SetEnablePredicateExtractForDataQueries(true);
+ settings.SetFeatureFlags(flags);
+ settings.SetAppConfig(appConfig);
+
+ TKikimrRunner kikimr(settings);
+ auto db = kikimr.GetTableClient();
+ auto session = db.CreateSession().GetValueSync().GetSession();
+
+ AssertSuccessResult(session.ExecuteSchemeQuery(R"(
+ --!syntax_v1
+ CREATE TABLE `KeyValueLimit` (
+ Key Uint64,
+ Value String,
+ PRIMARY KEY (Key)
+ );
+ )").GetValueSync());
+
+ AssertSuccessResult(session.ExecuteDataQuery(R"(
+
+ REPLACE INTO `KeyValueLimit` (Key, Value) VALUES
+ (101u, "Value1"),
+ (102u, "Value2"),
+ (103u, "Value3"),
+ (201u, "Value1"),
+ (202u, "Value2"),
+ (203u, "Value3"),
+ (301u, "Value1"),
+ (302u, "Value2"),
+ (303u, "Value3"),
+ (401u, "Value1"),
+ (402u, "Value2"),
+ (403u, "Value3"),
+ (501u, "Value1"),
+ (502u, "Value2"),
+ (503u, "Value3"),
+ (601u, "Value1"),
+ (602u, "Value2"),
+ (603u, "Value3"),
+ (701u, "Value1"),
+ (702u, "Value2"),
+ (703u, "Value3"),
+ (801u, "Value1"),
+ (802u, "Value2"),
+ (803u, "Value3");
+
+ )", TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()).GetValueSync());
+
+ NKikimrTxDataShard::TEvRead evread;
+ evread.SetMaxRowsInResult(2);
+ InjectRangeEvReadSettings(evread);
+
+ NKikimrTxDataShard::TEvReadAck evreadack;
+ InjectRangeEvReadAckSettings(evreadack);
+
+ {
+ auto result = session.ExecuteDataQuery(R"(
+ SELECT Key, Value FROM `/Root/KeyValueLimit` WHERE Key >= 202 ORDER BY Key LIMIT 5;
+ )", TTxControl::BeginTx().CommitTx()).GetValueSync();
+ UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
+ CompareYson(R"([[[202u];["Value2"]];[[203u];["Value3"]];[[301u];["Value1"]];[[302u];["Value2"]];[[303u];["Value3"]]])", FormatResultSetYson(result.GetResultSet(0)));
+ }
+ }
+
Y_UNIT_TEST(DqSourceLocksEffects) {
TKikimrSettings settings;
NKikimrConfig::TAppConfig appConfig;