aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVasily Gerasimov <UgnineSirdis@gmail.com>2022-06-03 16:02:25 +0300
committerVasily Gerasimov <UgnineSirdis@gmail.com>2022-06-03 16:02:25 +0300
commiteb3f94093eb0127dd2068709408ef601e48d921e (patch)
tree6141cd7801b6b8de33bec9714f31e922d00e24d8
parent5b41e4a336cf92a15886794a89cf2567a9efd93a (diff)
downloadydb-eb3f94093eb0127dd2068709408ef601e48d921e.tar.gz
YQ-947 PQ source. Skip messages with write time less than StartingMessageTimestamp
Test Skip messages with write time less than StartingMessageTimestamp ref:ce511291740f35001f92be131a23d175813459f7
-rw-r--r--ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp7
-rw-r--r--ydb/library/yql/providers/pq/async_io/ut/dq_pq_read_actor_ut.cpp15
2 files changed, 21 insertions, 1 deletions
diff --git a/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp b/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp
index e7627eff093..7d7f29f750b 100644
--- a/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp
+++ b/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp
@@ -111,7 +111,7 @@ public:
, CredentialsProviderFactory(std::move(credentialsProviderFactory))
, SourceParams(std::move(sourceParams))
, ReadParams(std::move(readParams))
- , StartingMessageTimestamp(TInstant::Now())
+ , StartingMessageTimestamp(TInstant::MilliSeconds(TInstant::Now().MilliSeconds())) // this field is serialized as milliseconds, so drop microseconds part to be consistent with storage
, ComputeActorId(computeActorId)
{
Y_UNUSED(HolderFactory);
@@ -311,6 +311,11 @@ private:
LWPROBE(PqReadDataReceived, TString(TStringBuilder() << Self.TxId), Self.SourceParams.GetTopicPath(), data);
SRC_LOG_T("Data received: " << message.DebugString(true));
+ if (message.GetWriteTime() < Self.StartingMessageTimestamp) {
+ SRC_LOG_D("Skip data. StartingMessageTimestamp: " << Self.StartingMessageTimestamp << ". Write time: " << message.GetWriteTime());
+ continue;
+ }
+
Batch.emplace_back(NKikimr::NMiniKQL::MakeString(NUdf::TStringRef(data.Data(), data.Size())));
UsedSpace += data.Size();
}
diff --git a/ydb/library/yql/providers/pq/async_io/ut/dq_pq_read_actor_ut.cpp b/ydb/library/yql/providers/pq/async_io/ut/dq_pq_read_actor_ut.cpp
index ed6579022cf..5b655896b82 100644
--- a/ydb/library/yql/providers/pq/async_io/ut/dq_pq_read_actor_ut.cpp
+++ b/ydb/library/yql/providers/pq/async_io/ut/dq_pq_read_actor_ut.cpp
@@ -20,6 +20,21 @@ Y_UNIT_TEST_SUITE(TDqPqReadActorTest) {
UNIT_ASSERT_EQUAL(result, data);
}
+ Y_UNIT_TEST_F(TestReadFromTopicFromNow, TPqIoTestFixture) {
+ const TString topicName = "ReadFromTopicFromNow";
+
+ const std::vector<TString> oldData = { "-4", "-3", "-2", "-1", "0" };
+ PQWrite(oldData, topicName);
+
+ InitSource(topicName);
+
+ const std::vector<TString> data = { "1", "2", "3", "4" };
+ PQWrite(data, topicName);
+
+ auto result = SourceReadUntil<TString>(UVParser, 4);
+ UNIT_ASSERT_EQUAL(result, data);
+ }
+
Y_UNIT_TEST_F(ReadWithFreeSpace, TPqIoTestFixture) {
const TString topicName = "ReadWithFreeSpace";
InitSource(topicName);