rm_Queue Class Reference
[Resource Manager]

#include <rm.h>

Collaboration diagram for rm_Queue:

Collaboration graph
[legend]

List of all members.


Detailed Description

Definition at line 325 of file rm.h.


Public Member Functions

 rm_Queue ()
 ~rm_Queue ()
void Insert (rm_JobParams &a_iJobParams)
void Remove (UInt64_t a_jobID)
rm_JobParams Find (UInt64_t a_jobID)
vector< rm_JobParams > GetAllJobs ()
vector< rm_JobParams > GetJob (UInt64_t a_jobId)
void Init (i_ResourceManager_i *a_iRM, Int32_t a_rATreshold, Int32_t a_timeStep, Int32_t a_phaseFactor)
Int32_t CalcPriority (rm_JobParams &a_iJobParams)
void Activate ()
void IncReleasedCount ()
UInt32_t GetReleasedCount ()
void ModifyPriority (UInt64_t a_jobID, Int32_t a_modifier)
void SetPhase (UInt64_t a_jobID, UInt32_t a_phase)

Public Attributes

rm_SysState m_SysState
time_t m_lastReported
 log_CLASSID_m

Private Member Functions

void AllocateRec (string a_partName, rm_ResourceTable_t &a_allocTable)
void AllocateMig (bool a_online, string a_partName, rm_ResourceTable_t &a_allocTable)
void AllocateAdmin (rm_ResourceTable_t &a_allocTable)
void AllocateRecovery (string a_partName, rm_ResourceTable_t &a_allocTable)
void AllocateMaint (string a_partName, rm_ResourceTable_t &a_allocTable)
void AllocateReorg (string a_partName, rm_ResourceTable_t &a_allocTable)
void Print ()
void Recalc ()
void Process ()
void Allocate (bool a_online)

Private Attributes

rm_AllocQueue_t * m_allocQueue_p
Int32_t m_resourceAllocationTreshold
Int32_t m_timeStep
Int32_t m_phaseFactor
i_ResourceManager_i * m_iRM
bool m_iRMvalid
rm_QueueExecutor * m_queueExec
bool m_disable
bool m_queueActive
cmn_Mutex m_queue_x
cmn_Semaphore * m_activateQueue_s
cmn_Mutex m_resCount_x
UInt32_t m_releasedResCount

Friends

class rm_QueueExecutor

Constructor & Destructor Documentation

rm_Queue::rm_Queue (  ) 

Definition at line 33 of file rm_queue.cpp.

References log_FUNC_m, m_activateQueue_s, m_allocQueue_p, m_lastReported, m_queueExec, and rm_QueueExecutor.

00033                   :
00034     m_iRM(NULL),
00035     m_iRMvalid(false),
00036     m_disable(false),
00037     m_queueActive(false),
00038     m_releasedResCount(0)
00039 
00040 {
00041     log_FUNC_m(rm_Queue);
00042     m_allocQueue_p = new rm_AllocQueue_t();
00043     m_queueExec = new rm_QueueExecutor(*this);
00044 
00045     m_activateQueue_s = new cmn_Semaphore(0);
00046 
00047     time_t nowTime;
00048     time(&nowTime);
00049     m_lastReported = nowTime;
00050 }

rm_Queue::~rm_Queue (  ) 

Definition at line 53 of file rm_queue.cpp.

References Activate(), dbg_DETAIL, ivd_Sleep, log_DBG_m, log_FUNC_m, m_activateQueue_s, m_allocQueue_p, m_disable, m_queue_x, and m_queueActive.

00053                     {
00054     log_FUNC_m(~rm_Queue);
00055     {
00056         cmn_MutexLock l(m_queue_x);
00057         log_DBG_m(dbg_DETAIL,"Queue locked");
00058         m_disable = true; //disable queue thread
00059         Activate();
00060     }
00061     log_DBG_m(dbg_DETAIL,"Queue Unlocked");
00062 
00063     while (m_queueActive){ //wait for thread to finish
00064         log_DBG_m(dbg_DETAIL,"waiting for queue thread to end");
00065         ivd_Sleep(1);
00066     }
00067     delete m_allocQueue_p;
00068     delete m_activateQueue_s;
00069 }

Here is the call graph for this function:


Member Function Documentation

void rm_Queue::AllocateRec ( string  a_partName,
rm_ResourceTable_t & a_allocTable 
) [private]

Definition at line 506 of file rm_queue.cpp.

References dbg_LOW, rm_DBOperation::Execute(), i_MAX_NUM_DRIVES, ie_DATA_CORRUPTION, ivd_Error, jt_RECALL, log_DBG_m, log_ERR_m, log_FUNC_m, ipc_Log::LogResources(), i_ResourceManager_i::m_DBThread, m_iRM, m_SysState, mf_DISK, i_ResourceManager_i::ReleaseRecallResources(), and rm_SysState::Reserve().

Referenced by Allocate(), and AllocateRecovery().

