Wazuh Custom Integrations and Scripting Guide

Wazuh 4.14 provides several extensibility mechanisms: custom integrations through integratord, Active Response scripts for automated remediation, wodle modules for extending agents, and webhook receivers for ingesting external events. Each mechanism suits different scenarios and has its own implementation requirements.

Custom Integration Architecture

┌─────────────────────────────────────────────────────┐
│                  Wazuh Manager                      │
│                                                     │
│  Alert  ──▶ integratord ──▶ Custom Script ──▶ API   │
│                                                     │
│  Rule   ──▶ Active Response ──▶ Agent Script        │
│                                                     │
│  Wodle  ──▶ Custom Module ──▶ Log Output ──▶ Rules  │
│                                                     │
│  API    ◀── Webhook Receiver ◀── External System    │
└─────────────────────────────────────────────────────┘

Integratord - Input Data Format

The wazuh-integratord daemon invokes the integration script for each matching alert, passing four arguments:

ArgumentDescription
sys.argv[1]Path to the JSON alert file
sys.argv[2]API key (from <api_key> in ossec.conf)
sys.argv[3]Hook URL (from <hook_url> in ossec.conf)
sys.argv[4]Path to the full alert file

JSON Alert Structure

The alert file contains a JSON object with the following structure:

{
  "timestamp": "2024-01-15T10:30:45.123+0000",
  "rule": {
    "level": 10,
    "description": "sshd: authentication failure",
    "id": "5710",
    "mitre": {
      "id": ["T1110"],
      "tactic": ["Credential Access"],
      "technique": ["Brute Force"]
    },
    "groups": ["syslog", "sshd", "authentication_failed"],
    "pci_dss": ["10.2.4", "10.2.5"],
    "gdpr": ["IV_35.7.d", "IV_32.2"]
  },
  "agent": {
    "id": "001",
    "name": "web-server-01",
    "ip": "192.168.1.10"
  },
  "manager": {
    "name": "wazuh-manager"
  },
  "data": {
    "srcip": "10.0.0.50",
    "srcport": "54321",
    "dstuser": "root"
  },
  "full_log": "Jan 15 10:30:45 web-server-01 sshd[12345]: Failed password for root from 10.0.0.50 port 54321 ssh2",
  "decoder": {
    "name": "sshd"
  },
  "location": "/var/log/auth.log"
}

Integration Script Template

#!/usr/bin/env python3
"""Template for a custom Wazuh integration script."""
import sys
import json
import logging
from pathlib import Path

LOG_FILE = "/var/ossec/logs/integrations.log"
logging.basicConfig(
    filename=LOG_FILE,
    level=logging.INFO,
    format="%(asctime)s %(levelname)s [custom-integration] %(message)s",
)

def process_alert(alert: dict, api_key: str, hook_url: str) -> None:
    """Process a Wazuh alert and forward to an external system."""
    rule = alert.get("rule", {})
    agent = alert.get("agent", {})

    logging.info(
        "Processing alert: rule=%s level=%s agent=%s",
        rule.get("id"),
        rule.get("level"),
        agent.get("name"),
    )

    payload = {
        "source": "wazuh",
        "alert_id": rule.get("id"),
        "level": rule.get("level"),
        "description": rule.get("description"),
        "agent_name": agent.get("name"),
        "agent_ip": agent.get("ip"),
        "timestamp": alert.get("timestamp"),
        "mitre": rule.get("mitre", {}),
    }

    # Send payload to external system
    # requests.post(hook_url, json=payload, headers={"Authorization": f"Bearer {api_key}"})

def main():
    if len(sys.argv) < 4:
        logging.error("Insufficient arguments: expected 4, got %d", len(sys.argv) - 1)
        sys.exit(1)

    alert_file = sys.argv[1]
    api_key = sys.argv[2]
    hook_url = sys.argv[3]

    try:
        alert = json.loads(Path(alert_file).read_text())
        process_alert(alert, api_key, hook_url)
    except json.JSONDecodeError as e:
        logging.error("Failed to parse alert JSON: %s", e)
        sys.exit(1)
    except Exception as e:
        logging.error("Integration error: %s", e)
        sys.exit(1)

if __name__ == "__main__":
    main()

Installation and Registration

# Copy the script to the integrations directory
cp custom-myintegration /var/ossec/integrations/
chmod 750 /var/ossec/integrations/custom-myintegration
chown root:wazuh /var/ossec/integrations/custom-myintegration

