Ein unbemanntes Taxi fährt gelbe Gummienten durch die Stadt! Problemprüfmodul für die Gym-Duckietown-Plattform

Bis 2040 werden die meisten Großstädte der Welt Autos ohne Fahrer fahren, sagen Analysten . Aber um in 20 Jahren auf der Straße zu entspannen, müssen wir gute Arbeit an autonomen Fahralgorithmen leisten. Zu diesem Zweck hat MIT die Duckietown- Plattform entwickelt , mit der Sie dies mit minimalen Kosten tun können. In Duckietown transportieren kostengünstige mobile Roboter gelbe Gummienten in einem verkleinerten Modell der Stadt. Auf der Grundlage dieser Plattform werden AI Driving Olympics abgehalten und an Universitäten Kurse zum Einsatz künstlicher Intelligenz bei der Verwaltung unbemannter Fahrzeuge gestartet.



In diesem Artikel werde ich über mein Kursprojekt sprechen, an dem ich zusammen mit dem Labor für mobile Roboteralgorithmen gearbeitet habe JetBrains Research : Über den Issue Checker, den ich für den Emulator Gym-Duckietown geschrieben habe . Wir werden über das Testsystem und die Integration dieses Systems in Online-Bildungsplattformen sprechen, die die External Grader-Technologie verwenden - zum Beispiel mit der Stepik.org- Plattform .









Über den Autor



Mein Name ist Daniil Plushenko und ich bin ein Student im ersten (zweiten) Jahr des Masterstudiengangs " Programmierung und Datenanalyse " an der HSE in St. Petersburg. 2019 schloss ich meinen Bachelor in Angewandter Mathematik und Informatik an derselben Universität ab.



Duckietown Plattform



Duckietown ist ein autonomes Fahrzeugforschungsprojekt . Die Organisatoren des Projekts haben eine Plattform geschaffen, mit deren Hilfe ein neuer Lernansatz im Bereich der künstlichen Intelligenz und der Robotik umgesetzt werden kann. Alles begann als Kurs am MIT im Jahr 2016, verbreitete sich jedoch allmählich auf der ganzen Welt und auf verschiedenen Bildungsebenen: von der High School bis zum Master-Programm.



Die Duckietown-Plattform besteht aus zwei Teilen. Erstens handelt es sich um ein verkleinertes Modell einer städtischen Verkehrsumgebung mit Straßen, Gebäuden, Verkehrszeichen und Hindernissen. Zweitens ist es Transport. Kleine mobile Roboter (Duckiebots), die einen Raspberry Pi betreiben, erhalten über eine Kamera Informationen über die Welt um sie herum und transportieren die Einwohner der Stadt - gelbe Gummienten - entlang der Straßen.







Ich habe mit Duckietown Emulator gearbeitet. Es wird genanntGym-Duckietown , und es ist ein Open-Source-Projekt, das in Python geschrieben wurde. Der Emulator platziert Ihren Bot in der Stadt, ändert seine Position abhängig vom verwendeten Algorithmus (oder von der Taste, die Sie gedrückt haben), zeichnet das Bild neu und schreibt die aktuelle Position des Bots in die Protokolle.  



Wenn Sie es versuchen möchten , empfehle ich, das Repository selbst zu klonen und manual_control.py auszuführen : Auf diese Weise kann der Bot mithilfe der Pfeile auf der Tastatur gesteuert werden.



Screenshot vom Emulator



Der Emulator kann als Umgebung zum Ausführen von Aufgaben verwendet werden. Stellen wir das folgende Problem ein: Auf einer bestimmten Karte, die aus einer Spur besteht, müssen Sie einen Meter in einer geraden Linie fahren.







Um das Problem zu lösen, können Sie den folgenden Algorithmus verwenden:



for _ in range(25):
    env.step([1, 0])
    env.render()


Diese Variable envspeichert den Status der Umgebung.

Die Schrittmethode verwendet als Eingabe action: eine Liste von zwei Elementen, die die Aktion des Bots beschreiben. Das erste Element legt die Geschwindigkeit fest, das zweite den Drehwinkel. Die Methode renderzeichnet das Bild unter Berücksichtigung der neuen Position des Bots neu. Die Anzahl der Schritte und die Geschwindigkeit wurden empirisch ausgewählt: Dies sind die Werte, die der Bot benötigt, um genau einen Meter in einer geraden Linie zu fahren.



Wenn Sie dieses Snippet in manual_control.py verwenden möchten, fügen Sie es hier ein . Der Code ist bis zu diesem Zeitpunkt damit beschäftigt, die Umgebung zu laden. Der Einfachheit halber können Sie es wiederverwenden und dann die obige Lösung zum Problem hinzufügen.



Testsystem



Ich möchte in der Lage sein, solche Aufgaben automatisch zu überprüfen: Nehmen Sie die Implementierung des Bot-Steuerungsalgorithmus, simulieren Sie die Reise und berichten Sie, ob der vorgeschlagene Algorithmus die Aufgabe korrekt ausführt. Ein solches Testsystem würde es ermöglichen, die Umgebung während der Vorbereitung auf Wettbewerbe zur autonomen Verkehrskontrolle sowie zu Bildungszwecken zu nutzen: den Schülern eine Reihe von Problemen zu stellen und ihre Lösungen automatisch zu überprüfen. Ich war an der Entwicklung eines Kursprojekts beteiligt und werde Ihnen im Folgenden erzählen, was ich bekommen habe.



