Hierbei geht es um einen kleinen Graupner Humanoiden der Serie RB-1000. Diesem Humanoiden, getauft auf T-1000 aka Arnie, müssen wir in einem Teamprojekt das Gehen beibringen.
Arnie hat 21 Servos, 2 LED-Augen(rot) und einen Microkontroller, der an der Uni eigenständig entwickelt wurde, versehen mit einer Bluetooth-Schnittstelle und einem Gyroskop. In zwei Praktika zuvor wurden zwei Interfaces entwickelt, das eine in MatLab und das andere in C/C++. Derzeit können wir uns noch nicht ganz entscheiden, welches von beiden wir verwenden werden. Allerdings ist mir das C/C++ gerade noch lieber, da es weniger Probleme bereitet und von uns in – welch Einfalssreichtum – Skynet umbenannt wurde. Das schöne dabei ist, dass man beispielsweise ein Bashscript schreiben kann für die Ansteuerung und dann das ganze einfach an Skynet pipen kann.
Bisher bestand unsere Tätigkeit darin, Arnie erst einmal komplett zu entrümpeln. Wir haben unnötige Meter an Servokabel entfernt, die Stromversorgung neu angeschlossen, neue Schalter angebracht, die Batterie im Brustkorb durch ein passendes Netzteil ersetzt, was uns die Handhabung um mindestens 120% verbessert, die Interfaces gestestet und vorbereitet. Demnächst werden wir auch noch eine kleine Justierungsstation basteln, damit man ihn vor Inbetriebnahme immer optimal einstellen kann.
Natürlich haben wir auch schon ordentlich an ihm getestet was er kann und wie es geht. Hier sieht man ihn, wie er winkt.
]]>This is only the beginning….
Meine Empfehlungen:
std::set
bzw. std::multiset
parametrisiert auf std::pair
am Besten. Jedoch ist zu beachten dass die Sortierung bei std::pair
normalerweise auf dem ersten Element, also bei uns auf der ThreadId basiert. Daher müssen wir den Vergleichsoperator des std::multiset
mit unserem eigenen Operator überschreiben.
/* Scheduler.h */ /* this struct is used to have a different compare operator for std::pair in my std::multiset. I have to compare the second argument of pair! */ struct compareBySecond { bool operator() (const std::pair<int,int>& a, const std::pair<int,int>& b) const {return (a).second<(b).second;} }; /* my scheduler class, schedules the requests */ class Scheduler{ public: [ ... ] private: //multiset holds all pairs with (thread,job) std::multiset< std::pair<int,int> , compareBySecond > jobs; [ ... ] };
Als Synchronisationselemente kommen normale RW-Locks und Condition-Variablen zur Anwendung, auf diese Standardelemente wird hier nicht weiter eingegangen. Guten Beispielcode für diese Elemente findet sich in den Qt3.3-Quellen (z.B.: QReadWriteLock).
Die einzige Methode die von den Anfragethreads ausgeführt wird ist Scheduler::enqueue(int thread, int length)
(siehe Teil 1). Bemerkenswert ist hier nur der letzte Aufruf, da wir ja immer eine möglichst volle Queue gewährleisten wollen wartet der Scheduler darauf dass er Jobs erst auswählt, wenn die Queue voll ist oder eine volle Queue nicht mehr erreicht werden kann. Hierzu ist die Condition-Variable cv_queuefull
nötig auf die beim Überprüfen der Queue gewartet wird.
/* enqueues a job */ void Scheduler::enqueue(int thread, int length){ //wait if queue is full or thread has already job in queue while(isQueueFull() || threadHasRequestInQueue(thread)){ cv->wait(); } //new pair for enqueueing std::pair<int,int> tmp(thread,length); //lock and insert in queue rw->lockWrite(); jobs.insert(tmp); rw->unlockWrite(); //we did it! std::cout << "requester " << thread << " joblength " << length << std::endl; //signal scheduler thread that thread is now maybe full cv_queuefull->signal(); }
Die beiden hier noch verwendeten Methoden bool Scheduler::isQueueFull()
und bool Scheduler::threadHasRequestInQueue(int thread)
sind aus Implementierungssicht trivial.
Die wichtigste Methode in der die eigentlichen Jobs “abgearbeitet” werden ist Scheduler::processNextJob()
, hier muss zunächst überprüft werden ob die Queue bereit ist. Dies geschieht in Scheduler::QueueReady()
und ist weiter unten erläutert. Für die Auswahl des nächsten Jobs mit der kürzesten Joblänge reicht es das erste Element des Sets zu entnehmen, dies ist nach der von uns definierten Ordnung im Set automatisch der Job mit der kürzesten Laufzeit. Nach dem Abarbeiten wird der Job aus der Queue gelöscht. Falls gerade ein Thread darauf wartet einen Job zu enqueuen dem es vorher nicht möglich war zu enqueuen da er schon einen Job in der Queue hatte wird dieser zum Schluss noch aufgeweckt.
/* processes the next job */ void Scheduler::processNextJob(){ //is queue ready? QueueReady(); //get job rw->lockRead(); std::pair<int,int> tmp = *( jobs.begin()); rw->unlockRead(); //process .......... :-) std::cout << "service requester " << tmp.first << " joblength " << tmp.second << std::endl; //delete from queue rw->lockWrite(); jobs.erase(jobs.find(tmp)); rw->unlockWrite(); //wake up all guys that wait on enqueuing cv->signalAll(); }
Für die Überprüfung ob die Queue bereit ist, sind zwei Bedingungen nötig: Ersten muss die Queue voll sein, siehe Annahmen. In dem Fall dass schon Threads beendet wurden kann es passieren dass die Queue nicht voll werden kann. In diesem Fall soll der Scheduler trotz nicht voller Queue seine Arbeit aufnehmen. Hier wäre noch Optimierungspotential gegeben, da nicht garantiert wird dass die Queue in momentanen Zustand der Threads immer so voll wie möglich ist.
/* checks if queue is ready */ void Scheduler::QueueReady(){ while(!isQueueFull() && (jobs.size() < nrOfThreads) ){ //wait if our queue is not full or some threads are already dead, thus we can proceed if #threads<Queuesize cv_queuefull->wait(); } }]]>
Als Eingabe erhält das Programm die Warteschlangenlänge und eine Liste von Eingabedateien. Jede Eingabedatei entspricht einem anfragenden Thread und beinhaltet eine Liste von Nummern die die Joblänge symbolisieren sollen.
Der erste Teil besteht aus der Implementierung des Rahmenprogramm der sich der später zu implementierenden Schedulerklasse bedient.
Der erste Teil der main()
Methode besteht aus den Auslese- und Initialisierungaufrufen. Insbesondere wird der Scheduler mit der Anzahl der Threads und der maximalen Warteschlangenlänge initialisiert.
//get queue-size int maxQueue=atoi(argv[1]); if(maxQueue<=0) { std::cout << "min. queue size is 1" << std::endl; return 0; } //vector so store filenames std::vector<std::string> files; //count #threads int numOfThreads=0; //save filenames for (int i = 2; i < argc; i++) { files.push_back(std::string(argv[i])); //every file is one thread numOfThreads++; } //create new scheduler scheduler= new Scheduler(maxQueue, numOfThreads); //just temp storage for reading file std::string line; //iterate through parameters for(int i = 0; i< argc-2; i++){ //new parameter => new thread jobs.push_back(std::vector<int>()); //open file std::ifstream myfile(files.at(i).c_str()); if (myfile.is_open()) { //push all job lengthes in vector while (! myfile.eof() ) { std::getline (myfile,line); (jobs.at(i)).push_back(atoi(line.c_str())); } //close file myfile.close(); } }
Im nächsten Schritt müssen alle anfragenden Threads gestartet werden, in diesem Fall kommen einfache pthreads zur Anwendung. Zu beachten ist dass &run_thread
der Funktionspointer auf die sogenannte run-Methode darstellt.
//array of threads pthread_t th[numOfThreads]; //array of int* to transfer thread nr. to thread int* numbers[numOfThreads]; for (int i = 0; i < numOfThreads; i++) { //save thread nr. numbers[i]=new int(i); //create thread and transfer parameter pthread_create (&th[i], NULL, &run_thread, numbers[i]); }
Im main()
Thread läuft auch der Scheduler der solange Anfragen abarbeitet bis die Warteschlange leer ist. Danach muss um ein sauberes Programmende zu gewährleisten noch auf alle Threads gewartet werden.
//process jobs until Queue is empty while(!scheduler->isQueueEmpty()){ scheduler->processNextJob(); } //wait until all threads are finished for (int i = 0; i < numOfThreads; i++){ pthread_join (th[i], NULL); } return 0;
Die von den Anfragethreads ausgeführte Methode run_thread(void *number)
hat ein paar Stolperfallen; hier ist zu beachten dass an pthreads
übergebene Funktionen nur void*
Pointer als Parameter haben dürfen, deshalb muss auch innerhalb der Methode die Threadnummer wieder auf int*
gecastet werden. Nach dem einreihen aller Anfragen in die Warteschlange des Schedulers kann sich der Thread beenden.
static void* run_thread (void *number) { for(unsigned int i=0; i< (jobs.at(*( (int*) number))).size()-1;i++){ //enqueue all jobs in waiting queue scheduler->enqueue(*( (int*) number), (jobs.at(*( (int*) number))).at(i) ); } //all jobs are enqueued => thread has done his work scheduler->decreaseThreads(); pthread_exit (NULL); }
In nächsten Teil wird detailliert auf die Funktionsweise und den Aufbau eines des hier benutzten Schedulers eingegangen.
]]>