diff options
author | Ilnaz Nizametdinov <i.nizametdinov@gmail.com> | 2022-03-10 17:48:58 +0300 |
---|---|---|
committer | Ilnaz Nizametdinov <i.nizametdinov@gmail.com> | 2022-03-10 17:48:58 +0300 |
commit | 8d01f8c26b6cd7b0d53bb6e75d34fba2702d338a (patch) | |
tree | 86989f7a102fa6029e1fcad851cc1a9cd2bf0610 | |
parent | 4f7269728d68c309828901214776737f895b1109 (diff) | |
download | ydb-8d01f8c26b6cd7b0d53bb6e75d34fba2702d338a.tar.gz |
CDC records in json format KIKIMR-14198
ref:2e21db5e45fb4048f6bbe49585393b7e877d0b46
32 files changed, 525 insertions, 95 deletions
diff --git a/CMakeLists.darwin.txt b/CMakeLists.darwin.txt index e279fc2dc86..8db1ee0546f 100644 --- a/CMakeLists.darwin.txt +++ b/CMakeLists.darwin.txt @@ -585,6 +585,7 @@ add_subdirectory(ydb/core/sys_view/tablets) add_subdirectory(ydb/core/tx/datashard) add_subdirectory(library/cpp/containers/flat_hash) add_subdirectory(library/cpp/containers/flat_hash/lib) +add_subdirectory(library/cpp/json/yson) add_subdirectory(ydb/core/persqueue/partition_key_range) add_subdirectory(ydb/core/persqueue/writer) add_subdirectory(ydb/core/persqueue/events) @@ -612,7 +613,6 @@ add_subdirectory(ydb/library/yql/dq/actors/compute) add_subdirectory(ydb/library/yql/dq/tasks) add_subdirectory(ydb/core/tx/long_tx_service/public) add_subdirectory(ydb/core/yq/libs/actors) -add_subdirectory(library/cpp/json/yson) add_subdirectory(ydb/core/yq/libs/actors/logging) add_subdirectory(ydb/core/yq/libs/checkpointing) add_subdirectory(ydb/core/yq/libs/checkpointing_common) diff --git a/CMakeLists.linux.txt b/CMakeLists.linux.txt index 01be75e7437..d956b9e31e9 100644 --- a/CMakeLists.linux.txt +++ b/CMakeLists.linux.txt @@ -666,6 +666,7 @@ add_subdirectory(ydb/core/sys_view/tablets) add_subdirectory(ydb/core/tx/datashard) add_subdirectory(library/cpp/containers/flat_hash) add_subdirectory(library/cpp/containers/flat_hash/lib) +add_subdirectory(library/cpp/json/yson) add_subdirectory(ydb/core/persqueue/partition_key_range) add_subdirectory(ydb/core/persqueue/writer) add_subdirectory(ydb/core/persqueue/events) @@ -693,7 +694,6 @@ add_subdirectory(ydb/library/yql/dq/actors/compute) add_subdirectory(ydb/library/yql/dq/tasks) add_subdirectory(ydb/core/tx/long_tx_service/public) add_subdirectory(ydb/core/yq/libs/actors) -add_subdirectory(library/cpp/json/yson) add_subdirectory(ydb/core/yq/libs/actors/logging) add_subdirectory(ydb/core/yq/libs/checkpointing) add_subdirectory(ydb/core/yq/libs/checkpointing_common) diff --git a/ydb/core/engine/minikql/change_collector_iface.h 
b/ydb/core/engine/minikql/change_collector_iface.h index fe6019b300e..c96ffaa3ce9 100644 --- a/ydb/core/engine/minikql/change_collector_iface.h +++ b/ydb/core/engine/minikql/change_collector_iface.h @@ -9,12 +9,16 @@ namespace NMiniKQL { class IChangeCollector { public: // basic change record's info - struct TChange: public std::tuple<ui64, TPathId, ui64> { - using std::tuple<ui64, TPathId, ui64>::tuple; + struct TChange: public std::tuple<ui64, TPathId, ui64, TPathId, ui64> { + using std::tuple<ui64, TPathId, ui64, TPathId, ui64>::tuple; ui64 Order() const { return std::get<0>(*this); } const TPathId& PathId() const { return std::get<1>(*this); } ui64 BodySize() const { return std::get<2>(*this); } + const TPathId& TableId() const { return std::get<3>(*this); } + ui64 SchemaVersion() const { return std::get<4>(*this); } + + void SetPathId(const TPathId& value) { std::get<1>(*this) = value; } }; public: @@ -34,3 +38,13 @@ public: } // NMiniKQL } // NKikimr + +Y_DECLARE_OUT_SPEC(inline, NKikimr::NMiniKQL::IChangeCollector::TChange, o, x) { + o << "{" + << " Order: " << x.Order() + << " PathId: " << x.PathId() + << " BodySize: " << x.BodySize() + << " TableId: " << x.TableId() + << " SchemaVersion: " << x.SchemaVersion() + << " }"; +} diff --git a/ydb/core/tx/datashard/CMakeLists.txt b/ydb/core/tx/datashard/CMakeLists.txt index a40708faaeb..842cf3224bf 100644 --- a/ydb/core/tx/datashard/CMakeLists.txt +++ b/ydb/core/tx/datashard/CMakeLists.txt @@ -19,6 +19,9 @@ target_link_libraries(core-tx-datashard PUBLIC cpp-actors-core cpp-containers-flat_hash cpp-html-pcdata + library-cpp-json + cpp-json-yson + cpp-string_utils-base64 cpp-string_utils-quote ydb-core-actorlib_impl ydb-core-base @@ -249,6 +252,9 @@ target_link_libraries(core-tx-datashard.global PUBLIC cpp-actors-core cpp-containers-flat_hash cpp-html-pcdata + library-cpp-json + cpp-json-yson + cpp-string_utils-base64 cpp-string_utils-quote ydb-core-actorlib_impl ydb-core-base diff --git 
a/ydb/core/tx/datashard/alter_cdc_stream_unit.cpp b/ydb/core/tx/datashard/alter_cdc_stream_unit.cpp index b79d5017361..f55579154d2 100644 --- a/ydb/core/tx/datashard/alter_cdc_stream_unit.cpp +++ b/ydb/core/tx/datashard/alter_cdc_stream_unit.cpp @@ -38,9 +38,17 @@ public: Y_VERIFY_S(streamDesc.GetState() == NKikimrSchemeOp::ECdcStreamStateDisabled, "Unexpected alter cdc stream" << ": desc# " << streamDesc.ShortDebugString()); - auto tableInfo = DataShard.AlterTableDisableCdcStream(ctx, txc, pathId, params.GetTableSchemaVersion(), streamPathId); + + const auto version = params.GetTableSchemaVersion(); + Y_VERIFY(version); + + auto tableInfo = DataShard.AlterTableDisableCdcStream(ctx, txc, pathId, version, streamPathId); DataShard.AddUserTable(pathId, tableInfo); + if (tableInfo->NeedSchemaSnapshots()) { + DataShard.AddSchemaSnapshot(pathId, version, op->GetStep(), op->GetTxId(), txc, ctx); + } + BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::COMPLETE); op->Result()->SetStepOrderId(op->GetStepOrder().ToPair()); diff --git a/ydb/core/tx/datashard/alter_table_unit.cpp b/ydb/core/tx/datashard/alter_table_unit.cpp index 78786adae78..34784d6f089 100644 --- a/ydb/core/tx/datashard/alter_table_unit.cpp +++ b/ydb/core/tx/datashard/alter_table_unit.cpp @@ -136,9 +136,12 @@ EExecutionStatus TAlterTableUnit::Execute(TOperation::TPtr op, const auto& alterTableTx = schemeTx.GetAlterTable(); + const auto version = alterTableTx.GetTableSchemaVersion(); + Y_VERIFY(version); + LOG_INFO_S(ctx, NKikimrServices::TX_DATASHARD, "Trying to ALTER TABLE at " << DataShard.TabletID() - << " version " << alterTableTx.GetTableSchemaVersion()); + << " version " << version); TPathId tableId(DataShard.GetPathOwnerId(), alterTableTx.GetId_Deprecated()); if (alterTableTx.HasPathId()) { @@ -148,9 +151,12 @@ EExecutionStatus TAlterTableUnit::Execute(TOperation::TPtr op, } TUserTable::TPtr info = DataShard.AlterUserTable(ctx, txc, alterTableTx); - DataShard.AddUserTable(tableId, info); 
+ if (info->NeedSchemaSnapshots()) { + DataShard.AddSchemaSnapshot(tableId, version, op->GetStep(), op->GetTxId(), txc, ctx); + } + BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::COMPLETE); op->Result()->SetStepOrderId(op->GetStepOrder().ToPair()); diff --git a/ydb/core/tx/datashard/change_collector_base.cpp b/ydb/core/tx/datashard/change_collector_base.cpp index 779d0ce4437..29fca256764 100644 --- a/ydb/core/tx/datashard/change_collector_base.cpp +++ b/ydb/core/tx/datashard/change_collector_base.cpp @@ -143,12 +143,19 @@ void TBaseChangeCollector::Persist( .WithStep(WriteVersion.Step) .WithTxId(WriteVersion.TxId) .WithPathId(pathId) + .WithTableId(tableId.PathId) .WithSchemaVersion(userTable->GetTableSchemaVersion()) .WithBody(body.SerializeAsString()) .Build(); Self->PersistChangeRecord(db, record); - Collected.emplace_back(record.GetOrder(), record.GetPathId(), record.GetBody().size()); + Collected.emplace_back( + record.GetOrder(), + record.GetPathId(), + record.GetBody().size(), + record.GetTableId(), + record.GetSchemaVersion() + ); } } // NDataShard diff --git a/ydb/core/tx/datashard/change_record.cpp b/ydb/core/tx/datashard/change_record.cpp index f191d3c5b03..cce147d67f7 100644 --- a/ydb/core/tx/datashard/change_record.cpp +++ b/ydb/core/tx/datashard/change_record.cpp @@ -1,6 +1,13 @@ #include "change_record.h" +#include "export_common.h" + +#include <library/cpp/json/json_reader.h> +#include <library/cpp/json/yson/json2yson.h> +#include <library/cpp/string_utils/base64/base64.h> #include <ydb/core/protos/change_exchange.pb.h> +#include <ydb/core/util/yverify_stream.h> +#include <ydb/library/binary_json/read.h> #include <util/stream/str.h> @@ -55,6 +62,156 @@ void TChangeRecord::SerializeTo(NKikimrChangeExchange::TChangeRecord& record) co } } +static NJson::TJsonValue StringToJson(TStringBuf in) { + NJson::TJsonValue result; + Y_VERIFY(NJson::ReadJsonTree(in, &result)); + return result; +} + +static NJson::TJsonValue YsonToJson(TStringBuf 
in) { + NJson::TJsonValue result; + Y_VERIFY(NJson2Yson::DeserializeYsonAsJsonValue(in, &result)); + return result; +} + +static NJson::TJsonValue ToJson(const TCell& cell, NScheme::TTypeId type) { + if (cell.IsNull()) { + return NJson::TJsonValue(NJson::JSON_NULL); + } + + switch (type) { + case NScheme::NTypeIds::Bool: + return NJson::TJsonValue(cell.AsValue<bool>()); + case NScheme::NTypeIds::Int8: + return NJson::TJsonValue(cell.AsValue<i8>()); + case NScheme::NTypeIds::Uint8: + return NJson::TJsonValue(cell.AsValue<ui8>()); + case NScheme::NTypeIds::Int16: + return NJson::TJsonValue(cell.AsValue<i16>()); + case NScheme::NTypeIds::Uint16: + return NJson::TJsonValue(cell.AsValue<ui16>()); + case NScheme::NTypeIds::Int32: + return NJson::TJsonValue(cell.AsValue<i32>()); + case NScheme::NTypeIds::Uint32: + return NJson::TJsonValue(cell.AsValue<ui32>()); + case NScheme::NTypeIds::Int64: + return NJson::TJsonValue(cell.AsValue<i64>()); + case NScheme::NTypeIds::Uint64: + return NJson::TJsonValue(cell.AsValue<ui64>()); + case NScheme::NTypeIds::Float: + return NJson::TJsonValue(cell.AsValue<float>()); + case NScheme::NTypeIds::Double: + return NJson::TJsonValue(cell.AsValue<double>()); + case NScheme::NTypeIds::Date: + return NJson::TJsonValue(TInstant::Days(cell.AsValue<ui16>()).ToString()); + case NScheme::NTypeIds::Datetime: + return NJson::TJsonValue(TInstant::Seconds(cell.AsValue<ui32>()).ToString()); + case NScheme::NTypeIds::Timestamp: + return NJson::TJsonValue(TInstant::MicroSeconds(cell.AsValue<ui64>()).ToString()); + case NScheme::NTypeIds::Interval: + return NJson::TJsonValue(cell.AsValue<i64>()); + case NScheme::NTypeIds::Decimal: + return NJson::TJsonValue(DecimalToString(cell.AsValue<std::pair<ui64, i64>>())); + case NScheme::NTypeIds::DyNumber: + return NJson::TJsonValue(DyNumberToString(cell.AsBuf())); + case NScheme::NTypeIds::String: + case NScheme::NTypeIds::String4k: + case NScheme::NTypeIds::String2m: + return 
NJson::TJsonValue(Base64Encode(cell.AsBuf())); + case NScheme::NTypeIds::Utf8: + return NJson::TJsonValue(cell.AsBuf()); + case NScheme::NTypeIds::Json: + return StringToJson(cell.AsBuf()); + case NScheme::NTypeIds::JsonDocument: + return StringToJson(NBinaryJson::SerializeToJson(cell.AsBuf())); + case NScheme::NTypeIds::Yson: + return YsonToJson(cell.AsBuf()); + default: + Y_FAIL("Unexpected type"); + } +} + +static void SerializeJsonKey(TUserTable::TCPtr schema, NJson::TJsonValue& key, + const NKikimrChangeExchange::TChangeRecord::TDataChange::TSerializedCells& in) +{ + Y_VERIFY(in.TagsSize() == schema->KeyColumnIds.size()); + for (size_t i = 0; i < schema->KeyColumnIds.size(); ++i) { + Y_VERIFY(in.GetTags(i) == schema->KeyColumnIds.at(i)); + } + + TSerializedCellVec cells; + Y_VERIFY(TSerializedCellVec::TryParse(in.GetData(), cells)); + + Y_VERIFY(cells.GetCells().size() == schema->KeyColumnTypes.size()); + for (size_t i = 0; i < schema->KeyColumnTypes.size(); ++i) { + const auto type = schema->KeyColumnTypes.at(i); + const auto& cell = cells.GetCells().at(i); + key.AppendValue(ToJson(cell, type)); + } +} + +static void SerializeJsonValue(TUserTable::TCPtr schema, NJson::TJsonValue& value, + const NKikimrChangeExchange::TChangeRecord::TDataChange::TSerializedCells& in) +{ + TSerializedCellVec cells; + Y_VERIFY(TSerializedCellVec::TryParse(in.GetData(), cells)); + Y_VERIFY(in.TagsSize() == cells.GetCells().size()); + + for (ui32 i = 0; i < in.TagsSize(); ++i) { + const auto tag = in.GetTags(i); + const auto& cell = cells.GetCells().at(i); + + auto it = schema->Columns.find(tag); + Y_VERIFY(it != schema->Columns.end()); + + const auto& column = it->second; + value.InsertValue(column.Name, ToJson(cell, column.Type)); + } +} + +void TChangeRecord::SerializeTo(NJson::TJsonValue& key, NJson::TJsonValue& value) const { + switch (Kind) { + case EKind::CdcDataChange: { + Y_VERIFY(Schema); + + NKikimrChangeExchange::TChangeRecord::TDataChange body; + 
Y_VERIFY(body.ParseFromArray(Body.data(), Body.size())); + + SerializeJsonKey(Schema, key, body.GetKey()); + + if (body.HasOldImage()) { + SerializeJsonValue(Schema, value["oldImage"], body.GetOldImage()); + } + + if (body.HasNewImage()) { + SerializeJsonValue(Schema, value["newImage"], body.GetNewImage()); + } + + if (!body.HasOldImage() && !body.HasNewImage()) { + switch (body.GetRowOperationCase()) { + case NKikimrChangeExchange::TChangeRecord::TDataChange::kUpsert: + SerializeJsonValue(Schema, value["update"], body.GetUpsert()); + break; + case NKikimrChangeExchange::TChangeRecord::TDataChange::kReset: + SerializeJsonValue(Schema, value["reset"], body.GetReset()); + break; + case NKikimrChangeExchange::TChangeRecord::TDataChange::kErase: + value["erase"].SetType(NJson::EJsonValueType::JSON_MAP); + break; + default: + Y_FAIL_S("Unexpected row operation: " << static_cast<int>(body.GetRowOperationCase())); + } + } + + break; + } + + case EKind::AsyncIndex: { + Y_FAIL("Not supported"); + } + } +} + TString TChangeRecord::ToString() const { TString result; TStringOutput out(result); @@ -103,11 +260,21 @@ TChangeRecordBuilder& TChangeRecordBuilder::WithPathId(const TPathId& pathId) { return *this; } +TChangeRecordBuilder& TChangeRecordBuilder::WithTableId(const TPathId& tableId) { + Record.TableId = tableId; + return *this; +} + TChangeRecordBuilder& TChangeRecordBuilder::WithSchemaVersion(ui64 version) { Record.SchemaVersion = version; return *this; } +TChangeRecordBuilder& TChangeRecordBuilder::WithSchema(TUserTable::TCPtr schema) { + Record.Schema = schema; + return *this; +} + TChangeRecordBuilder& TChangeRecordBuilder::WithBody(const TString& body) { Record.Body = body; return *this; diff --git a/ydb/core/tx/datashard/change_record.h b/ydb/core/tx/datashard/change_record.h index 96b0ceb4f46..fae0f793732 100644 --- a/ydb/core/tx/datashard/change_record.h +++ b/ydb/core/tx/datashard/change_record.h @@ -1,5 +1,9 @@ #pragma once +#include "datashard_user_table.h" + 
+#include <library/cpp/json/json_value.h> + #include <ydb/core/base/pathid.h> #include <ydb/core/scheme/scheme_tablecell.h> @@ -30,13 +34,16 @@ public: ui64 GetStep() const { return Step; } ui64 GetTxId() const { return TxId; } const TPathId& GetPathId() const { return PathId; } - ui64 GetSchemaVersion() const { return SchemaVersion; } EKind GetKind() const { return Kind; } const TString& GetBody() const { return Body; } i64 GetSeqNo() const; TConstArrayRef<TCell> GetKey() const; + const TPathId& GetTableId() const { return TableId; } + ui64 GetSchemaVersion() const { return SchemaVersion; } + void SerializeTo(NKikimrChangeExchange::TChangeRecord& record) const; + void SerializeTo(NJson::TJsonValue& key, NJson::TJsonValue& value) const; TString ToString() const; void Out(IOutputStream& out) const; @@ -47,10 +54,13 @@ private: ui64 Step; ui64 TxId; TPathId PathId; - ui64 SchemaVersion; EKind Kind; TString Body; + ui64 SchemaVersion; + TPathId TableId; + TUserTable::TCPtr Schema; + mutable TMaybe<TOwnedCellVec> Key; }; // TChangeRecord @@ -66,7 +76,10 @@ public: TChangeRecordBuilder& WithStep(ui64 step); TChangeRecordBuilder& WithTxId(ui64 txId); TChangeRecordBuilder& WithPathId(const TPathId& pathId); + + TChangeRecordBuilder& WithTableId(const TPathId& tableId); TChangeRecordBuilder& WithSchemaVersion(ui64 version); + TChangeRecordBuilder& WithSchema(TUserTable::TCPtr schema); TChangeRecordBuilder& WithBody(const TString& body); TChangeRecordBuilder& WithBody(TString&& body); diff --git a/ydb/core/tx/datashard/change_sender_cdc_stream.cpp b/ydb/core/tx/datashard/change_sender_cdc_stream.cpp index 288f6a8526f..3533a4997d1 100644 --- a/ydb/core/tx/datashard/change_sender_cdc_stream.cpp +++ b/ydb/core/tx/datashard/change_sender_cdc_stream.cpp @@ -5,6 +5,7 @@ #include <library/cpp/actors/core/actor_bootstrapped.h> #include <library/cpp/actors/core/hfunc.h> #include <library/cpp/actors/core/log.h> +#include <library/cpp/json/json_writer.h> #include 
<ydb/core/persqueue/partition_key_range/partition_key_range.h> #include <ydb/core/persqueue/writer/source_id_encoding.h> @@ -85,23 +86,46 @@ class TCdcChangeSenderPartition: public TActorBootstrapped<TCdcChangeSenderParti continue; } - const auto createdAt = TInstant::FromValue(record.GetGroup()); - - NKikimrChangeExchange::TChangeRecord protoRecord; - record.SerializeTo(protoRecord); + const auto createdAt = record.GetGroup() + ? TInstant::FromValue(record.GetGroup()) + : TInstant::MilliSeconds(record.GetStep()); NKikimrPQClient::TDataChunk data; data.SetSeqNo(record.GetSeqNo()); data.SetCreateTime(createdAt.MilliSeconds()); data.SetCodec(CodecRaw); - data.SetData(protoRecord.SerializeAsString()); // TODO: meta? auto& cmd = *request.MutablePartitionRequest()->AddCmdWrite(); cmd.SetSeqNo(record.GetSeqNo()); cmd.SetSourceId(NSourceIdEncoding::EncodeSimple(SourceId)); cmd.SetCreateTimeMS(createdAt.MilliSeconds()); - cmd.SetData(data.SerializeAsString()); + + switch (Format) { + case NKikimrSchemeOp::ECdcStreamFormatProto: { + NKikimrChangeExchange::TChangeRecord protoRecord; + record.SerializeTo(protoRecord); + data.SetData(protoRecord.SerializeAsString()); + cmd.SetData(data.SerializeAsString()); + break; + } + + case NKikimrSchemeOp::ECdcStreamFormatJson: { + NJson::TJsonValue key; + NJson::TJsonValue value; + record.SerializeTo(key, value); + data.SetData(WriteJson(value, false)); + cmd.SetData(data.SerializeAsString()); + cmd.SetPartitionKey(WriteJson(key, false)); + break; + } + + default: { + LOG_E("Unknown format" + << ": format# " << static_cast<int>(Format)); + return Leave(); + } + } Pending.push_back(record.GetSeqNo()); } @@ -193,11 +217,17 @@ public: return NKikimrServices::TActivity::CHANGE_SENDER_CDC_ACTOR_PARTITION; } - explicit TCdcChangeSenderPartition(const TActorId& parent, const TDataShardId& dataShard, ui32 partitionId, ui64 shardId) + explicit TCdcChangeSenderPartition( + const TActorId& parent, + const TDataShardId& dataShard, + ui32 
partitionId, + ui64 shardId, + NKikimrSchemeOp::ECdcStreamFormat format) : Parent(parent) , DataShard(dataShard) , PartitionId(partitionId) , ShardId(shardId) + , Format(format) , SourceId(ToString(DataShard.TabletId)) { } @@ -222,6 +252,7 @@ private: const TDataShardId DataShard; const ui32 PartitionId; const ui64 ShardId; + const NKikimrSchemeOp::ECdcStreamFormat Format; const TString SourceId; mutable TMaybe<TString> LogPrefix; @@ -374,6 +405,15 @@ class TCdcChangeSenderMain: public TActorBootstrapped<TCdcChangeSenderMain> return Check(&TSchemeCacheHelpers::CheckEntryKind<TNavigate::TEntry>, &TThis::LogWarnAndRetry, entry, expected); } + bool CheckNotEmpty(const TIntrusiveConstPtr<TNavigate::TCdcStreamInfo>& streamInfo) { + if (streamInfo) { + return true; + } + + LogCritAndRetry(TStringBuilder() << "Empty stream info at '" << CurrentStateName() << "'"); + return false; + } + bool CheckNotEmpty(const TIntrusiveConstPtr<TNavigate::TPQGroupInfo>& pqInfo) { if (pqInfo) { return true; @@ -440,6 +480,12 @@ class TCdcChangeSenderMain: public TActorBootstrapped<TCdcChangeSenderMain> return; } + if (!CheckNotEmpty(entry.CdcStreamInfo)) { + return; + } + + Format = entry.CdcStreamInfo->Description.GetFormat(); + Y_VERIFY(entry.ListNodeEntry->Children.size() == 1); const auto& topic = entry.ListNodeEntry->Children.at(0); @@ -597,7 +643,7 @@ class TCdcChangeSenderMain: public TActorBootstrapped<TCdcChangeSenderMain> IActor* CreateSender(ui64 partitionId) override { Y_VERIFY(PartitionToShard.contains(partitionId)); const auto shardId = PartitionToShard.at(partitionId); - return new TCdcChangeSenderPartition(SelfId(), DataShard, partitionId, shardId); + return new TCdcChangeSenderPartition(SelfId(), DataShard, partitionId, shardId, Format); } void Handle(TEvChangeExchange::TEvEnqueueRecords::TPtr& ev) { @@ -668,6 +714,7 @@ public: private: mutable TMaybe<TString> LogPrefix; + NKikimrSchemeOp::ECdcStreamFormat Format; TPathId TopicPathId; THolder<TKeyDesc> KeyDesc; 
THashMap<ui32, ui64> PartitionToShard; diff --git a/ydb/core/tx/datashard/create_cdc_stream_unit.cpp b/ydb/core/tx/datashard/create_cdc_stream_unit.cpp index 8f577f03b4c..1b9b57707b6 100644 --- a/ydb/core/tx/datashard/create_cdc_stream_unit.cpp +++ b/ydb/core/tx/datashard/create_cdc_stream_unit.cpp @@ -44,20 +44,14 @@ public: auto tableInfo = DataShard.AlterTableAddCdcStream(ctx, txc, pathId, version, streamDesc); DataShard.AddUserTable(pathId, tableInfo); + if (tableInfo->NeedSchemaSnapshots()) { + DataShard.AddSchemaSnapshot(pathId, version, op->GetStep(), op->GetTxId(), txc, ctx); + } + AddSender.Reset(new TEvChangeExchange::TEvAddSender( pathId, TEvChangeExchange::ESenderType::CdcStream, streamPathId )); - /* TODO(ilnaz) - if (streamDesc.GetFormat() == NKikimrSchemeOp::ECdcStreamFormatJson) { - auto& manager = DataShard.GetSchemaSnapshotManager(); - - const auto key = TSchemaSnapshotKey(pathId.OwnerId, pathId.LocalPathId, version); - manager.AddSnapshot(txc.DB, key, TSchemaSnapshot(tableInfo, op->GetStep(), op->GetTxId())); - manager.AcquireReference(key); - } - */ - BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::COMPLETE); op->Result()->SetStepOrderId(op->GetStepOrder().ToPair()); diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp index e636d9c9143..a3fcc458681 100644 --- a/ydb/core/tx/datashard/datashard.cpp +++ b/ydb/core/tx/datashard/datashard.cpp @@ -468,7 +468,9 @@ void TDataShard::PersistChangeRecord(NIceDb::TNiceDb& db, const TChangeRecord& r NIceDb::TUpdate<Schema::ChangeRecords::PathOwnerId>(record.GetPathId().OwnerId), NIceDb::TUpdate<Schema::ChangeRecords::LocalPathId>(record.GetPathId().LocalPathId), NIceDb::TUpdate<Schema::ChangeRecords::BodySize>(record.GetBody().size()), - NIceDb::TUpdate<Schema::ChangeRecords::SchemaVersion>(record.GetSchemaVersion())); + NIceDb::TUpdate<Schema::ChangeRecords::SchemaVersion>(record.GetSchemaVersion()), + 
NIceDb::TUpdate<Schema::ChangeRecords::TableOwnerId>(record.GetTableId().OwnerId), + NIceDb::TUpdate<Schema::ChangeRecords::TablePathId>(record.GetTableId().LocalPathId)); db.Table<Schema::ChangeRecordDetails>().Key(record.GetOrder()).Update( NIceDb::TUpdate<Schema::ChangeRecordDetails::Kind>(record.GetKind()), NIceDb::TUpdate<Schema::ChangeRecordDetails::Body>(record.GetBody())); @@ -490,18 +492,45 @@ void TDataShard::RemoveChangeRecord(NIceDb::TNiceDb& db, ui64 order) { << ": order: " << order << ", at tablet: " << TabletID()); + db.Table<Schema::ChangeRecords>().Key(order).Delete(); + db.Table<Schema::ChangeRecordDetails>().Key(order).Delete(); + auto it = ChangesQueue.find(order); - if (it != ChangesQueue.end()) { - Y_VERIFY(it->second <= ChangesQueueBytes); - ChangesQueueBytes -= it->second; - ChangesQueue.erase(it); + if (it == ChangesQueue.end()) { + Y_VERIFY_DEBUG_S(false, "Trying to remove non-enqueud record: " << order); + return; } - db.Table<Schema::ChangeRecords>().Key(order).Delete(); - db.Table<Schema::ChangeRecordDetails>().Key(order).Delete(); + const auto& record = it->second; + + Y_VERIFY(record.BodySize <= ChangesQueueBytes); + ChangesQueueBytes -= record.BodySize; + + if (record.SchemaSnapshotAcquired) { + Y_VERIFY(record.TableId); + auto tableIt = TableInfos.find(record.TableId.LocalPathId); + + if (tableIt != TableInfos.end()) { + const auto snapshotKey = TSchemaSnapshotKey(record.TableId, record.SchemaVersion); + const bool last = SchemaSnapshotManager.ReleaseReference(snapshotKey); + + if (last) { + const auto* snapshot = SchemaSnapshotManager.FindSnapshot(snapshotKey); + Y_VERIFY(snapshot); + + if (snapshot->Schema->GetTableSchemaVersion() < tableIt->second->GetTableSchemaVersion()) { + SchemaSnapshotManager.RemoveShapshot(db, snapshotKey); + } + } + } else { + Y_VERIFY_DEBUG(State == TShardState::PreOffline); + } + } + + ChangesQueue.erase(it); } -void 
TDataShard::EnqueueChangeRecords(TVector<TEvChangeExchange::TEvEnqueueRecords::TRecordInfo>&& records) { +void TDataShard::EnqueueChangeRecords(TVector<NMiniKQL::IChangeCollector::TChange>&& records) { if (!records) { return; } @@ -510,15 +539,23 @@ void TDataShard::EnqueueChangeRecords(TVector<TEvChangeExchange::TEvEnqueueRecor << ": at tablet: " << TabletID() << ", records: " << JoinSeq(", ", records)); + TVector<TEvChangeExchange::TEvEnqueueRecords::TRecordInfo> forward(Reserve(records.size())); for (const auto& record : records) { - if (ChangesQueue.emplace(record.Order, record.BodySize).second) { - Y_VERIFY(ChangesQueueBytes <= (Max<ui64>() - record.BodySize)); - ChangesQueueBytes += record.BodySize; + forward.emplace_back(record.Order(), record.PathId(), record.BodySize()); + + if (auto res = ChangesQueue.emplace(record.Order(), record); res.second) { + Y_VERIFY(ChangesQueueBytes <= (Max<ui64>() - record.BodySize())); + ChangesQueueBytes += record.BodySize(); + + if (record.SchemaVersion()) { + res.first->second.SchemaSnapshotAcquired = SchemaSnapshotManager.AcquireReference( + TSchemaSnapshotKey(record.TableId(), record.SchemaVersion())); + } } } Y_VERIFY(OutChangeSender); - Send(OutChangeSender, new TEvChangeExchange::TEvEnqueueRecords(std::move(records))); + Send(OutChangeSender, new TEvChangeExchange::TEvEnqueueRecords(std::move(forward))); } void TDataShard::CreateChangeSender(const TActorContext& ctx) { @@ -580,14 +617,14 @@ void TDataShard::KillChangeSender(const TActorContext& ctx) { } } -bool TDataShard::LoadChangeRecords(NIceDb::TNiceDb& db, TVector<TEvChangeExchange::TEvEnqueueRecords::TRecordInfo>& changeRecords) { +bool TDataShard::LoadChangeRecords(NIceDb::TNiceDb& db, TVector<NMiniKQL::IChangeCollector::TChange>& records) { using Schema = TDataShard::Schema; LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "LoadChangeRecords" << ": QueueSize: " << ChangesQueue.size() << ", at tablet: " << TabletID()); - 
changeRecords.reserve(ChangesQueue.size()); + records.reserve(ChangesQueue.size()); auto rowset = db.Table<Schema::ChangeRecords>().Range().Select(); if (!rowset.IsReady()) { @@ -597,12 +634,17 @@ bool TDataShard::LoadChangeRecords(NIceDb::TNiceDb& db, TVector<TEvChangeExchang while (!rowset.EndOfSet()) { const ui64 order = rowset.GetValue<Schema::ChangeRecords::Order>(); const ui64 bodySize = rowset.GetValue<Schema::ChangeRecords::BodySize>(); + const ui64 schemaVersion = rowset.GetValue<Schema::ChangeRecords::SchemaVersion>(); const auto pathId = TPathId( rowset.GetValue<Schema::ChangeRecords::PathOwnerId>(), rowset.GetValue<Schema::ChangeRecords::LocalPathId>() ); + const auto tableId = TPathId( + rowset.GetValue<Schema::ChangeRecords::TableOwnerId>(), + rowset.GetValue<Schema::ChangeRecords::TablePathId>() + ); - changeRecords.emplace_back(order, pathId, bodySize); + records.emplace_back(order, pathId, bodySize, tableId, schemaVersion); if (!rowset.Next()) { return false; } @@ -810,6 +852,24 @@ TUserTable::TPtr TDataShard::AlterTableDropCdcStream( return tableInfo; } +void TDataShard::AddSchemaSnapshot(const TPathId& pathId, ui64 tableSchemaVersion, ui64 step, ui64 txId, + TTransactionContext& txc, const TActorContext& ctx) +{ + LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "Add schema snapshot" + << ": pathId# " << pathId + << ", version# " << tableSchemaVersion + << ", step# " << step + << ", txId# " << txId + << ", at tablet# " << TabletID()); + + Y_VERIFY(GetPathOwnerId() == pathId.OwnerId); + Y_VERIFY(TableInfos.contains(pathId.LocalPathId)); + auto tableInfo = TableInfos[pathId.LocalPathId]; + + const auto key = TSchemaSnapshotKey(pathId.OwnerId, pathId.LocalPathId, tableSchemaVersion); + SchemaSnapshotManager.AddSnapshot(txc.DB, key, TSchemaSnapshot(tableInfo, step, txId)); +} + TUserTable::TPtr TDataShard::CreateUserTable(TTransactionContext& txc, const NKikimrSchemeOp::TTableDescription& tableScheme) { @@ -863,8 +923,8 @@ THashMap<TPathId, TPathId> 
TDataShard::GetRemapIndexes(const NKikimrTxDataShard: return remap; } -TUserTable::TPtr TDataShard::MoveUserTable(const TActorContext& ctx, TTransactionContext& txc, - const NKikimrTxDataShard::TMoveTable& move) +TUserTable::TPtr TDataShard::MoveUserTable(TOperation::TPtr op, const NKikimrTxDataShard::TMoveTable& move, + const TActorContext& ctx, TTransactionContext& txc) { auto prevId = TPathId(move.GetPathId().GetOwnerId(), move.GetPathId().GetLocalId()); auto newId = TPathId(move.GetDstPathId().GetOwnerId(), move.GetDstPathId().GetLocalId()); @@ -872,7 +932,10 @@ TUserTable::TPtr TDataShard::MoveUserTable(const TActorContext& ctx, TTransactio Y_VERIFY(GetPathOwnerId() == prevId.OwnerId); Y_VERIFY(TableInfos.contains(prevId.LocalPathId)); - auto newTableInfo = AlterTableSchemaVersion(ctx, txc, prevId, move.GetTableSchemaVersion(), false); + const auto version = move.GetTableSchemaVersion(); + Y_VERIFY(version); + + auto newTableInfo = AlterTableSchemaVersion(ctx, txc, prevId, version, false); newTableInfo->SetPath(move.GetDstPath()); Y_VERIFY(move.ReMapIndexesSize() == newTableInfo->Indexes.size()); @@ -898,6 +961,10 @@ TUserTable::TPtr TDataShard::MoveUserTable(const TActorContext& ctx, TTransactio RemoveUserTable(prevId); AddUserTable(newId, newTableInfo); + if (newTableInfo->NeedSchemaSnapshots()) { + AddSchemaSnapshot(newId, version, op->GetStep(), op->GetTxId(), txc, ctx); + } + NIceDb::TNiceDb db(txc.DB); PersistMoveUserTable(db, prevId.LocalPathId, newId.LocalPathId, *newTableInfo); PersistOwnerPathId(newId.OwnerId, txc); diff --git a/ydb/core/tx/datashard/datashard_change_sending.cpp b/ydb/core/tx/datashard/datashard_change_sending.cpp index 0f51e316416..3ef9d1b96e4 100644 --- a/ydb/core/tx/datashard/datashard_change_sending.cpp +++ b/ydb/core/tx/datashard/datashard_change_sending.cpp @@ -63,6 +63,20 @@ class TDataShard::TTxRequestChangeRecords: public TTransactionBase<TDataShard> { << ", it->Order: " << it->Order << ", it->BodySize: " << it->BodySize); + 
const auto schemaVersion = basic.GetValue<Schema::ChangeRecords::SchemaVersion>(); + TUserTable::TCPtr schema; + + if (schemaVersion) { + const auto tableId = TPathId( + basic.GetValue<Schema::ChangeRecords::TableOwnerId>(), + basic.GetValue<Schema::ChangeRecords::TablePathId>() + ); + const auto snapshotKey = TSchemaSnapshotKey(tableId, schemaVersion); + if (const auto* snapshot = Self->GetSchemaSnapshotManager().FindSnapshot(snapshotKey)) { + schema = snapshot->Schema; + } + } + RecordsToSend[recipient].emplace_back(TChangeRecordBuilder(details.GetValue<Schema::ChangeRecordDetails::Kind>()) .WithOrder(it->Order) .WithGroup(basic.GetValue<Schema::ChangeRecords::Group>()) @@ -72,7 +86,7 @@ class TDataShard::TTxRequestChangeRecords: public TTransactionBase<TDataShard> { basic.GetValue<Schema::ChangeRecords::PathOwnerId>(), basic.GetValue<Schema::ChangeRecords::LocalPathId>() )) - .WithSchemaVersion(basic.GetValue<Schema::ChangeRecords::SchemaVersion>()) + .WithSchema(schema) .WithBody(details.GetValue<Schema::ChangeRecordDetails::Body>()) .Build()); diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h index 3056764635e..17eebf7693f 100644 --- a/ydb/core/tx/datashard/datashard_impl.h +++ b/ydb/core/tx/datashard/datashard_impl.h @@ -653,9 +653,22 @@ class TDataShard struct LocalPathId : Column<6, NScheme::NTypeIds::Uint64> {}; struct BodySize : Column<7, NScheme::NTypeIds::Uint64> {}; struct SchemaVersion : Column<8, NScheme::NTypeIds::Uint64> {}; + struct TableOwnerId : Column<9, NScheme::NTypeIds::Uint64> {}; + struct TablePathId : Column<10, NScheme::NTypeIds::Uint64> {}; using TKey = TableKey<Order>; - using TColumns = TableColumns<Order, Group, PlanStep, TxId, PathOwnerId, LocalPathId, BodySize, SchemaVersion>; + using TColumns = TableColumns< + Order, + Group, + PlanStep, + TxId, + PathOwnerId, + LocalPathId, + BodySize, + SchemaVersion, + TableOwnerId, + TablePathId + >; }; struct ChangeRecordDetails : Table<18> { @@ 
-1377,7 +1390,8 @@ public: TUserTable::TPtr AlterUserTable(const TActorContext& ctx, TTransactionContext& txc, const NKikimrSchemeOp::TTableDescription& tableScheme); static THashMap<TPathId, TPathId> GetRemapIndexes(const NKikimrTxDataShard::TMoveTable& move); - TUserTable::TPtr MoveUserTable(const TActorContext& ctx, TTransactionContext& txc, const NKikimrTxDataShard::TMoveTable& move); + TUserTable::TPtr MoveUserTable(TOperation::TPtr op, const NKikimrTxDataShard::TMoveTable& move, + const TActorContext& ctx, TTransactionContext& txc); void DropUserTable(TTransactionContext& txc, ui64 tableId); ui32 GetLastLocalTid() const { return LastLocalTid; } @@ -1387,12 +1401,12 @@ public: void PersistChangeRecord(NIceDb::TNiceDb& db, const TChangeRecord& record); void MoveChangeRecord(NIceDb::TNiceDb& db, ui64 order, const TPathId& pathId); void RemoveChangeRecord(NIceDb::TNiceDb& db, ui64 order); - void EnqueueChangeRecords(TVector<TEvChangeExchange::TEvEnqueueRecords::TRecordInfo>&& records); + void EnqueueChangeRecords(TVector<NMiniKQL::IChangeCollector::TChange>&& records); void CreateChangeSender(const TActorContext& ctx); void KillChangeSender(const TActorContext& ctx); void MaybeActivateChangeSender(const TActorContext& ctx); const TActorId& GetChangeSender() const { return OutChangeSender; } - bool LoadChangeRecords(NIceDb::TNiceDb& db, TVector<TEvChangeExchange::TEvEnqueueRecords::TRecordInfo>& changeRecords); + bool LoadChangeRecords(NIceDb::TNiceDb& db, TVector<NMiniKQL::IChangeCollector::TChange>& records); static void PersistSchemeTxResult(NIceDb::TNiceDb &db, const TSchemaOperation& op); @@ -1405,6 +1419,8 @@ public: TSchemaSnapshotManager& GetSchemaSnapshotManager() { return SchemaSnapshotManager; } const TSchemaSnapshotManager& GetSchemaSnapshotManager() const { return SchemaSnapshotManager; } + void AddSchemaSnapshot(const TPathId& pathId, ui64 tableSchemaVersion, ui64 step, ui64 txId, + TTransactionContext& txc, const TActorContext& ctx); template 
<typename... Args> bool PromoteCompleteEdge(Args&&... args) { @@ -2046,6 +2062,26 @@ private: } }; + struct TEnqueuedRecord { + ui64 BodySize; + TPathId TableId; + ui64 SchemaVersion; + bool SchemaSnapshotAcquired; + + explicit TEnqueuedRecord(ui64 bodySize, const TPathId& tableId, ui64 schemaVersion) + : BodySize(bodySize) + , TableId(tableId) + , SchemaVersion(schemaVersion) + , SchemaSnapshotAcquired(false) + { + } + + explicit TEnqueuedRecord(const NMiniKQL::IChangeCollector::TChange& record) + : TEnqueuedRecord(record.BodySize(), record.TableId(), record.SchemaVersion()) + { + } + }; + using TRequestedRecord = TEvChangeExchange::TEvRequestRecords::TRecordInfo; // split/merge @@ -2058,7 +2094,7 @@ private: TSet<ui64> ChangeRecordsToRemove; // ui64 is order bool RequestChangeRecordsInFly = false; bool RemoveChangeRecordsInFly = false; - THashMap<ui64, ui64> ChangesQueue; // order to size + THashMap<ui64, TEnqueuedRecord> ChangesQueue; // ui64 is order ui64 ChangesQueueBytes = 0; TActorId OutChangeSender; diff --git a/ydb/core/tx/datashard/datashard_schema_snapshots.cpp b/ydb/core/tx/datashard/datashard_schema_snapshots.cpp index d4c7ab05106..80e865701fe 100644 --- a/ydb/core/tx/datashard/datashard_schema_snapshots.cpp +++ b/ydb/core/tx/datashard/datashard_schema_snapshots.cpp @@ -83,6 +83,16 @@ const TSchemaSnapshot* TSchemaSnapshotManager::FindSnapshot(const TSchemaSnapsho return Snapshots.FindPtr(key); } +void TSchemaSnapshotManager::RemoveShapshot(NIceDb::TNiceDb& db, const TSchemaSnapshotKey& key) { + auto it = Snapshots.find(key); + if (it == Snapshots.end()) { + return; + } + + Snapshots.erase(it); + PersistRemoveSnapshot(db, key); +} + bool TSchemaSnapshotManager::AcquireReference(const TSchemaSnapshotKey& key) { auto it = Snapshots.find(key); if (it == Snapshots.end()) { @@ -93,7 +103,7 @@ bool TSchemaSnapshotManager::AcquireReference(const TSchemaSnapshotKey& key) { return true; } -bool TSchemaSnapshotManager::ReleaseReference(const TSchemaSnapshotKey& 
key, NIceDb::TNiceDb& db) { +bool TSchemaSnapshotManager::ReleaseReference(const TSchemaSnapshotKey& key) { auto refIt = References.find(key); if (refIt == References.end() || refIt->second <= 0) { @@ -113,7 +123,6 @@ bool TSchemaSnapshotManager::ReleaseReference(const TSchemaSnapshotKey& key, NIc return false; } - PersistRemoveSnapshot(db, key); return true; } diff --git a/ydb/core/tx/datashard/datashard_schema_snapshots.h b/ydb/core/tx/datashard/datashard_schema_snapshots.h index a8218a22591..d8e137fecfe 100644 --- a/ydb/core/tx/datashard/datashard_schema_snapshots.h +++ b/ydb/core/tx/datashard/datashard_schema_snapshots.h @@ -31,9 +31,10 @@ public: bool AddSnapshot(NTable::TDatabase& db, const TSchemaSnapshotKey& key, const TSchemaSnapshot& snapshot); const TSchemaSnapshot* FindSnapshot(const TSchemaSnapshotKey& key) const; + void RemoveShapshot(NIceDb::TNiceDb& db, const TSchemaSnapshotKey& key); bool AcquireReference(const TSchemaSnapshotKey& key); - bool ReleaseReference(const TSchemaSnapshotKey& key, NIceDb::TNiceDb& db); + bool ReleaseReference(const TSchemaSnapshotKey& key); private: void PersistAddSnapshot(NIceDb::TNiceDb& db, const TSchemaSnapshotKey& key, const TSchemaSnapshot& snapshot); diff --git a/ydb/core/tx/datashard/datashard_txs.h b/ydb/core/tx/datashard/datashard_txs.h index 4a9ab008789..c221c4dfeb0 100644 --- a/ydb/core/tx/datashard/datashard_txs.h +++ b/ydb/core/tx/datashard/datashard_txs.h @@ -47,7 +47,7 @@ private: bool CreateScheme(TTransactionContext &txc); bool ReadEverything(TTransactionContext &txc); private: - TVector<TEvChangeExchange::TEvEnqueueRecords::TRecordInfo> ChangeRecords; + TVector<NMiniKQL::IChangeCollector::TChange> ChangeRecords; }; class TDataShard::TTxPlanStep : public NTabletFlatExecutor::TTransactionBase<TDataShard> { diff --git a/ydb/core/tx/datashard/datashard_user_table.cpp b/ydb/core/tx/datashard/datashard_user_table.cpp index 70bc9400733..b97cd34012b 100644 --- a/ydb/core/tx/datashard/datashard_user_table.cpp 
+++ b/ydb/core/tx/datashard/datashard_user_table.cpp @@ -124,6 +124,7 @@ void TUserTable::AddCdcStream(const NKikimrSchemeOp::TCdcStreamDescription& stre } CdcStreams.emplace(streamPathId, TCdcStream(streamDesc)); + JsonCdcStreamCount += ui32(streamDesc.GetFormat() == TCdcStream::EFormat::ECdcStreamFormatJson); NKikimrSchemeOp::TTableDescription schema; GetSchema(schema); @@ -163,6 +164,7 @@ void TUserTable::DropCdcStream(const TPathId& streamPathId) { return; } + JsonCdcStreamCount -= ui32(it->second.Format == TCdcStream::EFormat::ECdcStreamFormatJson); CdcStreams.erase(it); NKikimrSchemeOp::TTableDescription schema; @@ -186,6 +188,10 @@ bool TUserTable::HasCdcStreams() const { return !CdcStreams.empty(); } +bool TUserTable::NeedSchemaSnapshots() const { + return JsonCdcStreamCount > 0; +} + void TUserTable::ParseProto(const NKikimrSchemeOp::TTableDescription& descr) { // We expect schemeshard to send us full list of storage rooms @@ -275,6 +281,7 @@ void TUserTable::ParseProto(const NKikimrSchemeOp::TTableDescription& descr) for (const auto& streamDesc : descr.GetCdcStreams()) { Y_VERIFY(streamDesc.HasPathId()); CdcStreams.emplace(TPathId(streamDesc.GetPathId().GetOwnerId(), streamDesc.GetPathId().GetLocalId()), TCdcStream(streamDesc)); + JsonCdcStreamCount += ui32(streamDesc.GetFormat() == TCdcStream::EFormat::ECdcStreamFormatJson); } } diff --git a/ydb/core/tx/datashard/datashard_user_table.h b/ydb/core/tx/datashard/datashard_user_table.h index 0dab30cb84f..962ef2ead3e 100644 --- a/ydb/core/tx/datashard/datashard_user_table.h +++ b/ydb/core/tx/datashard/datashard_user_table.h @@ -284,10 +284,12 @@ struct TUserTable : public TThrRefBase { struct TCdcStream { using EMode = NKikimrSchemeOp::ECdcStreamMode; + using EFormat = NKikimrSchemeOp::ECdcStreamFormat; using EState = NKikimrSchemeOp::ECdcStreamState; TString Name; EMode Mode; + EFormat Format; EState State; TCdcStream() = default; @@ -295,6 +297,7 @@ struct TUserTable : public TThrRefBase { TCdcStream(const 
NKikimrSchemeOp::TCdcStreamDescription& streamDesc) : Name(streamDesc.GetName()) , Mode(streamDesc.GetMode()) + , Format(streamDesc.GetFormat()) , State(streamDesc.GetState()) { } @@ -357,6 +360,7 @@ struct TUserTable : public TThrRefBase { TMap<TPathId, TTableIndex> Indexes; TMap<TPathId, TCdcStream> CdcStreams; ui32 AsyncIndexCount = 0; + ui32 JsonCdcStreamCount = 0; // Tablet thread access only, updated in-place mutable TStats Stats; @@ -407,6 +411,7 @@ struct TUserTable : public TThrRefBase { void DisableCdcStream(const TPathId& streamPathId); void DropCdcStream(const TPathId& streamPathId); bool HasCdcStreams() const; + bool NeedSchemaSnapshots() const; private: void DoApplyCreate(NTabletFlatExecutor::TTransactionContext& txc, const TString& tableName, bool shadow, diff --git a/ydb/core/tx/datashard/direct_tx_unit.cpp b/ydb/core/tx/datashard/direct_tx_unit.cpp index 1284b1f6ad6..d477ca5514e 100644 --- a/ydb/core/tx/datashard/direct_tx_unit.cpp +++ b/ydb/core/tx/datashard/direct_tx_unit.cpp @@ -38,12 +38,7 @@ public: return EExecutionStatus::Restart; } - if (auto changes = tx->GetCollectedChanges()) { - op->ChangeRecords().reserve(changes.size()); - for (const auto& change : changes) { - op->ChangeRecords().emplace_back(change.Order(), change.PathId(), change.BodySize()); - } - } + op->ChangeRecords() = std::move(tx->GetCollectedChanges()); DataShard.SysLocksTable().ApplyLocks(); Pipeline.AddCommittingOp(op); diff --git a/ydb/core/tx/datashard/drop_cdc_stream_unit.cpp b/ydb/core/tx/datashard/drop_cdc_stream_unit.cpp index 039a379a52b..048dd40874f 100644 --- a/ydb/core/tx/datashard/drop_cdc_stream_unit.cpp +++ b/ydb/core/tx/datashard/drop_cdc_stream_unit.cpp @@ -37,9 +37,16 @@ public: const auto streamPathId = TPathId(params.GetStreamPathId().GetOwnerId(), params.GetStreamPathId().GetLocalId()); Y_VERIFY(streamPathId.OwnerId == DataShard.GetPathOwnerId()); - auto tableInfo = DataShard.AlterTableDropCdcStream(ctx, txc, pathId, params.GetTableSchemaVersion(), 
streamPathId); + const auto version = params.GetTableSchemaVersion(); + Y_VERIFY(version); + + auto tableInfo = DataShard.AlterTableDropCdcStream(ctx, txc, pathId, version, streamPathId); DataShard.AddUserTable(pathId, tableInfo); + if (tableInfo->NeedSchemaSnapshots()) { + DataShard.AddSchemaSnapshot(pathId, version, op->GetStep(), op->GetTxId(), txc, ctx); + } + RemoveSender.Reset(new TEvChangeExchange::TEvRemoveSender(streamPathId)); BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::COMPLETE); diff --git a/ydb/core/tx/datashard/drop_index_notice_unit.cpp b/ydb/core/tx/datashard/drop_index_notice_unit.cpp index 43110699580..d1c877dccf4 100644 --- a/ydb/core/tx/datashard/drop_index_notice_unit.cpp +++ b/ydb/core/tx/datashard/drop_index_notice_unit.cpp @@ -33,6 +33,9 @@ public: auto pathId = TPathId(params.GetPathId().GetOwnerId(), params.GetPathId().GetLocalId()); Y_VERIFY(pathId.OwnerId == DataShard.GetPathOwnerId()); + const auto version = params.GetTableSchemaVersion(); + Y_VERIFY(version); + TUserTable::TPtr tableInfo; if (params.HasIndexPathId()) { auto indexPathId = TPathId(params.GetIndexPathId().GetOwnerId(), params.GetIndexPathId().GetLocalId()); @@ -46,14 +49,18 @@ public: RemoveSender.Reset(new TEvChangeExchange::TEvRemoveSender(indexPathId)); } - tableInfo = DataShard.AlterTableDropIndex(ctx, txc, pathId, params.GetTableSchemaVersion(), indexPathId); + tableInfo = DataShard.AlterTableDropIndex(ctx, txc, pathId, version, indexPathId); } else { - tableInfo = DataShard.AlterTableSchemaVersion(ctx, txc, pathId, params.GetTableSchemaVersion()); + tableInfo = DataShard.AlterTableSchemaVersion(ctx, txc, pathId, version); } Y_VERIFY(tableInfo); DataShard.AddUserTable(pathId, tableInfo); + if (tableInfo->NeedSchemaSnapshots()) { + DataShard.AddSchemaSnapshot(pathId, version, op->GetStep(), op->GetTxId(), txc, ctx); + } + BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::COMPLETE); 
op->Result()->SetStepOrderId(op->GetStepOrder().ToPair()); diff --git a/ydb/core/tx/datashard/execute_data_tx_unit.cpp b/ydb/core/tx/datashard/execute_data_tx_unit.cpp index b32f635cc0f..a34599d2eac 100644 --- a/ydb/core/tx/datashard/execute_data_tx_unit.cpp +++ b/ydb/core/tx/datashard/execute_data_tx_unit.cpp @@ -240,12 +240,7 @@ void TExecuteDataTxUnit::ExecuteDataTx(TOperation::TPtr op, if (op->IsImmediate() && !op->IsReadOnly()) DataShard.PromoteCompleteEdge(writeVersion.Step, txc); - if (auto changes = tx->GetDataTx()->GetCollectedChanges()) { - op->ChangeRecords().reserve(changes.size()); - for (const auto& change : changes) { - op->ChangeRecords().emplace_back(change.Order(), change.PathId(), change.BodySize()); - } - } + op->ChangeRecords() = std::move(tx->GetDataTx()->GetCollectedChanges()); } LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, diff --git a/ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp b/ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp index 3c9a86e06b4..9b62c04a72c 100644 --- a/ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp +++ b/ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp @@ -54,12 +54,7 @@ public: return EExecutionStatus::Restart; } - if (auto changes = changeCollector->GetCollected()) { - op->ChangeRecords().reserve(changes.size()); - for (const auto& change : changes) { - op->ChangeRecords().emplace_back(change.Order(), change.PathId(), change.BodySize()); - } - } + op->ChangeRecords() = std::move(changeCollector->GetCollected()); } else if (eraseTx->HasDependencies()) { THashMap<ui64, TDynBitMap> presentRows; for (const auto& dependency : eraseTx->GetDependencies()) { diff --git a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp index 946e035f821..16e109e6b9e 100644 --- a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp +++ b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp @@ -156,12 +156,7 @@ EExecutionStatus 
TExecuteKqpDataTxUnit::Execute(TOperation::TPtr op, TTransactio DataShard.PromoteCompleteEdge(writeVersion.Step, txc); } - if (auto changes = dataTx->GetCollectedChanges()) { - op->ChangeRecords().reserve(changes.size()); - for (const auto& change : changes) { - op->ChangeRecords().emplace_back(change.Order(), change.PathId(), change.BodySize()); - } - } + op->ChangeRecords() = std::move(dataTx->GetCollectedChanges()); KqpUpdateDataShardStatCounters(DataShard, dataTx->GetCounters()); auto statsMode = kqpTx.GetRuntimeSettings().GetStatsMode(); diff --git a/ydb/core/tx/datashard/finalize_build_index_unit.cpp b/ydb/core/tx/datashard/finalize_build_index_unit.cpp index 6f8a17ec503..778676ec984 100644 --- a/ydb/core/tx/datashard/finalize_build_index_unit.cpp +++ b/ydb/core/tx/datashard/finalize_build_index_unit.cpp @@ -31,9 +31,16 @@ public: auto pathId = TPathId(params.GetPathId().GetOwnerId(), params.GetPathId().GetLocalId()); Y_VERIFY(pathId.OwnerId == DataShard.GetPathOwnerId()); - auto tableInfo = DataShard.AlterTableSchemaVersion(ctx, txc, pathId, params.GetTableSchemaVersion()); + const auto version = params.GetTableSchemaVersion(); + Y_VERIFY(version); + + auto tableInfo = DataShard.AlterTableSchemaVersion(ctx, txc, pathId, version); DataShard.AddUserTable(pathId, tableInfo); + if (tableInfo->NeedSchemaSnapshots()) { + DataShard.AddSchemaSnapshot(pathId, version, op->GetStep(), op->GetTxId(), txc, ctx); + } + ui64 step = params.GetSnapshotStep(); ui64 txId = params.GetSnapshotTxId(); Y_VERIFY(step != 0); diff --git a/ydb/core/tx/datashard/initiate_build_index_unit.cpp b/ydb/core/tx/datashard/initiate_build_index_unit.cpp index f52034aeea6..07e53261e2f 100644 --- a/ydb/core/tx/datashard/initiate_build_index_unit.cpp +++ b/ydb/core/tx/datashard/initiate_build_index_unit.cpp @@ -33,6 +33,9 @@ public: auto pathId = TPathId(params.GetPathId().GetOwnerId(), params.GetPathId().GetLocalId()); Y_VERIFY(pathId.OwnerId == DataShard.GetPathOwnerId()); + const auto version = 
params.GetTableSchemaVersion(); + Y_VERIFY(version); + TUserTable::TPtr tableInfo; if (params.HasIndexDescription()) { const auto& indexDesc = params.GetIndexDescription(); @@ -44,14 +47,18 @@ public: )); } - tableInfo = DataShard.AlterTableAddIndex(ctx, txc, pathId, params.GetTableSchemaVersion(), indexDesc); + tableInfo = DataShard.AlterTableAddIndex(ctx, txc, pathId, version, indexDesc); } else { - tableInfo = DataShard.AlterTableSchemaVersion(ctx, txc, pathId, params.GetTableSchemaVersion()); + tableInfo = DataShard.AlterTableSchemaVersion(ctx, txc, pathId, version); } Y_VERIFY(tableInfo); DataShard.AddUserTable(pathId, tableInfo); + if (tableInfo->NeedSchemaSnapshots()) { + DataShard.AddSchemaSnapshot(pathId, version, op->GetStep(), op->GetTxId(), txc, ctx); + } + ui64 step = tx->GetStep(); ui64 txId = tx->GetTxId(); Y_VERIFY(step != 0); diff --git a/ydb/core/tx/datashard/move_table_unit.cpp b/ydb/core/tx/datashard/move_table_unit.cpp index 975cb115482..ad4becbe1a5 100644 --- a/ydb/core/tx/datashard/move_table_unit.cpp +++ b/ydb/core/tx/datashard/move_table_unit.cpp @@ -15,13 +15,13 @@ public: return true; } - void MoveChangeRecords(NIceDb::TNiceDb& db, const NKikimrTxDataShard::TMoveTable& move, TVector<TEvChangeExchange::TEvEnqueueRecords::TRecordInfo>& changeRecords) { + void MoveChangeRecords(NIceDb::TNiceDb& db, const NKikimrTxDataShard::TMoveTable& move, TVector<NMiniKQL::IChangeCollector::TChange>& changeRecords) { const THashMap<TPathId, TPathId> remap = DataShard.GetRemapIndexes(move); for (auto& record: changeRecords) { - if (remap.contains(record.PathId)) { // here could be the records for already deleted indexes, so skip them - record.PathId = remap.at(record.PathId); - DataShard.MoveChangeRecord(db, record.Order, record.PathId); + if (remap.contains(record.PathId())) { // here could be the records for already deleted indexes, so skip them + record.SetPathId(remap.at(record.PathId())); + DataShard.MoveChangeRecord(db, record.Order(), 
record.PathId()); } } } @@ -42,7 +42,7 @@ public: } NIceDb::TNiceDb db(txc.DB); - TVector<TEvChangeExchange::TEvEnqueueRecords::TRecordInfo> changeRecords; + TVector<NMiniKQL::IChangeCollector::TChange> changeRecords; if (!DataShard.LoadChangeRecords(db, changeRecords)) { return EExecutionStatus::Restart; } @@ -56,7 +56,7 @@ public: DataShard.KillChangeSender(ctx); - DataShard.MoveUserTable(ctx, txc, params); + DataShard.MoveUserTable(op, params, ctx, txc); DataShard.CreateChangeSender(ctx); MoveChangeRecords(db, params, changeRecords); diff --git a/ydb/core/tx/datashard/operation.h b/ydb/core/tx/datashard/operation.h index 0f4b713e91c..3d330806c72 100644 --- a/ydb/core/tx/datashard/operation.h +++ b/ydb/core/tx/datashard/operation.h @@ -5,10 +5,10 @@ #include "datashard_locks.h" #include "datashard_outreadset.h" #include "datashard_snapshots.h" -#include "change_exchange.h" #include "execution_unit_kind.h" #include <ydb/core/engine/mkql_engine_flat.h> +#include <ydb/core/engine/minikql/change_collector_iface.h> #include <ydb/core/protos/tx_datashard.pb.h> #include <ydb/core/tablet_flat/tablet_flat_executor.h> #include <ydb/core/tx/balance_coverage/balance_coverage_builder.h> @@ -435,7 +435,7 @@ struct TOutputOpData { using TResultPtr = THolder<TEvDataShard::TEvProposeTransactionResult>; using TDelayedAcks = TVector<THolder<IEventHandle>>; using TOutReadSets = TMap<std::pair<ui64, ui64>, TString>; // source:target -> body - using TChangeRecord = TEvChangeExchange::TEvEnqueueRecords::TRecordInfo; + using TChangeRecord = NMiniKQL::IChangeCollector::TChange; TResultPtr Result; // ACKs to send on successful operation completion. 
diff --git a/ydb/core/tx/datashard/snapshot_key.h b/ydb/core/tx/datashard/snapshot_key.h index 7a742a6e8aa..63c1ea44d77 100644 --- a/ydb/core/tx/datashard/snapshot_key.h +++ b/ydb/core/tx/datashard/snapshot_key.h @@ -1,5 +1,7 @@ #pragma once +#include <ydb/core/base/pathid.h> + #include <util/generic/fwd.h> #include <tuple> @@ -90,6 +92,10 @@ struct TSchemaSnapshotKey , Version(version) { } + TSchemaSnapshotKey(const TPathId& pathId, ui64 version) + : TSchemaSnapshotKey(pathId.OwnerId, pathId.LocalPathId, version) + { } + auto ToTuple() const { return std::tuple_cat(TSnapshotTableKey::ToTuple(), std::make_tuple(Version)); } diff --git a/ydb/core/tx/datashard/ya.make b/ydb/core/tx/datashard/ya.make index 9c98e8e79b0..08264b072ae 100644 --- a/ydb/core/tx/datashard/ya.make +++ b/ydb/core/tx/datashard/ya.make @@ -207,6 +207,9 @@ PEERDIR( library/cpp/actors/core library/cpp/containers/flat_hash library/cpp/html/pcdata + library/cpp/json + library/cpp/json/yson + library/cpp/string_utils/base64 library/cpp/string_utils/quote ydb/core/actorlib_impl ydb/core/base diff --git a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_datashard_/flat_datashard.schema b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_datashard_/flat_datashard.schema index 90bf68adbef..05d887b602f 100644 --- a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_datashard_/flat_datashard.schema +++ b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_datashard_/flat_datashard.schema @@ -1010,6 +1010,16 @@ "ColumnId": 8, "ColumnName": "SchemaVersion", "ColumnType": "Uint64" + }, + { + "ColumnId": 9, + "ColumnName": "TableOwnerId", + "ColumnType": "Uint64" + }, + { + "ColumnId": 10, + "ColumnName": "TablePathId", + "ColumnType": "Uint64" } ], "ColumnsDropped": [], @@ -1023,7 
+1033,9 @@ 5, 6, 7, - 8 + 8, + 9, + 10 ], "RoomID": 0, "Codec": 0, |