# Kinesis destinations

For more information on destinations, see the [Destinations](/destinations) page.

When new data is read from a SaaS instance via a [Read Action](/read-actions) or [Subscribe Action](/subscribe-actions), Ampersand can send the payload to your AWS Kinesis stream.

## Prerequisites

Before setting up a Kinesis destination, ensure that you have:

* An AWS account with access to Kinesis
* A Kinesis stream (or permissions to create one)
* AWS credentials with the following permissions:
  * `kinesis:PutRecord`
  * `kinesis:PutRecords`
  * `kinesis:DescribeStream`

## Create a Kinesis destination

### Step 1: Set up your Kinesis stream

If you don't already have a Kinesis stream, create one in the AWS Console or using the AWS CLI:

```bash theme={null}
aws kinesis create-stream \
  --stream-name ampersand-integration-stream \
  --shard-count 1 \
  --region us-west-2
```

Wait for the stream to become active. You can check its status by running this command:

```bash theme={null}
aws kinesis describe-stream \
  --stream-name ampersand-integration-stream \
  --region us-west-2
```

You should see `"StreamStatus": "ACTIVE"` in the response.
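
If you would rather script this check than rerun the command by hand, the following is a minimal sketch that polls the same `describe-stream` command. It assumes the AWS CLI is installed and configured. The helper names `describe_stream_cli` and `wait_until_active` are hypothetical and not part of Ampersand or AWS tooling.

```python
import json
import subprocess
import time
from typing import Callable


def describe_stream_cli(stream_name: str, region: str) -> str:
    """Run `aws kinesis describe-stream` and return its JSON output as text."""
    result = subprocess.run(
        [
            "aws", "kinesis", "describe-stream",
            "--stream-name", stream_name,
            "--region", region,
        ],
        capture_output=True,
        text=True,
        check=True,
    )
    return result.stdout


def wait_until_active(
    fetch: Callable[[], str], attempts: int = 30, delay_seconds: float = 2.0
) -> bool:
    """Call `fetch` until the stream reports ACTIVE or the attempts run out."""
    for attempt in range(attempts):
        status = json.loads(fetch())["StreamDescription"]["StreamStatus"]
        if status == "ACTIVE":
            return True
        if attempt < attempts - 1:
            time.sleep(delay_seconds)
    return False


# Example call (not executed here, because it needs AWS credentials):
# wait_until_active(
#     lambda: describe_stream_cli("ampersand-integration-stream", "us-west-2")
# )
```

Passing the fetch step in as a callable keeps the polling logic separate from the CLI call, so you can swap in another way of fetching the stream description.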

### Step 2: Create AWS credentials

Create an IAM user or role with permissions to write to your Kinesis stream. You need:

* **AWS Access Key ID**
* **AWS Secret Access Key**
* **AWS Session Token** (optional, for temporary credentials)

**Example IAM policy:**

```json theme={null}
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "kinesis:PutRecord",
        "kinesis:PutRecords",
        "kinesis:DescribeStream"
      ],
      "Resource": "arn:aws:kinesis:us-west-2:123456789012:stream/ampersand-integration-stream"
    }
  ]
}
```
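
If you manage several streams, you may want to generate this policy instead of editing the ARN by hand. The sketch below builds the same policy document for a given region, account ID, and stream name. The function name `build_kinesis_write_policy` is hypothetical, and the account ID is the placeholder from the example above.

```python
import json


def build_kinesis_write_policy(region: str, account_id: str, stream_name: str) -> dict:
    """Return an IAM policy document granting write access to a single stream."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "kinesis:PutRecord",
                    "kinesis:PutRecords",
                    "kinesis:DescribeStream",
                ],
                # Scope the permissions to one stream rather than "*".
                "Resource": f"arn:aws:kinesis:{region}:{account_id}:stream/{stream_name}",
            }
        ],
    }


policy = build_kinesis_write_policy(
    "us-west-2", "123456789012", "ampersand-integration-stream"
)
policy_json = json.dumps(policy, indent=2)
```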

### Step 3: Add the destination to Ampersand

Go to the [Destinations page](https://dashboard.withampersand.com/projects/_/destinations) in the Ampersand Dashboard and create a new Kinesis destination.

You'll need to provide:

| Field                      | Type   | Required | Description                                                                                        |
| -------------------------- | ------ | -------- | -------------------------------------------------------------------------------------------------- |
| **Destination name**       | string | Yes      | Alias to reference in your `amp.yaml` file                                                         |
| **Stream name**            | string | Yes      | Name of your Kinesis stream                                                                        |
| **Region**                 | string | Yes      | AWS region where your stream is located (e.g., `us-west-2`)                                        |
| **AWS Access Key ID**      | string | Yes      | AWS access key with Kinesis permissions                                                            |
| **AWS Secret Access Key**  | string | Yes      | AWS secret access key                                                                              |
| **AWS Session Token**      | string | No       | Session token for temporary credentials                                                            |
| **Partition key template** | string | No       | [JMESPath](https://jmespath.org) template for partition key                                        |
| **Endpoint URL**           | string | No       | Custom Kinesis endpoint if applicable (e.g. when using [LocalStack](https://www.localstack.cloud)) |

<Info>
  Ampersand encrypts and stores your AWS credentials securely.
</Info>

## Refer to the destination in your integration

After creating your Kinesis destination, reference it in your `amp.yaml` file by setting the `destination` field of each object to the destination name you chose:

```yaml theme={null}
specVersion: 1.0.0
integrations:
  - name: salesforceToKinesis
    displayName: Salesforce to Kinesis
    provider: salesforce
    read:
      objects:
        - objectName: account
          destination: ampersandKinesisStream
        - objectName: contact
          destination: ampersandKinesisStream
```

## Message format

Ampersand sends messages to your Kinesis stream in JSON format. All data is wrapped in a top-level `data` object.

### Read action messages

```json theme={null}
{
  "data": {
    "action": "read",
    "projectId": "9482e676-4874-43a4-beea-fa8081d13a07",
    "provider": "hubspot",
    "groupRef": "group-id-1",
    "groupName": "group-name-1",
    "consumerRef": "user-id",
    "consumerName": "user-name",
    "installationId": "085353b1-07f1-4209-b471-7e028c0a68c8",
    "installationUpdateTime": "2025-10-14T02:58:55.595861Z",
    "objectName": "contacts",
    "operationId": "0199e0a8-149d-7ee9-9f50-0514403517f9",
    "operationTime": "2025-10-14T02:58:59.323405308Z",
    "result": [
      {
        "fields": {
          "id": "438",
          "firstname": "Maria",
          "lastname": "Johnson",
          "email": "maria@example.com",
          "city": "Brisbane"
        },
        "raw": {
          "archived": false,
          "createdAt": "2023-10-26T17:55:48.301Z",
          "id": "438",
          "properties": {
            "firstname": "Maria",
            "lastname": "Johnson",
            "email": "maria@example.com",
            "city": "Brisbane",
            "createdate": "2023-10-26T17:55:48.301Z"
          }
        }
      }
    ]
  }
}
```

**Key fields:**

* `action`: The action type (`read` or `subscribe`)
* `projectId`: Your Ampersand project identifier
* `provider`: The SaaS provider name (e.g., `hubspot`, `salesforce`)
* `groupRef` / `groupName`: Customer group identifier and name
* `consumerRef` / `consumerName`: End user identifier and name
* `installationId`: The installation instance ID
* `installationUpdateTime`: When the installation was last updated
* `objectName`: The object being synced
* `operationId`: Unique identifier for this sync operation
* `operationTime`: When the operation completed
* `result`: Array of records synced
  * `fields`: Normalized field data
  * `raw`: Original API response from the provider
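
On the consumer side, the fields described above can be read with a few lines of code. This is a minimal sketch, assuming your consumer has already obtained the record payload as UTF-8 bytes. Some clients, such as the AWS CLI, return record data base64-encoded, so you would need to decode that first. The function names `decode_message` and `iter_fields` are hypothetical.

```python
import json
from typing import Iterator


def decode_message(payload: bytes) -> dict:
    """Parse one record payload and return the inner `data` object."""
    envelope = json.loads(payload.decode("utf-8"))
    return envelope["data"]


def iter_fields(data: dict) -> Iterator[dict]:
    """Yield the normalized `fields` of every entry in `result`."""
    for entry in data.get("result", []):
        yield entry.get("fields", {})


# A trimmed-down version of the read action message shown above.
sample_payload = json.dumps(
    {
        "data": {
            "action": "read",
            "provider": "hubspot",
            "objectName": "contacts",
            "result": [
                {
                    "fields": {"id": "438", "email": "maria@example.com"},
                    "raw": {"id": "438", "archived": False},
                }
            ],
        }
    }
).encode("utf-8")

data = decode_message(sample_payload)
emails = [fields["email"] for fields in iter_fields(data)]
```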

### Subscribe action messages

Subscribe action messages follow the same structure as read actions, but include additional event-related fields within each result entry:

```json theme={null}
{
  "data": {
    "action": "subscribe",
    "projectId": "9482e676-4874-43a4-beea-fa8081d13a07",
    "provider": "salesforce",
    "groupRef": "webhook-demo-group-id",
    "groupName": "webhook-demo-group-name",
    "consumerRef": "user-id",
    "consumerName": "user-name",
    "installationId": "0c9230e1c-8fbe-4b28-bf10-2beee8fbf4ce",
    "installationUpdateTime": "2025-04-10T23:00:52.618406Z",
    "objectName": "company",
    "operationTime": "2025-04-10T23:22:25.000Z",
    "result": [
      {
        "fields": {
          "id": "001Dp00000ZDgmxIAD",
          "annualrevenue": null,
          "website": null
        },
        "subscribeEventType": "update",
        "providerEventType": "UPDATE",
        "raw": {
          "Id": "001Dp00000ZDgmxIAD",
          "AnnualRevenue": null,
          "Description": "Notes about the account",
          "Name": "Acme Corp",
          "Website": null,
          "attributes": {
            "type": "Account",
            "url": "/services/data/v59.0/sobjects/Account/001Dp00000ZDgmxIAD"
          }
        },
        "rawEvent": {
          "ChangeEventHeader": {
            "changeOrigin": "com/salesforce/api/soap/63.0;client=SfdcInternalAPI/",
            "changeType": "UPDATE",
            "changedFields": ["Description", "LastModifiedDate"],
            "commitNumber": 12041091891110,
            "commitTimestamp": 1744327345000,
            "commitUser": "005Dp000003Cd1SIAS",
            "entityName": "Account",
            "recordId": "001Dp00000ZDgmxIAD",
            "sequenceNumber": 1,
            "transactionKey": "00051705-2e99-d1e5-7e7a-48e3af682d07"
          },
          "Description": "Notes about the account",
          "LastModifiedDate": "2025-04-10T23:22:25.000Z"
        }
      }
    ]
  }
}
```

**Additional fields in subscribe actions:**

* `subscribeEventType`: Normalized event type (`create`, `update`, `delete`, `associationUpdate`)
* `providerEventType`: Raw event type from the provider API
* `rawEvent`: Original webhook event from the provider (when available)
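
A consumer will often branch on `subscribeEventType`. The sketch below groups the record IDs in a message by that field. It assumes the `data` object has already been parsed from the record payload. The function name `group_ids_by_event_type` and the `"unknown"` fallback bucket are hypothetical choices for illustration.

```python
from collections import defaultdict


def group_ids_by_event_type(data: dict) -> dict:
    """Map each subscribeEventType to the record IDs that carry it."""
    grouped = defaultdict(list)
    for entry in data.get("result", []):
        # Fall back to a hypothetical "unknown" bucket if the field is missing.
        event_type = entry.get("subscribeEventType", "unknown")
        grouped[event_type].append(entry.get("fields", {}).get("id"))
    return dict(grouped)


# A trimmed-down version of the subscribe action message shown above.
sample_data = {
    "action": "subscribe",
    "provider": "salesforce",
    "objectName": "company",
    "result": [
        {
            "fields": {"id": "001Dp00000ZDgmxIAD"},
            "subscribeEventType": "update",
            "providerEventType": "UPDATE",
        }
    ],
}

ids_by_type = group_ids_by_event_type(sample_data)
```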

## Partition key configuration

Kinesis uses partition keys to distribute messages across shards (learn more in [the AWS docs](https://docs.aws.amazon.com/streams/latest/dev/key-concepts.html)). You can customize the partition key using a template written in JMESPath, a query language for JSON. See [jmespath.org](https://jmespath.org) for the full specification.

You can specify a partition key template when creating a Kinesis destination in the [Ampersand Dashboard](https://dashboard.withampersand.com/projects/_/destinations). If you don't specify one, Ampersand uses the message ID as the partition key, which distributes messages evenly across your shards.

### Custom partition key examples

You can build the partition key from any attribute in the message. Because all message data is wrapped in a `data` object, prefix your JMESPath expressions with `data.`. Here are some examples:

**Use installation ID:**

```
data.installationId
```

**Use group reference:**

```
data.groupRef
```

**Use object name:**

```
data.objectName
```

**Combine multiple fields:**

```
join('_', [data.groupRef, data.objectName])
```

**Use operation ID:**

```
data.operationId
```
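
To see how a template choice affects shard placement, recall that Kinesis maps each partition key to a 128-bit integer with MD5 and routes the record to the shard whose hash key range contains that integer. The sketch below reproduces that mapping for the combined template above. It uses plain dictionary access rather than a JMESPath library, and it assumes the shards split the hash key space evenly, which may not hold after resharding. All function names are hypothetical.

```python
import hashlib


def partition_key_for(message: dict) -> str:
    """Plain-Python equivalent of join('_', [data.groupRef, data.objectName])."""
    data = message["data"]
    return "_".join([data["groupRef"], data["objectName"]])


def hash_key(partition_key: str) -> int:
    """Return the MD5 digest of the key as a 128-bit unsigned integer."""
    digest = hashlib.md5(partition_key.encode("utf-8")).digest()
    return int.from_bytes(digest, "big")


def shard_index(partition_key: str, shard_count: int) -> int:
    """Estimate the target shard, assuming evenly split hash key ranges."""
    return hash_key(partition_key) * shard_count // 2**128


message = {"data": {"groupRef": "group-id-1", "objectName": "contacts"}}
key = partition_key_for(message)
target = shard_index(key, shard_count=4)
```

Because the mapping is deterministic, every message that produces the same key lands on the same shard. This keeps related messages in order, but a template with few distinct values (for example `data.objectName`) can concentrate traffic on a small number of shards.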

## Troubleshooting

### Messages not appearing in Kinesis

**Check destination configuration:**

1. Verify that the stream name and region are correct in the [Ampersand Dashboard](https://dashboard.withampersand.com/projects/_/destinations).
2. Test your AWS credentials:
   ```bash theme={null}
   aws kinesis describe-stream \
     --stream-name your-stream-name \
     --region your-region
   ```
3. Check that the IAM policy attached to these credentials includes `kinesis:PutRecord` and `kinesis:PutRecords`.

## Limitations

* **Message size**: Individual records cannot exceed 1 MB (AWS Kinesis limit). If a record exceeds this limit, we send you a download URL for it instead, as we do for webhook destinations ([more details here](/destinations/webhooks#read-action-webhooks)).
* **Throughput**: Limited by Kinesis shard capacity (1 MB/sec write per shard).
* **Retention**: Messages are retained for 24 hours to 365 days (configurable in AWS).
* **Ordering**: Only guaranteed within a single partition key.

<Info>
  If you hit these limits, contact `support@withampersand.com` to discuss scaling strategies.
</Info>
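
The throughput limit above can be turned into a rough shard estimate. The sketch below is a back-of-the-envelope calculation, not an official sizing tool. It uses the 1 MB/sec per-shard write limit stated above, treated as 1,000,000 bytes, plus the 1,000 records/sec per-shard write limit from the AWS Kinesis quotas, which this page does not mention. It assumes traffic is spread evenly across shards. The function name `required_shards` is hypothetical.

```python
import math

# Per-shard write limits. The byte limit is stated on this page; the record
# limit comes from the AWS Kinesis quotas and is an addition for illustration.
SHARD_WRITE_BYTES_PER_SEC = 1_000_000
SHARD_WRITE_RECORDS_PER_SEC = 1_000


def required_shards(avg_record_bytes: int, records_per_sec: float) -> int:
    """Estimate the minimum shard count for a steady, evenly spread write load."""
    by_bytes = math.ceil(avg_record_bytes * records_per_sec / SHARD_WRITE_BYTES_PER_SEC)
    by_records = math.ceil(records_per_sec / SHARD_WRITE_RECORDS_PER_SEC)
    # A stream always has at least one shard.
    return max(1, by_bytes, by_records)


# 1,500 records/sec at 2,000 bytes each is bound by bytes.
large_records = required_shards(avg_record_bytes=2000, records_per_sec=1500)
# 2,500 records/sec at 100 bytes each is bound by record count.
small_records = required_shards(avg_record_bytes=100, records_per_sec=2500)
```

Real traffic is rarely uniform, so treat the result as a lower bound and leave headroom for bursts and uneven partition keys.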
