diff options
author | mzinal <mzinal@yandex-team.com> | 2022-09-19 15:00:32 +0300 |
---|---|---|
committer | mzinal <mzinal@yandex-team.com> | 2022-09-19 15:00:32 +0300 |
commit | cdb11b6f2fcb2acb6d449384db45dfd89013c07f (patch) | |
tree | 1ca28bb743bf631d5703e6bfcb34c379b2dce79f | |
parent | 67395a78c86cf26222cf9d3f410c4feb09453ffe (diff) | |
download | ydb-cdb11b6f2fcb2acb6d449384db45dfd89013c07f.tar.gz |
PR from branch users/mzinal/
Data import and export enhancements for YDB CLI
21 files changed, 483 insertions, 100 deletions
diff --git a/ydb/public/lib/ydb_cli/commands/ydb_service_import.cpp b/ydb/public/lib/ydb_cli/commands/ydb_service_import.cpp index 846383dd20b..4a74452c628 100644 --- a/ydb/public/lib/ydb_cli/commands/ydb_service_import.cpp +++ b/ydb/public/lib/ydb_cli/commands/ydb_service_import.cpp @@ -130,58 +130,35 @@ TCommandImportFromFile::TCommandImportFromFile() { AddCommand(std::make_unique<TCommandImportFromCsv>()); AddCommand(std::make_unique<TCommandImportFromTsv>()); + AddCommand(std::make_unique<TCommandImportFromJson>()); } -/// CSV +/// Import File Shared Config -TCommandImportFromCsv::TCommandImportFromCsv(const TString& cmd, const TString& cmdDescription) - : TYdbCommand(cmd, {}, cmdDescription) -{} - -void TCommandImportFromCsv::Config(TConfig& config) { +void TCommandImportFileBase::Config(TConfig& config) { TYdbCommand::Config(config); config.SetFreeArgsNum(0); config.Opts->AddLongOption('p', "path", "Database path to table") .Required().RequiredArgument("STRING").StoreResult(&Path); - config.Opts->AddLongOption("input-file", "Path to file to import in a local filesystem") - .Required().RequiredArgument("STRING").StoreResult(&FilePath); - config.Opts->AddLongOption("skip-rows", - "Number of header rows to skip (not including the row of column names, if any)") - .RequiredArgument("NUM").StoreResult(&SkipRows).DefaultValue(SkipRows); - config.Opts->AddLongOption("header", - "Set if file contains column names at the first not skipped row") - .StoreTrue(&Header); - if (InputFormat == EOutputFormat::Csv) { - config.Opts->AddLongOption("delimiter", "Field delimiter in rows") - .RequiredArgument("STRING").StoreResult(&Delimiter).DefaultValue(Delimiter); - } - config.Opts->AddLongOption("null-value", "Value that would be interpreted as NULL") - .RequiredArgument("STRING").StoreResult(&NullValue).DefaultValue(NullValue); - // TODO: quoting/quote_char - TImportFileSettings defaults; + config.Opts->AddLongOption('i', "input-file", + "Path to file to be imported, standard input if empty or not specified") + .StoreResult(&FilePath).DefaultValue(FilePath); + const TImportFileSettings defaults; config.Opts->AddLongOption("batch-bytes", "Use portions of this size in bytes to parse and upload file data") .DefaultValue(HumanReadableSize(defaults.BytesPerRequest_, SF_BYTES)).StoreResult(&BytesPerRequest); - config.Opts->AddLongOption("max-in-flight", "Maximum number of in-flight requests; increase to load big files faster (more memory needed)") .DefaultValue(defaults.MaxInFlightRequests_).StoreResult(&MaxInFlightRequests); } -void TCommandImportFromCsv::Parse(TConfig& config) { - TClientCommand::Parse(config); +void TCommandImportFileBase::Parse(TConfig& config) { + TYdbCommand::Parse(config); AdjustPath(config); -} - -int TCommandImportFromCsv::Run(TConfig& config) { - TImportFileSettings settings; - settings.SkipRows(SkipRows); - settings.Header(Header); - settings.NullValue(NullValue); if (auto bytesPerRequest = NYdb::SizeFromString(BytesPerRequest)) { if (bytesPerRequest > TImportFileSettings::MaxBytesPerRequest) { @@ -189,14 +166,42 @@ int TCommandImportFromCsv::Run(TConfig& config) { << "--batch-bytes cannot be larger than " << HumanReadableSize(TImportFileSettings::MaxBytesPerRequest, SF_BYTES); } - - settings.BytesPerRequest(bytesPerRequest); } if (MaxInFlightRequests == 0) { - MaxInFlightRequests = 1; + throw TMisuseException() + << "--max-in-flight must be greater than zero"; + } +} + +/// Import CSV + +void TCommandImportFromCsv::Config(TConfig& config) { + TCommandImportFileBase::Config(config); + + config.Opts->AddLongOption("skip-rows", + "Number of header rows to skip (not including the row of column names, if any)") + .RequiredArgument("NUM").StoreResult(&SkipRows).DefaultValue(SkipRows); + config.Opts->AddLongOption("header", + "Set if file contains column names at the first not skipped row") + .StoreTrue(&Header); + if (InputFormat == EOutputFormat::Csv) { + config.Opts->AddLongOption("delimiter", "Field delimiter in rows") + .RequiredArgument("STRING").StoreResult(&Delimiter).DefaultValue(Delimiter); } + config.Opts->AddLongOption("null-value", "Value that would be interpreted as NULL") + .RequiredArgument("STRING").StoreResult(&NullValue).DefaultValue(NullValue); + // TODO: quoting/quote_char +} + +int TCommandImportFromCsv::Run(TConfig& config) { + TImportFileSettings settings; + settings.Format(InputFormat); settings.MaxInFlightRequests(MaxInFlightRequests); + settings.BytesPerRequest(NYdb::SizeFromString(BytesPerRequest)); + settings.SkipRows(SkipRows); + settings.Header(Header); + settings.NullValue(NullValue); if (Delimiter.size() != 1) { throw TMisuseException() @@ -211,5 +216,35 @@ int TCommandImportFromCsv::Run(TConfig& config) { return EXIT_SUCCESS; } + +/// Import JSON + +void TCommandImportFromJson::Config(TConfig& config) { + TCommandImportFileBase::Config(config); + + AddInputFormats(config, { + EOutputFormat::JsonUnicode, + EOutputFormat::JsonBase64 + }); +} + +void TCommandImportFromJson::Parse(TConfig& config) { + TCommandImportFileBase::Parse(config); + + ParseFormats(); +} + +int TCommandImportFromJson::Run(TConfig& config) { + TImportFileSettings settings; + settings.Format(InputFormat); + settings.MaxInFlightRequests(MaxInFlightRequests); + settings.BytesPerRequest(NYdb::SizeFromString(BytesPerRequest)); + + TImportFileClient client(CreateDriver(config)); + ThrowOnError(client.Import(FilePath, Path, settings)); + + return EXIT_SUCCESS; +} + } } diff --git a/ydb/public/lib/ydb_cli/commands/ydb_service_import.h b/ydb/public/lib/ydb_cli/commands/ydb_service_import.h index 04dc1010476..af304b92a63 100644 --- a/ydb/public/lib/ydb_cli/commands/ydb_service_import.h +++ b/ydb/public/lib/ydb_cli/commands/ydb_service_import.h @@ -22,9 +22,9 @@ class TCommandImportFromS3 : public TYdbOperationCommand, public TCommandWithFormat { public: TCommandImportFromS3(); - virtual void Config(TConfig& config) override; - virtual void Parse(TConfig& config) override; - virtual int Run(TConfig& config) override; + void Config(TConfig& config) override; + void Parse(TConfig& config) override; + int Run(TConfig& config) override; private: struct TItemFields { @@ -46,23 +46,37 @@ public: TCommandImportFromFile(); }; -class TCommandImportFromCsv : public TYdbCommand, - public TCommandWithPath { +class TCommandImportFileBase : public TYdbCommand, + public TCommandWithPath, public TCommandWithFormat { public: - TCommandImportFromCsv(const TString& cmd = "csv", const TString& cmdDescription = "Import data from CSV file"); - virtual void Config(TConfig& config) override; - virtual void Parse(TConfig& config) override; - virtual int Run(TConfig& config) override; + TCommandImportFileBase(const TString& cmd, const TString& cmdDescription) + : TYdbCommand(cmd, {}, cmdDescription) + {} + void Config(TConfig& config) override; + void Parse(TConfig& config) override; protected: - EOutputFormat InputFormat = EOutputFormat::Csv; - TString FilePath; - TString Delimiter = ","; + TString FilePath; // Read from stdin if the file path is empty + TString BytesPerRequest; + ui64 MaxInFlightRequests = 1; +}; + +class TCommandImportFromCsv : public TCommandImportFileBase { +public: + TCommandImportFromCsv(const TString& cmd = "csv", const TString& cmdDescription = "Import data from CSV file") + : TCommandImportFileBase(cmd, cmdDescription) + { + InputFormat = EOutputFormat::Csv; + Delimiter = ","; + } + void Config(TConfig& config) override; + int Run(TConfig& config) override; + +protected: + TString Delimiter; TString NullValue; ui32 SkipRows = 0; bool Header = false; - TString BytesPerRequest; - ui64 MaxInFlightRequests = 1; }; class TCommandImportFromTsv : public TCommandImportFromCsv { @@ -75,5 +89,17 @@ public: } }; +class TCommandImportFromJson : public TCommandImportFileBase { +public: + TCommandImportFromJson() + : TCommandImportFileBase("json", "Import data from JSON file") + { + InputFormat = EOutputFormat::JsonUnicode; + } + void Config(TConfig& config) override; + void Parse(TConfig& config) override; + int Run(TConfig& config) override; +}; + } } diff --git a/ydb/public/lib/ydb_cli/commands/ydb_service_table.cpp b/ydb/public/lib/ydb_cli/commands/ydb_service_table.cpp index 0746da4fb4e..a9648edf1bc 100644 --- a/ydb/public/lib/ydb_cli/commands/ydb_service_table.cpp +++ b/ydb/public/lib/ydb_cli/commands/ydb_service_table.cpp @@ -362,13 +362,14 @@ void TCommandExecuteQuery::Config(TConfig& config) { EOutputFormat::JsonBase64 }); - AddFormats(config, { EOutputFormat::Pretty, EOutputFormat::JsonUnicode, EOutputFormat::JsonUnicodeArray, EOutputFormat::JsonBase64, - EOutputFormat::JsonBase64Array + EOutputFormat::JsonBase64Array, + EOutputFormat::Csv, + EOutputFormat::Tsv }); CheckExamples(config); @@ -699,12 +700,11 @@ void TCommandReadTable::Config(TConfig& config) { EOutputFormat::JsonUnicode, EOutputFormat::JsonUnicodeArray, EOutputFormat::JsonBase64, - EOutputFormat::JsonBase64Array + EOutputFormat::JsonBase64Array, + EOutputFormat::Csv, + EOutputFormat::Tsv }); - // TODO: KIKIMR-8675 - // Add csv format - config.SetFreeArgsNum(1); SetFreeArgTitle(0, "<table path>", "Path to a table"); } diff --git a/ydb/public/lib/ydb_cli/commands/ydb_yql.cpp b/ydb/public/lib/ydb_cli/commands/ydb_yql.cpp index 71019621d28..0ebb7faedc2 100644 --- a/ydb/public/lib/ydb_cli/commands/ydb_yql.cpp +++ b/ydb/public/lib/ydb_cli/commands/ydb_yql.cpp @@ -32,7 +32,9 @@ void TCommandYql::Config(TConfig& config) { EOutputFormat::JsonUnicode, EOutputFormat::JsonUnicodeArray, EOutputFormat::JsonBase64, - EOutputFormat::JsonBase64Array + EOutputFormat::JsonBase64Array, + EOutputFormat::Csv, + EOutputFormat::Tsv }); CheckExamples(config); diff --git a/ydb/public/lib/ydb_cli/common/format.cpp b/ydb/public/lib/ydb_cli/common/format.cpp index 0db3a40d1f5..8f106ca378b 100644 --- a/ydb/public/lib/ydb_cli/common/format.cpp +++ b/ydb/public/lib/ydb_cli/common/format.cpp @@ -35,6 +35,7 @@ namespace { "Every row is a separate binary data on a separate line"}, { EOutputFormat::ProtoJsonBase64, "Output result protobuf in json format, binary strings are encoded with base64" }, { EOutputFormat::Csv, "Output in csv format" }, + { EOutputFormat::Tsv, "Output in tsv format" }, }; THashMap<EMessagingFormat, TString> MessagingFormatDescriptions = { @@ -336,7 +337,10 @@ void TResultSetPrinter::Print(const TResultSet& resultSet) { FormatResultSetJson(resultSet, &Cout, EBinaryStringEncoding::Base64); break; case EOutputFormat::Csv: - PrintCsv(resultSet); + PrintCsv(resultSet, ","); + break; + case EOutputFormat::Tsv: + PrintCsv(resultSet, "\t"); break; default: throw TMisuseException() << "This command doesn't support " << Format << " output format"; @@ -423,14 +427,14 @@ void TResultSetPrinter::PrintJsonArray(const TResultSet& resultSet, EBinaryStrin } } -void TResultSetPrinter::PrintCsv(const TResultSet& resultSet) { +void TResultSetPrinter::PrintCsv(const TResultSet& resultSet, const char* delim) { const TVector<TColumn>& columns = resultSet.GetColumnsMeta(); TResultSetParser parser(resultSet); while (parser.TryNextRow()) { for (ui32 i = 0; i < columns.size(); ++i) { Cout << FormatValueJson(parser.GetValue(i), EBinaryStringEncoding::Unicode); if (i < columns.size() - 1) { - Cout << ","; + Cout << delim; } } Cout << Endl; diff --git a/ydb/public/lib/ydb_cli/common/format.h b/ydb/public/lib/ydb_cli/common/format.h index bc571f7e0c5..f8fb3413cf3 100644 --- a/ydb/public/lib/ydb_cli/common/format.h +++ b/ydb/public/lib/ydb_cli/common/format.h @@ -61,7 +61,7 @@ private: void PrintPretty(const TResultSet& resultSet); void PrintJsonArray(const TResultSet& resultSet, EBinaryStringEncoding encoding); - void PrintCsv(const TResultSet& resultSet); + void PrintCsv(const TResultSet& resultSet, const char* delim); bool FirstPart = true; bool PrintedSomething = false; diff --git a/ydb/public/lib/ydb_cli/import/CMakeLists.txt b/ydb/public/lib/ydb_cli/import/CMakeLists.txt index d63a8b60fc7..d28016ad5b1 100644 --- a/ydb/public/lib/ydb_cli/import/CMakeLists.txt +++ b/ydb/public/lib/ydb_cli/import/CMakeLists.txt @@ -14,6 +14,7 @@ target_link_libraries(lib-ydb_cli-import PUBLIC api-protos common cpp-client-ydb_proto + public-lib-json_value ) target_sources(lib-ydb_cli-import PRIVATE ${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/import/import.cpp diff --git a/ydb/public/lib/ydb_cli/import/import.cpp b/ydb/public/lib/ydb_cli/import/import.cpp index e9960a21f82..5843c8d0f59 100644 --- a/ydb/public/lib/ydb_cli/import/import.cpp +++ b/ydb/public/lib/ydb_cli/import/import.cpp @@ -8,6 +8,7 @@ #include <ydb/public/api/protos/ydb_table.pb.h> #include <ydb/public/api/protos/ydb_formats.pb.h> +#include <ydb/public/lib/json_value/ydb_json_value.h> #include <ydb/public/lib/ydb_cli/common/recursive_list.h> #include <ydb/public/lib/ydb_cli/dump/util/util.h> @@ -23,6 +24,7 @@ namespace NConsoleClient { namespace { +static inline TStatus MakeStatus(EStatus code = EStatus::SUCCESS, const TString& error = {}) { NYql::TIssues issues; if (error) { @@ -37,22 +39,32 @@ TImportFileClient::TImportFileClient(const TDriver& driver) : OperationClient(std::make_shared<NOperation::TOperationClient>(driver)) , SchemeClient(std::make_shared<NScheme::TSchemeClient>(driver)) , TableClient(std::make_shared<NTable::TTableClient>(driver)) -{} +{ + UpsertSettings + .OperationTimeout(TDuration::Seconds(30)) + .ClientTimeout(TDuration::Seconds(35)); + RetrySettings + .MaxRetries(10); +} TStatus TImportFileClient::Import(const TString& filePath, const TString& dbPath, const TImportFileSettings& settings) { - if (filePath.empty()) { - return MakeStatus(EStatus::BAD_REQUEST, "No file specified"); - } - - TFsPath dataFile(filePath); - if (!dataFile.Exists()) { - return MakeStatus(EStatus::BAD_REQUEST, - TStringBuilder() << "File does not exist: " << filePath); + if (! filePath.empty()) { + const TFsPath dataFile(filePath); + if (!dataFile.Exists()) { + return MakeStatus(EStatus::BAD_REQUEST, + TStringBuilder() << "File does not exist: " << filePath); + } + if (!dataFile.IsFile()) { + return MakeStatus(EStatus::BAD_REQUEST, + TStringBuilder() << "Not a file: " << filePath); + } } - if (!dataFile.IsFile()) { - return MakeStatus(EStatus::BAD_REQUEST, - TStringBuilder() << "Not a file: " << filePath); + if (settings.Format_ == EOutputFormat::Tsv) { + if (settings.Delimiter_ != "\t") { + return MakeStatus(EStatus::BAD_REQUEST, + TStringBuilder() << "Illegal delimiter for TSV format, only tab is allowed"); + } } auto result = NDump::DescribePath(*SchemeClient, dbPath).GetStatus(); @@ -61,12 +73,24 @@ TStatus TImportFileClient::Import(const TString& filePath, const TString& dbPath TStringBuilder() << "Table does not exist: " << dbPath); } - if (settings.Format_ != NTable::EDataFormat::CSV) { - return MakeStatus(EStatus::BAD_REQUEST, - TStringBuilder() << "Unsupported format"); + // If the filename passed is empty, read from stdin, else from the file. + std::unique_ptr<TFileInput> fileInput = filePath.empty() ? nullptr + : std::make_unique<TFileInput>(filePath, settings.FileBufferSize_); + IInputStream& input = fileInput ? *fileInput : Cin; + + switch (settings.Format_) { + case EOutputFormat::Default: + case EOutputFormat::Csv: + case EOutputFormat::Tsv: + return UpsertCsv(input, dbPath, settings); + case EOutputFormat::Json: + case EOutputFormat::JsonUnicode: + case EOutputFormat::JsonBase64: + return UpsertJson(input, dbPath, settings); + default: ; } - - return UpsertCsv(dataFile, dbPath, settings); + return MakeStatus(EStatus::BAD_REQUEST, + TStringBuilder() << "Unsupported format #" << (int) settings.Format_); } namespace { @@ -85,19 +109,23 @@ TStatus WaitForQueue(std::deque<TAsyncStatus>& inFlightRequests, size_t maxQueue } -TStatus TImportFileClient::UpsertCsv(const TString& dataFile, const TString& dbPath, +inline +TAsyncStatus TImportFileClient::UpsertCsvBuffer(const TString& dbPath, const TString& buffer) { + auto upsert = [this, dbPath, buffer](NYdb::NTable::TTableClient& tableClient) -> TAsyncStatus { + return tableClient.BulkUpsert(dbPath, NTable::EDataFormat::CSV, buffer, {}, UpsertSettings) + .Apply([](const NYdb::NTable::TAsyncBulkUpsertResult& bulkUpsertResult) { + NYdb::TStatus status = bulkUpsertResult.GetValueSync(); + return NThreading::MakeFuture(status); + }); + }; + return TableClient->RetryOperation(upsert, RetrySettings); +} + +TStatus TImportFileClient::UpsertCsv(IInputStream& input, const TString& dbPath, const TImportFileSettings& settings) { - TFileInput input(dataFile, settings.FileBufferSize_); TString line; TString buffer; - auto upsertSettings = NTable::TBulkUpsertSettings() - .OperationTimeout(TDuration::Seconds(30)) - .ClientTimeout(TDuration::Seconds(35)); - - auto retrySettings = NTable::TRetryOperationSettings() - .MaxRetries(10); - Ydb::Formats::CsvSettings csvSettings; bool special = false; @@ -128,7 +156,7 @@ TStatus TImportFileClient::UpsertCsv(const TString& dataFile, const TString& dbP if (special) { TString formatSettings; Y_PROTOBUF_SUPPRESS_NODISCARD csvSettings.SerializeToString(&formatSettings); - upsertSettings.FormatSettings(formatSettings); + UpsertSettings.FormatSettings(formatSettings); } std::deque<TAsyncStatus> inFlightRequests; @@ -147,31 +175,94 @@ TStatus TImportFileClient::UpsertCsv(const TString& dataFile, const TString& dbP return status; } - inFlightRequests.push_back(UpsertCsvBuffer(dbPath, buffer, {}, upsertSettings, retrySettings)); + inFlightRequests.push_back(UpsertCsvBuffer(dbPath, buffer)); buffer = headerRow; } } if (!buffer.Empty()) { - inFlightRequests.push_back(UpsertCsvBuffer(dbPath, buffer, {}, upsertSettings, retrySettings)); + inFlightRequests.push_back(UpsertCsvBuffer(dbPath, buffer)); } return WaitForQueue(inFlightRequests, 0); } -TAsyncStatus TImportFileClient::UpsertCsvBuffer(const TString& dbPath, const TString& csv, const TString& header, - const NTable::TBulkUpsertSettings& upsertSettings, - const NTable::TRetryOperationSettings& retrySettings) { - auto upsert = [dbPath, csv, header, upsertSettings](NYdb::NTable::TTableClient& tableClient) -> TAsyncStatus { - return tableClient.BulkUpsert(dbPath, NTable::EDataFormat::CSV, csv, header, upsertSettings) +inline +TAsyncStatus TImportFileClient::UpsertJsonBuffer(const TString& dbPath, TValueBuilder& builder) { + auto upsert = [this, dbPath, rows = builder.Build()] + (NYdb::NTable::TTableClient& tableClient) -> TAsyncStatus { + NYdb::TValue r = rows; + return tableClient.BulkUpsert(dbPath, std::move(r), UpsertSettings) .Apply([](const NYdb::NTable::TAsyncBulkUpsertResult& bulkUpsertResult) { NYdb::TStatus status = bulkUpsertResult.GetValueSync(); return NThreading::MakeFuture(status); }); }; + return TableClient->RetryOperation(upsert, RetrySettings); +} + +TStatus TImportFileClient::UpsertJson(IInputStream& input, const TString& dbPath, + const TImportFileSettings& settings) { + NTable::TCreateSessionResult sessionResult = TableClient->GetSession(NTable::TCreateSessionSettings()).GetValueSync(); + if (! sessionResult.IsSuccess()) + return sessionResult; + NTable::TDescribeTableResult tableResult = sessionResult.GetSession().DescribeTable(dbPath).GetValueSync(); + if (! tableResult.IsSuccess()) + return tableResult; - return TableClient->RetryOperation(upsert, retrySettings); + const TType tableType = GetTableType(tableResult.GetTableDescription()); + const NYdb::EBinaryStringEncoding stringEncoding = + (settings.Format_==EOutputFormat::JsonBase64) ? NYdb::EBinaryStringEncoding::Base64 : + NYdb::EBinaryStringEncoding::Unicode; + + std::deque<TAsyncStatus> inFlightRequests; + + size_t currentSize = 0; + size_t currentRows = 0; + auto currentBatch = std::make_unique<TValueBuilder>(); + currentBatch->BeginList(); + + TString line; + while (size_t sz = input.ReadLine(line)) { + currentBatch->AddListItem(JsonToYdbValue(line, tableType, stringEncoding)); + currentSize += line.Size(); + currentRows += 1; + + if (currentSize >= settings.BytesPerRequest_) { + currentBatch->EndList(); + + auto status = WaitForQueue(inFlightRequests, settings.MaxInFlightRequests_); + if (!status.IsSuccess()) { + return status; + } + + inFlightRequests.push_back(UpsertJsonBuffer(dbPath, *currentBatch)); + + currentBatch = std::make_unique<TValueBuilder>(); + currentBatch->BeginList(); + currentSize = 0; + currentRows = 0; + } + } + + if (currentRows > 0) { + currentBatch->EndList(); + inFlightRequests.push_back(UpsertJsonBuffer(dbPath, *currentBatch)); + } + + return WaitForQueue(inFlightRequests, 0); +} + +TType TImportFileClient::GetTableType(const NTable::TTableDescription& tableDescription) { + TTypeBuilder typeBuilder; + typeBuilder.BeginStruct(); + const auto& columns = tableDescription.GetTableColumns(); + for (auto it = columns.begin(); it!=columns.end(); it++) { + typeBuilder.AddMember((*it).Name, (*it).Type); + } + typeBuilder.EndStruct(); + return typeBuilder.Build(); } } diff --git a/ydb/public/lib/ydb_cli/import/import.h b/ydb/public/lib/ydb_cli/import/import.h index 66269e8e6c1..8b20d66517c 100644 --- a/ydb/public/lib/ydb_cli/import/import.h +++ b/ydb/public/lib/ydb_cli/import/import.h @@ -1,5 +1,6 @@ #pragma once +#include <ydb/public/lib/ydb_cli/common/formats.h> #include <ydb/public/sdk/cpp/client/ydb_types/status/status.h> #include <ydb/public/sdk/cpp/client/ydb_types/fluent_settings_helpers.h> #include <ydb/public/sdk/cpp/client/ydb_table/table.h> @@ -31,10 +32,12 @@ struct TImportFileSettings : public TOperationRequestSettings<TImportFileSetting static constexpr ui64 MaxBytesPerRequest = 8_MB; static constexpr const char * DefaultDelimiter = ","; - FLUENT_SETTING_DEFAULT(NTable::EDataFormat, Format, NTable::EDataFormat::CSV); + // Allowed values: Csv, Tsv, JsonUnicode, JsonBase64. Default means Csv + FLUENT_SETTING_DEFAULT(EOutputFormat, Format, EOutputFormat::Default); FLUENT_SETTING_DEFAULT(ui64, BytesPerRequest, 1_MB); FLUENT_SETTING_DEFAULT(ui64, FileBufferSize, 2_MB); FLUENT_SETTING_DEFAULT(ui64, MaxInFlightRequests, 100); + // Settings below are for CSV format only FLUENT_SETTING_DEFAULT(ui32, SkipRows, 0); FLUENT_SETTING_DEFAULT(bool, Header, false); FLUENT_SETTING_DEFAULT(TString, Delimiter, DefaultDelimiter); @@ -44,7 +47,12 @@ struct TImportFileSettings : public TOperationRequestSettings<TImportFileSetting class TImportFileClient { public: explicit TImportFileClient(const TDriver& driver); + TImportFileClient(const TImportFileClient&) = delete; + // Ingest data from the input file to the database table. + // fsPath: path to input file, stdin if empty + // dbPath: full path to the database table, including the database path + // settings: input data format and operational settings TStatus Import(const TString& fsPath, const TString& dbPath, const TImportFileSettings& settings = {}); private: @@ -52,10 +60,15 @@ private: std::shared_ptr<NScheme::TSchemeClient> SchemeClient; std::shared_ptr<NTable::TTableClient> TableClient; - TStatus UpsertCsv(const TString& dataFile, const TString& dbPath, const TImportFileSettings& settings); - TAsyncStatus UpsertCsvBuffer(const TString& dbPath, const TString& csv, const TString& header, - const NTable::TBulkUpsertSettings& upsertSettings, - const NTable::TRetryOperationSettings& retrySettings); + NTable::TBulkUpsertSettings UpsertSettings; + NTable::TRetryOperationSettings RetrySettings; + + TStatus UpsertCsv(IInputStream& input, const TString& dbPath, const TImportFileSettings& settings); + TAsyncStatus UpsertCsvBuffer(const TString& dbPath, const TString& buffer); + + TStatus UpsertJson(IInputStream& input, const TString& dbPath, const TImportFileSettings& settings); + TAsyncStatus UpsertJsonBuffer(const TString& dbPath, TValueBuilder& builder); + TType GetTableType(const NTable::TTableDescription& tableDescription); }; } diff --git a/ydb/tests/functional/ydb_cli/canondata/result.json b/ydb/tests/functional/ydb_cli/canondata/result.json index 63885584756..57776dab145 100644 --- a/ydb/tests/functional/ydb_cli/canondata/result.json +++ b/ydb/tests/functional/ydb_cli/canondata/result.json @@ -1,4 +1,13 @@ { + "test_ydb_impex.TestImpex.test_format_csv": { + "uri": "file://test_ydb_impex.TestImpex.test_format_csv/result.output" + }, + "test_ydb_impex.TestImpex.test_format_json": { + "uri": "file://test_ydb_impex.TestImpex.test_format_json/result.output" + }, + "test_ydb_impex.TestImpex.test_format_tsv": { + "uri": "file://test_ydb_impex.TestImpex.test_format_tsv/result.output" + }, "test_ydb_scripting.TestExecuteScriptWithFormats.test_stream_yql_script_json_base64": { "uri": "file://test_ydb_scripting.TestExecuteScriptWithFormats.test_stream_yql_script_json_base64/result.output" }, @@ -47,6 +56,9 @@ "test_ydb_scripting.TestScriptingServiceHelp.test_help_ex": { "uri": "file://test_ydb_scripting.TestScriptingServiceHelp.test_help_ex/result.output" }, + "test_ydb_table.TestExecuteQueryWithFormats.test_data_query_csv": { + "uri": "file://test_ydb_table.TestExecuteQueryWithFormats.test_data_query_csv/result.output" + }, "test_ydb_table.TestExecuteQueryWithFormats.test_data_query_json_base64": { "uri": "file://test_ydb_table.TestExecuteQueryWithFormats.test_data_query_json_base64/result.output" }, @@ -62,6 +74,12 @@ "test_ydb_table.TestExecuteQueryWithFormats.test_data_query_pretty": { "uri": "file://test_ydb_table.TestExecuteQueryWithFormats.test_data_query_pretty/result.output" }, + "test_ydb_table.TestExecuteQueryWithFormats.test_data_query_tsv": { + "uri": "file://test_ydb_table.TestExecuteQueryWithFormats.test_data_query_tsv/result.output" + }, + "test_ydb_table.TestExecuteQueryWithFormats.test_read_table_csv": { + "uri": "file://test_ydb_table.TestExecuteQueryWithFormats.test_read_table_csv/result.output" + }, "test_ydb_table.TestExecuteQueryWithFormats.test_read_table_json_base64": { "uri": "file://test_ydb_table.TestExecuteQueryWithFormats.test_read_table_json_base64/result.output" }, @@ -77,6 +95,12 @@ "test_ydb_table.TestExecuteQueryWithFormats.test_read_table_pretty": { "uri": "file://test_ydb_table.TestExecuteQueryWithFormats.test_read_table_pretty/result.output" }, + "test_ydb_table.TestExecuteQueryWithFormats.test_read_table_tsv": { + "uri": "file://test_ydb_table.TestExecuteQueryWithFormats.test_read_table_tsv/result.output" + }, + "test_ydb_table.TestExecuteQueryWithFormats.test_scan_query_csv": { + "uri": "file://test_ydb_table.TestExecuteQueryWithFormats.test_scan_query_csv/result.output" + }, "test_ydb_table.TestExecuteQueryWithFormats.test_scan_query_json_base64": { "uri": "file://test_ydb_table.TestExecuteQueryWithFormats.test_scan_query_json_base64/result.output" }, @@ -92,6 +116,9 @@ "test_ydb_table.TestExecuteQueryWithFormats.test_scan_query_pretty": { "uri": "file://test_ydb_table.TestExecuteQueryWithFormats.test_scan_query_pretty/result.output" }, + "test_ydb_table.TestExecuteQueryWithFormats.test_scan_query_tsv": { + "uri": "file://test_ydb_table.TestExecuteQueryWithFormats.test_scan_query_tsv/result.output" + }, "test_ydb_table.TestExecuteQueryWithParams.test_list": { "uri": "file://test_ydb_table.TestExecuteQueryWithParams.test_list/result.output" }, diff --git a/ydb/tests/functional/ydb_cli/canondata/test_ydb_impex.TestImpex.test_format_csv/result.output b/ydb/tests/functional/ydb_cli/canondata/test_ydb_impex.TestImpex.test_format_csv/result.output new file mode 100644 index 00000000000..d900a075354 --- /dev/null +++ b/ydb/tests/functional/ydb_cli/canondata/test_ydb_impex.TestImpex.test_format_csv/result.output @@ -0,0 +1,5 @@ +1,1111,"one" +2,2222,"two" +3,3333,"three" +5,5555,"five" +7,7777,"seven" diff --git a/ydb/tests/functional/ydb_cli/canondata/test_ydb_impex.TestImpex.test_format_json/result.output b/ydb/tests/functional/ydb_cli/canondata/test_ydb_impex.TestImpex.test_format_json/result.output new file mode 100644 index 00000000000..cead26df793 --- /dev/null +++ b/ydb/tests/functional/ydb_cli/canondata/test_ydb_impex.TestImpex.test_format_json/result.output @@ -0,0 +1,5 @@ +{"key":1,"id":1111,"value":"one"} +{"key":2,"id":2222,"value":"two"} +{"key":3,"id":3333,"value":"three"} +{"key":5,"id":5555,"value":"five"} +{"key":7,"id":7777,"value":"seven"} diff --git a/ydb/tests/functional/ydb_cli/canondata/test_ydb_impex.TestImpex.test_format_tsv/result.output b/ydb/tests/functional/ydb_cli/canondata/test_ydb_impex.TestImpex.test_format_tsv/result.output new file mode 100644 index 00000000000..ebe43b48d1c --- /dev/null +++ b/ydb/tests/functional/ydb_cli/canondata/test_ydb_impex.TestImpex.test_format_tsv/result.output @@ -0,0 +1,5 @@ +1 1111 "one" +2 2222 "two" +3 3333 "three" +5 5555 "five" +7 7777 "seven" diff --git a/ydb/tests/functional/ydb_cli/canondata/test_ydb_table.TestExecuteQueryWithFormats.test_data_query_csv/result.output b/ydb/tests/functional/ydb_cli/canondata/test_ydb_table.TestExecuteQueryWithFormats.test_data_query_csv/result.output new file mode 100644 index 00000000000..06ac255958b --- /dev/null +++ b/ydb/tests/functional/ydb_cli/canondata/test_ydb_table.TestExecuteQueryWithFormats.test_data_query_csv/result.output @@ -0,0 +1,3 @@ +1111,1,"one" +2222,2,"two" +3333,3,"three" diff --git a/ydb/tests/functional/ydb_cli/canondata/test_ydb_table.TestExecuteQueryWithFormats.test_data_query_tsv/result.output b/ydb/tests/functional/ydb_cli/canondata/test_ydb_table.TestExecuteQueryWithFormats.test_data_query_tsv/result.output new file mode 100644 index 00000000000..9f09d15f010 --- /dev/null +++ b/ydb/tests/functional/ydb_cli/canondata/test_ydb_table.TestExecuteQueryWithFormats.test_data_query_tsv/result.output @@ -0,0 +1,3 @@ +1111 1 "one" +2222 2 "two" +3333 3 "three" diff --git a/ydb/tests/functional/ydb_cli/canondata/test_ydb_table.TestExecuteQueryWithFormats.test_read_table_csv/result.output b/ydb/tests/functional/ydb_cli/canondata/test_ydb_table.TestExecuteQueryWithFormats.test_read_table_csv/result.output new file mode 100644 index 00000000000..d900a075354 --- /dev/null +++ b/ydb/tests/functional/ydb_cli/canondata/test_ydb_table.TestExecuteQueryWithFormats.test_read_table_csv/result.output @@ -0,0 +1,5 @@ +1,1111,"one" +2,2222,"two" +3,3333,"three" +5,5555,"five" +7,7777,"seven" diff --git a/ydb/tests/functional/ydb_cli/canondata/test_ydb_table.TestExecuteQueryWithFormats.test_read_table_tsv/result.output b/ydb/tests/functional/ydb_cli/canondata/test_ydb_table.TestExecuteQueryWithFormats.test_read_table_tsv/result.output new file mode 100644 index 00000000000..ebe43b48d1c --- /dev/null +++ b/ydb/tests/functional/ydb_cli/canondata/test_ydb_table.TestExecuteQueryWithFormats.test_read_table_tsv/result.output @@ -0,0 +1,5 @@ +1 1111 "one" +2 2222 "two" +3 3333 "three" +5 5555 "five" +7 7777 "seven" diff --git a/ydb/tests/functional/ydb_cli/canondata/test_ydb_table.TestExecuteQueryWithFormats.test_scan_query_csv/result.output b/ydb/tests/functional/ydb_cli/canondata/test_ydb_table.TestExecuteQueryWithFormats.test_scan_query_csv/result.output new file mode 100644 index 00000000000..06ac255958b --- /dev/null +++ b/ydb/tests/functional/ydb_cli/canondata/test_ydb_table.TestExecuteQueryWithFormats.test_scan_query_csv/result.output @@ -0,0 +1,3 @@ +1111,1,"one" +2222,2,"two" +3333,3,"three" diff --git a/ydb/tests/functional/ydb_cli/canondata/test_ydb_table.TestExecuteQueryWithFormats.test_scan_query_tsv/result.output b/ydb/tests/functional/ydb_cli/canondata/test_ydb_table.TestExecuteQueryWithFormats.test_scan_query_tsv/result.output new file mode 100644 index 00000000000..9f09d15f010 --- /dev/null +++ b/ydb/tests/functional/ydb_cli/canondata/test_ydb_table.TestExecuteQueryWithFormats.test_scan_query_tsv/result.output @@ -0,0 +1,3 @@ +1111 1 "one" +2222 2 "two" +3333 3 "three" diff --git a/ydb/tests/functional/ydb_cli/test_ydb_impex.py b/ydb/tests/functional/ydb_cli/test_ydb_impex.py new file mode 100644 index 00000000000..e8c41f056a7 --- /dev/null +++ b/ydb/tests/functional/ydb_cli/test_ydb_impex.py @@ -0,0 +1,129 @@ +# -*- coding: utf-8 -*- + +from ydb.tests.library.common import yatest_common +from ydb.tests.library.harness.kikimr_cluster import kikimr_cluster_factory +import ydb +import logging + + +logger = logging.getLogger(__name__) + + +DATA_CSV = """key,id,value +1,1111,"one" +2,2222,"two" +3,3333,"three" +5,5555,"five" +7,7777,"seven" +""" + +DATA_TSV = DATA_CSV.replace(',', '\t') + + +DATA_JSON = """{"key":1,"id":1111,"value":"one"} +{"key":2,"id":2222,"value":"two"} +{"key":3,"id":3333,"value":"three"} +{"key":5,"id":5555,"value":"five"} +{"key":7,"id":7777,"value":"seven"} +""" + + +def ydb_bin(): + return yatest_common.binary_path("ydb/apps/ydb/ydb") + + +def create_table(session, path): + session.create_table( + path, + ydb.TableDescription() + .with_column(ydb.Column("key", ydb.OptionalType(ydb.PrimitiveType.Uint32))) + .with_column(ydb.Column("id", ydb.OptionalType(ydb.PrimitiveType.Uint64))) + .with_column(ydb.Column("value", ydb.OptionalType(ydb.PrimitiveType.Utf8))) + .with_primary_keys("key") + ) + + +class BaseTestTableService(object): + @classmethod + def setup_class(cls): + cls.cluster = kikimr_cluster_factory() + cls.cluster.start() + cls.root_dir = "/Root" + driver_config = ydb.DriverConfig( + database="/Root", + endpoint="%s:%s" % (cls.cluster.nodes[1].host, cls.cluster.nodes[1].port)) + cls.driver = ydb.Driver(driver_config) + cls.driver.wait(timeout=4) + + @classmethod + def teardown_class(cls): + if hasattr(cls, 'cluster'): + cls.cluster.stop() + + @classmethod + def execute_ydb_cli_command(cls, args): + execution = yatest_common.execute( + [ + ydb_bin(), + "--endpoint", "grpc://localhost:%d" % cls.cluster.nodes[1].grpc_port, + "--database", cls.root_dir + ] + + args + ) + + result = execution.std_out + logger.debug("std_out:\n" + result.decode('utf-8')) + return result + + @staticmethod + def canonical_result(output_result): + output_file_name = "result.output" + with open(output_file_name, "w") as f: + f.write(output_result.decode('utf-8')) + return yatest_common.canonical_file(output_file_name, local=True, universal_lines=True) + + +class TestImpex(BaseTestTableService): + + @classmethod + def setup_class(cls): + BaseTestTableService.setup_class() + + session = cls.driver.table_client.session().create() + cls.table_path = cls.root_dir + "/impex_table" + create_table(session, cls.table_path) + + def clear_table(self): + query = "DELETE FROM `{}`".format(self.table_path) + self.execute_ydb_cli_command(["yql", "-s", query]) + + def run_import_csv(self, ftype, data): + self.clear_table() + with open("tempinput.dat", "w") as f: + f.writelines(data) + output = self.execute_ydb_cli_command(["import", "file", ftype, "-p", self.table_path, "-i", "tempinput.dat", "--header"]) + return self.canonical_result(output) + + def run_import_json(self, data): + self.clear_table() + with open("tempinput.dat", "w") as f: + f.writelines(data) + output = self.execute_ydb_cli_command(["import", "file", "json", "-p", self.table_path, "-i", "tempinput.dat"]) + return self.canonical_result(output) + + def run_export(self, format): + query = "SELECT `key`, `id`, `value` FROM `{}` ORDER BY `key`".format(self.table_path) + output = self.execute_ydb_cli_command(["table", "query", "execute", "-q", query, "-t", "scan", "--format", format]) + return self.canonical_result(output) + + def test_format_csv(self): + self.run_import_csv("csv", DATA_CSV) + return self.run_export("csv") + + def test_format_tsv(self): + self.run_import_csv("tsv", DATA_TSV) + return self.run_export("tsv") + + def test_format_json(self): + self.run_import_json(DATA_JSON) + return self.run_export("json-unicode") diff --git a/ydb/tests/functional/ydb_cli/test_ydb_table.py b/ydb/tests/functional/ydb_cli/test_ydb_table.py index 148d33ede23..545056d313b 100644 --- a/ydb/tests/functional/ydb_cli/test_ydb_table.py +++ b/ydb/tests/functional/ydb_cli/test_ydb_table.py @@ -173,6 +173,12 @@ class TestExecuteQueryWithFormats(BaseTestTableService): def test_data_query_json_unicode_array(self): return self.execute_data_query('json-unicode-array') + def test_data_query_csv(self): + return self.execute_data_query('csv') + + def test_data_query_tsv(self): + return self.execute_data_query('tsv') + # ScanQuery def test_scan_query_pretty(self): @@ -190,6 +196,12 @@ class TestExecuteQueryWithFormats(BaseTestTableService): def test_scan_query_json_unicode_array(self): return self.execute_scan_query('json-unicode-array') + def test_scan_query_csv(self): + return self.execute_scan_query('csv') + + def test_scan_query_tsv(self): + return self.execute_scan_query('tsv') + # ReadTable def test_read_table_pretty(self): @@ -206,3 +218,9 @@ class TestExecuteQueryWithFormats(BaseTestTableService): def test_read_table_json_unicode_array(self): return self.execute_read_table('json-unicode-array') + + def test_read_table_csv(self): + return self.execute_read_table('csv') + + def test_read_table_tsv(self): + return self.execute_read_table('tsv') |