Reactive Streams¶
Reactive Streams enable TCP-based, backpressured communication between thatDot products. Use them to connect Quine Enterprise and Novelty for bidirectional data processing pipelines.
| Role | Description | Use Case |
|---|---|---|
| Publisher (Server) | Binds to a port and broadcasts data to connected subscribers | Standing query outputs, observation results |
| Subscriber (Client) | Connects to a publisher to receive streamed data | Ingest from another product's output |
ReactiveStream as Ingest Source¶
The ReactiveStream ingest source connects as a client to an existing reactive stream publisher. Use this to ingest data from another product's standing query or observation output.
Example¶
In this example, we configure Quine to ingest data from a reactive stream server running on port 9002.
Registering via the API¶
To register a ReactiveStream ingest via Create Ingest Stream: POST /api/v2/ingests:
{
"name": "from-reactive-stream",
"source": {
"type": "ReactiveStream",
"url": "localhost",
"port": 9002,
"format": {
"type": "Json"
}
},
"query": "CREATE (n:Event $that)",
"parameter": "that",
"parallelism": 1
}
Using a Recipe¶
The same ingest stream defined in a Recipe:
ingestStreams:
- type: ReactiveStream
url: localhost
port: 9002
format:
type: CypherJson
query: |-
MATCH (n)
WHERE id(n) = idFrom($that.id)
SET n = $that
Ingest Source Configuration Reference¶
| Parameter | Type | Required | Description |
|---|---|---|---|
type |
string | Yes | Must be "ReactiveStream" |
url |
string | Yes | Hostname of the reactive stream server to connect to |
port |
integer | Yes | Port of the reactive stream server |
format |
object | Yes | Record format (e.g., Json, CypherJson) |
ReactiveStream as Output Destination¶
The ReactiveStream output destination creates a server that broadcasts results to connected subscribers. Use this to publish standing query results for downstream processing by other products.
Example¶
Configure a standing query to output results to a reactive stream on port 9001:
{
"name": "events-to-downstream",
"pattern": {
"type": "Cypher",
"query": "MATCH (n:Event) RETURN DISTINCT id(n) AS id",
"mode": "DistinctId"
},
"outputs": [
{
"name": "to-reactive-stream",
"destinations": [
{
"type": "ReactiveStream",
"address": "0.0.0.0",
"port": 9001,
"format": {
"type": "JSON"
}
}
]
}
]
}
Output Destination Configuration Reference¶
| Parameter | Type | Required | Default | Description |
|---|---|---|---|---|
type |
string | Yes | — | Must be "ReactiveStream" |
address |
string | No | localhost |
Address to bind the reactive stream server |
port |
integer | Yes | — | Port to bind the reactive stream server |
format |
object | Yes | — | Output format (JSON or Protobuf) |
Cluster Limitation
Reactive Stream outputs do not function correctly when running in a cluster. Use Kafka or Kinesis for clustered deployments.
Connecting Quine Enterprise and Novelty¶
Reactive Streams enable powerful bidirectional data pipelines between Quine Enterprise and Novelty. This pattern allows you to:
- Process graph data in Quine Enterprise
- Stream results to Novelty for anomaly detection
- Feed Novelty's observations back into Quine Enterprise for further analysis
Architecture¶
┌─────────────────────┐ ┌─────────────────────┐
│ Quine Enterprise │ │ Novelty │
│ │ │ │
│ ┌───────────────┐ │ ReactiveStream │ ┌───────────────┐ │
│ │ Standing Query├──┼──── port 9001 ────►│──┤ Ingest │ │
│ └───────────────┘ │ │ └───────┬───────┘ │
│ │ │ │ │
│ │ │ ▼ │
│ │ │ ┌───────────────┐ │
│ ┌───────────────┐ │ ReactiveStream │ │ Observations │ │
│ │ Ingest │◄─┼──── port 9002 ─────┤──┤ Output │ │
│ └───────────────┘ │ │ └───────────────┘ │
│ │ │ │
└─────────────────────┘ └─────────────────────┘
Step 1: Quine Enterprise Standing Query Output¶
Configure Quine Enterprise to output standing query results to a reactive stream:
{
"name": "events-to-novelty",
"pattern": {
"type": "Cypher",
"query": "MATCH (n:Event) RETURN DISTINCT id(n) AS id",
"mode": "DistinctId"
},
"outputs": [
{
"name": "to-novelty",
"resultEnrichment": {
"query": "MATCH (n) WHERE id(n) = $that.data.id RETURN {eventId: id(n), type: n.type, timestamp: n.timestamp} AS result",
"parameter": "that",
"parallelism": 1
},
"destinations": [
{
"type": "ReactiveStream",
"address": "0.0.0.0",
"port": 9001,
"format": {
"type": "JSON"
}
}
]
}
]
}
Step 2: Create a Novelty Namespace¶
Before creating an ingest in Novelty, you must create a namespace to store observations. Namespaces must be 1–16 characters, start with a letter, and contain only letters and digits (no hyphens, underscores, or special characters).
curl -X POST http://localhost:8181/api/v2/namespaces \
-H 'Content-Type: text/plain' \
-d 'eventanalysis'
Returns 201 Created if the namespace is new, or 204 No Content if it already exists.
Step 3: Novelty Ingest from Quine Enterprise¶
Configure Novelty to ingest from Quine Enterprise's reactive stream and output observations to another reactive stream. The noveltyNamespace field specifies which namespace to store observations in — this namespace must already exist.
{
"name": "from-quine",
"noveltyNamespace": "eventanalysis",
"transformation": "event-transform",
"source": {
"type": "ReactiveStream",
"url": "localhost",
"port": 9001,
"format": {
"type": "Json"
}
},
"parallelism": 1,
"outputWorkflow": {
"type": "EachResult",
"destinations": [
{
"type": "ReactiveStream",
"address": "0.0.0.0",
"port": 9002,
"format": {
"type": "JSON"
}
}
]
}
}
Step 4: Quine Enterprise Ingest from Novelty¶
Configure Quine Enterprise to ingest Novelty's observations:
{
"name": "from-novelty",
"source": {
"type": "ReactiveStream",
"url": "localhost",
"port": 9002,
"format": {
"type": "Json"
}
},
"query": "CREATE (n:NoveltyObservation {score: $that.score, observation: $that.observation, sequence: $that.sequence, processedAt: datetime()})",
"parameter": "that",
"parallelism": 1
}
Troubleshooting¶
Connection Refused¶
If you see connection errors when starting a ReactiveStream ingest:
- Verify the publisher (server) is running and bound to the expected port
- Check that the
urlandportin your ingest configuration match the publisher'saddressandport - Ensure no firewall rules are blocking the connection
No Data Received¶
If the connection succeeds but no data flows:
- Verify the format types match between publisher and subscriber (e.g., both using
Json) - Check that the standing query or observation output is actively producing results
- Use the standing query statistics API to verify results are being generated
Backpressure¶
If data processing slows or stalls:
- The subscriber may be overwhelmed; increase
parallelismor optimize your ingest query - Check downstream systems (persistor, standing queries) for bottlenecks
- Monitor Quine metrics for queue depth and processing rates
Additional Resources¶
- Standing Queries - Configure standing query outputs
- Ingest Streams Overview - General ingest configuration
- REST API Reference - Complete API documentation