hotalert/task/executor/executor.go

144 lines
4.1 KiB
Go
Raw Normal View History

package executor
2022-11-03 20:04:14 +00:00
import (
"errors"
"fmt"
"hotalert/logging"
"hotalert/task"
"hotalert/task/functions"
2022-11-03 20:04:14 +00:00
"sync"
)
// Executor is an interface for implementing task executors.
type Executor interface {
// AddTask adds a task to the task executor.
AddTask(task *task.Task)
2022-11-03 20:04:14 +00:00
// Start starts the scrapper and returns a Result receive-only channel.
Start() <-chan *task.Result
2022-11-03 20:04:14 +00:00
// Shutdown shuts down the scrapper. It will block until the Executor was shut down.
Shutdown()
}
// ExecutionFunc is the signature of a function that executes a task.
// Functions of this type are stored in executionFuncMap and invoked by
// executeTask, which records a non-nil return value as the task's error.
type ExecutionFunc func(task *task.Task) error
2022-11-03 20:04:14 +00:00
// DefaultExecutor is a TaskExecutor with the default implementation.
// The tasks are executed directly on the machine.
type DefaultExecutor struct {
// workerGroup is a waiting group for worker goroutines.
workerGroup *sync.WaitGroup
// numberOfWorkerGoroutines is the number of working goroutines.
numberOfWorkerGoroutines int
// taskResultChan is a receive only channel for task results.
taskResultChan chan *task.Result
2022-11-03 20:04:14 +00:00
// taskChan is a channel for tasks
taskChan chan *task.Task
2022-11-03 20:04:14 +00:00
// quinChan is a channel for sending the quit command to worker goroutines.
quinChan chan int
}
// executionFuncMap maps an execution function name to the ExecutionFunc that
// executeTask runs for tasks carrying that name.
// It starts out with the built-in "web_scrape" entry and is extended at
// runtime through RegisterNewExecutionFunction.
// NOTE(review): the map is written by RegisterNewExecutionFunction and read by
// executeTask on worker goroutines with no synchronization visible in this
// file; presumably all registrations must happen before Start is called.
// Confirm against callers.
var executionFuncMap = map[string]ExecutionFunc{
"web_scrape": functions.WebScrapeTask,
}
// RegisterNewExecutionFunction registers function under name in
// executionFuncMap, making it available to tasks whose execution function
// name equals name.
//
// It returns an error and leaves the registry unchanged when function is nil
// or when name is already registered; otherwise it returns nil.
//
// The registry is not guarded by a lock, so this must not be called
// concurrently with itself or with running workers.
func RegisterNewExecutionFunction(name string, function ExecutionFunc) error {
	// A nil function could only fail later, as a panic when a worker calls it,
	// so reject it at registration time.
	if function == nil {
		return errors.New("function is nil")
	}
	if _, exists := executionFuncMap[name]; exists {
		return errors.New("function already exists")
	}
	executionFuncMap[name] = function
	return nil
}
2022-11-03 20:04:14 +00:00
// NewDefaultExecutor returns a new instance of DefaultExecutor.
func NewDefaultExecutor() *DefaultExecutor {
2022-11-03 20:04:14 +00:00
ws := &DefaultExecutor{
workerGroup: &sync.WaitGroup{},
taskResultChan: make(chan *task.Result, 50),
taskChan: make(chan *task.Task, 50),
2022-11-03 20:04:14 +00:00
numberOfWorkerGoroutines: 5,
}
ws.quinChan = make(chan int, ws.numberOfWorkerGoroutines)
return ws
}
// AddTask adds a task to the DefaultExecutor queue.
func (ws *DefaultExecutor) AddTask(task *task.Task) {
2022-11-03 20:04:14 +00:00
ws.taskChan <- task
}
// executeTask executes the given task using DefaultTaskExecutionFuncName
func (ws *DefaultExecutor) executeTask(task *task.Task) error {
2022-11-03 20:04:14 +00:00
var taskErr error = nil
// Execute task and set panics as errors in taskResult.
func() {
defer func() {
if r := recover(); r != nil {
taskErr = errors.New(fmt.Sprintf("panic: %s", r))
}
}()
taskExecutionFunc, ok := executionFuncMap[task.ExecutionFuncName]
if !ok {
message := fmt.Sprintf("invalid task execution function name: '%s'", task.ExecutionFuncName)
logging.SugaredLogger.Error(message)
taskErr = errors.New(message)
return
}
err := taskExecutionFunc(task)
2022-11-03 20:04:14 +00:00
if err != nil {
taskErr = err
}
}()
return taskErr
}
// workerGoroutine waits for tasks and executes them.
// After the task is executed it forwards the result, including errors and panics to the task Result channel.
func (ws *DefaultExecutor) workerGoroutine() {
defer ws.workerGroup.Done()
for {
select {
case currentTask := <-ws.taskChan:
var taskResult = task.NewResult(currentTask)
err := ws.executeTask(currentTask)
taskResult.SetError(err)
2022-11-03 20:04:14 +00:00
// Forward TaskResult to channel.
ws.taskResultChan <- taskResult
case <-ws.quinChan:
// Quit
return
}
}
}
// Start starts the DefaultExecutor.
// Start returns a receive only channel with task Result.
func (ws *DefaultExecutor) Start() <-chan *task.Result {
2022-11-03 20:04:14 +00:00
// Start worker goroutines.
for i := 0; i < ws.numberOfWorkerGoroutines; i++ {
ws.workerGroup.Add(1)
go ws.workerGoroutine()
}
return ws.taskResultChan
}
// Shutdown stops the DefaultExecutor and blocks until every worker goroutine
// has returned.
// It sends one quit signal per configured worker, waits on the wait group and
// then closes the task, result and quit channels. Because the channels are
// closed, a second call to Shutdown panics.
func (ws *DefaultExecutor) Shutdown() {
	// One quit signal for each worker goroutine.
	for pending := ws.numberOfWorkerGoroutines; pending > 0; pending-- {
		ws.quinChan <- 1
	}
	ws.workerGroup.Wait()

	// All workers have returned; release the channels.
	close(ws.taskChan)
	close(ws.taskResultChan)
	close(ws.quinChan)
}