Warehouse Sync API Reference

The Warehouse Sync API enables you to create and manage data ingestion pipelines with your cloud data warehouses.

Base URI

https://api.mparticle.com/platform/v2/workspaces/{workspaceId}

To find your workspace id, follow the instructions in Managing Workspaces.

Resources

Use the Warehouse Sync API resources (endpoints) to work with connections, data models, and data pipelines in order to ingest data into mParticle from data warehouses.

  • A connection defines the location and credentials to allow mParticle to connect to your data warehouse
  • A data model defines the structure and key columns to understand your data
  • A pipeline defines the schedule of when to run your sync

Best Practices

mParticle recommends:

  • Include “connection,” “model,” or “pipeline” at the end of each connection, model, or pipeline name to make it easy to identify each type
  • Share your connection resource among pipelines that sync data from the same warehouse
  • Utilize paused state on the pipeline resource to pause syncing
  • Test your SQL query and data model on your warehouse before creating your pipeline to ensure column names are accurate and only includes fields you want applied
  • Use the incremental sync mode to keep track of which rows need to be loaded to improve performance and reduce costs
  • Normalize your timestamps to UTC timestamps to ease traceability and observability of timestamps as events and attributes are synchronized and forwarded
  • Utilize fully qualified names when identifying items in your database. Simply providing SELECT * FROM table may not work. You should provide a fully qualified name like SELECT * FROM database.schema.name
  • Complete the Warehouse Sync API tutorial with the provided Postman collection and environment:

    Run in Postman

    A copy of the Warehouse Sync environment is included. You can download it again here.

Connections

Use these endpoints for managing connections to data sources.

Get all data warehouse connections

For the workspace specified in the base URI, get all the data warehouse connections.

Request: GET {baseURI}/connections

Query parameter: {serviceProvider} Optional

Allowed Values:

  • Snowflake
  • BigQuery
  • Redshift
  • Databricks

Request body: none

Example response:

[
  {
    "id": "string",
    "name": "string",
    "state": "active",
    "status": "healthy",
    "errors": [
      {
        "message": "string"
      }
    ],
    "service_provider": "Redshift",
    "config": {
      "database": "string",
      "host": "string",
      "user": "string",
      "aws_iam_role_arn": "string"
    },
    "created_on": "2023-10-24T19:59:30.828Z",
    "created_by": "string",
    "last_modified_on": "2023-10-24T19:59:30.828Z",
    "last_modified_by": "string"
  },
  {
    "id": "string",
    "name": "string",
    "state": "active",
    "status": "healthy",
    "errors": [
      {
        "message": "string"
      }
    ],
    "service_provider": "Snowflake",
    "config": {
      "account_identifier": "string",
      "region": "string",
      "warehouse": "string",
      "database": "string",
      "role": "string",
      "user": "string",
      "storage_integration": "mp_us1_123_123_s3",
      "aws_iam_user_arn": "arn:aws:iam::123456:user/externalstages/abcdefg",
      "aws_external_id": "GD1234=2_abcdefg=="
    },
    "created_on": "2023-10-24T19:59:30.828Z",
    "created_by": "string",
    "last_modified_on": "2023-10-24T19:59:30.828Z",
    "last_modified_by": "string"
  },
  {
    "id": "string",
    "name": "string",
    "state": "active",
    "status": "healthy",
    "errors": [
      {
        "message": "string"
      }
    ],
    "service_provider": "BigQuery",
    "config": {
      "region": "string",
      "project_id": "string",
      "dataset_id": "string",
      "service_account_id": "string",
      "service_account_key": "string"
    },
    "created_on": "2023-10-24T19:59:30.828Z",
    "created_by": "string",
    "last_modified_on": "2023-10-24T19:59:30.828Z",
    "last_modified_by": "string"
  }
]

Get a specific connection

Request: GET {baseURI}/connections/{connectionId}

Path parameter: {connectionId} Required

Request body: none

Example response:

{
  "id": "example-connection",
  "name": "example-connection-name",
  "state": "active",
  "status": "healthy",
  "errors": [
    {
      "message": "example error message"
    }
  ],
  "service_provider": "Redshift",
  "config": {
    "database": "string",
    "host": "string",
    "user": "string",
    "aws_iam_role_arn": "string"
  },
  "created_on": "2023-10-24T20:06:37.429Z",
  "created_by": "string",
  "last_modified_on": "2023-10-24T20:06:37.429Z",
  "last_modified_by": "string"
}

Create a connection

Request: POST {baseURI}/connections

Parameters for Snowflake connections:

Name Type Required Description
id string Required Unique identifier in slug format. Valid characters include numbers, letters, _, and -
name string Required Name of the connection
state string Required Valid values: new, active, or inactive
service_provider string Required Valid value: Snowflake
account_identifier string Required Snowflake account locator where data will be retrieved. Refer to your warehouse documentation to determine your account id. Snowflake: here
region string Required Snowflake <cloud_region_id> or <cloud_region_id>.<cloud> region identifier where data will be retrieved. Refer to your warehouse documentation to determine your region. Snowflake: here
warehouse string Required Identifier for compute resource to utilize when syncing data
database string Required Identifier for the name of the database where data is stored in your warehouse
user string Required Username to log to your warehouse as
role string Required Snowflake role to assume
password string Required Password for user
storage_integration string Required Snowflake storage integration name that was created in the quickstart
aws_iam_user_arn string Optional Snowflake storage integration AWS resource identifier that was created in the quickstart
aws_external_id string Optional Snowflake storage integration external identifier that was created in the quickstart

Request body example:

{
  "id": "example-snowflake-connection",
  "name": "Example Snowflake Connection",
  "state": "active",
  "service_provider": "Snowflake",
  "config": {
    "account_identifier": "gd12345",
    "region": "us-central1.gcp",
    "warehouse": "compute_wh",
    "database": "my_database",
    "role": "mparticle_role",
    "user": "mparticle_user",
    "password": "mParticleSecurePassword",
    "storage_integration": "mp_us1_123_123_s3"
  }
}

Example response:

{
  "id": "example-connection",
  "name": "Example Snowflake Connection",
  "state": "active",
  "status": "healthy",
  "service_provider": "Snowflake",
  "config": {
    "account_identifier": "gd12345",
    "region": "us-central1.gcp",
    "warehouse": "compute_wh",
    "database": "my_database",
    "role": "mparticle_role",
    "user": "mparticle_user",
    "password": "************",
    "storage_integration": "mp_us1_123_123_s3",
    "aws_iam_user_arn": "arn:aws:iam::123456:user/externalstages/abcdefg",
    "aws_external_id": "GD1234=2_abcdefg=="
  },
  "created_on": "2023-02-03T23:53:08.413",
  "created_by": "developer@mparticle.com",
  "last_modified_on": null,
  "last_modified_by": null
}

Parameters for Google BigQuery connections:

Name Type Description
id string Unique identifier in slug format. Valid characters include numbers, letters, _, and -
name string Name of the connection
state string Valid values: new, active, or inactive
service_provider string Valid value: BigQuery
region string Warehouse region name where data will be retrieved. Refer to your warehouse documentation to determine your region. BigQuery: here
project_id string BigQuery project ID
dataset_id string BigQuery dataset ID
service_account_id string BigQuery service account ID that was created in the quickstart
service_account_key string BigQuery service account key that was created in the quickstart

Request body example:

{
  "id": "example-bigquery-connection",
  "name": "Example BigQuery Connection",
  "state": "active",
  "service_provider": "BigQuery",
  "config": {
    "region": "us-east1",
    "project_id": "my-gcp-project",
    "dataset_id": "my-dataset",
    "service_account_id": "mparticle-account@my-gcp-project.iam.gserviceaccount.com",
    "service_account_key": "{\"type\": \"service_account\",   \"project_id\": \"my-gcp-project\",   \"private_key_id\": \"1234abcd456789hjkl\",   \"private_key\": \"-----BEGIN PRIVATE KEY-----\\ABC123456789/U\\ABC+12345+abcABC+\\n-----END PRIVATE KEY-----\",   \"client_email\": \"mparticle-account@my-gcp-project.iam.gserviceaccount.com\",   \"client_id\": \"123456789\",   \"auth_uri\": \"https://accounts.google.com/o/oauth2/auth\",   \"token_uri\": \"https://oauth2.googleapis.com/token\",   \"auth_provider_x509_cert_url\": \"https://www.googleapis.com/oauth2/v1/certs\",   \"client_x509_cert_url\": \"https://www.googleapis.com/robot/v1/metadata/x509/mparticle-account%40my-gcp-project.iam.gserviceaccount.com\" }"
  }
}

Example response:

{
  "id": "example-bigquery-connection",
  "name": "Example BigQuery Connection",
  "state": "active",
  "status": "healthy",
  "service_provider": "BigQuery",
  "config": {
    "region": "us-east1",
    "project_id": "my-gcp-project",
    "dataset_id": "my-dataset",
    "service_account_id": "mparticle-account@my-gcp-project.iam.gserviceaccount.com",
    "service_account_key": "************"
  },
  "created_on": "2023-02-03T23:53:08.413",
  "created_by": "developer@mparticle.com",
  "last_modified_on": null,
  "last_modified_by": null
}

Parameters for Amazon Redshift connections:

Name Type Description
id string Unique identifier in slug format. Valid characters include numbers, letters, _, and -
name string Name of the connection
state string Valid values: new, active, or inactive
service_provider string Valid value: Redshift
database string The identifier for the database in Redshift you are creating a connection for
user string The name for the user you created in Redshift for your connection
password string The password for the user
aws_iam_role_arn string The ARN for the role you created when configuring Redshift for your connection with mParticle.
host string Your AWS Redshift host. You can find this in your Cluster properties from your AWS Redshift dashboard.
port string Your AWS Redshift port. You can find this in your Cluster properties from your AWS Redshift dashboard.

Request body example:

