Workflows and ERRONEOUS state

When you configure a workflow module for SAP BTP Workflow service, one of the possible state is ERRONEOUS, which means something wen wrong. Let's take a service call as an example. 404 errors and similar you can handle directly, because you get the response back and possibly you can do something within the workflow like prepare a special path for it etc. ERRONEOUS state is when you are hit harder - for example there was a timeout when calling the endpoint or problems with the destination (like authorization issues).

What can you do when you model the workflow for such cases? The answer is not tricky. Nothing.

I don't see any way to:

  • model an alternative path etc. I don't see any option when modeling a workflow.
  • be informed about such cases. My thoughts went to BTP's Alert Notification service, which says "Create and receive real-time alerts about your services" but its integration with the workflow service allows you only to create a workflow when something happens and this "something" does not include failed workflows...

So, your workflow just goes into ERRONEOUS state and that's all. The only way to monitor such issues (at least based on the documentation) is to use Monitor Workflows app. There is nothing similar like an exception subproccess in Integration Flows in SAP Integration Suite.

The thing is that such failure usually is a significant event which I would like to tackle as quick as possible. I would like to have a possibility to model this and try to do as much as possible in an automated fashion (or just receive a notification). The optimistic thinking ("it will be very rare") might not be acceptable and if you end with hope that it will be monitored and eventually handled manually as soon as possible...well, hope is not a strategy.

CAP-based handler

The idea is to have something which can automatically check if there are some erroneous workflows, if yes - analyze and do something if possible - like restart the workflow, send an event to SAP Event Mesh and so on; whatever makes sense and you can code this.

For such task you need to cooperate with BTP's Workflow API for Cloud Foundry and the best way to talk with a remote service is to use the smartest SAP programming tool which is Cloud Application Programming model (CAP).

The sample presented here is based on SAP BTP Trial.

Preparations

The first thing was to update the workflow instance (called wm_workflow in my case) and add required scopes for getting workflows instances, error messages, starting an instance etc. (add whatever is needed, required scopes are in the CF Workflow API docs in SAP Business Hub):

cf update-service wm_workflow -c '{"authorities": ["WORKFLOW_DEFINITION_GET",
"WORKFLOW_INSTANCE_START", "WORKFLOW_INSTANCE_GET", "WORKFLOW_INSTANCE_GET_ERROR_MESSAGES", "WORKFLOW_INSTANCE_GET_EXECUTION_LOGS"]}'

Having this I initialized my project and imported the workflow API JSON file with:

cds import SAP_CP_Workflow_CF.json --from openapi

After this the remote service is added to the project:

remote service

package.json is automatically updated and the service is added as required. Additionally I added the impl pointing where I will add my own logic for this service:

package.json

In SAP_CP_Workflow_CF.csn file you can see how OpenAPI functions are imported as functions with all parameters, types and annotations like paths and so on.

CSN file

The next thing is to connect the remote service to my workflow service instance in the BTP account using a destination. In BTP I have one created when I ran the booster, but it has the type OAuth2JWTBearer which I was not able to have working. I created another destination workflow with the type OAuth2ClientCredentials; the data I took from my workflow service instance key.

destination

I added it to my remote service configuration for production:

config for production

For local testing I used CAP's hybrid testing, using cds bind for binding to remote destinations.

Implementation for BTP Cloud Foundry environment

Having the configuration done, I created very simple model and service. My POC goal was to get only workflows in ERRONEOUS state and run some logic for it. Entities are simply reusing the Workflow OpenAPI types imported previously.

The model...

namespace wf;

using {Workflow.API.for.Cloud.Foundry as CFWorkflow} from '../srv/external/SAP_CP_Workflow_CF';
using {Workflow.API.for.Cloud.Foundry_types as CFWorkflowTypes} from '../srv/external/SAP_CP_Workflow_CF';

entity WorkflowInstances : CFWorkflowTypes.WorkflowInstance {
      errors : Association to many WorkflowInstanceErrors;
};

entity WorkflowInstanceErrors @(cds.autoexpose) : CFWorkflowTypes.WorkflowInstanceErrorMessage {}

...and the service:

using {wf} from '../db/schema';

service FailedWorkflowsService {
  @readonly entity WorkflowInstances as projection on wf.WorkflowInstances;
}

For requesting the data from the Cloud Foundry Workflow API, manual coding is required. It will be later used to implement getting the data from my CDS model in FailedWorkflowsService. I reused the approach and code presented by Robert Witt, which do some magic to "translate" the call from CDS framework to OpenAPI request.

The file workflow_cf.js:

class WorkflowCFService extends cds.RemoteService {
  async init() {
    this.before("*", "*", (req) => {
      const fullyQualifiedName = this.namespace + "." + req.event;
      const definition = this.model.definitions[fullyQualifiedName];

      req.method = this._getMethod(definition);
      req.path = this._getPath(definition, req.data || {});
      req.data = {};
      req.event = undefined;
    });

    await super.init();
  }

  _getMethod(definition) {
    return definition["@openapi.method"] || definition.kind === "action"
      ? "POST"
      : "GET";
  }

  _getPath(definition, data) {
    // Maps the parameters to path segments
    const mapPathSegment = (segment) => {
      const match = segment.match(/(?<=\{)(.*)(?=\})/g); // matches e. g. {placeholder}
      if (!match) {
        // No placeholder
        return segment;
      }

      const param = match[0];
      const paramValue = data[param];
      if (paramValue === undefined || paramValue === null) {
        throw new CapError(
          400,
          `Value for mandatory parameter '${param}' missing`
        );
      }

      return paramValue.toString();
    };

    // Construct the path to the endpoint by replacing placeholders with actual parameter values
    const path = definition["@openapi.path"]
      .split("/")
      .map(mapPathSegment)
      .join("/");

    const queryString = this._getQueryParams(definition, data).toString();
    return path + (queryString.length ? "?" + queryString : "");
  }

  _getQueryParams(definition, data) {
    const queryParams = new URLSearchParams();
    Object.entries(data)
      .filter(([key]) => definition.params?.[key]?.["@openapi.in"] === "query")
      .filter(([, value]) => value !== undefined && value !== null)
      .forEach(([key, value]) => queryParams.set(key, value.toString()));

    return queryParams;
  }
}

module.exports = WorkflowCFService;

Now I can use this remote service implementation for my CDS service's READ operations - whenever there is a READ event to get workflow instances, it should return all instances in ERRONEOUS state with error messages (by calling the Cloud Foundry Workflow API using my remote service)

The file failed_workflows.js:

class FailedWorkflowsService extends cds.ApplicationService {
  async init() {
    this.on("READ", `WorkflowInstances`, async (req) => {
      const workflowSrv = await cds.connect.to(
        "Workflow.API.for.Cloud.Foundry"
      );

      const instances = await workflowSrv.v1_workflow_instances({
        status: "ERRONEOUS",
      });

      // enrich with error messages
      for (const i of instances) {
        i.errors = await workflowSrv.v1_workflow_instances__error_messages({
          workflowInstanceId: i.id,
        });
      }

      return instances;
    });

    await super.init();
  }
}

module.exports = FailedWorkflowsService;

Now the test - I created some successful and failing workflows in my Trial account. Starting my project with cds watch --profile hybrid I get:

cds watch output

Getting WorkflowInstances gives me all failed workflows with error details:

endpoint call result

Having this information (and more if needed - like execution log etc.) you can judge what to do - maybe retry the workflow with different parameters, run different workflow, send some notification - whatever makes sense in this particular case. Such handling can be set as periodic job to keep an eye failing cases; there might be also some logic needed for marking already handled workflow instances, so they are not picked next time.

Scheduling the handler

For running a scheduled job SAP BTP Job Scheduling can be used but at the moment of writing this article the service does not work correctly in the BTP Trial offering. But using it in a production system means additional 💲💲💲 and since CAP runs on Node.js, you could use libs like node-cron:

File server.js with cds.once usage - see the docs. It will run handleErroneousWorkflows function every 30 minutes. Inside the function all erroneous workflows are read and can be handled if required.

const cds = require("@sap/cds");
const cron = require("node-cron");

cds.once("listening", () => {
  cron.schedule("*/30 * * * *", handleErroneousWorkflows);
});

async function handleErroneousWorkflows() {
  const srv = await cds.connect.to("FailedWorkflowsService");
  const instances = await srv.run(SELECT.from("WorkflowInstances"));

  if (instances && instances.length > 0) {
    console.log(`Handling ${instances.length} erroneous workflows`);

    for (const i of instances) {
      // check the failed workflow whether something useful can be done
      // ...
    }
  }
}

module.exports = cds.server;

After deployment the handler will run as scheduled and fire the prepared logic:

BTP log

Adjustment for BTP Kyma environment

As a POC I also wanted to deploy the handler in BTP's Kyma environment with slightly changed approach - to leverage Kubernetes native CronJob instead of node-cron module. I moved the function handleErroneousWorkflows from server.js (and removed this file) to the service and exposed is as an action. It also returns the number of found erroneous workflows.

failed_workflow.cds:

