Data Enrichment with Webhooks
Full Recipe¶
Shared by: Matthew Pagan
This recipe uses the NumberIteratorIngest to stream numbers into the graph. A Standing Query observes when numbers are manifested into the graph, logs them to the console, and then sends those numbers to an HTTP endpoint. The service behind that endpoint then enriches the graph by calculating the factors of each number and creating edges between the number nodes and their factors.
Standing Query Output to HTTP Endpoint
Scenario¶
There may come a time when you want to enrich the graph's data from an external service. This is a great use case for a Quine Standing Query that outputs data to an HTTP endpoint. Once we send the external service the data needed to identify a node, that service can run Cypher queries against Quine to enrich the node.
In this example, we'll simplify everything to demonstrate back-and-forth communication between Quine and an external service:
- A NumberIteratorIngest will be used to stream in 13 numbers, 1-13.
- A Standing Query monitoring for the creation of these numbers will then:
  - log them to the console
  - POST them to a Python Flask service, where we can observe data coming in from Quine.
- The Flask service will then calculate the factors of the numbers, and create edges between numbers and their factors.
- The end result will be 13 Number nodes with edges between numbers and their factors.
How it Works¶
The recipe uses the NumberIteratorIngest ingest stream to stream 13 numbers, 1-13, into the graph.
ingestStreams:
  - type: NumberIteratorIngest
    startAtOffset: 1
    ingestLimit: 13
    format:
      type: CypherLine
      query: |-
        WITH toInteger($that) AS number
        MATCH (n) WHERE id(n) = idFrom("Number", number)
        SET n:Number, n.number = number
A standing query is configured to watch for numbers being manifested into the graph.
- pattern:
    type: Cypher
    mode: DistinctId
    query: |-
      MATCH (n:Number)
      WHERE n.number IS NOT NULL
      RETURN DISTINCT id(n) AS id
When the pattern is detected, the recipe's two Standing Query Outputs send the pattern results: one to the console and the other to an HTTP endpoint.
Log to Console
log-to-console:
  type: CypherQuery
  query: |-
    MATCH (n:Number)
    WHERE id(n) = $that.data.id
    RETURN n.number AS number, $that.data.id AS id
  andThen:
    type: PrintToStandardOut
POST to HTTP Endpoint
post-to-webhook:
  type: CypherQuery
  query: |-
    MATCH (n:Number)
    WHERE id(n) = $that.data.id
    RETURN n.number AS number, $that.data.id AS id
  andThen:
    type: PostToEndpoint
    url: http://127.0.0.1:3000/webhook
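To make the POST concrete, here is a minimal Python sketch of how a receiver might unpack the body that the post-to-webhook output sends. The sample body copies the shape shown in the comments of the Flask service in this recipe. The helper name `extract_number` is hypothetical, and skipping results whose `isPositiveMatch` is false is an assumption made for illustration, not something the recipe itself does.

```python
import json

# Sample body in the shape shown in this recipe's Flask service comments.
SAMPLE_BODY = json.dumps(
    {
        "meta": {
            "isPositiveMatch": True,
            "resultId": "0c89ce9e-16b0-71e1-ad1c-6ead813bed1b",
        },
        "data": {"number": 9, "id": "ddf60681-6476-3322-815b-ed093f5aa937"},
    }
)


def extract_number(body):
    """Hypothetical helper: return (number, node_id), or None to skip the result."""
    payload = json.loads(body)
    # Assumption: only act on positive matches and ignore anything else.
    if not payload.get("meta", {}).get("isPositiveMatch", False):
        return None
    data = payload["data"]
    return data["number"], data["id"]


print(extract_number(SAMPLE_BODY))
# (9, 'ddf60681-6476-3322-815b-ed093f5aa937')
```

The `number` and `id` keys come straight from the column aliases in the output's `RETURN` clause, so renaming an alias in the recipe changes the key the service has to read.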
Python Flask HTTP Service
This Python service defines the endpoint to which Quine will send the standing query output. The service:
- Retrieves the number property from the node sent by the standing query
- Uses that number to generate the node's id (via idFrom)
- Creates edges between numbers and their factors by sending a Cypher Query back to Quine via the POST /api/v1/query/cypher endpoint.
from flask import Flask, request
import requests
import time
import json

app = Flask(__name__)


def calculate_factors(number):
    """Return the factors of `number`, excluding the number itself."""
    factors = []
    # range(1, number) already stops before `number`, so no extra check is needed.
    for i in range(1, number):
        if number % i == 0:
            factors.append(i)
    return factors


@app.route("/webhook", methods=["POST"])
def webhook():
    data = request.json
    print("Webhook received:", data)
    # {
    #     "meta": {
    #         "isPositiveMatch": True,
    #         "resultId": "0c89ce9e-16b0-71e1-ad1c-6ead813bed1b",
    #     },
    #     "data": {"number": 9, "id": "ddf60681-6476-3322-815b-ed093f5aa937"},
    # }
    number = data["data"]["number"]
    factors = calculate_factors(number)
    # Serialize the factors as a list literal that Cypher can UNWIND
    factors_list = json.dumps(factors)

    # Wait for Quine to be ready
    time.sleep(2)

    # Create an edge from each factor node to the number node
    query = f"""\
    UNWIND {factors_list} AS factor
    MATCH (n), (m)
    WHERE id(n) = idFrom("Number", {number})
      AND id(m) = idFrom("Number", factor)
    CREATE (m)-[:FACTOR_OF]->(n)\
    """
    requests.post(
        "http://localhost:8080/api/v1/query/cypher",
        headers={"Content-Type": "text/plain"},
        data=query,
    )
    return "Webhook received and processed", 200


if __name__ == "__main__":
    app.run(port=3000)
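The Cypher text that the service sends back can be inspected without running Quine or Flask. The following is a minimal sketch that moves the query construction into a hypothetical helper, `build_factor_query`. The integer check is an added precaution, not part of the original service: because the number is interpolated directly into query text, the sketch assumes it is worth rejecting anything that is not a plain integer.

```python
import json


def calculate_factors(number):
    """Factors of `number`, excluding the number itself (same rule as the service)."""
    return [i for i in range(1, number) if number % i == 0]


def build_factor_query(number):
    """Hypothetical helper: return the Cypher text the service would send."""
    # Assumption: refuse non-integers before interpolating into query text.
    # bool is a subclass of int in Python, so it is excluded explicitly.
    if isinstance(number, bool) or not isinstance(number, int):
        raise ValueError(f"expected an integer, got {number!r}")
    factors_list = json.dumps(calculate_factors(number))
    return (
        f"UNWIND {factors_list} AS factor\n"
        "MATCH (n), (m)\n"
        f'WHERE id(n) = idFrom("Number", {number})\n'
        '  AND id(m) = idFrom("Number", factor)\n'
        "CREATE (m)-[:FACTOR_OF]->(n)"
    )


print(build_factor_query(9))
```

For the sample payload with number 9, the first line of the printed query is `UNWIND [1, 3] AS factor`. For number 1 the list is empty, so `UNWIND` yields no rows and no edges are created.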
Running the Recipe¶
Start Python HTTP Service
- Copy over the Python code for the HTTP service and save it (server.py for example)
- Create a new environment for the Python script
  python -m venv .venv
- Install script dependencies
  pip install flask requests
- Run service
  python server.py
Start Recipe
java -jar quine.jar -r webhook.yaml
This command serves the application on http://127.0.0.1:8080
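With Quine serving on that address, the same `POST /api/v1/query/cypher` endpoint that the Flask service uses can also be called from a script to check the result. The sketch below uses only the Python standard library. The check query `EDGE_QUERY` and the helper names are assumptions made for illustration, and the response is returned as raw text because no particular response layout is assumed here.

```python
import urllib.request

QUINE_URL = "http://127.0.0.1:8080"  # address served by the recipe command

# Assumed check query: list every factor edge created by the service.
EDGE_QUERY = (
    "MATCH (m:Number)-[:FACTOR_OF]->(n:Number) "
    "RETURN m.number AS factor, n.number AS number"
)


def build_cypher_request(query, base_url=QUINE_URL):
    """Build (but do not send) a POST to Quine's Cypher query endpoint."""
    return urllib.request.Request(
        f"{base_url}/api/v1/query/cypher",
        data=query.encode("utf-8"),
        headers={"Content-Type": "text/plain"},
        method="POST",
    )


def run_cypher(query, base_url=QUINE_URL, timeout=10):
    """Send the query to a running Quine and return the raw response text."""
    request = build_cypher_request(query, base_url)
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return response.read().decode("utf-8")
```

Once both Quine and the Flask service are running, calling `print(run_cypher(EDGE_QUERY))` should print the response body for the factor edges. The call fails with a connection error if Quine is not up yet.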
Observe Nodes Manifested in Graph
After starting the Python service and running Quine with the recipe, open the Exploration UI. The recipe includes a sample query that loads all the Number nodes. You will see 13 nodes. These nodes were streamed into the graph as the numbers 1-13, and the external Python Flask service, on receiving each standing query POST, created the edges between these numbers and their factors.
Here is a picture of the relationships between the number nodes.
Tip
Since the number 1 is a factor of every integer, after loading up the graph, I selected the 1 node and used the DELETE key to remove it from the Exploration UI so that the graph was a bit clearer to observe.
Note that, with the 1 node removed from view, no edges point TO any prime number. A prime either has no edges at all, or has only outgoing edges because it is a factor of other numbers. Prime numbers have no factors other than themselves and 1, so it makes sense that nothing else points TO them.
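The observation about primes can be checked offline with a short Python sketch that mirrors the service's factor rule and hides node 1, as in the tip. The function names are hypothetical and nothing here talks to Quine; it only recomputes which of the 13 nodes would have no incoming FACTOR_OF edge.

```python
def proper_factors(number):
    """Factors of `number`, excluding the number itself (same rule as the service)."""
    return [i for i in range(1, number) if number % i == 0]


def nodes_without_incoming_edges(limit=13, hidden=(1,)):
    """Return visible numbers in 1..limit that no visible factor points to."""
    visible = [n for n in range(1, limit + 1) if n not in hidden]
    result = []
    for n in visible:
        # Each remaining factor f corresponds to an edge (f)-[:FACTOR_OF]->(n).
        incoming = [f for f in proper_factors(n) if f not in hidden]
        if not incoming:
            result.append(n)
    return result


print(nodes_without_incoming_edges())
# [2, 3, 5, 7, 11, 13]
```

With node 1 hidden, the nodes with no incoming edges are exactly the primes up to 13. Calling `nodes_without_incoming_edges(hidden=())` returns `[1]` instead, since every other number receives an edge from 1.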
Summary¶
This recipe showed a simple example of two-way communication between Quine and an external service:
- Quine -> Service via Standing Query Output to an HTTP Endpoint
- Service -> Quine via Cypher Queries to POST /api/v1/query/cypher
While this example only calculated factors of numbers, it is not much of a stretch to use the same pattern to enrich a node with data from another service or a database.