Skip to content

Writing a Dac-Man plug-in from scratch

This example illustrates how to create a Dac-Man plug-in from scratch to analyze changes in files of arbitrary type and structure.

Example

A ready-to-use example script can be found under examples/scripts/matrix_change_ana.py.

Overview

In this example, we will develop a plug-in to detect and quantify changes in datasets composed of specialized file types. In this case, the files contain one 2D matrix with numeric (float) values, saved as text.

Our goal is to compare two datasets, stored as directories dataset-v1 and dataset-v2, to obtain the following information:

  • The number of files added, deleted, and modified between the two datasets
  • For modified files, calculate a custom change metric quantifying. Furthermore, if possible, the amount of changes between the two matrices stored in the two files will also be calculated.

To achieve our goals, we can rely on Dac-Man's API to provide the functionality needed to perform directory-level comparison between the datasets. The remaining functionality requires implementation that can be categorized as follows:

  • A simple custom analysis script using Dac-Man's API that will contain driver methods and the following 2 classes:
    • A specialized plug-in class: To be used by Dac-Man to compare files that have been detected as modified
    • A specialized Adaptor class: This converts data from the specific file format to a Dac-Man Record

Creating a custom analysis script

The first step of creating a custom analysis script involves the creation of a new Python file, e.g. /home/user/matrix_change_ana.py.

In this file, the first line should contain the #!... line, which is needed to make the file an executable. Next, we add the import statements for the modules that we will use in our analysis script:

#!/usr/bin/env python3
import sys

import dacman
from dacman.compare import base
from dacman.compare.adaptor import DacmanRecord

import numpy
import numpy.linalg

Dac-Man change analysis script needs to accept two arguments, with the arguments being the paths to the files or directories for comparison. These arguments are specified via the command-line, and passed on to the run_matrix_change_ana() method:

#!/usr/bin/env python3
import sys

import dacman
from dacman.compare import base
from dacman.compare.adaptor import DacmanRecord

import numpy
import numpy.linalg


def run_matrix_change_ana(*args):
    ...


if __name__ == '__main__':
    cli_args = sys.argv[1:]
    path_a, path_b = cli_args[0], cli_args[1]
    run_matrix_change_ana(path_a, path_b)

The same arguments are also consumed by the run_matrix_change_ana() method, which uses Dac-Man's API to perform the comparison between the input sources:

#!/usr/bin/env python3
import sys

import dacman
from dacman.compare import base
from dacman.compare.adaptor import DacmanRecord

import numpy
import numpy.linalg


... # custom plug-in code goes here


def run_matrix_change_ana(path_a, path_b):
    comparisons = [(path_a, path_b)]
    differ = dacman.DataDiffer(comparisons, dacman.Executor.DEFAULT)
    differ.use_plugin(...)
    differ.start()


if __name__ == '__main__':
    cli_args = sys.argv[1:]
    path_a, path_b = cli_args[0], cli_args[1]
    run_matrix_change_ana(path_a, path_b)

The use_plugin() method of the DataDiffer object allows the user to specify a specialized plug-in to perform the comparison.

In the next section, we will build a custom Dac-Man plug-in for performing our custom change analysis.

Creating a specialized plug-in

At the core of a Dac-Man plug-in is the Comparator class. This class manages all the steps needed for comparing a single pair of files (one from each dataset).

In our file matrix_change_ana.py, we add the code for our custom plug-in MatrixTxtPlugin above the run_matrix_change_analysis() method.

Dac-Man plug-ins should inherit from the dacman.compare.base.Comparator abstract base class, and implement the description(), supports(), and compare() methods (described in this section), and also the percent_change() and stats() methods (described later this document).

The compare() method takes in at least two arguments, with the mandatory arguments being the paths of the files to be compared. The optional argument *args can be used for customization according to specific needs.

#!/usr/bin/env python3
import sys

import dacman
from dacman.compare import base
from dacman.compare.adaptor import DacmanRecord

import numpy
import numpy.linalg


class MatrixTxtPlugin(base.Comparator):

    @staticmethod
    def description():
        return "A Dac-Man plug-in to compare matrices saved as text files"

    @staticmethod
    def supports():
        return ['txt']

    def compare(self, path_a, path_b, *args):
        pass


# rest of the script
def run_matrix_change_ana(path_a, path_b):
    ...

Creating Dacman Records from sources

The first step to implement the compare() method is to create Dac-Man records of the input sources. Since we will also be using the same step to create a record for both files, we put the creation of a Dac-Man record in a separate helper method, get_record():

from dacman.compare.adaptor import DacmanRecord


