diff options
author | nsofya <nsofya@yandex-team.com> | 2023-06-06 17:34:58 +0300 |
---|---|---|
committer | nsofya <nsofya@yandex-team.com> | 2023-06-06 17:34:58 +0300 |
commit | d0be5aa18d38add1dd73a3279bc23a1d4a3ad6a6 (patch) | |
tree | a772087ffac4399dd43d90045c2071eeb41a610d | |
parent | 7a5a8df74899e9512edd404d83d4ab73ce2de7d8 (diff) | |
download | ydb-d0be5aa18d38add1dd73a3279bc23a1d4a3ad6a6.tar.gz |
Add null columns if not present in input batch scheme
Add null columns if not presents in input batch scheme
-rw-r--r-- | ydb/core/formats/arrow/arrow_helpers.cpp | 15 | ||||
-rw-r--r-- | ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp | 35 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard__write.cpp | 2 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard__write_index.cpp | 3 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard_impl.h | 4 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/index_info.cpp | 61 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/index_info.h | 2 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/portion_info.cpp | 61 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/portion_info.h | 1 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/write_actor.cpp | 17 | ||||
-rw-r--r-- | ydb/core/tx/tx_proxy/upload_rows_common_impl.h | 11 |
11 files changed, 120 insertions, 92 deletions
diff --git a/ydb/core/formats/arrow/arrow_helpers.cpp b/ydb/core/formats/arrow/arrow_helpers.cpp index 21fc452453f..d248ae52dee 100644 --- a/ydb/core/formats/arrow/arrow_helpers.cpp +++ b/ydb/core/formats/arrow/arrow_helpers.cpp @@ -207,22 +207,23 @@ std::shared_ptr<arrow::RecordBatch> ExtractColumns(const std::shared_ptr<arrow:: for (auto& field : dstSchema->fields()) { columns.push_back(srcBatch->GetColumnByName(field->name())); - Y_VERIFY(columns.back()); - if (!columns.back()->type()->Equals(field->type())) { - columns.back() = {}; - } - if (!columns.back()) { if (addNotExisted) { auto result = arrow::MakeArrayOfNull(field->type(), srcBatch->num_rows()); if (!result.ok()) { - return {}; + return nullptr; } columns.back() = *result; } else { - return {}; + return nullptr; } } + + Y_VERIFY(columns.back()); + if (!columns.back()->type()->Equals(field->type())) { + AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "cannot_parse_incoming_batch")("reason", "invalid_column_type")("column", field->name())("column_type", field->type()->ToString()); + return nullptr; + } } return arrow::RecordBatch::Make(dstSchema, srcBatch->num_rows(), columns); diff --git a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp index 64a0ff7d802..78fc70f4b0f 100644 --- a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp +++ b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp @@ -4821,6 +4821,13 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) { UNIT_ASSERT_VALUES_EQUAL_C(resCommitTx.Status().GetStatus(), EStatus::SUCCESS, resCommitTx.Status().GetIssues().ToString()); } + void InsertBulk(const TColumnTable& table, TTestHelper::TUpdatesBuilder& updates, const EStatus opStatus = EStatus::SUCCESS) { + Y_UNUSED(opStatus); + NKikimr::Tests::NCS::THelper helper(Kikimr.GetTestServer()); + auto batch = updates.BuildArrow(); + helper.SendDataViaActorSystem(table.GetName(), batch); + } + void ReadData(const TString& query, const TString& expected) { auto it = TableClient.StreamExecuteScanQuery(query).GetValueSync(); UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString()); @@ -4928,6 +4935,34 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) { } } + Y_UNIT_TEST(AddColumnOldSchemeBulkUpsert) { + TKikimrSettings runnerSettings; + runnerSettings.WithSampleTables = false; + TTestHelper testHelper(runnerSettings); + + TVector<TTestHelper::TColumnSchema> schema = { + TTestHelper::TColumnSchema().SetName("id").SetType(NScheme::NTypeIds::Int32).SetNullable(false), + TTestHelper::TColumnSchema().SetName("resource_id").SetType(NScheme::NTypeIds::Utf8), + TTestHelper::TColumnSchema().SetName("level").SetType(NScheme::NTypeIds::Int32) + }; + + TTestHelper::TColumnTable testTable; + + testTable.SetName("/Root/ColumnTableTest").SetPrimaryKey({"id"}).SetSharding({"id"}).SetSchema(schema); + testHelper.CreateTable(testTable); + { + auto alterQuery = TStringBuilder() << "ALTER TABLE `" << testTable.GetName() << "` ADD COLUMN new_column Uint64;"; + auto alterResult = testHelper.GetSession().ExecuteSchemeQuery(alterQuery).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), EStatus::SUCCESS, alterResult.GetIssues().ToString()); + } + { + TTestHelper::TUpdatesBuilder tableInserter(testTable.GetArrowSchema(schema)); + tableInserter.AddRow().Add(1).Add("test_res_1").AddNull(); + testHelper.InsertBulk(testTable, tableInserter, EStatus::SUCCESS); + } + testHelper.ReadData("SELECT * FROM `/Root/ColumnTableTest` WHERE id=1", "[[1;#;#;[\"test_res_1\"]]]"); + } + Y_UNIT_TEST(AddColumnOnSchemeChange) { TKikimrSettings runnerSettings; runnerSettings.WithSampleTables = false; diff --git a/ydb/core/tx/columnshard/columnshard__write.cpp b/ydb/core/tx/columnshard/columnshard__write.cpp index 0dcb1a12e56..3869a4760ea 100644 --- a/ydb/core/tx/columnshard/columnshard__write.cpp +++ b/ydb/core/tx/columnshard/columnshard__write.cpp @@ -224,7 +224,7 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex ++WritesInFly; // write started const auto& snapshotSchema = TablesManager.GetPrimaryIndex()->GetVersionedIndex().GetLastSchema(); - ctx.Register(CreateWriteActor(TabletID(), snapshotSchema->GetIndexInfo(), ctx.SelfID, + ctx.Register(CreateWriteActor(TabletID(), snapshotSchema, ctx.SelfID, BlobManager->StartBlobBatch(), Settings.BlobWriteGrouppingEnabled, ev->Release())); } diff --git a/ydb/core/tx/columnshard/columnshard__write_index.cpp b/ydb/core/tx/columnshard/columnshard__write_index.cpp index 82754ad0a72..c7b59eb4bd5 100644 --- a/ydb/core/tx/columnshard/columnshard__write_index.cpp +++ b/ydb/core/tx/columnshard/columnshard__write_index.cpp @@ -344,7 +344,8 @@ void TColumnShard::Handle(TEvPrivate::TEvWriteIndex::TPtr& ev, const TActorConte LOG_S_DEBUG("WriteIndex (" << blobs.size() << " blobs) at tablet " << TabletID()); Y_VERIFY(!blobs.empty()); - ctx.Register(CreateWriteActor(TabletID(), NOlap::TIndexInfo::BuildDefault(), ctx.SelfID, + auto snapshotSchema = std::make_shared<NOlap::TSnapshotSchema>(NOlap::TIndexInfo::BuildDefault(), NOlap::TSnapshot::Zero()); + ctx.Register(CreateWriteActor(TabletID(), snapshotSchema, ctx.SelfID, BlobManager->StartBlobBatch(), Settings.BlobWriteGrouppingEnabled, ev->Release())); } } else { diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h index e83d109f13d..4670398b548 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.h +++ b/ydb/core/tx/columnshard/columnshard_impl.h @@ -27,10 +27,10 @@ extern bool gAllowLogBatchingDefaultValue; IActor* CreateIndexingActor(ui64 tabletId, const TActorId& parent, const TIndexationCounters& counters); IActor* CreateCompactionActor(ui64 tabletId, const TActorId& parent, const ui64 workers); IActor* CreateEvictionActor(ui64 tabletId, const TActorId& parent, const TIndexationCounters& counters); -IActor* CreateWriteActor(ui64 tabletId, const NOlap::TIndexInfo& indexTable, +IActor* CreateWriteActor(ui64 tabletId, const NOlap::ISnapshotSchema::TPtr& snapshotSchema, const TActorId& dstActor, TBlobBatch&& blobBatch, bool blobGrouppingEnabled, TAutoPtr<TEvColumnShard::TEvWrite> ev, const TInstant& deadline = TInstant::Max()); -IActor* CreateWriteActor(ui64 tabletId, const NOlap::TIndexInfo& indexTable, +IActor* CreateWriteActor(ui64 tabletId, const NOlap::ISnapshotSchema::TPtr& snapshotSchema, const TActorId& dstActor, TBlobBatch&& blobBatch, bool blobGrouppingEnabled, TAutoPtr<TEvPrivate::TEvWriteIndex> ev, const TInstant& deadline = TInstant::Max()); IActor* CreateReadActor(ui64 tabletId, diff --git a/ydb/core/tx/columnshard/engines/index_info.cpp b/ydb/core/tx/columnshard/engines/index_info.cpp index 233f69c66cb..886b67762d2 100644 --- a/ydb/core/tx/columnshard/engines/index_info.cpp +++ b/ydb/core/tx/columnshard/engines/index_info.cpp @@ -129,67 +129,6 @@ std::vector<TNameTypeInfo> TIndexInfo::GetColumns(const std::vector<ui32>& ids) return NOlap::GetColumns(*this, ids); } -std::shared_ptr<arrow::RecordBatch> TIndexInfo::PrepareForInsert(const TString& data, const TString& metadata, - TString& strError) const { - std::shared_ptr<arrow::Schema> schema = ArrowSchema(); - std::shared_ptr<arrow::Schema> differentSchema; - if (metadata.size()) { - differentSchema = NArrow::DeserializeSchema(metadata); - if (!differentSchema) { - strError = "DeserializeSchema() failed"; - return {}; - } - } - - auto batch = NArrow::DeserializeBatch(data, (differentSchema ? differentSchema : schema)); - if (!batch) { - strError = "DeserializeBatch() failed"; - return {}; - } - if (batch->num_rows() == 0) { - strError = "empty batch"; - return {}; - } - - // Correct schema - if (differentSchema) { - batch = NArrow::ExtractColumns(batch, ArrowSchema()); - if (!batch) { - strError = "cannot correct schema"; - return {}; - } - } - - if (!batch->schema()->Equals(ArrowSchema())) { - strError = "unexpected schema for insert batch: '" + batch->schema()->ToString() + "'"; - return {}; - } - - // Check PK is NOT NULL - for (auto& field : SortingKey->fields()) { - auto column = batch->GetColumnByName(field->name()); - if (!column) { - strError = "missing PK column '" + field->name() + "'"; - return {}; - } - if (NArrow::HasNulls(column)) { - strError = "PK column '" + field->name() + "' contains NULLs"; - return {}; - } - } - - auto status = batch->ValidateFull(); - if (!status.ok()) { - strError = status.ToString(); - return {}; - } - - Y_VERIFY(SortingKey); - batch = NArrow::SortBatch(batch, SortingKey); - Y_VERIFY_DEBUG(NArrow::IsSorted(batch, SortingKey)); - return batch; -} - std::shared_ptr<arrow::Schema> TIndexInfo::ArrowSchema() const { if (!Schema) { std::vector<ui32> ids; diff --git a/ydb/core/tx/columnshard/engines/index_info.h b/ydb/core/tx/columnshard/engines/index_info.h index 446ea3d108a..988f3f7b165 100644 --- a/ydb/core/tx/columnshard/engines/index_info.h +++ b/ydb/core/tx/columnshard/engines/index_info.h @@ -259,8 +259,6 @@ public: std::shared_ptr<arrow::Schema> ArrowSchema(const std::vector<ui32>& columnIds, bool withSpecials = false) const; std::shared_ptr<arrow::Schema> ArrowSchema(const std::vector<TString>& columnNames) const; std::shared_ptr<arrow::Field> ArrowColumnField(ui32 columnId) const; - std::shared_ptr<arrow::RecordBatch> PrepareForInsert(const TString& data, const TString& metadata, - TString& strError) const; const THashSet<TString>& GetRequiredColumns() const { return RequiredColumns; diff --git a/ydb/core/tx/columnshard/engines/portion_info.cpp b/ydb/core/tx/columnshard/engines/portion_info.cpp index b40b25a6340..196b153fcf0 100644 --- a/ydb/core/tx/columnshard/engines/portion_info.cpp +++ b/ydb/core/tx/columnshard/engines/portion_info.cpp @@ -31,6 +31,67 @@ std::shared_ptr<arrow::RecordBatch> ISnapshotSchema::NormalizeBatch(const ISnaps return arrow::RecordBatch::Make(resultArrowSchema, batch->num_rows(), newColumns); } +std::shared_ptr<arrow::RecordBatch> ISnapshotSchema::PrepareForInsert(const TString& data, const TString& dataSchemaStr, TString& strError) const { + std::shared_ptr<arrow::Schema> dstSchema = GetIndexInfo().ArrowSchema(); + std::shared_ptr<arrow::Schema> dataSchema; + if (dataSchemaStr.size()) { + dataSchema = NArrow::DeserializeSchema(dataSchemaStr); + if (!dataSchema) { + strError = "DeserializeSchema() failed"; + return nullptr; + } + } + + auto batch = NArrow::DeserializeBatch(data, (dataSchema ? dataSchema : dstSchema)); + if (!batch) { + strError = "DeserializeBatch() failed"; + return nullptr; + } + if (batch->num_rows() == 0) { + strError = "empty batch"; + return nullptr; + } + + // Correct schema + if (dataSchema) { + batch = NArrow::ExtractColumns(batch, dstSchema, true); + if (!batch) { + strError = "cannot correct schema"; + return nullptr; + } + } + + if (!batch->schema()->Equals(dstSchema)) { + strError = "unexpected schema for insert batch: '" + batch->schema()->ToString() + "'"; + return nullptr; + } + + const auto& sortingKey = GetIndexInfo().GetSortingKey(); + Y_VERIFY(sortingKey); + + // Check PK is NOT NULL + for (auto& field : sortingKey->fields()) { + auto column = batch->GetColumnByName(field->name()); + if (!column) { + strError = "missing PK column '" + field->name() + "'"; + return nullptr; + } + if (NArrow::HasNulls(column)) { + strError = "PK column '" + field->name() + "' contains NULLs"; + return nullptr; + } + } + + auto status = batch->ValidateFull(); + if (!status.ok()) { + strError = status.ToString(); + return nullptr; + } + batch = NArrow::SortBatch(batch, sortingKey); + Y_VERIFY_DEBUG(NArrow::IsSorted(batch, sortingKey)); + return batch; +} + TString TPortionInfo::SerializeColumn(const std::shared_ptr<arrow::Array>& array, const std::shared_ptr<arrow::Field>& field, const TColumnSaver saver) { diff --git a/ydb/core/tx/columnshard/engines/portion_info.h b/ydb/core/tx/columnshard/engines/portion_info.h index ae81f94ecba..bd96e230781 100644 --- a/ydb/core/tx/columnshard/engines/portion_info.h +++ b/ydb/core/tx/columnshard/engines/portion_info.h @@ -49,6 +49,7 @@ public: virtual const TSnapshot& GetSnapshot() const = 0; std::shared_ptr<arrow::RecordBatch> NormalizeBatch(const ISnapshotSchema& dataSchema, const std::shared_ptr<arrow::RecordBatch> batch) const; + std::shared_ptr<arrow::RecordBatch> PrepareForInsert(const TString& data, const TString& dataSchemaStr, TString& strError) const; }; class TSnapshotSchema: public ISnapshotSchema { diff --git a/ydb/core/tx/columnshard/write_actor.cpp b/ydb/core/tx/columnshard/write_actor.cpp index 7ef9b70e4ba..1d7e39c193b 100644 --- a/ydb/core/tx/columnshard/write_actor.cpp +++ b/ydb/core/tx/columnshard/write_actor.cpp @@ -13,7 +13,7 @@ public: } TWriteActor(ui64 tabletId, - const NOlap::TIndexInfo& indexInfo, + const NOlap::ISnapshotSchema::TPtr& snapshotSchema, const TActorId& dstActor, TBlobBatch&& blobBatch, bool blobGrouppingEnabled, @@ -21,7 +21,7 @@ public: TAutoPtr<TEvPrivate::TEvWriteIndex> writeIndexEv, const TInstant& deadline) : TabletId(tabletId) - , IndexInfo(indexInfo) + , SnapshotSchema(snapshotSchema) , DstActor(dstActor) , BlobBatch(std::move(blobBatch)) , BlobGrouppingEnabled(blobGrouppingEnabled) @@ -29,6 +29,7 @@ public: , WriteIndexEv(writeIndexEv) , Deadline(deadline) { + Y_VERIFY(SnapshotSchema); Y_VERIFY(WriteEv || WriteIndexEv); Y_VERIFY(!WriteEv || !WriteIndexEv); } @@ -118,7 +119,7 @@ public: std::shared_ptr<arrow::RecordBatch>& batch = WriteEv->WrittenBatch; { TCpuGuard guard(ResourceUsage); - batch = IndexInfo.PrepareForInsert(srcData, meta, strError); + batch = SnapshotSchema->PrepareForInsert(srcData, meta, strError); } if (!batch) { LOG_S_INFO("Bad data for writeId " << writeId << ", pathId " << pathId @@ -275,7 +276,7 @@ public: private: ui64 TabletId; - NOlap::TIndexInfo IndexInfo; + NOlap::ISnapshotSchema::TPtr SnapshotSchema; TActorId DstActor; TBlobBatch BlobBatch; bool BlobGrouppingEnabled; @@ -311,16 +312,16 @@ private: } // namespace -IActor* CreateWriteActor(ui64 tabletId, const NOlap::TIndexInfo& indexTable, +IActor* CreateWriteActor(ui64 tabletId, const NOlap::ISnapshotSchema::TPtr& snapshotSchema, const TActorId& dstActor, TBlobBatch&& blobBatch, bool blobGrouppingEnabled, TAutoPtr<TEvColumnShard::TEvWrite> ev, const TInstant& deadline) { - return new TWriteActor(tabletId, indexTable, dstActor, std::move(blobBatch), blobGrouppingEnabled, ev, {}, deadline); + return new TWriteActor(tabletId, snapshotSchema, dstActor, std::move(blobBatch), blobGrouppingEnabled, ev, {}, deadline); } -IActor* CreateWriteActor(ui64 tabletId, const NOlap::TIndexInfo& indexTable, +IActor* CreateWriteActor(ui64 tabletId, const NOlap::ISnapshotSchema::TPtr& snapshotSchema, const TActorId& dstActor, TBlobBatch&& blobBatch, bool blobGrouppingEnabled, TAutoPtr<TEvPrivate::TEvWriteIndex> ev, const TInstant& deadline) { - return new TWriteActor(tabletId, indexTable, dstActor, std::move(blobBatch), blobGrouppingEnabled, {}, ev, deadline); + return new TWriteActor(tabletId, snapshotSchema, dstActor, std::move(blobBatch), blobGrouppingEnabled, {}, ev, deadline); } } diff --git a/ydb/core/tx/tx_proxy/upload_rows_common_impl.h b/ydb/core/tx/tx_proxy/upload_rows_common_impl.h index 5177df23e2a..bb6a85122b7 100644 --- a/ydb/core/tx/tx_proxy/upload_rows_common_impl.h +++ b/ydb/core/tx/tx_proxy/upload_rows_common_impl.h @@ -729,22 +729,13 @@ private: const auto& description = entry.ColumnTableInfo->Description; const auto& schema = description.GetSchema(); -#if 1 // TODO: do we need this restriction? - if ((size_t)schema.GetColumns().size() != KeyColumnPositions.size() + ValueColumnPositions.size()) { - ReplyWithError(Ydb::StatusIds::SCHEME_ERROR, - "Column count in the request doesn't match column count in the schema", ctx); - return {}; - } -#endif std::vector<TString> outColumns; outColumns.reserve(YdbSchema.size()); for (size_t i = 0; i < (size_t)schema.GetColumns().size(); ++i) { auto columnId = schema.GetColumns(i).GetId(); if (!Id2Position.count(columnId)) { - ReplyWithError(Ydb::StatusIds::SCHEME_ERROR, - "Column id in the request doesn't match column id in the schema", ctx); - return {}; + continue; } size_t position = Id2Position[columnId]; outColumns.push_back(YdbSchema[position].first); |