{
  "id": "example-redshift-connection",
  "name": "Redshift Connection",
  "state": "active",
  "service_provider": "Redshift",
  "config": {
    "database": "dev",
    "host": "dwi-test.ab123yyxwwzz.us-east-1.redshift.amazonaws.com",
    "user": "mParticle",
    "password": "mParticleSecurePassword",
    "aws_iam_role_arn": "arn:aws:iam::123456789:role/mParticle_role"
  }
}

Example response:

{
  "id": "example-redshift-connection",
  "name": "Redshift Connection",
  "state": "active",
  "status": "healthy",
  "service_provider": "Redshift",
  "config": {
    "database": "dev",
    "host": "dwi-test.ab123yyxwwzz.us-east-1.redshift.amazonaws.com",
    "port": "5439",
    "user": "mParticle",
    "password": "************",
    "aws_iam_role_arn": "arn:aws:iam::123456789:role/mParticle_role"
  },
  "created_on": "2023-02-03T23:53:08.413",
  "created_by": "developer@mparticle.com",
  "last_modified_on": null,
  "last_modified_by": null
}

Parameters for Databricks connections:

Name Type Required Description
id string Required Unique identifier in slug format. Valid characters include numbers, letters, _, and -
name string Required Name of the connection
state string Required Valid values: new, active, or inactive
service_provider string Required Valid value: Databricks
provider string Required The value of the Databricks organization name for the metastore you’re ingesting data from
schema string Required The name of your data schema

Request body example:

{
  "id": "example-databricks-connection",
  "name": "Example Databricks Connection",
  "state": "active",
  "service_provider": "Databricks",
  "config": {
    "provider": "your-databricks-org-name", 
    "schema": "your-schema-name"
  }
}

Example response:

{
  "id": "example-databricks-connection",
  "name": "Example Databricks Connection",
  "state": "active",
  "service_provider": "Databricks",
  "config": {
    "provider": "your-databricks-organization-name",
    "schema": "your-schema-name"
  },
  "created_on": "2023-02-03T23:53:08.413",
  "created_by": "developer@mparticle.com",
  "last_modified_on": null,
  "last_modified_by": null
}

Update an existing connection

Request: PUT {baseURI}/connections/{connectionId}

Parameters are the same as Create a connection

Request body example:

This example corrects a typo in the name of a connection:

{
  "id": "example-snowflake-connection",
  "name": "Example Snowflake Connection",
  "state": "active",
  "service_provider": "Snowflake",
  "config": {
    "account_identifier": "gd12345",
    "region": "us-central1.gcp",
    "warehouse": "compute_wh",
    "database": "my_database",
    "role": "mparticle_role",
    "user": "mparticle_user",
    "password": "mParticleSecurePassword",
    "storage_integration": "mp_us1_123_123_s3"
  }
}

Example Response:

{
  "id": "example-connection",
  "name": "Example Snowflake Connection",
  "state": "active",
  "status": "healthy",
  "service_provider": "Snowflake",
  "config": {
    "source_account_id": "gd1234",
    "region": "us-central1.gcp",
    "warehouse": "compute_wh",
    "database": "indicative",
    "role": "mp_role",
    "user": "mp_user",
    "password": "************",
    "storage_integration": "mp_us1_123_123_s3",
    "aws_iam_user_arn": "arn:aws:iam::123456:user/externalstages/abcdefg",
    "aws_external_id": "GD1234=2_abcdefg=="
  },
  "created_on": "2023-02-03T23:53:08.413",
  "created_by": "developer@mparticle.com",
  "last_modified_on": null,
  "last_modified_by": null
}

Delete a connection

Request: DELETE {baseURI}/connections/{connectionId}

Request body: None

Data Models

Use these endpoints for managing data models.

For more information about the SQL query defined in a data model, see Warehouse Sync SQL Reference.

Get all data models

GET {baseURI}/data-models

Request body: none

Example response:

[
  {
    "id": "string",
    "name": "example-data-model",
    "state": "active",
    "status": "valid",
    "errors": [
      {
        "message": "string"
      }
    ],
    "type": "sql",
    "config": {
      "sql_query": "SELECT email AS email, COUNT(id) AS \"count_of_open_tickets\", LAST_UPDATED_DATE_TIME FROM mp.demo_service.tickets WHERE t.status = 'open'"
    },
    "created_on": "2023-10-24T21:05:19.281Z",
    "created_by": "developer@example.com",
    "last_modified_on": "2023-10-24T21:05:19.281Z",
    "last_modified_by": "developer@example.com"
  }
]

Get a specified data model

Request: GET {baseURI}/data-models/{modelId}

Request body: None

Example response:

{
  "id": "example-data-model",
  "name": "Example Data Model",
  "state": "active",
  "status": "valid",
  "errors": [
    {
      "message": "string"
    }
  ],
  "type": "sql",
  "config": {
    "sql_query": "SELECT email AS email, COUNT(id) AS \"count_of_open_tickets\" FROM mp.demo_service.tickets WHERE t.status = 'open'"
  },
  "created_on": "2023-10-24T21:14:30.532Z",
  "created_by": "developer@example.com",
  "last_modified_on": "2023-10-24T21:14:30.532Z",
  "last_modified_by": "developer@example.com"
}

Create a data model

POST {baseURI}/data-models

Parameters:

Name Type Description
id string Unique identifier in slug format. Valid characters include numbers, letters, _, and -
name string Name of the data model
state string The state of the data model. Valid values: new, active, or inactive
type string Required. Valid value: “sql”
config object A JSON object containing the SQL statement defining the data model
sql_query string A valid SQL query that selects all the columns from Snowflake for this data model. See SQL for a list of supported SQL commands

Example request body:

{
  "id": "example-data-model",
  "name": "Example Data Model",
  "state": "active",
  "errors": [
    {
      "message": "string"
    }
  ],
  "type": "sql",
  "config": {
    "sql_query": "SELECT email AS email, COUNT(id) AS \"count_of_open_tickets\", LAST_UPDATED_DATE_TIME FROM mp.demo_service.tickets WHERE t.status = 'open'"
  }
}

Example response:

{
  "id": "string",
  "name": "string",
  "state": "active",
  "status": "valid",
  "errors": [
    {
      "message": "string"
    }
  ],
  "type": "sql",
  "config": {
    "sql_query": "string"
  },
  "created_on": "2023-10-24T21:23:20.833Z",
  "created_by": "string",
  "last_modified_on": "2023-10-24T21:23:20.833Z",
  "last_modified_by": "string"
}

Update a data model

Request: PUT {baseURI}/data-models/{modelId}

Parameters are the same as Create a data model

Example request body:

{
  "id": "example-data-model",
  "name": "Example Data Model",
  "state": "active",
  "errors": [
    {
      "message": "string"
    }
  ],
  "type": "sql",
  "config": {
    "sql_query": "SELECT email AS email, COUNT(id) AS \"count_of_open_tickets\", LAST_UPDATED_DATE_TIME FROM mp.demo_service.tickets WHERE t.status = 'open'"
  }
}

Response example:

{
  "id": "string",
  "name": "string",
  "state": "active",
  "status": "valid",
  "errors": [
    {
      "message": "string"
    }
  ],
  "type": "sql",
  "config": {
    "sql_query": "string"
  },
  "created_on": "2023-10-24T21:23:20.833Z",
  "created_by": "string",
  "last_modified_on": "2023-10-24T21:23:20.833Z",
  "last_modified_by": "string"
}

Delete a data model

Request: DELETE {baseURI}/data-models/{modelId}

Request body: None

Data Pipelines

Use these endpoints for managing pipelines. Pipelines execute a data model to a connection at the specified schedule.

Schedule and Sync Mode Type

A Pipeline’s schedule is used to configure the execution interval of a pipeline. A Pipeline’s sync mode is used to configure what records are synchronized in each run.

These are the following supported schedule type fields:

  • interval - Used for repeated pipelines according to the pipeline’s frequency
  • on_demand - Used for pipelines that will use the trigger API
  • once - The pipeline will only be ran once. This is recommended to perform a single pipeline sync of a table that doesn’t need to be repeated

These are the following supported schedule frequency fields:

  • hourly
  • daily
  • weekly
  • monthly

These are the following supported sync_mode type fields:

  • incremental - Used for pipelines that only load data that has changed between runs according to the iterator_field

    • The first run for incremental sync modes will be a full sync
  • full - Use to perform a full sync on each pipeline interval run. Not recommended as it may lead to repeated data and events

Example 1: Hourly incremental pipeline

The following values creates a pipeline that runs every hour starting February 1st 2023

{
  "sync_mode": {
    "type": "incremental",
    "iterator_field": "updated_on",
    "iterator_data_type": "datetime_tz"
  },
  "schedule": {
    "type": "interval",
    "frequency": "hourly",
    "start": "2023-02-01T00:15:00Z"
  }
}

Example 2: Hourly incremental pipeline with a 15 minutes delay

The following values creates a pipeline that runs 15 minutes after every hour to account for a delay in data arriving in the source data model

{
  "sync_mode": {
    "type": "incremental",
    "iterator_field": "updated_on",
    "iterator_data_type": "datetime_tz"
  },
  "schedule": {
    "type": "interval",
    "frequency": "hourly",
    "start": "2023-02-01T00:00:00Z",
    "delay": "15m"
  }
}

Example 3: Midnight UTC daily pipelines for the month of February

The following values creates a pipeline that runs every 24 hours for the month of February starting at noon each day and filters data to only include data for that time period

{
  "sync_mode": {
    "type": "incremental",
    "iterator_field": "updated_on",
    "iterator_data_type": "datetime_tz",
    "from": "2023-02-01T12:00:00Z",
    "until": "2023-03-01T:12:00:00Z"
  },
  "schedule": {
    "type": "interval",
    "frequency": "daily",
    "start": "2023-02-01T12:00:00Z",
    "end": "2023-03-01T:12:00:00Z"
  }
}

Example 4: Monthly pipelines running on the 5th day of the month at midnight UTC

The following values creates a pipeline that runs every every month starting in February 5th 2023 with a 5 day delay. The first run should only synchronize data going back to January 2020

