aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authornsofya <nsofya@yandex-team.com>2023-08-05 15:18:53 +0300
committernsofya <nsofya@yandex-team.com>2023-08-05 15:56:37 +0300
commit392403b56a9951b878388b4c0086fdffa02bb1a1 (patch)
tree89556fa6149897672cbaa3997ecc3e99623c0d88
parentb31702172dc2daefa2653d49c5eed1107126a5a8 (diff)
downloadydb-392403b56a9951b878388b4c0086fdffa02bb1a1.tar.gz
KIKIMR-18343: Use schema version from kqp on write
-rw-r--r--ydb/core/tx/columnshard/columnshard__write.cpp23
-rw-r--r--ydb/core/tx/columnshard/columnshard_private_events.h14
-rw-r--r--ydb/core/tx/columnshard/engines/column_engine.h17
-rw-r--r--ydb/core/tx/columnshard/engines/scheme/abstract_scheme.h1
-rw-r--r--ydb/core/tx/columnshard/engines/scheme/filtered_scheme.cpp4
-rw-r--r--ydb/core/tx/columnshard/engines/scheme/filtered_scheme.h1
-rw-r--r--ydb/core/tx/columnshard/engines/scheme/index_info.cpp1
-rw-r--r--ydb/core/tx/columnshard/engines/scheme/index_info.h5
-rw-r--r--ydb/core/tx/columnshard/engines/scheme/snapshot_scheme.cpp4
-rw-r--r--ydb/core/tx/columnshard/engines/scheme/snapshot_scheme.h1
-rw-r--r--ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.cpp11
-rw-r--r--ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.h8
-rw-r--r--ydb/core/tx/columnshard/operations/write.cpp3
-rw-r--r--ydb/core/tx/columnshard/operations/write_data.cpp8
-rw-r--r--ydb/core/tx/columnshard/operations/write_data.h2
-rw-r--r--ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp4
-rw-r--r--ydb/core/tx/ev_write/events.h2
-rw-r--r--ydb/core/tx/ev_write/write_data.h1
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_olap_types.h2
19 files changed, 86 insertions, 26 deletions
diff --git a/ydb/core/tx/columnshard/columnshard__write.cpp b/ydb/core/tx/columnshard/columnshard__write.cpp
index 39a12f5baf..027b991331 100644
--- a/ydb/core/tx/columnshard/columnshard__write.cpp
+++ b/ydb/core/tx/columnshard/columnshard__write.cpp
@@ -56,7 +56,9 @@ bool TTxWrite::InsertOneBlob(TTransactionContext& txc, const TEvPrivate::TEvWrit
const auto& writeMeta(PutBlobResult->Get()->GetWriteMeta());
- NOlap::TInsertedData insertData((ui64)writeId, writeMeta.GetTableId(), writeMeta.GetDedupId(), logoBlobId, meta, time, PutBlobResult->Get()->GetSnapshot());
+ auto tableSchema = Self->TablesManager.GetPrimaryIndex()->GetVersionedIndex().GetSchemaUnsafe(PutBlobResult->Get()->GetSchemaVersion());
+
+ NOlap::TInsertedData insertData((ui64)writeId, writeMeta.GetTableId(), writeMeta.GetDedupId(), logoBlobId, meta, time, tableSchema->GetSnapshot());
bool ok = Self->InsertTable->Insert(dbTable, std::move(insertData));
if (ok) {
THashSet<TWriteId> writesToAbort = Self->InsertTable->OldWritesToAbort(time);
@@ -292,7 +294,7 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex
<< WritesMonitor.DebugString()
<< " at tablet " << TabletID());
- auto writeController = std::make_shared<NOlap::TIndexedWriteController>(ctx.SelfID, writeData, snapshotSchema);
+ auto writeController = std::make_shared<NOlap::TIndexedWriteController>(ctx.SelfID, writeData);
ctx.Register(CreateWriteActor(TabletID(), writeController, BlobManager->StartBlobBatch(), TInstant::Max(), Settings.MaxSmallBlobSize));
}
}
@@ -303,6 +305,21 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor
const ui64 txId = ev->Get()->GetTxId();
const auto source = ev->Sender;
+ if (!record.GetTableId().HasSchemaVersion()) {
+ IncCounter(COUNTER_WRITE_FAIL);
+ auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(txId, NKikimrDataEvents::TEvWriteResult::BAD_REQUEST, "schema version not set");
+ ctx.Send(source, result.release());
+ return;
+ }
+
+ auto schema = TablesManager.GetPrimaryIndex()->GetVersionedIndex().GetSchema(record.GetTableId().GetSchemaVersion());
+ if (!schema) {
+ IncCounter(COUNTER_WRITE_FAIL);
+ auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(txId, NKikimrDataEvents::TEvWriteResult::BAD_REQUEST, "unknown schema version");
+ ctx.Send(source, result.release());
+ return;
+ }
+
if (!TablesManager.IsReadyForWrite(tableId)) {
IncCounter(COUNTER_WRITE_FAIL);
auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(txId, NKikimrDataEvents::TEvWriteResult::ERROR, "table not writable");
@@ -310,7 +327,7 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor
return;
}
- auto arrowData = std::make_shared<TArrowData>(TablesManager.GetPrimaryIndex()->GetVersionedIndex().GetLastSchema());
+ auto arrowData = std::make_shared<TArrowData>(schema);
if (!arrowData->Parse(record.GetReplace(), TPayloadHelper<NEvents::TDataEvents::TEvWrite>(*ev->Get()))) {
IncCounter(COUNTER_WRITE_FAIL);
auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(txId, NKikimrDataEvents::TEvWriteResult::BAD_REQUEST, "parsing data error");
diff --git a/ydb/core/tx/columnshard/columnshard_private_events.h b/ydb/core/tx/columnshard/columnshard_private_events.h
index af290e2556..ce18fd97bb 100644
--- a/ydb/core/tx/columnshard/columnshard_private_events.h
+++ b/ydb/core/tx/columnshard/columnshard_private_events.h
@@ -272,18 +272,18 @@ struct TEvPrivate {
}
};
- TEvWriteBlobsResult(const NColumnShard::TBlobPutResult::TPtr& putResult, const NEvWrite::TWriteMeta& writeMeta, const NOlap::TSnapshot& snapshot)
+ TEvWriteBlobsResult(const NColumnShard::TBlobPutResult::TPtr& putResult, const NEvWrite::TWriteMeta& writeMeta)
: PutResult(putResult)
, WriteMeta(writeMeta)
- , Snapshot(snapshot)
{
Y_VERIFY(PutResult);
}
- TEvWriteBlobsResult(const NColumnShard::TBlobPutResult::TPtr& putResult, TVector<TPutBlobData>&& blobData, const NEvWrite::TWriteMeta& writeMeta, const NOlap::TSnapshot& snapshot)
- : TEvWriteBlobsResult(putResult, writeMeta, snapshot)
+ TEvWriteBlobsResult(const NColumnShard::TBlobPutResult::TPtr& putResult, TVector<TPutBlobData>&& blobData, const NEvWrite::TWriteMeta& writeMeta, const ui64 schemaVersion)
+ : TEvWriteBlobsResult(putResult, writeMeta)
{
BlobData = std::move(blobData);
+ SchemaVersion = schemaVersion;
}
const TVector<TPutBlobData>& GetBlobData() const {
@@ -302,15 +302,15 @@ struct TEvPrivate {
return WriteMeta;
}
- const NOlap::TSnapshot& GetSnapshot() const {
- return Snapshot;
+ ui64 GetSchemaVersion() const {
+ return SchemaVersion;
}
private:
NColumnShard::TBlobPutResult::TPtr PutResult;
TVector<TPutBlobData> BlobData;
NEvWrite::TWriteMeta WriteMeta;
- NOlap::TSnapshot Snapshot;
+ ui64 SchemaVersion = 0;
};
};
diff --git a/ydb/core/tx/columnshard/engines/column_engine.h b/ydb/core/tx/columnshard/engines/column_engine.h
index 761bf724a8..3b2e5f7a1f 100644
--- a/ydb/core/tx/columnshard/engines/column_engine.h
+++ b/ydb/core/tx/columnshard/engines/column_engine.h
@@ -305,7 +305,19 @@ public:
class TVersionedIndex {
std::map<TSnapshot, ISnapshotSchema::TPtr> Snapshots;
std::shared_ptr<arrow::Schema> IndexKey;
+ std::map<ui64, ISnapshotSchema::TPtr> SnapshotByVersion;
public:
+ ISnapshotSchema::TPtr GetSchema(const ui64 version) const {
+ auto it = SnapshotByVersion.find(version);
+ return it == SnapshotByVersion.end() ? nullptr : it->second;
+ }
+
+ ISnapshotSchema::TPtr GetSchemaUnsafe(const ui64 version) const {
+ auto it = SnapshotByVersion.find(version);
+ Y_VERIFY(it != SnapshotByVersion.end());
+ return it->second;
+ }
+
ISnapshotSchema::TPtr GetSchema(const TSnapshot& version) const {
for (auto it = Snapshots.rbegin(); it != Snapshots.rend(); ++it) {
if (it->first <= version) {
@@ -332,7 +344,10 @@ public:
} else {
Y_VERIFY(IndexKey->Equals(indexInfo.GetIndexKey()));
}
- Snapshots.emplace(version, std::make_shared<TSnapshotSchema>(std::move(indexInfo), version));
+ auto it = Snapshots.emplace(version, std::make_shared<TSnapshotSchema>(std::move(indexInfo), version));
+ if (!SnapshotByVersion.emplace(it.first->second->GetVersion(), it.first->second).second) {
+ Y_VERIFY(GetLastSchema()->GetVersion() == it.first->second->GetVersion());
+ }
}
};
diff --git a/ydb/core/tx/columnshard/engines/scheme/abstract_scheme.h b/ydb/core/tx/columnshard/engines/scheme/abstract_scheme.h
index 06fd516712..02748f05c9 100644
--- a/ydb/core/tx/columnshard/engines/scheme/abstract_scheme.h
+++ b/ydb/core/tx/columnshard/engines/scheme/abstract_scheme.h
@@ -40,6 +40,7 @@ public:
virtual const std::shared_ptr<arrow::Schema>& GetSchema() const = 0;
virtual const TIndexInfo& GetIndexInfo() const = 0;
virtual const TSnapshot& GetSnapshot() const = 0;
+ virtual ui64 GetVersion() const = 0;
virtual ui32 GetColumnsCount() const = 0;
std::shared_ptr<arrow::RecordBatch> NormalizeBatch(const ISnapshotSchema& dataSchema, const std::shared_ptr<arrow::RecordBatch> batch) const;
diff --git a/ydb/core/tx/columnshard/engines/scheme/filtered_scheme.cpp b/ydb/core/tx/columnshard/engines/scheme/filtered_scheme.cpp
index 31b8135138..61d3a8d200 100644
--- a/ydb/core/tx/columnshard/engines/scheme/filtered_scheme.cpp
+++ b/ydb/core/tx/columnshard/engines/scheme/filtered_scheme.cpp
@@ -63,4 +63,8 @@ ui32 TFilteredSnapshotSchema::GetColumnsCount() const {
return Schema->num_fields();
}
+ui64 TFilteredSnapshotSchema::GetVersion() const {
+ return OriginalSnapshot->GetIndexInfo().GetVersion();
+}
+
}
diff --git a/ydb/core/tx/columnshard/engines/scheme/filtered_scheme.h b/ydb/core/tx/columnshard/engines/scheme/filtered_scheme.h
index 00e754e587..9673d78061 100644
--- a/ydb/core/tx/columnshard/engines/scheme/filtered_scheme.h
+++ b/ydb/core/tx/columnshard/engines/scheme/filtered_scheme.h
@@ -23,6 +23,7 @@ public:
const TIndexInfo& GetIndexInfo() const override;
const TSnapshot& GetSnapshot() const override;
ui32 GetColumnsCount() const override;
+ ui64 GetVersion() const override;
};
}
diff --git a/ydb/core/tx/columnshard/engines/scheme/index_info.cpp b/ydb/core/tx/columnshard/engines/scheme/index_info.cpp
index b87054b82c..3d6b808e4e 100644
--- a/ydb/core/tx/columnshard/engines/scheme/index_info.cpp
+++ b/ydb/core/tx/columnshard/engines/scheme/index_info.cpp
@@ -374,6 +374,7 @@ bool TIndexInfo::DeserializeFromProto(const NKikimrSchemeOp::TColumnTableSchema&
DefaultCompression = *result;
}
+ Version = schema.GetVersion();
CompositeMarks = schema.GetCompositeMarks();
CompositeIndexKey = AppData()->FeatureFlags.GetForceColumnTablesCompositeMarks() ? true : CompositeMarks;
return true;
diff --git a/ydb/core/tx/columnshard/engines/scheme/index_info.h b/ydb/core/tx/columnshard/engines/scheme/index_info.h
index 7bb0b9bdda..ec8835e71f 100644
--- a/ydb/core/tx/columnshard/engines/scheme/index_info.h
+++ b/ydb/core/tx/columnshard/engines/scheme/index_info.h
@@ -179,8 +179,13 @@ public:
return result;
}
+ ui64 GetVersion() const {
+ return Version;
+ }
+
private:
ui32 Id;
+ ui64 Version = 0;
TString Name;
bool CompositeIndexKey = false;
mutable std::shared_ptr<arrow::Schema> Schema;
diff --git a/ydb/core/tx/columnshard/engines/scheme/snapshot_scheme.cpp b/ydb/core/tx/columnshard/engines/scheme/snapshot_scheme.cpp
index 52d4b7d328..3058c5943c 100644
--- a/ydb/core/tx/columnshard/engines/scheme/snapshot_scheme.cpp
+++ b/ydb/core/tx/columnshard/engines/scheme/snapshot_scheme.cpp
@@ -46,4 +46,8 @@ ui32 TSnapshotSchema::GetColumnsCount() const {
return Schema->num_fields();
}
+ui64 TSnapshotSchema::GetVersion() const {
+ return IndexInfo.GetVersion();
+}
+
}
diff --git a/ydb/core/tx/columnshard/engines/scheme/snapshot_scheme.h b/ydb/core/tx/columnshard/engines/scheme/snapshot_scheme.h
index b72819a311..e144e7fe35 100644
--- a/ydb/core/tx/columnshard/engines/scheme/snapshot_scheme.h
+++ b/ydb/core/tx/columnshard/engines/scheme/snapshot_scheme.h
@@ -23,6 +23,7 @@ public:
const TIndexInfo& GetIndexInfo() const override;
const TSnapshot& GetSnapshot() const override;
ui32 GetColumnsCount() const override;
+ ui64 GetVersion() const override;
};
}
diff --git a/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.cpp b/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.cpp
index 87232608f2..0ab5872d39 100644
--- a/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.cpp
+++ b/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.cpp
@@ -6,9 +6,8 @@
namespace NKikimr::NOlap {
-TIndexedWriteController::TBlobConstructor::TBlobConstructor(NOlap::ISnapshotSchema::TPtr snapshotSchema, TIndexedWriteController& owner)
+TIndexedWriteController::TBlobConstructor::TBlobConstructor(TIndexedWriteController& owner)
: Owner(owner)
- , SnapshotSchema(snapshotSchema)
{}
const TString& TIndexedWriteController::TBlobConstructor::GetBlob() const {
@@ -55,9 +54,9 @@ bool TIndexedWriteController::TBlobConstructor::Init() {
return true;
}
-TIndexedWriteController::TIndexedWriteController(const TActorId& dstActor, const NEvWrite::TWriteData& writeData, NOlap::ISnapshotSchema::TPtr snapshotSchema)
+TIndexedWriteController::TIndexedWriteController(const TActorId& dstActor, const NEvWrite::TWriteData& writeData)
: WriteData(writeData)
- , BlobConstructor(std::make_shared<TBlobConstructor>(snapshotSchema, *this))
+ , BlobConstructor(std::make_shared<TBlobConstructor>(*this))
, DstActor(dstActor)
{
ResourceUsage.SourceMemorySize = WriteData.GetSize();
@@ -65,10 +64,10 @@ TIndexedWriteController::TIndexedWriteController(const TActorId& dstActor, const
void TIndexedWriteController::DoOnReadyResult(const NActors::TActorContext& ctx, const NColumnShard::TBlobPutResult::TPtr& putResult) {
if (putResult->GetPutStatus() == NKikimrProto::OK) {
- auto result = std::make_unique<NColumnShard::TEvPrivate::TEvWriteBlobsResult>(putResult, std::move(BlobData), WriteData.GetWriteMeta(), BlobConstructor->GetSnapshot());
+ auto result = std::make_unique<NColumnShard::TEvPrivate::TEvWriteBlobsResult>(putResult, std::move(BlobData), WriteData.GetWriteMeta(), WriteData.GetData().GetSchemaVersion());
ctx.Send(DstActor, result.release());
} else {
- auto result = std::make_unique<NColumnShard::TEvPrivate::TEvWriteBlobsResult>(putResult, WriteData.GetWriteMeta(), BlobConstructor->GetSnapshot());
+ auto result = std::make_unique<NColumnShard::TEvPrivate::TEvWriteBlobsResult>(putResult, WriteData.GetWriteMeta());
ctx.Send(DstActor, result.release());
}
}
diff --git a/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.h b/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.h
index 299c9b640f..2800c58fc0 100644
--- a/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.h
+++ b/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.h
@@ -16,19 +16,15 @@ class TIndexedWriteController : public NColumnShard::IWriteController {
private:
class TBlobConstructor : public IBlobConstructor {
TIndexedWriteController& Owner;
- NOlap::ISnapshotSchema::TPtr SnapshotSchema;
std::vector<NArrow::TSerializedBatch> BlobsSplitted;
ui64 CurrentIndex = 0;
public:
- TBlobConstructor(NOlap::ISnapshotSchema::TPtr snapshotSchema, TIndexedWriteController& owner);
+ TBlobConstructor(TIndexedWriteController& owner);
const TString& GetBlob() const override;
EStatus BuildNext() override;
bool RegisterBlobId(const TUnifiedBlobId& blobId) override;
- const NOlap::TSnapshot& GetSnapshot() const {
- return SnapshotSchema->GetSnapshot();
- }
bool Init();
};
@@ -38,7 +34,7 @@ private:
TActorId DstActor;
public:
- TIndexedWriteController(const TActorId& dstActor, const NEvWrite::TWriteData& writeData, NOlap::ISnapshotSchema::TPtr snapshotSchema);
+ TIndexedWriteController(const TActorId& dstActor, const NEvWrite::TWriteData& writeData);
void DoOnReadyResult(const NActors::TActorContext& ctx, const NColumnShard::TBlobPutResult::TPtr& putResult) override;
diff --git a/ydb/core/tx/columnshard/operations/write.cpp b/ydb/core/tx/columnshard/operations/write.cpp
index cefb4b481b..63883faa35 100644
--- a/ydb/core/tx/columnshard/operations/write.cpp
+++ b/ydb/core/tx/columnshard/operations/write.cpp
@@ -22,8 +22,7 @@ namespace NKikimr::NColumnShard {
Y_VERIFY(Status == EOperationStatus::Draft);
NEvWrite::TWriteMeta writeMeta((ui64)WriteId, tableId, source);
- const auto& snapshotSchema = owner.TablesManager.GetPrimaryIndex()->GetVersionedIndex().GetLastSchema();
- auto writeController = std::make_shared<NOlap::TIndexedWriteController>(ctx.SelfID, NEvWrite::TWriteData(writeMeta, data), snapshotSchema);
+ auto writeController = std::make_shared<NOlap::TIndexedWriteController>(ctx.SelfID, NEvWrite::TWriteData(writeMeta, data));
ctx.Register(CreateWriteActor(owner.TabletID(), writeController, owner.BlobManager->StartBlobBatch(), TInstant::Max(), owner.Settings.MaxSmallBlobSize));
Status = EOperationStatus::Started;
}
diff --git a/ydb/core/tx/columnshard/operations/write_data.cpp b/ydb/core/tx/columnshard/operations/write_data.cpp
index b994b09684..18e1d4e7bd 100644
--- a/ydb/core/tx/columnshard/operations/write_data.cpp
+++ b/ydb/core/tx/columnshard/operations/write_data.cpp
@@ -20,6 +20,10 @@ std::shared_ptr<arrow::RecordBatch> TArrowData::GetArrowBatch() const {
return IndexSchema->PrepareForInsert(IncomingData, BatchSchema->GetSchema());
}
+ui64 TArrowData::GetSchemaVersion() const {
+ return IndexSchema->GetVersion();
+}
+
bool TProtoArrowData::ParseFromProto(const NKikimrTxColumnShard::TEvWrite& proto) {
IncomingData = proto.GetData();
if (proto.HasMeta()) {
@@ -40,4 +44,8 @@ std::shared_ptr<arrow::RecordBatch> TProtoArrowData::GetArrowBatch() const {
return IndexSchema->PrepareForInsert(IncomingData, ArrowSchema);
}
+ui64 TProtoArrowData::GetSchemaVersion() const {
+ return IndexSchema->GetVersion();
+}
+
}
diff --git a/ydb/core/tx/columnshard/operations/write_data.h b/ydb/core/tx/columnshard/operations/write_data.h
index 8bb52f1e66..bf8c79eb34 100644
--- a/ydb/core/tx/columnshard/operations/write_data.h
+++ b/ydb/core/tx/columnshard/operations/write_data.h
@@ -50,6 +50,7 @@ public:
bool Parse(const NKikimrDataEvents::TOperationData& proto, const IPayloadData& payload);
std::shared_ptr<arrow::RecordBatch> GetArrowBatch() const override;
+ ui64 GetSchemaVersion() const override;
private:
NOlap::ISnapshotSchema::TPtr IndexSchema;
@@ -69,6 +70,7 @@ public:
bool ParseFromProto(const NKikimrTxColumnShard::TEvWrite& proto);
std::shared_ptr<arrow::RecordBatch> GetArrowBatch() const override;
+ ui64 GetSchemaVersion() const override;
private:
NOlap::ISnapshotSchema::TPtr IndexSchema;
diff --git a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp
index 350455b7ad..ae9e2954f9 100644
--- a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp
+++ b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp
@@ -1860,6 +1860,10 @@ Y_UNIT_TEST_SUITE(EvWrite) {
}
proto.MutableArrowData()->SetPayloadIndex(Index);
}
+
+ ui64 GetSchemaVersion() const override {
+ return 1;
+ }
};
void PrepareTablet(TTestBasicRuntime& runtime, const ui64 tableId, const std::vector<std::pair<TString, TTypeInfo>>& schema) {
diff --git a/ydb/core/tx/ev_write/events.h b/ydb/core/tx/ev_write/events.h
index 2ddd53c2b3..827e0100a9 100644
--- a/ydb/core/tx/ev_write/events.h
+++ b/ydb/core/tx/ev_write/events.h
@@ -17,6 +17,7 @@ public:
using TPtr = std::shared_ptr<IDataConstructor>;
virtual ~IDataConstructor() {}
virtual void Serialize(NKikimrDataEvents::TOperationData& proto) const = 0;
+ virtual ui64 GetSchemaVersion() const = 0;
};
struct TDataEvents {
@@ -53,6 +54,7 @@ struct TDataEvents {
void AddReplaceOp(const ui64 tableId, const IDataConstructor::TPtr& data) {
Record.MutableTableId()->SetTableId(tableId);
Y_VERIFY(data);
+ Record.MutableTableId()->SetSchemaVersion(data->GetSchemaVersion());
data->Serialize(*Record.MutableReplace());
}
diff --git a/ydb/core/tx/ev_write/write_data.h b/ydb/core/tx/ev_write/write_data.h
index 54f7ee0525..bcdd83a071 100644
--- a/ydb/core/tx/ev_write/write_data.h
+++ b/ydb/core/tx/ev_write/write_data.h
@@ -13,6 +13,7 @@ public:
virtual ~IDataContainer() {}
virtual std::shared_ptr<arrow::RecordBatch> GetArrowBatch() const = 0;
virtual const TString& GetData() const = 0;
+ virtual ui64 GetSchemaVersion() const = 0;
};
class TWriteMeta {
diff --git a/ydb/core/tx/schemeshard/schemeshard_olap_types.h b/ydb/core/tx/schemeshard/schemeshard_olap_types.h
index 1a9a69e6cf..e3015d8786 100644
--- a/ydb/core/tx/schemeshard/schemeshard_olap_types.h
+++ b/ydb/core/tx/schemeshard/schemeshard_olap_types.h
@@ -144,7 +144,7 @@ namespace NKikimr::NSchemeShard {
}
bool Update(const TOlapSchemaUpdate& schemaUpdate, IErrorCollector& errors);
-
+
void Parse(const NKikimrSchemeOp::TColumnTableSchema& tableSchema);
void Serialize(NKikimrSchemeOp::TColumnTableSchema& tableSchema) const;
bool Validate(const NKikimrSchemeOp::TColumnTableSchema& opSchema, IErrorCollector& errors) const;