diff options
author | Nikolay Shestakov <tesseract@ydb.tech> | 2025-03-13 20:22:01 +0500 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-03-13 15:22:01 +0000 |
commit | 7f029bce6f19c4351b6b75006236a61005a8097b (patch) | |
tree | 95102d94c9f6a71a3eab7b8cc8047ac948cc9d69 | |
parent | da5c76b964fec016ed66d168d153b83cf4ee3b2a (diff) | |
download | ydb-7f029bce6f19c4351b6b75006236a61005a8097b.tar.gz |
Added a batching for transferring from a topic to a table (#15637)
Co-authored-by: Ilnaz Nizametdinov <i.nizametdinov@gmail.com>
-rw-r--r-- | ydb/core/tx/replication/service/topic_reader.cpp | 5 | ||||
-rw-r--r-- | ydb/core/tx/replication/service/transfer_writer.cpp | 131 | ||||
-rw-r--r-- | ydb/core/tx/replication/service/worker.cpp | 11 | ||||
-rw-r--r-- | ydb/core/tx/replication/service/worker.h | 8 | ||||
-rw-r--r-- | ydb/core/tx/replication/ydb_proxy/ydb_proxy.cpp | 4 | ||||
-rw-r--r-- | ydb/core/tx/replication/ydb_proxy/ydb_proxy.h | 10 | ||||
-rw-r--r-- | ydb/core/tx/replication/ydb_proxy/ydb_proxy_ut.cpp | 8 | ||||
-rw-r--r-- | ydb/tests/functional/transfer/main.cpp | 6 |
8 files changed, 152 insertions, 31 deletions
diff --git a/ydb/core/tx/replication/service/topic_reader.cpp b/ydb/core/tx/replication/service/topic_reader.cpp index ef7f231cf6..e25c3b708f 100644 --- a/ydb/core/tx/replication/service/topic_reader.cpp +++ b/ydb/core/tx/replication/service/topic_reader.cpp @@ -48,7 +48,10 @@ class TRemoteTopicReader: public TActor<TRemoteTopicReader> { LOG_D("Handle " << ev->Get()->ToString()); Y_ABORT_UNLESS(ReadSession); - Send(ReadSession, new TEvYdbProxy::TEvReadTopicRequest()); + auto settings = TEvYdbProxy::TReadTopicSettings() + .SkipCommit(ev->Get()->SkipCommit); + + Send(ReadSession, new TEvYdbProxy::TEvReadTopicRequest(settings)); } void Handle(TEvYdbProxy::TEvReadTopicResponse::TPtr& ev) { diff --git a/ydb/core/tx/replication/service/transfer_writer.cpp b/ydb/core/tx/replication/service/transfer_writer.cpp index cac69ecd69..845730a199 100644 --- a/ydb/core/tx/replication/service/transfer_writer.cpp +++ b/ydb/core/tx/replication/service/transfer_writer.cpp @@ -289,6 +289,10 @@ public: Batcher->AddData(data); } + i64 BatchSize() const { + return Batcher->GetMemory(); + } + virtual NKqp::IDataBatcherPtr CreateDataBatcher() = 0; virtual bool Flush() = 0; @@ -322,24 +326,40 @@ public: } bool Flush() override { + auto doWrite = [&]() { + Issues = std::make_shared<NYql::TIssues>(); + + NTxProxy::DoLongTxWriteSameMailbox(TActivationContext::AsActorContext(), SelfId /* replyTo */, { /* longTxId */ }, { /* dedupId */ }, + NavigateResult->DatabaseName, Path, NavigateResult, Data, Issues, true /* noTxWrite */); + }; + + if (Data) { + doWrite(); + return true; + } + + if (!Batcher || !Batcher->GetMemory()) { + return false; + } + NKqp::IDataBatchPtr batch = Batcher->Build(); auto data = batch->ExtractBatch(); - auto arrowBatch = reinterpret_pointer_cast<arrow::RecordBatch>(data); - Y_VERIFY(arrowBatch); + Data = reinterpret_pointer_cast<arrow::RecordBatch>(data); + Y_VERIFY(Data); - Issues = std::make_shared<NYql::TIssues>(); - - NTxProxy::DoLongTxWriteSameMailbox(TActivationContext::AsActorContext(), SelfId /* replyTo */, { /* longTxId */ }, { /* dedupId */ }, - NavigateResult->DatabaseName, Path, NavigateResult, arrowBatch, Issues, true /* noTxWrite */); - + doWrite(); return true; } TString Handle(TEvents::TEvCompleted::TPtr& ev) override { if (ev->Get()->Status == Ydb::StatusIds::SUCCESS) { + Data.reset(); + Issues.reset(); + return ""; } + return Issues->ToOneLineString(); } @@ -347,6 +367,7 @@ private: std::shared_ptr<const NSchemeCache::TSchemeCacheNavigate> NavigateResult; TString Path; + std::shared_ptr<arrow::RecordBatch> Data; std::shared_ptr<NYql::TIssues> Issues; }; @@ -376,13 +397,22 @@ private: const std::vector<ui32> WriteIndex; }; -} // anonymous namespace +enum class ETag { + FlushTimeout, + RetryFlush +}; +} // anonymous namespace class TTransferWriter : public TActorBootstrapped<TTransferWriter> , private NSchemeCache::TSchemeCacheHelpers { + static constexpr i64 ExpectedBatchSize = 8_MB; + static constexpr TDuration FlushInterval = TDuration::Seconds(5); + static constexpr TDuration MinRetryDelay = TDuration::Seconds(1); + static constexpr TDuration MaxRetryDelay = TDuration::Minutes(10); + public: void Bootstrap() { GetTableScheme(); @@ -503,7 +533,6 @@ private: hFunc(TEvWorker::TEvHandshake, Handle); hFunc(TEvWorker::TEvData, HoldHandle); - //sFunc(TEvents::TEvWakeup, SendS3Request); sFunc(TEvents::TEvPoison, PassAway); } } @@ -545,10 +574,18 @@ private: void StartWork() { Become(&TThis::StateWork); + Attempt = 0; + Delay = MinRetryDelay; + if (PendingRecords) { ProcessData(PendingPartitionId, *PendingRecords); PendingRecords.reset(); } + + if (!WakeupScheduled) { + WakeupScheduled = true; + Schedule(FlushInterval, new TEvents::TEvWakeup(ui32(ETag::FlushTimeout))); + } } STFUNC(StateWork) { @@ -557,6 +594,7 @@ private: hFunc(TEvWorker::TEvData, Handle); sFunc(TEvents::TEvPoison, PassAway); + sFunc(TEvents::TEvWakeup, TryFlush); } } @@ -568,6 +606,7 @@ private: if (ProcessingError) { Leave(ProcessingErrorStatus, *ProcessingError); } else { + PollSent = true; Send(Worker, new TEvWorker::TEvHandshake()); } } @@ -579,7 +618,7 @@ private: } void Handle(TEvWorker::TEvData::TPtr& ev) { - LOG_D("Handle TEvData " << ev->Get()->ToString()); + LOG_D("Handle TEvData record count: " << ev->Get()->Records.size()); ProcessData(ev->Get()->PartitionId, ev->Get()->Records); } @@ -589,7 +628,12 @@ private: return; } + PollSent = false; + TableState->EnshureDataBatch(); + if (!LastWriteTime) { + LastWriteTime = TInstant::Now(); + } for (auto& message : records) { NYdb::NTopic::NPurecalc::TMessage input; @@ -606,15 +650,34 @@ private: TableState->AddData(m->Data); } } catch (const yexception& e) { + ProcessingErrorStatus = TEvWorker::TEvGone::EStatus::SCHEME_ERROR; ProcessingError = TStringBuilder() << "Error transform message: " << e.what(); break; } } - if (TableState->Flush()) { - Become(&TThis::StateWrite); - } else if (ProcessingError) { + if (TableState->BatchSize() >= ExpectedBatchSize || *LastWriteTime < TInstant::Now() - FlushInterval) { + if (TableState->Flush()) { + LastWriteTime.reset(); + return Become(&TThis::StateWrite); + } + } + + if (ProcessingError) { LogCritAndLeave(*ProcessingError); + } else { + PollSent = true; + Send(Worker, new TEvWorker::TEvPoll(true)); + } + } + + void TryFlush() { + if (LastWriteTime && LastWriteTime < TInstant::Now() - FlushInterval && TableState->Flush()) { + LastWriteTime.reset(); + WakeupScheduled = false; + Become(&TThis::StateWrite); + } else { + Schedule(FlushInterval, new TEvents::TEvWakeup(ui32(ETag::FlushTimeout))); } } @@ -626,6 +689,7 @@ private: hFunc(TEvWorker::TEvData, HoldHandle); sFunc(TEvents::TEvPoison, PassAway); + hFunc(TEvents::TEvWakeup, WriteWakeup); } } @@ -635,20 +699,43 @@ private: << " status# " << ev->Get()->Status); auto error = TableState->Handle(ev); - if (error) { - return LogCritAndLeave(error); + if (ui32(NYdb::EStatus::SUCCESS) != ev->Get()->Status && Delay < MaxRetryDelay && !PendingLeave()) { + return LogWarnAndRetry(error); + } + + if (error && !ProcessingError) { + ProcessingError = error; } if (ProcessingError) { return LogCritAndLeave(*ProcessingError); } - Send(Worker, new TEvWorker::TEvPoll()); + if (!PollSent) { + PollSent = true; + Send(Worker, new TEvWorker::TEvPoll()); + } + return StartWork(); } + void WriteWakeup(TEvents::TEvWakeup::TPtr& ev) { + switch(ETag(ev->Get()->Tag)) { + case ETag::FlushTimeout: + WakeupScheduled = false; + break; + case ETag::RetryFlush: + TableState->Flush(); + break; + } + } + private: + bool PendingLeave() { + return PendingRecords && PendingRecords->empty(); + } + TStringBuf GetLogPrefix() const { if (!LogPrefix) { LogPrefix = TStringBuilder() @@ -673,9 +760,10 @@ private: } void Retry() { - Delay = Min(Delay * ++Attempt, MaxDelay); + Delay = Attempt++ ? Delay * 2 : MinRetryDelay; + Delay = Min(Delay, MaxRetryDelay); const TDuration random = TDuration::FromValue(TAppData::RandomProvider->GenRand64() % Delay.MicroSeconds()); - this->Schedule(Delay + random, new TEvents::TEvWakeup()); + this->Schedule(Delay + random, new TEvents::TEvWakeup(ui32(ETag::RetryFlush))); } void Leave(TEvWorker::TEvGone::EStatus status, const TString& message) { @@ -719,6 +807,10 @@ private: size_t InFlightCompilationId = 0; TProgramHolder::TPtr ProgramHolder; + mutable bool WakeupScheduled = false; + mutable bool PollSent = false; + mutable std::optional<TInstant> LastWriteTime; + mutable TMaybe<TString> LogPrefix; mutable TEvWorker::TEvGone::EStatus ProcessingErrorStatus; @@ -728,8 +820,7 @@ private: std::optional<TVector<TTopicMessage>> PendingRecords; ui32 Attempt = 0; - TDuration Delay = TDuration::Minutes(1); - static constexpr TDuration MaxDelay = TDuration::Minutes(10); + TDuration Delay = MinRetryDelay; }; // TTransferWriter diff --git a/ydb/core/tx/replication/service/worker.cpp b/ydb/core/tx/replication/service/worker.cpp index 008be6aadd..d66eacf1ce 100644 --- a/ydb/core/tx/replication/service/worker.cpp +++ b/ydb/core/tx/replication/service/worker.cpp @@ -14,6 +14,17 @@ namespace NKikimr::NReplication::NService { +TEvWorker::TEvPoll::TEvPoll(bool skipCommit) + : SkipCommit(skipCommit) +{ +} + +TString TEvWorker::TEvPoll::ToString() const { + return TStringBuilder() << ToStringHeader() << " {" + << " SkipCommit: " << SkipCommit + << " }"; +} + TEvWorker::TEvData::TEvData(ui32 partitionId, const TString& source, const TVector<TTopicMessage>& records) : PartitionId(partitionId) , Source(source) diff --git a/ydb/core/tx/replication/service/worker.h b/ydb/core/tx/replication/service/worker.h index 2374384618..1c1dc14133 100644 --- a/ydb/core/tx/replication/service/worker.h +++ b/ydb/core/tx/replication/service/worker.h @@ -31,7 +31,13 @@ struct TEvWorker { static_assert(EvEnd < EventSpaceEnd(TKikimrEvents::ES_REPLICATION_WORKER)); struct TEvHandshake: public TEventLocal<TEvHandshake, EvHandshake> {}; - struct TEvPoll: public TEventLocal<TEvPoll, EvPoll> {}; + + struct TEvPoll: public TEventLocal<TEvPoll, EvPoll> { + bool SkipCommit; + + explicit TEvPoll(bool skipCommit = false); + TString ToString() const override; + }; struct TEvData: public TEventLocal<TEvData, EvData> { ui32 PartitionId; diff --git a/ydb/core/tx/replication/ydb_proxy/ydb_proxy.cpp b/ydb/core/tx/replication/ydb_proxy/ydb_proxy.cpp index e4005159e3..87484335ab 100644 --- a/ydb/core/tx/replication/ydb_proxy/ydb_proxy.cpp +++ b/ydb/core/tx/replication/ydb_proxy/ydb_proxy.cpp @@ -177,7 +177,9 @@ private: class TTopicReader: public TBaseProxyActor<TTopicReader> { void Handle(TEvYdbProxy::TEvReadTopicRequest::TPtr& ev) { - if (AutoCommit) { + auto args = std::move(ev->Get()->GetArgs()); + const auto& settings = std::get<TEvYdbProxy::TReadTopicSettings>(args); + if (AutoCommit && !settings.SkipCommit_) { DeferredCommit.Commit(); } WaitEvent(ev->Sender, ev->Cookie); diff --git a/ydb/core/tx/replication/ydb_proxy/ydb_proxy.h b/ydb/core/tx/replication/ydb_proxy/ydb_proxy.h index 28b83425dd..e58b9f69be 100644 --- a/ydb/core/tx/replication/ydb_proxy/ydb_proxy.h +++ b/ydb/core/tx/replication/ydb_proxy/ydb_proxy.h @@ -159,6 +159,14 @@ struct TEvYdbProxy { #undef PROXY_METHOD }; + struct TReadTopicSettings { + using TSelf = TReadTopicSettings; + + // This option allows you to postpone the auto-commit of read messages. All previously + // read messages will be commited upon subsequent receipt of TEvPoll with SkipCommit set to false. + FLUENT_SETTING_DEFAULT(bool, SkipCommit, false); + }; + struct TReadTopicResult { explicit TReadTopicResult(const NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent& event) { PartitionId = event.GetPartitionSession()->GetPartitionId(); @@ -234,7 +242,7 @@ struct TEvYdbProxy { DEFINE_GENERIC_REQUEST_RESPONSE(DescribeTopic, NYdb::NTopic::TDescribeTopicResult, TString, NYdb::NTopic::TDescribeTopicSettings); DEFINE_GENERIC_REQUEST_RESPONSE(DescribeConsumer, NYdb::NTopic::TDescribeConsumerResult, TString, TString, NYdb::NTopic::TDescribeConsumerSettings); DEFINE_GENERIC_REQUEST_RESPONSE(CreateTopicReader, TActorId, TTopicReaderSettings); - DEFINE_GENERIC_REQUEST_RESPONSE(ReadTopic, TReadTopicResult, void); + DEFINE_GENERIC_REQUEST_RESPONSE(ReadTopic, TReadTopicResult, TReadTopicSettings); DEFINE_GENERIC_REQUEST_RESPONSE(CommitOffset, NYdb::TStatus, TString, ui64, TString, ui64, NYdb::NTopic::TCommitOffsetSettings); #undef DEFINE_GENERIC_REQUEST_RESPONSE diff --git a/ydb/core/tx/replication/ydb_proxy/ydb_proxy_ut.cpp b/ydb/core/tx/replication/ydb_proxy/ydb_proxy_ut.cpp index fe5b963301..6de5a771f5 100644 --- a/ydb/core/tx/replication/ydb_proxy/ydb_proxy_ut.cpp +++ b/ydb/core/tx/replication/ydb_proxy/ydb_proxy_ut.cpp @@ -611,7 +611,7 @@ Y_UNIT_TEST_SUITE(YdbProxy) { template <typename Env> TEvYdbProxy::TReadTopicResult ReadTopicData(Env& env, TActorId& reader, const TString& topicPath) { do { - env.SendAsync(reader, new TEvYdbProxy::TEvReadTopicRequest()); + env.SendAsync(reader, new TEvYdbProxy::TEvReadTopicRequest(TEvYdbProxy::TReadTopicSettings())); try { TAutoPtr<IEventHandle> ev; @@ -659,13 +659,13 @@ Y_UNIT_TEST_SUITE(YdbProxy) { } // wait next event - env.SendAsync(reader, new TEvYdbProxy::TEvReadTopicRequest()); + env.SendAsync(reader, new TEvYdbProxy::TEvReadTopicRequest(TEvYdbProxy::TReadTopicSettings())); TActorId newReader; do { newReader = CreateTopicReader(env, "/Root/topic"); // wait next event - env.SendAsync(newReader, new TEvYdbProxy::TEvReadTopicRequest()); + env.SendAsync(newReader, new TEvYdbProxy::TEvReadTopicRequest(TEvYdbProxy::TReadTopicSettings())); // wait event from previous session try { @@ -702,7 +702,7 @@ Y_UNIT_TEST_SUITE(YdbProxy) { TEnv env; auto reader = CreateTopicReader(env, "/Root/topic"); - auto ev = env.Send<TEvYdbProxy::TEvTopicReaderGone>(reader, new TEvYdbProxy::TEvReadTopicRequest()); + auto ev = env.Send<TEvYdbProxy::TEvTopicReaderGone>(reader, new TEvYdbProxy::TEvReadTopicRequest(TEvYdbProxy::TReadTopicSettings())); UNIT_ASSERT(ev); UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Result.GetStatus(), NYdb::EStatus::SCHEME_ERROR); diff --git a/ydb/tests/functional/transfer/main.cpp b/ydb/tests/functional/transfer/main.cpp index 45a8314e68..6264400a55 100644 --- a/ydb/tests/functional/transfer/main.cpp +++ b/ydb/tests/functional/transfer/main.cpp @@ -17,7 +17,7 @@ using namespace NYdb::NReplication; namespace { -volatile size_t TestCaseCounter = 0; +volatile size_t TestCaseCounter = RandomNumber<size_t>(); struct IChecker { virtual void Assert(const TString& msg, const ::Ydb::Value& value) = 0; @@ -125,7 +125,6 @@ struct TConfig { const TVector<TString> AlterLambdas; }; - struct MainTestCase { MainTestCase() @@ -929,7 +928,7 @@ Y_UNIT_TEST_SUITE(Transfer) Sleep(TDuration::Seconds(1)); } } - +/* Y_UNIT_TEST(DescribeError_OnWriteToShard) { MainTestCase testCase; @@ -969,5 +968,6 @@ Y_UNIT_TEST_SUITE(Transfer) Sleep(TDuration::Seconds(1)); } } +*/ } |