使用强化学习AlphaZero算法训练中国象棋AI

您所在的位置:网站首页 怎么练象棋技术 使用强化学习AlphaZero算法训练中国象棋AI

使用强化学习AlphaZero算法训练中国象棋AI

2024-06-09 17:26| 来源: 网络整理| 查看: 265

案例目标

通过本案例的学习和课后作业的练习:

了解强化学习AlphaZero算法; 利用AlphaZero算法进行一次中国象棋AI训练;

你也可以将本案例相关的 ipynb 学习笔记分享到 AI Gallery Notebook 版块获得成长值,分享方法请查看此文档。

案例内容介绍

AlphaZero是一种强化学习算法,近期利用AlphaZero训练出的AI以绝对的优势战胜了多名围棋以及国际象棋冠军。AlphaZero创新点在于,它能够在不依赖于外部先验知识(也称专家知识),仅仅了解游戏规则的情况下,在棋盘类游戏中获得超越人类的表现。

本次案例将详细的介绍AlphaZero算法核心原理,包括神经网络构建、MCTS搜索、自博弈训练,以代码的形式加深对算法的理解,算法详情亦可见论文《Mastering the game of Go without human knowledge》。同时本案例提供中国象棋强化学习环境,利用AlphaZero进行一次中国象棋训练,最后可视化象棋AI自博弈对局。

由于训练一个强力的中国象棋AI需要大量的训练时间和资源,本案例偏重于算法理解,在运行过程中简化了训练过程,减少了自博弈次数和搜索次数。如果想要完整地训练一个中国象棋AlphaZero AI,可在AI Gallery中订阅《CChess中国象棋》算法,并在ModelArts中进行训练。

注意事项

本案例运行环境为 TensorFlow-1.13.1,且建议使用 GPU 运行,请查看《ModelAtrs JupyterLab 硬件规格使用指南》了解切换硬件规格的方法;

如果您是第一次使用 JupyterLab,请查看《ModelAtrs JupyterLab使用指导》了解使用方法;

如果您在使用 JupyterLab 过程中碰到报错,请参考《ModelAtrs JupyterLab常见问题解决办法》尝试解决问题;

请逐步运行下面的每一个代码块;

实验步骤 程序初始化 构建神经网络 实现MCTS 实现自博弈过程 进行训练参数配置 开始自博弈训练 模型更新 可视化对局 1. 程序初始化

第1步:安装基础依赖

要确保所有依赖都安装成功后,再执行之后的代码。如果某些模块因为网络原因导致安装失败,直接重试一次即可。

!pip install tornado==6.1.0 !pip install tflearn==0.3.2 !pip install tqdm !pip install urllib3==1.22 !pip install threadpool==1.3.2 !pip install xmltodict==0.12.0 !pip install requests !pip install pandas==0.19.2 !pip install numpy==1.14.5 !pip install scipy==1.1.0 !pip install matplotlib==2.0.0 !pip install nest_asyncio !pip install gast==0.2.2

第2步: 下载依赖包

import os import moxing as mox if not os.path.exists('cchess_training'): mox.file.copy("obs://modelarts-labs-bj4/course/modelarts/reinforcement_learning/cchess_gameplay/cchess_training/cchess_training.zip", "cchess_training.zip") os.system('unzip cchess_training.zip')

第3步:导入相关的库

%matplotlib notebook %matplotlib auto import os import sys import logging import subprocess import copy import random import json import asyncio import time import numpy as np import tensorflow as tf from multiprocessing import Process from cchess_training.cchess_zero import board_visualizer from cchess_training.gameplays import players, gameplay from cchess_training.config import conf from cchess_training.common.board import create_uci_labels from cchess_training.cchess_training_model_update import model_update from cchess_training.cchess_zero.gameboard import GameBoard from cchess_training.cchess_zero import cbf from cchess_training.utils import get_latest_weight_path import nest_asyncio nest_asyncio.apply() os.environ["TF_CPP_MIN_LOG_LEVEL"] = "2" tf.compat.v1.logging.set_verbosity(tf.compat.v1.logging.ERROR) logging.basicConfig(level=logging.INFO, format="[%(asctime)s] [%(levelname)s] [%(message)s]", datefmt='%Y-%m-%d %H:%M:%S' ) 2.构建神经网络

