DeepLearningExamples/MxNet/Classification/RN50v1.5/fit.py
# Copyright 2017-2018 The Apache Software Foundation
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
# -----------------------------------------------------------------------
#
# Copyright (c) 2019, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
""" train fit utility """
import logging
import math
import os
import random
import sys
import time
from itertools import starmap

import dllogger
import horovod.mxnet as hvd
import mxnet as mx
import mxnet.contrib.amp as amp
import numpy as np
from mxnet import autograd as ag
from mxnet import gluon

import data
from benchmarking import BenchmarkingDataIter
from global_metrics import CompositeMeter, MaxMeter, MinMeter, AvgMeter, PercentileMeter


def add_fit_args(parser):
    def int_list(x):
        return list(map(int, x.split(',')))

    def float_list(x):
        return list(map(float, x.split(',')))

    train = parser.add_argument_group('Training')
    train.add_argument('--mode', default='train_val', choices=('train_val', 'train', 'val', 'pred'),
                       help='mode')
    train.add_argument('--seed', type=int, default=None,
                       help='random seed')
    train.add_argument('--gpus', type=int_list, default=[0],
                       help='list of gpus to run, e.g. 0 or 0,2,5')
    train.add_argument('--kv-store', type=str, default='device', choices=('device', 'horovod'),
                       help='key-value store type')
    train.add_argument('--dtype', type=str, default='float16', choices=('float32', 'float16'),
                       help='precision')
    train.add_argument('--amp', action='store_true',
                       help='If enabled, turn on AMP (Automatic Mixed Precision)')
    train.add_argument('--batch-size', type=int, default=192,
                       help='the batch size')
    train.add_argument('--num-epochs', type=int, default=90,
                       help='number of epochs')
    train.add_argument('--run-epochs', type=int, default=-1,
                       help='number of epochs to run in single run')
    train.add_argument('--lr', type=float, default=0.1,
                       help='initial learning rate')
    train.add_argument('--lr-schedule', choices=('multistep', 'cosine'), default='cosine',
                       help='learning rate schedule')
    train.add_argument('--lr-factor', type=float, default=0.256,
                       help='the ratio to reduce lr on each step')
    train.add_argument('--lr-steps', type=float_list, default=[],
                       help='the epochs to reduce the lr, e.g. 30,60')
    train.add_argument('--warmup-epochs', type=int, default=5,
                       help='the epochs to ramp-up lr to scaled large-batch value')
    train.add_argument('--optimizer', type=str, default='sgd',
                       help='the optimizer type')
    train.add_argument('--mom', type=float, default=0.875,
                       help='momentum for sgd')
    train.add_argument('--wd', type=float, default=1 / 32768,
                       help='weight decay for sgd')
    train.add_argument('--label-smoothing', type=float, default=0.1,
                       help='label smoothing factor')
    train.add_argument('--mixup', type=float, default=0,
                       help='alpha parameter for mixup (if 0 then mixup is not applied)')
    train.add_argument('--disp-batches', type=int, default=20,
                       help='show progress for every n batches')
    train.add_argument('--model-prefix', type=str, default='model',
                       help='model checkpoint prefix')
    train.add_argument('--save-frequency', type=int, default=-1,
                       help='frequency of saving model in epochs (--model-prefix must be specified). '
                            'If -1 then save only best model. If 0 then do not save anything.')
    train.add_argument('--begin-epoch', type=int, default=0,
                       help='start the model from an epoch')
    train.add_argument('--load', help='checkpoint to load')
    train.add_argument('--test-io', action='store_true',
                       help='test reading speed without training')
    train.add_argument('--test-io-mode', default='train', choices=('train', 'val'),
                       help='data to test')
    train.add_argument('--log', type=str, default='log.log',
                       help='file where to save the log from the experiment')
    train.add_argument('--dllogger-log', type=str, default='dllogger_log.log',
                       help='file where to save the dllogger log from the experiment')
    train.add_argument('--workspace', type=str, default='./',
                       help='path to directory where results will be stored')
    train.add_argument('--no-metrics', action='store_true',
                       help='do not calculate evaluation metrics (for benchmarking)')
    train.add_argument('--benchmark-iters', type=int, default=None,
                       help='run only benchmark-iters iterations from each epoch')
    return train
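# Hypothetical usage sketch (not part of this file): the training entry point is
# expected to build an argparse.ArgumentParser, register these options and pass the
# parsed namespace around, e.g.:
#   parser = argparse.ArgumentParser(description='ResNet-50 v1.5 training')
#   add_fit_args(parser)
#   args = parser.parse_args(['--mode', 'train', '--gpus', '0,1', '--batch-size', '96'])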


def get_epoch_size(args, kv):
    return math.ceil(args.num_examples / args.batch_size)


def get_lr_scheduler(args):
    def multistep_schedule(x):
        lr = args.lr * (args.lr_factor ** (len(list(filter(lambda step: step <= x, args.lr_steps)))))
        warmup_coeff = min(1, x / args.warmup_epochs)
        return warmup_coeff * lr

    def cosine_schedule(x):
        steps = args.lr_steps
        if not steps or steps[0] > args.warmup_epochs:
            steps = [args.warmup_epochs] + steps
        elif not steps or steps[0] != 0:
            steps = [0] + steps

        if steps[-1] != args.num_epochs:
            steps.append(args.num_epochs)

        if x < args.warmup_epochs:
            return args.lr * x / args.warmup_epochs

        for i, (step, next_step) in enumerate(zip(steps, steps[1:])):
            if next_step > x:
                return args.lr * 0.5 * (1 + math.cos(math.pi * (x - step) / (next_step - step))) * (args.lr_factor ** i)
        return 0

    schedules = {
        'multistep': multistep_schedule,
        'cosine': cosine_schedule,
    }
    return schedules[args.lr_schedule]
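# Worked example with the defaults (lr=0.1, warmup_epochs=5, num_epochs=90, lr_steps=[]):
# the cosine schedule ramps linearly from 0 to 0.1 over the first 5 epochs, then decays
# as 0.1 * 0.5 * (1 + cos(pi * (x - 5) / 85)), reaching 0.05 at epoch 47.5 and
# approaching 0 at epoch 90.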


def load_model(args, model):
    if args.load is None:
        return False
    model.load_parameters(args.load)
    logging.info('Loaded model {}'.format(args.load))
    return True


def save_checkpoint(net, epoch, top1, best_acc, model_prefix, save_frequency, kvstore):
    if model_prefix is None or save_frequency == 0 or ('horovod' in kvstore and hvd.rank() != 0):
        return
    if save_frequency > 0 and (epoch + 1) % save_frequency == 0:
        fname = '{}_{:04}.params'.format(model_prefix, epoch)
        net.save_parameters(fname)
        logging.info('[Epoch {}] Saving checkpoint to {} with Accuracy: {:.4f}'.format(epoch, fname, top1))
    if top1 > best_acc:
        fname = '{}_best.params'.format(model_prefix)
        net.save_parameters(fname)
        logging.info('[Epoch {}] Saving checkpoint to {} with Accuracy: {:.4f}'.format(epoch, fname, top1))


def model_pred(args, model, image):
    from imagenet_classes import classes
    output = model(image.reshape(-1, *image.shape))[0].softmax().as_in_context(mx.cpu())
    top = output.argsort(is_ascend=False)[:10]
    for i, ind in enumerate(top):
        ind = int(ind.asscalar())
        logging.info('{:2d}. {:5.2f}% -> {}'.format(i + 1, output[ind].asscalar() * 100, classes[ind]))


def reduce_metrics(args, metrics, kvstore):
    if 'horovod' not in kvstore or not metrics[0] or hvd.size() == 1:
        return metrics
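
    # hvd.allreduce averages across workers by default, so the reduced values below are
    # the mean of each metric over all Horovod ranks.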
    m = mx.ndarray.array(metrics[1], ctx=mx.gpu(args.gpus[0]))
    reduced = hvd.allreduce(m)
    values = reduced.as_in_context(mx.cpu()).asnumpy().tolist()
    return (metrics[0], values)


def model_score(args, net, val_data, metric, kvstore):
    if val_data is None:
        logging.info('Omitting validation: no data')
        return [], {}, []
    if not isinstance(metric, mx.metric.EvalMetric):
        metric = mx.metric.create(metric)
    metric.reset()
    val_data.reset()

    total_batch_size = val_data.batch_size * val_data._num_gpus * (hvd.size() if 'horovod' in kvstore else 1)

    durations = []
    tic = time.time()
    outputs = []
    for batches in val_data:
        # synchronize to previous iteration
        for o in outputs:
            o.wait_to_read()

        data = [b.data[0] for b in batches]
        label = [b.label[0][:len(b.data[0]) - b.pad] for b in batches if len(b.data[0]) != b.pad]

        outputs = [net(X) for X, b in zip(data, batches)]
        outputs = [o[:len(b.data[0]) - b.pad] for o, b in zip(outputs, batches) if len(b.data[0]) != b.pad]
        metric.update(label, outputs)

        durations.append(time.time() - tic)
        tic = time.time()

    metric = reduce_metrics(args, metric.get_global(), kvstore)
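    # Drop up to the first 10% of iterations (at most 100) as warm-up before computing
    # throughput and latency statistics.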
    durations = durations[min(len(durations) // 10, 100):]
    duration_stats = {
        'ips': total_batch_size / np.mean(durations),
        'latency_avg': np.mean(durations),
    }

    return metric, duration_stats, durations


class ScalarMetric(mx.metric.Loss):
    def update(self, _, scalar):
        self.sum_metric += scalar
        self.global_sum_metric += scalar
        self.num_inst += 1
        self.global_num_inst += 1


def label_smoothing(labels, classes, eta):
    return labels.one_hot(classes, on_value=1 - eta + eta / classes, off_value=eta / classes)
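# Worked example: with classes=1000 and eta=0.1 the smoothed target is
# 1 - 0.1 + 0.1/1000 = 0.9001 for the true class and 0.1/1000 = 0.0001 for every other
# class, so each target row still sums to 1.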


def model_fit(args, net, train_data, eval_metric, optimizer,
              optimizer_params, lr_scheduler, eval_data, global_metrics, kvstore, kv,
              begin_epoch, num_epoch, run_epoch, model_prefix):
    if not isinstance(eval_metric, mx.metric.EvalMetric):
        eval_metric = mx.metric.create(eval_metric)
    loss_metric = ScalarMetric()

    if 'horovod' in kvstore:
        trainer = hvd.DistributedTrainer(net.collect_params(), optimizer, optimizer_params)
    else:
        trainer = gluon.Trainer(net.collect_params(), optimizer, optimizer_params,
                                kvstore=kv, update_on_kvstore=False)

    if args.amp:
        amp.init_trainer(trainer)

    sparse_label_loss = (args.label_smoothing == 0 and args.mixup == 0)
    loss = gluon.loss.SoftmaxCrossEntropyLoss(sparse_label=sparse_label_loss)
    loss.hybridize(static_shape=True, static_alloc=True)

    local_batch_size = train_data.batch_size
    total_batch_size = local_batch_size * train_data._num_gpus * (hvd.size() if 'horovod' in kvstore else 1)
    durations = []

    epoch_size = get_epoch_size(args, kv)
    run_epoch = num_epoch if (run_epoch == -1) else (begin_epoch + run_epoch)

    def transform_data(images, labels):
        if args.mixup != 0:
            coeffs = mx.nd.array(np.random.beta(args.mixup, args.mixup, size=images.shape[0])).as_in_context(
                images.context)
            image_coeffs = coeffs.astype(images.dtype, copy=False).reshape(*coeffs.shape, 1, 1, 1)
            ret_images = image_coeffs * images + (1 - image_coeffs) * images[::-1]

            ret_labels = label_smoothing(labels, args.num_classes, args.label_smoothing)
            label_coeffs = coeffs.reshape(*coeffs.shape, 1)
            ret_labels = label_coeffs * ret_labels + (1 - label_coeffs) * ret_labels[::-1]
        else:
            ret_images = images
            if not sparse_label_loss:
                ret_labels = label_smoothing(labels, args.num_classes, args.label_smoothing)
            else:
                ret_labels = labels
        return ret_images, ret_labels
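    # transform_data implements mixup: each image is blended with the batch in reverse
    # order using per-sample coefficients drawn from Beta(alpha, alpha), and the
    # (label-smoothed) one-hot targets are blended with the same coefficients.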
    i = -1
    best_accuracy = -1
    for epoch in range(begin_epoch, min(run_epoch, num_epoch)):
        tic = time.time()
        btic = time.time()
        etic = time.time()

        train_data.reset()
        eval_metric.reset()
        loss_metric.reset()

        logging.info('Starting epoch {}'.format(epoch))
        outputs = []
        for i, batches in enumerate(train_data):
            # synchronize to previous iteration
            # for o in outputs:
            #     o.wait_to_read()

            trainer.set_learning_rate(lr_scheduler(epoch + i / epoch_size))

            data = [b.data[0] for b in batches]
            label = [b.label[0].as_in_context(b.data[0].context) for b in batches]
            orig_label = label

            data, label = zip(*starmap(transform_data, zip(data, label)))

            outputs = []
            Ls = []
            with ag.record():
                for x, y in zip(data, label):
                    z = net(x)
                    L = loss(z, y)
                    # store the loss and do backward after we have done forward
                    # on all GPUs for better speed on multiple GPUs.
                    Ls.append(L)
                    outputs.append(z)

                if args.amp:
                    with amp.scale_loss(Ls, trainer) as scaled_loss:
                        ag.backward(scaled_loss)
                else:
                    ag.backward(Ls)
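            # Trainer.step(batch_size) normalizes gradients by 1/batch_size. With Horovod
            # the DistributedTrainer already averages gradients across workers, so the
            # local batch size is used; otherwise gradients summed over all local GPUs
            # are normalized by the total batch size.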
            if 'horovod' in kvstore:
                trainer.step(local_batch_size)
            else:
                trainer.step(total_batch_size)

            loss_metric.update(..., np.mean([l.asnumpy() for l in Ls]).item())

            if args.disp_batches and not (i + 1) % args.disp_batches:
                dllogger_it_data = {
                    'train.loss': loss_metric.get()[1],
                    'train.ips': args.disp_batches * total_batch_size / (time.time() - btic),
                    'train.lr': trainer.learning_rate
                }
                dllogger.log((epoch, i), data=dllogger_it_data)

                loss_metric.reset_local()
                btic = time.time()

            durations.append(time.time() - tic)
            tic = time.time()

        durations = durations[min(len(durations) // 10, 100):]
        dllogger_epoch_data = {
            'train.loss': loss_metric.get_global()[1],
            'train.ips': total_batch_size / np.mean(durations)
        }

        if args.mode == 'train_val':
            logging.info('Validating epoch {}'.format(epoch))
            score, duration_stats, _ = model_score(args, net, eval_data, eval_metric, kvstore)
            dllogger_epoch_data.update(
                starmap(lambda key, val: ('val.{}'.format(key), val), zip(*score))
            )
            dllogger_epoch_data.update(
                starmap(lambda key, val: ('val.{}'.format(key), val), duration_stats.items())
            )

            score = dict(zip(*score))
            accuracy = score.get('accuracy', -1)
            save_checkpoint(net, epoch, accuracy, best_accuracy, model_prefix, args.save_frequency, kvstore)
            best_accuracy = max(best_accuracy, accuracy)

        global_metrics.update_dict(dllogger_epoch_data)
        dllogger.log(step=(epoch,), data=dllogger_epoch_data)


def fit(args, model, data_loader):
    """
    train a model
    args : argparse returns
    model : the neural network model
    data_loader : function that returns the train and val data iterators
    """
    start_time = time.time()
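    # Control flow below: optional I/O-speed benchmark (--test-io), checkpoint load or
    # parameter initialization, optional single-image prediction (--mode pred), then
    # either training (train / train_val) or standalone validation (val).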

    # select gpu for horovod process
    if 'horovod' in args.kv_store:
        args.gpus = [args.gpus[hvd.local_rank()]]

    if args.amp:
        amp.init()

    if args.seed is not None:
        logging.info('Setting seeds to {}'.format(args.seed))
        random.seed(args.seed)
        np.random.seed(args.seed)
        mx.random.seed(args.seed)

    # kvstore
    if 'horovod' in args.kv_store:
        kv = None
        rank = hvd.rank()
        num_workers = hvd.size()
    else:
        kv = mx.kvstore.create(args.kv_store)
        rank = kv.rank
        num_workers = kv.num_workers

    if args.test_io:
        train, val = data_loader(args, kv)

        if args.test_io_mode == 'train':
            data_iter = train
        else:
            data_iter = val

        tic = time.time()
        for i, batch in enumerate(data_iter):
            if isinstance(batch, list):
                for b in batch:
                    for j in b.data:
                        j.wait_to_read()
            else:
                for j in batch.data:
                    j.wait_to_read()
            if (i + 1) % args.disp_batches == 0:
                logging.info('Batch [{}]\tSpeed: {:.2f} samples/sec'.format(
                    i, args.disp_batches * args.batch_size / (time.time() - tic)))
                tic = time.time()
        return

    if not load_model(args, model):
        # all initializers should be specified in the model definition.
        # if not, this will raise an error
        model.initialize(mx.init.Initializer())

    # devices for training
    devs = list(map(mx.gpu, args.gpus))
    model.collect_params().reset_ctx(devs)
    if args.mode == 'pred':
        logging.info('Inferring image {}'.format(args.data_pred))
        model_pred(args, model, data.load_image(args, args.data_pred, devs[0]))
        return
    # learning rate
    lr_scheduler = get_lr_scheduler(args)
    optimizer_params = {
        'learning_rate': 0,
        'wd': args.wd,
        'multi_precision': True,
    }

    # Only a limited number of optimizers have 'momentum' property
    has_momentum = {'sgd', 'dcasgd', 'nag', 'signum', 'lbsgd'}
    if args.optimizer in has_momentum:
        optimizer_params['momentum'] = args.mom
    # evaluation metrics
    if not args.no_metrics:
        eval_metrics = ['accuracy']
        eval_metrics.append(mx.metric.create(
            'top_k_accuracy', top_k=5))
    else:
        eval_metrics = []
    train, val = data_loader(args, kv)
    train = BenchmarkingDataIter(train, args.benchmark_iters)
    if val is not None:
        val = BenchmarkingDataIter(val, args.benchmark_iters)

    if 'horovod' in args.kv_store:
        # Fetch and broadcast parameters
        params = model.collect_params()
        if params is not None:
            hvd.broadcast_parameters(params, root_rank=0)

    global_metrics = CompositeMeter()
    if args.mode in ['train_val', 'train']:
        global_metrics.register_metric('train.loss', MinMeter())
        global_metrics.register_metric('train.ips', AvgMeter())

    if args.mode in ['train_val', 'val']:
        global_metrics.register_metric('val.accuracy', MaxMeter())
        global_metrics.register_metric('val.top_k_accuracy_5', MaxMeter())
        global_metrics.register_metric('val.ips', AvgMeter())
        global_metrics.register_metric('val.latency_avg', AvgMeter())

    if args.mode in ['val']:
        global_metrics.register_metric('val.latency_50', PercentileMeter(50))
        global_metrics.register_metric('val.latency_90', PercentileMeter(90))
        global_metrics.register_metric('val.latency_95', PercentileMeter(95))
        global_metrics.register_metric('val.latency_99', PercentileMeter(99))
        global_metrics.register_metric('val.latency_100', PercentileMeter(100))
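    # These meters aggregate the per-epoch dllogger entries into the single summary
    # emitted at the end of fit(): best training loss, best validation accuracies,
    # average throughput, and latency percentiles in pure validation mode.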
    # run
    if args.mode in ['train_val', 'train']:
        model_fit(
            args,
            model,
            train,
            begin_epoch=args.begin_epoch,
            num_epoch=args.num_epochs,
            run_epoch=args.run_epochs,
            eval_data=val,
            eval_metric=eval_metrics,
            global_metrics=global_metrics,
            kvstore=args.kv_store,
            kv=kv,
            optimizer=args.optimizer,
            optimizer_params=optimizer_params,
            lr_scheduler=lr_scheduler,
            model_prefix=os.path.join(args.workspace, args.model_prefix),
        )
    elif args.mode == 'val':
        for epoch in range(args.num_epochs):  # loop for benchmarking
            score, duration_stats, durations = model_score(args, model, val, eval_metrics, args.kv_store)
            dllogger_data = dict(starmap(lambda key, val: ('val.{}'.format(key), val), zip(*score)))
            dllogger_data.update(
                starmap(lambda key, val: ('val.{}'.format(key), val), duration_stats.items())
            )
            global_metrics.update_dict(dllogger_data)
            for percentile in [50, 90, 95, 99, 100]:
                metric_name = 'val.latency_{}'.format(percentile)
                dllogger_data[metric_name] = np.percentile(durations, percentile)
                global_metrics.update_metric(metric_name, durations)
            dllogger.log(step=(epoch,), data=dllogger_data)
    else:
        raise ValueError('Wrong mode')

    mx.nd.waitall()
    dllogger.log(tuple(), data=global_metrics.get())