aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov <ivanmorozov@yandex-team.com>2022-09-23 15:34:12 +0300
committerivanmorozov <ivanmorozov@yandex-team.com>2022-09-23 15:34:12 +0300
commitf7362d18e23479d33f86e53dea41c0ef0974bc8d (patch)
tree8582d4fc3d595108a4730c077c4d2e51f6384d30
parent07c943de4adb7d76d1a1725b6c5b9d9b9edf7733 (diff)
downloadydb-f7362d18e23479d33f86e53dea41c0ef0974bc8d.tar.gz
spans usage for rpc_long_tx
-rw-r--r--library/cpp/actors/wilson/wilson_profile_span.cpp8
-rw-r--r--library/cpp/actors/wilson/wilson_profile_span.h4
-rw-r--r--ydb/core/grpc_services/rpc_long_tx.cpp94
3 files changed, 86 insertions, 20 deletions
diff --git a/library/cpp/actors/wilson/wilson_profile_span.cpp b/library/cpp/actors/wilson/wilson_profile_span.cpp
index 177db5de83..3939be3c94 100644
--- a/library/cpp/actors/wilson/wilson_profile_span.cpp
+++ b/library/cpp/actors/wilson/wilson_profile_span.cpp
@@ -34,11 +34,17 @@ TProfileSpan::TProfileSpan(const ui8 verbosity, TTraceId parentId, std::optional
}
TProfileSpan::~TProfileSpan() {
- if (Enabled) {
+ if (Enabled && (ResultTimes.GetMapSafe().size() || PairInstances.size())) { // emit "profile" only when enabled and ResultTimes or PairInstances is non-empty
TBase::Attribute("profile", ProfileToString());
}
}
+NWilson::TProfileSpan TProfileSpan::BuildChildrenSpan(std::optional<TString> name, const ui8 verbosity) const { // builds a new TProfileSpan whose parent id is this span's trace id
+ TTraceId parentTraceId = TBase::GetTraceId();
+ const ui8 newVerbosity = verbosity ? verbosity : parentTraceId.GetVerbosity(); // verbosity == 0 falls back to the verbosity of this span's trace id
+ return TProfileSpan(newVerbosity, std::move(parentTraceId), std::move(name)); // name is a by-value parameter that is not used afterwards: move it instead of copying
+}
+
TString TProfileSpan::ProfileToString() const {
if (!Enabled) {
return "DISABLED";
diff --git a/library/cpp/actors/wilson/wilson_profile_span.h b/library/cpp/actors/wilson/wilson_profile_span.h
index ece1837631..f12747e4ac 100644
--- a/library/cpp/actors/wilson/wilson_profile_span.h
+++ b/library/cpp/actors/wilson/wilson_profile_span.h
@@ -19,7 +19,7 @@ private:
void AddMin(const TInstant instance);
TString ToString() const;
};
- mutable NJson::TJsonValue ResultTimes;
+ mutable NJson::TJsonValue ResultTimes = NJson::JSON_MAP;
std::map<TString, TMinMaxPair> PairInstances;
std::vector<NJson::TJsonValue*> CurrentJsonPath;
mutable TInstant LastNoGuards = Now();
@@ -32,6 +32,8 @@ public:
TProfileSpan(const ui8 verbosity, TTraceId parentId, std::optional<TString> name);
~TProfileSpan();
+ TProfileSpan BuildChildrenSpan(std::optional<TString> name, const ui8 verbosity = 0) const;
+
using TBase::TBase;
TString ProfileToString() const;
diff --git a/ydb/core/grpc_services/rpc_long_tx.cpp b/ydb/core/grpc_services/rpc_long_tx.cpp
index a404de7cf3..01dc3997c7 100644
--- a/ydb/core/grpc_services/rpc_long_tx.cpp
+++ b/ydb/core/grpc_services/rpc_long_tx.cpp
@@ -14,6 +14,7 @@
#include <ydb/core/tx/columnshard/columnshard.h>
#include <ydb/core/tx/long_tx_service/public/events.h>
+#include <library/cpp/actors/wilson/wilson_profile_span.h>
#include <contrib/libs/apache/arrow/cpp/src/arrow/compute/api.h>
namespace NKikimr {
@@ -41,7 +42,50 @@ std::shared_ptr<arrow::Schema> ExtractArrowSchema(const NKikimrSchemeOp::TColumn
return NArrow::MakeArrowSchema(columns);
}
-THashMap<ui64, TString> SplitData(const std::shared_ptr<arrow::RecordBatch>& batch,
+class TShardInfo { // serialized batch for a single shard together with the number of rows it contains
+private:
+ TString Data; // deliberately non-const: const members would turn every move of a TShardInfo into a copy
+ ui32 RowsCount;
+public:
+ TShardInfo(TString data, const ui32 rowsCount) // data is a sink parameter: taken by value and moved into place
+ : Data(std::move(data))
+ , RowsCount(rowsCount)
+ {
+
+ }
+ const TString& GetData() const { // read-only access to the serialized payload
+ return Data;
+ }
+ ui32 GetRowsCount() const { // row count given at construction
+ return RowsCount;
+ }
+};
+
+class TFullSplitData { // result of SplitData: per-tablet payloads plus the shard count supplied by the caller
+private:
+ ui32 ShardsCount = 0;
+ THashMap<ui64, TShardInfo> ShardsInfo; // keyed by the tabletId passed to AddShardInfo
+public:
+ explicit TFullSplitData(const ui32 shardsCount) // explicit: a bare integer must not silently convert into an empty split result
+ : ShardsCount(shardsCount)
+ {
+
+ }
+
+ const THashMap<ui64, TShardInfo>& GetShardsInfo() const { // stays empty until AddShardInfo is called
+ return ShardsInfo;
+ }
+
+ ui32 GetShardsCount() const { // value given at construction; not derived from ShardsInfo.size()
+ return ShardsCount;
+ }
+
+ void AddShardInfo(const ui64 tabletId, TShardInfo&& info) { // stores the payload destined for tabletId
+ ShardsInfo.emplace(tabletId, std::move(info));
+ }
+};
+
+TFullSplitData SplitData(const std::shared_ptr<arrow::RecordBatch>& batch,
const NKikimrSchemeOp::TColumnTableDescription& description)
{
Y_VERIFY(batch);
@@ -54,11 +98,12 @@ THashMap<ui64, TString> SplitData(const std::shared_ptr<arrow::RecordBatch>& bat
TVector<TString> shardingColumns(hashSharding.GetColumns().begin(), hashSharding.GetColumns().end());
ui32 numShards = tabletIds.size();
Y_VERIFY(numShards);
+ TFullSplitData result(numShards);
if (numShards == 1) {
- THashMap<ui64, TString> out;
- out.emplace(tabletIds[0], NArrow::SerializeBatchNoCompression(batch));
- return out;
+ TShardInfo splitInfo(NArrow::SerializeBatchNoCompression(batch), batch->num_rows());
+ result.AddShardInfo(tabletIds[0], std::move(splitInfo));
+ return result;
}
std::vector<ui32> rowSharding;
@@ -71,7 +116,7 @@ THashMap<ui64, TString> SplitData(const std::shared_ptr<arrow::RecordBatch>& bat
}
if (rowSharding.empty()) {
- return {};
+ return result;
}
std::vector<std::shared_ptr<arrow::RecordBatch>> sharded = NArrow::ShardingSplit(batch, rowSharding, numShards);
@@ -80,16 +125,17 @@ THashMap<ui64, TString> SplitData(const std::shared_ptr<arrow::RecordBatch>& bat
THashMap<ui64, TString> out;
for (size_t i = 0; i < sharded.size(); ++i) {
if (sharded[i]) {
- out.emplace(tabletIds[i], NArrow::SerializeBatchNoCompression(sharded[i]));
+ TShardInfo splitInfo(NArrow::SerializeBatchNoCompression(sharded[i]), sharded[i]->num_rows());
+ result.AddShardInfo(tabletIds[i], std::move(splitInfo));
}
}
- Y_VERIFY(!out.empty());
- return out;
+ Y_VERIFY(result.GetShardsInfo().size());
+ return result;
}
-// Deserailizes arrow batch and splits it
-THashMap<ui64, TString> SplitData(const TString& data, const NKikimrSchemeOp::TColumnTableDescription& description) {
+// Deserializes an arrow batch and splits it
+TFullSplitData SplitData(const TString& data, const NKikimrSchemeOp::TColumnTableDescription& description) {
Y_VERIFY(description.HasSchema());
auto& olapSchema = description.GetSchema();
Y_VERIFY(olapSchema.GetEngine() == NKikimrSchemeOp::COLUMN_ENGINE_REPLACING_TIMESERIES);
@@ -97,9 +143,8 @@ THashMap<ui64, TString> SplitData(const TString& data, const NKikimrSchemeOp::TC
std::shared_ptr<arrow::Schema> schema = ExtractArrowSchema(olapSchema);
std::shared_ptr<arrow::RecordBatch> batch = NArrow::DeserializeBatch(data, schema);
if (!batch || !batch->ValidateFull().ok()) {
- return {};
+ return TFullSplitData(0);
}
-
return SplitData(batch, description);
}
@@ -343,6 +388,7 @@ public:
, DedupId(dedupId)
, LongTxId(longTxId)
, LeaderPipeCache(MakePipePeNodeCacheID(false))
+ , ActorSpan(0, NWilson::TTraceId::NewTraceId(0, Max<ui32>()), "TLongTxWriteBase")
{
if (token) {
UserToken.emplace(token);
@@ -360,6 +406,7 @@ protected:
}
void ProceedWithSchema(const NSchemeCache::TSchemeCacheNavigate& resp) {
+ NWilson::TProfileSpan pSpan = ActorSpan.BuildChildrenSpan("ProceedWithSchema");
if (resp.ErrorCount > 0) {
// TODO: map to a correct error
return ReplyError(Ydb::StatusIds::SCHEME_ERROR, "There was an error during table query");
@@ -407,15 +454,23 @@ protected:
SendWriteRequest(shard, tableId, DedupId, GetSerializedData());
} else if (sharding.HasHashSharding()) {
- auto batches = HasDeserializedBatch() ?
+ const TFullSplitData batches = HasDeserializedBatch() ?
SplitData(GetDeserializedBatch(), description) :
SplitData(GetSerializedData(), description);
- if (batches.empty()) {
+ if (batches.GetShardsInfo().empty()) {
return ReplyError(Ydb::StatusIds::SCHEME_ERROR, "Cannot deserialize or split input data");
}
- for (auto& [shard, batch] : batches) {
- SendWriteRequest(shard, tableId, DedupId, batch);
+ ui32 sumBytes = 0;
+ ui32 rowsCount = 0;
+ for (auto& [shard, info] : batches.GetShardsInfo()) {
+ sumBytes += info.GetData().size();
+ rowsCount += info.GetRowsCount();
+ SendWriteRequest(shard, tableId, DedupId, info.GetData());
}
+ pSpan.Attribute("affected_shards_count", (long)batches.GetShardsInfo().size());
+ pSpan.Attribute("bytes", (long)sumBytes);
+ pSpan.Attribute("rows", (long)rowsCount);
+ pSpan.Attribute("shards_count", (long)batches.GetShardsCount());
} else {
return ReplyError(Ydb::StatusIds::SCHEME_ERROR, "Sharding method is not supported");
}
@@ -469,6 +524,7 @@ private:
}
void Handle(TEvColumnShard::TEvWriteResult::TPtr& ev) {
+ auto gProfile = ActorSpan.StartStackTimeGuard("WriteResult");
const auto* msg = ev->Get();
ui64 shardId = msg->Record.GetOrigin();
Y_VERIFY(WaitShards.count(shardId) || ShardsWrites.count(shardId));
@@ -491,6 +547,7 @@ private:
}
void Handle(TEvPipeCache::TEvDeliveryProblem::TPtr& ev) {
+ NWilson::TProfileSpan pSpan(0, ActorSpan.GetTraceId(), "DeliveryProblem");
const auto* msg = ev->Get();
if (msg->NotDelivered) {
@@ -519,6 +576,7 @@ private:
}
void Handle(TEvLongTxService::TEvAttachColumnShardWritesResult::TPtr& ev) {
+ NWilson::TProfileSpan pSpan(0, ActorSpan.GetTraceId(), "AttachColumnShardWritesResult");
const auto* msg = ev->Get();
if (msg->Record.GetStatus() != Ydb::StatusIds::SUCCESS) {
@@ -536,7 +594,7 @@ private:
private:
void SendToTablet(ui64 tabletId, THolder<IEventBase> event) {
this->Send(LeaderPipeCache, new TEvPipeCache::TEvForward(event.Release(), tabletId, true),
- IEventHandle::FlagTrackDelivery);
+ IEventHandle::FlagTrackDelivery, 0, ActorSpan.GetTraceId()); // passes ActorSpan's trace id along with the forwarded event; the 0 is presumably the cookie argument — confirm against Send()
}
protected:
@@ -547,7 +605,6 @@ protected:
virtual std::shared_ptr<arrow::RecordBatch> GetDeserializedBatch() const {
return nullptr;
}
-
virtual TString GetSerializedData() = 0;
virtual void RaiseIssue(const NYql::TIssue& issue) = 0;
virtual void ReplyError(Ydb::StatusIds::StatusCode status, const TString& message = TString()) = 0;
@@ -563,6 +620,7 @@ private:
std::optional<NACLib::TUserToken> UserToken;
THashSet<ui64> WaitShards;
THashMap<ui64, ui64> ShardsWrites;
+ NWilson::TProfileSpan ActorSpan;
};