aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
author azevaykin <145343289+azevaykin@users.noreply.github.com> 2024-06-21 12:42:28 +0300
committer GitHub <noreply@github.com> 2024-06-21 12:42:28 +0300
commit fdd25a54021d544dd5cff6a2f546f3953f24f1d4 (patch)
tree 2c08c8438982ae46538f678faabf2314c150b9db
parent 2db2848755cc0e652fdf28ff1263519f6d52b0ad (diff)
download ydb-fdd25a54021d544dd5cff6a2f546f3953f24f1d4.tar.gz
Big datetime in Datashard Export/Import (#5761)
-rw-r--r-- ydb/core/tx/datashard/export_s3_buffer_raw.cpp 8
-rw-r--r-- ydb/core/tx/schemeshard/ut_cdc_stream_reboots/ut_cdc_stream_reboots.cpp 7
-rw-r--r-- ydb/core/tx/schemeshard/ut_export/ut_export.cpp 2
-rw-r--r-- ydb/core/tx/schemeshard/ut_helpers/helpers.cpp 14
-rw-r--r-- ydb/core/tx/schemeshard/ut_helpers/helpers.h 2
-rw-r--r-- ydb/core/tx/schemeshard/ut_helpers/test_env.cpp 1
-rw-r--r-- ydb/core/tx/schemeshard/ut_restore/ut_restore.cpp 202
7 files changed, 220 insertions, 16 deletions
diff --git a/ydb/core/tx/datashard/export_s3_buffer_raw.cpp b/ydb/core/tx/datashard/export_s3_buffer_raw.cpp
index 649c46735e..9db04d1e08 100644
--- a/ydb/core/tx/datashard/export_s3_buffer_raw.cpp
+++ b/ydb/core/tx/datashard/export_s3_buffer_raw.cpp
@@ -105,6 +105,14 @@ bool TS3BufferRaw::Collect(const NTable::IScan::TRow& row, IOutputStream& out) {
case NScheme::NTypeIds::Interval:
serialized = cell.ToStream<i64>(out, ErrorString);
break;
+ case NScheme::NTypeIds::Date32:
+ serialized = cell.ToStream<i32>(out, ErrorString);
+ break;
+ case NScheme::NTypeIds::Datetime64:
+ case NScheme::NTypeIds::Timestamp64:
+ case NScheme::NTypeIds::Interval64:
+ serialized = cell.ToStream<i64>(out, ErrorString);
+ break;
case NScheme::NTypeIds::Decimal:
serialized = DecimalToStream(cell.AsValue<std::pair<ui64, i64>>(), out, ErrorString);
break;
diff --git a/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/ut_cdc_stream_reboots.cpp b/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/ut_cdc_stream_reboots.cpp
index 1224286150..42ecc9f639 100644
--- a/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/ut_cdc_stream_reboots.cpp
+++ b/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/ut_cdc_stream_reboots.cpp
@@ -649,8 +649,8 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) {
{
TInactiveZone inactive(activeZone);
- UploadRows(runtime, "/MyRoot/Table", 0, {1}, {2}, {1});
- UploadRows(runtime, "/MyRoot/Table", 1, {1}, {2}, {Max<ui32>()});
+ UploadRow(runtime, "/MyRoot/Table", 0, {1}, {2}, {TCell::Make(1u)}, {TCell::Make(1u)});
+ UploadRow(runtime, "/MyRoot/Table", 1, {1}, {2}, {TCell::Make(Max<ui32>())}, {TCell::Make(Max<ui32>())});
CheckRegistrations(runtime, {"/MyRoot/Table", 2}, {"/MyRoot/Table/Stream/streamImpl", 1});
}
});
@@ -694,7 +694,8 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) {
{
TInactiveZone inactive(activeZone);
- UploadRows(runtime, "/MyRoot/Table", 0, {1}, {2}, {1, Max<ui32>()});
+ UploadRow(runtime, "/MyRoot/Table", 0, {1}, {2}, {TCell::Make(1u)}, {TCell::Make(1u)});
+ UploadRow(runtime, "/MyRoot/Table", 0, {1}, {2}, {TCell::Make(Max<ui32>())}, {TCell::Make(Max<ui32>())});
CheckRegistrations(runtime, {"/MyRoot/Table", 1}, {"/MyRoot/Table/Stream/streamImpl", 2});
}
});
diff --git a/ydb/core/tx/schemeshard/ut_export/ut_export.cpp b/ydb/core/tx/schemeshard/ut_export/ut_export.cpp
index c675323f05..9ba6f342cf 100644
--- a/ydb/core/tx/schemeshard/ut_export/ut_export.cpp
+++ b/ydb/core/tx/schemeshard/ut_export/ut_export.cpp
@@ -1433,7 +1433,7 @@ partitioning_settings {
env.TestWaitNotification(runtime, txId);
// Write bad DyNumber
- UploadRows(runtime, "/MyRoot/Table", 0, {1}, {2}, {1});
+ UploadRow(runtime, "/MyRoot/Table", 0, {1}, {2}, {TCell::Make(1u)}, {TCell::Make(1u)});
TPortManager portManager;
const ui16 port = portManager.GetPort();
diff --git a/ydb/core/tx/schemeshard/ut_helpers/helpers.cpp b/ydb/core/tx/schemeshard/ut_helpers/helpers.cpp
index 3b15ce5fe2..e195634d69 100644
--- a/ydb/core/tx/schemeshard/ut_helpers/helpers.cpp
+++ b/ydb/core/tx/schemeshard/ut_helpers/helpers.cpp
@@ -2312,7 +2312,7 @@ namespace NSchemeShardUT_Private {
UNIT_ASSERT_VALUES_EQUAL(error, "");
}
- void UploadRows(TTestActorRuntime& runtime, const TString& tablePath, int partitionIdx, const TVector<ui32>& keyTags, const TVector<ui32>& valueTags, const TVector<ui32>& recordIds)
+ void UploadRow(TTestActorRuntime& runtime, const TString& tablePath, int partitionIdx, const TVector<ui32>& keyTags, const TVector<ui32>& valueTags, const TVector<TCell>& keys, const TVector<TCell>& values)
{
auto tableDesc = DescribePath(runtime, tablePath, true, true);
const auto& tablePartitions = tableDesc.GetPathDescription().GetTablePartitions();
@@ -2330,15 +2330,9 @@ namespace NSchemeShardUT_Private {
scheme.AddValueColumnIds(tag);
}
- for (ui32 i : recordIds) {
- auto key = TVector<TCell>{TCell::Make(i)};
- auto value = TVector<TCell>{TCell::Make(i)};
- Cerr << value[0].AsBuf().Size() << Endl;
-
- auto& row = *ev->Record.AddRows();
- row.SetKeyColumns(TSerializedCellVec::Serialize(key));
- row.SetValueColumns(TSerializedCellVec::Serialize(value));
- }
+ auto& row = *ev->Record.AddRows();
+ row.SetKeyColumns(TSerializedCellVec::Serialize(keys));
+ row.SetValueColumns(TSerializedCellVec::Serialize(values));
const auto& sender = runtime.AllocateEdgeActor();
ForwardToTablet(runtime, datashardTabletId, sender, ev.Release());
diff --git a/ydb/core/tx/schemeshard/ut_helpers/helpers.h b/ydb/core/tx/schemeshard/ut_helpers/helpers.h
index 740648da47..0ab601402a 100644
--- a/ydb/core/tx/schemeshard/ut_helpers/helpers.h
+++ b/ydb/core/tx/schemeshard/ut_helpers/helpers.h
@@ -559,7 +559,7 @@ namespace NSchemeShardUT_Private {
void WriteToTopic(TTestActorRuntime& runtime, const TString& path, ui32& msgSeqNo, const TString& message);
void UpdateRow(TTestActorRuntime& runtime, const TString& table, const ui32 key, const TString& value, ui64 tabletId = TTestTxConfig::FakeHiveTablets);
void UpdateRowPg(TTestActorRuntime& runtime, const TString& table, const ui32 key, ui32 value, ui64 tabletId = TTestTxConfig::FakeHiveTablets);
- void UploadRows(TTestActorRuntime& runtime, const TString& tablePath, int partitionIdx, const TVector<ui32>& keyTags, const TVector<ui32>& valueTags, const TVector<ui32>& recordIds);
+ void UploadRow(TTestActorRuntime& runtime, const TString& tablePath, int partitionIdx, const TVector<ui32>& keyTags, const TVector<ui32>& valueTags, const TVector<TCell>& keys, const TVector<TCell>& values);
void WriteRow(TTestActorRuntime& runtime, const ui64 txId, const TString& tablePath, int partitionIdx, const ui32 key, const TString& value, bool successIsExpected = true);
void SendNextValRequest(TTestActorRuntime& runtime, const TActorId& sender, const TString& path);
diff --git a/ydb/core/tx/schemeshard/ut_helpers/test_env.cpp b/ydb/core/tx/schemeshard/ut_helpers/test_env.cpp
index 85d8c03993..b8ff7819ea 100644
--- a/ydb/core/tx/schemeshard/ut_helpers/test_env.cpp
+++ b/ydb/core/tx/schemeshard/ut_helpers/test_env.cpp
@@ -526,6 +526,7 @@ NSchemeShardUT_Private::TTestEnv::TTestEnv(TTestActorRuntime& runtime, const TTe
app.SetEnableBackgroundCompaction(opts.EnableBackgroundCompaction_);
app.SetEnableBorrowedSplitCompaction(opts.EnableBorrowedSplitCompaction_);
app.FeatureFlags.SetEnablePublicApiExternalBlobs(true);
+ app.FeatureFlags.SetEnableTableDatetime64(true);
app.SetEnableMoveIndex(opts.EnableMoveIndex_);
app.SetEnableChangefeedInitialScan(opts.EnableChangefeedInitialScan_);
app.SetEnableNotNullDataColumns(opts.EnableNotNullDataColumns_);
diff --git a/ydb/core/tx/schemeshard/ut_restore/ut_restore.cpp b/ydb/core/tx/schemeshard/ut_restore/ut_restore.cpp
index f07ff49e69..d9760ac532 100644
--- a/ydb/core/tx/schemeshard/ut_restore/ut_restore.cpp
+++ b/ydb/core/tx/schemeshard/ut_restore/ut_restore.cpp
@@ -12,6 +12,12 @@
#include <ydb/core/wrappers/ut_helpers/s3_mock.h>
#include <ydb/core/metering/metering.h>
#include <ydb/core/ydb_convert/table_description.h>
+
+#include <ydb/library/binary_json/write.h>
+#include <ydb/library/dynumber/dynumber.h>
+#include <ydb/library/uuid/uuid.h>
+
+
#include <ydb/public/api/protos/ydb_import.pb.h>
#include <contrib/libs/zstd/include/zstd.h>
@@ -665,6 +671,10 @@ value {
<< "2020-08-12T12:34:56.000000Z," // datetime
<< "2020-08-12T12:34:56.123456Z," // timestamp
<< "-300500," // interval
+ << "-18486," // negative date32
+ << "-1597235696," // negative datetime64
+ << "-1597235696123456," // negative timestamp64
+ << "-300500," // negative interval64
<< "3.321," // decimal
<< ".3321e1," // dynumber
<< "\"" << CGIEscapeRet("lorem ipsum") << "\"," // string
@@ -676,7 +686,9 @@ value {
TString yson = TStringBuilder() << "[[[[["
<< "[%true];" // bool
+ << "[\"" << -18486 << "\"];" // date32
<< "[\"" << TInstant::ParseIso8601("2020-08-12T00:00:00.000000Z").Days() << "\"];" // date
+ << "[\"" << -1597235696 << "\"];" // datetime64
<< "[\"" << TInstant::ParseIso8601("2020-08-12T12:34:56.000000Z").Seconds() << "\"];" // datetime
<< "[\"" << "3.321" << "\"];" // decimal
<< "[\"" << 1.1234 << "\"];" // double
@@ -684,11 +696,13 @@ value {
<< "[\"" << -1.123f << "\"];" // float
<< "[\"" << -100500 << "\"];" // int32
<< "[\"" << -200500 << "\"];" // int64
+ << "[\"" << -300500 << "\"];" // interval64
<< "[\"" << -300500 << "\"];" // interval
<< "[\"" << "{\\\"key\\\": \\\"value\\\"}" << "\"];" // json
<< "[\"" << "{\\\"key\\\":\\\"value\\\"}" << "\"];" // jsondoc
<< "[\"" << 1 << "\"];" // key
<< "[\"" << "lorem ipsum" << "\"];" // string
+ << "[\"" << -1597235696123456 << "\"];" // timestamp64
<< "[\"" << TInstant::ParseIso8601("2020-08-12T12:34:56.123456Z").MicroSeconds() << "\"];" // timestamp
<< "[\"" << 100500 << "\"];" // uint32
<< "[\"" << 200500 << "\"];" // uint64
@@ -714,6 +728,10 @@ value {
Columns { Name: "datetime_value" Type: "Datetime" }
Columns { Name: "timestamp_value" Type: "Timestamp" }
Columns { Name: "interval_value" Type: "Interval" }
+ Columns { Name: "date32_value" Type: "Date32" }
+ Columns { Name: "datetime64_value" Type: "Datetime64" }
+ Columns { Name: "timestamp64_value" Type: "Timestamp64" }
+ Columns { Name: "interval64_value" Type: "Interval64" }
Columns { Name: "decimal_value" Type: "Decimal" }
Columns { Name: "dynumber_value" Type: "DyNumber" }
Columns { Name: "string_value" Type: "String" }
@@ -738,6 +756,10 @@ value {
"datetime_value",
"timestamp_value",
"interval_value",
+ "date32_value",
+ "datetime64_value",
+ "timestamp64_value",
+ "interval64_value",
"decimal_value",
"dynumber_value",
"string_value",
@@ -1106,6 +1128,184 @@ value {
UNIT_ASSERT_C(CheckDefaultFromSequence(table), "Invalid default value");
}
+ // Round-trip test: uploads one row into a table that has a column of every
+ // type listed in the schema below (including Date32, Datetime64, Timestamp64
+ // and Interval64), exports the table to a mock S3, imports it back as
+ // /MyRoot/Restored and checks that the original and the restored table both
+ // read back as the same expected YSON.
+ Y_UNIT_TEST(ExportImportOnSupportedDatatypes) {
+ TTestBasicRuntime runtime;
+ TTestEnv env(runtime, TTestEnvOptions());
+ ui64 txId = 100;
+
+ TestCreateTable(runtime, ++txId, "/MyRoot", R"(
+ Name: "Table"
+ Columns { Name: "key" Type: "Uint64" }
+ Columns { Name: "int32_value" Type: "Int32" }
+ Columns { Name: "uint32_value" Type: "Uint32" }
+ Columns { Name: "int64_value" Type: "Int64" }
+ Columns { Name: "uint64_value" Type: "Uint64" }
+ Columns { Name: "uint8_value" Type: "Uint8" }
+ Columns { Name: "bool_value" Type: "Bool" }
+ Columns { Name: "double_value" Type: "Double" }
+ Columns { Name: "float_value" Type: "Float" }
+ Columns { Name: "date_value" Type: "Date" }
+ Columns { Name: "datetime_value" Type: "Datetime" }
+ Columns { Name: "timestamp_value" Type: "Timestamp" }
+ Columns { Name: "interval_value" Type: "Interval" }
+ Columns { Name: "date32_value" Type: "Date32" }
+ Columns { Name: "datetime64_value" Type: "Datetime64" }
+ Columns { Name: "timestamp64_value" Type: "Timestamp64" }
+ Columns { Name: "interval64_value" Type: "Interval64" }
+ Columns { Name: "decimal_value" Type: "Decimal" }
+ Columns { Name: "dynumber_value" Type: "DyNumber" }
+ Columns { Name: "string_value" Type: "String" }
+ Columns { Name: "utf8_value" Type: "Utf8" }
+ Columns { Name: "json_value" Type: "Json" }
+ Columns { Name: "jsondoc_value" Type: "JsonDocument" }
+ Columns { Name: "uuid_value" Type: "Uuid" }
+ KeyColumnNames: ["key"]
+ )");
+ env.TestWaitNotification(runtime, txId);
+
+ const int partitionIdx = 0;
+
+ const TVector<TCell> keys = {TCell::Make(1ull)};
+
+ // Backing storage for the cells that are built from a pointer and a size below.
+ // It is declared here so that it stays alive until UploadRow() has run.
+ const TString string = "test string";
+ const TString json = R"({"key": "value"})";
+ auto binaryJson = NBinaryJson::SerializeToBinaryJson(json);
+ // Fail this test case (instead of aborting the whole test binary) on a bad fixture.
+ UNIT_ASSERT(binaryJson.Defined());
+
+ const std::pair<ui64, ui64> decimal = NYql::NDecimal::MakePair(NYql::NDecimal::FromString("16.17", NScheme::DECIMAL_PRECISION, NScheme::DECIMAL_SCALE));
+ const TString dynumber = *NDyNumber::ParseDyNumberString("18");
+
+ // ParseUuidToArray() writes through a ui16*, so the buffer is declared as ui16
+ // to be correctly aligned; it is viewed as raw bytes only when building the cell.
+ ui16 uuid[8];
+ UNIT_ASSERT(NUuid::ParseUuidToArray(TString("65df1ec1-a97d-47b2-ae56-3c023da6ee8c"), uuid, false));
+
+ // One cell per value column, in the order the columns are declared above.
+ const TVector<TCell> values = {
+ TCell::Make<i32>(-1), // Int32
+ TCell::Make<ui32>(2), // Uint32
+ TCell::Make<i64>(-3), // Int64
+ TCell::Make<ui64>(4), // Uint64
+ TCell::Make<ui8>(5), // Uint8
+ TCell::Make<bool>(true), // Bool
+ TCell::Make<double>(6.66), // Double
+ TCell::Make<float>(7.77), // Float
+ TCell::Make<ui16>(8), // Date
+ TCell::Make<ui32>(9), // Datetime
+ TCell::Make<ui64>(10), // Timestamp
+ TCell::Make<i64>(-11), // Interval
+ TCell::Make<i32>(-12), // Date32
+ TCell::Make<i64>(-13), // Datetime64
+ TCell::Make<i64>(-14), // Timestamp64
+ TCell::Make<i64>(-15), // Interval64
+ TCell::Make<std::pair<ui64, ui64>>(decimal), // Decimal
+ TCell(dynumber.data(), dynumber.size()), // Dynumber
+ TCell(string.data(), string.size()), // String
+ TCell(string.data(), string.size()), // Utf8
+ TCell(json.data(), json.size()), // Json
+ TCell(binaryJson->Data(), binaryJson->Size()), // JsonDocument
+ TCell(reinterpret_cast<const char*>(uuid), sizeof(uuid)), // Uuid
+ };
+
+ // The key uses tag 1; value tags are 2, 3, ... one per entry of `values`.
+ // NOTE(review): presumably column ids follow declaration order starting at 1 - confirm.
+ const TVector<ui32> keyTags = {1};
+ TVector<ui32> valueTags(values.size());
+ std::iota(valueTags.begin(), valueTags.end(), 2);
+
+ UploadRow(runtime, "/MyRoot/Table", partitionIdx, keyTags, valueTags, keys, values);
+
+ TPortManager portManager;
+ const ui16 port = portManager.GetPort();
+
+ TS3Mock s3Mock({}, TS3Mock::TSettings(port));
+ UNIT_ASSERT(s3Mock.Start());
+
+ TestExport(runtime, ++txId, "/MyRoot", Sprintf(R"(
+ ExportToS3Settings {
+ endpoint: "localhost:%d"
+ scheme: HTTP
+ items {
+ source_path: "/MyRoot/Table"
+ destination_prefix: "Backup1"
+ }
+ }
+ )", port));
+ env.TestWaitNotification(runtime, txId);
+ TestGetExport(runtime, txId, "/MyRoot");
+
+ TestImport(runtime, ++txId, "/MyRoot", Sprintf(R"(
+ ImportFromS3Settings {
+ endpoint: "localhost:%d"
+ scheme: HTTP
+ items {
+ source_prefix: "Backup1"
+ destination_path: "/MyRoot/Restored"
+ }
+ }
+ )", port));
+ env.TestWaitNotification(runtime, txId);
+ TestGetImport(runtime, txId, "/MyRoot");
+
+
+ // Expected YSON; entries are listed in lexicographic order of the column names
+ // (e.g. date32_value comes before date_value).
+ TString expectedYson = TStringBuilder() << "[[[[["
+ << "[%true];" // bool
+ << "[\"" << -12 << "\"];" // date32
+ << "[\"" << 8 << "\"];" // date
+ << "[\"" << -13 << "\"];" // datetime64
+ << "[\"" << 9 << "\"];" // datetime
+ << "[\"" << "16.17" << "\"];" // decimal
+ << "[\"" << 6.66 << "\"];" // double
+ << "[\"" << ".18e2" << "\"];" // dynumber
+ << "[\"" << 7.77f << "\"];" // float
+ << "[\"" << -1 << "\"];" // int32
+ << "[\"" << -3 << "\"];" // int64
+ << "[\"" << -15 << "\"];" // interval64
+ << "[\"" << -11 << "\"];" // interval
+ << "[\"" << "{\\\"key\\\": \\\"value\\\"}" << "\"];" // json
+ << "[\"" << "{\\\"key\\\":\\\"value\\\"}" << "\"];" // jsondoc
+ << "[\"" << 1 << "\"];" // key
+ << "[\"" << "test string" << "\"];" // string
+ << "[\"" << -14 << "\"];" // timestamp64
+ << "[\"" << 10 << "\"];" // timestamp
+ << "[\"" << 2 << "\"];" // uint32
+ << "[\"" << 4 << "\"];" // uint64
+ << "[\"" << 5 << "\"];" // uint8
+ << "[\"" << "test string" << "\"];" // utf8
+ << "[[\"" << "wR7fZX2pskeuVjwCPabujA==" << "\"]]" // uuid
+ << "]];%false]]]"; // plain '%': this string is not a printf format, so no escaping is needed
+
+ const TReadKeyDesc readKeyDesc = {"key", "Uint64", "0"};
+
+ const TVector<TString> readColumns = {
+ "key",
+ "int32_value",
+ "uint32_value",
+ "int64_value",
+ "uint64_value",
+ "uint8_value",
+ "bool_value",
+ "double_value",
+ "float_value",
+ "date_value",
+ "datetime_value",
+ "timestamp_value",
+ "interval_value",
+ "date32_value",
+ "datetime64_value",
+ "timestamp64_value",
+ "interval64_value",
+ "decimal_value",
+ "dynumber_value",
+ "string_value",
+ "utf8_value",
+ "json_value",
+ "jsondoc_value",
+ "uuid_value",
+ };
+
+ // NOTE(review): FakeHiveTablets and FakeHiveTablets + 2 are presumably the datashard
+ // tablet ids of the source table and of the restored table - confirm.
+ auto contentOriginalTable = ReadTable(runtime, TTestTxConfig::FakeHiveTablets, "Table", readKeyDesc, readColumns);
+ NKqp::CompareYson(expectedYson, contentOriginalTable);
+
+ auto contentRestoredTable = ReadTable(runtime, TTestTxConfig::FakeHiveTablets + 2, "Restored", readKeyDesc, readColumns);
+ NKqp::CompareYson(expectedYson, contentRestoredTable);
+ }
+
Y_UNIT_TEST(ExportImportPg) {
TTestBasicRuntime runtime;
TTestEnv env(runtime, TTestEnvOptions().EnableTablePgTypes(true));
@@ -1119,7 +1319,7 @@ value {
)");
env.TestWaitNotification(runtime, txId);
- UploadRows(runtime, "/MyRoot/Table", 0, {1}, {2}, {55555});
+ UploadRow(runtime, "/MyRoot/Table", 0, {1}, {2}, {TCell::Make(55555u)}, {TCell::Make(55555u)});
TPortManager portManager;
const ui16 port = portManager.GetPort();