aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authornsofya <nsofya@yandex-team.com>2023-06-06 17:34:58 +0300
committernsofya <nsofya@yandex-team.com>2023-06-06 17:34:58 +0300
commitd0be5aa18d38add1dd73a3279bc23a1d4a3ad6a6 (patch)
treea772087ffac4399dd43d90045c2071eeb41a610d
parent7a5a8df74899e9512edd404d83d4ab73ce2de7d8 (diff)
downloadydb-d0be5aa18d38add1dd73a3279bc23a1d4a3ad6a6.tar.gz
Add null columns if not present in input batch scheme
Add null columns if not presents in input batch scheme
-rw-r--r--ydb/core/formats/arrow/arrow_helpers.cpp15
-rw-r--r--ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp35
-rw-r--r--ydb/core/tx/columnshard/columnshard__write.cpp2
-rw-r--r--ydb/core/tx/columnshard/columnshard__write_index.cpp3
-rw-r--r--ydb/core/tx/columnshard/columnshard_impl.h4
-rw-r--r--ydb/core/tx/columnshard/engines/index_info.cpp61
-rw-r--r--ydb/core/tx/columnshard/engines/index_info.h2
-rw-r--r--ydb/core/tx/columnshard/engines/portion_info.cpp61
-rw-r--r--ydb/core/tx/columnshard/engines/portion_info.h1
-rw-r--r--ydb/core/tx/columnshard/write_actor.cpp17
-rw-r--r--ydb/core/tx/tx_proxy/upload_rows_common_impl.h11
11 files changed, 120 insertions, 92 deletions
diff --git a/ydb/core/formats/arrow/arrow_helpers.cpp b/ydb/core/formats/arrow/arrow_helpers.cpp
index 21fc452453f..d248ae52dee 100644
--- a/ydb/core/formats/arrow/arrow_helpers.cpp
+++ b/ydb/core/formats/arrow/arrow_helpers.cpp
@@ -207,22 +207,23 @@ std::shared_ptr<arrow::RecordBatch> ExtractColumns(const std::shared_ptr<arrow::
for (auto& field : dstSchema->fields()) {
columns.push_back(srcBatch->GetColumnByName(field->name()));
- Y_VERIFY(columns.back());
- if (!columns.back()->type()->Equals(field->type())) {
- columns.back() = {};
- }
-
if (!columns.back()) {
if (addNotExisted) {
auto result = arrow::MakeArrayOfNull(field->type(), srcBatch->num_rows());
if (!result.ok()) {
- return {};
+ return nullptr;
}
columns.back() = *result;
} else {
- return {};
+ return nullptr;
}
}
+
+ Y_VERIFY(columns.back());
+ if (!columns.back()->type()->Equals(field->type())) {
+ AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "cannot_parse_incoming_batch")("reason", "invalid_column_type")("column", field->name())("column_type", field->type()->ToString());
+ return nullptr;
+ }
}
return arrow::RecordBatch::Make(dstSchema, srcBatch->num_rows(), columns);
diff --git a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp
index 64a0ff7d802..78fc70f4b0f 100644
--- a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp
+++ b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp
@@ -4821,6 +4821,13 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) {
UNIT_ASSERT_VALUES_EQUAL_C(resCommitTx.Status().GetStatus(), EStatus::SUCCESS, resCommitTx.Status().GetIssues().ToString());
}
+ void InsertBulk(const TColumnTable& table, TTestHelper::TUpdatesBuilder& updates, const EStatus opStatus = EStatus::SUCCESS) {
+ Y_UNUSED(opStatus);
+ NKikimr::Tests::NCS::THelper helper(Kikimr.GetTestServer());
+ auto batch = updates.BuildArrow();
+ helper.SendDataViaActorSystem(table.GetName(), batch);
+ }
+
void ReadData(const TString& query, const TString& expected) {
auto it = TableClient.StreamExecuteScanQuery(query).GetValueSync();
UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
@@ -4928,6 +4935,34 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) {
}
}
+ Y_UNIT_TEST(AddColumnOldSchemeBulkUpsert) {
+ TKikimrSettings runnerSettings;
+ runnerSettings.WithSampleTables = false;
+ TTestHelper testHelper(runnerSettings);
+
+ TVector<TTestHelper::TColumnSchema> schema = {
+ TTestHelper::TColumnSchema().SetName("id").SetType(NScheme::NTypeIds::Int32).SetNullable(false),
+ TTestHelper::TColumnSchema().SetName("resource_id").SetType(NScheme::NTypeIds::Utf8),
+ TTestHelper::TColumnSchema().SetName("level").SetType(NScheme::NTypeIds::Int32)
+ };
+
+ TTestHelper::TColumnTable testTable;
+
+ testTable.SetName("/Root/ColumnTableTest").SetPrimaryKey({"id"}).SetSharding({"id"}).SetSchema(schema);
+ testHelper.CreateTable(testTable);
+ {
+ auto alterQuery = TStringBuilder() << "ALTER TABLE `" << testTable.GetName() << "` ADD COLUMN new_column Uint64;";
+ auto alterResult = testHelper.GetSession().ExecuteSchemeQuery(alterQuery).GetValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), EStatus::SUCCESS, alterResult.GetIssues().ToString());
+ }
+ {
+ TTestHelper::TUpdatesBuilder tableInserter(testTable.GetArrowSchema(schema));
+ tableInserter.AddRow().Add(1).Add("test_res_1").AddNull();
+ testHelper.InsertBulk(testTable, tableInserter, EStatus::SUCCESS);
+ }
+ testHelper.ReadData("SELECT * FROM `/Root/ColumnTableTest` WHERE id=1", "[[1;#;#;[\"test_res_1\"]]]");
+ }
+
Y_UNIT_TEST(AddColumnOnSchemeChange) {
TKikimrSettings runnerSettings;
runnerSettings.WithSampleTables = false;
diff --git a/ydb/core/tx/columnshard/columnshard__write.cpp b/ydb/core/tx/columnshard/columnshard__write.cpp
index 0dcb1a12e56..3869a4760ea 100644
--- a/ydb/core/tx/columnshard/columnshard__write.cpp
+++ b/ydb/core/tx/columnshard/columnshard__write.cpp
@@ -224,7 +224,7 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex
++WritesInFly; // write started
const auto& snapshotSchema = TablesManager.GetPrimaryIndex()->GetVersionedIndex().GetLastSchema();
- ctx.Register(CreateWriteActor(TabletID(), snapshotSchema->GetIndexInfo(), ctx.SelfID,
+ ctx.Register(CreateWriteActor(TabletID(), snapshotSchema, ctx.SelfID,
BlobManager->StartBlobBatch(), Settings.BlobWriteGrouppingEnabled, ev->Release()));
}
diff --git a/ydb/core/tx/columnshard/columnshard__write_index.cpp b/ydb/core/tx/columnshard/columnshard__write_index.cpp
index 82754ad0a72..c7b59eb4bd5 100644
--- a/ydb/core/tx/columnshard/columnshard__write_index.cpp
+++ b/ydb/core/tx/columnshard/columnshard__write_index.cpp
@@ -344,7 +344,8 @@ void TColumnShard::Handle(TEvPrivate::TEvWriteIndex::TPtr& ev, const TActorConte
LOG_S_DEBUG("WriteIndex (" << blobs.size() << " blobs) at tablet " << TabletID());
Y_VERIFY(!blobs.empty());
- ctx.Register(CreateWriteActor(TabletID(), NOlap::TIndexInfo::BuildDefault(), ctx.SelfID,
+ auto snapshotSchema = std::make_shared<NOlap::TSnapshotSchema>(NOlap::TIndexInfo::BuildDefault(), NOlap::TSnapshot::Zero());
+ ctx.Register(CreateWriteActor(TabletID(), snapshotSchema, ctx.SelfID,
BlobManager->StartBlobBatch(), Settings.BlobWriteGrouppingEnabled, ev->Release()));
}
} else {
diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h
index e83d109f13d..4670398b548 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.h
+++ b/ydb/core/tx/columnshard/columnshard_impl.h
@@ -27,10 +27,10 @@ extern bool gAllowLogBatchingDefaultValue;
IActor* CreateIndexingActor(ui64 tabletId, const TActorId& parent, const TIndexationCounters& counters);
IActor* CreateCompactionActor(ui64 tabletId, const TActorId& parent, const ui64 workers);
IActor* CreateEvictionActor(ui64 tabletId, const TActorId& parent, const TIndexationCounters& counters);
-IActor* CreateWriteActor(ui64 tabletId, const NOlap::TIndexInfo& indexTable,
+IActor* CreateWriteActor(ui64 tabletId, const NOlap::ISnapshotSchema::TPtr& snapshotSchema,
const TActorId& dstActor, TBlobBatch&& blobBatch, bool blobGrouppingEnabled,
TAutoPtr<TEvColumnShard::TEvWrite> ev, const TInstant& deadline = TInstant::Max());
-IActor* CreateWriteActor(ui64 tabletId, const NOlap::TIndexInfo& indexTable,
+IActor* CreateWriteActor(ui64 tabletId, const NOlap::ISnapshotSchema::TPtr& snapshotSchema,
const TActorId& dstActor, TBlobBatch&& blobBatch, bool blobGrouppingEnabled,
TAutoPtr<TEvPrivate::TEvWriteIndex> ev, const TInstant& deadline = TInstant::Max());
IActor* CreateReadActor(ui64 tabletId,
diff --git a/ydb/core/tx/columnshard/engines/index_info.cpp b/ydb/core/tx/columnshard/engines/index_info.cpp
index 233f69c66cb..886b67762d2 100644
--- a/ydb/core/tx/columnshard/engines/index_info.cpp
+++ b/ydb/core/tx/columnshard/engines/index_info.cpp
@@ -129,67 +129,6 @@ std::vector<TNameTypeInfo> TIndexInfo::GetColumns(const std::vector<ui32>& ids)
return NOlap::GetColumns(*this, ids);
}
-std::shared_ptr<arrow::RecordBatch> TIndexInfo::PrepareForInsert(const TString& data, const TString& metadata,
- TString& strError) const {
- std::shared_ptr<arrow::Schema> schema = ArrowSchema();
- std::shared_ptr<arrow::Schema> differentSchema;
- if (metadata.size()) {
- differentSchema = NArrow::DeserializeSchema(metadata);
- if (!differentSchema) {
- strError = "DeserializeSchema() failed";
- return {};
- }
- }
-
- auto batch = NArrow::DeserializeBatch(data, (differentSchema ? differentSchema : schema));
- if (!batch) {
- strError = "DeserializeBatch() failed";
- return {};
- }
- if (batch->num_rows() == 0) {
- strError = "empty batch";
- return {};
- }
-
- // Correct schema
- if (differentSchema) {
- batch = NArrow::ExtractColumns(batch, ArrowSchema());
- if (!batch) {
- strError = "cannot correct schema";
- return {};
- }
- }
-
- if (!batch->schema()->Equals(ArrowSchema())) {
- strError = "unexpected schema for insert batch: '" + batch->schema()->ToString() + "'";
- return {};
- }
-
- // Check PK is NOT NULL
- for (auto& field : SortingKey->fields()) {
- auto column = batch->GetColumnByName(field->name());
- if (!column) {
- strError = "missing PK column '" + field->name() + "'";
- return {};
- }
- if (NArrow::HasNulls(column)) {
- strError = "PK column '" + field->name() + "' contains NULLs";
- return {};
- }
- }
-
- auto status = batch->ValidateFull();
- if (!status.ok()) {
- strError = status.ToString();
- return {};
- }
-
- Y_VERIFY(SortingKey);
- batch = NArrow::SortBatch(batch, SortingKey);
- Y_VERIFY_DEBUG(NArrow::IsSorted(batch, SortingKey));
- return batch;
-}
-
std::shared_ptr<arrow::Schema> TIndexInfo::ArrowSchema() const {
if (!Schema) {
std::vector<ui32> ids;
diff --git a/ydb/core/tx/columnshard/engines/index_info.h b/ydb/core/tx/columnshard/engines/index_info.h
index 446ea3d108a..988f3f7b165 100644
--- a/ydb/core/tx/columnshard/engines/index_info.h
+++ b/ydb/core/tx/columnshard/engines/index_info.h
@@ -259,8 +259,6 @@ public:
std::shared_ptr<arrow::Schema> ArrowSchema(const std::vector<ui32>& columnIds, bool withSpecials = false) const;
std::shared_ptr<arrow::Schema> ArrowSchema(const std::vector<TString>& columnNames) const;
std::shared_ptr<arrow::Field> ArrowColumnField(ui32 columnId) const;
- std::shared_ptr<arrow::RecordBatch> PrepareForInsert(const TString& data, const TString& metadata,
- TString& strError) const;
const THashSet<TString>& GetRequiredColumns() const {
return RequiredColumns;
diff --git a/ydb/core/tx/columnshard/engines/portion_info.cpp b/ydb/core/tx/columnshard/engines/portion_info.cpp
index b40b25a6340..196b153fcf0 100644
--- a/ydb/core/tx/columnshard/engines/portion_info.cpp
+++ b/ydb/core/tx/columnshard/engines/portion_info.cpp
@@ -31,6 +31,67 @@ std::shared_ptr<arrow::RecordBatch> ISnapshotSchema::NormalizeBatch(const ISnaps
return arrow::RecordBatch::Make(resultArrowSchema, batch->num_rows(), newColumns);
}
+std::shared_ptr<arrow::RecordBatch> ISnapshotSchema::PrepareForInsert(const TString& data, const TString& dataSchemaStr, TString& strError) const {
+ std::shared_ptr<arrow::Schema> dstSchema = GetIndexInfo().ArrowSchema();
+ std::shared_ptr<arrow::Schema> dataSchema;
+ if (dataSchemaStr.size()) {
+ dataSchema = NArrow::DeserializeSchema(dataSchemaStr);
+ if (!dataSchema) {
+ strError = "DeserializeSchema() failed";
+ return nullptr;
+ }
+ }
+
+ auto batch = NArrow::DeserializeBatch(data, (dataSchema ? dataSchema : dstSchema));
+ if (!batch) {
+ strError = "DeserializeBatch() failed";
+ return nullptr;
+ }
+ if (batch->num_rows() == 0) {
+ strError = "empty batch";
+ return nullptr;
+ }
+
+ // Correct schema
+ if (dataSchema) {
+ batch = NArrow::ExtractColumns(batch, dstSchema, true);
+ if (!batch) {
+ strError = "cannot correct schema";
+ return nullptr;
+ }
+ }
+
+ if (!batch->schema()->Equals(dstSchema)) {
+ strError = "unexpected schema for insert batch: '" + batch->schema()->ToString() + "'";
+ return nullptr;
+ }
+
+ const auto& sortingKey = GetIndexInfo().GetSortingKey();
+ Y_VERIFY(sortingKey);
+
+ // Check PK is NOT NULL
+ for (auto& field : sortingKey->fields()) {
+ auto column = batch->GetColumnByName(field->name());
+ if (!column) {
+ strError = "missing PK column '" + field->name() + "'";
+ return nullptr;
+ }
+ if (NArrow::HasNulls(column)) {
+ strError = "PK column '" + field->name() + "' contains NULLs";
+ return nullptr;
+ }
+ }
+
+ auto status = batch->ValidateFull();
+ if (!status.ok()) {
+ strError = status.ToString();
+ return nullptr;
+ }
+ batch = NArrow::SortBatch(batch, sortingKey);
+ Y_VERIFY_DEBUG(NArrow::IsSorted(batch, sortingKey));
+ return batch;
+}
+
TString TPortionInfo::SerializeColumn(const std::shared_ptr<arrow::Array>& array,
const std::shared_ptr<arrow::Field>& field,
const TColumnSaver saver) {
diff --git a/ydb/core/tx/columnshard/engines/portion_info.h b/ydb/core/tx/columnshard/engines/portion_info.h
index ae81f94ecba..bd96e230781 100644
--- a/ydb/core/tx/columnshard/engines/portion_info.h
+++ b/ydb/core/tx/columnshard/engines/portion_info.h
@@ -49,6 +49,7 @@ public:
virtual const TSnapshot& GetSnapshot() const = 0;
std::shared_ptr<arrow::RecordBatch> NormalizeBatch(const ISnapshotSchema& dataSchema, const std::shared_ptr<arrow::RecordBatch> batch) const;
+ std::shared_ptr<arrow::RecordBatch> PrepareForInsert(const TString& data, const TString& dataSchemaStr, TString& strError) const;
};
class TSnapshotSchema: public ISnapshotSchema {
diff --git a/ydb/core/tx/columnshard/write_actor.cpp b/ydb/core/tx/columnshard/write_actor.cpp
index 7ef9b70e4ba..1d7e39c193b 100644
--- a/ydb/core/tx/columnshard/write_actor.cpp
+++ b/ydb/core/tx/columnshard/write_actor.cpp
@@ -13,7 +13,7 @@ public:
}
TWriteActor(ui64 tabletId,
- const NOlap::TIndexInfo& indexInfo,
+ const NOlap::ISnapshotSchema::TPtr& snapshotSchema,
const TActorId& dstActor,
TBlobBatch&& blobBatch,
bool blobGrouppingEnabled,
@@ -21,7 +21,7 @@ public:
TAutoPtr<TEvPrivate::TEvWriteIndex> writeIndexEv,
const TInstant& deadline)
: TabletId(tabletId)
- , IndexInfo(indexInfo)
+ , SnapshotSchema(snapshotSchema)
, DstActor(dstActor)
, BlobBatch(std::move(blobBatch))
, BlobGrouppingEnabled(blobGrouppingEnabled)
@@ -29,6 +29,7 @@ public:
, WriteIndexEv(writeIndexEv)
, Deadline(deadline)
{
+ Y_VERIFY(SnapshotSchema);
Y_VERIFY(WriteEv || WriteIndexEv);
Y_VERIFY(!WriteEv || !WriteIndexEv);
}
@@ -118,7 +119,7 @@ public:
std::shared_ptr<arrow::RecordBatch>& batch = WriteEv->WrittenBatch;
{
TCpuGuard guard(ResourceUsage);
- batch = IndexInfo.PrepareForInsert(srcData, meta, strError);
+ batch = SnapshotSchema->PrepareForInsert(srcData, meta, strError);
}
if (!batch) {
LOG_S_INFO("Bad data for writeId " << writeId << ", pathId " << pathId
@@ -275,7 +276,7 @@ public:
private:
ui64 TabletId;
- NOlap::TIndexInfo IndexInfo;
+ NOlap::ISnapshotSchema::TPtr SnapshotSchema;
TActorId DstActor;
TBlobBatch BlobBatch;
bool BlobGrouppingEnabled;
@@ -311,16 +312,16 @@ private:
} // namespace
-IActor* CreateWriteActor(ui64 tabletId, const NOlap::TIndexInfo& indexTable,
+IActor* CreateWriteActor(ui64 tabletId, const NOlap::ISnapshotSchema::TPtr& snapshotSchema,
const TActorId& dstActor, TBlobBatch&& blobBatch, bool blobGrouppingEnabled,
TAutoPtr<TEvColumnShard::TEvWrite> ev, const TInstant& deadline) {
- return new TWriteActor(tabletId, indexTable, dstActor, std::move(blobBatch), blobGrouppingEnabled, ev, {}, deadline);
+ return new TWriteActor(tabletId, snapshotSchema, dstActor, std::move(blobBatch), blobGrouppingEnabled, ev, {}, deadline);
}
-IActor* CreateWriteActor(ui64 tabletId, const NOlap::TIndexInfo& indexTable,
+IActor* CreateWriteActor(ui64 tabletId, const NOlap::ISnapshotSchema::TPtr& snapshotSchema,
const TActorId& dstActor, TBlobBatch&& blobBatch, bool blobGrouppingEnabled,
TAutoPtr<TEvPrivate::TEvWriteIndex> ev, const TInstant& deadline) {
- return new TWriteActor(tabletId, indexTable, dstActor, std::move(blobBatch), blobGrouppingEnabled, {}, ev, deadline);
+ return new TWriteActor(tabletId, snapshotSchema, dstActor, std::move(blobBatch), blobGrouppingEnabled, {}, ev, deadline);
}
}
diff --git a/ydb/core/tx/tx_proxy/upload_rows_common_impl.h b/ydb/core/tx/tx_proxy/upload_rows_common_impl.h
index 5177df23e2a..bb6a85122b7 100644
--- a/ydb/core/tx/tx_proxy/upload_rows_common_impl.h
+++ b/ydb/core/tx/tx_proxy/upload_rows_common_impl.h
@@ -729,22 +729,13 @@ private:
const auto& description = entry.ColumnTableInfo->Description;
const auto& schema = description.GetSchema();
-#if 1 // TODO: do we need this restriction?
- if ((size_t)schema.GetColumns().size() != KeyColumnPositions.size() + ValueColumnPositions.size()) {
- ReplyWithError(Ydb::StatusIds::SCHEME_ERROR,
- "Column count in the request doesn't match column count in the schema", ctx);
- return {};
- }
-#endif
std::vector<TString> outColumns;
outColumns.reserve(YdbSchema.size());
for (size_t i = 0; i < (size_t)schema.GetColumns().size(); ++i) {
auto columnId = schema.GetColumns(i).GetId();
if (!Id2Position.count(columnId)) {
- ReplyWithError(Ydb::StatusIds::SCHEME_ERROR,
- "Column id in the request doesn't match column id in the schema", ctx);
- return {};
+ continue;
}
size_t position = Id2Position[columnId];
outColumns.push_back(YdbSchema[position].first);