Public Member Functions | Public Attributes | Private Member Functions | Private Attributes

hsm_HL7CacheFile Class Reference

#include <hsm_HL7CacheFile.h>

Collaboration diagram for hsm_HL7CacheFile:
Collaboration graph
[legend]

List of all members.

Public Member Functions

 hsm_HL7CacheFile (const cmn_Path &a_path)
void AddMessageToCache (string &a_message)
cmn_Path ReadNextFile (vector< string > &a_messageList)
bool IsAnyFileToProcess ()
void DeleteProcessingFile (cmn_Path &a_processingFilePath)

Public Attributes

cmn_Mutex m_list_x

Private Member Functions

void RemoveFromOnDiskList (string &a_fileName)
string HandleCoruptedFile (const cmn_Path &a_fileName)
UInt32_t CountNumberOfMsgs ()
string GetNextMessage (vector< char > &buffer, UInt64_t a_bufferSize, UInt32_t &a_position)
void CreateFile ()
string DumpState ()

Private Attributes

 log_CLASSID_m
vector< hl7MsgFileName_tm_onDiskLists
UInt32_t m_lastFileNum
cmn_Path m_filePath
cmn_File m_currentFile
UInt32_t m_currentNumMsg

Detailed Description

Definition at line 57 of file hsm_HL7CacheFile.h.


Constructor & Destructor Documentation

hsm_HL7CacheFile::hsm_HL7CacheFile ( const cmn_Path a_path  ) 

Definition at line 50 of file hsm_HL7CacheFile.cpp.

References cmn_File::CloseF(), cmn_CreatePath(), cmn_Str2Num(), CountNumberOfMsgs(), CreateFile(), dbg_DETAIL, dbg_LOW, dbg_NORM, DumpState(), evt_ERROR, cmn_File::Exists(), fom_READ, ivd_BaseException::GetFriendly(), cmn_File::GetFullPathRef(), cmn_FastDirLst::GetNextName(), HandleCoruptedFile(), log_DBG_m, log_ERR_m, log_FUNC_m, log_WriteEvent(), m_currentFile, m_currentNumMsg, hl7MsgFileName_t::m_fileName, hl7MsgFileName_t::m_fileNum, m_filePath, m_lastFileNum, m_onDiskLists, cmn_File::OpenF(), and cmn_File::SetFullPath().

    : m_lastFileNum(0),
      m_filePath(a_path),
      m_currentNumMsg(0)
{
    log_FUNC_m(hsm_HL7CacheFile);

    cmn_File listPath(m_filePath);
    if (!listPath.Exists()) {
        cmn_CreatePath(m_filePath);
    }

    log_DBG_m(dbg_DETAIL, "m_filePath: " << m_filePath);

    cmn_FastDirLst dirList(m_filePath);
    ivd_GenInode_t gInode;
    ivd_FileType_e fType;
    string foundName = dirList.GetNextName(gInode, fType);
    while (foundName.length() != 0) {
        hl7MsgFileName_t f;
        f.m_fileName = foundName;
        UInt32_t counterLength = f.m_fileName.find(".");
        string ext = f.m_fileName.substr(counterLength + 1, f.m_fileName.size());

        if (ext.compare("msg") == 0) { //process only files with .msg
            f.m_fileNum = cmn_Str2Num(f.m_fileName.substr(0, counterLength));

            log_DBG_m(dbg_NORM, "Found file: " << f.m_fileName <<
                                ", fileNum:" << f.m_fileNum);

            if (m_lastFileNum < f.m_fileNum) {
                m_lastFileNum = f.m_fileNum;
            }
            m_onDiskLists.push_back(f);
            log_DBG_m(dbg_DETAIL, "Added file: " << f.m_fileName <<
                                  ", fileNum:" << f.m_fileNum);
        }
        foundName = dirList.GetNextName(gInode, fType);
    }

    if (m_onDiskLists.size() == 0) {
        CreateFile();
        log_DBG_m(dbg_DETAIL, "m_lastFileNum: " << m_lastFileNum);
    }
    else {
        m_currentFile.SetFullPath(m_filePath + m_onDiskLists[m_onDiskLists.size()-1].m_fileName);
        try {
            m_currentFile.OpenF(fom_READ);
            m_currentNumMsg = CountNumberOfMsgs();
            m_currentFile.CloseF();
        }
        catch (ivd_Error &e) {
            m_currentFile.CloseF();

            cmn_Path currentFile = m_currentFile.GetFullPathRef();
            string errFile = HandleCoruptedFile(currentFile);

            ostringstream sstr;
            sstr << "Cannot read HL7 queue file. File renamed to: " << errFile;
            log_WriteEvent(evt_ERROR, sstr.str());
            log_ERR_m(e.GetFriendly());

            // Create a new file for processing
            CreateFile();
        }
    }
    log_DBG_m(dbg_LOW, DumpState());
}

