diff options
author | a-romanov <a-romanov@yandex-team.ru> | 2022-03-21 17:32:52 +0300 |
---|---|---|
committer | a-romanov <a-romanov@yandex-team.ru> | 2022-03-21 17:32:52 +0300 |
commit | cc6a2b1d8898f6908b30857eff17a9f860adb835 (patch) | |
tree | 9eed9e644b42d938f5dc5380f489f5fa1fb5e1ee | |
parent | 3f2a0c7d35ea5aef609e266ea0e770c383115dca (diff) | |
download | ydb-cc6a2b1d8898f6908b30857eff17a9f860adb835.tar.gz |
YQ-727 S3 Source coro actor.
ref:a41532935edfdddaec5d5059dbaa1a0cec23f08a
-rw-r--r-- | ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp | 209 |
1 files changed, 126 insertions, 83 deletions
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp index 32e88705f11..4c3dc66c270 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp @@ -38,7 +38,6 @@ struct TEvPrivate { EvReadFinished, EvReadError, EvRetry, - EvReadDone, EvEnd }; @@ -67,11 +66,10 @@ struct TEvPrivate { explicit TEvRetryEvent(size_t pathIndex) : PathIndex(pathIndex) {} const size_t PathIndex = 0; }; - - struct TEvReadDone : public TEventLocal<TEvReadDone, EvReadDone> {}; }; -} // namespace +using TPath = std::tuple<TString, size_t>; +using TPathList = std::vector<TPath>; class TS3ReadActor : public TActorBootstrapped<TS3ReadActor>, public IDqSourceActor { private: @@ -106,9 +104,6 @@ private: double Epsilon = 0.1; }; public: -using TPath = std::tuple<TString, size_t>; -using TPathList = std::vector<TPath>; - TS3ReadActor(ui64 inputIndex, IHTTPGateway::TPtr gateway, const TString& url, @@ -302,54 +297,51 @@ public: case TEvPrivate::TEvReadFinished::EventType: return false; case TEvPrivate::TEvReadError::EventType: + Send(ComputeActorId, new IDqSourceActor::TEvSourceError(InputIndex, ev->Get<TEvPrivate::TEvReadError>()->Error, true)); return false; case TEvPrivate::TEvReadResult::EventType: - value = NUdf::TUnboxedValue(MakeString(NUdf::TStringRef(std::string_view(ev->Get<TEvPrivate::TEvReadResult>()->Result)))); + value = MakeString(NUdf::TStringRef(std::string_view(ev->Get<TEvPrivate::TEvReadResult>()->Result))); return true; default: return false; } } private: - void Run() final { - try { - const auto randStub = CreateDeterministicRandomProvider(1); - const auto timeStub = CreateDeterministicTimeProvider(10000000); - - const auto alloc = TypeEnv.BindAllocator(); - const auto pb = std::make_unique<TProgramBuilder>(TypeEnv, FunctionRegistry); + void Run() final try { + const auto randStub = CreateDeterministicRandomProvider(1); + const auto timeStub = CreateDeterministicTimeProvider(10000000); - TCallableBuilder callableBuilder(TypeEnv, "CoroStream", pb->NewStreamType(pb->NewDataType(NUdf::EDataSlot::String))); + const auto alloc = TypeEnv.BindAllocator(); + const auto pb = std::make_unique<TProgramBuilder>(TypeEnv, FunctionRegistry); - const auto factory = [this](TCallable& callable, const TComputationNodeFactoryContext& ctx) -> IComputationNode* { - return callable.GetType()->GetName() == "CoroStream" ? - TCoroStreamWrapper::Make(callable, ctx, this) : GetBuiltinFactory()(callable, ctx); - }; + TCallableBuilder callableBuilder(TypeEnv, "CoroStream", pb->NewStreamType(pb->NewDataType(NUdf::EDataSlot::String))); - TRuntimeNode stream(callableBuilder.Build(), false); + const auto factory = [this](TCallable& callable, const TComputationNodeFactoryContext& ctx) -> IComputationNode* { + return callable.GetType()->GetName() == "CoroStream" ? + TCoroStreamWrapper::Make(callable, ctx, this) : GetBuiltinFactory()(callable, ctx); + }; - const auto outputItemType = NCommon::ParseTypeFromYson(TStringBuf(RowType), *pb, Cerr); - const auto userType = pb->NewTupleType({pb->NewTupleType({pb->NewDataType(NUdf::EDataSlot::String)}), pb->NewStructType({}), outputItemType}); - const auto root = pb->Apply(pb->Udf("ClickHouseClient.ParseFormat", {}, userType, Format), {stream}); + TRuntimeNode stream(callableBuilder.Build(), false); - TExploringNodeVisitor explorer; - explorer.Walk(root.GetNode(), TypeEnv); - TComputationPatternOpts opts(TypeEnv.GetAllocator().Ref(), TypeEnv, factory, &FunctionRegistry, NUdf::EValidateMode::None, NUdf::EValidatePolicy::Exception, "OFF", EGraphPerProcess::Single); - const auto pattern = MakeComputationPattern(explorer, root, {}, opts); - const auto graph = pattern->Clone(opts.ToComputationOptions(*randStub, *timeStub)); - const TBindTerminator bind(graph->GetTerminator()); + const auto outputItemType = NCommon::ParseTypeFromYson(TStringBuf(RowType), *pb, Cerr); + const auto userType = pb->NewTupleType({pb->NewTupleType({pb->NewDataType(NUdf::EDataSlot::String)}), pb->NewStructType({}), outputItemType}); + const auto root = pb->Apply(pb->Udf("ClickHouseClient.ParseFormat", {}, userType, Format), {stream}); - const auto output = graph->GetValue(); - for (NUdf::TUnboxedValue v; NUdf::EFetchStatus::Ok == output.Fetch(v);) - Outputs->Data.emplace_back(std::move(v)); + TExploringNodeVisitor explorer; + explorer.Walk(root.GetNode(), TypeEnv); + TComputationPatternOpts opts(TypeEnv.GetAllocator().Ref(), TypeEnv, factory, &FunctionRegistry, NUdf::EValidateMode::None, NUdf::EValidatePolicy::Exception, "OFF", EGraphPerProcess::Single); + const auto pattern = MakeComputationPattern(explorer, root, {}, opts); + const auto graph = pattern->Clone(opts.ToComputationOptions(*randStub, *timeStub)); + const TBindTerminator bind(graph->GetTerminator()); - Outputs.reset(); - } catch (const yexception& err) { - Cerr << __func__ << " exception " << err.what() << Endl; - return; - } + const auto output = graph->GetValue(); + for (NUdf::TUnboxedValue v; NUdf::EFetchStatus::Ok == output.Fetch(v);) + Outputs->Data.emplace_back(std::move(v)); - Y_UNUSED(WaitForSpecificEvent<TEvPrivate::TEvReadDone>()); + Outputs.reset(); + } catch (const yexception& err) { + Send(ComputeActorId, new IDqSourceActor::TEvSourceError(InputIndex, TIssues{TIssue(err.what())}, true)); + return; } void ProcessUnexpectedEvent(TAutoPtr<IEventHandle>) final { @@ -364,28 +356,93 @@ private: TOutput::TPtr Outputs; }; -class TS3ReadCoroActor : public TActorCoro, public IDqSourceActor { +class TS3ReadCoroActor : public TActorCoro { public: - using TPath = std::tuple<TString, size_t>; - using TPathList = std::vector<TPath>; - TS3ReadCoroActor(THolder<TS3ReadCoroImpl> impl, + IHTTPGateway::TPtr gateway, + const TString& url, + const IHTTPGateway::THeaders& headers, + const TString& path, + const std::size_t expectedSize, + TOutput::TPtr outputs) + : TActorCoro(THolder<TActorCoroImpl>(impl.Release())) + , Gateway(std::move(gateway)) + , Url(url) + , Headers(headers) + , Path(path) + , ExpectedSize(expectedSize) + , Outputs(std::move(outputs)) + {} +private: + static void OnNewData(TActorSystem* actorSystem, TActorId selfId, IHTTPGateway::TContent&& data) { + actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvReadResult(std::move(data), 0))); + } + + static void OnDownloadFinished(TActorSystem* actorSystem, TActorId selfId, std::optional<TIssues> result) { + if (result) + actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvReadError(TIssues{*result}, 0))); + else + actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvReadFinished(0))); + } + + TAutoPtr<IEventHandle> AfterRegister(const TActorId& self, const TActorId& parent) { + Gateway->Download(Url + Path, + Headers, ExpectedSize, + std::bind(&TS3ReadCoroActor::OnNewData, TActivationContext::ActorSystem(), self, std::placeholders::_1), + std::bind(&TS3ReadCoroActor::OnDownloadFinished, TActivationContext::ActorSystem(), self, std::placeholders::_1)); + + return TActorCoro::AfterRegister(self, parent); + } + + static IHTTPGateway::THeaders MakeHeader(const TString& token) { + return token.empty() ? IHTTPGateway::THeaders() : IHTTPGateway::THeaders{TString("X-YaCloud-SubjectToken:") += token}; + } + + const IHTTPGateway::TPtr Gateway; + + const TString Url; + const IHTTPGateway::THeaders Headers; + const TString Path; + const std::size_t ExpectedSize; + TOutput::TPtr Outputs; +}; + +class TS3StreamReadActor : public TActorBootstrapped<TS3StreamReadActor>, public IDqSourceActor { +public: + TS3StreamReadActor( + const TTypeEnvironment& typeEnv, + const IFunctionRegistry& functionRegistry, ui64 inputIndex, IHTTPGateway::TPtr gateway, const TString& url, const TString& token, TPathList&& paths, - TOutput::TPtr outputs) - : TActorCoro(THolder<TActorCoroImpl>(impl.Release())) - , InputIndex(inputIndex) + TString format, + TString rowType, + const NActors::TActorId& computeActorId + ) : TypeEnv(typeEnv) + , FunctionRegistry(functionRegistry) , Gateway(std::move(gateway)) + , InputIndex(inputIndex) + , ComputeActorId(computeActorId) , Url(url) , Headers(MakeHeader(token)) , Paths(std::move(paths)) - , Outputs(std::move(outputs)) + , Format(format) + , RowType(rowType) + , Outputs(std::make_shared<TOutput>()) {} - static constexpr char ActorName[] = "S3_READ_ACTOR_CORO"; + void Bootstrap() { + for (size_t pathInd = 0; pathInd < Paths.size(); ++pathInd) { + const TPath& path = Paths[pathInd]; + auto impl = MakeHolder<TS3ReadCoroImpl>(TypeEnv, FunctionRegistry, InputIndex, ComputeActorId, Paths.size(), Format, RowType, Outputs); + RegisterWithSameMailbox(MakeHolder<TS3ReadCoroActor>(std::move(impl), Gateway, Url, Headers, std::get<TString>(path), std::get<std::size_t>(path), Outputs).Release()); + }; + } + + static constexpr char ActorName[] = "S3_READ_ACTOR"; + private: void SaveState(const NDqProto::TCheckpoint&, NDqProto::TSourceState&) final {} void LoadState(const NDqProto::TSourceState&) final {} @@ -393,60 +450,48 @@ private: ui64 GetInputIndex() const final { return InputIndex; } i64 GetSourceData(NKikimr::NMiniKQL::TUnboxedValueVector& buffer, bool& finished, i64) final { + if (!Outputs) + return 0LL; + i64 total = Outputs->Data.size(); std::move(Outputs->Data.begin(), Outputs->Data.end(), std::back_inserter(buffer)); Outputs->Data.clear(); if (Outputs.unique()) { finished = true; - Send(SelfId(), new TEvPrivate::TEvReadDone); + Outputs.reset(); } return total; } - void PassAway() final { -//Todo return TActorCoro::PassAway(); - } - - - static void OnNewData(TActorSystem* actorSystem, TActorId selfId, IHTTPGateway::TContent&& data, size_t pathInd) { - actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvReadResult(std::move(data), pathInd))); - } - - static void OnDownloadFinished(TActorSystem* actorSystem, TActorId selfId, std::optional<TIssues> result, size_t pathInd) { - if (result) - actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvReadError(TIssues{*result}, pathInd))); - else - actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvReadFinished(pathInd))); - } - - TAutoPtr<IEventHandle> AfterRegister(const TActorId& self, const TActorId& parent) { - for (size_t pathInd = 0; pathInd < Paths.size(); ++pathInd) { - const auto& path = Paths[pathInd]; - Gateway->Download(Url + std::get<TString>(path), - Headers, std::get<size_t>(path), - std::bind(&TS3ReadCoroActor::OnNewData, TActivationContext::ActorSystem(), self, std::placeholders::_1, pathInd), - std::bind(&TS3ReadCoroActor::OnDownloadFinished, TActivationContext::ActorSystem(), self, std::placeholders::_1, pathInd)); - }; - - return TActorCoro::AfterRegister(self, parent); + // IActor & IDqSourceActor + void PassAway() override { // Is called from Compute Actor + TActorBootstrapped<TS3StreamReadActor>::PassAway(); } static IHTTPGateway::THeaders MakeHeader(const TString& token) { return token.empty() ? IHTTPGateway::THeaders() : IHTTPGateway::THeaders{TString("X-YaCloud-SubjectToken:") += token}; } - - const ui64 InputIndex; +private: + const TTypeEnvironment& TypeEnv; + const IFunctionRegistry& FunctionRegistry; const IHTTPGateway::TPtr Gateway; + const ui64 InputIndex; + const NActors::TActorId ComputeActorId; + const TString Url; const IHTTPGateway::THeaders Headers; const TPathList Paths; - const TOutput::TPtr Outputs; + const TString Format, RowType, Compression; + + TOutput::TPtr Outputs; }; +} // namespace + std::pair<NYql::NDq::IDqSourceActor*, IActor*> CreateS3ReadActor( const TTypeEnvironment& typeEnv, const IFunctionRegistry& functionRegistry, @@ -463,7 +508,7 @@ std::pair<NYql::NDq::IDqSourceActor*, IActor*> CreateS3ReadActor( for (auto i = 0; i < params.GetPath().size(); ++i) map.emplace(params.GetPath().Get(i).GetPath(), params.GetPath().Get(i).GetSize()); - TS3ReadActor::TPathList paths; + TPathList paths; paths.reserve(map.size()); if (const auto taskParamsIt = taskParams.find(S3ProviderName); taskParamsIt != taskParams.cend()) { NS3::TRange range; @@ -481,10 +526,8 @@ std::pair<NYql::NDq::IDqSourceActor*, IActor*> CreateS3ReadActor( const auto credentialsProviderFactory = CreateCredentialsProviderFactoryForStructuredToken(credentialsFactory, token); const auto authToken = credentialsProviderFactory->CreateProvider()->GetAuthInfo(); - if (params.HasFormat()) { - auto outputs = std::make_shared<TOutput>(); - auto impl = MakeHolder<TS3ReadCoroImpl>(typeEnv, functionRegistry, inputIndex, computeActorId, paths.size(), params.GetFormat(), params.GetRowType(), outputs); - const auto actor = new TS3ReadCoroActor(std::move(impl), inputIndex, std::move(gateway), params.GetUrl(), authToken, std::move(paths), outputs); + if (params.HasFormat() && params.HasRowType()) { + const auto actor = new TS3StreamReadActor(typeEnv, functionRegistry, inputIndex, std::move(gateway), params.GetUrl(), authToken, std::move(paths), params.GetFormat(), params.GetRowType(), computeActorId); return {actor, actor}; } else { const auto actor = new TS3ReadActor(inputIndex, std::move(gateway), params.GetUrl(), authToken, std::move(paths), computeActorId, retryConfig); |