diff options
author | chertus <azuikov@ydb.tech> | 2023-07-28 14:45:07 +0300 |
---|---|---|
committer | chertus <azuikov@ydb.tech> | 2023-07-28 14:45:07 +0300 |
commit | 5e02559e039805fbeaabcb29658780f74e76a588 (patch) | |
tree | aac41e9ab0f327a548181efaf1cb13b3bfd978dc | |
parent | 8f1427a2b0ca7e19329ea7b7d59ea83ee9461464 (diff) | |
download | ydb-5e02559e039805fbeaabcb29658780f74e76a588.tar.gz |
KIKIMR-18866 test for bulk upsert with dup fields in batch
-rw-r--r-- | ydb/services/ydb/ydb_bulk_upsert_olap_ut.cpp | 68 |
1 files changed, 68 insertions, 0 deletions
diff --git a/ydb/services/ydb/ydb_bulk_upsert_olap_ut.cpp b/ydb/services/ydb/ydb_bulk_upsert_olap_ut.cpp index 8efc581bff..f9b8adb02c 100644 --- a/ydb/services/ydb/ydb_bulk_upsert_olap_ut.cpp +++ b/ydb/services/ydb/ydb_bulk_upsert_olap_ut.cpp @@ -195,6 +195,74 @@ Y_UNIT_TEST_SUITE(YdbTableBulkUpsertOlap) { } } + Y_UNIT_TEST(UpsertArrowDupField) { + NKikimrConfig::TAppConfig appConfig; + TKikimrWithGrpcAndRootSchema server(appConfig); + server.Server_->GetRuntime()->SetLogPriority(NKikimrServices::TX_COLUMNSHARD, NActors::NLog::PRI_DEBUG); + + ui16 grpc = server.GetPort(); + TString location = TStringBuilder() << "localhost:" << grpc; + auto connection = NYdb::TDriver(TDriverConfig().SetEndpoint(location)); + + NYdb::NTable::TTableClient client(connection); + auto session = client.GetSession().ExtractValueSync().GetSession(); + TString tablePath = TTestOlap::TablePath; + + // KIKIMR-18866 + for (ui32 i = 0; i < 2; ++i) { + // CREATE TABLE import1(a Text NOT NULL, b Text, c Text, PRIMARY KEY(a)) + std::vector<std::pair<TString, NYdb::EPrimitiveType>> ydbSchema = { + { "a", NYdb::EPrimitiveType::Utf8 }, + { "b", NYdb::EPrimitiveType::Utf8 }, + { "c", NYdb::EPrimitiveType::Utf8 } + }; + + auto tableBuilder = client.GetTableBuilder(); + for (auto& [name, type] : ydbSchema) { + if (name == "a") { + tableBuilder.AddNonNullableColumn(name, type); + } else { + tableBuilder.AddNullableColumn(name, type); + } + } + tableBuilder.SetPrimaryKeyColumns({"a"}); + auto result = session.CreateTable(tablePath, tableBuilder.Build(), {}).ExtractValueSync(); + + UNIT_ASSERT_EQUAL(result.IsTransportError(), false); + UNIT_ASSERT_EQUAL(result.GetStatus(), EStatus::SUCCESS); + + std::vector<std::shared_ptr<arrow::Field>> fields; + fields.push_back(std::make_shared<arrow::Field>("a", arrow::utf8())); + fields.push_back(std::make_shared<arrow::Field>("b", arrow::utf8())); + fields.push_back(std::make_shared<arrow::Field>("b", arrow::utf8())); // not unique column + if (i) { + fields.push_back(std::make_shared<arrow::Field>("c", arrow::utf8())); + } + + auto schema = std::make_shared<arrow::Schema>(fields); + std::unique_ptr<arrow::RecordBatchBuilder> builder; + arrow::RecordBatchBuilder::Make(schema, arrow::default_memory_pool(), 1, &builder).ok(); + for (i64 i = 0; i < schema->num_fields(); ++i) { + builder->GetFieldAs<arrow::StringBuilder>(i)->Append("test").ok(); + } + std::shared_ptr<arrow::RecordBatch> batch; + builder->Flush(false, &batch).ok(); + + TString strSchema = NArrow::SerializeSchema(*schema); + TString strBatch = NArrow::SerializeBatchNoCompression(batch); + + auto res = client.BulkUpsert(tablePath, + NYdb::NTable::EDataFormat::ApacheArrow, strBatch, strSchema).GetValueSync(); + + Cerr << res.GetStatus() << Endl; + UNIT_ASSERT_EQUAL_C(res.GetStatus(), EStatus::BAD_REQUEST, res.GetIssues().ToString()); + + result = session.DropTable(tablePath).GetValueSync(); + UNIT_ASSERT_EQUAL(result.IsTransportError(), false); + UNIT_ASSERT_EQUAL(result.GetStatus(), EStatus::SUCCESS); + } + } + Y_UNIT_TEST(ParquetImportBug) { NKikimrConfig::TAppConfig appConfig; TKikimrWithGrpcAndRootSchema server(appConfig); |