diff options
author | Pisarenko Grigoriy <grigoriypisar@ydb.tech> | 2024-12-19 10:02:01 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-12-19 10:02:01 +0300 |
commit | 01d95e5e6e0300d9461af676a5cb3bcfafd7fcbd (patch) | |
tree | f31f78fab5abf98599708cc0a5aee3d1cb76c99a | |
parent | 3317b753e5adf41ed34844c61c431a9f74e1c867 (diff) | |
download | ydb-01d95e5e6e0300d9461af676a5cb3bcfafd7fcbd.tar.gz |
YQ-3975 RD fixed fault for parsing errors without filter (#12707)
11 files changed, 96 insertions, 76 deletions
diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/filters/filters_set.cpp b/ydb/core/fq/libs/row_dispatcher/format_handler/filters/filters_set.cpp index 6065579185..ae88416712 100644 --- a/ydb/core/fq/libs/row_dispatcher/format_handler/filters/filters_set.cpp +++ b/ydb/core/fq/libs/row_dispatcher/format_handler/filters/filters_set.cpp @@ -154,16 +154,7 @@ public: continue; } - if (filterHandler.GetPurecalcFilter()) { - PushToFilter(filterHandler, offsets, columnIndex, values, numberRows); - continue; - } - - // Clients without filters - LOG_ROW_DISPATCHER_TRACE("Add " << numberRows << " rows to client " << consumer->GetFilterId() << " without filtering"); - for (ui64 rowId = 0; rowId < numberRows; ++rowId) { - consumer->OnFilteredData(rowId); - } + PushToFilter(filterHandler, offsets, columnIndex, values, numberRows); } Stats.AddFilterLatency(TInstant::Now() - startFilter); } @@ -193,7 +184,9 @@ public: LOG_ROW_DISPATCHER_TRACE("Create filter with id " << filter->GetFilterId()); IPurecalcFilter::TPtr purecalcFilter; - if (filter->GetWhereFilter()) { + if (const auto& predicate = filter->GetWhereFilter()) { + LOG_ROW_DISPATCHER_TRACE("Create purecalc filter for predicate '" << predicate << "' (filter id: " << filter->GetFilterId() << ")"); + auto filterStatus = CreatePurecalcFilter(filter); if (filterStatus.IsFail()) { return filterStatus; @@ -225,9 +218,6 @@ public: private: void PushToFilter(const TFilterHandler& filterHandler, const TVector<ui64>& offsets, const TVector<ui64>& columnIndex, const TVector<const TVector<NYql::NUdf::TUnboxedValue>*>& values, ui64 numberRows) { - const auto filter = filterHandler.GetPurecalcFilter(); - Y_ENSURE(filter, "Expected initialized filter"); - const auto consumer = filterHandler.GetConsumer(); const auto& columnIds = consumer->GetColumnIds(); @@ -246,8 +236,13 @@ private: } } - LOG_ROW_DISPATCHER_TRACE("Pass " << numberRows << " rows to purecalc filter (client id: " << consumer->GetFilterId() << ")"); - filter->FilterData(result, numberRows); + if (const auto filter = filterHandler.GetPurecalcFilter()) { + LOG_ROW_DISPATCHER_TRACE("Pass " << numberRows << " rows to purecalc filter (filter id: " << consumer->GetFilterId() << ")"); + filter->FilterData(result, numberRows); + } else if (numberRows) { + LOG_ROW_DISPATCHER_TRACE("Add " << numberRows << " rows to client " << consumer->GetFilterId() << " without filtering"); + consumer->OnFilteredBatch(0, numberRows - 1); + } } private: diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/filters/filters_set.h b/ydb/core/fq/libs/row_dispatcher/format_handler/filters/filters_set.h index 427e73fa53..57da103b1a 100644 --- a/ydb/core/fq/libs/row_dispatcher/format_handler/filters/filters_set.h +++ b/ydb/core/fq/libs/row_dispatcher/format_handler/filters/filters_set.h @@ -17,6 +17,8 @@ public: virtual const TVector<ui64>& GetColumnIds() const = 0; virtual TMaybe<ui64> GetNextMessageOffset() const = 0; + virtual void OnFilteredBatch(ui64 firstRow, ui64 lastRow) = 0; // inclusive interval [firstRow, lastRow] + virtual void OnFilterStarted() = 0; virtual void OnFilteringError(TStatus status) = 0; }; diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.cpp b/ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.cpp index df46b12bef..f492e864b3 100644 --- a/ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.cpp +++ b/ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.cpp @@ -207,6 +207,13 @@ class TTopicFormatHandler : public NActors::TActor<TTopicFormatHandler>, public Client->StartClientSession(); } + void OnFilteredBatch(ui64 firstRow, ui64 lastRow) override { + LOG_ROW_DISPATCHER_TRACE("OnFilteredBatch, rows [" << firstRow << ", " << lastRow << "]"); + for (ui64 rowId = firstRow; rowId <= lastRow; ++rowId) { + OnFilteredData(rowId); + } + } + void OnFilteredData(ui64 rowId) override { const ui64 offset = Self.Offsets->at(rowId); if (const auto nextOffset = Client->GetNextMessageOffset(); nextOffset && offset < *nextOffset) { diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/parsers/json_parser.cpp b/ydb/core/fq/libs/row_dispatcher/format_handler/parsers/json_parser.cpp index 7c4fb95102..46c0196a66 100644 --- a/ydb/core/fq/libs/row_dispatcher/format_handler/parsers/json_parser.cpp +++ b/ydb/core/fq/libs/row_dispatcher/format_handler/parsers/json_parser.cpp @@ -414,23 +414,23 @@ protected: simdjson::ondemand::document_stream documents; CHECK_JSON_ERROR(Parser.iterate_many(values, size, simdjson::ondemand::DEFAULT_BATCH_SIZE).get(documents)) { - return TStatus::Fail(EStatusId::BAD_REQUEST, TStringBuilder() << "Failed to parse message batch from offset " << Buffer.Offsets.front() << ", json documents was corrupted: " << simdjson::error_message(error)); + return TStatus::Fail(EStatusId::BAD_REQUEST, TStringBuilder() << "Failed to parse message batch from offset " << Buffer.Offsets.front() << ", json documents was corrupted: " << simdjson::error_message(error) << " Current data batch: " << TruncateString(std::string_view(values, size))); } size_t rowId = 0; for (auto document : documents) { if (Y_UNLIKELY(rowId >= Buffer.NumberValues)) { - return TStatus::Fail(EStatusId::INTERNAL_ERROR, TStringBuilder() << "Failed to parse json messages, expected " << Buffer.NumberValues << " json rows from offset " << Buffer.Offsets.front() << " but got " << rowId + 1); + return TStatus::Fail(EStatusId::INTERNAL_ERROR, TStringBuilder() << "Failed to parse json messages, expected " << Buffer.NumberValues << " json rows from offset " << Buffer.Offsets.front() << " but got " << rowId + 1 << " (expected one json row for each offset from topic API in json each row format, maybe initial data was corrupted or messages is not in json format), current data batch: " << TruncateString(std::string_view(values, size))); } const ui64 offset = Buffer.Offsets[rowId]; CHECK_JSON_ERROR(document.error()) { - return TStatus::Fail(EStatusId::BAD_REQUEST, TStringBuilder() << "Failed to parse json message for offset " << offset << ", json document was corrupted: " << simdjson::error_message(error)); + return TStatus::Fail(EStatusId::BAD_REQUEST, TStringBuilder() << "Failed to parse json message for offset " << offset << ", json document was corrupted: " << simdjson::error_message(error) << " Current data batch: " << TruncateString(std::string_view(values, size))); } for (auto item : document.get_object()) { CHECK_JSON_ERROR(item.error()) { - return TStatus::Fail(EStatusId::BAD_REQUEST, TStringBuilder() << "Failed to parse json message for offset " << offset << ", json item was corrupted: " << simdjson::error_message(error)); + return TStatus::Fail(EStatusId::BAD_REQUEST, TStringBuilder() << "Failed to parse json message for offset " << offset << ", json item was corrupted: " << simdjson::error_message(error) << " Current data batch: " << TruncateString(std::string_view(values, size))); } const auto it = ColumnsIndex.find(item.escaped_key().value()); @@ -445,7 +445,7 @@ protected: } if (Y_UNLIKELY(rowId != Buffer.NumberValues)) { - return TStatus::Fail(EStatusId::INTERNAL_ERROR, TStringBuilder() << "Failed to parse json messages, expected " << Buffer.NumberValues << " json rows from offset " << Buffer.Offsets.front() << " but got " << rowId); + return TStatus::Fail(EStatusId::INTERNAL_ERROR, TStringBuilder() << "Failed to parse json messages, expected " << Buffer.NumberValues << " json rows from offset " << Buffer.Offsets.front() << " but got " << rowId << " (expected one json row for each offset from topic API in json each row format, maybe initial data was corrupted or messages is not in json format), current data batch: " << TruncateString(std::string_view(values, size))); } const ui64 firstOffset = Buffer.Offsets.front(); diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/ut/common/ut_common.cpp b/ydb/core/fq/libs/row_dispatcher/format_handler/ut/common/ut_common.cpp index b65911e793..f7f3489dd5 100644 --- a/ydb/core/fq/libs/row_dispatcher/format_handler/ut/common/ut_common.cpp +++ b/ydb/core/fq/libs/row_dispatcher/format_handler/ut/common/ut_common.cpp @@ -68,6 +68,7 @@ public: void Validate(const NYql::NUdf::TUnboxedValue& parsedValue) const override { if (!parsedValue) { + UNIT_FAIL("Unexpected NULL value for optional cell"); return; } Value->Validate(parsedValue.GetOptionalValue()); @@ -166,7 +167,7 @@ void TBaseFixture::SetUp(NUnitTest::TTestContext&) { TAutoPtr<NKikimr::TAppPrepare> app = new NKikimr::TAppPrepare(); Runtime.SetLogBackend(NActors::CreateStderrBackend()); Runtime.SetLogPriority(NKikimrServices::FQ_ROW_DISPATCHER, NActors::NLog::PRI_TRACE); - Runtime.SetDispatchTimeout(TDuration::Seconds(5)); + Runtime.SetDispatchTimeout(WAIT_TIMEOUT); Runtime.Initialize(app->Unwrap()); // Init tls context diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/ut/common/ut_common.h b/ydb/core/fq/libs/row_dispatcher/format_handler/ut/common/ut_common.h index dd1c3d1c9f..ce54150b46 100644 --- a/ydb/core/fq/libs/row_dispatcher/format_handler/ut/common/ut_common.h +++ b/ydb/core/fq/libs/row_dispatcher/format_handler/ut/common/ut_common.h @@ -10,6 +10,8 @@ namespace NFq::NRowDispatcher::NTests { +static constexpr TDuration WAIT_TIMEOUT = TDuration::Seconds(20); + class TBaseFixture : public NUnitTest::TBaseFixture, public TTypeParser { public: // Helper classes for checking serialized rows in multi type format diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/ut/format_handler_ut.cpp b/ydb/core/fq/libs/row_dispatcher/format_handler/ut/format_handler_ut.cpp index c643dba270..75daa83bc5 100644 --- a/ydb/core/fq/libs/row_dispatcher/format_handler/ut/format_handler_ut.cpp +++ b/ydb/core/fq/libs/row_dispatcher/format_handler/ut/format_handler_ut.cpp @@ -155,7 +155,7 @@ public: FormatHandler = CreateTestFormatHandler(config, settings); } - TStatus MakeClient(const TVector<TSchemaColumn>& columns, const TString& whereFilter, TCallback callback, ui64 expectedFilteredRows = 1) { + [[nodiscard]] TStatus MakeClient(const TVector<TSchemaColumn>& columns, const TString& whereFilter, TCallback callback, ui64 expectedFilteredRows = 1) { ClientIds.emplace_back(ClientIds.size(), 0, 0, 0); auto client = MakeIntrusive<TClientDataConsumer>(ClientIds.back(), columns, whereFilter, callback, expectedFilteredRows); @@ -202,6 +202,30 @@ public: FormatHandler->RemoveClient(clientId); } +public: + static TCallback EmptyCheck() { + return [&](TQueue<std::pair<TRope, TVector<ui64>>>&& data) {}; + } + + static TCallback OneBatchCheck(std::function<void(TRope&& messages, TVector<ui64>&& offsets)> callback) { + return [callback](TQueue<std::pair<TRope, TVector<ui64>>>&& data) { + UNIT_ASSERT_VALUES_EQUAL(data.size(), 1); + auto [messages, offsets] = data.front(); + + UNIT_ASSERT(!offsets.empty()); + callback(std::move(messages), std::move(offsets)); + }; + } + + TCallback OneRowCheck(ui64 offset, const TRow& row) const { + return OneBatchCheck([this, offset, row](TRope&& messages, TVector<ui64>&& offsets) { + UNIT_ASSERT_VALUES_EQUAL(offsets.size(), 1); + UNIT_ASSERT_VALUES_EQUAL(offsets.front(), offset); + + CheckMessageBatch(messages, TBatch().AddRow(row)); + }); + } + private: void ExtractClientsData() { for (auto& client : Clients) { @@ -233,33 +257,13 @@ Y_UNIT_TEST_SUITE(TestFormatHandler) { CheckSuccess(MakeClient( {commonColumn, {"col_first", "[DataType; String]"}}, "WHERE col_first = \"str_first__large__\"", - [&](TQueue<std::pair<TRope, TVector<ui64>>>&& data) { - UNIT_ASSERT_VALUES_EQUAL(data.size(), 1); - - auto [messages, offsets] = data.front(); - UNIT_ASSERT_VALUES_EQUAL(offsets.size(), 1); - UNIT_ASSERT_VALUES_EQUAL(offsets.front(), firstOffset + 1); - - CheckMessageBatch(messages, TBatch().AddRow( - TRow().AddString("event2").AddString("str_first__large__") - )); - } + OneRowCheck(firstOffset + 1, TRow().AddString("event2").AddString("str_first__large__")) )); CheckSuccess(MakeClient( {commonColumn, {"col_second", "[DataType; String]"}}, "WHERE col_second = \"str_second\"", - [&](TQueue<std::pair<TRope, TVector<ui64>>>&& data) { - UNIT_ASSERT_VALUES_EQUAL(data.size(), 1); - - auto [messages, offsets] = data.front(); - UNIT_ASSERT_VALUES_EQUAL(offsets.size(), 1); - UNIT_ASSERT_VALUES_EQUAL(offsets.front(), firstOffset); - - CheckMessageBatch(messages, TBatch().AddRow( - TRow().AddString("event1").AddString("str_second") - )); - } + OneRowCheck(firstOffset, TRow().AddString("event1").AddString("str_second")) )); ParseMessages({ @@ -288,14 +292,10 @@ Y_UNIT_TEST_SUITE(TestFormatHandler) { R"({"col_a": false, "col_b": {"X": "Y"}})" }; - CheckSuccess(MakeClient(schema, "WHERE FALSE", [&](TQueue<std::pair<TRope, TVector<ui64>>>&&) {}, 0)); - - auto trueChacker = [&](TQueue<std::pair<TRope, TVector<ui64>>>&& data) { - UNIT_ASSERT_VALUES_EQUAL(data.size(), 1); - auto [messages, offsets] = data.front(); + CheckSuccess(MakeClient(schema, "WHERE FALSE", EmptyCheck(), 0)); + const auto trueChacker = OneBatchCheck([&](TRope&& messages, TVector<ui64>&& offsets) { TBatch expectedBatch; - UNIT_ASSERT(!offsets.empty()); for (ui64 offset : offsets) { UNIT_ASSERT(offset - firstOffset < testData.size()); expectedBatch.AddRow( @@ -304,7 +304,7 @@ Y_UNIT_TEST_SUITE(TestFormatHandler) { } CheckMessageBatch(messages, expectedBatch); - }; + }); CheckSuccess(MakeClient(schema, "WHERE TRUE", trueChacker, 3)); CheckSuccess(MakeClient(schema, "", trueChacker, 2)); @@ -323,7 +323,7 @@ Y_UNIT_TEST_SUITE(TestFormatHandler) { Y_UNIT_TEST_F(ClientValidation, TFormatHadlerFixture) { const TVector<TSchemaColumn> schema = {{"data", "[DataType; String]"}}; const TString filter = "WHERE FALSE"; - const auto callback = [&](TQueue<std::pair<TRope, TVector<ui64>>>&&) {}; + const auto callback = EmptyCheck(); CheckSuccess(MakeClient(schema, filter, callback, 0)); CheckError( @@ -349,27 +349,12 @@ Y_UNIT_TEST_SUITE(TestFormatHandler) { const ui64 firstOffset = 42; const TSchemaColumn commonColumn = {"com_col", "[DataType; String]"}; - CheckSuccess(MakeClient( - {commonColumn, {"col_first", "[OptionalType; [DataType; Uint8]]"}}, - "WHERE TRUE", - [&](TQueue<std::pair<TRope, TVector<ui64>>>&& data) {}, - 0 - )); + CheckSuccess(MakeClient({commonColumn, {"col_first", "[OptionalType; [DataType; Uint8]]"}}, "WHERE TRUE", EmptyCheck(), 0)); CheckSuccess(MakeClient( {commonColumn, {"col_second", "[DataType; String]"}}, "WHERE col_second = \"str_second\"", - [&](TQueue<std::pair<TRope, TVector<ui64>>>&& data) { - UNIT_ASSERT_VALUES_EQUAL(data.size(), 1); - - auto [messages, offsets] = data.front(); - UNIT_ASSERT_VALUES_EQUAL(offsets.size(), 1); - UNIT_ASSERT_VALUES_EQUAL(offsets.front(), firstOffset); - - CheckMessageBatch(messages, TBatch().AddRow( - TRow().AddString("event1").AddString("str_second") - )); - } + OneRowCheck(firstOffset, TRow().AddString("event1").AddString("str_second")) )); CheckClientError( @@ -379,6 +364,26 @@ Y_UNIT_TEST_SUITE(TestFormatHandler) { TStringBuilder() << "Failed to parse json string at offset " << firstOffset << ", got parsing error for column 'col_first' with type [OptionalType; [DataType; Uint8]]" ); } + + Y_UNIT_TEST_F(ClientErrorWithEmptyFilter, TFormatHadlerFixture) { + const ui64 firstOffset = 42; + const TSchemaColumn commonColumn = {"com_col", "[DataType; String]"}; + + CheckSuccess(MakeClient({commonColumn, {"col_first", "[DataType; String]"}}, "", EmptyCheck(), 0)); + + CheckSuccess(MakeClient( + {commonColumn, {"col_second", "[DataType; String]"}}, + "WHERE col_second = \"str_second\"", + OneRowCheck(firstOffset, TRow().AddString("event1").AddString("str_second")) + )); + + CheckClientError( + {GetMessage(firstOffset, R"({"com_col": "event1", "col_second": "str_second"})")}, + ClientIds[0], + EStatusId::PRECONDITION_FAILED, + TStringBuilder() << "Failed to parse json messages, found 1 missing values from offset " << firstOffset << " in non optional column 'col_first' with type [DataType; String]" + ); + } } } // namespace NFq::NRowDispatcher::NTests diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/ut/topic_filter_ut.cpp b/ydb/core/fq/libs/row_dispatcher/format_handler/ut/topic_filter_ut.cpp index 7f0297219f..469e4f87c5 100644 --- a/ydb/core/fq/libs/row_dispatcher/format_handler/ut/topic_filter_ut.cpp +++ b/ydb/core/fq/libs/row_dispatcher/format_handler/ut/topic_filter_ut.cpp @@ -64,6 +64,13 @@ public: } } + void OnFilteredBatch(ui64 firstRow, ui64 lastRow) override { + UNIT_ASSERT_C(Started, "Unexpected data for not started filter"); + for (ui64 rowId = firstRow; rowId <= lastRow; ++rowId) { + Callback(rowId); + } + } + void OnFilteredData(ui64 rowId) override { UNIT_ASSERT_C(Started, "Unexpected data for not started filter"); Callback(rowId); diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/ut/topic_parser_ut.cpp b/ydb/core/fq/libs/row_dispatcher/format_handler/ut/topic_parser_ut.cpp index 0f2d59d810..0895466e72 100644 --- a/ydb/core/fq/libs/row_dispatcher/format_handler/ut/topic_parser_ut.cpp +++ b/ydb/core/fq/libs/row_dispatcher/format_handler/ut/topic_parser_ut.cpp @@ -414,9 +414,9 @@ Y_UNIT_TEST_SUITE(TestJsonParser) { Y_UNIT_TEST_F(JsonStructureValidation, TJsonParserFixture) { CheckSuccess(MakeParser({{"a1", "[OptionalType; [DataType; String]]"}})); CheckColumnError(R"({"a1": Yelse})", 0, EStatusId::BAD_REQUEST, TStringBuilder() << "Failed to parse json string at offset " << FIRST_OFFSET << ", got parsing error for column 'a1' with type [OptionalType; [DataType; String]] subissue: { <main>: Error: Failed to determine json value type, current token: 'Yelse', error: TAPE_ERROR: The JSON document has an improper structure: missing or superfluous commas, braces, missing keys, etc. }"); - CheckBatchError(R"({"a1": "st""r"})", EStatusId::BAD_REQUEST, TStringBuilder() << "Failed to parse json message for offset " << FIRST_OFFSET + 1 << ", json item was corrupted: TAPE_ERROR: The JSON document has an improper structure: missing or superfluous commas, braces, missing keys, etc."); - CheckBatchError(R"({"a1": "x"} {"a1": "y"})", EStatusId::INTERNAL_ERROR, TStringBuilder() << "Failed to parse json messages, expected 1 json rows from offset " << FIRST_OFFSET + 2 << " but got 2"); - CheckBatchError(R"({)", EStatusId::INTERNAL_ERROR, TStringBuilder() << "Failed to parse json messages, expected 1 json rows from offset " << FIRST_OFFSET + 3 << " but got 0"); + CheckBatchError(R"({"a1": "st""r"})", EStatusId::BAD_REQUEST, TStringBuilder() << "Failed to parse json message for offset " << FIRST_OFFSET + 1 << ", json item was corrupted: TAPE_ERROR: The JSON document has an improper structure: missing or superfluous commas, braces, missing keys, etc. Current data batch: {\"a1\": \"st\"\"r\"}"); + CheckBatchError(R"({"a1": "x"} {"a1": "y"})", EStatusId::INTERNAL_ERROR, TStringBuilder() << "Failed to parse json messages, expected 1 json rows from offset " << FIRST_OFFSET + 2 << " but got 2 (expected one json row for each offset from topic API in json each row format, maybe initial data was corrupted or messages is not in json format), current data batch: {\"a1\": \"x\"} {\"a1\": \"y\"}"); + CheckBatchError(R"({)", EStatusId::INTERNAL_ERROR, TStringBuilder() << "Failed to parse json messages, expected 1 json rows from offset " << FIRST_OFFSET + 3 << " but got 0 (expected one json row for each offset from topic API in json each row format, maybe initial data was corrupted or messages is not in json format), current data batch: {"); } } diff --git a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp index c00e8d64fe..c79ee722f3 100644 --- a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp +++ b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp @@ -709,7 +709,7 @@ void TTopicSession::StartClientSession(TClientsInfo& info) { void TTopicSession::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) { const auto& source = ev->Get()->Record.GetSource(); - LOG_ROW_DISPATCHER_INFO("New client: read actor id " << ev->Sender.ToString() << ", predicate: " << source.GetPredicate() << ", offset: " << ev->Get()->Record.GetOffset()); + LOG_ROW_DISPATCHER_INFO("New client: read actor id " << ev->Sender.ToString() << ", predicate: '" << source.GetPredicate() << "', offset: " << ev->Get()->Record.GetOffset()); if (!CheckNewClient(ev)) { return; diff --git a/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp b/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp index 82eb073241..3ca0528412 100644 --- a/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp +++ b/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp @@ -23,8 +23,9 @@ namespace { using namespace NKikimr; using namespace NYql::NDq; -const ui64 TimeoutBeforeStartSessionSec = 3; -const ui64 GrabTimeoutSec = 4 * TimeoutBeforeStartSessionSec; +constexpr ui64 TimeoutBeforeStartSessionSec = 3; +constexpr ui64 GrabTimeoutSec = 4 * TimeoutBeforeStartSessionSec; +static_assert(GrabTimeoutSec <= WAIT_TIMEOUT.Seconds()); class TFixture : public NTests::TBaseFixture { public: |