Q-Learning Teil 2

In https://blog.tuncer.net/q-learning-teil-1 ging es bereits um die Q-Matrix. In einem relativ einfachen Beispiel mit wenigen States wurde gezeigt, wie eine Policy gelernt werden kann, indem eine Q-Matrix ständig an Erfahrungen angepasst wird.

Diesmal wird ein Agent darauf trainiert Tic Tac Toe zu spielen. Technisch ist der Unterschied hauptsächlich, die Menge der Zustände die das System einnehmen kann und das Regelwerk, dass die Mechanik des Spiels erzeugt.

Ich bespreche hier die Implemtierung von Shangtong Zhang, Jan Hakenberg, Tian Jun und Kenta Shimada aus den Jahren 2016 und 2018 jedoch eingedeutscht, mit vielen Kommentaren. Das Update im Original scheint mir fehlerhaft zu sein, zumindest nicht optimal. Wahrscheinlich sind deshalb im Original ca. 100000 Epochen notwendig um zu einem halbwegs guten Resultat zu gelangen. Nach einigen Änderungen insbesondere beim Update der Q-Tabelle reichen jetzt schon 20000 Epochen. Das ist meines Erachtens immer noch zu viel, doch es ist gut genug um zu sehen, dass der Algorithmus funktioniert.

Imports und Konstanten

Die erste Aktion ist der Import der Standardmodule.

Jeder kennt Tic-Tak-Toe als 3×3-Brett. Doch grundsätzlich ist es in jeder beliebigen Größe denkbar, also NxM. Diese Möglichkeit wird durch die Definitionen ZEILEN und SPALTEN angedeutet. Allerdings ist es aufgrund der Art wie das Spiel hier implementiert bei größeren Brettern nicht praktikabel, weil allein schon bei 4×4 über 16 Millionen Zustände zu bewerten sind.

import numpy as np
import pickle

ZEILEN = 3
SPALTEN = 3
FELDER = ZEILEN * SPALTEN

Status

Als nächstes wird eine Klasse definiert, die den Zustand des Spiels definiert. In der Reinforcement-Terminologie wäre das die Environment.

  • Es gibt zwei Spieler.
    • Der Spieler, der den ersten Spielzug machen wird hat das Symbol 1
    • Der andere Spieler hat das Symbol -1
  • In einem Zustand gibt es ein Status-Flag für den Gewinner:
    • 1 für Spieler 1
    • -1 für Spieler 2
    • 0 für Unentschieden
    • NULL für offen
  • Für jeden Zustand des Spielfeldes kann ein eindeutiger Hashwert berechnet werden. Somit können alle möglichen Stellungen in einer Hash-Map gespeichert werden – und das ist schon eines der Probleme bei 4×4- oder gar 5×5-Spielen.
class Status:
    def __init__(self):
        self.spielfeld = np.zeros((ZEILEN, SPALTEN))
        self.gewinner = None
        self.hash_wert = None
    
    # Die Darstellung Status-Klasse ist das Spielfeld ohne Zeilenumbruch
    def __repr__(self):
        return "{}".format(self.spielfeld).replace('\n', ', ')
    
    # Berechnen eines eindeutigen Hash-Wertes aus dem Spielfeld
    def hash(self):
        if self.hash_wert is None:
            self.hash_wert = 0
            for i in np.nditer(self.spielfeld):
                # i ist 0 oder -1 oder 1
                self.hash_wert = self.hash_wert * 3 + i +1
        
        return int(self.hash_wert)

    # Festgestellen ob bereits ein Gewinner feststeht
    def is_end(self):
        # Das Spiel ist beendet, wenn ein Gewinner feststeht
        if self.gewinner is not None:
            return True

        summen = []

        # Bilde die Summen der Spalten, der Zeilen und der Diagonalen
        # Das Ende ist erreicht, wenn eine der Werte 3 oder -3 ist

        # Zeilen
        for i in range(ZEILEN):
            summen.append(np.sum(self.spielfeld[i, :]))
        # Spalten
        for i in range(SPALTEN):
            summen.append(np.sum(self.spielfeld[:, i]))
        # Dialgonale
        spur1 = 0
        spur2 = 0
        for i in range(ZEILEN):
            spur1 += self.spielfeld[i, i]
            spur2 += self.spielfeld[i, ZEILEN - 1 - i]
        summen.append(spur1)
        summen.append(spur2)

        # Prüfen ob bereits ein Gewinner feststeht
        for summand in summen:
            if summand == 3:
                self.gewinner = 1
                return True
            if summand == -3:
                self.gewinner = -1
                return True

        # Prüfe ob es bereits unentschieden steht.
        if FELDER == np.sum(np.abs(self.spielfeld)):
            self.gewinner = 0
            return True

        # Das Spiel kann weiter gehen
        return False

    # Hilfsfunktion, die aus einem Status und einem Spielzug
    # einen neuen Status generiert
    def next_state(self, i, j, symbol):
        new_state = Status()
        new_state.spielfeld = np.copy(self.spielfeld)  # das alte Brett wird nicht direkt modifiziert,
        new_state.spielfeld[i, j] = symbol  # weil es im Dictionary aller Stellungen vorgehalten wird
        return new_state

    # Ausgabe des Spielbrettes
    def print_state(self):
        for i in range(ZEILEN):
            print('-------------')
            zeile = '| '
            for j in range(SPALTEN):
                if self.spielfeld[i, j] == 1:
                    token = 'o'
                elif self.spielfeld[i, j] == -1:
                    token = 'x'
                else:
                    token = ' '
                zeile += token + ' | '
            print(zeile)
        print('-------------')

Spielstände

Mit einer sehr trickreichen, rekursiven Funktion wird über alle möglichen Spielfelder iteriert und zusammen mit Gewinner und Hashwert in einem Python-Dictionary abgelegt. Trickreich ist, dass nur gültige Spielfelder betrachtet werden (abwechselnd Spieler 1 und Spieler 2) und keine Teilstellungen weiter betrachtet werden, wenn ein Gewinner bereits feststeht. Die Funktion ist wie bereits gesagt trickreich, aber sie ist auch der Hauptgrund, warum größere Spielfelder mit dieser Methode nicht möglich sind. Es werden massenhaft Daten kopiert, Speicher allokiert und Rekursionen ausgeführt. Dadurch steigt die Laufzeit auf nicht akzeptable werte. Einziger Ausweg ist, auf ein anderes Reinforcement-Verfahren zu umzusteigen.

def get_all_states_impl(aktueller_status, current_symbol, all_states):
    # fuer jede Zeile und Spalte
    for i in range(ZEILEN):
        for j in range(SPALTEN):

            # Pruefe ob das Feld noch frei ist
            if aktueller_status.spielfeld[i][j] == 0:

                # falls es noch frei ist, besetze es fuer den aktuellen Spieler
                new_state = aktueller_status.next_state(i, j, current_symbol)

                # und berechne den Hashwert der Stellung
                new_hash = new_state.hash()

                # und der Hashwert noch nicht im Dictionary enthalten ist
                if new_hash not in all_states:

                    # ermittle, ob mit diesem Spielzug das Spiel beendet wurde
                    is_end = new_state.is_end()

                    # und speichere diesen Status in das Dictionary
                    all_states[new_hash] = (new_state, is_end)

                    # Falls diese Stellung kein End-Stellung ist, ist jetzt der nächste Spieler dran
                    if not is_end:
                        get_all_states_impl(new_state, -current_symbol, all_states)

Der Aufruf der rekursiven Dictionary-Funktion wird in eine Hilfsfunktion gepackt, deren Rückgabewert genau das Dictionary ist.

