kibana/vars/withTaskQueue.groovy

155 lines
4.6 KiB
Groovy

import groovy.transform.Field
public static @Field TASK_QUEUES = [:]
public static @Field TASK_QUEUES_COUNTER = 0
/**
withTaskQueue creates a queue of "tasks" (just plain closures to execute), and executes them with your desired level of concurrency.
This way, you can define, for example, 40 things that need to execute, then only allow 10 of them to execute at once.
Each "process" will execute in a separate, unique, empty directory.
If you want each process to have a bootstrapped kibana repo, check out kibanaPipeline.withCiTaskQueue
Using the queue currently requires an agent/worker.
Usage:
withTaskQueue(parallel: 10) {
task { print "This is a task" }
// This is the same as calling task() multiple times
tasks([ { print "Another task" }, { print "And another task" } ])
// Tasks can queue up subsequent tasks
task {
buildThing()
task { print "I depend on buildThing()" }
}
}
You can also define a setup task that each process should execute one time before executing tasks:
withTaskQueue(parallel: 10, setup: { sh "my-setup-scrupt.sh" }) {
...
}
*/
def call(Map options = [:], Closure closure) {
def config = [ parallel: 10 ] + options
def counter = ++TASK_QUEUES_COUNTER
// We're basically abusing withEnv() to create a "scope" for all steps inside of a withTaskQueue block
// This way, we could have multiple task queue instances in the same pipeline
withEnv(["TASK_QUEUE_ID=${counter}"]) {
withTaskQueue.TASK_QUEUES[env.TASK_QUEUE_ID] = [
tasks: [],
tmpFile: sh(script: 'mktemp', returnStdout: true).trim()
]
closure.call()
def processesExecuting = 0
def processes = [:]
def iterationId = 0
for(def i = 1; i <= config.parallel; i++) {
def j = i
processes["task-queue-process-${j}"] = {
catchErrors {
withEnv([
"TASK_QUEUE_PROCESS_ID=${j}",
"TASK_QUEUE_ITERATION_ID=${++iterationId}"
]) {
dir("${WORKSPACE}/parallel/${j}/kibana") {
if (config.setup) {
config.setup.call(j)
}
def isDone = false
while(!isDone) { // TODO some kind of timeout?
catchErrors {
if (!getTasks().isEmpty()) {
processesExecuting++
catchErrors {
def task
try {
task = getTasks().pop()
} catch (java.util.NoSuchElementException ex) {
return
}
task.call()
}
processesExecuting--
// If a task finishes, and no new tasks were queued up, and nothing else is executing
// Then all of the processes should wake up and exit
if (processesExecuting < 1 && getTasks().isEmpty()) {
taskNotify()
}
return
}
if (processesExecuting > 0) {
taskSleep()
return
}
// Queue is empty, no processes are executing
isDone = true
}
}
}
}
}
}
}
parallel(processes)
}
}
// If we sleep in a loop using Groovy code, Pipeline Steps is flooded with Sleep steps
// So, instead, we just watch a file and `touch` it whenever something happens that could modify the queue
// There's a 20 minute timeout just in case something goes wrong,
// in which case this method will get called again if the process is actually supposed to be waiting.
def taskSleep() {
sh(script: """#!/bin/bash
TIMESTAMP=\$(date '+%s' -d "0 seconds ago")
for (( i=1; i<=240; i++ ))
do
if [ "\$(stat -c %Y '${getTmpFile()}')" -ge "\$TIMESTAMP" ]
then
break
else
sleep 5
if [[ \$i == 240 ]]; then
echo "Waited for new tasks for 20 minutes, exiting in case something went wrong"
fi
fi
done
""", label: "Waiting for new tasks...")
}
// Used to let the task queue processes know that either a new task has been queued up, or work is complete
def taskNotify() {
sh "touch '${getTmpFile()}'"
}
def getTasks() {
return withTaskQueue.TASK_QUEUES[env.TASK_QUEUE_ID].tasks
}
def getTmpFile() {
return withTaskQueue.TASK_QUEUES[env.TASK_QUEUE_ID].tmpFile
}
def addTask(Closure closure) {
getTasks() << closure
taskNotify()
}
def addTasks(List<Closure> closures) {
closures.reverse().each {
getTasks() << it
}
taskNotify()
}