Die Reihenfolge der Schritte beim Überprüfen der Lösung sieht folgendermaßen aus:







Sie können dem Testsystem selbst Aufgaben hinzufügen oder auf die von mir durchgeführten Aufgaben verweisen. Jede Aufgabe verfügt über einen Bedingungsgenerator - eine Klasse, die einen Umgebungsstatus generiert. Es enthält den Namen der Karte und die Startposition des Bots in der Startzelle. Die Zielkoordinaten werden ebenfalls festgelegt: In diesem Fall ist es ein Punkt einen Meter von der Startposition entfernt.



class Ride1MTaskGenerator(TaskGenerator):
    def __init__(self, args):
        super().__init__(args)

    def generate_task(self):
        env_loader = CVTaskEnv if self.args.is_cv_task else TrackingDuckietownEnv
        env = env_loader(
            map_name="straight_road",
            position_on_initial_road_tile=PositionOnInitialRoadTile(
                x_coefficient=0.5,
                z_coefficient=0.5,
                angle=0,
            ),
        )
        self.generated_task['target_coordinates'] = [[env.road_tile_size * 0.5 + 1, 0, env.road_tile_size * 0.5]]
        self.generated_task['env'] = env
        env.render()
        return self.generated_task


Hier TrackingDuckietownEnvund CVTaskEnvsind Wrapper - Klassen , die zum Speichern von Informationen über die Reise für die weitere Analyse verwendet werden. 



class TrackingDuckietownEnv:
    def __init__(self, **kwargs):
        self.__wrapped = DuckietownEnv(**kwargs)
    def step(self, action):
        obs, reward, done, misc = self.__wrapped.step(action)
        message = misc['Simulator']['msg']
        if 'invalid pose' in message.lower():
            raise InvalidPoseException(message)
        for t in self.trackers:
            t.track(self)
        return obs, reward, done, misc


Tracker sammeln Informationen über den aktuellen Status, z. B. die Position des Bots.



CVTaskEnvEs wird verwendet, wenn eine Lösung nur mit Informationen von der Kamera ("Computer Vision") und nicht mit den Funktionen des Emulators erforderlich ist: Wenn Sie beispielsweise wissen möchten, wie weit der Bot von der Mitte des Streifens entfernt ist oder wo sich das nächste sichtbare Objekt befindet. CVTaskEnvDas Aufrufen von Emulatorfunktionen kann die Aufgabe vereinfachen, und die Klasse schränkt den Aufruf der Emulatormethoden ein. Es wird verwendet, wenn das Flag angezeigt wird is_cv_task



class CVTaskEnv:
    def __init__(self, **kwargs):
        self.__wrapped = TrackingDuckietownEnv(**kwargs)

    def __getattr__(self, item):
        if item in self.__wrapped.overriden_methods:
            return self.__wrapped.__getattribute__(item)
        ALLOWED_FOR_CV_TASKS = [
            'render', '_CVTaskEnv__wrapped', '_TrackingDuckietownEnv__wrapped',
            'road_tile_size', 'trip_statistics'
        ]
        if item in ALLOWED_FOR_CV_TASKS:
            return self.__wrapped.__getattr__(item)
        else:
            raise AttributeError(item + " call is not allowed in CV tasks")


Nachdem die Ausführung der Entscheidung abgeschlossen ist - sofern sie nicht durch eine Zeitüberschreitung unterbrochen wurde -, werden die Reiseinformationen durch eine Reihe von Prüfern geleitet. Wenn alle Prüfer erfolgreich gearbeitet haben, gilt das Problem als korrekt gelöst. Andernfalls wird ein erklärendes Urteil angezeigt - zum Beispiel ein Bot stürzte ab, fuhr von der Straße ab usw.



Hier ist einer der Standardprüfer. Er überprüft, ob der Bot am Ende der Reise zum Startpunkt zurückgekehrt ist. Dies kann nützlich sein, wenn Sie beispielsweise in einer Aufgabe einen bestimmten Punkt erreichen und dann zurückkehren müssen.



class SameInitialAndFinalCoordinatesChecker(Checker):
    def __init__(self, maximum_deviation=0.1, **kwargs):
        super().__init__(**kwargs)
        self.maximum_deviation = maximum_deviation

    def check(self, generated_task, trackers, **kwargs):
        trip_statistics = next(x for x in trackers if isinstance(x, TripStatistics))
        trip_data = trip_statistics.trip_data
        if len(trip_data) == 0:
            return True
        initial_coordinates = trip_data[0].position.coordinates
        final_coordinates = trip_data[-1].position.coordinates
        return np.linalg.norm(initial_coordinates - final_coordinates) < self.maximum_deviation


