phenomedb.pipeline_factory

Pipelines can be created, registered with Airflow, and executed via the phenomedb.modules.PipelineFactory. This removes the need to write Airflow DAG files by hand.

Overview

The phenomedb.modules.PipelineFactory is the interface between the user and the PipelineManager (by default, the AirflowPipelineManager). The PipelineFactory provides a standardised interface for creating Pipelines. A Pipeline is created by adding Tasks to it, committing its definition, and then executing it.

Figure: PhenomeDB phenomedb.modules.PipelineFactory overview. How the phenomedb.modules.PipelineFactory can be used to create Apache Airflow pipelines.

User Interface

The PipelineFactory UI can be used to create parameterised, hard-coded Pipelines/DAGs made up of multiple tasks, which are executed sequentially.

Figure: PhenomeDB PipelineFactory UI example. Using the phenomedb.modules.PipelineFactory UI to create a parameterised pipeline.

Python API

The phenomedb.modules.PipelineFactory Python API offers more flexibility than the UI: it can create two kinds of phenomedb.models.Pipeline, hard-coded or dynamic.

  • Hard-coded: the pipeline parameters are injected into the DAG file, so each instance of a Pipeline is specific to, for example, one project. The UI builds this kind of Pipeline.

  • Dynamic: the pipeline is created without parameters and is executed with a run_config (the same JSON structure used when running pipelines through the Airflow UI).
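The practical difference between the two kinds is where a task's parameters come from at run time. The sketch below is purely illustrative: resolve_task_params is a hypothetical helper, not part of PhenomeDB, and it assumes parameters are stored as one dictionary per task_id in both cases.

```python
from typing import Any, Dict, Optional

Params = Dict[str, Any]


def resolve_task_params(
    task_id: str,
    hard_coded: bool,
    definition_params: Dict[str, Params],
    run_config: Optional[Dict[str, Params]] = None,
) -> Params:
    """Hypothetical helper: which parameters would a task receive at run time?"""
    if hard_coded:
        # Hard-coded: parameters were written into the DAG file at definition time
        return dict(definition_params[task_id])
    # Dynamic: parameters arrive with each run, keyed by task_id
    if run_config is None or task_id not in run_config:
        raise KeyError(f"run_config has no entry for task '{task_id}'")
    return dict(run_config[task_id])
```

In the examples that follow, the hard-coded Pipeline passes each task's parameters to add_task, while the dynamic Pipeline passes them all at once to run_pipeline(run_config=...).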

These options are selected with the hard_code_data parameter of the phenomedb.modules.PipelineFactory:

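from phenomedb.config import config  # assumed location of the PhenomeDB config object
from phenomedb.pipeline_factory import PipelineFactory
from phenomedb.query_factory import QueryFactory, MetadataFilter  # assumed module path

pipeline_factory = PipelineFactory(pipeline_name='Example hard-coded Pipeline',
                                   description='An Example hard-coded Pipeline',
                                   hard_code_data=True)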

# Import a Sample Manifest (could use ImportMetadata instead for CSV files)
pipeline_factory.add_task('phenomedb.imports', # module
                          'ImportSampleManifest', # class
                           task_id='importsamplemanifest', # unique per pipeline
                           run_config={
                                "project": "PipelineTesting",
                                "sample_manifest_path":config['DATA']['test_data'] + 'DEVSET_sampleManifest.xlsx',
                                "columns_to_ignore": ['Further Sample info?'],
                                "username": "admin"}
                           )
# Import Bruker IVDr annotations
pipeline_factory.add_task('phenomedb.imports',
                          'ImportBrukerIVDrAnnotations',
                          task_id='importbrukerivdrannotations',
                          upstream_task_id='importsamplemanifest',
                          run_config={
                                "project": "PipelineTesting",
                                "sample_matrix": "plasma",
                                "annotation_method": "Bi-LISA",
                                "unified_csv_path": config['DATA']['test_data'] + 'DEVSET_P_BILISA_combinedData.csv',
                                "username": "admin" }
                           )

# Import Bruker IVDr Bi-LISA annotation metadata
pipeline_factory.add_task('phenomedb.compounds',
                          'ImportBrukerBiLISACompounds',
                          task_id='importbilisacompounds',
                          upstream_task_id='importbrukerivdrannotations',
                          run_config={
                                "bilisa_file": config['DATA']['test_data'] + 'ivdr-bilisa-all.csv',
                                "username": "admin"}
                          )

# Harmonise the metadata (add one HarmoniseMetadata task per field; age and sex in this example)
pipeline_factory.add_task('phenomedb.metadata',
                          'HarmoniseMetadata',
                          task_id='harmoniseage',
                          upstream_task_id='importbilisacompounds',
                          run_config={
                                "project": "PipelineTesting",
                                "metadata_field_name": "age",
                                "harmonised_metadata_field_name":"Age",
                                "inbuilt_transform_name":"simple_assignment",
                                "allowed_decimal_places":0,
                                "username": "admin" }
                           )

pipeline_factory.add_task('phenomedb.metadata',
                          'HarmoniseMetadata',
                          task_id='harmonisesex',
                          upstream_task_id='harmoniseage',
                          run_config={
                                "project": "PipelineTesting",
                                "metadata_field_name": "gender",
                                "harmonised_metadata_field_name":"Sex",
                                "lambda_function_string":"lambda x: 'Male' if x.lower().strip() == 'm' else ('Female' if x.lower().strip() == 'f' else 'Unknown' )",
                                "username": "admin" }
                           )

# Create a SavedQuery that will target the data (in this case those individuals under 40).
# Note: When importing annotations, a SavedQuery will be created targeting the imported FeatureDataset

query_factory = QueryFactory(query_name='PipelineTesting IVDr under 40', query_description='test description')
query_factory.add_filter(model='Project', property='name', operator='eq', value='PipelineTesting')
query_factory.add_filter(model='Sample', property='matrix', operator='eq', value='plasma')
query_factory.add_filter(model='Assay', property='name', operator='eq', value='NOESY')
query_factory.add_filter(model='AnnotationMethod', property='name', operator='eq', value='Bi-LISA')
query_factory.add_filter(MetadataFilter('Age','lt',value=40))
saved_query = query_factory.save_query()

# Add a task to create the SavedQuery dataframe cache for the query
# As this is IVDr data, we use the raw values (correction_type=None)
# If we were targeting LC-MS PeakPantheR data, we would use the SR-corrected data (correction_type='SR')
# By default it will harmonise the intensities to mmol/L
pipeline_factory.add_task('phenomedb.cache',
                          'CreateSavedQueryDataframeCache',
                          task_id='createquerycache',
                          upstream_task_id='harmonisesex',
                          run_config={
                                "saved_query_id": saved_query.id,
                                "username": "admin" }
                          )

# Run a PCA with default params
pipeline_factory.add_task('phenomedb.analysis',
                          'RunPCA',
                          task_id='runpca',
                          upstream_task_id='createquerycache',
                           run_config={
                                "saved_query_id": saved_query.id,
                                "username": "admin" }
                           )

# Run a Bonferroni-corrected linear-regression MWAS where harmonised Age is the outcome of interest and harmonised Sex is a covariate
pipeline_factory.add_task('phenomedb.analysis',
                          'RunMWAS',
                          task_id='runmwas',
                          upstream_task_id='runpca',
                           run_config={
                                "saved_query_id": saved_query.id,
                                "model_Y_variable": "h_metadata::Age",
                                "model_X_variables": ["h_metadata::Sex"],
                                "method": "linear",
                                "correction_method": "bonferroni",
                                "username": "admin" }
                           )

# Write out the hard coded DAG and register with Airflow
pipeline_factory.commit_definition()

# Run the Pipeline (may block while waiting for Airflow to register the Pipeline)
pipeline_factory.run_pipeline()
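Because each task in the example above is downstream of the previous one, the repeated add_task calls could be generated from a list of steps. A minimal sketch, assuming only the add_task keyword arguments documented for PipelineFactory.add_task (upstream_task_id, run_config, task_id); add_chain and the step tuples are hypothetical helpers, not part of PhenomeDB.

```python
from typing import Any, Dict, List, Optional, Sequence, Tuple

# (task_module, task_class, task_id, run_config) for each step, in execution order
Step = Tuple[str, str, str, Optional[Dict[str, Any]]]


def add_chain(pipeline_factory: Any, steps: Sequence[Step]) -> List[str]:
    """Add each step downstream of the previous one; return the task IDs added."""
    upstream_task_id = None  # the first step has no upstream task
    task_ids = []
    for task_module, task_class, task_id, run_config in steps:
        pipeline_factory.add_task(task_module, task_class,
                                  upstream_task_id=upstream_task_id,
                                  run_config=run_config,
                                  task_id=task_id)
        upstream_task_id = task_id  # the next step depends on this one
        task_ids.append(task_id)
    return task_ids
```

For a dynamic Pipeline, each step's run_config would be None, with the parameters supplied later through run_pipeline(run_config=...).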

The equivalent Pipeline can also be defined dynamically and executed with a run_config:

pipeline_factory = PipelineFactory(pipeline_name='Example dynamic Pipeline',
                                   description='An Example dynamic Pipeline',
                                   hard_code_data=False)

# Import a Sample Manifest (could use ImportMetadata instead for CSV files)
pipeline_factory.add_task('phenomedb.imports',
                          'ImportSampleManifest',
                           task_id='importsamplemanifest')

pipeline_factory.add_task('phenomedb.imports',
                          'ImportBrukerIVDrAnnotations',
                          task_id='importbrukerivdrannotations',
                          upstream_task_id='importsamplemanifest')

# Import Bruker IVDr Bi-LISA annotation metadata
pipeline_factory.add_task('phenomedb.compounds',
                          'ImportBrukerBiLISACompounds',
                          task_id='importbilisacompounds',
                          upstream_task_id='importbrukerivdrannotations')

# Harmonise the metadata (add one HarmoniseMetadata task per field; age and sex in this example)
pipeline_factory.add_task('phenomedb.metadata',
                          'HarmoniseMetadata',
                          task_id='harmoniseage',
                          upstream_task_id='importbilisacompounds')

pipeline_factory.add_task('phenomedb.metadata',
                          'HarmoniseMetadata',
                          task_id='harmonisesex',
                          upstream_task_id='harmoniseage')

# Create a SavedQuery that will target the data (in this case those individuals under 40).
# Note: When importing annotations, a SavedQuery will be created targeting the imported FeatureDataset

query_factory = QueryFactory(query_name='PipelineTesting IVDr under 40', query_description='test description')
query_factory.add_filter(model='Project', property='name', operator='eq', value='PipelineTesting')
query_factory.add_filter(model='Sample', property='matrix', operator='eq', value='plasma')
query_factory.add_filter(model='Assay', property='name', operator='eq', value='NOESY')
query_factory.add_filter(model='AnnotationMethod', property='name', operator='eq', value='Bi-LISA')
query_factory.add_filter(MetadataFilter('Age','lt',value=40))
saved_query = query_factory.save_query()

# Add a task to create the SavedQuery dataframe cache for the query
# As this is IVDr data, we use the raw values (correction_type=None)
# If we were targeting LC-MS PeakPantheR data, we would use the SR-corrected data (correction_type='SR')
# By default it will harmonise the intensities to mmol/L
pipeline_factory.add_task('phenomedb.cache',
                          'CreateSavedQueryDataframeCache',
                          task_id='createquerycache',
                          upstream_task_id='harmonisesex',
                          )

# Run a PCA with default params
pipeline_factory.add_task('phenomedb.analysis',
                          'RunPCA',
                          task_id='runpca',
                          upstream_task_id='createquerycache'
                           )

# Run a Bonferroni-corrected linear-regression MWAS where harmonised Age is the outcome of interest and harmonised Sex is a covariate
pipeline_factory.add_task('phenomedb.analysis',
                          'RunMWAS',
                          task_id='runmwas',
                          upstream_task_id='runpca')

# Write out the dynamic DAG and register it with Airflow
pipeline_factory.commit_definition()

# Build the dynamically parameterised run_config dictionary
run_config={
            "importsamplemanifest":{
                    "project": "PipelineTesting",
                    "sample_manifest_path":config['DATA']['test_data'] + 'DEVSET_sampleManifest.xlsx',
                    "columns_to_ignore": ['Further Sample info?'],
                    "username": "admin"
                    },
            "importbrukerivdrannotations":{
                        "project": "PipelineTesting",
                        "sample_matrix": "plasma",
                        "annotation_method": "Bi-LISA",
                        "unified_csv_path": config['DATA']['test_data'] + 'DEVSET_P_BILISA_combinedData.csv',
                        "username": "admin"
                    },
            "importbilisacompounds":{
                        "bilisa_file": config['DATA']['test_data'] + 'ivdr-bilisa-all.csv',
                        "username": "admin"
                    },
             "harmoniseage":{
                        "project": "PipelineTesting",
                        "metadata_field_name": "age",
                        "harmonised_metadata_field_name":"Age",
                        "inbuilt_transform_name":"simple_assignment",
                        "allowed_decimal_places":0,
                        "username": "admin"
                     },
             "harmonisesex":{
                        "project": "PipelineTesting",
                        "metadata_field_name": "gender",
                        "harmonised_metadata_field_name":"Sex",
                        "lambda_function_string":"lambda x: 'Male' if x.lower().strip() == 'm' else ('Female' if x.lower().strip() == 'f' else 'Unknown' )",
                        "username": "admin"
                     },
            "createquerycache":{
                        "saved_query_id": saved_query.id,
                        "username": "admin"
                     },
            "runpca":{
                        "saved_query_id": saved_query.id,
                        "username": "admin"
                     },
            "runmwas":{
                        "saved_query_id": saved_query.id,
                        "model_Y_variable": "h_metadata::Age",
                        "model_X_variables": ["h_metadata::Sex"],
                        "method": "linear",
                        "correction_method": "bonferroni",
                        "username": "admin" }
            }

# Run the Pipeline (may block while waiting for Airflow to register the Pipeline)
pipeline_factory.run_pipeline(run_config=run_config)

The dynamically parameterised run_config has the same format as the Apache Airflow DAG run configuration, so it can also be used to trigger dynamic Pipelines from the Airflow UI.
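Before a run_config is pasted into the Airflow UI, it can help to check that its keys match the Pipeline's task IDs and to serialise it as JSON. A minimal sketch using only the standard library; run_config_to_json is a hypothetical helper, and treating a missing entry as an error is a choice of this sketch (relax it if some tasks take no parameters).

```python
import json
from typing import Any, Dict, Iterable


def run_config_to_json(run_config: Dict[str, Dict[str, Any]],
                       task_ids: Iterable[str]) -> str:
    """Check that run_config has exactly one entry per task_id, then serialise it."""
    expected = set(task_ids)
    provided = set(run_config)
    missing = sorted(expected - provided)   # tasks with no parameters supplied
    unknown = sorted(provided - expected)   # keys that match no task in the Pipeline
    if missing or unknown:
        raise ValueError(f"missing entries: {missing}; unknown task IDs: {unknown}")
    return json.dumps(run_config, indent=2)
```

Passing the run_config dictionary from the dynamic example together with its eight task IDs would return JSON suitable for triggering the DAG with a config.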

class phenomedb.pipeline_factory.BasePipelineManager(pipeline_name=None, db_env=None, db_session=None, pipeline_id=None)

Abstract BasePipelineManager class. Extend this class to use another offline worker system, e.g. RedisQueue.

add_task() and submit() must be implemented.
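The extension point can be sketched with Python's abc module. PipelineManagerSketch below is a hypothetical stand-in that mirrors the documented add_task signature and the submit() requirement; it is not PhenomeDB's BasePipelineManager, and InMemoryQueueManager is a toy backend that only records tasks in a list.

```python
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional


class PipelineManagerSketch(ABC):
    """Hypothetical stand-in for BasePipelineManager (not the PhenomeDB class)."""

    def __init__(self, pipeline_name: str) -> None:
        self.pipeline_name = pipeline_name

    @abstractmethod
    def add_task(self, task_module: str, task_class: str,
                 task_id: Optional[str] = None,
                 run_config: Optional[Dict[str, Any]] = None,
                 upstream_task_id: Optional[str] = None) -> None:
        """Record one task of the pipeline."""

    @abstractmethod
    def submit(self) -> bool:
        """Hand the pipeline to the worker system; return True on success."""


class InMemoryQueueManager(PipelineManagerSketch):
    """Toy backend: 'submitting' copies the task specs onto a local list."""

    def __init__(self, pipeline_name: str) -> None:
        super().__init__(pipeline_name)
        self.tasks: List[Dict[str, Any]] = []
        self.queue: List[Dict[str, Any]] = []

    def add_task(self, task_module: str, task_class: str,
                 task_id: Optional[str] = None,
                 run_config: Optional[Dict[str, Any]] = None,
                 upstream_task_id: Optional[str] = None) -> None:
        self.tasks.append({
            "task_module": task_module,
            "task_class": task_class,
            "task_id": task_id,
            "run_config": run_config or {},
            "upstream_task_id": upstream_task_id,
        })

    def submit(self) -> bool:
        # A real backend (e.g. a Redis-based queue) would enqueue jobs here
        self.queue.extend(self.tasks)
        return True
```

Because both methods are abstract, the base class cannot be instantiated on its own; a backend that forgets one of them fails at construction time with a TypeError.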

add_task(task_module, task_class, task_id=None, run_config=None, upstream_task_id=None)

Add a task to the pipeline, optionally with a run_config for the task and an upstream_task_id.

Parameters:
  • task_module (str) – The module of the task, eg. ‘phenomedb.imports’

  • task_class (str) – The class of the task eg. ‘ImportMetadata’

  • task_id (str, optional) – The unique identifier for the task in the pipeline, defaults to None

  • run_config (dict, optional) – The run_config for the task, i.e. a dictionary of the task's parameters such as {"project": "PipelineTesting", "username": "admin"}, defaults to None

  • upstream_task_id (str, optional) – The ID of the upstream task, defaults to None

Raises:
  • PipelineTaskIDError – Another task with that ID already exists

  • PipelineTaskIDError – Task ID cannot start with a number

  • PipelineTaskIDError – Upstream task with that ID does not exist
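The three PipelineTaskIDError conditions listed above amount to a small validation step. A minimal sketch, assuming the pipeline's tasks are held in a dict mapping task_id to upstream_task_id; validate_task_id and the local PipelineTaskIDError class are hypothetical stand-ins, not PhenomeDB's code.

```python
from typing import Dict, Optional


class PipelineTaskIDError(Exception):
    """Local stand-in for the exception named in the documentation."""


def validate_task_id(task_id: str, existing: Dict[str, Optional[str]],
                     upstream_task_id: Optional[str] = None) -> None:
    """Apply the three documented checks before a task is added.

    existing maps each task_id already in the pipeline to its upstream_task_id.
    """
    if task_id in existing:
        raise PipelineTaskIDError(f"Another task with ID '{task_id}' already exists")
    if task_id[:1].isdigit():
        raise PipelineTaskIDError(f"Task ID '{task_id}' cannot start with a number")
    if upstream_task_id is not None and upstream_task_id not in existing:
        raise PipelineTaskIDError(f"Upstream task '{upstream_task_id}' does not exist")
```

For example, validate_task_id("importbrukerivdrannotations", {"importsamplemanifest": None}, "importsamplemanifest") passes, while reusing "importsamplemanifest" as a new task ID would raise.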


commit_definition()

Commit the definition using the PipelineManager

run_pipeline()

Run the pipeline

class phenomedb.pipeline_factory.PipelineFactory(pipeline_name=None, pipeline_id=None, description=None, start_date=None, pipeline_folder_path=None, default_args=None, hard_code_data=False, schedule_interval='None', db_env=None, db_session=None, tags=None, sequential=True, max_active_runs=100, concurrency=1)

PipelineFactory class. The default PipelineManager is the AirflowPipelineManager (Apache Airflow), so most of the options below are Airflow-specific.

Parameters:
  • pipeline_name (str) – The pipeline name - the main identifier of the pipeline, must be unique

  • pipeline_id – The phenomedb.models.Pipeline ID

  • description (str, optional) – The pipeline description, defaults to None.

  • start_date (datetime.datetime, optional) – The start date of the airflow pipeline, defaults to None.

  • pipeline_folder_path (str, optional) – The path to the pipeline folder, defaults to None (the default folder is used)

  • default_args (dict, optional) – The default arguments of the airflow pipeline, defaults to {'owner': 'airflow', 'retries': 1, 'retries_delay': datetime.timedelta(minutes=5)}.

  • hard_code_data (bool, optional) – Whether to create a dynamically parameterised pipeline or one with hard-coded parameters, default False (dynamic)

  • schedule_interval (str, optional) – How often to run the airflow pipeline, defaults to “@once”.

  • db_env (str, optional) – Which db to use: "PROD", "BETA", or "TEST", defaults to None ("PROD").

  • tags (list, optional) – The Airflow tags to add (for UI filtering), defaults to [].

  • sequential (bool, optional) – Whether to run tasks sequentially, defaults to True. False means tasks will run concurrently, in no specific order.

  • max_active_runs (int, optional) – The maximum number of active runs of the pipeline

  • concurrency (int, optional) – How many concurrent runs of a pipeline can be executed

add_task(task_module, task_class, upstream_task_id=None, run_config=None, task_id=None)

Add a task to the pipeline.

Parameters:
  • task_module (str) – The module of the task.

  • task_class (str) – The class of the task.

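  • upstream_task_id (str, optional) – If specified, this task is downstream of the task with this ID, defaults to None

  • run_config (dict, optional) – The run_config for the task, defaults to None

  • task_id (str, optional) – The unique identifier for the task in the pipeline, defaults to None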

  • depends_on_past (bool, optional) – If specified, this will only run when upstream tasks complete, defaults to True

Returns:

task_label: The label of the task.

Return type:

str
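Each add_task call with an upstream_task_id adds one dependency edge. Airflow performs the actual scheduling, but the order in which a Pipeline's tasks can run is a topological order of those edges. A minimal sketch using Kahn's algorithm; execution_order is a hypothetical helper, not part of PhenomeDB, and it assumes each task has at most one upstream task, as add_task allows.

```python
from collections import defaultdict, deque
from typing import Dict, List, Optional


def execution_order(upstream: Dict[str, Optional[str]]) -> List[str]:
    """Order task IDs so that every task comes after its upstream task.

    upstream maps task_id -> upstream_task_id (None for a root task).
    """
    children = defaultdict(list)
    indegree = {task_id: 0 for task_id in upstream}
    for task_id, parent in upstream.items():
        if parent is not None:
            children[parent].append(task_id)
            indegree[task_id] += 1
    # Start from tasks with no upstream dependency, in insertion order
    ready = deque(t for t, d in indegree.items() if d == 0)
    order = []
    while ready:
        task_id = ready.popleft()
        order.append(task_id)
        for child in children[task_id]:
            indegree[child] -= 1
            if indegree[child] == 0:
                ready.append(child)
    if len(order) != len(upstream):
        # Some task waits on a cycle or on an upstream ID that was never added
        raise ValueError("cycle or unknown upstream task")
    return order
```

For instance, execution_order({"runpca": "createquerycache", "createquerycache": "harmonisesex", "harmonisesex": None}) returns ["harmonisesex", "createquerycache", "runpca"], regardless of the order in which the tasks were listed.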

commit_definition()

Commit the definition of the pipeline


delete_pipeline()

Delete the pipeline

static get_json_task_spec()

Get the json task_spec

Returns:

the json task spec dict

Return type:

dict

static get_tasks_from_json(modules_to_include=None)

Parse the tasks from the task_spec file

Parameters:

modules_to_include (list, optional) – A list of modules to include, defaults to None

Returns:

task_spec_json

Return type:

dict
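The modules_to_include filter can be pictured as a simple dictionary filter. The real task_spec layout is not documented here, so this sketch assumes, purely for illustration, a dict keyed by module name (e.g. 'phenomedb.imports') whose values describe that module's task classes; filter_task_spec is hypothetical.

```python
from typing import Any, Dict, Iterable, Optional


def filter_task_spec(task_spec: Dict[str, Dict[str, Any]],
                     modules_to_include: Optional[Iterable[str]] = None
                     ) -> Dict[str, Dict[str, Any]]:
    """Keep only the modules listed in modules_to_include (all modules if None)."""
    if modules_to_include is None:
        return dict(task_spec)
    wanted = set(modules_to_include)
    return {module: classes for module, classes in task_spec.items()
            if module in wanted}
```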

pause_pipeline()

Pause the pipeline

run_pipeline(run_config=None, debug=False)

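Submit the pipeline to the queue.

Parameters:
  • run_config (dict, optional) – For dynamic Pipelines, a dictionary of {task_id: task parameters} in the same format as the Airflow DAG run configuration, defaults to None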

Returns:

success (True) or failure (False)

Return type:

bool