# DAG Execution Engine

Pentora's DAG (Directed Acyclic Graph) engine orchestrates scan execution by managing module dependencies, enabling parallel execution, and coordinating data flow between stages.
## What is a DAG?

A Directed Acyclic Graph is a mathematical structure where:

- **Nodes** represent modules (discovery, port scan, fingerprint, etc.)
- **Edges** represent dependencies (execution order and data flow)
- **Directed** means edges have a direction (from producer to consumer)
- **Acyclic** means no cycles (no circular dependencies)
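As a minimal illustration, such a graph can be represented as nodes plus dependency edges. The types below are hypothetical, chosen only to make the structure concrete; Pentora's internal representation differs:

```go
// Node is a hypothetical module instance in the graph.
type Node struct {
	InstanceID string            // unique name, e.g. "port_scan"
	ModuleType string            // registered module, e.g. "syn_scanner"
	DependsOn  []string          // instance IDs this node waits for (the edges)
	Config     map[string]string // module-specific settings
}

// DAG is just the set of nodes; edges are implied by DependsOn.
type DAG struct {
	Nodes map[string]Node // keyed by InstanceID
}
```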
## Why Use a DAG?

### 1. Explicit Dependencies

Modules declare what they need:

```text
Banner Grab Module:
  Requires: open_ports  (from Port Scanner)
  Produces: banners

Fingerprint Module:
  Requires: banners  (from Banner Grab)
  Produces: service_fingerprints
```
### 2. Parallel Execution

Independent modules run concurrently:

```text
          Discovery
              ↓
         Port Scanner
         ↙    ↓    ↘
    Banner  Banner  Banner
  (Port 22) (80)    (443)
         ↘    ↓    ↙
        Fingerprinter
              ↓
        Asset Profiler
```

Banners can be grabbed from multiple ports simultaneously.
### 3. Failure Isolation

If one branch fails, others continue:

```text
      Port Scanner
       ↙        ↘
  Banner 22   Banner 80
      ↓           ↓
   Failed!   Fingerprint 80
                  ↓
              Success!
```
### 4. Deterministic Execution

Because each module's inputs are fixed by its declared dependencies rather than by scheduling order, the same targets and configuration produce the same results regardless of hardware or timing.
## DAG Structure

### Node Definition

Each node represents a module instance:
```yaml
nodes:
  - instance_id: target_ingestion
    module_type: target_parser
    depends_on: []
    config:
      blocklists: [127.0.0.0/8]

  - instance_id: discovery
    module_type: icmp_discovery
    depends_on: [target_ingestion]
    config:
      timeout: 2s
      retry: 2

  - instance_id: port_scan
    module_type: syn_scanner
    depends_on: [discovery]
    config:
      ports: 1-1000
      rate: 1000

  - instance_id: banner_grab
    module_type: banner_grabber
    depends_on: [port_scan]
    config:
      timeout: 5s

  - instance_id: fingerprint
    module_type: fingerprint_parser
    depends_on: [banner_grab]
    config:
      catalog: builtin

  - instance_id: reporter
    module_type: json_reporter
    depends_on: [fingerprint]
    config:
      output: results.jsonl
```
### Dependency Declaration

Modules specify dependencies via `depends_on`:

- **No dependencies:** `depends_on: []` (root nodes)
- **Single dependency:** `depends_on: [discovery]`
- **Multiple dependencies:** `depends_on: [port_scan, banner_grab]`
### Validation

The DAG validator checks:

- **No cycles:** Dependency chains cannot loop
- **All dependencies exist:** Referenced nodes must be defined
- **Unique IDs:** No duplicate `instance_id` values
- **Valid module types:** Each module must be registered
- **Configuration validity:** Module config must match its schema

```bash
# Validate DAG definition
pentora dag validate scan-profile.yaml
```
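Cycle detection is standard graph work. A minimal sketch using depth-first search with three node states, which is one common approach (not necessarily Pentora's exact implementation):

```go
// detectCycle reports whether the dependency graph contains a cycle,
// using DFS with three states: unvisited, in-progress, done.
// deps maps each node to the nodes it depends on.
func detectCycle(deps map[string][]string) bool {
	const (
		unvisited = iota
		inProgress
		done
	)
	state := make(map[string]int)

	var visit func(node string) bool
	visit = func(node string) bool {
		switch state[node] {
		case inProgress:
			return true // back edge: we re-entered a node still on the stack
		case done:
			return false
		}
		state[node] = inProgress
		for _, dep := range deps[node] {
			if visit(dep) {
				return true
			}
		}
		state[node] = done
		return false
	}

	for node := range deps {
		if visit(node) {
			return true
		}
	}
	return false
}
```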
## Execution Model

### Phases

DAG execution proceeds in two phases:

**Phase 1: Planning**

- Parse DAG definition (YAML/JSON)
- Validate structure (cycles, dependencies, config)
- Resolve modules (look up registered implementations)
- Build execution plan (topological sort)

**Phase 2: Execution**

- Initialize DataContext (shared key-value store)
- Execute nodes in dependency order:
  - Wait for all dependencies to complete
  - Read required inputs from DataContext
  - Execute module logic
  - Write outputs to DataContext
  - Mark node as completed
- Handle failures (skip dependents or retry)
- Clean up resources
### Topological Sort

Determines an execution order that respects dependencies:

```text
Input DAG:
  A → B → D
  A → C → D

Topological sort:
  [A, B, C, D] or [A, C, B, D]
  (Both valid; B and C can run in parallel)
```
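A compact sketch of Kahn's algorithm, the usual way to compute such an order (illustrative; the engine's actual planner may differ):

```go
// topoSort returns a dependency-respecting order using Kahn's algorithm.
// ok is false if the graph contains a cycle.
func topoSort(deps map[string][]string) (order []string, ok bool) {
	indegree := make(map[string]int, len(deps))
	dependents := make(map[string][]string) // dep -> nodes waiting on it
	for node, ds := range deps {
		indegree[node] += len(ds)
		for _, d := range ds {
			dependents[d] = append(dependents[d], node)
			if _, seen := indegree[d]; !seen {
				indegree[d] = 0 // node referenced only as a dependency
			}
		}
	}

	var queue []string
	for node, deg := range indegree {
		if deg == 0 {
			queue = append(queue, node) // roots: nothing to wait for
		}
	}
	for len(queue) > 0 {
		n := queue[0]
		queue = queue[1:]
		order = append(order, n)
		for _, m := range dependents[n] {
			if indegree[m]--; indegree[m] == 0 {
				queue = append(queue, m)
			}
		}
	}
	// If some nodes were never reached, a cycle kept their indegree > 0.
	return order, len(order) == len(indegree)
}
```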
### Parallelism

Nodes without dependencies between them run concurrently:

```go
// Pseudocode execution: run the plan layer by layer
plan := TopologicalSort(dag)
for _, layer := range GroupByLevel(plan) {
	var wg sync.WaitGroup
	for _, node := range layer {
		wg.Add(1)
		go func(n Node) {
			defer wg.Done()
			ExecuteNode(n, context)
		}(node)
	}
	wg.Wait() // Wait for entire layer before next
}
```
Example layer grouping:

```text
Layer 0: [Target Ingestion]
Layer 1: [Discovery]
Layer 2: [Port Scanner]
Layer 3: [Banner Grab Port 22, Banner Grab Port 80, Banner Grab Port 443]
Layer 4: [Fingerprint Parser]
Layer 5: [Asset Profiler]
Layer 6: [Vulnerability Evaluator]
Layer 7: [Reporter]
```
The three banner-grab nodes in Layer 3 execute in parallel, giving up to a 3x speedup for that stage.
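The `GroupByLevel` helper from the pseudocode can be realized by placing each node one layer above its deepest dependency. A sketch under that assumption (hypothetical helper, requiring a topologically sorted input):

```go
// groupByLevel assigns each node to a layer: nodes with no dependencies
// are layer 0; otherwise a node sits one layer above its deepest dependency.
// Nodes in the same layer have no edges between them and can run in parallel.
func groupByLevel(order []string, deps map[string][]string) [][]string {
	level := make(map[string]int)
	var layers [][]string
	for _, node := range order { // order must be a valid topological sort
		l := 0
		for _, d := range deps[node] {
			if level[d]+1 > l {
				l = level[d] + 1
			}
		}
		level[node] = l
		for len(layers) <= l {
			layers = append(layers, nil)
		}
		layers[l] = append(layers[l], node)
	}
	return layers
}
```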
## Data Flow

### DataContext

Shared state container passed through the DAG:

```go
type DataContext interface {
	// Read value by key
	Get(key string) (interface{}, error)

	// Write value
	Set(key string, value interface{}) error

	// Check key existence
	Has(key string) bool

	// List all keys
	Keys() []string
}
```
### Standard Keys

Modules use conventional keys for interoperability:

| Key | Producer | Consumer | Type |
|---|---|---|---|
| `targets` | Target Ingestion | Discovery | `[]Target` |
| `discovered_hosts` | Discovery | Port Scanner | `[]Host` |
| `open_ports` | Port Scanner | Banner Grab | `[]Port` |
| `banners` | Banner Grab | Fingerprint Parser | `[]Banner` |
| `service_fingerprints` | Fingerprint Parser | Asset Profiler | `[]Fingerprint` |
| `asset_profiles` | Asset Profiler | Vuln Evaluator | `[]AssetProfile` |
| `vulnerabilities` | Vuln Evaluator | Reporter | `[]Vulnerability` |
### Example Flow

```go
// Discovery module writes hosts
context.Set("discovered_hosts", []Host{
	{IP: "192.168.1.100", Latency: 1.2},
	{IP: "192.168.1.101", Latency: 2.5},
})

// Port scanner reads hosts
raw, _ := context.Get("discovered_hosts")
hosts, ok := raw.([]Host)
if !ok {
	// handle missing or mistyped key
}
for _, host := range hosts {
	ports := ScanPorts(host)
	// ...
}

// Port scanner writes ports
context.Set("open_ports", []Port{
	{Host: "192.168.1.100", Port: 22, Protocol: "tcp"},
	{Host: "192.168.1.100", Port: 80, Protocol: "tcp"},
})
```
### Data Isolation

Each scan gets its own DataContext instance:

- No interference between concurrent scans
- Memory is garbage-collected after scan completion
- Thread-safe access (mutex-protected); see the sketch below
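A minimal sketch of a mutex-protected implementation satisfying the interface above (illustrative only, not Pentora's actual code):

```go
import (
	"fmt"
	"sync"
)

// memContext is a thread-safe, in-memory DataContext.
type memContext struct {
	mu   sync.RWMutex
	data map[string]interface{}
}

func NewDataContext() *memContext {
	return &memContext{data: make(map[string]interface{})}
}

func (c *memContext) Get(key string) (interface{}, error) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	v, ok := c.data[key]
	if !ok {
		return nil, fmt.Errorf("key %q not found", key)
	}
	return v, nil
}

func (c *memContext) Set(key string, value interface{}) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.data[key] = value
	return nil
}

func (c *memContext) Has(key string) bool {
	c.mu.RLock()
	defer c.mu.RUnlock()
	_, ok := c.data[key]
	return ok
}

func (c *memContext) Keys() []string {
	c.mu.RLock()
	defer c.mu.RUnlock()
	keys := make([]string, 0, len(c.data))
	for k := range c.data {
		keys = append(keys, k)
	}
	return keys
}
```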
## Failure Handling

### Failure Modes

- **Node failure:** Module execution error (timeout, crash, assertion)
- **Dependency failure:** Required input missing from the DataContext
- **Configuration error:** Invalid module config
### Strategies

#### Fail-Fast (Default)

Stop the entire DAG on the first error:

```yaml
engine:
  fail_fast: true
```

```text
Discovery → Port Scan → FAILED!
                           ↓
                  [Execution Stopped]
```

**Use case:** Critical failures where partial results are useless.
#### Continue-on-Error

Skip the failed node and its dependents; continue other branches:

```yaml
engine:
  fail_fast: false
```

```text
Port Scan → Banner Grab (Port 80) → Fingerprint
    ↓
Banner Grab (Port 22) → FAILED!
    ↓
[Skipped dependents]
```

**Use case:** Large scans where partial results are valuable.
#### Retry Logic

Retry transient failures with backoff:

```yaml
engine:
  retry:
    enabled: true
    max_attempts: 3
    backoff: exponential  # 1s, 2s, 4s
    retry_on:
      - timeout
      - network_error
```

```text
Attempt 1: FAILED (timeout)
    ↓ wait 1s
Attempt 2: FAILED (timeout)
    ↓ wait 2s
Attempt 3: SUCCESS
```

**Use case:** Network instability, rate limiting, transient errors.
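For intuition, exponential backoff reduces to a small loop. A hedged Go sketch (hypothetical helper, not the engine's real API):

```go
import (
	"context"
	"time"
)

// retryWithBackoff runs fn up to maxAttempts times, doubling the wait
// between attempts (1s, 2s, 4s, ...). It stops early on success or when
// ctx is cancelled.
func retryWithBackoff(ctx context.Context, maxAttempts int, fn func() error) error {
	wait := time.Second
	var err error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		if err = fn(); err == nil {
			return nil
		}
		if attempt == maxAttempts {
			break
		}
		select {
		case <-time.After(wait):
			wait *= 2
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	return err
}
```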
#### Dependent Skipping

When a node fails, its dependents are automatically skipped:

```text
Node A → FAILED
   ↓
Node B (depends on A) → SKIPPED
   ↓
Node C (depends on B) → SKIPPED
```

The reporter always runs to capture partial results and errors.
### Error Context

Failed nodes record detailed error information:

```json
{
  "node": "port_scan",
  "status": "failed",
  "error": "timeout after 30s",
  "stack_trace": "...",
  "timestamp": "2023-10-06T14:35:22Z",
  "retry_attempts": 3
}
```

Available in scan status and logs.
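In Go terms, the record might look like the struct below (an illustrative mirror of the JSON above, not necessarily the engine's exact type):

```go
import "time"

// NodeError mirrors the JSON error record shown above.
type NodeError struct {
	Node          string    `json:"node"`
	Status        string    `json:"status"`
	Error         string    `json:"error"`
	StackTrace    string    `json:"stack_trace"`
	Timestamp     time.Time `json:"timestamp"`
	RetryAttempts int       `json:"retry_attempts"`
}
```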
## Orchestrator Architecture

### Components

```text
┌──────────────────────────────────────┐
│             Orchestrator             │
├──────────────────────────────────────┤
│ - DAG Parser                         │
│ - Validator                          │
│ - Planner (Topological Sort)         │
│ - Executor (Node Runner)             │
│ - DataContext Manager                │
│ - Error Handler                      │
└──────────────────────────────────────┘
         ↓                   ↓
┌────────────────┐  ┌────────────────┐
│ Module Registry│  │   Event Bus    │
└────────────────┘  └────────────────┘
```
### Orchestrator Interface

```go
type Orchestrator interface {
	// Load DAG from definition
	LoadDAG(definition []byte) error

	// Validate DAG structure
	Validate() error

	// Execute DAG with context
	Execute(ctx context.Context) (*Result, error)

	// Get execution status
	Status() *Status

	// Cancel execution
	Cancel() error
}
```
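The typical call sequence is load, validate, execute. A sketch under the assumption of a `NewOrchestrator` constructor (hypothetical; the real package may expose a different one):

```go
import (
	"context"
	"log"
	"os"
	"time"
)

func runScan(path string) {
	definition, err := os.ReadFile(path)
	if err != nil {
		log.Fatal(err)
	}

	orch := NewOrchestrator() // hypothetical constructor
	if err := orch.LoadDAG(definition); err != nil {
		log.Fatalf("load DAG: %v", err)
	}
	if err := orch.Validate(); err != nil {
		log.Fatalf("invalid DAG: %v", err)
	}

	ctx, cancel := context.WithTimeout(context.Background(), time.Hour)
	defer cancel()

	result, err := orch.Execute(ctx)
	if err != nil {
		log.Fatalf("execution failed: %v", err)
	}
	_ = result // inspect node statuses, write reports, etc.
}
```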
### Module Registry

Maintains a mapping of module types to implementations:

```go
registry := module.NewRegistry()

// Register builtin modules
registry.Register("icmp_discovery", &discovery.ICMPModule{})
registry.Register("syn_scanner", &scanner.SYNModule{})
registry.Register("banner_grabber", &scanner.BannerModule{})
registry.Register("fingerprint_parser", &fingerprint.ParserModule{})

// Register external plugin
registry.RegisterPlugin("custom_vuln_check", "/path/to/plugin.so")
```

See Module System for registration details.
## Configuration

### DAG Definition

Define custom scan flows:

```yaml
# custom-scan.yaml
name: web-application-scan
description: Focused scan for web applications

nodes:
  - instance_id: targets
    module_type: target_parser
    depends_on: []

  - instance_id: discover
    module_type: tcp_probe_discovery
    depends_on: [targets]
    config:
      ports: [80, 443, 8080, 8443]

  - instance_id: http_scan
    module_type: http_scanner
    depends_on: [discover]
    config:
      methods: [GET, HEAD, OPTIONS]
      headers:
        User-Agent: Pentora/1.0

  - instance_id: ssl_scan
    module_type: ssl_analyzer
    depends_on: [discover]
    config:
      check_cert: true
      check_ciphers: true

  - instance_id: webapp_fingerprint
    module_type: webapp_fingerprinter
    depends_on: [http_scan]

  - instance_id: report
    module_type: json_reporter
    depends_on: [webapp_fingerprint, ssl_scan]
```
### Using Custom DAGs

```bash
# Execute custom DAG
pentora scan --targets example.com --dag custom-scan.yaml

# Validate DAG before execution
pentora dag validate custom-scan.yaml
```
### Built-in DAGs

Pentora includes predefined DAGs for common scenarios:

- `standard.yaml`: Full 9-stage pipeline
- `discovery-only.yaml`: Target ingestion + discovery + reporting
- `port-scan.yaml`: Discovery + port scan + reporting
- `vuln-scan.yaml`: Full pipeline with vulnerability evaluation

Accessed via scan profiles:

```bash
pentora scan --targets 192.168.1.0/24 --profile standard
# Uses builtin/standard.yaml DAG
```
## Observability

### Logging

Each node logs with structured context:

```json
{
  "level": "info",
  "timestamp": "2023-10-06T14:30:45Z",
  "component": "orchestrator",
  "node": "port_scan",
  "message": "Node execution started"
}
{
  "level": "info",
  "timestamp": "2023-10-06T14:31:15Z",
  "component": "orchestrator",
  "node": "port_scan",
  "duration_ms": 30200,
  "message": "Node execution completed"
}
```
### Event Hooks

Subscribe to execution events:

```go
orchestrator.OnNodeStart(func(node Node) {
	fmt.Printf("Starting %s\n", node.ID)
})

orchestrator.OnNodeComplete(func(node Node, result Result) {
	fmt.Printf("Completed %s in %v\n", node.ID, result.Duration)
})

orchestrator.OnNodeFailed(func(node Node, err error) {
	fmt.Printf("Failed %s: %v\n", node.ID, err)
})
```

See Hook System for event details.
### Progress Tracking

Track execution progress:

```bash
pentora scan --targets 192.168.1.0/24 --progress
```

```text
[=====>              ] 25% (2/8 nodes completed)
Currently running: port_scan, banner_grab
```

Progress events are available via the API for UI integration.
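One way to derive such a percentage yourself is from the event hooks shown above. A sketch that assumes the `orchestrator` value and hook API from the Event Hooks section, plus imports `fmt` and `sync/atomic`:

```go
var completed atomic.Int32
totalNodes := int32(8) // known from the execution plan

orchestrator.OnNodeComplete(func(node Node, result Result) {
	done := completed.Add(1)
	fmt.Printf("[%d/%d] %d%% of nodes completed\n",
		done, totalNodes, done*100/totalNodes)
})
```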
## Performance Tuning

### Concurrency Limits

Control parallelism:

```yaml
engine:
  max_parallel_nodes: 10     # Max concurrent nodes
  max_parallel_targets: 100  # Max concurrent targets per node
```

- **Low concurrency:** lower memory, slower execution
- **High concurrency:** higher memory, faster execution
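In Go, a cap like `max_parallel_nodes` is commonly enforced with a buffered channel used as a semaphore. A sketch reusing the layer loop from the Parallelism section (illustrative, not the engine's actual code):

```go
// The channel's capacity bounds how many nodes execute at once.
sem := make(chan struct{}, 10) // max_parallel_nodes

var wg sync.WaitGroup
for _, node := range layer {
	wg.Add(1)
	go func(n Node) {
		defer wg.Done()
		sem <- struct{}{}        // acquire a slot (blocks when full)
		defer func() { <-sem }() // release the slot
		ExecuteNode(n, context)
	}(node)
}
wg.Wait()
```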
### Memory Management

The DataContext can grow large with many targets:

```yaml
engine:
  data_context:
    max_size: 1GB      # Limit context size
    evict_policy: lru  # Least-recently-used eviction
```
### Execution Timeout

Prevent hung scans:

```yaml
engine:
  global_timeout: 1h  # Abort after 1 hour
  node_timeout: 10m   # Abort individual node after 10 min
```
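Per-node timeouts map naturally onto Go's context package. A sketch of how such a limit can be enforced, assuming a `runNode` stand-in for the engine's node runner:

```go
import (
	"context"
	"time"
)

// executeWithTimeout runs a node but aborts it once nodeTimeout elapses.
// runNode is a hypothetical stand-in; it must honor ctx.Done() to stop early.
func executeWithTimeout(parent context.Context, nodeTimeout time.Duration,
	runNode func(context.Context) error) error {
	ctx, cancel := context.WithTimeout(parent, nodeTimeout)
	defer cancel()
	return runNode(ctx)
}
```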
## Advanced Topics

### Dynamic DAG Construction

Build DAGs programmatically:

```go
builder := dag.NewBuilder()
builder.AddNode("targets", "target_parser", nil)
builder.AddNode("discover", "icmp_discovery", []string{"targets"})

// Conditionally add vulnerability checks
if enableVuln {
	builder.AddNode("vuln", "cve_matcher", []string{"discover"})
}

graph := builder.Build() // avoid shadowing the dag package
orchestrator.LoadDAG(graph)
```
### Sub-DAGs

Compose reusable DAG fragments:

```yaml
# discovery-subdag.yaml
nodes:
  - instance_id: icmp
    module_type: icmp_discovery
  - instance_id: arp
    module_type: arp_discovery
  - instance_id: merge
    module_type: discovery_merger
    depends_on: [icmp, arp]
```

```yaml
# main-dag.yaml
nodes:
  - instance_id: targets
    module_type: target_parser
  - instance_id: discover
    module_type: subdag
    subdag_file: discovery-subdag.yaml
    depends_on: [targets]
  - instance_id: scan
    module_type: port_scanner
    depends_on: [discover]
```
### Conditional Execution

Skip nodes based on runtime conditions:

```yaml
nodes:
  - instance_id: vuln_check
    module_type: cve_matcher
    depends_on: [fingerprint]
    condition: ${vuln_enabled}  # Variable from context
```
## Troubleshooting

### Cycle Detection

```text
Error: Cycle detected in DAG: A → B → C → A
```

**Solution:** Remove the circular dependency. Restructure modules or split into multiple DAG runs.

### Missing Dependency

```text
Error: Node 'banner_grab' depends on 'port_scan' which does not exist
```

**Solution:** Ensure all dependencies are defined in the DAG.

### Data Not Available

```text
Error: Node 'fingerprint' requires 'banners' but key not found in DataContext
```

**Solution:** Check that the producer module is running and writing the expected key. Enable debug logging to trace data flow.
### Deadlock

If execution hangs:

```bash
# Enable detailed logging
pentora scan --targets 192.168.1.100 --log-level debug

# Check for circular dependencies
pentora dag validate my-dag.yaml
```
## Next Steps

- **Module System** - Writing custom modules
- **Data Flow** - DataContext internals
- **Hook System** - Event-driven customization
- **Engine Architecture** - Implementation details