Developer’s guide

Write a general task

A pipeline task is a subclass of TaskBase intended to perform a small, modular piece of analysis.

To write a general task, you can use the following template general_task.py:

"""A general task template."""

from tlpipe.pipeline.pipeline import TaskBase, PipelineStopIteration


class GeneralTask(TaskBase):
    """A general task template."""

    # input parameters and their default values as a dictionary
    params_init = {
                    'task_param': 'param_val',
                  }

    # prefix of this task
    prefix = 'gt_'

    def __init__(self, parameter_file_or_dict=None, feedback=2):

        # Read in the parameters.
        super(self.__class__, self).__init__(parameter_file_or_dict, feedback)

        # Do some initialization here if necessary
        print 'Initialize the task.'

    def setup(self):
        # Set up works here if necessary
        print "Setting up the task."

    def next(self):
        # Doing the actual work here
        print 'Executing the task with paramter task_param = %s' % self.params['task_param']
        # stop the task
        raise PipelineStopIteration()

    def finish(self):
        # Finishing works here if necessary
        print "Finished the task."

The developer of a task must specify the input parameters the task expects (if any) and a prefix, as well as the code that performs the actual processing.

Input parameters are specified by adding a class attribute params_init, a dictionary whose entries are key and default-value pairs. The prefix is used to identify and read the corresponding parameters for this task from the input pipe file.
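As an illustration of this convention, the following sketch (an assumption about the mechanism, not tlpipe's actual implementation) shows how a task's prefix could select its parameters from the flat pipe-file namespace and merge them over the params_init defaults:

```python
def read_prefixed_params(pipe_vars, prefix, params_init):
    """Return params_init defaults overridden by pipe-file entries
    whose names start with `prefix` (with the prefix stripped)."""
    params = dict(params_init)
    for name, value in pipe_vars.items():
        if name.startswith(prefix):
            key = name[len(prefix):]
            if key in params:
                params[key] = value
    return params

# a pipe file like general_task.pipe defines gt_task_param = 'new_val'
pipe_vars = {'gt_task_param': 'new_val', 'pipe_outdir': './output/'}
params = read_prefixed_params(pipe_vars, 'gt_', {'task_param': 'param_val'})
print(params['task_param'])  # -> new_val
```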

To perform the actual processing, do any necessary initialization in __init__(), then implement the three methods setup(), next() and finish(). Usually the only method you must implement is next(), where the actual processing is done; __init__(), setup() and finish() can be omitted if there is no special initialization, set-up, or finishing work to do. These methods are executed in order, with next() possibly being executed many times. Iteration of next() is halted by raising PipelineStopIteration.
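The life cycle above can be pictured with a minimal, self-contained sketch (this mocks the pipeline manager; it is not tlpipe's actual manager code):

```python
class PipelineStopIteration(Exception):
    """Raised by next() to stop iteration of a task."""

class ToyTask(object):
    def __init__(self):
        self.calls = []
    def setup(self):
        self.calls.append('setup')
    def next(self):
        if self.calls.count('next') >= 2:  # pretend the data is exhausted
            raise PipelineStopIteration()
        self.calls.append('next')
    def finish(self):
        self.calls.append('finish')

def run_task(task):
    """Drive a task: setup() once, next() until it stops, then finish()."""
    task.setup()
    while True:
        try:
            task.next()
        except PipelineStopIteration:
            break
    task.finish()

task = ToyTask()
run_task(task)
print(task.calls)  # -> ['setup', 'next', 'next', 'finish']
```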

To make it work, put it somewhere like tlpipe/tlpipe/timestream/ and write an input pipe file like general_task.pipe:

# -*- mode: python; -*-

# input file for pipeline manager
# execute this pipeline with either of the following two commands:
# tlpipe dir/to/general_task.pipe
# mpiexec -n N tlpipe dir/to/general_task.pipe


pipe_tasks = []
pipe_outdir = './output/'


from tlpipe.timestream import general_task
pipe_tasks.append(general_task.GeneralTask)
### parameters for GeneralTask
gt_task_param = 'new_val'

then execute the task by running

$ tlpipe general_task.pipe

Write a task to process timestream data

To write a task to process the timestream data (i.e., the visibility and auxiliary data), you can use the following template ts_template.py:

"""Timestream task template."""

import timestream_task


class TsTemplate(timestream_task.TimestreamTask):
    """Timestream task template."""

    params_init = {
                    'task_param': 'param_val',
                  }

    prefix = 'tt_'

    def process(self, ts):

        print 'Executing the task with parameter task_param = %s' % self.params['task_param']
        print
        print 'Timestream data is contained in %s' % ts

        return super(TsTemplate, self).process(ts)

Here, instead of inheriting from TaskBase, we inherit from its subclass TimestreamTask and implement the method process() (and maybe also __init__(), setup() and finish() if necessary). The timestream data is contained in the argument ts, which may be an instance of RawTimestream or Timestream.

Note

You do not need to override the method next() now, because in the class OneAndOne, which is the super class of TimestreamTask, we have

class OneAndOne(TaskBase):

    def next(self, input=None):
        # ...
        output = self.read_process_write(input)
        # ...
        return output

    def read_process_write(self, input):
        # ...
        output = self.process(input)
        # ...
        return output
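The pattern above can be exercised with a self-contained mock, showing that implementing process() alone is enough for next() to work (class names mirror tlpipe's, but this is an illustrative sketch, not the library's code):

```python
class PipelineStopIteration(Exception):
    pass

class OneAndOne(object):
    def next(self, input=None):
        # in this sketch, no input means nothing left to do
        if input is None:
            raise PipelineStopIteration()
        return self.read_process_write(input)

    def read_process_write(self, input):
        # reading and writing are omitted in this sketch
        return self.process(input)

class Doubler(OneAndOne):
    def process(self, ts):
        return ts * 2  # stand-in for real timestream processing

print(Doubler().next(21))  # -> 42
```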

Use data operate functions in timestream tasks

To write a task to process timestream data, in most cases you only need to implement process(), with the input timestream data contained in its argument ts, as stated above. To help with the data processing, you can use the data operate functions defined in the corresponding timestream data container class, which automatically split the data along one or more axes among multiple processes and iteratively process all the resulting data slices. For example, to write a task that processes the raw timestream data along the baseline axis, i.e., that processes one time-frequency slice of the raw data at a time, you can write a task like ts_task.py:

"""A task to process the raw timestream data along the axis of baseline."""

import timestream_task


class TsTask(timestream_task.TimestreamTask):
    """A task to process the raw timestream data along the axis of baseline."""

    params_init = {
                  }

    prefix = 'tt_'

    def process(self, ts):

        # distribute data along the axis of baseline
        ts.redistribute('baseline')

        # use data operate function of `ts`
        ts.bl_data_operate(self.func)

        return super(TsTask, self).process(ts)

    def func(self, vis, vis_mask, li, gi, bl, ts, **kwargs):
        """Function that does the actual task."""

        # `vis` is the time-frequency slice of the visibility
        print vis.shape
        # `vis_mask` is the time-frequency slice of the visibility mask
        print vis_mask.shape
        # `li`, `gi` is the local and global index of this slice
        # `bl` is the corresponding baseline
        print li, gi, bl
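Conceptually, bl_data_operate iterates over the baseline axis and hands each time-frequency slice to func. The following sketch illustrates that behavior under assumed conventions (vis of shape (ntime, nfreq, nbl); in a single-process run the local index li equals the global index gi); it is not tlpipe's implementation:

```python
import numpy as np

def bl_data_operate(vis, vis_mask, func, **kwargs):
    """Call func on each time-frequency slice along the baseline axis."""
    nbl = vis.shape[-1]
    for bi in range(nbl):
        # func(vis, vis_mask, li, gi, bl, **kwargs); `('bl', bi)` is a
        # stand-in for the real baseline (a feed pair in tlpipe)
        func(vis[..., bi], vis_mask[..., bi], bi, bi, ('bl', bi), **kwargs)

shapes = []
vis = np.zeros((4, 3, 2), dtype=complex)   # (ntime, nfreq, nbl)
mask = np.zeros(vis.shape, dtype=bool)
bl_data_operate(vis, mask, lambda v, m, li, gi, bl: shapes.append(v.shape))
print(shapes)  # -> [(4, 3), (4, 3)]
```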

To execute the task, put it somewhere like tlpipe/tlpipe/timestream/ and write an input pipe file like ts_task.pipe:

# -*- mode: python; -*-

# input file for pipeline manager
# execute this pipeline with either of the following two commands:
# tlpipe dir/to/ts_task.pipe
# mpiexec -n N tlpipe dir/to/ts_task.pipe


pipe_tasks = []
pipe_outdir = './output/'
pipe_logging = 'notset'
# pipe_logging = 'info'
pipe_timing = True
pipe_flush = True


import glob
data_dir = 'dir/to/data' # your data directory
files = sorted(glob.glob(data_dir+'/*.hdf5')) # all data files as a list


# data selection
from tlpipe.timestream import dispatch
pipe_tasks.append(dispatch.Dispatch)
### parameters for Dispatch
dp_input_files = files # data files as list
dp_freq_select = (500, 510) # frequency indices, from 500 to 510
dp_feed_select = [1, 2, 32, 33] # feed no. as a list
dp_out = 'dp'

from tlpipe.timestream import ts_task
pipe_tasks.append(ts_task.TsTask)
### parameters for TsTask
tt_in = dp_out
tt_out = 'tt'

then execute the task by running

$ tlpipe ts_task.pipe

Here are some of the data operate functions you can use:

Data operate functions of RawTimestream and Timestream:

class tlpipe.container.timestream_common.TimestreamCommon
data_operate(func, op_axis=None, axis_vals=0, full_data=False, copy_data=False, keep_dist_axis=False, **kwargs)
all_data_operate(func, copy_data=False, **kwargs)
time_data_operate(func, full_data=False, copy_data=False, keep_dist_axis=False, **kwargs)
freq_data_operate(func, full_data=False, copy_data=False, keep_dist_axis=False, **kwargs)
bl_data_operate(func, full_data=False, copy_data=False, keep_dist_axis=False, **kwargs)
time_and_freq_data_operate(func, full_data=False, copy_data=False, keep_dist_axis=False, **kwargs)
time_and_bl_data_operate(func, full_data=False, copy_data=False, keep_dist_axis=False, **kwargs)
freq_and_bl_data_operate(func, full_data=False, copy_data=False, keep_dist_axis=False, **kwargs)

Additional data operate functions of Timestream:

class tlpipe.container.timestream.Timestream
pol_data_operate(func, full_data=False, copy_data=False, keep_dist_axis=False, **kwargs)
time_and_pol_data_operate(func, full_data=False, copy_data=False, keep_dist_axis=False, **kwargs)
freq_and_pol_data_operate(func, full_data=False, copy_data=False, keep_dist_axis=False, **kwargs)
pol_and_bl_data_operate(func, full_data=False, copy_data=False, keep_dist_axis=False, **kwargs)
time_freq_and_pol_data_operate(func, full_data=False, copy_data=False, keep_dist_axis=False, **kwargs)
time_freq_and_bl_data_operate(func, full_data=False, copy_data=False, keep_dist_axis=False, **kwargs)
time_pol_and_bl_data_operate(func, full_data=False, copy_data=False, keep_dist_axis=False, **kwargs)
freq_pol_and_bl_data_operate(func, full_data=False, copy_data=False, keep_dist_axis=False, **kwargs)
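All of these methods follow the same generic pattern as data_operate: split the data along one or more named axes and call func on each remaining slice. The sketch below illustrates that pattern for a single process; the axis names and order of the Timestream visibility array are assumptions here, and the real methods additionally handle MPI distribution, full_data, copy_data and keep_dist_axis:

```python
import numpy as np

AXES = ('time', 'freq', 'pol', 'bl')  # assumed axis order of the vis array

def data_operate(vis, func, op_axis, **kwargs):
    """Call func on each slice of vis taken along the axis (or axes)
    named in op_axis."""
    axes = (op_axis,) if isinstance(op_axis, str) else tuple(op_axis)
    axis_ids = tuple(AXES.index(a) for a in axes)
    # iterate over all index combinations of the operated axes
    op_shape = tuple(vis.shape[i] for i in axis_ids)
    for idx in np.ndindex(op_shape):
        sl = [slice(None)] * vis.ndim
        for ax, i in zip(axis_ids, idx):
            sl[ax] = i
        func(vis[tuple(sl)], idx, **kwargs)

vis = np.zeros((5, 4, 2, 3))  # (time, freq, pol, bl)
count = []
data_operate(vis, lambda v, idx: count.append(v.shape), ('pol', 'bl'))
print(len(count), count[0])  # -> 6 (5, 4)
```

In this picture, pol_and_bl_data_operate corresponds to op_axis = ('pol', 'bl'): it iterates over every polarization-baseline pair and hands func the corresponding time-frequency slice.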