diff options
author | a-romanov <a-romanov@yandex-team.ru> | 2022-03-18 03:14:32 +0300 |
---|---|---|
committer | a-romanov <a-romanov@yandex-team.ru> | 2022-03-18 03:14:32 +0300 |
commit | 972191bd72f3e29e1070f02cc43917a1fc8a81ee (patch) | |
tree | 130f9d8753b5b7fcd248481cb7fcfa4f4d2ab0cd | |
parent | 29742ca2d4486c34dd664eaf05731175fe2b30a7 (diff) | |
download | ydb-972191bd72f3e29e1070f02cc43917a1fc8a81ee.tar.gz |
YQ-727 S3 Source coro actor first draft.
ref:e08f3219646c5a1216fd053eea1e60c477a113c0
6 files changed, 259 insertions, 9 deletions
diff --git a/ydb/library/yql/providers/s3/actors/CMakeLists.txt b/ydb/library/yql/providers/s3/actors/CMakeLists.txt index e95f3343a1..6e59be0cdd 100644 --- a/ydb/library/yql/providers/s3/actors/CMakeLists.txt +++ b/ydb/library/yql/providers/s3/actors/CMakeLists.txt @@ -16,6 +16,7 @@ target_link_libraries(providers-s3-actors PUBLIC yutil yql-minikql-computation common-token_accessor-client + common-schema-mkql yql-public-types dq-actors-compute providers-common-http_gateway diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp index d2d0670654..32e88705f1 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp @@ -1,15 +1,26 @@ #include "yql_s3_read_actor.h" #include <ydb/library/yql/minikql/mkql_string_util.h> +#include <ydb/library/yql/minikql/computation/mkql_computation_node.h> +#include <ydb/library/yql/minikql/mkql_program_builder.h> +#include <ydb/library/yql/minikql/invoke_builtins/mkql_builtins.h> +#include <ydb/library/yql/minikql/mkql_function_registry.h> +#include <ydb/library/yql/minikql/mkql_terminator.h> +#include <ydb/library/yql/minikql/comp_nodes/mkql_factories.h> +#include <ydb/library/yql/providers/common/schema/mkql/yql_mkql_schema.h> #include <ydb/library/yql/utils/yql_panic.h> + #include <ydb/library/yql/providers/s3/proto/range.pb.h> #include <ydb/library/yql/providers/common/provider/yql_provider_names.h> #include <library/cpp/actors/core/actor_bootstrapped.h> +#include <library/cpp/actors/core/actor_coroutine.h> #include <library/cpp/actors/core/events.h> #include <library/cpp/actors/core/event_local.h> #include <library/cpp/actors/core/hfunc.h> +#include <util/generic/size_literals.h> + #include <queue> namespace NYql::NDq { @@ -24,8 +35,10 @@ struct TEvPrivate { EvBegin = EventSpaceBegin(TEvents::ES_PRIVATE), EvReadResult = EvBegin, + EvReadFinished, EvReadError, EvRetry, + EvReadDone, EvEnd }; @@ -34,20 +47,28 @@ struct TEvPrivate { // Events struct TEvReadResult : public TEventLocal<TEvReadResult, EvReadResult> { - TEvReadResult(IHTTPGateway::TContent&& result): Result(std::move(result)) {} + TEvReadResult(IHTTPGateway::TContent&& result, size_t pathInd): Result(std::move(result)), PathIndex(pathInd) {} IHTTPGateway::TContent Result; + const size_t PathIndex = 0; + }; + + struct TEvReadFinished : public TEventLocal<TEvReadFinished, EvReadFinished> { + TEvReadFinished(size_t pathInd) : PathIndex(pathInd) {} + const size_t PathIndex = 0; }; struct TEvReadError : public TEventLocal<TEvReadError, EvReadError> { - TEvReadError(TIssues&& error, size_t pathInd): Error(std::move(error)), PathIndex(pathInd) {} - TIssues Error; - size_t PathIndex = 0; + TEvReadError(TIssues&& error, size_t pathInd) : Error(std::move(error)), PathIndex(pathInd) {} + const TIssues Error; + const size_t PathIndex = 0; }; struct TEvRetryEvent : public NActors::TEventLocal<TEvRetryEvent, EvRetry> { explicit TEvRetryEvent(size_t pathIndex) : PathIndex(pathIndex) {} - size_t PathIndex = 0; + const size_t PathIndex = 0; }; + + struct TEvReadDone : public TEventLocal<TEvReadDone, EvReadDone> {}; }; } // namespace @@ -137,7 +158,7 @@ private: static void OnDownloadFinished(TActorSystem* actorSystem, TActorId selfId, IHTTPGateway::TResult&& result, size_t pathInd) { switch (result.index()) { case 0U: - actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvReadResult(std::get<IHTTPGateway::TContent>(std::move(result))))); + actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvReadResult(std::get<IHTTPGateway::TContent>(std::move(result)), pathInd))); return; case 1U: actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvReadError(std::get<TIssues>(std::move(result)), pathInd))); @@ -223,7 +244,212 @@ private: ui32 MaxRetriesPerPath = 3; }; +using namespace NKikimr::NMiniKQL; + +struct TOutput { + TUnboxedValueDeque Data; + using TPtr = std::shared_ptr<TOutput>; +}; + +class TS3ReadCoroImpl : public TActorCoroImpl { +private: + class TCoroStreamWrapper: public TMutableComputationNode<TCoroStreamWrapper> { + using TBaseComputation = TMutableComputationNode<TCoroStreamWrapper>; + public: + class TStreamValue : public TComputationValue<TStreamValue> { + public: + using TBase = TComputationValue<TStreamValue>; + + TStreamValue(TMemoryUsageInfo* memInfo, TS3ReadCoroImpl* coro) + : TBase(memInfo), Coro(coro) + {} + + private: + NUdf::EFetchStatus Fetch(NUdf::TUnboxedValue& value) final { + return Coro->Next(value) ? NUdf::EFetchStatus::Ok : NUdf::EFetchStatus::Finish; + } + + private: + TS3ReadCoroImpl *const Coro; + }; + + TCoroStreamWrapper(TComputationMutables& mutables, TS3ReadCoroImpl* coro) + : TBaseComputation(mutables), Coro(coro) + {} + + NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const { + return ctx.HolderFactory.Create<TStreamValue>(Coro); + } + + static IComputationNode* Make(TCallable&, const TComputationNodeFactoryContext& ctx, TS3ReadCoroImpl* coro) { + return new TCoroStreamWrapper(ctx.Mutables, coro); + } + private: + void RegisterDependencies() const final {} + TS3ReadCoroImpl *const Coro; + }; + +public: + TS3ReadCoroImpl(const TTypeEnvironment& typeEnv, const IFunctionRegistry& functionRegistry, ui64 inputIndex, const NActors::TActorId& computeActorId, ui64, TString format, TString rowType, TOutput::TPtr outputs) + : TActorCoroImpl(256_KB), TypeEnv(typeEnv), FunctionRegistry(functionRegistry), InputIndex(inputIndex), Format(std::move(format)), RowType(std::move(rowType)), ComputeActorId(computeActorId), Outputs(std::move(outputs)) + {} + + bool Next(NUdf::TUnboxedValue& value) { + TypeEnv.GetAllocator().Release(); + const auto ev = WaitForSpecificEvent<TEvPrivate::TEvReadResult, TEvPrivate::TEvReadError, TEvPrivate::TEvReadFinished>(); + TypeEnv.GetAllocator().Acquire(); + switch (const auto etype = ev->GetTypeRewrite()) { + case TEvPrivate::TEvReadFinished::EventType: + return false; + case TEvPrivate::TEvReadError::EventType: + return false; + case TEvPrivate::TEvReadResult::EventType: + value = NUdf::TUnboxedValue(MakeString(NUdf::TStringRef(std::string_view(ev->Get<TEvPrivate::TEvReadResult>()->Result)))); + return true; + default: + return false; + } + } +private: + void Run() final { + try { + const auto randStub = CreateDeterministicRandomProvider(1); + const auto timeStub = CreateDeterministicTimeProvider(10000000); + + const auto alloc = TypeEnv.BindAllocator(); + const auto pb = std::make_unique<TProgramBuilder>(TypeEnv, FunctionRegistry); + + TCallableBuilder callableBuilder(TypeEnv, "CoroStream", pb->NewStreamType(pb->NewDataType(NUdf::EDataSlot::String))); + + const auto factory = [this](TCallable& callable, const TComputationNodeFactoryContext& ctx) -> IComputationNode* { + return callable.GetType()->GetName() == "CoroStream" ? + TCoroStreamWrapper::Make(callable, ctx, this) : GetBuiltinFactory()(callable, ctx); + }; + + TRuntimeNode stream(callableBuilder.Build(), false); + + const auto outputItemType = NCommon::ParseTypeFromYson(TStringBuf(RowType), *pb, Cerr); + const auto userType = pb->NewTupleType({pb->NewTupleType({pb->NewDataType(NUdf::EDataSlot::String)}), pb->NewStructType({}), outputItemType}); + const auto root = pb->Apply(pb->Udf("ClickHouseClient.ParseFormat", {}, userType, Format), {stream}); + + TExploringNodeVisitor explorer; + explorer.Walk(root.GetNode(), TypeEnv); + TComputationPatternOpts opts(TypeEnv.GetAllocator().Ref(), TypeEnv, factory, &FunctionRegistry, NUdf::EValidateMode::None, NUdf::EValidatePolicy::Exception, "OFF", EGraphPerProcess::Single); + const auto pattern = MakeComputationPattern(explorer, root, {}, opts); + const auto graph = pattern->Clone(opts.ToComputationOptions(*randStub, *timeStub)); + const TBindTerminator bind(graph->GetTerminator()); + + const auto output = graph->GetValue(); + for (NUdf::TUnboxedValue v; NUdf::EFetchStatus::Ok == output.Fetch(v);) + Outputs->Data.emplace_back(std::move(v)); + + Outputs.reset(); + } catch (const yexception& err) { + Cerr << __func__ << " exception " << err.what() << Endl; + return; + } + + Y_UNUSED(WaitForSpecificEvent<TEvPrivate::TEvReadDone>()); + } + + void ProcessUnexpectedEvent(TAutoPtr<IEventHandle>) final { + Send(ComputeActorId, new IDqSourceActor::TEvSourceError(InputIndex, TIssues{TIssue("Unexpected event")}, true)); + } +private: + const TTypeEnvironment& TypeEnv; + const IFunctionRegistry& FunctionRegistry; + const ui64 InputIndex; + const TString Format, RowType, Compression; + const NActors::TActorId ComputeActorId; + TOutput::TPtr Outputs; +}; + +class TS3ReadCoroActor : public TActorCoro, public IDqSourceActor { +public: + using TPath = std::tuple<TString, size_t>; + using TPathList = std::vector<TPath>; + + TS3ReadCoroActor(THolder<TS3ReadCoroImpl> impl, + ui64 inputIndex, + IHTTPGateway::TPtr gateway, + const TString& url, + const TString& token, + TPathList&& paths, + TOutput::TPtr outputs) + : TActorCoro(THolder<TActorCoroImpl>(impl.Release())) + , InputIndex(inputIndex) + , Gateway(std::move(gateway)) + , Url(url) + , Headers(MakeHeader(token)) + , Paths(std::move(paths)) + , Outputs(std::move(outputs)) + {} + + static constexpr char ActorName[] = "S3_READ_ACTOR_CORO"; +private: + void SaveState(const NDqProto::TCheckpoint&, NDqProto::TSourceState&) final {} + void LoadState(const NDqProto::TSourceState&) final {} + void CommitState(const NDqProto::TCheckpoint&) final {} + ui64 GetInputIndex() const final { return InputIndex; } + + i64 GetSourceData(NKikimr::NMiniKQL::TUnboxedValueVector& buffer, bool& finished, i64) final { + i64 total = Outputs->Data.size(); + std::move(Outputs->Data.begin(), Outputs->Data.end(), std::back_inserter(buffer)); + Outputs->Data.clear(); + + if (Outputs.unique()) { + finished = true; + Send(SelfId(), new TEvPrivate::TEvReadDone); + } + + return total; + } + + void PassAway() final { +//Todo return TActorCoro::PassAway(); + } + + + static void OnNewData(TActorSystem* actorSystem, TActorId selfId, IHTTPGateway::TContent&& data, size_t pathInd) { + actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvReadResult(std::move(data), pathInd))); + } + + static void OnDownloadFinished(TActorSystem* actorSystem, TActorId selfId, std::optional<TIssues> result, size_t pathInd) { + if (result) + actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvReadError(TIssues{*result}, pathInd))); + else + actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvReadFinished(pathInd))); + } + + TAutoPtr<IEventHandle> AfterRegister(const TActorId& self, const TActorId& parent) { + for (size_t pathInd = 0; pathInd < Paths.size(); ++pathInd) { + const auto& path = Paths[pathInd]; + Gateway->Download(Url + std::get<TString>(path), + Headers, std::get<size_t>(path), + std::bind(&TS3ReadCoroActor::OnNewData, TActivationContext::ActorSystem(), self, std::placeholders::_1, pathInd), + std::bind(&TS3ReadCoroActor::OnDownloadFinished, TActivationContext::ActorSystem(), self, std::placeholders::_1, pathInd)); + }; + + return TActorCoro::AfterRegister(self, parent); + } + + static IHTTPGateway::THeaders MakeHeader(const TString& token) { + return token.empty() ? IHTTPGateway::THeaders() : IHTTPGateway::THeaders{TString("X-YaCloud-SubjectToken:") += token}; + } + + const ui64 InputIndex; + + const IHTTPGateway::TPtr Gateway; + + const TString Url; + const IHTTPGateway::THeaders Headers; + const TPathList Paths; + const TOutput::TPtr Outputs; +}; + std::pair<NYql::NDq::IDqSourceActor*, IActor*> CreateS3ReadActor( + const TTypeEnvironment& typeEnv, + const IFunctionRegistry& functionRegistry, IHTTPGateway::TPtr gateway, NS3::TSource&& params, ui64 inputIndex, @@ -254,8 +480,16 @@ std::pair<NYql::NDq::IDqSourceActor*, IActor*> CreateS3ReadActor( const auto token = secureParams.Value(params.GetToken(), TString{}); const auto credentialsProviderFactory = CreateCredentialsProviderFactoryForStructuredToken(credentialsFactory, token); const auto authToken = credentialsProviderFactory->CreateProvider()->GetAuthInfo(); - const auto actor = new TS3ReadActor(inputIndex, std::move(gateway), params.GetUrl(), authToken, std::move(paths), computeActorId, retryConfig); - return {actor, actor}; + + if (params.HasFormat()) { + auto outputs = std::make_shared<TOutput>(); + auto impl = MakeHolder<TS3ReadCoroImpl>(typeEnv, functionRegistry, inputIndex, computeActorId, paths.size(), params.GetFormat(), params.GetRowType(), outputs); + const auto actor = new TS3ReadCoroActor(std::move(impl), inputIndex, std::move(gateway), params.GetUrl(), authToken, std::move(paths), outputs); + return {actor, actor}; + } else { + const auto actor = new TS3ReadActor(inputIndex, std::move(gateway), params.GetUrl(), authToken, std::move(paths), computeActorId, retryConfig); + return {actor, actor}; + } } } // namespace NYql::NDq diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h index 685ddf7e41..1edfb68ddf 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h +++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h @@ -10,6 +10,8 @@ namespace NYql::NDq { std::pair<NYql::NDq::IDqSourceActor*, NActors::IActor*> CreateS3ReadActor( + const NKikimr::NMiniKQL::TTypeEnvironment& typeEnv, + const NKikimr::NMiniKQL::IFunctionRegistry& functionRegistry, IHTTPGateway::TPtr gateway, NS3::TSource&& params, ui64 inputIndex, diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.cpp index 5319a692a5..f24dbda547 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.cpp @@ -12,7 +12,7 @@ void RegisterS3ReadActorFactory( const std::shared_ptr<NYql::NS3::TRetryConfig>& retryConfig) { factory.Register<NS3::TSource>("S3Source", [credentialsFactory, gateway, retryConfig](NS3::TSource&& settings, IDqSourceActorFactory::TArguments&& args) { - return CreateS3ReadActor(gateway, std::move(settings), args.InputIndex, args.SecureParams, args.TaskParams, args.ComputeActorId, credentialsFactory, retryConfig); + return CreateS3ReadActor(args.TypeEnv, *args.HolderFactory.GetFunctionRegistry(), gateway, std::move(settings), args.InputIndex, args.SecureParams, args.TaskParams, args.ComputeActorId, credentialsFactory, retryConfig); }); } diff --git a/ydb/library/yql/providers/s3/proto/source.proto b/ydb/library/yql/providers/s3/proto/source.proto index bb8786851d..73a89ec12d 100644 --- a/ydb/library/yql/providers/s3/proto/source.proto +++ b/ydb/library/yql/providers/s3/proto/source.proto @@ -12,4 +12,7 @@ message TSource { string Url = 1; string Token = 2; repeated TPath Path = 3; + optional string RowType = 4; + optional string Format = 5; + optional string Compression = 6; } diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp index 87a0b23234..3106467d79 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp @@ -3,6 +3,7 @@ #include <ydb/library/yql/dq/expr_nodes/dq_expr_nodes.h> #include <ydb/library/yql/providers/common/dq/yql_dq_integration_impl.h> +#include <ydb/library/yql/providers/common/schema/expr/yql_expr_schema.h> #include <ydb/library/yql/providers/dq/common/yql_dq_settings.h> #include <ydb/library/yql/providers/dq/expr_nodes/dqs_expr_nodes.h> #include <ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.h> @@ -151,6 +152,15 @@ public: p->SetSize(FromString<ui64>(paths.Item(i).Size().Value())); } + if (const auto mayParseSettings = settings.Maybe<TS3ParseSettings>()) { + const auto parseSettings = mayParseSettings.Cast(); + srcDesc.SetFormat(parseSettings.Format().StringValue().c_str()); + srcDesc.SetRowType(NCommon::WriteTypeToYson(parseSettings.RowType().Ref().GetTypeAnn()->Cast<TTypeExprType>()->GetType(), NYT::NYson::EYsonFormat::Text)); + + if (const auto compression = parseSettings.Compression()) + srcDesc.SetCompression(compression.Cast().StringValue().c_str()); + } + protoSettings.PackFrom(srcDesc); sourceType = "S3Source"; } |