Predictably Irrational -FX and Machine Learning-

Sharing what I learn about automated FX trading, machine learning, and whatever else I happen to study.

Kaggle -YouTube-8M Challenge- Source Code Walkthrough

kapipara here.

 

Today, as a first step toward modifying the sample code, I'll walk through its source.

Note: on review, the layout of this post came out broken, so I'll clean it up at some point.

 

train.py

The code below is arranged in execution order.

Whenever a function or class is called, its definition is quoted immediately afterwards.

 

if __name__ == "__main__":
  app.run()

def main(unused_argv):
  # Load the environment.
  env = json.loads(os.environ.get("TF_CONFIG", "{}"))

  # Load the cluster data from the environment.
  cluster_data = env.get("cluster", None)
  cluster = tf.train.ClusterSpec(cluster_data) if cluster_data else None

  # Load the task data from the environment.
  task_data = env.get("task", None) or {"type": "master", "index": 0}
  task = type("TaskSpec", (object,), task_data)

  # Logging the version.
  logging.set_verbosity(tf.logging.INFO)
  logging.info("%s: Tensorflow version: %s.",
               task_as_string(task), tf.__version__)
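
By the way, the line task = type("TaskSpec", (object,), task_data) is easy to gloss over: it builds a throwaway class whose class attributes are the fields of the task dict, so task.type and task.index can be read as if it were a real TaskSpec object. A quick stand-alone check (the TF_CONFIG JSON below is a made-up configuration, not something from the sample code):

import json
import os

# Hypothetical cluster config; when TF_CONFIG is absent, the code falls back
# to a single "master" task with index 0.
os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {"master": ["host0:2222"], "ps": ["host1:2222"]},
    "task": {"type": "master", "index": 0}})

env = json.loads(os.environ.get("TF_CONFIG", "{}"))
task_data = env.get("task", None) or {"type": "master", "index": 0}
task = type("TaskSpec", (object,), task_data)
print(task.type, task.index)  # -> master 0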

  # Dispatch to a master, a worker, or a parameter server.
  if not cluster or task.type == "master" or task.type == "worker":
    model = find_class_by_name(FLAGS.model,
                               [frame_level_models, video_level_models])()
def find_class_by_name(name, modules):
  """Searches the provided modules for the named class and returns it."""
  modules = [getattr(module, name, None) for module in modules]
  return next(a for a in modules if a)
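
In other words, the string given to --model is looked up by name in the listed modules, the first match wins, and the trailing () instantiates it. A tiny stand-alone sketch of the same mechanism, using standard-library modules instead of the model modules so it runs by itself:

import cmath
import math

def find_class_by_name(name, modules):
  # Same helper as above, repeated here so this snippet is self-contained.
  modules = [getattr(module, name, None) for module in modules]
  return next(a for a in modules if a)

# With FLAGS.model, this is how a name such as "LogisticModel" would be
# resolved against frame_level_models and video_level_models.
sqrt_fn = find_class_by_name("sqrt", [math, cmath])
print(sqrt_fn(4.0))  # -> 2.0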
 

    # back in main():
    reader = get_reader()

def get_reader():
  # Convert feature_names and feature_sizes to lists of values.
  feature_names, feature_sizes = utils.GetListOfFeatureNamesAndSizes(
      FLAGS.feature_names, FLAGS.feature_sizes)
  # ↓ drilling further into this call
def GetListOfFeatureNamesAndSizes(feature_names, feature_sizes):
  """Extract the list of feature names and the dimensionality of each feature
  from string of comma separated values.

  Args:
    feature_names: string containing comma separated list of feature names
    feature_sizes: string containing comma separated list of feature sizes

  Returns:
    List of the feature names and list of the dimensionality of each feature.
    Elements in the first/second list are strings/integers.
  """
  list_of_feature_names = [
      feature_names.strip() for feature_names in feature_names.split(',')]
  list_of_feature_sizes = [
      int(feature_sizes) for feature_sizes in feature_sizes.split(',')]
  if len(list_of_feature_names) != len(list_of_feature_sizes):
    logging.error("length of the feature names (=" +
                  str(len(list_of_feature_names)) + ") != length of feature "
                  "sizes (=" + str(len(list_of_feature_sizes)) + ")")

  return list_of_feature_names, list_of_feature_sizes
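
A quick check of what this returns. The frame-level features are usually passed as comma-separated flags, for example --feature_names="rgb,audio" and --feature_sizes="1024,128" (adjust to whatever flags you actually use):

names, sizes = GetListOfFeatureNamesAndSizes("rgb, audio", "1024,128")
print(names)  # -> ['rgb', 'audio']   (whitespace around each name is stripped)
print(sizes)  # -> [1024, 128]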

  if FLAGS.frame_features:
    reader = readers.YT8MFrameFeatureReader(
        feature_names=feature_names, feature_sizes=feature_sizes)
    # ↓ drilling further into this call

class YT8MFrameFeatureReader(BaseReader):
  """Reads TFRecords of SequenceExamples.

  The TFRecords must contain SequenceExamples with the sparse int64 'labels'
  context feature and a fixed length byte-quantized feature vector, obtained
  from the features in 'feature_names'. The quantized features will be mapped
  back into a range between min_quantized_value and max_quantized_value.
  """

  def __init__(self,
               num_classes=4716,
               feature_sizes=[1024],
               feature_names=["inc3"],
               max_frames=300):
    """Construct a YT8MFrameFeatureReader.

    Args:
      num_classes: a positive integer for the number of classes.
      feature_sizes: positive integer(s) for the feature dimensions as a list.
      feature_names: the feature name(s) in the tensorflow record as a list.
      max_frames: the maximum number of frames to process.
    """

    assert len(feature_names) == len(feature_sizes), \
        "length of feature_names (={}) != length of feature_sizes (={})".format(
            len(feature_names), len(feature_sizes))

    self.num_classes = num_classes
    self.feature_sizes = feature_sizes
    self.feature_names = feature_names
    self.max_frames = max_frames

  def get_video_matrix(self,
                       features,
                       feature_size,
                       max_frames,
                       max_quantized_value,
                       min_quantized_value):
    """Decodes features from an input string and quantizes it.

    Args:
      features: raw feature values
      feature_size: length of each frame feature vector
      max_frames: number of frames (rows) in the output feature_matrix
      max_quantized_value: the maximum of the quantized value.
      min_quantized_value: the minimum of the quantized value.

    Returns:
      feature_matrix: matrix of all frame-features
      num_frames: number of frames in the sequence
    """
    decoded_features = tf.reshape(
        tf.cast(tf.decode_raw(features, tf.uint8), tf.float32),
        [-1, feature_size])

    num_frames = tf.minimum(tf.shape(decoded_features)[0], max_frames)
    feature_matrix = utils.Dequantize(decoded_features,
                                      max_quantized_value,
                                      min_quantized_value)
    feature_matrix = resize_axis(feature_matrix, 0, max_frames)
    return feature_matrix, num_frames
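
utils.Dequantize and resize_axis are not quoted here. As I understand it, resize_axis pads or truncates the frame axis to exactly max_frames rows, and Dequantize maps the byte-quantized uint8 values back to floats. A minimal sketch of the dequantization step, assuming the usual starter-code constants (check utils.py for the real thing):

def Dequantize(feat_vector, max_quantized_value=2, min_quantized_value=-2):
  """Maps uint8 values in [0, 255] back to floats in [min, max]."""
  assert max_quantized_value > min_quantized_value
  quantized_range = max_quantized_value - min_quantized_value
  scalar = quantized_range / 255.0
  bias = (quantized_range / 512.0) + min_quantized_value
  return feat_vector * scalar + bias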

  def prepare_reader(self,
                     filename_queue,
                     max_quantized_value=2,
                     min_quantized_value=-2):
    """Creates a single reader thread for YouTube8M SequenceExamples.

    Args:
      filename_queue: A tensorflow queue of filename locations.
      max_quantized_value: the maximum of the quantized value.
      min_quantized_value: the minimum of the quantized value.

    Returns:
      A tuple of video indexes, video features, labels, and padding data.
    """
    reader = tf.TFRecordReader()
    _, serialized_example = reader.read(filename_queue)

    return self.prepare_serialized_examples(serialized_example,
                                            max_quantized_value,
                                            min_quantized_value)
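
prepare_reader expects a queue of TFRecord file names. In the real pipeline that queue is built inside get_input_data_tensors (listed at the end of this post but not quoted). A rough sketch of how such a queue is typically wired up in TF 1.x, with a made-up file pattern:

files = tf.gfile.Glob("features/train*.tfrecord")  # hypothetical pattern
filename_queue = tf.train.string_input_producer(files, shuffle=True)
example_tensors = reader.prepare_reader(filename_queue)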

  def prepare_serialized_examples(self, serialized_example,
                                  max_quantized_value=2,
                                  min_quantized_value=-2):

    contexts, features = tf.parse_single_sequence_example(
        serialized_example,
        context_features={"video_id": tf.FixedLenFeature([], tf.string),
                          "labels": tf.VarLenFeature(tf.int64)},
        sequence_features={
            feature_name: tf.FixedLenSequenceFeature([], dtype=tf.string)
            for feature_name in self.feature_names
        })

    # read ground truth labels
    labels = (tf.cast(
        tf.sparse_to_dense(contexts["labels"].values, (self.num_classes,), 1,
                           validate_indices=False),
        tf.bool))

    # loads (potentially) different types of features and concatenates them
    num_features = len(self.feature_names)
    assert num_features > 0, "No feature selected: feature_names is empty!"
    assert len(self.feature_names) == len(self.feature_sizes), \
        "length of feature_names (={}) != length of feature_sizes (={})".format(
            len(self.feature_names), len(self.feature_sizes))

    num_frames = -1  # the number of frames in the video
    feature_matrices = [None] * num_features  # an array of different features
    for feature_index in range(num_features):
      feature_matrix, num_frames_in_this_feature = self.get_video_matrix(
          features[self.feature_names[feature_index]],
          self.feature_sizes[feature_index],
          self.max_frames,
          max_quantized_value,
          min_quantized_value)
      if num_frames == -1:
        num_frames = num_frames_in_this_feature
      else:
        tf.assert_equal(num_frames, num_frames_in_this_feature)

      feature_matrices[feature_index] = feature_matrix

    # cap the number of frames at self.max_frames
    num_frames = tf.minimum(num_frames, self.max_frames)

    # concatenate different features
    video_matrix = tf.concat(feature_matrices, 1)

    # convert to batch format.
    # TODO: Do proper batch reads to remove the IO bottleneck.
    batch_video_ids = tf.expand_dims(contexts["video_id"], 0)
    batch_video_matrix = tf.expand_dims(video_matrix, 0)
    batch_labels = tf.expand_dims(labels, 0)
    batch_frames = tf.expand_dims(num_frames, 0)

    return batch_video_ids, batch_video_matrix, batch_labels, batch_frames
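
One line worth a second look: tf.sparse_to_dense scatters the sparse label indices into a dense vector of num_classes (4716) zeros and ones, and the cast turns that into a boolean multi-hot target. A toy NumPy illustration with a tiny vocabulary (the numbers are made up):

import numpy as np

num_classes = 8                    # toy vocabulary; the real one is 4716
label_indices = np.array([1, 5])   # hypothetical sparse labels for one video
multi_hot = np.zeros(num_classes, dtype=bool)
multi_hot[label_indices] = True
print(multi_hot)  # -> [False  True False False False  True False False]
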
  # (continuing get_reader)
  else:
    reader = readers.YT8MAggregatedFeatureReader(
        feature_names=feature_names, feature_sizes=feature_sizes)

  return reader

    # back in main():
    model_exporter = export_model.ModelExporter(
        frame_features=FLAGS.frame_features,
        model=model,
        reader=reader)

class ModelExporter(object):

  def __init__(self, frame_features, model, reader):
    self.frame_features = frame_features
    self.model = model
    self.reader = reader

    with tf.Graph().as_default() as graph:
      self.inputs, self.outputs = self.build_inputs_and_outputs()
      self.graph = graph
      self.saver = tf.train.Saver(tf.trainable_variables(), sharded=True)

  def export_model(self, model_dir, global_step_val, last_checkpoint):
    """Exports the model so that it can be used for batch predictions."""

    with self.graph.as_default():
      with tf.Session() as session:
        session.run(tf.global_variables_initializer())
        self.saver.restore(session, last_checkpoint)

        signature = signature_def_utils.build_signature_def(
            inputs=self.inputs,
            outputs=self.outputs,
            method_name=signature_constants.PREDICT_METHOD_NAME)

        signature_map = {
            signature_constants.DEFAULT_SERVING_SIGNATURE_DEF_KEY: signature}

        model_builder = saved_model_builder.SavedModelBuilder(model_dir)
        model_builder.add_meta_graph_and_variables(
            session,
            tags=[tag_constants.SERVING],
            signature_def_map=signature_map,
            clear_devices=True)
        model_builder.save()

  def build_inputs_and_outputs(self):
    if self.frame_features:
      serialized_examples = tf.placeholder(tf.string, shape=(None,))

      fn = lambda x: self.build_prediction_graph(x)
      video_id_output, top_indices_output, top_predictions_output = (
          tf.map_fn(fn, serialized_examples,
                    dtype=(tf.string, tf.int32, tf.float32)))

    else:
      serialized_examples = tf.placeholder(tf.string, shape=(None,))

      video_id_output, top_indices_output, top_predictions_output = (
          self.build_prediction_graph(serialized_examples))

    inputs = {"example_bytes":
              saved_model_utils.build_tensor_info(serialized_examples)}

    outputs = {
        "video_id": saved_model_utils.build_tensor_info(video_id_output),
        "class_indexes": saved_model_utils.build_tensor_info(top_indices_output),
        "predictions": saved_model_utils.build_tensor_info(top_predictions_output)}

    return inputs, outputs

  def build_prediction_graph(self, serialized_examples):
    video_id, model_input_raw, labels_batch, num_frames = (
        self.reader.prepare_serialized_examples(serialized_examples))

    feature_dim = len(model_input_raw.get_shape()) - 1
    model_input = tf.nn.l2_normalize(model_input_raw, feature_dim)

    with tf.variable_scope("tower"):
      result = self.model.create_model(
          model_input,
          num_frames=num_frames,
          vocab_size=self.reader.num_classes,
          labels=labels_batch,
          is_training=False)

      for variable in slim.get_model_variables():
        tf.summary.histogram(variable.op.name, variable)

      predictions = result["predictions"]

      top_predictions, top_indices = tf.nn.top_k(predictions,
                                                 _TOP_PREDICTIONS_IN_OUTPUT)
    return video_id, top_indices, top_predictions
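
The exported signature uses the input key "example_bytes" and the output keys "video_id", "class_indexes" and "predictions". For reference, a minimal sketch of how such a SavedModel could be loaded back for prediction in TF 1.x (the export path and the example bytes are placeholders):

import tensorflow as tf

export_dir = "train_dir/export/step_1000"  # hypothetical export directory
serialized_examples = []  # fill with serialized tf.SequenceExample byte strings

with tf.Session(graph=tf.Graph()) as sess:
  meta_graph = tf.saved_model.loader.load(
      sess, [tf.saved_model.tag_constants.SERVING], export_dir)
  signature = meta_graph.signature_def[
      tf.saved_model.signature_constants.DEFAULT_SERVING_SIGNATURE_DEF_KEY]
  input_name = signature.inputs["example_bytes"].name
  output_name = signature.outputs["predictions"].name
  predictions = sess.run(output_name,
                         feed_dict={input_name: serialized_examples})
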
    # back in main():
    Trainer(cluster, task, FLAGS.train_dir, model, reader, model_exporter,
            FLAGS.log_device_placement, FLAGS.max_steps,
            FLAGS.export_model_steps).run(start_new_model=FLAGS.start_new_model)

 


class Trainer(object):
  """A Trainer to train a Tensorflow graph."""

  def __init__(self, cluster, task, train_dir, model, reader, model_exporter,
               log_device_placement=True, max_steps=None,
               export_model_steps=1000):
    """Creates a Trainer.

    Args:
      cluster: A tf.train.ClusterSpec if the execution is distributed.
        None otherwise.
      task: A TaskSpec describing the job type and the task index.
    """

    self.cluster = cluster
    self.task = task
    self.is_master = (task.type == "master" and task.index == 0)
    self.train_dir = train_dir
    self.config = tf.ConfigProto(
        allow_soft_placement=True, log_device_placement=log_device_placement)
    self.model = model
    self.reader = reader
    self.model_exporter = model_exporter
    self.max_steps = max_steps
    self.max_steps_reached = False
    self.export_model_steps = export_model_steps
    self.last_model_export_step = 0

    # if self.is_master and self.task.index > 0:
    #   raise StandardError("%s: Only one replica of master expected",
    #                       task_as_string(self.task))

  def run(self, start_new_model=False):
    """Performs training on the currently defined Tensorflow graph.

    Returns:
      A tuple of the training Hit@1 and the training PERR.
    """
    if self.is_master and start_new_model:
      self.remove_training_directory(self.train_dir)

    target, device_fn = self.start_server_if_distributed()

    meta_filename = self.get_meta_filename(start_new_model, self.train_dir)

    with tf.Graph().as_default() as graph:

      if meta_filename:
        saver = self.recover_model(meta_filename)

      with tf.device(device_fn):
        if not meta_filename:
          saver = self.build_model(self.model, self.reader)

        global_step = tf.get_collection("global_step")[0]
        loss = tf.get_collection("loss")[0]
        predictions = tf.get_collection("predictions")[0]
        labels = tf.get_collection("labels")[0]
        train_op = tf.get_collection("train_op")[0]
        init_op = tf.global_variables_initializer()

    sv = tf.train.Supervisor(
        graph,
        logdir=self.train_dir,
        init_op=init_op,
        is_chief=self.is_master,
        global_step=global_step,
        save_model_secs=15 * 60,
        save_summaries_secs=120,
        saver=saver)

    logging.info("%s: Starting managed session.", task_as_string(self.task))
    with sv.managed_session(target, config=self.config) as sess:
      try:
        logging.info("%s: Entering training loop.", task_as_string(self.task))
        while (not sv.should_stop()) and (not self.max_steps_reached):
          batch_start_time = time.time()
          _, global_step_val, loss_val, predictions_val, labels_val = sess.run(
              [train_op, global_step, loss, predictions, labels])
          seconds_per_batch = time.time() - batch_start_time
          examples_per_second = labels_val.shape[0] / seconds_per_batch

          if self.max_steps and self.max_steps <= global_step_val:
            self.max_steps_reached = True

          if self.is_master and global_step_val % 10 == 0 and self.train_dir:
            eval_start_time = time.time()
            hit_at_one = eval_util.calculate_hit_at_one(predictions_val,
                                                        labels_val)
            perr = eval_util.calculate_precision_at_equal_recall_rate(
                predictions_val, labels_val)
            gap = eval_util.calculate_gap(predictions_val, labels_val)
            eval_end_time = time.time()
            eval_time = eval_end_time - eval_start_time

            logging.info("training step " + str(global_step_val) +
                         " | Loss: " + ("%.2f" % loss_val) +
                         " Examples/sec: " + ("%.2f" % examples_per_second) +
                         " | Hit@1: " + ("%.2f" % hit_at_one) +
                         " PERR: " + ("%.2f" % perr) +
                         " GAP: " + ("%.2f" % gap))

            sv.summary_writer.add_summary(
                utils.MakeSummary("model/Training_Hit@1", hit_at_one),
                global_step_val)
            sv.summary_writer.add_summary(
                utils.MakeSummary("model/Training_Perr", perr), global_step_val)
            sv.summary_writer.add_summary(
                utils.MakeSummary("model/Training_GAP", gap), global_step_val)
            sv.summary_writer.add_summary(
                utils.MakeSummary("global_step/Examples/Second",
                                  examples_per_second), global_step_val)
            sv.summary_writer.flush()

            # Exporting the model every x steps
            time_to_export = ((self.last_model_export_step == 0) or
                              (global_step_val - self.last_model_export_step
                               >= self.export_model_steps))

            if self.is_master and time_to_export:
              self.export_model(global_step_val, sv.saver, sv.save_path, sess)
              self.last_model_export_step = global_step_val
          else:
            logging.info("training step " + str(global_step_val) +
                         " | Loss: " + ("%.2f" % loss_val) +
                         " Examples/sec: " + ("%.2f" % examples_per_second))
      except tf.errors.OutOfRangeError:
        logging.info("%s: Done training -- epoch limit reached.",
                     task_as_string(self.task))

    logging.info("%s: Exited training loop.", task_as_string(self.task))
    sv.Stop()
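
The Hit@1 / PERR / GAP values logged every 10 steps come from eval_util, which is not quoted in this post. For intuition, Hit@1 is just the fraction of examples whose single highest-scoring prediction is one of the true labels. A minimal NumPy sketch of that idea (not the exact eval_util code):

import numpy as np

def hit_at_one(predictions, actuals):
  """Fraction of rows whose argmax prediction is a positive label."""
  top = np.argmax(predictions, axis=1)
  return np.mean(actuals[np.arange(actuals.shape[0]), top])

preds = np.array([[0.1, 0.7, 0.2],
                  [0.6, 0.3, 0.1]])
labels = np.array([[0, 1, 1],
                   [0, 0, 1]])
print(hit_at_one(preds, labels))  # -> 0.5 (first row hits, second misses)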

  def export_model(self, global_step_val, saver, save_path, session):

    # If the model has already been exported at this step, return.
    if global_step_val == self.last_model_export_step:
      return

    last_checkpoint = saver.save(session, save_path, global_step_val)

    model_dir = "{0}/export/step_{1}".format(self.train_dir, global_step_val)
    logging.info("%s: Exporting the model at step %s to %s.",
                 task_as_string(self.task), global_step_val, model_dir)

    self.model_exporter.export_model(
        model_dir=model_dir,
        global_step_val=global_step_val,
        last_checkpoint=last_checkpoint)

  def start_server_if_distributed(self):
    """Starts a server if the execution is distributed."""

    if self.cluster:
      logging.info("%s: Starting trainer within cluster %s.",
                   task_as_string(self.task), self.cluster.as_dict())
      server = start_server(self.cluster, self.task)
      target = server.target
      device_fn = tf.train.replica_device_setter(
          ps_device="/job:ps",
          worker_device="/job:%s/task:%d" % (self.task.type, self.task.index),
          cluster=self.cluster)
    else:
      target = ""
      device_fn = ""
    return (target, device_fn)
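
start_server is one of the train.py functions listed at the end of this post and is not quoted here; conceptually it just stands up the gRPC server for the current task (the real function also validates task.type and task.index first). A minimal sketch under that assumption:

def start_server(cluster, task):
  """Sketch: create and start the gRPC server for this task in the cluster."""
  return tf.train.Server(cluster,
                         protocol="grpc",
                         job_name=task.type,
                         task_index=task.index)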

  def remove_training_directory(self, train_dir):
    """Removes the training directory."""
    try:
      logging.info(
          "%s: Removing existing train directory.",
          task_as_string(self.task))
      gfile.DeleteRecursively(train_dir)
    except:
      logging.error(
          "%s: Failed to delete directory " + train_dir +
          " when starting a new model. Please delete it manually and" +
          " try again.", task_as_string(self.task))

  def get_meta_filename(self, start_new_model, train_dir):
    if start_new_model:
      logging.info("%s: Flag 'start_new_model' is set. Building a new model.",
                   task_as_string(self.task))
      return None

    latest_checkpoint = tf.train.latest_checkpoint(train_dir)
    if not latest_checkpoint:
      logging.info("%s: No checkpoint file found. Building a new model.",
                   task_as_string(self.task))
      return None

    meta_filename = latest_checkpoint + ".meta"
    if not gfile.Exists(meta_filename):
      logging.info("%s: No meta graph file found. Building a new model.",
                   task_as_string(self.task))
      return None
    else:
      return meta_filename

  def recover_model(self, meta_filename):
    logging.info("%s: Restoring from meta graph file %s",
                 task_as_string(self.task), meta_filename)
    return tf.train.import_meta_graph(meta_filename)

  def build_model(self, model, reader):
    """Find the model and build the graph."""

    label_loss_fn = find_class_by_name(FLAGS.label_loss, [losses])()
    optimizer_class = find_class_by_name(FLAGS.optimizer, [tf.train])

    build_graph(reader=reader,
                model=model,
                optimizer_class=optimizer_class,
                clip_gradient_norm=FLAGS.clip_gradient_norm,
                train_data_pattern=FLAGS.train_data_pattern,
                label_loss_fn=label_loss_fn,
                base_learning_rate=FLAGS.base_learning_rate,
                learning_rate_decay=FLAGS.learning_rate_decay,
                learning_rate_decay_examples=FLAGS.learning_rate_decay_examples,
                regularization_penalty=FLAGS.regularization_penalty,
                num_readers=FLAGS.num_readers,
                batch_size=FLAGS.batch_size,
                num_epochs=FLAGS.num_epochs)

    return tf.train.Saver(max_to_keep=0, keep_checkpoint_every_n_hours=0.25)
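
build_graph is another train.py function that is not quoted in this post, but notice how run() recovers its tensors afterwards: build_graph publishes them into named graph collections, and run() fetches them back with tf.get_collection. A tiny self-contained illustration of that pattern (the tensors here are dummies, not the real training graph):

import tensorflow as tf

with tf.Graph().as_default():
  # What build_graph is expected to do (among other things): register
  # the tensors the Trainer needs, under well-known collection names.
  global_step = tf.Variable(0, trainable=False, name="global_step")
  loss = tf.constant(1.23, name="dummy_loss")
  tf.add_to_collection("global_step", global_step)
  tf.add_to_collection("loss", loss)

  # What Trainer.run() does later: pull them back out by collection name.
  print(tf.get_collection("loss")[0])         # -> the dummy_loss tensor
  print(tf.get_collection("global_step")[0])  # -> the global_step variable
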
elif task.type == "ps":
ParameterServer(cluster, task).run()
class ParameterServer(object):
"""A parameter server to serve variables in a distributed execution."""

def __init__(self, cluster, task):
"""Creates a ParameterServer.

Args:
cluster: A tf.train.ClusterSpec if the execution is distributed.
None otherwise.
task: A TaskSpec describing the job type and the task index.
"""

self.cluster = cluster
self.task = task

def run(self):
"""Starts the parameter server."""

logging.info("%s: Starting parameter server within cluster %s.",
task_as_string(self.task), self.cluster.as_dict())
server = start_server(self.cluster, self.task)
server.join()
 else:
raise ValueError("%s: Invalid task_type: %s." %
(task_as_string(task), task.type))


That wraps up the body of main().

train.py defines only the following functions and classes:
  • validate_class_name
  • get_input_data_tensors
  • find_class_by_name
  • build_graph
  • Trainer
  • get_reader
  • ParameterServer
  • start_server
  • task_as_string
  • main
Starting next time, I'll dig into what each of these functions and classes does.

That's all for now.