Tutorial

Note

This tutorial is intended for users of the tlpipe package who will just use the tasks already provided in the package to do data analysis. Developers of this package, and those who want to extend or build on it, may want to refer to the Developer’s guide for a deeper introduction.

Prepare the input pipe file

An input pipe file is actually a Python script, so it follows plain Python syntax, but to emphasize that it is used as the input of a data analysis pipeline, it is usually named with the suffix “.pipe” instead of “.py”.

The only required argument to run a data analysis pipeline is the input pipe file, in which you specify all tasks to be imported and executed, the parameter settings of each task, and the execution order (or flow control) of the pipeline.

Here we take the waterfall plot as an example to show how to write an input pipe file.

Non-iterative pipeline

  1. Create and open a file named plot_wf.pipe (the name can be chosen arbitrarily);

  2. Specify a variable pipe_tasks to hold the analysis tasks that will be imported and executed, and (optionally) a variable pipe_outdir to set the output directory (the default value is ‘./output/’). You can set other pipeline-related parameters according to your needs, or just use the default values. All parameters and their default values can be checked with the method show_params(); note that all these parameters should be prepended with the prefix “pipe_”;

    # -*- mode: python; -*-
    
    # input file for the analysis pipeline
    # execute this pipeline by either command of the following two:
    # tlpipe dir/to/plot_wf.pipe
    # mpiexec -n N tlpipe dir/to/plot_wf.pipe
    
    
    pipe_tasks = []
    pipe_outdir = './output/'
    pipe_logging = 'notset'
    # pipe_logging = 'info'
    
  3. Import tasks and set task parameters:

    1. Import Dispatch to select data to plot;

      import glob
      data_dir = 'dir/to/data' # your data directory
      files = sorted(glob.glob(data_dir+'/*.hdf5')) # all data files as a list
      
      
      # data selection
      from tlpipe.timestream import dispatch
      pipe_tasks.append(dispatch.Dispatch)
      ### parameters for Dispatch
      dp_input_files = files # data files as list
      dp_freq_select = (500, 510) # frequency indices, from 500 to 510
      dp_feed_select = [1, 2, 32, 33] # feed no. as a list
      dp_out = 'dp'
      
    2. Import Detect to find and mask noise source signal;

      # find and mask noise source signal
      from tlpipe.timestream import detect_ns
      pipe_tasks.append(detect_ns.Detect)
      ### parameters for Detect
      dt_in = dp_out
      # dt_feed = 1
      dt_out = 'dt'
      
    3. Import Plot to plot the waterfall;

      from tlpipe.plot import plot_waterfall
      pipe_tasks.append(plot_waterfall.Plot)
      ### parameters for Plot
      pwf_in = dt_out
      pwf_flag_ns = True # mask noise source signal
      pwf_fig_name = 'waterfall/wf' # figure name to save
      pwf_out = 'pwf'
      

The final input pipe file looks like the following:

# -*- mode: python; -*-

# input file for the analysis pipeline
# execute this pipeline by either command of the following two:
# tlpipe dir/to/plot_wf.pipe
# mpiexec -n N tlpipe dir/to/plot_wf.pipe


pipe_tasks = []
pipe_outdir = './output/'
pipe_logging = 'notset'
# pipe_logging = 'info'


import glob
data_dir = 'dir/to/data' # your data directory
files = sorted(glob.glob(data_dir+'/*.hdf5')) # all data files as a list


# data selection
from tlpipe.timestream import dispatch
pipe_tasks.append(dispatch.Dispatch)
### parameters for Dispatch
dp_input_files = files # data files as list
dp_freq_select = (500, 510) # frequency indices, from 500 to 510
dp_feed_select = [1, 2, 32, 33] # feed no. as a list
dp_out = 'dp'

# find and mask noise source signal
from tlpipe.timestream import detect_ns
pipe_tasks.append(detect_ns.Detect)
### parameters for Detect
dt_in = dp_out
# dt_feed = 1
dt_out = 'dt'

# plot waterfall of selected data
from tlpipe.plot import plot_waterfall
pipe_tasks.append(plot_waterfall.Plot)
### parameters for Plot
pwf_in = dt_out
pwf_flag_ns = True # mask noise source signal
pwf_fig_name = 'waterfall/wf' # figure name to save
pwf_out = 'pwf'

Note

  1. To show all pipeline-related parameters and their default values, you can do:

    >>> from tlpipe.pipeline import pipeline
    >>> pipeline.Manager.prefix
    'pipe_'
    >>> pipeline.Manager.show_params()
    Parameters of Manager:
    copy:  True
    tasks:  []
    logging:  info
    flush:  False
    timing:  False
    overwrite:  False
    outdir:  output/
    
  2. Each imported task should be appended to the list pipe_tasks in order to be executed by the pipeline;

  3. Each task’s parameters should be prepended with the task’s own prefix. See the source file of each task for its prefix and all parameters that can be set. You can also get the prefix and parameters (and their default values) as follows (taking Dispatch as an example):

    >>> from tlpipe.timestream import dispatch
    >>> dispatch.Dispatch.prefix
    'dp_'
    >>> dispatch.Dispatch.show_params()
    Parameters of task Dispatch:
    out:  None
    requires:  None
    in:  None
    iter_start:  0
    iter_step:  1
    input_files:  None
    iter_num:  None
    copy:  False
    iterable:  False
    output_files:  None
    time_select:  (0, None)
    stop:  None
    libver:  latest
    corr:  all
    exclude:  []
    check_status:  True
    dist_axis:  0
    freq_select:  (0, None)
    feed_select:  (0, None)
    tag_output_iter:  True
    tag_input_iter:  True
    start:  0
    mode:  r
    pol_select:  (0, None)
    extra_inttime:  150
    days:  1.0
    drop_days:  0.0
    exclude_bad:  True
    
  4. Usually the input of a task is either read from the data files, for example:

    dp_input_files = files # data files as list
    

    or is the output of a previously executed task (to construct a task chain), for example:

    dt_in = dp_out
    
    pwf_in = dt_out
    

Iterative pipeline

To make the pipeline run iteratively over several days of data, or over more than one group of data (each list of files is treated as a separate group), you should set the parameter iterable of each task you want to iterate to True, and optionally specify an iteration number. If no iteration number is specified, the pipeline will run iteratively until all the input data has been processed. Taking the above waterfall plot as an example again, suppose you want to iteratively plot the waterfall of 2 days of data, or of two separate groups of data; the input pipe file plot_wf_iter.pipe then looks like:

# -*- mode: python; -*-

# input file for the analysis pipeline
# execute this pipeline by either command of the following two:
# tlpipe dir/to/plot_wf_iter.pipe
# mpiexec -n N tlpipe dir/to/plot_wf_iter.pipe


pipe_tasks = []
pipe_outdir = './output/'
pipe_logging = 'notset'
# pipe_logging = 'info'


import glob
data_dir1 = 'dir1/to/data' # your data directory
data_dir2 = 'dir2/to/data' # your data directory

###  one way
files = sorted(glob.glob(data_dir1+'/*.hdf5')) # more than 1 day's data files as a list

### or another way
group1 = sorted(glob.glob(data_dir1+'/*.hdf5'))
group2 = sorted(glob.glob(data_dir2+'/*.hdf5'))
files = [ group1, group2 ] # or two groups of data, each as a list of data files


# data selection
from tlpipe.timestream import dispatch
pipe_tasks.append(dispatch.Dispatch)
### parameters for Dispatch
dp_input_files = files # data files as list
dp_freq_select = (500, 510) # frequency indices, from 500 to 510
dp_feed_select = [1, 2, 32, 33] # feed no. as a list
dp_iterable = True
dp_iter_num = 2 # set the number of iterations
dp_tag_input_iter = False
dp_out = 'dp'

# find and mask noise source signal
from tlpipe.timestream import detect_ns
pipe_tasks.append(detect_ns.Detect)
### parameters for Detect
dt_in = dp_out
# dt_feed = 1
dt_iterable = True
dt_out = 'dt'

# plot waterfall of selected data
from tlpipe.plot import plot_waterfall
pipe_tasks.append(plot_waterfall.Plot)
### parameters for Plot
pwf_in = dt_out
pwf_iterable = True
pwf_flag_ns = True # mask noise source signal
pwf_fig_name = 'waterfall/wf' # figure name to save
pwf_out = 'pwf'

Note

The number of iterations needs to be set only once, in the first task: after the first task has been executed the specified number of iterations, it will no longer produce output for the subsequent tasks, and those tasks stop iterating when there is no input for them.
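For example, a minimal sketch of the relevant settings from plot_wf_iter.pipe above: only the first task (Dispatch) carries the iteration number, while the subsequent tasks just declare themselves iterable.

# only the first task in the chain needs an explicit iteration number
dp_iterable = True
dp_iter_num = 2      # iterate twice (2 days of data, or 2 groups of files)

# subsequent tasks are simply marked iterable; they stop iterating
# automatically once the first task no longer produces output for them
dt_iterable = True
pwf_iterable = True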

Non-trivial control flow

You can run several tasks iteratively, and then run some other tasks non-iteratively once all the iterative tasks have finished.

For example, if you want the waterfall plot of two days of averaged data, you can iteratively run several tasks, one iteration per day of data, and then combine (accumulate and average) the two days of data and plot its waterfall, as shown in plot_wf_nontrivial.pipe:

# -*- mode: python; -*-

# input file for the analysis pipeline
# execute this pipeline by either command of the following two:
# tlpipe dir/to/plot_wf_nontrivial.pipe
# mpiexec -n N tlpipe dir/to/plot_wf_nontrivial.pipe


pipe_tasks = []
pipe_outdir = './output/'
pipe_logging = 'notset'
# pipe_logging = 'info'


import glob
data_dir = 'dir/to/data' # your data directory
files = sorted(glob.glob(data_dir+'/*.hdf5')) # at least 2 days data files as a list


# data selection
from tlpipe.timestream import dispatch
pipe_tasks.append(dispatch.Dispatch)
### parameters for Dispatch
dp_input_files = files # data files as list
dp_freq_select = (500, 510) # frequency indices, from 500 to 510
dp_feed_select = [1, 2, 32, 33] # feed no. as a list
dp_iterable = True
dp_iter_num = 2 # set the number of iterations
dp_tag_input_iter = False
dp_out = 'dp'

# find and mask noise source signal
from tlpipe.timestream import detect_ns
pipe_tasks.append(detect_ns.Detect)
### parameters for Detect
dt_in = dp_out
# dt_feed = 1
dt_iterable = True
dt_out = 'dt'

# plot waterfall of selected data
from tlpipe.plot import plot_waterfall
pipe_tasks.append(plot_waterfall.Plot)
### parameters for Plot
pwf_in = dt_out
pwf_iterable = True
pwf_flag_ns = True # mask noise source signal
pwf_fig_name = 'waterfall/wf' # figure name to save
pwf_out = 'pwf'

# convert raw timestream to timestream
from tlpipe.timestream import rt2ts
pipe_tasks.append(rt2ts.Rt2ts)
### parameters for Rt2ts
r2t_in = dt_out # can also be pwf_out as it is the same
r2t_iterable = True
r2t_out = 'r2t'

# re-order the data to have RA from 0 to 2pi
from tlpipe.timestream import re_order
pipe_tasks.append(re_order.ReOrder)
### parameters for ReOrder
ro_in = r2t_out
ro_iterable = True
ro_out = 'ro'

# accumulate the re-ordered data from different days
from tlpipe.timestream import accumulate
pipe_tasks.append(accumulate.Accum)
### parameters for Accum
ac_in = ro_out
ac_iterable = True
ac_out = 'ac'

# barrier above iterative tasks before executing the following tasks.
from tlpipe.timestream import barrier
pipe_tasks.append(barrier.Barrier)
### parameters for Barrier

# average the accumulated data
from tlpipe.timestream import average
pipe_tasks.append(average.Average)
### parameters for Average
av_in = ac_out
av_output_files = [ 'average/file_%d.hdf5' %i for i in range(1, 7) ] # here save intermediate results
av_out = 'av'

# waterfall plot of the averaged data
from tlpipe.plot import plot_waterfall
pipe_tasks.append((plot_waterfall.Plot, 'pwf1_')) # here use a new prefix pwf1_ instead of the default pwf_ to discriminate from the previous plot_waterfall
### parameters for Plot
pwf1_in = av_out
pwf1_input_files = av_output_files # here you can read data from the saved intermediate data files if you do not set pwf1_in
pwf1_flag_ns = True
pwf1_fig_name = 'vis_av/vis'
pwf1_out = 'pwf1'

Note

Notice the use of the task Barrier to block the control flow before its subsequent tasks are executed. Since the task Barrier does not get its input from any other task, the pipeline restarts from the beginning every time it comes to execute Barrier. Once everything before Barrier has been executed, it unblocks its subsequent tasks and allows them to proceed normally.
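For clarity, here is an abridged sketch of the task chain above (only the pipe_tasks entries, with comments annotating how the pipeline treats each stage; the rt2ts, re_order and first Plot tasks are omitted for brevity):

# iterative stages: executed once per iteration (per day / per group of files)
pipe_tasks.append(dispatch.Dispatch)      # dp_iterable = True, dp_iter_num = 2
pipe_tasks.append(detect_ns.Detect)       # dt_iterable = True
pipe_tasks.append(accumulate.Accum)       # ac_iterable = True

# Barrier: the pipeline restarts from the top until all iterations above are done
pipe_tasks.append(barrier.Barrier)

# non-iterative stages: executed once, after the barrier
pipe_tasks.append(average.Average)
pipe_tasks.append((plot_waterfall.Plot, 'pwf1_'))  # second Plot, with a new prefix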

Note

Note that in real data analysis, the data should be RFI flagged, calibrated, and possibly processed in other ways before the accumulating and averaging; here, for simplicity and ease of understanding, we have omitted all those processes. One can refer to the real data analysis pipeline input files in the package’s input directory.

Execute the same task several times

Special care needs to be taken when executing the same task several times. Since the input pipe file is just a plain Python script, it is executed before the parameter-parsing process, and during this execution an assignment to a variable overrides any earlier assignment to a variable of the same name. So to execute the same task several times, a different prefix should be set for each instance of the task (i.e., except for the first occurrence, which can just use the task’s default prefix, all the others need to set a different prefix). To do this, append a 2-tuple to the list pipe_tasks, with its first element being the imported task and its second element being the new prefix to use. See for example the line

pipe_tasks.append((plot_waterfall.Plot, 'pwf1_')) # here use a new prefix pwf1_ instead of the default pwf_ to discriminate from the previous plot_waterfall

in plot_wf_nontrivial.pipe in the above example.

Save intermediate data

To save the data that has been processed by a task (for break-point recovery, etc.), just set the output_files parameter of that task to a list of file names (data can only be saved as hdf5 files); the data will then be split into almost equal chunks along the time axis and each chunk saved to one of the files. For example, see the line

av_output_files = [ 'average/file_%d.hdf5' %i for i in range(1, 7) ] # here save intermediate results

in plot_wf_nontrivial.pipe in the above example.

Recovery from intermediate data

You can recover the pipeline from a break point (where you have saved the intermediate data) by reading data from the files you have saved. To do this, instead of setting the in parameter, set the input_files parameter to a list whose elements are the saved data files. For example, see the line

pwf1_input_files = av_output_files # here you can read data from the saved intermediate data files if you do not set pwf1_in

in plot_wf_nontrivial.pipe in the above example.

Note

If both the in parameter and the input_files parameter are set, the task will get its input from the in parameter instead of reading data from input_files, as reading the data from files is much slower. So in order to recover from the break point, you should not set the in parameter, or should set in to None, which is its default value.
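For example, a minimal sketch of the relevant settings for the second Plot task in plot_wf_nontrivial.pipe when recovering from the saved intermediate files (the file list matches the one set by av_output_files above):

# recover from saved intermediate data: leave 'in' unset (it defaults to None)
# pwf1_in = av_out   # do NOT set this, otherwise input_files would be ignored
pwf1_input_files = [ 'average/file_%d.hdf5' %i for i in range(1, 7) ] # read the saved files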

Run the pipeline

Single-process run

If you do not have an MPI environment installed, or you just want a single-process run, just do (in case plot_wf.pipe is in your working directory)

$ tlpipe plot_wf.pipe

or (in case plot_wf.pipe isn’t in your working directory)

$ tlpipe dir/to/plot_wf.pipe

If you want to submit and run the pipeline in the background, do something like

$ nohup tlpipe dir/to/plot_wf.pipe &> output.txt &

Multiple-process run

To run the pipeline in a parallel and distributed manner on a cluster using multiple processes, you can do something like (in case plot_wf.pipe is in your working directory)

$ mpiexec -n N tlpipe plot_wf.pipe

or (in case plot_wf.pipe isn’t in your working directory)

$ mpiexec -n N tlpipe dir/to/plot_wf.pipe

If you want to submit and run the pipeline in the background on several nodes, for example, node2, node3 and node4, do something like

$ nohup mpiexec -n N -host node2,node3,node4 --map-by node tlpipe dir/to/plot_wf.pipe &> output.txt &

Note

In the above commands, N is the number of processes you want to run!

Pipeline products and intermediate results

Pipeline products and intermediate results will be in the directory set by pipe_outdir.

Other executable commands

  • h5info: Check what’s in an HDF5 data file (or a list of them). To use it, do something like

    $ h5info data.hdf5
    

    or

    $ h5info data1.hdf5, data2.hdf5, data3.hdf5