aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorPisarenko Grigoriy <grigoriypisar@ydb.tech>2024-12-19 10:02:01 +0300
committerGitHub <noreply@github.com>2024-12-19 10:02:01 +0300
commit01d95e5e6e0300d9461af676a5cb3bcfafd7fcbd (patch)
treef31f78fab5abf98599708cc0a5aee3d1cb76c99a
parent3317b753e5adf41ed34844c61c431a9f74e1c867 (diff)
downloadydb-01d95e5e6e0300d9461af676a5cb3bcfafd7fcbd.tar.gz
YQ-3975 RD fixed fault for parsing errors without filter (#12707)
-rw-r--r--ydb/core/fq/libs/row_dispatcher/format_handler/filters/filters_set.cpp27
-rw-r--r--ydb/core/fq/libs/row_dispatcher/format_handler/filters/filters_set.h2
-rw-r--r--ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.cpp7
-rw-r--r--ydb/core/fq/libs/row_dispatcher/format_handler/parsers/json_parser.cpp10
-rw-r--r--ydb/core/fq/libs/row_dispatcher/format_handler/ut/common/ut_common.cpp3
-rw-r--r--ydb/core/fq/libs/row_dispatcher/format_handler/ut/common/ut_common.h2
-rw-r--r--ydb/core/fq/libs/row_dispatcher/format_handler/ut/format_handler_ut.cpp101
-rw-r--r--ydb/core/fq/libs/row_dispatcher/format_handler/ut/topic_filter_ut.cpp7
-rw-r--r--ydb/core/fq/libs/row_dispatcher/format_handler/ut/topic_parser_ut.cpp6
-rw-r--r--ydb/core/fq/libs/row_dispatcher/topic_session.cpp2
-rw-r--r--ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp5
11 files changed, 96 insertions, 76 deletions
diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/filters/filters_set.cpp b/ydb/core/fq/libs/row_dispatcher/format_handler/filters/filters_set.cpp
index 6065579185..ae88416712 100644
--- a/ydb/core/fq/libs/row_dispatcher/format_handler/filters/filters_set.cpp
+++ b/ydb/core/fq/libs/row_dispatcher/format_handler/filters/filters_set.cpp
@@ -154,16 +154,7 @@ public:
continue;
}
- if (filterHandler.GetPurecalcFilter()) {
- PushToFilter(filterHandler, offsets, columnIndex, values, numberRows);
- continue;
- }
-
- // Clients without filters
- LOG_ROW_DISPATCHER_TRACE("Add " << numberRows << " rows to client " << consumer->GetFilterId() << " without filtering");
- for (ui64 rowId = 0; rowId < numberRows; ++rowId) {
- consumer->OnFilteredData(rowId);
- }
+ PushToFilter(filterHandler, offsets, columnIndex, values, numberRows);
}
Stats.AddFilterLatency(TInstant::Now() - startFilter);
}
@@ -193,7 +184,9 @@ public:
LOG_ROW_DISPATCHER_TRACE("Create filter with id " << filter->GetFilterId());
IPurecalcFilter::TPtr purecalcFilter;
- if (filter->GetWhereFilter()) {
+ if (const auto& predicate = filter->GetWhereFilter()) {
+ LOG_ROW_DISPATCHER_TRACE("Create purecalc filter for predicate '" << predicate << "' (filter id: " << filter->GetFilterId() << ")");
+
auto filterStatus = CreatePurecalcFilter(filter);
if (filterStatus.IsFail()) {
return filterStatus;
@@ -225,9 +218,6 @@ public:
private:
void PushToFilter(const TFilterHandler& filterHandler, const TVector<ui64>& offsets, const TVector<ui64>& columnIndex, const TVector<const TVector<NYql::NUdf::TUnboxedValue>*>& values, ui64 numberRows) {
- const auto filter = filterHandler.GetPurecalcFilter();
- Y_ENSURE(filter, "Expected initialized filter");
-
const auto consumer = filterHandler.GetConsumer();
const auto& columnIds = consumer->GetColumnIds();
@@ -246,8 +236,13 @@ private:
}
}
- LOG_ROW_DISPATCHER_TRACE("Pass " << numberRows << " rows to purecalc filter (client id: " << consumer->GetFilterId() << ")");
- filter->FilterData(result, numberRows);
+ if (const auto filter = filterHandler.GetPurecalcFilter()) {
+ LOG_ROW_DISPATCHER_TRACE("Pass " << numberRows << " rows to purecalc filter (filter id: " << consumer->GetFilterId() << ")");
+ filter->FilterData(result, numberRows);
+ } else if (numberRows) {
+ LOG_ROW_DISPATCHER_TRACE("Add " << numberRows << " rows to client " << consumer->GetFilterId() << " without filtering");
+ consumer->OnFilteredBatch(0, numberRows - 1);
+ }
}
private:
diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/filters/filters_set.h b/ydb/core/fq/libs/row_dispatcher/format_handler/filters/filters_set.h
index 427e73fa53..57da103b1a 100644
--- a/ydb/core/fq/libs/row_dispatcher/format_handler/filters/filters_set.h
+++ b/ydb/core/fq/libs/row_dispatcher/format_handler/filters/filters_set.h
@@ -17,6 +17,8 @@ public:
virtual const TVector<ui64>& GetColumnIds() const = 0;
virtual TMaybe<ui64> GetNextMessageOffset() const = 0;
+ virtual void OnFilteredBatch(ui64 firstRow, ui64 lastRow) = 0; // inclusive interval [firstRow, lastRow]
+
virtual void OnFilterStarted() = 0;
virtual void OnFilteringError(TStatus status) = 0;
};
diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.cpp b/ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.cpp
index df46b12bef..f492e864b3 100644
--- a/ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.cpp
+++ b/ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.cpp
@@ -207,6 +207,13 @@ class TTopicFormatHandler : public NActors::TActor<TTopicFormatHandler>, public
Client->StartClientSession();
}
+ void OnFilteredBatch(ui64 firstRow, ui64 lastRow) override {
+ LOG_ROW_DISPATCHER_TRACE("OnFilteredBatch, rows [" << firstRow << ", " << lastRow << "]");
+ for (ui64 rowId = firstRow; rowId <= lastRow; ++rowId) {
+ OnFilteredData(rowId);
+ }
+ }
+
void OnFilteredData(ui64 rowId) override {
const ui64 offset = Self.Offsets->at(rowId);
if (const auto nextOffset = Client->GetNextMessageOffset(); nextOffset && offset < *nextOffset) {
diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/parsers/json_parser.cpp b/ydb/core/fq/libs/row_dispatcher/format_handler/parsers/json_parser.cpp
index 7c4fb95102..46c0196a66 100644
--- a/ydb/core/fq/libs/row_dispatcher/format_handler/parsers/json_parser.cpp
+++ b/ydb/core/fq/libs/row_dispatcher/format_handler/parsers/json_parser.cpp
@@ -414,23 +414,23 @@ protected:
simdjson::ondemand::document_stream documents;
CHECK_JSON_ERROR(Parser.iterate_many(values, size, simdjson::ondemand::DEFAULT_BATCH_SIZE).get(documents)) {
- return TStatus::Fail(EStatusId::BAD_REQUEST, TStringBuilder() << "Failed to parse message batch from offset " << Buffer.Offsets.front() << ", json documents was corrupted: " << simdjson::error_message(error));
+ return TStatus::Fail(EStatusId::BAD_REQUEST, TStringBuilder() << "Failed to parse message batch from offset " << Buffer.Offsets.front() << ", json documents was corrupted: " << simdjson::error_message(error) << " Current data batch: " << TruncateString(std::string_view(values, size)));
}
size_t rowId = 0;
for (auto document : documents) {
if (Y_UNLIKELY(rowId >= Buffer.NumberValues)) {
- return TStatus::Fail(EStatusId::INTERNAL_ERROR, TStringBuilder() << "Failed to parse json messages, expected " << Buffer.NumberValues << " json rows from offset " << Buffer.Offsets.front() << " but got " << rowId + 1);
+ return TStatus::Fail(EStatusId::INTERNAL_ERROR, TStringBuilder() << "Failed to parse json messages, expected " << Buffer.NumberValues << " json rows from offset " << Buffer.Offsets.front() << " but got " << rowId + 1 << " (expected one json row for each offset from topic API in json each row format, maybe initial data was corrupted or messages is not in json format), current data batch: " << TruncateString(std::string_view(values, size)));
}
const ui64 offset = Buffer.Offsets[rowId];
CHECK_JSON_ERROR(document.error()) {
- return TStatus::Fail(EStatusId::BAD_REQUEST, TStringBuilder() << "Failed to parse json message for offset " << offset << ", json document was corrupted: " << simdjson::error_message(error));
+ return TStatus::Fail(EStatusId::BAD_REQUEST, TStringBuilder() << "Failed to parse json message for offset " << offset << ", json document was corrupted: " << simdjson::error_message(error) << " Current data batch: " << TruncateString(std::string_view(values, size)));
}
for (auto item : document.get_object()) {
CHECK_JSON_ERROR(item.error()) {
- return TStatus::Fail(EStatusId::BAD_REQUEST, TStringBuilder() << "Failed to parse json message for offset " << offset << ", json item was corrupted: " << simdjson::error_message(error));
+ return TStatus::Fail(EStatusId::BAD_REQUEST, TStringBuilder() << "Failed to parse json message for offset " << offset << ", json item was corrupted: " << simdjson::error_message(error) << " Current data batch: " << TruncateString(std::string_view(values, size)));
}
const auto it = ColumnsIndex.find(item.escaped_key().value());
@@ -445,7 +445,7 @@ protected:
}
if (Y_UNLIKELY(rowId != Buffer.NumberValues)) {
- return TStatus::Fail(EStatusId::INTERNAL_ERROR, TStringBuilder() << "Failed to parse json messages, expected " << Buffer.NumberValues << " json rows from offset " << Buffer.Offsets.front() << " but got " << rowId);
+ return TStatus::Fail(EStatusId::INTERNAL_ERROR, TStringBuilder() << "Failed to parse json messages, expected " << Buffer.NumberValues << " json rows from offset " << Buffer.Offsets.front() << " but got " << rowId << " (expected one json row for each offset from topic API in json each row format, maybe initial data was corrupted or messages is not in json format), current data batch: " << TruncateString(std::string_view(values, size)));
}
const ui64 firstOffset = Buffer.Offsets.front();
diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/ut/common/ut_common.cpp b/ydb/core/fq/libs/row_dispatcher/format_handler/ut/common/ut_common.cpp
index b65911e793..f7f3489dd5 100644
--- a/ydb/core/fq/libs/row_dispatcher/format_handler/ut/common/ut_common.cpp
+++ b/ydb/core/fq/libs/row_dispatcher/format_handler/ut/common/ut_common.cpp
@@ -68,6 +68,7 @@ public:
void Validate(const NYql::NUdf::TUnboxedValue& parsedValue) const override {
if (!parsedValue) {
+ UNIT_FAIL("Unexpected NULL value for optional cell");
return;
}
Value->Validate(parsedValue.GetOptionalValue());
@@ -166,7 +167,7 @@ void TBaseFixture::SetUp(NUnitTest::TTestContext&) {
TAutoPtr<NKikimr::TAppPrepare> app = new NKikimr::TAppPrepare();
Runtime.SetLogBackend(NActors::CreateStderrBackend());
Runtime.SetLogPriority(NKikimrServices::FQ_ROW_DISPATCHER, NActors::NLog::PRI_TRACE);
- Runtime.SetDispatchTimeout(TDuration::Seconds(5));
+ Runtime.SetDispatchTimeout(WAIT_TIMEOUT);
Runtime.Initialize(app->Unwrap());
// Init tls context
diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/ut/common/ut_common.h b/ydb/core/fq/libs/row_dispatcher/format_handler/ut/common/ut_common.h
index dd1c3d1c9f..ce54150b46 100644
--- a/ydb/core/fq/libs/row_dispatcher/format_handler/ut/common/ut_common.h
+++ b/ydb/core/fq/libs/row_dispatcher/format_handler/ut/common/ut_common.h
@@ -10,6 +10,8 @@
namespace NFq::NRowDispatcher::NTests {
+static constexpr TDuration WAIT_TIMEOUT = TDuration::Seconds(20);
+
class TBaseFixture : public NUnitTest::TBaseFixture, public TTypeParser {
public:
// Helper classes for checking serialized rows in multi type format
diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/ut/format_handler_ut.cpp b/ydb/core/fq/libs/row_dispatcher/format_handler/ut/format_handler_ut.cpp
index c643dba270..75daa83bc5 100644
--- a/ydb/core/fq/libs/row_dispatcher/format_handler/ut/format_handler_ut.cpp
+++ b/ydb/core/fq/libs/row_dispatcher/format_handler/ut/format_handler_ut.cpp
@@ -155,7 +155,7 @@ public:
FormatHandler = CreateTestFormatHandler(config, settings);
}
- TStatus MakeClient(const TVector<TSchemaColumn>& columns, const TString& whereFilter, TCallback callback, ui64 expectedFilteredRows = 1) {
+ [[nodiscard]] TStatus MakeClient(const TVector<TSchemaColumn>& columns, const TString& whereFilter, TCallback callback, ui64 expectedFilteredRows = 1) {
ClientIds.emplace_back(ClientIds.size(), 0, 0, 0);
auto client = MakeIntrusive<TClientDataConsumer>(ClientIds.back(), columns, whereFilter, callback, expectedFilteredRows);
@@ -202,6 +202,30 @@ public:
FormatHandler->RemoveClient(clientId);
}
+public:
+ static TCallback EmptyCheck() {
+ return [&](TQueue<std::pair<TRope, TVector<ui64>>>&& data) {};
+ }
+
+ static TCallback OneBatchCheck(std::function<void(TRope&& messages, TVector<ui64>&& offsets)> callback) {
+ return [callback](TQueue<std::pair<TRope, TVector<ui64>>>&& data) {
+ UNIT_ASSERT_VALUES_EQUAL(data.size(), 1);
+ auto [messages, offsets] = data.front();
+
+ UNIT_ASSERT(!offsets.empty());
+ callback(std::move(messages), std::move(offsets));
+ };
+ }
+
+ TCallback OneRowCheck(ui64 offset, const TRow& row) const {
+ return OneBatchCheck([this, offset, row](TRope&& messages, TVector<ui64>&& offsets) {
+ UNIT_ASSERT_VALUES_EQUAL(offsets.size(), 1);
+ UNIT_ASSERT_VALUES_EQUAL(offsets.front(), offset);
+
+ CheckMessageBatch(messages, TBatch().AddRow(row));
+ });
+ }
+
private:
void ExtractClientsData() {
for (auto& client : Clients) {
@@ -233,33 +257,13 @@ Y_UNIT_TEST_SUITE(TestFormatHandler) {
CheckSuccess(MakeClient(
{commonColumn, {"col_first", "[DataType; String]"}},
"WHERE col_first = \"str_first__large__\"",
- [&](TQueue<std::pair<TRope, TVector<ui64>>>&& data) {
- UNIT_ASSERT_VALUES_EQUAL(data.size(), 1);
-
- auto [messages, offsets] = data.front();
- UNIT_ASSERT_VALUES_EQUAL(offsets.size(), 1);
- UNIT_ASSERT_VALUES_EQUAL(offsets.front(), firstOffset + 1);
-
- CheckMessageBatch(messages, TBatch().AddRow(
- TRow().AddString("event2").AddString("str_first__large__")
- ));
- }
+ OneRowCheck(firstOffset + 1, TRow().AddString("event2").AddString("str_first__large__"))
));
CheckSuccess(MakeClient(
{commonColumn, {"col_second", "[DataType; String]"}},
"WHERE col_second = \"str_second\"",
- [&](TQueue<std::pair<TRope, TVector<ui64>>>&& data) {
- UNIT_ASSERT_VALUES_EQUAL(data.size(), 1);
-
- auto [messages, offsets] = data.front();
- UNIT_ASSERT_VALUES_EQUAL(offsets.size(), 1);
- UNIT_ASSERT_VALUES_EQUAL(offsets.front(), firstOffset);
-
- CheckMessageBatch(messages, TBatch().AddRow(
- TRow().AddString("event1").AddString("str_second")
- ));
- }
+ OneRowCheck(firstOffset, TRow().AddString("event1").AddString("str_second"))
));
ParseMessages({
@@ -288,14 +292,10 @@ Y_UNIT_TEST_SUITE(TestFormatHandler) {
R"({"col_a": false, "col_b": {"X": "Y"}})"
};
- CheckSuccess(MakeClient(schema, "WHERE FALSE", [&](TQueue<std::pair<TRope, TVector<ui64>>>&&) {}, 0));
-
- auto trueChacker = [&](TQueue<std::pair<TRope, TVector<ui64>>>&& data) {
- UNIT_ASSERT_VALUES_EQUAL(data.size(), 1);
- auto [messages, offsets] = data.front();
+ CheckSuccess(MakeClient(schema, "WHERE FALSE", EmptyCheck(), 0));
+ const auto trueChacker = OneBatchCheck([&](TRope&& messages, TVector<ui64>&& offsets) {
TBatch expectedBatch;
- UNIT_ASSERT(!offsets.empty());
for (ui64 offset : offsets) {
UNIT_ASSERT(offset - firstOffset < testData.size());
expectedBatch.AddRow(
@@ -304,7 +304,7 @@ Y_UNIT_TEST_SUITE(TestFormatHandler) {
}
CheckMessageBatch(messages, expectedBatch);
- };
+ });
CheckSuccess(MakeClient(schema, "WHERE TRUE", trueChacker, 3));
CheckSuccess(MakeClient(schema, "", trueChacker, 2));
@@ -323,7 +323,7 @@ Y_UNIT_TEST_SUITE(TestFormatHandler) {
Y_UNIT_TEST_F(ClientValidation, TFormatHadlerFixture) {
const TVector<TSchemaColumn> schema = {{"data", "[DataType; String]"}};
const TString filter = "WHERE FALSE";
- const auto callback = [&](TQueue<std::pair<TRope, TVector<ui64>>>&&) {};
+ const auto callback = EmptyCheck();
CheckSuccess(MakeClient(schema, filter, callback, 0));
CheckError(
@@ -349,27 +349,12 @@ Y_UNIT_TEST_SUITE(TestFormatHandler) {
const ui64 firstOffset = 42;
const TSchemaColumn commonColumn = {"com_col", "[DataType; String]"};
- CheckSuccess(MakeClient(
- {commonColumn, {"col_first", "[OptionalType; [DataType; Uint8]]"}},
- "WHERE TRUE",
- [&](TQueue<std::pair<TRope, TVector<ui64>>>&& data) {},
- 0
- ));
+ CheckSuccess(MakeClient({commonColumn, {"col_first", "[OptionalType; [DataType; Uint8]]"}}, "WHERE TRUE", EmptyCheck(), 0));
CheckSuccess(MakeClient(
{commonColumn, {"col_second", "[DataType; String]"}},
"WHERE col_second = \"str_second\"",
- [&](TQueue<std::pair<TRope, TVector<ui64>>>&& data) {
- UNIT_ASSERT_VALUES_EQUAL(data.size(), 1);
-
- auto [messages, offsets] = data.front();
- UNIT_ASSERT_VALUES_EQUAL(offsets.size(), 1);
- UNIT_ASSERT_VALUES_EQUAL(offsets.front(), firstOffset);
-
- CheckMessageBatch(messages, TBatch().AddRow(
- TRow().AddString("event1").AddString("str_second")
- ));
- }
+ OneRowCheck(firstOffset, TRow().AddString("event1").AddString("str_second"))
));
CheckClientError(
@@ -379,6 +364,26 @@ Y_UNIT_TEST_SUITE(TestFormatHandler) {
TStringBuilder() << "Failed to parse json string at offset " << firstOffset << ", got parsing error for column 'col_first' with type [OptionalType; [DataType; Uint8]]"
);
}
+
+ Y_UNIT_TEST_F(ClientErrorWithEmptyFilter, TFormatHadlerFixture) {
+ const ui64 firstOffset = 42;
+ const TSchemaColumn commonColumn = {"com_col", "[DataType; String]"};
+
+ CheckSuccess(MakeClient({commonColumn, {"col_first", "[DataType; String]"}}, "", EmptyCheck(), 0));
+
+ CheckSuccess(MakeClient(
+ {commonColumn, {"col_second", "[DataType; String]"}},
+ "WHERE col_second = \"str_second\"",
+ OneRowCheck(firstOffset, TRow().AddString("event1").AddString("str_second"))
+ ));
+
+ CheckClientError(
+ {GetMessage(firstOffset, R"({"com_col": "event1", "col_second": "str_second"})")},
+ ClientIds[0],
+ EStatusId::PRECONDITION_FAILED,
+ TStringBuilder() << "Failed to parse json messages, found 1 missing values from offset " << firstOffset << " in non optional column 'col_first' with type [DataType; String]"
+ );
+ }
}
} // namespace NFq::NRowDispatcher::NTests
diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/ut/topic_filter_ut.cpp b/ydb/core/fq/libs/row_dispatcher/format_handler/ut/topic_filter_ut.cpp
index 7f0297219f..469e4f87c5 100644
--- a/ydb/core/fq/libs/row_dispatcher/format_handler/ut/topic_filter_ut.cpp
+++ b/ydb/core/fq/libs/row_dispatcher/format_handler/ut/topic_filter_ut.cpp
@@ -64,6 +64,13 @@ public:
}
}
+ void OnFilteredBatch(ui64 firstRow, ui64 lastRow) override {
+ UNIT_ASSERT_C(Started, "Unexpected data for not started filter");
+ for (ui64 rowId = firstRow; rowId <= lastRow; ++rowId) {
+ Callback(rowId);
+ }
+ }
+
void OnFilteredData(ui64 rowId) override {
UNIT_ASSERT_C(Started, "Unexpected data for not started filter");
Callback(rowId);
diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/ut/topic_parser_ut.cpp b/ydb/core/fq/libs/row_dispatcher/format_handler/ut/topic_parser_ut.cpp
index 0f2d59d810..0895466e72 100644
--- a/ydb/core/fq/libs/row_dispatcher/format_handler/ut/topic_parser_ut.cpp
+++ b/ydb/core/fq/libs/row_dispatcher/format_handler/ut/topic_parser_ut.cpp
@@ -414,9 +414,9 @@ Y_UNIT_TEST_SUITE(TestJsonParser) {
Y_UNIT_TEST_F(JsonStructureValidation, TJsonParserFixture) {
CheckSuccess(MakeParser({{"a1", "[OptionalType; [DataType; String]]"}}));
CheckColumnError(R"({"a1": Yelse})", 0, EStatusId::BAD_REQUEST, TStringBuilder() << "Failed to parse json string at offset " << FIRST_OFFSET << ", got parsing error for column 'a1' with type [OptionalType; [DataType; String]] subissue: { <main>: Error: Failed to determine json value type, current token: 'Yelse', error: TAPE_ERROR: The JSON document has an improper structure: missing or superfluous commas, braces, missing keys, etc. }");
- CheckBatchError(R"({"a1": "st""r"})", EStatusId::BAD_REQUEST, TStringBuilder() << "Failed to parse json message for offset " << FIRST_OFFSET + 1 << ", json item was corrupted: TAPE_ERROR: The JSON document has an improper structure: missing or superfluous commas, braces, missing keys, etc.");
- CheckBatchError(R"({"a1": "x"} {"a1": "y"})", EStatusId::INTERNAL_ERROR, TStringBuilder() << "Failed to parse json messages, expected 1 json rows from offset " << FIRST_OFFSET + 2 << " but got 2");
- CheckBatchError(R"({)", EStatusId::INTERNAL_ERROR, TStringBuilder() << "Failed to parse json messages, expected 1 json rows from offset " << FIRST_OFFSET + 3 << " but got 0");
+ CheckBatchError(R"({"a1": "st""r"})", EStatusId::BAD_REQUEST, TStringBuilder() << "Failed to parse json message for offset " << FIRST_OFFSET + 1 << ", json item was corrupted: TAPE_ERROR: The JSON document has an improper structure: missing or superfluous commas, braces, missing keys, etc. Current data batch: {\"a1\": \"st\"\"r\"}");
+ CheckBatchError(R"({"a1": "x"} {"a1": "y"})", EStatusId::INTERNAL_ERROR, TStringBuilder() << "Failed to parse json messages, expected 1 json rows from offset " << FIRST_OFFSET + 2 << " but got 2 (expected one json row for each offset from topic API in json each row format, maybe initial data was corrupted or messages is not in json format), current data batch: {\"a1\": \"x\"} {\"a1\": \"y\"}");
+ CheckBatchError(R"({)", EStatusId::INTERNAL_ERROR, TStringBuilder() << "Failed to parse json messages, expected 1 json rows from offset " << FIRST_OFFSET + 3 << " but got 0 (expected one json row for each offset from topic API in json each row format, maybe initial data was corrupted or messages is not in json format), current data batch: {");
}
}
diff --git a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp
index c00e8d64fe..c79ee722f3 100644
--- a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp
+++ b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp
@@ -709,7 +709,7 @@ void TTopicSession::StartClientSession(TClientsInfo& info) {
void TTopicSession::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) {
const auto& source = ev->Get()->Record.GetSource();
- LOG_ROW_DISPATCHER_INFO("New client: read actor id " << ev->Sender.ToString() << ", predicate: " << source.GetPredicate() << ", offset: " << ev->Get()->Record.GetOffset());
+ LOG_ROW_DISPATCHER_INFO("New client: read actor id " << ev->Sender.ToString() << ", predicate: '" << source.GetPredicate() << "', offset: " << ev->Get()->Record.GetOffset());
if (!CheckNewClient(ev)) {
return;
diff --git a/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp b/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp
index 82eb073241..3ca0528412 100644
--- a/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp
+++ b/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp
@@ -23,8 +23,9 @@ namespace {
using namespace NKikimr;
using namespace NYql::NDq;
-const ui64 TimeoutBeforeStartSessionSec = 3;
-const ui64 GrabTimeoutSec = 4 * TimeoutBeforeStartSessionSec;
+constexpr ui64 TimeoutBeforeStartSessionSec = 3;
+constexpr ui64 GrabTimeoutSec = 4 * TimeoutBeforeStartSessionSec;
+static_assert(GrabTimeoutSec <= WAIT_TIMEOUT.Seconds());
class TFixture : public NTests::TBaseFixture {
public: