For more information on destinations, see the Destinations page. When new data is read from a SaaS instance via a Read Action or Subscribe Action, Ampersand can send the payload to your AWS Kinesis stream.

Prerequisites

Before setting up a Kinesis destination, ensure that you have:
  • An AWS account with access to Kinesis
  • A Kinesis stream (or permissions to create one)
  • AWS credentials with the following permissions:
    • kinesis:PutRecord
    • kinesis:PutRecords
    • kinesis:DescribeStream

Create a Kinesis destination

Step 1: Set up your Kinesis stream

If you don’t already have a Kinesis stream, create one in the AWS Console or using the AWS CLI:
aws kinesis create-stream \
  --stream-name ampersand-integration-stream \
  --shard-count 1 \
  --region us-west-2
Wait for the stream to become active. You can check its status by running this command:
aws kinesis describe-stream \
  --stream-name ampersand-integration-stream \
  --region us-west-2
You should see "StreamStatus": "ACTIVE" in the response.
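If you script this check, you can read the status out of the JSON that describe-stream prints. The following is a minimal sketch that assumes you have captured the command output as a string; the function name stream_status and the trimmed sample response are illustrative only.

```python
import json


def stream_status(describe_output: str) -> str:
    """Return StreamStatus from the JSON printed by `aws kinesis describe-stream`."""
    return json.loads(describe_output)["StreamDescription"]["StreamStatus"]


# Trimmed, illustrative response; a real one carries more keys (shards, ARN, ...).
sample_output = (
    '{"StreamDescription": {"StreamName": "ampersand-integration-stream", '
    '"StreamStatus": "CREATING"}}'
)

status = stream_status(sample_output)
print(status, "-> ready" if status == "ACTIVE" else "-> not ready yet")
```

A stream that was just created typically reports CREATING for a short time before it switches to ACTIVE.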

Step 2: Create AWS credentials

Create an IAM user or role with permissions to write to your Kinesis stream. You need:
  • AWS Access Key ID
  • AWS Secret Access Key
  • AWS Session Token (optional, for temporary credentials)
Example IAM policy:
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "kinesis:PutRecord",
        "kinesis:PutRecords",
        "kinesis:DescribeStream"
      ],
      "Resource": "arn:aws:kinesis:us-west-2:123456789012:stream/ampersand-integration-stream"
    }
  ]
}

Step 3: Add the destination to Ampersand

Go to the Destinations page in the Ampersand Dashboard and create a new Kinesis destination. You’ll need to provide:
Field | Type | Required | Description
--- | --- | --- | ---
Destination name | string | Yes | Alias to reference in your amp.yaml file
Stream name | string | Yes | Name of your Kinesis stream
Region | string | Yes | AWS region where your stream is located (e.g., us-west-2)
AWS Access Key ID | string | Yes | AWS access key with Kinesis permissions
AWS Secret Access Key | string | Yes | AWS secret access key
AWS Session Token | string | No | Session token for temporary credentials
Partition key template | string | No | JMESPath template for partition key
Endpoint URL | string | No | Custom Kinesis endpoint if applicable (e.g., when using LocalStack)
Ampersand encrypts and stores your AWS credentials securely.

Refer to the destination in your integration

After creating your Kinesis destination, reference it in your amp.yaml file:
specVersion: 1.0.0
integrations:
  - name: salesforceToKinesis
    displayName: Salesforce to Kinesis
    provider: salesforce
    read:
      objects:
        - objectName: account
          destination: ampersandKinesisStream
        - objectName: contact
          destination: ampersandKinesisStream

Message format

Ampersand sends messages to your Kinesis stream in JSON format. All data is wrapped in a top-level data object.

Read action messages

{
  "data": {
    "action": "read",
    "projectId": "9482e676-4874-43a4-beea-fa8081d13a07",
    "provider": "hubspot",
    "groupRef": "group-id-1",
    "groupName": "group-name-1",
    "consumerRef": "user-id",
    "consumerName": "user-name",
    "installationId": "085353b1-07f1-4209-b471-7e028c0a68c8",
    "installationUpdateTime": "2025-10-14T02:58:55.595861Z",
    "objectName": "contacts",
    "operationId": "0199e0a8-149d-7ee9-9f50-0514403517f9",
    "operationTime": "2025-10-14T02:58:59.323405308Z",
    "result": [
      {
        "fields": {
          "id": "438",
          "firstname": "Maria",
          "lastname": "Johnson",
          "email": "maria@example.com",
          "city": "Brisbane"
        },
        "raw": {
          "archived": false,
          "createdAt": "2023-10-26T17:55:48.301Z",
          "id": "438",
          "properties": {
            "firstname": "Maria",
            "lastname": "Johnson",
            "email": "maria@example.com",
            "city": "Brisbane",
            "createdate": "2023-10-26T17:55:48.301Z"
          }
        }
      }
    ]
  }
}
Key fields:
  • action: The action type (read or subscribe)
  • projectId: Your Ampersand project identifier
  • provider: The SaaS provider name (e.g., hubspot, salesforce)
  • groupRef / groupName: Customer group identifier and name
  • consumerRef / consumerName: End user identifier and name
  • installationId: The installation instance ID
  • installationUpdateTime: When the installation was last updated
  • objectName: The object being synced
  • operationId: Unique identifier for this sync operation
  • operationTime: When the operation completed
  • result: Array of records synced
    • fields: Normalized field data
    • raw: Original API response from the provider
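On the consumer side, each Kinesis record payload is a JSON document with this shape. The sketch below shows one way you might unwrap it in Python. It assumes the payload arrives base64-encoded, as it does in an AWS Lambda Kinesis event; other consumers may hand you raw bytes directly. The helper names decode_ampersand_message and iter_fields are hypothetical and not part of any Ampersand SDK.

```python
import base64
import json


def decode_ampersand_message(record_data: bytes) -> dict:
    """Parse one Kinesis record payload and return the inner `data` object."""
    envelope = json.loads(record_data.decode("utf-8"))
    return envelope["data"]


def iter_fields(message: dict):
    """Yield the normalized `fields` dict of every entry in `result`."""
    for entry in message.get("result", []):
        yield entry.get("fields", {})


# Sample trimmed from the read action message above.
sample = json.dumps({
    "data": {
        "action": "read",
        "provider": "hubspot",
        "objectName": "contacts",
        "result": [
            {"fields": {"id": "438", "email": "maria@example.com"},
             "raw": {"id": "438"}}
        ],
    }
}).encode("utf-8")

# Simulate the base64 encoding a Lambda Kinesis event applies,
# then decode it before parsing.
encoded = base64.b64encode(sample).decode("ascii")
message = decode_ampersand_message(base64.b64decode(encoded))
emails = [fields.get("email") for fields in iter_fields(message)]
print(message["objectName"], emails)
```

Reading from fields gives you the normalized values; fall back to raw only when you need provider-specific properties.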

Subscribe action messages

Subscribe action messages follow the same structure as read actions, but include additional event-related fields within each result entry:
{
  "data": {
    "action": "subscribe",
    "projectId": "9482e676-4874-43a4-beea-fa8081d13a07",
    "provider": "salesforce",
    "groupRef": "webhook-demo-group-id",
    "groupName": "webhook-demo-group-name",
    "consumerRef": "user-id",
    "consumerName": "user-name",
    "installationId": "0c9230e1c-8fbe-4b28-bf10-2beee8fbf4ce",
    "installationUpdateTime": "2025-04-10T23:00:52.618406Z",
    "objectName": "company",
    "operationTime": "2025-04-10T23:22:25.000Z",
    "result": [
      {
        "fields": {
          "id": "001Dp00000ZDgmxIAD",
          "annualrevenue": null,
          "website": null
        },
        "subscribeEventType": "update",
        "providerEventType": "UPDATE",
        "raw": {
          "Id": "001Dp00000ZDgmxIAD",
          "AnnualRevenue": null,
          "Description": "Notes about the account",
          "Name": "Acme Corp",
          "Website": null,
          "attributes": {
            "type": "Account",
            "url": "/services/data/v59.0/sobjects/Account/001Dp00000ZDgmxIAD"
          }
        },
        "rawEvent": {
          "ChangeEventHeader": {
            "changeOrigin": "com/salesforce/api/soap/63.0;client=SfdcInternalAPI/",
            "changeType": "UPDATE",
            "changedFields": ["Description", "LastModifiedDate"],
            "commitNumber": 12041091891110,
            "commitTimestamp": 1744327345000,
            "commitUser": "005Dp000003Cd1SIAS",
            "entityName": "Account",
            "recordId": "001Dp00000ZDgmxIAD",
            "sequenceNumber": 1,
            "transactionKey": "00051705-2e99-d1e5-7e7a-48e3af682d07"
          },
          "Description": "Notes about the account",
          "LastModifiedDate": "2025-04-10T23:22:25.000Z"
        }
      }
    ]
  }
}
Additional fields in subscribe actions:
  • subscribeEventType: Normalized event type (create, update, delete, associationUpdate)
  • providerEventType: Raw event type from the provider API
  • rawEvent: Original webhook event from the provider (when available)
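Because each result entry carries its own subscribeEventType, a consumer can route entries by event type. Here is a minimal sketch, assuming the message has already been parsed and unwrapped from the top-level data object, and assuming every entry exposes an id under fields (as in the example above). The function name group_by_event_type is illustrative.

```python
from collections import defaultdict


def group_by_event_type(message: dict) -> dict:
    """Bucket record IDs from `result` by their normalized subscribeEventType."""
    buckets = defaultdict(list)
    for entry in message.get("result", []):
        # Entries without the field (e.g. read actions) land under "unknown".
        event_type = entry.get("subscribeEventType", "unknown")
        record_id = entry.get("fields", {}).get("id")
        buckets[event_type].append(record_id)
    return dict(buckets)


# Trimmed from the subscribe action message above.
sample_message = {
    "action": "subscribe",
    "provider": "salesforce",
    "objectName": "company",
    "result": [
        {
            "fields": {"id": "001Dp00000ZDgmxIAD"},
            "subscribeEventType": "update",
            "providerEventType": "UPDATE",
        }
    ],
}

grouped = group_by_event_type(sample_message)
print(grouped)
```

Keying on subscribeEventType rather than providerEventType keeps the routing logic the same across providers.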

Partition key configuration

Kinesis uses partition keys to distribute messages across shards. (Learn more in the AWS docs.) You can customize the partition key with a JMESPath template, which you specify when creating a Kinesis destination in the Ampersand Dashboard. JMESPath is a query language for JSON; see jmespath.org for the full specification. If you don’t specify a template, we use the message ID as the partition key, so messages are distributed evenly across your shards.
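To see why the choice of partition key matters, it helps to know that Kinesis takes the MD5 hash of the key, treats it as a 128-bit integer, and routes the record to the shard whose hash key range contains that integer. The sketch below approximates this in Python. It assumes the shards split the hash space evenly, which may not hold after resharding, so treat the result as an illustration rather than a guarantee. The function names are hypothetical.

```python
import hashlib

HASH_SPACE = 2 ** 128  # MD5 produces a 128-bit value


def hash_key(partition_key: str) -> int:
    """Interpret the MD5 digest of the partition key as a 128-bit integer."""
    digest = hashlib.md5(partition_key.encode("utf-8")).digest()
    return int.from_bytes(digest, "big")


def approximate_shard_index(partition_key: str, shard_count: int) -> int:
    """Approximate shard index, ASSUMING evenly split shard hash key ranges."""
    return hash_key(partition_key) * shard_count // HASH_SPACE


# Records that share a key always land on the same shard.
for key in ["group-id-1", "group-id-1", "group-id-2"]:
    print(key, "->", approximate_shard_index(key, shard_count=4))
```

A key with few distinct values (for example, objectName) concentrates traffic on few shards, while a high-cardinality key such as the default message ID spreads it out.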

Custom partition key examples

You can build the partition key from any attribute in the message. Because all message data is wrapped in a data object, prefix each field in your JMESPath expression with "data." (including the dot). Here are some examples:
Use installation ID:
data.installationId
Use group reference:
data.groupRef
Use object name:
data.objectName
Combine multiple fields:
join('_', [data.groupRef, data.objectName])
Use operation ID:
data.operationId
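To preview what these templates would produce for a given message, you can mimic them locally. The sketch below is a plain-Python equivalent of the expressions above, not a JMESPath implementation; the actual evaluation happens on Ampersand’s side. The helpers lookup and join_paths are hypothetical, and the sample values come from the read action message shown earlier.

```python
def lookup(document: dict, dotted_path: str):
    """Follow a dotted path such as 'data.groupRef'; return None if a key is missing."""
    node = document
    for key in dotted_path.split("."):
        if not isinstance(node, dict) or key not in node:
            return None
        node = node[key]
    return node


def join_paths(separator: str, document: dict, dotted_paths: list) -> str:
    """Mimic join('<sep>', [path1, path2, ...]) for string-valued fields."""
    return separator.join(lookup(document, path) for path in dotted_paths)


message = {
    "data": {
        "installationId": "085353b1-07f1-4209-b471-7e028c0a68c8",
        "groupRef": "group-id-1",
        "objectName": "contacts",
    }
}

print(lookup(message, "data.installationId"))
print(lookup(message, "data.groupRef"))
print(join_paths("_", message, ["data.groupRef", "data.objectName"]))
```

For the sample message, the combined template yields group-id-1_contacts, so all contact records for that group share one partition key and therefore keep their relative order.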

Troubleshooting

Messages not appearing in Kinesis

Check destination configuration:
  1. Verify that the stream name and region are correct in the Ampersand Dashboard.
  2. Test your AWS credentials:
    aws kinesis describe-stream \
      --stream-name your-stream-name \
      --region your-region
    
  3. Check that the IAM permissions for your credentials include kinesis:PutRecord and kinesis:PutRecords.

Limitations

  • Message size: Individual records cannot exceed 1 MB (AWS Kinesis limit). If a single record exceeds this limit, we send you a download URL for the payload instead, similar to what we do for webhook destinations. (More details here).
  • Throughput: Limited by Kinesis shard capacity (1 MB/sec write per shard).
  • Retention: Messages are retained for 24 hours to 365 days (configurable in AWS).
  • Ordering: Only guaranteed within a single partition key.
If you hit these limits, contact support@withampersand.com to discuss scaling strategies.
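As a rough planning aid, you can estimate how many shards a given write volume needs from the per-shard throughput limit listed above. The sketch below is back-of-the-envelope arithmetic only. It assumes 1 MB means 1,000,000 bytes, takes average message size and rate as inputs you supply, and does not model other Kinesis limits such as the per-shard record count or uneven key distribution.

```python
import math

# Assumption: the "1 MB/sec write per shard" limit read as 1,000,000 bytes/sec.
SHARD_WRITE_BYTES_PER_SEC = 1_000_000


def shards_needed(avg_message_bytes: int, messages_per_sec: float) -> int:
    """Estimate the minimum shard count for a sustained write volume."""
    bytes_per_sec = avg_message_bytes * messages_per_sec
    return max(1, math.ceil(bytes_per_sec / SHARD_WRITE_BYTES_PER_SEC))


# Example: 20 KB messages at 120 messages/sec is 2.4 MB/sec, so 3 shards.
print(shards_needed(avg_message_bytes=20_000, messages_per_sec=120))
```

The estimate only holds if your partition key spreads messages evenly; a skewed key can saturate one shard while others sit idle.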