00507                                                              {
00508     log_FUNC_m(AllocateRec);
00509 
00510     if ( a_allocTable.size() == 0 ) {
00511         throw ivd_Error(ie_DATA_CORRUPTION, "Allocation Table size should be 1 ");
00512     }
00513 
00514     try {
00515         dbo_AllocateRec ar(a_allocTable[0], *(m_iRM->m_DBThread));
00516         ar.Execute();
00517         if ( !a_allocTable[0].resAllocated ) {
00518             log_DBG_m(dbg_LOW, "Recall resources NOT allocated");
00519             return;
00520         }
00521         else {
00522             if (!m_SysState.Reserve(a_partName, jt_RECALL, 1)){
00523                 //release allocated Resources only if !mt__DISK
00524                 if (a_allocTable[0].mediumFamily != mf_DISK){
00525                     i_Resource_t res;
00526                     res = a_allocTable[0].Convert2Corba();
00527                     m_iRM->ReleaseRecallResources(res);
00528                 }
00529                 a_allocTable[0].resAllocated = false;
00530                 a_allocTable[0].load = false;
00531                 a_allocTable[0].loadSlotAddr.clear();
00532                 a_allocTable[0].unloadSlotAddr.clear();
00533                 a_allocTable[0].unloadBarcode.clear();
00534                 a_allocTable[0].resourceBusyStatus = i_MAX_NUM_DRIVES;
00535                 log_DBG_m(dbg_LOW, "Reset Recall resources Bug 8395:" << endl <<
00536                     ipc_Log::LogResources(a_allocTable[0].Convert2Corba());
00537                 );
00538                 return;
00539             }
00540         }
00541     } catch (ivd_Exception& e ) {
00542         log_DBG_m(dbg_LOW, "Caught exception in rm_Queue: " << e);
00543         log_ERR_m("Caught exception in rm_Queue: " << e);
00544     }
00545 }

Here is the call graph for this function:

Here is the caller graph for this function:

void rm_Queue::AllocateMig ( bool  a_online,
string  a_partName,
rm_ResourceTable_t & a_allocTable 
) [private]

Definition at line 470 of file rm_queue.cpp.

References dbg_LOW, rm_DBOperation::Execute(), i_MAX_NUM_DRIVES, jt_MIGRATION, log_DBG_m, log_ERR_m, log_FUNC_m, i_ResourceManager_i::m_DBThread, m_iRM, m_SysState, rm_SysState::Release(), and rm_SysState::Reserve().

Referenced by Allocate().

00472                                                              {
00473     log_FUNC_m(AllocateMig);
00474 
00475     if (!m_SysState.Reserve(a_partName, jt_MIGRATION, a_allocTable.size())) {
00476         a_allocTable[0].resourceBusyStatus = i_MAX_NUM_DRIVES;
00477         return;
00478     }
00479 
00480 
00481     // check usage of drives for specific partition (m_partitions[i].maxDrives > m_partitions[i].drivesInUse) &&
00482     //                                              (m_partitions[i].maxDrivesMig > m_partitions[i].drivesInUseMig)
00483 
00484     // select all drives which are free, usable ...         -
00485     // for each pool select records from RMDB which fulfill: |
00486     //                  medium belongs to Pool               |- this is done by Interbase (SQL query)
00487     //                  medium is free, empty, usable ...    |
00488     //                                                      -
00489 
00490     try {
00491         dbo_AllocateMig am(a_allocTable, a_online, *(m_iRM->m_DBThread));
00492         am.Execute();
00493     } catch (ivd_Exception& e ) {
00494         log_DBG_m(dbg_LOW, "Caught exception in rm_Queue: " << e);
00495         log_ERR_m("Caught exception in rm_Queue: " << e);
00496     }
00497 
00498     if ( !a_allocTable[0].resAllocated ){
00499         m_SysState.Release(a_partName, jt_MIGRATION, a_allocTable.size() );
00500     }
00501 
00502 }

Here is the call graph for this function:

Here is the caller graph for this function:

void rm_Queue::AllocateAdmin ( rm_ResourceTable_t & a_allocTable  )  [private]

Definition at line 650 of file rm_queue.cpp.

References dbg_LOW, rm_DBOperation::Execute(), i_MAX_NUM_DRIVES, ie_INVALID_TBL_SIZE, jt_ADMIN, log_DBG_m, log_ERR_m, log_FUNC_m, i_ResourceManager_i::m_DBThread, m_iRM, m_SysState, rm_SysState::Release(), and rm_SysState::Reserve().

Referenced by Allocate().

00650                                                              {
00651     log_FUNC_m(AllocateAdmin);
00652 
00653     if (a_allocTable.size() != 1) {
00654         throw ivd_InternalError(ie_INVALID_TBL_SIZE, "Size should be exactly 1 (one) for Admin Jobs");
00655     }
00656 
00657     if (!m_SysState.Reserve(jt_ADMIN, 1 )){
00658             a_allocTable[0].resourceBusyStatus = i_MAX_NUM_DRIVES;
00659             return;
00660         }
00661 
00662     try {
00663         if (a_allocTable[0].mediumKey > 0){
00664             dbo_AllocateAdmin aa(a_allocTable, *(m_iRM->m_DBThread));
00665             aa.Execute();
00666         } else {
00667             bool a(false);
00668             dbo_AllocateMig am(a_allocTable, a, *(m_iRM->m_DBThread));
00669             am.Execute();            
00670         }
00671     } catch (ivd_Exception& e ) {
00672         log_DBG_m(dbg_LOW, "Caught exception in rm_Queue: " << e);
00673         log_ERR_m("Caught exception in rm_Queue: " << e);
00674     }
00675 
00676     if ( !a_allocTable[0].resAllocated ){
00677         m_SysState.Release(jt_ADMIN, a_allocTable.size() );
00678     }
00679 }

Here is the call graph for this function:

Here is the caller graph for this function:

void rm_Queue::AllocateRecovery ( string  a_partName,
rm_ResourceTable_t & a_allocTable 
) [private]

Definition at line 549 of file rm_queue.cpp.

References AllocateRec(), rm_DBOperation::Execute(), i_MAX_NUM_DRIVES, ie_DATA_CORRUPTION, ivd_Error, jt_RECOVERY, log_ERR_m, log_FUNC_m, i_ResourceManager_i::m_DBThread, m_iRM, m_SysState, rm_SysState::Release(), and rm_SysState::Reserve().

Referenced by Allocate().

