Public Member Functions | Public Attributes | Private Member Functions | Private Attributes | Friends

rm_Queue Class Reference
[Resource Manager]

#include <rm.h>

Collaboration diagram for rm_Queue:
Collaboration graph
[legend]

List of all members.

Public Member Functions

 rm_Queue ()
 ~rm_Queue ()
void Insert (rm_JobParams &a_iJobParams)
void Remove (UInt64_t a_jobID)
rm_JobParams Find (UInt64_t a_jobID)
vector< rm_JobParamsGetAllJobs ()
vector< rm_JobParamsGetJob (UInt64_t a_jobId)
void Init (i_ResourceManager_i *a_iRM, Int32_t a_rATreshold, Int32_t a_timeStep, Int32_t a_phaseFactor)
Int32_t CalcPriority (rm_JobParams &a_iJobParams)
void Activate ()
void IncReleasedCount ()
UInt32_t GetReleasedCount ()
void ModifyPriority (UInt64_t a_jobID, Int32_t a_modifier)
void SetPhase (UInt64_t a_jobID, UInt32_t a_phase)

Public Attributes

rm_SysState m_SysState
time_t m_lastReported
 log_CLASSID_m

Private Member Functions

void AllocateRec (string a_partName, rm_ResourceTable_t &a_allocTable)
void AllocateMig (bool a_online, string a_partName, rm_ResourceTable_t &a_allocTable)
void AllocateAdmin (rm_ResourceTable_t &a_allocTable)
void AllocateRecovery (string a_partName, rm_ResourceTable_t &a_allocTable)
void AllocateMaint (string a_partName, rm_ResourceTable_t &a_allocTable)
void AllocateReorg (string a_partName, rm_ResourceTable_t &a_allocTable)
void Print ()
void Recalc ()
void Process ()
void Allocate (bool a_online)

Private Attributes

rm_AllocQueue_tm_allocQueue_p
Int32_t m_resourceAllocationTreshold
Int32_t m_timeStep
Int32_t m_phaseFactor
i_ResourceManager_im_iRM
bool m_iRMvalid
rm_QueueExecutorm_queueExec
bool m_disable
bool m_queueActive
cmn_Mutex m_queue_x
cmn_Semaphorem_activateQueue_s
cmn_Mutex m_resCount_x
UInt32_t m_releasedResCount

Friends

class rm_QueueExecutor

Detailed Description

Definition at line 339 of file rm.h.


Constructor & Destructor Documentation

rm_Queue::rm_Queue (  ) 

Definition at line 47 of file rm_queue.cpp.

References log_FUNC_m, m_activateQueue_s, m_allocQueue_p, m_lastReported, m_queueExec, and rm_QueueExecutor.

                  :
    m_iRM(NULL),
    m_iRMvalid(false),
    m_disable(false),
    m_queueActive(false),
    m_releasedResCount(0)

{
    log_FUNC_m(rm_Queue);
    m_allocQueue_p = new rm_AllocQueue_t();
    m_queueExec = new rm_QueueExecutor(*this);

    m_activateQueue_s = new cmn_Semaphore(0);

    time_t nowTime;
    time(&nowTime);
    m_lastReported = nowTime;
}

rm_Queue::~rm_Queue (  ) 

Definition at line 67 of file rm_queue.cpp.

References Activate(), dbg_DETAIL, ivd_Sleep, log_DBG_m, log_FUNC_m, m_activateQueue_s, m_allocQueue_p, m_disable, m_queue_x, and m_queueActive.

                    {
    log_FUNC_m(~rm_Queue);
    {
        cmn_MutexLock l(m_queue_x);
        log_DBG_m(dbg_DETAIL,"Queue locked");
        m_disable = true; //disable queue thread
        Activate();
    }
    log_DBG_m(dbg_DETAIL,"Queue Unlocked");

    while (m_queueActive){ //wait for thread to finish
        log_DBG_m(dbg_DETAIL,"waiting for queue thread to end");
        ivd_Sleep(1);
    }
    delete m_allocQueue_p;
    delete m_activateQueue_s;
}

Here is the call graph for this function:


Member Function Documentation

void rm_Queue::Activate (  ) 
void rm_Queue::Allocate ( bool  a_online  )  [private]

Definition at line 277 of file rm_queue.cpp.

References AllocateAdmin(), AllocateMaint(), AllocateMig(), AllocateRec(), AllocateRecovery(), AllocateReorg(), i_JobParams::bufId, i_JobParams::bufType, cfg_HOUR, dbg_DETAIL, dbg_LOW, dbg_NORM, rm_SysState::GetPartStatus(), GetReleasedCount(), ie_NOJOBTYPE, ipc_EXEC_m, ivd_Error, ivd_JobTypeToText(), i_JobParams::jobID, i_JobParams::jobType, jt_ADMIN, jt_BACKUP, jt_MAINT, jt_MIGRATION, jt_RECALL, jt_RECOVERY, jt_REORG, log_DBG_m, log_ERR_m, log_FUNC_m, log_WriteEvent(), ipc_Log::LogResources(), m_allocQueue_p, m_iRM, m_lastReported, m_resourceAllocationTreshold, m_SysState, i_JobParams::partName, i_JobParams::phase, i_ResourceManager_i::ReleaseResource(), ipc_Log::ResourceBusy(), and cmn_Time::Time2YMDhms().

