The Pipelet Readme v1.1

Pipelet is a free framework for creating, executing, and browsing scientific data processing pipelines. It provides both an execution engine and a web interface for browsing the results; both are written in Python. As Pipelet is all about chaining processing steps written in Python, or using Python as a glue language, prior knowledge of this language is required.

1 Tutorial

1.1 Introduction

1.1.1 Why use pipelines

The pipeline mechanism allows you to apply a sequence of processing steps to some data, in such a way that the input of each process is the output of the previous one. Making these different processing steps visible, in the right order, is essential in data analysis: it keeps track of what you did and ensures that the whole processing remains consistent.

1.1.2 How it works

Pipelet is built on the ability to save to disk every intermediate input or output of a pipeline, which is usually not a strong constraint but offers a lot of benefits. It means that you can stop the processing whenever you want and start it again without recomputing the whole thing: you just take the last products you have on disk and continue the processing where it stopped. This logic pays off when the computation cost is higher than the cost of the disk space required by the intermediate products.

In addition, the Pipelet engine has been designed to process data sets. It takes advantage of the parallelisation opportunities that come with data sharing the same structure (data arrays) to dispatch the computational tasks on parallel architectures. The data dependency scheme is also used to save CPU time, and makes it possible to handle the processing of very large data sets.

1.1.3 The Pipelet functionalities

Pipelet is a free framework which helps you:

  • to write and manipulate pipelines with any dependency scheme,
  • to keep track of what processing has been applied to your data and perform comparisons,
  • to carry pipeline source code from development to production and adapt it to different hardware and software architectures.

1.1.4 What's new in v1.1

  • Speed improvements during execution and navigation to handle pipelines of 100,000 tasks.
  • Task repository versioning to manage group_by directives which use different parent task lists.
  • New glob_seg-type utility to search for data files from parent tasks only, plus improvements to the I/O and parameter utilities. See The segment environment.
  • Improved management of external dependencies: the depend directive triggers a copy of the external dependencies, and the version numbers of the imported modules (together with their RCS revisions, if they exist) are output.
  • Pickle file rendering available from the web interface.

1.2 Getting started

1.2.1 Pipelet installation

  • Dependencies
    • Running the Pipelet engine requires Python >= 2.6.
    • The web interface of Pipelet requires the installation of the cherrypy3 Python module (on Debian: aptitude install python-cherrypy3).
    • Although the default Python installation provides the sqlite3 module, you may not be able to use it. In that case, you can manually install the pysqlite2 module.

    You may find it useful to install some generic scientific tools that interact nicely with Pipelet:

    • numpy
    • matplotlib
    • latex
  • Getting Pipelet
    • Software status

      The first version of the software is currently in the process of stabilisation. The Pipelet engine has now reached the desired level of sophistication. The user interface, on the other hand, has been developed in a minimalist way: it includes the main functionalities, but with a design that could, and will, be made more user friendly.

    • Getting the latest Pipelet version

      git clone git://gitorious.org/pipelet/pipelet.git

  • Installing Pipelet

    sudo python setup.py install

1.2.2 Running a simple test pipeline

  1. Run the test pipeline

    cd test/first_test

    python main.py

  2. Add this pipeline to the web interface

    pipeweb track test ./.sqlstatus

  3. Set up an account in the access control list and launch the web server

    pipeutils -a username -l 2 .sqlstatus

    pipeweb start

  4. You should now be able to browse the results on the web page http://localhost:8080

1.2.3 Getting a new pipe framework

To get a new pipeline framework, with example main and segment scripts:

pipeutils -c pipename

This command creates a directory named pipename which contains:

  • a main script (named main.py) providing the functionality to execute your pipeline in various modes (debug, parallel, batch mode, …)
  • an example segment script (default.py) which illustrates the pipelet utilities with comments.

The next section describes these two files in more detail.

1.3 Writing Pipes

1.3.1 Pipeline architecture

The definition of a data processing pipeline consists of:

  • a succession of python scripts, called segments, each coding one step of the actual processing.
  • a main script that defines the dependency scheme between segments and launches the processing.

The dependencies between segments must form a directed acyclic graph. This graph is described by a string using a subset of the graphviz dot language (http://www.graphviz.org). For example the string:

"""
a -> b -> d;
c -> d;
c -> e;
"""

defines a pipeline with 5 segments {"a", "b", "c", "d", "e"}. The relation a->b ensures that segment "a" will be processed before its 'child' segment "b", and that the output of a will be fed as input to b. In the given example, the node d has two parents, b and c; both will be executed before d. As there is no relation between b and c, which of the two will be executed first is not defined.

When executing the segment seg, the engine looks for a python script named seg.py. If it is not found, the engine iteratively looks for script files named se.py and then s.py. This way, different segments of the pipeline can share the same code if they are given names with a common root (this mechanism is useful for writing generic segments and is complemented by the hooking system described in the advanced usage section). The code is then executed in a specific namespace (see below, The segment environment).
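
The lookup rule can be pictured with the following short sketch; this is only an illustration of the rule described above, not Pipelet's actual implementation:

import os

def find_segment_script(code_dir, segname):
    # For a segment named "melt", try "melt.py", then "mel.py", "me.py", "m.py"
    # until an existing file is found.
    name = segname
    while name:
        candidate = os.path.join(code_dir, name + '.py')
        if os.path.exists(candidate):
            return candidate
        name = name[:-1]
    raise IOError('no script found for segment %s' % segname)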

1.3.2 The Pipeline object

Practically, the creation of a Pipeline object requires 3 arguments:

from pipelet.pipeline import Pipeline
P = Pipeline(pipedot, code_dir="./", prefix="./")
  • pipedot is the string description of the pipeline
  • code_dir is the path where the segment scripts can be found
  • prefix is the path to the data repository (see below Hierarchical data storage)

It is possible to output the graphviz representation of the pipeline (requires the installation of graphviz). First, save the graph string into a .dot file with the pipelet function:

P.to_dot('pipeline.dot')

Then, convert it to an image file with the dot command:

dot -Tpng -o pipeline.png pipeline.dot

1.3.3 Dependencies between segments and data parallelism

The modification of the code of one segment will trigger its recalculation and the recalculation of all the segments which depend on it.

The output of a segment is a list of python objects. If a segment has no particular output, this list can be empty and does not need to be specified. Elements of the list may be any kind of pickleable python object. However, a good practice is to fill the list with the minimal set of characteristics needed to describe the output of the segment, and to defer the storage of the actual data to appropriate structures and file formats. For example, a segment performing computations on large images could in principle pass the resulting images to the following segment through the output list; it is better practice to store each resulting image in a dedicated file and to pass in the list only the information allowing unambiguous identification of this file (such as its name or part of it).

The input of a child segment is taken from a set built from the output lists of its parents. The content of this input set can be tuned using the multiplex directive (see below). However, the simplest and default behavior of the engine is to form the Cartesian product of the output lists of its parents.

To illustrate this behavior, let us consider the following pipeline, built from three segments:

"""
knights -> melt;
quality -> melt;
"""

and assume that the respective output lists of segments knights and quality are:

["Lancelot", "Galahad"]

and:

['the Brave', 'the Pure']

The Cartesian product of these two lists is:

[('Lancelot','the Brave'), ('Lancelot','the Pure'), ('Galahad','the Brave'), ('Galahad','the Pure')]

Four instances of segment melt will thus be run, each one receiving as input one of the four 2-tuples.

At the end of the execution of all the instances of a segment, their output lists are concatenated. If the action of segment melt is to concatenate the two strings it receives, separated by a space, the final output set of segment melt will be:

['Lancelot the Brave', 'Lancelot the Pure', 'Galahad the Brave', 'Galahad the Pure'].

This default behavior can be altered by specifying a #multiplex directive in the comments of the segment code. See section Multiplex directive for more details.

As the segment execution order is not uniquely determined by the pipe scheme (several paths may exist), it is not possible to rely on an ordered input tuple. To overcome this issue, segment inputs are dictionaries, with keywords matching the parent segment names. In the above example, one can read the melt inputs using:

k = seg_input["knights"]
q = seg_input["quality"]

One can also use dedicated segment routines:

k = get_input("knights")
q = get_input("quality")

See section 'The segment environment' for more details.

1.3.4 Orphan segments

By default, orphan segments (segments without parents) have no input argument (an empty list), and are therefore executed once without input. It is nevertheless possible to feed input to an orphan segment by pushing a list into the output set of a hypothetical ('phantom') parent. If P is an instance of the Pipeline object, this is done by:

P.push(segname=[1,2,3])

From the segment environment, inputs can then be retrieved with the dedicated routine:

id = get_input()

In this scheme, it is important to uniquely identify the child tasks of the orphan segment by setting a dedicated output:

seg_output = id

See section The segment environment for more details.

1.3.5 Hierarchical data storage

The framework provides versioning of your data and easy access to it through the web interface. It also keeps track of the code, the execution logs, and various meta data of the processing. Of course, you remain free to bypass the hierarchical storage and store your actual data elsewhere, but you will lose the benefit of the automated versioning, which proves to be quite convenient.

The storage is organized as follows:

  • all pipeline instances are stored below a root which corresponds to the prefix parameter of the Pipeline object: /prefix/
  • all segment meta data are stored below a root whose name corresponds to a unique hash computed from the segment code and its dependencies: /prefix/segname_YFLJ65/
  • The segment meta data are:
    • a copy of the segment python script
    • a copy of all segment hook scripts
    • a parameter file (.args) which contains the segment parameter values
    • a meta data file (.meta) which contains some extra meta data
  • the data and meta data of each segment instance are stored in a specific subdirectory whose name corresponds to a string representation of its input, prefixed by its identifier number: /prefix/segname_YFLJ65/data/1_a/
  • if there is a single segment instance, the data are stored directly in /prefix/segname_YFLJ65/data/
  • if a segment has at least one parent, its root will be located below one of its parents' roots: /prefix/segname_YFLJ65/segname2_PLMBH9/
  • etc…

While the hierarchical storage makes it easy to store and handle many different data products with different versions, it can make manual navigation through the data less convenient. This is where the web interface comes in (among other advantages, such as remote access to the data, tagging, …).

1.3.6 The segment environment

The segment code is executed in a specific environment that provides:

  1. access to the segment input and output
    • get_input(seg): return the input coming from segment seg. If no segment is specified, the first one is taken. This utility replaces the seg_input variable, whose type can vary as described below.
    • seg_input: this variable is a dictionary containing the input of the segment. In the general case, seg_input contains as many keywords as there are parent segments. In the case of an orphan segment, the keyword used is suffixed with the word phantom. One exception comes from the use of the group_by directive, which alters the origin of the inputs; in this case, seg_input contains the resulting class elements.
    • set_output(o): set the segment output as a list. If o is not a list, set a list of one element [o].
    • seg_output: this variable has to be a list.
  2. Functionalities to use the automated hierarchical data storage system.
    • get_data_fn(basename, seg): complete the filename with the path to the working directory of the segment (default is the current segment).
    • glob_parent(regexp, segs): return the list of filenames matching the pattern regexp in the data directories of the direct parent tasks. It is possible to restrict the search to a specific segment list segs.
    • glob_seg(seg, regexp): return the list of filenames matching the pattern regexp in the data directory of the parent segment seg (all task directories are searched, independently of whether the file comes from a task related to the current task). Its usage should be limited as it:
      • potentially breaks the dependency scheme.
      • may hurt performance, as all task directories of segment seg will be searched.
    • get_tmp_fn(): return a temporary filename.
  3. Functionalities to use the automated parameters handling
    • save_param(lst): the listed parameters will be saved in a dedicated file.
    • expose(lst): the listed parameters will be exposed from the web interface
    • load_param(seg, globals(), lst): retrieve parameters from the meta data.
  4. Various convenient functionalities
    • save_products(filename, globals(), lst_par): use pickle to save a part of the current namespace.
    • load_products(filename, globals(), lst_par): update the namespace by unpickling requested object from the file.
    • logged_subprocess(lst_args): execute a subprocess and log its output in processname.log and processname.err.
    • logger is a standard logging.Logger object that can be used to log the processing
  5. Hooking support. Pipelet enables you to write reusable generic segments by providing a hooking system via the hook function. hook(hookname, globals()) executes the Python script segname_hookname.py and updates the namespace. See the section Hooking system for more details.
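
To give a flavour of how these utilities fit together, here is a hypothetical segment script combining several of them (the file name, parameter and input are invented for the example):

nsample = 1000                    # a segment parameter
save_param(['nsample'])           # saved to the .args file
expose(['nsample'])               # shown in the web interface

x = get_input()                   # input coming from the parent segment
logger.info('processing input %s', x)

fn = get_data_fn('result.txt')    # file placed in the segment data directory
f = open(fn, 'w')
f.write(str(x) + '\n')
f.close()

set_output(x)                     # minimal description passed downstream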

1.3.7 Writing a first pipeline

We are now in a position to write a complete, simple pipeline. Let us consider the knights example and write the beginning of the main file main.py describing the pipeline:

from pipelet.pipeline import Pipeline

pipedot = """
knights -> melt;
quality -> melt;
"""

P = Pipeline(pipedot, code_dir='./', prefix='./')

Now, we create the 3 segment files knights.py, quality.py and melt.py. The only action we expect from segment knights is simply to provide a list of knights. Its code is very simple:

set_output(["Lancelot", "Galahad")

Same thing for the segment quality:

set_output(['the Brave', 'the Pure'])

As explained, the segment melt will be executed four times. We expect it to concatenate its inputs and write the result to a file, so its code is:

knight, quality = get_input('knights'), get_input('quality')
f = open(get_data_fn('result.txt'), 'w')
f.write(knight + ' ' + quality+'\n')
f.close()  

We need to complete the main file so that it takes care of the execution (see Running Pipes for more explanations):

from pipelet.pipeline import Pipeline
from pipelet.launchers import launch_interactive
pipedot = """
knights -> melt;
quality -> melt;
"""

P = Pipeline(pipedot, code_dir='./', prefix='./')
w,t = launch_interactive(P)
w.run()

The execution of the main file will run this simple example in the 'interactive' mode provided for debugging purposes. You may add a knight to the list to see that only the required part is recomputed. More complete examples are described in the example pipelines section. The two remaining sections of the tutorial explain how to use the execution modes that exploit data parallelism (in this case, running the four independent instances of the melt segment in parallel), and how to provide web access to the results.

1.4 Running Pipes

1.4.1 The sample main file

A sample main file is made available when creating a new Pipelet framework. It is copied from the reference file:

pipelet/pipelet/static/main.py

This script illustrates various ways of running pipes. It describes the different parameters, and also how to write a main python script that can be used like any binary from the command line (including option parsing).
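
As an illustration only, the skeleton of such a main script might look like the following sketch; it is a simplified outline, not the reference file itself, and the -d and -p options simply mirror those quoted later in this section:

from optparse import OptionParser
from pipelet.pipeline import Pipeline
from pipelet.launchers import launch_interactive, launch_process

pipedot = """
knights -> melt;
quality -> melt;
"""

def main():
    parser = OptionParser()
    parser.add_option('-d', '--debug', action='store_true',
                      help='run in interactive (debug) mode')
    parser.add_option('-p', '--process', type='int', default=1,
                      help='number of parallel processes')
    options, args = parser.parse_args()

    P = Pipeline(pipedot, code_dir='./', prefix='./')
    if options.debug:
        # sequential execution, convenient for debugging
        w, t = launch_interactive(P)
        w.run()
    else:
        # run several tasks in parallel subprocesses
        launch_process(P, options.process)

if __name__ == '__main__':
    main()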

1.4.2 Common options

Some options are common to all running modes.

  • log level

    The logging system is handled by the standard Python logging module. This module defines the following log levels:

    • DEBUG
    • INFO
    • WARNING
    • ERROR
    • CRITICAL

    All logging messages are saved in the different Pipelet log files, which are available from the web interface (rotating file logging). It is also possible to print these messages to the standard output (stream logging) by setting the desired log level in the launcher options. For example:

    import logging
    launch_process(P, N, log_level=logging.DEBUG)
    

    If set to 0, stream logging will be disabled.

  • matplotlib

    The matplotlib documentation says:

    "Many users report initial problems trying to use maptlotlib in web application servers, because by default matplotlib ships configured to work with a graphical user interface which may require an X11 connection. Since many barebones application servers do not have X11 enabled, you may get errors if you don’t configure matplotlib for use in these environments. Most importantly, you need to decide what kinds of images you want to generate (PNG, PDF, SVG) and configure the appropriate default backend. For 99% of users, this will be the Agg backend, which uses the C++ antigrain rendering engine to make nice PNGs. The Agg backend is also configured to recognize requests to generate other output formats (PDF, PS, EPS, SVG). The easiest way to configure matplotlib to use Agg is to call:

    matplotlib.use('Agg')

    The matplotlib and matplotlib_interactive options set the matplotlib backend to Agg in order to allow execution in non-interactive environments. The two options independently affect the non-interactive execution modes and the interactive mode.

    These two options are set to True by default in the sample main script. Setting them to False deactivates this behavior for pipelines that make no use of matplotlib (and prevents an exception from being raised if matplotlib is not even installed).

1.4.3 The interactive mode

This mode has been designed to ease debugging. If P is an instance of the Pipeline object, the syntax reads:

from pipelet.launchers import launch_interactive
w, t = launch_interactive(P)
w.run()

In this mode, each task is computed sequentially. Do not hesitate to invoke the Python debugger from IPython: %pdb

To use the interactive mode, run: main.py -d

1.4.4 The process mode

In this mode, one can run several tasks simultaneously (if the pipe scheme allows it). The number of subprocesses is set by the N parameter:

from pipelet.launchers import launch_process
launch_process(P, N)

To use the process mode, run: main.py or main.py -p 4

1.4.5 The batch mode

In this mode, one can submit batch jobs to execute the tasks. The number of jobs is set by the N parameter:

import os
from pipelet.launchers import launch_pbs
launch_pbs(P, N, address=(os.environ['HOST'], 50000))

It is possible to specify some job submission options like:

  • job name
  • job header: this string is prepended to the PBS job scripts. You may want to add some environment-specific paths. Log and error files are automatically handled by the Pipelet engine and made available from the web interface.
  • cpu time: the syntax is "hh:mm:ss"

The 'server' option can be disabled in order to add workers to an existing scheduler.
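
For illustration, such a call could look like the following sketch; the keyword argument names (job_name, job_header, cpu_time, server) are assumptions chosen to mirror the options listed above, not a documented signature:

import os
from pipelet.launchers import launch_pbs

# Keyword names below are hypothetical, shown only to illustrate the options.
launch_pbs(P, N, address=(os.environ['HOST'], 50000),
           job_name='mypipe',
           job_header='#PBS -l nodes=1\nexport PATH=/my/soft/bin:$PATH',
           cpu_time='12:00:00',
           server=True)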

To use the batch mode, run: main.py -b

to start the server, and:

main.py -a 4

to add 4 workers.

1.5 Browsing Pipes

1.5.1 The pipelet webserver and ACL

The pipelet webserver allows the browsing of multiple pipelines. Each pipeline has to be registered using:

pipeweb track <shortname> sqlfile

To remove a pipeline from the tracked list, use :

pipeweb untrack <shortname>

As browsing a pipeline implies parsing the disk, some basic security has to be set up as well. All users have to be registered with a specific access level (1 for read-only access, 2 for write access):

pipeutils -a <username> -l 2 sqlfile

To remove a user from the user list:

pipeutils -d <username> sqlfile

Start the web server using :

pipeweb start

Then the web application will be available on the web page http://localhost:8080

To stop the web server :

pipeweb stop

1.5.2 The web application

In order to ease the comparison of different processing runs, the web interface displays various views of the pipeline data:

  • The index page

    The index page displays a tree view of all pipeline instances. Each segment may be expanded or collapsed via the +/- buttons.

    The parameters used in each segment are summarized and displayed along with the date of execution and the number of related tasks, ordered by status.

    Check-boxes allow operations to be performed on multiple segments:

    • deletion : to clean unwanted data
    • tag : to tag remarkable data

    The filter panel allows the segment instances to be filtered according to 2 criteria:

    • tag
    • date of execution
  • The code page

    Each segment name is a link to its code page. From this page the user can view the code of all the python scripts which have been applied to the data.

    The tree view is reduced to the current segment and its related parents.

    The root path corresponding to the data storage is also displayed.

  • The product page

    The number of related tasks, ordered by status, is a link to the product pages, where the data can be directly displayed (for images or text files) or downloaded. From this page it is also possible to delete a specific product and its dependencies.

  • The log page

    The log page can be accessed via the log button of the filter panel. Logs are ordered by date.

1.6 The example pipelines

1.6.1 fft

  • Highlights

    This example illustrates a very simple image processing use case. The problem is the following: one wants to apply a Gaussian filter in the Fourier domain to several 2D images.

    The pipe scheme is:

    pipedot = """
    mkgauss->convol;
    fftimg->convol;
    """
    

    where segment 'mkgauss' computes the Gaussian filter, 'fftimg' computes the Fourier transforms of the input images, and 'convol' performs the filtering in the Fourier domain and the inverse transform of the filtered images.

    P = pipeline.Pipeline(pipedot, code_dir=op.abspath('./'), prefix=op.abspath('./'))
    P.to_dot('pipeline.dot')
    

    The pipe scheme is output as a .dot file, that can be converted to an image file with the command line:

    dot -Tpng -o pipeline.png pipeline.dot

    To apply this filter to several images (in our case 4 input images), the pipe data parallelism is used. From the main script, a 4-element list is pushed to the fftimg segment.

    P.push(fftimg=[1,2,3,4]) 
    

    At execution, 4 instances of the fftimg segment will be created, each of them outputting one element of this list:

    img = get_input() #(fftimg.py - line 15)
    set_output (img)  #(fftimg.py - line 38)
    

    On the other hand, a single instance of the mkgauss segment will be executed, as there is only one filter to apply.

    The last segment, convol, which depends on the two others, will be executed with a number of instances equal to the Cartesian product of its parents' outputs (4 x 1, i.e. 4 instances).

    The instance identifier, which is set by the fftimg output, can be retrieved with the following instruction:

    img = get_input('fftimg')   #(convol.py - line 12)
    
  • Running the pipe

    Follow the same procedure as for the first example pipeline to run this pipe and browse the results.

1.6.2 cmb

  • Running the pipe

    This CMB pipeline depends on two external python modules:

  • The problem

    This example illustrates a very simple CMB data processing use case.

    The problem is the following: one wants to characterize the inverse noise weighting spectral estimator (as applied to the WMAP 1yr data). A first demo pipeline is built to check that the algorithm has been implemented correctly. Then, Monte Carlo simulations are used to compute error bar estimates.

  • A design pipeline

    The design pipe scheme is:

    pipe_dot = """ 
    cmb->clcmb->clplot;
    noise->clcmb;
    noise->clnoise->clplot;
    """
    

    where:

    • cmb: generate a CMB map from LCDM power spectrum.
    • noise: compute the mode coupling matrix from the input hit-count map.
    • clnoise: compute the empirical noise power spectrum from a noise realization.
    • clcmb: generate two noise realizations, add them to the CMB map, and compute a first cross spectrum estimator. Then the weighting mask and mode coupling matrix are applied to obtain the inverse noise weighting estimator.
    • clplot: make a plot comparing the pure cross spectrum and inverse noise weighting estimators.

    As the first two orphan segments depend on a single shared parameter, the map resolution nside, this argument is pushed from the main script.

    Another input argument of the cmb segment is its simulation identifier, which will be used later for the Monte Carlo. In order to push two inputs to a single segment instance, we use the Python tuple data type.

    P.push(cmb=[(nside, 1)])
    P.push(noise=[nside])
    

    From the segments, those inputs are retrieved with:

    (nside, sim_id) = get_input()  # (cmb.py line 14)
    nside = get_input()            # (noise.py line 15)
    

    The last segment produces a plot in which we compare:

    • the input LCDM power spectrum
    • the binned cross spectrum of the noisy CMB maps
    • the binned cross spectrum to which we applied the hit-count weighting and mode coupling matrix.
    • the noise power spectrum computed by clnoise segment.

    In this plot we check that both estimators are correct, and that the noise level is as expected.

  • From the design pipeline to Monte Carlo

    As a second step, Monte Carlo simulations are used to compute error bars.

    The clnoise segment is no longer needed, so the new pipe scheme reads:

    pipe_dot = """ 
    cmb->clcmb->clplot;
    noise->clcmb;
    """
    

    We now use the native data parallelization scheme of the pipe to build many instances of the cmb and clcmb segments.

    cmbin = []
    for sim_id in [1,2,3,4,5,6]:
        cmbin.append((nside, sim_id))
    P.push(cmb=cmbin)
    

2 Advanced usage

This section describes more advanced (and useful) features and requires good familiarity with the concepts introduced in the previous sections.

2.1 Multiplex directive

The default behavior can be altered by specifying a #multiplex directive in the comments of the segment code. If several multiplex directives are present in the segment code, the last one is retained.

The multiplex directive can be one of:

  • #multiplex cross_prod : default behavior, return the Cartesian product.
  • #multiplex union : make the union of the inputs

Moreover, the #multiplex cross_prod directive admits filtering and grouping by class, similarly to SQL requests:

#multiplex cross_prod where "condition" group_by "class_function"

condition and class_function are python code evaluated for each element of the product set.

The argument of where is a condition: an element will be part of the input set only if the condition evaluates to True.

The group_by directive groups elements into classes according to the result of the evaluation of the given class function. The input set then contains all the resulting classes. For example, if the function is a constant, the input set will contain only one element: the class containing all elements.

During the evaluation, the values of the tuple elements are accessible as variables bearing the names of the corresponding parents.

Given the Cartesian product set:

[('Lancelot','the Brave'), ('Lancelot','the Pure'), ('Galahad','the Brave'), ('Galahad','the Pure')]

one can use :

#multiplex cross_prod where "quality=='the Brave'" 

to get 2 instances of the following segment (melt) running on:

('Lancelot','the Brave'), ('Galahad','the Brave')

or:

#multiplex cross_prod group_by "knights"

to get 2 instances of the melt segment running on:

('Lancelot'), ('Galahad')

or:

#multiplex cross_prod group_by "0"

to get 1 instance of the melt segment running on: (0)

Note that to make use of group_by, the elements of the output set have to be hashable.

Another caution on the use of group_by: the segment input is no longer a dictionary in those cases, as the original tuple is lost and replaced by the result of the class function.

See section The segment environment for more details.

2.2 Depend directive

As explained in the introduction, Pipelet offers the possibility to save CPU time by storing intermediate products on disk. We call intermediate products the input/output data files of the different segments.

Each segment repository is identified by a unique key which depends on:

  • the segment processing code and parameters (segment and hook scripts)
  • the input data (identified from the key of the parent segments)

Any change made to a segment (a new parameter or a new parent) will thus yield a different key and tell the Pipelet engine to compute a new segment instance.

It is possible to add some external dependencies to the key computation using the depend directive:

#depend file1 file2

At the very beginning of the pipeline execution, all dependencies are stored, to prevent any change (code edit) between the key computation and the actual processing.

Note that this mechanism works only for segment and hook scripts. External dependencies are also read at the beginning of the pipeline execution, but they are only used for the key computation.

2.3 Database reconstruction

In the unfortunate case of losing the pipeline SQL database, it is possible to reconstruct it from the disk:

import pipelet
pipelet.utils.rebuild_db_from_disk(prefix, sqlfile)

All information will be retrieved, but with new identifiers.

2.4 The hooking system

As described in the 'segment environment' section, Pipelet supports a hooking system which allows the use of generic processing code and code sectioning.

Let's consider a set of instructions that have to be systematically applied at the end of a segment (post processing). One can put those instructions in a separate script file named, for example, segname_postproc.py and call the hook function:

hook('postproc', globals()) 

A specific dictionary can be passed to the hook script to avoid confusion.

The hook scripts are included into the hash key computation.
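
As a sketch, suppose a segment named melt delegates its post processing; the file names below are illustrative:

# In melt.py:
# ... main processing of the melt segment ...
hook('postproc', globals())   # executes melt_postproc.py in the segment namespace

# In melt_postproc.py:
# runs with the segment namespace, so the environment utilities are available
logger.info('melt post processing done, output: %s', seg_output)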

2.5 Segment script repository

2.5.1 Local repository

By default, segment scripts are read from a local directory, specified at the pipeline initialization with the parameter named code_dir:

from pipelet.pipeline import Pipeline
P = Pipeline(pipedot, code_dir="./", prefix="./")

The segment script contents are immediately stored, to prevent any modification between the pipeline start time and the actual execution of each segment.

It is generally a good idea to keep this directory under revision control, to ease the reproducibility of the pipeline (even though the Pipelet engine makes a copy of the segment script in the segment output directory).

If using Git, the revision number will be stored at the beginning of the copy of the segment script.

2.6 Writing custom environments

The Pipelet software provides a set of default utilities available from the segment environment. It is possible to extend this default environment or even re-write a completely new one.

2.6.1 Extending the default environment

The different environment utilities are actually methods of the Environment class. It is possible to add new functionality using the Python inheritance mechanism:

File : myenvironment.py

from pipelet.environment import *

class MyEnvironment(Environment):
    def my_function(self):
        """ My function does nothing. """
        return

The Pipelet engine objects (segments, tasks, pipeline) are available from the worker attribute self._worker. See section The Pipelet actors for more details about the Pipelet machinery.

2.6.2 Writing new environment

In order to start with a completely new environment, extend the base environment:

File : myenvironment.py

from pipelet.environment import *

class MyEnvironment(EnvironmentBase):
    def my_get_data_fn(self, x):
        """ New name for get_data_fn. """
        return self._get_data_fn(x)

    def _close(self, glo):
        """ Post processing code. """
        return glo['seg_output']

From the base environment, the basic functionalities for getting file names and executing hook scripts are still available through:

  • self._get_data_fn
  • self._hook

The segment input argument is also stored in self._seg_input. The segment output argument has to be returned by the _close(self, glo) method.

The Pipelet engine objects (segments, tasks, pipeline) are available from the worker attribute self._worker. See the doxygen documentation for more details about the Pipelet machinery.

2.6.3 Loading another environment

To load another environment, set the pipeline environment attribute accordingly.

P = Pipeline(pipedot, code_dir="./", prefix="./", env=MyEnvironment)

2.7 Writing custom main files

2.8 Launching pipeweb behind apache

Pipeweb uses the cherrypy web framework server and can be run behind an apache web server, which brings essentially two advantages:

  • access to the apache mod_* facilities (https, gzip, authentication, …).
  • faster static file serving (the pipelet application actually uses quite few of them, so the actual gain is marginal; getting the actual data served by apache may be feasible but is not planned yet).

There are actually several ways of doing so, and the cherrypy documentation gives hints about each. We describe here an example case using mod_rewrite and virtual hosting.

  1. The first thing we need is a working installation of apache with mod_rewrite activated. On a debian-like distribution this is usually obtained by:

     sudo a2enmod rewrite
     sudo a2enmod proxy
     sudo a2enmod proxy_http
  2. We then configure apache to rewrite requests to the cherrypy application, except for the static files of the application, which will be served directly. Here is a sample configuration file for a dedicated virtual host named pipeweb, with pipelet installed under /usr/local/lib/python2.6/dist-packages/ .
<VirtualHost pipeweb:80>
ServerAdmin pipeweb_admin@localhost
DocumentRoot /usr/local/lib/python2.6/dist-packages/pipelet

#    ErrorLog /some/custom/error_file.log
#    CustomLog /some/custom/access_file.log common

RewriteEngine on
RewriteRule ^/static/(.*) /usr/local/lib/python2.6/dist-packages/pipelet/static/$1 [L]
RewriteRule ^(.*) http://127.0.0.1:8080$1 [proxy]
</VirtualHost>
  3. Restart apache and start the pipeweb application to serve on the specified address and port: pipeweb start -H 127.0.0.1

It is also possible to start the application on demand, using a cgi script like:

#!/usr/local/bin/python
print "Content-type: text/html\r\n"
print """<html><head><META HTTP-EQUIV="Refresh" CONTENT="1; URL=/"></head><body>Restarting site ...<a href="/">click here<a></body></html>"""
import os
os.system('pipeweb start -H 127.0.0.1')

To have it executed when the proxy detects the absence of the application:

<VirtualHost pipeweb:80>
    #...
    ScriptAliasMatch ^/pipeweb_autostart\.cgi$ /usr/local/bin/pipeweb_autostart.cgi
    RewriteEngine on
    RewriteCond  %{SCRIPT_FILENAME} !pipeweb_autostart\.cgi$
    RewriteRule ^/static/(.*) /usr/local/lib/python2.6/dist-packages/pipelet/static/$1 [L]
    RewriteCond  %{SCRIPT_FILENAME} !pipeweb_autostart\.cgi$
    RewriteRule ^(.*) http://127.0.0.1:8080$1 [proxy]
    ErrorDocument 503 /pipeweb_autostart.cgi
    #...
</VirtualHost>

You may want to adjust the ownership and suid bit of the pipeweb_autostart.cgi script so that it executes with the correct rights.

Pipeweb handles access rights using per-pipeline ACLs registered in the database file. It supports Basic and Digest HTTP authentication. When deploying the pipeweb interface in a production environment, one may want to defer part of the authorization process to external and potentially more secure systems. The pipeweb behavior in terms of authorization is controlled by the -A option, which accepts the following arguments:

  • Digest (default): authenticate users via HTTP Digest authentication according to the user:passwd list stored in the database.
  • Basic: authenticate users via HTTP Basic (clear text) authentication according to the user:passwd list stored in the database.
  • ACL: check the access rights of otherwise authenticated users according to the user list stored in the database.
  • None: do no checking. (Defer the whole authentication/authorization process to the proxy.)

Here is a complete configuration sample making use of https, basic authentication, and per-pipeline ACLs to secure data browsing.

<VirtualHost _default_:443>
    ServerAdmin pipeweb_admin@localhost
    DocumentRoot /usr/local/lib/python2.6/dist-packages/pipelet

    # ErrorLog /some/custom/error_file.log
    # CustomLog /some/custom/access_file.log common

    # Adjust the ssl configuration to fit your needs
    SSLEngine on
    SSLCertificateFile    /etc/ssl/certs/ssl-cert-snakeoil.pem
    SSLCertificateKeyFile /etc/ssl/private/ssl-cert-snakeoil.key

    # This handles authentication and access to the index page 
    # Access right checking to the various registered pipelines 
    # is left to pipeweb
    <Location />
        #Replace with Any suitable authentication system
        AuthName             "pipeweb"
        AuthType             Basic
        AuthUserFile         /etc/apache2/pipelet.pwd
        require              valid-user
    </Location>

    ScriptAliasMatch ^/pipeweb_autostart\.cgi$ /usr/local/bin/pipeweb_autostart.cgi
    RewriteEngine on
    RewriteCond  %{SCRIPT_FILENAME} !pipeweb_autostart\.cgi$
    RewriteRule ^/static/(.*) /usr/local/lib/python2.6/dist-packages/pipelet/static/$1 [L]
    RewriteCond  %{SCRIPT_FILENAME} !pipeweb_autostart\.cgi$
    RewriteRule ^(.*) http://127.0.0.1:8080$1 [proxy]
    ErrorDocument 503 /pipeweb_autostart.cgi
</VirtualHost>

And the corresponding cgi script:

#!/usr/local/bin/python
print "Content-type: text/html\r\n"
print """<html><head><META HTTP-EQUIV="Refresh" CONTENT="1; URL=/"></head><body>Restarting site ...<a href="/">click here<a></body></html>"""
import os
os.system('pipeweb start -H 127.0.0.1 -A ACL')

3 Contributing to pipelet

3.1 Improvement suggestions

As far as possible, we try to keep pipelet README driven. If you want to ask for new features or modifications to pipelet, a good way to do so is in the form of a patch to the README.org file. If you are using git:

  • Figure out how you would actually like pipelet to behave.
  • Edit README.org accordingly. Org files are simple text files that can be conveniently edited using emacs org-mode.
  • Commit your proposition with a relevant commit message: git commit -a
  • Format the patch for email submission and send it to us: git send-email --to HEAD^

Author: Marc Betoule <marc@lpnlp103.in2p3.fr>

Date: 2011-04-07 16:35:04 CEST
