Reactive Streams

Reactive Streams enable TCP-based, backpressured communication between thatDot products. Use them to connect Quine Enterprise and Novelty for bidirectional data processing pipelines.

Role | Description | Use Case
Publisher (Server) | Binds to a port and broadcasts data to connected subscribers | Standing query outputs, observation results
Subscriber (Client) | Connects to a publisher to receive streamed data | Ingest from another product's output

ReactiveStream as Ingest Source

The ReactiveStream ingest source connects as a client to an existing reactive stream publisher. Use this to ingest data from another product's standing query or observation output.

Example

In this example, we configure Quine to ingest data from a reactive stream server running on port 9002.

Registering via the API

To register a ReactiveStream ingest, call the Create Ingest Stream endpoint, POST /api/v2/ingests, with a body such as:

{
  "name": "from-reactive-stream",
  "source": {
    "type": "ReactiveStream",
    "url": "localhost",
    "port": 9002,
    "format": {
      "type": "Json"
    }
  },
  "query": "CREATE (n:Event $that)",
  "parameter": "that",
  "parallelism": 1
}
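
A minimal sketch of submitting this payload from Python with only the standard library. The base URL http://localhost:8080 is an assumption (this page does not state the host or port of Quine's API), and build_ingest_request and send are hypothetical helper names. Building the request is kept separate from sending it so the URL and body can be inspected first.

```python
import json
import urllib.request

# Assumption: Quine's REST API is reachable here; adjust to your deployment.
QUINE_BASE_URL = "http://localhost:8080"

# The ingest definition from the example above.
INGEST = {
    "name": "from-reactive-stream",
    "source": {
        "type": "ReactiveStream",
        "url": "localhost",
        "port": 9002,
        "format": {"type": "Json"},
    },
    "query": "CREATE (n:Event $that)",
    "parameter": "that",
    "parallelism": 1,
}


def build_ingest_request(payload, base_url=QUINE_BASE_URL):
    """Build (but do not send) a POST /api/v2/ingests request."""
    return urllib.request.Request(
        url=f"{base_url}/api/v2/ingests",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def send(request):
    """Send the request and return the HTTP status code (needs a running server)."""
    with urllib.request.urlopen(request) as response:
        return response.status
```

With a server running, send(build_ingest_request(INGEST)) would submit the ingest definition.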

Using a Recipe

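A comparable ingest stream defined in a Recipe. This version uses the CypherJson format with an inline query that looks up each node by idFrom($that.id) and sets its properties, rather than creating a new Event node per record: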

ingestStreams:
  - type: ReactiveStream
    url: localhost
    port: 9002
    format:
      type: CypherJson
      query: |-
        MATCH (n)
        WHERE id(n) = idFrom($that.id)
        SET n = $that

Ingest Source Configuration Reference

Parameter | Type | Required | Description
type | string | Yes | Must be "ReactiveStream"
url | string | Yes | Hostname of the reactive stream server to connect to
port | integer | Yes | Port of the reactive stream server
format | object | Yes | Record format (e.g., Json, CypherJson)
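
The table above can be turned into a quick pre-flight check before a source is submitted. This is a sketch only: check_ingest_source is a hypothetical helper, not part of any thatDot client, and it checks only the four fields listed in the table.

```python
# Required fields and their JSON types, taken from the reference table.
REQUIRED_SOURCE_FIELDS = {
    "type": str,
    "url": str,
    "port": int,
    "format": dict,
}


def check_ingest_source(source):
    """Return a list of problems found in a ReactiveStream ingest source."""
    problems = []
    for field, expected in REQUIRED_SOURCE_FIELDS.items():
        value = source.get(field)
        if value is None:
            problems.append(f"missing required field: {field}")
        # bool is a subclass of int in Python, so reject it explicitly.
        elif isinstance(value, bool) or not isinstance(value, expected):
            problems.append(f"{field} must be of type {expected.__name__}")
    if isinstance(source.get("type"), str) and source["type"] != "ReactiveStream":
        problems.append('type must be "ReactiveStream"')
    return problems
```

An empty list means the source has every required field with the expected type; it does not guarantee that the server will accept the format object.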

ReactiveStream as Output Destination

The ReactiveStream output destination creates a server that broadcasts results to connected subscribers. Use this to publish standing query results for downstream processing by other products.

Example

Configure a standing query to output results to a reactive stream on port 9001:

{
  "name": "events-to-downstream",
  "pattern": {
    "type": "Cypher",
    "query": "MATCH (n:Event) RETURN DISTINCT id(n) AS id",
    "mode": "DistinctId"
  },
  "outputs": [
    {
      "name": "to-reactive-stream",
      "destinations": [
        {
          "type": "ReactiveStream",
          "address": "0.0.0.0",
          "port": 9001,
          "format": {
            "type": "JSON"
          }
        }
      ]
    }
  ]
}

Output Destination Configuration Reference

Parameter | Type | Required | Default | Description
type | string | Yes | | Must be "ReactiveStream"
address | string | No | localhost | Address to bind the reactive stream server
port | integer | Yes | | Port to bind the reactive stream server
format | object | Yes | | Output format (JSON or Protobuf)
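
To make the effect of the optional address field concrete, the sketch below fills in the documented default. normalize_destination is a hypothetical helper written for illustration; the server applies its own defaults, and this only mirrors what the table states.

```python
def normalize_destination(destination):
    """Return a copy of a ReactiveStream destination with the default address applied."""
    if destination.get("type") != "ReactiveStream":
        raise ValueError('type must be "ReactiveStream"')
    for field in ("port", "format"):
        if field not in destination:
            raise ValueError(f"missing required field: {field}")
    # Keys in `destination` override the default, so an explicit address wins.
    return {"address": "localhost", **destination}
```

The examples on this page set address to 0.0.0.0 explicitly; omitting the field yields localhost, per the table.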

Cluster Limitation

Reactive Stream outputs do not function correctly when running in a cluster. Use Kafka or Kinesis for clustered deployments.


Connecting Quine Enterprise and Novelty

Reactive Streams support bidirectional data pipelines between Quine Enterprise and Novelty. This pattern allows you to:

  1. Process graph data in Quine Enterprise
  2. Stream results to Novelty for anomaly detection
  3. Feed Novelty's observations back into Quine Enterprise for further analysis

Architecture

┌─────────────────────┐                    ┌─────────────────────┐
│   Quine Enterprise  │                    │       Novelty       │
│                     │                    │                     │
│  ┌───────────────┐  │   ReactiveStream   │  ┌───────────────┐  │
│  │ Standing Query├──┼──── port 9001 ────►│──┤    Ingest     │  │
│  └───────────────┘  │                    │  └───────┬───────┘  │
│                     │                    │          │          │
│                     │                    │          ▼          │
│                     │                    │  ┌───────────────┐  │
│  ┌───────────────┐  │   ReactiveStream   │  │  Observations │  │
│  │    Ingest     │◄─┼──── port 9002 ─────┤──┤    Output     │  │
│  └───────────────┘  │                    │  └───────────────┘  │
│                     │                    │                     │
└─────────────────────┘                    └─────────────────────┘

Step 1: Quine Enterprise Standing Query Output

Configure Quine Enterprise to output standing query results to a reactive stream:

{
  "name": "events-to-novelty",
  "pattern": {
    "type": "Cypher",
    "query": "MATCH (n:Event) RETURN DISTINCT id(n) AS id",
    "mode": "DistinctId"
  },
  "outputs": [
    {
      "name": "to-novelty",
      "resultEnrichment": {
        "query": "MATCH (n) WHERE id(n) = $that.data.id RETURN {eventId: id(n), type: n.type, timestamp: n.timestamp} AS result",
        "parameter": "that",
        "parallelism": 1
      },
      "destinations": [
        {
          "type": "ReactiveStream",
          "address": "0.0.0.0",
          "port": 9001,
          "format": {
            "type": "JSON"
          }
        }
      ]
    }
  ]
}

Step 2: Create a Novelty Namespace

Before creating an ingest in Novelty, you must create a namespace to store observations. Namespaces must be 1–16 characters, start with a letter, and contain only letters and digits (no hyphens, underscores, or special characters).

curl -X POST http://localhost:8181/api/v2/namespaces \
  -H 'Content-Type: text/plain' \
  -d 'eventanalysis'

The request returns 201 Created if the namespace is new, or 204 No Content if it already exists.
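
The naming rule can be checked locally before the request is sent. The sketch below encodes it as a regular expression; is_valid_namespace is a hypothetical helper, and it assumes "letters" means ASCII letters, which this page does not spell out.

```python
import re

# 1-16 characters: the first is a letter, the rest are letters or digits.
# Assumption: only ASCII letters are allowed.
NAMESPACE_PATTERN = re.compile(r"[A-Za-z][A-Za-z0-9]{0,15}")


def is_valid_namespace(name):
    """Return True if name satisfies the namespace naming rule described above."""
    return NAMESPACE_PATTERN.fullmatch(name) is not None
```

Under this rule, eventanalysis passes while event-analysis and event_analysis do not.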

Step 3: Novelty Ingest from Quine Enterprise

Configure Novelty to ingest from Quine Enterprise's reactive stream and output observations to another reactive stream. The noveltyNamespace field specifies the namespace in which observations are stored; this namespace must already exist.

{
  "name": "from-quine",
  "noveltyNamespace": "eventanalysis",
  "transformation": "event-transform",
  "source": {
    "type": "ReactiveStream",
    "url": "localhost",
    "port": 9001,
    "format": {
      "type": "Json"
    }
  },
  "parallelism": 1,
  "outputWorkflow": {
    "type": "EachResult",
    "destinations": [
      {
        "type": "ReactiveStream",
        "address": "0.0.0.0",
        "port": 9002,
        "format": {
          "type": "JSON"
        }
      }
    ]
  }
}

Step 4: Quine Enterprise Ingest from Novelty

Configure Quine Enterprise to ingest Novelty's observations:

{
  "name": "from-novelty",
  "source": {
    "type": "ReactiveStream",
    "url": "localhost",
    "port": 9002,
    "format": {
      "type": "Json"
    }
  },
  "query": "CREATE (n:NoveltyObservation {score: $that.score, observation: $that.observation, sequence: $that.sequence, processedAt: datetime()})",
  "parameter": "that",
  "parallelism": 1
}
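
The four steps only work together if every subscriber points at a port that some publisher binds. The sketch below records the ports used in Steps 1, 3, and 4 and reports any subscriber without a matching publisher. It is an illustrative check, not a thatDot tool, and it compares ports only, on the assumption that everything runs on one host as in these examples.

```python
# (role, description, port) for each reactive stream endpoint in Steps 1, 3, and 4.
ENDPOINTS = [
    ("publisher", "Quine Enterprise standing query output", 9001),
    ("subscriber", "Novelty ingest", 9001),
    ("publisher", "Novelty observations output", 9002),
    ("subscriber", "Quine Enterprise ingest", 9002),
]


def unmatched_subscribers(endpoints):
    """Return descriptions of subscribers whose port has no publisher bound to it."""
    published = {port for role, _, port in endpoints if role == "publisher"}
    return [
        name
        for role, name, port in endpoints
        if role == "subscriber" and port not in published
    ]
```

For the configuration on this page the result is an empty list; a mistyped port in either ingest would show up by name.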

Troubleshooting

Connection Refused

If you see connection errors when starting a ReactiveStream ingest:

  • Verify the publisher (server) is running and bound to the expected port
  • Check that the url and port in your ingest configuration match the publisher's address and port
  • Ensure no firewall rules are blocking the connection
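
The first two checks can be scripted with a plain TCP probe. This sketch uses Python's standard socket module; can_connect is a hypothetical helper name.

```python
import socket


def can_connect(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers connection refused, timeouts, and name resolution failures.
        return False
```

For example, can_connect("localhost", 9002) tests the publisher used in the ingest example. A successful probe shows only that something is listening on the port, not that it is a reactive stream publisher.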

No Data Received

If the connection succeeds but no data flows:

  • Verify that the publisher's output format and the subscriber's ingest format agree (e.g., a JSON output read by a Json ingest, as in the examples on this page)
  • Check that the standing query or observation output is actively producing results
  • Use the standing query statistics API to verify results are being generated

Backpressure

If data processing slows or stalls:

  • The subscriber may be overwhelmed; increase parallelism or optimize your ingest query
  • Check downstream systems (persistor, standing queries) for bottlenecks
  • Monitor Quine metrics for queue depth and processing rates
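
As a conceptual illustration of why a slow subscriber stalls the publisher, the toy model below emits items only in response to demand signalled by the subscriber, which is the general idea behind Reactive Streams backpressure. It is not thatDot's implementation, and DemandDrivenPublisher is a name invented for this sketch.

```python
from collections import deque


class DemandDrivenPublisher:
    """Toy publisher that emits only as many items as the subscriber requested."""

    def __init__(self, items):
        self._pending = deque(items)
        self._demand = 0
        self.delivered = []

    def request(self, n):
        """Subscriber signals that it can handle n more items."""
        self._demand += n
        self._drain()

    def _drain(self):
        # Emit while there is both outstanding demand and pending data.
        while self._demand > 0 and self._pending:
            self.delivered.append(self._pending.popleft())
            self._demand -= 1

    @property
    def backlog(self):
        """Number of items still waiting for demand."""
        return len(self._pending)
```

If the subscriber requests items slowly, the backlog stays on the publisher side instead of overwhelming the subscriber, which is why an upstream stall usually points to a slow consumer.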

Additional Resources