diff options
author | Ilnaz Nizametdinov <ilnaz@ydb.tech> | 2024-01-23 14:33:41 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-01-23 14:33:41 +0300 |
commit | 5d6abda405b039fb926830e81339f2cb37d89bce (patch) | |
tree | cb3fa15092a0d775abb9b45dccb11d93d746e40c | |
parent | 0761c67911d67019f2ba62bf62e9569b6a63dde0 (diff) | |
download | ydb-5d6abda405b039fb926830e81339f2cb37d89bce.tar.gz |
LocalTableWriter skeleton KIKIMR-20673 (#1214)
-rw-r--r-- | ydb/core/change_exchange/change_sender_common_ops.cpp | 6 | ||||
-rw-r--r-- | ydb/core/change_exchange/change_sender_common_ops.h | 11 | ||||
-rw-r--r-- | ydb/core/tx/datashard/change_sender_async_index.cpp | 6 | ||||
-rw-r--r-- | ydb/core/tx/datashard/change_sender_cdc_stream.cpp | 6 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_change_exchange.cpp | 14 | ||||
-rw-r--r-- | ydb/core/tx/replication/service/table_writer.cpp | 248 | ||||
-rw-r--r-- | ydb/core/tx/replication/service/table_writer.h | 6 | ||||
-rw-r--r-- | ydb/core/tx/replication/service/topic_reader.cpp | 2 | ||||
-rw-r--r-- | ydb/core/tx/replication/service/ya.make | 2 | ||||
-rw-r--r-- | ydb/core/tx/scheme_cache/helpers.h | 1 |
10 files changed, 272 insertions, 30 deletions
diff --git a/ydb/core/change_exchange/change_sender_common_ops.cpp b/ydb/core/change_exchange/change_sender_common_ops.cpp index c3887dfec6..b2b3e90718 100644 --- a/ydb/core/change_exchange/change_sender_common_ops.cpp +++ b/ydb/core/change_exchange/change_sender_common_ops.cpp @@ -119,7 +119,7 @@ bool TBaseChangeSender::RequestRecords() { return false; } - ActorOps->Send(ChangeServer, new TEvChangeExchange::TEvRequestRecords(std::move(records))); + ActorOps->Send(GetChangeServer(), new TEvChangeExchange::TEvRequestRecords(std::move(records))); return true; } @@ -431,11 +431,9 @@ void TBaseChangeSender::RemoveRecords() { } } -TBaseChangeSender::TBaseChangeSender(IActorOps* actorOps, IChangeSenderResolver* resolver, - const TActorId& changeServer, const TPathId& pathId) +TBaseChangeSender::TBaseChangeSender(IActorOps* actorOps, IChangeSenderResolver* resolver, const TPathId& pathId) : ActorOps(actorOps) , Resolver(resolver) - , ChangeServer(changeServer) , PathId(pathId) , MemLimit(192_KB) , MemUsage(0) diff --git a/ydb/core/change_exchange/change_sender_common_ops.h b/ydb/core/change_exchange/change_sender_common_ops.h index fe516176a8..2e25c8aa72 100644 --- a/ydb/core/change_exchange/change_sender_common_ops.h +++ b/ydb/core/change_exchange/change_sender_common_ops.h @@ -3,7 +3,6 @@ #include "change_exchange.h" #include <ydb/library/actors/core/actor.h> -#include <ydb/library/actors/core/hfunc.h> #include <ydb/library/actors/core/mon.h> #include <util/generic/hash.h> @@ -52,6 +51,8 @@ class IChangeSender { public: virtual ~IChangeSender() = default; + virtual TActorId GetChangeServer() const = 0; + virtual void CreateSenders(const TVector<ui64>& partitionIds, bool partitioningChanged = true) = 0; virtual void KillSenders() = 0; virtual IActor* CreateSender(ui64 partitionId) = 0; @@ -120,12 +121,12 @@ protected: remove.push_back(record.Order); } - ActorOps->Send(ChangeServer, new TEvChangeExchange::TEvRemoveRecords(std::move(remove))); + ActorOps->Send(GetChangeServer(), new TEvChangeExchange::TEvRemoveRecords(std::move(remove))); } template <> void RemoveRecords(TVector<ui64>&& records) { - ActorOps->Send(ChangeServer, new TEvChangeExchange::TEvRemoveRecords(std::move(records))); + ActorOps->Send(GetChangeServer(), new TEvChangeExchange::TEvRemoveRecords(std::move(records))); } void CreateSenders(const TVector<ui64>& partitionIds, bool partitioningChanged = true) override; @@ -138,8 +139,7 @@ protected: void OnReady(ui64 partitionId) override; void OnGone(ui64 partitionId) override; - explicit TBaseChangeSender(IActorOps* actorOps, IChangeSenderResolver* resolver, - const TActorId& changeServer, const TPathId& pathId); + explicit TBaseChangeSender(IActorOps* actorOps, IChangeSenderResolver* resolver, const TPathId& pathId); void RenderHtmlPage(ui64 tabletId, NMon::TEvRemoteHttpInfo::TPtr& ev, const TActorContext& ctx); @@ -148,7 +148,6 @@ private: IChangeSenderResolver* const Resolver; protected: - const TActorId ChangeServer; const TPathId PathId; private: diff --git a/ydb/core/tx/datashard/change_sender_async_index.cpp b/ydb/core/tx/datashard/change_sender_async_index.cpp index 98a225f554..c0630662a4 100644 --- a/ydb/core/tx/datashard/change_sender_async_index.cpp +++ b/ydb/core/tx/datashard/change_sender_async_index.cpp @@ -704,6 +704,10 @@ class TAsyncIndexChangeSenderMain return StateBase(ev); } + TActorId GetChangeServer() const override { + return DataShard.ActorId; + } + void Resolve() override { ResolveIndex(); } @@ -794,7 +798,7 @@ public: explicit TAsyncIndexChangeSenderMain(const TDataShardId& dataShard, const TTableId& userTableId, const TPathId& indexPathId) : TActorBootstrapped() - , TBaseChangeSender(this, this, dataShard.ActorId, indexPathId) + , TBaseChangeSender(this, this, indexPathId) , DataShard(dataShard) , UserTableId(userTableId) , IndexTableVersion(0) diff --git a/ydb/core/tx/datashard/change_sender_cdc_stream.cpp b/ydb/core/tx/datashard/change_sender_cdc_stream.cpp index 111f1c74c6..55cf00095e 100644 --- a/ydb/core/tx/datashard/change_sender_cdc_stream.cpp +++ b/ydb/core/tx/datashard/change_sender_cdc_stream.cpp @@ -651,6 +651,10 @@ class TCdcChangeSenderMain return StateBase(ev); } + TActorId GetChangeServer() const override { + return DataShard.ActorId; + } + void Resolve() override { ResolveCdcStream(); } @@ -760,7 +764,7 @@ public: explicit TCdcChangeSenderMain(const TDataShardId& dataShard, const TPathId& streamPathId) : TActorBootstrapped() - , TBaseChangeSender(this, this, dataShard.ActorId, streamPathId) + , TBaseChangeSender(this, this, streamPathId) , DataShard(dataShard) , TopicVersion(0) { diff --git a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp index a744dbfeb5..97576e8481 100644 --- a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp +++ b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp @@ -1,11 +1,7 @@ #include <ydb/core/tx/datashard/ut_common/datashard_ut_common.h> -#include "change_sender_common_ops.h" - -#include <library/cpp/digest/md5/md5.h> -#include <library/cpp/json/json_reader.h> -#include <library/cpp/json/json_writer.h> #include <ydb/core/base/path.h> +#include <ydb/core/change_exchange/change_sender_common_ops.h> #include <ydb/core/persqueue/events/global.h> #include <ydb/core/persqueue/user_info.h> #include <ydb/core/persqueue/write_meta.h> @@ -14,6 +10,10 @@ #include <ydb/public/sdk/cpp/client/ydb_persqueue_public/persqueue.h> #include <ydb/public/sdk/cpp/client/ydb_topic/topic.h> +#include <library/cpp/digest/md5/md5.h> +#include <library/cpp/json/json_reader.h> +#include <library/cpp/json/json_writer.h> + #include <util/generic/size_literals.h> #include <util/string/join.h> #include <util/string/printf.h> @@ -2926,7 +2926,7 @@ Y_UNIT_TEST_SUITE(Cdc) { bool ready = false; auto prevObserver = runtime.SetObserverFunc([&](TAutoPtr<IEventHandle>& ev) { - if (ev->GetTypeRewrite() == TEvChangeExchangePrivate::EvReady) { + if (ev->GetTypeRewrite() == NChangeExchange::TEvChangeExchangePrivate::EvReady) { ready = true; } @@ -2948,7 +2948,7 @@ Y_UNIT_TEST_SUITE(Cdc) { THolder<IEventHandle> delayed; prevObserver = runtime.SetObserverFunc([&](TAutoPtr<IEventHandle>& ev) { - if (ev->GetTypeRewrite() == TEvChangeExchangePrivate::EvReady) { + if (ev->GetTypeRewrite() == NChangeExchange::TEvChangeExchangePrivate::EvReady) { delayed.Reset(ev.Release()); return TTestActorRuntime::EEventAction::DROP; } diff --git a/ydb/core/tx/replication/service/table_writer.cpp b/ydb/core/tx/replication/service/table_writer.cpp index 0ff61d0307..6bd3cb08bd 100644 --- a/ydb/core/tx/replication/service/table_writer.cpp +++ b/ydb/core/tx/replication/service/table_writer.cpp @@ -1,47 +1,277 @@ #include "table_writer.h" #include "worker.h" +#include <ydb/core/change_exchange/change_sender_common_ops.h> +#include <ydb/core/tx/scheme_cache/helpers.h> #include <ydb/library/actors/core/actor.h> #include <ydb/library/actors/core/hfunc.h> namespace NKikimr::NReplication::NService { -class TLocalTableWriter: public TActor<TLocalTableWriter> { +class TLocalTableWriter + : public TActor<TLocalTableWriter> + , public NChangeExchange::TBaseChangeSender + , public NChangeExchange::IChangeSenderResolver + , private NSchemeCache::TSchemeCacheHelpers +{ + static TSerializedTableRange GetFullRange(ui32 keyColumnsCount) { + TVector<TCell> fromValues(keyColumnsCount); + TVector<TCell> toValues; + return TSerializedTableRange(fromValues, true, toValues, false); + } + + void LogCritAndLeave(const TString& error) { + Y_UNUSED(error); + Leave(); + } + + void LogWarnAndRetry(const TString& error) { + Y_UNUSED(error); + Retry(); + } + + template <typename CheckFunc, typename FailFunc, typename T, typename... Args> + bool Check(CheckFunc checkFunc, FailFunc failFunc, const T& subject, Args&&... args) { + return checkFunc("writer", subject, std::forward<Args>(args)..., std::bind(failFunc, this, std::placeholders::_1)); + } + + template <typename T> + bool CheckNotEmpty(const TAutoPtr<T>& result) { + return Check(&TSchemeCacheHelpers::CheckNotEmpty<T>, &TThis::LogCritAndLeave, result); + } + + template <typename T> + bool CheckEntriesCount(const TAutoPtr<T>& result, ui32 expected) { + return Check(&TSchemeCacheHelpers::CheckEntriesCount<T>, &TThis::LogCritAndLeave, result, expected); + } + + template <typename T> + bool CheckTableId(const T& entry, const TTableId& expected) { + return Check(&TSchemeCacheHelpers::CheckTableId<T>, &TThis::LogCritAndLeave, entry, expected); + } + + template <typename T> + bool CheckEntrySucceeded(const T& entry) { + return Check(&TSchemeCacheHelpers::CheckEntrySucceeded<T>, &TThis::LogWarnAndRetry, entry); + } + + template <typename T> + bool CheckEntryKind(const T& entry, TNavigate::EKind expected) { + return Check(&TSchemeCacheHelpers::CheckEntryKind<T>, &TThis::LogCritAndLeave, entry, expected); + } + + static TVector<ui64> MakePartitionIds(const TVector<TKeyDesc::TPartitionInfo>& partitions) { + TVector<ui64> result(Reserve(partitions.size())); + + for (const auto& partition : partitions) { + result.push_back(partition.ShardId); + } + + return result; + } + + TActorId GetChangeServer() const override { + return SelfId(); + } + + void Resolve() override { + ResolveTable(); + } + + bool IsResolving() const override { + return Resolving; + } + + bool IsResolved() const override { + return KeyDesc && KeyDesc->GetPartitions(); + } + void Handle(TEvWorker::TEvHandshake::TPtr& ev) { Worker = ev->Sender; + ResolveTable(); + } + + void ResolveTable() { + Resolving = true; + + auto request = MakeHolder<TNavigate>(); + request->ResultSet.emplace_back(MakeNavigateEntry(PathId, TNavigate::OpTable)); + Send(MakeSchemeCacheID(), new TEvNavigate(request.Release())); + } + + void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) { + const auto& result = ev->Get()->Request; + + if (!CheckNotEmpty(result)) { + return; + } + + if (!CheckEntriesCount(result, 1)) { + return; + } + + const auto& entry = result->ResultSet.at(0); + + if (!CheckTableId(entry, PathId)) { + return; + } + + if (!CheckEntrySucceeded(entry)) { + return; + } + + if (!CheckEntryKind(entry, TNavigate::KindTable)) { + return; + } + + TVector<NScheme::TTypeInfo> keyColumnTypes; + for (const auto& [_, column] : entry.Columns) { + if (column.KeyOrder < 0) { + continue; + } + + if (keyColumnTypes.size() <= static_cast<ui32>(column.KeyOrder)) { + keyColumnTypes.resize(column.KeyOrder + 1); + } + + keyColumnTypes[column.KeyOrder] = column.PType; + } + + KeyDesc = MakeHolder<TKeyDesc>( + entry.TableId, + GetFullRange(keyColumnTypes.size()).ToTableRange(), + TKeyDesc::ERowOperation::Update, + keyColumnTypes, + TVector<TKeyDesc::TColumnOp>() + ); + + ResolveKeys(); + } + + void ResolveKeys() { + auto request = MakeHolder<TResolve>(); + request->ResultSet.emplace_back(std::move(KeyDesc)); + Send(MakeSchemeCacheID(), new TEvResolve(request.Release())); + } + + void Handle(TEvTxProxySchemeCache::TEvResolveKeySetResult::TPtr& ev) { + const auto& result = ev->Get()->Request; + + if (!CheckNotEmpty(result)) { + return; + } + + if (!CheckEntriesCount(result, 1)) { + return; + } + + auto& entry = result->ResultSet.at(0); + + if (!CheckTableId(entry, PathId)) { + return; + } + + if (!CheckEntrySucceeded(entry)) { + return; + } + + if (!entry.KeyDescription->GetPartitions()) { + return LogWarnAndRetry("Empty partitions"); + } + + const bool versionChanged = !TableVersion || TableVersion != entry.GeneralVersion; + TableVersion = entry.GeneralVersion; + + KeyDesc = std::move(entry.KeyDescription); + CreateSenders(MakePartitionIds(KeyDesc->GetPartitions()), versionChanged); + Send(Worker, new TEvWorker::TEvHandshake()); + Resolving = false; + } + + IActor* CreateSender(ui64 partitionId) override { + Y_UNUSED(partitionId); + return nullptr; + } + + ui64 GetPartitionId(NChangeExchange::IChangeRecord::TPtr record) const override { + Y_UNUSED(record); + return 0; } void Handle(TEvWorker::TEvData::TPtr& ev) { Worker = ev->Sender; - // TODO + // TODO: enqueue records + } + + void Handle(NChangeExchange::TEvChangeExchange::TEvRequestRecords::TPtr& ev) { + Y_UNUSED(ev); + // TODO: send records + } + + void Handle(NChangeExchange::TEvChangeExchange::TEvRecords::TPtr& ev) { + ProcessRecords(std::move(ev->Get()->Records)); + } + + void Handle(NChangeExchange::TEvChangeExchange::TEvForgetRecords::TPtr& ev) { + ForgetRecords(std::move(ev->Get()->Records)); + } + + void Handle(NChangeExchange::TEvChangeExchangePrivate::TEvReady::TPtr& ev) { + OnReady(ev->Get()->PartitionId); + } + + void Handle(NChangeExchange::TEvChangeExchangePrivate::TEvGone::TPtr& ev) { + OnGone(ev->Get()->PartitionId); + } + + void Retry() { + Schedule(TDuration::Seconds(1), new TEvents::TEvWakeup()); + } + + void Leave() { + Send(Worker, new TEvents::TEvGone()); + PassAway(); + } + + void PassAway() override { + KillSenders(); + TActor::PassAway(); } public: - explicit TLocalTableWriter(const TString& path) + explicit TLocalTableWriter(const TPathId& tablePathId) : TActor(&TThis::StateWork) - , Path(path) + , TBaseChangeSender(this, this, tablePathId) { - Y_UNUSED(Path); } STFUNC(StateWork) { switch (ev->GetTypeRewrite()) { hFunc(TEvWorker::TEvHandshake, Handle); hFunc(TEvWorker::TEvData, Handle); + hFunc(NChangeExchange::TEvChangeExchange::TEvRequestRecords, Handle); + hFunc(NChangeExchange::TEvChangeExchange::TEvRecords, Handle); + hFunc(NChangeExchange::TEvChangeExchange::TEvForgetRecords, Handle); + hFunc(NChangeExchange::TEvChangeExchangePrivate::TEvReady, Handle); + hFunc(NChangeExchange::TEvChangeExchangePrivate::TEvGone, Handle); + hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle); + hFunc(TEvTxProxySchemeCache::TEvResolveKeySetResult, Handle); + sFunc(TEvents::TEvWakeup, ResolveTable); sFunc(TEvents::TEvPoison, PassAway); } } private: - const TString Path; - TActorId Worker; + ui64 TableVersion = 0; + THolder<TKeyDesc> KeyDesc; + bool Resolving = false; }; // TLocalTableWriter -IActor* CreateLocalTableWriter(const TString& path) { - return new TLocalTableWriter(path); +IActor* CreateLocalTableWriter(const TPathId& tablePathId) { + return new TLocalTableWriter(tablePathId); } } diff --git a/ydb/core/tx/replication/service/table_writer.h b/ydb/core/tx/replication/service/table_writer.h index 95ebde89ed..b0c6568be8 100644 --- a/ydb/core/tx/replication/service/table_writer.h +++ b/ydb/core/tx/replication/service/table_writer.h @@ -2,8 +2,12 @@ #include <ydb/core/base/defs.h> +namespace NKikimr { + struct TPathId; +} + namespace NKikimr::NReplication::NService { -IActor* CreateLocalTableWriter(const TString& path); +IActor* CreateLocalTableWriter(const TPathId& tablePathId); } diff --git a/ydb/core/tx/replication/service/topic_reader.cpp b/ydb/core/tx/replication/service/topic_reader.cpp index 62ccd9ca65..3a549cac3c 100644 --- a/ydb/core/tx/replication/service/topic_reader.cpp +++ b/ydb/core/tx/replication/service/topic_reader.cpp @@ -20,7 +20,7 @@ class TRemoteTopicReader: public TActor<TRemoteTopicReader> { void Handle(TEvYdbProxy::TEvCreateTopicReaderResponse::TPtr& ev) { ReadSession = ev->Get()->Result; - Y_ABORT_UNLESS(!Worker); + Y_ABORT_UNLESS(Worker); Send(Worker, new TEvWorker::TEvHandshake()); } diff --git a/ydb/core/tx/replication/service/ya.make b/ydb/core/tx/replication/service/ya.make index 9573a41ecb..ac8d233010 100644 --- a/ydb/core/tx/replication/service/ya.make +++ b/ydb/core/tx/replication/service/ya.make @@ -2,6 +2,8 @@ LIBRARY() PEERDIR( ydb/core/base + ydb/core/change_exchange + ydb/core/scheme ydb/core/tx/replication/ydb_proxy ydb/library/actors/core ) diff --git a/ydb/core/tx/scheme_cache/helpers.h b/ydb/core/tx/scheme_cache/helpers.h index 2412505190..e647109653 100644 --- a/ydb/core/tx/scheme_cache/helpers.h +++ b/ydb/core/tx/scheme_cache/helpers.h @@ -1,5 +1,6 @@ #pragma once +#include <ydb/core/base/appdata.h> #include <ydb/core/tx/scheme_cache/scheme_cache.h> #include <util/string/builder.h> |