# Add the configuration to ossec.conf
# <integration>
#   <name>custom-myintegration</name>
#   <hook_url>https://api.example.com/alerts</hook_url>
#   <api_key>YOUR_API_KEY</api_key>
#   <level>7</level>
#   <alert_format>json</alert_format>
# </integration>

# Restart the manager
/var/ossec/bin/wazuh-control restart

Key requirements for integration scripts:

  • The filename must begin with custom-
  • The file must be executable (chmod 750)
  • Ownership: root:wazuh
  • The script should complete within a reasonable time (recommended timeout < 10 seconds)
  • Logging goes to /var/ossec/logs/integrations.log

Active Response - Remediation Scripts

Active Response enables automated actions on agents when rules fire. Scripts are stored in /var/ossec/active-response/bin/ on the agent.

Active Response Input Format

Since Wazuh 4.2, Active Response scripts receive data via stdin in JSON format:

{
  "version": 1,
  "origin": {
    "name": "node01",
    "module": "wazuh-execd"
  },
  "command": "add",
  "parameters": {
    "extra_args": [],
    "alert": {
      "timestamp": "2024-01-15T10:30:45.123+0000",
      "rule": {
        "level": 10,
        "id": "5710",
        "description": "sshd: authentication failure"
      },
      "agent": {
        "id": "001",
        "name": "web-server-01"
      },
      "data": {
        "srcip": "10.0.0.50"
      }
    }
  }
}

The command field accepts:

  • add - execute the action (e.g., block an IP)
  • delete - reverse the action (e.g., unblock an IP after timeout)

Python Active Response Script

#!/usr/bin/env python3
"""Custom Active Response script for Wazuh - IP blocking via API."""
import sys
import json
import logging
import requests

LOG_FILE = "/var/ossec/logs/active-responses.log"
logging.basicConfig(
    filename=LOG_FILE,
    level=logging.INFO,
    format="%(asctime)s %(levelname)s [custom-ar] %(message)s",
)

FIREWALL_API = "https://firewall.example.com/api"
FIREWALL_TOKEN = "YOUR_TOKEN"

def block_ip(ip_address: str) -> bool:
    """Block an IP address via the firewall API."""
    payload = {
        "action": "block",
        "source": ip_address,
        "duration": 3600,
        "reason": "Wazuh Active Response - automated block",
    }
    try:
        response = requests.post(
            f"{FIREWALL_API}/rules",
            json=payload,
            headers={"Authorization": f"Bearer {FIREWALL_TOKEN}"},
            timeout=10,
        )
        response.raise_for_status()
        logging.info("Blocked IP %s via firewall API", ip_address)
        return True
    except requests.RequestException as e:
        logging.error("Failed to block IP %s: %s", ip_address, e)
        return False

def unblock_ip(ip_address: str) -> bool:
    """Unblock an IP address via the firewall API."""
    try:
        response = requests.delete(
            f"{FIREWALL_API}/rules/{ip_address}",
            headers={"Authorization": f"Bearer {FIREWALL_TOKEN}"},
            timeout=10,
        )
        response.raise_for_status()
        logging.info("Unblocked IP %s via firewall API", ip_address)
        return True
    except requests.RequestException as e:
        logging.error("Failed to unblock IP %s: %s", ip_address, e)
        return False

def main():
    input_data = json.loads(sys.stdin.read())
    command = input_data.get("command")
    alert = input_data.get("parameters", {}).get("alert", {})
    src_ip = alert.get("data", {}).get("srcip")

    if not src_ip:
        logging.warning("No source IP in alert, skipping")
        return

    if command == "add":
        block_ip(src_ip)
    elif command == "delete":
        unblock_ip(src_ip)
    else:
        logging.warning("Unknown command: %s", command)

if __name__ == "__main__":
    main()

Bash Active Response Script

#!/bin/bash
# Custom Active Response script - block IP via iptables with logging

LOG_FILE="/var/ossec/logs/active-responses.log"

log() {
    echo "$(date '+%Y-%m-%d %H:%M:%S') $1 [custom-block] $2" >> "${LOG_FILE}"
}

# Read JSON from stdin
read -r INPUT
COMMAND=$(echo "${INPUT}" | jq -r '.command')
SRCIP=$(echo "${INPUT}" | jq -r '.parameters.alert.data.srcip // empty')

if [ -z "${SRCIP}" ]; then
    log "WARNING" "No source IP in alert"
    exit 0
fi

# Validate IP address format
if ! echo "${SRCIP}" | grep -qP '^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$'; then
    log "ERROR" "Invalid IP format: ${SRCIP}"
    exit 1
fi

case "${COMMAND}" in
    add)
        if ! iptables -C INPUT -s "${SRCIP}" -j DROP 2>/dev/null; then
            iptables -I INPUT -s "${SRCIP}" -j DROP
            log "INFO" "Blocked IP: ${SRCIP}"
        else
            log "INFO" "IP already blocked: ${SRCIP}"
        fi
        ;;
    delete)
        if iptables -C INPUT -s "${SRCIP}" -j DROP 2>/dev/null; then
            iptables -D INPUT -s "${SRCIP}" -j DROP
            log "INFO" "Unblocked IP: ${SRCIP}"
        else
            log "INFO" "IP not in blocklist: ${SRCIP}"
        fi
        ;;
    *)
        log "WARNING" "Unknown command: ${COMMAND}"
        ;;
esac

exit 0

Registering Active Response

To activate a script, register both the command and the response binding in ossec.conf on the manager:

<!-- Command definition -->
<command>
  <name>custom-block</name>
  <executable>custom-block</executable>
  <timeout_allowed>yes</timeout_allowed>
</command>

<!-- Rule binding -->
<active-response>
  <command>custom-block</command>
  <location>local</location>
  <rules_id>5710,5712</rules_id>
  <timeout>3600</timeout>
</active-response>

<active-response> parameters:

ParameterDescription
<location>local (on the agent), server (on the manager), defined-agent (specific agent), all (all agents)
<rules_id>Comma-separated rule IDs that trigger the script
<rules_group>Rule group for triggering
<level>Minimum alert level
<timeout>Seconds before the action is automatically reversed (delete command)

For more on the Active Response module, see the Wazuh Capabilities section.

Custom Wodle Modules

A wodle (Wazuh Module) is an agent extension mechanism for running arbitrary tasks on a schedule. Wodle output is written to a log file, which the rule engine then analyzes.

Wodle Command

The simplest way to create a custom wodle is through the built-in command wodle:

<wodle name="command">
  <disabled>no</disabled>
  <tag>system-audit</tag>
  <command>/var/ossec/wodles/custom-audit.sh</command>
  <interval>1h</interval>
  <ignore_output>no</ignore_output>
  <run_on_start>yes</run_on_start>
  <timeout>120</timeout>
</wodle>

Example Wodle: Privileged Process Audit

#!/bin/bash
# Custom wodle: audit processes running as root with network connections

TIMESTAMP=$(date -u '+%Y-%m-%dT%H:%M:%S.000Z')

# Find root processes with established connections
ss -tnp | grep ESTAB | while IFS= read -r line; do
    PID=$(echo "${line}" | grep -oP 'pid=\K\d+')
    if [ -n "${PID}" ]; then
        PROC_USER=$(ps -o user= -p "${PID}" 2>/dev/null)
        PROC_NAME=$(ps -o comm= -p "${PID}" 2>/dev/null)
        if [ "${PROC_USER}" = "root" ]; then
            LOCAL=$(echo "${line}" | awk '{print $4}')
            REMOTE=$(echo "${line}" | awk '{print $5}')
            echo "{\"timestamp\":\"${TIMESTAMP}\",\"wodle\":\"system-audit\",\"process\":\"${PROC_NAME}\",\"pid\":${PID},\"user\":\"root\",\"local_addr\":\"${LOCAL}\",\"remote_addr\":\"${REMOTE}\"}"
        fi
    fi
done

Rules for Wodle Output

<rule id="100300" level="0">
  <decoded_as>json</decoded_as>
  <field name="wodle">system-audit</field>
  <description>Custom wodle: system audit event</description>
  <group>wodle,system_audit,</group>
</rule>

<rule id="100301" level="8">
  <if_sid>100300</if_sid>
  <field name="user">root</field>
  <field name="remote_addr" negate="yes">127.0.0.1</field>
  <description>Root process $(process) has external network connection to $(remote_addr)</description>
  <mitre>
    <id>T1071</id>
  </mitre>
  <group>wodle,system_audit,suspicious_network,</group>
</rule>

Webhook Receivers

A webhook receiver accepts events from external systems and converts them into Wazuh alerts.

Webhook Receiver Architecture

External System ──▶ Webhook Receiver ──▶ Log File ──▶ Wazuh Agent ──▶ Rules
                    (Python/Flask)       (/var/log/)   (localfile)

Example: Flask Receiver for GitHub Events

#!/usr/bin/env python3
"""Webhook receiver for GitHub security events."""
import json
import hmac
import hashlib
import logging
from pathlib import Path

from flask import Flask, request, abort

app = Flask(__name__)

WEBHOOK_SECRET = "YOUR_GITHUB_WEBHOOK_SECRET"
LOG_FILE = Path("/var/log/github-security.log")

logging.basicConfig(level=logging.INFO)

def verify_signature(payload: bytes, signature: str) -> bool:
    """Verify the GitHub webhook signature."""
    expected = "sha256=" + hmac.new(
        WEBHOOK_SECRET.encode(),
        payload,
        hashlib.sha256,
    ).hexdigest()
    return hmac.compare_digest(expected, signature)

@app.route("/webhook/github", methods=["POST"])
def github_webhook():
    """Handle GitHub webhook events."""
    signature = request.headers.get("X-Hub-Signature-256", "")
    if not verify_signature(request.data, signature):
        abort(403)

    event_type = request.headers.get("X-GitHub-Event", "unknown")
    payload = request.get_json()

    if event_type == "security_advisory":
        log_entry = {
            "source": "github",
            "event": "security_advisory",
            "severity": payload.get("security_advisory", {}).get("severity"),
            "summary": payload.get("security_advisory", {}).get("summary"),
            "cve_id": payload.get("security_advisory", {}).get("cve_id"),
            "repository": payload.get("repository", {}).get("full_name"),
        }
        with open(LOG_FILE, "a") as f:
            f.write(json.dumps(log_entry) + "\n")

    elif event_type == "repository_vulnerability_alert":
        log_entry = {
            "source": "github",
            "event": "vulnerability_alert",
            "package": payload.get("alert", {}).get("affected_package_name"),
            "severity": payload.get("alert", {}).get("severity"),
            "repository": payload.get("repository", {}).get("full_name"),
        }
        with open(LOG_FILE, "a") as f:
            f.write(json.dumps(log_entry) + "\n")

    return "", 200

if __name__ == "__main__":
    app.run(host="0.0.0.0", port=8080)

Wazuh Configuration for Webhook Logs

On the agent running the webhook receiver:

<localfile>
  <log_format>json</log_format>
  <location>/var/log/github-security.log</location>
</localfile>

Rules for Webhook Events

<rule id="100310" level="0">
  <decoded_as>json</decoded_as>
  <field name="source">github</field>
  <description>GitHub webhook event received</description>
  <group>github,webhook,</group>
</rule>

<rule id="100311" level="10">
  <if_sid>100310</if_sid>
  <field name="event">security_advisory</field>
  <field name="severity">critical</field>
  <description>GitHub critical security advisory: $(summary)</description>
  <mitre>
    <id>T1190</id>
  </mitre>
  <group>github,vulnerability,critical,</group>
</rule>

<rule id="100312" level="7">
  <if_sid>100310</if_sid>
  <field name="event">vulnerability_alert</field>
  <description>GitHub vulnerability alert for package $(package) in $(repository)</description>
  <group>github,vulnerability,</group>
</rule>

Alert Processing with Python

For batch alert processing, use the API or read the alerts file directly.

Reading Alerts from the JSON Log

#!/usr/bin/env python3
"""Batch alert processing from Wazuh alerts.json."""
import json
from pathlib import Path
from collections import Counter
from typing import Iterator

ALERTS_FILE = Path("/var/ossec/logs/alerts/alerts.json")

def read_alerts(filepath: Path, max_lines: int = 10000) -> Iterator[dict]:
    """Read alerts from the JSON log file."""
    with open(filepath) as f:
        for i, line in enumerate(f):
            if i >= max_lines:
                break
            try:
                yield json.loads(line.strip())
            except json.JSONDecodeError:
                continue

def analyze_alerts(alerts: Iterator[dict]) -> dict:
    """Analyze alert distribution."""
    level_counter = Counter()
    rule_counter = Counter()
    agent_counter = Counter()
    mitre_counter = Counter()

    for alert in alerts:
        rule = alert.get("rule", {})
        level_counter[rule.get("level", 0)] += 1
        rule_counter[rule.get("id", "unknown")] += 1
        agent_counter[alert.get("agent", {}).get("name", "unknown")] += 1

        for technique in rule.get("mitre", {}).get("id", []):
            mitre_counter[technique] += 1

    return {
        "by_level": dict(level_counter.most_common(10)),
        "top_rules": dict(rule_counter.most_common(10)),
        "top_agents": dict(agent_counter.most_common(10)),
        "top_mitre": dict(mitre_counter.most_common(10)),
    }

if __name__ == "__main__":
    alerts = read_alerts(ALERTS_FILE)
    report = analyze_alerts(alerts)
    print(json.dumps(report, indent=2))

Streaming via the API

#!/usr/bin/env python3
"""Real-time alert monitoring via the Wazuh API."""
import time
import json
import requests
import urllib3

urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

WAZUH_API = "https://localhost:55000"
CREDENTIALS = ("wazuh-wui", "YOUR_PASSWORD")
POLL_INTERVAL = 30

def get_token() -> str:
    """Obtain a JWT token."""
    response = requests.post(
        f"{WAZUH_API}/security/user/authenticate?raw=true",
        auth=CREDENTIALS,
        verify=False,
    )
    response.raise_for_status()
    return response.text

def get_recent_alerts(token: str, minutes: int = 5) -> list[dict]:
    """Fetch alerts from the last N minutes via the indexer."""
    headers = {"Authorization": f"Bearer {token}"}
    response = requests.get(
        f"{WAZUH_API}/manager/logs",
        headers=headers,
        params={
            "limit": 100,
            "sort": "-timestamp",
            "tag": "ossec-analysisd",
        },
        verify=False,
    )
    response.raise_for_status()
    return response.json().get("data", {}).get("affected_items", [])

def process_alert(alert: dict) -> None:
    """Process a single alert."""
    level = alert.get("level", "")
    description = alert.get("description", "")
    timestamp = alert.get("timestamp", "")
    print(f"[{timestamp}] Level {level}: {description}")

def main():
    token = get_token()
    while True:
        try:
            alerts = get_recent_alerts(token)
            for alert in alerts:
                process_alert(alert)
        except requests.exceptions.HTTPError as e:
            if e.response.status_code == 401:
                token = get_token()
                continue
            raise
        time.sleep(POLL_INTERVAL)

if __name__ == "__main__":
    main()

Troubleshooting

Integration Script Does Not Run

  1. Check file permissions and ownership:
ls -la /var/ossec/integrations/custom-*
# Expected: -rwxr-x--- root wazuh
  1. Verify the filename starts with custom-:
# Correct: custom-myintegration
# Wrong: myintegration, integration-custom
  1. Review integratord logs:
tail -f /var/ossec/logs/integrations.log

Active Response Script Does Not Execute

  1. Confirm the script exists on the agent:
ls -la /var/ossec/active-response/bin/custom-block
  1. Verify the command is registered in ossec.conf:
grep -A3 "custom-block" /var/ossec/etc/ossec.conf
  1. Check Active Response logs:
tail -f /var/ossec/logs/active-responses.log
  1. Test the script manually:
echo '{"version":1,"command":"add","parameters":{"alert":{"data":{"srcip":"10.0.0.1"}}}}' \
  | /var/ossec/active-response/bin/custom-block

Wodle Not Generating Events

  1. Verify the wodle is enabled (<disabled>no</disabled>)
  2. Confirm the wodle script writes output to stdout
  3. Check that the script has execute permissions
  4. Review manager logs:
grep "wodle" /var/ossec/logs/ossec.log

Webhook Receiver Not Creating Alerts

  1. Verify the log file is being populated:
tail -f /var/log/github-security.log
  1. Confirm the <localfile> entry on the agent points to the correct file
  2. Verify the log format matches the <log_format> setting (json/syslog)
  3. Test decoding through wazuh-logtest:
/var/ossec/bin/wazuh-logtest

Enter a line from the log file and verify that the decoder and rule fire correctly.

For built-in integration configuration, see the SIEM Integrations section. The API reference for programmatic access is available in the REST API Reference .

Last updated on