00549                                                                                    {
00550     log_FUNC_m(AllocateRec);
00551 
00552     // check if partition is registered (member of m_partitions)
00553     try {
00554         
00555         if ( a_allocTable.size() == 0 )
00556             throw ivd_Error(ie_DATA_CORRUPTION, "Allocation Table size should be > 0 ");
00557 
00558         if(!m_SysState.Reserve(a_partName, jt_RECOVERY, 1)) {
00559             a_allocTable[0].resourceBusyStatus = i_MAX_NUM_DRIVES;
00560             return;
00561         }
00562 
00563         dbo_AllocateRec ar(a_allocTable[0], *(m_iRM->m_DBThread));
00564         ar.Execute();
00565         if ( !a_allocTable[0].resAllocated ){
00566             m_SysState.Release(a_partName, jt_RECOVERY, a_allocTable.size() );
00567         }
00568 
00569     } catch (ivd_Error& e){
00570         log_ERR_m(e);
00571     }
00572 }

Here is the call graph for this function:

Here is the caller graph for this function:

void rm_Queue::AllocateMaint ( string  a_partName,
rm_ResourceTable_t & a_allocTable 
) [private]

Definition at line 574 of file rm_queue.cpp.

References dbg_LOW, rm_DBOperation::Execute(), i_MAX_NUM_DRIVES, ie_DATA_CORRUPTION, ivd_Error, jt_MAINT, log_DBG_m, log_ERR_m, log_FUNC_m, i_ResourceManager_i::m_DBThread, m_iRM, m_SysState, rm_SysState::Release(), and rm_SysState::Reserve().

Referenced by Allocate().

00574                                                                                 {
00575     log_FUNC_m(AllocateMaint);
00576 
00577     try {
00578         
00579         if ( a_allocTable.size() == 0 )
00580             throw ivd_Error(ie_DATA_CORRUPTION, "Allocation Table size should be > 0 ");
00581         
00582         if (!m_SysState.Reserve(a_partName, jt_MAINT, 1)){
00583             a_allocTable[0].resourceBusyStatus = i_MAX_NUM_DRIVES;
00584             return;
00585         }
00586 
00587         // Main job may have N resources. Recall is detected by medium key.
00588         if (a_allocTable[0].mediumKey > 0){
00589             dbo_AllocateRec ar(a_allocTable[0], *(m_iRM->m_DBThread));
00590             ar.Execute();
00591         } else {
00592             bool a(true);
00593             dbo_AllocateMig am(a_allocTable, a, *(m_iRM->m_DBThread));
00594             am.Execute();
00595             if (!a_allocTable[0].resAllocated){
00596                 a = false; //now try offline too
00597                 dbo_AllocateMig am1(a_allocTable, a, *(m_iRM->m_DBThread));
00598                 am1.Execute();
00599             }
00600         }
00601 
00602         if ( !a_allocTable[0].resAllocated ){
00603             m_SysState.Release(a_partName, jt_MAINT, a_allocTable.size() );
00604         }
00605     } catch (ivd_Exception& e ) {
00606         log_DBG_m(dbg_LOW, "Caught exception in rm_Queue: " << e);
00607         log_ERR_m(e);
00608     }
00609 }

Here is the call graph for this function:

Here is the caller graph for this function:

void rm_Queue::AllocateReorg ( string  a_partName,
rm_ResourceTable_t & a_allocTable 
) [private]

Definition at line 611 of file rm_queue.cpp.

References dbg_LOW, rm_DBOperation::Execute(), i_MAX_NUM_DRIVES, ie_DATA_CORRUPTION, ivd_Error, jt_MAINT, log_DBG_m, log_ERR_m, log_FUNC_m, i_ResourceManager_i::m_DBThread, m_iRM, m_SysState, rm_SysState::Release(), and rm_SysState::Reserve().

Referenced by Allocate().

00611                                                                                {
00612     log_FUNC_m(AllocateReorg);
00613 
00614     try {
00615         if ( a_allocTable.size() == 0 )
00616             throw ivd_Error(ie_DATA_CORRUPTION, "Allocation Table size should be > 0 ");
00617             
00618         if (!m_SysState.Reserve(a_partName, jt_MAINT, 1)) {
00619             a_allocTable[0].resourceBusyStatus = i_MAX_NUM_DRIVES;
00620             return;
00621         }
00622 
00623         if (a_allocTable[0].resNum == 0){
00624             dbo_AllocateRec ar(a_allocTable[0], *(m_iRM->m_DBThread));
00625             ar.Execute();
00626         } else if (a_allocTable[0].resNum == 1) {
00627             bool a(true); //try online first
00628             dbo_AllocateMig am(a_allocTable, a, *(m_iRM->m_DBThread));
00629             am.Execute();
00630             if (!a_allocTable[0].resAllocated){
00631                 a = false; //now try offline too
00632                 dbo_AllocateMig am1(a_allocTable, a, *(m_iRM->m_DBThread));
00633                 am1.Execute();
00634             }
00635         } else {
00636             throw ivd_Error(ie_DATA_CORRUPTION, "Reorg job can have only 2 resources");
00637         }
00638 
00639         if ( !a_allocTable[0].resAllocated ){
00640             m_SysState.Release(a_partName, jt_MAINT, a_allocTable.size() );
00641         }
00642 
00643     } catch (ivd_Exception& e ) {
00644         log_DBG_m(dbg_LOW, "Caught exception in rm_Queue: " << e);
00645         log_ERR_m(e);
00646     }
00647 }

Here is the call graph for this function:

Here is the caller graph for this function:

void rm_Queue::Print (  )  [private]

Definition at line 682 of file rm_queue.cpp.

References dbg_DETAIL, localtime(), log_DBG_m, log_FUNC_m, and m_allocQueue_p.

Referenced by Recalc().

