Graylog stream getting events, but is empty

Tags: elk, graylog, logging

I've started sending Palo Alto logs to Graylog, and a stream rule picks them out by matching "Palo Alto" in a "tags" field. All my stream rules work this way: a front-end Logstash instance does the tagging before shipping to Graylog.

I know the Graylog nodes are receiving these events on the network interfaces:
[Image: tcpdump for events]

And the stream shows that it is getting events (note the "22 messages/second"):

[Image: stream currently receiving 22 messages per second]

Yet when I click into the stream (or search for tags:"Palo Alto"), there are no events to be found.

[Image: empty stream]

The only common issue I've seen online has to do with timezone settings putting these events into the future, but the time on our Palo Alto Panorama sender is correct (PST), and an absolute time search a day into the future reveals nothing.

Version info:

Graylog 2.2.2+691b4b7, codename Stiegl

Elasticsearch 2.4.4

Lucene 5.5.2

I also have another unanswered question about the search feature failing to find events that are actually arriving okay. I doubt it is related, but I mention it here for the sake of completeness.

Best Answer

In the Graylog server nodes' /var/log/graylog-server/server.log log file, I noticed lots of errors like:

[54]: index [graylog_2], type [message], id [edb8ec50-1320-11e7-92de-005056b541f6], message [MapperParsingException[failed to parse [ReceiveTime]]; nested: IllegalArgumentException[Invalid format: "2017/03/27 12:09:40" is malformed at "/03/27 12:09:40"];]

So the messages were reaching Graylog fine, but Elasticsearch could not index them because the ReceiveTime value did not parse as a date. I ended up dropping and mutating the problem fields in Logstash until the messages indexed cleanly:

if "Palo Alto" in [tags] {
    grok {
        match => ["message", "<\d*>(?<patimestamp>\w* \d* \d*:\d*:\d*) (?<PanoramaHost>[^ ]*) (?<FutureUse0>[^,]*),(?<ReceiveTime>[^,]*),(?<SerialNumber>[^,]*),(?<PAType>[^,]*),%{GREEDYDATA:pamessage}"]
    }
    if [PAType] == "SYSTEM" {
        csv {source => "[pamessage]" columns => ["Subtype","FutureUse1","GeneratedTime","vsys","paEventID","Object","FutureUse2","FutureUse3","Module","Severity","Description","SeqNum","ActionFlags"]}
        mutate {remove_field => ["ReceiveTime", "GeneratedTime"] gsub => ["message", "/", "_"]}
    } else if [PAType] == "TRAFFIC" {
        csv {source => "[pamessage]" columns => ["Threat-ContentType","ConfigVersion","GenerateTime","SrcAddress","DstAddress","NATSrcIP","NATDstIP","Rule","SrcUser","DstUser","App","VSys","SrcZone","DstZone","InboundInterface","OutboundInterface","LogAction","TimeLogged","SessionID","RepeatCount","SrcPort","DstPort","NATSrcPort","NATDstPort","Flags","Protocol","Action","Bytes","BytesSent","BytesReceived","Packets","StartTime","ElapsedTimeInSec","Category","Padding","SeqNum","ActionFlags","SrcCountry","DstCountry","cpadding","pkts_sent","pkts_received"]}
        # The TRAFFIC columns above name this field "GenerateTime", so that is the name to remove here.
        mutate {remove_field => ["ReceiveTime", "GenerateTime"] gsub => ["message", "/", "_"]}
    } else if [PAType] == "THREAT" {
        csv {source => "[pamessage]" columns => ["Subtype","FutureUse1","GeneratedTime","SrcIP","DstIP","NATSrcIP","NATDstIP","Rule","SrcUser","DstUser","App","vsys","SrcZone","DstZone","IngressInterface","EgressInterface","LogFwdProfile","FutureUse2","SessionID","RepeatCount","SrcPort","DstPort","NATSrcPort","NATDstPort","Flags","Protocol","Action","Misc","ThreatID","Category","Severity","Direction","SeqNum","ActionFlags","SrcLocation","DstLocation","FutureUse3","ContentType","pcapID","Filedigest","Cloud","FutureUse4","UserAgent","FileType","XForwardedFor","Referer","Sender","Subject","Recipient","ReportID"]}
        mutate {remove_field => ["ReceiveTime", "GeneratedTime"] gsub => ["message", "/", "_"]}
    } else if [PAType] == "CONFIG" {
        csv {source => "[pamessage]" columns => ["Subtype","FutureUse1","GeneratedTime","Host","vsys","Command","Admin","Client","Result","ConfigPath","SeqNum","ActionFlags","BeforeChangeDetail","AfterChangeDetail"]}
        mutate {remove_field => ["ReceiveTime", "GeneratedTime"] gsub => ["message", "/", "_"]}
    } else if [PAType] == "HIP-MATCH" {
        csv {source => "[pamessage]" columns => ["Subtype","FutureUse1","GeneratedTime","SrcUser","vsys","MachineName","OS","SrcAddress","HIPType","FutureUse2","FutureUse3","SeqNum","ActionFlags"]}
        mutate {remove_field => ["ReceiveTime", "GeneratedTime"] gsub => ["message", "/", "_"]}
    } else {
        mutate {add_tag => "Uncategorized"}
    }
}
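
If the timestamps are worth keeping, one possible alternative to removing them is to reparse them with Logstash's date filter so they are emitted in ISO 8601 form. The fragment below is only a sketch and has not been tested against this setup. It assumes the format shown in the error message above ("2017/03/27 12:09:40") and that the sender's clock is US Pacific time; the "America/Los_Angeles" zone name is an assumption based on the "PST" mentioned in the question.

```
# Sketch only: parse ReceiveTime instead of removing it.
# Assumed format: "yyyy/MM/dd HH:mm:ss", as seen in the server.log error.
# Assumed timezone: US Pacific; change it to match the sender.
if "Palo Alto" in [tags] and [ReceiveTime] {
    date {
        match    => ["ReceiveTime", "yyyy/MM/dd HH:mm:ss"]
        timezone => "America/Los_Angeles"
        target   => "ReceiveTime"   # overwrite the field with the parsed timestamp
    }
}
```

This would need to run after the grok filter that creates ReceiveTime, and ReceiveTime would have to be taken out of the remove_field lists. If a value does not match the pattern, the date filter leaves the field unchanged and tags the event with _dateparsefailure by default, so such events could still be rejected by Elasticsearch. Whether the reparsed value is accepted also depends on how the field is already mapped in the existing index.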