diff options
author | imunkin <[email protected]> | 2024-11-08 10:00:23 +0300 |
---|---|---|
committer | imunkin <[email protected]> | 2024-11-08 10:12:13 +0300 |
commit | a784a2f943d6e15caa6241e2e96d80aac6dbf375 (patch) | |
tree | 05f1e5366c916b988a8afb75bdab8ddeee0f6e6d /yql/essentials/udfs/common/streaming/streaming_udf.cpp | |
parent | d70137a7b530ccaa52834274913bbb5a3d1ca06e (diff) |
Move yql/udfs/common/ to /yql/essentials YQL-19206
Except the following directories:
* clickhouse/client
* datetime
* knn
* roaring
commit_hash:c7da95636144d28db109d6b17ddc762e9bacb59f
Diffstat (limited to 'yql/essentials/udfs/common/streaming/streaming_udf.cpp')
-rw-r--r-- | yql/essentials/udfs/common/streaming/streaming_udf.cpp | 829 |
1 files changed, 829 insertions, 0 deletions
diff --git a/yql/essentials/udfs/common/streaming/streaming_udf.cpp b/yql/essentials/udfs/common/streaming/streaming_udf.cpp new file mode 100644 index 00000000000..bd01935321e --- /dev/null +++ b/yql/essentials/udfs/common/streaming/streaming_udf.cpp @@ -0,0 +1,829 @@ +#include <yql/essentials/public/udf/udf_value.h> +#include <yql/essentials/public/udf/udf_registrator.h> +#include <yql/essentials/public/udf/udf_type_builder.h> +#include <yql/essentials/public/udf/udf_value_builder.h> +#include <yql/essentials/public/udf/udf_terminator.h> + +#include <util/generic/buffer.h> +#include <util/generic/mem_copy.h> +#include <util/generic/maybe.h> +#include <util/generic/ptr.h> +#include <util/string/builder.h> +#include <util/stream/mem.h> +#include <library/cpp/deprecated/kmp/kmp.h> +#include <util/string/strip.h> +#include <util/system/condvar.h> +#include <util/system/shellcommand.h> +#include <util/system/tempfile.h> +#include <util/system/sysstat.h> + +#include <functional> + +using namespace NKikimr; +using namespace NUdf; + +namespace { + // Cyclic Read-Write buffer. + // Not thread safe, synchronization between reader and writer threads + // should be managed externally. + class TCyclicRWBuffer { + public: + TCyclicRWBuffer(size_t capacity) + : Buffer(capacity) + , Finished(false) + , DataStart(0) + , DataSize(0) + { + Buffer.Resize(capacity); + } + + bool IsFinished() const { + return Finished; + } + + void Finish() { + Finished = true; + } + + bool HasData() const { + return DataSize > 0; + } + + size_t GetDataSize() const { + return DataSize; + } + + void GetData(const char*& ptr, size_t& len) const { + size_t readSize = GetDataRegionSize(DataStart, DataSize); + ptr = Buffer.Data() + DataStart; + len = readSize; + } + + void CommitRead(size_t len) { + Y_DEBUG_ABORT_UNLESS(len <= GetDataRegionSize(DataStart, DataSize)); + + DataStart = GetBufferPosition(DataStart + len); + DataSize -= len; + } + + bool CanWrite() const { + return WriteSize() > 0; + } + + size_t WriteSize() const { + return Buffer.Size() - DataSize; + } + + size_t Write(const char*& ptr, size_t& len) { + if (!CanWrite()) { + return 0; + } + + size_t bytesWritten = 0; + size_t bytesToWrite = std::min(len, WriteSize()); + while (bytesToWrite > 0) { + size_t writeStart = GetWriteStart(); + size_t writeSize = GetDataRegionSize(writeStart, bytesToWrite); + + MemCopy(Data(writeStart), ptr, writeSize); + + DataSize += writeSize; + bytesWritten += writeSize; + bytesToWrite -= writeSize; + + ptr += writeSize; + len -= writeSize; + } + + return bytesWritten; + } + + size_t Write(IZeroCopyInput& input) { + const void* ptr; + size_t dataLen = input.Next(&ptr, WriteSize()); + const char* dataPtr = reinterpret_cast<const char*>(ptr); + return Write(dataPtr, dataLen); + } + + private: + size_t GetBufferPosition(size_t pos) const { + return pos % Buffer.Size(); + } + + size_t GetDataRegionSize(size_t start, size_t size) const { + Y_DEBUG_ABORT_UNLESS(start < Buffer.Size()); + + return std::min(size, Buffer.Size() - start); + } + + size_t GetWriteStart() const { + return GetBufferPosition(DataStart + DataSize); + } + + char* Data(size_t pos) { + Y_DEBUG_ABORT_UNLESS(pos < Buffer.Size()); + + return (Buffer.Data() + pos); + } + + private: + TBuffer Buffer; + + bool Finished; + + size_t DataStart; + size_t DataSize; + }; + + struct TStreamingParams { + public: + const size_t DefaultProcessPollLatencyMs = 5 * 1000; // 5 seconds + const size_t DefaultInputBufferSizeBytes = 4 * 1024 * 1024; // 4MB + const size_t DefaultOutputBufferSizeBytes = 16 * 1024 * 1024; // 16MB + const char* DefaultInputDelimiter = "\n"; + const char* DefaultOutputDelimiter = "\n"; + + public: + TUnboxedValue InputStreamObj; + TString CommandLine; + TUnboxedValue ArgumentsList; + TString InputDelimiter; + TString OutputDelimiter; + size_t InputBufferSizeBytes; + size_t OutputBufferSizeBytes; + size_t ProcessPollLatencyMs; + + TStreamingParams() + : InputDelimiter(DefaultInputDelimiter) + , OutputDelimiter(DefaultOutputDelimiter) + , InputBufferSizeBytes(DefaultInputBufferSizeBytes) + , OutputBufferSizeBytes(DefaultOutputBufferSizeBytes) + , ProcessPollLatencyMs(DefaultProcessPollLatencyMs) + { + } + }; + + struct TThreadSyncData { + TMutex BuffersMutex; + TCondVar InputBufferCanReadCond; + TCondVar MainThreadHasWorkCond; + TCondVar OutputBufferCanWriteCond; + }; + + class TStringListBufferedInputStream: public IInputStream { + public: + TStringListBufferedInputStream(TUnboxedValue rowsStream, const TString& delimiter, size_t bufferSizeBytes, + TThreadSyncData& syncData, TSourcePosition pos) + : RowsStream(rowsStream) + , Delimiter(delimiter) + , SyncData(syncData) + , Pos_(pos) + , DelimiterMatcher(delimiter) + , DelimiterInput(delimiter) + , Buffer(bufferSizeBytes) + , CurReadMode(ReadMode::Start) + { + } + + TStringListBufferedInputStream(const TStringListBufferedInputStream&) = delete; + TStringListBufferedInputStream& operator=(const TStringListBufferedInputStream&) = delete; + + TCyclicRWBuffer& GetBuffer() { + return Buffer; + } + + // Fetch input from upstream list iterator to the buffer. + // Called from Main thread. + EFetchStatus FetchInput() { + with_lock (SyncData.BuffersMutex) { + Y_DEBUG_ABORT_UNLESS(!Buffer.HasData()); + Y_DEBUG_ABORT_UNLESS(Buffer.CanWrite()); + + bool receivedYield = false; + + while (Buffer.CanWrite() && CurReadMode != ReadMode::Done && !receivedYield) { + switch (CurReadMode) { + case ReadMode::Start: { + auto status = ReadNextString(); + if (status == EFetchStatus::Yield) { + receivedYield = true; + break; + } + + CurReadMode = (status == EFetchStatus::Ok) + ? ReadMode::String + : ReadMode::Done; + + break; + } + + case ReadMode::String: + if (CurStringInput.Exhausted()) { + DelimiterInput.Reset(Delimiter.data(), Delimiter.size()); + CurReadMode = ReadMode::Delimiter; + break; + } + + Buffer.Write(CurStringInput); + break; + + case ReadMode::Delimiter: + if (DelimiterInput.Exhausted()) { + CurReadMode = ReadMode::Start; + break; + } + + Buffer.Write(DelimiterInput); + break; + + default: + break; + } + } + + if (CurReadMode == ReadMode::Done) { + Buffer.Finish(); + } + + SyncData.InputBufferCanReadCond.Signal(); + return receivedYield ? EFetchStatus::Yield : EFetchStatus::Ok; + } + } + + private: + // Read data to pass into the child process input pipe. + // Called from Communicate thread. + size_t DoRead(void* buf, size_t len) override { + try { + with_lock (SyncData.BuffersMutex) { + while (!Buffer.HasData() && !Buffer.IsFinished()) { + SyncData.MainThreadHasWorkCond.Signal(); + SyncData.InputBufferCanReadCond.WaitI(SyncData.BuffersMutex); + } + + if (!Buffer.HasData()) { + Y_DEBUG_ABORT_UNLESS(Buffer.IsFinished()); + return 0; + } + + const char* dataPtr; + size_t dataLen; + Buffer.GetData(dataPtr, dataLen); + + size_t bytesRead = std::min(dataLen, len); + Y_DEBUG_ABORT_UNLESS(bytesRead > 0); + memcpy(buf, dataPtr, bytesRead); + Buffer.CommitRead(bytesRead); + return bytesRead; + } + + ythrow yexception(); + } catch (const std::exception& e) { + UdfTerminate((TStringBuilder() << Pos_ << " " << e.what()).data()); + } + } + + EFetchStatus ReadNextString() { + TUnboxedValue item; + EFetchStatus status = RowsStream.Fetch(item); + switch (status) { + case EFetchStatus::Yield: + case EFetchStatus::Finish: + return status; + default: + break; + } + + CurString = item.GetElement(0); + CurStringInput.Reset(CurString.AsStringRef().Data(), CurString.AsStringRef().Size()); + + // Check that input string doesn't contain delimiters + const char* match; + Y_UNUSED(match); + if (DelimiterMatcher.SubStr( + CurString.AsStringRef().Data(), + CurString.AsStringRef().Data() + CurString.AsStringRef().Size(), + match)) + { + ythrow yexception() << "Delimiter found in input string."; + } + + return EFetchStatus::Ok; + } + + private: + enum class ReadMode { + Start, + String, + Delimiter, + Done + }; + + TUnboxedValue RowsStream; + TString Delimiter; + TThreadSyncData& SyncData; + TSourcePosition Pos_; + + TKMPMatcher DelimiterMatcher; + TUnboxedValue CurString; + TMemoryInput CurStringInput; + TMemoryInput DelimiterInput; + + TCyclicRWBuffer Buffer; + + ReadMode CurReadMode; + }; + + class TStringListBufferedOutputStream: public IOutputStream { + public: + TStringListBufferedOutputStream(const TString& delimiter, size_t stringBufferSizeBytes, + TStringListBufferedInputStream& inputStream, TThreadSyncData& syncData) + : Delimiter(delimiter) + , InputStream(inputStream) + , SyncData(syncData) + , HasDelimiterMatch(false) + , DelimiterMatcherCallback(HasDelimiterMatch) + , DelimiterMatcher(delimiter.data(), delimiter.data() + delimiter.size(), &DelimiterMatcherCallback) + , Buffer(stringBufferSizeBytes) + { + } + + TStringListBufferedOutputStream(const TStringListBufferedOutputStream&) = delete; + TStringListBufferedOutputStream& operator=(const TStringListBufferedOutputStream&) = delete; + + // Get string record from buffer. + // Called from Main thread. + EFetchStatus FetchNextString(TString& str) { + while (!HasDelimiterMatch) { + with_lock (SyncData.BuffersMutex) { + bool inputHasData; + bool bufferNeedsData; + + do { + inputHasData = InputStream.GetBuffer().HasData() || InputStream.GetBuffer().IsFinished(); + bufferNeedsData = !Buffer.HasData() && !Buffer.IsFinished(); + + if (inputHasData && bufferNeedsData) { + SyncData.MainThreadHasWorkCond.WaitI(SyncData.BuffersMutex); + } + } while (inputHasData && bufferNeedsData); + + if (!inputHasData) { + auto status = InputStream.FetchInput(); + if (status == EFetchStatus::Yield) { + return EFetchStatus::Yield; + } + } + + if (bufferNeedsData) { + continue; + } + + if (!Buffer.HasData()) { + Y_DEBUG_ABORT_UNLESS(Buffer.IsFinished()); + str = TString(TStringBuf(CurrentString.Data(), CurrentString.Size())); + CurrentString.Clear(); + return str.empty() ? EFetchStatus::Finish : EFetchStatus::Ok; + } + + const char* data; + size_t size; + Buffer.GetData(data, size); + + size_t read = 0; + while (!HasDelimiterMatch && read < size) { + DelimiterMatcher.Push(data[read]); + ++read; + } + + Y_DEBUG_ABORT_UNLESS(read > 0); + CurrentString.Append(data, read); + bool signalCanWrite = !Buffer.CanWrite(); + Buffer.CommitRead(read); + + if (signalCanWrite) { + SyncData.OutputBufferCanWriteCond.Signal(); + } + } + } + + Y_DEBUG_ABORT_UNLESS(CurrentString.Size() >= Delimiter.size()); + str = TString(TStringBuf(CurrentString.Data(), CurrentString.Size() - Delimiter.size())); + CurrentString.Clear(); + HasDelimiterMatch = false; + + return EFetchStatus::Ok; + } + + TCyclicRWBuffer& GetBuffer() { + return Buffer; + } + + private: + // Write data from child process output to buffer. + // Called from Communicate thread. + void DoWrite(const void* buf, size_t len) override { + const char* curStrPos = reinterpret_cast<const char*>(buf); + size_t curStrLen = len; + + while (curStrLen > 0) { + with_lock (SyncData.BuffersMutex) { + while (!Buffer.CanWrite() && !Buffer.IsFinished()) { + SyncData.OutputBufferCanWriteCond.WaitI(SyncData.BuffersMutex); + } + + if (Buffer.IsFinished()) { + return; + } + + bool signalCanRead = !Buffer.HasData(); + Buffer.Write(curStrPos, curStrLen); + + if (signalCanRead) { + SyncData.MainThreadHasWorkCond.Signal(); + } + } + } + } + + void DoFinish() override { + IOutputStream::DoFinish(); + + with_lock (SyncData.BuffersMutex) { + Buffer.Finish(); + SyncData.MainThreadHasWorkCond.Signal(); + } + } + + private: + class MatcherCallback: public TKMPStreamMatcher<char>::ICallback { + public: + MatcherCallback(bool& hasMatch) + : HasMatch(hasMatch) + { + } + + void OnMatch(const char* begin, const char* end) override { + Y_UNUSED(begin); + Y_UNUSED(end); + + HasMatch = true; + } + + private: + bool& HasMatch; + }; + + private: + TString Delimiter; + TStringListBufferedInputStream& InputStream; + TThreadSyncData& SyncData; + + bool HasDelimiterMatch; + MatcherCallback DelimiterMatcherCallback; + TKMPStreamMatcher<char> DelimiterMatcher; + + TBuffer CurrentString; + + TCyclicRWBuffer Buffer; + }; + + class TStreamingOutputListIterator { + public: + TStreamingOutputListIterator(const TStreamingParams& params, const IValueBuilder* valueBuilder, TSourcePosition pos) + : StreamingParams(params) + , ValueBuilder(valueBuilder) + , Pos_(pos) + { + } + + TStreamingOutputListIterator(const TStreamingOutputListIterator&) = delete; + TStreamingOutputListIterator& operator=(const TStreamingOutputListIterator&) = delete; + + ~TStreamingOutputListIterator() { + if (ShellCommand) { + Y_DEBUG_ABORT_UNLESS(InputStream && OutputStream); + + try { + ShellCommand->Terminate(); + } catch (const std::exception& e) { + Cerr << CurrentExceptionMessage(); + } + + // Let Communicate thread finish. + with_lock (ThreadSyncData.BuffersMutex) { + InputStream->GetBuffer().Finish(); + OutputStream->GetBuffer().Finish(); + ThreadSyncData.InputBufferCanReadCond.Signal(); + ThreadSyncData.OutputBufferCanWriteCond.Signal(); + } + + ShellCommand->Wait(); + } + } + + EFetchStatus Fetch(TUnboxedValue& result) { + try { + EFetchStatus status = EFetchStatus::Ok; + + if (!ProcessStarted()) { + StartProcess(); + + // Don't try to fetch data if there was a problem starting the process, + // this causes infinite wait on Windows system due to incorrect ShellCommand behavior. + if (ShellCommand->GetStatus() != TShellCommand::SHELL_RUNNING && ShellCommand->GetStatus() != TShellCommand::SHELL_FINISHED) { + status = EFetchStatus::Finish; + } + } + + if (status == EFetchStatus::Ok) { + status = OutputStream->FetchNextString(CurrentRecord); + } + + if (status == EFetchStatus::Finish) { + switch (ShellCommand->GetStatus()) { + case TShellCommand::SHELL_FINISHED: + break; + case TShellCommand::SHELL_INTERNAL_ERROR: + ythrow yexception() << "Internal error running process: " << ShellCommand->GetInternalError(); + break; + case TShellCommand::SHELL_ERROR: + ythrow yexception() << "Error running user process: " << ShellCommand->GetError(); + break; + default: + ythrow yexception() << "Unexpected shell command status: " << (int)ShellCommand->GetStatus(); + } + return EFetchStatus::Finish; + } + + if (status == EFetchStatus::Ok) { + TUnboxedValue* items = nullptr; + result = ValueBuilder->NewArray(1, items); + *items = ValueBuilder->NewString(TStringRef(CurrentRecord.data(), CurrentRecord.size())); + } + + return status; + } catch (const std::exception& e) { + UdfTerminate((TStringBuilder() << Pos_ << " " << e.what()).data()); + } + } + + private: + void StartProcess() { + InputStream.Reset(new TStringListBufferedInputStream( + StreamingParams.InputStreamObj, StreamingParams.InputDelimiter, + StreamingParams.InputBufferSizeBytes, ThreadSyncData, Pos_)); + + OutputStream.Reset(new TStringListBufferedOutputStream( + StreamingParams.OutputDelimiter, StreamingParams.OutputBufferSizeBytes, *InputStream, + ThreadSyncData)); + + TShellCommandOptions opt; + opt.SetAsync(true).SetUseShell(false).SetLatency(StreamingParams.ProcessPollLatencyMs).SetInputStream(InputStream.Get()).SetOutputStream(OutputStream.Get()).SetCloseStreams(true).SetCloseAllFdsOnExec(true); + + TList<TString> commandArguments; + auto argumetsIterator = StreamingParams.ArgumentsList.GetListIterator(); + for (TUnboxedValue item; argumetsIterator.Next(item);) { + commandArguments.emplace_back(TStringBuf(item.AsStringRef())); + } + + ShellCommand.Reset(new TShellCommand(StreamingParams.CommandLine, commandArguments, opt)); + ShellCommand->Run(); + } + + bool ProcessStarted() const { + return !!ShellCommand; + } + + private: + TStreamingParams StreamingParams; + const IValueBuilder* ValueBuilder; + TSourcePosition Pos_; + + TThreadSyncData ThreadSyncData; + + THolder<TShellCommand> ShellCommand; + THolder<TStringListBufferedInputStream> InputStream; + THolder<TStringListBufferedOutputStream> OutputStream; + + TString CurrentRecord; + }; + + class TStreamingOutput: public TBoxedValue { + public: + TStreamingOutput(const TStreamingParams& params, const IValueBuilder* valueBuilder, TSourcePosition pos) + : StreamingParams(params) + , ValueBuilder(valueBuilder) + , Pos_(pos) + { + } + + TStreamingOutput(const TStreamingOutput&) = delete; + TStreamingOutput& operator=(const TStreamingOutput&) = delete; + + private: + EFetchStatus Fetch(TUnboxedValue& result) override { + if (IsFinished) { + return EFetchStatus::Finish; + } + + if (!Iterator) { + Iterator.Reset(new TStreamingOutputListIterator(StreamingParams, ValueBuilder, Pos_)); + } + + auto ret = Iterator->Fetch(result); + + if (ret == EFetchStatus::Finish) { + IsFinished = true; + Iterator.Reset(); + } + + return ret; + } + + TStreamingParams StreamingParams; + const IValueBuilder* ValueBuilder; + TSourcePosition Pos_; + bool IsFinished = false; + THolder<TStreamingOutputListIterator> Iterator; + }; + + class TStreamingScriptOutput: public TStreamingOutput { + public: + TStreamingScriptOutput(const TStreamingParams& params, const IValueBuilder* valueBuilder, + TSourcePosition pos, const TString& script, const TString& scriptFilename) + : TStreamingOutput(params, valueBuilder, pos) + , ScriptFileHandle(scriptFilename) + { + auto scriptStripped = StripBeforeShebang(script); + ScriptFileHandle.Write(scriptStripped.data(), scriptStripped.size()); + ScriptFileHandle.Close(); + + if (Chmod(ScriptFileHandle.Name().c_str(), MODE0755) != 0) { + ythrow yexception() << "Chmod failed for script file:" << ScriptFileHandle.Name() + << " with error: " << LastSystemErrorText(); + } + } + + private: + static TString StripBeforeShebang(const TString& script) { + auto shebangIndex = script.find("#!"); + if (shebangIndex != TString::npos) { + auto scriptStripped = StripStringLeft(script); + + if (scriptStripped.size() == script.size() - shebangIndex) { + return scriptStripped; + } + } + + return script; + } + + TTempFileHandle ScriptFileHandle; + }; + + class TStreamingProcess: public TBoxedValue { + public: + TStreamingProcess(TSourcePosition pos) + : Pos_(pos) + {} + + private: + TUnboxedValue Run(const IValueBuilder* valueBuilder, + const TUnboxedValuePod* args) const override { + auto inputListArg = args[0]; + auto commandLineArg = args[1].AsStringRef(); + auto argumentsArg = args[2]; + auto inputDelimiterArg = args[3]; + auto outputDelimiterArg = args[4]; + + Y_DEBUG_ABORT_UNLESS(inputListArg.IsBoxed()); + + TStreamingParams params; + params.InputStreamObj = TUnboxedValuePod(inputListArg); + params.CommandLine = TString(TStringBuf(commandLineArg)); + params.ArgumentsList = !argumentsArg + ? valueBuilder->NewEmptyList() + : TUnboxedValue(argumentsArg.GetOptionalValue()); + + if (inputDelimiterArg) { + params.InputDelimiter = TString(TStringBuf(inputDelimiterArg.AsStringRef())); + } + if (outputDelimiterArg) { + params.OutputDelimiter = TString(TStringBuf(outputDelimiterArg.AsStringRef())); + } + + return TUnboxedValuePod(new TStreamingOutput(params, valueBuilder, Pos_)); + } + + public: + static TStringRef Name() { + static auto name = TStringRef::Of("Process"); + return name; + } + + private: + TSourcePosition Pos_; + }; + + class TStreamingProcessInline: public TBoxedValue { + public: + TStreamingProcessInline(TSourcePosition pos) + : Pos_(pos) + {} + + private: + TUnboxedValue Run(const IValueBuilder* valueBuilder, + const TUnboxedValuePod* args) const override { + auto inputListArg = args[0]; + auto scriptArg = args[1].AsStringRef(); + auto argumentsArg = args[2]; + auto inputDelimiterArg = args[3]; + auto outputDelimiterArg = args[4]; + + TString script(scriptArg); + TString scriptFilename = MakeTempName("."); + + TStreamingParams params; + params.InputStreamObj = TUnboxedValuePod(inputListArg); + params.CommandLine = scriptFilename; + params.ArgumentsList = !argumentsArg + ? valueBuilder->NewEmptyList() + : TUnboxedValue(argumentsArg.GetOptionalValue()); + + if (inputDelimiterArg) { + params.InputDelimiter = TString(TStringBuf(inputDelimiterArg.AsStringRef())); + } + if (outputDelimiterArg) { + params.OutputDelimiter = TString(TStringBuf(outputDelimiterArg.AsStringRef())); + } + + return TUnboxedValuePod(new TStreamingScriptOutput(params, valueBuilder, Pos_, script, scriptFilename)); + } + + public: + static TStringRef Name() { + static auto name = TStringRef::Of("ProcessInline"); + return name; + } + + private: + TSourcePosition Pos_; + }; + + class TStreamingModule: public IUdfModule { + public: + TStringRef Name() const { + return TStringRef::Of("Streaming"); + } + + void CleanupOnTerminate() const final { + } + + void GetAllFunctions(IFunctionsSink& sink) const final { + sink.Add(TStreamingProcess::Name()); + sink.Add(TStreamingProcessInline::Name()); + } + + void BuildFunctionTypeInfo( + const TStringRef& name, + NUdf::TType* userType, + const TStringRef& typeConfig, + ui32 flags, + IFunctionTypeInfoBuilder& builder) const override { + try { + Y_UNUSED(userType); + Y_UNUSED(typeConfig); + + bool typesOnly = (flags & TFlags::TypesOnly); + + auto optionalStringType = builder.Optional()->Item<char*>().Build(); + auto rowType = builder.Struct(1)->AddField("Data", TDataType<char*>::Id, nullptr).Build(); + auto rowsType = builder.Stream()->Item(rowType).Build(); + auto stringListType = builder.List()->Item(TDataType<char*>::Id).Build(); + auto optionalStringListType = builder.Optional()->Item(stringListType).Build(); + + if (TStreamingProcess::Name() == name) { + builder.Returns(rowsType).Args()->Add(rowsType).Add<char*>().Add(optionalStringListType).Add(optionalStringType).Add(optionalStringType).Done().OptionalArgs(3); + + if (!typesOnly) { + builder.Implementation(new TStreamingProcess(builder.GetSourcePosition())); + } + } + + if (TStreamingProcessInline::Name() == name) { + builder.Returns(rowsType).Args()->Add(rowsType).Add<char*>().Add(optionalStringListType).Add(optionalStringType).Add(optionalStringType).Done().OptionalArgs(3); + + if (!typesOnly) { + builder.Implementation(new TStreamingProcessInline(builder.GetSourcePosition())); + } + } + } catch (const std::exception& e) { + builder.SetError(CurrentExceptionMessage()); + } + } + }; + +} + +REGISTER_MODULES(TStreamingModule) |