00682                      {
00683     log_FUNC_m(Print);
00684 
00685     struct tm * tmTime;
00686     log_DBG_m(dbg_DETAIL,"------------- Printing Queue (start)-------------------------" << endl);
00687     rm_AllocQueue_it iter;
00688     for ( iter = m_allocQueue_p->begin(); iter != m_allocQueue_p->end(); iter++ ){
00689         UInt64_t i = iter->second.jobID;
00690         tmTime = localtime(&(iter->second.startTime));
00691         log_DBG_m(dbg_DETAIL,
00692             "JobID:" << i << "  Priority:" << iter->second.jobPriority << " Insert Time:" << asctime(tmTime));
00693     };
00694     log_DBG_m(dbg_DETAIL, "------------- Printing Queue (end)---------------------------" << endl);
00695 
00696 }

Here is the call graph for this function:

Here is the caller graph for this function:

void rm_Queue::Recalc (  )  [private]

Definition at line 238 of file rm_queue.cpp.

References CalcPriority(), cmn_Global::dbg, dbg_DETAIL, g_cmn, log_Debugger::GetLevel(), log_DBG_m, log_FUNC_m, m_allocQueue_p, and Print().

Referenced by ModifyPriority(), Process(), and SetPhase().

00238                       {
00239     log_FUNC_m(Recalc);
00240     rm_AllocQueue_t *tmpQueue = new rm_AllocQueue_t();  //allocate new Queue
00241 
00242     // copy all jobs to new queue recalculating the priority
00243     log_DBG_m(dbg_DETAIL, "recalculating priority of all jobs");
00244     for ( rm_AllocQueue_it iter = m_allocQueue_p->begin(); iter != m_allocQueue_p->end(); iter++ ){
00245         tmpQueue->insert(rm_AllocPair_t((CalcPriority(iter->second) + iter->second.priorityModifier), iter->second));
00246         log_DBG_m(dbg_DETAIL, "inserted job:" << iter->second.jobID 
00247             << ", priority: " << iter->first )
00248     };
00249 
00250     // delete the old queue
00251     delete m_allocQueue_p;
00252     // set the queue pointer to the new queue
00253     m_allocQueue_p = tmpQueue;
00254 
00255     if (g_cmn.dbg.GetLevel() == dbg_DETAIL) {
00256         log_DBG_m(dbg_DETAIL,"Prining queue after Recalc " << endl);
00257 
00258         Print();
00259     }
00260 
00261 }

Here is the call graph for this function:

Here is the caller graph for this function:

void rm_Queue::Process (  )  [private]

Definition at line 440 of file rm_queue.cpp.

References Allocate(), dbg_DETAIL, dbg_NORM, log_DBG_m, log_FUNC_m, m_allocQueue_p, m_resourceAllocationTreshold, and Recalc().

Referenced by rm_QueueExecutor::Run().

00440                        {
00441     log_FUNC_m(Process);
00442 
00443     Recalc();
00444 
00445     if ( m_allocQueue_p->begin() == m_allocQueue_p->end() ) {
00446         log_DBG_m(dbg_NORM,"Queue is empty");
00447         return;
00448     }
00449 
00450     rm_AllocQueue_it iter;
00451     iter = m_allocQueue_p->begin();
00452 
00453     Int32_t tresHold = (iter->first * m_resourceAllocationTreshold) / 100;
00454     log_DBG_m(dbg_DETAIL, "Priority of First:   " << iter->first << endl <<
00455                           "Treshold at:         " << tresHold << endl <<
00456                           "Processing Queue:    ");
00457 
00458     Allocate(true); //allocate
00459 
00460     if ( m_allocQueue_p->begin() == m_allocQueue_p->end() ) {
00461         log_DBG_m(dbg_NORM,"Queue empty after Alloc online media");
00462         return;
00463     }
00464 
00465     Allocate(false);
00466 
00467 }

Here is the call graph for this function:

Here is the caller graph for this function:

void rm_Queue::Allocate ( bool  a_online  )  [private]

Definition at line 263 of file rm_queue.cpp.

References AllocateAdmin(), AllocateMaint(), AllocateMig(), AllocateRec(), AllocateRecovery(), AllocateReorg(), i_JobParams::bufId, i_JobParams::bufType, cfg_HOUR, dbg_DETAIL, dbg_LOW, dbg_NORM, rm_SysState::GetPartStatus(), GetReleasedCount(), i_PRIORITY_BELOW_THRESHOLD, ie_NOJOBTYPE, ipc_EXEC_m, ivd_Error, ivd_JobTypeToText(), i_JobParams::jobID, i_JobParams::jobType, jt_ADMIN, jt_BACKUP, jt_MAINT, jt_MIGRATION, jt_RECALL, jt_RECOVERY, jt_REORG, log_DBG_m, log_ERR_m, log_FUNC_m, log_WriteEvent(), ipc_Log::LogResources(), m_allocQueue_p, m_iRM, m_lastReported, m_resourceAllocationTreshold, m_SysState, i_JobParams::partName, i_JobParams::phase, i_ResourceManager_i::ReleaseResource(), ipc_Log::ResourceBusy(), and cmn_Time::Time2YMDhms().

Referenced by Process().

