aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorNikolay Perfilov <pnv1@yandex-team.ru>2025-01-17 20:11:58 +0300
committerGitHub <noreply@github.com>2025-01-17 20:11:58 +0300
commit83627e00155e16eab2d6442da0035dd054510b19 (patch)
treea212d2fb62b02ab9fd3866b36a19465e81e1673a
parente0f619d4043dd00d032f2bbe9d5dbc1ad9389d60 (diff)
downloadydb-83627e00155e16eab2d6442da0035dd054510b19.tar.gz
Suggest create table text on scheme error during `import file csv` (#13247)
Co-authored-by: Bulat <bylatgr@gmail.com>
-rw-r--r--ydb/apps/ydb/CHANGELOG.md1
-rw-r--r--ydb/public/lib/ydb_cli/common/csv_parser.cpp271
-rw-r--r--ydb/public/lib/ydb_cli/common/csv_parser.h35
-rw-r--r--ydb/public/lib/ydb_cli/common/csv_parser_ut.cpp88
-rw-r--r--ydb/public/lib/ydb_cli/import/import.cpp291
5 files changed, 664 insertions, 22 deletions
diff --git a/ydb/apps/ydb/CHANGELOG.md b/ydb/apps/ydb/CHANGELOG.md
index 92210a1a06..5f41af6f41 100644
--- a/ydb/apps/ydb/CHANGELOG.md
+++ b/ydb/apps/ydb/CHANGELOG.md
@@ -1,3 +1,4 @@
+* Added CREATE TABLE text suggestion on scheme error during `ydb import file csv`
* Backup and restore of changefeeds has been added to `ydb tools dump` and `ydb tools restore`. As a result, there are changes in the backup file structure: for tables with changefeeds, a subdirectory is created for each changefeed, named after the changefeed. This subdirectory contains two files: `changefeed_description.pb`, which contains the changefeed description, and `topic_description.pb`, which contains information about the underlying topic.
* Added `--skip-checksum-validation` option to `ydb import s3` command to skip server-side checksum validation.
diff --git a/ydb/public/lib/ydb_cli/common/csv_parser.cpp b/ydb/public/lib/ydb_cli/common/csv_parser.cpp
index 6b3925c4c0..819bab0fd0 100644
--- a/ydb/public/lib/ydb_cli/common/csv_parser.cpp
+++ b/ydb/public/lib/ydb_cli/common/csv_parser.cpp
@@ -5,6 +5,8 @@
#include <library/cpp/string_utils/csv/csv.h>
+#include <regex>
+
namespace NYdb {
namespace NConsoleClient {
namespace {
@@ -289,6 +291,120 @@ public:
}
}
+ template <class T>
+ bool TryParseArithmetic(const TString& token) const {
+ size_t cnt;
+ try {
+ auto value = StringToArithmetic<T>(token, cnt);
+ if (cnt != token.size() || value < std::numeric_limits<T>::lowest() || value > std::numeric_limits<T>::max()) {
+ return false;
+ }
+ } catch (std::exception& e) {
+ return false;
+ }
+ return true;
+ }
+
+ bool TryParseBool(const TString& token) const {
+ TString tokenLowerCase = to_lower(token);
+ return tokenLowerCase == "true" || tokenLowerCase == "false";
+ }
+
+ bool TryParsePrimitive(const TString& token) {
+ switch (Parser.GetPrimitive()) {
+ case EPrimitiveType::Uint8:
+ return TryParseArithmetic<ui8>(token) && !token.StartsWith('-');
+ case EPrimitiveType::Uint16:
+ return TryParseArithmetic<ui16>(token) && !token.StartsWith('-');;
+ case EPrimitiveType::Uint32:
+ return TryParseArithmetic<ui32>(token) && !token.StartsWith('-');;
+ case EPrimitiveType::Uint64:
+ return TryParseArithmetic<ui64>(token) && !token.StartsWith('-');;
+ case EPrimitiveType::Int8:
+ return TryParseArithmetic<i8>(token);
+ case EPrimitiveType::Int16:
+ return TryParseArithmetic<i16>(token);
+ case EPrimitiveType::Int32:
+ return TryParseArithmetic<i32>(token);
+ case EPrimitiveType::Int64:
+ return TryParseArithmetic<i64>(token);
+ case EPrimitiveType::Bool:
+ return TryParseBool(token);
+ case EPrimitiveType::Json:
+ return token.StartsWith('{') && token.EndsWith('}');
+ break;
+ case EPrimitiveType::JsonDocument:
+ break;
+ case EPrimitiveType::Yson:
+ break;
+ case EPrimitiveType::Uuid:
+ static std::regex uuidRegexTemplate("[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}");
+ return std::regex_match(token.c_str(), uuidRegexTemplate);
+ case EPrimitiveType::Float:
+ return TryParseArithmetic<float>(token);
+ case EPrimitiveType::Double:
+ return TryParseArithmetic<double>(token);
+ case EPrimitiveType::DyNumber:
+ break;
+ case EPrimitiveType::Date: {
+ TInstant date;
+ return TInstant::TryParseIso8601(token, date) && token.length() <= 10;
+ }
+ case EPrimitiveType::Datetime: {
+ TInstant datetime;
+ return TInstant::TryParseIso8601(token, datetime) && token.length() <= 19;
+ }
+ case EPrimitiveType::Timestamp: {
+ TInstant timestamp;
+ return TInstant::TryParseIso8601(token, timestamp) || TryParseArithmetic<ui64>(token);
+ }
+ case EPrimitiveType::Interval:
+ break;
+ case EPrimitiveType::Date32: {
+ TInstant date;
+ return TInstant::TryParseIso8601(token, date) || TryParseArithmetic<i32>(token);
+ }
+ case EPrimitiveType::Datetime64: {
+ TInstant date;
+ return TInstant::TryParseIso8601(token, date) || TryParseArithmetic<i64>(token);
+ }
+ case EPrimitiveType::Timestamp64: {
+ TInstant date;
+ return TInstant::TryParseIso8601(token, date) || TryParseArithmetic<i64>(token);
+ }
+ case EPrimitiveType::Interval64:
+ return TryParseArithmetic<i64>(token);
+ case EPrimitiveType::TzDate:
+ break;
+ case EPrimitiveType::TzDatetime:
+ break;
+ case EPrimitiveType::TzTimestamp:
+ break;
+ default:
+ throw TCsvParseException() << "Unsupported primitive type: " << Parser.GetPrimitive();
+ }
+ return false;
+ }
+
+ bool TryParseValue(const TStringBuf& token, TPossibleType& possibleType) {
+ if (NullValue && token == NullValue) {
+ possibleType.SetHasNulls(true);
+ return true;
+ }
+ possibleType.SetHasNonNulls(true);
+ switch (Parser.GetKind()) {
+ case TTypeParser::ETypeKind::Primitive: {
+ return TryParsePrimitive(TString(token));
+ }
+ case TTypeParser::ETypeKind::Decimal: {
+ break;
+ }
+ default:
+ throw TCsvParseException() << "Unsupported type kind: " << Parser.GetKind();
+ }
+ return false;
+ }
+
    // Converts a CSV token into a TValue of the type this converter was
    // constructed with; errors surface as exceptions from BuildValue.
    TValue Convert(const TStringBuf& token) {
        BuildValue(token);
        return Builder.Build();
    }
@@ -330,6 +446,16 @@ TValue FieldToValue(TTypeParser& parser,
}
}
+bool TryParse(TTypeParser& parser, const TStringBuf& token, const std::optional<TString>& nullValue, TPossibleType& possibleType) {
+ try {
+ TCsvToYdbConverter converter(parser, nullValue);
+ return converter.TryParseValue(token, possibleType);
+ } catch (std::exception& e) {
+ Cerr << "UNEXPECTED EXCEPTION: " << e.what() << Endl;
+ return false;
+ }
+}
+
TStringBuf Consume(NCsvFormat::CsvSplitter& splitter,
const TCsvParser::TParseMetadata& meta,
const TString& columnName) {
@@ -489,6 +615,151 @@ void TCsvParser::BuildLineType() {
ResultLineType = builder.Build();
ResultListType = TTypeBuilder().List(ResultLineType.value()).Build();
}
namespace {
// Candidate types for column type inference, ordered from the most specific
// to the most generic. Inference walks this list forward and settles on the
// first type that accepts every observed value; running off the end means
// the column falls back to a text type.
static const std::vector<TType> availableTypes = {
    TTypeBuilder().Primitive(EPrimitiveType::Bool).Build(),
    TTypeBuilder().Primitive(EPrimitiveType::Uint64).Build(),
    TTypeBuilder().Primitive(EPrimitiveType::Int64).Build(),
    TTypeBuilder().Primitive(EPrimitiveType::Double).Build(),
    TTypeBuilder().Primitive(EPrimitiveType::Date).Build(),
    TTypeBuilder().Primitive(EPrimitiveType::Datetime).Build(),
    TTypeBuilder().Primitive(EPrimitiveType::Timestamp).Build(),
    TTypeBuilder().Primitive(EPrimitiveType::Json).Build(),
    TTypeBuilder().Primitive(EPrimitiveType::Uuid).Build(),
};

// Cached end iterator; stays valid for the program lifetime because
// availableTypes is never modified after static initialization.
static const auto availableTypesEnd = availableTypes.end();

} // namespace
+
// Starts inference from the most specific candidate type.
TPossibleType::TPossibleType() {
    CurrentType = availableTypes.begin();
}

TPossibleType::TPossibleType(std::vector<TType>::const_iterator currentType)
: CurrentType(currentType)
{
}

void TPossibleType::SetIterator(const std::vector<TType>::const_iterator& newIterator) {
    CurrentType = newIterator;
}

// Returned by non-const reference so callers can advance the candidate in place.
std::vector<TType>::const_iterator& TPossibleType::GetIterator() {
    return CurrentType;
}

// End iterator of the shared candidate list; reaching it means "fall back to text".
const std::vector<TType>::const_iterator& TPossibleType::GetAvailableTypesEnd() {
    return availableTypesEnd;
}

void TPossibleType::SetHasNulls(bool hasNulls) {
    HasNulls = hasNulls;
}

bool TPossibleType::GetHasNulls() const {
    return HasNulls;
}

void TPossibleType::SetHasNonNulls(bool hasNonNulls) {
    HasNonNulls = hasNonNulls;
}

bool TPossibleType::GetHasNonNulls() const {
    return HasNonNulls;
}
+
// Creates one default (most-specific) candidate slot per column.
TPossibleTypes::TPossibleTypes(size_t size) {
    ColumnPossibleTypes.resize(size);
}

TPossibleTypes::TPossibleTypes(std::vector<TPossibleType>& currentColumnTypes)
: ColumnPossibleTypes(currentColumnTypes)
{
}

// Pass this copy to a worker to parse its chunk of data with it, to merge it later back into this main chunk.
// Takes the shared lock so the snapshot is consistent with concurrent merges.
TPossibleTypes TPossibleTypes::GetCopy() {
    std::shared_lock<std::shared_mutex> ReadLock(Lock);
    return TPossibleTypes(ColumnPossibleTypes);
}
+
+// Merge this main chunk with another chunk that parsed a CSV batch and maybe dismissed some types
+void TPossibleTypes::MergeWith(TPossibleTypes& newTypes) {
+ auto newTypesVec = newTypes.GetColumnPossibleTypes();
+ {
+ std::shared_lock<std::shared_mutex> ReadLock(Lock);
+ bool changed = false;
+ for (size_t i = 0; i < ColumnPossibleTypes.size(); ++i) {
+ auto& currentPossibleType = ColumnPossibleTypes[i];
+ auto& newPossibleType = newTypesVec[i];
+ auto& currentIt = currentPossibleType.GetIterator();
+ const auto& newIt = newPossibleType.GetIterator();
+ if (newIt > currentIt) {
+ changed = true;
+ break;
+ }
+ if (currentPossibleType.GetHasNulls() != newPossibleType.GetHasNulls()
+ || currentPossibleType.GetHasNonNulls() != newPossibleType.GetHasNonNulls()) {
+ changed = true;
+ break;
+ }
+ }
+ if (!changed) {
+ return;
+ }
+ }
+ std::unique_lock<std::shared_mutex> WriteLock(Lock);
+ for (size_t i = 0; i < ColumnPossibleTypes.size(); ++i) {
+ auto& currentPossibleType = ColumnPossibleTypes[i];
+ auto& newPossibleType = newTypesVec[i];
+ const auto& newIt = newPossibleType.GetIterator();
+ if (newIt > currentPossibleType.GetIterator()) {
+ currentPossibleType.SetIterator(newIt);
+ }
+ if (newPossibleType.GetHasNulls()) {
+ currentPossibleType.SetHasNulls(true);
+ }
+ if (newPossibleType.GetHasNonNulls()) {
+ currentPossibleType.SetHasNonNulls(true);
+ }
+ }
+}
+
// NOTE(review): returns a mutable reference without taking the lock —
// callers appear to be responsible for synchronization; verify usage.
std::vector<TPossibleType>& TPossibleTypes::GetColumnPossibleTypes() {
    return ColumnPossibleTypes;
}
+
// Runs type inference over a single CSV line: for each column, advances its
// candidate-type iterator until a type that accepts the token is found (or
// the candidate list is exhausted, meaning the column degrades to text).
// Throws a format error if the field count does not match the header.
void TCsvParser::ParseLineTypes(TString& line, TPossibleTypes& possibleTypes, const TParseMetadata& meta) {
    NCsvFormat::CsvSplitter splitter(line, Delimeter);
    auto headerIt = Header.cbegin();
    auto typesIt = possibleTypes.GetColumnPossibleTypes().begin();
    do {
        if (headerIt == Header.cend()) {
            throw FormatError(yexception() << "Header contains less fields than data. Header: \"" << HeaderRow << "\", data: \"" << line << "\"", meta);
        }
        TStringBuf token = Consume(splitter, meta, *headerIt);
        TPossibleType& possibleType = *typesIt;
        // The iterator is advanced in place so a dismissed candidate stays
        // dismissed for all subsequent lines of this batch.
        auto& typeIt = possibleType.GetIterator();
        while (typeIt != availableTypesEnd) {
            TTypeParser typeParser(*typeIt);
            if (TryParse(typeParser, token, NullValue, possibleType)) {
                break;
            }
            ++typeIt;
        }
        ++headerIt;
        ++typesIt;
    } while (splitter.Step());

    if (headerIt != Header.cend()) {
        throw FormatError(yexception() << "Header contains more fields than data. Header: \"" << HeaderRow << "\", data: \"" << line << "\"", meta);
    }
}
+
// Returns the column names (taken from the CSV header or supplied explicitly).
const TVector<TString>& TCsvParser::GetHeader() {
    return Header;
}
}
}
diff --git a/ydb/public/lib/ydb_cli/common/csv_parser.h b/ydb/public/lib/ydb_cli/common/csv_parser.h
index 56ee5949ea..301d4cb601 100644
--- a/ydb/public/lib/ydb_cli/common/csv_parser.h
+++ b/ydb/public/lib/ydb_cli/common/csv_parser.h
@@ -4,11 +4,44 @@
#include <library/cpp/string_utils/csv/csv.h>
+#include <shared_mutex>
+
namespace NYdb {
namespace NConsoleClient {
class TCsvParseException : public yexception {};
// Tracks the current best candidate type for a single CSV column during type
// inference, plus whether null / non-null values were observed. CurrentType
// is an iterator into a static, ordered candidate list defined in
// csv_parser.cpp; reaching its end means "fall back to text".
class TPossibleType {
public:
    TPossibleType();
    TPossibleType(std::vector<TType>::const_iterator currentType);
    void SetIterator(const std::vector<TType>::const_iterator& newIterator);
    // Non-const reference: callers advance the candidate iterator in place.
    std::vector<TType>::const_iterator& GetIterator();
    // End iterator of the shared candidate-type list.
    static const std::vector<TType>::const_iterator& GetAvailableTypesEnd();
    void SetHasNulls(bool hasNulls);
    bool GetHasNulls() const;
    void SetHasNonNulls(bool hasNonNulls);
    bool GetHasNonNulls() const;
private:
    std::vector<TType>::const_iterator CurrentType;
    bool HasNulls = false;     // at least one null value was seen
    bool HasNonNulls = false;  // at least one non-null value was seen
};
+
// Thread-safe collection of per-column TPossibleType entries shared between
// CSV-analyzing workers: each worker takes a snapshot via GetCopy(), refines
// it on its batch, and folds the result back with MergeWith().
class TPossibleTypes {
public:
    TPossibleTypes(size_t size);
    TPossibleTypes(std::vector<TPossibleType>& currentColumnTypes);
    // Pass this copy to a worker to parse its chunk of data with it, to merge it later back into this main chunk
    TPossibleTypes GetCopy();
    // Merge this main chunk with another chunk that parsed a CSV batch and maybe dismissed some types
    void MergeWith(TPossibleTypes& newTypes);
    std::vector<TPossibleType>& GetColumnPossibleTypes();
private:
    std::vector<TPossibleType> ColumnPossibleTypes;
    std::shared_mutex Lock;  // guards ColumnPossibleTypes
};
+
class TCsvParser {
public:
struct TParseMetadata {
@@ -36,6 +69,8 @@ public:
TValue BuildList(std::vector<TString>& lines, const TString& filename,
std::optional<ui64> row = std::nullopt) const;
void BuildLineType();
+ const TVector<TString>& GetHeader();
+ void ParseLineTypes(TString& line, TPossibleTypes& possibleTypes, const TParseMetadata& meta);
private:
TVector<TString> Header;
diff --git a/ydb/public/lib/ydb_cli/common/csv_parser_ut.cpp b/ydb/public/lib/ydb_cli/common/csv_parser_ut.cpp
index 1353b05bf2..847902d1c3 100644
--- a/ydb/public/lib/ydb_cli/common/csv_parser_ut.cpp
+++ b/ydb/public/lib/ydb_cli/common/csv_parser_ut.cpp
@@ -336,4 +336,92 @@ Y_UNIT_TEST_SUITE(YdbCliCsvParserTests) {
.EndList().Build();
AssertValuesEqual(builtResult, expexctedResult);
}
+
    // Feeds `batches` of CSV lines through the inference pipeline
    // (GetCopy -> ParseLineTypes -> MergeWith, mirroring the import code)
    // and asserts the inferred per-column types match `expectedTypes`.
    // Nothing() in expectedTypes means "column had only nulls, no type".
    void CheckPossibleTypes(TVector<TVector<TString>>&& batches, TVector<TMaybe<TType>>&& expectedTypes) {
        TVector<TString> columnNames;
        for (size_t i = 1; i <= expectedTypes.size(); ++i) {
            columnNames.push_back(TStringBuilder() << "column" << i);
        }
        size_t columnCount = columnNames.size();
        TCsvParser parser = TCsvParser(std::move(columnNames), ',', "");
        TPossibleTypes possibleTypesGlobal(columnCount);
        uint64_t row = 0;
        for (auto& batch : batches) {
            // Each batch simulates an independent worker with its own snapshot.
            TPossibleTypes batchTypes = possibleTypesGlobal.GetCopy();
            for (auto& line : batch) {
                ++row;
                parser.ParseLineTypes(line, batchTypes, TCsvParser::TParseMetadata{row, "testFile.csv"});
            }
            possibleTypesGlobal.MergeWith(batchTypes);
        }
        auto& possibleTypesResult = possibleTypesGlobal.GetColumnPossibleTypes();
        UNIT_ASSERT_EQUAL_C(expectedTypes.size(), possibleTypesResult.size(),
            TStringBuilder() << "Expected " << expectedTypes.size() << " columns, got " << possibleTypesResult.size());
        for (size_t i = 0; i < expectedTypes.size(); ++i) {
            TPossibleType& resultPossibleType = possibleTypesResult[i];
            std::vector<TType>::const_iterator& it = resultPossibleType.GetIterator();
            // Exhausted candidate list means the column degraded to text.
            TType resultType = (it == TPossibleType::GetAvailableTypesEnd()
                ? TTypeBuilder().Primitive(EPrimitiveType::Utf8).Build()
                : *it);
            if (!resultPossibleType.GetHasNonNulls()) {
                UNIT_ASSERT(!expectedTypes[i].Defined());
            } else {
                UNIT_ASSERT(expectedTypes[i].Defined());
                if (resultPossibleType.GetHasNulls()) {
                    // A column with nulls is reported as Optional<T>.
                    resultType = TTypeBuilder()
                        .BeginOptional()
                            .Primitive(TTypeParser(resultType).GetPrimitive())
                        .EndOptional()
                        .Build();
                }
                UNIT_ASSERT_C(TypesEqual(expectedTypes[i].GetRef(), resultType),
                    TStringBuilder() << "Expected type " << expectedTypes[i].GetRef() << ", got " << resultType);
            }
        }
    }
+
    // One column per candidate type, all fully populated: every column
    // should resolve to its exact (non-optional) type; the first column is
    // non-numeric text.
    Y_UNIT_TEST(InferTypesNonNull) {
        TVector<TVector<TString>> batches = {
            {
                "1 0,True,0,10,100000000000,-10,-100000000000,2001-01-01,2001-01-01T12:12:12,2001-01-01T12:12:12.111111,550e8400-e29b-41d4-a716-446655440000,\"{\"\"name1\"\":\"\"value1\"\"}\"",
                "2 0,False,0,20,0,0,0,2001-01-02,2001-01-02T12:12:12,2001-01-02T12:12:12.111111,550e8400-e29b-41d4-a716-446655440001,\"{\"\"name2\"\": \"\"value2\"\"}\""
            }
        };
        TVector<TMaybe<TType>> expectedTypes = {
            TTypeBuilder().Primitive(EPrimitiveType::Utf8).Build(),
            TTypeBuilder().Primitive(EPrimitiveType::Bool).Build(),
            TTypeBuilder().Primitive(EPrimitiveType::Uint64).Build(),
            TTypeBuilder().Primitive(EPrimitiveType::Uint64).Build(),
            TTypeBuilder().Primitive(EPrimitiveType::Uint64).Build(),
            TTypeBuilder().Primitive(EPrimitiveType::Int64).Build(),
            TTypeBuilder().Primitive(EPrimitiveType::Int64).Build(),
            TTypeBuilder().Primitive(EPrimitiveType::Date).Build(),
            TTypeBuilder().Primitive(EPrimitiveType::Datetime).Build(),
            TTypeBuilder().Primitive(EPrimitiveType::Timestamp).Build(),
            TTypeBuilder().Primitive(EPrimitiveType::Uuid).Build(),
            TTypeBuilder().Primitive(EPrimitiveType::Json).Build()
        };
        CheckPossibleTypes(std::move(batches), std::move(expectedTypes));
    }

