throttle: fix linear based strategies (#65422)
* throttle tests: fix detection of parallel execution The test wasn't able to detect if too many workers were running. On my laptop: - without this change, the 'throttle' target takes ~20 seconds - with this change, the 'throttle' target takes ~70 seconds - 1 second isn't long enough to encounter the issue * Fix throttle test when strategy is 'free' based 'free' strategy allows multiple tasks to be executed in parallel: use one 'throttledir' per task. Use 'linear' strategy with a dedicated play for cleanup/setup tasks * throttle: reset worker idx before queuing a new task * TestStrategyBase: define task.throttle otherwise '1' will be used instead of the default value due to the following expression being equal to '1': int(templar.template(task_mock.throttle)) Co-authored-by: James Cammarata <jimi@sngx.net>
This commit is contained in:
parent
7bbf4ad7d6
commit
bbbdc1c25c
7 changed files with 62 additions and 25 deletions
|
@ -0,0 +1,2 @@
|
||||||
|
bugfixes:
|
||||||
|
- "throttle: the linear strategy didn't always stuck with the throttle limit"
|
|
@ -325,9 +325,26 @@ class StrategyBase:
|
||||||
|
|
||||||
# and then queue the new task
|
# and then queue the new task
|
||||||
try:
|
try:
|
||||||
|
# Determine the "rewind point" of the worker list. This means we start
|
||||||
|
# iterating over the list of workers until the end of the list is found.
|
||||||
|
# Normally, that is simply the length of the workers list (as determined
|
||||||
|
# by the forks or serial setting), however a task/block/play may "throttle"
|
||||||
|
# that limit down.
|
||||||
|
rewind_point = len(self._workers)
|
||||||
|
if throttle > 0 and self.ALLOW_BASE_THROTTLING:
|
||||||
|
if task.run_once:
|
||||||
|
display.debug("Ignoring 'throttle' as 'run_once' is also set for '%s'" % task.get_name())
|
||||||
|
else:
|
||||||
|
if throttle <= rewind_point:
|
||||||
|
display.debug("task: %s, throttle: %d" % (task.get_name(), throttle))
|
||||||
|
rewind_point = throttle
|
||||||
|
|
||||||
queued = False
|
queued = False
|
||||||
starting_worker = self._cur_worker
|
starting_worker = self._cur_worker
|
||||||
while True:
|
while True:
|
||||||
|
if self._cur_worker >= rewind_point:
|
||||||
|
self._cur_worker = 0
|
||||||
|
|
||||||
worker_prc = self._workers[self._cur_worker]
|
worker_prc = self._workers[self._cur_worker]
|
||||||
if worker_prc is None or not worker_prc.is_alive():
|
if worker_prc is None or not worker_prc.is_alive():
|
||||||
self._queued_task_cache[(host.name, task._uuid)] = {
|
self._queued_task_cache[(host.name, task._uuid)] = {
|
||||||
|
@ -346,19 +363,6 @@ class StrategyBase:
|
||||||
|
|
||||||
self._cur_worker += 1
|
self._cur_worker += 1
|
||||||
|
|
||||||
# Determine the "rewind point" of the worker list. This means we start
|
|
||||||
# iterating over the list of workers until the end of the list is found.
|
|
||||||
# Normally, that is simply the length of the workers list (as determined
|
|
||||||
# by the forks or serial setting), however a task/block/play may "throttle"
|
|
||||||
# that limit down.
|
|
||||||
rewind_point = len(self._workers)
|
|
||||||
if throttle > 0 and self.ALLOW_BASE_THROTTLING:
|
|
||||||
if task.run_once:
|
|
||||||
display.debug("Ignoring 'throttle' as 'run_once' is also set for '%s'" % task.get_name())
|
|
||||||
else:
|
|
||||||
if throttle <= rewind_point:
|
|
||||||
display.debug("task: %s, throttle: %d" % (task.get_name(), throttle))
|
|
||||||
rewind_point = throttle
|
|
||||||
if self._cur_worker >= rewind_point:
|
if self._cur_worker >= rewind_point:
|
||||||
self._cur_worker = 0
|
self._cur_worker = 0
|
||||||
|
|
||||||
|
|
4
test/integration/targets/throttle/group_vars/all.yml
Normal file
4
test/integration/targets/throttle/group_vars/all.yml
Normal file
|
@ -0,0 +1,4 @@
|
||||||
|
---
|
||||||
|
throttledir: '{{ base_throttledir }}/{{ subdir }}'
|
||||||
|
base_throttledir: "{{ lookup('env', 'OUTPUT_DIR') }}/throttle.dir"
|
||||||
|
subdir: "{{ test_id if lookup('env', 'SELECTED_STRATEGY') in ['free', 'host_pinned'] else '' }}"
|
|
@ -3,5 +3,5 @@
|
||||||
set -eux
|
set -eux
|
||||||
|
|
||||||
# https://github.com/ansible/ansible/pull/42528
|
# https://github.com/ansible/ansible/pull/42528
|
||||||
ANSIBLE_STRATEGY='linear' ansible-playbook test_throttle.yml -vv -i inventory --forks 12 "$@"
|
SELECTED_STRATEGY='linear' ansible-playbook test_throttle.yml -vv -i inventory --forks 12 "$@"
|
||||||
ANSIBLE_STRATEGY='free' ansible-playbook test_throttle.yml -vv -i inventory --forks 12 "$@"
|
SELECTED_STRATEGY='free' ansible-playbook test_throttle.yml -vv -i inventory --forks 12 "$@"
|
||||||
|
|
|
@ -24,6 +24,7 @@ try:
|
||||||
if len(throttlelist) > max_throttle:
|
if len(throttlelist) > max_throttle:
|
||||||
print(throttlelist)
|
print(throttlelist)
|
||||||
raise ValueError("Too many concurrent tasks: %d/%d" % (len(throttlelist), max_throttle))
|
raise ValueError("Too many concurrent tasks: %d/%d" % (len(throttlelist), max_throttle))
|
||||||
|
time.sleep(1.5)
|
||||||
finally:
|
finally:
|
||||||
# remove the file, then wait to make sure it's gone
|
# remove the file, then wait to make sure it's gone
|
||||||
os.unlink(throttlefile)
|
os.unlink(throttlefile)
|
||||||
|
|
|
@ -1,59 +1,84 @@
|
||||||
---
|
---
|
||||||
- hosts: localhosts
|
- hosts: localhosts
|
||||||
gather_facts: false
|
gather_facts: false
|
||||||
vars:
|
strategy: linear
|
||||||
throttledir: "{{ lookup('env', 'OUTPUT_DIR') }}/throttle.dir/"
|
run_once: yes
|
||||||
tasks:
|
tasks:
|
||||||
- name: Clean throttledir '{{ throttledir }}'
|
- name: Clean base throttledir '{{ base_throttledir }}'
|
||||||
file:
|
file:
|
||||||
state: absent
|
state: absent
|
||||||
path: '{{ throttledir }}'
|
path: '{{ base_throttledir }}'
|
||||||
ignore_errors: yes
|
ignore_errors: yes
|
||||||
run_once: yes
|
|
||||||
- name: Create throttledir '{{ throttledir }}'
|
- name: Create throttledir '{{ throttledir }}'
|
||||||
file:
|
file:
|
||||||
state: directory
|
state: directory
|
||||||
path: '{{ throttledir }}'
|
path: '{{ throttledir }}'
|
||||||
run_once: yes
|
loop: "{{ range(1, test_count|int)|list }}"
|
||||||
|
loop_control:
|
||||||
|
loop_var: test_id
|
||||||
|
vars:
|
||||||
|
test_count: "{{ 9 if lookup('env', 'SELECTED_STRATEGY') in ['free', 'host_pinned'] else 2 }}"
|
||||||
|
|
||||||
|
- hosts: localhosts
|
||||||
|
gather_facts: false
|
||||||
|
strategy: "{{ lookup('env', 'SELECTED_STRATEGY') }}"
|
||||||
|
tasks:
|
||||||
- block:
|
- block:
|
||||||
- name: "Test 1 (max throttle: 3)"
|
- name: "Test 1 (max throttle: 3)"
|
||||||
script: "test_throttle.py {{throttledir}} {{inventory_hostname}} 3"
|
script: "test_throttle.py {{throttledir}} {{inventory_hostname}} 3"
|
||||||
|
vars:
|
||||||
|
test_id: 1
|
||||||
throttle: 3
|
throttle: 3
|
||||||
- block:
|
- block:
|
||||||
- name: "Test 2 (max throttle: 5)"
|
- name: "Test 2 (max throttle: 5)"
|
||||||
script: "test_throttle.py {{throttledir}} {{inventory_hostname}} 5"
|
script: "test_throttle.py {{throttledir}} {{inventory_hostname}} 5"
|
||||||
throttle: 5
|
throttle: 5
|
||||||
|
vars:
|
||||||
|
test_id: 2
|
||||||
- block:
|
- block:
|
||||||
- name: "Test 3 (max throttle: 8)"
|
- name: "Test 3 (max throttle: 8)"
|
||||||
script: "test_throttle.py {{throttledir}} {{inventory_hostname}} 8"
|
script: "test_throttle.py {{throttledir}} {{inventory_hostname}} 8"
|
||||||
throttle: 8
|
throttle: 8
|
||||||
throttle: 6
|
throttle: 6
|
||||||
|
vars:
|
||||||
|
test_id: 3
|
||||||
- block:
|
- block:
|
||||||
- block:
|
- block:
|
||||||
- name: "Test 4 (max throttle: 8)"
|
- name: "Test 4 (max throttle: 8)"
|
||||||
script: "test_throttle.py {{throttledir}} {{inventory_hostname}} 8"
|
script: "test_throttle.py {{throttledir}} {{inventory_hostname}} 8"
|
||||||
throttle: 8
|
throttle: 8
|
||||||
|
vars:
|
||||||
|
test_id: 4
|
||||||
throttle: 6
|
throttle: 6
|
||||||
throttle: 12
|
throttle: 12
|
||||||
throttle: 15
|
throttle: 15
|
||||||
- block:
|
- block:
|
||||||
- name: "Test 1 (max throttle: 3)"
|
- name: "Teat 5 (max throttle: 3)"
|
||||||
script: "test_throttle.py {{throttledir}} {{inventory_hostname}} 3"
|
script: "test_throttle.py {{throttledir}} {{inventory_hostname}} 3"
|
||||||
|
vars:
|
||||||
|
test_id: 5
|
||||||
throttle: 3
|
throttle: 3
|
||||||
- block:
|
- block:
|
||||||
- name: "Test 2 (max throttle: 5)"
|
- name: "Test 6 (max throttle: 5)"
|
||||||
script: "test_throttle.py {{throttledir}} {{inventory_hostname}} 5"
|
script: "test_throttle.py {{throttledir}} {{inventory_hostname}} 5"
|
||||||
throttle: 5
|
throttle: 5
|
||||||
|
vars:
|
||||||
|
test_id: 6
|
||||||
- block:
|
- block:
|
||||||
- name: "Test 3 (max throttle: 6)"
|
- name: "Test 7 (max throttle: 6)"
|
||||||
script: "test_throttle.py {{throttledir}} {{inventory_hostname}} 6"
|
script: "test_throttle.py {{throttledir}} {{inventory_hostname}} 6"
|
||||||
throttle: 6
|
throttle: 6
|
||||||
|
vars:
|
||||||
|
test_id: 7
|
||||||
throttle: 3
|
throttle: 3
|
||||||
- block:
|
- block:
|
||||||
- block:
|
- block:
|
||||||
- name: "Test 4 (max throttle: 8)"
|
- name: "Test 8 (max throttle: 8)"
|
||||||
script: "test_throttle.py {{throttledir}} {{inventory_hostname}} 8"
|
script: "test_throttle.py {{throttledir}} {{inventory_hostname}} 8"
|
||||||
throttle: 8
|
throttle: 8
|
||||||
|
vars:
|
||||||
|
test_id: 8
|
||||||
throttle: 6
|
throttle: 6
|
||||||
throttle: 4
|
throttle: 4
|
||||||
throttle: 2
|
throttle: 2
|
||||||
|
|
|
@ -195,6 +195,7 @@ class TestStrategyBase(unittest.TestCase):
|
||||||
|
|
||||||
mock_task = MagicMock()
|
mock_task = MagicMock()
|
||||||
mock_task._uuid = 'abcd'
|
mock_task._uuid = 'abcd'
|
||||||
|
mock_task.throttle = 0
|
||||||
|
|
||||||
try:
|
try:
|
||||||
strategy_base = StrategyBase(tqm=tqm)
|
strategy_base = StrategyBase(tqm=tqm)
|
||||||
|
|
Loading…
Reference in a new issue