Referenced by Process().

                                     {
    log_FUNC_m(Allocate);
    rm_AllocQueue_it iter;
    iter = m_allocQueue_p->begin();

    Int32_t tresHold = (iter->first * m_resourceAllocationTreshold) / 100;
    log_DBG_m(dbg_DETAIL, "Priority of First:   " << iter->first << endl <<
                          "Treshold at:         " << tresHold << endl <<
                          "Processing Queue:    ");

    while (iter != m_allocQueue_p->end()) {

        log_DBG_m(dbg_DETAIL,"Processing Job: "<< iter->second.jobID <<
                             " with priority " << iter->first);

        time_t nowTime;
        time(&nowTime);
        if ((nowTime - m_lastReported > cfg_HOUR) &&
            (nowTime > iter->second.startTime + (cfg_HOUR * 6) )) { //is job waiting more than 6h

            ostringstream errStr;
            errStr << "Job:" << iter->second.jobID << " waiting for resources longer than 6h";
            log_ERR_m(errStr.str());
            log_WriteEvent(errStr.str());
            m_lastReported = nowTime;
        }
        rm_ResourceTable_t& resources = iter->second.m_resources;
        if ( iter->first < tresHold ) {
            if (!iter->second.m_jobInformed) {
                resources[0].resourceBusyStatus = i_PRIORITY_BELOW_THRESHOLD;
                log_DBG_m(dbg_NORM, "Inform job:" << 
                                    resources[0].resNum << " status:" << 
                                    ipc_Log::ResourceBusy(resources[0].resourceBusyStatus));
                try {
                    iter->second.iJob->SetResourceBusyStatus(
                                                resources[0].resNum,
                                                resources[0].resourceBusyStatus);
                    iter->second.m_jobInformed = true;
                } catch (...){
                    //ignore, maybe job finished
                }                
            }
            iter++;
            continue;
        };

        if ((iter->second.jobType != jt_ADMIN) &&
            (iter->second.jobType != jt_BACKUP)){
            bool partFound(false);
            try {
                rm_PartitionStatus ps = m_SysState.GetPartStatus(iter->second.partName);
                partFound = true;
            } catch (ivd_Error &e){
                log_DBG_m(dbg_NORM, e);
                partFound = false;
            }
            if(!partFound){
                log_DBG_m(dbg_LOW,"Will delete Resource Request. Partition Unregisted meanwhile");

                m_allocQueue_p->erase(iter);
                iter = m_allocQueue_p->begin();
                break;
            }
        }

        if (GetReleasedCount() > 0) {
            if (iter != m_allocQueue_p->begin()) {
                log_DBG_m(dbg_NORM, "Resource released. Start from beginning.");
                iter = m_allocQueue_p->begin();
                continue;
            }
        }

        // request resources from rmdb
        switch (iter->second.jobType) {
            case (jt_RECALL):  AllocateRec(iter->second.partName,
                                            iter->second.m_resources);
                                break;

            case (jt_MIGRATION):
                                AllocateMig(a_online,
                                            iter->second.partName,
                                            iter->second.m_resources);
                                break;


            case (jt_RECOVERY):AllocateRecovery(iter->second.partName,
                                                 iter->second.m_resources);
                                break;
            case (jt_ADMIN):
            case (jt_BACKUP):  AllocateAdmin(  iter->second.m_resources);
                                break;

            case (jt_REORG):   AllocateReorg(  iter->second.partName,
                                                iter->second.m_resources);
                                break;

            case (jt_MAINT):   AllocateMaint(  iter->second.partName,
                                                iter->second.m_resources);
                                break;

            default:            throw ivd_Error(ie_NOJOBTYPE, "Unknown Job Type", true);

        };
        log_DBG_m(dbg_DETAIL, "Processing Job w/ID:" << iter->second.jobID);
        //Assign resource to Job

        log_DBG_m(dbg_NORM,"[JobType, Partition, StartTime, Phase]");

        if (resources[0].resAllocated) { //resources were allocated
            log_DBG_m(dbg_LOW,"Resources allocated");
            i_ResourceList_t iResourceList;

            try {
                ipc_EXEC_m (
                    iResourceList.length(resources.size());

                    for (UInt32_t j = 0; j < resources.size(); j++){
                        iResourceList[j] = resources[j].Convert2Corba();
                        iResourceList[j].fileId = resources[0].fileId;
                    };

                    cmn_Time time;
                    time = iter->second.startTime;

                    log_DBG_m(dbg_NORM,
                        "AssignResources invoked for Job:" << iter->second.jobID <<  endl << "[" <<
                        ivd_JobTypeToText(iter->second.jobType) << ", " <<
                        iter->second.partName << ", " <<
                        time.Time2YMDhms() << ", " <<
                        iter->second.phase << "]" << endl <<
                        ipc_Log::LogResources(iResourceList));

                    iter->second.iJob->AssignResources(iResourceList);
                );
            } catch (ivd_Error& e) {
                log_ERR_m("Caught exception on AssignResources: "<< e << endl <<
                          "will release resources and remove Job from list");
                i_JobParams jobParams;
                jobParams.jobID     = iter->second.jobID;
                jobParams.jobType   = iter->second.jobType;
                jobParams.partName  = iter->second.partName.c_str();
                jobParams.bufType   = iter->second.bufType;
                jobParams.bufId     = iter->second.bufId;
                jobParams.phase     = iter->second.phase;

                ipc_EXEC_m(
                    m_iRM->ReleaseResource(jobParams, iResourceList);
                );

            }
            m_allocQueue_p->erase(iter);
            log_DBG_m(dbg_DETAIL, "deleting allocation request.");
            iter = m_allocQueue_p->begin();
            log_DBG_m(dbg_DETAIL, "job erased from queue");
        } else {
            if (!iter->second.m_jobInformed) {
                log_DBG_m(dbg_NORM, "Inform job why it is waiting:" << 
                                    resources[0].resNum << " status:" << 
                                    ipc_Log::ResourceBusy(resources[0].resourceBusyStatus));
                try {    
                    iter->second.iJob->SetResourceBusyStatus(
                                                resources[0].resNum,
                                                resources[0].resourceBusyStatus);
                    iter->second.m_jobInformed = true;
                } catch (...){
                    //ignore, maybe job finished
                }                
            }
            iter++;
            log_DBG_m(dbg_DETAIL, "iter++");
        }
    }
}

