hsm_HL7MessageProxy Class Reference

HL7 message proxy class provides message sending functionality.

#include <hsm_HL7MessageProxy.h>

[Inheritance diagram for hsm_HL7MessageProxy]
[Collaboration diagram for hsm_HL7MessageProxy]

Public Member Functions

 hsm_HL7MessageProxy (string a_partitionName)
 Construct message proxy object.
virtual ~hsm_HL7MessageProxy ()
void Append (hsm_hl7Data_t &a_message, hl7_msgType_e a_msgType)
 Append message to message buffer.
void ReportInvalidHL7Configuration (const string &a_gidCid)
void Stop ()
 Stop thread.

Private Member Functions

virtual void * RunUndetached (void *a_arg)
 Thread main function.
void AddMessageToCache (string &a_message, cmn_Path &a_spDirName)
 Add message to message cache.
void LoadListFromFile ()
void SendMessages ()
string JoinHostPortOperationGidCid (const string &a_hostnamePort, const string &a_operation, const string &a_gidCid)
void SplitHostPortOperationGidCid (const string &a_hostPortOperationGidCid, string &a_hostnamePort, string &a_operation, string &a_gidCid)

Private Attributes

 log_CLASSID_m
UInt32_t m_wakeUpTime
cmn_Mutex m_messageCacheLock_x
 Message cache.
bool m_stopped
 Thread stop variable.
cmn_Condition m_message_c
 Sleeping mechanism.
UInt32_t m_msgCounter
 Counter for appended messages.
cmn_Path m_varPartPath
vector< string > m_notValidHL7Cfg_v
 Vector of non GID-CID paths for which a warning was already written.
cmn_Mutex m_notValidHL7Cfg_x
 Non GID-CID path cache.
map< string, hl7 * > m_hl7Refs_v
map< string, hsm_HL7CacheFile * > m_cacheFiles_v

Detailed Description

HL7 message proxy class provides message sending functionality.

Definition at line 52 of file hsm_HL7MessageProxy.h.


Constructor & Destructor Documentation

hsm_HL7MessageProxy::hsm_HL7MessageProxy ( string  a_partitionName  ) 

Construct message proxy object.

Parameters:
a_partitionName (in) name of the partition whose HL7 cache directory is scanned at startup

Definition at line 57 of file hsm_HL7MessageProxy.cpp.

References cfg_MINUTE, cmn_GetEnvVariable(), cmn_Str2Num(), dbg_DETAIL, dbg_LOW, cmn_Global::dirs, cmn_File::Exists(), g_cmn, cmn_FastDirLst::GetNextName(), hl7MessageInterval_c(), ift_DIR, log_DBG_m, log_FUNC_m, m_cacheFiles_v, m_hl7Refs_v, m_varPartPath, m_wakeUpTime, ivd_Directories::part, and SplitHostPortOperationGidCid().

    : cmn_Thread(),
      m_wakeUpTime(cfg_HL7_MESSAGE_SEND_INTERVAL),
      m_stopped(false),
      m_message_c(&m_messageCacheLock_x),
      m_msgCounter(0) {
    log_FUNC_m(hsm_HL7MessageProxy);

    // HL7 message sending interval
    string envVar = cmn_GetEnvVariable(hl7MessageInterval_c);
    log_DBG_m(dbg_LOW, hl7MessageInterval_c << " = '" << envVar << "'");
    UInt32_t sendInterval(cmn_Str2Num(envVar));
    if (sendInterval != 0) {
        m_wakeUpTime = sendInterval * cfg_MINUTE;
    }
    log_DBG_m(dbg_LOW, "HL7 message sending interval = " << (m_wakeUpTime / cfg_MINUTE) << "min");

    // check all directories on startup
    m_varPartPath = g_cmn.dirs.part + a_partitionName + string("hl7");

    cmn_File listPath(m_varPartPath);
    if (!listPath.Exists()) {
        // none of files exist
        return;
    }

    cmn_FastDirLst dirList(m_varPartPath);
    ivd_GenInode_t gInode;
    ivd_FileType_e fType;
    string foundName = dirList.GetNextName(gInode, fType);

    while (foundName.length() != 0) {
        if (fType == ift_DIR) {
            try {
                string hostnamePort;
                string operation;
                string gidCid;
                SplitHostPortOperationGidCid(foundName, hostnamePort, operation, gidCid);
                log_DBG_m(dbg_DETAIL, "directory found: " << foundName
                                    << " hostnamePort: " << hostnamePort
                                    << " operation: " << operation
                                    << " gidCid: " << gidCid);
                // size_type (not UInt32_t) so the comparison with string::npos is valid on 64-bit builds
                string::size_type position = hostnamePort.find("+");
                if (position != string::npos) {
                    if (m_cacheFiles_v.count(foundName) == 0) {
                        m_cacheFiles_v[foundName] = new hsm_HL7CacheFile(m_varPartPath + foundName);
                    }

                    if (m_hl7Refs_v.count(hostnamePort) == 0) {
                        string hostname = hostnamePort.substr(0, position);
                        string port = hostnamePort.substr(position+1, hostnamePort.length());
                        m_hl7Refs_v[hostnamePort] = new hl7(hostname, port);
                    }
                }
            }
            catch (...) {
                // continue
            }
        }
        foundName = dirList.GetNextName(gInode, fType);
    }
}
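The constructor derives a host name and a port from the "host+port" part of each cache directory name. A minimal sketch of that split is shown below. `SplitHostPort` is a hypothetical helper that is not part of this class, and the sample values used with it are made up.

```cpp
#include <cassert>
#include <string>

// Hypothetical helper, not part of hsm_HL7MessageProxy.
// Splits "host+port" at the first '+'.
// Returns false (and leaves the outputs untouched) when no '+' is present.
bool SplitHostPort(const std::string& hostnamePort,
                   std::string& hostname,
                   std::string& port) {
    // std::string::size_type can always represent std::string::npos.
    const std::string::size_type position = hostnamePort.find('+');
    if (position == std::string::npos) {
        return false;
    }
    hostname = hostnamePort.substr(0, position);
    port = hostnamePort.substr(position + 1);  // everything after the '+'
    return true;
}
```

Keeping the result of `find` in `std::string::size_type` avoids truncating `npos` on platforms where that type is wider than 32 bits.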

hsm_HL7MessageProxy::~hsm_HL7MessageProxy (  )  [virtual]

Definition at line 122 of file hsm_HL7MessageProxy.cpp.

References dbg_DETAIL, log_DBG_m, log_FUNC_m, and m_cacheFiles_v.

                                          {
    log_FUNC_m(~hsm_HL7MessageProxy);

    map<string, hsm_HL7CacheFile*>::iterator iter = m_cacheFiles_v.begin();
    for (; iter != m_cacheFiles_v.end(); iter = m_cacheFiles_v.begin()) {
        log_DBG_m(dbg_DETAIL, "Removing HL7 cache file for " << iter->first);
        delete iter->second;
        m_cacheFiles_v.erase(iter);
    }
}
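The destructor walks the map and deletes each cache file by hand. As an illustration only, the same ownership can be expressed with `std::unique_ptr`, assuming a C++11 compiler is available (the listing above does not rely on one). `CacheFileStub` and `FillExample` are hypothetical stand-ins, not project types.

```cpp
#include <cassert>
#include <map>
#include <memory>
#include <string>

// Hypothetical stand-in for hsm_HL7CacheFile; counts live instances.
struct CacheFileStub {
    static int& LiveCount() { static int n = 0; return n; }
    CacheFileStub() { ++LiveCount(); }
    ~CacheFileStub() { --LiveCount(); }
};

// A map that owns its values: destroying or clearing the map
// releases every entry without an explicit delete loop.
typedef std::map<std::string, std::unique_ptr<CacheFileStub> > OwningCacheMap;

// Adds two entries with made-up keys.
void FillExample(OwningCacheMap& cache) {
    cache["hostA+2575"].reset(new CacheFileStub());
    cache["hostB+2575"].reset(new CacheFileStub());
}
```

With this layout the container's own destructor performs the cleanup, so no hand-written erase loop is needed.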


Member Function Documentation

void hsm_HL7MessageProxy::AddMessageToCache ( string &  a_message,
cmn_Path &  a_spDirName 
) [private]

Add message to message cache.

Parameters:
a_message (in) message to be put into the cache
a_spDirName (in) directory name of Storage Peak in cache where message will be cached

Definition at line 256 of file hsm_HL7MessageProxy.cpp.

References cmn_Path::GetFileName(), log_FUNC_m, and m_cacheFiles_v.

Referenced by Append().

                                                                                    {
    log_FUNC_m(AddMessageToCache);

    string hostPortOperationGidCid = a_spDirName.GetFileName();
    if (m_cacheFiles_v.count(hostPortOperationGidCid) == 0) {
        m_cacheFiles_v[hostPortOperationGidCid] = new hsm_HL7CacheFile(a_spDirName);
    }

    m_cacheFiles_v[hostPortOperationGidCid]->AddMessageToCache(a_message);
}

void hsm_HL7MessageProxy::Append ( hsm_hl7Data_t &  a_message,
hl7_msgType_e  a_msgType 
)

Append message to message buffer.

Parameters:
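a_message (in) message data to be put into the buffer
a_msgType (in) message type (mt_Migrate or mt_Recall); any other value is recorded as an unknown operation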

Definition at line 135 of file hsm_HL7MessageProxy.cpp.

References AddMessageToCache(), cmn_Condition::Broadcast(), dbg_DETAIL, ivd_JobTypeToText(), JoinHostPortOperationGidCid(), jt_MIGRATION, jt_RECALL, jt_UNKNOWN, log_DBG_m, log_FUNC_m, hsm_hl7Data_t::m_basePath, hsm_hl7Data_t::m_cid, hsm_hl7Data_t::m_fileName, hsm_hl7Data_t::m_gid, m_hl7Refs_v, hsm_hl7Data_t::m_hostname, m_message_c, m_messageCacheLock_x, m_msgCounter, hsm_hl7Data_t::m_port, m_varPartPath, mt_Migrate, mt_Recall, and path.

Referenced by hsm_FileHeader::QueueHL7Message().

                                                                                  {
    log_FUNC_m(Append);

    cmn_MutexLock l(m_messageCacheLock_x);

    log_DBG_m(dbg_DETAIL, "Adding message:"
        "* gid: " << a_message.m_gid << endl <<
        "* cid: " << a_message.m_cid << endl <<
        "* path: " << a_message.m_basePath << endl <<
        "* fileName: " << a_message.m_fileName << endl <<
        "* host: " << a_message.m_hostname << endl <<
        "* port: " << a_message.m_port);

    string hostPort = a_message.m_hostname + "+" + a_message.m_port;

    if (m_hl7Refs_v.count(hostPort) == 0) {
        m_hl7Refs_v[hostPort] = new hl7(a_message.m_hostname, a_message.m_port);
    }

    string hl7Message = m_hl7Refs_v[hostPort]->CreateHL7Message(a_message.m_gid,
                                                                a_message.m_cid,
                                                                a_message.m_basePath,
                                                                a_message.m_fileName,
                                                                a_msgType);
    string operation;
    if (a_msgType == mt_Migrate) {
        operation = ivd_JobTypeToText(jt_MIGRATION);
    }
    else if (a_msgType == mt_Recall) {
        operation = ivd_JobTypeToText(jt_RECALL);
    }
    else {
        operation = ivd_JobTypeToText(jt_UNKNOWN);
    }

    string hostPortOperationGidCid(
            JoinHostPortOperationGidCid(hostPort,
                                        operation,
                                        (a_message.m_gid + "-" + a_message.m_cid)));
    cmn_Path path = m_varPartPath + hostPortOperationGidCid;
    AddMessageToCache(hl7Message, path);

    m_msgCounter++;

    if (m_msgCounter >= 1000) {
        m_message_c.Broadcast();
        m_msgCounter = 0;
    }
}
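Append() counts messages and wakes the sending thread once 1000 of them have accumulated, then resets the counter. The sketch below isolates that idea with standard C++11 primitives in place of cmn_Mutex and cmn_Condition. `BatchNotifier` and `OnAppend` are hypothetical names, and the threshold is a constructor argument so it can be tested with a small value.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>

// Hypothetical sketch of "wake the sender every N appended messages".
class BatchNotifier {
public:
    explicit BatchNotifier(std::size_t threshold)
        : m_threshold(threshold), m_counter(0) {}

    // Records one appended message.
    // Returns true when the waiting sender was notified.
    bool OnAppend() {
        std::lock_guard<std::mutex> lock(m_lock);
        ++m_counter;
        if (m_counter >= m_threshold) {
            m_wakeUp.notify_all();  // plays the role of cmn_Condition::Broadcast()
            m_counter = 0;          // start counting the next batch
            return true;
        }
        return false;
    }

private:
    std::mutex m_lock;
    std::condition_variable m_wakeUp;
    std::size_t m_threshold;
    std::size_t m_counter;
};
```

Batching the wake-ups this way keeps the sender asleep under light load while still bounding how many messages can pile up between timed sends.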

string hsm_HL7MessageProxy::JoinHostPortOperationGidCid ( const string &  a_hostnamePort,
const string &  a_operation,
const string &  a_gidCid 
) [private]

Definition at line 331 of file hsm_HL7MessageProxy.cpp.

References c_hostPortOperationGidCidDelimiter().

Referenced by Append().

                                                                                {
    return a_hostnamePort
        + c_hostPortOperationGidCidDelimiter
        + a_operation
        + c_hostPortOperationGidCidDelimiter
        + a_gidCid;
}

void hsm_HL7MessageProxy::LoadListFromFile (  )  [private]

void hsm_HL7MessageProxy::ReportInvalidHL7Configuration ( const string &  a_gidCid  ) 

Definition at line 187 of file hsm_HL7MessageProxy.cpp.

References dbg_DETAIL, evt_WARNING, log_DBG_m, log_FUNC_m, log_WriteEvent(), m_notValidHL7Cfg_v, and m_notValidHL7Cfg_x.

Referenced by hsm_FileHeader::QueueHL7Message().

                                                                              {
    log_FUNC_m(ReportInvalidHL7Configuration);

    cmn_MutexLock l(m_notValidHL7Cfg_x);

    vector<string>::iterator iter = m_notValidHL7Cfg_v.begin();
    bool found(false);
    for (; iter != m_notValidHL7Cfg_v.end(); iter ++) {
        if (a_gidCid.compare(*iter) == 0) {
            found = true;
            break;
        }
    }

    if (!found) {
        m_notValidHL7Cfg_v.push_back(a_gidCid);
        ostringstream msg;
        msg << "HL7 info not set for directory " << a_gidCid << ". HL7 messages for files in this directory will not be sent.";
        log_WriteEvent(evt_WARNING, msg.str());
    }
    else {
        log_DBG_m(dbg_DETAIL, "Warning already reported for directory: " << a_gidCid);
    }
}
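The function above emits the warning only once per GID-CID value by scanning a vector linearly. A compact way to sketch the same "report once" rule is a `std::set`, whose `insert` reports whether the key was new. `ReportOnce` and `ShouldReport` are hypothetical names, and `std::mutex` stands in for cmn_Mutex.

```cpp
#include <cassert>
#include <mutex>
#include <set>
#include <string>

// Hypothetical helper: remembers which keys were already reported.
class ReportOnce {
public:
    // Returns true only the first time a given key is seen.
    bool ShouldReport(const std::string& key) {
        std::lock_guard<std::mutex> lock(m_lock);
        // insert() returns a pair; .second is true when the key was newly added.
        return m_seen.insert(key).second;
    }

private:
    std::mutex m_lock;
    std::set<std::string> m_seen;
};
```

A caller would write the warning event when `ShouldReport` returns true and only a debug line otherwise. The lookup is logarithmic rather than linear, which matters only if many distinct directories are misconfigured.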

void * hsm_HL7MessageProxy::RunUndetached ( void *  a_arg  )  [private, virtual]

Thread main function.

Reimplemented from cmn_Thread.

Definition at line 223 of file hsm_HL7MessageProxy.cpp.

References dbg_NORM, evt_ERROR, ivd_BaseException::GetFriendly(), cmn_Thread::GetTime(), log_DBG_m, log_ERR_m, log_FUNC_m, log_WriteEvent(), m_message_c, m_messageCacheLock_x, m_stopped, m_wakeUpTime, SendMessages(), and cmn_Condition::TimedWait().

                                                    {
    log_FUNC_m(RunUndetached);
    try {
        while (true) {
            cmn_MutexLock l(m_messageCacheLock_x);

            SendMessages();

            log_DBG_m(dbg_NORM, "Waiting to send the messages ...");
            m_message_c.TimedWait(cmn_Thread::GetTime(m_wakeUpTime));

            if (m_stopped) {
                break;
            }
        }
    }
    catch (ivd_Exception& e) {
        log_WriteEvent(evt_ERROR, "HL7 message proxy terminated.");
        log_ERR_m(e.GetFriendly());
    }
    catch (std::exception &se) {
        log_WriteEvent(evt_ERROR, "HL7 message proxy terminated.");
        log_ERR_m(se.what());
    }
    catch (...) {
        log_WriteEvent(evt_ERROR, "HL7 message proxy terminated.");
        log_ERR_m("Unknown error.");
    }
    return NULL;
}
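The thread body alternates between sending and a timed wait, and leaves the loop once the stop flag is set. The following is a minimal sketch of that loop using `std::condition_variable`, under the assumption that only the stop signal needs to be modelled (the early wake-up issued by Append() is left out). `PeriodicWorker` is a hypothetical class, not the project's cmn_Thread.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <functional>
#include <mutex>

// Hypothetical sketch of a "work, then sleep until timeout or stop" loop.
class PeriodicWorker {
public:
    explicit PeriodicWorker(std::chrono::milliseconds interval)
        : m_interval(interval), m_stopped(false) {}

    // Runs `work` once per round while holding the lock, then waits.
    // Returns the number of rounds executed before the stop flag was seen.
    int Run(const std::function<void()>& work) {
        int rounds = 0;
        std::unique_lock<std::mutex> lock(m_lock);
        while (true) {
            work();
            ++rounds;
            // The predicate guards against spurious wake-ups and makes the
            // wait return at once when Stop() was already called.
            m_wakeUp.wait_for(lock, m_interval, [this] { return m_stopped; });
            if (m_stopped) {
                break;
            }
        }
        return rounds;
    }

    // Sets the stop flag under the lock and wakes the waiting loop.
    void Stop() {
        std::lock_guard<std::mutex> lock(m_lock);
        m_stopped = true;
        m_wakeUp.notify_all();
    }

private:
    std::mutex m_lock;
    std::condition_variable m_wakeUp;
    std::chrono::milliseconds m_interval;
    bool m_stopped;
};
```

As in the listing above, the work runs at least once before the flag is checked, so messages queued before shutdown still get one send attempt.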

void hsm_HL7MessageProxy::SendMessages (  )  [private]

Definition at line 269 of file hsm_HL7MessageProxy.cpp.

References dbg_DETAIL, dbg_GetLevel(), dbg_NORM, g_fsLog, cmn_Path::GetFileName(), log_DBG_m, log_FUNC_m, log_WriteEvent(), m_cacheFiles_v, m_hl7Refs_v, path, SplitHostPortOperationGidCid(), and log_ivdfs::WriteHL7MsgNotification().

Referenced by RunUndetached().

                                       {
    log_FUNC_m(SendMessages);

    map<string, hsm_HL7CacheFile*>::iterator iter = m_cacheFiles_v.begin();
    for (; iter != m_cacheFiles_v.end(); iter++) {
        while (iter->second->IsAnyFileToProcess()) {
            vector<string> messageList_v;
            cmn_Path processingFile = iter->second->ReadNextFile(messageList_v);
            log_DBG_m(dbg_DETAIL, "Processing file: " << processingFile << " found messages: " << messageList_v.size());

            if (processingFile.compare("") != 0) {
                if (messageList_v.size() != 0) {
                    cmn_Path path = iter->first;
                    string hostPortOperationGidCid = path.GetFileName();
                    string hostnamePort;
                    string operation;
                    string gidCid;
                    SplitHostPortOperationGidCid(hostPortOperationGidCid, hostnamePort, operation, gidCid);
                    log_DBG_m(dbg_DETAIL, "hostPortOperationGidCid: " << hostPortOperationGidCid
                                        << " hostnamePort: " << hostnamePort
                                        << " operation: " << operation
                                        << " gidCid: " << gidCid);
                    bool sent = m_hl7Refs_v[hostnamePort]->SendBlock(messageList_v);
                    // Replace "+" with ":" in hostnameport string
                    // size_type (not UInt32_t) so the comparison with string::npos is valid on 64-bit builds
                    string::size_type plusPos = hostnamePort.find("+");
                    if (plusPos != string::npos) {
                        hostnamePort.replace(plusPos, 1, ":");
                    }
                    if (sent) {
                        log_DBG_m(dbg_DETAIL, "Messages sent successfully");
                        iter->second->DeleteProcessingFile(processingFile);
                        ostringstream msg;
                        msg << "HL7 message sent (#files: " << messageList_v.size()
                            << ", operation: " << operation
                            << ", server: " << hostnamePort << ")";
                        log_WriteEvent(msg.str(), "", 0, gidCid);

                        // log HL7 message sent if debugging is enabled
                        if (dbg_GetLevel() != dbg_NONE) {
                            vector<string>::iterator msgIter = messageList_v.begin();
                            for ( ; msgIter != messageList_v.end(); msgIter++) {
                                g_fsLog.WriteHL7MsgNotification(*msgIter);
                            }
                        }
                    }
                    else {
                        ostringstream msg;
                        msg << "HL7 message sending failed (#files: " << messageList_v.size() << ", server: " << hostnamePort << ")";
                        log_WriteEvent(msg.str(), "", 0, gidCid);
                        break;
                    }
                }
                else {
                    log_DBG_m(dbg_NORM, "HL7 queue file " << processingFile << " is empty.");
                }
            }
        }
    }
}
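For each cache file, SendMessages() deletes a processing file only after its block was sent and stops working on that cache at the first failure, which presumably leaves the unsent file in place for a later pass. The sketch below shows that drain-until-failure pattern on an in-memory queue. `DrainQueue`, `Batch` and the `send` callback are hypothetical and do not correspond to hsm_HL7CacheFile or hl7::SendBlock.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <string>
#include <vector>

typedef std::vector<std::string> Batch;

// Hypothetical sketch: send queued batches in order. A batch is removed only
// after a successful send; the first failure stops the drain so the remaining
// batches stay queued for the next pass.
std::size_t DrainQueue(std::deque<Batch>& queue,
                       const std::function<bool(const Batch&)>& send) {
    std::size_t sent = 0;
    while (!queue.empty()) {
        if (!send(queue.front())) {
            break;  // keep the failed batch at the front for a retry
        }
        queue.pop_front();
        ++sent;
    }
    return sent;
}
```

Removing an item only after the send succeeds gives at-least-once delivery: a crash or failure between send and removal can cause a resend, but not a silent loss.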

void hsm_HL7MessageProxy::SplitHostPortOperationGidCid ( const string &  a_hostPortOperationGidCid,
string &  a_hostnamePort,
string &  a_operation,
string &  a_gidCid 
) [private]

Definition at line 343 of file hsm_HL7MessageProxy.cpp.

References c_hostPortOperationGidCidDelimiter(), ie_INVALID_ARG, and ivd_Error.

Referenced by hsm_HL7MessageProxy(), and SendMessages().

                                                                          {
    size_t firstDelimiter;
    firstDelimiter = a_hostPortOperationGidCid.find(c_hostPortOperationGidCidDelimiter);

    size_t secondDelimiter;
    secondDelimiter = a_hostPortOperationGidCid.find(c_hostPortOperationGidCidDelimiter, (firstDelimiter + 1));

    if ((firstDelimiter == string::npos) || (secondDelimiter == string::npos)) {
        ostringstream msg;
        msg << "Not enough delimiters (" << c_hostPortOperationGidCidDelimiter
            << ") in HostPortOperationGidCid string (" << a_hostPortOperationGidCid << ").";
        throw ivd_Error(ie_INVALID_ARG, msg.str());
    }

    a_hostnamePort = a_hostPortOperationGidCid.substr(0, firstDelimiter);
    a_operation = a_hostPortOperationGidCid.substr((firstDelimiter + 1), (secondDelimiter - firstDelimiter - 1));
    a_gidCid = a_hostPortOperationGidCid.substr(secondDelimiter + 1);
}
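JoinHostPortOperationGidCid() and SplitHostPortOperationGidCid() are inverses of each other. The round trip can be sketched as follows, assuming a single-character delimiter. The value `'#'` is chosen only for illustration because the real value of c_hostPortOperationGidCidDelimiter does not appear in this reference; `JoinKey` and `SplitKey` are hypothetical names, and `std::invalid_argument` replaces ivd_Error.

```cpp
#include <cassert>
#include <stdexcept>
#include <string>

// Assumed delimiter, for illustration only.
static const char kDelimiter = '#';

// Builds "hostPort<delim>operation<delim>gidCid".
std::string JoinKey(const std::string& hostPort,
                    const std::string& operation,
                    const std::string& gidCid) {
    return hostPort + kDelimiter + operation + kDelimiter + gidCid;
}

// Splits a key at the first two delimiters; throws when fewer than two exist.
void SplitKey(const std::string& key,
              std::string& hostPort,
              std::string& operation,
              std::string& gidCid) {
    const std::string::size_type first = key.find(kDelimiter);
    if (first == std::string::npos) {
        throw std::invalid_argument("not enough delimiters in key: " + key);
    }
    const std::string::size_type second = key.find(kDelimiter, first + 1);
    if (second == std::string::npos) {
        throw std::invalid_argument("not enough delimiters in key: " + key);
    }
    hostPort = key.substr(0, first);
    operation = key.substr(first + 1, second - first - 1);
    gidCid = key.substr(second + 1);  // the rest, even if it contains the delimiter
}
```

Because the third field takes the remainder of the string, only the host/port and operation parts must be free of the delimiter for the round trip to be lossless.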

void hsm_HL7MessageProxy::Stop (  ) 

Stop thread.

Definition at line 214 of file hsm_HL7MessageProxy.cpp.

References cmn_Condition::Broadcast(), log_FUNC_m, m_message_c, m_messageCacheLock_x, and m_stopped.

Referenced by ClientConf_t::~ClientConf_t().


Member Data Documentation

hsm_HL7MessageProxy::log_CLASSID_m [private]

Reimplemented from cmn_Thread.

Definition at line 73 of file hsm_HL7MessageProxy.h.

map<string, hl7*> hsm_HL7MessageProxy::m_hl7Refs_v [private]

Definition at line 119 of file hsm_HL7MessageProxy.h.

Referenced by Append(), hsm_HL7MessageProxy(), and SendMessages().

cmn_Condition hsm_HL7MessageProxy::m_message_c [private]

Sleeping mechanism.

Definition at line 86 of file hsm_HL7MessageProxy.h.

Referenced by Append(), RunUndetached(), and Stop().

cmn_Mutex hsm_HL7MessageProxy::m_messageCacheLock_x [private]

Message cache.

Definition at line 80 of file hsm_HL7MessageProxy.h.

Referenced by Append(), RunUndetached(), and Stop().

UInt32_t hsm_HL7MessageProxy::m_msgCounter [private]

Counter for appended messages.

Definition at line 89 of file hsm_HL7MessageProxy.h.

Referenced by Append().

vector<string> hsm_HL7MessageProxy::m_notValidHL7Cfg_v [private]

Vector of non GID-CID paths for which a warning was already written.

Definition at line 94 of file hsm_HL7MessageProxy.h.

Referenced by ReportInvalidHL7Configuration().

cmn_Mutex hsm_HL7MessageProxy::m_notValidHL7Cfg_x [private]

Non GID-CID path cache.

Definition at line 97 of file hsm_HL7MessageProxy.h.

Referenced by ReportInvalidHL7Configuration().

bool hsm_HL7MessageProxy::m_stopped [private]

Thread stop variable.

Definition at line 83 of file hsm_HL7MessageProxy.h.

Referenced by RunUndetached(), and Stop().

cmn_Path hsm_HL7MessageProxy::m_varPartPath [private]

Definition at line 91 of file hsm_HL7MessageProxy.h.

Referenced by Append(), and hsm_HL7MessageProxy().

UInt32_t hsm_HL7MessageProxy::m_wakeUpTime [private]

Definition at line 77 of file hsm_HL7MessageProxy.h.

Referenced by hsm_HL7MessageProxy(), and RunUndetached().


The documentation for this class was generated from the following files: