Unterrichten von KI zum Verteilen von Kuchen an Geschäfte mithilfe von Reinforcement Learning

Einführung



Irgendwie dachte ich beim Lesen des Buches "Reinforcement Learning: An Introduction" darüber nach, mein theoretisches Wissen durch praktisches zu ergänzen, aber es bestand kein Wunsch, das nächste Problem zu lösen, eine Bar zu balancieren, einem Agenten das Schachspielen beizubringen oder ein anderes Fahrrad zu erfinden.



Gleichzeitig enthielt das Buch ein interessantes Beispiel für die Optimierung der Kundenwarteschlange, das einerseits in Bezug auf die Implementierung / das Verständnis des Prozesses nicht zu kompliziert ist, andererseits sehr interessant ist und mit einigem Erfolg im wirklichen Leben implementiert werden kann.



Nachdem ich dieses Beispiel leicht geändert hatte, kam ich zu der Idee, auf die weiter eingegangen wird.



Formulierung des Problems



Stellen Sie sich also folgendes Bild vor:







Wir verfügen über eine Bäckerei, die täglich 6 (bedingt) Tonnen Himbeerkuchen produziert und diese Produkte täglich an drei Geschäfte verteilt.



Wie kann man dies jedoch am besten tun, damit es so wenig abgelaufene Produkte wie möglich gibt (vorausgesetzt, die Haltbarkeit der Kuchen beträgt drei Tage), wenn wir an jeder Verkaufsstelle nur drei Lastwagen mit einer Kapazität von 1, 2 bzw. 3 Tonnen haben, ist dies am rentabelsten nur einen LKW zu schicken (weil sie weit genug voneinander entfernt sind) und darüber hinaus nur einmal am Tag nach dem Backen der Kuchen, und außerdem kennen wir die Kaufkraft in unseren Filialen nicht (da das Geschäft gerade erst begonnen hat)?



Lassen Sie uns zustimmen, dass die FIFO-Layoutstrategie in Geschäften perfekt funktioniert, in denen Kunden nur die Waren nehmen, die später als die anderen hergestellt wurden. Wenn der Himbeerkuchen jedoch nicht innerhalb von drei Tagen gekauft wurde, werden die Mitarbeiter des Geschäfts ihn los.



Wir wissen (bedingt) nicht, wie hoch die Nachfrage nach Kuchen an einem bestimmten Tag in einem bestimmten Geschäft sein wird. In unserer Simulation haben wir sie jedoch für jedes der drei Geschäfte wie folgt festgelegt: 3 ± 0,1, 1 ± 0,1, 2 ± 0,1.



Die rentabelste Option für uns ist natürlich, drei Tonnen an das erste Geschäft zu senden, eine an die zweite und zwei Tonnen Kuchen an die dritte.



Um dieses Problem zu lösen, verwenden wir eine benutzerdefinierte Fitnessumgebung sowie Deep Q Learning (Keras-Implementierung).



Benutzerdefinierte Umgebung



Wir werden den Zustand der Umwelt mit drei echten positiven Zahlen beschreiben - den Rest der Produkte für den aktuellen Tag in jedem der drei Geschäfte. Die Aktionen des Agenten sind Zahlen von 0 bis einschließlich 5, die die Indizes der Permutation der ganzen Zahlen 1, 2 und 3 bezeichnen. Es ist klar, dass die vorteilhafteste Aktion unter dem 4. Index (3, 1, 2) liegt. Wir betrachten die Aufgabe als episodisch in einer Episode von 30 Tagen.



import gym
from gym import error, spaces, utils
from gym.utils import seeding

import itertools
import random
import time

class ShopsEnv(gym.Env):
  metadata = {'render.modes': ['human']}
  
  #  ,   
  #  
  def __init__(self):
    self.state = [0, 0, 0]  #  
    self.next_state = [0, 0, 0]  #  
    self.done = False  #   
    self.actions = list(itertools.permutations([1, 2, 3]))  #    
    self.reward = 0  #    
    self.time_tracker = 0  #   
    
    self.remembered_states = []  #     
    
    #   
    t = int( time.time() * 1000.0 )
    random.seed( ((t & 0xff000000) >> 24) +
                 ((t & 0x00ff0000) >>  8) +
                 ((t & 0x0000ff00) <<  8) +
                 ((t & 0x000000ff) << 24)   )
  
  #       ()  
  def step(self, action_num):
    #      
    if self.done:
        return [self.state, self.reward, self.done, self.next_state]
    else:
        #    
        self.state = self.next_state
        
        #  
        self.remembered_states.append(self.state) 
    
        #  
        self.time_tracker += 1
        
        #       
        action = self.actions[action_num]
        
        #  ,    ( )
        self.next_state = [x + y for x, y in zip(action, self.state)]
        
        #    
        self.next_state[0] -= (3 + random.uniform(-0.1, 0.1))
        self.next_state[1] -= (1 + random.uniform(-0.1, 0.1))
        self.next_state[2] -= (2 + random.uniform(-0.1, 0.1))
        
        #    
        if any([x < 0 for x in self.next_state]):
            self.reward = sum([x for x in self.next_state if x < 0])
        else:
            self.reward = 1
            
        #       
        #     
        #       (    ),
        #      
        if self.time_tracker >= 3:
            remembered_state = self.remembered_states.pop(0)
            self.next_state = [max(x - y, 0) for x, y in zip(self.next_state, remembered_state)]
        else:
            self.next_state = [max(x, 0) for x in self.next_state]
        
        
        #     30 
        self.done = self.time_tracker == 30
        
        #      
        return [self.state, self.reward, self.done, self.next_state]
  
  #   
  def reset(self):
    #      
    self.state = [0, 0, 0]
    self.next_state = [0, 0, 0]
    self.done = False
    self.reward = 0
    self.time_tracker = 0
    
    self.remembered_states = []
    
    t = int( time.time() * 1000.0 )
    random.seed( ((t & 0xff000000) >> 24) +
                 ((t & 0x00ff0000) >>  8) +
                 ((t & 0x0000ff00) <<  8) +
                 ((t & 0x000000ff) << 24)   )
    
    #   
    return self.state
  
  #     :
  #      
  def render(self, mode='human', close=False):
    print('-'*20)
    print('First shop')
    print('Pies:', self.state[0])

    print('Second shop')
    print('Pies:', self.state[1])

    print('Third shop')
    print('Pies:', self.state[2])
    print('-'*20)
    print('')


Hauptimporte



import numpy as np #  
import pandas as pd #  
import gym #  
import gym_shops #    
from tqdm import tqdm #   

#  
import matplotlib.pyplot as plt
import seaborn as sns
from IPython.display import clear_output
sns.set_color_codes()

#  
from collections import deque
from keras.models import Sequential
from keras.layers import Dense
from keras.optimizers import Adam
import random #   


Agenten definieren



