From 1e06c8c62b16af3ed5456330c57037adab942b03 Mon Sep 17 00:00:00 2001 From: dnutiu Date: Sat, 5 Nov 2022 15:22:12 +0200 Subject: [PATCH] [feature] Add the option to specify function per task --- cmd/file.go | 5 +- readme.md | 14 +++- task/{ => executor}/executor.go | 69 +++++++++++----- task/executor/executor_test.go | 98 +++++++++++++++++++++++ task/executor_test.go | 50 ------------ task/{target => functions}/scrape.go | 6 +- task/{target => functions}/scrape_test.go | 6 +- task/task.go | 18 +++-- task/task_test.go | 11 ++- workload/workload.go | 8 +- workload/workload_test.go | 44 ++++++++++ 11 files changed, 237 insertions(+), 92 deletions(-) rename task/{ => executor}/executor.go (58%) create mode 100644 task/executor/executor_test.go delete mode 100644 task/executor_test.go rename task/{target => functions}/scrape.go (95%) rename task/{target => functions}/scrape_test.go (96%) diff --git a/cmd/file.go b/cmd/file.go index 692942a..6507537 100644 --- a/cmd/file.go +++ b/cmd/file.go @@ -3,8 +3,7 @@ package cmd import ( "github.com/spf13/cobra" "hotalert/logging" - "hotalert/task" - "hotalert/task/target" + "hotalert/task/executor" "hotalert/workload" "os" "sync" @@ -29,7 +28,7 @@ var fileCmd = &cobra.Command{ } var waitGroup = sync.WaitGroup{} - var defaultExecutor = task.NewDefaultExecutor(target.ScrapeWebTask) + var defaultExecutor = executor.NewDefaultExecutor() taskResultChan := defaultExecutor.Start() defer defaultExecutor.Shutdown() diff --git a/readme.md b/readme.md index 92fd56a..c964717 100644 --- a/readme.md +++ b/readme.md @@ -5,7 +5,7 @@ Hotalert is a command line tool that for task execution and configuration. Tasks in yaml files and the program parses the files, executes the tasks and emits alerts when the tasks conditions are met. For example if you want to send a notification to your mobile phone when a keyword is found on a website you can use the -`webhook_discord` alerter and the `scrape_web` task. 
+`webhook_discord` alerter and the `web_scrape` task function. Example: @@ -16,7 +16,7 @@ tasks: keywords: ["Episode 10", "Episode 11"] timeout: 10 alerter: "webhook_discord" - task: "scrape_web" + function: "web_scrape" alerts: webhook_discord: webhook: https://discord.com/api/webhooks/[...] @@ -24,6 +24,16 @@ alerts: message: "Hi, the keyword(s) `$keywords` was found on [...]" ``` +### Available task functions + +#### web_scrape + +The web_scrape task scrapes a web page by issuing a GET request and parses the response to look for keywords. + +**Options**: +- url (string) - The url to scrape. +- keywords (array[string]) - A list of keyword strings to look for. + ### Development To build the program for Linux under Linux use the following command: diff --git a/task/executor.go b/task/executor/executor.go similarity index 58% rename from task/executor.go rename to task/executor/executor.go index 097fabc..0a80fe9 100644 --- a/task/executor.go +++ b/task/executor/executor.go @@ -1,50 +1,65 @@ -package task +package executor import ( "errors" "fmt" + "hotalert/logging" + "hotalert/task" + "hotalert/task/functions" "sync" ) // Executor is an interface for implementing task executors. type Executor interface { // AddTask adds a task to the task executor. - AddTask(task *Task) + AddTask(task *task.Task) // Start starts the scrapper and returns a Result receive-only channel. - Start() <-chan *Result + Start() <-chan *task.Result // Shutdown shuts down the scrapper. It will block until the Executor was shut down. Shutdown() } -// ExecutionFn is a type definition for a function that executes the task and returns an error. -type ExecutionFn func(task *Task) error +// ExecutionFunc is a type definition for a function that executes the task and returns an error. +type ExecutionFunc func(task *task.Task) error // DefaultExecutor is a TaskExecutor with the default implementation. // The tasks are executed directly on the machine. 
type DefaultExecutor struct { - // TaskExecutionFn is the function that executes the task. - TaskExecutionFn ExecutionFn // workerGroup is a waiting group for worker goroutines. workerGroup *sync.WaitGroup // numberOfWorkerGoroutines is the number of working goroutines. numberOfWorkerGoroutines int // taskResultChan is a receive only channel for task results. - taskResultChan chan *Result + taskResultChan chan *task.Result // taskChan is a channel for tasks - taskChan chan *Task + taskChan chan *task.Task // quinChan is a channel for sending the quit command to worker goroutines. quinChan chan int } -// TODO: Add support for per task execution functions +// executionFuncMap is a map that holds all the possible values for ExecutionFunc. +// Right now it is hard-coded but in the future it may be extended dynamically. +var executionFuncMap = map[string]ExecutionFunc{ + "web_scrape": functions.WebScrapeTask, +} + +// RegisterNewExecutionFunction registers a new execution function. +func RegisterNewExecutionFunction(name string, function ExecutionFunc) error { + for n := range executionFuncMap { + if n == name { + return errors.New("function already exists") + } + } + executionFuncMap[name] = function + return nil +} // NewDefaultExecutor returns a new instance of DefaultExecutor. -func NewDefaultExecutor(fn ExecutionFn) *DefaultExecutor { +func NewDefaultExecutor() *DefaultExecutor { ws := &DefaultExecutor{ - TaskExecutionFn: fn, workerGroup: &sync.WaitGroup{}, - taskResultChan: make(chan *Result, 50), - taskChan: make(chan *Task, 50), + taskResultChan: make(chan *task.Result, 50), + taskChan: make(chan *task.Task, 50), numberOfWorkerGoroutines: 5, } ws.quinChan = make(chan int, ws.numberOfWorkerGoroutines) @@ -52,12 +67,12 @@ func NewDefaultExecutor(fn ExecutionFn) *DefaultExecutor { } // AddTask adds a task to the DefaultExecutor queue. 
-func (ws *DefaultExecutor) AddTask(task *Task) { +func (ws *DefaultExecutor) AddTask(task *task.Task) { ws.taskChan <- task } -// executeTask executes the given task using TaskExecutionFn -func (ws *DefaultExecutor) executeTask(task *Task) error { +// executeTask executes the given task using the function registered in executionFuncMap under task.ExecutionFuncName +func (ws *DefaultExecutor) executeTask(task *task.Task) error { var taskErr error = nil // Execute task and set panics as errors in taskResult. func() { @@ -66,7 +81,16 @@ func (ws *DefaultExecutor) executeTask(task *Task) error { taskErr = errors.New(fmt.Sprintf("panic: %s", r)) } }() - err := ws.TaskExecutionFn(task) + + taskExecutionFunc, ok := executionFuncMap[task.ExecutionFuncName] + if !ok { + message := fmt.Sprintf("invalid task execution function name: '%s'", task.ExecutionFuncName) + logging.SugaredLogger.Error(message) + taskErr = errors.New(message) + return + } + + err := taskExecutionFunc(task) if err != nil { taskErr = err } @@ -80,9 +104,10 @@ func (ws *DefaultExecutor) workerGoroutine() { defer ws.workerGroup.Done() for { select { - case task := <-ws.taskChan: - var taskResult = NewResult(task) - taskResult.error = ws.executeTask(task) + case currentTask := <-ws.taskChan: + var taskResult = task.NewResult(currentTask) + err := ws.executeTask(currentTask) + taskResult.SetError(err) // Forward TaskResult to channel. ws.taskResultChan <- taskResult @@ -95,7 +120,7 @@ func (ws *DefaultExecutor) workerGoroutine() { // Start starts the DefaultExecutor. // Start returns a receive only channel with task Result. -func (ws *DefaultExecutor) Start() <-chan *Result { +func (ws *DefaultExecutor) Start() <-chan *task.Result { // Start worker goroutines.
for i := 0; i < ws.numberOfWorkerGoroutines; i++ { ws.workerGroup.Add(1) diff --git a/task/executor/executor_test.go b/task/executor/executor_test.go new file mode 100644 index 0000000..7ffed2c --- /dev/null +++ b/task/executor/executor_test.go @@ -0,0 +1,98 @@ +package executor + +import ( + "crypto/rand" + "encoding/hex" + "errors" + "github.com/stretchr/testify/assert" + "hotalert/alert" + "hotalert/task" + "testing" +) + +func randomHex(n int) (string, error) { + bytes := make([]byte, n) + if _, err := rand.Read(bytes); err != nil { + return "", err + } + return hex.EncodeToString(bytes), nil +} + +func Test_DefaultExecutor(t *testing.T) { + // Setup + var taskCounter = 0 + var taskTestFunc = func(task *task.Task) error { + // First task is successful, others return error. + if taskCounter > 0 { + return errors.New("test") + } + taskCounter += 1 + return nil + } + + err := RegisterNewExecutionFunction("task_test", taskTestFunc) + assert.NoError(t, err) + + defaultExecutor := NewDefaultExecutor() + taskResultsChan := defaultExecutor.Start() + + // Test + var task1 = &task.Task{ + ExecutionFuncName: "task_test", + Timeout: 0, + Alerter: alert.NewDummyAlerter(), + Callback: nil, + } + var task2 = &task.Task{ + ExecutionFuncName: "task_test", + Timeout: 0, + Alerter: alert.NewDummyAlerter(), + Callback: nil, + } + + defaultExecutor.AddTask(task1) + defaultExecutor.AddTask(task2) + + // Assert results + result1 := <-taskResultsChan + assert.Equal(t, task1, result1.InitialTask) + assert.Equal(t, nil, result1.Error()) + + result2 := <-taskResultsChan + assert.Equal(t, task2, result2.InitialTask) + assert.Equal(t, errors.New("test"), result2.Error()) + + // Clean-up + defaultExecutor.Shutdown() +} + +func Test_DefaultExecutor_InvalidTaskExecutionFuncName(t *testing.T) { + defaultExecutor := NewDefaultExecutor() + taskResultsChan := defaultExecutor.Start() + + // Test + var task1 = &task.Task{ + ExecutionFuncName: "vand_dacia_2006", + Timeout: 0, + Alerter: 
alert.NewDummyAlerter(), + Callback: nil, + } + defaultExecutor.AddTask(task1) + + // Assert results + result1 := <-taskResultsChan + assert.Equal(t, task1, result1.InitialTask) + assert.Equal(t, errors.New("invalid task execution function name: 'vand_dacia_2006'"), result1.Error()) + + // Clean-up + defaultExecutor.Shutdown() +} + +func Test_RegisterNewExecutionFunction(t *testing.T) { + var taskTestFunc = func(t *task.Task) error { return nil } + randomName, _ := randomHex(5) + err := RegisterNewExecutionFunction(randomName, taskTestFunc) + assert.NoError(t, err) + err = RegisterNewExecutionFunction(randomName, taskTestFunc) + assert.Error(t, err) +} diff --git a/task/executor_test.go b/task/executor_test.go deleted file mode 100644 index a35311d..0000000 --- a/task/executor_test.go +++ /dev/null @@ -1,50 +0,0 @@ -package task - -import ( - "errors" - "github.com/stretchr/testify/assert" - "hotalert/alert" - "testing" -) - -func Test_DefaultExecutor(t *testing.T) { - // Setup - var taskCounter = 0 - defaultExecutor := NewDefaultExecutor(func(task *Task) error { - // First task is successful, others return error. 
- if taskCounter > 0 { - return errors.New("test") - } - taskCounter += 1 - return nil - }) - taskResultsChan := defaultExecutor.Start() - - // Test - var task1 = &Task{ - Timeout: 0, - Alerter: alert.NewDummyAlerter(), - Callback: nil, - } - var task2 = &Task{ - Timeout: 0, - Alerter: alert.NewDummyAlerter(), - Callback: nil, - } - - defaultExecutor.AddTask(task1) - defaultExecutor.AddTask(task2) - - // Assert results - assert.Equal(t, &Result{ - InitialTask: task1, - error: nil, - }, <-taskResultsChan) - assert.Equal(t, &Result{ - InitialTask: task2, - error: errors.New("test"), - }, <-taskResultsChan) - - // Clean-up - defaultExecutor.Shutdown() -} diff --git a/task/target/scrape.go b/task/functions/scrape.go similarity index 95% rename from task/target/scrape.go rename to task/functions/scrape.go index e862a69..96a003e 100644 --- a/task/target/scrape.go +++ b/task/functions/scrape.go @@ -1,4 +1,4 @@ -package target +package functions import ( "context" @@ -11,8 +11,8 @@ import ( "strings" ) -// ScrapeWebTask scraps the web page given the task. -func ScrapeWebTask(task *task.Task) error { +// WebScrapeTask scrapes the web page given the task.
+func WebScrapeTask(task *task.Task) error { // Parse options targetUrl, ok := task.Options["url"].(string) if !ok { diff --git a/task/target/scrape_test.go b/task/functions/scrape_test.go similarity index 96% rename from task/target/scrape_test.go rename to task/functions/scrape_test.go index 0d15b71..c58f6db 100644 --- a/task/target/scrape_test.go +++ b/task/functions/scrape_test.go @@ -1,4 +1,4 @@ -package target +package functions import ( "github.com/stretchr/testify/assert" @@ -38,7 +38,7 @@ func TestScrapeWebTask(t *testing.T) { "TestOk", task.Task{ Options: task.Options{ - "keywords": []string{"keyword"}, + "keywords": []any{"keyword"}, }, Timeout: 10 * time.Second, Alerter: alert.NewDummyAlerter(), @@ -112,7 +112,7 @@ func TestScrapeWebTask(t *testing.T) { tv.Task.Options["url"] = testHttpServer.URL - err := ScrapeWebTask(&tv.Task) + err := WebScrapeTask(&tv.Task) if tv.ExpectedError { assert.Error(t, err) } else { diff --git a/task/task.go b/task/task.go index 0d48a68..2d59655 100644 --- a/task/task.go +++ b/task/task.go @@ -14,6 +14,8 @@ type Options map[string]any // Task represents the context of a task. type Task struct { + // ExecutionFuncName is the function name associated with this task. + ExecutionFuncName string // Options are the option given to the task. Options Options `mapstructure:"options"` // Timeout is the timeout for the task. @@ -25,15 +27,16 @@ type Task struct { } // NewTask returns a new task instance. 
-func NewTask(options Options, alerter alert.Alerter) *Task { +func NewTask(executionFuncName string, options Options, alerter alert.Alerter) *Task { if alerter == nil { panic(fmt.Sprintf("Alerter cannot be nil")) } return &Task{ - Options: options, - Timeout: 10 * time.Second, - Alerter: alerter, - Callback: nil, + ExecutionFuncName: executionFuncName, + Options: options, + Timeout: 10 * time.Second, + Alerter: alerter, + Callback: nil, } } @@ -53,6 +56,11 @@ func NewResult(task *Task) *Result { } } +// SetError sets the error on the result object. +func (r *Result) SetError(err error) { + r.error = err +} + // Error returns the error encountered during the execution of the task. // Error returns null if the task had no errors and was completed. func (r *Result) Error() error { diff --git a/task/task_test.go b/task/task_test.go index 58d5544..cc52ad7 100644 --- a/task/task_test.go +++ b/task/task_test.go @@ -1,6 +1,7 @@ package task import ( + "errors" "github.com/stretchr/testify/assert" "hotalert/alert" "testing" @@ -8,10 +9,11 @@ import ( ) func Test_NewTask(t *testing.T) { - var task = NewTask(Options{ + var task = NewTask("web_scrape", Options{ "option": "true", }, alert.NewDummyAlerter()) assert.Equal(t, Task{ + ExecutionFuncName: "web_scrape", Options: Options{ "option": "true", }, @@ -22,12 +24,15 @@ func Test_NewTask(t *testing.T) { } func Test_NewResult(t *testing.T) { - var task = NewTask(Options{ + var task = NewTask("web_scrape", Options{ "option": "true", }, alert.NewDummyAlerter()) + testError := errors.New("test error") var result = NewResult(task) + result.SetError(testError) assert.Equal(t, Result{ InitialTask: &Task{ + ExecutionFuncName: "web_scrape", Options: Options{ "option": "true", }, @@ -35,6 +40,6 @@ func Test_NewResult(t *testing.T) { Alerter: alert.NewDummyAlerter(), Callback: nil, }, - error: nil, + error: testError, }, *result) } diff --git a/workload/workload.go b/workload/workload.go index 25f1836..e4f7201 100644 --- 
a/workload/workload.go +++ b/workload/workload.go @@ -108,8 +108,14 @@ func (p *Workload) buildTasksArray(workloadData map[string]any) error { continue } + executionFuncName, ok := taskEntry["function"].(string) + if !ok { + logging.SugaredLogger.Errorf("error parsing entry %d in tasks array: invalid function name", i) + continue + } + // Build task - tempTask := task.NewTask(taskOptions, alert.DummyAlerter{}) + tempTask := task.NewTask(executionFuncName, taskOptions, alert.DummyAlerter{}) // Timeout (optional) taskTimeout, ok := taskEntry["timeout"].(int) diff --git a/workload/workload_test.go b/workload/workload_test.go index 8cc3e2d..aa78d6e 100644 --- a/workload/workload_test.go +++ b/workload/workload_test.go @@ -15,6 +15,7 @@ tasks: keywords: ["Software Engineer, Backend"] timeout: 10 alerter: "webhook_discord" + function: "web_scrape" - options: url: https://jobs.ro keywords: ["Software Engineer, Front-End", "Software Architect"] @@ -23,6 +24,7 @@ tasks: extra_bool: True timeout: 15 alerter: "webhook_discord" + function: "web_scrape" alerts: webhook_discord: webhook: https://webhook.url.com @@ -69,6 +71,7 @@ tasks: keywords: ["Software Engineer, Backend"] timeout: 10 alerter: "webhook_discord" + function: "web_scrape" - options: url: https://jobs.ro keywords: ["Software Engineer, Front-End", "Software Architect"] @@ -77,6 +80,7 @@ tasks: extra_bool: True timeout: 15 alerter: "webhook_discord" + function: "web_scrape" alerts: webhook_discord: webhook: https://webhook.url.com @@ -100,6 +104,7 @@ tasks: keywords: ["Software Engineer, Backend"] timeout: 10 alerter: "webhook_discord" + function: "web_scrape" - options: url: https://jobs.ro keywords: ["Software Engineer, Front-End", "Software Architect"] @@ -108,6 +113,7 @@ tasks: extra_bool: True timeout: 15 alerter: "webhook_discord" + function: "web_scrape" alerts: webhook_discord: webhook: https://webhook.url.com @@ -127,6 +133,7 @@ tasks: keywords: ["Software Engineer, Backend"] timeout: 10 alerter: 
"webhook_discord" + function: "web_scrape" - options: url: https://jobs.ro keywords: ["Software Engineer, Front-End", "Software Architect"] @@ -135,6 +142,7 @@ tasks: extra_bool: True timeout: 15 alerter: "webhook_discord" + function: "web_scrape" alertx: webhook_discord: webhook: https://webhook.url.com @@ -149,6 +157,7 @@ tasks: keywords: ["Software Engineer, Backend"] timeout: 10 alerter: "webhook_discord" + function: "web_scrape" - options: url: https://jobs.ro keywords: ["Software Engineer, Front-End", "Software Architect"] @@ -157,6 +166,7 @@ tasks: extra_bool: True timeout: 15 alerter: "webhook_discord" + function: "web_scrape" alerts: "I'm just a string please don't hurt me." ` @@ -167,6 +177,7 @@ tasks: keywords: ["Software Engineer, Backend"] timeout: 10 alerter: "webhook_discord" + function: "web_scrape" - options: url: https://jobs.ro keywords: ["Software Engineer, Front-End", "Software Architect"] @@ -175,6 +186,7 @@ tasks: extra_bool: True timeout: 15 alerter: "webhook_discord" + function: "web_scrape" alerts: webhook_discord: webhook: https://webhook.url.com @@ -194,6 +206,7 @@ tasks: keywords: ["Software Engineer, Backend"] timeout: 10 alerter: "webhook_discord" + function: "web_scrape" - options: url: https://jobs.ro keywords: ["Software Engineer, Front-End", "Software Architect"] @@ -202,6 +215,7 @@ tasks: extra_bool: True timeout: 15 alerter: "webhook_discord" + function: "web_scrape" alerts: imcoolalerter: cool: true @@ -241,6 +255,7 @@ tasks: keywords: ["Software Engineer, Backend"] timeout: 10 alerter: "imaacoolalerter" + function: "web_scrape" alerts: webhook_discord: webhook: https://webhook.url.com @@ -316,11 +331,13 @@ tasks: keywords: ["Software Engineer, Backend"] timeout: 10 alerter: "imaacoolalerter" + function: "web_scrape" - options: url: https://jobs.eu keywords: ["Software Engineer, Backend"] timeout: 10 alerter: "webhook_discord" + function: "web_scrape" alerts: webhook_discord: webhook: https://webhook.url.com @@ -334,3 +351,30 
@@ func Test_FromYamlContent_InvalidAlerterForTask(t *testing.T) { assert.NotNil(t, currentWorkload) assert.Len(t, currentWorkload.tasksList, 1) } + +var testTasksTaskHasInvalidExecutionFunction = ` +tasks: + - options: + url: https://jobs.eu + keywords: ["Software Engineer, Backend"] + timeout: 10 + alerter: "imaacoolalerter" + - options: + url: https://jobs.eu + keywords: ["Software Engineer, Backend"] + timeout: 10 + alerter: "webhook_discord" + function: "web_scrape" +alerts: + webhook_discord: + webhook: https://webhook.url.com + # $keywords can be used as a placeholder in the message, and it will be replaced with the actual keywords. + message: "Hi, the keyword $keywords was found on page!" +` + +func Test_FromYamlContent_InvalidExecutionFuncNameForTask(t *testing.T) { + currentWorkload, err := FromYamlContent([]byte(testTasksTaskHasInvalidExecutionFunction)) + assert.NoError(t, err) + assert.NotNil(t, currentWorkload) + assert.Len(t, currentWorkload.tasksList, 1) +}