aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
author Nikolay Perfilov <pnv1@yandex-team.ru> 2024-11-20 00:58:55 +0300
committer GitHub <noreply@github.com> 2024-11-20 00:58:55 +0300
commit fef9bc2801129afea020f8aa69e2ce2733ce78aa (patch)
tree 27795342b96706414bc72757344207d00b3a7197
parent 9c294d8c6c3e536454a850244386e7508b3932a1 (diff)
download ydb-fef9bc2801129afea020f8aa69e2ce2733ce78aa.tar.gz
Improve csv import, steps 1 & 2 (#11679)
-rw-r--r-- ydb/public/lib/ydb_cli/commands/ydb_service_import.cpp 1
-rw-r--r-- ydb/public/lib/ydb_cli/common/csv_parser.cpp 89
-rw-r--r-- ydb/public/lib/ydb_cli/common/csv_parser.h 28
-rw-r--r-- ydb/public/lib/ydb_cli/common/csv_parser_ut.cpp 146
-rw-r--r-- ydb/public/lib/ydb_cli/common/parameters.cpp 6
-rw-r--r-- ydb/public/lib/ydb_cli/import/import.cpp 139
-rw-r--r-- ydb/public/lib/ydb_cli/import/import.h 7
7 files changed, 311 insertions, 105 deletions
diff --git a/ydb/public/lib/ydb_cli/commands/ydb_service_import.cpp b/ydb/public/lib/ydb_cli/commands/ydb_service_import.cpp
index 8ab5de6929..23ab8e32ea 100644
--- a/ydb/public/lib/ydb_cli/commands/ydb_service_import.cpp
+++ b/ydb/public/lib/ydb_cli/commands/ydb_service_import.cpp
@@ -302,6 +302,7 @@ int TCommandImportFromCsv::Run(TConfig& config) {
settings.NewlineDelimited(NewlineDelimited);
settings.HeaderRow(HeaderRow);
settings.NullValue(NullValue);
+ settings.Verbose(config.IsVerbose());
if (Delimiter.size() != 1) {
throw TMisuseException()
diff --git a/ydb/public/lib/ydb_cli/common/csv_parser.cpp b/ydb/public/lib/ydb_cli/common/csv_parser.cpp
index 61c90dd1ff..b3d50636b7 100644
--- a/ydb/public/lib/ydb_cli/common/csv_parser.cpp
+++ b/ydb/public/lib/ydb_cli/common/csv_parser.cpp
@@ -1,5 +1,6 @@
#include "csv_parser.h"
+#include <ydb/public/api/protos/ydb_value.pb.h>
#include <ydb/public/lib/ydb_cli/common/common.h>
#include <library/cpp/string_utils/csv/csv.h>
@@ -177,7 +178,7 @@ public:
}
}
- void BuildValue(TStringBuf token) {
+ void BuildValue(const TStringBuf& token) {
switch (Parser.GetKind()) {
case TTypeParser::ETypeKind::Primitive: {
BuildPrimitive(TString(token));
@@ -279,7 +280,7 @@ public:
throw TCsvParseException() << "Expected bool value: \"true\" or \"false\", received: \"" << token << "\".";
}
- void EnsureNull(TStringBuf token) const {
+ void EnsureNull(const TStringBuf& token) const {
if (!NullValue) {
throw TCsvParseException() << "Expected null value instead of \"" << token << "\", but null value is not set.";
}
@@ -288,7 +289,7 @@ public:
}
}
- TValue Convert(TStringBuf token) {
+ TValue Convert(const TStringBuf& token) {
BuildValue(token);
return Builder.Build();
}
@@ -317,10 +318,10 @@ TCsvParseException FormatError(const std::exception& inputError,
}
TValue FieldToValue(TTypeParser& parser,
- TStringBuf token,
+ const TStringBuf& token,
const std::optional<TString>& nullValue,
const TCsvParser::TParseMetadata& meta,
- TString columnName) {
+ const TString& columnName) {
try {
TCsvToYdbConverter converter(parser, nullValue);
return converter.Convert(token);
@@ -331,7 +332,7 @@ TValue FieldToValue(TTypeParser& parser,
TStringBuf Consume(NCsvFormat::CsvSplitter& splitter,
const TCsvParser::TParseMetadata& meta,
- TString columnName) {
+ const TString& columnName) {
try {
return splitter.Consume();
} catch (std::exception& e) {
@@ -342,12 +343,12 @@ TStringBuf Consume(NCsvFormat::CsvSplitter& splitter,
}
TCsvParser::TCsvParser(TString&& headerRow, const char delimeter, const std::optional<TString>& nullValue,
- const std::map<TString, TType>* paramTypes,
+ const std::map<TString, TType>* destinationTypes,
const std::map<TString, TString>* paramSources)
: HeaderRow(std::move(headerRow))
, Delimeter(delimeter)
, NullValue(nullValue)
- , ParamTypes(paramTypes)
+ , DestinationTypes(destinationTypes)
, ParamSources(paramSources)
{
NCsvFormat::CsvSplitter splitter(HeaderRow, Delimeter);
@@ -355,17 +356,17 @@ TCsvParser::TCsvParser(TString&& headerRow, const char delimeter, const std::opt
}
TCsvParser::TCsvParser(TVector<TString>&& header, const char delimeter, const std::optional<TString>& nullValue,
- const std::map<TString, TType>* paramTypes,
+ const std::map<TString, TType>* destinationTypes,
const std::map<TString, TString>* paramSources)
: Header(std::move(header))
, Delimeter(delimeter)
, NullValue(nullValue)
- , ParamTypes(paramTypes)
+ , DestinationTypes(destinationTypes)
, ParamSources(paramSources)
{
}
-void TCsvParser::GetParams(TString&& data, TParamsBuilder& builder, const TParseMetadata& meta) const {
+void TCsvParser::BuildParams(TString& data, TParamsBuilder& builder, const TParseMetadata& meta) const {
NCsvFormat::CsvSplitter splitter(data, Delimeter);
auto headerIt = Header.begin();
do {
@@ -374,8 +375,8 @@ void TCsvParser::GetParams(TString&& data, TParamsBuilder& builder, const TParse
}
TStringBuf token = Consume(splitter, meta, *headerIt);
TString fullname = "$" + *headerIt;
- auto paramIt = ParamTypes->find(fullname);
- if (paramIt == ParamTypes->end()) {
+ auto paramIt = DestinationTypes->find(fullname);
+ if (paramIt == DestinationTypes->end()) {
++headerIt;
continue;
}
@@ -395,7 +396,7 @@ void TCsvParser::GetParams(TString&& data, TParamsBuilder& builder, const TParse
}
}
-void TCsvParser::GetValue(TString&& data, TValueBuilder& builder, const TType& type, const TParseMetadata& meta) const {
+void TCsvParser::BuildValue(TString& data, TValueBuilder& builder, const TType& type, const TParseMetadata& meta) const {
NCsvFormat::CsvSplitter splitter(data, Delimeter);
auto headerIt = Header.cbegin();
std::map<TString, TStringBuf> fields;
@@ -431,18 +432,68 @@ void TCsvParser::GetValue(TString&& data, TValueBuilder& builder, const TType& t
builder.EndStruct();
}
-TType TCsvParser::GetColumnsType() const {
+// Converts a batch of CSV lines into a single list-of-structs TValue of type ResultListType.
+// BuildLineType() must have been called first: this method relies on SkipBitMap,
+// ResultColumnCount, ResultLineTypesSorted, ResultLineNamesSorted and ResultListType.
+//
+// lines    - raw CSV rows; each one is split with Delimeter.
+// filename - put into the parse metadata that accompanies error messages.
+// row      - number of the first row in `lines` (if known); incremented per row for error metadata.
+//
+// Throws a formatted parse error when a row has more or fewer fields than the header describes,
+// or when a field cannot be converted to its destination column type.
+TValue TCsvParser::BuildList(std::vector<TString>& lines, const TString& filename, std::optional<ui64> row) const {
+    // Type parsers are created per call, so concurrent calls do not share them.
+    std::vector<std::unique_ptr<TTypeParser>> columnTypeParsers;
+    columnTypeParsers.reserve(ResultColumnCount);
+    for (const TType* type : ResultLineTypesSorted) {
+        columnTypeParsers.push_back(std::make_unique<TTypeParser>(*type));
+    }
+
+    Ydb::Value listValue;
+    auto* listItems = listValue.mutable_items();
+    listItems->Reserve(lines.size());
+
+    // Reused for every line: clear() keeps the capacity, so there is one allocation per batch.
+    std::vector<TStringBuf> fields;
+    fields.reserve(ResultColumnCount);
+
+    for (auto& line : lines) {
+        fields.clear();
+        NCsvFormat::CsvSplitter splitter(line, Delimeter);
+        TParseMetadata meta {row, filename};
+        auto headerIt = Header.cbegin();
+        auto skipIt = SkipBitMap.begin();
+        do {
+            if (headerIt == Header.cend()) { // SkipBitMap has same size as Header
+                throw FormatError(yexception() << "Header contains less fields than data. Header: \"" << HeaderRow << "\", data: \"" << line << "\"", meta);
+            }
+            TStringBuf nextField = Consume(splitter, meta, *headerIt);
+            if (!*skipIt) {
+                fields.emplace_back(nextField);
+            }
+            ++headerIt;
+            ++skipIt;
+        } while (splitter.Step());
+
+        // A short row yields fewer fields than there are result columns. Without this check
+        // the conversion loop below would read past the end of `fields`.
+        if (fields.size() != columnTypeParsers.size()) {
+            throw FormatError(yexception() << "Header contains more fields than data. Header: \"" << HeaderRow << "\", data: \"" << line << "\"", meta);
+        }
+
+        auto* structItems = listItems->Add()->mutable_items();
+        structItems->Reserve(ResultColumnCount);
+        auto typeParserIt = columnTypeParsers.begin();
+        auto fieldIt = fields.begin();
+        auto nameIt = ResultLineNamesSorted.begin();
+        // The three sequences have equal length here, so one end check is enough.
+        for (; typeParserIt != columnTypeParsers.end(); ++typeParserIt, ++fieldIt, ++nameIt) {
+            *structItems->Add() = FieldToValue(*typeParserIt->get(), *fieldIt, NullValue, meta, **nameIt).GetProto();
+        }
+        if (row.has_value()) {
+            ++row.value();
+        }
+    }
+    return TValue(ResultListType.value(), std::move(listValue));
+}
+
+// Derives the layout of a result row from Header and DestinationTypes: fills SkipBitMap
+// (one flag per header column), the per-column type/name pointer lists, ResultColumnCount,
+// ResultLineType and ResultListType. Header columns missing from DestinationTypes are marked as skipped.
+void TCsvParser::BuildLineType() {
+ // Start from a clean state: a repeated call must not append to the previous layout,
+ // otherwise the per-column vectors would no longer match ResultColumnCount.
+ SkipBitMap.clear();
+ ResultLineTypesSorted.clear();
+ ResultLineNamesSorted.clear();
+ SkipBitMap.reserve(Header.size());
+ ResultColumnCount = 0;
TTypeBuilder builder;
builder.BeginStruct();
for (const auto& colName : Header) {
- if (ParamTypes->find(colName) != ParamTypes->end()) {
- builder.AddMember(colName, ParamTypes->at(colName));
+ auto findIt = DestinationTypes->find(colName);
+ if (findIt != DestinationTypes->end()) {
+ builder.AddMember(colName, findIt->second);
+ ResultLineTypesSorted.emplace_back(&findIt->second);
+ ResultLineNamesSorted.emplace_back(&colName);
+ SkipBitMap.push_back(false);
+ ++ResultColumnCount;
} else {
- builder.AddMember("__ydb_skip_column_name", TTypeBuilder().Build());
+ SkipBitMap.push_back(true);
}
}
builder.EndStruct();
- return builder.Build();
+ ResultLineType = builder.Build();
+ ResultListType = TTypeBuilder().List(ResultLineType.value()).Build();
}
}
diff --git a/ydb/public/lib/ydb_cli/common/csv_parser.h b/ydb/public/lib/ydb_cli/common/csv_parser.h
index 21387aeee1..7d68909441 100644
--- a/ydb/public/lib/ydb_cli/common/csv_parser.h
+++ b/ydb/public/lib/ydb_cli/common/csv_parser.h
@@ -25,23 +25,39 @@ public:
~TCsvParser() = default;
TCsvParser(TString&& headerRow, const char delimeter, const std::optional<TString>& nullValue,
- const std::map<TString, TType>* paramTypes = nullptr,
+ const std::map<TString, TType>* destinationTypes = nullptr,
const std::map<TString, TString>* paramSources = nullptr);
TCsvParser(TVector<TString>&& header, const char delimeter, const std::optional<TString>& nullValue,
- const std::map<TString, TType>* paramTypes = nullptr,
+ const std::map<TString, TType>* destinationTypes = nullptr,
const std::map<TString, TString>* paramSources = nullptr);
- void GetParams(TString&& data, TParamsBuilder& builder, const TParseMetadata& meta) const;
- void GetValue(TString&& data, TValueBuilder& builder, const TType& type, const TParseMetadata& meta) const;
- TType GetColumnsType() const;
+ void BuildParams(TString& data, TParamsBuilder& builder, const TParseMetadata& meta) const;
+ void BuildValue(TString& data, TValueBuilder& builder, const TType& type, const TParseMetadata& meta) const;
+ TValue BuildList(std::vector<TString>& lines, const TString& filename,
+ std::optional<ui64> row = std::nullopt) const;
+ void BuildLineType();
private:
TVector<TString> Header;
TString HeaderRow;
char Delimeter;
std::optional<TString> NullValue;
- const std::map<TString, TType>* ParamTypes;
+ // Types of destination table or query parameters
+ // Column name -> column type
+ const std::map<TString, TType>* DestinationTypes;
const std::map<TString, TString>* ParamSources;
+ // Type of a single row in the resulting TValue.
+ // Columns follow the header order, though there can be fewer of them than in the Header.
+ std::optional<TType> ResultLineType = std::nullopt;
+ // List-of-rows type built from ResultLineType.
+ std::optional<TType> ResultListType = std::nullopt;
+ // One flag per Header column. If a value is true (the header column is absent in DestinationTypes),
+ // the corresponding value in an input csv row is skipped.
+ std::vector<bool> SkipBitMap;
+ // Count of columns in each struct in the resulting TValue.
+ // Initialised so that it never holds an indeterminate value before BuildLineType() runs.
+ size_t ResultColumnCount = 0;
+ // Types and names of columns in a single row in the resulting TValue.
+ // Columns follow the header order, though there can be fewer of them than in the Header.
+ // These are non-owning pointers into *DestinationTypes and Header respectively.
+ std::vector<const TType*> ResultLineTypesSorted;
+ std::vector<const TString*> ResultLineNamesSorted;
};
}
diff --git a/ydb/public/lib/ydb_cli/common/csv_parser_ut.cpp b/ydb/public/lib/ydb_cli/common/csv_parser_ut.cpp
index ce54c4ea8e..1a78e69523 100644
--- a/ydb/public/lib/ydb_cli/common/csv_parser_ut.cpp
+++ b/ydb/public/lib/ydb_cli/common/csv_parser_ut.cpp
@@ -8,41 +8,54 @@ using namespace NYdb;
using namespace NYdb::NConsoleClient;
Y_UNIT_TEST_SUITE(YdbCliCsvParserTests) {
- bool CompareValues(const TValue& lhs, const TValue& rhs) {
- TString stringFirst, stringSecond;
- NProtoBuf::TextFormat::PrintToString(lhs.GetProto(), &stringFirst);
- NProtoBuf::TextFormat::PrintToString(rhs.GetProto(), &stringSecond);
- return stringFirst == stringSecond;
+ void AssertValuesEqual(const TValue& actual, const TValue& expected) {
+ TString actualString, expectedString;
+ NProtoBuf::TextFormat::PrintToString(actual.GetProto(), &actualString);
+ NProtoBuf::TextFormat::PrintToString(expected.GetProto(), &expectedString);
+ UNIT_ASSERT_VALUES_EQUAL_C(actualString, expectedString,
+ "Expected string: \"" << expectedString << "\", actual string: \"" << actualString << "\"");
}
- void CommonTestParams(TString&& header, TString&& data, const std::map<TString, TValue>& result) {
+ // Parses one CSV row into query parameters and checks each of them against estimatedResult
+ // (parameter name -> expected value); parameter types are taken from the expected values.
+ void CommonTestParams(TString&& header, TString&& data, const std::map<TString, TValue>& estimatedResult) {
std::map<TString, TType> paramTypes;
- for (const auto& [name, value] : result) {
+ for (const auto& [name, value] : estimatedResult) {
paramTypes.insert({name, value.GetType()});
}
std::map<TString, TString> paramSources;
TCsvParser parser(std::move(header), ',', "", &paramTypes, &paramSources);
TParamsBuilder paramBuilder;
- parser.GetParams(std::move(data), paramBuilder, TCsvParser::TParseMetadata{});
+ parser.BuildParams(data, paramBuilder, TCsvParser::TParseMetadata{});
auto values = paramBuilder.Build().GetValues();
- UNIT_ASSERT_EQUAL(values.size(), result.size());
- for (const auto& [name, value] : result) {
+ UNIT_ASSERT_EQUAL(values.size(), estimatedResult.size());
+ for (const auto& [name, value] : estimatedResult) {
auto it = values.find(name);
UNIT_ASSERT_UNEQUAL(it, values.end());
- UNIT_ASSERT(CompareValues(value, it->second));
+ // AssertValuesEqual takes (actual, expected): the parsed value goes first,
+ // so that a failure message labels the two strings correctly.
+ AssertValuesEqual(it->second, value);
}
}
- void CommonTestValue(TString&& header, TString&& data, const TValue& result) {
+ void CommonTestValue(TString&& header, TString&& data, const TValue& estimatedResult) {
std::map<TString, TType> paramTypes;
- for (auto member : result.GetType().GetProto().struct_type().members()) {
+ for (auto member : estimatedResult.GetType().GetProto().struct_type().members()) {
paramTypes.insert({member.name(), member.type()});
}
TCsvParser parser(std::move(header), ',', "", &paramTypes, nullptr);
TValueBuilder valueBuilder;
- parser.GetValue(std::move(data), valueBuilder, result.GetType(), TCsvParser::TParseMetadata{});
- UNIT_ASSERT(CompareValues(valueBuilder.Build(), result));
+ parser.BuildValue(data, valueBuilder, estimatedResult.GetType(), TCsvParser::TParseMetadata{});
+ AssertValuesEqual(valueBuilder.Build(), estimatedResult);
+ }
+
+ // Parses `data` (one CSV row per element) with TCsvParser::BuildList and checks the result against
+ // estimatedResult, which must be a list of structs; column types are taken from its struct members.
+ void CommonTestBuildList(TString&& header, std::vector<TString>&& data, const TValue& estimatedResult) {
+     std::map<TString, TType> columnTypes;
+     // Bind by reference: each member is a protobuf message and does not need to be copied.
+     for (const auto& member : estimatedResult.GetType().GetProto().list_type().item().struct_type().members()) {
+         columnTypes.insert({member.name(), member.type()});
+     }
+
+     TCsvParser parser(std::move(header), ',', "", &columnTypes, nullptr);
+     parser.BuildLineType();
+     TValue builtResult = parser.BuildList(data, "testFile.csv", 0);
+     AssertValuesEqual(builtResult, estimatedResult);
+ }
TValue MakeStruct(const TString& name, const TValue& value) {
@@ -71,6 +84,37 @@ Y_UNIT_TEST_SUITE(YdbCliCsvParserTests) {
CommonTestValue("name", "100000000000", MakeStruct("name", TValueBuilder().Uint64(100000000000).Build()));
}
+ Y_UNIT_TEST(IntegerTypesTestList) {
+     // Two rows, one column per signed/unsigned integer width.
+     const TValue expectedRows = TValueBuilder().BeginList()
+         .AddListItem().BeginStruct()
+             .AddMember("int8").Int8(-1)
+             .AddMember("int16").Int16(-10000)
+             .AddMember("int32").Int32(-1000000)
+             .AddMember("int64").Int64(-100000000000)
+             .AddMember("uint8").Uint8(1)
+             .AddMember("uint16").Uint16(10000)
+             .AddMember("uint32").Uint32(1000000)
+             .AddMember("uint64").Uint64(100000000000)
+         .EndStruct()
+         .AddListItem().BeginStruct()
+             .AddMember("int8").Int8(-2)
+             .AddMember("int16").Int16(-20000)
+             .AddMember("int32").Int32(-2000000)
+             .AddMember("int64").Int64(-200000000000)
+             .AddMember("uint8").Uint8(2)
+             .AddMember("uint16").Uint16(20000)
+             .AddMember("uint32").Uint32(2000000)
+             .AddMember("uint64").Uint64(200000000000)
+         .EndStruct()
+         .EndList().Build();
+
+     CommonTestBuildList(
+         "int8,int16,int32,int64,uint8,uint16,uint32,uint64",
+         {
+             "-1,-10000,-1000000,-100000000000,1,10000,1000000,100000000000",
+             "-2,-20000,-2000000,-200000000000,2,20000,2000000,200000000000"
+         },
+         expectedRows);
+ }
+
Y_UNIT_TEST(DateTypesTestParams) {
CommonTestParams("name", "\"2001-01-01\"", {{"$name", TValueBuilder().Date(TInstant::ParseIso8601("2001-01-01")).Build()}});
CommonTestParams("name", "\"2001-01-01T12:12:12\"", {{"$name", TValueBuilder().Datetime(TInstant::ParseIso8601("2001-01-01T12:12:12")).Build()}});
@@ -97,6 +141,41 @@ Y_UNIT_TEST_SUITE(YdbCliCsvParserTests) {
CommonTestValue("name", "\"2001-01-01T12:12:12.111111,Europe/Moscow\"", MakeStruct("name", TValueBuilder().TzTimestamp("2001-01-01T12:12:12.111111,Europe/Moscow").Build()));
}
+ Y_UNIT_TEST(DateTypesTestBuildList) {
+     // Two rows covering ISO-8601 and numeric date/time inputs, an interval and the timezone-aware types.
+     const TValue expectedRows = TValueBuilder().BeginList()
+         .AddListItem().BeginStruct()
+             .AddMember("dateIso8601").Date(TInstant::ParseIso8601("2001-01-01"))
+             .AddMember("dateTimeIso8601").Datetime(TInstant::ParseIso8601("2001-01-01T12:12:12"))
+             .AddMember("timestampIso8601").Timestamp(TInstant::ParseIso8601("2001-01-01T12:12:12.111111"))
+             .AddMember("date").Date(TInstant::Days(12000))
+             .AddMember("datetime").Datetime(TInstant::Seconds(1200000))
+             .AddMember("timestamp").Timestamp(TInstant::MicroSeconds(120000000))
+             .AddMember("interval").Interval(-2000)
+             .AddMember("tzDate").TzDate("2001-01-01,Europe/Moscow")
+             .AddMember("tzDatetime").TzDatetime("2001-01-01T12:12:12,Europe/Moscow")
+             .AddMember("tzTimestamp").TzTimestamp("2001-01-01T12:12:12.111111,Europe/Moscow")
+         .EndStruct()
+         .AddListItem().BeginStruct()
+             .AddMember("dateIso8601").Date(TInstant::ParseIso8601("2001-01-02"))
+             .AddMember("dateTimeIso8601").Datetime(TInstant::ParseIso8601("2001-01-01T12:12:13"))
+             .AddMember("timestampIso8601").Timestamp(TInstant::ParseIso8601("2001-01-01T12:12:12.222222"))
+             .AddMember("date").Date(TInstant::Days(13000))
+             .AddMember("datetime").Datetime(TInstant::Seconds(1300000))
+             .AddMember("timestamp").Timestamp(TInstant::MicroSeconds(130000000))
+             .AddMember("interval").Interval(-3000)
+             .AddMember("tzDate").TzDate("2001-01-02,Europe/Moscow")
+             .AddMember("tzDatetime").TzDatetime("2001-01-01T12:12:13,Europe/Moscow")
+             .AddMember("tzTimestamp").TzTimestamp("2001-01-01T12:12:12.222222,Europe/Moscow")
+         .EndStruct()
+         .EndList().Build();
+
+     CommonTestBuildList(
+         "dateIso8601,dateTimeIso8601,timestampIso8601,date,datetime,timestamp,interval,tzDate,tzDatetime,tzTimestamp",
+         {
+             "\"2001-01-01\",\"2001-01-01T12:12:12\",\"2001-01-01T12:12:12.111111\",12000,1200000,120000000,-2000,\"2001-01-01,Europe/Moscow\",\"2001-01-01T12:12:12,Europe/Moscow\",\"2001-01-01T12:12:12.111111,Europe/Moscow\"",
+             "\"2001-01-02\",\"2001-01-01T12:12:13\",\"2001-01-01T12:12:12.222222\",13000,1300000,130000000,-3000,\"2001-01-02,Europe/Moscow\",\"2001-01-01T12:12:13,Europe/Moscow\",\"2001-01-01T12:12:12.222222,Europe/Moscow\""
+         },
+         expectedRows);
+ }
+
Y_UNIT_TEST(OtherPrimitiveTypeTestParams) {
CommonTestParams("name", "строка", {{"$name", TValueBuilder().Utf8("строка").Build()}});
CommonTestParams("name", "строка", {{"$name", TValueBuilder().String("строка").Build()}});
@@ -134,6 +213,43 @@ Y_UNIT_TEST_SUITE(YdbCliCsvParserTests) {
CommonTestValue("name", "данные", MakeStruct("name", TValueBuilder().Pg(TPgValue(TPgValue::VK_TEXT, "данные", TPgType("some_type"))).Build()));
}
+ Y_UNIT_TEST(OtherPrimitiveTypesTestBuildList) {
+     // Two rows covering the remaining primitive types, an optional column and a pg value.
+     const TValue expectedRows = TValueBuilder().BeginList()
+         .AddListItem().BeginStruct()
+             .AddMember("utf8").Utf8("строка1")
+             .AddMember("string").String("строка2")
+             .AddMember("bool").Bool(true)
+             .AddMember("float").Float(1.183)
+             .AddMember("double").Double(1.183)
+             .AddMember("dyNumber").DyNumber("1.183")
+             .AddMember("decimal").Decimal(TDecimalValue("1.183", 22, 9))
+             .AddMember("uuid").Uuid(TUuidValue("550e8400-e29b-41d4-a716-446655440000"))
+             .AddMember("json").Json("{\"a\":10, \"b\":\"string1\"}")
+             .AddMember("optionalUtf8").OptionalUtf8("строка1")
+             .AddMember("pg").Pg(TPgValue(TPgValue::VK_TEXT, "данные1", TPgType("some_type")))
+         .EndStruct()
+         .AddListItem().BeginStruct()
+             .AddMember("utf8").Utf8("строка3")
+             .AddMember("string").String("строка4")
+             .AddMember("bool").Bool(false)
+             .AddMember("float").Float(1.184)
+             .AddMember("double").Double(1.184)
+             .AddMember("dyNumber").DyNumber("1.184")
+             .AddMember("decimal").Decimal(TDecimalValue("1.184", 22, 9))
+             .AddMember("uuid").Uuid(TUuidValue("550e8400-e29b-41d4-a716-446655440001"))
+             .AddMember("json").Json("{\"a\":11, \"b\":\"string2\"}")
+             .AddMember("optionalUtf8").OptionalUtf8("строка2")
+             .AddMember("pg").Pg(TPgValue(TPgValue::VK_TEXT, "данные2", TPgType("some_type")))
+         .EndStruct()
+         .EndList().Build();
+
+     CommonTestBuildList(
+         "utf8,string,bool,float,double,dyNumber,decimal,uuid,json,optionalUtf8,pg",
+         {
+             "строка1,строка2,true,1.183,1.183,\"1.183\",1.183,550e8400-e29b-41d4-a716-446655440000,\"{\"\"a\"\":10, \"\"b\"\":\"\"string1\"\"}\",\"строка1\",\"данные1\"",
+             "строка3,строка4,false,1.184,1.184,\"1.184\",1.184,550e8400-e29b-41d4-a716-446655440001,\"{\"\"a\"\":11, \"\"b\"\":\"\"string2\"\"}\",\"строка2\",\"данные2\""
+         },
+         expectedRows);
+ }
+
Y_UNIT_TEST(EdgeValuesTestParams) {
CommonTestParams("name", "255", {{"$name", TValueBuilder().Uint8(255).Build()}});
CommonTestParams("name", "65535", {{"$name", TValueBuilder().Uint16(65535).Build()}});
diff --git a/ydb/public/lib/ydb_cli/common/parameters.cpp b/ydb/public/lib/ydb_cli/common/parameters.cpp
index 7eec4314c0..ae843a3703 100644
--- a/ydb/public/lib/ydb_cli/common/parameters.cpp
+++ b/ydb/public/lib/ydb_cli/common/parameters.cpp
@@ -405,7 +405,7 @@ bool TCommandWithParameters::GetNextParams(const TDriver& driver, const TString&
}
case EDataFormat::Csv:
case EDataFormat::Tsv: {
- CsvParser.GetParams(std::move(*data), *paramBuilder, TCsvParser::TParseMetadata{});
+ CsvParser.BuildParams(*data, *paramBuilder, TCsvParser::TParseMetadata{});
break;
}
default:
@@ -449,7 +449,7 @@ bool TCommandWithParameters::GetNextParams(const TDriver& driver, const TString&
case EDataFormat::Csv:
case EDataFormat::Tsv: {
TValueBuilder valueBuilder;
- CsvParser.GetValue(std::move(*data), valueBuilder, type, TCsvParser::TParseMetadata{});
+ CsvParser.BuildValue(*data, valueBuilder, type, TCsvParser::TParseMetadata{});
paramBuilder->AddParam(fullname, valueBuilder.Build());
break;
}
@@ -533,7 +533,7 @@ bool TCommandWithParameters::GetNextParams(const TDriver& driver, const TString&
case EDataFormat::Csv:
case EDataFormat::Tsv: {
valueBuilder.AddListItem();
- CsvParser.GetValue(std::move(*data), valueBuilder, type.GetProto().list_type().item(), TCsvParser::TParseMetadata{});
+ CsvParser.BuildValue(*data, valueBuilder, type.GetProto().list_type().item(), TCsvParser::TParseMetadata{});
break;
}
default:
diff --git a/ydb/public/lib/ydb_cli/import/import.cpp b/ydb/public/lib/ydb_cli/import/import.cpp
index 8e8f55c12a..ddfd3ccb61 100644
--- a/ydb/public/lib/ydb_cli/import/import.cpp
+++ b/ydb/public/lib/ydb_cli/import/import.cpp
@@ -1,5 +1,6 @@
#include "import.h"
+#include <util/stream/format.h>
#include <ydb/public/sdk/cpp/client/ydb_driver/driver.h>
#include <ydb/public/sdk/cpp/client/ydb_operation/operation.h>
#include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h>
@@ -79,6 +80,10 @@ TStatus WaitForQueue(const size_t maxQueueSize, std::vector<TAsyncStatus>& inFli
return MakeStatus();
}
+// Renders a byte count as a human-readable size string (HumanReadableSize with the SF_BYTES format).
+TString PrettifyBytes(double bytes) {
+    const auto humanReadable = HumanReadableSize(bytes, SF_BYTES);
+    return ToString(humanReadable);
+}
+
void InitCsvParser(TCsvParser& parser,
bool& removeLastDelimiter,
NCsvFormat::TLinesSplitter& csvSource,
@@ -104,16 +109,15 @@ void InitCsvParser(TCsvParser& parser,
headerRow.erase(headerRow.size() - settings.Delimiter_.size());
}
parser = TCsvParser(std::move(headerRow), settings.Delimiter_[0], settings.NullValue_, columnTypes);
- return;
- }
-
- TVector<TString> columns;
- Y_ENSURE_BT(dbTableInfo);
- for (const auto& column : dbTableInfo->GetColumns()) {
- columns.push_back(column.Name);
+ } else {
+ TVector<TString> columns;
+ Y_ENSURE_BT(dbTableInfo);
+ for (const auto& column : dbTableInfo->GetColumns()) {
+ columns.push_back(column.Name);
+ }
+ parser = TCsvParser(std::move(columns), settings.Delimiter_[0], settings.NullValue_, columnTypes);
}
- parser = TCsvParser(std::move(columns), settings.Delimiter_[0], settings.NullValue_, columnTypes);
- return;
+ parser.BuildLineType();
}
FHANDLE GetStdinFileno() {
@@ -126,22 +130,22 @@ FHANDLE GetStdinFileno() {
class TMaxInflightGetter {
public:
- TMaxInflightGetter(ui64 totalMaxInFlight, std::atomic<ui64>& filesCount)
+ TMaxInflightGetter(ui64 totalMaxInFlight, std::atomic<ui64>& currentFileCount)
: TotalMaxInFlight(totalMaxInFlight)
- , FilesCount(filesCount) {
+ , CurrentFileCount(currentFileCount) {
}
~TMaxInflightGetter() {
- --FilesCount;
+ --CurrentFileCount;
}
ui64 GetCurrentMaxInflight() const {
- return (TotalMaxInFlight - 1) / FilesCount + 1; // round up
+ return (TotalMaxInFlight - 1) / CurrentFileCount + 1; // round up
}
private:
ui64 TotalMaxInFlight;
- std::atomic<ui64>& FilesCount;
+ std::atomic<ui64>& CurrentFileCount;
};
class TCsvFileReader {
@@ -269,13 +273,13 @@ TImportFileClient::TImportFileClient(const TDriver& driver, const TClientCommand
}
TStatus TImportFileClient::Import(const TVector<TString>& filePaths, const TString& dbPath, const TImportFileSettings& settings) {
- FilesCount = filePaths.size();
+ CurrentFileCount = filePaths.size();
if (settings.Format_ == EDataFormat::Tsv && settings.Delimiter_ != "\t") {
return MakeStatus(EStatus::BAD_REQUEST,
TStringBuilder() << "Illegal delimiter for TSV format, only tab is allowed");
}
- auto resultStatus = TableClient->RetryOperationSync(
+ auto describeStatus = TableClient->RetryOperationSync(
[this, dbPath](NTable::TSession session) {
auto result = session.DescribeTable(dbPath).ExtractValueSync();
if (result.IsSuccess()) {
@@ -284,16 +288,16 @@ TStatus TImportFileClient::Import(const TVector<TString>& filePaths, const TStri
return result;
}, NTable::TRetryOperationSettings{RetrySettings}.MaxRetries(10));
- if (!resultStatus.IsSuccess()) {
+ if (!describeStatus.IsSuccess()) {
/// TODO: Remove this after server fix: https://github.com/ydb-platform/ydb/issues/7791
- if (resultStatus.GetStatus() == EStatus::SCHEME_ERROR) {
+ if (describeStatus.GetStatus() == EStatus::SCHEME_ERROR) {
auto describePathResult = NDump::DescribePath(*SchemeClient, dbPath);
if (describePathResult.GetStatus() != EStatus::SUCCESS) {
return MakeStatus(EStatus::SCHEME_ERROR,
TStringBuilder() << describePathResult.GetIssues().ToString() << dbPath);
}
}
- return resultStatus;
+ return describeStatus;
}
UpsertSettings
@@ -399,7 +403,11 @@ TStatus TImportFileClient::Import(const TVector<TString>& filePaths, const TStri
auto finish = TInstant::Now();
auto duration = finish - start;
progressBar.SetProcess(100);
- Cerr << "Elapsed: " << duration.SecondsFloat() << " sec\n";
+ if (duration.SecondsFloat() > 0) {
+ std::cerr << "Elapsed: " << std::setprecision(3) << duration.SecondsFloat() << " sec. Total read size: "
+ << PrettifyBytes(TotalBytesRead) << ". Average processing speed: "
+ << PrettifyBytes((double)TotalBytesRead / duration.SecondsFloat()) << "/s." << std::endl;
+ }
return MakeStatus(EStatus::SUCCESS);
}
@@ -418,6 +426,20 @@ TAsyncStatus TImportFileClient::UpsertTValueBuffer(const TString& dbPath, TValue
return TableClient->RetryOperation(upsert, RetrySettings);
}
+inline
+TAsyncStatus TImportFileClient::UpsertTValueBuffer(const TString& dbPath, TValue&& rows) {
+    // The closure takes ownership of the batch. Every invocation hands BulkUpsert its own copy
+    // (BulkUpsert receives the rows by move), so the operation can be invoked again by RetryOperation.
+    auto attempt = [this, dbPath, rows = std::move(rows)]
+            (NYdb::NTable::TTableClient& client) mutable -> TAsyncStatus {
+        NYdb::TValue batch(rows.GetType(), rows.GetProto());
+        auto pending = client.BulkUpsert(dbPath, std::move(batch), UpsertSettings);
+        // Narrow the bulk-upsert result down to a plain status.
+        return pending.Apply([](const NYdb::NTable::TAsyncBulkUpsertResult& finished) {
+            NYdb::TStatus outcome = finished.GetValueSync();
+            return NThreading::MakeFuture(outcome);
+        });
+    };
+    return TableClient->RetryOperation(attempt, RetrySettings);
+}
+
TStatus TImportFileClient::UpsertCsv(IInputStream& input,
const TString& dbPath,
const TImportFileSettings& settings,
@@ -425,13 +447,14 @@ TStatus TImportFileClient::UpsertCsv(IInputStream& input,
std::optional<ui64> inputSizeHint,
ProgressCallbackFunc & progressCallback) {
- TMaxInflightGetter inFlightGetter(settings.MaxInFlightRequests_, FilesCount);
+ TMaxInflightGetter inFlightGetter(settings.MaxInFlightRequests_, CurrentFileCount);
TCountingInput countInput(&input);
NCsvFormat::TLinesSplitter splitter(countInput);
auto columnTypes = GetColumnTypes();
ValidateTValueUpsertTable();
+ TInstant fileStartTime = TInstant::Now();
TCsvParser parser;
bool removeLastDelimiter = false;
@@ -441,13 +464,11 @@ TStatus TImportFileClient::UpsertCsv(IInputStream& input,
splitter.ConsumeLine();
}
- TType lineType = parser.GetColumnsType();
-
THolder<IThreadPool> pool = CreateThreadPool(settings.Threads_);
ui64 row = settings.SkipRows_ + settings.Header_ + 1;
ui64 batchRows = 0;
- ui64 nextBorder = VerboseModeReadSize;
+ ui64 nextBorder = VerboseModeStepSize;
ui64 batchBytes = 0;
ui64 readBytes = 0;
@@ -455,16 +476,8 @@ TStatus TImportFileClient::UpsertCsv(IInputStream& input,
std::vector<TAsyncStatus> inFlightRequests;
std::vector<TString> buffer;
- auto upsertCsv = [&](ui64 row, std::vector<TString>&& buffer) {
- TValueBuilder builder;
- builder.BeginList();
- for (auto&& line : buffer) {
- builder.AddListItem();
- parser.GetValue(std::move(line), builder, lineType, TCsvParser::TParseMetadata{row, filePath});
- ++row;
- }
- builder.EndList();
- return UpsertTValueBuffer(dbPath, builder).ExtractValueSync();
+ auto upsertCsvFunc = [&](std::vector<TString>&& buffer, ui64 row) {
+ return UpsertTValueBuffer(dbPath, parser.BuildList(buffer, filePath, row)).ExtractValueSync();
};
while (TString line = splitter.ConsumeLine()) {
@@ -485,9 +498,9 @@ TStatus TImportFileClient::UpsertCsv(IInputStream& input,
buffer.push_back(line);
- if (readBytes >= nextBorder && RetrySettings.Verbose_) {
- nextBorder += VerboseModeReadSize;
- Cerr << "Processed " << 1.0 * readBytes / (1 << 20) << "Mb and " << row + batchRows << " records" << Endl;
+ if (readBytes >= nextBorder && settings.Verbose_) {
+ nextBorder += VerboseModeStepSize;
+ Cerr << "Processed " << PrettifyBytes(readBytes) << " and " << row + batchRows << " records" << Endl;
}
if (batchBytes < settings.BytesPerRequest_) {
@@ -498,8 +511,8 @@ TStatus TImportFileClient::UpsertCsv(IInputStream& input,
progressCallback(readBytes, *inputSizeHint);
}
- auto asyncUpsertCSV = [&upsertCsv, row, buffer = std::move(buffer)]() mutable {
- return upsertCsv(row, std::move(buffer));
+ auto asyncUpsertCSV = [&upsertCsvFunc, row, buffer = std::move(buffer)]() mutable {
+ return upsertCsvFunc(std::move(buffer), row);
};
row += batchRows;
batchRows = 0;
@@ -515,17 +528,30 @@ TStatus TImportFileClient::UpsertCsv(IInputStream& input,
}
if (!buffer.empty() && countInput.Counter() > 0) {
- upsertCsv(row, std::move(buffer));
+ upsertCsvFunc(std::move(buffer), row);
}
- return WaitForQueue(0, inFlightRequests);
+ TotalBytesRead += readBytes;
+
+ auto waitResult = WaitForQueue(0, inFlightRequests);
+ if (settings.Verbose_) {
+ std::stringstream str;
+ double fileProcessingTimeSeconds = (TInstant::Now() - fileStartTime).SecondsFloat();
+ str << std::endl << "File " << filePath << " of " << PrettifyBytes(readBytes)
+ << " processed in " << std::setprecision(3) << fileProcessingTimeSeconds << " sec";
+ if (fileProcessingTimeSeconds > 0) {
+ str << ", " << PrettifyBytes((double)readBytes / fileProcessingTimeSeconds) << "/s" << std::endl;
+ }
+ std::cerr << str.str();
+ }
+ return waitResult;
}
TStatus TImportFileClient::UpsertCsvByBlocks(const TString& filePath,
const TString& dbPath,
const TImportFileSettings& settings) {
- TMaxInflightGetter inFlightGetter(settings.MaxInFlightRequests_, FilesCount);
+ TMaxInflightGetter inFlightGetter(settings.MaxInFlightRequests_, CurrentFileCount);
TString headerRow;
TCsvFileReader splitter(filePath, settings, headerRow, inFlightGetter);
@@ -538,28 +564,19 @@ TStatus TImportFileClient::UpsertCsvByBlocks(const TString& filePath,
NCsvFormat::TLinesSplitter headerSplitter(headerInput, settings.Delimiter_[0]);
InitCsvParser(parser, removeLastDelimiter, headerSplitter, settings, &columnTypes, DbTableInfo.get());
- TType lineType = parser.GetColumnsType();
-
TVector<TAsyncStatus> threadResults(splitter.GetSplitCount());
THolder<IThreadPool> pool = CreateThreadPool(splitter.GetSplitCount());
for (size_t threadId = 0; threadId < splitter.GetSplitCount(); ++threadId) {
auto loadCsv = [&, threadId] () {
- auto upsertCsv = [&](std::vector<TString>&& buffer) {
- TValueBuilder builder;
- builder.BeginList();
- for (auto&& line : buffer) {
- builder.AddListItem();
- parser.GetValue(std::move(line), builder, lineType, TCsvParser::TParseMetadata{std::nullopt, filePath});
- }
- builder.EndList();
- return UpsertTValueBuffer(dbPath, builder);
+ auto upsertCsvFunc = [&](std::vector<TString>&& buffer) {
+ return UpsertTValueBuffer(dbPath, parser.BuildList(buffer, filePath));
};
std::vector<TAsyncStatus> inFlightRequests;
std::vector<TString> buffer;
ui32 idx = settings.SkipRows_;
ui64 readBytes = 0;
ui64 batchBytes = 0;
- ui64 nextBorder = VerboseModeReadSize;
+ ui64 nextBorder = VerboseModeStepSize;
TAsyncStatus status;
TString line;
while (splitter.GetChunk(threadId).ConsumeLine(line)) {
@@ -577,10 +594,10 @@ TStatus TImportFileClient::UpsertCsvByBlocks(const TString& filePath,
}
buffer.push_back(line);
++idx;
- if (readBytes >= nextBorder && RetrySettings.Verbose_) {
- nextBorder += VerboseModeReadSize;
+ if (readBytes >= nextBorder && settings.Verbose_) {
+ nextBorder += VerboseModeStepSize;
TStringBuilder builder;
- builder << "Processed " << 1.0 * readBytes / (1 << 20) << "Mb and " << idx << " records" << Endl;
+ builder << "Processed " << PrettifyBytes(readBytes) << " and " << idx << " records" << Endl;
Cerr << builder;
}
if (batchBytes >= settings.BytesPerRequest_) {
@@ -590,15 +607,17 @@ TStatus TImportFileClient::UpsertCsvByBlocks(const TString& filePath,
return status;
}
- inFlightRequests.push_back(upsertCsv(std::move(buffer)));
+ inFlightRequests.push_back(upsertCsvFunc(std::move(buffer)));
buffer.clear();
}
}
if (!buffer.empty() && splitter.GetChunk(threadId).GetReadCount() != 0) {
- inFlightRequests.push_back(upsertCsv(std::move(buffer)));
+ inFlightRequests.push_back(upsertCsvFunc(std::move(buffer)));
}
+ TotalBytesRead += readBytes;
+
return WaitForQueue(0, inFlightRequests);
};
threadResults[threadId] = NThreading::Async(loadCsv, *pool);
@@ -617,7 +636,7 @@ TStatus TImportFileClient::UpsertJson(IInputStream& input, const TString& dbPath
const TType tableType = GetTableType();
ValidateTValueUpsertTable();
- TMaxInflightGetter inFlightGetter(settings.MaxInFlightRequests_, FilesCount);
+ TMaxInflightGetter inFlightGetter(settings.MaxInFlightRequests_, CurrentFileCount);
THolder<IThreadPool> pool = CreateThreadPool(settings.Threads_);
ui64 readBytes = 0;
diff --git a/ydb/public/lib/ydb_cli/import/import.h b/ydb/public/lib/ydb_cli/import/import.h
index b05a93fb1e..949e95ef38 100644
--- a/ydb/public/lib/ydb_cli/import/import.h
+++ b/ydb/public/lib/ydb_cli/import/import.h
@@ -54,6 +54,7 @@ struct TImportFileSettings : public TOperationRequestSettings<TImportFileSetting
FLUENT_SETTING_DEFAULT(TString, HeaderRow, "");
FLUENT_SETTING_DEFAULT(TString, Delimiter, DefaultDelimiter);
FLUENT_SETTING_DEFAULT(std::optional<TString>, NullValue, std::nullopt);
+ FLUENT_SETTING_DEFAULT(bool, Verbose, false);
};
class TImportFileClient {
@@ -76,9 +77,10 @@ private:
std::unique_ptr<const NTable::TTableDescription> DbTableInfo;
- std::atomic<ui64> FilesCount;
+ std::atomic<ui64> CurrentFileCount;
+ std::atomic<ui64> TotalBytesRead = 0;
- static constexpr ui32 VerboseModeReadSize = 1 << 27; // 100 MB
+ static constexpr ui32 VerboseModeStepSize = 1 << 27; // 128 MB
using ProgressCallbackFunc = std::function<void (ui64, ui64)>;
@@ -94,6 +96,7 @@ private:
const TImportFileSettings& settings);
TAsyncStatus UpsertTValueBuffer(const TString& dbPath, TValueBuilder& builder);
+ TAsyncStatus UpsertTValueBuffer(const TString& dbPath, TValue&& rows);
TStatus UpsertJson(IInputStream &input, const TString &dbPath, const TImportFileSettings &settings,
std::optional<ui64> inputSizeHint, ProgressCallbackFunc & progressCallback);