BAB 1: Pendahuluan & Analisis Arsitektur
Di dunia internet, sebuah server yang memiliki IP publik akan selalu menjadi target pemindaian (scanning) otomatis oleh bot jahat, baik untuk mencari celah keamanan web maupun mencoba masuk lewat jalur SSH (brute force attack).
Sistem monitoring ini dirancang khusus untuk mengisolasi dan mendeteksi aktivitas dari dua jalur utama tersebut:
- Lalu Lintas Web (Nginx Proxy Manager): Menyaring trafik domain target, membersihkan log dari aset statis, serta mengecualikan IP internal Docker.
- Keamanan OS (Jalur SSH): Membaca log autentikasi OS secara instan jika terjadi kegagalan atau keberhasilan login.
BAB 2: Modul 1 - Real-time Web Traffic Monitor (app.py)
Modul ini bekerja secara real-time. Menggunakan fitur cooldown 60 detik per IP publik asli agar Telegram Anda terhindar dari banjir pesan (spam bomb).
import os, time, requests, glob, re
from datetime import datetime, timedelta
TELEGRAM_TOKEN = "8987973292:AAEWeJ0UHcEqKhqiwQcfDSB5hxiqy6nBaXM"
TELEGRAM_CHAT_ID = "1032303044"
LOG_DIR, DOMAIN_FILE_PATH = "/var/log/nginx", "/app/domains.txt"
COOLDOWN_TIME = 60
ip_history, TARGET_DOMAINS = {}, []
CLIENT_IP_REGEX = re.compile(r'\[Client\s+([^\]]+)\]')
TIME_REGEX = re.compile(r'\[(\d{2}/[A-Za-z]{3}/\d{4}:\d{2}:\d{2}:\d{2})\s+\+0000\]')
def convert_utc_to_wib(nginx_time_str):
try:
utc_time = datetime.strptime(nginx_time_str, "%d/%b/%Y:%H:%M:%S")
return (utc_time + timedelta(hours=7)).strftime("%d-%m-%Y %H:%M:%S WIB")
except: return nginx_time_str
def load_target_domains():
if os.path.exists(DOMAIN_FILE_PATH):
with open(DOMAIN_FILE_PATH, "r") as f:
global TARGET_DOMAINS
TARGET_DOMAINS = [line.strip() for line in f if line.strip()]
def send_tg(msg):
requests.post(f"https://api.telegram.org/bot{TELEGRAM_TOKEN}/sendMessage", json={"chat_id": TELEGRAM_CHAT_ID, "text": msg, "parse_mode": "Markdown"})
def monitor_logs():
load_target_domains()
opened_files = {p: open(p, "r", errors="ignore") for p in glob.glob(f"{LOG_DIR}/proxy-host-*_access.log")}
for f in opened_files.values(): f.seek(0, os.SEEK_END)
while True:
load_target_domains()
for path, f in opened_files.items():
line = f.readline()
if not line: continue
if any(kw in line.lower() for kw in [".css", ".js", ".png", ".jpg", ".jpeg", ".ico", ".svg", "/api/", "/_stcore/"]): continue
for domain in TARGET_DOMAINS:
if domain in line:
ip = CLIENT_IP_REGEX.search(line).group(1) if CLIENT_IP_REGEX.search(line) else "Unknown"
if ip == "127.0.0.1" or (ip.startswith("172.") and 17 <= int(ip.split(".")[1]) <= 100): continue
if time.time() - ip_history.get(ip, 0) < COOLDOWN_TIME: continue
ip_history[ip] = time.time()
t_str = TIME_REGEX.search(line).group(1) if TIME_REGEX.search(line) else ""
msg = f"šØ *Traffic Detected!*\nš *Domain:* `{domain}`\nš¤ *IP:* `{ip}`\nā° *Waktu:* `{convert_utc_to_wib(t_str)}`"
send_tg(msg)
time.sleep(0.5)
if __name__ == "__main__": monitor_logs()
BAB 3: Modul 2 - Pusat Agregator & Rekap 10 Menit (apprekap.py)
Modul terpadu ini menggabungkan pencatatan aktivitas Web NPM dan SSH login, menghitung Unique IP, serta mengurutkan daftar "Top Spammer" penyerang server setiap 10 menit sekali.
import os, time, requests, glob, re
from datetime import datetime
TELEGRAM_TOKEN = "8987973292:AAEWeJ0UHcEqKhqiwQcfDSB5hxiqy6nBaXM"
TELEGRAM_CHAT_ID = "1032303044"
NPM_LOG_DIR, SSH_LOG_PATH, DOMAIN_FILE_PATH = "/var/log/nginx", "/var/log/auth_os.log", "/app/domains.txt"
REKAP_INTERVAL = 600
CLIENT_IP_REGEX = re.compile(r'\[Client\s+([^\]]+)\]')
SSH_ACCEPTED_REGEX = re.compile(r'Accepted\s+(password|publickey)\s+for\s+(\S+)\s+from\s+(\S+)')
SSH_FAILED_REGEX = re.compile(r'Failed\s+password\s+for\s+(invalid\s+user\s+)?(\S+)\s+from\s+(\S+)')
web_data, ssh_data, TARGET_DOMAINS = {}, {'success_total': 0, 'failed_total': 0, 'ips': {}}, []
def load_target_domains():
if os.path.exists(DOMAIN_FILE_PATH):
with open(DOMAIN_FILE_PATH, "r") as f:
global TARGET_DOMAINS
TARGET_DOMAINS = [line.strip() for line in f if line.strip()]
def send_tg(msg):
requests.post(f"https://api.telegram.org/bot{TELEGRAM_TOKEN}/sendMessage", json={"chat_id": TELEGRAM_CHAT_ID, "text": msg, "parse_mode": "Markdown"})
def generate_and_send_report():
global web_data, ssh_data
wib_now = datetime.now().strftime("%d-%m-%Y %H:%M:%S WIB")
web_report = f"š *[REKAP TRAFIK WEB - 10 MENIT]*\nā° Waktu: `{wib_now}`\n\n"
has_web = False
for domain, ips in web_data.items():
if not ips: continue
has_web = True
web_report += f"š *Domain:* `{domain}`\nš Total Hit: `{sum(ips.values())}x` | š¤ Unique IP: `{len(ips)}`\nš *Top Visitors:*\n"
for ip, count in sorted(ips.items(), key=lambda x: x[1], reverse=True)[:3]:
web_report += f" ā `{ip}` ā `{count}x hit`\n"
web_report += "\n"
if not has_web: web_report += "š¤ Tidak ada aktivitas web.\n\n"
ssh_report = f"š *[REKAP KEAMANAN SSH - 10 MENIT]*\nš Sukses: `{ssh_data['success_total']}x` | Gagal: `{ssh_data['failed_total']}x`\n"
if ssh_data['ips']:
ssh_report += "ā ļø *Top Attackers:*\n"
for ip, counts in sorted(ssh_data['ips'].items(), key=lambda x: x[1].get('failed', 0), reverse=True)[:3]:
ssh_report += f" ā `{ip}` ā Gagal: `{counts.get('failed', 0)}x` | Sukses: `{counts.get('success', 0)}x`\n"
else: ssh_report += "ā
Tidak ada aktivitas login SSH.\n"
send_tg(f"{web_report}-----------------------------------------\n{ssh_report}")
web_data.clear()
ssh_data.clear(); ssh_data.update({'success_total': 0, 'failed_total': 0, 'ips': {}})
def main_monitor():
load_target_domains()
opened_web = {p: open(p, "r", errors="ignore") for p in glob.glob(f"{NPM_LOG_DIR}/proxy-host-*_access.log")}
for f in opened_web.values(): f.seek(0, os.SEEK_END)
opened_ssh = open(SSH_LOG_PATH, "r", errors="ignore") if os.path.exists(SSH_LOG_PATH) else None
if opened_ssh: opened_ssh.seek(0, os.SEEK_END)
last_rekap = time.time()
while True:
load_target_domains()
for path, f in opened_web.items():
while True:
line = f.readline()
if not line: break
if any(kw in line.lower() for kw in [".css", ".js", ".png", ".jpg", ".jpeg", ".ico", ".svg", "/api/", "/_stcore/"]): continue
for domain in TARGET_DOMAINS:
if domain in line:
ip = CLIENT_IP_REGEX.search(line).group(1) if CLIENT_IP_REGEX.search(line) else "Unknown"
if ip == "127.0.0.1" or (ip.startswith("172.") and 17 <= int(ip.split(".")[1]) <= 100): continue
if domain not in web_data: web_data[domain] = {}
web_data[domain][ip] = web_data[domain].get(ip, 0) + 1
if opened_ssh:
while True:
line = opened_ssh.readline()
if not line: break
m_acc, m_fail = SSH_ACCEPTED_REGEX.search(line), SSH_FAILED_REGEX.search(line)
if m_acc:
ip = m_acc.group(3)
ssh_data['success_total'] += 1
if ip not in ssh_data['ips']: ssh_data['ips'][ip] = {'success': 0, 'failed': 0}
ssh_data['ips'][ip]['success'] += 1
elif m_fail:
ip = m_fail.group(3)
ssh_data['failed_total'] += 1
if ip not in ssh_data['ips']: ssh_data['ips'][ip] = {'success': 0, 'failed': 0}
ssh_data['ips'][ip]['failed'] += 1
if time.time() - last_rekap >= REKAP_INTERVAL:
generate_and_send_report()
last_rekap = time.time()
time.sleep(1)
if __name__ == "__main__": main_monitor()
BAB 4: Manajemen Filter IP Jaringan Internal Docker
Docker secara standar menggunakan rentang subnet kelas B internal (172.16.0.0/12). Jika kita melakukan filter mentah berwujud .startswith("172."), maka IP publik eksternal (seperti range 172.217.x.x kepemilikan Google) dipastikan ikut terbuang.
Solusi Cerdas Bertingkat:
Aplikasi mengekstrak elemen string berbasis tanda titik, memisahkannya menjadi susunan array array, lalu melakukan komparasi integer murni terhadap nilai Oktet Kedua (parts[1]) spesifik pada ambang numerik 17 s/d 100.
| Sampel Objek IP | Ekstraksi Oktet 2 | Kondisi Logika | Status Aksi |
|---|---|---|---|
| 172.18.0.5 | 18 | 17 <= 18 <= 100 | DIABAIKAN / SKIP |
| 172.217.16.4 | 217 | Di luar rentang | TETAP DICATAT |
BAB 5: Deployment Menggunakan Portainer Stack
Salin konfigurasi Docker Compose YAML terstruktur di bawah ini ke dalam modul **Portainer -> Stacks -> Web Editor** untuk otomatisasi siklus hidup container agregator:
version: '3.8'
services:
rekap-monitor:
image: python:3.11-alpine
container_name: petroflexx-rekap-monitor
restart: unless-stopped
volumes:
- /opt/data/docker/docker-hit-monitor/apprekap.py:/app/apprekap.py:ro
- /opt/data/docker/docker-hit-monitor/domains.txt:/app/domains.txt:ro
- /opt/data/docker/npm/data/logs:/var/log/nginx:ro
- /var/log/secure:/var/log/auth_os.log:ro
environment:
- TZ=Asia/Jakarta
- PYTHONUNBUFFERED=1
working_dir: /app
entrypoint: >
sh -c "pip install --no-cache-dir requests && python apprekap.py"
Manajemen Kontrol File Target (domains.txt)
Untuk menambah atau membuang domain pengawasan di kemudian hari, Anda cukup melakukan mutasi dokumen teks lokal via SSH tanpa perlu menyentuh restrukturisasi container Docker:
ompetroflexx.id
pmst.petroflexx.id
bi.petroflexx.id