Wazuh пользовательские интеграции и скрипты

Wazuh 4.14 предоставляет несколько механизмов расширения функциональности: пользовательские интеграции через integratord, скрипты Active Response для автоматического реагирования, wodle-модули для расширения агентов и webhook-приемники для обработки внешних событий. Каждый механизм подходит для определенных сценариев и имеет свои требования к реализации.

Архитектура пользовательских интеграций

┌─────────────────────────────────────────────────────┐
│                  Wazuh Manager                      │
│                                                     │
│  Alert  ──▶ integratord ──▶ Custom Script ──▶ API   │
│                                                     │
│  Rule   ──▶ Active Response ──▶ Agent Script        │
│                                                     │
│  Wodle  ──▶ Custom Module ──▶ Log Output ──▶ Rules  │
│                                                     │
│  API    ◀── Webhook Receiver ◀── External System    │
└─────────────────────────────────────────────────────┘

Integratord - формат входных данных

Демон wazuh-integratord вызывает скрипт интеграции при каждом совпавшем алерте, передавая четыре аргумента:

АргументОписание
sys.argv[1]Путь к файлу с JSON-алертом
sys.argv[2]API-ключ (из <api_key> в ossec.conf)
sys.argv[3]Hook URL (из <hook_url> в ossec.conf)
sys.argv[4]Путь к файлу с полным алертом

Структура JSON-алерта

Файл алерта содержит JSON следующей структуры:

{
  "timestamp": "2024-01-15T10:30:45.123+0000",
  "rule": {
    "level": 10,
    "description": "sshd: authentication failure",
    "id": "5710",
    "mitre": {
      "id": ["T1110"],
      "tactic": ["Credential Access"],
      "technique": ["Brute Force"]
    },
    "groups": ["syslog", "sshd", "authentication_failed"],
    "pci_dss": ["10.2.4", "10.2.5"],
    "gdpr": ["IV_35.7.d", "IV_32.2"]
  },
  "agent": {
    "id": "001",
    "name": "web-server-01",
    "ip": "192.168.1.10"
  },
  "manager": {
    "name": "wazuh-manager"
  },
  "data": {
    "srcip": "10.0.0.50",
    "srcport": "54321",
    "dstuser": "root"
  },
  "full_log": "Jan 15 10:30:45 web-server-01 sshd[12345]: Failed password for root from 10.0.0.50 port 54321 ssh2",
  "decoder": {
    "name": "sshd"
  },
  "location": "/var/log/auth.log"
}

Шаблон скрипта интеграции

#!/usr/bin/env python3
"""Template for a custom Wazuh integration script."""
import sys
import json
import logging
from pathlib import Path

LOG_FILE = "/var/ossec/logs/integrations.log"
logging.basicConfig(
    filename=LOG_FILE,
    level=logging.INFO,
    format="%(asctime)s %(levelname)s [custom-integration] %(message)s",
)

def process_alert(alert: dict, api_key: str, hook_url: str) -> None:
    """Process a Wazuh alert and forward to external system."""
    rule = alert.get("rule", {})
    agent = alert.get("agent", {})

    logging.info(
        "Processing alert: rule=%s level=%s agent=%s",
        rule.get("id"),
        rule.get("level"),
        agent.get("name"),
    )

    # Custom processing logic goes here
    # Example: forward to an HTTP endpoint
    payload = {
        "source": "wazuh",
        "alert_id": rule.get("id"),
        "level": rule.get("level"),
        "description": rule.get("description"),
        "agent_name": agent.get("name"),
        "agent_ip": agent.get("ip"),
        "timestamp": alert.get("timestamp"),
        "mitre": rule.get("mitre", {}),
    }

    # Send payload to external system
    # requests.post(hook_url, json=payload, headers={"Authorization": f"Bearer {api_key}"})

def main():
    if len(sys.argv) < 4:
        logging.error("Insufficient arguments: expected 4, got %d", len(sys.argv) - 1)
        sys.exit(1)

    alert_file = sys.argv[1]
    api_key = sys.argv[2]
    hook_url = sys.argv[3]

    try:
        alert = json.loads(Path(alert_file).read_text())
        process_alert(alert, api_key, hook_url)
    except json.JSONDecodeError as e:
        logging.error("Failed to parse alert JSON: %s", e)
        sys.exit(1)
    except Exception as e:
        logging.error("Integration error: %s", e)
        sys.exit(1)

if __name__ == "__main__":
    main()

Установка и регистрация

# Скопируйте скрипт в каталог интеграций
cp custom-myintegration /var/ossec/integrations/
chmod 750 /var/ossec/integrations/custom-myintegration
chown root:wazuh /var/ossec/integrations/custom-myintegration

# Добавьте конфигурацию в ossec.conf
# <integration>
#   <name>custom-myintegration</name>
#   <hook_url>https://api.example.com/alerts</hook_url>
#   <api_key>YOUR_API_KEY</api_key>
#   <level>7</level>
#   <alert_format>json</alert_format>
# </integration>

# Перезапустите менеджер
/var/ossec/bin/wazuh-control restart

Важные требования к скриптам интеграции:

  • Имя файла должно начинаться с custom-
  • Файл должен быть исполняемым (chmod 750)
  • Владелец: root:wazuh
  • Скрипт должен завершаться за разумное время (рекомендуется timeout < 10 секунд)
  • Логирование - в /var/ossec/logs/integrations.log

Active Response - скрипты реагирования

Active Response позволяет автоматически выполнять действия на агентах при срабатывании правил. Скрипты располагаются в /var/ossec/active-response/bin/ на агенте.

Формат входных данных Active Response

Начиная с Wazuh 4.2, Active Response скрипты получают данные через stdin в формате JSON:

{
  "version": 1,
  "origin": {
    "name": "node01",
    "module": "wazuh-execd"
  },
  "command": "add",
  "parameters": {
    "extra_args": [],
    "alert": {
      "timestamp": "2024-01-15T10:30:45.123+0000",
      "rule": {
        "level": 10,
        "id": "5710",
        "description": "sshd: authentication failure"
      },
      "agent": {
        "id": "001",
        "name": "web-server-01"
      },
      "data": {
        "srcip": "10.0.0.50"
      }
    }
  }
}

Поле command принимает значения:

  • add - выполнить действие (например, заблокировать IP)
  • delete - отменить действие (например, разблокировать IP после timeout)

Скрипт Active Response на Python

#!/usr/bin/env python3
"""Custom Active Response script for Wazuh - IP blocking via API."""
import sys
import json
import logging
import datetime
import requests

LOG_FILE = "/var/ossec/logs/active-responses.log"
logging.basicConfig(
    filename=LOG_FILE,
    level=logging.INFO,
    format="%(asctime)s %(levelname)s [custom-ar] %(message)s",
)

FIREWALL_API = "https://firewall.example.com/api"
FIREWALL_TOKEN = "YOUR_TOKEN"

def block_ip(ip_address: str) -> bool:
    """Block an IP address via firewall API."""
    payload = {
        "action": "block",
        "source": ip_address,
        "duration": 3600,
        "reason": "Wazuh Active Response - automated block",
    }
    try:
        response = requests.post(
            f"{FIREWALL_API}/rules",
            json=payload,
            headers={"Authorization": f"Bearer {FIREWALL_TOKEN}"},
            timeout=10,
        )
        response.raise_for_status()
        logging.info("Blocked IP %s via firewall API", ip_address)
        return True
    except requests.RequestException as e:
        logging.error("Failed to block IP %s: %s", ip_address, e)
        return False

def unblock_ip(ip_address: str) -> bool:
    """Unblock an IP address via firewall API."""
    try:
        response = requests.delete(
            f"{FIREWALL_API}/rules/{ip_address}",
            headers={"Authorization": f"Bearer {FIREWALL_TOKEN}"},
            timeout=10,
        )
        response.raise_for_status()
        logging.info("Unblocked IP %s via firewall API", ip_address)
        return True
    except requests.RequestException as e:
        logging.error("Failed to unblock IP %s: %s", ip_address, e)
        return False

def main():
    input_data = json.loads(sys.stdin.read())
    command = input_data.get("command")
    alert = input_data.get("parameters", {}).get("alert", {})
    src_ip = alert.get("data", {}).get("srcip")

    if not src_ip:
        logging.warning("No source IP in alert, skipping")
        return

    if command == "add":
        block_ip(src_ip)
    elif command == "delete":
        unblock_ip(src_ip)
    else:
        logging.warning("Unknown command: %s", command)

if __name__ == "__main__":
    main()

Скрипт Active Response на Bash

#!/bin/bash
# Custom Active Response script - block IP via iptables with logging

LOG_FILE="/var/ossec/logs/active-responses.log"
LOCK_FILE="/var/ossec/var/run/custom-block.lock"

log() {
    echo "$(date '+%Y-%m-%d %H:%M:%S') $1 [custom-block] $2" >> "${LOG_FILE}"
}

