[feature] Add the option to specify function per task

This commit is contained in:
Denis-Cosmin Nutiu 2022-11-05 15:22:12 +02:00
parent 17476be23c
commit 1e06c8c62b
11 changed files with 237 additions and 92 deletions

View file

@ -3,8 +3,7 @@ package cmd
import (
"github.com/spf13/cobra"
"hotalert/logging"
"hotalert/task"
"hotalert/task/target"
"hotalert/task/executor"
"hotalert/workload"
"os"
"sync"
@ -29,7 +28,7 @@ var fileCmd = &cobra.Command{
}
var waitGroup = sync.WaitGroup{}
var defaultExecutor = task.NewDefaultExecutor(target.ScrapeWebTask)
var defaultExecutor = executor.NewDefaultExecutor()
taskResultChan := defaultExecutor.Start()
defer defaultExecutor.Shutdown()

View file

@ -5,7 +5,7 @@ Hotalert is a command line tool that for task execution and configuration. Tasks
in yaml files and the program parses the files, executes the tasks and emits alerts when the tasks conditions are met.
For example if you want to send a notification to your mobile phone when a keyword is found on a website you can use the
`webhook_discord` alerter and the `scrape_web` task.
`webhook_discord` alerter and the `web_scrape` task function.
Example:
@ -16,7 +16,7 @@ tasks:
keywords: ["Episode 10", "Episode 11"]
timeout: 10
alerter: "webhook_discord"
task: "scrape_web"
function: "web_scrape"
alerts:
webhook_discord:
webhook: https://discord.com/api/webhooks/[...]
@ -24,6 +24,16 @@ alerts:
message: "Hi, the keyword(s) `$keywords` was found on [...]"
```
### Available task functions
#### web_scrape
The web_scrape task scrapes a web page by issuing a GET request and parses the response to look for keywords.
**Options**:
- url (string) - The url to scrape.
- keywords (array[string]) - A list of keyword strings to look for.
### Development
To build the program for Linux under Linux use the following command:

View file

@ -1,50 +1,65 @@
package task
package executor
import (
"errors"
"fmt"
"hotalert/logging"
"hotalert/task"
"hotalert/task/functions"
"sync"
)
// Executor is an interface for implementing task executors.
type Executor interface {
// AddTask adds a task to the task executor.
AddTask(task *Task)
AddTask(task *task.Task)
// Start starts the scrapper and returns a Result receive-only channel.
Start() <-chan *Result
Start() <-chan *task.Result
// Shutdown shuts down the scrapper. It will block until the Executor was shut down.
Shutdown()
}
// ExecutionFn is a type definition for a function that executes the task and returns an error.
type ExecutionFn func(task *Task) error
// ExecutionFunc is a type definition for a function that executes the task and returns an error.
type ExecutionFunc func(task *task.Task) error
// DefaultExecutor is a TaskExecutor with the default implementation.
// The tasks are executed directly on the machine.
type DefaultExecutor struct {
// TaskExecutionFn is the function that executes the task.
TaskExecutionFn ExecutionFn
// workerGroup is a waiting group for worker goroutines.
workerGroup *sync.WaitGroup
// numberOfWorkerGoroutines is the number of working goroutines.
numberOfWorkerGoroutines int
// taskResultChan is a receive only channel for task results.
taskResultChan chan *Result
taskResultChan chan *task.Result
// taskChan is a channel for tasks
taskChan chan *Task
taskChan chan *task.Task
// quinChan is a channel for sending the quit command to worker goroutines.
quinChan chan int
}
// TODO: Add support for per task execution functions
// executionFuncMap is a map that holds all the possible values for ExecutionFunc.
// Right now it is hard-coded but in the future it may be extended dynamically.
var executionFuncMap = map[string]ExecutionFunc{
"web_scrape": functions.WebScrapeTask,
}
// RegisterNewExecutionFunction registers a new execution function.
func RegisterNewExecutionFunction(name string, function ExecutionFunc) error {
for n := range executionFuncMap {
if n == name {
return errors.New("function already exists")
}
}
executionFuncMap[name] = function
return nil
}
// NewDefaultExecutor returns a new instance of DefaultExecutor.
func NewDefaultExecutor(fn ExecutionFn) *DefaultExecutor {
func NewDefaultExecutor() *DefaultExecutor {
ws := &DefaultExecutor{
TaskExecutionFn: fn,
workerGroup: &sync.WaitGroup{},
taskResultChan: make(chan *Result, 50),
taskChan: make(chan *Task, 50),
taskResultChan: make(chan *task.Result, 50),
taskChan: make(chan *task.Task, 50),
numberOfWorkerGoroutines: 5,
}
ws.quinChan = make(chan int, ws.numberOfWorkerGoroutines)
@ -52,12 +67,12 @@ func NewDefaultExecutor(fn ExecutionFn) *DefaultExecutor {
}
// AddTask adds a task to the DefaultExecutor queue.
func (ws *DefaultExecutor) AddTask(task *Task) {
func (ws *DefaultExecutor) AddTask(task *task.Task) {
ws.taskChan <- task
}
// executeTask executes the given task using TaskExecutionFn
func (ws *DefaultExecutor) executeTask(task *Task) error {
// executeTask executes the given task using DefaultTaskExecutionFuncName
func (ws *DefaultExecutor) executeTask(task *task.Task) error {
var taskErr error = nil
// Execute task and set panics as errors in taskResult.
func() {
@ -66,7 +81,16 @@ func (ws *DefaultExecutor) executeTask(task *Task) error {
taskErr = errors.New(fmt.Sprintf("panic: %s", r))
}
}()
err := ws.TaskExecutionFn(task)
taskExecutionFunc, ok := executionFuncMap[task.ExecutionFuncName]
if !ok {
message := fmt.Sprintf("invalid task execution function name: '%s'", task.ExecutionFuncName)
logging.SugaredLogger.Error(message)
taskErr = errors.New(message)
return
}
err := taskExecutionFunc(task)
if err != nil {
taskErr = err
}
@ -80,9 +104,10 @@ func (ws *DefaultExecutor) workerGoroutine() {
defer ws.workerGroup.Done()
for {
select {
case task := <-ws.taskChan:
var taskResult = NewResult(task)
taskResult.error = ws.executeTask(task)
case currentTask := <-ws.taskChan:
var taskResult = task.NewResult(currentTask)
err := ws.executeTask(currentTask)
taskResult.SetError(err)
// Forward TaskResult to channel.
ws.taskResultChan <- taskResult
@ -95,7 +120,7 @@ func (ws *DefaultExecutor) workerGoroutine() {
// Start starts the DefaultExecutor.
// Start returns a receive only channel with task Result.
func (ws *DefaultExecutor) Start() <-chan *Result {
func (ws *DefaultExecutor) Start() <-chan *task.Result {
// Start worker goroutines.
for i := 0; i < ws.numberOfWorkerGoroutines; i++ {
ws.workerGroup.Add(1)

View file

@ -0,0 +1,98 @@
package executor
import (
"crypto/rand"
"encoding/hex"
"errors"
"github.com/stretchr/testify/assert"
"hotalert/alert"
"hotalert/task"
"testing"
)
func randomHex(n int) (string, error) {
bytes := make([]byte, n)
if _, err := rand.Read(bytes); err != nil {
return "", err
}
return hex.EncodeToString(bytes), nil
}
func Test_DefaultExecutor(t *testing.T) {
// Setup
var taskCounter = 0
var taskTestFunc = func(task *task.Task) error {
// First task is successful, others return error.
if taskCounter > 0 {
return errors.New("test")
}
taskCounter += 1
return nil
}
err := RegisterNewExecutionFunction("task_test", taskTestFunc)
assert.NoError(t, err)
defaultExecutor := NewDefaultExecutor()
taskResultsChan := defaultExecutor.Start()
// Test
var task1 = &task.Task{
ExecutionFuncName: "task_test",
Timeout: 0,
Alerter: alert.NewDummyAlerter(),
Callback: nil,
}
var task2 = &task.Task{
ExecutionFuncName: "task_test",
Timeout: 0,
Alerter: alert.NewDummyAlerter(),
Callback: nil,
}
defaultExecutor.AddTask(task1)
defaultExecutor.AddTask(task2)
// Assert results
result1 := <-taskResultsChan
assert.Equal(t, task1, result1.InitialTask)
assert.Equal(t, nil, result1.Error())
result2 := <-taskResultsChan
assert.Equal(t, task2, result2.InitialTask)
assert.Equal(t, errors.New("test"), result2.Error())
// Clean-up
defaultExecutor.Shutdown()
}
func Test_DefaultExecutor_InvalidTaskExecutionFuncName(t *testing.T) {
defaultExecutor := NewDefaultExecutor()
taskResultsChan := defaultExecutor.Start()
// Test
var task1 = &task.Task{
ExecutionFuncName: "vand_dacia_2006",
Timeout: 0,
Alerter: alert.NewDummyAlerter(),
Callback: nil,
}
defaultExecutor.AddTask(task1)
// Assert results
result1 := <-taskResultsChan
assert.Equal(t, task1, result1.InitialTask)
assert.Equal(t, errors.New("invalid task execution function name: 'vand_dacia_2006'"), result1.Error())
// Clean-up
defaultExecutor.Shutdown()
}
func Test_RegisterNewExecutionFunction(t *testing.T) {
var taskTestFunc = func(t *task.Task) error { return nil }
randomName, _ := randomHex(5)
err := RegisterNewExecutionFunction(randomName, taskTestFunc)
assert.NoError(t, err)
err = RegisterNewExecutionFunction(randomName, taskTestFunc)
assert.Error(t, err)
}

View file

@ -1,50 +0,0 @@
package task
import (
"errors"
"github.com/stretchr/testify/assert"
"hotalert/alert"
"testing"
)
func Test_DefaultExecutor(t *testing.T) {
// Setup
var taskCounter = 0
defaultExecutor := NewDefaultExecutor(func(task *Task) error {
// First task is successful, others return error.
if taskCounter > 0 {
return errors.New("test")
}
taskCounter += 1
return nil
})
taskResultsChan := defaultExecutor.Start()
// Test
var task1 = &Task{
Timeout: 0,
Alerter: alert.NewDummyAlerter(),
Callback: nil,
}
var task2 = &Task{
Timeout: 0,
Alerter: alert.NewDummyAlerter(),
Callback: nil,
}
defaultExecutor.AddTask(task1)
defaultExecutor.AddTask(task2)
// Assert results
assert.Equal(t, &Result{
InitialTask: task1,
error: nil,
}, <-taskResultsChan)
assert.Equal(t, &Result{
InitialTask: task2,
error: errors.New("test"),
}, <-taskResultsChan)
// Clean-up
defaultExecutor.Shutdown()
}

View file

@ -1,4 +1,4 @@
package target
package functions
import (
"context"
@ -11,8 +11,8 @@ import (
"strings"
)
// ScrapeWebTask scraps the web page given the task.
func ScrapeWebTask(task *task.Task) error {
// WebScrapeTask scraps the web page given the task.
func WebScrapeTask(task *task.Task) error {
// Parse options
targetUrl, ok := task.Options["url"].(string)
if !ok {

View file

@ -1,4 +1,4 @@
package target
package functions
import (
"github.com/stretchr/testify/assert"
@ -38,7 +38,7 @@ func TestScrapeWebTask(t *testing.T) {
"TestOk",
task.Task{
Options: task.Options{
"keywords": []string{"keyword"},
"keywords": []any{"keyword"},
},
Timeout: 10 * time.Second,
Alerter: alert.NewDummyAlerter(),
@ -112,7 +112,7 @@ func TestScrapeWebTask(t *testing.T) {
tv.Task.Options["url"] = testHttpServer.URL
err := ScrapeWebTask(&tv.Task)
err := WebScrapeTask(&tv.Task)
if tv.ExpectedError {
assert.Error(t, err)
} else {

View file

@ -14,6 +14,8 @@ type Options map[string]any
// Task represents the context of a task.
type Task struct {
// ExecutionFuncName is the function name associated with this task.
ExecutionFuncName string
// Options are the option given to the task.
Options Options `mapstructure:"options"`
// Timeout is the timeout for the task.
@ -25,11 +27,12 @@ type Task struct {
}
// NewTask returns a new task instance.
func NewTask(options Options, alerter alert.Alerter) *Task {
func NewTask(executionFuncName string, options Options, alerter alert.Alerter) *Task {
if alerter == nil {
panic(fmt.Sprintf("Alerter cannot be nil"))
}
return &Task{
ExecutionFuncName: executionFuncName,
Options: options,
Timeout: 10 * time.Second,
Alerter: alerter,
@ -53,6 +56,11 @@ func NewResult(task *Task) *Result {
}
}
// SetError sets the error on the result object.
func (r *Result) SetError(err error) {
r.error = err
}
// Error returns the error encountered during the execution of the task.
// Error returns null if the task had no errors and was completed.
func (r *Result) Error() error {

View file

@ -1,6 +1,7 @@
package task
import (
"errors"
"github.com/stretchr/testify/assert"
"hotalert/alert"
"testing"
@ -8,10 +9,11 @@ import (
)
func Test_NewTask(t *testing.T) {
var task = NewTask(Options{
var task = NewTask("web_scrape", Options{
"option": "true",
}, alert.NewDummyAlerter())
assert.Equal(t, Task{
ExecutionFuncName: "web_scrape",
Options: Options{
"option": "true",
},
@ -22,12 +24,15 @@ func Test_NewTask(t *testing.T) {
}
func Test_NewResult(t *testing.T) {
var task = NewTask(Options{
var task = NewTask("web_scrape", Options{
"option": "true",
}, alert.NewDummyAlerter())
testError := errors.New("test error")
var result = NewResult(task)
result.SetError(testError)
assert.Equal(t, Result{
InitialTask: &Task{
ExecutionFuncName: "web_scrape",
Options: Options{
"option": "true",
},
@ -35,6 +40,6 @@ func Test_NewResult(t *testing.T) {
Alerter: alert.NewDummyAlerter(),
Callback: nil,
},
error: nil,
error: testError,
}, *result)
}

View file

@ -108,8 +108,14 @@ func (p *Workload) buildTasksArray(workloadData map[string]any) error {
continue
}
executionFuncName, ok := taskEntry["function"].(string)
if !ok {
logging.SugaredLogger.Errorf("error parsing entry %d in tasks array: invalid function name", i)
continue
}
// Build task
tempTask := task.NewTask(taskOptions, alert.DummyAlerter{})
tempTask := task.NewTask(executionFuncName, taskOptions, alert.DummyAlerter{})
// Timeout (optional)
taskTimeout, ok := taskEntry["timeout"].(int)

View file

@ -15,6 +15,7 @@ tasks:
keywords: ["Software Engineer, Backend"]
timeout: 10
alerter: "webhook_discord"
function: "web_scrape"
- options:
url: https://jobs.ro
keywords: ["Software Engineer, Front-End", "Software Architect"]
@ -23,6 +24,7 @@ tasks:
extra_bool: True
timeout: 15
alerter: "webhook_discord"
function: "web_scrape"
alerts:
webhook_discord:
webhook: https://webhook.url.com
@ -69,6 +71,7 @@ tasks:
keywords: ["Software Engineer, Backend"]
timeout: 10
alerter: "webhook_discord"
function: "web_scrape"
- options:
url: https://jobs.ro
keywords: ["Software Engineer, Front-End", "Software Architect"]
@ -77,6 +80,7 @@ tasks:
extra_bool: True
timeout: 15
alerter: "webhook_discord"
function: "web_scrape"
alerts:
webhook_discord:
webhook: https://webhook.url.com
@ -100,6 +104,7 @@ tasks:
keywords: ["Software Engineer, Backend"]
timeout: 10
alerter: "webhook_discord"
function: "web_scrape"
- options:
url: https://jobs.ro
keywords: ["Software Engineer, Front-End", "Software Architect"]
@ -108,6 +113,7 @@ tasks:
extra_bool: True
timeout: 15
alerter: "webhook_discord"
function: "web_scrape"
alerts:
webhook_discord:
webhook: https://webhook.url.com
@ -127,6 +133,7 @@ tasks:
keywords: ["Software Engineer, Backend"]
timeout: 10
alerter: "webhook_discord"
function: "web_scrape"
- options:
url: https://jobs.ro
keywords: ["Software Engineer, Front-End", "Software Architect"]
@ -135,6 +142,7 @@ tasks:
extra_bool: True
timeout: 15
alerter: "webhook_discord"
function: "web_scrape"
alertx:
webhook_discord:
webhook: https://webhook.url.com
@ -149,6 +157,7 @@ tasks:
keywords: ["Software Engineer, Backend"]
timeout: 10
alerter: "webhook_discord"
function: "web_scrape"
- options:
url: https://jobs.ro
keywords: ["Software Engineer, Front-End", "Software Architect"]
@ -157,6 +166,7 @@ tasks:
extra_bool: True
timeout: 15
alerter: "webhook_discord"
function: "web_scrape"
alerts: "I'm just a string please don't hurt me."
`
@ -167,6 +177,7 @@ tasks:
keywords: ["Software Engineer, Backend"]
timeout: 10
alerter: "webhook_discord"
function: "web_scrape"
- options:
url: https://jobs.ro
keywords: ["Software Engineer, Front-End", "Software Architect"]
@ -175,6 +186,7 @@ tasks:
extra_bool: True
timeout: 15
alerter: "webhook_discord"
function: "web_scrape"
alerts:
webhook_discord:
webhook: https://webhook.url.com
@ -194,6 +206,7 @@ tasks:
keywords: ["Software Engineer, Backend"]
timeout: 10
alerter: "webhook_discord"
function: "web_scrape"
- options:
url: https://jobs.ro
keywords: ["Software Engineer, Front-End", "Software Architect"]
@ -202,6 +215,7 @@ tasks:
extra_bool: True
timeout: 15
alerter: "webhook_discord"
function: "web_scrape"
alerts:
imcoolalerter:
cool: true
@ -241,6 +255,7 @@ tasks:
keywords: ["Software Engineer, Backend"]
timeout: 10
alerter: "imaacoolalerter"
function: "web_scrape"
alerts:
webhook_discord:
webhook: https://webhook.url.com
@ -316,11 +331,13 @@ tasks:
keywords: ["Software Engineer, Backend"]
timeout: 10
alerter: "imaacoolalerter"
function: "web_scrape"
- options:
url: https://jobs.eu
keywords: ["Software Engineer, Backend"]
timeout: 10
alerter: "webhook_discord"
function: "web_scrape"
alerts:
webhook_discord:
webhook: https://webhook.url.com
@ -334,3 +351,30 @@ func Test_FromYamlContent_InvalidAlerterForTask(t *testing.T) {
assert.NotNil(t, currentWorkload)
assert.Len(t, currentWorkload.tasksList, 1)
}
var testTasksTaskHasInvalidExecutionFunction = `
tasks:
- options:
url: https://jobs.eu
keywords: ["Software Engineer, Backend"]
timeout: 10
alerter: "imaacoolalerter"
- options:
url: https://jobs.eu
keywords: ["Software Engineer, Backend"]
timeout: 10
alerter: "webhook_discord"
function: "web_scrape"
alerts:
webhook_discord:
webhook: https://webhook.url.com
# $keywords can be used as a placeholder in the message, and it will be replaced with the actual keywords.
message: "Hi, the keyword $keywords was found on page!"
`
func Test_FromYamlContent_InvalidExecutionFuncNameForTask(t *testing.T) {
currentWorkload, err := FromYamlContent([]byte(testTasksTaskHasInvalidExecutionFunction))
assert.NoError(t, err)
assert.NotNil(t, currentWorkload)
assert.Len(t, currentWorkload.tasksList, 1)
}