aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorchertus <azuikov@ydb.tech>2023-07-28 14:45:07 +0300
committerchertus <azuikov@ydb.tech>2023-07-28 14:45:07 +0300
commit5e02559e039805fbeaabcb29658780f74e76a588 (patch)
treeaac41e9ab0f327a548181efaf1cb13b3bfd978dc
parent8f1427a2b0ca7e19329ea7b7d59ea83ee9461464 (diff)
downloadydb-5e02559e039805fbeaabcb29658780f74e76a588.tar.gz
KIKIMR-18866 test for bulk upsert with dup fields in batch
-rw-r--r--ydb/services/ydb/ydb_bulk_upsert_olap_ut.cpp68
1 files changed, 68 insertions, 0 deletions
diff --git a/ydb/services/ydb/ydb_bulk_upsert_olap_ut.cpp b/ydb/services/ydb/ydb_bulk_upsert_olap_ut.cpp
index 8efc581bff..f9b8adb02c 100644
--- a/ydb/services/ydb/ydb_bulk_upsert_olap_ut.cpp
+++ b/ydb/services/ydb/ydb_bulk_upsert_olap_ut.cpp
@@ -195,6 +195,74 @@ Y_UNIT_TEST_SUITE(YdbTableBulkUpsertOlap) {
}
}
+ Y_UNIT_TEST(UpsertArrowDupField) {
+ NKikimrConfig::TAppConfig appConfig;
+ TKikimrWithGrpcAndRootSchema server(appConfig);
+ server.Server_->GetRuntime()->SetLogPriority(NKikimrServices::TX_COLUMNSHARD, NActors::NLog::PRI_DEBUG);
+
+ ui16 grpc = server.GetPort();
+ TString location = TStringBuilder() << "localhost:" << grpc;
+ auto connection = NYdb::TDriver(TDriverConfig().SetEndpoint(location));
+
+ NYdb::NTable::TTableClient client(connection);
+ auto session = client.GetSession().ExtractValueSync().GetSession();
+ TString tablePath = TTestOlap::TablePath;
+
+ // KIKIMR-18866
+ for (ui32 i = 0; i < 2; ++i) {
+ // CREATE TABLE import1(a Text NOT NULL, b Text, c Text, PRIMARY KEY(a))
+ std::vector<std::pair<TString, NYdb::EPrimitiveType>> ydbSchema = {
+ { "a", NYdb::EPrimitiveType::Utf8 },
+ { "b", NYdb::EPrimitiveType::Utf8 },
+ { "c", NYdb::EPrimitiveType::Utf8 }
+ };
+
+ auto tableBuilder = client.GetTableBuilder();
+ for (auto& [name, type] : ydbSchema) {
+ if (name == "a") {
+ tableBuilder.AddNonNullableColumn(name, type);
+ } else {
+ tableBuilder.AddNullableColumn(name, type);
+ }
+ }
+ tableBuilder.SetPrimaryKeyColumns({"a"});
+ auto result = session.CreateTable(tablePath, tableBuilder.Build(), {}).ExtractValueSync();
+
+ UNIT_ASSERT_EQUAL(result.IsTransportError(), false);
+ UNIT_ASSERT_EQUAL(result.GetStatus(), EStatus::SUCCESS);
+
+ std::vector<std::shared_ptr<arrow::Field>> fields;
+ fields.push_back(std::make_shared<arrow::Field>("a", arrow::utf8()));
+ fields.push_back(std::make_shared<arrow::Field>("b", arrow::utf8()));
+ fields.push_back(std::make_shared<arrow::Field>("b", arrow::utf8())); // not unique column
+ if (i) {
+ fields.push_back(std::make_shared<arrow::Field>("c", arrow::utf8()));
+ }
+
+ auto schema = std::make_shared<arrow::Schema>(fields);
+ std::unique_ptr<arrow::RecordBatchBuilder> builder;
+ arrow::RecordBatchBuilder::Make(schema, arrow::default_memory_pool(), 1, &builder).ok();
+ for (i64 i = 0; i < schema->num_fields(); ++i) {
+ builder->GetFieldAs<arrow::StringBuilder>(i)->Append("test").ok();
+ }
+ std::shared_ptr<arrow::RecordBatch> batch;
+ builder->Flush(false, &batch).ok();
+
+ TString strSchema = NArrow::SerializeSchema(*schema);
+ TString strBatch = NArrow::SerializeBatchNoCompression(batch);
+
+ auto res = client.BulkUpsert(tablePath,
+ NYdb::NTable::EDataFormat::ApacheArrow, strBatch, strSchema).GetValueSync();
+
+ Cerr << res.GetStatus() << Endl;
+ UNIT_ASSERT_EQUAL_C(res.GetStatus(), EStatus::BAD_REQUEST, res.GetIssues().ToString());
+
+ result = session.DropTable(tablePath).GetValueSync();
+ UNIT_ASSERT_EQUAL(result.IsTransportError(), false);
+ UNIT_ASSERT_EQUAL(result.GetStatus(), EStatus::SUCCESS);
+ }
+ }
+
Y_UNIT_TEST(ParquetImportBug) {
NKikimrConfig::TAppConfig appConfig;
TKikimrWithGrpcAndRootSchema server(appConfig);