diff options
author | chertus <azuikov@ydb.tech> | 2022-12-01 19:51:48 +0300 |
---|---|---|
committer | chertus <azuikov@ydb.tech> | 2022-12-01 19:51:48 +0300 |
commit | 0378892eff2ed24098930e64fec6083daf7627c9 (patch) | |
tree | 9e0f906a3c6e472840d0210a92f97ec0d9f2e053 | |
parent | e05ad9efe2e6c957982dbd54241d153900a710e6 (diff) | |
download | ydb-0378892eff2ed24098930e64fec6083daf7627c9.tar.gz |
fix CSV NULL parsing
-rw-r--r-- | ydb/core/formats/arrow_helpers.cpp | 1 | ||||
-rw-r--r-- | ydb/core/grpc_services/rpc_load_rows.cpp | 27 | ||||
-rw-r--r-- | ydb/core/io_formats/CMakeLists.txt | 1 | ||||
-rw-r--r-- | ydb/core/io_formats/csv.h | 36 | ||||
-rw-r--r-- | ydb/core/io_formats/csv_arrow.cpp | 39 | ||||
-rw-r--r-- | ydb/core/io_formats/ut/CMakeLists.darwin.txt | 48 | ||||
-rw-r--r-- | ydb/core/io_formats/ut/CMakeLists.linux-aarch64.txt | 50 | ||||
-rw-r--r-- | ydb/core/io_formats/ut/CMakeLists.linux.txt | 52 | ||||
-rw-r--r-- | ydb/core/io_formats/ut/CMakeLists.txt | 15 | ||||
-rw-r--r-- | ydb/core/io_formats/ut_csv.cpp | 276 | ||||
-rw-r--r-- | ydb/services/ydb/ydb_bulk_upsert_olap_ut.cpp | 142 |
11 files changed, 630 insertions, 57 deletions
diff --git a/ydb/core/formats/arrow_helpers.cpp b/ydb/core/formats/arrow_helpers.cpp index 2137751d9cf..7920eb0e39a 100644 --- a/ydb/core/formats/arrow_helpers.cpp +++ b/ydb/core/formats/arrow_helpers.cpp @@ -1214,6 +1214,7 @@ bool TArrowToYdbConverter::Process(const arrow::RecordBatch& batch, TString& err auto& curCell = cells[0][col]; if (column->IsNull(row)) { curCell = TCell(); + ++col; continue; } diff --git a/ydb/core/grpc_services/rpc_load_rows.cpp b/ydb/core/grpc_services/rpc_load_rows.cpp index 7cdb15f5ab7..7e6467d5503 100644 --- a/ydb/core/grpc_services/rpc_load_rows.cpp +++ b/ydb/core/grpc_services/rpc_load_rows.cpp @@ -612,11 +612,6 @@ private: } case EUploadSource::CSV: { - if (SrcColumns.empty()) { - errorMessage = "Cannot upsert CSV: no src columns"; - return false; - } - auto& data = GetSourceData(); auto& cvsSettings = GetCsvSettings(); ui32 skipRows = cvsSettings.skip_rows(); @@ -624,11 +619,8 @@ private: auto& nullValue = cvsSettings.null_value(); bool withHeader = cvsSettings.header(); - ui32 blockSize = NFormats::TArrowCSV::DEFAULT_BLOCK_SIZE; - if (data.size() >= blockSize) { - blockSize *= data.size() / blockSize + 1; - } - NFormats::TArrowCSV reader(SrcColumns, skipRows, withHeader, blockSize); + NFormats::TArrowCSV reader(SrcColumns, withHeader); + reader.SetSkipRows(skipRows); if (!delimiter.empty()) { if (delimiter.size() != 1) { @@ -643,16 +635,19 @@ private: reader.SetNullValue(nullValue); } - Batch = reader.ReadNext(data, errorMessage); + if (data.size() > NFormats::TArrowCSV::DEFAULT_BLOCK_SIZE) { + ui32 blockSize = NFormats::TArrowCSV::DEFAULT_BLOCK_SIZE; + blockSize *= data.size() / blockSize + 1; + reader.SetBlockSize(blockSize); + } + + Batch = reader.ReadSingleBatch(data, errorMessage); if (!Batch) { - if (errorMessage.empty()) { - errorMessage = "Cannot read CSV data"; - } return false; } - if (reader.ReadNext(data, errorMessage)) { - errorMessage = "Too big CSV batch"; + if (!Batch->num_rows()) { + errorMessage = "No rows in CSV"; return false; } diff --git a/ydb/core/io_formats/CMakeLists.txt b/ydb/core/io_formats/CMakeLists.txt index 45aa4994beb..6e8e51e9673 100644 --- a/ydb/core/io_formats/CMakeLists.txt +++ b/ydb/core/io_formats/CMakeLists.txt @@ -6,6 +6,7 @@ # original buildsystem will not be accepted. +add_subdirectory(ut) add_library(ydb-core-io_formats) target_compile_options(ydb-core-io_formats PRIVATE diff --git a/ydb/core/io_formats/csv.h b/ydb/core/io_formats/csv.h index 0924ac16850..8e9ef79e254 100644 --- a/ydb/core/io_formats/csv.h +++ b/ydb/core/io_formats/csv.h @@ -23,17 +23,27 @@ public: /// If header is true read column names from first line after skipRows. Parse columns as strings in this case. /// @note It's possible to skip header with skipRows and use typed columns instead. - TArrowCSV(const TVector<std::pair<TString, NScheme::TTypeInfo>>& columns, - ui32 skipRows = 0, bool header = false, ui32 blockSize = DEFAULT_BLOCK_SIZE); + TArrowCSV(const TVector<std::pair<TString, NScheme::TTypeInfo>>& columns, bool header = false); std::shared_ptr<arrow::RecordBatch> ReadNext(const TString& csv, TString& errString); + std::shared_ptr<arrow::RecordBatch> ReadSingleBatch(const TString& csv, TString& errString); void Reset() { Reader = {}; } - void SetDelimiter(char delimiter = ',') { - ParseOptions.delimiter = delimiter; + void SetSkipRows(ui32 skipRows) { + ReadOptions.skip_rows = skipRows; + } + + void SetBlockSize(ui32 blockSize = DEFAULT_BLOCK_SIZE) { + ReadOptions.block_size = blockSize; + } + + void SetDelimiter(std::optional<char> delimiter) { + if (delimiter) { + ParseOptions.delimiter = *delimiter; + } } void SetQuoting(bool quoting = true, char quoteChar = '"', bool doubleQuote = true) { @@ -47,16 +57,10 @@ public: ParseOptions.escape_char = escapeChar; } - void SetNullValue(const TString& null) { - if (!null.empty()) { - ConvertOptions.null_values = { std::string(null.data(), null.size()) }; - ConvertOptions.strings_can_be_null = true; - ConvertOptions.quoted_strings_can_be_null = true; - } else { - ConvertOptions.null_values.clear(); - ConvertOptions.strings_can_be_null = false; - ConvertOptions.quoted_strings_can_be_null = true; - } + void SetNullValue(const TString& null = "") { + ConvertOptions.null_values = { std::string(null.data(), null.size()) }; + ConvertOptions.strings_can_be_null = true; + ConvertOptions.quoted_strings_can_be_null = false; } private: @@ -65,6 +69,10 @@ private: arrow::csv::ConvertOptions ConvertOptions; std::shared_ptr<arrow::csv::StreamingReader> Reader; std::vector<TString> ResultColumns; + + static TString ErrorPrefix() { + return "Cannot read CSV: "; + } }; } diff --git a/ydb/core/io_formats/csv_arrow.cpp b/ydb/core/io_formats/csv_arrow.cpp index e4a42f7f802..78a1a300ff1 100644 --- a/ydb/core/io_formats/csv_arrow.cpp +++ b/ydb/core/io_formats/csv_arrow.cpp @@ -3,16 +3,14 @@ namespace NKikimr::NFormats { -TArrowCSV::TArrowCSV(const TVector<std::pair<TString, NScheme::TTypeInfo>>& columns, ui32 skipRows, bool header, - ui32 blockSize) +TArrowCSV::TArrowCSV(const TVector<std::pair<TString, NScheme::TTypeInfo>>& columns, bool header) : ReadOptions(arrow::csv::ReadOptions::Defaults()) , ParseOptions(arrow::csv::ParseOptions::Defaults()) , ConvertOptions(arrow::csv::ConvertOptions::Defaults()) { ConvertOptions.check_utf8 = false; - ReadOptions.block_size = blockSize; + ReadOptions.block_size = DEFAULT_BLOCK_SIZE; ReadOptions.use_threads = false; - ReadOptions.skip_rows = skipRows; ReadOptions.autogenerate_column_names = false; if (header) { // !autogenerate + column_names.empty() => read from CSV @@ -32,26 +30,35 @@ TArrowCSV::TArrowCSV(const TVector<std::pair<TString, NScheme::TTypeInfo>>& colu ReadOptions.column_names.push_back(columnName); ConvertOptions.column_types[columnName] = NArrow::GetArrowType(type); } +#if 0 } else { ReadOptions.autogenerate_column_names = true; +#endif } + + SetNullValue(); // set default null value } std::shared_ptr<arrow::RecordBatch> TArrowCSV::ReadNext(const TString& csv, TString& errString) { - if (!Reader && csv.Size()) { + if (!Reader) { + if (ConvertOptions.column_types.empty()) { + errString = ErrorPrefix() + "no columns specified"; + return {}; + } + auto buffer = std::make_shared<NArrow::TBufferOverString>(csv); auto input = std::make_shared<arrow::io::BufferReader>(buffer); auto res = arrow::csv::StreamingReader::Make(arrow::io::default_io_context(), input, ReadOptions, ParseOptions, ConvertOptions); if (!res.ok()) { - errString = TStringBuilder() << "Cannot read CSV: " << res.status().ToString(); + errString = ErrorPrefix() + res.status().ToString(); return {}; } Reader = *res; } if (!Reader) { - errString = "Cannot read CSV: no reader"; + errString = ErrorPrefix() + "cannot make reader"; return {}; } @@ -61,8 +68,24 @@ std::shared_ptr<arrow::RecordBatch> TArrowCSV::ReadNext(const TString& csv, TStr if (batch && !ResultColumns.empty()) { batch = NArrow::ExtractColumns(batch, ResultColumns); if (!batch) { - errString = "Cannot read CSV: not all result columns present"; + errString = ErrorPrefix() + "not all result columns present"; + } + } + return batch; +} + +std::shared_ptr<arrow::RecordBatch> TArrowCSV::ReadSingleBatch(const TString& csv, TString& errString) { + auto batch = ReadNext(csv, errString); + if (!batch) { + if (errString.empty()) { + errString = ErrorPrefix(); } + return {}; + } + + if (ReadNext(csv, errString)) { + errString = ErrorPrefix() + "too big CSV data portion"; + return {}; } return batch; } diff --git a/ydb/core/io_formats/ut/CMakeLists.darwin.txt b/ydb/core/io_formats/ut/CMakeLists.darwin.txt new file mode 100644 index 00000000000..d7632c83b9c --- /dev/null +++ b/ydb/core/io_formats/ut/CMakeLists.darwin.txt @@ -0,0 +1,48 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(ydb-core-io_formats-ut) +target_compile_options(ydb-core-io_formats-ut PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_include_directories(ydb-core-io_formats-ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/io_formats +) +target_link_libraries(ydb-core-io_formats-ut PUBLIC + contrib-libs-cxxsupp + yutil + library-cpp-cpuid_check + cpp-testing-unittest_main + ydb-core-io_formats + udf-service-exception_policy + yql-sql-pg_dummy +) +target_link_options(ydb-core-io_formats-ut PRIVATE + -Wl,-no_deduplicate + -Wl,-sdk_version,10.15 + -fPIC + -fPIC + -framework + CoreFoundation +) +target_sources(ydb-core-io_formats-ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/io_formats/ut_csv.cpp +) +add_test( + NAME + ydb-core-io_formats-ut + COMMAND + ydb-core-io_formats-ut + --print-before-suite + --print-before-test + --fork-tests + --print-times + --show-fails +) +vcs_info(ydb-core-io_formats-ut) diff --git a/ydb/core/io_formats/ut/CMakeLists.linux-aarch64.txt b/ydb/core/io_formats/ut/CMakeLists.linux-aarch64.txt new file mode 100644 index 00000000000..86319d79e09 --- /dev/null +++ b/ydb/core/io_formats/ut/CMakeLists.linux-aarch64.txt @@ -0,0 +1,50 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(ydb-core-io_formats-ut) +target_compile_options(ydb-core-io_formats-ut PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_include_directories(ydb-core-io_formats-ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/io_formats +) +target_link_libraries(ydb-core-io_formats-ut PUBLIC + contrib-libs-cxxsupp + yutil + library-cpp-lfalloc + cpp-testing-unittest_main + ydb-core-io_formats + udf-service-exception_policy + yql-sql-pg_dummy +) +target_link_options(ydb-core-io_formats-ut PRIVATE + -ldl + -lrt + -Wl,--no-as-needed + -fPIC + -fPIC + -lpthread + -lrt + -ldl +) +target_sources(ydb-core-io_formats-ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/io_formats/ut_csv.cpp +) +add_test( + NAME + ydb-core-io_formats-ut + COMMAND + ydb-core-io_formats-ut + --print-before-suite + --print-before-test + --fork-tests + --print-times + --show-fails +) +vcs_info(ydb-core-io_formats-ut) diff --git a/ydb/core/io_formats/ut/CMakeLists.linux.txt b/ydb/core/io_formats/ut/CMakeLists.linux.txt new file mode 100644 index 00000000000..3df69f44ed2 --- /dev/null +++ b/ydb/core/io_formats/ut/CMakeLists.linux.txt @@ -0,0 +1,52 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(ydb-core-io_formats-ut) +target_compile_options(ydb-core-io_formats-ut PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_include_directories(ydb-core-io_formats-ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/io_formats +) +target_link_libraries(ydb-core-io_formats-ut PUBLIC + contrib-libs-cxxsupp + yutil + cpp-malloc-tcmalloc + libs-tcmalloc-no_percpu_cache + library-cpp-cpuid_check + cpp-testing-unittest_main + ydb-core-io_formats + udf-service-exception_policy + yql-sql-pg_dummy +) +target_link_options(ydb-core-io_formats-ut PRIVATE + -ldl + -lrt + -Wl,--no-as-needed + -fPIC + -fPIC + -lpthread + -lrt + -ldl +) +target_sources(ydb-core-io_formats-ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/io_formats/ut_csv.cpp +) +add_test( + NAME + ydb-core-io_formats-ut + COMMAND + ydb-core-io_formats-ut + --print-before-suite + --print-before-test + --fork-tests + --print-times + --show-fails +) +vcs_info(ydb-core-io_formats-ut) diff --git a/ydb/core/io_formats/ut/CMakeLists.txt b/ydb/core/io_formats/ut/CMakeLists.txt new file mode 100644 index 00000000000..3e0811fb22e --- /dev/null +++ b/ydb/core/io_formats/ut/CMakeLists.txt @@ -0,0 +1,15 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +if (CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND UNIX AND NOT APPLE AND NOT ANDROID) + include(CMakeLists.linux-aarch64.txt) +elseif (APPLE) + include(CMakeLists.darwin.txt) +elseif (CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND UNIX AND NOT APPLE AND NOT ANDROID) + include(CMakeLists.linux.txt) +endif() diff --git a/ydb/core/io_formats/ut_csv.cpp b/ydb/core/io_formats/ut_csv.cpp new file mode 100644 index 00000000000..38b16a67364 --- /dev/null +++ b/ydb/core/io_formats/ut_csv.cpp @@ -0,0 +1,276 @@ +#include "csv.h" + +#include <ydb/core/formats/arrow_helpers.h> +#include <library/cpp/testing/unittest/registar.h> + +namespace NKikimr::NFormats { + +namespace { + +TString MakeHeader(const TVector<std::pair<TString, NScheme::TTypeInfo>>& columns, char delimiter) { + TString header; + for (auto& [name, _] : columns) { + header += name + delimiter; + } + if (header.size()) { + header.resize(header.size() - 1); + } + return header; +} + +TString TestIntsData(const TVector<std::pair<TString, NScheme::TTypeInfo>>& columns, ui32 numRows, + char delimiter, TString endLine = "\n", bool addEmptyLine = false) { + TString data; + for (ui32 row = 0; row < numRows; ++row) { + if (data.size()) { + data.resize(data.size() - 1); + data += endLine; + } + for (size_t i = 0; i < columns.size(); ++i) { + data += ToString(row) + delimiter; + } + } + data.resize(data.size() - 1); + if (addEmptyLine) { + data += endLine; + } + return data; +} + +std::shared_ptr<arrow::RecordBatch> +TestReadSingleBatch(TArrowCSV& reader, + const TVector<std::pair<TString, NScheme::TTypeInfo>>& columns, const TString& data, ui32 numRows) { + TString errorMessage; + auto batch = reader.ReadSingleBatch(data, errorMessage); + if (!errorMessage.empty()) { + Cerr << errorMessage << "\n"; + } + UNIT_ASSERT(batch); + UNIT_ASSERT(errorMessage.empty()); + UNIT_ASSERT(batch->ValidateFull().ok()); + UNIT_ASSERT_EQUAL(batch->num_rows(), numRows); + UNIT_ASSERT_EQUAL((size_t)batch->num_columns(), columns.size()); + + for (size_t i = 0; i < columns.size(); ++i) { + UNIT_ASSERT_EQUAL(columns[i].first, batch->schema()->field(i)->name()); + UNIT_ASSERT(NArrow::GetArrowType(columns[i].second)->Equals(batch->schema()->field(i)->type())); + // TODO: check data + } + return batch; +} + +std::shared_ptr<arrow::RecordBatch> +TestReadSingleBatch(const TVector<std::pair<TString, NScheme::TTypeInfo>>& columns, const TString& data, + char delimiter, bool header, ui32 numRows, ui32 skipRows = 0, std::optional<char> escape = {}) { + TArrowCSV reader(columns, header); + reader.SetDelimiter(delimiter); + if (skipRows) { + reader.SetSkipRows(skipRows); + } + if (escape) { + reader.SetEscaping(true, *escape); + } + + return TestReadSingleBatch(reader, columns, data, numRows); +} + +} + +Y_UNIT_TEST_SUITE(FormatCSV) { + Y_UNIT_TEST(EmptyData) { + TString data = ""; + TVector<std::pair<TString, NScheme::TTypeInfo>> columns; + + { + TArrowCSV reader(columns, false); + + TString errorMessage; + auto batch = reader.ReadNext(data, errorMessage); + Cerr << errorMessage << "\n"; + UNIT_ASSERT(!batch); + UNIT_ASSERT(!errorMessage.empty()); + } + + { + columns = { + {"u32", NScheme::TTypeInfo(NScheme::NTypeIds::Uint32)}, + {"i64", NScheme::TTypeInfo(NScheme::NTypeIds::Int64)} + }; + + TArrowCSV reader(columns, false); + + TString errorMessage; + auto batch = reader.ReadNext(data, errorMessage); + Cerr << errorMessage << "\n"; + UNIT_ASSERT(!batch); + UNIT_ASSERT(!errorMessage.empty()); + } + } + + Y_UNIT_TEST(Common) { + TVector<std::pair<TString, NScheme::TTypeInfo>> columns = { + {"u8", NScheme::TTypeInfo(NScheme::NTypeIds::Uint8)}, + {"u16", NScheme::TTypeInfo(NScheme::NTypeIds::Uint16)}, + {"u32", NScheme::TTypeInfo(NScheme::NTypeIds::Uint32)}, + {"u64", NScheme::TTypeInfo(NScheme::NTypeIds::Uint64)}, + {"i8", NScheme::TTypeInfo(NScheme::NTypeIds::Int8)}, + {"i16", NScheme::TTypeInfo(NScheme::NTypeIds::Int16)}, + {"i32", NScheme::TTypeInfo(NScheme::NTypeIds::Int32)}, + {"i64", NScheme::TTypeInfo(NScheme::NTypeIds::Int64)} + }; + + // half of columns + auto uColumns = columns; + uColumns.resize(columns.size() / 2); + + // another half of columns + TVector<std::pair<TString, NScheme::TTypeInfo>> sColumns( + columns.begin() + (columns.size() / 2), columns.end()); + + std::vector<char> delimiters = {',', ';', '\t'}; + std::vector<TString> endlines = {"\n", "\r\n", "\r"}; + bool addEmptyLine = false; + ui32 numRows = 10; + + for (auto& endLine : endlines) { + for (auto delim : delimiters) { + // no header + addEmptyLine = !addEmptyLine; + TString csv = TestIntsData(columns, numRows, delim, endLine, addEmptyLine); + TestReadSingleBatch(columns, csv, delim, false, numRows); + + // header, all columns + TString header = MakeHeader(columns, delim); + TestReadSingleBatch(columns, header + endLine + csv, delim, true, numRows); + + // header, skip rows, all columns + TestReadSingleBatch(columns, TString("line1") + endLine + "line2" + endLine + header + endLine + csv, + delim, true, numRows, 2); + + // header, some columns + TestReadSingleBatch(uColumns, header + endLine + csv, delim, true, numRows); + TestReadSingleBatch(sColumns, header + endLine + csv, delim, true, numRows); + + // header, skip rows, some columns + TestReadSingleBatch(uColumns, endLine + header + endLine + csv, delim, true, numRows, 1); + TestReadSingleBatch(sColumns, endLine + header + endLine + csv, delim, true, numRows, 1); + } + } + } + + Y_UNIT_TEST(Strings) { + TVector<std::pair<TString, NScheme::TTypeInfo>> columns = { + {"string", NScheme::TTypeInfo(NScheme::NTypeIds::String)}, + {"utf8", NScheme::TTypeInfo(NScheme::NTypeIds::Utf8)} + }; + + // TODO: SetQuoting + std::vector<TString> quotes = {"\"", "\'", ""}; + + char delimiter = ','; + TString endLine = "\n"; + + for (auto& q : quotes) { + TString csv; + csv += q + "aaa" + q + delimiter + q + "bbbbb" + q + endLine; + csv += q + "123" + q + delimiter + q + "456" + q + endLine; + csv += q + "+-/*=" + q + delimiter + q + "~!@#$%^&*()?" + q + endLine; + + TestReadSingleBatch(columns, csv, delimiter, false, 3); + } + + for (auto& q : quotes) { + TString csv; + csv += q + "d\\'Artagnan" + q + delimiter + q + "Jeanne d'Arc" + q + endLine; + csv += q + "\\\'\\\"\\\'" + q + delimiter + q + "\\\"\\\'\\\"" + q + endLine; + + auto batch = TestReadSingleBatch(columns, csv, delimiter, false, 2, 0, '\\'); + for (auto& col : batch->columns()) { + auto& typedColumn = static_cast<arrow::BinaryArray&>(*col); + for (int i = 0; i < typedColumn.length(); ++i) { + auto view = typedColumn.GetView(i); + std::string_view value(view.data(), view.size()); + Cerr << value << "\n"; + } + } + } + } + + Y_UNIT_TEST(Nulls) { + TVector<std::pair<TString, NScheme::TTypeInfo>> columns = { + {"u32", NScheme::TTypeInfo(NScheme::NTypeIds::Uint32)}, + {"string", NScheme::TTypeInfo(NScheme::NTypeIds::String)}, + {"utf8", NScheme::TTypeInfo(NScheme::NTypeIds::Utf8)} + }; + + std::vector<TString> nulls = {"", "", "\\N", "NULL"}; + bool defaultNull = true; + + char delimiter = ','; + TString endLine = "\n"; + TString q = "\""; + + std::string nullChar = "ᴺᵁᴸᴸ"; + + for (auto& null : nulls) { + TString csv; + csv += TString() + null + delimiter + q + q + delimiter + q + q + endLine; + csv += TString() + null + delimiter + q + null + q + delimiter + q + null + q + endLine; + csv += TString() + null + delimiter + null + delimiter + null + endLine; + + TArrowCSV reader(columns, false); + if (!nulls.empty() || !defaultNull) { + reader.SetNullValue(null); + } else { + defaultNull = false; + } + + auto batch = TestReadSingleBatch(reader, columns, csv, 3); + + Cerr << "src:\n" << csv; + + auto& ui32Column = static_cast<arrow::UInt32Array&>(*batch->columns()[0]); + auto& strColumn = static_cast<arrow::BinaryArray&>(*batch->columns()[1]); + auto& utf8Column = static_cast<arrow::StringArray&>(*batch->columns()[2]); + + Cerr << "parsed:\n"; + + for (int i = 0; i < batch->num_rows(); ++i) { + if (ui32Column.IsNull(i)) { + Cerr << nullChar << delimiter; + } else { + Cerr << ui32Column.Value(i) << delimiter; + } + + if (strColumn.IsNull(i)) { + Cerr << nullChar << delimiter; + } else { + auto view = strColumn.GetView(i); + std::string_view value(view.data(), view.size()); + Cerr << value << delimiter; + } + + if (utf8Column.IsNull(i)) { + Cerr << nullChar << "\n"; + } else { + auto view = utf8Column.GetView(i); + std::string_view value(view.data(), view.size()); + Cerr << value << "\n"; + } + + UNIT_ASSERT(ui32Column.IsNull(i)); + UNIT_ASSERT(i == 2 || !strColumn.IsNull(i)); + UNIT_ASSERT(i == 2 || !utf8Column.IsNull(i)); + UNIT_ASSERT(i != 2 || strColumn.IsNull(i)); + UNIT_ASSERT(i != 2 || utf8Column.IsNull(i)); + } + } + } +#if 0 + Y_UNIT_TEST(Dates) { + // TODO + } +#endif +} + +} diff --git a/ydb/services/ydb/ydb_bulk_upsert_olap_ut.cpp b/ydb/services/ydb/ydb_bulk_upsert_olap_ut.cpp index 3db641abf7d..c671389d95a 100644 --- a/ydb/services/ydb/ydb_bulk_upsert_olap_ut.cpp +++ b/ydb/services/ydb/ydb_bulk_upsert_olap_ut.cpp @@ -11,18 +11,20 @@ using namespace NYdb; namespace { -ui32 ScanQuerySelect(NYdb::NTable::TTableClient client, const TString& tablePath, - const std::vector<std::pair<TString, NYdb::EPrimitiveType>>& ydbSchema = TTestOlap::PublicSchema()) { +std::vector<TString> ScanQuerySelect( + NYdb::NTable::TTableClient client, const TString& tablePath, + const std::vector<std::pair<TString, NYdb::EPrimitiveType>>& ydbSchema = TTestOlap::PublicSchema()) +{ auto query = Sprintf("SELECT * FROM `%s`", tablePath.c_str()); // Executes scan query auto result = client.StreamExecuteScanQuery(query).GetValueSync(); if (!result.IsSuccess()) { Cerr << "ScanQuery execution failure: " << result.GetIssues().ToString() << Endl; - return 0; + return {}; } - ui32 numRows = 0; + std::vector<TString> out; bool eos = false; Cout << "ScanQuery:" << Endl; while (!eos) { @@ -31,7 +33,7 @@ ui32 ScanQuerySelect(NYdb::NTable::TTableClient client, const TString& tablePath eos = true; if (!streamPart.EOS()) { Cerr << "ScanQuery execution failure: " << streamPart.GetIssues().ToString() << Endl; - return 0; + return {}; } continue; } @@ -42,31 +44,50 @@ ui32 ScanQuerySelect(NYdb::NTable::TTableClient client, const TString& tablePath TResultSetParser parser(rs); while (parser.TryNextRow()) { + TStringBuilder ss; + for (auto& [colName, colType] : ydbSchema) { switch (colType) { case NYdb::EPrimitiveType::Timestamp: - Cout << parser.ColumnParser(colName).GetOptionalTimestamp() << ", "; + ss << parser.ColumnParser(colName).GetOptionalTimestamp() << ","; + break; + case NYdb::EPrimitiveType::Datetime: + ss << parser.ColumnParser(colName).GetOptionalDatetime() << ","; + break; + case NYdb::EPrimitiveType::String: { + auto& col = parser.ColumnParser(colName); + if (col.GetKind() == TTypeParser::ETypeKind::Optional) { + ss << col.GetOptionalString() << ","; + } else { + ss << col.GetString() << ","; + } break; + } case NYdb::EPrimitiveType::Utf8: - Cout << parser.ColumnParser(colName).GetOptionalUtf8() << ", "; + ss << parser.ColumnParser(colName).GetOptionalUtf8() << ","; break; case NYdb::EPrimitiveType::Int32: - Cout << parser.ColumnParser(colName).GetOptionalInt32() << ", "; + ss << parser.ColumnParser(colName).GetOptionalInt32() << ","; break; case NYdb::EPrimitiveType::JsonDocument: - Cout << parser.ColumnParser(colName).GetOptionalJsonDocument() << ", "; + ss << parser.ColumnParser(colName).GetOptionalJsonDocument() << ","; break; default: - Cout << "<other>, "; + ss << "<other>,"; break; } } - Cout << Endl; - ++numRows; + + out.emplace_back(TString(ss)); + auto& str = out.back(); + if (str.size()) { + str.resize(str.size() - 1); + } + Cout << str << Endl; } } } - return numRows; + return out; } } @@ -104,8 +125,8 @@ Y_UNIT_TEST_SUITE(YdbTableBulkUpsertOlap) { Cerr << "Upsert done: " << TInstant::Now() - start << Endl; { // Read all - ui32 numRows = ScanQuerySelect(client, tablePath); - UNIT_ASSERT_GT(numRows, 0); + auto rows = ScanQuerySelect(client, tablePath); + UNIT_ASSERT_GT(rows.size(), 0); } // Negatives @@ -158,6 +179,89 @@ Y_UNIT_TEST_SUITE(YdbTableBulkUpsertOlap) { } } + Y_UNIT_TEST(UpsertCsvBug) { + NKikimrConfig::TAppConfig appConfig; + TKikimrWithGrpcAndRootSchema server(appConfig); + server.Server_->GetRuntime()->SetLogPriority(NKikimrServices::TX_COLUMNSHARD, NActors::NLog::PRI_DEBUG); + + ui16 grpc = server.GetPort(); + TString location = TStringBuilder() << "localhost:" << grpc; + auto connection = NYdb::TDriver(TDriverConfig().SetEndpoint(location)); + + NYdb::NTable::TTableClient client(connection); + auto session = client.GetSession().ExtractValueSync().GetSession(); + TString tablePath = TTestOlap::TablePath; + + { // KIKIMR-16411 +// CREATE TABLE subscriber ( +// id String NOT NULL, +// email Utf8, +// status String, +// subscribed_at Datetime, +// confirmed_at Datetime, +// unsubscribed_at Datetime, +// referrer Utf8, +// language Utf8, +// timezone Utf8, +// ip_address String, +// fields JsonDocument, +// PRIMARY KEY (id) +// ); + std::vector<std::pair<TString, NYdb::EPrimitiveType>> schema = { + { "id", NYdb::EPrimitiveType::String }, + { "email", NYdb::EPrimitiveType::Utf8 }, + { "status", NYdb::EPrimitiveType::String }, + { "subscribed_at", NYdb::EPrimitiveType::Datetime }, + { "confirmed_at", NYdb::EPrimitiveType::Datetime }, + { "unsubscribed_at", NYdb::EPrimitiveType::Datetime }, + { "referrer", NYdb::EPrimitiveType::Utf8 }, + { "language", NYdb::EPrimitiveType::Utf8 }, + { "timezone", NYdb::EPrimitiveType::Utf8 }, + { "ip_address", NYdb::EPrimitiveType::String }, + { "fields", NYdb::EPrimitiveType::JsonDocument } + }; + + auto tableBuilder = client.GetTableBuilder(); + for (auto& [name, type] : schema) { + if (name == "id") { + tableBuilder.AddNonNullableColumn(name, type); + } else { + tableBuilder.AddNullableColumn(name, type); + } + } + tableBuilder.SetPrimaryKeyColumns({"id"}); + auto result = session.CreateTable(tablePath, tableBuilder.Build(), {}).ExtractValueSync(); + + UNIT_ASSERT_EQUAL(result.IsTransportError(), false); + UNIT_ASSERT_EQUAL(result.GetStatus(), EStatus::SUCCESS); + + TString csv = + "id|email|status|subscribed_at|confirmed_at|unsubscribed_at|referrer|language|timezone|ip_address|fields\n" + "123123bs|testd|subscr|1579301930|123213123||http|ru|AsiaNovo|hello|\"{}\"\n"; + + Ydb::Formats::CsvSettings csvSettings; + csvSettings.set_header(true); + csvSettings.set_delimiter("|"); + + TString formatSettings; + Y_PROTOBUF_SUPPRESS_NODISCARD csvSettings.SerializeToString(&formatSettings); + + NYdb::NTable::TBulkUpsertSettings upsertSettings; + upsertSettings.FormatSettings(formatSettings); + + auto res = client.BulkUpsert(tablePath, + NYdb::NTable::EDataFormat::CSV, csv, {}, upsertSettings).GetValueSync(); + + Cerr << res.GetStatus() << Endl; + UNIT_ASSERT_EQUAL_C(res.GetStatus(), EStatus::SUCCESS, res.GetIssues().ToString()); + + auto rows = ScanQuerySelect(client, tablePath, schema); + UNIT_ASSERT_VALUES_EQUAL(rows.size(), 1); + UNIT_ASSERT_VALUES_EQUAL(rows[0], + "123123bs,testd,subscr,2020-01-17T22:58:50.000000Z,1973-11-27T01:52:03.000000Z,(empty maybe),http,ru,AsiaNovo,hello,{}"); + } + } + Y_UNIT_TEST(UpsertCSV) { NKikimrConfig::TAppConfig appConfig; TKikimrWithGrpcAndRootSchema server(appConfig); @@ -190,8 +294,8 @@ Y_UNIT_TEST_SUITE(YdbTableBulkUpsertOlap) { Cerr << "Upsert done: " << TInstant::Now() - start << Endl; { // Read all - ui32 numRows = ScanQuerySelect(client, tablePath); - UNIT_ASSERT_GT(numRows, 0); + auto rows = ScanQuerySelect(client, tablePath); + UNIT_ASSERT_GT(rows.size(), 0); } // Negatives @@ -339,8 +443,8 @@ Y_UNIT_TEST_SUITE(YdbTableBulkUpsertOlap) { Cerr << "Upsert done: " << TInstant::Now() - start << Endl; { // Read all - ui32 numRows = ScanQuerySelect(client, tablePath); - UNIT_ASSERT_GT(numRows, 0); + auto rows = ScanQuerySelect(client, tablePath); + UNIT_ASSERT_GT(rows.size(), 0); } // Read |