# Hilfsfunktion um die rekursive Dictionary-Funktion zu steuern
def get_all_states():
    # Es beginnt mit einem leeren Verzeichnis
    all_states = dict()
    # und mit einem leeren Status
    aktueller_status = Status()
    # und der Spieler mit dem Symbol "1" macht den ersten Zug
    current_symbol = 1

    # Erstellung des ersten Eintrags
    all_states[aktueller_status.hash()] = (aktueller_status, aktueller_status.is_end())

    # Start der Rekursion
    get_all_states_impl(aktueller_status, current_symbol, all_states)

    return all_states


# Globales Verzeichnis aller möglichen Stellungen
all_states = get_all_states()

Agenten

Neben der Q-Matrix ist im Reinforcement-Learning der lernende Agent das zentrale Konzept. Für den Agenten, der die Lern-Logik enthält, definieren wir eine Klasse KI. Die Attribute der Klasse sind:

  • Dem Namen und dem Symbol des Agenten
  • Die Q-Matrix die hier als Dictionary geführt wird
  • Einigen Parametern wie alpha, epsilon
  • Die Folge der States einer Episode

Die Methoden steuern das Verhalten des Agenten:

  • reset() – Zurücksetzen der Folge der gemerkten States einer Episode
  • set_state() – Anfügen eines neuen Status an die Liste der States einer Episode
  • initialize_q() – Es werden alle States des globalen State-Dictionaries untersucht. Dabei wird betrachtet ob die Stellung Patt ist, ob das Spiel für den Agenten selbst oder dem Geger gewonnen ist bzw ob es noch offen ist. Je nach dem, wird für jede Stellung ein initialer Q-Wert in die Q-Tabelle (besser gesagt Q-Liste) geschrieben:
    • 1 – Gewonnen
    • 0.5 – Unentschieden
    • 0 – Verloren
    • 0.25 Gewinner steht noch nicht fest
  • update_q() – Das Update der Q-Matrix wie in der Literatur beschrieben. (Google: Rich Sutton Reinforcement). In dieser Implementierung wird die States-Liste umgedreht und elementweise der klassische Update Mechanismus angewendet.
  • act() – Wählt in einem gegebenen Status aufgrund der vorhandenen Q-Matrix den besten nächsten Zug aus (epsilon-Greedy) und speichert die Wahl in der eigenen States-Liste.
  • save_policy() – Speichern der aktuellen Q-Matrix/Q-Liste
  • load_policy – Initialisieren der Q-Matrix/Q-Liste aus vorher gespeicherter Datei
class KI:
    def __init__(self, name, symbol, alpha=0.1, epsilon=0.1):
        self.q = dict()
        self.alpha = alpha
        self.epsilon = epsilon
        self.states = []
        self.symbol = symbol
        self.name = name

    def reset(self):
        self.states = []

    def set_state(self, state):
        self.states.append(state)

    def initialize_q(self):

        for hash_wert in all_states:
            state, is_end = all_states[hash_wert]
            if is_end:
                # gewonnen
                if state.gewinner == self.symbol:
                    self.q[hash_wert] = 1.0
                # unentschieden
                elif state.gewinner == 0:
                    self.q[hash_wert] = .5
                # verloren
                else:
                    self.q[hash_wert] = .0
            else:
                self.q[hash_wert] = .25

    def update_q(self):
        states = [state.hash() for state in self.states]

        for i in reversed(range(len(states) - 1)):
            state = states[i]
            td_error = 0.1 * (self.q[states[i + 1]] - self.q[state])
            self.q[state] += self.alpha * td_error

    # Waehle die beste Aktion im aktuellen Status (epsilon-greedy)
    def act(self):
        state = self.states[-1]
        next_states = []
        next_positions = []
        for i in range(ZEILEN):
            for j in range(SPALTEN):
                if state.spielfeld[i, j] == 0:
                    next_positions.append([i, j])
                    next_states.append(state.next_state(i, j, self.symbol).hash())

        if np.random.rand() < self.epsilon:  # epsilon ist nur gesetzt wenn trainiert wird
            action = next_positions[np.random.randint(len(next_positions))]
            action.append(self.symbol)
            return action

        values = []
        for hash_wert, pos in zip(next_states, next_positions):
            values.append((self.q[hash_wert], pos))

        # Falls es mehrere Aktionen mit gleichen Wert gibt, dann entscheidet der Zufall
        np.random.shuffle(values)
        values.sort(key=lambda x: x[0], reverse=True)
        action = values[0][1]
        action.append(self.symbol)
        return action

    def save_policy(self):
        with open('policy_%s.bin' % ('first' if self.symbol == 1 else 'second'), 'wb') as f:
            pickle.dump(self.q, f)

    def load_policy(self):
        with open('policy_%s.bin' % ('first' if self.symbol == 1 else 'second'), 'rb') as f:
            self.q = pickle.load(f)

