22 Jun, 2009 von Raimar Wagner
Ein einfacher Scheduler in C++ — Teil 1: Rahmenprogramm
Der hier vorgestellte Scheduler soll am Ende folgende Rahmenbedingungen erfüllen:
- Jeder Thread darf nur maximal eine Anfrage in der Warteschlange haben.
- Ein Thread stoppt falls alle seine Anfragen bearbeitet wurden.
- Der Scheduler arbeitet nach dem SJF (Shortest Job First) Verfahren
- Die Warteschlangengröße muss beim Programmstart festlegbar sein
Als Eingabe erhält das Programm die Warteschlangenlänge und eine Liste von Eingabedateien. Jede Eingabedatei entspricht einem anfragenden Thread und beinhaltet eine Liste von Nummern die die Joblänge symbolisieren sollen.
Der erste Teil besteht aus der Implementierung des Rahmenprogramm der sich der später zu implementierenden Schedulerklasse bedient.
Der erste Teil der main()
Methode besteht aus den Auslese- und Initialisierungaufrufen. Insbesondere wird der Scheduler mit der Anzahl der Threads und der maximalen Warteschlangenlänge initialisiert.
//get queue-size int maxQueue=atoi(argv[1]); if(maxQueue<=0) { std::cout << "min. queue size is 1" << std::endl; return 0; } //vector so store filenames std::vector<std::string> files; //count #threads int numOfThreads=0; //save filenames for (int i = 2; i < argc; i++) { files.push_back(std::string(argv[i])); //every file is one thread numOfThreads++; } //create new scheduler scheduler= new Scheduler(maxQueue, numOfThreads); //just temp storage for reading file std::string line; //iterate through parameters for(int i = 0; i< argc-2; i++){ //new parameter => new thread jobs.push_back(std::vector<int>()); //open file std::ifstream myfile(files.at(i).c_str()); if (myfile.is_open()) { //push all job lengthes in vector while (! myfile.eof() ) { std::getline (myfile,line); (jobs.at(i)).push_back(atoi(line.c_str())); } //close file myfile.close(); } }
Im nächsten Schritt müssen alle anfragenden Threads gestartet werden, in diesem Fall kommen einfache pthreads zur Anwendung. Zu beachten ist dass &run_thread
der Funktionspointer auf die sogenannte run-Methode darstellt.
//array of threads pthread_t th[numOfThreads]; //array of int* to transfer thread nr. to thread int* numbers[numOfThreads]; for (int i = 0; i < numOfThreads; i++) { //save thread nr. numbers[i]=new int(i); //create thread and transfer parameter pthread_create (&th[i], NULL, &run_thread, numbers[i]); }
Im main()
Thread läuft auch der Scheduler der solange Anfragen abarbeitet bis die Warteschlange leer ist. Danach muss um ein sauberes Programmende zu gewährleisten noch auf alle Threads gewartet werden.
//process jobs until Queue is empty while(!scheduler->isQueueEmpty()){ scheduler->processNextJob(); } //wait until all threads are finished for (int i = 0; i < numOfThreads; i++){ pthread_join (th[i], NULL); } return 0;
Die von den Anfragethreads ausgeführte Methode run_thread(void *number)
hat ein paar Stolperfallen; hier ist zu beachten dass an pthreads
übergebene Funktionen nur void*
Pointer als Parameter haben dürfen, deshalb muss auch innerhalb der Methode die Threadnummer wieder auf int*
gecastet werden. Nach dem einreihen aller Anfragen in die Warteschlange des Schedulers kann sich der Thread beenden.
static void* run_thread (void *number) { for(unsigned int i=0; i< (jobs.at(*( (int*) number))).size()-1;i++){ //enqueue all jobs in waiting queue scheduler->enqueue(*( (int*) number), (jobs.at(*( (int*) number))).at(i) ); } //all jobs are enqueued => thread has done his work scheduler->decreaseThreads(); pthread_exit (NULL); }
In nächsten Teil wird detailliert auf die Funktionsweise und den Aufbau eines des hier benutzten Schedulers eingegangen.
[...] die von den Anfragethreads ausgeführt wird ist Scheduler::enqueue(int thread, int length) (siehe Teil 1). Bemerkenswert ist hier nur der letzte Aufruf, da wir ja immer eine möglichst volle Queue [...]