AWS Kinesis

Quine provides two methods for ingesting data from Amazon Kinesis Data Streams. Configure Kinesis ingests via the REST API.

| Ingest Type | Use Case | Features |
| --- | --- | --- |
| Kinesis | Single-instance deployments, specific shard selection, lightweight setup | Direct shard access, sequence number positioning |
| KinesisKCL | Multi-worker deployments, fault-tolerant processing, automatic shard rebalancing | Checkpointing, lease management, enhanced fan-out (requires DynamoDB) |

When to use KinesisKCL

Use KinesisKCL when you need checkpointing across restarts, distributed processing across multiple workers, or automatic handling of shard splits and merges. Note that KCL requires additional AWS resources (DynamoDB for leases, CloudWatch for metrics), which incur extra costs.


Kinesis

The Kinesis type provides direct access to Kinesis shards without the overhead of lease management or checkpointing. Use this for single-instance deployments, lightweight setups, or when you need precise control over which shards to read.

Example

In this example, we will register a multi-shard Kinesis stream of JSON objects (one JSON object per Kinesis record) as a data source, creating a single node in the graph for each object.

Preparation

For the purposes of this example, you will need a Kinesis data stream and credentials (an access key ID and secret access key) for an IAM User with the following privileges:

  • kinesis:RegisterStreamConsumer
  • kinesis:DeregisterStreamConsumer
  • kinesis:SubscribeToShard
  • kinesis:DescribeStreamSummary
  • kinesis:DescribeStreamConsumer
  • kinesis:GetShardIterator
  • kinesis:GetRecords
  • kinesis:DescribeStream
  • kinesis:ListTagsForStream

For our example, we'll assume such a user has access to the json-logs stream, with access key ID <ACCESS_KEY_ID> and secret access key <SECRET_ACCESS_KEY>. These credentials will be used to register the data source with Quine.

Registering Kinesis as a data source

To register Kinesis as a data source to Quine, we need to describe our stream via the ingest REST API.

For this example, we'll use the Kinesis stream named json-logs, hosted in the us-west-2 region, and give the Quine ingest stream the name kinesis-logs. We make our API request using Create Ingest Stream, POST /api/v2/ingests?name=kinesis-logs, with the following payload:

{
  "type": "Kinesis",
  "format": {
    "query": "CREATE ($that)",
    "type": "CypherJson"
  },
  "streamName": "json-logs",
  "parallelism": 2,
  "shardIds": [],
  "credentials": {
    "accessKeyId": "<ACCESS_KEY_ID>",
    "secretAccessKey": "<SECRET_ACCESS_KEY>"
  },
  "region": "us-west-2",
  "iteratorType": "TrimHorizon"
}

We pass in an empty list of shard IDs to specify that Quine should read from all shards in the stream. If we wanted to only read from particular shards, we would instead list out the shard IDs from which Quine should read.
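
As a minimal sketch of that choice, the hypothetical helper below (with_shards is not part of Quine) returns a copy of an ingest payload with shardIds set. The shard IDs shown follow the usual Kinesis naming pattern but are placeholders, not values from a real stream.

```python
import copy


def with_shards(payload, shard_ids=()):
    """Return a copy of `payload` limited to `shard_ids` (empty means all shards)."""
    updated = copy.deepcopy(payload)
    updated["shardIds"] = list(shard_ids)
    return updated


# Trimmed-down payload; the full example above carries more fields.
base = {"type": "Kinesis", "streamName": "json-logs", "shardIds": []}

# Placeholder shard IDs: substitute the IDs reported for your own stream.
two_shards = with_shards(base, ["shardId-000000000000", "shardId-000000000001"])
```

The original payload is left untouched, so the same base can be reused for an ingest that reads every shard.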

Because the Kinesis stream is filled with JSON records, we choose the CypherJson import format, which reads each record as a JSON object before passing it as a Map to a Cypher query.

The Cypher query can access this object using the parameter $that. Thus, our configured query CREATE ($that) will create a node for each JSON record with the same property structure as the JSON record.

In this example, we use a Kinesis stream populated with JSON objects as records, though Quine offers other options for how to interpret records from a stream. These options are configurable via the same endpoint by using different formats in the above JSON payload.

Finally, we choose to read all records from the Kinesis stream, including records already present in the stream when the Quine data source is configured. To get this behavior, we use the TrimHorizon Kinesis iterator type. To read only records written to the Kinesis stream after the Quine data source is set up, we would use the Latest iterator type instead.
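
The request described in this example could be issued from a script roughly as follows. This is a sketch using only the Python standard library. The base URL http://localhost:8080 and the helper name build_ingest_request are assumptions for illustration; point the URL at wherever your Quine instance is listening.

```python
import json
import urllib.parse
import urllib.request

# Assumption: Quine is reachable at this address; adjust for your deployment.
QUINE_URL = "http://localhost:8080"


def build_ingest_request(name, payload, base_url=QUINE_URL):
    """Build (but do not send) the POST request that registers an ingest stream."""
    query = urllib.parse.quote(name, safe="")
    return urllib.request.Request(
        url=f"{base_url}/api/v2/ingests?name={query}",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


payload = {
    "type": "Kinesis",
    "format": {"query": "CREATE ($that)", "type": "CypherJson"},
    "streamName": "json-logs",
    "parallelism": 2,
    "shardIds": [],
    "credentials": {
        "accessKeyId": "<ACCESS_KEY_ID>",
        "secretAccessKey": "<SECRET_ACCESS_KEY>",
    },
    "region": "us-west-2",
    "iteratorType": "TrimHorizon",
}

request = build_ingest_request("kinesis-logs", payload)

# To register the ingest against a running Quine instance, send the request:
# with urllib.request.urlopen(request) as response:
#     print(response.status)
```

Building the request separately from sending it makes it easy to inspect the URL and body before anything reaches the server.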

Kinesis Configuration Reference

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| streamName | string | required | Name of the Kinesis stream |
| shardIds | array | [] (all) | Specific shard IDs to read; empty for all shards |
| format | object | required | Record format and Cypher query for processing |
| parallelism | int | 16 | Maximum concurrent database writes |
| credentials | object | env default | AWS credentials (see AWS Credentials) |
| region | string | env default | AWS region (see AWS Region) |
| iteratorType | string/object | Latest | Starting position in the stream |
| numRetries | int | 3 | Retry attempts on errors |
| maximumPerSecond | int | unlimited | Rate limit for records processed per second |
| recordDecoders | array | [] | Record decodings to apply (e.g., Zlib, Gzip, Base64) |

Iterator Type Options

The iteratorType determines where to start reading in the stream:

| Value | Description |
| --- | --- |
| "Latest" | Start with new records only |
| "TrimHorizon" | Start from the oldest available record |
| {"AtSequenceNumber": {"sequenceNumber": "..."}} | Start at a specific sequence number |
| {"AfterSequenceNumber": {"sequenceNumber": "..."}} | Start after a specific sequence number |
| {"AtTimestamp": {"millisSinceEpoch": 1234567890000}} | Start at a Unix timestamp (milliseconds) |
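
The object-valued options are easy to get wrong by hand, particularly the millisecond timestamp. The helpers below are a sketch of how the values could be built in Python; the function names are hypothetical and only the JSON shapes come from the options listed here.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def at_sequence_number(sequence_number):
    """iteratorType value that starts at the given sequence number."""
    return {"AtSequenceNumber": {"sequenceNumber": str(sequence_number)}}


def after_sequence_number(sequence_number):
    """iteratorType value that starts just after the given sequence number."""
    return {"AfterSequenceNumber": {"sequenceNumber": str(sequence_number)}}


def at_timestamp(moment):
    """iteratorType value for a timezone-aware datetime, in Unix milliseconds."""
    if moment.tzinfo is None:
        raise ValueError("moment must be timezone-aware to avoid ambiguity")
    millis = (moment - EPOCH) // timedelta(milliseconds=1)
    return {"AtTimestamp": {"millisSinceEpoch": millis}}


start = at_timestamp(datetime(2009, 2, 13, 23, 31, 30, tzinfo=timezone.utc))
# start == {"AtTimestamp": {"millisSinceEpoch": 1234567890000}}
```

Requiring a timezone-aware datetime avoids silently interpreting a naive value in the local timezone of whichever machine runs the script.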

KinesisKCL

The KinesisKCL type uses the AWS Kinesis Client Library (KCL) 3.x to provide distributed stream processing with:

  • Checkpointing: Automatic tracking of processed records via DynamoDB
  • Lease Management: Distributed coordination of shard processing across multiple workers
  • Enhanced Fan-Out: Dedicated throughput per consumer (optional)
  • Automatic Shard Handling: Seamless processing of shard splits and merges
  • CloudWatch Metrics: Built-in monitoring and observability

Prerequisites

KinesisKCL requires additional AWS resources and permissions beyond basic Kinesis access.

IAM Permissions

Your IAM user or role needs permissions for Kinesis, DynamoDB (lease table), and CloudWatch (metrics):

Kinesis permissions:

  • kinesis:DescribeStream
  • kinesis:DescribeStreamSummary
  • kinesis:GetRecords
  • kinesis:GetShardIterator
  • kinesis:ListShards
  • kinesis:ListTagsForStream
  • kinesis:SubscribeToShard (for Enhanced Fan-Out)
  • kinesis:RegisterStreamConsumer (for Enhanced Fan-Out)
  • kinesis:DescribeStreamConsumer (for Enhanced Fan-Out)
  • kinesis:DeregisterStreamConsumer (for Enhanced Fan-Out)

DynamoDB permissions (for lease table):

  • dynamodb:CreateTable
  • dynamodb:DescribeTable
  • dynamodb:GetItem
  • dynamodb:PutItem
  • dynamodb:UpdateItem
  • dynamodb:DeleteItem
  • dynamodb:Scan

CloudWatch permissions (for metrics):

  • cloudwatch:PutMetricData
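
The permissions above can be collected into a single IAM policy document. The sketch below generates one in Python; the function name kcl_policy is hypothetical, and "Resource": "*" is a simplifying assumption that you would normally narrow to the ARNs of your stream, lease table, and consumer.

```python
import json

KINESIS_ACTIONS = [
    "kinesis:DescribeStream",
    "kinesis:DescribeStreamSummary",
    "kinesis:GetRecords",
    "kinesis:GetShardIterator",
    "kinesis:ListShards",
    "kinesis:ListTagsForStream",
]
# Only needed when Enhanced Fan-Out is used.
FAN_OUT_ACTIONS = [
    "kinesis:SubscribeToShard",
    "kinesis:RegisterStreamConsumer",
    "kinesis:DescribeStreamConsumer",
    "kinesis:DeregisterStreamConsumer",
]
DYNAMODB_ACTIONS = [
    "dynamodb:CreateTable",
    "dynamodb:DescribeTable",
    "dynamodb:GetItem",
    "dynamodb:PutItem",
    "dynamodb:UpdateItem",
    "dynamodb:DeleteItem",
    "dynamodb:Scan",
]
CLOUDWATCH_ACTIONS = ["cloudwatch:PutMetricData"]


def kcl_policy(enhanced_fan_out=False):
    """Build an IAM policy document covering the permissions listed above."""
    kinesis = KINESIS_ACTIONS + (FAN_OUT_ACTIONS if enhanced_fan_out else [])
    statements = [
        # Assumption: "*" keeps the sketch short; scope this to specific ARNs.
        {"Effect": "Allow", "Action": actions, "Resource": "*"}
        for actions in (kinesis, DYNAMODB_ACTIONS, CLOUDWATCH_ACTIONS)
    ]
    return {"Version": "2012-10-17", "Statement": statements}


print(json.dumps(kcl_policy(enhanced_fan_out=True), indent=2))
```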

Basic KinesisKCL Example

{
  "type": "KinesisKCL",
  "format": {
    "query": "CREATE ($that)",
    "type": "CypherJson"
  },
  "kinesisStreamName": "json-logs",
  "applicationName": "quine-json-logs-processor",
  "parallelism": 16,
  "credentials": {
    "accessKeyId": "<ACCESS_KEY_ID>",
    "secretAccessKey": "<SECRET_ACCESS_KEY>"
  },
  "region": "us-west-2",
  "initialPosition": "TrimHorizon"
}

The applicationName serves as the identifier for your consumer application and is used as the default name for the DynamoDB lease table and CloudWatch metrics namespace.

KinesisKCL Configuration Reference

Core Settings

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| kinesisStreamName | string | required | Name of the Kinesis stream to ingest |
| applicationName | string | required | Unique application name; used as the DynamoDB lease table name and CloudWatch namespace |
| format | object | required | Record format and Cypher query for processing |
| parallelism | int | 16 | Maximum concurrent database writes |
| credentials | object | env default | AWS credentials (see AWS Credentials) |
| region | string | env default | AWS region (see AWS Region) |
| initialPosition | string/object | Latest | Where to start reading: Latest, TrimHorizon, or AtTimestamp |
| numRetries | int | 3 | Number of retry attempts on Kinesis errors |
| maximumPerSecond | int | unlimited | Rate limit for records processed per second |
| recordDecoders | array | [] | Record decodings applied to each record (e.g., Zlib, Gzip, Base64) |

Initial Position Options

The initialPosition determines where KCL begins reading when no checkpoint exists:

| Value | Description |
| --- | --- |
| "Latest" | Start with records added after the ingest begins |
| "TrimHorizon" | Start from the oldest available record in the stream |
| {"AtTimestamp": {"year": 2025, "month": 4, "date": 15, "hourOfDay": 10, "minute": 30, "second": 0}} | Start from records at or after the specified timestamp (month and day are 1-indexed) |
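
Unlike the Kinesis type, the AtTimestamp form here is split into calendar fields. A small helper such as the hypothetical initial_position_at below can derive those fields from a Python datetime, whose month and day are also 1-indexed. The source does not state which timezone the fields are interpreted in, so this sketch copies the datetime's fields as given and leaves that question to the caller.

```python
from datetime import datetime


def initial_position_at(moment):
    """Build an AtTimestamp initialPosition from a datetime's calendar fields."""
    return {
        "AtTimestamp": {
            "year": moment.year,
            "month": moment.month,  # 1-indexed, matching the reference above
            "date": moment.day,     # 1-indexed day of the month
            "hourOfDay": moment.hour,
            "minute": moment.minute,
            "second": moment.second,
        }
    }


position = initial_position_at(datetime(2025, 4, 15, 10, 30, 0))
```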

InitialPosition vs IteratorType

KinesisKCL uses initialPosition which supports Latest, TrimHorizon, and AtTimestamp only. Kinesis uses iteratorType which also supports AtSequenceNumber and AfterSequenceNumber.

Checkpoint Settings

Checkpointing tracks which records have been successfully processed, enabling recovery after failures. Configure via checkpointSettings:

{
  "checkpointSettings": {
    "disableCheckpointing": false,
    "maxBatchSize": 1000,
    "maxBatchWaitMillis": 10000
  }
}

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| disableCheckpointing | boolean | false | Set to true to disable checkpointing entirely |
| maxBatchSize | int | none | Maximum records to batch before checkpointing |
| maxBatchWaitMillis | long | none | Maximum time (ms) to wait before checkpointing a batch |
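
To build intuition for how the two batch limits interact, the sketch below models a generic size-or-time batching window. It assumes a checkpoint is due as soon as either limit is reached. This illustrates the general technique only, not Quine's or KCL's actual implementation, and the class name is hypothetical.

```python
class CheckpointBatcher:
    """Toy model: a checkpoint is due when either batch limit is reached."""

    def __init__(self, max_batch_size, max_batch_wait_millis):
        self.max_batch_size = max_batch_size
        self.max_batch_wait_millis = max_batch_wait_millis
        self.pending = 0
        self.batch_started_at = None

    def record_processed(self, now_millis):
        """Count one processed record; return True if a checkpoint is due."""
        if self.pending == 0:
            self.batch_started_at = now_millis  # first record opens a new batch
        self.pending += 1
        size_reached = self.pending >= self.max_batch_size
        waited = now_millis - self.batch_started_at
        # Simplification: time is only checked when a record arrives; a real
        # system would more likely use a timer that fires on its own.
        if size_reached or waited >= self.max_batch_wait_millis:
            self.pending = 0
            return True
        return False


batcher = CheckpointBatcher(max_batch_size=3, max_batch_wait_millis=10000)
by_size = [batcher.record_processed(t) for t in (0, 1, 2)]
by_time = [batcher.record_processed(t) for t in (100, 10100)]
```

In this model a larger maxBatchSize or maxBatchWaitMillis means fewer checkpoint writes, at the cost of more records being reprocessed after a failure.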

Disabling Checkpointing

Disabling checkpointing means records may be reprocessed after restarts. Only disable this for idempotent operations or development/testing scenarios.

Scheduler Source Settings

Control the internal buffer and backpressure behavior via schedulerSourceSettings:

{
  "schedulerSourceSettings": {
    "bufferSize": 1000,
    "backpressureTimeoutMillis": 60000
  }
}

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| bufferSize | int | none | Internal buffer size; must be > 0; use 1 to disable buffering |
| backpressureTimeoutMillis | long | none | Timeout (ms) for backpressure before failing |
Advanced KCL Configuration

For fine-grained control over KCL behavior, use the advancedSettings object. These settings map directly to the KCL 3.x configuration options.

| Configuration Group | Purpose |
| --- | --- |
| configsBuilder | Custom lease table name and worker identifier |
| retrievalSpecificConfig | Choose between Polling (shared throughput) or Enhanced Fan-Out (dedicated 2 MB/s per consumer) |
| leaseManagementConfig | Shard lease coordination and DynamoDB table settings |
| coordinatorConfig | Shard prioritization and sync behavior |
| lifecycleConfig | Task retry and warning thresholds |
| retrievalConfig | ListShards retry settings |
| metricsConfig | CloudWatch metrics level and dimensions |
| processorConfig | Empty record list handling |

See the API documentation for complete parameter details.

Complete KinesisKCL Example

Here's a comprehensive example with advanced settings for a production deployment using Enhanced Fan-Out:

{
  "type": "KinesisKCL",
  "format": {
    "query": "MATCH (n) WHERE id(n) = idFrom('event', $that.eventId) SET n = $that",
    "type": "CypherJson"
  },
  "kinesisStreamName": "production-events",
  "applicationName": "quine-prod-processor",
  "parallelism": 32,
  "credentials": {
    "accessKeyId": "<ACCESS_KEY_ID>",
    "secretAccessKey": "<SECRET_ACCESS_KEY>"
  },
  "region": "us-west-2",
  "initialPosition": "TrimHorizon",
  "numRetries": 5,
  "maximumPerSecond": 10000,
  "checkpointSettings": {
    "maxBatchSize": 1000,
    "maxBatchWaitMillis": 5000
  },
  "advancedSettings": {
    "configsBuilder": {
      "tableName": "quine-prod-leases"
    },
    "retrievalSpecificConfig": {
      "type": "FanOutConfig",
      "consumerName": "quine-prod-consumer"
    },
    "leaseManagementConfig": {
      "failoverTimeMillis": 10000,
      "maxLeasesForWorker": 50,
      "billingMode": "PAY_PER_REQUEST",
      "isGracefulLeaseHandoffEnabled": true,
      "gracefulLeaseHandoffTimeoutMillis": 30000
    },
    "metricsConfig": {
      "metricsLevel": "SUMMARY"
    }
  }
}

AWS Credentials

Explicit Credentials

{
  "credentials": {
    "accessKeyId": "<ACCESS_KEY_ID>",
    "secretAccessKey": "<SECRET_ACCESS_KEY>"
  }
}

Environment Credentials

If credentials is omitted, Quine uses the default AWS credential provider chain.


AWS Region

Explicit Region

{
  "region": "us-west-2"
}

Environment Region

If region is omitted, Quine uses the default AWS region provider chain.
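
Since both fields are optional, a script that builds ingest payloads can simply leave them out when no explicit values are supplied. The sketch below shows one way to do that; with_aws_settings is a hypothetical helper, not part of Quine.

```python
def with_aws_settings(payload, access_key_id=None, secret_access_key=None, region=None):
    """Return a copy of `payload`, adding credentials and region only if given."""
    if (access_key_id is None) != (secret_access_key is None):
        raise ValueError("provide both accessKeyId and secretAccessKey, or neither")
    updated = dict(payload)
    if access_key_id is not None:
        updated["credentials"] = {
            "accessKeyId": access_key_id,
            "secretAccessKey": secret_access_key,
        }
    if region is not None:
        updated["region"] = region
    return updated


base = {"type": "Kinesis", "streamName": "json-logs"}

# Nothing explicit: both fields are omitted, so Quine falls back to the
# environment for credentials and region.
from_environment = with_aws_settings(base)

# Explicit region only; credentials still come from the environment.
explicit_region = with_aws_settings(base, region="us-west-2")
```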


Need help?

See Troubleshooting Ingest for help with missing data, slow ingests, and other common issues.

Additional Resources