diff options
author | Maxim Yurchuk <maxim-yurchuk@ydb.tech> | 2024-09-19 18:35:37 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-09-19 18:35:37 +0300 |
commit | 56fbcfc12d0e9b60d405a93e782ffcb084376446 (patch) | |
tree | 8a437ef1f5f56a3586aa4e1ef4d64a789072fde8 | |
parent | 8d954a54ce7c727c029106c90cba767b82dc1325 (diff) | |
download | ydb-56fbcfc12d0e9b60d405a93e782ffcb084376446.tar.gz |
Revert "Support PG types in CDC (#9337)" (#9526)
-rw-r--r-- | ydb/core/tx/datashard/change_record_cdc_serializer.cpp | 3 | ||||
-rw-r--r-- | ydb/core/tx/datashard/change_sender_cdc_stream.cpp | 10 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_change_exchange.cpp | 25 | ||||
-rw-r--r-- | ydb/core/tx/datashard/export_common.cpp | 6 | ||||
-rw-r--r-- | ydb/core/tx/datashard/export_common.h | 1 | ||||
-rw-r--r-- | ydb/core/tx/datashard/ut_change_exchange/ya.make | 2 |
6 files changed, 6 insertions, 41 deletions
diff --git a/ydb/core/tx/datashard/change_record_cdc_serializer.cpp b/ydb/core/tx/datashard/change_record_cdc_serializer.cpp index fcffe11d67..803a775d10 100644 --- a/ydb/core/tx/datashard/change_record_cdc_serializer.cpp +++ b/ydb/core/tx/datashard/change_record_cdc_serializer.cpp @@ -182,7 +182,8 @@ protected: case NScheme::NTypeIds::Yson: return YsonToJson(cell.AsBuf()); case NScheme::NTypeIds::Pg: - return NJson::TJsonValue(PgToString(cell.AsBuf(), type)); + // TODO: support pg types + Y_ABORT("pg types are not supported"); case NScheme::NTypeIds::Uuid: return NJson::TJsonValue(NUuid::UuidBytesToString(cell.Data())); default: diff --git a/ydb/core/tx/datashard/change_sender_cdc_stream.cpp b/ydb/core/tx/datashard/change_sender_cdc_stream.cpp index de96321c7c..131e5e6dbf 100644 --- a/ydb/core/tx/datashard/change_sender_cdc_stream.cpp +++ b/ydb/core/tx/datashard/change_sender_cdc_stream.cpp @@ -9,7 +9,6 @@ #include <ydb/core/persqueue/partition_key_range/partition_key_range.h> #include <ydb/core/persqueue/writer/source_id_encoding.h> #include <ydb/core/persqueue/writer/writer.h> -#include <ydb/core/scheme/protos/type_info.pb.h> #include <ydb/core/tx/scheme_cache/helpers.h> #include <ydb/core/tx/scheme_cache/scheme_cache.h> #include <ydb/library/actors/core/actor_bootstrapped.h> @@ -620,13 +619,8 @@ class TCdcChangeSenderMain schema.reserve(pqConfig.PartitionKeySchemaSize()); for (const auto& keySchema : pqConfig.GetPartitionKeySchema()) { - if (keySchema.GetTypeId() == NScheme::NTypeIds::Pg) { - schema.push_back(NScheme::TTypeInfo( - keySchema.GetTypeId(), - NPg::TypeDescFromPgTypeId(keySchema.GetTypeInfo().GetPgTypeId()))); - } else { - schema.push_back(NScheme::TTypeInfo(keySchema.GetTypeId())); - } + // TODO: support pg types + schema.push_back(NScheme::TTypeInfo(keySchema.GetTypeId())); } TSet<TPQPartitionInfo, TPQPartitionInfo::TLess> partitions(schema); diff --git a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp index 0d30aa73b1..cfeefb3ed3 100644 --- a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp +++ b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp @@ -838,9 +838,7 @@ Y_UNIT_TEST_SUITE(Cdc) { .SetEnableChangefeedDebeziumJsonFormat(true) .SetEnableTopicMessageMeta(true) .SetEnableChangefeedInitialScan(true) - .SetEnableUuidAsPrimaryKey(true) - .SetEnableTablePgTypes(true) - .SetEnablePgSyntax(true); + .SetEnableUuidAsPrimaryKey(true); Server = new TServer(settings); if (useRealThreads) { @@ -1974,13 +1972,6 @@ Y_UNIT_TEST_SUITE(Cdc) { {"datetime64_value", "Datetime64", false, false}, {"timestamp64_value", "Timestamp64", false, false}, {"interval64_value", "Interval64", false, false}, - {"pgint2_value", "pgint2", false, false}, - {"pgint4_value", "pgint4", false, false}, - {"pgint8_value", "pgint8", false, false}, - {"pgfloat4_value", "pgfloat4", false, false}, - {"pgfloat8_value", "pgfloat8", false, false}, - {"pgbytea_value", "pgbytea", false, false}, - {"pgtext_value", "pgtext", false, false}, }); TopicRunner::Read(table, Updates(NKikimrSchemeOp::ECdcStreamFormatJson), { R"(UPSERT INTO `/Root/Table` (key, int32_value) VALUES (1, -100500);)", @@ -2006,13 +1997,6 @@ Y_UNIT_TEST_SUITE(Cdc) { R"(UPSERT INTO `/Root/Table` (key, datetime64_value) VALUES (21, CAST(1597235696 AS Datetime64));)", R"(UPSERT INTO `/Root/Table` (key, timestamp64_value) VALUES (22, CAST(1597235696123456 AS Timestamp64));)", R"(UPSERT INTO `/Root/Table` (key, interval64_value) VALUES (23, CAST(-300500 AS Interval64));)", - R"(UPSERT INTO `/Root/Table` (key, pgint2_value) VALUES (24, -42ps);)", - R"(UPSERT INTO `/Root/Table` (key, pgint4_value) VALUES (25, -420p);)", - R"(UPSERT INTO `/Root/Table` (key, pgint8_value) VALUES (26, -4200pb);)", - R"(UPSERT INTO `/Root/Table` (key, pgfloat4_value) VALUES (27, 3.1415pf4);)", - R"(UPSERT INTO `/Root/Table` (key, pgfloat8_value) VALUES (28, 2.718pf8);)", - R"(UPSERT INTO `/Root/Table` (key, pgbytea_value) VALUES (29, 'lorem "ipsum"'pb);)", - R"(UPSERT INTO `/Root/Table` (key, pgtext_value) VALUES (30, 'lorem "ipsum"'p);)", }, { R"({"key":[1],"update":{"int32_value":-100500}})", R"({"key":[2],"update":{"uint32_value":100500}})", @@ -2037,13 +2021,6 @@ Y_UNIT_TEST_SUITE(Cdc) { R"({"key":[21],"update":{"datetime64_value":1597235696}})", R"({"key":[22],"update":{"timestamp64_value":1597235696123456}})", R"({"key":[23],"update":{"interval64_value":-300500}})", - R"({"key":[24],"update":{"pgint2_value":"-42"}})", - R"({"key":[25],"update":{"pgint4_value":"-420"}})", - R"({"key":[26],"update":{"pgint8_value":"-4200"}})", - R"({"key":[27],"update":{"pgfloat4_value":"3.1415"}})", - R"({"key":[28],"update":{"pgfloat8_value":"2.718"}})", - R"({"key":[29],"update":{"pgbytea_value":"\\x6c6f72656d2022697073756d22"}})", - R"({"key":[30],"update":{"pgtext_value":"lorem \"ipsum\""}})", }); } diff --git a/ydb/core/tx/datashard/export_common.cpp b/ydb/core/tx/datashard/export_common.cpp index 152b8d1714..53cdc71f21 100644 --- a/ydb/core/tx/datashard/export_common.cpp +++ b/ydb/core/tx/datashard/export_common.cpp @@ -113,12 +113,6 @@ TString DyNumberToString(TStringBuf data) { return result; } -TString PgToString(TStringBuf data, const NScheme::TTypeInfo& typeInfo) { - const NPg::TConvertResult& pgResult = NPg::PgNativeTextFromNativeBinary(data, typeInfo.GetPgTypeDesc()); - Y_ABORT_UNLESS(pgResult.Error.Empty()); - return pgResult.Str; -} - bool DecimalToStream(const std::pair<ui64, i64>& loHi, IOutputStream& out, TString& err) { Y_UNUSED(err); using namespace NYql::NDecimal; diff --git a/ydb/core/tx/datashard/export_common.h b/ydb/core/tx/datashard/export_common.h index 04887f0489..fac97dde1d 100644 --- a/ydb/core/tx/datashard/export_common.h +++ b/ydb/core/tx/datashard/export_common.h @@ -41,7 +41,6 @@ TMaybe<Ydb::Scheme::ModifyPermissionsRequest> GenYdbPermissions( TString DecimalToString(const std::pair<ui64, i64>& loHi); TString DyNumberToString(TStringBuf data); -TString PgToString(TStringBuf data, const NScheme::TTypeInfo& typeInfo); bool DecimalToStream(const std::pair<ui64, i64>& loHi, IOutputStream& out, TString& err); bool DyNumberToStream(TStringBuf data, IOutputStream& out, TString& err); bool PgToStream(TStringBuf data, const NScheme::TTypeInfo& typeInfo, IOutputStream& out, TString& err); diff --git a/ydb/core/tx/datashard/ut_change_exchange/ya.make b/ydb/core/tx/datashard/ut_change_exchange/ya.make index d3a52ea720..d1621d16cf 100644 --- a/ydb/core/tx/datashard/ut_change_exchange/ya.make +++ b/ydb/core/tx/datashard/ut_change_exchange/ya.make @@ -23,7 +23,7 @@ PEERDIR( library/cpp/regex/pcre library/cpp/svnversion ydb/core/kqp/ut/common - ydb/core/testlib/pg + ydb/core/testlib/default ydb/core/tx ydb/library/yql/public/udf/service/exception_policy ydb/public/lib/yson_value |