Logstash – splitting an event into two based on objects in an included array

kibanalogstash

I'm working with logstash for the first time, and I'm trying to take JSON reports from amavisd-new in for searching and analysis. Amavisd-new is able to write the json logging to redis, and I have everything importing perfectly, and have started learning my way through all this.

But I have one issue – the format of the JSON report from amavis looks like the following – note that "recipients" has an array, with one entry for each recipient.

I'd like to split the entire event into two – one for each recipient, leaving all the other fields the same, but replacing the "action", "ccat_main", "queued_as", etc fields from each recipient array member into the main even.

The idea would be that one incoming event with two recipients would result in two separate log events in logstash – one for each person.

I've looked at split for events, but I'm not seeing how to do this – I can't seem to find any appropriate examples anywhere.

So, for real-word examples, given this:

 {
    "@timestamp" => "2014-05-06T09:29:47.048Z",
    "time_unix" => 1399368587.048,
    "time_iso_week_date" => "2014-W19-2",
    "partition" => "19",
    "type" => "amavis",
    "host" => "mailer.example.net",
    "queued_as" => ["3gNFyR4Mfjzc3", "3gNFyR4n6Lzc4"],
    "recipients" => [
      { "action" => "PASS",
        "ccat_main" => "Clean",
        "queued_as" => "3gNFyR4Mfjzc3",
        "rcpt_is_local" => false,
        "rcpt_to" => "recip2@example.org",
        "smtp_code" => "250",
        "smtp_response" => "250 2.0.0 from MTA(smtp:[::1]:10013): 250 2.0.0 Ok: queued as 3gNFyR4Mfjzc3",
        "spam_score" => -2.0
      },
      { "action" => "PASS",
        "ccat_main" => "Clean",
        "mail_id_related" => "men7HTERZaOF",
        "penpals_age" => 1114599,
        "queued_as" => "3gNFyR4n6Lzc4",
        "rcpt_is_local" => true,
        "rcpt_to" => "recip1@example.net",
        "smtp_code" => "250",
        "smtp_response" => "250 2.0.0 from MTA(smtp:[::1]:10013): 250 2.0.0 Ok: queued as 3gNFyR4n6Lzc4",
        "spam_score" => -5.272
      }
    ],
    "smtp_code"  => ["250"],
  }

I'd like to end up with two different events, like these:

  {
    "@timestamp" => "2014-05-06T09:29:47.048Z",
    "time_unix" => 1399368587.048,
    "time_iso_week_date" => "2014-W19-2",
    "partition" => "19",
    "type" => "amavis",
    "host" => "mailer.example.net",
    "queued_as" => ["3gNFyR4Mfjzc3", "3gNFyR4n6Lzc4"],
    "action" => "PASS",
    "ccat_main" => "Clean",
    "queued_as" => "3gNFyR4Mfjzc3",
    "rcpt_is_local" => false,
    "rcpt_to" => "recip2@example.org",
    "smtp_code" => "250",
    "smtp_response" => "250 2.0.0 from MTA(smtp:[::1]:10013): 250 2.0.0 Ok: queued as 3gNFyR4Mfjzc3",
    "spam_score" => -2.0
    "smtp_code"  => ["250"],
  }

and

  {
    "@timestamp" => "2014-05-06T09:29:47.048Z",
    "time_unix" => 1399368587.048,
    "time_iso_week_date" => "2014-W19-2",
    "partition" => "19",
    "type" => "amavis",
    "host" => "mailer.example.net",
    "queued_as" => ["3gNFyR4Mfjzc3", "3gNFyR4n6Lzc4"],
    "recipients" => [
    "action" => "PASS",
    "ccat_main" => "Clean",
    "mail_id_related" => "men7HTERZaOF",
    "penpals_age" => 1114599,
    "queued_as" => "3gNFyR4n6Lzc4",
    "rcpt_is_local" => true,
    "rcpt_to" => "recip1@example.net",
    "smtp_code" => "250",
    "smtp_response" => "250 2.0.0 from MTA(smtp:[::1]:10013): 250 2.0.0 Ok: queued as 3gNFyR4n6Lzc4",
    "spam_score" => -5.272
    "smtp_code"  => ["250"],
  }

How can I do this?

Thanks-

Tom

Related Topic