E-Book Dokumentasi Utama

Membangun Sistem Log Monitoring & Security Terintegrasi ke Telegram

Panduan DevOps & Sysadmin untuk implementasi rekapitulasi trafik Web NPM dan Keamanan SSH OS via Docker Container.

Kategori: DevOps, Sysadmin, Security • Versi: 1.0 (Mei 2026)
šŸ“˜

BAB 1: Pendahuluan & Analisis Arsitektur

Di dunia internet, sebuah server yang memiliki IP publik akan selalu menjadi target pemindaian (scanning) otomatis oleh bot jahat, baik untuk mencari celah keamanan web maupun mencoba masuk lewat jalur SSH (brute force attack).

Sistem monitoring ini dirancang khusus untuk mengisolasi dan mendeteksi aktivitas dari dua jalur utama tersebut:

šŸš€

BAB 2: Modul 1 - Real-time Web Traffic Monitor (app.py)

Modul ini bekerja secara real-time. Menggunakan fitur cooldown 60 detik per IP publik asli agar Telegram Anda terhindar dari banjir pesan (spam bomb).

// Jalur file: /opt/data/docker/docker-hit-monitor/app.py
import os, time, requests, glob, re
from datetime import datetime, timedelta

TELEGRAM_TOKEN = "8987973292:AAEWeJ0UHcEqKhqiwQcfDSB5hxiqy6nBaXM"
TELEGRAM_CHAT_ID = "1032303044"
LOG_DIR, DOMAIN_FILE_PATH = "/var/log/nginx", "/app/domains.txt"
COOLDOWN_TIME = 60
ip_history, TARGET_DOMAINS = {}, []

CLIENT_IP_REGEX = re.compile(r'\[Client\s+([^\]]+)\]')
TIME_REGEX = re.compile(r'\[(\d{2}/[A-Za-z]{3}/\d{4}:\d{2}:\d{2}:\d{2})\s+\+0000\]')

def convert_utc_to_wib(nginx_time_str):
    try:
        utc_time = datetime.strptime(nginx_time_str, "%d/%b/%Y:%H:%M:%S")
        return (utc_time + timedelta(hours=7)).strftime("%d-%m-%Y %H:%M:%S WIB")
    except: return nginx_time_str

def load_target_domains():
    if os.path.exists(DOMAIN_FILE_PATH):
        with open(DOMAIN_FILE_PATH, "r") as f:
            global TARGET_DOMAINS
            TARGET_DOMAINS = [line.strip() for line in f if line.strip()]

def send_tg(msg):
    requests.post(f"https://api.telegram.org/bot{TELEGRAM_TOKEN}/sendMessage", json={"chat_id": TELEGRAM_CHAT_ID, "text": msg, "parse_mode": "Markdown"})

def monitor_logs():
    load_target_domains()
    opened_files = {p: open(p, "r", errors="ignore") for p in glob.glob(f"{LOG_DIR}/proxy-host-*_access.log")}
    for f in opened_files.values(): f.seek(0, os.SEEK_END)

    while True:
        load_target_domains()
        for path, f in opened_files.items():
            line = f.readline()
            if not line: continue
            if any(kw in line.lower() for kw in [".css", ".js", ".png", ".jpg", ".jpeg", ".ico", ".svg", "/api/", "/_stcore/"]): continue

            for domain in TARGET_DOMAINS:
                if domain in line:
                    ip = CLIENT_IP_REGEX.search(line).group(1) if CLIENT_IP_REGEX.search(line) else "Unknown"
                    if ip == "127.0.0.1" or (ip.startswith("172.") and 17 <= int(ip.split(".")[1]) <= 100): continue
                    
                    if time.time() - ip_history.get(ip, 0) < COOLDOWN_TIME: continue
                    ip_history[ip] = time.time()
                    
                    t_str = TIME_REGEX.search(line).group(1) if TIME_REGEX.search(line) else ""
                    msg = f"🚨 *Traffic Detected!*\n🌐 *Domain:* `{domain}`\nšŸ‘¤ *IP:* `{ip}`\nā° *Waktu:* `{convert_utc_to_wib(t_str)}`"
                    send_tg(msg)
        time.sleep(0.5)

if __name__ == "__main__": monitor_logs()
šŸ“Š

