To edit the system diagram, go to the Lucidchart document and make the desired changes. If the changes are contained in the red box, they should be automatically reflected in GitLab. If the viewing area needs to be changed, a new link will need to be generated by following the Lucidchart documentation on publishing, and an MR opened to update the link used above to pull in the image.
We use Airflow for all orchestration.
Both DAG and task names should be as descriptive as possible so that engineers and other users can understand what a DAG does both within and outside the context of the Airflow UI. Names should describe what a DAG or task does and how it does it.
As an example, consider the DAG and task naming for the gitlab_com_db_extract DAG, which has tasks with names like gitlab-com-packages-package-files-db-incremental-pgp-extract. It is tempting to avoid the redundancy between the task and DAG naming, but since we sometimes only see the task name (for example in Slack or Prometheus errors, or when using 'Browse' in the Airflow UI), the redundancy ends up being helpful.
Starting in the summer of 2021 we introduced a more abbreviated, but hopefully equally descriptive, convention for naming some of the most common DAGs we create, and this convention should be used for all DAGs created since. Use a prefix to indicate what is performed by the DAG. For example, if we had created the gitlab extract DAG after this convention, it would be named el_gitlab_com_db_extract since it performs both the extraction and loading phases, whereas greenhouse_extract would be l_greenhouse as it only loads extracted data from S3 into Snowflake.
List of prefix indicators:

Prefix | Indicator
---|---
e | Extract
l | Load
t | Transform
p | Pump
m | Metadata
All DAGs are created using the KubernetesPodOperator, so the Airflow pod itself has minimal dependencies and doesn't need to be restarted unless a major infrastructure change takes place.
The Airflow Helm chart creates 4 pods in the cluster for managing Airflow, listed below:
The scheduler, webserver, and any workers created also include
Additionally, the scheduler and webserver also include:
The install also requires an external postgres DB, which needs to be created manually.
We run in the gitlab-analysis project in Google Cloud Platform (GCP). Airflow runs in the data-ops cluster.
Within this cluster there are 7 node pools: highmem-pool, production-extraction-task-pool, production-extraction-task-pool-highmem, dbt-task-pool, data-science-pool, sales-analytics-pool, and testing-task-pool. Each node pool has a dedicated use for ease of monitoring and resource management.
1) highmem-pool - n1-highmem-8 machine, strictly dedicated to the Airflow server, scheduler, and network components. Does not autoscale.
2) production-extraction-task-pool - n1-highmem-4 machine, used to run most production Airflow tasks except SCD tasks. Autoscales from 2-5 nodes.
3) production-extraction-task-pool-highmem - n1-highmem-8 machine, used to run SCD tasks and other resource-intensive tasks that need more resources than the production-extraction-task-pool provides. Autoscales from 1-3 nodes.
4) dbt-task-pool - n1-highmem-4 machine, used for everything related to dbt. DAGs running dbt should always use this task pool. Autoscales from 1-5 nodes.
5) data-science-pool - n1-highmem-32 machine, used for everything related to data science runs. Autoscales from 0-2 nodes.
6) sales-analytics-pool - n1-highmem-8 machine, used specifically for the Sales Analytics loads. Autoscales from 0-2 nodes. We might need to change this to always keep at least one node running: at the moment a pod only spins up when the DAG starts running, and sometimes this takes too long and the DAG fails.
7) testing-pool - n1-highmem-4 machine, a pool that does not usually have a running node, but is used to run engineers' locally-launched Airflow tasks. Autoscales from 1-2 nodes.
All node pools except the highmem-pool have labels and taints to manage which node pool launches which Airflow task. We intentionally left the highmem-pool without any labels and taints so that no loads can ever be allocated to it and it strictly runs Airflow itself.
For a task to be scheduled in a node pool, the task must have a nodeAffinity for the pool and a toleration that matches the pool's taint. See this MR where we added the affinity and toleration for the Slowly-Changing Dimensions task for our postgres pipeline jobs.
This is very important for any new DAGs: the labels and taints are mandatory, otherwise the DAG won't be able to spin up a pod to run in. We have updated all of our DAGs so that each runs in a pod of a specific kind, depending on its functionality.
The labels are defined on node pool creation and can be found on our Terraform Airflow scripts.
An example DAG running on the sales-analytics-pool is the Sales Analytics Daily DAG, and the specifics about attaching it to a node pool are the following lines of code:
from kubernetes_helpers import get_affinity, get_toleration
...
...
...
affinity=get_affinity("sales_analytics"),
tolerations=get_toleration("sales_analytics"),
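For additional context, below is a minimal, illustrative sketch of how these helpers might be wired into a KubernetesPodOperator task. The DAG id, image, and command are placeholders rather than the real Sales Analytics configuration, and the exact import path for KubernetesPodOperator depends on the Airflow/provider version in use.

```python
from datetime import datetime

from airflow import DAG
# Import path varies by Airflow/provider version; adjust as needed.
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator

from kubernetes_helpers import get_affinity, get_toleration

with DAG(
    dag_id="l_sales_analytics_example",  # hypothetical DAG id, for illustration only
    start_date=datetime(2023, 1, 1),
    schedule_interval="0 6 * * *",
    catchup=False,
) as dag:
    sales_analytics_task = KubernetesPodOperator(
        task_id="sales-analytics-example-task",
        name="sales-analytics-example-task",
        namespace="default",
        image="<data-image>",  # placeholder image reference
        arguments=["python", "sales_analytics_job.py"],  # placeholder command
        # These two arguments are what pin the pod to the sales-analytics-pool:
        # the affinity targets the node pool's label and the toleration matches
        # its taint, so the scheduler is allowed to place the pod there.
        affinity=get_affinity("sales_analytics"),
        tolerations=get_toleration("sales_analytics"),
    )
```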
When the cluster is created, it has one namespace associated with it by default, named default. The Airflow setup of the project requires two namespaces in a production setup:
1) Default
2) Testing
Namespace: In Kubernetes, namespaces provide a mechanism for isolating groups of resources within a single cluster. Names of resources need to be unique within a namespace, but not across namespaces.
Set up the namespace and secret file once the cluster is provisioned. The default namespace is present by default, and we only need to create the testing namespace. names_space_testing.yaml contains the namespace details. Execute kubectl apply -f names_space_testing.yaml from the airflow_image/manifests folder. This should produce the output namespace/testing created.
To validate the list of namespaces created/present, use kubectl get namespace.
After creating the namespace, create the Airflow secret in both the default and testing namespaces.
For the creation of secrets in the default namespace, use kube_secret_default.yaml: execute kubectl create -f kube_secret_default.yaml from the airflow_image/manifests directory. You should receive the output:
secret/airflow created
To check that the secret has been created successfully, use kubectl get secrets --namespace=default.
Follow the same steps for creating the Airflow secret in the testing namespace: use kube_secret_testing.yaml and execute kubectl create -f kube_secret_testing.yaml from the airflow_image/manifests/ directory.
All the values for the secret file are present in the 1Password data team secure vault as a document, i.e. kube_secret_testing_namespace. Add any new secret to the 1Password document as well.
To edit the kube secrets yaml file:
kubectl edit secret airflow -o yaml --namespace=testing
kubectl edit secret airflow -o yaml --namespace=default
This will open a kubectl...yaml file where a new secret can be added or an existing secret can be edited.
The secret needs to be base64 encoded prior to adding it to the yaml file. This can be done like so:
echo -n 'some_secret'| base64
And to decode a secret (for general reference, not needed when adding a secret):
echo -n "encoded_message"|base64 --decode
Lastly, add the new secret to the 1Password secret document, i.e. kube_secret_testing_namespace; this serves as a backup in case the kube cluster goes down.
Note: Updates to the secret file should be done prior to applying the deployment.yml file.
To enable the URL airflow.gitlabdata.com to point to our cluster, a static IP was provisioned in the us-west1 region using the command gcloud compute addresses create airflow-west --region=us-west1. The IP generated was 35.233.169.210. This is available by running gcloud compute addresses list. Note that the static IP must be a regional and not a global IP for the TLS steps to work.
This was mapped to the domain in Route 53 by the infrastructure team, documented in this issue.
TLS/SSL certificates are used to protect the end users' information while it's in transit, and to authenticate the website's organization identity to ensure users are interacting with the legitimate website owners.
New Setup
A certificate and certificate key were generated for the airflow.gitlabdata.com domain via this infrastructure issue.
This certificate was saved to a kubernetes secret by running the command:
kubectl create secret tls airflow-tls --cert=airflow.gitlabdata.com.chained.crt --key=airflow.gitlabdata.com.key
airflow.gitlabdata.com.chained.crt and airflow.gitlabdata.com.key are the actual filenames for the chained certificate and key.
This created the secret airflow-tls.
Renewal of Certificate
At the moment this is being done manually. The certificate issued is valid for a year and expires yearly. The new certificate was reissued using this issue. To update the certificate in the Kubernetes secret, follow the steps below:
1) Convert the certificate to base64 encoding: cat airflow.gitlabdata.com.chained.crt | base64 > secret.new
2) Get the cluster credentials: gcloud container clusters get-credentials data-ops --zone us-west1-a --project gitlab-analysis
3) Update the airflow-tls kube secret using kubectl edit secrets airflow-tls
The certificate files (cert and key) are also stored in the Data Team Secure vault in 1Password under the name airflow_tls_certificate_key.
Airflow logs of all kinds can be found in the Logs Explorer of Google Cloud. This application can be a little bit overwhelming with all of the different logs it collects, so here are some tips:
* Make sure the Log fields view is open. This view is critical in filtering logs down to exactly what you're interested in, whether it be the webserver or a specific DAG run. To open the view, go to PAGE LAYOUT in the top right, click on it, and then make sure Log fields is checked. The Log fields view shows up just to the left of the actual logs.
* In the Log fields view, you can filter to logs from specific processes. The container_name is a great filter to use. Most of the containers have to do with the airflow deployment itself. If looking for a specific DAG run, then the container_name is base, which is shared by all DAGs, so more filtering will need to be done. The pod name is unique for each DAG, so the pod name can be filtered on if exploring a specific DAG run's logs.

It is recommended for Engineers to have the following permissions for GCP:
For developing with Airflow locally, Engineers will also need a service account. These credentials should point to a service account provisioned by your manager. The account should follow the same pattern as your email, i.e. [email protected]. Recommended permissions are:
* Cloud SQL Client
* Kubernetes Engine Developer
* Storage Object Creator
* Storage Object Viewer
The service account key should be downloaded as JSON and sent to the user for secure storage on their computer.
Create Service Account
There should never be more than one failed DAG run visible for any DAG at one time. For incremental jobs that rely on the execution_date, such as the extract from the gitlab.com database, any failed DAGs need to have their task instances cleared so that they can be rerun once the fix has been applied.
For jobs that are not dependent on execution_date, the job should be rerun manually when the fix is applied and the failed DAGrun(s) should be deleted. If there is a failed DAGrun for a DAG, it should mean that the current state of that DAG is broken and needs to be fixed.
This will make it easier to glance at the list of DAGs in Airflow and immediately know what needs attention and what doesn't.
If incremental runs are missed for a given DAG or there is missing data in a table, there are two ways to do a backfill. If the table is small and a backfill would be relatively quick then dropping the table and doing a full sync is an option. However, for times when a DAG is stopped due to upstream errors, this may not be possible when there are a large number of tables.
In the latter case, it is better to run the backfill command in the airflow scheduler pod container. The command is:
airflow backfill gitlab_com_db_extract -s 2019-10-30T00:00:00 -e 2019-11-04T12:00:00 --delay_on_limit 30 --reset_dagruns
This will clear any DAGruns and task instances that already exist for the given time frame while also generating any new DAGruns that don't exist for the time frame. The Airflow documentation for the CLI details what the flags are.
Consider using the --task_regex flag to filter the tasks triggered. This is useful in the case of gitlab.com incremental loads, where "pgp-extract" can be used to skip the downstream dbt tasks.
If DAG runs already exist for the timeframe that is being backfilled, the tasks may run concurrently when running the command above. If the DAGs need to run serially:
1) Delete the existing DAG runs for the timeframe via Actions –> Delete.
2) Go to Browse –> Task Instances and use the search bar and sorting to select the task instances that need to be run again. With those task instances selected, go to Actions –> Clear to clear the state of those task instances.
3) Run the backfill command without --reset_dagruns, because there are no dagruns to reset. This should make the backfill run serially.

To facilitate the easier use of Airflow locally while still properly testing our DAGs running in Kubernetes, we use docker-compose to spin up local Airflow instances that then have the ability to run their DAG in Kubernetes using the KubernetesPodOperator. See the Docker section to ensure you have the proper environment variables configured.
The flow from code change to testing in Airflow should look like this (this assumes there is already a DAG for that task):
1) Run make init-airflow to spin up the postgres db container and init the Airflow tables; it will also create a generic Admin user. You will get an error if Docker is not running.
2) Run make airflow to spin up Airflow and attach a shell to one of the containers.
3) Open localhost:8080 to see your own local webserver. A generic Admin user is automatically created for you in MR airflow instances with the username and password set to admin.
4) In the shell, trigger the task you want to test, e.g. airflow run snowflake_load snowflake-load 2019-01-01 (as configured in the docker-compose file, all kube pods will be created in the testing namespace). Or if you want to run an entire DAG (for instance the dbt DAG to test the branching logic), the command would be something like airflow backfill dbt -s 2019-01-01T00:00:00 -e 2019-01-01T00:00:00.

There is also a make help command that describes what commands exist and what they do.
Some gotchas:
* ERROR: Version in "./docker-compose.yml" is unsupported.
* Run chmod +x your_python_file.py. This will avoid permission denied errors.
* kube_secrets.py is the source of truth for which secrets Airflow uses. The actual secret value isn't stored in this file, just the pointers.
* Run docker pull <image_name> to force a fresh pull of the latest images.

Airflow and pod operators (Kubernetes) in GCP:
All DAGs are created using the KubernetesPodOperator, so while working locally we need a cluster where we can spin up the pod when running a task. In order to make this work we need to ensure we are connected to a cluster. To connect to the cluster, use gcloud container clusters get-credentials data-ops --zone us-west1-a --project gitlab-analysis. To make the local setup work against the cluster, we also need to ensure that the testing namespace is created and the airflow secret is present in the testing namespace. The steps to create these are described above.
Once you have these, your local setup should be able to spin up the pod in the cluster.
FileNotFoundError: [Errno 2] No such file or directory: '/Users/(user)/google-cloud-sdk/bin/gcloud': '/Users/(user)/google-cloud-sdk/bin/gcloud'
This is because the default install location for the Google Cloud SDK on a Mac is now the above, but on linux and in containers it is installed to /usr/lib/google-cloud-sdk/bin/gcloud. This value is passed to the container from the ~/.kube/config file.
To correct this error, all you need to do is edit your ~/.kube/config and update the command path parameter to where it will be in the container: /usr/lib/google-cloud-sdk/bin/gcloud
That file gets updated every time you install the SDK or run this command: gcloud container clusters get-credentials data-ops.
See related issue (internal link) for more info
Our current implementation uses the following project variables:
The following flavors are defined:
* LOAD flavor is used by the Extract & Load process
* TRANSFORM flavor is used by the Transform process
* TEST flavor for testing using Snowflake
* PERMISSION flavor for the permission bot
* SYSADMIN flavor for housekeeping tasks (like setting up review instances). This flavor doesn't define SNOWFLAKE_SYSADMIN_DATABASE and SNOWFLAKE_SYSADMIN_WAREHOUSE.
.The following variables are set at the job level depending on the running environment and should not be set in the project settings.
* The /admin/metrics endpoint is scraped on a regular interval.
* The function gitlabdata.orchestration_utils.push_to_xcom_file has been created to make task values available as metrics (see the sketch after this list). This function takes a JSON object and writes it to the XCom file, and should only be called once per task. To use a value in an XCom as a metric, the metric must be a first-class member of the JSON object that is written to the XCom file. For example, {"record_count": 5, "other_record_count": 6} would work if you want to use record_count and other_record_count as metrics.
* In the configuration, all can be used. Once the configuration is changed, for the metric to be visible, the airflow image will have to be rebuilt and redeployed as explained in the "Restart Deployment and Pods" section.
* Alert rules are defined in the rules/airflow.yml file. A new alert can be added by adding to the rules list. The expr element is a PromQL expression that should return 1 when alerting, and 0 otherwise. The for element defines how long the expression must evaluate to 1 before the alert is actually triggered. Labels should include a severity. The severity is currently defined GitLab system-wide, so it should be low severity for Airflow rules unless the GitLab infrastructure member on call should pay attention to it. The team label ultimately determines which Slack channel receives the alerts. The team label should be set to data-analytics for all alerts for the data team, because data-analytics is the name of the team set up in the runbook service catalog.
* Alerts are sent to the data-prom-alerts Slack channel and should be investigated and addressed by the team member on triage.

Note: we sometimes abbreviate kubectl as kbc.
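As mentioned in the list above, here is a minimal, illustrative sketch of a task writing metrics to the XCom file with push_to_xcom_file; the record-count values are placeholders, and in a real task they would come from the work the task just performed:

```python
from gitlabdata.orchestration_utils import push_to_xcom_file


def load_and_report_metrics():
    # Placeholder counts; a real task would compute these from its own work.
    metrics = {"record_count": 5, "other_record_count": 6}

    # Write the JSON object to the XCom file exactly once per task so that
    # record_count and other_record_count can be picked up as metrics.
    push_to_xcom_file(metrics)


if __name__ == "__main__":
    load_and_report_metrics()
```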
Sometimes things break and it's not clear what's happening. Here are some common things to check.
* Check for any recent changes to the analytics (internal link) or data-image projects.
* Run kubectl get pods and see if the above 4 pods come back in the airflow namespace. Try to exec into the pod using the container.
* Run kubectl get pods -A and double check if there are any old or stale pods which may be causing a bottleneck in the namespace.
* Run kubectl get events --all-namespaces --sort-by='.metadata.creationTimestamp' and look for any recent error messages that may relate.

<task> had an event of type Pending and the task never starts:
* Connect to the data team cluster by running gcloud container clusters get-credentials data-ops --zone us-west1-a --project gitlab-analysis
* Run kubectl get pods and make sure it returns successfully
ALL OF YOUR COMMANDS TOUCH PRODUCTION, THERE IS CURRENTLY NO TESTING ENVIRONMENT IN Kubernetes. The canonical way to test is to use the local docker-compose setup.
* To port-forward to an Airflow webserver, run kubectl port-forward deployment/airflow-deployment 1234:8080. You can now navigate to localhost:1234 in a browser and it will take you to the webserver for the instance you port-forwarded to. Note: We no longer need to do this as we now have a stable URL to access.
* To see the available versions of the Airflow helm chart, run helm search repo airflow --versions.
* Run make init-airflow followed by make airflow from the base of the analytics repo.
* Run kubectl get all. This will display any pods, deployments, replicasets, etc.
* Use the kubectl get pods command to see a list of all pods in your current namespace.
* To see persistent volumes and persistent volume claims, use kubectl get pv and kubectl get pvc respectively. The command to get persistent volumes will show all volumes regardless of namespace, as persistent volumes don't belong to namespaces. Persistent volume claims do however belong to certain namespaces and therefore will only display ones within the namespace of your current context.

The airflow deployment is configured using terraform, running the latest helm chart install in the Airflow Infrastructure repo. To create or update it, run the commands in the below order:
1) terraform init (if you are running terraform for the first time)
2) terraform destroy
3) terraform plan -out "plan_to_apply" (this command will help to check what changes this terraform run will make)
4) terraform apply "plan_to_apply" (this will apply the above plan to the GKE clusters)

To restart the deployment and pods, run terraform destroy. This will wipe out any and all pods (including ones being run by airflow, so be careful). Then, from the Airflow Infrastructure repository folder, run terraform apply to send the manifest back up to Kubernetes and respawn the pods.

To get into a shell that exists in a kube pod, use the command kubectl exec -ti <pod-name> -c <container-name> /bin/bash. This will drop you into a shell within the pod and container that you chose. This can be useful if you want to run airflow commands directly within a shell instead of trying to do it through the webserver UI.
kubectl exec -ti airflow-deployment-56658758-ssswj -c scheduler /bin/bash
is an example command to access that pod and the container named scheduler. The container names are listed in airflow_image/manifests/deployment.yaml. This information is also available if you run kubectl describe pod <pod-name>, though it is harder to read.
docker-compose exec airflow_scheduler bash
docker-compose run airflow_scheduler bash
Things you might do once you're in a shell:
* Run a specific task with airflow run <dag> <task_name> <execution_date> -f -A, for example airflow run dbt dbt-full-refresh 05-02T15:52:00+00:00 -f -A. The -f flag forces it to rerun even if there was already a success or failure for that task_run, and the -A flag forces it to ignore dependencies (aka doesn't care that it wasn't branched to upstream).
* Edit the production Airflow secret with kubectl edit secret airflow -o yaml --namespace=airflow; this will open the secret in a text editor and you can edit it from there. New secrets must be base64 encoded; the easiest way to do this is to use echo -n <secret> | base64 -. There are some null values in the secret file when you edit it; for the file to save successfully you must change the null values to "", otherwise it won't save properly.
* Edit the testing Airflow secret with kubectl edit secret airflow -o yaml --namespace testing. This command follows the same guidelines as those described above for production. If you don't add new secrets to the testing environment, the DAGs that use them will not run when testing.
* To run a command from inside a pod, use kubectl exec -ti <pod_name> -c <webserver|scheduler> /bin/bash. For most tasks, airflow run <dag_id> <task_id> <execution_date> will be sufficient. For a dbt full-refresh, a few more flags are required: airflow run dbt dbt-full-refresh <execution_date> -f -A. The -f flag forces the task to run even if it is already marked as a success or failure. The -A flag tells it to run regardless of any dependencies it should have.
* Variables can be used in DAGs by importing from airflow.models import Variable and then getting the variable value with Variable.get(variable_name), as sketched below.
button. Type in the desired key and value. If the value should be encrypted, check the box. Then press save. To edit a variable, click on the edit icon next to the variable of interest and change what you would like. Then press save.BAMBOOHR_SKIP_TEST
. The value should be a comma-separated list of the names of the tables you would like to temporarily skip testing. Once the BambooHR extract has succeeded, please either remove the variable or change the value to an empty string.The persistent volume claim for Airflow is defined in persistent_volume.yaml.
Furthermore, the persistent volume is referenced in 2 sections of deployment.yaml:
/usr/local/airflow/logs
To check the current capacity of the persistent volume claim, you can run the following:
kubectl exec -it <airflow-deployment-pod-name> -- df -h /usr/local/airflow/logs
If the persistent volume claim is full, there are two solutions:
For more detail on the 2 solutions, see below.
* Clean up the logs directory.
* Increase the size of the persistent volume: find the persistent volume named pvc-<GUID>, note the pdName under gcePersistentDisk (this is the GCE Disk that will need to be updated), and update storage, which is under spec: capacity:.

Alternatively, you can update the persistent_volume.yaml definition in the project. However, redeploying this may delete the data already in the claim. This has not been tested yet.
Currently we have the following (default) roles in Airflow:
* Public
* Viewer
* User
* Op
* Admin
In the above order, each role grants more permissions in Airflow than the one before it.
On top of the existing default roles, the following custom roles are also implemented.
In order for non-admin users to update or reset their passwords, they need to be given permissions to access their profiles and reset their passwords. This includes the following permissions and should be granted to all non-admin users.
can this form post on ResetMyPasswordView
can this form get on ResetMyPasswordView
can this form post on UserInfoEditView
can this form get on UserInfoEditView
resetmypassword on UserDBModelView
can edit on UserDBModelView
can userinfo on UserDBModelView
userinfoedit on UserDBModelView
In order to start a DAG, at least User permissions are needed. But in order to change variables, Op is needed. Changing variables is needed to perform a full refresh with a selected set of models.
Op grants a big set of unnecessary permissions for an Analytics Engineer. Only the following is needed:
menu access on Admin
menu access on Variables
can read on Variables
can edit on Variables
To follow the Principle of Least Privilege, the 4 permissions mentioned above are added to a new role: Analytics_engineer.
All Analytics Engineers will have User + Profile + Analytics_engineer to give them the right permissions in Airflow to execute a DAG.
As an alternative to Docker Desktop we use Rancher Desktop, as suggested in /handbook/tools-and-tips/mac. We use version 1.4.1, which can be found on the Rancher Desktop releases page.
Make sure that "check for updates automatically" is unchecked as later versions have had problems connecting to our Kubernetes cluster.
We use Docker compose to define and run our different images and applications. These are activated by users via the Makefile.
There are a few environment variables you will need to properly use the Makefile and Docker Compose.
* GIT_BRANCH - typically a feature branch like 2112-my-feature
* KUBECONFIG - defines where Kubernetes credentials are stored on your local machine, e.g. KUBECONFIG="/Users/tmurphy/.kube/config"
* GOOGLE_APPLICATION_CREDENTIALS - CloudSQL credentials for connecting to GCP. Typically points to a JSON file, e.g. GOOGLE_APPLICATION_CREDENTIALS="/Users/tmurphy/Projects/GitLab/gcloud_service_creds.json"
The Airflow Infrastructure repo contains the helm charts, kubernetes manifests and terraform scripts required for managing all of the Airflow infrastructure.
The Airflow Image repo contains everything needed to build and push the airflow-image. This image is only used for local Airflow testing. If a binary needs to be installed it should be done in the Dockerfile directly; python packages should be added to the requirements.txt file and pinned to a confirmed working version.
The Data Image repo contains everything needed for building and pushing the data-image. If a binary needs to be installed it should be done in the Dockerfile directly; python packages should be added to the requirements.txt file and pinned to a confirmed working version. Largely a miscellaneous/utility image.
The CI Python Image repo contains everything needed for building and pushing the data-image. If a binary needs to be installed it should be done in the Dockerfile directly; python packages should be added to the requirements.txt file and pinned to a confirmed working version. The image contains all python libraries (pre-built) required for use in the CI pipelines. It exists mostly to lower CI costs.
The DBT Image repo contains everything needed for building and pushing the data-image. If a binary needs to be installed it should be done in the Dockerfile directly; python packages should be added to the requirements.txt file and pinned to a confirmed working version. As this image is used by Data Analysts, there should not be much more than dbt in the image.
Docker images are built and pushed to the Container Registry in two scenarios:
* When you create an MR and commit a change to either requirements.txt or Dockerfile. These images are meant to be used for testing prior to merging.

There are multiple make commands and CI jobs designed to help keep the repo's python clean and maintainable. The following commands in the Makefile will help analyze the repo:
* make lint will run the black python linter and update files (this is not just a check).
* make pylint will run the pylint checker but will NOT check for code formatting, as we use black for this. This will check for duplicated code, possible errors, warnings, etc.: general things to increase code quality. It ignores the DAGs dir as those are not expected to follow general code standards.
* make radon will test relevant python code for cyclomatic complexity and show functions or modules with a score of B or lower.
* make xenon will run a complexity check that returns a non-zero exit code if the threshold isn't met. It ignores the shared_modules and transform repos until they get deleted/deprecated or updated at a later date.

Some of the GitLab specific ELTs connect to databases which are in peered GCP projects, such as the usage ping. To allow connections, a few actions have been taken:
We execute our CI jobs in the gitlab-data group with Kubernetes via the gitlab-analysis GCP project. We have a group runner set up to share across all repos.
These are the basic steps for when a new group runner token needs to be associated, or when we need to update the runner image. Note: since the release of helm 3, it is recommended that all of these commands be run in the Cloud Shell console in GCP. Navigate to the deployment for the runner (currently gitlab-data-gitlab-runner) and use the kubectl dropdown to enter the shell.
To get the credentials
gcloud container clusters get-credentials bizops-runner --zone us-west1-a --project gitlab-analysis
To see the helm releases
helm list --namespace <namespace>
To get the chart values for a specific release
helm get values --namespace <namespace> <chartname>
Prep commands
helm repo add <chart> <url>
helm repo update
To delete the runner
helm delete --namespace <namespace> <chartname>
To install the runner with the Helm chart
helm install --namespace <namespace> --name <chartname> -f <valuesfile> <chartrepo/name>
Example for updating the runner version or group token
helm list --namespace gitlab-data
helm get values --namespace gitlab-data gitlab-runner
helm get values --namespace gitlab-data gitlab-runner > values.yaml
Update the runnerRegistrationToken in values.yaml, found at gitlab.com/groups/gitlab-data/-/runners when you click 'Register Group Runner'
helm repo add gitlab https://charts.gitlab.io
helm repo update
helm delete --namespace gitlab-data gitlab-runner
helm install --namespace gitlab-data gitlab-runner -f values.yaml gitlab/gitlab-runner
Our YAML configuration is as follows:
affinity: {}
checkInterval: 30
concurrent: 10
gitlabUrl: https://gitlab.com
hostAliases: []
imagePullPolicy: IfNotPresent
metrics:
enabled: true
nodeSelector: {}
podAnnotations: {}
podLabels: {}
rbac:
clusterWideAccess: false
create: false
podSecurityPolicy:
enabled: false
resourceNames:
- gitlab-runner
serviceAccountName: gitlab-data-gitlab-runner
resources: {}
runnerRegistrationToken: <token found in https://gitlab.com/groups/gitlab-data/-/runner when you click 'Register Group Runner>
runners:
builds: {}
cache: {}
helpers: {}
image: ubuntu:16.04
outputLimit: 4096
pollTimeout: 180
privileged: false
services: {}
tags: analytics,housekeeping
securityContext:
fsGroup: 65533
runAsUser: 100
There's a once-daily CI job that executes in the version project as well as the license project that runs the database export version script or license script and exports CSV files to a GCS bucket. These files are named gitlab-version-{table_name}-{monday_of_week} or gitlab-license-{table_name}-{monday_of_week} respectively.
An explanation of the version_db timestamp columns, as it is vital to fully understand their meaning:
* recorded_at is the time when the ServicePing was generated on the client side (usage_data.rb); we receive it with the payload.
* created_at and updated_at are standard Rails datetime columns. In the case of the usage_data and raw_usage_data tables,
they will always hold the same values, as we don't upsert record, always create new and reflect the timestamp when the payload was received.Problem with late ingested pings in the VERSION_DB
pipeline is sorted out using the TIMESTAMP column _uploaded_at; the format we capture is yyyy-mm-dd hh24:mi:ss.
The column _uploaded_at (starting with "_") is the housekeeping column that determines WHEN the record is inserted in the RAW layer in Snowflake. This column was added to the tables:
raw.version_db.conversational_development_indices
raw.version_db.fortune_companies
raw.version_db.hosts
raw.version_db.raw_usage_data
raw.version_db.usage_data
raw.version_db.usage_ping_errors
raw.version_db.usage_ping_metadata
raw.version_db.users
raw.version_db.version_checks
raw.version_db.versions
The main motivation for introducing the _uploaded_at column is to use it as a mechanism to sort out the "late arriving" problem with data in the prep_ping_instance+ lineage. Using this approach, the data will be processed forward in downstream models when it is ingested in Snowflake, regardless of WHEN the ping was created. The entire prep_ping_instance+ lineage is defined to use the _uploaded_at column as a condition for the incremental load.
Furthermore, the current approach allows the data to flow, even if it arrives late, without running a full refresh for the dbt models in the lineage.
Note: For consistency of naming, in the PREP and PROD layers the _uploaded_at column is exposed as the uploaded_at column, and the meaning is the same.
The dbt models which are incremental will use this condition to load the data:
{% if is_incremental() %}
WHERE _uploaded_at >= (SELECT MAX(_uploaded_at) FROM {{this}})
{% endif %}
In the diagram below, the solution is exposed visually to explain the current state of data loading in the VERSION_DB pipeline.
For more details, refer to the gitlab dbt documentation for the prep_ping_instance model. The models impacted by this mechanism are:
VERSION_DB data loads are aligned with the file dump in Google Cloud Storage and the dbt DAG running in Airflow. Below is a representation of the timeline of how the pipelines are executed. This setup will allow us to narrow the delay of the data load (it will load data from the prior day).
The GCS bucket where the version DB CSV files are exported is set up in Snowflake as the stage raw.version_db.version_dump. The GCS bucket where the license DB files are exported is set up as the stage raw.license_db.license_dump. This means that from Snowflake we can list all of the files, copy the data in the files into tables, and even delete files.
The CSV files are not self-describing. They do not have a column header to know which column is in which position. Because of this, the tables in RAW need to have columns in an order that exactly matches the order of the columns in the CSVs. To easily create the tables, this bash script was created. To generate the create statements, run do_create_tables.sh and it should print out all of the create table statements.
The CSV files are loaded daily from the Snowflake stage with Snowflake tasks. The tasks were generated by running SQL such as:
create or replace task users_load_task
WAREHOUSE = LOADING
SCHEDULE = '1440 minute'
AS
COPY INTO users
from @raw.version_db.version_dump/gitlab-version-users-
file_format = (TYPE = CSV FIELD_OPTIONALLY_ENCLOSED_BY='"')
with the LOADER role. These tasks run daily and load only new or updated files. To see the task definitions, you can run show tasks as the LOADER role with the version_db or license_db schema as part of the context.
All Snowflake tables ingested either by Snowflake Tasks or Snowpipe are monitored by default with a MonteCarlo source freshness and volume monitor that makes sure that the loading of data is happening successfully. This indirectly monitors whether the task is actually working or not. In order to diagnose problems with the tasks, you can query the raw.snowflake.task_history_view and inspect the error_message column on failed tasks.
For more details, refer to the section Snowflake SnowPipe and Tasks Triage.
As described in this issue (internal link), there is a process maintained by the people group that periodically queries the Time Off by Deel API and dumps the results into a JSON file in the gitlab-pto GCS bucket. This bucket is in the gitlab-analysis GCP project.
To load this data into Snowflake, a Snowpipe for GCS was configured using this Snowflake documentation. A GCP Pub/Sub topic named gitlab-pto-snowpipe was configured to receive a new message whenever a new file was written to the gitlab-pto bucket. A GCP Pub/Sub subscription named gitlab-pto-snowpipe with delivery type Pull was created to subscribe to the gitlab-pto-snowpipe topic.
Then, a notification integration was created in Snowflake with the following command:
create notification integration pto_snowpipe_integration
type = queue
notification_provider = gcp_pubsub
enabled = true
gcp_pubsub_subscription_name = 'projects/gitlab-analysis/subscriptions/gitlab-pto-snowpipe';
The Snowpipe was then created using this command:
CREATE OR REPLACE PIPE raw.pto.gitlab_pto
AUTO_INGEST = true
INTEGRATION = 'PTO_SNOWPIPE_INTEGRATION'
AS copy into raw.pto.gitlab_pto (jsontext, uploaded_at)
from (select $1, current_timestamp() as uploaded_at from @raw.pto.pto_load)
file_format=(type='json' strip_outer_array = true);
The Snowpipe was enabled, so it successfully picks up any new files written to the gitlab-pto bucket:
alter pipe raw.pto.gitlab_pto enable;
If there are any issues with the PTO Snowpipe, go to the runbooks page and analyze the issue.
Source PTO data is saved to the gitlab-pto bucket each Monday (weekly). This means the PTO Snowpipe loads data into the RAW layer immediately after the data is saved to the gitlab-pto bucket. A MonteCarlo rule for PTO is defined to check the freshness of the data. In case no new data is saved in the RAW layer, we will be alerted via Slack.
The Demandbase data load was implemented as part of this issue (internal link). Demandbase data is loaded daily, by Demandbase, in parquet format into a GCS bucket owned by Demandbase named datastream-hosted-gitlab-3750. GitLab's Snowflake-GCS integration service account was granted permission to read and list files within this bucket to load data from Demandbase into Snowflake.
A Snowflake stage was created to interface with the Demandbase data:
CREATE STAGE "RAW".demandbase.data_stream
STORAGE_INTEGRATION = GCS_INTEGRATION
URL = 'gcs://datastream-hosted-gitlab-3750/datastream-gitlab/';
After, a Snowflake task for each demandbase relation was created to load from GCS into the Snowflake RAW database. For example, the Snowflake task loading accounts was defined with:
create task demandbase_account_load_task
WAREHOUSE = LOADING
SCHEDULE = '1440 minute'
AS
copy into raw.demandbase.account (jsontext, uploaded_at)
from (select $1, current_timestamp() as uploaded_at from @raw.demandbase.data_stream/db1_accounts/)
file_format=(type='parquet');
Then the task was enabled by running alter task DEMANDBASE_ACCOUNT_LOAD_TASK resume;. Each task continues to run daily and loads any new files from GCS into the Snowflake raw.demandbase schema.
If a full refresh is required due to a failing test leading to an SLO breach, take time to investigate the root cause of the failing test. Make an incident and document any findings before triggering the full refresh. It may also be useful to copy the data to another temporary table to allow the full refresh to continue unimpeded.
For data sources extracted by Stitch (see extraction table on the platform page) steps to be taken to refresh data depend on the connection and table settings.
For Salesforce, where the connection is running key-based incremental extractions, the table is best reset using the reset table feature under table settings. Simply put, this will truncate the table in RAW and reload/refresh all the data with a complete extract. Stitch manages this appropriately such that there is little disruption to the table in RAW.SALESFORCE_STITCH. Once reset, the table will be refreshed on the next run. If the data is needed sooner than that, you can trigger the extraction from Stitch, though this will incur an additional cost for the extra job.
If for some reason the Stitch connection cannot handle the table refresh without manual intervention, the following process has worked:
create schema clone_<DATA_SCHEMA> clone <DATA_SCHEMA>;
SELECT
'truncate table if exists clone_<DATA_SCHEMA>.' || LOWER(table_name) || ';'
FROM "RAW"."INFORMATION_SCHEMA".tables
WHERE LOWER(table_schema) = '<DATA_SCHEMA>';
Create a new Stitch integration named Clone <Name of Old Integration>. For example, for Salesforce, the name of the integration would be Clone SalesForce Stitch. Authenticate the integration and set up the extraction to match the extraction settings of the current Stitch job.
Once the new integration has loaded data into the clone_<DATA_SCHEMA> schema, perform some sanity check SQL queries to make sure the data looks as expected. Some Stitch data may arrive after the extraction job is complete, so make sure to leave some time to ensure all data from Stitch has arrived.
Swap the tables into place by running the output of the following (replacing <DATA_SCHEMA> appropriately):
SELECT
'ALTER TABLE <DATA_SCHEMA>.' || LOWER(table_name) || ' SWAP WITH clone_<DATA_SCHEMA>.' || LOWER(table_name) || ';'
FROM "RAW"."INFORMATION_SCHEMA".tables
WHERE LOWER(table_schema) = '<DATA_SCHEMA>';
drop schema clone_<DATA_SCHEMA> cascade;
.Data Producers create data in source applications and we extract that data towards the Snowflake data warehouse. A key feature of our ELT processes is to do incremental extractions of source data towards Snowflake AND do incremental loads of data in downstream dbt transformations. The benefit for an incremental load above a full load is efficiency. An incremental load only loads newly added or changed data which is only a small piece of the total data set that is otherwise processed incase of a full load. The incremental logic is typically based on checking for data that has been created or updated since the last incremental extraction or dbt model transformation. In many cases, the data that is created or updated in the source applications on a given calendar day is loaded into an endpoint which our incremental extraction and transformation processes are able to recognize as having been created since the last incremental data extraction and subsequently process in the ELT and make available to end users in both Sisense and Snowflake. However, this ideal state does not always happen and we sometimes need to fully refresh the prep
and prod databases. This could be because the raw database requires a full refresh or because an incremental dbt model relies on a data source with the possibility of back-dated data. What follows is a summary of the incremental behavior in extraction and transformation processes.
Incremental data extraction is the default. To extract data incrementally we depend on the source: if the source provides a mechanism to identify newly created, updated, or deleted data, we will use it. Prerequisites at the source level to support incremental data selection:
In dbt, we create models using an incremental configuration whereby the data model checks for records that have been created/updated since the last incremental model run. That subset of records is transformed and inserted into the table. This is a great feature that optimizes the performance and compute needed to run the model in the daily DAG. Backdated data causes a problem in these models: if there are records available in the raw database that have created/updated dates in the past, before the last dbt model run, the incremental logic will miss loading these records. This backdated data is the root cause of why we need to do scheduled full refreshes of the prep and prod databases. The backdated data has a ripple effect where the source application and raw database become out of sync with the incremental models in the prep and prod databases. Therefore, we need to do a full refresh of the incremental models in order to align the incremental dbt models with the data in the raw database and source application.
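To make the failure mode concrete, here is a small illustrative sketch (not production code) of watermark-based incremental selection and why a backdated record gets skipped until a full refresh:

```python
from datetime import datetime

# Illustrative raw-layer rows as (id, updated_at) pairs.
raw_rows = [
    (1, datetime(2023, 1, 1)),
    (2, datetime(2023, 1, 2)),
    # Backdated row: it landed in raw *after* the last model run, but its
    # updated_at is older than the watermark, so incremental logic skips it.
    (3, datetime(2022, 12, 15)),
]

# Watermark = max(updated_at) already present in the incremental model.
watermark = datetime(2023, 1, 1)

picked_up = [row for row in raw_rows if row[1] > watermark]
skipped = [row for row in raw_rows if row[1] <= watermark]

print("processed incrementally:", picked_up)  # only row 2
print("not selected this run:", skipped)  # row 3 is the backdated record that needs a full refresh
```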
Full refreshes are performed when the prerequisites are not met. In some cases there is a full refresh requested ad-hoc. This is likely because of:
Because of these prerequisites, data extractions will have to deal with full refreshes in the future.
We currently have two options for dbt model full refreshes:
In the target state, we would continue to have the manual full refresh since it has utility for being responsive to the business and building models on an ad-hoc basis. However, we would significantly evolve the cadence of the scheduled, weekly full refresh that happens on Sunday. The target state would be to build and configure the data extractions and transformations in such a way that we could ultimately end-up with a quarterly scheduled full refresh on the models that truly needed to be refreshed. This would take significant collaboration between Business Data Champions, Data Producers, and the Central Data Team. We will take a multi-quarter, iterative approach to arrive at the target state.
There are many variables and considerations that need to be evaluated when determining the full refresh policy for data models in a data lineage. The following sections provide information about data sources and lineages that can be used to determine the full refresh policy for data models.
~50m
records per day or ~33TB
in summary (for the entire dataset),Salesforce opportunity data is refreshed on a six-hourly schedule. This is done using the dbt build
command in the dbt_six_hourly
DAG. These models are relatively small and can be fully refreshed in under five-minutes. The models included in this schedule are tagged as six_hourly
:
sfdc_opportunity_source
prep_crm_opportunity
dim_crm_opportunity
fct_crm_opportunity
mart_crm_opportunity
As a boring solution that will have a minimal amount of impact to the business, we will update the scheduled full refresh to be the first Sunday of the month. The monthly cadence provides a balance between cost and data impact. The full refresh is much more costly compared to the regular, incremental dbt DAG that runs daily. This schedule will allow the business to receive the fully refreshed data from the prior month on the same schedule they receive it now. The most important analyses that depend on models with incremental configurations, such as service ping and gitlab.com models, are generally done on a monthly basis. There are important use cases that happen daily or weekly; however, the incremental runs do a good job of picking up the data on a daily basis. There is a risk that analyses conducted on daily and weekly cadences would not have the backdated data. We will continue to work on this in iteration 2, where we will get more precise with the incremental model configurations and DAGs to allow a period of rolling historical data to full refresh more frequently than monthly.
With the scheduled monthly refresh, we will rebuild the complete prep and prod layers via dbt monthly, on the first Sunday of the month. In this case, we overrule any existing incremental models' delta selection and we are sure we have all the data available on the first Sunday of the month. We have a dedicated dbt_full_refresh_weekly DAG. This DAG performs a full refresh of all the models and will run only on the first Sunday of the month. Currently, the only model excluded from the full refresh is +gitlab_dotcom_usage_data_events+.
As part of the first iteration, we will start the Data Source and Data Lineage Information Used to Determine Full Refresh Policy section of the handbook page. The results of this analysis will be used as an input to the second iteration.
The full refresh is costly and comes with a risk that the analyses that are conducted on daily and weekly cadences would not have the backdated data with the monthly full refresh cadence. Ideally, the full refresh should only be performed on models that require a scheduled full refresh, based on a required cadence. During the 2nd iteration, we will perform the following tasks:
The scope of the models to review will be limited to the model lineages in the COMMON_PREP, COMMON_MAPPING, COMMON, and COMMON_MART_ schemas, and the legacy models that are used to satisfy P1 Product and Engineering Analytics use cases that are still done out of the legacy schema. The models in the COMMON_ schemas are our Trusted Data Models and power the majority of the P1 analytics use cases. However, not all P1 use cases are done using Trusted Data Models, so it will be necessary to bring a selection of legacy models into scope. The models found in these schemas trace back to source models in the prep database that would also be in scope. There are incremental models used in the model lineages from the prep database through the prod database. Limiting the scope to these interdependent models will allow us to make a significant iteration without making the task too big.
Determine which incremental models and lineages have no backdated data behavior and do not need to have any scheduled full refresh. We will use the Backdated Data Considerations for Primary Data Sources (add link) as an input to this review. Configure these models and lineages to never full refresh. The Snowplow data source will be a good candidate for this since there is either a low risk or no risk backdated event data. This has to be done on a model by model, lineage by lineage basis to insure the desired results are achieved.
For the models that have backdated data behavior, such as service ping and the Gitlab.com Postgres database lineages, consider adding incremental date logic that checks for created/updated records on a rolling 30 day basis. This would allow the daily and weekly analytical use cases to have backdated data available on a rolling 30 day basis. We have installed this technique with the fct_event model. It adds more compute to the model build, but it strikes a balance between cost, performance, and providing the business with backdated data. We would need to check that all models in the lineage have the same type of rolling days incremental date logic so that all of the backdated data flows through the lineage.
Iteration 3 will focus on moving to a quarterly scheduled full refresh.
In some data sources, we have to handle backdated data which is data that is received later than expected. In the case of an incremental dbt model where a field, typically a created at or updated at timestamp, is used to determine which data has to be loaded into a data model, there is a risk of missing backdated data entries due to some source application behavior that results in backdated data. This could be a data quality issue or it could be the case the data arrives in the source application on a backdated basis.
Use the dbt_full_refresh DAG to force dbt to rebuild the entire incremental model from scratch.
Set the variable DBT_MODEL_TO_FULL_REFRESH with the name of the model(s) to refresh, following dbt model selection syntax. For example, to refresh version models, the value would be sources.version staging.version. To refresh gitlab_dotcom models, the value would be sources.gitlab_dotcom staging.gitlab_dotcom.
Note: if the value ends with a newline (\n), it will generate the below statement, which will exclude the --full-refresh when executing.
' dbt --no-use-colors run --profiles-dir '
'profile --target prod --models '
'gitlab_dotcom_issues_dedupe_source\n'
' --full-refresh ; ret=$?;\n'
The dbt_full_refresh DAG will be running on the TRANSFORMING_XL warehouse. In order to modify the warehouse size, in case of a quick full refresh or a performance issue, modify the variable DBT_WAREHOUSE_FOR_FULL_REFRESH to the desired warehouse size. For the actual list of warehouse sizes, check compute-resources. If a bigger warehouse than XL is used, an issue has to be created and Data Leadership has to be tagged. Document in the issue why a bigger warehouse is used. Report in the issue every run that was applicable: which models were refreshed and their start time, stop time, and status (succeeded, failed, or aborted).
The dbt command that is run behind the scenes is:
dbt run --profiles-dir profile --target prod --models DBT_MODEL_TO_FULL_REFRESH --full-refresh
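For illustration only (this is not the actual DAG code), here is a sketch of how the variable value ends up inside the rendered command and why a trailing newline causes the --full-refresh flag to be dropped:

```python
from airflow.models import Variable

# Hypothetical rendering of the command; the real dbt_full_refresh DAG builds
# its command differently. This only illustrates the newline gotcha.
models = Variable.get("DBT_MODEL_TO_FULL_REFRESH", default_var="")

command = (
    "dbt --no-use-colors run --profiles-dir profile "
    f"--target prod --models {models} --full-refresh"
)

# A trailing "\n" in the variable value pushes --full-refresh onto a new line
# when the command is executed as a shell string, so the flag is lost.
if models.endswith("\n"):
    print("WARNING: DBT_MODEL_TO_FULL_REFRESH ends with a newline")

print(command)
```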
See the following demo video on how to perform a DBT Full Refresh in Airflow:
This is a project for centralizing handy functions we use across the team. The project lives at https://gitlab.com/gitlab-data/gitlab-data-utils
Steps:
bumpversion --current-version <current-existing-version> patch
In order to use the updated utils package, you will generally have to update these 2 repos as well:
For more instructions, refer to this handbook section.
See the runbook for instructions on how to independently and asynchronously upgrade dbt.