Apache Kafka¶
Reading Records from Kafka¶
Quine has full support for reading records from Apache Kafka topics. How Quine interprets records into graph data is configurable via the REST API and Recipes.
In addition to the API-mapped Kafka options, arbitrary Kafka configuration can be provided via the kafkaProperties field.
To avoid confusion and to mitigate known vulnerabilities in the Kafka client libraries, certain configuration keys are rejected by a validation step. The bootstrap.servers property is disallowed because it duplicates the bootstrapServers field. Similarly, certain values of the sasl.jaas.config property are known to introduce vulnerabilities, so those values are forbidden.
Because of the wide variety of possible configuration combinations, we cannot provide a comprehensive guide to configuring Kafka. However, we recommend setting securityProtocol: "SSL" wherever possible to encrypt traffic between Quine and the Kafka broker.
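For instance, a Kafka ingest configuration combining the API-mapped fields with an extra kafkaProperties entry might look like the following sketch (the topic name, broker address, and the ssl.endpoint.identification.algorithm value are illustrative):

```json
{
  "type": "Kafka",
  "topic": "events",
  "bootstrapServers": "kafka.example.com:9093",
  "securityProtocol": "SSL",
  "kafkaProperties": {
    "ssl.endpoint.identification.algorithm": "https"
  }
}
```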
Secure Kafka Configuration¶
Quine supports typed "secret" parameters for Kafka security configuration. These parameters are redacted in API responses and logs: they are replaced with Secret(****), or with **** when the secret is a small part of a larger value.
SSL/TLS Passwords¶
| Parameter | Type | Description |
|---|---|---|
| sslKeystorePassword | string (secret) | Password for the SSL keystore file |
| sslTruststorePassword | string (secret) | Password for the SSL truststore file |
| sslKeyPassword | string (secret) | Password for the private key in the keystore |
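These password parameters are typically paired with keystore and truststore locations supplied via kafkaProperties. A sketch (file paths and password values are placeholders):

```json
{
  "type": "Kafka",
  "topic": "events",
  "bootstrapServers": "kafka.example.com:9093",
  "securityProtocol": "SSL",
  "sslKeystorePassword": "keystore-secret",
  "sslTruststorePassword": "truststore-secret",
  "sslKeyPassword": "key-secret",
  "kafkaProperties": {
    "ssl.keystore.location": "/path/to/client.keystore.jks",
    "ssl.truststore.location": "/path/to/client.truststore.jks"
  }
}
```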
SASL Authentication¶
The saslJaasConfig parameter accepts one of three authentication types:
PlainLogin¶
For SASL/PLAIN authentication:
{
  "saslJaasConfig": {
    "type": "PlainLogin",
    "username": "my-username",
    "password": "my-password"
  }
}
ScramLogin¶
For SASL/SCRAM-SHA-256 or SCRAM-SHA-512 authentication:
{
  "saslJaasConfig": {
    "type": "ScramLogin",
    "username": "my-username",
    "password": "my-password"
  }
}
OAuthBearerLogin¶
For SASL/OAUTHBEARER authentication:
{
  "saslJaasConfig": {
    "type": "OAuthBearerLogin",
    "clientId": "my-client-id",
    "clientSecret": "my-client-secret",
    "scope": "optional-scope",
    "tokenEndpointUrl": "https://auth.example.com/oauth/token"
  }
}
| Field | Required | Description |
|---|---|---|
| clientId | Yes | OAuth client identifier |
| clientSecret | Yes | OAuth client secret (redacted in responses) |
| scope | No | OAuth scope for the token request |
| tokenEndpointUrl | No | Token endpoint URL |
Migrating from kafkaProperties¶
The typed Secret parameters take precedence over the corresponding entries in kafkaProperties. When both are configured, a warning is logged. For example:
WARN - Kafka property 'ssl.keystore.password' in kafkaProperties will be overridden
by typed Secret parameter. Remove 'ssl.keystore.password' from kafkaProperties to
suppress this warning.
Affected properties:
| kafkaProperties key | Typed parameter |
|---|---|
| ssl.keystore.password | sslKeystorePassword |
| ssl.truststore.password | sslTruststorePassword |
| ssl.key.password | sslKeyPassword |
| sasl.jaas.config | saslJaasConfig |
Before (unprotected):
{
  "type": "Kafka",
  "topic": "events",
  "bootstrapServers": "kafka:9093",
  "kafkaProperties": {
    "security.protocol": "SASL_SSL",
    "ssl.keystore.password": "keystore-secret",
    "sasl.jaas.config": "org.apache.kafka...PlainLoginModule required username=\"user\" password=\"pass\";"
  }
}
After (protected):
{
  "type": "Kafka",
  "topic": "events",
  "bootstrapServers": "kafka:9093",
  "sslKeystorePassword": "keystore-secret",
  "saslJaasConfig": {
    "type": "PlainLogin",
    "username": "user",
    "password": "pass"
  },
  "kafkaProperties": {
    "security.protocol": "SASL_SSL"
  }
}
Credential Redaction
The typed Secret parameters are automatically redacted in API responses, displaying as Secret(****). Values in kafkaProperties are not redacted; migrate sensitive values to the typed parameters to protect them.
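For illustration, fetching the "After" configuration above through the API might return a response along these lines (abridged; the exact response shape may differ):

```json
{
  "type": "Kafka",
  "topic": "events",
  "sslKeystorePassword": "Secret(****)",
  "saslJaasConfig": {
    "type": "PlainLogin",
    "username": "user",
    "password": "Secret(****)"
  }
}
```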
Example¶
In this example we will ingest messages from a Kafka topic and store them as nodes in the graph.
Preparation¶
For this example we will run Kafka locally. Because this Kafka distribution depends on ZooKeeper, we will start that too. Download Kafka and extract the files to your local filesystem. Then start ZooKeeper and Kafka in separate terminal sessions by running the following commands from the directory where you extracted Kafka:
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
With Kafka up and running, messages can be manually sent to the topic using the following command:
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test-topic
>{"Message": "Hello, world."}
>^D
While kafka-console-producer.sh is running, each line of text you enter is sent as a message to the topic. To stop the producer and end the session, press Control-D.
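To verify that messages are reaching the topic, you can read them back with the console consumer that ships with Kafka (the topic name matches the producer command above):

```shell
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --topic test-topic --from-beginning
```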
Using a Recipe¶
The following is a simple Recipe that ingests each message from the Kafka topic as a node in the graph:
version: 1
title: Kafka Ingest
contributor: https://github.com/landon9720
summary: Ingest Kafka topic messages as graph nodes
description: Ingests each message in the Kafka topic "test-topic" as a graph node
ingestStreams:
  - type: KafkaIngest
    topics:
      - test-topic
    bootstrapServers: localhost:9092
    format:
      type: CypherJson
      query: |-
        MATCH (n)
        WHERE id(n) = idFrom($that)
        SET n = $that
standingQueries: [ ]
nodeAppearances: [ ]
quickQueries: [ ]
sampleQueries: [ ]
To run this Recipe, run Quine as follows:
❯ java -jar quine-1.10.0.jar -r kafka-ingest.yaml
Graph is ready
Running Recipe Kafka Ingest
Running Ingest Stream INGEST-1
Quine app web server available at http://localhost:8080
| => INGEST-1 status is running and ingested 0
Quine has loaded the Recipe and begun executing it. As shown above, use kafka-console-producer.sh to send a JSON record to the topic. Quine should immediately report that it has ingested the record.
| => INGEST-1 status is running and ingested 1
Results should already be available in the web UI at http://localhost:8080.
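To inspect the ingested data, enter a Cypher query in the web UI's query bar. For example, the following returns up to ten nodes, including the one created from the message above:

```cypher
MATCH (n) RETURN n LIMIT 10
```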