Here is the call graph for this function:

Here is the caller graph for this function:

void rm_Queue::AllocateAdmin ( rm_ResourceTable_t a_allocTable  )  [private]

Definition at line 664 of file rm_queue.cpp.

References dbg_LOW, rm_DBOperation::Execute(), ie_INVALID_TBL_SIZE, jt_ADMIN, log_DBG_m, log_ERR_m, log_FUNC_m, i_ResourceManager_i::m_DBThread, m_iRM, m_SysState, rm_SysState::Release(), and rm_SysState::Reserve().

Referenced by Allocate().

                                                             {
    log_FUNC_m(AllocateAdmin);

    if (a_allocTable.size() != 1) {
        throw ivd_InternalError(ie_INVALID_TBL_SIZE, "Size should be exactly 1 (one) for Admin Jobs");
    }

    if (!m_SysState.Reserve(jt_ADMIN, 1 )){
            a_allocTable[0].resourceBusyStatus = i_MAX_NUM_DRIVES;
            return;
        }

    try {
        if (a_allocTable[0].mediumKey > 0){
            dbo_AllocateAdmin aa(a_allocTable, *(m_iRM->m_DBThread));
            aa.Execute();
        } else {
            bool a(false);
            dbo_AllocateMig am(a_allocTable, a, *(m_iRM->m_DBThread));
            am.Execute();            
        }
    } catch (ivd_Exception& e ) {
        log_DBG_m(dbg_LOW, "Caught exception in rm_Queue: " << e);
        log_ERR_m("Caught exception in rm_Queue: " << e);
    }

    if ( !a_allocTable[0].resAllocated ){
        m_SysState.Release(jt_ADMIN, a_allocTable.size() );
    }
}

Here is the call graph for this function:

Here is the caller graph for this function:

void rm_Queue::AllocateMaint ( string  a_partName,
rm_ResourceTable_t a_allocTable 
) [private]

Definition at line 588 of file rm_queue.cpp.

References dbg_LOW, rm_DBOperation::Execute(), ie_DATA_CORRUPTION, ivd_Error, jt_MAINT, log_DBG_m, log_ERR_m, log_FUNC_m, i_ResourceManager_i::m_DBThread, m_iRM, m_SysState, rm_SysState::Release(), and rm_SysState::Reserve().

Referenced by Allocate().

                                                                                {
    log_FUNC_m(AllocateMaint);

    try {
        
        if ( a_allocTable.size() == 0 )
            throw ivd_Error(ie_DATA_CORRUPTION, "Allocation Table size should be > 0 ");
        
        if (!m_SysState.Reserve(a_partName, jt_MAINT, 1)){
            a_allocTable[0].resourceBusyStatus = i_MAX_NUM_DRIVES;
            return;
        }

        // Main job may have N resources. Recall is detected by medium key.
        if (a_allocTable[0].mediumKey > 0){
            dbo_AllocateRec ar(a_allocTable[0], *(m_iRM->m_DBThread));
            ar.Execute();
        } else {
            bool a(true);
            dbo_AllocateMig am(a_allocTable, a, *(m_iRM->m_DBThread));
            am.Execute();
            if (!a_allocTable[0].resAllocated){
                a = false; //now try offline too
                dbo_AllocateMig am1(a_allocTable, a, *(m_iRM->m_DBThread));
                am1.Execute();
            }
        }

        if ( !a_allocTable[0].resAllocated ){
            m_SysState.Release(a_partName, jt_MAINT, a_allocTable.size() );
        }
    } catch (ivd_Exception& e ) {
        log_DBG_m(dbg_LOW, "Caught exception in rm_Queue: " << e);
        log_ERR_m(e);
    }
}

Here is the call graph for this function:

Here is the caller graph for this function:

