aboutsummaryrefslogblamecommitdiffstats
path: root/util/system/direct_io.cpp
blob: f59c54b0cbdb1e9b9fbe99a461a593289f0979c9 (plain) (tree)
1
2
3
4
5
6
7
8


                                    
                             
                  
              

                                 
      
 
           
                                 


                            
 
                                                                                            
 
                                                                                  
 


                                                              
                                                              
                              
                                                                    
                                                       
                    


                                                                                                                     
             





                         
                                                                                                               





                       
                                                                   

                         
                                                             













                                                                                             
                            
               

                               
                                                                                                   
            
                                                                                                    
     
                     




                                                 


                   

                                         

                                               
 
 
                                      

                 
 
                                                                         
                                              








                                                                                       
                                                                      












                                                                                     
 
                                        
 



                                                         
                                                                                      



                                             



                                                                       
 
                        
                                                                                                                      
     


                                                 
 
















                                                                                           
     
 

                         
 

                                                                                                       
 
                       
                                                                
                                                             
                        
                  
         
                                                            
 


                                                  
                           
     
                     
 


                                                                    
 
                                                                                  
                     
                 
     
                            
                                                                   

                                                                      
         
                                                                   
                                                                      
                
                                                                                 
     

                                
                                                                                       

                                                                    
 
                                    
 



                                                                  
 

                                                                                     
     
#include "direct_io.h"

#include <util/generic/singleton.h>
#include <util/generic/yexception.h>
#include <util/system/info.h>
#include "align.h"

#ifdef _linux_
    #include <util/string/cast.h>
    #include <linux/version.h>
    #include <sys/utsname.h>
#endif

namespace {
    struct TAlignmentCalcer {
        inline TAlignmentCalcer()
            : Alignment(0)
        {
#ifdef _linux_
            utsname sysInfo;

            Y_VERIFY(!uname(&sysInfo), "Error while call uname: %s", LastSystemErrorText());

            TStringBuf release(sysInfo.release);
            release = release.substr(0, release.find_first_not_of(".0123456789"));

            int v1 = FromString<int>(release.NextTok('.'));
            int v2 = FromString<int>(release.NextTok('.'));
            int v3 = FromString<int>(release.NextTok('.'));
            int linuxVersionCode = KERNEL_VERSION(v1, v2, v3);

            if (linuxVersionCode < KERNEL_VERSION(2, 4, 10)) {
                Alignment = 0;
            } else if (linuxVersionCode < KERNEL_VERSION(2, 6, 0)) {
                Alignment = NSystemInfo::GetPageSize();
            } else {
                // Default alignment used to be 512, but most modern devices rely on 4k physical blocks.
                // 4k alignment works well for both 512 and 4k blocks and doesn't require 512e support in the kernel.
                // See IGNIETFERRO-946.
                Alignment = 4096;
            }
#endif
        }

        size_t Alignment;
    };
}

TDirectIOBufferedFile::TDirectIOBufferedFile(const TString& path, EOpenMode oMode, size_t buflen /*= 1 << 17*/)
    : File(path, oMode)
    , Alignment(0)
    , DataLen(0)
    , ReadPosition(0)
    , WritePosition(0)
    , DirectIO(false)
{
    if (buflen == 0) {
        ythrow TFileError() << "unbuffered usage is not supported";
    }

    if (oMode & Direct) {
        Alignment = Singleton<TAlignmentCalcer>()->Alignment;
        SetDirectIO(true);
    }

    WritePosition = File.GetLength();
    FlushedBytes = WritePosition;
    FlushedToDisk = FlushedBytes;
    BufLen = (!!Alignment) ? AlignUp(buflen, Alignment) : buflen;
    BufferStorage.Resize(BufLen + Alignment);
    Buffer = (!!Alignment) ? AlignUp(BufferStorage.Data(), Alignment) : BufferStorage.Data();
}

#define DIRECT_IO_FLAGS (O_DIRECT | O_SYNC)

void TDirectIOBufferedFile::SetDirectIO(bool value) {
#ifdef _linux_
    if (DirectIO == value) {
        return;
    }

    if (!!Alignment && value) {
        (void)fcntl(File.GetHandle(), F_SETFL, fcntl(File.GetHandle(), F_GETFL) | DIRECT_IO_FLAGS);
    } else {
        (void)fcntl(File.GetHandle(), F_SETFL, fcntl(File.GetHandle(), F_GETFL) & ~DIRECT_IO_FLAGS);
    }

    DirectIO = value;
#else
    DirectIO = value;
#endif
}

TDirectIOBufferedFile::~TDirectIOBufferedFile() {
    try {
        Finish();
    } catch (...) {
    }
}

void TDirectIOBufferedFile::FlushData() {
    WriteToFile(Buffer, DataLen, FlushedBytes);
    DataLen = 0;
    File.FlushData();
}

void TDirectIOBufferedFile::Finish() {
    FlushData();
    File.Flush();
    File.Close();
}

void TDirectIOBufferedFile::Write(const void* buffer, size_t byteCount) {
    WriteToBuffer(buffer, byteCount, DataLen);
    WritePosition += byteCount;
}