{
  "sync_mode": {
    "type": "incremental",
    "iterator_field": "updated_on",
    "iterator_data_type": "datetime_tz",
    "from": "2020-01-01T00:00:00Z"
  },
  "schedule": {
    "type": "interval",
    "frequency": "monthly",
    "start": "2023-02-01T00:00:00Z",
    "delay": "5d"
  }
}

Example 5: Pipelines that will be triggered on demand

The following values creates a pipeline that can be triggered on demand. Each trigger creates a new pipeline run starting from the previous successful pipeline run or schedule_start_time until the time the trigger is requested.

{
  "sync_mode": {
    "type": "incremental",
    "iterator_field": "updated_on",
    "iterator_data_type": "datetime_tz",
    "from": "2020-01-01T00:00:00Z"
  },
  "schedule": {
    "type": "on_demand"
  }
}

The following table shows an example of how the pipeline’s intervals are set for each action that is performed.

Date Action Description
2023-02-01 15:13:22Z Pipeline Created The pipeline is initially idle
2023-02-04 08:30:17Z Trigger API Called An interval will synchronize data between 2020-01-01 00:00:00Z and 2023-02-04 08:30:17Z
2023-02-08 12:05:45Z Trigger API Called An interval will synchronize data between 2023-02-04 08:30:11Z and 2023-02-08 12:05:45Z

example 6: Once pipeline that executes immediately and synchronizes all data in a specific time window

the following values creates a pipeline that runs immediately, synchronizing values in the table for timestamped for the year 2020.

{
  "sync_mode": {
    "type": "full",
    "iterator_field": "updated_on",
    "iterator_data_type": "datetime_tz",
    "from": "2020-01-01T00:00:00Z",
    "until": "2021-01-01T00:00:00Z"
  },
  "schedule": {
    "type": "once"
  }
}

Status and Errors

If an error occurs, the status of the connection, data model, and/or pipeline will be set appropriately with an error message detailing what may be wrong. In a faulted state, no data will synchronize until the issue is resolved. You may update the affected resource using a patch or put command which will clear the fault and the pipeline will run on the next scheduled interval. Alternatively you may issue a re-sync command by using the trigger API which will retry the pipeline.

Get all pipelines

GET {baseURI}/inputs/data-pipelines

Request body: none

Example response:

[
  {
    "id": "example-pipeline-1",
    "name": "Example Pipeline 1",
    "pipeline_type": "user_attributes",
    "connection_id": "connection-id",
    "field_transformation_id": "field-transformation-id",
    "data_model_id": "data-model-id",
    "partner_feed_id": 0,
    "partner_feed_key": "feed-key",
    "state": "active",
    "status": "idle",
    "errors": [
      {
        "message": "string"
      }
    ],
    "sync_mode": {
      "type": "incremental",
      "iterator_field": "updated_at",
      "iterator_data_type": "timestamp",
      "from": "2022-07-01T16:00:00Z",
      "until": "2022-08-01T16:00:00Z"
    },
    "schedule": {
      "type": "interval",
      "frequency": "hourly",
      "delay": "5m",
      "start": "2022-07-01T16:00:00Z",
      "end": "2022-08-01T16:00:00Z"
    },
    "environment": "development",
    "created_on": "2023-10-25T15:21:55.899Z",
    "created_by": "developer@example.com",
    "last_modified_on": "2023-10-25T15:21:55.899Z",
    "last_modified_by": "developer@example.com"
  },
  {
    "id": "example-pipeline-2",
    "name": "Example Pipeline 2",
    "pipeline_type": "user_attributes",
    "connection_id": "connection-id",
    "field_transformation_id": "field-transformation-id",
    "data_model_id": "data-model-id",
    "partner_feed_id": 0,
    "partner_feed_key": "feed-key",
    "state": "active",
    "status": "idle",
    "errors": [
      {
        "message": "string"
      }
    ],
    "sync_mode": {
      "type": "incremental",
      "iterator_field": "updated_at",
      "iterator_data_type": "timestamp",
      "from": "2022-07-01T16:00:00Z",
      "until": "2022-08-01T16:00:00Z"
    },
    "schedule": {
      "type": "interval",
      "frequency": "hourly",
      "delay": "5m",
      "start": "2022-07-01T16:00:00Z",
      "end": "2022-08-01T16:00:00Z"
    },
    "environment": "development",
    "created_on": "2023-10-25T15:21:55.899Z",
    "created_by": "developer@example.com",
    "last_modified_on": "2023-10-25T15:21:55.899Z",
    "last_modified_by": "developer@example.com"
  }
  
]

Get a specific pipeline

Request: GET {baseURI}/inputs/data-pipelines/{pipelineId}

Path parameter: {pipelineId} Required

Request body: none

Example response:

{
  "id": "example-pipeline-1",
  "name": "Example Pipeline 1",
  "pipeline_type": "user_attributes",
  "connection_id": "connection-id",
  "field_transformation_id": "field-transformation-id",
  "data_model_id": "data-model-id",
  "partner_feed_id": 0,
  "partner_feed_key": "feed-key",
  "state": "active",
  "status": "idle",
  "errors": [
    {
      "message": "string"
    }
  ],
  "sync_mode": {
    "type": "incremental",
    "iterator_field": "updated_at",
    "iterator_data_type": "timestamp",
    "from": "2022-07-01T16:00:00Z",
    "until": "2022-08-01T16:00:00Z"
  },
  "schedule": {
    "type": "interval",
    "frequency": "hourly",
    "delay": "5m",
    "start": "2022-07-01T16:00:00Z",
    "end": "2022-08-01T16:00:00Z"
  },
  "environment": "development",
  "created_on": "2023-10-25T15:21:55.899Z",
  "created_by": "developer@example.com",
  "last_modified_on": "2023-10-25T15:21:55.899Z",
  "last_modified_by": "developer@example.com"
}

Create a pipeline

POST {baseURI}/inputs/data-pipelines

Parameters:

Name Type Required Description
id string Required Unique identifier in slug format. Valid characters include numbers, letters, _, and -
name string Required Name of the pipeline
pipeline_type string Required Valid values: user_attributes, events. If set to events, you must use a fields transformation to map your source data to the destination fields in mParticle.
connection_id string Required The ID of the connection to use with this pipeline
field_transformation_id string Required for event pipelines The ID of the field transformation to use when mapping fields in your warehouse data to event fields in mParticle. Field transformations are not available for user data pipelines.
data_model_id string Required The ID of the data model to use with this pipeline
partner_feed_id integer Optional The ID of the feed that incoming data will route to. To learn how to create a feed using an API, see feeds API. ^1^
partner_feed_key string Optional The Key of the feed that incoming data will route to.
state string Required The state of the pipeline. Valid values: new, active or paused
sync_mode object Required JSON object containing the sync mode settings
type string Required Valid values: incremental or full
iterator_field string Required if incremental A name of a column in the database. mParticle uses the iterator to track what data needs to be synced.
iterator_data_type string Required if incremental The data type for the iterator field. For example: timestamp
from string Optional Timestamp for first record to sync. Format: 2022-10-30T11:00:16Z
until string Optional Timestamp for last record to sync. Format: 2022-10-30T11:00:16Z
schedule object Required JSON object containing the pipeline scheduling settings
type string Required Valid values: interval, once, or on-demand
frequency string Required if interval Frequency to sync data. Valid values: hourly, daily, weekly, monthly
delay string Required The amount of time to delay a scheduled interval sync. Useful for situations where data may only be available in the source data set after a period of time. Format: 1d. A number, followed immediately by one of the following units: s for seconds, m for minutes, h for hours, d for days, w for weeks, and y for years.
start string Required if interval Timestamp of first scheduled interval sync. If sync_mode.from is equal to the provided start date, the pipeline will start on the next scheduled interval. Format: 2022-10-30T11:00:16Z
end string Optional Timestamp of last scheduled interval sync. Format: 2022-10-30T11:00:16Z
environment string Required mParticle environment to sync data as. Valid values: development, production
data_plan_id string Optional ID for the data plan to associate with the pipeline
data_plan_version string Optional Version for the data plan to associate with the pipeline

When creating a new pipeline, you must decide if your pipeline will be used to ingest event data or user data:

  • Event data pipelines: if your pipeline will ingest event data, you must specify the ID of a field transformation that mParticle uses to map data in your warehouse to the correct field in the mParticle JSON schema.
  • User data pipelines: if your pipeline will ingest user data, the data model you defined with your SQL query provides the necessary mapping between your warehouse fields and mParticle fields. Field transformations are not available for user data pipelines, and including a field transformation ID with a user data pipeline request results in an error.

To lean more about field transformations and how to create them with the Field Transformations API, see Field Transformations API.

Example request body:

Note: only include a field_transformation_id for event data pipelines.

{
  "id": "string",
  "name": "string",
  "pipeline_type": "events",
  "connection_id": "string",
  "field_transformation_id": "field-transformation-id", 
  "data_model_id": "string",
  "partner_feed_id": 0,
  "partner_feed_key": "string",
  "state": "active",
  "errors": [
    {
      "message": "string"
    }
  ],
  "sync_mode": {
    "type": "incremental",
    "iterator_field": "updated_at",
    "iterator_data_type": "timestamp",
    "from": "2022-07-01T16:00:00Z",
    "until": "2022-08-01T16:00:00Z"
  },
  "schedule": {
    "type": "interval",
    "frequency": "hourly",
    "delay": "5m",
    "start": "2022-07-01T16:00:00Z",
    "end": "2022-08-01T16:00:00Z"
  },
  "environment": "development",
  "data_plan_id": "example-data-plan-id",
  "data_plan_version": 2
}

Example response:

{
  "id": "string",
  "name": "string",
  "pipeline_type": "events",
  "connection_id": "string",
  "field_transformation_id": "field-transformation-id",
  "data_model_id": "string",
  "partner_feed_id": 0,
  "partner_feed_key": "string",
  "state": "active",
  "status": "idle",
  "errors": [
    {
      "message": "string"
    }
  ],
  "sync_mode": {
    "type": "incremental",
    "iterator_field": "updated_at",
    "iterator_data_type": "timestamp",
    "from": "2022-07-01T16:00:00Z",
    "until": "2022-08-01T16:00:00Z"
  },
  "schedule": {
    "type": "interval",
    "frequency": "hourly",
    "delay": "5m",
    "start": "2022-07-01T16:00:00Z",
    "end": "2022-08-01T16:00:00Z"
  },
  "environment": "development",
  "data_plan_id": "example-data-plan-id",
  "data_plan_version": 2,
  "created_on": "2023-10-25T16:02:28.008Z",
  "created_by": "string",
  "last_modified_on": "2023-10-25T16:02:28.008Z",
  "last_modified_by": "string"
}

Update a pipeline

Request: PUT {baseURI}/inputs/data-pipelines/{pipelineId}

Parameters are the same as Create a pipeline

Example request body:

This example request changes the schedule interval from hourly to weekly:

{
  "id": "string",
  "name": "string",
  "pipeline_type": "user_attributes",
  "connection_id": "string",
  "data_model_id": "string",
  "partner_feed_id": 0,
  "partner_feed_key": "string",
  "state": "active",
  "errors": [
    {
      "message": "string"
    }
  ],
  "sync_mode": {
    "type": "incremental",
    "iterator_field": "updated_at",
    "iterator_data_type": "timestamp",
    "from": "2022-07-01T16:00:00Z",
    "until": "2022-08-01T16:00:00Z"
  },
  "schedule": {
    "type": "interval",
    "frequency": "hourly",
    "delay": "5m",
    "start": "2022-07-01T16:00:00Z",
    "end": "2022-08-01T16:00:00Z"
  },
  "environment": "development",
  "data_plan_id": "example-data-plan-id",
  "data_plan_version": 2
}

Example response:

{
  "id": "string",
  "name": "string",
  "pipeline_type": "user_attributes",
  "connection_id": "string",
  "data_model_id": "string",
  "partner_feed_id": 0,
  "partner_feed_key": "string",
  "state": "active",
  "status": "idle",
  "errors": [
    {
      "message": "string"
    }
  ],
  "sync_mode": {
    "type": "incremental",
    "iterator_field": "updated_at",
    "iterator_data_type": "timestamp",
    "from": "2022-07-01T16:00:00Z",
    "until": "2022-08-01T16:00:00Z"
  },
  "schedule": {
    "type": "interval",
    "frequency": "hourly",
    "delay": "5m",
    "start": "2022-07-01T16:00:00Z",
    "end": "2022-08-01T16:00:00Z"
  },
  "environment": "development",
  "data_plan_id": "example-data-plan-id",
  "data_plan_version": 2,
  "created_on": "2023-10-25T16:02:28.008Z",
  "created_by": "string",
  "last_modified_on": "2023-10-25T16:02:28.008Z",
  "last_modified_by": "string"
}

Delete a pipeline

Request: DELETE {baseURI}/inputs/data-pipelines/{pipelineId}

Request body: None

Trigger a pipeline

POST {baseURI}/inputs/data-pipelines/trigger

Starts syncing data from a pipeline. The behavior varies depending on the pipeline’s sync mode:

  • incremental sync mode: the pipeline syncs data for since the last successful sync till the time the trigger was executed
  • full_sync sync mode: syncs all data in a pipeline

If a pipeline is faulted, you can use the trigger endpoint to re-attempt synchronization. If successful, this clears the faulted status.

Returns an error 400 if the pipeline is not idle.

The following table shows an example of how the pipeline’s intervals are set for each action that is performed.

Date Action Description
2023-02-01 15:13:22Z Pipeline Created The pipeline is initially idle
2023-02-04 08:30:17Z Trigger API Called An interval will synchronize data between 2023-02-01 00:00:00Z and 2023-02-04 08:30:17Z
2023-02-08 12:05:45Z Trigger API Called An interval will synchronize data between 2023-02-04 08:30:11Z and 2023-02-08 12:05:45Z

Example request body:

{
  "pipeline_id": "string"
}

Example response:

{
  "pipeline_run_id": 13325,
  "status": "attempt_requested"
}

Example error response:

{
    "statusCode": 400,
    "errors": [
        {
            "message": "Pipeline example-pipeline is already in progress."
        }
    ]
}

Get the status report for a pipeline

GET {baseURI}/inputs/data-pipelines/{pipelineId}/report

Retrieves the current status of a pipeline, including the latest completed run.

Request body: none

Response Parameters:

Name Type Description
status string The status of the pipeline. Valid values: “idle”, “running”, “faulted”, or “stopped”
errors array A list of errors detailing why a pipeline may be “faulted”
connection_status string The status of the pipeline’s connection. Valid values: “healthy” or “faulted”
connection_errors array A list of errors detailing why a connection may be “faulted”
data_model_status string The status of the pipeline’s data model. Valid values: “invalid” or “valid”
data_model_errors array A list of errors detailing why a data model may be “invalid”
latest_successful_pipeline_run_id integer The most recent successfully completed pipeline run’s id
latest_pipeline_run object The most recent pipeline run’s status according by it’s logical_date
id string The id of this pipeline run
pipeline_id string The pipeline this run is for
type string Descriptor for reason why this run was created. Values: scheduled, or manual
status string Status for this run. Values: queued, running, success, failed, stopped, or retrying
errors array A list of errors detailing why a run may have a “failed” status
logical_date string Identifier aligned to the chosen schedule interval and frequency
started_on string Timestamp of when this run began running
ended_on string Timestamp of when this run finished
range_start string Start of data retrieved from the data model aligned to the chosen delay
range_end string End of data retrieved from the data model aligned to the chosen delay
successful_records string Count of rows successfully processed
failed_records string Count of rows that failed to extract or be processed into the mParticle format

Example response:

{
  "pipeline_id": "string",
  "status": "idle",
  "errors": [
    {
      "message": "string"
    }
  ],
  "connection_status": "healthy",
  "connection_errors": [
    {
      "message": "string"
    }
  ],
  "data_model_status": "valid",
  "data_model_errors": [
    {
      "message": "string"
    }
  ],
  "latest_successful_pipeline_run_id": 0,
  "latest_pipeline_run": {
    "id": 0,
    "pipeline_id": "string",
    "type": "scheduled",
    "status": "success",
    "errors": [
      {
        "message": "string"
      }
    ],
    "logical_date": "2023-10-25T18:11:57.321Z",
    "started_on": "2023-10-25T18:11:57.321Z",
    "ended_on": "2023-10-25T18:11:57.321Z",
    "range_start": "2023-10-25T18:11:57.321Z",
    "range_end": "2023-10-25T18:11:57.321Z",
    "successful_records": 0,
    "failed_records": 0
  }
}

Get a list of pipeline runs for a pipeline

GET {baseURI}inputs/data-pipelines/{pipelineId}/runs?currentPage={currentPage}&pageSize={pageSize}&startDate={startDate}&endDate={endDate}

Path Parameters

Name Required Description
pipelineId Required The ID of the pipeline to retrieve the status for
currentPage Optional Page number to return
pageSize Optional Count of items to return per page
startDate Optional The earliest logical_date to look for run statuses
endDate Optional The last logical_date to look for run statuses

Request body: none

Example response:

{
  "items": [
    {
      "id": 251,
      "pipeline_id": "example-pipeline",
      "type": "scheduled",
      "status": "success",
      "errors": [
        {
          "message": "string"
        }
      ],
      "logical_date": "2023-10-25T18:11:57.321Z",
      "started_on": "2023-10-25T18:11:57.321Z",
      "ended_on": "2023-10-25T18:11:57.321Z",
      "range_start": "2023-10-25T18:11:57.321Z",
      "range_end": "2023-10-25T18:11:57.321Z",
      "successful_records": 100,
      "failed_records": 0
    },
    {
      "id": 252,
      "pipeline_id": "example-pipeline",
      "type": "scheduled",
      "status": "success",
      "errors": [
        {
          "message": "string"
        }
      ],
      "logical_date": "2023-10-25T19:11:57.321Z",
      "started_on": "2023-10-25T19:11:57.321Z",
      "ended_on": "2023-10-25T19:11:57.321Z",
      "range_start": "2023-10-25T19:11:57.321Z",
      "range_end": "2023-10-25T19:11:57.321Z",
      "successful_records": 100,
      "failed_records": 0
    }
  ],
  "current_page": 1,
  "page_size": 50,
  "total_pages": 1,
  "total_items": 1,
  "has_previous_page": false,
  "has_next_page": false
}

Get a specific pipeline run for a pipeline

GET {baseURI}/inputs/data-pipelines/{pipelineId}/runs/{pipelineRunId}

Request body: none

Example response:

{
  "id": 251,
  "pipeline_id": "example-pipeline",
  "type": "scheduled",
  "status": "success",
  "errors": [
    {
      "message": "string"
    }
  ],
  "logical_date": "2023-10-25T18:11:57.321Z",
  "started_on": "2023-10-25T18:11:57.321Z",
  "ended_on": "2023-10-25T18:11:57.321Z",
  "range_start": "2023-10-25T18:11:57.321Z",
  "range_end": "2023-10-25T18:11:57.321Z",
  "successful_records": 100,
  "failed_records": 0
}

Error codes and status codes

The Warehouse Sync API returns error codes or status codes in response to every request.

Status codes

200: Successful request. The message varies depending on which resource you requested.

204: Empty response. Indicates a successful operation.

Error codes

400: The resource could not be created because of an error in the request or the entity is in an unprocessable state. This may mean the SQL query contains invalid characters or the query is otherwise invalid. This could be the result of supplying a data plan ID for a plan that is not active.

404: The requested resource or object wasn’t found. The message varies depending on which resource or object you requested. This could be the result of supplying a data plan ID for a data plan that does not exist in your workspace, or if the data plan does exist but you did not specify a data plan version.

505: The request failed to connect to the warehouse account. Check the username and password, and make sure the source account ID, region, warehouse, and database entries are correct.

Was this page helpful?