Here is the call graph for this function:


Member Function Documentation

void hsm_HL7CacheFile::AddMessageToCache ( string &  a_message  ) 

Definition at line 121 of file hsm_HL7CacheFile.cpp.

References c_hl7msg_list_size(), c_signature(), cmn_File::CloseF(), CountNumberOfMsgs(), CreateFile(), dbg_DETAIL, evt_ERROR, fom_OPEN_ALWAYS, fom_READ, fom_WRITE, cmn_Path::GetFileName(), ivd_BaseException::GetFriendly(), cmn_File::GetFullPathRef(), HandleCoruptedFile(), ie_FILE_ERROR, cmn_File::IsOpen(), ivd_Error, log_DBG_m, log_ERR_m, log_FUNC_m, log_WriteEvent(), m_currentFile, m_currentNumMsg, m_list_x, cmn_File::OpenF(), cmn_File::SeekEndF(), and cmn_File::WriteF().

                                                          {
    log_FUNC_m(AddMessageToCache);

    cmn_MutexLock l(m_list_x);

    if (!m_currentFile.IsOpen()) {
        m_currentFile.OpenF(fom_READ | fom_WRITE | fom_OPEN_ALWAYS);
    }

    if (m_currentNumMsg == 0) {
        try {
            m_currentNumMsg = CountNumberOfMsgs();
        }
        catch(ivd_Error &e) {
            m_currentFile.CloseF();

            cmn_Path currentFile = m_currentFile.GetFullPathRef();
            string errFile = HandleCoruptedFile(currentFile);

            ostringstream sstr;
            sstr << "Cannot read HL7 queue file. File renamed to: " << errFile;
            log_WriteEvent(evt_ERROR, sstr.str());
            log_ERR_m(e.GetFriendly());

            // Create a new file for processing
            CreateFile();
        }
    }

    m_currentFile.SeekEndF();
    log_DBG_m(dbg_DETAIL, "Adding message to file: " << m_currentFile.GetFullPathRef().GetFileName());

    fileEntry_t fe;
    strcpy(fe.m_signature, c_signature);
    fe.m_version = c_fileEntryVersion;
    fe.m_msgSize = a_message.size();

    /*
    log_DBG_m(dbg_DETAIL, "fe.m_signature: " << fe.m_signature << endl <<
                          "fe.m_version: " << fe.m_version << endl <<
                          "fe.m_msgSize: " << fe.m_msgSize);
                          */

    // Write message info
    UInt64_t written = m_currentFile.WriteF(&fe, sizeof(fe));
    if (written != sizeof(fe)) {
        ostringstream sstr;
        sstr << "Writing to file: " << m_currentFile.GetFullPathRef() <<  " failed.";
        throw ivd_Error(ie_FILE_ERROR, sstr.str());
    }

    // Write message
    written = m_currentFile.WriteF(a_message.c_str(), a_message.size());
    if (written != a_message.size()) {
        ostringstream sstr;
        sstr << "Writing to file: " << m_currentFile.GetFullPathRef() <<  " failed.";
        throw ivd_Error(ie_FILE_ERROR, sstr.str());
    }

    //log_DBG_m(dbg_DETAIL, "msg written: " << a_message);
    m_currentNumMsg++;

    if (m_currentNumMsg >= c_hl7msg_list_size) {
        log_DBG_m(dbg_DETAIL, "c_hl7msg_list_size exceeded " << m_currentNumMsg);
        CreateFile();
    }
}

Here is the call graph for this function:

UInt32_t hsm_HL7CacheFile::CountNumberOfMsgs (  )  [private]

Definition at line 329 of file hsm_HL7CacheFile.cpp.

References dbg_DETAIL, cmn_File::GetFullPathRef(), GetNextMessage(), cmn_File::GetSize(), ie_FILE_ERROR, ivd_Error, log_DBG_m, log_FUNC_m, m_currentFile, cmn_File::ReadF(), cmn_File::SeekF(), and size.

