aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorchertus <azuikov@ydb.tech>2022-12-01 19:51:48 +0300
committerchertus <azuikov@ydb.tech>2022-12-01 19:51:48 +0300
commit0378892eff2ed24098930e64fec6083daf7627c9 (patch)
tree9e0f906a3c6e472840d0210a92f97ec0d9f2e053
parente05ad9efe2e6c957982dbd54241d153900a710e6 (diff)
downloadydb-0378892eff2ed24098930e64fec6083daf7627c9.tar.gz
fix CSV NULL parsing
-rw-r--r--ydb/core/formats/arrow_helpers.cpp1
-rw-r--r--ydb/core/grpc_services/rpc_load_rows.cpp27
-rw-r--r--ydb/core/io_formats/CMakeLists.txt1
-rw-r--r--ydb/core/io_formats/csv.h36
-rw-r--r--ydb/core/io_formats/csv_arrow.cpp39
-rw-r--r--ydb/core/io_formats/ut/CMakeLists.darwin.txt48
-rw-r--r--ydb/core/io_formats/ut/CMakeLists.linux-aarch64.txt50
-rw-r--r--ydb/core/io_formats/ut/CMakeLists.linux.txt52
-rw-r--r--ydb/core/io_formats/ut/CMakeLists.txt15
-rw-r--r--ydb/core/io_formats/ut_csv.cpp276
-rw-r--r--ydb/services/ydb/ydb_bulk_upsert_olap_ut.cpp142
11 files changed, 630 insertions, 57 deletions
diff --git a/ydb/core/formats/arrow_helpers.cpp b/ydb/core/formats/arrow_helpers.cpp
index 2137751d9cf..7920eb0e39a 100644
--- a/ydb/core/formats/arrow_helpers.cpp
+++ b/ydb/core/formats/arrow_helpers.cpp
@@ -1214,6 +1214,7 @@ bool TArrowToYdbConverter::Process(const arrow::RecordBatch& batch, TString& err
auto& curCell = cells[0][col];
if (column->IsNull(row)) {
curCell = TCell();
+ ++col;
continue;
}
diff --git a/ydb/core/grpc_services/rpc_load_rows.cpp b/ydb/core/grpc_services/rpc_load_rows.cpp
index 7cdb15f5ab7..7e6467d5503 100644
--- a/ydb/core/grpc_services/rpc_load_rows.cpp
+++ b/ydb/core/grpc_services/rpc_load_rows.cpp
@@ -612,11 +612,6 @@ private:
}
case EUploadSource::CSV:
{
- if (SrcColumns.empty()) {
- errorMessage = "Cannot upsert CSV: no src columns";
- return false;
- }
-
auto& data = GetSourceData();
auto& cvsSettings = GetCsvSettings();
ui32 skipRows = cvsSettings.skip_rows();
@@ -624,11 +619,8 @@ private:
auto& nullValue = cvsSettings.null_value();
bool withHeader = cvsSettings.header();
- ui32 blockSize = NFormats::TArrowCSV::DEFAULT_BLOCK_SIZE;
- if (data.size() >= blockSize) {
- blockSize *= data.size() / blockSize + 1;
- }
- NFormats::TArrowCSV reader(SrcColumns, skipRows, withHeader, blockSize);
+ NFormats::TArrowCSV reader(SrcColumns, withHeader);
+ reader.SetSkipRows(skipRows);
if (!delimiter.empty()) {
if (delimiter.size() != 1) {
@@ -643,16 +635,19 @@ private:
reader.SetNullValue(nullValue);
}
- Batch = reader.ReadNext(data, errorMessage);
+ if (data.size() > NFormats::TArrowCSV::DEFAULT_BLOCK_SIZE) {
+ ui32 blockSize = NFormats::TArrowCSV::DEFAULT_BLOCK_SIZE;
+ blockSize *= data.size() / blockSize + 1;
+ reader.SetBlockSize(blockSize);
+ }
+
+ Batch = reader.ReadSingleBatch(data, errorMessage);
if (!Batch) {
- if (errorMessage.empty()) {
- errorMessage = "Cannot read CSV data";
- }
return false;
}
- if (reader.ReadNext(data, errorMessage)) {
- errorMessage = "Too big CSV batch";
+ if (!Batch->num_rows()) {
+ errorMessage = "No rows in CSV";
return false;
}
diff --git a/ydb/core/io_formats/CMakeLists.txt b/ydb/core/io_formats/CMakeLists.txt
index 45aa4994beb..6e8e51e9673 100644
--- a/ydb/core/io_formats/CMakeLists.txt
+++ b/ydb/core/io_formats/CMakeLists.txt
@@ -6,6 +6,7 @@
# original buildsystem will not be accepted.
+add_subdirectory(ut)
add_library(ydb-core-io_formats)
target_compile_options(ydb-core-io_formats PRIVATE
diff --git a/ydb/core/io_formats/csv.h b/ydb/core/io_formats/csv.h
index 0924ac16850..8e9ef79e254 100644
--- a/ydb/core/io_formats/csv.h
+++ b/ydb/core/io_formats/csv.h
@@ -23,17 +23,27 @@ public:
/// If header is true read column names from first line after skipRows. Parse columns as strings in this case.
/// @note It's possible to skip header with skipRows and use typed columns instead.
- TArrowCSV(const TVector<std::pair<TString, NScheme::TTypeInfo>>& columns,
- ui32 skipRows = 0, bool header = false, ui32 blockSize = DEFAULT_BLOCK_SIZE);
+ TArrowCSV(const TVector<std::pair<TString, NScheme::TTypeInfo>>& columns, bool header = false);
std::shared_ptr<arrow::RecordBatch> ReadNext(const TString& csv, TString& errString);
+ std::shared_ptr<arrow::RecordBatch> ReadSingleBatch(const TString& csv, TString& errString);
void Reset() {
Reader = {};
}
- void SetDelimiter(char delimiter = ',') {
- ParseOptions.delimiter = delimiter;
+ void SetSkipRows(ui32 skipRows) {
+ ReadOptions.skip_rows = skipRows;
+ }
+
+ void SetBlockSize(ui32 blockSize = DEFAULT_BLOCK_SIZE) {
+ ReadOptions.block_size = blockSize;
+ }
+
+ void SetDelimiter(std::optional<char> delimiter) {
+ if (delimiter) {
+ ParseOptions.delimiter = *delimiter;
+ }
}
void SetQuoting(bool quoting = true, char quoteChar = '"', bool doubleQuote = true) {
@@ -47,16 +57,10 @@ public:
ParseOptions.escape_char = escapeChar;
}
- void SetNullValue(const TString& null) {
- if (!null.empty()) {
- ConvertOptions.null_values = { std::string(null.data(), null.size()) };
- ConvertOptions.strings_can_be_null = true;
- ConvertOptions.quoted_strings_can_be_null = true;
- } else {
- ConvertOptions.null_values.clear();
- ConvertOptions.strings_can_be_null = false;
- ConvertOptions.quoted_strings_can_be_null = true;
- }
+ void SetNullValue(const TString& null = "") {
+ ConvertOptions.null_values = { std::string(null.data(), null.size()) };
+ ConvertOptions.strings_can_be_null = true;
+ ConvertOptions.quoted_strings_can_be_null = false;
}
private:
@@ -65,6 +69,10 @@ private:
arrow::csv::ConvertOptions ConvertOptions;
std::shared_ptr<arrow::csv::StreamingReader> Reader;
std::vector<TString> ResultColumns;
+
+ static TString ErrorPrefix() {
+ return "Cannot read CSV: ";
+ }
};
}
diff --git a/ydb/core/io_formats/csv_arrow.cpp b/ydb/core/io_formats/csv_arrow.cpp
index e4a42f7f802..78a1a300ff1 100644
--- a/ydb/core/io_formats/csv_arrow.cpp
+++ b/ydb/core/io_formats/csv_arrow.cpp
@@ -3,16 +3,14 @@
namespace NKikimr::NFormats {
-TArrowCSV::TArrowCSV(const TVector<std::pair<TString, NScheme::TTypeInfo>>& columns, ui32 skipRows, bool header,
- ui32 blockSize)
+TArrowCSV::TArrowCSV(const TVector<std::pair<TString, NScheme::TTypeInfo>>& columns, bool header)
: ReadOptions(arrow::csv::ReadOptions::Defaults())
, ParseOptions(arrow::csv::ParseOptions::Defaults())
, ConvertOptions(arrow::csv::ConvertOptions::Defaults())
{
ConvertOptions.check_utf8 = false;
- ReadOptions.block_size = blockSize;
+ ReadOptions.block_size = DEFAULT_BLOCK_SIZE;
ReadOptions.use_threads = false;
- ReadOptions.skip_rows = skipRows;
ReadOptions.autogenerate_column_names = false;
if (header) {
// !autogenerate + column_names.empty() => read from CSV
@@ -32,26 +30,35 @@ TArrowCSV::TArrowCSV(const TVector<std::pair<TString, NScheme::TTypeInfo>>& colu
ReadOptions.column_names.push_back(columnName);
ConvertOptions.column_types[columnName] = NArrow::GetArrowType(type);
}
+#if 0
} else {
ReadOptions.autogenerate_column_names = true;
+#endif
}
+
+ SetNullValue(); // set default null value
}
std::shared_ptr<arrow::RecordBatch> TArrowCSV::ReadNext(const TString& csv, TString& errString) {
- if (!Reader && csv.Size()) {
+ if (!Reader) {
+ if (ConvertOptions.column_types.empty()) {
+ errString = ErrorPrefix() + "no columns specified";
+ return {};
+ }
+
auto buffer = std::make_shared<NArrow::TBufferOverString>(csv);
auto input = std::make_shared<arrow::io::BufferReader>(buffer);
auto res = arrow::csv::StreamingReader::Make(arrow::io::default_io_context(), input,
ReadOptions, ParseOptions, ConvertOptions);
if (!res.ok()) {
- errString = TStringBuilder() << "Cannot read CSV: " << res.status().ToString();
+ errString = ErrorPrefix() + res.status().ToString();
return {};
}
Reader = *res;
}
if (!Reader) {
- errString = "Cannot read CSV: no reader";
+ errString = ErrorPrefix() + "cannot make reader";
return {};
}
@@ -61,8 +68,24 @@ std::shared_ptr<arrow::RecordBatch> TArrowCSV::ReadNext(const TString& csv, TStr
if (batch && !ResultColumns.empty()) {
batch = NArrow::ExtractColumns(batch, ResultColumns);
if (!batch) {
- errString = "Cannot read CSV: not all result columns present";
+ errString = ErrorPrefix() + "not all result columns present";
+ }
+ }
+ return batch;
+}
+
+std::shared_ptr<arrow::RecordBatch> TArrowCSV::ReadSingleBatch(const TString& csv, TString& errString) {
+ auto batch = ReadNext(csv, errString);
+ if (!batch) {
+ if (errString.empty()) {
+ errString = ErrorPrefix();
}
+ return {};
+ }
+
+ if (ReadNext(csv, errString)) {
+ errString = ErrorPrefix() + "too big CSV data portion";
+ return {};
}
return batch;
}
diff --git a/ydb/core/io_formats/ut/CMakeLists.darwin.txt b/ydb/core/io_formats/ut/CMakeLists.darwin.txt
new file mode 100644
index 00000000000..d7632c83b9c
--- /dev/null
+++ b/ydb/core/io_formats/ut/CMakeLists.darwin.txt
@@ -0,0 +1,48 @@
+
+# This file was gererated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_executable(ydb-core-io_formats-ut)
+target_compile_options(ydb-core-io_formats-ut PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_include_directories(ydb-core-io_formats-ut PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/io_formats
+)
+target_link_libraries(ydb-core-io_formats-ut PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ library-cpp-cpuid_check
+ cpp-testing-unittest_main
+ ydb-core-io_formats
+ udf-service-exception_policy
+ yql-sql-pg_dummy
+)
+target_link_options(ydb-core-io_formats-ut PRIVATE
+ -Wl,-no_deduplicate
+ -Wl,-sdk_version,10.15
+ -fPIC
+ -fPIC
+ -framework
+ CoreFoundation
+)
+target_sources(ydb-core-io_formats-ut PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/io_formats/ut_csv.cpp
+)
+add_test(
+ NAME
+ ydb-core-io_formats-ut
+ COMMAND
+ ydb-core-io_formats-ut
+ --print-before-suite
+ --print-before-test
+ --fork-tests
+ --print-times
+ --show-fails
+)
+vcs_info(ydb-core-io_formats-ut)
diff --git a/ydb/core/io_formats/ut/CMakeLists.linux-aarch64.txt b/ydb/core/io_formats/ut/CMakeLists.linux-aarch64.txt
new file mode 100644
index 00000000000..86319d79e09
--- /dev/null
+++ b/ydb/core/io_formats/ut/CMakeLists.linux-aarch64.txt
@@ -0,0 +1,50 @@
+
+# This file was gererated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_executable(ydb-core-io_formats-ut)
+target_compile_options(ydb-core-io_formats-ut PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_include_directories(ydb-core-io_formats-ut PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/io_formats
+)
+target_link_libraries(ydb-core-io_formats-ut PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ library-cpp-lfalloc
+ cpp-testing-unittest_main
+ ydb-core-io_formats
+ udf-service-exception_policy
+ yql-sql-pg_dummy
+)
+target_link_options(ydb-core-io_formats-ut PRIVATE
+ -ldl
+ -lrt
+ -Wl,--no-as-needed
+ -fPIC
+ -fPIC
+ -lpthread
+ -lrt
+ -ldl
+)
+target_sources(ydb-core-io_formats-ut PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/io_formats/ut_csv.cpp
+)
+add_test(
+ NAME
+ ydb-core-io_formats-ut
+ COMMAND
+ ydb-core-io_formats-ut
+ --print-before-suite
+ --print-before-test
+ --fork-tests
+ --print-times
+ --show-fails
+)
+vcs_info(ydb-core-io_formats-ut)
diff --git a/ydb/core/io_formats/ut/CMakeLists.linux.txt b/ydb/core/io_formats/ut/CMakeLists.linux.txt
new file mode 100644
index 00000000000..3df69f44ed2
--- /dev/null
+++ b/ydb/core/io_formats/ut/CMakeLists.linux.txt
@@ -0,0 +1,52 @@
+
+# This file was gererated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_executable(ydb-core-io_formats-ut)
+target_compile_options(ydb-core-io_formats-ut PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_include_directories(ydb-core-io_formats-ut PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/io_formats
+)
+target_link_libraries(ydb-core-io_formats-ut PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ cpp-malloc-tcmalloc
+ libs-tcmalloc-no_percpu_cache
+ library-cpp-cpuid_check
+ cpp-testing-unittest_main
+ ydb-core-io_formats
+ udf-service-exception_policy
+ yql-sql-pg_dummy
+)
+target_link_options(ydb-core-io_formats-ut PRIVATE
+ -ldl
+ -lrt
+ -Wl,--no-as-needed
+ -fPIC
+ -fPIC
+ -lpthread
+ -lrt
+ -ldl
+)
+target_sources(ydb-core-io_formats-ut PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/io_formats/ut_csv.cpp
+)
+add_test(
+ NAME
+ ydb-core-io_formats-ut
+ COMMAND
+ ydb-core-io_formats-ut
+ --print-before-suite
+ --print-before-test
+ --fork-tests
+ --print-times
+ --show-fails
+)
+vcs_info(ydb-core-io_formats-ut)
diff --git a/ydb/core/io_formats/ut/CMakeLists.txt b/ydb/core/io_formats/ut/CMakeLists.txt
new file mode 100644
index 00000000000..3e0811fb22e
--- /dev/null
+++ b/ydb/core/io_formats/ut/CMakeLists.txt
@@ -0,0 +1,15 @@
+
+# This file was gererated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND UNIX AND NOT APPLE AND NOT ANDROID)
+ include(CMakeLists.linux-aarch64.txt)
+elseif (APPLE)
+ include(CMakeLists.darwin.txt)
+elseif (CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND UNIX AND NOT APPLE AND NOT ANDROID)
+ include(CMakeLists.linux.txt)
+endif()
diff --git a/ydb/core/io_formats/ut_csv.cpp b/ydb/core/io_formats/ut_csv.cpp
new file mode 100644
index 00000000000..38b16a67364
--- /dev/null
+++ b/ydb/core/io_formats/ut_csv.cpp
@@ -0,0 +1,276 @@
+#include "csv.h"
+
+#include <ydb/core/formats/arrow_helpers.h>
+#include <library/cpp/testing/unittest/registar.h>
+
+namespace NKikimr::NFormats {
+
+namespace {
+
+TString MakeHeader(const TVector<std::pair<TString, NScheme::TTypeInfo>>& columns, char delimiter) {
+ TString header;
+ for (auto& [name, _] : columns) {
+ header += name + delimiter;
+ }
+ if (header.size()) {
+ header.resize(header.size() - 1);
+ }
+ return header;
+}
+
+TString TestIntsData(const TVector<std::pair<TString, NScheme::TTypeInfo>>& columns, ui32 numRows,
+ char delimiter, TString endLine = "\n", bool addEmptyLine = false) {
+ TString data;
+ for (ui32 row = 0; row < numRows; ++row) {
+ if (data.size()) {
+ data.resize(data.size() - 1);
+ data += endLine;
+ }
+ for (size_t i = 0; i < columns.size(); ++i) {
+ data += ToString(row) + delimiter;
+ }
+ }
+ data.resize(data.size() - 1);
+ if (addEmptyLine) {
+ data += endLine;
+ }
+ return data;
+}
+
+std::shared_ptr<arrow::RecordBatch>
+TestReadSingleBatch(TArrowCSV& reader,
+ const TVector<std::pair<TString, NScheme::TTypeInfo>>& columns, const TString& data, ui32 numRows) {
+ TString errorMessage;
+ auto batch = reader.ReadSingleBatch(data, errorMessage);
+ if (!errorMessage.empty()) {
+ Cerr << errorMessage << "\n";
+ }
+ UNIT_ASSERT(batch);
+ UNIT_ASSERT(errorMessage.empty());
+ UNIT_ASSERT(batch->ValidateFull().ok());
+ UNIT_ASSERT_EQUAL(batch->num_rows(), numRows);
+ UNIT_ASSERT_EQUAL((size_t)batch->num_columns(), columns.size());
+
+ for (size_t i = 0; i < columns.size(); ++i) {
+ UNIT_ASSERT_EQUAL(columns[i].first, batch->schema()->field(i)->name());
+ UNIT_ASSERT(NArrow::GetArrowType(columns[i].second)->Equals(batch->schema()->field(i)->type()));
+ // TODO: check data
+ }
+ return batch;
+}
+
+std::shared_ptr<arrow::RecordBatch>
+TestReadSingleBatch(const TVector<std::pair<TString, NScheme::TTypeInfo>>& columns, const TString& data,
+ char delimiter, bool header, ui32 numRows, ui32 skipRows = 0, std::optional<char> escape = {}) {
+ TArrowCSV reader(columns, header);
+ reader.SetDelimiter(delimiter);
+ if (skipRows) {
+ reader.SetSkipRows(skipRows);
+ }
+ if (escape) {
+ reader.SetEscaping(true, *escape);
+ }
+
+ return TestReadSingleBatch(reader, columns, data, numRows);
+}
+
+}
+
+Y_UNIT_TEST_SUITE(FormatCSV) {
+ Y_UNIT_TEST(EmptyData) {
+ TString data = "";
+ TVector<std::pair<TString, NScheme::TTypeInfo>> columns;
+
+ {
+ TArrowCSV reader(columns, false);
+
+ TString errorMessage;
+ auto batch = reader.ReadNext(data, errorMessage);
+ Cerr << errorMessage << "\n";
+ UNIT_ASSERT(!batch);
+ UNIT_ASSERT(!errorMessage.empty());
+ }
+
+ {
+ columns = {
+ {"u32", NScheme::TTypeInfo(NScheme::NTypeIds::Uint32)},
+ {"i64", NScheme::TTypeInfo(NScheme::NTypeIds::Int64)}
+ };
+
+ TArrowCSV reader(columns, false);
+
+ TString errorMessage;
+ auto batch = reader.ReadNext(data, errorMessage);
+ Cerr << errorMessage << "\n";
+ UNIT_ASSERT(!batch);
+ UNIT_ASSERT(!errorMessage.empty());
+ }
+ }
+
+ Y_UNIT_TEST(Common) {
+ TVector<std::pair<TString, NScheme::TTypeInfo>> columns = {
+ {"u8", NScheme::TTypeInfo(NScheme::NTypeIds::Uint8)},
+ {"u16", NScheme::TTypeInfo(NScheme::NTypeIds::Uint16)},
+ {"u32", NScheme::TTypeInfo(NScheme::NTypeIds::Uint32)},
+ {"u64", NScheme::TTypeInfo(NScheme::NTypeIds::Uint64)},
+ {"i8", NScheme::TTypeInfo(NScheme::NTypeIds::Int8)},
+ {"i16", NScheme::TTypeInfo(NScheme::NTypeIds::Int16)},
+ {"i32", NScheme::TTypeInfo(NScheme::NTypeIds::Int32)},
+ {"i64", NScheme::TTypeInfo(NScheme::NTypeIds::Int64)}
+ };
+
+ // half of columns
+ auto uColumns = columns;
+ uColumns.resize(columns.size() / 2);
+
+ // another half of columns
+ TVector<std::pair<TString, NScheme::TTypeInfo>> sColumns(
+ columns.begin() + (columns.size() / 2), columns.end());
+
+ std::vector<char> delimiters = {',', ';', '\t'};
+ std::vector<TString> endlines = {"\n", "\r\n", "\r"};
+ bool addEmptyLine = false;
+ ui32 numRows = 10;
+
+ for (auto& endLine : endlines) {
+ for (auto delim : delimiters) {
+ // no header
+ addEmptyLine = !addEmptyLine;
+ TString csv = TestIntsData(columns, numRows, delim, endLine, addEmptyLine);
+ TestReadSingleBatch(columns, csv, delim, false, numRows);
+
+ // header, all columns
+ TString header = MakeHeader(columns, delim);
+ TestReadSingleBatch(columns, header + endLine + csv, delim, true, numRows);
+
+ // header, skip rows, all columns
+ TestReadSingleBatch(columns, TString("line1") + endLine + "line2" + endLine + header + endLine + csv,
+ delim, true, numRows, 2);
+
+ // header, some columns
+ TestReadSingleBatch(uColumns, header + endLine + csv, delim, true, numRows);
+ TestReadSingleBatch(sColumns, header + endLine + csv, delim, true, numRows);
+
+ // header, skip rows, some columns
+ TestReadSingleBatch(uColumns, endLine + header + endLine + csv, delim, true, numRows, 1);
+ TestReadSingleBatch(sColumns, endLine + header + endLine + csv, delim, true, numRows, 1);
+ }
+ }
+ }
+
+ Y_UNIT_TEST(Strings) {
+ TVector<std::pair<TString, NScheme::TTypeInfo>> columns = {
+ {"string", NScheme::TTypeInfo(NScheme::NTypeIds::String)},
+ {"utf8", NScheme::TTypeInfo(NScheme::NTypeIds::Utf8)}
+ };
+
+ // TODO: SetQuoting
+ std::vector<TString> quotes = {"\"", "\'", ""};
+
+ char delimiter = ',';
+ TString endLine = "\n";
+
+ for (auto& q : quotes) {
+ TString csv;
+ csv += q + "aaa" + q + delimiter + q + "bbbbb" + q + endLine;
+ csv += q + "123" + q + delimiter + q + "456" + q + endLine;
+ csv += q + "+-/*=" + q + delimiter + q + "~!@#$%^&*()?" + q + endLine;
+
+ TestReadSingleBatch(columns, csv, delimiter, false, 3);
+ }
+
+ for (auto& q : quotes) {
+ TString csv;
+ csv += q + "d\\'Artagnan" + q + delimiter + q + "Jeanne d'Arc" + q + endLine;
+ csv += q + "\\\'\\\"\\\'" + q + delimiter + q + "\\\"\\\'\\\"" + q + endLine;
+
+ auto batch = TestReadSingleBatch(columns, csv, delimiter, false, 2, 0, '\\');
+ for (auto& col : batch->columns()) {
+ auto& typedColumn = static_cast<arrow::BinaryArray&>(*col);
+ for (int i = 0; i < typedColumn.length(); ++i) {
+ auto view = typedColumn.GetView(i);
+ std::string_view value(view.data(), view.size());
+ Cerr << value << "\n";
+ }
+ }
+ }
+ }
+
+ Y_UNIT_TEST(Nulls) {
+ TVector<std::pair<TString, NScheme::TTypeInfo>> columns = {
+ {"u32", NScheme::TTypeInfo(NScheme::NTypeIds::Uint32)},
+ {"string", NScheme::TTypeInfo(NScheme::NTypeIds::String)},
+ {"utf8", NScheme::TTypeInfo(NScheme::NTypeIds::Utf8)}
+ };
+
+ std::vector<TString> nulls = {"", "", "\\N", "NULL"};
+ bool defaultNull = true;
+
+ char delimiter = ',';
+ TString endLine = "\n";
+ TString q = "\"";
+
+ std::string nullChar = "ᴺᵁᴸᴸ";
+
+ for (auto& null : nulls) {
+ TString csv;
+ csv += TString() + null + delimiter + q + q + delimiter + q + q + endLine;
+ csv += TString() + null + delimiter + q + null + q + delimiter + q + null + q + endLine;
+ csv += TString() + null + delimiter + null + delimiter + null + endLine;
+
+ TArrowCSV reader(columns, false);
+ if (!nulls.empty() || !defaultNull) {
+ reader.SetNullValue(null);
+ } else {
+ defaultNull = false;
+ }
+
+ auto batch = TestReadSingleBatch(reader, columns, csv, 3);
+
+ Cerr << "src:\n" << csv;
+
+ auto& ui32Column = static_cast<arrow::UInt32Array&>(*batch->columns()[0]);
+ auto& strColumn = static_cast<arrow::BinaryArray&>(*batch->columns()[1]);
+ auto& utf8Column = static_cast<arrow::StringArray&>(*batch->columns()[2]);
+
+ Cerr << "parsed:\n";
+
+ for (int i = 0; i < batch->num_rows(); ++i) {
+ if (ui32Column.IsNull(i)) {
+ Cerr << nullChar << delimiter;
+ } else {
+ Cerr << ui32Column.Value(i) << delimiter;
+ }
+
+ if (strColumn.IsNull(i)) {
+ Cerr << nullChar << delimiter;
+ } else {
+ auto view = strColumn.GetView(i);
+ std::string_view value(view.data(), view.size());
+ Cerr << value << delimiter;
+ }
+
+ if (utf8Column.IsNull(i)) {
+ Cerr << nullChar << "\n";
+ } else {
+ auto view = utf8Column.GetView(i);
+ std::string_view value(view.data(), view.size());
+ Cerr << value << "\n";
+ }
+
+ UNIT_ASSERT(ui32Column.IsNull(i));
+ UNIT_ASSERT(i == 2 || !strColumn.IsNull(i));
+ UNIT_ASSERT(i == 2 || !utf8Column.IsNull(i));
+ UNIT_ASSERT(i != 2 || strColumn.IsNull(i));
+ UNIT_ASSERT(i != 2 || utf8Column.IsNull(i));
+ }
+ }
+ }
+#if 0
+ Y_UNIT_TEST(Dates) {
+ // TODO
+ }
+#endif
+}
+
+}
diff --git a/ydb/services/ydb/ydb_bulk_upsert_olap_ut.cpp b/ydb/services/ydb/ydb_bulk_upsert_olap_ut.cpp
index 3db641abf7d..c671389d95a 100644
--- a/ydb/services/ydb/ydb_bulk_upsert_olap_ut.cpp
+++ b/ydb/services/ydb/ydb_bulk_upsert_olap_ut.cpp
@@ -11,18 +11,20 @@ using namespace NYdb;
namespace {
-ui32 ScanQuerySelect(NYdb::NTable::TTableClient client, const TString& tablePath,
- const std::vector<std::pair<TString, NYdb::EPrimitiveType>>& ydbSchema = TTestOlap::PublicSchema()) {
+std::vector<TString> ScanQuerySelect(
+ NYdb::NTable::TTableClient client, const TString& tablePath,
+ const std::vector<std::pair<TString, NYdb::EPrimitiveType>>& ydbSchema = TTestOlap::PublicSchema())
+{
auto query = Sprintf("SELECT * FROM `%s`", tablePath.c_str());
// Executes scan query
auto result = client.StreamExecuteScanQuery(query).GetValueSync();
if (!result.IsSuccess()) {
Cerr << "ScanQuery execution failure: " << result.GetIssues().ToString() << Endl;
- return 0;
+ return {};
}
- ui32 numRows = 0;
+ std::vector<TString> out;
bool eos = false;
Cout << "ScanQuery:" << Endl;
while (!eos) {
@@ -31,7 +33,7 @@ ui32 ScanQuerySelect(NYdb::NTable::TTableClient client, const TString& tablePath
eos = true;
if (!streamPart.EOS()) {
Cerr << "ScanQuery execution failure: " << streamPart.GetIssues().ToString() << Endl;
- return 0;
+ return {};
}
continue;
}
@@ -42,31 +44,50 @@ ui32 ScanQuerySelect(NYdb::NTable::TTableClient client, const TString& tablePath
TResultSetParser parser(rs);
while (parser.TryNextRow()) {
+ TStringBuilder ss;
+
for (auto& [colName, colType] : ydbSchema) {
switch (colType) {
case NYdb::EPrimitiveType::Timestamp:
- Cout << parser.ColumnParser(colName).GetOptionalTimestamp() << ", ";
+ ss << parser.ColumnParser(colName).GetOptionalTimestamp() << ",";
+ break;
+ case NYdb::EPrimitiveType::Datetime:
+ ss << parser.ColumnParser(colName).GetOptionalDatetime() << ",";
+ break;
+ case NYdb::EPrimitiveType::String: {
+ auto& col = parser.ColumnParser(colName);
+ if (col.GetKind() == TTypeParser::ETypeKind::Optional) {
+ ss << col.GetOptionalString() << ",";
+ } else {
+ ss << col.GetString() << ",";
+ }
break;
+ }
case NYdb::EPrimitiveType::Utf8:
- Cout << parser.ColumnParser(colName).GetOptionalUtf8() << ", ";
+ ss << parser.ColumnParser(colName).GetOptionalUtf8() << ",";
break;
case NYdb::EPrimitiveType::Int32:
- Cout << parser.ColumnParser(colName).GetOptionalInt32() << ", ";
+ ss << parser.ColumnParser(colName).GetOptionalInt32() << ",";
break;
case NYdb::EPrimitiveType::JsonDocument:
- Cout << parser.ColumnParser(colName).GetOptionalJsonDocument() << ", ";
+ ss << parser.ColumnParser(colName).GetOptionalJsonDocument() << ",";
break;
default:
- Cout << "<other>, ";
+ ss << "<other>,";
break;
}
}
- Cout << Endl;
- ++numRows;
+
+ out.emplace_back(TString(ss));
+ auto& str = out.back();
+ if (str.size()) {
+ str.resize(str.size() - 1);
+ }
+ Cout << str << Endl;
}
}
}
- return numRows;
+ return out;
}
}
@@ -104,8 +125,8 @@ Y_UNIT_TEST_SUITE(YdbTableBulkUpsertOlap) {
Cerr << "Upsert done: " << TInstant::Now() - start << Endl;
{ // Read all
- ui32 numRows = ScanQuerySelect(client, tablePath);
- UNIT_ASSERT_GT(numRows, 0);
+ auto rows = ScanQuerySelect(client, tablePath);
+ UNIT_ASSERT_GT(rows.size(), 0);
}
// Negatives
@@ -158,6 +179,89 @@ Y_UNIT_TEST_SUITE(YdbTableBulkUpsertOlap) {
}
}
+ Y_UNIT_TEST(UpsertCsvBug) {
+ NKikimrConfig::TAppConfig appConfig;
+ TKikimrWithGrpcAndRootSchema server(appConfig);
+ server.Server_->GetRuntime()->SetLogPriority(NKikimrServices::TX_COLUMNSHARD, NActors::NLog::PRI_DEBUG);
+
+ ui16 grpc = server.GetPort();
+ TString location = TStringBuilder() << "localhost:" << grpc;
+ auto connection = NYdb::TDriver(TDriverConfig().SetEndpoint(location));
+
+ NYdb::NTable::TTableClient client(connection);
+ auto session = client.GetSession().ExtractValueSync().GetSession();
+ TString tablePath = TTestOlap::TablePath;
+
+ { // KIKIMR-16411
+// CREATE TABLE subscriber (
+// id String NOT NULL,
+// email Utf8,
+// status String,
+// subscribed_at Datetime,
+// confirmed_at Datetime,
+// unsubscribed_at Datetime,
+// referrer Utf8,
+// language Utf8,
+// timezone Utf8,
+// ip_address String,
+// fields JsonDocument,
+// PRIMARY KEY (id)
+// );
+ std::vector<std::pair<TString, NYdb::EPrimitiveType>> schema = {
+ { "id", NYdb::EPrimitiveType::String },
+ { "email", NYdb::EPrimitiveType::Utf8 },
+ { "status", NYdb::EPrimitiveType::String },
+ { "subscribed_at", NYdb::EPrimitiveType::Datetime },
+ { "confirmed_at", NYdb::EPrimitiveType::Datetime },
+ { "unsubscribed_at", NYdb::EPrimitiveType::Datetime },
+ { "referrer", NYdb::EPrimitiveType::Utf8 },
+ { "language", NYdb::EPrimitiveType::Utf8 },
+ { "timezone", NYdb::EPrimitiveType::Utf8 },
+ { "ip_address", NYdb::EPrimitiveType::String },
+ { "fields", NYdb::EPrimitiveType::JsonDocument }
+ };
+
+ auto tableBuilder = client.GetTableBuilder();
+ for (auto& [name, type] : schema) {
+ if (name == "id") {
+ tableBuilder.AddNonNullableColumn(name, type);
+ } else {
+ tableBuilder.AddNullableColumn(name, type);
+ }
+ }
+ tableBuilder.SetPrimaryKeyColumns({"id"});
+ auto result = session.CreateTable(tablePath, tableBuilder.Build(), {}).ExtractValueSync();
+
+ UNIT_ASSERT_EQUAL(result.IsTransportError(), false);
+ UNIT_ASSERT_EQUAL(result.GetStatus(), EStatus::SUCCESS);
+
+ TString csv =
+ "id|email|status|subscribed_at|confirmed_at|unsubscribed_at|referrer|language|timezone|ip_address|fields\n"
+ "123123bs|testd|subscr|1579301930|123213123||http|ru|AsiaNovo|hello|\"{}\"\n";
+
+ Ydb::Formats::CsvSettings csvSettings;
+ csvSettings.set_header(true);
+ csvSettings.set_delimiter("|");
+
+ TString formatSettings;
+ Y_PROTOBUF_SUPPRESS_NODISCARD csvSettings.SerializeToString(&formatSettings);
+
+ NYdb::NTable::TBulkUpsertSettings upsertSettings;
+ upsertSettings.FormatSettings(formatSettings);
+
+ auto res = client.BulkUpsert(tablePath,
+ NYdb::NTable::EDataFormat::CSV, csv, {}, upsertSettings).GetValueSync();
+
+ Cerr << res.GetStatus() << Endl;
+ UNIT_ASSERT_EQUAL_C(res.GetStatus(), EStatus::SUCCESS, res.GetIssues().ToString());
+
+ auto rows = ScanQuerySelect(client, tablePath, schema);
+ UNIT_ASSERT_VALUES_EQUAL(rows.size(), 1);
+ UNIT_ASSERT_VALUES_EQUAL(rows[0],
+ "123123bs,testd,subscr,2020-01-17T22:58:50.000000Z,1973-11-27T01:52:03.000000Z,(empty maybe),http,ru,AsiaNovo,hello,{}");
+ }
+ }
+
Y_UNIT_TEST(UpsertCSV) {
NKikimrConfig::TAppConfig appConfig;
TKikimrWithGrpcAndRootSchema server(appConfig);
@@ -190,8 +294,8 @@ Y_UNIT_TEST_SUITE(YdbTableBulkUpsertOlap) {
Cerr << "Upsert done: " << TInstant::Now() - start << Endl;
{ // Read all
- ui32 numRows = ScanQuerySelect(client, tablePath);
- UNIT_ASSERT_GT(numRows, 0);
+ auto rows = ScanQuerySelect(client, tablePath);
+ UNIT_ASSERT_GT(rows.size(), 0);
}
// Negatives
@@ -339,8 +443,8 @@ Y_UNIT_TEST_SUITE(YdbTableBulkUpsertOlap) {
Cerr << "Upsert done: " << TInstant::Now() - start << Endl;
{ // Read all
- ui32 numRows = ScanQuerySelect(client, tablePath);
- UNIT_ASSERT_GT(numRows, 0);
+ auto rows = ScanQuerySelect(client, tablePath);
+ UNIT_ASSERT_GT(rows.size(), 0);
}
// Read