diff options
| author | Ilnaz Nizametdinov <[email protected]> | 2024-01-18 16:16:22 +0300 |
|---|---|---|
| committer | GitHub <[email protected]> | 2024-01-18 16:16:22 +0300 |
| commit | ff3b5ae4fd0349ed8cf5d24c7cd5e0499275b099 (patch) | |
| tree | b19dae7f19a0b6f584cff1758db53c2042c4e000 | |
| parent | dc7315319aa0f76a302449f3a946e075a3e3be04 (diff) | |
(refactoring) Move TSchemeCacheHelpers to core/tx/scheme_cache KIKIMR-20673 (#1123)
| -rw-r--r-- | ydb/core/tx/datashard/change_exchange_split.cpp | 7 | ||||
| -rw-r--r-- | ydb/core/tx/datashard/change_sender_async_index.cpp | 4 | ||||
| -rw-r--r-- | ydb/core/tx/datashard/change_sender_cdc_stream.cpp | 4 | ||||
| -rw-r--r-- | ydb/core/tx/datashard/change_sender_common_ops.h | 95 | ||||
| -rw-r--r-- | ydb/core/tx/scheme_cache/helpers.h | 105 |
5 files changed, 117 insertions, 98 deletions
diff --git a/ydb/core/tx/datashard/change_exchange_split.cpp b/ydb/core/tx/datashard/change_exchange_split.cpp index 659a9a35862..0bcd44390ed 100644 --- a/ydb/core/tx/datashard/change_exchange_split.cpp +++ b/ydb/core/tx/datashard/change_exchange_split.cpp @@ -6,6 +6,8 @@ #include <ydb/core/base/tablet_pipe.h> #include <ydb/core/persqueue/events/global.h> #include <ydb/core/persqueue/writer/source_id_encoding.h> +#include <ydb/core/tx/scheme_cache/helpers.h> +#include <ydb/core/tx/scheme_cache/scheme_cache.h> #include <ydb/public/lib/base/msgbus_status.h> #include <ydb/library/actors/core/actor_bootstrapped.h> @@ -155,7 +157,10 @@ private: }; // TCdcPartitionWorker -class TCdcWorker: public TActorBootstrapped<TCdcWorker>, private TSchemeCacheHelpers { +class TCdcWorker + : public TActorBootstrapped<TCdcWorker> + , private NSchemeCache::TSchemeCacheHelpers +{ TStringBuf GetLogPrefix() const { if (!LogPrefix) { LogPrefix = TStringBuilder() diff --git a/ydb/core/tx/datashard/change_sender_async_index.cpp b/ydb/core/tx/datashard/change_sender_async_index.cpp index 54f55fed62a..05f6b279723 100644 --- a/ydb/core/tx/datashard/change_sender_async_index.cpp +++ b/ydb/core/tx/datashard/change_sender_async_index.cpp @@ -7,6 +7,8 @@ #include <ydb/core/base/tablet_pipecache.h> #include <ydb/library/services/services.pb.h> #include <ydb/core/tablet_flat/flat_row_eggs.h> +#include <ydb/core/tx/scheme_cache/helpers.h> +#include <ydb/core/tx/scheme_cache/scheme_cache.h> #include <ydb/core/tx/tx_proxy/proxy.h> #include <ydb/library/yql/public/udf/udf_data_type.h> @@ -325,7 +327,7 @@ class TAsyncIndexChangeSenderMain : public TActorBootstrapped<TAsyncIndexChangeSenderMain> , public TBaseChangeSender , public IChangeSenderResolver - , private TSchemeCacheHelpers + , private NSchemeCache::TSchemeCacheHelpers { TStringBuf GetLogPrefix() const { if (!LogPrefix) { diff --git a/ydb/core/tx/datashard/change_sender_cdc_stream.cpp b/ydb/core/tx/datashard/change_sender_cdc_stream.cpp index 1eb33600213..f2334800a2c 100644 --- a/ydb/core/tx/datashard/change_sender_cdc_stream.cpp +++ b/ydb/core/tx/datashard/change_sender_cdc_stream.cpp @@ -8,6 +8,8 @@ #include <ydb/core/persqueue/partition_key_range/partition_key_range.h> #include <ydb/core/persqueue/writer/source_id_encoding.h> #include <ydb/core/persqueue/writer/writer.h> +#include <ydb/core/tx/scheme_cache/helpers.h> +#include <ydb/core/tx/scheme_cache/scheme_cache.h> #include <ydb/services/lib/sharding/sharding.h> #include <ydb/library/actors/core/actor_bootstrapped.h> @@ -288,7 +290,7 @@ class TCdcChangeSenderMain : public TActorBootstrapped<TCdcChangeSenderMain> , public TBaseChangeSender , public IChangeSenderResolver - , private TSchemeCacheHelpers + , private NSchemeCache::TSchemeCacheHelpers { struct TPQPartitionInfo { ui32 PartitionId; diff --git a/ydb/core/tx/datashard/change_sender_common_ops.h b/ydb/core/tx/datashard/change_sender_common_ops.h index dc98913bb34..e9c6a5d5d8b 100644 --- a/ydb/core/tx/datashard/change_sender_common_ops.h +++ b/ydb/core/tx/datashard/change_sender_common_ops.h @@ -4,7 +4,6 @@ #include "change_exchange_helpers.h" #include <ydb/core/base/appdata.h> -#include <ydb/core/tx/scheme_cache/scheme_cache.h> #include <ydb/library/actors/core/actor.h> #include <ydb/library/actors/core/hfunc.h> @@ -170,99 +169,5 @@ private: }; // TBaseChangeSender -struct TSchemeCacheHelpers { - using TNavigate = NSchemeCache::TSchemeCacheNavigate; - using TEvNavigate = TEvTxProxySchemeCache::TEvNavigateKeySet; - using TResolve = NSchemeCache::TSchemeCacheRequest; - using TEvResolve = TEvTxProxySchemeCache::TEvResolveKeySet; - using TCheckFailFunc = std::function<void(const TString&)>; - - inline static TNavigate::TEntry MakeNavigateEntry(const TTableId& tableId, TNavigate::EOp op) { - TNavigate::TEntry entry; - entry.RequestType = TNavigate::TEntry::ERequestType::ByTableId; - entry.TableId = tableId; - entry.Operation = op; - entry.ShowPrivatePath = true; - return entry; - } - - template <typename T> - static bool CheckNotEmpty(const TStringBuf marker, const TAutoPtr<T>& result, TCheckFailFunc onFailure) { - if (result) { - return true; - } - - onFailure(TStringBuilder() << "Empty result at '" << marker << "'"); - return false; - } - - template <typename T> - static bool CheckEntriesCount(const TStringBuf marker, const TAutoPtr<T>& result, ui32 expected, TCheckFailFunc onFailure) { - if (result->ResultSet.size() == expected) { - return true; - } - - onFailure(TStringBuilder() << "Unexpected entries count at '" << marker << "'" - << ": expected# " << expected - << ", got# " << result->ResultSet.size() - << ", result# " << result->ToString(*AppData()->TypeRegistry)); - return false; - } - - inline static const TTableId& GetTableId(const TNavigate::TEntry& entry) { - return entry.TableId; - } - - inline static const TTableId& GetTableId(const TResolve::TEntry& entry) { - return entry.KeyDescription->TableId; - } - - template <typename T> - static bool CheckTableId(const TStringBuf marker, const T& entry, const TTableId& expected, TCheckFailFunc onFailure) { - if (GetTableId(entry).HasSamePath(expected)) { - return true; - } - - onFailure(TStringBuilder() << "Unexpected table id at '" << marker << "'" - << ": expected# " << expected - << ", got# " << GetTableId(entry) - << ", entry# " << entry.ToString()); - return false; - } - - inline static bool IsSucceeded(TNavigate::EStatus status) { - return status == TNavigate::EStatus::Ok; - } - - inline static bool IsSucceeded(TResolve::EStatus status) { - return status == TResolve::EStatus::OkData; - } - - template <typename T> - static bool CheckEntrySucceeded(const TStringBuf marker, const T& entry, TCheckFailFunc onFailure) { - if (IsSucceeded(entry.Status)) { - return true; - } - - onFailure(TStringBuilder() << "Failed entry at '" << marker << "'" - << ": entry# " << entry.ToString()); - return false; - } - - template <typename T> - static bool CheckEntryKind(const TStringBuf marker, const T& entry, TNavigate::EKind expected, TCheckFailFunc onFailure) { - if (entry.Kind == expected) { - return true; - } - - onFailure(TStringBuilder() << "Unexpected entry kind at '" << marker << "'" - << ", expected# " << static_cast<ui32>(expected) - << ", got# " << static_cast<ui32>(entry.Kind) - << ", entry# " << entry.ToString()); - return false; - } - -}; // TSchemeCacheHelpers - } // NDataShard } // NKikimr diff --git a/ydb/core/tx/scheme_cache/helpers.h b/ydb/core/tx/scheme_cache/helpers.h new file mode 100644 index 00000000000..2412505190f --- /dev/null +++ b/ydb/core/tx/scheme_cache/helpers.h @@ -0,0 +1,105 @@ +#pragma once + +#include <ydb/core/tx/scheme_cache/scheme_cache.h> + +#include <util/string/builder.h> + +#include <functional> + +namespace NKikimr::NSchemeCache { + +struct TSchemeCacheHelpers { + using TNavigate = TSchemeCacheNavigate; + using TEvNavigate = TEvTxProxySchemeCache::TEvNavigateKeySet; + using TResolve = TSchemeCacheRequest; + using TEvResolve = TEvTxProxySchemeCache::TEvResolveKeySet; + using TCheckFailFunc = std::function<void(const TString&)>; + + inline static TNavigate::TEntry MakeNavigateEntry(const TTableId& tableId, TNavigate::EOp op) { + TNavigate::TEntry entry; + entry.RequestType = TNavigate::TEntry::ERequestType::ByTableId; + entry.TableId = tableId; + entry.Operation = op; + entry.ShowPrivatePath = true; + return entry; + } + + template <typename T> + static bool CheckNotEmpty(const TStringBuf marker, const TAutoPtr<T>& result, TCheckFailFunc onFailure) { + if (result) { + return true; + } + + onFailure(TStringBuilder() << "Empty result at '" << marker << "'"); + return false; + } + + template <typename T> + static bool CheckEntriesCount(const TStringBuf marker, const TAutoPtr<T>& result, ui32 expected, TCheckFailFunc onFailure) { + if (result->ResultSet.size() == expected) { + return true; + } + + onFailure(TStringBuilder() << "Unexpected entries count at '" << marker << "'" + << ": expected# " << expected + << ", got# " << result->ResultSet.size() + << ", result# " << result->ToString(*AppData()->TypeRegistry)); + return false; + } + + inline static const TTableId& GetTableId(const TNavigate::TEntry& entry) { + return entry.TableId; + } + + inline static const TTableId& GetTableId(const TResolve::TEntry& entry) { + return entry.KeyDescription->TableId; + } + + template <typename T> + static bool CheckTableId(const TStringBuf marker, const T& entry, const TTableId& expected, TCheckFailFunc onFailure) { + if (GetTableId(entry).HasSamePath(expected)) { + return true; + } + + onFailure(TStringBuilder() << "Unexpected table id at '" << marker << "'" + << ": expected# " << expected + << ", got# " << GetTableId(entry) + << ", entry# " << entry.ToString()); + return false; + } + + inline static bool IsSucceeded(TNavigate::EStatus status) { + return status == TNavigate::EStatus::Ok; + } + + inline static bool IsSucceeded(TResolve::EStatus status) { + return status == TResolve::EStatus::OkData; + } + + template <typename T> + static bool CheckEntrySucceeded(const TStringBuf marker, const T& entry, TCheckFailFunc onFailure) { + if (IsSucceeded(entry.Status)) { + return true; + } + + onFailure(TStringBuilder() << "Failed entry at '" << marker << "'" + << ": entry# " << entry.ToString()); + return false; + } + + template <typename T> + static bool CheckEntryKind(const TStringBuf marker, const T& entry, TNavigate::EKind expected, TCheckFailFunc onFailure) { + if (entry.Kind == expected) { + return true; + } + + onFailure(TStringBuilder() << "Unexpected entry kind at '" << marker << "'" + << ", expected# " << static_cast<ui32>(expected) + << ", got# " << static_cast<ui32>(entry.Kind) + << ", entry# " << entry.ToString()); + return false; + } + +}; // TSchemeCacheHelpers + +} |
