diff options
| author | Alexey Pozdniakov <[email protected]> | 2025-10-25 22:49:33 +0300 |
|---|---|---|
| committer | GitHub <[email protected]> | 2025-10-25 22:49:33 +0300 |
| commit | 83fc3583be4365fad57d378b1a53e92fa1b7c36a (patch) | |
| tree | 59d86abf0ecfcefa1345f60286996d228ce3eaf6 | |
| parent | 22841935340be1449d66b2d5a8fc53790edf4435 (diff) | |
[YQ-4797] Watermark: RD: reduce CPU consumption, fix (#27563)
| -rw-r--r-- | ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.cpp | 20 | ||||
| -rw-r--r-- | ydb/core/fq/libs/row_dispatcher/format_handler/ut/format_handler_ut.cpp | 22 |
2 files changed, 23 insertions, 19 deletions
diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.cpp b/ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.cpp index d6a0ef8b303..de5e3ce9126 100644 --- a/ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.cpp +++ b/ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.cpp @@ -251,17 +251,17 @@ private: Y_ENSURE(false, "Expected embedded or list from purecalc"); } - const auto offset = Self.Offsets->at(rowId); - if (const auto nextOffset = Client->GetNextMessageOffset(); nextOffset && offset < *nextOffset) { - LOG_ROW_DISPATCHER_TRACE("OnData, skip historical offset: " << offset << ", next message offset: " << *nextOffset); + Offset = Self.Offsets->at(rowId); + if (const auto nextOffset = Client->GetNextMessageOffset(); nextOffset && Offset < *nextOffset) { + LOG_ROW_DISPATCHER_TRACE("OnData, skip historical offset: " << Offset << ", next message offset: " << *nextOffset); return; } - FilteredOffsets.push_back(offset); - auto newNumberRows = NumberRows; auto newDataPackerSize = DataPackerSize; if (filter) { + FilteredOffsets.push_back(Offset); + Y_DEFER { // Values allocated on parser allocator and should be released FilteredRow.assign(Columns.size(), NYql::NUdf::TUnboxedValue()); @@ -280,7 +280,7 @@ private: newDataPackerSize = DataPacker->PackedSizeEstimate(); } - OnWatermark(offset, maybeWatermark); + OnWatermark(Offset, maybeWatermark); const auto numberRows = newNumberRows - NumberRows; const auto rowSize = newDataPackerSize - DataPackerSize; @@ -289,9 +289,9 @@ private: return; } - LOG_ROW_DISPATCHER_TRACE("OnBatchFinish, offset: " << offset << ", number rows: " << numberRows << ", row size: " << rowSize << ", watermark: " << Watermark); + LOG_ROW_DISPATCHER_TRACE("OnBatchFinish, offset: " << Offset << ", number rows: " << numberRows << ", row size: " << rowSize << ", watermark: " << Watermark); - Client->AddDataToClient(offset, numberRows, rowSize, Watermark); + Client->AddDataToClient(Offset, numberRows, rowSize, Watermark); NumberRows = newNumberRows; DataPackerSize = newDataPackerSize; @@ -322,6 +322,9 @@ private: void FinishPacking() { if (!DataPacker->IsEmpty() || !Watermark.Empty()) { LOG_ROW_DISPATCHER_TRACE("FinishPacking, batch size: " << DataPackerSize << ", number rows: " << FilteredOffsets.size()); + if (FilteredOffsets.empty()) { + FilteredOffsets.push_back(Offset); + } ClientData.emplace(NYql::MakeReadOnlyRope(DataPacker->Finish()), std::move(FilteredOffsets), Watermark); NumberRows = 0; DataPackerSize = 0; @@ -340,6 +343,7 @@ private: bool ClientStarted = false; // Filtered data + ui64 Offset; ui64 NumberRows = 0; ui64 DataPackerSize = 0; TVector<NYql::NUdf::TUnboxedValue> FilteredRow; // Temporary value holder for DataPacket diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/ut/format_handler_ut.cpp b/ydb/core/fq/libs/row_dispatcher/format_handler/ut/format_handler_ut.cpp index 8b32ff4c161..d6a77c7b21c 100644 --- a/ydb/core/fq/libs/row_dispatcher/format_handler/ut/format_handler_ut.cpp +++ b/ydb/core/fq/libs/row_dispatcher/format_handler/ut/format_handler_ut.cpp @@ -342,8 +342,8 @@ Y_UNIT_TEST_SUITE(TestFormatHandler) { )); messages = TVector<TMessages>{ - {{firstOffset + 0, firstOffset + 1}, {}, TBatch().AddRow(TRow().AddString("event1").AddString("str_first__large__"))}, - {{firstOffset + 2, firstOffset + 3}, {}, TBatch().AddRow(TRow().AddString("event3").AddString("str_first__large__"))}, + {{firstOffset + 1}, {}, TBatch().AddRow(TRow().AddString("event1").AddString("str_first__large__"))}, + {{firstOffset + 3}, {}, TBatch().AddRow(TRow().AddString("event3").AddString("str_first__large__"))}, }; CheckSuccess(MakeClient( {commonColumn, {"column_0", "[DataType; String]"}}, @@ -354,7 +354,7 @@ Y_UNIT_TEST_SUITE(TestFormatHandler) { )); messages = TVector<TMessages>{ - {{firstOffset + 0, firstOffset + 1}, {}, TBatch().AddRow(TRow().AddString("event0").AddString("str_second"))}, + {{firstOffset + 0}, {}, TBatch().AddRow(TRow().AddString("event0").AddString("str_second"))}, }; CheckSuccess(MakeClient( {commonColumn, {"column_1", "[DataType; String]"}}, @@ -656,13 +656,13 @@ Y_UNIT_TEST_SUITE(TestFormatHandler) { auto messages = TVector<TMessages>{ { - {firstOffset + 2, firstOffset + 3}, + {firstOffset + 2}, TInstant::Seconds(40), TBatch() .AddRow(TRow().AddString("1970-01-01T00:00:44Z").AddUint64(1)) }, { - {firstOffset + 4, firstOffset + 5}, + {firstOffset + 4}, TInstant::Seconds(42), TBatch() .AddRow(TRow().AddString("1970-01-01T00:00:46Z").AddUint64(1)) @@ -683,13 +683,13 @@ Y_UNIT_TEST_SUITE(TestFormatHandler) { messages = TVector<TMessages>{ { - {firstOffset + 4, firstOffset + 5}, + {firstOffset + 4}, TInstant::Seconds(42), TBatch() .AddRow(TRow().AddString("1970-01-01T00:00:46Z").AddUint64(1)) }, { - {firstOffset + 6, firstOffset + 7}, + {firstOffset + 6}, TInstant::Seconds(44), TBatch() .AddRow(TRow().AddString("1970-01-01T00:00:48Z").AddUint64(1)) @@ -733,12 +733,12 @@ Y_UNIT_TEST_SUITE(TestFormatHandler) { auto messages = TVector<TMessages>{ { - {firstOffset + 2, firstOffset + 3}, + {firstOffset + 3}, TInstant::Seconds(40), TBatch() }, { - {firstOffset + 4, firstOffset + 5}, + {firstOffset + 5}, TInstant::Seconds(42), TBatch() }, @@ -758,12 +758,12 @@ Y_UNIT_TEST_SUITE(TestFormatHandler) { messages = TVector<TMessages>{ { - {firstOffset + 4, firstOffset + 5}, + {firstOffset + 5}, TInstant::Seconds(42), TBatch() }, { - {firstOffset + 6, firstOffset + 7}, + {firstOffset + 7}, TInstant::Seconds(44), TBatch() }, |
