Monday, February 4, 2013

Cisco VOIP devices' syslog into Logstash

This how-to explains how to receive and parse logs from Cisco video and VoIP equipment.

In particular:
  • Cisco TelePresence Video Communication Server (VCS), formerly Tandberg VCS, version X7.2.2
  • Cisco TelePresence Server MSE 8710, version 3.0(2.24)
  • Cisco Unified Communications Manager (CUCM), formerly Cisco Unified CallManager, version 9.1.1
  • Cisco Unity Connection, version 8.6.1
I'm using Logstash 1.1.9 on Ubuntu 12.04, set up to read all config files in /etc/logstash, so I've split my configuration into separate files.

VCS and TelePresence Server


Logstash parsing is straightforward, as these devices use the legacy BSD syslog format by default.
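
For reference, a legacy BSD (RFC 3164) line has the shape shown on the first line below; the second line is a purely hypothetical sample (the program name "tvcs" and the message body are made up for illustration), but the PRI, timestamp, hostname and program parts are exactly what the filters further down extract:

<PRI>TIMESTAMP HOSTNAME PROGRAM[PID]: MESSAGE
<134>Feb  4 11:22:33 vcs01 tvcs: Example message body (hypothetical)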

To enable remote syslog:
  • VCS:  Maintenance -> Logging
  • MSE: Log -> Syslog

Then on the Logstash server, I'm using the following config to parse it:

# /etc/logstash/syslog.conf
input {
  # The default syslog port (514) requires root privileges since it is below 1024
  tcp {
    port => 514
    type => syslog_relay
    tags => [ "syslog" ]
  }
  udp {
    port => 514
    type => syslog_relay
    tags => [ "syslog" ]
  }
}

filter {
  # strip the syslog PRI part and create facility and severity fields.
  # the original syslog message is saved in field %{syslog_raw_message}.
  # the extracted PRI is available in the %{syslog_pri} field.
  #
  # You get %{syslog_facility_code} and %{syslog_severity_code} fields.
  # You also get %{syslog_facility} and %{syslog_severity} fields if the
  # use_labels option is set to true (the default) on the syslog_pri filter.
  grok {
    type => "syslog_relay"
    pattern => [ "^<[1-9]\d{0,2}>%{SPACE}%{GREEDYDATA:message_remainder}" ]
    tags => [ "syslog" ]
    add_tag => "got_syslog_pri"
    add_field => [ "syslog_raw_message", "%{@message}" ]
  }
  syslog_pri {
    type => "syslog_relay"
    tags => [ "got_syslog_pri" ]
  }
  mutate {
    type => "syslog_relay"
    tags => [ "got_syslog_pri" ]
    replace => [ "@message", "%{message_remainder}" ]
    remove => [ "message_remainder" ]
    remove_tag => "got_syslog_pri"
  }

  # strip the syslog timestamp and force event timestamp to be the same.
  # the original string is saved in field %{syslog_timestamp}.
  # the original logstash input timestamp is saved in field %{received_at}.
  grok {
    type => "syslog_relay"
    pattern => [ "^%{SYSLOGTIMESTAMP:syslog_timestamp}%{SPACE}%{GREEDYDATA:message_remainder}" ]
    tags => [ "syslog" ]
    add_tag => "got_syslog_timestamp"
    add_field => [ "received_at", "%{@timestamp}" ]
  }

  mutate {
    type => "syslog_relay"
    tags => [ "got_syslog_timestamp" ]
    replace => [ "@message", "%{message_remainder}" ]
    remove => [ "message_remainder" ]
    remove_tag => "got_syslog_timestamp"
  }
  date {
    type => "syslog_relay"
    tags => [ "got_syslog_timestamp" ]
    # season to taste for your own syslog format(s)
    syslog_timestamp => [ "MMM  d HH:mm:ss", "MMM dd HH:mm:ss", "ISO8601" ]
  }

  # strip the host field from the syslog line.
  # the extracted host field becomes the logstash %{@source_host} metadata
  # and is also available in the field %{syslog_hostname}.
  # the original logstash source_host is saved in field %{logstash_source}.
  grok {
    type => "syslog_relay"
    pattern => [ "^%{SYSLOGHOST:syslog_hostname}%{SPACE}%{GREEDYDATA:message_remainder}" ]
    tags => [ "syslog" ]
    add_tag => "got_syslog_host"
    add_field => [ "logstash_source", "%{@source_host}" ]
  }
  mutate {
    type => "syslog_relay"
    tags => [ "got_syslog_host" ]
    replace => [ "@source_host", "%{syslog_hostname}" ]
    replace => [ "@message", "%{message_remainder}" ]
    remove => [ "message_remainder" ]
    remove_tag => "got_syslog_host"
  }

  # Extract the APP-NAME and PROCID if present
  grok {
    type => "syslog_relay"
    pattern => [ "%{SYSLOGPROG:syslog_prog}:%{SPACE}%{GREEDYDATA:message_remainder}" ]
    tags => [ "syslog" ]
    add_tag => "got_syslog_prog"
  }
  mutate {
    type => "syslog_relay"
    tags => [ "got_syslog_prog" ]
    replace => [ "@message", "%{message_remainder}" ]
    remove => [ "message_remainder" ]
    remove_tag => "got_syslog_prog"
  }

  # Filter to replace source_host IP with PTR (reverse dns) record
  dns {
    type => "syslog_relay"
    reverse => [ "@source_host", "@source_host" ]
    action => "replace"
  }

  # Remove redundant fields
  mutate {
    type => "syslog_relay"
    tags => [ "syslog" ]
    remove => [ "syslog_hostname", "syslog_timestamp" ]
  }
}

output {
  # If your elasticsearch server is discoverable with multicast, use this:
  elasticsearch { }

  # If you can't discover using multicast, set the address explicitly
  #elasticsearch {
  #  host => "myelasticsearchserver"
  #}
}

This configuration does the following:
  • Sets up a syslog listener on the default port 514 (TCP and UDP) and tags incoming events with "syslog"
  • Applies the parsing only to events carrying the "syslog" tag
  • Extracts the metadata (priority, timestamp, host, program) from the message into separate fields
  • Replaces @message with the remaining message text once the metadata has been stripped
  • Replaces the source IP in @source_host with its PTR (reverse DNS) record
  • Sends the parsed event to Elasticsearch
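
While tweaking the grok patterns it can be handy to also print every event to the console. Below is a minimal, optional sketch; the filename 99-debug.conf is my own choice (any name works), and since Logstash merges all config files, each event is simply sent to both outputs:

# /etc/logstash/99-debug.conf -- optional, remove when done testing
output {
  # Print each event so you can see which fields and tags the filters produced.
  # If your 1.1.x build complains about the debug flag, a plain stdout { } still works.
  stdout {
    debug => true
  }
}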

CUCM and Unity

The legacy Cisco devices log in what appears to be a custom format, and thus require slightly more parsing. They also only seem to support the default syslog port 514, which forces Logstash to run with root privileges in order to bind to a port below 1024. In addition, the legacy BSD parsing above would fail on these messages, so we have to ensure that only the legacy Cisco parsing is applied to them. The tricky part is that multiple log formats arrive on the same input (type), so you must take care that the right filters are applied to the right log messages.
This can be solved using tags and grep in combination with grok. Config files are read lexicographically, so you have to ensure the syslog tag is removed before the "main" syslog filter kicks in; choosing an appropriate filename takes care of the ordering.
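
With the two filenames used in this post, the lexicographic ordering works out as follows (the comments are mine):

/etc/logstash/10-cisco_legacy.conf   # "1" sorts before "s", so the legacy grep/grok below run first
/etc/logstash/syslog.conf            # the generic BSD syslog parsing from the previous section

The resulting legacy-format configuration: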

# /etc/logstash/10-cisco_legacy.conf
filter {
  # Filter for:
  #  - Cisco Unified Communications Manager (CUCM)
  #  - Cisco Unity Connection
  #
  # Legacy Cisco Unified Communications Solutions use a slightly different
  # syslog syntax and order than BSD, which require the use of a tailored
  # grok filter to parse it.
  #
  # Examples:
  #
  # Format: <PRI>MSGID: : PROGRAM: MESSAGE
  #
  # CUCM:
  # <187>578739: : ccm: 439053: cucm.cisco.com: Jan 25 2013 14:31:58.235 UTC :  %UC_CALLMANAGER-3-DbInfoError: %[DeviceName=][AppID=Cisco CallManager][ClusterID=CUCM1][NodeID=cucm]: Configuration information may be out of sync for the device and Unified CM database
  # <30>578740: : snmpd[20884]: Connection from UDP: [127.0.0.1]:56722
  # <85>578741: : sudo: database : TTY=console ; PWD=/ ; USER=root ; COMMAND=/usr/sbin/logrotate /usr/local/cm/bin/ccmlogRotate
  # <86>578642: : crond[29895]: pam_unix(crond:session): session closed for user root
  # <85>578643: : sudo: database : TTY=console ; PWD=/ ; USER=root ; COMMAND=/usr/sbin/logrotate /usr/local/cm/bin/ccmlogRotate
  #
  # Unity:
  # <85>77071: : sudo: database : TTY=console ; PWD=/ ; USER=root ; COMMAND=/usr/sbin/logrotate /usr/local/cm/bin/ccmlogRotate
  # <85>77072: : sudo:  servmgr : TTY=console ; PWD=/ ; USER=ccmservice ; COMMAND=/usr/local/cm/bin/soapservicecontrol.sh perfmonservice PerfmonPort status 8443
  # <85>77073: : sudo: cusysagent : TTY=unknown ; PWD=/ ; USER=root ; COMMAND=/opt/cisco/connection/lib/config-modules/dbscripts/mailstore/refreshmbxdbsizes.sh
  # <78>77074: : crond[2510]: (root) CMD (  /etc/rc.d/init.d/fiostats show)
  # <86>77075: : crond[2507]: pam_unix(crond:session): session opened for user root by (uid=0)

  # First use grep to tag messages as Cisco legacy format
  grep {
    match => [ "@message", "^<[1-9]\d{0,2}>\d*: : (?:[\w._/%-]+)(?:\[\b(?:[1-9][0-9]*)\b\])?:" ]
    add_tag => "CISCO_LEGACY"
    drop => false

    # Halt default syslog parsing (make sure other syslog filters require this tag)
    remove_tag => [ "syslog" ]
  }

  # Parse header
  grok {
    type => "syslog_relay"
    tags => [ "CISCO_LEGACY" ]
    pattern => [ "^<(?:\b(?:[1-9]\d{0,2})\b)>%{POSINT:msgid}:%{SPACE}:%{SPACE}%{SYSLOGPROG}:%{SPACE}%{GREEDYDATA:message_remainder}" ]
    add_tag => "got_legacy_header"
    add_field => [ "syslog_raw_message", "%{@message}" ]
  }
  syslog_pri {
    type => "syslog_relay"
    tags => [ "got_legacy_header" ]
  }
  mutate {
    type => "syslog_relay"
    tags => [ "got_legacy_header" ]
    replace => [ "@message", "%{message_remainder}" ]
    remove => [ "message_remainder" ]
    remove_tag => "got_legacy_header"
  }
  # Look for additional fields if present
  grep {
    match => [ "@message", "\b(?:[0-9A-Za-z][0-9A-Za-z-]{0,62})(?:\.(?:[0-9A-Za-z][0-9A-Za-z-]{0,62}))*(?:\.?|\b):\s*(?:\b\w\w\w \d\d \d\d\d\d \d\d:\d\d:\d\d\.\d+\b)\s*UTC" ]
    drop => false
    add_tag => "extra_legacy_info"
  }
  grok {
    type => "syslog_relay"
    tags => [ "extra_legacy_info" ]
    break_on_match => true
    pattern => [ "%{SYSLOGHOST:syslog_hostname}:%{SPACE}(?<syslog_timestamp>\b\w\w\w \d\d \d\d\d\d \d\d:\d\d:\d\d\.\d+\b)%{SPACE}UTC%{SPACE}:%{SPACE}%{GREEDYDATA:message_remainder}" ]
  }
  mutate {
    type => "syslog_relay"
    tags => [ "extra_legacy_info", "CISCO_LEGACY" ]
    replace => [ "@source_host", "%{syslog_hostname}" ]
    replace => [ "@message", "%{message_remainder}" ]
    replace => [ "syslog_timestamp", "%{syslog_timestamp}+00:00"]
    remove => [ "message_remainder" ]
  }
  date {
    type => "syslog_relay"
    tags => [ "extra_legacy_info", "CISCO_LEGACY" ]
    match => [ "syslog_timestamp", "MMM dd yyyy HH:mm:ss.SZZ", "MMM dd yyyy HH:mm:ss.SSZZ", "MMM dd yyyy HH:mm:ss.SSSZZ" ]
    add_field => [ "received_at", "%{@timestamp}" ]
    remove_tag => [ "extra_legacy_info" ]
  }

  # Remove redundant fields
  mutate {
    type => "syslog_relay"
    tags => [ "CISCO_LEGACY" ]
    remove => [ "syslog_hostname" ]
    remove => [ "syslog_timestamp" ]
  }
}

This configuration ensures that:
  • Any log message identified as being in the legacy Cisco format loses the syslog tag (this must happen before the "default" syslog filter kicks in)
  • Those messages are parsed with a tailored filter instead (see the worked example below)
  • Devices with different log formats can share the default syslog port 514
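
To make the parsing concrete, here is roughly how the first CUCM example line from the config comments above should come apart, assuming the grok patterns match as intended (the values are taken from that example line):

<187>578739: : ccm: 439053: cucm.cisco.com: Jan 25 2013 14:31:58.235 UTC :  %UC_CALLMANAGER-3-DbInfoError: ...

  syslog_pri        187  (facility 23/local7, severity 3/error)
  msgid             578739
  program           ccm
  syslog_hostname   cucm.cisco.com            (replaces @source_host)
  syslog_timestamp  Jan 25 2013 14:31:58.235  (gets +00:00 appended and drives @timestamp)
  @message          %UC_CALLMANAGER-3-DbInfoError: %[DeviceName=][AppID=Cisco CallManager]...  (everything after the "UTC :" marker)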
