diff options
author | mrlolthe1st <mrlolthe1st@yandex-team.com> | 2022-10-18 17:52:46 +0300 |
---|---|---|
committer | mrlolthe1st <mrlolthe1st@yandex-team.com> | 2022-10-18 17:52:46 +0300 |
commit | 88df45386dd60cd3e11f6087c62f6d5441b9b62a (patch) | |
tree | b35f492149807e008eb31769dbbf92f790513f39 | |
parent | 86f150b73356104e0b24527cb89c03adb1751829 (diff) | |
download | ydb-88df45386dd60cd3e11f6087c62f6d5441b9b62a.tar.gz |
Simulating S3 read from local fs
Simulating S3 from local files
7 files changed, 141 insertions, 42 deletions
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp index d25cfb788a..c1af7c69d5 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp @@ -14,6 +14,7 @@ #include <ydb/library/yql/udfs/common/clickhouse/client/src/DataTypes/DataTypesNumber.h> #include <ydb/library/yql/udfs/common/clickhouse/client/src/IO/ReadBuffer.h> +#include <ydb/library/yql/udfs/common/clickhouse/client/src/IO/ReadBufferFromFile.h> #include <ydb/library/yql/udfs/common/clickhouse/client/src/Core/Block.h> #include <ydb/library/yql/udfs/common/clickhouse/client/src/Core/ColumnsWithTypeAndName.h> @@ -106,6 +107,7 @@ struct TEvPrivate { EvReadError, EvRetry, EvNextBlock, + EvBlockProcessed, EvEnd }; @@ -152,10 +154,16 @@ struct TEvPrivate { }; struct TEvNextBlock : public NActors::TEventLocal<TEvNextBlock, EvNextBlock> { - TEvNextBlock(NDB::Block& block, size_t pathInd) : PathIndex(pathInd) { Block.swap(block); } + TEvNextBlock(NDB::Block& block, size_t pathInd, std::function<void()> functor = [](){}) : PathIndex(pathInd), Functor(functor) { Block.swap(block); } NDB::Block Block; const size_t PathIndex; + std::function<void()> Functor; }; + + struct TEvBlockProcessed : public NActors::TEventLocal<TEvBlockProcessed, EvBlockProcessed> { + TEvBlockProcessed() {} + }; + }; using namespace NKikimr::NMiniKQL; @@ -409,8 +417,8 @@ private: static constexpr std::string_view TruncatedSuffix = "... [truncated]"sv; public: - TS3ReadCoroImpl(ui64 inputIndex, const TTxId& txId, const NActors::TActorId& computeActorId, const TRetryStuff::TPtr& retryStuff, const TReadSpec::TPtr& readSpec, size_t pathIndex, const TString& path) - : TActorCoroImpl(256_KB), InputIndex(inputIndex), TxId(txId), RetryStuff(retryStuff), ReadSpec(readSpec), ComputeActorId(computeActorId), PathIndex(pathIndex), Path(path) + TS3ReadCoroImpl(ui64 inputIndex, const TTxId& txId, const NActors::TActorId& computeActorId, const TRetryStuff::TPtr& retryStuff, const TReadSpec::TPtr& readSpec, size_t pathIndex, const TString& path, const TString& url, const std::size_t maxBlocksInFly) + : TActorCoroImpl(256_KB), InputIndex(inputIndex), TxId(txId), RetryStuff(retryStuff), ReadSpec(readSpec), ComputeActorId(computeActorId), PathIndex(pathIndex), Path(path), Url(url), MaxBlocksInFly(maxBlocksInFly) {} bool Next(TString& value) { @@ -500,24 +508,52 @@ private: NYql::NDqProto::StatusIds::StatusCode fatalCode = NYql::NDqProto::StatusIds::EXTERNAL_ERROR; TIssue exceptIssue; + bool isLocal = Url.StartsWith("file://"); try { - TReadBufferFromStream buffer(this); - const auto decompress(MakeDecompressor(buffer, ReadSpec->Compression)); + std::unique_ptr<NDB::ReadBuffer> buffer; + if (isLocal) { + buffer = std::make_unique<NDB::ReadBufferFromFile>(Url.substr(7) + Path); + } else { + buffer = std::make_unique<TReadBufferFromStream>(this); + } + const auto decompress(MakeDecompressor(*buffer, ReadSpec->Compression)); YQL_ENSURE(ReadSpec->Compression.empty() == !decompress, "Unsupported " << ReadSpec->Compression << " compression."); - NDB::InputStreamFromInputFormat stream(NDB::FormatFactory::instance().getInputFormat(ReadSpec->Format, decompress ? *decompress : buffer, NDB::Block(ReadSpec->Columns), nullptr, 1_MB, ReadSpec->Settings)); - - while (auto block = stream.read()) { - Send(ParentActorId, new TEvPrivate::TEvNextBlock(block, PathIndex)); + NDB::InputStreamFromInputFormat stream(NDB::FormatFactory::instance().getInputFormat(ReadSpec->Format, decompress ? *decompress : *buffer, NDB::Block(ReadSpec->Columns), nullptr, 1_MB, ReadSpec->Settings)); + auto actorSystem = GetActorSystem(); + auto selfActorId = SelfActorId; + size_t cntBlocksInFly = 0; + if (isLocal) { + while (auto block = stream.read()) { + if (++cntBlocksInFly > MaxBlocksInFly) { + WaitForSpecificEvent<TEvPrivate::TEvBlockProcessed>(); + --cntBlocksInFly; + } + Send(ParentActorId, new TEvPrivate::TEvNextBlock(block, PathIndex, [actorSystem, selfActorId]() { + actorSystem->Send(new IEventHandle(selfActorId, selfActorId, new TEvPrivate::TEvBlockProcessed())); + })); + } + while (cntBlocksInFly--) { + WaitForSpecificEvent<TEvPrivate::TEvBlockProcessed>(); + } + } else { + while (auto block = stream.read()) { + Send(ParentActorId, new TEvPrivate::TEvNextBlock(block, PathIndex)); + } } + } catch (const TS3ReadError&) { // Finish reading. Add error from server to issues + } catch (const TDtorException&) { + throw; } catch (const std::exception& err) { exceptIssue.Message = TStringBuilder() << "Error while reading file " << Path << ", details: " << err.what(); fatalCode = NYql::NDqProto::StatusIds::INTERNAL_ERROR; RetryStuff->Cancel(); } - WaitFinish(); + if (!isLocal) { + WaitFinish(); + } if (!ErrorText.empty()) { TString errorCode; @@ -583,6 +619,7 @@ private: return result; } + private: const ui64 InputIndex; const TTxId TxId; @@ -592,6 +629,7 @@ private: const NActors::TActorId ComputeActorId; const size_t PathIndex; const TString Path; + const TString Url; bool InputFinished = false; long HttpResponseCode = 0L; @@ -601,6 +639,7 @@ private: std::size_t LastOffset = 0; TString LastData; + std::size_t MaxBlocksInFly = 2; }; class TS3ReadCoroActor : public TActorCoro { @@ -612,8 +651,10 @@ public: private: void Registered(TActorSystem* actorSystem, const TActorId& parent) override { TActorCoro::Registered(actorSystem, parent); // Calls TActorCoro::OnRegister and sends bootstrap event to ourself. - LOG_DEBUG_S(*actorSystem, NKikimrServices::KQP_COMPUTE, "TS3ReadCoroActor" << ": " << SelfId() << ", TxId: " << RetryStuff->TxId << ". " << "Start Download, Url: " << RetryStuff->Url << ", Offset: " << RetryStuff->Offset); - DownloadStart(RetryStuff, actorSystem, SelfId(), parent); + if (RetryStuff->Url.substr(0, 6) != "file://") { + LOG_DEBUG_S(*actorSystem, NKikimrServices::KQP_COMPUTE, "TS3ReadCoroActor" << ": " << SelfId() << ", TxId: " << RetryStuff->TxId << ". " << "Start Download, Url: " << RetryStuff->Url << ", Offset: " << RetryStuff->Offset); + DownloadStart(RetryStuff, actorSystem, SelfId(), parent); + } } static IHTTPGateway::THeaders MakeHeader(const TString& token) { @@ -637,7 +678,8 @@ public: ui64 startPathIndex, const TReadSpec::TPtr& readSpec, const NActors::TActorId& computeActorId, - const IRetryPolicy<long>::TPtr& retryPolicy + const IRetryPolicy<long>::TPtr& retryPolicy, + const std::size_t maxBlocksInFly ) : Gateway(std::move(gateway)) , HolderFactory(holderFactory) , InputIndex(inputIndex) @@ -651,16 +693,18 @@ public: , StartPathIndex(startPathIndex) , ReadSpec(readSpec) , Count(Paths.size()) + , MaxBlocksInFly(maxBlocksInFly) {} void Bootstrap() { LOG_D("TS3StreamReadActor", "Bootstrap"); Become(&TS3StreamReadActor::StateFunc); for (size_t pathInd = 0; pathInd < Paths.size(); ++pathInd) { + const TPath& path = Paths[pathInd]; auto stuff = std::make_shared<TRetryStuff>(Gateway, Url + std::get<TString>(path), Headers, std::get<std::size_t>(path), TxId, RetryPolicy); RetryStuffForFile.push_back(stuff); - auto impl = MakeHolder<TS3ReadCoroImpl>(InputIndex, TxId, ComputeActorId, stuff, ReadSpec, pathInd + StartPathIndex, std::get<TString>(path)); + auto impl = MakeHolder<TS3ReadCoroImpl>(InputIndex, TxId, ComputeActorId, stuff, ReadSpec, pathInd + StartPathIndex, std::get<TString>(path), Url, MaxBlocksInFly); RegisterWithSameMailbox(std::make_unique<TS3ReadCoroActor>(std::move(impl), std::move(stuff)).release()); } } @@ -670,8 +714,8 @@ public: private: class TBoxedBlock : public TComputationValue<TBoxedBlock> { public: - TBoxedBlock(TMemoryUsageInfo* memInfo, NDB::Block& block) - : TComputationValue(memInfo) + TBoxedBlock(TMemoryUsageInfo* memInfo, NDB::Block& block, std::function<void()> functor) + : TComputationValue(memInfo), OnDestroyFunctor(functor) { Block.swap(block); } @@ -684,7 +728,22 @@ private: return &Block; } + ~TBoxedBlock() { + if (OnDestroyFunctor) { + OnDestroyFunctor(); + } + } + + NDB::Block Block; + std::function<void()> OnDestroyFunctor; + }; + + class TReadyBlock { + public: + TReadyBlock(TEvPrivate::TEvNextBlock::TPtr& event) : PathInd(event->Get()->PathIndex), Functor (std::move(event->Get()->Functor)) { Block.swap(event->Get()->Block); } NDB::Block Block; + size_t PathInd; + std::function<void()> Functor; }; void SaveState(const NDqProto::TCheckpoint&, NDqProto::TSourceState&) final {} @@ -695,15 +754,14 @@ private: i64 GetAsyncInputData(TUnboxedValueVector& output, TMaybe<TInstant>&, bool& finished, i64 free) final { i64 total = 0LL; if (!Blocks.empty()) do { - auto& block = std::get<NDB::Block>(Blocks.front()); - const i64 s = block.bytes(); + const i64 s = Blocks.front().Block.bytes(); - auto value = HolderFactory.Create<TBoxedBlock>(block); + auto value = HolderFactory.Create<TBoxedBlock>(Blocks.front().Block, Blocks.front().Functor); if (AddPathIndex) { NUdf::TUnboxedValue* tupleItems = nullptr; auto tuple = ContainerCache.NewArray(HolderFactory, 2, tupleItems); *tupleItems++ = value; - *tupleItems++ = NUdf::TUnboxedValuePod(std::get<ui64>(Blocks.front())); + *tupleItems++ = NUdf::TUnboxedValuePod(Blocks.front().PathInd); value = tuple; } @@ -711,7 +769,7 @@ private: total += s; output.emplace_back(std::move(value)); Blocks.pop_front(); - } while (!Blocks.empty() && free > 0LL && std::get<NDB::Block>(Blocks.front()).bytes() <= size_t(free)); + } while (!Blocks.empty() && free > 0LL && Blocks.front().Block.bytes() <= size_t(free)); finished = Blocks.empty() && !Count; if (finished) { @@ -745,11 +803,7 @@ private: } void HandleNextBlock(TEvPrivate::TEvNextBlock::TPtr& next) { - Blocks.emplace_back(); - auto& block = std::get<NDB::Block>(Blocks.back()); - auto& pathInd = std::get<size_t>(Blocks.back()); - block.swap(next->Get()->Block); - pathInd = next->Get()->PathIndex; + Blocks.emplace_back(next); Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvNewAsyncInputDataArrived(InputIndex)); } @@ -782,8 +836,9 @@ private: const bool AddPathIndex; const ui64 StartPathIndex; const TReadSpec::TPtr ReadSpec; - std::deque<std::tuple<NDB::Block, size_t>> Blocks; + std::deque<TReadyBlock> Blocks; ui32 Count; + const std::size_t MaxBlocksInFly; }; using namespace NKikimr::NMiniKQL; @@ -961,9 +1016,12 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor( #undef SET_FLAG #undef SUPPORTED_FLAGS - + std::size_t maxBlocksInFly = 2; + if (const auto it = settings.find("fileReadBlocksInFly"); settings.cend() != it) + maxBlocksInFly = FromString<ui64>(it->second); const auto actor = new TS3StreamReadActor(inputIndex, txId, std::move(gateway), holderFactory, params.GetUrl(), authToken, - std::move(paths), addPathIndex, startPathIndex, readSpec, computeActorId, retryPolicy); + std::move(paths), addPathIndex, startPathIndex, readSpec, computeActorId, retryPolicy, maxBlocksInFly); + return {actor, actor}; } else { ui64 sizeLimit = std::numeric_limits<ui64>::max(); diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp index 8f5f635a22..96a8bf9dea 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp @@ -56,7 +56,7 @@ class TS3IODiscoveryTransformer : public TGraphTransformerBase { public: TS3IODiscoveryTransformer(TS3State::TPtr state, IHTTPGateway::TPtr gateway) : State_(std::move(state)) - , Lister_(IS3Lister::Make(gateway, State_->Configuration->MaxDiscoveryFilesPerQuery, State_->Configuration->MaxInflightListsPerQuery)) + , Lister_(IS3Lister::Make(gateway, State_->Configuration->MaxDiscoveryFilesPerQuery, State_->Configuration->MaxInflightListsPerQuery, State_->Configuration->AllowLocalFiles)) {} TStatus DoTransform(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final { diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_list.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_list.cpp index 38be367bb1..c076000463 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_list.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_list.cpp @@ -14,6 +14,7 @@ #include <library/cpp/threading/future/async_semaphore.h> #include <library/cpp/xml/document/xml-document.h> #include <util/string/builder.h> +#include <util/folder/iterator.h> namespace NYql { @@ -73,9 +74,10 @@ using namespace NThreading; class TS3Lister : public IS3Lister { public: - explicit TS3Lister(const IHTTPGateway::TPtr& httpGateway, ui64 maxFilesPerQuery) + explicit TS3Lister(const IHTTPGateway::TPtr& httpGateway, ui64 maxFilesPerQuery, bool allowLocalFiles) : Gateway(httpGateway) , MaxFilesPerQuery(maxFilesPerQuery) + , AllowLocalFiles(allowLocalFiles) {} private: using TResultFilter = std::function<bool (const TString& path, TVector<TString>& matchedGlobs)>; @@ -224,11 +226,49 @@ private: YQL_CLOG(INFO, ProviderS3) << "Listing of " << urlStr << prefix << " : got exception: " << ex.what(); promise.SetException(std::current_exception()); } - - + #include <iostream> TFuture<TListResult> DoList(const TString& token, const TString& urlStr, const TString& pattern, const TMaybe<TString>& pathPrefix) { TString prefix; TResultFilter filter = MakeFilter(pattern, pathPrefix, prefix); + auto promise = NewPromise<IS3Lister::TListResult>(); + auto future = promise.GetFuture(); + + if (urlStr.substr(0, 7) == "file://") { + try { + if (!AllowLocalFiles) { + ythrow yexception() << "Using local files as DataSource isn't allowed, but trying access " << urlStr; + } + auto fullPath = urlStr.substr(7); + for (const auto &e: TPathSplit(fullPath)) { + if (e == "..") { + ythrow yexception() << "Security violation: trying access parent directory in path"; + } + } + + IS3Lister::TListEntries output; + + for (const auto& entry: TDirIterator(fullPath)) { + if (entry.fts_type != FTS_F) { + continue; + } + + auto filename = TString(entry.fts_path + urlStr.size() - 7); + TVector<TString> matches; + if (filter(filename, matches)) { + output.emplace_back(); + output.back().Path = filename; + output.back().Size = entry.fts_statp->st_size; + output.back().MatchedGlobs.swap(matches); + } + } + + promise.SetValue(std::move(output)); + } catch (const std::exception& ex) { + promise.SetException(std::current_exception()); + } + return future; + } + const auto retryPolicy = GetHTTPDefaultRetryPolicy(); TUrlBuilder urlBuilder(urlStr); @@ -242,9 +282,6 @@ private: headers.emplace_back("X-YaCloud-SubjectToken:" + token); } - auto promise = NewPromise<IS3Lister::TListResult>(); - auto future = promise.GetFuture(); - Gateway->Download( url, std::move(headers), @@ -277,6 +314,7 @@ private: const IHTTPGateway::TPtr Gateway; const ui64 MaxFilesPerQuery; + const bool AllowLocalFiles; }; class TS3ParallelLimitedLister : public IS3Lister { @@ -321,8 +359,8 @@ private: } -IS3Lister::TPtr IS3Lister::Make(const IHTTPGateway::TPtr& httpGateway, ui64 maxFilesPerQuery, ui64 maxInflightListsPerQuery) { - auto lister = IS3Lister::TPtr(new TS3Lister(httpGateway, maxFilesPerQuery)); +IS3Lister::TPtr IS3Lister::Make(const IHTTPGateway::TPtr& httpGateway, ui64 maxFilesPerQuery, ui64 maxInflightListsPerQuery, bool allowLocalFiles) { + auto lister = IS3Lister::TPtr(new TS3Lister(httpGateway, maxFilesPerQuery, allowLocalFiles)); return IS3Lister::TPtr(new TS3ParallelLimitedLister(lister, maxInflightListsPerQuery)); } diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_list.h b/ydb/library/yql/providers/s3/provider/yql_s3_list.h index 5c7d4f64ea..04f5dd1a74 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_list.h +++ b/ydb/library/yql/providers/s3/provider/yql_s3_list.h @@ -35,7 +35,7 @@ public: // pathPrefix is a "constant" path prefix virtual NThreading::TFuture<TListResult> ListRegex(const TString& token, const TString& url, const TString& pattern, const TString& pathPrefix) = 0; - static TPtr Make(const IHTTPGateway::TPtr& httpGateway, ui64 maxFilesPerQuery, ui64 maxInflightListsPerQuery); + static TPtr Make(const IHTTPGateway::TPtr& httpGateway, ui64 maxFilesPerQuery, ui64 maxInflightListsPerQuery, bool allowLocalFiles); }; }
\ No newline at end of file diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_provider.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_provider.cpp index 9b1d35e98e..a8de126793 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_provider.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_provider.cpp @@ -4,8 +4,8 @@ namespace NYql { -TDataProviderInitializer GetS3DataProviderInitializer(IHTTPGateway::TPtr gateway, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory) { - return [gateway, credentialsFactory] ( +TDataProviderInitializer GetS3DataProviderInitializer(IHTTPGateway::TPtr gateway, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, bool allowLocalFiles) { + return [gateway, credentialsFactory, allowLocalFiles] ( const TString& userName, const TString& sessionId, const TGatewaysConfig* gatewaysConfig, @@ -32,6 +32,8 @@ TDataProviderInitializer GetS3DataProviderInitializer(IHTTPGateway::TPtr gateway state->Configuration->Init(gatewaysConfig->GetS3(), typeCtx); } + state->Configuration->AllowLocalFiles = allowLocalFiles; + TDataProviderInfo info; info.Names.insert({TString{S3ProviderName}}); diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_provider.h b/ydb/library/yql/providers/s3/provider/yql_s3_provider.h index d4367b14b2..d281442575 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_provider.h +++ b/ydb/library/yql/providers/s3/provider/yql_s3_provider.h @@ -29,7 +29,7 @@ struct TS3State : public TThrRefBase ISecuredServiceAccountCredentialsFactory::TPtr CredentialsFactory; }; -TDataProviderInitializer GetS3DataProviderInitializer(IHTTPGateway::TPtr gateway, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory = nullptr); +TDataProviderInitializer GetS3DataProviderInitializer(IHTTPGateway::TPtr gateway, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory = nullptr, bool allowLocalFiles = false); TIntrusivePtr<IDataProvider> CreateS3DataSource(TS3State::TPtr state, IHTTPGateway::TPtr gateway); TIntrusivePtr<IDataProvider> CreateS3DataSink(TS3State::TPtr state, IHTTPGateway::TPtr gateway); diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_settings.h b/ydb/library/yql/providers/s3/provider/yql_s3_settings.h index bbfdef5d0d..bfb8cfae57 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_settings.h +++ b/ydb/library/yql/providers/s3/provider/yql_s3_settings.h @@ -42,6 +42,7 @@ struct TS3Configuration : public TS3Settings, public NCommon::TSettingDispatcher ui64 MaxDiscoveryFilesPerQuery; ui64 MaxReadSizePerQuery; ui64 MaxInflightListsPerQuery; + bool AllowLocalFiles; }; } // NYql |