23 Jun, 2009 von Raimar Wagner 0
Ein einfacher Scheduler in C++ — Teil 2: Scheduler
Die zentrale Frage bei der Implementierung des Schedulers ist die Frage nach der Datenstruktur in der die Tupel gehalten werden. Der Scheduler soll, gemäß der SJF Strategie, immer das Tupel auswählen dessen Joblänge am kürzesten ist. Für solche eine Problemstellung eignet sich die Datenstruktur std::set
bzw. std::multiset
parametrisiert auf std::pair
am Besten. Jedoch ist zu beachten dass die Sortierung bei std::pair
normalerweise auf dem ersten Element, also bei uns auf der ThreadId basiert. Daher müssen wir den Vergleichsoperator des std::multiset
mit unserem eigenen Operator überschreiben.
/* Scheduler.h */ /* this struct is used to have a different compare operator for std::pair in my std::multiset. I have to compare the second argument of pair! */ struct compareBySecond { bool operator() (const std::pair<int,int>& a, const std::pair<int,int>& b) const {return (a).second<(b).second;} }; /* my scheduler class, schedules the requests */ class Scheduler{ public: [ ... ] private: //multiset holds all pairs with (thread,job) std::multiset< std::pair<int,int> , compareBySecond > jobs; [ ... ] };
Als Synchronisationselemente kommen normale RW-Locks und Condition-Variablen zur Anwendung, auf diese Standardelemente wird hier nicht weiter eingegangen. Guten Beispielcode für diese Elemente findet sich in den Qt3.3-Quellen (z.B.: QReadWriteLock).
Die einzige Methode die von den Anfragethreads ausgeführt wird ist Scheduler::enqueue(int thread, int length)
(siehe Teil 1). Bemerkenswert ist hier nur der letzte Aufruf, da wir ja immer eine möglichst volle Queue gewährleisten wollen wartet der Scheduler darauf dass er Jobs erst auswählt, wenn die Queue voll ist oder eine volle Queue nicht mehr erreicht werden kann. Hierzu ist die Condition-Variable cv_queuefull
nötig auf die beim Überprüfen der Queue gewartet wird.
/* enqueues a job */ void Scheduler::enqueue(int thread, int length){ //wait if queue is full or thread has already job in queue while(isQueueFull() || threadHasRequestInQueue(thread)){ cv->wait(); } //new pair for enqueueing std::pair<int,int> tmp(thread,length); //lock and insert in queue rw->lockWrite(); jobs.insert(tmp); rw->unlockWrite(); //we did it! std::cout << "requester " << thread << " joblength " << length << std::endl; //signal scheduler thread that thread is now maybe full cv_queuefull->signal(); }
Die beiden hier noch verwendeten Methoden bool Scheduler::isQueueFull()
und bool Scheduler::threadHasRequestInQueue(int thread)
sind aus Implementierungssicht trivial.
Die wichtigste Methode in der die eigentlichen Jobs “abgearbeitet” werden ist Scheduler::processNextJob()
, hier muss zunächst überprüft werden ob die Queue bereit ist. Dies geschieht in Scheduler::QueueReady()
und ist weiter unten erläutert. Für die Auswahl des nächsten Jobs mit der kürzesten Joblänge reicht es das erste Element des Sets zu entnehmen, dies ist nach der von uns definierten Ordnung im Set automatisch der Job mit der kürzesten Laufzeit. Nach dem Abarbeiten wird der Job aus der Queue gelöscht. Falls gerade ein Thread darauf wartet einen Job zu enqueuen dem es vorher nicht möglich war zu enqueuen da er schon einen Job in der Queue hatte wird dieser zum Schluss noch aufgeweckt.
/* processes the next job */ void Scheduler::processNextJob(){ //is queue ready? QueueReady(); //get job rw->lockRead(); std::pair<int,int> tmp = *( jobs.begin()); rw->unlockRead(); //process .......... :-) std::cout << "service requester " << tmp.first << " joblength " << tmp.second << std::endl; //delete from queue rw->lockWrite(); jobs.erase(jobs.find(tmp)); rw->unlockWrite(); //wake up all guys that wait on enqueuing cv->signalAll(); }
Für die Überprüfung ob die Queue bereit ist, sind zwei Bedingungen nötig: Ersten muss die Queue voll sein, siehe Annahmen. In dem Fall dass schon Threads beendet wurden kann es passieren dass die Queue nicht voll werden kann. In diesem Fall soll der Scheduler trotz nicht voller Queue seine Arbeit aufnehmen. Hier wäre noch Optimierungspotential gegeben, da nicht garantiert wird dass die Queue in momentanen Zustand der Threads immer so voll wie möglich ist.
/* checks if queue is ready */ void Scheduler::QueueReady(){ while(!isQueueFull() && (jobs.size() < nrOfThreads) ){ //wait if our queue is not full or some threads are already dead, thus we can proceed if #threads<Queuesize cv_queuefull->wait(); } }