aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov <ivanmorozov@yandex-team.com>2023-04-11 07:45:41 +0300
committerivanmorozov <ivanmorozov@yandex-team.com>2023-04-11 07:45:41 +0300
commit57c00b3389acb507d202636cbe3df771191784db (patch)
treeaebe527835fca8d4da65a58674f7a8d55991ee1e
parent1400b46e226b68d68b032f89bd2893a12178cd77 (diff)
downloadydb-57c00b3389acb507d202636cbe3df771191784db.tar.gz
provide RequestedBytesLimitReached flag
-rw-r--r--ydb/core/kqp/compute_actor/kqp_compute_events.h19
-rw-r--r--ydb/core/protos/kqp.proto2
-rw-r--r--ydb/core/tx/columnshard/columnshard__scan.cpp20
-rw-r--r--ydb/core/tx/columnshard/engines/indexed_read_data.h13
-rw-r--r--ydb/core/tx/datashard/datashard__kqp_scan.cpp1
5 files changed, 41 insertions, 14 deletions
diff --git a/ydb/core/kqp/compute_actor/kqp_compute_events.h b/ydb/core/kqp/compute_actor/kqp_compute_events.h
index ace19f0b43..f78d60eaa1 100644
--- a/ydb/core/kqp/compute_actor/kqp_compute_events.h
+++ b/ydb/core/kqp/compute_actor/kqp_compute_events.h
@@ -41,10 +41,27 @@ struct TEvKqpCompute {
TDuration CpuTime;
TDuration WaitTime;
ui32 PageFaults = 0; // number of page faults occurred when filling in this message
+ bool RequestedBytesLimitReached = false;
bool Finished = false;
bool PageFault = false; // page fault was the reason for sending this message
mutable THolder<TEvRemoteScanData> Remote;
+ ui32 GetRowsCount() const {
+ if (ArrowBatch) {
+ return ArrowBatch->num_rows();
+ } else {
+ return Rows.size();
+ }
+ }
+
+ bool IsEmpty() const {
+ if (ArrowBatch) {
+ return ArrowBatch->num_rows() == 0;
+ } else {
+ return Rows.size() == 0;
+ }
+ }
+
bool IsSerializable() const override {
return true;
}
@@ -77,6 +94,7 @@ struct TEvKqpCompute {
ev->PageFault = pbEv->Record.GetPageFault();
ev->PageFaults = pbEv->Record.GetPageFaults();
ev->Finished = pbEv->Record.GetFinished();
+ ev->RequestedBytesLimitReached = pbEv->Record.GetRequestedBytesLimitReached();
ev->LastKey = TOwnedCellVec(TSerializedCellVec(pbEv->Record.GetLastKey()).GetCells());
auto rows = pbEv->Record.GetRows();
@@ -104,6 +122,7 @@ struct TEvKqpCompute {
Remote->Record.SetWaitTimeMs(WaitTime.MilliSeconds());
Remote->Record.SetPageFaults(PageFaults);
Remote->Record.SetFinished(Finished);
+ Remote->Record.SetRequestedBytesLimitReached(RequestedBytesLimitReached);
Remote->Record.SetPageFaults(PageFaults);
Remote->Record.SetPageFault(PageFault);
Remote->Record.SetLastKey(TSerializedCellVec::Serialize(LastKey));
diff --git a/ydb/core/protos/kqp.proto b/ydb/core/protos/kqp.proto
index af80aa3eb1..0f936b5630 100644
--- a/ydb/core/protos/kqp.proto
+++ b/ydb/core/protos/kqp.proto
@@ -559,6 +559,8 @@ message TEvRemoteScanData {
// backwards comparability.
repeated bytes Rows = 8;
optional TArrowBatch ArrowBatch = 10;
+
+ optional bool RequestedBytesLimitReached = 11 [default = false];
}
message TEvRemoteScanDataAck {
diff --git a/ydb/core/tx/columnshard/columnshard__scan.cpp b/ydb/core/tx/columnshard/columnshard__scan.cpp
index c3b063eb5f..8fac22181c 100644
--- a/ydb/core/tx/columnshard/columnshard__scan.cpp
+++ b/ydb/core/tx/columnshard/columnshard__scan.cpp
@@ -160,6 +160,7 @@ private:
}
void HandleScan(NConveyor::TEvExecution::TEvTaskProcessedResult::TPtr& ev) {
+ TaskResultsCounter.Inc();
auto g = Stats.MakeGuard("task_result");
LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_COLUMNSHARD_SCAN,
"Scan " << ScanActorId << " got ScanDataAck" << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId);
@@ -184,11 +185,8 @@ private:
<< " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId
<< " freeSpace: " << ev->Get()->FreeSpace << " prevFreeSpace: " << PeerFreeSpace);
- --InFlightScanDataMessages;
-
if (!ComputeActorId) {
ComputeActorId = ev->Sender;
- InFlightScanDataMessages = 0;
}
Y_VERIFY(ev->Get()->Generation == ScanGen);
@@ -342,11 +340,7 @@ private:
}
// Send new results if there is available capacity
- i64 MAX_SCANDATA_MESSAGES_IN_FLIGHT = 2;
- while (InFlightScanDataMessages < MAX_SCANDATA_MESSAGES_IN_FLIGHT) {
- if (!ScanIterator || !ProduceResults()) {
- break;
- }
+ while (ScanIterator && ProduceResults()) {
}
// Switch to the next range if the current one is finished
@@ -369,12 +363,12 @@ private:
// Only exist the loop if either:
// * we have finished scanning ALL the ranges
// * or there is an in-flight blob read or ScanData message for which
- // we will get a reply and will be able to proceed futher
- if (!ScanIterator || InFlightScanDataMessages != 0 || InFlightReads != 0) {
+ // we will get a reply and will be able to proceed further
+ if (!ScanIterator || PeerFreeSpace == 0 || InFlightReads != 0 || (DataTasksProcessor && DataTasksProcessor->GetDataCounter() > TaskResultsCounter.Val())) {
return;
}
}
-
+ Y_VERIFY_DEBUG(false);
// The loop has finished without any progress!
LOG_ERROR_S(*TlsActivationContext, NKikimrServices::TX_COLUMNSHARD_SCAN,
"Scan " << ScanActorId << " is hanging"
@@ -508,6 +502,7 @@ private:
<< " arrow schema:\n" << (Result->ArrowBatch ? Result->ArrowBatch->schema()->ToString() : ""));
if (PeerFreeSpace < Bytes) {
+ Result->RequestedBytesLimitReached = true;
PeerFreeSpace = 0;
} else {
PeerFreeSpace -= Bytes;
@@ -525,7 +520,6 @@ private:
}
Send(ComputeActorId, Result.Release(), IEventHandle::FlagTrackDelivery); // TODO: FlagSubscribeOnSession ?
- ++InFlightScanDataMessages;
ReportStats();
@@ -606,9 +600,9 @@ private:
THolder<TEvKqpCompute::TEvScanData> Result;
i64 InFlightReads = 0;
i64 InFlightReadBytes = 0;
- i64 InFlightScanDataMessages = 0;
bool Finished = false;
+ TAtomicCounter TaskResultsCounter = 0;
IDataTasksProcessor::TPtr DataTasksProcessor;
class TBlobStats {
diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.h b/ydb/core/tx/columnshard/engines/indexed_read_data.h
index 8230d20437..77cfa21154 100644
--- a/ydb/core/tx/columnshard/engines/indexed_read_data.h
+++ b/ydb/core/tx/columnshard/engines/indexed_read_data.h
@@ -38,10 +38,16 @@ public:
};
class IDataTasksProcessor {
+private:
+ TAtomicCounter DataProcessorAddDataCounter = 0;
protected:
virtual bool DoAdd(IDataPreparationTask::TPtr task) = 0;
std::atomic<bool> Stopped = false;
public:
+ i64 GetDataCounter() const {
+ return DataProcessorAddDataCounter.Val();
+ }
+
void Stop() {
Stopped = true;
}
@@ -52,7 +58,12 @@ public:
using TPtr = std::shared_ptr<IDataTasksProcessor>;
virtual ~IDataTasksProcessor() = default;
bool Add(IDataPreparationTask::TPtr task) {
- return DoAdd(task);
+ if (DoAdd(task)) {
+ DataProcessorAddDataCounter.Inc();
+ return true;
+ }
+ return false;
+
}
};
}
diff --git a/ydb/core/tx/datashard/datashard__kqp_scan.cpp b/ydb/core/tx/datashard/datashard__kqp_scan.cpp
index f04adb2952..97221a07a7 100644
--- a/ydb/core/tx/datashard/datashard__kqp_scan.cpp
+++ b/ydb/core/tx/datashard/datashard__kqp_scan.cpp
@@ -450,6 +450,7 @@ private:
<< ", finished: " << Result->Finished << ", pageFault: " << Result->PageFault);
if (PeerFreeSpace < sendBytes) {
+ Result->RequestedBytesLimitReached = true;
PeerFreeSpace = 0;
} else {
PeerFreeSpace -= sendBytes;