Parte 2: Análisis de Evidencias y Detección de Anomalías
Introducción
Tras la recolección inicial de evidencias detallada en la
primera parte, esta segunda entrega se centra en el análisis profundo de los
datos recopilados en un entorno OT comprometido. El análisis forense en
sistemas industriales requiere un enfoque especializado que combine el
conocimiento de protocolos industriales, comportamiento de procesos y técnicas
forenses avanzadas.
1. Análisis de Tráfico de Red Industrial
1.1 Análisis de Protocolos Industriales
1.1.1 Modbus TCP
# Script de análisis de tráfico Modbus
from scapy.all import *
def analyze_modbus_traffic(pcap_file):
packets =
rdpcap(pcap_file)
write_operations =
{}
for pkt in
packets:
if TCP in pkt
and pkt[TCP].dport == 502:
try:
#
Analizar función code 16 (Write Multiple Registers)
if
pkt[Raw].load[7] == 16:
src_ip = pkt[IP].src
register = int.from_bytes(pkt[Raw].load[8:10], 'big')
if
src_ip not in write_operations:
write_operations[src_ip] = []
write_operations[src_ip].append({
'timestamp': pkt.time,
'register': register,
'data': pkt[Raw].load[13:]
})
except:
continue
return
write_operations
1.1.2 S7comm
# Análisis de comunicaciones S7
def analyze_s7comm(pcap_file):
commands = {
'0x29':
'START',
'0x28':
'STOP',
'0x1A':
'PI_SERVICE',
'0x1B':
'PLC_STOP',
'0x1C':
'PLC_HOT_START',
'0x1D':
'PLC_COLD_START',
'0x1E':
'COPY_RAM_TO_ROM',
'0x1F':
'COMPRESS',
'0x28':
'INSERT',
'0x29':
'DELETE'
}
#
Implementación del análisis
# ...
1.2 Detección de Anomalías en el Tráfico
- Patrones
Anormales
def detect_anomalies(traffic_data):
anomalies = []
# Verificar
frecuencia inusual de escrituras
write_frequency =
calculate_write_frequency(traffic_data)
baseline = get_baseline_frequency()
if write_frequency
> baseline * 1.5:
anomalies.append({
'type':
'high_write_frequency',
'details':
f'Write frequency {write_frequency} exceeds baseline {baseline}'
})
# Verificar
rangos de valores
for operation in
traffic_data:
if not
is_value_in_range(operation['value']):
anomalies.append({
'type': 'out_of_range_value',
'details': f'Value {operation["value"]} outside normal range'
})
return anomalies
2. Análisis de Memoria de Sistemas Críticos
2.1 Análisis de Memoria Volátil
2.1.1 Procesos y DLLs Sospechosos
# Análisis con Volatility
def analyze_processes(memory_dump):
suspicious_processes
= []
# Listar
procesos
processes =
run_volatility_command(memory_dump, "pslist")
# Buscar
anomalías en procesos
for process in
processes:
if
is_suspicious_path(process.path):
suspicious_processes.append(process)
# Verificar
DLLs cargadas
dlls =
get_process_dlls(process.pid)
for dll in
dlls:
if
is_suspicious_dll(dll):
suspicious_processes.append({
'process': process,
'suspicious_dll': dll
})
return
suspicious_processes
2.1.2 Análisis de Conexiones de Red
def analyze_network_connections(memory_dump):
suspicious_connections = []
# Obtener
conexiones activas
connections =
run_volatility_command(memory_dump, "netscan")
# Analizar cada
conexión
for conn in
connections:
if
is_suspicious_ip(conn.remote_ip):
suspicious_connections.append(conn)
if
is_unusual_port(conn.local_port, conn.process_name):
suspicious_connections.append(conn)
return
suspicious_connections
3. Análisis de Sistemas de Control
3.1 Análisis de Programas PLC
3.1.1 Detección de Modificaciones en Lógica
def analyze_plc_program(current_program, baseline_program):
differences = {
'new_blocks':
[],
'modified_blocks': [],
'deleted_blocks': []
}
# Comparar
bloques de programa
current_blocks =
extract_program_blocks(current_program)
baseline_blocks =
extract_program_blocks(baseline_program)
# Detectar
nuevos bloques
for block in
current_blocks:
if block not
in baseline_blocks:
differences['new_blocks'].append(block)
# Detectar
bloques modificados
for block in
current_blocks:
if block in
baseline_blocks:
if
calculate_block_hash(current_blocks[block]) !=
calculate_block_hash(baseline_blocks[block]):
differences['modified_blocks'].append(block)
# Detectar
bloques eliminados
for block in
baseline_blocks:
if block not
in current_blocks:
differences['deleted_blocks'].append(block)
return differences
3.1.2 Análisis de Variables Críticas
def analyze_critical_variables(program_data):
critical_changes =
[]
# Definir
variables críticas
critical_vars = [
'EMERGENCY_STOP',
'SAFETY_INTERLOCK',
'PRESSURE_LIMIT',
'TEMPERATURE_MAX',
'FLOW_SETPOINT'
]
# Analizar
cambios en variables
for var in
critical_vars:
current_value
= get_variable_value(program_data, var)
baseline_value
= get_baseline_value(var)
if current_value
!= baseline_value:
critical_changes.append({
'variable': var,
'original': baseline_value,
'modified': current_value,
'timestamp': get_last_modification(var)
})
return
critical_changes
4. Análisis de Sistemas SCADA
4.1 Análisis de Bases de Datos Historiador
-- Consulta para detectar cambios bruscos en variables
SELECT
timestamp,
tag_name,
value,
LAG(value) OVER
(PARTITION BY tag_name ORDER BY timestamp) as prev_value,
((value -
LAG(value) OVER (PARTITION BY tag_name ORDER BY timestamp)) /
LAG(value) OVER
(PARTITION BY tag_name ORDER BY timestamp)) * 100 as change_percentage
FROM historian_data
WHERE timestamp BETWEEN @incident_start AND @incident_end
AND ABS(((value -
LAG(value) OVER (PARTITION BY tag_name ORDER BY timestamp)) /
LAG(value) OVER (PARTITION BY tag_name ORDER BY timestamp)) * 100) >
20
ORDER BY timestamp;
4.2 Análisis de Logs de Eventos
def analyze_scada_logs(log_file):
suspicious_events
= []
# Patrones de
eventos sospechosos
patterns = [
r"Login
failed.*",
r"Configuration changed.*",
r"Alarm
acknowledged.*",
r"Set
point modified.*",
r"Program
download.*"
]
with
open(log_file, 'r') as f:
for line in f:
for
pattern in patterns:
if
re.match(pattern, line):
event = parse_event(line)
if
is_suspicious_timing(event['timestamp']) or \
is_unauthorized_user(event['user']) or \
is_unusual_action(event['action']):
suspicious_events.append(event)
return
suspicious_events
5. Correlación de Eventos
5.1 Timeline de Actividades Sospechosas
def create_incident_timeline(evidence_sources):
timeline = []
# Recopilar
eventos de múltiples fuentes
for source in evidence_sources:
if
source['type'] == 'network_capture':
timeline.extend(analyze_network_events(source['data']))
elif
source['type'] == 'memory_dump':
timeline.extend(analyze_memory_events(source['data']))
elif
source['type'] == 'plc_program':
timeline.extend(analyze_plc_events(source['data']))
elif
source['type'] == 'scada_logs':
timeline.extend(analyze_scada_events(source['data']))
# Ordenar
eventos cronológicamente
timeline.sort(key=lambda x: x['timestamp'])
# Identificar
cadenas de eventos relacionados
correlated_events
= correlate_events(timeline)
return
correlated_events
5.2 Análisis de Causa Raíz
def perform_root_cause_analysis(timeline_data):
root_causes = []
# Identificar
eventos iniciales
initial_events =
find_initial_events(timeline_data)
for event in
initial_events:
# Analizar
cadena de eventos
event_chain = trace_event_chain(event,
timeline_data)
#
Identificar condiciones que permitieron el evento
enabling_conditions = identify_enabling_conditions(event)
# Evaluar
impacto
impact =
assess_impact(event_chain)
root_causes.append({
'initial_event': event,
'event_chain': event_chain,
'enabling_conditions': enabling_conditions,
'impact':
impact
})
return root_causes
6. Documentación de Hallazgos
6.1 Formato de Reporte Técnico
# Reporte de Análisis Forense
## Información General
- Caso ID: [ID]
- Fecha de Análisis: [FECHA]
- Analista: [NOMBRE]
## Resumen Ejecutivo
[Breve descripción de los hallazgos principales]
## Evidencias Analizadas
1. Capturas de Red
- Archivo: [NOMBRE]
- Hash: [HASH]
- Hallazgos
principales:
* [Hallazgo 1]
* [Hallazgo 2]
2. Volcados de Memoria
[Detalles...]
## Timeline de Eventos
[Cronología detallada]
## Análisis Técnico Detallado
[Detalles técnicos y evidencias]
## Conclusiones
[Conclusiones del análisis]
## Recomendaciones
[Recomendaciones de seguridad]
Conclusión
El análisis detallado de las evidencias en un entorno OT
requiere una combinación de conocimientos especializados en sistemas
industriales y técnicas forenses avanzadas. La correlación de eventos entre
diferentes fuentes de evidencia es crucial para establecer una imagen completa
del incidente.
En la tercera y última parte de esta serie, abordaremos la
fase de reporting, recomendaciones de remediación y medidas preventivas para
evitar futuros incidentes.
Continuará en la Parte 3: Reporting, Remediación y
Prevención
No hay comentarios:
Publicar un comentario