    // Mixed null/non-null data split across two batches: columns with nulls
    // become Optional, an all-null column yields no type, and conflicting
    // values degrade to text.
    Y_UNIT_TEST(InferTypesWithNulls) {
        TVector<TVector<TString>> batches = {
            {
                ",True,0,2001-01-01,",
                ",False,,2001-01-02,"
            },
            {
                ",True,0,10,",
                ",,0,True,abc"
            }
        };
        TVector<TMaybe<TType>> expectedTypes = {
            Nothing(), // Can't infer type of a column that has only null values
            TTypeBuilder().BeginOptional().Primitive(EPrimitiveType::Bool).EndOptional().Build(),
            TTypeBuilder().BeginOptional().Primitive(EPrimitiveType::Uint64).EndOptional().Build(),
            TTypeBuilder().Primitive(EPrimitiveType::Utf8).Build(),
            TTypeBuilder().BeginOptional().Primitive(EPrimitiveType::Utf8).EndOptional().Build(),
        };
        CheckPossibleTypes(std::move(batches), std::move(expectedTypes));
    }
} \ No newline at end of file
diff --git a/ydb/public/lib/ydb_cli/import/import.cpp b/ydb/public/lib/ydb_cli/import/import.cpp
index d77e2e97a6..52d3240e97 100644
--- a/ydb/public/lib/ydb_cli/import/import.cpp
+++ b/ydb/public/lib/ydb_cli/import/import.cpp
@@ -4,6 +4,7 @@
#include <ydb/public/sdk/cpp/client/ydb_driver/driver.h>
#include <ydb/public/sdk/cpp/client/ydb_operation/operation.h>
#include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h>
+#include <ydb/public/sdk/cpp/client/ydb_query/client.h>
#include <ydb/public/sdk/cpp/client/ydb_scheme/scheme.h>
#include <ydb/public/sdk/cpp/client/ydb_table/table.h>
@@ -14,6 +15,7 @@
#include <ydb/public/lib/ydb_cli/common/recursive_list.h>
#include <ydb/public/lib/ydb_cli/common/interactive.h>
#include <ydb/public/lib/ydb_cli/common/progress_bar.h>
+#include <ydb/public/lib/ydb_cli/commands/ydb_common.h>
#include <ydb/public/lib/ydb_cli/dump/util/util.h>
#include <ydb/public/lib/ydb_cli/import/cli_arrow_helpers.h>
@@ -50,6 +52,8 @@ namespace NYdb {
namespace NConsoleClient {
namespace {
+constexpr ui64 rowsToAnalyze = 100000;
+
inline
TStatus MakeStatus(EStatus code = EStatus::SUCCESS, const TString& error = {}) {
NYql::TIssues issues;
@@ -89,36 +93,41 @@ void InitCsvParser(TCsvParser& parser,
bool& removeLastDelimiter,
NCsvFormat::TLinesSplitter& csvSource,
const TImportFileSettings& settings,
- const std::map<TString, TType>* columnTypes,
- const NTable::TTableDescription* dbTableInfo) {
- if (settings.Header_ || settings.HeaderRow_) {
- TString headerRow;
+ const TString& headerRow,
+ const std::map<TString, TType>* columnTypes = nullptr,
+ const NTable::TTableDescription* dbTableInfo = nullptr) {
+ if (settings.Header_ || headerRow) {
+ TString newHeaderRow;
if (settings.Header_) {
- headerRow = csvSource.ConsumeLine();
+ newHeaderRow = csvSource.ConsumeLine();
}
- if (settings.HeaderRow_) {
- headerRow = settings.HeaderRow_;
+ if (headerRow) {
+ newHeaderRow = headerRow;
}
- if (headerRow.EndsWith("\r\n")) {
- headerRow.erase(headerRow.size() - 2);
+ if (newHeaderRow.EndsWith("\r\n")) {
+ newHeaderRow.erase(newHeaderRow.size() - 2);
}
- if (headerRow.EndsWith("\n")) {
- headerRow.erase(headerRow.size() - 1);
+ if (newHeaderRow.EndsWith("\n")) {
+ newHeaderRow.erase(newHeaderRow.size() - 1);
}
- if (headerRow.EndsWith(settings.Delimiter_)) {
+ if (newHeaderRow.EndsWith(settings.Delimiter_)) {
removeLastDelimiter = true;
- headerRow.erase(headerRow.size() - settings.Delimiter_.size());
+ newHeaderRow.erase(newHeaderRow.size() - settings.Delimiter_.size());
}
- parser = TCsvParser(std::move(headerRow), settings.Delimiter_[0], settings.NullValue_, columnTypes);
+ parser = TCsvParser(std::move(newHeaderRow), settings.Delimiter_[0], settings.NullValue_, columnTypes);
} else {
TVector<TString> columns;
- Y_ENSURE_BT(dbTableInfo);
+ if (!dbTableInfo) {
+ throw yexception() << "Need to specify column names";
+ }
for (const auto& column : dbTableInfo->GetColumns()) {
columns.push_back(column.Name);
}
parser = TCsvParser(std::move(columns), settings.Delimiter_[0], settings.NullValue_, columnTypes);
}
- parser.BuildLineType();
+ if (columnTypes) {
+ parser.BuildLineType();
+ }
}
FHANDLE GetStdinFileno() {
@@ -502,6 +511,12 @@ private:
std::map<TString, TType> GetColumnTypes();
void ValidateTValueUpsertTable();
std::shared_ptr<TProgressFile> LoadOrStartImportProgress(const TString& filePath);
+ TStatus GenerateCreateTableFromCsv(IInputStream& input,
+ const TString& relativeTablePath,
+ const TString& filePath,
+ TString& suggestion);
+ TStatus SuggestCreateTableRequest(const TVector<TString>& filePaths, const TString& relativeTablePath,
+ TString& suggestion);
std::shared_ptr<NTable::TTableClient> TableClient;
std::shared_ptr<NScheme::TSchemeClient> SchemeClient;
@@ -578,8 +593,16 @@ TStatus TImportFileClient::TImpl::Import(const TVector<TString>& filePaths, cons
if (describeStatus.GetStatus() == EStatus::SCHEME_ERROR) {
auto describePathResult = NDump::DescribePath(*SchemeClient, dbPath);
if (describePathResult.GetStatus() != EStatus::SUCCESS) {
- return MakeStatus(EStatus::SCHEME_ERROR,
- TStringBuilder() << describePathResult.GetIssues().ToString() << dbPath);
+ TStringBuilder errorMessage;
+ errorMessage << describePathResult.GetIssues().ToString() << dbPath << Endl;
+ TString suggestMessage;
+ auto suggestStatus = SuggestCreateTableRequest(filePaths, dbPath, suggestMessage);
+ if (suggestStatus.IsSuccess()) {
+ errorMessage << suggestMessage << Endl;
+ } else {
+ errorMessage << "Error while trying to generate CREATE TABLE request suggestion: " << suggestStatus << Endl;
+ }
+ return MakeStatus(EStatus::SCHEME_ERROR, errorMessage);
}
}
return describeStatus;
@@ -691,11 +714,12 @@ TStatus TImportFileClient::TImpl::Import(const TVector<TString>& filePaths, cons
return UpsertJson(input, dbPath, fileSizeHint, progressCallback);
case EDataFormat::Parquet:
return UpsertParquet(filePath, dbPath, progressCallback);
- default: ;
+ default:
+ break;
}
return MakeStatus(EStatus::BAD_REQUEST,
- TStringBuilder() << "Unsupported format #" << (int) Settings.Format_);
+ TStringBuilder() << "Unsupported file format #" << (int) Settings.Format_);
} catch (const std::exception& e) {
return MakeStatus(EStatus::INTERNAL_ERROR,
TStringBuilder() << "Error: " << e.what());
@@ -779,6 +803,74 @@ std::shared_ptr<TProgressFile> TImportFileClient::TImpl::LoadOrStartImportProgre
return progressFile;
}
+TStatus TImportFileClient::TImpl::SuggestCreateTableRequest(const TVector<TString>& filePaths,
+ const TString& relativeTablePath, TString& suggestion) {
+ // All files should have the same scheme so probably no need to analyze more than one file
+ CurrentFileCount = 1;
+ size_t filePathsSize = 1;
+ const auto& filePath = filePaths[0];
+
+ if (Settings.Format_ == EDataFormat::Tsv && Settings.Delimiter_ != "\t") {
+ return MakeStatus(EStatus::BAD_REQUEST,
+ TStringBuilder() << "Illegal delimiter for TSV format, only tab is allowed");
+ }
+
+ UpsertSettings
+ .OperationTimeout(Settings.OperationTimeout_)
+ .ClientTimeout(Settings.ClientTimeout_);
+
+ auto pool = CreateThreadPool(filePathsSize);
+ TVector<NThreading::TFuture<TStatus>> asyncResults;
+
+ std::unique_ptr<TFileInput> fileInput;
+ std::optional<ui64> fileSizeHint;
+
+ if (!filePath.empty()) {
+ const TFsPath dataFile(filePath);
+
+ if (!dataFile.Exists()) {
+ return MakeStatus(EStatus::BAD_REQUEST,
+ TStringBuilder() << "File does not exist: " << filePath);
+ }
+
+ if (!dataFile.IsFile()) {
+ return MakeStatus(EStatus::BAD_REQUEST,
+ TStringBuilder() << "Not a file: " << filePath);
+ }
+
+ TFile file(filePath, OpenExisting | RdOnly | Seq);
+ i64 fileLength = file.GetLength();
+ if (fileLength && fileLength >= 0) {
+ fileSizeHint = fileLength;
+ }
+
+ fileInput = std::make_unique<TFileInput>(file, Settings.FileBufferSize_);
+ }
+
+ IInputStream& input = fileInput ? *fileInput : Cin;
+
+ try {
+ switch (Settings.Format_) {
+ case EDataFormat::Default:
+ case EDataFormat::Csv:
+ case EDataFormat::Tsv:
+ return GenerateCreateTableFromCsv(input, relativeTablePath, filePath, suggestion);
+ case EDataFormat::Json:
+ case EDataFormat::JsonUnicode:
+ case EDataFormat::JsonBase64:
+ case EDataFormat::Parquet:
+ default:
+ break;
+ }
+
+ return MakeStatus(EStatus::BAD_REQUEST,
+ TStringBuilder() << "Unsupported file format #" << (int) Settings.Format_);
+ } catch (const std::exception& e) {
+ return MakeStatus(EStatus::INTERNAL_ERROR,
+ TStringBuilder() << "Error: " << e.what());
+ }
+}
+
inline
TAsyncStatus TImportFileClient::TImpl::UpsertTValueBuffer(const TString& dbPath, TValueBuilder& builder) {
auto retryFunc = [this, dbPath, rows = builder.Build()]
@@ -853,7 +945,7 @@ TStatus TImportFileClient::TImpl::UpsertCsv(IInputStream& input,
TCsvParser parser;
bool removeLastDelimiter = false;
- InitCsvParser(parser, removeLastDelimiter, splitter, Settings, &columnTypes, DbTableInfo.get());
+ InitCsvParser(parser, removeLastDelimiter, splitter, Settings, Settings.HeaderRow_, &columnTypes, DbTableInfo.get());
ui64 rowsToSkip = Max((ui64)Settings.SkipRows_,
progressFile->HasLastImportedLine() ? progressFile->GetLastImportedLine() : 0);
@@ -1037,7 +1129,7 @@ TStatus TImportFileClient::TImpl::UpsertCsvByBlocks(const TString& filePath,
bool removeLastDelimiter = false;
TStringInput headerInput(headerRow);
NCsvFormat::TLinesSplitter headerSplitter(headerInput, Settings.Delimiter_[0]);
- InitCsvParser(parser, removeLastDelimiter, headerSplitter, Settings, &columnTypes, DbTableInfo.get());
+ InitCsvParser(parser, removeLastDelimiter, headerSplitter, Settings, Settings.HeaderRow_, &columnTypes, DbTableInfo.get());
TVector<NThreading::TFuture<void>> threadResults;
threadResults.reserve(threadCount);
@@ -1149,6 +1241,161 @@ TStatus TImportFileClient::TImpl::UpsertCsvByBlocks(const TString& filePath,
return MakeStatus();
}
+TStatus TImportFileClient::TImpl::GenerateCreateTableFromCsv(IInputStream& input,
+ const TString& relativeTablePath,
+ const TString& filePath,
+ TString& suggestion) {
+ TCountingInput countInput(&input);
+ NCsvFormat::TLinesSplitter splitter(countInput);
+
+ size_t maxJobInflight = Settings.Threads_;
+ std::counting_semaphore<> jobsSemaphore(maxJobInflight);
+
+ TCsvParser parser;
+ bool removeLastDelimiter = false;
+
+ if (!Settings.Header_ && !Settings.HeaderRow_) {
+ TString firstRow = splitter.ConsumeLine();
+ NCsvFormat::CsvSplitter csvSplitter(firstRow, Settings.Delimiter_[0]);
+ size_t columnSize = 0;
+ do {
+ csvSplitter.Consume();
+ ++columnSize;
+ } while (csvSplitter.Step());
+ TStringBuilder columns;
+ for (size_t i = 0; i < columnSize; ++i) {
+ if (i > 0) {
+ columns << Settings.Delimiter_;
+ }
+ columns << "column" << i;
+ }
+ InitCsvParser(parser, removeLastDelimiter, splitter, Settings, columns);
+ } else {
+ InitCsvParser(parser, removeLastDelimiter, splitter, Settings, Settings.HeaderRow_);
+ }
+
+ const auto& header = parser.GetHeader();
+
+ TPossibleTypes columnTypes(header.size());
+
+ for (ui32 i = 0; i < Settings.SkipRows_; ++i) {
+ splitter.ConsumeLine();
+ }
+
+ ui64 row = Settings.SkipRows_ + Settings.Header_;
+ ui64 batchBytes = 0;
+
+ TString line;
+ std::vector<TAsyncStatus> inFlightRequests;
+ std::vector<TString> buffer;
+
+ auto checkCsvFunc = [&](std::vector<TString>&& buffer, ui64 row) {
+ TPossibleTypes typesCopy = columnTypes.GetCopy();
+ try {
+ for (auto& line : buffer) {
+ parser.ParseLineTypes(line, typesCopy, TCsvParser::TParseMetadata{row, filePath});
+ }
+ } catch (const std::exception& e) {
+ if (!Failed.exchange(true)) {
+ ErrorStatus = MakeHolder<TStatus>(MakeStatus(EStatus::INTERNAL_ERROR, e.what()));
+ }
+ jobsSemaphore.release();
+ throw;
+ }
+ columnTypes.MergeWith(typesCopy);
+ jobsSemaphore.release();
+ };
+
+ while (TString line = splitter.ConsumeLine()) {
+ ++row;
+ if (row > rowsToAnalyze) {
+ break;
+ }
+ if (line.empty()) {
+ continue;
+ }
+ batchBytes += line.size();
+
+ if (removeLastDelimiter) {
+ if (!line.EndsWith(Settings.Delimiter_)) {
+ return MakeStatus(EStatus::BAD_REQUEST,
+ "According to the header, lines should end with a delimiter");
+ }
+ line.erase(line.size() - Settings.Delimiter_.size());
+ }
+
+ buffer.push_back(line);
+
+ if (batchBytes < Settings.BytesPerRequest_) {
+ continue;
+ }
+
+ auto workerFunc = [&checkCsvFunc, row, buffer = std::move(buffer)]() mutable {
+ checkCsvFunc(std::move(buffer), row);
+ };
+ batchBytes = 0;
+ buffer.clear();
+
+ jobsSemaphore.acquire();
+
+ if (!ProcessingPool->AddFunc(workerFunc)) {
+ return MakeStatus(EStatus::INTERNAL_ERROR, "Couldn't add worker func");
+ }
+
+ if (Failed) {
+ break;
+ }
+ }
+
+ // Check the rest if buffer is not empty
+ if (!buffer.empty() && countInput.Counter() > 0 && !Failed) {
+ jobsSemaphore.acquire();
+ checkCsvFunc(std::move(buffer), row);
+ }
+
+ for (size_t i = 0; i < maxJobInflight; ++i) {
+ jobsSemaphore.acquire();
+ }
+
+ TStringBuilder res;
+ res << "Example CreateTable request text generated based on data in file " << filePath << ":" << Endl << Endl;
+ res << "CREATE TABLE " << (relativeTablePath.empty() ? "`new_table`" : "`" + relativeTablePath + "`")<< " (" << Endl;
+ auto& possibleTypes = columnTypes.GetColumnPossibleTypes();
+ for (size_t i = 0; i < header.size(); ++i) {
+ auto& possibleType = possibleTypes[i];
+ auto& possibleTypeIt = possibleType.GetIterator();
+ TString typeText = possibleTypeIt != possibleType.GetAvailableTypesEnd()
+ && possibleType.GetHasNonNulls() ? possibleTypeIt->ToString() : "Text";
+ res << " `" << header[i] << "` " << typeText;
+ if (!possibleType.GetHasNulls()) {
+ res << " NOT NULL";
+ }
+ res << ",";
+ if (!possibleType.GetHasNonNulls()) {
+ res << " -- No data in this column to infer type";
+ }
+ res << Endl;
+ }
+ res << " PRIMARY KEY (`" << header[0] << "`) -- First column is chosen. Probably need to change this" << Endl;
+ res <<
+R"()
+WITH (
+ STORE = ROW -- or COLUMN
+ -- Other useful table options:
+ --, AUTO_PARTITIONING_BY_SIZE = ENABLED
+ --, AUTO_PARTITIONING_BY_LOAD = ENABLED
+ --, UNIFORM_PARTITIONS = 100
+ --, AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 100
+ --, AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 1000
+);)";
+ suggestion = res;
+ if (Failed) {
+ return *ErrorStatus;
+ } else {
+ return MakeStatus();
+ }
+}
+
TStatus TImportFileClient::TImpl::UpsertJson(IInputStream& input, const TString& dbPath, std::optional<ui64> inputSizeHint,
ProgressCallbackFunc & progressCallback) {
const TType tableType = GetTableType();