Commit 8407e46f authored by cvs2svn's avatar cvs2svn
Browse files

This commit was manufactured by cvs2svn to create tag 'COOL_2_8_5-pre2'.

git-svn-id: file:///git/lcgcool.svndb/cool/tags/COOL_2_8_5-pre2@16495 4525493e-7705-40b1-a816-d608a930855b
parent f725db90
<use name=CoolApplication>
#============================================================================
# $ Id: requirements,v 1.4 2005/08/24 17:31:07 marcocle Exp $
#============================================================================
package CoolMiniClient
#============================================================================
#============================================================================
# Public dependencies
#============================================================================
use CoolKernel v*
use CoolApplication v*
include_path none
#============================================================================
private
#============================================================================
#============================================================================
# Build rules
#============================================================================
application coolMiniClient ../*.cpp
# Link the CORAL relational libraries
apply_tag NEEDS_CORAL_RELATIONAL_ACCESS
# Fake target for tests
action tests "echo No tests in this package"
macro_remove cmt_actions_constituents "tests"
# Fake target for examples
action examples "echo No examples in this package"
macro_remove cmt_actions_constituents "examples"
# Name of the conditions client program
# this can be any executable in the path that conforms to the
# following command set (examples for a client 'myclient'):
# 1) myclient recreate
# 2) myclient write string 5 100
# 3) myclient read string 7 10 20
#
# 1) sets up the database in a clean state
# 2) writes '100' objects of type 'string' in folder '5'
# 3) read objects of type 'string' from IOV '10' to '20' in folder '7'
# Folders are designated by ids. The client is expected to map this to
# some real folder name, e.g. 'string_5', 'float_12'
clientCommand = "coolMiniClient"
# Number of concurrent clients
numberOfWorkers = 3
# Total number of jobs to submit (read and write together)
numberOfJobs = 6
# Timeout between job submission. A timeout is advisable to spread out
# the job submission over the total running time and prevent spawning
# lots of write job at the beginning (where read jobs can't be submitted
# because there's no data yet)
submitTimeout = 1 # seconds
# Specify the payload types to be used. (Only those specified need to be
# included in the further settings.)
availableTypes = [ 'string', 'float', 'mixed' ]
# The maximum number of folders for each payload type
numberOfFolders = { 'string' : 10, 'float' : 10, 'mixed' : 10 }
# The number of objects to store in each folder
objectsPerFolder = { 'string' : 50, 'float' : 100, 'mixed' : 50 }
# The read/write job ratio
# Only every nth job will be a write job, unless no read jobs can be
# spawned (due to lack of data).
readWriteRatio = 50
# The minimum and maximum batch size of read jobs
# The batch size will be set with a uniform random between min and max.
readMinBatchSize = { 'string' : 10, 'float' : 50, 'mixed' : 10 }
readMaxBatchSize = { 'string' : 50, 'float' : 100, 'mixed' : 50 }
# The bytes per object
# Set this for reporting puposes: the client output is parsed to obtain
# the number of written/read objects. Setting the variables below allows
# the script to determine the read/write byte counts.
bytesPerObject = { 'string' : 5000, 'float' : 80, 'mixed' : 330 }
/*
* @file coolMiniClient.cpp
*
* @author Sven A. Schmidt
*
*/
#include "CoolApplication/Application.h"
#include "CoolKernel/IObject.h"
#include "CoolKernel/IObjectIterator.h"
#include "CoolKernel/IFolder.h"
#include "CoolKernel/IDatabase.h"
#include "CoolKernel/IDatabaseSvc.h"
#include "CoolKernel/Exception.h"
#include "CoolKernel/Record.h"
#include "CoolKernel/RecordSpecification.h"
#include "RelationalAccess/ConnectionService.h"
#include "RelationalAccess/IConnectionServiceConfiguration.h"
using cool::Record;
using cool::RecordSpecification;
// system/stl headers
#include <iostream>
#include <string>
#include <stdexcept>
using namespace std;
const char* COOLTESTDB = "COOLTESTDB";
#define RES cout << "CMC "
/// Holds the parsed command line arguments
struct Arguments {
string command;
string payloadType;
int intArg_0;
int intArg_1;
int intArg_2;
};
/// Prints the command line usage info
string usage() {
return
"usage:\n"
"coolMiniClient <command> <payloadType> <nFolders> <objectsPerFolder>";
}
/// Parses the command line arguments from argv into an Arguments struct
Arguments parseArgs( int argc, char *argv[] ) {
Arguments args;
if ( argc < 2 )
throw runtime_error( "bad usage: not enough arguments" );
args.command = argv[1];
if ( args.command == "recreateDb" ) return args;
if ( argc < 5 )
throw runtime_error( "bad usage: not enough arguments" );
args.payloadType = argv[2];
args.intArg_0 = atoi( argv[3] );
args.intArg_1 = atoi( argv[4] );
if ( args.command == "write" ) return args;
if ( argc != 6 )
throw runtime_error( "bad usage: not enough arguments" );
args.intArg_2 = atoi( argv[5] );
return args;
}
/// Creates a uniform payload specification for nFields fields
RecordSpecification uniformSpec( int nFields, cool::StorageType::TypeId type ) {
RecordSpecification spec;
for ( int i = 0; i < nFields; ++i ) {
stringstream s;
s << "X" << i;
spec.extend( s.str(), type );
}
return spec;
}
/// Creates a mixed payload specification for nFields fields
RecordSpecification mixedSpec( int nFields ) {
RecordSpecification spec;
for ( int i = 0; i < nFields; ++i ) {
stringstream s;
s << "X" << i;
spec.extend( s.str(), cool::StorageType::Float );
}
for ( int i = 0; i < nFields; ++i ) {
stringstream s;
s << "X" << nFields + i;
spec.extend( s.str(), cool::StorageType::Int32 );
}
for ( int i = 0; i < 1; ++i ) {
stringstream s;
s << "X" << 2*nFields + i;
spec.extend( s.str(), cool::StorageType::String4k );
}
return spec;
}
/// Creates a dummy float Record for a given index
Record floatPayload( int index, const RecordSpecification & spec ) {
int nFields = spec.size();
Record payload(spec);
for ( int i = 0; i < nFields; ++i ) {
stringstream s;
s << "X" << i;
payload[ s.str() ].setValue<float>( (float)index / 1000 );
}
return payload;
}
/// Creates a dummy string Record for a given index
Record stringPayload( int index, const RecordSpecification & spec ) {
int byteSize = 250;
stringstream payloadString;
for ( int i = 0; i < byteSize; ++i ) payloadString << index % 10;
int nFields = spec.size();
Record payload(spec);
for ( int i = 0; i < nFields; ++i ) {
stringstream s;
s << "X" << i;
payload[ s.str() ].setValue<string>( payloadString.str() );
}
return payload;
}
/// Creates a dummy 'mixed' Record for a given index
Record mixedPayload( int index, const RecordSpecification & spec ) {
int byteSize = 250; // changed to one 250 byte string instead of 10 50b,
// because of ODBC/MySQL crash and a limit of 256bytes
// returned
stringstream payloadString;
for ( int i = 0; i < byteSize; ++i ) payloadString << index % 10;
//int nFields = spec.size() / 3; // nFields float, int, and string each
int nFields = ( spec.size() - 1 ) / 2; // nFields float, int + 1 string
Record payload(spec);
for ( int i = 0; i < nFields; ++i ) {
stringstream s;
s << "X" << i;
payload[ s.str() ].setValue<float>( (float)index / 1000 );
}
for ( int i = 0; i < nFields; ++i ) {
stringstream s;
s << "X" << nFields + i;
payload[ s.str() ].setValue<int>( index );
}
for ( int i = 0; i < 1; ++i ) {
stringstream s;
s << "X" << 2*nFields + i;
payload[ s.str() ].setValue<string>( payloadString.str() );
}
return payload;
}
/// Creates a payload object for a given type and spec
/// The index is the 'key' to the generated data. This method will always
/// return the same data for the same key, so that data can be checked
/// on read back with the key alone.
Record payload( long index, const RecordSpecification & spec,
const string & payloadType ) {
if ( payloadType == "float" ) {
return floatPayload( index, spec );
} else if ( payloadType == "string" ) {
return stringPayload( index, spec );
} else if ( payloadType == "mixed" ) {
return mixedPayload( index, spec );
} else {
throw runtime_error( "unknown payload type" );
}
}
/// Constructs a foldername for a given index and payloadType
string foldername( int index, const string & payloadType ) {
stringstream fname;
fname << "/" << payloadType << "_" << index;
return fname.str();
}
/// Writes a payload to the database. Input parameters are the database handle,
/// the folder specification, the payload type, the folder index, and the
/// number of objects per folder. Returns the number of objects written.
int write( cool::IDatabasePtr & db, const RecordSpecification & spec,
const string & payloadType,
int folderIndex, int objectsPerFolder ) {
int nObjs = 0;
string fname = foldername( folderIndex, payloadType );
try {
cool::IFolderPtr folder;
if ( ! db->existsFolder( fname ) ) {
folder = db->createFolder( fname, spec );
} else {
folder = db->getFolder( fname );
}
folder->setupStorageBuffer();
for ( int i = 0; i < objectsPerFolder; ++i ) {
cool::ValidityKey start = i;
folder->storeObject( start, start +1,
payload( i, spec, payloadType ), 0 );
++nObjs;
}
folder->flushStorageBuffer();
} catch ( cool::Exception & e ) {
cerr << e.what() << endl;
return 0;
}
return nObjs;
}
/// Reads and validates data from the database. The input parameters are
/// required for the validation. Returns the number of objects successfully
/// validated.
int read( cool::IDatabasePtr & db, const RecordSpecification & spec,
const string & payloadType,
int folderIndex, int startIndex, int endIndex ) {
string fname = foldername( folderIndex, payloadType );
cool::IFolderPtr folder = db->getFolder( fname );
string tag = "";
int channel = 0;
cool::IObjectIteratorPtr objs =
folder->browseObjects( startIndex, endIndex, channel, tag );
int validated = 0;
int index = 0;
while ( objs->goToNext() ) {
const cool::IObject& obj = objs->currentRef();
if ( obj.payload() ==
payload( startIndex + index, spec, payloadType ) ) ++validated;
++index;
}
return validated;
}
int main( int argc, char *argv[] ) {
Arguments args;
try {
args = parseArgs( argc, argv );
} catch ( exception & e ) {
cout << usage() << endl;
cout << e.what() << endl;
exit(-1);
}
string connectString;
if ( getenv( COOLTESTDB ) ) {
connectString = getenv( COOLTESTDB );
} else {
cout << "Please provide a connect string by "
<< "specifying one in the environment variable COOLTESTDB, e.g."
<< endl;
cout << "setenv COOLTESTDB "
<< "\"oracle://devdb9;schema=conddb_test;"
<< "user=conddb_test;password=lcg;dbname=COOLTEST\"" << endl;
cout << "Aborting test" << endl;
exit(-1);
}
cool::Application app;
// If we can access a LFC and the user is not explicitely forbidding it,
// we try to use CORAL LFCReplicaService
if ( ::getenv("COOL_IGNORE_LFC") == NULL && ::getenv("LFC_HOST") != NULL ) {
coral::IConnectionServiceConfiguration &connSvcConf =
app.connectionSvc().configuration();
// try to use CORAL LFCReplicaService
connSvcConf.setAuthenticationService("CORAL/Services/LFCReplicaService");
connSvcConf.setLookupService("CORAL/Services/LFCReplicaService");
}
cool::IDatabaseSvc& dbSvc = app.databaseService();
cool::IDatabasePtr db;
try {
if ( args.command == "recreateDb" ) {
dbSvc.dropDatabase( connectString );
db = dbSvc.createDatabase( connectString );
RES << "OK [ recreate ]" << endl;
return 0;
} else {
db = dbSvc.openDatabase( connectString );
}
} catch ( cool::DatabaseDoesNotExist& /*e*/ ) {
db = dbSvc.createDatabase( connectString );
} catch ( std::exception& e ) {
cout << "Unrecoverable error:\n" << e.what() << endl;
return 1;
}
RecordSpecification spec;
if ( args.payloadType == "float" ) {
int nFields = 20;
spec = uniformSpec( nFields, cool::StorageType::Float );
} else if ( args.payloadType == "string" ) {
int nFields = 20;
spec = uniformSpec( nFields, cool::StorageType::String4k );
} else if ( args.payloadType == "mixed" ) {
int nFields = 10;
spec = mixedSpec( nFields );
} else {
cout << "unknown payload type" << endl;
exit(-1);
}
if ( args.command == "write" ) {
int folderIndex = args.intArg_0;
int objectsPerFolder = args.intArg_1;
int nObjs = write( db, spec, args.payloadType,
folderIndex, objectsPerFolder );
if ( nObjs == objectsPerFolder ) {
RES << "OK [ "
<< "write "
<< args.payloadType
<< " " << folderIndex
<< " " << objectsPerFolder << " ] "
<< nObjs << " objects written"
<< endl;
return 0;
} else {
RES << "FAILURE [ "
<< "write "
<< args.payloadType
<< " " << folderIndex
<< " " << objectsPerFolder << " ] "
<< nObjs << " objects written"
<< endl;
return 1;
}
} else if ( args.command == "read" ) {
int folderIndex = args.intArg_0;
int startIndex = args.intArg_1;
int endIndex = args.intArg_2;
int validated =
read( db, spec, args.payloadType, folderIndex, startIndex, endIndex );
if ( validated == endIndex - startIndex ) {
RES << "OK [ "
<< "read "
<< args.payloadType
<< " " << folderIndex
<< " " << startIndex
<< " " << endIndex << " ] "
<< validated << " objects validated"
<< endl;
return 0;
} else {
RES << "FAILURE [ "
<< "read "
<< args.payloadType
<< " " << folderIndex
<< " " << startIndex
<< " " << endIndex << " ] "
<< validated << " of " << ( endIndex - startIndex )
<< " objects validated."
<< endl;
return 1;
}
} else {
cout << "unknown command" << endl;
exit(-1);
}
return 0;
}
#----------------------------------------------
#
# Worker class
#
#----------------------------------------------
import threading
class Worker(threading.Thread):
requestId = 0
def __init__( self, aRequestQueue, aResultQueue, **kw ):
threading.Thread.__init__( self, **kw )
self.setDaemon( True )
self.requestQueue = aRequestQueue
self.resultQueue = aResultQueue
self.start()
def performWork( self, method, *args, **kw ):
Worker.requestId += 1
self.requestQueue.put( ( Worker.requestId, method, args, kw ) )
return Worker.requestId
def run( self ):
while True:
requestId, method, args, kw = self.requestQueue.get()
self.resultQueue.put( ( requestId, method( *args, **kw ) ) )
#----------------------------------------------
#
# Configuration
#
#----------------------------------------------
import sys
try:
from config import *
except:
print 'config.py not found -- using default configuration'
clientCommand = "coolMiniClient"
numberOfWorkers = 20
numberOfJobs = 10
submitTimeout = 1 # seconds
availableTypes = [ 'string', 'float', 'mixed' ]
numberOfFolders = { 'string' : 10, 'float' : 10, 'mixed' : 10 }
objectsPerFolder = { 'string' : 50, 'float' : 100, 'mixed' : 50 }
readWriteRatio = 50
readMinBatchSize = { 'string' : 10, 'float' : 50, 'mixed' : 10 }
readMaxBatchSize = { 'string' : 50, 'float' : 100, 'mixed' : 50 }
bytesPerObject = { 'string' : 5000, 'float' : 80, 'mixed' : 330 }
folderIds = {}
for key, value in numberOfFolders.items():
folderIds[key] = range( value )
#----------------------------------------------
#
# Helper methods
#
#----------------------------------------------
import Queue
requestQueue = Queue.Queue()
resultQueue = Queue.Queue()
for i in range( numberOfWorkers ):
worker = Worker( requestQueue, resultQueue )
import os, re, random, time
def writeCommand( payloadType ):
index = random.randrange( 0, len(folderIds[payloadType]) )
folderId = folderIds[payloadType][index]
del folderIds[payloadType][index]
cmd = "%s write %s %d %d" % ( clientCommand,
payloadType,
folderId,
objectsPerFolder[payloadType] )
return cmd
def readCommand( payloadType ):
folderId = random.choice( availableFolders[payloadType] )
batchsize = random.