martes, 17 de diciembre de 2024

Análisis Forense en Entornos OT Tras un Ciberataque(II)

Parte 2: Análisis de Evidencias y Detección de Anomalías

Introducción

Tras la recolección inicial de evidencias detallada en la primera parte, esta segunda entrega se centra en el análisis profundo de los datos recopilados en un entorno OT comprometido. El análisis forense en sistemas industriales requiere un enfoque especializado que combine el conocimiento de protocolos industriales, comportamiento de procesos y técnicas forenses avanzadas.

1. Análisis de Tráfico de Red Industrial

1.1 Análisis de Protocolos Industriales

1.1.1 Modbus TCP


# Script de análisis de tráfico Modbus

from scapy.all import *

 

def analyze_modbus_traffic(pcap_file):

    packets = rdpcap(pcap_file)

    write_operations = {}

   

    for pkt in packets:

        if TCP in pkt and pkt[TCP].dport == 502:

            try:

                # Analizar función code 16 (Write Multiple Registers)

                if pkt[Raw].load[7] == 16:

                    src_ip = pkt[IP].src

                    register = int.from_bytes(pkt[Raw].load[8:10], 'big')

                    if src_ip not in write_operations:

                        write_operations[src_ip] = []

                    write_operations[src_ip].append({

                        'timestamp': pkt.time,

                        'register': register,

                        'data': pkt[Raw].load[13:]

                    })

            except:

                continue

   

    return write_operations

1.1.2 S7comm

# Análisis de comunicaciones S7

def analyze_s7comm(pcap_file):

    commands = {

        '0x29': 'START',

        '0x28': 'STOP',

        '0x1A': 'PI_SERVICE',

        '0x1B': 'PLC_STOP',

        '0x1C': 'PLC_HOT_START',

        '0x1D': 'PLC_COLD_START',

        '0x1E': 'COPY_RAM_TO_ROM',

        '0x1F': 'COMPRESS',

        '0x28': 'INSERT',

        '0x29': 'DELETE'

    }

   

    # Implementación del análisis

    # ...

1.2 Detección de Anomalías en el Tráfico

  1. Patrones Anormales


def detect_anomalies(traffic_data):

    anomalies = []

   

    # Verificar frecuencia inusual de escrituras

    write_frequency = calculate_write_frequency(traffic_data)

    baseline = get_baseline_frequency()

   

    if write_frequency > baseline * 1.5:

        anomalies.append({

            'type': 'high_write_frequency',

            'details': f'Write frequency {write_frequency} exceeds baseline {baseline}'

        })

   

    # Verificar rangos de valores

    for operation in traffic_data:

        if not is_value_in_range(operation['value']):

            anomalies.append({

                'type': 'out_of_range_value',

                'details': f'Value {operation["value"]} outside normal range'

            })

   

    return anomalies

2. Análisis de Memoria de Sistemas Críticos

2.1 Análisis de Memoria Volátil

2.1.1 Procesos y DLLs Sospechosos


# Análisis con Volatility

def analyze_processes(memory_dump):

    suspicious_processes = []

   

    # Listar procesos

    processes = run_volatility_command(memory_dump, "pslist")

   

    # Buscar anomalías en procesos

    for process in processes:

        if is_suspicious_path(process.path):

            suspicious_processes.append(process)

       

        # Verificar DLLs cargadas

        dlls = get_process_dlls(process.pid)

        for dll in dlls:

            if is_suspicious_dll(dll):

                suspicious_processes.append({

                    'process': process,

                    'suspicious_dll': dll

                })

   

    return suspicious_processes

2.1.2 Análisis de Conexiones de Red


def analyze_network_connections(memory_dump):

    suspicious_connections = []

   

    # Obtener conexiones activas

    connections = run_volatility_command(memory_dump, "netscan")

   

    # Analizar cada conexión

    for conn in connections:

        if is_suspicious_ip(conn.remote_ip):

            suspicious_connections.append(conn)

       

        if is_unusual_port(conn.local_port, conn.process_name):

            suspicious_connections.append(conn)

   

    return suspicious_connections

3. Análisis de Sistemas de Control

3.1 Análisis de Programas PLC

3.1.1 Detección de Modificaciones en Lógica


def analyze_plc_program(current_program, baseline_program):

    differences = {

        'new_blocks': [],

        'modified_blocks': [],

        'deleted_blocks': []

    }

   

    # Comparar bloques de programa

    current_blocks = extract_program_blocks(current_program)

    baseline_blocks = extract_program_blocks(baseline_program)

   

    # Detectar nuevos bloques

    for block in current_blocks:

        if block not in baseline_blocks:

            differences['new_blocks'].append(block)

   

    # Detectar bloques modificados

    for block in current_blocks:

        if block in baseline_blocks:

            if calculate_block_hash(current_blocks[block]) != calculate_block_hash(baseline_blocks[block]):

                differences['modified_blocks'].append(block)

   

    # Detectar bloques eliminados

    for block in baseline_blocks:

        if block not in current_blocks:

            differences['deleted_blocks'].append(block)

   

    return differences

3.1.2 Análisis de Variables Críticas


def analyze_critical_variables(program_data):

    critical_changes = []

   

    # Definir variables críticas

    critical_vars = [

        'EMERGENCY_STOP',

        'SAFETY_INTERLOCK',

        'PRESSURE_LIMIT',

        'TEMPERATURE_MAX',

        'FLOW_SETPOINT'

    ]

   

    # Analizar cambios en variables

    for var in critical_vars:

        current_value = get_variable_value(program_data, var)

        baseline_value = get_baseline_value(var)

       

        if current_value != baseline_value:

            critical_changes.append({

                'variable': var,

                'original': baseline_value,

                'modified': current_value,

                'timestamp': get_last_modification(var)

            })

   

    return critical_changes