BAB 3: Modul 2 - Pusat Agregator & Rekap 10 Menit (apprekap.py)

Modul terpadu ini menggabungkan pencatatan aktivitas Web NPM dan SSH login, menghitung Unique IP, serta mengurutkan daftar "Top Spammer" penyerang server setiap 10 menit sekali.

// Jalur file: /opt/data/docker/docker-hit-monitor/apprekap.py
import os, time, requests, glob, re
from datetime import datetime

TELEGRAM_TOKEN = "8987973292:AAEWeJ0UHcEqKhqiwQcfDSB5hxiqy6nBaXM"
TELEGRAM_CHAT_ID = "1032303044"
NPM_LOG_DIR, SSH_LOG_PATH, DOMAIN_FILE_PATH = "/var/log/nginx", "/var/log/auth_os.log", "/app/domains.txt"
REKAP_INTERVAL = 600

CLIENT_IP_REGEX = re.compile(r'\[Client\s+([^\]]+)\]')
SSH_ACCEPTED_REGEX = re.compile(r'Accepted\s+(password|publickey)\s+for\s+(\S+)\s+from\s+(\S+)')
SSH_FAILED_REGEX = re.compile(r'Failed\s+password\s+for\s+(invalid\s+user\s+)?(\S+)\s+from\s+(\S+)')

web_data, ssh_data, TARGET_DOMAINS = {}, {'success_total': 0, 'failed_total': 0, 'ips': {}}, []

def load_target_domains():
    if os.path.exists(DOMAIN_FILE_PATH):
        with open(DOMAIN_FILE_PATH, "r") as f:
            global TARGET_DOMAINS
            TARGET_DOMAINS = [line.strip() for line in f if line.strip()]

def send_tg(msg):
    requests.post(f"https://api.telegram.org/bot{TELEGRAM_TOKEN}/sendMessage", json={"chat_id": TELEGRAM_CHAT_ID, "text": msg, "parse_mode": "Markdown"})

def generate_and_send_report():
    global web_data, ssh_data
    wib_now = datetime.now().strftime("%d-%m-%Y %H:%M:%S WIB")
    web_report = f"🌐 *[REKAP TRAFIK WEB - 10 MENIT]*\nā° Waktu: `{wib_now}`\n\n"
    has_web = False
    for domain, ips in web_data.items():
        if not ips: continue
        has_web = True
        web_report += f"šŸ“Œ *Domain:* `{domain}`\nšŸ“Š Total Hit: `{sum(ips.values())}x` | šŸ‘¤ Unique IP: `{len(ips)}`\nšŸ” *Top Visitors:*\n"
        for ip, count in sorted(ips.items(), key=lambda x: x[1], reverse=True)[:3]:
            web_report += f"  ā”œ `{ip}` āž” `{count}x hit`\n"
        web_report += "\n"
    if not has_web: web_report += "šŸ’¤ Tidak ada aktivitas web.\n\n"

    ssh_report = f"šŸ”’ *[REKAP KEAMANAN SSH - 10 MENIT]*\nšŸ“Š Sukses: `{ssh_data['success_total']}x` | Gagal: `{ssh_data['failed_total']}x`\n"
    if ssh_data['ips']:
        ssh_report += "āš ļø *Top Attackers:*\n"
        for ip, counts in sorted(ssh_data['ips'].items(), key=lambda x: x[1].get('failed', 0), reverse=True)[:3]:
            ssh_report += f"  ā”œ `{ip}` āž” Gagal: `{counts.get('failed', 0)}x` | Sukses: `{counts.get('success', 0)}x`\n"
    else: ssh_report += "āœ… Tidak ada aktivitas login SSH.\n"

    send_tg(f"{web_report}-----------------------------------------\n{ssh_report}")
    web_data.clear()
    ssh_data.clear(); ssh_data.update({'success_total': 0, 'failed_total': 0, 'ips': {}})

