Added support for paging in MongoDB Logstream source.

Also minor changes in uID handling were/are needed to support MongoDB ID's
This commit is contained in:
Andre Lorbach 2012-03-30 16:18:36 +02:00
parent 2250e27e3f
commit 020d27cd15
3 changed files with 95 additions and 75 deletions

View File

@ -66,6 +66,8 @@ class LogStreamMongoDB extends LogStream {
private $_myMongoCon = null;
private $_myMongoDB = null;
private $_myMongoCollection = null;
private $_myMongoFields = null;
private $_myMongoQuery = null;
// Constructor
public function LogStreamMongoDB($streamConfigObj) {
@ -101,8 +103,13 @@ class LogStreamMongoDB extends LogStream {
if ( !isset($dbmapping[ $this->_logStreamConfigObj->DBTableType ]) )
return ERROR_DB_INVALIDDBMAPPING;
// Create SQL Where Clause first!
$res = $this->CreateSQLWhereClause();
// Create Needed Fields Array first!
$res = $this->CreateFieldsArray();
if ( $res != SUCCESS )
return $res;
// Create Filters for first time!
$res = $this->CreateQueryArray(UID_UNKNOWN);
if ( $res != SUCCESS )
return $res;
@ -621,7 +628,17 @@ TODO
if ( isset($this->bufferedRecords[$this->_currentRecordNum][$dbfieldname]) )
{
if ( isset($fields[$property]['FieldType']) && $fields[$property]['FieldType'] == FILTER_TYPE_DATE ) // Handle as date!
$arrProperitesOut[$property] = GetEventTime( $this->bufferedRecords[$this->_currentRecordNum][$dbfieldname] );
{
$myDateField = $this->bufferedRecords[$this->_currentRecordNum][$dbfieldname];
if ( gettype($myDateField) == "object" && get_class($myDateField) == "MongoDate" )
{
$arrProperitesOut[$property][EVTIME_TIMESTAMP] = $myDateField->sec;
$arrProperitesOut[$property][EVTIME_TIMEZONE] = date('O'); // Get default Offset
$arrProperitesOut[$property][EVTIME_MICROSECONDS] = $myDateField->usec;
}
else // Try to parse Date!
$arrProperitesOut[$property] = GetEventTime( $myDateField );
}
else
$arrProperitesOut[$property] = $this->bufferedRecords[$this->_currentRecordNum][$dbfieldname];
}
@ -1407,6 +1424,66 @@ TODO!!!
* ============= Beginn of private functions =============
*/
/*
* Helper function to create the Field Array
*/
private function CreateFieldsArray()
{
global $dbmapping;
$szTableType = $this->_logStreamConfigObj->DBTableType;
// Init Array
$this->_myMongoFields = array();
// Init Fields Array
foreach ( $this->_arrProperties as $property )
{
// Check if mapping exists
if ( isset($dbmapping[$szTableType]['DBMAPPINGS'][$property]) )
{
$this->_myMongoFields[ $dbmapping[$szTableType]['DBMAPPINGS'][$property] ] = true;
}
}
// Success
return SUCCESS;
}
/*
* Helper function to create the Query Array
*/
private function CreateQueryArray($uID)
{
global $dbmapping;
$szTableType = $this->_logStreamConfigObj->DBTableType;
// Init Array
$this->_myMongoQuery = array();
if ( $this->_filters != null )
{
// Init Fields Array
foreach ( $this->_arrProperties as $property )
{
// Check if mapping exists
if ( isset($dbmapping[$szTableType]['DBMAPPINGS'][$property]) )
{
$this->_myMongoQuery[ $dbmapping[$szTableType]['DBMAPPINGS'][$property] ] = true;
}
}
}
if ( $uID != UID_UNKNOWN )
{
// Add uID Filter as well!
$myMongoID = new MongoId( base_convert($uID, 10, 16) );
$this->_myMongoQuery[ $dbmapping[$szTableType]['DBMAPPINGS'][SYSLOG_UID] ] = array( '$lt' => $myMongoID );
}
// Success
return SUCCESS;
}
/*
* This function expects the filters to already being set earlier.
* Otherwise no usual WHERE Clause can be created!
@ -1656,15 +1733,16 @@ TODO!!!
$szTableType = $this->_logStreamConfigObj->DBTableType;
// return error if there was one!
if ( ($res = $this->CreateMainSQLQuery($uID)) != SUCCESS )
if ( ($res = $this->CreateQueryArray($uID)) != SUCCESS )
return $res;
// Append LIMIT clause
// $szSql .= " LIMIT " . $this->_currentRecordStart . ", " . $this->_logStreamConfigObj->RecordsPerQuery;
$myCursor = $this->_myMongoCollection->find(); // ->limit(10); // $collection->find();
$myCursor = $myCursor->sort(array("_id" => -1));
$myCursor = $this->_myMongoCollection->find($this->_myMongoQuery, $this->_myMongoFields); // ->limit(10); // $collection->find();
OutputDebugMessage("Cursor verbose: " . var_export($myCursor->explain(), true), DEBUG_DEBUG);
$myCursor = $myCursor->sort(array("_id" => -1));
// Copy rows into the buffer!
$iBegin = $this->_currentRecordNum;
@ -1674,8 +1752,8 @@ TODO!!!
if ( $myRow === FALSE || !$myRow )
break;
$myRow[ "_id" ] = $myRow["_id"]->{'$id'};
$myRow[ "_id" ] = base_convert($myRow[ "_id" ], 16, 10); // ($myRow["_id"]->{'$id'});
// $myRow[ "_id" ] = $myRow["_id"]->getInc(); // Use Inc value for now, easier to read!
// Keys will be converted into lowercase!
$this->bufferedRecords[$iBegin] = array_change_key_case( $myRow, CASE_LOWER);
$iBegin++;
@ -1702,70 +1780,6 @@ TODO!!!
return SUCCESS;
}
/*
* Create the SQL QUery!
*/
private function CreateMainSQLQuery($uID)
{
/*
global $querycount;
// Get SQL Statement
$szSql = $this->CreateSQLStatement($uID);
// --- Append LIMIT
$szSql .= " LIMIT " . $this->_logStreamConfigObj->RecordsPerQuery;
// ---
// Perform Database Query
$this->_myDBQuery = mysql_query($szSql, $this->_dbhandle);
if ( !$this->_myDBQuery )
{
// Check if a field is missing!
if ( mysql_errno() == 1054 )
{
// Handle missing field and try again!
if ( $this->HandleMissingField() == SUCCESS )
{
$this->_myDBQuery = mysql_query($szSql, $this->_dbhandle);
if ( !$this->_myDBQuery )
{
$this->PrintDebugError("Invalid SQL: ".$szSql);
return ERROR_DB_QUERYFAILED;
}
}
else // Failed to add field dynamically
return ERROR_DB_QUERYFAILED;
}
else
{
$this->PrintDebugError("Invalid SQL: ".$szSql);
return ERROR_DB_QUERYFAILED;
}
}
else
{
// Skip one entry in this case
if ( $this->_currentRecordStart > 0 )
{
// Throw away
$myRow = mysql_fetch_array($this->_myDBQuery, MYSQL_ASSOC);
}
}
// Increment for the Footer Stats
$querycount++;
// Output Debug Informations
OutputDebugMessage("LogStreamDB|CreateMainSQLQuery: Created SQL Query:<br>" . $szSql, DEBUG_DEBUG);
// return success state if reached this point!
return SUCCESS;
*/
return SUCCESS;
}
/*
* Creates the SQL Statement we are going to use!
*/

View File

@ -1283,7 +1283,7 @@ function GetEventTime($szTimStr)
$eventtime[EVTIME_TIMEZONE] = $out[7] . $out[8] . $out[9];
$eventtime[EVTIME_MICROSECONDS] = 0;
}
// Sample: 2008-04-02T11:12:32.380449+02:00
// Sample: 2008-04-02T11:12:32.380449+02:00
else if ( preg_match("/([0-9]{4,4})-([0-9]{1,2})-([0-9]{1,2})T([0-9]{1,2}):([0-9]{1,2}):([0-9]{1,2})\.([0-9]{1,6})([+-])([0-9]{1,2}):([0-9]{1,2})/", $szTimStr, $out ) )
{
// RFC 3164 typical timestamp

View File

@ -59,7 +59,13 @@ $content['EXTRA_STYLESHEET'] = '<link rel="stylesheet" href="css/highlight.css"
// --- CONTENT Vars
if ( isset($_GET['uid']) )
$content['uid_current'] = intval($_GET['uid']);
{
// Now check by numeric as uid can be larger than INT values
if ( is_numeric($_GET['uid']) )
$content['uid_current'] = $_GET['uid'];
else
$content['uid_current'] = UID_UNKNOWN;
}
else
$content['uid_current'] = UID_UNKNOWN;