Add CEF processor to Ingest node#122491

Merged
bhapas merged 170 commits into elastic:main from bhapas:cef_processor
Dec 16, 2025
@bhapas (Contributor) commented Feb 13, 2025

Closes - #126201

This PR adds a new CEF ingest node processor. The CEF processor converts Common Event Format (CEF) logs into a JSON structure. It also maps the relevant CEF fields to ECS fields, so no additional processors are needed in the ingest pipeline.

Encoding rules from the spec

Ensure the following when encoding symbols in CEF:

  • The entire message must be UTF-8 encoded.
  • Spaces used in the header are valid. Do not encode a space character by using
    <space>.
  • If a pipe (|) is used in the header, it must be escaped with a backslash (\). But note
    that the pipes in the extension do not need escaping. For example:
    Sep 19 08:26:10 host CEF:0|security|threatmanager|1.0|100|detected a
    \| in message|10|src=10.0.0.1 act=blocked a | dst=1.1.1.1
  • If a backslash (\) is used in the header or the extension, it must be escaped with
    another backslash (\). For example:
    Sep 19 08:26:10 host CEF:0|security|threatmanager|1.0|100|detected a
    \\ in packet|10|src=10.0.0.1 act=blocked a \\ dst=1.1.1.1
  • If an equal sign (=) is used in the extensions, it has to be escaped with a backslash (\).
    Equal signs in the header need no escaping. For example:
    Sep 19 08:26:10 host CEF:0|security|threatmanager|1.0|100|detected a =
    in message|10|src=10.0.0.1 act=blocked a \= dst=1.1.1.1
  • Multi-line fields can be sent by CEF by encoding the newline character as \n or \r.
    Note that multiple lines are only allowed in the value part of the extensions. For
    example:
    Sep 19 08:26:10 host CEF:0|security|threatmanager|1.0|100|Detected a
    threat. No action needed.|10|src=10.0.0.1 msg=Detected a threat.\n No
    action needed
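
As a rough illustration of the encoding rules above, here is a minimal Python sketch of the producer side. The function names `escape_header` and `escape_extension` are hypothetical and are not part of this PR; the sketch only applies the escapes listed in the spec excerpt.

```python
def escape_header(value: str) -> str:
    """Escape a CEF header field: backslashes first, then pipes."""
    # Backslashes must be doubled before pipes are escaped, otherwise the
    # backslash added in front of a pipe would itself be doubled.
    return value.replace("\\", "\\\\").replace("|", "\\|")


def escape_extension(value: str) -> str:
    """Escape a CEF extension value: backslash, equal sign, and newlines."""
    return (
        value.replace("\\", "\\\\")
        .replace("=", "\\=")
        .replace("\r", "\\r")
        .replace("\n", "\\n")
    )


if __name__ == "__main__":
    print(escape_header("detected a | in message"))
    print(escape_extension("Detected a threat.\n No action needed"))
```

Pipes are deliberately left alone in `escape_extension`, and equal signs are left alone in `escape_header`, matching the rules quoted above.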

Example

Here is an example of a CEF log and the document the processor produces from it:

CEF LOG
CEF:0|security|threatmanager|1.0|100|trojan successfully stopped|10|agt=192.168.0.1 agentDnsDomain=example.com ahost=agentHost aid=agentId amac=00:0a:95:9d:68:16 agentNtDomain=example.org art=1622547800000 atz=UTC agentTranslatedAddress=10.0.0.1 agentTranslatedZoneExternalID=ext123 agentTranslatedZoneURI=uri at=agentType av=1.0 agentZoneExternalID=zoneExtId agentZoneURI=zoneUri app=HTTP cnt=1234 in=5678 out=91011 customerExternalID=custExtId customerURI=custUri dst=192.168.0.2 dlat=37.7749 dlong=-122.4194 dhost=destHost dmac=00:0a:95:9d:68:16 dntdom=destNtDomain dpt=80 dpid=1234 dproc=destProc destinationServiceName=destService destinationTranslatedAddress=10.0.0.2 destinationTranslatedPort=8080 destinationTranslatedZoneExternalID=destExtId destinationTranslatedZoneURI=destUri duid=destUserId duser=destUser dpriv=admin destinationZoneExternalID=destZoneExtId destinationZoneURI=destZoneUri act=blocked dvc=192.168.0.3 cfp1Label=cfp1Label cfp3Label=cfp3Label cfp4Label=cfp4Label deviceCustomDate1=1622547800000 deviceCustomDate1Label=customDate1Label deviceCustomDate2=1622547900000 deviceCustomDate2Label=customDate2Label cfp1=1.23 cfp2=2.34 cfp2Label=cfp2Label cfp3=3.45 cfp4=4.56 c6a1=2001:db8::1 c6a1Label=c6a1Label c6a2=2001:db8::2 c6a2Label=c6a2Label c6a3=2001:db8::3 c6a3Label=c6a3Label c6a4=2001:db8::4 C6a4Label=c6a4Label cn1=123 cn1Label=cn1Label cn2=234 cn2Label=cn2Label cn3=345 cn3Label=cn3Label cs1=customString1 cs1Label=cs1Label cs2=customString2 cs2Label=cs2Label cs3=customString3 cs3Label=cs3Label cs4=customString4 cs4Label=cs4Label cs5=customString5 cs5Label=cs5Label cs6=customString6 cs6Label=cs6Label deviceDirection=inbound deviceDnsDomain=example.com cat=category deviceExternalId=extId deviceFacility=16 dvchost=host1 deviceInboundInterface=eth0 dvcmac=00:0a:95:9d:68:16 deviceNtDomain=example.org deviceOutboundInterface=eth1 devicePayloadId=payloadId dvcpid=5678 deviceProcessName=procName rt=1622547800000 dtz=UTC deviceTranslatedAddress=10.0.0.3 
deviceTranslatedZoneExternalID=transExtId deviceTranslatedZoneURI=transUri deviceZoneExternalID=zoneExtId deviceZoneURI=zoneUri end=1622547900000 eventId=evt123 outcome=success externalId=extId fileCreateTime=1622547800000 fileHash=abcd1234 fileId=5678 fileModificationTime=1622547900000 fname=file.txt filePath=/path/to/file filePermission=rw-r--r-- fsize=1024 fileType=txt flexDate1=1622547800000 flexDate1Label=flexDate1Label flexString1=flexString1 flexString2=flexString2 flexString1Label=flexString1Label flexString2Label=flexString2Label msg=message oldFileCreateTime=1622547800000 oldFileHash=oldHash oldFileId=oldId oldFileModificationTime=1622547900000 oldFileName=oldFile oldFilePath=/old/path oldFilePermission=rw-r--r-- oldFileSize=2048 oldFileType=oldType rawEvent=rawEvent reason=reason requestClientApplication=Mozilla requestContext=referrer requestCookies=cookies requestMethod=GET request=url src=192.168.0.4 sourceDnsDomain=sourceDomain slat=37.7749 slong=-122.4194 shost=sourceHost smac=00:0a:95:9d:68:16 sntdom=sourceNtDomain spt=443 spid=1234 sproc=sourceProc sourceServiceName=sourceService sourceTranslatedAddress=10.0.0.4 sourceTranslatedPort=8081 sourceTranslatedZoneExternalID=sourceExtId sourceTranslatedZoneURI=sourceUri suid=sourceUserId suser=sourceUser spriv=sourcePriv sourceZoneExternalID=sourceZoneExtId sourceZoneURI=sourceZoneUri start=1622547800000 proto=TCP type=1 catdt=catDeviceType mrt=1622547800000
Parsed CEF content
{
  "process": {
    "name": "procName",
    "pid": 5678
  },
  "agent": {
    "ip": "192.168.0.1",
    "name": "example.com",
    "id": "agentId",
    "type": "agentType",
    "version": "1.0",
    "mac": "00:0a:95:9d:68:16"
  },
  "cef": {
    "severity": 10,
    "extensions": {
      "agentTranslatedZoneExternalID": "ext123",
      "flexDate1": "2021-06-01T11:43:20Z",
      "deviceCustomString3Label": "cs3Label",
      "oldFileSize": 2048,
      "sourceZoneURI": "sourceZoneUri",
      "deviceCustomIPv6Address4Label": "c6a4Label",
      "destinationTranslatedZoneURI": "destUri",
      "agentZoneURI": "zoneUri",
      "oldFileName": "oldFile",
      "deviceCustomDate2Label": "customDate2Label",
      "deviceNtDomain": "example.org",
      "deviceCustomFloatingPoint4Label": "cfp4Label",
      "sourceTranslatedZoneURI": "sourceUri",
      "deviceCustomIPv6Address1": "2001:db8::1",
      "deviceCustomDate1Label": "customDate1Label",
      "deviceCustomIPv6Address4": "2001:db8::4",
      "requestCookies": "cookies",
      "deviceCustomIPv6Address3": "2001:db8::3",
      "oldFilePermission": "rw-r--r--",
      "deviceCustomIPv6Address2": "2001:db8::2",
      "deviceCustomString2Label": "cs2Label",
      "deviceCustomFloatingPoint2Label": "cfp2Label",
      "deviceCustomDate2": "2021-06-01T11:45Z",
      "agentTranslatedZoneURI": "uri",
      "deviceCustomDate1": "2021-06-01T11:43:20Z",
      "deviceCustomIPv6Address2Label": "c6a2Label",
      "oldFileModificationTime": "2021-06-01T11:45Z",
      "deviceCustomFloatingPoint1": 1.23,
      "oldFileHash": "oldHash",
      "deviceCustomFloatingPoint2": 2.34,
      "deviceCustomFloatingPoint3": 3.45,
      "flexString1": "flexString1",
      "deviceCustomFloatingPoint4": 4.56,
      "oldFileId": "oldId",
      "deviceCustomNumber1": 123,
      "agentTranslatedAddress": "10.0.0.1",
      "deviceCustomNumber3": 345,
      "deviceCustomNumber2": 234,
      "flexString2": "flexString2",
      "baseEventCount": 1234,
      "deviceCustomIPv6Address1Label": "c6a1Label",
      "deviceTranslatedZoneExternalID": "transExtId",
      "deviceZoneExternalID": "zoneExtId",
      "agentTimeZone": "UTC",
      "deviceCustomString6Label": "cs6Label",
      "deviceCustomNumber2Label": "cn2Label",
      "deviceCustomString5Label": "cs5Label",
      "deviceCustomFloatingPoint1Label": "cfp1Label",
      "sourceZoneExternalID": "sourceZoneExtId",
      "deviceTranslatedZoneURI": "transUri",
      "destinationTranslatedZoneExternalID": "destExtId",
      "flexString1Label": "flexString1Label",
      "deviceCustomNumber1Label": "cn1Label",
      "categoryDeviceType": "catDeviceType",
      "deviceZoneURI": "zoneUri",
      "flexString2Label": "flexString2Label",
      "deviceCustomNumber3Label": "cn3Label",
      "deviceCustomString1": "customString1",
      "externalId": "extId",
      "oldFilePath": "/old/path",
      "deviceCustomString3": "customString3",
      "deviceCustomString2": "customString2",
      "deviceCustomString1Label": "cs1Label",
      "deviceCustomString5": "customString5",
      "deviceCustomString4": "customString4",
      "agentZoneExternalID": "zoneExtId",
      "oldFileCreateTime": "2021-06-01T11:43:20Z",
      "deviceCustomString6": "customString6",
      "deviceCustomIPv6Address3Label": "c6a3Label",
      "deviceEventCategory": "category",
      "deviceCustomString4Label": "cs4Label",
      "deviceCustomFloatingPoint3Label": "cfp3Label",
      "destinationZoneExternalID": "destZoneExtId",
      "flexDate1Label": "flexDate1Label",
      "sourceTranslatedZoneExternalID": "sourceExtId",
      "agentNtDomain": "example.org",
      "oldFileType": "oldType",
      "destinationZoneURI": "destZoneUri"
    },
    "device.version": "1.0",
    "name": "trojan successfully stopped",
    "device.vendor": "security",
    "device.product": "threatmanager",
    "device.event_class_id": 100,
    "version": 0
  },
  "log": {
    "syslog": {
      "facility": {
        "code": 16
      }
    }
  },
  "destination": {
    "nat": {
      "port": 8080,
      "ip": "10.0.0.2"
    },
    "geo": {
      "location": {
        "lon": -122.4194,
        "lat": 37.7749
      }
    },
    "registered_domain": "destNtDomain",
    "process": {
      "name": "destProc",
      "pid": 1234
    },
    "port": 80,
    "bytes": 91011,
    "service": {
      "name": "destService"
    },
    "domain": "destHost",
    "ip": "192.168.0.2",
    "user": {
      "name": "destUser",
      "id": "destUserId",
      "group": {
        "name": "admin"
      }
    },
    "mac": "00:0a:95:9d:68:16"
  },
  "source": {
    "geo": {
      "location": {
        "lon": -122.4194,
        "lat": 37.7749
      }
    },
    "nat": {
      "port": 8081,
      "ip": "10.0.0.4"
    },
    "registered_domain": "sourceNtDomain",
    "process": {
      "name": "sourceProc",
      "pid": 1234
    },
    "port": 443,
    "service": {
      "name": "sourceService"
    },
    "bytes": 5678,
    "ip": "192.168.0.4",
    "domain": "sourceDomain",
    "user": {
      "name": "sourceUser",
      "id": "sourceUserId",
      "group": {
        "name": "sourcePriv"
      }
    },
    "mac": "00:0a:95:9d:68:16"
  },
  "message": "message",
  "url": {
    "original": "url"
  },
  "network": {
    "protocol": "HTTP",
    "transport": "TCP",
    "direction": "inbound"
  },
  "observer": {
    "ingress": {
      "interface": {
        "name": "eth0"
      }
    },
    "registered_domain": "example.com",
    "product": "threatmanager",
    "hostname": "host1",
    "vendor": "security",
    "ip": "192.168.0.3",
    "name": "extId",
    "version": "1.0",
    "mac": "00:0a:95:9d:68:16",
    "egress": {
      "interface": {
        "name": "eth1"
      }
    }
  },
  "file": {
    "inode": 5678,
    "path": "/path/to/file",
    "size": 1024,
    "created": "2021-06-01T11:43:20Z",
    "name": "file.txt",
    "mtime": "2021-06-01T11:45Z",
    "type": "txt",
    "hash": "abcd1234",
    "group": "rw-r--r--"
  },
  "@timestamp": "2021-06-01T11:43:20Z",
  "organization": {
    "name": "custUri",
    "id": "custExtId"
  },
  "host": {
    "nat": {
      "ip": "10.0.0.3"
    }
  },
  "http": {
    "request": {
      "referrer": "referrer",
      "method": "GET"
    }
  },
  "event": {
    "reason": "reason",
    "ingested": "2021-06-01T11:43:20Z",
    "original": "rawEvent",
    "code": 100,
    "kind": 1,
    "created": "2021-06-01T11:43:20Z",
    "timezone": "UTC",
    "start": "2021-06-01T11:43:20Z",
    "action": "blocked",
    "end": "2021-06-01T11:45Z",
    "id": "evt123",
    "outcome": "success"
  },
  "user_agent": {
    "original": "Mozilla"
  }
}
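
The timestamps in the parsed output can be checked by hand. The sketch below (Python, with a hypothetical helper name `epoch_millis_to_iso`) shows how an epoch-milliseconds value such as `rt=1622547800000` corresponds to `2021-06-01T11:43:20Z`. It is only an arithmetic check, not the processor's actual date handling.

```python
from datetime import datetime, timezone


def epoch_millis_to_iso(value: str) -> str:
    """Convert an epoch-milliseconds string to an ISO 8601 UTC timestamp."""
    millis = int(value)
    # Whole seconds are enough for the sample values, which all end in 000.
    dt = datetime.fromtimestamp(millis // 1000, tz=timezone.utc)
    return dt.strftime("%Y-%m-%dT%H:%M:%SZ")


if __name__ == "__main__":
    print(epoch_millis_to_iso("1622547800000"))  # rt, start, art, ...
    print(epoch_millis_to_iso("1622547900000"))  # end, deviceCustomDate2, ...
```

The second value comes out as `2021-06-01T11:45:00Z` here, while the output above shows `2021-06-01T11:45Z`. That is the same instant; Java's `java.time` types omit a zero seconds field when printed.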

@elasticsearchmachine added the v9.1.0 and external-contributor labels Feb 13, 2025
@bhapas self-assigned this Feb 13, 2025
@andrewkroh (Member) commented

I realize this draft is still in progress, and you likely already have plans for these items.

  1. Once this merges, the elastic/elasticsearch-specification will need to be updated to include the new processor and all of its parameters.
  2. The existing test suites for the decode_cef processor in Beats should be incorporated here. If there are any behavioral differences, we should identify them and evaluate whether they are justified. Our goal is to ensure that usages of decode_cef can be replaced with this new processor seamlessly.
  3. While adherence to the specification is the priority, it would also be interesting to compare performance metrics between the new processor and the existing decode_cef processor. The latter has two microbenchmarks, one for short messages and another for long messages. If we have the capability to conduct microbenchmarks, let's add these same two tests.

Additionally, there is a CEF v1 specification (our decode_cef processor was based on CEF v0). It would be worthwhile to review if any major changes in CEF v1 could impact the architecture of our code. See this issue for more details.

@dakrone added the :Distributed/Ingest Node label Mar 20, 2025

@andrewkroh (Member) commented

I asked Lee H about micro-benchmarking, and JMH is being used (see https://github.com/elastic/elasticsearch/tree/main/benchmarks#elasticsearch-microbenchmark-suite). So this could add a benchmark under that suite of tests.

@bhapas (Contributor, Author) commented Mar 20, 2025

Will this be comparable to the microbenchmarking that is done in the beats processor?

@joegallo (Contributor) commented

I rewrote the parsing implementation to scan through the input manually rather than relying on regexes; it's quite a bit faster this way. I also rewrote the date parsing so that it doesn't rely on a try/catch in a loop, because filling in all those ignored junk stack traces is expensive.

The changes I made pass all the same tests that were already here; I didn't touch the tests themselves:

joegallo@simulacron:~/Code/elastic/elasticsearch $ git diff --stat 4869f5a6c5b3..cd373b587493
 modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/CefParser.java | 281 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++----------------------------------------------------------------------
 1 file changed, 154 insertions(+), 127 deletions(-)

I'm going to benchmark this tomorrow to compare the before and after and quantify where we were in terms of performance versus where we are now. My guess is that we're close enough at this point that performance is no longer a concern, but I might be wrong about that.

If you wouldn't mind taking a little time during your next workday to review what I've written, I'd appreciate it.
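
To make the "scan manually instead of using regexes" idea concrete, here is a minimal Python sketch of splitting a CEF line into its seven header fields and the raw extension string in a single pass. It is an illustration under assumptions, not the `CefParser.java` code from this PR; the name `split_cef` is hypothetical, and the sketch assumes the header is always terminated by a seventh pipe.

```python
def split_cef(line: str):
    """Return (header_fields, raw_extensions) for one CEF line.

    Scans character by character: a backslash followed by a backslash or a
    pipe yields that character literally, and an unescaped pipe ends the
    current header field.
    """
    prefix = "CEF:"
    start = line.find(prefix)  # tolerate a syslog prefix before "CEF:"
    if start < 0:
        raise ValueError("not a CEF message")

    fields = []
    current = []
    i = start + len(prefix)
    while i < len(line) and len(fields) < 7:
        ch = line[i]
        if ch == "\\" and i + 1 < len(line) and line[i + 1] in "\\|":
            current.append(line[i + 1])  # keep the escaped character itself
            i += 2
            continue
        if ch == "|":
            fields.append("".join(current))
            current = []
        else:
            current.append(ch)
        i += 1

    if len(fields) < 7:
        raise ValueError("incomplete CEF header")
    # Everything after the seventh unescaped pipe is the extension part.
    return fields, line[i:]


if __name__ == "__main__":
    sample = (
        r"Sep 19 08:26:10 host CEF:0|security|threatmanager|1.0|100|"
        r"detected a \| in message|10|src=10.0.0.1 act=blocked a | dst=1.1.1.1"
    )
    print(split_cef(sample))
```

Each character is visited once and there is no backtracking, which is one plausible reason a hand-written scanner can outperform a regex on this kind of input.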

@bhapas (Contributor, Author) commented Dec 11, 2025

Thanks @joegallo for taking the time to look into this. The new manual parsing looks good. I was unsure about the error with unescaped equals signs, but the current tests, which cover all the scenarios in the spec, pass, and if performance has improved as well then I think this is the way forward.

Overall, the changes look good to me. Please let me know how the performance run goes, and we can push this if everything looks great.
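
For readers wondering why an unescaped equals sign is a problem, the following Python sketch parses the extension part the way the spec excerpt implies: keys are found by locating unescaped `=` characters, and a value runs until the next key begins. The names `unescape` and `parse_extensions` are hypothetical, and raising `ValueError` on a stray `=` is a choice made for this sketch; it is not a statement about how the processor in this PR behaves.

```python
def unescape(value: str) -> str:
    """Resolve the backslash escapes allowed in extension values."""
    mapping = {"\\": "\\", "=": "=", "n": "\n", "r": "\r"}
    out = []
    i = 0
    while i < len(value):
        if value[i] == "\\" and i + 1 < len(value) and value[i + 1] in mapping:
            out.append(mapping[value[i + 1]])
            i += 2
        else:
            out.append(value[i])
            i += 1
    return "".join(out)


def parse_extensions(ext: str):
    """Parse 'key=value key=value ...' where values may contain spaces."""
    # Pass 1: record the offset of every unescaped '='.
    equals = []
    i = 0
    while i < len(ext):
        if ext[i] == "\\":
            i += 2  # skip the backslash and the character it escapes
            continue
        if ext[i] == "=":
            equals.append(i)
        i += 1

    # Each key is the run of non-space characters right before its '='.
    key_starts = [ext.rfind(" ", 0, pos) + 1 for pos in equals]
    for n, pos in enumerate(equals):
        no_key = key_starts[n] == pos
        overlaps_previous = n > 0 and key_starts[n] <= equals[n - 1]
        if no_key or overlaps_previous:
            raise ValueError(f"unescaped '=' at offset {pos} has no key")

    # Pass 2: a value runs up to the space before the next key, or to the end.
    result = {}
    for n, pos in enumerate(equals):
        end = key_starts[n + 1] - 1 if n + 1 < len(equals) else len(ext)
        result[ext[key_starts[n]:pos]] = unescape(ext[pos + 1:end])
    return result


if __name__ == "__main__":
    print(parse_extensions(r"src=10.0.0.1 act=blocked a \= dst=1.1.1.1"))
```

With the escaped form, `act` parses as `blocked a =`. With a bare `=` in the same position there is no key in front of it, so the input is ambiguous and this sketch rejects it.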

@joegallo (Contributor) commented Dec 11, 2025

To save myself some future web searching (and for the benefit of future github archaeologists), here's a link to the ArcSight SmartConnectors 25.1 CEF Implementation Standard.

@joegallo (Contributor) commented

Caveats about microbenchmarking aside, here's this PR before my rewrite of the parser (that is, with the regex-based parser):

              {
                "cef" : {
                  "type" : "cef",
                  "stats" : {
                    "count" : 108000,
                    "time_in_millis" : 2856,
                    "current" : 0,
                    "failed" : 20000
                  }
                }
              }

And here it is for the same workload after my rewrite:

              {
                "cef" : {
                  "type" : "cef",
                  "stats" : {
                    "count" : 108000,
                    "time_in_millis" : 1378,
                    "current" : 0,
                    "failed" : 20000
                  }
                }
              }

So at this point we're averaging about 12.7 microseconds per invocation. The workload for this was generated from the CEF messages in the test fixtures, which explains why there are so many failures (some of the fixture files contain illegal input to demonstrate that we fail on it).
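
The per-invocation figure follows directly from the two stats blocks above: divide `time_in_millis` by `count`. A small Python sketch of that arithmetic (the helper name `micros_per_invocation` is made up for illustration, and it assumes `count` includes the failed invocations):

```python
def micros_per_invocation(time_in_millis: int, count: int) -> float:
    """Average processor time per invocation, in microseconds."""
    return time_in_millis * 1000 / count


# Numbers taken from the two stats blocks in this comment.
before = micros_per_invocation(2856, 108000)  # regex-based parser
after = micros_per_invocation(1378, 108000)   # manual scanning parser

if __name__ == "__main__":
    print(f"before: {before:.1f} us, after: {after:.1f} us, "
          f"speedup: {before / after:.2f}x")
```

That works out to a little over 26 microseconds before the rewrite and just under 13 after it, or roughly a 2x improvement on this workload.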

@joegallo (Contributor) commented

We'll need to add documentation for the new processor, and it'll need to be added to the spec for the benefit of Kibana and the clients. I'm okay with merging this PR as is and then iterating on those things in follow-up PRs.

@bhapas (Contributor, Author) commented Dec 16, 2025

OK. We can iterate on the docs in a new PR. Thanks a lot, @joegallo, for taking the time to look into this.

@bhapas merged commit f944dd2 into elastic:main Dec 16, 2025
35 checks passed
Labels

:Distributed/Ingest Node, >enhancement, external-contributor, Team:Data Management (obsolete), v9.3.0

5 participants