aboutsummaryrefslogtreecommitdiffstats
path: root/yql
diff options
context:
space:
mode:
authorMaxim Yurchuk <maxim-yurchuk@ydb.tech>2024-11-30 18:13:21 +0000
committerGitHub <noreply@github.com>2024-11-30 18:13:21 +0000
commit7466d62733bffe5cb040f37b21c5a9a4ad174353 (patch)
tree10d7bd92bb71816d77d6df3499415ad9575f92e7 /yql
parentd7f99d9336ffa2fd5e9340c9b1d62fb767d70e9a (diff)
parent331f05c2e8cac3dc77c994803ae0721bf33b37a6 (diff)
downloadydb-7466d62733bffe5cb040f37b21c5a9a4ad174353.tar.gz
Merge pull request #12145 from ydb-platform/mergelibs-241129-1330
Library import 241129-1330
Diffstat (limited to 'yql')
-rw-r--r--yql/essentials/providers/common/proto/gateways_config.proto1
-rw-r--r--yql/essentials/tools/purebench/purebench.cpp273
-rw-r--r--yql/essentials/tools/purebench/ya.make6
3 files changed, 264 insertions, 16 deletions
diff --git a/yql/essentials/providers/common/proto/gateways_config.proto b/yql/essentials/providers/common/proto/gateways_config.proto
index 35654cb467..875ccaa22f 100644
--- a/yql/essentials/providers/common/proto/gateways_config.proto
+++ b/yql/essentials/providers/common/proto/gateways_config.proto
@@ -328,6 +328,7 @@ message TPqClusterConfig {
repeated TAttr Settings = 100;
optional bool SharedReading = 101;
optional string ReconnectPeriod = 102; // disabled by default, example of a parameter: 5m
+ optional string ReadGroup = 103;
}
message TPqGatewayConfig {
diff --git a/yql/essentials/tools/purebench/purebench.cpp b/yql/essentials/tools/purebench/purebench.cpp
index 1ec81317b8..088ebc32b5 100644
--- a/yql/essentials/tools/purebench/purebench.cpp
+++ b/yql/essentials/tools/purebench/purebench.cpp
@@ -2,17 +2,23 @@
#include <library/cpp/getopt/last_getopt.h>
#include <yql/essentials/public/purecalc/purecalc.h>
-#include <yt/yql/purecalc/io_specs/mkql/spec.h>
#include <yql/essentials/public/purecalc/io_specs/arrow/spec.h>
#include <yql/essentials/public/purecalc/helpers/stream/stream_from_vector.h>
+#include <yql/essentials/utils/yql_panic.h>
#include <yql/essentials/utils/log/log.h>
#include <yql/essentials/utils/backtrace/backtrace.h>
#include <yql/essentials/public/udf/arrow/util.h>
#include <yql/essentials/public/udf/udf_registrator.h>
#include <yql/essentials/public/udf/udf_version.h>
-#include <library/cpp/skiff/skiff.h>
+#include <yql/essentials/minikql/mkql_alloc.h>
+#include <yql/essentials/minikql/computation/mkql_computation_node_holders.h>
+#include <yql/essentials/minikql/computation/mkql_custom_list.h>
+#include <yql/essentials/minikql/computation/mkql_computation_node_pack.h>
+#include <yql/essentials/providers/common/codec/yql_codec.h>
+#include <yql/essentials/providers/common/schema/mkql/yql_mkql_schema.h>
+
#include <library/cpp/yson/writer.h>
#include <util/datetime/cputimer.h>
@@ -25,15 +31,249 @@
using namespace NYql;
using namespace NYql::NPureCalc;
+using namespace NKikimr::NMiniKQL;
+using namespace NYql::NUdf;
+
+struct TPickleInputSpec : public TInputSpecBase {
+ TPickleInputSpec(const TVector<NYT::TNode>& schemas)
+ : Schemas(schemas)
+ {}
+
+ const TVector<NYT::TNode>& GetSchemas() const final {
+ return Schemas;
+ }
+
+ const TVector<NYT::TNode> Schemas;
+};
+
+class TPickleListValue final: public TCustomListValue {
+public:
+ TPickleListValue(
+ TMemoryUsageInfo* memInfo,
+ const TPickleInputSpec& inputSpec,
+ ui32 index,
+ IInputStream* underlying,
+ IWorker* worker
+ )
+ : TCustomListValue(memInfo)
+ , Underlying_(underlying)
+ , Worker_(worker)
+ , ScopedAlloc_(Worker_->GetScopedAlloc())
+ , Packer_(false, Worker_->GetInputType(index))
+ {
+ }
+
+ TUnboxedValue GetListIterator() const override {
+ YQL_ENSURE(!HasIterator_, "Only one pass over input is supported");
+ HasIterator_ = true;
+ return TUnboxedValuePod(const_cast<TPickleListValue*>(this));
+ }
+
+ bool Next(TUnboxedValue& result) override {
+ ui32 len;
+ auto read = Underlying_->Load(&len, sizeof(len));
+ if (!read) {
+ return false;
+ }
+
+ YQL_ENSURE(read == sizeof(len));
+ if (len > RecordBuffer_.size()) {
+ RecordBuffer_.resize(Max<size_t>(2*RecordBuffer_.size(), len));
+ }
+
+ Underlying_->LoadOrFail(RecordBuffer_.data(), len);
+ result = Packer_.Unpack(TStringBuf(RecordBuffer_.data(), len), Worker_->GetGraph().GetHolderFactory());
+ return true;
+ }
+
+private:
+ mutable bool HasIterator_ = false;
+ IInputStream* Underlying_;
+ IWorker* Worker_;
+ TScopedAlloc& ScopedAlloc_;
+ TValuePackerGeneric<true> Packer_;
+ TVector<char> RecordBuffer_;
+};
+
+template <>
+struct TInputSpecTraits<TPickleInputSpec> {
+ static const constexpr bool IsPartial = false;
+
+ static const constexpr bool SupportPullStreamMode = false;
+ static const constexpr bool SupportPullListMode = true;
+ static const constexpr bool SupportPushStreamMode = false;
+
+ static void PreparePullListWorker(const TPickleInputSpec& spec, IPullListWorker* worker, IInputStream* stream) {
+ PreparePullListWorker(spec, worker, TVector<IInputStream*>({stream}));
+ }
+
+ static void PreparePullListWorker(const TPickleInputSpec& spec, IPullListWorker* worker, const TVector<IInputStream*>& streams) {
+ YQL_ENSURE(worker->GetInputsCount() == streams.size(),
+ "number of input streams should match number of inputs provided by spec");
+
+ with_lock(worker->GetScopedAlloc()) {
+ auto& holderFactory = worker->GetGraph().GetHolderFactory();
+ for (ui32 i = 0; i < streams.size(); i++) {
+ auto input = holderFactory.template Create<TPickleListValue>(
+ spec, i, std::move(streams[i]), worker);
+ worker->SetInput(std::move(input), i);
+ }
+ }
+ }
+};
+
+struct TPickleOutputSpec : public TOutputSpecBase {
+ TPickleOutputSpec(const NYT::TNode& schema)
+ : Schema(schema)
+ {}
+
+ const NYT::TNode& GetSchema() const final {
+ return Schema;
+ }
+
+ const NYT::TNode Schema;
+};
+
+class TStreamOutputHandle: private TMoveOnly {
+public:
+ virtual NKikimr::NMiniKQL::TType* GetOutputType() const = 0;
+ virtual void Run(IOutputStream*) = 0;
+ virtual ~TStreamOutputHandle() = default;
+};
+
+class TPickleOutputHandle final: public TStreamOutputHandle {
+public:
+ TPickleOutputHandle(TWorkerHolder<IPullListWorker> worker)
+ : Worker_(std::move(worker))
+ , Packer_(false, Worker_->GetOutputType())
+ {}
+
+ NKikimr::NMiniKQL::TType* GetOutputType() const final {
+ return const_cast<NKikimr::NMiniKQL::TType*>(Worker_->GetOutputType());
+ }
+
+ void Run(IOutputStream* stream) final {
+ Y_ENSURE(
+ Worker_->GetOutputType()->IsStruct(),
+ "Run(IOutputStream*) cannot be used with multi-output programs");
+
+ TBindTerminator bind(Worker_->GetGraph().GetTerminator());
+
+ with_lock(Worker_->GetScopedAlloc()) {
+ const auto outputIterator = Worker_->GetOutputIterator();
+
+ TUnboxedValue value;
+ while (outputIterator.Next(value)) {
+ auto buf = Packer_.Pack(value);
+ ui32 len = buf.Size();
+ stream->Write(&len, sizeof(len));
+ stream->Write(buf.Data(), len);
+ }
+ }
+ }
+
+private:
+ TWorkerHolder<IPullListWorker> Worker_;
+ TValuePackerGeneric<true> Packer_;
+};
+
+template <>
+struct TOutputSpecTraits<TPickleOutputSpec> {
+ static const constexpr bool IsPartial = false;
+
+ static const constexpr bool SupportPullStreamMode = false;
+ static const constexpr bool SupportPullListMode = true;
+ static const constexpr bool SupportPushStreamMode = false;
+
+ using TPullListReturnType = THolder<TPickleOutputHandle>;
+
+ static TPullListReturnType ConvertPullListWorkerToOutputType(const TPickleOutputSpec&, TWorkerHolder<IPullListWorker> worker) {
+ return MakeHolder<TPickleOutputHandle>(std::move(worker));
+ }
+};
+
+struct TPrintOutputSpec : public TOutputSpecBase {
+ TPrintOutputSpec(const NYT::TNode& schema)
+ : Schema(schema)
+ {}
+
+ const NYT::TNode& GetSchema() const final {
+ return Schema;
+ }
+
+ const NYT::TNode Schema;
+};
+
+class TPrintOutputHandle final: public TStreamOutputHandle {
+public:
+ TPrintOutputHandle(TWorkerHolder<IPullListWorker> worker)
+ : Worker_(std::move(worker))
+ {}
+
+ NKikimr::NMiniKQL::TType* GetOutputType() const final {
+ return const_cast<NKikimr::NMiniKQL::TType*>(Worker_->GetOutputType());
+ }
+
+ void Run(IOutputStream* stream) final {
+ Y_ENSURE(
+ Worker_->GetOutputType()->IsStruct(),
+ "Run(IOutputStream*) cannot be used with multi-output programs");
+
+ TBindTerminator bind(Worker_->GetGraph().GetTerminator());
+
+ with_lock(Worker_->GetScopedAlloc()) {
+ const auto outputIterator = Worker_->GetOutputIterator();
+
+ TUnboxedValue value;
+ while (outputIterator.Next(value)) {
+ auto str = NCommon::WriteYsonValue(value, GetOutputType());
+ stream->Write(str.data(), str.size());
+ stream->Write(';');
+ }
+ }
+ }
+
+private:
+ TWorkerHolder<IPullListWorker> Worker_;
+};
+
+template <>
+struct TOutputSpecTraits<TPrintOutputSpec> {
+ static const constexpr bool IsPartial = false;
+
+ static const constexpr bool SupportPullStreamMode = false;
+ static const constexpr bool SupportPullListMode = true;
+ static const constexpr bool SupportPushStreamMode = false;
+
+ using TPullListReturnType = THolder<TPrintOutputHandle>;
+
+ static TPullListReturnType ConvertPullListWorkerToOutputType(const TPrintOutputSpec&, TWorkerHolder<IPullListWorker> worker) {
+ return MakeHolder<TPrintOutputHandle>(std::move(worker));
+ }
+};
TStringStream MakeGenInput(ui64 count) {
TStringStream stream;
- NSkiff::TUncheckedSkiffWriter writer{&stream};
+ TScopedAlloc alloc(__LOCATION__);
+ TTypeEnvironment env(alloc);
+ TMemoryUsageInfo memInfo("MakeGenInput");
+ THolderFactory holderFactory(alloc.Ref(), memInfo);
+ auto ui64Type = env.GetUi64Lazy();
+ std::pair<TString, NKikimr::NMiniKQL::TType*> member("index", ui64Type);
+ auto ui64StructType = TStructType::Create(&member, 1, env);
+ TValuePackerGeneric<true> packer(false, ui64StructType);
+
+ TPlainContainerCache cache;
for (ui64 i = 0; i < count; ++i) {
- writer.WriteVariant16Tag(0);
- writer.WriteInt64(i);
+ TUnboxedValue* items;
+ auto array = cache.NewArray(holderFactory, 1, items);
+ items[0] = TUnboxedValuePod(i);
+ auto buf = packer.Pack(array);
+ ui32 len = buf.Size();
+ stream.Write(&len, sizeof(len));
+ stream.Write(buf.Data(), len);
}
- writer.Finish();
+
return stream;
}
@@ -46,9 +286,9 @@ NYT::TNode RunGenSql(
const TVector<NYT::TNode>& inputSchema,
const TString& sql,
ETranslationMode isPg,
- TRunCallable<TSkiffInputSpec, TOutputSpec> runCallable
+ TRunCallable<TPickleInputSpec, TOutputSpec> runCallable
) {
- auto inputSpec = TSkiffInputSpec(inputSchema);
+ auto inputSpec = TPickleInputSpec(inputSchema);
auto outputSpec = TOutputSpec({NYT::TNode::CreateEntity()});
auto program = factory->MakePullListProgram(inputSpec, outputSpec, sql, isPg);
@@ -66,13 +306,18 @@ void ShowResults(
TStream* input
) {
auto inputSpec = TInputSpec(inputSchema);
- auto outputSpec = TYsonOutputSpec({NYT::TNode::CreateEntity()});
+ auto outputSpec = TPrintOutputSpec({NYT::TNode::CreateEntity()});
auto program = factory->MakePullListProgram(inputSpec, outputSpec, sql, isPg);
auto handle = program->Apply(input);
TStringStream output;
+ output << "{Type=";
+ output << NCommon::WriteTypeToYson(handle->GetOutputType());
+ output << ";Data=[";
handle->Run(&output);
+ output << "]}";
TStringInput in(output.Str());
- NYson::ReformatYsonStream(&in, &Cerr, NYson::EYsonFormat::Pretty, NYson::EYsonType::ListFragment);
+ NYson::ReformatYsonStream(&in, &Cerr, NYson::EYsonFormat::Pretty, NYson::EYsonType::Node);
+ Cerr << "\n";
}
template <typename TInputSpec, typename TOutputSpec>
@@ -182,7 +427,7 @@ int Main(int argc, const char *argv[])
if (blockEngineSettings == "disable") {
TStringStream outputGenStream;
- auto outputGenSchema = RunGenSql<TSkiffOutputSpec>(
+ auto outputGenSchema = RunGenSql<TPickleOutputSpec>(
factory, inputGenSchema, genSql, isPgGen,
[&](const auto& program) {
auto handle = program->Apply(&inputGenStream);
@@ -192,12 +437,12 @@ int Main(int argc, const char *argv[])
if (showResults) {
auto inputResStream = TStringStream(outputGenStream);
- ShowResults<TSkiffInputSpec>(
+ ShowResults<TPickleInputSpec>(
factory, {outputGenSchema}, testSql, isPgTest, &inputResStream);
}
inputBenchSize = outputGenStream.Size();
- normalizedTime = RunBenchmarks<TSkiffInputSpec, TSkiffOutputSpec>(
+ normalizedTime = RunBenchmarks<TPickleInputSpec, TPickleOutputSpec>(
factory, {outputGenSchema}, testSql, isPgTest, repeats,
[&](const auto& program) {
auto inputBorrowed = TStringStream(outputGenStream);
@@ -206,7 +451,7 @@ int Main(int argc, const char *argv[])
handle->Run(&output);
});
} else {
- auto inputGenSpec = TSkiffInputSpec(inputGenSchema);
+ auto inputGenSpec = TPickleInputSpec(inputGenSchema);
auto outputGenSpec = TArrowOutputSpec({NYT::TNode::CreateEntity()});
// XXX: <RunGenSql> cannot be used for this case, since all buffers
// from the Datums in the obtained batches are owned by the worker's
diff --git a/yql/essentials/tools/purebench/ya.make b/yql/essentials/tools/purebench/ya.make
index 7068203bae..9ef54e3a0e 100644
--- a/yql/essentials/tools/purebench/ya.make
+++ b/yql/essentials/tools/purebench/ya.make
@@ -21,11 +21,13 @@ PEERDIR(
yql/essentials/utils/log
yql/essentials/public/udf
yql/essentials/public/udf/service/exception_policy
- library/cpp/skiff
library/cpp/yson
- yt/yql/purecalc/io_specs/mkql
yql/essentials/public/purecalc/io_specs/arrow
yql/essentials/public/purecalc
+ yql/essentials/minikql
+ yql/essentials/minikql/computation
+ yql/essentials/providers/common/codec
+ yql/essentials/providers/common/schema/mkql
)
YQL_LAST_ABI_VERSION()