How to feed Elasticsearch the Snort alert log

Tags: elasticsearch, logstash

I started off yesterday with an ELK howto and got ELK up and running rather easily. Next thing I wanted to do was plug my Snort alert log into it. I configured Logstash (shown below) with a filter and an absolutely nasty Grok regex to split up all the fields using grokdebug to test it. Then I turned on snort and the alert log started filling up followed by a logstash restart (after doing --configtest of course). I installed the ES "Head" plugin so I could poke around a bit. It seems that my snort alerts are being mapped with my syslog mapping as was created in the howto (pic below). In ES, I can't seem to search using any of the fields defined in the logstash config (ids_proto, src_ip, dst_ip). Why is this? Do I need to define a mapping or is something else messed up here?

enter image description here

input 
{
  file {
    path => "/var/log/snort/alert"
    type => "snort_tcp"  # a type to identify those logs (will need this later)
    start_position => "beginning"  # must be a quoted string; a bareword here is not valid Logstash config syntax
    ignore_older => 0      # Setting ignore_older to 0 disables file age checking so that the tutorial file is processed even though it’s older than a day. 
    sincedb_path => "/dev/null"  # never persist the read position, so the whole file is re-read on every restart (fine for testing)
  }
}


filter {
  if [type] == "snort_tcp" {
    grok {
      add_tag => [ "IDS" ]
      # NOTE: the original pattern captured BOTH the signature description and the
      # protocol as "ids_proto", so the two values clobbered each other (or became
      # an array). The first capture is the rule description (e.g. "ET SCAN ...");
      # it is renamed to ids_description so the GPL/ET prefix checks below test the
      # correct field.
      match => [ "message", "%{MONTHNUM:month}\/%{MONTHDAY:day}-%{HOUR:hour}:%{MINUTE:minute}:%{SECOND:second}\s+\[\*\*\]\s+\[%{INT:ids_gid}\:%{INT:ids_sid}\:%{INT:ids_rev}\]\s+%{DATA:ids_description}\s+\[\*\*\]\s+\[Classification:\s+%{DATA:ids_classification}\]\s+\[Priority:\s+%{INT:priority}\]\s+\{%{WORD:ids_proto}\}\s+%{IP:src_ip}\:%{INT:src_port}\s+\-\>\s+%{IP:dst_ip}\:%{INT:dst_port}"]
    }
  }
  # GeoIP enrichment of source and destination addresses.
  # NOTE(review): these run for ALL event types, not just snort_tcp — presumably
  # intentional, but confirm if other inputs share this pipeline.
  geoip {
    source => "[src_ip]"
    target => "SrcGeo"
  }
  geoip {
    source => "[dst_ip]"
    target => "DstGeo"
  }
  # Map the Snort priority (1 = most severe) to a human-readable severity.
  # Grok captures are strings, hence the quoted comparisons.
  if [priority] == "1" {
    mutate {
      add_field => { "severity" => "High" }
    }
  }
  if [priority] == "2" {
    mutate {
      add_field => { "severity" => "Medium" }
    }
  }
  if [priority] == "3" {
    mutate {
      add_field => { "severity" => "Low" }
    }
  }
  # Classify the originating rule set from the signature-name prefix
  # ("GPL ..." / "ET ..." are Emerging Threats rules; everything else is Snort).
  if [ids_description] {
    if [ids_description] =~ /^GPL/ {
      mutate {
        add_tag => [ "Snort-ET-sig" ]
        add_field => [ "ids_rule_type", "Emerging Threats" ]
      }
    }
    if [ids_description] =~ /^ET/ {
      mutate {
        add_tag => [ "Snort-ET-sig" ]
        add_field => [ "ids_rule_type", "Emerging Threats" ]
      }
    }
    if "Snort-ET-sig" not in [tags] {
      mutate {
        add_tag => [ "Snort-sig" ]
        add_field => [ "ids_rule_type", "Snort" ]
      }
    }
  }
  # Build a reference URL for the signature (gid 1 rules omit the gid prefix).
  if "Snort-sig" in [tags] {
    if [ids_gid] == "1" {
      mutate {
        # fixed typo: was "http://rootedyour/.com/..."
        add_field => [ "Signature_Info", "http://rootedyour.com/snortsid?sid=%{ids_sid}" ]
      }
    }
    if [ids_gid] != "1" {
      mutate {
        add_field => [ "Signature_Info", "http://rootedyour.com/snortsid?sid=%{ids_gid}-%{ids_sid}" ]
      }
    }
  }
  if "Snort-ET-sig" in [tags] {
    mutate {
      add_field => [ "Signature_Info", "http://doc.emergingthreats.net/bin/view/Main/%{ids_sid}" ]
    }
  }
}



output 
{
  # Ship events to the local Elasticsearch node.
  elasticsearch 
   {
    hosts => ["localhost:9200"]
    # manage_template => false means Logstash installs no index template, so the
    # custom "snort_tcp-*" index falls back to Elasticsearch's dynamic mapping —
    # the likely reason the grokked fields do not search as expected.
    manage_template => false
    index => "snort_tcp-%{+YYYY.MM.dd}"     
  }
}

Best Answer

A couple things here:

  • The default template Logstash ships maps string fields as analyzed and also adds a not-analyzed `.raw` sub-field for each, which tends to be more friendly to downstream viewing tools.
  • Not setting a mapping at all, like you're doing, uses the default ElasticSearch dynamic mapping, which isn't a good fit for Logstash.

For testing things out, I recommend the following output section:

output 
{
 elasticsearch 
  {
    hosts => ["localhost:9200"]
    # Let Logstash install its stock index template...
    manage_template => true
    # ...and write to the "logstash-*" indices that the stock template matches,
    # so the grokked fields get sensible mappings.
    index => "logstash-%{+YYYY.MM.dd}"     
  }
}

When set this way, the logstash indices will get the default logstash mappings, which may behave closer to what you expect. If that turns out to be the behavior you want, you'll probably have to define a mapping file of your own in order to keep the custom snort_tcp index name:

output 
{
 elasticsearch 
  {
    hosts => ["localhost:9200"]
    manage_template => true
    # Custom index name, so the stock "logstash-*" template won't match it;
    # supply an explicit template (mapping) file instead.
    index => "snort_tcp-%{+YYYY.MM.dd}"
    template => "/etc/logstash/template.json"
    # Name under which the template is registered in Elasticsearch.
    template_name => "snort_tcp"
  }
}
Related Topic