
How to create a heterogeneous Neo4j data loading pipeline framework (fast)

tl;dr

Motherlode is a tool written in Python that runs multiple programs (as Docker images/containers) in an ordered sequence.

It’s a bit like docker-compose, but specialized in running sequences of containers and loading data into a Neo4j database.

We use this tool in the CovidGraph project and at the German Center for Diabetes Research (DZD).

Improvise first …

When the first iteration of the CovidGraph team came together, many team members wrote code to load data into our Neo4j database.

For the first few hours of the project, it was sufficient to let the team members run their loading code and review the results together. We looked at the new nodes and relationships and considered which nodes were similar or related to existing nodes and could therefore be connected.

… but that won’t last forever …

Soon enough the first problems with this approach revealed themselves:

  • No traceability 🔍️
    • We could not easily tell which data loaders had already run (let alone which version)
    • We could not easily point to a piece of code that reveals the content of the pipeline
  • No dependency management 🖇️
    • As soon as there were multiple data loaders with interlinked nodes, there were dependencies between the data loaders. We had to run the data loaders in a specific order to get things right. This order got complex fast.
  • Hard to reproduce 🔁
    • Without knowing the order of the data loaders and where the loaders are stored, it is almost impossible to reproduce the Neo4j data state. We needed a central place to store this information: a single point of truth for operation and documentation.

We needed a tool to fix these problems and we had some additional requirements:

  • Run all the things! ⚧️
    • As an open community project, we get contributions in potentially every language out there. Every language and every coder has their own style and idea of how to run code. The pipeline has to be agnostic and just run whatever it is given. If we had decided, for example, to accept only Java code, every Python developer would have lost interest. In short: run heterogeneous code!
  • Config all the things! 🔧
    • Early in the project we also realized that having multiple running environments of CovidGraph would make our lives easier: a development environment to break things, and a production environment with stable and reliable endpoints/apps for consumers. The pipeline should be easily adaptable to point to a specific database.
  • Gather all the things (in the right version)! 🧺
    • Once there are a bunch of different data loaders, nobody wants to be burdened with collecting them from different repositories and cloning or pulling the newest version each time the pipeline runs. The pipeline should pull the latest or correct loaders itself.
  • Save all the time! 🕐️
    • To shorten the runtime of the tool, it should recognize data loaders that have already run and skip them. The data is already in the database, so we can save time here. Therefore, we need to save the status of the pipeline.

… containers and Python to the rescue!

The solution materialized as a homegrown tool written in Python, which utilizes Docker and its ecosystem (via the docker-py library) to fulfill our requirements.

Introducing Motherlode: named after a mining term for a principal vein that miners follow to fill their mining carts 🙂

Simply put, Motherlode takes a list of remote docker images, pulls them, and runs them in an ordered manner as docker containers. Along the way, it hands over some environment variables (e.g. the Neo4j connection details).
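To make the pull-and-run idea concrete, here is a minimal sketch using docker-py. It is not Motherlode's actual code: the helper names image_ref and run_member are made up for illustration, the member dictionary simply mirrors the keys of the pipeline description shown in this post, and passing the connection details as a JSON string is an assumption. Running run_member needs the docker package and a reachable Docker daemon.

```python
import json


def image_ref(member: dict) -> str:
    """Build 'repository:tag' from a pipeline member (tag defaults to 'latest')."""
    return f"{member['image_repo']}:{member.get('tag', 'latest')}"


def run_member(member: dict, neo4j_config: dict) -> int:
    """Pull the member's image, run it to completion and return its exit code."""
    import docker  # third-party docker-py package, imported lazily

    client = docker.from_env()
    client.images.pull(member["image_repo"], tag=member.get("tag", "latest"))

    # Hand over the connection details plus any member-specific variables.
    # Serializing the details as JSON is an assumption of this sketch.
    environment = {"NEO4J": json.dumps(neo4j_config), **member.get("env_vars", {})}
    container = client.containers.run(
        image_ref(member),
        name=member["name"],
        environment=environment,
        detach=True,
    )
    try:
        exit_code = container.wait()["StatusCode"]  # blocks until the container exits
        print(container.logs().decode())
    finally:
        container.remove(force=True)  # always clean up the container
    return exit_code
```

Calling run_member once per list entry, in order, gives the basic behaviour described above; everything else (dependencies, logging, skipping) builds on top of that loop.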

On top of that, it logs each pipeline run and each container run as nodes in the Neo4j database. This fixes the traceability 🔍️ issue.

Let’s have a look at a small example pipeline:

We want to run one script that creates a public database user and another that loads some data into our database.

ExamplePipeline:
  - name: PREP_LOADER_PUBLIC_USER
    info_link: https://git.connect.dzd-ev.de/dzdconnectpipeline/dataloading_createpublicusers
    image_repo: registry-gl.connect.dzd-ev.de:443/dzdconnectpipeline/dataloading_createpublicusers
    tag: latest
  - name: DATALOADER_PUBMED_PUBLICATIONS
    info_link: https://git.connect.dzd-ev.de/dzdtools/pubmedsucker
    image_repo: registry-gl.connect.dzd-ev.de:443/dzdtools/pubmedsucker
    tag: prod
    env_vars:
      CONFIGS_PUBMED_SOURCE: "https://git.connect.dzd-ev.de/dzdtools/pubmedsucker/-/raw/master/testdata/debug_cases.xml?inline=false"
    dependencies:
      - PREP_LOADER_PUBLIC_USER

As we can see, the pipeline is described in YAML (JSON is also supported).

At the top, we have a name for our pipeline (ExamplePipeline in our case) followed by an array of pipeline members. Each entry in the array defines a docker container configuration, somewhat like a service entry in a docker-compose file.
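Since JSON is also supported, the same structure can be written as a JSON document and inspected with Python's standard library. The sketch below assumes that the JSON layout mirrors the YAML one-to-one; the load_members helper and its check for the name and image_repo keys are hypothetical and not part of Motherlode.

```python
import json

PIPELINE_JSON = """
{
  "ExamplePipeline": [
    {
      "name": "PREP_LOADER_PUBLIC_USER",
      "image_repo": "registry-gl.connect.dzd-ev.de:443/dzdconnectpipeline/dataloading_createpublicusers",
      "tag": "latest"
    },
    {
      "name": "DATALOADER_PUBMED_PUBLICATIONS",
      "image_repo": "registry-gl.connect.dzd-ev.de:443/dzdtools/pubmedsucker",
      "tag": "prod",
      "dependencies": ["PREP_LOADER_PUBLIC_USER"]
    }
  ]
}
"""


def load_members(text: str) -> tuple[str, list[dict]]:
    """Return (pipeline name, members) from a single-pipeline description."""
    description = json.loads(text)
    if len(description) != 1:
        raise ValueError("expected exactly one top-level pipeline name")
    ((name, members),) = description.items()
    for member in members:
        # Requiring these two keys is an assumption of this sketch.
        if "name" not in member or "image_repo" not in member:
            raise ValueError(f"member is missing 'name' or 'image_repo': {member}")
    return name, members


name, members = load_members(PIPELINE_JSON)
print(name, [m["name"] for m in members])
```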

To tackle the dependency management 🖇️ issue, we can use the dependencies parameter per entry. The pipeline makes sure that a container runs only after its dependencies have run.

The pipeline does not care what is running inside the container. The Docker container API is our abstraction layer that keeps us agnostic. We can run heterogeneous code ⚧️.
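One common way to turn such dependency lists into a valid run order is a topological sort. The sketch below uses graphlib from the Python standard library (3.9+). Whether Motherlode reorders the entries or only checks them in the given order is not stated here, so treat run_order as a hypothetical helper that illustrates the idea.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+


def run_order(members: list[dict]) -> list[str]:
    """Return member names so that every dependency precedes its dependents."""
    known = {m["name"] for m in members}
    sorter = TopologicalSorter()
    for m in members:
        deps = m.get("dependencies", [])
        unknown = set(deps) - known
        if unknown:
            raise ValueError(f"{m['name']} depends on unknown members: {sorted(unknown)}")
        sorter.add(m["name"], *deps)
    # static_order() raises graphlib.CycleError on circular dependencies.
    return list(sorter.static_order())


members = [
    {"name": "DATALOADER_PUBMED_PUBLICATIONS", "dependencies": ["PREP_LOADER_PUBLIC_USER"]},
    {"name": "PREP_LOADER_PUBLIC_USER"},
]
order = run_order(members)
print(order)
```

Even though the loader with the dependency is listed first here, the user-creation step comes out ahead of it in the resulting order.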

The pipeline will provide an environment variable NEO4J which contains the connection details to the target Neo4j database (e.g. {'host':'myHost.com', 'port': 7687, 'password':'supersecret'}).
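Inside a loader written in Python, reading that variable could look roughly like this. The example value above uses single quotes, which is a Python-style literal rather than strict JSON, so this sketch tries JSON first and falls back to ast.literal_eval. The function name read_neo4j_config is hypothetical, and the exact format Motherlode emits is an assumption.

```python
import ast
import json
import os
from typing import Optional


def read_neo4j_config(raw: Optional[str] = None) -> dict:
    """Parse the NEO4J variable into a dict of connection details."""
    if raw is None:
        raw = os.environ["NEO4J"]  # set by the pipeline for each container
    try:
        config = json.loads(raw)  # strict JSON with double quotes
    except json.JSONDecodeError:
        config = ast.literal_eval(raw)  # Python-style literal with single quotes
    if not isinstance(config, dict):
        raise ValueError("NEO4J must describe a mapping of connection details")
    return config


config = read_neo4j_config("{'host':'myHost.com', 'port': 7687, 'password':'supersecret'}")
print(config["host"], config["port"])
```

ast.literal_eval only evaluates literals, so it is a safer choice here than eval for a value that arrives from the environment.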

Additionally, we can set individual variables via env_vars to configure our containers. That solves the config issue 🔧.

The Docker SDK can pull images from a Docker registry. The pipeline always checks for a newer version of an image before running it. It gathers 🧺 all the loaders before running. I would say let’s check off reproducibility 🔁.

… did we just fix all the problems and fulfill all the requirements we had? Yes, nice!

Reflect

As a reward let’s have a small image to visualize most of the components we just talked about and let’s summarize what we have achieved:

https://nc.covidgraph.org/s/bQsBRdRqTDaWcCR

We can now:

  • Automatically pull a defined set of container images
  • Automatically run these images as containers in a coordinated sequence
  • Provide global and local configuration to the pipeline and/or containers
  • Save the state of the pipeline in a Neo4j database and resume from that state in our next pipeline run
  • Share our pipeline quickly and easily with others
  • Adapt/change our pipeline quickly and easily
  • Run our pipeline quickly and easily in different environments

C’mon, let’s push the button …

To run our example pipeline, the easiest way is to write a short docker-compose file:

version: "3.7"
services:
  motherlode:
    # This is our pipeline framework
    image: "registry-gl.connect.dzd-ev.de:443/dzdtools/motherlode:stable"
    # To simplify database connection we run in host mode
    network_mode: "host"
    # We provide our NEO4J Config
    environment:
      CONFIGS_NEO4J: "{'host':'myHost.com', 'port': 7687, 'password':'supersecret'}"
    volumes:
      # We provide the docker socket to the pipeline. This gives the pipeline the power to pull/start/stop containers
      - /var/run/docker.sock:/var/run/docker.sock:ro
      # Then we mount our pipeline description into the pipeline framework
      - ./pipeline.yaml:/data/pipeline.yaml
      # Optionally, we can save the log files.
      - ./log:/log

With a docker-compose up we can now watch the pipeline at work 🙂

tim@TimDesktop:/tmp/testpipe$ docker-compose up
Starting testpipe_motherlode_1 ... done
Attaching to testpipe_motherlode_1
motherlode_1  | 2021-06-08 19:38:34,877 copili.registry  WARNING: Can not find a .env file at '/data/.env'. Will skip .env file parsing.
motherlode_1  | 2021-06-08 19:38:35,064 copili           INFO: =====Start 'ExamplePipeline'======
motherlode_1  | 2021-06-08 19:38:35,665 PREDPLOADER_PUBLIC_USER INFO: Try to pull image 'registry-gl.connect.dzd-ev.de:443/dzdconnectpipeline/dataloading_createpublicusers' ...
motherlode_1  | 2021-06-08 19:38:36,038 PREDPLOADER_PUBLIC_USER INFO: ...image 'registry-gl.connect.dzd-ev.de:443/dzdconnectpipeline/dataloading_createpublicusers:latest' pulled.
motherlode_1  | 2021-06-08 19:38:36,289 copili           INFO: Check dependencies for PREDPLOADER_PUBLIC_USER:
motherlode_1  | 2021-06-08 19:38:36,289 PREDPLOADER_PUBLIC_USER INFO: Starting container PREDPLOADER_PUBLIC_USER from image registry-gl.connect.dzd-ev.de:443/dzdconnectpipeline/dataloading_createpublicusers:latest
motherlode_1  | 2021-06-08 19:38:39,473 PREDPLOADER_PUBLIC_USER INFO: Clean up PREDPLOADER_PUBLIC_USER
motherlode_1  | 2021-06-08 19:38:39,497 PREDPLOADER_PUBLIC_USER INFO: Stopped and removed container PREDPLOADER_PUBLIC_USER - registry-gl.connect.dzd-ev.de:443/dzdconnectpipeline/dataloading_createpublicusers:latest
motherlode_1  | 2021-06-08 19:38:39,686 PREDPLOADER_PUBLIC_USER INFO: ========================================
motherlode_1  | 2021-06-08 19:38:39,686 PREDPLOADER_PUBLIC_USER INFO: FAILED: False
motherlode_1  | 2021-06-08 19:38:39,686 PREDPLOADER_PUBLIC_USER INFO: EXITED with status: 0
motherlode_1  | 2021-06-08 19:38:39,686 PREDPLOADER_PUBLIC_USER INFO: SKIPPED: False
motherlode_1  | 2021-06-08 19:38:39,686 PREDPLOADER_PUBLIC_USER INFO: CANCELED: False
motherlode_1  | 2021-06-08 19:38:39,686 PREDPLOADER_PUBLIC_USER INFO: RUNTIME: 0.053062529133361146 minutes
motherlode_1  | 2021-06-08 19:38:39,686 PREDPLOADER_PUBLIC_USER INFO: ========================================
motherlode_1  | 2021-06-08 19:38:39,686 PREDPLOADER_PUBLIC_USER INFO: 
motherlode_1  | 2021-06-08 19:38:40,254 DATALOADER_PUBMED_PUBLICATIONS INFO: Try to pull image 'registry-gl.connect.dzd-ev.de:443/dzdtools/pubmedsucker' ...
motherlode_1  | 2021-06-08 19:38:40,687 DATALOADER_PUBMED_PUBLICATIONS INFO: ...image 'registry-gl.connect.dzd-ev.de:443/dzdtools/pubmedsucker:prod' pulled.
motherlode_1  | 2021-06-08 19:38:40,756 copili           INFO: Check dependencies for DATALOADER_PUBMED_PUBLICATIONS:
motherlode_1  | 2021-06-08 19:38:40,756 DATALOADER_PUBMED_PUBLICATIONS INFO: Starting container DATALOADER_PUBMED_PUBLICATIONS from image registry-gl.connect.dzd-ev.de:443/dzdtools/pubmedsucker:prod
motherlode_1  | 2021-06-08 19:38:41,147 DATALOADER_PUBMED_PUBLICATIONS INFO: 2021-06-08T19:38:41.147130107Z Pubmedsucker (Version '0.9.22')
motherlode_1  | 
motherlode_1  | 2021-06-08 19:38:41,857 DATALOADER_PUBMED_PUBLICATIONS INFO: 2021-06-08T19:38:41.856265665Z #--------START PHASE 1---------#
motherlode_1  | 
motherlode_1  | 2021-06-08 19:38:41,904 DATALOADER_PUBMED_PUBLICATIONS INFO: 2021-06-08T19:38:41.904386002Z Initialize Worker with file: 'https://git.connect.dzd-ev.de/dzdtools/pubmedsucker/-/raw/master/testdata/debug_cases.xml?inline=false'
motherlode_1  | 
[...]
motherlode_1  | 2021-06-08 19:38:48,574 DATALOADER_PUBMED_PUBLICATIONS INFO: Clean up DATALOADER_PUBMED_PUBLICATIONS
motherlode_1  | 2021-06-08 19:38:48,610 DATALOADER_PUBMED_PUBLICATIONS INFO: Stopped and removed container DATALOADER_PUBMED_PUBLICATIONS - registry-gl.connect.dzd-ev.de:443/dzdtools/pubmedsucker:prod
motherlode_1  | 2021-06-08 19:38:48,687 DATALOADER_PUBMED_PUBLICATIONS INFO: ========================================
motherlode_1  | 2021-06-08 19:38:48,688 DATALOADER_PUBMED_PUBLICATIONS INFO: FAILED: False
motherlode_1  | 2021-06-08 19:38:48,688 DATALOADER_PUBMED_PUBLICATIONS INFO: EXITED with status: 0
motherlode_1  | 2021-06-08 19:38:48,688 DATALOADER_PUBMED_PUBLICATIONS INFO: SKIPPED: False
motherlode_1  | 2021-06-08 19:38:48,688 DATALOADER_PUBMED_PUBLICATIONS INFO: CANCELED: False
motherlode_1  | 2021-06-08 19:38:48,688 DATALOADER_PUBMED_PUBLICATIONS INFO: RUNTIME: 0.13029183579989573 minutes
motherlode_1  | 2021-06-08 19:38:48,688 DATALOADER_PUBMED_PUBLICATIONS INFO: ========================================
motherlode_1  | 2021-06-08 19:38:48,688 DATALOADER_PUBMED_PUBLICATIONS INFO: 

We see our pipeline is running each one of our configured containers.

When we now look into our database, we see some nodes (:_PipelineLogRun and :_PipelineLogNode) which indicate that the containers have run.

[Image: log_nodes]

If one container fails, none of the following containers that depend on it will run. We can then fix the issue, release a new container, and re-run the pipeline. Based on the log nodes, containers that already succeeded will not run again, which saves us some time 🕐️. The state of the last pipeline run is saved in the database and is picked up by the next run.
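The skip and cancel behaviour can be sketched as a small decision loop. The outcome names below borrow the FAILED, SKIPPED and CANCELED flags from the log output, but their exact semantics in Motherlode are assumed here. The run_pipeline function, the run callback (which returns an exit code) and the previously_ok set (standing in for the log nodes read from Neo4j) are all hypothetical. The sketch also assumes the members are already listed in a valid order.

```python
from typing import Callable


def run_pipeline(
    members: list[dict],
    previously_ok: set[str],
    run: Callable[[dict], int],
) -> dict[str, str]:
    """Walk the members in order and return a name -> outcome mapping."""
    outcome: dict[str, str] = {}
    for member in members:
        name = member["name"]
        deps = member.get("dependencies", [])
        if any(outcome.get(d) in ("FAILED", "CANCELED") for d in deps):
            outcome[name] = "CANCELED"  # a dependency did not succeed
        elif name in previously_ok:
            outcome[name] = "SKIPPED"  # its data is already in the database
        else:
            outcome[name] = "OK" if run(member) == 0 else "FAILED"
    return outcome


members = [
    {"name": "A"},
    {"name": "B", "dependencies": ["A"]},
    {"name": "C", "dependencies": ["B"]},
]

# First run: B exits with a non-zero status, so C is never started.
first = run_pipeline(members, set(), lambda m: 1 if m["name"] == "B" else 0)
print(first)

# Second run after fixing B: A succeeded before and is skipped.
second = run_pipeline(members, {"A"}, lambda m: 0)
print(second)
```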

Next time?

In the next blog post by Tim, we will take a look at one way (of many) to create a pipeline member in Python. So stay tuned!

 

 
