aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorIlnaz Nizametdinov <ilnaz@ydb.tech>2024-01-23 14:33:41 +0300
committerGitHub <noreply@github.com>2024-01-23 14:33:41 +0300
commit5d6abda405b039fb926830e81339f2cb37d89bce (patch)
treecb3fa15092a0d775abb9b45dccb11d93d746e40c
parent0761c67911d67019f2ba62bf62e9569b6a63dde0 (diff)
downloadydb-5d6abda405b039fb926830e81339f2cb37d89bce.tar.gz
LocalTableWriter skeleton KIKIMR-20673 (#1214)
-rw-r--r--ydb/core/change_exchange/change_sender_common_ops.cpp6
-rw-r--r--ydb/core/change_exchange/change_sender_common_ops.h11
-rw-r--r--ydb/core/tx/datashard/change_sender_async_index.cpp6
-rw-r--r--ydb/core/tx/datashard/change_sender_cdc_stream.cpp6
-rw-r--r--ydb/core/tx/datashard/datashard_ut_change_exchange.cpp14
-rw-r--r--ydb/core/tx/replication/service/table_writer.cpp248
-rw-r--r--ydb/core/tx/replication/service/table_writer.h6
-rw-r--r--ydb/core/tx/replication/service/topic_reader.cpp2
-rw-r--r--ydb/core/tx/replication/service/ya.make2
-rw-r--r--ydb/core/tx/scheme_cache/helpers.h1
10 files changed, 272 insertions, 30 deletions
diff --git a/ydb/core/change_exchange/change_sender_common_ops.cpp b/ydb/core/change_exchange/change_sender_common_ops.cpp
index c3887dfec6..b2b3e90718 100644
--- a/ydb/core/change_exchange/change_sender_common_ops.cpp
+++ b/ydb/core/change_exchange/change_sender_common_ops.cpp
@@ -119,7 +119,7 @@ bool TBaseChangeSender::RequestRecords() {
return false;
}
- ActorOps->Send(ChangeServer, new TEvChangeExchange::TEvRequestRecords(std::move(records)));
+ ActorOps->Send(GetChangeServer(), new TEvChangeExchange::TEvRequestRecords(std::move(records)));
return true;
}
@@ -431,11 +431,9 @@ void TBaseChangeSender::RemoveRecords() {
}
}
-TBaseChangeSender::TBaseChangeSender(IActorOps* actorOps, IChangeSenderResolver* resolver,
- const TActorId& changeServer, const TPathId& pathId)
+TBaseChangeSender::TBaseChangeSender(IActorOps* actorOps, IChangeSenderResolver* resolver, const TPathId& pathId)
: ActorOps(actorOps)
, Resolver(resolver)
- , ChangeServer(changeServer)
, PathId(pathId)
, MemLimit(192_KB)
, MemUsage(0)
diff --git a/ydb/core/change_exchange/change_sender_common_ops.h b/ydb/core/change_exchange/change_sender_common_ops.h
index fe516176a8..2e25c8aa72 100644
--- a/ydb/core/change_exchange/change_sender_common_ops.h
+++ b/ydb/core/change_exchange/change_sender_common_ops.h
@@ -3,7 +3,6 @@
#include "change_exchange.h"
#include <ydb/library/actors/core/actor.h>
-#include <ydb/library/actors/core/hfunc.h>
#include <ydb/library/actors/core/mon.h>
#include <util/generic/hash.h>
@@ -52,6 +51,8 @@ class IChangeSender {
public:
virtual ~IChangeSender() = default;
+ virtual TActorId GetChangeServer() const = 0;
+
virtual void CreateSenders(const TVector<ui64>& partitionIds, bool partitioningChanged = true) = 0;
virtual void KillSenders() = 0;
virtual IActor* CreateSender(ui64 partitionId) = 0;
@@ -120,12 +121,12 @@ protected:
remove.push_back(record.Order);
}
- ActorOps->Send(ChangeServer, new TEvChangeExchange::TEvRemoveRecords(std::move(remove)));
+ ActorOps->Send(GetChangeServer(), new TEvChangeExchange::TEvRemoveRecords(std::move(remove)));
}
template <>
void RemoveRecords(TVector<ui64>&& records) {
- ActorOps->Send(ChangeServer, new TEvChangeExchange::TEvRemoveRecords(std::move(records)));
+ ActorOps->Send(GetChangeServer(), new TEvChangeExchange::TEvRemoveRecords(std::move(records)));
}
void CreateSenders(const TVector<ui64>& partitionIds, bool partitioningChanged = true) override;
@@ -138,8 +139,7 @@ protected:
void OnReady(ui64 partitionId) override;
void OnGone(ui64 partitionId) override;
- explicit TBaseChangeSender(IActorOps* actorOps, IChangeSenderResolver* resolver,
- const TActorId& changeServer, const TPathId& pathId);
+ explicit TBaseChangeSender(IActorOps* actorOps, IChangeSenderResolver* resolver, const TPathId& pathId);
void RenderHtmlPage(ui64 tabletId, NMon::TEvRemoteHttpInfo::TPtr& ev, const TActorContext& ctx);
@@ -148,7 +148,6 @@ private:
IChangeSenderResolver* const Resolver;
protected:
- const TActorId ChangeServer;
const TPathId PathId;
private:
diff --git a/ydb/core/tx/datashard/change_sender_async_index.cpp b/ydb/core/tx/datashard/change_sender_async_index.cpp
index 98a225f554..c0630662a4 100644
--- a/ydb/core/tx/datashard/change_sender_async_index.cpp
+++ b/ydb/core/tx/datashard/change_sender_async_index.cpp
@@ -704,6 +704,10 @@ class TAsyncIndexChangeSenderMain
return StateBase(ev);
}
+ TActorId GetChangeServer() const override {
+ return DataShard.ActorId;
+ }
+
void Resolve() override {
ResolveIndex();
}
@@ -794,7 +798,7 @@ public:
explicit TAsyncIndexChangeSenderMain(const TDataShardId& dataShard, const TTableId& userTableId, const TPathId& indexPathId)
: TActorBootstrapped()
- , TBaseChangeSender(this, this, dataShard.ActorId, indexPathId)
+ , TBaseChangeSender(this, this, indexPathId)
, DataShard(dataShard)
, UserTableId(userTableId)
, IndexTableVersion(0)
diff --git a/ydb/core/tx/datashard/change_sender_cdc_stream.cpp b/ydb/core/tx/datashard/change_sender_cdc_stream.cpp
index 111f1c74c6..55cf00095e 100644
--- a/ydb/core/tx/datashard/change_sender_cdc_stream.cpp
+++ b/ydb/core/tx/datashard/change_sender_cdc_stream.cpp
@@ -651,6 +651,10 @@ class TCdcChangeSenderMain
return StateBase(ev);
}
+ TActorId GetChangeServer() const override {
+ return DataShard.ActorId;
+ }
+
void Resolve() override {
ResolveCdcStream();
}
@@ -760,7 +764,7 @@ public:
explicit TCdcChangeSenderMain(const TDataShardId& dataShard, const TPathId& streamPathId)
: TActorBootstrapped()
- , TBaseChangeSender(this, this, dataShard.ActorId, streamPathId)
+ , TBaseChangeSender(this, this, streamPathId)
, DataShard(dataShard)
, TopicVersion(0)
{
diff --git a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
index a744dbfeb5..97576e8481 100644
--- a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
@@ -1,11 +1,7 @@
#include <ydb/core/tx/datashard/ut_common/datashard_ut_common.h>
-#include "change_sender_common_ops.h"
-
-#include <library/cpp/digest/md5/md5.h>
-#include <library/cpp/json/json_reader.h>
-#include <library/cpp/json/json_writer.h>
#include <ydb/core/base/path.h>
+#include <ydb/core/change_exchange/change_sender_common_ops.h>
#include <ydb/core/persqueue/events/global.h>
#include <ydb/core/persqueue/user_info.h>
#include <ydb/core/persqueue/write_meta.h>
@@ -14,6 +10,10 @@
#include <ydb/public/sdk/cpp/client/ydb_persqueue_public/persqueue.h>
#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h>
+#include <library/cpp/digest/md5/md5.h>
+#include <library/cpp/json/json_reader.h>
+#include <library/cpp/json/json_writer.h>
+
#include <util/generic/size_literals.h>
#include <util/string/join.h>
#include <util/string/printf.h>
@@ -2926,7 +2926,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
bool ready = false;
auto prevObserver = runtime.SetObserverFunc([&](TAutoPtr<IEventHandle>& ev) {
- if (ev->GetTypeRewrite() == TEvChangeExchangePrivate::EvReady) {
+ if (ev->GetTypeRewrite() == NChangeExchange::TEvChangeExchangePrivate::EvReady) {
ready = true;
}
@@ -2948,7 +2948,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
THolder<IEventHandle> delayed;
prevObserver = runtime.SetObserverFunc([&](TAutoPtr<IEventHandle>& ev) {
- if (ev->GetTypeRewrite() == TEvChangeExchangePrivate::EvReady) {
+ if (ev->GetTypeRewrite() == NChangeExchange::TEvChangeExchangePrivate::EvReady) {
delayed.Reset(ev.Release());
return TTestActorRuntime::EEventAction::DROP;
}
diff --git a/ydb/core/tx/replication/service/table_writer.cpp b/ydb/core/tx/replication/service/table_writer.cpp
index 0ff61d0307..6bd3cb08bd 100644
--- a/ydb/core/tx/replication/service/table_writer.cpp
+++ b/ydb/core/tx/replication/service/table_writer.cpp
@@ -1,47 +1,277 @@
#include "table_writer.h"
#include "worker.h"
+#include <ydb/core/change_exchange/change_sender_common_ops.h>
+#include <ydb/core/tx/scheme_cache/helpers.h>
#include <ydb/library/actors/core/actor.h>
#include <ydb/library/actors/core/hfunc.h>
namespace NKikimr::NReplication::NService {
-class TLocalTableWriter: public TActor<TLocalTableWriter> {
+class TLocalTableWriter
+ : public TActor<TLocalTableWriter>
+ , public NChangeExchange::TBaseChangeSender
+ , public NChangeExchange::IChangeSenderResolver
+ , private NSchemeCache::TSchemeCacheHelpers
+{
+ static TSerializedTableRange GetFullRange(ui32 keyColumnsCount) {
+ TVector<TCell> fromValues(keyColumnsCount);
+ TVector<TCell> toValues;
+ return TSerializedTableRange(fromValues, true, toValues, false);
+ }
+
+ void LogCritAndLeave(const TString& error) {
+ Y_UNUSED(error);
+ Leave();
+ }
+
+ void LogWarnAndRetry(const TString& error) {
+ Y_UNUSED(error);
+ Retry();
+ }
+
+ template <typename CheckFunc, typename FailFunc, typename T, typename... Args>
+ bool Check(CheckFunc checkFunc, FailFunc failFunc, const T& subject, Args&&... args) {
+ return checkFunc("writer", subject, std::forward<Args>(args)..., std::bind(failFunc, this, std::placeholders::_1));
+ }
+
+ template <typename T>
+ bool CheckNotEmpty(const TAutoPtr<T>& result) {
+ return Check(&TSchemeCacheHelpers::CheckNotEmpty<T>, &TThis::LogCritAndLeave, result);
+ }
+
+ template <typename T>
+ bool CheckEntriesCount(const TAutoPtr<T>& result, ui32 expected) {
+ return Check(&TSchemeCacheHelpers::CheckEntriesCount<T>, &TThis::LogCritAndLeave, result, expected);
+ }
+
+ template <typename T>
+ bool CheckTableId(const T& entry, const TTableId& expected) {
+ return Check(&TSchemeCacheHelpers::CheckTableId<T>, &TThis::LogCritAndLeave, entry, expected);
+ }
+
+ template <typename T>
+ bool CheckEntrySucceeded(const T& entry) {
+ return Check(&TSchemeCacheHelpers::CheckEntrySucceeded<T>, &TThis::LogWarnAndRetry, entry);
+ }
+
+ template <typename T>
+ bool CheckEntryKind(const T& entry, TNavigate::EKind expected) {
+ return Check(&TSchemeCacheHelpers::CheckEntryKind<T>, &TThis::LogCritAndLeave, entry, expected);
+ }
+
+ static TVector<ui64> MakePartitionIds(const TVector<TKeyDesc::TPartitionInfo>& partitions) {
+ TVector<ui64> result(Reserve(partitions.size()));
+
+ for (const auto& partition : partitions) {
+ result.push_back(partition.ShardId);
+ }
+
+ return result;
+ }
+
+ TActorId GetChangeServer() const override {
+ return SelfId();
+ }
+
+ void Resolve() override {
+ ResolveTable();
+ }
+
+ bool IsResolving() const override {
+ return Resolving;
+ }
+
+ bool IsResolved() const override {
+ return KeyDesc && KeyDesc->GetPartitions();
+ }
+
void Handle(TEvWorker::TEvHandshake::TPtr& ev) {
Worker = ev->Sender;
+ ResolveTable();
+ }
+
+ void ResolveTable() {
+ Resolving = true;
+
+ auto request = MakeHolder<TNavigate>();
+ request->ResultSet.emplace_back(MakeNavigateEntry(PathId, TNavigate::OpTable));
+ Send(MakeSchemeCacheID(), new TEvNavigate(request.Release()));
+ }
+
+ void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) {
+ const auto& result = ev->Get()->Request;
+
+ if (!CheckNotEmpty(result)) {
+ return;
+ }
+
+ if (!CheckEntriesCount(result, 1)) {
+ return;
+ }
+
+ const auto& entry = result->ResultSet.at(0);
+
+ if (!CheckTableId(entry, PathId)) {
+ return;
+ }
+
+ if (!CheckEntrySucceeded(entry)) {
+ return;
+ }
+
+ if (!CheckEntryKind(entry, TNavigate::KindTable)) {
+ return;
+ }
+
+ TVector<NScheme::TTypeInfo> keyColumnTypes;
+ for (const auto& [_, column] : entry.Columns) {
+ if (column.KeyOrder < 0) {
+ continue;
+ }
+
+ if (keyColumnTypes.size() <= static_cast<ui32>(column.KeyOrder)) {
+ keyColumnTypes.resize(column.KeyOrder + 1);
+ }
+
+ keyColumnTypes[column.KeyOrder] = column.PType;
+ }
+
+ KeyDesc = MakeHolder<TKeyDesc>(
+ entry.TableId,
+ GetFullRange(keyColumnTypes.size()).ToTableRange(),
+ TKeyDesc::ERowOperation::Update,
+ keyColumnTypes,
+ TVector<TKeyDesc::TColumnOp>()
+ );
+
+ ResolveKeys();
+ }
+
+ void ResolveKeys() {
+ auto request = MakeHolder<TResolve>();
+ request->ResultSet.emplace_back(std::move(KeyDesc));
+ Send(MakeSchemeCacheID(), new TEvResolve(request.Release()));
+ }
+
+ void Handle(TEvTxProxySchemeCache::TEvResolveKeySetResult::TPtr& ev) {
+ const auto& result = ev->Get()->Request;
+
+ if (!CheckNotEmpty(result)) {
+ return;
+ }
+
+ if (!CheckEntriesCount(result, 1)) {
+ return;
+ }
+
+ auto& entry = result->ResultSet.at(0);
+
+ if (!CheckTableId(entry, PathId)) {
+ return;
+ }
+
+ if (!CheckEntrySucceeded(entry)) {
+ return;
+ }
+
+ if (!entry.KeyDescription->GetPartitions()) {
+ return LogWarnAndRetry("Empty partitions");
+ }
+
+ const bool versionChanged = !TableVersion || TableVersion != entry.GeneralVersion;
+ TableVersion = entry.GeneralVersion;
+
+ KeyDesc = std::move(entry.KeyDescription);
+ CreateSenders(MakePartitionIds(KeyDesc->GetPartitions()), versionChanged);
+
Send(Worker, new TEvWorker::TEvHandshake());
+ Resolving = false;
+ }
+
+ IActor* CreateSender(ui64 partitionId) override {
+ Y_UNUSED(partitionId);
+ return nullptr;
+ }
+
+ ui64 GetPartitionId(NChangeExchange::IChangeRecord::TPtr record) const override {
+ Y_UNUSED(record);
+ return 0;
}
void Handle(TEvWorker::TEvData::TPtr& ev) {
Worker = ev->Sender;
- // TODO
+ // TODO: enqueue records
+ }
+
+ void Handle(NChangeExchange::TEvChangeExchange::TEvRequestRecords::TPtr& ev) {
+ Y_UNUSED(ev);
+ // TODO: send records
+ }
+
+ void Handle(NChangeExchange::TEvChangeExchange::TEvRecords::TPtr& ev) {
+ ProcessRecords(std::move(ev->Get()->Records));
+ }
+
+ void Handle(NChangeExchange::TEvChangeExchange::TEvForgetRecords::TPtr& ev) {
+ ForgetRecords(std::move(ev->Get()->Records));
+ }
+
+ void Handle(NChangeExchange::TEvChangeExchangePrivate::TEvReady::TPtr& ev) {
+ OnReady(ev->Get()->PartitionId);
+ }
+
+ void Handle(NChangeExchange::TEvChangeExchangePrivate::TEvGone::TPtr& ev) {
+ OnGone(ev->Get()->PartitionId);
+ }
+
+ void Retry() {
+ Schedule(TDuration::Seconds(1), new TEvents::TEvWakeup());
+ }
+
+ void Leave() {
+ Send(Worker, new TEvents::TEvGone());
+ PassAway();
+ }
+
+ void PassAway() override {
+ KillSenders();
+ TActor::PassAway();
}
public:
- explicit TLocalTableWriter(const TString& path)
+ explicit TLocalTableWriter(const TPathId& tablePathId)
: TActor(&TThis::StateWork)
- , Path(path)
+ , TBaseChangeSender(this, this, tablePathId)
{
- Y_UNUSED(Path);
}
STFUNC(StateWork) {
switch (ev->GetTypeRewrite()) {
hFunc(TEvWorker::TEvHandshake, Handle);
hFunc(TEvWorker::TEvData, Handle);
+ hFunc(NChangeExchange::TEvChangeExchange::TEvRequestRecords, Handle);
+ hFunc(NChangeExchange::TEvChangeExchange::TEvRecords, Handle);
+ hFunc(NChangeExchange::TEvChangeExchange::TEvForgetRecords, Handle);
+ hFunc(NChangeExchange::TEvChangeExchangePrivate::TEvReady, Handle);
+ hFunc(NChangeExchange::TEvChangeExchangePrivate::TEvGone, Handle);
+ hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle);
+ hFunc(TEvTxProxySchemeCache::TEvResolveKeySetResult, Handle);
+ sFunc(TEvents::TEvWakeup, ResolveTable);
sFunc(TEvents::TEvPoison, PassAway);
}
}
private:
- const TString Path;
-
TActorId Worker;
+ ui64 TableVersion = 0;
+ THolder<TKeyDesc> KeyDesc;
+ bool Resolving = false;
}; // TLocalTableWriter
-IActor* CreateLocalTableWriter(const TString& path) {
- return new TLocalTableWriter(path);
+IActor* CreateLocalTableWriter(const TPathId& tablePathId) {
+ return new TLocalTableWriter(tablePathId);
}
}
diff --git a/ydb/core/tx/replication/service/table_writer.h b/ydb/core/tx/replication/service/table_writer.h
index 95ebde89ed..b0c6568be8 100644
--- a/ydb/core/tx/replication/service/table_writer.h
+++ b/ydb/core/tx/replication/service/table_writer.h
@@ -2,8 +2,12 @@
#include <ydb/core/base/defs.h>
+namespace NKikimr {
+ struct TPathId;
+}
+
namespace NKikimr::NReplication::NService {
-IActor* CreateLocalTableWriter(const TString& path);
+IActor* CreateLocalTableWriter(const TPathId& tablePathId);
}
diff --git a/ydb/core/tx/replication/service/topic_reader.cpp b/ydb/core/tx/replication/service/topic_reader.cpp
index 62ccd9ca65..3a549cac3c 100644
--- a/ydb/core/tx/replication/service/topic_reader.cpp
+++ b/ydb/core/tx/replication/service/topic_reader.cpp
@@ -20,7 +20,7 @@ class TRemoteTopicReader: public TActor<TRemoteTopicReader> {
void Handle(TEvYdbProxy::TEvCreateTopicReaderResponse::TPtr& ev) {
ReadSession = ev->Get()->Result;
- Y_ABORT_UNLESS(!Worker);
+ Y_ABORT_UNLESS(Worker);
Send(Worker, new TEvWorker::TEvHandshake());
}
diff --git a/ydb/core/tx/replication/service/ya.make b/ydb/core/tx/replication/service/ya.make
index 9573a41ecb..ac8d233010 100644
--- a/ydb/core/tx/replication/service/ya.make
+++ b/ydb/core/tx/replication/service/ya.make
@@ -2,6 +2,8 @@ LIBRARY()
PEERDIR(
ydb/core/base
+ ydb/core/change_exchange
+ ydb/core/scheme
ydb/core/tx/replication/ydb_proxy
ydb/library/actors/core
)
diff --git a/ydb/core/tx/scheme_cache/helpers.h b/ydb/core/tx/scheme_cache/helpers.h
index 2412505190..e647109653 100644
--- a/ydb/core/tx/scheme_cache/helpers.h
+++ b/ydb/core/tx/scheme_cache/helpers.h
@@ -1,5 +1,6 @@
#pragma once
+#include <ydb/core/base/appdata.h>
#include <ydb/core/tx/scheme_cache/scheme_cache.h>
#include <util/string/builder.h>