void rm_Queue::AllocateMig ( bool  a_online,
string  a_partName,
rm_ResourceTable_t a_allocTable 
) [private]

Definition at line 484 of file rm_queue.cpp.

References dbg_LOW, rm_DBOperation::Execute(), jt_MIGRATION, log_DBG_m, log_ERR_m, log_FUNC_m, i_ResourceManager_i::m_DBThread, m_iRM, m_SysState, rm_SysState::Release(), and rm_SysState::Reserve().

Referenced by Allocate().

                                                             {
    log_FUNC_m(AllocateMig);

    if (!m_SysState.Reserve(a_partName, jt_MIGRATION, a_allocTable.size())) {
        a_allocTable[0].resourceBusyStatus = i_MAX_NUM_DRIVES;
        return;
    }


    // check usage of drives for specific partition (m_partitions[i].maxDrives > m_partitions[i].drivesInUse) &&
    //                                              (m_partitions[i].maxDrivesMig > m_partitions[i].drivesInUseMig)

    // select all drives which are free, usable ...         -
    // for each pool select records from RMDB which fulfill: |
    //                  medium belongs to Pool               |- this is done by Interbase (SQL query)
    //                  medium is free, empty, usable ...    |
    //                                                      -

    try {
        dbo_AllocateMig am(a_allocTable, a_online, *(m_iRM->m_DBThread));
        am.Execute();
    } catch (ivd_Exception& e ) {
        log_DBG_m(dbg_LOW, "Caught exception in rm_Queue: " << e);
        log_ERR_m("Caught exception in rm_Queue: " << e);
    }

    if ( !a_allocTable[0].resAllocated ){
        m_SysState.Release(a_partName, jt_MIGRATION, a_allocTable.size() );
    }

}

Here is the call graph for this function:

Here is the caller graph for this function:

void rm_Queue::AllocateRec ( string  a_partName,
rm_ResourceTable_t a_allocTable 
) [private]

Definition at line 520 of file rm_queue.cpp.

References dbg_LOW, rm_DBOperation::Execute(), ie_DATA_CORRUPTION, ivd_Error, jt_RECALL, log_DBG_m, log_ERR_m, log_FUNC_m, ipc_Log::LogResources(), i_ResourceManager_i::m_DBThread, m_iRM, m_SysState, mf_DISK, i_ResourceManager_i::ReleaseRecallResources(), and rm_SysState::Reserve().

Referenced by Allocate(), and AllocateRecovery().

                                                             {
    log_FUNC_m(AllocateRec);

    if ( a_allocTable.size() == 0 ) {
        throw ivd_Error(ie_DATA_CORRUPTION, "Allocation Table size should be 1 ");
    }

    try {
        dbo_AllocateRec ar(a_allocTable[0], *(m_iRM->m_DBThread));
        ar.Execute();
        if ( !a_allocTable[0].resAllocated ) {
            log_DBG_m(dbg_LOW, "Recall resources NOT allocated");
            return;
        }
        else {
            if (!m_SysState.Reserve(a_partName, jt_RECALL, 1)){
                //release allocated Resources only if !mt__DISK
                if (a_allocTable[0].mediumFamily != mf_DISK){
                    i_Resource_t res;
                    res = a_allocTable[0].Convert2Corba();
                    m_iRM->ReleaseRecallResources(res);
                }
                a_allocTable[0].resAllocated = false;
                a_allocTable[0].load = false;
                a_allocTable[0].loadSlotAddr.clear();
                a_allocTable[0].unloadSlotAddr.clear();
                a_allocTable[0].unloadBarcode.clear();
                a_allocTable[0].resourceBusyStatus = i_MAX_NUM_DRIVES;
                log_DBG_m(dbg_LOW, "Reset Recall resources Bug 8395:" << endl <<
                    ipc_Log::LogResources(a_allocTable[0].Convert2Corba());
                );
                return;
            }
        }
    } catch (ivd_Exception& e ) {
        log_DBG_m(dbg_LOW, "Caught exception in rm_Queue: " << e);
        log_ERR_m("Caught exception in rm_Queue: " << e);
    }
}

Here is the call graph for this function:

Here is the caller graph for this function:

void rm_Queue::AllocateRecovery ( string  a_partName,
rm_ResourceTable_t a_allocTable 
) [private]

Definition at line 563 of file rm_queue.cpp.

References AllocateRec(), rm_DBOperation::Execute(), ie_DATA_CORRUPTION, ivd_Error, jt_RECOVERY, log_ERR_m, log_FUNC_m, i_ResourceManager_i::m_DBThread, m_iRM, m_SysState, rm_SysState::Release(), and rm_SysState::Reserve().

Referenced by Allocate().

                                                                                   {
    log_FUNC_m(AllocateRec);

    // check if partition is registered (member of m_partitions)
    try {
        
        if ( a_allocTable.size() == 0 )
            throw ivd_Error(ie_DATA_CORRUPTION, "Allocation Table size should be > 0 ");

        if(!m_SysState.Reserve(a_partName, jt_RECOVERY, 1)) {
            a_allocTable[0].resourceBusyStatus = i_MAX_NUM_DRIVES;
            return;
        }

        dbo_AllocateRec ar(a_allocTable[0], *(m_iRM->m_DBThread));
        ar.Execute();
        if ( !a_allocTable[0].resAllocated ){
            m_SysState.Release(a_partName, jt_RECOVERY, a_allocTable.size() );
        }

    } catch (ivd_Error& e){
        log_ERR_m(e);
    }
}

