Developer’s guide

Write a general task
A pipeline task is a subclass of TaskBase intended to perform some small, modular piece of analysis. To write a general task, you can use the following template, general_task.py:
```python
"""A general task template."""

from tlpipe.pipeline.pipeline import TaskBase, PipelineStopIteration


class GeneralTask(TaskBase):
    """A general task template."""

    # input parameters and their default values as a dictionary
    params_init = {
        'task_param': 'param_val',
    }

    # prefix of this task
    prefix = 'gt_'

    def __init__(self, parameter_file_or_dict=None, feedback=2):
        # Read in the parameters.
        super(GeneralTask, self).__init__(parameter_file_or_dict, feedback)

        # Do some initialization here if necessary
        print('Initialize the task.')

    def setup(self):
        # Set-up work goes here if necessary
        print('Setting up the task.')

    def next(self):
        # Do the actual work here
        print('Executing the task with parameter task_param = %s' % self.params['task_param'])

        # stop the task
        raise PipelineStopIteration()

    def finish(self):
        # Finishing work goes here if necessary
        print('Finished the task.')
```
The developer of the task must specify the input parameters the task expects (if any) and a prefix, as well as the code that performs the actual processing for the task.

Input parameters are specified by adding a class attribute params_init, a dictionary whose entries are key and default-value pairs. The prefix is used to identify and read the corresponding parameters for this task from the input pipe file.
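For illustration, the way a task's prefix and params_init combine can be sketched as follows. This is a simplified illustration under assumed semantics, not the actual TaskBase implementation:

```python
# Simplified sketch of how a task's parameters might be resolved from a
# pipe file: start from the declared defaults, then override any entry
# that appears in the pipe file with this task's prefix.
# Illustration only -- not the real TaskBase parameter-reading logic.

def resolve_params(params_init, prefix, pipe_file_vars):
    params = dict(params_init)
    for name, value in pipe_file_vars.items():
        if name.startswith(prefix):
            key = name[len(prefix):]  # strip the task prefix
            if key in params:
                params[key] = value
    return params

# defaults declared in the task
params_init = {'task_param': 'param_val'}
# variables read from the pipe file
pipe_file_vars = {'gt_task_param': 'new_val', 'pipe_outdir': './output/'}

print(resolve_params(params_init, 'gt_', pipe_file_vars))
# → {'task_param': 'new_val'}
```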
To perform the actual processing for the task, you first do any necessary initialization in __init__(), then implement the three methods setup(), next() and finish(). Usually the only method that must be implemented is next(), in which the actual processing is done; __init__(), setup() and finish() can be omitted if there is no specific initialization, set-up, or finishing work to do. These methods are executed in order, with next() possibly being executed many times. Iteration of next() is halted by raising a PipelineStopIteration.
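The life cycle described above can be sketched with a minimal, self-contained driver. This is an illustration only; the real pipeline manager lives in tlpipe.pipeline.pipeline:

```python
# Minimal sketch of how a pipeline manager might drive a task's life
# cycle: setup() once, next() repeatedly until PipelineStopIteration,
# then finish() once. Illustration only, not the real manager.

class PipelineStopIteration(Exception):
    """Raised by next() to signal that the task has no more work."""

def run_task(task):
    task.setup()
    try:
        while True:
            task.next()   # may execute many times
    except PipelineStopIteration:
        pass
    task.finish()

class Demo(object):
    def __init__(self):
        self.calls = 0
    def setup(self):
        print('setup')
    def next(self):
        if self.calls >= 2:
            raise PipelineStopIteration()
        self.calls += 1
        print('next %d' % self.calls)
    def finish(self):
        print('finish')

run_task(Demo())  # prints: setup, next 1, next 2, finish
```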
To make it work, you could put it somewhere like tlpipe/tlpipe/timestream/, and write an input pipe file like general_task.pipe:
```python
# -*- mode: python; -*-

# input file for pipeline manager
# execute this pipeline by either of the following two commands:
#     tlpipe dir/to/general_task.pipe
#     mpiexec -n N tlpipe dir/to/general_task.pipe

pipe_tasks = []
pipe_outdir = './output/'

from tlpipe.timestream import general_task
pipe_tasks.append(general_task.GeneralTask)

### parameters for GeneralTask
gt_task_param = 'new_val'
```
then execute the task by running

$ tlpipe general_task.pipe
Write a task to process timestream data
To write a task to process the timestream data (i.e., the visibility and auxiliary data), you can use the following template, ts_template.py:
```python
"""Timestream task template."""

import timestream_task


class TsTemplate(timestream_task.TimestreamTask):
    """Timestream task template."""

    params_init = {
        'task_param': 'param_val',
    }

    prefix = 'tt_'

    def process(self, ts):
        print('Executing the task with parameter task_param = %s' % self.params['task_param'])
        print()
        print('Timestream data is contained in %s' % ts)

        return super(TsTemplate, self).process(ts)
```
Here, instead of inheriting from TaskBase, we inherit from its subclass TimestreamTask, and implement the method process() (and maybe also __init__(), setup(), and finish() if necessary). The timestream data is contained in the argument ts, which may be an instance of RawTimestream or Timestream.
Note

You do not need to override the method next() now, because in the class OneAndOne, which is the superclass of TimestreamTask, we have

```python
class OneAndOne(TaskBase):

    def next(self, input=None):
        # ...
        output = self.read_process_write(input)
        # ...
        return output

    def read_process_write(self, input):
        # ...
        output = self.process(input)
        # ...
        return output
```
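The delegation chain in this note, next() calling read_process_write() calling process(), can be made concrete with a runnable toy example. This is a simplified sketch; the real OneAndOne also handles reading input from and writing output to disk:

```python
# Toy sketch of the delegation chain next() -> read_process_write()
# -> process(). Simplified for illustration; not the real OneAndOne.

class OneAndOneSketch(object):
    def next(self, input=None):
        return self.read_process_write(input)

    def read_process_write(self, input):
        return self.process(input)

class MyTask(OneAndOneSketch):
    def process(self, ts):
        # only process() needs to be implemented by a subclass
        return ts * 2

print(MyTask().next(21))  # → 42
```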
Use data operate functions in timestream tasks
To write a task to process the timestream data, you (in most cases) only need to implement process() with the input timestream data contained in its argument ts, as stated above. To help with the data processing, you can use the data operate functions defined in the corresponding timestream data container class, which automatically split the data along one or more axes among multiple processes and iteratively process all the resulting data slices. For example, to write a task that processes the raw timestream data along the axis of baseline, i.e., that processes one time-frequency slice of the raw data at a time, you can write the task like ts_task.py:
```python
"""A task to process the raw timestream data along the axis of baseline."""

import timestream_task


class TsTask(timestream_task.TimestreamTask):
    """A task to process the raw timestream data along the axis of baseline."""

    params_init = {
    }

    prefix = 'tt_'

    def process(self, ts):
        # distribute data along the axis of baseline
        ts.redistribute('baseline')

        # use data operate function of `ts`
        ts.bl_data_operate(self.func)

        return super(TsTask, self).process(ts)

    def func(self, vis, vis_mask, li, gi, bl, ts, **kwargs):
        """Function that does the actual task."""
        # `vis` is the time-frequency slice of the visibility
        print(vis.shape)
        # `vis_mask` is the time-frequency slice of the visibility mask
        print(vis_mask.shape)
        # `li`, `gi` are the local and global index of this slice
        # `bl` is the corresponding baseline
        print(li, gi, bl)
```
To execute the task, put it somewhere like tlpipe/tlpipe/timestream/, and write an input pipe file like ts_task.pipe:
```python
# -*- mode: python; -*-

# input file for pipeline manager
# execute this pipeline by either of the following two commands:
#     tlpipe dir/to/ts_task.pipe
#     mpiexec -n N tlpipe dir/to/ts_task.pipe

pipe_tasks = []
pipe_outdir = './output/'
pipe_logging = 'notset'
# pipe_logging = 'info'
pipe_timing = True
pipe_flush = True

import glob
data_dir = 'dir/to/data'  # your data directory
files = sorted(glob.glob(data_dir+'/*.hdf5'))  # all data files as a list

# data selection
from tlpipe.timestream import dispatch
pipe_tasks.append(dispatch.Dispatch)
### parameters for Dispatch
dp_input_files = files  # data files as list
dp_freq_select = (500, 510)  # frequency indices, from 500 to 510
dp_feed_select = [1, 2, 32, 33]  # feed no. as a list
dp_out = 'dp'

from tlpipe.timestream import ts_task
pipe_tasks.append(ts_task.TsTask)
### parameters for TsTask
tt_in = dp_out
tt_out = 'tt'
```
then execute the task by running

$ tlpipe ts_task.pipe
These are some data operate functions that you can use.

Data operate functions of RawTimestream and Timestream, defined in class tlpipe.container.timestream_common.TimestreamCommon:

- data_operate(func, op_axis=None, axis_vals=0, full_data=False, copy_data=False, keep_dist_axis=False, **kwargs)
- all_data_operate(func, copy_data=False, **kwargs)
- time_data_operate(func, full_data=False, copy_data=False, keep_dist_axis=False, **kwargs)
- freq_data_operate(func, full_data=False, copy_data=False, keep_dist_axis=False, **kwargs)
- bl_data_operate(func, full_data=False, copy_data=False, keep_dist_axis=False, **kwargs)
- time_and_freq_data_operate(func, full_data=False, copy_data=False, keep_dist_axis=False, **kwargs)
- time_and_bl_data_operate(func, full_data=False, copy_data=False, keep_dist_axis=False, **kwargs)
- freq_and_bl_data_operate(func, full_data=False, copy_data=False, keep_dist_axis=False, **kwargs)
Additional data operate functions of Timestream, defined in class tlpipe.container.timestream.Timestream:

- pol_data_operate(func, full_data=False, copy_data=False, keep_dist_axis=False, **kwargs)
- time_and_pol_data_operate(func, full_data=False, copy_data=False, keep_dist_axis=False, **kwargs)
- freq_and_pol_data_operate(func, full_data=False, copy_data=False, keep_dist_axis=False, **kwargs)
- pol_and_bl_data_operate(func, full_data=False, copy_data=False, keep_dist_axis=False, **kwargs)
- time_freq_and_pol_data_operate(func, full_data=False, copy_data=False, keep_dist_axis=False, **kwargs)
- time_freq_and_bl_data_operate(func, full_data=False, copy_data=False, keep_dist_axis=False, **kwargs)
- time_pol_and_bl_data_operate(func, full_data=False, copy_data=False, keep_dist_axis=False, **kwargs)
- freq_pol_and_bl_data_operate(func, full_data=False, copy_data=False, keep_dist_axis=False, **kwargs)
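As a rough illustration of how these functions split the data, here is a simplified, single-process sketch of a bl_data_operate-style iteration: the visibility array is sliced along its last (baseline) axis and a user function is called on each time-frequency slice. The real implementations distribute the slices over MPI processes and pass additional arguments such as the baseline and the container itself:

```python
# Simplified single-process sketch of iterating a (time, freq, baseline)
# visibility array along the baseline axis, in the spirit of
# TimestreamCommon.bl_data_operate. Illustration only.

import numpy as np

def bl_data_operate_sketch(vis, vis_mask, func, **kwargs):
    """Call `func(vis_slice, mask_slice, li, gi, **kwargs)` for each
    time-frequency slice along the last (baseline) axis."""
    nbl = vis.shape[-1]
    for li in range(nbl):
        gi = li  # with one process, local and global indices coincide
        func(vis[..., li], vis_mask[..., li], li, gi, **kwargs)

vis = np.zeros((4, 3, 2), dtype=complex)   # (time, freq, baseline)
mask = np.zeros(vis.shape, dtype=bool)

shapes = []
bl_data_operate_sketch(vis, mask, lambda v, m, li, gi: shapes.append(v.shape))
print(shapes)  # → [(4, 3), (4, 3)]
```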