def main_monitor():
    load_target_domains()
    opened_web = {p: open(p, "r", errors="ignore") for p in glob.glob(f"{NPM_LOG_DIR}/proxy-host-*_access.log")}
    for f in opened_web.values(): f.seek(0, os.SEEK_END)
    opened_ssh = open(SSH_LOG_PATH, "r", errors="ignore") if os.path.exists(SSH_LOG_PATH) else None
    if opened_ssh: opened_ssh.seek(0, os.SEEK_END)
    
    last_rekap = time.time()
    while True:
        load_target_domains()
        for path, f in opened_web.items():
            while True:
                line = f.readline()
                if not line: break
                if any(kw in line.lower() for kw in [".css", ".js", ".png", ".jpg", ".jpeg", ".ico", ".svg", "/api/", "/_stcore/"]): continue
                for domain in TARGET_DOMAINS:
                    if domain in line:
                        ip = CLIENT_IP_REGEX.search(line).group(1) if CLIENT_IP_REGEX.search(line) else "Unknown"
                        if ip == "127.0.0.1" or (ip.startswith("172.") and 17 <= int(ip.split(".")[1]) <= 100): continue
                        if domain not in web_data: web_data[domain] = {}
                        web_data[domain][ip] = web_data[domain].get(ip, 0) + 1
        if opened_ssh:
            while True:
                line = opened_ssh.readline()
                if not line: break
                m_acc, m_fail = SSH_ACCEPTED_REGEX.search(line), SSH_FAILED_REGEX.search(line)
                if m_acc:
                    ip = m_acc.group(3)
                    ssh_data['success_total'] += 1
                    if ip not in ssh_data['ips']: ssh_data['ips'][ip] = {'success': 0, 'failed': 0}
                    ssh_data['ips'][ip]['success'] += 1
                elif m_fail:
                    ip = m_fail.group(3)
                    ssh_data['failed_total'] += 1
                    if ip not in ssh_data['ips']: ssh_data['ips'][ip] = {'success': 0, 'failed': 0}
                    ssh_data['ips'][ip]['failed'] += 1

        if time.time() - last_rekap >= REKAP_INTERVAL:
            generate_and_send_report()
            last_rekap = time.time()
        time.sleep(1)

if __name__ == "__main__": main_monitor()
šŸ›”ļø

BAB 4: Manajemen Filter IP Jaringan Internal Docker

Docker secara standar menggunakan rentang subnet kelas B internal (172.16.0.0/12). Jika kita melakukan filter mentah berwujud .startswith("172."), maka IP publik eksternal (seperti range 172.217.x.x kepemilikan Google) dipastikan ikut terbuang.

Solusi Cerdas Bertingkat:

Aplikasi mengekstrak elemen string berbasis tanda titik, memisahkannya menjadi susunan array array, lalu melakukan komparasi integer murni terhadap nilai Oktet Kedua (parts[1]) spesifik pada ambang numerik 17 s/d 100.

Sampel Objek IP Ekstraksi Oktet 2 Kondisi Logika Status Aksi
172.18.0.5 18 17 <= 18 <= 100 DIABAIKAN / SKIP
172.217.16.4 217 Di luar rentang TETAP DICATAT
🐳

BAB 5: Deployment Menggunakan Portainer Stack

Salin konfigurasi Docker Compose YAML terstruktur di bawah ini ke dalam modul **Portainer -> Stacks -> Web Editor** untuk otomatisasi siklus hidup container agregator:

# Format: docker-compose.yml
version: '3.8'

services:
  rekap-monitor:
    image: python:3.11-alpine
    container_name: petroflexx-rekap-monitor
    restart: unless-stopped
    volumes:
      - /opt/data/docker/docker-hit-monitor/apprekap.py:/app/apprekap.py:ro
      - /opt/data/docker/docker-hit-monitor/domains.txt:/app/domains.txt:ro
      - /opt/data/docker/npm/data/logs:/var/log/nginx:ro
      - /var/log/secure:/var/log/auth_os.log:ro
    environment:
      - TZ=Asia/Jakarta
      - PYTHONUNBUFFERED=1
    working_dir: /app
    entrypoint: >
      sh -c "pip install --no-cache-dir requests && python apprekap.py"

Manajemen Kontrol File Target (domains.txt)

Untuk menambah atau membuang domain pengawasan di kemudian hari, Anda cukup melakukan mutasi dokumen teks lokal via SSH tanpa perlu menyentuh restrukturisasi container Docker:

# isi dari /opt/data/docker/docker-hit-monitor/domains.txt
ompetroflexx.id
pmst.petroflexx.id
bi.petroflexx.id