# intake.conf
#
# Receives events from the pipeline-to-pipeline address "mtqa_mobility_logs",
# parses OCS and AAA log lines with grok, and forwards selected "notification"
# copies of those events to the Kafka topic "notification_mtqa".
#
# NOTE: `#` starts a comment that runs to the end of the physical line, so
# every directive must stay on its own line. Collapsing this file onto a
# single line turns the whole pipeline into one comment.

input {
  pipeline {
    address => "mtqa_mobility_logs"
  }
}

filter {
  if "mtqa_ocs" in [tags] {
    # Emit a copy of the event. The "notification-ocs" tag is what the
    # conditional below uses to tell the copy apart from the original.
    # NOTE(review): this relies on the clone filter applying add_tag to the
    # generated copy only - confirm against the installed plugin version.
    clone {
      clones => ["notification-ocs"]
      add_tag => [ "notification-ocs" ]
    }

    # Split the line into timestamp, log-level, issuer, pool and log-message.
    grok {
      match => {
        "message" => "%{TIMESTAMP_ISO8601:timestamp}%{SPACE}%{LOGLEVEL:log-level}%{SPACE}%{DATA:issuer}%{SPACE}\(%{DATA:pool}\)%{SPACE}%{GREEDYDATA:log-message}"
      }
    }

    if "notification-ocs" in [tags] {
      # Debug aid, intentionally disabled:
      # ruby {
      #   code => 'puts "Input rule matched: contains ocs-notification-v1"'
      # }
      if [message] =~ /\[OCS-NOTIFICATION\]/ {
        # Keep only lines containing the "[OCS-NOTIFICATION]" marker, and tag
        # for the Kafka output those that also mention one of the identifiers
        # below.
        # NOTE(review): a copy that has the marker but none of these
        # identifiers is neither tagged nor dropped - confirm this is intended.
        if [message] =~ /mtqa_machinestalk|qa_v2_ip3labs|qa_qa_tenant/ {
          # simfony
          mutate {
            add_tag => ["notification_mtqa"]
          }
        }
      } else {
        drop {}  # Drop all other lines
      }
    }
  } else if "mtqa_aaa" in [tags] {
    # Same copy-and-tag approach as the OCS branch, using "notification-aaa".
    clone {
      clones => ["notification-aaa"]
      add_tag => [ "notification-aaa" ]
    }

    # The FREERADIUS_* patterns are not stock grok patterns; they are expected
    # to be defined in a file under patterns_dir.
    # The first pattern captures an extra "log-plugin" token; the second is
    # the fallback for lines without it.
    grok {
      patterns_dir => ["/etc/logstash/patterns"]
      match => {
        "message" => [
          "%{FREERADIUS_DATE:timestamp}%{SPACE}:%{SPACE}%{FREERADIUS_LOGTYPE:log-level}:%{SPACE}%{FREERADIUS_LOGTYPE:log-plugin}:%{SPACE}%{GREEDYDATA:log-message}",
          "%{FREERADIUS_DATE:timestamp}%{SPACE}:%{SPACE}%{FREERADIUS_LOGTYPE:log-level}:%{SPACE}%{GREEDYDATA:log-message}"
        ]
      }
    }

    if "notification-aaa" in [tags] {
      if [message] =~ /notification-v1/ {
        # Keep only lines containing "notification-v1", and tag for the Kafka
        # output those that also mention one of the identifiers below.
        # NOTE(review): a copy that contains "notification-v1" but none of
        # these identifiers is neither tagged nor dropped - confirm this is
        # intended.
        if [message] =~ /mtqa_machinestalk|qa_v2_ip3labs|qa_qa_tenant/ {
          # simfony
          mutate {
            add_tag => ["notification_mtqa"]
          }
        }
      } else {
        drop {}  # Drop all other lines
      }
    }
  }
}

output {
  # Only events tagged "notification_mtqa" by the filter section are published.
  # Nothing in this file outputs any other event.
  if "notification_mtqa" in [tags] {
    kafka {
      bootstrap_servers => "172.20.64.140:9092"
      topic_id => "notification_mtqa"
      codec => json
    }
  }
}