Deep-Q learning Pong with Tensorflow and PyGame

In a previous post we built a framework for running learning agents against PyGame. Now we’ll try and build something in it that can learn to play Pong.

We will be aided in this quest by two trusty friends: Tensorflow, Google’s recently released numerical computation library, and this paper on reinforcement learning for Atari games by DeepMind. I’m going to assume some knowledge of Tensorflow here; if you don’t know much about it, it’s quite similar to Theano and here is a good starting point for learning.

Prerequisites

  • You will need Python 2 or 3 installed.
  • You will need to install PyGame which can be obtained here.
  • You will need to install Tensorflow which can be grabbed here.
  • You will need PyGamePlayer which can be cloned from GitHub here.
If you want to skip to the end, the completed Deep Q agent is here in the PyGamePlayer project. The rest of this post will deal with why it works and how to build it.

Q-Learning

If you read the Deepmind paper you will find this definition of the Q function:
Q*(s, a) = E[ r + γ max_{a'} Q*(s', a') | s, a ]
Let’s try and understand it bit by bit. Imagine an agent trying to find his way out of a maze. In each step he knows his current location, s in the equation above, and can take an action, a, moving one square in any direction, unless blocked by a wall. If he gets to the exit he will get a reward and is moved to a new random square in the maze. The reward is represented by the r in the equation. The task Q-Learning aims to solve is learning the most efficient way to navigate the maze to get the greatest reward.
Bunny agent attempts to locate carrot reward

If the agent were to start by moving around the maze randomly he would eventually hit the reward, which would let him know its location. He could then easily learn the rule that if your state is the reward square then you get a reward. He can also learn that if he is in any square adjacent to the reward square and takes the action of moving towards it, he will get the reward. Thus he knows exactly the reward associated with those actions and can prioritize them over other actions.
But if he just chooses the action with the biggest immediate reward the agent won’t get far, as for most squares the reward is zero. This is where the max Q*(s', a') part of the equation comes in. We judge the reward we get from an action not just on the reward we get for the state it puts us in, but also on the best (max) reward we could get from the actions available to us in that state. The gamma symbol γ is a constant between 0 and 1 that acts as a discount on rewards in the future, so an action that gets the reward now is judged better than an action that gets the reward 2 turns from now.

The function Q* represents the ideal Q-function; in most complex cases it will be impossible to calculate it exactly, so we use a function approximator Q(s, a; θ). When a machine learning paper references a function approximator they are (almost always) talking about a neural net. These nets in Q-learning are often referred to as Q-nets. The θ symbol in the Q function represents the parameters (weights and biases) of our net. In order to train our net we will need a loss function, which is defined as:

L_i(θ_i) = E[ (y_i - Q(s, a; θ_i))^2 ], where y_i = r + γ max_{a'} Q(s', a'; θ_{i-1})

y here is the expected reward of the state, computed using the parameters of our Q from iteration i-1. Here is an example of running a Q-function in Tensorflow. In this example we use the simplest state possible: it is just an array of states, with a reward for each state, and the agent’s actions are moving to adjacent states:
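
As the embedded example isn’t visible inline here, below is a minimal sketch of the idea: a row of states, a rewards array with one value per state, and an agent that moves left or right. The layout, reward values and hyperparameters are illustrative assumptions, and it uses the TensorFlow 1.x API that was current when this was written.

import numpy as np
import tensorflow as tf  # assumes the TensorFlow 1.x API

NUM_STATES = 10   # the agent lives on a row of 10 squares
NUM_ACTIONS = 2   # 0 = move left, 1 = move right
GAMMA = 0.5       # discount on future rewards

# one reward value per state; only state 5 pays out
rewards = [0, 0, 0, 0, 0, 1, 0, 0, 0, 0]

def one_hot(index):
    array = np.zeros([NUM_STATES])
    array[index] = 1.0
    return array

session = tf.Session()
state_input = tf.placeholder("float", [None, NUM_STATES])
targets = tf.placeholder("float", [None, NUM_ACTIONS])

# a single linear layer is enough for a state space this tiny
weights = tf.Variable(tf.zeros([NUM_STATES, NUM_ACTIONS]))
output = tf.matmul(state_input, weights)

loss = tf.reduce_mean(tf.square(output - targets))
train_operation = tf.train.GradientDescentOptimizer(0.1).minimize(loss)
session.run(tf.initialize_all_variables())

for _ in range(1000):
    # pick a random state and action, then see where we end up and what reward we get
    current_state = np.random.randint(0, NUM_STATES)
    action = np.random.randint(0, NUM_ACTIONS)
    next_state = min(NUM_STATES - 1, max(0, current_state + (1 if action == 1 else -1)))
    reward = rewards[next_state]

    # y = r + gamma * max_a' Q(s', a'), exactly the target from the loss function above
    next_q = session.run(output, feed_dict={state_input: [one_hot(next_state)]})[0]
    target_q = session.run(output, feed_dict={state_input: [one_hot(current_state)]})[0]
    target_q[action] = reward + GAMMA * np.max(next_q)

    session.run(train_operation, feed_dict={state_input: [one_hot(current_state)],
                                            targets: [target_q]})

After enough iterations the action with the highest Q value in each state points towards the reward, which is exactly the behaviour described in the maze example above.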

Setting up the agent in PyGamePlayer

Create a new file in your current workspace, which should have the PyGamePlayer project in it (or simply create a new file in the examples directory in PyGamePlayer). Then create a new class that inherits from the PongPlayer class. This will handle getting the environment feedback for us: it gives a reward whenever the player’s score increases and punishes whenever the opponent’s score increases. We will also add a main here to run it.

DeepQPongPlayer https://gist.github.com/DanielSlater/01c95b4e47dd12aa415a.js

If you run this you will see the player moving to the bottom of the screen as the Pong AI mercilessly destroys him. More intelligence is needed, so we will override the get_keys_pressed method to actually do some real work. Also, as a first step, because the Pong screen is quite big and I’m guessing none of us have a supercomputer, let’s compress the screen image so it’s not quite so tough on our GPU.

get_keys_pressed https://gist.github.com/DanielSlater/3e386976d2034eeee197.js
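
The gist does the real downsampling; purely as an illustration of the idea, something along these lines shrinks the frame to the 80x80 black-and-white image used below (the exact method and threshold here are assumptions, not the gist’s code):

import numpy as np

SCREEN_WIDTH = 80
SCREEN_HEIGHT = 80

def compress_screen(screen_array):
    # screen_array arrives as (width, height, 3) from pygame.surfarray.array3d
    # average the colour channels to get a greyscale image
    greyscale = screen_array.mean(axis=2)

    # crude downsample: take every nth pixel so we end up with roughly 80x80
    step_x = greyscale.shape[0] // SCREEN_WIDTH
    step_y = greyscale.shape[1] // SCREEN_HEIGHT
    small = greyscale[::step_x, ::step_y][:SCREEN_WIDTH, :SCREEN_HEIGHT]

    # threshold to black/white so each pixel is only ever in one of 2 states
    return (small > 1.0).astype(np.float32)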

How do we apply Q-Learning to Pong?

Q-Learning makes plenty of sense in a maze scenario, but how do we apply it to Pong? The Q-function actions are simply the key press options: up, down, or no key pressed. The state could be the screen, but the problem with this is that even after compression our state is still huge. Also, Pong is a dynamic game: you can’t just look at a static frame and know what’s going on, most importantly which direction the ball is moving.

We will want our input to be not just the current frame, but the last few frames, say 4. 80 by 80 pixels is 6,400; times 4 frames that’s 25,600 data points, and each can be in 2 states (black or white), meaning there are 2 to the power of 25,600 possible screen states. Slightly too many for any computer to reasonably deal with.

This is where the deep bit of deep Q comes in. We will use deep convolutional nets (for a good write-up of these try here) to compress that huge screen space into a smaller space of just 512 floats, and then learn our Q-function from that output.

So first let’s create our convolutional network with Tensorflow:

Create network: https://gist.github.com/DanielSlater/2b9afcc9dfa6eda0f39d.js
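
The gist above holds the real network; as a rough sketch of its shape (the layer sizes are assumptions in the spirit of the DeepMind architecture, ending in the 512-float fully connected layer mentioned above, written against the TensorFlow 1.x API):

import tensorflow as tf

SCREEN_WIDTH, SCREEN_HEIGHT = 80, 80
STATE_FRAMES = 4   # how many past frames make up one state
ACTIONS_COUNT = 3  # up, down, or no key pressed

def weight_variable(shape):
    return tf.Variable(tf.truncated_normal(shape, stddev=0.01))

def bias_variable(shape):
    return tf.Variable(tf.constant(0.01, shape=shape))

def create_network():
    input_layer = tf.placeholder("float", [None, SCREEN_WIDTH, SCREEN_HEIGHT, STATE_FRAMES])

    # two convolutional layers squash the 80x80x4 input down to 10x10x64
    conv1 = tf.nn.relu(tf.nn.conv2d(input_layer, weight_variable([8, 8, STATE_FRAMES, 32]),
                                    strides=[1, 4, 4, 1], padding="SAME") + bias_variable([32]))
    conv2 = tf.nn.relu(tf.nn.conv2d(conv1, weight_variable([4, 4, 32, 64]),
                                    strides=[1, 2, 2, 1], padding="SAME") + bias_variable([64]))

    # flatten and feed into the 512-float fully connected layer
    flattened = tf.reshape(conv2, [-1, 10 * 10 * 64])
    fully_connected = tf.nn.relu(tf.matmul(flattened, weight_variable([10 * 10 * 64, 512]))
                                 + bias_variable([512]))

    # one output per action: this is our estimate of Q(s, a)
    output_layer = tf.matmul(fully_connected, weight_variable([512, ACTIONS_COUNT])) \
                   + bias_variable([ACTIONS_COUNT])

    return input_layer, output_layer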

Now we will use the exact same technique we used for the simple Q-Learning example above, but this time the state will be a collection of the last 4 frames of the game and there will be 3 possible actions.

This is how you train the network:

https://gist.github.com/DanielSlater/f611a3aa737d894b689f.js
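Schematically the training step amounts to something like this: sample a mini-batch of stored (state, action, reward, next_state) transitions, build the y target from the loss function above, and take one gradient step. The names of the placeholders and the train operation here are assumptions carried over from the network sketch, and terminal-state handling is omitted for brevity.

import random
import numpy as np

MINI_BATCH_SIZE = 100
FUTURE_REWARD_DISCOUNT = 0.99

def train(session, observations, input_layer, output_layer,
          action_input, target_input, train_operation):
    # sample a mini-batch of past transitions from the replay memory
    mini_batch = random.sample(observations, MINI_BATCH_SIZE)
    previous_states = [d[0] for d in mini_batch]
    actions = [d[1] for d in mini_batch]
    rewards = [d[2] for d in mini_batch]
    current_states = [d[3] for d in mini_batch]

    # y = r + gamma * max_a' Q(s', a') for each transition in the batch
    next_q = session.run(output_layer, feed_dict={input_layer: current_states})
    expected_rewards = [rewards[i] + FUTURE_REWARD_DISCOUNT * np.max(next_q[i])
                        for i in range(len(mini_batch))]

    # one gradient step pulling Q(s, a) for the actions actually taken towards the targets
    session.run(train_operation, feed_dict={input_layer: previous_states,
                                            action_input: actions,
                                            target_input: expected_rewards})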
And getting the chosen action looks like this:

https://gist.github.com/DanielSlater/240fdfd4b42dcb2fb33d.js
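Conceptually that is just an argmax over the network’s output for the current state, roughly (reusing the assumed names from the sketches above):

import numpy as np

def choose_next_action(session, input_layer, output_layer, last_state):
    # run the current state through the network and pick the action with the highest predicted value
    q_values = session.run(output_layer, feed_dict={input_layer: [last_state]})[0]
    action = np.zeros([3])          # one entry per possible key press
    action[np.argmax(q_values)] = 1
    return action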
So get_keys_pressed needs to be changed to store these observations:

https://gist.github.com/DanielSlater/b6781cc5bfdf0f5385bc.js
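The heart of that change is keeping a rolling window of the last few frames and pushing each transition into a replay memory, roughly like this (the memory size is an arbitrary assumption, and the real gist does more bookkeeping):

from collections import deque
import numpy as np

MEMORY_SIZE = 500000
STATE_FRAMES = 4

observations = deque(maxlen=MEMORY_SIZE)
last_state = None

def store_observation(screen_binary, action, reward):
    global last_state
    if last_state is None:
        # on the very first frame just stack the same frame 4 times to make a state
        last_state = np.stack([screen_binary] * STATE_FRAMES, axis=2)
        return

    # drop the oldest frame, append the newest, and remember the transition
    current_state = np.append(last_state[:, :, 1:], screen_binary[:, :, np.newaxis], axis=2)
    observations.append((last_state, action, reward, current_state))
    last_state = current_state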
The normal training time for something like this, even with a good GPU, is on the order of days. But even if you were to train the current agent for days it would still perform pretty poorly. The reason is that if we start using the Q-function to determine our actions, it will initially be exploring the space with very poor weights. It is very likely that it will find some simple action that leads to a small improvement and get stuck in a local minimum doing that.

What we want is to delay using our weights until the agent has a good understanding of the space in which it exists. A good way to initially explore the space is to move randomly, then over time slowly add in more and more moves chosen by the agent, until eventually the agent is in full control.
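
In code this is just a probability of acting randomly that starts at 1 and is stepped down towards a small floor on every frame, something like the following (the constants are illustrative, not necessarily the values used in the gists below):

import random
import numpy as np

INITIAL_RANDOM_ACTION_PROB = 1.0  # start fully random
FINAL_RANDOM_ACTION_PROB = 0.05   # never go completely greedy
EXPLORE_STEPS = 500000            # frames over which to hand control to the agent

probability_of_random_action = INITIAL_RANDOM_ACTION_PROB

def choose_action(q_values):
    global probability_of_random_action
    if random.random() <= probability_of_random_action:
        action_index = random.randrange(len(q_values))  # explore
    else:
        action_index = np.argmax(q_values)              # exploit the learned Q-function

    # slowly shift from random exploration to the agent's own choices
    if probability_of_random_action > FINAL_RANDOM_ACTION_PROB:
        probability_of_random_action -= \
            (INITIAL_RANDOM_ACTION_PROB - FINAL_RANDOM_ACTION_PROB) / EXPLORE_STEPS

    return action_index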

Add this to the get_keys_pressed method:
https://gist.github.com/DanielSlater/030e747a918abe8cff5f.js
And then make the choose_next_action method this:
https://gist.github.com/DanielSlater/82a8209652bc593695e1.js
And so now, huzzah, we have a Pong AI!

The PyGamePlayer project: https://github.com/DanielSlater/PyGamePlayer
The complete code for this example is here
Also I’ve now added the games mini-pong and half-pong, which should be quicker to train against if you want to try out modifications.
And finally, here is a video of a talk I’ve given on this subject.

How to run learning agents against PyGame


PyGame is the best supported library for programming games in Python; there are thousands of open source games that have been built with it.
I wanted to run some reinforcement learning neural networks against games, and PyGame seemed the best choice.
But I was a bit disappointed that most examples involved hacking the original game files.

I wanted to see if I could write a general framework for running learning agents in PyGame that would require zero touching of the game files.
If you are interested in writing your own implementation of this then read on, but if you just want to dive straight in and start playing with your own learning agents, I’ve uploaded a project here which contains examples of setting up Pong and Tetris.

Getting started

  • You will need Python 2 or 3 installed.
  • You will need to install PyGame which can be obtained here. You can also install it via pip, but this can be tricky on Windows.
  • You will need numpy, downloadable from here, or again it can be installed via pip.
  • You will also need a game of some sort to try and set up. Here is a good simple pong game.

    The plan

    There are 3 things a learning agent needs to be able to play a game:
    1. An array of all the pixels on the screen whenever it is updated.
    2. The ability to output a set of key presses back into the game.
    3. A feedback value to tell it when it is doing well/badly.
    We will tackle them in order.

    Grabbing the screen buffer

    In PyGame there is a handy method which will give us a 3-dimensional numpy array containing the colour of every pixel on screen.

     screen = pygame.surfarray.array3d(pygame.display.get_surface())  

    We could write our learning agent by hacking the game file and inserting this line into the main loop after the screen has been updated. But a better way to do it (and a way that allows us to have zero touches of the game file) is to intercept any calls to the pygame.display.update method and grab the buffer there, like so:

    import pygame
    import pygame.surfarray

    # function that we can give two functions to and will return us a new function that calls both
    def function_combine(screen_update_func, our_intercepting_func):
        def wrap(*args, **kwargs):
            # call the screen update func we intercepted so the screen buffer is updated
            screen_update_func(*args, **kwargs)
            # call our own function to get the screen buffer
            our_intercepting_func()
        return wrap

    def on_screen_update():
        surface_array = pygame.surfarray.array3d(pygame.display.get_surface())
        print("We got the screen array")
        print(surface_array)

    # set our on_screen_update function to always get called whenever the screen is updated
    pygame.display.update = function_combine(pygame.display.update, on_screen_update)
    # FYI the screen can also be modified via flip, so this might be needed for some games
    pygame.display.flip = function_combine(pygame.display.flip, on_screen_update)

    You can try this out by inserting this code before you start your game and it will print out the screen buffer as it comes in.

    Intercepting key presses

    The normal method in PyGame for detecting key presses is this:

     events = pygame.event.get()  

    So we can intercept it and have it return our learning agents key presses:

    import pygame
    from pygame.constants import K_DOWN, KEYDOWN

    def function_intercept(intercepted_func, intercepting_func):
        def wrap(*args, **kwargs):
            # call the function we are intercepting and get its result
            real_results = intercepted_func(*args, **kwargs)
            # call our own function and return our new results
            new_results = intercepting_func(real_results, *args, **kwargs)
            return new_results
        return wrap

    def just_press_down_key(actual_events, *args, **kwargs):
        return [pygame.event.Event(KEYDOWN, {"key": K_DOWN})]

    pygame.event.get = function_intercept(pygame.event.get, just_press_down_key)

    I should also warn you that the pygame.event.get method can be called with args to filter out which events are needed. If you’re running a game that uses these you will either need to handle them or just use my complete implementation here.

    Getting feedback to the player

    The final piece of the puzzle is handling the feedback/reward from the game. Unfortunately there is no standard way of doing scoring in PyGame so this will always require some amount of going through the game code, but it can still be done with zero touches.

    For the pong game the scores are stored in two global variables bar1_score and bar2_score, which can be imported. Our reward is when the score changes in our favor.

    last_bar1_score = last_bar2_score = 0

    def get_feedback():
        global last_bar1_score, last_bar2_score

        # import must be done inside the method because otherwise importing would cause the game to start playing
        from games.pong import bar1_score, bar2_score

        # get the difference in score between this and the last run
        score_change = (bar1_score - last_bar1_score) - (bar2_score - last_bar2_score)
        last_bar1_score = bar1_score
        last_bar2_score = bar2_score

        return score_change

    But for other games, such as Tetris, there may not be a globally scoped score variable we can grab. But there may be a method or set of methods that we know are good/bad, such as player_takes_damage, level_up or kill_enemy. We can use our function_intercept code from before to grab these. Here is an example in Tetris using the result of removeCompleteLines to reward our agent:

    import tetris

    new_reward = 0.0

    def add_removed_lines_to_reward(lines_removed, *args, **kwargs):
        global new_reward
        new_reward += lines_removed
        return lines_removed

    tetris.removeCompleteLines = function_intercept(tetris.removeCompleteLines,
                                                    add_removed_lines_to_reward)

    def get_reward():
        global new_reward
        temp = new_reward
        new_reward = 0.0
        return temp

    Dealing with frame rates

    One final issue that you may need to consider is that the learning agent will significantly impact the execution speed of the game. In a lot of games the physics is scaled by the elapsed time since the last update. If your agent takes 1 second to process a single frame in pong then in the next update loop the ball will have already passed off the screen. The agent may also struggle to learn if there is significant variance in the movement of different frames.
    This can be handled by intercepting the pygame.time.get_ticks method and pygame.time.Clock in the same way as we did the other functions. See this file for details.
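
    As a rough sketch of the idea (the real implementation is in the file linked above; this version just reuses function_intercept from earlier and assumes a frame counter incremented once per screen update):

    import pygame

    FIXED_FPS = 10   # pretend the game always runs at exactly this many frames per second
    frames_seen = 0  # increment this once per screen update, e.g. inside on_screen_update

    def fixed_get_ticks(real_ticks, *args, **kwargs):
        # ignore the real clock and report that exactly 1000 / FIXED_FPS milliseconds pass per frame
        return frames_seen * (1000 // FIXED_FPS)

    pygame.time.get_ticks = function_intercept(pygame.time.get_ticks, fixed_get_ticks)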

    Pong in PyGamePlayer

    Now all that remains is to stitch all those parts together and plug in the learning agent. In my project I’ve chosen to do this in a class, but it would be fine as a script.
    Below is an example of the full thing set up to learn against Pong using the PyGamePlayer module. The PongPlayer simply needs to inherit from the PyGamePlayer class and implement the get_keys_pressed and get_feedback methods; the framework handles everything else.

    from pygame.constants import K_DOWN
    from pygame_player import PyGamePlayer

    class PongPlayer(PyGamePlayer):
        def __init__(self):
            """
            Example class for playing Pong
            """
            super(PongPlayer, self).__init__(force_game_fps=10)  # we want to run at 10 frames per second
            self.last_bar1_score = 0.0
            self.last_bar2_score = 0.0

        def get_keys_pressed(self, screen_array, feedback):
            # The code for running the actual learning agent would go here, with the screen_array and feedback
            # as the inputs and an output for each key in the game, activating them if they are over some threshold.
            return [K_DOWN]

        def get_feedback(self):
            # import must be done here because otherwise importing would cause the game to start playing
            from games.pong import bar1_score, bar2_score

            # get the difference in score between this and the last run
            score_change = (bar1_score - self.last_bar1_score) - (bar2_score - self.last_bar2_score)
            self.last_bar1_score = bar1_score
            self.last_bar2_score = bar2_score

            return score_change

    if __name__ == '__main__':
        player = PongPlayer()
        player.start()
        # importing pong will start the game playing
        import games.pong

    So huzzah! We now have the world’s worst Pong AI.

    In my next post I’ll go through writing a good reinforcement learning agent for this.
    If you have any questions/correction please don’t hesitate to contact me.

    Full source code here.

    Particle Swarm Optimization in Python

    Here is a short and sweet particle swarm optimization implementation in Python. For more information on particle swarm optimization check out Particle swarm optimization in F#

    Example usage is like so:

    def simple_error_function(args):
        return args[0] + args[1]

    number_of_parameters = 2
    max_iterations = 100
    best_parameters, best_error_score = particle_swarm_optimize(simple_error_function,
                                                                number_of_parameters, max_iterations)

    The result will be the parameters that resulted in the lowest value for simple_error_function.

    You can add custom parameter initialization:

    import random

    def simple_error_function(args):
        return args[0] + args[1]

    def parameter_init():
        return random.random() * 0.5 - 10

    number_of_parameters = 2
    max_iterations = 100
    best_parameters, best_error_score = particle_swarm_optimize(simple_error_function, number_of_parameters,
                                                                max_iterations, parameter_init=parameter_init)

    Other arguments

    • stopping_error: float = 0.001  # stop when we have a particle with this error score
    • num_particles: int = 10  # number of particles to run
    • max_iterations_without_improvement: int = 10  # stop when we have this many consecutive iterations without improvement
    • c1: float = 2.0  # c1 PSO parameter
    • c2: float = 2.0  # c2 PSO parameter
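
    For example, to run a bigger swarm with a tighter stopping criterion (a hypothetical call, assuming the same particle_swarm_optimize signature as in the examples above):

    best_parameters, best_error_score = particle_swarm_optimize(simple_error_function, number_of_parameters,
                                                                max_iterations, num_particles=20,
                                                                stopping_error=0.0001, c1=1.5, c2=2.5)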

    Why there is a big unsupervised learning shaped hole in the universe

    The problem I had when I first started reading about unsupervised learning was that I didn’t understand why it needed to exist. Supervised learning makes sense: you give a neural net an input and output and tell it “find a way to get from one to the other”. Unsupervised learning doesn’t have that. You are just giving it some data and saying “do… something”. Even the diagrams were confusing: I see input nodes, I see hidden nodes, what’s the output?
    The best way to get across the need for unsupervised learning is to talk about the end goal. Across the first year of a baby’s life it learns a huge amount: how to focus its eyes, how to coordinate its limbs, how to distinguish its parents from other adults, that objects are permanent things. Until it learns to talk it is getting almost no feedback about any of this, yet it still learns. Getting good training data is hard, so the idea is: wouldn’t it be great if we could set up a machine and it would just go off and learn on its own.
    Unfortunately so far we are a long way from that, and the technique shown here seems trivial compared to that goal. But the goal is interesting enough that it is worth pursuing. The answer to the question “what am I asking an unsupervised network to do?” is “learn the data”. The output will be a representation of the data that is simpler than the original. If the input is 10,000 pixels of an image the output can be any smaller number. What a lot of the simpler unsupervised nets do is transform the input into a single number that represents a group of similar inputs. These groups are called clusters.

    An example competitive learning neural net

    A competitive learning neural net attempts to group its inputs into clusters. The code for it is really very simple. Here is all that is needed in the constructor (if you don’t like Python it is also available in C#, Java and F#):
    from random import uniform

    class UnsupervisedNN(object):
        def __init__(self, size_of_input_arrays, number_of_clusters_to_group_data_into):
            # we have 1 hidden node for each cluster
            self.__connections = [[uniform(-0.5, 0.5) for j in range(number_of_clusters_to_group_data_into)]
                                  for i in range(size_of_input_arrays)]
            self.__hidden_nodes = [0.0] * number_of_clusters_to_group_data_into
    When we give it an input, it will activate the hidden nodes based on the sum of the connections between that input and each hidden node. It makes more sense in code, like so:
    def feed_forward(self, inputs):
        # We expect inputs to be an array of floats of length size_of_input_arrays.
        for hidden_node_index in range(len(self.__hidden_nodes)):
            activation = 0.0
            # each hidden node will be activated from the inputs.
            for input_index in range(len(self.__connections)):
                activation += inputs[input_index] * self.__connections[input_index][hidden_node_index]

            self.__hidden_nodes[hidden_node_index] = activation

        # now we have activated all the hidden nodes we check which has the highest activation
        # this node is the winner and so the cluster we think this input belongs to
        return self.__hidden_nodes.index(max(self.__hidden_nodes))

    So as it stands we have a method for randomly assigning data to clusters. To make it something useful we need to improve the connections. There are many ways this can be done; in competitive learning, after you have selected a winner you make its connections more like that input. A good analogy: imagine we have 3 inputs, one for each of the colours red, green and blue. If we get the colour yellow, the red and green inputs were active. So after a winning node is selected its connections to those colours are increased, so future red and green items are more likely to be considered part of the same cluster. But because there is no blue, the connection to blue is weakened:
    def Train(self, inputs):
        winning_cluster_index = self.feed_forward(inputs)
        learning_rate = 0.1
        for input_index in range(len(self.__connections)):
            # move the winning cluster's connections closer to this input
            weight = self.__connections[input_index][winning_cluster_index]
            self.__connections[input_index][winning_cluster_index] = weight + learning_rate * (inputs[input_index] - weight)
    A problem we can have here though is that a cluster can be initialized with terrible weights, such that nothing is ever assigned to it. To fix this a penalty is added to each hidden node: whenever a hidden node is selected its penalty is increased, so that if a node keeps winning the other nodes will eventually start getting selected. This penalty is also known as a conscience or bias.
    To add a bias we just need to initialize an array in the constructor, with one entry per cluster:
         self.__conscience = [0.0] * number_of_clusters_to_group_data_into
    and change our feed_forward to:

    def feed_forward(self, inputs):
        for hidden_node_index in range(len(self.__hidden_nodes)):
            # start from the conscience penalty rather than zero
            activation = self.__conscience[hidden_node_index]
            for input_index in range(len(self.__connections)):
                activation += inputs[input_index] * self.__connections[input_index][hidden_node_index]

            self.__hidden_nodes[hidden_node_index] = activation

        return self.__hidden_nodes.index(max(self.__hidden_nodes))

    Then in training we just make a small substitution every time a cluster wins (conscience_learning_rate is a small constant, e.g. 0.01, that would also be set in the constructor):
         self.__conscience[winning_cluster_index] -= self.conscience_learning_rate
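
    Putting it all together, usage looks something like this (a quick sketch with made-up data, assuming the methods above live on the UnsupervisedNN class):

    # group 2-dimensional points into 2 clusters
    network = UnsupervisedNN(size_of_input_arrays=2, number_of_clusters_to_group_data_into=2)

    training_data = [[0.9, 0.1], [0.8, 0.2], [0.1, 0.9], [0.2, 0.8]]
    for _ in range(100):
        for item in training_data:
            network.Train(item)

    # points close to [0.9, 0.1] should now land in the same cluster
    print(network.feed_forward([0.95, 0.05]))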
    Competitive learning nets are nice but are a long way from the goal of full unsupervised learning. In a future post I’m going to do a Restricted Boltzmann Machine, which is used in deep learning in the shallow layers to give us a simpler representation of an image to work with.
    Full code is available on GitHub in Python, C#, Java and F#.