Referenced by AddMessageToCache(), and hsm_HL7CacheFile().

                                             {
    log_FUNC_m(CountNumberOfMsgs);

    UInt32_t size(static_cast<UInt32_t>(m_currentFile.GetSize()));
    log_DBG_m(dbg_DETAIL, " file size: " << size);

    if (size == 0) {
        return 0;
    }

    vector<char> buffer;
    buffer.resize(size);

    m_currentFile.SeekF(0);

    UInt32_t read = m_currentFile.ReadF(&buffer[0], size);
    log_DBG_m(dbg_DETAIL, "Reading done:" << read);
    if (read != size) {
        ostringstream sstr;
        sstr << "Could not read file " << m_currentFile.GetFullPathRef();
        throw ivd_Error(ie_FILE_ERROR, sstr.str());
    }

    UInt32_t counter(0);
    UInt32_t position(0);

    while (position < size) {
        GetNextMessage(buffer, size, position);
        counter++;
    }

    log_DBG_m(dbg_DETAIL, "counter: " << counter);
    return counter;
}

Here is the call graph for this function:

Here is the caller graph for this function:

void hsm_HL7CacheFile::CreateFile (  )  [private]
void hsm_HL7CacheFile::DeleteProcessingFile ( cmn_Path a_processingFilePath  ) 

Definition at line 266 of file hsm_HL7CacheFile.cpp.

References CreateFile(), dbg_DETAIL, dbg_LOW, dbg_NORM, cmn_File::DeleteF(), DumpState(), cmn_Path::GetFileName(), ivd_BaseException::GetFriendly(), log_DBG_m, log_FUNC_m, m_lastFileNum, m_list_x, m_onDiskLists, RemoveFromOnDiskList(), and ivd_BaseException::what().

                                                                          {
    log_FUNC_m(DeleteProcessingFile);

    cmn_MutexLock l(m_list_x);
    cmn_File f(a_processingFilePath);
    try {
        f.DeleteF();
        log_DBG_m(dbg_DETAIL, "deleted:" << a_processingFilePath);
    } catch (ivd_Error& e) {
        log_DBG_m(dbg_LOW, "Deleting of file failed." << e.GetFriendly());
    } catch (ivd_SysError& se) {
        log_DBG_m(dbg_LOW, "Deleting of file failed." << se.what());
    }
    catch(...) {
        log_DBG_m(dbg_LOW, "Deleting of file failed. Unknown exception.");
    }

    string fileName = a_processingFilePath.GetFileName();

    RemoveFromOnDiskList(fileName);

    if (m_onDiskLists.size() == 0) {
        m_lastFileNum = 0;
        CreateFile();
    }

    log_DBG_m(dbg_NORM, DumpState());
}

Here is the call graph for this function:

string hsm_HL7CacheFile::DumpState (  )  [private]

Definition at line 474 of file hsm_HL7CacheFile.cpp.

References cmn_Path::GetFileName(), cmn_File::GetFullPathRef(), m_currentFile, m_currentNumMsg, m_lastFileNum, and m_onDiskLists.

Referenced by DeleteProcessingFile(), hsm_HL7CacheFile(), and ReadNextFile().

                                   {
    ostringstream sstr;
    sstr << "FileName:" << m_currentFile.GetFullPathRef().GetFileName() <<
            ", m_currentNumReq:" << m_currentNumMsg <<
            ", m_lastFileNum:" << m_lastFileNum <<
            ", m_onDiskLists.size():" << m_onDiskLists.size() << endl;
    for (UInt32_t i(0); i < m_onDiskLists.size(); i++) {
        sstr << i << ": m_fileName:" << m_onDiskLists[i].m_fileName <<
                     ", m_fileNum:" << m_onDiskLists[i].m_fileNum << endl;
    };
    return sstr.str();
}

Here is the call graph for this function:

Here is the caller graph for this function:

string hsm_HL7CacheFile::GetNextMessage ( vector< char > &  buffer,
UInt64_t  a_bufferSize,
UInt32_t a_position 
) [private]

Definition at line 383 of file hsm_HL7CacheFile.cpp.

References c_fileEntryVersion(), c_signature(), ie_DATA_CORRUPTION, ivd_Error, fileEntry_t::m_msgSize, fileEntry_t::m_signature, and fileEntry_t::m_version.

