EventIngress

/** * @file Event_Ingress_Report * @reportoverview Shows the overview of the total event ingress over the time range specified by the user by default, * it is a monthly report. It gets all events from LavaDB as a total events table and tables grouped by source, sender, * and event type. These tables can be used to create visualizations. */ /** * Main method. This method calls eventSizeBySource, eventSizeBySender, and eventSizeByEventType to get the total * event size and count grouped by source, sender, and event type from the time range. The tables obtained are returned * as an object. * * @param {string || int} from - The start of the time range. Default is the past day * @param {string || int} to - The end of the time range. Default is the past minute * * @returns {object} - Returns an object containing all the tables/metric/alert obtained from the queries */ function main({from="-1d@h", to="@h"}) { let rangeFrom = new Time(from) let rangeTo = new Time(to) validateTimeRange(rangeFrom, rangeTo) setEnv("from", from) setEnv("to", to) let diffsec = (rangeTo.Unix() - rangeFrom.Unix()) let diff = diffsec/3600.0 // range in hours // initailze the tables to hold the data as an empty table let statsBySource = new Table() let statsBySender = new Table() let statsByEventType = new Table() let totalStatsTimeTable = new Table() let interval = "7d" let hist_interval = "1h" if (diff > 48) { hist_interval = "1d" } // break the time range into intervals of 1 day and append the data to the tables for (let t = rangeFrom; t.Before(rangeTo); t = t.Add(interval)) { let from = t let to = t.Add(interval).After(rangeTo) ? rangeTo : t.Add(interval) totalStatsTimeTable.Append(eventsTimeTable(from, to, hist_interval)) statsBySource.Append(eventSizeBySource(from, to)) statsBySender.Append(eventSizeBySender(from, to)) statsByEventType.Append(eventSizeByEventType(from, to)) } // aggregate the data to get the total size and count grouped by the field statsBySource = getTotal(statsBySource, "source") statsBySender = getTotal(statsBySender, "sender") statsByEventType = getTotal(statsByEventType, "eventtype") let statsByProductName = lookupProductName(statsByEventType, "eventtype") statsByProductName = getTotal(statsByProductName, "ProductName") // sort the overall total ingress size and total number of events by sender then calculate the EPS and EPH let totalStatsBySender = statsBySender.GroupBy(({totalSize, eventCount})=>{ return { columns: { sum: {totalIngressSize: totalSize}, sum: {totalEventCount: eventCount} } } }) totalStatsBySender.NewColumnLambda("totalEPS", "", (row) => row.totalEventCount / 2592000) totalStatsBySender.NewColumnLambda("totalEPH", "", (row) => row.totalEventCount / 720) // use the event type table to get the overall total ingress size and total number of events let totalStats = statsByEventType.Aggregate(({totalSize})=>{ return { columns: { sum: {totalIngressSize: totalSize}, count: {totalEventTypes: true} } } }) // sort the tables by total size to get the top 10 sources, senders, and event types let topSizeBySource = statsBySource.Clone().Sort(10, "totalSize") let topSizeBySender = statsBySender.Clone().Sort(10, "totalSize") let topSizeByEventType = statsByEventType.Clone().Sort(10, "totalSize") let topSizeByProductName = statsByProductName.Clone().Sort(10, "totalSize") totalStatsTimeTable = totalStatsTimeTable.Clone().Sort(0, "+Date") // sort by Date, for histogram return { statsByEventType, statsByProductName, totalStats, totalStatsTimeTable, totalStatsBySender, topSizeBySource, topSizeBySender, topSizeByEventType, topSizeByProductName } } /** * Thie method is a helper method to validate the time range passed by the user. * * @param {Time} from - The start of the time range * @param {Time} to - The end of the time range * * @returns {boolean} - Returns true if the time range is valid */ function validateTimeRange(from, to) { // checks to see if the start of the time range is after the end of the time range if (from.After(to)) { throw new Error("rangeFrom must be less than rangeTo", "RangeError") } // checks to see if the time range is more than 2 months if (to.After(from.Add("60d"))) { throw new Error("total duration must not exceed 2 months", "RangeError") } return true } /** * Thie method uses the environment variables to build a fpl query to get the event size and count grouped by source * * @param {string} from - The start of the time range * @param {string} to - The end of the time range * * @returns {Table} table - Returns a table containing the total event size and count grouped by source */ function eventSizeBySource(from, to) { let env = {from, to} let fplTemplate = ` search {from="{{.from}}", to="{{.to}}"} let source=f("@source"), size=f("__size__") aggregate totalSize=sum(size), eventCount=count() by source ` let table = fluencyLavadbFpl(template(fplTemplate, env)) return table } /** * Thie method uses the environment variables to build a fpl query to get the event size and count grouped by sender * * @param {string} from - The start of the time range * @param {string} to - The end of the time range * * @returns {Table} table - Returns a table containing the total event size and count grouped by sender */ function eventSizeBySender(from, to) { let env = {from, to} let fplTemplate = ` search {from="{{.from}}", to="{{.to}}"} let sender=f("@sender"), size=f("__size__") aggregate totalSize=sum(size), eventCount=count() by sender ` let table = fluencyLavadbFpl(template(fplTemplate, env)) return table } /** * Thie method uses the environment variables to build a fpl query to get the event size and count grouped by event type * * @param {string} from - The start of the time range * @param {string} to - The end of the time range * * @returns {Table} table - Returns a table containing the total event size and count grouped by event type */ function eventSizeByEventType(from, to) { let env = {from, to} let fplTemplate = ` search {from="{{.from}}", to="{{.to}}"} let eventType=f("@eventType"), event_type=f("@event_type"), size=f("__size__") let et = coalesce(eventType, event_type) let eventtype = condition(et != "", et, "N/A") aggregate totalSize=sum(size), eventCount=count() by eventtype ` let table = fluencyLavadbFpl(template(fplTemplate, env)) return table } /** * Thie method uses the environment variables to build a fpl query to get the event size and count grouped by timestamp * * @param {string} from - The start of the time range * @param {string} to - The end of the time range * * @returns {Table} table - Returns a table containing the total event size and count grouped by source */ function eventsTimeTable(from, to, interval) { let env = {from, to, interval} let fplTemplate = ` search {from="{{.from}}", to="{{.to}}"} let timestamp=f("@timestamp"), size=f("__size__") let Date=strftime("%D %H:%M",timebucket("{{.interval}}", timestamp)) aggregate totalSize=sum(size), eventCount=count() by Date ` let table = fluencyLavadbFpl(template(fplTemplate, env)) return table } /** * This helper function groups the table by the specified field and gets the total size. * * @param {Table} table - The table to be aggregated * @param {string} field - The field to be grouped by * * @returns {Table} - Returns an aggregated table grouped by the specified field with the total size and count */ function getTotal(table, field) { return table.Aggregate((obj) => { let fieldValue = obj[field] let totalSize = obj.totalSize let eventCount = obj.eventCount return { groupBy: {[field]: fieldValue}, columns: { sum: {totalSize: totalSize}, sum: {eventCount: eventCount} } } }) } function lookupProductName(t1, field) { t1.NewColumnLambda("ProductName", "", ((row) => { let et = row[field] if (startsWith(et, "@")){ et = trimPrefix(et, "@") } if (et == "nxlogDHCP"){ return "WindowsSrv_DHCP (nxlog)" } if (startsWith(et, "nxlog")){ return "WindowsSrv (NXLog)" } if (et == "Fortigate" || et == "fortigate"){ return "FortiGateNGFW" } if (et == "SonicWall" || et == "sonicwall"){ return "SonicWallNGFW" } if (et == "Meraki"){ return "CiscoMeraki" } if (et == "PaloAlto"){ return "PaloAltoFW" } if (et == "peplink"){ return "Peplink" } if (et == "netscaler"){ return "CitrixNetScaler" } if (et == "canary"){ return "ThinkstCanary" } if (et == "Sophos"){ return "SophosFW" } if (et == "cylance"){ return "CylanceSyslog" } if (et == "Linux" || et == "sshd" || et == "linux-sshd" || et == "audispd"){ return "LinuxSyslog" } return et })) return t1 }

Did this page help you?