2022-11-05 13:22:12 +00:00
|
|
|
package executor
|
2022-11-03 20:04:14 +00:00
|
|
|
|
|
|
|
import (
|
|
|
|
"errors"
|
|
|
|
"fmt"
|
2022-11-05 13:22:12 +00:00
|
|
|
"hotalert/logging"
|
|
|
|
"hotalert/task"
|
|
|
|
"hotalert/task/functions"
|
2022-11-03 20:04:14 +00:00
|
|
|
"sync"
|
|
|
|
)
|
|
|
|
|
|
|
|
// Executor is an interface for implementing task executors.
|
|
|
|
type Executor interface {
|
|
|
|
// AddTask adds a task to the task executor.
|
2022-11-05 13:22:12 +00:00
|
|
|
AddTask(task *task.Task)
|
2022-11-03 20:04:14 +00:00
|
|
|
// Start starts the scrapper and returns a Result receive-only channel.
|
2022-11-05 13:22:12 +00:00
|
|
|
Start() <-chan *task.Result
|
2022-11-03 20:04:14 +00:00
|
|
|
// Shutdown shuts down the scrapper. It will block until the Executor was shut down.
|
|
|
|
Shutdown()
|
|
|
|
}
|
|
|
|
|
2022-11-05 13:22:12 +00:00
|
|
|
// ExecutionFunc is a type definition for a function that executes the task and returns an error.
|
|
|
|
type ExecutionFunc func(task *task.Task) error
|
2022-11-03 20:04:14 +00:00
|
|
|
|
|
|
|
// DefaultExecutor is a TaskExecutor with the default implementation.
|
|
|
|
// The tasks are executed directly on the machine.
|
|
|
|
type DefaultExecutor struct {
|
|
|
|
// workerGroup is a waiting group for worker goroutines.
|
|
|
|
workerGroup *sync.WaitGroup
|
|
|
|
// numberOfWorkerGoroutines is the number of working goroutines.
|
|
|
|
numberOfWorkerGoroutines int
|
|
|
|
// taskResultChan is a receive only channel for task results.
|
2022-11-05 13:22:12 +00:00
|
|
|
taskResultChan chan *task.Result
|
2022-11-03 20:04:14 +00:00
|
|
|
// taskChan is a channel for tasks
|
2022-11-05 13:22:12 +00:00
|
|
|
taskChan chan *task.Task
|
2022-11-03 20:04:14 +00:00
|
|
|
// quinChan is a channel for sending the quit command to worker goroutines.
|
|
|
|
quinChan chan int
|
|
|
|
}
|
|
|
|
|
2022-11-05 13:22:12 +00:00
|
|
|
// executionFuncMap is a map that holds all the possible values for ExecutionFunc.
|
|
|
|
// Right now it is hard-coded but in the future it may be extended dynamically.
|
|
|
|
var executionFuncMap = map[string]ExecutionFunc{
|
|
|
|
"web_scrape": functions.WebScrapeTask,
|
|
|
|
}
|
|
|
|
|
|
|
|
// RegisterNewExecutionFunction registers a new execution function.
|
|
|
|
func RegisterNewExecutionFunction(name string, function ExecutionFunc) error {
|
|
|
|
for n := range executionFuncMap {
|
|
|
|
if n == name {
|
|
|
|
return errors.New("function already exists")
|
|
|
|
}
|
|
|
|
}
|
|
|
|
executionFuncMap[name] = function
|
|
|
|
return nil
|
|
|
|
}
|
2022-11-03 20:04:14 +00:00
|
|
|
|
|
|
|
// NewDefaultExecutor returns a new instance of DefaultExecutor.
|
2022-11-05 13:22:12 +00:00
|
|
|
func NewDefaultExecutor() *DefaultExecutor {
|
2022-11-03 20:04:14 +00:00
|
|
|
ws := &DefaultExecutor{
|
|
|
|
workerGroup: &sync.WaitGroup{},
|
2022-11-05 13:22:12 +00:00
|
|
|
taskResultChan: make(chan *task.Result, 50),
|
|
|
|
taskChan: make(chan *task.Task, 50),
|
2022-11-03 20:04:14 +00:00
|
|
|
numberOfWorkerGoroutines: 5,
|
|
|
|
}
|
|
|
|
ws.quinChan = make(chan int, ws.numberOfWorkerGoroutines)
|
|
|
|
return ws
|
|
|
|
}
|
|
|
|
|
|
|
|
// AddTask adds a task to the DefaultExecutor queue.
|
2022-11-05 13:22:12 +00:00
|
|
|
func (ws *DefaultExecutor) AddTask(task *task.Task) {
|
2022-11-03 20:04:14 +00:00
|
|
|
ws.taskChan <- task
|
|
|
|
}
|
|
|
|
|
2022-11-05 13:22:12 +00:00
|
|
|
// executeTask executes the given task using DefaultTaskExecutionFuncName
|
|
|
|
func (ws *DefaultExecutor) executeTask(task *task.Task) error {
|
2022-11-03 20:04:14 +00:00
|
|
|
var taskErr error = nil
|
|
|
|
// Execute task and set panics as errors in taskResult.
|
|
|
|
func() {
|
|
|
|
defer func() {
|
|
|
|
if r := recover(); r != nil {
|
|
|
|
taskErr = errors.New(fmt.Sprintf("panic: %s", r))
|
|
|
|
}
|
|
|
|
}()
|
2022-11-05 13:22:12 +00:00
|
|
|
|
|
|
|
taskExecutionFunc, ok := executionFuncMap[task.ExecutionFuncName]
|
|
|
|
if !ok {
|
|
|
|
message := fmt.Sprintf("invalid task execution function name: '%s'", task.ExecutionFuncName)
|
|
|
|
logging.SugaredLogger.Error(message)
|
|
|
|
taskErr = errors.New(message)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
err := taskExecutionFunc(task)
|
2022-11-03 20:04:14 +00:00
|
|
|
if err != nil {
|
|
|
|
taskErr = err
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
return taskErr
|
|
|
|
}
|
|
|
|
|
|
|
|
// workerGoroutine waits for tasks and executes them.
|
|
|
|
// After the task is executed it forwards the result, including errors and panics to the task Result channel.
|
|
|
|
func (ws *DefaultExecutor) workerGoroutine() {
|
|
|
|
defer ws.workerGroup.Done()
|
|
|
|
for {
|
|
|
|
select {
|
2022-11-05 13:22:12 +00:00
|
|
|
case currentTask := <-ws.taskChan:
|
|
|
|
var taskResult = task.NewResult(currentTask)
|
|
|
|
err := ws.executeTask(currentTask)
|
|
|
|
taskResult.SetError(err)
|
2022-11-03 20:04:14 +00:00
|
|
|
|
|
|
|
// Forward TaskResult to channel.
|
|
|
|
ws.taskResultChan <- taskResult
|
|
|
|
case <-ws.quinChan:
|
|
|
|
// Quit
|
|
|
|
return
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Start starts the DefaultExecutor.
|
|
|
|
// Start returns a receive only channel with task Result.
|
2022-11-05 13:22:12 +00:00
|
|
|
func (ws *DefaultExecutor) Start() <-chan *task.Result {
|
2022-11-03 20:04:14 +00:00
|
|
|
// Start worker goroutines.
|
|
|
|
for i := 0; i < ws.numberOfWorkerGoroutines; i++ {
|
|
|
|
ws.workerGroup.Add(1)
|
|
|
|
go ws.workerGoroutine()
|
|
|
|
}
|
|
|
|
return ws.taskResultChan
|
|
|
|
}
|
|
|
|
|
|
|
|
// Shutdown shuts down the DefaultExecutor.
|
|
|
|
// Shutdown blocks till the DefaultExecutor has shutdown.
|
|
|
|
func (ws *DefaultExecutor) Shutdown() {
|
|
|
|
// Shutdown all worker goroutines
|
|
|
|
for i := 0; i < ws.numberOfWorkerGoroutines; i++ {
|
|
|
|
ws.quinChan <- 1
|
|
|
|
}
|
|
|
|
ws.workerGroup.Wait()
|
|
|
|
close(ws.taskChan)
|
|
|
|
close(ws.taskResultChan)
|
|
|
|
close(ws.quinChan)
|
|
|
|
}
|