diff options
author | Nikolay Perfilov <pnv1@yandex-team.ru> | 2025-01-17 20:11:58 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-01-17 20:11:58 +0300 |
commit | 83627e00155e16eab2d6442da0035dd054510b19 (patch) | |
tree | a212d2fb62b02ab9fd3866b36a19465e81e1673a | |
parent | e0f619d4043dd00d032f2bbe9d5dbc1ad9389d60 (diff) | |
download | ydb-83627e00155e16eab2d6442da0035dd054510b19.tar.gz |
Suggest create table text on scheme error during `import file csv` (#13247)
Co-authored-by: Bulat <bylatgr@gmail.com>
-rw-r--r-- | ydb/apps/ydb/CHANGELOG.md | 1 | ||||
-rw-r--r-- | ydb/public/lib/ydb_cli/common/csv_parser.cpp | 271 | ||||
-rw-r--r-- | ydb/public/lib/ydb_cli/common/csv_parser.h | 35 | ||||
-rw-r--r-- | ydb/public/lib/ydb_cli/common/csv_parser_ut.cpp | 88 | ||||
-rw-r--r-- | ydb/public/lib/ydb_cli/import/import.cpp | 291 |
5 files changed, 664 insertions, 22 deletions
diff --git a/ydb/apps/ydb/CHANGELOG.md b/ydb/apps/ydb/CHANGELOG.md index 92210a1a06..5f41af6f41 100644 --- a/ydb/apps/ydb/CHANGELOG.md +++ b/ydb/apps/ydb/CHANGELOG.md @@ -1,3 +1,4 @@ +* Added CREATE TABLE text suggestion on scheme error during `ydb import file csv` * Backup and restore of changefeeds has been added to `ydb tools dump` and `ydb tools restore`. As a result, there are changes in the backup file structure: for tables with changefeeds, a subdirectory is created for each changefeed, named after the changefeed. This subdirectory contains two files: `changefeed_description.pb`, which contains the changefeed description, and `topic_description.pb`, which contains information about the underlying topic. * Added `--skip-checksum-validation` option to `ydb import s3` command to skip server-side checksum validation. diff --git a/ydb/public/lib/ydb_cli/common/csv_parser.cpp b/ydb/public/lib/ydb_cli/common/csv_parser.cpp index 6b3925c4c0..819bab0fd0 100644 --- a/ydb/public/lib/ydb_cli/common/csv_parser.cpp +++ b/ydb/public/lib/ydb_cli/common/csv_parser.cpp @@ -5,6 +5,8 @@ #include <library/cpp/string_utils/csv/csv.h> +#include <regex> + namespace NYdb { namespace NConsoleClient { namespace { @@ -289,6 +291,120 @@ public: } } + template <class T> + bool TryParseArithmetic(const TString& token) const { + size_t cnt; + try { + auto value = StringToArithmetic<T>(token, cnt); + if (cnt != token.size() || value < std::numeric_limits<T>::lowest() || value > std::numeric_limits<T>::max()) { + return false; + } + } catch (std::exception& e) { + return false; + } + return true; + } + + bool TryParseBool(const TString& token) const { + TString tokenLowerCase = to_lower(token); + return tokenLowerCase == "true" || tokenLowerCase == "false"; + } + + bool TryParsePrimitive(const TString& token) { + switch (Parser.GetPrimitive()) { + case EPrimitiveType::Uint8: + return TryParseArithmetic<ui8>(token) && !token.StartsWith('-'); + case EPrimitiveType::Uint16: + return TryParseArithmetic<ui16>(token) && !token.StartsWith('-');; + case EPrimitiveType::Uint32: + return TryParseArithmetic<ui32>(token) && !token.StartsWith('-');; + case EPrimitiveType::Uint64: + return TryParseArithmetic<ui64>(token) && !token.StartsWith('-');; + case EPrimitiveType::Int8: + return TryParseArithmetic<i8>(token); + case EPrimitiveType::Int16: + return TryParseArithmetic<i16>(token); + case EPrimitiveType::Int32: + return TryParseArithmetic<i32>(token); + case EPrimitiveType::Int64: + return TryParseArithmetic<i64>(token); + case EPrimitiveType::Bool: + return TryParseBool(token); + case EPrimitiveType::Json: + return token.StartsWith('{') && token.EndsWith('}'); + break; + case EPrimitiveType::JsonDocument: + break; + case EPrimitiveType::Yson: + break; + case EPrimitiveType::Uuid: + static std::regex uuidRegexTemplate("[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}"); + return std::regex_match(token.c_str(), uuidRegexTemplate); + case EPrimitiveType::Float: + return TryParseArithmetic<float>(token); + case EPrimitiveType::Double: + return TryParseArithmetic<double>(token); + case EPrimitiveType::DyNumber: + break; + case EPrimitiveType::Date: { + TInstant date; + return TInstant::TryParseIso8601(token, date) && token.length() <= 10; + } + case EPrimitiveType::Datetime: { + TInstant datetime; + return TInstant::TryParseIso8601(token, datetime) && token.length() <= 19; + } + case EPrimitiveType::Timestamp: { + TInstant timestamp; + return TInstant::TryParseIso8601(token, timestamp) || TryParseArithmetic<ui64>(token); + } + case EPrimitiveType::Interval: + break; + case EPrimitiveType::Date32: { + TInstant date; + return TInstant::TryParseIso8601(token, date) || TryParseArithmetic<i32>(token); + } + case EPrimitiveType::Datetime64: { + TInstant date; + return TInstant::TryParseIso8601(token, date) || TryParseArithmetic<i64>(token); + } + case EPrimitiveType::Timestamp64: { + TInstant date; + return TInstant::TryParseIso8601(token, date) || TryParseArithmetic<i64>(token); + } + case EPrimitiveType::Interval64: + return TryParseArithmetic<i64>(token); + case EPrimitiveType::TzDate: + break; + case EPrimitiveType::TzDatetime: + break; + case EPrimitiveType::TzTimestamp: + break; + default: + throw TCsvParseException() << "Unsupported primitive type: " << Parser.GetPrimitive(); + } + return false; + } + + bool TryParseValue(const TStringBuf& token, TPossibleType& possibleType) { + if (NullValue && token == NullValue) { + possibleType.SetHasNulls(true); + return true; + } + possibleType.SetHasNonNulls(true); + switch (Parser.GetKind()) { + case TTypeParser::ETypeKind::Primitive: { + return TryParsePrimitive(TString(token)); + } + case TTypeParser::ETypeKind::Decimal: { + break; + } + default: + throw TCsvParseException() << "Unsupported type kind: " << Parser.GetKind(); + } + return false; + } + TValue Convert(const TStringBuf& token) { BuildValue(token); return Builder.Build(); @@ -330,6 +446,16 @@ TValue FieldToValue(TTypeParser& parser, } } +bool TryParse(TTypeParser& parser, const TStringBuf& token, const std::optional<TString>& nullValue, TPossibleType& possibleType) { + try { + TCsvToYdbConverter converter(parser, nullValue); + return converter.TryParseValue(token, possibleType); + } catch (std::exception& e) { + Cerr << "UNEXPECTED EXCEPTION: " << e.what() << Endl; + return false; + } +} + TStringBuf Consume(NCsvFormat::CsvSplitter& splitter, const TCsvParser::TParseMetadata& meta, const TString& columnName) { @@ -489,6 +615,151 @@ void TCsvParser::BuildLineType() { ResultLineType = builder.Build(); ResultListType = TTypeBuilder().List(ResultLineType.value()).Build(); } +namespace { +static const std::vector<TType> availableTypes = { + TTypeBuilder().Primitive(EPrimitiveType::Bool).Build(), + TTypeBuilder().Primitive(EPrimitiveType::Uint64).Build(), + TTypeBuilder().Primitive(EPrimitiveType::Int64).Build(), + TTypeBuilder().Primitive(EPrimitiveType::Double).Build(), + TTypeBuilder().Primitive(EPrimitiveType::Date).Build(), + TTypeBuilder().Primitive(EPrimitiveType::Datetime).Build(), + TTypeBuilder().Primitive(EPrimitiveType::Timestamp).Build(), + TTypeBuilder().Primitive(EPrimitiveType::Json).Build(), + TTypeBuilder().Primitive(EPrimitiveType::Uuid).Build(), +}; + +static const auto availableTypesEnd = availableTypes.end(); + +} // namespace + +TPossibleType::TPossibleType() { + CurrentType = availableTypes.begin(); +} + +TPossibleType::TPossibleType(std::vector<TType>::const_iterator currentType) +: CurrentType(currentType) +{ +} + +void TPossibleType::SetIterator(const std::vector<TType>::const_iterator& newIterator) { + CurrentType = newIterator; +} + +std::vector<TType>::const_iterator& TPossibleType::GetIterator() { + return CurrentType; +} + +const std::vector<TType>::const_iterator& TPossibleType::GetAvailableTypesEnd() { + return availableTypesEnd; +} + +void TPossibleType::SetHasNulls(bool hasNulls) { + HasNulls = hasNulls; +} + +bool TPossibleType::GetHasNulls() const { + return HasNulls; +} + +void TPossibleType::SetHasNonNulls(bool hasNonNulls) { + HasNonNulls = hasNonNulls; +} + +bool TPossibleType::GetHasNonNulls() const { + return HasNonNulls; +} + +TPossibleTypes::TPossibleTypes(size_t size) { + ColumnPossibleTypes.resize(size); +} + +TPossibleTypes::TPossibleTypes(std::vector<TPossibleType>& currentColumnTypes) +: ColumnPossibleTypes(currentColumnTypes) +{ +} + +// Pass this copy to a worker to parse his chunk of data with it to merge it later back into this main chunk +TPossibleTypes TPossibleTypes::GetCopy() { + std::shared_lock<std::shared_mutex> ReadLock(Lock); + return TPossibleTypes(ColumnPossibleTypes); +} + +// Merge this main chunk with another chunk that parsed a CSV batch and maybe dismissed some types +void TPossibleTypes::MergeWith(TPossibleTypes& newTypes) { + auto newTypesVec = newTypes.GetColumnPossibleTypes(); + { + std::shared_lock<std::shared_mutex> ReadLock(Lock); + bool changed = false; + for (size_t i = 0; i < ColumnPossibleTypes.size(); ++i) { + auto& currentPossibleType = ColumnPossibleTypes[i]; + auto& newPossibleType = newTypesVec[i]; + auto& currentIt = currentPossibleType.GetIterator(); + const auto& newIt = newPossibleType.GetIterator(); + if (newIt > currentIt) { + changed = true; + break; + } + if (currentPossibleType.GetHasNulls() != newPossibleType.GetHasNulls() + || currentPossibleType.GetHasNonNulls() != newPossibleType.GetHasNonNulls()) { + changed = true; + break; + } + } + if (!changed) { + return; + } + } + std::unique_lock<std::shared_mutex> WriteLock(Lock); + for (size_t i = 0; i < ColumnPossibleTypes.size(); ++i) { + auto& currentPossibleType = ColumnPossibleTypes[i]; + auto& newPossibleType = newTypesVec[i]; + const auto& newIt = newPossibleType.GetIterator(); + if (newIt > currentPossibleType.GetIterator()) { + currentPossibleType.SetIterator(newIt); + } + if (newPossibleType.GetHasNulls()) { + currentPossibleType.SetHasNulls(true); + } + if (newPossibleType.GetHasNonNulls()) { + currentPossibleType.SetHasNonNulls(true); + } + } +} + +std::vector<TPossibleType>& TPossibleTypes::GetColumnPossibleTypes() { + return ColumnPossibleTypes; +} + +void TCsvParser::ParseLineTypes(TString& line, TPossibleTypes& possibleTypes, const TParseMetadata& meta) { + NCsvFormat::CsvSplitter splitter(line, Delimeter); + auto headerIt = Header.cbegin(); + auto typesIt = possibleTypes.GetColumnPossibleTypes().begin(); + do { + if (headerIt == Header.cend()) { + throw FormatError(yexception() << "Header contains less fields than data. Header: \"" << HeaderRow << "\", data: \"" << line << "\"", meta); + } + TStringBuf token = Consume(splitter, meta, *headerIt); + TPossibleType& possibleType = *typesIt; + auto& typeIt = possibleType.GetIterator(); + while (typeIt != availableTypesEnd) { + TTypeParser typeParser(*typeIt); + if (TryParse(typeParser, token, NullValue, possibleType)) { + break; + } + ++typeIt; + } + ++headerIt; + ++typesIt; + } while (splitter.Step()); + + if (headerIt != Header.cend()) { + throw FormatError(yexception() << "Header contains more fields than data. Header: \"" << HeaderRow << "\", data: \"" << line << "\"", meta); + } +} + +const TVector<TString>& TCsvParser::GetHeader() { + return Header; +} } } diff --git a/ydb/public/lib/ydb_cli/common/csv_parser.h b/ydb/public/lib/ydb_cli/common/csv_parser.h index 56ee5949ea..301d4cb601 100644 --- a/ydb/public/lib/ydb_cli/common/csv_parser.h +++ b/ydb/public/lib/ydb_cli/common/csv_parser.h @@ -4,11 +4,44 @@ #include <library/cpp/string_utils/csv/csv.h> +#include <shared_mutex> + namespace NYdb { namespace NConsoleClient { class TCsvParseException : public yexception {}; +class TPossibleType { +public: + TPossibleType(); + TPossibleType(std::vector<TType>::const_iterator currentType); + void SetIterator(const std::vector<TType>::const_iterator& newIterator); + std::vector<TType>::const_iterator& GetIterator(); + static const std::vector<TType>::const_iterator& GetAvailableTypesEnd(); + void SetHasNulls(bool hasNulls); + bool GetHasNulls() const; + void SetHasNonNulls(bool hasNonNulls); + bool GetHasNonNulls() const; +private: + std::vector<TType>::const_iterator CurrentType; + bool HasNulls = false; + bool HasNonNulls = false; +}; + +class TPossibleTypes { +public: + TPossibleTypes(size_t size); + TPossibleTypes(std::vector<TPossibleType>& currentColumnTypes); + // Pass this copy to a worker to parse his chunk of data with it to merge it later back into this main chunk + TPossibleTypes GetCopy(); + // Merge this main chunk with another chunk that parsed a CSV batch and maybe dismissed some types + void MergeWith(TPossibleTypes& newTypes); + std::vector<TPossibleType>& GetColumnPossibleTypes(); +private: + std::vector<TPossibleType> ColumnPossibleTypes; + std::shared_mutex Lock; +}; + class TCsvParser { public: struct TParseMetadata { @@ -36,6 +69,8 @@ public: TValue BuildList(std::vector<TString>& lines, const TString& filename, std::optional<ui64> row = std::nullopt) const; void BuildLineType(); + const TVector<TString>& GetHeader(); + void ParseLineTypes(TString& line, TPossibleTypes& possibleTypes, const TParseMetadata& meta); private: TVector<TString> Header; diff --git a/ydb/public/lib/ydb_cli/common/csv_parser_ut.cpp b/ydb/public/lib/ydb_cli/common/csv_parser_ut.cpp index 1353b05bf2..847902d1c3 100644 --- a/ydb/public/lib/ydb_cli/common/csv_parser_ut.cpp +++ b/ydb/public/lib/ydb_cli/common/csv_parser_ut.cpp @@ -336,4 +336,92 @@ Y_UNIT_TEST_SUITE(YdbCliCsvParserTests) { .EndList().Build(); AssertValuesEqual(builtResult, expexctedResult); } + + void CheckPossibleTypes(TVector<TVector<TString>>&& batches, TVector<TMaybe<TType>>&& expectedTypes) { + TVector<TString> columnNames; + for (size_t i = 1; i <= expectedTypes.size(); ++i) { + columnNames.push_back(TStringBuilder() << "column" << i); + } + size_t columnCount = columnNames.size(); + TCsvParser parser = TCsvParser(std::move(columnNames), ',', ""); + TPossibleTypes possibleTypesGlobal(columnCount); + uint64_t row = 0; + for (auto& batch : batches) { + TPossibleTypes batchTypes = possibleTypesGlobal.GetCopy(); + for (auto& line : batch) { + ++row; + parser.ParseLineTypes(line, batchTypes, TCsvParser::TParseMetadata{row, "testFile.csv"}); + } + possibleTypesGlobal.MergeWith(batchTypes); + } + auto& possibleTypesResult = possibleTypesGlobal.GetColumnPossibleTypes(); + UNIT_ASSERT_EQUAL_C(expectedTypes.size(), possibleTypesResult.size(), + TStringBuilder() << "Expected " << expectedTypes.size() << " columns, got " << possibleTypesResult.size()); + for (size_t i = 0; i < expectedTypes.size(); ++i) { + TPossibleType& resultPossibleType = possibleTypesResult[i]; + std::vector<TType>::const_iterator& it = resultPossibleType.GetIterator(); + TType resultType = (it == TPossibleType::GetAvailableTypesEnd() + ? TTypeBuilder().Primitive(EPrimitiveType::Utf8).Build() + : *it); + if (!resultPossibleType.GetHasNonNulls()) { + UNIT_ASSERT(!expectedTypes[i].Defined()); + } else { + UNIT_ASSERT(expectedTypes[i].Defined()); + if (resultPossibleType.GetHasNulls()) { + resultType = TTypeBuilder() + .BeginOptional() + .Primitive(TTypeParser(resultType).GetPrimitive()) + .EndOptional() + .Build(); + } + UNIT_ASSERT_C(TypesEqual(expectedTypes[i].GetRef(), resultType), + TStringBuilder() << "Expected type " << expectedTypes[i].GetRef() << ", got " << resultType); + } + } + } + + Y_UNIT_TEST(InferTypesNonNull) { + TVector<TVector<TString>> batches = { + { + "1 0,True,0,10,100000000000,-10,-100000000000,2001-01-01,2001-01-01T12:12:12,2001-01-01T12:12:12.111111,550e8400-e29b-41d4-a716-446655440000,\"{\"\"name1\"\":\"\"value1\"\"}\"", + "2 0,False,0,20,0,0,0,2001-01-02,2001-01-02T12:12:12,2001-01-02T12:12:12.111111,550e8400-e29b-41d4-a716-446655440001,\"{\"\"name2\"\": \"\"value2\"\"}\"" + } + }; + TVector<TMaybe<TType>> expectedTypes = { + TTypeBuilder().Primitive(EPrimitiveType::Utf8).Build(), + TTypeBuilder().Primitive(EPrimitiveType::Bool).Build(), + TTypeBuilder().Primitive(EPrimitiveType::Uint64).Build(), + TTypeBuilder().Primitive(EPrimitiveType::Uint64).Build(), + TTypeBuilder().Primitive(EPrimitiveType::Uint64).Build(), + TTypeBuilder().Primitive(EPrimitiveType::Int64).Build(), + TTypeBuilder().Primitive(EPrimitiveType::Int64).Build(), + TTypeBuilder().Primitive(EPrimitiveType::Date).Build(), + TTypeBuilder().Primitive(EPrimitiveType::Datetime).Build(), + TTypeBuilder().Primitive(EPrimitiveType::Timestamp).Build(), + TTypeBuilder().Primitive(EPrimitiveType::Uuid).Build(), + TTypeBuilder().Primitive(EPrimitiveType::Json).Build() + }; + CheckPossibleTypes(std::move(batches), std::move(expectedTypes)); + } + + Y_UNIT_TEST(InferTypesWithNulls) { + TVector<TVector<TString>> batches = { + { + ",True,0,2001-01-01,", + ",False,,2001-01-02," + }, + { + ",True,0,10,", + ",,0,True,abc" + } + }; + TVector<TMaybe<TType>> expectedTypes = { + Nothing(), // Can't infer type of a column that has only null values + TTypeBuilder().BeginOptional().Primitive(EPrimitiveType::Bool).EndOptional().Build(), + TTypeBuilder().BeginOptional().Primitive(EPrimitiveType::Uint64).EndOptional().Build(), + TTypeBuilder().Primitive(EPrimitiveType::Utf8).Build(), + TTypeBuilder().BeginOptional().Primitive(EPrimitiveType::Utf8).EndOptional().Build(), + }; + CheckPossibleTypes(std::move(batches), std::move(expectedTypes)); + } }
\ No newline at end of file diff --git a/ydb/public/lib/ydb_cli/import/import.cpp b/ydb/public/lib/ydb_cli/import/import.cpp index d77e2e97a6..52d3240e97 100644 --- a/ydb/public/lib/ydb_cli/import/import.cpp +++ b/ydb/public/lib/ydb_cli/import/import.cpp @@ -4,6 +4,7 @@ #include <ydb/public/sdk/cpp/client/ydb_driver/driver.h> #include <ydb/public/sdk/cpp/client/ydb_operation/operation.h> #include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h> +#include <ydb/public/sdk/cpp/client/ydb_query/client.h> #include <ydb/public/sdk/cpp/client/ydb_scheme/scheme.h> #include <ydb/public/sdk/cpp/client/ydb_table/table.h> @@ -14,6 +15,7 @@ #include <ydb/public/lib/ydb_cli/common/recursive_list.h> #include <ydb/public/lib/ydb_cli/common/interactive.h> #include <ydb/public/lib/ydb_cli/common/progress_bar.h> +#include <ydb/public/lib/ydb_cli/commands/ydb_common.h> #include <ydb/public/lib/ydb_cli/dump/util/util.h> #include <ydb/public/lib/ydb_cli/import/cli_arrow_helpers.h> @@ -50,6 +52,8 @@ namespace NYdb { namespace NConsoleClient { namespace { +constexpr ui64 rowsToAnalyze = 100000; + inline TStatus MakeStatus(EStatus code = EStatus::SUCCESS, const TString& error = {}) { NYql::TIssues issues; @@ -89,36 +93,41 @@ void InitCsvParser(TCsvParser& parser, bool& removeLastDelimiter, NCsvFormat::TLinesSplitter& csvSource, const TImportFileSettings& settings, - const std::map<TString, TType>* columnTypes, - const NTable::TTableDescription* dbTableInfo) { - if (settings.Header_ || settings.HeaderRow_) { - TString headerRow; + const TString& headerRow, + const std::map<TString, TType>* columnTypes = nullptr, + const NTable::TTableDescription* dbTableInfo = nullptr) { + if (settings.Header_ || headerRow) { + TString newHeaderRow; if (settings.Header_) { - headerRow = csvSource.ConsumeLine(); + newHeaderRow = csvSource.ConsumeLine(); } - if (settings.HeaderRow_) { - headerRow = settings.HeaderRow_; + if (headerRow) { + newHeaderRow = headerRow; } - if (headerRow.EndsWith("\r\n")) { - headerRow.erase(headerRow.size() - 2); + if (newHeaderRow.EndsWith("\r\n")) { + newHeaderRow.erase(newHeaderRow.size() - 2); } - if (headerRow.EndsWith("\n")) { - headerRow.erase(headerRow.size() - 1); + if (newHeaderRow.EndsWith("\n")) { + newHeaderRow.erase(newHeaderRow.size() - 1); } - if (headerRow.EndsWith(settings.Delimiter_)) { + if (newHeaderRow.EndsWith(settings.Delimiter_)) { removeLastDelimiter = true; - headerRow.erase(headerRow.size() - settings.Delimiter_.size()); + newHeaderRow.erase(newHeaderRow.size() - settings.Delimiter_.size()); } - parser = TCsvParser(std::move(headerRow), settings.Delimiter_[0], settings.NullValue_, columnTypes); + parser = TCsvParser(std::move(newHeaderRow), settings.Delimiter_[0], settings.NullValue_, columnTypes); } else { TVector<TString> columns; - Y_ENSURE_BT(dbTableInfo); + if (!dbTableInfo) { + throw yexception() << "Need to specify column names"; + } for (const auto& column : dbTableInfo->GetColumns()) { columns.push_back(column.Name); } parser = TCsvParser(std::move(columns), settings.Delimiter_[0], settings.NullValue_, columnTypes); } - parser.BuildLineType(); + if (columnTypes) { + parser.BuildLineType(); + } } FHANDLE GetStdinFileno() { @@ -502,6 +511,12 @@ private: std::map<TString, TType> GetColumnTypes(); void ValidateTValueUpsertTable(); std::shared_ptr<TProgressFile> LoadOrStartImportProgress(const TString& filePath); + TStatus GenerateCreateTableFromCsv(IInputStream& input, + const TString& relativeTablePath, + const TString& filePath, + TString& suggestion); + TStatus SuggestCreateTableRequest(const TVector<TString>& filePaths, const TString& relativeTablePath, + TString& suggestion); std::shared_ptr<NTable::TTableClient> TableClient; std::shared_ptr<NScheme::TSchemeClient> SchemeClient; @@ -578,8 +593,16 @@ TStatus TImportFileClient::TImpl::Import(const TVector<TString>& filePaths, cons if (describeStatus.GetStatus() == EStatus::SCHEME_ERROR) { auto describePathResult = NDump::DescribePath(*SchemeClient, dbPath); if (describePathResult.GetStatus() != EStatus::SUCCESS) { - return MakeStatus(EStatus::SCHEME_ERROR, - TStringBuilder() << describePathResult.GetIssues().ToString() << dbPath); + TStringBuilder errorMessage; + errorMessage << describePathResult.GetIssues().ToString() << dbPath << Endl; + TString suggestMessage; + auto suggestStatus = SuggestCreateTableRequest(filePaths, dbPath, suggestMessage); + if (suggestStatus.IsSuccess()) { + errorMessage << suggestMessage << Endl; + } else { + errorMessage << "Error while trying to generate CREATE TABLE request suggestion: " << suggestStatus << Endl; + } + return MakeStatus(EStatus::SCHEME_ERROR, errorMessage); } } return describeStatus; @@ -691,11 +714,12 @@ TStatus TImportFileClient::TImpl::Import(const TVector<TString>& filePaths, cons return UpsertJson(input, dbPath, fileSizeHint, progressCallback); case EDataFormat::Parquet: return UpsertParquet(filePath, dbPath, progressCallback); - default: ; + default: + break; } return MakeStatus(EStatus::BAD_REQUEST, - TStringBuilder() << "Unsupported format #" << (int) Settings.Format_); + TStringBuilder() << "Unsupported file format #" << (int) Settings.Format_); } catch (const std::exception& e) { return MakeStatus(EStatus::INTERNAL_ERROR, TStringBuilder() << "Error: " << e.what()); @@ -779,6 +803,74 @@ std::shared_ptr<TProgressFile> TImportFileClient::TImpl::LoadOrStartImportProgre return progressFile; } +TStatus TImportFileClient::TImpl::SuggestCreateTableRequest(const TVector<TString>& filePaths, + const TString& relativeTablePath, TString& suggestion) { + // All files should have the same scheme so probably no need to analyze more than one file + CurrentFileCount = 1; + size_t filePathsSize = 1; + const auto& filePath = filePaths[0]; + + if (Settings.Format_ == EDataFormat::Tsv && Settings.Delimiter_ != "\t") { + return MakeStatus(EStatus::BAD_REQUEST, + TStringBuilder() << "Illegal delimiter for TSV format, only tab is allowed"); + } + + UpsertSettings + .OperationTimeout(Settings.OperationTimeout_) + .ClientTimeout(Settings.ClientTimeout_); + + auto pool = CreateThreadPool(filePathsSize); + TVector<NThreading::TFuture<TStatus>> asyncResults; + + std::unique_ptr<TFileInput> fileInput; + std::optional<ui64> fileSizeHint; + + if (!filePath.empty()) { + const TFsPath dataFile(filePath); + + if (!dataFile.Exists()) { + return MakeStatus(EStatus::BAD_REQUEST, + TStringBuilder() << "File does not exist: " << filePath); + } + + if (!dataFile.IsFile()) { + return MakeStatus(EStatus::BAD_REQUEST, + TStringBuilder() << "Not a file: " << filePath); + } + + TFile file(filePath, OpenExisting | RdOnly | Seq); + i64 fileLength = file.GetLength(); + if (fileLength && fileLength >= 0) { + fileSizeHint = fileLength; + } + + fileInput = std::make_unique<TFileInput>(file, Settings.FileBufferSize_); + } + + IInputStream& input = fileInput ? *fileInput : Cin; + + try { + switch (Settings.Format_) { + case EDataFormat::Default: + case EDataFormat::Csv: + case EDataFormat::Tsv: + return GenerateCreateTableFromCsv(input, relativeTablePath, filePath, suggestion); + case EDataFormat::Json: + case EDataFormat::JsonUnicode: + case EDataFormat::JsonBase64: + case EDataFormat::Parquet: + default: + break; + } + + return MakeStatus(EStatus::BAD_REQUEST, + TStringBuilder() << "Unsupported file format #" << (int) Settings.Format_); + } catch (const std::exception& e) { + return MakeStatus(EStatus::INTERNAL_ERROR, + TStringBuilder() << "Error: " << e.what()); + } +} + inline TAsyncStatus TImportFileClient::TImpl::UpsertTValueBuffer(const TString& dbPath, TValueBuilder& builder) { auto retryFunc = [this, dbPath, rows = builder.Build()] @@ -853,7 +945,7 @@ TStatus TImportFileClient::TImpl::UpsertCsv(IInputStream& input, TCsvParser parser; bool removeLastDelimiter = false; - InitCsvParser(parser, removeLastDelimiter, splitter, Settings, &columnTypes, DbTableInfo.get()); + InitCsvParser(parser, removeLastDelimiter, splitter, Settings, Settings.HeaderRow_, &columnTypes, DbTableInfo.get()); ui64 rowsToSkip = Max((ui64)Settings.SkipRows_, progressFile->HasLastImportedLine() ? progressFile->GetLastImportedLine() : 0); @@ -1037,7 +1129,7 @@ TStatus TImportFileClient::TImpl::UpsertCsvByBlocks(const TString& filePath, bool removeLastDelimiter = false; TStringInput headerInput(headerRow); NCsvFormat::TLinesSplitter headerSplitter(headerInput, Settings.Delimiter_[0]); - InitCsvParser(parser, removeLastDelimiter, headerSplitter, Settings, &columnTypes, DbTableInfo.get()); + InitCsvParser(parser, removeLastDelimiter, headerSplitter, Settings, Settings.HeaderRow_, &columnTypes, DbTableInfo.get()); TVector<NThreading::TFuture<void>> threadResults; threadResults.reserve(threadCount); @@ -1149,6 +1241,161 @@ TStatus TImportFileClient::TImpl::UpsertCsvByBlocks(const TString& filePath, return MakeStatus(); } +TStatus TImportFileClient::TImpl::GenerateCreateTableFromCsv(IInputStream& input, + const TString& relativeTablePath, + const TString& filePath, + TString& suggestion) { + TCountingInput countInput(&input); + NCsvFormat::TLinesSplitter splitter(countInput); + + size_t maxJobInflight = Settings.Threads_; + std::counting_semaphore<> jobsSemaphore(maxJobInflight); + + TCsvParser parser; + bool removeLastDelimiter = false; + + if (!Settings.Header_ && !Settings.HeaderRow_) { + TString firstRow = splitter.ConsumeLine(); + NCsvFormat::CsvSplitter csvSplitter(firstRow, Settings.Delimiter_[0]); + size_t columnSize = 0; + do { + csvSplitter.Consume(); + ++columnSize; + } while (csvSplitter.Step()); + TStringBuilder columns; + for (size_t i = 0; i < columnSize; ++i) { + if (i > 0) { + columns << Settings.Delimiter_; + } + columns << "column" << i; + } + InitCsvParser(parser, removeLastDelimiter, splitter, Settings, columns); + } else { + InitCsvParser(parser, removeLastDelimiter, splitter, Settings, Settings.HeaderRow_); + } + + const auto& header = parser.GetHeader(); + + TPossibleTypes columnTypes(header.size()); + + for (ui32 i = 0; i < Settings.SkipRows_; ++i) { + splitter.ConsumeLine(); + } + + ui64 row = Settings.SkipRows_ + Settings.Header_; + ui64 batchBytes = 0; + + TString line; + std::vector<TAsyncStatus> inFlightRequests; + std::vector<TString> buffer; + + auto checkCsvFunc = [&](std::vector<TString>&& buffer, ui64 row) { + TPossibleTypes typesCopy = columnTypes.GetCopy(); + try { + for (auto& line : buffer) { + parser.ParseLineTypes(line, typesCopy, TCsvParser::TParseMetadata{row, filePath}); + } + } catch (const std::exception& e) { + if (!Failed.exchange(true)) { + ErrorStatus = MakeHolder<TStatus>(MakeStatus(EStatus::INTERNAL_ERROR, e.what())); + } + jobsSemaphore.release(); + throw; + } + columnTypes.MergeWith(typesCopy); + jobsSemaphore.release(); + }; + + while (TString line = splitter.ConsumeLine()) { + ++row; + if (row > rowsToAnalyze) { + break; + } + if (line.empty()) { + continue; + } + batchBytes += line.size(); + + if (removeLastDelimiter) { + if (!line.EndsWith(Settings.Delimiter_)) { + return MakeStatus(EStatus::BAD_REQUEST, + "According to the header, lines should end with a delimiter"); + } + line.erase(line.size() - Settings.Delimiter_.size()); + } + + buffer.push_back(line); + + if (batchBytes < Settings.BytesPerRequest_) { + continue; + } + + auto workerFunc = [&checkCsvFunc, row, buffer = std::move(buffer)]() mutable { + checkCsvFunc(std::move(buffer), row); + }; + batchBytes = 0; + buffer.clear(); + + jobsSemaphore.acquire(); + + if (!ProcessingPool->AddFunc(workerFunc)) { + return MakeStatus(EStatus::INTERNAL_ERROR, "Couldn't add worker func"); + } + + if (Failed) { + break; + } + } + + // Check the rest if buffer is not empty + if (!buffer.empty() && countInput.Counter() > 0 && !Failed) { + jobsSemaphore.acquire(); + checkCsvFunc(std::move(buffer), row); + } + + for (size_t i = 0; i < maxJobInflight; ++i) { + jobsSemaphore.acquire(); + } + + TStringBuilder res; + res << "Example CreateTable request text generated based on data in file " << filePath << ":" << Endl << Endl; + res << "CREATE TABLE " << (relativeTablePath.empty() ? "`new_table`" : "`" + relativeTablePath + "`")<< " (" << Endl; + auto& possibleTypes = columnTypes.GetColumnPossibleTypes(); + for (size_t i = 0; i < header.size(); ++i) { + auto& possibleType = possibleTypes[i]; + auto& possibleTypeIt = possibleType.GetIterator(); + TString typeText = possibleTypeIt != possibleType.GetAvailableTypesEnd() + && possibleType.GetHasNonNulls() ? possibleTypeIt->ToString() : "Text"; + res << " `" << header[i] << "` " << typeText; + if (!possibleType.GetHasNulls()) { + res << " NOT NULL"; + } + res << ","; + if (!possibleType.GetHasNonNulls()) { + res << " -- No data in this column to infer type"; + } + res << Endl; + } + res << " PRIMARY KEY (`" << header[0] << "`) -- First column is chosen. Probably need to change this" << Endl; + res << +R"() +WITH ( + STORE = ROW -- or COLUMN + -- Other useful table options: + --, AUTO_PARTITIONING_BY_SIZE = ENABLED + --, AUTO_PARTITIONING_BY_LOAD = ENABLED + --, UNIFORM_PARTITIONS = 100 + --, AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 100 + --, AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 1000 +);)"; + suggestion = res; + if (Failed) { + return *ErrorStatus; + } else { + return MakeStatus(); + } +} + TStatus TImportFileClient::TImpl::UpsertJson(IInputStream& input, const TString& dbPath, std::optional<ui64> inputSizeHint, ProgressCallbackFunc & progressCallback) { const TType tableType = GetTableType(); |