diff options
author | azevaykin <145343289+azevaykin@users.noreply.github.com> | 2024-06-21 12:42:28 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-06-21 12:42:28 +0300 |
commit | fdd25a54021d544dd5cff6a2f546f3953f24f1d4 (patch) | |
tree | 2c08c8438982ae46538f678faabf2314c150b9db | |
parent | 2db2848755cc0e652fdf28ff1263519f6d52b0ad (diff) | |
download | ydb-fdd25a54021d544dd5cff6a2f546f3953f24f1d4.tar.gz |
Big datetime in Datashard Export/Import (#5761)
7 files changed, 220 insertions, 16 deletions
diff --git a/ydb/core/tx/datashard/export_s3_buffer_raw.cpp b/ydb/core/tx/datashard/export_s3_buffer_raw.cpp index 649c46735e..9db04d1e08 100644 --- a/ydb/core/tx/datashard/export_s3_buffer_raw.cpp +++ b/ydb/core/tx/datashard/export_s3_buffer_raw.cpp @@ -105,6 +105,14 @@ bool TS3BufferRaw::Collect(const NTable::IScan::TRow& row, IOutputStream& out) { case NScheme::NTypeIds::Interval: serialized = cell.ToStream<i64>(out, ErrorString); break; + case NScheme::NTypeIds::Date32: + serialized = cell.ToStream<i32>(out, ErrorString); + break; + case NScheme::NTypeIds::Datetime64: + case NScheme::NTypeIds::Timestamp64: + case NScheme::NTypeIds::Interval64: + serialized = cell.ToStream<i64>(out, ErrorString); + break; case NScheme::NTypeIds::Decimal: serialized = DecimalToStream(cell.AsValue<std::pair<ui64, i64>>(), out, ErrorString); break; diff --git a/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/ut_cdc_stream_reboots.cpp b/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/ut_cdc_stream_reboots.cpp index 1224286150..42ecc9f639 100644 --- a/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/ut_cdc_stream_reboots.cpp +++ b/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/ut_cdc_stream_reboots.cpp @@ -649,8 +649,8 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) { { TInactiveZone inactive(activeZone); - UploadRows(runtime, "/MyRoot/Table", 0, {1}, {2}, {1}); - UploadRows(runtime, "/MyRoot/Table", 1, {1}, {2}, {Max<ui32>()}); + UploadRow(runtime, "/MyRoot/Table", 0, {1}, {2}, {TCell::Make(1u)}, {TCell::Make(1u)}); + UploadRow(runtime, "/MyRoot/Table", 1, {1}, {2}, {TCell::Make(Max<ui32>())}, {TCell::Make(Max<ui32>())}); CheckRegistrations(runtime, {"/MyRoot/Table", 2}, {"/MyRoot/Table/Stream/streamImpl", 1}); } }); @@ -694,7 +694,8 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) { { TInactiveZone inactive(activeZone); - UploadRows(runtime, "/MyRoot/Table", 0, {1}, {2}, {1, Max<ui32>()}); + UploadRow(runtime, "/MyRoot/Table", 0, {1}, {2}, {TCell::Make(1u)}, {TCell::Make(1u)}); + UploadRow(runtime, "/MyRoot/Table", 0, {1}, {2}, {TCell::Make(Max<ui32>())}, {TCell::Make(Max<ui32>())}); CheckRegistrations(runtime, {"/MyRoot/Table", 1}, {"/MyRoot/Table/Stream/streamImpl", 2}); } }); diff --git a/ydb/core/tx/schemeshard/ut_export/ut_export.cpp b/ydb/core/tx/schemeshard/ut_export/ut_export.cpp index c675323f05..9ba6f342cf 100644 --- a/ydb/core/tx/schemeshard/ut_export/ut_export.cpp +++ b/ydb/core/tx/schemeshard/ut_export/ut_export.cpp @@ -1433,7 +1433,7 @@ partitioning_settings { env.TestWaitNotification(runtime, txId); // Write bad DyNumber - UploadRows(runtime, "/MyRoot/Table", 0, {1}, {2}, {1}); + UploadRow(runtime, "/MyRoot/Table", 0, {1}, {2}, {TCell::Make(1u)}, {TCell::Make(1u)}); TPortManager portManager; const ui16 port = portManager.GetPort(); diff --git a/ydb/core/tx/schemeshard/ut_helpers/helpers.cpp b/ydb/core/tx/schemeshard/ut_helpers/helpers.cpp index 3b15ce5fe2..e195634d69 100644 --- a/ydb/core/tx/schemeshard/ut_helpers/helpers.cpp +++ b/ydb/core/tx/schemeshard/ut_helpers/helpers.cpp @@ -2312,7 +2312,7 @@ namespace NSchemeShardUT_Private { UNIT_ASSERT_VALUES_EQUAL(error, ""); } - void UploadRows(TTestActorRuntime& runtime, const TString& tablePath, int partitionIdx, const TVector<ui32>& keyTags, const TVector<ui32>& valueTags, const TVector<ui32>& recordIds) + void UploadRow(TTestActorRuntime& runtime, const TString& tablePath, int partitionIdx, const TVector<ui32>& keyTags, const TVector<ui32>& valueTags, const TVector<TCell>& keys, const TVector<TCell>& values) { auto tableDesc = DescribePath(runtime, tablePath, true, true); const auto& tablePartitions = tableDesc.GetPathDescription().GetTablePartitions(); @@ -2330,15 +2330,9 @@ namespace NSchemeShardUT_Private { scheme.AddValueColumnIds(tag); } - for (ui32 i : recordIds) { - auto key = TVector<TCell>{TCell::Make(i)}; - auto value = TVector<TCell>{TCell::Make(i)}; - Cerr << value[0].AsBuf().Size() << Endl; - - auto& row = *ev->Record.AddRows(); - row.SetKeyColumns(TSerializedCellVec::Serialize(key)); - row.SetValueColumns(TSerializedCellVec::Serialize(value)); - } + auto& row = *ev->Record.AddRows(); + row.SetKeyColumns(TSerializedCellVec::Serialize(keys)); + row.SetValueColumns(TSerializedCellVec::Serialize(values)); const auto& sender = runtime.AllocateEdgeActor(); ForwardToTablet(runtime, datashardTabletId, sender, ev.Release()); diff --git a/ydb/core/tx/schemeshard/ut_helpers/helpers.h b/ydb/core/tx/schemeshard/ut_helpers/helpers.h index 740648da47..0ab601402a 100644 --- a/ydb/core/tx/schemeshard/ut_helpers/helpers.h +++ b/ydb/core/tx/schemeshard/ut_helpers/helpers.h @@ -559,7 +559,7 @@ namespace NSchemeShardUT_Private { void WriteToTopic(TTestActorRuntime& runtime, const TString& path, ui32& msgSeqNo, const TString& message); void UpdateRow(TTestActorRuntime& runtime, const TString& table, const ui32 key, const TString& value, ui64 tabletId = TTestTxConfig::FakeHiveTablets); void UpdateRowPg(TTestActorRuntime& runtime, const TString& table, const ui32 key, ui32 value, ui64 tabletId = TTestTxConfig::FakeHiveTablets); - void UploadRows(TTestActorRuntime& runtime, const TString& tablePath, int partitionIdx, const TVector<ui32>& keyTags, const TVector<ui32>& valueTags, const TVector<ui32>& recordIds); + void UploadRow(TTestActorRuntime& runtime, const TString& tablePath, int partitionIdx, const TVector<ui32>& keyTags, const TVector<ui32>& valueTags, const TVector<TCell>& keys, const TVector<TCell>& values); void WriteRow(TTestActorRuntime& runtime, const ui64 txId, const TString& tablePath, int partitionIdx, const ui32 key, const TString& value, bool successIsExpected = true); void SendNextValRequest(TTestActorRuntime& runtime, const TActorId& sender, const TString& path); diff --git a/ydb/core/tx/schemeshard/ut_helpers/test_env.cpp b/ydb/core/tx/schemeshard/ut_helpers/test_env.cpp index 85d8c03993..b8ff7819ea 100644 --- a/ydb/core/tx/schemeshard/ut_helpers/test_env.cpp +++ b/ydb/core/tx/schemeshard/ut_helpers/test_env.cpp @@ -526,6 +526,7 @@ NSchemeShardUT_Private::TTestEnv::TTestEnv(TTestActorRuntime& runtime, const TTe app.SetEnableBackgroundCompaction(opts.EnableBackgroundCompaction_); app.SetEnableBorrowedSplitCompaction(opts.EnableBorrowedSplitCompaction_); app.FeatureFlags.SetEnablePublicApiExternalBlobs(true); + app.FeatureFlags.SetEnableTableDatetime64(true); app.SetEnableMoveIndex(opts.EnableMoveIndex_); app.SetEnableChangefeedInitialScan(opts.EnableChangefeedInitialScan_); app.SetEnableNotNullDataColumns(opts.EnableNotNullDataColumns_); diff --git a/ydb/core/tx/schemeshard/ut_restore/ut_restore.cpp b/ydb/core/tx/schemeshard/ut_restore/ut_restore.cpp index f07ff49e69..d9760ac532 100644 --- a/ydb/core/tx/schemeshard/ut_restore/ut_restore.cpp +++ b/ydb/core/tx/schemeshard/ut_restore/ut_restore.cpp @@ -12,6 +12,12 @@ #include <ydb/core/wrappers/ut_helpers/s3_mock.h> #include <ydb/core/metering/metering.h> #include <ydb/core/ydb_convert/table_description.h> + +#include <ydb/library/binary_json/write.h> +#include <ydb/library/dynumber/dynumber.h> +#include <ydb/library/uuid/uuid.h> + + #include <ydb/public/api/protos/ydb_import.pb.h> #include <contrib/libs/zstd/include/zstd.h> @@ -665,6 +671,10 @@ value { << "2020-08-12T12:34:56.000000Z," // datetime << "2020-08-12T12:34:56.123456Z," // timestamp << "-300500," // interval + << "-18486," // negative date32 + << "-1597235696," // negative datetime64 + << "-1597235696123456," // negative timestamp64 + << "-300500," // negative interval64 << "3.321," // decimal << ".3321e1," // dynumber << "\"" << CGIEscapeRet("lorem ipsum") << "\"," // string @@ -676,7 +686,9 @@ value { TString yson = TStringBuilder() << "[[[[[" << "[%true];" // bool + << "[\"" << -18486 << "\"];" // date32 << "[\"" << TInstant::ParseIso8601("2020-08-12T00:00:00.000000Z").Days() << "\"];" // date + << "[\"" << -1597235696 << "\"];" // datetime64 << "[\"" << TInstant::ParseIso8601("2020-08-12T12:34:56.000000Z").Seconds() << "\"];" // datetime << "[\"" << "3.321" << "\"];" // decimal << "[\"" << 1.1234 << "\"];" // double @@ -684,11 +696,13 @@ value { << "[\"" << -1.123f << "\"];" // float << "[\"" << -100500 << "\"];" // int32 << "[\"" << -200500 << "\"];" // int64 + << "[\"" << -300500 << "\"];" // interval64 << "[\"" << -300500 << "\"];" // interval << "[\"" << "{\\\"key\\\": \\\"value\\\"}" << "\"];" // json << "[\"" << "{\\\"key\\\":\\\"value\\\"}" << "\"];" // jsondoc << "[\"" << 1 << "\"];" // key << "[\"" << "lorem ipsum" << "\"];" // string + << "[\"" << -1597235696123456 << "\"];" // timestamp64 << "[\"" << TInstant::ParseIso8601("2020-08-12T12:34:56.123456Z").MicroSeconds() << "\"];" // timestamp << "[\"" << 100500 << "\"];" // uint32 << "[\"" << 200500 << "\"];" // uint64 @@ -714,6 +728,10 @@ value { Columns { Name: "datetime_value" Type: "Datetime" } Columns { Name: "timestamp_value" Type: "Timestamp" } Columns { Name: "interval_value" Type: "Interval" } + Columns { Name: "date32_value" Type: "Date32" } + Columns { Name: "datetime64_value" Type: "Datetime64" } + Columns { Name: "timestamp64_value" Type: "Timestamp64" } + Columns { Name: "interval64_value" Type: "Interval64" } Columns { Name: "decimal_value" Type: "Decimal" } Columns { Name: "dynumber_value" Type: "DyNumber" } Columns { Name: "string_value" Type: "String" } @@ -738,6 +756,10 @@ value { "datetime_value", "timestamp_value", "interval_value", + "date32_value", + "datetime64_value", + "timestamp64_value", + "interval64_value", "decimal_value", "dynumber_value", "string_value", @@ -1106,6 +1128,184 @@ value { UNIT_ASSERT_C(CheckDefaultFromSequence(table), "Invalid default value"); } + Y_UNIT_TEST(ExportImportOnSupportedDatatypes) { + TTestBasicRuntime runtime; + TTestEnv env(runtime, TTestEnvOptions()); + ui64 txId = 100; + + TestCreateTable(runtime, ++txId, "/MyRoot", R"( + Name: "Table" + Columns { Name: "key" Type: "Uint64" } + Columns { Name: "int32_value" Type: "Int32" } + Columns { Name: "uint32_value" Type: "Uint32" } + Columns { Name: "int64_value" Type: "Int64" } + Columns { Name: "uint64_value" Type: "Uint64" } + Columns { Name: "uint8_value" Type: "Uint8" } + Columns { Name: "bool_value" Type: "Bool" } + Columns { Name: "double_value" Type: "Double" } + Columns { Name: "float_value" Type: "Float" } + Columns { Name: "date_value" Type: "Date" } + Columns { Name: "datetime_value" Type: "Datetime" } + Columns { Name: "timestamp_value" Type: "Timestamp" } + Columns { Name: "interval_value" Type: "Interval" } + Columns { Name: "date32_value" Type: "Date32" } + Columns { Name: "datetime64_value" Type: "Datetime64" } + Columns { Name: "timestamp64_value" Type: "Timestamp64" } + Columns { Name: "interval64_value" Type: "Interval64" } + Columns { Name: "decimal_value" Type: "Decimal" } + Columns { Name: "dynumber_value" Type: "DyNumber" } + Columns { Name: "string_value" Type: "String" } + Columns { Name: "utf8_value" Type: "Utf8" } + Columns { Name: "json_value" Type: "Json" } + Columns { Name: "jsondoc_value" Type: "JsonDocument" } + Columns { Name: "uuid_value" Type: "Uuid" } + KeyColumnNames: ["key"] + )"); + env.TestWaitNotification(runtime, txId); + + const int partitionIdx = 0; + + const TVector<TCell> keys = {TCell::Make(1ull)}; + + const TString string = "test string"; + const TString json = R"({"key": "value"})"; + auto binaryJson = NBinaryJson::SerializeToBinaryJson(json); + Y_ABORT_UNLESS(binaryJson.Defined()); + + const std::pair<ui64, ui64> decimal = NYql::NDecimal::MakePair(NYql::NDecimal::FromString("16.17", NScheme::DECIMAL_PRECISION, NScheme::DECIMAL_SCALE)); + const TString dynumber = *NDyNumber::ParseDyNumberString("18"); + + char uuid[16]; + NUuid::ParseUuidToArray(TString("65df1ec1-a97d-47b2-ae56-3c023da6ee8c"), reinterpret_cast<ui16*>(uuid), false); + + const TVector<TCell> values = { + TCell::Make<i32>(-1), // Int32 + TCell::Make<ui32>(2), // Uint32 + TCell::Make<i64>(-3), // Int64 + TCell::Make<ui64>(4), // Uint64 + TCell::Make<ui8>(5), // Uint8 + TCell::Make<bool>(true), // Bool + TCell::Make<double>(6.66), // Double + TCell::Make<float>(7.77), // Float + TCell::Make<ui16>(8), // Date + TCell::Make<ui32>(9), // Datetime + TCell::Make<ui64>(10), // Timestamp + TCell::Make<i64>(-11), // Interval + TCell::Make<i32>(-12), // Date32 + TCell::Make<i64>(-13), // Datetime64 + TCell::Make<i64>(-14), // Timestamp64 + TCell::Make<i64>(-15), // Interval64 + TCell::Make<std::pair<ui64, ui64>>(decimal), // Decimal + TCell(dynumber.data(), dynumber.size()), // Dynumber + TCell(string.data(), string.size()), // String + TCell(string.data(), string.size()), // Utf8 + TCell(json.data(), json.size()), // Json + TCell(binaryJson->Data(), binaryJson->Size()), // JsonDocument + TCell(uuid, sizeof(uuid)), // Uuid + }; + + const TVector<ui32> keyTags = {1}; + TVector<ui32> valueTags(values.size()); + std::iota(valueTags.begin(), valueTags.end(), 2); + + UploadRow(runtime, "/MyRoot/Table", partitionIdx, keyTags, valueTags, keys, values); + + TPortManager portManager; + const ui16 port = portManager.GetPort(); + + TS3Mock s3Mock({}, TS3Mock::TSettings(port)); + UNIT_ASSERT(s3Mock.Start()); + + TestExport(runtime, ++txId, "/MyRoot", Sprintf(R"( + ExportToS3Settings { + endpoint: "localhost:%d" + scheme: HTTP + items { + source_path: "/MyRoot/Table" + destination_prefix: "Backup1" + } + } + )", port)); + env.TestWaitNotification(runtime, txId); + TestGetExport(runtime, txId, "/MyRoot"); + + TestImport(runtime, ++txId, "/MyRoot", Sprintf(R"( + ImportFromS3Settings { + endpoint: "localhost:%d" + scheme: HTTP + items { + source_prefix: "Backup1" + destination_path: "/MyRoot/Restored" + } + } + )", port)); + env.TestWaitNotification(runtime, txId); + TestGetImport(runtime, txId, "/MyRoot"); + + + TString expectedJson = TStringBuilder() << "[[[[[" + << "[%true];" // bool + << "[\"" << -12 << "\"];" // date32 + << "[\"" << 8 << "\"];" // date + << "[\"" << -13 << "\"];" // datetime64 + << "[\"" << 9 << "\"];" // datetime + << "[\"" << "16.17" << "\"];" // decimal + << "[\"" << 6.66 << "\"];" // double + << "[\"" << ".18e2" << "\"];" // dynumber + << "[\"" << 7.77f << "\"];" // float + << "[\"" << -1 << "\"];" // int32 + << "[\"" << -3 << "\"];" // int64 + << "[\"" << -15 << "\"];" // interval64 + << "[\"" << -11 << "\"];" // interval + << "[\"" << "{\\\"key\\\": \\\"value\\\"}" << "\"];" // json + << "[\"" << "{\\\"key\\\":\\\"value\\\"}" << "\"];" // jsondoc + << "[\"" << 1 << "\"];" // key + << "[\"" << "test string" << "\"];" // string + << "[\"" << -14 << "\"];" // timestamp64 + << "[\"" << 10 << "\"];" // timestamp + << "[\"" << 2 << "\"];" // uint32 + << "[\"" << 4 << "\"];" // uint64 + << "[\"" << 5 << "\"];" // uint8 + << "[\"" << "test string" << "\"];" // utf8 + << "[[\"" << "wR7fZX2pskeuVjwCPabujA==" << "\"]]" // uuid + << "]];\%false]]]"; + + const TReadKeyDesc readKeyDesc = {"key", "Uint64", "0"}; + + const TVector<TString> readColumns = { + "key", + "int32_value", + "uint32_value", + "int64_value", + "uint64_value", + "uint8_value", + "bool_value", + "double_value", + "float_value", + "date_value", + "datetime_value", + "timestamp_value", + "interval_value", + "date32_value", + "datetime64_value", + "timestamp64_value", + "interval64_value", + "decimal_value", + "dynumber_value", + "string_value", + "utf8_value", + "json_value", + "jsondoc_value", + "uuid_value", + }; + + auto contentOriginalTable = ReadTable(runtime, TTestTxConfig::FakeHiveTablets, "Table", readKeyDesc, readColumns); + NKqp::CompareYson(expectedJson, contentOriginalTable); + + auto contentRestoredTable = ReadTable(runtime, TTestTxConfig::FakeHiveTablets + 2, "Restored", readKeyDesc, readColumns); + NKqp::CompareYson(expectedJson, contentRestoredTable); + } + Y_UNIT_TEST(ExportImportPg) { TTestBasicRuntime runtime; TTestEnv env(runtime, TTestEnvOptions().EnableTablePgTypes(true)); @@ -1119,7 +1319,7 @@ value { )"); env.TestWaitNotification(runtime, txId); - UploadRows(runtime, "/MyRoot/Table", 0, {1}, {2}, {55555}); + UploadRow(runtime, "/MyRoot/Table", 0, {1}, {2}, {TCell::Make(55555u)}, {TCell::Make(55555u)}); TPortManager portManager; const ui16 port = portManager.GetPort(); |