Ein menschlichen Spieler wird wie ein Agent behandelt. Das heißt die Mensch-Instanz hat die gleichen Methoden (Interface) wie der KI-Agent aber viele Methoden sind nicht implementiert. So haben wir keine Q-Matrix/Q-Liste, keine update-Funktion, kein Speichern und Laden u.s.w. Auf der anderen Seite haben wir eine Liste von zulässigen Zeichen, mit dem der Mensch seinen Agenten steuern kann. Und die act()-Methode besteht nur darin, vom Menschen die nächste Aktion abzufragen.

class Mensch:
    # Eingaben erfolgen im Zahlenfeld
    # | 7 | 8 | 9 |
    # | 4 | 5 | 6 |
    # | 1 | 2 | 3 |
    def __init__(self, name, symbol, **kwargs):
        self.keys = ['7', '8', '9', '4', '5', '6', '1', '2', '3']
        self.state = None
        self.symbol = symbol
        self.name = name

    def reset(self):
        pass

    def initialize_q(self):
        pass

    def set_state(self, state):
        self.state = state

    def act(self):
        self.state.print_state()
        key = input("Dein Spielzug: ")
        if key not in self.keys:
            print("Spiel wurde beendet")
            exit()
        feld = self.keys.index(key)
        i = feld // SPALTEN
        j = feld % SPALTEN
        return i, j, self.symbol

Spielablauf

Auch das Spiel selbst ist in einer eigenen Klasse definiert. Ein Spiel besteht aus Instanzen von zwei Spielern und einer Instanz des Status. Ein Spiel wird durch wenige Methoden gesteuert:

  • reset() – Um die Spieler (die Agenten) zurückzusetzen. Konkret wird eine KI dabei ihre interne Status-Liste löschen.
  • alternate() – Wechsel des Agenten, der am Zug ist.
  • spiele() – Durchführen einer Episode, das heißt:
    • Begin eines neuen Spiels (durch Reset und neuem Status)
    • So lange wechsel zwischen den Spielern und Spielzügen, bis das Spiel beendet ist.
    • Nach jedem Zug werden die neuen Spielstände an die Agenten weiterdelegiert, damit diese den Status in die lokalen Listen speichern können.
class Spiel:
    # Der Spieler der den ersten Spielzug durchfuehrt hat das Symbol 1
    # Der andere hat das Symbol -1
    def __init__(self, spieler1, spieler2):
        self.spieler1 = spieler1
        self.spieler2 = spieler2
        self.spieler1.initialize_q()
        self.spieler2.initialize_q()
        self.aktueller_status = None

    def reset(self):
        self.spieler1.reset()
        self.spieler2.reset()

    def alternate(self):
        while True:
            yield self.spieler1
            yield self.spieler2

    def spiele(self):
        alternator = self.alternate()
        self.reset()
        aktueller_status = Status()
        self.spieler1.set_state(aktueller_status)
        self.spieler2.set_state(aktueller_status)
        while True:
            player = next(alternator)
            i, j, symbol = player.act()
            next_state_hash = aktueller_status.next_state(i, j, symbol).hash()
            aktueller_status, is_end = all_states[next_state_hash]
            self.spieler1.set_state(aktueller_status)
            self.spieler2.set_state(aktueller_status)

            if is_end:
                return aktueller_status.gewinner