00263                                      {
00264     log_FUNC_m(Allocate);
00265     rm_AllocQueue_it iter;
00266     iter = m_allocQueue_p->begin();
00267 
00268     Int32_t tresHold = (iter->first * m_resourceAllocationTreshold) / 100;
00269     log_DBG_m(dbg_DETAIL, "Priority of First:   " << iter->first << endl <<
00270                           "Treshold at:         " << tresHold << endl <<
00271                           "Processing Queue:    ");
00272 
00273     while (iter != m_allocQueue_p->end()) {
00274 
00275         log_DBG_m(dbg_DETAIL,"Processing Job: "<< iter->second.jobID <<
00276                              " with priority " << iter->first);
00277 
00278         time_t nowTime;
00279         time(&nowTime);
00280         if ((nowTime - m_lastReported > cfg_HOUR) &&
00281             (nowTime > iter->second.startTime + (cfg_HOUR * 6) )) { //is job waiting more than 6h
00282 
00283             ostringstream errStr;
00284             errStr << "Job:" << iter->second.jobID << " waiting for resources longer that 6h";
00285             log_ERR_m(errStr.str());
00286             log_WriteEvent(errStr.str());
00287             m_lastReported = nowTime;
00288         }
00289         rm_ResourceTable_t& resources = iter->second.m_resources;
00290         if ( iter->first < tresHold ) {
00291             if (!iter->second.m_jobInformed) {
00292                 resources[0].resourceBusyStatus = i_PRIORITY_BELOW_THRESHOLD;
00293                 log_DBG_m(dbg_NORM, "Inform job:" << 
00294                                     resources[0].resNum << " status:" << 
00295                                     ipc_Log::ResourceBusy(resources[0].resourceBusyStatus));
00296                 try {
00297                     iter->second.iJob->SetResourceBusyStatus(
00298                                                 resources[0].resNum,
00299                                                 resources[0].resourceBusyStatus);
00300                     iter->second.m_jobInformed = true;
00301                 } catch (...){
00302                     //ignore, maybe job finished
00303                 }                
00304             }
00305             iter++;
00306             continue;
00307         };
00308 
00309         if ((iter->second.jobType != jt_ADMIN) &&
00310             (iter->second.jobType != jt_BACKUP)){
00311             bool partFound(false);
00312             try {
00313                 rm_PartitionStatus ps = m_SysState.GetPartStatus(iter->second.partName);
00314                 partFound = true;
00315             } catch (ivd_Error &e){
00316                 log_DBG_m(dbg_NORM, e);
00317                 partFound = false;
00318             }
00319             if(!partFound){
00320                 log_DBG_m(dbg_LOW,"Will delete Resource Request. Partition Unregisted meanwhile");
00321 
00322                 m_allocQueue_p->erase(iter);
00323                 iter = m_allocQueue_p->begin();
00324                 break;
00325             }
00326         }
00327 
00328         if (GetReleasedCount() > 0) {
00329             if (iter != m_allocQueue_p->begin()) {
00330                 log_DBG_m(dbg_NORM, "Resource released. Start from beginning.");
00331                 iter = m_allocQueue_p->begin();
00332                 continue;
00333             }
00334         }
00335 
00336         // request resources from rmdb
00337         switch (iter->second.jobType) {
00338             case (jt_RECALL):  AllocateRec(iter->second.partName,
00339                                             iter->second.m_resources);
00340                                 break;
00341 
00342             case (jt_MIGRATION):
00343                                 AllocateMig(a_online,
00344                                             iter->second.partName,
00345                                             iter->second.m_resources);
00346                                 break;
00347 
00348 
00349             case (jt_RECOVERY):AllocateRecovery(iter->second.partName,
00350                                                  iter->second.m_resources);
00351                                 break;
00352             case (jt_ADMIN):
00353             case (jt_BACKUP):  AllocateAdmin(  iter->second.m_resources);
00354                                 break;
00355 
00356             case (jt_REORG):   AllocateReorg(  iter->second.partName,
00357                                                 iter->second.m_resources);
00358                                 break;
00359 
00360             case (jt_MAINT):   AllocateMaint(  iter->second.partName,
00361                                                 iter->second.m_resources);
00362                                 break;
00363 
00364             default:            throw ivd_Error(ie_NOJOBTYPE, "Unknown Job Type", true);
00365 
00366         };
00367         log_DBG_m(dbg_DETAIL, "Processing Job w/ID:" << iter->second.jobID);
00368         //Assign resource to Job
00369 
00370         log_DBG_m(dbg_NORM,"[JobType, Partition, StartTime, Phase]");
00371 
00372         if (resources[0].resAllocated) { //resources were allocated
00373             log_DBG_m(dbg_LOW,"Resources allocated");
00374             i_ResourceList_t iResourceList;
00375 
00376             try {
00377                 ipc_EXEC_m (
00378                     iResourceList.length(resources.size());
00379 
00380                     for (UInt32_t j = 0; j < resources.size(); j++){
00381                         iResourceList[j] = resources[j].Convert2Corba();
00382                         iResourceList[j].fileId = resources[0].fileId;
00383                     };
00384 
00385                     cmn_Time time;
00386                     time = iter->second.startTime;
00387 
00388                     log_DBG_m(dbg_NORM,
00389                         "AssignResources invoked for Job:" << iter->second.jobID <<  endl << "[" <<
00390                         ivd_JobTypeToText(iter->second.jobType) << ", " <<
00391                         iter->second.partName << ", " <<
00392                         time.Time2YMDhms() << ", " <<
00393                         iter->second.phase << "]" << endl <<
00394                         ipc_Log::LogResources(iResourceList));
00395 
00396                     iter->second.iJob->AssignResources(iResourceList);
00397                 );
00398             } catch (ivd_Error& e) {
00399                 log_ERR_m("Caught exception on AssignResources: "<< e << endl <<
00400                           "will release resources and remove Job from list");
00401                 i_JobParams jobParams;
00402                 jobParams.jobID     = iter->second.jobID;
00403                 jobParams.jobType   = iter->second.jobType;
00404                 jobParams.partName  = iter->second.partName.c_str();
00405                 jobParams.bufType   = iter->second.bufType;
00406                 jobParams.bufId     = iter->second.bufId;
00407                 jobParams.phase     = iter->second.phase;
00408 
00409                 ipc_EXEC_m(
00410                     m_iRM->ReleaseResource(jobParams, iResourceList);
00411                 );
00412 
00413             }
00414             m_allocQueue_p->erase(iter);
00415             log_DBG_m(dbg_DETAIL, "deleting allocation request.");
00416             iter = m_allocQueue_p->begin();
00417             log_DBG_m(dbg_DETAIL, "job erased from queue");
00418         } else {
00419             if (!iter->second.m_jobInformed) {
00420                 log_DBG_m(dbg_NORM, "Inform job why it is waiting:" << 
00421                                     resources[0].resNum << " status:" << 
00422                                     ipc_Log::ResourceBusy(resources[0].resourceBusyStatus));
00423                 try {    
00424                     iter->second.iJob->SetResourceBusyStatus(
00425                                                 resources[0].resNum,
00426                                                 resources[0].resourceBusyStatus);
00427                     iter->second.m_jobInformed = true;
00428                 } catch (...){
00429                     //ignore, maybe job finished
00430                 }                
00431             }
00432             iter++;
00433             log_DBG_m(dbg_DETAIL, "iter++");
00434         }
00435     }
00436 }

