Wazuh Custom Integrations and Scripting Guide
Wazuh 4.14 provides several extensibility mechanisms: custom integrations through integratord, Active Response scripts for automated remediation, wodle modules for extending agents, and webhook receivers for ingesting external events. Each mechanism suits different scenarios and has its own implementation requirements.
Custom Integration Architecture
┌─────────────────────────────────────────────────────┐
│ Wazuh Manager │
│ │
│ Alert ──▶ integratord ──▶ Custom Script ──▶ API │
│ │
│ Rule ──▶ Active Response ──▶ Agent Script │
│ │
│ Wodle ──▶ Custom Module ──▶ Log Output ──▶ Rules │
│ │
│ API ◀── Webhook Receiver ◀── External System │
└─────────────────────────────────────────────────────┘Integratord - Input Data Format
The wazuh-integratord daemon invokes the integration script for each matching alert, passing four arguments:
| Argument | Description |
|---|---|
sys.argv[1] | Path to the JSON alert file |
sys.argv[2] | API key (from <api_key> in ossec.conf) |
sys.argv[3] | Hook URL (from <hook_url> in ossec.conf) |
sys.argv[4] | Path to the full alert file |
JSON Alert Structure
The alert file contains a JSON object with the following structure:
{
"timestamp": "2024-01-15T10:30:45.123+0000",
"rule": {
"level": 10,
"description": "sshd: authentication failure",
"id": "5710",
"mitre": {
"id": ["T1110"],
"tactic": ["Credential Access"],
"technique": ["Brute Force"]
},
"groups": ["syslog", "sshd", "authentication_failed"],
"pci_dss": ["10.2.4", "10.2.5"],
"gdpr": ["IV_35.7.d", "IV_32.2"]
},
"agent": {
"id": "001",
"name": "web-server-01",
"ip": "192.168.1.10"
},
"manager": {
"name": "wazuh-manager"
},
"data": {
"srcip": "10.0.0.50",
"srcport": "54321",
"dstuser": "root"
},
"full_log": "Jan 15 10:30:45 web-server-01 sshd[12345]: Failed password for root from 10.0.0.50 port 54321 ssh2",
"decoder": {
"name": "sshd"
},
"location": "/var/log/auth.log"
}Integration Script Template
#!/usr/bin/env python3
"""Template for a custom Wazuh integration script."""
import sys
import json
import logging
from pathlib import Path
LOG_FILE = "/var/ossec/logs/integrations.log"
logging.basicConfig(
filename=LOG_FILE,
level=logging.INFO,
format="%(asctime)s %(levelname)s [custom-integration] %(message)s",
)
def process_alert(alert: dict, api_key: str, hook_url: str) -> None:
"""Process a Wazuh alert and forward to an external system."""
rule = alert.get("rule", {})
agent = alert.get("agent", {})
logging.info(
"Processing alert: rule=%s level=%s agent=%s",
rule.get("id"),
rule.get("level"),
agent.get("name"),
)
payload = {
"source": "wazuh",
"alert_id": rule.get("id"),
"level": rule.get("level"),
"description": rule.get("description"),
"agent_name": agent.get("name"),
"agent_ip": agent.get("ip"),
"timestamp": alert.get("timestamp"),
"mitre": rule.get("mitre", {}),
}
# Send payload to external system
# requests.post(hook_url, json=payload, headers={"Authorization": f"Bearer {api_key}"})
def main():
if len(sys.argv) < 4:
logging.error("Insufficient arguments: expected 4, got %d", len(sys.argv) - 1)
sys.exit(1)
alert_file = sys.argv[1]
api_key = sys.argv[2]
hook_url = sys.argv[3]
try:
alert = json.loads(Path(alert_file).read_text())
process_alert(alert, api_key, hook_url)
except json.JSONDecodeError as e:
logging.error("Failed to parse alert JSON: %s", e)
sys.exit(1)
except Exception as e:
logging.error("Integration error: %s", e)
sys.exit(1)
if __name__ == "__main__":
main()Installation and Registration
# Copy the script to the integrations directory
cp custom-myintegration /var/ossec/integrations/
chmod 750 /var/ossec/integrations/custom-myintegration
chown root:wazuh /var/ossec/integrations/custom-myintegration
# Add the configuration to ossec.conf
# <integration>
# <name>custom-myintegration</name>
# <hook_url>https://api.example.com/alerts</hook_url>
# <api_key>YOUR_API_KEY</api_key>
# <level>7</level>
# <alert_format>json</alert_format>
# </integration>
# Restart the manager
/var/ossec/bin/wazuh-control restartKey requirements for integration scripts:
- The filename must begin with
custom- - The file must be executable (
chmod 750) - Ownership:
root:wazuh - The script should complete within a reasonable time (recommended timeout < 10 seconds)
- Logging goes to
/var/ossec/logs/integrations.log
Active Response - Remediation Scripts
Active Response enables automated actions on agents when rules fire. Scripts are stored in /var/ossec/active-response/bin/ on the agent.
Active Response Input Format
Since Wazuh 4.2, Active Response scripts receive data via stdin in JSON format:
{
"version": 1,
"origin": {
"name": "node01",
"module": "wazuh-execd"
},
"command": "add",
"parameters": {
"extra_args": [],
"alert": {
"timestamp": "2024-01-15T10:30:45.123+0000",
"rule": {
"level": 10,
"id": "5710",
"description": "sshd: authentication failure"
},
"agent": {
"id": "001",
"name": "web-server-01"
},
"data": {
"srcip": "10.0.0.50"
}
}
}
}The command field accepts:
add- execute the action (e.g., block an IP)delete- reverse the action (e.g., unblock an IP after timeout)
Python Active Response Script
#!/usr/bin/env python3
"""Custom Active Response script for Wazuh - IP blocking via API."""
import sys
import json
import logging
import requests
LOG_FILE = "/var/ossec/logs/active-responses.log"
logging.basicConfig(
filename=LOG_FILE,
level=logging.INFO,
format="%(asctime)s %(levelname)s [custom-ar] %(message)s",
)
FIREWALL_API = "https://firewall.example.com/api"
FIREWALL_TOKEN = "YOUR_TOKEN"
def block_ip(ip_address: str) -> bool:
"""Block an IP address via the firewall API."""
payload = {
"action": "block",
"source": ip_address,
"duration": 3600,
"reason": "Wazuh Active Response - automated block",
}
try:
response = requests.post(
f"{FIREWALL_API}/rules",
json=payload,
headers={"Authorization": f"Bearer {FIREWALL_TOKEN}"},
timeout=10,
)
response.raise_for_status()
logging.info("Blocked IP %s via firewall API", ip_address)
return True
except requests.RequestException as e:
logging.error("Failed to block IP %s: %s", ip_address, e)
return False
def unblock_ip(ip_address: str) -> bool:
"""Unblock an IP address via the firewall API."""
try:
response = requests.delete(
f"{FIREWALL_API}/rules/{ip_address}",
headers={"Authorization": f"Bearer {FIREWALL_TOKEN}"},
timeout=10,
)
response.raise_for_status()
logging.info("Unblocked IP %s via firewall API", ip_address)
return True
except requests.RequestException as e:
logging.error("Failed to unblock IP %s: %s", ip_address, e)
return False
def main():
input_data = json.loads(sys.stdin.read())
command = input_data.get("command")
alert = input_data.get("parameters", {}).get("alert", {})
src_ip = alert.get("data", {}).get("srcip")
if not src_ip:
logging.warning("No source IP in alert, skipping")
return
if command == "add":
block_ip(src_ip)
elif command == "delete":
unblock_ip(src_ip)
else:
logging.warning("Unknown command: %s", command)
if __name__ == "__main__":
main()Bash Active Response Script
#!/bin/bash
# Custom Active Response script - block IP via iptables with logging
LOG_FILE="/var/ossec/logs/active-responses.log"
log() {
echo "$(date '+%Y-%m-%d %H:%M:%S') $1 [custom-block] $2" >> "${LOG_FILE}"
}
# Read JSON from stdin
read -r INPUT
COMMAND=$(echo "${INPUT}" | jq -r '.command')
SRCIP=$(echo "${INPUT}" | jq -r '.parameters.alert.data.srcip // empty')
if [ -z "${SRCIP}" ]; then
log "WARNING" "No source IP in alert"
exit 0
fi
# Validate IP address format
if ! echo "${SRCIP}" | grep -qP '^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$'; then
log "ERROR" "Invalid IP format: ${SRCIP}"
exit 1
fi
case "${COMMAND}" in
add)
if ! iptables -C INPUT -s "${SRCIP}" -j DROP 2>/dev/null; then
iptables -I INPUT -s "${SRCIP}" -j DROP
log "INFO" "Blocked IP: ${SRCIP}"
else
log "INFO" "IP already blocked: ${SRCIP}"
fi
;;
delete)
if iptables -C INPUT -s "${SRCIP}" -j DROP 2>/dev/null; then
iptables -D INPUT -s "${SRCIP}" -j DROP
log "INFO" "Unblocked IP: ${SRCIP}"
else
log "INFO" "IP not in blocklist: ${SRCIP}"
fi
;;
*)
log "WARNING" "Unknown command: ${COMMAND}"
;;
esac
exit 0Registering Active Response
To activate a script, register both the command and the response binding in ossec.conf on the manager:
<!-- Command definition -->
<command>
<name>custom-block</name>
<executable>custom-block</executable>
<timeout_allowed>yes</timeout_allowed>
</command>
<!-- Rule binding -->
<active-response>
<command>custom-block</command>
<location>local</location>
<rules_id>5710,5712</rules_id>
<timeout>3600</timeout>
</active-response><active-response> parameters:
| Parameter | Description |
|---|---|
<location> | local (on the agent), server (on the manager), defined-agent (specific agent), all (all agents) |
<rules_id> | Comma-separated rule IDs that trigger the script |
<rules_group> | Rule group for triggering |
<level> | Minimum alert level |
<timeout> | Seconds before the action is automatically reversed (delete command) |
For more on the Active Response module, see the Wazuh Capabilities section.
Custom Wodle Modules
A wodle (Wazuh Module) is an agent extension mechanism for running arbitrary tasks on a schedule. Wodle output is written to a log file, which the rule engine then analyzes.
Wodle Command
The simplest way to create a custom wodle is through the built-in command wodle:
<wodle name="command">
<disabled>no</disabled>
<tag>system-audit</tag>
<command>/var/ossec/wodles/custom-audit.sh</command>
<interval>1h</interval>
<ignore_output>no</ignore_output>
<run_on_start>yes</run_on_start>
<timeout>120</timeout>
</wodle>Example Wodle: Privileged Process Audit
#!/bin/bash
# Custom wodle: audit processes running as root with network connections
TIMESTAMP=$(date -u '+%Y-%m-%dT%H:%M:%S.000Z')
# Find root processes with established connections
ss -tnp | grep ESTAB | while IFS= read -r line; do
PID=$(echo "${line}" | grep -oP 'pid=\K\d+')
if [ -n "${PID}" ]; then
PROC_USER=$(ps -o user= -p "${PID}" 2>/dev/null)
PROC_NAME=$(ps -o comm= -p "${PID}" 2>/dev/null)
if [ "${PROC_USER}" = "root" ]; then
LOCAL=$(echo "${line}" | awk '{print $4}')
REMOTE=$(echo "${line}" | awk '{print $5}')
echo "{\"timestamp\":\"${TIMESTAMP}\",\"wodle\":\"system-audit\",\"process\":\"${PROC_NAME}\",\"pid\":${PID},\"user\":\"root\",\"local_addr\":\"${LOCAL}\",\"remote_addr\":\"${REMOTE}\"}"
fi
fi
doneRules for Wodle Output
<rule id="100300" level="0">
<decoded_as>json</decoded_as>
<field name="wodle">system-audit</field>
<description>Custom wodle: system audit event</description>
<group>wodle,system_audit,</group>
</rule>
<rule id="100301" level="8">
<if_sid>100300</if_sid>
<field name="user">root</field>
<field name="remote_addr" negate="yes">127.0.0.1</field>
<description>Root process $(process) has external network connection to $(remote_addr)</description>
<mitre>
<id>T1071</id>
</mitre>
<group>wodle,system_audit,suspicious_network,</group>
</rule>Webhook Receivers
A webhook receiver accepts events from external systems and converts them into Wazuh alerts.
Webhook Receiver Architecture
External System ──▶ Webhook Receiver ──▶ Log File ──▶ Wazuh Agent ──▶ Rules
(Python/Flask) (/var/log/) (localfile)Example: Flask Receiver for GitHub Events
#!/usr/bin/env python3
"""Webhook receiver for GitHub security events."""
import json
import hmac
import hashlib
import logging
from pathlib import Path
from flask import Flask, request, abort
app = Flask(__name__)
WEBHOOK_SECRET = "YOUR_GITHUB_WEBHOOK_SECRET"
LOG_FILE = Path("/var/log/github-security.log")
logging.basicConfig(level=logging.INFO)
def verify_signature(payload: bytes, signature: str) -> bool:
"""Verify the GitHub webhook signature."""
expected = "sha256=" + hmac.new(
WEBHOOK_SECRET.encode(),
payload,
hashlib.sha256,
).hexdigest()
return hmac.compare_digest(expected, signature)
@app.route("/webhook/github", methods=["POST"])
def github_webhook():
"""Handle GitHub webhook events."""
signature = request.headers.get("X-Hub-Signature-256", "")
if not verify_signature(request.data, signature):
abort(403)
event_type = request.headers.get("X-GitHub-Event", "unknown")
payload = request.get_json()
if event_type == "security_advisory":
log_entry = {
"source": "github",
"event": "security_advisory",
"severity": payload.get("security_advisory", {}).get("severity"),
"summary": payload.get("security_advisory", {}).get("summary"),
"cve_id": payload.get("security_advisory", {}).get("cve_id"),
"repository": payload.get("repository", {}).get("full_name"),
}
with open(LOG_FILE, "a") as f:
f.write(json.dumps(log_entry) + "\n")
elif event_type == "repository_vulnerability_alert":
log_entry = {
"source": "github",
"event": "vulnerability_alert",
"package": payload.get("alert", {}).get("affected_package_name"),
"severity": payload.get("alert", {}).get("severity"),
"repository": payload.get("repository", {}).get("full_name"),
}
with open(LOG_FILE, "a") as f:
f.write(json.dumps(log_entry) + "\n")
return "", 200
if __name__ == "__main__":
app.run(host="0.0.0.0", port=8080)Wazuh Configuration for Webhook Logs
On the agent running the webhook receiver:
<localfile>
<log_format>json</log_format>
<location>/var/log/github-security.log</location>
</localfile>Rules for Webhook Events
<rule id="100310" level="0">
<decoded_as>json</decoded_as>
<field name="source">github</field>
<description>GitHub webhook event received</description>
<group>github,webhook,</group>
</rule>
<rule id="100311" level="10">
<if_sid>100310</if_sid>
<field name="event">security_advisory</field>
<field name="severity">critical</field>
<description>GitHub critical security advisory: $(summary)</description>
<mitre>
<id>T1190</id>
</mitre>
<group>github,vulnerability,critical,</group>
</rule>
<rule id="100312" level="7">
<if_sid>100310</if_sid>
<field name="event">vulnerability_alert</field>
<description>GitHub vulnerability alert for package $(package) in $(repository)</description>
<group>github,vulnerability,</group>
</rule>Alert Processing with Python
For batch alert processing, use the API or read the alerts file directly.
Reading Alerts from the JSON Log
#!/usr/bin/env python3
"""Batch alert processing from Wazuh alerts.json."""
import json
from pathlib import Path
from collections import Counter
from typing import Iterator
ALERTS_FILE = Path("/var/ossec/logs/alerts/alerts.json")
def read_alerts(filepath: Path, max_lines: int = 10000) -> Iterator[dict]:
"""Read alerts from the JSON log file."""
with open(filepath) as f:
for i, line in enumerate(f):
if i >= max_lines:
break
try:
yield json.loads(line.strip())
except json.JSONDecodeError:
continue
def analyze_alerts(alerts: Iterator[dict]) -> dict:
"""Analyze alert distribution."""
level_counter = Counter()
rule_counter = Counter()
agent_counter = Counter()
mitre_counter = Counter()
for alert in alerts:
rule = alert.get("rule", {})
level_counter[rule.get("level", 0)] += 1
rule_counter[rule.get("id", "unknown")] += 1
agent_counter[alert.get("agent", {}).get("name", "unknown")] += 1
for technique in rule.get("mitre", {}).get("id", []):
mitre_counter[technique] += 1
return {
"by_level": dict(level_counter.most_common(10)),
"top_rules": dict(rule_counter.most_common(10)),
"top_agents": dict(agent_counter.most_common(10)),
"top_mitre": dict(mitre_counter.most_common(10)),
}
if __name__ == "__main__":
alerts = read_alerts(ALERTS_FILE)
report = analyze_alerts(alerts)
print(json.dumps(report, indent=2))Streaming via the API
#!/usr/bin/env python3
"""Real-time alert monitoring via the Wazuh API."""
import time
import json
import requests
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
WAZUH_API = "https://localhost:55000"
CREDENTIALS = ("wazuh-wui", "YOUR_PASSWORD")
POLL_INTERVAL = 30
def get_token() -> str:
"""Obtain a JWT token."""
response = requests.post(
f"{WAZUH_API}/security/user/authenticate?raw=true",
auth=CREDENTIALS,
verify=False,
)
response.raise_for_status()
return response.text
def get_recent_alerts(token: str, minutes: int = 5) -> list[dict]:
"""Fetch alerts from the last N minutes via the indexer."""
headers = {"Authorization": f"Bearer {token}"}
response = requests.get(
f"{WAZUH_API}/manager/logs",
headers=headers,
params={
"limit": 100,
"sort": "-timestamp",
"tag": "ossec-analysisd",
},
verify=False,
)
response.raise_for_status()
return response.json().get("data", {}).get("affected_items", [])
def process_alert(alert: dict) -> None:
"""Process a single alert."""
level = alert.get("level", "")
description = alert.get("description", "")
timestamp = alert.get("timestamp", "")
print(f"[{timestamp}] Level {level}: {description}")
def main():
token = get_token()
while True:
try:
alerts = get_recent_alerts(token)
for alert in alerts:
process_alert(alert)
except requests.exceptions.HTTPError as e:
if e.response.status_code == 401:
token = get_token()
continue
raise
time.sleep(POLL_INTERVAL)
if __name__ == "__main__":
main()Troubleshooting
Integration Script Does Not Run
- Check file permissions and ownership:
ls -la /var/ossec/integrations/custom-*
# Expected: -rwxr-x--- root wazuh- Verify the filename starts with
custom-:
# Correct: custom-myintegration
# Wrong: myintegration, integration-custom- Review integratord logs:
tail -f /var/ossec/logs/integrations.logActive Response Script Does Not Execute
- Confirm the script exists on the agent:
ls -la /var/ossec/active-response/bin/custom-block- Verify the command is registered in
ossec.conf:
grep -A3 "custom-block" /var/ossec/etc/ossec.conf- Check Active Response logs:
tail -f /var/ossec/logs/active-responses.log- Test the script manually:
echo '{"version":1,"command":"add","parameters":{"alert":{"data":{"srcip":"10.0.0.1"}}}}' \
| /var/ossec/active-response/bin/custom-blockWodle Not Generating Events
- Verify the wodle is enabled (
<disabled>no</disabled>) - Confirm the wodle script writes output to stdout
- Check that the script has execute permissions
- Review manager logs:
grep "wodle" /var/ossec/logs/ossec.logWebhook Receiver Not Creating Alerts
- Verify the log file is being populated:
tail -f /var/log/github-security.log- Confirm the
<localfile>entry on the agent points to the correct file - Verify the log format matches the
<log_format>setting (json/syslog) - Test decoding through
wazuh-logtest:
/var/ossec/bin/wazuh-logtestEnter a line from the log file and verify that the decoder and rule fire correctly.
For built-in integration configuration, see the SIEM Integrations section. The API reference for programmatic access is available in the REST API Reference .