Ein solches Testsystem kann im manuellen Modus verwendet werden, dh manuell einen Test starten und dann das Urteil visuell untersuchen. Wenn wir beispielsweise einen Online-Kurs zum autonomen Verkehr auf Stepik.org starten möchten , müssen wir uns in die Plattform integrieren. Dies wird im nächsten Teil des Artikels besprochen.



Integration mit Online-Plattformen



Für Testaufgaben wird häufig die externe Grader-Technologie verwendet, die von der edX- Plattform entwickelt wurde .



Bei Verwendung des externen Graders überprüft die Lernplattform keine Aufgaben selbst, sondern generiert eine Warteschlange mit Paketen, die an ein anderes Gerät gesendet werden. Die Warteschlangenverbindungsfunktionalität ist im xqueue-watcher- Projekt implementiert . Der Xqueue-Watcher holt die Pakete ab und testet sie dann vom Validator (der normalerweise mehr nicht triviale Aktionen ausführt als Text- / Zahlenvergleiche). Danach wird das Verifizierungsurteil an die Seite der Bildungsplattform zurückgeschickt.



Betrachten wir den Moment der Verbindung mit der Warteschlange genauer. Nachdem die Bildungsplattform die Verbindungsdaten bereitgestellt hat, müssen diese hinzugefügt werdenImplementieren Sie in den Konfigurationsdateien und in der Grade- Methode den Überprüfungsstart direkt. Nähere Anweisungen finden Sie hier und hier .



Der Xqueue-Watcher ruft den Endpunkt get_submission auf , der das Paket nach Möglichkeit aus der Warteschlange abruft. Danach geht sie zum Testen. Der xqueue-watcher ruft dann put_result auf, um das Urteil zurückzugeben.



Sie können xqueue-watcher folgendermaßen starten:



make requirements && python -m xqueue_watcher -d conf.d/


Angenommen, wir möchten die External Grader-Technologie verwenden, den Kurs jedoch nicht auf einer Online-Plattform ausführen. Xqueue-Watcher wird unter der Annahme implementiert, dass es einen Dateispeicher gibt, in den Dateien mit Lösungen hochgeladen werden (Plattformen haben einen solchen Speicher). Wir können Xqueue so ändern, dass kein Dateispeicher mehr benötigt wird, und solche Systeme können im Allgemeinen sogar auf unserem Laptop ausgeführt werden.



Zuerst müssen Sie lernen, wie Sie die Paketwarteschlange selbst pflegen. Die Warteschlangenfunktionalität wird vom xqueue- Projekt bereitgestellt .





Bild aus der Dokumentation .



Sie können es so ausführen:



apt-get install libaio1 libaio-dev
apt-get install libmysqlclient-dev
pip3 install -r requirements.txt
python3 manage.py migrate
python3 manage.py runserver $xqueue_address


Möglicherweise müssen Sie eine Datei erstellen ~ / edx / edx.log



Standardmäßig gibt xqueue xqueue-watcher nicht den Inhalt der Pakete an, dh Dateien mit der Lösung des Problems, sondern Links zu diesen Dateien im Dateispeicher. Um nicht vom Dateispeicher abhängig zu sein, können Sie die Dateien selbst übertragen und auf demselben Computer speichern, auf dem xqueue-watcher ausgeführt wird. 



Um dies zu erreichen, muss der Quellcode folgendermaßen geändert werden:



Die Implementierung der _upload- Methode in lms_interface.py wird durch diese ersetzt:



def _upload(file_to_upload, path, name):
    '''
    Upload file using the provided keyname.
    Returns:
        URL to access uploaded file
    '''
    full_path = os.path.join(path, name)
    return default_storage.save(full_path, file_to_upload)


Wenn kein Dateispeicher verbunden war, speichert diese Methode die Datei mit der Lösung im Pfad $ queue_name / $ file_to_upload_hash.



Schreiben Sie in der Implementierung von get_sumbission in der Datei ext_interface.py anstelle dieser Zeile :



xqueue_files = json.loads(submission.urls)
for xqueue_file in xqueue_files.keys():
    with open(xqueue_files[xqueue_file], 'r') as f:
        xqueue_files[xqueue_file] = f.readlines()


Übertragen wir nicht Links (Pfade) zu Dateien, sondern deren Inhalt.



Jede Lösung wird in einem "einmaligen" Docker-Container mit begrenzten Ressourcen ausgeführt, dh es wird ein separater Container für die Ausführung jeder Lösung erstellt, der nach Abschluss des Tests entfernt wird. Um solche Container zu erstellen und Befehle in ihnen auszuführen, wird portainer-api verwendet (tatsächlich als Wrapper über die Docker-API).



Ergebnis



In diesem Artikel habe ich darüber gesprochen, wie das Testsystem und die Aufgaben für den autonomen Transport erstellt wurden und wie dieses System in Online-Bildungsplattformen integriert wird, die die External Grader-Technologie verwenden. Ich hoffe, dass bald ein Kurs mit diesem System gestartet wird und der Teil über die Integration in Online-Plattformen für diejenigen nützlich sein wird, die ihren eigenen Offline- oder Online-Kurs erstellen möchten.



All Articles