Referenced by CountNumberOfMsgs(), and ReadNextFile().

                                                              {
    //log_FUNC_m(GetNextMessage);

    fileEntry_t entry;

    UInt32_t fixContentSize(sizeof(fileEntry_t));
    UInt64_t bufferLeft = a_bufferSize - a_position;

    /*
    log_DBG_m(dbg_DETAIL, "fixContentSize: " << fixContentSize <<
                          ", a_position: " << a_position);
                          */

    //log_DBG_m(dbg_DETAIL, "header dump: " << endl << cmn_HexDump(&a_buffer[0], fixContentSize, 30, true) );
    if (fixContentSize > bufferLeft) {
        //reposition to next signature
        ostringstream sstr;
        sstr << "File size is smaller then expected. Size " << bufferLeft <<
                ", expected " << fixContentSize;
        throw ivd_Error(ie_DATA_CORRUPTION, sstr.str());
    }
    memcpy(&entry, &a_buffer[0] + a_position, fixContentSize);

/*
    log_DBG_m(dbg_DETAIL, "entry.m_signature: " << entry.m_signature << endl <<
                          "entry.m_version: " << entry.m_version << endl <<
                          "entry.m_msgSize: " << entry.m_msgSize);
                          */

    if (strcmp(entry.m_signature, c_signature) != 0) {
        ostringstream sstr;
        sstr << "Entry signature not correct: " << entry.m_signature;
        throw ivd_Error(ie_DATA_CORRUPTION, sstr.str());
    }
    if (entry.m_version != c_fileEntryVersion) {
        ostringstream msg;
        msg << "Entry version not correct: " << entry.m_version;
        throw ivd_Error(ie_DATA_CORRUPTION, msg.str());
    }
    if (entry.m_msgSize == 0) {
        ostringstream sstr;
        sstr << "Message size is 0 ";
        throw ivd_Error(ie_DATA_CORRUPTION, sstr.str());
    }

    //log_DBG_m(dbg_DETAIL, "msgSize: " << entry.m_msgSize);

    if (fixContentSize + entry.m_msgSize > a_bufferSize) {
        //reposition to next signature
        ostringstream sstr;
        sstr << "File size is smaller then expected. Size" << a_bufferSize <<
                ", expected " << fixContentSize + entry.m_msgSize;
        throw ivd_Error(ie_DATA_CORRUPTION, sstr.str());
    }

    vector<char> buffer(entry.m_msgSize+1, '\0');
    //log_DBG_m(dbg_DETAIL, "msg dump: " << endl << cmn_HexDump(&a_buffer[0] + a_position + fixContentSize, entry.m_msgSize, 30, true) );
    memcpy(&buffer[0], &a_buffer[0] + a_position + fixContentSize, entry.m_msgSize);

    string msg(&buffer[0]);
    //log_DBG_m(dbg_DETAIL, "msg read: " << msg);

    a_position += fixContentSize + entry.m_msgSize;
    //log_DBG_m(dbg_DETAIL, "a_position: " << a_position);

    return msg;
}

Here is the call graph for this function:

Here is the caller graph for this function:

string hsm_HL7CacheFile::HandleCoruptedFile ( const cmn_Path a_fileName  )  [private]

Definition at line 312 of file hsm_HL7CacheFile.cpp.

References cmn_MoveFile(), cmn_Path::GetFileName(), cmn_Time::GetTimeStamp(), log_FUNC_m, RemoveFromOnDiskList(), and cmn_Path::UpPath().

Referenced by AddMessageToCache(), hsm_HL7CacheFile(), and ReadNextFile().

                                                                          {
    log_FUNC_m(HandleCoruptedFile);

    cmn_Time time;
    string fileName = a_coruptedFile.GetFileName();
    string errFileName = fileName + "_" + time.GetTimeStamp() + ".err";
    cmn_Path errFile = a_coruptedFile.UpPath() + errFileName;

    cmn_MoveFile(a_coruptedFile, errFile);

    RemoveFromOnDiskList(fileName);

    return errFile;
}

Here is the call graph for this function:

Here is the caller graph for this function:

bool hsm_HL7CacheFile::IsAnyFileToProcess (  ) 

Definition at line 366 of file hsm_HL7CacheFile.cpp.

References dbg_DETAIL, log_DBG_m, log_FUNC_m, m_currentNumMsg, and m_onDiskLists.

                                          {
    log_FUNC_m(IsAnyFileToProcess);

    if (m_onDiskLists.size() > 0) {
        if (m_onDiskLists.size() == 1 && m_currentNumMsg == 0) {
            ostringstream sstr;
            sstr << "One list exists but it is empty.";
            log_DBG_m(dbg_DETAIL, sstr.str());
            return false;
        }
        else return true;
    }
    return false;
}