4. Análisis de Sistemas SCADA

4.1 Análisis de Bases de Datos Historiador


-- Consulta para detectar cambios bruscos en variables

SELECT

    timestamp,

    tag_name,

    value,

    LAG(value) OVER (PARTITION BY tag_name ORDER BY timestamp) as prev_value,

    ((value - LAG(value) OVER (PARTITION BY tag_name ORDER BY timestamp)) /

     LAG(value) OVER (PARTITION BY tag_name ORDER BY timestamp)) * 100 as change_percentage

FROM historian_data

WHERE timestamp BETWEEN @incident_start AND @incident_end

    AND ABS(((value - LAG(value) OVER (PARTITION BY tag_name ORDER BY timestamp)) /

             LAG(value) OVER (PARTITION BY tag_name ORDER BY timestamp)) * 100) > 20

ORDER BY timestamp;

4.2 Análisis de Logs de Eventos


def analyze_scada_logs(log_file):

    suspicious_events = []

   

    # Patrones de eventos sospechosos

    patterns = [

        r"Login failed.*",

        r"Configuration changed.*",

        r"Alarm acknowledged.*",

        r"Set point modified.*",

        r"Program download.*"

    ]

   

    with open(log_file, 'r') as f:

        for line in f:

            for pattern in patterns:

                if re.match(pattern, line):

                    event = parse_event(line)

                    if is_suspicious_timing(event['timestamp']) or \

                       is_unauthorized_user(event['user']) or \

                       is_unusual_action(event['action']):

                        suspicious_events.append(event)

   

    return suspicious_events

5. Correlación de Eventos

5.1 Timeline de Actividades Sospechosas


def create_incident_timeline(evidence_sources):

    timeline = []

   

    # Recopilar eventos de múltiples fuentes

    for source in evidence_sources:

        if source['type'] == 'network_capture':

            timeline.extend(analyze_network_events(source['data']))

        elif source['type'] == 'memory_dump':

            timeline.extend(analyze_memory_events(source['data']))

        elif source['type'] == 'plc_program':

            timeline.extend(analyze_plc_events(source['data']))

        elif source['type'] == 'scada_logs':

            timeline.extend(analyze_scada_events(source['data']))

   

    # Ordenar eventos cronológicamente

    timeline.sort(key=lambda x: x['timestamp'])

   

    # Identificar cadenas de eventos relacionados

    correlated_events = correlate_events(timeline)

   

    return correlated_events

5.2 Análisis de Causa Raíz


def perform_root_cause_analysis(timeline_data):

    root_causes = []

   

    # Identificar eventos iniciales

    initial_events = find_initial_events(timeline_data)

   

    for event in initial_events:

        # Analizar cadena de eventos

        event_chain = trace_event_chain(event, timeline_data)

       

        # Identificar condiciones que permitieron el evento

        enabling_conditions = identify_enabling_conditions(event)

       

        # Evaluar impacto

        impact = assess_impact(event_chain)

       

        root_causes.append({

            'initial_event': event,

            'event_chain': event_chain,

            'enabling_conditions': enabling_conditions,

            'impact': impact

        })

   

    return root_causes

6. Documentación de Hallazgos

6.1 Formato de Reporte Técnico


# Reporte de Análisis Forense

## Información General

- Caso ID: [ID]

- Fecha de Análisis: [FECHA]

- Analista: [NOMBRE]

 

## Resumen Ejecutivo

[Breve descripción de los hallazgos principales]

 

## Evidencias Analizadas

1. Capturas de Red

   - Archivo: [NOMBRE]

   - Hash: [HASH]

   - Hallazgos principales:

     * [Hallazgo 1]

     * [Hallazgo 2]

 

2. Volcados de Memoria

   [Detalles...]

 

## Timeline de Eventos

[Cronología detallada]

 

## Análisis Técnico Detallado

[Detalles técnicos y evidencias]

 

## Conclusiones

[Conclusiones del análisis]

 

## Recomendaciones

[Recomendaciones de seguridad]

Conclusión

El análisis detallado de las evidencias en un entorno OT requiere una combinación de conocimientos especializados en sistemas industriales y técnicas forenses avanzadas. La correlación de eventos entre diferentes fuentes de evidencia es crucial para establecer una imagen completa del incidente.

En la tercera y última parte de esta serie, abordaremos la fase de reporting, recomendaciones de remediación y medidas preventivas para evitar futuros incidentes.

Continuará en la Parte 3: Reporting, Remediación y Prevención

No hay comentarios:

Publicar un comentario

¿𝗘𝘀𝘁á 𝘁𝘂 𝗼𝗿𝗴𝗮𝗻𝗶𝘇𝗮𝗰𝗶ó𝗻 𝗽𝗿𝗲𝗽𝗮𝗿𝗮𝗱𝗮 𝗽𝗮𝗿𝗮 𝗮𝗱𝗼𝗽𝘁𝗮𝗿 𝗜𝗔 𝗲𝗻 𝗰𝗶𝗯𝗲𝗿𝘀𝗲𝗴𝘂𝗿𝗶𝗱𝗮𝗱? (PARTE 3)

  ¿Está tu organización preparada para adoptar IA en ciberseguridad? La guía esencial para CISOs y PYMES La Inteligencia Artificial (IA) ya...