Here is the call graph for this function:

Here is the caller graph for this function:

void rm_Queue::AllocateReorg ( string  a_partName,
rm_ResourceTable_t a_allocTable 
) [private]

Definition at line 625 of file rm_queue.cpp.

References dbg_LOW, rm_DBOperation::Execute(), ie_DATA_CORRUPTION, ivd_Error, jt_MAINT, log_DBG_m, log_ERR_m, log_FUNC_m, i_ResourceManager_i::m_DBThread, m_iRM, m_SysState, rm_SysState::Release(), and rm_SysState::Reserve().

Referenced by Allocate().

                                                                               {
    log_FUNC_m(AllocateReorg);

    try {
        if ( a_allocTable.size() == 0 )
            throw ivd_Error(ie_DATA_CORRUPTION, "Allocation Table size should be > 0 ");
            
        if (!m_SysState.Reserve(a_partName, jt_MAINT, 1)) {
            a_allocTable[0].resourceBusyStatus = i_MAX_NUM_DRIVES;
            return;
        }

        if (a_allocTable[0].resNum == 0){
            dbo_AllocateRec ar(a_allocTable[0], *(m_iRM->m_DBThread));
            ar.Execute();
        } else if (a_allocTable[0].resNum == 1) {
            bool a(true); //try online first
            dbo_AllocateMig am(a_allocTable, a, *(m_iRM->m_DBThread));
            am.Execute();
            if (!a_allocTable[0].resAllocated){
                a = false; //now try offline too
                dbo_AllocateMig am1(a_allocTable, a, *(m_iRM->m_DBThread));
                am1.Execute();
            }
        } else {
            throw ivd_Error(ie_DATA_CORRUPTION, "Reorg job can have only 2 resources");
        }

        if ( !a_allocTable[0].resAllocated ){
            m_SysState.Release(a_partName, jt_MAINT, a_allocTable.size() );
        }

    } catch (ivd_Exception& e ) {
        log_DBG_m(dbg_LOW, "Caught exception in rm_Queue: " << e);
        log_ERR_m(e);
    }
}

Here is the call graph for this function:

Here is the caller graph for this function:

Int32_t rm_Queue::CalcPriority ( rm_JobParams a_iJobParams  ) 

Definition at line 104 of file rm_queue.cpp.

References dbg_DETAIL, dbg_LOW, rm_SysState::GetAdminPriority(), rm_SysState::GetBackupPriority(), rm_SysState::GetMaintPriority(), rm_SysState::GetMigrationPriority(), rm_SysState::GetPartitionPriority(), rm_SysState::GetRecallPriority(), rm_SysState::GetRecoveryPriority(), cmn_Time::GetTime_t(), ie_NOJOBTYPE, rm_JobParams::jobID, rm_JobParams::jobPriority, rm_JobParams::jobType, jt_ADMIN, jt_BACKUP, jt_MAINT, jt_MIGRATION, jt_RECALL, jt_RECOVERY, jt_REORG, log_DBG_m, log_FUNC_m, m_phaseFactor, m_SysState, m_timeStep, rm_JobParams::partName, rm_JobParams::phase, rm_JobParams::priorityModifier, and rm_JobParams::startTime.

Referenced by Insert(), and Recalc().

                                                        {
    log_FUNC_m(CalcPriority);

    cmn_Time nowTime;
    Int64_t jobTypePriority(0);
    Int64_t jobPartPriority(1);
    Int64_t phase(a_iJobParams.phase);
    Int64_t phaseFactor(m_phaseFactor);
    Int64_t startTime(a_iJobParams.startTime);
    Int64_t timeStep(m_timeStep);
    Int64_t modifier(a_iJobParams.priorityModifier);
    Int64_t divisor(1);

    Int64_t jobPrio(0);

    if (a_iJobParams.jobType == jt_ADMIN || a_iJobParams.jobType == jt_BACKUP) {
        divisor = 10000;
        jobPartPriority = 1;
    }
    else {
        divisor = 100;
        jobPartPriority = m_SysState.GetPartitionPriority(a_iJobParams.partName);
    }

    switch (a_iJobParams.jobType){
        case (jt_ADMIN):
            jobTypePriority = m_SysState.GetAdminPriority();
            break;
        case (jt_BACKUP):
            jobTypePriority = m_SysState.GetBackupPriority();
            break;
        case (jt_RECALL):
            jobTypePriority = m_SysState.GetRecallPriority(a_iJobParams.partName);
            break;
        case (jt_MIGRATION):
            jobTypePriority = m_SysState.GetMigrationPriority(a_iJobParams.partName);
            break;
        case (jt_RECOVERY):
            jobTypePriority = m_SysState.GetRecoveryPriority(a_iJobParams.partName);
            break;
        case (jt_MAINT):
        case (jt_REORG):
            jobTypePriority = m_SysState.GetMaintPriority(a_iJobParams.partName);
            break;
        default:
            throw ivd_InternalError(ie_NOJOBTYPE, "calculating priority of job in queue w/o job type");
    };

    Int64_t timeDiff(nowTime.GetTime_t() - startTime);
    jobPrio = jobTypePriority * jobPartPriority +
            phase * phaseFactor +
            (timeDiff * timeStep * jobTypePriority) / divisor;
    
    log_DBG_m(dbg_DETAIL,
              "Job ID: " << a_iJobParams.jobID <<
                      ", type prio: " << jobTypePriority <<
                      ", part prio: " << jobPartPriority <<
                      ", phase: " << phase <<
                      ", phase fact: " << phaseFactor <<
                      ", start time: " << startTime <<
                      ", now: " << nowTime.GetTime_t() <<
                      ", time step: " << timeStep <<
                      ", divisor: " << divisor <<
                      ", modifier: " << modifier <<
                      " ==> prio: " << jobPrio);

    if (jobPrio > LONG_MAX) {
        jobPrio = LONG_MAX;
        log_DBG_m(dbg_LOW, "Priority exceeded maximum. Setting to LONG_MAX.");
    }
    a_iJobParams.jobPriority = static_cast<Int32_t>(jobPrio);
    return a_iJobParams.jobPriority;
} // rm_Queue::CalcPriority()