class MatrixTxtPlugin(base.Comparator):

    def get_record(self, path):
        return DacmanRecord(path)

    def compare(self, path_a, path_b, *args):
        rec_a = self.get_record(path_a)
        rec_b = self.get_record(path_b)

At this point, the DacmanRecord objects are not yet capable of supporting our specific file format. This leads us to create a specialized Adaptor class, which we discussed in the next section.

Creating an Adaptor for a custom file format

The purpose of an Adaptor is to convert an arbitrary data source to a Dac-Man Record, which in turn exposes a common interface allowing the data to be compared in a structured way.

We begin by creating an Adaptor class for the format used by our datasets. This specialized Adaptor class will inherit from the dacman.compare.base.DacmanRecordAdaptor abstract base class, and implement the transform() method.

The transform() method takes a single argument, the path of the data file, and returns a tuple of 2 values. Each value is a sequence storing the content of the headers and the data of the file respectively.

from dacman.compare import base


class MatrixTxtAdaptor(base.DacmanRecordAdaptor):

    def transform(self, data_path):
        headers = []
        data = []
        return headers, data

Finally, we use the numpy.loadtxt() method to load the data into a numpy array:

from dacman.compare import base
import numpy


class MatrixTxtAdaptor(base.DacmanRecordAdaptor):

    def transform(self, data_path):
        headers = []
        data = numpy.loadtxt(data_path)

        return headers, data

Enabling support for custom file formats in Dac-Man Records

Now that our specialized MatrixTxtAdaptor class is complete, we use it to enable support for our custom file format in our plug-in as Dac-Man Records.

In the get_record() method of our MatrixTxtPlugin class, we pass all necessary information about the custom file format to the DacmanRecord object. For example, the Adaptor to use and the file extensions to be supported.

This will allow the DacmanRecord object to transform the source correctly.

class MatrixTxtAdaptor(base.DacmanRecordAdaptor):

    def transform(self, data_path):
        headers = []
        data = numpy.loadtxt(data_path)

        return headers, data


class MatrixTxtPlugin(base.Comparator):

    def get_record(self, path):
        ext = 'txt'

        rec = DacmanRecord(path)

        rec.file_support = {ext: True}
        rec.lib_support = {ext: 'numpy'}
        rec.file_adaptors = {ext: MatrixTxtAdaptor}

        rec._transform_source()

        return rec
    ...

Implementing specialized change metrics calculations for matrices

With the specialized MatrixTxtAdaptor, the matrix data contained in each source file, a 2D numpy array, can be accessed through the .data attribute of each DacmanRecord object. This transformation allows us to leverage available tools supporting numpy arrays to implement our calculations very efficiently.

We factor out all calculations to a separate method of the plug-in, get_matrix_change_metrics(), where the arguments are the two Dac-Man Records, and the method returns a dictionary containing the change metrics.

We also add a threshold parameter to the plug-in, which we use to specify the minimum (absolute) difference that two corresponding values from each matrix should be considered to have "changed".

import numpy
import numpy.linalg


class MatrixTxtPlugin(base.Comparator):

    threshold = 0.001

...

    def get_matrix_change_metrics(self, rec_a, rec_b):
        mat_a = rec_a.data
        mat_b = rec_b.data

        mat_delta = mat_a - mat_b

        n_values_over_threshold = numpy.sum(numpy.abs(mat_delta) > self.threshold)
        frac_changed = n_values_over_threshold / mat_delta.size

        sum_of_delta = numpy.sum(mat_delta)
        mean_of_delta = numpy.mean(mat_delta)
        norm_of_delta = numpy.linalg.norm(mat_delta)

        delta_max = numpy.max(mat_a) - numpy.max(mat_b)
        delta_min = numpy.min(mat_a) - numpy.min(mat_b)

        return {
            'frac_changed': frac_changed,
            'sum(A - B)': sum_of_delta,
            'mean(A - B)': mean_of_delta,
            'norm(A - B)': norm_of_delta,
            'delta(max(A) - max(B))': delta_max,
            'delta(min(A) - min(B))': delta_min,
        }

Next, we integrate these change metrics into the comparison by calling the method in the compare() method. We wrap the call to the get_matrix_change_metrics() method in a try/except statement to catch errors that can potentially occur during calculations, and store the error message if it occurs.

class MatrixTxtPlugin(base.Comparator):

    def compare(self, path_a, path_b, *args):
        rec_a = self.get_record(path_a)
        rec_b = self.get_record(path_b)

        try:
            self.metrics = self.get_matrix_change_metrics(rec_a, rec_b)
        except Exception as e:
            self.metrics = {'error': str(e)}

        return self.metrics

Adding methods for output

Finally, we implement the methods percent_change() and stats(). These methods are used to access information about the amount of change expressed as a single value, and more detailed information about the changes in the files being compared.

