diff options
author | nsofya <nsofya@yandex-team.com> | 2023-08-05 15:18:53 +0300 |
---|---|---|
committer | nsofya <nsofya@yandex-team.com> | 2023-08-05 15:56:37 +0300 |
commit | 392403b56a9951b878388b4c0086fdffa02bb1a1 (patch) | |
tree | 89556fa6149897672cbaa3997ecc3e99623c0d88 | |
parent | b31702172dc2daefa2653d49c5eed1107126a5a8 (diff) | |
download | ydb-392403b56a9951b878388b4c0086fdffa02bb1a1.tar.gz |
KIKIMR-18343: Use schema version from kqp on write
19 files changed, 86 insertions, 26 deletions
diff --git a/ydb/core/tx/columnshard/columnshard__write.cpp b/ydb/core/tx/columnshard/columnshard__write.cpp index 39a12f5baf..027b991331 100644 --- a/ydb/core/tx/columnshard/columnshard__write.cpp +++ b/ydb/core/tx/columnshard/columnshard__write.cpp @@ -56,7 +56,9 @@ bool TTxWrite::InsertOneBlob(TTransactionContext& txc, const TEvPrivate::TEvWrit const auto& writeMeta(PutBlobResult->Get()->GetWriteMeta()); - NOlap::TInsertedData insertData((ui64)writeId, writeMeta.GetTableId(), writeMeta.GetDedupId(), logoBlobId, meta, time, PutBlobResult->Get()->GetSnapshot()); + auto tableSchema = Self->TablesManager.GetPrimaryIndex()->GetVersionedIndex().GetSchemaUnsafe(PutBlobResult->Get()->GetSchemaVersion()); + + NOlap::TInsertedData insertData((ui64)writeId, writeMeta.GetTableId(), writeMeta.GetDedupId(), logoBlobId, meta, time, tableSchema->GetSnapshot()); bool ok = Self->InsertTable->Insert(dbTable, std::move(insertData)); if (ok) { THashSet<TWriteId> writesToAbort = Self->InsertTable->OldWritesToAbort(time); @@ -292,7 +294,7 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex << WritesMonitor.DebugString() << " at tablet " << TabletID()); - auto writeController = std::make_shared<NOlap::TIndexedWriteController>(ctx.SelfID, writeData, snapshotSchema); + auto writeController = std::make_shared<NOlap::TIndexedWriteController>(ctx.SelfID, writeData); ctx.Register(CreateWriteActor(TabletID(), writeController, BlobManager->StartBlobBatch(), TInstant::Max(), Settings.MaxSmallBlobSize)); } } @@ -303,6 +305,21 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor const ui64 txId = ev->Get()->GetTxId(); const auto source = ev->Sender; + if (!record.GetTableId().HasSchemaVersion()) { + IncCounter(COUNTER_WRITE_FAIL); + auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(txId, NKikimrDataEvents::TEvWriteResult::BAD_REQUEST, "schema version not set"); + ctx.Send(source, result.release()); + return; + } + + auto schema = TablesManager.GetPrimaryIndex()->GetVersionedIndex().GetSchema(record.GetTableId().GetSchemaVersion()); + if (!schema) { + IncCounter(COUNTER_WRITE_FAIL); + auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(txId, NKikimrDataEvents::TEvWriteResult::BAD_REQUEST, "unknown schema version"); + ctx.Send(source, result.release()); + return; + } + if (!TablesManager.IsReadyForWrite(tableId)) { IncCounter(COUNTER_WRITE_FAIL); auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(txId, NKikimrDataEvents::TEvWriteResult::ERROR, "table not writable"); @@ -310,7 +327,7 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor return; } - auto arrowData = std::make_shared<TArrowData>(TablesManager.GetPrimaryIndex()->GetVersionedIndex().GetLastSchema()); + auto arrowData = std::make_shared<TArrowData>(schema); if (!arrowData->Parse(record.GetReplace(), TPayloadHelper<NEvents::TDataEvents::TEvWrite>(*ev->Get()))) { IncCounter(COUNTER_WRITE_FAIL); auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(txId, NKikimrDataEvents::TEvWriteResult::BAD_REQUEST, "parsing data error"); diff --git a/ydb/core/tx/columnshard/columnshard_private_events.h b/ydb/core/tx/columnshard/columnshard_private_events.h index af290e2556..ce18fd97bb 100644 --- a/ydb/core/tx/columnshard/columnshard_private_events.h +++ b/ydb/core/tx/columnshard/columnshard_private_events.h @@ -272,18 +272,18 @@ struct TEvPrivate { } }; - TEvWriteBlobsResult(const NColumnShard::TBlobPutResult::TPtr& putResult, const NEvWrite::TWriteMeta& writeMeta, const NOlap::TSnapshot& snapshot) + TEvWriteBlobsResult(const NColumnShard::TBlobPutResult::TPtr& putResult, const NEvWrite::TWriteMeta& writeMeta) : PutResult(putResult) , WriteMeta(writeMeta) - , Snapshot(snapshot) { Y_VERIFY(PutResult); } - TEvWriteBlobsResult(const NColumnShard::TBlobPutResult::TPtr& putResult, TVector<TPutBlobData>&& blobData, const NEvWrite::TWriteMeta& writeMeta, const NOlap::TSnapshot& snapshot) - : TEvWriteBlobsResult(putResult, writeMeta, snapshot) + TEvWriteBlobsResult(const NColumnShard::TBlobPutResult::TPtr& putResult, TVector<TPutBlobData>&& blobData, const NEvWrite::TWriteMeta& writeMeta, const ui64 schemaVersion) + : TEvWriteBlobsResult(putResult, writeMeta) { BlobData = std::move(blobData); + SchemaVersion = schemaVersion; } const TVector<TPutBlobData>& GetBlobData() const { @@ -302,15 +302,15 @@ struct TEvPrivate { return WriteMeta; } - const NOlap::TSnapshot& GetSnapshot() const { - return Snapshot; + ui64 GetSchemaVersion() const { + return SchemaVersion; } private: NColumnShard::TBlobPutResult::TPtr PutResult; TVector<TPutBlobData> BlobData; NEvWrite::TWriteMeta WriteMeta; - NOlap::TSnapshot Snapshot; + ui64 SchemaVersion = 0; }; }; diff --git a/ydb/core/tx/columnshard/engines/column_engine.h b/ydb/core/tx/columnshard/engines/column_engine.h index 761bf724a8..3b2e5f7a1f 100644 --- a/ydb/core/tx/columnshard/engines/column_engine.h +++ b/ydb/core/tx/columnshard/engines/column_engine.h @@ -305,7 +305,19 @@ public: class TVersionedIndex { std::map<TSnapshot, ISnapshotSchema::TPtr> Snapshots; std::shared_ptr<arrow::Schema> IndexKey; + std::map<ui64, ISnapshotSchema::TPtr> SnapshotByVersion; public: + ISnapshotSchema::TPtr GetSchema(const ui64 version) const { + auto it = SnapshotByVersion.find(version); + return it == SnapshotByVersion.end() ? nullptr : it->second; + } + + ISnapshotSchema::TPtr GetSchemaUnsafe(const ui64 version) const { + auto it = SnapshotByVersion.find(version); + Y_VERIFY(it != SnapshotByVersion.end()); + return it->second; + } + ISnapshotSchema::TPtr GetSchema(const TSnapshot& version) const { for (auto it = Snapshots.rbegin(); it != Snapshots.rend(); ++it) { if (it->first <= version) { @@ -332,7 +344,10 @@ public: } else { Y_VERIFY(IndexKey->Equals(indexInfo.GetIndexKey())); } - Snapshots.emplace(version, std::make_shared<TSnapshotSchema>(std::move(indexInfo), version)); + auto it = Snapshots.emplace(version, std::make_shared<TSnapshotSchema>(std::move(indexInfo), version)); + if (!SnapshotByVersion.emplace(it.first->second->GetVersion(), it.first->second).second) { + Y_VERIFY(GetLastSchema()->GetVersion() == it.first->second->GetVersion()); + } } }; diff --git a/ydb/core/tx/columnshard/engines/scheme/abstract_scheme.h b/ydb/core/tx/columnshard/engines/scheme/abstract_scheme.h index 06fd516712..02748f05c9 100644 --- a/ydb/core/tx/columnshard/engines/scheme/abstract_scheme.h +++ b/ydb/core/tx/columnshard/engines/scheme/abstract_scheme.h @@ -40,6 +40,7 @@ public: virtual const std::shared_ptr<arrow::Schema>& GetSchema() const = 0; virtual const TIndexInfo& GetIndexInfo() const = 0; virtual const TSnapshot& GetSnapshot() const = 0; + virtual ui64 GetVersion() const = 0; virtual ui32 GetColumnsCount() const = 0; std::shared_ptr<arrow::RecordBatch> NormalizeBatch(const ISnapshotSchema& dataSchema, const std::shared_ptr<arrow::RecordBatch> batch) const; diff --git a/ydb/core/tx/columnshard/engines/scheme/filtered_scheme.cpp b/ydb/core/tx/columnshard/engines/scheme/filtered_scheme.cpp index 31b8135138..61d3a8d200 100644 --- a/ydb/core/tx/columnshard/engines/scheme/filtered_scheme.cpp +++ b/ydb/core/tx/columnshard/engines/scheme/filtered_scheme.cpp @@ -63,4 +63,8 @@ ui32 TFilteredSnapshotSchema::GetColumnsCount() const { return Schema->num_fields(); } +ui64 TFilteredSnapshotSchema::GetVersion() const { + return OriginalSnapshot->GetIndexInfo().GetVersion(); +} + } diff --git a/ydb/core/tx/columnshard/engines/scheme/filtered_scheme.h b/ydb/core/tx/columnshard/engines/scheme/filtered_scheme.h index 00e754e587..9673d78061 100644 --- a/ydb/core/tx/columnshard/engines/scheme/filtered_scheme.h +++ b/ydb/core/tx/columnshard/engines/scheme/filtered_scheme.h @@ -23,6 +23,7 @@ public: const TIndexInfo& GetIndexInfo() const override; const TSnapshot& GetSnapshot() const override; ui32 GetColumnsCount() const override; + ui64 GetVersion() const override; }; } diff --git a/ydb/core/tx/columnshard/engines/scheme/index_info.cpp b/ydb/core/tx/columnshard/engines/scheme/index_info.cpp index b87054b82c..3d6b808e4e 100644 --- a/ydb/core/tx/columnshard/engines/scheme/index_info.cpp +++ b/ydb/core/tx/columnshard/engines/scheme/index_info.cpp @@ -374,6 +374,7 @@ bool TIndexInfo::DeserializeFromProto(const NKikimrSchemeOp::TColumnTableSchema& DefaultCompression = *result; } + Version = schema.GetVersion(); CompositeMarks = schema.GetCompositeMarks(); CompositeIndexKey = AppData()->FeatureFlags.GetForceColumnTablesCompositeMarks() ? true : CompositeMarks; return true; diff --git a/ydb/core/tx/columnshard/engines/scheme/index_info.h b/ydb/core/tx/columnshard/engines/scheme/index_info.h index 7bb0b9bdda..ec8835e71f 100644 --- a/ydb/core/tx/columnshard/engines/scheme/index_info.h +++ b/ydb/core/tx/columnshard/engines/scheme/index_info.h @@ -179,8 +179,13 @@ public: return result; } + ui64 GetVersion() const { + return Version; + } + private: ui32 Id; + ui64 Version = 0; TString Name; bool CompositeIndexKey = false; mutable std::shared_ptr<arrow::Schema> Schema; diff --git a/ydb/core/tx/columnshard/engines/scheme/snapshot_scheme.cpp b/ydb/core/tx/columnshard/engines/scheme/snapshot_scheme.cpp index 52d4b7d328..3058c5943c 100644 --- a/ydb/core/tx/columnshard/engines/scheme/snapshot_scheme.cpp +++ b/ydb/core/tx/columnshard/engines/scheme/snapshot_scheme.cpp @@ -46,4 +46,8 @@ ui32 TSnapshotSchema::GetColumnsCount() const { return Schema->num_fields(); } +ui64 TSnapshotSchema::GetVersion() const { + return IndexInfo.GetVersion(); +} + } diff --git a/ydb/core/tx/columnshard/engines/scheme/snapshot_scheme.h b/ydb/core/tx/columnshard/engines/scheme/snapshot_scheme.h index b72819a311..e144e7fe35 100644 --- a/ydb/core/tx/columnshard/engines/scheme/snapshot_scheme.h +++ b/ydb/core/tx/columnshard/engines/scheme/snapshot_scheme.h @@ -23,6 +23,7 @@ public: const TIndexInfo& GetIndexInfo() const override; const TSnapshot& GetSnapshot() const override; ui32 GetColumnsCount() const override; + ui64 GetVersion() const override; }; } diff --git a/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.cpp b/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.cpp index 87232608f2..0ab5872d39 100644 --- a/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.cpp +++ b/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.cpp @@ -6,9 +6,8 @@ namespace NKikimr::NOlap { -TIndexedWriteController::TBlobConstructor::TBlobConstructor(NOlap::ISnapshotSchema::TPtr snapshotSchema, TIndexedWriteController& owner) +TIndexedWriteController::TBlobConstructor::TBlobConstructor(TIndexedWriteController& owner) : Owner(owner) - , SnapshotSchema(snapshotSchema) {} const TString& TIndexedWriteController::TBlobConstructor::GetBlob() const { @@ -55,9 +54,9 @@ bool TIndexedWriteController::TBlobConstructor::Init() { return true; } -TIndexedWriteController::TIndexedWriteController(const TActorId& dstActor, const NEvWrite::TWriteData& writeData, NOlap::ISnapshotSchema::TPtr snapshotSchema) +TIndexedWriteController::TIndexedWriteController(const TActorId& dstActor, const NEvWrite::TWriteData& writeData) : WriteData(writeData) - , BlobConstructor(std::make_shared<TBlobConstructor>(snapshotSchema, *this)) + , BlobConstructor(std::make_shared<TBlobConstructor>(*this)) , DstActor(dstActor) { ResourceUsage.SourceMemorySize = WriteData.GetSize(); @@ -65,10 +64,10 @@ TIndexedWriteController::TIndexedWriteController(const TActorId& dstActor, const void TIndexedWriteController::DoOnReadyResult(const NActors::TActorContext& ctx, const NColumnShard::TBlobPutResult::TPtr& putResult) { if (putResult->GetPutStatus() == NKikimrProto::OK) { - auto result = std::make_unique<NColumnShard::TEvPrivate::TEvWriteBlobsResult>(putResult, std::move(BlobData), WriteData.GetWriteMeta(), BlobConstructor->GetSnapshot()); + auto result = std::make_unique<NColumnShard::TEvPrivate::TEvWriteBlobsResult>(putResult, std::move(BlobData), WriteData.GetWriteMeta(), WriteData.GetData().GetSchemaVersion()); ctx.Send(DstActor, result.release()); } else { - auto result = std::make_unique<NColumnShard::TEvPrivate::TEvWriteBlobsResult>(putResult, WriteData.GetWriteMeta(), BlobConstructor->GetSnapshot()); + auto result = std::make_unique<NColumnShard::TEvPrivate::TEvWriteBlobsResult>(putResult, WriteData.GetWriteMeta()); ctx.Send(DstActor, result.release()); } } diff --git a/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.h b/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.h index 299c9b640f..2800c58fc0 100644 --- a/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.h +++ b/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.h @@ -16,19 +16,15 @@ class TIndexedWriteController : public NColumnShard::IWriteController { private: class TBlobConstructor : public IBlobConstructor { TIndexedWriteController& Owner; - NOlap::ISnapshotSchema::TPtr SnapshotSchema; std::vector<NArrow::TSerializedBatch> BlobsSplitted; ui64 CurrentIndex = 0; public: - TBlobConstructor(NOlap::ISnapshotSchema::TPtr snapshotSchema, TIndexedWriteController& owner); + TBlobConstructor(TIndexedWriteController& owner); const TString& GetBlob() const override; EStatus BuildNext() override; bool RegisterBlobId(const TUnifiedBlobId& blobId) override; - const NOlap::TSnapshot& GetSnapshot() const { - return SnapshotSchema->GetSnapshot(); - } bool Init(); }; @@ -38,7 +34,7 @@ private: TActorId DstActor; public: - TIndexedWriteController(const TActorId& dstActor, const NEvWrite::TWriteData& writeData, NOlap::ISnapshotSchema::TPtr snapshotSchema); + TIndexedWriteController(const TActorId& dstActor, const NEvWrite::TWriteData& writeData); void DoOnReadyResult(const NActors::TActorContext& ctx, const NColumnShard::TBlobPutResult::TPtr& putResult) override; diff --git a/ydb/core/tx/columnshard/operations/write.cpp b/ydb/core/tx/columnshard/operations/write.cpp index cefb4b481b..63883faa35 100644 --- a/ydb/core/tx/columnshard/operations/write.cpp +++ b/ydb/core/tx/columnshard/operations/write.cpp @@ -22,8 +22,7 @@ namespace NKikimr::NColumnShard { Y_VERIFY(Status == EOperationStatus::Draft); NEvWrite::TWriteMeta writeMeta((ui64)WriteId, tableId, source); - const auto& snapshotSchema = owner.TablesManager.GetPrimaryIndex()->GetVersionedIndex().GetLastSchema(); - auto writeController = std::make_shared<NOlap::TIndexedWriteController>(ctx.SelfID, NEvWrite::TWriteData(writeMeta, data), snapshotSchema); + auto writeController = std::make_shared<NOlap::TIndexedWriteController>(ctx.SelfID, NEvWrite::TWriteData(writeMeta, data)); ctx.Register(CreateWriteActor(owner.TabletID(), writeController, owner.BlobManager->StartBlobBatch(), TInstant::Max(), owner.Settings.MaxSmallBlobSize)); Status = EOperationStatus::Started; } diff --git a/ydb/core/tx/columnshard/operations/write_data.cpp b/ydb/core/tx/columnshard/operations/write_data.cpp index b994b09684..18e1d4e7bd 100644 --- a/ydb/core/tx/columnshard/operations/write_data.cpp +++ b/ydb/core/tx/columnshard/operations/write_data.cpp @@ -20,6 +20,10 @@ std::shared_ptr<arrow::RecordBatch> TArrowData::GetArrowBatch() const { return IndexSchema->PrepareForInsert(IncomingData, BatchSchema->GetSchema()); } +ui64 TArrowData::GetSchemaVersion() const { + return IndexSchema->GetVersion(); +} + bool TProtoArrowData::ParseFromProto(const NKikimrTxColumnShard::TEvWrite& proto) { IncomingData = proto.GetData(); if (proto.HasMeta()) { @@ -40,4 +44,8 @@ std::shared_ptr<arrow::RecordBatch> TProtoArrowData::GetArrowBatch() const { return IndexSchema->PrepareForInsert(IncomingData, ArrowSchema); } +ui64 TProtoArrowData::GetSchemaVersion() const { + return IndexSchema->GetVersion(); +} + } diff --git a/ydb/core/tx/columnshard/operations/write_data.h b/ydb/core/tx/columnshard/operations/write_data.h index 8bb52f1e66..bf8c79eb34 100644 --- a/ydb/core/tx/columnshard/operations/write_data.h +++ b/ydb/core/tx/columnshard/operations/write_data.h @@ -50,6 +50,7 @@ public: bool Parse(const NKikimrDataEvents::TOperationData& proto, const IPayloadData& payload); std::shared_ptr<arrow::RecordBatch> GetArrowBatch() const override; + ui64 GetSchemaVersion() const override; private: NOlap::ISnapshotSchema::TPtr IndexSchema; @@ -69,6 +70,7 @@ public: bool ParseFromProto(const NKikimrTxColumnShard::TEvWrite& proto); std::shared_ptr<arrow::RecordBatch> GetArrowBatch() const override; + ui64 GetSchemaVersion() const override; private: NOlap::ISnapshotSchema::TPtr IndexSchema; diff --git a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp index 350455b7ad..ae9e2954f9 100644 --- a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp +++ b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp @@ -1860,6 +1860,10 @@ Y_UNIT_TEST_SUITE(EvWrite) { } proto.MutableArrowData()->SetPayloadIndex(Index); } + + ui64 GetSchemaVersion() const override { + return 1; + } }; void PrepareTablet(TTestBasicRuntime& runtime, const ui64 tableId, const std::vector<std::pair<TString, TTypeInfo>>& schema) { diff --git a/ydb/core/tx/ev_write/events.h b/ydb/core/tx/ev_write/events.h index 2ddd53c2b3..827e0100a9 100644 --- a/ydb/core/tx/ev_write/events.h +++ b/ydb/core/tx/ev_write/events.h @@ -17,6 +17,7 @@ public: using TPtr = std::shared_ptr<IDataConstructor>; virtual ~IDataConstructor() {} virtual void Serialize(NKikimrDataEvents::TOperationData& proto) const = 0; + virtual ui64 GetSchemaVersion() const = 0; }; struct TDataEvents { @@ -53,6 +54,7 @@ struct TDataEvents { void AddReplaceOp(const ui64 tableId, const IDataConstructor::TPtr& data) { Record.MutableTableId()->SetTableId(tableId); Y_VERIFY(data); + Record.MutableTableId()->SetSchemaVersion(data->GetSchemaVersion()); data->Serialize(*Record.MutableReplace()); } diff --git a/ydb/core/tx/ev_write/write_data.h b/ydb/core/tx/ev_write/write_data.h index 54f7ee0525..bcdd83a071 100644 --- a/ydb/core/tx/ev_write/write_data.h +++ b/ydb/core/tx/ev_write/write_data.h @@ -13,6 +13,7 @@ public: virtual ~IDataContainer() {} virtual std::shared_ptr<arrow::RecordBatch> GetArrowBatch() const = 0; virtual const TString& GetData() const = 0; + virtual ui64 GetSchemaVersion() const = 0; }; class TWriteMeta { diff --git a/ydb/core/tx/schemeshard/schemeshard_olap_types.h b/ydb/core/tx/schemeshard/schemeshard_olap_types.h index 1a9a69e6cf..e3015d8786 100644 --- a/ydb/core/tx/schemeshard/schemeshard_olap_types.h +++ b/ydb/core/tx/schemeshard/schemeshard_olap_types.h @@ -144,7 +144,7 @@ namespace NKikimr::NSchemeShard { } bool Update(const TOlapSchemaUpdate& schemaUpdate, IErrorCollector& errors); - + void Parse(const NKikimrSchemeOp::TColumnTableSchema& tableSchema); void Serialize(NKikimrSchemeOp::TColumnTableSchema& tableSchema) const; bool Validate(const NKikimrSchemeOp::TColumnTableSchema& opSchema, IErrorCollector& errors) const; |