User Guide¶
This page introduces the functionality of the Robotic Drive Analyzer’s (RDA) Python packages at code level. This User Guide provides the following main sections:
- Index Location - Guide on how to set the storage location of the RDA Index files
- Python Examples - Description of the Python API
- Data Fusion - Description of how to align different data columns
Index Location¶
The Robotic Drive Analyzer builds an index file for every file it is reading in order to work on it in a distributed way. The location of these index files can be handled as follows:
- Default: By default the index files are saved to /tmp/indexes/path/to/file. For example, /home/user/file.bag will have its index file in /tmp/indexes/home/user/file.bag. If /tmp/indexes does not exist, it will be created.
- Setting the index path via an environment variable: The path to the index files can also be set through the environment variable INDEX_STORAGE_PATH. E.g. if INDEX_STORAGE_PATH=/tmp and we read a file /home/user/file.bag, the index will be stored in /tmp/home/user/file.bag. On a cluster this variable has to be passed to every executor, e.g. by setting spark.executorEnv.INDEX_STORAGE_PATH to the preferred value in the Spark interpreter settings of Zeppelin (see the sketch after this list).
- Setting the index path via Spark configuration: Another way to set the index path is to set a variable in the Spark (when reading files) and Hadoop (when reading metadata) configuration. This takes priority over the environment variable, so an application-wide environment setting can be used while single exceptions are specified this way. The following code shows how to achieve this:
# Python - assuming the Spark Session is provided in a variable 'spark'
# The index path can be set whenever a new RDA object is created, e.g.:
# ('listOfFiles' is the list of input files, 'IndexPath' the directory for the index files)
rda = RoboticDriveAnalyzerForRos(listOfFiles, IndexPath)
- Saving in the same directory: To save the index in the same directory as the file it describes (the old default), set the configuration or the environment variable to SAVE_INDEX_WITH_FILE, e.g. INDEX_STORAGE_PATH=SAVE_INDEX_WITH_FILE.
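As a minimal sketch of the environment-variable approach outside of Zeppelin, the value can be passed to the executors when the Spark session is built (the directory /tmp/indexes and the application name are only example values):
# Minimal sketch: pass INDEX_STORAGE_PATH to every executor when building the session.
# The path used here is only an example value.
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .appName("rda-example") \
    .config("spark.executorEnv.INDEX_STORAGE_PATH", "/tmp/indexes") \
    .getOrCreate()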
Python Examples¶
This section introduces the Python API of the Robotic Drive Analyzer. It can, for example, be used in Zeppelin notebooks or the PySpark shell. The examples assume that a Spark session named “spark” is available. The following sections are provided:
- MetadataDiscovery - How to read metadata of ROSBag and MDF4 files
- ROSBag - How to read ROSBag files
- MDF4 - How to read MDF4 files
MetadataDiscovery¶
Metadata discovery allows inspecting the files in a passed list of directories and extracting basic information about their contents. Since users may have only some of the analyzers available, the metadata provider class names for particular file types can be provided as an optional third MetadataDiscovery constructor parameter. All analyzer classes such as RoboticDriveAnalyzerForRos have a METADATA_PROVIDER constant that can be used. Let’s assume a user has all three analyzers available and wants to use MetadataDiscovery directly to inspect the files.
import pyspark
from roboticdrive.metadata.MetadataDiscovery import *
from roboticdrive.ros_analyzer.RoboticDriveAnalyzerForRos import RoboticDriveAnalyzerForRos
from roboticdrive.mdf4_analyzer.RoboticDriveAnalyzerForMDF4 import RoboticDriveAnalyzerForMDF4
...
files = "path/to/files (comma separated for S3)"
meta_provider = MetadataDiscovery(files, "", [RoboticDriveAnalyzerForRos.METADATA_PROVIDER, RoboticDriveAnalyzerForMDF4.METADATA_PROVIDER])
metadata = meta_provider.getMetadata()
metadata.show()
This example builds indexes for all files of all supported types pointed to by the variable files, placing them under the default directory (the same directory the files reside in, since “” is passed as the second parameter of MetadataDiscovery), and extracts basic information that is returned as a DataFrame.
To fully flatten the metadata dataframe (giving a new row per stream and per file) the following selection can be run.
from pyspark.sql.functions import explode

flattenedMetadata = metadata.withColumn("streams", explode("streamsMetadata")).select("fileMetadata.*", "fsMetadata.*", "streams.*").drop("messageDefinition")
flattenedMetadata.show(truncate = False)
+----------------+--------+------------------------------------+-----+
|allMessagesCount|duration|typeSpecificAttributesInJSon        |... *|
+----------------+--------+------------------------------------+-----+
|410816          |99792   |{"connCount":39,"chunkCount":421,...|... *|
|410816          |99792   |{"connCount":39,"chunkCount":421,...|... *|
...
* further columns (vin, accessTime, errorMessage, fileOk, filePath, fileSize, fileType, modificationTime, owner, messageCount, messageHash, messageType, streamName) were removed to improve the layout
Let’s now assume we’re interested in further processing only a subset of files based on some metadata criteria, e.g. only correct ROSBAG files:
correct_files = metadata.select("*").where("fileMetadata.allMessagesCount > 1000 AND fsMetadata.fileOk = true AND fsMetadata.fileType = 'ROSBAG'")
list_of_bag_files = MetadataDiscovery.getListOfFiles(correct_files)
The first line of the example creates a dataframe restricted to the files we are interested in, and the second line creates a string representation of their paths that can be passed to RoboticDriveAnalyzerForRos later on.
Based on this, we move on to the Analyzers Python API.
ROSBag¶
Assume we started with list_of_bag_files from the MetadataDiscovery example above.
...
list_of_bag_files = MetadataDiscovery.getListOfFiles(correct_files)
from roboticdrive.ros_analyzer.RoboticDriveAnalyzerForRos import RoboticDriveAnalyzerForRos
rda_from_list_of_files = RoboticDriveAnalyzerForRos(list_of_bag_files)
rda_from_list_of_files.metadata.show()
Here RoboticDriveAnalyzerForRos was initialized with the files prepared in advance via MetadataDiscovery, and part of the resulting dataframe was printed with the show() method.
If we started with an arbitrary list of paths, some of the files could be corrupt. To spot them, the convenience methods getCorruptBagFiles() and getCorrectBagFiles() are available (both shown below).
from roboticdrive.ros_analyzer.RoboticDriveAnalyzerForRos import RoboticDriveAnalyzerForRos
files = "path/to/files (comma separated for S3)"
rda = RoboticDriveAnalyzerForRos(files)
rda.getCorruptBagFiles().show()
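Analogously, getCorrectBagFiles() lists the readable files; assuming it returns a DataFrame like its counterpart, it can be displayed the same way:
# Minimal sketch: list the files that were read without errors.
rda.getCorrectBagFiles().show()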
With buildDataSet, dataframes can be created from single topics or lists of topics (a list example is sketched after the following snippet).
df = rda.buildDataSet("/center_camera/image_color")
df.printSchema()
df.show(10)
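A minimal sketch of passing a list of topics. This assumes buildDataSet accepts a Python list of topic names, as described above; the topic names are taken from the Data Fusion examples and are only illustrative:
# Minimal sketch: pass a list of topics (assumed to be accepted as a Python list).
topics = ["/vehicle/gps/fix", "/vehicle/sonar_cloud"]
df_multi = rda.buildDataSet(topics)
df_multi.show(10)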
MDF4¶
We introduce a simple Python program that prints metadata about a given MDF4 file and builds a dataset from a selected stream.
from roboticdrive.metadata.MetadataDiscovery import *
from roboticdrive.mdf4_analyzer.RoboticDriveAnalyzerForMDF4 import RoboticDriveAnalyzerForMDF4
file = "path/to/file.mf4"
rda = RoboticDriveAnalyzerForMDF4(file)
rda.metadata.show()
DG1_CG1 = rda.buildDataSet("DG1_CG1")
DG1_CG1.show()
Data Fusion¶
One of the major problems in analyzing such large volumes of data is preparing a consistent view of what happened in the car during the measurement period. Among the problems are:
- Multiple sensors that can be out of sync or can have a different resolution of data
- Data can be stored in different formats
- In most cases, there is no simple key that lets us join different data sets
To overcome these issues, the Data Fusion functionality was created. It assumes users can create a Spark DataFrame whose data has a timestamp in one of the columns and want to “join” or “merge” data based on time. Since the analyzers let users do just that - create a Spark DataFrame out of different data formats - the assumptions of Data Fusion play nicely with the rest of the library.
In the following we will look at these groups of join functions:
- Joining data by nearest neighbour
- Joining data based on time windows
- Unions
- Joining data during the read (ROSBag only)
Joining data by nearest neighbour¶
Nearest neighbour without constraints¶
Let’s say we have two data frames containing data from two separate sensors, and the task at hand is to find, for each record in the first data frame, the best matching data point from the second data frame. The “best match” is the record whose timestamp is closest, regardless of whether it was recorded before or after the measurement.
import pyspark
from roboticdrive.ros_analyzer.RoboticDriveAnalyzerForRos import RoboticDriveAnalyzerForRos
from roboticdrive.datafusion.DataFusion import DataFusion
listOfFiles = "path/to/files (comma separated for S3)"
rda = RoboticDriveAnalyzerForRos(listOfFiles)
sonar_cloud = rda.buildDataSet("/vehicle/sonar_cloud")
gps_fix = rda.buildDataSet("/vehicle/gps/fix")
gpsWithCloud = DataFusion.joinDataFramesWithNearestNeighbour(sonar_cloud,"header.stamp", gps_fix,"header.stamp")
gpsWithCloud.show()
Nearest neighbour in a window¶
In this example, the closest match from gps_fix is found and merged for each sonar_cloud row. If we knew that there were gaps in the gps_fix data (e.g. due to a temporary sensor defect), we might want to specify a range within which a match is considered valid. Anything found outside the specified window is treated as “no match”, and null values are applied as the result of the join for that particular record.
import pyspark
from roboticdrive.ros_analyzer.RoboticDriveAnalyzerForRos import RoboticDriveAnalyzerForRos
from roboticdrive.datafusion.DataFusion import DataFusion
listOfFiles = "path/to/.bag files (comma separated for S3)"
rda = RoboticDriveAnalyzerForRos(listOfFiles)
sonar_cloud = rda.buildDataSet("/vehicle/sonar_cloud")
gps_fix = rda.buildDataSet("/vehicle/gps/fix")
gpsWithCloud = DataFusion.joinDataFramesWithNearestNeighbourInRange(sonar_cloud,"header.stamp", gps_fix,"header.stamp", 100, True)
gpsWithCloud.show()
Please note the last two parameters of the method joinDataFramesWithNearestNeighbourInRange():
- 100 is the range in milliseconds
- True is a flag denoting whether null matches should be preserved (in this case null values are preserved, so we’ll see all records from the sonar_cloud data frame regardless of whether a match was found)
Joining with nearest lower or nearest greater¶
Sometimes the nearest value is not the best one. The nearest lower value, the nearest greater value, or a value calculated from those two could be better. To use those values efficiently, a special set of methods is available:
Nearest lower and greater¶
Two kinds of methods were introduced:
- One returns the lower values in one set of columns with a “_before” suffix and the greater values in another set of columns with an “_after” suffix - DataFusion.joinBeforeAndAfter(…) with its “range” variant
- One returns the lower and greater values as separate rows in the same columns - DataFusion.joinBeforeAndAfterWithUnion(…) with its “range” variant
DataFusion.joinBeforeAndAfterWithUnion example¶
import pyspark
from roboticdrive.ros_analyzer.RoboticDriveAnalyzerForRos import RoboticDriveAnalyzerForRos
from roboticdrive.datafusion.DataFusion import DataFusion
listOfFiles = "path/to/.bag files (comma separated for S3)"
rda = RoboticDriveAnalyzerForRos(listOfFiles)
sonar_cloud = rda.buildDataSet("/vehicle/sonar_cloud")
gps_fix = rda.buildDataSet("/vehicle/gps/fix")
gpsWithCloud = DataFusion.joinBeforeAndAfterWithUnion(sonar_cloud,"header.stamp", gps_fix,"header.stamp")
gpsWithCloud.show()
Let’s suppose we have two DataFrames:
leftDF:
+-----------------------+-----+
|time |value|
+-----------------------+-----+
|2018-08-01 17:43:20.001|A |
|2018-08-01 17:43:20.011|B |
|2018-08-01 17:43:20.012|C |
|2018-08-01 17:43:20.015|D |
|2018-08-01 17:43:20.017|E |
|2018-08-01 17:43:20.04 |F |
|2018-08-01 17:43:20.046|G |
+-----------------------+-----+
and rightDF
+-----------------------+-----+
|time |value|
+-----------------------+-----+
|2018-08-01 17:43:20.011|J |
|2018-08-01 17:43:20.012|K |
|2018-08-01 17:43:20.013|L |
|2018-08-01 17:43:20.014|M |
|2018-08-01 17:43:20.015|N |
|2018-08-01 17:43:20.016|O |
|2018-08-01 17:43:20.021|P |
|2018-08-01 17:43:20.041|Q |
+-----------------------+-----+
joinBeforeAndAfterWithUnion would produce the following result:
+-----------------------+-----+-----------------------+-------+
|time |value|time_1 |value_1|
+-----------------------+-----+-----------------------+-------+
|2018-08-01 17:43:20.001|A |null |null |
|2018-08-01 17:43:20.001|A |2018-08-01 17:43:20.011|J |
|2018-08-01 17:43:20.011|B |2018-08-01 17:43:20.011|J |
|2018-08-01 17:43:20.011|B |2018-08-01 17:43:20.012|K |
|2018-08-01 17:43:20.012|C |2018-08-01 17:43:20.012|K |
|2018-08-01 17:43:20.012|C |2018-08-01 17:43:20.013|L |
|2018-08-01 17:43:20.015|D |2018-08-01 17:43:20.015|N |
|2018-08-01 17:43:20.015|D |2018-08-01 17:43:20.016|O |
|2018-08-01 17:43:20.017|E |2018-08-01 17:43:20.016|O |
|2018-08-01 17:43:20.017|E |2018-08-01 17:43:20.021|P |
|2018-08-01 17:43:20.04 |F |2018-08-01 17:43:20.021|P |
|2018-08-01 17:43:20.04 |F |2018-08-01 17:43:20.041|Q |
|2018-08-01 17:43:20.046|G |2018-08-01 17:43:20.041|Q |
|2018-08-01 17:43:20.046|G |null |null |
+-----------------------+-----+-----------------------+-------+
joinBeforeAndAfter example¶
Joining the same leftDF with rightDF as in the previous example, but with joinBeforeAndAfter, would produce a different result (a sketch of the call follows after the table):
+-----------------------+-----+-----------------------+------------+-----+
|time |value|time_before |value_before|... *|
+-----------------------+-----+-----------------------+------------+-----+
|2018-08-01 17:43:20.001|A |null |null |... *|
|2018-08-01 17:43:20.011|B |2018-08-01 17:43:20.011|J |... *|
|2018-08-01 17:43:20.012|C |2018-08-01 17:43:20.012|K |... *|
|2018-08-01 17:43:20.015|D |2018-08-01 17:43:20.015|N |... *|
|2018-08-01 17:43:20.017|E |2018-08-01 17:43:20.016|O |... *|
|2018-08-01 17:43:20.04 |F |2018-08-01 17:43:20.021|P |... *|
|2018-08-01 17:43:20.046|G |2018-08-01 17:43:20.041|Q |... *|
+-----------------------+-----+-----------------------+------------+-----+
* some columns were removed to improve the layout
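For reference, a minimal sketch of the corresponding call. The argument order (left frame, left time column, right frame, right time column) is assumed to mirror joinBeforeAndAfterWithUnion:
# Minimal sketch - the argument order is assumed to mirror joinBeforeAndAfterWithUnion.
beforeAndAfter = DataFusion.joinBeforeAndAfter(leftDF, "time", rightDF, "time")
beforeAndAfter.show(truncate=False)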
Other methods¶
For both described functions their “range” variants are available:
- joinBeforeAndAfterInRangeWithUnion
- joinBeforeAndAfterInRange
Also, two methods to join with only the nearest lower or only the nearest greater value are available (a usage sketch follows below):
- joinWithNearestLowerOrEqual
- joinWithNearestGreater
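A minimal sketch of calling one of these, assuming the same argument pattern as joinDataFramesWithNearestNeighbour (left frame, left time column, right frame, right time column):
# Minimal sketch - the argument pattern is assumed to match joinDataFramesWithNearestNeighbour.
gpsWithCloudLower = DataFusion.joinWithNearestLowerOrEqual(sonar_cloud, "header.stamp", gps_fix, "header.stamp")
gpsWithCloudLower.show()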
Joining data on time windows¶
Apart from joining by nearest neighbour, there is a set of methods that allows us to join by a defined window size. Records are assigned to windows based on a specified time frame and are joined based on that assignment. Depending on the method, either all records that are a product of the join in each window (joinDataSetsOnTimeWindows) or only the first row from each window (joinDataSetsOnTimeWindowsAndReturnFirstMatch) are returned. A sketch of the all-matches variant follows after the example.
Example:
import pyspark
from roboticdrive.ros_analyzer.RoboticDriveAnalyzerForRos import RoboticDriveAnalyzerForRos
from roboticdrive.datafusion.DataFusion import DataFusion
listOfFiles = "path/to/.bag files (comma separated for S3)"
rda = RoboticDriveAnalyzerForRos(listOfFiles)
sonar_cloud = rda.buildDataSet("/vehicle/sonar_cloud")
gps_fix = rda.buildDataSet("/vehicle/gps/fix")
gpsWithCloud = DataFusion.joinDataSetsOnTimeWindowsAndReturnFirstMatch(sonar_cloud, gps_fix,"header.stamp", "1 second", "left_outer")
gpsWithCloud.show()
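A minimal sketch of the variant that returns all matches per window, assuming it takes the same arguments as joinDataSetsOnTimeWindowsAndReturnFirstMatch:
# Minimal sketch - the arguments are assumed to match joinDataSetsOnTimeWindowsAndReturnFirstMatch.
allMatches = DataFusion.joinDataSetsOnTimeWindows(sonar_cloud, gps_fix, "header.stamp", "1 second", "left_outer")
allMatches.show()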
Data unions and other¶
Users can also create a union of data from two different data frames, i.e. a data frame that has all columns from the first data frame and all columns from the second one. Each row has non-null values only in the columns of the data frame it originated from.
Union on two dataframes¶
import pyspark
from roboticdrive.ros_analyzer.RoboticDriveAnalyzerForRos import RoboticDriveAnalyzerForRos
from roboticdrive.datafusion.DataFusion import DataFusion
listOfFiles = "path/to/.bag files (comma separated for S3)"
rda = RoboticDriveAnalyzerForRos(listOfFiles)
sonar_cloud = rda.buildDataSet("/vehicle/sonar_cloud")
gps_fix = rda.buildDataSet("/vehicle/gps/fix")
gpsWithCloud = DataFusion.getUnionedDataSet(sonar_cloud, gps_fix)
gpsWithCloud.show()