AWS Kinesis¶
Quine provides two methods for ingesting data from Amazon Kinesis Data Streams. Both ingest types are configured via the REST API.
| Ingest Type | Use Case | Features |
|---|---|---|
| Kinesis | Single-instance deployments, specific shard selection, lightweight setup | Direct shard access, sequence number positioning |
| KinesisKCL | Multi-worker deployments, fault-tolerant processing, automatic shard rebalancing | Checkpointing, lease management, enhanced fan-out (requires DynamoDB) |
When to use KinesisKCL
Use KinesisKCL when you need checkpointing across restarts, distributed processing across multiple workers, or automatic handling of shard splits and merges. Note that KCL requires additional AWS resources (DynamoDB for leases, CloudWatch for metrics) which incur extra costs.
Kinesis¶
The Kinesis type provides direct access to Kinesis shards without the overhead of lease management or checkpointing. Use this for single-instance deployments, lightweight setups, or when you need precise control over which shards to read.
Example¶
In this example, we will register a multi-shard Kinesis stream of JSON objects (one JSON object per Kinesis record) as a data source, creating a single node in the graph for each object.
Preparation¶
For the purposes of this example, you will need a Kinesis data stream and credentials (an access key ID and secret access key) for an IAM User with the following privileges:
- kinesis:RegisterStreamConsumer
- kinesis:DeregisterStreamConsumer
- kinesis:SubscribeToShard
- kinesis:DescribeStreamSummary
- kinesis:DescribeStreamConsumer
- kinesis:GetShardIterator
- kinesis:GetRecords
- kinesis:DescribeStream
- kinesis:ListTagsForStream
For our example, we'll assume such a user has access to the json-logs stream, with access key ID <ACCESS_KEY_ID> and secret access key <SECRET_ACCESS_KEY>. These credentials will be used to register the data source with Quine.
Registering Kinesis as a data source¶
To register Kinesis as a data source for Quine, we describe our stream via the ingest REST API.
We'll use the aforementioned Kinesis stream json-logs, hosted in the us-west-2 region, and give the Quine ingest stream the name kinesis-logs. Thus, we make our API request using Create Ingest Stream: POST /api/v2/ingests?name=kinesis-logs with the following payload:
{
  "type": "Kinesis",
  "format": {
    "query": "CREATE ($that)",
    "type": "CypherJson"
  },
  "streamName": "json-logs",
  "parallelism": 2,
  "shardIds": [],
  "credentials": {
    "accessKeyId": "<ACCESS_KEY_ID>",
    "secretAccessKey": "<SECRET_ACCESS_KEY>"
  },
  "region": "us-west-2",
  "iteratorType": "TrimHorizon"
}
We pass in an empty list of shard IDs to specify that Quine should read from all shards in the stream. If we wanted to only read from particular shards, we would instead list out the shard IDs from which Quine should read.
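For instance, reading from only the first and third shards of a stream would use a payload fragment like the following (the shard IDs shown are hypothetical placeholders; use the shard IDs reported for your stream):

{
  "shardIds": ["shardId-000000000000", "shardId-000000000002"]
}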
Because the Kinesis stream is filled with JSON records, we choose the CypherJson import format, which reads each record as a JSON object before passing it as a Map to a Cypher query.
The Cypher query can access this object using the parameter $that. Thus, our configured query CREATE ($that) will create a node for each JSON record with the same property structure as the JSON record.
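For instance, a hypothetical record such as the following would become a single node with the properties eventType, userId, and timestamp:

{
  "eventType": "login",
  "userId": 42,
  "timestamp": "2024-05-01T12:00:00Z"
}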
In this example, we use a Kinesis stream populated with JSON objects as records, though Quine offers other options for how to interpret records from a stream. These options are configurable via the same endpoint by using different formats in the above JSON payload.
Finally, we choose to read all records from the Kinesis stream, including records already present in the stream when configuring the Quine data source. To get this behavior, we use a TrimHorizon Kinesis iterator type. If we wished to only read records written to the Kinesis stream after setting up the Quine data source, we would have used the Latest iterator type.
Kinesis Configuration Reference¶
| Parameter | Type | Default | Description |
|---|---|---|---|
| `streamName` | string | required | Name of the Kinesis stream |
| `shardIds` | array | `[]` (all) | Specific shard IDs to read; empty for all shards |
| `format` | object | required | Record format and Cypher query for processing |
| `parallelism` | int | 16 | Maximum concurrent database writes |
| `credentials` | object | env default | AWS credentials (see AWS Credentials) |
| `region` | string | env default | AWS region (see AWS Region) |
| `iteratorType` | string/object | `Latest` | Starting position in the stream |
| `numRetries` | int | 3 | Retry attempts on errors |
| `maximumPerSecond` | int | unlimited | Rate limit for records processed per second |
| `recordDecoders` | array | `[]` | Record decodings to apply (e.g., Zlib, Gzip, Base64) |
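For example, if each record in the stream were Gzip-compressed by the producer (a hypothetical scenario), the decoder could be declared in the ingest payload with a fragment like:

{
  "recordDecoders": ["Gzip"]
}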
Iterator Type Options¶
The iteratorType determines where to start reading in the stream:
| Value | Description |
|---|---|
| `"Latest"` | Start with new records only |
| `"TrimHorizon"` | Start from the oldest available record |
| `{"AtSequenceNumber": {"sequenceNumber": "..."}}` | Start at a specific sequence number |
| `{"AfterSequenceNumber": {"sequenceNumber": "..."}}` | Start after a specific sequence number |
| `{"AtTimestamp": {"millisSinceEpoch": 1234567890000}}` | Start at a Unix timestamp (milliseconds) |
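In the ingest payload, the object forms are assigned directly to iteratorType. For example, to resume reading just after a known record, substitute a real sequence number for the placeholder below:

{
  "iteratorType": {
    "AfterSequenceNumber": { "sequenceNumber": "<SEQUENCE_NUMBER>" }
  }
}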
KinesisKCL¶
The KinesisKCL type uses the AWS Kinesis Client Library (KCL) 3.x to provide distributed stream processing with:
- Checkpointing: Automatic tracking of processed records via DynamoDB
- Lease Management: Distributed coordination of shard processing across multiple workers
- Enhanced Fan-Out: Dedicated throughput per consumer (optional)
- Automatic Shard Handling: Seamless processing of shard splits and merges
- CloudWatch Metrics: Built-in monitoring and observability
Prerequisites¶
KinesisKCL requires additional AWS resources and permissions beyond basic Kinesis access.
IAM Permissions¶
Your IAM user or role needs permissions for Kinesis, DynamoDB (lease table), and CloudWatch (metrics); a sketch of a matching policy document follows these lists:
Kinesis permissions:
- kinesis:DescribeStream
- kinesis:DescribeStreamSummary
- kinesis:GetRecords
- kinesis:GetShardIterator
- kinesis:ListShards
- kinesis:ListTagsForStream
- kinesis:SubscribeToShard (for Enhanced Fan-Out)
- kinesis:RegisterStreamConsumer (for Enhanced Fan-Out)
- kinesis:DescribeStreamConsumer (for Enhanced Fan-Out)
- kinesis:DeregisterStreamConsumer (for Enhanced Fan-Out)
DynamoDB permissions (for lease table):
- dynamodb:CreateTable
- dynamodb:DescribeTable
- dynamodb:GetItem
- dynamodb:PutItem
- dynamodb:UpdateItem
- dynamodb:DeleteItem
- dynamodb:Scan
CloudWatch permissions (for metrics):
- cloudwatch:PutMetricData
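As a rough sketch only, the permissions above could be granted with a policy document along these lines, assuming the json-logs stream and the quine-json-logs-processor application name from the example below. The account ID and resource ARNs are placeholders (and cloudwatch:PutMetricData does not support resource-level restrictions), so adapt them to your own account and resources:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "KinesisAccess",
      "Effect": "Allow",
      "Action": [
        "kinesis:DescribeStream",
        "kinesis:DescribeStreamSummary",
        "kinesis:GetRecords",
        "kinesis:GetShardIterator",
        "kinesis:ListShards",
        "kinesis:ListTagsForStream",
        "kinesis:SubscribeToShard",
        "kinesis:RegisterStreamConsumer",
        "kinesis:DescribeStreamConsumer",
        "kinesis:DeregisterStreamConsumer"
      ],
      "Resource": "arn:aws:kinesis:us-west-2:<ACCOUNT_ID>:stream/json-logs*"
    },
    {
      "Sid": "LeaseTableAccess",
      "Effect": "Allow",
      "Action": [
        "dynamodb:CreateTable",
        "dynamodb:DescribeTable",
        "dynamodb:GetItem",
        "dynamodb:PutItem",
        "dynamodb:UpdateItem",
        "dynamodb:DeleteItem",
        "dynamodb:Scan"
      ],
      "Resource": "arn:aws:dynamodb:us-west-2:<ACCOUNT_ID>:table/quine-json-logs-processor"
    },
    {
      "Sid": "MetricsAccess",
      "Effect": "Allow",
      "Action": "cloudwatch:PutMetricData",
      "Resource": "*"
    }
  ]
}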
Basic KinesisKCL Example¶
{
  "type": "KinesisKCL",
  "format": {
    "query": "CREATE ($that)",
    "type": "CypherJson"
  },
  "kinesisStreamName": "json-logs",
  "applicationName": "quine-json-logs-processor",
  "parallelism": 16,
  "credentials": {
    "accessKeyId": "<ACCESS_KEY_ID>",
    "secretAccessKey": "<SECRET_ACCESS_KEY>"
  },
  "region": "us-west-2",
  "initialPosition": "TrimHorizon"
}
The applicationName serves as the identifier for your consumer application and is used as the default name for the DynamoDB lease table and CloudWatch metrics namespace.
KinesisKCL Configuration Reference¶
Core Settings¶
| Parameter | Type | Default | Description |
|---|---|---|---|
| `kinesisStreamName` | string | required | Name of the Kinesis stream to ingest |
| `applicationName` | string | required | Unique application name; used as the DynamoDB lease table name and CloudWatch namespace |
| `format` | object | required | Record format and Cypher query for processing |
| `parallelism` | int | 16 | Maximum concurrent database writes |
| `credentials` | object | env default | AWS credentials (see AWS Credentials) |
| `region` | string | env default | AWS region (see AWS Region) |
| `initialPosition` | string/object | `Latest` | Where to start reading: Latest, TrimHorizon, or AtTimestamp |
| `numRetries` | int | 3 | Number of retry attempts on Kinesis errors |
| `maximumPerSecond` | int | unlimited | Rate limit for records processed per second |
| `recordDecoders` | array | `[]` | Record decodings applied to each record (e.g., Zlib, Gzip, Base64) |
Initial Position Options¶
The initialPosition determines where KCL begins reading when no checkpoint exists:
| Value | Description |
|---|---|
| `"Latest"` | Start with records added after the ingest begins |
| `"TrimHorizon"` | Start from the oldest available record in the stream |
| `{"AtTimestamp": {"year": 2025, "month": 4, "date": 15, "hourOfDay": 10, "minute": 30, "second": 0}}` | Start from records at or after the specified timestamp (month and day are 1-indexed) |
InitialPosition vs IteratorType
KinesisKCL uses initialPosition, which supports only Latest, TrimHorizon, and AtTimestamp. Kinesis uses iteratorType, which additionally supports AtSequenceNumber and AfterSequenceNumber.
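In the ingest payload, the object form is assigned directly to initialPosition; for example, with an arbitrary illustrative timestamp:

{
  "initialPosition": {
    "AtTimestamp": { "year": 2025, "month": 4, "date": 15, "hourOfDay": 10, "minute": 30, "second": 0 }
  }
}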
Checkpoint Settings¶
Checkpointing tracks which records have been successfully processed, enabling recovery after failures. Configure via checkpointSettings:
{
  "checkpointSettings": {
    "disableCheckpointing": false,
    "maxBatchSize": 1000,
    "maxBatchWaitMillis": 10000
  }
}
| Parameter | Type | Default | Description |
|---|---|---|---|
| `disableCheckpointing` | boolean | false | Set to true to disable checkpointing entirely |
| `maxBatchSize` | int | none | Maximum records to batch before checkpointing |
| `maxBatchWaitMillis` | long | none | Maximum time (ms) to wait before checkpointing a batch |
Disabling Checkpointing
Disabling checkpointing means records may be reprocessed after restarts. Only disable this for idempotent operations or development/testing scenarios.
Scheduler Source Settings¶
Control the internal buffer and backpressure behavior via schedulerSourceSettings:
{
  "schedulerSourceSettings": {
    "bufferSize": 1000,
    "backpressureTimeoutMillis": 60000
  }
}
| Parameter | Type | Default | Description |
|---|---|---|---|
| `bufferSize` | int | none | Internal buffer size; must be > 0; use 1 to disable buffering |
| `backpressureTimeoutMillis` | long | none | Timeout (ms) for backpressure before failing |
Advanced KCL Configuration¶
For fine-grained control over KCL behavior, use the advancedSettings object. These settings map directly to the KCL 3.x configuration options.
| Configuration Group | Purpose |
|---|---|
| `configsBuilder` | Custom lease table name and worker identifier |
| `retrievalSpecificConfig` | Choose between Polling (shared throughput) or Enhanced Fan-Out (dedicated 2 MB/s per consumer) |
| `leaseManagementConfig` | Shard lease coordination and DynamoDB table settings |
| `coordinatorConfig` | Shard prioritization and sync behavior |
| `lifecycleConfig` | Task retry and warning thresholds |
| `retrievalConfig` | ListShards retry settings |
| `metricsConfig` | CloudWatch metrics level and dimensions |
| `processorConfig` | Empty record list handling |
See the API documentation for complete parameter details.
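As an illustrative sketch only: a consumer using shared-throughput polling instead of Enhanced Fan-Out might set retrievalSpecificConfig along the following lines. The type name PollingConfig and the maxRecords field are assumptions based on the KCL 3.x polling configuration; consult the API documentation for the exact schema.

{
  "advancedSettings": {
    "retrievalSpecificConfig": {
      "type": "PollingConfig",
      "maxRecords": 1000
    }
  }
}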
Complete KinesisKCL Example¶
Here's a comprehensive example with advanced settings for a production deployment using Enhanced Fan-Out:
{
  "type": "KinesisKCL",
  "format": {
    "query": "MATCH (n) WHERE id(n) = idFrom('event', $that.eventId) SET n = $that",
    "type": "CypherJson"
  },
  "kinesisStreamName": "production-events",
  "applicationName": "quine-prod-processor",
  "parallelism": 32,
  "credentials": {
    "accessKeyId": "<ACCESS_KEY_ID>",
    "secretAccessKey": "<SECRET_ACCESS_KEY>"
  },
  "region": "us-west-2",
  "initialPosition": "TrimHorizon",
  "numRetries": 5,
  "maximumPerSecond": 10000,
  "checkpointSettings": {
    "maxBatchSize": 1000,
    "maxBatchWaitMillis": 5000
  },
  "advancedSettings": {
    "configsBuilder": {
      "tableName": "quine-prod-leases"
    },
    "retrievalSpecificConfig": {
      "type": "FanOutConfig",
      "consumerName": "quine-prod-consumer"
    },
    "leaseManagementConfig": {
      "failoverTimeMillis": 10000,
      "maxLeasesForWorker": 50,
      "billingMode": "PAY_PER_REQUEST",
      "isGracefulLeaseHandoffEnabled": true,
      "gracefulLeaseHandoffTimeoutMillis": 30000
    },
    "metricsConfig": {
      "metricsLevel": "SUMMARY"
    }
  }
}
AWS Credentials¶
Explicit Credentials¶
{
  "credentials": {
    "accessKeyId": "<ACCESS_KEY_ID>",
    "secretAccessKey": "<SECRET_ACCESS_KEY>"
  }
}
Environment Credentials¶
If credentials is omitted, Quine uses the default AWS credential provider chain.
AWS Region¶
Explicit Region¶
{
  "region": "us-west-2"
}
Environment Region¶
If region is omitted, Quine uses the default AWS region provider chain.
Need help?
See Troubleshooting Ingest for help with missing data, slow ingests, and other common issues.