diff options
author | Ilnaz Nizametdinov <i.nizametdinov@gmail.com> | 2022-04-06 20:20:06 +0300 |
---|---|---|
committer | Ilnaz Nizametdinov <i.nizametdinov@gmail.com> | 2022-04-06 20:20:06 +0300 |
commit | ce210dfe8be9cc3d2fd48804db67956ae0ff92de (patch) | |
tree | 06b2e467202a49b1a176268bed3c77ba7163afbb | |
parent | 4bd02dd05787932a6a10f1df1acacb7250662ddb (diff) | |
download | ydb-ce210dfe8be9cc3d2fd48804db67956ae0ff92de.tar.gz |
Store key next to the value, md5 partition key KIKIMR-13698
ref:d0c23c4bc795b5e59cfcf4d64cd6363397c383a6
-rw-r--r-- | ydb/core/tx/datashard/change_record.cpp | 21 | ||||
-rw-r--r-- | ydb/core/tx/datashard/change_record.h | 2 | ||||
-rw-r--r-- | ydb/core/tx/datashard/change_sender_cdc_stream.cpp | 26 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_change_exchange.cpp | 77 |
4 files changed, 77 insertions, 49 deletions
diff --git a/ydb/core/tx/datashard/change_record.cpp b/ydb/core/tx/datashard/change_record.cpp index 6f0f93e9f52..2a7db10a626 100644 --- a/ydb/core/tx/datashard/change_record.cpp +++ b/ydb/core/tx/datashard/change_record.cpp @@ -1,6 +1,7 @@ #include "change_record.h" #include "export_common.h" +#include <library/cpp/digest/md5/md5.h> #include <library/cpp/json/json_reader.h> #include <library/cpp/json/json_writer.h> #include <library/cpp/json/yson/json2yson.h> @@ -142,7 +143,7 @@ static void SerializeJsonValue(TUserTable::TCPtr schema, NJson::TJsonValue& valu } } -void TChangeRecord::SerializeTo(NJson::TJsonValue& key, NJson::TJsonValue& value) const { +void TChangeRecord::SerializeTo(NJson::TJsonValue& json) const { switch (Kind) { case EKind::CdcDataChange: { Y_VERIFY(Schema); @@ -150,32 +151,32 @@ void TChangeRecord::SerializeTo(NJson::TJsonValue& key, NJson::TJsonValue& value NKikimrChangeExchange::TChangeRecord::TDataChange body; Y_VERIFY(body.ParseFromArray(Body.data(), Body.size())); - SerializeJsonKey(Schema, key, body.GetKey()); + SerializeJsonKey(Schema, json["key"], body.GetKey()); if (body.HasOldImage()) { - SerializeJsonValue(Schema, value["oldImage"], body.GetOldImage()); + SerializeJsonValue(Schema, json["oldImage"], body.GetOldImage()); } if (body.HasNewImage()) { - SerializeJsonValue(Schema, value["newImage"], body.GetNewImage()); + SerializeJsonValue(Schema, json["newImage"], body.GetNewImage()); } const auto hasAnyImage = body.HasOldImage() || body.HasNewImage(); switch (body.GetRowOperationCase()) { case NKikimrChangeExchange::TChangeRecord::TDataChange::kUpsert: - value["update"].SetType(NJson::EJsonValueType::JSON_MAP); + json["update"].SetType(NJson::EJsonValueType::JSON_MAP); if (!hasAnyImage) { - SerializeJsonValue(Schema, value["update"], body.GetUpsert()); + SerializeJsonValue(Schema, json["update"], body.GetUpsert()); } break; case NKikimrChangeExchange::TChangeRecord::TDataChange::kReset: - value["reset"].SetType(NJson::EJsonValueType::JSON_MAP); + json["reset"].SetType(NJson::EJsonValueType::JSON_MAP); if (!hasAnyImage) { - SerializeJsonValue(Schema, value["reset"], body.GetReset()); + SerializeJsonValue(Schema, json["reset"], body.GetReset()); } break; case NKikimrChangeExchange::TChangeRecord::TDataChange::kErase: - value["erase"].SetType(NJson::EJsonValueType::JSON_MAP); + json["erase"].SetType(NJson::EJsonValueType::JSON_MAP); break; default: Y_FAIL_S("Unexpected row operation: " << static_cast<int>(body.GetRowOperationCase())); @@ -233,7 +234,7 @@ TString TChangeRecord::GetPartitionKey() const { NJson::TJsonValue key; SerializeJsonKey(Schema, key, body.GetKey()); - PartitionKey.ConstructInPlace(WriteJson(key, false)); + PartitionKey.ConstructInPlace(MD5::Calc(WriteJson(key, false))); break; } diff --git a/ydb/core/tx/datashard/change_record.h b/ydb/core/tx/datashard/change_record.h index 1ee4259f1b9..db7d920ddbf 100644 --- a/ydb/core/tx/datashard/change_record.h +++ b/ydb/core/tx/datashard/change_record.h @@ -41,7 +41,7 @@ public: ui64 GetSchemaVersion() const { return SchemaVersion; } void SerializeTo(NKikimrChangeExchange::TChangeRecord& record) const; - void SerializeTo(NJson::TJsonValue& key, NJson::TJsonValue& value) const; + void SerializeTo(NJson::TJsonValue& json) const; TConstArrayRef<TCell> GetKey() const; i64 GetSeqNo() const; diff --git a/ydb/core/tx/datashard/change_sender_cdc_stream.cpp b/ydb/core/tx/datashard/change_sender_cdc_stream.cpp index a526fd7dbe2..301722c108b 100644 --- a/ydb/core/tx/datashard/change_sender_cdc_stream.cpp +++ b/ydb/core/tx/datashard/change_sender_cdc_stream.cpp @@ -5,7 +5,6 @@ #include <library/cpp/actors/core/actor_bootstrapped.h> #include <library/cpp/actors/core/hfunc.h> #include <library/cpp/actors/core/log.h> -#include <library/cpp/digest/md5/md5.h> #include <library/cpp/json/json_writer.h> #include <ydb/core/persqueue/partition_key_range/partition_key_range.h> @@ -20,8 +19,6 @@ namespace NDataShard { using namespace NPQ; class TCdcChangeSenderPartition: public TActorBootstrapped<TCdcChangeSenderPartition> { - static constexpr auto CodecRaw = 0; - TStringBuf GetLogPrefix() const { if (!LogPrefix) { LogPrefix = TStringBuilder() @@ -92,33 +89,27 @@ class TCdcChangeSenderPartition: public TActorBootstrapped<TCdcChangeSenderParti ? TInstant::FromValue(record.GetGroup()) : TInstant::MilliSeconds(record.GetStep()); - NKikimrPQClient::TDataChunk data; - data.SetSeqNo(record.GetSeqNo()); - data.SetCreateTime(createdAt.MilliSeconds()); - data.SetCodec(CodecRaw); - // TODO: meta? - auto& cmd = *request.MutablePartitionRequest()->AddCmdWrite(); cmd.SetSeqNo(record.GetSeqNo()); cmd.SetSourceId(NSourceIdEncoding::EncodeSimple(SourceId)); cmd.SetCreateTimeMS(createdAt.MilliSeconds()); + NKikimrPQClient::TDataChunk data; + data.SetCodec(0 /* CODEC_RAW */); + switch (Format) { case NKikimrSchemeOp::ECdcStreamFormatProto: { NKikimrChangeExchange::TChangeRecord protoRecord; record.SerializeTo(protoRecord); data.SetData(protoRecord.SerializeAsString()); - cmd.SetData(data.SerializeAsString()); break; } case NKikimrSchemeOp::ECdcStreamFormatJson: { - NJson::TJsonValue key; - NJson::TJsonValue value; - record.SerializeTo(key, value); - data.SetData(WriteJson(value, false)); - cmd.SetData(data.SerializeAsString()); - cmd.SetPartitionKey(WriteJson(key, false)); + NJson::TJsonValue json; + record.SerializeTo(json); + data.SetData(WriteJson(json, false)); + cmd.SetPartitionKey(record.GetPartitionKey()); break; } @@ -129,6 +120,7 @@ class TCdcChangeSenderPartition: public TActorBootstrapped<TCdcChangeSenderParti } } + cmd.SetData(data.SerializeAsString()); Pending.push_back(record.GetSeqNo()); } @@ -646,7 +638,7 @@ class TCdcChangeSenderMain: public TActorBootstrapped<TCdcChangeSenderMain> case NKikimrSchemeOp::ECdcStreamFormatJson: { using namespace NKikimr::NDataStreams::V1; - const auto hashKey = HexBytesToDecimal(MD5::Calc(record.GetPartitionKey())); + const auto hashKey = HexBytesToDecimal(record.GetPartitionKey() /* MD5 */); return ShardFromDecimal(hashKey, KeyDesc->Partitions.size()); } diff --git a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp index 0400b7d4159..74aa9ac3322 100644 --- a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp +++ b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp @@ -1,9 +1,14 @@ #include "datashard_ut_common.h" +#include <library/cpp/digest/md5/md5.h> +#include <library/cpp/json/json_reader.h> + #include <ydb/core/base/path.h> #include <ydb/public/sdk/cpp/client/ydb_datastreams/datastreams.h> #include <ydb/public/sdk/cpp/client/ydb_persqueue_public/persqueue.h> +#include <util/generic/size_literals.h> +#include <util/string/printf.h> #include <util/string/strip.h> namespace NKikimr { @@ -699,9 +704,20 @@ Y_UNIT_TEST_SUITE(Cdc) { }; } + TString CalcPartitionKey(const TString& data) { + NJson::TJsonValue json; + UNIT_ASSERT(NJson::ReadJsonTree(data, &json)); + + NJson::TJsonValue::TMapType root; + UNIT_ASSERT(json.GetMap(&root)); + + UNIT_ASSERT(root.contains("key")); + return MD5::Calc(root.at("key").GetStringRobust()); + } + struct PqRunner { static void Read(const TShardedTableOptions& tableDesc, const TCdcStream& streamDesc, - const TVector<TString>& queries, const TVector<std::pair<TString, TString>>& records) + const TVector<TString>& queries, const TVector<TString>& records) { TTestPqEnv env(tableDesc, streamDesc); @@ -733,8 +749,8 @@ Y_UNIT_TEST_SUITE(Cdc) { if (auto* data = std::get_if<TReadSessionEvent::TDataReceivedEvent>(&*ev)) { for (const auto& item : data->GetMessages()) { const auto& record = records.at(reads++); - UNIT_ASSERT_VALUES_EQUAL(record.first, item.GetPartitionKey()); - UNIT_ASSERT_VALUES_EQUAL(record.second, item.GetData()); + UNIT_ASSERT_VALUES_EQUAL(record, item.GetData()); + UNIT_ASSERT_VALUES_EQUAL(CalcPartitionKey(record), item.GetPartitionKey()); } } else if (auto* create = std::get_if<TReadSessionEvent::TCreatePartitionStreamEvent>(&*ev)) { create->Confirm(); @@ -778,7 +794,7 @@ Y_UNIT_TEST_SUITE(Cdc) { struct YdsRunner { static void Read(const TShardedTableOptions& tableDesc, const TCdcStream& streamDesc, - const TVector<TString>& queries, const TVector<std::pair<TString, TString>>& records) + const TVector<TString>& queries, const TVector<TString>& records) { TTestYdsEnv env(tableDesc, streamDesc); @@ -829,8 +845,8 @@ Y_UNIT_TEST_SUITE(Cdc) { for (ui32 i = 0; i < records.size(); ++i) { const auto& actual = res.GetResult().records().at(i); const auto& expected = records.at(i); - UNIT_ASSERT_VALUES_EQUAL(actual.partition_key(), expected.first); - UNIT_ASSERT_VALUES_EQUAL(actual.data(), expected.second); + UNIT_ASSERT_VALUES_EQUAL(actual.data(), expected); + UNIT_ASSERT_VALUES_EQUAL(actual.partition_key(), CalcPartitionKey(expected)); } } @@ -877,10 +893,10 @@ Y_UNIT_TEST_SUITE(Cdc) { )", R"( DELETE FROM `/Root/Table` WHERE key = 1; )"}, { - {"[1]", R"({"update":{}})"}, - {"[2]", R"({"update":{}})"}, - {"[3]", R"({"update":{}})"}, - {"[1]", R"({"erase":{}})"}, + R"({"update":{},"key":[1]})", + R"({"update":{},"key":[2]})", + R"({"update":{},"key":[3]})", + R"({"erase":{},"key":[1]})", }); } @@ -893,10 +909,10 @@ Y_UNIT_TEST_SUITE(Cdc) { )", R"( DELETE FROM `/Root/Table` WHERE key = 1; )"}, { - {"[1]", R"({"update":{"value":10}})"}, - {"[2]", R"({"update":{"value":20}})"}, - {"[3]", R"({"update":{"value":30}})"}, - {"[1]", R"({"erase":{}})"}, + R"({"update":{"value":10},"key":[1]})", + R"({"update":{"value":20},"key":[2]})", + R"({"update":{"value":30},"key":[3]})", + R"({"erase":{},"key":[1]})", }); } @@ -914,13 +930,32 @@ Y_UNIT_TEST_SUITE(Cdc) { )", R"( DELETE FROM `/Root/Table` WHERE key = 1; )"}, { - {"[1]", R"({"update":{},"newImage":{"value":10}})"}, - {"[2]", R"({"update":{},"newImage":{"value":20}})"}, - {"[3]", R"({"update":{},"newImage":{"value":30}})"}, - {"[1]", R"({"update":{},"newImage":{"value":100},"oldImage":{"value":10}})"}, - {"[2]", R"({"update":{},"newImage":{"value":200},"oldImage":{"value":20}})"}, - {"[3]", R"({"update":{},"newImage":{"value":300},"oldImage":{"value":30}})"}, - {"[1]", R"({"erase":{},"oldImage":{"value":100}})"}, + R"({"update":{},"newImage":{"value":10},"key":[1]})", + R"({"update":{},"newImage":{"value":20},"key":[2]})", + R"({"update":{},"newImage":{"value":30},"key":[3]})", + R"({"update":{},"newImage":{"value":100},"key":[1],"oldImage":{"value":10}})", + R"({"update":{},"newImage":{"value":200},"key":[2],"oldImage":{"value":20}})", + R"({"update":{},"newImage":{"value":300},"key":[3],"oldImage":{"value":30}})", + R"({"erase":{},"key":[1],"oldImage":{"value":100}})", + }); + } + + TShardedTableOptions Utf8Table() { + return TShardedTableOptions() + .Columns({ + {"key", "Utf8", true, false}, + {"value", "Uint32", false, false}, + }); + } + + Y_UNIT_TEST_TWIN(HugeKey, PqRunner, YdsRunner) { + const auto key = TString(512_KB, 'A'); + + TRunner::Read(Utf8Table(), KeysOnly(NKikimrSchemeOp::ECdcStreamFormatJson), {Sprintf(R"( + UPSERT INTO `/Root/Table` (key, value) VALUES + ("%s", 1); + )", key.c_str())}, { + Sprintf(R"({"update":{},"key":["%s"]})", key.c_str()), }); } |