diff options
-rw-r--r-- | ydb/core/io_formats/csv_arrow.cpp | 41 | ||||
-rw-r--r-- | ydb/services/ydb/ydb_bulk_upsert_olap_ut.cpp | 55 |
2 files changed, 84 insertions, 12 deletions
diff --git a/ydb/core/io_formats/csv_arrow.cpp b/ydb/core/io_formats/csv_arrow.cpp index 8834b02674..71e7ec31f5 100644 --- a/ydb/core/io_formats/csv_arrow.cpp +++ b/ydb/core/io_formats/csv_arrow.cpp @@ -86,23 +86,16 @@ std::shared_ptr<arrow::RecordBatch> TArrowCSV::ConvertColumnTypes(std::shared_pt if (!parsedBatch) { return parsedBatch; } - std::shared_ptr<arrow::Schema> schema; - { - arrow::SchemaBuilder sBuilder; - for (auto&& f : parsedBatch->schema()->fields()) { - Y_VERIFY(sBuilder.AddField(std::make_shared<arrow::Field>(f->name(), f->type())).ok()); - } - auto resultSchema = sBuilder.Finish(); - Y_VERIFY(resultSchema.ok()); - schema = *resultSchema; - } + const auto& schema = parsedBatch->schema(); std::vector<std::shared_ptr<arrow::Array>> resultColumns; std::set<std::string> columnsFilter(ResultColumns.begin(), ResultColumns.end()); arrow::SchemaBuilder sBuilderFixed; - for (auto&& f : schema->fields()) { + + for (const auto& f : schema->fields()) { auto fArr = parsedBatch->GetColumnByName(f->name()); + Y_VERIFY(fArr); std::shared_ptr<arrow::DataType> originalType; if (columnsFilter.contains(f->name()) || columnsFilter.empty()) { auto it = OriginalColumnTypes.find(f->name()); @@ -180,9 +173,33 @@ std::shared_ptr<arrow::RecordBatch> TArrowCSV::ReadNext(const TString& csv, TStr } std::shared_ptr<arrow::RecordBatch> batchParsed; - Reader->ReadNext(&batchParsed).ok(); + auto res = Reader->ReadNext(&batchParsed); + if (!res.ok()) { + errString = ErrorPrefix() + res.ToString(); + return {}; + } + + if (batchParsed) { + if (!batchParsed->Validate().ok()) { + errString = ErrorPrefix() + batchParsed->Validate().ToString(); + return {}; + } + if (!batchParsed->schema()->HasDistinctFieldNames()) { + errString = ErrorPrefix() + "duplicate column names:"; + for (auto& field : batchParsed->schema()->fields()) { + if (batchParsed->schema()->GetFieldIndex(field->name()) == -1) { + errString += " '" + field->name() + "'"; + } + } + return {}; + } + } std::shared_ptr<arrow::RecordBatch> batch = ConvertColumnTypes(batchParsed); + if (batch && !batch->Validate().ok()) { + errString = ErrorPrefix() + batch->Validate().ToString(); + return {}; + } if (batch && !ResultColumns.empty()) { batch = NArrow::ExtractColumns(batch, ResultColumns); diff --git a/ydb/services/ydb/ydb_bulk_upsert_olap_ut.cpp b/ydb/services/ydb/ydb_bulk_upsert_olap_ut.cpp index 8a7977b478..8efc581bff 100644 --- a/ydb/services/ydb/ydb_bulk_upsert_olap_ut.cpp +++ b/ydb/services/ydb/ydb_bulk_upsert_olap_ut.cpp @@ -356,6 +356,61 @@ Y_UNIT_TEST_SUITE(YdbTableBulkUpsertOlap) { UNIT_ASSERT_VALUES_EQUAL(rows.size(), 1); UNIT_ASSERT_VALUES_EQUAL(rows[0], "123123bs,testd,subscr,2020-01-17T22:58:50.000000Z,1973-11-27T01:52:03.000000Z,(empty maybe),http,ru,AsiaNovo,hello,{}"); + + result = session.DropTable(tablePath).GetValueSync(); + UNIT_ASSERT_EQUAL(result.IsTransportError(), false); + UNIT_ASSERT_EQUAL(result.GetStatus(), EStatus::SUCCESS); + } + + // KIKIMR-18859 + { + // CREATE TABLE import1(a Text NOT NULL, b Text, c Text, PRIMARY KEY(a)) + std::vector<std::pair<TString, NYdb::EPrimitiveType>> schema = { + { "a", NYdb::EPrimitiveType::Utf8 }, + { "b", NYdb::EPrimitiveType::Utf8 }, + { "c", NYdb::EPrimitiveType::Utf8 } + }; + + auto tableBuilder = client.GetTableBuilder(); + for (auto& [name, type] : schema) { + if (name == "a") { + tableBuilder.AddNonNullableColumn(name, type); + } else { + tableBuilder.AddNullableColumn(name, type); + } + } + tableBuilder.SetPrimaryKeyColumns({"a"}); + auto result = session.CreateTable(tablePath, tableBuilder.Build(), {}).ExtractValueSync(); + + UNIT_ASSERT_EQUAL(result.IsTransportError(), false); + UNIT_ASSERT_EQUAL(result.GetStatus(), EStatus::SUCCESS); + + // for i in `seq 1 9999`; do echo 'aaa'"$i"',bbb,ccc'; done | ydb import file csv -p import1 --columns "a,b,b" + TString csv; + csv += "a,b,b\n"; // wrong header + for (size_t i = 0; i < 10000; ++i) { + csv += TString("aaa") + ToString(i) + ",bbb,ccc\n"; + } + + Ydb::Formats::CsvSettings csvSettings; + csvSettings.set_header(true); + csvSettings.set_delimiter(","); + + TString formatSettings; + Y_PROTOBUF_SUPPRESS_NODISCARD csvSettings.SerializeToString(&formatSettings); + + NYdb::NTable::TBulkUpsertSettings upsertSettings; + upsertSettings.FormatSettings(formatSettings); + + auto res = client.BulkUpsert(tablePath, + NYdb::NTable::EDataFormat::CSV, csv, {}, upsertSettings).GetValueSync(); + + Cerr << res.GetStatus() << Endl; + UNIT_ASSERT_EQUAL_C(res.GetStatus(), EStatus::BAD_REQUEST, res.GetIssues().ToString()); + + result = session.DropTable(tablePath).GetValueSync(); + UNIT_ASSERT_EQUAL(result.IsTransportError(), false); + UNIT_ASSERT_EQUAL(result.GetStatus(), EStatus::SUCCESS); } } |