void TDirectIOBufferedFile::WriteToBuffer(const void* buf, size_t len, ui64 position) {
    while (len > 0) {
        size_t writeLen = Min<size_t>(BufLen - position, len);

        if (writeLen > 0) {
            memcpy((char*)Buffer + position, buf, writeLen);
            buf = (char*)buf + writeLen;
            len -= writeLen;
            DataLen = (size_t)Max(position + writeLen, (ui64)DataLen);
            position += writeLen;
        }

        if (DataLen == BufLen) {
            WriteToFile(Buffer, DataLen, FlushedBytes);
            DataLen = 0;
            position = 0;
        }
    }
}

void TDirectIOBufferedFile::WriteToFile(const void* buf, size_t len, ui64 position) {
    if (!!len) {
        SetDirectIO(IsAligned(buf) && IsAligned(len) && IsAligned(position));

        File.Pwrite(buf, len, position);

        FlushedBytes = Max(FlushedBytes, position + len);
        FlushedToDisk = Min(FlushedToDisk, position);
    }
}

size_t TDirectIOBufferedFile::PreadSafe(void* buffer, size_t byteCount, ui64 offset) {
    if (FlushedToDisk < offset + byteCount) {
        File.FlushData();
        FlushedToDisk = FlushedBytes;
    }

#ifdef _linux_
    ssize_t bytesRead = 0;
    do {
        bytesRead = pread(File.GetHandle(), buffer, byteCount, offset);
    } while (bytesRead == -1 && errno == EINTR);

    if (bytesRead < 0) {
        ythrow yexception() << "error while pread file: " << LastSystemError() << "(" << LastSystemErrorText() << ")";
    }

    return bytesRead;
#else
    return File.Pread(buffer, byteCount, offset);
#endif
}

size_t TDirectIOBufferedFile::ReadFromFile(void* buffer, size_t byteCount, ui64 offset) {
    SetDirectIO(true);

    ui64 bytesRead = 0;

    while (byteCount) {
        if (!Alignment || IsAligned(buffer) && IsAligned(byteCount) && IsAligned(offset)) {
            if (const ui64 fromFile = PreadSafe(buffer, byteCount, offset)) {
                buffer = (char*)buffer + fromFile;
                byteCount -= fromFile;
                offset += fromFile;
                bytesRead += fromFile;
            } else {
                return bytesRead;
            }
        } else {
            break;
        }
    }

    if (!byteCount) {
        return bytesRead;
    }

    ui64 bufSize = AlignUp(Min<size_t>(BufferStorage.Size(), byteCount + (Alignment << 1)), Alignment);
    TBuffer readBufferStorage(bufSize + Alignment);
    char* readBuffer = AlignUp((char*)readBufferStorage.Data(), Alignment);

    while (byteCount) {
        ui64 begin = AlignDown(offset, (ui64)Alignment);
        ui64 end = AlignUp(offset + byteCount, (ui64)Alignment);
        ui64 toRead = Min(end - begin, bufSize);
        ui64 fromFile = PreadSafe(readBuffer, toRead, begin);

        if (!fromFile) {
            break;
        }

        ui64 delta = offset - begin;
        ui64 count = Min<ui64>(fromFile - delta, byteCount);

        memcpy(buffer, readBuffer + delta, count);
        buffer = (char*)buffer + count;
        byteCount -= count;
        offset += count;
        bytesRead += count;
    }
    return bytesRead;
}

size_t TDirectIOBufferedFile::Read(void* buffer, size_t byteCount) {
    size_t bytesRead = Pread(buffer, byteCount, ReadPosition);
    ReadPosition += bytesRead;
    return bytesRead;
}

size_t TDirectIOBufferedFile::Pread(void* buffer, size_t byteCount, ui64 offset) {
    if (!byteCount) {
        return 0;
    }

    size_t readFromFile = 0;
    if (offset < FlushedBytes) {
        readFromFile = Min<ui64>(byteCount, FlushedBytes - offset);
        size_t bytesRead = ReadFromFile(buffer, readFromFile, offset);
        if (bytesRead != readFromFile || readFromFile == byteCount) {
            return bytesRead;
        }
    }
    ui64 start = offset > FlushedBytes ? offset - FlushedBytes : 0;
    ui64 count = Min<ui64>(DataLen - start, byteCount - readFromFile);
    if (count) {
        memcpy((char*)buffer + readFromFile, (const char*)Buffer + start, count);
    }
    return count + readFromFile;
}

void TDirectIOBufferedFile::Pwrite(const void* buffer, size_t byteCount, ui64 offset) {
    if (offset > WritePosition) {
        ythrow yexception() << "cannot frite to position" << offset;
    }

    size_t writeToBufer = byteCount;
    size_t writeToFile = 0;

    if (FlushedBytes > offset) {
        writeToFile = Min<ui64>(byteCount, FlushedBytes - offset);
        WriteToFile(buffer, writeToFile, offset);
        writeToBufer -= writeToFile;
    }

    if (writeToBufer > 0) {
        ui64 bufferOffset = offset + writeToFile - FlushedBytes;
        WriteToBuffer((const char*)buffer + writeToFile, writeToBufer, bufferOffset);
    }
}