diff options
author | Vasily Gerasimov <UgnineSirdis@gmail.com> | 2022-06-03 16:02:25 +0300 |
---|---|---|
committer | Vasily Gerasimov <UgnineSirdis@gmail.com> | 2022-06-03 16:02:25 +0300 |
commit | eb3f94093eb0127dd2068709408ef601e48d921e (patch) | |
tree | 6141cd7801b6b8de33bec9714f31e922d00e24d8 | |
parent | 5b41e4a336cf92a15886794a89cf2567a9efd93a (diff) | |
download | ydb-eb3f94093eb0127dd2068709408ef601e48d921e.tar.gz |
YQ-947 PQ source. Skip messages with write time less than StartingMessageTimestamp
Test
Skip messages with write time less than StartingMessageTimestamp
ref:ce511291740f35001f92be131a23d175813459f7
-rw-r--r-- | ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp | 7 | ||||
-rw-r--r-- | ydb/library/yql/providers/pq/async_io/ut/dq_pq_read_actor_ut.cpp | 15 |
2 files changed, 21 insertions, 1 deletions
diff --git a/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp b/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp index e7627eff093..7d7f29f750b 100644 --- a/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp +++ b/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp @@ -111,7 +111,7 @@ public: , CredentialsProviderFactory(std::move(credentialsProviderFactory)) , SourceParams(std::move(sourceParams)) , ReadParams(std::move(readParams)) - , StartingMessageTimestamp(TInstant::Now()) + , StartingMessageTimestamp(TInstant::MilliSeconds(TInstant::Now().MilliSeconds())) // this field is serialized as milliseconds, so drop microseconds part to be consistent with storage , ComputeActorId(computeActorId) { Y_UNUSED(HolderFactory); @@ -311,6 +311,11 @@ private: LWPROBE(PqReadDataReceived, TString(TStringBuilder() << Self.TxId), Self.SourceParams.GetTopicPath(), data); SRC_LOG_T("Data received: " << message.DebugString(true)); + if (message.GetWriteTime() < Self.StartingMessageTimestamp) { + SRC_LOG_D("Skip data. StartingMessageTimestamp: " << Self.StartingMessageTimestamp << ". Write time: " << message.GetWriteTime()); + continue; + } + Batch.emplace_back(NKikimr::NMiniKQL::MakeString(NUdf::TStringRef(data.Data(), data.Size()))); UsedSpace += data.Size(); } diff --git a/ydb/library/yql/providers/pq/async_io/ut/dq_pq_read_actor_ut.cpp b/ydb/library/yql/providers/pq/async_io/ut/dq_pq_read_actor_ut.cpp index ed6579022cf..5b655896b82 100644 --- a/ydb/library/yql/providers/pq/async_io/ut/dq_pq_read_actor_ut.cpp +++ b/ydb/library/yql/providers/pq/async_io/ut/dq_pq_read_actor_ut.cpp @@ -20,6 +20,21 @@ Y_UNIT_TEST_SUITE(TDqPqReadActorTest) { UNIT_ASSERT_EQUAL(result, data); } + Y_UNIT_TEST_F(TestReadFromTopicFromNow, TPqIoTestFixture) { + const TString topicName = "ReadFromTopicFromNow"; + + const std::vector<TString> oldData = { "-4", "-3", "-2", "-1", "0" }; + PQWrite(oldData, topicName); + + InitSource(topicName); + + const std::vector<TString> data = { "1", "2", "3", "4" }; + PQWrite(data, topicName); + + auto result = SourceReadUntil<TString>(UVParser, 4); + UNIT_ASSERT_EQUAL(result, data); + } + Y_UNIT_TEST_F(ReadWithFreeSpace, TPqIoTestFixture) { const TString topicName = "ReadWithFreeSpace"; InitSource(topicName); |