The choice of how to express these is left to the user. For this example, we use the previously calculated frac_changed for percent_change(), and print the results of the change metrics calculations in its entirety:

class MatrixTxtPlugin(base.Comparator):
...

    def percent_change(self):
        frac = self.metrics.get('frac_changed', 0)
        return frac * 100

    def stats(self, changes):
        print(changes)

Integrating the plug-in in the custom change analysis

At this point, the only thing remaining is to integrate our specialized plug-in into Dac-Man's processing pipeline.

In the run_matrix_change_ana() method, we specify MatrixTxtPlugin as the plug-in to be used in the use_plugin() method of the DataDiffer object:

#!/usr/bin/env python3
import sys

import dacman
from dacman.compare import base
from dacman.compare.adaptor import DacmanRecord

import numpy
import numpy.linalg


class MatrixTxtPlugin(base.Comparator):
    ... # rest of the plug-in code


def run_matrix_change_ana(path_a, path_b):
    comparisons = [(path_a, path_b)]
    differ = dacman.DataDiffer(comparisons, dacman.Executor.DEFAULT)
    differ.use_plugin(MatrixTxtPlugin)
    differ.start()


if __name__ == '__main__':
    cli_args = sys.argv[1:]
    path_a, path_b = cli_args[0], cli_args[1]
    run_matrix_change_ana(path_a, path_b)

Complete change analysis script

The complete code for this change analysis script is the following:

#!/usr/bin/env python3
import sys

import dacman
from dacman.compare import base
from dacman.compare.adaptor import DacmanRecord

import numpy
import numpy.linalg


class MatrixTxtAdaptor(base.DacmanRecordAdaptor):

    def transform(self, data_path):
        headers = []
        data = numpy.loadtxt(data_path)

        return headers, data


class MatrixTxtPlugin(base.Comparator):

    threshold = 0.01

    @staticmethod
    def description():
        return "A Dac-Man plug-in to compare matrices saved as text files"

    @staticmethod
    def supports():
        return ['txt']

    def get_record(self, path):
        ext = 'txt'

        rec = DacmanRecord(path)

        rec.file_support = {ext: True}
        rec.lib_support = {ext: 'numpy'}
        rec.file_adaptors = {ext: MatrixTxtAdaptor}

        rec._transform_source()

        return rec

    def compare(self, path_a, path_b, *args):
        rec_a = self.get_record(path_a)
        rec_b = self.get_record(path_b)

        try:
            self.metrics = self.get_matrix_change_metrics(rec_a, rec_b)
        except Exception as e:
            self.metrics = {'error': str(e)}

        return self.metrics

    def get_matrix_change_metrics(self, rec_a, rec_b):
        mat_a = rec_a.data
        mat_b = rec_b.data

        mat_delta = mat_a - mat_b

        n_values_over_threshold = numpy.sum(numpy.abs(mat_delta) > self.threshold)
        frac_changed = n_values_over_threshold / mat_delta.size

        sum_of_delta = numpy.sum(mat_delta)
        mean_of_delta = numpy.mean(mat_delta)
        norm_of_delta = numpy.linalg.norm(mat_delta)

        delta_max = numpy.max(mat_a) - numpy.max(mat_b)
        delta_min = numpy.min(mat_a) - numpy.min(mat_b)

        return {
            'frac_changed': frac_changed,
            'sum(A - B)': sum_of_delta,
            'mean(A - B)': mean_of_delta,
            'norm(A - B)': norm_of_delta,
            'delta(max(A) - max(B))': delta_max,
            'delta(min(A) - min(B))': delta_min,
        }

    def percent_change(self):
        frac = self.metrics.get('frac_changed', 0)
        return frac * 100

    def stats(self, changes):
        print(changes)


def run_matrix_change_ana(path_a, path_b):
    comparisons = [(path_a, path_b)]
    differ = dacman.DataDiffer(comparisons, dacman.Executor.DEFAULT)
    differ.use_plugin(MatrixTxtPlugin)
    differ.start()


if __name__ == '__main__':
    cli_args = sys.argv[1:]
    path_a, path_b = cli_args[0], cli_args[1]
    run_matrix_change_ana(path_a, path_b)

Running the change analysis

In order to make the /home/user/matrix_change_ana.py file executable, use the chmod command as follows:

chmod +x /home/user/matrix_change_ana.py

Example

A ready-to-use script for this example can be found under examples/scripts/matrix_change_ana.py.

To run the change analysis on two versions of an example dataset, navigate to the example/data/matrix_txt directory and run:

dacman diff --datachange dataset-v1 dataset-v2 --script /home/user/matrix_change_ana.py