aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
author Ilnaz Nizametdinov <i.nizametdinov@gmail.com> 2022-04-06 20:20:06 +0300
committer Ilnaz Nizametdinov <i.nizametdinov@gmail.com> 2022-04-06 20:20:06 +0300
commit ce210dfe8be9cc3d2fd48804db67956ae0ff92de (patch)
tree 06b2e467202a49b1a176268bed3c77ba7163afbb
parent 4bd02dd05787932a6a10f1df1acacb7250662ddb (diff)
download ydb-ce210dfe8be9cc3d2fd48804db67956ae0ff92de.tar.gz
Store key next to the value, md5 partition key KIKIMR-13698
ref:d0c23c4bc795b5e59cfcf4d64cd6363397c383a6
-rw-r--r-- ydb/core/tx/datashard/change_record.cpp 21
-rw-r--r-- ydb/core/tx/datashard/change_record.h 2
-rw-r--r-- ydb/core/tx/datashard/change_sender_cdc_stream.cpp 26
-rw-r--r-- ydb/core/tx/datashard/datashard_ut_change_exchange.cpp 77
4 files changed, 77 insertions, 49 deletions
diff --git a/ydb/core/tx/datashard/change_record.cpp b/ydb/core/tx/datashard/change_record.cpp
index 6f0f93e9f52..2a7db10a626 100644
--- a/ydb/core/tx/datashard/change_record.cpp
+++ b/ydb/core/tx/datashard/change_record.cpp
@@ -1,6 +1,7 @@
#include "change_record.h"
#include "export_common.h"
+#include <library/cpp/digest/md5/md5.h>
#include <library/cpp/json/json_reader.h>
#include <library/cpp/json/json_writer.h>
#include <library/cpp/json/yson/json2yson.h>
@@ -142,7 +143,7 @@ static void SerializeJsonValue(TUserTable::TCPtr schema, NJson::TJsonValue& valu
}
}
-void TChangeRecord::SerializeTo(NJson::TJsonValue& key, NJson::TJsonValue& value) const {
+void TChangeRecord::SerializeTo(NJson::TJsonValue& json) const {
switch (Kind) {
case EKind::CdcDataChange: {
Y_VERIFY(Schema);
@@ -150,32 +151,32 @@ void TChangeRecord::SerializeTo(NJson::TJsonValue& key, NJson::TJsonValue& value
NKikimrChangeExchange::TChangeRecord::TDataChange body;
Y_VERIFY(body.ParseFromArray(Body.data(), Body.size()));
- SerializeJsonKey(Schema, key, body.GetKey());
+ SerializeJsonKey(Schema, json["key"], body.GetKey());
if (body.HasOldImage()) {
- SerializeJsonValue(Schema, value["oldImage"], body.GetOldImage());
+ SerializeJsonValue(Schema, json["oldImage"], body.GetOldImage());
}
if (body.HasNewImage()) {
- SerializeJsonValue(Schema, value["newImage"], body.GetNewImage());
+ SerializeJsonValue(Schema, json["newImage"], body.GetNewImage());
}
const auto hasAnyImage = body.HasOldImage() || body.HasNewImage();
switch (body.GetRowOperationCase()) {
case NKikimrChangeExchange::TChangeRecord::TDataChange::kUpsert:
- value["update"].SetType(NJson::EJsonValueType::JSON_MAP);
+ json["update"].SetType(NJson::EJsonValueType::JSON_MAP);
if (!hasAnyImage) {
- SerializeJsonValue(Schema, value["update"], body.GetUpsert());
+ SerializeJsonValue(Schema, json["update"], body.GetUpsert());
}
break;
case NKikimrChangeExchange::TChangeRecord::TDataChange::kReset:
- value["reset"].SetType(NJson::EJsonValueType::JSON_MAP);
+ json["reset"].SetType(NJson::EJsonValueType::JSON_MAP);
if (!hasAnyImage) {
- SerializeJsonValue(Schema, value["reset"], body.GetReset());
+ SerializeJsonValue(Schema, json["reset"], body.GetReset());
}
break;
case NKikimrChangeExchange::TChangeRecord::TDataChange::kErase:
- value["erase"].SetType(NJson::EJsonValueType::JSON_MAP);
+ json["erase"].SetType(NJson::EJsonValueType::JSON_MAP);
break;
default:
Y_FAIL_S("Unexpected row operation: " << static_cast<int>(body.GetRowOperationCase()));
@@ -233,7 +234,7 @@ TString TChangeRecord::GetPartitionKey() const {
NJson::TJsonValue key;
SerializeJsonKey(Schema, key, body.GetKey());
- PartitionKey.ConstructInPlace(WriteJson(key, false));
+ PartitionKey.ConstructInPlace(MD5::Calc(WriteJson(key, false)));
break;
}
diff --git a/ydb/core/tx/datashard/change_record.h b/ydb/core/tx/datashard/change_record.h
index 1ee4259f1b9..db7d920ddbf 100644
--- a/ydb/core/tx/datashard/change_record.h
+++ b/ydb/core/tx/datashard/change_record.h
@@ -41,7 +41,7 @@ public:
ui64 GetSchemaVersion() const { return SchemaVersion; }
void SerializeTo(NKikimrChangeExchange::TChangeRecord& record) const;
- void SerializeTo(NJson::TJsonValue& key, NJson::TJsonValue& value) const;
+ void SerializeTo(NJson::TJsonValue& json) const;
TConstArrayRef<TCell> GetKey() const;
i64 GetSeqNo() const;
diff --git a/ydb/core/tx/datashard/change_sender_cdc_stream.cpp b/ydb/core/tx/datashard/change_sender_cdc_stream.cpp
index a526fd7dbe2..301722c108b 100644
--- a/ydb/core/tx/datashard/change_sender_cdc_stream.cpp
+++ b/ydb/core/tx/datashard/change_sender_cdc_stream.cpp
@@ -5,7 +5,6 @@
#include <library/cpp/actors/core/actor_bootstrapped.h>
#include <library/cpp/actors/core/hfunc.h>
#include <library/cpp/actors/core/log.h>
-#include <library/cpp/digest/md5/md5.h>
#include <library/cpp/json/json_writer.h>
#include <ydb/core/persqueue/partition_key_range/partition_key_range.h>
@@ -20,8 +19,6 @@ namespace NDataShard {
using namespace NPQ;
class TCdcChangeSenderPartition: public TActorBootstrapped<TCdcChangeSenderPartition> {
- static constexpr auto CodecRaw = 0;
-
TStringBuf GetLogPrefix() const {
if (!LogPrefix) {
LogPrefix = TStringBuilder()
@@ -92,33 +89,27 @@ class TCdcChangeSenderPartition: public TActorBootstrapped<TCdcChangeSenderParti
? TInstant::FromValue(record.GetGroup())
: TInstant::MilliSeconds(record.GetStep());
- NKikimrPQClient::TDataChunk data;
- data.SetSeqNo(record.GetSeqNo());
- data.SetCreateTime(createdAt.MilliSeconds());
- data.SetCodec(CodecRaw);
- // TODO: meta?
-
auto& cmd = *request.MutablePartitionRequest()->AddCmdWrite();
cmd.SetSeqNo(record.GetSeqNo());
cmd.SetSourceId(NSourceIdEncoding::EncodeSimple(SourceId));
cmd.SetCreateTimeMS(createdAt.MilliSeconds());
+ NKikimrPQClient::TDataChunk data;
+ data.SetCodec(0 /* CODEC_RAW */);
+
switch (Format) {
case NKikimrSchemeOp::ECdcStreamFormatProto: {
NKikimrChangeExchange::TChangeRecord protoRecord;
record.SerializeTo(protoRecord);
data.SetData(protoRecord.SerializeAsString());
- cmd.SetData(data.SerializeAsString());
break;
}
case NKikimrSchemeOp::ECdcStreamFormatJson: {
- NJson::TJsonValue key;
- NJson::TJsonValue value;
- record.SerializeTo(key, value);
- data.SetData(WriteJson(value, false));
- cmd.SetData(data.SerializeAsString());
- cmd.SetPartitionKey(WriteJson(key, false));
+ NJson::TJsonValue json;
+ record.SerializeTo(json);
+ data.SetData(WriteJson(json, false));
+ cmd.SetPartitionKey(record.GetPartitionKey());
break;
}
@@ -129,6 +120,7 @@ class TCdcChangeSenderPartition: public TActorBootstrapped<TCdcChangeSenderParti
}
}
+ cmd.SetData(data.SerializeAsString());
Pending.push_back(record.GetSeqNo());
}
@@ -646,7 +638,7 @@ class TCdcChangeSenderMain: public TActorBootstrapped<TCdcChangeSenderMain>
case NKikimrSchemeOp::ECdcStreamFormatJson: {
using namespace NKikimr::NDataStreams::V1;
- const auto hashKey = HexBytesToDecimal(MD5::Calc(record.GetPartitionKey()));
+ const auto hashKey = HexBytesToDecimal(record.GetPartitionKey() /* MD5 */);
return ShardFromDecimal(hashKey, KeyDesc->Partitions.size());
}
diff --git a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
index 0400b7d4159..74aa9ac3322 100644
--- a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
@@ -1,9 +1,14 @@
#include "datashard_ut_common.h"
+#include <library/cpp/digest/md5/md5.h>
+#include <library/cpp/json/json_reader.h>
+
#include <ydb/core/base/path.h>
#include <ydb/public/sdk/cpp/client/ydb_datastreams/datastreams.h>
#include <ydb/public/sdk/cpp/client/ydb_persqueue_public/persqueue.h>
+#include <util/generic/size_literals.h>
+#include <util/string/printf.h>
#include <util/string/strip.h>
namespace NKikimr {
@@ -699,9 +704,20 @@ Y_UNIT_TEST_SUITE(Cdc) {
};
}
+ TString CalcPartitionKey(const TString& data) { // Test helper: derives the partition key expected for a JSON-serialized change record `data`.
+ NJson::TJsonValue json;
+ UNIT_ASSERT(NJson::ReadJsonTree(data, &json)); // fails the test if `data` cannot be read as a JSON tree
+
+ NJson::TJsonValue::TMapType root;
+ UNIT_ASSERT(json.GetMap(&root)); // fails the test if GetMap does not succeed (presumably: the root is not a JSON map)
+
+ UNIT_ASSERT(root.contains("key")); // the record must carry its key under the "key" member
+ return MD5::Calc(root.at("key").GetStringRobust()); // MD5 of the key's string form; meant to match MD5::Calc(WriteJson(key, false)) in TChangeRecord::GetPartitionKey — presumably GetStringRobust yields the same compact JSON text, TODO confirm
+ }
+
struct PqRunner {
static void Read(const TShardedTableOptions& tableDesc, const TCdcStream& streamDesc,
- const TVector<TString>& queries, const TVector<std::pair<TString, TString>>& records)
+ const TVector<TString>& queries, const TVector<TString>& records)
{
TTestPqEnv env(tableDesc, streamDesc);
@@ -733,8 +749,8 @@ Y_UNIT_TEST_SUITE(Cdc) {
if (auto* data = std::get_if<TReadSessionEvent::TDataReceivedEvent>(&*ev)) {
for (const auto& item : data->GetMessages()) {
const auto& record = records.at(reads++);
- UNIT_ASSERT_VALUES_EQUAL(record.first, item.GetPartitionKey());
- UNIT_ASSERT_VALUES_EQUAL(record.second, item.GetData());
+ UNIT_ASSERT_VALUES_EQUAL(record, item.GetData());
+ UNIT_ASSERT_VALUES_EQUAL(CalcPartitionKey(record), item.GetPartitionKey());
}
} else if (auto* create = std::get_if<TReadSessionEvent::TCreatePartitionStreamEvent>(&*ev)) {
create->Confirm();
@@ -778,7 +794,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
struct YdsRunner {
static void Read(const TShardedTableOptions& tableDesc, const TCdcStream& streamDesc,
- const TVector<TString>& queries, const TVector<std::pair<TString, TString>>& records)
+ const TVector<TString>& queries, const TVector<TString>& records)
{
TTestYdsEnv env(tableDesc, streamDesc);
@@ -829,8 +845,8 @@ Y_UNIT_TEST_SUITE(Cdc) {
for (ui32 i = 0; i < records.size(); ++i) {
const auto& actual = res.GetResult().records().at(i);
const auto& expected = records.at(i);
- UNIT_ASSERT_VALUES_EQUAL(actual.partition_key(), expected.first);
- UNIT_ASSERT_VALUES_EQUAL(actual.data(), expected.second);
+ UNIT_ASSERT_VALUES_EQUAL(actual.data(), expected);
+ UNIT_ASSERT_VALUES_EQUAL(actual.partition_key(), CalcPartitionKey(expected));
}
}
@@ -877,10 +893,10 @@ Y_UNIT_TEST_SUITE(Cdc) {
)", R"(
DELETE FROM `/Root/Table` WHERE key = 1;
)"}, {
- {"[1]", R"({"update":{}})"},
- {"[2]", R"({"update":{}})"},
- {"[3]", R"({"update":{}})"},
- {"[1]", R"({"erase":{}})"},
+ R"({"update":{},"key":[1]})",
+ R"({"update":{},"key":[2]})",
+ R"({"update":{},"key":[3]})",
+ R"({"erase":{},"key":[1]})",
});
}
@@ -893,10 +909,10 @@ Y_UNIT_TEST_SUITE(Cdc) {
)", R"(
DELETE FROM `/Root/Table` WHERE key = 1;
)"}, {
- {"[1]", R"({"update":{"value":10}})"},
- {"[2]", R"({"update":{"value":20}})"},
- {"[3]", R"({"update":{"value":30}})"},
- {"[1]", R"({"erase":{}})"},
+ R"({"update":{"value":10},"key":[1]})",
+ R"({"update":{"value":20},"key":[2]})",
+ R"({"update":{"value":30},"key":[3]})",
+ R"({"erase":{},"key":[1]})",
});
}
@@ -914,13 +930,32 @@ Y_UNIT_TEST_SUITE(Cdc) {
)", R"(
DELETE FROM `/Root/Table` WHERE key = 1;
)"}, {
- {"[1]", R"({"update":{},"newImage":{"value":10}})"},
- {"[2]", R"({"update":{},"newImage":{"value":20}})"},
- {"[3]", R"({"update":{},"newImage":{"value":30}})"},
- {"[1]", R"({"update":{},"newImage":{"value":100},"oldImage":{"value":10}})"},
- {"[2]", R"({"update":{},"newImage":{"value":200},"oldImage":{"value":20}})"},
- {"[3]", R"({"update":{},"newImage":{"value":300},"oldImage":{"value":30}})"},
- {"[1]", R"({"erase":{},"oldImage":{"value":100}})"},
+ R"({"update":{},"newImage":{"value":10},"key":[1]})",
+ R"({"update":{},"newImage":{"value":20},"key":[2]})",
+ R"({"update":{},"newImage":{"value":30},"key":[3]})",
+ R"({"update":{},"newImage":{"value":100},"key":[1],"oldImage":{"value":10}})",
+ R"({"update":{},"newImage":{"value":200},"key":[2],"oldImage":{"value":20}})",
+ R"({"update":{},"newImage":{"value":300},"key":[3],"oldImage":{"value":30}})",
+ R"({"erase":{},"key":[1],"oldImage":{"value":100}})",
+ });
+ }
+
+ TShardedTableOptions Utf8Table() { // Builds table options with two columns: "key" of type Utf8 and "value" of type Uint32.
+ return TShardedTableOptions()
+ .Columns({
+ {"key", "Utf8", true, false}, // NOTE(review): the `true` presumably marks this column as part of the primary key — confirm against the column descriptor
+ {"value", "Uint32", false, false},
+ });
+ }
+
+ Y_UNIT_TEST_TWIN(HugeKey, PqRunner, YdsRunner) { // Upserts one row with a very large Utf8 key and expects a single JSON change record embedding that key; presumably TRunner is PqRunner or YdsRunner per twin — confirm
+ const auto key = TString(512_KB, 'A'); // key made of 512_KB repetitions of 'A'
+
+ TRunner::Read(Utf8Table(), KeysOnly(NKikimrSchemeOp::ECdcStreamFormatJson), {Sprintf(R"(
+ UPSERT INTO `/Root/Table` (key, value) VALUES
+ ("%s", 1);
+ )", key.c_str())}, { // end of the single query; the next list holds the expected records
+ Sprintf(R"({"update":{},"key":["%s"]})", key.c_str()), // expected record: empty "update" plus the full key under "key"
});
}