diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2022-09-23 15:34:12 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2022-09-23 15:34:12 +0300 |
commit | f7362d18e23479d33f86e53dea41c0ef0974bc8d (patch) | |
tree | 8582d4fc3d595108a4730c077c4d2e51f6384d30 | |
parent | 07c943de4adb7d76d1a1725b6c5b9d9b9edf7733 (diff) | |
download | ydb-f7362d18e23479d33f86e53dea41c0ef0974bc8d.tar.gz |
spans usage for rpc_long_tx
-rw-r--r-- | library/cpp/actors/wilson/wilson_profile_span.cpp | 8 | ||||
-rw-r--r-- | library/cpp/actors/wilson/wilson_profile_span.h | 4 | ||||
-rw-r--r-- | ydb/core/grpc_services/rpc_long_tx.cpp | 94 |
3 files changed, 86 insertions, 20 deletions
diff --git a/library/cpp/actors/wilson/wilson_profile_span.cpp b/library/cpp/actors/wilson/wilson_profile_span.cpp index 177db5de83..3939be3c94 100644 --- a/library/cpp/actors/wilson/wilson_profile_span.cpp +++ b/library/cpp/actors/wilson/wilson_profile_span.cpp @@ -34,11 +34,17 @@ TProfileSpan::TProfileSpan(const ui8 verbosity, TTraceId parentId, std::optional } TProfileSpan::~TProfileSpan() { - if (Enabled) { + if (Enabled && (ResultTimes.GetMapSafe().size() || PairInstances.size())) { TBase::Attribute("profile", ProfileToString()); } } +NWilson::TProfileSpan TProfileSpan::BuildChildrenSpan(std::optional<TString> name, const ui8 verbosity) const { + TTraceId parentTraceId = TBase::GetTraceId(); + const ui8 newVerbosity = verbosity ? verbosity : parentTraceId.GetVerbosity(); + return TProfileSpan(newVerbosity, std::move(parentTraceId), name); +} + TString TProfileSpan::ProfileToString() const { if (!Enabled) { return "DISABLED"; diff --git a/library/cpp/actors/wilson/wilson_profile_span.h b/library/cpp/actors/wilson/wilson_profile_span.h index ece1837631..f12747e4ac 100644 --- a/library/cpp/actors/wilson/wilson_profile_span.h +++ b/library/cpp/actors/wilson/wilson_profile_span.h @@ -19,7 +19,7 @@ private: void AddMin(const TInstant instance); TString ToString() const; }; - mutable NJson::TJsonValue ResultTimes; + mutable NJson::TJsonValue ResultTimes = NJson::JSON_MAP; std::map<TString, TMinMaxPair> PairInstances; std::vector<NJson::TJsonValue*> CurrentJsonPath; mutable TInstant LastNoGuards = Now(); @@ -32,6 +32,8 @@ public: TProfileSpan(const ui8 verbosity, TTraceId parentId, std::optional<TString> name); ~TProfileSpan(); + TProfileSpan BuildChildrenSpan(std::optional<TString> name, const ui8 verbosity = 0) const; + using TBase::TBase; TString ProfileToString() const; diff --git a/ydb/core/grpc_services/rpc_long_tx.cpp b/ydb/core/grpc_services/rpc_long_tx.cpp index a404de7cf3..01dc3997c7 100644 --- a/ydb/core/grpc_services/rpc_long_tx.cpp +++ b/ydb/core/grpc_services/rpc_long_tx.cpp @@ -14,6 +14,7 @@ #include <ydb/core/tx/columnshard/columnshard.h> #include <ydb/core/tx/long_tx_service/public/events.h> +#include <library/cpp/actors/wilson/wilson_profile_span.h> #include <contrib/libs/apache/arrow/cpp/src/arrow/compute/api.h> namespace NKikimr { @@ -41,7 +42,50 @@ std::shared_ptr<arrow::Schema> ExtractArrowSchema(const NKikimrSchemeOp::TColumn return NArrow::MakeArrowSchema(columns); } -THashMap<ui64, TString> SplitData(const std::shared_ptr<arrow::RecordBatch>& batch, +class TShardInfo { +private: + const TString Data; + const ui32 RowsCount; +public: + TShardInfo(const TString& data, const ui32 rowsCount) + : Data(data) + , RowsCount(rowsCount) + { + + } + const TString& GetData() const { + return Data; + } + ui32 GetRowsCount() const { + return RowsCount; + } +}; + +class TFullSplitData { +private: + ui32 ShardsCount = 0; + THashMap<ui64, TShardInfo> ShardsInfo; +public: + TFullSplitData(const ui32 shardsCount) + : ShardsCount(shardsCount) + { + + } + + const THashMap<ui64, TShardInfo>& GetShardsInfo() const { + return ShardsInfo; + } + + ui32 GetShardsCount() const { + return ShardsCount; + } + + void AddShardInfo(const ui64 tabletId, TShardInfo&& info) { + ShardsInfo.emplace(tabletId, std::move(info)); + } +}; + +TFullSplitData SplitData(const std::shared_ptr<arrow::RecordBatch>& batch, const NKikimrSchemeOp::TColumnTableDescription& description) { Y_VERIFY(batch); @@ -54,11 +98,12 @@ THashMap<ui64, TString> SplitData(const std::shared_ptr<arrow::RecordBatch>& bat TVector<TString> shardingColumns(hashSharding.GetColumns().begin(), hashSharding.GetColumns().end()); ui32 numShards = tabletIds.size(); Y_VERIFY(numShards); + TFullSplitData result(numShards); if (numShards == 1) { - THashMap<ui64, TString> out; - out.emplace(tabletIds[0], NArrow::SerializeBatchNoCompression(batch)); - return out; + TShardInfo splitInfo(NArrow::SerializeBatchNoCompression(batch), batch->num_rows()); + result.AddShardInfo(tabletIds[0], std::move(splitInfo)); + return result; } std::vector<ui32> rowSharding; @@ -71,7 +116,7 @@ THashMap<ui64, TString> SplitData(const std::shared_ptr<arrow::RecordBatch>& bat } if (rowSharding.empty()) { - return {}; + return result; } std::vector<std::shared_ptr<arrow::RecordBatch>> sharded = NArrow::ShardingSplit(batch, rowSharding, numShards); @@ -80,16 +125,17 @@ THashMap<ui64, TString> SplitData(const std::shared_ptr<arrow::RecordBatch>& bat THashMap<ui64, TString> out; for (size_t i = 0; i < sharded.size(); ++i) { if (sharded[i]) { - out.emplace(tabletIds[i], NArrow::SerializeBatchNoCompression(sharded[i])); + TShardInfo splitInfo(NArrow::SerializeBatchNoCompression(sharded[i]), sharded[i]->num_rows()); + result.AddShardInfo(tabletIds[i], std::move(splitInfo)); } } - Y_VERIFY(!out.empty()); - return out; + Y_VERIFY(result.GetShardsInfo().size()); + return result; } -// Deserailizes arrow batch and splits it -THashMap<ui64, TString> SplitData(const TString& data, const NKikimrSchemeOp::TColumnTableDescription& description) { +// Deserialize arrow batch and splits it +TFullSplitData SplitData(const TString& data, const NKikimrSchemeOp::TColumnTableDescription& description) { Y_VERIFY(description.HasSchema()); auto& olapSchema = description.GetSchema(); Y_VERIFY(olapSchema.GetEngine() == NKikimrSchemeOp::COLUMN_ENGINE_REPLACING_TIMESERIES); @@ -97,9 +143,8 @@ THashMap<ui64, TString> SplitData(const TString& data, const NKikimrSchemeOp::TC std::shared_ptr<arrow::Schema> schema = ExtractArrowSchema(olapSchema); std::shared_ptr<arrow::RecordBatch> batch = NArrow::DeserializeBatch(data, schema); if (!batch || !batch->ValidateFull().ok()) { - return {}; + return TFullSplitData(0); } - return SplitData(batch, description); } @@ -343,6 +388,7 @@ public: , DedupId(dedupId) , LongTxId(longTxId) , LeaderPipeCache(MakePipePeNodeCacheID(false)) + , ActorSpan(0, NWilson::TTraceId::NewTraceId(0, Max<ui32>()), "TLongTxWriteBase") { if (token) { UserToken.emplace(token); @@ -360,6 +406,7 @@ protected: } void ProceedWithSchema(const NSchemeCache::TSchemeCacheNavigate& resp) { + NWilson::TProfileSpan pSpan = ActorSpan.BuildChildrenSpan("ProceedWithSchema"); if (resp.ErrorCount > 0) { // TODO: map to a correct error return ReplyError(Ydb::StatusIds::SCHEME_ERROR, "There was an error during table query"); @@ -407,15 +454,23 @@ protected: SendWriteRequest(shard, tableId, DedupId, GetSerializedData()); } else if (sharding.HasHashSharding()) { - auto batches = HasDeserializedBatch() ? + const TFullSplitData batches = HasDeserializedBatch() ? SplitData(GetDeserializedBatch(), description) : SplitData(GetSerializedData(), description); - if (batches.empty()) { + if (batches.GetShardsInfo().empty()) { return ReplyError(Ydb::StatusIds::SCHEME_ERROR, "Cannot deserialize or split input data"); } - for (auto& [shard, batch] : batches) { - SendWriteRequest(shard, tableId, DedupId, batch); + ui32 sumBytes = 0; + ui32 rowsCount = 0; + for (auto& [shard, info] : batches.GetShardsInfo()) { + sumBytes += info.GetData().size(); + rowsCount += info.GetRowsCount(); + SendWriteRequest(shard, tableId, DedupId, info.GetData()); } + pSpan.Attribute("affected_shards_count", (long)batches.GetShardsInfo().size()); + pSpan.Attribute("bytes", (long)sumBytes); + pSpan.Attribute("rows", (long)rowsCount); + pSpan.Attribute("shards_count", (long)batches.GetShardsCount()); } else { return ReplyError(Ydb::StatusIds::SCHEME_ERROR, "Sharding method is not supported"); } @@ -469,6 +524,7 @@ private: } void Handle(TEvColumnShard::TEvWriteResult::TPtr& ev) { + auto gProfile = ActorSpan.StartStackTimeGuard("WriteResult"); const auto* msg = ev->Get(); ui64 shardId = msg->Record.GetOrigin(); Y_VERIFY(WaitShards.count(shardId) || ShardsWrites.count(shardId)); @@ -491,6 +547,7 @@ private: } void Handle(TEvPipeCache::TEvDeliveryProblem::TPtr& ev) { + NWilson::TProfileSpan pSpan(0, ActorSpan.GetTraceId(), "DeliveryProblem"); const auto* msg = ev->Get(); if (msg->NotDelivered) { @@ -519,6 +576,7 @@ private: } void Handle(TEvLongTxService::TEvAttachColumnShardWritesResult::TPtr& ev) { + NWilson::TProfileSpan pSpan(0, ActorSpan.GetTraceId(), "AttachColumnShardWritesResult"); const auto* msg = ev->Get(); if (msg->Record.GetStatus() != Ydb::StatusIds::SUCCESS) { @@ -536,7 +594,7 @@ private: private: void SendToTablet(ui64 tabletId, THolder<IEventBase> event) { this->Send(LeaderPipeCache, new TEvPipeCache::TEvForward(event.Release(), tabletId, true), - IEventHandle::FlagTrackDelivery); + IEventHandle::FlagTrackDelivery, 0, ActorSpan.GetTraceId()); } protected: @@ -547,7 +605,6 @@ protected: virtual std::shared_ptr<arrow::RecordBatch> GetDeserializedBatch() const { return nullptr; } - virtual TString GetSerializedData() = 0; virtual void RaiseIssue(const NYql::TIssue& issue) = 0; virtual void ReplyError(Ydb::StatusIds::StatusCode status, const TString& message = TString()) = 0; @@ -563,6 +620,7 @@ private: std::optional<NACLib::TUserToken> UserToken; THashSet<ui64> WaitShards; THashMap<ui64, ui64> ShardsWrites; + NWilson::TProfileSpan ActorSpan; }; |