这里基于Resnet实现了AlphaZero中的神经网络,神经网络输入为当前象棋棋面转化得到的0-1图,大小为[10, 9, 14],[10, 9]表示象棋棋盘大小,[14]每一个plane对应一类棋子,我方7类(兵、炮、车、马、相、仕、将),敌方7类,共14个plane。经过Resnet提取特征后分为两个分支,一个是价值分支,输出当前棋面价值,另一个是策略头,输出神经网络计算得到的动作对应概率。

# resnet def res_block(inputx, name, training, block_num=2, filters=256, kernel_size=(3, 3)): net = inputx for i in range(block_num): net = tf.layers.conv2d( net, filters=filters, kernel_size=kernel_size, activation=None, name="{}_res_conv{}".format(name, i), padding='same' ) net = tf.layers.batch_normalization(net, training=training, name="{}_res_bn{}".format(name, i)) if i == block_num - 1: net = net + inputx net = tf.nn.elu(net, name="{}_res_elu{}".format(name, i)) return net def conv_block(inputx, name, training, block_num=1, filters=2, kernel_size=(1, 1)): net = inputx for i in range(block_num): net = tf.layers.conv2d( net, filters=filters, kernel_size=kernel_size, activation=None, name="{}_convblock_conv{}".format(name, i), padding='same' ) net = tf.layers.batch_normalization(net, training=training, name="{}_convblock_bn{}".format(name, i)) net = tf.nn.elu(net, name="{}_convblock_elu{}".format(name, i)) # net shape [None,10,9,2] netshape = net.get_shape().as_list() net = tf.reshape(net, shape=(-1, netshape[1] * netshape[2] * netshape[3])) net = tf.layers.dense(net, 10 * 9, name="{}_dense".format(name)) net = tf.nn.elu(net, name="{}_elu".format(name)) return net def res_net_board(inputx, name, training, filters=256, num_res_layers=4): net = inputx net = tf.layers.conv2d( net, filters=filters, kernel_size=(3, 3), activation=None, name="{}_res_convb".format(name), padding='same' ) net = tf.layers.batch_normalization(net, training=training, name="{}_res_bnb".format(name)) net = tf.nn.elu(net, name="{}_res_elub".format(name)) for i in range(num_res_layers): net = res_block(net, name="{}_layer_{}".format(name, i + 1), training=training, filters=filters) return net def get_scatter(name): with tf.variable_scope("Test"): ph = tf.placeholder(tf.float32, name=name) op = tf.summary.scalar(name, ph) return ph, op def average_gradients(tower_grads): """Calculate the average gradient for each shared variable across all towers. Note that this function provides a synchronization point across all towers. Args: tower_grads: List of lists of (gradient, variable) tuples. The outer list is over individual gradients. The inner list is over the gradient calculation for each tower. Returns: List of pairs of (gradient, variable) where the gradient has been averaged across all towers. """ average_grads = [] for grad_and_vars in zip(*tower_grads): # Note that each grad_and_vars looks like the following: # ((grad0_gpu0, var0_gpu0), ... , (grad0_gpuN, var0_gpuN)) grads = [] for g, _ in grad_and_vars: # Add 0 dimension to the gradients to represent the tower. expanded_g = tf.expand_dims(g, 0) # Append on a 'tower' dimension which we will average over below. grads.append(expanded_g) # Average over the 'tower' dimension. grad = tf.concat(grads, 0) grad = tf.reduce_mean(grad, 0) # Keep in mind that the Variables are redundant because they are shared # across towers. So .. we will just return the first tower's pointer to # the Variable. v = grad_and_vars[0][1] grad_and_var = (grad, v) average_grads.append(grad_and_var) return average_grads def add_grad_to_list(opt, train_param, loss, tower_grad): grads = opt.compute_gradients(loss, var_list=train_param) grads = [i[0] for i in grads] tower_grad.append(zip(grads, train_param)) def get_op_mul(tower_gradients, optimizer, gs): grads = average_gradients(tower_gradients) train_op = optimizer.apply_gradients(grads, gs) return train_op def reduce_mean(x): return tf.reduce_mean(x) def merge(x): return tf.concat(x, axis=0) def get_model_resnet( model_name, labels, gpu_core=[0], batch_size=512, num_res_layers=4, filters=256, extra=False, extrav2=False ): tf.reset_default_graph() graph = tf.Graph() with graph.as_default(): x_input = tf.placeholder(tf.float32, [None, 10, 9, 14]) nextmove = tf.placeholder(tf.float32, [None, len(labels)]) score = tf.placeholder(tf.float32, [None, 1]) training = tf.placeholder(tf.bool, name='training_mode') learning_rate = tf.placeholder(tf.float32) global_step = tf.train.get_or_create_global_step() optimizer_policy = tf.train.MomentumOptimizer(learning_rate=learning_rate, momentum=0.9) optimizer_value = tf.train.MomentumOptimizer(learning_rate=learning_rate, momentum=0.9) optimizer_multitarg = tf.train.MomentumOptimizer(learning_rate=learning_rate, momentum=0.9) tower_gradients_policy, tower_gradients_value, tower_gradients_multitarg = [], [], [] net_softmax_collection = [] value_head_collection = [] multitarget_loss_collection = [] value_loss_collection = [] policy_loss_collection = [] accuracy_select_collection = [] with tf.variable_scope(tf.get_variable_scope()) as vscope: for ind, one_core in enumerate(gpu_core): if one_core is not None: devicestr = "/gpu:{}".format(one_core) if one_core is not None else "" else: devicestr = '/cpu:0' with tf.device(devicestr): body = res_net_board( x_input[ind * (batch_size // len(gpu_core)):(ind + 1) * (batch_size // len(gpu_core))], "selectnet", training=training, filters=filters, num_res_layers=num_res_layers ) with tf.variable_scope("policy_head"): policy_head = tf.layers.conv2d(body, 2, 1, padding='SAME') policy_head = tf.contrib.layers.batch_norm( policy_head, center=False, epsilon=1e-5, fused=True, is_training=training, activation_fn=tf.nn.relu ) policy_head = tf.reshape(policy_head, [-1, 9 * 10 * 2]) policy_head = tf.contrib.layers.fully_connected(policy_head, len(labels), activation_fn=None) # 价值头 with tf.variable_scope("value_head"): value_head = tf.layers.conv2d(body, 1, 1, padding='SAME') value_head = tf.contrib.layers.batch_norm( value_head, center=False, epsilon=1e-5, fused=True, is_training=training, activation_fn=tf.nn.relu ) value_head = tf.reshape(value_head, [-1, 9 * 10 * 1]) value_head = tf.contrib.layers.fully_connected(value_head, 256, activation_fn=tf.nn.relu) value_head = tf.contrib.layers.fully_connected(value_head, 1, activation_fn=tf.nn.tanh) value_head_collection.append(value_head) net_unsoftmax = policy_head with tf.variable_scope("Loss"): policy_loss = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits( labels=nextmove[ind * (batch_size // len(gpu_core)): (ind + 1) * (batch_size // len(gpu_core))], logits=net_unsoftmax)) value_loss = tf.losses.mean_squared_error( labels=score[ind * (batch_size // len(gpu_core)):(ind + 1) * (batch_size // len(gpu_core))], predictions=value_head) value_loss = tf.reduce_mean(value_loss) regularizer = tf.contrib.layers.l2_regularizer(scale=1e-5) regular_variables = tf.trainable_variables() l2_loss = tf.contrib.layers.apply_regularization(regularizer, regular_variables) multitarget_loss = value_loss + policy_loss + l2_loss multitarget_loss_collection.append(multitarget_loss) value_loss_collection.append(value_loss) policy_loss_collection.append(policy_loss) net_softmax = tf.nn.softmax(net_unsoftmax) net_softmax_collection.append(net_softmax) correct_prediction = tf.equal(tf.argmax(nextmove, 1), tf.argmax(net_softmax, 1)) with tf.variable_scope("Accuracy"): accuracy_select = tf.reduce_mean(tf.cast(correct_prediction, tf.float32)) accuracy_select_collection.append(accuracy_select) tf.get_variable_scope().reuse_variables() trainable_params = tf.trainable_variables() tp_policy = [i for i in trainable_params if ('value_head' not in i.name)] tp_value = [i for i in trainable_params if ('policy_head' not in i.name)] add_grad_to_list(optimizer_policy, tp_policy, policy_loss, tower_gradients_policy) add_grad_to_list(optimizer_value, tp_value, value_loss, tower_gradients_value) add_grad_to_list(optimizer_multitarg, trainable_params, multitarget_loss, tower_gradients_multitarg) update_ops = tf.get_collection(tf.GraphKeys.UPDATE_OPS) with tf.control_dependencies(update_ops): train_op_policy = get_op_mul(tower_gradients_policy, optimizer_policy, global_step) train_op_value = get_op_mul(tower_gradients_value, optimizer_value, global_step) train_op_multitarg = get_op_mul(tower_gradients_multitarg, optimizer_multitarg, global_step) net_softmax = merge(net_softmax_collection) value_head = merge(value_head_collection) multitarget_loss = reduce_mean(multitarget_loss_collection) value_loss = reduce_mean(value_loss_collection) policy_loss = reduce_mean(policy_loss_collection) accuracy_select = reduce_mean(accuracy_select_collection) with graph.as_default(): config = tf.ConfigProto() config.gpu_options.allow_growth = True config.allow_soft_placement = True sess = tf.Session(config=config) if model_name is not None: with graph.as_default(): saver = tf.train.Saver(var_list=tf.global_variables()) saver.restore(sess, model_name) else: with graph.as_default(): sess.run(tf.global_variables_initializer()) return (sess, graph), ((x_input, training), (net_softmax, value_head)) 3.实现MCTS

AlphaZero利用MCTS来自博弈生成棋局,MCTS搜索原理简述如下:

每次模拟通过选择具有最大行动价值Q的边加上取决于所存储的先验概率P和该边的访问计数N(每次访问都被增加一次)的上限置信区间U来遍历树;

展开叶子节点,通过神经网络来评估局面s,向量P的值存储在叶子结点扩展的边上;

更新行动价值Q等于在该行动下的子树中的所有评估值V的均值;

一旦MCTS搜索完成,返回局面s下的落子概率π。

def softmax(x): probs = np.exp(x - np.max(x)) probs /= np.sum(probs) return probs class TreeNode(object): """A node in the MCTS tree. Each node keeps track of its own value Q, prior probability P, and its visit-count-adjusted prior score u. """ def __init__(self, parent, prior_p, noise=False): self._parent = parent self._children = {} # a map from action to TreeNode self._n_visits = 0 self._Q = 0 self._u = 0 self._P = prior_p self.virtual_loss = 0 self.noise = noise def expand(self, action_priors): """Expand tree by creating new children. action_priors: a list of tuples of actions and their prior probability according to the policy function. """ # dirichlet noise should be applied when every select action if False and self.noise is True and self._parent is None: noise_d = np.random.dirichlet([0.3] * len(action_priors)) for (action, prob), one_noise in zip(action_priors, noise_d): if action not in self._children: prob = (1 - 0.25) * prob + 0.25 * one_noise self._children[action] = TreeNode(self, prob, noise=self.noise) else: for action, prob in action_priors: if action not in self._children: self._children[action] = TreeNode(self, prob) def select(self, c_puct): """Select action among children that gives maximum action value Q plus bonus u(P). Return: A tuple of (action, next_node) """ if self.noise is False: return max(self._children.items(), key=lambda act_node: act_node[1].get_value(c_puct)) elif self.noise is True and self._parent is not None: return max(self._children.items(), key=lambda act_node: act_node[1].get_value(c_puct)) else: noise_d = np.random.dirichlet([0.3] * len(self._children)) return max(list(zip(noise_d, self._children.items())), key=lambda act_node: act_node[1][1].get_value(c_puct, noise_p=act_node[0]))[1] def update(self, leaf_value): """Update node values from leaf evaluation. leaf_value: the value of subtree evaluation from the current player's perspective. """ # Count visit. self._n_visits += 1 # Update Q, a running average of values for all visits. self._Q += 1.0 * (leaf_value - self._Q) / self._n_visits def update_recursive(self, leaf_value): """Like a call to update(), but applied recursively for all ancestors. """ # If it is not root, this node's parent should be updated first. if self._parent: self._parent.update_recursive(-leaf_value) self.update(leaf_value) def get_value(self, c_puct, noise_p=None): """Calculate and return the value for this node. It is a combination of leaf evaluations Q, and this node's prior adjusted for its visit count, u. c_puct: a number in (0, inf) controlling the relative impact of value Q, and prior probability P, on this node's score. """ if noise_p is None: self._u = (c_puct * self._P * np.sqrt(self._parent._n_visits) / (1 + self._n_visits)) return self._Q + self._u + self.virtual_loss else: self._u = (c_puct * (self._P * 0.75 + noise_p * 0.25) * np.sqrt(self._parent._n_visits) / (1 + self._n_visits)) return self._Q + self._u + self.virtual_loss def is_leaf(self): """Check if leaf node (i.e. no nodes below this have been expanded).""" return self._children == {} def is_root(self): return self._parent is None class MCTS(object): """An implementation of Monte Carlo Tree Search.""" def __init__( self, policy_value_fn, c_puct=5, n_playout=10000, search_threads=32, virtual_loss=3, policy_loop_arg=False, dnoise=False, play=False ): """ policy_value_fn: a function that takes in a board state and outputs a list of (action, probability) tuples and also a score in [-1, 1] (i.e. the expected value of the end game score from the current player's perspective) for the current player. c_puct: a number in (0, inf) that controls how quickly exploration converges to the maximum-value policy. A higher value means relying on the prior more. """ self._root = TreeNode(None, 1.0, noise=dnoise) self._policy = policy_value_fn self._c_puct = c_puct self._n_playout = n_playout self.virtual_loss = virtual_loss self.loop = asyncio.get_event_loop() self.policy_loop_arg = policy_loop_arg self.sem = asyncio.Semaphore(search_threads) self.now_expanding = set() self.select_time = 0 self.policy_time = 0 self.update_time = 0 self.num_proceed = 0 self.dnoise = dnoise self.play = play async def _playout(self, state): """Run a single playout from the root to the leaf, getting a value at the leaf and propagating it back through its parents. State is modified in-place, so a copy must be provided. """ with await self.sem: node = self._root road = [] while 1: while node in self.now_expanding: await asyncio.sleep(1e-4) start = time.time() if node.is_leaf(): break # Greedily select next move. action, node = node.select(self._c_puct) road.append(node) node.virtual_loss -= self.virtual_loss state.do_move(action) self.select_time += (time.time() - start) # at leave node if long check or long catch then cut off the node if state.should_cutoff() and not self.play: # cut off node for one_node in road: one_node.virtual_loss += self.virtual_loss # now at this time, we do not update the entire tree branch, the accuracy loss is supposed to be small # set virtual loss to -inf so that other threads would not # visit the same node again(so the node is cut off) node.virtual_loss = - np.inf self.update_time += (time.time() - start) # however the proceed number still goes up 1 self.num_proceed += 1 return start = time.time() self.now_expanding.add(node) # Evaluate the leaf using a network which outputs a list of # (action, probability) tuples p and also a score v in [-1, 1] # for the current player if self.policy_loop_arg is False: action_probs, leaf_value = await self._policy(state) else: action_probs, leaf_value = await self._policy(state, self.loop) self.policy_time += (time.time() - start) start = time.time() # Check for end of game. end, winner = state.game_end() if not end: node.expand(action_probs) else: # for end state,return the "true" leaf_value if winner == -1: # tie leaf_value = 0.0 else: leaf_value = ( 1.0 if winner == state.get_current_player() else -1.0 ) # Update value and visit count of nodes in this traversal. for one_node in road: one_node.virtual_loss += self.virtual_loss node.update_recursive(-leaf_value) self.now_expanding.remove(node) # node.update_recursive(leaf_value) self.update_time += (time.time() - start) self.num_proceed += 1 def get_move_probs(self, state, temp=1e-3, predict_workers=[], can_apply_dnoise=False, verbose=False, infer_mode=False): """Run all playouts sequentially and return the available actions and their corresponding probabilities. state: the current game state temp: temperature parameter in (0, 1] controls the level of exploration """ if can_apply_dnoise is False: self._root.noise = False coroutine_list = [] for n in range(self._n_playout): state_copy = copy.deepcopy(state) coroutine_list.append(self._playout(state_copy)) coroutine_list += predict_workers self.loop.run_until_complete(asyncio.gather(*coroutine_list)) # calc the move probabilities based on visit counts at the root node act_visits = [(act, node._n_visits) for act, node in self._root._children.items()] acts, visits = zip(*act_visits) act_probs = softmax(1.0 / temp * np.log(np.array(visits) + 1e-10)) if infer_mode: info = [(act, node._n_visits, node._Q, node._P) for act, node in self._root._children.items()] if infer_mode: return acts, act_probs, info else: return acts, act_probs def update_with_move(self, last_move, allow_legacy=True): """Step forward in the tree, keeping everything we already know about the subtree. """ self.num_proceed = 0 if last_move in self._root._children and allow_legacy: self._root = self._root._children[last_move] self._root._parent = None else: self._root = TreeNode(None, 1.0, noise=self.dnoise) def __str__(self): return "MCTS" 4.实现自博弈过程

实现自博弈训练,基于同一个神经网络初始化对弈双方棋手,对弈过程中双方棋手每下一步前均采用MCTS搜索最优下子策略,每次自博弈一局结束后保存棋局。

# Self-play class Game(object): def __init__(self, white, black, verbose=True): self.white = white self.black = black self.verbose = verbose self.gamestate = gameplay.GameState() def play_till_end(self): winner = 'peace' moves = [] peace_round = 0 remain_piece = gameplay.countpiece(self.gamestate.statestr) while True: start_time = time.time() if self.gamestate.move_number % 2 == 0: player_name = 'w' player = self.white else: player_name = 'b' player = self.black move, score = player.make_move(self.gamestate) if move is None: winner = 'b' if player_name == 'w' else 'w' break moves.append(move) total_time = time.time() - start_time logging.info('move {} {} play {} use {:.2f}s'.format( self.gamestate.move_number, player_name, move, total_time,)) game_end, winner_p = self.gamestate.game_end() if game_end: winner = winner_p break remain_piece_round = gameplay.countpiece(self.gamestate.statestr) if remain_piece_round < remain_piece: remain_piece = remain_piece_round peace_round = 0 else: peace_round += 1 if peace_round > conf.non_cap_draw_round: winner = 'peace' break return winner, moves class NetworkPlayGame(Game): def __init__(self, network_w, network_b, **xargs): whiteplayer = players.NetworkPlayer('w', network_w, **xargs) blackplayer = players.NetworkPlayer('b', network_b, **xargs) super(NetworkPlayGame, self).__init__(whiteplayer, blackplayer) class ContinousNetworkPlayGames(object): def __init__( self, network_w=None, network_b=None, white_name='net', black_name='net', random_switch=True, recoard_game=True, recoard_dir='data/distributed/', play_times=np.inf, distributed_dir='data/prepare_weight', **xargs ): self.network_w = network_w self.network_b = network_b self.white_name = white_name self.black_name = black_name self.random_switch = random_switch self.play_times = play_times self.recoard_game = recoard_game self.recoard_dir = recoard_dir self.xargs = xargs # self.distributed_server = distributed_server self.distributed_dir = distributed_dir def begin_of_game(self): pass def end_of_game(self, cbf_name, moves, cbfile, training_dt, epoch): pass def play(self, data_url=None, epoch=0): num = 0 while num < self.play_times: time_one_game_start = time.time() num += 1 self.begin_of_game(epoch) if self.random_switch and random.random() < 0.5: self.network_w, self.network_b = self.network_b, self.network_w self.white_name, self.black_name = self.black_name, self.white_name network_play_game = NetworkPlayGame(self.network_w, self.network_b, **self.xargs) winner, moves = network_play_game.play_till_end() stamp = time.strftime('%Y-%m-%d_%H-%M-%S', time.localtime(time.time())) date = time.strftime('%Y-%m-%d', time.localtime(time.time())) cbfile = cbf.CBF( black=self.black_name, red=self.white_name, date=date, site='北京', name='noname', datemodify=date, redteam=self.white_name, blackteam=self.black_name, round='第一轮' ) cbfile.receive_moves(moves) randstamp = random.randint(0, 1000) cbffilename = '{}_{}_mcts-mcts_{}-{}_{}.cbf'.format( stamp, randstamp, self.white_name, self.black_name, winner) if not os.path.exists(self.recoard_dir): os.makedirs(self.recoard_dir) cbf_name = os.path.join(self.recoard_dir, cbffilename) cbfile.dump(cbf_name) training_dt = time.time() - time_one_game_start self.end_of_game(cbffilename, moves, cbfile, training_dt, epoch) class DistributedSelfPlayGames(ContinousNetworkPlayGames): def __init__(self, gpu_num=0, auto_update=True, mode='train', **kwargs): self.gpu_num = gpu_num self.auto_update = auto_update self.model_name_in_use = None # for tracking latest weight self.mode = mode super(DistributedSelfPlayGames, self).__init__(**kwargs) def begin_of_game(self, epoch): """ when self playing, init network player using the latest weights """ if not self.auto_update: return latest_model_name = get_latest_weight_path() logging.info('------------------ restoring model {}'.format(latest_model_name)) model_path = os.path.join(self.distributed_dir, latest_model_name) if self.network_w is None or self.network_b is None: network = get_model_resnet( model_path, create_uci_labels(), gpu_core=[self.gpu_num], filters=conf.network_filters, num_res_layers=conf.network_layers ) self.network_w = network self.network_b = network self.model_name_in_use = model_path else: if model_path != self.model_name_in_use: (sess, graph), ((X, training), (net_softmax, value_head)) = self.network_w with graph.as_default(): saver = tf.train.Saver(var_list=tf.global_variables()) saver.restore(sess, model_path) self.model_name_in_use = model_path def end_of_game(self, cbf_name, moves, cbfile, training_dt, epoch): played_games = len(os.listdir(conf.distributed_datadir)) if self.mode == 'train': logging.info('------------------ epoch {}: trained {} games, this game used {}s'.format( epoch, played_games, round(training_dt, 6), )) else: logging.info('------------------ infer {} games, this game used {}s'.format( played_games, round(training_dt, 6), )) def self_play_gpu(gpu_num=0, play_times=np.inf, mode='train', n_playout=50, save_dir=conf.distributed_datadir): logging.info('------------------ self play start') cn = DistributedSelfPlayGames( gpu_num=gpu_num, n_playout=n_playout, recoard_dir=save_dir, c_puct=conf.c_puct, distributed_dir=conf.distributed_server_weight_dir, dnoise=True, is_selfplay=True, play_times=play_times, mode=mode, ) cn.play(epoch=0) logging.info('------------------ self play done') 5.进行训练参数配置

配置一次训练过程中自博弈次数、训练结束后采用训练出的模型进行推理局数、训练batch_size。为简化训练过程参数均较小。

config = { "n_playout": 100, # MCTS搜索次数,推荐(10-1600),数字越小程序运行越快,数字越大算法搜索准确度越高 "self_play_games": 2, # 自博弈对局数, 推荐(5-10000),注意数字较大时可能会超过资源免费体验时长 "infer_games": 1, # 推理对局数 "gpu_num": 0, # 使用的GPU卡号 } 6.开始自博弈训练,结束后更新模型

运行过程中会打印出双方下棋动作

self_play_gpu(config['gpu_num'], config['self_play_games'], mode='train', n_playout=config['n_playout']) # model update model_update(gpu_num=config['gpu_num']) 7.可视化对局

(等待第六步运行结束后再运行此步)

在此将加载模型进行博弈一次,可视化对局过程,最后显示对弈结束时的棋面

self_play_gpu(config['gpu_num'], config['infer_games'], mode='infer', n_playout=config['n_playout'], save_dir='./infer_res')

加载对局文件显示双方所有动作,动作为棋盘上起点坐标至终点坐标,具体坐标定义见后面的棋盘。

%reload_ext autoreload %autoreload 2 %matplotlib inline from matplotlib import pyplot as plt from cchess_training.cchess_zero.gameboard import * from PIL import Image import imageio gameplay_path = './infer_res' while not os.path.exists(gameplay_path) or len(os.listdir(gameplay_path)) == 0: time.sleep(5) logging.info('第6步未运行结束,建议停止运行,重新逐步运行') gameplays = os.listdir(gameplay_path) fullpath = '{}/{}'.format(gameplay_path, random.choice(gameplays)) moves = cbf.cbf2move(fullpath) fname = fullpath.split('/')[-1] print(moves)

['b2e2', 'h7h5', 'b0c2', 'b7e7', 'h0g2', 'h5i5', 'h2i2', 'a9a7', 'i0i1', 'i5g5', 'c2e1', 'h9i7', 'c3c4', 'e6e5', 'a0b0', 'e7g7', 'g3g4', 'c6c5', 'i1h1', 'i7g8', 'e2e5', 'i9i8', 'g2f4', 'b9c7', 'g4g5', 'c7b9', 'f4e6', 'a7e7', 'e6g7', 'e7e5', 'b0b9', 'c5c4', 'i2e2', 'c4d4', 'e2e5', 'i8i7', 'b9c9', 'i7g7', 'h1h8', 'i6i5', 'a3a4', 'g7a7', 'i3i4', 'i5i4', 'g5g6', 'i4i3', 'h8g8', 'i3h3', 'g8f8', 'a7a8', 'f8f9', 'e9f9', 'g6f6', 'a6a5', 'a4a5', 'd4e4', 'e3e4', 'g9e7', 'g0e2', 'a8g8', 'c9d9', 'g8g1']

可视化对弈过程

import cv2 from IPython.display import clear_output, Image, display state = gameplay.GameState() statestr = 'RNBAKABNR/9/1C5C1/P1P1P1P1P/9/9/p1p1p1p1p/1c5c1/9/rnbakabnr' for move in moves: clear_output(wait=True) statestr = GameBoard.sim_do_action(move, statestr) img = board_visualizer.get_board_img(statestr) img_show = cv2.cvtColor(img, cv2.COLOR_RGBA2BGR) display(Image(data=cv2.imencode('.jpg', img_show)[1])) time.sleep(0.5)

显示终局棋面

plt.figure(figsize=(8,8)) plt.imshow(board_visualizer.get_board_img(statestr))

至此,本案例结束,如果想要完整地训练一个中国象棋AlphaZero AI,可在AI Gallery中订阅《CChess中国象棋》算法,并在ModelArts中进行训练。

8. 作业 请你调整步骤5中的训练参数,重新训练一个模型,使它在游戏中获得更好的表现



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3