aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorIlnaz Nizametdinov <i.nizametdinov@gmail.com>2022-03-10 17:48:58 +0300
committerIlnaz Nizametdinov <i.nizametdinov@gmail.com>2022-03-10 17:48:58 +0300
commit8d01f8c26b6cd7b0d53bb6e75d34fba2702d338a (patch)
tree86989f7a102fa6029e1fcad851cc1a9cd2bf0610
parent4f7269728d68c309828901214776737f895b1109 (diff)
downloadydb-8d01f8c26b6cd7b0d53bb6e75d34fba2702d338a.tar.gz
CDC records in json format KIKIMR-14198
ref:2e21db5e45fb4048f6bbe49585393b7e877d0b46
-rw-r--r--CMakeLists.darwin.txt2
-rw-r--r--CMakeLists.linux.txt2
-rw-r--r--ydb/core/engine/minikql/change_collector_iface.h18
-rw-r--r--ydb/core/tx/datashard/CMakeLists.txt6
-rw-r--r--ydb/core/tx/datashard/alter_cdc_stream_unit.cpp10
-rw-r--r--ydb/core/tx/datashard/alter_table_unit.cpp10
-rw-r--r--ydb/core/tx/datashard/change_collector_base.cpp9
-rw-r--r--ydb/core/tx/datashard/change_record.cpp167
-rw-r--r--ydb/core/tx/datashard/change_record.h17
-rw-r--r--ydb/core/tx/datashard/change_sender_cdc_stream.cpp63
-rw-r--r--ydb/core/tx/datashard/create_cdc_stream_unit.cpp14
-rw-r--r--ydb/core/tx/datashard/datashard.cpp103
-rw-r--r--ydb/core/tx/datashard/datashard_change_sending.cpp16
-rw-r--r--ydb/core/tx/datashard/datashard_impl.h46
-rw-r--r--ydb/core/tx/datashard/datashard_schema_snapshots.cpp13
-rw-r--r--ydb/core/tx/datashard/datashard_schema_snapshots.h3
-rw-r--r--ydb/core/tx/datashard/datashard_txs.h2
-rw-r--r--ydb/core/tx/datashard/datashard_user_table.cpp7
-rw-r--r--ydb/core/tx/datashard/datashard_user_table.h5
-rw-r--r--ydb/core/tx/datashard/direct_tx_unit.cpp7
-rw-r--r--ydb/core/tx/datashard/drop_cdc_stream_unit.cpp9
-rw-r--r--ydb/core/tx/datashard/drop_index_notice_unit.cpp11
-rw-r--r--ydb/core/tx/datashard/execute_data_tx_unit.cpp7
-rw-r--r--ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp7
-rw-r--r--ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp7
-rw-r--r--ydb/core/tx/datashard/finalize_build_index_unit.cpp9
-rw-r--r--ydb/core/tx/datashard/initiate_build_index_unit.cpp11
-rw-r--r--ydb/core/tx/datashard/move_table_unit.cpp12
-rw-r--r--ydb/core/tx/datashard/operation.h4
-rw-r--r--ydb/core/tx/datashard/snapshot_key.h6
-rw-r--r--ydb/core/tx/datashard/ya.make3
-rw-r--r--ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_datashard_/flat_datashard.schema14
32 files changed, 525 insertions, 95 deletions
diff --git a/CMakeLists.darwin.txt b/CMakeLists.darwin.txt
index e279fc2dc86..8db1ee0546f 100644
--- a/CMakeLists.darwin.txt
+++ b/CMakeLists.darwin.txt
@@ -585,6 +585,7 @@ add_subdirectory(ydb/core/sys_view/tablets)
add_subdirectory(ydb/core/tx/datashard)
add_subdirectory(library/cpp/containers/flat_hash)
add_subdirectory(library/cpp/containers/flat_hash/lib)
+add_subdirectory(library/cpp/json/yson)
add_subdirectory(ydb/core/persqueue/partition_key_range)
add_subdirectory(ydb/core/persqueue/writer)
add_subdirectory(ydb/core/persqueue/events)
@@ -612,7 +613,6 @@ add_subdirectory(ydb/library/yql/dq/actors/compute)
add_subdirectory(ydb/library/yql/dq/tasks)
add_subdirectory(ydb/core/tx/long_tx_service/public)
add_subdirectory(ydb/core/yq/libs/actors)
-add_subdirectory(library/cpp/json/yson)
add_subdirectory(ydb/core/yq/libs/actors/logging)
add_subdirectory(ydb/core/yq/libs/checkpointing)
add_subdirectory(ydb/core/yq/libs/checkpointing_common)
diff --git a/CMakeLists.linux.txt b/CMakeLists.linux.txt
index 01be75e7437..d956b9e31e9 100644
--- a/CMakeLists.linux.txt
+++ b/CMakeLists.linux.txt
@@ -666,6 +666,7 @@ add_subdirectory(ydb/core/sys_view/tablets)
add_subdirectory(ydb/core/tx/datashard)
add_subdirectory(library/cpp/containers/flat_hash)
add_subdirectory(library/cpp/containers/flat_hash/lib)
+add_subdirectory(library/cpp/json/yson)
add_subdirectory(ydb/core/persqueue/partition_key_range)
add_subdirectory(ydb/core/persqueue/writer)
add_subdirectory(ydb/core/persqueue/events)
@@ -693,7 +694,6 @@ add_subdirectory(ydb/library/yql/dq/actors/compute)
add_subdirectory(ydb/library/yql/dq/tasks)
add_subdirectory(ydb/core/tx/long_tx_service/public)
add_subdirectory(ydb/core/yq/libs/actors)
-add_subdirectory(library/cpp/json/yson)
add_subdirectory(ydb/core/yq/libs/actors/logging)
add_subdirectory(ydb/core/yq/libs/checkpointing)
add_subdirectory(ydb/core/yq/libs/checkpointing_common)
diff --git a/ydb/core/engine/minikql/change_collector_iface.h b/ydb/core/engine/minikql/change_collector_iface.h
index fe6019b300e..c96ffaa3ce9 100644
--- a/ydb/core/engine/minikql/change_collector_iface.h
+++ b/ydb/core/engine/minikql/change_collector_iface.h
@@ -9,12 +9,16 @@ namespace NMiniKQL {
class IChangeCollector {
public:
// basic change record's info
- struct TChange: public std::tuple<ui64, TPathId, ui64> {
- using std::tuple<ui64, TPathId, ui64>::tuple;
+ struct TChange: public std::tuple<ui64, TPathId, ui64, TPathId, ui64> {
+ using std::tuple<ui64, TPathId, ui64, TPathId, ui64>::tuple;
ui64 Order() const { return std::get<0>(*this); }
const TPathId& PathId() const { return std::get<1>(*this); }
ui64 BodySize() const { return std::get<2>(*this); }
+ const TPathId& TableId() const { return std::get<3>(*this); }
+ ui64 SchemaVersion() const { return std::get<4>(*this); }
+
+ void SetPathId(const TPathId& value) { std::get<1>(*this) = value; }
};
public:
@@ -34,3 +38,13 @@ public:
} // NMiniKQL
} // NKikimr
+
+Y_DECLARE_OUT_SPEC(inline, NKikimr::NMiniKQL::IChangeCollector::TChange, o, x) {
+ o << "{"
+ << " Order: " << x.Order()
+ << " PathId: " << x.PathId()
+ << " BodySize: " << x.BodySize()
+ << " TableId: " << x.TableId()
+ << " SchemaVersion: " << x.SchemaVersion()
+ << " }";
+}
diff --git a/ydb/core/tx/datashard/CMakeLists.txt b/ydb/core/tx/datashard/CMakeLists.txt
index a40708faaeb..842cf3224bf 100644
--- a/ydb/core/tx/datashard/CMakeLists.txt
+++ b/ydb/core/tx/datashard/CMakeLists.txt
@@ -19,6 +19,9 @@ target_link_libraries(core-tx-datashard PUBLIC
cpp-actors-core
cpp-containers-flat_hash
cpp-html-pcdata
+ library-cpp-json
+ cpp-json-yson
+ cpp-string_utils-base64
cpp-string_utils-quote
ydb-core-actorlib_impl
ydb-core-base
@@ -249,6 +252,9 @@ target_link_libraries(core-tx-datashard.global PUBLIC
cpp-actors-core
cpp-containers-flat_hash
cpp-html-pcdata
+ library-cpp-json
+ cpp-json-yson
+ cpp-string_utils-base64
cpp-string_utils-quote
ydb-core-actorlib_impl
ydb-core-base
diff --git a/ydb/core/tx/datashard/alter_cdc_stream_unit.cpp b/ydb/core/tx/datashard/alter_cdc_stream_unit.cpp
index b79d5017361..f55579154d2 100644
--- a/ydb/core/tx/datashard/alter_cdc_stream_unit.cpp
+++ b/ydb/core/tx/datashard/alter_cdc_stream_unit.cpp
@@ -38,9 +38,17 @@ public:
Y_VERIFY_S(streamDesc.GetState() == NKikimrSchemeOp::ECdcStreamStateDisabled, "Unexpected alter cdc stream"
<< ": desc# " << streamDesc.ShortDebugString());
- auto tableInfo = DataShard.AlterTableDisableCdcStream(ctx, txc, pathId, params.GetTableSchemaVersion(), streamPathId);
+
+ const auto version = params.GetTableSchemaVersion();
+ Y_VERIFY(version);
+
+ auto tableInfo = DataShard.AlterTableDisableCdcStream(ctx, txc, pathId, version, streamPathId);
DataShard.AddUserTable(pathId, tableInfo);
+ if (tableInfo->NeedSchemaSnapshots()) {
+ DataShard.AddSchemaSnapshot(pathId, version, op->GetStep(), op->GetTxId(), txc, ctx);
+ }
+
BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::COMPLETE);
op->Result()->SetStepOrderId(op->GetStepOrder().ToPair());
diff --git a/ydb/core/tx/datashard/alter_table_unit.cpp b/ydb/core/tx/datashard/alter_table_unit.cpp
index 78786adae78..34784d6f089 100644
--- a/ydb/core/tx/datashard/alter_table_unit.cpp
+++ b/ydb/core/tx/datashard/alter_table_unit.cpp
@@ -136,9 +136,12 @@ EExecutionStatus TAlterTableUnit::Execute(TOperation::TPtr op,
const auto& alterTableTx = schemeTx.GetAlterTable();
+ const auto version = alterTableTx.GetTableSchemaVersion();
+ Y_VERIFY(version);
+
LOG_INFO_S(ctx, NKikimrServices::TX_DATASHARD,
"Trying to ALTER TABLE at " << DataShard.TabletID()
- << " version " << alterTableTx.GetTableSchemaVersion());
+ << " version " << version);
TPathId tableId(DataShard.GetPathOwnerId(), alterTableTx.GetId_Deprecated());
if (alterTableTx.HasPathId()) {
@@ -148,9 +151,12 @@ EExecutionStatus TAlterTableUnit::Execute(TOperation::TPtr op,
}
TUserTable::TPtr info = DataShard.AlterUserTable(ctx, txc, alterTableTx);
-
DataShard.AddUserTable(tableId, info);
+ if (info->NeedSchemaSnapshots()) {
+ DataShard.AddSchemaSnapshot(tableId, version, op->GetStep(), op->GetTxId(), txc, ctx);
+ }
+
BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::COMPLETE);
op->Result()->SetStepOrderId(op->GetStepOrder().ToPair());
diff --git a/ydb/core/tx/datashard/change_collector_base.cpp b/ydb/core/tx/datashard/change_collector_base.cpp
index 779d0ce4437..29fca256764 100644
--- a/ydb/core/tx/datashard/change_collector_base.cpp
+++ b/ydb/core/tx/datashard/change_collector_base.cpp
@@ -143,12 +143,19 @@ void TBaseChangeCollector::Persist(
.WithStep(WriteVersion.Step)
.WithTxId(WriteVersion.TxId)
.WithPathId(pathId)
+ .WithTableId(tableId.PathId)
.WithSchemaVersion(userTable->GetTableSchemaVersion())
.WithBody(body.SerializeAsString())
.Build();
Self->PersistChangeRecord(db, record);
- Collected.emplace_back(record.GetOrder(), record.GetPathId(), record.GetBody().size());
+ Collected.emplace_back(
+ record.GetOrder(),
+ record.GetPathId(),
+ record.GetBody().size(),
+ record.GetTableId(),
+ record.GetSchemaVersion()
+ );
}
} // NDataShard
diff --git a/ydb/core/tx/datashard/change_record.cpp b/ydb/core/tx/datashard/change_record.cpp
index f191d3c5b03..cce147d67f7 100644
--- a/ydb/core/tx/datashard/change_record.cpp
+++ b/ydb/core/tx/datashard/change_record.cpp
@@ -1,6 +1,13 @@
#include "change_record.h"
+#include "export_common.h"
+
+#include <library/cpp/json/json_reader.h>
+#include <library/cpp/json/yson/json2yson.h>
+#include <library/cpp/string_utils/base64/base64.h>
#include <ydb/core/protos/change_exchange.pb.h>
+#include <ydb/core/util/yverify_stream.h>
+#include <ydb/library/binary_json/read.h>
#include <util/stream/str.h>
@@ -55,6 +62,156 @@ void TChangeRecord::SerializeTo(NKikimrChangeExchange::TChangeRecord& record) co
}
}
+static NJson::TJsonValue StringToJson(TStringBuf in) {
+ NJson::TJsonValue result;
+ Y_VERIFY(NJson::ReadJsonTree(in, &result));
+ return result;
+}
+
+static NJson::TJsonValue YsonToJson(TStringBuf in) {
+ NJson::TJsonValue result;
+ Y_VERIFY(NJson2Yson::DeserializeYsonAsJsonValue(in, &result));
+ return result;
+}
+
+static NJson::TJsonValue ToJson(const TCell& cell, NScheme::TTypeId type) {
+ if (cell.IsNull()) {
+ return NJson::TJsonValue(NJson::JSON_NULL);
+ }
+
+ switch (type) {
+ case NScheme::NTypeIds::Bool:
+ return NJson::TJsonValue(cell.AsValue<bool>());
+ case NScheme::NTypeIds::Int8:
+ return NJson::TJsonValue(cell.AsValue<i8>());
+ case NScheme::NTypeIds::Uint8:
+ return NJson::TJsonValue(cell.AsValue<ui8>());
+ case NScheme::NTypeIds::Int16:
+ return NJson::TJsonValue(cell.AsValue<i16>());
+ case NScheme::NTypeIds::Uint16:
+ return NJson::TJsonValue(cell.AsValue<ui16>());
+ case NScheme::NTypeIds::Int32:
+ return NJson::TJsonValue(cell.AsValue<i32>());
+ case NScheme::NTypeIds::Uint32:
+ return NJson::TJsonValue(cell.AsValue<ui32>());
+ case NScheme::NTypeIds::Int64:
+ return NJson::TJsonValue(cell.AsValue<i64>());
+ case NScheme::NTypeIds::Uint64:
+ return NJson::TJsonValue(cell.AsValue<ui64>());
+ case NScheme::NTypeIds::Float:
+ return NJson::TJsonValue(cell.AsValue<float>());
+ case NScheme::NTypeIds::Double:
+ return NJson::TJsonValue(cell.AsValue<double>());
+ case NScheme::NTypeIds::Date:
+ return NJson::TJsonValue(TInstant::Days(cell.AsValue<ui16>()).ToString());
+ case NScheme::NTypeIds::Datetime:
+ return NJson::TJsonValue(TInstant::Seconds(cell.AsValue<ui32>()).ToString());
+ case NScheme::NTypeIds::Timestamp:
+ return NJson::TJsonValue(TInstant::MicroSeconds(cell.AsValue<ui64>()).ToString());
+ case NScheme::NTypeIds::Interval:
+ return NJson::TJsonValue(cell.AsValue<i64>());
+ case NScheme::NTypeIds::Decimal:
+ return NJson::TJsonValue(DecimalToString(cell.AsValue<std::pair<ui64, i64>>()));
+ case NScheme::NTypeIds::DyNumber:
+ return NJson::TJsonValue(DyNumberToString(cell.AsBuf()));
+ case NScheme::NTypeIds::String:
+ case NScheme::NTypeIds::String4k:
+ case NScheme::NTypeIds::String2m:
+ return NJson::TJsonValue(Base64Encode(cell.AsBuf()));
+ case NScheme::NTypeIds::Utf8:
+ return NJson::TJsonValue(cell.AsBuf());
+ case NScheme::NTypeIds::Json:
+ return StringToJson(cell.AsBuf());
+ case NScheme::NTypeIds::JsonDocument:
+ return StringToJson(NBinaryJson::SerializeToJson(cell.AsBuf()));
+ case NScheme::NTypeIds::Yson:
+ return YsonToJson(cell.AsBuf());
+ default:
+ Y_FAIL("Unexpected type");
+ }
+}
+
+static void SerializeJsonKey(TUserTable::TCPtr schema, NJson::TJsonValue& key,
+ const NKikimrChangeExchange::TChangeRecord::TDataChange::TSerializedCells& in)
+{
+ Y_VERIFY(in.TagsSize() != schema->KeyColumnIds.size());
+ for (size_t i = 0; i < schema->KeyColumnIds.size(); ++i) {
+ Y_VERIFY(in.GetTags(i) == schema->KeyColumnIds.at(i));
+ }
+
+ TSerializedCellVec cells;
+ Y_VERIFY(TSerializedCellVec::TryParse(in.GetData(), cells));
+
+ Y_VERIFY(cells.GetCells().size() == schema->KeyColumnTypes.size());
+ for (size_t i = 0; i < schema->KeyColumnTypes.size(); ++i) {
+ const auto type = schema->KeyColumnTypes.at(i);
+ const auto& cell = cells.GetCells().at(i);
+ key.AppendValue(ToJson(cell, type));
+ }
+}
+
+static void SerializeJsonValue(TUserTable::TCPtr schema, NJson::TJsonValue& value,
+ const NKikimrChangeExchange::TChangeRecord::TDataChange::TSerializedCells& in)
+{
+ TSerializedCellVec cells;
+ Y_VERIFY(TSerializedCellVec::TryParse(in.GetData(), cells));
+ Y_VERIFY(in.TagsSize() == cells.GetCells().size());
+
+ for (ui32 i = 0; i < in.TagsSize(); ++i) {
+ const auto tag = in.GetTags(i);
+ const auto& cell = cells.GetCells().at(i);
+
+ auto it = schema->Columns.find(tag);
+ Y_VERIFY(it != schema->Columns.end());
+
+ const auto& column = it->second;
+ value.InsertValue(column.Name, ToJson(cell, column.Type));
+ }
+}
+
+void TChangeRecord::SerializeTo(NJson::TJsonValue& key, NJson::TJsonValue& value) const {
+ switch (Kind) {
+ case EKind::CdcDataChange: {
+ Y_VERIFY(Schema);
+
+ NKikimrChangeExchange::TChangeRecord::TDataChange body;
+ Y_VERIFY(body.ParseFromArray(Body.data(), Body.size()));
+
+ SerializeJsonKey(Schema, key, body.GetKey());
+
+ if (body.HasOldImage()) {
+ SerializeJsonValue(Schema, value["oldImage"], body.GetOldImage());
+ }
+
+ if (body.HasNewImage()) {
+ SerializeJsonValue(Schema, value["newImage"], body.GetNewImage());
+ }
+
+ if (!body.HasOldImage() && !body.HasNewImage()) {
+ switch (body.GetRowOperationCase()) {
+ case NKikimrChangeExchange::TChangeRecord::TDataChange::kUpsert:
+ SerializeJsonValue(Schema, value["update"], body.GetUpsert());
+ break;
+ case NKikimrChangeExchange::TChangeRecord::TDataChange::kReset:
+ SerializeJsonValue(Schema, value["reset"], body.GetReset());
+ break;
+ case NKikimrChangeExchange::TChangeRecord::TDataChange::kErase:
+ value["erase"].SetType(NJson::EJsonValueType::JSON_MAP);
+ break;
+ default:
+ Y_FAIL_S("Unexpected row operation: " << static_cast<int>(body.GetRowOperationCase()));
+ }
+ }
+
+ break;
+ }
+
+ case EKind::AsyncIndex: {
+ Y_FAIL("Not supported");
+ }
+ }
+}
+
TString TChangeRecord::ToString() const {
TString result;
TStringOutput out(result);
@@ -103,11 +260,21 @@ TChangeRecordBuilder& TChangeRecordBuilder::WithPathId(const TPathId& pathId) {
return *this;
}
+TChangeRecordBuilder& TChangeRecordBuilder::WithTableId(const TPathId& tableId) {
+ Record.TableId = tableId;
+ return *this;
+}
+
TChangeRecordBuilder& TChangeRecordBuilder::WithSchemaVersion(ui64 version) {
Record.SchemaVersion = version;
return *this;
}
+TChangeRecordBuilder& TChangeRecordBuilder::WithSchema(TUserTable::TCPtr schema) {
+ Record.Schema = schema;
+ return *this;
+}
+
TChangeRecordBuilder& TChangeRecordBuilder::WithBody(const TString& body) {
Record.Body = body;
return *this;
diff --git a/ydb/core/tx/datashard/change_record.h b/ydb/core/tx/datashard/change_record.h
index 96b0ceb4f46..fae0f793732 100644
--- a/ydb/core/tx/datashard/change_record.h
+++ b/ydb/core/tx/datashard/change_record.h
@@ -1,5 +1,9 @@
#pragma once
+#include "datashard_user_table.h"
+
+#include <library/cpp/json/json_value.h>
+
#include <ydb/core/base/pathid.h>
#include <ydb/core/scheme/scheme_tablecell.h>
@@ -30,13 +34,16 @@ public:
ui64 GetStep() const { return Step; }
ui64 GetTxId() const { return TxId; }
const TPathId& GetPathId() const { return PathId; }
- ui64 GetSchemaVersion() const { return SchemaVersion; }
EKind GetKind() const { return Kind; }
const TString& GetBody() const { return Body; }
i64 GetSeqNo() const;
TConstArrayRef<TCell> GetKey() const;
+ const TPathId& GetTableId() const { return TableId; }
+ ui64 GetSchemaVersion() const { return SchemaVersion; }
+
void SerializeTo(NKikimrChangeExchange::TChangeRecord& record) const;
+ void SerializeTo(NJson::TJsonValue& key, NJson::TJsonValue& value) const;
TString ToString() const;
void Out(IOutputStream& out) const;
@@ -47,10 +54,13 @@ private:
ui64 Step;
ui64 TxId;
TPathId PathId;
- ui64 SchemaVersion;
EKind Kind;
TString Body;
+ ui64 SchemaVersion;
+ TPathId TableId;
+ TUserTable::TCPtr Schema;
+
mutable TMaybe<TOwnedCellVec> Key;
}; // TChangeRecord
@@ -66,7 +76,10 @@ public:
TChangeRecordBuilder& WithStep(ui64 step);
TChangeRecordBuilder& WithTxId(ui64 txId);
TChangeRecordBuilder& WithPathId(const TPathId& pathId);
+
+ TChangeRecordBuilder& WithTableId(const TPathId& tableId);
TChangeRecordBuilder& WithSchemaVersion(ui64 version);
+ TChangeRecordBuilder& WithSchema(TUserTable::TCPtr schema);
TChangeRecordBuilder& WithBody(const TString& body);
TChangeRecordBuilder& WithBody(TString&& body);
diff --git a/ydb/core/tx/datashard/change_sender_cdc_stream.cpp b/ydb/core/tx/datashard/change_sender_cdc_stream.cpp
index 288f6a8526f..3533a4997d1 100644
--- a/ydb/core/tx/datashard/change_sender_cdc_stream.cpp
+++ b/ydb/core/tx/datashard/change_sender_cdc_stream.cpp
@@ -5,6 +5,7 @@
#include <library/cpp/actors/core/actor_bootstrapped.h>
#include <library/cpp/actors/core/hfunc.h>
#include <library/cpp/actors/core/log.h>
+#include <library/cpp/json/json_writer.h>
#include <ydb/core/persqueue/partition_key_range/partition_key_range.h>
#include <ydb/core/persqueue/writer/source_id_encoding.h>
@@ -85,23 +86,46 @@ class TCdcChangeSenderPartition: public TActorBootstrapped<TCdcChangeSenderParti
continue;
}
- const auto createdAt = TInstant::FromValue(record.GetGroup());
-
- NKikimrChangeExchange::TChangeRecord protoRecord;
- record.SerializeTo(protoRecord);
+ const auto createdAt = record.GetGroup()
+ ? TInstant::FromValue(record.GetGroup())
+ : TInstant::MilliSeconds(record.GetStep());
NKikimrPQClient::TDataChunk data;
data.SetSeqNo(record.GetSeqNo());
data.SetCreateTime(createdAt.MilliSeconds());
data.SetCodec(CodecRaw);
- data.SetData(protoRecord.SerializeAsString());
// TODO: meta?
auto& cmd = *request.MutablePartitionRequest()->AddCmdWrite();
cmd.SetSeqNo(record.GetSeqNo());
cmd.SetSourceId(NSourceIdEncoding::EncodeSimple(SourceId));
cmd.SetCreateTimeMS(createdAt.MilliSeconds());
- cmd.SetData(data.SerializeAsString());
+
+ switch (Format) {
+ case NKikimrSchemeOp::ECdcStreamFormatProto: {
+ NKikimrChangeExchange::TChangeRecord protoRecord;
+ record.SerializeTo(protoRecord);
+ data.SetData(protoRecord.SerializeAsString());
+ cmd.SetData(data.SerializeAsString());
+ break;
+ }
+
+ case NKikimrSchemeOp::ECdcStreamFormatJson: {
+ NJson::TJsonValue key;
+ NJson::TJsonValue value;
+ record.SerializeTo(key, value);
+ data.SetData(WriteJson(value, false));
+ cmd.SetData(data.SerializeAsString());
+ cmd.SetPartitionKey(WriteJson(key, false));
+ break;
+ }
+
+ default: {
+ LOG_E("Unknown format"
+ << ": format# " << static_cast<int>(Format));
+ return Leave();
+ }
+ }
Pending.push_back(record.GetSeqNo());
}
@@ -193,11 +217,17 @@ public:
return NKikimrServices::TActivity::CHANGE_SENDER_CDC_ACTOR_PARTITION;
}
- explicit TCdcChangeSenderPartition(const TActorId& parent, const TDataShardId& dataShard, ui32 partitionId, ui64 shardId)
+ explicit TCdcChangeSenderPartition(
+ const TActorId& parent,
+ const TDataShardId& dataShard,
+ ui32 partitionId,
+ ui64 shardId,
+ NKikimrSchemeOp::ECdcStreamFormat format)
: Parent(parent)
, DataShard(dataShard)
, PartitionId(partitionId)
, ShardId(shardId)
+ , Format(format)
, SourceId(ToString(DataShard.TabletId))
{
}
@@ -222,6 +252,7 @@ private:
const TDataShardId DataShard;
const ui32 PartitionId;
const ui64 ShardId;
+ const NKikimrSchemeOp::ECdcStreamFormat Format;
const TString SourceId;
mutable TMaybe<TString> LogPrefix;
@@ -374,6 +405,15 @@ class TCdcChangeSenderMain: public TActorBootstrapped<TCdcChangeSenderMain>
return Check(&TSchemeCacheHelpers::CheckEntryKind<TNavigate::TEntry>, &TThis::LogWarnAndRetry, entry, expected);
}
+ bool CheckNotEmpty(const TIntrusiveConstPtr<TNavigate::TCdcStreamInfo>& streamInfo) {
+ if (streamInfo) {
+ return true;
+ }
+
+ LogCritAndRetry(TStringBuilder() << "Empty stream info at '" << CurrentStateName() << "'");
+ return false;
+ }
+
bool CheckNotEmpty(const TIntrusiveConstPtr<TNavigate::TPQGroupInfo>& pqInfo) {
if (pqInfo) {
return true;
@@ -440,6 +480,12 @@ class TCdcChangeSenderMain: public TActorBootstrapped<TCdcChangeSenderMain>
return;
}
+ if (!CheckNotEmpty(entry.CdcStreamInfo)) {
+ return;
+ }
+
+ Format = entry.CdcStreamInfo->Description.GetFormat();
+
Y_VERIFY(entry.ListNodeEntry->Children.size() == 1);
const auto& topic = entry.ListNodeEntry->Children.at(0);
@@ -597,7 +643,7 @@ class TCdcChangeSenderMain: public TActorBootstrapped<TCdcChangeSenderMain>
IActor* CreateSender(ui64 partitionId) override {
Y_VERIFY(PartitionToShard.contains(partitionId));
const auto shardId = PartitionToShard.at(partitionId);
- return new TCdcChangeSenderPartition(SelfId(), DataShard, partitionId, shardId);
+ return new TCdcChangeSenderPartition(SelfId(), DataShard, partitionId, shardId, Format);
}
void Handle(TEvChangeExchange::TEvEnqueueRecords::TPtr& ev) {
@@ -668,6 +714,7 @@ public:
private:
mutable TMaybe<TString> LogPrefix;
+ NKikimrSchemeOp::ECdcStreamFormat Format;
TPathId TopicPathId;
THolder<TKeyDesc> KeyDesc;
THashMap<ui32, ui64> PartitionToShard;
diff --git a/ydb/core/tx/datashard/create_cdc_stream_unit.cpp b/ydb/core/tx/datashard/create_cdc_stream_unit.cpp
index 8f577f03b4c..1b9b57707b6 100644
--- a/ydb/core/tx/datashard/create_cdc_stream_unit.cpp
+++ b/ydb/core/tx/datashard/create_cdc_stream_unit.cpp
@@ -44,20 +44,14 @@ public:
auto tableInfo = DataShard.AlterTableAddCdcStream(ctx, txc, pathId, version, streamDesc);
DataShard.AddUserTable(pathId, tableInfo);
+ if (tableInfo->NeedSchemaSnapshots()) {
+ DataShard.AddSchemaSnapshot(pathId, version, op->GetStep(), op->GetTxId(), txc, ctx);
+ }
+
AddSender.Reset(new TEvChangeExchange::TEvAddSender(
pathId, TEvChangeExchange::ESenderType::CdcStream, streamPathId
));
- /* TODO(ilnaz)
- if (streamDesc.GetFormat() == NKikimrSchemeOp::ECdcStreamFormatJson) {
- auto& manager = DataShard.GetSchemaSnapshotManager();
-
- const auto key = TSchemaSnapshotKey(pathId.OwnerId, pathId.LocalPathId, version);
- manager.AddSnapshot(txc.DB, key, TSchemaSnapshot(tableInfo, op->GetStep(), op->GetTxId()));
- manager.AcquireReference(key);
- }
- */
-
BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::COMPLETE);
op->Result()->SetStepOrderId(op->GetStepOrder().ToPair());
diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp
index e636d9c9143..a3fcc458681 100644
--- a/ydb/core/tx/datashard/datashard.cpp
+++ b/ydb/core/tx/datashard/datashard.cpp
@@ -468,7 +468,9 @@ void TDataShard::PersistChangeRecord(NIceDb::TNiceDb& db, const TChangeRecord& r
NIceDb::TUpdate<Schema::ChangeRecords::PathOwnerId>(record.GetPathId().OwnerId),
NIceDb::TUpdate<Schema::ChangeRecords::LocalPathId>(record.GetPathId().LocalPathId),
NIceDb::TUpdate<Schema::ChangeRecords::BodySize>(record.GetBody().size()),
- NIceDb::TUpdate<Schema::ChangeRecords::SchemaVersion>(record.GetSchemaVersion()));
+ NIceDb::TUpdate<Schema::ChangeRecords::SchemaVersion>(record.GetSchemaVersion()),
+ NIceDb::TUpdate<Schema::ChangeRecords::TableOwnerId>(record.GetTableId().OwnerId),
+ NIceDb::TUpdate<Schema::ChangeRecords::TablePathId>(record.GetTableId().LocalPathId));
db.Table<Schema::ChangeRecordDetails>().Key(record.GetOrder()).Update(
NIceDb::TUpdate<Schema::ChangeRecordDetails::Kind>(record.GetKind()),
NIceDb::TUpdate<Schema::ChangeRecordDetails::Body>(record.GetBody()));
@@ -490,18 +492,45 @@ void TDataShard::RemoveChangeRecord(NIceDb::TNiceDb& db, ui64 order) {
<< ": order: " << order
<< ", at tablet: " << TabletID());
+ db.Table<Schema::ChangeRecords>().Key(order).Delete();
+ db.Table<Schema::ChangeRecordDetails>().Key(order).Delete();
+
auto it = ChangesQueue.find(order);
- if (it != ChangesQueue.end()) {
- Y_VERIFY(it->second <= ChangesQueueBytes);
- ChangesQueueBytes -= it->second;
- ChangesQueue.erase(it);
+ if (it == ChangesQueue.end()) {
+ Y_VERIFY_DEBUG_S(false, "Trying to remove non-enqueud record: " << order);
+ return;
}
- db.Table<Schema::ChangeRecords>().Key(order).Delete();
- db.Table<Schema::ChangeRecordDetails>().Key(order).Delete();
+ const auto& record = it->second;
+
+ Y_VERIFY(record.BodySize <= ChangesQueueBytes);
+ ChangesQueueBytes -= record.BodySize;
+
+ if (record.SchemaSnapshotAcquired) {
+ Y_VERIFY(record.TableId);
+ auto tableIt = TableInfos.find(record.TableId.LocalPathId);
+
+ if (tableIt != TableInfos.end()) {
+ const auto snapshotKey = TSchemaSnapshotKey(record.TableId, record.SchemaVersion);
+ const bool last = SchemaSnapshotManager.ReleaseReference(snapshotKey);
+
+ if (last) {
+ const auto* snapshot = SchemaSnapshotManager.FindSnapshot(snapshotKey);
+ Y_VERIFY(snapshot);
+
+ if (snapshot->Schema->GetTableSchemaVersion() < tableIt->second->GetTableSchemaVersion()) {
+ SchemaSnapshotManager.RemoveShapshot(db, snapshotKey);
+ }
+ }
+ } else {
+ Y_VERIFY_DEBUG(State == TShardState::PreOffline);
+ }
+ }
+
+ ChangesQueue.erase(it);
}
-void TDataShard::EnqueueChangeRecords(TVector<TEvChangeExchange::TEvEnqueueRecords::TRecordInfo>&& records) {
+void TDataShard::EnqueueChangeRecords(TVector<NMiniKQL::IChangeCollector::TChange>&& records) {
if (!records) {
return;
}
@@ -510,15 +539,23 @@ void TDataShard::EnqueueChangeRecords(TVector<TEvChangeExchange::TEvEnqueueRecor
<< ": at tablet: " << TabletID()
<< ", records: " << JoinSeq(", ", records));
+ TVector<TEvChangeExchange::TEvEnqueueRecords::TRecordInfo> forward(Reserve(records.size()));
for (const auto& record : records) {
- if (ChangesQueue.emplace(record.Order, record.BodySize).second) {
- Y_VERIFY(ChangesQueueBytes <= (Max<ui64>() - record.BodySize));
- ChangesQueueBytes += record.BodySize;
+ forward.emplace_back(record.Order(), record.PathId(), record.BodySize());
+
+ if (auto res = ChangesQueue.emplace(record.Order(), record); res.second) {
+ Y_VERIFY(ChangesQueueBytes <= (Max<ui64>() - record.BodySize()));
+ ChangesQueueBytes += record.BodySize();
+
+ if (record.SchemaVersion()) {
+ res.first->second.SchemaSnapshotAcquired = SchemaSnapshotManager.AcquireReference(
+ TSchemaSnapshotKey(record.TableId(), record.SchemaVersion()));
+ }
}
}
Y_VERIFY(OutChangeSender);
- Send(OutChangeSender, new TEvChangeExchange::TEvEnqueueRecords(std::move(records)));
+ Send(OutChangeSender, new TEvChangeExchange::TEvEnqueueRecords(std::move(forward)));
}
void TDataShard::CreateChangeSender(const TActorContext& ctx) {
@@ -580,14 +617,14 @@ void TDataShard::KillChangeSender(const TActorContext& ctx) {
}
}
-bool TDataShard::LoadChangeRecords(NIceDb::TNiceDb& db, TVector<TEvChangeExchange::TEvEnqueueRecords::TRecordInfo>& changeRecords) {
+bool TDataShard::LoadChangeRecords(NIceDb::TNiceDb& db, TVector<NMiniKQL::IChangeCollector::TChange>& records) {
using Schema = TDataShard::Schema;
LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "LoadChangeRecords"
<< ": QueueSize: " << ChangesQueue.size()
<< ", at tablet: " << TabletID());
- changeRecords.reserve(ChangesQueue.size());
+ records.reserve(ChangesQueue.size());
auto rowset = db.Table<Schema::ChangeRecords>().Range().Select();
if (!rowset.IsReady()) {
@@ -597,12 +634,17 @@ bool TDataShard::LoadChangeRecords(NIceDb::TNiceDb& db, TVector<TEvChangeExchang
while (!rowset.EndOfSet()) {
const ui64 order = rowset.GetValue<Schema::ChangeRecords::Order>();
const ui64 bodySize = rowset.GetValue<Schema::ChangeRecords::BodySize>();
+ const ui64 schemaVersion = rowset.GetValue<Schema::ChangeRecords::SchemaVersion>();
const auto pathId = TPathId(
rowset.GetValue<Schema::ChangeRecords::PathOwnerId>(),
rowset.GetValue<Schema::ChangeRecords::LocalPathId>()
);
+ const auto tableId = TPathId(
+ rowset.GetValue<Schema::ChangeRecords::TableOwnerId>(),
+ rowset.GetValue<Schema::ChangeRecords::TablePathId>()
+ );
- changeRecords.emplace_back(order, pathId, bodySize);
+ records.emplace_back(order, pathId, bodySize, tableId, schemaVersion);
if (!rowset.Next()) {
return false;
}
@@ -810,6 +852,24 @@ TUserTable::TPtr TDataShard::AlterTableDropCdcStream(
return tableInfo;
}
+void TDataShard::AddSchemaSnapshot(const TPathId& pathId, ui64 tableSchemaVersion, ui64 step, ui64 txId,
+ TTransactionContext& txc, const TActorContext& ctx)
+{
+ LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "Add schema snapshot"
+ << ": pathId# " << pathId
+ << ", version# " << tableSchemaVersion
+ << ", step# " << step
+ << ", txId# " << txId
+ << ", at tablet# " << TabletID());
+
+ Y_VERIFY(GetPathOwnerId() == pathId.OwnerId);
+ Y_VERIFY(TableInfos.contains(pathId.LocalPathId));
+ auto tableInfo = TableInfos[pathId.LocalPathId];
+
+ const auto key = TSchemaSnapshotKey(pathId.OwnerId, pathId.LocalPathId, tableSchemaVersion);
+ SchemaSnapshotManager.AddSnapshot(txc.DB, key, TSchemaSnapshot(tableInfo, step, txId));
+}
+
TUserTable::TPtr TDataShard::CreateUserTable(TTransactionContext& txc,
const NKikimrSchemeOp::TTableDescription& tableScheme)
{
@@ -863,8 +923,8 @@ THashMap<TPathId, TPathId> TDataShard::GetRemapIndexes(const NKikimrTxDataShard:
return remap;
}
-TUserTable::TPtr TDataShard::MoveUserTable(const TActorContext& ctx, TTransactionContext& txc,
- const NKikimrTxDataShard::TMoveTable& move)
+TUserTable::TPtr TDataShard::MoveUserTable(TOperation::TPtr op, const NKikimrTxDataShard::TMoveTable& move,
+ const TActorContext& ctx, TTransactionContext& txc)
{
auto prevId = TPathId(move.GetPathId().GetOwnerId(), move.GetPathId().GetLocalId());
auto newId = TPathId(move.GetDstPathId().GetOwnerId(), move.GetDstPathId().GetLocalId());
@@ -872,7 +932,10 @@ TUserTable::TPtr TDataShard::MoveUserTable(const TActorContext& ctx, TTransactio
Y_VERIFY(GetPathOwnerId() == prevId.OwnerId);
Y_VERIFY(TableInfos.contains(prevId.LocalPathId));
- auto newTableInfo = AlterTableSchemaVersion(ctx, txc, prevId, move.GetTableSchemaVersion(), false);
+ const auto version = move.GetTableSchemaVersion();
+ Y_VERIFY(version);
+
+ auto newTableInfo = AlterTableSchemaVersion(ctx, txc, prevId, version, false);
newTableInfo->SetPath(move.GetDstPath());
Y_VERIFY(move.ReMapIndexesSize() == newTableInfo->Indexes.size());
@@ -898,6 +961,10 @@ TUserTable::TPtr TDataShard::MoveUserTable(const TActorContext& ctx, TTransactio
RemoveUserTable(prevId);
AddUserTable(newId, newTableInfo);
+ if (newTableInfo->NeedSchemaSnapshots()) {
+ AddSchemaSnapshot(newId, version, op->GetStep(), op->GetTxId(), txc, ctx);
+ }
+
NIceDb::TNiceDb db(txc.DB);
PersistMoveUserTable(db, prevId.LocalPathId, newId.LocalPathId, *newTableInfo);
PersistOwnerPathId(newId.OwnerId, txc);
diff --git a/ydb/core/tx/datashard/datashard_change_sending.cpp b/ydb/core/tx/datashard/datashard_change_sending.cpp
index 0f51e316416..3ef9d1b96e4 100644
--- a/ydb/core/tx/datashard/datashard_change_sending.cpp
+++ b/ydb/core/tx/datashard/datashard_change_sending.cpp
@@ -63,6 +63,20 @@ class TDataShard::TTxRequestChangeRecords: public TTransactionBase<TDataShard> {
<< ", it->Order: " << it->Order
<< ", it->BodySize: " << it->BodySize);
+ const auto schemaVersion = basic.GetValue<Schema::ChangeRecords::SchemaVersion>();
+ TUserTable::TCPtr schema;
+
+ if (schemaVersion) {
+ const auto tableId = TPathId(
+ basic.GetValue<Schema::ChangeRecords::TableOwnerId>(),
+ basic.GetValue<Schema::ChangeRecords::TablePathId>()
+ );
+ const auto snapshotKey = TSchemaSnapshotKey(tableId, schemaVersion);
+ if (const auto* snapshot = Self->GetSchemaSnapshotManager().FindSnapshot(snapshotKey)) {
+ schema = snapshot->Schema;
+ }
+ }
+
RecordsToSend[recipient].emplace_back(TChangeRecordBuilder(details.GetValue<Schema::ChangeRecordDetails::Kind>())
.WithOrder(it->Order)
.WithGroup(basic.GetValue<Schema::ChangeRecords::Group>())
@@ -72,7 +86,7 @@ class TDataShard::TTxRequestChangeRecords: public TTransactionBase<TDataShard> {
basic.GetValue<Schema::ChangeRecords::PathOwnerId>(),
basic.GetValue<Schema::ChangeRecords::LocalPathId>()
))
- .WithSchemaVersion(basic.GetValue<Schema::ChangeRecords::SchemaVersion>())
+ .WithSchema(schema)
.WithBody(details.GetValue<Schema::ChangeRecordDetails::Body>())
.Build());
diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h
index 3056764635e..17eebf7693f 100644
--- a/ydb/core/tx/datashard/datashard_impl.h
+++ b/ydb/core/tx/datashard/datashard_impl.h
@@ -653,9 +653,22 @@ class TDataShard
struct LocalPathId : Column<6, NScheme::NTypeIds::Uint64> {};
struct BodySize : Column<7, NScheme::NTypeIds::Uint64> {};
struct SchemaVersion : Column<8, NScheme::NTypeIds::Uint64> {};
+ struct TableOwnerId : Column<9, NScheme::NTypeIds::Uint64> {};
+ struct TablePathId : Column<10, NScheme::NTypeIds::Uint64> {};
using TKey = TableKey<Order>;
- using TColumns = TableColumns<Order, Group, PlanStep, TxId, PathOwnerId, LocalPathId, BodySize, SchemaVersion>;
+ using TColumns = TableColumns<
+ Order,
+ Group,
+ PlanStep,
+ TxId,
+ PathOwnerId,
+ LocalPathId,
+ BodySize,
+ SchemaVersion,
+ TableOwnerId,
+ TablePathId
+ >;
};
struct ChangeRecordDetails : Table<18> {
@@ -1377,7 +1390,8 @@ public:
TUserTable::TPtr AlterUserTable(const TActorContext& ctx, TTransactionContext& txc,
const NKikimrSchemeOp::TTableDescription& tableScheme);
static THashMap<TPathId, TPathId> GetRemapIndexes(const NKikimrTxDataShard::TMoveTable& move);
- TUserTable::TPtr MoveUserTable(const TActorContext& ctx, TTransactionContext& txc, const NKikimrTxDataShard::TMoveTable& move);
+ TUserTable::TPtr MoveUserTable(TOperation::TPtr op, const NKikimrTxDataShard::TMoveTable& move,
+ const TActorContext& ctx, TTransactionContext& txc);
void DropUserTable(TTransactionContext& txc, ui64 tableId);
ui32 GetLastLocalTid() const { return LastLocalTid; }
@@ -1387,12 +1401,12 @@ public:
void PersistChangeRecord(NIceDb::TNiceDb& db, const TChangeRecord& record);
void MoveChangeRecord(NIceDb::TNiceDb& db, ui64 order, const TPathId& pathId);
void RemoveChangeRecord(NIceDb::TNiceDb& db, ui64 order);
- void EnqueueChangeRecords(TVector<TEvChangeExchange::TEvEnqueueRecords::TRecordInfo>&& records);
+ void EnqueueChangeRecords(TVector<NMiniKQL::IChangeCollector::TChange>&& records);
void CreateChangeSender(const TActorContext& ctx);
void KillChangeSender(const TActorContext& ctx);
void MaybeActivateChangeSender(const TActorContext& ctx);
const TActorId& GetChangeSender() const { return OutChangeSender; }
- bool LoadChangeRecords(NIceDb::TNiceDb& db, TVector<TEvChangeExchange::TEvEnqueueRecords::TRecordInfo>& changeRecords);
+ bool LoadChangeRecords(NIceDb::TNiceDb& db, TVector<NMiniKQL::IChangeCollector::TChange>& records);
static void PersistSchemeTxResult(NIceDb::TNiceDb &db, const TSchemaOperation& op);
@@ -1405,6 +1419,8 @@ public:
TSchemaSnapshotManager& GetSchemaSnapshotManager() { return SchemaSnapshotManager; }
const TSchemaSnapshotManager& GetSchemaSnapshotManager() const { return SchemaSnapshotManager; }
+ void AddSchemaSnapshot(const TPathId& pathId, ui64 tableSchemaVersion, ui64 step, ui64 txId,
+ TTransactionContext& txc, const TActorContext& ctx);
template <typename... Args>
bool PromoteCompleteEdge(Args&&... args) {
@@ -2046,6 +2062,26 @@ private:
}
};
+ struct TEnqueuedRecord {
+ ui64 BodySize;
+ TPathId TableId;
+ ui64 SchemaVersion;
+ bool SchemaSnapshotAcquired;
+
+ explicit TEnqueuedRecord(ui64 bodySize, const TPathId& tableId, ui64 schemaVersion)
+ : BodySize(bodySize)
+ , TableId(tableId)
+ , SchemaVersion(schemaVersion)
+ , SchemaSnapshotAcquired(false)
+ {
+ }
+
+ explicit TEnqueuedRecord(const NMiniKQL::IChangeCollector::TChange& record)
+ : TEnqueuedRecord(record.BodySize(), record.TableId(), record.SchemaVersion())
+ {
+ }
+ };
+
using TRequestedRecord = TEvChangeExchange::TEvRequestRecords::TRecordInfo;
// split/merge
@@ -2058,7 +2094,7 @@ private:
TSet<ui64> ChangeRecordsToRemove; // ui64 is order
bool RequestChangeRecordsInFly = false;
bool RemoveChangeRecordsInFly = false;
- THashMap<ui64, ui64> ChangesQueue; // order to size
+ THashMap<ui64, TEnqueuedRecord> ChangesQueue; // ui64 is order
ui64 ChangesQueueBytes = 0;
TActorId OutChangeSender;
diff --git a/ydb/core/tx/datashard/datashard_schema_snapshots.cpp b/ydb/core/tx/datashard/datashard_schema_snapshots.cpp
index d4c7ab05106..80e865701fe 100644
--- a/ydb/core/tx/datashard/datashard_schema_snapshots.cpp
+++ b/ydb/core/tx/datashard/datashard_schema_snapshots.cpp
@@ -83,6 +83,16 @@ const TSchemaSnapshot* TSchemaSnapshotManager::FindSnapshot(const TSchemaSnapsho
return Snapshots.FindPtr(key);
}
+void TSchemaSnapshotManager::RemoveShapshot(NIceDb::TNiceDb& db, const TSchemaSnapshotKey& key) {
+ auto it = Snapshots.find(key);
+ if (it == Snapshots.end()) {
+ return;
+ }
+
+ Snapshots.erase(it);
+ PersistRemoveSnapshot(db, key);
+}
+
bool TSchemaSnapshotManager::AcquireReference(const TSchemaSnapshotKey& key) {
auto it = Snapshots.find(key);
if (it == Snapshots.end()) {
@@ -93,7 +103,7 @@ bool TSchemaSnapshotManager::AcquireReference(const TSchemaSnapshotKey& key) {
return true;
}
-bool TSchemaSnapshotManager::ReleaseReference(const TSchemaSnapshotKey& key, NIceDb::TNiceDb& db) {
+bool TSchemaSnapshotManager::ReleaseReference(const TSchemaSnapshotKey& key) {
auto refIt = References.find(key);
if (refIt == References.end() || refIt->second <= 0) {
@@ -113,7 +123,6 @@ bool TSchemaSnapshotManager::ReleaseReference(const TSchemaSnapshotKey& key, NIc
return false;
}
- PersistRemoveSnapshot(db, key);
return true;
}
diff --git a/ydb/core/tx/datashard/datashard_schema_snapshots.h b/ydb/core/tx/datashard/datashard_schema_snapshots.h
index a8218a22591..d8e137fecfe 100644
--- a/ydb/core/tx/datashard/datashard_schema_snapshots.h
+++ b/ydb/core/tx/datashard/datashard_schema_snapshots.h
@@ -31,9 +31,10 @@ public:
bool AddSnapshot(NTable::TDatabase& db, const TSchemaSnapshotKey& key, const TSchemaSnapshot& snapshot);
const TSchemaSnapshot* FindSnapshot(const TSchemaSnapshotKey& key) const;
+ void RemoveShapshot(NIceDb::TNiceDb& db, const TSchemaSnapshotKey& key);
bool AcquireReference(const TSchemaSnapshotKey& key);
- bool ReleaseReference(const TSchemaSnapshotKey& key, NIceDb::TNiceDb& db);
+ bool ReleaseReference(const TSchemaSnapshotKey& key);
private:
void PersistAddSnapshot(NIceDb::TNiceDb& db, const TSchemaSnapshotKey& key, const TSchemaSnapshot& snapshot);
diff --git a/ydb/core/tx/datashard/datashard_txs.h b/ydb/core/tx/datashard/datashard_txs.h
index 4a9ab008789..c221c4dfeb0 100644
--- a/ydb/core/tx/datashard/datashard_txs.h
+++ b/ydb/core/tx/datashard/datashard_txs.h
@@ -47,7 +47,7 @@ private:
bool CreateScheme(TTransactionContext &txc);
bool ReadEverything(TTransactionContext &txc);
private:
- TVector<TEvChangeExchange::TEvEnqueueRecords::TRecordInfo> ChangeRecords;
+ TVector<NMiniKQL::IChangeCollector::TChange> ChangeRecords;
};
class TDataShard::TTxPlanStep : public NTabletFlatExecutor::TTransactionBase<TDataShard> {
diff --git a/ydb/core/tx/datashard/datashard_user_table.cpp b/ydb/core/tx/datashard/datashard_user_table.cpp
index 70bc9400733..b97cd34012b 100644
--- a/ydb/core/tx/datashard/datashard_user_table.cpp
+++ b/ydb/core/tx/datashard/datashard_user_table.cpp
@@ -124,6 +124,7 @@ void TUserTable::AddCdcStream(const NKikimrSchemeOp::TCdcStreamDescription& stre
}
CdcStreams.emplace(streamPathId, TCdcStream(streamDesc));
+ JsonCdcStreamCount += ui32(streamDesc.GetFormat() == TCdcStream::EFormat::ECdcStreamFormatJson);
NKikimrSchemeOp::TTableDescription schema;
GetSchema(schema);
@@ -163,6 +164,7 @@ void TUserTable::DropCdcStream(const TPathId& streamPathId) {
return;
}
+ JsonCdcStreamCount -= ui32(it->second.Format == TCdcStream::EFormat::ECdcStreamFormatJson);
CdcStreams.erase(it);
NKikimrSchemeOp::TTableDescription schema;
@@ -186,6 +188,10 @@ bool TUserTable::HasCdcStreams() const {
return !CdcStreams.empty();
}
+bool TUserTable::NeedSchemaSnapshots() const {
+ return JsonCdcStreamCount > 0;
+}
+
void TUserTable::ParseProto(const NKikimrSchemeOp::TTableDescription& descr)
{
// We expect schemeshard to send us full list of storage rooms
@@ -275,6 +281,7 @@ void TUserTable::ParseProto(const NKikimrSchemeOp::TTableDescription& descr)
for (const auto& streamDesc : descr.GetCdcStreams()) {
Y_VERIFY(streamDesc.HasPathId());
CdcStreams.emplace(TPathId(streamDesc.GetPathId().GetOwnerId(), streamDesc.GetPathId().GetLocalId()), TCdcStream(streamDesc));
+ JsonCdcStreamCount += ui32(streamDesc.GetFormat() == TCdcStream::EFormat::ECdcStreamFormatJson);
}
}
diff --git a/ydb/core/tx/datashard/datashard_user_table.h b/ydb/core/tx/datashard/datashard_user_table.h
index 0dab30cb84f..962ef2ead3e 100644
--- a/ydb/core/tx/datashard/datashard_user_table.h
+++ b/ydb/core/tx/datashard/datashard_user_table.h
@@ -284,10 +284,12 @@ struct TUserTable : public TThrRefBase {
struct TCdcStream {
using EMode = NKikimrSchemeOp::ECdcStreamMode;
+ using EFormat = NKikimrSchemeOp::ECdcStreamFormat;
using EState = NKikimrSchemeOp::ECdcStreamState;
TString Name;
EMode Mode;
+ EFormat Format;
EState State;
TCdcStream() = default;
@@ -295,6 +297,7 @@ struct TUserTable : public TThrRefBase {
TCdcStream(const NKikimrSchemeOp::TCdcStreamDescription& streamDesc)
: Name(streamDesc.GetName())
, Mode(streamDesc.GetMode())
+ , Format(streamDesc.GetFormat())
, State(streamDesc.GetState())
{
}
@@ -357,6 +360,7 @@ struct TUserTable : public TThrRefBase {
TMap<TPathId, TTableIndex> Indexes;
TMap<TPathId, TCdcStream> CdcStreams;
ui32 AsyncIndexCount = 0;
+ ui32 JsonCdcStreamCount = 0;
// Tablet thread access only, updated in-place
mutable TStats Stats;
@@ -407,6 +411,7 @@ struct TUserTable : public TThrRefBase {
void DisableCdcStream(const TPathId& streamPathId);
void DropCdcStream(const TPathId& streamPathId);
bool HasCdcStreams() const;
+ bool NeedSchemaSnapshots() const;
private:
void DoApplyCreate(NTabletFlatExecutor::TTransactionContext& txc, const TString& tableName, bool shadow,
diff --git a/ydb/core/tx/datashard/direct_tx_unit.cpp b/ydb/core/tx/datashard/direct_tx_unit.cpp
index 1284b1f6ad6..d477ca5514e 100644
--- a/ydb/core/tx/datashard/direct_tx_unit.cpp
+++ b/ydb/core/tx/datashard/direct_tx_unit.cpp
@@ -38,12 +38,7 @@ public:
return EExecutionStatus::Restart;
}
- if (auto changes = tx->GetCollectedChanges()) {
- op->ChangeRecords().reserve(changes.size());
- for (const auto& change : changes) {
- op->ChangeRecords().emplace_back(change.Order(), change.PathId(), change.BodySize());
- }
- }
+ op->ChangeRecords() = std::move(tx->GetCollectedChanges());
DataShard.SysLocksTable().ApplyLocks();
Pipeline.AddCommittingOp(op);
diff --git a/ydb/core/tx/datashard/drop_cdc_stream_unit.cpp b/ydb/core/tx/datashard/drop_cdc_stream_unit.cpp
index 039a379a52b..048dd40874f 100644
--- a/ydb/core/tx/datashard/drop_cdc_stream_unit.cpp
+++ b/ydb/core/tx/datashard/drop_cdc_stream_unit.cpp
@@ -37,9 +37,16 @@ public:
const auto streamPathId = TPathId(params.GetStreamPathId().GetOwnerId(), params.GetStreamPathId().GetLocalId());
Y_VERIFY(streamPathId.OwnerId == DataShard.GetPathOwnerId());
- auto tableInfo = DataShard.AlterTableDropCdcStream(ctx, txc, pathId, params.GetTableSchemaVersion(), streamPathId);
+ const auto version = params.GetTableSchemaVersion();
+ Y_VERIFY(version);
+
+ auto tableInfo = DataShard.AlterTableDropCdcStream(ctx, txc, pathId, version, streamPathId);
DataShard.AddUserTable(pathId, tableInfo);
+ if (tableInfo->NeedSchemaSnapshots()) {
+ DataShard.AddSchemaSnapshot(pathId, version, op->GetStep(), op->GetTxId(), txc, ctx);
+ }
+
RemoveSender.Reset(new TEvChangeExchange::TEvRemoveSender(streamPathId));
BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::COMPLETE);
diff --git a/ydb/core/tx/datashard/drop_index_notice_unit.cpp b/ydb/core/tx/datashard/drop_index_notice_unit.cpp
index 43110699580..d1c877dccf4 100644
--- a/ydb/core/tx/datashard/drop_index_notice_unit.cpp
+++ b/ydb/core/tx/datashard/drop_index_notice_unit.cpp
@@ -33,6 +33,9 @@ public:
auto pathId = TPathId(params.GetPathId().GetOwnerId(), params.GetPathId().GetLocalId());
Y_VERIFY(pathId.OwnerId == DataShard.GetPathOwnerId());
+ const auto version = params.GetTableSchemaVersion();
+ Y_VERIFY(version);
+
TUserTable::TPtr tableInfo;
if (params.HasIndexPathId()) {
auto indexPathId = TPathId(params.GetIndexPathId().GetOwnerId(), params.GetIndexPathId().GetLocalId());
@@ -46,14 +49,18 @@ public:
RemoveSender.Reset(new TEvChangeExchange::TEvRemoveSender(indexPathId));
}
- tableInfo = DataShard.AlterTableDropIndex(ctx, txc, pathId, params.GetTableSchemaVersion(), indexPathId);
+ tableInfo = DataShard.AlterTableDropIndex(ctx, txc, pathId, version, indexPathId);
} else {
- tableInfo = DataShard.AlterTableSchemaVersion(ctx, txc, pathId, params.GetTableSchemaVersion());
+ tableInfo = DataShard.AlterTableSchemaVersion(ctx, txc, pathId, version);
}
Y_VERIFY(tableInfo);
DataShard.AddUserTable(pathId, tableInfo);
+ if (tableInfo->NeedSchemaSnapshots()) {
+ DataShard.AddSchemaSnapshot(pathId, version, op->GetStep(), op->GetTxId(), txc, ctx);
+ }
+
BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::COMPLETE);
op->Result()->SetStepOrderId(op->GetStepOrder().ToPair());
diff --git a/ydb/core/tx/datashard/execute_data_tx_unit.cpp b/ydb/core/tx/datashard/execute_data_tx_unit.cpp
index b32f635cc0f..a34599d2eac 100644
--- a/ydb/core/tx/datashard/execute_data_tx_unit.cpp
+++ b/ydb/core/tx/datashard/execute_data_tx_unit.cpp
@@ -240,12 +240,7 @@ void TExecuteDataTxUnit::ExecuteDataTx(TOperation::TPtr op,
if (op->IsImmediate() && !op->IsReadOnly())
DataShard.PromoteCompleteEdge(writeVersion.Step, txc);
- if (auto changes = tx->GetDataTx()->GetCollectedChanges()) {
- op->ChangeRecords().reserve(changes.size());
- for (const auto& change : changes) {
- op->ChangeRecords().emplace_back(change.Order(), change.PathId(), change.BodySize());
- }
- }
+ op->ChangeRecords() = std::move(tx->GetDataTx()->GetCollectedChanges());
}
LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD,
diff --git a/ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp b/ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp
index 3c9a86e06b4..9b62c04a72c 100644
--- a/ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp
+++ b/ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp
@@ -54,12 +54,7 @@ public:
return EExecutionStatus::Restart;
}
- if (auto changes = changeCollector->GetCollected()) {
- op->ChangeRecords().reserve(changes.size());
- for (const auto& change : changes) {
- op->ChangeRecords().emplace_back(change.Order(), change.PathId(), change.BodySize());
- }
- }
+ op->ChangeRecords() = std::move(changeCollector->GetCollected());
} else if (eraseTx->HasDependencies()) {
THashMap<ui64, TDynBitMap> presentRows;
for (const auto& dependency : eraseTx->GetDependencies()) {
diff --git a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp
index 946e035f821..16e109e6b9e 100644
--- a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp
+++ b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp
@@ -156,12 +156,7 @@ EExecutionStatus TExecuteKqpDataTxUnit::Execute(TOperation::TPtr op, TTransactio
DataShard.PromoteCompleteEdge(writeVersion.Step, txc);
}
- if (auto changes = dataTx->GetCollectedChanges()) {
- op->ChangeRecords().reserve(changes.size());
- for (const auto& change : changes) {
- op->ChangeRecords().emplace_back(change.Order(), change.PathId(), change.BodySize());
- }
- }
+ op->ChangeRecords() = std::move(dataTx->GetCollectedChanges());
KqpUpdateDataShardStatCounters(DataShard, dataTx->GetCounters());
auto statsMode = kqpTx.GetRuntimeSettings().GetStatsMode();
diff --git a/ydb/core/tx/datashard/finalize_build_index_unit.cpp b/ydb/core/tx/datashard/finalize_build_index_unit.cpp
index 6f8a17ec503..778676ec984 100644
--- a/ydb/core/tx/datashard/finalize_build_index_unit.cpp
+++ b/ydb/core/tx/datashard/finalize_build_index_unit.cpp
@@ -31,9 +31,16 @@ public:
auto pathId = TPathId(params.GetPathId().GetOwnerId(), params.GetPathId().GetLocalId());
Y_VERIFY(pathId.OwnerId == DataShard.GetPathOwnerId());
- auto tableInfo = DataShard.AlterTableSchemaVersion(ctx, txc, pathId, params.GetTableSchemaVersion());
+ const auto version = params.GetTableSchemaVersion();
+ Y_VERIFY(version);
+
+ auto tableInfo = DataShard.AlterTableSchemaVersion(ctx, txc, pathId, version);
DataShard.AddUserTable(pathId, tableInfo);
+ if (tableInfo->NeedSchemaSnapshots()) {
+ DataShard.AddSchemaSnapshot(pathId, version, op->GetStep(), op->GetTxId(), txc, ctx);
+ }
+
ui64 step = params.GetSnapshotStep();
ui64 txId = params.GetSnapshotTxId();
Y_VERIFY(step != 0);
diff --git a/ydb/core/tx/datashard/initiate_build_index_unit.cpp b/ydb/core/tx/datashard/initiate_build_index_unit.cpp
index f52034aeea6..07e53261e2f 100644
--- a/ydb/core/tx/datashard/initiate_build_index_unit.cpp
+++ b/ydb/core/tx/datashard/initiate_build_index_unit.cpp
@@ -33,6 +33,9 @@ public:
auto pathId = TPathId(params.GetPathId().GetOwnerId(), params.GetPathId().GetLocalId());
Y_VERIFY(pathId.OwnerId == DataShard.GetPathOwnerId());
+ const auto version = params.GetTableSchemaVersion();
+ Y_VERIFY(version);
+
TUserTable::TPtr tableInfo;
if (params.HasIndexDescription()) {
const auto& indexDesc = params.GetIndexDescription();
@@ -44,14 +47,18 @@ public:
));
}
- tableInfo = DataShard.AlterTableAddIndex(ctx, txc, pathId, params.GetTableSchemaVersion(), indexDesc);
+ tableInfo = DataShard.AlterTableAddIndex(ctx, txc, pathId, version, indexDesc);
} else {
- tableInfo = DataShard.AlterTableSchemaVersion(ctx, txc, pathId, params.GetTableSchemaVersion());
+ tableInfo = DataShard.AlterTableSchemaVersion(ctx, txc, pathId, version);
}
Y_VERIFY(tableInfo);
DataShard.AddUserTable(pathId, tableInfo);
+ if (tableInfo->NeedSchemaSnapshots()) {
+ DataShard.AddSchemaSnapshot(pathId, version, op->GetStep(), op->GetTxId(), txc, ctx);
+ }
+
ui64 step = tx->GetStep();
ui64 txId = tx->GetTxId();
Y_VERIFY(step != 0);
diff --git a/ydb/core/tx/datashard/move_table_unit.cpp b/ydb/core/tx/datashard/move_table_unit.cpp
index 975cb115482..ad4becbe1a5 100644
--- a/ydb/core/tx/datashard/move_table_unit.cpp
+++ b/ydb/core/tx/datashard/move_table_unit.cpp
@@ -15,13 +15,13 @@ public:
return true;
}
- void MoveChangeRecords(NIceDb::TNiceDb& db, const NKikimrTxDataShard::TMoveTable& move, TVector<TEvChangeExchange::TEvEnqueueRecords::TRecordInfo>& changeRecords) {
+ void MoveChangeRecords(NIceDb::TNiceDb& db, const NKikimrTxDataShard::TMoveTable& move, TVector<NMiniKQL::IChangeCollector::TChange>& changeRecords) {
const THashMap<TPathId, TPathId> remap = DataShard.GetRemapIndexes(move);
for (auto& record: changeRecords) {
- if (remap.contains(record.PathId)) { // here could be the records for already deleted indexes, so skip them
- record.PathId = remap.at(record.PathId);
- DataShard.MoveChangeRecord(db, record.Order, record.PathId);
+ if (remap.contains(record.PathId())) { // here could be the records for already deleted indexes, so skip them
+ record.SetPathId(remap.at(record.PathId()));
+ DataShard.MoveChangeRecord(db, record.Order(), record.PathId());
}
}
}
@@ -42,7 +42,7 @@ public:
}
NIceDb::TNiceDb db(txc.DB);
- TVector<TEvChangeExchange::TEvEnqueueRecords::TRecordInfo> changeRecords;
+ TVector<NMiniKQL::IChangeCollector::TChange> changeRecords;
if (!DataShard.LoadChangeRecords(db, changeRecords)) {
return EExecutionStatus::Restart;
}
@@ -56,7 +56,7 @@ public:
DataShard.KillChangeSender(ctx);
- DataShard.MoveUserTable(ctx, txc, params);
+ DataShard.MoveUserTable(op, params, ctx, txc);
DataShard.CreateChangeSender(ctx);
MoveChangeRecords(db, params, changeRecords);
diff --git a/ydb/core/tx/datashard/operation.h b/ydb/core/tx/datashard/operation.h
index 0f4b713e91c..3d330806c72 100644
--- a/ydb/core/tx/datashard/operation.h
+++ b/ydb/core/tx/datashard/operation.h
@@ -5,10 +5,10 @@
#include "datashard_locks.h"
#include "datashard_outreadset.h"
#include "datashard_snapshots.h"
-#include "change_exchange.h"
#include "execution_unit_kind.h"
#include <ydb/core/engine/mkql_engine_flat.h>
+#include <ydb/core/engine/minikql/change_collector_iface.h>
#include <ydb/core/protos/tx_datashard.pb.h>
#include <ydb/core/tablet_flat/tablet_flat_executor.h>
#include <ydb/core/tx/balance_coverage/balance_coverage_builder.h>
@@ -435,7 +435,7 @@ struct TOutputOpData {
using TResultPtr = THolder<TEvDataShard::TEvProposeTransactionResult>;
using TDelayedAcks = TVector<THolder<IEventHandle>>;
using TOutReadSets = TMap<std::pair<ui64, ui64>, TString>; // source:target -> body
- using TChangeRecord = TEvChangeExchange::TEvEnqueueRecords::TRecordInfo;
+ using TChangeRecord = NMiniKQL::IChangeCollector::TChange;
TResultPtr Result;
// ACKs to send on successful operation completion.
diff --git a/ydb/core/tx/datashard/snapshot_key.h b/ydb/core/tx/datashard/snapshot_key.h
index 7a742a6e8aa..63c1ea44d77 100644
--- a/ydb/core/tx/datashard/snapshot_key.h
+++ b/ydb/core/tx/datashard/snapshot_key.h
@@ -1,5 +1,7 @@
#pragma once
+#include <ydb/core/base/pathid.h>
+
#include <util/generic/fwd.h>
#include <tuple>
@@ -90,6 +92,10 @@ struct TSchemaSnapshotKey
, Version(version)
{ }
+ TSchemaSnapshotKey(const TPathId& pathId, ui64 version)
+ : TSchemaSnapshotKey(pathId.OwnerId, pathId.LocalPathId, version)
+ { }
+
auto ToTuple() const {
return std::tuple_cat(TSnapshotTableKey::ToTuple(), std::make_tuple(Version));
}
diff --git a/ydb/core/tx/datashard/ya.make b/ydb/core/tx/datashard/ya.make
index 9c98e8e79b0..08264b072ae 100644
--- a/ydb/core/tx/datashard/ya.make
+++ b/ydb/core/tx/datashard/ya.make
@@ -207,6 +207,9 @@ PEERDIR(
library/cpp/actors/core
library/cpp/containers/flat_hash
library/cpp/html/pcdata
+ library/cpp/json
+ library/cpp/json/yson
+ library/cpp/string_utils/base64
library/cpp/string_utils/quote
ydb/core/actorlib_impl
ydb/core/base
diff --git a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_datashard_/flat_datashard.schema b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_datashard_/flat_datashard.schema
index 90bf68adbef..05d887b602f 100644
--- a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_datashard_/flat_datashard.schema
+++ b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_datashard_/flat_datashard.schema
@@ -1010,6 +1010,16 @@
"ColumnId": 8,
"ColumnName": "SchemaVersion",
"ColumnType": "Uint64"
+ },
+ {
+ "ColumnId": 9,
+ "ColumnName": "TableOwnerId",
+ "ColumnType": "Uint64"
+ },
+ {
+ "ColumnId": 10,
+ "ColumnName": "TablePathId",
+ "ColumnType": "Uint64"
}
],
"ColumnsDropped": [],
@@ -1023,7 +1033,9 @@
5,
6,
7,
- 8
+ 8,
+ 9,
+ 10
],
"RoomID": 0,
"Codec": 0,