Create a dataflow

Important

Azure IoT Operations Preview – enabled by Azure Arc is currently in preview. You shouldn't use this preview software in production environments.

You'll need to deploy a new Azure IoT Operations installation when a generally available release is made available. You won't be able to upgrade a preview installation.

See the Supplemental Terms of Use for Microsoft Azure Previews for legal terms that apply to Azure features that are in beta, preview, or otherwise not yet released into general availability.

A dataflow is the path that data takes from the source to the destination with optional transformations. You can configure the dataflow by using the Azure IoT Operations portal or by creating a Dataflow custom resource. Before you create a dataflow, you must configure dataflow endpoints for the data sources and destinations.

The following example is a dataflow configuration with an MQTT source endpoint, transformations, and a Kafka destination endpoint:

apiVersion: connectivity.iotoperations.azure.com/v1beta1
kind: Dataflow
metadata:
  name: my-dataflow
spec:
  profileRef: my-dataflow-profile
  mode: enabled
  operations:
    - operationType: source
      name: my-source
      sourceSettings:
        endpointRef: mq
        dataSources:
          - thermostats/+/telemetry/temperature/#
          - humidifiers/+/telemetry/humidity/#
        serializationFormat: json
    - operationType: builtInTransformation
      name: my-transformation
      builtInTransformationSettings:
        filter:
          - inputs:
              - 'temperature.Value'
              - '"Tag 10".Value'
            expression: "$1*$2<100000"
        map:
          - inputs:
              - '*'
            output: '*'
          - inputs:
              - temperature.Value
            output: TemperatureF
            expression: cToF($1)
          - inputs:
              - '"Tag 10".Value'
            output: 'Tag 10'
        serializationFormat: json
    - operationType: destination
      name: my-destination
      destinationSettings:
        endpointRef: kafka
        dataDestination: factory

Name Description
profileRef Reference to the dataflow profile.
mode Mode of the dataflow: enabled or disabled.
operations[] Operations performed by the dataflow.
operationType Type of operation: source, destination, or builtInTransformation.
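
The source and destination endpoints that the operations reference through endpointRef (mq and kafka in this example) must already exist as dataflow endpoint resources. As a rough sketch only, an MQTT endpoint might look like the following; the endpointType value and mqttSettings fields shown here are illustrative assumptions, so check the dataflow endpoint documentation for the exact schema:

apiVersion: connectivity.iotoperations.azure.com/v1beta1
kind: DataflowEndpoint
metadata:
  name: mq
  namespace: azure-iot-operations
spec:
  # The field names and values below are assumptions for illustration only.
  endpointType: mqtt
  mqttSettings:
    host: aio-mq-dmqtt-frontend:8883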

Review the following sections to learn how to configure the operation types of the dataflow.

Configure source

To configure a source for the dataflow, specify the endpoint reference and the data sources. You can specify a list of data sources for the endpoint, such as MQTT or Kafka topics. The following definition is an example of a dataflow configuration with a source endpoint and data source:

apiVersion: connectivity.iotoperations.azure.com/v1beta1
kind: Dataflow
metadata:
  name: mq-to-kafka
  namespace: azure-iot-operations
spec:
  profileRef: example-dataflow
  operations:
    - operationType: source
      sourceSettings:
        endpointRef: mq-source
        dataSources:
        - azure-iot-operations/data/thermostat

Name Description
operationType source
sourceSettings Settings for the source operation.
sourceSettings.endpointRef Reference to the source endpoint.
sourceSettings.dataSources Data sources for the source operation. Wildcards (# and +) are supported.

Configure transformation

The transformation operation is where you can transform the data from the source before you send it to the destination. Transformations are optional. If you don't need to make changes to the data, don't include the transformation operation in the dataflow configuration. Multiple transformations are chained together in stages regardless of the order in which they're specified in the configuration.

spec:
  operations:
  - operationType: builtInTransformation
    name: transform1
    builtInTransformationSettings:
      datasets:
        # ...
      filter:
        # ...
      map:
        # ...

Name Description
operationType builtInTransformation
name Name of the transformation.
builtInTransformationSettings Settings for the builtInTransformation operation.
builtInTransformationSettings.datasets Add other data to the source data given a dataset and condition to match.
builtInTransformationSettings.filter Filter the data based on a condition.
builtInTransformationSettings.map Move data from one field to another with an optional conversion.

Enrich: Add reference data

To enrich the data, you can use the reference dataset in the Azure IoT Operations distributed state store (DSS). The dataset is used to add extra data to the source data based on a condition. The condition is specified as a field in the source data that matches a field in the dataset.

Name Description
builtInTransformationSettings.datasets.key Dataset used for enrichment (key in DSS).
builtInTransformationSettings.datasets.expression Condition for the enrichment operation.

Key names in the distributed state store correspond to a dataset in the dataflow configuration.

For example, you could use the deviceId field in the source data to match the asset field in the dataset:

spec:
  operations:
  - operationType: builtInTransformation
    name: transform1
    builtInTransformationSettings:
      datasets:
      - key: assetDataset
        inputs:
          - $source.deviceId # ------------- $1
          - $context(assetDataset).asset # - $2
        expression: $1 == $2

If the dataset has a record with the asset field, similar to the following:

{
  "asset": "thermostat1",
  "location": "room1",
  "manufacturer": "Contoso"
}

Source data whose deviceId field matches thermostat1 then has the location and manufacturer fields from the dataset available in the filter and map stages.
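
For example, a source message like this hypothetical payload (shown only for illustration) matches the dataset record above, so later stages can reference $context(assetDataset).location and $context(assetDataset).manufacturer:

{
  "deviceId": "thermostat1",
  "temperature": 22.5
}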

You can load sample data into the DSS by using the DSS set tool sample.

For more information about condition syntax, see Enrich data by using dataflows and Convert data by using dataflows.

Filter: Filter data based on a condition

To filter the data on a condition, you can use the filter stage. The condition is specified as a field in the source data that matches a value.

Name Description
builtInTransformationSettings.filter.inputs[] Inputs to evaluate a filter condition.
builtInTransformationSettings.filter.expression Condition for the filter evaluation.

For example, you could use the temperature field in the source data to filter the data:

spec:
  operations:
  - operationType: builtInTransformation
    name: transform1
    builtInTransformationSettings:
      filter:
        - inputs:
          - temperature ? $last # - $1
          expression: "$1 > 20"

If the temperature field is greater than 20, the data is passed to the next stage. If the temperature field is 20 or less, the data is filtered out and doesn't continue through the dataflow.
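
As a concrete illustration with hypothetical payloads, the first message below passes the filter and the second one is dropped:

# Passes the filter: 25 > 20
{ "temperature": 25 }

# Filtered out: 18 <= 20
{ "temperature": 18 }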

Map: Move data from one field to another

To map the data to another field with optional conversion, you can use the map operation. The conversion is specified as a formula that uses the fields in the source data.

Name Description
builtInTransformationSettings.map[].inputs[] Inputs for the map operation
builtInTransformationSettings.map[].output Output field for the map operation
builtInTransformationSettings.map[].expression Conversion formula for the map operation

For example, you could use the temperature field in the source data to convert the temperature from Fahrenheit to Celsius and store it in the temperatureCelsius field. You could also enrich the source data with the location field from the contextualization dataset:

spec:
  operations:
  - operationType: builtInTransformation
    name: transform1
    builtInTransformationSettings:
      map:
        - inputs:
          - temperature # - $1
          output: temperatureCelsius
          expression: "($1 - 32) * 5/9"
        - inputs:
          - $context(assetDataset).location  
          output: location
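
As a worked illustration with hypothetical data: given an input message of { "temperature": 100 } and a matching dataset record with location set to room1, the output would look roughly like the following, since (100 - 32) * 5/9 is about 37.8 (assuming that without a '*' passthrough mapping only the mapped fields appear in the output):

{
  "temperatureCelsius": 37.8,
  "location": "room1"
}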

To learn more, see Map data by using dataflows and Convert data by using dataflows.

Configure destination

To configure a destination for the dataflow, you need to specify the endpoint and a path (topic or table) for the destination.

Name Description
destinationSettings.endpointRef Reference to the destination endpoint
destinationSettings.dataDestination Destination for the data

Configure destination endpoint reference

To configure the endpoint for the destination, specify the name of the operation and the endpoint reference:

spec:
  operations:
  - operationType: destination
    name: destination1
    destinationSettings:
      endpointRef: eventgrid

Configure destination path

After you have the endpoint, you can configure the path for the destination. If the destination is an MQTT or Kafka endpoint, use the path to specify the topic:

- operationType: destination
  destinationSettings:
    endpointRef: eventgrid
    dataDestination: factory

For storage endpoints like Microsoft Fabric, use the path to specify the table name:

- operationType: destination
  destinationSettings:
    endpointRef: adls
    dataDestination: telemetryTable