Temporal Locality

Full Recipe

Shared by: Michael Aglietti

This recipe looks for emails sent or received by cto@company.com within a sliding window as a means of highlighting a technique for matching on temporal locality of nodes in standing queries.

Temporal Locality Recipe
version: 1
title: Temporal Locality Example
contributor: https://github.com/maglietti
summary: Relate email messages sent or received by a specific user within a 4-6 minute window.
description: |-
        This recipe looks for emails sent or received by cto@company.com within a 4-6 minute
        window as a means of highlighting a technique for matching on temporal locality of nodes.

ingestStreams:
  - type: FileIngest
    path: email.json
    format:
      type: CypherJson
      query: |-
        MATCH (sender), (message) 
        WHERE id(sender) = idFrom('email', $that.from)
          AND id(message) = idFrom('message', $that) 

        SET sender.email = $that.from,
            sender: Email,
            message.from = $that.from,
            message.to = $that.to,
            message.subject = $that.subject,
            message.time = datetime({ epochMillis: $that.time}),
            message: Message

        CREATE (sender)-[:SENT_MSG]->(message)

        WITH $that as t, message
        UNWIND t.to AS rcv
        MATCH (receiver)
        WHERE id(receiver) = idFrom('email', rcv)

        SET receiver.email = rcv,
            receiver: Email

        CREATE (message)-[:RECEIVED_MSG]->(receiver)

standingQueries:
   - pattern:
       type: Cypher
       mode: MultipleValues
       query: |-
         MATCH (n)-[:SENT_MSG]->(m)-[:RECEIVED_MSG]->(r)
         WHERE n.email="cto@company.com" OR r.email="cto@company.com"
         RETURN id(n) as ctoId, id(m) as ctoMsgId, m.time as mTime, id(r) as recId
     outputs:
       withinFourToSixMinuteWindow:
         type: CypherQuery
         query: |-
           MATCH (n)-[:SENT_MSG]->(m)-[:RECEIVED_MSG]->(r), (thisMsg)
           WHERE id(n) = $that.data.ctoId
             AND id(r) = $that.data.recId
             AND id(thisMsg) = $that.data.ctoMsgId
             AND id(m) <> id(thisMsg)
             AND duration("PT6M") > duration.between(m.time,thisMsg.time) > duration("PT4M")

           CREATE (m)-[:IN_WINDOW]->(thisMsg)
           CREATE (m)<-[:IN_WINDOW]-(thisMsg)

           WITH n, m, r, "http://localhost:8080/#MATCH" + text.urlencode(' (n)-[:SENT_MSG]->(m)-[:RECEIVED_MSG]->(r) WHERE strId(n)="' + strId(n) + '"AND strId(r)="' + strId(r) + '" AND  strId(m)="' + strId(m) + '" RETURN n, r, m') as URL

           RETURN URL
         andThen:
          type: PrintToStandardOut

nodeAppearances:
  - predicate:
      propertyKeys:
        - email
      knownValues:
        email: "cto@company.com"
      dbLabel: Email
    icon: ion-android-person
    color: "#F44336"
    size:
    label:
      type: Property
      key: email
  - predicate:
      propertyKeys: []
      knownValues: {}
      dbLabel: Email
    icon: ion-android-person
    color: "#2ECC71"
    size:
    label:
      type: Property
      key: email
  - predicate:
      propertyKeys: []
      knownValues: {}
      dbLabel: Message
    icon: ion-ios-email-outline
    color: "#2ECC71"
    size:
    label:
      type: Property
      key: subject

quickQueries:
  - predicate:
      propertyKeys: [ ]
      knownValues: {}
    quickQuery:
      name: "[Node] Adjacent Nodes"
      querySuffix: MATCH (n)--(m) RETURN DISTINCT m
      queryLanguage: Cypher
      sort: Node
  - predicate:
      propertyKeys: []
      knownValues: {}
    quickQuery:
      name: "[Node] Refresh"
      querySuffix: RETURN n
      queryLanguage: Cypher
      sort: Node
  - predicate:
      propertyKeys: []
      knownValues: {}
    quickQuery:
      name: "[Text] Local Properties"
      querySuffix: RETURN id(n), properties(n)
      queryLanguage: Cypher
      sort: Text
  - predicate:
      propertyKeys: []
      knownValues: {}
      dbLabel: Message
    quickQuery:
      name: "[Node] Messages in Window"
      querySuffix: MATCH (n)-[:IN_WINDOW]-(m) RETURN n,m
      queryLanguage: Cypher
      sort: Node
  - predicate:
      propertyKeys: []
      knownValues: {}
      dbLabel: Message
    quickQuery:
      name: "[Text] Table of Messages in Window"
      querySuffix: MATCH (n)-[r:IN_WINDOW]-(m) RETURN DISTINCT n.time AS MSG1_TIME, n.subject AS MSG1_SUBJECT, m.time AS MSG2_TIME, m.subject AS MSG2_SUBJECT, toString(abs(duration.between(n.time,m.time).seconds/60)) + " Minutes " + toString(abs(duration.between(n.time,m.time).seconds)-abs(duration.between(n.time,m.time).seconds/60)*60) + " Seconds" AS DELTA_TIME
      queryLanguage: Cypher
      sort: Text

sampleQueries: []

Download Recipe

Scenario

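In this scenario, records containing metadata for almost 295,000 emails are ingested to identify emails sent to or from a specific email address that fall within a sliding window of one another. The window is two minutes wide and covers messages that are between four and six minutes apart. Each record has the following shape: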

{
    "from": <sender>,
    "to": [<recipients>],
    "subject": <email subject line>,
    "time": <epoch time timestamp>,
    "sequence": <sequence number>
}
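
To try the recipe on a small file before downloading the full data set, records of this shape can be written by hand. The following is a minimal Python sketch, not part of the recipe. The addresses, subjects, timestamp, and the helper name `make_record` are hypothetical. It assumes `time` is in epoch milliseconds (as the ingest query's `datetime({epochMillis: ...})` call suggests) and that the file holds one JSON object per line.

```python
import json

def make_record(sender, recipients, subject, epoch_millis, sequence):
    """Build one email metadata record in the shape shown above."""
    return {
        "from": sender,
        "to": list(recipients),
        "subject": subject,
        "time": epoch_millis,
        "sequence": sequence,
    }

# Hypothetical sample: two messages from the CTO, five minutes apart.
BASE = 1_677_153_600_000  # arbitrary epoch-millisecond timestamp
records = [
    make_record("cto@company.com", ["dev@company.com"], "Status", BASE, 1),
    make_record("cto@company.com", ["dev@company.com"], "Follow-up",
                BASE + 5 * 60 * 1000, 2),
]

# One JSON object per line, matching how the ingest stream reads the file.
lines = "\n".join(json.dumps(r) for r in records)
print(lines)
```

Saving the printed lines to a file and pointing the ingest stream's `path` at it should produce a single pair of messages inside the four-to-six-minute window.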

Sample Data

Download the sample data to the same directory as the recipe and where Quine will be run.

Download email.json

How it Works

The recipe reads data from a sample data file using an ingest stream and parses each line into sender, receiver and message nodes to manifest a graph in Quine.

INGEST-1 processes the email.json file:

  - type: FileIngest
    path: email.json
    format:
        type: CypherJson
        query: |-
            MATCH (sender), (message)
            WHERE id(sender) = idFrom('email', $that.from)
            AND id(message) = idFrom('message', $that)

            SET sender.email = $that.from,
                sender: Email,
                message.from = $that.from,
                message.to = $that.to,
                message.subject = $that.subject,
                message.time = datetime({epochMillis: $that.time}),
                message: Message

            CREATE (sender)-[:SENT_MSG]->(message)

            WITH $that AS t, message
            UNWIND t.to AS rcv
            MATCH (receiver)
            WHERE id(receiver) = idFrom('email', rcv)

            SET receiver.email = rcv,
                receiver: Email

            CREATE (message)-[:RECEIVED_MSG]->(receiver)

POST /api/v1/ingest/INGEST-1
{
  "type": "FileIngest",
  "path": "email.json",
  "format": {
    "type": "CypherJson",
    "query": "MATCH (sender), (message) \nWHERE id(sender) = idFrom('email', $that.from)\n AND id(message) = idFrom('message', $that) \n\nSET sender.email = $that.from,\n sender: Email,\n message.from = $that.from,\n message.to = $that.to,\n message.subject = $that.subject,\n message.time = datetime({ epochMillis: $that.time}),\n message: Message\n\nCREATE (sender)-[:SENT_MSG]->(message)\n\nWITH $that as t, message\nUNWIND t.to AS rcv\nMATCH (receiver)\nWHERE id(receiver) = idFrom('email', rcv)\n\nSET receiver.email = rcv,\n receiver: Email\n\nCREATE (message)-[:RECEIVED_MSG]->(receiver)"
  }
}
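
The graph shape that the ingest query builds from one record can be sketched outside Quine. The Python below is only a model of the resulting edges, not of how Quine stores or identifies nodes. The function name `edges_for_record` and the sample record are hypothetical, and the message node is keyed on the sequence number purely for readability (the recipe keys it on the whole record via `idFrom('message', $that)`).

```python
def edges_for_record(record):
    """Return the (source, edge label, target) triples one record produces."""
    sender = ("email", record["from"])
    # Stand-in key for the message node (assumption, see the note above).
    message = ("message", record["sequence"])
    edges = [(sender, "SENT_MSG", message)]
    # Mirrors UNWIND t.to: one RECEIVED_MSG edge per recipient.
    for rcv in record["to"]:
        edges.append((message, "RECEIVED_MSG", ("email", rcv)))
    return edges

# Hypothetical record with two recipients.
record = {
    "from": "cto@company.com",
    "to": ["a@company.com", "b@company.com"],
    "subject": "Hello",
    "time": 1677153600000,
    "sequence": 7,
}
for edge in edges_for_record(record):
    print(edge)
```

A record with two recipients yields one `SENT_MSG` edge and two `RECEIVED_MSG` edges, so the standing query pattern can match once per recipient.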

A standing query is configured to detect when emails sent or received by cto@company.com occur between four and six minutes apart, which gives a sliding window two minutes wide.

The pattern query matches each individual (sender)-[:SENT_MSG]->(message)-[:RECEIVED_MSG]->(receiver) pattern.

- pattern:
    type: Cypher
    mode: MultipleValues
    query: |-
        MATCH (n)-[:SENT_MSG]->(m)-[:RECEIVED_MSG]->(r)
        WHERE n.email="cto@company.com" OR r.email="cto@company.com"
        RETURN id(n) as ctoId, id(m) as ctoMsgId, m.time as mTime, id(r) as recId

Once the pattern query matches the (sender)-[:SENT_MSG]->(message)-[:RECEIVED_MSG]->(receiver) pattern, an event is sent to an output query to calculate the temporal locality using the Cypher duration.between() function to establish a sliding window of interest.

duration("PT6M") > duration.between(m.time,thisMsg.time) > duration("PT4M")

The expression uses Cypher duration values written in ISO 8601 format, PT4M and PT6M, to express the window as 4 minutes < duration < 6 minutes. This comparison works because the ingest query converts each record's epoch timestamp to a datetime value:

message.time = datetime({epochMillis: $that.time})

Otherwise, we would have to convert the timestamps inside the standing query output:

AND duration("PT6M") > duration.between(datetime({epochMillis: m.time}),datetime({epochMillis: thisMsg.time})) > duration("PT4M")

Using the former pattern allows us to express the standing query in a clean, simple manner.

    outputs:
      withinFourToSixMinuteWindow:
        type: CypherQuery
        query: |-
           MATCH (n)-[:SENT_MSG]->(m)-[:RECEIVED_MSG]->(r), (thisMsg)
           WHERE id(n) = $that.data.ctoId
             AND id(r) = $that.data.recId
             AND id(thisMsg) = $that.data.ctoMsgId
             AND id(m) <> id(thisMsg)
             AND duration("PT6M") > duration.between(m.time,thisMsg.time) > duration("PT4M")

           CREATE (m)-[:IN_WINDOW]->(thisMsg)
           CREATE (m)<-[:IN_WINDOW]-(thisMsg)

           WITH n, m, r, "http://localhost:8080/#MATCH" + text.urlencode(' (n)-[:SENT_MSG]->(m)-[:RECEIVED_MSG]->(r) WHERE strId(n)="' + strId(n) + '"AND strId(r)="' + strId(r) + '" AND  strId(m)="' + strId(m) + '" RETURN n, r, m') as URL

           RETURN URL
        andThen:
          type: PrintToStandardOut
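
The window test in the output query can be checked outside Quine. The Python below is a minimal sketch under two assumptions: that `duration.between(a, b)` measures the signed time from `a` to `b`, and that both bounds are strict, as the `>` operators suggest. The helper names `to_datetime` and `in_window` and the sample timestamp are hypothetical.

```python
from datetime import datetime, timedelta, timezone

LOWER = timedelta(minutes=4)  # corresponds to duration("PT4M")
UPPER = timedelta(minutes=6)  # corresponds to duration("PT6M")
EPOCH = datetime.fromtimestamp(0, tz=timezone.utc)

def to_datetime(epoch_millis):
    """Convert epoch milliseconds to a datetime, as the ingest query does."""
    return EPOCH + timedelta(milliseconds=epoch_millis)

def in_window(m_time, this_time):
    """True when this_time is strictly between 4 and 6 minutes after m_time."""
    delta = this_time - m_time
    return LOWER < delta < UPPER

# Hypothetical timestamp for the message that triggered the match.
this_msg = to_datetime(1_677_153_600_000)
for minutes in (3, 4, 5, 6):
    earlier = this_msg - timedelta(minutes=minutes)
    print(minutes, in_window(earlier, this_msg))
```

Under these assumptions, messages exactly four or exactly six minutes apart fall outside the window, and only the five-minute case above passes.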

When a complete pattern match is detected, the recipe prints a link to the console that an analyst can use to review the event in Quine's Exploration UI.

2023-02-23 12:04:24,981 Standing query `withinFourToSixMinuteWindow` match: {"meta":{"isPositiveMatch":true,"resultId":"628d6523-4d25-4cb4-aed2-9890ecf0cb9c"},"data":{"URL":"http://localhost:8080/#MATCH%20%28n%29-%5B%3ASENT_MSG%5D-%3E%28m%29-%5B%3ARECEIVED_MSG%5D-%3E%28r%29%20WHERE%20strId%28n%29%3D%22a69876bc-8876-37e9-b776-471507a080b9%22AND%20strId%28r%29%3D%2299eeafc4-3178-3aca-8c7c-f84d0781f3a1%22%20AND%20%20strId%28m%29%3D%229e66e020-45ad-3971-92ec-60be8fa844e7%22%20RETURN%20n%2C%20r%2C%20m"}}
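
The link is URL-encoded, which makes the embedded query hard to read in the console. A minimal Python sketch for recovering it from a match line follows. The function name `query_from_match` is hypothetical, the sample line is shortened for illustration, and the sketch assumes the JSON payload starts at the first `{` on the line.

```python
import json
from urllib.parse import unquote

def query_from_match(line):
    """Return the decoded Cypher query carried in a standing query match line."""
    payload = json.loads(line[line.index("{"):])  # JSON starts at the first brace
    url = payload["data"]["URL"]
    # Everything after '#' is the URL-encoded query.
    return unquote(url.split("#", 1)[1])

# Shortened, hypothetical match line in the same format as the output above.
sample = (
    "2023-02-23 12:04:24,981 Standing query `withinFourToSixMinuteWindow` match: "
    '{"meta":{"isPositiveMatch":true},'
    '"data":{"URL":"http://localhost:8080/#MATCH%20%28n%29%20RETURN%20n"}}'
)
print(query_from_match(sample))  # prints: MATCH (n) RETURN n
```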

Running the Recipe

 java -jar quine-1.8.1.jar -r duration.yaml
Graph is ready
Running Recipe: Temporal Locality Example
Using 3 node appearances
Using 5 quick queries
Running Standing Query STANDING-1
Running Ingest Stream INGEST-1
Quine web server available at http://localhost:8080

Almost immediately, positive matches will begin to stream to your console window.

2023-02-23 14:12:05,165 Standing query `withinFourToSixMinuteWindow` match: {"meta":{"isPositiveMatch":true,"resultId":"52c981a4-394f-48c3-8419-671fe5f0c1d5"},"data":{"URL":"http://localhost:8080/#MATCH%20%28n%29-%5B%3ASENT_MSG%5D-%3E%28m%29-%5B%3ARECEIVED_MSG%5D-%3E%28r%29%20WHERE%20strId%28n%29%3D%22a69876bc-8876-37e9-b776-471507a080b9%22AND%20strId%28r%29%3D%22c57f8f3b-b567-3b7e-b2a2-c516f030ce2f%22%20AND%20%20strId%28m%29%3D%221c0bc726-4a19-363d-a623-f5df12142a72%22%20RETURN%20n%2C%20r%2C%20m"}}

Copy and paste the URL into your browser (or Cmd+click on a Mac) to open the Exploration UI and display an email event.

email event

Right-click on one of the email messages and select [Node] Messages in Window to find other messages that occurred within the time window.

messages in window

Continue to right-click and run queries to discover other messages that were sent but did not occur within the time window.

other email messages

Summary

This simple recipe shows how the duration.between() temporal function can implement a sliding window for matching events that happen close together in time.

Tip

Quick Queries are available by right clicking on a node.

| Quick Query | Node Type | Description |
| --- | --- | --- |
| Adjacent Nodes | All | Display the nodes that are adjacent to this node. |
| Refresh | All | Refresh the content stored in a node. |
| Local Properties | All | Display the properties stored by the node. |
| Messages in Window | Messages | Display all message nodes within the sliding window. |
| Table of Messages in Window | Messages | Return a list of messages and time deltas. |
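
The DELTA_TIME column of the Table of Messages in Window quick query splits the absolute time difference into whole minutes and leftover seconds. A minimal Python sketch of that arithmetic follows, assuming the difference is available as a whole number of seconds. `format_delta` is a hypothetical helper, not part of the recipe.

```python
def format_delta(seconds):
    """Format a signed difference in seconds as 'M Minutes S Seconds'."""
    # abs() makes the result independent of which message came first.
    minutes, remainder = divmod(abs(seconds), 60)
    return f"{minutes} Minutes {remainder} Seconds"

print(format_delta(270))   # 4 Minutes 30 Seconds
print(format_delta(-330))  # 5 Minutes 30 Seconds
```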