Here is the call graph for this function:

Here is the caller graph for this function:

rm_JobParams rm_Queue::Find ( UInt64_t  a_jobID  ) 

Definition at line 214 of file rm_queue.cpp.

References dbg_DETAIL, ie_JOBNOTFOUND, ivd_Error, log_DBG_m, log_FUNC_m, m_allocQueue_p, and m_queue_x.

                                            {
    log_FUNC_m(Find);

    log_DBG_m(dbg_DETAIL, "searching job in Queue: " << a_jobId);
    log_DBG_m(dbg_DETAIL, "Locking Queue");
    cmn_MutexLock l(m_queue_x);

    for (rm_AllocQueue_it iter = m_allocQueue_p->begin();  iter != m_allocQueue_p->end(); iter++) {
        if (iter->second.jobID == a_jobId ) {
            log_DBG_m(dbg_DETAIL,"Unlocking Queue");
            return (iter->second);
        }
    }
    throw ivd_Error(ie_JOBNOTFOUND,"no such job in queue");
}

vector< rm_JobParams > rm_Queue::GetAllJobs (  ) 

Definition at line 712 of file rm_queue.cpp.

References dbg_DETAIL, log_DBG_m, log_FUNC_m, m_allocQueue_p, and m_queue_x.

Referenced by i_ResourceManager_i::GetAllJobs().

                                          {
    log_FUNC_m(GetAllJobs);
    vector<rm_JobParams> rmJPVec;

    log_DBG_m(dbg_DETAIL,"Locking Queue");
    cmn_MutexLock l(m_queue_x);

    rm_AllocQueue_it iter;
    for ( iter = m_allocQueue_p->begin(); iter != m_allocQueue_p->end(); iter++ ) {
        rm_JobParams rmJP = iter->second;
        rmJPVec.push_back(rmJP);
    };
    return rmJPVec;
    log_DBG_m(dbg_DETAIL,"Unlocking Queue");
}

Here is the caller graph for this function:

vector< rm_JobParams > rm_Queue::GetJob ( UInt64_t  a_jobId  ) 

Definition at line 730 of file rm_queue.cpp.

References dbg_DETAIL, rm_JobParams::jobID, log_DBG_m, log_FUNC_m, m_allocQueue_p, and m_queue_x.

Referenced by i_ResourceManager_i::GetJob(), and i_ResourceManager_i::GetJobResources().

                                                      {
    log_FUNC_m(GetJob);
    vector<rm_JobParams> rmJPVec;

    log_DBG_m(dbg_DETAIL,"Locking Queue");
    cmn_MutexLock l(m_queue_x);

    rm_AllocQueue_it iter;
    for ( iter = m_allocQueue_p->begin(); iter != m_allocQueue_p->end(); iter++ ){
        rm_JobParams& rmJP = iter->second;
        if (rmJP.jobID == a_jobID){
            rmJPVec.push_back(rmJP);
        }

    };
    if (rmJPVec.empty()) {
        log_DBG_m(dbg_DETAIL,"No jobs found in rm->queue");
    }
    return rmJPVec;

}

Here is the caller graph for this function:

UInt32_t rm_Queue::GetReleasedCount (  ) 

Definition at line 241 of file rm_queue.cpp.

References m_releasedResCount, and m_resCount_x.

Referenced by Allocate().

Here is the caller graph for this function:

void rm_Queue::IncReleasedCount (  ) 

Definition at line 236 of file rm_queue.cpp.

References m_releasedResCount, and m_resCount_x.

Referenced by i_ResourceManager_i::ReleaseResource().

Here is the caller graph for this function:

void rm_Queue::Init ( i_ResourceManager_i a_iRM,
Int32_t  a_rATreshold,
Int32_t  a_timeStep,
Int32_t  a_phaseFactor 
)

Definition at line 85 of file rm_queue.cpp.

