Mounting DAGs

DAGs can be mounted by using a ConfigMap or git-sync. This is best illustrated with an example of each, shown in the sections below.

Via ConfigMap

---
apiVersion: v1
kind: ConfigMap
metadata:
  name: cm-dag (1)
data:
  test_airflow_dag.py: | (2)
    from datetime import datetime, timedelta
    from airflow import DAG
    from airflow.operators.bash import BashOperator
    from airflow.operators.dummy import DummyOperator

    with DAG(
        dag_id='test_airflow_dag',
        schedule='0 0 * * *',
        start_date=datetime(2021, 1, 1),
        catchup=False,
        dagrun_timeout=timedelta(minutes=60),
        tags=['example', 'example2'],
        params={"example_key": "example_value"},
    ) as dag:
        run_this_last = DummyOperator(
            task_id='run_this_last',
        )

        # [START howto_operator_bash]
        run_this = BashOperator(
            task_id='run_after_loop',
            bash_command='echo 1',
        )
        # [END howto_operator_bash]

        run_this >> run_this_last

        for i in range(3):
            task = BashOperator(
                task_id='runme_' + str(i),
                bash_command='echo "{{ task_instance_key_str }}" && sleep 1',
            )
            task >> run_this

        # [START howto_operator_bash_template]
        also_run_this = BashOperator(
            task_id='also_run_this',
            bash_command='echo "run_id={{ run_id }} | dag_run={{ dag_run }}"',
        )
        # [END howto_operator_bash_template]
        also_run_this >> run_this_last

    # [START howto_operator_bash_skip]
    this_will_skip = BashOperator(
        task_id='this_will_skip',
        bash_command='echo "hello world"; exit 99;',
        dag=dag,
    )
    # [END howto_operator_bash_skip]
    this_will_skip >> run_this_last

    if __name__ == "__main__":
        dag.cli()
---
apiVersion: airflow.stackable.tech/v1alpha1
kind: AirflowCluster
metadata:
  name: airflow
spec:
  image:
    productVersion: 3.0.6
  clusterConfig:
    loadExamples: false
    exposeConfig: false
    credentialsSecret: simple-airflow-credentials
    volumes:
      - name: cm-dag (3)
        configMap:
          name: cm-dag (4)
    volumeMounts:
      - name: cm-dag (5)
        mountPath: /dags/test_airflow_dag.py (6)
        subPath: test_airflow_dag.py (7)
  webservers:
    roleConfig:
      listenerClass: external-unstable
    roleGroups:
      default:
        envOverrides:
          AIRFLOW__CORE__DAGS_FOLDER: "/dags" (8)
        replicas: 1
  celeryExecutors:
    roleGroups:
      default:
        envOverrides:
          AIRFLOW__CORE__DAGS_FOLDER: "/dags" (8)
        replicas: 2
  schedulers:
    roleGroups:
      default:
        envOverrides:
          AIRFLOW__CORE__DAGS_FOLDER: "/dags" (8)
        replicas: 1
1 The name of the ConfigMap
2 The name of the DAG (this is a renamed copy of the example_bash_operator.py from the Airflow examples)
3 The volume backed by the ConfigMap
4 The name of the ConfigMap referenced by the Airflow cluster
5 The name of the mounted volume
6 The path of the mounted resource. Note that this should map to a single DAG.
7 The resource has to be defined using subPath: this is to prevent the versioning of ConfigMap elements which may cause a conflict with how Airflow propagates DAGs between its components.
8 If the mount path described above is anything other than the standard location (the default is $AIRFLOW_HOME/dags), then the location should be defined using the relevant environment variable.

If a DAG mounted via ConfigMap consists of modularized files, Python needs a "root" directory from which to resolve the referenced files. In that case, either the standard DAGs location should be used, or PYTHONPATH should be overridden to point to the new location (the logging configuration must also be included in the path), as shown below:

    envOverrides: &envOverrides
      AIRFLOW__CORE__DAGS_FOLDER: "/dags"
      PYTHONPATH: "/stackable/app/log_config:/dags"
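
As a minimal sketch of the modular case, the fragment below mounts a DAG file and a helper module from one ConfigMap and reuses the override above for a second role through a YAML alias. The names cm-dag-modular, modular_dag.py, helpers.py and greeting_command are hypothetical, chosen only for illustration. The AirflowCluster is deliberately incomplete: image, credentials and the remaining roles are omitted and would follow the full example above.

```yaml
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: cm-dag-modular # hypothetical name
data:
  # Hypothetical helper module imported by the DAG below
  helpers.py: |
    def greeting_command():
        return 'echo "hello from helpers"'
  # Hypothetical DAG file; the import of "helpers" relies on PYTHONPATH including /dags
  modular_dag.py: |
    from datetime import datetime
    from airflow import DAG
    from airflow.operators.bash import BashOperator
    from helpers import greeting_command

    with DAG(
        dag_id='modular_dag',
        schedule=None,
        start_date=datetime(2021, 1, 1),
        catchup=False,
    ) as dag:
        BashOperator(task_id='greet', bash_command=greeting_command())
---
apiVersion: airflow.stackable.tech/v1alpha1
kind: AirflowCluster
metadata:
  name: airflow
spec:
  # image, credentialsSecret and other settings omitted for brevity
  clusterConfig:
    volumes:
      - name: cm-dag-modular
        configMap:
          name: cm-dag-modular
    volumeMounts:
      # One entry per file: each file is mapped individually via subPath
      - name: cm-dag-modular
        mountPath: /dags/modular_dag.py
        subPath: modular_dag.py
      - name: cm-dag-modular
        mountPath: /dags/helpers.py
        subPath: helpers.py
  webservers:
    roleGroups:
      default:
        envOverrides: &envOverrides
          AIRFLOW__CORE__DAGS_FOLDER: "/dags"
          PYTHONPATH: "/stackable/app/log_config:/dags"
  schedulers:
    roleGroups:
      default:
        envOverrides: *envOverrides # reuses the anchored map defined above
```

Every additional file needs its own volumeMounts entry, which is why this approach scales poorly beyond a handful of files.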

The advantage of this approach is that DAGs are provided "in-line". However, handling multiple DAGs this way becomes cumbersome, as each must be mapped individually. For multiple DAGs, it is easier to expose them via git-sync, as shown below.

Via git-sync

git-sync is a command that pulls a git repository into a local directory and is supplied as a sidecar container for use within Kubernetes. The Stackable Airflow images already ship with git-sync included, and the operator takes care of calling the tool and mounting volumes, so that only the repository and synchronization details are required:

git-sync usage example: https
---
apiVersion: airflow.stackable.tech/v1alpha1
kind: AirflowCluster
metadata:
  name: airflow
spec:
  image:
    productVersion: 3.0.6
  clusterConfig:
    loadExamples: false
    exposeConfig: false
    credentialsSecret: test-airflow-credentials (1)
    dagsGitSync: (2)
      - repo: https://github.com/stackabletech/airflow-operator (3)
        branch: "main" (4)
        gitFolder: "tests/templates/kuttl/mount-dags-gitsync/dags" (5)
        depth: 10 (6)
        wait: 20s (7)
        credentials:
          basicAuthSecretName: git-credentials (8)
        gitSyncConf: (9)
          --rev: HEAD (10)
          # --rev: git-sync-tag # N.B. tag must be covered by "depth" (the number of commits to clone)
          # --rev: 39ee3598bd9946a1d958a448c9f7d3774d7a8043 # N.B. commit must be covered by "depth"
          --git-config: http.sslCAInfo:/tmp/ca-cert/ca.crt (11)
  webservers:
    ...
---
apiVersion: v1
kind: Secret
metadata:
  name: git-credentials (8)
type: Opaque
data:
  user: c3Rh...
  password: Z2l0a...
1 A Secret used for accessing database and admin user details (included here to illustrate where different credential secrets are defined)
2 The git-sync configuration block, which contains a list of git-sync elements
3 The repository to clone (required)
4 The branch name (defaults to main)
5 The location of the DAG folder, relative to the synced repository root. It can optionally start with /, but a trailing slash is not recommended. An empty string ("") or a slash (/) corresponds to the root folder in Git. Defaults to "/".
6 The depth of syncing i.e. the number of commits to clone (defaults to 1)
7 The synchronization interval, given as a duration, e.g. 20s or 1h (defaults to "20s")
8 The name of the Secret used to access the repository if it is not public. This should include two fields: user and password. The password field can hold either a password (not recommended) or a GitHub token, as described here.
9 A map of optional configuration settings that are listed in this configuration section (and the ones that follow on that link)
10 An example showing how to specify a target revision (the default is HEAD). The revision can also be a tag or a commit, though this assumes that the target hash is contained within the number of commits specified by depth. If a tag or commit hash is specified, then git-sync recognizes this and does not perform further cloning.
11 Git-sync settings can be provided inline, although some of these (--dest, --root) are specified internally in the operator and are ignored if provided by the user. Git-config settings can also be specified, although a warning is logged if safe.directory is specified as this is defined internally, and should not be defined by the user.
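
The git-credentials Secret above supplies its values base64-encoded under data. As an alternative sketch, Kubernetes also accepts plain-text values under stringData and encodes them when the Secret is stored. The user and password values below are placeholders, not real credentials.

```yaml
---
apiVersion: v1
kind: Secret
metadata:
  name: git-credentials
type: Opaque
stringData:
  user: example-user # placeholder value
  password: example-token # placeholder value, e.g. an access token
```

This can be easier to write by hand, but it leaves the credentials readable in the manifest, so the file needs the same care as any other secret material.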

git-sync usage example: ssh
---
apiVersion: airflow.stackable.tech/v1alpha1
kind: AirflowCluster
metadata:
  name: airflow
spec:
  clusterConfig:
    dagsGitSync:
      - repo: ssh://git@github.com/stackable-airflow/dags.git (1)
        credentials:
          sshPrivateKeySecretName: git-sync-ssh (2)
...

---
apiVersion: v1
kind: Secret
metadata:
  name: git-sync-ssh (2)
type: Opaque
data:
  key: LS0tL...
  knownHosts: Z2l0a...
1 The repository to clone, specified as an SSH URL.
2 The name of the Secret used to access the repository if it is not public. This Secret, defined in the second manifest, should include two fields: key and knownHosts, both of which can contain multiple entries.

git-sync can be used with DAGs that make use of Python modules, as Python is configured to use the git-sync target folder as the "root" location when looking for referenced files. See the Applying Custom Resources example for more details.