Here is the call graph for this function:

Here is the caller graph for this function:

void rm_Queue::Insert ( rm_JobParams & a_iJobParams  ) 

Definition at line 165 of file rm_queue.cpp.

References CalcPriority(), dbg_DETAIL, log_DBG_m, log_FUNC_m, m_allocQueue_p, and m_queue_x.

Referenced by i_ResourceManager_i::ExchangeRecallResources(), i_ResourceManager_i::ExchangeResources(), i_ResourceManager_i::GetRecallResources(), and i_ResourceManager_i::GetResources().

00165                                                 {
00166     log_FUNC_m(Insert);
00167 
00168     log_DBG_m(dbg_DETAIL,"inserting new job into list");
00169     {
00170         log_DBG_m(dbg_DETAIL,"Locking Queue");
00171         cmn_MutexLock l(m_queue_x);
00172 
00173         m_allocQueue_p->insert( rm_AllocPair_t( CalcPriority(a_iJobParams) , a_iJobParams));
00174         log_DBG_m(dbg_DETAIL,"Unlocking Queue");
00175     }
00176 }

Here is the call graph for this function:

Here is the caller graph for this function:

void rm_Queue::Remove ( UInt64_t  a_jobID  ) 

Definition at line 178 of file rm_queue.cpp.

References dbg_DETAIL, dbg_NORM, log_DBG_m, log_FUNC_m, m_allocQueue_p, and m_queue_x.

Referenced by i_ResourceManager_i::CancelGetResource().

00178                                       {
00179     log_FUNC_m(Remove);
00180 
00181     log_DBG_m(dbg_NORM,"removing job "<< a_jobID << " from queue");
00182 
00183     cmn_MutexLock l(m_queue_x);
00184     log_DBG_m(dbg_DETAIL,"l(m_queue_x);");
00185 
00186     rm_AllocQueue_it iter = m_allocQueue_p->begin();
00187 
00188     while (iter != m_allocQueue_p->end()) {
00189         if ( iter->second.jobID == a_jobID ){
00190             log_DBG_m(dbg_DETAIL,"erase " << a_jobID);
00191             m_allocQueue_p->erase(iter);
00192             iter = m_allocQueue_p->begin();
00193         } else {
00194             iter++;
00195         }
00196     };
00197     log_DBG_m(dbg_DETAIL,"Unlocking Queue");
00198 }

Here is the caller graph for this function:

rm_JobParams rm_Queue::Find ( UInt64_t  a_jobID  ) 

Definition at line 200 of file rm_queue.cpp.

References dbg_DETAIL, ie_JOBNOTFOUND, ivd_Error, log_DBG_m, log_FUNC_m, m_allocQueue_p, and m_queue_x.

00200                                             {
00201     log_FUNC_m(Find);
00202 
00203     log_DBG_m(dbg_DETAIL, "searching job in Queue: " << a_jobId);
00204     log_DBG_m(dbg_DETAIL, "Locking Queue");
00205     cmn_MutexLock l(m_queue_x);
00206 
00207     for (rm_AllocQueue_it iter = m_allocQueue_p->begin();  iter != m_allocQueue_p->end(); iter++) {
00208         if (iter->second.jobID == a_jobId ) {
00209             log_DBG_m(dbg_DETAIL,"Unlocking Queue");
00210             return (iter->second);
00211         }
00212     }
00213     throw ivd_Error(ie_JOBNOTFOUND,"no such job in queue");
00214 }

vector< rm_JobParams > rm_Queue::GetAllJobs (  ) 

Definition at line 698 of file rm_queue.cpp.

References dbg_DETAIL, log_DBG_m, log_FUNC_m, m_allocQueue_p, and m_queue_x.

Referenced by i_ResourceManager_i::GetAllJobs().

00698                                           {
00699     log_FUNC_m(GetAllJobs);
00700     vector<rm_JobParams> rmJPVec;
00701 
00702     log_DBG_m(dbg_DETAIL,"Locking Queue");
00703     cmn_MutexLock l(m_queue_x);
00704 
00705     rm_AllocQueue_it iter;
00706     for ( iter = m_allocQueue_p->begin(); iter != m_allocQueue_p->end(); iter++ ) {
00707         rm_JobParams rmJP = iter->second;
00708         rmJPVec.push_back(rmJP);
00709     };
00710     return rmJPVec;
00711     log_DBG_m(dbg_DETAIL,"Unlocking Queue");
00712 }

Here is the caller graph for this function:

vector< rm_JobParams > rm_Queue::GetJob ( UInt64_t  a_jobId  ) 

Definition at line 716 of file rm_queue.cpp.

References dbg_DETAIL, rm_JobParams::jobID, log_DBG_m, log_FUNC_m, m_allocQueue_p, and m_queue_x.

Referenced by i_ResourceManager_i::GetJob(), and i_ResourceManager_i::GetJobResources().