cmn_Path hsm_HL7CacheFile::ReadNextFile ( vector< string > &  a_messageList  ) 

Definition at line 191 of file hsm_HL7CacheFile.cpp.

References cmn_File::CloseF(), CreateFile(), dbg_DETAIL, dbg_NORM, DumpState(), evt_ERROR, fom_READ, ivd_BaseException::GetFriendly(), cmn_File::GetFullPathRef(), GetNextMessage(), cmn_File::GetSize(), HandleCoruptedFile(), ie_FILE_ERROR, ivd_Error, log_DBG_m, log_ERR_m, log_FUNC_m, log_WriteEvent(), m_currentFile, m_currentNumMsg, m_filePath, m_list_x, m_onDiskLists, cmn_File::OpenF(), cmn_File::ReadF(), and size.

                                                                     {
    log_FUNC_m(ReadNextFile);

    log_DBG_m(dbg_DETAIL, DumpState());

    cmn_MutexLock l(m_list_x);
    //opens next file from list

    vector<char> buffer;

    if (m_onDiskLists.size() == 0) {
        ostringstream sstr;
        sstr << "No more lists to process";
        log_DBG_m(dbg_NORM, sstr.str());
        return ("");
    }

    cmn_Path fileProcessed(m_filePath + m_onDiskLists[0].m_fileName);

    if (m_onDiskLists.size() == 1 && m_currentNumMsg == 0) {
        ostringstream sstr;
        sstr << "One file exists but it is empty:" << m_currentFile.GetFullPathRef();
        log_DBG_m(dbg_NORM, sstr.str());
        return ("");
    }

    if (m_onDiskLists.size() == 1) {
        //only current file has entries, create a new file for new messages,
        //while this file is being processed
        CreateFile();
    }

    //process first file from the list
    cmn_File f(m_filePath + m_onDiskLists[0].m_fileName);
    try {
        f.OpenF(fom_READ);

        UInt32_t size(static_cast<UInt32_t>(f.GetSize()));
        buffer.resize(size);

        UInt32_t read = f.ReadF(&buffer[0], size);
        log_DBG_m(dbg_DETAIL, "Reading done:" << read);

        if (read != size) {
            ostringstream sstr;
            sstr << "Could not read file " << m_currentFile.GetFullPathRef();
            throw ivd_Error(ie_FILE_ERROR, sstr.str());
        }

        //creates vector
        UInt32_t position(0);
        while (position < size) {
            string msg = GetNextMessage(buffer, size, position);
            a_messageList.push_back(msg);
        }

        f.CloseF();

        log_DBG_m(dbg_NORM, DumpState());
        return fileProcessed;
    }
    catch(ivd_Error &e) {
        f.CloseF();
        a_messageList.clear();
        string errFile = HandleCoruptedFile(m_filePath + m_onDiskLists[0].m_fileName);
        ostringstream sstr;
        sstr << "Cannot read HL7 queue file. File renamed to: " << errFile;
        log_WriteEvent(evt_ERROR, sstr.str());
        log_ERR_m(e.GetFriendly());
        return ("");
    }
}

Here is the call graph for this function:

void hsm_HL7CacheFile::RemoveFromOnDiskList ( string &  a_fileName  )  [private]

Definition at line 297 of file hsm_HL7CacheFile.cpp.

References m_onDiskLists.

Referenced by DeleteProcessingFile(), and HandleCoruptedFile().

                                                            {

    if (m_onDiskLists.size() > 0) {
        vector<hl7MsgFileName_t>::iterator iter = m_onDiskLists.begin();
        for (; iter != m_onDiskLists.end(); iter++) {
            if (fileName.compare(iter->m_fileName) == 0) {
                m_onDiskLists.erase(iter);
                break;
            }
        }
    }
}

Here is the caller graph for this function:


Member Data Documentation

Definition at line 71 of file hsm_HL7CacheFile.h.

Definition at line 84 of file hsm_HL7CacheFile.h.

Referenced by CreateFile(), hsm_HL7CacheFile(), and ReadNextFile().

Definition at line 82 of file hsm_HL7CacheFile.h.

Referenced by CreateFile(), DeleteProcessingFile(), DumpState(), and hsm_HL7CacheFile().

Definition at line 62 of file hsm_HL7CacheFile.h.

Referenced by AddMessageToCache(), DeleteProcessingFile(), and ReadNextFile().


The documentation for this class was generated from the following files: