#include <rm.h>

Definition at line 325 of file rm.h.
| rm_Queue::rm_Queue | ( | ) |
Definition at line 33 of file rm_queue.cpp.
References log_FUNC_m, m_activateQueue_s, m_allocQueue_p, m_lastReported, m_queueExec, and rm_QueueExecutor.
00033 : 00034 m_iRM(NULL), 00035 m_iRMvalid(false), 00036 m_disable(false), 00037 m_queueActive(false), 00038 m_releasedResCount(0) 00039 00040 { 00041 log_FUNC_m(rm_Queue); 00042 m_allocQueue_p = new rm_AllocQueue_t(); 00043 m_queueExec = new rm_QueueExecutor(*this); 00044 00045 m_activateQueue_s = new cmn_Semaphore(0); 00046 00047 time_t nowTime; 00048 time(&nowTime); 00049 m_lastReported = nowTime; 00050 }
| rm_Queue::~rm_Queue | ( | ) |
Definition at line 53 of file rm_queue.cpp.
References Activate(), dbg_DETAIL, ivd_Sleep, log_DBG_m, log_FUNC_m, m_activateQueue_s, m_allocQueue_p, m_disable, m_queue_x, and m_queueActive.
00053 { 00054 log_FUNC_m(~rm_Queue); 00055 { 00056 cmn_MutexLock l(m_queue_x); 00057 log_DBG_m(dbg_DETAIL,"Queue locked"); 00058 m_disable = true; //disable queue thread 00059 Activate(); 00060 } 00061 log_DBG_m(dbg_DETAIL,"Queue Unlocked"); 00062 00063 while (m_queueActive){ //wait for thread to finish 00064 log_DBG_m(dbg_DETAIL,"waiting for queue thread to end"); 00065 ivd_Sleep(1); 00066 } 00067 delete m_allocQueue_p; 00068 delete m_activateQueue_s; 00069 }

| void rm_Queue::AllocateRec | ( | string | a_partName, | |
| rm_ResourceTable_t & | a_allocTable | |||
| ) | [private] |
Definition at line 506 of file rm_queue.cpp.
References dbg_LOW, rm_DBOperation::Execute(), i_MAX_NUM_DRIVES, ie_DATA_CORRUPTION, ivd_Error, jt_RECALL, log_DBG_m, log_ERR_m, log_FUNC_m, ipc_Log::LogResources(), i_ResourceManager_i::m_DBThread, m_iRM, m_SysState, mf_DISK, i_ResourceManager_i::ReleaseRecallResources(), and rm_SysState::Reserve().
Referenced by Allocate(), and AllocateRecovery().
00507 { 00508 log_FUNC_m(AllocateRec); 00509 00510 if ( a_allocTable.size() == 0 ) { 00511 throw ivd_Error(ie_DATA_CORRUPTION, "Allocation Table size should be 1 "); 00512 } 00513 00514 try { 00515 dbo_AllocateRec ar(a_allocTable[0], *(m_iRM->m_DBThread)); 00516 ar.Execute(); 00517 if ( !a_allocTable[0].resAllocated ) { 00518 log_DBG_m(dbg_LOW, "Recall resources NOT allocated"); 00519 return; 00520 } 00521 else { 00522 if (!m_SysState.Reserve(a_partName, jt_RECALL, 1)){ 00523 //release allocated Resources only if !mt__DISK 00524 if (a_allocTable[0].mediumFamily != mf_DISK){ 00525 i_Resource_t res; 00526 res = a_allocTable[0].Convert2Corba(); 00527 m_iRM->ReleaseRecallResources(res); 00528 } 00529 a_allocTable[0].resAllocated = false; 00530 a_allocTable[0].load = false; 00531 a_allocTable[0].loadSlotAddr.clear(); 00532 a_allocTable[0].unloadSlotAddr.clear(); 00533 a_allocTable[0].unloadBarcode.clear(); 00534 a_allocTable[0].resourceBusyStatus = i_MAX_NUM_DRIVES; 00535 log_DBG_m(dbg_LOW, "Reset Recall resources Bug 8395:" << endl << 00536 ipc_Log::LogResources(a_allocTable[0].Convert2Corba()); 00537 ); 00538 return; 00539 } 00540 } 00541 } catch (ivd_Exception& e ) { 00542 log_DBG_m(dbg_LOW, "Caught exception in rm_Queue: " << e); 00543 log_ERR_m("Caught exception in rm_Queue: " << e); 00544 } 00545 }


| void rm_Queue::AllocateMig | ( | bool | a_online, | |
| string | a_partName, | |||
| rm_ResourceTable_t & | a_allocTable | |||
| ) | [private] |
Definition at line 470 of file rm_queue.cpp.
References dbg_LOW, rm_DBOperation::Execute(), i_MAX_NUM_DRIVES, jt_MIGRATION, log_DBG_m, log_ERR_m, log_FUNC_m, i_ResourceManager_i::m_DBThread, m_iRM, m_SysState, rm_SysState::Release(), and rm_SysState::Reserve().
Referenced by Allocate().
00472 { 00473 log_FUNC_m(AllocateMig); 00474 00475 if (!m_SysState.Reserve(a_partName, jt_MIGRATION, a_allocTable.size())) { 00476 a_allocTable[0].resourceBusyStatus = i_MAX_NUM_DRIVES; 00477 return; 00478 } 00479 00480 00481 // check usage of drives for specific partition (m_partitions[i].maxDrives > m_partitions[i].drivesInUse) && 00482 // (m_partitions[i].maxDrivesMig > m_partitions[i].drivesInUseMig) 00483 00484 // select all drives which are free, usable ... - 00485 // for each pool select records from RMDB which fulfill: | 00486 // medium belongs to Pool |- this is done by Interbase (SQL query) 00487 // medium is free, empty, usable ... | 00488 // - 00489 00490 try { 00491 dbo_AllocateMig am(a_allocTable, a_online, *(m_iRM->m_DBThread)); 00492 am.Execute(); 00493 } catch (ivd_Exception& e ) { 00494 log_DBG_m(dbg_LOW, "Caught exception in rm_Queue: " << e); 00495 log_ERR_m("Caught exception in rm_Queue: " << e); 00496 } 00497 00498 if ( !a_allocTable[0].resAllocated ){ 00499 m_SysState.Release(a_partName, jt_MIGRATION, a_allocTable.size() ); 00500 } 00501 00502 }


| void rm_Queue::AllocateAdmin | ( | rm_ResourceTable_t & | a_allocTable | ) | [private] |
Definition at line 650 of file rm_queue.cpp.
References dbg_LOW, rm_DBOperation::Execute(), i_MAX_NUM_DRIVES, ie_INVALID_TBL_SIZE, jt_ADMIN, log_DBG_m, log_ERR_m, log_FUNC_m, i_ResourceManager_i::m_DBThread, m_iRM, m_SysState, rm_SysState::Release(), and rm_SysState::Reserve().
Referenced by Allocate().
00650 { 00651 log_FUNC_m(AllocateAdmin); 00652 00653 if (a_allocTable.size() != 1) { 00654 throw ivd_InternalError(ie_INVALID_TBL_SIZE, "Size should be exactly 1 (one) for Admin Jobs"); 00655 } 00656 00657 if (!m_SysState.Reserve(jt_ADMIN, 1 )){ 00658 a_allocTable[0].resourceBusyStatus = i_MAX_NUM_DRIVES; 00659 return; 00660 } 00661 00662 try { 00663 if (a_allocTable[0].mediumKey > 0){ 00664 dbo_AllocateAdmin aa(a_allocTable, *(m_iRM->m_DBThread)); 00665 aa.Execute(); 00666 } else { 00667 bool a(false); 00668 dbo_AllocateMig am(a_allocTable, a, *(m_iRM->m_DBThread)); 00669 am.Execute(); 00670 } 00671 } catch (ivd_Exception& e ) { 00672 log_DBG_m(dbg_LOW, "Caught exception in rm_Queue: " << e); 00673 log_ERR_m("Caught exception in rm_Queue: " << e); 00674 } 00675 00676 if ( !a_allocTable[0].resAllocated ){ 00677 m_SysState.Release(jt_ADMIN, a_allocTable.size() ); 00678 } 00679 }


| void rm_Queue::AllocateRecovery | ( | string | a_partName, | |
| rm_ResourceTable_t & | a_allocTable | |||
| ) | [private] |
Definition at line 549 of file rm_queue.cpp.
References AllocateRec(), rm_DBOperation::Execute(), i_MAX_NUM_DRIVES, ie_DATA_CORRUPTION, ivd_Error, jt_RECOVERY, log_ERR_m, log_FUNC_m, i_ResourceManager_i::m_DBThread, m_iRM, m_SysState, rm_SysState::Release(), and rm_SysState::Reserve().
Referenced by Allocate().
00549 { 00550 log_FUNC_m(AllocateRec); 00551 00552 // check if partition is registered (member of m_partitions) 00553 try { 00554 00555 if ( a_allocTable.size() == 0 ) 00556 throw ivd_Error(ie_DATA_CORRUPTION, "Allocation Table size should be > 0 "); 00557 00558 if(!m_SysState.Reserve(a_partName, jt_RECOVERY, 1)) { 00559 a_allocTable[0].resourceBusyStatus = i_MAX_NUM_DRIVES; 00560 return; 00561 } 00562 00563 dbo_AllocateRec ar(a_allocTable[0], *(m_iRM->m_DBThread)); 00564 ar.Execute(); 00565 if ( !a_allocTable[0].resAllocated ){ 00566 m_SysState.Release(a_partName, jt_RECOVERY, a_allocTable.size() ); 00567 } 00568 00569 } catch (ivd_Error& e){ 00570 log_ERR_m(e); 00571 } 00572 }


| void rm_Queue::AllocateMaint | ( | string | a_partName, | |
| rm_ResourceTable_t & | a_allocTable | |||
| ) | [private] |
Definition at line 574 of file rm_queue.cpp.
References dbg_LOW, rm_DBOperation::Execute(), i_MAX_NUM_DRIVES, ie_DATA_CORRUPTION, ivd_Error, jt_MAINT, log_DBG_m, log_ERR_m, log_FUNC_m, i_ResourceManager_i::m_DBThread, m_iRM, m_SysState, rm_SysState::Release(), and rm_SysState::Reserve().
Referenced by Allocate().
00574 { 00575 log_FUNC_m(AllocateMaint); 00576 00577 try { 00578 00579 if ( a_allocTable.size() == 0 ) 00580 throw ivd_Error(ie_DATA_CORRUPTION, "Allocation Table size should be > 0 "); 00581 00582 if (!m_SysState.Reserve(a_partName, jt_MAINT, 1)){ 00583 a_allocTable[0].resourceBusyStatus = i_MAX_NUM_DRIVES; 00584 return; 00585 } 00586 00587 // Main job may have N resources. Recall is detected by medium key. 00588 if (a_allocTable[0].mediumKey > 0){ 00589 dbo_AllocateRec ar(a_allocTable[0], *(m_iRM->m_DBThread)); 00590 ar.Execute(); 00591 } else { 00592 bool a(true); 00593 dbo_AllocateMig am(a_allocTable, a, *(m_iRM->m_DBThread)); 00594 am.Execute(); 00595 if (!a_allocTable[0].resAllocated){ 00596 a = false; //now try offline too 00597 dbo_AllocateMig am1(a_allocTable, a, *(m_iRM->m_DBThread)); 00598 am1.Execute(); 00599 } 00600 } 00601 00602 if ( !a_allocTable[0].resAllocated ){ 00603 m_SysState.Release(a_partName, jt_MAINT, a_allocTable.size() ); 00604 } 00605 } catch (ivd_Exception& e ) { 00606 log_DBG_m(dbg_LOW, "Caught exception in rm_Queue: " << e); 00607 log_ERR_m(e); 00608 } 00609 }


| void rm_Queue::AllocateReorg | ( | string | a_partName, | |
| rm_ResourceTable_t & | a_allocTable | |||
| ) | [private] |
Definition at line 611 of file rm_queue.cpp.
References dbg_LOW, rm_DBOperation::Execute(), i_MAX_NUM_DRIVES, ie_DATA_CORRUPTION, ivd_Error, jt_MAINT, log_DBG_m, log_ERR_m, log_FUNC_m, i_ResourceManager_i::m_DBThread, m_iRM, m_SysState, rm_SysState::Release(), and rm_SysState::Reserve().
Referenced by Allocate().
00611 { 00612 log_FUNC_m(AllocateReorg); 00613 00614 try { 00615 if ( a_allocTable.size() == 0 ) 00616 throw ivd_Error(ie_DATA_CORRUPTION, "Allocation Table size should be > 0 "); 00617 00618 if (!m_SysState.Reserve(a_partName, jt_MAINT, 1)) { 00619 a_allocTable[0].resourceBusyStatus = i_MAX_NUM_DRIVES; 00620 return; 00621 } 00622 00623 if (a_allocTable[0].resNum == 0){ 00624 dbo_AllocateRec ar(a_allocTable[0], *(m_iRM->m_DBThread)); 00625 ar.Execute(); 00626 } else if (a_allocTable[0].resNum == 1) { 00627 bool a(true); //try online first 00628 dbo_AllocateMig am(a_allocTable, a, *(m_iRM->m_DBThread)); 00629 am.Execute(); 00630 if (!a_allocTable[0].resAllocated){ 00631 a = false; //now try offline too 00632 dbo_AllocateMig am1(a_allocTable, a, *(m_iRM->m_DBThread)); 00633 am1.Execute(); 00634 } 00635 } else { 00636 throw ivd_Error(ie_DATA_CORRUPTION, "Reorg job can have only 2 resources"); 00637 } 00638 00639 if ( !a_allocTable[0].resAllocated ){ 00640 m_SysState.Release(a_partName, jt_MAINT, a_allocTable.size() ); 00641 } 00642 00643 } catch (ivd_Exception& e ) { 00644 log_DBG_m(dbg_LOW, "Caught exception in rm_Queue: " << e); 00645 log_ERR_m(e); 00646 } 00647 }


| void rm_Queue::Print | ( | ) | [private] |
Definition at line 682 of file rm_queue.cpp.
References dbg_DETAIL, localtime(), log_DBG_m, log_FUNC_m, and m_allocQueue_p.
Referenced by Recalc().
00682 { 00683 log_FUNC_m(Print); 00684 00685 struct tm * tmTime; 00686 log_DBG_m(dbg_DETAIL,"------------- Printing Queue (start)-------------------------" << endl); 00687 rm_AllocQueue_it iter; 00688 for ( iter = m_allocQueue_p->begin(); iter != m_allocQueue_p->end(); iter++ ){ 00689 UInt64_t i = iter->second.jobID; 00690 tmTime = localtime(&(iter->second.startTime)); 00691 log_DBG_m(dbg_DETAIL, 00692 "JobID:" << i << " Priority:" << iter->second.jobPriority << " Insert Time:" << asctime(tmTime)); 00693 }; 00694 log_DBG_m(dbg_DETAIL, "------------- Printing Queue (end)---------------------------" << endl); 00695 00696 }


| void rm_Queue::Recalc | ( | ) | [private] |
Definition at line 238 of file rm_queue.cpp.
References CalcPriority(), cmn_Global::dbg, dbg_DETAIL, g_cmn, log_Debugger::GetLevel(), log_DBG_m, log_FUNC_m, m_allocQueue_p, and Print().
Referenced by ModifyPriority(), Process(), and SetPhase().
00238 { 00239 log_FUNC_m(Recalc); 00240 rm_AllocQueue_t *tmpQueue = new rm_AllocQueue_t(); //allocate new Queue 00241 00242 // copy all jobs to new queue recalculating the priority 00243 log_DBG_m(dbg_DETAIL, "recalculating priority of all jobs"); 00244 for ( rm_AllocQueue_it iter = m_allocQueue_p->begin(); iter != m_allocQueue_p->end(); iter++ ){ 00245 tmpQueue->insert(rm_AllocPair_t((CalcPriority(iter->second) + iter->second.priorityModifier), iter->second)); 00246 log_DBG_m(dbg_DETAIL, "inserted job:" << iter->second.jobID 00247 << ", priority: " << iter->first ) 00248 }; 00249 00250 // delete the old queue 00251 delete m_allocQueue_p; 00252 // set the queue pointer to the new queue 00253 m_allocQueue_p = tmpQueue; 00254 00255 if (g_cmn.dbg.GetLevel() == dbg_DETAIL) { 00256 log_DBG_m(dbg_DETAIL,"Prining queue after Recalc " << endl); 00257 00258 Print(); 00259 } 00260 00261 }


| void rm_Queue::Process | ( | ) | [private] |
Definition at line 440 of file rm_queue.cpp.
References Allocate(), dbg_DETAIL, dbg_NORM, log_DBG_m, log_FUNC_m, m_allocQueue_p, m_resourceAllocationTreshold, and Recalc().
Referenced by rm_QueueExecutor::Run().
00440 { 00441 log_FUNC_m(Process); 00442 00443 Recalc(); 00444 00445 if ( m_allocQueue_p->begin() == m_allocQueue_p->end() ) { 00446 log_DBG_m(dbg_NORM,"Queue is empty"); 00447 return; 00448 } 00449 00450 rm_AllocQueue_it iter; 00451 iter = m_allocQueue_p->begin(); 00452 00453 Int32_t tresHold = (iter->first * m_resourceAllocationTreshold) / 100; 00454 log_DBG_m(dbg_DETAIL, "Priority of First: " << iter->first << endl << 00455 "Treshold at: " << tresHold << endl << 00456 "Processing Queue: "); 00457 00458 Allocate(true); //allocate 00459 00460 if ( m_allocQueue_p->begin() == m_allocQueue_p->end() ) { 00461 log_DBG_m(dbg_NORM,"Queue empty after Alloc online media"); 00462 return; 00463 } 00464 00465 Allocate(false); 00466 00467 }


| void rm_Queue::Allocate | ( | bool | a_online | ) | [private] |
Definition at line 263 of file rm_queue.cpp.
References AllocateAdmin(), AllocateMaint(), AllocateMig(), AllocateRec(), AllocateRecovery(), AllocateReorg(), i_JobParams::bufId, i_JobParams::bufType, cfg_HOUR, dbg_DETAIL, dbg_LOW, dbg_NORM, rm_SysState::GetPartStatus(), GetReleasedCount(), i_PRIORITY_BELOW_THRESHOLD, ie_NOJOBTYPE, ipc_EXEC_m, ivd_Error, ivd_JobTypeToText(), i_JobParams::jobID, i_JobParams::jobType, jt_ADMIN, jt_BACKUP, jt_MAINT, jt_MIGRATION, jt_RECALL, jt_RECOVERY, jt_REORG, log_DBG_m, log_ERR_m, log_FUNC_m, log_WriteEvent(), ipc_Log::LogResources(), m_allocQueue_p, m_iRM, m_lastReported, m_resourceAllocationTreshold, m_SysState, i_JobParams::partName, i_JobParams::phase, i_ResourceManager_i::ReleaseResource(), ipc_Log::ResourceBusy(), and cmn_Time::Time2YMDhms().
Referenced by Process().
00263 { 00264 log_FUNC_m(Allocate); 00265 rm_AllocQueue_it iter; 00266 iter = m_allocQueue_p->begin(); 00267 00268 Int32_t tresHold = (iter->first * m_resourceAllocationTreshold) / 100; 00269 log_DBG_m(dbg_DETAIL, "Priority of First: " << iter->first << endl << 00270 "Treshold at: " << tresHold << endl << 00271 "Processing Queue: "); 00272 00273 while (iter != m_allocQueue_p->end()) { 00274 00275 log_DBG_m(dbg_DETAIL,"Processing Job: "<< iter->second.jobID << 00276 " with priority " << iter->first); 00277 00278 time_t nowTime; 00279 time(&nowTime); 00280 if ((nowTime - m_lastReported > cfg_HOUR) && 00281 (nowTime > iter->second.startTime + (cfg_HOUR * 6) )) { //is job waiting more than 6h 00282 00283 ostringstream errStr; 00284 errStr << "Job:" << iter->second.jobID << " waiting for resources longer that 6h"; 00285 log_ERR_m(errStr.str()); 00286 log_WriteEvent(errStr.str()); 00287 m_lastReported = nowTime; 00288 } 00289 rm_ResourceTable_t& resources = iter->second.m_resources; 00290 if ( iter->first < tresHold ) { 00291 if (!iter->second.m_jobInformed) { 00292 resources[0].resourceBusyStatus = i_PRIORITY_BELOW_THRESHOLD; 00293 log_DBG_m(dbg_NORM, "Inform job:" << 00294 resources[0].resNum << " status:" << 00295 ipc_Log::ResourceBusy(resources[0].resourceBusyStatus)); 00296 try { 00297 iter->second.iJob->SetResourceBusyStatus( 00298 resources[0].resNum, 00299 resources[0].resourceBusyStatus); 00300 iter->second.m_jobInformed = true; 00301 } catch (...){ 00302 //ignore, maybe job finished 00303 } 00304 } 00305 iter++; 00306 continue; 00307 }; 00308 00309 if ((iter->second.jobType != jt_ADMIN) && 00310 (iter->second.jobType != jt_BACKUP)){ 00311 bool partFound(false); 00312 try { 00313 rm_PartitionStatus ps = m_SysState.GetPartStatus(iter->second.partName); 00314 partFound = true; 00315 } catch (ivd_Error &e){ 00316 log_DBG_m(dbg_NORM, e); 00317 partFound = false; 00318 } 00319 if(!partFound){ 00320 log_DBG_m(dbg_LOW,"Will delete Resource Request. Partition Unregisted meanwhile"); 00321 00322 m_allocQueue_p->erase(iter); 00323 iter = m_allocQueue_p->begin(); 00324 break; 00325 } 00326 } 00327 00328 if (GetReleasedCount() > 0) { 00329 if (iter != m_allocQueue_p->begin()) { 00330 log_DBG_m(dbg_NORM, "Resource released. Start from beginning."); 00331 iter = m_allocQueue_p->begin(); 00332 continue; 00333 } 00334 } 00335 00336 // request resources from rmdb 00337 switch (iter->second.jobType) { 00338 case (jt_RECALL): AllocateRec(iter->second.partName, 00339 iter->second.m_resources); 00340 break; 00341 00342 case (jt_MIGRATION): 00343 AllocateMig(a_online, 00344 iter->second.partName, 00345 iter->second.m_resources); 00346 break; 00347 00348 00349 case (jt_RECOVERY):AllocateRecovery(iter->second.partName, 00350 iter->second.m_resources); 00351 break; 00352 case (jt_ADMIN): 00353 case (jt_BACKUP): AllocateAdmin( iter->second.m_resources); 00354 break; 00355 00356 case (jt_REORG): AllocateReorg( iter->second.partName, 00357 iter->second.m_resources); 00358 break; 00359 00360 case (jt_MAINT): AllocateMaint( iter->second.partName, 00361 iter->second.m_resources); 00362 break; 00363 00364 default: throw ivd_Error(ie_NOJOBTYPE, "Unknown Job Type", true); 00365 00366 }; 00367 log_DBG_m(dbg_DETAIL, "Processing Job w/ID:" << iter->second.jobID); 00368 //Assign resource to Job 00369 00370 log_DBG_m(dbg_NORM,"[JobType, Partition, StartTime, Phase]"); 00371 00372 if (resources[0].resAllocated) { //resources were allocated 00373 log_DBG_m(dbg_LOW,"Resources allocated"); 00374 i_ResourceList_t iResourceList; 00375 00376 try { 00377 ipc_EXEC_m ( 00378 iResourceList.length(resources.size()); 00379 00380 for (UInt32_t j = 0; j < resources.size(); j++){ 00381 iResourceList[j] = resources[j].Convert2Corba(); 00382 iResourceList[j].fileId = resources[0].fileId; 00383 }; 00384 00385 cmn_Time time; 00386 time = iter->second.startTime; 00387 00388 log_DBG_m(dbg_NORM, 00389 "AssignResources invoked for Job:" << iter->second.jobID << endl << "[" << 00390 ivd_JobTypeToText(iter->second.jobType) << ", " << 00391 iter->second.partName << ", " << 00392 time.Time2YMDhms() << ", " << 00393 iter->second.phase << "]" << endl << 00394 ipc_Log::LogResources(iResourceList)); 00395 00396 iter->second.iJob->AssignResources(iResourceList); 00397 ); 00398 } catch (ivd_Error& e) { 00399 log_ERR_m("Caught exception on AssignResources: "<< e << endl << 00400 "will release resources and remove Job from list"); 00401 i_JobParams jobParams; 00402 jobParams.jobID = iter->second.jobID; 00403 jobParams.jobType = iter->second.jobType; 00404 jobParams.partName = iter->second.partName.c_str(); 00405 jobParams.bufType = iter->second.bufType; 00406 jobParams.bufId = iter->second.bufId; 00407 jobParams.phase = iter->second.phase; 00408 00409 ipc_EXEC_m( 00410 m_iRM->ReleaseResource(jobParams, iResourceList); 00411 ); 00412 00413 } 00414 m_allocQueue_p->erase(iter); 00415 log_DBG_m(dbg_DETAIL, "deleting allocation request."); 00416 iter = m_allocQueue_p->begin(); 00417 log_DBG_m(dbg_DETAIL, "job erased from queue"); 00418 } else { 00419 if (!iter->second.m_jobInformed) { 00420 log_DBG_m(dbg_NORM, "Inform job why it is waiting:" << 00421 resources[0].resNum << " status:" << 00422 ipc_Log::ResourceBusy(resources[0].resourceBusyStatus)); 00423 try { 00424 iter->second.iJob->SetResourceBusyStatus( 00425 resources[0].resNum, 00426 resources[0].resourceBusyStatus); 00427 iter->second.m_jobInformed = true; 00428 } catch (...){ 00429 //ignore, maybe job finished 00430 } 00431 } 00432 iter++; 00433 log_DBG_m(dbg_DETAIL, "iter++"); 00434 } 00435 } 00436 }


| void rm_Queue::Insert | ( | rm_JobParams & | a_iJobParams | ) |
Definition at line 165 of file rm_queue.cpp.
References CalcPriority(), dbg_DETAIL, log_DBG_m, log_FUNC_m, m_allocQueue_p, and m_queue_x.
Referenced by i_ResourceManager_i::ExchangeRecallResources(), i_ResourceManager_i::ExchangeResources(), i_ResourceManager_i::GetRecallResources(), and i_ResourceManager_i::GetResources().
00165 { 00166 log_FUNC_m(Insert); 00167 00168 log_DBG_m(dbg_DETAIL,"inserting new job into list"); 00169 { 00170 log_DBG_m(dbg_DETAIL,"Locking Queue"); 00171 cmn_MutexLock l(m_queue_x); 00172 00173 m_allocQueue_p->insert( rm_AllocPair_t( CalcPriority(a_iJobParams) , a_iJobParams)); 00174 log_DBG_m(dbg_DETAIL,"Unlocking Queue"); 00175 } 00176 }


| void rm_Queue::Remove | ( | UInt64_t | a_jobID | ) |
Definition at line 178 of file rm_queue.cpp.
References dbg_DETAIL, dbg_NORM, log_DBG_m, log_FUNC_m, m_allocQueue_p, and m_queue_x.
Referenced by i_ResourceManager_i::CancelGetResource().
00178 { 00179 log_FUNC_m(Remove); 00180 00181 log_DBG_m(dbg_NORM,"removing job "<< a_jobID << " from queue"); 00182 00183 cmn_MutexLock l(m_queue_x); 00184 log_DBG_m(dbg_DETAIL,"l(m_queue_x);"); 00185 00186 rm_AllocQueue_it iter = m_allocQueue_p->begin(); 00187 00188 while (iter != m_allocQueue_p->end()) { 00189 if ( iter->second.jobID == a_jobID ){ 00190 log_DBG_m(dbg_DETAIL,"erase " << a_jobID); 00191 m_allocQueue_p->erase(iter); 00192 iter = m_allocQueue_p->begin(); 00193 } else { 00194 iter++; 00195 } 00196 }; 00197 log_DBG_m(dbg_DETAIL,"Unlocking Queue"); 00198 }

| rm_JobParams rm_Queue::Find | ( | UInt64_t | a_jobID | ) |
Definition at line 200 of file rm_queue.cpp.
References dbg_DETAIL, ie_JOBNOTFOUND, ivd_Error, log_DBG_m, log_FUNC_m, m_allocQueue_p, and m_queue_x.
00200 { 00201 log_FUNC_m(Find); 00202 00203 log_DBG_m(dbg_DETAIL, "searching job in Queue: " << a_jobId); 00204 log_DBG_m(dbg_DETAIL, "Locking Queue"); 00205 cmn_MutexLock l(m_queue_x); 00206 00207 for (rm_AllocQueue_it iter = m_allocQueue_p->begin(); iter != m_allocQueue_p->end(); iter++) { 00208 if (iter->second.jobID == a_jobId ) { 00209 log_DBG_m(dbg_DETAIL,"Unlocking Queue"); 00210 return (iter->second); 00211 } 00212 } 00213 throw ivd_Error(ie_JOBNOTFOUND,"no such job in queue"); 00214 }
| vector< rm_JobParams > rm_Queue::GetAllJobs | ( | ) |
Definition at line 698 of file rm_queue.cpp.
References dbg_DETAIL, log_DBG_m, log_FUNC_m, m_allocQueue_p, and m_queue_x.
Referenced by i_ResourceManager_i::GetAllJobs().
00698 { 00699 log_FUNC_m(GetAllJobs); 00700 vector<rm_JobParams> rmJPVec; 00701 00702 log_DBG_m(dbg_DETAIL,"Locking Queue"); 00703 cmn_MutexLock l(m_queue_x); 00704 00705 rm_AllocQueue_it iter; 00706 for ( iter = m_allocQueue_p->begin(); iter != m_allocQueue_p->end(); iter++ ) { 00707 rm_JobParams rmJP = iter->second; 00708 rmJPVec.push_back(rmJP); 00709 }; 00710 return rmJPVec; 00711 log_DBG_m(dbg_DETAIL,"Unlocking Queue"); 00712 }

| vector< rm_JobParams > rm_Queue::GetJob | ( | UInt64_t | a_jobId | ) |
Definition at line 716 of file rm_queue.cpp.
References dbg_DETAIL, rm_JobParams::jobID, log_DBG_m, log_FUNC_m, m_allocQueue_p, and m_queue_x.
Referenced by i_ResourceManager_i::GetJob(), and i_ResourceManager_i::GetJobResources().
00716 { 00717 log_FUNC_m(GetJob); 00718 vector<rm_JobParams> rmJPVec; 00719 00720 log_DBG_m(dbg_DETAIL,"Locking Queue"); 00721 cmn_MutexLock l(m_queue_x); 00722 00723 rm_AllocQueue_it iter; 00724 for ( iter = m_allocQueue_p->begin(); iter != m_allocQueue_p->end(); iter++ ){ 00725 rm_JobParams& rmJP = iter->second; 00726 if (rmJP.jobID == a_jobID){ 00727 rmJPVec.push_back(rmJP); 00728 } 00729 00730 }; 00731 if (rmJPVec.empty()) { 00732 log_DBG_m(dbg_DETAIL,"No jobs found in rm->queue"); 00733 } 00734 return rmJPVec; 00735 00736 }

| void rm_Queue::Init | ( | i_ResourceManager_i * | a_iRM, | |
| Int32_t | a_rATreshold, | |||
| Int32_t | a_timeStep, | |||
| Int32_t | a_phaseFactor | |||
| ) |
Definition at line 71 of file rm_queue.cpp.
References dbg_DETAIL, log_DBG_m, log_FUNC_m, m_iRM, m_iRMvalid, m_phaseFactor, m_queueActive, m_queueExec, m_resourceAllocationTreshold, m_timeStep, and cmn_Thread::Start().
Referenced by i_ResourceManager_i::i_ResourceManager_i().
00071 { 00072 log_FUNC_m(Init); 00073 00074 m_resourceAllocationTreshold = a_rATreshold; 00075 m_timeStep = a_timeStep; 00076 m_phaseFactor = a_phaseFactor; 00077 m_iRM = a_iRM; 00078 m_iRMvalid = true; 00079 log_DBG_m(dbg_DETAIL,"Initializing Queue " << m_iRM << " " 00080 << m_resourceAllocationTreshold << " " 00081 << m_timeStep << " " 00082 << m_phaseFactor ); 00083 m_queueActive = true; 00084 m_queueExec->Start(); 00085 00086 }


| Int32_t rm_Queue::CalcPriority | ( | rm_JobParams & | a_iJobParams | ) |
Definition at line 90 of file rm_queue.cpp.
References dbg_DETAIL, dbg_LOW, rm_SysState::GetAdminPriority(), rm_SysState::GetBackupPriority(), rm_SysState::GetMaintPriority(), rm_SysState::GetMigrationPriority(), rm_SysState::GetPartitionPriority(), rm_SysState::GetRecallPriority(), rm_SysState::GetRecoveryPriority(), cmn_Time::GetTime_t(), ie_NOJOBTYPE, rm_JobParams::jobID, rm_JobParams::jobPriority, rm_JobParams::jobType, jt_ADMIN, jt_BACKUP, jt_MAINT, jt_MIGRATION, jt_RECALL, jt_RECOVERY, jt_REORG, log_DBG_m, log_FUNC_m, m_phaseFactor, m_SysState, m_timeStep, rm_JobParams::partName, rm_JobParams::phase, rm_JobParams::priorityModifier, and rm_JobParams::startTime.
Referenced by Insert(), and Recalc().
00090 { 00091 log_FUNC_m(CalcPriority); 00092 00093 cmn_Time nowTime; 00094 Int64_t jobTypePriority(0); 00095 Int64_t jobPartPriority(1); 00096 Int64_t phase(a_iJobParams.phase); 00097 Int64_t phaseFactor(m_phaseFactor); 00098 Int64_t startTime(a_iJobParams.startTime); 00099 Int64_t timeStep(m_timeStep); 00100 Int64_t modifier(a_iJobParams.priorityModifier); 00101 Int64_t divisor(1); 00102 00103 Int64_t jobPrio(0); 00104 00105 if (a_iJobParams.jobType == jt_ADMIN || a_iJobParams.jobType == jt_BACKUP) { 00106 divisor = 10000; 00107 jobPartPriority = 1; 00108 } 00109 else { 00110 divisor = 100; 00111 jobPartPriority = m_SysState.GetPartitionPriority(a_iJobParams.partName); 00112 } 00113 00114 switch (a_iJobParams.jobType){ 00115 case (jt_ADMIN): 00116 jobTypePriority = m_SysState.GetAdminPriority(); 00117 break; 00118 case (jt_BACKUP): 00119 jobTypePriority = m_SysState.GetBackupPriority(); 00120 break; 00121 case (jt_RECALL): 00122 jobTypePriority = m_SysState.GetRecallPriority(a_iJobParams.partName); 00123 break; 00124 case (jt_MIGRATION): 00125 jobTypePriority = m_SysState.GetMigrationPriority(a_iJobParams.partName); 00126 break; 00127 case (jt_RECOVERY): 00128 jobTypePriority = m_SysState.GetRecoveryPriority(a_iJobParams.partName); 00129 break; 00130 case (jt_MAINT): 00131 case (jt_REORG): 00132 jobTypePriority = m_SysState.GetMaintPriority(a_iJobParams.partName); 00133 break; 00134 default: 00135 throw ivd_InternalError(ie_NOJOBTYPE, "calculating priority of job in queue w/o job type"); 00136 }; 00137 00138 Int64_t timeDiff(nowTime.GetTime_t() - startTime); 00139 jobPrio = jobTypePriority * jobPartPriority + 00140 phase * phaseFactor + 00141 (timeDiff * timeStep * jobTypePriority) / divisor; 00142 00143 log_DBG_m(dbg_DETAIL, 00144 "Job ID: " << a_iJobParams.jobID << 00145 ", type prio: " << jobTypePriority << 00146 ", part prio: " << jobPartPriority << 00147 ", phase: " << phase << 00148 ", phase fact: " << phaseFactor << 00149 ", start time: " << startTime << 00150 ", now: " << nowTime.GetTime_t() << 00151 ", time step: " << timeStep << 00152 ", divisor: " << divisor << 00153 ", modifier: " << modifier << 00154 " ==> prio: " << jobPrio); 00155 00156 if (jobPrio > LONG_MAX) { 00157 jobPrio = LONG_MAX; 00158 log_DBG_m(dbg_LOW, "Priority exceeded maximum. Setting to LONG_MAX."); 00159 } 00160 a_iJobParams.jobPriority = static_cast<Int32_t>(jobPrio); 00161 return a_iJobParams.jobPriority; 00162 } // rm_Queue::CalcPriority()


| void rm_Queue::Activate | ( | ) |
Definition at line 217 of file rm_queue.cpp.
References m_activateQueue_s, and cmn_Semaphore::Post().
Referenced by i_ResourceManager_i::CancelGetResource(), i_ResourceManager_i::GetRecallResources(), i_ResourceManager_i::GetResources(), i_ResourceManager_i::InventoryUpdate(), i_ResourceManager_i::MediumUnLoaded(), i_ResourceManager_i::ReleaseResource(), i_ResourceManager_i::SetPriorityModifier(), i_ResourceManager_i::UseNewResource(), and ~rm_Queue().
00217 { 00218 // activates queue executor 00219 m_activateQueue_s->Post(); 00220 }


| void rm_Queue::IncReleasedCount | ( | ) |
Definition at line 222 of file rm_queue.cpp.
References m_releasedResCount, and m_resCount_x.
Referenced by i_ResourceManager_i::ReleaseResource().
00222 { 00223 cmn_MutexLock l(m_resCount_x); 00224 ++m_releasedResCount; 00225 }

| UInt32_t rm_Queue::GetReleasedCount | ( | ) |
Definition at line 227 of file rm_queue.cpp.
References m_releasedResCount, and m_resCount_x.
Referenced by Allocate().
00227 { 00228 cmn_MutexLock l(m_resCount_x); 00229 UInt32_t count(m_releasedResCount); 00230 if (m_releasedResCount > 0) { 00231 --m_releasedResCount; 00232 } 00233 return count; 00234 }

Definition at line 739 of file rm_queue.cpp.
References dbg_LOW, rm_JobParams::jobID, log_DBG_m, log_FUNC_m, m_allocQueue_p, m_queue_x, rm_JobParams::priorityModifier, and Recalc().
Referenced by i_ResourceManager_i::SetPriorityModifier().
00739 { 00740 log_FUNC_m(ModifyPriority); 00741 00742 cmn_MutexLock l(m_queue_x); 00743 log_DBG_m(dbg_LOW,"New modifier for job: " << a_jobID << " = " << a_modifier); 00744 00745 rm_AllocQueue_it iter; 00746 for ( iter = m_allocQueue_p->begin(); iter != m_allocQueue_p->end(); iter++ ){ 00747 rm_JobParams& rmJP = iter->second; 00748 if (rmJP.jobID == a_jobID){ 00749 rmJP.priorityModifier = a_modifier; 00750 } 00751 }; 00752 Recalc(); 00753 }


Definition at line 755 of file rm_queue.cpp.
References dbg_DETAIL, rm_JobParams::jobID, log_DBG_m, log_FUNC_m, m_allocQueue_p, m_queue_x, rm_JobParams::phase, and Recalc().
Referenced by i_ResourceManager_i::SetPhase().
00755 { 00756 log_FUNC_m(SetPhase); 00757 00758 cmn_MutexLock l(m_queue_x); 00759 00760 log_DBG_m(dbg_DETAIL,"Setting phase of Job " << a_jobID << " to " << a_phase); 00761 rm_AllocQueue_it iter; 00762 for ( iter = m_allocQueue_p->begin(); iter != m_allocQueue_p->end(); iter++ ){ 00763 rm_JobParams& rmJP = iter->second; 00764 if (rmJP.jobID == a_jobID){ 00765 rmJP.phase = a_phase; 00766 } 00767 }; 00768 Recalc(); 00769 }


friend class rm_QueueExecutor [friend] |
rm_AllocQueue_t* rm_Queue::m_allocQueue_p [private] |
Definition at line 333 of file rm.h.
Referenced by Allocate(), Find(), GetAllJobs(), GetJob(), Insert(), ModifyPriority(), Print(), Process(), Recalc(), Remove(), rm_Queue(), SetPhase(), and ~rm_Queue().
Int32_t rm_Queue::m_timeStep [private] |
Int32_t rm_Queue::m_phaseFactor [private] |
i_ResourceManager_i* rm_Queue::m_iRM [private] |
Definition at line 363 of file rm.h.
Referenced by Allocate(), AllocateAdmin(), AllocateMaint(), AllocateMig(), AllocateRec(), AllocateRecovery(), AllocateReorg(), and Init().
bool rm_Queue::m_iRMvalid [private] |
rm_QueueExecutor* rm_Queue::m_queueExec [private] |
bool rm_Queue::m_disable [private] |
bool rm_Queue::m_queueActive [private] |
Definition at line 374 of file rm.h.
Referenced by Init(), ~rm_Queue(), and rm_QueueExecutor::~rm_QueueExecutor().
cmn_Mutex rm_Queue::m_queue_x [private] |
Definition at line 376 of file rm.h.
Referenced by Find(), GetAllJobs(), GetJob(), Insert(), ModifyPriority(), Remove(), rm_QueueExecutor::Run(), SetPhase(), and ~rm_Queue().
cmn_Semaphore* rm_Queue::m_activateQueue_s [private] |
Definition at line 377 of file rm.h.
Referenced by Activate(), rm_Queue(), rm_QueueExecutor::Run(), and ~rm_Queue().
cmn_Mutex rm_Queue::m_resCount_x [private] |
UInt32_t rm_Queue::m_releasedResCount [private] |
Definition at line 403 of file rm.h.
Referenced by Allocate(), AllocateAdmin(), AllocateMaint(), AllocateMig(), AllocateRec(), AllocateRecovery(), AllocateReorg(), CalcPriority(), i_ResourceManager_i::ExchangeResources(), i_ResourceManager_i::GetNewMigId(), i_ResourceManager_i::GetResources(), i_ResourceManager_i::i_ResourceManager_i(), i_ResourceManager_i::ReleaseResource(), i_ResourceManager_i::SetRMPartStatus(), and i_ResourceManager_i::UnRegisterPartition().
| time_t rm_Queue::m_lastReported |
1.5.6