aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMaxim Yurchuk <maxim-yurchuk@ydb.tech>2024-09-19 18:35:37 +0300
committerGitHub <noreply@github.com>2024-09-19 18:35:37 +0300
commit56fbcfc12d0e9b60d405a93e782ffcb084376446 (patch)
tree8a437ef1f5f56a3586aa4e1ef4d64a789072fde8
parent8d954a54ce7c727c029106c90cba767b82dc1325 (diff)
downloadydb-56fbcfc12d0e9b60d405a93e782ffcb084376446.tar.gz
Revert "Support PG types in CDC (#9337)" (#9526)
-rw-r--r--ydb/core/tx/datashard/change_record_cdc_serializer.cpp3
-rw-r--r--ydb/core/tx/datashard/change_sender_cdc_stream.cpp10
-rw-r--r--ydb/core/tx/datashard/datashard_ut_change_exchange.cpp25
-rw-r--r--ydb/core/tx/datashard/export_common.cpp6
-rw-r--r--ydb/core/tx/datashard/export_common.h1
-rw-r--r--ydb/core/tx/datashard/ut_change_exchange/ya.make2
6 files changed, 6 insertions, 41 deletions
diff --git a/ydb/core/tx/datashard/change_record_cdc_serializer.cpp b/ydb/core/tx/datashard/change_record_cdc_serializer.cpp
index fcffe11d67..803a775d10 100644
--- a/ydb/core/tx/datashard/change_record_cdc_serializer.cpp
+++ b/ydb/core/tx/datashard/change_record_cdc_serializer.cpp
@@ -182,7 +182,8 @@ protected:
case NScheme::NTypeIds::Yson:
return YsonToJson(cell.AsBuf());
case NScheme::NTypeIds::Pg:
- return NJson::TJsonValue(PgToString(cell.AsBuf(), type));
+ // TODO: support pg types
+ Y_ABORT("pg types are not supported");
case NScheme::NTypeIds::Uuid:
return NJson::TJsonValue(NUuid::UuidBytesToString(cell.Data()));
default:
diff --git a/ydb/core/tx/datashard/change_sender_cdc_stream.cpp b/ydb/core/tx/datashard/change_sender_cdc_stream.cpp
index de96321c7c..131e5e6dbf 100644
--- a/ydb/core/tx/datashard/change_sender_cdc_stream.cpp
+++ b/ydb/core/tx/datashard/change_sender_cdc_stream.cpp
@@ -9,7 +9,6 @@
#include <ydb/core/persqueue/partition_key_range/partition_key_range.h>
#include <ydb/core/persqueue/writer/source_id_encoding.h>
#include <ydb/core/persqueue/writer/writer.h>
-#include <ydb/core/scheme/protos/type_info.pb.h>
#include <ydb/core/tx/scheme_cache/helpers.h>
#include <ydb/core/tx/scheme_cache/scheme_cache.h>
#include <ydb/library/actors/core/actor_bootstrapped.h>
@@ -620,13 +619,8 @@ class TCdcChangeSenderMain
schema.reserve(pqConfig.PartitionKeySchemaSize());
for (const auto& keySchema : pqConfig.GetPartitionKeySchema()) {
- if (keySchema.GetTypeId() == NScheme::NTypeIds::Pg) {
- schema.push_back(NScheme::TTypeInfo(
- keySchema.GetTypeId(),
- NPg::TypeDescFromPgTypeId(keySchema.GetTypeInfo().GetPgTypeId())));
- } else {
- schema.push_back(NScheme::TTypeInfo(keySchema.GetTypeId()));
- }
+ // TODO: support pg types
+ schema.push_back(NScheme::TTypeInfo(keySchema.GetTypeId()));
}
TSet<TPQPartitionInfo, TPQPartitionInfo::TLess> partitions(schema);
diff --git a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
index 0d30aa73b1..cfeefb3ed3 100644
--- a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
@@ -838,9 +838,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
.SetEnableChangefeedDebeziumJsonFormat(true)
.SetEnableTopicMessageMeta(true)
.SetEnableChangefeedInitialScan(true)
- .SetEnableUuidAsPrimaryKey(true)
- .SetEnableTablePgTypes(true)
- .SetEnablePgSyntax(true);
+ .SetEnableUuidAsPrimaryKey(true);
Server = new TServer(settings);
if (useRealThreads) {
@@ -1974,13 +1972,6 @@ Y_UNIT_TEST_SUITE(Cdc) {
{"datetime64_value", "Datetime64", false, false},
{"timestamp64_value", "Timestamp64", false, false},
{"interval64_value", "Interval64", false, false},
- {"pgint2_value", "pgint2", false, false},
- {"pgint4_value", "pgint4", false, false},
- {"pgint8_value", "pgint8", false, false},
- {"pgfloat4_value", "pgfloat4", false, false},
- {"pgfloat8_value", "pgfloat8", false, false},
- {"pgbytea_value", "pgbytea", false, false},
- {"pgtext_value", "pgtext", false, false},
});
TopicRunner::Read(table, Updates(NKikimrSchemeOp::ECdcStreamFormatJson), {
R"(UPSERT INTO `/Root/Table` (key, int32_value) VALUES (1, -100500);)",
@@ -2006,13 +1997,6 @@ Y_UNIT_TEST_SUITE(Cdc) {
R"(UPSERT INTO `/Root/Table` (key, datetime64_value) VALUES (21, CAST(1597235696 AS Datetime64));)",
R"(UPSERT INTO `/Root/Table` (key, timestamp64_value) VALUES (22, CAST(1597235696123456 AS Timestamp64));)",
R"(UPSERT INTO `/Root/Table` (key, interval64_value) VALUES (23, CAST(-300500 AS Interval64));)",
- R"(UPSERT INTO `/Root/Table` (key, pgint2_value) VALUES (24, -42ps);)",
- R"(UPSERT INTO `/Root/Table` (key, pgint4_value) VALUES (25, -420p);)",
- R"(UPSERT INTO `/Root/Table` (key, pgint8_value) VALUES (26, -4200pb);)",
- R"(UPSERT INTO `/Root/Table` (key, pgfloat4_value) VALUES (27, 3.1415pf4);)",
- R"(UPSERT INTO `/Root/Table` (key, pgfloat8_value) VALUES (28, 2.718pf8);)",
- R"(UPSERT INTO `/Root/Table` (key, pgbytea_value) VALUES (29, 'lorem "ipsum"'pb);)",
- R"(UPSERT INTO `/Root/Table` (key, pgtext_value) VALUES (30, 'lorem "ipsum"'p);)",
}, {
R"({"key":[1],"update":{"int32_value":-100500}})",
R"({"key":[2],"update":{"uint32_value":100500}})",
@@ -2037,13 +2021,6 @@ Y_UNIT_TEST_SUITE(Cdc) {
R"({"key":[21],"update":{"datetime64_value":1597235696}})",
R"({"key":[22],"update":{"timestamp64_value":1597235696123456}})",
R"({"key":[23],"update":{"interval64_value":-300500}})",
- R"({"key":[24],"update":{"pgint2_value":"-42"}})",
- R"({"key":[25],"update":{"pgint4_value":"-420"}})",
- R"({"key":[26],"update":{"pgint8_value":"-4200"}})",
- R"({"key":[27],"update":{"pgfloat4_value":"3.1415"}})",
- R"({"key":[28],"update":{"pgfloat8_value":"2.718"}})",
- R"({"key":[29],"update":{"pgbytea_value":"\\x6c6f72656d2022697073756d22"}})",
- R"({"key":[30],"update":{"pgtext_value":"lorem \"ipsum\""}})",
});
}
diff --git a/ydb/core/tx/datashard/export_common.cpp b/ydb/core/tx/datashard/export_common.cpp
index 152b8d1714..53cdc71f21 100644
--- a/ydb/core/tx/datashard/export_common.cpp
+++ b/ydb/core/tx/datashard/export_common.cpp
@@ -113,12 +113,6 @@ TString DyNumberToString(TStringBuf data) {
return result;
}
-TString PgToString(TStringBuf data, const NScheme::TTypeInfo& typeInfo) {
- const NPg::TConvertResult& pgResult = NPg::PgNativeTextFromNativeBinary(data, typeInfo.GetPgTypeDesc());
- Y_ABORT_UNLESS(pgResult.Error.Empty());
- return pgResult.Str;
-}
-
bool DecimalToStream(const std::pair<ui64, i64>& loHi, IOutputStream& out, TString& err) {
Y_UNUSED(err);
using namespace NYql::NDecimal;
diff --git a/ydb/core/tx/datashard/export_common.h b/ydb/core/tx/datashard/export_common.h
index 04887f0489..fac97dde1d 100644
--- a/ydb/core/tx/datashard/export_common.h
+++ b/ydb/core/tx/datashard/export_common.h
@@ -41,7 +41,6 @@ TMaybe<Ydb::Scheme::ModifyPermissionsRequest> GenYdbPermissions(
TString DecimalToString(const std::pair<ui64, i64>& loHi);
TString DyNumberToString(TStringBuf data);
-TString PgToString(TStringBuf data, const NScheme::TTypeInfo& typeInfo);
bool DecimalToStream(const std::pair<ui64, i64>& loHi, IOutputStream& out, TString& err);
bool DyNumberToStream(TStringBuf data, IOutputStream& out, TString& err);
bool PgToStream(TStringBuf data, const NScheme::TTypeInfo& typeInfo, IOutputStream& out, TString& err);
diff --git a/ydb/core/tx/datashard/ut_change_exchange/ya.make b/ydb/core/tx/datashard/ut_change_exchange/ya.make
index d3a52ea720..d1621d16cf 100644
--- a/ydb/core/tx/datashard/ut_change_exchange/ya.make
+++ b/ydb/core/tx/datashard/ut_change_exchange/ya.make
@@ -23,7 +23,7 @@ PEERDIR(
library/cpp/regex/pcre
library/cpp/svnversion
ydb/core/kqp/ut/common
- ydb/core/testlib/pg
+ ydb/core/testlib/default
ydb/core/tx
ydb/library/yql/public/udf/service/exception_policy
ydb/public/lib/yson_value