References dbg_DETAIL, log_DBG_m, log_FUNC_m, m_iRM, m_iRMvalid, m_phaseFactor, m_queueActive, m_queueExec, m_resourceAllocationTreshold, m_timeStep, and cmn_Thread::Start().

Referenced by i_ResourceManager_i::i_ResourceManager_i().

                                                                                                                {
    log_FUNC_m(Init);

    m_resourceAllocationTreshold  = a_rATreshold;
    m_timeStep                    = a_timeStep;
    m_phaseFactor                 = a_phaseFactor;
    m_iRM = a_iRM;
    m_iRMvalid = true;
    log_DBG_m(dbg_DETAIL,"Initializing Queue " << m_iRM << " "
                                         << m_resourceAllocationTreshold << " "
                                         << m_timeStep << " "
                                         << m_phaseFactor  );
    m_queueActive = true;
    m_queueExec->Start();

}

Here is the call graph for this function:

Here is the caller graph for this function:

void rm_Queue::Insert ( rm_JobParams a_iJobParams  ) 

Definition at line 179 of file rm_queue.cpp.

References CalcPriority(), dbg_DETAIL, log_DBG_m, log_FUNC_m, m_allocQueue_p, and m_queue_x.

Referenced by i_ResourceManager_i::ExchangeRecallResources(), i_ResourceManager_i::ExchangeResources(), i_ResourceManager_i::GetRecallResources(), and i_ResourceManager_i::GetResources().

                                                {
    log_FUNC_m(Insert);

    log_DBG_m(dbg_DETAIL,"inserting new job into list");
    {
        log_DBG_m(dbg_DETAIL,"Locking Queue");
        cmn_MutexLock l(m_queue_x);

        m_allocQueue_p->insert( rm_AllocPair_t( CalcPriority(a_iJobParams) , a_iJobParams));
        log_DBG_m(dbg_DETAIL,"Unlocking Queue");
    }
}

Here is the call graph for this function:

Here is the caller graph for this function:

void rm_Queue::ModifyPriority ( UInt64_t  a_jobID,
Int32_t  a_modifier 
)

Definition at line 753 of file rm_queue.cpp.

References dbg_LOW, rm_JobParams::jobID, log_DBG_m, log_FUNC_m, m_allocQueue_p, m_queue_x, rm_JobParams::priorityModifier, and Recalc().

Referenced by i_ResourceManager_i::SetPriorityModifier().

                                                                  {
    log_FUNC_m(ModifyPriority);

    cmn_MutexLock l(m_queue_x);
    log_DBG_m(dbg_LOW,"New modifier for job: " << a_jobID << " = " << a_modifier);

    rm_AllocQueue_it iter;
    for ( iter = m_allocQueue_p->begin(); iter != m_allocQueue_p->end(); iter++ ){
        rm_JobParams& rmJP = iter->second;
        if (rmJP.jobID == a_jobID){
            rmJP.priorityModifier = a_modifier;
        }
    };
    Recalc();
}

Here is the call graph for this function:

Here is the caller graph for this function:

void rm_Queue::Print (  )  [private]

Definition at line 696 of file rm_queue.cpp.

References dbg_DETAIL, localtime(), log_DBG_m, log_FUNC_m, and m_allocQueue_p.

Referenced by Recalc().

                     {
    log_FUNC_m(Print);

    struct tm * tmTime;
    log_DBG_m(dbg_DETAIL,"------------- Printing Queue (start)-------------------------" << endl);
    rm_AllocQueue_it iter;
    for ( iter = m_allocQueue_p->begin(); iter != m_allocQueue_p->end(); iter++ ){
        UInt64_t i = iter->second.jobID;
        tmTime = localtime(&(iter->second.startTime));
        log_DBG_m(dbg_DETAIL,
            "JobID:" << i << "  Priority:" << iter->second.jobPriority << " Insert Time:" << asctime(tmTime));
    };
    log_DBG_m(dbg_DETAIL, "------------- Printing Queue (end)---------------------------" << endl);

}

Here is the call graph for this function:

Here is the caller graph for this function:

void rm_Queue::Process (  )  [private]

Definition at line 454 of file rm_queue.cpp.

References Allocate(), dbg_DETAIL, dbg_NORM, log_DBG_m, log_FUNC_m, m_allocQueue_p, m_resourceAllocationTreshold, and Recalc().

Referenced by rm_QueueExecutor::Run().

                       {
    log_FUNC_m(Process);

    Recalc();

    if ( m_allocQueue_p->begin() == m_allocQueue_p->end() ) {
        log_DBG_m(dbg_NORM,"Queue is empty");
        return;
    }

    rm_AllocQueue_it iter;
    iter = m_allocQueue_p->begin();

    Int32_t tresHold = (iter->first * m_resourceAllocationTreshold) / 100;
    log_DBG_m(dbg_DETAIL, "Priority of First:   " << iter->first << endl <<
                          "Treshold at:         " << tresHold << endl <<
                          "Processing Queue:    ");

    Allocate(true); //allocate

    if ( m_allocQueue_p->begin() == m_allocQueue_p->end() ) {
        log_DBG_m(dbg_NORM,"Queue empty after Alloc online media");
        return;
    }

    Allocate(false);

}

