Skip to content

Data Enrichment with Webhooks

Full Recipe

Shared by: Matthew Pagan

This recipe uses the NumberIteratorIngest to stream numbers into the graph. A Standing Query observes when numbers are manifested into the graph, logs them to the console, and then sends those numbers to an HTTP Endpoint. The service powering the HTTP endpoint will then enrich the graph by calculating factors of those numbers, and then creating edges between the number nodes and their factors in the graph.

Standing Query Output to HTTP Endpoint
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
version: 1
title: Data Enrichment with Webhooks
contributor: https://github.com/mastapegs
summary: Stream numbers into graph and notify HTTP endpoint to enrich graph
description: |-
  This recipe will stream numbers into the graph and stream them out to an HTTP endpoint, which will
  then calculate the factors of those numbers, and create relationships between the numbers and their
  factors.
ingestStreams:
  - type: NumberIteratorIngest
    startAtOffset: 1
    ingestLimit: 13
    format:
      type: CypherLine
      query: |-
        WITH toInteger($that) AS number
        MATCH (n) WHERE id(n) = idFrom("Number", number)
        SET n:Number, n.number = number
standingQueries:
  - pattern:
      type: Cypher
      mode: DistinctId
      query: |-
        MATCH (n:Number)
        WHERE n.number IS NOT NULL
        RETURN DISTINCT id(n) AS id
    outputs:
      log-to-console:
        type: CypherQuery
        query: |-
          MATCH (n:Number)
          WHERE id(n) = $that.data.id
          RETURN n.number AS number, $that.data.id AS id
        andThen:
          type: PrintToStandardOut
      post-to-webhook:
        type: CypherQuery
        query: |-
          MATCH (n:Number)
          WHERE id(n) = $that.data.id
          RETURN n.number AS number, $that.data.id AS id
        andThen:
          type: PostToEndpoint
          url: http://127.0.0.1:3000/webhook
nodeAppearances:
  - predicate:
      propertyKeys: []
      knownValues: {}
      dbLabel: Number
    label:
      type: Property
      key: number
      prefix: "Number: "
quickQueries: []
sampleQueries:
  - name: Return all Number nodes
    query: MATCH (n:Number) RETURN n
statusQuery: null

Download Recipe

Scenario

There may come a time when you want to enrich the graph's data from an external service. This is a great use-case for Quine's Standing Queries to output data to an HTTP endpoint. By sending the external service data needed to identify a node, we can perform Cypher Queries to enrich that node.

In this example, we'll simplify everything to demonstrate back-and-forth communication between Quine and an external service:

  • A NumberIteratorIngest will be used to stream in 13 numbers, 1-13.
  • A Standing Query monitoring for the creation of these numbers will then:
    • log them to the console
    • POST them to a python Flask service, where we can observe data coming in from Quine.
  • The Flask service will then calculate the factors of the numbers, and create edges between numbers and their factors.
  • The end result will be 13 Number nodes with edges between numbers and their factors.

How it Works

The recipe uses the NumberIteratorIngest ingest stream to stream 13 numbers, 1-13, into the graph.

ingestStreams:
- type: NumberIteratorIngest
    startAtOffset: 1
    ingestLimit: 13
    format:
    type: CypherLine
    query: |-
        WITH toInteger($that) AS number
        MATCH (n) WHERE id(n) = idFrom("Number", number)
        SET n:Number, n.number = number

A standing query is configured to observe for the pattern of numbers being manifested into the graph.

- pattern:
    type: Cypher
    mode: DistinctId
    query: |-
        MATCH (n:Number)
        WHERE n.number IS NOT NULL
        RETURN DISTINCT id(n) AS id

When the pattern is detected, the recipe then defines 2 Standing Query Outputs to send the pattern results, one to the console, and the other to an HTTP endpoint.

Log to Console

log-to-console:
    type: CypherQuery
    query: |-
        MATCH (n:Number)
        WHERE id(n) = $that.data.id
        RETURN n.number AS number, $that.data.id AS id
    andThen:
        type: PrintToStandardOut

POST to HTTP Endpoint

post-to-webhook:
    type: CypherQuery
    query: |-
        MATCH (n:Number)
        WHERE id(n) = $that.data.id
        RETURN n.number AS number, $that.data.id AS id
    andThen:
        type: PostToEndpoint
        url: http://127.0.0.1:3000/webhook

Python Flask HTTP Service

This Python service defines the endpoint that Quine will send the standing query output.

  • Retrieves the number property from the node sent by the standing query
  • Uses that number to generate the node's id (via idFrom)
  • Create edges between numbers and their factors by sending a Cypher Query back to Quine via the POST /api/v1/query/cypher endpoint.
from flask import Flask, request
import requests
import time
import json

app = Flask(__name__)


def calculate_factors(number):
    factors = []
    for i in range(1, number):
        if number == i:
            continue
        if number % i == 0:
            factors.append(i)
    return factors


@app.route("/webhook", methods=["POST"])
def webhook():
    data = request.json
    print("Webhook received:", data)
    # {
    #     "meta": {
    #         "isPositiveMatch": True,
    #         "resultId": "0c89ce9e-16b0-71e1-ad1c-6ead813bed1b",
    #     },
    #     "data": {"number": 9, "id": "ddf60681-6476-3322-815b-ed093f5aa937"},
    # }

    number = data["data"]["number"]
    factors = calculate_factors(number)
    factors_list = json.dumps(factors)

    # Wait for Quine to be ready
    time.sleep(2)

    query = f"""\
UNWIND {factors_list} AS factor
MATCH (n), (m)
WHERE id(n) = idFrom("Number", {number})
AND id(m) = idFrom("Number", factor)
CREATE (m)-[:FACTOR_OF]->(n)\
"""

    requests.post(
        "http://localhost:8080/api/v1/query/cypher",
        headers={"Content-Type": "text/plain"},
        data=query,
    )

    return "Webhook received and processed", 200


if __name__ == "__main__":
    app.run(port=3000)

Running the Recipe

Start Python HTTP Service

  1. Copy over the Python code for the HTTP service and save it (server.py for example)
  2. Create a new environment for Python script
    python -m venv .venv
    
  3. Install script dependencies
    pip install flask requests
    
  4. Run service
    python server.py
    

Start Recipe

java -jar quine.jar -r webhook.yaml

This command serves the application on http://127.0.0.1:8080

Observe Nodes Manifested in Graph

After starting the Python service and running Quine with the recipe, load up the Exploration UI. The recipe includes a sample query that will load up all the Number nodes. You will observe that there are 13 nodes. These nodes were initially streamed into the graph ranging from 1-13, but the external python flask service, upon receiving the standing query POST, created edges between these numbers, and their factors.

Here is a picture of the relationships between the number nodes.

Tip

Since the number 1 is a factor of every integer, after loading up the graph, I selected the 1 node and used the DELETE key to remove it from the Exploration UI so that the graph was a bit clearer to observe.

Note how all prime numbers have NO edges pointing TO them. They only either have no edges, or they are factors of other numbers themselves. Prime numbers have no factors (beyond themselves and 1), so it makes sense that there are no edges pointing TO them.

number nodes

Summary

This recipe showed a simple example of how to facilitate communication between Quine, and an external service.

  • Quine -> Service via Standing Query Output to an HTTP Endpoint
  • Service -> Quine via Cypher Queries to POST /api/v1/query/cypher

While this example only calculated factors of numbers, it's not too much farther of a stretch to then add/enhance a node with data from another service, or a database.