## Airflow

We use Airflow for all orchestration. All DAGs are created using the `KubernetesPodOperator`, so the Airflow pod itself has minimal dependencies and doesn't need to be restarted unless a major infrastructure change takes place.
There are 4 containers running in the current Airflow deployment, as defined in the `deployment.yml`.
We run in the `gitlab-analysis` project in GCP. Airflow runs in the `data-ops` cluster. Within this cluster there are 2 node pools: `highmem-pool` and `scd-1`. Almost every job will run in the `highmem-pool` node pool.

The `scd-1` node pool is labeled `pgp=scd` and it also has a taint of `scd=true`. For a job to be scheduled in this pool, the task must have a nodeAffinity for the pool and a toleration that matches the taint. See this MR where we added the affinity and toleration for the Slowly-Changing Dimensions task for our postgres pipeline jobs.
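As a rough sketch (not the actual MR; the task name, image, and taint effect are assumptions for illustration), wiring this into a `KubernetesPodOperator` task looks something like:

```python
# Hypothetical sketch: pin a task to the scd-1 pool via the pgp=scd label
# and tolerate the scd=true taint (NoSchedule effect assumed).
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator

scd_task = KubernetesPodOperator(
    task_id="postgres-scd",   # hypothetical task name
    name="postgres-scd",
    namespace="default",      # assumption
    image="<data-image>",     # placeholder
    affinity={
        "nodeAffinity": {
            "requiredDuringSchedulingIgnoredDuringExecution": {
                "nodeSelectorTerms": [
                    {"matchExpressions": [
                        {"key": "pgp", "operator": "In", "values": ["scd"]}
                    ]}
                ]
            }
        }
    },
    tolerations=[
        {"key": "scd", "operator": "Equal", "value": "true", "effect": "NoSchedule"}
    ],
)
```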
There should never be more than one failed DAG run visible for any DAG at one time. For incremental jobs that rely on the `execution_date`, such as the extract from the gitlab.com database, any failed DAGs need to have their task instances cleared so that they can be rerun once the fix has been applied (see the example below).

For jobs that are not dependent on `execution_date`, the job should be rerun manually when the fix is applied and the failed DAG run(s) should be deleted. If there is a failed DAG run for a DAG, it should mean that the current state of that DAG is broken and needs to be fixed. This makes it easier to glance at the list of DAGs in Airflow and immediately know what needs attention and what doesn't.
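A hedged example of clearing failed task instances with the Airflow 1.x CLI (the date range is illustrative):

```
airflow clear gitlab_com_db_extract --only_failed -s 2019-10-30 -e 2019-11-04
```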
If incremental runs are missed for a given DAG or there is missing data in a table, there are two ways to do a backfill. If the table is small and a backfill would be relatively quick, then dropping the table and doing a full sync is an option. However, when a DAG is stopped due to upstream errors, this may not be possible because a large number of tables can be affected.

In the latter case, it is better to run the backfill command in the Airflow scheduler pod container. The command is:

```
airflow backfill gitlab_com_db_extract -s 2019-10-30 -e 2019-11-04 --delay_on_limit 30 --reset_dagruns
```

This will clear any DAG runs and task instances that already exist for the given time frame, while also generating any new DAG runs that don't exist for the time frame. The Airflow documentation for the CLI details what the flags are.
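If only a single task within the DAG needs to be rerun for the window, `backfill` also accepts a task regex (the task name below is a placeholder):

```
airflow backfill gitlab_com_db_extract -t "<task_name>" -s 2019-10-30 -e 2019-11-04
```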
To facilitate easier use of Airflow locally, while still properly testing that our DAGs run in Kubernetes, we use docker-compose to spin up local Airflow instances that can run their DAGs in Kubernetes using the `KubernetesPodOperator`.

The flow from code change to testing in Airflow should look like this (this assumes there is already a DAG for that task):

1. Run `make init-airflow` to spin up the postgres db container and init the Airflow tables; it will also create a generic Admin user. You will get an error if Docker is not running.
2. Run `make airflow` to spin up Airflow and attach a shell to one of the containers.
3. Navigate to `localhost:8080` to see your own local webserver. A generic Admin user is automatically created for you in MR airflow instances with the username and password set to `admin`.
4. Run a task, e.g. `airflow run snowflake_load snowflake-load 2019-01-01` (as configured in the docker-compose file, all kube pods will be created in the `testing` namespace). Or, if you want to run an entire DAG (for instance the `dbt` DAG, to test the branching logic), the command would be something like `airflow backfill dbt -s 2019-01-01T00:00:00 -e 2019-01-01T00:00:00`.

There is also a `make help` command that describes the available commands and what they do.
Some gotchas:

- `ERROR: Version in "./docker-compose.yml" is unsupported.` usually means your local docker-compose is too old and needs to be upgraded.
- Make sure your python files are executable: `chmod +x your_python_file.py`. This will avoid permission denied errors.
- `kube_secrets.py` is the source of truth for which secrets Airflow uses. The actual secret values aren't stored in this file, just the pointers.
- Run `docker pull <image_name>` to force a fresh pull of the latest images.

Our current implementation uses the following project variables:
The following flavors are defined:

- The `LOAD` flavor is used by the Extract & Load process.
- The `TRANSFORM` flavor is used by the Transform process.
- The `TEST` flavor is for testing using Snowflake.
- The `PERMISSION` flavor is for the permission bot.
- The `SYSADMIN` flavor is for housekeeping tasks (like setting up review instances). This flavor doesn't define `SNOWFLAKE_SYSADMIN_DATABASE` and `SNOWFLAKE_SYSADMIN_WAREHOUSE`.

The following variables are set at the job level depending on the running environment and should not be set in the project settings.
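Going by the `SNOWFLAKE_SYSADMIN_*` names above, each flavor's variables presumably follow the same `SNOWFLAKE_<FLAVOR>_*` pattern. An illustrative (not authoritative) example for the `LOAD` flavor:

```
SNOWFLAKE_LOAD_DATABASE="..."    # illustrative names; the real list lives in the project settings
SNOWFLAKE_LOAD_WAREHOUSE="..."
```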
### Kubernetes Setup

- Some of us alias `kubectl` as `kbc`; a snippet for this is shown below.
- `default_secrets.yaml`
- Connect to the data team cluster by running `gcloud container clusters get-credentials data-ops --zone us-west1-a --project gitlab-analysis`
- Run `kubectl get pods` and make sure it returns successfully.
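The alias, for example, can live in your shell profile:

```
# ~/.bashrc or ~/.zshrc
alias kbc='kubectl'
```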
ALL OF YOUR COMMANDS TOUCH PRODUCTION; THERE IS CURRENTLY NO TESTING ENVIRONMENT IN K8S. The canonical way to test is to use the local docker-compose setup.
To access the Airflow webserver, run `kubectl port-forward deployment/airflow-deployment 1234:8080`. You can then navigate to `localhost:1234` in a browser and it will take you to the webserver for the instance you port-forwarded to. Note: we no longer need to do this, as we now have a stable URL to access the webserver.

To update the Airflow version, change it in `airflow_image/Dockerfile`; the line looks like `ARG AIRFLOW_VERSION=<version_number>`. Then `exec` into one of the containers in the pod and run `airflow upgradedb`; see the sketch below.
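A hedged sketch of that upgrade flow (the pod name is illustrative; find the real one with `kubectl get pods`):

```
kubectl exec -ti <airflow-deployment-pod> -c scheduler /bin/bash
airflow upgradedb    # run inside the container; migrates the Airflow metadata db
```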
Useful commands for inspecting the deployment:

- Run `kubectl get all` to display any pods, deployments, replicasets, etc.
- Run `kubectl get pods` to see a list of all pods in your current namespace.
- Run `kubectl get pv` and `kubectl get pvc` to see persistent volumes and persistent volume claims, respectively. The command to get persistent volumes will show all volumes regardless of namespace, as persistent volumes don't belong to namespaces. Persistent volume claims, however, do belong to certain namespaces and therefore will only display ones within the namespace of your current context.
- If you need to force a pod restart, either because of Airflow lockup, continual restarts, or to refresh the Airflow image the containers are using, run `kubectl delete deployment airflow-deployment`. This will wipe out any and all pods (including ones being run by Airflow, so be careful). Run `kubectl apply -f airflow_image/manifests/deployment.yaml` to send the manifest back up to k8s and respawn the pods.
The resource manifests for Kubernetes live in `airflow_image/manifests/`. To create or update these resources in Kubernetes, first run `kubectl delete deployment airflow-deployment` and then run `kubectl apply -f <manifest-file.yaml>`. Because we are using a persistent volume that can only be claimed by one pod at a time, we can't use the usual `kubectl apply -f` for modifications; a fresh deployment must be set up each time.
To get into a shell that exists in a kube pod, use the command `kubectl exec -ti <pod-name> -c <container-name> /bin/bash`. This will drop you into a shell within the pod and container that you chose. This can be useful if you want to run Airflow commands directly within a shell instead of trying to do it through the webserver UI. For example:

```
kubectl exec -ti airflow-deployment-56658758-ssswj -c scheduler /bin/bash
```

This accesses that pod and the container named `scheduler`. The container names are listed in `airflow_image/manifests/deployment.yaml`. This information is also available if you do `kubectl describe <pod>`, though it is harder to read.
Things you might do once you're in a shell:

- Run a specific task in a DAG: `airflow run <dag> <task_name> <execution_date> -f -A`. For example: `airflow run dbt dbt-full-refresh 05-02T15:52:00+00:00 -f -A`. The `-f` flag forces it to rerun even if there was already a success or failure for that task run; the `-A` flag forces it to ignore dependencies (aka it doesn't care that it wasn't branched to upstream).
- Update an Airflow secret: run `kubectl edit secret airflow -o yaml`; this will open the secret in a text editor and you can edit it from there. New secrets must be base64 encoded; the easiest way to do this is to use `echo -n <secret> | base64 -`. There are some `null` values in the secret file when you edit it; for the file to save successfully you must change the `null` values to `""`, otherwise it won't save properly.
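For example, to prepare a new secret value and sanity-check it (the decode flag spelling varies by platform: `--decode` on Linux, `-D` on older macOS):

```
echo -n 'new-secret-value' | base64 -              # bmV3LXNlY3JldC12YWx1ZQ==
echo 'bmV3LXNlY3JldC12YWx1ZQ==' | base64 --decode  # new-secret-value
```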
To trigger a task manually, such as a dbt full refresh:

1. Exec into a pod: `kubectl exec -ti <pod_name> -c <webserver|scheduler> /bin/bash`
2. For most DAGs, `airflow run <dag_id> <task_id> <execution_date>` will be sufficient.
3. For a dbt full-refresh, a few more flags are required: `airflow run dbt dbt-full-refresh <execution_date> -f -A`. The `-f` flag forces the task to run even if it is already marked as a success or failure. The `-A` flag tells it to run regardless of any dependencies it should have.

## Data Image Project

The `data_image` directory contains everything needed for building and pushing the `data-image`. If a binary needs to be installed, it should be done in the Dockerfile directly; python packages should be added to the `requirements.txt` file and pinned to a confirmed working version.
The `airflow_image` directory contains everything needed to build and push not only the `airflow-image` but also the corresponding k8s deployment manifests. The only manual work that needs to be done for a fresh deployment is setting up an `airflow` secret. The required secrets can be found in `airflow_image/manifests/secret.template.yaml`.
The `default` Airflow instance is the production instance; it uses the `airflow` postgres db. The `testing` instance uses the `airflow_testing` db.

The `default` instance logs are stored in `gs://gitlab-airflow/prod`; the `testing` instance logs are stored in `gs://gitlab-airflow/testing`.
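If you're authenticated with `gsutil`, you can browse these logs from the command line, e.g.:

```
gsutil ls gs://gitlab-airflow/prod
```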
The `dbt_image` directory contains everything needed for building and pushing the `dbt-image`. If a binary needs to be installed, it should be done in the Dockerfile directly; python packages should be added to the `requirements.txt` file and pinned to a confirmed working version. As this image is used by Data Analysts, there should not be much more than dbt in the image.
## Python Style Guide

There are multiple `make` commands and CI jobs designed to help keep the repo's python clean and maintainable. The following commands in the Makefile will help analyze the repo:

- `make lint` will run the `black` python formatter and update files (this is not just a check).
- `make pylint` will run the pylint checker but will NOT check for code formatting, as we use `black` for this. This will check for duplicated code, possible errors, warnings, etc.; general things to increase code quality. It ignores the DAGs dir, as those are not expected to follow general code standards.
- `make radon` will test relevant python code for cyclomatic complexity and show functions or modules with a score of `B` or lower.
- `make xenon` will run a complexity check that returns a non-zero exit code if the threshold isn't met. It ignores the `shared_modules` and `transform` repos until they get deleted/deprecated or updated at a later date.

Some of the GitLab specific ELTs connect to databases which are in peered GCP projects, such as the usage ping. To allow connections, a few actions have been taken:
We execute our CI jobs in the `gitlab-data` group with Kubernetes via the `gitlab-analysis` GCP project. We have a group runner set up to share across all repos.

These are the basic steps for when a new group runner token needs to be associated, or when we need to update the runner image. Note: since the release of Helm 3, it is recommended that all of these commands be run in the Cloud Shell console in GCP. Navigate to the deployment for the runner (currently `gitlab-data-gitlab-runner`) and use the kubectl dropdown to enter the shell.
To get things installed:

```
brew install kubernetes-helm
gcloud components install kubectl
```

To get the credentials:

```
gcloud container clusters get-credentials bizops-runner --zone us-west1-a --project gitlab-analysis
```

To see the helm releases:

```
helm list
```

To get the chart values for a specific release:

```
helm get values <release_name>
```

Prep commands:

```
helm init --client-only
helm repo add gitlab https://charts.gitlab.io
helm repo update
```

To delete a release:

```
helm del --purge <release_name>
```

To install a release:

```
helm install --namespace <namespace> --name <release_name> -f values.yaml <chart_name>
```
Example for updating the runner version or group token:

```
gcloud components update   # might not have to do this in Cloud Shell
helm list
helm get values gitlab-data
helm get values gitlab-data > values.yml
nano values.yml            # update values
helm repo list
helm repo add gitlab https://charts.gitlab.io
helm list
helm del --purge gitlab-data
helm install --namespace gitlab-data --name gitlab-data -f values.yml gitlab/gitlab-runner
```