diff options
author | Maxim Yurchuk <maxim-yurchuk@ydb.tech> | 2024-11-30 18:13:21 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-11-30 18:13:21 +0000 |
commit | 7466d62733bffe5cb040f37b21c5a9a4ad174353 (patch) | |
tree | 10d7bd92bb71816d77d6df3499415ad9575f92e7 /yql | |
parent | d7f99d9336ffa2fd5e9340c9b1d62fb767d70e9a (diff) | |
parent | 331f05c2e8cac3dc77c994803ae0721bf33b37a6 (diff) | |
download | ydb-7466d62733bffe5cb040f37b21c5a9a4ad174353.tar.gz |
Merge pull request #12145 from ydb-platform/mergelibs-241129-1330
Library import 241129-1330
Diffstat (limited to 'yql')
-rw-r--r-- | yql/essentials/providers/common/proto/gateways_config.proto | 1 | ||||
-rw-r--r-- | yql/essentials/tools/purebench/purebench.cpp | 273 | ||||
-rw-r--r-- | yql/essentials/tools/purebench/ya.make | 6 |
3 files changed, 264 insertions, 16 deletions
diff --git a/yql/essentials/providers/common/proto/gateways_config.proto b/yql/essentials/providers/common/proto/gateways_config.proto index 35654cb467..875ccaa22f 100644 --- a/yql/essentials/providers/common/proto/gateways_config.proto +++ b/yql/essentials/providers/common/proto/gateways_config.proto @@ -328,6 +328,7 @@ message TPqClusterConfig { repeated TAttr Settings = 100; optional bool SharedReading = 101; optional string ReconnectPeriod = 102; // disabled by default, example of a parameter: 5m + optional string ReadGroup = 103; } message TPqGatewayConfig { diff --git a/yql/essentials/tools/purebench/purebench.cpp b/yql/essentials/tools/purebench/purebench.cpp index 1ec81317b8..088ebc32b5 100644 --- a/yql/essentials/tools/purebench/purebench.cpp +++ b/yql/essentials/tools/purebench/purebench.cpp @@ -2,17 +2,23 @@ #include <library/cpp/getopt/last_getopt.h> #include <yql/essentials/public/purecalc/purecalc.h> -#include <yt/yql/purecalc/io_specs/mkql/spec.h> #include <yql/essentials/public/purecalc/io_specs/arrow/spec.h> #include <yql/essentials/public/purecalc/helpers/stream/stream_from_vector.h> +#include <yql/essentials/utils/yql_panic.h> #include <yql/essentials/utils/log/log.h> #include <yql/essentials/utils/backtrace/backtrace.h> #include <yql/essentials/public/udf/arrow/util.h> #include <yql/essentials/public/udf/udf_registrator.h> #include <yql/essentials/public/udf/udf_version.h> -#include <library/cpp/skiff/skiff.h> +#include <yql/essentials/minikql/mkql_alloc.h> +#include <yql/essentials/minikql/computation/mkql_computation_node_holders.h> +#include <yql/essentials/minikql/computation/mkql_custom_list.h> +#include <yql/essentials/minikql/computation/mkql_computation_node_pack.h> +#include <yql/essentials/providers/common/codec/yql_codec.h> +#include <yql/essentials/providers/common/schema/mkql/yql_mkql_schema.h> + #include <library/cpp/yson/writer.h> #include <util/datetime/cputimer.h> @@ -25,15 +31,249 @@ using namespace NYql; using namespace NYql::NPureCalc; +using namespace NKikimr::NMiniKQL; +using namespace NYql::NUdf; + +struct TPickleInputSpec : public TInputSpecBase { + TPickleInputSpec(const TVector<NYT::TNode>& schemas) + : Schemas(schemas) + {} + + const TVector<NYT::TNode>& GetSchemas() const final { + return Schemas; + } + + const TVector<NYT::TNode> Schemas; +}; + +class TPickleListValue final: public TCustomListValue { +public: + TPickleListValue( + TMemoryUsageInfo* memInfo, + const TPickleInputSpec& inputSpec, + ui32 index, + IInputStream* underlying, + IWorker* worker + ) + : TCustomListValue(memInfo) + , Underlying_(underlying) + , Worker_(worker) + , ScopedAlloc_(Worker_->GetScopedAlloc()) + , Packer_(false, Worker_->GetInputType(index)) + { + } + + TUnboxedValue GetListIterator() const override { + YQL_ENSURE(!HasIterator_, "Only one pass over input is supported"); + HasIterator_ = true; + return TUnboxedValuePod(const_cast<TPickleListValue*>(this)); + } + + bool Next(TUnboxedValue& result) override { + ui32 len; + auto read = Underlying_->Load(&len, sizeof(len)); + if (!read) { + return false; + } + + YQL_ENSURE(read == sizeof(len)); + if (len > RecordBuffer_.size()) { + RecordBuffer_.resize(Max<size_t>(2*RecordBuffer_.size(), len)); + } + + Underlying_->LoadOrFail(RecordBuffer_.data(), len); + result = Packer_.Unpack(TStringBuf(RecordBuffer_.data(), len), Worker_->GetGraph().GetHolderFactory()); + return true; + } + +private: + mutable bool HasIterator_ = false; + IInputStream* Underlying_; + IWorker* Worker_; + TScopedAlloc& ScopedAlloc_; + TValuePackerGeneric<true> Packer_; + TVector<char> RecordBuffer_; +}; + +template <> +struct TInputSpecTraits<TPickleInputSpec> { + static const constexpr bool IsPartial = false; + + static const constexpr bool SupportPullStreamMode = false; + static const constexpr bool SupportPullListMode = true; + static const constexpr bool SupportPushStreamMode = false; + + static void PreparePullListWorker(const TPickleInputSpec& spec, IPullListWorker* worker, IInputStream* stream) { + PreparePullListWorker(spec, worker, TVector<IInputStream*>({stream})); + } + + static void PreparePullListWorker(const TPickleInputSpec& spec, IPullListWorker* worker, const TVector<IInputStream*>& streams) { + YQL_ENSURE(worker->GetInputsCount() == streams.size(), + "number of input streams should match number of inputs provided by spec"); + + with_lock(worker->GetScopedAlloc()) { + auto& holderFactory = worker->GetGraph().GetHolderFactory(); + for (ui32 i = 0; i < streams.size(); i++) { + auto input = holderFactory.template Create<TPickleListValue>( + spec, i, std::move(streams[i]), worker); + worker->SetInput(std::move(input), i); + } + } + } +}; + +struct TPickleOutputSpec : public TOutputSpecBase { + TPickleOutputSpec(const NYT::TNode& schema) + : Schema(schema) + {} + + const NYT::TNode& GetSchema() const final { + return Schema; + } + + const NYT::TNode Schema; +}; + +class TStreamOutputHandle: private TMoveOnly { +public: + virtual NKikimr::NMiniKQL::TType* GetOutputType() const = 0; + virtual void Run(IOutputStream*) = 0; + virtual ~TStreamOutputHandle() = default; +}; + +class TPickleOutputHandle final: public TStreamOutputHandle { +public: + TPickleOutputHandle(TWorkerHolder<IPullListWorker> worker) + : Worker_(std::move(worker)) + , Packer_(false, Worker_->GetOutputType()) + {} + + NKikimr::NMiniKQL::TType* GetOutputType() const final { + return const_cast<NKikimr::NMiniKQL::TType*>(Worker_->GetOutputType()); + } + + void Run(IOutputStream* stream) final { + Y_ENSURE( + Worker_->GetOutputType()->IsStruct(), + "Run(IOutputStream*) cannot be used with multi-output programs"); + + TBindTerminator bind(Worker_->GetGraph().GetTerminator()); + + with_lock(Worker_->GetScopedAlloc()) { + const auto outputIterator = Worker_->GetOutputIterator(); + + TUnboxedValue value; + while (outputIterator.Next(value)) { + auto buf = Packer_.Pack(value); + ui32 len = buf.Size(); + stream->Write(&len, sizeof(len)); + stream->Write(buf.Data(), len); + } + } + } + +private: + TWorkerHolder<IPullListWorker> Worker_; + TValuePackerGeneric<true> Packer_; +}; + +template <> +struct TOutputSpecTraits<TPickleOutputSpec> { + static const constexpr bool IsPartial = false; + + static const constexpr bool SupportPullStreamMode = false; + static const constexpr bool SupportPullListMode = true; + static const constexpr bool SupportPushStreamMode = false; + + using TPullListReturnType = THolder<TPickleOutputHandle>; + + static TPullListReturnType ConvertPullListWorkerToOutputType(const TPickleOutputSpec&, TWorkerHolder<IPullListWorker> worker) { + return MakeHolder<TPickleOutputHandle>(std::move(worker)); + } +}; + +struct TPrintOutputSpec : public TOutputSpecBase { + TPrintOutputSpec(const NYT::TNode& schema) + : Schema(schema) + {} + + const NYT::TNode& GetSchema() const final { + return Schema; + } + + const NYT::TNode Schema; +}; + +class TPrintOutputHandle final: public TStreamOutputHandle { +public: + TPrintOutputHandle(TWorkerHolder<IPullListWorker> worker) + : Worker_(std::move(worker)) + {} + + NKikimr::NMiniKQL::TType* GetOutputType() const final { + return const_cast<NKikimr::NMiniKQL::TType*>(Worker_->GetOutputType()); + } + + void Run(IOutputStream* stream) final { + Y_ENSURE( + Worker_->GetOutputType()->IsStruct(), + "Run(IOutputStream*) cannot be used with multi-output programs"); + + TBindTerminator bind(Worker_->GetGraph().GetTerminator()); + + with_lock(Worker_->GetScopedAlloc()) { + const auto outputIterator = Worker_->GetOutputIterator(); + + TUnboxedValue value; + while (outputIterator.Next(value)) { + auto str = NCommon::WriteYsonValue(value, GetOutputType()); + stream->Write(str.data(), str.size()); + stream->Write(';'); + } + } + } + +private: + TWorkerHolder<IPullListWorker> Worker_; +}; + +template <> +struct TOutputSpecTraits<TPrintOutputSpec> { + static const constexpr bool IsPartial = false; + + static const constexpr bool SupportPullStreamMode = false; + static const constexpr bool SupportPullListMode = true; + static const constexpr bool SupportPushStreamMode = false; + + using TPullListReturnType = THolder<TPrintOutputHandle>; + + static TPullListReturnType ConvertPullListWorkerToOutputType(const TPrintOutputSpec&, TWorkerHolder<IPullListWorker> worker) { + return MakeHolder<TPrintOutputHandle>(std::move(worker)); + } +}; TStringStream MakeGenInput(ui64 count) { TStringStream stream; - NSkiff::TUncheckedSkiffWriter writer{&stream}; + TScopedAlloc alloc(__LOCATION__); + TTypeEnvironment env(alloc); + TMemoryUsageInfo memInfo("MakeGenInput"); + THolderFactory holderFactory(alloc.Ref(), memInfo); + auto ui64Type = env.GetUi64Lazy(); + std::pair<TString, NKikimr::NMiniKQL::TType*> member("index", ui64Type); + auto ui64StructType = TStructType::Create(&member, 1, env); + TValuePackerGeneric<true> packer(false, ui64StructType); + + TPlainContainerCache cache; for (ui64 i = 0; i < count; ++i) { - writer.WriteVariant16Tag(0); - writer.WriteInt64(i); + TUnboxedValue* items; + auto array = cache.NewArray(holderFactory, 1, items); + items[0] = TUnboxedValuePod(i); + auto buf = packer.Pack(array); + ui32 len = buf.Size(); + stream.Write(&len, sizeof(len)); + stream.Write(buf.Data(), len); } - writer.Finish(); + return stream; } @@ -46,9 +286,9 @@ NYT::TNode RunGenSql( const TVector<NYT::TNode>& inputSchema, const TString& sql, ETranslationMode isPg, - TRunCallable<TSkiffInputSpec, TOutputSpec> runCallable + TRunCallable<TPickleInputSpec, TOutputSpec> runCallable ) { - auto inputSpec = TSkiffInputSpec(inputSchema); + auto inputSpec = TPickleInputSpec(inputSchema); auto outputSpec = TOutputSpec({NYT::TNode::CreateEntity()}); auto program = factory->MakePullListProgram(inputSpec, outputSpec, sql, isPg); @@ -66,13 +306,18 @@ void ShowResults( TStream* input ) { auto inputSpec = TInputSpec(inputSchema); - auto outputSpec = TYsonOutputSpec({NYT::TNode::CreateEntity()}); + auto outputSpec = TPrintOutputSpec({NYT::TNode::CreateEntity()}); auto program = factory->MakePullListProgram(inputSpec, outputSpec, sql, isPg); auto handle = program->Apply(input); TStringStream output; + output << "{Type="; + output << NCommon::WriteTypeToYson(handle->GetOutputType()); + output << ";Data=["; handle->Run(&output); + output << "]}"; TStringInput in(output.Str()); - NYson::ReformatYsonStream(&in, &Cerr, NYson::EYsonFormat::Pretty, NYson::EYsonType::ListFragment); + NYson::ReformatYsonStream(&in, &Cerr, NYson::EYsonFormat::Pretty, NYson::EYsonType::Node); + Cerr << "\n"; } template <typename TInputSpec, typename TOutputSpec> @@ -182,7 +427,7 @@ int Main(int argc, const char *argv[]) if (blockEngineSettings == "disable") { TStringStream outputGenStream; - auto outputGenSchema = RunGenSql<TSkiffOutputSpec>( + auto outputGenSchema = RunGenSql<TPickleOutputSpec>( factory, inputGenSchema, genSql, isPgGen, [&](const auto& program) { auto handle = program->Apply(&inputGenStream); @@ -192,12 +437,12 @@ int Main(int argc, const char *argv[]) if (showResults) { auto inputResStream = TStringStream(outputGenStream); - ShowResults<TSkiffInputSpec>( + ShowResults<TPickleInputSpec>( factory, {outputGenSchema}, testSql, isPgTest, &inputResStream); } inputBenchSize = outputGenStream.Size(); - normalizedTime = RunBenchmarks<TSkiffInputSpec, TSkiffOutputSpec>( + normalizedTime = RunBenchmarks<TPickleInputSpec, TPickleOutputSpec>( factory, {outputGenSchema}, testSql, isPgTest, repeats, [&](const auto& program) { auto inputBorrowed = TStringStream(outputGenStream); @@ -206,7 +451,7 @@ int Main(int argc, const char *argv[]) handle->Run(&output); }); } else { - auto inputGenSpec = TSkiffInputSpec(inputGenSchema); + auto inputGenSpec = TPickleInputSpec(inputGenSchema); auto outputGenSpec = TArrowOutputSpec({NYT::TNode::CreateEntity()}); // XXX: <RunGenSql> cannot be used for this case, since all buffers // from the Datums in the obtained batches are owned by the worker's diff --git a/yql/essentials/tools/purebench/ya.make b/yql/essentials/tools/purebench/ya.make index 7068203bae..9ef54e3a0e 100644 --- a/yql/essentials/tools/purebench/ya.make +++ b/yql/essentials/tools/purebench/ya.make @@ -21,11 +21,13 @@ PEERDIR( yql/essentials/utils/log yql/essentials/public/udf yql/essentials/public/udf/service/exception_policy - library/cpp/skiff library/cpp/yson - yt/yql/purecalc/io_specs/mkql yql/essentials/public/purecalc/io_specs/arrow yql/essentials/public/purecalc + yql/essentials/minikql + yql/essentials/minikql/computation + yql/essentials/providers/common/codec + yql/essentials/providers/common/schema/mkql ) YQL_LAST_ABI_VERSION() |