diff options
author | brgayazov <bulat@ydb.tech> | 2023-05-15 20:47:01 +0300 |
---|---|---|
committer | brgayazov <bulat@ydb.tech> | 2023-05-15 20:47:01 +0300 |
commit | 72b6ed8df07bf2a4c960a4fb117c3017c686ea73 (patch) | |
tree | cbc3bd0b4e9965ad9c127f4cd802b536fbc39c04 | |
parent | 0228ff692d85e807b26befef2e14c91210ec449e (diff) | |
download | ydb-72b6ed8df07bf2a4c960a4fb117c3017c686ea73.tar.gz |
Add performing delimiter at end of each line
7 files changed, 80 insertions, 5 deletions
diff --git a/ydb/public/lib/ydb_cli/import/import.cpp b/ydb/public/lib/ydb_cli/import/import.cpp index e229089b60..b337930a88 100644 --- a/ydb/public/lib/ydb_cli/import/import.cpp +++ b/ydb/public/lib/ydb_cli/import/import.cpp @@ -362,6 +362,7 @@ TStatus TImportFileClient::UpsertCsv(IInputStream& input, const TString& dbPath, TCountingInput countInput(&input); NCsvFormat::TLinesSplitter splitter(countInput); TString headerRow; + bool RemoveLastDelimiter = false; if (settings.Header_ || settings.HeaderRow_) { if (settings.Header_) { headerRow = splitter.ConsumeLine(); @@ -369,6 +370,10 @@ TStatus TImportFileClient::UpsertCsv(IInputStream& input, const TString& dbPath, if (settings.HeaderRow_) { headerRow = settings.HeaderRow_; } + if (headerRow.EndsWith(settings.Delimiter_)) { + RemoveLastDelimiter = true; + headerRow.erase(headerRow.Size() - settings.Delimiter_.Size()); + } headerRow += '\n'; buffer = headerRow; } @@ -384,9 +389,16 @@ TStatus TImportFileClient::UpsertCsv(IInputStream& input, const TString& dbPath, ui64 readSize = 0; ui64 nextBorder = VerboseModeReadSize; while (TString line = splitter.ConsumeLine()) { + readSize += line.size(); + if (RemoveLastDelimiter) { + if (!line.EndsWith(settings.Delimiter_)) { + return MakeStatus(EStatus::BAD_REQUEST, + "According to the header, lines should end with a delimiter"); + } + line.erase(line.Size() - settings.Delimiter_.Size()); + } buffer += line; buffer += '\n'; - readSize += line.size(); ++idx; if (readSize >= nextBorder && RetrySettings.Verbose_) { nextBorder += VerboseModeReadSize; @@ -414,18 +426,23 @@ TStatus TImportFileClient::UpsertCsvByBlocks(const TString& filePath, const TStr TMaxInflightGetter inFlightGetter(settings.MaxInFlightRequests_, FilesCount); TString headerRow; TCsvFileReader splitter(filePath, settings, headerRow, inFlightGetter); + bool RemoveLastDelimiter = false; if (settings.Header_ || settings.HeaderRow_) { if (settings.HeaderRow_) { headerRow = settings.HeaderRow_; } + if (headerRow.EndsWith(settings.Delimiter_)) { + RemoveLastDelimiter = true; + headerRow.erase(headerRow.Size() - settings.Delimiter_.Size()); + } headerRow += '\n'; } TVector<TAsyncStatus> threadResults(splitter.GetSplitCount()); THolder<IThreadPool> pool = CreateThreadPool(splitter.GetSplitCount()); for (size_t threadId = 0; threadId < splitter.GetSplitCount(); ++threadId) { - auto loadCsv = [this, &settings, &headerRow, &splitter, &dbPath, threadId] () { + auto loadCsv = [this, &settings, &headerRow, &splitter, &dbPath, threadId, RemoveLastDelimiter] () { std::vector<TAsyncStatus> inFlightRequests; TString buffer; buffer = headerRow; @@ -435,9 +452,16 @@ TStatus TImportFileClient::UpsertCsvByBlocks(const TString& filePath, const TStr TAsyncStatus status; TString line; while (splitter.GetChunk(threadId).ConsumeLine(line)) { + readSize += line.size(); + if (RemoveLastDelimiter) { + if (!line.EndsWith(settings.Delimiter_)) { + return MakeStatus(EStatus::BAD_REQUEST, + "According to the header, lines should end with a delimiter"); + } + line.erase(line.Size() - settings.Delimiter_.Size()); + } buffer += line; buffer += '\n'; - readSize += line.size(); ++idx; if (readSize >= nextBorder && RetrySettings.Verbose_) { nextBorder += VerboseModeReadSize; diff --git a/ydb/tests/functional/ydb_cli/canondata/result.json b/ydb/tests/functional/ydb_cli/canondata/result.json index fd211ec825..d0b4abe937 100644 --- a/ydb/tests/functional/ydb_cli/canondata/result.json +++ b/ydb/tests/functional/ydb_cli/canondata/result.json @@ -2,6 +2,12 @@ "test_ydb_impex.TestImpex.test_format_csv": { "uri": "file://test_ydb_impex.TestImpex.test_format_csv/result.output" }, + "test_ydb_impex.TestImpex.test_format_csv_delimeter_at_end_of_lines": { + "uri": "file://test_ydb_impex.TestImpex.test_format_csv_delimeter_at_end_of_lines/result.output" + }, + "test_ydb_impex.TestImpex.test_format_csv_delimeter_at_end_of_lines_newline_delimited": { + "uri": "file://test_ydb_impex.TestImpex.test_format_csv_delimeter_at_end_of_lines_newline_delimited/result.output" + }, "test_ydb_impex.TestImpex.test_format_csv_from_stdin": { "uri": "file://test_ydb_impex.TestImpex.test_format_csv_from_stdin/result.output" }, @@ -44,6 +50,12 @@ "test_ydb_impex.TestImpex.test_format_tsv": { "uri": "file://test_ydb_impex.TestImpex.test_format_tsv/result.output" }, + "test_ydb_impex.TestImpex.test_format_tsv_delimeter_at_end_of_lines": { + "uri": "file://test_ydb_impex.TestImpex.test_format_tsv_delimeter_at_end_of_lines/result.output" + }, + "test_ydb_impex.TestImpex.test_format_tsv_delimeter_at_end_of_lines_newline_delimited": { + "uri": "file://test_ydb_impex.TestImpex.test_format_tsv_delimeter_at_end_of_lines_newline_delimited/result.output" + }, "test_ydb_impex.TestImpex.test_format_tsv_from_stdin": { "uri": "file://test_ydb_impex.TestImpex.test_format_tsv_from_stdin/result.output" }, diff --git a/ydb/tests/functional/ydb_cli/canondata/test_ydb_impex.TestImpex.test_format_csv_delimeter_at_end_of_lines/result.output b/ydb/tests/functional/ydb_cli/canondata/test_ydb_impex.TestImpex.test_format_csv_delimeter_at_end_of_lines/result.output new file mode 100644 index 0000000000..d900a07535 --- /dev/null +++ b/ydb/tests/functional/ydb_cli/canondata/test_ydb_impex.TestImpex.test_format_csv_delimeter_at_end_of_lines/result.output @@ -0,0 +1,5 @@ +1,1111,"one" +2,2222,"two" +3,3333,"three" +5,5555,"five" +7,7777,"seven" diff --git a/ydb/tests/functional/ydb_cli/canondata/test_ydb_impex.TestImpex.test_format_csv_delimeter_at_end_of_lines_newline_delimited/result.output b/ydb/tests/functional/ydb_cli/canondata/test_ydb_impex.TestImpex.test_format_csv_delimeter_at_end_of_lines_newline_delimited/result.output new file mode 100644 index 0000000000..d900a07535 --- /dev/null +++ b/ydb/tests/functional/ydb_cli/canondata/test_ydb_impex.TestImpex.test_format_csv_delimeter_at_end_of_lines_newline_delimited/result.output @@ -0,0 +1,5 @@ +1,1111,"one" +2,2222,"two" +3,3333,"three" +5,5555,"five" +7,7777,"seven" diff --git a/ydb/tests/functional/ydb_cli/canondata/test_ydb_impex.TestImpex.test_format_tsv_delimeter_at_end_of_lines/result.output b/ydb/tests/functional/ydb_cli/canondata/test_ydb_impex.TestImpex.test_format_tsv_delimeter_at_end_of_lines/result.output new file mode 100644 index 0000000000..ebe43b48d1 --- /dev/null +++ b/ydb/tests/functional/ydb_cli/canondata/test_ydb_impex.TestImpex.test_format_tsv_delimeter_at_end_of_lines/result.output @@ -0,0 +1,5 @@ +1 1111 "one" +2 2222 "two" +3 3333 "three" +5 5555 "five" +7 7777 "seven" diff --git a/ydb/tests/functional/ydb_cli/canondata/test_ydb_impex.TestImpex.test_format_tsv_delimeter_at_end_of_lines_newline_delimited/result.output b/ydb/tests/functional/ydb_cli/canondata/test_ydb_impex.TestImpex.test_format_tsv_delimeter_at_end_of_lines_newline_delimited/result.output new file mode 100644 index 0000000000..ebe43b48d1 --- /dev/null +++ b/ydb/tests/functional/ydb_cli/canondata/test_ydb_impex.TestImpex.test_format_tsv_delimeter_at_end_of_lines_newline_delimited/result.output @@ -0,0 +1,5 @@ +1 1111 "one" +2 2222 "two" +3 3333 "three" +5 5555 "five" +7 7777 "seven" diff --git a/ydb/tests/functional/ydb_cli/test_ydb_impex.py b/ydb/tests/functional/ydb_cli/test_ydb_impex.py index f3aa9e4be8..c206f9c7da 100644 --- a/ydb/tests/functional/ydb_cli/test_ydb_impex.py +++ b/ydb/tests/functional/ydb_cli/test_ydb_impex.py @@ -49,12 +49,15 @@ DATA_ARRAY_CSV = ["""key,id,value DATA_ARRAY_CSV_BAD_HEADER = DATA_ARRAY_CSV DATA_ARRAY_CSV_BAD_HEADER[0].replace("key,id,value", 'a,b,c') +DATA_CSV_END_LINES = DATA_CSV.replace("\n", ",\n") + DATA_TSV = DATA_CSV.replace(',', '\t') DATA_ARRAY_TSV = list(map(lambda s: s.replace(',', '\t'), DATA_ARRAY_CSV)) DATA_ARRAY_TSV_BAD_HEADER = DATA_ARRAY_TSV DATA_ARRAY_TSV_BAD_HEADER[0].replace("key\tid\tvalue", 'a\tb\tc') +DATA_TSV_END_LINES = DATA_TSV.replace('\n', '\t\n') DATA_JSON = """{"key":1,"id":1111,"value":"one"} {"key":2,"id":2222,"value":"two"} @@ -199,11 +202,11 @@ class TestImpex(BaseTestTableService): for key in range(i * rows, (i + 1) * rows): f.write(TestImpex.get_row_in_format(ftype, key, id_set[key % len(id_set)], value_set[key % len(value_set)])) - def run_import(self, ftype, data): + def run_import(self, ftype, data, additional_args=[]): self.clear_table() with open("tempinput.{}".format(ftype), "w") as f: f.writelines(data) - self.execute_ydb_cli_command(["import", "file", ftype, "-p", self.table_path, "-i", "tempinput.{}".format(ftype)] + self.get_header_flag(ftype)) + self.execute_ydb_cli_command(["import", "file", ftype, "-p", self.table_path, "-i", "tempinput.{}".format(ftype)] + self.get_header_flag(ftype) + additional_args) def run_import_from_stdin(self, ftype, data): self.clear_table() @@ -243,6 +246,14 @@ class TestImpex(BaseTestTableService): self.run_import("csv", DATA_CSV) return self.run_export("csv") + def test_format_csv_delimeter_at_end_of_lines(self): + self.run_import("csv", DATA_CSV_END_LINES) + return self.run_export("csv") + + def test_format_csv_delimeter_at_end_of_lines_newline_delimited(self): + self.run_import("csv", DATA_CSV_END_LINES, ["--newline-delimited"]) + return self.run_export("csv") + def test_format_csv_from_stdin(self): self.run_import_from_stdin("csv", DATA_CSV) return self.run_export("csv") @@ -286,6 +297,14 @@ class TestImpex(BaseTestTableService): self.run_import("tsv", DATA_TSV) return self.run_export("tsv") + def test_format_tsv_delimeter_at_end_of_lines(self): + self.run_import("tsv", DATA_TSV_END_LINES) + return self.run_export("tsv") + + def test_format_tsv_delimeter_at_end_of_lines_newline_delimited(self): + self.run_import("tsv", DATA_TSV_END_LINES, ["--newline-delimited"]) + return self.run_export("tsv") + def test_format_tsv_from_stdin(self): self.run_import_from_stdin("tsv", DATA_TSV) return self.run_export("tsv") |