using {wf} from '../db/schema';

service FailedWorkflowsService {
  @readonly entity WorkflowInstances as projection on wf.WorkflowInstances;
  action handleErroneousWorkflows() returns Integer
}

failed_workflow.js:

class FailedWorkflowsService extends cds.ApplicationService {
  async init() {
    this.on("READ", `WorkflowInstances`, async (req) => {
      const workflowSrv = await cds.connect.to(
        "Workflow.API.for.Cloud.Foundry"
      );

      const instances = await workflowSrv.v1_workflow_instances({
        status: "ERRONEOUS",
      });

      // enrich with error messages
      for (const i of instances) {
        i.errors = await workflowSrv.v1_workflow_instances__error_messages({
          workflowInstanceId: i.id,
        });
      }

      return instances;
    });

    await super.init();
  }

  async handleErroneousWorkflows() {
    const srv = await cds.connect.to("FailedWorkflowsService");
    const instances = await srv.run(SELECT.from("WorkflowInstances"));
    console.log("Checking if there are erroneous workflows to handle...");

    if (instances && instances.length > 0) {
      console.log(`Handling ${instances.length} erroneous workflows`);

      for (const i of instances) {
        // check the failed workflow whether something useful can be done
        // ...
      }
    }

    return instances ? instances.length : 0;
  }
}

module.exports = FailedWorkflowsService;

The action became available as an action import in the CAP backend:

action import

For such modified project I started preparing for deployment in Kyma:

cds build --production
cds add helm
pack build wozjac/wf-error-handler --path gen/srv --builder paketobuildpacks/builder:base -t wozjac/wf-error-handler:1.0.0
docker login
docker push wozjac/wf-error-handler:1.0.0

I updated my Helm values according to the guide. and deployed it using Helm:

helm upgrade --install wf-error-handler .\chart\ --namespace wf --create-namespace

And after a while all resources became available:

k8s deploy result

As it can be seen in the Kyma's resource graph, by default Helm chart prepared an API rule to expose the CDS generated project.

kyma graph

Now the CronJob. For testing purposes I was not adding it via Helm, but just directly. The CAP backend is protected - XSUAA was added when I was preparing CF version and Helm chart is picked them into my deployment:

Services in Kyma

This also means that the CronJob I'd like to deploy, which will be using this CAP backend, needs to be authorized. As this is just test and minimalistic approach, I set up the job + ConfigMap with a script using curl + jq to trigger the action exposed as an action import. The credentials for the OAuth flow (client credentials) are taken from the k8s Secret, created as a result of the service binding for XSUAA. The script is injected from the config map using a volume.

secret from service binding

File cronjob.yaml:

apiVersion: batch/v1
kind: CronJob
metadata:
  name: wr-error-handler-cronjob
  namespace: wf
spec:
  schedule: "*/2 * * * *"
  concurrencyPolicy: Forbid
  jobTemplate:
    spec:
      template:
        spec:
          containers:
            - name: bb
              image: badouralix/curl-jq
              env:
                - name: CLIENT_ID
                  valueFrom:
                    secretKeyRef:
                      name: wf-error-handler-srv-auth
                      key: clientid
                - name: CLIENT_SECRET
                  valueFrom:
                    secretKeyRef:
                      name: wf-error-handler-srv-auth
                      key: clientsecret
                - name: TOKEN_URL
                  valueFrom:
                    secretKeyRef:
                      name: wf-error-handler-srv-auth
                      key: url
              command: ["/bin/sh"]
              args: ["/home/call_handler.sh"]
              volumeMounts:
                - name: script
                  mountPath: "/home"
          volumes:
            - name: script
              configMap:
                name: wf-error-handler-configmap
                defaultMode: 0777
          restartPolicy: Never
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: wf-error-handler-configmap
  namespace: wf
data:
  call_handler.sh: |
    #!/bin/sh
    token=$(curl -d client_id=$CLIENT_ID -d client_secret=$CLIENT_SECRET 
    -d grant_type=client_credentials $TOKEN_URL/oauth/token 2>/dev/null | jq -r '.access_token')
    curl -H "Authorization: Bearer $token" -H "Content-Type: application/json" -d {} https://wf-error-handler-srv-wf.da0a435.kyma.ondemand.com/failed-workflows/handleErroneousWorkflows

After kubectl apply -f cronjob.yaml the cron job started to trigger the function for handling erroneous workflows (every 2 minutes):

cronjob execution

As my error handler function returns the number of erroneous workflows in response, I can just check the logs of the pod (which is created when executing the cron job) to see that it is called:

cron job pod log