Skip to main content

Recursive Subsearches in Splunk

·5 mins

Splunk is a great tool to sift through large amounts of data. However, a search building a transaction across multiple different IDs posed a challenge even for Splunk. The easiest way to speed a search up, is to reduce the amount of data that has to be parsed. For this reason, I developed a recursive subsearch.

A subsearch will gather the different IDs, build a search string for every combination and save this string into a multi-value field. This multi-value field will then be flattened into a single string, using the nomv function, and passed into the outer search. The outer search can now select the needed data more precisely, greatly speeding up the search.

Pseudocode #

Suppose we have the following transaction, with events from other transactions scattered throughout it:

 idA  idB  idC _raw
1  Connection 1 established
1  871  Connection 1 has received data, processing in pipeline 871
 871  Pipeline 871 started processing
1  Connection 1 terminated
 871  Pipeline 871 processed successfully
792  871  Pipeline 871 is handing off to archival job 792
793  871  Pipeline 871 is handing off to archival job 792
792  Archiving agent 792 started
792  Archiving agent 792 wrote data to S3 storage
793  Archiving agent 793 started
792  Archiving agent 792 completed successfully
793  Archiving agent 793 wrote data to disk
793  Archiving agent 793 completed successfully
871  Pipeline 871 completed archiving, exiting

If we wanted to display a comprehensive transaction log for this data, we are faced with a challenge. Across the transaction, different IDs are used. If we simply ran the following search:

| search (idA=* OR idB=*) idC=871

We would get far too many results. Everything that either contains idA or idB without any idC would be added to our output. Using a subsearch, we could extract the specific IDs:

[ | search (idA=* OR idB=*) idC=871
  | stats values(idA) as idA, values(idB) as idB by idC
]

However, this would fail, as the fields are joined using the “AND” operator in the outer search and we have multiple values for idB. But using the values we extracted, we can build a custom search string, which will be passed to the outer search:

[ | search (idA=* OR idB=*) idC=871
  | stats values(idA) as idA, values(idB) as idB by idC
  | eval q_idA=if(isnull(idA), "", " OR idA=\"" . $idA$ . "\""), q_idB=if(isnull(idB), "", " OR idB IN (" . mvjoin($idB$, ", ") . ")")
  | eval search="(idC=\"" . $idC . "\"" . q_idA . q_idB . ")"
]

This builds partial queries for idA and idB. This is done to maintain semantic accuracy of the final search. idA is straightforward, it is set to an empty string if it is null, otherwise the query for idA if prepended with an “OR” operator. idB is a little more involved. It, too, is set to an empty string if null, else its values are concatenated using mvjoin and placed in the parenthesis of the splunk “IN” search operand. Using these parts, a final search can be constructed, simply joining the different parts. This is assigned the field name “search”. During trial and error, I discovered the outer search will execute the value in the “search” field as is. I have not yet seen a manual reference to this, so the behaviour may not be stable.

This works well, if we want the events from a single transaction. However, we can expand on this to enable us to gather the information for multiple relevant transactions:

[ | search (idA=* OR idB=*) idC=871
  | stats values(idA) as idA, values(idB) as idB by idC
  | eval q_idA=if(isnull(idA), "", " OR idA=\"" . $idA$ . "\""), q_idB=if(isnull(idB), "", " OR idB IN (" . mvjoin($idB$, ", ") . ")")
  | eval search="(idC=\"" . $idC . "\"" . q_idA . q_idB . ")"
  | stats delim=" OR " list(search) AS search
  | nomv search
]

In this search, we add the different searches into a single multi-value field and set Splunk to delimit the searches with the string " OR “. Afterwards, the multi-value field is flattened using the nomv command into a single, regular, field, which is then passed as the search query to the outer search operation.

Real-word application #

During processing of Cisco Email Security Appliance logs I was first faced with these issues. Due to the amount of data, Splunks transaction functioned quickly reached its limits.

To speed things up, the input data must be reduced, using the method described above. The log entries are not tagged with the same ID across the whole transaction. While receiving the message, an incoming connection ID (ICID) is offered, during internal processing the internal message ID (MID) is used, finally, during delivery, multiple delivery connection IDs (DCID) are referenced. ICID and DCIDs are associated to the MID using a log line each. A single subsearch would not return all needed information, so I designed a search in three steps:

  1. Select the MID of interest
  2. Gather the ICID and DCIDs associated with the MID
  3. Build a recursive search string from the subsearch
[ | search [ | search recipient="john.doe@example.org" | fields index, host, internal_message_id, sourcetype ]
| stats values(icid) AS icid, values(dcid) AS dcid BY index, host, internal_message_id, sourcetype
``` Build individual search strings for icid and dcid, in case one of them is null, return empty string if it is ```
| eval q_icid=if(isnull(icid), "", " OR icid=\"" . $icid$ . "\""), q_dcid=if(isnull(dcid), "", " OR dcid IN (" . mvjoin($dcid$, ", ") . ")")
``` Build individual search string for each message ```
| eval q="(index=\"" . $index$ . "\" host=\"" . $host$ . "\" (internal_message_id=\"" . $internal_message_id$ . "\"" . q_icid . q_dcid . "))"
``` Chain individual search strings into a single event, to generate a new search command, flatten the resulting multi-value field ```
| stats delim=" OR " list(q) AS search | nomv search ]
| transaction internal_message_id icid dcid
| search recipient="john.doe@example.org"
Oliver Springer
Author
Oliver Springer
Cybersecurity Engineer