User Guide

This page introduces the functionality of the Robotic Drive Analyzer's (RDA) Python packages at the code level. This User Guide provides the following main sections:

  • Index Location - How to set the storage location of the RDA index files
  • Python Examples - Description of the Python API
  • Data Fusion - Description of how to align different data columns

Index Location

The Robotic Drive Analyzer builds an index file for every file it reads in order to work on it in a distributed way. The location of these index files can be controlled as follows:

  • Default: By default the index files are saved to /tmp/indexes/path/to/file. For example, /home/user/file.bag will have an index file in /tmp/indexes/home/user/file.bag. If /tmp/indexes does not exist, it will be created.
  • Setting the index path via an environment variable: The path to the index files can also be set via the environment variable INDEX_STORAGE_PATH. E.g. if INDEX_STORAGE_PATH=/tmp and we read a file /home/user/file.bag, the index will be stored in /tmp/home/user/file.bag. On a cluster this variable has to be passed to every executor, e.g. by setting spark.executorEnv.INDEX_STORAGE_PATH to the preferred value in the Spark interpreter settings of Zeppelin (see the sketch after this list).
  • Setting the index path via Spark configuration: Another way to set the index path is to set a variable in the Spark (when reading files) and Hadoop (when reading metadata) configuration. This takes priority over the environment variable, so an application-wide setting can be combined with single exceptions specified this way. The following code shows how to achieve this:
# Python - assuming the Spark session is provided in a variable 'spark'
# The index path can be set whenever a new RDA object is created, e.g.:
listOfFiles = "path/to/files (comma separated for S3)"
indexPath = "path/to/indexes"
rda = RoboticDriveAnalyzerForRos(listOfFiles, indexPath)
  • Saving in the same directory: To save the index in the same directory as the file it describes (the old default), set the configuration or the environment variable to SAVE_INDEX_WITH_FILE. E.g. INDEX_STORAGE_PATH=SAVE_INDEX_WITH_FILE
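
On a self-managed cluster the same effect as the Zeppelin interpreter setting can be achieved when building the Spark session. The following is a minimal sketch; the value /tmp/indexes is only an example:

from pyspark.sql import SparkSession

# Pass INDEX_STORAGE_PATH to every executor via the Spark config
# (in Zeppelin this is done in the Spark interpreter settings instead)
spark = SparkSession.builder \
    .config("spark.executorEnv.INDEX_STORAGE_PATH", "/tmp/indexes") \
    .getOrCreate()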

Python Examples

In this section the Python API of the Robotic Drive Analyzer is introduced. It can be used, for example, in Zeppelin notebooks or the PySpark shell. The examples assume that a Spark session named "spark" is available. The following sections are provided:

  • MetadataDiscovery - How to read metadata of ROSBag and MDF4 files
  • ROSBag - How to read ROSBag files
  • MDF4 - How to read MDF4 files

MetadataDiscovery

Metadata discovery allows users to inspect the files in a passed list of directories and extract basic information about their contents. Since users may have only some of the analyzers available, the metadata provider class names for particular file types can be provided as an optional third MetadataDiscovery constructor parameter. All analyzer classes, such as RoboticDriveAnalyzerForRos, have a METADATA_PROVIDER constant that can be used. Let's assume a user has both analyzers available and wants to use MetadataDiscovery directly to inspect their files.

import pyspark
from roboticdrive.metadata.MetadataDiscovery import *
from roboticdrive.ros_analyzer.RoboticDriveAnalyzerForRos import RoboticDriveAnalyzerForRos
from roboticdrive.mdf4_analyzer.RoboticDriveAnalyzerForMDF4 import RoboticDriveAnalyzerForMDF4
...
files = "path/to/files (comma separated for S3)"
meta_provider = MetadataDiscovery(files, "", [RoboticDriveAnalyzerForRos.METADATA_PROVIDER, RoboticDriveAnalyzerForMDF4.METADATA_PROVIDER])
metadata = meta_provider.getMetadata()
metadata.show()

This example builds indexes for all files of all supported types pointed at by the variable files. Because "" is passed as the second parameter of MetadataDiscovery, the indexes are stored in the default location (the same directory the files reside in). Basic information about the files is extracted and returned as a DataFrame.

To fully flatten the metadata DataFrame (producing one row per stream per file) the following selection can be run:

from pyspark.sql.functions import explode

flattenedMetadata = metadata.withColumn("streams", explode("streamsMetadata")).select("fileMetadata.*", "fsMetadata.*", "streams.*").drop("messageDefinition")
flattenedMetadata.show(truncate=False)

+----------------+--------+------------------------------------+-----+
|allMessagesCount|duration|typeSpecificAttributesInJSon        |... *|
+----------------+--------+------------------------------------+-----+
|410816          |99792   |{"connCount":39,"chunkCount":421,...|... *|
|410816          |99792   |{"connCount":39,"chunkCount":421,...|... *|
...

* some columns (vin, accessTime, errorMessage, fileOk, filePath, fileSize, fileType, modificationTime, owner, messageCount, messageHash, messageType, streamName) were removed to improve the layout

Let's assume now we're interested in further processing only a subset of the files, based on some metadata criteria - for example, only correct ROSBag files:

correct_files = metadata.select("*").where("fileMetadata.allMessagesCount > 1000 AND fsMetadata.fileOk = true AND fsMetadata.fileType = 'ROSBAG'")
list_of_bag_files = MetadataDiscovery.getListOfFiles(correct_files)

The first line of the presented example creates a DataFrame with the files we are interested in, and the second line creates a string representation of their paths that can be passed to RoboticDriveAnalyzerForRos later on.
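
The string form is convenient because it matches the comma-separated path convention used for the files variable above. A purely hypothetical illustration:

# Hypothetical illustration - getListOfFiles is assumed to return the paths
# in the same comma-separated form accepted by the analyzers
print(list_of_bag_files)
# e.g. "s3://bucket/a.bag,s3://bucket/b.bag"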

Based on this, we move on to the Analyzers Python API.

ROSBag

Assume we started with list_of_bag_files from the MetadataDiscovery example above.

from roboticdrive.ros_analyzer.RoboticDriveAnalyzerForRos import RoboticDriveAnalyzerForRos
...
list_of_bag_files = MetadataDiscovery.getListOfFiles(correct_files)

rda_from_list_of_files = RoboticDriveAnalyzerForRos(list_of_bag_files)
rda_from_list_of_files.metadata.show()

Here RoboticDriveAnalyzerForRos was initialized with files prepared in advance using MetadataDiscovery, and part of the resulting metadata DataFrame was printed with the show() method.

If we started with an arbitrary list of paths, some of the files could be corrupt. To spot them, two convenience methods are provided: getCorruptBagFiles() and getCorrectBagFiles().

from roboticdrive.ros_analyzer.RoboticDriveAnalyzerForRos import RoboticDriveAnalyzerForRos
files = "path/to/files (comma separated for S3)"
rda = RoboticDriveAnalyzerForRos(files)
rda.getCorruptBagFiles().show()
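
The complementary method getCorrectBagFiles() can be used in the same way to list the files that were read without problems:

# Show only the files that could be parsed correctly
rda.getCorrectBagFiles().show()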

With buildDataSet, DataFrames can be created from a topic or a list of topics.

df = rda.buildDataSet("/center_camera/image_color")
df.printSchema()
df.show(10)
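
Since buildDataSet also accepts lists of topics, several topics can be combined in one call. A sketch, assuming a plain Python list of topic names is accepted:

# Assumption: a Python list of topic names can be passed directly
df_multi = rda.buildDataSet(["/center_camera/image_color", "/vehicle/gps/fix"])
df_multi.printSchema()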

MDF4

We introduce a simple Python program that prints metadata about a given MDF4 file and builds a DataFrame from a selected stream.

from roboticdrive.mdf4_analyzer.RoboticDriveAnalyzerForMDF4 import RoboticDriveAnalyzerForMDF4
file = "path/to/file.mf4"

rda = RoboticDriveAnalyzerForMDF4(file)
rda.metadata.show()
DG1_CG1 = rda.buildDataSet("DG1_CG1")
DG1_CG1.show()
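
The returned stream is a regular Spark DataFrame, so the usual operations apply:

# Inspect the schema of the extracted channel group
DG1_CG1.printSchema()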

Data Fusion

One of the major problems in analyzing such large volumes of data is preparing a consistent view of what happened in the car during the measurement period. The problems include:

  • Multiple sensors can be out of sync or can record data at different resolutions
  • Data can be stored in different formats
  • In most cases, there is no simple key that lets us join different data sets

In order to overcome these issues, the Data Fusion functionality was created. It assumes that users can create a Spark DataFrame that has a timestamp in one of its columns and would like to "join" or "merge" data based on time. Since the Analyzers let users do just that - create a Spark DataFrame out of different data formats - the assumptions of Data Fusion play nicely with the rest of the library's functionality.
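
Any Spark DataFrame with a timestamp column qualifies as input, not only the frames produced by the Analyzers. A minimal hand-made sketch, using the "spark" session assumed throughout this guide (the values are illustrative):

from pyspark.sql import functions as F

# A hand-made frame with a timestamp column - the only requirement
# Data Fusion places on its inputs
left = spark.createDataFrame(
    [("2018-08-01 17:43:20.001", "A"),
     ("2018-08-01 17:43:20.011", "B")],
    ["time", "value"],
).withColumn("time", F.to_timestamp("time"))
left.show(truncate=False)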

In the following we will look at these groups of join functions:

  • Joining data by nearest neighbour
  • Joining data based on time windows
  • Unions
  • Joining data during the read (ROSBag only)

Joining data by nearest neighbour

Nearest neighbour without constraints

Let's say we have two data frames containing data from two separate sensors, and the task at hand is to find the best matching datapoint from the second data frame for each record in the first. The "best match" is the record that is closest in time, regardless of whether it was recorded before or after the measurement.

import pyspark
from roboticdrive.ros_analyzer.RoboticDriveAnalyzerForRos import RoboticDriveAnalyzerForRos
from roboticdrive.datafusion.DataFusion import DataFusion

listOfFiles = "path/to/files (comma separated for S3)"
rda = RoboticDriveAnalyzerForRos(listOfFiles)
sonar_cloud = rda.buildDataSet("/vehicle/sonar_cloud")
gps_fix = rda.buildDataSet("/vehicle/gps/fix")
gpsWithCloud = DataFusion.joinDataFramesWithNearestNeighbour(sonar_cloud, "header.stamp", gps_fix, "header.stamp")
gpsWithCloud.show()

Nearest neighbour in a window

In the previous example, the closest match from gps_fix was found and merged with each sonar_cloud row. If we knew that there were gaps in the gps_fix data (e.g. due to a temporary sensor defect), we might want to specify a range in which a match is considered valid. Anything found outside the specified window is treated as "no match" and null values are applied as the result of the join for that particular record.

import pyspark
from roboticdrive.ros_analyzer.RoboticDriveAnalyzerForRos import RoboticDriveAnalyzerForRos
from roboticdrive.datafusion.DataFusion import DataFusion

listOfFiles = "path/to/.bag files (comma separated for S3)"
rda = RoboticDriveAnalyzerForRos(listOfFiles)
sonar_cloud = rda.buildDataSet("/vehicle/sonar_cloud")
gps_fix = rda.buildDataSet("/vehicle/gps/fix")
gpsWithCloud = DataFusion.joinDataFramesWithNearestNeighbourInRange(sonar_cloud, "header.stamp", gps_fix, "header.stamp", 100, True)
gpsWithCloud.show()

Please note the last two parameters of the method joinDataFramesWithNearestNeighbourInRange():

  • 100 is the range in milliseconds
  • True is a flag denoting whether rows without a match should be preserved (in this case they are, so we'll see all records from the sonar_cloud data frame regardless of whether a match was found; see the sketch below)
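
For comparison, a sketch with the flag set to False, which drops the sonar_cloud rows for which no match was found in range:

# With the preserve flag set to False, unmatched sonar_cloud rows are dropped
onlyMatched = DataFusion.joinDataFramesWithNearestNeighbourInRange(
    sonar_cloud, "header.stamp", gps_fix, "header.stamp", 100, False)
onlyMatched.show()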

Joining with nearest lower or nearest greater

Sometimes the nearest value is not the best one; the nearest lower value, the nearest greater value, or a value calculated from both could be better. To use those values efficiently, a special set of methods is available:

Nearest lower and greater

Two kinds of methods were introduced:

  • One kind returns the lower values in a set of columns with a "_before" suffix and the greater values in a set of columns with an "_after" suffix - DataFusion.joinBeforeAndAfter(…) with its "range" variant
  • The other returns the lower and greater values as separate rows in the same columns - DataFusion.joinBeforeAndAfterWithUnion(…) with its "range" variant

DataFusion.joinBeforeAndAfterWithUnion example

import pyspark
from roboticdrive.ros_analyzer.RoboticDriveAnalyzerForRos import RoboticDriveAnalyzerForRos
from roboticdrive.datafusion.DataFusion import DataFusion

listOfFiles = "path/to/.bag files (comma separated for S3)"
rda = RoboticDriveAnalyzerForRos(listOfFiles)
sonar_cloud = rda.buildDataSet("/vehicle/sonar_cloud")
gps_fix = rda.buildDataSet("/vehicle/gps/fix")
gpsWithCloud = DataFusion.joinBeforeAndAfterWithUnion(sonar_cloud, "header.stamp", gps_fix, "header.stamp")
gpsWithCloud.show()

Let’s suppose we have two DataFrames:

leftDF:

+-----------------------+-----+
|time                   |value|
+-----------------------+-----+
|2018-08-01 17:43:20.001|A    |
|2018-08-01 17:43:20.011|B    |
|2018-08-01 17:43:20.012|C    |
|2018-08-01 17:43:20.015|D    |
|2018-08-01 17:43:20.017|E    |
|2018-08-01 17:43:20.04 |F    |
|2018-08-01 17:43:20.046|G    |
+-----------------------+-----+

and rightDF

+-----------------------+-----+
|time                   |value|
+-----------------------+-----+
|2018-08-01 17:43:20.011|J    |
|2018-08-01 17:43:20.012|K    |
|2018-08-01 17:43:20.013|L    |
|2018-08-01 17:43:20.014|M    |
|2018-08-01 17:43:20.015|N    |
|2018-08-01 17:43:20.016|O    |
|2018-08-01 17:43:20.021|P    |
|2018-08-01 17:43:20.041|Q    |
+-----------------------+-----+

joinBeforeAndAfterWithUnion would produce the following result:

+-----------------------+-----+-----------------------+-------+
|time                   |value|time_1                 |value_1|
+-----------------------+-----+-----------------------+-------+
|2018-08-01 17:43:20.001|A    |null                   |null   |
|2018-08-01 17:43:20.001|A    |2018-08-01 17:43:20.011|J      |
|2018-08-01 17:43:20.011|B    |2018-08-01 17:43:20.011|J      |
|2018-08-01 17:43:20.011|B    |2018-08-01 17:43:20.012|K      |
|2018-08-01 17:43:20.012|C    |2018-08-01 17:43:20.012|K      |
|2018-08-01 17:43:20.012|C    |2018-08-01 17:43:20.013|L      |
|2018-08-01 17:43:20.015|D    |2018-08-01 17:43:20.015|N      |
|2018-08-01 17:43:20.015|D    |2018-08-01 17:43:20.016|O      |
|2018-08-01 17:43:20.017|E    |2018-08-01 17:43:20.016|O      |
|2018-08-01 17:43:20.017|E    |2018-08-01 17:43:20.021|P      |
|2018-08-01 17:43:20.04 |F    |2018-08-01 17:43:20.021|P      |
|2018-08-01 17:43:20.04 |F    |2018-08-01 17:43:20.041|Q      |
|2018-08-01 17:43:20.046|G    |2018-08-01 17:43:20.041|Q      |
|2018-08-01 17:43:20.046|G    |null                   |null   |
+-----------------------+-----+-----------------------+-------+

joinBeforeAndAfter example

Joining the same leftDF with rightDF as in the previous example, but with joinBeforeAndAfter, would produce a different result:

+-----------------------+-----+-----------------------+------------+-----+
|time                   |value|time_before            |value_before|... *|
+-----------------------+-----+-----------------------+------------+-----+
|2018-08-01 17:43:20.001|A    |null                   |null        |... *|
|2018-08-01 17:43:20.011|B    |2018-08-01 17:43:20.011|J           |... *|
|2018-08-01 17:43:20.012|C    |2018-08-01 17:43:20.012|K           |... *|
|2018-08-01 17:43:20.015|D    |2018-08-01 17:43:20.015|N           |... *|
|2018-08-01 17:43:20.017|E    |2018-08-01 17:43:20.016|O           |... *|
|2018-08-01 17:43:20.04 |F    |2018-08-01 17:43:20.021|P           |... *|
|2018-08-01 17:43:20.046|G    |2018-08-01 17:43:20.041|Q           |... *|
+-----------------------+-----+-----------------------+------------+-----+
* some columns were removed to improve the layout

Other methods

For both described functions, "range" variants are available:

  • joinBeforeAndAfterInRangeWithUnion
  • joinBeforeAndAfterInRange

Two methods that join with only the nearest lower or only the nearest greater value are also available (see the sketch after this list):

  • joinWithNearestLowerOrEqual
  • joinWithNearestGreater
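
A sketch of joining each sonar_cloud row with the most recent gps_fix reading at or before it. The parameter order is an assumption, mirroring joinDataFramesWithNearestNeighbour:

# Assumption: same (leftDF, leftColumn, rightDF, rightColumn) parameter
# order as joinDataFramesWithNearestNeighbour
lastKnownFix = DataFusion.joinWithNearestLowerOrEqual(
    sonar_cloud, "header.stamp", gps_fix, "header.stamp")
lastKnownFix.show()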

Joining data on time windows

Apart from joining by nearest neighbour, there is a set of methods that allows us to join on a defined window size. Records are assigned to windows based on a specified time frame and are joined based on that assignment. Depending on the method, a user can choose whether all records produced by the join on each window (joinDataSetsOnTimeWindows) or only the first row from each window (joinDataSetsOnTimeWindowsAndReturnFirstMatch) should be returned.

Example (in the call below, "1 second" is the window size and "left_outer" the join type):

import pyspark
from roboticdrive.ros_analyzer.RoboticDriveAnalyzerForRos import RoboticDriveAnalyzerForRos
from roboticdrive.datafusion.DataFusion import DataFusion

listOfFiles = "path/to/.bag files (comma separated for S3)"
rda = RoboticDriveAnalyzerForRos(listOfFiles)
sonar_cloud = rda.buildDataSet("/vehicle/sonar_cloud")
gps_fix = rda.buildDataSet("/vehicle/gps/fix")
gpsWithCloud = DataFusion.joinDataSetsOnTimeWindowsAndReturnFirstMatch(sonar_cloud, gps_fix, "header.stamp", "1 second", "left_outer")
gpsWithCloud.show()

Data unions and other

Users can also create a union of two data frames: a data frame that has all columns from the first data frame and all columns from the second. Each row will have non-null values only in the columns belonging to the data frame it originates from.

Union on two dataframes

import pyspark
from roboticdrive.ros_analyzer.RoboticDriveAnalyzerForRos import RoboticDriveAnalyzerForRos
from roboticdrive.datafusion.DataFusion import DataFusion

listOfFiles = "path/to/.bag files (comma separated for S3)"
rda = RoboticDriveAnalyzerForRos(listOfFiles)
sonar_cloud = rda.buildDataSet("/vehicle/sonar_cloud")
gps_fix = rda.buildDataSet("/vehicle/gps/fix")
gpsWithCloud = DataFusion.getUnionedDataSet(sonar_cloud, gps_fix)
gpsWithCloud.show()
