summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--ydb/core/tx/datashard/change_exchange_split.cpp7
-rw-r--r--ydb/core/tx/datashard/change_sender_async_index.cpp4
-rw-r--r--ydb/core/tx/datashard/change_sender_cdc_stream.cpp4
-rw-r--r--ydb/core/tx/datashard/change_sender_common_ops.h95
-rw-r--r--ydb/core/tx/scheme_cache/helpers.h105
5 files changed, 117 insertions, 98 deletions
diff --git a/ydb/core/tx/datashard/change_exchange_split.cpp b/ydb/core/tx/datashard/change_exchange_split.cpp
index 659a9a35862..0bcd44390ed 100644
--- a/ydb/core/tx/datashard/change_exchange_split.cpp
+++ b/ydb/core/tx/datashard/change_exchange_split.cpp
@@ -6,6 +6,8 @@
#include <ydb/core/base/tablet_pipe.h>
#include <ydb/core/persqueue/events/global.h>
#include <ydb/core/persqueue/writer/source_id_encoding.h>
+#include <ydb/core/tx/scheme_cache/helpers.h>
+#include <ydb/core/tx/scheme_cache/scheme_cache.h>
#include <ydb/public/lib/base/msgbus_status.h>
#include <ydb/library/actors/core/actor_bootstrapped.h>
@@ -155,7 +157,10 @@ private:
}; // TCdcPartitionWorker
-class TCdcWorker: public TActorBootstrapped<TCdcWorker>, private TSchemeCacheHelpers {
+class TCdcWorker
+ : public TActorBootstrapped<TCdcWorker>
+ , private NSchemeCache::TSchemeCacheHelpers
+{
TStringBuf GetLogPrefix() const {
if (!LogPrefix) {
LogPrefix = TStringBuilder()
diff --git a/ydb/core/tx/datashard/change_sender_async_index.cpp b/ydb/core/tx/datashard/change_sender_async_index.cpp
index 54f55fed62a..05f6b279723 100644
--- a/ydb/core/tx/datashard/change_sender_async_index.cpp
+++ b/ydb/core/tx/datashard/change_sender_async_index.cpp
@@ -7,6 +7,8 @@
#include <ydb/core/base/tablet_pipecache.h>
#include <ydb/library/services/services.pb.h>
#include <ydb/core/tablet_flat/flat_row_eggs.h>
+#include <ydb/core/tx/scheme_cache/helpers.h>
+#include <ydb/core/tx/scheme_cache/scheme_cache.h>
#include <ydb/core/tx/tx_proxy/proxy.h>
#include <ydb/library/yql/public/udf/udf_data_type.h>
@@ -325,7 +327,7 @@ class TAsyncIndexChangeSenderMain
: public TActorBootstrapped<TAsyncIndexChangeSenderMain>
, public TBaseChangeSender
, public IChangeSenderResolver
- , private TSchemeCacheHelpers
+ , private NSchemeCache::TSchemeCacheHelpers
{
TStringBuf GetLogPrefix() const {
if (!LogPrefix) {
diff --git a/ydb/core/tx/datashard/change_sender_cdc_stream.cpp b/ydb/core/tx/datashard/change_sender_cdc_stream.cpp
index 1eb33600213..f2334800a2c 100644
--- a/ydb/core/tx/datashard/change_sender_cdc_stream.cpp
+++ b/ydb/core/tx/datashard/change_sender_cdc_stream.cpp
@@ -8,6 +8,8 @@
#include <ydb/core/persqueue/partition_key_range/partition_key_range.h>
#include <ydb/core/persqueue/writer/source_id_encoding.h>
#include <ydb/core/persqueue/writer/writer.h>
+#include <ydb/core/tx/scheme_cache/helpers.h>
+#include <ydb/core/tx/scheme_cache/scheme_cache.h>
#include <ydb/services/lib/sharding/sharding.h>
#include <ydb/library/actors/core/actor_bootstrapped.h>
@@ -288,7 +290,7 @@ class TCdcChangeSenderMain
: public TActorBootstrapped<TCdcChangeSenderMain>
, public TBaseChangeSender
, public IChangeSenderResolver
- , private TSchemeCacheHelpers
+ , private NSchemeCache::TSchemeCacheHelpers
{
struct TPQPartitionInfo {
ui32 PartitionId;
diff --git a/ydb/core/tx/datashard/change_sender_common_ops.h b/ydb/core/tx/datashard/change_sender_common_ops.h
index dc98913bb34..e9c6a5d5d8b 100644
--- a/ydb/core/tx/datashard/change_sender_common_ops.h
+++ b/ydb/core/tx/datashard/change_sender_common_ops.h
@@ -4,7 +4,6 @@
#include "change_exchange_helpers.h"
#include <ydb/core/base/appdata.h>
-#include <ydb/core/tx/scheme_cache/scheme_cache.h>
#include <ydb/library/actors/core/actor.h>
#include <ydb/library/actors/core/hfunc.h>
@@ -170,99 +169,5 @@ private:
}; // TBaseChangeSender
-struct TSchemeCacheHelpers {
- using TNavigate = NSchemeCache::TSchemeCacheNavigate;
- using TEvNavigate = TEvTxProxySchemeCache::TEvNavigateKeySet;
- using TResolve = NSchemeCache::TSchemeCacheRequest;
- using TEvResolve = TEvTxProxySchemeCache::TEvResolveKeySet;
- using TCheckFailFunc = std::function<void(const TString&)>;
-
- inline static TNavigate::TEntry MakeNavigateEntry(const TTableId& tableId, TNavigate::EOp op) {
- TNavigate::TEntry entry;
- entry.RequestType = TNavigate::TEntry::ERequestType::ByTableId;
- entry.TableId = tableId;
- entry.Operation = op;
- entry.ShowPrivatePath = true;
- return entry;
- }
-
- template <typename T>
- static bool CheckNotEmpty(const TStringBuf marker, const TAutoPtr<T>& result, TCheckFailFunc onFailure) {
- if (result) {
- return true;
- }
-
- onFailure(TStringBuilder() << "Empty result at '" << marker << "'");
- return false;
- }
-
- template <typename T>
- static bool CheckEntriesCount(const TStringBuf marker, const TAutoPtr<T>& result, ui32 expected, TCheckFailFunc onFailure) {
- if (result->ResultSet.size() == expected) {
- return true;
- }
-
- onFailure(TStringBuilder() << "Unexpected entries count at '" << marker << "'"
- << ": expected# " << expected
- << ", got# " << result->ResultSet.size()
- << ", result# " << result->ToString(*AppData()->TypeRegistry));
- return false;
- }
-
- inline static const TTableId& GetTableId(const TNavigate::TEntry& entry) {
- return entry.TableId;
- }
-
- inline static const TTableId& GetTableId(const TResolve::TEntry& entry) {
- return entry.KeyDescription->TableId;
- }
-
- template <typename T>
- static bool CheckTableId(const TStringBuf marker, const T& entry, const TTableId& expected, TCheckFailFunc onFailure) {
- if (GetTableId(entry).HasSamePath(expected)) {
- return true;
- }
-
- onFailure(TStringBuilder() << "Unexpected table id at '" << marker << "'"
- << ": expected# " << expected
- << ", got# " << GetTableId(entry)
- << ", entry# " << entry.ToString());
- return false;
- }
-
- inline static bool IsSucceeded(TNavigate::EStatus status) {
- return status == TNavigate::EStatus::Ok;
- }
-
- inline static bool IsSucceeded(TResolve::EStatus status) {
- return status == TResolve::EStatus::OkData;
- }
-
- template <typename T>
- static bool CheckEntrySucceeded(const TStringBuf marker, const T& entry, TCheckFailFunc onFailure) {
- if (IsSucceeded(entry.Status)) {
- return true;
- }
-
- onFailure(TStringBuilder() << "Failed entry at '" << marker << "'"
- << ": entry# " << entry.ToString());
- return false;
- }
-
- template <typename T>
- static bool CheckEntryKind(const TStringBuf marker, const T& entry, TNavigate::EKind expected, TCheckFailFunc onFailure) {
- if (entry.Kind == expected) {
- return true;
- }
-
- onFailure(TStringBuilder() << "Unexpected entry kind at '" << marker << "'"
- << ", expected# " << static_cast<ui32>(expected)
- << ", got# " << static_cast<ui32>(entry.Kind)
- << ", entry# " << entry.ToString());
- return false;
- }
-
-}; // TSchemeCacheHelpers
-
} // NDataShard
} // NKikimr
diff --git a/ydb/core/tx/scheme_cache/helpers.h b/ydb/core/tx/scheme_cache/helpers.h
new file mode 100644
index 00000000000..2412505190f
--- /dev/null
+++ b/ydb/core/tx/scheme_cache/helpers.h
@@ -0,0 +1,105 @@
+#pragma once
+
+#include <ydb/core/tx/scheme_cache/scheme_cache.h>
+
+#include <util/string/builder.h>
+
+#include <functional>
+
+namespace NKikimr::NSchemeCache {
+
+struct TSchemeCacheHelpers {
+ using TNavigate = TSchemeCacheNavigate;
+ using TEvNavigate = TEvTxProxySchemeCache::TEvNavigateKeySet;
+ using TResolve = TSchemeCacheRequest;
+ using TEvResolve = TEvTxProxySchemeCache::TEvResolveKeySet;
+ using TCheckFailFunc = std::function<void(const TString&)>;
+
+ inline static TNavigate::TEntry MakeNavigateEntry(const TTableId& tableId, TNavigate::EOp op) {
+ TNavigate::TEntry entry;
+ entry.RequestType = TNavigate::TEntry::ERequestType::ByTableId;
+ entry.TableId = tableId;
+ entry.Operation = op;
+ entry.ShowPrivatePath = true;
+ return entry;
+ }
+
+ template <typename T>
+ static bool CheckNotEmpty(const TStringBuf marker, const TAutoPtr<T>& result, TCheckFailFunc onFailure) {
+ if (result) {
+ return true;
+ }
+
+ onFailure(TStringBuilder() << "Empty result at '" << marker << "'");
+ return false;
+ }
+
+ template <typename T>
+ static bool CheckEntriesCount(const TStringBuf marker, const TAutoPtr<T>& result, ui32 expected, TCheckFailFunc onFailure) {
+ if (result->ResultSet.size() == expected) {
+ return true;
+ }
+
+ onFailure(TStringBuilder() << "Unexpected entries count at '" << marker << "'"
+ << ": expected# " << expected
+ << ", got# " << result->ResultSet.size()
+ << ", result# " << result->ToString(*AppData()->TypeRegistry));
+ return false;
+ }
+
+ inline static const TTableId& GetTableId(const TNavigate::TEntry& entry) {
+ return entry.TableId;
+ }
+
+ inline static const TTableId& GetTableId(const TResolve::TEntry& entry) {
+ return entry.KeyDescription->TableId;
+ }
+
+ template <typename T>
+ static bool CheckTableId(const TStringBuf marker, const T& entry, const TTableId& expected, TCheckFailFunc onFailure) {
+ if (GetTableId(entry).HasSamePath(expected)) {
+ return true;
+ }
+
+ onFailure(TStringBuilder() << "Unexpected table id at '" << marker << "'"
+ << ": expected# " << expected
+ << ", got# " << GetTableId(entry)
+ << ", entry# " << entry.ToString());
+ return false;
+ }
+
+ inline static bool IsSucceeded(TNavigate::EStatus status) {
+ return status == TNavigate::EStatus::Ok;
+ }
+
+ inline static bool IsSucceeded(TResolve::EStatus status) {
+ return status == TResolve::EStatus::OkData;
+ }
+
+ template <typename T>
+ static bool CheckEntrySucceeded(const TStringBuf marker, const T& entry, TCheckFailFunc onFailure) {
+ if (IsSucceeded(entry.Status)) {
+ return true;
+ }
+
+ onFailure(TStringBuilder() << "Failed entry at '" << marker << "'"
+ << ": entry# " << entry.ToString());
+ return false;
+ }
+
+ template <typename T>
+ static bool CheckEntryKind(const TStringBuf marker, const T& entry, TNavigate::EKind expected, TCheckFailFunc onFailure) {
+ if (entry.Kind == expected) {
+ return true;
+ }
+
+ onFailure(TStringBuilder() << "Unexpected entry kind at '" << marker << "'"
+ << ", expected# " << static_cast<ui32>(expected)
+ << ", got# " << static_cast<ui32>(entry.Kind)
+ << ", entry# " << entry.ToString());
+ return false;
+ }
+
+}; // TSchemeCacheHelpers
+
+}