00716                                                       {
00717     log_FUNC_m(GetJob);
00718     vector<rm_JobParams> rmJPVec;
00719 
00720     log_DBG_m(dbg_DETAIL,"Locking Queue");
00721     cmn_MutexLock l(m_queue_x);
00722 
00723     rm_AllocQueue_it iter;
00724     for ( iter = m_allocQueue_p->begin(); iter != m_allocQueue_p->end(); iter++ ){
00725         rm_JobParams& rmJP = iter->second;
00726         if (rmJP.jobID == a_jobID){
00727             rmJPVec.push_back(rmJP);
00728         }
00729 
00730     };
00731     if (rmJPVec.empty()) {
00732         log_DBG_m(dbg_DETAIL,"No jobs found in rm->queue");
00733     }
00734     return rmJPVec;
00735 
00736 }

Here is the caller graph for this function:

void rm_Queue::Init ( i_ResourceManager_i *  a_iRM,
Int32_t  a_rATreshold,
Int32_t  a_timeStep,
Int32_t  a_phaseFactor 
)

Definition at line 71 of file rm_queue.cpp.

References dbg_DETAIL, log_DBG_m, log_FUNC_m, m_iRM, m_iRMvalid, m_phaseFactor, m_queueActive, m_queueExec, m_resourceAllocationTreshold, m_timeStep, and cmn_Thread::Start().

Referenced by i_ResourceManager_i::i_ResourceManager_i().

00071                                                                                                                 {
00072     log_FUNC_m(Init);
00073 
00074     m_resourceAllocationTreshold  = a_rATreshold;
00075     m_timeStep                    = a_timeStep;
00076     m_phaseFactor                 = a_phaseFactor;
00077     m_iRM = a_iRM;
00078     m_iRMvalid = true;
00079     log_DBG_m(dbg_DETAIL,"Initializing Queue " << m_iRM << " "
00080                                          << m_resourceAllocationTreshold << " "
00081                                          << m_timeStep << " "
00082                                          << m_phaseFactor  );
00083     m_queueActive = true;
00084     m_queueExec->Start();
00085 
00086 }

Here is the call graph for this function:

Here is the caller graph for this function:

Int32_t rm_Queue::CalcPriority ( rm_JobParams &  a_iJobParams  ) 

Definition at line 90 of file rm_queue.cpp.

References dbg_DETAIL, dbg_LOW, rm_SysState::GetAdminPriority(), rm_SysState::GetBackupPriority(), rm_SysState::GetMaintPriority(), rm_SysState::GetMigrationPriority(), rm_SysState::GetPartitionPriority(), rm_SysState::GetRecallPriority(), rm_SysState::GetRecoveryPriority(), cmn_Time::GetTime_t(), ie_NOJOBTYPE, rm_JobParams::jobID, rm_JobParams::jobPriority, rm_JobParams::jobType, jt_ADMIN, jt_BACKUP, jt_MAINT, jt_MIGRATION, jt_RECALL, jt_RECOVERY, jt_REORG, log_DBG_m, log_FUNC_m, m_phaseFactor, m_SysState, m_timeStep, rm_JobParams::partName, rm_JobParams::phase, rm_JobParams::priorityModifier, and rm_JobParams::startTime.

Referenced by Insert(), and Recalc().

00090                                                         {
00091     log_FUNC_m(CalcPriority);
00092 
00093     cmn_Time nowTime;
00094     Int64_t jobTypePriority(0);
00095     Int64_t jobPartPriority(1);
00096     Int64_t phase(a_iJobParams.phase);
00097     Int64_t phaseFactor(m_phaseFactor);
00098     Int64_t startTime(a_iJobParams.startTime);
00099     Int64_t timeStep(m_timeStep);
00100     Int64_t modifier(a_iJobParams.priorityModifier);
00101     Int64_t divisor(1);
00102 
00103     Int64_t jobPrio(0);
00104 
00105     if (a_iJobParams.jobType == jt_ADMIN || a_iJobParams.jobType == jt_BACKUP) {
00106         divisor = 10000;
00107         jobPartPriority = 1;
00108     }
00109     else {
00110         divisor = 100;
00111         jobPartPriority = m_SysState.GetPartitionPriority(a_iJobParams.partName);
00112     }
00113 
00114     switch (a_iJobParams.jobType){
00115         case (jt_ADMIN):
00116             jobTypePriority = m_SysState.GetAdminPriority();
00117             break;
00118         case (jt_BACKUP):
00119             jobTypePriority = m_SysState.GetBackupPriority();
00120             break;
00121         case (jt_RECALL):
00122             jobTypePriority = m_SysState.GetRecallPriority(a_iJobParams.partName);
00123             break;
00124         case (jt_MIGRATION):
00125             jobTypePriority = m_SysState.GetMigrationPriority(a_iJobParams.partName);
00126             break;
00127         case (jt_RECOVERY):
00128             jobTypePriority = m_SysState.GetRecoveryPriority(a_iJobParams.partName);
00129             break;
00130         case (jt_MAINT):
00131         case (jt_REORG):
00132             jobTypePriority = m_SysState.GetMaintPriority(a_iJobParams.partName);
00133             break;
00134         default:
00135             throw ivd_InternalError(ie_NOJOBTYPE, "calculating priority of job in queue w/o job type");
00136     };
00137 
00138     Int64_t timeDiff(nowTime.GetTime_t() - startTime);
00139     jobPrio = jobTypePriority * jobPartPriority +
00140             phase * phaseFactor +
00141             (timeDiff * timeStep * jobTypePriority) / divisor;
00142     
00143     log_DBG_m(dbg_DETAIL,
00144               "Job ID: " << a_iJobParams.jobID <<
00145                       ", type prio: " << jobTypePriority <<
00146                       ", part prio: " << jobPartPriority <<
00147                       ", phase: " << phase <<
00148                       ", phase fact: " << phaseFactor <<
00149                       ", start time: " << startTime <<
00150                       ", now: " << nowTime.GetTime_t() <<
00151                       ", time step: " << timeStep <<
00152                       ", divisor: " << divisor <<
00153                       ", modifier: " << modifier <<
00154                       " ==> prio: " << jobPrio);
00155 
00156     if (jobPrio > LONG_MAX) {
00157         jobPrio = LONG_MAX;
00158         log_DBG_m(dbg_LOW, "Priority exceeded maximum. Setting to LONG_MAX.");
00159     }
00160     a_iJobParams.jobPriority = static_cast<Int32_t>(jobPrio);
00161     return a_iJobParams.jobPriority;
00162 } // rm_Queue::CalcPriority()

