Working with StreamSets#

Often you will want to query and work with multiple streams instead of just an individual stream - StreamSets allow you to do this effectively. It is a light wrapper around a list of Stream objects with convenience methods provided to help you work with multiple streams of data.

Creating a StreamSet#

Creating a StreamSet is relatively simple assuming you have a UUID for each stream that should be a member. In the future, other options may exist such as providing collection or tag matching parameters.

UUIDs = [
    uuid.UUID('0d22a53b-e2ef-4e0a-ab89-b2d48fb2592a'),
    uuid.UUID('17dbe387-89ea-42b6-864b-f505cdb483f5'),
    uuid.UUID('71466a91-dcfe-42ea-9e88-87c51f847942'),
    uuid.UUID('570aa71d-fb4f-456f-8533-2b11a28fa1f5')
]

streams = conn.streams(*UUIDs)

If you’ve already obtained a list of Stream objects, you may create a StreamSet directly by providing a list of streams for initialization.

streams = StreamSet([stream1, stream2, stream3])

Filtering#

To apply query parameters to your request, you should use the filter method to supply a start or end argument.

Keep in mind that filter will return a new object so you can keep multiple filtered StreamSets in memory while you explore your data. The filter method may be called multiple times but only the final values will be used when it is time to fulfill the request by the server.

from btrdb.utils.timez import currently_as_ns, to_nanoseconds

streams = conn.streams(*UUIDs)

start = to_nanoseconds(datetime(2016, 1, 1, 0, 0, 0))
end = to_nanoseconds(datetime(2016, 1, 3, 12, 0, 0))

# replace instance with a filtered version from 1/1/2016 00:00:00 to
# 1/3/2016 12:00:00
streams = streams.filter(start=start, end=end)

# create a new instance with epoch as start and the current time as
# the end parameters
alt = streams.filter(start=0, end=currently_as_ns())

Aside from filtering results at query execution, you may also filter the streams that should be included in the new object. For instance, you may wish to create a new StreamSet containing only voltage streams or only from a specific collection.

To filter the available streams, you may provide a collection, name, or unit argument. If you provide a string, then a case-insensitive exact match will be used to select the desired streams. You may instead provide a compiled regex expression which will be used with re.search to choose the streams to include.

# select only voltage streams
voltage_streams = streams.filter(unit="volts")

# select only voltage or amperage streams using regex pattern
other_streams = streams.filter(unit=re.compile("volts|amps"))

Retrieving Data#

There are three options available when you are ready to process the data from the server. All options are fully materialized but are organized in different ways according to what is more convenient for you.

StreamSet.values()#

Calling the values method will materialize the streams using the filtering parameters you specified. The data will be returned to you as a list of lists. Each member list contains tuples of RawPoint, int for the data and stream version.

This method aligns data by stream so you can easily deal with all of the data on a stream by stream basis. The following example shows a toy dataset which consists of 4 streams.

streams.values()
>>[[RawPoint(1500000000100000000, 2.0),
>>  RawPoint(1500000000300000000, 4.0),
>>  RawPoint(1500000000500000000, 6.0),
>>  RawPoint(1500000000700000000, 8.0),
>>  RawPoint(1500000000900000000, 10.0)],
>> [RawPoint(1500000000000000000, 1.0),
>>  RawPoint(1500000000200000000, 3.0),
>>  RawPoint(1500000000400000000, 5.0),
>>  RawPoint(1500000000600000000, 7.0),
>>  RawPoint(1500000000800000000, 9.0)],
>> [RawPoint(1500000000000000000, 1.0),
>>  RawPoint(1500000000100000000, 2.0),
>>  RawPoint(1500000000300000000, 4.0),
>>  RawPoint(1500000000400000000, 5.0),
>>  RawPoint(1500000000600000000, 7.0),
>>  RawPoint(1500000000700000000, 8.0),
>>  RawPoint(1500000000800000000, 9.0),
>>  RawPoint(1500000000900000000, 10.0)],
>> [RawPoint(1500000000000000000, 1.0),
>>  RawPoint(1500000000100000000, 2.0),
>>  RawPoint(1500000000200000000, 3.0),
>>  RawPoint(1500000000300000000, 4.0),
>>  RawPoint(1500000000400000000, 5.0),
>>  RawPoint(1500000000500000000, 6.0),
>>  RawPoint(1500000000600000000, 7.0),
>>  RawPoint(1500000000700000000, 8.0),
>>  RawPoint(1500000000800000000, 9.0),
>>  RawPoint(1500000000900000000, 10.0)]]

StreamSet.rows()#

By contrast, the rows method aligns data by time rather than by stream. Each row of data contains points for a specific time with the None value used if a given stream does not contain a value at that time.

