diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-04-11 07:45:41 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-04-11 07:45:41 +0300 |
commit | 57c00b3389acb507d202636cbe3df771191784db (patch) | |
tree | aebe527835fca8d4da65a58674f7a8d55991ee1e | |
parent | 1400b46e226b68d68b032f89bd2893a12178cd77 (diff) | |
download | ydb-57c00b3389acb507d202636cbe3df771191784db.tar.gz |
provide RequestedBytesLimitReached flag
-rw-r--r-- | ydb/core/kqp/compute_actor/kqp_compute_events.h | 19 | ||||
-rw-r--r-- | ydb/core/protos/kqp.proto | 2 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard__scan.cpp | 20 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/indexed_read_data.h | 13 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard__kqp_scan.cpp | 1 |
5 files changed, 41 insertions, 14 deletions
diff --git a/ydb/core/kqp/compute_actor/kqp_compute_events.h b/ydb/core/kqp/compute_actor/kqp_compute_events.h
index ace19f0b43..f78d60eaa1 100644
--- a/ydb/core/kqp/compute_actor/kqp_compute_events.h
+++ b/ydb/core/kqp/compute_actor/kqp_compute_events.h
@@ -41,10 +41,27 @@ struct TEvKqpCompute {
TDuration CpuTime; TDuration WaitTime; ui32 PageFaults = 0; // number of page faults occurred when filling in this message + bool RequestedBytesLimitReached = false; bool Finished = false; bool PageFault = false; // page fault was the reason for sending this message mutable THolder<TEvRemoteScanData> Remote; + ui32 GetRowsCount() const { + if (ArrowBatch) { + return ArrowBatch->num_rows(); + } else { + return Rows.size(); + } + } + + bool IsEmpty() const { + if (ArrowBatch) { + return ArrowBatch->num_rows() == 0; + } else { + return Rows.size() == 0; + } + } + + bool IsSerializable() const override { return true; }
@@ -77,6 +94,7 @@ struct TEvKqpCompute {
ev->PageFault = pbEv->Record.GetPageFault(); ev->PageFaults = pbEv->Record.GetPageFaults(); ev->Finished = pbEv->Record.GetFinished(); + ev->RequestedBytesLimitReached = pbEv->Record.GetRequestedBytesLimitReached(); ev->LastKey = TOwnedCellVec(TSerializedCellVec(pbEv->Record.GetLastKey()).GetCells()); auto rows = pbEv->Record.GetRows();
@@ -104,6 +122,7 @@ struct TEvKqpCompute {
Remote->Record.SetWaitTimeMs(WaitTime.MilliSeconds()); Remote->Record.SetPageFaults(PageFaults); Remote->Record.SetFinished(Finished); + Remote->Record.SetRequestedBytesLimitReached(RequestedBytesLimitReached); Remote->Record.SetPageFaults(PageFaults); Remote->Record.SetPageFault(PageFault); Remote->Record.SetLastKey(TSerializedCellVec::Serialize(LastKey));
diff --git a/ydb/core/protos/kqp.proto b/ydb/core/protos/kqp.proto
index af80aa3eb1..0f936b5630 100644
--- a/ydb/core/protos/kqp.proto
+++ b/ydb/core/protos/kqp.proto
@@ -559,6 +559,8 @@ message TEvRemoteScanData {
// backwards comparability.
repeated bytes Rows = 8; optional TArrowBatch ArrowBatch = 10; + + optional bool RequestedBytesLimitReached = 11 [default = false]; } message TEvRemoteScanDataAck {
diff --git a/ydb/core/tx/columnshard/columnshard__scan.cpp b/ydb/core/tx/columnshard/columnshard__scan.cpp
index c3b063eb5f..8fac22181c 100644
--- a/ydb/core/tx/columnshard/columnshard__scan.cpp
+++ b/ydb/core/tx/columnshard/columnshard__scan.cpp
@@ -160,6 +160,7 @@ private:
} void HandleScan(NConveyor::TEvExecution::TEvTaskProcessedResult::TPtr& ev) { + TaskResultsCounter.Inc(); auto g = Stats.MakeGuard("task_result"); LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_COLUMNSHARD_SCAN, "Scan " << ScanActorId << " got ScanDataAck" << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId);
@@ -184,11 +185,8 @@ private:
<< " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId << " freeSpace: " << ev->Get()->FreeSpace << " prevFreeSpace: " << PeerFreeSpace); - --InFlightScanDataMessages; - if (!ComputeActorId) { ComputeActorId = ev->Sender; - InFlightScanDataMessages = 0; } Y_VERIFY(ev->Get()->Generation == ScanGen);
@@ -342,11 +340,7 @@ private:
} // Send new results if there is available capacity - i64 MAX_SCANDATA_MESSAGES_IN_FLIGHT = 2; - while (InFlightScanDataMessages < MAX_SCANDATA_MESSAGES_IN_FLIGHT) { - if (!ScanIterator || !ProduceResults()) { - break; - } + while (ScanIterator && ProduceResults()) { } // Switch to the next range if the current one is finished
@@ -369,12 +363,12 @@ private:
// Only exist the loop if either: // * we have finished scanning ALL the ranges // * or there is an in-flight blob read or ScanData message for which - // we will get a reply and will be able to proceed futher - if (!ScanIterator || InFlightScanDataMessages != 0 || InFlightReads != 0) { + // we will get a reply and will be able to proceed further + if (!ScanIterator || PeerFreeSpace == 0 || InFlightReads != 0 ||
(DataTasksProcessor && DataTasksProcessor->GetDataCounter() > TaskResultsCounter.Val())) { return; } } - + Y_VERIFY_DEBUG(false); // The loop has finished without any progress! LOG_ERROR_S(*TlsActivationContext, NKikimrServices::TX_COLUMNSHARD_SCAN, "Scan " << ScanActorId << " is hanging"
@@ -508,6 +502,7 @@ private:
<< " arrow schema:\n" << (Result->ArrowBatch ? Result->ArrowBatch->schema()->ToString() : "")); if (PeerFreeSpace < Bytes) { + Result->RequestedBytesLimitReached = true; PeerFreeSpace = 0; } else { PeerFreeSpace -= Bytes;
@@ -525,7 +520,6 @@ private:
} Send(ComputeActorId, Result.Release(), IEventHandle::FlagTrackDelivery); // TODO: FlagSubscribeOnSession ? - ++InFlightScanDataMessages; ReportStats();
@@ -606,9 +600,9 @@ private:
THolder<TEvKqpCompute::TEvScanData> Result; i64 InFlightReads = 0; i64 InFlightReadBytes = 0; - i64 InFlightScanDataMessages = 0; bool Finished = false; + TAtomicCounter TaskResultsCounter = 0; IDataTasksProcessor::TPtr DataTasksProcessor; class TBlobStats {
diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.h b/ydb/core/tx/columnshard/engines/indexed_read_data.h
index 8230d20437..77cfa21154 100644
--- a/ydb/core/tx/columnshard/engines/indexed_read_data.h
+++ b/ydb/core/tx/columnshard/engines/indexed_read_data.h
@@ -38,10 +38,16 @@ public:
}; class IDataTasksProcessor { +private: + TAtomicCounter DataProcessorAddDataCounter = 0; protected: virtual bool DoAdd(IDataPreparationTask::TPtr task) = 0; std::atomic<bool> Stopped = false; public: + i64 GetDataCounter() const { + return DataProcessorAddDataCounter.Val(); + } + void Stop() { Stopped = true; }
@@ -52,7 +58,12 @@ public:
using TPtr = std::shared_ptr<IDataTasksProcessor>; virtual ~IDataTasksProcessor() = default; bool Add(IDataPreparationTask::TPtr task) { - return DoAdd(task); + if (DoAdd(task)) { + DataProcessorAddDataCounter.Inc(); + return true; + } + return false; + } }; }
diff --git a/ydb/core/tx/datashard/datashard__kqp_scan.cpp b/ydb/core/tx/datashard/datashard__kqp_scan.cpp
index f04adb2952..97221a07a7 100644
--- a/ydb/core/tx/datashard/datashard__kqp_scan.cpp
+++ b/ydb/core/tx/datashard/datashard__kqp_scan.cpp
@@ -450,6 +450,7 @@ private:
<< ", finished: " << Result->Finished << ", pageFault: " << Result->PageFault); if (PeerFreeSpace < sendBytes) { + Result->RequestedBytesLimitReached = true; PeerFreeSpace = 0; } else { PeerFreeSpace -= sendBytes;