class DQLAgent(): 
    
    def __init__(self, env):
        #    
       
        self.state_size = 3 #    
        self.action_size = 6 #    
        
        #    replay()
        self.gamma = 0.99
        self.learning_rate = 0.01
        
        #    adaptiveEGreedy()
        self.epsilon = 0.99
        self.epsilon_decay = 0.99
        self.epsilon_min = 0.0001
        
        self.memory = deque(maxlen = 5000) #   5000  ,    -   
        
        #   (NN)
        self.model = self.build_model()
    
    #      Deep Q Learning
    def build_model(self):
        model = Sequential()
        model.add(Dense(10, input_dim = self.state_size, activation = 'sigmoid')) #   
        model.add(Dense(50, activation = 'sigmoid')) #  
        model.add(Dense(10, activation = 'sigmoid')) #  
        model.add(Dense(self.action_size, activation = 'sigmoid')) #  
        model.compile(loss = 'mse', optimizer = Adam(lr = self.learning_rate))
        return model
    
    #    
    def remember(self, state, action, reward, next_state, done):
        self.memory.append((state, action, reward, next_state, done))
    
    #   
    def act(self, state):
        #     0  1  epsilon
        #     (exploration)
        if random.uniform(0,1) <= self.epsilon:
            return random.choice(range(6))
        else:
            #          
            act_values = self.model.predict(state)
            return np.argmax(act_values[0])
            
    
    #     
    def replay(self, batch_size):
        
        #   ,        
        if len(self.memory) < batch_size:
            return
        
        minibatch = random.sample(self.memory, batch_size) #  batch_size    
        #     
        for state, action, reward, next_state, done in minibatch:
            if done: #    -      
                target = reward
            else:
                #       
                target = reward + self.gamma * np.amax(self.model.predict(next_state)[0]) 
                # target = R(s,a) + gamma * max Q`(s`,a`)
                # target (max Q` value)     ,   s`  
            train_target = self.model.predict(state) # s --> NN --> Q(s,a) = train_target
            train_target[0][action] = target
            self.model.fit(state, train_target, verbose = 0)
    
    #    exploration rate,
    #   epsilon
    def adaptiveEGreedy(self):
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay




Trainiere den Agenten



#  gym   
env = gym.make('shops-v0')
agent = DQLAgent(env)

#   
batch_size = 100
episodes = 1000

#  
progress_bar = tqdm(range(episodes), position=0, leave=True)
for e in progress_bar:
    #  
    state = env.reset()
    state = np.reshape(state, [1, 3])

    #    , id       
    time = 0
    taken_actions = []
    sum_rewards = 0


    #   
    while True:
        #  
        action = agent.act(state)

        #  
        taken_actions.append(action)

        #     
        next_state, reward, done, _ = env.step(action)
        next_state = np.reshape(next_state, [1, 3])

        #     
        sum_rewards += reward

        #   
        agent.remember(state, action, reward, next_state, done)

        #    
        state = next_state

        #  replay
        agent.replay(batch_size)

        #  epsilon
        agent.adaptiveEGreedy()

        #   
        time += 1

        #   
        progress_bar.set_postfix_str(s='mean reward: {}, time: {}, epsilon: {}'.format(round(sum_rewards/time, 3), time, round(agent.epsilon, 3)), refresh=True)

        #     
        if done:
            #       
            clear_output(wait=True)
            sns.distplot(taken_actions, color="y")
            plt.title('Episode: ' + str(e))
            plt.xlabel('Action number')
            plt.ylabel('Occurrence in %')
            plt.show()
            break






Testen des Agenten



import time
trained_model = agent  #     
state = env.reset()  #  
state = np.reshape(state, [1,3])

#        
time_t = 0
MAX_EPISOD_LENGTH = 1000  #   
taken_actions = []
mean_reward = 0

#   
progress_bar = tqdm(range(MAX_EPISOD_LENGTH), position=0, leave=True)
for time_t in progress_bar:
    #     
    action = trained_model.act(state)
    next_state, reward, done, _ = env.step(action)
    next_state = np.reshape(next_state, [1,3])
    state = next_state
    taken_actions.append(action)

    #   
    clear_output(wait=True)
    env.render()
    progress_bar.set_postfix_str(s='time: {}'.format(time_t), refresh=True)
    print('Reward:', round(env.reward, 3))
    time.sleep(0.5)

    mean_reward += env.reward

    if done:
        break

#    
sns.distplot(taken_actions, color='y')
plt.title('Test episode - mean reward: ' + str(round(mean_reward/(time_t+1), 3)))
plt.xlabel('Action number')
plt.ylabel('Occurrence in %')
plt.show()    






Gesamt



So verstand der Agent schnell, wie er am profitabelsten handeln konnte.



Im Allgemeinen gibt es noch viel Raum zum Experimentieren: Sie können die Anzahl der Geschäfte erhöhen, Aktionen diversifizieren oder sogar nur die Hyperparameter des Trainingsmodells ändern - und dies ist nur der Anfang der Liste.



Quellen






All Articles