aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorgusev-p <gusev-p@yandex-team.ru>2022-02-10 16:47:20 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:47:20 +0300
commit47af3b5bf148ddab250833ec454d30d7c4930c31 (patch)
tree9814fbd1c3effac9b8377c5d604b367b14e2db55
parent1715700d00b30399d3648be821fd585ae552365e (diff)
downloadydb-47af3b5bf148ddab250833ec454d30d7c4930c31.tar.gz
Restoring authorship annotation for <gusev-p@yandex-team.ru>. Commit 2 of 2.
-rw-r--r--contrib/libs/grpc/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc2
-rw-r--r--contrib/libs/grpc/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc2
-rw-r--r--contrib/libs/grpc/src/core/lib/surface/init.cc2
-rw-r--r--contrib/libs/grpc/src/cpp/server/server_builder.cc2
-rw-r--r--contrib/libs/libiconv/dynamic/libiconv.exports2
-rw-r--r--contrib/libs/tcmalloc/tcmalloc/static_vars.h2
-rw-r--r--contrib/libs/yaml-cpp/include/yaml-cpp/node/iterator.h22
-rw-r--r--library/cpp/actors/core/log.cpp46
-rw-r--r--library/cpp/actors/core/log.h2
-rw-r--r--library/cpp/histogram/adaptive/multi_histogram.h2
-rw-r--r--library/cpp/monlib/counters/counters.h8
-rw-r--r--library/cpp/monlib/dynamic_counters/encode.cpp8
-rw-r--r--library/cpp/monlib/dynamic_counters/encode.h10
-rw-r--r--library/cpp/monlib/encode/buffered/buffered_encoder_base.cpp56
-rw-r--r--library/cpp/monlib/encode/buffered/buffered_encoder_base.h8
-rw-r--r--library/cpp/monlib/encode/buffered/string_pool.h8
-rw-r--r--library/cpp/monlib/encode/json/json.h16
-rw-r--r--library/cpp/monlib/encode/json/json_decoder.cpp2
-rw-r--r--library/cpp/monlib/encode/json/json_decoder_ut.cpp112
-rw-r--r--library/cpp/monlib/encode/json/json_encoder.cpp200
-rw-r--r--library/cpp/monlib/encode/json/json_ut.cpp446
-rw-r--r--library/cpp/monlib/encode/json/ut/expected.json30
-rw-r--r--library/cpp/monlib/encode/json/ut/expected_buffered.json24
-rw-r--r--library/cpp/monlib/encode/json/ut/expected_cloud.json74
-rw-r--r--library/cpp/monlib/encode/json/ut/expected_cloud_buffered.json184
-rw-r--r--library/cpp/monlib/encode/json/ut/ya.make6
-rw-r--r--library/cpp/monlib/encode/prometheus/prometheus.h4
-rw-r--r--library/cpp/monlib/encode/prometheus/prometheus_decoder.cpp16
-rw-r--r--library/cpp/monlib/encode/prometheus/prometheus_encoder.cpp14
-rw-r--r--library/cpp/monlib/encode/spack/spack_v1.h32
-rw-r--r--library/cpp/monlib/encode/spack/spack_v1_decoder.cpp38
-rw-r--r--library/cpp/monlib/encode/spack/spack_v1_encoder.cpp74
-rw-r--r--library/cpp/monlib/encode/spack/spack_v1_ut.cpp294
-rw-r--r--library/cpp/monlib/metrics/metric_value.h36
-rw-r--r--library/cpp/testing/benchmark/bench.h16
-rw-r--r--util/system/compiler.cpp6
-rw-r--r--util/system/compiler.h24
-rw-r--r--util/thread/pool.cpp2
-rw-r--r--ydb/core/driver_lib/run/run.cpp4
-rw-r--r--ydb/core/protos/config.proto26
-rw-r--r--ydb/core/yq/libs/pretty_printers/minikql_program_printer.cpp6
-rw-r--r--ydb/library/yql/minikql/arrow/arrow_defs.h46
-rw-r--r--ydb/library/yql/minikql/arrow/mkql_memory_pool.cpp100
-rw-r--r--ydb/library/yql/minikql/arrow/mkql_memory_pool.h20
-rw-r--r--ydb/library/yql/minikql/arrow/ya.make4
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_block_add.cpp294
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_block_add.h20
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp600
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_blocks.h26
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_factory.cpp14
-rw-r--r--ydb/library/yql/minikql/comp_nodes/ut/mkql_blocks_ut.cpp644
-rw-r--r--ydb/library/yql/minikql/comp_nodes/ut/mkql_computation_node_ut.cpp2
-rw-r--r--ydb/library/yql/minikql/comp_nodes/ut/ya.make2
-rw-r--r--ydb/library/yql/minikql/comp_nodes/ya.make6
-rw-r--r--ydb/library/yql/minikql/computation/mkql_computation_node.cpp12
-rw-r--r--ydb/library/yql/minikql/computation/mkql_computation_node.h12
-rw-r--r--ydb/library/yql/minikql/computation/mkql_computation_node_graph.cpp22
-rw-r--r--ydb/library/yql/minikql/computation/mkql_computation_node_holders.cpp8
-rw-r--r--ydb/library/yql/minikql/computation/mkql_computation_node_holders.h64
-rw-r--r--ydb/library/yql/minikql/computation/ya.make2
-rw-r--r--ydb/library/yql/minikql/mkql_node.cpp88
-rw-r--r--ydb/library/yql/minikql/mkql_node.h80
-rw-r--r--ydb/library/yql/minikql/mkql_node_cast.cpp2
-rw-r--r--ydb/library/yql/minikql/mkql_node_printer.cpp60
-rw-r--r--ydb/library/yql/minikql/mkql_node_printer_ut.cpp4
-rw-r--r--ydb/library/yql/minikql/mkql_node_serialization.cpp96
-rw-r--r--ydb/library/yql/minikql/mkql_node_visitor.cpp28
-rw-r--r--ydb/library/yql/minikql/mkql_node_visitor.h8
-rw-r--r--ydb/library/yql/minikql/mkql_program_builder.cpp146
-rw-r--r--ydb/library/yql/minikql/mkql_program_builder.h16
-rw-r--r--ydb/library/yql/minikql/mkql_terminator.cpp6
-rw-r--r--ydb/library/yql/minikql/mkql_terminator.h2
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.cpp10
-rw-r--r--ydb/public/tools/lib/cmds/__init__.py2
-rw-r--r--ydb/tests/library/harness/kikimr_config.py2
75 files changed, 2158 insertions, 2158 deletions
diff --git a/contrib/libs/grpc/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc b/contrib/libs/grpc/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc
index f31457b452..43d638ab3f 100644
--- a/contrib/libs/grpc/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc
+++ b/contrib/libs/grpc/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc
@@ -491,7 +491,7 @@ static bool should_use_ares(const char* resolver_env) {
static bool should_use_ares(const char* resolver_env) {
// TODO(lidiz): Remove the "g_custom_iomgr_enabled" flag once c-ares support
// custom IO managers (e.g. gevent).
- return !g_custom_iomgr_enabled && resolver_env != nullptr && (gpr_stricmp(resolver_env, "ares") == 0);
+ return !g_custom_iomgr_enabled && resolver_env != nullptr && (gpr_stricmp(resolver_env, "ares") == 0);
}
#endif /* GRPC_UV */
diff --git a/contrib/libs/grpc/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc b/contrib/libs/grpc/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc
index 269f60556a..a0e3566190 100644
--- a/contrib/libs/grpc/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc
+++ b/contrib/libs/grpc/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc
@@ -178,7 +178,7 @@ void NativeDnsResolver::OnResolvedLocked(grpc_error* error) {
if (shutdown_) {
if (addresses_ != nullptr) {
grpc_resolved_addresses_destroy(addresses_);
- }
+ }
Unref(DEBUG_LOCATION, "dns-resolving");
GRPC_ERROR_UNREF(error);
return;
diff --git a/contrib/libs/grpc/src/core/lib/surface/init.cc b/contrib/libs/grpc/src/core/lib/surface/init.cc
index 20fedce532..7b79ba426b 100644
--- a/contrib/libs/grpc/src/core/lib/surface/init.cc
+++ b/contrib/libs/grpc/src/core/lib/surface/init.cc
@@ -191,7 +191,7 @@ void grpc_shutdown_internal_locked(void) {
grpc_core::channelz::ChannelzRegistry::Shutdown();
grpc_stats_shutdown();
}
- grpc_core::Fork::GlobalShutdown();
+ grpc_core::Fork::GlobalShutdown();
grpc_core::ExecCtx::GlobalShutdown();
grpc_core::ApplicationCallbackExecCtx::GlobalShutdown();
g_shutting_down = false;
diff --git a/contrib/libs/grpc/src/cpp/server/server_builder.cc b/contrib/libs/grpc/src/cpp/server/server_builder.cc
index 8b846ba178..0cc00b365f 100644
--- a/contrib/libs/grpc/src/cpp/server/server_builder.cc
+++ b/contrib/libs/grpc/src/cpp/server/server_builder.cc
@@ -397,7 +397,7 @@ std::unique_ptr<grpc::Server> ServerBuilder::BuildAndStart() {
for (auto& port : ports_) {
int r = server->AddListeningPort(port.addr, port.creds.get());
if (!r) {
- server->Shutdown();
+ server->Shutdown();
return nullptr;
}
if (port.selected_port != nullptr) {
diff --git a/contrib/libs/libiconv/dynamic/libiconv.exports b/contrib/libs/libiconv/dynamic/libiconv.exports
index f09d45208f..c38a20d918 100644
--- a/contrib/libs/libiconv/dynamic/libiconv.exports
+++ b/contrib/libs/libiconv/dynamic/libiconv.exports
@@ -1,4 +1,4 @@
C libiconv
C libiconv_open
C libiconv_close
-C libiconvctl \ No newline at end of file
+C libiconvctl \ No newline at end of file
diff --git a/contrib/libs/tcmalloc/tcmalloc/static_vars.h b/contrib/libs/tcmalloc/tcmalloc/static_vars.h
index 537d83120e..be68edc189 100644
--- a/contrib/libs/tcmalloc/tcmalloc/static_vars.h
+++ b/contrib/libs/tcmalloc/tcmalloc/static_vars.h
@@ -125,7 +125,7 @@ class Static {
return cpu_cache_active_;
}
static void ActivateCPUCache() { cpu_cache_active_ = true; }
- static void DeactivateCPUCache() { cpu_cache_active_ = false; }
+ static void DeactivateCPUCache() { cpu_cache_active_ = false; }
static bool ForkSupportEnabled() { return fork_support_enabled_; }
static void EnableForkSupport() { fork_support_enabled_ = true; }
diff --git a/contrib/libs/yaml-cpp/include/yaml-cpp/node/iterator.h b/contrib/libs/yaml-cpp/include/yaml-cpp/node/iterator.h
index f4831a4335..6618169c53 100644
--- a/contrib/libs/yaml-cpp/include/yaml-cpp/node/iterator.h
+++ b/contrib/libs/yaml-cpp/include/yaml-cpp/node/iterator.h
@@ -17,21 +17,21 @@
namespace YAML {
namespace detail {
-struct node_pair: public std::pair<Node, Node> {
- node_pair() = default;
- node_pair(const Node& first, const Node& second)
- : std::pair<Node, Node>(first, second)
- {
- }
-};
-
-struct iterator_value : public Node, node_pair {
+struct node_pair: public std::pair<Node, Node> {
+ node_pair() = default;
+ node_pair(const Node& first, const Node& second)
+ : std::pair<Node, Node>(first, second)
+ {
+ }
+};
+
+struct iterator_value : public Node, node_pair {
iterator_value() {}
explicit iterator_value(const Node& rhs)
: Node(rhs),
- node_pair(Node(Node::ZombieNode), Node(Node::ZombieNode)) {}
+ node_pair(Node(Node::ZombieNode), Node(Node::ZombieNode)) {}
explicit iterator_value(const Node& key, const Node& value)
- : Node(Node::ZombieNode), node_pair(key, value) {}
+ : Node(Node::ZombieNode), node_pair(key, value) {}
};
}
}
diff --git a/library/cpp/actors/core/log.cpp b/library/cpp/actors/core/log.cpp
index 83ec0c5e43..5f63b5af58 100644
--- a/library/cpp/actors/core/log.cpp
+++ b/library/cpp/actors/core/log.cpp
@@ -715,26 +715,26 @@ namespace NActors {
}
};
- class TCompositeLogBackend: public TLogBackend {
- public:
- TCompositeLogBackend(TVector<TAutoPtr<TLogBackend>>&& underlyingBackends)
- : UnderlyingBackends(std::move(underlyingBackends))
- {
- }
-
- void WriteData(const TLogRecord& rec) override {
- for (auto& b: UnderlyingBackends) {
- b->WriteData(rec);
- }
- }
-
- void ReopenLog() override {
- }
-
- private:
- TVector<TAutoPtr<TLogBackend>> UnderlyingBackends;
- };
-
+ class TCompositeLogBackend: public TLogBackend {
+ public:
+ TCompositeLogBackend(TVector<TAutoPtr<TLogBackend>>&& underlyingBackends)
+ : UnderlyingBackends(std::move(underlyingBackends))
+ {
+ }
+
+ void WriteData(const TLogRecord& rec) override {
+ for (auto& b: UnderlyingBackends) {
+ b->WriteData(rec);
+ }
+ }
+
+ void ReopenLog() override {
+ }
+
+ private:
+ TVector<TAutoPtr<TLogBackend>> UnderlyingBackends;
+ };
+
TAutoPtr<TLogBackend> CreateStderrBackend() {
return new TStderrBackend();
}
@@ -747,7 +747,7 @@ namespace NActors {
return new TNullLogBackend();
}
- TAutoPtr<TLogBackend> CreateCompositeLogBackend(TVector<TAutoPtr<TLogBackend>>&& underlyingBackends) {
- return new TCompositeLogBackend(std::move(underlyingBackends));
- }
+ TAutoPtr<TLogBackend> CreateCompositeLogBackend(TVector<TAutoPtr<TLogBackend>>&& underlyingBackends) {
+ return new TCompositeLogBackend(std::move(underlyingBackends));
+ }
}
diff --git a/library/cpp/actors/core/log.h b/library/cpp/actors/core/log.h
index 8499990e80..c11a7cf3c1 100644
--- a/library/cpp/actors/core/log.h
+++ b/library/cpp/actors/core/log.h
@@ -299,7 +299,7 @@ namespace NActors {
TAutoPtr<TLogBackend> CreateStderrBackend();
TAutoPtr<TLogBackend> CreateFileBackend(const TString& fileName);
TAutoPtr<TLogBackend> CreateNullBackend();
- TAutoPtr<TLogBackend> CreateCompositeLogBackend(TVector<TAutoPtr<TLogBackend>>&& underlyingBackends);
+ TAutoPtr<TLogBackend> CreateCompositeLogBackend(TVector<TAutoPtr<TLogBackend>>&& underlyingBackends);
/////////////////////////////////////////////////////////////////////
// Logging adaptors for memory log and logging into filesystem
diff --git a/library/cpp/histogram/adaptive/multi_histogram.h b/library/cpp/histogram/adaptive/multi_histogram.h
index c9948f6733..41caac5ba6 100644
--- a/library/cpp/histogram/adaptive/multi_histogram.h
+++ b/library/cpp/histogram/adaptive/multi_histogram.h
@@ -64,7 +64,7 @@ namespace NKiwiAggr {
TVector<ui64> GetIds() const {
TVector<ui64> result(0);
- for (THistogramsMap::const_iterator it = Histograms.begin(); it != Histograms.end(); ++it) {
+ for (THistogramsMap::const_iterator it = Histograms.begin(); it != Histograms.end(); ++it) {
result.push_back(it->first);
}
return result;
diff --git a/library/cpp/monlib/counters/counters.h b/library/cpp/monlib/counters/counters.h
index 2180cf381d..038b55f0c8 100644
--- a/library/cpp/monlib/counters/counters.h
+++ b/library/cpp/monlib/counters/counters.h
@@ -112,10 +112,10 @@ namespace NMonitoring {
return AtomicGet(Value) == 0;
}
- TAtomic& GetAtomic() {
- return Value;
- }
-
+ TAtomic& GetAtomic() {
+ return Value;
+ }
+
private:
TAtomic Value;
bool Derivative;
diff --git a/library/cpp/monlib/dynamic_counters/encode.cpp b/library/cpp/monlib/dynamic_counters/encode.cpp
index f609c6160d..ffa48d276e 100644
--- a/library/cpp/monlib/dynamic_counters/encode.cpp
+++ b/library/cpp/monlib/dynamic_counters/encode.cpp
@@ -113,10 +113,10 @@ namespace NMonitoring {
}
}
- THolder<ICountableConsumer> AsCountableConsumer(IMetricEncoderPtr encoder, TCountableBase::EVisibility visibility) {
- return MakeHolder<TConsumer>(std::move(encoder), visibility);
- }
-
+ THolder<ICountableConsumer> AsCountableConsumer(IMetricEncoderPtr encoder, TCountableBase::EVisibility visibility) {
+ return MakeHolder<TConsumer>(std::move(encoder), visibility);
+ }
+
void ToJson(const TDynamicCounters& counters, IOutputStream* out) {
TConsumer consumer{EncoderJson(out), TCountableBase::EVisibility::Public};
counters.Accept(TString{}, TString{}, consumer);
diff --git a/library/cpp/monlib/dynamic_counters/encode.h b/library/cpp/monlib/dynamic_counters/encode.h
index 1a3b253518..c79964d7cb 100644
--- a/library/cpp/monlib/dynamic_counters/encode.h
+++ b/library/cpp/monlib/dynamic_counters/encode.h
@@ -2,7 +2,7 @@
#include "counters.h"
-#include <library/cpp/monlib/encode/encoder.h>
+#include <library/cpp/monlib/encode/encoder.h>
#include <library/cpp/monlib/encode/format.h>
namespace NMonitoring {
@@ -13,10 +13,10 @@ namespace NMonitoring {
TCountableBase::EVisibility visibility = TCountableBase::EVisibility::Public
);
- THolder<ICountableConsumer> AsCountableConsumer(
- NMonitoring::IMetricEncoderPtr encoder,
- TCountableBase::EVisibility visibility = TCountableBase::EVisibility::Public);
-
+ THolder<ICountableConsumer> AsCountableConsumer(
+ NMonitoring::IMetricEncoderPtr encoder,
+ TCountableBase::EVisibility visibility = TCountableBase::EVisibility::Public);
+
void ToJson(const TDynamicCounters& counters, IOutputStream* out);
TString ToJson(const TDynamicCounters& counters);
diff --git a/library/cpp/monlib/encode/buffered/buffered_encoder_base.cpp b/library/cpp/monlib/encode/buffered/buffered_encoder_base.cpp
index 4928fdf70f..87c832d642 100644
--- a/library/cpp/monlib/encode/buffered/buffered_encoder_base.cpp
+++ b/library/cpp/monlib/encode/buffered/buffered_encoder_base.cpp
@@ -1,8 +1,8 @@
#include "buffered_encoder_base.h"
-#include <util/string/join.h>
-#include <util/string/builder.h>
-
+#include <util/string/join.h>
+#include <util/string/builder.h>
+
namespace NMonitoring {
void TBufferedEncoderBase::OnStreamBegin() {
@@ -42,8 +42,8 @@ void TBufferedEncoderBase::OnMetricEnd() {
Y_ENSURE(existing.GetValueType() == metric.TimeSeries.GetValueType(),
"Time series point type mismatch: expected " << existing.GetValueType()
- << " but found " << metric.TimeSeries.GetValueType()
- << ", labels '" << FormatLabels(metric.Labels) << "'");
+ << " but found " << metric.TimeSeries.GetValueType()
+ << ", labels '" << FormatLabels(metric.Labels) << "'");
existing.CopyFrom(metric.TimeSeries);
Metrics_.pop_back();
@@ -144,27 +144,27 @@ void TBufferedEncoderBase::OnLogHistogram(TInstant time, TLogHistogramSnapshotPt
metric.TimeSeries.Add(time, s.Get());
}
-TString TBufferedEncoderBase::FormatLabels(const TPooledLabels& labels) const {
- auto formattedLabels = TVector<TString>(Reserve(labels.size() + CommonLabels_.size()));
- auto addLabel = [&](const TPooledLabel& l) {
- auto formattedLabel = TStringBuilder() << LabelNamesPool_.Get(l.Key) << '=' << LabelValuesPool_.Get(l.Value);
- formattedLabels.push_back(std::move(formattedLabel));
- };
-
- for (const auto& l: labels) {
- addLabel(l);
- }
- for (const auto& l: CommonLabels_) {
- const auto it = FindIf(labels, [&](const TPooledLabel& label) {
- return label.Key == l.Key;
- });
- if (it == labels.end()) {
- addLabel(l);
- }
- }
- Sort(formattedLabels);
-
- return TStringBuilder() << "{" << JoinSeq(", ", formattedLabels) << "}";
-}
-
+TString TBufferedEncoderBase::FormatLabels(const TPooledLabels& labels) const {
+ auto formattedLabels = TVector<TString>(Reserve(labels.size() + CommonLabels_.size()));
+ auto addLabel = [&](const TPooledLabel& l) {
+ auto formattedLabel = TStringBuilder() << LabelNamesPool_.Get(l.Key) << '=' << LabelValuesPool_.Get(l.Value);
+ formattedLabels.push_back(std::move(formattedLabel));
+ };
+
+ for (const auto& l: labels) {
+ addLabel(l);
+ }
+ for (const auto& l: CommonLabels_) {
+ const auto it = FindIf(labels, [&](const TPooledLabel& label) {
+ return label.Key == l.Key;
+ });
+ if (it == labels.end()) {
+ addLabel(l);
+ }
+ }
+ Sort(formattedLabels);
+
+ return TStringBuilder() << "{" << JoinSeq(", ", formattedLabels) << "}";
+}
+
} // namespace NMonitoring
diff --git a/library/cpp/monlib/encode/buffered/buffered_encoder_base.h b/library/cpp/monlib/encode/buffered/buffered_encoder_base.h
index 12f838d905..fe3714e58f 100644
--- a/library/cpp/monlib/encode/buffered/buffered_encoder_base.h
+++ b/library/cpp/monlib/encode/buffered/buffered_encoder_base.h
@@ -82,10 +82,10 @@ protected:
TMetricTimeSeries TimeSeries;
};
-protected:
- TString FormatLabels(const TPooledLabels& labels) const;
-
-protected:
+protected:
+ TString FormatLabels(const TPooledLabels& labels) const;
+
+protected:
TEncoderState State_;
TStringPoolBuilder LabelNamesPool_;
diff --git a/library/cpp/monlib/encode/buffered/string_pool.h b/library/cpp/monlib/encode/buffered/string_pool.h
index f07d050729..00e5644608 100644
--- a/library/cpp/monlib/encode/buffered/string_pool.h
+++ b/library/cpp/monlib/encode/buffered/string_pool.h
@@ -37,10 +37,10 @@ namespace NMonitoring {
return StrVector_.at(index).first;
}
- TStringBuf Get(const TValue* value) const {
- return StrVector_.at(value->Index).first;
- }
-
+ TStringBuf Get(const TValue* value) const {
+ return StrVector_.at(value->Index).first;
+ }
+
template <typename TConsumer>
void ForEach(TConsumer&& c) {
Y_ENSURE(IsBuilt_, "Pool must be sorted first");
diff --git a/library/cpp/monlib/encode/json/json.h b/library/cpp/monlib/encode/json/json.h
index f6968c0016..21530f20c3 100644
--- a/library/cpp/monlib/encode/json/json.h
+++ b/library/cpp/monlib/encode/json/json.h
@@ -16,14 +16,14 @@ namespace NMonitoring {
/// Buffered encoder will merge series with same labels into one.
IMetricEncoderPtr BufferedEncoderJson(IOutputStream* out, int indentation = 0);
- IMetricEncoderPtr EncoderCloudJson(IOutputStream* out,
- int indentation = 0,
- TStringBuf metricNameLabel = "name");
-
- IMetricEncoderPtr BufferedEncoderCloudJson(IOutputStream* out,
- int indentation = 0,
- TStringBuf metricNameLabel = "name");
-
+ IMetricEncoderPtr EncoderCloudJson(IOutputStream* out,
+ int indentation = 0,
+ TStringBuf metricNameLabel = "name");
+
+ IMetricEncoderPtr BufferedEncoderCloudJson(IOutputStream* out,
+ int indentation = 0,
+ TStringBuf metricNameLabel = "name");
+
void DecodeJson(TStringBuf data, IMetricConsumer* c, TStringBuf metricNameLabel = "name");
}
diff --git a/library/cpp/monlib/encode/json/json_decoder.cpp b/library/cpp/monlib/encode/json/json_decoder.cpp
index 8420b93aab..d44ff5fd28 100644
--- a/library/cpp/monlib/encode/json/json_decoder.cpp
+++ b/library/cpp/monlib/encode/json/json_decoder.cpp
@@ -52,7 +52,7 @@ public:
Bounds_.clear();
Values_.clear();
- InfPresented_ = false;
+ InfPresented_ = false;
return snapshot;
}
diff --git a/library/cpp/monlib/encode/json/json_decoder_ut.cpp b/library/cpp/monlib/encode/json/json_decoder_ut.cpp
index 52c34e4f37..4464e1d26a 100644
--- a/library/cpp/monlib/encode/json/json_decoder_ut.cpp
+++ b/library/cpp/monlib/encode/json/json_decoder_ut.cpp
@@ -120,60 +120,60 @@ Y_UNIT_TEST_SUITE(TJsonDecoderTest) {
ValidateCommonParts(std::move(commonParts), true, true);
ValidateMetrics(collector.Metrics);
}
-
- Y_UNIT_TEST(CanParseHistogramsWithInf) {
- const char* metricsData = R"({
-"metrics":
- [
- {
- "hist": {
- "bounds": [
- 10
- ],
- "buckets": [
- 11
- ],
- "inf": 12
- },
- "name":"s1",
- "type": "HIST_RATE"
- },
- {
- "hist": {
- "bounds": [
- 20
- ],
- "buckets": [
- 21
- ]
- },
- "name":"s2",
- "type":"HIST_RATE"
- }
- ]
-})";
- TCollectingConsumer consumer(false);
- DecodeJson(metricsData, &consumer);
-
- UNIT_ASSERT_VALUES_EQUAL(consumer.Metrics.size(), 2);
- {
- const auto& m = consumer.Metrics[0];
- UNIT_ASSERT_VALUES_EQUAL(m.Kind, EMetricType::HIST_RATE);
- UNIT_ASSERT_VALUES_EQUAL(m.Values->Size(), 1);
- const auto* histogram = (*m.Values)[0].GetValue().AsHistogram();
- UNIT_ASSERT_VALUES_EQUAL(histogram->Count(), 2);
- UNIT_ASSERT_VALUES_EQUAL(histogram->UpperBound(1), Max<TBucketBound>());
- UNIT_ASSERT_VALUES_EQUAL(histogram->Value(0), 11);
- UNIT_ASSERT_VALUES_EQUAL(histogram->Value(1), 12);
- }
- {
- const auto& m = consumer.Metrics[1];
- UNIT_ASSERT_VALUES_EQUAL(m.Kind, EMetricType::HIST_RATE);
- UNIT_ASSERT_VALUES_EQUAL(m.Values->Size(), 1);
- const auto* histogram = (*m.Values)[0].GetValue().AsHistogram();
- UNIT_ASSERT_VALUES_EQUAL(histogram->Count(), 1);
- UNIT_ASSERT_VALUES_EQUAL(histogram->UpperBound(0), 20);
- UNIT_ASSERT_VALUES_EQUAL(histogram->Value(0), 21);
- }
- }
+
+ Y_UNIT_TEST(CanParseHistogramsWithInf) {
+ const char* metricsData = R"({
+"metrics":
+ [
+ {
+ "hist": {
+ "bounds": [
+ 10
+ ],
+ "buckets": [
+ 11
+ ],
+ "inf": 12
+ },
+ "name":"s1",
+ "type": "HIST_RATE"
+ },
+ {
+ "hist": {
+ "bounds": [
+ 20
+ ],
+ "buckets": [
+ 21
+ ]
+ },
+ "name":"s2",
+ "type":"HIST_RATE"
+ }
+ ]
+})";
+ TCollectingConsumer consumer(false);
+ DecodeJson(metricsData, &consumer);
+
+ UNIT_ASSERT_VALUES_EQUAL(consumer.Metrics.size(), 2);
+ {
+ const auto& m = consumer.Metrics[0];
+ UNIT_ASSERT_VALUES_EQUAL(m.Kind, EMetricType::HIST_RATE);
+ UNIT_ASSERT_VALUES_EQUAL(m.Values->Size(), 1);
+ const auto* histogram = (*m.Values)[0].GetValue().AsHistogram();
+ UNIT_ASSERT_VALUES_EQUAL(histogram->Count(), 2);
+ UNIT_ASSERT_VALUES_EQUAL(histogram->UpperBound(1), Max<TBucketBound>());
+ UNIT_ASSERT_VALUES_EQUAL(histogram->Value(0), 11);
+ UNIT_ASSERT_VALUES_EQUAL(histogram->Value(1), 12);
+ }
+ {
+ const auto& m = consumer.Metrics[1];
+ UNIT_ASSERT_VALUES_EQUAL(m.Kind, EMetricType::HIST_RATE);
+ UNIT_ASSERT_VALUES_EQUAL(m.Values->Size(), 1);
+ const auto* histogram = (*m.Values)[0].GetValue().AsHistogram();
+ UNIT_ASSERT_VALUES_EQUAL(histogram->Count(), 1);
+ UNIT_ASSERT_VALUES_EQUAL(histogram->UpperBound(0), 20);
+ UNIT_ASSERT_VALUES_EQUAL(histogram->Value(0), 21);
+ }
+ }
}
diff --git a/library/cpp/monlib/encode/json/json_encoder.cpp b/library/cpp/monlib/encode/json/json_encoder.cpp
index fb14ffdf07..20d2bb6283 100644
--- a/library/cpp/monlib/encode/json/json_encoder.cpp
+++ b/library/cpp/monlib/encode/json/json_encoder.cpp
@@ -14,21 +14,21 @@
namespace NMonitoring {
namespace {
- enum class EJsonStyle {
- Solomon,
- Cloud
- };
-
+ enum class EJsonStyle {
+ Solomon,
+ Cloud
+ };
+
///////////////////////////////////////////////////////////////////////
// TJsonWriter
///////////////////////////////////////////////////////////////////////
class TJsonWriter {
public:
- TJsonWriter(IOutputStream* out, int indentation, EJsonStyle style, TStringBuf metricNameLabel)
+ TJsonWriter(IOutputStream* out, int indentation, EJsonStyle style, TStringBuf metricNameLabel)
: Buf_(NJsonWriter::HEM_UNSAFE, out)
- , Style_(style)
- , MetricNameLabel_(metricNameLabel)
- , CurrentMetricName_()
+ , Style_(style)
+ , MetricNameLabel_(metricNameLabel)
+ , CurrentMetricName_()
{
Buf_.SetIndentSpaces(indentation);
Buf_.SetWriteNanAsString();
@@ -37,11 +37,11 @@ namespace NMonitoring {
void WriteTime(TInstant time) {
if (time != TInstant::Zero()) {
Buf_.WriteKey(TStringBuf("ts"));
- if (Style_ == EJsonStyle::Solomon) {
- Buf_.WriteULongLong(time.Seconds());
- } else {
- Buf_.WriteString(time.ToString());
- }
+ if (Style_ == EJsonStyle::Solomon) {
+ Buf_.WriteULongLong(time.Seconds());
+ } else {
+ Buf_.WriteString(time.ToString());
+ }
}
}
@@ -61,8 +61,8 @@ namespace NMonitoring {
}
void WriteValue(IHistogramSnapshot* s) {
- Y_ENSURE(Style_ == EJsonStyle::Solomon);
-
+ Y_ENSURE(Style_ == EJsonStyle::Solomon);
+
Buf_.WriteKey(TStringBuf("hist"));
Buf_.BeginObject();
if (ui32 count = s->Count()) {
@@ -94,8 +94,8 @@ namespace NMonitoring {
}
void WriteValue(ISummaryDoubleSnapshot* s) {
- Y_ENSURE(Style_ == EJsonStyle::Solomon);
-
+ Y_ENSURE(Style_ == EJsonStyle::Solomon);
+
Buf_.WriteKey(TStringBuf("summary"));
Buf_.BeginObject();
@@ -118,8 +118,8 @@ namespace NMonitoring {
}
void WriteValue(TLogHistogramSnapshot* s) {
- Y_ENSURE(Style_ == EJsonStyle::Solomon);
-
+ Y_ENSURE(Style_ == EJsonStyle::Solomon);
+
Buf_.WriteKey(TStringBuf("log_hist"));
Buf_.BeginObject();
@@ -169,64 +169,64 @@ namespace NMonitoring {
break;
case EMetricValueType::UNKNOWN:
- ythrow yexception() << "unknown metric value type";
+ ythrow yexception() << "unknown metric value type";
}
}
void WriteLabel(TStringBuf name, TStringBuf value) {
Y_ENSURE(IsUtf(name), "label name is not valid UTF-8 string");
Y_ENSURE(IsUtf(value), "label value is not valid UTF-8 string");
- if (Style_ == EJsonStyle::Cloud && name == MetricNameLabel_) {
- CurrentMetricName_ = value;
- } else {
- Buf_.WriteKey(name);
- Buf_.WriteString(value);
- }
- }
-
- void WriteMetricType(EMetricType type) {
- if (Style_ == EJsonStyle::Cloud) {
- Buf_.WriteKey("type");
- Buf_.WriteString(MetricTypeToCloudStr(type));
- } else {
- Buf_.WriteKey("kind");
- Buf_.WriteString(MetricTypeToStr(type));
- }
- }
-
- void WriteName() {
- if (Style_ != EJsonStyle::Cloud) {
- return;
- }
- if (CurrentMetricName_.Empty()) {
- ythrow yexception() << "label '" << MetricNameLabel_ << "' is not defined";
- }
- Buf_.WriteKey("name");
- Buf_.WriteString(CurrentMetricName_);
- CurrentMetricName_.clear();
- }
-
- private:
- static TStringBuf MetricTypeToCloudStr(EMetricType type) {
- switch (type) {
- case EMetricType::GAUGE:
- return TStringBuf("DGAUGE");
- case EMetricType::COUNTER:
- return TStringBuf("COUNTER");
- case EMetricType::RATE:
- return TStringBuf("RATE");
- case EMetricType::IGAUGE:
- return TStringBuf("IGAUGE");
- default:
- ythrow yexception() << "metric type '" << type << "' is not supported by cloud json format";
- }
- }
-
+ if (Style_ == EJsonStyle::Cloud && name == MetricNameLabel_) {
+ CurrentMetricName_ = value;
+ } else {
+ Buf_.WriteKey(name);
+ Buf_.WriteString(value);
+ }
+ }
+
+ void WriteMetricType(EMetricType type) {
+ if (Style_ == EJsonStyle::Cloud) {
+ Buf_.WriteKey("type");
+ Buf_.WriteString(MetricTypeToCloudStr(type));
+ } else {
+ Buf_.WriteKey("kind");
+ Buf_.WriteString(MetricTypeToStr(type));
+ }
+ }
+
+ void WriteName() {
+ if (Style_ != EJsonStyle::Cloud) {
+ return;
+ }
+ if (CurrentMetricName_.Empty()) {
+ ythrow yexception() << "label '" << MetricNameLabel_ << "' is not defined";
+ }
+ Buf_.WriteKey("name");
+ Buf_.WriteString(CurrentMetricName_);
+ CurrentMetricName_.clear();
+ }
+
+ private:
+ static TStringBuf MetricTypeToCloudStr(EMetricType type) {
+ switch (type) {
+ case EMetricType::GAUGE:
+ return TStringBuf("DGAUGE");
+ case EMetricType::COUNTER:
+ return TStringBuf("COUNTER");
+ case EMetricType::RATE:
+ return TStringBuf("RATE");
+ case EMetricType::IGAUGE:
+ return TStringBuf("IGAUGE");
+ default:
+ ythrow yexception() << "metric type '" << type << "' is not supported by cloud json format";
+ }
+ }
+
protected:
NJsonWriter::TBuf Buf_;
- EJsonStyle Style_;
- TString MetricNameLabel_;
- TString CurrentMetricName_;
+ EJsonStyle Style_;
+ TString MetricNameLabel_;
+ TString CurrentMetricName_;
};
///////////////////////////////////////////////////////////////////////
@@ -234,8 +234,8 @@ namespace NMonitoring {
///////////////////////////////////////////////////////////////////////
class TEncoderJson final: public IMetricEncoder, public TJsonWriter {
public:
- TEncoderJson(IOutputStream* out, int indentation, EJsonStyle style, TStringBuf metricNameLabel)
- : TJsonWriter{out, indentation, style, metricNameLabel}
+ TEncoderJson(IOutputStream* out, int indentation, EJsonStyle style, TStringBuf metricNameLabel)
+ : TJsonWriter{out, indentation, style, metricNameLabel}
{
}
@@ -267,11 +267,11 @@ namespace NMonitoring {
State_.Switch(TEncoderState::EState::ROOT, TEncoderState::EState::METRIC);
if (Buf_.KeyExpected()) {
// first metric, so open metrics array
- Buf_.WriteKey(TStringBuf(Style_ == EJsonStyle::Solomon ? "sensors" : "metrics"));
+ Buf_.WriteKey(TStringBuf(Style_ == EJsonStyle::Solomon ? "sensors" : "metrics"));
Buf_.BeginList();
}
Buf_.BeginObject();
- WriteMetricType(type);
+ WriteMetricType(type);
}
void OnMetricEnd() override {
@@ -300,7 +300,7 @@ namespace NMonitoring {
}
if (State_ == TEncoderState::EState::ROOT) {
State_ = TEncoderState::EState::COMMON_LABELS;
- Buf_.WriteKey(TStringBuf(Style_ == EJsonStyle::Solomon ? "commonLabels" : "labels"));
+ Buf_.WriteKey(TStringBuf(Style_ == EJsonStyle::Solomon ? "commonLabels" : "labels"));
} else if (State_ == TEncoderState::EState::METRIC) {
State_ = TEncoderState::EState::METRIC_LABELS;
Buf_.WriteKey(TStringBuf("labels"));
@@ -323,9 +323,9 @@ namespace NMonitoring {
Y_ENSURE(!EmptyLabels_, "Labels cannot be empty");
Buf_.EndObject();
- if (State_ == TEncoderState::EState::METRIC) {
- WriteName();
- }
+ if (State_ == TEncoderState::EState::METRIC) {
+ WriteName();
+ }
}
void OnLabel(TStringBuf name, TStringBuf value) override {
@@ -420,8 +420,8 @@ namespace NMonitoring {
///////////////////////////////////////////////////////////////////////
class TBufferedJsonEncoder : public TBufferedEncoderBase, public TJsonWriter {
public:
- TBufferedJsonEncoder(IOutputStream* out, int indentation, EJsonStyle style, TStringBuf metricNameLabel)
- : TJsonWriter{out, indentation, style, metricNameLabel}
+ TBufferedJsonEncoder(IOutputStream* out, int indentation, EJsonStyle style, TStringBuf metricNameLabel)
+ : TJsonWriter{out, indentation, style, metricNameLabel}
{
MetricsMergingMode_ = EMetricsMergingMode::MERGE_METRICS;
}
@@ -464,12 +464,12 @@ namespace NMonitoring {
WriteTime(CommonTime_);
if (CommonLabels_.size() > 0) {
- Buf_.WriteKey(TStringBuf(Style_ == EJsonStyle::Solomon ? "commonLabels": "labels"));
- WriteLabels(CommonLabels_, true);
+ Buf_.WriteKey(TStringBuf(Style_ == EJsonStyle::Solomon ? "commonLabels": "labels"));
+ WriteLabels(CommonLabels_, true);
}
if (Metrics_.size() > 0) {
- Buf_.WriteKey(TStringBuf(Style_ == EJsonStyle::Solomon ? "sensors" : "metrics"));
+ Buf_.WriteKey(TStringBuf(Style_ == EJsonStyle::Solomon ? "sensors" : "metrics"));
WriteMetrics();
}
@@ -488,10 +488,10 @@ namespace NMonitoring {
void WriteMetric(TMetric& metric) {
Buf_.BeginObject();
- WriteMetricType(metric.MetricType);
+ WriteMetricType(metric.MetricType);
Buf_.WriteKey(TStringBuf("labels"));
- WriteLabels(metric.Labels, false);
+ WriteLabels(metric.Labels, false);
metric.TimeSeries.SortByTs();
if (metric.TimeSeries.Size() == 1) {
@@ -515,7 +515,7 @@ namespace NMonitoring {
Buf_.EndObject();
}
- void WriteLabels(const TPooledLabels& labels, bool isCommon) {
+ void WriteLabels(const TPooledLabels& labels, bool isCommon) {
Buf_.BeginObject();
for (auto i = 0u; i < labels.size(); ++i) {
@@ -526,10 +526,10 @@ namespace NMonitoring {
}
Buf_.EndObject();
-
- if (!isCommon) {
- WriteName();
- }
+
+ if (!isCommon) {
+ WriteName();
+ }
}
private:
@@ -539,18 +539,18 @@ namespace NMonitoring {
}
IMetricEncoderPtr EncoderJson(IOutputStream* out, int indentation) {
- return MakeHolder<TEncoderJson>(out, indentation, EJsonStyle::Solomon, "");
+ return MakeHolder<TEncoderJson>(out, indentation, EJsonStyle::Solomon, "");
}
IMetricEncoderPtr BufferedEncoderJson(IOutputStream* out, int indentation) {
- return MakeHolder<TBufferedJsonEncoder>(out, indentation, EJsonStyle::Solomon, "");
+ return MakeHolder<TBufferedJsonEncoder>(out, indentation, EJsonStyle::Solomon, "");
+ }
+
+ IMetricEncoderPtr EncoderCloudJson(IOutputStream* out, int indentation, TStringBuf metricNameLabel) {
+ return MakeHolder<TEncoderJson>(out, indentation, EJsonStyle::Cloud, metricNameLabel);
+ }
+
+ IMetricEncoderPtr BufferedEncoderCloudJson(IOutputStream* out, int indentation, TStringBuf metricNameLabel) {
+ return MakeHolder<TBufferedJsonEncoder>(out, indentation, EJsonStyle::Cloud, metricNameLabel);
}
-
- IMetricEncoderPtr EncoderCloudJson(IOutputStream* out, int indentation, TStringBuf metricNameLabel) {
- return MakeHolder<TEncoderJson>(out, indentation, EJsonStyle::Cloud, metricNameLabel);
- }
-
- IMetricEncoderPtr BufferedEncoderCloudJson(IOutputStream* out, int indentation, TStringBuf metricNameLabel) {
- return MakeHolder<TBufferedJsonEncoder>(out, indentation, EJsonStyle::Cloud, metricNameLabel);
- }
}
diff --git a/library/cpp/monlib/encode/json/json_ut.cpp b/library/cpp/monlib/encode/json/json_ut.cpp
index 9bd38e5fc5..09e7909289 100644
--- a/library/cpp/monlib/encode/json/json_ut.cpp
+++ b/library/cpp/monlib/encode/json/json_ut.cpp
@@ -8,7 +8,7 @@
#include <library/cpp/testing/unittest/registar.h>
#include <util/stream/str.h>
-#include <util/string/builder.h>
+#include <util/string/builder.h>
#include <limits>
@@ -136,171 +136,171 @@ Y_UNIT_TEST_SUITE(TJsonTest) {
const TInstant now = TInstant::ParseIso8601Deprecated("2017-11-05T01:02:03Z");
Y_UNIT_TEST(Encode) {
- auto check = [](bool cloud, bool buffered, TStringBuf expectedResourceKey) {
- TString json;
- TStringOutput out(json);
- auto e = cloud
- ? (buffered ? BufferedEncoderCloudJson(&out, 2, "metric") : EncoderCloudJson(&out, 2, "metric"))
- : (buffered ? BufferedEncoderJson(&out, 2) : EncoderJson(&out, 2));
- e->OnStreamBegin();
- { // common time
- e->OnCommonTime(TInstant::Seconds(1500000000));
+ auto check = [](bool cloud, bool buffered, TStringBuf expectedResourceKey) {
+ TString json;
+ TStringOutput out(json);
+ auto e = cloud
+ ? (buffered ? BufferedEncoderCloudJson(&out, 2, "metric") : EncoderCloudJson(&out, 2, "metric"))
+ : (buffered ? BufferedEncoderJson(&out, 2) : EncoderJson(&out, 2));
+ e->OnStreamBegin();
+ { // common time
+ e->OnCommonTime(TInstant::Seconds(1500000000));
}
- { // common labels
+ { // common labels
e->OnLabelsBegin();
- e->OnLabel("project", "solomon");
+ e->OnLabel("project", "solomon");
e->OnLabelsEnd();
}
- { // metric #1
- e->OnMetricBegin(EMetricType::COUNTER);
- {
- e->OnLabelsBegin();
- e->OnLabel("metric", "single");
- e->OnLabel("labels", "l1");
- e->OnLabelsEnd();
- }
- e->OnUint64(now, 17);
- e->OnMetricEnd();
+ { // metric #1
+ e->OnMetricBegin(EMetricType::COUNTER);
+ {
+ e->OnLabelsBegin();
+ e->OnLabel("metric", "single");
+ e->OnLabel("labels", "l1");
+ e->OnLabelsEnd();
+ }
+ e->OnUint64(now, 17);
+ e->OnMetricEnd();
+ }
+ { // metric #2
+ e->OnMetricBegin(EMetricType::RATE);
+ {
+ e->OnLabelsBegin();
+ e->OnLabel("metric", "single");
+ e->OnLabel("labels", "l2");
+ e->OnLabelsEnd();
+ }
+ e->OnUint64(now, 17);
+ e->OnMetricEnd();
+ }
+ { // metric #3
+ e->OnMetricBegin(EMetricType::GAUGE);
+ e->OnDouble(now, 3.14);
+ {
+ e->OnLabelsBegin();
+ e->OnLabel("metric", "single");
+ e->OnLabel("labels", "l3");
+ e->OnLabelsEnd();
+ }
+ e->OnMetricEnd();
+ }
+ { // metric #4
+ e->OnMetricBegin(EMetricType::IGAUGE);
+ e->OnInt64(now, 42);
+ {
+ e->OnLabelsBegin();
+ e->OnLabel("metric", "single_igauge");
+ e->OnLabel("labels", "l4");
+ e->OnLabelsEnd();
+ }
+ e->OnMetricEnd();
}
- { // metric #2
- e->OnMetricBegin(EMetricType::RATE);
- {
- e->OnLabelsBegin();
- e->OnLabel("metric", "single");
- e->OnLabel("labels", "l2");
- e->OnLabelsEnd();
- }
- e->OnUint64(now, 17);
- e->OnMetricEnd();
+ { // metric #5
+ e->OnMetricBegin(EMetricType::GAUGE);
+ {
+ e->OnLabelsBegin();
+ e->OnLabel("metric", "multiple");
+ e->OnLabel("labels", "l5");
+ e->OnLabelsEnd();
+ }
+ e->OnDouble(now, std::numeric_limits<double>::quiet_NaN());
+ e->OnDouble(now + TDuration::Seconds(15), std::numeric_limits<double>::infinity());
+ e->OnDouble(now + TDuration::Seconds(30), -std::numeric_limits<double>::infinity());
+ e->OnMetricEnd();
}
- { // metric #3
- e->OnMetricBegin(EMetricType::GAUGE);
- e->OnDouble(now, 3.14);
- {
- e->OnLabelsBegin();
- e->OnLabel("metric", "single");
- e->OnLabel("labels", "l3");
- e->OnLabelsEnd();
- }
- e->OnMetricEnd();
- }
- { // metric #4
- e->OnMetricBegin(EMetricType::IGAUGE);
- e->OnInt64(now, 42);
- {
- e->OnLabelsBegin();
- e->OnLabel("metric", "single_igauge");
- e->OnLabel("labels", "l4");
- e->OnLabelsEnd();
- }
- e->OnMetricEnd();
- }
- { // metric #5
- e->OnMetricBegin(EMetricType::GAUGE);
- {
- e->OnLabelsBegin();
- e->OnLabel("metric", "multiple");
- e->OnLabel("labels", "l5");
- e->OnLabelsEnd();
- }
- e->OnDouble(now, std::numeric_limits<double>::quiet_NaN());
- e->OnDouble(now + TDuration::Seconds(15), std::numeric_limits<double>::infinity());
- e->OnDouble(now + TDuration::Seconds(30), -std::numeric_limits<double>::infinity());
- e->OnMetricEnd();
- }
-
- { // metric #6
- e->OnMetricBegin(EMetricType::COUNTER);
- e->OnUint64(now, 1337);
- e->OnUint64(now + TDuration::Seconds(15), 1338);
- {
- e->OnLabelsBegin();
- e->OnLabel("metric", "multiple");
- e->OnLabel("labels", "l6");
- e->OnLabelsEnd();
- }
- e->OnMetricEnd();
- }
- e->OnStreamEnd();
- e->Close();
- json += "\n";
-
+
+ { // metric #6
+ e->OnMetricBegin(EMetricType::COUNTER);
+ e->OnUint64(now, 1337);
+ e->OnUint64(now + TDuration::Seconds(15), 1338);
+ {
+ e->OnLabelsBegin();
+ e->OnLabel("metric", "multiple");
+ e->OnLabel("labels", "l6");
+ e->OnLabelsEnd();
+ }
+ e->OnMetricEnd();
+ }
+ e->OnStreamEnd();
+ e->Close();
+ json += "\n";
+
auto parseJson = [] (auto buf) {
NJson::TJsonValue value;
NJson::ReadJsonTree(buf, &value, true);
return value;
};
- const auto expectedJson = NResource::Find(expectedResourceKey);
+ const auto expectedJson = NResource::Find(expectedResourceKey);
UNIT_ASSERT_EQUAL(parseJson(json), parseJson(expectedJson));
- };
-
- check(false, false, "/expected.json");
- check(false, true, "/expected_buffered.json");
- check(true, false, "/expected_cloud.json");
- check(true, true, "/expected_cloud_buffered.json");
- }
-
- TLogHistogramSnapshotPtr TestLogHistogram(ui32 v = 1) {
- TVector<double> buckets{0.5 * v, 0.25 * v, 0.25 * v, 0.5 * v};
- return MakeIntrusive<TLogHistogramSnapshot>(1.5, 1u, 0, std::move(buckets));
- }
-
- Y_UNIT_TEST(HistogramAndSummaryMetricTypesAreNotSupportedByCloudJson) {
- const TInstant now = TInstant::ParseIso8601Deprecated("2017-11-05T01:02:03Z");
-
- auto emit = [&](IMetricEncoder* encoder, EMetricType metricType) {
- encoder->OnStreamBegin();
+ };
+
+ check(false, false, "/expected.json");
+ check(false, true, "/expected_buffered.json");
+ check(true, false, "/expected_cloud.json");
+ check(true, true, "/expected_cloud_buffered.json");
+ }
+
+ TLogHistogramSnapshotPtr TestLogHistogram(ui32 v = 1) {
+ TVector<double> buckets{0.5 * v, 0.25 * v, 0.25 * v, 0.5 * v};
+ return MakeIntrusive<TLogHistogramSnapshot>(1.5, 1u, 0, std::move(buckets));
+ }
+
+ Y_UNIT_TEST(HistogramAndSummaryMetricTypesAreNotSupportedByCloudJson) {
+ const TInstant now = TInstant::ParseIso8601Deprecated("2017-11-05T01:02:03Z");
+
+ auto emit = [&](IMetricEncoder* encoder, EMetricType metricType) {
+ encoder->OnStreamBegin();
{
- encoder->OnMetricBegin(metricType);
- {
- encoder->OnLabelsBegin();
- encoder->OnLabel("name", "m");
- encoder->OnLabelsEnd();
- }
-
- switch (metricType) {
- case EMetricType::HIST: {
- auto histogram = ExponentialHistogram(6, 2);
- encoder->OnHistogram(now, histogram->Snapshot());
- break;
- }
- case EMetricType::LOGHIST: {
- auto histogram = TestLogHistogram();
- encoder->OnLogHistogram(now, histogram);
- break;
- }
- case EMetricType::DSUMMARY: {
- auto summary = MakeIntrusive<TSummaryDoubleSnapshot>(10., -0.5, 0.5, 0.3, 30u);
- encoder->OnSummaryDouble(now, summary);
- break;
- }
- default:
- Y_FAIL("unexpected metric type [%s]", ToString(metricType).c_str());
- }
-
- encoder->OnMetricEnd();
+ encoder->OnMetricBegin(metricType);
+ {
+ encoder->OnLabelsBegin();
+ encoder->OnLabel("name", "m");
+ encoder->OnLabelsEnd();
+ }
+
+ switch (metricType) {
+ case EMetricType::HIST: {
+ auto histogram = ExponentialHistogram(6, 2);
+ encoder->OnHistogram(now, histogram->Snapshot());
+ break;
+ }
+ case EMetricType::LOGHIST: {
+ auto histogram = TestLogHistogram();
+ encoder->OnLogHistogram(now, histogram);
+ break;
+ }
+ case EMetricType::DSUMMARY: {
+ auto summary = MakeIntrusive<TSummaryDoubleSnapshot>(10., -0.5, 0.5, 0.3, 30u);
+ encoder->OnSummaryDouble(now, summary);
+ break;
+ }
+ default:
+ Y_FAIL("unexpected metric type [%s]", ToString(metricType).c_str());
+ }
+
+ encoder->OnMetricEnd();
}
- encoder->OnStreamEnd();
- encoder->Close();
- };
-
- auto doTest = [&](bool buffered, EMetricType metricType) {
- TString json;
- TStringOutput out(json);
- auto encoder = buffered ? BufferedEncoderCloudJson(&out, 2) : EncoderCloudJson(&out, 2);
- const TString expectedMessage = TStringBuilder()
- << "metric type '" << metricType << "' is not supported by cloud json format";
- UNIT_ASSERT_EXCEPTION_CONTAINS_C(emit(encoder.Get(), metricType), yexception, expectedMessage,
- TString("buffered: ") + ToString(buffered));
- };
-
- doTest(false, EMetricType::HIST);
- doTest(false, EMetricType::LOGHIST);
- doTest(false, EMetricType::DSUMMARY);
- doTest(true, EMetricType::HIST);
- doTest(true, EMetricType::LOGHIST);
- doTest(true, EMetricType::DSUMMARY);
+ encoder->OnStreamEnd();
+ encoder->Close();
+ };
+
+ auto doTest = [&](bool buffered, EMetricType metricType) {
+ TString json;
+ TStringOutput out(json);
+ auto encoder = buffered ? BufferedEncoderCloudJson(&out, 2) : EncoderCloudJson(&out, 2);
+ const TString expectedMessage = TStringBuilder()
+ << "metric type '" << metricType << "' is not supported by cloud json format";
+ UNIT_ASSERT_EXCEPTION_CONTAINS_C(emit(encoder.Get(), metricType), yexception, expectedMessage,
+ TString("buffered: ") + ToString(buffered));
+ };
+
+ doTest(false, EMetricType::HIST);
+ doTest(false, EMetricType::LOGHIST);
+ doTest(false, EMetricType::DSUMMARY);
+ doTest(true, EMetricType::HIST);
+ doTest(true, EMetricType::LOGHIST);
+ doTest(true, EMetricType::DSUMMARY);
}
Y_UNIT_TEST(MetricsWithDifferentLabelOrderGetMerged) {
@@ -369,13 +369,13 @@ Y_UNIT_TEST_SUITE(TJsonTest) {
UNIT_ASSERT_VALUES_EQUAL(samples.CommonLabelsSize(), 1);
AssertLabelEqual(samples.GetCommonLabels(0), "project", "solomon");
- UNIT_ASSERT_VALUES_EQUAL(samples.SamplesSize(), 6);
+ UNIT_ASSERT_VALUES_EQUAL(samples.SamplesSize(), 6);
{
const NProto::TMultiSample& s = samples.GetSamples(0);
UNIT_ASSERT_EQUAL(s.GetMetricType(), NProto::COUNTER);
UNIT_ASSERT_VALUES_EQUAL(s.LabelsSize(), 2);
AssertLabelEqual(s.GetLabels(0), "metric", "single");
- AssertLabelEqual(s.GetLabels(1), "labels", "l1");
+ AssertLabelEqual(s.GetLabels(1), "labels", "l1");
UNIT_ASSERT_VALUES_EQUAL(s.PointsSize(), 1);
AssertPointEqual(s.GetPoints(0), now, ui64(17));
@@ -385,7 +385,7 @@ Y_UNIT_TEST_SUITE(TJsonTest) {
UNIT_ASSERT_EQUAL(s.GetMetricType(), NProto::RATE);
UNIT_ASSERT_VALUES_EQUAL(s.LabelsSize(), 2);
AssertLabelEqual(s.GetLabels(0), "metric", "single");
- AssertLabelEqual(s.GetLabels(1), "labels", "l2");
+ AssertLabelEqual(s.GetLabels(1), "labels", "l2");
UNIT_ASSERT_VALUES_EQUAL(s.PointsSize(), 1);
AssertPointEqual(s.GetPoints(0), now, ui64(17));
@@ -395,27 +395,27 @@ Y_UNIT_TEST_SUITE(TJsonTest) {
UNIT_ASSERT_EQUAL(s.GetMetricType(), NProto::GAUGE);
UNIT_ASSERT_VALUES_EQUAL(s.LabelsSize(), 2);
AssertLabelEqual(s.GetLabels(0), "metric", "single");
- AssertLabelEqual(s.GetLabels(1), "labels", "l3");
+ AssertLabelEqual(s.GetLabels(1), "labels", "l3");
UNIT_ASSERT_VALUES_EQUAL(s.PointsSize(), 1);
AssertPointEqual(s.GetPoints(0), now, 3.14);
}
{
const NProto::TMultiSample& s = samples.GetSamples(3);
- UNIT_ASSERT_EQUAL(s.GetMetricType(), NProto::IGAUGE);
- UNIT_ASSERT_VALUES_EQUAL(s.LabelsSize(), 2);
- AssertLabelEqual(s.GetLabels(0), "metric", "single_igauge");
- AssertLabelEqual(s.GetLabels(1), "labels", "l4");
-
- UNIT_ASSERT_VALUES_EQUAL(s.PointsSize(), 1);
- AssertPointEqual(s.GetPoints(0), now, i64(42));
- }
- {
- const NProto::TMultiSample& s = samples.GetSamples(4);
+ UNIT_ASSERT_EQUAL(s.GetMetricType(), NProto::IGAUGE);
+ UNIT_ASSERT_VALUES_EQUAL(s.LabelsSize(), 2);
+ AssertLabelEqual(s.GetLabels(0), "metric", "single_igauge");
+ AssertLabelEqual(s.GetLabels(1), "labels", "l4");
+
+ UNIT_ASSERT_VALUES_EQUAL(s.PointsSize(), 1);
+ AssertPointEqual(s.GetPoints(0), now, i64(42));
+ }
+ {
+ const NProto::TMultiSample& s = samples.GetSamples(4);
UNIT_ASSERT_EQUAL(s.GetMetricType(), NProto::GAUGE);
UNIT_ASSERT_VALUES_EQUAL(s.LabelsSize(), 2);
AssertLabelEqual(s.GetLabels(0), "metric", "multiple");
- AssertLabelEqual(s.GetLabels(1), "labels", "l5");
+ AssertLabelEqual(s.GetLabels(1), "labels", "l5");
UNIT_ASSERT_VALUES_EQUAL(s.PointsSize(), 3);
AssertPointEqualNan(s.GetPoints(0), now);
@@ -423,11 +423,11 @@ Y_UNIT_TEST_SUITE(TJsonTest) {
AssertPointEqualInf(s.GetPoints(2), now + TDuration::Seconds(30), -11);
}
{
- const NProto::TMultiSample& s = samples.GetSamples(5);
+ const NProto::TMultiSample& s = samples.GetSamples(5);
UNIT_ASSERT_EQUAL(s.GetMetricType(), NProto::COUNTER);
UNIT_ASSERT_VALUES_EQUAL(s.LabelsSize(), 2);
AssertLabelEqual(s.GetLabels(0), "metric", "multiple");
- AssertLabelEqual(s.GetLabels(1), "labels", "l6");
+ AssertLabelEqual(s.GetLabels(1), "labels", "l6");
UNIT_ASSERT_VALUES_EQUAL(s.PointsSize(), 2);
AssertPointEqual(s.GetPoints(0), now, ui64(1337));
@@ -1158,61 +1158,61 @@ Y_UNIT_TEST_SUITE(TJsonTest) {
UNIT_ASSERT_NO_DIFF(result2, NResource::Find("/int_gauge.json"));
}
- Y_UNIT_TEST(InconsistentMetricTypes) {
- auto emitMetrics = [](IMetricEncoder& encoder, const TString& expectedError) {
- encoder.OnMetricBegin(EMetricType::GAUGE);
- {
- encoder.OnLabelsBegin();
- encoder.OnLabel("name", "m");
- encoder.OnLabel("l1", "v1");
- encoder.OnLabel("l2", "v2");
- encoder.OnLabelsEnd();
- }
- encoder.OnDouble(now, 1.0);
- encoder.OnMetricEnd();
-
- encoder.OnMetricBegin(EMetricType::COUNTER);
- {
- encoder.OnLabelsBegin();
- encoder.OnLabel("name", "m");
- encoder.OnLabel("l1", "v1");
- encoder.OnLabel("l2", "v2");
- encoder.OnLabelsEnd();
- }
- encoder.OnUint64(now, 1);
-
- UNIT_ASSERT_EXCEPTION_CONTAINS(encoder.OnMetricEnd(),
- yexception,
- expectedError);
- };
-
- {
- TStringStream out;
- auto encoder = BufferedEncoderJson(&out);
-
- encoder->OnStreamBegin();
- encoder->OnLabelsBegin();
- encoder->OnLabel("c", "cv");
- encoder->OnLabelsEnd();
- emitMetrics(*encoder,
- "Time series point type mismatch: expected DOUBLE but found UINT64, "
- "labels '{c=cv, l1=v1, l2=v2, name=m}'");
- }
-
- {
- TStringStream out;
- auto encoder = BufferedEncoderJson(&out);
-
- encoder->OnStreamBegin();
- encoder->OnLabelsBegin();
- encoder->OnLabel("l1", "v100");
- encoder->OnLabelsEnd();
- emitMetrics(*encoder,
- "Time series point type mismatch: expected DOUBLE but found UINT64, "
- "labels '{l1=v1, l2=v2, name=m}'");
- }
- }
-
+ Y_UNIT_TEST(InconsistentMetricTypes) {
+ auto emitMetrics = [](IMetricEncoder& encoder, const TString& expectedError) {
+ encoder.OnMetricBegin(EMetricType::GAUGE);
+ {
+ encoder.OnLabelsBegin();
+ encoder.OnLabel("name", "m");
+ encoder.OnLabel("l1", "v1");
+ encoder.OnLabel("l2", "v2");
+ encoder.OnLabelsEnd();
+ }
+ encoder.OnDouble(now, 1.0);
+ encoder.OnMetricEnd();
+
+ encoder.OnMetricBegin(EMetricType::COUNTER);
+ {
+ encoder.OnLabelsBegin();
+ encoder.OnLabel("name", "m");
+ encoder.OnLabel("l1", "v1");
+ encoder.OnLabel("l2", "v2");
+ encoder.OnLabelsEnd();
+ }
+ encoder.OnUint64(now, 1);
+
+ UNIT_ASSERT_EXCEPTION_CONTAINS(encoder.OnMetricEnd(),
+ yexception,
+ expectedError);
+ };
+
+ {
+ TStringStream out;
+ auto encoder = BufferedEncoderJson(&out);
+
+ encoder->OnStreamBegin();
+ encoder->OnLabelsBegin();
+ encoder->OnLabel("c", "cv");
+ encoder->OnLabelsEnd();
+ emitMetrics(*encoder,
+ "Time series point type mismatch: expected DOUBLE but found UINT64, "
+ "labels '{c=cv, l1=v1, l2=v2, name=m}'");
+ }
+
+ {
+ TStringStream out;
+ auto encoder = BufferedEncoderJson(&out);
+
+ encoder->OnStreamBegin();
+ encoder->OnLabelsBegin();
+ encoder->OnLabel("l1", "v100");
+ encoder->OnLabelsEnd();
+ emitMetrics(*encoder,
+ "Time series point type mismatch: expected DOUBLE but found UINT64, "
+ "labels '{l1=v1, l2=v2, name=m}'");
+ }
+ }
+
Y_UNIT_TEST(IntGaugeDecode) {
NProto::TMultiSamplesList samples;
{
diff --git a/library/cpp/monlib/encode/json/ut/expected.json b/library/cpp/monlib/encode/json/ut/expected.json
index 30c405b0bd..ead853455b 100644
--- a/library/cpp/monlib/encode/json/ut/expected.json
+++ b/library/cpp/monlib/encode/json/ut/expected.json
@@ -11,7 +11,7 @@
"labels":
{
"metric":"single",
- "labels":"l1"
+ "labels":"l1"
},
"ts":1509843723,
"value":17
@@ -21,7 +21,7 @@
"labels":
{
"metric":"single",
- "labels":"l2"
+ "labels":"l2"
},
"ts":1509843723,
"value":17
@@ -31,27 +31,27 @@
"labels":
{
"metric":"single",
- "labels":"l3"
+ "labels":"l3"
},
"ts":1509843723,
"value":3.14
},
{
- "kind":"IGAUGE",
- "labels":
- {
- "metric":"single_igauge",
- "labels":"l4"
- },
- "ts":1509843723,
- "value":42
- },
- {
+ "kind":"IGAUGE",
+ "labels":
+ {
+ "metric":"single_igauge",
+ "labels":"l4"
+ },
+ "ts":1509843723,
+ "value":42
+ },
+ {
"kind":"GAUGE",
"labels":
{
"metric":"multiple",
- "labels":"l5"
+ "labels":"l5"
},
"timeseries":
[
@@ -85,7 +85,7 @@
"labels":
{
"metric":"multiple",
- "labels":"l6"
+ "labels":"l6"
}
}
]
diff --git a/library/cpp/monlib/encode/json/ut/expected_buffered.json b/library/cpp/monlib/encode/json/ut/expected_buffered.json
index 0a33842437..9a6a1d6201 100644
--- a/library/cpp/monlib/encode/json/ut/expected_buffered.json
+++ b/library/cpp/monlib/encode/json/ut/expected_buffered.json
@@ -37,16 +37,16 @@
"value":3.14
},
{
- "kind":"IGAUGE",
- "labels":
- {
+ "kind":"IGAUGE",
+ "labels":
+ {
"labels":"l4",
"metric":"single_igauge"
- },
- "ts":1509843723,
- "value":42
- },
- {
+ },
+ "ts":1509843723,
+ "value":42
+ },
+ {
"kind":"GAUGE",
"labels":
{
@@ -71,11 +71,11 @@
},
{
"kind":"COUNTER",
- "labels":
- {
+ "labels":
+ {
"labels":"l6",
"metric":"multiple"
- },
+ },
"timeseries":
[
{
@@ -86,7 +86,7 @@
"ts":1509843738,
"value":1338
}
- ]
+ ]
}
]
}
diff --git a/library/cpp/monlib/encode/json/ut/expected_cloud.json b/library/cpp/monlib/encode/json/ut/expected_cloud.json
index 78e90ec3be..6184811579 100644
--- a/library/cpp/monlib/encode/json/ut/expected_cloud.json
+++ b/library/cpp/monlib/encode/json/ut/expected_cloud.json
@@ -1,92 +1,92 @@
{
- "ts":"2017-07-14T02:40:00.000000Z",
- "labels":
+ "ts":"2017-07-14T02:40:00.000000Z",
+ "labels":
{
"project":"solomon"
},
- "metrics":
+ "metrics":
[
{
- "type":"COUNTER",
+ "type":"COUNTER",
"labels":
{
- "labels":"l1"
+ "labels":"l1"
},
- "name":"single",
- "ts":"2017-11-05T01:02:03.000000Z",
+ "name":"single",
+ "ts":"2017-11-05T01:02:03.000000Z",
"value":17
},
{
- "type":"RATE",
+ "type":"RATE",
"labels":
{
- "labels":"l2"
+ "labels":"l2"
},
- "name":"single",
- "ts":"2017-11-05T01:02:03.000000Z",
+ "name":"single",
+ "ts":"2017-11-05T01:02:03.000000Z",
"value":17
},
{
- "type":"DGAUGE",
+ "type":"DGAUGE",
"labels":
{
- "labels":"l3"
+ "labels":"l3"
},
- "name":"single",
- "ts":"2017-11-05T01:02:03.000000Z",
+ "name":"single",
+ "ts":"2017-11-05T01:02:03.000000Z",
"value":3.14
},
{
- "type":"IGAUGE",
+ "type":"IGAUGE",
"labels":
{
- "labels":"l4"
+ "labels":"l4"
},
- "name":"single_igauge",
- "ts":"2017-11-05T01:02:03.000000Z",
- "value":42
- },
- {
- "type":"DGAUGE",
- "labels":
- {
- "labels":"l5"
- },
- "name":"multiple",
+ "name":"single_igauge",
+ "ts":"2017-11-05T01:02:03.000000Z",
+ "value":42
+ },
+ {
+ "type":"DGAUGE",
+ "labels":
+ {
+ "labels":"l5"
+ },
+ "name":"multiple",
"timeseries":
[
{
- "ts":"2017-11-05T01:02:03.000000Z",
+ "ts":"2017-11-05T01:02:03.000000Z",
"value":"nan"
},
{
- "ts":"2017-11-05T01:02:18.000000Z",
+ "ts":"2017-11-05T01:02:18.000000Z",
"value":"inf"
},
{
- "ts":"2017-11-05T01:02:33.000000Z",
+ "ts":"2017-11-05T01:02:33.000000Z",
"value":"-inf"
}
]
},
{
- "type":"COUNTER",
+ "type":"COUNTER",
"timeseries":
[
{
- "ts":"2017-11-05T01:02:03.000000Z",
+ "ts":"2017-11-05T01:02:03.000000Z",
"value":1337
},
{
- "ts":"2017-11-05T01:02:18.000000Z",
+ "ts":"2017-11-05T01:02:18.000000Z",
"value":1338
}
],
"labels":
{
- "labels":"l6"
- },
- "name":"multiple"
+ "labels":"l6"
+ },
+ "name":"multiple"
}
]
}
diff --git a/library/cpp/monlib/encode/json/ut/expected_cloud_buffered.json b/library/cpp/monlib/encode/json/ut/expected_cloud_buffered.json
index 5b7aeb836e..be237d522b 100644
--- a/library/cpp/monlib/encode/json/ut/expected_cloud_buffered.json
+++ b/library/cpp/monlib/encode/json/ut/expected_cloud_buffered.json
@@ -1,92 +1,92 @@
-{
- "ts":"2017-07-14T02:40:00.000000Z",
- "labels":
- {
- "project":"solomon"
- },
- "metrics":
- [
- {
- "type":"COUNTER",
- "labels":
- {
- "labels":"l1"
- },
- "name":"single",
- "ts":"2017-11-05T01:02:03.000000Z",
- "value":17
- },
- {
- "type":"RATE",
- "labels":
- {
- "labels":"l2"
- },
- "name":"single",
- "ts":"2017-11-05T01:02:03.000000Z",
- "value":17
- },
- {
- "type":"DGAUGE",
- "labels":
- {
- "labels":"l3"
- },
- "name":"single",
- "ts":"2017-11-05T01:02:03.000000Z",
- "value":3.14
- },
- {
- "type":"IGAUGE",
- "labels":
- {
- "labels":"l4"
- },
- "name":"single_igauge",
- "ts":"2017-11-05T01:02:03.000000Z",
- "value":42
- },
- {
- "type":"DGAUGE",
- "labels":
- {
- "labels":"l5"
- },
- "name":"multiple",
- "timeseries":
- [
- {
- "ts":"2017-11-05T01:02:03.000000Z",
- "value":"nan"
- },
- {
- "ts":"2017-11-05T01:02:18.000000Z",
- "value":"inf"
- },
- {
- "ts":"2017-11-05T01:02:33.000000Z",
- "value":"-inf"
- }
- ]
- },
- {
- "type":"COUNTER",
- "labels":
- {
- "labels":"l6"
- },
- "name":"multiple",
- "timeseries":
- [
- {
- "ts":"2017-11-05T01:02:03.000000Z",
- "value":1337
- },
- {
- "ts":"2017-11-05T01:02:18.000000Z",
- "value":1338
- }
- ]
- }
- ]
-}
+{
+ "ts":"2017-07-14T02:40:00.000000Z",
+ "labels":
+ {
+ "project":"solomon"
+ },
+ "metrics":
+ [
+ {
+ "type":"COUNTER",
+ "labels":
+ {
+ "labels":"l1"
+ },
+ "name":"single",
+ "ts":"2017-11-05T01:02:03.000000Z",
+ "value":17
+ },
+ {
+ "type":"RATE",
+ "labels":
+ {
+ "labels":"l2"
+ },
+ "name":"single",
+ "ts":"2017-11-05T01:02:03.000000Z",
+ "value":17
+ },
+ {
+ "type":"DGAUGE",
+ "labels":
+ {
+ "labels":"l3"
+ },
+ "name":"single",
+ "ts":"2017-11-05T01:02:03.000000Z",
+ "value":3.14
+ },
+ {
+ "type":"IGAUGE",
+ "labels":
+ {
+ "labels":"l4"
+ },
+ "name":"single_igauge",
+ "ts":"2017-11-05T01:02:03.000000Z",
+ "value":42
+ },
+ {
+ "type":"DGAUGE",
+ "labels":
+ {
+ "labels":"l5"
+ },
+ "name":"multiple",
+ "timeseries":
+ [
+ {
+ "ts":"2017-11-05T01:02:03.000000Z",
+ "value":"nan"
+ },
+ {
+ "ts":"2017-11-05T01:02:18.000000Z",
+ "value":"inf"
+ },
+ {
+ "ts":"2017-11-05T01:02:33.000000Z",
+ "value":"-inf"
+ }
+ ]
+ },
+ {
+ "type":"COUNTER",
+ "labels":
+ {
+ "labels":"l6"
+ },
+ "name":"multiple",
+ "timeseries":
+ [
+ {
+ "ts":"2017-11-05T01:02:03.000000Z",
+ "value":1337
+ },
+ {
+ "ts":"2017-11-05T01:02:18.000000Z",
+ "value":1338
+ }
+ ]
+ }
+ ]
+}
diff --git a/library/cpp/monlib/encode/json/ut/ya.make b/library/cpp/monlib/encode/json/ut/ya.make
index 851443ae62..e50c4f4903 100644
--- a/library/cpp/monlib/encode/json/ut/ya.make
+++ b/library/cpp/monlib/encode/json/ut/ya.make
@@ -15,9 +15,9 @@ RESOURCE(
buffered_ts_merge.json /buffered_ts_merge.json
empty_series.json /empty_series.json
expected.json /expected.json
- expected_buffered.json /expected_buffered.json
- expected_cloud.json /expected_cloud.json
- expected_cloud_buffered.json /expected_cloud_buffered.json
+ expected_buffered.json /expected_buffered.json
+ expected_cloud.json /expected_cloud.json
+ expected_cloud_buffered.json /expected_cloud_buffered.json
merged.json /merged.json
histogram_timeseries.json /histogram_timeseries.json
histogram_value.json /histogram_value.json
diff --git a/library/cpp/monlib/encode/prometheus/prometheus.h b/library/cpp/monlib/encode/prometheus/prometheus.h
index 21248f5fef..2e7fa31c28 100644
--- a/library/cpp/monlib/encode/prometheus/prometheus.h
+++ b/library/cpp/monlib/encode/prometheus/prometheus.h
@@ -11,8 +11,8 @@ namespace NMonitoring {
class TPrometheusDecodeException: public yexception {
};
- IMetricEncoderPtr EncoderPrometheus(IOutputStream* out, TStringBuf metricNameLabel = "sensor");
+ IMetricEncoderPtr EncoderPrometheus(IOutputStream* out, TStringBuf metricNameLabel = "sensor");
- void DecodePrometheus(TStringBuf data, IMetricConsumer* c, TStringBuf metricNameLabel = "sensor");
+ void DecodePrometheus(TStringBuf data, IMetricConsumer* c, TStringBuf metricNameLabel = "sensor");
}
diff --git a/library/cpp/monlib/encode/prometheus/prometheus_decoder.cpp b/library/cpp/monlib/encode/prometheus/prometheus_decoder.cpp
index 5dc0bc8033..7e81357dbd 100644
--- a/library/cpp/monlib/encode/prometheus/prometheus_decoder.cpp
+++ b/library/cpp/monlib/encode/prometheus/prometheus_decoder.cpp
@@ -168,10 +168,10 @@ namespace NMonitoring {
///////////////////////////////////////////////////////////////////////
class TPrometheusReader {
public:
- TPrometheusReader(TStringBuf data, IMetricConsumer* c, TStringBuf metricNameLabel)
+ TPrometheusReader(TStringBuf data, IMetricConsumer* c, TStringBuf metricNameLabel)
: Data_(data)
, Consumer_(c)
- , MetricNameLabel_(metricNameLabel)
+ , MetricNameLabel_(metricNameLabel)
{
}
@@ -516,12 +516,12 @@ namespace NMonitoring {
}
void ConsumeLabels(TStringBuf name, const TLabelsMap& labels) {
- Y_PARSER_ENSURE(labels.count(MetricNameLabel_) == 0,
- "label name '" << MetricNameLabel_ <<
+ Y_PARSER_ENSURE(labels.count(MetricNameLabel_) == 0,
+ "label name '" << MetricNameLabel_ <<
"' is reserved, but is used with metric: " << name << LabelsToStr(labels));
Consumer_->OnLabelsBegin();
- Consumer_->OnLabel(MetricNameLabel_, TString(name)); // TODO: remove this string allocation
+ Consumer_->OnLabel(MetricNameLabel_, TString(name)); // TODO: remove this string allocation
for (const auto& it: labels) {
Consumer_->OnLabel(it.first, it.second);
}
@@ -579,7 +579,7 @@ namespace NMonitoring {
private:
TStringBuf Data_;
IMetricConsumer* Consumer_;
- TStringBuf MetricNameLabel_;
+ TStringBuf MetricNameLabel_;
THashMap<TString, EPrometheusMetricType> SeenTypes_;
THistogramBuilder HistogramBuilder_;
@@ -589,8 +589,8 @@ namespace NMonitoring {
};
} // namespace
-void DecodePrometheus(TStringBuf data, IMetricConsumer* c, TStringBuf metricNameLabel) {
- TPrometheusReader reader(data, c, metricNameLabel);
+void DecodePrometheus(TStringBuf data, IMetricConsumer* c, TStringBuf metricNameLabel) {
+ TPrometheusReader reader(data, c, metricNameLabel);
reader.Read();
}
diff --git a/library/cpp/monlib/encode/prometheus/prometheus_encoder.cpp b/library/cpp/monlib/encode/prometheus/prometheus_encoder.cpp
index c370fd2b6e..15efeb8c03 100644
--- a/library/cpp/monlib/encode/prometheus/prometheus_encoder.cpp
+++ b/library/cpp/monlib/encode/prometheus/prometheus_encoder.cpp
@@ -249,9 +249,9 @@ namespace NMonitoring {
///////////////////////////////////////////////////////////////////////
class TPrometheusEncoder final: public IMetricEncoder {
public:
- explicit TPrometheusEncoder(IOutputStream* out, TStringBuf metricNameLabel)
+ explicit TPrometheusEncoder(IOutputStream* out, TStringBuf metricNameLabel)
: Writer_(out)
- , MetricNameLabel_(metricNameLabel)
+ , MetricNameLabel_(metricNameLabel)
{
}
@@ -358,10 +358,10 @@ namespace NMonitoring {
MetricState_.Labels.Add(l.Name(), l.Value());
}
- TMaybe<TLabel> nameLabel = MetricState_.Labels.Extract(MetricNameLabel_);
+ TMaybe<TLabel> nameLabel = MetricState_.Labels.Extract(MetricNameLabel_);
Y_ENSURE(nameLabel,
"labels " << MetricState_.Labels <<
- " does not contain label '" << MetricNameLabel_ << '\'');
+ " does not contain label '" << MetricNameLabel_ << '\'');
const TString& metricName = ToString(nameLabel->Value());
if (MetricState_.Type != EMetricType::DSUMMARY) {
@@ -399,15 +399,15 @@ namespace NMonitoring {
private:
TEncoderState State_;
TPrometheusWriter Writer_;
- TString MetricNameLabel_;
+ TString MetricNameLabel_;
TInstant CommonTime_ = TInstant::Zero();
TLabels CommonLabels_;
TMetricState MetricState_;
};
}
- IMetricEncoderPtr EncoderPrometheus(IOutputStream* out, TStringBuf metricNameLabel) {
- return MakeHolder<TPrometheusEncoder>(out, metricNameLabel);
+ IMetricEncoderPtr EncoderPrometheus(IOutputStream* out, TStringBuf metricNameLabel) {
+ return MakeHolder<TPrometheusEncoder>(out, metricNameLabel);
}
} // namespace NMonitoring
diff --git a/library/cpp/monlib/encode/spack/spack_v1.h b/library/cpp/monlib/encode/spack/spack_v1.h
index e962d21467..cf1c9417b9 100644
--- a/library/cpp/monlib/encode/spack/spack_v1.h
+++ b/library/cpp/monlib/encode/spack/spack_v1.h
@@ -78,7 +78,7 @@ namespace NMonitoring {
///////////////////////////////////////////////////////////////////////////////
struct Y_PACKED TSpackHeader {
ui16 Magic = 0x5053; // "SP"
- ui16 Version; // MSB - major version, LSB - minor version
+ ui16 Version; // MSB - major version, LSB - minor version
ui16 HeaderSize = sizeof(TSpackHeader);
ui8 TimePrecision;
ui8 Compression;
@@ -89,12 +89,12 @@ namespace NMonitoring {
// add new fields here
};
- enum ESpackV1Version: ui16 {
- SV1_00 = 0x0100,
- SV1_01 = 0x0101,
- SV1_02 = 0x0102
- };
-
+ enum ESpackV1Version: ui16 {
+ SV1_00 = 0x0100,
+ SV1_01 = 0x0101,
+ SV1_02 = 0x0102
+ };
+
IMetricEncoderPtr EncoderSpackV1(
IOutputStream* out,
ETimePrecision timePrecision,
@@ -102,14 +102,14 @@ namespace NMonitoring {
EMetricsMergingMode mergingMode = EMetricsMergingMode::DEFAULT
);
- IMetricEncoderPtr EncoderSpackV12(
- IOutputStream* out,
- ETimePrecision timePrecision,
- ECompression compression,
- EMetricsMergingMode mergingMode = EMetricsMergingMode::DEFAULT,
- TStringBuf metricNameLabel = "name"
- );
+ IMetricEncoderPtr EncoderSpackV12(
+ IOutputStream* out,
+ ETimePrecision timePrecision,
+ ECompression compression,
+ EMetricsMergingMode mergingMode = EMetricsMergingMode::DEFAULT,
+ TStringBuf metricNameLabel = "name"
+ );
+
+ void DecodeSpackV1(IInputStream* in, IMetricConsumer* c, TStringBuf metricNameLabel = "name");
- void DecodeSpackV1(IInputStream* in, IMetricConsumer* c, TStringBuf metricNameLabel = "name");
-
}
diff --git a/library/cpp/monlib/encode/spack/spack_v1_decoder.cpp b/library/cpp/monlib/encode/spack/spack_v1_decoder.cpp
index 330e81a7a2..1f445fc80d 100644
--- a/library/cpp/monlib/encode/spack/spack_v1_decoder.cpp
+++ b/library/cpp/monlib/encode/spack/spack_v1_decoder.cpp
@@ -27,9 +27,9 @@ namespace NMonitoring {
///////////////////////////////////////////////////////////////////////
class TDecoderSpackV1 {
public:
- TDecoderSpackV1(IInputStream* in, TStringBuf metricNameLabel)
+ TDecoderSpackV1(IInputStream* in, TStringBuf metricNameLabel)
: In_(in)
- , MetricNameLabel_(metricNameLabel)
+ , MetricNameLabel_(metricNameLabel)
{
}
@@ -83,9 +83,9 @@ namespace NMonitoring {
// (4) read common labels
if (ui32 commonLabelsCount = ReadVarint()) {
- c->OnLabelsBegin();
+ c->OnLabelsBegin();
ReadLabels(labelNames, labelValues, commonLabelsCount, c);
- c->OnLabelsEnd();
+ c->OnLabelsEnd();
}
// (5) read metrics
@@ -110,20 +110,20 @@ namespace NMonitoring {
// TODO: use it
ReadFixed<ui8>(); // skip flags byte
- auto metricNameValueIndex = std::numeric_limits<ui32>::max();
- if (Header_.Version >= SV1_02) {
- metricNameValueIndex = ReadVarint();
- }
-
+ auto metricNameValueIndex = std::numeric_limits<ui32>::max();
+ if (Header_.Version >= SV1_02) {
+ metricNameValueIndex = ReadVarint();
+ }
+
// (5.2) labels
ui32 labelsCount = ReadVarint();
- DECODE_ENSURE(Header_.Version >= SV1_02 || labelsCount > 0, "metric #" << i << " has no labels");
- c->OnLabelsBegin();
- if (Header_.Version >= SV1_02) {
- c->OnLabel(MetricNameLabel_, labelValues.Get(metricNameValueIndex));
- }
+ DECODE_ENSURE(Header_.Version >= SV1_02 || labelsCount > 0, "metric #" << i << " has no labels");
+ c->OnLabelsBegin();
+ if (Header_.Version >= SV1_02) {
+ c->OnLabel(MetricNameLabel_, labelValues.Get(metricNameValueIndex));
+ }
ReadLabels(labelNames, labelValues, labelsCount, c);
- c->OnLabelsEnd();
+ c->OnLabelsEnd();
// (5.3) values
switch (valueType) {
@@ -214,7 +214,7 @@ namespace NMonitoring {
ui32 bucketsCount = ReadVarint();
auto s = TExplicitHistogramSnapshot::New(bucketsCount);
- if (SV1_00 == Header_.Version) { // v1.0
+ if (SV1_00 == Header_.Version) { // v1.0
for (ui32 i = 0; i < bucketsCount; i++) {
i64 bound = ReadFixed<i64>();
double doubleBound = (bound != Max<i64>())
@@ -275,7 +275,7 @@ namespace NMonitoring {
private:
IInputStream* In_;
- TString MetricNameLabel_;
+ TString MetricNameLabel_;
ETimePrecision TimePrecision_;
TSpackHeader Header_;
}; // class TDecoderSpackV1
@@ -450,8 +450,8 @@ namespace NMonitoring {
}
}
- void DecodeSpackV1(IInputStream* in, IMetricConsumer* c, TStringBuf metricNameLabel) {
- TDecoderSpackV1 decoder(in, metricNameLabel);
+ void DecodeSpackV1(IInputStream* in, IMetricConsumer* c, TStringBuf metricNameLabel) {
+ TDecoderSpackV1 decoder(in, metricNameLabel);
decoder.Decode(c);
}
diff --git a/library/cpp/monlib/encode/spack/spack_v1_encoder.cpp b/library/cpp/monlib/encode/spack/spack_v1_encoder.cpp
index 3e7a2fc7ea..a2b0bb5f50 100644
--- a/library/cpp/monlib/encode/spack/spack_v1_encoder.cpp
+++ b/library/cpp/monlib/encode/spack/spack_v1_encoder.cpp
@@ -23,15 +23,15 @@ namespace NMonitoring {
IOutputStream* out,
ETimePrecision timePrecision,
ECompression compression,
- EMetricsMergingMode mergingMode,
- ESpackV1Version version,
- TStringBuf metricNameLabel
+ EMetricsMergingMode mergingMode,
+ ESpackV1Version version,
+ TStringBuf metricNameLabel
)
: Out_(out)
, TimePrecision_(timePrecision)
, Compression_(compression)
- , Version_(version)
- , MetricName_(Version_ >= SV1_02 ? LabelNamesPool_.PutIfAbsent(metricNameLabel) : nullptr)
+ , Version_(version)
+ , MetricName_(Version_ >= SV1_02 ? LabelNamesPool_.PutIfAbsent(metricNameLabel) : nullptr)
{
MetricsMergingMode_ = mergingMode;
@@ -89,7 +89,7 @@ namespace NMonitoring {
// (1) write header
TSpackHeader header;
- header.Version = Version_;
+ header.Version = Version_;
header.TimePrecision = EncodeTimePrecision(TimePrecision_);
header.Compression = EncodeCompression(Compression_);
header.LabelNamesSize = static_cast<ui32>(
@@ -119,7 +119,7 @@ namespace NMonitoring {
WriteTime(CommonTime_);
// (4) write common labels' indexes
- WriteLabels(CommonLabels_, nullptr);
+ WriteLabels(CommonLabels_, nullptr);
// (5) write metrics
// metrics count already written in header
@@ -132,19 +132,19 @@ namespace NMonitoring {
ui8 flagsByte = 0x00;
Out_->Write(&flagsByte, sizeof(flagsByte));
- // v1.2 format addition — metric name
- if (Version_ >= SV1_02) {
- const auto it = FindIf(metric.Labels, [&](const auto& l) {
- return l.Key == MetricName_;
- });
- Y_ENSURE(it != metric.Labels.end(),
- "metric name label '" << LabelNamesPool_.Get(MetricName_->Index) << "' not found, "
- << "all metric labels '" << FormatLabels(metric.Labels) << "'");
- WriteVarUInt32(Out_, it->Value->Index);
- }
-
+ // v1.2 format addition — metric name
+ if (Version_ >= SV1_02) {
+ const auto it = FindIf(metric.Labels, [&](const auto& l) {
+ return l.Key == MetricName_;
+ });
+ Y_ENSURE(it != metric.Labels.end(),
+ "metric name label '" << LabelNamesPool_.Get(MetricName_->Index) << "' not found, "
+ << "all metric labels '" << FormatLabels(metric.Labels) << "'");
+ WriteVarUInt32(Out_, it->Value->Index);
+ }
+
// (5.2) labels
- WriteLabels(metric.Labels, MetricName_);
+ WriteLabels(metric.Labels, MetricName_);
// (5.3) values
switch (metric.TimeSeries.Size()) {
@@ -190,12 +190,12 @@ namespace NMonitoring {
return (static_cast<ui8>(metric.MetricType) << 2) | static_cast<ui8>(valueType);
}
- void WriteLabels(const TPooledLabels& labels, const TPooledStr* skipKey) {
- WriteVarUInt32(Out_, static_cast<ui32>(skipKey ? labels.size() - 1 : labels.size()));
+ void WriteLabels(const TPooledLabels& labels, const TPooledStr* skipKey) {
+ WriteVarUInt32(Out_, static_cast<ui32>(skipKey ? labels.size() - 1 : labels.size()));
for (auto&& label : labels) {
- if (label.Key == skipKey) {
- continue;
- }
+ if (label.Key == skipKey) {
+ continue;
+ }
WriteVarUInt32(Out_, label.Key->Index);
WriteVarUInt32(Out_, label.Value->Index);
}
@@ -289,8 +289,8 @@ namespace NMonitoring {
IOutputStream* Out_;
ETimePrecision TimePrecision_;
ECompression Compression_;
- ESpackV1Version Version_;
- const TPooledStr* MetricName_;
+ ESpackV1Version Version_;
+ const TPooledStr* MetricName_;
bool Closed_ = false;
};
@@ -302,17 +302,17 @@ namespace NMonitoring {
ECompression compression,
EMetricsMergingMode mergingMode
) {
- return MakeHolder<TEncoderSpackV1>(out, timePrecision, compression, mergingMode, SV1_01, "");
+ return MakeHolder<TEncoderSpackV1>(out, timePrecision, compression, mergingMode, SV1_01, "");
}
- IMetricEncoderPtr EncoderSpackV12(
- IOutputStream* out,
- ETimePrecision timePrecision,
- ECompression compression,
- EMetricsMergingMode mergingMode,
- TStringBuf metricNameLabel
- ) {
- Y_ENSURE(!metricNameLabel.Empty(), "metricNameLabel can't be empty");
- return MakeHolder<TEncoderSpackV1>(out, timePrecision, compression, mergingMode, SV1_02, metricNameLabel);
- }
+ IMetricEncoderPtr EncoderSpackV12(
+ IOutputStream* out,
+ ETimePrecision timePrecision,
+ ECompression compression,
+ EMetricsMergingMode mergingMode,
+ TStringBuf metricNameLabel
+ ) {
+ Y_ENSURE(!metricNameLabel.Empty(), "metricNameLabel can't be empty");
+ return MakeHolder<TEncoderSpackV1>(out, timePrecision, compression, mergingMode, SV1_02, metricNameLabel);
+ }
}
diff --git a/library/cpp/monlib/encode/spack/spack_v1_ut.cpp b/library/cpp/monlib/encode/spack/spack_v1_ut.cpp
index c6748cb0f5..fe778eb7e0 100644
--- a/library/cpp/monlib/encode/spack/spack_v1_ut.cpp
+++ b/library/cpp/monlib/encode/spack/spack_v1_ut.cpp
@@ -468,7 +468,7 @@ Y_UNIT_TEST_SUITE(TSpackTest) {
IMetricEncoderPtr e = EncoderProtobuf(&samples);
TBuffer data(expectedSize);
- if (SV1_00 == version) { // v1.0
+ if (SV1_00 == version) { // v1.0
data.Append(reinterpret_cast<char*>(expectedHeader_v1_0), Y_ARRAY_SIZE(expectedHeader_v1_0));
} else {
data.Append(reinterpret_cast<char*>(expectedHeader), Y_ARRAY_SIZE(expectedHeader));
@@ -480,7 +480,7 @@ Y_UNIT_TEST_SUITE(TSpackTest) {
data.Append(reinterpret_cast<char*>(expectedMetric2), Y_ARRAY_SIZE(expectedMetric2));
data.Append(reinterpret_cast<char*>(expectedMetric3), Y_ARRAY_SIZE(expectedMetric3));
data.Append(reinterpret_cast<char*>(expectedMetric4), Y_ARRAY_SIZE(expectedMetric4));
- if (SV1_00 == version) { // v1.0
+ if (SV1_00 == version) { // v1.0
data.Append(reinterpret_cast<char*>(expectedMetric5_v1_0), Y_ARRAY_SIZE(expectedMetric5_v1_0));
} else {
data.Append(reinterpret_cast<char*>(expectedMetric5), Y_ARRAY_SIZE(expectedMetric5));
@@ -494,7 +494,7 @@ Y_UNIT_TEST_SUITE(TSpackTest) {
void DecodeDataToSamples(NProto::TMultiSamplesList & samples) {
TSpackHeader header;
- header.Version = SV1_01;
+ header.Version = SV1_01;
DecodeDataToSamples(samples, header.Version);
}
@@ -689,7 +689,7 @@ Y_UNIT_TEST_SUITE(TSpackTest) {
// Check that histogram bounds decoded from different versions are the same
NProto::TMultiSamplesList samples, samples_v1_0;
DecodeDataToSamples(samples);
- DecodeDataToSamples(samples_v1_0, /*version = */ SV1_00);
+ DecodeDataToSamples(samples_v1_0, /*version = */ SV1_00);
const NProto::THistogram& pointHistogram = samples.GetSamples(4).GetPoints(0).GetHistogram();
const NProto::THistogram& pointHistogram_v1_0 = samples_v1_0.GetSamples(4).GetPoints(0).GetHistogram();
@@ -698,148 +698,148 @@ Y_UNIT_TEST_SUITE(TSpackTest) {
UNIT_ASSERT_DOUBLES_EQUAL(pointHistogram.GetBounds(i), pointHistogram_v1_0.GetBounds(i), Min<double>());
}
}
-
- Y_UNIT_TEST(SimpleV12) {
- ui8 expectedSerialized[] = {
- // header
- 0x53, 0x50, // magic "SP" (fixed ui16)
- // minor, major
- 0x02, 0x01, // version (fixed ui16)
- 0x18, 0x00, // header size (fixed ui16)
- 0x00, // time precision (fixed ui8)
- 0x00, // compression algorithm (fixed ui8)
- 0x0A, 0x00, 0x00, 0x00, // label names size (fixed ui32)
- 0x14, 0x00, 0x00, 0x00, // labels values size (fixed ui32)
- 0x01, 0x00, 0x00, 0x00, // metric count (fixed ui32)
- 0x01, 0x00, 0x00, 0x00, // points count (fixed ui32)
-
- // string pools
- 0x73, 0x00, // "s\0"
- 0x70, 0x72, 0x6f, 0x6a, 0x65, 0x63, 0x74, 0x00, // "project\0"
- 0x73, 0x6f, 0x6c, 0x6f, 0x6d, 0x6f, 0x6e, 0x00, // "solomon\0"
- 0x74, 0x65, 0x6D, 0x70, 0x65, 0x72, 0x61, 0x74, // temperature
- 0x75, 0x72, 0x65, 0x00,
-
- // common time
- 0x00, 0x2f, 0x68, 0x59, // common time in seconds (fixed ui32)
-
- // common labels
- 0x00, // common labels count (varint)
-
- // metric
- 0x09, // types (COUNTER | ONE_WITHOUT_TS) (fixed ui8)
- 0x00, // flags (fixed ui8)
- 0x01, // name index (varint)
- 0x01, // metric labels count (varint)
- 0x01, // 'project' label name index (varint)
- 0x00, // 'project' label value index (varint)
- 0x11, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // value (fixed ui64)
- };
-
- // encode
- {
- TBuffer actualSerialized;
- {
- TBufferOutput out(actualSerialized);
- auto e = EncoderSpackV12(
- &out,
- ETimePrecision::SECONDS,
- ECompression::IDENTITY,
- EMetricsMergingMode::DEFAULT,
- "s");
-
- e->OnStreamBegin();
- e->OnCommonTime(TInstant::Seconds(1500000000));
-
- {
- e->OnMetricBegin(EMetricType::COUNTER);
- {
- e->OnLabelsBegin();
+
+ Y_UNIT_TEST(SimpleV12) {
+ ui8 expectedSerialized[] = {
+ // header
+ 0x53, 0x50, // magic "SP" (fixed ui16)
+ // minor, major
+ 0x02, 0x01, // version (fixed ui16)
+ 0x18, 0x00, // header size (fixed ui16)
+ 0x00, // time precision (fixed ui8)
+ 0x00, // compression algorithm (fixed ui8)
+ 0x0A, 0x00, 0x00, 0x00, // label names size (fixed ui32)
+ 0x14, 0x00, 0x00, 0x00, // labels values size (fixed ui32)
+ 0x01, 0x00, 0x00, 0x00, // metric count (fixed ui32)
+ 0x01, 0x00, 0x00, 0x00, // points count (fixed ui32)
+
+ // string pools
+ 0x73, 0x00, // "s\0"
+ 0x70, 0x72, 0x6f, 0x6a, 0x65, 0x63, 0x74, 0x00, // "project\0"
+ 0x73, 0x6f, 0x6c, 0x6f, 0x6d, 0x6f, 0x6e, 0x00, // "solomon\0"
+ 0x74, 0x65, 0x6D, 0x70, 0x65, 0x72, 0x61, 0x74, // temperature
+ 0x75, 0x72, 0x65, 0x00,
+
+ // common time
+ 0x00, 0x2f, 0x68, 0x59, // common time in seconds (fixed ui32)
+
+ // common labels
+ 0x00, // common labels count (varint)
+
+ // metric
+ 0x09, // types (COUNTER | ONE_WITHOUT_TS) (fixed ui8)
+ 0x00, // flags (fixed ui8)
+ 0x01, // name index (varint)
+ 0x01, // metric labels count (varint)
+ 0x01, // 'project' label name index (varint)
+ 0x00, // 'project' label value index (varint)
+ 0x11, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // value (fixed ui64)
+ };
+
+ // encode
+ {
+ TBuffer actualSerialized;
+ {
+ TBufferOutput out(actualSerialized);
+ auto e = EncoderSpackV12(
+ &out,
+ ETimePrecision::SECONDS,
+ ECompression::IDENTITY,
+ EMetricsMergingMode::DEFAULT,
+ "s");
+
+ e->OnStreamBegin();
+ e->OnCommonTime(TInstant::Seconds(1500000000));
+
+ {
+ e->OnMetricBegin(EMetricType::COUNTER);
+ {
+ e->OnLabelsBegin();
+ e->OnLabel("project", "solomon");
+ e->OnLabel("s", "temperature");
+ e->OnLabelsEnd();
+ }
+ // Only the last value will be encoded
+ e->OnUint64(TInstant::Zero(), 10);
+ e->OnUint64(TInstant::Zero(), 13);
+ e->OnUint64(TInstant::Zero(), 17);
+ e->OnMetricEnd();
+ }
+
+ e->OnStreamEnd();
+ e->Close();
+ }
+
+ UNIT_ASSERT_VALUES_EQUAL(actualSerialized.Size(), Y_ARRAY_SIZE(expectedSerialized));
+ UNIT_ASSERT_BINARY_EQUALS(actualSerialized.Data(), expectedSerialized);
+ }
+
+ // decode
+ {
+ NProto::TMultiSamplesList samples;
+ {
+ auto input = TMemoryInput(expectedSerialized, Y_ARRAY_SIZE(expectedSerialized));
+ auto encoder = EncoderProtobuf(&samples);
+ DecodeSpackV1(&input, encoder.Get(), "s");
+ }
+
+ UNIT_ASSERT_VALUES_EQUAL(TInstant::MilliSeconds(samples.GetCommonTime()),
+ TInstant::Seconds(1500000000));
+
+ UNIT_ASSERT_VALUES_EQUAL(samples.CommonLabelsSize(), 0);
+
+ UNIT_ASSERT_VALUES_EQUAL(samples.SamplesSize(), 1);
+ {
+ const auto& s = samples.GetSamples(0);
+ UNIT_ASSERT_EQUAL(s.GetMetricType(), NProto::COUNTER);
+ UNIT_ASSERT_VALUES_EQUAL(s.LabelsSize(), 2);
+ AssertLabelEqual(s.GetLabels(0), "s", "temperature");
+ AssertLabelEqual(s.GetLabels(1), "project", "solomon");
+
+ UNIT_ASSERT_VALUES_EQUAL(s.PointsSize(), 1);
+ AssertPointEqual(s.GetPoints(0), TInstant::Zero(), ui64(17));
+ }
+ }
+ }
+
+ Y_UNIT_TEST(V12MissingNameForOneMetric) {
+ TBuffer b;
+ TBufferOutput out(b);
+ auto e = EncoderSpackV12(
+ &out,
+ ETimePrecision::SECONDS,
+ ECompression::IDENTITY,
+ EMetricsMergingMode::DEFAULT,
+ "s");
+
+ UNIT_ASSERT_EXCEPTION_CONTAINS(
+ [&]() {
+ e->OnStreamBegin();
+ {
+ e->OnMetricBegin(EMetricType::COUNTER);
+ {
+ e->OnLabelsBegin();
+ e->OnLabel("s", "s1");
+ e->OnLabelsEnd();
+ }
+ e->OnUint64(TInstant::Zero(), 1);
+ e->OnMetricEnd();
+
+ e->OnMetricBegin(EMetricType::COUNTER);
+ {
+ e->OnLabelsBegin();
e->OnLabel("project", "solomon");
- e->OnLabel("s", "temperature");
- e->OnLabelsEnd();
- }
- // Only the last value will be encoded
- e->OnUint64(TInstant::Zero(), 10);
- e->OnUint64(TInstant::Zero(), 13);
- e->OnUint64(TInstant::Zero(), 17);
- e->OnMetricEnd();
- }
-
- e->OnStreamEnd();
- e->Close();
- }
-
- UNIT_ASSERT_VALUES_EQUAL(actualSerialized.Size(), Y_ARRAY_SIZE(expectedSerialized));
- UNIT_ASSERT_BINARY_EQUALS(actualSerialized.Data(), expectedSerialized);
- }
-
- // decode
- {
- NProto::TMultiSamplesList samples;
- {
- auto input = TMemoryInput(expectedSerialized, Y_ARRAY_SIZE(expectedSerialized));
- auto encoder = EncoderProtobuf(&samples);
- DecodeSpackV1(&input, encoder.Get(), "s");
- }
-
- UNIT_ASSERT_VALUES_EQUAL(TInstant::MilliSeconds(samples.GetCommonTime()),
- TInstant::Seconds(1500000000));
-
- UNIT_ASSERT_VALUES_EQUAL(samples.CommonLabelsSize(), 0);
-
- UNIT_ASSERT_VALUES_EQUAL(samples.SamplesSize(), 1);
- {
- const auto& s = samples.GetSamples(0);
- UNIT_ASSERT_EQUAL(s.GetMetricType(), NProto::COUNTER);
- UNIT_ASSERT_VALUES_EQUAL(s.LabelsSize(), 2);
- AssertLabelEqual(s.GetLabels(0), "s", "temperature");
- AssertLabelEqual(s.GetLabels(1), "project", "solomon");
-
- UNIT_ASSERT_VALUES_EQUAL(s.PointsSize(), 1);
- AssertPointEqual(s.GetPoints(0), TInstant::Zero(), ui64(17));
- }
- }
- }
-
- Y_UNIT_TEST(V12MissingNameForOneMetric) {
- TBuffer b;
- TBufferOutput out(b);
- auto e = EncoderSpackV12(
- &out,
- ETimePrecision::SECONDS,
- ECompression::IDENTITY,
- EMetricsMergingMode::DEFAULT,
- "s");
-
- UNIT_ASSERT_EXCEPTION_CONTAINS(
- [&]() {
- e->OnStreamBegin();
- {
- e->OnMetricBegin(EMetricType::COUNTER);
- {
- e->OnLabelsBegin();
- e->OnLabel("s", "s1");
- e->OnLabelsEnd();
- }
- e->OnUint64(TInstant::Zero(), 1);
- e->OnMetricEnd();
-
- e->OnMetricBegin(EMetricType::COUNTER);
- {
- e->OnLabelsBegin();
- e->OnLabel("project", "solomon");
- e->OnLabel("m", "v");
- e->OnLabelsEnd();
- }
- e->OnUint64(TInstant::Zero(), 2);
- e->OnMetricEnd();
- }
-
- e->OnStreamEnd();
- e->Close();
- }(),
- yexception,
- "metric name label 's' not found, all metric labels '{m=v, project=solomon}'");
- }
+ e->OnLabel("m", "v");
+ e->OnLabelsEnd();
+ }
+ e->OnUint64(TInstant::Zero(), 2);
+ e->OnMetricEnd();
+ }
+
+ e->OnStreamEnd();
+ e->Close();
+ }(),
+ yexception,
+ "metric name label 's' not found, all metric labels '{m=v, project=solomon}'");
+ }
}
diff --git a/library/cpp/monlib/metrics/metric_value.h b/library/cpp/monlib/metrics/metric_value.h
index 5f235e7a0b..607fcc8602 100644
--- a/library/cpp/monlib/metrics/metric_value.h
+++ b/library/cpp/monlib/metrics/metric_value.h
@@ -384,35 +384,35 @@ namespace NMonitoring {
template <typename T>
void Add(TInstant time, T value) {
- Add(TPoint(time, value), TValueType<T>::Type);
- }
-
- void Add(TPoint point, EMetricValueType valueType) {
+ Add(TPoint(time, value), TValueType<T>::Type);
+ }
+
+ void Add(TPoint point, EMetricValueType valueType) {
if (Empty()) {
- ValueType_ = valueType;
+ ValueType_ = valueType;
} else {
- CheckTypes(ValueType_, valueType);
+ CheckTypes(ValueType_, valueType);
}
- Points_.push_back(point);
+ Points_.push_back(point);
if (ValueType_ == EMetricValueType::SUMMARY) {
- TPoint& p = Points_.back();
- p.GetValue().AsSummaryDouble()->Ref();
+ TPoint& p = Points_.back();
+ p.GetValue().AsSummaryDouble()->Ref();
} else if (ValueType_ == EMetricValueType::HISTOGRAM) {
- TPoint& p = Points_.back();
- p.GetValue().AsHistogram()->Ref();
+ TPoint& p = Points_.back();
+ p.GetValue().AsHistogram()->Ref();
} else if (ValueType_ == EMetricValueType::LOGHISTOGRAM) {
- TPoint& p = Points_.back();
- p.GetValue().AsLogHistogram()->Ref();
+ TPoint& p = Points_.back();
+ p.GetValue().AsLogHistogram()->Ref();
}
}
void CopyFrom(const TMetricTimeSeries& other) {
- if (Empty()) {
- ValueType_ = other.ValueType_;
- } else {
- CheckTypes(GetValueType(), other.GetValueType());
- }
+ if (Empty()) {
+ ValueType_ = other.ValueType_;
+ } else {
+ CheckTypes(GetValueType(), other.GetValueType());
+ }
size_t prevSize = Points_.size();
Copy(std::begin(other.Points_), std::end(other.Points_),
diff --git a/library/cpp/testing/benchmark/bench.h b/library/cpp/testing/benchmark/bench.h
index 4225a31fd3..21551ad0dd 100644
--- a/library/cpp/testing/benchmark/bench.h
+++ b/library/cpp/testing/benchmark/bench.h
@@ -1,9 +1,9 @@
#pragma once
-#include <util/system/compiler.h>
+#include <util/system/compiler.h>
#include <util/system/types.h>
-#include <utility>
+#include <utility>
namespace NBench {
namespace NCpu {
@@ -71,14 +71,14 @@ namespace NBench {
}
#endif
- /**
- * Use this function to prevent unused variables elimination.
- *
- * @param Unused variable (e.g. return value of benchmarked function).
- */
+ /**
+ * Use this function to prevent unused variables elimination.
+ *
+ * @param Unused variable (e.g. return value of benchmarked function).
+ */
template <typename T>
Y_FORCE_INLINE void DoNotOptimize(T&& datum) {
- ::DoNotOptimizeAway(std::forward<T>(datum));
+ ::DoNotOptimizeAway(std::forward<T>(datum));
}
int Main(int argc, char** argv);
diff --git a/util/system/compiler.cpp b/util/system/compiler.cpp
index 4803c75a4a..d4b3cca0af 100644
--- a/util/system/compiler.cpp
+++ b/util/system/compiler.cpp
@@ -4,6 +4,6 @@
[[noreturn]] Y_HIDDEN void _YandexAbort() {
std::abort();
}
-
-void UseCharPointerImpl(volatile const char*) {
-}
+
+void UseCharPointerImpl(volatile const char*) {
+}
diff --git a/util/system/compiler.h b/util/system/compiler.h
index ad523479be..b373edcc46 100644
--- a/util/system/compiler.h
+++ b/util/system/compiler.h
@@ -1,9 +1,9 @@
#pragma once
-#if defined(_MSC_VER)
+#if defined(_MSC_VER)
#include <intrin.h>
-#endif
-
+#endif
+
// useful cross-platfrom definitions for compilers
/**
@@ -623,11 +623,11 @@ _YandexAbort();
do { \
} while (0)
#endif
-
-#ifdef __cplusplus
-
+
+#ifdef __cplusplus
+
void UseCharPointerImpl(volatile const char*);
-
+
template <typename T>
Y_FORCE_INLINE void DoNotOptimizeAway(T&& datum) {
#if defined(_MSC_VER)
@@ -641,10 +641,10 @@ Y_FORCE_INLINE void DoNotOptimizeAway(T&& datum) {
Y_FAKE_READ(datum);
#endif
}
-
+
/**
- * Use this macro to prevent unused variables elimination.
- */
+ * Use this macro to prevent unused variables elimination.
+ */
#define Y_DO_NOT_OPTIMIZE_AWAY(X) ::DoNotOptimizeAway(X)
-
-#endif
+
+#endif
diff --git a/util/thread/pool.cpp b/util/thread/pool.cpp
index 1b6745c272..05fad02e9b 100644
--- a/util/thread/pool.cpp
+++ b/util/thread/pool.cpp
@@ -147,7 +147,7 @@ public:
return ThreadCountReal;
}
- inline void AtforkAction() noexcept Y_NO_SANITIZE("thread") {
+ inline void AtforkAction() noexcept Y_NO_SANITIZE("thread") {
Forked = true;
}
diff --git a/ydb/core/driver_lib/run/run.cpp b/ydb/core/driver_lib/run/run.cpp
index 62bf1fa07e..a4f74aa4e0 100644
--- a/ydb/core/driver_lib/run/run.cpp
+++ b/ydb/core/driver_lib/run/run.cpp
@@ -119,8 +119,8 @@
#include <ydb/library/yql/minikql/invoke_builtins/mkql_builtins.h>
#include <util/folder/dirut.h>
-#include <util/system/file.h>
-#include <util/system/getpid.h>
+#include <util/system/file.h>
+#include <util/system/getpid.h>
#include <util/system/hostname.h>
#include <ydb/core/tracing/tablet_info.h>
diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto
index 4bac573477..d64169d4fc 100644
--- a/ydb/core/protos/config.proto
+++ b/ydb/core/protos/config.proto
@@ -254,18 +254,18 @@ message TBlobStorageFormatConfig {
repeated TDrive Drive = 1;
}
-message TUAClientConfig {
- optional string Uri = 1;
- optional string SharedSecretKey = 2;
- optional uint64 MaxInflightBytes = 3 [default = 100000000];
- optional uint64 GrpcReconnectDelayMs = 4;
- optional uint64 GrpcSendDelayMs = 5;
- optional uint64 GrpcMaxMessageSize = 6;
- optional string ClientLogFile = 7;
- optional uint32 ClientLogPriority = 8;
- optional string LogName = 9;
-}
-
+message TUAClientConfig {
+ optional string Uri = 1;
+ optional string SharedSecretKey = 2;
+ optional uint64 MaxInflightBytes = 3 [default = 100000000];
+ optional uint64 GrpcReconnectDelayMs = 4;
+ optional uint64 GrpcSendDelayMs = 5;
+ optional uint64 GrpcMaxMessageSize = 6;
+ optional string ClientLogFile = 7;
+ optional uint32 ClientLogPriority = 8;
+ optional string LogName = 9;
+}
+
message TLogConfig {
message TEntry {
optional bytes Component = 1;
@@ -286,7 +286,7 @@ message TLogConfig {
optional string BackendFileName = 10;
optional string SysLogService = 11;
optional bool SysLogToStdErr = 12; // writes logs to stderr as well as in syslog
- optional TUAClientConfig UAClientConfig = 13;
+ optional TUAClientConfig UAClientConfig = 13;
optional uint64 TimeThresholdMs = 14 [default = 1000];
}
diff --git a/ydb/core/yq/libs/pretty_printers/minikql_program_printer.cpp b/ydb/core/yq/libs/pretty_printers/minikql_program_printer.cpp
index 73130d39fb..8c80588457 100644
--- a/ydb/core/yq/libs/pretty_printers/minikql_program_printer.cpp
+++ b/ydb/core/yq/libs/pretty_printers/minikql_program_printer.cpp
@@ -68,9 +68,9 @@ public:
void Visit(TTaggedType&) override {
Out << "TaggedType";
}
- void Visit(TBlockType&) override {
- Out << "BlockType";
- }
+ void Visit(TBlockType&) override {
+ Out << "BlockType";
+ }
// Values
void Visit(TVoid&) override {
diff --git a/ydb/library/yql/minikql/arrow/arrow_defs.h b/ydb/library/yql/minikql/arrow/arrow_defs.h
index 7b3d5dc8e6..97b26e0345 100644
--- a/ydb/library/yql/minikql/arrow/arrow_defs.h
+++ b/ydb/library/yql/minikql/arrow/arrow_defs.h
@@ -1,24 +1,24 @@
-#pragma once
-
+#pragma once
+
#include <ydb/library/yql/minikql/defs.h>
-
-#define ARROW_CHECK_STATUS(s, op, ...) \
- MKQL_ENSURE(s.ok(), "Operation failed: [" << #op << "]\n" \
- << "" __VA_ARGS__ << ": [" << s.ToString() << "]") \
-
-#define ARROW_OK_S(op, ...) \
-do { \
- ::arrow::Status _s = (op); \
- ARROW_CHECK_STATUS(_s, op, __VA_ARGS__); \
- } while (false)
-
-#define ARROW_OK(op) ARROW_OK_S(op, "Bad status")
-
-#define ARROW_RESULT_S(op, ...) \
- [&]() { \
- auto result = (op); \
- ARROW_CHECK_STATUS(result.status(), op, __VA_ARGS__); \
- return std::move(result).ValueOrDie(); \
- }()
-
-#define ARROW_RESULT(op) ARROW_RESULT_S(op, "Bad status")
+
+#define ARROW_CHECK_STATUS(s, op, ...) \
+ MKQL_ENSURE(s.ok(), "Operation failed: [" << #op << "]\n" \
+ << "" __VA_ARGS__ << ": [" << s.ToString() << "]") \
+
+#define ARROW_OK_S(op, ...) \
+do { \
+ ::arrow::Status _s = (op); \
+ ARROW_CHECK_STATUS(_s, op, __VA_ARGS__); \
+ } while (false)
+
+#define ARROW_OK(op) ARROW_OK_S(op, "Bad status")
+
+#define ARROW_RESULT_S(op, ...) \
+ [&]() { \
+ auto result = (op); \
+ ARROW_CHECK_STATUS(result.status(), op, __VA_ARGS__); \
+ return std::move(result).ValueOrDie(); \
+ }()
+
+#define ARROW_RESULT(op) ARROW_RESULT_S(op, "Bad status")
diff --git a/ydb/library/yql/minikql/arrow/mkql_memory_pool.cpp b/ydb/library/yql/minikql/arrow/mkql_memory_pool.cpp
index 57b8f2a246..7bc6ad02b0 100644
--- a/ydb/library/yql/minikql/arrow/mkql_memory_pool.cpp
+++ b/ydb/library/yql/minikql/arrow/mkql_memory_pool.cpp
@@ -1,50 +1,50 @@
-#include "mkql_memory_pool.h"
-
-namespace NKikimr::NMiniKQL {
-
-namespace {
-class TArrowMemoryPool : public arrow::MemoryPool {
-public:
- explicit TArrowMemoryPool(TAllocState& allocState)
- : AllocState(allocState) {
- }
-
- arrow::Status Allocate(int64_t size, uint8_t** out) override {
- *out = static_cast<uint8_t*>(MKQLAllocWithSize(size));
- return arrow::Status();
- }
-
- arrow::Status Reallocate(int64_t oldSize, int64_t newSize, uint8_t** ptr) override {
- void* newPtr = MKQLAllocWithSize(newSize);
- ::memcpy(newPtr, *ptr, std::min(oldSize, newSize));
- MKQLFreeWithSize(*ptr, oldSize);
- *ptr = static_cast<uint8_t*>(newPtr);
- return arrow::Status();
- }
-
- void Free(uint8_t* buffer, int64_t size) override {
- MKQLFreeWithSize(buffer, size);
- }
-
- int64_t bytes_allocated() const override {
- return AllocState.GetUsed();
- }
-
- int64_t max_memory() const override {
- return -1;
- }
-
- std::string backend_name() const override {
- return "MKQL";
- }
-
-private:
- TAllocState& AllocState;
-};
-}
-
-std::unique_ptr <arrow::MemoryPool> MakeArrowMemoryPool(TAllocState& allocState) {
- return std::make_unique<TArrowMemoryPool>(allocState);
-}
-
-}
+#include "mkql_memory_pool.h"
+
+namespace NKikimr::NMiniKQL {
+
+namespace {
+class TArrowMemoryPool : public arrow::MemoryPool {
+public:
+ explicit TArrowMemoryPool(TAllocState& allocState)
+ : AllocState(allocState) {
+ }
+
+ arrow::Status Allocate(int64_t size, uint8_t** out) override {
+ *out = static_cast<uint8_t*>(MKQLAllocWithSize(size));
+ return arrow::Status();
+ }
+
+ arrow::Status Reallocate(int64_t oldSize, int64_t newSize, uint8_t** ptr) override {
+ void* newPtr = MKQLAllocWithSize(newSize);
+ ::memcpy(newPtr, *ptr, std::min(oldSize, newSize));
+ MKQLFreeWithSize(*ptr, oldSize);
+ *ptr = static_cast<uint8_t*>(newPtr);
+ return arrow::Status();
+ }
+
+ void Free(uint8_t* buffer, int64_t size) override {
+ MKQLFreeWithSize(buffer, size);
+ }
+
+ int64_t bytes_allocated() const override {
+ return AllocState.GetUsed();
+ }
+
+ int64_t max_memory() const override {
+ return -1;
+ }
+
+ std::string backend_name() const override {
+ return "MKQL";
+ }
+
+private:
+ TAllocState& AllocState;
+};
+}
+
+std::unique_ptr <arrow::MemoryPool> MakeArrowMemoryPool(TAllocState& allocState) {
+ return std::make_unique<TArrowMemoryPool>(allocState);
+}
+
+}
diff --git a/ydb/library/yql/minikql/arrow/mkql_memory_pool.h b/ydb/library/yql/minikql/arrow/mkql_memory_pool.h
index dd23cbb84f..e48e0a7af2 100644
--- a/ydb/library/yql/minikql/arrow/mkql_memory_pool.h
+++ b/ydb/library/yql/minikql/arrow/mkql_memory_pool.h
@@ -1,11 +1,11 @@
-#pragma once
-
+#pragma once
+
#include <ydb/library/yql/minikql/mkql_alloc.h>
-
-#include <contrib/libs/apache/arrow/cpp/src/arrow/memory_pool.h>
-
-namespace NKikimr::NMiniKQL {
-
-std::unique_ptr<arrow::MemoryPool> MakeArrowMemoryPool(TAllocState& allocState);
-
-}
+
+#include <contrib/libs/apache/arrow/cpp/src/arrow/memory_pool.h>
+
+namespace NKikimr::NMiniKQL {
+
+std::unique_ptr<arrow::MemoryPool> MakeArrowMemoryPool(TAllocState& allocState);
+
+}
diff --git a/ydb/library/yql/minikql/arrow/ya.make b/ydb/library/yql/minikql/arrow/ya.make
index b6d675f30d..442e5d3996 100644
--- a/ydb/library/yql/minikql/arrow/ya.make
+++ b/ydb/library/yql/minikql/arrow/ya.make
@@ -5,11 +5,11 @@ OWNER(
)
SRCS(
- mkql_memory_pool.cpp
+ mkql_memory_pool.cpp
)
PEERDIR(
- contrib/libs/apache/arrow
+ contrib/libs/apache/arrow
ydb/library/yql/minikql
)
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_add.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_add.cpp
index f87ca99874..5a57e95c24 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_block_add.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_add.cpp
@@ -1,152 +1,152 @@
-#include "mkql_block_add.h"
-
+#include "mkql_block_add.h"
+
#include <ydb/library/yql/minikql/arrow/arrow_defs.h>
#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h>
#include <ydb/library/yql/minikql/computation/mkql_computation_node_codegen.h>
#include <ydb/library/yql/minikql/mkql_node_builder.h>
#include <ydb/library/yql/minikql/mkql_node_cast.h>
-
-#include <arrow/array/builder_primitive.h>
-#include <arrow/compute/exec_internal.h>
-#include <arrow/compute/function.h>
-#include <arrow/compute/kernel.h>
-#include <arrow/compute/registry.h>
-#include <arrow/util/bit_util.h>
-
-namespace NKikimr {
-namespace NMiniKQL {
-
-namespace {
-
-class TBlockAddWrapper : public TMutableComputationNode<TBlockAddWrapper> {
-public:
- TBlockAddWrapper(TComputationMutables& mutables,
- IComputationNode* leftArg,
- IComputationNode* rightArg,
- TType* leftArgType,
- TType* rightArgType,
- TType* outputType)
- : TMutableComputationNode(mutables)
- , LeftArg(leftArg)
- , RightArg(rightArg)
- , LeftValueDesc(ToValueDescr(leftArgType))
- , RightValueDesc(ToValueDescr(rightArgType))
- , OutputValueDescr(ToValueDescr(outputType))
- , Kernel(ResolveKernel(LeftValueDesc, RightValueDesc))
- , OutputTypeBitWidth(static_cast<const arrow::FixedWidthType&>(*OutputValueDescr.type).bit_width())
- , FunctionRegistry(*arrow::compute::GetFunctionRegistry())
- {
- {
- auto execContext = arrow::compute::ExecContext();
- auto kernelContext = arrow::compute::KernelContext(&execContext);
- const auto kernelOutputValueDesc = ARROW_RESULT(Kernel.signature->out_type().Resolve(&kernelContext, {
- LeftValueDesc,
- RightValueDesc
- }));
- Y_VERIFY_DEBUG(kernelOutputValueDesc == OutputValueDescr);
- }
-
- Y_VERIFY_DEBUG(
- LeftValueDesc.shape == arrow::ValueDescr::ARRAY && RightValueDesc.shape == arrow::ValueDescr::ARRAY ||
- LeftValueDesc.shape == arrow::ValueDescr::SCALAR && RightValueDesc.shape == arrow::ValueDescr::ARRAY ||
- LeftValueDesc.shape == arrow::ValueDescr::ARRAY && RightValueDesc.shape == arrow::ValueDescr::SCALAR);
- }
-
- NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const {
- const auto leftValue = LeftArg->GetValue(ctx);
- const auto rightValue = RightArg->GetValue(ctx);
- auto& leftDatum = TArrowBlock::From(leftValue).GetDatum();
- auto& rightDatum = TArrowBlock::From(rightValue).GetDatum();
- Y_VERIFY_DEBUG(leftDatum.descr() == LeftValueDesc);
- Y_VERIFY_DEBUG(rightDatum.descr() == RightValueDesc);
- const auto leftKind = leftDatum.kind();
- const auto rightKind = rightDatum.kind();
- MKQL_ENSURE(leftKind != arrow::Datum::ARRAY || rightKind != arrow::Datum::ARRAY ||
- leftDatum.array()->length == rightDatum.array()->length,
- "block size mismatch: "
- << static_cast<ui64>(leftDatum.array()->length)
- << " != "
- << static_cast<ui64>(rightDatum.array()->length));
- const auto blockLength = leftKind == arrow::Datum::ARRAY
- ? leftDatum.array()->length
- : rightDatum.array()->length;
-
- auto execContext = arrow::compute::ExecContext(&ctx.ArrowMemoryPool, nullptr, &FunctionRegistry);
- auto kernelContext = arrow::compute::KernelContext(&execContext);
-
- arrow::Datum output = arrow::ArrayData::Make(
- OutputValueDescr.type,
- blockLength,
- std::vector<std::shared_ptr<arrow::Buffer>> {
- ARROW_RESULT(kernelContext.AllocateBitmap(blockLength)),
- ARROW_RESULT(kernelContext.Allocate(arrow::BitUtil::BytesForBits(OutputTypeBitWidth * blockLength)))
- });
- const auto inputBatch = arrow::compute::ExecBatch({leftDatum, rightDatum}, blockLength);
- ARROW_OK(arrow::compute::detail::PropagateNulls(&kernelContext, inputBatch, output.array().get()));
- ARROW_OK(Kernel.exec(&kernelContext, inputBatch, &output));
- return ctx.HolderFactory.CreateArrowBlock(std::move(output));
- }
-
-private:
- void RegisterDependencies() const final {
- this->DependsOn(LeftArg);
- this->DependsOn(RightArg);
- }
-
- static const arrow::compute::ScalarKernel& ResolveKernel(const arrow::ValueDescr& leftArg,
- const arrow::ValueDescr& rightArg)
- {
- auto* functionRegistry = arrow::compute::GetFunctionRegistry();
- Y_VERIFY_DEBUG(functionRegistry != nullptr);
- auto function = ARROW_RESULT(functionRegistry->GetFunction("add"));
- Y_VERIFY_DEBUG(function != nullptr);
- Y_VERIFY_DEBUG(function->kind() == arrow::compute::Function::SCALAR);
-
- const auto* kernel = ARROW_RESULT(function->DispatchExact({leftArg, rightArg}));
- return *static_cast<const arrow::compute::ScalarKernel*>(kernel);
- }
-
- static std::shared_ptr<arrow::DataType> ConvertType(TType* type) {
- bool isOptional;
- const auto dataType = UnpackOptionalData(type, isOptional);
- switch (*dataType->GetDataSlot()) {
- case NUdf::EDataSlot::Uint64:
- return arrow::uint64();
- default:
- Y_FAIL("unexpected type %s", TString(dataType->GetKindAsStr()).c_str());
- }
- }
-
- static arrow::ValueDescr ToValueDescr(TType* type) {
- auto* blockType = AS_TYPE(TBlockType, type);
- const auto shape = blockType->GetShape() == TBlockType::EShape::Single
- ? arrow::ValueDescr::SCALAR
- : arrow::ValueDescr::ARRAY;
- return arrow::ValueDescr(ConvertType(blockType->GetItemType()), shape);
- }
-
-private:
- IComputationNode* LeftArg;
- IComputationNode* RightArg;
- const arrow::ValueDescr LeftValueDesc;
- const arrow::ValueDescr RightValueDesc;
- const arrow::ValueDescr OutputValueDescr;
- const arrow::compute::ScalarKernel& Kernel;
- const int OutputTypeBitWidth;
- arrow::compute::FunctionRegistry& FunctionRegistry;
-};
-
-}
-
-IComputationNode* WrapBlockAdd(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
- const auto* callableType = callable.GetType();
- return new TBlockAddWrapper(ctx.Mutables,
- LocateNode(ctx.NodeLocator, callable, 0),
- LocateNode(ctx.NodeLocator, callable, 1),
- callableType->GetArgumentType(0),
- callableType->GetArgumentType(1),
- callableType->GetReturnType());
-}
-
-}
-}
+
+#include <arrow/array/builder_primitive.h>
+#include <arrow/compute/exec_internal.h>
+#include <arrow/compute/function.h>
+#include <arrow/compute/kernel.h>
+#include <arrow/compute/registry.h>
+#include <arrow/util/bit_util.h>
+
+namespace NKikimr {
+namespace NMiniKQL {
+
+namespace {
+
+class TBlockAddWrapper : public TMutableComputationNode<TBlockAddWrapper> {
+public:
+ TBlockAddWrapper(TComputationMutables& mutables,
+ IComputationNode* leftArg,
+ IComputationNode* rightArg,
+ TType* leftArgType,
+ TType* rightArgType,
+ TType* outputType)
+ : TMutableComputationNode(mutables)
+ , LeftArg(leftArg)
+ , RightArg(rightArg)
+ , LeftValueDesc(ToValueDescr(leftArgType))
+ , RightValueDesc(ToValueDescr(rightArgType))
+ , OutputValueDescr(ToValueDescr(outputType))
+ , Kernel(ResolveKernel(LeftValueDesc, RightValueDesc))
+ , OutputTypeBitWidth(static_cast<const arrow::FixedWidthType&>(*OutputValueDescr.type).bit_width())
+ , FunctionRegistry(*arrow::compute::GetFunctionRegistry())
+ {
+ {
+ auto execContext = arrow::compute::ExecContext();
+ auto kernelContext = arrow::compute::KernelContext(&execContext);
+ const auto kernelOutputValueDesc = ARROW_RESULT(Kernel.signature->out_type().Resolve(&kernelContext, {
+ LeftValueDesc,
+ RightValueDesc
+ }));
+ Y_VERIFY_DEBUG(kernelOutputValueDesc == OutputValueDescr);
+ }
+
+ Y_VERIFY_DEBUG(
+ LeftValueDesc.shape == arrow::ValueDescr::ARRAY && RightValueDesc.shape == arrow::ValueDescr::ARRAY ||
+ LeftValueDesc.shape == arrow::ValueDescr::SCALAR && RightValueDesc.shape == arrow::ValueDescr::ARRAY ||
+ LeftValueDesc.shape == arrow::ValueDescr::ARRAY && RightValueDesc.shape == arrow::ValueDescr::SCALAR);
+ }
+
+ NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const {
+ const auto leftValue = LeftArg->GetValue(ctx);
+ const auto rightValue = RightArg->GetValue(ctx);
+ auto& leftDatum = TArrowBlock::From(leftValue).GetDatum();
+ auto& rightDatum = TArrowBlock::From(rightValue).GetDatum();
+ Y_VERIFY_DEBUG(leftDatum.descr() == LeftValueDesc);
+ Y_VERIFY_DEBUG(rightDatum.descr() == RightValueDesc);
+ const auto leftKind = leftDatum.kind();
+ const auto rightKind = rightDatum.kind();
+ MKQL_ENSURE(leftKind != arrow::Datum::ARRAY || rightKind != arrow::Datum::ARRAY ||
+ leftDatum.array()->length == rightDatum.array()->length,
+ "block size mismatch: "
+ << static_cast<ui64>(leftDatum.array()->length)
+ << " != "
+ << static_cast<ui64>(rightDatum.array()->length));
+ const auto blockLength = leftKind == arrow::Datum::ARRAY
+ ? leftDatum.array()->length
+ : rightDatum.array()->length;
+
+ auto execContext = arrow::compute::ExecContext(&ctx.ArrowMemoryPool, nullptr, &FunctionRegistry);
+ auto kernelContext = arrow::compute::KernelContext(&execContext);
+
+ arrow::Datum output = arrow::ArrayData::Make(
+ OutputValueDescr.type,
+ blockLength,
+ std::vector<std::shared_ptr<arrow::Buffer>> {
+ ARROW_RESULT(kernelContext.AllocateBitmap(blockLength)),
+ ARROW_RESULT(kernelContext.Allocate(arrow::BitUtil::BytesForBits(OutputTypeBitWidth * blockLength)))
+ });
+ const auto inputBatch = arrow::compute::ExecBatch({leftDatum, rightDatum}, blockLength);
+ ARROW_OK(arrow::compute::detail::PropagateNulls(&kernelContext, inputBatch, output.array().get()));
+ ARROW_OK(Kernel.exec(&kernelContext, inputBatch, &output));
+ return ctx.HolderFactory.CreateArrowBlock(std::move(output));
+ }
+
+private:
+ void RegisterDependencies() const final {
+ this->DependsOn(LeftArg);
+ this->DependsOn(RightArg);
+ }
+
+ static const arrow::compute::ScalarKernel& ResolveKernel(const arrow::ValueDescr& leftArg,
+ const arrow::ValueDescr& rightArg)
+ {
+ auto* functionRegistry = arrow::compute::GetFunctionRegistry();
+ Y_VERIFY_DEBUG(functionRegistry != nullptr);
+ auto function = ARROW_RESULT(functionRegistry->GetFunction("add"));
+ Y_VERIFY_DEBUG(function != nullptr);
+ Y_VERIFY_DEBUG(function->kind() == arrow::compute::Function::SCALAR);
+
+ const auto* kernel = ARROW_RESULT(function->DispatchExact({leftArg, rightArg}));
+ return *static_cast<const arrow::compute::ScalarKernel*>(kernel);
+ }
+
+ static std::shared_ptr<arrow::DataType> ConvertType(TType* type) {
+ bool isOptional;
+ const auto dataType = UnpackOptionalData(type, isOptional);
+ switch (*dataType->GetDataSlot()) {
+ case NUdf::EDataSlot::Uint64:
+ return arrow::uint64();
+ default:
+ Y_FAIL("unexpected type %s", TString(dataType->GetKindAsStr()).c_str());
+ }
+ }
+
+ static arrow::ValueDescr ToValueDescr(TType* type) {
+ auto* blockType = AS_TYPE(TBlockType, type);
+ const auto shape = blockType->GetShape() == TBlockType::EShape::Single
+ ? arrow::ValueDescr::SCALAR
+ : arrow::ValueDescr::ARRAY;
+ return arrow::ValueDescr(ConvertType(blockType->GetItemType()), shape);
+ }
+
+private:
+ IComputationNode* LeftArg;
+ IComputationNode* RightArg;
+ const arrow::ValueDescr LeftValueDesc;
+ const arrow::ValueDescr RightValueDesc;
+ const arrow::ValueDescr OutputValueDescr;
+ const arrow::compute::ScalarKernel& Kernel;
+ const int OutputTypeBitWidth;
+ arrow::compute::FunctionRegistry& FunctionRegistry;
+};
+
+}
+
+IComputationNode* WrapBlockAdd(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
+ const auto* callableType = callable.GetType();
+ return new TBlockAddWrapper(ctx.Mutables,
+ LocateNode(ctx.NodeLocator, callable, 0),
+ LocateNode(ctx.NodeLocator, callable, 1),
+ callableType->GetArgumentType(0),
+ callableType->GetArgumentType(1),
+ callableType->GetReturnType());
+}
+
+}
+}
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_add.h b/ydb/library/yql/minikql/comp_nodes/mkql_block_add.h
index 4508498634..bc4aa3b370 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_block_add.h
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_add.h
@@ -1,11 +1,11 @@
-#pragma once
-
+#pragma once
+
#include <ydb/library/yql/minikql/computation/mkql_computation_node.h>
-
-namespace NKikimr {
-namespace NMiniKQL {
-
-IComputationNode* WrapBlockAdd(TCallable& callable, const TComputationNodeFactoryContext& ctx);
-
-}
-}
+
+namespace NKikimr {
+namespace NMiniKQL {
+
+IComputationNode* WrapBlockAdd(TCallable& callable, const TComputationNodeFactoryContext& ctx);
+
+}
+}
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp
index 0533362eac..b14c5002d2 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp
@@ -1,304 +1,304 @@
-#include "mkql_blocks.h"
-
+#include "mkql_blocks.h"
+
#include <ydb/library/yql/minikql/arrow/arrow_defs.h>
#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h>
#include <ydb/library/yql/minikql/mkql_node_builder.h>
#include <ydb/library/yql/minikql/mkql_node_cast.h>
-
-#include <arrow/array/builder_primitive.h>
-#include <arrow/util/bitmap.h>
-#include <arrow/util/bit_util.h>
-
-#include <util/generic/size_literals.h>
-
-namespace NKikimr {
-namespace NMiniKQL {
-
-namespace {
-
-class TBlockBuilder {
-public:
- explicit TBlockBuilder(TComputationContext& ctx)
- : ItemType(arrow::uint64())
- , MaxLength_(MaxBlockSizeInBytes / TypeSize(*ItemType))
- , Ctx(ctx)
- , Builder(&Ctx.ArrowMemoryPool)
- {
- ARROW_OK(Builder.Reserve(MaxLength_));
- }
-
- void Add(NUdf::TUnboxedValue& value) {
- Y_VERIFY_DEBUG(Builder.length() < MaxLength_);
- if (value) {
- Builder.UnsafeAppend(value.Get<ui64>());
- } else {
- Builder.UnsafeAppendNull();
- }
- }
-
- inline size_t MaxLength() const noexcept {
- return MaxLength_;
- }
-
- NUdf::TUnboxedValuePod Build() {
- std::shared_ptr<arrow::ArrayData> result;
- ARROW_OK(Builder.FinishInternal(&result));
- return Ctx.HolderFactory.CreateArrowBlock(std::move(result));
- }
-
-private:
- static int64_t TypeSize(arrow::DataType& itemType) {
- const auto bits = static_cast<const arrow::FixedWidthType&>(itemType).bit_width();
- return arrow::BitUtil::BytesForBits(bits);
- }
-
-private:
- static constexpr size_t MaxBlockSizeInBytes = 1_MB;
-
-private:
- std::shared_ptr<arrow::DataType> ItemType;
- const size_t MaxLength_;
- TComputationContext& Ctx;
- arrow::UInt64Builder Builder;
-};
-
-class TToBlocksWrapper: public TStatelessFlowComputationNode<TToBlocksWrapper> {
-public:
- explicit TToBlocksWrapper(IComputationNode* flow)
- : TStatelessFlowComputationNode(flow, EValueRepresentation::Boxed)
- , Flow(flow)
- {
- }
-
- NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const {
- auto builder = TBlockBuilder(ctx);
-
- for (size_t i = 0; i < builder.MaxLength(); ++i) {
- auto result = Flow->GetValue(ctx);
- if (result.IsFinish() || result.IsYield()) {
- if (i == 0) {
- return result.Release();
- }
- break;
- }
- builder.Add(result);
- }
-
- return builder.Build();
- }
-
-private:
- void RegisterDependencies() const final {
- FlowDependsOn(Flow);
- }
-
-private:
- IComputationNode* const Flow;
-};
-
-class TWideToBlocksWrapper : public TStatefulWideFlowComputationNode<TWideToBlocksWrapper> {
-public:
- TWideToBlocksWrapper(TComputationMutables& mutables,
- IComputationWideFlowNode* flow,
- size_t width)
- : TStatefulWideFlowComputationNode(mutables, flow, EValueRepresentation::Embedded)
- , Flow(flow)
- , Width(width)
- {
- Y_VERIFY_DEBUG(Width > 0);
- }
-
- EFetchResult DoCalculate(NUdf::TUnboxedValue& state,
- TComputationContext& ctx,
- NUdf::TUnboxedValue*const* output) const
- {
- auto builders = std::vector<TBlockBuilder>();
- builders.reserve(Width);
- for (size_t i = 0; i < Width; ++i) {
- builders.push_back(TBlockBuilder(ctx));
- }
- size_t maxLength = builders.front().MaxLength();
- for (size_t i = 1; i < Width; ++i) {
- maxLength = Min(maxLength, builders[i].MaxLength());
- }
-
- auto& s = GetState(state, ctx);
- for (size_t i = 0; i < maxLength; ++i) {
- if (const auto result = Flow->FetchValues(ctx, s.ValuePointers.data()); EFetchResult::One != result) {
- if (i == 0) {
- return result;
- }
- break;
- }
- for (size_t j = 0; j < Width; ++j) {
- if (output[j] != nullptr) {
- builders[j].Add(s.Values[j]);
- }
- }
- }
-
- for (size_t i = 0; i < Width; ++i) {
- if (auto* out = output[i]; out != nullptr) {
- *out = builders[i].Build();
- }
- }
-
- return EFetchResult::One;
- }
-
-private:
- struct TState: public TComputationValue<TState> {
- std::vector<NUdf::TUnboxedValue> Values;
- std::vector<NUdf::TUnboxedValue*> ValuePointers;
-
- TState(TMemoryUsageInfo* memInfo, size_t width)
- : TComputationValue(memInfo)
- , Values(width)
- , ValuePointers(width)
- {
- for (size_t i = 0; i < width; ++i) {
- ValuePointers[i] = &Values[i];
- }
- }
- };
-
-private:
- void RegisterDependencies() const final {
- FlowDependsOn(Flow);
- }
-
- TState& GetState(NUdf::TUnboxedValue& state, TComputationContext& ctx) const {
- if (!state.HasValue()) {
- state = ctx.HolderFactory.Create<TState>(Width);
- }
- return *static_cast<TState*>(state.AsBoxed().Get());
- }
-
-private:
- IComputationWideFlowNode* Flow;
- const size_t Width;
-};
-
-class TFromBlocksWrapper : public TMutableComputationNode<TFromBlocksWrapper> {
-public:
- TFromBlocksWrapper(TComputationMutables& mutables, IComputationNode* flow)
- : TMutableComputationNode(mutables)
- , Flow(flow)
- , StateIndex(mutables.CurValueIndex++)
- {
- }
-
- NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const {
- auto& state = GetState(ctx);
-
- if (state.Array == nullptr || state.Index == state.Array->length) {
- auto result = Flow->GetValue(ctx);
- if (result.IsFinish()) {
- return NUdf::TUnboxedValue::MakeFinish();
- }
- if (result.IsYield()) {
- return NUdf::TUnboxedValue::MakeYield();
- }
- state.Array = TArrowBlock::From(result).GetDatum().array();
- state.Index = 0;
- }
-
- const auto result = state.GetValue();
- ++state.Index;
- return result;
- }
-
-private:
- struct TState: public TComputationValue<TState> {
- using TComputationValue::TComputationValue;
-
- NUdf::TUnboxedValuePod GetValue() const {
- const auto nullCount = Array->GetNullCount();
-
- return nullCount == Array->length || (nullCount > 0 && !HasValue())
- ? NUdf::TUnboxedValuePod()
- : DoGetValue();
- }
-
- private:
- NUdf::TUnboxedValuePod DoGetValue() const {
- return NUdf::TUnboxedValuePod(Array->GetValues<ui64>(1)[Index]);
- }
-
- bool HasValue() const {
- return arrow::BitUtil::GetBit(Array->GetValues<uint8_t>(0), Index + Array->offset);
- }
-
- public:
- std::shared_ptr<arrow::ArrayData> Array{nullptr};
- size_t Index{0};
- };
-
-private:
- void RegisterDependencies() const final {
- this->DependsOn(Flow);
- }
-
- TState& GetState(TComputationContext& ctx) const {
- auto& result = ctx.MutableValues[StateIndex];
- if (!result.HasValue()) {
- result = ctx.HolderFactory.Create<TState>();
- }
- return *static_cast<TState*>(result.AsBoxed().Get());
- }
-
-private:
- IComputationNode* const Flow;
- const ui32 StateIndex;
-};
-
-arrow::Datum ExtractLiteral(TRuntimeNode n) {
- if (n.GetStaticType()->IsOptional()) {
- const auto* dataLiteral = AS_VALUE(TOptionalLiteral, n);
- if (!dataLiteral->HasItem()) {
- return arrow::MakeNullScalar(arrow::uint64());
- }
- n = dataLiteral->GetItem();
- }
-
- const auto* dataLiteral = AS_VALUE(TDataLiteral, n);
- return arrow::Datum(static_cast<uint64_t>(dataLiteral->AsValue().Get<ui64>()));
-}
-
-}
-
-IComputationNode* WrapToBlocks(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
- MKQL_ENSURE(callable.GetInputsCount() == 1, "Expected 1 args, got " << callable.GetInputsCount());
-
- return new TToBlocksWrapper(LocateNode(ctx.NodeLocator, callable, 0));
-}
-
-IComputationNode* WrapWideToBlocks(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
- MKQL_ENSURE(callable.GetInputsCount() == 1, "Expected 1 args, got " << callable.GetInputsCount());
-
- const auto* flowType = AS_TYPE(TFlowType, callable.GetInput(0).GetStaticType());
- const auto* tupleType = AS_TYPE(TTupleType, flowType->GetItemType());
-
- auto* wideFlow = dynamic_cast<IComputationWideFlowNode*>(LocateNode(ctx.NodeLocator, callable, 0));
- MKQL_ENSURE(wideFlow != nullptr, "Expected wide flow node");
-
- return new TWideToBlocksWrapper(ctx.Mutables,
- wideFlow,
- tupleType->GetElementsCount());
-}
-
-IComputationNode* WrapFromBlocks(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
- MKQL_ENSURE(callable.GetInputsCount() == 1, "Expected 1 args, got " << callable.GetInputsCount());
-
- return new TFromBlocksWrapper(ctx.Mutables, LocateNode(ctx.NodeLocator, callable, 0));
-}
-
-IComputationNode* WrapAsSingle(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
- MKQL_ENSURE(callable.GetInputsCount() == 1, "Expected 1 args, got " << callable.GetInputsCount());
-
- auto value = ExtractLiteral(callable.GetInput(0U));
- return ctx.NodeFactory.CreateImmutableNode(ctx.HolderFactory.CreateArrowBlock(std::move(value)));
-}
-
-}
-}
+
+#include <arrow/array/builder_primitive.h>
+#include <arrow/util/bitmap.h>
+#include <arrow/util/bit_util.h>
+
+#include <util/generic/size_literals.h>
+
+namespace NKikimr {
+namespace NMiniKQL {
+
+namespace {
+
+class TBlockBuilder {
+public:
+ explicit TBlockBuilder(TComputationContext& ctx)
+ : ItemType(arrow::uint64())
+ , MaxLength_(MaxBlockSizeInBytes / TypeSize(*ItemType))
+ , Ctx(ctx)
+ , Builder(&Ctx.ArrowMemoryPool)
+ {
+ ARROW_OK(Builder.Reserve(MaxLength_));
+ }
+
+ void Add(NUdf::TUnboxedValue& value) {
+ Y_VERIFY_DEBUG(Builder.length() < MaxLength_);
+ if (value) {
+ Builder.UnsafeAppend(value.Get<ui64>());
+ } else {
+ Builder.UnsafeAppendNull();
+ }
+ }
+
+ inline size_t MaxLength() const noexcept {
+ return MaxLength_;
+ }
+
+ NUdf::TUnboxedValuePod Build() {
+ std::shared_ptr<arrow::ArrayData> result;
+ ARROW_OK(Builder.FinishInternal(&result));
+ return Ctx.HolderFactory.CreateArrowBlock(std::move(result));
+ }
+
+private:
+ static int64_t TypeSize(arrow::DataType& itemType) {
+ const auto bits = static_cast<const arrow::FixedWidthType&>(itemType).bit_width();
+ return arrow::BitUtil::BytesForBits(bits);
+ }
+
+private:
+ static constexpr size_t MaxBlockSizeInBytes = 1_MB;
+
+private:
+ std::shared_ptr<arrow::DataType> ItemType;
+ const size_t MaxLength_;
+ TComputationContext& Ctx;
+ arrow::UInt64Builder Builder;
+};
+
+class TToBlocksWrapper: public TStatelessFlowComputationNode<TToBlocksWrapper> {
+public:
+ explicit TToBlocksWrapper(IComputationNode* flow)
+ : TStatelessFlowComputationNode(flow, EValueRepresentation::Boxed)
+ , Flow(flow)
+ {
+ }
+
+ NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const {
+ auto builder = TBlockBuilder(ctx);
+
+ for (size_t i = 0; i < builder.MaxLength(); ++i) {
+ auto result = Flow->GetValue(ctx);
+ if (result.IsFinish() || result.IsYield()) {
+ if (i == 0) {
+ return result.Release();
+ }
+ break;
+ }
+ builder.Add(result);
+ }
+
+ return builder.Build();
+ }
+
+private:
+ void RegisterDependencies() const final {
+ FlowDependsOn(Flow);
+ }
+
+private:
+ IComputationNode* const Flow;
+};
+
+class TWideToBlocksWrapper : public TStatefulWideFlowComputationNode<TWideToBlocksWrapper> {
+public:
+ TWideToBlocksWrapper(TComputationMutables& mutables,
+ IComputationWideFlowNode* flow,
+ size_t width)
+ : TStatefulWideFlowComputationNode(mutables, flow, EValueRepresentation::Embedded)
+ , Flow(flow)
+ , Width(width)
+ {
+ Y_VERIFY_DEBUG(Width > 0);
+ }
+
+ EFetchResult DoCalculate(NUdf::TUnboxedValue& state,
+ TComputationContext& ctx,
+ NUdf::TUnboxedValue*const* output) const
+ {
+ auto builders = std::vector<TBlockBuilder>();
+ builders.reserve(Width);
+ for (size_t i = 0; i < Width; ++i) {
+ builders.push_back(TBlockBuilder(ctx));
+ }
+ size_t maxLength = builders.front().MaxLength();
+ for (size_t i = 1; i < Width; ++i) {
+ maxLength = Min(maxLength, builders[i].MaxLength());
+ }
+
+ auto& s = GetState(state, ctx);
+ for (size_t i = 0; i < maxLength; ++i) {
+ if (const auto result = Flow->FetchValues(ctx, s.ValuePointers.data()); EFetchResult::One != result) {
+ if (i == 0) {
+ return result;
+ }
+ break;
+ }
+ for (size_t j = 0; j < Width; ++j) {
+ if (output[j] != nullptr) {
+ builders[j].Add(s.Values[j]);
+ }
+ }
+ }
+
+ for (size_t i = 0; i < Width; ++i) {
+ if (auto* out = output[i]; out != nullptr) {
+ *out = builders[i].Build();
+ }
+ }
+
+ return EFetchResult::One;
+ }
+
+private:
+ struct TState: public TComputationValue<TState> {
+ std::vector<NUdf::TUnboxedValue> Values;
+ std::vector<NUdf::TUnboxedValue*> ValuePointers;
+
+ TState(TMemoryUsageInfo* memInfo, size_t width)
+ : TComputationValue(memInfo)
+ , Values(width)
+ , ValuePointers(width)
+ {
+ for (size_t i = 0; i < width; ++i) {
+ ValuePointers[i] = &Values[i];
+ }
+ }
+ };
+
+private:
+ void RegisterDependencies() const final {
+ FlowDependsOn(Flow);
+ }
+
+ TState& GetState(NUdf::TUnboxedValue& state, TComputationContext& ctx) const {
+ if (!state.HasValue()) {
+ state = ctx.HolderFactory.Create<TState>(Width);
+ }
+ return *static_cast<TState*>(state.AsBoxed().Get());
+ }
+
+private:
+ IComputationWideFlowNode* Flow;
+ const size_t Width;
+};
+
+class TFromBlocksWrapper : public TMutableComputationNode<TFromBlocksWrapper> {
+public:
+ TFromBlocksWrapper(TComputationMutables& mutables, IComputationNode* flow)
+ : TMutableComputationNode(mutables)
+ , Flow(flow)
+ , StateIndex(mutables.CurValueIndex++)
+ {
+ }
+
+ NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const {
+ auto& state = GetState(ctx);
+
+ if (state.Array == nullptr || state.Index == state.Array->length) {
+ auto result = Flow->GetValue(ctx);
+ if (result.IsFinish()) {
+ return NUdf::TUnboxedValue::MakeFinish();
+ }
+ if (result.IsYield()) {
+ return NUdf::TUnboxedValue::MakeYield();
+ }
+ state.Array = TArrowBlock::From(result).GetDatum().array();
+ state.Index = 0;
+ }
+
+ const auto result = state.GetValue();
+ ++state.Index;
+ return result;
+ }
+
+private:
+ struct TState: public TComputationValue<TState> {
+ using TComputationValue::TComputationValue;
+
+ NUdf::TUnboxedValuePod GetValue() const {
+ const auto nullCount = Array->GetNullCount();
+
+ return nullCount == Array->length || (nullCount > 0 && !HasValue())
+ ? NUdf::TUnboxedValuePod()
+ : DoGetValue();
+ }
+
+ private:
+ NUdf::TUnboxedValuePod DoGetValue() const {
+ return NUdf::TUnboxedValuePod(Array->GetValues<ui64>(1)[Index]);
+ }
+
+ bool HasValue() const {
+ return arrow::BitUtil::GetBit(Array->GetValues<uint8_t>(0), Index + Array->offset);
+ }
+
+ public:
+ std::shared_ptr<arrow::ArrayData> Array{nullptr};
+ size_t Index{0};
+ };
+
+private:
+ void RegisterDependencies() const final {
+ this->DependsOn(Flow);
+ }
+
+ TState& GetState(TComputationContext& ctx) const {
+ auto& result = ctx.MutableValues[StateIndex];
+ if (!result.HasValue()) {
+ result = ctx.HolderFactory.Create<TState>();
+ }
+ return *static_cast<TState*>(result.AsBoxed().Get());
+ }
+
+private:
+ IComputationNode* const Flow;
+ const ui32 StateIndex;
+};
+
+arrow::Datum ExtractLiteral(TRuntimeNode n) {
+ if (n.GetStaticType()->IsOptional()) {
+ const auto* dataLiteral = AS_VALUE(TOptionalLiteral, n);
+ if (!dataLiteral->HasItem()) {
+ return arrow::MakeNullScalar(arrow::uint64());
+ }
+ n = dataLiteral->GetItem();
+ }
+
+ const auto* dataLiteral = AS_VALUE(TDataLiteral, n);
+ return arrow::Datum(static_cast<uint64_t>(dataLiteral->AsValue().Get<ui64>()));
+}
+
+}
+
+IComputationNode* WrapToBlocks(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
+ MKQL_ENSURE(callable.GetInputsCount() == 1, "Expected 1 args, got " << callable.GetInputsCount());
+
+ return new TToBlocksWrapper(LocateNode(ctx.NodeLocator, callable, 0));
+}
+
+IComputationNode* WrapWideToBlocks(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
+ MKQL_ENSURE(callable.GetInputsCount() == 1, "Expected 1 args, got " << callable.GetInputsCount());
+
+ const auto* flowType = AS_TYPE(TFlowType, callable.GetInput(0).GetStaticType());
+ const auto* tupleType = AS_TYPE(TTupleType, flowType->GetItemType());
+
+ auto* wideFlow = dynamic_cast<IComputationWideFlowNode*>(LocateNode(ctx.NodeLocator, callable, 0));
+ MKQL_ENSURE(wideFlow != nullptr, "Expected wide flow node");
+
+ return new TWideToBlocksWrapper(ctx.Mutables,
+ wideFlow,
+ tupleType->GetElementsCount());
+}
+
+IComputationNode* WrapFromBlocks(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
+ MKQL_ENSURE(callable.GetInputsCount() == 1, "Expected 1 args, got " << callable.GetInputsCount());
+
+ return new TFromBlocksWrapper(ctx.Mutables, LocateNode(ctx.NodeLocator, callable, 0));
+}
+
+IComputationNode* WrapAsSingle(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
+ MKQL_ENSURE(callable.GetInputsCount() == 1, "Expected 1 args, got " << callable.GetInputsCount());
+
+ auto value = ExtractLiteral(callable.GetInput(0U));
+ return ctx.NodeFactory.CreateImmutableNode(ctx.HolderFactory.CreateArrowBlock(std::move(value)));
+}
+
+}
+}
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_blocks.h b/ydb/library/yql/minikql/comp_nodes/mkql_blocks.h
index d0dae21cf7..c13dd5339a 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_blocks.h
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_blocks.h
@@ -1,14 +1,14 @@
-#pragma once
-
+#pragma once
+
#include <ydb/library/yql/minikql/computation/mkql_computation_node.h>
-
-namespace NKikimr {
-namespace NMiniKQL {
-
-IComputationNode* WrapToBlocks(TCallable& callable, const TComputationNodeFactoryContext& ctx);
-IComputationNode* WrapWideToBlocks(TCallable& callable, const TComputationNodeFactoryContext& ctx);
-IComputationNode* WrapFromBlocks(TCallable& callable, const TComputationNodeFactoryContext& ctx);
-IComputationNode* WrapAsSingle(TCallable& callable, const TComputationNodeFactoryContext& ctx);
-
-}
-}
+
+namespace NKikimr {
+namespace NMiniKQL {
+
+IComputationNode* WrapToBlocks(TCallable& callable, const TComputationNodeFactoryContext& ctx);
+IComputationNode* WrapWideToBlocks(TCallable& callable, const TComputationNodeFactoryContext& ctx);
+IComputationNode* WrapFromBlocks(TCallable& callable, const TComputationNodeFactoryContext& ctx);
+IComputationNode* WrapAsSingle(TCallable& callable, const TComputationNodeFactoryContext& ctx);
+
+}
+}
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_factory.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_factory.cpp
index f142ac5ff4..57599acb64 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_factory.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_factory.cpp
@@ -4,8 +4,8 @@
#include "mkql_aggrcount.h"
#include "mkql_append.h"
#include "mkql_apply.h"
-#include "mkql_block_add.h"
-#include "mkql_blocks.h"
+#include "mkql_block_add.h"
+#include "mkql_blocks.h"
#include "mkql_callable.h"
#include "mkql_chain_map.h"
#include "mkql_chain1_map.h"
@@ -257,11 +257,11 @@ struct TCallableComputationNodeBuilderFuncMapFiller {
{"DecimalMul", &WrapDecimalMul},
{"ToFlow", &WrapToFlow},
{"FromFlow", &WrapFromFlow},
- {"ToBlocks", &WrapToBlocks},
- {"WideToBlocks", &WrapWideToBlocks},
- {"BlockAdd", &WrapBlockAdd},
- {"FromBlocks", &WrapFromBlocks},
- {"AsSingle", &WrapAsSingle},
+ {"ToBlocks", &WrapToBlocks},
+ {"WideToBlocks", &WrapWideToBlocks},
+ {"BlockAdd", &WrapBlockAdd},
+ {"FromBlocks", &WrapFromBlocks},
+ {"AsSingle", &WrapAsSingle},
{"MakeHeap", &WrapMakeHeap},
{"PushHeap", &WrapPushHeap},
{"PopHeap", &WrapPopHeap},
diff --git a/ydb/library/yql/minikql/comp_nodes/ut/mkql_blocks_ut.cpp b/ydb/library/yql/minikql/comp_nodes/ut/mkql_blocks_ut.cpp
index 7be7f89d2f..2fd575f239 100644
--- a/ydb/library/yql/minikql/comp_nodes/ut/mkql_blocks_ut.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/ut/mkql_blocks_ut.cpp
@@ -1,323 +1,323 @@
-#include "mkql_computation_node_ut.h"
-
+#include "mkql_computation_node_ut.h"
+
#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h>
-
-namespace NKikimr {
-namespace NMiniKQL {
-Y_UNIT_TEST_SUITE(TMiniKQLBlocksTest) {
-Y_UNIT_TEST(TestEmpty) {
- TSetup<false> setup;
- auto& pb = *setup.PgmBuilder;
-
- const auto type = pb.NewDataType(NUdf::TDataType<ui64>::Id);
- const auto list = pb.NewEmptyList(type);
- const auto sourceFlow = pb.ToFlow(list);
- const auto flowAfterBlocks = pb.FromBlocks(pb.ToBlocks(sourceFlow));
- const auto pgmReturn = pb.ForwardList(flowAfterBlocks);
-
- const auto graph = setup.BuildGraph(pgmReturn);
- const auto iterator = graph->GetValue().GetListIterator();
- NUdf::TUnboxedValue item;
- UNIT_ASSERT(!iterator.Next(item));
-}
-
-Y_UNIT_TEST(TestSimple) {
- static const size_t dataCount = 1000;
- TSetup<false> setup;
- auto& pb = *setup.PgmBuilder;
-
- auto data = TVector<TRuntimeNode>(Reserve(dataCount));
- for (size_t i = 0; i < dataCount; ++i) {
- data.push_back(pb.NewDataLiteral<ui64>(i));
- }
- const auto type = pb.NewDataType(NUdf::TDataType<ui64>::Id);
- const auto list = pb.NewList(type, data);
- const auto sourceFlow = pb.ToFlow(list);
- const auto flowAfterBlocks = pb.FromBlocks(pb.ToBlocks(sourceFlow));
- const auto pgmReturn = pb.ForwardList(flowAfterBlocks);
-
- const auto graph = setup.BuildGraph(pgmReturn);
- const auto iterator = graph->GetValue().GetListIterator();
-
- for (size_t i = 0; i < dataCount; ++i) {
- NUdf::TUnboxedValue item;
- UNIT_ASSERT(iterator.Next(item));
- UNIT_ASSERT_VALUES_EQUAL(item.Get<ui64>(), i);
- }
- NUdf::TUnboxedValue item;
- UNIT_ASSERT(!iterator.Next(item));
-}
-
-Y_UNIT_TEST(TestWideToBlocks) {
- TSetup<false> setup;
- auto& pb = *setup.PgmBuilder;
-
- const auto ui64Type = pb.NewDataType(NUdf::TDataType<ui64>::Id);
- const auto tupleType = pb.NewTupleType({ui64Type, ui64Type});
-
- const auto data1 = pb.NewTuple(tupleType, {pb.NewDataLiteral<ui64>(1), pb.NewDataLiteral<ui64>(10)});
- const auto data2 = pb.NewTuple(tupleType, {pb.NewDataLiteral<ui64>(2), pb.NewDataLiteral<ui64>(20)});
- const auto data3 = pb.NewTuple(tupleType, {pb.NewDataLiteral<ui64>(3), pb.NewDataLiteral<ui64>(30)});
-
- const auto list = pb.NewList(tupleType, {data1, data2, data3});
- const auto flow = pb.ToFlow(list);
-
- const auto wideFlow = pb.ExpandMap(flow, [&](TRuntimeNode item) -> TRuntimeNode::TList {
- return {pb.Nth(item, 0U), pb.Nth(item, 1U)};
- });
- const auto wideBlocksFlow = pb.WideToBlocks(wideFlow);
- const auto narrowBlocksFlow = pb.NarrowMap(wideBlocksFlow, [&](TRuntimeNode::TList items) -> TRuntimeNode {
- return items[1];
- });
- const auto narrowFlow = pb.FromBlocks(narrowBlocksFlow);
- const auto pgmReturn = pb.ForwardList(narrowFlow);
-
- const auto graph = setup.BuildGraph(pgmReturn);
- const auto iterator = graph->GetValue().GetListIterator();
-
- NUdf::TUnboxedValue item;
- UNIT_ASSERT(iterator.Next(item));
- UNIT_ASSERT_VALUES_EQUAL(item.Get<ui64>(), 10);
-
- UNIT_ASSERT(iterator.Next(item));
- UNIT_ASSERT_VALUES_EQUAL(item.Get<ui64>(), 20);
-
- UNIT_ASSERT(iterator.Next(item));
- UNIT_ASSERT_VALUES_EQUAL(item.Get<ui64>(), 30);
-}
-
-Y_UNIT_TEST(TestSingle) {
- const ui64 testValue = 42;
-
- TSetup<false> setup;
- auto& pb = *setup.PgmBuilder;
-
- auto dataLiteral = pb.NewDataLiteral<ui64>(testValue);
- const auto dataAfterBlocks = pb.AsSingle(dataLiteral);
-
- const auto graph = setup.BuildGraph(dataAfterBlocks);
- const auto value = graph->GetValue();
- UNIT_ASSERT(value.HasValue() && value.IsBoxed());
- UNIT_ASSERT_VALUES_EQUAL(TArrowBlock::From(value).GetDatum().scalar_as<arrow::UInt64Scalar>().value, testValue);
-}
-
-Y_UNIT_TEST(TestBlockAdd) {
- TSetup<false> setup;
- TProgramBuilder& pb = *setup.PgmBuilder;
-
- const auto ui64Type = pb.NewDataType(NUdf::TDataType<ui64>::Id);
- const auto tupleType = pb.NewTupleType({ui64Type, ui64Type});
-
- const auto data1 = pb.NewTuple(tupleType, {pb.NewDataLiteral<ui64>(1), pb.NewDataLiteral<ui64>(10)});
- const auto data2 = pb.NewTuple(tupleType, {pb.NewDataLiteral<ui64>(2), pb.NewDataLiteral<ui64>(20)});
- const auto data3 = pb.NewTuple(tupleType, {pb.NewDataLiteral<ui64>(3), pb.NewDataLiteral<ui64>(30)});
-
- const auto list = pb.NewList(tupleType, {data1, data2, data3});
- const auto flow = pb.ToFlow(list);
-
- const auto wideFlow = pb.ExpandMap(flow, [&](TRuntimeNode item) -> TRuntimeNode::TList {
- return {pb.Nth(item, 0U), pb.Nth(item, 1U)};
- });
- const auto wideBlocksFlow = pb.WideToBlocks(wideFlow);
- const auto sumWideFlow = pb.WideMap(wideBlocksFlow, [&](TRuntimeNode::TList items) -> TRuntimeNode::TList {
- return {pb.BlockAdd(items[0], items[1])};
- });
- const auto sumNarrowFlow = pb.NarrowMap(sumWideFlow, [&](TRuntimeNode::TList items) -> TRuntimeNode {
- return items[0];
- });
- const auto pgmReturn = pb.Collect(pb.FromBlocks(sumNarrowFlow));
-
- const auto graph = setup.BuildGraph(pgmReturn);
- const auto iterator = graph->GetValue().GetListIterator();
-
- NUdf::TUnboxedValue item;
- UNIT_ASSERT(iterator.Next(item));
- UNIT_ASSERT_VALUES_EQUAL(item.Get<ui64>(), 11);
-
- UNIT_ASSERT(iterator.Next(item));
- UNIT_ASSERT_VALUES_EQUAL(item.Get<ui64>(), 22);
-
- UNIT_ASSERT(iterator.Next(item));
- UNIT_ASSERT_VALUES_EQUAL(item.Get<ui64>(), 33);
- UNIT_ASSERT(!iterator.Next(item));
- UNIT_ASSERT(!iterator.Next(item));
-}
-
-Y_UNIT_TEST(TestBlockAddWithNullables) {
- TSetup<false> setup;
- TProgramBuilder& pb = *setup.PgmBuilder;
-
- const auto optionalUi64Type = pb.NewDataType(NUdf::TDataType<ui64>::Id, true);
- const auto tupleType = pb.NewTupleType({optionalUi64Type, optionalUi64Type});
- const auto emptyOptionalUi64 = pb.NewEmptyOptional(optionalUi64Type);
-
- const auto data1 = pb.NewTuple(tupleType, {
- pb.NewOptional(pb.NewDataLiteral<ui64>(1)),
- emptyOptionalUi64
- });
- const auto data2 = pb.NewTuple(tupleType, {
- emptyOptionalUi64,
- pb.NewOptional(pb.NewDataLiteral<ui64>(20))
- });
- const auto data3 = pb.NewTuple(tupleType, {
- emptyOptionalUi64,
- emptyOptionalUi64
- });
- const auto data4 = pb.NewTuple(tupleType, {
- pb.NewOptional(pb.NewDataLiteral<ui64>(10)),
- pb.NewOptional(pb.NewDataLiteral<ui64>(20))
- });
-
- const auto list = pb.NewList(tupleType, {data1, data2, data3, data4});
- const auto flow = pb.ToFlow(list);
-
- const auto wideFlow = pb.ExpandMap(flow, [&](TRuntimeNode item) -> TRuntimeNode::TList {
- return {pb.Nth(item, 0U), pb.Nth(item, 1U)};
- });
- const auto wideBlocksFlow = pb.WideToBlocks(wideFlow);
- const auto sumWideFlow = pb.WideMap(wideBlocksFlow, [&](TRuntimeNode::TList items) -> TRuntimeNode::TList {
- return {pb.BlockAdd(items[0], items[1])};
- });
- const auto sumNarrowFlow = pb.NarrowMap(sumWideFlow, [&](TRuntimeNode::TList items) -> TRuntimeNode {
- return items[0];
- });
- const auto pgmReturn = pb.Collect(pb.FromBlocks(sumNarrowFlow));
-
- const auto graph = setup.BuildGraph(pgmReturn);
- const auto iterator = graph->GetValue().GetListIterator();
-
- NUdf::TUnboxedValue item;
- UNIT_ASSERT(iterator.Next(item));
- UNIT_ASSERT(!item);
-
- UNIT_ASSERT(iterator.Next(item));
- UNIT_ASSERT(!item);
-
- UNIT_ASSERT(iterator.Next(item));
- UNIT_ASSERT(!item);
-
- UNIT_ASSERT(iterator.Next(item));
- UNIT_ASSERT_VALUES_EQUAL(item.Get<ui64>(), 30);
- UNIT_ASSERT(!iterator.Next(item));
-}
-
-Y_UNIT_TEST(TestBlockAddWithNullableSingle) {
- TSetup<false> setup;
- TProgramBuilder& pb = *setup.PgmBuilder;
-
- const auto optionalUi64Type = pb.NewDataType(NUdf::TDataType<ui64>::Id, true);
- const auto emptyOptionalUi64 = pb.NewEmptyOptional(optionalUi64Type);
-
- const auto list = pb.NewList(optionalUi64Type, {
- pb.NewOptional(pb.NewDataLiteral<ui64>(10)),
- pb.NewOptional(pb.NewDataLiteral<ui64>(20)),
- pb.NewOptional(pb.NewDataLiteral<ui64>(30))
- });
- const auto flow = pb.ToFlow(list);
- const auto blocksFlow = pb.ToBlocks(flow);
-
- THolder<IComputationGraph> graph;
- auto map = [&](const TProgramBuilder::TUnaryLambda& func) {
- const auto pgmReturn = pb.Collect(pb.FromBlocks(pb.Map(blocksFlow, func)));
- graph = setup.BuildGraph(pgmReturn);
- return graph->GetValue().GetListIterator();
- };
-
- {
- const auto single = pb.AsSingle(emptyOptionalUi64);
- auto iterator = map([&](TRuntimeNode item) -> TRuntimeNode {
- return {pb.BlockAdd(single, item)};
- });
-
- NUdf::TUnboxedValue item;
- UNIT_ASSERT(iterator.Next(item));
- UNIT_ASSERT(!item);
-
- UNIT_ASSERT(iterator.Next(item));
- UNIT_ASSERT(!item);
-
- UNIT_ASSERT(iterator.Next(item));
- UNIT_ASSERT(!item);
-
- UNIT_ASSERT(!iterator.Next(item));
- }
-
- {
- const auto single = pb.AsSingle(emptyOptionalUi64);
- auto iterator = map([&](TRuntimeNode item) -> TRuntimeNode {
- return {pb.BlockAdd(item, single)};
- });
-
- NUdf::TUnboxedValue item;
- UNIT_ASSERT(iterator.Next(item));
- UNIT_ASSERT(!item);
-
- UNIT_ASSERT(iterator.Next(item));
- UNIT_ASSERT(!item);
-
- UNIT_ASSERT(iterator.Next(item));
- UNIT_ASSERT(!item);
-
- UNIT_ASSERT(!iterator.Next(item));
- }
-
- {
- const auto single = pb.AsSingle(pb.NewDataLiteral<ui64>(100));
- auto iterator = map([&](TRuntimeNode item) -> TRuntimeNode {
- return {pb.BlockAdd(item, single)};
- });
-
- NUdf::TUnboxedValue item;
- UNIT_ASSERT(iterator.Next(item));
- UNIT_ASSERT_VALUES_EQUAL(item.Get<ui64>(), 110);
-
- UNIT_ASSERT(iterator.Next(item));
- UNIT_ASSERT_VALUES_EQUAL(item.Get<ui64>(), 120);
-
- UNIT_ASSERT(iterator.Next(item));
- UNIT_ASSERT_VALUES_EQUAL(item.Get<ui64>(), 130);
-
- UNIT_ASSERT(!iterator.Next(item));
- }
-}
-
-Y_UNIT_TEST(TestBlockAddWithSingle) {
- TSetup<false> setup;
- TProgramBuilder& pb = *setup.PgmBuilder;
-
- const auto ui64Type = pb.NewDataType(NUdf::TDataType<ui64>::Id);
-
- const auto data1 = pb.NewDataLiteral<ui64>(10);
- const auto data2 = pb.NewDataLiteral<ui64>(20);
- const auto data3 = pb.NewDataLiteral<ui64>(30);
- const auto rightSingle = pb.AsSingle(pb.NewDataLiteral<ui64>(100));
- const auto leftSingle = pb.AsSingle(pb.NewDataLiteral<ui64>(1000));
-
- const auto list = pb.NewList(ui64Type, {data1, data2, data3});
- const auto flow = pb.ToFlow(list);
- const auto blocksFlow = pb.ToBlocks(flow);
- const auto sumBlocksFlow = pb.Map(blocksFlow, [&](TRuntimeNode item) -> TRuntimeNode {
- return {pb.BlockAdd(leftSingle, {pb.BlockAdd(item, rightSingle )})};
- });
- const auto pgmReturn = pb.Collect(pb.FromBlocks(sumBlocksFlow));
-
- const auto graph = setup.BuildGraph(pgmReturn);
- const auto iterator = graph->GetValue().GetListIterator();
-
- NUdf::TUnboxedValue item;
- UNIT_ASSERT(iterator.Next(item));
- UNIT_ASSERT_VALUES_EQUAL(item.Get<ui64>(), 1110);
-
- UNIT_ASSERT(iterator.Next(item));
- UNIT_ASSERT_VALUES_EQUAL(item.Get<ui64>(), 1120);
-
- UNIT_ASSERT(iterator.Next(item));
- UNIT_ASSERT_VALUES_EQUAL(item.Get<ui64>(), 1130);
- UNIT_ASSERT(!iterator.Next(item));
- UNIT_ASSERT(!iterator.Next(item));
-}
-
-}
-
-}
-}
+
+namespace NKikimr {
+namespace NMiniKQL {
+Y_UNIT_TEST_SUITE(TMiniKQLBlocksTest) {
+Y_UNIT_TEST(TestEmpty) {
+ TSetup<false> setup;
+ auto& pb = *setup.PgmBuilder;
+
+ const auto type = pb.NewDataType(NUdf::TDataType<ui64>::Id);
+ const auto list = pb.NewEmptyList(type);
+ const auto sourceFlow = pb.ToFlow(list);
+ const auto flowAfterBlocks = pb.FromBlocks(pb.ToBlocks(sourceFlow));
+ const auto pgmReturn = pb.ForwardList(flowAfterBlocks);
+
+ const auto graph = setup.BuildGraph(pgmReturn);
+ const auto iterator = graph->GetValue().GetListIterator();
+ NUdf::TUnboxedValue item;
+ UNIT_ASSERT(!iterator.Next(item));
+}
+
+Y_UNIT_TEST(TestSimple) {
+ static const size_t dataCount = 1000;
+ TSetup<false> setup;
+ auto& pb = *setup.PgmBuilder;
+
+ auto data = TVector<TRuntimeNode>(Reserve(dataCount));
+ for (size_t i = 0; i < dataCount; ++i) {
+ data.push_back(pb.NewDataLiteral<ui64>(i));
+ }
+ const auto type = pb.NewDataType(NUdf::TDataType<ui64>::Id);
+ const auto list = pb.NewList(type, data);
+ const auto sourceFlow = pb.ToFlow(list);
+ const auto flowAfterBlocks = pb.FromBlocks(pb.ToBlocks(sourceFlow));
+ const auto pgmReturn = pb.ForwardList(flowAfterBlocks);
+
+ const auto graph = setup.BuildGraph(pgmReturn);
+ const auto iterator = graph->GetValue().GetListIterator();
+
+ for (size_t i = 0; i < dataCount; ++i) {
+ NUdf::TUnboxedValue item;
+ UNIT_ASSERT(iterator.Next(item));
+ UNIT_ASSERT_VALUES_EQUAL(item.Get<ui64>(), i);
+ }
+ NUdf::TUnboxedValue item;
+ UNIT_ASSERT(!iterator.Next(item));
+}
+
+Y_UNIT_TEST(TestWideToBlocks) {
+ TSetup<false> setup;
+ auto& pb = *setup.PgmBuilder;
+
+ const auto ui64Type = pb.NewDataType(NUdf::TDataType<ui64>::Id);
+ const auto tupleType = pb.NewTupleType({ui64Type, ui64Type});
+
+ const auto data1 = pb.NewTuple(tupleType, {pb.NewDataLiteral<ui64>(1), pb.NewDataLiteral<ui64>(10)});
+ const auto data2 = pb.NewTuple(tupleType, {pb.NewDataLiteral<ui64>(2), pb.NewDataLiteral<ui64>(20)});
+ const auto data3 = pb.NewTuple(tupleType, {pb.NewDataLiteral<ui64>(3), pb.NewDataLiteral<ui64>(30)});
+
+ const auto list = pb.NewList(tupleType, {data1, data2, data3});
+ const auto flow = pb.ToFlow(list);
+
+ const auto wideFlow = pb.ExpandMap(flow, [&](TRuntimeNode item) -> TRuntimeNode::TList {
+ return {pb.Nth(item, 0U), pb.Nth(item, 1U)};
+ });
+ const auto wideBlocksFlow = pb.WideToBlocks(wideFlow);
+ const auto narrowBlocksFlow = pb.NarrowMap(wideBlocksFlow, [&](TRuntimeNode::TList items) -> TRuntimeNode {
+ return items[1];
+ });
+ const auto narrowFlow = pb.FromBlocks(narrowBlocksFlow);
+ const auto pgmReturn = pb.ForwardList(narrowFlow);
+
+ const auto graph = setup.BuildGraph(pgmReturn);
+ const auto iterator = graph->GetValue().GetListIterator();
+
+ NUdf::TUnboxedValue item;
+ UNIT_ASSERT(iterator.Next(item));
+ UNIT_ASSERT_VALUES_EQUAL(item.Get<ui64>(), 10);
+
+ UNIT_ASSERT(iterator.Next(item));
+ UNIT_ASSERT_VALUES_EQUAL(item.Get<ui64>(), 20);
+
+ UNIT_ASSERT(iterator.Next(item));
+ UNIT_ASSERT_VALUES_EQUAL(item.Get<ui64>(), 30);
+}
+
+Y_UNIT_TEST(TestSingle) {
+ const ui64 testValue = 42;
+
+ TSetup<false> setup;
+ auto& pb = *setup.PgmBuilder;
+
+ auto dataLiteral = pb.NewDataLiteral<ui64>(testValue);
+ const auto dataAfterBlocks = pb.AsSingle(dataLiteral);
+
+ const auto graph = setup.BuildGraph(dataAfterBlocks);
+ const auto value = graph->GetValue();
+ UNIT_ASSERT(value.HasValue() && value.IsBoxed());
+ UNIT_ASSERT_VALUES_EQUAL(TArrowBlock::From(value).GetDatum().scalar_as<arrow::UInt64Scalar>().value, testValue);
+}
+
+Y_UNIT_TEST(TestBlockAdd) {
+ TSetup<false> setup;
+ TProgramBuilder& pb = *setup.PgmBuilder;
+
+ const auto ui64Type = pb.NewDataType(NUdf::TDataType<ui64>::Id);
+ const auto tupleType = pb.NewTupleType({ui64Type, ui64Type});
+
+ const auto data1 = pb.NewTuple(tupleType, {pb.NewDataLiteral<ui64>(1), pb.NewDataLiteral<ui64>(10)});
+ const auto data2 = pb.NewTuple(tupleType, {pb.NewDataLiteral<ui64>(2), pb.NewDataLiteral<ui64>(20)});
+ const auto data3 = pb.NewTuple(tupleType, {pb.NewDataLiteral<ui64>(3), pb.NewDataLiteral<ui64>(30)});
+
+ const auto list = pb.NewList(tupleType, {data1, data2, data3});
+ const auto flow = pb.ToFlow(list);
+
+ const auto wideFlow = pb.ExpandMap(flow, [&](TRuntimeNode item) -> TRuntimeNode::TList {
+ return {pb.Nth(item, 0U), pb.Nth(item, 1U)};
+ });
+ const auto wideBlocksFlow = pb.WideToBlocks(wideFlow);
+ const auto sumWideFlow = pb.WideMap(wideBlocksFlow, [&](TRuntimeNode::TList items) -> TRuntimeNode::TList {
+ return {pb.BlockAdd(items[0], items[1])};
+ });
+ const auto sumNarrowFlow = pb.NarrowMap(sumWideFlow, [&](TRuntimeNode::TList items) -> TRuntimeNode {
+ return items[0];
+ });
+ const auto pgmReturn = pb.Collect(pb.FromBlocks(sumNarrowFlow));
+
+ const auto graph = setup.BuildGraph(pgmReturn);
+ const auto iterator = graph->GetValue().GetListIterator();
+
+ NUdf::TUnboxedValue item;
+ UNIT_ASSERT(iterator.Next(item));
+ UNIT_ASSERT_VALUES_EQUAL(item.Get<ui64>(), 11);
+
+ UNIT_ASSERT(iterator.Next(item));
+ UNIT_ASSERT_VALUES_EQUAL(item.Get<ui64>(), 22);
+
+ UNIT_ASSERT(iterator.Next(item));
+ UNIT_ASSERT_VALUES_EQUAL(item.Get<ui64>(), 33);
+ UNIT_ASSERT(!iterator.Next(item));
+ UNIT_ASSERT(!iterator.Next(item));
+}
+
+Y_UNIT_TEST(TestBlockAddWithNullables) {
+ TSetup<false> setup;
+ TProgramBuilder& pb = *setup.PgmBuilder;
+
+ const auto optionalUi64Type = pb.NewDataType(NUdf::TDataType<ui64>::Id, true);
+ const auto tupleType = pb.NewTupleType({optionalUi64Type, optionalUi64Type});
+ const auto emptyOptionalUi64 = pb.NewEmptyOptional(optionalUi64Type);
+
+ const auto data1 = pb.NewTuple(tupleType, {
+ pb.NewOptional(pb.NewDataLiteral<ui64>(1)),
+ emptyOptionalUi64
+ });
+ const auto data2 = pb.NewTuple(tupleType, {
+ emptyOptionalUi64,
+ pb.NewOptional(pb.NewDataLiteral<ui64>(20))
+ });
+ const auto data3 = pb.NewTuple(tupleType, {
+ emptyOptionalUi64,
+ emptyOptionalUi64
+ });
+ const auto data4 = pb.NewTuple(tupleType, {
+ pb.NewOptional(pb.NewDataLiteral<ui64>(10)),
+ pb.NewOptional(pb.NewDataLiteral<ui64>(20))
+ });
+
+ const auto list = pb.NewList(tupleType, {data1, data2, data3, data4});
+ const auto flow = pb.ToFlow(list);
+
+ const auto wideFlow = pb.ExpandMap(flow, [&](TRuntimeNode item) -> TRuntimeNode::TList {
+ return {pb.Nth(item, 0U), pb.Nth(item, 1U)};
+ });
+ const auto wideBlocksFlow = pb.WideToBlocks(wideFlow);
+ const auto sumWideFlow = pb.WideMap(wideBlocksFlow, [&](TRuntimeNode::TList items) -> TRuntimeNode::TList {
+ return {pb.BlockAdd(items[0], items[1])};
+ });
+ const auto sumNarrowFlow = pb.NarrowMap(sumWideFlow, [&](TRuntimeNode::TList items) -> TRuntimeNode {
+ return items[0];
+ });
+ const auto pgmReturn = pb.Collect(pb.FromBlocks(sumNarrowFlow));
+
+ const auto graph = setup.BuildGraph(pgmReturn);
+ const auto iterator = graph->GetValue().GetListIterator();
+
+ NUdf::TUnboxedValue item;
+ UNIT_ASSERT(iterator.Next(item));
+ UNIT_ASSERT(!item);
+
+ UNIT_ASSERT(iterator.Next(item));
+ UNIT_ASSERT(!item);
+
+ UNIT_ASSERT(iterator.Next(item));
+ UNIT_ASSERT(!item);
+
+ UNIT_ASSERT(iterator.Next(item));
+ UNIT_ASSERT_VALUES_EQUAL(item.Get<ui64>(), 30);
+ UNIT_ASSERT(!iterator.Next(item));
+}
+
+Y_UNIT_TEST(TestBlockAddWithNullableSingle) {
+ TSetup<false> setup;
+ TProgramBuilder& pb = *setup.PgmBuilder;
+
+ const auto optionalUi64Type = pb.NewDataType(NUdf::TDataType<ui64>::Id, true);
+ const auto emptyOptionalUi64 = pb.NewEmptyOptional(optionalUi64Type);
+
+ const auto list = pb.NewList(optionalUi64Type, {
+ pb.NewOptional(pb.NewDataLiteral<ui64>(10)),
+ pb.NewOptional(pb.NewDataLiteral<ui64>(20)),
+ pb.NewOptional(pb.NewDataLiteral<ui64>(30))
+ });
+ const auto flow = pb.ToFlow(list);
+ const auto blocksFlow = pb.ToBlocks(flow);
+
+ THolder<IComputationGraph> graph;
+ auto map = [&](const TProgramBuilder::TUnaryLambda& func) {
+ const auto pgmReturn = pb.Collect(pb.FromBlocks(pb.Map(blocksFlow, func)));
+ graph = setup.BuildGraph(pgmReturn);
+ return graph->GetValue().GetListIterator();
+ };
+
+ {
+ const auto single = pb.AsSingle(emptyOptionalUi64);
+ auto iterator = map([&](TRuntimeNode item) -> TRuntimeNode {
+ return {pb.BlockAdd(single, item)};
+ });
+
+ NUdf::TUnboxedValue item;
+ UNIT_ASSERT(iterator.Next(item));
+ UNIT_ASSERT(!item);
+
+ UNIT_ASSERT(iterator.Next(item));
+ UNIT_ASSERT(!item);
+
+ UNIT_ASSERT(iterator.Next(item));
+ UNIT_ASSERT(!item);
+
+ UNIT_ASSERT(!iterator.Next(item));
+ }
+
+ {
+ const auto single = pb.AsSingle(emptyOptionalUi64);
+ auto iterator = map([&](TRuntimeNode item) -> TRuntimeNode {
+ return {pb.BlockAdd(item, single)};
+ });
+
+ NUdf::TUnboxedValue item;
+ UNIT_ASSERT(iterator.Next(item));
+ UNIT_ASSERT(!item);
+
+ UNIT_ASSERT(iterator.Next(item));
+ UNIT_ASSERT(!item);
+
+ UNIT_ASSERT(iterator.Next(item));
+ UNIT_ASSERT(!item);
+
+ UNIT_ASSERT(!iterator.Next(item));
+ }
+
+ {
+ const auto single = pb.AsSingle(pb.NewDataLiteral<ui64>(100));
+ auto iterator = map([&](TRuntimeNode item) -> TRuntimeNode {
+ return {pb.BlockAdd(item, single)};
+ });
+
+ NUdf::TUnboxedValue item;
+ UNIT_ASSERT(iterator.Next(item));
+ UNIT_ASSERT_VALUES_EQUAL(item.Get<ui64>(), 110);
+
+ UNIT_ASSERT(iterator.Next(item));
+ UNIT_ASSERT_VALUES_EQUAL(item.Get<ui64>(), 120);
+
+ UNIT_ASSERT(iterator.Next(item));
+ UNIT_ASSERT_VALUES_EQUAL(item.Get<ui64>(), 130);
+
+ UNIT_ASSERT(!iterator.Next(item));
+ }
+}
+
+Y_UNIT_TEST(TestBlockAddWithSingle) {
+ TSetup<false> setup;
+ TProgramBuilder& pb = *setup.PgmBuilder;
+
+ const auto ui64Type = pb.NewDataType(NUdf::TDataType<ui64>::Id);
+
+ const auto data1 = pb.NewDataLiteral<ui64>(10);
+ const auto data2 = pb.NewDataLiteral<ui64>(20);
+ const auto data3 = pb.NewDataLiteral<ui64>(30);
+ const auto rightSingle = pb.AsSingle(pb.NewDataLiteral<ui64>(100));
+ const auto leftSingle = pb.AsSingle(pb.NewDataLiteral<ui64>(1000));
+
+ const auto list = pb.NewList(ui64Type, {data1, data2, data3});
+ const auto flow = pb.ToFlow(list);
+ const auto blocksFlow = pb.ToBlocks(flow);
+ const auto sumBlocksFlow = pb.Map(blocksFlow, [&](TRuntimeNode item) -> TRuntimeNode {
+ return {pb.BlockAdd(leftSingle, {pb.BlockAdd(item, rightSingle )})};
+ });
+ const auto pgmReturn = pb.Collect(pb.FromBlocks(sumBlocksFlow));
+
+ const auto graph = setup.BuildGraph(pgmReturn);
+ const auto iterator = graph->GetValue().GetListIterator();
+
+ NUdf::TUnboxedValue item;
+ UNIT_ASSERT(iterator.Next(item));
+ UNIT_ASSERT_VALUES_EQUAL(item.Get<ui64>(), 1110);
+
+ UNIT_ASSERT(iterator.Next(item));
+ UNIT_ASSERT_VALUES_EQUAL(item.Get<ui64>(), 1120);
+
+ UNIT_ASSERT(iterator.Next(item));
+ UNIT_ASSERT_VALUES_EQUAL(item.Get<ui64>(), 1130);
+ UNIT_ASSERT(!iterator.Next(item));
+ UNIT_ASSERT(!iterator.Next(item));
+}
+
+}
+
+}
+}
diff --git a/ydb/library/yql/minikql/comp_nodes/ut/mkql_computation_node_ut.cpp b/ydb/library/yql/minikql/comp_nodes/ut/mkql_computation_node_ut.cpp
index 4c305b3560..19f82aae4e 100644
--- a/ydb/library/yql/minikql/comp_nodes/ut/mkql_computation_node_ut.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/ut/mkql_computation_node_ut.cpp
@@ -16,7 +16,7 @@ namespace NMiniKQL {
namespace {
-
+
ui64 g_Yield = std::numeric_limits<ui64>::max();
ui64 g_TestStreamData[] = {0, 0, 1, 0, 0, 0, 1, 2, 3};
ui64 g_TestYieldStreamData[] = {0, 1, 2, g_Yield, 0, g_Yield, 1, 2, 0, 1, 2, 0, g_Yield, 1, 2};
diff --git a/ydb/library/yql/minikql/comp_nodes/ut/ya.make b/ydb/library/yql/minikql/comp_nodes/ut/ya.make
index 70dde719ff..dffda1318f 100644
--- a/ydb/library/yql/minikql/comp_nodes/ut/ya.make
+++ b/ydb/library/yql/minikql/comp_nodes/ut/ya.make
@@ -19,7 +19,7 @@ OWNER(
)
SRCS(
- mkql_blocks_ut.cpp
+ mkql_blocks_ut.cpp
mkql_combine_ut.cpp
mkql_condense_ut.cpp
mkql_decimal_ut.cpp
diff --git a/ydb/library/yql/minikql/comp_nodes/ya.make b/ydb/library/yql/minikql/comp_nodes/ya.make
index 99a5eff186..55838c4012 100644
--- a/ydb/library/yql/minikql/comp_nodes/ya.make
+++ b/ydb/library/yql/minikql/comp_nodes/ya.make
@@ -16,8 +16,8 @@ SRCS(
mkql_append.h
mkql_apply.cpp
mkql_apply.h
- mkql_block_add.cpp
- mkql_blocks.cpp
+ mkql_block_add.cpp
+ mkql_blocks.cpp
mkql_callable.cpp
mkql_callable.h
mkql_chain_map.cpp
@@ -210,7 +210,7 @@ SRCS(
)
PEERDIR(
- contrib/libs/apache/arrow
+ contrib/libs/apache/arrow
ydb/library/binary_json
ydb/library/yql/minikql
ydb/library/yql/minikql/invoke_builtins
diff --git a/ydb/library/yql/minikql/computation/mkql_computation_node.cpp b/ydb/library/yql/minikql/computation/mkql_computation_node.cpp
index aafc04ae1f..aa2e27f028 100644
--- a/ydb/library/yql/minikql/computation/mkql_computation_node.cpp
+++ b/ydb/library/yql/minikql/computation/mkql_computation_node.cpp
@@ -26,15 +26,15 @@
namespace NKikimr {
namespace NMiniKQL {
-TComputationContext::TComputationContext(const THolderFactory& holderFactory,
- const NUdf::IValueBuilder* builder,
- TComputationOptsFull& opts,
- const TComputationMutables& mutables,
- arrow::MemoryPool& arrowMemoryPool)
+TComputationContext::TComputationContext(const THolderFactory& holderFactory,
+ const NUdf::IValueBuilder* builder,
+ TComputationOptsFull& opts,
+ const TComputationMutables& mutables,
+ arrow::MemoryPool& arrowMemoryPool)
: TComputationContextLLVM{holderFactory, opts.Stats, std::make_unique<NUdf::TUnboxedValue[]>(mutables.CurValueIndex), builder}
, RandomProvider(opts.RandomProvider)
, TimeProvider(opts.TimeProvider)
- , ArrowMemoryPool(arrowMemoryPool)
+ , ArrowMemoryPool(arrowMemoryPool)
{
std::fill_n(MutableValues.get(), mutables.CurValueIndex, NUdf::TUnboxedValue(NUdf::TUnboxedValuePod::Invalid()));
}
diff --git a/ydb/library/yql/minikql/computation/mkql_computation_node.h b/ydb/library/yql/minikql/computation/mkql_computation_node.h
index 09c1ecf219..a5d11b180b 100644
--- a/ydb/library/yql/minikql/computation/mkql_computation_node.h
+++ b/ydb/library/yql/minikql/computation/mkql_computation_node.h
@@ -82,13 +82,13 @@ struct TComputationContext : public TComputationContextLLVM {
IRandomProvider& RandomProvider;
ITimeProvider& TimeProvider;
bool ExecuteLLVM = true;
- arrow::MemoryPool& ArrowMemoryPool;
+ arrow::MemoryPool& ArrowMemoryPool;
- TComputationContext(const THolderFactory& holderFactory,
- const NUdf::IValueBuilder* builder,
- TComputationOptsFull& opts,
- const TComputationMutables& mutables,
- arrow::MemoryPool& arrowMemoryPool);
+ TComputationContext(const THolderFactory& holderFactory,
+ const NUdf::IValueBuilder* builder,
+ TComputationOptsFull& opts,
+ const TComputationMutables& mutables,
+ arrow::MemoryPool& arrowMemoryPool);
~TComputationContext();
// Returns true if current usage delta exceeds the memory limit
diff --git a/ydb/library/yql/minikql/computation/mkql_computation_node_graph.cpp b/ydb/library/yql/minikql/computation/mkql_computation_node_graph.cpp
index e2a401b366..f2b188ae47 100644
--- a/ydb/library/yql/minikql/computation/mkql_computation_node_graph.cpp
+++ b/ydb/library/yql/minikql/computation/mkql_computation_node_graph.cpp
@@ -286,10 +286,10 @@ private:
VisitType<TFlowType>(node);
}
- void Visit(TBlockType& node) override {
- VisitType<TBlockType>(node);
- }
-
+ void Visit(TBlockType& node) override {
+ VisitType<TBlockType>(node);
+ }
+
void Visit(TTaggedType& node) override {
VisitType<TTaggedType>(node);
}
@@ -535,7 +535,7 @@ public:
HolderFactory = MakeHolder<THolderFactory>(CompOpts.AllocState, *MemInfo, patternNodes->HolderFactory->GetFunctionRegistry());
ValueBuilder = MakeHolder<TDefaultValueBuilder>(*HolderFactory.Get(), compOpts.ValidatePolicy);
ValueBuilder->SetSecureParamsProvider(CompOpts.SecureParamsProvider);
- ArrowMemoryPool = MakeArrowMemoryPool(CompOpts.AllocState);
+ ArrowMemoryPool = MakeArrowMemoryPool(CompOpts.AllocState);
}
~TComputationGraph() {
@@ -553,11 +553,11 @@ public:
void Prepare() override {
if (!IsPrepared) {
- Ctx.Reset(new TComputationContext(*HolderFactory,
- ValueBuilder.Get(),
- CompOpts,
- PatternNodes->GetMutables(),
- *ArrowMemoryPool));
+ Ctx.Reset(new TComputationContext(*HolderFactory,
+ ValueBuilder.Get(),
+ CompOpts,
+ PatternNodes->GetMutables(),
+ *ArrowMemoryPool));
ValueBuilder->SetCalleePositionHolder(Ctx->CalleePosition);
for (auto& node : PatternNodes->GetNodes()) {
node->InitNode(*Ctx);
@@ -650,7 +650,7 @@ private:
const TIntrusivePtr<TMemoryUsageInfo> MemInfo;
THolder<THolderFactory> HolderFactory;
THolder<TDefaultValueBuilder> ValueBuilder;
- std::unique_ptr<arrow::MemoryPool> ArrowMemoryPool;
+ std::unique_ptr<arrow::MemoryPool> ArrowMemoryPool;
THolder<TComputationContext> Ctx;
TComputationOptsFull CompOpts;
bool IsPrepared = false;
diff --git a/ydb/library/yql/minikql/computation/mkql_computation_node_holders.cpp b/ydb/library/yql/minikql/computation/mkql_computation_node_holders.cpp
index 9e89235b92..67a7cfb302 100644
--- a/ydb/library/yql/minikql/computation/mkql_computation_node_holders.cpp
+++ b/ydb/library/yql/minikql/computation/mkql_computation_node_holders.cpp
@@ -3042,10 +3042,10 @@ NUdf::TSingleBlockPtr THolderFactory::CreateSingleBlock(const NUdf::TUnboxedValu
return AllocateOn<TSingleBlockImpl>(CurrentAllocState, &MemInfo, value);
}
-NUdf::TUnboxedValuePod THolderFactory::CreateArrowBlock(arrow::Datum&& datum) const {
- return Create<TArrowBlock>(std::move(datum));
-}
-
+NUdf::TUnboxedValuePod THolderFactory::CreateArrowBlock(arrow::Datum&& datum) const {
+ return Create<TArrowBlock>(std::move(datum));
+}
+
NUdf::TUnboxedValuePod THolderFactory::VectorAsArray(TUnboxedValueVector& values) const {
if (values.empty())
return GetEmptyContainer();
diff --git a/ydb/library/yql/minikql/computation/mkql_computation_node_holders.h b/ydb/library/yql/minikql/computation/mkql_computation_node_holders.h
index 2a0c80759a..4486d6f43a 100644
--- a/ydb/library/yql/minikql/computation/mkql_computation_node_holders.h
+++ b/ydb/library/yql/minikql/computation/mkql_computation_node_holders.h
@@ -9,8 +9,8 @@
#include <ydb/library/yql/minikql/compact_hash.h>
#include <ydb/library/yql/minikql/mkql_type_ops.h>
-#include <contrib/libs/apache/arrow/cpp/src/arrow/datum.h>
-
+#include <contrib/libs/apache/arrow/cpp/src/arrow/datum.h>
+
#include <util/generic/maybe.h>
#include <util/memory/pool.h>
@@ -321,34 +321,34 @@ private:
TType* const Type;
};
-class TArrowBlock: public TComputationValue<TArrowBlock> {
-public:
- explicit TArrowBlock(TMemoryUsageInfo* memInfo, arrow::Datum&& datum)
- : TComputationValue(memInfo)
- , Datum_(std::move(datum))
- {
- }
-
- inline static TArrowBlock& From(const NUdf::TUnboxedValue& value) {
- return *static_cast<TArrowBlock*>(value.AsBoxed().Get());
- }
-
- inline arrow::Datum& GetDatum() {
- return Datum_;
- }
-
- NUdf::TStringRef GetResourceTag() const override {
- return NUdf::TStringRef::Of("ArrowBlock");
- }
-
- void* GetResource() override {
- return &Datum_;
- }
-
-private:
- arrow::Datum Datum_;
-};
-
+class TArrowBlock: public TComputationValue<TArrowBlock> {
+public:
+ explicit TArrowBlock(TMemoryUsageInfo* memInfo, arrow::Datum&& datum)
+ : TComputationValue(memInfo)
+ , Datum_(std::move(datum))
+ {
+ }
+
+ inline static TArrowBlock& From(const NUdf::TUnboxedValue& value) {
+ return *static_cast<TArrowBlock*>(value.AsBoxed().Get());
+ }
+
+ inline arrow::Datum& GetDatum() {
+ return Datum_;
+ }
+
+ NUdf::TStringRef GetResourceTag() const override {
+ return NUdf::TStringRef::Of("ArrowBlock");
+ }
+
+ void* GetResource() override {
+ return &Datum_;
+ }
+
+private:
+ arrow::Datum Datum_;
+};
+
//////////////////////////////////////////////////////////////////////////////
// THolderFactory
//////////////////////////////////////////////////////////////////////////////
@@ -375,8 +375,8 @@ public:
NUdf::TFlatArrayBlockPtr CreateFlatArrayBlock(ui32 count) const;
NUdf::TSingleBlockPtr CreateSingleBlock(const NUdf::TUnboxedValue& value) const;
- NUdf::TUnboxedValuePod CreateArrowBlock(arrow::Datum&& datum) const;
-
+ NUdf::TUnboxedValuePod CreateArrowBlock(arrow::Datum&& datum) const;
+
NUdf::TUnboxedValuePod VectorAsArray(TUnboxedValueVector& values) const;
template <class TForwardIterator>
diff --git a/ydb/library/yql/minikql/computation/ya.make b/ydb/library/yql/minikql/computation/ya.make
index 5a8af5675d..ae419b3b23 100644
--- a/ydb/library/yql/minikql/computation/ya.make
+++ b/ydb/library/yql/minikql/computation/ya.make
@@ -56,7 +56,7 @@ LLVM_BC(
)
PEERDIR(
- contrib/libs/apache/arrow
+ contrib/libs/apache/arrow
library/cpp/enumbitset
library/cpp/packedtypes
library/cpp/random_provider
diff --git a/ydb/library/yql/minikql/mkql_node.cpp b/ydb/library/yql/minikql/mkql_node.cpp
index 1886244760..4c6f3e45ad 100644
--- a/ydb/library/yql/minikql/mkql_node.cpp
+++ b/ydb/library/yql/minikql/mkql_node.cpp
@@ -209,7 +209,7 @@ TStringBuf TType::GetKindAsStr() const {
xx(EmptyList, TEmptyListType) \
xx(EmptyDict, TEmptyDictType) \
xx(Tagged, TTaggedType) \
- xx(Block, TBlockType) \
+ xx(Block, TBlockType) \
void TType::Accept(INodeVisitor& visitor) {
switch (Kind) {
@@ -2138,48 +2138,48 @@ void TVariantLiteral::DoFreeze(const TTypeEnvironment& env) {
Item.Freeze();
}
-TBlockType::TBlockType(TType* itemType, EShape shape, const TTypeEnvironment& env)
- : TType(EKind::Block, env.GetTypeOfType())
- , ItemType(itemType)
- , Shape(shape)
-{
-}
-
-TBlockType* TBlockType::Create(TType* itemType, EShape shape, const TTypeEnvironment& env) {
- return ::new(env.Allocate<TBlockType>()) TBlockType(itemType, shape, env);
-}
-
-bool TBlockType::IsSameType(const TBlockType& typeToCompare) const {
- return GetItemType()->IsSameType(*typeToCompare.GetItemType()) && Shape == typeToCompare.Shape;
-}
-
-bool TBlockType::IsConvertableTo(const TBlockType& typeToCompare, bool ignoreTagged) const {
- return Shape == typeToCompare.Shape &&
- GetItemType()->IsConvertableTo(*typeToCompare.GetItemType(), ignoreTagged);
-}
-
-void TBlockType::DoUpdateLinks(const THashMap<TNode*, TNode*>& links) {
- const auto itemTypeIt = links.find(ItemType);
- if (itemTypeIt != links.end()) {
- auto* newNode = itemTypeIt->second;
- Y_VERIFY_DEBUG(ItemType->Equals(*newNode));
- ItemType = static_cast<TType*>(newNode);
- }
-}
-
-TNode* TBlockType::DoCloneOnCallableWrite(const TTypeEnvironment& env) const {
- auto newTypeNode = (TNode*)ItemType->GetCookie();
- if (!newTypeNode) {
- return const_cast<TBlockType*>(this);
- }
-
- return ::new(env.Allocate<TBlockType>()) TBlockType(static_cast<TType*>(newTypeNode), Shape, env);
-}
-
-void TBlockType::DoFreeze(const TTypeEnvironment& env) {
- Y_UNUSED(env);
-}
-
+TBlockType::TBlockType(TType* itemType, EShape shape, const TTypeEnvironment& env)
+ : TType(EKind::Block, env.GetTypeOfType())
+ , ItemType(itemType)
+ , Shape(shape)
+{
+}
+
+TBlockType* TBlockType::Create(TType* itemType, EShape shape, const TTypeEnvironment& env) {
+ return ::new(env.Allocate<TBlockType>()) TBlockType(itemType, shape, env);
+}
+
+bool TBlockType::IsSameType(const TBlockType& typeToCompare) const {
+ return GetItemType()->IsSameType(*typeToCompare.GetItemType()) && Shape == typeToCompare.Shape;
+}
+
+bool TBlockType::IsConvertableTo(const TBlockType& typeToCompare, bool ignoreTagged) const {
+ return Shape == typeToCompare.Shape &&
+ GetItemType()->IsConvertableTo(*typeToCompare.GetItemType(), ignoreTagged);
+}
+
+void TBlockType::DoUpdateLinks(const THashMap<TNode*, TNode*>& links) {
+ const auto itemTypeIt = links.find(ItemType);
+ if (itemTypeIt != links.end()) {
+ auto* newNode = itemTypeIt->second;
+ Y_VERIFY_DEBUG(ItemType->Equals(*newNode));
+ ItemType = static_cast<TType*>(newNode);
+ }
+}
+
+TNode* TBlockType::DoCloneOnCallableWrite(const TTypeEnvironment& env) const {
+ auto newTypeNode = (TNode*)ItemType->GetCookie();
+ if (!newTypeNode) {
+ return const_cast<TBlockType*>(this);
+ }
+
+ return ::new(env.Allocate<TBlockType>()) TBlockType(static_cast<TType*>(newTypeNode), Shape, env);
+}
+
+void TBlockType::DoFreeze(const TTypeEnvironment& env) {
+ Y_UNUSED(env);
+}
+
bool IsNumericType(NUdf::TDataTypeId typeId) {
auto slot = NUdf::FindDataSlot(typeId);
return slot && NUdf::GetDataTypeInfo(*slot).Features & NUdf::NumericType;
@@ -2236,7 +2236,7 @@ EValueRepresentation GetValueRepresentation(const TType* type) {
case TType::EKind::Dict:
case TType::EKind::List:
case TType::EKind::Resource:
- case TType::EKind::Block:
+ case TType::EKind::Block:
case TType::EKind::Callable:
case TType::EKind::EmptyList:
case TType::EKind::EmptyDict:
diff --git a/ydb/library/yql/minikql/mkql_node.h b/ydb/library/yql/minikql/mkql_node.h
index 3ae2e8bdaa..ee74455ea6 100644
--- a/ydb/library/yql/minikql/mkql_node.h
+++ b/ydb/library/yql/minikql/mkql_node.h
@@ -147,8 +147,8 @@ class TTypeEnvironment;
XX(ReservedKind, 15) \
XX(EmptyList, 16 + 2) \
XX(EmptyDict, 32 + 2) \
- XX(Tagged, 48 + 7) \
- XX(Block, 16 + 13)
+ XX(Tagged, 48 + 7) \
+ XX(Block, 16 + 13)
class TType : public TNode {
public:
@@ -1321,44 +1321,44 @@ private:
ui32 Index;
};
-class TBlockType : public TType {
- friend class TType;
-
-public:
- enum class EShape: ui8 {
- Single = 0,
- Many = 1
- };
-
-public:
- static TBlockType* Create(TType* itemType, EShape shape, const TTypeEnvironment& env);
-
- using TType::IsSameType;
- bool IsSameType(const TBlockType& typeToCompare) const;
-
- using TType::IsConvertableTo;
- bool IsConvertableTo(const TBlockType& typeToCompare, bool ignoreTagged = false) const;
-
- inline TType* GetItemType() const noexcept {
- return ItemType;
- }
-
- inline EShape GetShape() const noexcept {
- return Shape;
- }
-
-private:
- TBlockType(TType* itemType, EShape shape, const TTypeEnvironment& env);
-
- void DoUpdateLinks(const THashMap<TNode*, TNode*>& links);
- TNode* DoCloneOnCallableWrite(const TTypeEnvironment& env) const;
- void DoFreeze(const TTypeEnvironment& env);
-
-private:
- TType* ItemType;
- EShape Shape;
-};
-
+class TBlockType : public TType {
+ friend class TType;
+
+public:
+ enum class EShape: ui8 {
+ Single = 0,
+ Many = 1
+ };
+
+public:
+ static TBlockType* Create(TType* itemType, EShape shape, const TTypeEnvironment& env);
+
+ using TType::IsSameType;
+ bool IsSameType(const TBlockType& typeToCompare) const;
+
+ using TType::IsConvertableTo;
+ bool IsConvertableTo(const TBlockType& typeToCompare, bool ignoreTagged = false) const;
+
+ inline TType* GetItemType() const noexcept {
+ return ItemType;
+ }
+
+ inline EShape GetShape() const noexcept {
+ return Shape;
+ }
+
+private:
+ TBlockType(TType* itemType, EShape shape, const TTypeEnvironment& env);
+
+ void DoUpdateLinks(const THashMap<TNode*, TNode*>& links);
+ TNode* DoCloneOnCallableWrite(const TTypeEnvironment& env) const;
+ void DoFreeze(const TTypeEnvironment& env);
+
+private:
+ TType* ItemType;
+ EShape Shape;
+};
+
inline bool TRuntimeNode::operator==(const TRuntimeNode& other) const {
return IsImmediate() == other.IsImmediate() && GetNode()->Equals(*other.GetNode());
}
diff --git a/ydb/library/yql/minikql/mkql_node_cast.cpp b/ydb/library/yql/minikql/mkql_node_cast.cpp
index e5d644e446..0a62a7d615 100644
--- a/ydb/library/yql/minikql/mkql_node_cast.cpp
+++ b/ydb/library/yql/minikql/mkql_node_cast.cpp
@@ -52,7 +52,7 @@ MKQL_AS_TYPE(Variant)
MKQL_AS_TYPE(Stream)
MKQL_AS_TYPE(Flow)
MKQL_AS_TYPE(Tagged)
-MKQL_AS_TYPE(Block)
+MKQL_AS_TYPE(Block)
MKQL_AS_VALUE(Any, Type)
MKQL_AS_VALUE(Callable, Type)
diff --git a/ydb/library/yql/minikql/mkql_node_printer.cpp b/ydb/library/yql/minikql/mkql_node_printer.cpp
index 971de764e1..f7515a3005 100644
--- a/ydb/library/yql/minikql/mkql_node_printer.cpp
+++ b/ydb/library/yql/minikql/mkql_node_printer.cpp
@@ -189,36 +189,36 @@ namespace {
WriteNewline();
}
- void Visit(TBlockType& node) override {
- WriteIndentation();
- Out << "Type (Block) {";
- WriteNewline();
-
- {
- TIndentScope scope(this);
- WriteIndentation();
- Out << "Block item type: {";
- WriteNewline();
-
- {
- TIndentScope scope2(this);
- node.GetItemType()->Accept(*this);
- }
-
- WriteIndentation();
- Out << "}";
- WriteNewline();
-
- WriteIndentation();
- Out << "Block shape: " << (node.GetShape() == TBlockType::EShape::Single ? "Single" : "Many");
- WriteNewline();
- }
-
- WriteIndentation();
- Out << "}";
- WriteNewline();
- }
-
+ void Visit(TBlockType& node) override {
+ WriteIndentation();
+ Out << "Type (Block) {";
+ WriteNewline();
+
+ {
+ TIndentScope scope(this);
+ WriteIndentation();
+ Out << "Block item type: {";
+ WriteNewline();
+
+ {
+ TIndentScope scope2(this);
+ node.GetItemType()->Accept(*this);
+ }
+
+ WriteIndentation();
+ Out << "}";
+ WriteNewline();
+
+ WriteIndentation();
+ Out << "Block shape: " << (node.GetShape() == TBlockType::EShape::Single ? "Single" : "Many");
+ WriteNewline();
+ }
+
+ WriteIndentation();
+ Out << "}";
+ WriteNewline();
+ }
+
void Visit(TOptionalType& node) override {
WriteIndentation();
Out << "Type (Optional) {";
diff --git a/ydb/library/yql/minikql/mkql_node_printer_ut.cpp b/ydb/library/yql/minikql/mkql_node_printer_ut.cpp
index 0e72384007..ec8c5d1afe 100644
--- a/ydb/library/yql/minikql/mkql_node_printer_ut.cpp
+++ b/ydb/library/yql/minikql/mkql_node_printer_ut.cpp
@@ -76,8 +76,8 @@ namespace {
structBuilder.Add("41", TRuntimeNode(env.GetEmptyList(), true));
structBuilder.Add("42", TRuntimeNode(env.GetEmptyDict(), true));
structBuilder.Add("43", TRuntimeNode(TTaggedType::Create(dtype1, "mytag", env), true));
- structBuilder.Add("44", TRuntimeNode(TBlockType::Create(dtype1, TBlockType::EShape::Single, env), true));
- structBuilder.Add("45", TRuntimeNode(TBlockType::Create(dtype2, TBlockType::EShape::Many, env), true));
+ structBuilder.Add("44", TRuntimeNode(TBlockType::Create(dtype1, TBlockType::EShape::Single, env), true));
+ structBuilder.Add("45", TRuntimeNode(TBlockType::Create(dtype2, TBlockType::EShape::Many, env), true));
return structBuilder.Build();
}
}
diff --git a/ydb/library/yql/minikql/mkql_node_serialization.cpp b/ydb/library/yql/minikql/mkql_node_serialization.cpp
index 665e974e33..e314f38cd6 100644
--- a/ydb/library/yql/minikql/mkql_node_serialization.cpp
+++ b/ydb/library/yql/minikql/mkql_node_serialization.cpp
@@ -210,19 +210,19 @@ namespace {
IsProcessed0 = false;
}
- void Visit(TBlockType& node) override {
- if (node.GetCookie() != 0) {
- Owner.WriteReference(node);
- IsProcessed0 = true;
- return;
- }
-
- Owner.Write(TypeMarker | (char)TType::EKind::Block);
- auto itemType = node.GetItemType();
- Owner.AddChildNode(*itemType);
- IsProcessed0 = false;
- }
-
+ void Visit(TBlockType& node) override {
+ if (node.GetCookie() != 0) {
+ Owner.WriteReference(node);
+ IsProcessed0 = true;
+ return;
+ }
+
+ Owner.Write(TypeMarker | (char)TType::EKind::Block);
+ auto itemType = node.GetItemType();
+ Owner.AddChildNode(*itemType);
+ IsProcessed0 = false;
+ }
+
void Visit(TTaggedType& node) override {
if (node.GetCookie() != 0) {
Owner.WriteReference(node);
@@ -596,11 +596,11 @@ namespace {
Owner.RegisterReference(node);
}
- void Visit(TBlockType& node) override {
- Owner.Write(static_cast<ui8>(node.GetShape()));
- Owner.RegisterReference(node);
- }
-
+ void Visit(TBlockType& node) override {
+ Owner.Write(static_cast<ui8>(node.GetShape()));
+ Owner.RegisterReference(node);
+ }
+
void Visit(TTaggedType& node) override {
auto tag = node.GetTagStr();
Owner.WriteName(tag);
@@ -1196,7 +1196,7 @@ namespace {
case TType::EKind::Variant:
return ReadVariantType();
case TType::EKind::Flow:
- return ReadFlowOrBlockType(code);
+ return ReadFlowOrBlockType(code);
case TType::EKind::Null:
return ReadNullType();
default:
@@ -1313,15 +1313,15 @@ namespace {
return node;
}
- TNode* ReadFlowOrBlockType(char code) {
- switch ((TType::EKind)(code & TypeMask)) {
- case TType::EKind::Flow: return ReadFlowType();
- case TType::EKind::Block: return ReadBlockType();
- default:
- ThrowCorrupted();
- }
- }
-
+ TNode* ReadFlowOrBlockType(char code) {
+ switch ((TType::EKind)(code & TypeMask)) {
+ case TType::EKind::Flow: return ReadFlowType();
+ case TType::EKind::Block: return ReadBlockType();
+ default:
+ ThrowCorrupted();
+ }
+ }
+
TNode* ReadFlowType() {
auto itemTypeNode = PopNode();
if (itemTypeNode->GetType()->GetKind() != TType::EKind::Type)
@@ -1333,26 +1333,26 @@ namespace {
return node;
}
- TNode* ReadBlockType() {
- auto itemTypeNode = PopNode();
- if (itemTypeNode->GetType()->GetKind() != TType::EKind::Type) {
- ThrowCorrupted();
- }
-
- const auto shapeChar = Read();
- if (shapeChar != static_cast<char>(TBlockType::EShape::Single) &&
- shapeChar != static_cast<char>(TBlockType::EShape::Many))
- {
- ThrowCorrupted();
- }
- const auto shape = static_cast<TBlockType::EShape>(shapeChar);
-
- auto itemType = static_cast<TType*>(itemTypeNode);
- auto node = TBlockType::Create(itemType, shape, Env);
- Nodes.push_back(node);
- return node;
- }
-
+ TNode* ReadBlockType() {
+ auto itemTypeNode = PopNode();
+ if (itemTypeNode->GetType()->GetKind() != TType::EKind::Type) {
+ ThrowCorrupted();
+ }
+
+ const auto shapeChar = Read();
+ if (shapeChar != static_cast<char>(TBlockType::EShape::Single) &&
+ shapeChar != static_cast<char>(TBlockType::EShape::Many))
+ {
+ ThrowCorrupted();
+ }
+ const auto shape = static_cast<TBlockType::EShape>(shapeChar);
+
+ auto itemType = static_cast<TType*>(itemTypeNode);
+ auto node = TBlockType::Create(itemType, shape, Env);
+ Nodes.push_back(node);
+ return node;
+ }
+
TNode* ReadTaggedType() {
auto baseTypeNode = PopNode();
if (baseTypeNode->GetType()->GetKind() != TType::EKind::Type)
diff --git a/ydb/library/yql/minikql/mkql_node_visitor.cpp b/ydb/library/yql/minikql/mkql_node_visitor.cpp
index 4c6d1ffbb7..5a4ac8e479 100644
--- a/ydb/library/yql/minikql/mkql_node_visitor.cpp
+++ b/ydb/library/yql/minikql/mkql_node_visitor.cpp
@@ -166,11 +166,11 @@ void TThrowingNodeVisitor::Visit(TTaggedType& node) {
ThrowUnexpectedNodeType();
}
-void TThrowingNodeVisitor::Visit(TBlockType& node) {
- Y_UNUSED(node);
- ThrowUnexpectedNodeType();
-}
-
+void TThrowingNodeVisitor::Visit(TBlockType& node) {
+ Y_UNUSED(node);
+ ThrowUnexpectedNodeType();
+}
+
void TThrowingNodeVisitor::ThrowUnexpectedNodeType() {
THROW yexception() << "Unexpected node type";
}
@@ -299,10 +299,10 @@ void TEmptyNodeVisitor::Visit(TTaggedType& node) {
Y_UNUSED(node);
}
-void TEmptyNodeVisitor::Visit(TBlockType& node) {
- Y_UNUSED(node);
-}
-
+void TEmptyNodeVisitor::Visit(TBlockType& node) {
+ Y_UNUSED(node);
+}
+
void TExploringNodeVisitor::Visit(TTypeType& node) {
Y_VERIFY_DEBUG(node.GetType() == &node);
}
@@ -476,11 +476,11 @@ void TExploringNodeVisitor::Visit(TTaggedType& node) {
AddChildNode(&node, *node.GetBaseType());
}
-void TExploringNodeVisitor::Visit(TBlockType& node) {
- AddChildNode(&node, *node.GetType());
- AddChildNode(&node, *node.GetItemType());
-}
-
+void TExploringNodeVisitor::Visit(TBlockType& node) {
+ AddChildNode(&node, *node.GetType());
+ AddChildNode(&node, *node.GetItemType());
+}
+
void TExploringNodeVisitor::AddChildNode(TNode* parent, TNode& child) {
Stack->push_back(&child);
diff --git a/ydb/library/yql/minikql/mkql_node_visitor.h b/ydb/library/yql/minikql/mkql_node_visitor.h
index 1b7f4b8455..f5bf76d520 100644
--- a/ydb/library/yql/minikql/mkql_node_visitor.h
+++ b/ydb/library/yql/minikql/mkql_node_visitor.h
@@ -45,7 +45,7 @@ public:
virtual void Visit(TStreamType& node) = 0;
virtual void Visit(TFlowType& node) = 0;
virtual void Visit(TTaggedType& node) = 0;
- virtual void Visit(TBlockType& node) = 0;
+ virtual void Visit(TBlockType& node) = 0;
};
class TThrowingNodeVisitor : public INodeVisitor {
@@ -81,7 +81,7 @@ public:
void Visit(TStreamType& node) override;
void Visit(TFlowType& node) override;
void Visit(TTaggedType& node) override;
- void Visit(TBlockType& node) override;
+ void Visit(TBlockType& node) override;
protected:
static void ThrowUnexpectedNodeType();
@@ -120,7 +120,7 @@ public:
void Visit(TStreamType& node) override;
void Visit(TFlowType& node) override;
void Visit(TTaggedType& node) override;
- void Visit(TBlockType& node) override;
+ void Visit(TBlockType& node) override;
};
class TExploringNodeVisitor : public INodeVisitor {
@@ -158,7 +158,7 @@ public:
void Visit(TStreamType& node) override;
void Visit(TFlowType& node) override;
void Visit(TTaggedType& node) override;
- void Visit(TBlockType& node) override;
+ void Visit(TBlockType& node) override;
void Walk(TNode* root, const TTypeEnvironment& env, const std::vector<TNode*>& terminalNodes = std::vector<TNode*>(),
bool buildConsumersMap = false, size_t nodesCountHint = 0);
diff --git a/ydb/library/yql/minikql/mkql_program_builder.cpp b/ydb/library/yql/minikql/mkql_program_builder.cpp
index 1c4d6a1e44..47ae6e4c8f 100644
--- a/ydb/library/yql/minikql/mkql_program_builder.cpp
+++ b/ydb/library/yql/minikql/mkql_program_builder.cpp
@@ -1365,71 +1365,71 @@ TRuntimeNode TProgramBuilder::Steal(TRuntimeNode input) {
return TRuntimeNode(callableBuilder.Build(), false);
}
-TRuntimeNode TProgramBuilder::ToBlocks(TRuntimeNode flow) {
- auto* flowType = AS_TYPE(TFlowType, flow.GetStaticType());
- auto* blockType = NewBlockType(flowType->GetItemType(), TBlockType::EShape::Many);
-
- TCallableBuilder callableBuilder(Env, __func__, NewFlowType(blockType));
- callableBuilder.Add(flow);
- return TRuntimeNode(callableBuilder.Build(), false);
-}
-
-TRuntimeNode TProgramBuilder::WideToBlocks(TRuntimeNode flow) {
- TType* outputTupleType;
- {
- const auto* inputTupleType = AS_TYPE(TTupleType, AS_TYPE(TFlowType, flow.GetStaticType())->GetItemType());
- std::vector<TType*> outputTupleItems;
- outputTupleItems.reserve(inputTupleType->GetElementsCount());
- for (size_t i = 0; i < inputTupleType->GetElementsCount(); ++i) {
- outputTupleItems.push_back(NewBlockType(inputTupleType->GetElementType(i), TBlockType::EShape::Many));
- }
- outputTupleType = NewTupleType(outputTupleItems);
- }
-
- TCallableBuilder callableBuilder(Env, __func__, NewFlowType(outputTupleType));
- callableBuilder.Add(flow);
- return TRuntimeNode(callableBuilder.Build(), false);
-}
-
-TRuntimeNode TProgramBuilder::FromBlocks(TRuntimeNode flow) {
- auto* flowType = AS_TYPE(TFlowType, flow.GetStaticType());
- auto* blockType = AS_TYPE(TBlockType, flowType->GetItemType());
-
- TCallableBuilder callableBuilder(Env, __func__, NewFlowType(blockType->GetItemType()));
- callableBuilder.Add(flow);
- return TRuntimeNode(callableBuilder.Build(), false);
-}
-
-TRuntimeNode TProgramBuilder::AsSingle(TRuntimeNode value) {
- TCallableBuilder callableBuilder(Env, __func__, NewBlockType(value.GetStaticType(), TBlockType::EShape::Single));
- callableBuilder.Add(value);
- return TRuntimeNode(callableBuilder.Build(), false);
-}
-
-TRuntimeNode TProgramBuilder::BlockAdd(TRuntimeNode arg1, TRuntimeNode arg2) {
- bool arg1Optional;
- auto* arg1BlockType = AS_TYPE(TBlockType, arg1.GetStaticType());
- auto* arg1Type = UnpackOptionalData(arg1BlockType->GetItemType(), arg1Optional);
-
- bool arg2Optional;
- auto* arg2BlockType = AS_TYPE(TBlockType, arg2.GetStaticType());
- auto* arg2Type = UnpackOptionalData(arg2BlockType->GetItemType(), arg2Optional);
-
- MKQL_ENSURE(arg1BlockType->GetShape() != TBlockType::EShape::Single ||
- arg2BlockType->GetShape() != TBlockType::EShape::Single,
- "At least one EShape::Many block expected");
-
- auto* resultDataType = BuildArithmeticCommonType(arg1Type, arg2Type);
- if (arg1Optional || arg2Optional) {
- resultDataType = NewOptionalType(resultDataType);
- }
- auto* callableType = TCallableBuilder(Env, __func__, NewBlockType(resultDataType, TBlockType::EShape::Many))
- .Add(arg1)
- .Add(arg2)
- .Build();
- return TRuntimeNode(callableType, false);
-}
-
+TRuntimeNode TProgramBuilder::ToBlocks(TRuntimeNode flow) {
+ auto* flowType = AS_TYPE(TFlowType, flow.GetStaticType());
+ auto* blockType = NewBlockType(flowType->GetItemType(), TBlockType::EShape::Many);
+
+ TCallableBuilder callableBuilder(Env, __func__, NewFlowType(blockType));
+ callableBuilder.Add(flow);
+ return TRuntimeNode(callableBuilder.Build(), false);
+}
+
+TRuntimeNode TProgramBuilder::WideToBlocks(TRuntimeNode flow) {
+ TType* outputTupleType;
+ {
+ const auto* inputTupleType = AS_TYPE(TTupleType, AS_TYPE(TFlowType, flow.GetStaticType())->GetItemType());
+ std::vector<TType*> outputTupleItems;
+ outputTupleItems.reserve(inputTupleType->GetElementsCount());
+ for (size_t i = 0; i < inputTupleType->GetElementsCount(); ++i) {
+ outputTupleItems.push_back(NewBlockType(inputTupleType->GetElementType(i), TBlockType::EShape::Many));
+ }
+ outputTupleType = NewTupleType(outputTupleItems);
+ }
+
+ TCallableBuilder callableBuilder(Env, __func__, NewFlowType(outputTupleType));
+ callableBuilder.Add(flow);
+ return TRuntimeNode(callableBuilder.Build(), false);
+}
+
+TRuntimeNode TProgramBuilder::FromBlocks(TRuntimeNode flow) {
+ auto* flowType = AS_TYPE(TFlowType, flow.GetStaticType());
+ auto* blockType = AS_TYPE(TBlockType, flowType->GetItemType());
+
+ TCallableBuilder callableBuilder(Env, __func__, NewFlowType(blockType->GetItemType()));
+ callableBuilder.Add(flow);
+ return TRuntimeNode(callableBuilder.Build(), false);
+}
+
+TRuntimeNode TProgramBuilder::AsSingle(TRuntimeNode value) {
+ TCallableBuilder callableBuilder(Env, __func__, NewBlockType(value.GetStaticType(), TBlockType::EShape::Single));
+ callableBuilder.Add(value);
+ return TRuntimeNode(callableBuilder.Build(), false);
+}
+
+TRuntimeNode TProgramBuilder::BlockAdd(TRuntimeNode arg1, TRuntimeNode arg2) {
+ bool arg1Optional;
+ auto* arg1BlockType = AS_TYPE(TBlockType, arg1.GetStaticType());
+ auto* arg1Type = UnpackOptionalData(arg1BlockType->GetItemType(), arg1Optional);
+
+ bool arg2Optional;
+ auto* arg2BlockType = AS_TYPE(TBlockType, arg2.GetStaticType());
+ auto* arg2Type = UnpackOptionalData(arg2BlockType->GetItemType(), arg2Optional);
+
+ MKQL_ENSURE(arg1BlockType->GetShape() != TBlockType::EShape::Single ||
+ arg2BlockType->GetShape() != TBlockType::EShape::Single,
+ "At least one EShape::Many block expected");
+
+ auto* resultDataType = BuildArithmeticCommonType(arg1Type, arg2Type);
+ if (arg1Optional || arg2Optional) {
+ resultDataType = NewOptionalType(resultDataType);
+ }
+ auto* callableType = TCallableBuilder(Env, __func__, NewBlockType(resultDataType, TBlockType::EShape::Many))
+ .Add(arg1)
+ .Add(arg2)
+ .Build();
+ return TRuntimeNode(callableType, false);
+}
+
TRuntimeNode TProgramBuilder::ListFromRange(TRuntimeNode start, TRuntimeNode end, TRuntimeNode step) {
MKQL_ENSURE(start.GetStaticType()->IsData(), "Expected data");
MKQL_ENSURE(end.GetStaticType()->IsSameType(*start.GetStaticType()), "Mismatch type");
@@ -2034,14 +2034,14 @@ TType* TProgramBuilder::NewFlowType(TType* itemType) {
return TFlowType::Create(itemType, Env);
}
-TType* TProgramBuilder::NewBlockType(TType* itemType, TBlockType::EShape shape) {
- bool isOptional;
- auto* dataType = UnpackOptionalData(itemType, isOptional);
- MKQL_ENSURE(dataType->GetDataSlot() == NUdf::EDataSlot::Uint64, "Expected Uint64");
-
- return TBlockType::Create(itemType, shape, Env);
-}
-
+TType* TProgramBuilder::NewBlockType(TType* itemType, TBlockType::EShape shape) {
+ bool isOptional;
+ auto* dataType = UnpackOptionalData(itemType, isOptional);
+ MKQL_ENSURE(dataType->GetDataSlot() == NUdf::EDataSlot::Uint64, "Expected Uint64");
+
+ return TBlockType::Create(itemType, shape, Env);
+}
+
TType* TProgramBuilder::NewTaggedType(TType* baseType, const std::string_view& tag) {
return TTaggedType::Create(baseType, tag, Env);
}
diff --git a/ydb/library/yql/minikql/mkql_program_builder.h b/ydb/library/yql/minikql/mkql_program_builder.h
index 7fe9846332..10f1ad7ccf 100644
--- a/ydb/library/yql/minikql/mkql_program_builder.h
+++ b/ydb/library/yql/minikql/mkql_program_builder.h
@@ -179,7 +179,7 @@ public:
TType* NewStreamType(TType* itemType);
TType* NewFlowType(TType* itemType);
TType* NewTaggedType(TType* baseType, const std::string_view& tag);
- TType* NewBlockType(TType* itemType, TBlockType::EShape shape);
+ TType* NewBlockType(TType* itemType, TBlockType::EShape shape);
TType* NewEmptyTupleType();
TType* NewTupleType(const TArrayRef<TType* const>& elements);
@@ -232,13 +232,13 @@ public:
TRuntimeNode FromFlow(TRuntimeNode flow);
TRuntimeNode Steal(TRuntimeNode input);
- TRuntimeNode ToBlocks(TRuntimeNode flow);
- TRuntimeNode WideToBlocks(TRuntimeNode flow);
- TRuntimeNode FromBlocks(TRuntimeNode flow);
- TRuntimeNode AsSingle(TRuntimeNode flow);
-
- TRuntimeNode BlockAdd(TRuntimeNode data1, TRuntimeNode data2);
-
+ TRuntimeNode ToBlocks(TRuntimeNode flow);
+ TRuntimeNode WideToBlocks(TRuntimeNode flow);
+ TRuntimeNode FromBlocks(TRuntimeNode flow);
+ TRuntimeNode AsSingle(TRuntimeNode flow);
+
+ TRuntimeNode BlockAdd(TRuntimeNode data1, TRuntimeNode data2);
+
// udfs
TRuntimeNode Udf(
const std::string_view& funcName,
diff --git a/ydb/library/yql/minikql/mkql_terminator.cpp b/ydb/library/yql/minikql/mkql_terminator.cpp
index 275428a4d5..d86ba4edaf 100644
--- a/ydb/library/yql/minikql/mkql_terminator.cpp
+++ b/ydb/library/yql/minikql/mkql_terminator.cpp
@@ -8,14 +8,14 @@ namespace NMiniKQL {
thread_local ITerminator* TBindTerminator::Terminator = nullptr;
TBindTerminator::TBindTerminator(ITerminator* terminator)
- : PreviousTerminator(Terminator)
+ : PreviousTerminator(Terminator)
{
- Terminator = terminator;
+ Terminator = terminator;
}
TBindTerminator::~TBindTerminator()
{
- Terminator = PreviousTerminator;
+ Terminator = PreviousTerminator;
}
[[noreturn]] void MKQLTerminate(const char* message) {
diff --git a/ydb/library/yql/minikql/mkql_terminator.h b/ydb/library/yql/minikql/mkql_terminator.h
index 97259759bc..2453e88938 100644
--- a/ydb/library/yql/minikql/mkql_terminator.h
+++ b/ydb/library/yql/minikql/mkql_terminator.h
@@ -23,7 +23,7 @@ struct TBindTerminator : private TNonCopyable {
static thread_local ITerminator* Terminator;
private:
- ITerminator* PreviousTerminator;
+ ITerminator* PreviousTerminator;
};
[[noreturn]] void MKQLTerminate(const char* message);
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.cpp
index 9b40e1c156..af4e08b979 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.cpp
@@ -1127,11 +1127,11 @@ void TWriteSession::HandleWakeUpImpl() {
LastTokenUpdate = TInstant::Now();
UpdateTokenIfNeededImpl();
}
-
- const auto flushAfter = CurrentBatch.StartedAt == TInstant::Zero()
- ? WakeupInterval
- : WakeupInterval - Min(Now() - CurrentBatch.StartedAt, WakeupInterval);
- Connections->ScheduleCallback(flushAfter, std::move(callback));
+
+ const auto flushAfter = CurrentBatch.StartedAt == TInstant::Zero()
+ ? WakeupInterval
+ : WakeupInterval - Min(Now() - CurrentBatch.StartedAt, WakeupInterval);
+ Connections->ScheduleCallback(flushAfter, std::move(callback));
}
void TWriteSession::UpdateTimedCountersImpl() {
diff --git a/ydb/public/tools/lib/cmds/__init__.py b/ydb/public/tools/lib/cmds/__init__.py
index 71382b515a..977f0a4aae 100644
--- a/ydb/public/tools/lib/cmds/__init__.py
+++ b/ydb/public/tools/lib/cmds/__init__.py
@@ -143,7 +143,7 @@ class Recipe(object):
return os.path.join(self.arguments.ydb_working_dir, self.recipe_metafile)
if os.getenv(self.recipe_metafile_var) is not None:
return os.getenv(self.recipe_metafile_var)
- return os.path.join(self.generate_data_path(), self.recipe_metafile)
+ return os.path.join(self.generate_data_path(), self.recipe_metafile)
def database_file_path(self):
if self.arguments.ydb_working_dir:
diff --git a/ydb/tests/library/harness/kikimr_config.py b/ydb/tests/library/harness/kikimr_config.py
index a14da6596e..57c282b9f4 100644
--- a/ydb/tests/library/harness/kikimr_config.py
+++ b/ydb/tests/library/harness/kikimr_config.py
@@ -103,7 +103,7 @@ class KikimrConfigGenerator(object):
use_log_files=True,
grpc_ssl_enable=False,
use_in_memory_pdisks=False,
- enable_pqcd=False,
+ enable_pqcd=False,
enable_metering=False,
grpc_tls_data_path=None,
yql_config_path=None,