# Read JSON from stdin
read -r INPUT
COMMAND=$(echo "${INPUT}" | jq -r '.command')
SRCIP=$(echo "${INPUT}" | jq -r '.parameters.alert.data.srcip // empty')

if [ -z "${SRCIP}" ]; then
    log "WARNING" "No source IP in alert"
    exit 0
fi

# Validate IP address format
if ! echo "${SRCIP}" | grep -qP '^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$'; then
    log "ERROR" "Invalid IP format: ${SRCIP}"
    exit 1
fi

case "${COMMAND}" in
    add)
        if ! iptables -C INPUT -s "${SRCIP}" -j DROP 2>/dev/null; then
            iptables -I INPUT -s "${SRCIP}" -j DROP
            log "INFO" "Blocked IP: ${SRCIP}"
        else
            log "INFO" "IP already blocked: ${SRCIP}"
        fi
        ;;
    delete)
        if iptables -C INPUT -s "${SRCIP}" -j DROP 2>/dev/null; then
            iptables -D INPUT -s "${SRCIP}" -j DROP
            log "INFO" "Unblocked IP: ${SRCIP}"
        else
            log "INFO" "IP not in blocklist: ${SRCIP}"
        fi
        ;;
    *)
        log "WARNING" "Unknown command: ${COMMAND}"
        ;;
esac

exit 0

Регистрация Active Response

Для активации скрипта необходимо зарегистрировать команду и привязку в ossec.conf на менеджере:

<!-- Определение команды -->
<command>
  <name>custom-block</name>
  <executable>custom-block</executable>
  <timeout_allowed>yes</timeout_allowed>
</command>

<!-- Привязка к правилам -->
<active-response>
  <command>custom-block</command>
  <location>local</location>
  <rules_id>5710,5712</rules_id>
  <timeout>3600</timeout>
</active-response>

Параметры <active-response>:

ПараметрОписание
<location>local (на агенте), server (на менеджере), defined-agent (конкретный агент), all (все агенты)
<rules_id>Список ID правил, при срабатывании которых запускается скрипт
<rules_group>Группа правил для триггера
<level>Минимальный уровень алерта
<timeout>Время в секундах до автоматической отмены действия (команда delete)

Подробнее о модуле Active Response читайте в разделе Возможности Wazuh .

Пользовательские Wodle-модули

Wodle (Wazuh Module) - механизм расширения агента для выполнения произвольных задач по расписанию. Результаты работы wodle записываются в лог, который анализируется движком правил.

Wodle command

Простейший способ создания пользовательского wodle - использование встроенного wodle command:

<wodle name="command">
  <disabled>no</disabled>
  <tag>system-audit</tag>
  <command>/var/ossec/wodles/custom-audit.sh</command>
  <interval>1h</interval>
  <ignore_output>no</ignore_output>
  <run_on_start>yes</run_on_start>
  <timeout>120</timeout>
</wodle>

Пример wodle-скрипта: аудит привилегированных процессов

#!/bin/bash
# Custom wodle: audit processes running as root with network connections

TIMESTAMP=$(date -u '+%Y-%m-%dT%H:%M:%S.000Z')

# Find root processes with established connections
ss -tnp | grep ESTAB | while IFS= read -r line; do
    PID=$(echo "${line}" | grep -oP 'pid=\K\d+')
    if [ -n "${PID}" ]; then
        PROC_USER=$(ps -o user= -p "${PID}" 2>/dev/null)
        PROC_NAME=$(ps -o comm= -p "${PID}" 2>/dev/null)
        if [ "${PROC_USER}" = "root" ]; then
            LOCAL=$(echo "${line}" | awk '{print $4}')
            REMOTE=$(echo "${line}" | awk '{print $5}')
            echo "{\"timestamp\":\"${TIMESTAMP}\",\"wodle\":\"system-audit\",\"process\":\"${PROC_NAME}\",\"pid\":${PID},\"user\":\"root\",\"local_addr\":\"${LOCAL}\",\"remote_addr\":\"${REMOTE}\"}"
        fi
    fi
done

Правила для обработки wodle-вывода

<rule id="100300" level="0">
  <decoded_as>json</decoded_as>
  <field name="wodle">system-audit</field>
  <description>Custom wodle: system audit event</description>
  <group>wodle,system_audit,</group>
</rule>

<rule id="100301" level="8">
  <if_sid>100300</if_sid>
  <field name="user">root</field>
  <field name="remote_addr" negate="yes">127.0.0.1</field>
  <description>Root process $(process) has external network connection to $(remote_addr)</description>
  <mitre>
    <id>T1071</id>
  </mitre>
  <group>wodle,system_audit,suspicious_network,</group>
