aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authormzinal <mzinal@yandex-team.com>2022-09-19 15:00:32 +0300
committermzinal <mzinal@yandex-team.com>2022-09-19 15:00:32 +0300
commitcdb11b6f2fcb2acb6d449384db45dfd89013c07f (patch)
tree1ca28bb743bf631d5703e6bfcb34c379b2dce79f
parent67395a78c86cf26222cf9d3f410c4feb09453ffe (diff)
downloadydb-cdb11b6f2fcb2acb6d449384db45dfd89013c07f.tar.gz
PR from branch users/mzinal/
Data import and export enhancements for YDB CLI
-rw-r--r--ydb/public/lib/ydb_cli/commands/ydb_service_import.cpp105
-rw-r--r--ydb/public/lib/ydb_cli/commands/ydb_service_import.h54
-rw-r--r--ydb/public/lib/ydb_cli/commands/ydb_service_table.cpp12
-rw-r--r--ydb/public/lib/ydb_cli/commands/ydb_yql.cpp4
-rw-r--r--ydb/public/lib/ydb_cli/common/format.cpp10
-rw-r--r--ydb/public/lib/ydb_cli/common/format.h2
-rw-r--r--ydb/public/lib/ydb_cli/import/CMakeLists.txt1
-rw-r--r--ydb/public/lib/ydb_cli/import/import.cpp161
-rw-r--r--ydb/public/lib/ydb_cli/import/import.h23
-rw-r--r--ydb/tests/functional/ydb_cli/canondata/result.json27
-rw-r--r--ydb/tests/functional/ydb_cli/canondata/test_ydb_impex.TestImpex.test_format_csv/result.output5
-rw-r--r--ydb/tests/functional/ydb_cli/canondata/test_ydb_impex.TestImpex.test_format_json/result.output5
-rw-r--r--ydb/tests/functional/ydb_cli/canondata/test_ydb_impex.TestImpex.test_format_tsv/result.output5
-rw-r--r--ydb/tests/functional/ydb_cli/canondata/test_ydb_table.TestExecuteQueryWithFormats.test_data_query_csv/result.output3
-rw-r--r--ydb/tests/functional/ydb_cli/canondata/test_ydb_table.TestExecuteQueryWithFormats.test_data_query_tsv/result.output3
-rw-r--r--ydb/tests/functional/ydb_cli/canondata/test_ydb_table.TestExecuteQueryWithFormats.test_read_table_csv/result.output5
-rw-r--r--ydb/tests/functional/ydb_cli/canondata/test_ydb_table.TestExecuteQueryWithFormats.test_read_table_tsv/result.output5
-rw-r--r--ydb/tests/functional/ydb_cli/canondata/test_ydb_table.TestExecuteQueryWithFormats.test_scan_query_csv/result.output3
-rw-r--r--ydb/tests/functional/ydb_cli/canondata/test_ydb_table.TestExecuteQueryWithFormats.test_scan_query_tsv/result.output3
-rw-r--r--ydb/tests/functional/ydb_cli/test_ydb_impex.py129
-rw-r--r--ydb/tests/functional/ydb_cli/test_ydb_table.py18
21 files changed, 483 insertions, 100 deletions
diff --git a/ydb/public/lib/ydb_cli/commands/ydb_service_import.cpp b/ydb/public/lib/ydb_cli/commands/ydb_service_import.cpp
index 846383dd20b..4a74452c628 100644
--- a/ydb/public/lib/ydb_cli/commands/ydb_service_import.cpp
+++ b/ydb/public/lib/ydb_cli/commands/ydb_service_import.cpp
@@ -130,58 +130,35 @@ TCommandImportFromFile::TCommandImportFromFile()
{
AddCommand(std::make_unique<TCommandImportFromCsv>());
AddCommand(std::make_unique<TCommandImportFromTsv>());
+ AddCommand(std::make_unique<TCommandImportFromJson>());
}
-/// CSV
+/// Import File Shared Config
-TCommandImportFromCsv::TCommandImportFromCsv(const TString& cmd, const TString& cmdDescription)
- : TYdbCommand(cmd, {}, cmdDescription)
-{}
-
-void TCommandImportFromCsv::Config(TConfig& config) {
+void TCommandImportFileBase::Config(TConfig& config) {
TYdbCommand::Config(config);
config.SetFreeArgsNum(0);
config.Opts->AddLongOption('p', "path", "Database path to table")
.Required().RequiredArgument("STRING").StoreResult(&Path);
- config.Opts->AddLongOption("input-file", "Path to file to import in a local filesystem")
- .Required().RequiredArgument("STRING").StoreResult(&FilePath);
- config.Opts->AddLongOption("skip-rows",
- "Number of header rows to skip (not including the row of column names, if any)")
- .RequiredArgument("NUM").StoreResult(&SkipRows).DefaultValue(SkipRows);
- config.Opts->AddLongOption("header",
- "Set if file contains column names at the first not skipped row")
- .StoreTrue(&Header);
- if (InputFormat == EOutputFormat::Csv) {
- config.Opts->AddLongOption("delimiter", "Field delimiter in rows")
- .RequiredArgument("STRING").StoreResult(&Delimiter).DefaultValue(Delimiter);
- }
- config.Opts->AddLongOption("null-value", "Value that would be interpreted as NULL")
- .RequiredArgument("STRING").StoreResult(&NullValue).DefaultValue(NullValue);
- // TODO: quoting/quote_char
- TImportFileSettings defaults;
+ config.Opts->AddLongOption('i', "input-file",
+ "Path to file to be imported, standard input if empty or not specified")
+ .StoreResult(&FilePath).DefaultValue(FilePath);
+ const TImportFileSettings defaults;
config.Opts->AddLongOption("batch-bytes",
"Use portions of this size in bytes to parse and upload file data")
.DefaultValue(HumanReadableSize(defaults.BytesPerRequest_, SF_BYTES)).StoreResult(&BytesPerRequest);
-
config.Opts->AddLongOption("max-in-flight",
"Maximum number of in-flight requests; increase to load big files faster (more memory needed)")
.DefaultValue(defaults.MaxInFlightRequests_).StoreResult(&MaxInFlightRequests);
}
-void TCommandImportFromCsv::Parse(TConfig& config) {
- TClientCommand::Parse(config);
+void TCommandImportFileBase::Parse(TConfig& config) {
+ TYdbCommand::Parse(config);
AdjustPath(config);
-}
-
-int TCommandImportFromCsv::Run(TConfig& config) {
- TImportFileSettings settings;
- settings.SkipRows(SkipRows);
- settings.Header(Header);
- settings.NullValue(NullValue);
if (auto bytesPerRequest = NYdb::SizeFromString(BytesPerRequest)) {
if (bytesPerRequest > TImportFileSettings::MaxBytesPerRequest) {
@@ -189,14 +166,42 @@ int TCommandImportFromCsv::Run(TConfig& config) {
<< "--batch-bytes cannot be larger than "
<< HumanReadableSize(TImportFileSettings::MaxBytesPerRequest, SF_BYTES);
}
-
- settings.BytesPerRequest(bytesPerRequest);
}
if (MaxInFlightRequests == 0) {
- MaxInFlightRequests = 1;
+ throw TMisuseException()
+ << "--max-in-flight must be greater than zero";
+ }
+}
+
+/// Import CSV
+
+void TCommandImportFromCsv::Config(TConfig& config) {
+ TCommandImportFileBase::Config(config);
+
+ config.Opts->AddLongOption("skip-rows",
+ "Number of header rows to skip (not including the row of column names, if any)")
+ .RequiredArgument("NUM").StoreResult(&SkipRows).DefaultValue(SkipRows);
+ config.Opts->AddLongOption("header",
+ "Set if file contains column names at the first not skipped row")
+ .StoreTrue(&Header);
+ if (InputFormat == EOutputFormat::Csv) {
+ config.Opts->AddLongOption("delimiter", "Field delimiter in rows")
+ .RequiredArgument("STRING").StoreResult(&Delimiter).DefaultValue(Delimiter);
}
+ config.Opts->AddLongOption("null-value", "Value that would be interpreted as NULL")
+ .RequiredArgument("STRING").StoreResult(&NullValue).DefaultValue(NullValue);
+ // TODO: quoting/quote_char
+}
+
+int TCommandImportFromCsv::Run(TConfig& config) {
+ TImportFileSettings settings;
+ settings.Format(InputFormat);
settings.MaxInFlightRequests(MaxInFlightRequests);
+ settings.BytesPerRequest(NYdb::SizeFromString(BytesPerRequest));
+ settings.SkipRows(SkipRows);
+ settings.Header(Header);
+ settings.NullValue(NullValue);
if (Delimiter.size() != 1) {
throw TMisuseException()
@@ -211,5 +216,35 @@ int TCommandImportFromCsv::Run(TConfig& config) {
return EXIT_SUCCESS;
}
+
+/// Import JSON
+
+void TCommandImportFromJson::Config(TConfig& config) {
+ TCommandImportFileBase::Config(config);
+
+ AddInputFormats(config, {
+ EOutputFormat::JsonUnicode,
+ EOutputFormat::JsonBase64
+ });
+}
+
+void TCommandImportFromJson::Parse(TConfig& config) {
+ TCommandImportFileBase::Parse(config);
+
+ ParseFormats();
+}
+
+int TCommandImportFromJson::Run(TConfig& config) {
+ TImportFileSettings settings;
+ settings.Format(InputFormat);
+ settings.MaxInFlightRequests(MaxInFlightRequests);
+ settings.BytesPerRequest(NYdb::SizeFromString(BytesPerRequest));
+
+ TImportFileClient client(CreateDriver(config));
+ ThrowOnError(client.Import(FilePath, Path, settings));
+
+ return EXIT_SUCCESS;
+}
+
}
}
diff --git a/ydb/public/lib/ydb_cli/commands/ydb_service_import.h b/ydb/public/lib/ydb_cli/commands/ydb_service_import.h
index 04dc1010476..af304b92a63 100644
--- a/ydb/public/lib/ydb_cli/commands/ydb_service_import.h
+++ b/ydb/public/lib/ydb_cli/commands/ydb_service_import.h
@@ -22,9 +22,9 @@ class TCommandImportFromS3 : public TYdbOperationCommand,
public TCommandWithFormat {
public:
TCommandImportFromS3();
- virtual void Config(TConfig& config) override;
- virtual void Parse(TConfig& config) override;
- virtual int Run(TConfig& config) override;
+ void Config(TConfig& config) override;
+ void Parse(TConfig& config) override;
+ int Run(TConfig& config) override;
private:
struct TItemFields {
@@ -46,23 +46,37 @@ public:
TCommandImportFromFile();
};
-class TCommandImportFromCsv : public TYdbCommand,
- public TCommandWithPath {
+class TCommandImportFileBase : public TYdbCommand,
+ public TCommandWithPath, public TCommandWithFormat {
public:
- TCommandImportFromCsv(const TString& cmd = "csv", const TString& cmdDescription = "Import data from CSV file");
- virtual void Config(TConfig& config) override;
- virtual void Parse(TConfig& config) override;
- virtual int Run(TConfig& config) override;
+ TCommandImportFileBase(const TString& cmd, const TString& cmdDescription)
+ : TYdbCommand(cmd, {}, cmdDescription)
+ {}
+ void Config(TConfig& config) override;
+ void Parse(TConfig& config) override;
protected:
- EOutputFormat InputFormat = EOutputFormat::Csv;
- TString FilePath;
- TString Delimiter = ",";
+ TString FilePath; // Read from stdin if the file path is empty
+ TString BytesPerRequest;
+ ui64 MaxInFlightRequests = 1;
+};
+
+class TCommandImportFromCsv : public TCommandImportFileBase {
+public:
+ TCommandImportFromCsv(const TString& cmd = "csv", const TString& cmdDescription = "Import data from CSV file")
+ : TCommandImportFileBase(cmd, cmdDescription)
+ {
+ InputFormat = EOutputFormat::Csv;
+ Delimiter = ",";
+ }
+ void Config(TConfig& config) override;
+ int Run(TConfig& config) override;
+
+protected:
+ TString Delimiter;
TString NullValue;
ui32 SkipRows = 0;
bool Header = false;
- TString BytesPerRequest;
- ui64 MaxInFlightRequests = 1;
};
class TCommandImportFromTsv : public TCommandImportFromCsv {
@@ -75,5 +89,17 @@ public:
}
};
+class TCommandImportFromJson : public TCommandImportFileBase {
+public:
+ TCommandImportFromJson()
+ : TCommandImportFileBase("json", "Import data from JSON file")
+ {
+ InputFormat = EOutputFormat::JsonUnicode;
+ }
+ void Config(TConfig& config) override;
+ void Parse(TConfig& config) override;
+ int Run(TConfig& config) override;
+};
+
}
}
diff --git a/ydb/public/lib/ydb_cli/commands/ydb_service_table.cpp b/ydb/public/lib/ydb_cli/commands/ydb_service_table.cpp
index 0746da4fb4e..a9648edf1bc 100644
--- a/ydb/public/lib/ydb_cli/commands/ydb_service_table.cpp
+++ b/ydb/public/lib/ydb_cli/commands/ydb_service_table.cpp
@@ -362,13 +362,14 @@ void TCommandExecuteQuery::Config(TConfig& config) {
EOutputFormat::JsonBase64
});
-
AddFormats(config, {
EOutputFormat::Pretty,
EOutputFormat::JsonUnicode,
EOutputFormat::JsonUnicodeArray,
EOutputFormat::JsonBase64,
- EOutputFormat::JsonBase64Array
+ EOutputFormat::JsonBase64Array,
+ EOutputFormat::Csv,
+ EOutputFormat::Tsv
});
CheckExamples(config);
@@ -699,12 +700,11 @@ void TCommandReadTable::Config(TConfig& config) {
EOutputFormat::JsonUnicode,
EOutputFormat::JsonUnicodeArray,
EOutputFormat::JsonBase64,
- EOutputFormat::JsonBase64Array
+ EOutputFormat::JsonBase64Array,
+ EOutputFormat::Csv,
+ EOutputFormat::Tsv
});
- // TODO: KIKIMR-8675
- // Add csv format
-
config.SetFreeArgsNum(1);
SetFreeArgTitle(0, "<table path>", "Path to a table");
}
diff --git a/ydb/public/lib/ydb_cli/commands/ydb_yql.cpp b/ydb/public/lib/ydb_cli/commands/ydb_yql.cpp
index 71019621d28..0ebb7faedc2 100644
--- a/ydb/public/lib/ydb_cli/commands/ydb_yql.cpp
+++ b/ydb/public/lib/ydb_cli/commands/ydb_yql.cpp
@@ -32,7 +32,9 @@ void TCommandYql::Config(TConfig& config) {
EOutputFormat::JsonUnicode,
EOutputFormat::JsonUnicodeArray,
EOutputFormat::JsonBase64,
- EOutputFormat::JsonBase64Array
+ EOutputFormat::JsonBase64Array,
+ EOutputFormat::Csv,
+ EOutputFormat::Tsv
});
CheckExamples(config);
diff --git a/ydb/public/lib/ydb_cli/common/format.cpp b/ydb/public/lib/ydb_cli/common/format.cpp
index 0db3a40d1f5..8f106ca378b 100644
--- a/ydb/public/lib/ydb_cli/common/format.cpp
+++ b/ydb/public/lib/ydb_cli/common/format.cpp
@@ -35,6 +35,7 @@ namespace {
"Every row is a separate binary data on a separate line"},
{ EOutputFormat::ProtoJsonBase64, "Output result protobuf in json format, binary strings are encoded with base64" },
{ EOutputFormat::Csv, "Output in csv format" },
+ { EOutputFormat::Tsv, "Output in tsv format" },
};
THashMap<EMessagingFormat, TString> MessagingFormatDescriptions = {
@@ -336,7 +337,10 @@ void TResultSetPrinter::Print(const TResultSet& resultSet) {
FormatResultSetJson(resultSet, &Cout, EBinaryStringEncoding::Base64);
break;
case EOutputFormat::Csv:
- PrintCsv(resultSet);
+ PrintCsv(resultSet, ",");
+ break;
+ case EOutputFormat::Tsv:
+ PrintCsv(resultSet, "\t");
break;
default:
throw TMisuseException() << "This command doesn't support " << Format << " output format";
@@ -423,14 +427,14 @@ void TResultSetPrinter::PrintJsonArray(const TResultSet& resultSet, EBinaryStrin
}
}
-void TResultSetPrinter::PrintCsv(const TResultSet& resultSet) {
+void TResultSetPrinter::PrintCsv(const TResultSet& resultSet, const char* delim) {
const TVector<TColumn>& columns = resultSet.GetColumnsMeta();
TResultSetParser parser(resultSet);
while (parser.TryNextRow()) {
for (ui32 i = 0; i < columns.size(); ++i) {
Cout << FormatValueJson(parser.GetValue(i), EBinaryStringEncoding::Unicode);
if (i < columns.size() - 1) {
- Cout << ",";
+ Cout << delim;
}
}
Cout << Endl;
diff --git a/ydb/public/lib/ydb_cli/common/format.h b/ydb/public/lib/ydb_cli/common/format.h
index bc571f7e0c5..f8fb3413cf3 100644
--- a/ydb/public/lib/ydb_cli/common/format.h
+++ b/ydb/public/lib/ydb_cli/common/format.h
@@ -61,7 +61,7 @@ private:
void PrintPretty(const TResultSet& resultSet);
void PrintJsonArray(const TResultSet& resultSet, EBinaryStringEncoding encoding);
- void PrintCsv(const TResultSet& resultSet);
+ void PrintCsv(const TResultSet& resultSet, const char* delim);
bool FirstPart = true;
bool PrintedSomething = false;
diff --git a/ydb/public/lib/ydb_cli/import/CMakeLists.txt b/ydb/public/lib/ydb_cli/import/CMakeLists.txt
index d63a8b60fc7..d28016ad5b1 100644
--- a/ydb/public/lib/ydb_cli/import/CMakeLists.txt
+++ b/ydb/public/lib/ydb_cli/import/CMakeLists.txt
@@ -14,6 +14,7 @@ target_link_libraries(lib-ydb_cli-import PUBLIC
api-protos
common
cpp-client-ydb_proto
+ public-lib-json_value
)
target_sources(lib-ydb_cli-import PRIVATE
${CMAKE_SOURCE_DIR}/ydb/public/lib/ydb_cli/import/import.cpp
diff --git a/ydb/public/lib/ydb_cli/import/import.cpp b/ydb/public/lib/ydb_cli/import/import.cpp
index e9960a21f82..5843c8d0f59 100644
--- a/ydb/public/lib/ydb_cli/import/import.cpp
+++ b/ydb/public/lib/ydb_cli/import/import.cpp
@@ -8,6 +8,7 @@
#include <ydb/public/api/protos/ydb_table.pb.h>
#include <ydb/public/api/protos/ydb_formats.pb.h>
+#include <ydb/public/lib/json_value/ydb_json_value.h>
#include <ydb/public/lib/ydb_cli/common/recursive_list.h>
#include <ydb/public/lib/ydb_cli/dump/util/util.h>
@@ -23,6 +24,7 @@ namespace NConsoleClient {
namespace {
+static inline
TStatus MakeStatus(EStatus code = EStatus::SUCCESS, const TString& error = {}) {
NYql::TIssues issues;
if (error) {
@@ -37,22 +39,32 @@ TImportFileClient::TImportFileClient(const TDriver& driver)
: OperationClient(std::make_shared<NOperation::TOperationClient>(driver))
, SchemeClient(std::make_shared<NScheme::TSchemeClient>(driver))
, TableClient(std::make_shared<NTable::TTableClient>(driver))
-{}
+{
+ UpsertSettings
+ .OperationTimeout(TDuration::Seconds(30))
+ .ClientTimeout(TDuration::Seconds(35));
+ RetrySettings
+ .MaxRetries(10);
+}
TStatus TImportFileClient::Import(const TString& filePath, const TString& dbPath, const TImportFileSettings& settings) {
- if (filePath.empty()) {
- return MakeStatus(EStatus::BAD_REQUEST, "No file specified");
- }
-
- TFsPath dataFile(filePath);
- if (!dataFile.Exists()) {
- return MakeStatus(EStatus::BAD_REQUEST,
- TStringBuilder() << "File does not exist: " << filePath);
+ if (! filePath.empty()) {
+ const TFsPath dataFile(filePath);
+ if (!dataFile.Exists()) {
+ return MakeStatus(EStatus::BAD_REQUEST,
+ TStringBuilder() << "File does not exist: " << filePath);
+ }
+ if (!dataFile.IsFile()) {
+ return MakeStatus(EStatus::BAD_REQUEST,
+ TStringBuilder() << "Not a file: " << filePath);
+ }
}
- if (!dataFile.IsFile()) {
- return MakeStatus(EStatus::BAD_REQUEST,
- TStringBuilder() << "Not a file: " << filePath);
+ if (settings.Format_ == EOutputFormat::Tsv) {
+ if (settings.Delimiter_ != "\t") {
+ return MakeStatus(EStatus::BAD_REQUEST,
+ TStringBuilder() << "Illegal delimiter for TSV format, only tab is allowed");
+ }
}
auto result = NDump::DescribePath(*SchemeClient, dbPath).GetStatus();
@@ -61,12 +73,24 @@ TStatus TImportFileClient::Import(const TString& filePath, const TString& dbPath
TStringBuilder() << "Table does not exist: " << dbPath);
}
- if (settings.Format_ != NTable::EDataFormat::CSV) {
- return MakeStatus(EStatus::BAD_REQUEST,
- TStringBuilder() << "Unsupported format");
+ // If the filename passed is empty, read from stdin, else from the file.
+ std::unique_ptr<TFileInput> fileInput = filePath.empty() ? nullptr
+ : std::make_unique<TFileInput>(filePath, settings.FileBufferSize_);
+ IInputStream& input = fileInput ? *fileInput : Cin;
+
+ switch (settings.Format_) {
+ case EOutputFormat::Default:
+ case EOutputFormat::Csv:
+ case EOutputFormat::Tsv:
+ return UpsertCsv(input, dbPath, settings);
+ case EOutputFormat::Json:
+ case EOutputFormat::JsonUnicode:
+ case EOutputFormat::JsonBase64:
+ return UpsertJson(input, dbPath, settings);
+ default: ;
}
-
- return UpsertCsv(dataFile, dbPath, settings);
+ return MakeStatus(EStatus::BAD_REQUEST,
+ TStringBuilder() << "Unsupported format #" << (int) settings.Format_);
}
namespace {
@@ -85,19 +109,23 @@ TStatus WaitForQueue(std::deque<TAsyncStatus>& inFlightRequests, size_t maxQueue
}
-TStatus TImportFileClient::UpsertCsv(const TString& dataFile, const TString& dbPath,
+inline
+TAsyncStatus TImportFileClient::UpsertCsvBuffer(const TString& dbPath, const TString& buffer) {
+ auto upsert = [this, dbPath, buffer](NYdb::NTable::TTableClient& tableClient) -> TAsyncStatus {
+ return tableClient.BulkUpsert(dbPath, NTable::EDataFormat::CSV, buffer, {}, UpsertSettings)
+ .Apply([](const NYdb::NTable::TAsyncBulkUpsertResult& bulkUpsertResult) {
+ NYdb::TStatus status = bulkUpsertResult.GetValueSync();
+ return NThreading::MakeFuture(status);
+ });
+ };
+ return TableClient->RetryOperation(upsert, RetrySettings);
+}
+
+TStatus TImportFileClient::UpsertCsv(IInputStream& input, const TString& dbPath,
const TImportFileSettings& settings) {
- TFileInput input(dataFile, settings.FileBufferSize_);
TString line;
TString buffer;
- auto upsertSettings = NTable::TBulkUpsertSettings()
- .OperationTimeout(TDuration::Seconds(30))
- .ClientTimeout(TDuration::Seconds(35));
-
- auto retrySettings = NTable::TRetryOperationSettings()
- .MaxRetries(10);
-
Ydb::Formats::CsvSettings csvSettings;
bool special = false;
@@ -128,7 +156,7 @@ TStatus TImportFileClient::UpsertCsv(const TString& dataFile, const TString& dbP
if (special) {
TString formatSettings;
Y_PROTOBUF_SUPPRESS_NODISCARD csvSettings.SerializeToString(&formatSettings);
- upsertSettings.FormatSettings(formatSettings);
+ UpsertSettings.FormatSettings(formatSettings);
}
std::deque<TAsyncStatus> inFlightRequests;
@@ -147,31 +175,94 @@ TStatus TImportFileClient::UpsertCsv(const TString& dataFile, const TString& dbP
return status;
}
- inFlightRequests.push_back(UpsertCsvBuffer(dbPath, buffer, {}, upsertSettings, retrySettings));
+ inFlightRequests.push_back(UpsertCsvBuffer(dbPath, buffer));
buffer = headerRow;
}
}
if (!buffer.Empty()) {
- inFlightRequests.push_back(UpsertCsvBuffer(dbPath, buffer, {}, upsertSettings, retrySettings));
+ inFlightRequests.push_back(UpsertCsvBuffer(dbPath, buffer));
}
return WaitForQueue(inFlightRequests, 0);
}
-TAsyncStatus TImportFileClient::UpsertCsvBuffer(const TString& dbPath, const TString& csv, const TString& header,
- const NTable::TBulkUpsertSettings& upsertSettings,
- const NTable::TRetryOperationSettings& retrySettings) {
- auto upsert = [dbPath, csv, header, upsertSettings](NYdb::NTable::TTableClient& tableClient) -> TAsyncStatus {
- return tableClient.BulkUpsert(dbPath, NTable::EDataFormat::CSV, csv, header, upsertSettings)
+inline
+TAsyncStatus TImportFileClient::UpsertJsonBuffer(const TString& dbPath, TValueBuilder& builder) {
+ auto upsert = [this, dbPath, rows = builder.Build()]
+ (NYdb::NTable::TTableClient& tableClient) -> TAsyncStatus {
+ NYdb::TValue r = rows;
+ return tableClient.BulkUpsert(dbPath, std::move(r), UpsertSettings)
.Apply([](const NYdb::NTable::TAsyncBulkUpsertResult& bulkUpsertResult) {
NYdb::TStatus status = bulkUpsertResult.GetValueSync();
return NThreading::MakeFuture(status);
});
};
+ return TableClient->RetryOperation(upsert, RetrySettings);
+}
+
+TStatus TImportFileClient::UpsertJson(IInputStream& input, const TString& dbPath,
+ const TImportFileSettings& settings) {
+ NTable::TCreateSessionResult sessionResult = TableClient->GetSession(NTable::TCreateSessionSettings()).GetValueSync();
+ if (! sessionResult.IsSuccess())
+ return sessionResult;
+ NTable::TDescribeTableResult tableResult = sessionResult.GetSession().DescribeTable(dbPath).GetValueSync();
+ if (! tableResult.IsSuccess())
+ return tableResult;
- return TableClient->RetryOperation(upsert, retrySettings);
+ const TType tableType = GetTableType(tableResult.GetTableDescription());
+ const NYdb::EBinaryStringEncoding stringEncoding =
+ (settings.Format_==EOutputFormat::JsonBase64) ? NYdb::EBinaryStringEncoding::Base64 :
+ NYdb::EBinaryStringEncoding::Unicode;
+
+ std::deque<TAsyncStatus> inFlightRequests;
+
+ size_t currentSize = 0;
+ size_t currentRows = 0;
+ auto currentBatch = std::make_unique<TValueBuilder>();
+ currentBatch->BeginList();
+
+ TString line;
+ while (size_t sz = input.ReadLine(line)) {
+ currentBatch->AddListItem(JsonToYdbValue(line, tableType, stringEncoding));
+ currentSize += line.Size();
+ currentRows += 1;
+
+ if (currentSize >= settings.BytesPerRequest_) {
+ currentBatch->EndList();
+
+ auto status = WaitForQueue(inFlightRequests, settings.MaxInFlightRequests_);
+ if (!status.IsSuccess()) {
+ return status;
+ }
+
+ inFlightRequests.push_back(UpsertJsonBuffer(dbPath, *currentBatch));
+
+ currentBatch = std::make_unique<TValueBuilder>();
+ currentBatch->BeginList();
+ currentSize = 0;
+ currentRows = 0;
+ }
+ }
+
+ if (currentRows > 0) {
+ currentBatch->EndList();
+ inFlightRequests.push_back(UpsertJsonBuffer(dbPath, *currentBatch));
+ }
+
+ return WaitForQueue(inFlightRequests, 0);
+}
+
+TType TImportFileClient::GetTableType(const NTable::TTableDescription& tableDescription) {
+ TTypeBuilder typeBuilder;
+ typeBuilder.BeginStruct();
+ const auto& columns = tableDescription.GetTableColumns();
+ for (auto it = columns.begin(); it!=columns.end(); it++) {
+ typeBuilder.AddMember((*it).Name, (*it).Type);
+ }
+ typeBuilder.EndStruct();
+ return typeBuilder.Build();
}
}
diff --git a/ydb/public/lib/ydb_cli/import/import.h b/ydb/public/lib/ydb_cli/import/import.h
index 66269e8e6c1..8b20d66517c 100644
--- a/ydb/public/lib/ydb_cli/import/import.h
+++ b/ydb/public/lib/ydb_cli/import/import.h
@@ -1,5 +1,6 @@
#pragma once
+#include <ydb/public/lib/ydb_cli/common/formats.h>
#include <ydb/public/sdk/cpp/client/ydb_types/status/status.h>
#include <ydb/public/sdk/cpp/client/ydb_types/fluent_settings_helpers.h>
#include <ydb/public/sdk/cpp/client/ydb_table/table.h>
@@ -31,10 +32,12 @@ struct TImportFileSettings : public TOperationRequestSettings<TImportFileSetting
static constexpr ui64 MaxBytesPerRequest = 8_MB;
static constexpr const char * DefaultDelimiter = ",";
- FLUENT_SETTING_DEFAULT(NTable::EDataFormat, Format, NTable::EDataFormat::CSV);
+ // Allowed values: Csv, Tsv, JsonUnicode, JsonBase64. Default means Csv
+ FLUENT_SETTING_DEFAULT(EOutputFormat, Format, EOutputFormat::Default);
FLUENT_SETTING_DEFAULT(ui64, BytesPerRequest, 1_MB);
FLUENT_SETTING_DEFAULT(ui64, FileBufferSize, 2_MB);
FLUENT_SETTING_DEFAULT(ui64, MaxInFlightRequests, 100);
+ // Settings below are for CSV format only
FLUENT_SETTING_DEFAULT(ui32, SkipRows, 0);
FLUENT_SETTING_DEFAULT(bool, Header, false);
FLUENT_SETTING_DEFAULT(TString, Delimiter, DefaultDelimiter);
@@ -44,7 +47,12 @@ struct TImportFileSettings : public TOperationRequestSettings<TImportFileSetting
class TImportFileClient {
public:
explicit TImportFileClient(const TDriver& driver);
+ TImportFileClient(const TImportFileClient&) = delete;
+ // Ingest data from the input file to the database table.
+ // fsPath: path to input file, stdin if empty
+ // dbPath: full path to the database table, including the database path
+ // settings: input data format and operational settings
TStatus Import(const TString& fsPath, const TString& dbPath, const TImportFileSettings& settings = {});
private:
@@ -52,10 +60,15 @@ private:
std::shared_ptr<NScheme::TSchemeClient> SchemeClient;
std::shared_ptr<NTable::TTableClient> TableClient;
- TStatus UpsertCsv(const TString& dataFile, const TString& dbPath, const TImportFileSettings& settings);
- TAsyncStatus UpsertCsvBuffer(const TString& dbPath, const TString& csv, const TString& header,
- const NTable::TBulkUpsertSettings& upsertSettings,
- const NTable::TRetryOperationSettings& retrySettings);
+ NTable::TBulkUpsertSettings UpsertSettings;
+ NTable::TRetryOperationSettings RetrySettings;
+
+ TStatus UpsertCsv(IInputStream& input, const TString& dbPath, const TImportFileSettings& settings);
+ TAsyncStatus UpsertCsvBuffer(const TString& dbPath, const TString& buffer);
+
+ TStatus UpsertJson(IInputStream& input, const TString& dbPath, const TImportFileSettings& settings);
+ TAsyncStatus UpsertJsonBuffer(const TString& dbPath, TValueBuilder& builder);
+ TType GetTableType(const NTable::TTableDescription& tableDescription);
};
}
diff --git a/ydb/tests/functional/ydb_cli/canondata/result.json b/ydb/tests/functional/ydb_cli/canondata/result.json
index 63885584756..57776dab145 100644
--- a/ydb/tests/functional/ydb_cli/canondata/result.json
+++ b/ydb/tests/functional/ydb_cli/canondata/result.json
@@ -1,4 +1,13 @@
{
+ "test_ydb_impex.TestImpex.test_format_csv": {
+ "uri": "file://test_ydb_impex.TestImpex.test_format_csv/result.output"
+ },
+ "test_ydb_impex.TestImpex.test_format_json": {
+ "uri": "file://test_ydb_impex.TestImpex.test_format_json/result.output"
+ },
+ "test_ydb_impex.TestImpex.test_format_tsv": {
+ "uri": "file://test_ydb_impex.TestImpex.test_format_tsv/result.output"
+ },
"test_ydb_scripting.TestExecuteScriptWithFormats.test_stream_yql_script_json_base64": {
"uri": "file://test_ydb_scripting.TestExecuteScriptWithFormats.test_stream_yql_script_json_base64/result.output"
},
@@ -47,6 +56,9 @@
"test_ydb_scripting.TestScriptingServiceHelp.test_help_ex": {
"uri": "file://test_ydb_scripting.TestScriptingServiceHelp.test_help_ex/result.output"
},
+ "test_ydb_table.TestExecuteQueryWithFormats.test_data_query_csv": {
+ "uri": "file://test_ydb_table.TestExecuteQueryWithFormats.test_data_query_csv/result.output"
+ },
"test_ydb_table.TestExecuteQueryWithFormats.test_data_query_json_base64": {
"uri": "file://test_ydb_table.TestExecuteQueryWithFormats.test_data_query_json_base64/result.output"
},
@@ -62,6 +74,12 @@
"test_ydb_table.TestExecuteQueryWithFormats.test_data_query_pretty": {
"uri": "file://test_ydb_table.TestExecuteQueryWithFormats.test_data_query_pretty/result.output"
},
+ "test_ydb_table.TestExecuteQueryWithFormats.test_data_query_tsv": {
+ "uri": "file://test_ydb_table.TestExecuteQueryWithFormats.test_data_query_tsv/result.output"
+ },
+ "test_ydb_table.TestExecuteQueryWithFormats.test_read_table_csv": {
+ "uri": "file://test_ydb_table.TestExecuteQueryWithFormats.test_read_table_csv/result.output"
+ },
"test_ydb_table.TestExecuteQueryWithFormats.test_read_table_json_base64": {
"uri": "file://test_ydb_table.TestExecuteQueryWithFormats.test_read_table_json_base64/result.output"
},
@@ -77,6 +95,12 @@
"test_ydb_table.TestExecuteQueryWithFormats.test_read_table_pretty": {
"uri": "file://test_ydb_table.TestExecuteQueryWithFormats.test_read_table_pretty/result.output"
},
+ "test_ydb_table.TestExecuteQueryWithFormats.test_read_table_tsv": {
+ "uri": "file://test_ydb_table.TestExecuteQueryWithFormats.test_read_table_tsv/result.output"
+ },
+ "test_ydb_table.TestExecuteQueryWithFormats.test_scan_query_csv": {
+ "uri": "file://test_ydb_table.TestExecuteQueryWithFormats.test_scan_query_csv/result.output"
+ },
"test_ydb_table.TestExecuteQueryWithFormats.test_scan_query_json_base64": {
"uri": "file://test_ydb_table.TestExecuteQueryWithFormats.test_scan_query_json_base64/result.output"
},
@@ -92,6 +116,9 @@
"test_ydb_table.TestExecuteQueryWithFormats.test_scan_query_pretty": {
"uri": "file://test_ydb_table.TestExecuteQueryWithFormats.test_scan_query_pretty/result.output"
},
+ "test_ydb_table.TestExecuteQueryWithFormats.test_scan_query_tsv": {
+ "uri": "file://test_ydb_table.TestExecuteQueryWithFormats.test_scan_query_tsv/result.output"
+ },
"test_ydb_table.TestExecuteQueryWithParams.test_list": {
"uri": "file://test_ydb_table.TestExecuteQueryWithParams.test_list/result.output"
},
diff --git a/ydb/tests/functional/ydb_cli/canondata/test_ydb_impex.TestImpex.test_format_csv/result.output b/ydb/tests/functional/ydb_cli/canondata/test_ydb_impex.TestImpex.test_format_csv/result.output
new file mode 100644
index 00000000000..d900a075354
--- /dev/null
+++ b/ydb/tests/functional/ydb_cli/canondata/test_ydb_impex.TestImpex.test_format_csv/result.output
@@ -0,0 +1,5 @@
+1,1111,"one"
+2,2222,"two"
+3,3333,"three"
+5,5555,"five"
+7,7777,"seven"
diff --git a/ydb/tests/functional/ydb_cli/canondata/test_ydb_impex.TestImpex.test_format_json/result.output b/ydb/tests/functional/ydb_cli/canondata/test_ydb_impex.TestImpex.test_format_json/result.output
new file mode 100644
index 00000000000..cead26df793
--- /dev/null
+++ b/ydb/tests/functional/ydb_cli/canondata/test_ydb_impex.TestImpex.test_format_json/result.output
@@ -0,0 +1,5 @@
+{"key":1,"id":1111,"value":"one"}
+{"key":2,"id":2222,"value":"two"}
+{"key":3,"id":3333,"value":"three"}
+{"key":5,"id":5555,"value":"five"}
+{"key":7,"id":7777,"value":"seven"}
diff --git a/ydb/tests/functional/ydb_cli/canondata/test_ydb_impex.TestImpex.test_format_tsv/result.output b/ydb/tests/functional/ydb_cli/canondata/test_ydb_impex.TestImpex.test_format_tsv/result.output
new file mode 100644
index 00000000000..ebe43b48d1c
--- /dev/null
+++ b/ydb/tests/functional/ydb_cli/canondata/test_ydb_impex.TestImpex.test_format_tsv/result.output
@@ -0,0 +1,5 @@
+1 1111 "one"
+2 2222 "two"
+3 3333 "three"
+5 5555 "five"
+7 7777 "seven"
diff --git a/ydb/tests/functional/ydb_cli/canondata/test_ydb_table.TestExecuteQueryWithFormats.test_data_query_csv/result.output b/ydb/tests/functional/ydb_cli/canondata/test_ydb_table.TestExecuteQueryWithFormats.test_data_query_csv/result.output
new file mode 100644
index 00000000000..06ac255958b
--- /dev/null
+++ b/ydb/tests/functional/ydb_cli/canondata/test_ydb_table.TestExecuteQueryWithFormats.test_data_query_csv/result.output
@@ -0,0 +1,3 @@
+1111,1,"one"
+2222,2,"two"
+3333,3,"three"
diff --git a/ydb/tests/functional/ydb_cli/canondata/test_ydb_table.TestExecuteQueryWithFormats.test_data_query_tsv/result.output b/ydb/tests/functional/ydb_cli/canondata/test_ydb_table.TestExecuteQueryWithFormats.test_data_query_tsv/result.output
new file mode 100644
index 00000000000..9f09d15f010
--- /dev/null
+++ b/ydb/tests/functional/ydb_cli/canondata/test_ydb_table.TestExecuteQueryWithFormats.test_data_query_tsv/result.output
@@ -0,0 +1,3 @@
+1111 1 "one"
+2222 2 "two"
+3333 3 "three"
diff --git a/ydb/tests/functional/ydb_cli/canondata/test_ydb_table.TestExecuteQueryWithFormats.test_read_table_csv/result.output b/ydb/tests/functional/ydb_cli/canondata/test_ydb_table.TestExecuteQueryWithFormats.test_read_table_csv/result.output
new file mode 100644
index 00000000000..d900a075354
--- /dev/null
+++ b/ydb/tests/functional/ydb_cli/canondata/test_ydb_table.TestExecuteQueryWithFormats.test_read_table_csv/result.output
@@ -0,0 +1,5 @@
+1,1111,"one"
+2,2222,"two"
+3,3333,"three"
+5,5555,"five"
+7,7777,"seven"
diff --git a/ydb/tests/functional/ydb_cli/canondata/test_ydb_table.TestExecuteQueryWithFormats.test_read_table_tsv/result.output b/ydb/tests/functional/ydb_cli/canondata/test_ydb_table.TestExecuteQueryWithFormats.test_read_table_tsv/result.output
new file mode 100644
index 00000000000..ebe43b48d1c
--- /dev/null
+++ b/ydb/tests/functional/ydb_cli/canondata/test_ydb_table.TestExecuteQueryWithFormats.test_read_table_tsv/result.output
@@ -0,0 +1,5 @@
+1 1111 "one"
+2 2222 "two"
+3 3333 "three"
+5 5555 "five"
+7 7777 "seven"
diff --git a/ydb/tests/functional/ydb_cli/canondata/test_ydb_table.TestExecuteQueryWithFormats.test_scan_query_csv/result.output b/ydb/tests/functional/ydb_cli/canondata/test_ydb_table.TestExecuteQueryWithFormats.test_scan_query_csv/result.output
new file mode 100644
index 00000000000..06ac255958b
--- /dev/null
+++ b/ydb/tests/functional/ydb_cli/canondata/test_ydb_table.TestExecuteQueryWithFormats.test_scan_query_csv/result.output
@@ -0,0 +1,3 @@
+1111,1,"one"
+2222,2,"two"
+3333,3,"three"
diff --git a/ydb/tests/functional/ydb_cli/canondata/test_ydb_table.TestExecuteQueryWithFormats.test_scan_query_tsv/result.output b/ydb/tests/functional/ydb_cli/canondata/test_ydb_table.TestExecuteQueryWithFormats.test_scan_query_tsv/result.output
new file mode 100644
index 00000000000..9f09d15f010
--- /dev/null
+++ b/ydb/tests/functional/ydb_cli/canondata/test_ydb_table.TestExecuteQueryWithFormats.test_scan_query_tsv/result.output
@@ -0,0 +1,3 @@
+1111 1 "one"
+2222 2 "two"
+3333 3 "three"
diff --git a/ydb/tests/functional/ydb_cli/test_ydb_impex.py b/ydb/tests/functional/ydb_cli/test_ydb_impex.py
new file mode 100644
index 00000000000..e8c41f056a7
--- /dev/null
+++ b/ydb/tests/functional/ydb_cli/test_ydb_impex.py
@@ -0,0 +1,129 @@
+# -*- coding: utf-8 -*-
+
+from ydb.tests.library.common import yatest_common
+from ydb.tests.library.harness.kikimr_cluster import kikimr_cluster_factory
+import ydb
+import logging
+
+
+logger = logging.getLogger(__name__)
+
+
+DATA_CSV = """key,id,value
+1,1111,"one"
+2,2222,"two"
+3,3333,"three"
+5,5555,"five"
+7,7777,"seven"
+"""
+
+DATA_TSV = DATA_CSV.replace(',', '\t')
+
+
+DATA_JSON = """{"key":1,"id":1111,"value":"one"}
+{"key":2,"id":2222,"value":"two"}
+{"key":3,"id":3333,"value":"three"}
+{"key":5,"id":5555,"value":"five"}
+{"key":7,"id":7777,"value":"seven"}
+"""
+
+
+def ydb_bin():
+ return yatest_common.binary_path("ydb/apps/ydb/ydb")
+
+
+def create_table(session, path):
+ session.create_table(
+ path,
+ ydb.TableDescription()
+ .with_column(ydb.Column("key", ydb.OptionalType(ydb.PrimitiveType.Uint32)))
+ .with_column(ydb.Column("id", ydb.OptionalType(ydb.PrimitiveType.Uint64)))
+ .with_column(ydb.Column("value", ydb.OptionalType(ydb.PrimitiveType.Utf8)))
+ .with_primary_keys("key")
+ )
+
+
+class BaseTestTableService(object):
+ @classmethod
+ def setup_class(cls):
+ cls.cluster = kikimr_cluster_factory()
+ cls.cluster.start()
+ cls.root_dir = "/Root"
+ driver_config = ydb.DriverConfig(
+ database="/Root",
+ endpoint="%s:%s" % (cls.cluster.nodes[1].host, cls.cluster.nodes[1].port))
+ cls.driver = ydb.Driver(driver_config)
+ cls.driver.wait(timeout=4)
+
+ @classmethod
+ def teardown_class(cls):
+ if hasattr(cls, 'cluster'):
+ cls.cluster.stop()
+
+ @classmethod
+ def execute_ydb_cli_command(cls, args):
+ execution = yatest_common.execute(
+ [
+ ydb_bin(),
+ "--endpoint", "grpc://localhost:%d" % cls.cluster.nodes[1].grpc_port,
+ "--database", cls.root_dir
+ ] +
+ args
+ )
+
+ result = execution.std_out
+ logger.debug("std_out:\n" + result.decode('utf-8'))
+ return result
+
+ @staticmethod
+ def canonical_result(output_result):
+ output_file_name = "result.output"
+ with open(output_file_name, "w") as f:
+ f.write(output_result.decode('utf-8'))
+ return yatest_common.canonical_file(output_file_name, local=True, universal_lines=True)
+
+
+class TestImpex(BaseTestTableService):
+
+ @classmethod
+ def setup_class(cls):
+ BaseTestTableService.setup_class()
+
+ session = cls.driver.table_client.session().create()
+ cls.table_path = cls.root_dir + "/impex_table"
+ create_table(session, cls.table_path)
+
+ def clear_table(self):
+ query = "DELETE FROM `{}`".format(self.table_path)
+ self.execute_ydb_cli_command(["yql", "-s", query])
+
+ def run_import_csv(self, ftype, data):
+ self.clear_table()
+ with open("tempinput.dat", "w") as f:
+ f.writelines(data)
+ output = self.execute_ydb_cli_command(["import", "file", ftype, "-p", self.table_path, "-i", "tempinput.dat", "--header"])
+ return self.canonical_result(output)
+
+ def run_import_json(self, data):
+ self.clear_table()
+ with open("tempinput.dat", "w") as f:
+ f.writelines(data)
+ output = self.execute_ydb_cli_command(["import", "file", "json", "-p", self.table_path, "-i", "tempinput.dat"])
+ return self.canonical_result(output)
+
+ def run_export(self, format):
+ query = "SELECT `key`, `id`, `value` FROM `{}` ORDER BY `key`".format(self.table_path)
+ output = self.execute_ydb_cli_command(["table", "query", "execute", "-q", query, "-t", "scan", "--format", format])
+ return self.canonical_result(output)
+
+ def test_format_csv(self):
+ self.run_import_csv("csv", DATA_CSV)
+ return self.run_export("csv")
+
+ def test_format_tsv(self):
+ self.run_import_csv("tsv", DATA_TSV)
+ return self.run_export("tsv")
+
+ def test_format_json(self):
+ self.run_import_json(DATA_JSON)
+ return self.run_export("json-unicode")
diff --git a/ydb/tests/functional/ydb_cli/test_ydb_table.py b/ydb/tests/functional/ydb_cli/test_ydb_table.py
index 148d33ede23..545056d313b 100644
--- a/ydb/tests/functional/ydb_cli/test_ydb_table.py
+++ b/ydb/tests/functional/ydb_cli/test_ydb_table.py
@@ -173,6 +173,12 @@ class TestExecuteQueryWithFormats(BaseTestTableService):
def test_data_query_json_unicode_array(self):
return self.execute_data_query('json-unicode-array')
+ def test_data_query_csv(self):
+ return self.execute_data_query('csv')
+
+ def test_data_query_tsv(self):
+ return self.execute_data_query('tsv')
+
# ScanQuery
def test_scan_query_pretty(self):
@@ -190,6 +196,12 @@ class TestExecuteQueryWithFormats(BaseTestTableService):
def test_scan_query_json_unicode_array(self):
return self.execute_scan_query('json-unicode-array')
+ def test_scan_query_csv(self):
+ return self.execute_scan_query('csv')
+
+ def test_scan_query_tsv(self):
+ return self.execute_scan_query('tsv')
+
# ReadTable
def test_read_table_pretty(self):
@@ -206,3 +218,9 @@ class TestExecuteQueryWithFormats(BaseTestTableService):
def test_read_table_json_unicode_array(self):
return self.execute_read_table('json-unicode-array')
+
+ def test_read_table_csv(self):
+ return self.execute_read_table('csv')
+
+ def test_read_table_tsv(self):
+ return self.execute_read_table('tsv')