Training

Die Hilfsfunktion train() steuert den Lernprozess. Der Trainings-Prozess legt zwei Agenten vom Typ KI an und initialisiert mit denen eine neue Spiel-Instanz. Der Parameter epochs steuert die Anzahl der Spiele. Nach jedem Spiel, wird die update-Methode des Agenten aufgerufen, damit er die Werte seiner Q-Tabelle neu justieren kann. Nach den Updates der Q-Tabellen wird das Spiel zurückgesetzt. Und nach dem reset() der beiden Agenten, kann ein neues Spiel beginnen.

Nachdem beide KIs alle Episoden absolviert haben, speichern diese ihre Q-Tabellen in verschiedene Ausgabedateien.

def train(epochs, print_every_n=1000):
    print ("Trainiere in {} Probespielen".format(epochs));
    spieler1 = KI(name='ki1', symbol=1, step_size=0.99, epsilon=0.5)
    spieler2 = KI(name='ki2', symbol=-1, step_size=0.8, epsilon=0.2)
    spiel = Spiel(spieler1, spieler2)
    gewonnen1 = 0.0
    gewonnen2 = 0.0
    for i in range(1, epochs + 1):
        gewinner = spiel.spiele()
        if gewinner == 1:
            gewonnen1 += 1
        if gewinner == -1:
            gewonnen2 += 1
        if i % print_every_n == 0:
            print('Epoche {:6}, Spieler 1: {:6}, Spieler 2: {:6}, Unentschieden: {:6}'.format(
                i, gewonnen1, gewonnen2, i - gewonnen1 - gewonnen2))

        spieler1.update_q()
        spieler2.update_q()
        spiel.reset()
    spieler1.save_policy()
    spieler2.save_policy()

Wir können die KI-Agenten mit unterschiedlichen Parametern initialisieren um zu Prüfen, ob bestimmte Konstellation besser sind als andere. Diesen Vergleich erledigt die Funktion compete().

Wir erzeugen zwei KI-Agenten und laden die beiden gespeicherten Q-Tabellen. Dann Spielen die Agenten gegeneinander ohne dabei weiter zu lernen (deshalb epsilon=0).

Competition

def compete(turns):
    print('Wettkampf-Modus - KI gegen KI')
    spieler1 = KI(name='ki1', symbol=1, epsilon=0)
    spieler2 = KI(name='ki2', symbol=-1, epsilon=0)
    spiel = Spiel(spieler1, spieler2)
    spieler1.load_policy()
    spieler2.load_policy()
    gewonnen1 = 0.0
    gewonnen2 = 0.0
    for _ in range(turns):
        gewinner = spiel.spiele()
        if gewinner == 1:
            gewonnen1 += 1
        if gewinner == -1:
            gewonnen2 += 1
        spiel.reset()
    print('Nach {} Runden: Spieler 1: {}, Spieler 2: {}, Unentschieden: {}'
          .format(turns, gewonnen1, gewonnen2, turns - gewonnen1 - gewonnen2))

Schließlich gibt es noch für den Mensch-Maschine-Modus die Hilfsfunktion spiele(). Hier wird ein Agent vom Typ Mensch und einer vom Typ KI angelegt. Die KI lädt die vorher gespeicherte Policy und es wird so lange gespielt, bis der Mensch nicht mehr mag.

Play

def spiele():
    while True:
        p1 = Mensch(name='mensch', symbol=1)
        p2 = KI(name='ki', symbol=-1, epsilon=0)

        spiel = Spiel(p1, p2)
        p2.load_policy()
        gewinner = spiel.spiele()

        if gewinner == p2.symbol:
            print("Du verlierst!")
        elif gewinner == p1.symbol:
            print("Du gewinnst!")
        else:
            print("Unentschieden!")

Main

Bei 50000 Trainings-Epochen ist es sehr unwahrscheinlich etwas anders als unentschieden zu erreichen.

if __name__ == '__main__':
    train(50000)
    compete(2000)
    spiele()