summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAlexey Pozdniakov <[email protected]>2025-10-25 22:49:33 +0300
committerGitHub <[email protected]>2025-10-25 22:49:33 +0300
commit83fc3583be4365fad57d378b1a53e92fa1b7c36a (patch)
tree59d86abf0ecfcefa1345f60286996d228ce3eaf6
parent22841935340be1449d66b2d5a8fc53790edf4435 (diff)
[YQ-4797] Watermark: RD: reduce CPU consumption, fix (#27563)
-rw-r--r--ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.cpp20
-rw-r--r--ydb/core/fq/libs/row_dispatcher/format_handler/ut/format_handler_ut.cpp22
2 files changed, 23 insertions, 19 deletions
diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.cpp b/ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.cpp
index d6a0ef8b303..de5e3ce9126 100644
--- a/ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.cpp
+++ b/ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.cpp
@@ -251,17 +251,17 @@ private:
Y_ENSURE(false, "Expected embedded or list from purecalc");
}
- const auto offset = Self.Offsets->at(rowId);
- if (const auto nextOffset = Client->GetNextMessageOffset(); nextOffset && offset < *nextOffset) {
- LOG_ROW_DISPATCHER_TRACE("OnData, skip historical offset: " << offset << ", next message offset: " << *nextOffset);
+ Offset = Self.Offsets->at(rowId);
+ if (const auto nextOffset = Client->GetNextMessageOffset(); nextOffset && Offset < *nextOffset) {
+ LOG_ROW_DISPATCHER_TRACE("OnData, skip historical offset: " << Offset << ", next message offset: " << *nextOffset);
return;
}
- FilteredOffsets.push_back(offset);
-
auto newNumberRows = NumberRows;
auto newDataPackerSize = DataPackerSize;
if (filter) {
+ FilteredOffsets.push_back(Offset);
+
Y_DEFER {
// Values allocated on parser allocator and should be released
FilteredRow.assign(Columns.size(), NYql::NUdf::TUnboxedValue());
@@ -280,7 +280,7 @@ private:
newDataPackerSize = DataPacker->PackedSizeEstimate();
}
- OnWatermark(offset, maybeWatermark);
+ OnWatermark(Offset, maybeWatermark);
const auto numberRows = newNumberRows - NumberRows;
const auto rowSize = newDataPackerSize - DataPackerSize;
@@ -289,9 +289,9 @@ private:
return;
}
- LOG_ROW_DISPATCHER_TRACE("OnBatchFinish, offset: " << offset << ", number rows: " << numberRows << ", row size: " << rowSize << ", watermark: " << Watermark);
+ LOG_ROW_DISPATCHER_TRACE("OnBatchFinish, offset: " << Offset << ", number rows: " << numberRows << ", row size: " << rowSize << ", watermark: " << Watermark);
- Client->AddDataToClient(offset, numberRows, rowSize, Watermark);
+ Client->AddDataToClient(Offset, numberRows, rowSize, Watermark);
NumberRows = newNumberRows;
DataPackerSize = newDataPackerSize;
@@ -322,6 +322,9 @@ private:
void FinishPacking() {
if (!DataPacker->IsEmpty() || !Watermark.Empty()) {
LOG_ROW_DISPATCHER_TRACE("FinishPacking, batch size: " << DataPackerSize << ", number rows: " << FilteredOffsets.size());
+ if (FilteredOffsets.empty()) {
+ FilteredOffsets.push_back(Offset);
+ }
ClientData.emplace(NYql::MakeReadOnlyRope(DataPacker->Finish()), std::move(FilteredOffsets), Watermark);
NumberRows = 0;
DataPackerSize = 0;
@@ -340,6 +343,7 @@ private:
bool ClientStarted = false;
// Filtered data
+ ui64 Offset;
ui64 NumberRows = 0;
ui64 DataPackerSize = 0;
TVector<NYql::NUdf::TUnboxedValue> FilteredRow; // Temporary value holder for DataPacket
diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/ut/format_handler_ut.cpp b/ydb/core/fq/libs/row_dispatcher/format_handler/ut/format_handler_ut.cpp
index 8b32ff4c161..d6a77c7b21c 100644
--- a/ydb/core/fq/libs/row_dispatcher/format_handler/ut/format_handler_ut.cpp
+++ b/ydb/core/fq/libs/row_dispatcher/format_handler/ut/format_handler_ut.cpp
@@ -342,8 +342,8 @@ Y_UNIT_TEST_SUITE(TestFormatHandler) {
));
messages = TVector<TMessages>{
- {{firstOffset + 0, firstOffset + 1}, {}, TBatch().AddRow(TRow().AddString("event1").AddString("str_first__large__"))},
- {{firstOffset + 2, firstOffset + 3}, {}, TBatch().AddRow(TRow().AddString("event3").AddString("str_first__large__"))},
+ {{firstOffset + 1}, {}, TBatch().AddRow(TRow().AddString("event1").AddString("str_first__large__"))},
+ {{firstOffset + 3}, {}, TBatch().AddRow(TRow().AddString("event3").AddString("str_first__large__"))},
};
CheckSuccess(MakeClient(
{commonColumn, {"column_0", "[DataType; String]"}},
@@ -354,7 +354,7 @@ Y_UNIT_TEST_SUITE(TestFormatHandler) {
));
messages = TVector<TMessages>{
- {{firstOffset + 0, firstOffset + 1}, {}, TBatch().AddRow(TRow().AddString("event0").AddString("str_second"))},
+ {{firstOffset + 0}, {}, TBatch().AddRow(TRow().AddString("event0").AddString("str_second"))},
};
CheckSuccess(MakeClient(
{commonColumn, {"column_1", "[DataType; String]"}},
@@ -656,13 +656,13 @@ Y_UNIT_TEST_SUITE(TestFormatHandler) {
auto messages = TVector<TMessages>{
{
- {firstOffset + 2, firstOffset + 3},
+ {firstOffset + 2},
TInstant::Seconds(40),
TBatch()
.AddRow(TRow().AddString("1970-01-01T00:00:44Z").AddUint64(1))
},
{
- {firstOffset + 4, firstOffset + 5},
+ {firstOffset + 4},
TInstant::Seconds(42),
TBatch()
.AddRow(TRow().AddString("1970-01-01T00:00:46Z").AddUint64(1))
@@ -683,13 +683,13 @@ Y_UNIT_TEST_SUITE(TestFormatHandler) {
messages = TVector<TMessages>{
{
- {firstOffset + 4, firstOffset + 5},
+ {firstOffset + 4},
TInstant::Seconds(42),
TBatch()
.AddRow(TRow().AddString("1970-01-01T00:00:46Z").AddUint64(1))
},
{
- {firstOffset + 6, firstOffset + 7},
+ {firstOffset + 6},
TInstant::Seconds(44),
TBatch()
.AddRow(TRow().AddString("1970-01-01T00:00:48Z").AddUint64(1))
@@ -733,12 +733,12 @@ Y_UNIT_TEST_SUITE(TestFormatHandler) {
auto messages = TVector<TMessages>{
{
- {firstOffset + 2, firstOffset + 3},
+ {firstOffset + 3},
TInstant::Seconds(40),
TBatch()
},
{
- {firstOffset + 4, firstOffset + 5},
+ {firstOffset + 5},
TInstant::Seconds(42),
TBatch()
},
@@ -758,12 +758,12 @@ Y_UNIT_TEST_SUITE(TestFormatHandler) {
messages = TVector<TMessages>{
{
- {firstOffset + 4, firstOffset + 5},
+ {firstOffset + 5},
TInstant::Seconds(42),
TBatch()
},
{
- {firstOffset + 6, firstOffset + 7},
+ {firstOffset + 7},
TInstant::Seconds(44),
TBatch()
},