Stream data is ordered according to the order of the initial UUIDs that were used when creating the StreamSet.

for row in streams.rows():
    print(row)
>> (None, RawPoint(1500000000000000000, 1.0), RawPoint(1500000000000000000, 1.0), RawPoint(1500000000000000000, 1.0))
>> (RawPoint(1500000000100000000, 2.0), None, RawPoint(1500000000100000000, 2.0), RawPoint(1500000000100000000, 2.0))
>> (None, RawPoint(1500000000200000000, 3.0), None, RawPoint(1500000000200000000, 3.0))
>> (RawPoint(1500000000300000000, 4.0), None, RawPoint(1500000000300000000, 4.0), RawPoint(1500000000300000000, 4.0))
>> (None, RawPoint(1500000000400000000, 5.0), RawPoint(1500000000400000000, 5.0), RawPoint(1500000000400000000, 5.0))
>> (RawPoint(1500000000500000000, 6.0), None, None, RawPoint(1500000000500000000, 6.0))
>> (None, RawPoint(1500000000600000000, 7.0), RawPoint(1500000000600000000, 7.0), RawPoint(1500000000600000000, 7.0))
>> (RawPoint(1500000000700000000, 8.0), None, RawPoint(1500000000700000000, 8.0), RawPoint(1500000000700000000, 8.0))
>> (None, RawPoint(1500000000800000000, 9.0), RawPoint(1500000000800000000, 9.0), RawPoint(1500000000800000000, 9.0))
>> (RawPoint(1500000000900000000, 10.0), None, RawPoint(1500000000900000000, 10.0), RawPoint(1500000000900000000, 10.0))

Transforming to Other Formats#

A number of transformation features have been added so that you can work in the tools and APIs you are most comfortable and productive with. At the moment, we support the numpy and pandas libraries if you have them installed and available to be imported.

Keep in mind that calling these methods will materialize the requested data in memory. A few examples follow but please visit the API documentation to see the full list of transformation methods available.

# materialize data as tuple of numpy arrays
conn.streams(*UUIDs).filter(start, end).to_array()
>> (array([RawPoint(1500000000100000000, 2.0),
>>        RawPoint(1500000000300000000, 4.0),
>>        RawPoint(1500000000500000000, 6.0),
>>        RawPoint(1500000000700000000, 8.0),
>>        RawPoint(1500000000900000000, 10.0)], dtype=object),
>>  array([RawPoint(1500000000000000000, 1.0),
>>        RawPoint(1500000000200000000, 3.0),
>>        RawPoint(1500000000400000000, 5.0),
>>        RawPoint(1500000000600000000, 7.0),
>>        RawPoint(1500000000800000000, 9.0)], dtype=object),
>> ...

# materialize data as list of pandas Series
conn.streams(*UUIDs).filter(start, end).to_series()
>> [1500000000100000000     2.0
>> 1500000000300000000     4.0
>> 1500000000500000000     6.0
>> 1500000000700000000     8.0
>> 1500000000900000000    10.0
>> dtype: float64,
>>  1500000000000000000    1.0
>> 1500000000200000000    3.0
>> 1500000000400000000    5.0
>> 1500000000600000000    7.0
>> 1500000000800000000    9.0
>> dtype: float64,
>> ...

# materialize data as pandas DataFrame
conn.streams(*UUIDs).filter(start, end).to_dataframe()
>>                   time         sensors/stream0         sensors/stream1
>> 0  1500000000000000000                     NaN                     1.0
>> 1  1500000000100000000                     2.0                     NaN
>> 2  1500000000200000000                     NaN                     3.0
>> 3  1500000000300000000                     4.0                     NaN
>> 4  1500000000400000000                     NaN                     5.0
>> 5  1500000000500000000                     6.0                     NaN
>> 6  1500000000600000000                     NaN                     7.0
>> 7  1500000000700000000                     8.0                     NaN
>> 8  1500000000800000000                     NaN                     9.0
>> 9  1500000000900000000                    10.0                     NaN

Serializing Data#

If you would like to save your data to disk for later use or to import into another program, we have several options available with more planned in the future.

Most serialization methods will save to disk however there is also a to_table method which produces a tabular view of your data as a string for display or printing. Some examples are shown below.

# export data and save as CSV
streams.to_csv("export.csv")

# convert table of data as a string
print(streams.to_table())
>>                time    sensors/stream0    sensors/stream1
>> -------------------  -----------------  -----------------
>> 1500000000000000000                                     1
>> 1500000000100000000                  2
>> 1500000000200000000                                     3
>> 1500000000300000000                  4
>> 1500000000400000000                                     5
>> 1500000000500000000                  6
>> 1500000000600000000                                     7
>> 1500000000700000000                  8
>> 1500000000800000000                                     9
>> 1500000000900000000                 10