diff options
| author | ivanmorozov <[email protected]> | 2022-09-23 15:34:12 +0300 | 
|---|---|---|
| committer | ivanmorozov <[email protected]> | 2022-09-23 15:34:12 +0300 | 
| commit | f7362d18e23479d33f86e53dea41c0ef0974bc8d (patch) | |
| tree | 8582d4fc3d595108a4730c077c4d2e51f6384d30 | |
| parent | 07c943de4adb7d76d1a1725b6c5b9d9b9edf7733 (diff) | |
spans usage for rpc_long_tx
| -rw-r--r-- | library/cpp/actors/wilson/wilson_profile_span.cpp | 8 | ||||
| -rw-r--r-- | library/cpp/actors/wilson/wilson_profile_span.h | 4 | ||||
| -rw-r--r-- | ydb/core/grpc_services/rpc_long_tx.cpp | 94 | 
3 files changed, 86 insertions, 20 deletions
diff --git a/library/cpp/actors/wilson/wilson_profile_span.cpp b/library/cpp/actors/wilson/wilson_profile_span.cpp index 177db5de83d..3939be3c94d 100644 --- a/library/cpp/actors/wilson/wilson_profile_span.cpp +++ b/library/cpp/actors/wilson/wilson_profile_span.cpp @@ -34,11 +34,17 @@ TProfileSpan::TProfileSpan(const ui8 verbosity, TTraceId parentId, std::optional  }  TProfileSpan::~TProfileSpan() { -    if (Enabled) { +    if (Enabled && (ResultTimes.GetMapSafe().size() || PairInstances.size())) {          TBase::Attribute("profile", ProfileToString());      }  } +NWilson::TProfileSpan TProfileSpan::BuildChildrenSpan(std::optional<TString> name, const ui8 verbosity) const { +    TTraceId parentTraceId = TBase::GetTraceId(); +    const ui8 newVerbosity = verbosity ? verbosity : parentTraceId.GetVerbosity(); +    return TProfileSpan(newVerbosity, std::move(parentTraceId), name); +} +  TString TProfileSpan::ProfileToString() const {      if (!Enabled) {          return "DISABLED"; diff --git a/library/cpp/actors/wilson/wilson_profile_span.h b/library/cpp/actors/wilson/wilson_profile_span.h index ece18376313..f12747e4ac0 100644 --- a/library/cpp/actors/wilson/wilson_profile_span.h +++ b/library/cpp/actors/wilson/wilson_profile_span.h @@ -19,7 +19,7 @@ private:          void AddMin(const TInstant instance);          TString ToString() const;      }; -    mutable NJson::TJsonValue ResultTimes; +    mutable NJson::TJsonValue ResultTimes = NJson::JSON_MAP;      std::map<TString, TMinMaxPair> PairInstances;      std::vector<NJson::TJsonValue*> CurrentJsonPath;      mutable TInstant LastNoGuards = Now(); @@ -32,6 +32,8 @@ public:      TProfileSpan(const ui8 verbosity, TTraceId parentId, std::optional<TString> name);      ~TProfileSpan(); +    TProfileSpan BuildChildrenSpan(std::optional<TString> name, const ui8 verbosity = 0) const; +      using TBase::TBase;      TString ProfileToString() const; diff --git a/ydb/core/grpc_services/rpc_long_tx.cpp b/ydb/core/grpc_services/rpc_long_tx.cpp index a404de7cf34..01dc3997c74 100644 --- a/ydb/core/grpc_services/rpc_long_tx.cpp +++ b/ydb/core/grpc_services/rpc_long_tx.cpp @@ -14,6 +14,7 @@  #include <ydb/core/tx/columnshard/columnshard.h>  #include <ydb/core/tx/long_tx_service/public/events.h> +#include <library/cpp/actors/wilson/wilson_profile_span.h>  #include <contrib/libs/apache/arrow/cpp/src/arrow/compute/api.h>  namespace NKikimr { @@ -41,7 +42,50 @@ std::shared_ptr<arrow::Schema> ExtractArrowSchema(const NKikimrSchemeOp::TColumn      return NArrow::MakeArrowSchema(columns);  } -THashMap<ui64, TString> SplitData(const std::shared_ptr<arrow::RecordBatch>& batch, +class TShardInfo { +private: +    const TString Data; +    const ui32 RowsCount; +public: +    TShardInfo(const TString& data, const ui32 rowsCount) +        : Data(data) +        , RowsCount(rowsCount) +    { + +    } +    const TString& GetData() const { +        return Data; +    } +    ui32 GetRowsCount() const { +        return RowsCount; +    } +}; + +class TFullSplitData { +private: +    ui32 ShardsCount = 0; +    THashMap<ui64, TShardInfo> ShardsInfo; +public: +    TFullSplitData(const ui32 shardsCount) +        : ShardsCount(shardsCount) +    { + +    } + +    const THashMap<ui64, TShardInfo>& GetShardsInfo() const { +        return ShardsInfo; +    } + +    ui32 GetShardsCount() const { +        return ShardsCount; +    } + +    void AddShardInfo(const ui64 tabletId, TShardInfo&& info) { +        ShardsInfo.emplace(tabletId, std::move(info)); +    } +}; + +TFullSplitData SplitData(const std::shared_ptr<arrow::RecordBatch>& batch,      const NKikimrSchemeOp::TColumnTableDescription& description)  {      Y_VERIFY(batch); @@ -54,11 +98,12 @@ THashMap<ui64, TString> SplitData(const std::shared_ptr<arrow::RecordBatch>& bat      TVector<TString> shardingColumns(hashSharding.GetColumns().begin(), hashSharding.GetColumns().end());      ui32 numShards = tabletIds.size();      Y_VERIFY(numShards); +    TFullSplitData result(numShards);      if (numShards == 1) { -        THashMap<ui64, TString> out; -        out.emplace(tabletIds[0], NArrow::SerializeBatchNoCompression(batch)); -        return out; +        TShardInfo splitInfo(NArrow::SerializeBatchNoCompression(batch), batch->num_rows()); +        result.AddShardInfo(tabletIds[0], std::move(splitInfo)); +        return result;      }      std::vector<ui32> rowSharding; @@ -71,7 +116,7 @@ THashMap<ui64, TString> SplitData(const std::shared_ptr<arrow::RecordBatch>& bat      }      if (rowSharding.empty()) { -        return {}; +        return result;      }      std::vector<std::shared_ptr<arrow::RecordBatch>> sharded = NArrow::ShardingSplit(batch, rowSharding, numShards); @@ -80,16 +125,17 @@ THashMap<ui64, TString> SplitData(const std::shared_ptr<arrow::RecordBatch>& bat      THashMap<ui64, TString> out;      for (size_t i = 0; i < sharded.size(); ++i) {          if (sharded[i]) { -            out.emplace(tabletIds[i], NArrow::SerializeBatchNoCompression(sharded[i])); +            TShardInfo splitInfo(NArrow::SerializeBatchNoCompression(sharded[i]), sharded[i]->num_rows()); +            result.AddShardInfo(tabletIds[i], std::move(splitInfo));          }      } -    Y_VERIFY(!out.empty()); -    return out; +    Y_VERIFY(result.GetShardsInfo().size()); +    return result;  } -// Deserailizes arrow batch and splits it -THashMap<ui64, TString> SplitData(const TString& data, const NKikimrSchemeOp::TColumnTableDescription& description) { +// Deserialize arrow batch and splits it +TFullSplitData SplitData(const TString& data, const NKikimrSchemeOp::TColumnTableDescription& description) {      Y_VERIFY(description.HasSchema());      auto& olapSchema = description.GetSchema();      Y_VERIFY(olapSchema.GetEngine() == NKikimrSchemeOp::COLUMN_ENGINE_REPLACING_TIMESERIES); @@ -97,9 +143,8 @@ THashMap<ui64, TString> SplitData(const TString& data, const NKikimrSchemeOp::TC      std::shared_ptr<arrow::Schema> schema = ExtractArrowSchema(olapSchema);      std::shared_ptr<arrow::RecordBatch> batch = NArrow::DeserializeBatch(data, schema);      if (!batch || !batch->ValidateFull().ok()) { -        return {}; +        return TFullSplitData(0);      } -      return SplitData(batch, description);  } @@ -343,6 +388,7 @@ public:          , DedupId(dedupId)          , LongTxId(longTxId)          , LeaderPipeCache(MakePipePeNodeCacheID(false)) +        , ActorSpan(0, NWilson::TTraceId::NewTraceId(0, Max<ui32>()), "TLongTxWriteBase")      {          if (token) {              UserToken.emplace(token); @@ -360,6 +406,7 @@ protected:      }      void ProceedWithSchema(const NSchemeCache::TSchemeCacheNavigate& resp) { +        NWilson::TProfileSpan pSpan = ActorSpan.BuildChildrenSpan("ProceedWithSchema");          if (resp.ErrorCount > 0) {              // TODO: map to a correct error              return ReplyError(Ydb::StatusIds::SCHEME_ERROR, "There was an error during table query"); @@ -407,15 +454,23 @@ protected:              SendWriteRequest(shard, tableId, DedupId, GetSerializedData());          } else if (sharding.HasHashSharding()) { -            auto batches = HasDeserializedBatch() ? +            const TFullSplitData batches = HasDeserializedBatch() ?                  SplitData(GetDeserializedBatch(), description) :                  SplitData(GetSerializedData(), description); -            if (batches.empty()) { +            if (batches.GetShardsInfo().empty()) {                  return ReplyError(Ydb::StatusIds::SCHEME_ERROR, "Cannot deserialize or split input data");              } -            for (auto& [shard, batch] : batches) { -                SendWriteRequest(shard, tableId, DedupId, batch); +            ui32 sumBytes = 0; +            ui32 rowsCount = 0; +            for (auto& [shard, info] : batches.GetShardsInfo()) { +                sumBytes += info.GetData().size(); +                rowsCount += info.GetRowsCount(); +                SendWriteRequest(shard, tableId, DedupId, info.GetData());              } +            pSpan.Attribute("affected_shards_count", (long)batches.GetShardsInfo().size()); +            pSpan.Attribute("bytes", (long)sumBytes); +            pSpan.Attribute("rows", (long)rowsCount); +            pSpan.Attribute("shards_count", (long)batches.GetShardsCount());          } else {              return ReplyError(Ydb::StatusIds::SCHEME_ERROR, "Sharding method is not supported");          } @@ -469,6 +524,7 @@ private:      }      void Handle(TEvColumnShard::TEvWriteResult::TPtr& ev) { +        auto gProfile = ActorSpan.StartStackTimeGuard("WriteResult");          const auto* msg = ev->Get();          ui64 shardId = msg->Record.GetOrigin();          Y_VERIFY(WaitShards.count(shardId) || ShardsWrites.count(shardId)); @@ -491,6 +547,7 @@ private:      }      void Handle(TEvPipeCache::TEvDeliveryProblem::TPtr& ev) { +        NWilson::TProfileSpan pSpan(0, ActorSpan.GetTraceId(), "DeliveryProblem");          const auto* msg = ev->Get();          if (msg->NotDelivered) { @@ -519,6 +576,7 @@ private:      }      void Handle(TEvLongTxService::TEvAttachColumnShardWritesResult::TPtr& ev) { +        NWilson::TProfileSpan pSpan(0, ActorSpan.GetTraceId(), "AttachColumnShardWritesResult");          const auto* msg = ev->Get();          if (msg->Record.GetStatus() != Ydb::StatusIds::SUCCESS) { @@ -536,7 +594,7 @@ private:  private:      void SendToTablet(ui64 tabletId, THolder<IEventBase> event) {          this->Send(LeaderPipeCache, new TEvPipeCache::TEvForward(event.Release(), tabletId, true), -                IEventHandle::FlagTrackDelivery); +                IEventHandle::FlagTrackDelivery, 0, ActorSpan.GetTraceId());      }  protected: @@ -547,7 +605,6 @@ protected:      virtual std::shared_ptr<arrow::RecordBatch> GetDeserializedBatch() const {          return nullptr;      } -      virtual TString GetSerializedData() = 0;      virtual void RaiseIssue(const NYql::TIssue& issue) = 0;      virtual void ReplyError(Ydb::StatusIds::StatusCode status, const TString& message = TString()) = 0; @@ -563,6 +620,7 @@ private:      std::optional<NACLib::TUserToken> UserToken;      THashSet<ui64> WaitShards;      THashMap<ui64, ui64> ShardsWrites; +    NWilson::TProfileSpan ActorSpan;  };  | 
