Working with StreamSets
Often you will want to query and work with multiple streams rather than just an individual stream, and StreamSets allow you to do this effectively. A StreamSet is a light wrapper around a list of Stream objects, with convenience methods provided to help you work with multiple streams of data.
Creating a StreamSet
Creating a StreamSet is relatively simple assuming you have a UUID for each stream that should be a member. In the future, other options may exist, such as providing collection or tag matching parameters.
import uuid

# UUIDs identifying the streams that should be members of the StreamSet
UUIDs = [
    uuid.UUID('0d22a53b-e2ef-4e0a-ab89-b2d48fb2592a'),
    uuid.UUID('17dbe387-89ea-42b6-864b-f505cdb483f5'),
    uuid.UUID('71466a91-dcfe-42ea-9e88-87c51f847942'),
    uuid.UUID('570aa71d-fb4f-456f-8533-2b11a28fa1f5')
]
streams = conn.streams(*UUIDs)
If you’ve already obtained a list of Stream objects, you may create a StreamSet directly by providing the list of streams for initialization.
from btrdb.stream import StreamSet
streams = StreamSet([stream1, stream2, stream3])
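If you need to obtain the Stream objects first, one way is to look each one up by UUID. The sketch below assumes the conn connection and UUIDs list from the earlier example.

# look up individual Stream objects by UUID, then group them
stream1 = conn.stream_from_uuid(UUIDs[0])
stream2 = conn.stream_from_uuid(UUIDs[1])
stream3 = conn.stream_from_uuid(UUIDs[2])
streams = StreamSet([stream1, stream2, stream3])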
Filtering
To apply query parameters to your request, use the filter method to supply a start or end argument. Keep in mind that filter returns a new object, so you can keep multiple filtered StreamSets in memory while you explore your data. The filter method may be called multiple times, but only the final values will be used when the request is fulfilled by the server.
from datetime import datetime

from btrdb.utils.timez import currently_as_ns, to_nanoseconds

streams = conn.streams(*UUIDs)
start = to_nanoseconds(datetime(2016, 1, 1, 0, 0, 0))
end = to_nanoseconds(datetime(2016, 1, 3, 12, 0, 0))

# replace instance with a filtered version from 1/1/2016 00:00:00 to
# 1/3/2016 12:00:00
streams = streams.filter(start=start, end=end)

# create a new instance with epoch as start and the current time as
# the end parameters
alt = streams.filter(start=0, end=currently_as_ns())
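Because only the final values are used, a later call to filter supersedes an earlier one. The sketch below, reusing the values from above, issues the same request as the alt example.

# only the last start and end are used, so this is equivalent to the
# alt request above
chained = streams.filter(start=start, end=end).filter(start=0, end=currently_as_ns())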
Aside from filtering results at query execution, you may also filter which streams are included in the new object. For instance, you may wish to create a new StreamSet containing only voltage streams or only streams from a specific collection.
To filter the available streams, you may provide a collection, name, or unit argument. If you provide a string, a case-insensitive exact match is used to select the desired streams. You may instead provide a compiled regular expression, which will be used with re.search to choose the streams to include.
import re

# select only voltage streams
voltage_streams = streams.filter(unit="volts")

# select only voltage or amperage streams using a regex pattern
other_streams = streams.filter(unit=re.compile("volts|amps"))
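The same pattern applies to the collection and name arguments. For example, the sketch below keeps only the streams in a hypothetical "sensors" collection.

# select only streams from a specific collection (hypothetical name)
sensor_streams = streams.filter(collection="sensors")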
Retrieving Data
There are three options available when you are ready to process the data from the server. All options fully materialize the data but organize it in different ways, so you can choose whichever is most convenient for you.
StreamSet.values()
Calling the values method will materialize the streams using the filtering parameters you specified. The data is returned to you as a list of lists, with one inner list containing the RawPoint objects for each stream.
This method aligns data by stream so you can easily deal with all of the data on a stream-by-stream basis. The following example shows a toy dataset consisting of 4 streams.
streams.values()
>> [[RawPoint(1500000000100000000, 2.0),
>> RawPoint(1500000000300000000, 4.0),
>> RawPoint(1500000000500000000, 6.0),
>> RawPoint(1500000000700000000, 8.0),
>> RawPoint(1500000000900000000, 10.0)],
>> [RawPoint(1500000000000000000, 1.0),
>> RawPoint(1500000000200000000, 3.0),
>> RawPoint(1500000000400000000, 5.0),
>> RawPoint(1500000000600000000, 7.0),
>> RawPoint(1500000000800000000, 9.0)],
>> [RawPoint(1500000000000000000, 1.0),
>> RawPoint(1500000000100000000, 2.0),
>> RawPoint(1500000000300000000, 4.0),
>> RawPoint(1500000000400000000, 5.0),
>> RawPoint(1500000000600000000, 7.0),
>> RawPoint(1500000000700000000, 8.0),
>> RawPoint(1500000000800000000, 9.0),
>> RawPoint(1500000000900000000, 10.0)],
>> [RawPoint(1500000000000000000, 1.0),
>> RawPoint(1500000000100000000, 2.0),
>> RawPoint(1500000000200000000, 3.0),
>> RawPoint(1500000000300000000, 4.0),
>> RawPoint(1500000000400000000, 5.0),
>> RawPoint(1500000000500000000, 6.0),
>> RawPoint(1500000000600000000, 7.0),
>> RawPoint(1500000000700000000, 8.0),
>> RawPoint(1500000000800000000, 9.0),
>> RawPoint(1500000000900000000, 10.0)]]
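Because the outer list preserves the order of the streams, you can process each stream's data independently. A minimal sketch of iterating over the materialized results:

# summarize each stream's points by position in the StreamSet
for i, points in enumerate(streams.values()):
    print(f"stream {i}: {len(points)} points, first value {points[0].value}")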
StreamSet.rows()
By contrast, the rows method aligns data by time rather than by stream. Each row contains the points for a specific time, with None used wherever a given stream does not contain a value at that time. Within each row, stream data is ordered according to the order of the initial UUIDs that were used when creating the StreamSet.
for row in streams.rows():
    print(row)
>> (None, RawPoint(1500000000000000000, 1.0), RawPoint(1500000000000000000, 1.0), RawPoint(1500000000000000000, 1.0))
>> (RawPoint(1500000000100000000, 2.0), None, RawPoint(1500000000100000000, 2.0), RawPoint(1500000000100000000, 2.0))
>> (None, RawPoint(1500000000200000000, 3.0), None, RawPoint(1500000000200000000, 3.0))
>> (RawPoint(1500000000300000000, 4.0), None, RawPoint(1500000000300000000, 4.0), RawPoint(1500000000300000000, 4.0))
>> (None, RawPoint(1500000000400000000, 5.0), RawPoint(1500000000400000000, 5.0), RawPoint(1500000000400000000, 5.0))
>> (RawPoint(1500000000500000000, 6.0), None, None, RawPoint(1500000000500000000, 6.0))
>> (None, RawPoint(1500000000600000000, 7.0), RawPoint(1500000000600000000, 7.0), RawPoint(1500000000600000000, 7.0))
>> (RawPoint(1500000000700000000, 8.0), None, RawPoint(1500000000700000000, 8.0), RawPoint(1500000000700000000, 8.0))
>> (None, RawPoint(1500000000800000000, 9.0), RawPoint(1500000000800000000, 9.0), RawPoint(1500000000800000000, 9.0))
>> (RawPoint(1500000000900000000, 10.0), None, RawPoint(1500000000900000000, 10.0), RawPoint(1500000000900000000, 10.0))
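Because every row shares a single timestamp, this layout is convenient for computations across streams. Below is a small sketch that averages whatever values are present at each time, skipping streams that have no point there.

# average the available values at each timestamp
for row in streams.rows():
    present = [point for point in row if point is not None]
    mean = sum(point.value for point in present) / len(present)
    print(present[0].time, mean)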
Transforming to Other Formats
A number of transformation features have been added so that you can work in the tools and APIs you are most comfortable and productive with. At the moment, we support the numpy and pandas libraries if you have them installed and available to be imported.
Keep in mind that calling these methods will materialize the requested data in memory. A few examples follow but please visit the API documentation to see the full list of transformation methods available.
# materialize data as a tuple of numpy arrays
conn.streams(*UUIDs).filter(start=start, end=end).to_array()
>> (array([RawPoint(1500000000100000000, 2.0),
>> RawPoint(1500000000300000000, 4.0),
>> RawPoint(1500000000500000000, 6.0),
>> RawPoint(1500000000700000000, 8.0),
>> RawPoint(1500000000900000000, 10.0)], dtype=object),
>> array([RawPoint(1500000000000000000, 1.0),
>> RawPoint(1500000000200000000, 3.0),
>> RawPoint(1500000000400000000, 5.0),
>> RawPoint(1500000000600000000, 7.0),
>> RawPoint(1500000000800000000, 9.0)], dtype=object),
>> ...
# materialize data as a list of pandas Series
conn.streams(*UUIDs).filter(start=start, end=end).to_series()
>> [1500000000100000000 2.0
>> 1500000000300000000 4.0
>> 1500000000500000000 6.0
>> 1500000000700000000 8.0
>> 1500000000900000000 10.0
>> dtype: float64,
>> 1500000000000000000 1.0
>> 1500000000200000000 3.0
>> 1500000000400000000 5.0
>> 1500000000600000000 7.0
>> 1500000000800000000 9.0
>> dtype: float64,
>> ...
# materialize data as a pandas DataFrame
conn.streams(*UUIDs).filter(start=start, end=end).to_dataframe()
>> time sensors/stream0 sensors/stream1
>> 0 1500000000000000000 NaN 1.0
>> 1 1500000000100000000 2.0 NaN
>> 2 1500000000200000000 NaN 3.0
>> 3 1500000000300000000 4.0 NaN
>> 4 1500000000400000000 NaN 5.0
>> 5 1500000000500000000 6.0 NaN
>> 6 1500000000600000000 NaN 7.0
>> 7 1500000000700000000 8.0 NaN
>> 8 1500000000800000000 NaN 9.0
>> 9 1500000000900000000 10.0 NaN
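The time column holds nanoseconds since the Unix epoch, as produced by to_nanoseconds above. As a convenience, the sketch below (assuming pandas is installed) converts it to human-readable timestamps after materializing the DataFrame.

import pandas as pd

# convert the nanosecond epoch times into pandas Timestamps
df = conn.streams(*UUIDs).filter(start=start, end=end).to_dataframe()
df["time"] = pd.to_datetime(df["time"], unit="ns")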
Serializing Data
If you would like to save your data to disk for later use or to import into another program, several options are available, with more planned for the future.
Most serialization methods save to disk; however, there is also a to_table method which produces a tabular view of your data as a string for display or printing. Some examples are shown below.
# export the data and save as CSV
streams.to_csv("export.csv")

# render the data as a table string
print(streams.to_table())
>> time                   sensors/stream0    sensors/stream1
>> -------------------  -----------------  -----------------
>> 1500000000000000000                                      1
>> 1500000000100000000                  2
>> 1500000000200000000                                      3
>> 1500000000300000000                  4
>> 1500000000400000000                                      5
>> 1500000000500000000                  6
>> 1500000000600000000                                      7
>> 1500000000700000000                  8
>> 1500000000800000000                                      9
>> 1500000000900000000                 10
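Since to_table returns an ordinary string, you can also write the rendered table to a text file yourself. A minimal sketch:

# save the rendered table to a plain-text file
with open("export.txt", "w") as f:
    f.write(streams.to_table())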