22 Jun, 2009 von Raimar Wagner
Ein einfacher Scheduler in C++ — Teil 1: Rahmenprogramm
Der hier vorgestellte Scheduler soll am Ende folgende Rahmenbedingungen erfüllen:
- Jeder Thread darf nur maximal eine Anfrage in der Warteschlange haben.
- Ein Thread stoppt falls alle seine Anfragen bearbeitet wurden.
- Der Scheduler arbeitet nach dem SJF (Shortest Job First) Verfahren
- Die Warteschlangengröße muss beim Programmstart festlegbar sein
Als Eingabe erhält das Programm die Warteschlangenlänge und eine Liste von Eingabedateien. Jede Eingabedatei entspricht einem anfragenden Thread und beinhaltet eine Liste von Nummern die die Joblänge symbolisieren sollen.
Der erste Teil besteht aus der Implementierung des Rahmenprogramm der sich der später zu implementierenden Schedulerklasse bedient.
Der erste Teil der main() Methode besteht aus den Auslese- und Initialisierungaufrufen. Insbesondere wird der Scheduler mit der Anzahl der Threads und der maximalen Warteschlangenlänge initialisiert.
//get queue-size
int maxQueue=atoi(argv[1]);
if(maxQueue<=0) {
std::cout << "min. queue size is 1" << std::endl;
return 0;
}
//vector so store filenames
std::vector<std::string> files;
//count #threads
int numOfThreads=0;
//save filenames
for (int i = 2; i < argc; i++) {
files.push_back(std::string(argv[i]));
//every file is one thread
numOfThreads++;
}
//create new scheduler
scheduler= new Scheduler(maxQueue, numOfThreads);
//just temp storage for reading file
std::string line;
//iterate through parameters
for(int i = 0; i< argc-2; i++){
//new parameter => new thread
jobs.push_back(std::vector<int>());
//open file
std::ifstream myfile(files.at(i).c_str());
if (myfile.is_open())
{
//push all job lengthes in vector
while (! myfile.eof() )
{
std::getline (myfile,line);
(jobs.at(i)).push_back(atoi(line.c_str()));
}
//close file
myfile.close();
}
}
Im nächsten Schritt müssen alle anfragenden Threads gestartet werden, in diesem Fall kommen einfache pthreads zur Anwendung. Zu beachten ist dass &run_thread der Funktionspointer auf die sogenannte run-Methode darstellt.
//array of threads
pthread_t th[numOfThreads];
//array of int* to transfer thread nr. to thread
int* numbers[numOfThreads];
for (int i = 0; i < numOfThreads; i++) {
//save thread nr.
numbers[i]=new int(i);
//create thread and transfer parameter
pthread_create (&th[i], NULL, &run_thread, numbers[i]);
}
Im main() Thread läuft auch der Scheduler der solange Anfragen abarbeitet bis die Warteschlange leer ist. Danach muss um ein sauberes Programmende zu gewährleisten noch auf alle Threads gewartet werden.
//process jobs until Queue is empty
while(!scheduler->isQueueEmpty()){
scheduler->processNextJob();
}
//wait until all threads are finished
for (int i = 0; i < numOfThreads; i++){
pthread_join (th[i], NULL);
}
return 0;
Die von den Anfragethreads ausgeführte Methode run_thread(void *number) hat ein paar Stolperfallen; hier ist zu beachten dass an pthreads übergebene Funktionen nur void* Pointer als Parameter haben dürfen, deshalb muss auch innerhalb der Methode die Threadnummer wieder auf int* gecastet werden. Nach dem einreihen aller Anfragen in die Warteschlange des Schedulers kann sich der Thread beenden.
static void* run_thread (void *number) {
for(unsigned int i=0; i< (jobs.at(*( (int*) number))).size()-1;i++){
//enqueue all jobs in waiting queue
scheduler->enqueue(*( (int*) number), (jobs.at(*( (int*) number))).at(i) );
}
//all jobs are enqueued => thread has done his work
scheduler->decreaseThreads();
pthread_exit (NULL);
}
In nächsten Teil wird detailliert auf die Funktionsweise und den Aufbau eines des hier benutzten Schedulers eingegangen.








[...] die von den Anfragethreads ausgeführt wird ist Scheduler::enqueue(int thread, int length) (siehe Teil 1). Bemerkenswert ist hier nur der letzte Aufruf, da wir ja immer eine möglichst volle Queue [...]