// Peplink Device
// Description:
//   Parses syslog records (event, URL and session logs) from Peplink routers.
// Data input format: ({ obj, size, source }) or ( doc )
// Entry point: classifies one syslog record and annotates it in place.
//
// Parameters (destructured from the single argument):
//   obj    - event record. "@message" and "@tags" must be present;
//            "@timestamp" and "@source" are read when available.
//   size   - forwarded unchanged to recordDeviceMetrics.
//   source - accepted but not used by this function.
//
// Returns {"status":"abort"} when the message or the tags are missing, or
// when none of the four log types matched; otherwise {"status":"pass"}.
function main({obj, size, source}) {
  let msg = obj["@message"]
  let tags = obj["@tags"]
  if (!msg || !tags) {
    return {"status":"abort"}
  }

  let matched = false
  let fields = {}

  // Session log: generateFusionEvent emits the flow and sets obj["@metaflow"] itself.
  if (startsWith(msg, "Session Logging:")) {
    fields = parseSessionURLLogs(msg)
    tags = append(tags, "Session")
    generateFusionEvent(obj, fields)
    matched = true
  }

  // URL log: the generator's return value becomes the metaflow.
  if (startsWith(msg, "URL Logging:")) {
    fields = parseSessionURLLogs(msg)
    tags = append(tags, "URL")
    obj["@metaflow"] = generateURLFusionEvent(fields, obj["@timestamp"])
    matched = true
  }

  // DHCP server log.
  if (startsWith(msg, "Server:")) {
    fields = parseDHCPLogs(msg)
    obj["@metaflow"] = generateDHCPFusionEvent(fields, obj["@timestamp"])
    matched = true
  }

  // Firewall log, recognised by tag rather than by message prefix.
  // The checks are independent, so a later match overwrites `fields`.
  if (tags.Some( (_, tag) => content(tag, "Firewall" ))) {
    fields = parseFirewallLogs(msg)
    matched = true
  }

  if (!matched) {
    return {"status":"abort"}
  }

  obj["@type"] = "event"
  obj["@parser"] = "fpl-PeplinkRouterSyslog"
  obj["@parserVersion"] = "20240226-1"
  obj["@event_type"]="peplink"
  obj["@eventType"]="Peplink"
  obj["@peplink"] = fields
  obj["@tags"] = tags

  // Device name falls back to "unknown" when the record carries no "@source".
  let deviceName = obj["@source"]
  if (!deviceName) {
    deviceName = "unknown"
  }
  recordDeviceMetrics(obj, size, deviceName)
  return {"status":"pass"}
}
// Parses the payload of a "Session Logging:" / "URL Logging:" message.
//
// msg: the full message text; everything from 9 characters after the start
//      of "Logging:" onward is treated as the payload.
// Returns the parseContentFilter result when the payload starts with
// "Domain"; otherwise an object of lower-cased keys, with the byte, packet
// and port fields passed through parseInt.
function parseSessionURLLogs(msg) {
  // "Logging:" is 8 characters; +9 also skips the character after it
  // (presumably the separating space).
  let payload = subString(msg, indexOf(msg, "Logging:") + 9, len(msg))
  if (startsWith(payload, "Domain")) {
    return parseContentFilter(payload)
  }

  let pairs = decoder_MixedKeyValue(payload)
  let fields = {}
  for name, raw = range pairs {
    let key = toLower(name)
    let numeric = key == "bytes_recvd" || key == "bytes_sent" || key == "spt" || key == "dpt" || key == "snatpt" || key == "packets_recvd" || key == "packets_sent"
    if (numeric) {
      fields[key] = parseInt(raw)
    } else {
      fields[key] = raw
    }
  }
  return fields
}
// Parses a firewall message into an object of lower-cased keys.
//
// msg: the full message text; everything after the first space is decoded
//      with decoder_MixedKeyValue.
// Returns the decoded pairs, minus any key containing a space, with
// ttl / len / spt / dpt passed through parseInt.
function parseFirewallLogs(msg) {
  let body = subString(msg, indexOf(msg, " ") + 1, len(msg))
  let pairs = decoder_MixedKeyValue(body)
  let fields = {}
  for name, raw = range pairs {
    let key = toLower(name)
    // Keys with embedded spaces are dropped.
    if (!contains(key, " ")) {
      if (key == "ttl" || key == "len" || key == "spt" || key == "dpt") {
        fields[key] = parseInt(raw)
      } else {
        fields[key] = raw
      }
    }
  }
  return fields
}
// Extracts DHCP fields from a space-separated "Server:" message.
//
// msg: the full message text.
// Returns {} when the message has 8 or fewer space-separated tokens;
// otherwise { server, client_ip, client_mac, lease } taken from tokens
// 1, 3, 5 and 8 respectively.
function parseDHCPLogs(msg) {
  let parts = split(msg, " ")
  if (len(parts) <= 8) {
    return {}
  }
  // Token 5 carries one trailing character after the MAC
  // (presumably a comma - verify against real logs); drop it.
  let macToken = parts[5]
  return {
    server: parts[1],
    client_ip: parts[3],
    client_mac: subString(macToken, 0, len(macToken) - 1),
    lease: parseInt(parts[8])
  }
}
// Emits a DHCP "Request" and a DHCP "Ack" fusion event for one lease record.
//
// f:  parsed DHCP fields; client_ip, client_mac and lease are required.
// ts: timestamp placed in the envelope as time_ms.
// Returns { dhcp: <ack record> }, or nothing (after logging via printf) when
// a required field is missing or client_ip is 0.0.0.0 / 255.255.255.255.
function generateDHCPFusionEvent(f,ts) {
  let unusable = !f.client_ip || !f.client_mac || !f.lease || f.client_ip == "0.0.0.0" || f.client_ip == "255.255.255.255"
  if (unusable) {
    printf("invalid event record for dhcp flow: %v", f)
    return
  }

  let envelope = {
    partition: "default",
    dataType: "event",
    fusion: "peplink",
    time_ms: ts
  }
  let request = {
    type: "Request",
    clientMac: f.client_mac
  }
  let ack = {
    type: "Ack",
    clientMac: f.client_mac,
    clientIp: f.client_ip,
    lease: f.lease
  }

  // Both records are sent under the "infoblox-dhcp" dtype.
  Fluency_FusionEvent(envelope, {dhcp:request, dtype:"infoblox-dhcp"})
  Fluency_FusionEvent(envelope, {dhcp:ack, dtype:"infoblox-dhcp"})
  return {dhcp: ack}
}
// Emits a flow fusion event for a URL / content-filter record.
//
// f:  parsed fields; src, dst, spt and dpt are required. sslcert, url,
//     srcmac and action are optional.
// ts: timestamp used for both the envelope and the flow.
// Returns { flow, meta?, flags? }, or nothing (after logging via printf)
// when a required field is missing.
function generateURLFusionEvent(f,ts) {
  if (!f.src || !f.dst || !f.spt || !f.dpt) {
    printf("invalid event record for flow: %v", f)
    return
  }

  let flow = {
    sip: f.src,
    dip: f.dst,
    sp: f.spt,
    dp: f.dpt,
    prot: 6,
    dur: 0,
    time_ms: ts
  }
  let source = {flow: flow}

  if (f.sslcert) {
    let certMeta = {
      url: f.sslcert,
      category: 'peplink_url'
    }
    if (f.srcmac) {
      certMeta.srcmac = f.srcmac
    }
    source.meta = certMeta
  }
  // A url field takes precedence over sslcert when both are present.
  // NOTE(review): this branch does not copy srcmac - confirm that is intended.
  if (f.url) {
    source.meta = {
      url: parseURL(f.url),
      category: 'peplink_url'
    }
  }
  if (f.action == "deny") {
    source.flags = "fwdeny"
  }

  let envelope = {
    partition: "default",
    dataType: "event",
    fusion: "peplink",
    time_ms: ts
  }
  // meta and flags are wrapped in one-element arrays for the fusion call only;
  // the returned object keeps them unwrapped.
  // NOTE(review): when meta/flags were never set the arrays hold an unset
  // value - confirm Fluency_FusionEvent tolerates that.
  Fluency_FusionEvent(envelope, {flow:source.flow, meta:[source.meta], flags:[source.flags]})
  return source
}
// Extracts the host portion of a URL string.
//
// url: URL text, with or without an "http://" / "https://" prefix.
// Returns:
//   - with a scheme: the text between the scheme and the first "/", or the
//     whole remainder when there is no "/" (e.g. "https://example.com");
//   - without a scheme: the text before the first "/" when that "/" is past
//     index 1; otherwise the whole input when it has a "." past index 1;
//   - "" in every other case.
function parseURL(url) {
  let rest = url
  let hasScheme = false
  if (startsWith(url, "https://")) {
    rest = subString(url, 8, len(url))
    hasScheme = true
  } elseif (startsWith(url, "http://")) {
    rest = subString(url, 7, len(url))
    hasScheme = true
  }

  let slash = indexOf(rest, "/")
  if (hasScheme) {
    if (slash > 0) {
      return subString(rest, 0, slash)
    }
    // No path component: the remainder is the host. Previously this case
    // returned "" and the host was lost.
    // Assumes indexOf yields a negative value when nothing is found.
    if (slash < 0) {
      return rest
    }
    return ""
  }

  // Scheme-less input: same rules as before.
  if (slash > 1) {
    return subString(rest, 0, slash)
  }
  if (indexOf(rest, ".") > 1) {
    return rest
  }
  return ""
}
// Parses a content-filter message whose values are wrapped in "<...>".
//
// msg: message text expected to contain exactly three "<...>" groups, read
//      in order as domain, source address and category.
// Returns { domain, src, dst, spt, dpt, category } with fixed values for
// dst ("127.0.0.1"), spt ("0") and dpt ("80"), plus action "deny" when the
// text contains "blocked". Returns {} (after logging via printf) when the
// number of groups is not three.
function parseContentFilter(msg) {
  let capturing = false
  let current = ""
  let groups = []
  for let pos = 0; pos < len(msg); pos++ {
    let ch = subString(msg, pos, pos+1)
    if (ch == ">") {
      // Close the group and store whatever was collected.
      capturing = false
      groups = append(groups, current)
      current = ""
    } elseif (ch == "<") {
      capturing = true
    } elseif (capturing) {
      current = current + ch
    }
  }

  if (len(groups) != 3){
    printf("invalid event record for content filter: %s", msg)
    return {}
  }

  let f = {
    domain: groups[0],
    src: groups[1],
    dst: "127.0.0.1",
    spt: "0",
    dpt: "80",
    category: groups[2]
  }
  if (contains(msg, "blocked")){
    f.action = "deny"
  }
  return f
}
// Builds a flow record from a session log, stores it on the event and
// emits it as a fusion event.
//
// obj: event record; "@timestamp" is read and "@metaflow" is written.
// f:   parsed session fields; src, dst, spt, dpt and proto are required.
// Returns nothing. When a required field is missing the record is logged
// via printf and obj is left untouched.
function generateFusionEvent(obj, f) {
  if (!f.src || !f.dst || !f.spt || !f.dpt || !f.proto) {
    printf("invalid event record for flow: %v", f)
    return
  }

  let ts = obj["@timestamp"]

  // Bandwidth counters are reported as zero for ESTABLISHED sessions.
  let rxBytes = 0
  let txBytes = 0
  let rxPackets = 0
  let txPackets = 0
  if (f.session_state != "ESTABLISHED") {
    rxBytes = f.bytes_recvd
    txBytes = f.bytes_sent
    rxPackets = f.packets_recvd
    txPackets = f.packets_sent
  }

  let flow = {
    sip: f.src,
    dip: f.dst,
    sp: f.spt,
    dp: f.dpt,
    prot: parseProto(f.proto),
    dur: 0,
    rxB: rxBytes,
    txB: txBytes,
    totalB: txBytes + rxBytes,
    rxP: rxPackets,
    txP: txPackets,
    time_ms: ts
  }
  let envelope = {
    partition: "default",
    dataType: "event",
    fusion: "peplink",
    time_ms: ts
  }

  obj["@metaflow"] = {flow: flow}
  Fluency_FusionEvent(envelope, {flow:flow})
}
// Maps a protocol name to its IANA protocol number.
//
// proto: protocol name; matched by prefix after lower-casing, so "TCP" and
//        "tcp" are treated the same.
// Returns 6 (tcp), 17 (udp), 1 (icmp), 2 (igmp), 58 (icmpv6 / icmp6),
// or 0 when the name is not recognised.
function parseProto(proto) {
  let name = toLower(proto)
  // ICMPv6 must be tested before plain ICMP: both of its spellings also
  // start with "icmp", so the old ordering made this branch unreachable
  // (and it returned 6, the TCP number, rather than 58).
  if (startsWith(name, "icmpv6") || startsWith(name, "icmp6")) {
    return 58
  }
  if (startsWith(name, "tcp")) {
    return 6
  }
  if (startsWith(name, "udp")) {
    return 17
  }
  if (startsWith(name, "icmp")) {
    return 1
  }
  if (startsWith(name, "igmp")) {
    return 2
  }
  return 0
}
// Registers the sending device when it is not yet known and bumps the
// import counters for this record.
//
// obj:        event record; "@sender" is read.
// size:       amount added to the "fluency_import_bytes" counter.
// deviceName: name used for the device lookup and, for a new device, as
//             the entry name.
// Returns nothing.
function recordDeviceMetrics(obj, size, deviceName) {
  let sender = obj["@sender"]

  let entry = Fluency_Device_LookupName(deviceName)
  if (!entry) {
    // Unknown device: add it with the Peplink defaults.
    entry = {
      name: deviceName,
      ips: [sender],
      group: "FPL-detect: Peplink Router",
      device: {
        name: "Peplink Router",
        category: "Network"
      }
    }
    Fluency_Device_Add(entry)
  }

  let dims = {
    namespace: "fluency",
    app: "import",
    eventType: "Peplink",
    syslogSender: sender,
    customer: "default",
    importSource: entry.name,
    deviceType: entry.device.name
  }
  // The group dimension is only attached when the entry has one.
  if (entry.group) {
    dims.group = entry.group
  }

  Platform_Metric_Counter("fluency_import_count", dims, 1)
  Platform_Metric_Counter("fluency_import_bytes", dims, size)
}
// Updated 9 months ago