diff options
author | gusev-p <gusev-p@yandex-team.ru> | 2022-02-10 16:47:20 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:47:20 +0300 |
commit | 47af3b5bf148ddab250833ec454d30d7c4930c31 (patch) | |
tree | 9814fbd1c3effac9b8377c5d604b367b14e2db55 | |
parent | 1715700d00b30399d3648be821fd585ae552365e (diff) | |
download | ydb-47af3b5bf148ddab250833ec454d30d7c4930c31.tar.gz |
Restoring authorship annotation for <gusev-p@yandex-team.ru>. Commit 2 of 2.
75 files changed, 2158 insertions, 2158 deletions
diff --git a/contrib/libs/grpc/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc b/contrib/libs/grpc/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc index f31457b452..43d638ab3f 100644 --- a/contrib/libs/grpc/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc +++ b/contrib/libs/grpc/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc @@ -491,7 +491,7 @@ static bool should_use_ares(const char* resolver_env) { static bool should_use_ares(const char* resolver_env) { // TODO(lidiz): Remove the "g_custom_iomgr_enabled" flag once c-ares support // custom IO managers (e.g. gevent). - return !g_custom_iomgr_enabled && resolver_env != nullptr && (gpr_stricmp(resolver_env, "ares") == 0); + return !g_custom_iomgr_enabled && resolver_env != nullptr && (gpr_stricmp(resolver_env, "ares") == 0); } #endif /* GRPC_UV */ diff --git a/contrib/libs/grpc/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc b/contrib/libs/grpc/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc index 269f60556a..a0e3566190 100644 --- a/contrib/libs/grpc/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc +++ b/contrib/libs/grpc/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc @@ -178,7 +178,7 @@ void NativeDnsResolver::OnResolvedLocked(grpc_error* error) { if (shutdown_) { if (addresses_ != nullptr) { grpc_resolved_addresses_destroy(addresses_); - } + } Unref(DEBUG_LOCATION, "dns-resolving"); GRPC_ERROR_UNREF(error); return; diff --git a/contrib/libs/grpc/src/core/lib/surface/init.cc b/contrib/libs/grpc/src/core/lib/surface/init.cc index 20fedce532..7b79ba426b 100644 --- a/contrib/libs/grpc/src/core/lib/surface/init.cc +++ b/contrib/libs/grpc/src/core/lib/surface/init.cc @@ -191,7 +191,7 @@ void grpc_shutdown_internal_locked(void) { grpc_core::channelz::ChannelzRegistry::Shutdown(); grpc_stats_shutdown(); } - grpc_core::Fork::GlobalShutdown(); + grpc_core::Fork::GlobalShutdown(); grpc_core::ExecCtx::GlobalShutdown(); grpc_core::ApplicationCallbackExecCtx::GlobalShutdown(); g_shutting_down = false; diff --git a/contrib/libs/grpc/src/cpp/server/server_builder.cc b/contrib/libs/grpc/src/cpp/server/server_builder.cc index 8b846ba178..0cc00b365f 100644 --- a/contrib/libs/grpc/src/cpp/server/server_builder.cc +++ b/contrib/libs/grpc/src/cpp/server/server_builder.cc @@ -397,7 +397,7 @@ std::unique_ptr<grpc::Server> ServerBuilder::BuildAndStart() { for (auto& port : ports_) { int r = server->AddListeningPort(port.addr, port.creds.get()); if (!r) { - server->Shutdown(); + server->Shutdown(); return nullptr; } if (port.selected_port != nullptr) { diff --git a/contrib/libs/libiconv/dynamic/libiconv.exports b/contrib/libs/libiconv/dynamic/libiconv.exports index f09d45208f..c38a20d918 100644 --- a/contrib/libs/libiconv/dynamic/libiconv.exports +++ b/contrib/libs/libiconv/dynamic/libiconv.exports @@ -1,4 +1,4 @@ C libiconv C libiconv_open C libiconv_close -C libiconvctl
\ No newline at end of file +C libiconvctl
\ No newline at end of file diff --git a/contrib/libs/tcmalloc/tcmalloc/static_vars.h b/contrib/libs/tcmalloc/tcmalloc/static_vars.h index 537d83120e..be68edc189 100644 --- a/contrib/libs/tcmalloc/tcmalloc/static_vars.h +++ b/contrib/libs/tcmalloc/tcmalloc/static_vars.h @@ -125,7 +125,7 @@ class Static { return cpu_cache_active_; } static void ActivateCPUCache() { cpu_cache_active_ = true; } - static void DeactivateCPUCache() { cpu_cache_active_ = false; } + static void DeactivateCPUCache() { cpu_cache_active_ = false; } static bool ForkSupportEnabled() { return fork_support_enabled_; } static void EnableForkSupport() { fork_support_enabled_ = true; } diff --git a/contrib/libs/yaml-cpp/include/yaml-cpp/node/iterator.h b/contrib/libs/yaml-cpp/include/yaml-cpp/node/iterator.h index f4831a4335..6618169c53 100644 --- a/contrib/libs/yaml-cpp/include/yaml-cpp/node/iterator.h +++ b/contrib/libs/yaml-cpp/include/yaml-cpp/node/iterator.h @@ -17,21 +17,21 @@ namespace YAML { namespace detail { -struct node_pair: public std::pair<Node, Node> { - node_pair() = default; - node_pair(const Node& first, const Node& second) - : std::pair<Node, Node>(first, second) - { - } -}; - -struct iterator_value : public Node, node_pair { +struct node_pair: public std::pair<Node, Node> { + node_pair() = default; + node_pair(const Node& first, const Node& second) + : std::pair<Node, Node>(first, second) + { + } +}; + +struct iterator_value : public Node, node_pair { iterator_value() {} explicit iterator_value(const Node& rhs) : Node(rhs), - node_pair(Node(Node::ZombieNode), Node(Node::ZombieNode)) {} + node_pair(Node(Node::ZombieNode), Node(Node::ZombieNode)) {} explicit iterator_value(const Node& key, const Node& value) - : Node(Node::ZombieNode), node_pair(key, value) {} + : Node(Node::ZombieNode), node_pair(key, value) {} }; } } diff --git a/library/cpp/actors/core/log.cpp b/library/cpp/actors/core/log.cpp index 83ec0c5e43..5f63b5af58 100644 --- a/library/cpp/actors/core/log.cpp +++ b/library/cpp/actors/core/log.cpp @@ -715,26 +715,26 @@ namespace NActors { } }; - class TCompositeLogBackend: public TLogBackend { - public: - TCompositeLogBackend(TVector<TAutoPtr<TLogBackend>>&& underlyingBackends) - : UnderlyingBackends(std::move(underlyingBackends)) - { - } - - void WriteData(const TLogRecord& rec) override { - for (auto& b: UnderlyingBackends) { - b->WriteData(rec); - } - } - - void ReopenLog() override { - } - - private: - TVector<TAutoPtr<TLogBackend>> UnderlyingBackends; - }; - + class TCompositeLogBackend: public TLogBackend { + public: + TCompositeLogBackend(TVector<TAutoPtr<TLogBackend>>&& underlyingBackends) + : UnderlyingBackends(std::move(underlyingBackends)) + { + } + + void WriteData(const TLogRecord& rec) override { + for (auto& b: UnderlyingBackends) { + b->WriteData(rec); + } + } + + void ReopenLog() override { + } + + private: + TVector<TAutoPtr<TLogBackend>> UnderlyingBackends; + }; + TAutoPtr<TLogBackend> CreateStderrBackend() { return new TStderrBackend(); } @@ -747,7 +747,7 @@ namespace NActors { return new TNullLogBackend(); } - TAutoPtr<TLogBackend> CreateCompositeLogBackend(TVector<TAutoPtr<TLogBackend>>&& underlyingBackends) { - return new TCompositeLogBackend(std::move(underlyingBackends)); - } + TAutoPtr<TLogBackend> CreateCompositeLogBackend(TVector<TAutoPtr<TLogBackend>>&& underlyingBackends) { + return new TCompositeLogBackend(std::move(underlyingBackends)); + } } diff --git a/library/cpp/actors/core/log.h b/library/cpp/actors/core/log.h index 8499990e80..c11a7cf3c1 100644 --- a/library/cpp/actors/core/log.h +++ b/library/cpp/actors/core/log.h @@ -299,7 +299,7 @@ namespace NActors { TAutoPtr<TLogBackend> CreateStderrBackend(); TAutoPtr<TLogBackend> CreateFileBackend(const TString& fileName); TAutoPtr<TLogBackend> CreateNullBackend(); - TAutoPtr<TLogBackend> CreateCompositeLogBackend(TVector<TAutoPtr<TLogBackend>>&& underlyingBackends); + TAutoPtr<TLogBackend> CreateCompositeLogBackend(TVector<TAutoPtr<TLogBackend>>&& underlyingBackends); ///////////////////////////////////////////////////////////////////// // Logging adaptors for memory log and logging into filesystem diff --git a/library/cpp/histogram/adaptive/multi_histogram.h b/library/cpp/histogram/adaptive/multi_histogram.h index c9948f6733..41caac5ba6 100644 --- a/library/cpp/histogram/adaptive/multi_histogram.h +++ b/library/cpp/histogram/adaptive/multi_histogram.h @@ -64,7 +64,7 @@ namespace NKiwiAggr { TVector<ui64> GetIds() const { TVector<ui64> result(0); - for (THistogramsMap::const_iterator it = Histograms.begin(); it != Histograms.end(); ++it) { + for (THistogramsMap::const_iterator it = Histograms.begin(); it != Histograms.end(); ++it) { result.push_back(it->first); } return result; diff --git a/library/cpp/monlib/counters/counters.h b/library/cpp/monlib/counters/counters.h index 2180cf381d..038b55f0c8 100644 --- a/library/cpp/monlib/counters/counters.h +++ b/library/cpp/monlib/counters/counters.h @@ -112,10 +112,10 @@ namespace NMonitoring { return AtomicGet(Value) == 0; } - TAtomic& GetAtomic() { - return Value; - } - + TAtomic& GetAtomic() { + return Value; + } + private: TAtomic Value; bool Derivative; diff --git a/library/cpp/monlib/dynamic_counters/encode.cpp b/library/cpp/monlib/dynamic_counters/encode.cpp index f609c6160d..ffa48d276e 100644 --- a/library/cpp/monlib/dynamic_counters/encode.cpp +++ b/library/cpp/monlib/dynamic_counters/encode.cpp @@ -113,10 +113,10 @@ namespace NMonitoring { } } - THolder<ICountableConsumer> AsCountableConsumer(IMetricEncoderPtr encoder, TCountableBase::EVisibility visibility) { - return MakeHolder<TConsumer>(std::move(encoder), visibility); - } - + THolder<ICountableConsumer> AsCountableConsumer(IMetricEncoderPtr encoder, TCountableBase::EVisibility visibility) { + return MakeHolder<TConsumer>(std::move(encoder), visibility); + } + void ToJson(const TDynamicCounters& counters, IOutputStream* out) { TConsumer consumer{EncoderJson(out), TCountableBase::EVisibility::Public}; counters.Accept(TString{}, TString{}, consumer); diff --git a/library/cpp/monlib/dynamic_counters/encode.h b/library/cpp/monlib/dynamic_counters/encode.h index 1a3b253518..c79964d7cb 100644 --- a/library/cpp/monlib/dynamic_counters/encode.h +++ b/library/cpp/monlib/dynamic_counters/encode.h @@ -2,7 +2,7 @@ #include "counters.h" -#include <library/cpp/monlib/encode/encoder.h> +#include <library/cpp/monlib/encode/encoder.h> #include <library/cpp/monlib/encode/format.h> namespace NMonitoring { @@ -13,10 +13,10 @@ namespace NMonitoring { TCountableBase::EVisibility visibility = TCountableBase::EVisibility::Public ); - THolder<ICountableConsumer> AsCountableConsumer( - NMonitoring::IMetricEncoderPtr encoder, - TCountableBase::EVisibility visibility = TCountableBase::EVisibility::Public); - + THolder<ICountableConsumer> AsCountableConsumer( + NMonitoring::IMetricEncoderPtr encoder, + TCountableBase::EVisibility visibility = TCountableBase::EVisibility::Public); + void ToJson(const TDynamicCounters& counters, IOutputStream* out); TString ToJson(const TDynamicCounters& counters); diff --git a/library/cpp/monlib/encode/buffered/buffered_encoder_base.cpp b/library/cpp/monlib/encode/buffered/buffered_encoder_base.cpp index 4928fdf70f..87c832d642 100644 --- a/library/cpp/monlib/encode/buffered/buffered_encoder_base.cpp +++ b/library/cpp/monlib/encode/buffered/buffered_encoder_base.cpp @@ -1,8 +1,8 @@ #include "buffered_encoder_base.h" -#include <util/string/join.h> -#include <util/string/builder.h> - +#include <util/string/join.h> +#include <util/string/builder.h> + namespace NMonitoring { void TBufferedEncoderBase::OnStreamBegin() { @@ -42,8 +42,8 @@ void TBufferedEncoderBase::OnMetricEnd() { Y_ENSURE(existing.GetValueType() == metric.TimeSeries.GetValueType(), "Time series point type mismatch: expected " << existing.GetValueType() - << " but found " << metric.TimeSeries.GetValueType() - << ", labels '" << FormatLabels(metric.Labels) << "'"); + << " but found " << metric.TimeSeries.GetValueType() + << ", labels '" << FormatLabels(metric.Labels) << "'"); existing.CopyFrom(metric.TimeSeries); Metrics_.pop_back(); @@ -144,27 +144,27 @@ void TBufferedEncoderBase::OnLogHistogram(TInstant time, TLogHistogramSnapshotPt metric.TimeSeries.Add(time, s.Get()); } -TString TBufferedEncoderBase::FormatLabels(const TPooledLabels& labels) const { - auto formattedLabels = TVector<TString>(Reserve(labels.size() + CommonLabels_.size())); - auto addLabel = [&](const TPooledLabel& l) { - auto formattedLabel = TStringBuilder() << LabelNamesPool_.Get(l.Key) << '=' << LabelValuesPool_.Get(l.Value); - formattedLabels.push_back(std::move(formattedLabel)); - }; - - for (const auto& l: labels) { - addLabel(l); - } - for (const auto& l: CommonLabels_) { - const auto it = FindIf(labels, [&](const TPooledLabel& label) { - return label.Key == l.Key; - }); - if (it == labels.end()) { - addLabel(l); - } - } - Sort(formattedLabels); - - return TStringBuilder() << "{" << JoinSeq(", ", formattedLabels) << "}"; -} - +TString TBufferedEncoderBase::FormatLabels(const TPooledLabels& labels) const { + auto formattedLabels = TVector<TString>(Reserve(labels.size() + CommonLabels_.size())); + auto addLabel = [&](const TPooledLabel& l) { + auto formattedLabel = TStringBuilder() << LabelNamesPool_.Get(l.Key) << '=' << LabelValuesPool_.Get(l.Value); + formattedLabels.push_back(std::move(formattedLabel)); + }; + + for (const auto& l: labels) { + addLabel(l); + } + for (const auto& l: CommonLabels_) { + const auto it = FindIf(labels, [&](const TPooledLabel& label) { + return label.Key == l.Key; + }); + if (it == labels.end()) { + addLabel(l); + } + } + Sort(formattedLabels); + + return TStringBuilder() << "{" << JoinSeq(", ", formattedLabels) << "}"; +} + } // namespace NMonitoring diff --git a/library/cpp/monlib/encode/buffered/buffered_encoder_base.h b/library/cpp/monlib/encode/buffered/buffered_encoder_base.h index 12f838d905..fe3714e58f 100644 --- a/library/cpp/monlib/encode/buffered/buffered_encoder_base.h +++ b/library/cpp/monlib/encode/buffered/buffered_encoder_base.h @@ -82,10 +82,10 @@ protected: TMetricTimeSeries TimeSeries; }; -protected: - TString FormatLabels(const TPooledLabels& labels) const; - -protected: +protected: + TString FormatLabels(const TPooledLabels& labels) const; + +protected: TEncoderState State_; TStringPoolBuilder LabelNamesPool_; diff --git a/library/cpp/monlib/encode/buffered/string_pool.h b/library/cpp/monlib/encode/buffered/string_pool.h index f07d050729..00e5644608 100644 --- a/library/cpp/monlib/encode/buffered/string_pool.h +++ b/library/cpp/monlib/encode/buffered/string_pool.h @@ -37,10 +37,10 @@ namespace NMonitoring { return StrVector_.at(index).first; } - TStringBuf Get(const TValue* value) const { - return StrVector_.at(value->Index).first; - } - + TStringBuf Get(const TValue* value) const { + return StrVector_.at(value->Index).first; + } + template <typename TConsumer> void ForEach(TConsumer&& c) { Y_ENSURE(IsBuilt_, "Pool must be sorted first"); diff --git a/library/cpp/monlib/encode/json/json.h b/library/cpp/monlib/encode/json/json.h index f6968c0016..21530f20c3 100644 --- a/library/cpp/monlib/encode/json/json.h +++ b/library/cpp/monlib/encode/json/json.h @@ -16,14 +16,14 @@ namespace NMonitoring { /// Buffered encoder will merge series with same labels into one. IMetricEncoderPtr BufferedEncoderJson(IOutputStream* out, int indentation = 0); - IMetricEncoderPtr EncoderCloudJson(IOutputStream* out, - int indentation = 0, - TStringBuf metricNameLabel = "name"); - - IMetricEncoderPtr BufferedEncoderCloudJson(IOutputStream* out, - int indentation = 0, - TStringBuf metricNameLabel = "name"); - + IMetricEncoderPtr EncoderCloudJson(IOutputStream* out, + int indentation = 0, + TStringBuf metricNameLabel = "name"); + + IMetricEncoderPtr BufferedEncoderCloudJson(IOutputStream* out, + int indentation = 0, + TStringBuf metricNameLabel = "name"); + void DecodeJson(TStringBuf data, IMetricConsumer* c, TStringBuf metricNameLabel = "name"); } diff --git a/library/cpp/monlib/encode/json/json_decoder.cpp b/library/cpp/monlib/encode/json/json_decoder.cpp index 8420b93aab..d44ff5fd28 100644 --- a/library/cpp/monlib/encode/json/json_decoder.cpp +++ b/library/cpp/monlib/encode/json/json_decoder.cpp @@ -52,7 +52,7 @@ public: Bounds_.clear(); Values_.clear(); - InfPresented_ = false; + InfPresented_ = false; return snapshot; } diff --git a/library/cpp/monlib/encode/json/json_decoder_ut.cpp b/library/cpp/monlib/encode/json/json_decoder_ut.cpp index 52c34e4f37..4464e1d26a 100644 --- a/library/cpp/monlib/encode/json/json_decoder_ut.cpp +++ b/library/cpp/monlib/encode/json/json_decoder_ut.cpp @@ -120,60 +120,60 @@ Y_UNIT_TEST_SUITE(TJsonDecoderTest) { ValidateCommonParts(std::move(commonParts), true, true); ValidateMetrics(collector.Metrics); } - - Y_UNIT_TEST(CanParseHistogramsWithInf) { - const char* metricsData = R"({ -"metrics": - [ - { - "hist": { - "bounds": [ - 10 - ], - "buckets": [ - 11 - ], - "inf": 12 - }, - "name":"s1", - "type": "HIST_RATE" - }, - { - "hist": { - "bounds": [ - 20 - ], - "buckets": [ - 21 - ] - }, - "name":"s2", - "type":"HIST_RATE" - } - ] -})"; - TCollectingConsumer consumer(false); - DecodeJson(metricsData, &consumer); - - UNIT_ASSERT_VALUES_EQUAL(consumer.Metrics.size(), 2); - { - const auto& m = consumer.Metrics[0]; - UNIT_ASSERT_VALUES_EQUAL(m.Kind, EMetricType::HIST_RATE); - UNIT_ASSERT_VALUES_EQUAL(m.Values->Size(), 1); - const auto* histogram = (*m.Values)[0].GetValue().AsHistogram(); - UNIT_ASSERT_VALUES_EQUAL(histogram->Count(), 2); - UNIT_ASSERT_VALUES_EQUAL(histogram->UpperBound(1), Max<TBucketBound>()); - UNIT_ASSERT_VALUES_EQUAL(histogram->Value(0), 11); - UNIT_ASSERT_VALUES_EQUAL(histogram->Value(1), 12); - } - { - const auto& m = consumer.Metrics[1]; - UNIT_ASSERT_VALUES_EQUAL(m.Kind, EMetricType::HIST_RATE); - UNIT_ASSERT_VALUES_EQUAL(m.Values->Size(), 1); - const auto* histogram = (*m.Values)[0].GetValue().AsHistogram(); - UNIT_ASSERT_VALUES_EQUAL(histogram->Count(), 1); - UNIT_ASSERT_VALUES_EQUAL(histogram->UpperBound(0), 20); - UNIT_ASSERT_VALUES_EQUAL(histogram->Value(0), 21); - } - } + + Y_UNIT_TEST(CanParseHistogramsWithInf) { + const char* metricsData = R"({ +"metrics": + [ + { + "hist": { + "bounds": [ + 10 + ], + "buckets": [ + 11 + ], + "inf": 12 + }, + "name":"s1", + "type": "HIST_RATE" + }, + { + "hist": { + "bounds": [ + 20 + ], + "buckets": [ + 21 + ] + }, + "name":"s2", + "type":"HIST_RATE" + } + ] +})"; + TCollectingConsumer consumer(false); + DecodeJson(metricsData, &consumer); + + UNIT_ASSERT_VALUES_EQUAL(consumer.Metrics.size(), 2); + { + const auto& m = consumer.Metrics[0]; + UNIT_ASSERT_VALUES_EQUAL(m.Kind, EMetricType::HIST_RATE); + UNIT_ASSERT_VALUES_EQUAL(m.Values->Size(), 1); + const auto* histogram = (*m.Values)[0].GetValue().AsHistogram(); + UNIT_ASSERT_VALUES_EQUAL(histogram->Count(), 2); + UNIT_ASSERT_VALUES_EQUAL(histogram->UpperBound(1), Max<TBucketBound>()); + UNIT_ASSERT_VALUES_EQUAL(histogram->Value(0), 11); + UNIT_ASSERT_VALUES_EQUAL(histogram->Value(1), 12); + } + { + const auto& m = consumer.Metrics[1]; + UNIT_ASSERT_VALUES_EQUAL(m.Kind, EMetricType::HIST_RATE); + UNIT_ASSERT_VALUES_EQUAL(m.Values->Size(), 1); + const auto* histogram = (*m.Values)[0].GetValue().AsHistogram(); + UNIT_ASSERT_VALUES_EQUAL(histogram->Count(), 1); + UNIT_ASSERT_VALUES_EQUAL(histogram->UpperBound(0), 20); + UNIT_ASSERT_VALUES_EQUAL(histogram->Value(0), 21); + } + } } diff --git a/library/cpp/monlib/encode/json/json_encoder.cpp b/library/cpp/monlib/encode/json/json_encoder.cpp index fb14ffdf07..20d2bb6283 100644 --- a/library/cpp/monlib/encode/json/json_encoder.cpp +++ b/library/cpp/monlib/encode/json/json_encoder.cpp @@ -14,21 +14,21 @@ namespace NMonitoring { namespace { - enum class EJsonStyle { - Solomon, - Cloud - }; - + enum class EJsonStyle { + Solomon, + Cloud + }; + /////////////////////////////////////////////////////////////////////// // TJsonWriter /////////////////////////////////////////////////////////////////////// class TJsonWriter { public: - TJsonWriter(IOutputStream* out, int indentation, EJsonStyle style, TStringBuf metricNameLabel) + TJsonWriter(IOutputStream* out, int indentation, EJsonStyle style, TStringBuf metricNameLabel) : Buf_(NJsonWriter::HEM_UNSAFE, out) - , Style_(style) - , MetricNameLabel_(metricNameLabel) - , CurrentMetricName_() + , Style_(style) + , MetricNameLabel_(metricNameLabel) + , CurrentMetricName_() { Buf_.SetIndentSpaces(indentation); Buf_.SetWriteNanAsString(); @@ -37,11 +37,11 @@ namespace NMonitoring { void WriteTime(TInstant time) { if (time != TInstant::Zero()) { Buf_.WriteKey(TStringBuf("ts")); - if (Style_ == EJsonStyle::Solomon) { - Buf_.WriteULongLong(time.Seconds()); - } else { - Buf_.WriteString(time.ToString()); - } + if (Style_ == EJsonStyle::Solomon) { + Buf_.WriteULongLong(time.Seconds()); + } else { + Buf_.WriteString(time.ToString()); + } } } @@ -61,8 +61,8 @@ namespace NMonitoring { } void WriteValue(IHistogramSnapshot* s) { - Y_ENSURE(Style_ == EJsonStyle::Solomon); - + Y_ENSURE(Style_ == EJsonStyle::Solomon); + Buf_.WriteKey(TStringBuf("hist")); Buf_.BeginObject(); if (ui32 count = s->Count()) { @@ -94,8 +94,8 @@ namespace NMonitoring { } void WriteValue(ISummaryDoubleSnapshot* s) { - Y_ENSURE(Style_ == EJsonStyle::Solomon); - + Y_ENSURE(Style_ == EJsonStyle::Solomon); + Buf_.WriteKey(TStringBuf("summary")); Buf_.BeginObject(); @@ -118,8 +118,8 @@ namespace NMonitoring { } void WriteValue(TLogHistogramSnapshot* s) { - Y_ENSURE(Style_ == EJsonStyle::Solomon); - + Y_ENSURE(Style_ == EJsonStyle::Solomon); + Buf_.WriteKey(TStringBuf("log_hist")); Buf_.BeginObject(); @@ -169,64 +169,64 @@ namespace NMonitoring { break; case EMetricValueType::UNKNOWN: - ythrow yexception() << "unknown metric value type"; + ythrow yexception() << "unknown metric value type"; } } void WriteLabel(TStringBuf name, TStringBuf value) { Y_ENSURE(IsUtf(name), "label name is not valid UTF-8 string"); Y_ENSURE(IsUtf(value), "label value is not valid UTF-8 string"); - if (Style_ == EJsonStyle::Cloud && name == MetricNameLabel_) { - CurrentMetricName_ = value; - } else { - Buf_.WriteKey(name); - Buf_.WriteString(value); - } - } - - void WriteMetricType(EMetricType type) { - if (Style_ == EJsonStyle::Cloud) { - Buf_.WriteKey("type"); - Buf_.WriteString(MetricTypeToCloudStr(type)); - } else { - Buf_.WriteKey("kind"); - Buf_.WriteString(MetricTypeToStr(type)); - } - } - - void WriteName() { - if (Style_ != EJsonStyle::Cloud) { - return; - } - if (CurrentMetricName_.Empty()) { - ythrow yexception() << "label '" << MetricNameLabel_ << "' is not defined"; - } - Buf_.WriteKey("name"); - Buf_.WriteString(CurrentMetricName_); - CurrentMetricName_.clear(); - } - - private: - static TStringBuf MetricTypeToCloudStr(EMetricType type) { - switch (type) { - case EMetricType::GAUGE: - return TStringBuf("DGAUGE"); - case EMetricType::COUNTER: - return TStringBuf("COUNTER"); - case EMetricType::RATE: - return TStringBuf("RATE"); - case EMetricType::IGAUGE: - return TStringBuf("IGAUGE"); - default: - ythrow yexception() << "metric type '" << type << "' is not supported by cloud json format"; - } - } - + if (Style_ == EJsonStyle::Cloud && name == MetricNameLabel_) { + CurrentMetricName_ = value; + } else { + Buf_.WriteKey(name); + Buf_.WriteString(value); + } + } + + void WriteMetricType(EMetricType type) { + if (Style_ == EJsonStyle::Cloud) { + Buf_.WriteKey("type"); + Buf_.WriteString(MetricTypeToCloudStr(type)); + } else { + Buf_.WriteKey("kind"); + Buf_.WriteString(MetricTypeToStr(type)); + } + } + + void WriteName() { + if (Style_ != EJsonStyle::Cloud) { + return; + } + if (CurrentMetricName_.Empty()) { + ythrow yexception() << "label '" << MetricNameLabel_ << "' is not defined"; + } + Buf_.WriteKey("name"); + Buf_.WriteString(CurrentMetricName_); + CurrentMetricName_.clear(); + } + + private: + static TStringBuf MetricTypeToCloudStr(EMetricType type) { + switch (type) { + case EMetricType::GAUGE: + return TStringBuf("DGAUGE"); + case EMetricType::COUNTER: + return TStringBuf("COUNTER"); + case EMetricType::RATE: + return TStringBuf("RATE"); + case EMetricType::IGAUGE: + return TStringBuf("IGAUGE"); + default: + ythrow yexception() << "metric type '" << type << "' is not supported by cloud json format"; + } + } + protected: NJsonWriter::TBuf Buf_; - EJsonStyle Style_; - TString MetricNameLabel_; - TString CurrentMetricName_; + EJsonStyle Style_; + TString MetricNameLabel_; + TString CurrentMetricName_; }; /////////////////////////////////////////////////////////////////////// @@ -234,8 +234,8 @@ namespace NMonitoring { /////////////////////////////////////////////////////////////////////// class TEncoderJson final: public IMetricEncoder, public TJsonWriter { public: - TEncoderJson(IOutputStream* out, int indentation, EJsonStyle style, TStringBuf metricNameLabel) - : TJsonWriter{out, indentation, style, metricNameLabel} + TEncoderJson(IOutputStream* out, int indentation, EJsonStyle style, TStringBuf metricNameLabel) + : TJsonWriter{out, indentation, style, metricNameLabel} { } @@ -267,11 +267,11 @@ namespace NMonitoring { State_.Switch(TEncoderState::EState::ROOT, TEncoderState::EState::METRIC); if (Buf_.KeyExpected()) { // first metric, so open metrics array - Buf_.WriteKey(TStringBuf(Style_ == EJsonStyle::Solomon ? "sensors" : "metrics")); + Buf_.WriteKey(TStringBuf(Style_ == EJsonStyle::Solomon ? "sensors" : "metrics")); Buf_.BeginList(); } Buf_.BeginObject(); - WriteMetricType(type); + WriteMetricType(type); } void OnMetricEnd() override { @@ -300,7 +300,7 @@ namespace NMonitoring { } if (State_ == TEncoderState::EState::ROOT) { State_ = TEncoderState::EState::COMMON_LABELS; - Buf_.WriteKey(TStringBuf(Style_ == EJsonStyle::Solomon ? "commonLabels" : "labels")); + Buf_.WriteKey(TStringBuf(Style_ == EJsonStyle::Solomon ? "commonLabels" : "labels")); } else if (State_ == TEncoderState::EState::METRIC) { State_ = TEncoderState::EState::METRIC_LABELS; Buf_.WriteKey(TStringBuf("labels")); @@ -323,9 +323,9 @@ namespace NMonitoring { Y_ENSURE(!EmptyLabels_, "Labels cannot be empty"); Buf_.EndObject(); - if (State_ == TEncoderState::EState::METRIC) { - WriteName(); - } + if (State_ == TEncoderState::EState::METRIC) { + WriteName(); + } } void OnLabel(TStringBuf name, TStringBuf value) override { @@ -420,8 +420,8 @@ namespace NMonitoring { /////////////////////////////////////////////////////////////////////// class TBufferedJsonEncoder : public TBufferedEncoderBase, public TJsonWriter { public: - TBufferedJsonEncoder(IOutputStream* out, int indentation, EJsonStyle style, TStringBuf metricNameLabel) - : TJsonWriter{out, indentation, style, metricNameLabel} + TBufferedJsonEncoder(IOutputStream* out, int indentation, EJsonStyle style, TStringBuf metricNameLabel) + : TJsonWriter{out, indentation, style, metricNameLabel} { MetricsMergingMode_ = EMetricsMergingMode::MERGE_METRICS; } @@ -464,12 +464,12 @@ namespace NMonitoring { WriteTime(CommonTime_); if (CommonLabels_.size() > 0) { - Buf_.WriteKey(TStringBuf(Style_ == EJsonStyle::Solomon ? "commonLabels": "labels")); - WriteLabels(CommonLabels_, true); + Buf_.WriteKey(TStringBuf(Style_ == EJsonStyle::Solomon ? "commonLabels": "labels")); + WriteLabels(CommonLabels_, true); } if (Metrics_.size() > 0) { - Buf_.WriteKey(TStringBuf(Style_ == EJsonStyle::Solomon ? "sensors" : "metrics")); + Buf_.WriteKey(TStringBuf(Style_ == EJsonStyle::Solomon ? "sensors" : "metrics")); WriteMetrics(); } @@ -488,10 +488,10 @@ namespace NMonitoring { void WriteMetric(TMetric& metric) { Buf_.BeginObject(); - WriteMetricType(metric.MetricType); + WriteMetricType(metric.MetricType); Buf_.WriteKey(TStringBuf("labels")); - WriteLabels(metric.Labels, false); + WriteLabels(metric.Labels, false); metric.TimeSeries.SortByTs(); if (metric.TimeSeries.Size() == 1) { @@ -515,7 +515,7 @@ namespace NMonitoring { Buf_.EndObject(); } - void WriteLabels(const TPooledLabels& labels, bool isCommon) { + void WriteLabels(const TPooledLabels& labels, bool isCommon) { Buf_.BeginObject(); for (auto i = 0u; i < labels.size(); ++i) { @@ -526,10 +526,10 @@ namespace NMonitoring { } Buf_.EndObject(); - - if (!isCommon) { - WriteName(); - } + + if (!isCommon) { + WriteName(); + } } private: @@ -539,18 +539,18 @@ namespace NMonitoring { } IMetricEncoderPtr EncoderJson(IOutputStream* out, int indentation) { - return MakeHolder<TEncoderJson>(out, indentation, EJsonStyle::Solomon, ""); + return MakeHolder<TEncoderJson>(out, indentation, EJsonStyle::Solomon, ""); } IMetricEncoderPtr BufferedEncoderJson(IOutputStream* out, int indentation) { - return MakeHolder<TBufferedJsonEncoder>(out, indentation, EJsonStyle::Solomon, ""); + return MakeHolder<TBufferedJsonEncoder>(out, indentation, EJsonStyle::Solomon, ""); + } + + IMetricEncoderPtr EncoderCloudJson(IOutputStream* out, int indentation, TStringBuf metricNameLabel) { + return MakeHolder<TEncoderJson>(out, indentation, EJsonStyle::Cloud, metricNameLabel); + } + + IMetricEncoderPtr BufferedEncoderCloudJson(IOutputStream* out, int indentation, TStringBuf metricNameLabel) { + return MakeHolder<TBufferedJsonEncoder>(out, indentation, EJsonStyle::Cloud, metricNameLabel); } - - IMetricEncoderPtr EncoderCloudJson(IOutputStream* out, int indentation, TStringBuf metricNameLabel) { - return MakeHolder<TEncoderJson>(out, indentation, EJsonStyle::Cloud, metricNameLabel); - } - - IMetricEncoderPtr BufferedEncoderCloudJson(IOutputStream* out, int indentation, TStringBuf metricNameLabel) { - return MakeHolder<TBufferedJsonEncoder>(out, indentation, EJsonStyle::Cloud, metricNameLabel); - } } diff --git a/library/cpp/monlib/encode/json/json_ut.cpp b/library/cpp/monlib/encode/json/json_ut.cpp index 9bd38e5fc5..09e7909289 100644 --- a/library/cpp/monlib/encode/json/json_ut.cpp +++ b/library/cpp/monlib/encode/json/json_ut.cpp @@ -8,7 +8,7 @@ #include <library/cpp/testing/unittest/registar.h> #include <util/stream/str.h> -#include <util/string/builder.h> +#include <util/string/builder.h> #include <limits> @@ -136,171 +136,171 @@ Y_UNIT_TEST_SUITE(TJsonTest) { const TInstant now = TInstant::ParseIso8601Deprecated("2017-11-05T01:02:03Z"); Y_UNIT_TEST(Encode) { - auto check = [](bool cloud, bool buffered, TStringBuf expectedResourceKey) { - TString json; - TStringOutput out(json); - auto e = cloud - ? (buffered ? BufferedEncoderCloudJson(&out, 2, "metric") : EncoderCloudJson(&out, 2, "metric")) - : (buffered ? BufferedEncoderJson(&out, 2) : EncoderJson(&out, 2)); - e->OnStreamBegin(); - { // common time - e->OnCommonTime(TInstant::Seconds(1500000000)); + auto check = [](bool cloud, bool buffered, TStringBuf expectedResourceKey) { + TString json; + TStringOutput out(json); + auto e = cloud + ? (buffered ? BufferedEncoderCloudJson(&out, 2, "metric") : EncoderCloudJson(&out, 2, "metric")) + : (buffered ? BufferedEncoderJson(&out, 2) : EncoderJson(&out, 2)); + e->OnStreamBegin(); + { // common time + e->OnCommonTime(TInstant::Seconds(1500000000)); } - { // common labels + { // common labels e->OnLabelsBegin(); - e->OnLabel("project", "solomon"); + e->OnLabel("project", "solomon"); e->OnLabelsEnd(); } - { // metric #1 - e->OnMetricBegin(EMetricType::COUNTER); - { - e->OnLabelsBegin(); - e->OnLabel("metric", "single"); - e->OnLabel("labels", "l1"); - e->OnLabelsEnd(); - } - e->OnUint64(now, 17); - e->OnMetricEnd(); + { // metric #1 + e->OnMetricBegin(EMetricType::COUNTER); + { + e->OnLabelsBegin(); + e->OnLabel("metric", "single"); + e->OnLabel("labels", "l1"); + e->OnLabelsEnd(); + } + e->OnUint64(now, 17); + e->OnMetricEnd(); + } + { // metric #2 + e->OnMetricBegin(EMetricType::RATE); + { + e->OnLabelsBegin(); + e->OnLabel("metric", "single"); + e->OnLabel("labels", "l2"); + e->OnLabelsEnd(); + } + e->OnUint64(now, 17); + e->OnMetricEnd(); + } + { // metric #3 + e->OnMetricBegin(EMetricType::GAUGE); + e->OnDouble(now, 3.14); + { + e->OnLabelsBegin(); + e->OnLabel("metric", "single"); + e->OnLabel("labels", "l3"); + e->OnLabelsEnd(); + } + e->OnMetricEnd(); + } + { // metric #4 + e->OnMetricBegin(EMetricType::IGAUGE); + e->OnInt64(now, 42); + { + e->OnLabelsBegin(); + e->OnLabel("metric", "single_igauge"); + e->OnLabel("labels", "l4"); + e->OnLabelsEnd(); + } + e->OnMetricEnd(); } - { // metric #2 - e->OnMetricBegin(EMetricType::RATE); - { - e->OnLabelsBegin(); - e->OnLabel("metric", "single"); - e->OnLabel("labels", "l2"); - e->OnLabelsEnd(); - } - e->OnUint64(now, 17); - e->OnMetricEnd(); + { // metric #5 + e->OnMetricBegin(EMetricType::GAUGE); + { + e->OnLabelsBegin(); + e->OnLabel("metric", "multiple"); + e->OnLabel("labels", "l5"); + e->OnLabelsEnd(); + } + e->OnDouble(now, std::numeric_limits<double>::quiet_NaN()); + e->OnDouble(now + TDuration::Seconds(15), std::numeric_limits<double>::infinity()); + e->OnDouble(now + TDuration::Seconds(30), -std::numeric_limits<double>::infinity()); + e->OnMetricEnd(); } - { // metric #3 - e->OnMetricBegin(EMetricType::GAUGE); - e->OnDouble(now, 3.14); - { - e->OnLabelsBegin(); - e->OnLabel("metric", "single"); - e->OnLabel("labels", "l3"); - e->OnLabelsEnd(); - } - e->OnMetricEnd(); - } - { // metric #4 - e->OnMetricBegin(EMetricType::IGAUGE); - e->OnInt64(now, 42); - { - e->OnLabelsBegin(); - e->OnLabel("metric", "single_igauge"); - e->OnLabel("labels", "l4"); - e->OnLabelsEnd(); - } - e->OnMetricEnd(); - } - { // metric #5 - e->OnMetricBegin(EMetricType::GAUGE); - { - e->OnLabelsBegin(); - e->OnLabel("metric", "multiple"); - e->OnLabel("labels", "l5"); - e->OnLabelsEnd(); - } - e->OnDouble(now, std::numeric_limits<double>::quiet_NaN()); - e->OnDouble(now + TDuration::Seconds(15), std::numeric_limits<double>::infinity()); - e->OnDouble(now + TDuration::Seconds(30), -std::numeric_limits<double>::infinity()); - e->OnMetricEnd(); - } - - { // metric #6 - e->OnMetricBegin(EMetricType::COUNTER); - e->OnUint64(now, 1337); - e->OnUint64(now + TDuration::Seconds(15), 1338); - { - e->OnLabelsBegin(); - e->OnLabel("metric", "multiple"); - e->OnLabel("labels", "l6"); - e->OnLabelsEnd(); - } - e->OnMetricEnd(); - } - e->OnStreamEnd(); - e->Close(); - json += "\n"; - + + { // metric #6 + e->OnMetricBegin(EMetricType::COUNTER); + e->OnUint64(now, 1337); + e->OnUint64(now + TDuration::Seconds(15), 1338); + { + e->OnLabelsBegin(); + e->OnLabel("metric", "multiple"); + e->OnLabel("labels", "l6"); + e->OnLabelsEnd(); + } + e->OnMetricEnd(); + } + e->OnStreamEnd(); + e->Close(); + json += "\n"; + auto parseJson = [] (auto buf) { NJson::TJsonValue value; NJson::ReadJsonTree(buf, &value, true); return value; }; - const auto expectedJson = NResource::Find(expectedResourceKey); + const auto expectedJson = NResource::Find(expectedResourceKey); UNIT_ASSERT_EQUAL(parseJson(json), parseJson(expectedJson)); - }; - - check(false, false, "/expected.json"); - check(false, true, "/expected_buffered.json"); - check(true, false, "/expected_cloud.json"); - check(true, true, "/expected_cloud_buffered.json"); - } - - TLogHistogramSnapshotPtr TestLogHistogram(ui32 v = 1) { - TVector<double> buckets{0.5 * v, 0.25 * v, 0.25 * v, 0.5 * v}; - return MakeIntrusive<TLogHistogramSnapshot>(1.5, 1u, 0, std::move(buckets)); - } - - Y_UNIT_TEST(HistogramAndSummaryMetricTypesAreNotSupportedByCloudJson) { - const TInstant now = TInstant::ParseIso8601Deprecated("2017-11-05T01:02:03Z"); - - auto emit = [&](IMetricEncoder* encoder, EMetricType metricType) { - encoder->OnStreamBegin(); + }; + + check(false, false, "/expected.json"); + check(false, true, "/expected_buffered.json"); + check(true, false, "/expected_cloud.json"); + check(true, true, "/expected_cloud_buffered.json"); + } + + TLogHistogramSnapshotPtr TestLogHistogram(ui32 v = 1) { + TVector<double> buckets{0.5 * v, 0.25 * v, 0.25 * v, 0.5 * v}; + return MakeIntrusive<TLogHistogramSnapshot>(1.5, 1u, 0, std::move(buckets)); + } + + Y_UNIT_TEST(HistogramAndSummaryMetricTypesAreNotSupportedByCloudJson) { + const TInstant now = TInstant::ParseIso8601Deprecated("2017-11-05T01:02:03Z"); + + auto emit = [&](IMetricEncoder* encoder, EMetricType metricType) { + encoder->OnStreamBegin(); { - encoder->OnMetricBegin(metricType); - { - encoder->OnLabelsBegin(); - encoder->OnLabel("name", "m"); - encoder->OnLabelsEnd(); - } - - switch (metricType) { - case EMetricType::HIST: { - auto histogram = ExponentialHistogram(6, 2); - encoder->OnHistogram(now, histogram->Snapshot()); - break; - } - case EMetricType::LOGHIST: { - auto histogram = TestLogHistogram(); - encoder->OnLogHistogram(now, histogram); - break; - } - case EMetricType::DSUMMARY: { - auto summary = MakeIntrusive<TSummaryDoubleSnapshot>(10., -0.5, 0.5, 0.3, 30u); - encoder->OnSummaryDouble(now, summary); - break; - } - default: - Y_FAIL("unexpected metric type [%s]", ToString(metricType).c_str()); - } - - encoder->OnMetricEnd(); + encoder->OnMetricBegin(metricType); + { + encoder->OnLabelsBegin(); + encoder->OnLabel("name", "m"); + encoder->OnLabelsEnd(); + } + + switch (metricType) { + case EMetricType::HIST: { + auto histogram = ExponentialHistogram(6, 2); + encoder->OnHistogram(now, histogram->Snapshot()); + break; + } + case EMetricType::LOGHIST: { + auto histogram = TestLogHistogram(); + encoder->OnLogHistogram(now, histogram); + break; + } + case EMetricType::DSUMMARY: { + auto summary = MakeIntrusive<TSummaryDoubleSnapshot>(10., -0.5, 0.5, 0.3, 30u); + encoder->OnSummaryDouble(now, summary); + break; + } + default: + Y_FAIL("unexpected metric type [%s]", ToString(metricType).c_str()); + } + + encoder->OnMetricEnd(); } - encoder->OnStreamEnd(); - encoder->Close(); - }; - - auto doTest = [&](bool buffered, EMetricType metricType) { - TString json; - TStringOutput out(json); - auto encoder = buffered ? BufferedEncoderCloudJson(&out, 2) : EncoderCloudJson(&out, 2); - const TString expectedMessage = TStringBuilder() - << "metric type '" << metricType << "' is not supported by cloud json format"; - UNIT_ASSERT_EXCEPTION_CONTAINS_C(emit(encoder.Get(), metricType), yexception, expectedMessage, - TString("buffered: ") + ToString(buffered)); - }; - - doTest(false, EMetricType::HIST); - doTest(false, EMetricType::LOGHIST); - doTest(false, EMetricType::DSUMMARY); - doTest(true, EMetricType::HIST); - doTest(true, EMetricType::LOGHIST); - doTest(true, EMetricType::DSUMMARY); + encoder->OnStreamEnd(); + encoder->Close(); + }; + + auto doTest = [&](bool buffered, EMetricType metricType) { + TString json; + TStringOutput out(json); + auto encoder = buffered ? BufferedEncoderCloudJson(&out, 2) : EncoderCloudJson(&out, 2); + const TString expectedMessage = TStringBuilder() + << "metric type '" << metricType << "' is not supported by cloud json format"; + UNIT_ASSERT_EXCEPTION_CONTAINS_C(emit(encoder.Get(), metricType), yexception, expectedMessage, + TString("buffered: ") + ToString(buffered)); + }; + + doTest(false, EMetricType::HIST); + doTest(false, EMetricType::LOGHIST); + doTest(false, EMetricType::DSUMMARY); + doTest(true, EMetricType::HIST); + doTest(true, EMetricType::LOGHIST); + doTest(true, EMetricType::DSUMMARY); } Y_UNIT_TEST(MetricsWithDifferentLabelOrderGetMerged) { @@ -369,13 +369,13 @@ Y_UNIT_TEST_SUITE(TJsonTest) { UNIT_ASSERT_VALUES_EQUAL(samples.CommonLabelsSize(), 1); AssertLabelEqual(samples.GetCommonLabels(0), "project", "solomon"); - UNIT_ASSERT_VALUES_EQUAL(samples.SamplesSize(), 6); + UNIT_ASSERT_VALUES_EQUAL(samples.SamplesSize(), 6); { const NProto::TMultiSample& s = samples.GetSamples(0); UNIT_ASSERT_EQUAL(s.GetMetricType(), NProto::COUNTER); UNIT_ASSERT_VALUES_EQUAL(s.LabelsSize(), 2); AssertLabelEqual(s.GetLabels(0), "metric", "single"); - AssertLabelEqual(s.GetLabels(1), "labels", "l1"); + AssertLabelEqual(s.GetLabels(1), "labels", "l1"); UNIT_ASSERT_VALUES_EQUAL(s.PointsSize(), 1); AssertPointEqual(s.GetPoints(0), now, ui64(17)); @@ -385,7 +385,7 @@ Y_UNIT_TEST_SUITE(TJsonTest) { UNIT_ASSERT_EQUAL(s.GetMetricType(), NProto::RATE); UNIT_ASSERT_VALUES_EQUAL(s.LabelsSize(), 2); AssertLabelEqual(s.GetLabels(0), "metric", "single"); - AssertLabelEqual(s.GetLabels(1), "labels", "l2"); + AssertLabelEqual(s.GetLabels(1), "labels", "l2"); UNIT_ASSERT_VALUES_EQUAL(s.PointsSize(), 1); AssertPointEqual(s.GetPoints(0), now, ui64(17)); @@ -395,27 +395,27 @@ Y_UNIT_TEST_SUITE(TJsonTest) { UNIT_ASSERT_EQUAL(s.GetMetricType(), NProto::GAUGE); UNIT_ASSERT_VALUES_EQUAL(s.LabelsSize(), 2); AssertLabelEqual(s.GetLabels(0), "metric", "single"); - AssertLabelEqual(s.GetLabels(1), "labels", "l3"); + AssertLabelEqual(s.GetLabels(1), "labels", "l3"); UNIT_ASSERT_VALUES_EQUAL(s.PointsSize(), 1); AssertPointEqual(s.GetPoints(0), now, 3.14); } { const NProto::TMultiSample& s = samples.GetSamples(3); - UNIT_ASSERT_EQUAL(s.GetMetricType(), NProto::IGAUGE); - UNIT_ASSERT_VALUES_EQUAL(s.LabelsSize(), 2); - AssertLabelEqual(s.GetLabels(0), "metric", "single_igauge"); - AssertLabelEqual(s.GetLabels(1), "labels", "l4"); - - UNIT_ASSERT_VALUES_EQUAL(s.PointsSize(), 1); - AssertPointEqual(s.GetPoints(0), now, i64(42)); - } - { - const NProto::TMultiSample& s = samples.GetSamples(4); + UNIT_ASSERT_EQUAL(s.GetMetricType(), NProto::IGAUGE); + UNIT_ASSERT_VALUES_EQUAL(s.LabelsSize(), 2); + AssertLabelEqual(s.GetLabels(0), "metric", "single_igauge"); + AssertLabelEqual(s.GetLabels(1), "labels", "l4"); + + UNIT_ASSERT_VALUES_EQUAL(s.PointsSize(), 1); + AssertPointEqual(s.GetPoints(0), now, i64(42)); + } + { + const NProto::TMultiSample& s = samples.GetSamples(4); UNIT_ASSERT_EQUAL(s.GetMetricType(), NProto::GAUGE); UNIT_ASSERT_VALUES_EQUAL(s.LabelsSize(), 2); AssertLabelEqual(s.GetLabels(0), "metric", "multiple"); - AssertLabelEqual(s.GetLabels(1), "labels", "l5"); + AssertLabelEqual(s.GetLabels(1), "labels", "l5"); UNIT_ASSERT_VALUES_EQUAL(s.PointsSize(), 3); AssertPointEqualNan(s.GetPoints(0), now); @@ -423,11 +423,11 @@ Y_UNIT_TEST_SUITE(TJsonTest) { AssertPointEqualInf(s.GetPoints(2), now + TDuration::Seconds(30), -11); } { - const NProto::TMultiSample& s = samples.GetSamples(5); + const NProto::TMultiSample& s = samples.GetSamples(5); UNIT_ASSERT_EQUAL(s.GetMetricType(), NProto::COUNTER); UNIT_ASSERT_VALUES_EQUAL(s.LabelsSize(), 2); AssertLabelEqual(s.GetLabels(0), "metric", "multiple"); - AssertLabelEqual(s.GetLabels(1), "labels", "l6"); + AssertLabelEqual(s.GetLabels(1), "labels", "l6"); UNIT_ASSERT_VALUES_EQUAL(s.PointsSize(), 2); AssertPointEqual(s.GetPoints(0), now, ui64(1337)); @@ -1158,61 +1158,61 @@ Y_UNIT_TEST_SUITE(TJsonTest) { UNIT_ASSERT_NO_DIFF(result2, NResource::Find("/int_gauge.json")); } - Y_UNIT_TEST(InconsistentMetricTypes) { - auto emitMetrics = [](IMetricEncoder& encoder, const TString& expectedError) { - encoder.OnMetricBegin(EMetricType::GAUGE); - { - encoder.OnLabelsBegin(); - encoder.OnLabel("name", "m"); - encoder.OnLabel("l1", "v1"); - encoder.OnLabel("l2", "v2"); - encoder.OnLabelsEnd(); - } - encoder.OnDouble(now, 1.0); - encoder.OnMetricEnd(); - - encoder.OnMetricBegin(EMetricType::COUNTER); - { - encoder.OnLabelsBegin(); - encoder.OnLabel("name", "m"); - encoder.OnLabel("l1", "v1"); - encoder.OnLabel("l2", "v2"); - encoder.OnLabelsEnd(); - } - encoder.OnUint64(now, 1); - - UNIT_ASSERT_EXCEPTION_CONTAINS(encoder.OnMetricEnd(), - yexception, - expectedError); - }; - - { - TStringStream out; - auto encoder = BufferedEncoderJson(&out); - - encoder->OnStreamBegin(); - encoder->OnLabelsBegin(); - encoder->OnLabel("c", "cv"); - encoder->OnLabelsEnd(); - emitMetrics(*encoder, - "Time series point type mismatch: expected DOUBLE but found UINT64, " - "labels '{c=cv, l1=v1, l2=v2, name=m}'"); - } - - { - TStringStream out; - auto encoder = BufferedEncoderJson(&out); - - encoder->OnStreamBegin(); - encoder->OnLabelsBegin(); - encoder->OnLabel("l1", "v100"); - encoder->OnLabelsEnd(); - emitMetrics(*encoder, - "Time series point type mismatch: expected DOUBLE but found UINT64, " - "labels '{l1=v1, l2=v2, name=m}'"); - } - } - + Y_UNIT_TEST(InconsistentMetricTypes) { + auto emitMetrics = [](IMetricEncoder& encoder, const TString& expectedError) { + encoder.OnMetricBegin(EMetricType::GAUGE); + { + encoder.OnLabelsBegin(); + encoder.OnLabel("name", "m"); + encoder.OnLabel("l1", "v1"); + encoder.OnLabel("l2", "v2"); + encoder.OnLabelsEnd(); + } + encoder.OnDouble(now, 1.0); + encoder.OnMetricEnd(); + + encoder.OnMetricBegin(EMetricType::COUNTER); + { + encoder.OnLabelsBegin(); + encoder.OnLabel("name", "m"); + encoder.OnLabel("l1", "v1"); + encoder.OnLabel("l2", "v2"); + encoder.OnLabelsEnd(); + } + encoder.OnUint64(now, 1); + + UNIT_ASSERT_EXCEPTION_CONTAINS(encoder.OnMetricEnd(), + yexception, + expectedError); + }; + + { + TStringStream out; + auto encoder = BufferedEncoderJson(&out); + + encoder->OnStreamBegin(); + encoder->OnLabelsBegin(); + encoder->OnLabel("c", "cv"); + encoder->OnLabelsEnd(); + emitMetrics(*encoder, + "Time series point type mismatch: expected DOUBLE but found UINT64, " + "labels '{c=cv, l1=v1, l2=v2, name=m}'"); + } + + { + TStringStream out; + auto encoder = BufferedEncoderJson(&out); + + encoder->OnStreamBegin(); + encoder->OnLabelsBegin(); + encoder->OnLabel("l1", "v100"); + encoder->OnLabelsEnd(); + emitMetrics(*encoder, + "Time series point type mismatch: expected DOUBLE but found UINT64, " + "labels '{l1=v1, l2=v2, name=m}'"); + } + } + Y_UNIT_TEST(IntGaugeDecode) { NProto::TMultiSamplesList samples; { diff --git a/library/cpp/monlib/encode/json/ut/expected.json b/library/cpp/monlib/encode/json/ut/expected.json index 30c405b0bd..ead853455b 100644 --- a/library/cpp/monlib/encode/json/ut/expected.json +++ b/library/cpp/monlib/encode/json/ut/expected.json @@ -11,7 +11,7 @@ "labels": { "metric":"single", - "labels":"l1" + "labels":"l1" }, "ts":1509843723, "value":17 @@ -21,7 +21,7 @@ "labels": { "metric":"single", - "labels":"l2" + "labels":"l2" }, "ts":1509843723, "value":17 @@ -31,27 +31,27 @@ "labels": { "metric":"single", - "labels":"l3" + "labels":"l3" }, "ts":1509843723, "value":3.14 }, { - "kind":"IGAUGE", - "labels": - { - "metric":"single_igauge", - "labels":"l4" - }, - "ts":1509843723, - "value":42 - }, - { + "kind":"IGAUGE", + "labels": + { + "metric":"single_igauge", + "labels":"l4" + }, + "ts":1509843723, + "value":42 + }, + { "kind":"GAUGE", "labels": { "metric":"multiple", - "labels":"l5" + "labels":"l5" }, "timeseries": [ @@ -85,7 +85,7 @@ "labels": { "metric":"multiple", - "labels":"l6" + "labels":"l6" } } ] diff --git a/library/cpp/monlib/encode/json/ut/expected_buffered.json b/library/cpp/monlib/encode/json/ut/expected_buffered.json index 0a33842437..9a6a1d6201 100644 --- a/library/cpp/monlib/encode/json/ut/expected_buffered.json +++ b/library/cpp/monlib/encode/json/ut/expected_buffered.json @@ -37,16 +37,16 @@ "value":3.14 }, { - "kind":"IGAUGE", - "labels": - { + "kind":"IGAUGE", + "labels": + { "labels":"l4", "metric":"single_igauge" - }, - "ts":1509843723, - "value":42 - }, - { + }, + "ts":1509843723, + "value":42 + }, + { "kind":"GAUGE", "labels": { @@ -71,11 +71,11 @@ }, { "kind":"COUNTER", - "labels": - { + "labels": + { "labels":"l6", "metric":"multiple" - }, + }, "timeseries": [ { @@ -86,7 +86,7 @@ "ts":1509843738, "value":1338 } - ] + ] } ] } diff --git a/library/cpp/monlib/encode/json/ut/expected_cloud.json b/library/cpp/monlib/encode/json/ut/expected_cloud.json index 78e90ec3be..6184811579 100644 --- a/library/cpp/monlib/encode/json/ut/expected_cloud.json +++ b/library/cpp/monlib/encode/json/ut/expected_cloud.json @@ -1,92 +1,92 @@ { - "ts":"2017-07-14T02:40:00.000000Z", - "labels": + "ts":"2017-07-14T02:40:00.000000Z", + "labels": { "project":"solomon" }, - "metrics": + "metrics": [ { - "type":"COUNTER", + "type":"COUNTER", "labels": { - "labels":"l1" + "labels":"l1" }, - "name":"single", - "ts":"2017-11-05T01:02:03.000000Z", + "name":"single", + "ts":"2017-11-05T01:02:03.000000Z", "value":17 }, { - "type":"RATE", + "type":"RATE", "labels": { - "labels":"l2" + "labels":"l2" }, - "name":"single", - "ts":"2017-11-05T01:02:03.000000Z", + "name":"single", + "ts":"2017-11-05T01:02:03.000000Z", "value":17 }, { - "type":"DGAUGE", + "type":"DGAUGE", "labels": { - "labels":"l3" + "labels":"l3" }, - "name":"single", - "ts":"2017-11-05T01:02:03.000000Z", + "name":"single", + "ts":"2017-11-05T01:02:03.000000Z", "value":3.14 }, { - "type":"IGAUGE", + "type":"IGAUGE", "labels": { - "labels":"l4" + "labels":"l4" }, - "name":"single_igauge", - "ts":"2017-11-05T01:02:03.000000Z", - "value":42 - }, - { - "type":"DGAUGE", - "labels": - { - "labels":"l5" - }, - "name":"multiple", + "name":"single_igauge", + "ts":"2017-11-05T01:02:03.000000Z", + "value":42 + }, + { + "type":"DGAUGE", + "labels": + { + "labels":"l5" + }, + "name":"multiple", "timeseries": [ { - "ts":"2017-11-05T01:02:03.000000Z", + "ts":"2017-11-05T01:02:03.000000Z", "value":"nan" }, { - "ts":"2017-11-05T01:02:18.000000Z", + "ts":"2017-11-05T01:02:18.000000Z", "value":"inf" }, { - "ts":"2017-11-05T01:02:33.000000Z", + "ts":"2017-11-05T01:02:33.000000Z", "value":"-inf" } ] }, { - "type":"COUNTER", + "type":"COUNTER", "timeseries": [ { - "ts":"2017-11-05T01:02:03.000000Z", + "ts":"2017-11-05T01:02:03.000000Z", "value":1337 }, { - "ts":"2017-11-05T01:02:18.000000Z", + "ts":"2017-11-05T01:02:18.000000Z", "value":1338 } ], "labels": { - "labels":"l6" - }, - "name":"multiple" + "labels":"l6" + }, + "name":"multiple" } ] } diff --git a/library/cpp/monlib/encode/json/ut/expected_cloud_buffered.json b/library/cpp/monlib/encode/json/ut/expected_cloud_buffered.json index 5b7aeb836e..be237d522b 100644 --- a/library/cpp/monlib/encode/json/ut/expected_cloud_buffered.json +++ b/library/cpp/monlib/encode/json/ut/expected_cloud_buffered.json @@ -1,92 +1,92 @@ -{ - "ts":"2017-07-14T02:40:00.000000Z", - "labels": - { - "project":"solomon" - }, - "metrics": - [ - { - "type":"COUNTER", - "labels": - { - "labels":"l1" - }, - "name":"single", - "ts":"2017-11-05T01:02:03.000000Z", - "value":17 - }, - { - "type":"RATE", - "labels": - { - "labels":"l2" - }, - "name":"single", - "ts":"2017-11-05T01:02:03.000000Z", - "value":17 - }, - { - "type":"DGAUGE", - "labels": - { - "labels":"l3" - }, - "name":"single", - "ts":"2017-11-05T01:02:03.000000Z", - "value":3.14 - }, - { - "type":"IGAUGE", - "labels": - { - "labels":"l4" - }, - "name":"single_igauge", - "ts":"2017-11-05T01:02:03.000000Z", - "value":42 - }, - { - "type":"DGAUGE", - "labels": - { - "labels":"l5" - }, - "name":"multiple", - "timeseries": - [ - { - "ts":"2017-11-05T01:02:03.000000Z", - "value":"nan" - }, - { - "ts":"2017-11-05T01:02:18.000000Z", - "value":"inf" - }, - { - "ts":"2017-11-05T01:02:33.000000Z", - "value":"-inf" - } - ] - }, - { - "type":"COUNTER", - "labels": - { - "labels":"l6" - }, - "name":"multiple", - "timeseries": - [ - { - "ts":"2017-11-05T01:02:03.000000Z", - "value":1337 - }, - { - "ts":"2017-11-05T01:02:18.000000Z", - "value":1338 - } - ] - } - ] -} +{ + "ts":"2017-07-14T02:40:00.000000Z", + "labels": + { + "project":"solomon" + }, + "metrics": + [ + { + "type":"COUNTER", + "labels": + { + "labels":"l1" + }, + "name":"single", + "ts":"2017-11-05T01:02:03.000000Z", + "value":17 + }, + { + "type":"RATE", + "labels": + { + "labels":"l2" + }, + "name":"single", + "ts":"2017-11-05T01:02:03.000000Z", + "value":17 + }, + { + "type":"DGAUGE", + "labels": + { + "labels":"l3" + }, + "name":"single", + "ts":"2017-11-05T01:02:03.000000Z", + "value":3.14 + }, + { + "type":"IGAUGE", + "labels": + { + "labels":"l4" + }, + "name":"single_igauge", + "ts":"2017-11-05T01:02:03.000000Z", + "value":42 + }, + { + "type":"DGAUGE", + "labels": + { + "labels":"l5" + }, + "name":"multiple", + "timeseries": + [ + { + "ts":"2017-11-05T01:02:03.000000Z", + "value":"nan" + }, + { + "ts":"2017-11-05T01:02:18.000000Z", + "value":"inf" + }, + { + "ts":"2017-11-05T01:02:33.000000Z", + "value":"-inf" + } + ] + }, + { + "type":"COUNTER", + "labels": + { + "labels":"l6" + }, + "name":"multiple", + "timeseries": + [ + { + "ts":"2017-11-05T01:02:03.000000Z", + "value":1337 + }, + { + "ts":"2017-11-05T01:02:18.000000Z", + "value":1338 + } + ] + } + ] +} diff --git a/library/cpp/monlib/encode/json/ut/ya.make b/library/cpp/monlib/encode/json/ut/ya.make index 851443ae62..e50c4f4903 100644 --- a/library/cpp/monlib/encode/json/ut/ya.make +++ b/library/cpp/monlib/encode/json/ut/ya.make @@ -15,9 +15,9 @@ RESOURCE( buffered_ts_merge.json /buffered_ts_merge.json empty_series.json /empty_series.json expected.json /expected.json - expected_buffered.json /expected_buffered.json - expected_cloud.json /expected_cloud.json - expected_cloud_buffered.json /expected_cloud_buffered.json + expected_buffered.json /expected_buffered.json + expected_cloud.json /expected_cloud.json + expected_cloud_buffered.json /expected_cloud_buffered.json merged.json /merged.json histogram_timeseries.json /histogram_timeseries.json histogram_value.json /histogram_value.json diff --git a/library/cpp/monlib/encode/prometheus/prometheus.h b/library/cpp/monlib/encode/prometheus/prometheus.h index 21248f5fef..2e7fa31c28 100644 --- a/library/cpp/monlib/encode/prometheus/prometheus.h +++ b/library/cpp/monlib/encode/prometheus/prometheus.h @@ -11,8 +11,8 @@ namespace NMonitoring { class TPrometheusDecodeException: public yexception { }; - IMetricEncoderPtr EncoderPrometheus(IOutputStream* out, TStringBuf metricNameLabel = "sensor"); + IMetricEncoderPtr EncoderPrometheus(IOutputStream* out, TStringBuf metricNameLabel = "sensor"); - void DecodePrometheus(TStringBuf data, IMetricConsumer* c, TStringBuf metricNameLabel = "sensor"); + void DecodePrometheus(TStringBuf data, IMetricConsumer* c, TStringBuf metricNameLabel = "sensor"); } diff --git a/library/cpp/monlib/encode/prometheus/prometheus_decoder.cpp b/library/cpp/monlib/encode/prometheus/prometheus_decoder.cpp index 5dc0bc8033..7e81357dbd 100644 --- a/library/cpp/monlib/encode/prometheus/prometheus_decoder.cpp +++ b/library/cpp/monlib/encode/prometheus/prometheus_decoder.cpp @@ -168,10 +168,10 @@ namespace NMonitoring { /////////////////////////////////////////////////////////////////////// class TPrometheusReader { public: - TPrometheusReader(TStringBuf data, IMetricConsumer* c, TStringBuf metricNameLabel) + TPrometheusReader(TStringBuf data, IMetricConsumer* c, TStringBuf metricNameLabel) : Data_(data) , Consumer_(c) - , MetricNameLabel_(metricNameLabel) + , MetricNameLabel_(metricNameLabel) { } @@ -516,12 +516,12 @@ namespace NMonitoring { } void ConsumeLabels(TStringBuf name, const TLabelsMap& labels) { - Y_PARSER_ENSURE(labels.count(MetricNameLabel_) == 0, - "label name '" << MetricNameLabel_ << + Y_PARSER_ENSURE(labels.count(MetricNameLabel_) == 0, + "label name '" << MetricNameLabel_ << "' is reserved, but is used with metric: " << name << LabelsToStr(labels)); Consumer_->OnLabelsBegin(); - Consumer_->OnLabel(MetricNameLabel_, TString(name)); // TODO: remove this string allocation + Consumer_->OnLabel(MetricNameLabel_, TString(name)); // TODO: remove this string allocation for (const auto& it: labels) { Consumer_->OnLabel(it.first, it.second); } @@ -579,7 +579,7 @@ namespace NMonitoring { private: TStringBuf Data_; IMetricConsumer* Consumer_; - TStringBuf MetricNameLabel_; + TStringBuf MetricNameLabel_; THashMap<TString, EPrometheusMetricType> SeenTypes_; THistogramBuilder HistogramBuilder_; @@ -589,8 +589,8 @@ namespace NMonitoring { }; } // namespace -void DecodePrometheus(TStringBuf data, IMetricConsumer* c, TStringBuf metricNameLabel) { - TPrometheusReader reader(data, c, metricNameLabel); +void DecodePrometheus(TStringBuf data, IMetricConsumer* c, TStringBuf metricNameLabel) { + TPrometheusReader reader(data, c, metricNameLabel); reader.Read(); } diff --git a/library/cpp/monlib/encode/prometheus/prometheus_encoder.cpp b/library/cpp/monlib/encode/prometheus/prometheus_encoder.cpp index c370fd2b6e..15efeb8c03 100644 --- a/library/cpp/monlib/encode/prometheus/prometheus_encoder.cpp +++ b/library/cpp/monlib/encode/prometheus/prometheus_encoder.cpp @@ -249,9 +249,9 @@ namespace NMonitoring { /////////////////////////////////////////////////////////////////////// class TPrometheusEncoder final: public IMetricEncoder { public: - explicit TPrometheusEncoder(IOutputStream* out, TStringBuf metricNameLabel) + explicit TPrometheusEncoder(IOutputStream* out, TStringBuf metricNameLabel) : Writer_(out) - , MetricNameLabel_(metricNameLabel) + , MetricNameLabel_(metricNameLabel) { } @@ -358,10 +358,10 @@ namespace NMonitoring { MetricState_.Labels.Add(l.Name(), l.Value()); } - TMaybe<TLabel> nameLabel = MetricState_.Labels.Extract(MetricNameLabel_); + TMaybe<TLabel> nameLabel = MetricState_.Labels.Extract(MetricNameLabel_); Y_ENSURE(nameLabel, "labels " << MetricState_.Labels << - " does not contain label '" << MetricNameLabel_ << '\''); + " does not contain label '" << MetricNameLabel_ << '\''); const TString& metricName = ToString(nameLabel->Value()); if (MetricState_.Type != EMetricType::DSUMMARY) { @@ -399,15 +399,15 @@ namespace NMonitoring { private: TEncoderState State_; TPrometheusWriter Writer_; - TString MetricNameLabel_; + TString MetricNameLabel_; TInstant CommonTime_ = TInstant::Zero(); TLabels CommonLabels_; TMetricState MetricState_; }; } - IMetricEncoderPtr EncoderPrometheus(IOutputStream* out, TStringBuf metricNameLabel) { - return MakeHolder<TPrometheusEncoder>(out, metricNameLabel); + IMetricEncoderPtr EncoderPrometheus(IOutputStream* out, TStringBuf metricNameLabel) { + return MakeHolder<TPrometheusEncoder>(out, metricNameLabel); } } // namespace NMonitoring diff --git a/library/cpp/monlib/encode/spack/spack_v1.h b/library/cpp/monlib/encode/spack/spack_v1.h index e962d21467..cf1c9417b9 100644 --- a/library/cpp/monlib/encode/spack/spack_v1.h +++ b/library/cpp/monlib/encode/spack/spack_v1.h @@ -78,7 +78,7 @@ namespace NMonitoring { /////////////////////////////////////////////////////////////////////////////// struct Y_PACKED TSpackHeader { ui16 Magic = 0x5053; // "SP" - ui16 Version; // MSB - major version, LSB - minor version + ui16 Version; // MSB - major version, LSB - minor version ui16 HeaderSize = sizeof(TSpackHeader); ui8 TimePrecision; ui8 Compression; @@ -89,12 +89,12 @@ namespace NMonitoring { // add new fields here }; - enum ESpackV1Version: ui16 { - SV1_00 = 0x0100, - SV1_01 = 0x0101, - SV1_02 = 0x0102 - }; - + enum ESpackV1Version: ui16 { + SV1_00 = 0x0100, + SV1_01 = 0x0101, + SV1_02 = 0x0102 + }; + IMetricEncoderPtr EncoderSpackV1( IOutputStream* out, ETimePrecision timePrecision, @@ -102,14 +102,14 @@ namespace NMonitoring { EMetricsMergingMode mergingMode = EMetricsMergingMode::DEFAULT ); - IMetricEncoderPtr EncoderSpackV12( - IOutputStream* out, - ETimePrecision timePrecision, - ECompression compression, - EMetricsMergingMode mergingMode = EMetricsMergingMode::DEFAULT, - TStringBuf metricNameLabel = "name" - ); + IMetricEncoderPtr EncoderSpackV12( + IOutputStream* out, + ETimePrecision timePrecision, + ECompression compression, + EMetricsMergingMode mergingMode = EMetricsMergingMode::DEFAULT, + TStringBuf metricNameLabel = "name" + ); + + void DecodeSpackV1(IInputStream* in, IMetricConsumer* c, TStringBuf metricNameLabel = "name"); - void DecodeSpackV1(IInputStream* in, IMetricConsumer* c, TStringBuf metricNameLabel = "name"); - } diff --git a/library/cpp/monlib/encode/spack/spack_v1_decoder.cpp b/library/cpp/monlib/encode/spack/spack_v1_decoder.cpp index 330e81a7a2..1f445fc80d 100644 --- a/library/cpp/monlib/encode/spack/spack_v1_decoder.cpp +++ b/library/cpp/monlib/encode/spack/spack_v1_decoder.cpp @@ -27,9 +27,9 @@ namespace NMonitoring { /////////////////////////////////////////////////////////////////////// class TDecoderSpackV1 { public: - TDecoderSpackV1(IInputStream* in, TStringBuf metricNameLabel) + TDecoderSpackV1(IInputStream* in, TStringBuf metricNameLabel) : In_(in) - , MetricNameLabel_(metricNameLabel) + , MetricNameLabel_(metricNameLabel) { } @@ -83,9 +83,9 @@ namespace NMonitoring { // (4) read common labels if (ui32 commonLabelsCount = ReadVarint()) { - c->OnLabelsBegin(); + c->OnLabelsBegin(); ReadLabels(labelNames, labelValues, commonLabelsCount, c); - c->OnLabelsEnd(); + c->OnLabelsEnd(); } // (5) read metrics @@ -110,20 +110,20 @@ namespace NMonitoring { // TODO: use it ReadFixed<ui8>(); // skip flags byte - auto metricNameValueIndex = std::numeric_limits<ui32>::max(); - if (Header_.Version >= SV1_02) { - metricNameValueIndex = ReadVarint(); - } - + auto metricNameValueIndex = std::numeric_limits<ui32>::max(); + if (Header_.Version >= SV1_02) { + metricNameValueIndex = ReadVarint(); + } + // (5.2) labels ui32 labelsCount = ReadVarint(); - DECODE_ENSURE(Header_.Version >= SV1_02 || labelsCount > 0, "metric #" << i << " has no labels"); - c->OnLabelsBegin(); - if (Header_.Version >= SV1_02) { - c->OnLabel(MetricNameLabel_, labelValues.Get(metricNameValueIndex)); - } + DECODE_ENSURE(Header_.Version >= SV1_02 || labelsCount > 0, "metric #" << i << " has no labels"); + c->OnLabelsBegin(); + if (Header_.Version >= SV1_02) { + c->OnLabel(MetricNameLabel_, labelValues.Get(metricNameValueIndex)); + } ReadLabels(labelNames, labelValues, labelsCount, c); - c->OnLabelsEnd(); + c->OnLabelsEnd(); // (5.3) values switch (valueType) { @@ -214,7 +214,7 @@ namespace NMonitoring { ui32 bucketsCount = ReadVarint(); auto s = TExplicitHistogramSnapshot::New(bucketsCount); - if (SV1_00 == Header_.Version) { // v1.0 + if (SV1_00 == Header_.Version) { // v1.0 for (ui32 i = 0; i < bucketsCount; i++) { i64 bound = ReadFixed<i64>(); double doubleBound = (bound != Max<i64>()) @@ -275,7 +275,7 @@ namespace NMonitoring { private: IInputStream* In_; - TString MetricNameLabel_; + TString MetricNameLabel_; ETimePrecision TimePrecision_; TSpackHeader Header_; }; // class TDecoderSpackV1 @@ -450,8 +450,8 @@ namespace NMonitoring { } } - void DecodeSpackV1(IInputStream* in, IMetricConsumer* c, TStringBuf metricNameLabel) { - TDecoderSpackV1 decoder(in, metricNameLabel); + void DecodeSpackV1(IInputStream* in, IMetricConsumer* c, TStringBuf metricNameLabel) { + TDecoderSpackV1 decoder(in, metricNameLabel); decoder.Decode(c); } diff --git a/library/cpp/monlib/encode/spack/spack_v1_encoder.cpp b/library/cpp/monlib/encode/spack/spack_v1_encoder.cpp index 3e7a2fc7ea..a2b0bb5f50 100644 --- a/library/cpp/monlib/encode/spack/spack_v1_encoder.cpp +++ b/library/cpp/monlib/encode/spack/spack_v1_encoder.cpp @@ -23,15 +23,15 @@ namespace NMonitoring { IOutputStream* out, ETimePrecision timePrecision, ECompression compression, - EMetricsMergingMode mergingMode, - ESpackV1Version version, - TStringBuf metricNameLabel + EMetricsMergingMode mergingMode, + ESpackV1Version version, + TStringBuf metricNameLabel ) : Out_(out) , TimePrecision_(timePrecision) , Compression_(compression) - , Version_(version) - , MetricName_(Version_ >= SV1_02 ? LabelNamesPool_.PutIfAbsent(metricNameLabel) : nullptr) + , Version_(version) + , MetricName_(Version_ >= SV1_02 ? LabelNamesPool_.PutIfAbsent(metricNameLabel) : nullptr) { MetricsMergingMode_ = mergingMode; @@ -89,7 +89,7 @@ namespace NMonitoring { // (1) write header TSpackHeader header; - header.Version = Version_; + header.Version = Version_; header.TimePrecision = EncodeTimePrecision(TimePrecision_); header.Compression = EncodeCompression(Compression_); header.LabelNamesSize = static_cast<ui32>( @@ -119,7 +119,7 @@ namespace NMonitoring { WriteTime(CommonTime_); // (4) write common labels' indexes - WriteLabels(CommonLabels_, nullptr); + WriteLabels(CommonLabels_, nullptr); // (5) write metrics // metrics count already written in header @@ -132,19 +132,19 @@ namespace NMonitoring { ui8 flagsByte = 0x00; Out_->Write(&flagsByte, sizeof(flagsByte)); - // v1.2 format addition — metric name - if (Version_ >= SV1_02) { - const auto it = FindIf(metric.Labels, [&](const auto& l) { - return l.Key == MetricName_; - }); - Y_ENSURE(it != metric.Labels.end(), - "metric name label '" << LabelNamesPool_.Get(MetricName_->Index) << "' not found, " - << "all metric labels '" << FormatLabels(metric.Labels) << "'"); - WriteVarUInt32(Out_, it->Value->Index); - } - + // v1.2 format addition — metric name + if (Version_ >= SV1_02) { + const auto it = FindIf(metric.Labels, [&](const auto& l) { + return l.Key == MetricName_; + }); + Y_ENSURE(it != metric.Labels.end(), + "metric name label '" << LabelNamesPool_.Get(MetricName_->Index) << "' not found, " + << "all metric labels '" << FormatLabels(metric.Labels) << "'"); + WriteVarUInt32(Out_, it->Value->Index); + } + // (5.2) labels - WriteLabels(metric.Labels, MetricName_); + WriteLabels(metric.Labels, MetricName_); // (5.3) values switch (metric.TimeSeries.Size()) { @@ -190,12 +190,12 @@ namespace NMonitoring { return (static_cast<ui8>(metric.MetricType) << 2) | static_cast<ui8>(valueType); } - void WriteLabels(const TPooledLabels& labels, const TPooledStr* skipKey) { - WriteVarUInt32(Out_, static_cast<ui32>(skipKey ? labels.size() - 1 : labels.size())); + void WriteLabels(const TPooledLabels& labels, const TPooledStr* skipKey) { + WriteVarUInt32(Out_, static_cast<ui32>(skipKey ? labels.size() - 1 : labels.size())); for (auto&& label : labels) { - if (label.Key == skipKey) { - continue; - } + if (label.Key == skipKey) { + continue; + } WriteVarUInt32(Out_, label.Key->Index); WriteVarUInt32(Out_, label.Value->Index); } @@ -289,8 +289,8 @@ namespace NMonitoring { IOutputStream* Out_; ETimePrecision TimePrecision_; ECompression Compression_; - ESpackV1Version Version_; - const TPooledStr* MetricName_; + ESpackV1Version Version_; + const TPooledStr* MetricName_; bool Closed_ = false; }; @@ -302,17 +302,17 @@ namespace NMonitoring { ECompression compression, EMetricsMergingMode mergingMode ) { - return MakeHolder<TEncoderSpackV1>(out, timePrecision, compression, mergingMode, SV1_01, ""); + return MakeHolder<TEncoderSpackV1>(out, timePrecision, compression, mergingMode, SV1_01, ""); } - IMetricEncoderPtr EncoderSpackV12( - IOutputStream* out, - ETimePrecision timePrecision, - ECompression compression, - EMetricsMergingMode mergingMode, - TStringBuf metricNameLabel - ) { - Y_ENSURE(!metricNameLabel.Empty(), "metricNameLabel can't be empty"); - return MakeHolder<TEncoderSpackV1>(out, timePrecision, compression, mergingMode, SV1_02, metricNameLabel); - } + IMetricEncoderPtr EncoderSpackV12( + IOutputStream* out, + ETimePrecision timePrecision, + ECompression compression, + EMetricsMergingMode mergingMode, + TStringBuf metricNameLabel + ) { + Y_ENSURE(!metricNameLabel.Empty(), "metricNameLabel can't be empty"); + return MakeHolder<TEncoderSpackV1>(out, timePrecision, compression, mergingMode, SV1_02, metricNameLabel); + } } diff --git a/library/cpp/monlib/encode/spack/spack_v1_ut.cpp b/library/cpp/monlib/encode/spack/spack_v1_ut.cpp index c6748cb0f5..fe778eb7e0 100644 --- a/library/cpp/monlib/encode/spack/spack_v1_ut.cpp +++ b/library/cpp/monlib/encode/spack/spack_v1_ut.cpp @@ -468,7 +468,7 @@ Y_UNIT_TEST_SUITE(TSpackTest) { IMetricEncoderPtr e = EncoderProtobuf(&samples); TBuffer data(expectedSize); - if (SV1_00 == version) { // v1.0 + if (SV1_00 == version) { // v1.0 data.Append(reinterpret_cast<char*>(expectedHeader_v1_0), Y_ARRAY_SIZE(expectedHeader_v1_0)); } else { data.Append(reinterpret_cast<char*>(expectedHeader), Y_ARRAY_SIZE(expectedHeader)); @@ -480,7 +480,7 @@ Y_UNIT_TEST_SUITE(TSpackTest) { data.Append(reinterpret_cast<char*>(expectedMetric2), Y_ARRAY_SIZE(expectedMetric2)); data.Append(reinterpret_cast<char*>(expectedMetric3), Y_ARRAY_SIZE(expectedMetric3)); data.Append(reinterpret_cast<char*>(expectedMetric4), Y_ARRAY_SIZE(expectedMetric4)); - if (SV1_00 == version) { // v1.0 + if (SV1_00 == version) { // v1.0 data.Append(reinterpret_cast<char*>(expectedMetric5_v1_0), Y_ARRAY_SIZE(expectedMetric5_v1_0)); } else { data.Append(reinterpret_cast<char*>(expectedMetric5), Y_ARRAY_SIZE(expectedMetric5)); @@ -494,7 +494,7 @@ Y_UNIT_TEST_SUITE(TSpackTest) { void DecodeDataToSamples(NProto::TMultiSamplesList & samples) { TSpackHeader header; - header.Version = SV1_01; + header.Version = SV1_01; DecodeDataToSamples(samples, header.Version); } @@ -689,7 +689,7 @@ Y_UNIT_TEST_SUITE(TSpackTest) { // Check that histogram bounds decoded from different versions are the same NProto::TMultiSamplesList samples, samples_v1_0; DecodeDataToSamples(samples); - DecodeDataToSamples(samples_v1_0, /*version = */ SV1_00); + DecodeDataToSamples(samples_v1_0, /*version = */ SV1_00); const NProto::THistogram& pointHistogram = samples.GetSamples(4).GetPoints(0).GetHistogram(); const NProto::THistogram& pointHistogram_v1_0 = samples_v1_0.GetSamples(4).GetPoints(0).GetHistogram(); @@ -698,148 +698,148 @@ Y_UNIT_TEST_SUITE(TSpackTest) { UNIT_ASSERT_DOUBLES_EQUAL(pointHistogram.GetBounds(i), pointHistogram_v1_0.GetBounds(i), Min<double>()); } } - - Y_UNIT_TEST(SimpleV12) { - ui8 expectedSerialized[] = { - // header - 0x53, 0x50, // magic "SP" (fixed ui16) - // minor, major - 0x02, 0x01, // version (fixed ui16) - 0x18, 0x00, // header size (fixed ui16) - 0x00, // time precision (fixed ui8) - 0x00, // compression algorithm (fixed ui8) - 0x0A, 0x00, 0x00, 0x00, // label names size (fixed ui32) - 0x14, 0x00, 0x00, 0x00, // labels values size (fixed ui32) - 0x01, 0x00, 0x00, 0x00, // metric count (fixed ui32) - 0x01, 0x00, 0x00, 0x00, // points count (fixed ui32) - - // string pools - 0x73, 0x00, // "s\0" - 0x70, 0x72, 0x6f, 0x6a, 0x65, 0x63, 0x74, 0x00, // "project\0" - 0x73, 0x6f, 0x6c, 0x6f, 0x6d, 0x6f, 0x6e, 0x00, // "solomon\0" - 0x74, 0x65, 0x6D, 0x70, 0x65, 0x72, 0x61, 0x74, // temperature - 0x75, 0x72, 0x65, 0x00, - - // common time - 0x00, 0x2f, 0x68, 0x59, // common time in seconds (fixed ui32) - - // common labels - 0x00, // common labels count (varint) - - // metric - 0x09, // types (COUNTER | ONE_WITHOUT_TS) (fixed ui8) - 0x00, // flags (fixed ui8) - 0x01, // name index (varint) - 0x01, // metric labels count (varint) - 0x01, // 'project' label name index (varint) - 0x00, // 'project' label value index (varint) - 0x11, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // value (fixed ui64) - }; - - // encode - { - TBuffer actualSerialized; - { - TBufferOutput out(actualSerialized); - auto e = EncoderSpackV12( - &out, - ETimePrecision::SECONDS, - ECompression::IDENTITY, - EMetricsMergingMode::DEFAULT, - "s"); - - e->OnStreamBegin(); - e->OnCommonTime(TInstant::Seconds(1500000000)); - - { - e->OnMetricBegin(EMetricType::COUNTER); - { - e->OnLabelsBegin(); + + Y_UNIT_TEST(SimpleV12) { + ui8 expectedSerialized[] = { + // header + 0x53, 0x50, // magic "SP" (fixed ui16) + // minor, major + 0x02, 0x01, // version (fixed ui16) + 0x18, 0x00, // header size (fixed ui16) + 0x00, // time precision (fixed ui8) + 0x00, // compression algorithm (fixed ui8) + 0x0A, 0x00, 0x00, 0x00, // label names size (fixed ui32) + 0x14, 0x00, 0x00, 0x00, // labels values size (fixed ui32) + 0x01, 0x00, 0x00, 0x00, // metric count (fixed ui32) + 0x01, 0x00, 0x00, 0x00, // points count (fixed ui32) + + // string pools + 0x73, 0x00, // "s\0" + 0x70, 0x72, 0x6f, 0x6a, 0x65, 0x63, 0x74, 0x00, // "project\0" + 0x73, 0x6f, 0x6c, 0x6f, 0x6d, 0x6f, 0x6e, 0x00, // "solomon\0" + 0x74, 0x65, 0x6D, 0x70, 0x65, 0x72, 0x61, 0x74, // temperature + 0x75, 0x72, 0x65, 0x00, + + // common time + 0x00, 0x2f, 0x68, 0x59, // common time in seconds (fixed ui32) + + // common labels + 0x00, // common labels count (varint) + + // metric + 0x09, // types (COUNTER | ONE_WITHOUT_TS) (fixed ui8) + 0x00, // flags (fixed ui8) + 0x01, // name index (varint) + 0x01, // metric labels count (varint) + 0x01, // 'project' label name index (varint) + 0x00, // 'project' label value index (varint) + 0x11, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // value (fixed ui64) + }; + + // encode + { + TBuffer actualSerialized; + { + TBufferOutput out(actualSerialized); + auto e = EncoderSpackV12( + &out, + ETimePrecision::SECONDS, + ECompression::IDENTITY, + EMetricsMergingMode::DEFAULT, + "s"); + + e->OnStreamBegin(); + e->OnCommonTime(TInstant::Seconds(1500000000)); + + { + e->OnMetricBegin(EMetricType::COUNTER); + { + e->OnLabelsBegin(); + e->OnLabel("project", "solomon"); + e->OnLabel("s", "temperature"); + e->OnLabelsEnd(); + } + // Only the last value will be encoded + e->OnUint64(TInstant::Zero(), 10); + e->OnUint64(TInstant::Zero(), 13); + e->OnUint64(TInstant::Zero(), 17); + e->OnMetricEnd(); + } + + e->OnStreamEnd(); + e->Close(); + } + + UNIT_ASSERT_VALUES_EQUAL(actualSerialized.Size(), Y_ARRAY_SIZE(expectedSerialized)); + UNIT_ASSERT_BINARY_EQUALS(actualSerialized.Data(), expectedSerialized); + } + + // decode + { + NProto::TMultiSamplesList samples; + { + auto input = TMemoryInput(expectedSerialized, Y_ARRAY_SIZE(expectedSerialized)); + auto encoder = EncoderProtobuf(&samples); + DecodeSpackV1(&input, encoder.Get(), "s"); + } + + UNIT_ASSERT_VALUES_EQUAL(TInstant::MilliSeconds(samples.GetCommonTime()), + TInstant::Seconds(1500000000)); + + UNIT_ASSERT_VALUES_EQUAL(samples.CommonLabelsSize(), 0); + + UNIT_ASSERT_VALUES_EQUAL(samples.SamplesSize(), 1); + { + const auto& s = samples.GetSamples(0); + UNIT_ASSERT_EQUAL(s.GetMetricType(), NProto::COUNTER); + UNIT_ASSERT_VALUES_EQUAL(s.LabelsSize(), 2); + AssertLabelEqual(s.GetLabels(0), "s", "temperature"); + AssertLabelEqual(s.GetLabels(1), "project", "solomon"); + + UNIT_ASSERT_VALUES_EQUAL(s.PointsSize(), 1); + AssertPointEqual(s.GetPoints(0), TInstant::Zero(), ui64(17)); + } + } + } + + Y_UNIT_TEST(V12MissingNameForOneMetric) { + TBuffer b; + TBufferOutput out(b); + auto e = EncoderSpackV12( + &out, + ETimePrecision::SECONDS, + ECompression::IDENTITY, + EMetricsMergingMode::DEFAULT, + "s"); + + UNIT_ASSERT_EXCEPTION_CONTAINS( + [&]() { + e->OnStreamBegin(); + { + e->OnMetricBegin(EMetricType::COUNTER); + { + e->OnLabelsBegin(); + e->OnLabel("s", "s1"); + e->OnLabelsEnd(); + } + e->OnUint64(TInstant::Zero(), 1); + e->OnMetricEnd(); + + e->OnMetricBegin(EMetricType::COUNTER); + { + e->OnLabelsBegin(); e->OnLabel("project", "solomon"); - e->OnLabel("s", "temperature"); - e->OnLabelsEnd(); - } - // Only the last value will be encoded - e->OnUint64(TInstant::Zero(), 10); - e->OnUint64(TInstant::Zero(), 13); - e->OnUint64(TInstant::Zero(), 17); - e->OnMetricEnd(); - } - - e->OnStreamEnd(); - e->Close(); - } - - UNIT_ASSERT_VALUES_EQUAL(actualSerialized.Size(), Y_ARRAY_SIZE(expectedSerialized)); - UNIT_ASSERT_BINARY_EQUALS(actualSerialized.Data(), expectedSerialized); - } - - // decode - { - NProto::TMultiSamplesList samples; - { - auto input = TMemoryInput(expectedSerialized, Y_ARRAY_SIZE(expectedSerialized)); - auto encoder = EncoderProtobuf(&samples); - DecodeSpackV1(&input, encoder.Get(), "s"); - } - - UNIT_ASSERT_VALUES_EQUAL(TInstant::MilliSeconds(samples.GetCommonTime()), - TInstant::Seconds(1500000000)); - - UNIT_ASSERT_VALUES_EQUAL(samples.CommonLabelsSize(), 0); - - UNIT_ASSERT_VALUES_EQUAL(samples.SamplesSize(), 1); - { - const auto& s = samples.GetSamples(0); - UNIT_ASSERT_EQUAL(s.GetMetricType(), NProto::COUNTER); - UNIT_ASSERT_VALUES_EQUAL(s.LabelsSize(), 2); - AssertLabelEqual(s.GetLabels(0), "s", "temperature"); - AssertLabelEqual(s.GetLabels(1), "project", "solomon"); - - UNIT_ASSERT_VALUES_EQUAL(s.PointsSize(), 1); - AssertPointEqual(s.GetPoints(0), TInstant::Zero(), ui64(17)); - } - } - } - - Y_UNIT_TEST(V12MissingNameForOneMetric) { - TBuffer b; - TBufferOutput out(b); - auto e = EncoderSpackV12( - &out, - ETimePrecision::SECONDS, - ECompression::IDENTITY, - EMetricsMergingMode::DEFAULT, - "s"); - - UNIT_ASSERT_EXCEPTION_CONTAINS( - [&]() { - e->OnStreamBegin(); - { - e->OnMetricBegin(EMetricType::COUNTER); - { - e->OnLabelsBegin(); - e->OnLabel("s", "s1"); - e->OnLabelsEnd(); - } - e->OnUint64(TInstant::Zero(), 1); - e->OnMetricEnd(); - - e->OnMetricBegin(EMetricType::COUNTER); - { - e->OnLabelsBegin(); - e->OnLabel("project", "solomon"); - e->OnLabel("m", "v"); - e->OnLabelsEnd(); - } - e->OnUint64(TInstant::Zero(), 2); - e->OnMetricEnd(); - } - - e->OnStreamEnd(); - e->Close(); - }(), - yexception, - "metric name label 's' not found, all metric labels '{m=v, project=solomon}'"); - } + e->OnLabel("m", "v"); + e->OnLabelsEnd(); + } + e->OnUint64(TInstant::Zero(), 2); + e->OnMetricEnd(); + } + + e->OnStreamEnd(); + e->Close(); + }(), + yexception, + "metric name label 's' not found, all metric labels '{m=v, project=solomon}'"); + } } diff --git a/library/cpp/monlib/metrics/metric_value.h b/library/cpp/monlib/metrics/metric_value.h index 5f235e7a0b..607fcc8602 100644 --- a/library/cpp/monlib/metrics/metric_value.h +++ b/library/cpp/monlib/metrics/metric_value.h @@ -384,35 +384,35 @@ namespace NMonitoring { template <typename T> void Add(TInstant time, T value) { - Add(TPoint(time, value), TValueType<T>::Type); - } - - void Add(TPoint point, EMetricValueType valueType) { + Add(TPoint(time, value), TValueType<T>::Type); + } + + void Add(TPoint point, EMetricValueType valueType) { if (Empty()) { - ValueType_ = valueType; + ValueType_ = valueType; } else { - CheckTypes(ValueType_, valueType); + CheckTypes(ValueType_, valueType); } - Points_.push_back(point); + Points_.push_back(point); if (ValueType_ == EMetricValueType::SUMMARY) { - TPoint& p = Points_.back(); - p.GetValue().AsSummaryDouble()->Ref(); + TPoint& p = Points_.back(); + p.GetValue().AsSummaryDouble()->Ref(); } else if (ValueType_ == EMetricValueType::HISTOGRAM) { - TPoint& p = Points_.back(); - p.GetValue().AsHistogram()->Ref(); + TPoint& p = Points_.back(); + p.GetValue().AsHistogram()->Ref(); } else if (ValueType_ == EMetricValueType::LOGHISTOGRAM) { - TPoint& p = Points_.back(); - p.GetValue().AsLogHistogram()->Ref(); + TPoint& p = Points_.back(); + p.GetValue().AsLogHistogram()->Ref(); } } void CopyFrom(const TMetricTimeSeries& other) { - if (Empty()) { - ValueType_ = other.ValueType_; - } else { - CheckTypes(GetValueType(), other.GetValueType()); - } + if (Empty()) { + ValueType_ = other.ValueType_; + } else { + CheckTypes(GetValueType(), other.GetValueType()); + } size_t prevSize = Points_.size(); Copy(std::begin(other.Points_), std::end(other.Points_), diff --git a/library/cpp/testing/benchmark/bench.h b/library/cpp/testing/benchmark/bench.h index 4225a31fd3..21551ad0dd 100644 --- a/library/cpp/testing/benchmark/bench.h +++ b/library/cpp/testing/benchmark/bench.h @@ -1,9 +1,9 @@ #pragma once -#include <util/system/compiler.h> +#include <util/system/compiler.h> #include <util/system/types.h> -#include <utility> +#include <utility> namespace NBench { namespace NCpu { @@ -71,14 +71,14 @@ namespace NBench { } #endif - /** - * Use this function to prevent unused variables elimination. - * - * @param Unused variable (e.g. return value of benchmarked function). - */ + /** + * Use this function to prevent unused variables elimination. + * + * @param Unused variable (e.g. return value of benchmarked function). + */ template <typename T> Y_FORCE_INLINE void DoNotOptimize(T&& datum) { - ::DoNotOptimizeAway(std::forward<T>(datum)); + ::DoNotOptimizeAway(std::forward<T>(datum)); } int Main(int argc, char** argv); diff --git a/util/system/compiler.cpp b/util/system/compiler.cpp index 4803c75a4a..d4b3cca0af 100644 --- a/util/system/compiler.cpp +++ b/util/system/compiler.cpp @@ -4,6 +4,6 @@ [[noreturn]] Y_HIDDEN void _YandexAbort() { std::abort(); } - -void UseCharPointerImpl(volatile const char*) { -} + +void UseCharPointerImpl(volatile const char*) { +} diff --git a/util/system/compiler.h b/util/system/compiler.h index ad523479be..b373edcc46 100644 --- a/util/system/compiler.h +++ b/util/system/compiler.h @@ -1,9 +1,9 @@ #pragma once -#if defined(_MSC_VER) +#if defined(_MSC_VER) #include <intrin.h> -#endif - +#endif + // useful cross-platfrom definitions for compilers /** @@ -623,11 +623,11 @@ _YandexAbort(); do { \ } while (0) #endif - -#ifdef __cplusplus - + +#ifdef __cplusplus + void UseCharPointerImpl(volatile const char*); - + template <typename T> Y_FORCE_INLINE void DoNotOptimizeAway(T&& datum) { #if defined(_MSC_VER) @@ -641,10 +641,10 @@ Y_FORCE_INLINE void DoNotOptimizeAway(T&& datum) { Y_FAKE_READ(datum); #endif } - + /** - * Use this macro to prevent unused variables elimination. - */ + * Use this macro to prevent unused variables elimination. + */ #define Y_DO_NOT_OPTIMIZE_AWAY(X) ::DoNotOptimizeAway(X) - -#endif + +#endif diff --git a/util/thread/pool.cpp b/util/thread/pool.cpp index 1b6745c272..05fad02e9b 100644 --- a/util/thread/pool.cpp +++ b/util/thread/pool.cpp @@ -147,7 +147,7 @@ public: return ThreadCountReal; } - inline void AtforkAction() noexcept Y_NO_SANITIZE("thread") { + inline void AtforkAction() noexcept Y_NO_SANITIZE("thread") { Forked = true; } diff --git a/ydb/core/driver_lib/run/run.cpp b/ydb/core/driver_lib/run/run.cpp index 62bf1fa07e..a4f74aa4e0 100644 --- a/ydb/core/driver_lib/run/run.cpp +++ b/ydb/core/driver_lib/run/run.cpp @@ -119,8 +119,8 @@ #include <ydb/library/yql/minikql/invoke_builtins/mkql_builtins.h> #include <util/folder/dirut.h> -#include <util/system/file.h> -#include <util/system/getpid.h> +#include <util/system/file.h> +#include <util/system/getpid.h> #include <util/system/hostname.h> #include <ydb/core/tracing/tablet_info.h> diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index 4bac573477..d64169d4fc 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -254,18 +254,18 @@ message TBlobStorageFormatConfig { repeated TDrive Drive = 1; } -message TUAClientConfig { - optional string Uri = 1; - optional string SharedSecretKey = 2; - optional uint64 MaxInflightBytes = 3 [default = 100000000]; - optional uint64 GrpcReconnectDelayMs = 4; - optional uint64 GrpcSendDelayMs = 5; - optional uint64 GrpcMaxMessageSize = 6; - optional string ClientLogFile = 7; - optional uint32 ClientLogPriority = 8; - optional string LogName = 9; -} - +message TUAClientConfig { + optional string Uri = 1; + optional string SharedSecretKey = 2; + optional uint64 MaxInflightBytes = 3 [default = 100000000]; + optional uint64 GrpcReconnectDelayMs = 4; + optional uint64 GrpcSendDelayMs = 5; + optional uint64 GrpcMaxMessageSize = 6; + optional string ClientLogFile = 7; + optional uint32 ClientLogPriority = 8; + optional string LogName = 9; +} + message TLogConfig { message TEntry { optional bytes Component = 1; @@ -286,7 +286,7 @@ message TLogConfig { optional string BackendFileName = 10; optional string SysLogService = 11; optional bool SysLogToStdErr = 12; // writes logs to stderr as well as in syslog - optional TUAClientConfig UAClientConfig = 13; + optional TUAClientConfig UAClientConfig = 13; optional uint64 TimeThresholdMs = 14 [default = 1000]; } diff --git a/ydb/core/yq/libs/pretty_printers/minikql_program_printer.cpp b/ydb/core/yq/libs/pretty_printers/minikql_program_printer.cpp index 73130d39fb..8c80588457 100644 --- a/ydb/core/yq/libs/pretty_printers/minikql_program_printer.cpp +++ b/ydb/core/yq/libs/pretty_printers/minikql_program_printer.cpp @@ -68,9 +68,9 @@ public: void Visit(TTaggedType&) override { Out << "TaggedType"; } - void Visit(TBlockType&) override { - Out << "BlockType"; - } + void Visit(TBlockType&) override { + Out << "BlockType"; + } // Values void Visit(TVoid&) override { diff --git a/ydb/library/yql/minikql/arrow/arrow_defs.h b/ydb/library/yql/minikql/arrow/arrow_defs.h index 7b3d5dc8e6..97b26e0345 100644 --- a/ydb/library/yql/minikql/arrow/arrow_defs.h +++ b/ydb/library/yql/minikql/arrow/arrow_defs.h @@ -1,24 +1,24 @@ -#pragma once - +#pragma once + #include <ydb/library/yql/minikql/defs.h> - -#define ARROW_CHECK_STATUS(s, op, ...) \ - MKQL_ENSURE(s.ok(), "Operation failed: [" << #op << "]\n" \ - << "" __VA_ARGS__ << ": [" << s.ToString() << "]") \ - -#define ARROW_OK_S(op, ...) \ -do { \ - ::arrow::Status _s = (op); \ - ARROW_CHECK_STATUS(_s, op, __VA_ARGS__); \ - } while (false) - -#define ARROW_OK(op) ARROW_OK_S(op, "Bad status") - -#define ARROW_RESULT_S(op, ...) \ - [&]() { \ - auto result = (op); \ - ARROW_CHECK_STATUS(result.status(), op, __VA_ARGS__); \ - return std::move(result).ValueOrDie(); \ - }() - -#define ARROW_RESULT(op) ARROW_RESULT_S(op, "Bad status") + +#define ARROW_CHECK_STATUS(s, op, ...) \ + MKQL_ENSURE(s.ok(), "Operation failed: [" << #op << "]\n" \ + << "" __VA_ARGS__ << ": [" << s.ToString() << "]") \ + +#define ARROW_OK_S(op, ...) \ +do { \ + ::arrow::Status _s = (op); \ + ARROW_CHECK_STATUS(_s, op, __VA_ARGS__); \ + } while (false) + +#define ARROW_OK(op) ARROW_OK_S(op, "Bad status") + +#define ARROW_RESULT_S(op, ...) \ + [&]() { \ + auto result = (op); \ + ARROW_CHECK_STATUS(result.status(), op, __VA_ARGS__); \ + return std::move(result).ValueOrDie(); \ + }() + +#define ARROW_RESULT(op) ARROW_RESULT_S(op, "Bad status") diff --git a/ydb/library/yql/minikql/arrow/mkql_memory_pool.cpp b/ydb/library/yql/minikql/arrow/mkql_memory_pool.cpp index 57b8f2a246..7bc6ad02b0 100644 --- a/ydb/library/yql/minikql/arrow/mkql_memory_pool.cpp +++ b/ydb/library/yql/minikql/arrow/mkql_memory_pool.cpp @@ -1,50 +1,50 @@ -#include "mkql_memory_pool.h" - -namespace NKikimr::NMiniKQL { - -namespace { -class TArrowMemoryPool : public arrow::MemoryPool { -public: - explicit TArrowMemoryPool(TAllocState& allocState) - : AllocState(allocState) { - } - - arrow::Status Allocate(int64_t size, uint8_t** out) override { - *out = static_cast<uint8_t*>(MKQLAllocWithSize(size)); - return arrow::Status(); - } - - arrow::Status Reallocate(int64_t oldSize, int64_t newSize, uint8_t** ptr) override { - void* newPtr = MKQLAllocWithSize(newSize); - ::memcpy(newPtr, *ptr, std::min(oldSize, newSize)); - MKQLFreeWithSize(*ptr, oldSize); - *ptr = static_cast<uint8_t*>(newPtr); - return arrow::Status(); - } - - void Free(uint8_t* buffer, int64_t size) override { - MKQLFreeWithSize(buffer, size); - } - - int64_t bytes_allocated() const override { - return AllocState.GetUsed(); - } - - int64_t max_memory() const override { - return -1; - } - - std::string backend_name() const override { - return "MKQL"; - } - -private: - TAllocState& AllocState; -}; -} - -std::unique_ptr <arrow::MemoryPool> MakeArrowMemoryPool(TAllocState& allocState) { - return std::make_unique<TArrowMemoryPool>(allocState); -} - -} +#include "mkql_memory_pool.h" + +namespace NKikimr::NMiniKQL { + +namespace { +class TArrowMemoryPool : public arrow::MemoryPool { +public: + explicit TArrowMemoryPool(TAllocState& allocState) + : AllocState(allocState) { + } + + arrow::Status Allocate(int64_t size, uint8_t** out) override { + *out = static_cast<uint8_t*>(MKQLAllocWithSize(size)); + return arrow::Status(); + } + + arrow::Status Reallocate(int64_t oldSize, int64_t newSize, uint8_t** ptr) override { + void* newPtr = MKQLAllocWithSize(newSize); + ::memcpy(newPtr, *ptr, std::min(oldSize, newSize)); + MKQLFreeWithSize(*ptr, oldSize); + *ptr = static_cast<uint8_t*>(newPtr); + return arrow::Status(); + } + + void Free(uint8_t* buffer, int64_t size) override { + MKQLFreeWithSize(buffer, size); + } + + int64_t bytes_allocated() const override { + return AllocState.GetUsed(); + } + + int64_t max_memory() const override { + return -1; + } + + std::string backend_name() const override { + return "MKQL"; + } + +private: + TAllocState& AllocState; +}; +} + +std::unique_ptr <arrow::MemoryPool> MakeArrowMemoryPool(TAllocState& allocState) { + return std::make_unique<TArrowMemoryPool>(allocState); +} + +} diff --git a/ydb/library/yql/minikql/arrow/mkql_memory_pool.h b/ydb/library/yql/minikql/arrow/mkql_memory_pool.h index dd23cbb84f..e48e0a7af2 100644 --- a/ydb/library/yql/minikql/arrow/mkql_memory_pool.h +++ b/ydb/library/yql/minikql/arrow/mkql_memory_pool.h @@ -1,11 +1,11 @@ -#pragma once - +#pragma once + #include <ydb/library/yql/minikql/mkql_alloc.h> - -#include <contrib/libs/apache/arrow/cpp/src/arrow/memory_pool.h> - -namespace NKikimr::NMiniKQL { - -std::unique_ptr<arrow::MemoryPool> MakeArrowMemoryPool(TAllocState& allocState); - -} + +#include <contrib/libs/apache/arrow/cpp/src/arrow/memory_pool.h> + +namespace NKikimr::NMiniKQL { + +std::unique_ptr<arrow::MemoryPool> MakeArrowMemoryPool(TAllocState& allocState); + +} diff --git a/ydb/library/yql/minikql/arrow/ya.make b/ydb/library/yql/minikql/arrow/ya.make index b6d675f30d..442e5d3996 100644 --- a/ydb/library/yql/minikql/arrow/ya.make +++ b/ydb/library/yql/minikql/arrow/ya.make @@ -5,11 +5,11 @@ OWNER( ) SRCS( - mkql_memory_pool.cpp + mkql_memory_pool.cpp ) PEERDIR( - contrib/libs/apache/arrow + contrib/libs/apache/arrow ydb/library/yql/minikql ) diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_add.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_add.cpp index f87ca99874..5a57e95c24 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_block_add.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_add.cpp @@ -1,152 +1,152 @@ -#include "mkql_block_add.h" - +#include "mkql_block_add.h" + #include <ydb/library/yql/minikql/arrow/arrow_defs.h> #include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h> #include <ydb/library/yql/minikql/computation/mkql_computation_node_codegen.h> #include <ydb/library/yql/minikql/mkql_node_builder.h> #include <ydb/library/yql/minikql/mkql_node_cast.h> - -#include <arrow/array/builder_primitive.h> -#include <arrow/compute/exec_internal.h> -#include <arrow/compute/function.h> -#include <arrow/compute/kernel.h> -#include <arrow/compute/registry.h> -#include <arrow/util/bit_util.h> - -namespace NKikimr { -namespace NMiniKQL { - -namespace { - -class TBlockAddWrapper : public TMutableComputationNode<TBlockAddWrapper> { -public: - TBlockAddWrapper(TComputationMutables& mutables, - IComputationNode* leftArg, - IComputationNode* rightArg, - TType* leftArgType, - TType* rightArgType, - TType* outputType) - : TMutableComputationNode(mutables) - , LeftArg(leftArg) - , RightArg(rightArg) - , LeftValueDesc(ToValueDescr(leftArgType)) - , RightValueDesc(ToValueDescr(rightArgType)) - , OutputValueDescr(ToValueDescr(outputType)) - , Kernel(ResolveKernel(LeftValueDesc, RightValueDesc)) - , OutputTypeBitWidth(static_cast<const arrow::FixedWidthType&>(*OutputValueDescr.type).bit_width()) - , FunctionRegistry(*arrow::compute::GetFunctionRegistry()) - { - { - auto execContext = arrow::compute::ExecContext(); - auto kernelContext = arrow::compute::KernelContext(&execContext); - const auto kernelOutputValueDesc = ARROW_RESULT(Kernel.signature->out_type().Resolve(&kernelContext, { - LeftValueDesc, - RightValueDesc - })); - Y_VERIFY_DEBUG(kernelOutputValueDesc == OutputValueDescr); - } - - Y_VERIFY_DEBUG( - LeftValueDesc.shape == arrow::ValueDescr::ARRAY && RightValueDesc.shape == arrow::ValueDescr::ARRAY || - LeftValueDesc.shape == arrow::ValueDescr::SCALAR && RightValueDesc.shape == arrow::ValueDescr::ARRAY || - LeftValueDesc.shape == arrow::ValueDescr::ARRAY && RightValueDesc.shape == arrow::ValueDescr::SCALAR); - } - - NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const { - const auto leftValue = LeftArg->GetValue(ctx); - const auto rightValue = RightArg->GetValue(ctx); - auto& leftDatum = TArrowBlock::From(leftValue).GetDatum(); - auto& rightDatum = TArrowBlock::From(rightValue).GetDatum(); - Y_VERIFY_DEBUG(leftDatum.descr() == LeftValueDesc); - Y_VERIFY_DEBUG(rightDatum.descr() == RightValueDesc); - const auto leftKind = leftDatum.kind(); - const auto rightKind = rightDatum.kind(); - MKQL_ENSURE(leftKind != arrow::Datum::ARRAY || rightKind != arrow::Datum::ARRAY || - leftDatum.array()->length == rightDatum.array()->length, - "block size mismatch: " - << static_cast<ui64>(leftDatum.array()->length) - << " != " - << static_cast<ui64>(rightDatum.array()->length)); - const auto blockLength = leftKind == arrow::Datum::ARRAY - ? leftDatum.array()->length - : rightDatum.array()->length; - - auto execContext = arrow::compute::ExecContext(&ctx.ArrowMemoryPool, nullptr, &FunctionRegistry); - auto kernelContext = arrow::compute::KernelContext(&execContext); - - arrow::Datum output = arrow::ArrayData::Make( - OutputValueDescr.type, - blockLength, - std::vector<std::shared_ptr<arrow::Buffer>> { - ARROW_RESULT(kernelContext.AllocateBitmap(blockLength)), - ARROW_RESULT(kernelContext.Allocate(arrow::BitUtil::BytesForBits(OutputTypeBitWidth * blockLength))) - }); - const auto inputBatch = arrow::compute::ExecBatch({leftDatum, rightDatum}, blockLength); - ARROW_OK(arrow::compute::detail::PropagateNulls(&kernelContext, inputBatch, output.array().get())); - ARROW_OK(Kernel.exec(&kernelContext, inputBatch, &output)); - return ctx.HolderFactory.CreateArrowBlock(std::move(output)); - } - -private: - void RegisterDependencies() const final { - this->DependsOn(LeftArg); - this->DependsOn(RightArg); - } - - static const arrow::compute::ScalarKernel& ResolveKernel(const arrow::ValueDescr& leftArg, - const arrow::ValueDescr& rightArg) - { - auto* functionRegistry = arrow::compute::GetFunctionRegistry(); - Y_VERIFY_DEBUG(functionRegistry != nullptr); - auto function = ARROW_RESULT(functionRegistry->GetFunction("add")); - Y_VERIFY_DEBUG(function != nullptr); - Y_VERIFY_DEBUG(function->kind() == arrow::compute::Function::SCALAR); - - const auto* kernel = ARROW_RESULT(function->DispatchExact({leftArg, rightArg})); - return *static_cast<const arrow::compute::ScalarKernel*>(kernel); - } - - static std::shared_ptr<arrow::DataType> ConvertType(TType* type) { - bool isOptional; - const auto dataType = UnpackOptionalData(type, isOptional); - switch (*dataType->GetDataSlot()) { - case NUdf::EDataSlot::Uint64: - return arrow::uint64(); - default: - Y_FAIL("unexpected type %s", TString(dataType->GetKindAsStr()).c_str()); - } - } - - static arrow::ValueDescr ToValueDescr(TType* type) { - auto* blockType = AS_TYPE(TBlockType, type); - const auto shape = blockType->GetShape() == TBlockType::EShape::Single - ? arrow::ValueDescr::SCALAR - : arrow::ValueDescr::ARRAY; - return arrow::ValueDescr(ConvertType(blockType->GetItemType()), shape); - } - -private: - IComputationNode* LeftArg; - IComputationNode* RightArg; - const arrow::ValueDescr LeftValueDesc; - const arrow::ValueDescr RightValueDesc; - const arrow::ValueDescr OutputValueDescr; - const arrow::compute::ScalarKernel& Kernel; - const int OutputTypeBitWidth; - arrow::compute::FunctionRegistry& FunctionRegistry; -}; - -} - -IComputationNode* WrapBlockAdd(TCallable& callable, const TComputationNodeFactoryContext& ctx) { - const auto* callableType = callable.GetType(); - return new TBlockAddWrapper(ctx.Mutables, - LocateNode(ctx.NodeLocator, callable, 0), - LocateNode(ctx.NodeLocator, callable, 1), - callableType->GetArgumentType(0), - callableType->GetArgumentType(1), - callableType->GetReturnType()); -} - -} -} + +#include <arrow/array/builder_primitive.h> +#include <arrow/compute/exec_internal.h> +#include <arrow/compute/function.h> +#include <arrow/compute/kernel.h> +#include <arrow/compute/registry.h> +#include <arrow/util/bit_util.h> + +namespace NKikimr { +namespace NMiniKQL { + +namespace { + +class TBlockAddWrapper : public TMutableComputationNode<TBlockAddWrapper> { +public: + TBlockAddWrapper(TComputationMutables& mutables, + IComputationNode* leftArg, + IComputationNode* rightArg, + TType* leftArgType, + TType* rightArgType, + TType* outputType) + : TMutableComputationNode(mutables) + , LeftArg(leftArg) + , RightArg(rightArg) + , LeftValueDesc(ToValueDescr(leftArgType)) + , RightValueDesc(ToValueDescr(rightArgType)) + , OutputValueDescr(ToValueDescr(outputType)) + , Kernel(ResolveKernel(LeftValueDesc, RightValueDesc)) + , OutputTypeBitWidth(static_cast<const arrow::FixedWidthType&>(*OutputValueDescr.type).bit_width()) + , FunctionRegistry(*arrow::compute::GetFunctionRegistry()) + { + { + auto execContext = arrow::compute::ExecContext(); + auto kernelContext = arrow::compute::KernelContext(&execContext); + const auto kernelOutputValueDesc = ARROW_RESULT(Kernel.signature->out_type().Resolve(&kernelContext, { + LeftValueDesc, + RightValueDesc + })); + Y_VERIFY_DEBUG(kernelOutputValueDesc == OutputValueDescr); + } + + Y_VERIFY_DEBUG( + LeftValueDesc.shape == arrow::ValueDescr::ARRAY && RightValueDesc.shape == arrow::ValueDescr::ARRAY || + LeftValueDesc.shape == arrow::ValueDescr::SCALAR && RightValueDesc.shape == arrow::ValueDescr::ARRAY || + LeftValueDesc.shape == arrow::ValueDescr::ARRAY && RightValueDesc.shape == arrow::ValueDescr::SCALAR); + } + + NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const { + const auto leftValue = LeftArg->GetValue(ctx); + const auto rightValue = RightArg->GetValue(ctx); + auto& leftDatum = TArrowBlock::From(leftValue).GetDatum(); + auto& rightDatum = TArrowBlock::From(rightValue).GetDatum(); + Y_VERIFY_DEBUG(leftDatum.descr() == LeftValueDesc); + Y_VERIFY_DEBUG(rightDatum.descr() == RightValueDesc); + const auto leftKind = leftDatum.kind(); + const auto rightKind = rightDatum.kind(); + MKQL_ENSURE(leftKind != arrow::Datum::ARRAY || rightKind != arrow::Datum::ARRAY || + leftDatum.array()->length == rightDatum.array()->length, + "block size mismatch: " + << static_cast<ui64>(leftDatum.array()->length) + << " != " + << static_cast<ui64>(rightDatum.array()->length)); + const auto blockLength = leftKind == arrow::Datum::ARRAY + ? leftDatum.array()->length + : rightDatum.array()->length; + + auto execContext = arrow::compute::ExecContext(&ctx.ArrowMemoryPool, nullptr, &FunctionRegistry); + auto kernelContext = arrow::compute::KernelContext(&execContext); + + arrow::Datum output = arrow::ArrayData::Make( + OutputValueDescr.type, + blockLength, + std::vector<std::shared_ptr<arrow::Buffer>> { + ARROW_RESULT(kernelContext.AllocateBitmap(blockLength)), + ARROW_RESULT(kernelContext.Allocate(arrow::BitUtil::BytesForBits(OutputTypeBitWidth * blockLength))) + }); + const auto inputBatch = arrow::compute::ExecBatch({leftDatum, rightDatum}, blockLength); + ARROW_OK(arrow::compute::detail::PropagateNulls(&kernelContext, inputBatch, output.array().get())); + ARROW_OK(Kernel.exec(&kernelContext, inputBatch, &output)); + return ctx.HolderFactory.CreateArrowBlock(std::move(output)); + } + +private: + void RegisterDependencies() const final { + this->DependsOn(LeftArg); + this->DependsOn(RightArg); + } + + static const arrow::compute::ScalarKernel& ResolveKernel(const arrow::ValueDescr& leftArg, + const arrow::ValueDescr& rightArg) + { + auto* functionRegistry = arrow::compute::GetFunctionRegistry(); + Y_VERIFY_DEBUG(functionRegistry != nullptr); + auto function = ARROW_RESULT(functionRegistry->GetFunction("add")); + Y_VERIFY_DEBUG(function != nullptr); + Y_VERIFY_DEBUG(function->kind() == arrow::compute::Function::SCALAR); + + const auto* kernel = ARROW_RESULT(function->DispatchExact({leftArg, rightArg})); + return *static_cast<const arrow::compute::ScalarKernel*>(kernel); + } + + static std::shared_ptr<arrow::DataType> ConvertType(TType* type) { + bool isOptional; + const auto dataType = UnpackOptionalData(type, isOptional); + switch (*dataType->GetDataSlot()) { + case NUdf::EDataSlot::Uint64: + return arrow::uint64(); + default: + Y_FAIL("unexpected type %s", TString(dataType->GetKindAsStr()).c_str()); + } + } + + static arrow::ValueDescr ToValueDescr(TType* type) { + auto* blockType = AS_TYPE(TBlockType, type); + const auto shape = blockType->GetShape() == TBlockType::EShape::Single + ? arrow::ValueDescr::SCALAR + : arrow::ValueDescr::ARRAY; + return arrow::ValueDescr(ConvertType(blockType->GetItemType()), shape); + } + +private: + IComputationNode* LeftArg; + IComputationNode* RightArg; + const arrow::ValueDescr LeftValueDesc; + const arrow::ValueDescr RightValueDesc; + const arrow::ValueDescr OutputValueDescr; + const arrow::compute::ScalarKernel& Kernel; + const int OutputTypeBitWidth; + arrow::compute::FunctionRegistry& FunctionRegistry; +}; + +} + +IComputationNode* WrapBlockAdd(TCallable& callable, const TComputationNodeFactoryContext& ctx) { + const auto* callableType = callable.GetType(); + return new TBlockAddWrapper(ctx.Mutables, + LocateNode(ctx.NodeLocator, callable, 0), + LocateNode(ctx.NodeLocator, callable, 1), + callableType->GetArgumentType(0), + callableType->GetArgumentType(1), + callableType->GetReturnType()); +} + +} +} diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_add.h b/ydb/library/yql/minikql/comp_nodes/mkql_block_add.h index 4508498634..bc4aa3b370 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_block_add.h +++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_add.h @@ -1,11 +1,11 @@ -#pragma once - +#pragma once + #include <ydb/library/yql/minikql/computation/mkql_computation_node.h> - -namespace NKikimr { -namespace NMiniKQL { - -IComputationNode* WrapBlockAdd(TCallable& callable, const TComputationNodeFactoryContext& ctx); - -} -} + +namespace NKikimr { +namespace NMiniKQL { + +IComputationNode* WrapBlockAdd(TCallable& callable, const TComputationNodeFactoryContext& ctx); + +} +} diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp index 0533362eac..b14c5002d2 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp @@ -1,304 +1,304 @@ -#include "mkql_blocks.h" - +#include "mkql_blocks.h" + #include <ydb/library/yql/minikql/arrow/arrow_defs.h> #include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h> #include <ydb/library/yql/minikql/mkql_node_builder.h> #include <ydb/library/yql/minikql/mkql_node_cast.h> - -#include <arrow/array/builder_primitive.h> -#include <arrow/util/bitmap.h> -#include <arrow/util/bit_util.h> - -#include <util/generic/size_literals.h> - -namespace NKikimr { -namespace NMiniKQL { - -namespace { - -class TBlockBuilder { -public: - explicit TBlockBuilder(TComputationContext& ctx) - : ItemType(arrow::uint64()) - , MaxLength_(MaxBlockSizeInBytes / TypeSize(*ItemType)) - , Ctx(ctx) - , Builder(&Ctx.ArrowMemoryPool) - { - ARROW_OK(Builder.Reserve(MaxLength_)); - } - - void Add(NUdf::TUnboxedValue& value) { - Y_VERIFY_DEBUG(Builder.length() < MaxLength_); - if (value) { - Builder.UnsafeAppend(value.Get<ui64>()); - } else { - Builder.UnsafeAppendNull(); - } - } - - inline size_t MaxLength() const noexcept { - return MaxLength_; - } - - NUdf::TUnboxedValuePod Build() { - std::shared_ptr<arrow::ArrayData> result; - ARROW_OK(Builder.FinishInternal(&result)); - return Ctx.HolderFactory.CreateArrowBlock(std::move(result)); - } - -private: - static int64_t TypeSize(arrow::DataType& itemType) { - const auto bits = static_cast<const arrow::FixedWidthType&>(itemType).bit_width(); - return arrow::BitUtil::BytesForBits(bits); - } - -private: - static constexpr size_t MaxBlockSizeInBytes = 1_MB; - -private: - std::shared_ptr<arrow::DataType> ItemType; - const size_t MaxLength_; - TComputationContext& Ctx; - arrow::UInt64Builder Builder; -}; - -class TToBlocksWrapper: public TStatelessFlowComputationNode<TToBlocksWrapper> { -public: - explicit TToBlocksWrapper(IComputationNode* flow) - : TStatelessFlowComputationNode(flow, EValueRepresentation::Boxed) - , Flow(flow) - { - } - - NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const { - auto builder = TBlockBuilder(ctx); - - for (size_t i = 0; i < builder.MaxLength(); ++i) { - auto result = Flow->GetValue(ctx); - if (result.IsFinish() || result.IsYield()) { - if (i == 0) { - return result.Release(); - } - break; - } - builder.Add(result); - } - - return builder.Build(); - } - -private: - void RegisterDependencies() const final { - FlowDependsOn(Flow); - } - -private: - IComputationNode* const Flow; -}; - -class TWideToBlocksWrapper : public TStatefulWideFlowComputationNode<TWideToBlocksWrapper> { -public: - TWideToBlocksWrapper(TComputationMutables& mutables, - IComputationWideFlowNode* flow, - size_t width) - : TStatefulWideFlowComputationNode(mutables, flow, EValueRepresentation::Embedded) - , Flow(flow) - , Width(width) - { - Y_VERIFY_DEBUG(Width > 0); - } - - EFetchResult DoCalculate(NUdf::TUnboxedValue& state, - TComputationContext& ctx, - NUdf::TUnboxedValue*const* output) const - { - auto builders = std::vector<TBlockBuilder>(); - builders.reserve(Width); - for (size_t i = 0; i < Width; ++i) { - builders.push_back(TBlockBuilder(ctx)); - } - size_t maxLength = builders.front().MaxLength(); - for (size_t i = 1; i < Width; ++i) { - maxLength = Min(maxLength, builders[i].MaxLength()); - } - - auto& s = GetState(state, ctx); - for (size_t i = 0; i < maxLength; ++i) { - if (const auto result = Flow->FetchValues(ctx, s.ValuePointers.data()); EFetchResult::One != result) { - if (i == 0) { - return result; - } - break; - } - for (size_t j = 0; j < Width; ++j) { - if (output[j] != nullptr) { - builders[j].Add(s.Values[j]); - } - } - } - - for (size_t i = 0; i < Width; ++i) { - if (auto* out = output[i]; out != nullptr) { - *out = builders[i].Build(); - } - } - - return EFetchResult::One; - } - -private: - struct TState: public TComputationValue<TState> { - std::vector<NUdf::TUnboxedValue> Values; - std::vector<NUdf::TUnboxedValue*> ValuePointers; - - TState(TMemoryUsageInfo* memInfo, size_t width) - : TComputationValue(memInfo) - , Values(width) - , ValuePointers(width) - { - for (size_t i = 0; i < width; ++i) { - ValuePointers[i] = &Values[i]; - } - } - }; - -private: - void RegisterDependencies() const final { - FlowDependsOn(Flow); - } - - TState& GetState(NUdf::TUnboxedValue& state, TComputationContext& ctx) const { - if (!state.HasValue()) { - state = ctx.HolderFactory.Create<TState>(Width); - } - return *static_cast<TState*>(state.AsBoxed().Get()); - } - -private: - IComputationWideFlowNode* Flow; - const size_t Width; -}; - -class TFromBlocksWrapper : public TMutableComputationNode<TFromBlocksWrapper> { -public: - TFromBlocksWrapper(TComputationMutables& mutables, IComputationNode* flow) - : TMutableComputationNode(mutables) - , Flow(flow) - , StateIndex(mutables.CurValueIndex++) - { - } - - NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const { - auto& state = GetState(ctx); - - if (state.Array == nullptr || state.Index == state.Array->length) { - auto result = Flow->GetValue(ctx); - if (result.IsFinish()) { - return NUdf::TUnboxedValue::MakeFinish(); - } - if (result.IsYield()) { - return NUdf::TUnboxedValue::MakeYield(); - } - state.Array = TArrowBlock::From(result).GetDatum().array(); - state.Index = 0; - } - - const auto result = state.GetValue(); - ++state.Index; - return result; - } - -private: - struct TState: public TComputationValue<TState> { - using TComputationValue::TComputationValue; - - NUdf::TUnboxedValuePod GetValue() const { - const auto nullCount = Array->GetNullCount(); - - return nullCount == Array->length || (nullCount > 0 && !HasValue()) - ? NUdf::TUnboxedValuePod() - : DoGetValue(); - } - - private: - NUdf::TUnboxedValuePod DoGetValue() const { - return NUdf::TUnboxedValuePod(Array->GetValues<ui64>(1)[Index]); - } - - bool HasValue() const { - return arrow::BitUtil::GetBit(Array->GetValues<uint8_t>(0), Index + Array->offset); - } - - public: - std::shared_ptr<arrow::ArrayData> Array{nullptr}; - size_t Index{0}; - }; - -private: - void RegisterDependencies() const final { - this->DependsOn(Flow); - } - - TState& GetState(TComputationContext& ctx) const { - auto& result = ctx.MutableValues[StateIndex]; - if (!result.HasValue()) { - result = ctx.HolderFactory.Create<TState>(); - } - return *static_cast<TState*>(result.AsBoxed().Get()); - } - -private: - IComputationNode* const Flow; - const ui32 StateIndex; -}; - -arrow::Datum ExtractLiteral(TRuntimeNode n) { - if (n.GetStaticType()->IsOptional()) { - const auto* dataLiteral = AS_VALUE(TOptionalLiteral, n); - if (!dataLiteral->HasItem()) { - return arrow::MakeNullScalar(arrow::uint64()); - } - n = dataLiteral->GetItem(); - } - - const auto* dataLiteral = AS_VALUE(TDataLiteral, n); - return arrow::Datum(static_cast<uint64_t>(dataLiteral->AsValue().Get<ui64>())); -} - -} - -IComputationNode* WrapToBlocks(TCallable& callable, const TComputationNodeFactoryContext& ctx) { - MKQL_ENSURE(callable.GetInputsCount() == 1, "Expected 1 args, got " << callable.GetInputsCount()); - - return new TToBlocksWrapper(LocateNode(ctx.NodeLocator, callable, 0)); -} - -IComputationNode* WrapWideToBlocks(TCallable& callable, const TComputationNodeFactoryContext& ctx) { - MKQL_ENSURE(callable.GetInputsCount() == 1, "Expected 1 args, got " << callable.GetInputsCount()); - - const auto* flowType = AS_TYPE(TFlowType, callable.GetInput(0).GetStaticType()); - const auto* tupleType = AS_TYPE(TTupleType, flowType->GetItemType()); - - auto* wideFlow = dynamic_cast<IComputationWideFlowNode*>(LocateNode(ctx.NodeLocator, callable, 0)); - MKQL_ENSURE(wideFlow != nullptr, "Expected wide flow node"); - - return new TWideToBlocksWrapper(ctx.Mutables, - wideFlow, - tupleType->GetElementsCount()); -} - -IComputationNode* WrapFromBlocks(TCallable& callable, const TComputationNodeFactoryContext& ctx) { - MKQL_ENSURE(callable.GetInputsCount() == 1, "Expected 1 args, got " << callable.GetInputsCount()); - - return new TFromBlocksWrapper(ctx.Mutables, LocateNode(ctx.NodeLocator, callable, 0)); -} - -IComputationNode* WrapAsSingle(TCallable& callable, const TComputationNodeFactoryContext& ctx) { - MKQL_ENSURE(callable.GetInputsCount() == 1, "Expected 1 args, got " << callable.GetInputsCount()); - - auto value = ExtractLiteral(callable.GetInput(0U)); - return ctx.NodeFactory.CreateImmutableNode(ctx.HolderFactory.CreateArrowBlock(std::move(value))); -} - -} -} + +#include <arrow/array/builder_primitive.h> +#include <arrow/util/bitmap.h> +#include <arrow/util/bit_util.h> + +#include <util/generic/size_literals.h> + +namespace NKikimr { +namespace NMiniKQL { + +namespace { + +class TBlockBuilder { +public: + explicit TBlockBuilder(TComputationContext& ctx) + : ItemType(arrow::uint64()) + , MaxLength_(MaxBlockSizeInBytes / TypeSize(*ItemType)) + , Ctx(ctx) + , Builder(&Ctx.ArrowMemoryPool) + { + ARROW_OK(Builder.Reserve(MaxLength_)); + } + + void Add(NUdf::TUnboxedValue& value) { + Y_VERIFY_DEBUG(Builder.length() < MaxLength_); + if (value) { + Builder.UnsafeAppend(value.Get<ui64>()); + } else { + Builder.UnsafeAppendNull(); + } + } + + inline size_t MaxLength() const noexcept { + return MaxLength_; + } + + NUdf::TUnboxedValuePod Build() { + std::shared_ptr<arrow::ArrayData> result; + ARROW_OK(Builder.FinishInternal(&result)); + return Ctx.HolderFactory.CreateArrowBlock(std::move(result)); + } + +private: + static int64_t TypeSize(arrow::DataType& itemType) { + const auto bits = static_cast<const arrow::FixedWidthType&>(itemType).bit_width(); + return arrow::BitUtil::BytesForBits(bits); + } + +private: + static constexpr size_t MaxBlockSizeInBytes = 1_MB; + +private: + std::shared_ptr<arrow::DataType> ItemType; + const size_t MaxLength_; + TComputationContext& Ctx; + arrow::UInt64Builder Builder; +}; + +class TToBlocksWrapper: public TStatelessFlowComputationNode<TToBlocksWrapper> { +public: + explicit TToBlocksWrapper(IComputationNode* flow) + : TStatelessFlowComputationNode(flow, EValueRepresentation::Boxed) + , Flow(flow) + { + } + + NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const { + auto builder = TBlockBuilder(ctx); + + for (size_t i = 0; i < builder.MaxLength(); ++i) { + auto result = Flow->GetValue(ctx); + if (result.IsFinish() || result.IsYield()) { + if (i == 0) { + return result.Release(); + } + break; + } + builder.Add(result); + } + + return builder.Build(); + } + +private: + void RegisterDependencies() const final { + FlowDependsOn(Flow); + } + +private: + IComputationNode* const Flow; +}; + +class TWideToBlocksWrapper : public TStatefulWideFlowComputationNode<TWideToBlocksWrapper> { +public: + TWideToBlocksWrapper(TComputationMutables& mutables, + IComputationWideFlowNode* flow, + size_t width) + : TStatefulWideFlowComputationNode(mutables, flow, EValueRepresentation::Embedded) + , Flow(flow) + , Width(width) + { + Y_VERIFY_DEBUG(Width > 0); + } + + EFetchResult DoCalculate(NUdf::TUnboxedValue& state, + TComputationContext& ctx, + NUdf::TUnboxedValue*const* output) const + { + auto builders = std::vector<TBlockBuilder>(); + builders.reserve(Width); + for (size_t i = 0; i < Width; ++i) { + builders.push_back(TBlockBuilder(ctx)); + } + size_t maxLength = builders.front().MaxLength(); + for (size_t i = 1; i < Width; ++i) { + maxLength = Min(maxLength, builders[i].MaxLength()); + } + + auto& s = GetState(state, ctx); + for (size_t i = 0; i < maxLength; ++i) { + if (const auto result = Flow->FetchValues(ctx, s.ValuePointers.data()); EFetchResult::One != result) { + if (i == 0) { + return result; + } + break; + } + for (size_t j = 0; j < Width; ++j) { + if (output[j] != nullptr) { + builders[j].Add(s.Values[j]); + } + } + } + + for (size_t i = 0; i < Width; ++i) { + if (auto* out = output[i]; out != nullptr) { + *out = builders[i].Build(); + } + } + + return EFetchResult::One; + } + +private: + struct TState: public TComputationValue<TState> { + std::vector<NUdf::TUnboxedValue> Values; + std::vector<NUdf::TUnboxedValue*> ValuePointers; + + TState(TMemoryUsageInfo* memInfo, size_t width) + : TComputationValue(memInfo) + , Values(width) + , ValuePointers(width) + { + for (size_t i = 0; i < width; ++i) { + ValuePointers[i] = &Values[i]; + } + } + }; + +private: + void RegisterDependencies() const final { + FlowDependsOn(Flow); + } + + TState& GetState(NUdf::TUnboxedValue& state, TComputationContext& ctx) const { + if (!state.HasValue()) { + state = ctx.HolderFactory.Create<TState>(Width); + } + return *static_cast<TState*>(state.AsBoxed().Get()); + } + +private: + IComputationWideFlowNode* Flow; + const size_t Width; +}; + +class TFromBlocksWrapper : public TMutableComputationNode<TFromBlocksWrapper> { +public: + TFromBlocksWrapper(TComputationMutables& mutables, IComputationNode* flow) + : TMutableComputationNode(mutables) + , Flow(flow) + , StateIndex(mutables.CurValueIndex++) + { + } + + NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const { + auto& state = GetState(ctx); + + if (state.Array == nullptr || state.Index == state.Array->length) { + auto result = Flow->GetValue(ctx); + if (result.IsFinish()) { + return NUdf::TUnboxedValue::MakeFinish(); + } + if (result.IsYield()) { + return NUdf::TUnboxedValue::MakeYield(); + } + state.Array = TArrowBlock::From(result).GetDatum().array(); + state.Index = 0; + } + + const auto result = state.GetValue(); + ++state.Index; + return result; + } + +private: + struct TState: public TComputationValue<TState> { + using TComputationValue::TComputationValue; + + NUdf::TUnboxedValuePod GetValue() const { + const auto nullCount = Array->GetNullCount(); + + return nullCount == Array->length || (nullCount > 0 && !HasValue()) + ? NUdf::TUnboxedValuePod() + : DoGetValue(); + } + + private: + NUdf::TUnboxedValuePod DoGetValue() const { + return NUdf::TUnboxedValuePod(Array->GetValues<ui64>(1)[Index]); + } + + bool HasValue() const { + return arrow::BitUtil::GetBit(Array->GetValues<uint8_t>(0), Index + Array->offset); + } + + public: + std::shared_ptr<arrow::ArrayData> Array{nullptr}; + size_t Index{0}; + }; + +private: + void RegisterDependencies() const final { + this->DependsOn(Flow); + } + + TState& GetState(TComputationContext& ctx) const { + auto& result = ctx.MutableValues[StateIndex]; + if (!result.HasValue()) { + result = ctx.HolderFactory.Create<TState>(); + } + return *static_cast<TState*>(result.AsBoxed().Get()); + } + +private: + IComputationNode* const Flow; + const ui32 StateIndex; +}; + +arrow::Datum ExtractLiteral(TRuntimeNode n) { + if (n.GetStaticType()->IsOptional()) { + const auto* dataLiteral = AS_VALUE(TOptionalLiteral, n); + if (!dataLiteral->HasItem()) { + return arrow::MakeNullScalar(arrow::uint64()); + } + n = dataLiteral->GetItem(); + } + + const auto* dataLiteral = AS_VALUE(TDataLiteral, n); + return arrow::Datum(static_cast<uint64_t>(dataLiteral->AsValue().Get<ui64>())); +} + +} + +IComputationNode* WrapToBlocks(TCallable& callable, const TComputationNodeFactoryContext& ctx) { + MKQL_ENSURE(callable.GetInputsCount() == 1, "Expected 1 args, got " << callable.GetInputsCount()); + + return new TToBlocksWrapper(LocateNode(ctx.NodeLocator, callable, 0)); +} + +IComputationNode* WrapWideToBlocks(TCallable& callable, const TComputationNodeFactoryContext& ctx) { + MKQL_ENSURE(callable.GetInputsCount() == 1, "Expected 1 args, got " << callable.GetInputsCount()); + + const auto* flowType = AS_TYPE(TFlowType, callable.GetInput(0).GetStaticType()); + const auto* tupleType = AS_TYPE(TTupleType, flowType->GetItemType()); + + auto* wideFlow = dynamic_cast<IComputationWideFlowNode*>(LocateNode(ctx.NodeLocator, callable, 0)); + MKQL_ENSURE(wideFlow != nullptr, "Expected wide flow node"); + + return new TWideToBlocksWrapper(ctx.Mutables, + wideFlow, + tupleType->GetElementsCount()); +} + +IComputationNode* WrapFromBlocks(TCallable& callable, const TComputationNodeFactoryContext& ctx) { + MKQL_ENSURE(callable.GetInputsCount() == 1, "Expected 1 args, got " << callable.GetInputsCount()); + + return new TFromBlocksWrapper(ctx.Mutables, LocateNode(ctx.NodeLocator, callable, 0)); +} + +IComputationNode* WrapAsSingle(TCallable& callable, const TComputationNodeFactoryContext& ctx) { + MKQL_ENSURE(callable.GetInputsCount() == 1, "Expected 1 args, got " << callable.GetInputsCount()); + + auto value = ExtractLiteral(callable.GetInput(0U)); + return ctx.NodeFactory.CreateImmutableNode(ctx.HolderFactory.CreateArrowBlock(std::move(value))); +} + +} +} diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_blocks.h b/ydb/library/yql/minikql/comp_nodes/mkql_blocks.h index d0dae21cf7..c13dd5339a 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_blocks.h +++ b/ydb/library/yql/minikql/comp_nodes/mkql_blocks.h @@ -1,14 +1,14 @@ -#pragma once - +#pragma once + #include <ydb/library/yql/minikql/computation/mkql_computation_node.h> - -namespace NKikimr { -namespace NMiniKQL { - -IComputationNode* WrapToBlocks(TCallable& callable, const TComputationNodeFactoryContext& ctx); -IComputationNode* WrapWideToBlocks(TCallable& callable, const TComputationNodeFactoryContext& ctx); -IComputationNode* WrapFromBlocks(TCallable& callable, const TComputationNodeFactoryContext& ctx); -IComputationNode* WrapAsSingle(TCallable& callable, const TComputationNodeFactoryContext& ctx); - -} -} + +namespace NKikimr { +namespace NMiniKQL { + +IComputationNode* WrapToBlocks(TCallable& callable, const TComputationNodeFactoryContext& ctx); +IComputationNode* WrapWideToBlocks(TCallable& callable, const TComputationNodeFactoryContext& ctx); +IComputationNode* WrapFromBlocks(TCallable& callable, const TComputationNodeFactoryContext& ctx); +IComputationNode* WrapAsSingle(TCallable& callable, const TComputationNodeFactoryContext& ctx); + +} +} diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_factory.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_factory.cpp index f142ac5ff4..57599acb64 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_factory.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_factory.cpp @@ -4,8 +4,8 @@ #include "mkql_aggrcount.h" #include "mkql_append.h" #include "mkql_apply.h" -#include "mkql_block_add.h" -#include "mkql_blocks.h" +#include "mkql_block_add.h" +#include "mkql_blocks.h" #include "mkql_callable.h" #include "mkql_chain_map.h" #include "mkql_chain1_map.h" @@ -257,11 +257,11 @@ struct TCallableComputationNodeBuilderFuncMapFiller { {"DecimalMul", &WrapDecimalMul}, {"ToFlow", &WrapToFlow}, {"FromFlow", &WrapFromFlow}, - {"ToBlocks", &WrapToBlocks}, - {"WideToBlocks", &WrapWideToBlocks}, - {"BlockAdd", &WrapBlockAdd}, - {"FromBlocks", &WrapFromBlocks}, - {"AsSingle", &WrapAsSingle}, + {"ToBlocks", &WrapToBlocks}, + {"WideToBlocks", &WrapWideToBlocks}, + {"BlockAdd", &WrapBlockAdd}, + {"FromBlocks", &WrapFromBlocks}, + {"AsSingle", &WrapAsSingle}, {"MakeHeap", &WrapMakeHeap}, {"PushHeap", &WrapPushHeap}, {"PopHeap", &WrapPopHeap}, diff --git a/ydb/library/yql/minikql/comp_nodes/ut/mkql_blocks_ut.cpp b/ydb/library/yql/minikql/comp_nodes/ut/mkql_blocks_ut.cpp index 7be7f89d2f..2fd575f239 100644 --- a/ydb/library/yql/minikql/comp_nodes/ut/mkql_blocks_ut.cpp +++ b/ydb/library/yql/minikql/comp_nodes/ut/mkql_blocks_ut.cpp @@ -1,323 +1,323 @@ -#include "mkql_computation_node_ut.h" - +#include "mkql_computation_node_ut.h" + #include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h> - -namespace NKikimr { -namespace NMiniKQL { -Y_UNIT_TEST_SUITE(TMiniKQLBlocksTest) { -Y_UNIT_TEST(TestEmpty) { - TSetup<false> setup; - auto& pb = *setup.PgmBuilder; - - const auto type = pb.NewDataType(NUdf::TDataType<ui64>::Id); - const auto list = pb.NewEmptyList(type); - const auto sourceFlow = pb.ToFlow(list); - const auto flowAfterBlocks = pb.FromBlocks(pb.ToBlocks(sourceFlow)); - const auto pgmReturn = pb.ForwardList(flowAfterBlocks); - - const auto graph = setup.BuildGraph(pgmReturn); - const auto iterator = graph->GetValue().GetListIterator(); - NUdf::TUnboxedValue item; - UNIT_ASSERT(!iterator.Next(item)); -} - -Y_UNIT_TEST(TestSimple) { - static const size_t dataCount = 1000; - TSetup<false> setup; - auto& pb = *setup.PgmBuilder; - - auto data = TVector<TRuntimeNode>(Reserve(dataCount)); - for (size_t i = 0; i < dataCount; ++i) { - data.push_back(pb.NewDataLiteral<ui64>(i)); - } - const auto type = pb.NewDataType(NUdf::TDataType<ui64>::Id); - const auto list = pb.NewList(type, data); - const auto sourceFlow = pb.ToFlow(list); - const auto flowAfterBlocks = pb.FromBlocks(pb.ToBlocks(sourceFlow)); - const auto pgmReturn = pb.ForwardList(flowAfterBlocks); - - const auto graph = setup.BuildGraph(pgmReturn); - const auto iterator = graph->GetValue().GetListIterator(); - - for (size_t i = 0; i < dataCount; ++i) { - NUdf::TUnboxedValue item; - UNIT_ASSERT(iterator.Next(item)); - UNIT_ASSERT_VALUES_EQUAL(item.Get<ui64>(), i); - } - NUdf::TUnboxedValue item; - UNIT_ASSERT(!iterator.Next(item)); -} - -Y_UNIT_TEST(TestWideToBlocks) { - TSetup<false> setup; - auto& pb = *setup.PgmBuilder; - - const auto ui64Type = pb.NewDataType(NUdf::TDataType<ui64>::Id); - const auto tupleType = pb.NewTupleType({ui64Type, ui64Type}); - - const auto data1 = pb.NewTuple(tupleType, {pb.NewDataLiteral<ui64>(1), pb.NewDataLiteral<ui64>(10)}); - const auto data2 = pb.NewTuple(tupleType, {pb.NewDataLiteral<ui64>(2), pb.NewDataLiteral<ui64>(20)}); - const auto data3 = pb.NewTuple(tupleType, {pb.NewDataLiteral<ui64>(3), pb.NewDataLiteral<ui64>(30)}); - - const auto list = pb.NewList(tupleType, {data1, data2, data3}); - const auto flow = pb.ToFlow(list); - - const auto wideFlow = pb.ExpandMap(flow, [&](TRuntimeNode item) -> TRuntimeNode::TList { - return {pb.Nth(item, 0U), pb.Nth(item, 1U)}; - }); - const auto wideBlocksFlow = pb.WideToBlocks(wideFlow); - const auto narrowBlocksFlow = pb.NarrowMap(wideBlocksFlow, [&](TRuntimeNode::TList items) -> TRuntimeNode { - return items[1]; - }); - const auto narrowFlow = pb.FromBlocks(narrowBlocksFlow); - const auto pgmReturn = pb.ForwardList(narrowFlow); - - const auto graph = setup.BuildGraph(pgmReturn); - const auto iterator = graph->GetValue().GetListIterator(); - - NUdf::TUnboxedValue item; - UNIT_ASSERT(iterator.Next(item)); - UNIT_ASSERT_VALUES_EQUAL(item.Get<ui64>(), 10); - - UNIT_ASSERT(iterator.Next(item)); - UNIT_ASSERT_VALUES_EQUAL(item.Get<ui64>(), 20); - - UNIT_ASSERT(iterator.Next(item)); - UNIT_ASSERT_VALUES_EQUAL(item.Get<ui64>(), 30); -} - -Y_UNIT_TEST(TestSingle) { - const ui64 testValue = 42; - - TSetup<false> setup; - auto& pb = *setup.PgmBuilder; - - auto dataLiteral = pb.NewDataLiteral<ui64>(testValue); - const auto dataAfterBlocks = pb.AsSingle(dataLiteral); - - const auto graph = setup.BuildGraph(dataAfterBlocks); - const auto value = graph->GetValue(); - UNIT_ASSERT(value.HasValue() && value.IsBoxed()); - UNIT_ASSERT_VALUES_EQUAL(TArrowBlock::From(value).GetDatum().scalar_as<arrow::UInt64Scalar>().value, testValue); -} - -Y_UNIT_TEST(TestBlockAdd) { - TSetup<false> setup; - TProgramBuilder& pb = *setup.PgmBuilder; - - const auto ui64Type = pb.NewDataType(NUdf::TDataType<ui64>::Id); - const auto tupleType = pb.NewTupleType({ui64Type, ui64Type}); - - const auto data1 = pb.NewTuple(tupleType, {pb.NewDataLiteral<ui64>(1), pb.NewDataLiteral<ui64>(10)}); - const auto data2 = pb.NewTuple(tupleType, {pb.NewDataLiteral<ui64>(2), pb.NewDataLiteral<ui64>(20)}); - const auto data3 = pb.NewTuple(tupleType, {pb.NewDataLiteral<ui64>(3), pb.NewDataLiteral<ui64>(30)}); - - const auto list = pb.NewList(tupleType, {data1, data2, data3}); - const auto flow = pb.ToFlow(list); - - const auto wideFlow = pb.ExpandMap(flow, [&](TRuntimeNode item) -> TRuntimeNode::TList { - return {pb.Nth(item, 0U), pb.Nth(item, 1U)}; - }); - const auto wideBlocksFlow = pb.WideToBlocks(wideFlow); - const auto sumWideFlow = pb.WideMap(wideBlocksFlow, [&](TRuntimeNode::TList items) -> TRuntimeNode::TList { - return {pb.BlockAdd(items[0], items[1])}; - }); - const auto sumNarrowFlow = pb.NarrowMap(sumWideFlow, [&](TRuntimeNode::TList items) -> TRuntimeNode { - return items[0]; - }); - const auto pgmReturn = pb.Collect(pb.FromBlocks(sumNarrowFlow)); - - const auto graph = setup.BuildGraph(pgmReturn); - const auto iterator = graph->GetValue().GetListIterator(); - - NUdf::TUnboxedValue item; - UNIT_ASSERT(iterator.Next(item)); - UNIT_ASSERT_VALUES_EQUAL(item.Get<ui64>(), 11); - - UNIT_ASSERT(iterator.Next(item)); - UNIT_ASSERT_VALUES_EQUAL(item.Get<ui64>(), 22); - - UNIT_ASSERT(iterator.Next(item)); - UNIT_ASSERT_VALUES_EQUAL(item.Get<ui64>(), 33); - UNIT_ASSERT(!iterator.Next(item)); - UNIT_ASSERT(!iterator.Next(item)); -} - -Y_UNIT_TEST(TestBlockAddWithNullables) { - TSetup<false> setup; - TProgramBuilder& pb = *setup.PgmBuilder; - - const auto optionalUi64Type = pb.NewDataType(NUdf::TDataType<ui64>::Id, true); - const auto tupleType = pb.NewTupleType({optionalUi64Type, optionalUi64Type}); - const auto emptyOptionalUi64 = pb.NewEmptyOptional(optionalUi64Type); - - const auto data1 = pb.NewTuple(tupleType, { - pb.NewOptional(pb.NewDataLiteral<ui64>(1)), - emptyOptionalUi64 - }); - const auto data2 = pb.NewTuple(tupleType, { - emptyOptionalUi64, - pb.NewOptional(pb.NewDataLiteral<ui64>(20)) - }); - const auto data3 = pb.NewTuple(tupleType, { - emptyOptionalUi64, - emptyOptionalUi64 - }); - const auto data4 = pb.NewTuple(tupleType, { - pb.NewOptional(pb.NewDataLiteral<ui64>(10)), - pb.NewOptional(pb.NewDataLiteral<ui64>(20)) - }); - - const auto list = pb.NewList(tupleType, {data1, data2, data3, data4}); - const auto flow = pb.ToFlow(list); - - const auto wideFlow = pb.ExpandMap(flow, [&](TRuntimeNode item) -> TRuntimeNode::TList { - return {pb.Nth(item, 0U), pb.Nth(item, 1U)}; - }); - const auto wideBlocksFlow = pb.WideToBlocks(wideFlow); - const auto sumWideFlow = pb.WideMap(wideBlocksFlow, [&](TRuntimeNode::TList items) -> TRuntimeNode::TList { - return {pb.BlockAdd(items[0], items[1])}; - }); - const auto sumNarrowFlow = pb.NarrowMap(sumWideFlow, [&](TRuntimeNode::TList items) -> TRuntimeNode { - return items[0]; - }); - const auto pgmReturn = pb.Collect(pb.FromBlocks(sumNarrowFlow)); - - const auto graph = setup.BuildGraph(pgmReturn); - const auto iterator = graph->GetValue().GetListIterator(); - - NUdf::TUnboxedValue item; - UNIT_ASSERT(iterator.Next(item)); - UNIT_ASSERT(!item); - - UNIT_ASSERT(iterator.Next(item)); - UNIT_ASSERT(!item); - - UNIT_ASSERT(iterator.Next(item)); - UNIT_ASSERT(!item); - - UNIT_ASSERT(iterator.Next(item)); - UNIT_ASSERT_VALUES_EQUAL(item.Get<ui64>(), 30); - UNIT_ASSERT(!iterator.Next(item)); -} - -Y_UNIT_TEST(TestBlockAddWithNullableSingle) { - TSetup<false> setup; - TProgramBuilder& pb = *setup.PgmBuilder; - - const auto optionalUi64Type = pb.NewDataType(NUdf::TDataType<ui64>::Id, true); - const auto emptyOptionalUi64 = pb.NewEmptyOptional(optionalUi64Type); - - const auto list = pb.NewList(optionalUi64Type, { - pb.NewOptional(pb.NewDataLiteral<ui64>(10)), - pb.NewOptional(pb.NewDataLiteral<ui64>(20)), - pb.NewOptional(pb.NewDataLiteral<ui64>(30)) - }); - const auto flow = pb.ToFlow(list); - const auto blocksFlow = pb.ToBlocks(flow); - - THolder<IComputationGraph> graph; - auto map = [&](const TProgramBuilder::TUnaryLambda& func) { - const auto pgmReturn = pb.Collect(pb.FromBlocks(pb.Map(blocksFlow, func))); - graph = setup.BuildGraph(pgmReturn); - return graph->GetValue().GetListIterator(); - }; - - { - const auto single = pb.AsSingle(emptyOptionalUi64); - auto iterator = map([&](TRuntimeNode item) -> TRuntimeNode { - return {pb.BlockAdd(single, item)}; - }); - - NUdf::TUnboxedValue item; - UNIT_ASSERT(iterator.Next(item)); - UNIT_ASSERT(!item); - - UNIT_ASSERT(iterator.Next(item)); - UNIT_ASSERT(!item); - - UNIT_ASSERT(iterator.Next(item)); - UNIT_ASSERT(!item); - - UNIT_ASSERT(!iterator.Next(item)); - } - - { - const auto single = pb.AsSingle(emptyOptionalUi64); - auto iterator = map([&](TRuntimeNode item) -> TRuntimeNode { - return {pb.BlockAdd(item, single)}; - }); - - NUdf::TUnboxedValue item; - UNIT_ASSERT(iterator.Next(item)); - UNIT_ASSERT(!item); - - UNIT_ASSERT(iterator.Next(item)); - UNIT_ASSERT(!item); - - UNIT_ASSERT(iterator.Next(item)); - UNIT_ASSERT(!item); - - UNIT_ASSERT(!iterator.Next(item)); - } - - { - const auto single = pb.AsSingle(pb.NewDataLiteral<ui64>(100)); - auto iterator = map([&](TRuntimeNode item) -> TRuntimeNode { - return {pb.BlockAdd(item, single)}; - }); - - NUdf::TUnboxedValue item; - UNIT_ASSERT(iterator.Next(item)); - UNIT_ASSERT_VALUES_EQUAL(item.Get<ui64>(), 110); - - UNIT_ASSERT(iterator.Next(item)); - UNIT_ASSERT_VALUES_EQUAL(item.Get<ui64>(), 120); - - UNIT_ASSERT(iterator.Next(item)); - UNIT_ASSERT_VALUES_EQUAL(item.Get<ui64>(), 130); - - UNIT_ASSERT(!iterator.Next(item)); - } -} - -Y_UNIT_TEST(TestBlockAddWithSingle) { - TSetup<false> setup; - TProgramBuilder& pb = *setup.PgmBuilder; - - const auto ui64Type = pb.NewDataType(NUdf::TDataType<ui64>::Id); - - const auto data1 = pb.NewDataLiteral<ui64>(10); - const auto data2 = pb.NewDataLiteral<ui64>(20); - const auto data3 = pb.NewDataLiteral<ui64>(30); - const auto rightSingle = pb.AsSingle(pb.NewDataLiteral<ui64>(100)); - const auto leftSingle = pb.AsSingle(pb.NewDataLiteral<ui64>(1000)); - - const auto list = pb.NewList(ui64Type, {data1, data2, data3}); - const auto flow = pb.ToFlow(list); - const auto blocksFlow = pb.ToBlocks(flow); - const auto sumBlocksFlow = pb.Map(blocksFlow, [&](TRuntimeNode item) -> TRuntimeNode { - return {pb.BlockAdd(leftSingle, {pb.BlockAdd(item, rightSingle )})}; - }); - const auto pgmReturn = pb.Collect(pb.FromBlocks(sumBlocksFlow)); - - const auto graph = setup.BuildGraph(pgmReturn); - const auto iterator = graph->GetValue().GetListIterator(); - - NUdf::TUnboxedValue item; - UNIT_ASSERT(iterator.Next(item)); - UNIT_ASSERT_VALUES_EQUAL(item.Get<ui64>(), 1110); - - UNIT_ASSERT(iterator.Next(item)); - UNIT_ASSERT_VALUES_EQUAL(item.Get<ui64>(), 1120); - - UNIT_ASSERT(iterator.Next(item)); - UNIT_ASSERT_VALUES_EQUAL(item.Get<ui64>(), 1130); - UNIT_ASSERT(!iterator.Next(item)); - UNIT_ASSERT(!iterator.Next(item)); -} - -} - -} -} + +namespace NKikimr { +namespace NMiniKQL { +Y_UNIT_TEST_SUITE(TMiniKQLBlocksTest) { +Y_UNIT_TEST(TestEmpty) { + TSetup<false> setup; + auto& pb = *setup.PgmBuilder; + + const auto type = pb.NewDataType(NUdf::TDataType<ui64>::Id); + const auto list = pb.NewEmptyList(type); + const auto sourceFlow = pb.ToFlow(list); + const auto flowAfterBlocks = pb.FromBlocks(pb.ToBlocks(sourceFlow)); + const auto pgmReturn = pb.ForwardList(flowAfterBlocks); + + const auto graph = setup.BuildGraph(pgmReturn); + const auto iterator = graph->GetValue().GetListIterator(); + NUdf::TUnboxedValue item; + UNIT_ASSERT(!iterator.Next(item)); +} + +Y_UNIT_TEST(TestSimple) { + static const size_t dataCount = 1000; + TSetup<false> setup; + auto& pb = *setup.PgmBuilder; + + auto data = TVector<TRuntimeNode>(Reserve(dataCount)); + for (size_t i = 0; i < dataCount; ++i) { + data.push_back(pb.NewDataLiteral<ui64>(i)); + } + const auto type = pb.NewDataType(NUdf::TDataType<ui64>::Id); + const auto list = pb.NewList(type, data); + const auto sourceFlow = pb.ToFlow(list); + const auto flowAfterBlocks = pb.FromBlocks(pb.ToBlocks(sourceFlow)); + const auto pgmReturn = pb.ForwardList(flowAfterBlocks); + + const auto graph = setup.BuildGraph(pgmReturn); + const auto iterator = graph->GetValue().GetListIterator(); + + for (size_t i = 0; i < dataCount; ++i) { + NUdf::TUnboxedValue item; + UNIT_ASSERT(iterator.Next(item)); + UNIT_ASSERT_VALUES_EQUAL(item.Get<ui64>(), i); + } + NUdf::TUnboxedValue item; + UNIT_ASSERT(!iterator.Next(item)); +} + +Y_UNIT_TEST(TestWideToBlocks) { + TSetup<false> setup; + auto& pb = *setup.PgmBuilder; + + const auto ui64Type = pb.NewDataType(NUdf::TDataType<ui64>::Id); + const auto tupleType = pb.NewTupleType({ui64Type, ui64Type}); + + const auto data1 = pb.NewTuple(tupleType, {pb.NewDataLiteral<ui64>(1), pb.NewDataLiteral<ui64>(10)}); + const auto data2 = pb.NewTuple(tupleType, {pb.NewDataLiteral<ui64>(2), pb.NewDataLiteral<ui64>(20)}); + const auto data3 = pb.NewTuple(tupleType, {pb.NewDataLiteral<ui64>(3), pb.NewDataLiteral<ui64>(30)}); + + const auto list = pb.NewList(tupleType, {data1, data2, data3}); + const auto flow = pb.ToFlow(list); + + const auto wideFlow = pb.ExpandMap(flow, [&](TRuntimeNode item) -> TRuntimeNode::TList { + return {pb.Nth(item, 0U), pb.Nth(item, 1U)}; + }); + const auto wideBlocksFlow = pb.WideToBlocks(wideFlow); + const auto narrowBlocksFlow = pb.NarrowMap(wideBlocksFlow, [&](TRuntimeNode::TList items) -> TRuntimeNode { + return items[1]; + }); + const auto narrowFlow = pb.FromBlocks(narrowBlocksFlow); + const auto pgmReturn = pb.ForwardList(narrowFlow); + + const auto graph = setup.BuildGraph(pgmReturn); + const auto iterator = graph->GetValue().GetListIterator(); + + NUdf::TUnboxedValue item; + UNIT_ASSERT(iterator.Next(item)); + UNIT_ASSERT_VALUES_EQUAL(item.Get<ui64>(), 10); + + UNIT_ASSERT(iterator.Next(item)); + UNIT_ASSERT_VALUES_EQUAL(item.Get<ui64>(), 20); + + UNIT_ASSERT(iterator.Next(item)); + UNIT_ASSERT_VALUES_EQUAL(item.Get<ui64>(), 30); +} + +Y_UNIT_TEST(TestSingle) { + const ui64 testValue = 42; + + TSetup<false> setup; + auto& pb = *setup.PgmBuilder; + + auto dataLiteral = pb.NewDataLiteral<ui64>(testValue); + const auto dataAfterBlocks = pb.AsSingle(dataLiteral); + + const auto graph = setup.BuildGraph(dataAfterBlocks); + const auto value = graph->GetValue(); + UNIT_ASSERT(value.HasValue() && value.IsBoxed()); + UNIT_ASSERT_VALUES_EQUAL(TArrowBlock::From(value).GetDatum().scalar_as<arrow::UInt64Scalar>().value, testValue); +} + +Y_UNIT_TEST(TestBlockAdd) { + TSetup<false> setup; + TProgramBuilder& pb = *setup.PgmBuilder; + + const auto ui64Type = pb.NewDataType(NUdf::TDataType<ui64>::Id); + const auto tupleType = pb.NewTupleType({ui64Type, ui64Type}); + + const auto data1 = pb.NewTuple(tupleType, {pb.NewDataLiteral<ui64>(1), pb.NewDataLiteral<ui64>(10)}); + const auto data2 = pb.NewTuple(tupleType, {pb.NewDataLiteral<ui64>(2), pb.NewDataLiteral<ui64>(20)}); + const auto data3 = pb.NewTuple(tupleType, {pb.NewDataLiteral<ui64>(3), pb.NewDataLiteral<ui64>(30)}); + + const auto list = pb.NewList(tupleType, {data1, data2, data3}); + const auto flow = pb.ToFlow(list); + + const auto wideFlow = pb.ExpandMap(flow, [&](TRuntimeNode item) -> TRuntimeNode::TList { + return {pb.Nth(item, 0U), pb.Nth(item, 1U)}; + }); + const auto wideBlocksFlow = pb.WideToBlocks(wideFlow); + const auto sumWideFlow = pb.WideMap(wideBlocksFlow, [&](TRuntimeNode::TList items) -> TRuntimeNode::TList { + return {pb.BlockAdd(items[0], items[1])}; + }); + const auto sumNarrowFlow = pb.NarrowMap(sumWideFlow, [&](TRuntimeNode::TList items) -> TRuntimeNode { + return items[0]; + }); + const auto pgmReturn = pb.Collect(pb.FromBlocks(sumNarrowFlow)); + + const auto graph = setup.BuildGraph(pgmReturn); + const auto iterator = graph->GetValue().GetListIterator(); + + NUdf::TUnboxedValue item; + UNIT_ASSERT(iterator.Next(item)); + UNIT_ASSERT_VALUES_EQUAL(item.Get<ui64>(), 11); + + UNIT_ASSERT(iterator.Next(item)); + UNIT_ASSERT_VALUES_EQUAL(item.Get<ui64>(), 22); + + UNIT_ASSERT(iterator.Next(item)); + UNIT_ASSERT_VALUES_EQUAL(item.Get<ui64>(), 33); + UNIT_ASSERT(!iterator.Next(item)); + UNIT_ASSERT(!iterator.Next(item)); +} + +Y_UNIT_TEST(TestBlockAddWithNullables) { + TSetup<false> setup; + TProgramBuilder& pb = *setup.PgmBuilder; + + const auto optionalUi64Type = pb.NewDataType(NUdf::TDataType<ui64>::Id, true); + const auto tupleType = pb.NewTupleType({optionalUi64Type, optionalUi64Type}); + const auto emptyOptionalUi64 = pb.NewEmptyOptional(optionalUi64Type); + + const auto data1 = pb.NewTuple(tupleType, { + pb.NewOptional(pb.NewDataLiteral<ui64>(1)), + emptyOptionalUi64 + }); + const auto data2 = pb.NewTuple(tupleType, { + emptyOptionalUi64, + pb.NewOptional(pb.NewDataLiteral<ui64>(20)) + }); + const auto data3 = pb.NewTuple(tupleType, { + emptyOptionalUi64, + emptyOptionalUi64 + }); + const auto data4 = pb.NewTuple(tupleType, { + pb.NewOptional(pb.NewDataLiteral<ui64>(10)), + pb.NewOptional(pb.NewDataLiteral<ui64>(20)) + }); + + const auto list = pb.NewList(tupleType, {data1, data2, data3, data4}); + const auto flow = pb.ToFlow(list); + + const auto wideFlow = pb.ExpandMap(flow, [&](TRuntimeNode item) -> TRuntimeNode::TList { + return {pb.Nth(item, 0U), pb.Nth(item, 1U)}; + }); + const auto wideBlocksFlow = pb.WideToBlocks(wideFlow); + const auto sumWideFlow = pb.WideMap(wideBlocksFlow, [&](TRuntimeNode::TList items) -> TRuntimeNode::TList { + return {pb.BlockAdd(items[0], items[1])}; + }); + const auto sumNarrowFlow = pb.NarrowMap(sumWideFlow, [&](TRuntimeNode::TList items) -> TRuntimeNode { + return items[0]; + }); + const auto pgmReturn = pb.Collect(pb.FromBlocks(sumNarrowFlow)); + + const auto graph = setup.BuildGraph(pgmReturn); + const auto iterator = graph->GetValue().GetListIterator(); + + NUdf::TUnboxedValue item; + UNIT_ASSERT(iterator.Next(item)); + UNIT_ASSERT(!item); + + UNIT_ASSERT(iterator.Next(item)); + UNIT_ASSERT(!item); + + UNIT_ASSERT(iterator.Next(item)); + UNIT_ASSERT(!item); + + UNIT_ASSERT(iterator.Next(item)); + UNIT_ASSERT_VALUES_EQUAL(item.Get<ui64>(), 30); + UNIT_ASSERT(!iterator.Next(item)); +} + +Y_UNIT_TEST(TestBlockAddWithNullableSingle) { + TSetup<false> setup; + TProgramBuilder& pb = *setup.PgmBuilder; + + const auto optionalUi64Type = pb.NewDataType(NUdf::TDataType<ui64>::Id, true); + const auto emptyOptionalUi64 = pb.NewEmptyOptional(optionalUi64Type); + + const auto list = pb.NewList(optionalUi64Type, { + pb.NewOptional(pb.NewDataLiteral<ui64>(10)), + pb.NewOptional(pb.NewDataLiteral<ui64>(20)), + pb.NewOptional(pb.NewDataLiteral<ui64>(30)) + }); + const auto flow = pb.ToFlow(list); + const auto blocksFlow = pb.ToBlocks(flow); + + THolder<IComputationGraph> graph; + auto map = [&](const TProgramBuilder::TUnaryLambda& func) { + const auto pgmReturn = pb.Collect(pb.FromBlocks(pb.Map(blocksFlow, func))); + graph = setup.BuildGraph(pgmReturn); + return graph->GetValue().GetListIterator(); + }; + + { + const auto single = pb.AsSingle(emptyOptionalUi64); + auto iterator = map([&](TRuntimeNode item) -> TRuntimeNode { + return {pb.BlockAdd(single, item)}; + }); + + NUdf::TUnboxedValue item; + UNIT_ASSERT(iterator.Next(item)); + UNIT_ASSERT(!item); + + UNIT_ASSERT(iterator.Next(item)); + UNIT_ASSERT(!item); + + UNIT_ASSERT(iterator.Next(item)); + UNIT_ASSERT(!item); + + UNIT_ASSERT(!iterator.Next(item)); + } + + { + const auto single = pb.AsSingle(emptyOptionalUi64); + auto iterator = map([&](TRuntimeNode item) -> TRuntimeNode { + return {pb.BlockAdd(item, single)}; + }); + + NUdf::TUnboxedValue item; + UNIT_ASSERT(iterator.Next(item)); + UNIT_ASSERT(!item); + + UNIT_ASSERT(iterator.Next(item)); + UNIT_ASSERT(!item); + + UNIT_ASSERT(iterator.Next(item)); + UNIT_ASSERT(!item); + + UNIT_ASSERT(!iterator.Next(item)); + } + + { + const auto single = pb.AsSingle(pb.NewDataLiteral<ui64>(100)); + auto iterator = map([&](TRuntimeNode item) -> TRuntimeNode { + return {pb.BlockAdd(item, single)}; + }); + + NUdf::TUnboxedValue item; + UNIT_ASSERT(iterator.Next(item)); + UNIT_ASSERT_VALUES_EQUAL(item.Get<ui64>(), 110); + + UNIT_ASSERT(iterator.Next(item)); + UNIT_ASSERT_VALUES_EQUAL(item.Get<ui64>(), 120); + + UNIT_ASSERT(iterator.Next(item)); + UNIT_ASSERT_VALUES_EQUAL(item.Get<ui64>(), 130); + + UNIT_ASSERT(!iterator.Next(item)); + } +} + +Y_UNIT_TEST(TestBlockAddWithSingle) { + TSetup<false> setup; + TProgramBuilder& pb = *setup.PgmBuilder; + + const auto ui64Type = pb.NewDataType(NUdf::TDataType<ui64>::Id); + + const auto data1 = pb.NewDataLiteral<ui64>(10); + const auto data2 = pb.NewDataLiteral<ui64>(20); + const auto data3 = pb.NewDataLiteral<ui64>(30); + const auto rightSingle = pb.AsSingle(pb.NewDataLiteral<ui64>(100)); + const auto leftSingle = pb.AsSingle(pb.NewDataLiteral<ui64>(1000)); + + const auto list = pb.NewList(ui64Type, {data1, data2, data3}); + const auto flow = pb.ToFlow(list); + const auto blocksFlow = pb.ToBlocks(flow); + const auto sumBlocksFlow = pb.Map(blocksFlow, [&](TRuntimeNode item) -> TRuntimeNode { + return {pb.BlockAdd(leftSingle, {pb.BlockAdd(item, rightSingle )})}; + }); + const auto pgmReturn = pb.Collect(pb.FromBlocks(sumBlocksFlow)); + + const auto graph = setup.BuildGraph(pgmReturn); + const auto iterator = graph->GetValue().GetListIterator(); + + NUdf::TUnboxedValue item; + UNIT_ASSERT(iterator.Next(item)); + UNIT_ASSERT_VALUES_EQUAL(item.Get<ui64>(), 1110); + + UNIT_ASSERT(iterator.Next(item)); + UNIT_ASSERT_VALUES_EQUAL(item.Get<ui64>(), 1120); + + UNIT_ASSERT(iterator.Next(item)); + UNIT_ASSERT_VALUES_EQUAL(item.Get<ui64>(), 1130); + UNIT_ASSERT(!iterator.Next(item)); + UNIT_ASSERT(!iterator.Next(item)); +} + +} + +} +} diff --git a/ydb/library/yql/minikql/comp_nodes/ut/mkql_computation_node_ut.cpp b/ydb/library/yql/minikql/comp_nodes/ut/mkql_computation_node_ut.cpp index 4c305b3560..19f82aae4e 100644 --- a/ydb/library/yql/minikql/comp_nodes/ut/mkql_computation_node_ut.cpp +++ b/ydb/library/yql/minikql/comp_nodes/ut/mkql_computation_node_ut.cpp @@ -16,7 +16,7 @@ namespace NMiniKQL { namespace { - + ui64 g_Yield = std::numeric_limits<ui64>::max(); ui64 g_TestStreamData[] = {0, 0, 1, 0, 0, 0, 1, 2, 3}; ui64 g_TestYieldStreamData[] = {0, 1, 2, g_Yield, 0, g_Yield, 1, 2, 0, 1, 2, 0, g_Yield, 1, 2}; diff --git a/ydb/library/yql/minikql/comp_nodes/ut/ya.make b/ydb/library/yql/minikql/comp_nodes/ut/ya.make index 70dde719ff..dffda1318f 100644 --- a/ydb/library/yql/minikql/comp_nodes/ut/ya.make +++ b/ydb/library/yql/minikql/comp_nodes/ut/ya.make @@ -19,7 +19,7 @@ OWNER( ) SRCS( - mkql_blocks_ut.cpp + mkql_blocks_ut.cpp mkql_combine_ut.cpp mkql_condense_ut.cpp mkql_decimal_ut.cpp diff --git a/ydb/library/yql/minikql/comp_nodes/ya.make b/ydb/library/yql/minikql/comp_nodes/ya.make index 99a5eff186..55838c4012 100644 --- a/ydb/library/yql/minikql/comp_nodes/ya.make +++ b/ydb/library/yql/minikql/comp_nodes/ya.make @@ -16,8 +16,8 @@ SRCS( mkql_append.h mkql_apply.cpp mkql_apply.h - mkql_block_add.cpp - mkql_blocks.cpp + mkql_block_add.cpp + mkql_blocks.cpp mkql_callable.cpp mkql_callable.h mkql_chain_map.cpp @@ -210,7 +210,7 @@ SRCS( ) PEERDIR( - contrib/libs/apache/arrow + contrib/libs/apache/arrow ydb/library/binary_json ydb/library/yql/minikql ydb/library/yql/minikql/invoke_builtins diff --git a/ydb/library/yql/minikql/computation/mkql_computation_node.cpp b/ydb/library/yql/minikql/computation/mkql_computation_node.cpp index aafc04ae1f..aa2e27f028 100644 --- a/ydb/library/yql/minikql/computation/mkql_computation_node.cpp +++ b/ydb/library/yql/minikql/computation/mkql_computation_node.cpp @@ -26,15 +26,15 @@ namespace NKikimr { namespace NMiniKQL { -TComputationContext::TComputationContext(const THolderFactory& holderFactory, - const NUdf::IValueBuilder* builder, - TComputationOptsFull& opts, - const TComputationMutables& mutables, - arrow::MemoryPool& arrowMemoryPool) +TComputationContext::TComputationContext(const THolderFactory& holderFactory, + const NUdf::IValueBuilder* builder, + TComputationOptsFull& opts, + const TComputationMutables& mutables, + arrow::MemoryPool& arrowMemoryPool) : TComputationContextLLVM{holderFactory, opts.Stats, std::make_unique<NUdf::TUnboxedValue[]>(mutables.CurValueIndex), builder} , RandomProvider(opts.RandomProvider) , TimeProvider(opts.TimeProvider) - , ArrowMemoryPool(arrowMemoryPool) + , ArrowMemoryPool(arrowMemoryPool) { std::fill_n(MutableValues.get(), mutables.CurValueIndex, NUdf::TUnboxedValue(NUdf::TUnboxedValuePod::Invalid())); } diff --git a/ydb/library/yql/minikql/computation/mkql_computation_node.h b/ydb/library/yql/minikql/computation/mkql_computation_node.h index 09c1ecf219..a5d11b180b 100644 --- a/ydb/library/yql/minikql/computation/mkql_computation_node.h +++ b/ydb/library/yql/minikql/computation/mkql_computation_node.h @@ -82,13 +82,13 @@ struct TComputationContext : public TComputationContextLLVM { IRandomProvider& RandomProvider; ITimeProvider& TimeProvider; bool ExecuteLLVM = true; - arrow::MemoryPool& ArrowMemoryPool; + arrow::MemoryPool& ArrowMemoryPool; - TComputationContext(const THolderFactory& holderFactory, - const NUdf::IValueBuilder* builder, - TComputationOptsFull& opts, - const TComputationMutables& mutables, - arrow::MemoryPool& arrowMemoryPool); + TComputationContext(const THolderFactory& holderFactory, + const NUdf::IValueBuilder* builder, + TComputationOptsFull& opts, + const TComputationMutables& mutables, + arrow::MemoryPool& arrowMemoryPool); ~TComputationContext(); // Returns true if current usage delta exceeds the memory limit diff --git a/ydb/library/yql/minikql/computation/mkql_computation_node_graph.cpp b/ydb/library/yql/minikql/computation/mkql_computation_node_graph.cpp index e2a401b366..f2b188ae47 100644 --- a/ydb/library/yql/minikql/computation/mkql_computation_node_graph.cpp +++ b/ydb/library/yql/minikql/computation/mkql_computation_node_graph.cpp @@ -286,10 +286,10 @@ private: VisitType<TFlowType>(node); } - void Visit(TBlockType& node) override { - VisitType<TBlockType>(node); - } - + void Visit(TBlockType& node) override { + VisitType<TBlockType>(node); + } + void Visit(TTaggedType& node) override { VisitType<TTaggedType>(node); } @@ -535,7 +535,7 @@ public: HolderFactory = MakeHolder<THolderFactory>(CompOpts.AllocState, *MemInfo, patternNodes->HolderFactory->GetFunctionRegistry()); ValueBuilder = MakeHolder<TDefaultValueBuilder>(*HolderFactory.Get(), compOpts.ValidatePolicy); ValueBuilder->SetSecureParamsProvider(CompOpts.SecureParamsProvider); - ArrowMemoryPool = MakeArrowMemoryPool(CompOpts.AllocState); + ArrowMemoryPool = MakeArrowMemoryPool(CompOpts.AllocState); } ~TComputationGraph() { @@ -553,11 +553,11 @@ public: void Prepare() override { if (!IsPrepared) { - Ctx.Reset(new TComputationContext(*HolderFactory, - ValueBuilder.Get(), - CompOpts, - PatternNodes->GetMutables(), - *ArrowMemoryPool)); + Ctx.Reset(new TComputationContext(*HolderFactory, + ValueBuilder.Get(), + CompOpts, + PatternNodes->GetMutables(), + *ArrowMemoryPool)); ValueBuilder->SetCalleePositionHolder(Ctx->CalleePosition); for (auto& node : PatternNodes->GetNodes()) { node->InitNode(*Ctx); @@ -650,7 +650,7 @@ private: const TIntrusivePtr<TMemoryUsageInfo> MemInfo; THolder<THolderFactory> HolderFactory; THolder<TDefaultValueBuilder> ValueBuilder; - std::unique_ptr<arrow::MemoryPool> ArrowMemoryPool; + std::unique_ptr<arrow::MemoryPool> ArrowMemoryPool; THolder<TComputationContext> Ctx; TComputationOptsFull CompOpts; bool IsPrepared = false; diff --git a/ydb/library/yql/minikql/computation/mkql_computation_node_holders.cpp b/ydb/library/yql/minikql/computation/mkql_computation_node_holders.cpp index 9e89235b92..67a7cfb302 100644 --- a/ydb/library/yql/minikql/computation/mkql_computation_node_holders.cpp +++ b/ydb/library/yql/minikql/computation/mkql_computation_node_holders.cpp @@ -3042,10 +3042,10 @@ NUdf::TSingleBlockPtr THolderFactory::CreateSingleBlock(const NUdf::TUnboxedValu return AllocateOn<TSingleBlockImpl>(CurrentAllocState, &MemInfo, value); } -NUdf::TUnboxedValuePod THolderFactory::CreateArrowBlock(arrow::Datum&& datum) const { - return Create<TArrowBlock>(std::move(datum)); -} - +NUdf::TUnboxedValuePod THolderFactory::CreateArrowBlock(arrow::Datum&& datum) const { + return Create<TArrowBlock>(std::move(datum)); +} + NUdf::TUnboxedValuePod THolderFactory::VectorAsArray(TUnboxedValueVector& values) const { if (values.empty()) return GetEmptyContainer(); diff --git a/ydb/library/yql/minikql/computation/mkql_computation_node_holders.h b/ydb/library/yql/minikql/computation/mkql_computation_node_holders.h index 2a0c80759a..4486d6f43a 100644 --- a/ydb/library/yql/minikql/computation/mkql_computation_node_holders.h +++ b/ydb/library/yql/minikql/computation/mkql_computation_node_holders.h @@ -9,8 +9,8 @@ #include <ydb/library/yql/minikql/compact_hash.h> #include <ydb/library/yql/minikql/mkql_type_ops.h> -#include <contrib/libs/apache/arrow/cpp/src/arrow/datum.h> - +#include <contrib/libs/apache/arrow/cpp/src/arrow/datum.h> + #include <util/generic/maybe.h> #include <util/memory/pool.h> @@ -321,34 +321,34 @@ private: TType* const Type; }; -class TArrowBlock: public TComputationValue<TArrowBlock> { -public: - explicit TArrowBlock(TMemoryUsageInfo* memInfo, arrow::Datum&& datum) - : TComputationValue(memInfo) - , Datum_(std::move(datum)) - { - } - - inline static TArrowBlock& From(const NUdf::TUnboxedValue& value) { - return *static_cast<TArrowBlock*>(value.AsBoxed().Get()); - } - - inline arrow::Datum& GetDatum() { - return Datum_; - } - - NUdf::TStringRef GetResourceTag() const override { - return NUdf::TStringRef::Of("ArrowBlock"); - } - - void* GetResource() override { - return &Datum_; - } - -private: - arrow::Datum Datum_; -}; - +class TArrowBlock: public TComputationValue<TArrowBlock> { +public: + explicit TArrowBlock(TMemoryUsageInfo* memInfo, arrow::Datum&& datum) + : TComputationValue(memInfo) + , Datum_(std::move(datum)) + { + } + + inline static TArrowBlock& From(const NUdf::TUnboxedValue& value) { + return *static_cast<TArrowBlock*>(value.AsBoxed().Get()); + } + + inline arrow::Datum& GetDatum() { + return Datum_; + } + + NUdf::TStringRef GetResourceTag() const override { + return NUdf::TStringRef::Of("ArrowBlock"); + } + + void* GetResource() override { + return &Datum_; + } + +private: + arrow::Datum Datum_; +}; + ////////////////////////////////////////////////////////////////////////////// // THolderFactory ////////////////////////////////////////////////////////////////////////////// @@ -375,8 +375,8 @@ public: NUdf::TFlatArrayBlockPtr CreateFlatArrayBlock(ui32 count) const; NUdf::TSingleBlockPtr CreateSingleBlock(const NUdf::TUnboxedValue& value) const; - NUdf::TUnboxedValuePod CreateArrowBlock(arrow::Datum&& datum) const; - + NUdf::TUnboxedValuePod CreateArrowBlock(arrow::Datum&& datum) const; + NUdf::TUnboxedValuePod VectorAsArray(TUnboxedValueVector& values) const; template <class TForwardIterator> diff --git a/ydb/library/yql/minikql/computation/ya.make b/ydb/library/yql/minikql/computation/ya.make index 5a8af5675d..ae419b3b23 100644 --- a/ydb/library/yql/minikql/computation/ya.make +++ b/ydb/library/yql/minikql/computation/ya.make @@ -56,7 +56,7 @@ LLVM_BC( ) PEERDIR( - contrib/libs/apache/arrow + contrib/libs/apache/arrow library/cpp/enumbitset library/cpp/packedtypes library/cpp/random_provider diff --git a/ydb/library/yql/minikql/mkql_node.cpp b/ydb/library/yql/minikql/mkql_node.cpp index 1886244760..4c6f3e45ad 100644 --- a/ydb/library/yql/minikql/mkql_node.cpp +++ b/ydb/library/yql/minikql/mkql_node.cpp @@ -209,7 +209,7 @@ TStringBuf TType::GetKindAsStr() const { xx(EmptyList, TEmptyListType) \ xx(EmptyDict, TEmptyDictType) \ xx(Tagged, TTaggedType) \ - xx(Block, TBlockType) \ + xx(Block, TBlockType) \ void TType::Accept(INodeVisitor& visitor) { switch (Kind) { @@ -2138,48 +2138,48 @@ void TVariantLiteral::DoFreeze(const TTypeEnvironment& env) { Item.Freeze(); } -TBlockType::TBlockType(TType* itemType, EShape shape, const TTypeEnvironment& env) - : TType(EKind::Block, env.GetTypeOfType()) - , ItemType(itemType) - , Shape(shape) -{ -} - -TBlockType* TBlockType::Create(TType* itemType, EShape shape, const TTypeEnvironment& env) { - return ::new(env.Allocate<TBlockType>()) TBlockType(itemType, shape, env); -} - -bool TBlockType::IsSameType(const TBlockType& typeToCompare) const { - return GetItemType()->IsSameType(*typeToCompare.GetItemType()) && Shape == typeToCompare.Shape; -} - -bool TBlockType::IsConvertableTo(const TBlockType& typeToCompare, bool ignoreTagged) const { - return Shape == typeToCompare.Shape && - GetItemType()->IsConvertableTo(*typeToCompare.GetItemType(), ignoreTagged); -} - -void TBlockType::DoUpdateLinks(const THashMap<TNode*, TNode*>& links) { - const auto itemTypeIt = links.find(ItemType); - if (itemTypeIt != links.end()) { - auto* newNode = itemTypeIt->second; - Y_VERIFY_DEBUG(ItemType->Equals(*newNode)); - ItemType = static_cast<TType*>(newNode); - } -} - -TNode* TBlockType::DoCloneOnCallableWrite(const TTypeEnvironment& env) const { - auto newTypeNode = (TNode*)ItemType->GetCookie(); - if (!newTypeNode) { - return const_cast<TBlockType*>(this); - } - - return ::new(env.Allocate<TBlockType>()) TBlockType(static_cast<TType*>(newTypeNode), Shape, env); -} - -void TBlockType::DoFreeze(const TTypeEnvironment& env) { - Y_UNUSED(env); -} - +TBlockType::TBlockType(TType* itemType, EShape shape, const TTypeEnvironment& env) + : TType(EKind::Block, env.GetTypeOfType()) + , ItemType(itemType) + , Shape(shape) +{ +} + +TBlockType* TBlockType::Create(TType* itemType, EShape shape, const TTypeEnvironment& env) { + return ::new(env.Allocate<TBlockType>()) TBlockType(itemType, shape, env); +} + +bool TBlockType::IsSameType(const TBlockType& typeToCompare) const { + return GetItemType()->IsSameType(*typeToCompare.GetItemType()) && Shape == typeToCompare.Shape; +} + +bool TBlockType::IsConvertableTo(const TBlockType& typeToCompare, bool ignoreTagged) const { + return Shape == typeToCompare.Shape && + GetItemType()->IsConvertableTo(*typeToCompare.GetItemType(), ignoreTagged); +} + +void TBlockType::DoUpdateLinks(const THashMap<TNode*, TNode*>& links) { + const auto itemTypeIt = links.find(ItemType); + if (itemTypeIt != links.end()) { + auto* newNode = itemTypeIt->second; + Y_VERIFY_DEBUG(ItemType->Equals(*newNode)); + ItemType = static_cast<TType*>(newNode); + } +} + +TNode* TBlockType::DoCloneOnCallableWrite(const TTypeEnvironment& env) const { + auto newTypeNode = (TNode*)ItemType->GetCookie(); + if (!newTypeNode) { + return const_cast<TBlockType*>(this); + } + + return ::new(env.Allocate<TBlockType>()) TBlockType(static_cast<TType*>(newTypeNode), Shape, env); +} + +void TBlockType::DoFreeze(const TTypeEnvironment& env) { + Y_UNUSED(env); +} + bool IsNumericType(NUdf::TDataTypeId typeId) { auto slot = NUdf::FindDataSlot(typeId); return slot && NUdf::GetDataTypeInfo(*slot).Features & NUdf::NumericType; @@ -2236,7 +2236,7 @@ EValueRepresentation GetValueRepresentation(const TType* type) { case TType::EKind::Dict: case TType::EKind::List: case TType::EKind::Resource: - case TType::EKind::Block: + case TType::EKind::Block: case TType::EKind::Callable: case TType::EKind::EmptyList: case TType::EKind::EmptyDict: diff --git a/ydb/library/yql/minikql/mkql_node.h b/ydb/library/yql/minikql/mkql_node.h index 3ae2e8bdaa..ee74455ea6 100644 --- a/ydb/library/yql/minikql/mkql_node.h +++ b/ydb/library/yql/minikql/mkql_node.h @@ -147,8 +147,8 @@ class TTypeEnvironment; XX(ReservedKind, 15) \ XX(EmptyList, 16 + 2) \ XX(EmptyDict, 32 + 2) \ - XX(Tagged, 48 + 7) \ - XX(Block, 16 + 13) + XX(Tagged, 48 + 7) \ + XX(Block, 16 + 13) class TType : public TNode { public: @@ -1321,44 +1321,44 @@ private: ui32 Index; }; -class TBlockType : public TType { - friend class TType; - -public: - enum class EShape: ui8 { - Single = 0, - Many = 1 - }; - -public: - static TBlockType* Create(TType* itemType, EShape shape, const TTypeEnvironment& env); - - using TType::IsSameType; - bool IsSameType(const TBlockType& typeToCompare) const; - - using TType::IsConvertableTo; - bool IsConvertableTo(const TBlockType& typeToCompare, bool ignoreTagged = false) const; - - inline TType* GetItemType() const noexcept { - return ItemType; - } - - inline EShape GetShape() const noexcept { - return Shape; - } - -private: - TBlockType(TType* itemType, EShape shape, const TTypeEnvironment& env); - - void DoUpdateLinks(const THashMap<TNode*, TNode*>& links); - TNode* DoCloneOnCallableWrite(const TTypeEnvironment& env) const; - void DoFreeze(const TTypeEnvironment& env); - -private: - TType* ItemType; - EShape Shape; -}; - +class TBlockType : public TType { + friend class TType; + +public: + enum class EShape: ui8 { + Single = 0, + Many = 1 + }; + +public: + static TBlockType* Create(TType* itemType, EShape shape, const TTypeEnvironment& env); + + using TType::IsSameType; + bool IsSameType(const TBlockType& typeToCompare) const; + + using TType::IsConvertableTo; + bool IsConvertableTo(const TBlockType& typeToCompare, bool ignoreTagged = false) const; + + inline TType* GetItemType() const noexcept { + return ItemType; + } + + inline EShape GetShape() const noexcept { + return Shape; + } + +private: + TBlockType(TType* itemType, EShape shape, const TTypeEnvironment& env); + + void DoUpdateLinks(const THashMap<TNode*, TNode*>& links); + TNode* DoCloneOnCallableWrite(const TTypeEnvironment& env) const; + void DoFreeze(const TTypeEnvironment& env); + +private: + TType* ItemType; + EShape Shape; +}; + inline bool TRuntimeNode::operator==(const TRuntimeNode& other) const { return IsImmediate() == other.IsImmediate() && GetNode()->Equals(*other.GetNode()); } diff --git a/ydb/library/yql/minikql/mkql_node_cast.cpp b/ydb/library/yql/minikql/mkql_node_cast.cpp index e5d644e446..0a62a7d615 100644 --- a/ydb/library/yql/minikql/mkql_node_cast.cpp +++ b/ydb/library/yql/minikql/mkql_node_cast.cpp @@ -52,7 +52,7 @@ MKQL_AS_TYPE(Variant) MKQL_AS_TYPE(Stream) MKQL_AS_TYPE(Flow) MKQL_AS_TYPE(Tagged) -MKQL_AS_TYPE(Block) +MKQL_AS_TYPE(Block) MKQL_AS_VALUE(Any, Type) MKQL_AS_VALUE(Callable, Type) diff --git a/ydb/library/yql/minikql/mkql_node_printer.cpp b/ydb/library/yql/minikql/mkql_node_printer.cpp index 971de764e1..f7515a3005 100644 --- a/ydb/library/yql/minikql/mkql_node_printer.cpp +++ b/ydb/library/yql/minikql/mkql_node_printer.cpp @@ -189,36 +189,36 @@ namespace { WriteNewline(); } - void Visit(TBlockType& node) override { - WriteIndentation(); - Out << "Type (Block) {"; - WriteNewline(); - - { - TIndentScope scope(this); - WriteIndentation(); - Out << "Block item type: {"; - WriteNewline(); - - { - TIndentScope scope2(this); - node.GetItemType()->Accept(*this); - } - - WriteIndentation(); - Out << "}"; - WriteNewline(); - - WriteIndentation(); - Out << "Block shape: " << (node.GetShape() == TBlockType::EShape::Single ? "Single" : "Many"); - WriteNewline(); - } - - WriteIndentation(); - Out << "}"; - WriteNewline(); - } - + void Visit(TBlockType& node) override { + WriteIndentation(); + Out << "Type (Block) {"; + WriteNewline(); + + { + TIndentScope scope(this); + WriteIndentation(); + Out << "Block item type: {"; + WriteNewline(); + + { + TIndentScope scope2(this); + node.GetItemType()->Accept(*this); + } + + WriteIndentation(); + Out << "}"; + WriteNewline(); + + WriteIndentation(); + Out << "Block shape: " << (node.GetShape() == TBlockType::EShape::Single ? "Single" : "Many"); + WriteNewline(); + } + + WriteIndentation(); + Out << "}"; + WriteNewline(); + } + void Visit(TOptionalType& node) override { WriteIndentation(); Out << "Type (Optional) {"; diff --git a/ydb/library/yql/minikql/mkql_node_printer_ut.cpp b/ydb/library/yql/minikql/mkql_node_printer_ut.cpp index 0e72384007..ec8c5d1afe 100644 --- a/ydb/library/yql/minikql/mkql_node_printer_ut.cpp +++ b/ydb/library/yql/minikql/mkql_node_printer_ut.cpp @@ -76,8 +76,8 @@ namespace { structBuilder.Add("41", TRuntimeNode(env.GetEmptyList(), true)); structBuilder.Add("42", TRuntimeNode(env.GetEmptyDict(), true)); structBuilder.Add("43", TRuntimeNode(TTaggedType::Create(dtype1, "mytag", env), true)); - structBuilder.Add("44", TRuntimeNode(TBlockType::Create(dtype1, TBlockType::EShape::Single, env), true)); - structBuilder.Add("45", TRuntimeNode(TBlockType::Create(dtype2, TBlockType::EShape::Many, env), true)); + structBuilder.Add("44", TRuntimeNode(TBlockType::Create(dtype1, TBlockType::EShape::Single, env), true)); + structBuilder.Add("45", TRuntimeNode(TBlockType::Create(dtype2, TBlockType::EShape::Many, env), true)); return structBuilder.Build(); } } diff --git a/ydb/library/yql/minikql/mkql_node_serialization.cpp b/ydb/library/yql/minikql/mkql_node_serialization.cpp index 665e974e33..e314f38cd6 100644 --- a/ydb/library/yql/minikql/mkql_node_serialization.cpp +++ b/ydb/library/yql/minikql/mkql_node_serialization.cpp @@ -210,19 +210,19 @@ namespace { IsProcessed0 = false; } - void Visit(TBlockType& node) override { - if (node.GetCookie() != 0) { - Owner.WriteReference(node); - IsProcessed0 = true; - return; - } - - Owner.Write(TypeMarker | (char)TType::EKind::Block); - auto itemType = node.GetItemType(); - Owner.AddChildNode(*itemType); - IsProcessed0 = false; - } - + void Visit(TBlockType& node) override { + if (node.GetCookie() != 0) { + Owner.WriteReference(node); + IsProcessed0 = true; + return; + } + + Owner.Write(TypeMarker | (char)TType::EKind::Block); + auto itemType = node.GetItemType(); + Owner.AddChildNode(*itemType); + IsProcessed0 = false; + } + void Visit(TTaggedType& node) override { if (node.GetCookie() != 0) { Owner.WriteReference(node); @@ -596,11 +596,11 @@ namespace { Owner.RegisterReference(node); } - void Visit(TBlockType& node) override { - Owner.Write(static_cast<ui8>(node.GetShape())); - Owner.RegisterReference(node); - } - + void Visit(TBlockType& node) override { + Owner.Write(static_cast<ui8>(node.GetShape())); + Owner.RegisterReference(node); + } + void Visit(TTaggedType& node) override { auto tag = node.GetTagStr(); Owner.WriteName(tag); @@ -1196,7 +1196,7 @@ namespace { case TType::EKind::Variant: return ReadVariantType(); case TType::EKind::Flow: - return ReadFlowOrBlockType(code); + return ReadFlowOrBlockType(code); case TType::EKind::Null: return ReadNullType(); default: @@ -1313,15 +1313,15 @@ namespace { return node; } - TNode* ReadFlowOrBlockType(char code) { - switch ((TType::EKind)(code & TypeMask)) { - case TType::EKind::Flow: return ReadFlowType(); - case TType::EKind::Block: return ReadBlockType(); - default: - ThrowCorrupted(); - } - } - + TNode* ReadFlowOrBlockType(char code) { + switch ((TType::EKind)(code & TypeMask)) { + case TType::EKind::Flow: return ReadFlowType(); + case TType::EKind::Block: return ReadBlockType(); + default: + ThrowCorrupted(); + } + } + TNode* ReadFlowType() { auto itemTypeNode = PopNode(); if (itemTypeNode->GetType()->GetKind() != TType::EKind::Type) @@ -1333,26 +1333,26 @@ namespace { return node; } - TNode* ReadBlockType() { - auto itemTypeNode = PopNode(); - if (itemTypeNode->GetType()->GetKind() != TType::EKind::Type) { - ThrowCorrupted(); - } - - const auto shapeChar = Read(); - if (shapeChar != static_cast<char>(TBlockType::EShape::Single) && - shapeChar != static_cast<char>(TBlockType::EShape::Many)) - { - ThrowCorrupted(); - } - const auto shape = static_cast<TBlockType::EShape>(shapeChar); - - auto itemType = static_cast<TType*>(itemTypeNode); - auto node = TBlockType::Create(itemType, shape, Env); - Nodes.push_back(node); - return node; - } - + TNode* ReadBlockType() { + auto itemTypeNode = PopNode(); + if (itemTypeNode->GetType()->GetKind() != TType::EKind::Type) { + ThrowCorrupted(); + } + + const auto shapeChar = Read(); + if (shapeChar != static_cast<char>(TBlockType::EShape::Single) && + shapeChar != static_cast<char>(TBlockType::EShape::Many)) + { + ThrowCorrupted(); + } + const auto shape = static_cast<TBlockType::EShape>(shapeChar); + + auto itemType = static_cast<TType*>(itemTypeNode); + auto node = TBlockType::Create(itemType, shape, Env); + Nodes.push_back(node); + return node; + } + TNode* ReadTaggedType() { auto baseTypeNode = PopNode(); if (baseTypeNode->GetType()->GetKind() != TType::EKind::Type) diff --git a/ydb/library/yql/minikql/mkql_node_visitor.cpp b/ydb/library/yql/minikql/mkql_node_visitor.cpp index 4c6d1ffbb7..5a4ac8e479 100644 --- a/ydb/library/yql/minikql/mkql_node_visitor.cpp +++ b/ydb/library/yql/minikql/mkql_node_visitor.cpp @@ -166,11 +166,11 @@ void TThrowingNodeVisitor::Visit(TTaggedType& node) { ThrowUnexpectedNodeType(); } -void TThrowingNodeVisitor::Visit(TBlockType& node) { - Y_UNUSED(node); - ThrowUnexpectedNodeType(); -} - +void TThrowingNodeVisitor::Visit(TBlockType& node) { + Y_UNUSED(node); + ThrowUnexpectedNodeType(); +} + void TThrowingNodeVisitor::ThrowUnexpectedNodeType() { THROW yexception() << "Unexpected node type"; } @@ -299,10 +299,10 @@ void TEmptyNodeVisitor::Visit(TTaggedType& node) { Y_UNUSED(node); } -void TEmptyNodeVisitor::Visit(TBlockType& node) { - Y_UNUSED(node); -} - +void TEmptyNodeVisitor::Visit(TBlockType& node) { + Y_UNUSED(node); +} + void TExploringNodeVisitor::Visit(TTypeType& node) { Y_VERIFY_DEBUG(node.GetType() == &node); } @@ -476,11 +476,11 @@ void TExploringNodeVisitor::Visit(TTaggedType& node) { AddChildNode(&node, *node.GetBaseType()); } -void TExploringNodeVisitor::Visit(TBlockType& node) { - AddChildNode(&node, *node.GetType()); - AddChildNode(&node, *node.GetItemType()); -} - +void TExploringNodeVisitor::Visit(TBlockType& node) { + AddChildNode(&node, *node.GetType()); + AddChildNode(&node, *node.GetItemType()); +} + void TExploringNodeVisitor::AddChildNode(TNode* parent, TNode& child) { Stack->push_back(&child); diff --git a/ydb/library/yql/minikql/mkql_node_visitor.h b/ydb/library/yql/minikql/mkql_node_visitor.h index 1b7f4b8455..f5bf76d520 100644 --- a/ydb/library/yql/minikql/mkql_node_visitor.h +++ b/ydb/library/yql/minikql/mkql_node_visitor.h @@ -45,7 +45,7 @@ public: virtual void Visit(TStreamType& node) = 0; virtual void Visit(TFlowType& node) = 0; virtual void Visit(TTaggedType& node) = 0; - virtual void Visit(TBlockType& node) = 0; + virtual void Visit(TBlockType& node) = 0; }; class TThrowingNodeVisitor : public INodeVisitor { @@ -81,7 +81,7 @@ public: void Visit(TStreamType& node) override; void Visit(TFlowType& node) override; void Visit(TTaggedType& node) override; - void Visit(TBlockType& node) override; + void Visit(TBlockType& node) override; protected: static void ThrowUnexpectedNodeType(); @@ -120,7 +120,7 @@ public: void Visit(TStreamType& node) override; void Visit(TFlowType& node) override; void Visit(TTaggedType& node) override; - void Visit(TBlockType& node) override; + void Visit(TBlockType& node) override; }; class TExploringNodeVisitor : public INodeVisitor { @@ -158,7 +158,7 @@ public: void Visit(TStreamType& node) override; void Visit(TFlowType& node) override; void Visit(TTaggedType& node) override; - void Visit(TBlockType& node) override; + void Visit(TBlockType& node) override; void Walk(TNode* root, const TTypeEnvironment& env, const std::vector<TNode*>& terminalNodes = std::vector<TNode*>(), bool buildConsumersMap = false, size_t nodesCountHint = 0); diff --git a/ydb/library/yql/minikql/mkql_program_builder.cpp b/ydb/library/yql/minikql/mkql_program_builder.cpp index 1c4d6a1e44..47ae6e4c8f 100644 --- a/ydb/library/yql/minikql/mkql_program_builder.cpp +++ b/ydb/library/yql/minikql/mkql_program_builder.cpp @@ -1365,71 +1365,71 @@ TRuntimeNode TProgramBuilder::Steal(TRuntimeNode input) { return TRuntimeNode(callableBuilder.Build(), false); } -TRuntimeNode TProgramBuilder::ToBlocks(TRuntimeNode flow) { - auto* flowType = AS_TYPE(TFlowType, flow.GetStaticType()); - auto* blockType = NewBlockType(flowType->GetItemType(), TBlockType::EShape::Many); - - TCallableBuilder callableBuilder(Env, __func__, NewFlowType(blockType)); - callableBuilder.Add(flow); - return TRuntimeNode(callableBuilder.Build(), false); -} - -TRuntimeNode TProgramBuilder::WideToBlocks(TRuntimeNode flow) { - TType* outputTupleType; - { - const auto* inputTupleType = AS_TYPE(TTupleType, AS_TYPE(TFlowType, flow.GetStaticType())->GetItemType()); - std::vector<TType*> outputTupleItems; - outputTupleItems.reserve(inputTupleType->GetElementsCount()); - for (size_t i = 0; i < inputTupleType->GetElementsCount(); ++i) { - outputTupleItems.push_back(NewBlockType(inputTupleType->GetElementType(i), TBlockType::EShape::Many)); - } - outputTupleType = NewTupleType(outputTupleItems); - } - - TCallableBuilder callableBuilder(Env, __func__, NewFlowType(outputTupleType)); - callableBuilder.Add(flow); - return TRuntimeNode(callableBuilder.Build(), false); -} - -TRuntimeNode TProgramBuilder::FromBlocks(TRuntimeNode flow) { - auto* flowType = AS_TYPE(TFlowType, flow.GetStaticType()); - auto* blockType = AS_TYPE(TBlockType, flowType->GetItemType()); - - TCallableBuilder callableBuilder(Env, __func__, NewFlowType(blockType->GetItemType())); - callableBuilder.Add(flow); - return TRuntimeNode(callableBuilder.Build(), false); -} - -TRuntimeNode TProgramBuilder::AsSingle(TRuntimeNode value) { - TCallableBuilder callableBuilder(Env, __func__, NewBlockType(value.GetStaticType(), TBlockType::EShape::Single)); - callableBuilder.Add(value); - return TRuntimeNode(callableBuilder.Build(), false); -} - -TRuntimeNode TProgramBuilder::BlockAdd(TRuntimeNode arg1, TRuntimeNode arg2) { - bool arg1Optional; - auto* arg1BlockType = AS_TYPE(TBlockType, arg1.GetStaticType()); - auto* arg1Type = UnpackOptionalData(arg1BlockType->GetItemType(), arg1Optional); - - bool arg2Optional; - auto* arg2BlockType = AS_TYPE(TBlockType, arg2.GetStaticType()); - auto* arg2Type = UnpackOptionalData(arg2BlockType->GetItemType(), arg2Optional); - - MKQL_ENSURE(arg1BlockType->GetShape() != TBlockType::EShape::Single || - arg2BlockType->GetShape() != TBlockType::EShape::Single, - "At least one EShape::Many block expected"); - - auto* resultDataType = BuildArithmeticCommonType(arg1Type, arg2Type); - if (arg1Optional || arg2Optional) { - resultDataType = NewOptionalType(resultDataType); - } - auto* callableType = TCallableBuilder(Env, __func__, NewBlockType(resultDataType, TBlockType::EShape::Many)) - .Add(arg1) - .Add(arg2) - .Build(); - return TRuntimeNode(callableType, false); -} - +TRuntimeNode TProgramBuilder::ToBlocks(TRuntimeNode flow) { + auto* flowType = AS_TYPE(TFlowType, flow.GetStaticType()); + auto* blockType = NewBlockType(flowType->GetItemType(), TBlockType::EShape::Many); + + TCallableBuilder callableBuilder(Env, __func__, NewFlowType(blockType)); + callableBuilder.Add(flow); + return TRuntimeNode(callableBuilder.Build(), false); +} + +TRuntimeNode TProgramBuilder::WideToBlocks(TRuntimeNode flow) { + TType* outputTupleType; + { + const auto* inputTupleType = AS_TYPE(TTupleType, AS_TYPE(TFlowType, flow.GetStaticType())->GetItemType()); + std::vector<TType*> outputTupleItems; + outputTupleItems.reserve(inputTupleType->GetElementsCount()); + for (size_t i = 0; i < inputTupleType->GetElementsCount(); ++i) { + outputTupleItems.push_back(NewBlockType(inputTupleType->GetElementType(i), TBlockType::EShape::Many)); + } + outputTupleType = NewTupleType(outputTupleItems); + } + + TCallableBuilder callableBuilder(Env, __func__, NewFlowType(outputTupleType)); + callableBuilder.Add(flow); + return TRuntimeNode(callableBuilder.Build(), false); +} + +TRuntimeNode TProgramBuilder::FromBlocks(TRuntimeNode flow) { + auto* flowType = AS_TYPE(TFlowType, flow.GetStaticType()); + auto* blockType = AS_TYPE(TBlockType, flowType->GetItemType()); + + TCallableBuilder callableBuilder(Env, __func__, NewFlowType(blockType->GetItemType())); + callableBuilder.Add(flow); + return TRuntimeNode(callableBuilder.Build(), false); +} + +TRuntimeNode TProgramBuilder::AsSingle(TRuntimeNode value) { + TCallableBuilder callableBuilder(Env, __func__, NewBlockType(value.GetStaticType(), TBlockType::EShape::Single)); + callableBuilder.Add(value); + return TRuntimeNode(callableBuilder.Build(), false); +} + +TRuntimeNode TProgramBuilder::BlockAdd(TRuntimeNode arg1, TRuntimeNode arg2) { + bool arg1Optional; + auto* arg1BlockType = AS_TYPE(TBlockType, arg1.GetStaticType()); + auto* arg1Type = UnpackOptionalData(arg1BlockType->GetItemType(), arg1Optional); + + bool arg2Optional; + auto* arg2BlockType = AS_TYPE(TBlockType, arg2.GetStaticType()); + auto* arg2Type = UnpackOptionalData(arg2BlockType->GetItemType(), arg2Optional); + + MKQL_ENSURE(arg1BlockType->GetShape() != TBlockType::EShape::Single || + arg2BlockType->GetShape() != TBlockType::EShape::Single, + "At least one EShape::Many block expected"); + + auto* resultDataType = BuildArithmeticCommonType(arg1Type, arg2Type); + if (arg1Optional || arg2Optional) { + resultDataType = NewOptionalType(resultDataType); + } + auto* callableType = TCallableBuilder(Env, __func__, NewBlockType(resultDataType, TBlockType::EShape::Many)) + .Add(arg1) + .Add(arg2) + .Build(); + return TRuntimeNode(callableType, false); +} + TRuntimeNode TProgramBuilder::ListFromRange(TRuntimeNode start, TRuntimeNode end, TRuntimeNode step) { MKQL_ENSURE(start.GetStaticType()->IsData(), "Expected data"); MKQL_ENSURE(end.GetStaticType()->IsSameType(*start.GetStaticType()), "Mismatch type"); @@ -2034,14 +2034,14 @@ TType* TProgramBuilder::NewFlowType(TType* itemType) { return TFlowType::Create(itemType, Env); } -TType* TProgramBuilder::NewBlockType(TType* itemType, TBlockType::EShape shape) { - bool isOptional; - auto* dataType = UnpackOptionalData(itemType, isOptional); - MKQL_ENSURE(dataType->GetDataSlot() == NUdf::EDataSlot::Uint64, "Expected Uint64"); - - return TBlockType::Create(itemType, shape, Env); -} - +TType* TProgramBuilder::NewBlockType(TType* itemType, TBlockType::EShape shape) { + bool isOptional; + auto* dataType = UnpackOptionalData(itemType, isOptional); + MKQL_ENSURE(dataType->GetDataSlot() == NUdf::EDataSlot::Uint64, "Expected Uint64"); + + return TBlockType::Create(itemType, shape, Env); +} + TType* TProgramBuilder::NewTaggedType(TType* baseType, const std::string_view& tag) { return TTaggedType::Create(baseType, tag, Env); } diff --git a/ydb/library/yql/minikql/mkql_program_builder.h b/ydb/library/yql/minikql/mkql_program_builder.h index 7fe9846332..10f1ad7ccf 100644 --- a/ydb/library/yql/minikql/mkql_program_builder.h +++ b/ydb/library/yql/minikql/mkql_program_builder.h @@ -179,7 +179,7 @@ public: TType* NewStreamType(TType* itemType); TType* NewFlowType(TType* itemType); TType* NewTaggedType(TType* baseType, const std::string_view& tag); - TType* NewBlockType(TType* itemType, TBlockType::EShape shape); + TType* NewBlockType(TType* itemType, TBlockType::EShape shape); TType* NewEmptyTupleType(); TType* NewTupleType(const TArrayRef<TType* const>& elements); @@ -232,13 +232,13 @@ public: TRuntimeNode FromFlow(TRuntimeNode flow); TRuntimeNode Steal(TRuntimeNode input); - TRuntimeNode ToBlocks(TRuntimeNode flow); - TRuntimeNode WideToBlocks(TRuntimeNode flow); - TRuntimeNode FromBlocks(TRuntimeNode flow); - TRuntimeNode AsSingle(TRuntimeNode flow); - - TRuntimeNode BlockAdd(TRuntimeNode data1, TRuntimeNode data2); - + TRuntimeNode ToBlocks(TRuntimeNode flow); + TRuntimeNode WideToBlocks(TRuntimeNode flow); + TRuntimeNode FromBlocks(TRuntimeNode flow); + TRuntimeNode AsSingle(TRuntimeNode flow); + + TRuntimeNode BlockAdd(TRuntimeNode data1, TRuntimeNode data2); + // udfs TRuntimeNode Udf( const std::string_view& funcName, diff --git a/ydb/library/yql/minikql/mkql_terminator.cpp b/ydb/library/yql/minikql/mkql_terminator.cpp index 275428a4d5..d86ba4edaf 100644 --- a/ydb/library/yql/minikql/mkql_terminator.cpp +++ b/ydb/library/yql/minikql/mkql_terminator.cpp @@ -8,14 +8,14 @@ namespace NMiniKQL { thread_local ITerminator* TBindTerminator::Terminator = nullptr; TBindTerminator::TBindTerminator(ITerminator* terminator) - : PreviousTerminator(Terminator) + : PreviousTerminator(Terminator) { - Terminator = terminator; + Terminator = terminator; } TBindTerminator::~TBindTerminator() { - Terminator = PreviousTerminator; + Terminator = PreviousTerminator; } [[noreturn]] void MKQLTerminate(const char* message) { diff --git a/ydb/library/yql/minikql/mkql_terminator.h b/ydb/library/yql/minikql/mkql_terminator.h index 97259759bc..2453e88938 100644 --- a/ydb/library/yql/minikql/mkql_terminator.h +++ b/ydb/library/yql/minikql/mkql_terminator.h @@ -23,7 +23,7 @@ struct TBindTerminator : private TNonCopyable { static thread_local ITerminator* Terminator; private: - ITerminator* PreviousTerminator; + ITerminator* PreviousTerminator; }; [[noreturn]] void MKQLTerminate(const char* message); diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.cpp index 9b40e1c156..af4e08b979 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.cpp +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.cpp @@ -1127,11 +1127,11 @@ void TWriteSession::HandleWakeUpImpl() { LastTokenUpdate = TInstant::Now(); UpdateTokenIfNeededImpl(); } - - const auto flushAfter = CurrentBatch.StartedAt == TInstant::Zero() - ? WakeupInterval - : WakeupInterval - Min(Now() - CurrentBatch.StartedAt, WakeupInterval); - Connections->ScheduleCallback(flushAfter, std::move(callback)); + + const auto flushAfter = CurrentBatch.StartedAt == TInstant::Zero() + ? WakeupInterval + : WakeupInterval - Min(Now() - CurrentBatch.StartedAt, WakeupInterval); + Connections->ScheduleCallback(flushAfter, std::move(callback)); } void TWriteSession::UpdateTimedCountersImpl() { diff --git a/ydb/public/tools/lib/cmds/__init__.py b/ydb/public/tools/lib/cmds/__init__.py index 71382b515a..977f0a4aae 100644 --- a/ydb/public/tools/lib/cmds/__init__.py +++ b/ydb/public/tools/lib/cmds/__init__.py @@ -143,7 +143,7 @@ class Recipe(object): return os.path.join(self.arguments.ydb_working_dir, self.recipe_metafile) if os.getenv(self.recipe_metafile_var) is not None: return os.getenv(self.recipe_metafile_var) - return os.path.join(self.generate_data_path(), self.recipe_metafile) + return os.path.join(self.generate_data_path(), self.recipe_metafile) def database_file_path(self): if self.arguments.ydb_working_dir: diff --git a/ydb/tests/library/harness/kikimr_config.py b/ydb/tests/library/harness/kikimr_config.py index a14da6596e..57c282b9f4 100644 --- a/ydb/tests/library/harness/kikimr_config.py +++ b/ydb/tests/library/harness/kikimr_config.py @@ -103,7 +103,7 @@ class KikimrConfigGenerator(object): use_log_files=True, grpc_ssl_enable=False, use_in_memory_pdisks=False, - enable_pqcd=False, + enable_pqcd=False, enable_metering=False, grpc_tls_data_path=None, yql_config_path=None, |