How to create a heterogeneous Neo4j data loading pipeline framework (fast)

tl;dr:
Motherlode is a tool written in Python that runs multiple programs (as Docker images/containers) in an ordered sequence.
It’s a little bit similar to docker-compose, but specialized in running sequences of containers and loading data into a Neo4j database.
We use this tool in the CovidGraph project and at the German Center for Diabetes Research (DZD).
Improvise first …
When the first iteration of the CovidGraph team manifested, many team members wrote code to load stuff into our Neo4j database.
In the first few hours of the project, it was fine to just let team members run their loading code and review the results together. We looked at the new nodes and relationships and considered which nodes were similar or related to existing nodes and could therefore be connected.
… but that won’t last forever …
Soon enough the first problems with this approach revealed themselves:
- No traceability
  - We could not easily tell which data loaders had already run (let alone in which version)
  - We could not easily point to a piece of code that reveals the content of the pipeline
- No dependency management
  - As soon as there were multiple data loaders with interlaced nodes, there were dependencies between the loaders. We had to run them in a specific order to get things right, and this order got complex fast.
- Hard to reproduce
  - Without knowing the order of the data loaders and where the loaders are stored, it is almost impossible to reproduce the state of the Neo4j data. We needed a central place to store this information: a single point of truth for operation and documentation.
We needed a tool to fix these problems and we had some additional requirements:
- Run all the things!
  - As an open community project, we have contributions in potentially every language out there. Every language and every coder has their own style and their own idea of how to run code. The pipeline has to be agnostic and just run the stuff; if we had established, for example, that we only accept Java code, every Python dev would have lost interest. In short: run heterogeneous code!
- Config all the things!
  - Early in the project we also realized that having multiple running environments of CovidGraph would make our lives easier; that is, a development environment to break things and a production environment with stable and reliable endpoints/apps for consumers. The pipeline should be easily adaptable to point to a specific database.
- Gather all the things (in the right version)!
  - With a bunch of different data loaders, nobody wants to be burdened with collecting all the loaders from different repositories and cloning or pulling the newest version each time the pipeline runs. The pipeline should pull the latest (or the correct) version of the loaders itself.
- Save all the time!
  - To accelerate the runtime of the tool, it should recognize data loaders that have already run and skip them; their data is already in the database, so we can save time here. Therefore, we need to save the status of the pipeline.
… containers and Python to the rescue!
The solution materialized as a homegrown tool written in Python that utilizes Docker and its ecosystem (via the docker-py library) to fulfill our requirements.
Introducing Motherlode: named after a mining term; the principal vein that miners follow to fill their mining carts.
Simply put, Motherlode takes a list of remote Docker images, pulls them, and runs them in an ordered manner as Docker containers. While doing so, it hands over some environment variables (e.g. the Neo4j connection details).
On top of that, it logs pipeline runs and each container run as nodes in the Neo4j database. This fixes the traceability issue.
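Under the hood, the heavy lifting is delegated to the Docker SDK. Here is a minimal sketch of that core loop using docker-py; the names and structure are illustrative, not Motherlode’s actual internals:

```python
# Minimal sketch of the core idea using the docker-py library.
# Names and structure are illustrative; this is not Motherlode's actual code.
import docker

client = docker.from_env()

pipeline = [
    {
        "name": "PREP_LOADER_PUBLIC_USER",
        "image": "registry-gl.connect.dzd-ev.de:443/dzdconnectpipeline/dataloading_createpublicusers",
        "tag": "latest",
        "env": {},
    },
]

neo4j_config = "{'host':'myHost.com', 'port': 7687, 'password':'supersecret'}"

for member in pipeline:
    # Pull the (newest version of the) image from the remote registry
    image = client.images.pull(member["image"], tag=member["tag"])
    # Run it as a container, handing over the Neo4j connection details
    # plus any member-specific environment variables
    client.containers.run(
        image,
        environment={"NEO4J": neo4j_config, **member["env"]},
        remove=True,  # clean up the container after it exits
    )
    print(member["name"], "finished")
```

The real tool adds dependency resolution, logging, and state handling on top of this loop.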
Let’s have a look at a small example pipeline:
We want to run a script that creates a public database user, and another that loads some data into our database.
```yaml
ExamplePipeline:
  - name: PREP_LOADER_PUBLIC_USER
    info_link: https://git.connect.dzd-ev.de/dzdconnectpipeline/dataloading_createpublicusers
    image_repo: registry-gl.connect.dzd-ev.de:443/dzdconnectpipeline/dataloading_createpublicusers
    tag: latest
  - name: DATALOADER_PUBMED_PUBLICATIONS
    info_link: https://git.connect.dzd-ev.de/dzdtools/pubmedsucker
    image_repo: registry-gl.connect.dzd-ev.de:443/dzdtools/pubmedsucker
    tag: prod
    env_vars:
      CONFIGS_PUBMED_SOURCE: "https://git.connect.dzd-ev.de/dzdtools/pubmedsucker/-/raw/master/testdata/debug_cases.xml?inline=false"
    dependencies:
      - PREP_LOADER_PUBLIC_USER
```
As we can see, the pipeline is described in YAML (JSON is also supported).
At the top, we have a name for our pipeline (`ExamplePipeline` in our case), followed by an array of pipeline members. Each entry in the array defines a Docker container configuration, a little bit similar to a service entry in a docker-compose file.
To tackle the dependency management issue, we can use the `dependencies` parameter per entry. The pipeline will make sure that containers run only if their dependencies ran first.
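Resolving such dependencies boils down to a topological sort of the dependency graph. Here is a minimal sketch of that idea using Python’s standard library (Motherlode’s actual implementation may differ):

```python
# Sketch: derive a valid run order from the declared dependencies.
# graphlib is in the standard library since Python 3.9; Motherlode's
# actual implementation may differ.
from graphlib import TopologicalSorter

# Each member maps to the list of members that must run before it
dependencies = {
    "PREP_LOADER_PUBLIC_USER": [],
    "DATALOADER_PUBMED_PUBLICATIONS": ["PREP_LOADER_PUBLIC_USER"],
}

run_order = list(TopologicalSorter(dependencies).static_order())
print(run_order)
# ['PREP_LOADER_PUBLIC_USER', 'DATALOADER_PUBMED_PUBLICATIONS']
```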
The pipeline does not care what is running inside the container. The Docker container API is our abstraction layer that keeps us agnostic. We can run heterogeneous code.
The pipeline will provide an environment variable `NEO4J`, which contains the connection details of the target Neo4j database (e.g. `{'host':'myHost.com', 'port': 7687, 'password':'supersecret'}`).
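Inside a loader, consuming this variable could look like the following sketch, using the official neo4j Python driver. Note that the example value is a Python-style dict literal (single quotes), so `ast.literal_eval` is used instead of `json.loads`; the user name is an assumption here, as the example config only carries a password:

```python
# Sketch: how a data loader could consume the NEO4J environment variable.
import ast
import os

from neo4j import GraphDatabase  # official Neo4j Python driver

# The value is a Python-style dict literal, hence ast.literal_eval
config = ast.literal_eval(os.environ["NEO4J"])

driver = GraphDatabase.driver(
    f"bolt://{config['host']}:{config['port']}",
    # "neo4j" as user name is an assumption for this sketch
    auth=("neo4j", config["password"]),
)
with driver.session() as session:
    # Smoke test: the connection works if this prints 1
    print(session.run("RETURN 1 AS ok").single()["ok"])
driver.close()
```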
Additionally, we can set individual variables via `env_vars` to configure our containers. That solves the config issue.
The Docker SDK comes with the ability to pull images from a Docker registry. The pipeline will always check for a newer version of an image before running it; it gathers all the loaders before running. I would say let’s check off reproducibility as well.
… did we just fix all the problems and fulfill all the requirements we had? Yes, nice!
Reflect
As a reward, let’s have a small image visualizing most of the components we just talked about, and let’s summarize what we have achieved:
We can now:
- Automatically pull a defined set of container images
- Automatically run these images as containers in a coordinated sequence
- Provide global and local configuration to the pipeline and/or the containers
- Save the state of the pipeline in a Neo4j database and resume from the last state in the next pipeline run
- Share our pipeline quickly and simply with others
- Adapt/change our pipeline quickly and simply
- Run our pipeline quickly and simply in different environments
C’mon, let’s push the button …
To run our example pipeline, the easiest way is to write a short docker-compose file:
```yaml
version: "3.7"
services:
  motherlode:
    # This is our pipeline framework
    image: "registry-gl.connect.dzd-ev.de:443/dzdtools/motherlode:stable"
    # To simplify the database connection we run in host mode
    network_mode: "host"
    # We provide our Neo4j config
    environment:
      CONFIGS_NEO4J: "{'host':'myHost.com', 'port': 7687, 'password':'supersecret'}"
    volumes:
      # We provide the Docker socket to the pipeline. This gives the pipeline the power to pull/start/stop containers
      - /var/run/docker.sock:/var/run/docker.sock:ro
      # We mount our pipeline description into the pipeline framework
      - ./pipeline.yaml:/data/pipeline.yaml
      # Optionally, we can save away the log files
      - ./log:/log
```
With a `docker-compose up` we can now see the pipeline making stuff:
```
tim@TimDesktop:/tmp/testpipe$ docker-compose up
Starting testpipe_motherlode_1 ... done
Attaching to testpipe_motherlode_1
motherlode_1 | 2021-06-08 19:38:34,877 copili.registry WARNING: Can not find a .env file at '/data/.env'. Will skip .env file parsing.
motherlode_1 | 2021-06-08 19:38:35,064 copili INFO: =====Start 'ExamplePipeline'======
motherlode_1 | 2021-06-08 19:38:35,665 PREDPLOADER_PUBLIC_USER INFO: Try to pull image 'registry-gl.connect.dzd-ev.de:443/dzdconnectpipeline/dataloading_createpublicusers' ...
motherlode_1 | 2021-06-08 19:38:36,038 PREDPLOADER_PUBLIC_USER INFO: ...image 'registry-gl.connect.dzd-ev.de:443/dzdconnectpipeline/dataloading_createpublicusers:latest' pulled.
motherlode_1 | 2021-06-08 19:38:36,289 copili INFO: Check dependencies for PREDPLOADER_PUBLIC_USER:
motherlode_1 | 2021-06-08 19:38:36,289 PREDPLOADER_PUBLIC_USER INFO: Starting container PREDPLOADER_PUBLIC_USER from image registry-gl.connect.dzd-ev.de:443/dzdconnectpipeline/dataloading_createpublicusers:latest
motherlode_1 | 2021-06-08 19:38:39,473 PREDPLOADER_PUBLIC_USER INFO: Clean up PREDPLOADER_PUBLIC_USER
motherlode_1 | 2021-06-08 19:38:39,497 PREDPLOADER_PUBLIC_USER INFO: Stopped and removed container PREDPLOADER_PUBLIC_USER - registry-gl.connect.dzd-ev.de:443/dzdconnectpipeline/dataloading_createpublicusers:latest
motherlode_1 | 2021-06-08 19:38:39,686 PREDPLOADER_PUBLIC_USER INFO: ========================================
motherlode_1 | 2021-06-08 19:38:39,686 PREDPLOADER_PUBLIC_USER INFO: FAILED: False
motherlode_1 | 2021-06-08 19:38:39,686 PREDPLOADER_PUBLIC_USER INFO: EXITED with status: 0
motherlode_1 | 2021-06-08 19:38:39,686 PREDPLOADER_PUBLIC_USER INFO: SKIPPED: False
motherlode_1 | 2021-06-08 19:38:39,686 PREDPLOADER_PUBLIC_USER INFO: CANCELED: False
motherlode_1 | 2021-06-08 19:38:39,686 PREDPLOADER_PUBLIC_USER INFO: RUNTIME: 0.053062529133361146 minutes
motherlode_1 | 2021-06-08 19:38:39,686 PREDPLOADER_PUBLIC_USER INFO: ========================================
motherlode_1 | 2021-06-08 19:38:39,686 PREDPLOADER_PUBLIC_USER INFO:
motherlode_1 | 2021-06-08 19:38:40,254 DATALOADER_PUBMED_PUBLICATIONS INFO: Try to pull image 'registry-gl.connect.dzd-ev.de:443/dzdtools/pubmedsucker' ...
motherlode_1 | 2021-06-08 19:38:40,687 DATALOADER_PUBMED_PUBLICATIONS INFO: ...image 'registry-gl.connect.dzd-ev.de:443/dzdtools/pubmedsucker:prod' pulled.
motherlode_1 | 2021-06-08 19:38:40,756 copili INFO: Check dependencies for DATALOADER_PUBMED_PUBLICATIONS:
motherlode_1 | 2021-06-08 19:38:40,756 DATALOADER_PUBMED_PUBLICATIONS INFO: Starting container DATALOADER_PUBMED_PUBLICATIONS from image registry-gl.connect.dzd-ev.de:443/dzdtools/pubmedsucker:prod
motherlode_1 | 2021-06-08 19:38:41,147 DATALOADER_PUBMED_PUBLICATIONS INFO: 2021-06-08T19:38:41.147130107Z Pubmedsucker (Version '0.9.22')
motherlode_1 |
motherlode_1 | 2021-06-08 19:38:41,857 DATALOADER_PUBMED_PUBLICATIONS INFO: 2021-06-08T19:38:41.856265665Z #--------START PHASE 1---------#
motherlode_1 |
motherlode_1 | 2021-06-08 19:38:41,904 DATALOADER_PUBMED_PUBLICATIONS INFO: 2021-06-08T19:38:41.904386002Z Initialize Worker with file: 'https://git.connect.dzd-ev.de/dzdtools/pubmedsucker/-/raw/master/testdata/debug_cases.xml?inline=false'
motherlode_1 |
[...]
motherlode_1 | 2021-06-08 19:38:48,574 DATALOADER_PUBMED_PUBLICATIONS INFO: Clean up DATALOADER_PUBMED_PUBLICATIONS
motherlode_1 | 2021-06-08 19:38:48,610 DATALOADER_PUBMED_PUBLICATIONS INFO: Stopped and removed container DATALOADER_PUBMED_PUBLICATIONS - registry-gl.connect.dzd-ev.de:443/dzdtools/pubmedsucker:prod
motherlode_1 | 2021-06-08 19:38:48,687 DATALOADER_PUBMED_PUBLICATIONS INFO: ========================================
motherlode_1 | 2021-06-08 19:38:48,688 DATALOADER_PUBMED_PUBLICATIONS INFO: FAILED: False
motherlode_1 | 2021-06-08 19:38:48,688 DATALOADER_PUBMED_PUBLICATIONS INFO: EXITED with status: 0
motherlode_1 | 2021-06-08 19:38:48,688 DATALOADER_PUBMED_PUBLICATIONS INFO: SKIPPED: False
motherlode_1 | 2021-06-08 19:38:48,688 DATALOADER_PUBMED_PUBLICATIONS INFO: CANCELED: False
motherlode_1 | 2021-06-08 19:38:48,688 DATALOADER_PUBMED_PUBLICATIONS INFO: RUNTIME: 0.13029183579989573 minutes
motherlode_1 | 2021-06-08 19:38:48,688 DATALOADER_PUBMED_PUBLICATIONS INFO: ========================================
motherlode_1 | 2021-06-08 19:38:48,688 DATALOADER_PUBMED_PUBLICATIONS INFO:
```
We see the pipeline running each of our configured containers.
When we now look into our database, we see some new nodes (`:_PipelineLogRun` and `:_PipelineLogNode`) which indicate that the containers have run.
If one container fails, all following containers that depend on it won’t run. We can then fix the issue, release a new image, and re-run the pipeline. Based on the log nodes, containers that already ran successfully will not run again this time, which saves us some time. The state of the last pipeline run is saved in the database and will be picked up by the next run.
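If you want to inspect these log nodes yourself, a query along the following lines should do. The labels come from the text above, but the exact properties on the nodes are an assumption for illustration:

```python
# Sketch: inspecting the pipeline log nodes with the neo4j Python driver.
# The label comes from the text above; the properties on these nodes
# are not documented here, so we simply dump whatever is stored.
from neo4j import GraphDatabase

driver = GraphDatabase.driver(
    "bolt://myHost.com:7687", auth=("neo4j", "supersecret")
)
with driver.session() as session:
    result = session.run(
        "MATCH (r:`_PipelineLogRun`) RETURN r ORDER BY id(r) DESC LIMIT 5"
    )
    for record in result:
        # Print all properties of each of the last five pipeline runs
        print(dict(record["r"]))
driver.close()
```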
Links, Links, Links!
- The CovidGraph pipeline: https://github.com/covidgraph/motherlode/blob/master/pipeline.yaml
- Motherlode repo: https://git.connect.dzd-ev.de/dzdtools/motherlode
Next time?
In the next blog post, Tim will take a look at one way (of many) to create a pipeline member in Python. So stay tuned!