Here is the call graph for this function:

Here is the caller graph for this function:

void rm_Queue::Activate (  ) 

void rm_Queue::IncReleasedCount (  ) 

Definition at line 222 of file rm_queue.cpp.

References m_releasedResCount, and m_resCount_x.

Referenced by i_ResourceManager_i::ReleaseResource().

00222                                 {
00223     cmn_MutexLock l(m_resCount_x);
00224     ++m_releasedResCount;
00225 }

Here is the caller graph for this function:

UInt32_t rm_Queue::GetReleasedCount (  ) 

Definition at line 227 of file rm_queue.cpp.

References m_releasedResCount, and m_resCount_x.

Referenced by Allocate().

00227                                     {
00228     cmn_MutexLock l(m_resCount_x);
00229     UInt32_t count(m_releasedResCount);
00230     if (m_releasedResCount > 0) {
00231         --m_releasedResCount;
00232     }
00233     return count;
00234 }

Here is the caller graph for this function:

void rm_Queue::ModifyPriority ( UInt64_t  a_jobID,
Int32_t  a_modifier 
)

Definition at line 739 of file rm_queue.cpp.

References dbg_LOW, rm_JobParams::jobID, log_DBG_m, log_FUNC_m, m_allocQueue_p, m_queue_x, rm_JobParams::priorityModifier, and Recalc().

Referenced by i_ResourceManager_i::SetPriorityModifier().

00739                                                                   {
00740     log_FUNC_m(ModifyPriority);
00741 
00742     cmn_MutexLock l(m_queue_x);
00743     log_DBG_m(dbg_LOW,"New modifier for job: " << a_jobID << " = " << a_modifier);
00744 
00745     rm_AllocQueue_it iter;
00746     for ( iter = m_allocQueue_p->begin(); iter != m_allocQueue_p->end(); iter++ ){
00747         rm_JobParams& rmJP = iter->second;
00748         if (rmJP.jobID == a_jobID){
00749             rmJP.priorityModifier = a_modifier;
00750         }
00751     };
00752     Recalc();
00753 }

Here is the call graph for this function:

Here is the caller graph for this function:

void rm_Queue::SetPhase ( UInt64_t  a_jobID,
UInt32_t  a_phase 
)

Definition at line 755 of file rm_queue.cpp.

References dbg_DETAIL, rm_JobParams::jobID, log_DBG_m, log_FUNC_m, m_allocQueue_p, m_queue_x, rm_JobParams::phase, and Recalc().

Referenced by i_ResourceManager_i::SetPhase().

00755                                                           {
00756     log_FUNC_m(SetPhase);
00757 
00758     cmn_MutexLock l(m_queue_x);
00759 
00760     log_DBG_m(dbg_DETAIL,"Setting phase of Job " << a_jobID << " to " << a_phase);
00761     rm_AllocQueue_it iter;
00762     for ( iter = m_allocQueue_p->begin(); iter != m_allocQueue_p->end(); iter++ ){
00763         rm_JobParams& rmJP = iter->second;
00764         if (rmJP.jobID == a_jobID){
00765             rmJP.phase = a_phase;
00766         }
00767     };
00768     Recalc();
00769 }

Here is the call graph for this function:

Here is the caller graph for this function:


Friends And Related Function Documentation

friend class rm_QueueExecutor [friend]

Definition at line 330 of file rm.h.

Referenced by rm_Queue().


Member Data Documentation

Definition at line 335 of file rm.h.

Referenced by Allocate(), Init(), and Process().

Definition at line 336 of file rm.h.

Referenced by CalcPriority(), and Init().

Definition at line 337 of file rm.h.

Referenced by CalcPriority(), and Init().

bool rm_Queue::m_iRMvalid [private]

Definition at line 364 of file rm.h.

Referenced by Init().

rm_QueueExecutor* rm_Queue::m_queueExec [private]

Definition at line 366 of file rm.h.

Referenced by Init(), and rm_Queue().

bool rm_Queue::m_disable [private]

Definition at line 373 of file rm.h.

Referenced by rm_QueueExecutor::Run(), and ~rm_Queue().

bool rm_Queue::m_queueActive [private]

Definition at line 374 of file rm.h.

Referenced by Init(), ~rm_Queue(), and rm_QueueExecutor::~rm_QueueExecutor().

cmn_Semaphore* rm_Queue::m_activateQueue_s [private]

Definition at line 377 of file rm.h.

Referenced by Activate(), rm_Queue(), rm_QueueExecutor::Run(), and ~rm_Queue().

Definition at line 379 of file rm.h.

Referenced by GetReleasedCount(), and IncReleasedCount().

Definition at line 380 of file rm.h.

Referenced by GetReleasedCount(), and IncReleasedCount().

Definition at line 404 of file rm.h.

Referenced by Allocate(), and rm_Queue().

Definition at line 406 of file rm.h.


The documentation for this class was generated from the following files:

Generated on Mon Feb 27 19:49:00 2012 for OPENARCHIVE by  doxygen 1.5.6