aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorazevaykin <azevaykin@ydb.tech>2023-12-05 16:09:54 +0300
committerazevaykin <azevaykin@ydb.tech>2023-12-05 17:05:27 +0300
commitdc89a27e3362ea9e929edfab9c317db6c39734f8 (patch)
treeb29f3a6a67488bdac20f29b072bdeb178a265f25
parentccb0ffc593cb44f232a68cf65d08e0bada0cd072 (diff)
downloadydb-dc89a27e3362ea9e929edfab9c317db6c39734f8.tar.gz
TEvWrite constructor + Orbit
-rw-r--r--ydb/core/protos/out/out.cpp8
-rw-r--r--ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp6
-rw-r--r--ydb/core/tx/columnshard/ut_rw/ut_normalizer.cpp2
-rw-r--r--ydb/core/tx/data_events/events.h23
4 files changed, 33 insertions, 6 deletions
diff --git a/ydb/core/protos/out/out.cpp b/ydb/core/protos/out/out.cpp
index d3e3db2fb8..f91fb00b09 100644
--- a/ydb/core/protos/out/out.cpp
+++ b/ydb/core/protos/out/out.cpp
@@ -199,3 +199,11 @@ Y_DECLARE_OUT_SPEC(, NKikimrSchemeOp::ECdcStreamState, stream, value) {
Y_DECLARE_OUT_SPEC(, NKikimrSubDomains::EServerlessComputeResourcesMode, stream, value) {
stream << NKikimrSubDomains::EServerlessComputeResourcesMode_Name(value);
}
+
+Y_DECLARE_OUT_SPEC(, NKikimrDataEvents::TEvWriteResult::EStatus, stream, value) {
+ stream << NKikimrDataEvents::TEvWriteResult::EStatus_Name(value);
+}
+
+Y_DECLARE_OUT_SPEC(, NKikimrDataEvents::TEvWrite::ETxMode, stream, value) {
+ stream << NKikimrDataEvents::TEvWrite::ETxMode_Name(value);
+}
diff --git a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp
index 6ef68b5867..1dee12340f 100644
--- a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp
+++ b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp
@@ -1871,7 +1871,7 @@ Y_UNIT_TEST_SUITE(EvWrite) {
TString blobData = NArrow::SerializeBatchNoCompression(batch);
UNIT_ASSERT(blobData.size() < TLimits::GetMaxBlobSize());
- auto evWrite = std::make_unique<NKikimr::NEvents::TDataEvents::TEvWrite>(txId);
+ auto evWrite = std::make_unique<NKikimr::NEvents::TDataEvents::TEvWrite>(txId, NKikimrDataEvents::TEvWrite::MODE_PREPARE);
ui64 payloadIndex = NEvWrite::TPayloadHelper<NKikimr::NEvents::TDataEvents::TEvWrite>(*evWrite).AddDataToPayload(std::move(blobData));
evWrite->AddOperation(NKikimrDataEvents::TEvWrite::TOperation::OPERATION_REPLACE, tableId, 1, schema, payloadIndex, NKikimrDataEvents::FORMAT_ARROW);
@@ -1916,7 +1916,7 @@ Y_UNIT_TEST_SUITE(EvWrite) {
TString blobData = NArrow::SerializeBatchNoCompression(batch);
UNIT_ASSERT(blobData.size() < TLimits::GetMaxBlobSize());
- auto evWrite = std::make_unique<NKikimr::NEvents::TDataEvents::TEvWrite>(txId);
+ auto evWrite = std::make_unique<NKikimr::NEvents::TDataEvents::TEvWrite>(txId, NKikimrDataEvents::TEvWrite::MODE_PREPARE);
ui64 payloadIndex = NEvWrite::TPayloadHelper<NKikimr::NEvents::TDataEvents::TEvWrite>(*evWrite).AddDataToPayload(std::move(blobData));
evWrite->AddOperation(NKikimrDataEvents::TEvWrite::TOperation::OPERATION_REPLACE, tableId, 1, schema, payloadIndex, NKikimrDataEvents::FORMAT_ARROW);
@@ -1960,7 +1960,7 @@ Y_UNIT_TEST_SUITE(EvWrite) {
TString blobData = NArrow::SerializeBatchNoCompression(batch);
UNIT_ASSERT(blobData.size() > TLimits::GetMaxBlobSize());
- auto evWrite = std::make_unique<NKikimr::NEvents::TDataEvents::TEvWrite>(txId);
+ auto evWrite = std::make_unique<NKikimr::NEvents::TDataEvents::TEvWrite>(txId, NKikimrDataEvents::TEvWrite::MODE_PREPARE);
ui64 payloadIndex = NEvWrite::TPayloadHelper<NKikimr::NEvents::TDataEvents::TEvWrite>(*evWrite).AddDataToPayload(std::move(blobData));
evWrite->AddOperation(NKikimrDataEvents::TEvWrite::TOperation::OPERATION_REPLACE, tableId, 1, schema, payloadIndex, NKikimrDataEvents::FORMAT_ARROW);
diff --git a/ydb/core/tx/columnshard/ut_rw/ut_normalizer.cpp b/ydb/core/tx/columnshard/ut_rw/ut_normalizer.cpp
index 3ee2e00c8b..ce9966b5e7 100644
--- a/ydb/core/tx/columnshard/ut_rw/ut_normalizer.cpp
+++ b/ydb/core/tx/columnshard/ut_rw/ut_normalizer.cpp
@@ -242,7 +242,7 @@ Y_UNIT_TEST_SUITE(Normalizers) {
auto batch = NConstruction::TRecordBatchConstructor({ key1Column, key2Column, column }).BuildBatch(20048);
TString blobData = NArrow::SerializeBatchNoCompression(batch);
- auto evWrite = std::make_unique<NKikimr::NEvents::TDataEvents::TEvWrite>(txId);
+ auto evWrite = std::make_unique<NKikimr::NEvents::TDataEvents::TEvWrite>(txId, NKikimrDataEvents::TEvWrite::MODE_PREPARE);
ui64 payloadIndex = NEvWrite::TPayloadHelper<NKikimr::NEvents::TDataEvents::TEvWrite>(*evWrite).AddDataToPayload(std::move(blobData)); evWrite->AddOperation(NKikimrDataEvents::TEvWrite::TOperation::OPERATION_REPLACE, tableId, 1, schema, payloadIndex, NKikimrDataEvents::FORMAT_ARROW);
TActorId sender = runtime.AllocateEdgeActor();
diff --git a/ydb/core/tx/data_events/events.h b/ydb/core/tx/data_events/events.h
index 64f9e95e64..402d442001 100644
--- a/ydb/core/tx/data_events/events.h
+++ b/ydb/core/tx/data_events/events.h
@@ -2,6 +2,8 @@
#include "write_data.h"
+#include <library/cpp/lwtrace/shuttle.h>
+
#include <ydb/core/protos/data_events.pb.h>
#include <ydb/core/base/events.h>
@@ -36,10 +38,14 @@ struct TDataEvents {
static_assert(EEventType::EvEnd < EventSpaceEnd(TKikimrEvents::ES_DATA_OPERATIONS), "expect EvEnd < EventSpaceEnd(TKikimrEvents::ES_DATA_OPERATIONS)");
struct TEvWrite : public NActors::TEventPB<TEvWrite, NKikimrDataEvents::TEvWrite, TDataEvents::EvWrite> {
+ public:
TEvWrite() = default;
- TEvWrite(ui64 txId) {
+ TEvWrite(ui64 txId, NKikimrDataEvents::TEvWrite::ETxMode txMode) {
+ Y_ABORT_UNLESS(txMode != NKikimrDataEvents::TEvWrite::MODE_UNSPECIFIED);
+
Record.SetTxId(txId);
+ Record.SetTxMode(txMode);
}
void AddOperation(NKikimrDataEvents::TEvWrite_TOperation::EOperationType operationType, ui64 tableId,
@@ -63,10 +69,16 @@ struct TDataEvents {
Y_ABORT_UNLESS(Record.HasTxId());
return Record.GetTxId();
}
+
+ void SetOrbit(NLWTrace::TOrbit&& orbit) { Orbit = std::move(orbit); }
+ NLWTrace::TOrbit& GetOrbit() { return Orbit; }
+ NLWTrace::TOrbit&& MoveOrbit() { return std::move(Orbit); }
+ private:
+ NLWTrace::TOrbit Orbit;
};
struct TEvWriteResult : public NActors::TEventPB<TEvWriteResult, NKikimrDataEvents::TEvWriteResult, TDataEvents::EvWriteResult> {
-
+ public:
TEvWriteResult() = default;
static std::unique_ptr<TEvWriteResult> BuildError(const ui64 txId, const NKikimrDataEvents::TEvWriteResult::EStatus& status, const TString& errorMsg) {
@@ -96,7 +108,14 @@ struct TDataEvents {
result->Record.MutableDomainCoordinators()->CopyFrom(transactionInfo.GetDomainCoordinators());
return result;
}
+
+ void SetOrbit(NLWTrace::TOrbit&& orbit) { Orbit = std::move(orbit); }
+ NLWTrace::TOrbit& GetOrbit() { return Orbit; }
+ NLWTrace::TOrbit&& MoveOrbit() { return std::move(Orbit); }
+ private:
+ NLWTrace::TOrbit Orbit;
};
+
};
}