Wazuh пользовательские интеграции и скрипты
Wazuh 4.14 предоставляет несколько механизмов расширения функциональности: пользовательские интеграции через integratord, скрипты Active Response для автоматического реагирования, wodle-модули для расширения агентов и webhook-приемники для обработки внешних событий. Каждый механизм подходит для определенных сценариев и имеет свои требования к реализации.
Архитектура пользовательских интеграций
┌─────────────────────────────────────────────────────┐
│ Wazuh Manager │
│ │
│ Alert ──▶ integratord ──▶ Custom Script ──▶ API │
│ │
│ Rule ──▶ Active Response ──▶ Agent Script │
│ │
│ Wodle ──▶ Custom Module ──▶ Log Output ──▶ Rules │
│ │
│ API ◀── Webhook Receiver ◀── External System │
└─────────────────────────────────────────────────────┘Integratord - формат входных данных
Демон wazuh-integratord вызывает скрипт интеграции при каждом совпавшем алерте, передавая четыре аргумента:
| Аргумент | Описание |
|---|---|
sys.argv[1] | Путь к файлу с JSON-алертом |
sys.argv[2] | API-ключ (из <api_key> в ossec.conf) |
sys.argv[3] | Hook URL (из <hook_url> в ossec.conf) |
sys.argv[4] | Путь к файлу с полным алертом |
Структура JSON-алерта
Файл алерта содержит JSON следующей структуры:
{
"timestamp": "2024-01-15T10:30:45.123+0000",
"rule": {
"level": 10,
"description": "sshd: authentication failure",
"id": "5710",
"mitre": {
"id": ["T1110"],
"tactic": ["Credential Access"],
"technique": ["Brute Force"]
},
"groups": ["syslog", "sshd", "authentication_failed"],
"pci_dss": ["10.2.4", "10.2.5"],
"gdpr": ["IV_35.7.d", "IV_32.2"]
},
"agent": {
"id": "001",
"name": "web-server-01",
"ip": "192.168.1.10"
},
"manager": {
"name": "wazuh-manager"
},
"data": {
"srcip": "10.0.0.50",
"srcport": "54321",
"dstuser": "root"
},
"full_log": "Jan 15 10:30:45 web-server-01 sshd[12345]: Failed password for root from 10.0.0.50 port 54321 ssh2",
"decoder": {
"name": "sshd"
},
"location": "/var/log/auth.log"
}Шаблон скрипта интеграции
#!/usr/bin/env python3
"""Template for a custom Wazuh integration script."""
import sys
import json
import logging
from pathlib import Path
LOG_FILE = "/var/ossec/logs/integrations.log"
logging.basicConfig(
filename=LOG_FILE,
level=logging.INFO,
format="%(asctime)s %(levelname)s [custom-integration] %(message)s",
)
def process_alert(alert: dict, api_key: str, hook_url: str) -> None:
"""Process a Wazuh alert and forward to external system."""
rule = alert.get("rule", {})
agent = alert.get("agent", {})
logging.info(
"Processing alert: rule=%s level=%s agent=%s",
rule.get("id"),
rule.get("level"),
agent.get("name"),
)
# Custom processing logic goes here
# Example: forward to an HTTP endpoint
payload = {
"source": "wazuh",
"alert_id": rule.get("id"),
"level": rule.get("level"),
"description": rule.get("description"),
"agent_name": agent.get("name"),
"agent_ip": agent.get("ip"),
"timestamp": alert.get("timestamp"),
"mitre": rule.get("mitre", {}),
}
# Send payload to external system
# requests.post(hook_url, json=payload, headers={"Authorization": f"Bearer {api_key}"})
def main():
if len(sys.argv) < 4:
logging.error("Insufficient arguments: expected 4, got %d", len(sys.argv) - 1)
sys.exit(1)
alert_file = sys.argv[1]
api_key = sys.argv[2]
hook_url = sys.argv[3]
try:
alert = json.loads(Path(alert_file).read_text())
process_alert(alert, api_key, hook_url)
except json.JSONDecodeError as e:
logging.error("Failed to parse alert JSON: %s", e)
sys.exit(1)
except Exception as e:
logging.error("Integration error: %s", e)
sys.exit(1)
if __name__ == "__main__":
main()Установка и регистрация
# Скопируйте скрипт в каталог интеграций
cp custom-myintegration /var/ossec/integrations/
chmod 750 /var/ossec/integrations/custom-myintegration
chown root:wazuh /var/ossec/integrations/custom-myintegration
# Добавьте конфигурацию в ossec.conf
# <integration>
# <name>custom-myintegration</name>
# <hook_url>https://api.example.com/alerts</hook_url>
# <api_key>YOUR_API_KEY</api_key>
# <level>7</level>
# <alert_format>json</alert_format>
# </integration>
# Перезапустите менеджер
/var/ossec/bin/wazuh-control restartВажные требования к скриптам интеграции:
- Имя файла должно начинаться с
custom- - Файл должен быть исполняемым (
chmod 750) - Владелец:
root:wazuh - Скрипт должен завершаться за разумное время (рекомендуется timeout < 10 секунд)
- Логирование - в
/var/ossec/logs/integrations.log
Active Response - скрипты реагирования
Active Response позволяет автоматически выполнять действия на агентах при срабатывании правил. Скрипты располагаются в /var/ossec/active-response/bin/ на агенте.
Формат входных данных Active Response
Начиная с Wazuh 4.2, Active Response скрипты получают данные через stdin в формате JSON:
{
"version": 1,
"origin": {
"name": "node01",
"module": "wazuh-execd"
},
"command": "add",
"parameters": {
"extra_args": [],
"alert": {
"timestamp": "2024-01-15T10:30:45.123+0000",
"rule": {
"level": 10,
"id": "5710",
"description": "sshd: authentication failure"
},
"agent": {
"id": "001",
"name": "web-server-01"
},
"data": {
"srcip": "10.0.0.50"
}
}
}
}Поле command принимает значения:
add- выполнить действие (например, заблокировать IP)delete- отменить действие (например, разблокировать IP после timeout)
Скрипт Active Response на Python
#!/usr/bin/env python3
"""Custom Active Response script for Wazuh - IP blocking via API."""
import sys
import json
import logging
import datetime
import requests
LOG_FILE = "/var/ossec/logs/active-responses.log"
logging.basicConfig(
filename=LOG_FILE,
level=logging.INFO,
format="%(asctime)s %(levelname)s [custom-ar] %(message)s",
)
FIREWALL_API = "https://firewall.example.com/api"
FIREWALL_TOKEN = "YOUR_TOKEN"
def block_ip(ip_address: str) -> bool:
"""Block an IP address via firewall API."""
payload = {
"action": "block",
"source": ip_address,
"duration": 3600,
"reason": "Wazuh Active Response - automated block",
}
try:
response = requests.post(
f"{FIREWALL_API}/rules",
json=payload,
headers={"Authorization": f"Bearer {FIREWALL_TOKEN}"},
timeout=10,
)
response.raise_for_status()
logging.info("Blocked IP %s via firewall API", ip_address)
return True
except requests.RequestException as e:
logging.error("Failed to block IP %s: %s", ip_address, e)
return False
def unblock_ip(ip_address: str) -> bool:
"""Unblock an IP address via firewall API."""
try:
response = requests.delete(
f"{FIREWALL_API}/rules/{ip_address}",
headers={"Authorization": f"Bearer {FIREWALL_TOKEN}"},
timeout=10,
)
response.raise_for_status()
logging.info("Unblocked IP %s via firewall API", ip_address)
return True
except requests.RequestException as e:
logging.error("Failed to unblock IP %s: %s", ip_address, e)
return False
def main():
input_data = json.loads(sys.stdin.read())
command = input_data.get("command")
alert = input_data.get("parameters", {}).get("alert", {})
src_ip = alert.get("data", {}).get("srcip")
if not src_ip:
logging.warning("No source IP in alert, skipping")
return
if command == "add":
block_ip(src_ip)
elif command == "delete":
unblock_ip(src_ip)
else:
logging.warning("Unknown command: %s", command)
if __name__ == "__main__":
main()Скрипт Active Response на Bash
#!/bin/bash
# Custom Active Response script - block IP via iptables with logging
LOG_FILE="/var/ossec/logs/active-responses.log"
LOCK_FILE="/var/ossec/var/run/custom-block.lock"
log() {
echo "$(date '+%Y-%m-%d %H:%M:%S') $1 [custom-block] $2" >> "${LOG_FILE}"
}
# Read JSON from stdin
read -r INPUT
COMMAND=$(echo "${INPUT}" | jq -r '.command')
SRCIP=$(echo "${INPUT}" | jq -r '.parameters.alert.data.srcip // empty')
if [ -z "${SRCIP}" ]; then
log "WARNING" "No source IP in alert"
exit 0
fi
# Validate IP address format
if ! echo "${SRCIP}" | grep -qP '^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$'; then
log "ERROR" "Invalid IP format: ${SRCIP}"
exit 1
fi
case "${COMMAND}" in
add)
if ! iptables -C INPUT -s "${SRCIP}" -j DROP 2>/dev/null; then
iptables -I INPUT -s "${SRCIP}" -j DROP
log "INFO" "Blocked IP: ${SRCIP}"
else
log "INFO" "IP already blocked: ${SRCIP}"
fi
;;
delete)
if iptables -C INPUT -s "${SRCIP}" -j DROP 2>/dev/null; then
iptables -D INPUT -s "${SRCIP}" -j DROP
log "INFO" "Unblocked IP: ${SRCIP}"
else
log "INFO" "IP not in blocklist: ${SRCIP}"
fi
;;
*)
log "WARNING" "Unknown command: ${COMMAND}"
;;
esac
exit 0Регистрация Active Response
Для активации скрипта необходимо зарегистрировать команду и привязку в ossec.conf на менеджере:
<!-- Определение команды -->
<command>
<name>custom-block</name>
<executable>custom-block</executable>
<timeout_allowed>yes</timeout_allowed>
</command>
<!-- Привязка к правилам -->
<active-response>
<command>custom-block</command>
<location>local</location>
<rules_id>5710,5712</rules_id>
<timeout>3600</timeout>
</active-response>Параметры <active-response>:
| Параметр | Описание |
|---|---|
<location> | local (на агенте), server (на менеджере), defined-agent (конкретный агент), all (все агенты) |
<rules_id> | Список ID правил, при срабатывании которых запускается скрипт |
<rules_group> | Группа правил для триггера |
<level> | Минимальный уровень алерта |
<timeout> | Время в секундах до автоматической отмены действия (команда delete) |
Подробнее о модуле Active Response читайте в разделе Возможности Wazuh .
Пользовательские Wodle-модули
Wodle (Wazuh Module) - механизм расширения агента для выполнения произвольных задач по расписанию. Результаты работы wodle записываются в лог, который анализируется движком правил.
Wodle command
Простейший способ создания пользовательского wodle - использование встроенного wodle command:
<wodle name="command">
<disabled>no</disabled>
<tag>system-audit</tag>
<command>/var/ossec/wodles/custom-audit.sh</command>
<interval>1h</interval>
<ignore_output>no</ignore_output>
<run_on_start>yes</run_on_start>
<timeout>120</timeout>
</wodle>Пример wodle-скрипта: аудит привилегированных процессов
#!/bin/bash
# Custom wodle: audit processes running as root with network connections
TIMESTAMP=$(date -u '+%Y-%m-%dT%H:%M:%S.000Z')
# Find root processes with established connections
ss -tnp | grep ESTAB | while IFS= read -r line; do
PID=$(echo "${line}" | grep -oP 'pid=\K\d+')
if [ -n "${PID}" ]; then
PROC_USER=$(ps -o user= -p "${PID}" 2>/dev/null)
PROC_NAME=$(ps -o comm= -p "${PID}" 2>/dev/null)
if [ "${PROC_USER}" = "root" ]; then
LOCAL=$(echo "${line}" | awk '{print $4}')
REMOTE=$(echo "${line}" | awk '{print $5}')
echo "{\"timestamp\":\"${TIMESTAMP}\",\"wodle\":\"system-audit\",\"process\":\"${PROC_NAME}\",\"pid\":${PID},\"user\":\"root\",\"local_addr\":\"${LOCAL}\",\"remote_addr\":\"${REMOTE}\"}"
fi
fi
doneПравила для обработки wodle-вывода
<rule id="100300" level="0">
<decoded_as>json</decoded_as>
<field name="wodle">system-audit</field>
<description>Custom wodle: system audit event</description>
<group>wodle,system_audit,</group>
</rule>
<rule id="100301" level="8">
<if_sid>100300</if_sid>
<field name="user">root</field>
<field name="remote_addr" negate="yes">127.0.0.1</field>
<description>Root process $(process) has external network connection to $(remote_addr)</description>
<mitre>
<id>T1071</id>
</mitre>
<group>wodle,system_audit,suspicious_network,</group>
</rule>Webhook-приемники
Webhook-приемник позволяет принимать события от внешних систем и преобразовывать их в алерты Wazuh.
Архитектура webhook-приемника
External System ──▶ Webhook Receiver ──▶ Log File ──▶ Wazuh Agent ──▶ Rules
(Python/Flask) (/var/log/) (localfile)Пример: Flask-приемник для GitHub Events
#!/usr/bin/env python3
"""Webhook receiver for GitHub security events."""
import json
import hmac
import hashlib
import logging
from pathlib import Path
from flask import Flask, request, abort
app = Flask(__name__)
WEBHOOK_SECRET = "YOUR_GITHUB_WEBHOOK_SECRET"
LOG_FILE = Path("/var/log/github-security.log")
logging.basicConfig(level=logging.INFO)
def verify_signature(payload: bytes, signature: str) -> bool:
"""Verify the GitHub webhook signature."""
expected = "sha256=" + hmac.new(
WEBHOOK_SECRET.encode(),
payload,
hashlib.sha256,
).hexdigest()
return hmac.compare_digest(expected, signature)
@app.route("/webhook/github", methods=["POST"])
def github_webhook():
"""Handle GitHub webhook events."""
signature = request.headers.get("X-Hub-Signature-256", "")
if not verify_signature(request.data, signature):
abort(403)
event_type = request.headers.get("X-GitHub-Event", "unknown")
payload = request.get_json()
if event_type == "security_advisory":
log_entry = {
"source": "github",
"event": "security_advisory",
"severity": payload.get("security_advisory", {}).get("severity"),
"summary": payload.get("security_advisory", {}).get("summary"),
"cve_id": payload.get("security_advisory", {}).get("cve_id"),
"repository": payload.get("repository", {}).get("full_name"),
}
with open(LOG_FILE, "a") as f:
f.write(json.dumps(log_entry) + "\n")
elif event_type == "repository_vulnerability_alert":
log_entry = {
"source": "github",
"event": "vulnerability_alert",
"package": payload.get("alert", {}).get("affected_package_name"),
"severity": payload.get("alert", {}).get("severity"),
"repository": payload.get("repository", {}).get("full_name"),
}
with open(LOG_FILE, "a") as f:
f.write(json.dumps(log_entry) + "\n")
return "", 200
if __name__ == "__main__":
app.run(host="0.0.0.0", port=8080)Конфигурация Wazuh для чтения вебхук-логов
На агенте, где работает webhook-приемник:
<localfile>
<log_format>json</log_format>
<location>/var/log/github-security.log</location>
</localfile>Правила для обработки вебхук-событий
<rule id="100310" level="0">
<decoded_as>json</decoded_as>
<field name="source">github</field>
<description>GitHub webhook event received</description>
<group>github,webhook,</group>
</rule>
<rule id="100311" level="10">
<if_sid>100310</if_sid>
<field name="event">security_advisory</field>
<field name="severity">critical</field>
<description>GitHub critical security advisory: $(summary)</description>
<mitre>
<id>T1190</id>
</mitre>
<group>github,vulnerability,critical,</group>
</rule>
<rule id="100312" level="7">
<if_sid>100310</if_sid>
<field name="event">vulnerability_alert</field>
<description>GitHub vulnerability alert for package $(package) in $(repository)</description>
<group>github,vulnerability,</group>
</rule>Обработка алертов через Python
Для пакетной обработки алертов Wazuh можно использовать API или прямое чтение файла алертов.
Чтение алертов из JSON-файла
#!/usr/bin/env python3
"""Batch alert processing from Wazuh alerts.json."""
import json
from pathlib import Path
from collections import Counter
from typing import Iterator
ALERTS_FILE = Path("/var/ossec/logs/alerts/alerts.json")
def read_alerts(filepath: Path, max_lines: int = 10000) -> Iterator[dict]:
"""Read alerts from the JSON log file."""
with open(filepath) as f:
for i, line in enumerate(f):
if i >= max_lines:
break
try:
yield json.loads(line.strip())
except json.JSONDecodeError:
continue
def analyze_alerts(alerts: Iterator[dict]) -> dict:
"""Analyze alert distribution."""
level_counter = Counter()
rule_counter = Counter()
agent_counter = Counter()
mitre_counter = Counter()
for alert in alerts:
rule = alert.get("rule", {})
level_counter[rule.get("level", 0)] += 1
rule_counter[rule.get("id", "unknown")] += 1
agent_counter[alert.get("agent", {}).get("name", "unknown")] += 1
for technique in rule.get("mitre", {}).get("id", []):
mitre_counter[technique] += 1
return {
"by_level": dict(level_counter.most_common(10)),
"top_rules": dict(rule_counter.most_common(10)),
"top_agents": dict(agent_counter.most_common(10)),
"top_mitre": dict(mitre_counter.most_common(10)),
}
if __name__ == "__main__":
alerts = read_alerts(ALERTS_FILE)
report = analyze_alerts(alerts)
print(json.dumps(report, indent=2))Потоковая обработка через API
#!/usr/bin/env python3
"""Real-time alert monitoring via Wazuh API."""
import time
import json
import requests
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
WAZUH_API = "https://localhost:55000"
CREDENTIALS = ("wazuh-wui", "YOUR_PASSWORD")
POLL_INTERVAL = 30
def get_token() -> str:
"""Obtain a JWT token."""
response = requests.post(
f"{WAZUH_API}/security/user/authenticate?raw=true",
auth=CREDENTIALS,
verify=False,
)
response.raise_for_status()
return response.text
def get_recent_alerts(token: str, minutes: int = 5) -> list[dict]:
"""Fetch alerts from the last N minutes via the indexer."""
headers = {"Authorization": f"Bearer {token}"}
response = requests.get(
f"{WAZUH_API}/manager/logs",
headers=headers,
params={
"limit": 100,
"sort": "-timestamp",
"tag": "ossec-analysisd",
},
verify=False,
)
response.raise_for_status()
return response.json().get("data", {}).get("affected_items", [])
def process_alert(alert: dict) -> None:
"""Process a single alert."""
level = alert.get("level", "")
description = alert.get("description", "")
timestamp = alert.get("timestamp", "")
print(f"[{timestamp}] Level {level}: {description}")
def main():
token = get_token()
while True:
try:
alerts = get_recent_alerts(token)
for alert in alerts:
process_alert(alert)
except requests.exceptions.HTTPError as e:
if e.response.status_code == 401:
token = get_token()
continue
raise
time.sleep(POLL_INTERVAL)
if __name__ == "__main__":
main()Устранение неполадок
Скрипт интеграции не запускается
- Проверьте права и владельца файла:
ls -la /var/ossec/integrations/custom-*
# Ожидается: -rwxr-x--- root wazuh- Убедитесь, что имя файла начинается с
custom-:
# Правильно: custom-myintegration
# Неправильно: myintegration, integration-custom- Проверьте логи integratord:
tail -f /var/ossec/logs/integrations.logActive Response скрипт не выполняется
- Проверьте наличие скрипта на агенте:
ls -la /var/ossec/active-response/bin/custom-block- Убедитесь, что команда зарегистрирована в
ossec.conf:
grep -A3 "custom-block" /var/ossec/etc/ossec.conf- Проверьте логи Active Response:
tail -f /var/ossec/logs/active-responses.log- Протестируйте скрипт вручную:
echo '{"version":1,"command":"add","parameters":{"alert":{"data":{"srcip":"10.0.0.1"}}}}' \
| /var/ossec/active-response/bin/custom-blockWodle не генерирует события
- Проверьте, что wodle включен (
<disabled>no</disabled>) - Убедитесь, что скрипт wodle выводит данные в stdout
- Проверьте права на исполнение скрипта
- Просмотрите логи менеджера:
grep "wodle" /var/ossec/logs/ossec.logWebhook-приемник не создает алерты
- Проверьте, что лог-файл заполняется данными:
tail -f /var/log/github-security.log- Убедитесь, что
<localfile>настроен на агенте и указывает на правильный файл - Проверьте, что формат лога соответствует указанному в
<log_format>(json/syslog) - Протестируйте декодирование через
wazuh-logtest:
/var/ossec/bin/wazuh-logtestВведите строку из лог-файла и проверьте, что декодер и правило срабатывают корректно.
Подробнее о настройке встроенных интеграций читайте в разделе SIEM-интеграции . Справочник API для программного взаимодействия доступен в разделе REST API .