</rule>

Webhook-приемники

Webhook-приемник позволяет принимать события от внешних систем и преобразовывать их в алерты Wazuh.

Архитектура webhook-приемника

External System ──▶ Webhook Receiver ──▶ Log File ──▶ Wazuh Agent ──▶ Rules
                    (Python/Flask)       (/var/log/)   (localfile)

Пример: Flask-приемник для GitHub Events

#!/usr/bin/env python3
"""Webhook receiver for GitHub security events."""
import json
import hmac
import hashlib
import logging
from pathlib import Path

from flask import Flask, request, abort

app = Flask(__name__)

WEBHOOK_SECRET = "YOUR_GITHUB_WEBHOOK_SECRET"
LOG_FILE = Path("/var/log/github-security.log")

logging.basicConfig(level=logging.INFO)

def verify_signature(payload: bytes, signature: str) -> bool:
    """Verify the GitHub webhook signature."""
    expected = "sha256=" + hmac.new(
        WEBHOOK_SECRET.encode(),
        payload,
        hashlib.sha256,
    ).hexdigest()
    return hmac.compare_digest(expected, signature)

@app.route("/webhook/github", methods=["POST"])
def github_webhook():
    """Handle GitHub webhook events."""
    signature = request.headers.get("X-Hub-Signature-256", "")
    if not verify_signature(request.data, signature):
        abort(403)

    event_type = request.headers.get("X-GitHub-Event", "unknown")
    payload = request.get_json()

    if event_type == "security_advisory":
        log_entry = {
            "source": "github",
            "event": "security_advisory",
            "severity": payload.get("security_advisory", {}).get("severity"),
            "summary": payload.get("security_advisory", {}).get("summary"),
            "cve_id": payload.get("security_advisory", {}).get("cve_id"),
            "repository": payload.get("repository", {}).get("full_name"),
        }
        with open(LOG_FILE, "a") as f:
            f.write(json.dumps(log_entry) + "\n")

    elif event_type == "repository_vulnerability_alert":
        log_entry = {
            "source": "github",
            "event": "vulnerability_alert",
            "package": payload.get("alert", {}).get("affected_package_name"),
            "severity": payload.get("alert", {}).get("severity"),
            "repository": payload.get("repository", {}).get("full_name"),
        }
        with open(LOG_FILE, "a") as f:
            f.write(json.dumps(log_entry) + "\n")

    return "", 200

if __name__ == "__main__":
    app.run(host="0.0.0.0", port=8080)

Конфигурация Wazuh для чтения вебхук-логов

На агенте, где работает webhook-приемник:

<localfile>
  <log_format>json</log_format>
  <location>/var/log/github-security.log</location>
</localfile>

Правила для обработки вебхук-событий

<rule id="100310" level="0">
  <decoded_as>json</decoded_as>
  <field name="source">github</field>
  <description>GitHub webhook event received</description>
  <group>github,webhook,</group>
</rule>

<rule id="100311" level="10">
  <if_sid>100310</if_sid>
  <field name="event">security_advisory</field>
  <field name="severity">critical</field>
  <description>GitHub critical security advisory: $(summary)</description>
  <mitre>
    <id>T1190</id>
  </mitre>
  <group>github,vulnerability,critical,</group>
</rule>

<rule id="100312" level="7">
  <if_sid>100310</if_sid>
  <field name="event">vulnerability_alert</field>
  <description>GitHub vulnerability alert for package $(package) in $(repository)</description>
  <group>github,vulnerability,</group>
</rule>

Обработка алертов через Python

Для пакетной обработки алертов Wazuh можно использовать API или прямое чтение файла алертов.

Чтение алертов из JSON-файла

#!/usr/bin/env python3
"""Batch alert processing from Wazuh alerts.json."""
import json
from pathlib import Path
from collections import Counter
from typing import Iterator

ALERTS_FILE = Path("/var/ossec/logs/alerts/alerts.json")

def read_alerts(filepath: Path, max_lines: int = 10000) -> Iterator[dict]:
    """Read alerts from the JSON log file."""
    with open(filepath) as f:
        for i, line in enumerate(f):
            if i >= max_lines:
                break
            try:
                yield json.loads(line.strip())
            except json.JSONDecodeError:
                continue

def analyze_alerts(alerts: Iterator[dict]) -> dict:
    """Analyze alert distribution."""
    level_counter = Counter()
    rule_counter = Counter()
    agent_counter = Counter()
    mitre_counter = Counter()

    for alert in alerts:
        rule = alert.get("rule", {})
        level_counter[rule.get("level", 0)] += 1
        rule_counter[rule.get("id", "unknown")] += 1
        agent_counter[alert.get("agent", {}).get("name", "unknown")] += 1

        for technique in rule.get("mitre", {}).get("id", []):
            mitre_counter[technique] += 1

    return {
        "by_level": dict(level_counter.most_common(10)),
        "top_rules": dict(rule_counter.most_common(10)),
        "top_agents": dict(agent_counter.most_common(10)),
        "top_mitre": dict(mitre_counter.most_common(10)),
    }

if __name__ == "__main__":
    alerts = read_alerts(ALERTS_FILE)
    report = analyze_alerts(alerts)
    print(json.dumps(report, indent=2))

Потоковая обработка через API

#!/usr/bin/env python3
"""Real-time alert monitoring via Wazuh API."""
import time
import json
import requests
import urllib3

urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

WAZUH_API = "https://localhost:55000"
CREDENTIALS = ("wazuh-wui", "YOUR_PASSWORD")
POLL_INTERVAL = 30

def get_token() -> str:
    """Obtain a JWT token."""
    response = requests.post(
        f"{WAZUH_API}/security/user/authenticate?raw=true",
        auth=CREDENTIALS,
        verify=False,
    )
    response.raise_for_status()
    return response.text

def get_recent_alerts(token: str, minutes: int = 5) -> list[dict]:
    """Fetch alerts from the last N minutes via the indexer."""
    headers = {"Authorization": f"Bearer {token}"}
    response = requests.get(
        f"{WAZUH_API}/manager/logs",
        headers=headers,
        params={
            "limit": 100,
            "sort": "-timestamp",
            "tag": "ossec-analysisd",
        },
        verify=False,
    )
    response.raise_for_status()
    return response.json().get("data", {}).get("affected_items", [])

def process_alert(alert: dict) -> None:
    """Process a single alert."""
    level = alert.get("level", "")
    description = alert.get("description", "")
    timestamp = alert.get("timestamp", "")
    print(f"[{timestamp}] Level {level}: {description}")

def main():
    token = get_token()
    while True:
        try:
            alerts = get_recent_alerts(token)
            for alert in alerts:
                process_alert(alert)
        except requests.exceptions.HTTPError as e:
            if e.response.status_code == 401:
                token = get_token()
                continue
            raise
        time.sleep(POLL_INTERVAL)

if __name__ == "__main__":
    main()

Устранение неполадок

Скрипт интеграции не запускается

  1. Проверьте права и владельца файла:
ls -la /var/ossec/integrations/custom-*
# Ожидается: -rwxr-x--- root wazuh
  1. Убедитесь, что имя файла начинается с custom-:
# Правильно: custom-myintegration
# Неправильно: myintegration, integration-custom
  1. Проверьте логи integratord:
tail -f /var/ossec/logs/integrations.log

Active Response скрипт не выполняется

  1. Проверьте наличие скрипта на агенте:
ls -la /var/ossec/active-response/bin/custom-block
  1. Убедитесь, что команда зарегистрирована в ossec.conf:
grep -A3 "custom-block" /var/ossec/etc/ossec.conf
  1. Проверьте логи Active Response:
tail -f /var/ossec/logs/active-responses.log
  1. Протестируйте скрипт вручную:
echo '{"version":1,"command":"add","parameters":{"alert":{"data":{"srcip":"10.0.0.1"}}}}' \
  | /var/ossec/active-response/bin/custom-block

Wodle не генерирует события

  1. Проверьте, что wodle включен (<disabled>no</disabled>)
  2. Убедитесь, что скрипт wodle выводит данные в stdout
  3. Проверьте права на исполнение скрипта
  4. Просмотрите логи менеджера:
grep "wodle" /var/ossec/logs/ossec.log

Webhook-приемник не создает алерты

  1. Проверьте, что лог-файл заполняется данными:
tail -f /var/log/github-security.log
  1. Убедитесь, что <localfile> настроен на агенте и указывает на правильный файл
  2. Проверьте, что формат лога соответствует указанному в <log_format> (json/syslog)
  3. Протестируйте декодирование через wazuh-logtest:
/var/ossec/bin/wazuh-logtest

Введите строку из лог-файла и проверьте, что декодер и правило срабатывают корректно.

Подробнее о настройке встроенных интеграций читайте в разделе SIEM-интеграции . Справочник API для программного взаимодействия доступен в разделе REST API .

Last updated on