aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--ydb/core/io_formats/csv_arrow.cpp41
-rw-r--r--ydb/services/ydb/ydb_bulk_upsert_olap_ut.cpp55
2 files changed, 84 insertions, 12 deletions
diff --git a/ydb/core/io_formats/csv_arrow.cpp b/ydb/core/io_formats/csv_arrow.cpp
index 8834b02674..71e7ec31f5 100644
--- a/ydb/core/io_formats/csv_arrow.cpp
+++ b/ydb/core/io_formats/csv_arrow.cpp
@@ -86,23 +86,16 @@ std::shared_ptr<arrow::RecordBatch> TArrowCSV::ConvertColumnTypes(std::shared_pt
if (!parsedBatch) {
return parsedBatch;
}
- std::shared_ptr<arrow::Schema> schema;
- {
- arrow::SchemaBuilder sBuilder;
- for (auto&& f : parsedBatch->schema()->fields()) {
- Y_VERIFY(sBuilder.AddField(std::make_shared<arrow::Field>(f->name(), f->type())).ok());
- }
- auto resultSchema = sBuilder.Finish();
- Y_VERIFY(resultSchema.ok());
- schema = *resultSchema;
- }
+ const auto& schema = parsedBatch->schema();
std::vector<std::shared_ptr<arrow::Array>> resultColumns;
std::set<std::string> columnsFilter(ResultColumns.begin(), ResultColumns.end());
arrow::SchemaBuilder sBuilderFixed;
- for (auto&& f : schema->fields()) {
+
+ for (const auto& f : schema->fields()) {
auto fArr = parsedBatch->GetColumnByName(f->name());
+ Y_VERIFY(fArr);
std::shared_ptr<arrow::DataType> originalType;
if (columnsFilter.contains(f->name()) || columnsFilter.empty()) {
auto it = OriginalColumnTypes.find(f->name());
@@ -180,9 +173,33 @@ std::shared_ptr<arrow::RecordBatch> TArrowCSV::ReadNext(const TString& csv, TStr
}
std::shared_ptr<arrow::RecordBatch> batchParsed;
- Reader->ReadNext(&batchParsed).ok();
+ auto res = Reader->ReadNext(&batchParsed); // keep the status instead of discarding it
+ if (!res.ok()) {
+ errString = ErrorPrefix() + res.ToString();
+ return {};
+ }
+
+ if (batchParsed) {
+ if (const auto parsedStatus = batchParsed->Validate(); !parsedStatus.ok()) { // validate once and reuse the status for the message
+ errString = ErrorPrefix() + parsedStatus.ToString();
+ return {};
+ }
+ if (!batchParsed->schema()->HasDistinctFieldNames()) { // reject the batch before ConvertColumnTypes looks columns up by name
+ errString = ErrorPrefix() + "duplicate column names:";
+ for (const auto& field : batchParsed->schema()->fields()) {
+ if (batchParsed->schema()->GetFieldIndex(field->name()) == -1) { // the name comes from this schema, so -1 here means it is ambiguous (repeated)
+ errString += " '" + field->name() + "'"; // appended once per occurrence of the repeated name
+ }
+ }
+ return {};
+ }
+ }
std::shared_ptr<arrow::RecordBatch> batch = ConvertColumnTypes(batchParsed);
+ if (const auto convertedStatus = batch ? batch->Validate() : arrow::Status::OK(); !convertedStatus.ok()) { // a null batch is passed through; otherwise validate once and reuse the status
+ errString = ErrorPrefix() + convertedStatus.ToString();
+ return {};
+ }
if (batch && !ResultColumns.empty()) {
batch = NArrow::ExtractColumns(batch, ResultColumns);
diff --git a/ydb/services/ydb/ydb_bulk_upsert_olap_ut.cpp b/ydb/services/ydb/ydb_bulk_upsert_olap_ut.cpp
index 8a7977b478..8efc581bff 100644
--- a/ydb/services/ydb/ydb_bulk_upsert_olap_ut.cpp
+++ b/ydb/services/ydb/ydb_bulk_upsert_olap_ut.cpp
@@ -356,6 +356,61 @@ Y_UNIT_TEST_SUITE(YdbTableBulkUpsertOlap) {
UNIT_ASSERT_VALUES_EQUAL(rows.size(), 1);
UNIT_ASSERT_VALUES_EQUAL(rows[0],
"123123bs,testd,subscr,2020-01-17T22:58:50.000000Z,1973-11-27T01:52:03.000000Z,(empty maybe),http,ru,AsiaNovo,hello,{}");
+
+ result = session.DropTable(tablePath).GetValueSync();
+ UNIT_ASSERT_EQUAL(result.IsTransportError(), false);
+ UNIT_ASSERT_EQUAL(result.GetStatus(), EStatus::SUCCESS);
+ }
+
+ // KIKIMR-18859: bulk upsert of a CSV whose header repeats a column name is expected to fail with BAD_REQUEST.
+ {
+ // CREATE TABLE import1(a Text NOT NULL, b Text, c Text, PRIMARY KEY(a))
+ std::vector<std::pair<TString, NYdb::EPrimitiveType>> schema = {
+ { "a", NYdb::EPrimitiveType::Utf8 },
+ { "b", NYdb::EPrimitiveType::Utf8 },
+ { "c", NYdb::EPrimitiveType::Utf8 }
+ };
+
+ auto tableBuilder = client.GetTableBuilder();
+ for (auto& [name, type] : schema) {
+ if (name == "a") { // "a" is the primary key column and the only non-nullable one
+ tableBuilder.AddNonNullableColumn(name, type);
+ } else {
+ tableBuilder.AddNullableColumn(name, type);
+ }
+ }
+ tableBuilder.SetPrimaryKeyColumns({"a"});
+ auto result = session.CreateTable(tablePath, tableBuilder.Build(), {}).ExtractValueSync();
+
+ UNIT_ASSERT_EQUAL(result.IsTransportError(), false);
+ UNIT_ASSERT_EQUAL(result.GetStatus(), EStatus::SUCCESS);
+
+ // for i in `seq 1 9999`; do echo 'aaa'"$i"',bbb,ccc'; done | ydb import file csv -p import1 --columns "a,b,b"
+ TString csv;
+ csv += "a,b,b\n"; // wrong header: column "b" is listed twice and "c" is missing
+ for (size_t i = 0; i < 10000; ++i) { // 10000 data rows of the form "aaa<i>,bbb,ccc"
+ csv += TString("aaa") + ToString(i) + ",bbb,ccc\n";
+ }
+
+ Ydb::Formats::CsvSettings csvSettings;
+ csvSettings.set_header(true); // the first CSV line is the header built above
+ csvSettings.set_delimiter(",");
+
+ TString formatSettings;
+ Y_PROTOBUF_SUPPRESS_NODISCARD csvSettings.SerializeToString(&formatSettings); // serialization result is deliberately not checked
+
+ NYdb::NTable::TBulkUpsertSettings upsertSettings;
+ upsertSettings.FormatSettings(formatSettings);
+
+ auto res = client.BulkUpsert(tablePath,
+ NYdb::NTable::EDataFormat::CSV, csv, {}, upsertSettings).GetValueSync();
+
+ Cerr << res.GetStatus() << Endl; // writes the resulting status to Cerr for the test log
+ UNIT_ASSERT_EQUAL_C(res.GetStatus(), EStatus::BAD_REQUEST, res.GetIssues().ToString()); // the issues text is attached to the failure message
+
+ result = session.DropTable(tablePath).GetValueSync(); // drop the table created for this case
+ UNIT_ASSERT_EQUAL(result.IsTransportError(), false);
+ UNIT_ASSERT_EQUAL(result.GetStatus(), EStatus::SUCCESS);
}