
df_BlockManager Class Reference
[IVD Data format.]

Class to handle a series of data blocks of specific size. More...

#include <df.h>


Public Member Functions

 df_BlockManager (UInt32_t a_blockSize, UInt32_t a_blockCount)
virtual ~df_BlockManager ()
UInt64_t GetBlocksWritten ()
void WaitAllReaders ()
 Used by the writer.
void SetReaderError (ivd_Exception *a_err)
void SetWriterError (ivd_Exception *a_err)
ivd_Exception * GetReaderError () const
ivd_Exception * GetWriterError () const

Protected Member Functions

void WaitWriterToComplete ()
void WaitReadersToComplete ()
void RegisterWriter ()
 Blocks additional RegisterReader calls.
df_DataBlock * GetFree ()
void Flush ()
void SetEndOfData ()
bool EndOfDataMarked () const
UInt32_t RegisterReader ()
df_DataBlock * GetFull (UInt32_t a_readerIdx)
void Release (UInt32_t a_readerIdx)
void ReaderFinished (UInt32_t a_readerIdx, bool success=true)

Private Member Functions

void IncPos (UInt32_t &a_pos)
bool IsFull () const
 Used by the writer to check if it can write to the queue.
bool IsEmpty (UInt32_t a_readIdx) const
 Used by the readers to check if they can read from the queue.
df_DataBlock * GetBlockAt (UInt32_t a_pos)
void CreateBuffer (UInt32_t a_blockSize, UInt32_t a_blockCount)
 Creates memory buffer and vector of df_DataBlock for internal use.

Private Attributes

 log_CLASSID_m
 Macro to add class name member s_className.
UInt64_t m_blocksWritten
UInt32_t m_blocksClosed
UInt32_t m_numReaders
UInt32_t m_numBlocks
vector< df_DataBlock * > m_blocks
vector< UInt8_t > m_blockMemory
UInt32_t m_writeStatus
UInt32_t m_readStatus
bool m_startOfData
 Did the writer start?
bool m_endOfData
 Did the writer complete?
UInt32_t m_writePos
UInt32_t m_lastReadPos
 Position of the slowest reader.
vector< UInt32_t > m_readPos
 Current positions of readers.
cmn_Mutex m_updown_x
cmn_Mutex m_block_x
cmn_Condition m_closed_c
cmn_Condition m_released_c
cmn_Condition m_shutdown_c
ivd_Exception * m_readerError_p
ivd_Exception * m_writerError_p

Friends

class df_MgrReader
class df_MgrWriter

Detailed Description

Class to handle a series of data blocks of specific size.

One writer and multiple readers are allowed.

Data is buffered in a series of data blocks that form a circular list.

A list is full for the writer if the block index to use is just behind the index of the slowest reader.

A list is empty for a reader if the block index to use matches the index of the writer.

Note that the list can be full for the writer and full for a slow reader but empty for a fast reader at the same time.
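The position rules above can be sketched with a small single-threaded model. This is only an illustration: RingModel, Next(), Write() and Read() are hypothetical names, locking is omitted, and the slowest reader is recomputed by scanning all readers, whereas the real class tracks it through the per-block reader count in Release().

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical single-threaded model of the ring positions (not the df.h class).
struct RingModel {
    std::uint32_t numBlocks;
    std::uint32_t writePos = 0;
    std::uint32_t lastReadPos = 0;        // position of the slowest reader
    std::vector<std::uint32_t> readPos;   // current position of each reader

    RingModel(std::uint32_t blocks, std::uint32_t readers)
        : numBlocks(blocks), readPos(readers, 0) {}

    std::uint32_t Next(std::uint32_t pos) const { return (pos + 1) % numBlocks; }

    // Full for the writer: the next write slot would reach the slowest reader.
    bool IsFull() const { return Next(writePos) == lastReadPos; }

    // Empty for a reader: the reader has caught up with the writer.
    bool IsEmpty(std::uint32_t reader) const { return readPos[reader] == writePos; }

    void Write() {
        assert(!IsFull());
        writePos = Next(writePos);
    }

    void Read(std::uint32_t reader) {
        assert(!IsEmpty(reader));
        readPos[reader] = Next(readPos[reader]);
        // The slowest reader is the one furthest behind the writer.
        std::uint32_t maxLag = 0;
        lastReadPos = writePos;
        for (std::uint32_t pos : readPos) {
            const std::uint32_t lag = (writePos + numBlocks - pos) % numBlocks;
            if (lag >= maxLag) {
                maxLag = lag;
                lastReadPos = pos;
            }
        }
    }
};
```

With four blocks and two readers, three writes make the list full for the writer. If reader 0 then consumes all three blocks while reader 1 consumes none, the list is empty for reader 0, still holds data for reader 1, and is still full for the writer.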

Author:
Matej Kenda, HERMES SoftLab
See also:
df_MgrWriter
df_MgrReader
df_DataBlock
df_Writer

Definition at line 144 of file df.h.


Constructor & Destructor Documentation

df_BlockManager::df_BlockManager ( UInt32_t  a_blockSize,
UInt32_t  a_blockCount 
)

Definition at line 201 of file df_blockmanager.cpp.

References CreateBuffer(), and log_FUNC_A_m.

    : m_blocksWritten(0),
      m_blocksClosed(0),
      m_numReaders(0),
      m_numBlocks(a_blockCount),
      m_startOfData(false),
      m_endOfData(false),
      m_writePos(0),
      m_lastReadPos(0),
      m_closed_c(&m_block_x),
      m_released_c(&m_block_x),
      m_shutdown_c(&m_updown_x),
      m_readerError_p(NULL),
      m_writerError_p(NULL) {

    log_FUNC_A_m(df_BlockManager,
        "a_blockSize: " << a_blockSize << " a_blockCount: " << a_blockCount );

    CreateBuffer(a_blockSize, a_blockCount);
}


df_BlockManager::~df_BlockManager (  )  [virtual]

Definition at line 234 of file df_blockmanager.cpp.

References dbg_LOW, dbg_NORM, log_DBG_m, log_FUNC_m, m_blocks, WaitReadersToComplete(), and WaitWriterToComplete().

                                  {
    log_FUNC_m(~df_BlockManager);

    log_DBG_m(dbg_LOW, "Block manager going down. ");

    try {

        WaitWriterToComplete();
        WaitReadersToComplete();

        log_DBG_m(dbg_NORM,
            "Deleting data blocks. Number of blocks " << m_blocks.size() );

        for (vector<df_DataBlock*>::iterator i = m_blocks.begin(); i != m_blocks.end(); i++) {
            delete (*i);
        }
    }
    catch (ivd_Exception &ie) {
        log_DBG_m(dbg_LOW,
            "Ignoring exception in DTOR." << endl <<
            "Error: " << ie);
    }
}



Member Function Documentation

void df_BlockManager::CreateBuffer ( UInt32_t  a_blockSize,
UInt32_t  a_blockCount 
) [private]

Creates memory buffer and vector of df_DataBlock for internal use.

Definition at line 335 of file df_blockmanager.cpp.

References cmn_Num2Str(), ie_INVALID_ARG, ivd_Error, ivd_MAX_BLOCK_SIZE, ivd_MIN_BLOCK_SIZE, log_FUNC_m, m_blockMemory, and m_blocks.

Referenced by df_BlockManager().

                                                                              {
    log_FUNC_m(CreateBuffer);

    if (a_blockSize < ivd_MIN_BLOCK_SIZE) {
        throw ivd_Error(ie_INVALID_ARG,
            "Block size too small: " + cmn_Num2Str(a_blockSize) , true);
    }

    if (a_blockSize > ivd_MAX_BLOCK_SIZE) {
        throw ivd_Error(ie_INVALID_ARG,
            "Block size too large: " + cmn_Num2Str(a_blockSize) , true);
    }

    m_blockMemory.resize(a_blockSize * a_blockCount, 0);
    memset(&(m_blockMemory[0]), 0, a_blockSize * a_blockCount);

    m_blocks.clear();

    for (UInt32_t i = 0; i < a_blockCount; i++) {
        m_blocks.push_back(
            new df_DataBlock(&(m_blockMemory[0]) + (a_blockSize*i), a_blockSize) );
    };

}


bool df_BlockManager::EndOfDataMarked (  )  const [inline, protected]

Definition at line 224 of file df.h.

Referenced by ReaderFinished().

{ return m_endOfData; };


void df_BlockManager::Flush (  )  [protected]

Definition at line 412 of file df_blockmanager.cpp.

References cmn_Condition::Broadcast(), df_DataBlock::Close(), dbg_DETAIL, GetBlockAt(), GetReaderError(), ie_DF_EOD, ie_DF_INVSTATE, IncPos(), ivd_Error, log_DBG_INT_m, log_FUNC_INT_m, log_FUNC_m, m_block_x, m_blocksClosed, m_blocksWritten, m_closed_c, m_endOfData, m_numBlocks, m_numReaders, m_writePos, and NULL.

Referenced by df_MgrWriter::Flush().

                            {
    log_FUNC_INT_m(Flush);

    cmn_MutexLock l(m_block_x);

    if ( GetReaderError() != NULL) {
        log_FUNC_m(Flush);
        df_RETHROW_IN_WRITER;
    }

    log_DBG_INT_m(dbg_DETAIL, "Flush " << m_writePos);

    if (m_blocksClosed >= m_numBlocks) {
        throw ivd_InternalError(
            ie_DF_INVSTATE, "Writer: All blocks already FULL on Flush.");
    }

    if (m_endOfData) {
        log_FUNC_m(Flush);
        throw ivd_Error(ie_DF_EOD, "End of data already set.", true);
    }
    df_DataBlock *curBlk = GetBlockAt(m_writePos);

    IncPos(m_writePos);
    curBlk->Close(m_numReaders);

    m_blocksWritten++;
    m_blocksClosed++;
    m_closed_c.Broadcast();
}


df_DataBlock * df_BlockManager::GetBlockAt ( UInt32_t  a_pos  )  [private]

Definition at line 315 of file df_blockmanager.cpp.

References ie_DF_INVSTATE, ie_INVALID_ARG, log_FUNC_m, and m_blocks.

Referenced by Flush(), GetFree(), GetFull(), and Release().

                                                        {
    if (a_pos >= m_blocks.size()) {
        log_FUNC_m(GetBlockAt);
        if (m_blocks.size() > 0) {
            log_MARKLINE_m;
            throw ivd_InternalError(
                ie_INVALID_ARG, "a_pos is out of range.");
        }
        else {
            log_MARKLINE_m;
            throw ivd_InternalError(
                ie_DF_INVSTATE, "No data blocks defined.");
        }
    }
    return m_blocks[a_pos];
}


UInt64_t df_BlockManager::GetBlocksWritten (  )  [inline]

Definition at line 150 of file df.h.

Referenced by blk_NetWriter::Run(), and blk_BufferWriter::Run().

{ return m_blocksWritten; };


df_DataBlock * df_BlockManager::GetFree (  )  [protected]

Definition at line 369 of file df_blockmanager.cpp.

References dbg_DETAIL, dbg_LOW, GetBlockAt(), GetReaderError(), cmn_Thread::GetTime(), ie_DF_BROKEN_PIPE, ie_DF_EOD, IsFull(), ivd_Error, log_DBG_INT_m, log_DBG_m, log_ERR_m, log_FUNC_INT_m, log_FUNC_m, m_block_x, m_endOfData, m_numReaders, m_released_c, m_writePos, NULL, and cmn_Condition::TimedWait().

Referenced by df_MgrWriter::GetFree().

                                       {
    log_FUNC_INT_m(GetFree);

    cmn_MutexLock l(m_block_x);

    if ( GetReaderError() != NULL) {
        log_FUNC_m(GetFree);
        df_RETHROW_IN_WRITER;
    }

    if (m_endOfData) {
        log_FUNC_m(GetFree);
        log_ERR_m(
            "CHECK THIS! Writer: Requiring block after setting EOD. " <<
            "Most probably an internal error.");
        log_DBG_m(dbg_LOW, "Writer: Already end of data.");
        throw ivd_Error(ie_DF_EOD,  "End of data already set.");
    }

    log_DBG_INT_m(dbg_DETAIL, " Requiring FREE " << m_writePos);

    while ( IsFull() && m_numReaders > 0 ) {
        m_released_c.TimedWait(cmn_Thread::GetTime(1));
    }

    if ( GetReaderError() != NULL) {
        log_FUNC_m(GetFree);
        df_RETHROW_IN_WRITER;
    }

    if (m_numReaders < 1) {
        log_FUNC_m(GetFree);
        log_DBG_m(dbg_LOW, "Writer: No readers left.");

        throw ivd_Error(ie_DF_BROKEN_PIPE,
            "All readers down.", string("m_numReaders < 1"));
    }

    log_DBG_INT_m(dbg_DETAIL, "Got FREE " << m_writePos);

    return GetBlockAt(m_writePos);
}


df_DataBlock * df_BlockManager::GetFull ( UInt32_t  a_readerIdx  )  [protected]

Definition at line 474 of file df_blockmanager.cpp.

References cmn_Num2Str(), dbg_DETAIL, dbg_LOW, GetBlockAt(), cmn_Thread::GetTime(), GetWriterError(), IsEmpty(), log_DBG_INT_m, log_DBG_m, log_FUNC_INT_m, log_FUNC_m, m_block_x, m_closed_c, m_endOfData, m_readPos, m_writePos, NULL, and cmn_Condition::TimedWait().

Referenced by df_MgrReader::GetFull().

                                                           {
    log_FUNC_INT_m(GetFull);

    cmn_MutexLock l(m_block_x);

    if ( GetWriterError() != NULL) {
        log_FUNC_m(GetFull);
        df_RETHROW_IN_READER;
    }

    UInt32_t blockIndex = m_readPos[a_readerIdx];

    log_DBG_INT_m(dbg_DETAIL, "Requiring FULL " << blockIndex);

    // Wait for the block to be filled.
    //
    while( IsEmpty(blockIndex) ) {

        if ( GetWriterError() != NULL) {
            log_FUNC_m(GetFull);
            df_RETHROW_IN_READER;
        }

        if (IsEmpty(blockIndex) && m_endOfData) {
            log_FUNC_m(GetFull);
            log_DBG_m(dbg_LOW,
                "Reader: Empty buffer and EOD. (" <<
                cmn_Num2Str(a_readerIdx) <<  ")." );

            return NULL;
        }
        m_closed_c.TimedWait(cmn_Thread::GetTime(1));
    }

    log_DBG_INT_m(dbg_DETAIL,
        a_readerIdx << " Got FULL " << blockIndex << " " << m_writePos);

    df_DataBlock *readBlock = GetBlockAt(blockIndex);

    return readBlock;
}


ivd_Exception* df_BlockManager::GetReaderError (  )  const [inline]

Definition at line 158 of file df.h.

Referenced by Flush(), GetFree(), and WaitAllReaders().

{ return m_readerError_p; };


ivd_Exception* df_BlockManager::GetWriterError (  )  const [inline]

Definition at line 159 of file df.h.

Referenced by GetFull(), and Release().

{ return m_writerError_p; };


void df_BlockManager::IncPos ( UInt32_t &  a_pos  )  [inline, private]

Definition at line 196 of file df.h.

Referenced by Flush(), and Release().

                                 {
        a_pos = (a_pos + 1) % m_numBlocks;
    };


bool df_BlockManager::IsEmpty ( UInt32_t  a_readIdx  )  const [private]

Used by the readers to check if they can read from the queue.

Definition at line 311 of file df_blockmanager.cpp.

References m_writePos.

Referenced by GetFull().

                                                      {
    return (m_writePos == a_readIdx);
}


bool df_BlockManager::IsFull (  )  const [private]

Used by the writer to check if it can write to the queue.

Definition at line 307 of file df_blockmanager.cpp.

References m_lastReadPos, m_numBlocks, and m_writePos.

Referenced by GetFree().

                                   {
    return ( ((m_writePos + 1)%m_numBlocks) == m_lastReadPos );
}


void df_BlockManager::ReaderFinished ( UInt32_t  a_readerIdx,
bool  success = true 
) [protected]

Definition at line 555 of file df_blockmanager.cpp.

References cmn_Condition::Broadcast(), dbg_LOW, EndOfDataMarked(), log_DBG_m, log_FUNC_m, m_numReaders, m_shutdown_c, and m_updown_x.

Referenced by df_MgrReader::~df_MgrReader().

                                                                       {
    log_FUNC_m(ReaderFinished);

    cmn_MutexLock l(m_updown_x);
    m_numReaders--;
    if (m_numReaders < 1) {
        if (!EndOfDataMarked()) {
            log_DBG_m(dbg_LOW,
                "All readers finished. Writer will have to go down.");
        }
        else {
            log_DBG_m(dbg_LOW,
                "All readers finished after end of data.");
        }
    }
    m_shutdown_c.Broadcast();
}


UInt32_t df_BlockManager::RegisterReader (  )  [protected]
Returns:
reader index.

Definition at line 459 of file df_blockmanager.cpp.

References ie_DF_WRITER_UP, ivd_Error, log_FUNC_m, m_numReaders, m_readPos, m_startOfData, and m_updown_x.

Referenced by df_MgrReader::df_MgrReader().

                                         {
    log_FUNC_m(RegisterReader);

    cmn_MutexLock l(m_updown_x);
    if (m_startOfData) {
        // No more registration of readers allowed after writer is registered.
        throw ivd_Error(ie_DF_WRITER_UP, "", true);
    }
    m_numReaders++;
    m_readPos.push_back(0);

    return (m_readPos.size() - 1);
}


void df_BlockManager::RegisterWriter (  )  [protected]

Blocks additional RegisterReader calls.

Definition at line 362 of file df_blockmanager.cpp.

References log_FUNC_m, m_startOfData, and m_updown_x.

Referenced by df_MgrWriter::df_MgrWriter().
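The body of RegisterWriter() is not listed on this page. Judging only from its references (m_startOfData and m_updown_x) and from the check in RegisterReader(), the ordering rule might look like the sketch below. RegistrationModel and LateReaderIsRejected are hypothetical names, std::mutex stands in for cmn_Mutex, and std::logic_error stands in for ivd_Error(ie_DF_WRITER_UP).

```cpp
#include <cassert>
#include <cstdint>
#include <mutex>
#include <stdexcept>
#include <vector>

// Hypothetical model of the registration rule: all readers must register
// before the writer does.
class RegistrationModel {
public:
    // Returns the index of the newly registered reader.
    std::uint32_t RegisterReader() {
        std::lock_guard<std::mutex> lock(m_updown);
        if (m_startOfData) {
            // Stand-in for ivd_Error(ie_DF_WRITER_UP, ...).
            throw std::logic_error("writer already registered");
        }
        m_readPos.push_back(0);
        return static_cast<std::uint32_t>(m_readPos.size() - 1);
    }

    // Assumed behaviour: marks start of data, which blocks later readers.
    void RegisterWriter() {
        std::lock_guard<std::mutex> lock(m_updown);
        m_startOfData = true;
    }

private:
    std::mutex m_updown;
    bool m_startOfData = false;
    std::vector<std::uint32_t> m_readPos;
};

// Usage: one reader registers, then the writer; a late reader is rejected.
inline bool LateReaderIsRejected() {
    RegistrationModel reg;
    const std::uint32_t idx = reg.RegisterReader();
    assert(idx == 0);
    reg.RegisterWriter();
    try {
        reg.RegisterReader();
    } catch (const std::logic_error&) {
        return true;
    }
    return false;
}
```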


void df_BlockManager::Release ( UInt32_t  a_readerIdx  )  [protected]

Definition at line 516 of file df_blockmanager.cpp.

References cmn_Condition::Broadcast(), dbg_DETAIL, GetBlockAt(), GetWriterError(), ie_DF_INVSTATE, IncPos(), df_DataBlock::IsClosed(), log_DBG_INT_m, log_FUNC_INT_m, log_FUNC_m, m_block_x, m_blocksClosed, m_lastReadPos, m_readPos, m_released_c, NULL, and df_DataBlock::Release().

Referenced by df_MgrReader::Release().

                                                  {
    log_FUNC_INT_m(Release);

    cmn_MutexLock l(m_block_x);

    if ( GetWriterError() != NULL) {
        log_FUNC_m(Release);
        df_RETHROW_IN_READER;
    }

    if (m_blocksClosed < 1) {
        log_FUNC_m(Release);
        throw ivd_InternalError(
            ie_DF_INVSTATE, "Reader: All blocks already EMPTY on Release.");
    }
    df_DataBlock *readBlock = GetBlockAt(m_readPos[a_readerIdx]);
    readBlock->Release();

    log_DBG_INT_m(dbg_DETAIL, "Reader: Release " << m_readPos[a_readerIdx]);

    // Next block index for this reader
    UInt32_t nextBlockIndex = m_readPos[a_readerIdx];
    IncPos(nextBlockIndex);

    if (!readBlock->IsClosed()) {
        // Last Release() un-closes the block and can be used again.
        // Inform the writer that it can write again.

        log_DBG_INT_m(dbg_DETAIL, "Reader: Re-Open " << m_readPos[a_readerIdx]);

        // The next position of the slower reader (the tail of the queue).
        m_lastReadPos = nextBlockIndex;

        m_blocksClosed--;
        m_released_c.Broadcast();
    }
    m_readPos[a_readerIdx] = nextBlockIndex;
}
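Release() relies on df_DataBlock to report when the last reader has let go of a block. The df_DataBlock implementation is not shown here, so the following is only a guess at the counting scheme implied by Close(m_numReaders), Release() and IsClosed(). BlockModel and m_pendingReaders are hypothetical names.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical model of a block that stays closed until every reader
// has released it.
class BlockModel {
public:
    // Writer side: the block is complete and must be seen by `readers` readers.
    void Close(std::uint32_t readers) { m_pendingReaders = readers; }

    // Reader side: this reader is done with the block.
    void Release() {
        assert(m_pendingReaders > 0);
        --m_pendingReaders;
    }

    // True while at least one reader still has to release the block.
    bool IsClosed() const { return m_pendingReaders > 0; }

private:
    std::uint32_t m_pendingReaders = 0;
};
```

Under this assumption, only the release by the last (slowest) reader makes IsClosed() return false, which is the point where Release() above advances m_lastReadPos and wakes the writer.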


void df_BlockManager::SetEndOfData (  )  [protected]

Definition at line 443 of file df_blockmanager.cpp.

References cmn_Condition::Broadcast(), dbg_LOW, log_DBG_m, log_FUNC_m, m_block_x, m_closed_c, m_endOfData, m_shutdown_c, and m_updown_x.

Referenced by df_MgrWriter::~df_MgrWriter().

                                   {
    log_FUNC_m(SetEndOfData);

    cmn_MutexLock l(m_updown_x);
    log_DBG_m(dbg_LOW, "Writer: Setting EOD.");
    m_endOfData = true;
    {
        cmn_MutexLock l(m_block_x);
        log_DBG_m(dbg_LOW, "Writer: Waking up threads waiting for block full.");
        m_closed_c.Broadcast();
    }
    m_shutdown_c.Broadcast();
}


void df_BlockManager::SetReaderError ( ivd_Exception *  a_err  )  [inline]

Definition at line 155 of file df.h.

{ m_readerError_p = a_err; };

void df_BlockManager::SetWriterError ( ivd_Exception *  a_err  )  [inline]

Definition at line 156 of file df.h.

{ m_writerError_p = a_err; };

void df_BlockManager::WaitAllReaders (  ) 

Used by the writer.

The writer of the block manager can call this function to wait for the readers to finish and to get their status code.

Definition at line 226 of file df_blockmanager.cpp.

References GetReaderError(), log_FUNC_m, NULL, and WaitReadersToComplete().

                                     {
    log_FUNC_m(WaitAllReaders);
    WaitReadersToComplete();
    if ( GetReaderError() != NULL) {
        df_RETHROW_IN_WRITER;
    }
}
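The wait-then-rethrow pattern used by WaitAllReaders() can be sketched in standard C++ as follows. This is an assumption-laden illustration, not the library's code: ReaderGate is a hypothetical class, std::exception_ptr replaces the raw ivd_Exception pointer and the df_RETHROW_IN_WRITER macro (whose definition is not shown on this page), and the reader error is passed to ReaderFinished() instead of being set through SetReaderError().

```cpp
#include <cassert>
#include <condition_variable>
#include <exception>
#include <mutex>
#include <stdexcept>

// Hypothetical model: the writer waits for all readers, then sees their error.
class ReaderGate {
public:
    explicit ReaderGate(unsigned readers) : m_numReaders(readers) {}

    // Called by each reader when it is done; a failed reader passes its error.
    void ReaderFinished(std::exception_ptr error = nullptr) {
        std::lock_guard<std::mutex> lock(m_mutex);
        assert(m_numReaders > 0);
        if (error && !m_readerError) {
            m_readerError = error;  // keep the first reported error
        }
        --m_numReaders;
        m_shutdown.notify_all();
    }

    // Called by the writer: blocks until no readers are left, then rethrows
    // the stored reader error, if any.
    void WaitAllReaders() {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_shutdown.wait(lock, [this] { return m_numReaders == 0; });
        if (m_readerError) {
            std::rethrow_exception(m_readerError);
        }
    }

private:
    std::mutex m_mutex;
    std::condition_variable m_shutdown;
    unsigned m_numReaders;
    std::exception_ptr m_readerError;
};
```

Unlike the listing above, which polls with TimedWait() in one-second steps, this sketch waits on a predicate, so a missed notification cannot stall the writer.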


void df_BlockManager::WaitReadersToComplete (  )  [protected]

Definition at line 286 of file df_blockmanager.cpp.

References dbg_DETAIL, cmn_Thread::GetTime(), log_DBG_m, log_FUNC_m, log_WRN_m, m_blocksClosed, m_endOfData, m_numReaders, m_shutdown_c, m_updown_x, and cmn_Condition::TimedWait().

Referenced by WaitAllReaders(), and ~df_BlockManager().

                                            {
    log_FUNC_m(WaitReadersToComplete);

    cmn_MutexLock l(m_updown_x);

    if (m_numReaders > 0 && !m_endOfData) {
        log_WRN_m("Writer up and numReaders > 0. Deadlock possible.");
    }

    while (m_numReaders > 0) {
        log_DBG_m(dbg_DETAIL,
             "Readers left: " << m_numReaders <<
             " blocks closed: " << m_blocksClosed);

        m_shutdown_c.TimedWait(cmn_Thread::GetTime(1));
    }

    log_DBG_m(dbg_DETAIL,
        "Done waiting. Readers left: " << m_numReaders);
}


void df_BlockManager::WaitWriterToComplete (  )  [protected]

Definition at line 258 of file df_blockmanager.cpp.

References dbg_DETAIL, dbg_NORM, cmn_Thread::GetTime(), log_DBG_m, log_FUNC_m, log_WRN_m, m_endOfData, m_numReaders, m_shutdown_c, m_startOfData, m_updown_x, and cmn_Condition::TimedWait().

Referenced by ~df_BlockManager().

                                           {
    log_FUNC_m(WaitWriterToComplete);

    cmn_MutexLock l(m_updown_x);

    if (!m_startOfData) {
        log_DBG_m(dbg_NORM,
            "Writer not yet up in Block Manager shutdown. " <<
            "Most probably ctor failed.");
        return;
    }

    if (!m_endOfData && m_numReaders > 0) {
        log_WRN_m("Writer up and numReaders > 0. Deadlock possible.");
    }

    while (!m_endOfData) {
        log_DBG_m(dbg_DETAIL,
            "Waiting for EOD from writer. Readers left: " << m_numReaders);

        m_shutdown_c.TimedWait(cmn_Thread::GetTime(1));
    };

    log_DBG_m(dbg_DETAIL,
        "Done waiting for writer. Readers left: " << m_numReaders);
}



Friends And Related Function Documentation

friend class df_MgrReader [friend]

Definition at line 234 of file df.h.

friend class df_MgrWriter [friend]

Definition at line 235 of file df.h.


Member Data Documentation

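df_BlockManager::log_CLASSID_m [private]

Macro to add class name member s_className.

Definition at line 159 of file df.h.

cmn_Mutex df_BlockManager::m_block_x [private]

Definition at line 188 of file df.h.

Referenced by Flush(), GetFree(), GetFull(), Release(), and SetEndOfData().

vector< UInt8_t > df_BlockManager::m_blockMemory [private]

Definition at line 172 of file df.h.

Referenced by CreateBuffer().

vector< df_DataBlock * > df_BlockManager::m_blocks [private]

Definition at line 171 of file df.h.

Referenced by CreateBuffer(), GetBlockAt(), and ~df_BlockManager().

UInt32_t df_BlockManager::m_blocksClosed [private]

Definition at line 166 of file df.h.

Referenced by Flush(), Release(), and WaitReadersToComplete().

UInt64_t df_BlockManager::m_blocksWritten [private]

Definition at line 165 of file df.h.

Referenced by Flush().

cmn_Condition df_BlockManager::m_closed_c [private]

Definition at line 189 of file df.h.

Referenced by Flush(), GetFull(), and SetEndOfData().

bool df_BlockManager::m_endOfData [private]

Did the writer complete?

Definition at line 180 of file df.h.

Referenced by Flush(), GetFree(), GetFull(), SetEndOfData(), WaitReadersToComplete(), and WaitWriterToComplete().

UInt32_t df_BlockManager::m_lastReadPos [private]

Position of the slowest reader.

Definition at line 183 of file df.h.

Referenced by IsFull(), and Release().

UInt32_t df_BlockManager::m_numBlocks [private]

Definition at line 169 of file df.h.

Referenced by Flush(), and IsFull().

ivd_Exception * df_BlockManager::m_readerError_p [private]

Definition at line 193 of file df.h.

vector< UInt32_t > df_BlockManager::m_readPos [private]

Current positions of readers.

Definition at line 185 of file df.h.

Referenced by GetFull(), RegisterReader(), and Release().

UInt32_t df_BlockManager::m_readStatus [private]

Definition at line 175 of file df.h.

cmn_Condition df_BlockManager::m_released_c [private]

Definition at line 190 of file df.h.

Referenced by GetFree(), and Release().

cmn_Condition df_BlockManager::m_shutdown_c [private]

Definition at line 191 of file df.h.

Referenced by ReaderFinished(), SetEndOfData(), WaitReadersToComplete(), and WaitWriterToComplete().

bool df_BlockManager::m_startOfData [private]

Did the writer start?

Definition at line 178 of file df.h.

Referenced by RegisterReader(), RegisterWriter(), and WaitWriterToComplete().

UInt32_t df_BlockManager::m_writePos [private]

Definition at line 181 of file df.h.

Referenced by Flush(), GetFree(), GetFull(), IsEmpty(), and IsFull().

ivd_Exception * df_BlockManager::m_writerError_p [private]

Definition at line 194 of file df.h.

UInt32_t df_BlockManager::m_writeStatus [private]

Definition at line 174 of file df.h.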


The documentation for this class was generated from the following files: