[GNMT/PyT] Update for PyT GNMT (#944)

This commit is contained in:
Szymon Migacz 2021-05-27 09:48:37 -07:00 committed by GitHub
parent 555b84b3b1
commit ff6f7c6532
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 266 additions and 57 deletions

View file

@ -2,4 +2,5 @@ __pycache__
tags
*.log
/results
/gnmt
/data

View file

@ -568,7 +568,7 @@ training setup:
maximum sequence length for training (including
special BOS and EOS tokens) (default: 50)
--train-min-length TRAIN_MIN_LENGTH
minimum sequence length for training (including
minimum sequence length for training (including
special BOS and EOS tokens) (default: 0)
--train-loader-workers TRAIN_LOADER_WORKERS
number of workers for training data loading (default:

View file

@ -1,4 +1,5 @@
pytablewriter==0.47.0
sacrebleu==1.2.10
sacremoses==0.0.19
pynvml==8.0.4
git+git://github.com/rsennrich/subword-nmt.git@48ba99e657591c329e0003f0c6e32e493fa959ef

View file

@ -0,0 +1,142 @@
import collections
import math
import os
import pathlib
import re
import pynvml
# Initialize NVML once at import time; every helper below assumes this
# succeeded (raises pynvml.NVMLError on machines without an NVIDIA driver).
pynvml.nvmlInit()
def systemGetDriverVersion():
    """Return the installed NVIDIA driver version reported by NVML."""
    driver_version = pynvml.nvmlSystemGetDriverVersion()
    return driver_version
def deviceGetCount():
    """Return the number of NVIDIA GPUs visible to NVML."""
    gpu_count = pynvml.nvmlDeviceGetCount()
    return gpu_count
class device:
    """Thin wrapper over an NVML GPU handle exposing name and CPU affinity."""

    # NVML returns the CPU affinity mask as an array of 64-bit words.
    _nvml_affinity_elements = math.ceil(os.cpu_count() / 64)

    def __init__(self, device_idx):
        super().__init__()
        self.handle = pynvml.nvmlDeviceGetHandleByIndex(device_idx)

    def getName(self):
        """Return the product name of this GPU."""
        return pynvml.nvmlDeviceGetName(self.handle)

    def getCpuAffinity(self):
        """Return the list of CPU core ids in this GPU's affinity mask.

        The mask arrives as 64-bit words, least-significant word first;
        bit ``b`` of word ``w`` corresponds to core ``64 * w + b``.
        """
        words = pynvml.nvmlDeviceGetCpuAffinity(
            self.handle, device._nvml_affinity_elements
        )
        bits = []
        for word in words:
            # Unpack each word LSB-first so bits[i] describes core i.
            bits.extend((word >> shift) & 1 for shift in range(64))
        return [core for core, bit in enumerate(bits) if bit]
def set_socket_affinity(gpu_id):
    """Pin the current process to all cores local to ``gpu_id``'s socket."""
    socket_cores = device(gpu_id).getCpuAffinity()
    os.sched_setaffinity(0, socket_cores)
def set_single_affinity(gpu_id):
    """Pin the current process to the first core of ``gpu_id``'s socket."""
    socket_cores = device(gpu_id).getCpuAffinity()
    os.sched_setaffinity(0, socket_cores[:1])
def set_single_unique_affinity(gpu_id, nproc_per_node):
    """Pin the current process to one CPU core unique to ``gpu_id``.

    Each of the ``nproc_per_node`` ranks gets the first not-yet-taken core
    from its GPU's socket-local affinity mask, after removing
    hyper-threading siblings, so no two ranks share a core.
    """
    devices = [device(i) for i in range(nproc_per_node)]
    socket_affinities = [dev.getCpuAffinity() for dev in devices]

    siblings_list = get_thread_siblings_list()
    siblings_dict = dict(siblings_list)
    sibling_cores = set(siblings_dict.values())

    # Remove hyper-threading siblings while PRESERVING core order.  The
    # previous list(set(a) - set(b)) relied on unspecified set iteration
    # order, which made the per-rank core assignment nondeterministic.
    for idx, socket_affinity in enumerate(socket_affinities):
        socket_affinities[idx] = [
            core for core in socket_affinity if core not in sibling_cores
        ]

    affinities = []
    assigned = set()  # set: O(1) membership vs the original list scan

    # Greedily assign the first free core of each GPU's mask, in GPU order.
    for socket_affinity in socket_affinities:
        for core in socket_affinity:
            if core not in assigned:
                affinities.append([core])
                assigned.add(core)
                break
    os.sched_setaffinity(0, affinities[gpu_id])
def set_socket_unique_affinity(gpu_id, nproc_per_node, mode):
    """Pin the process to a non-overlapping share of its socket's cores.

    GPUs sharing the same socket affinity mask split that socket's cores
    either round-robin (``mode='interleaved'``) or in contiguous chunks
    (``mode='continuous'``).  Hyper-threading siblings of the selected
    cores are added back before the affinity is applied.

    Raises:
        RuntimeError: if ``mode`` is not 'interleaved' or 'continuous'.
    """
    # Renamed from the original ``device_ids`` to avoid shadowing by the
    # grouped device-id lists built below.
    devices = [device(i) for i in range(nproc_per_node)]
    socket_affinities = [dev.getCpuAffinity() for dev in devices]

    siblings_list = get_thread_siblings_list()
    siblings_dict = dict(siblings_list)
    sibling_cores = set(siblings_dict.values())

    # Remove hyper-threading siblings while PRESERVING core order.  The
    # previous list(set(a) - set(b)) relied on unspecified set iteration
    # order, which made the interleaved/continuous slicing below
    # nondeterministic across runs.
    for idx, socket_affinity in enumerate(socket_affinities):
        socket_affinities[idx] = [
            core for core in socket_affinity if core not in sibling_cores
        ]

    # Group GPUs that share an identical socket affinity mask.
    socket_affinities_to_device_ids = collections.defaultdict(list)
    for idx, socket_affinity in enumerate(socket_affinities):
        socket_affinities_to_device_ids[tuple(socket_affinity)].append(idx)

    for socket_affinity, device_ids in socket_affinities_to_device_ids.items():
        devices_per_group = len(device_ids)
        cores_per_device = len(socket_affinity) // devices_per_group
        for group_id, device_id in enumerate(device_ids):
            if device_id == gpu_id:
                if mode == 'interleaved':
                    affinity = list(socket_affinity[group_id::devices_per_group])
                elif mode == 'continuous':
                    affinity = list(
                        socket_affinity[group_id * cores_per_device:
                                        (group_id + 1) * cores_per_device]
                    )
                else:
                    raise RuntimeError('Unknown set_socket_unique_affinity mode')

                # Reintroduce hyper-threading siblings of the chosen cores.
                affinity += [siblings_dict[aff] for aff in affinity
                             if aff in siblings_dict]
                os.sched_setaffinity(0, affinity)
def get_thread_siblings_list():
    """Collect hyper-threading sibling pairs from the sysfs CPU topology.

    Reads each ``thread_siblings_list`` file under
    ``/sys/devices/system/cpu/cpu*/topology`` and returns a list of
    ``(core, sibling)`` int tuples (first pair per file).  Returns an
    empty list when the sysfs path does not exist.
    """
    siblings = []
    pair_pattern = re.compile(r'(\d+)\D(\d+)')
    topology_root = pathlib.Path('/sys/devices/system/cpu')
    for topo_file in topology_root.glob('cpu*/topology/thread_siblings_list'):
        content = topo_file.read_text().strip()
        matches = pair_pattern.findall(content)
        if matches:
            siblings.append(tuple(int(part) for part in matches[0]))
    return siblings
def set_affinity(gpu_id, nproc_per_node, mode='socket'):
    """Set this process's CPU affinity according to ``mode``.

    Modes: 'socket', 'single', 'single_unique',
    'socket_unique_interleaved', 'socket_unique_continuous'.

    Returns:
        The resulting affinity set from ``os.sched_getaffinity(0)``.

    Raises:
        RuntimeError: if ``mode`` is not one of the supported modes.
    """
    dispatch = {
        'socket':
            lambda: set_socket_affinity(gpu_id),
        'single':
            lambda: set_single_affinity(gpu_id),
        'single_unique':
            lambda: set_single_unique_affinity(gpu_id, nproc_per_node),
        'socket_unique_interleaved':
            lambda: set_socket_unique_affinity(gpu_id, nproc_per_node,
                                               'interleaved'),
        'socket_unique_continuous':
            lambda: set_socket_unique_affinity(gpu_id, nproc_per_node,
                                               'continuous'),
    }
    if mode not in dispatch:
        raise RuntimeError('Unknown affinity mode')
    dispatch[mode]()
    affinity = os.sched_getaffinity(0)
    return affinity

View file

@ -207,6 +207,10 @@ def setup_logging(log_all_ranks=True, log_file=os.devnull):
rank = get_rank()
rank_filter = RankFilter(rank, log_all_ranks)
for handler in logging.root.handlers[:]:
logging.root.removeHandler(handler)
handler.close()
logging_format = "%(asctime)s - %(levelname)s - %(rank)s - %(message)s"
logging.basicConfig(level=logging.DEBUG,
format=logging_format,

View file

@ -26,6 +26,7 @@ import logging
import os
import sys
import time
import warnings
from ast import literal_eval
import dllogger
@ -34,7 +35,13 @@ import torch.nn.parallel
import torch.optim
import torch.utils.data.distributed
try:
import pyprof
except ModuleNotFoundError:
warnings.warn('PyProf is unavailable')
import seq2seq.data.config as config
import seq2seq.gpu_affinity as gpu_affinity
import seq2seq.train.trainer as trainers
import seq2seq.utils as utils
from seq2seq.data.dataset import LazyParallelDataset
@ -154,6 +161,15 @@ def parse_args():
help='controls preallocation')
general.add_argument('--dllog-file', type=str, default='train_log.json',
help='Name of the DLLogger output file')
general.add_argument('--affinity', type=str,
default='socket_unique_interleaved',
choices=['socket', 'single', 'single_unique',
'socket_unique_interleaved',
'socket_unique_continuous',
'disabled'],
help='type of CPU affinity')
general.add_argument('--profile', action='store_true',
help='Enable profiling with DLProf')
exclusive_group(group=general, name='eval', default=True,
help='run validation and test after every epoch')
@ -367,6 +383,14 @@ def main():
"""
training_start = time.time()
args = parse_args()
if args.affinity != 'disabled':
nproc_per_node = torch.cuda.device_count()
affinity = gpu_affinity.set_affinity(
args.local_rank,
nproc_per_node,
args.affinity
)
print(f'{args.local_rank}: thread affinity: {affinity}')
device = utils.set_device(args.cuda, args.local_rank)
utils.init_distributed(args.cuda)
args.rank = utils.get_rank()
@ -385,6 +409,12 @@ def main():
dllog_file = os.path.join(args.save_dir, args.dllog_file)
utils.setup_dllogger(enabled=True, filename=dllog_file)
if args.profile:
try:
pyprof.init(enable_function_stack=True)
except NameError:
warnings.warn('Called pyprof.init() but pyprof is not available')
if args.env:
utils.log_env_info()
@ -552,60 +582,61 @@ def main():
training_perf = []
break_training = False
test_bleu = None
for epoch in range(args.start_epoch, args.epochs):
logging.info(f'Starting epoch {epoch}')
with torch.autograd.profiler.emit_nvtx(enabled=args.profile):
for epoch in range(args.start_epoch, args.epochs):
logging.info(f'Starting epoch {epoch}')
train_loader.sampler.set_epoch(epoch)
train_loader.sampler.set_epoch(epoch)
trainer.epoch = epoch
train_loss, train_perf = trainer.optimize(train_loader)
training_perf.append(train_perf)
trainer.epoch = epoch
train_loss, train_perf = trainer.optimize(train_loader)
training_perf.append(train_perf)
# evaluate on validation set
if args.eval:
logging.info(f'Running validation on dev set')
val_loss, val_perf = trainer.evaluate(val_loader)
# evaluate on validation set
if args.eval:
logging.info(f'Running validation on dev set')
val_loss, val_perf = trainer.evaluate(val_loader)
# remember best prec@1 and save checkpoint
if args.rank == 0:
is_best = val_loss < best_loss
best_loss = min(val_loss, best_loss)
trainer.save(save_all=args.save_all, is_best=is_best)
if args.eval:
utils.barrier()
eval_fname = f'eval_epoch_{epoch}'
eval_path = os.path.join(args.save_dir, eval_fname)
_, eval_stats = translator.run(
calc_bleu=True,
epoch=epoch,
eval_path=eval_path,
)
test_bleu = eval_stats['bleu']
if args.target_bleu and test_bleu >= args.target_bleu:
logging.info(f'Target accuracy reached')
break_training = True
acc_log = []
acc_log += [f'Summary: Epoch: {epoch}']
acc_log += [f'Training Loss: {train_loss:.4f}']
if args.eval:
acc_log += [f'Validation Loss: {val_loss:.4f}']
acc_log += [f'Test BLEU: {test_bleu:.2f}']
perf_log = []
perf_log += [f'Performance: Epoch: {epoch}']
perf_log += [f'Training: {train_perf:.0f} Tok/s']
if args.eval:
perf_log += [f'Validation: {val_perf:.0f} Tok/s']
# remember best prec@1 and save checkpoint
if args.rank == 0:
is_best = val_loss < best_loss
best_loss = min(val_loss, best_loss)
trainer.save(save_all=args.save_all, is_best=is_best)
logging.info('\t'.join(acc_log))
logging.info('\t'.join(perf_log))
if args.eval:
utils.barrier()
eval_fname = f'eval_epoch_{epoch}'
eval_path = os.path.join(args.save_dir, eval_fname)
_, eval_stats = translator.run(
calc_bleu=True,
epoch=epoch,
eval_path=eval_path,
)
test_bleu = eval_stats['bleu']
if args.target_bleu and test_bleu >= args.target_bleu:
logging.info(f'Target accuracy reached')
break_training = True
acc_log = []
acc_log += [f'Summary: Epoch: {epoch}']
acc_log += [f'Training Loss: {train_loss:.4f}']
if args.eval:
acc_log += [f'Validation Loss: {val_loss:.4f}']
acc_log += [f'Test BLEU: {test_bleu:.2f}']
perf_log = []
perf_log += [f'Performance: Epoch: {epoch}']
perf_log += [f'Training: {train_perf:.0f} Tok/s']
if args.eval:
perf_log += [f'Validation: {val_perf:.0f} Tok/s']
if args.rank == 0:
logging.info('\t'.join(acc_log))
logging.info('\t'.join(perf_log))
logging.info(f'Finished epoch {epoch}')
if break_training:
break
logging.info(f'Finished epoch {epoch}')
if break_training:
break
utils.barrier()
training_stop = time.time()

View file

@ -33,6 +33,12 @@ import dllogger
import numpy as np
import torch
try:
import pyprof
except ModuleNotFoundError:
warnings.warn('PyProf is unavailable')
import seq2seq.gpu_affinity as gpu_affinity
import seq2seq.utils as utils
from seq2seq.data.dataset import RawTextDataset
from seq2seq.data.dataset import SyntheticDataset
@ -136,6 +142,15 @@ def parse_args():
help='Name of the DLLogger output file')
general.add_argument('--print-freq', '-p', default=1, type=int,
help='print log every PRINT_FREQ batches')
general.add_argument('--affinity', type=str,
default='single_unique',
choices=['socket', 'single', 'single_unique',
'socket_unique_interleaved',
'socket_unique_continuous',
'disabled'],
help='type of CPU affinity')
general.add_argument('--profile', action='store_true',
help='Enable profiling with DLProf')
# benchmarking
benchmark = parser.add_argument_group('benchmark setup')
@ -194,6 +209,14 @@ def main():
with length normalization and coverage penalty.
"""
args = parse_args()
if args.affinity != 'disabled':
nproc_per_node = torch.cuda.device_count()
affinity = gpu_affinity.set_affinity(
args.local_rank,
nproc_per_node,
args.affinity
)
print(f'{args.local_rank}: thread affinity: {affinity}')
device = utils.set_device(args.cuda, args.local_rank)
utils.init_distributed(args.cuda)
args.rank = utils.get_rank()
@ -203,6 +226,12 @@ def main():
dllog_file = os.path.join(args.save_dir, args.dllog_file)
utils.setup_dllogger(enabled=True, filename=dllog_file)
if args.profile:
try:
pyprof.init(enable_function_stack=True)
except NameError:
warnings.warn('Called pyprof.init() but pyprof is not available')
if args.env:
utils.log_env_info()
@ -288,13 +317,14 @@ def main():
)
# execute the inference
output, stats = translator.run(
calc_bleu=args.bleu,
eval_path=args.output,
summary=True,
warmup=args.warmup,
reference_path=args.reference,
)
with torch.autograd.profiler.emit_nvtx(enabled=args.profile):
output, stats = translator.run(
calc_bleu=args.bleu,
eval_path=args.output,
summary=True,
warmup=args.warmup,
reference_path=args.reference,
)
# print translated outputs
if not args.synthetic and (not args.output and args.rank == 0):
@ -341,7 +371,7 @@ def main():
'eval_avg_latency': avg_latency,
}
for p in args.percentiles:
summary[f'eval_{p}%_latency'] = 1000 * np.percentile(stats['runtimes'], p)
summary[f'eval_{p}%_latency'] = np.percentile(stats['runtimes'], p)
dllogger.log(step=tuple(), data=summary)