Skip to content

Commit

Permalink
GHO-2783 Make sender node parallelizable (#69)
Browse files Browse the repository at this point in the history
* GHO-2783 Refactor nodes to use base type

* GHO-2783 Make sender node parallelizable

* linting

* lint
  • Loading branch information
liamg authored May 18, 2023
1 parent 1f5a4ce commit 034d9f3
Show file tree
Hide file tree
Showing 22 changed files with 1,302 additions and 926 deletions.
508 changes: 508 additions & 0 deletions backend/workflow/bus.go

Large diffs are not rendered by default.

24 changes: 4 additions & 20 deletions backend/workflow/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package workflow

import (
"encoding/json"
"fmt"

"github.com/ghostsecurity/reaper/backend/workflow/node"
"github.com/google/uuid"
Expand Down Expand Up @@ -138,32 +137,17 @@ func ToNodeM(n node.Node) (*NodeM, error) {
}

// ToNode converts the serialised NodeM back into a node.Node. It builds a node
// for n.Type, applies the stored ID and name, unpacks n.Vars and applies the
// result to the node. An error is returned when the type is unknown or when
// the vars cannot be unpacked.
//
// NOTE(review): the lines below are a rendered diff in which the old switch
// over node types and the new node.FromType call (and likewise SetVars and
// MergeVars) appear together; braces are unbalanced, so this is not
// compilable as shown.
func (n *NodeM) ToNode() (node.Node, error) {
var real node.Node
switch node.Type(n.Type) {
case node.TypeFuzzer:
real = node.NewFuzzer()
case node.TypeStatusFilter:
real = node.NewStatusFilter()
case node.TypeOutput:
real = node.NewOutput()
case node.TypeRequest:
real = node.NewRequest()
case node.TypeStart:
real = node.NewStart()
case node.TypeSender:
real = node.NewSender()
case node.TypeVariables:
real = node.NewVars()
default:
return nil, fmt.Errorf("unknown node type: %v", n.Type)
real, err := node.FromType(node.Type(n.Type))
if err != nil {
return nil, err
}
real.SetID(toUUIDOrNil(n.Id))
real.SetName(n.Name)
unpacked, err := n.Vars.Unpack()
if err != nil {
return nil, err
}
real.SetVars(unpacked)
real.MergeVars(unpacked)
return real, nil
}

Expand Down
99 changes: 99 additions & 0 deletions backend/workflow/node/base.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package node

import (
"sync"
"time"

"github.com/ghostsecurity/reaper/backend/workflow/transmission"
"github.com/google/uuid"
"golang.org/x/net/context"
)

// noInjections is a zero-size helper that supplies a GetInjections method
// which never reports any injections. DelayNode embeds it for that purpose.
type noInjections struct{}

// GetInjections always returns a nil map.
func (n noInjections) GetInjections() map[string]transmission.Transmission {
return nil
}

// base holds the state and accessors shared by node implementations:
// identity (id, name, type), the read-only flag, variable storage and
// busy/last-activity tracking.
type base struct {
// Embedded variable storage; its methods are promoted onto base.
*VarStorage
// id is initialised to a fresh UUID by newBase and can be replaced via SetID.
id uuid.UUID
// name is the node's name; it can be changed via SetName.
name string
// t is the node type given to newBase.
t Type
// readonly is the flag given to newBase and reported by IsReadOnly.
readonly bool
// busy is the flag last stored by setBusy; guarded by busyMu.
busy bool
// last is the time of the most recent setBusy call; guarded by busyMu.
last time.Time
// busyMu guards busy and last.
busyMu sync.RWMutex
}

// newBase allocates a base with a freshly generated UUID and the supplied
// name, type, read-only flag and variable storage. The busy flag and the
// last-activity time are left at their zero values.
func newBase(name string, t Type, readonly bool, vars *VarStorage) *base {
	b := new(base)
	b.VarStorage = vars
	b.id = uuid.New()
	b.name = name
	b.t = t
	b.readonly = readonly
	return b
}

// Busy reports the busy flag most recently stored by setBusy. The flag is
// read while holding the read lock on busyMu.
func (b *base) Busy() bool {
	b.busyMu.RLock()
	current := b.busy
	b.busyMu.RUnlock()
	return current
}

// LastInput returns the timestamp recorded by the most recent setBusy call.
// The value is read while holding the read lock on busyMu.
func (b *base) LastInput() time.Time {
	b.busyMu.RLock()
	stamp := b.last
	b.busyMu.RUnlock()
	return stamp
}

// setBusy stores the busy flag and stamps the last-activity time with the
// current time. The timestamp is refreshed on every call, whether busy is
// being set or cleared. Both fields are written under the write lock.
func (b *base) setBusy(busy bool) {
	b.busyMu.Lock()
	b.busy, b.last = busy, time.Now()
	b.busyMu.Unlock()
}

// ID returns the node's UUID.
func (b *base) ID() uuid.UUID {
return b.id
}

// SetID replaces the node's UUID with id. The write is not synchronised.
func (b *base) SetID(id uuid.UUID) {
b.id = id
}

// Name returns the node's current name.
func (b *base) Name() string {
return b.name
}

// SetName replaces the node's name. The write is not synchronised.
func (b *base) SetName(name string) {
b.name = name
}

// Type returns the node type that was passed to newBase.
func (b *base) Type() Type {
return b.t
}

// IsReadOnly reports the read-only flag that was passed to newBase.
func (b *base) IsReadOnly() bool {
return b.readonly
}

// GetVars returns the embedded VarStorage pointer itself, not a copy.
func (b *base) GetVars() *VarStorage {
return b.VarStorage
}

// SetVars replaces the embedded VarStorage pointer with vars; whatever the
// previous storage held is no longer referenced by this node.
func (b *base) SetVars(vars *VarStorage) {
b.VarStorage = vars
}

// MergeVars forwards vars to the Merge method of the node's existing
// VarStorage, leaving the storage pointer itself unchanged.
// NOTE(review): the exact merge semantics live in VarStorage.Merge, which is
// not visible here — confirm how conflicting entries are resolved.
func (b *base) MergeVars(vars *VarStorage) {
b.VarStorage.Merge(vars)
}

// tryOut sends instance on out, blocking until either the send succeeds or
// ctx is done. If ctx finishes first the instance is dropped; the caller is
// not told which of the two happened.
func (b *base) tryOut(ctx context.Context, out chan<- OutputInstance, instance OutputInstance) {
	select {
	case out <- instance:
		return
	case <-ctx.Done():
		return
	}
}
42 changes: 40 additions & 2 deletions backend/workflow/node/node.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package node

import (
"fmt"
"time"

"github.com/ghostsecurity/reaper/backend/workflow/transmission"
"github.com/google/uuid"
"golang.org/x/net/context"
Expand All @@ -14,6 +17,11 @@ const (
ChannelActivity Channel = "activity"
)

// Input is a single message delivered to a node over its input channel
// (see the Start method on Node).
type Input struct {
// Last is copied by DelayNode into the Complete field of the output it emits.
// NOTE(review): presumably marks the final input of a run — confirm with the bus.
Last bool
// Data holds the transmissions for this input, keyed by string.
// NOTE(review): keys look like input connector names — confirm.
Data map[string]transmission.Transmission
}

type Output struct {
Node uuid.UUID
Channel Channel
Expand Down Expand Up @@ -45,8 +53,34 @@ const (
TypeStart
TypeSender
TypeVariables
TypeDelay
)

// FromType constructs a new node of the requested type using that type's
// constructor. It returns an error if t does not match any known node type.
func FromType(t Type) (Node, error) {
	switch t {
	case TypeFuzzer:
		return NewFuzzer(), nil
	case TypeStatusFilter:
		return NewStatusFilter(), nil
	case TypeOutput:
		return NewOutput(), nil
	case TypeRequest:
		return NewRequest(), nil
	case TypeStart:
		return NewStart(), nil
	case TypeSender:
		return NewSender(), nil
	case TypeVariables:
		return NewVars(), nil
	case TypeDelay:
		return NewDelay(), nil
	}
	return nil, fmt.Errorf("unknown node type: %v", t)
}

type Node interface {
IsReadOnly() bool
ID() uuid.UUID
Expand All @@ -57,12 +91,16 @@ type Node interface {
GetInputs() Connectors
SetStaticInputValues(map[string]transmission.Transmission) error
AddStaticInputValue(string, transmission.Transmission) error
GetInjections() map[string]transmission.Transmission
GetOutputs() Connectors
GetVars() *VarStorage
SetVars(*VarStorage)
Run(context.Context, map[string]transmission.Transmission, chan<- Output, bool) (<-chan OutputInstance, <-chan error)
MergeVars(*VarStorage)
Validate(params map[string]transmission.Transmission) error
LastInput() time.Time
Busy() bool

GetInjections() map[string]transmission.Transmission
Start(context.Context, <-chan Input, chan<- OutputInstance, chan<- Output) error
}

type OutputInstance struct {
Expand Down
78 changes: 78 additions & 0 deletions backend/workflow/node/node_delay.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package node

import (
"fmt"
"time"

"github.com/ghostsecurity/reaper/backend/workflow/transmission"
"golang.org/x/net/context"
)

// DelayNode is a node that waits for a configured delay before forwarding
// each value it receives on "input" to "output" (see Start).
type DelayNode struct {
// Shared node state and accessors.
*base
// Supplies a GetInjections method that returns nil.
noInjections
}

// NewDelay builds a DelayNode named "Delay" of type TypeDelay that is not
// read-only. Its variable storage is created from two connector lists
// ("input" and "delay", then "output") and a value map that sets "delay" to
// 1000; the "delay" connector is described as being in milliseconds.
// NOTE(review): the local names below assume NewVarStorage takes inputs,
// outputs and default values, in that order — confirm against its definition.
func NewDelay() *DelayNode {
	inputs := Connectors{
		NewConnector("input", transmission.TypeAny, true),
		NewConnector("delay", transmission.TypeInt, false, "in milliseconds"),
	}
	outputs := Connectors{
		NewConnector("output", transmission.TypeAny, true),
	}
	defaults := map[string]transmission.Transmission{
		"delay": transmission.NewInt(1000),
	}
	vars := NewVarStorage(inputs, outputs, defaults)

	return &DelayNode{
		base: newBase("Delay", TypeDelay, false, vars),
	}
}

// Start runs the delay node's processing loop until ctx is done, the input
// channel is closed, or an input cannot be processed.
//
// The "delay" value is read once, before the loop, and interpreted as a
// number of milliseconds. For every message received on in, the node marks
// itself busy, reads the "input" value from the message data, waits for the
// delay and then emits the value on the "output" connector, with Complete
// set to the message's Last flag.
//
// The wait is interruptible: if ctx is cancelled while the node is waiting,
// Start returns ctx.Err() straight away instead of sleeping for the full
// delay.
//
// Returns nil when in is closed, ctx.Err() on cancellation, and an error if
// the delay cannot be read, a message carries nil Data, or the "input" value
// cannot be read. The final channel parameter is unused by this node.
func (n *DelayNode) Start(ctx context.Context, in <-chan Input, out chan<- OutputInstance, _ chan<- Output) error {
	delay, err := n.ReadInputInt("delay", nil)
	if err != nil {
		return err
	}
	// The delay does not change while the node runs, so convert it once.
	wait := time.Duration(delay) * time.Millisecond

	// Whichever path leaves the loop, never leave the node flagged as busy.
	defer n.setBusy(false)

	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case input, ok := <-in:
			if !ok {
				return nil
			}

			n.setBusy(true)

			if input.Data == nil {
				return fmt.Errorf("input is nil")
			}

			raw, err := n.ReadValue("input", input.Data)
			if err != nil {
				// %w keeps the cause available to errors.Is / errors.As.
				return fmt.Errorf("input not found: %w", err)
			}

			// Wait on a timer rather than time.Sleep so that cancelling the
			// context aborts the delay promptly.
			timer := time.NewTimer(wait)
			select {
			case <-ctx.Done():
				timer.Stop()
				return ctx.Err()
			case <-timer.C:
			}

			n.tryOut(ctx, out, OutputInstance{
				OutputName: "output",
				Complete:   input.Last,
				Data:       raw,
			})

			n.setBusy(false)
		}
	}
}
Loading

0 comments on commit 034d9f3

Please sign in to comment.