# Camel K Integration "parceldemo": a REST endpoint, the routes behind it and
# the bean definitions they use, all declared under spec.flows.
apiVersion: camel.apache.org/v1
kind: Integration
metadata:
  name: parceldemo
spec:
  # Each list entry below is one flow: a rest definition, a route or beans.
  flows:
    # REST entry point: POST /parcels consumes and produces JSON and hands the
    # request over to the direct:post route.
    - rest:
        path: /parcels
        consumes: application/json
        produces: application/json
        post:
          - to: direct:post
    # Route "post": takes the message arriving on direct:post, logs it, then
    # sends it in parallel to the Kafka topic "parcels" and to PostgreSQL.
    - route:
        from:
          uri: direct:post
          steps:
            - log:
                message: 'Received: ${body}'
            - multicast:
                steps:
                  # Branch 1: publish the message to the Kafka topic "parcels".
                  - to:
                      uri: kamelet:kafka-not-secured-sink
                      parameters:
                        topic: parcels
                        bootstrapServers: '{{kafka-brokers}}'
                  # Branch 2: insert the parcel row; a row with the same id is
                  # left untouched (ON CONFLICT ... DO NOTHING).
                  - to:
                      uri: kamelet:postgresql-sink
                      parameters:
                        serverName: '{{postgres-server}}'
                        serverPort: '5432'
                        # Credentials can be overridden through the properties
                        # postgres-username / postgres-password. The text after
                        # the colon is the fallback, so the former hard-coded
                        # login still applies when the property is not set.
                        username: '{{postgres-username:postgres}}'
                        password: '{{postgres-password:postgres}}'
                        databaseName: demo
                        query: >-
                          INSERT INTO parcels (id,address) VALUES
                          (:#id,:#address) ON CONFLICT (id)  DO NOTHING
                # The strategy class is instantiated by name via "#class:".
                aggregationStrategy: >-
                  #class:org.apache.camel.processor.aggregate.UseOriginalAggregationStrategy
                parallelProcessing: true
                streaming: true
        id: post
    # Route "payment": reads from the Artemis queue "payments", runs the body
    # through the xj XML2JSON transformation and publishes the result to the
    # Kafka topic "payments".
    - route:
        id: payment
        from:
          uri: kamelet:jms-apache-artemis-source
          parameters:
            brokerURL: '{{jms-broker}}'
            destinationType: queue
            destinationName: payments
          steps:
            # Identity transformation with direction XML2JSON.
            - to:
                uri: xj:identity
                parameters:
                  transformDirection: XML2JSON
            - to:
                uri: kamelet:kafka-not-secured-sink
                parameters:
                  bootstrapServers: '{{kafka-brokers}}'
                  topic: payments
    # Route "aggregator": consumes the Kafka topics "parcels" and "payments",
    # groups messages that share the same "id" in pairs, then either forwards
    # the pair to MQTT (status "confirmed") or marks the parcel as CANCELED in
    # PostgreSQL.
    - route:
        from:
          uri: kamelet:kafka-not-secured-source
          steps:
            - log:
                message: 'Aggregating: ${body}'
            # Parse the JSON payload so the Groovy expressions below can read
            # its fields.
            - unmarshal:
                json:
                  library: jackson
            # Correlate on "id"; a group is complete after 2 messages.
            # NOTE(review): no completion timeout is configured, so a group
            # that never receives its second message is presumably never
            # released - confirm this is intended.
            - aggregate:
                steps:
                  - choice:
                      when:
                        # The grouped element that has a "status" key decides
                        # the branch (body is presumably the list of grouped
                        # bodies - verify against the "aggregator" bean).
                        - expression:
                            groovy:
                              expression: >-
                                body.find { it.containsKey('status') }.status ==
                                'confirmed'
                          steps:
                            # Confirmed: serialise the whole group back to JSON
                            # and publish it to the MQTT topic "deliveries".
                            - marshal:
                                json:
                                  library: jackson
                            - log:
                                message: 'Send to MQTT : ${body}'
                            - to:
                                uri: kamelet:mqtt-sink
                                parameters:
                                  topic: deliveries
                                  brokerUrl: '{{mqtt-broker}}'
                      otherwise:
                        steps:
                          # Not confirmed: keep only the element that has a
                          # "status" key, serialise it and update the parcel.
                          - setBody:
                              expression:
                                groovy:
                                  expression: 'body.find { it.containsKey(''status'') } '
                          - marshal:
                              json:
                                library: jackson
                          - log:
                              message: 'Send to database: ${body}'
                          - to:
                              uri: kamelet:postgresql-sink
                              parameters:
                                serverName: '{{postgres-server}}'
                                serverPort: '5432'
                                # Credentials can be overridden through the
                                # properties postgres-username and
                                # postgres-password. The text after the colon
                                # is the fallback, so the former hard-coded
                                # login still applies when they are not set.
                                username: '{{postgres-username:postgres}}'
                                password: '{{postgres-password:postgres}}'
                                databaseName: demo
                                query: >-
                                  UPDATE parcels set status = 'CANCELED' WHERE
                                  id = :#id
                # Refers to the bean named "aggregator" declared under beans.
                aggregationStrategy: aggregator
                completionSize: 2
                correlationExpression:
                  groovy:
                    expression: body.get('id')
          parameters:
            topic: parcels,payments
            bootstrapServers: '{{kafka-brokers}}'
            autoCommitEnable: true
            consumerGroup: parceldemo
        id: aggregator
    # Route "delivery": subscribes to the MQTT topic "deliveries" and logs
    # every message it receives.
    - route:
        from:
          uri: kamelet:mqtt-source
          steps:
            - log:
                message: 'Delivery: ${body}'
          parameters:
            topic: deliveries
            brokerUrl: '{{mqtt-broker}}'
        # Explicit route id, in line with the other routes of this integration
        # (post, payment, aggregator).
        id: delivery
    # Bean definitions. "aggregator" is the name the aggregate step uses for
    # its aggregationStrategy.
    - beans:
        - name: aggregator
          type: >-
            org.apache.camel.processor.aggregate.GroupedBodyAggregationStrategy
