# Apache Kafka

## Reading Records from Kafka
Quine has full support for reading records from Apache Kafka topics. How Quine interprets records into graph data is configurable via the REST API and Recipes.
In addition to the API-mapped Kafka options, arbitrary Kafka configuration can be provided via the kafkaProperties field.
In order to avoid confusion and mitigate certain vulnerabilities in the Kafka client libraries, certain configuration keys are disabled by a validation step. The bootstrap.servers property is disallowed because it duplicates the bootstrapServers field. Similarly, certain values for the sasl.jaas.config property are known to introduce vulnerabilities, so those property values are forbidden.
Because of the extreme variety of possible configuration combinations, we cannot provide a comprehensive guide on configuring Kafka. However, we recommend using securityProtocol: "SSL" wherever possible to encrypt requests between Quine and the Kafka broker.
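For instance, an ingest might pass tuning options through kafkaProperties while keeping bootstrapServers as its dedicated field (a sketch; the topic, server, and property values are placeholders):

```json
{
  "type": "Kafka",
  "topic": "events",
  "bootstrapServers": "kafka:9093",
  "securityProtocol": "SSL",
  "kafkaProperties": {
    "client.id": "quine-ingest",
    "max.poll.records": "250"
  }
}
```

Note that bootstrap.servers itself may not appear inside kafkaProperties; as described above, it is rejected in favor of the bootstrapServers field.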
## Secure Kafka Configuration

Quine supports typed "secret" parameters for Kafka security configuration. These parameters are redacted in API responses and logs, appearing as Secret(****) (or as **** when the secret is embedded in a larger value).
### SSL/TLS Passwords

| Parameter | Type | Description |
|---|---|---|
| sslKeystorePassword | string (secret) | Password for the SSL keystore file |
| sslTruststorePassword | string (secret) | Password for the SSL truststore file |
| sslKeyPassword | string (secret) | Password for the private key in the keystore |
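As a sketch, an SSL-encrypted ingest using these typed parameters might look like the following. The keystore and truststore locations are assumed to go through kafkaProperties, since only the passwords have typed parameters; all paths and passwords here are placeholders:

```json
{
  "type": "Kafka",
  "topic": "events",
  "bootstrapServers": "kafka:9093",
  "securityProtocol": "SSL",
  "sslKeystorePassword": "keystore-secret",
  "sslTruststorePassword": "truststore-secret",
  "sslKeyPassword": "key-secret",
  "kafkaProperties": {
    "ssl.keystore.location": "/path/to/client.keystore.jks",
    "ssl.truststore.location": "/path/to/client.truststore.jks"
  }
}
```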
### SASL Authentication

The saslJaasConfig parameter accepts one of three authentication types:

#### PlainLogin

For SASL/PLAIN authentication:

```json
{
  "saslJaasConfig": {
    "type": "PlainLogin",
    "username": "my-username",
    "password": "my-password"
  }
}
```
#### ScramLogin

For SASL/SCRAM-SHA-256 or SCRAM-SHA-512 authentication:

```json
{
  "saslJaasConfig": {
    "type": "ScramLogin",
    "username": "my-username",
    "password": "my-password"
  }
}
```
#### OAuthBearerLogin

For SASL/OAUTHBEARER authentication:

```json
{
  "saslJaasConfig": {
    "type": "OAuthBearerLogin",
    "clientId": "my-client-id",
    "clientSecret": "my-client-secret",
    "scope": "optional-scope",
    "tokenEndpointUrl": "https://auth.example.com/oauth/token"
  }
}
```
| Field | Required | Description |
|---|---|---|
| clientId | Yes | OAuth client identifier |
| clientSecret | Yes | OAuth client secret (redacted in responses) |
| scope | No | OAuth scope for the token request |
| tokenEndpointUrl | No | Token endpoint URL |
### Migrating from kafkaProperties

The typed Secret parameters take precedence over corresponding entries in kafkaProperties. When both are configured, a warning is logged. For example:

```
WARN - Kafka property 'ssl.keystore.password' in kafkaProperties will be overridden
by typed Secret parameter. Remove 'ssl.keystore.password' from kafkaProperties to
suppress this warning.
```
Affected properties:

| kafkaProperties key | Typed parameter |
|---|---|
| ssl.keystore.password | sslKeystorePassword |
| ssl.truststore.password | sslTruststorePassword |
| ssl.key.password | sslKeyPassword |
| sasl.jaas.config | saslJaasConfig |
Before (unprotected):

```json
{
  "type": "Kafka",
  "topic": "events",
  "bootstrapServers": "kafka:9093",
  "kafkaProperties": {
    "security.protocol": "SASL_SSL",
    "ssl.keystore.password": "keystore-secret",
    "sasl.jaas.config": "org.apache.kafka...PlainLoginModule required username=\"user\" password=\"pass\";"
  }
}
```
After (protected):

```json
{
  "type": "Kafka",
  "topic": "events",
  "bootstrapServers": "kafka:9093",
  "sslKeystorePassword": "keystore-secret",
  "saslJaasConfig": {
    "type": "PlainLogin",
    "username": "user",
    "password": "pass"
  },
  "kafkaProperties": {
    "security.protocol": "SASL_SSL"
  }
}
```
**Credential Redaction**

The typed Secret parameters are automatically redacted in API responses, displaying as Secret(****). Values in kafkaProperties are not redacted; migrate sensitive values to the typed parameters for protection.
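For illustration, fetching an ingest configured with sslKeystorePassword might return a response along these lines (response shape abbreviated; only the redaction behavior is the point):

```json
{
  "type": "Kafka",
  "topic": "events",
  "bootstrapServers": "kafka:9093",
  "sslKeystorePassword": "Secret(****)"
}
```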
## OAuth Bearer Token Authentication (SASL/OAUTHBEARER)
Quine can authenticate to Kafka brokers using OAuth 2.0 via the SASL/OAUTHBEARER mechanism. This uses the client credentials grant to obtain access tokens from an OAuth identity provider (e.g. Keycloak, Azure Entra ID, Okta) and present them to the Kafka broker.
### Required Configuration

OAuth credentials (client ID and secret) should be provided via the typed saslJaasConfig parameter with type: OAuthBearerLogin. This ensures credentials are automatically redacted in API responses. The remaining OAuth properties are configured through kafkaProperties:

| Property | Description |
|---|---|
| sasl.mechanism | Must be set to OAUTHBEARER. |
| sasl.login.callback.handler.class | Must be set to org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler. This tells the Kafka client how to exchange client credentials for an access token. |
| sasl.oauthbearer.token.endpoint.url | The OAuth token endpoint URL from your identity provider. |
For encrypted connections (recommended), the security protocol must also be set to SASL_SSL. See TLS Trust Configuration below for the additional properties needed when using TLS.
### Ingest Stream Configuration

The KafkaIngest type supports securityProtocol as a top-level field. Set it to SASL_SSL, provide OAuth credentials via saslJaasConfig, and the remaining properties in kafkaProperties:

```json
{
  "type": "KafkaIngest",
  "topics": ["my-topic"],
  "bootstrapServers": "kafka-broker:9096",
  "groupId": "my-consumer-group",
  "securityProtocol": "SASL_SSL",
  "autoOffsetReset": "earliest",
  "format": {
    "type": "CypherJson",
    "query": "MATCH (n) WHERE id(n) = idFrom($that) SET n = $that"
  },
  "saslJaasConfig": {
    "type": "OAuthBearerLogin",
    "clientId": "my-client-id",
    "clientSecret": "my-client-secret",
    "tokenEndpointUrl": "https://my-idp.example.com/realms/kafka/protocol/openid-connect/token"
  },
  "kafkaProperties": {
    "sasl.mechanism": "OAUTHBEARER",
    "sasl.login.callback.handler.class": "org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler",
    "sasl.oauthbearer.token.endpoint.url": "https://my-idp.example.com/realms/kafka/protocol/openid-connect/token",
    "ssl.truststore.type": "PEM",
    "ssl.truststore.location": "/path/to/ca.crt"
  }
}
```
**Token Endpoint URL**

The sasl.oauthbearer.token.endpoint.url must be set in kafkaProperties even when tokenEndpointUrl is provided in saslJaasConfig. The Kafka OAuthBearerLoginCallbackHandler reads the token endpoint from the client configuration, not the JAAS string.
### Standing Query Output Configuration

The WriteToKafka output type does not have a top-level securityProtocol field. Instead, set security.protocol inside kafkaProperties. OAuth credentials are still provided via saslJaasConfig:

```json
{
  "pattern": {
    "type": "Cypher",
    "query": "MATCH (n) RETURN DISTINCT id(n) AS id"
  },
  "outputs": {
    "to-kafka": {
      "type": "WriteToKafka",
      "topic": "my-output-topic",
      "bootstrapServers": "kafka-broker:9096",
      "saslJaasConfig": {
        "type": "OAuthBearerLogin",
        "clientId": "my-client-id",
        "clientSecret": "my-client-secret",
        "tokenEndpointUrl": "https://my-idp.example.com/realms/kafka/protocol/openid-connect/token"
      },
      "kafkaProperties": {
        "security.protocol": "SASL_SSL",
        "sasl.mechanism": "OAUTHBEARER",
        "sasl.login.callback.handler.class": "org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler",
        "sasl.oauthbearer.token.endpoint.url": "https://my-idp.example.com/realms/kafka/protocol/openid-connect/token",
        "ssl.truststore.type": "PEM",
        "ssl.truststore.location": "/path/to/ca.crt"
      }
    }
  }
}
```
**securityProtocol Asymmetry**

Ingest streams accept securityProtocol as a top-level field on the ingest configuration, but standing query outputs do not. For outputs, you must set security.protocol inside kafkaProperties. This applies to all security protocols, not just SASL_SSL.
### TLS Trust Configuration

When using SASL_SSL, the Kafka client needs to trust the broker's TLS certificate. Configure trust via kafkaProperties:

| Property | Description |
|---|---|
| ssl.truststore.type | Truststore format: PEM for a CA certificate file, or JKS/PKCS12 for a keystore. |
| ssl.truststore.location | Path to the truststore file. For PEM, this is the CA certificate file (e.g. /path/to/ca.crt). |
| ssl.truststore.password | Password for the truststore (required for JKS and PKCS12; not needed for PEM). |
If the Kafka broker uses certificates signed by a well-known CA, no truststore configuration is needed — the JVM's default trust store will be used. Custom truststore configuration is typically required when:
- The broker uses certificates signed by an internal or private CA.
- The broker uses self-signed certificates.
- The OAuth identity provider's TLS certificate is also signed by a private CA (the truststore must include both the broker CA and the IdP CA).
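As a sketch, a JKS truststore for a privately signed broker certificate could be wired up as follows, with the truststore password supplied through the typed sslTruststorePassword parameter rather than kafkaProperties (path and password are placeholders):

```json
{
  "securityProtocol": "SASL_SSL",
  "sslTruststorePassword": "truststore-secret",
  "kafkaProperties": {
    "ssl.truststore.type": "JKS",
    "ssl.truststore.location": "/path/to/client.truststore.jks"
  }
}
```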
### Identity Provider Requirements

The OAuth identity provider must be configured with a client that supports the client credentials grant. At minimum:

- Client type: Confidential (i.e. has a client secret)
- Grant type: Client credentials (grant_type=client_credentials)
- Audience: If the Kafka broker validates the aud claim in the access token, the identity provider must include an audience mapper that adds the expected audience value to the token.
## Example

In this example we will ingest messages from a Kafka topic and store them as nodes in the graph.

### Preparation

For this example we will run Kafka locally. Because Kafka depends on ZooKeeper, we will start that too. Download Kafka and extract the files to your local filesystem. Start ZooKeeper and Kafka in separate terminal sessions by running the following commands from the directory where you extracted Kafka:

```shell
bin/zookeeper-server-start.sh config/zookeeper.properties
```

```shell
bin/kafka-server-start.sh config/server.properties
```

With Kafka up and running, messages can be sent manually to the topic using the following command:

```shell
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test-topic
>{"Message": "Hello, world."}
>^D
```

While kafka-console-producer.sh is running, messages are generated by entering text followed by a newline. To end the program and stop generating messages, use Control-D.
### Using a Recipe

The following is a simple Recipe that ingests each message from the Kafka topic as a node in the graph:

```yaml
version: 1
title: Kafka Ingest
contributor: https://github.com/landon9720
summary: Ingest Kafka topic messages as graph nodes
description: Ingests each message in the Kafka topic "test-topic" as a graph node
ingestStreams:
  - type: KafkaIngest
    topics:
      - test-topic
    bootstrapServers: localhost:9092
    format:
      type: CypherJson
      query: |-
        MATCH (n)
        WHERE id(n) = idFrom($that)
        SET n = $that
standingQueries: [ ]
nodeAppearances: [ ]
quickQueries: [ ]
sampleQueries: [ ]
```
To run this Recipe, run Quine as follows:

```
❯ java -jar quine-1.10.0.jar -r kafka-ingest.yaml
Graph is ready
Running Recipe Kafka Ingest
Running Ingest Stream INGEST-1
Quine app web server available at http://localhost:8080
 | => INGEST-1 status is running and ingested 0
```

Quine has downloaded the Recipe and begun execution. As shown above, use kafka-console-producer.sh to send a JSON record to the topic. Quine should immediately report that it has ingested the record:

```
 | => INGEST-1 status is running and ingested 1
```
Results are then available in the web UI at http://localhost:8080.
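To inspect the ingested data, a simple Cypher query in the web UI's query bar (a sketch; adjust the LIMIT as needed) returns the nodes created from the Kafka messages:

```cypher
MATCH (n) RETURN n LIMIT 10
```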