Here is the call graph for this function:

Here is the caller graph for this function:

void rm_Queue::Recalc (  )  [private]

Definition at line 252 of file rm_queue.cpp.

References CalcPriority(), cmn_Global::dbg, dbg_DETAIL, g_cmn, log_Debugger::GetLevel(), log_DBG_m, log_FUNC_m, m_allocQueue_p, and Print().

Referenced by ModifyPriority(), Process(), and SetPhase().

                      {
    log_FUNC_m(Recalc);
    rm_AllocQueue_t *tmpQueue = new rm_AllocQueue_t();  //allocate new Queue

    // copy all jobs to new queue recalculating the priority
    log_DBG_m(dbg_DETAIL, "recalculating priority of all jobs");
    for ( rm_AllocQueue_it iter = m_allocQueue_p->begin(); iter != m_allocQueue_p->end(); iter++ ){
        tmpQueue->insert(rm_AllocPair_t((CalcPriority(iter->second) + iter->second.priorityModifier), iter->second));
        log_DBG_m(dbg_DETAIL, "inserted job:" << iter->second.jobID 
            << ", priority: " << iter->first )
    };

    // delete the old queue
    delete m_allocQueue_p;
    // set the queue pointer to the new queue
    m_allocQueue_p = tmpQueue;

    if (g_cmn.dbg.GetLevel() == dbg_DETAIL) {
        log_DBG_m(dbg_DETAIL,"Prining queue after Recalc " << endl);

        Print();
    }

}

Here is the call graph for this function:

Here is the caller graph for this function:

void rm_Queue::Remove ( UInt64_t  a_jobID  ) 

Definition at line 192 of file rm_queue.cpp.

References dbg_DETAIL, dbg_NORM, log_DBG_m, log_FUNC_m, m_allocQueue_p, and m_queue_x.

Referenced by i_ResourceManager_i::CancelGetResource().

                                      {
    log_FUNC_m(Remove);

    log_DBG_m(dbg_NORM,"removing job "<< a_jobID << " from queue");

    cmn_MutexLock l(m_queue_x);
    log_DBG_m(dbg_DETAIL,"l(m_queue_x);");

    rm_AllocQueue_it iter = m_allocQueue_p->begin();

    while (iter != m_allocQueue_p->end()) {
        if ( iter->second.jobID == a_jobID ){
            log_DBG_m(dbg_DETAIL,"erase " << a_jobID);
            m_allocQueue_p->erase(iter);
            iter = m_allocQueue_p->begin();
        } else {
            iter++;
        }
    };
    log_DBG_m(dbg_DETAIL,"Unlocking Queue");
}

Here is the caller graph for this function:

void rm_Queue::SetPhase ( UInt64_t  a_jobID,
UInt32_t  a_phase 
)

Definition at line 769 of file rm_queue.cpp.

References dbg_DETAIL, rm_JobParams::jobID, log_DBG_m, log_FUNC_m, m_allocQueue_p, m_queue_x, rm_JobParams::phase, and Recalc().

Referenced by i_ResourceManager_i::SetPhase().

                                                          {
    log_FUNC_m(SetPhase);

    cmn_MutexLock l(m_queue_x);

    log_DBG_m(dbg_DETAIL,"Setting phase of Job " << a_jobID << " to " << a_phase);
    rm_AllocQueue_it iter;
    for ( iter = m_allocQueue_p->begin(); iter != m_allocQueue_p->end(); iter++ ){
        rm_JobParams& rmJP = iter->second;
        if (rmJP.jobID == a_jobID){
            rmJP.phase = a_phase;
        }
    };
    Recalc();
}

Here is the call graph for this function:

Here is the caller graph for this function:


Friends And Related Function Documentation

friend class rm_QueueExecutor [friend]

Definition at line 344 of file rm.h.

Referenced by rm_Queue().


Member Data Documentation

Definition at line 420 of file rm.h.

Definition at line 391 of file rm.h.

Referenced by Activate(), rm_Queue(), rm_QueueExecutor::Run(), and ~rm_Queue().

bool rm_Queue::m_disable [private]

Definition at line 387 of file rm.h.

Referenced by rm_QueueExecutor::Run(), and ~rm_Queue().

bool rm_Queue::m_iRMvalid [private]

Definition at line 378 of file rm.h.

Referenced by Init().

Definition at line 418 of file rm.h.

Referenced by Allocate(), and rm_Queue().

Definition at line 351 of file rm.h.

Referenced by CalcPriority(), and Init().

bool rm_Queue::m_queueActive [private]

Definition at line 388 of file rm.h.

Referenced by Init(), ~rm_Queue(), and rm_QueueExecutor::~rm_QueueExecutor().

Definition at line 380 of file rm.h.

Referenced by Init(), and rm_Queue().

Definition at line 394 of file rm.h.

Referenced by GetReleasedCount(), and IncReleasedCount().

Definition at line 393 of file rm.h.

Referenced by GetReleasedCount(), and IncReleasedCount().

Definition at line 349 of file rm.h.

Referenced by Allocate(), Init(), and Process().

Definition at line 350 of file rm.h.

Referenced by CalcPriority(), and Init().


The documentation for this class was generated from the following files: