diff options
author | Nikolay Perfilov <pnv1@yandex-team.ru> | 2024-11-20 00:58:55 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-11-20 00:58:55 +0300 |
commit | fef9bc2801129afea020f8aa69e2ce2733ce78aa (patch) | |
tree | 27795342b96706414bc72757344207d00b3a7197 | |
parent | 9c294d8c6c3e536454a850244386e7508b3932a1 (diff) | |
download | ydb-fef9bc2801129afea020f8aa69e2ce2733ce78aa.tar.gz |
Improve csv import, steps 1 & 2 (#11679)
-rw-r--r-- | ydb/public/lib/ydb_cli/commands/ydb_service_import.cpp | 1 | ||||
-rw-r--r-- | ydb/public/lib/ydb_cli/common/csv_parser.cpp | 89 | ||||
-rw-r--r-- | ydb/public/lib/ydb_cli/common/csv_parser.h | 28 | ||||
-rw-r--r-- | ydb/public/lib/ydb_cli/common/csv_parser_ut.cpp | 146 | ||||
-rw-r--r-- | ydb/public/lib/ydb_cli/common/parameters.cpp | 6 | ||||
-rw-r--r-- | ydb/public/lib/ydb_cli/import/import.cpp | 139 | ||||
-rw-r--r-- | ydb/public/lib/ydb_cli/import/import.h | 7 |
7 files changed, 311 insertions, 105 deletions
diff --git a/ydb/public/lib/ydb_cli/commands/ydb_service_import.cpp b/ydb/public/lib/ydb_cli/commands/ydb_service_import.cpp index 8ab5de6929..23ab8e32ea 100644 --- a/ydb/public/lib/ydb_cli/commands/ydb_service_import.cpp +++ b/ydb/public/lib/ydb_cli/commands/ydb_service_import.cpp @@ -302,6 +302,7 @@ int TCommandImportFromCsv::Run(TConfig& config) { settings.NewlineDelimited(NewlineDelimited); settings.HeaderRow(HeaderRow); settings.NullValue(NullValue); + settings.Verbose(config.IsVerbose()); if (Delimiter.size() != 1) { throw TMisuseException() diff --git a/ydb/public/lib/ydb_cli/common/csv_parser.cpp b/ydb/public/lib/ydb_cli/common/csv_parser.cpp index 61c90dd1ff..b3d50636b7 100644 --- a/ydb/public/lib/ydb_cli/common/csv_parser.cpp +++ b/ydb/public/lib/ydb_cli/common/csv_parser.cpp @@ -1,5 +1,6 @@ #include "csv_parser.h" +#include <ydb/public/api/protos/ydb_value.pb.h> #include <ydb/public/lib/ydb_cli/common/common.h> #include <library/cpp/string_utils/csv/csv.h> @@ -177,7 +178,7 @@ public: } } - void BuildValue(TStringBuf token) { + void BuildValue(const TStringBuf& token) { switch (Parser.GetKind()) { case TTypeParser::ETypeKind::Primitive: { BuildPrimitive(TString(token)); @@ -279,7 +280,7 @@ public: throw TCsvParseException() << "Expected bool value: \"true\" or \"false\", received: \"" << token << "\"."; } - void EnsureNull(TStringBuf token) const { + void EnsureNull(const TStringBuf& token) const { if (!NullValue) { throw TCsvParseException() << "Expected null value instead of \"" << token << "\", but null value is not set."; } @@ -288,7 +289,7 @@ public: } } - TValue Convert(TStringBuf token) { + TValue Convert(const TStringBuf& token) { BuildValue(token); return Builder.Build(); } @@ -317,10 +318,10 @@ TCsvParseException FormatError(const std::exception& inputError, } TValue FieldToValue(TTypeParser& parser, - TStringBuf token, + const TStringBuf& token, const std::optional<TString>& nullValue, const TCsvParser::TParseMetadata& meta, - TString columnName) { + const TString& columnName) { try { TCsvToYdbConverter converter(parser, nullValue); return converter.Convert(token); @@ -331,7 +332,7 @@ TValue FieldToValue(TTypeParser& parser, TStringBuf Consume(NCsvFormat::CsvSplitter& splitter, const TCsvParser::TParseMetadata& meta, - TString columnName) { + const TString& columnName) { try { return splitter.Consume(); } catch (std::exception& e) { @@ -342,12 +343,12 @@ TStringBuf Consume(NCsvFormat::CsvSplitter& splitter, } TCsvParser::TCsvParser(TString&& headerRow, const char delimeter, const std::optional<TString>& nullValue, - const std::map<TString, TType>* paramTypes, + const std::map<TString, TType>* destinationTypes, const std::map<TString, TString>* paramSources) : HeaderRow(std::move(headerRow)) , Delimeter(delimeter) , NullValue(nullValue) - , ParamTypes(paramTypes) + , DestinationTypes(destinationTypes) , ParamSources(paramSources) { NCsvFormat::CsvSplitter splitter(HeaderRow, Delimeter); @@ -355,17 +356,17 @@ TCsvParser::TCsvParser(TString&& headerRow, const char delimeter, const std::opt } TCsvParser::TCsvParser(TVector<TString>&& header, const char delimeter, const std::optional<TString>& nullValue, - const std::map<TString, TType>* paramTypes, + const std::map<TString, TType>* destinationTypes, const std::map<TString, TString>* paramSources) : Header(std::move(header)) , Delimeter(delimeter) , NullValue(nullValue) - , ParamTypes(paramTypes) + , DestinationTypes(destinationTypes) , ParamSources(paramSources) { } -void TCsvParser::GetParams(TString&& data, TParamsBuilder& builder, const TParseMetadata& meta) const { +void TCsvParser::BuildParams(TString& data, TParamsBuilder& builder, const TParseMetadata& meta) const { NCsvFormat::CsvSplitter splitter(data, Delimeter); auto headerIt = Header.begin(); do { @@ -374,8 +375,8 @@ void TCsvParser::GetParams(TString&& data, TParamsBuilder& builder, const TParse } TStringBuf token = Consume(splitter, meta, *headerIt); TString fullname = "$" + *headerIt; - auto paramIt = ParamTypes->find(fullname); - if (paramIt == ParamTypes->end()) { + auto paramIt = DestinationTypes->find(fullname); + if (paramIt == DestinationTypes->end()) { ++headerIt; continue; } @@ -395,7 +396,7 @@ void TCsvParser::GetParams(TString&& data, TParamsBuilder& builder, const TParse } } -void TCsvParser::GetValue(TString&& data, TValueBuilder& builder, const TType& type, const TParseMetadata& meta) const { +void TCsvParser::BuildValue(TString& data, TValueBuilder& builder, const TType& type, const TParseMetadata& meta) const { NCsvFormat::CsvSplitter splitter(data, Delimeter); auto headerIt = Header.cbegin(); std::map<TString, TStringBuf> fields; @@ -431,18 +432,68 @@ void TCsvParser::GetValue(TString&& data, TValueBuilder& builder, const TType& t builder.EndStruct(); } -TType TCsvParser::GetColumnsType() const { +TValue TCsvParser::BuildList(std::vector<TString>& lines, const TString& filename, std::optional<ui64> row) const { + std::vector<std::unique_ptr<TTypeParser>> columnTypeParsers; + columnTypeParsers.reserve(ResultColumnCount); + for (const TType* type : ResultLineTypesSorted) { + columnTypeParsers.push_back(std::make_unique<TTypeParser>(*type)); + } + Ydb::Value listValue; + auto* listItems = listValue.mutable_items(); + listItems->Reserve(lines.size()); + for (auto& line : lines) { + std::vector<TStringBuf> fields; + NCsvFormat::CsvSplitter splitter(line, Delimeter); + TParseMetadata meta {row, filename}; + auto headerIt = Header.cbegin(); + auto skipIt = SkipBitMap.begin(); + do { + if (headerIt == Header.cend()) { // SkipBitMap has same size as Header + throw FormatError(yexception() << "Header contains less fields than data. Header: \"" << HeaderRow << "\", data: \"" << line << "\"", meta); + } + TStringBuf nextField = Consume(splitter, meta, *headerIt); + if (!*skipIt) { + fields.emplace_back(nextField); + } + ++headerIt; + ++skipIt; + } while (splitter.Step()); + auto* structItems = listItems->Add()->mutable_items(); + structItems->Reserve(ResultColumnCount); + auto typeParserIt = columnTypeParsers.begin(); + auto fieldIt = fields.begin(); + auto nameIt = ResultLineNamesSorted.begin(); + // fields size equals columnTypeParsers size, no need for second end check + for (; typeParserIt != columnTypeParsers.end(); ++typeParserIt, ++fieldIt, ++nameIt) { + *structItems->Add() = FieldToValue(*typeParserIt->get(), *fieldIt, NullValue, meta, **nameIt).GetProto(); + } + if (row.has_value()) { + ++row.value(); + } + } + return TValue(ResultListType.value(), std::move(listValue)); +} + +void TCsvParser::BuildLineType() { + SkipBitMap.reserve(Header.size()); + ResultColumnCount = 0; TTypeBuilder builder; builder.BeginStruct(); for (const auto& colName : Header) { - if (ParamTypes->find(colName) != ParamTypes->end()) { - builder.AddMember(colName, ParamTypes->at(colName)); + auto findIt = DestinationTypes->find(colName); + if (findIt != DestinationTypes->end()) { + builder.AddMember(colName, findIt->second); + ResultLineTypesSorted.emplace_back(&findIt->second); + ResultLineNamesSorted.emplace_back(&colName); + SkipBitMap.push_back(false); + ++ResultColumnCount; } else { - builder.AddMember("__ydb_skip_column_name", TTypeBuilder().Build()); + SkipBitMap.push_back(true); } } builder.EndStruct(); - return builder.Build(); + ResultLineType = builder.Build(); + ResultListType = TTypeBuilder().List(ResultLineType.value()).Build(); } } diff --git a/ydb/public/lib/ydb_cli/common/csv_parser.h b/ydb/public/lib/ydb_cli/common/csv_parser.h index 21387aeee1..7d68909441 100644 --- a/ydb/public/lib/ydb_cli/common/csv_parser.h +++ b/ydb/public/lib/ydb_cli/common/csv_parser.h @@ -25,23 +25,39 @@ public: ~TCsvParser() = default; TCsvParser(TString&& headerRow, const char delimeter, const std::optional<TString>& nullValue, - const std::map<TString, TType>* paramTypes = nullptr, + const std::map<TString, TType>* destinationTypes = nullptr, const std::map<TString, TString>* paramSources = nullptr); TCsvParser(TVector<TString>&& header, const char delimeter, const std::optional<TString>& nullValue, - const std::map<TString, TType>* paramTypes = nullptr, + const std::map<TString, TType>* destinationTypes = nullptr, const std::map<TString, TString>* paramSources = nullptr); - void GetParams(TString&& data, TParamsBuilder& builder, const TParseMetadata& meta) const; - void GetValue(TString&& data, TValueBuilder& builder, const TType& type, const TParseMetadata& meta) const; - TType GetColumnsType() const; + void BuildParams(TString& data, TParamsBuilder& builder, const TParseMetadata& meta) const; + void BuildValue(TString& data, TValueBuilder& builder, const TType& type, const TParseMetadata& meta) const; + TValue BuildList(std::vector<TString>& lines, const TString& filename, + std::optional<ui64> row = std::nullopt) const; + void BuildLineType(); private: TVector<TString> Header; TString HeaderRow; char Delimeter; std::optional<TString> NullValue; - const std::map<TString, TType>* ParamTypes; + // Types of destination table or query parameters + // Column name -> column type + const std::map<TString, TType>* DestinationTypes; const std::map<TString, TString>* ParamSources; + // Type of a single row in resulting TValue. + // Column order according to the header, though can have less elements than the Header + std::optional<TType> ResultLineType = std::nullopt; + std::optional<TType> ResultListType = std::nullopt; + // If a value is true (header column is absent in dstTypes), skip corresponding value in input csv row + std::vector<bool> SkipBitMap; + // Count of columns in each struct in resulting TValue + size_t ResultColumnCount; + // Types of columns in a single row in resulting TValue. + // Column order according to the header, though can have less elements than the Header + std::vector<const TType*> ResultLineTypesSorted; + std::vector<const TString*> ResultLineNamesSorted; }; } diff --git a/ydb/public/lib/ydb_cli/common/csv_parser_ut.cpp b/ydb/public/lib/ydb_cli/common/csv_parser_ut.cpp index ce54c4ea8e..1a78e69523 100644 --- a/ydb/public/lib/ydb_cli/common/csv_parser_ut.cpp +++ b/ydb/public/lib/ydb_cli/common/csv_parser_ut.cpp @@ -8,41 +8,54 @@ using namespace NYdb; using namespace NYdb::NConsoleClient; Y_UNIT_TEST_SUITE(YdbCliCsvParserTests) { - bool CompareValues(const TValue& lhs, const TValue& rhs) { - TString stringFirst, stringSecond; - NProtoBuf::TextFormat::PrintToString(lhs.GetProto(), &stringFirst); - NProtoBuf::TextFormat::PrintToString(rhs.GetProto(), &stringSecond); - return stringFirst == stringSecond; + void AssertValuesEqual(const TValue& actual, const TValue& expected) { + TString actualString, expectedString; + NProtoBuf::TextFormat::PrintToString(actual.GetProto(), &actualString); + NProtoBuf::TextFormat::PrintToString(expected.GetProto(), &expectedString); + UNIT_ASSERT_VALUES_EQUAL_C(actualString, expectedString, + "Expected string: \"" << expectedString << "\", actual string: \"" << actualString << "\""); } - void CommonTestParams(TString&& header, TString&& data, const std::map<TString, TValue>& result) { + void CommonTestParams(TString&& header, TString&& data, const std::map<TString, TValue>& estimatedResult) { std::map<TString, TType> paramTypes; - for (const auto& [name, value] : result) { + for (const auto& [name, value] : estimatedResult) { paramTypes.insert({name, value.GetType()}); } std::map<TString, TString> paramSources; TCsvParser parser(std::move(header), ',', "", ¶mTypes, ¶mSources); TParamsBuilder paramBuilder; - parser.GetParams(std::move(data), paramBuilder, TCsvParser::TParseMetadata{}); + parser.BuildParams(data, paramBuilder, TCsvParser::TParseMetadata{}); auto values = paramBuilder.Build().GetValues(); - UNIT_ASSERT_EQUAL(values.size(), result.size()); - for (const auto& [name, value] : result) { + UNIT_ASSERT_EQUAL(values.size(), estimatedResult.size()); + for (const auto& [name, value] : estimatedResult) { auto it = values.find(name); UNIT_ASSERT_UNEQUAL(it, values.end()); - UNIT_ASSERT(CompareValues(value, it->second)); + AssertValuesEqual(value, it->second); } } - void CommonTestValue(TString&& header, TString&& data, const TValue& result) { + void CommonTestValue(TString&& header, TString&& data, const TValue& estimatedResult) { std::map<TString, TType> paramTypes; - for (auto member : result.GetType().GetProto().struct_type().members()) { + for (auto member : estimatedResult.GetType().GetProto().struct_type().members()) { paramTypes.insert({member.name(), member.type()}); } TCsvParser parser(std::move(header), ',', "", ¶mTypes, nullptr); TValueBuilder valueBuilder; - parser.GetValue(std::move(data), valueBuilder, result.GetType(), TCsvParser::TParseMetadata{}); - UNIT_ASSERT(CompareValues(valueBuilder.Build(), result)); + parser.BuildValue(data, valueBuilder, estimatedResult.GetType(), TCsvParser::TParseMetadata{}); + AssertValuesEqual(valueBuilder.Build(), estimatedResult); + } + + void CommonTestBuildList(TString&& header, std::vector<TString>&& data, const TValue& estimatedResult) { + std::map<TString, TType> columnTypes; + for (auto member : estimatedResult.GetType().GetProto().list_type().item().struct_type().members()) { + columnTypes.insert({member.name(), member.type()}); + } + + TCsvParser parser(std::move(header), ',', "", &columnTypes, nullptr); + parser.BuildLineType(); + TValue builtResult = parser.BuildList(data, "testFile.csv", 0); + AssertValuesEqual(builtResult, estimatedResult); } TValue MakeStruct(const TString& name, const TValue& value) { @@ -71,6 +84,37 @@ Y_UNIT_TEST_SUITE(YdbCliCsvParserTests) { CommonTestValue("name", "100000000000", MakeStruct("name", TValueBuilder().Uint64(100000000000).Build())); } + Y_UNIT_TEST(IntegerTypesTestList) { + CommonTestBuildList( + "int8,int16,int32,int64,uint8,uint16,uint32,uint64", + { + "-1,-10000,-1000000,-100000000000,1,10000,1000000,100000000000", + "-2,-20000,-2000000,-200000000000,2,20000,2000000,200000000000" + }, + TValueBuilder().BeginList() + .AddListItem().BeginStruct() + .AddMember("int8").Int8(-1) + .AddMember("int16").Int16(-10000) + .AddMember("int32").Int32(-1000000) + .AddMember("int64").Int64(-100000000000) + .AddMember("uint8").Uint8(1) + .AddMember("uint16").Uint16(10000) + .AddMember("uint32").Uint32(1000000) + .AddMember("uint64").Uint64(100000000000) + .EndStruct() + .AddListItem().BeginStruct() + .AddMember("int8").Int8(-2) + .AddMember("int16").Int16(-20000) + .AddMember("int32").Int32(-2000000) + .AddMember("int64").Int64(-200000000000) + .AddMember("uint8").Uint8(2) + .AddMember("uint16").Uint16(20000) + .AddMember("uint32").Uint32(2000000) + .AddMember("uint64").Uint64(200000000000) + .EndStruct() + .EndList().Build()); + } + Y_UNIT_TEST(DateTypesTestParams) { CommonTestParams("name", "\"2001-01-01\"", {{"$name", TValueBuilder().Date(TInstant::ParseIso8601("2001-01-01")).Build()}}); CommonTestParams("name", "\"2001-01-01T12:12:12\"", {{"$name", TValueBuilder().Datetime(TInstant::ParseIso8601("2001-01-01T12:12:12")).Build()}}); @@ -97,6 +141,41 @@ Y_UNIT_TEST_SUITE(YdbCliCsvParserTests) { CommonTestValue("name", "\"2001-01-01T12:12:12.111111,Europe/Moscow\"", MakeStruct("name", TValueBuilder().TzTimestamp("2001-01-01T12:12:12.111111,Europe/Moscow").Build())); } + Y_UNIT_TEST(DateTypesTestBuildList) { + CommonTestBuildList( + "dateIso8601,dateTimeIso8601,timestampIso8601,date,datetime,timestamp,interval,tzDate,tzDatetime,tzTimestamp", + { + "\"2001-01-01\",\"2001-01-01T12:12:12\",\"2001-01-01T12:12:12.111111\",12000,1200000,120000000,-2000,\"2001-01-01,Europe/Moscow\",\"2001-01-01T12:12:12,Europe/Moscow\",\"2001-01-01T12:12:12.111111,Europe/Moscow\"", + "\"2001-01-02\",\"2001-01-01T12:12:13\",\"2001-01-01T12:12:12.222222\",13000,1300000,130000000,-3000,\"2001-01-02,Europe/Moscow\",\"2001-01-01T12:12:13,Europe/Moscow\",\"2001-01-01T12:12:12.222222,Europe/Moscow\"" + }, + TValueBuilder().BeginList() + .AddListItem().BeginStruct() + .AddMember("dateIso8601").Date(TInstant::ParseIso8601("2001-01-01")) + .AddMember("dateTimeIso8601").Datetime(TInstant::ParseIso8601("2001-01-01T12:12:12")) + .AddMember("timestampIso8601").Timestamp(TInstant::ParseIso8601("2001-01-01T12:12:12.111111")) + .AddMember("date").Date(TInstant::Days(12000)) + .AddMember("datetime").Datetime(TInstant::Seconds(1200000)) + .AddMember("timestamp").Timestamp(TInstant::MicroSeconds(120000000)) + .AddMember("interval").Interval(-2000) + .AddMember("tzDate").TzDate("2001-01-01,Europe/Moscow") + .AddMember("tzDatetime").TzDatetime("2001-01-01T12:12:12,Europe/Moscow") + .AddMember("tzTimestamp").TzTimestamp("2001-01-01T12:12:12.111111,Europe/Moscow") + .EndStruct() + .AddListItem().BeginStruct() + .AddMember("dateIso8601").Date(TInstant::ParseIso8601("2001-01-02")) + .AddMember("dateTimeIso8601").Datetime(TInstant::ParseIso8601("2001-01-01T12:12:13")) + .AddMember("timestampIso8601").Timestamp(TInstant::ParseIso8601("2001-01-01T12:12:12.222222")) + .AddMember("date").Date(TInstant::Days(13000)) + .AddMember("datetime").Datetime(TInstant::Seconds(1300000)) + .AddMember("timestamp").Timestamp(TInstant::MicroSeconds(130000000)) + .AddMember("interval").Interval(-3000) + .AddMember("tzDate").TzDate("2001-01-02,Europe/Moscow") + .AddMember("tzDatetime").TzDatetime("2001-01-01T12:12:13,Europe/Moscow") + .AddMember("tzTimestamp").TzTimestamp("2001-01-01T12:12:12.222222,Europe/Moscow") + .EndStruct() + .EndList().Build()); + } + Y_UNIT_TEST(OtherPrimitiveTypeTestParams) { CommonTestParams("name", "строка", {{"$name", TValueBuilder().Utf8("строка").Build()}}); CommonTestParams("name", "строка", {{"$name", TValueBuilder().String("строка").Build()}}); @@ -134,6 +213,43 @@ Y_UNIT_TEST_SUITE(YdbCliCsvParserTests) { CommonTestValue("name", "данные", MakeStruct("name", TValueBuilder().Pg(TPgValue(TPgValue::VK_TEXT, "данные", TPgType("some_type"))).Build())); } + Y_UNIT_TEST(OtherPrimitiveTypesTestBuildList) { + CommonTestBuildList( + "utf8,string,bool,float,double,dyNumber,decimal,uuid,json,optionalUtf8,pg", + { + "строка1,строка2,true,1.183,1.183,\"1.183\",1.183,550e8400-e29b-41d4-a716-446655440000,\"{\"\"a\"\":10, \"\"b\"\":\"\"string1\"\"}\",\"строка1\",\"данные1\"", + "строка3,строка4,false,1.184,1.184,\"1.184\",1.184,550e8400-e29b-41d4-a716-446655440001,\"{\"\"a\"\":11, \"\"b\"\":\"\"string2\"\"}\",\"строка2\",\"данные2\"" + }, + TValueBuilder().BeginList() + .AddListItem().BeginStruct() + .AddMember("utf8").Utf8("строка1") + .AddMember("string").String("строка2") + .AddMember("bool").Bool(true) + .AddMember("float").Float(1.183) + .AddMember("double").Double(1.183) + .AddMember("dyNumber").DyNumber("1.183") + .AddMember("decimal").Decimal(TDecimalValue("1.183", 22, 9)) + .AddMember("uuid").Uuid(TUuidValue("550e8400-e29b-41d4-a716-446655440000")) + .AddMember("json").Json("{\"a\":10, \"b\":\"string1\"}") + .AddMember("optionalUtf8").OptionalUtf8("строка1") + .AddMember("pg").Pg(TPgValue(TPgValue::VK_TEXT, "данные1", TPgType("some_type"))) + .EndStruct() + .AddListItem().BeginStruct() + .AddMember("utf8").Utf8("строка3") + .AddMember("string").String("строка4") + .AddMember("bool").Bool(false) + .AddMember("float").Float(1.184) + .AddMember("double").Double(1.184) + .AddMember("dyNumber").DyNumber("1.184") + .AddMember("decimal").Decimal(TDecimalValue("1.184", 22, 9)) + .AddMember("uuid").Uuid(TUuidValue("550e8400-e29b-41d4-a716-446655440001")) + .AddMember("json").Json("{\"a\":11, \"b\":\"string2\"}") + .AddMember("optionalUtf8").OptionalUtf8("строка2") + .AddMember("pg").Pg(TPgValue(TPgValue::VK_TEXT, "данные2", TPgType("some_type"))) + .EndStruct() + .EndList().Build()); + } + Y_UNIT_TEST(EdgeValuesTestParams) { CommonTestParams("name", "255", {{"$name", TValueBuilder().Uint8(255).Build()}}); CommonTestParams("name", "65535", {{"$name", TValueBuilder().Uint16(65535).Build()}}); diff --git a/ydb/public/lib/ydb_cli/common/parameters.cpp b/ydb/public/lib/ydb_cli/common/parameters.cpp index 7eec4314c0..ae843a3703 100644 --- a/ydb/public/lib/ydb_cli/common/parameters.cpp +++ b/ydb/public/lib/ydb_cli/common/parameters.cpp @@ -405,7 +405,7 @@ bool TCommandWithParameters::GetNextParams(const TDriver& driver, const TString& } case EDataFormat::Csv: case EDataFormat::Tsv: { - CsvParser.GetParams(std::move(*data), *paramBuilder, TCsvParser::TParseMetadata{}); + CsvParser.BuildParams(*data, *paramBuilder, TCsvParser::TParseMetadata{}); break; } default: @@ -449,7 +449,7 @@ bool TCommandWithParameters::GetNextParams(const TDriver& driver, const TString& case EDataFormat::Csv: case EDataFormat::Tsv: { TValueBuilder valueBuilder; - CsvParser.GetValue(std::move(*data), valueBuilder, type, TCsvParser::TParseMetadata{}); + CsvParser.BuildValue(*data, valueBuilder, type, TCsvParser::TParseMetadata{}); paramBuilder->AddParam(fullname, valueBuilder.Build()); break; } @@ -533,7 +533,7 @@ bool TCommandWithParameters::GetNextParams(const TDriver& driver, const TString& case EDataFormat::Csv: case EDataFormat::Tsv: { valueBuilder.AddListItem(); - CsvParser.GetValue(std::move(*data), valueBuilder, type.GetProto().list_type().item(), TCsvParser::TParseMetadata{}); + CsvParser.BuildValue(*data, valueBuilder, type.GetProto().list_type().item(), TCsvParser::TParseMetadata{}); break; } default: diff --git a/ydb/public/lib/ydb_cli/import/import.cpp b/ydb/public/lib/ydb_cli/import/import.cpp index 8e8f55c12a..ddfd3ccb61 100644 --- a/ydb/public/lib/ydb_cli/import/import.cpp +++ b/ydb/public/lib/ydb_cli/import/import.cpp @@ -1,5 +1,6 @@ #include "import.h" +#include <util/stream/format.h> #include <ydb/public/sdk/cpp/client/ydb_driver/driver.h> #include <ydb/public/sdk/cpp/client/ydb_operation/operation.h> #include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h> @@ -79,6 +80,10 @@ TStatus WaitForQueue(const size_t maxQueueSize, std::vector<TAsyncStatus>& inFli return MakeStatus(); } +TString PrettifyBytes(double bytes) { + return ToString(HumanReadableSize(bytes, SF_BYTES)); +} + void InitCsvParser(TCsvParser& parser, bool& removeLastDelimiter, NCsvFormat::TLinesSplitter& csvSource, @@ -104,16 +109,15 @@ void InitCsvParser(TCsvParser& parser, headerRow.erase(headerRow.size() - settings.Delimiter_.size()); } parser = TCsvParser(std::move(headerRow), settings.Delimiter_[0], settings.NullValue_, columnTypes); - return; - } - - TVector<TString> columns; - Y_ENSURE_BT(dbTableInfo); - for (const auto& column : dbTableInfo->GetColumns()) { - columns.push_back(column.Name); + } else { + TVector<TString> columns; + Y_ENSURE_BT(dbTableInfo); + for (const auto& column : dbTableInfo->GetColumns()) { + columns.push_back(column.Name); + } + parser = TCsvParser(std::move(columns), settings.Delimiter_[0], settings.NullValue_, columnTypes); } - parser = TCsvParser(std::move(columns), settings.Delimiter_[0], settings.NullValue_, columnTypes); - return; + parser.BuildLineType(); } FHANDLE GetStdinFileno() { @@ -126,22 +130,22 @@ FHANDLE GetStdinFileno() { class TMaxInflightGetter { public: - TMaxInflightGetter(ui64 totalMaxInFlight, std::atomic<ui64>& filesCount) + TMaxInflightGetter(ui64 totalMaxInFlight, std::atomic<ui64>& currentFileCount) : TotalMaxInFlight(totalMaxInFlight) - , FilesCount(filesCount) { + , CurrentFileCount(currentFileCount) { } ~TMaxInflightGetter() { - --FilesCount; + --CurrentFileCount; } ui64 GetCurrentMaxInflight() const { - return (TotalMaxInFlight - 1) / FilesCount + 1; // round up + return (TotalMaxInFlight - 1) / CurrentFileCount + 1; // round up } private: ui64 TotalMaxInFlight; - std::atomic<ui64>& FilesCount; + std::atomic<ui64>& CurrentFileCount; }; class TCsvFileReader { @@ -269,13 +273,13 @@ TImportFileClient::TImportFileClient(const TDriver& driver, const TClientCommand } TStatus TImportFileClient::Import(const TVector<TString>& filePaths, const TString& dbPath, const TImportFileSettings& settings) { - FilesCount = filePaths.size(); + CurrentFileCount = filePaths.size(); if (settings.Format_ == EDataFormat::Tsv && settings.Delimiter_ != "\t") { return MakeStatus(EStatus::BAD_REQUEST, TStringBuilder() << "Illegal delimiter for TSV format, only tab is allowed"); } - auto resultStatus = TableClient->RetryOperationSync( + auto describeStatus = TableClient->RetryOperationSync( [this, dbPath](NTable::TSession session) { auto result = session.DescribeTable(dbPath).ExtractValueSync(); if (result.IsSuccess()) { @@ -284,16 +288,16 @@ TStatus TImportFileClient::Import(const TVector<TString>& filePaths, const TStri return result; }, NTable::TRetryOperationSettings{RetrySettings}.MaxRetries(10)); - if (!resultStatus.IsSuccess()) { + if (!describeStatus.IsSuccess()) { /// TODO: Remove this after server fix: https://github.com/ydb-platform/ydb/issues/7791 - if (resultStatus.GetStatus() == EStatus::SCHEME_ERROR) { + if (describeStatus.GetStatus() == EStatus::SCHEME_ERROR) { auto describePathResult = NDump::DescribePath(*SchemeClient, dbPath); if (describePathResult.GetStatus() != EStatus::SUCCESS) { return MakeStatus(EStatus::SCHEME_ERROR, TStringBuilder() << describePathResult.GetIssues().ToString() << dbPath); } } - return resultStatus; + return describeStatus; } UpsertSettings @@ -399,7 +403,11 @@ TStatus TImportFileClient::Import(const TVector<TString>& filePaths, const TStri auto finish = TInstant::Now(); auto duration = finish - start; progressBar.SetProcess(100); - Cerr << "Elapsed: " << duration.SecondsFloat() << " sec\n"; + if (duration.SecondsFloat() > 0) { + std::cerr << "Elapsed: " << std::setprecision(3) << duration.SecondsFloat() << " sec. Total read size: " + << PrettifyBytes(TotalBytesRead) << ". Average processing speed: " + << PrettifyBytes((double)TotalBytesRead / duration.SecondsFloat()) << "/s." << std::endl; + } return MakeStatus(EStatus::SUCCESS); } @@ -418,6 +426,20 @@ TAsyncStatus TImportFileClient::UpsertTValueBuffer(const TString& dbPath, TValue return TableClient->RetryOperation(upsert, RetrySettings); } +inline +TAsyncStatus TImportFileClient::UpsertTValueBuffer(const TString& dbPath, TValue&& rows) { + auto upsert = [this, dbPath, rows = std::move(rows)] + (NYdb::NTable::TTableClient& tableClient) mutable -> TAsyncStatus { + NYdb::TValue rowsCopy(rows.GetType(), rows.GetProto()); + return tableClient.BulkUpsert(dbPath, std::move(rowsCopy), UpsertSettings) + .Apply([](const NYdb::NTable::TAsyncBulkUpsertResult& bulkUpsertResult) { + NYdb::TStatus status = bulkUpsertResult.GetValueSync(); + return NThreading::MakeFuture(status); + }); + }; + return TableClient->RetryOperation(upsert, RetrySettings); +} + TStatus TImportFileClient::UpsertCsv(IInputStream& input, const TString& dbPath, const TImportFileSettings& settings, @@ -425,13 +447,14 @@ TStatus TImportFileClient::UpsertCsv(IInputStream& input, std::optional<ui64> inputSizeHint, ProgressCallbackFunc & progressCallback) { - TMaxInflightGetter inFlightGetter(settings.MaxInFlightRequests_, FilesCount); + TMaxInflightGetter inFlightGetter(settings.MaxInFlightRequests_, CurrentFileCount); TCountingInput countInput(&input); NCsvFormat::TLinesSplitter splitter(countInput); auto columnTypes = GetColumnTypes(); ValidateTValueUpsertTable(); + TInstant fileStartTime = TInstant::Now(); TCsvParser parser; bool removeLastDelimiter = false; @@ -441,13 +464,11 @@ TStatus TImportFileClient::UpsertCsv(IInputStream& input, splitter.ConsumeLine(); } - TType lineType = parser.GetColumnsType(); - THolder<IThreadPool> pool = CreateThreadPool(settings.Threads_); ui64 row = settings.SkipRows_ + settings.Header_ + 1; ui64 batchRows = 0; - ui64 nextBorder = VerboseModeReadSize; + ui64 nextBorder = VerboseModeStepSize; ui64 batchBytes = 0; ui64 readBytes = 0; @@ -455,16 +476,8 @@ TStatus TImportFileClient::UpsertCsv(IInputStream& input, std::vector<TAsyncStatus> inFlightRequests; std::vector<TString> buffer; - auto upsertCsv = [&](ui64 row, std::vector<TString>&& buffer) { - TValueBuilder builder; - builder.BeginList(); - for (auto&& line : buffer) { - builder.AddListItem(); - parser.GetValue(std::move(line), builder, lineType, TCsvParser::TParseMetadata{row, filePath}); - ++row; - } - builder.EndList(); - return UpsertTValueBuffer(dbPath, builder).ExtractValueSync(); + auto upsertCsvFunc = [&](std::vector<TString>&& buffer, ui64 row) { + return UpsertTValueBuffer(dbPath, parser.BuildList(buffer, filePath, row)).ExtractValueSync(); }; while (TString line = splitter.ConsumeLine()) { @@ -485,9 +498,9 @@ TStatus TImportFileClient::UpsertCsv(IInputStream& input, buffer.push_back(line); - if (readBytes >= nextBorder && RetrySettings.Verbose_) { - nextBorder += VerboseModeReadSize; - Cerr << "Processed " << 1.0 * readBytes / (1 << 20) << "Mb and " << row + batchRows << " records" << Endl; + if (readBytes >= nextBorder && settings.Verbose_) { + nextBorder += VerboseModeStepSize; + Cerr << "Processed " << PrettifyBytes(readBytes) << " and " << row + batchRows << " records" << Endl; } if (batchBytes < settings.BytesPerRequest_) { @@ -498,8 +511,8 @@ TStatus TImportFileClient::UpsertCsv(IInputStream& input, progressCallback(readBytes, *inputSizeHint); } - auto asyncUpsertCSV = [&upsertCsv, row, buffer = std::move(buffer)]() mutable { - return upsertCsv(row, std::move(buffer)); + auto asyncUpsertCSV = [&upsertCsvFunc, row, buffer = std::move(buffer)]() mutable { + return upsertCsvFunc(std::move(buffer), row); }; row += batchRows; batchRows = 0; @@ -515,17 +528,30 @@ TStatus TImportFileClient::UpsertCsv(IInputStream& input, } if (!buffer.empty() && countInput.Counter() > 0) { - upsertCsv(row, std::move(buffer)); + upsertCsvFunc(std::move(buffer), row); } - return WaitForQueue(0, inFlightRequests); + TotalBytesRead += readBytes; + + auto waitResult = WaitForQueue(0, inFlightRequests); + if (settings.Verbose_) { + std::stringstream str; + double fileProcessingTimeSeconds = (TInstant::Now() - fileStartTime).SecondsFloat(); + str << std::endl << "File " << filePath << " of " << PrettifyBytes(readBytes) + << " processed in " << std::setprecision(3) << fileProcessingTimeSeconds << " sec"; + if (fileProcessingTimeSeconds > 0) { + str << ", " << PrettifyBytes((double)readBytes / fileProcessingTimeSeconds) << "/s" << std::endl; + } + std::cerr << str.str(); + } + return waitResult; } TStatus TImportFileClient::UpsertCsvByBlocks(const TString& filePath, const TString& dbPath, const TImportFileSettings& settings) { - TMaxInflightGetter inFlightGetter(settings.MaxInFlightRequests_, FilesCount); + TMaxInflightGetter inFlightGetter(settings.MaxInFlightRequests_, CurrentFileCount); TString headerRow; TCsvFileReader splitter(filePath, settings, headerRow, inFlightGetter); @@ -538,28 +564,19 @@ TStatus TImportFileClient::UpsertCsvByBlocks(const TString& filePath, NCsvFormat::TLinesSplitter headerSplitter(headerInput, settings.Delimiter_[0]); InitCsvParser(parser, removeLastDelimiter, headerSplitter, settings, &columnTypes, DbTableInfo.get()); - TType lineType = parser.GetColumnsType(); - TVector<TAsyncStatus> threadResults(splitter.GetSplitCount()); THolder<IThreadPool> pool = CreateThreadPool(splitter.GetSplitCount()); for (size_t threadId = 0; threadId < splitter.GetSplitCount(); ++threadId) { auto loadCsv = [&, threadId] () { - auto upsertCsv = [&](std::vector<TString>&& buffer) { - TValueBuilder builder; - builder.BeginList(); - for (auto&& line : buffer) { - builder.AddListItem(); - parser.GetValue(std::move(line), builder, lineType, TCsvParser::TParseMetadata{std::nullopt, filePath}); - } - builder.EndList(); - return UpsertTValueBuffer(dbPath, builder); + auto upsertCsvFunc = [&](std::vector<TString>&& buffer) { + return UpsertTValueBuffer(dbPath, parser.BuildList(buffer, filePath)); }; std::vector<TAsyncStatus> inFlightRequests; std::vector<TString> buffer; ui32 idx = settings.SkipRows_; ui64 readBytes = 0; ui64 batchBytes = 0; - ui64 nextBorder = VerboseModeReadSize; + ui64 nextBorder = VerboseModeStepSize; TAsyncStatus status; TString line; while (splitter.GetChunk(threadId).ConsumeLine(line)) { @@ -577,10 +594,10 @@ TStatus TImportFileClient::UpsertCsvByBlocks(const TString& filePath, } buffer.push_back(line); ++idx; - if (readBytes >= nextBorder && RetrySettings.Verbose_) { - nextBorder += VerboseModeReadSize; + if (readBytes >= nextBorder && settings.Verbose_) { + nextBorder += VerboseModeStepSize; TStringBuilder builder; - builder << "Processed " << 1.0 * readBytes / (1 << 20) << "Mb and " << idx << " records" << Endl; + builder << "Processed " << PrettifyBytes(readBytes) << " and " << idx << " records" << Endl; Cerr << builder; } if (batchBytes >= settings.BytesPerRequest_) { @@ -590,15 +607,17 @@ TStatus TImportFileClient::UpsertCsvByBlocks(const TString& filePath, return status; } - inFlightRequests.push_back(upsertCsv(std::move(buffer))); + inFlightRequests.push_back(upsertCsvFunc(std::move(buffer))); buffer.clear(); } } if (!buffer.empty() && splitter.GetChunk(threadId).GetReadCount() != 0) { - inFlightRequests.push_back(upsertCsv(std::move(buffer))); + inFlightRequests.push_back(upsertCsvFunc(std::move(buffer))); } + TotalBytesRead += readBytes; + return WaitForQueue(0, inFlightRequests); }; threadResults[threadId] = NThreading::Async(loadCsv, *pool); @@ -617,7 +636,7 @@ TStatus TImportFileClient::UpsertJson(IInputStream& input, const TString& dbPath const TType tableType = GetTableType(); ValidateTValueUpsertTable(); - TMaxInflightGetter inFlightGetter(settings.MaxInFlightRequests_, FilesCount); + TMaxInflightGetter inFlightGetter(settings.MaxInFlightRequests_, CurrentFileCount); THolder<IThreadPool> pool = CreateThreadPool(settings.Threads_); ui64 readBytes = 0; diff --git a/ydb/public/lib/ydb_cli/import/import.h b/ydb/public/lib/ydb_cli/import/import.h index b05a93fb1e..949e95ef38 100644 --- a/ydb/public/lib/ydb_cli/import/import.h +++ b/ydb/public/lib/ydb_cli/import/import.h @@ -54,6 +54,7 @@ struct TImportFileSettings : public TOperationRequestSettings<TImportFileSetting FLUENT_SETTING_DEFAULT(TString, HeaderRow, ""); FLUENT_SETTING_DEFAULT(TString, Delimiter, DefaultDelimiter); FLUENT_SETTING_DEFAULT(std::optional<TString>, NullValue, std::nullopt); + FLUENT_SETTING_DEFAULT(bool, Verbose, false); }; class TImportFileClient { @@ -76,9 +77,10 @@ private: std::unique_ptr<const NTable::TTableDescription> DbTableInfo; - std::atomic<ui64> FilesCount; + std::atomic<ui64> CurrentFileCount; + std::atomic<ui64> TotalBytesRead = 0; - static constexpr ui32 VerboseModeReadSize = 1 << 27; // 100 MB + static constexpr ui32 VerboseModeStepSize = 1 << 27; // 128 MB using ProgressCallbackFunc = std::function<void (ui64, ui64)>; @@ -94,6 +96,7 @@ private: const TImportFileSettings& settings); TAsyncStatus UpsertTValueBuffer(const TString& dbPath, TValueBuilder& builder); + TAsyncStatus UpsertTValueBuffer(const TString& dbPath, TValue&& rows); TStatus UpsertJson(IInputStream &input, const TString &dbPath, const TImportFileSettings &settings, std::optional<ui64> inputSizeHint, ProgressCallbackFunc & progressCallback); |