btrdb.stream#

Module for Stream and related classes

class btrdb.stream.Stream(btrdb, uuid, **db_values)#

An object that represents a specific time series stream in the BTrDB database.

Parameters:
  • btrdb (BTrDB) – A reference to the BTrDB object connecting this stream back to the physical server.

  • uuid (UUID) – The unique UUID identifier for this stream.

  • db_values (kwargs) – Framework-only initialization arguments. Not for developer use.

Attributes:
btrdb

Returns the stream’s BTrDB object.

collection

Returns the collection of the stream.

name

Returns the stream’s name which is parsed from the stream tags.

unit

Returns the stream’s unit which is parsed from the stream tags.

uuid

Returns the stream’s UUID.

Methods

aligned_windows(start, end, pointwidth[, ...])

Read statistical aggregates of windows of data from BTrDB.

annotations([refresh, auto_retry, retries, ...])

Returns a stream's annotations

arrow_aligned_windows(start, end, pointwidth)

Read statistical aggregates of windows of data from BTrDB.

arrow_insert(data[, merge])

Insert new data in the form of a pyarrow Table with (time, value) columns.

arrow_values(start, end[, version, ...])

Read raw values from BTrDB between time [a, b) in nanoseconds.

arrow_windows(start, end, width[, version, ...])

Read arbitrarily-sized windows of data from BTrDB.

count([start, end, pointwidth, precise, version])

Compute the total number of points in the stream

current([version, auto_retry, retries, ...])

Returns the point that is closest to the current timestamp, i.e. the latest point in the stream up until now.

delete(start, end[, auto_retry, retries, ...])

"Delete" all points between [start, end)

earliest([version, auto_retry, retries, ...])

Returns the first point of data in the stream.

exists()

Check if stream exists

flush([auto_retry, retries, retry_delay, ...])

Flush writes the stream buffers out to persistent storage.

insert(data[, merge])

Insert new data in the form (time, value) into the series.

latest([version, auto_retry, retries, ...])

Returns last point of data in the stream.

nearest(time, version[, backward, ...])

Finds the closest point in the stream to a specified time.

obliterate([auto_retry, retries, ...])

Obliterate deletes a stream from the BTrDB server.

refresh_metadata()

Refreshes the locally cached metadata for a stream from the server.

tags([refresh, auto_retry, retries, ...])

Returns the stream's tags.

update([tags, annotations, collection, ...])

Updates metadata including tags, annotations, and collection as an UPSERT operation.

values(start, end[, version, auto_retry, ...])

Read raw values from BTrDB between time [a, b) in nanoseconds.

version([auto_retry, retries, retry_delay, ...])

Returns the current data version of the stream.

windows(start, end, width[, depth, version, ...])

Read arbitrarily-sized windows of data from BTrDB.

aligned_windows(start, end, pointwidth, version=0, auto_retry=False, retries=5, retry_delay=3, retry_backoff=4)#

Read statistical aggregates of windows of data from BTrDB.

Query BTrDB for aggregates (or roll ups or windows) of the time series with version between time start (inclusive) and end (exclusive) in nanoseconds. Each point returned is a statistical aggregate of all the raw data within a window of width 2**pointwidth nanoseconds. These statistical aggregates currently include the mean, minimum, and maximum of the data and the count of data points composing the window.

start is inclusive, but end is exclusive. That is, results will be returned for all windows that start in the interval [start, end). If end < start+2^pointwidth you will not get any results. If start and end are not multiples of 2^pointwidth, the bottom pointwidth bits will be cleared, aligning them to 2^pointwidth boundaries. Each window will contain statistical summaries of its raw points. Statistical points with count == 0 will be omitted.

Parameters:
  • start (int or datetime like object) – The start time in nanoseconds for the range to be queried. (see btrdb.utils.timez.to_nanoseconds() for valid input types)

  • end (int or datetime like object) – The end time in nanoseconds for the range to be queried. (see btrdb.utils.timez.to_nanoseconds() for valid input types)

  • pointwidth (int) – Specify the width of each window as a power of two: each window spans 2**pointwidth nanoseconds

  • version (int) – Version of the stream to query

  • auto_retry (bool, default: False) – Whether to retry this request in the event of an error

  • retries (int, default: 5) – Number of times to retry this request if there is an error. Will be ignored if auto_retry is False

  • retry_delay (int, default: 3) – Initial time to wait before retrying function call if there is an error. Will be ignored if auto_retry is False

  • retry_backoff (int, default: 4) – Exponential factor by which the backoff increases between retries. Will be ignored if auto_retry is False

Returns:

Returns a tuple containing windows of data, where each window is a tuple containing a StatPoint and the stream version (tuple(tuple(StatPoint, int), ...)).

Return type:

tuple

Note

As the window-width is a power-of-two, it aligns with BTrDB internal tree data structure and is faster to execute than windows().
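
Examples

An illustrative sketch; the time range, pointwidth, and printed output below are examples only.

>>> start = 1500000000000000000
>>> end = 1500000002000000000
>>> for point, _ in stream.aligned_windows(start, end, pointwidth=30):
...     print(point.mean, point.count)
2.5 4
6.5 4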

annotations(refresh=False, auto_retry=False, retries=5, retry_delay=3, retry_backoff=4)#

Returns a stream’s annotations

Annotations returns the annotations of the stream (and the annotation version).

Do not modify the resulting map.

Parameters:
  • refresh (bool, default: False) – Indicates whether a round trip to the server should be implemented regardless of whether there is a local copy.

  • auto_retry (bool, default: False) – Whether to retry this request in the event of an error

  • retries (int, default: 5) – Number of times to retry this request if there is an error. Will be ignored if auto_retry is False

  • retry_delay (int, default: 3) – initial time to wait before retrying function call if there is an error. Will be ignored if auto_retry is False

  • retry_backoff (int, default: 4) – Exponential factor by which the backoff increases between retries. Will be ignored if auto_retry is False

Returns:

A tuple containing a dictionary of annotations and an integer representing the version of the metadata (tuple(dict, int)).

Return type:

tuple

Note

This version is not the same as the stream.version.

Examples

Accessing a stream's annotations.

>>> stream.annotations()
({"foo":"bar", "baz":"bazaar"}, 231)

Extract the version and metadata separately.

>>> annotations, metadata_version = stream.annotations()
>>> annotations
{"foo":"bar", "baz":"bazaar"}
>>> metadata_version
231
arrow_aligned_windows(start: int, end: int, pointwidth: int, version: int = 0, auto_retry=False, retries=5, retry_delay=3, retry_backoff=4) → Table#

Read statistical aggregates of windows of data from BTrDB.

Query BTrDB for aggregates (or roll ups or windows) of the time series with version between time start (inclusive) and end (exclusive) in nanoseconds [start, end). Each point returned is a statistical aggregate of all the raw data within a window of width 2**pointwidth nanoseconds. These statistical aggregates currently include the mean, minimum, and maximum of the data and the count of data points composing the window.

Note

start is inclusive, but end is exclusive. That is, results will be returned for all windows that start in the interval [start, end). If end < start+2^pointwidth you will not get any results. If start and end are not multiples of 2^pointwidth, the bottom pointwidth bits will be cleared, aligning them to 2^pointwidth boundaries. Each window will contain statistical summaries of its raw points. Statistical points with count == 0 will be omitted.

Parameters:
  • start (int or datetime like object, required) – The start time in nanoseconds for the range to be queried. (see btrdb.utils.timez.to_nanoseconds() for valid input types)

  • end (int or datetime like object, required) – The end time in nanoseconds for the range to be queried. (see btrdb.utils.timez.to_nanoseconds() for valid input types)

  • pointwidth (int, required) – Specify the width of each window as a power of two: each window spans 2**pointwidth nanoseconds

  • version (int, default: 0) – Version of the stream to query

  • auto_retry (bool, default: False) – Whether to retry this request in the event of an error

  • retries (int, default: 5) – Number of times to retry this request if there is an error. Will be ignored if auto_retry is False

  • retry_delay (int, default: 3) – Initial time to wait before retrying function call if there is an error. Will be ignored if auto_retry is False

  • retry_backoff (int, default: 4) – Exponential factor by which the backoff increases between retries. Will be ignored if auto_retry is False

Returns:

Returns a pyarrow table containing the windows of data.

Return type:

pyarrow.Table

Note

As the window-width is a power-of-two, it aligns with BTrDB internal tree data structure and is faster to execute than windows().

Note

This method is available for commercial customers with arrow-enabled servers.
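
Examples

A sketch of typical usage on an arrow-enabled server; the time range and pointwidth are illustrative.

>>> table = stream.arrow_aligned_windows(
...     start=1500000000000000000,
...     end=1500000002000000000,
...     pointwidth=30,
... )
>>> df = table.to_pandas()  # convert the pyarrow Table for analysis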

arrow_insert(data: Table, merge: str = 'never') → int#

Insert new data in the form of a pyarrow Table with (time, value) columns.

Inserts a table of new (time, value) columns into the stream. The values in the table need not be sorted by time. If the arrays are larger than appropriate, this function will automatically chunk the inserts. As a consequence, the insert is not necessarily atomic, but can be used with a very large array.

Parameters:
  • data (pyarrow.Table, required) – A pyarrow table with a schema of time:Timestamp[ns, tz=UTC], value:float64. This schema will be validated and converted if necessary.

  • merge (str) –

    A string describing the merge policy. Valid policies are:
    • 'never': the default, no points are merged

    • 'equal': points are deduplicated if the time and value are equal

    • 'retain': if two points have the same timestamp, the old one is kept

    • 'replace': if two points have the same timestamp, the new one is kept

Returns:

The version of the stream after inserting new points.

Return type:

int

Note

This method is available for commercial customers with arrow-enabled servers.

Examples

Assuming we have a sequence of times and values where times are in nanoseconds. Insert the data as a pyarrow table, and if there are duplicate timestamps already in the database, replace with the new ones in payload.

>>> conn = btrdb.connect()
>>> import pyarrow as pa
>>> for t, v in zip(times, vals):
...     print(t,v)
1500000000000000000 1.0
1500000000100000000 2.0
1500000000200000000 3.0
1500000000300000000 4.0
1500000000400000000 5.0
1500000000500000000 6.0
1500000000600000000 7.0
1500000000700000000 8.0
1500000000800000000 9.0
1500000000900000000 10.0
>>> schema = pa.schema(
... [
...    pa.field("time", pa.timestamp("ns", tz="UTC"), nullable=False),
...    pa.field("value", pa.float64(), nullable=False),
... ]
... )
>>> payload = pa.Table.from_arrays([times, vals], schema=schema)
>>> version = stream.arrow_insert(payload, merge="replace")
arrow_values(start, end, version: int = 0, auto_retry=False, retries=5, retry_delay=3, retry_backoff=4, schema=None) → Table#

Read raw values from BTrDB between time [a, b) in nanoseconds.

RawValues queries BTrDB for the raw time series data points between start and end time, both in nanoseconds since the Epoch for the specified stream version.

Parameters:
  • start (int or datetime like object) – The start time in nanoseconds for the range to be queried. (see btrdb.utils.timez.to_nanoseconds() for valid input types)

  • end (int or datetime like object) – The end time in nanoseconds for the range to be queried. (see btrdb.utils.timez.to_nanoseconds() for valid input types)

  • version (int, default: 0) – The version of the stream to be queried

  • schema (pyarrow.Schema, optional) – Optional arrow schema the server will cast the returned data to before sending it over the network. You can use this to change the timestamp format, column names or data sizes.

  • auto_retry (bool, default: False) – Whether to retry this request in the event of an error

  • retries (int, default: 5) – Number of times to retry this request if there is an error. Will be ignored if auto_retry is False

  • retry_delay (int, default: 3) – Initial time to wait before retrying function call if there is an error. Will be ignored if auto_retry is False

  • retry_backoff (int, default: 4) – Exponential factor by which the backoff increases between retries. Will be ignored if auto_retry is False

Returns:

A pyarrow table of the raw values with time and value columns.

Return type:

pyarrow.Table

Note

Note that the raw data points are the original values at the sensor’s native sampling rate (assuming the time series represents measurements from a sensor). This is the lowest level of data with the finest time granularity. In the tree data structure of BTrDB, this data is stored in the vector nodes.

Note

This method is available for commercial customers with arrow-enabled servers.
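
Examples

A sketch of typical usage on an arrow-enabled server; the time range is illustrative.

>>> table = stream.arrow_values(start=1500000000000000000, end=1500000001000000000)
>>> table.column_names
['time', 'value']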

arrow_windows(start: int, end: int, width: int, version: int = 0, auto_retry=False, retries=5, retry_delay=3, retry_backoff=4) → Table#

Read arbitrarily-sized windows of data from BTrDB.

Parameters:
  • start (int or datetime like object, required) – The start time in nanoseconds for the range to be queried. (see btrdb.utils.timez.to_nanoseconds() for valid input types)

  • end (int or datetime like object, required) – The end time in nanoseconds for the range to be queried. (see btrdb.utils.timez.to_nanoseconds() for valid input types)

  • width (int, required) – The number of nanoseconds in each window.

  • version (int, default: 0) – The version of the stream to query.

  • auto_retry (bool, default: False) – Whether to retry this request in the event of an error

  • retries (int, default: 5) – Number of times to retry this request if there is an error. Will be ignored if auto_retry is False

  • retry_delay (int, default: 3) – Initial time to wait before retrying function call if there is an error. Will be ignored if auto_retry is False

  • retry_backoff (int, default: 4) – Exponential factor by which the backoff increases between retries. Will be ignored if auto_retry is False

Returns:

Returns a pyarrow Table containing windows of data.

Return type:

pyarrow.Table

Note

windows returns arbitrary precision windows from BTrDB. It is slower than aligned_windows, but still significantly faster than RawValues. Each returned window will be width nanoseconds long. start is inclusive, but end is exclusive (e.g. if end < start+width you will get no results). That is, results will be returned for all windows that start at a time less than the end timestamp. If (end - start) is not a multiple of width, then end will be decreased to the greatest value less than end such that (end - start) is a multiple of width (i.e., we set end = start + width * floordiv(end - start, width)). The depth parameter previously available has been deprecated. The only valid value for depth is now 0.

Note

This method is available for commercial customers with arrow-enabled servers.
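
Examples

A sketch of typical usage on an arrow-enabled server; the time range and width are illustrative.

>>> table = stream.arrow_windows(
...     start=1500000000000000000,
...     end=1500000002000000000,
...     width=500000000,
... )
>>> df = table.to_pandas()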

property btrdb#

Returns the stream’s BTrDB object.

Parameters:

None

Returns:

The BTrDB database object.

Return type:

BTrDB

Examples

>>> import btrdb
>>> conn = btrdb.connect()
>>> stream = conn.stream_from_uuid("...")
>>> btrdb_obj = stream.btrdb
>>> btrdb_obj
<btrdb.conn.BTrDB object at 0x...>
property collection#

Returns the collection of the stream. It may require a round trip to the server depending on how the stream was acquired.

Parameters:

None

Returns:

the collection of the stream

Return type:

str

Examples

>>> import btrdb
>>> conn = btrdb.connect()
>>> stream = conn.stream_from_uuid("...")
>>> stream.collection
'foo/bar'
count(start=-1152921504606846976, end=3458764513820540927, pointwidth=62, precise=False, version=0)#

Compute the total number of points in the stream

Counts the number of points in the specified window and version. By default, returns the latest total count of points in the stream.

Parameters:
  • start (int or datetime like object, default: MINIMUM_TIME) – The start time in nanoseconds for the range to be queried. (see btrdb.utils.timez.to_nanoseconds() for valid input types)

  • end (int or datetime like object, default: MAXIMUM_TIME) – The end time in nanoseconds for the range to be queried. (see btrdb.utils.timez.to_nanoseconds() for valid input types)

  • pointwidth (int, default: 62) – Specify the window width as 2**pointwidth nanoseconds. If the value is too large for the time window, the next smallest appropriate pointwidth will be used.

  • precise (bool, default: False) – Forces the use of a windows query instead of aligned_windows to determine the exact count down to the nanosecond. This will be somewhat slower than the aligned_windows version.

  • version (int, default: 0) – Version of the stream to query

Returns:

The total number of points in the stream for the specified window.

Return type:

int

Note

This helper method sums the counts of all StatPoints returned by aligned_windows. Because of this, note that the start and end timestamps may be adjusted if they are not aligned to 2**pointwidth boundaries. For smaller windows of time, you may also need to adjust the pointwidth to ensure that the count granularity is captured appropriately.

Alternatively you can set the precise argument to True which will give an exact count to the nanosecond but may be slower to execute.

Examples

>>> import btrdb
>>> conn = btrdb.connect()
>>> stream = conn.stream_from_uuid("...")
>>> stream.count()
1234
>>> stream.count(start=1500000000000000000, end=1603680000000000000, pointwidth=55)
567
>>> stream.count(start=1500000000000000000, end=1603680000000000000, precise=True)
789
current(version=0, auto_retry=False, retries=5, retry_delay=3, retry_backoff=4)#

Returns the point that is closest to the current timestamp, i.e. the latest point in the stream up until now. Note that no future values will be returned. Returns None if errors occur during lookup or there are no values before now.

Parameters:
  • version (int, default: 0) – Specify the version of the stream to query; if zero, queries the latest stream state rather than pinning to a version.

  • auto_retry (bool, default: False) – Whether to retry this request in the event of an error

  • retries (int, default: 5) – Number of times to retry this request if there is an error. Will be ignored if auto_retry is False

  • retry_delay (int, default: 3) – Initial time to wait before retrying function call if there is an error. Will be ignored if auto_retry is False

  • retry_backoff (int, default: 4) – Exponential factor by which the backoff increases between retries. Will be ignored if auto_retry is False

Returns:

The last data point in the stream up until now and the version of the stream the value was retrieved at (tuple(RawPoint, int)).

Return type:

tuple
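
Examples

An illustrative sketch; the point and version shown are examples only.

>>> point, version = stream.current()
>>> print(point.time, point.value)
1547241923338098176 123.7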

delete(start, end, auto_retry=False, retries=5, retry_delay=3, retry_backoff=4)#

“Delete” all points between [start, end)

“Delete” all points between start (inclusive) and end (exclusive), both in nanoseconds.

Note

As BTrDB has persistent multiversioning, the deleted points will still exist as part of an older version of the stream.

Parameters:
  • start (int or datetime like object) – The start time in nanoseconds for the range to be deleted. (see btrdb.utils.timez.to_nanoseconds() for valid input types)

  • end (int or datetime like object) – The end time in nanoseconds for the range to be deleted. (see btrdb.utils.timez.to_nanoseconds() for valid input types)

  • auto_retry (bool, default: False) – Whether to retry this request in the event of an error

  • retries (int, default: 5) – Number of times to retry this request if there is an error. Will be ignored if auto_retry is False

  • retry_delay (int, default: 3) – Initial time to wait before retrying function call if there is an error. Will be ignored if auto_retry is False

  • retry_backoff (int, default: 4) – Exponential factor by which the backoff increases between retries. Will be ignored if auto_retry is False

Returns:

The version of the new stream created

Return type:

int

Examples

>>> import btrdb
>>> conn = btrdb.connect()
>>> stream = conn.stream_from_uuid("...")
>>> start = 1500000000000000000
>>> end = 1500000001000000000
>>> stream.delete(start, end)
1234
>>> stream.count(start=start, end=end)
0
earliest(version=0, auto_retry=False, retries=5, retry_delay=3, retry_backoff=4)#

Returns the first point of data in the stream. Returns None if error encountered during lookup or no values in stream.

Parameters:
  • version (int, default: 0) – Specify the version of the stream to query; if zero, queries the latest stream state rather than pinning to a version.

  • auto_retry (bool, default: False) – Whether to retry this request in the event of an error

  • retries (int, default: 5) – Number of times to retry this request if there is an error. Will be ignored if auto_retry is False

  • retry_delay (int, default: 3) – Initial time to wait before retrying function call if there is an error. Will be ignored if auto_retry is False

  • retry_backoff (int, default: 4) – Exponential factor by which the backoff increases between retries. Will be ignored if auto_retry is False

Returns:

The first data point in the stream and the version of the stream the value was retrieved at (tuple(RawPoint, int)).

Return type:

tuple

Examples

Get the earliest point for a stream using version 0.

>>> stream.earliest(version=0)
(<btrdb.point.RawPoint at 0x...>, 1234567)

Extract just the RawPoint data.

>>> pt, _ = stream.earliest(version=0)
>>> print(pt.time, pt.value)
1547241923338098176 123.7
exists()#

Check if stream exists

Exists returns True if the stream exists. This is essential after using stream_from_uuid, as the stream may not exist and later stream operations would raise a 404 error. Any operation that returns a stream from collection and tags will have already ensured the stream exists.

Parameters:

None

Returns:

Indicates whether stream is extant in the BTrDB server.

Return type:

bool

Examples

>>> import btrdb
>>> conn = btrdb.connect()
>>> stream = conn.stream_from_uuid("...")
>>> stream.uuid
UUID('...')
>>> stream.exists()
True
flush(auto_retry=False, retries=5, retry_delay=3, retry_backoff=4)#

Flush writes the stream buffers out to persistent storage.

Parameters:
  • auto_retry (bool, default: False) – Whether to retry this request in the event of an error

  • retries (int, default: 5) – Number of times to retry this request if there is an error. Will be ignored if auto_retry is False

  • retry_delay (int, default: 3) – Initial time to wait before retrying function call if there is an error. Will be ignored if auto_retry is False

  • retry_backoff (int, default: 4) – Exponential factor by which the backoff increases between retries. Will be ignored if auto_retry is False
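
Examples

A minimal sketch: insert data, then flush the stream buffers to persistent storage. The version shown is illustrative.

>>> stream.insert([(1500000000000000000, 1.0)])
1234
>>> stream.flush()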

insert(data, merge='never')#

Insert new data in the form (time, value) into the series.

Inserts a list of new (time, value) tuples into the series. The tuples in the list need not be sorted by time. If the arrays are larger than appropriate, this function will automatically chunk the inserts. As a consequence, the insert is not necessarily atomic, but can be used with a very large array.

Parameters:
  • data (list[tuple[int, float]]) – A list of tuples in which each tuple contains a time (int) and value (float) for insertion to the database

  • merge (str) –

    A string describing the merge policy. Valid policies are:
    • 'never': the default, no points are merged

    • 'equal': points are deduplicated if the time and value are equal

    • 'retain': if two points have the same timestamp, the old one is kept

    • 'replace': if two points have the same timestamp, the new one is kept

Returns:

The version of the stream after inserting new points.

Return type:

int

Examples

>>> import btrdb
>>> conn = btrdb.connect()
>>> stream = conn.stream_from_uuid("...")
>>> data = [(1500000000000000000, 1.0), (1500000000100000000, 2.0)]
>>> stream.insert(data)
1234
>>> stream.insert(data, merge="replace")
1235
latest(version=0, auto_retry=False, retries=5, retry_delay=3, retry_backoff=4)#

Returns last point of data in the stream. Returns None if error encountered during lookup or no values in stream.

Parameters:
  • version (int, default: 0) – Specify the version of the stream to query; if zero, queries the latest stream state rather than pinning to a version.

  • auto_retry (bool, default: False) – Whether to retry this request in the event of an error

  • retries (int, default: 5) – Number of times to retry this request if there is an error. Will be ignored if auto_retry is False

  • retry_delay (int, default: 3) – Initial time to wait before retrying function call if there is an error. Will be ignored if auto_retry is False

  • retry_backoff (int, default: 4) – Exponential factor by which the backoff increases between retries. Will be ignored if auto_retry is False

Returns:

The last data point in the stream and the version of the stream the value was retrieved at (tuple(RawPoint, int)).

Return type:

tuple

Examples

Get the latest point for a stream using version 0.

>>> stream.latest(version=0)
(<btrdb.point.RawPoint at 0x...>, 1234567)

Extract just the RawPoint data.

>>> pt, _ = stream.latest(version=0)
>>> print(pt.time, pt.value)
1547241923338098176 123.7
property name#

Returns the stream’s name which is parsed from the stream tags. This may require a round trip to the server depending on how the stream was acquired.

Returns:

The name of the stream.

Return type:

str

Examples

>>> import btrdb
>>> conn = btrdb.connect()
>>> stream = conn.stream_from_uuid("...")
>>> stream.name
'foo'
nearest(time, version, backward=False, auto_retry=False, retries=5, retry_delay=3, retry_backoff=4)#

Finds the closest point in the stream to a specified time.

Return the point nearest to the specified time in nanoseconds since Epoch in the stream with version while specifying whether to search forward or backward in time. If backward is false, the returned point will be >= time. If backward is true, the returned point will be < time. The version of the stream used to satisfy the query is returned.

Parameters:
  • time (int or datetime like object) – The time (in nanoseconds since Epoch) to search near (see btrdb.utils.timez.to_nanoseconds() for valid input types)

  • version (int) – Version of the stream to use in search

  • backward (bool) – True to search backward from time, False to search forward

  • auto_retry (bool, default: False) – Whether to retry this request in the event of an error

  • retries (int, default: 5) – Number of times to retry this request if there is an error. Will be ignored if auto_retry is False

  • retry_delay (int, default: 3) – Initial time to wait before retrying function call if there is an error. Will be ignored if auto_retry is False

  • retry_backoff (int, default: 4) – Exponential factor by which the backoff increases between retries. Will be ignored if auto_retry is False

Returns:

The closest data point in the stream and the version of the stream the value was retrieved at (tuple(RawPoint, int)).

Return type:

tuple
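
Examples

An illustrative sketch searching backward from a timestamp; the returned point and version are examples only.

>>> stream.nearest(1500000000000000000, version=0, backward=True)
(<btrdb.point.RawPoint at 0x...>, 1234567)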

obliterate(auto_retry=False, retries=5, retry_delay=3, retry_backoff=4)#

Obliterate deletes a stream from the BTrDB server. An exception will be raised if the stream could not be found.

Parameters:
  • auto_retry (bool, default: False) – Whether to retry this request in the event of an error

  • retries (int, default: 5) – Number of times to retry this request if there is an error. Will be ignored if auto_retry is False

  • retry_delay (int, default: 3) – Initial time to wait before retrying function call if there is an error. Will be ignored if auto_retry is False

  • retry_backoff (int, default: 4) – Exponential factor by which the backoff increases between retries. Will be ignored if auto_retry is False

Raises:

BTrDBError [404] stream does not exist – The stream could not be found in BTrDB
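
Examples

A sketch of permanently removing a stream; assumes the stream exists, since otherwise an exception is raised.

>>> stream.obliterate()
>>> stream.exists()
False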

refresh_metadata()#

Refreshes the locally cached metadata for a stream from the server.

Queries the BTrDB server for all stream metadata including collection, annotation, and tags. This method requires a round trip to the server.

tags(refresh=False, auto_retry=False, retries=5, retry_delay=3, retry_backoff=4)#

Returns the stream’s tags.

Tags returns the tags of the stream. It may require a round trip to the server depending on how the stream was acquired.

Parameters:
  • refresh (bool, default: False) – Indicates whether a round trip to the server should be implemented regardless of whether there is a local copy.

  • auto_retry (bool, default: False) – Whether to retry this request in the event of an error

  • retries (int, default: 5) – Number of times to retry this request if there is an error. Will be ignored if auto_retry is False

  • retry_delay (int, default: 3) – Initial time to wait before retrying function call if there is an error. Will be ignored if auto_retry is False

  • retry_backoff (int, default: 4) – Exponential factor by which the backoff increases between retries. Will be ignored if auto_retry is False

Returns:

A dictionary containing the tags.

Return type:

dict
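
Examples

An illustrative sketch; the tag keys and values shown are examples only.

>>> stream.tags()
{'name': 'foo', 'unit': 'volts'}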

property unit#

Returns the stream’s unit which is parsed from the stream tags. This may require a round trip to the server depending on how the stream was acquired.

Returns:

The unit for values of the stream.

Return type:

str

Examples

>>> import btrdb
>>> conn = btrdb.connect()
>>> stream = conn.stream_from_uuid("...")
>>> stream.unit
'volts'
update(tags=None, annotations=None, collection=None, encoder=<class 'btrdb.utils.conversion.AnnotationEncoder'>, replace=False, auto_retry=False, retries=5, retry_delay=3, retry_backoff=4)#

Updates metadata including tags, annotations, and collection as an UPSERT operation.

By default, the update will only affect the keys and values in the specified tags and annotations dictionaries, inserting them if they don’t exist, or updating the value for the key if it does exist. If any of the update arguments (i.e. tags, annotations, collection) are None, they will remain unchanged in the database.

To delete a tag or annotation key, set replace=True and specify exactly which keys and values you want the field to keep, omitting the key to be deleted.

This ensures that all the remaining keys and values are preserved while the omitted key is removed.

Parameters:
  • tags (dict, optional) – Specify the tag key/value pairs as a dictionary to upsert or replace. If None, the tags will remain unchanged in the database.

  • annotations (dict, optional) – Specify the annotations key/value pairs as a dictionary to upsert or replace. If None, the annotations will remain unchanged in the database.

  • collection (str, optional) – Specify a new collection for the stream. If None, the collection will remain unchanged.

  • encoder (json.JSONEncoder or None) – JSON encoder class to use for annotation serialization. Set to None to prevent JSON encoding of the annotations.

  • replace (bool, default: False) – Replace all annotations or tags with the specified dictionaries instead of performing the normal upsert operation. Specifying True is the only way to remove annotation keys.

  • auto_retry (bool, default: False) – Whether to retry this request in the event of an error

  • retries (int, default: 5) – Number of times to retry this request if there is an error. Will be ignored if auto_retry is False

  • retry_delay (int, default: 3) – Initial time to wait before retrying function call if there is an error. Will be ignored if auto_retry is False

  • retry_backoff (int, default: 4) – Exponential factor by which the backoff increases between retries. Will be ignored if auto_retry is False

Returns:

The version of the metadata (separate from the version of the data) also known as the “property version”.

Return type:

int

Examples

>>> annotations, _ = stream.annotations()
>>> del annotations["key_to_delete"]
>>> stream.update(annotations=annotations, replace=True)
12345
>>> annotations, _ = stream.annotations()
>>> "key_to_delete" in annotations
False
property uuid#

Returns the stream’s UUID. The stream may or may not exist yet, depending on how the stream object was obtained.

Returns:

The unique identifier of the stream.

Return type:

UUID

See also

stream.exists

values(start, end, version=0, auto_retry=False, retries=5, retry_delay=3, retry_backoff=4)#

Read raw values from BTrDB between time [a, b) in nanoseconds.

RawValues queries BTrDB for the raw time series data points between start and end time, both in nanoseconds since the Epoch for the specified stream version.

Parameters:
  • start (int or datetime like object) – The start time in nanoseconds for the range to be queried. (see btrdb.utils.timez.to_nanoseconds() for valid input types)

  • end (int or datetime like object) – The end time in nanoseconds for the range to be queried. (see btrdb.utils.timez.to_nanoseconds() for valid input types)

  • version (int) – The version of the stream to be queried

  • auto_retry (bool, default: False) – Whether to retry this request in the event of an error

  • retries (int, default: 5) – Number of times to retry this request if there is an error. Will be ignored if auto_retry is False

  • retry_delay (int, default: 3) – Initial time to wait before retrying function call if there is an error. Will be ignored if auto_retry is False

  • retry_backoff (int, default: 4) – Exponential factor by which the backoff increases between retries. Will be ignored if auto_retry is False

Returns:

Returns a list of tuples containing a RawPoint and the stream version (list(tuple(RawPoint,int))).

Return type:

list

Note

Note that the raw data points are the original values at the sensor’s native sampling rate (assuming the time series represents measurements from a sensor). This is the lowest level of data with the finest time granularity. In the tree data structure of BTrDB, this data is stored in the vector nodes.
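
Examples

An illustrative sketch; the time range and output values are examples only.

>>> points = stream.values(start=1500000000000000000, end=1500000000300000000)
>>> for point, _ in points:
...     print(point.time, point.value)
1500000000000000000 1.0
1500000000100000000 2.0
1500000000200000000 3.0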

version(auto_retry=False, retries=5, retry_delay=3, retry_backoff=4)#

Returns the current data version of the stream.

Warning

Version returns the current data version of the stream. This is not cached, it queries each time. Take care that you do not introduce races in your code by assuming this function will always return the same value.

Parameters:
  • auto_retry (bool, default: False) – Whether to retry this request in the event of an error

  • retries (int, default: 5) – Number of times to retry this request if there is an error. Will be ignored if auto_retry is False

  • retry_delay (int, default: 3) – Initial time to wait before retrying function call if there is an error. Will be ignored if auto_retry is False

  • retry_backoff (int, default: 4) – Exponential factor by which the backoff increases between retries. Will be ignored if auto_retry is False

Returns:

The version of the stream.

Return type:

int
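
Examples

An illustrative sketch; the version returned depends on the stream's history.

>>> stream.version()
1234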

windows(start, end, width, depth=0, version=0, auto_retry=False, retries=5, retry_delay=3, retry_backoff=4)#

Read arbitrarily-sized windows of data from BTrDB. StatPoint objects will be returned representing the data for each window.

Parameters:
  • start (int or datetime like object) – The start time in nanoseconds for the range to be queried. (see btrdb.utils.timez.to_nanoseconds() for valid input types)

  • end (int or datetime like object) – The end time in nanoseconds for the range to be queried. (see btrdb.utils.timez.to_nanoseconds() for valid input types)

  • width (int) – The number of nanoseconds in each window.

  • version (int) – The version of the stream to query.

  • auto_retry (bool, default: False) – Whether to retry this request in the event of an error

  • retries (int, default: 5) – Number of times to retry this request if there is an error. Will be ignored if auto_retry is False

  • retry_delay (int, default: 3) – Initial time to wait before retrying function call if there is an error. Will be ignored if auto_retry is False

  • retry_backoff (int, default: 4) – Exponential factor by which the backoff increases between retries. Will be ignored if auto_retry is False

Returns:

Returns a tuple containing windows of data, where each window is a tuple containing a StatPoint and the stream version (tuple(tuple(StatPoint, int), …)).

Return type:

tuple

Note

windows returns arbitrary precision windows from BTrDB. It is slower than aligned_windows, but can be significantly faster than raw value queries (values). Each returned window will be width nanoseconds long. start is inclusive, but end is exclusive (e.g. if end < start+width you will get no results). That is, results will be returned for all windows that start at a time less than the end timestamp. If (end - start) is not a multiple of width, then end will be decreased to the greatest value less than end such that (end - start) is a multiple of width (i.e., we set end = start + width * floordiv(end - start, width)). The depth parameter previously available has been deprecated. The only valid value for depth is now 0.
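
Examples

An illustrative sketch using the ten sample points (values 1.0 through 10.0 at 0.1 second spacing) from earlier examples; the output assumes those points.

>>> start = 1500000000000000000
>>> end = 1500000001000000000
>>> for point, _ in stream.windows(start, end, width=500000000):
...     print(point.mean, point.count)
3.0 5
8.0 5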

class btrdb.stream.StreamSet(streams: List[Stream])#

Bases: StreamSetBase, StreamSetTransformer

Public class for a collection of streams

Attributes:
allow_window

Methods

aligned_windows(pointwidth)

Stores the request for an aligned windowing operation when the query is eventually materialized.

arrow_insert(data_map[, merge])

Insert new data in the form (time, value) into their mapped streams using pyarrow tables.

arrow_to_arrow_table()

Return a pyarrow table of data.

arrow_to_dataframe([agg, name_callable])

Returns a Pandas DataFrame object indexed by time and using the values of a stream for each column.

arrow_to_dict([agg, name_callable])

Returns a list of dicts for each time code with the appropriate stream data attached.

arrow_to_numpy([agg])

Return a multidimensional array in the numpy format.

arrow_to_polars([agg, name_callable])

Returns a Polars DataFrame object with time as a column and the values of a stream for each additional column from an arrow table.

arrow_to_series([agg, name_callable])

Returns a list of Pandas Series objects indexed by time

arrow_values()

Return a pyarrow table of stream values based on the streamset parameters.

clone()

Returns a deep copy of the object.

count([precise])

Compute the total number of points in the streams using filters.

current()

Returns the points of data in the streams closest to the current timestamp.

earliest()

Returns earliest points of data in streams using available filters.

filter([start, end, collection, name, unit, ...])

Provides a new StreamSet instance containing stored query parameters and stream objects that match filtering criteria.

index(value, [start, [stop]])

Raises ValueError if the value is not present.

insert(data_map[, merge])

Insert new data in the form (time, value) into their mapped streams.

latest()

Returns latest points of data in the streams using available filters.

pin_versions([versions])

Saves the stream versions that future materializations should use.

rows()

Returns a materialized list of tuples where each tuple contains the points from each stream at a unique time.

to_array([agg])

Returns a multidimensional numpy array (similar to a list of lists) containing point classes.

to_csv(fobj[, dialect, fieldnames, agg, ...])

Saves stream data as a CSV file.

to_dataframe([agg, name_callable])

Returns a Pandas DataFrame object indexed by time and using the values of a stream for each column.

to_dict([agg, name_callable])

Returns a list of OrderedDict for each time code with the appropriate stream data attached.

to_polars([agg, name_callable])

Returns a Polars DataFrame object with time as a column and the values of a stream for each additional column.

to_series([datetime64_index, agg, name_callable])

Returns a list of Pandas Series objects indexed by time

to_table([agg, name_callable])

Returns string representation of the data in tabular form using the tabulate library.

values()

Returns a fully materialized list of lists for the stream values/points

values_iter()

Returns a context manager object that closes the server cursor on __exit__.

versions()

Returns a dict of the stream versions.

windows(width[, depth])

Stores the request for a windowing operation when the query is eventually materialized.

aligned_windows(pointwidth)#

Stores the request for an aligned windowing operation when the query is eventually materialized.

Parameters:

pointwidth (int) – The length of each returned window as computed by 2^pointwidth.

Returns:

Returns self

Return type:

StreamSet

Note

aligned_windows reads power-of-two aligned windows from BTrDB. It is faster than windows(). Each returned window will be 2^pointwidth nanoseconds long, beginning at start. Note that start is inclusive, but end is exclusive. That is, results will be returned for all windows that start in the interval [start, end). If end < start + 2^pointwidth you will not get any results. If start and end are not multiples of 2^pointwidth, the bottom pointwidth bits will be cleared, aligning them to 2^pointwidth boundaries. Each window will contain statistical summaries of its raw points. Statistical points with count == 0 will be omitted.

Examples

>>> import btrdb
>>> conn = btrdb.connect()
>>> stream1 = conn.stream_from_uuid("...")
>>> stream2 = conn.stream_from_uuid("...")
>>> streamset = btrdb.stream.StreamSet([stream1, stream2])
>>> streamset.aligned_windows(pointwidth=30)
<StreamSet ...>
>>> streamset.windows(width=1000000000)
Traceback (most recent call last):
  ...
btrdb.exceptions.InvalidOperation: A window operation is already requested
arrow_insert(data_map: dict, merge: str = 'never') → dict#

Insert new data in the form (time, value) into their mapped streams using pyarrow tables.

The times in the arrow table need not be sorted by time. If the point counts are larger than appropriate, this function will automatically chunk the inserts. As a consequence, the insert is not necessarily atomic, but can be used with a very large array.

Parameters:
  • data_map (dict[uuid, pyarrow.Table]) – A dictionary keyed on stream uuids and mapped to pyarrow tables with a schema of time:Timestamp[ns, tz=UTC], value:float64. This schema will be validated and converted if necessary.

  • merge (str) –

    A string describing the merge policy. Valid policies are:
    • 'never': the default, no points are merged

    • 'equal': points are deduplicated if the time and value are equal

    • 'retain': if two points have the same timestamp, the old one is kept

    • 'replace': if two points have the same timestamp, the new one is kept

Notes

BTrDB expects datetimes to be in UTC+0.

This method is available for commercial customers with arrow-enabled servers.

Returns:

The versions of the stream after inserting new points.

Return type:

dict[uuid, int]
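
Examples

A sketch assuming two streams and pre-built pyarrow tables t1 and t2 matching the schema described above; the returned versions are illustrative.

>>> # t1, t2: pyarrow Tables with time/value columns (see schema above)
>>> data_map = {stream1.uuid: t1, stream2.uuid: t2}
>>> streamset.arrow_insert(data_map, merge="never")
{UUID('...'): 1234, UUID('...'): 5678}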

arrow_to_arrow_table()#

Return a pyarrow table of data.

Note

This method is available for commercial customers with arrow-enabled servers.

arrow_to_dataframe(agg=None, name_callable=None) → DataFrame#

Returns a Pandas DataFrame object indexed by time and using the values of a stream for each column.

Parameters:
  • agg (List[str], default: ["mean"]) – Specify the StatPoint fields (e.g. aggregating function) to create the dataframe from. Must be one or more of “min”, “mean”, “max”, “count”, “stddev”, or “all”. This argument is ignored if not using StatPoints.

  • name_callable (lambda, default: lambda s: s.collection + "/" + s.name) – Specify a callable that can be used to determine the series name given a Stream object.

Note

This method is available for commercial customers with arrow-enabled servers.

Examples

>>> conn = btrdb.connect()
>>> s1 = conn.stream_from_uuid('c9fd8735-5ec5-4141-9a51-d23e1b2dfa42')
>>> s2 = conn.stream_from_uuid('9173fa70-87ab-4ac8-ac08-4fd63b910cae')
>>> streamset = btrdb.stream.StreamSet([s1,s2])
>>> streamset.filter(start=1500000000000000000, end=1500000000900000001).arrow_to_dataframe()
                                        new/stream/collection/foo       new/stream/bar
    time
    2017-07-14 02:40:00+00:00                               1.0             1.0
    2017-07-14 02:40:00.100000+00:00                        2.0             2.0
    2017-07-14 02:40:00.200000+00:00                        3.0             3.0
    2017-07-14 02:40:00.300000+00:00                        4.0             4.0
    2017-07-14 02:40:00.400000+00:00                        5.0             5.0
    2017-07-14 02:40:00.500000+00:00                        6.0             6.0
    2017-07-14 02:40:00.600000+00:00                        7.0             7.0
    2017-07-14 02:40:00.700000+00:00                        8.0             8.0
    2017-07-14 02:40:00.800000+00:00                        9.0             9.0
    2017-07-14 02:40:00.900000+00:00                       10.0            10.0

Use the stream uuids as their column names instead, using a lambda function.

>>> streamset.filter(start=1500000000000000000, end=1500000000900000001)
...          .arrow_to_dataframe(
...             name_callable=lambda s: str(s.uuid)
...          )
                                  c9fd8735-5ec5-4141-9a51-d23e1b2dfa42  9173fa70-87ab-4ac8-ac08-4fd63b910cae
    time
    2017-07-14 02:40:00+00:00                                          1.0                                   1.0
    2017-07-14 02:40:00.100000+00:00                                   2.0                                   2.0
    2017-07-14 02:40:00.200000+00:00                                   3.0                                   3.0
    2017-07-14 02:40:00.300000+00:00                                   4.0                                   4.0
    2017-07-14 02:40:00.400000+00:00                                   5.0                                   5.0
    2017-07-14 02:40:00.500000+00:00                                   6.0                                   6.0
    2017-07-14 02:40:00.600000+00:00                                   7.0                                   7.0
    2017-07-14 02:40:00.700000+00:00                                   8.0                                   8.0
    2017-07-14 02:40:00.800000+00:00                                   9.0                                   9.0
    2017-07-14 02:40:00.900000+00:00                                  10.0                                  10.0

A window query, with a window width of 0.4 seconds, and only showing the mean statpoint.

>>> streamset.filter(start=1500000000000000000, end=1500000000900000001)
...          .windows(width=int(0.4*10**9))
...          .arrow_to_dataframe(agg=["mean"])
                                            new/stream/collection/foo/mean  new/stream/bar/mean
    time
    2017-07-14 02:40:00+00:00                                    2.5                  2.5
    2017-07-14 02:40:00.400000+00:00                             6.5                  6.5

A window query, with a window width of 0.4 seconds, and only showing the mean and count statpoints.

>>> streamset.filter(start=1500000000000000000, end=1500000000900000001)
...          .windows(width=int(0.4*10**9))
...          .arrow_to_dataframe(agg=["mean", "count"])
                              new/stream/collection/foo/mean  new/stream/collection/foo/count  new/stream/bar/mean  new/stream/bar/count
    time
    2017-07-14 02:40:00+00:00               2.5                                4                  2.5                     4
    2017-07-14 02:40:00.400000+00:00        6.5                                4                  6.5                     4
arrow_to_dict(agg=None, name_callable=None)#

Returns a list of dicts for each time code with the appropriate stream data attached.

Parameters:
  • agg (List[str], default: ["mean"]) – Specify the StatPoint field or fields (e.g. aggregating function) to constrain dict keys. Must be one or more of “min”, “mean”, “max”, “count”, or “stddev”. This argument is ignored if RawPoint values are passed into the function.

  • name_callable (lambda, default: lambda s: s.collection + "/" + s.name) – Specify a callable that can be used to determine the series name given a Stream object.

Note

This method is available for commercial customers with arrow-enabled servers.

arrow_to_numpy(agg=None)#

Return a multidimensional array in the numpy format.

Parameters:

agg (List[str], default: ["mean"]) – Specify the StatPoint field or fields (e.g. aggregating function) to return for the arrays. Must be one or more of “min”, “mean”, “max”, “count”, or “stddev”. This argument is ignored if RawPoint values are passed into the function.

Note

This method first converts to a pandas data frame then to a numpy array.

Note

This method is available for commercial customers with arrow-enabled servers.

arrow_to_polars(agg=None, name_callable=None)#

Returns a Polars DataFrame object with time as a column and the values of a stream for each additional column from an arrow table.

Parameters:
  • agg (List[str], default: ["mean"]) – Specify the StatPoint field or fields (e.g. aggregating function) to create the dataframe from. Must be one or multiple of “min”, “mean”, “max”, “count”, “stddev”, or “all”. This argument is ignored if not using StatPoints.

  • name_callable (lambda, default: lambda s: s.collection + "/" + s.name) – Specify a callable that can be used to determine the series name given a Stream object.

Note

This method is available for commercial customers with arrow-enabled servers.

arrow_to_series(agg=None, name_callable=None)#

Returns a list of Pandas Series objects indexed by time

Parameters:
  • agg (List[str], default: ["mean"]) – Specify the StatPoint field or fields (e.g. aggregating function) to create the Series from. Must be one or more of “min”, “mean”, “max”, “count”, or “stddev”. This argument is ignored if RawPoint values are passed into the function.

  • name_callable (lambda, default: lambda s: s.collection + "/" + s.name) – Specify a callable that can be used to determine the series name given a Stream object.

Return type:

List[pandas.Series]

Note

This method is available for commercial customers with arrow-enabled servers.

Note

If you are not performing a window or aligned_window query, the agg parameter will be ignored.

Examples

Return a list of series of raw data per stream.

>>> conn = btrdb.connect()
>>> s1 = conn.stream_from_uuid('c9fd8735-5ec5-4141-9a51-d23e1b2dfa42')
>>> s2 = conn.stream_from_uuid('9173fa70-87ab-4ac8-ac08-4fd63b910cae')
>>> streamset = btrdb.stream.StreamSet([s1,s2])
>>> streamset.filter(start=1500000000000000000, end=1500000000900000001).arrow_to_series(agg=None)
    [time
    2017-07-14 02:40:00+00:00            1.0
    2017-07-14 02:40:00.100000+00:00     2.0
    2017-07-14 02:40:00.200000+00:00     3.0
    2017-07-14 02:40:00.300000+00:00     4.0
    2017-07-14 02:40:00.400000+00:00     5.0
    2017-07-14 02:40:00.500000+00:00     6.0
    2017-07-14 02:40:00.600000+00:00     7.0
    2017-07-14 02:40:00.700000+00:00     8.0
    2017-07-14 02:40:00.800000+00:00     9.0
    2017-07-14 02:40:00.900000+00:00    10.0
    Name: new/stream/collection/foo, dtype: double[pyarrow],
    time
    2017-07-14 02:40:00+00:00            1.0
    2017-07-14 02:40:00.100000+00:00     2.0
    2017-07-14 02:40:00.200000+00:00     3.0
    2017-07-14 02:40:00.300000+00:00     4.0
    2017-07-14 02:40:00.400000+00:00     5.0
    2017-07-14 02:40:00.500000+00:00     6.0
    2017-07-14 02:40:00.600000+00:00     7.0
    2017-07-14 02:40:00.700000+00:00     8.0
    2017-07-14 02:40:00.800000+00:00     9.0
    2017-07-14 02:40:00.900000+00:00    10.0
    Name: new/stream/bar, dtype: double[pyarrow]]

A window query with a window width of 0.4 seconds.

>>> streamset.filter(start=1500000000000000000, end=1500000000900000001)
...          .windows(width=int(0.4 * 10**9))
...          .arrow_to_series(agg=["mean", "count"])
    [time
    2017-07-14 02:40:00+00:00           2.5
    2017-07-14 02:40:00.400000+00:00    6.5
    Name: new/stream/collection/foo/mean, dtype: double[pyarrow],
...
    time
    2017-07-14 02:40:00+00:00           4
    2017-07-14 02:40:00.400000+00:00    4
    Name: new/stream/collection/foo/count, dtype: uint64[pyarrow],
...
    time
    2017-07-14 02:40:00+00:00           2.5
    2017-07-14 02:40:00.400000+00:00    6.5
    Name: new/stream/bar/mean, dtype: double[pyarrow],
...
    time
    2017-07-14 02:40:00+00:00           4
    2017-07-14 02:40:00.400000+00:00    4
    Name: new/stream/bar/count, dtype: uint64[pyarrow]]
arrow_values()#

Return a pyarrow table of stream values based on the streamset parameters.

This data will be sorted by the ‘time’ column.

Note

This method is available for commercial customers with arrow-enabled servers.
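
Examples

A sketch on an arrow-enabled server; the filter range is illustrative.

>>> table = streamset.filter(start=1500000000000000000, end=1500000001000000000).arrow_values()
>>> df = table.to_pandas()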

clone()#

Returns a deep copy of the object. Attributes that cannot be copied will be shared by reference between both objects.

Parameters:

None

Returns:

Returns a new copy of the instance

Return type:

StreamSet
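
Examples

A minimal sketch showing that clone returns a distinct object.

>>> streamset2 = streamset.clone()
>>> streamset2 is streamset
False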

count(precise: bool = False)#

Compute the total number of points in the streams using filters.

Computes the total number of points across all streams using the specified filters. By default, this returns the latest total count of all points in the streams. The count is modified by start and end filters or by pinning versions.

Parameters:

precise (bool, default: False) – If True, compute an exact count down to the nanosecond, which is slower. If False, sum StatPoint counts from aligned_windows, trading exactness for speed.

Returns:

The total number of points in all streams for the specified filters.

Return type:

int

Note

This helper method sums the counts of all StatPoints returned by aligned_windows. Because of this, the start and end timestamps may be adjusted if they are not aligned to the underlying pointwidth boundaries.

Examples

>>> import btrdb
>>> conn = btrdb.connect()
>>> stream1 = conn.stream_from_uuid("...")
>>> stream2 = conn.stream_from_uuid("...")
>>> streamset = btrdb.stream.StreamSet([stream1, stream2])
>>> streamset.count()
2345
>>> filtered_streamset = streamset.filter(start=1500000000000000000, end=1500000001000000000)
>>> filtered_streamset.count(precise=True)
734
>>> streamset.filter(start=1500000000000000000, end=1500000001000000000).count(precise=True)
734
current()#

Returns the points of data in the streams closest to the current timestamp. If the current timestamp is outside the filtered range of data, a ValueError is raised.

Returns:

The latest points of data found among all streams

Return type:

tuple

Examples

>>> import btrdb
>>> conn = btrdb.connect()
>>> stream1 = conn.stream_from_uuid("...")
>>> stream2 = conn.stream_from_uuid("...")
>>> streamset = btrdb.stream.StreamSet([stream1, stream2])
>>> streamset.current()
(<RawPoint ...>, <RawPoint ...>)
earliest()#

Returns earliest points of data in streams using available filters.

Parameters:

None

Returns:

The earliest points of data found among all streams

Return type:

tuple

Examples

>>> import btrdb
>>> conn = btrdb.connect()
>>> stream1 = conn.stream_from_uuid("...")
>>> stream2 = conn.stream_from_uuid("...")
>>> streamset = btrdb.stream.StreamSet([stream1, stream2])
>>> streamset.earliest()
(<RawPoint ...>, <RawPoint ...>)
filter(start=None, end=None, collection=None, name=None, unit=None, tags=None, annotations=None, sampling_frequency=None, schema=None)#

Provides a new StreamSet instance containing stored query parameters and stream objects that match filtering criteria.

The collection, name, and unit arguments will be used to select streams from the original StreamSet object. If a string is supplied, then a case-insensitive exact match is used to select streams. Otherwise, you may supply a compiled regex pattern that will be used with re.search.

The tags and annotations arguments expect dictionaries for the desired key/value pairs. Any stream in the original instance that has the exact key/values will be included in the new StreamSet instance.

Parameters:
  • start (int or datetime like object) – the inclusive start of the query (see btrdb.utils.timez.to_nanoseconds() for valid input types)

  • end (int or datetime like object) – the exclusive end of the query (see btrdb.utils.timez.to_nanoseconds() for valid input types)

  • collection (str or regex) – string for exact (case-insensitive) matching of collection when filtering streams or a compiled regex expression for re.search of stream collections.

  • name (str or regex) – string for exact (case-insensitive) matching of name when filtering streams or a compiled regex expression for re.search of stream names.

  • unit (str or regex) – string for exact (case-insensitive) matching of unit when filtering streams or a compiled regex expression for re.search of stream units.

  • tags (dict) – key/value pairs for filtering streams based on tags

  • annotations (dict) – key/value pairs for filtering streams based on annotations

  • sampling_frequency (float) – The sampling frequency of the data streams in Hz; set this if you want time-snapped values.

  • schema (pyarrow.Schema) – Optional arrow schema the server will cast the returned data to before sending it over the network. You can use this to change the timestamp format, column names or data sizes.

Returns:

a new instance cloned from the original with filters applied

Return type:

StreamSet

Note

If you set sampling_frequency to a non-zero value, the stream data returned will be aligned to a grid of timestamps based on the period of the sampling frequency. For example, a sampling rate of 30 Hz has a sampling period of 1/(30 Hz), roughly 33,333,333 ns per sample. Leave sampling_frequency as None, or set it to 0, to prevent time alignment; a time-snapping sketch appears at the end of the examples below. You should not use time-aligned data for frequency-based analysis.

Examples

create a streamset and apply a few filters

>>> streamset = btrdb.stream.StreamSet(list_of_streams)
>>> print(f"Total streams: {len(streamset)}")
Total streams: 89
>>> streamset.filter(unit="Volts")
>>> print(f"Total streams: {len(streamset)}")
Total streams: 89
>>> filtered_streamset = streamset.filter(unit="Volts")
>>> print(f"Total streams: {len(filtered_streamset)}")
Total streams: 23
>>> multiple_filters_streamset = (streamset.filter(unit="Volts")
...                                 .filter(name="Sensor 1")
...                                 .filter(annotations={"phase":"A"})
...                              )
>>> print(f"Total streams: {len(multiple_filters_streamset)}")
Total streams: 1
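
time-snap the streams to a 30 Hz grid (a sketch; the timestamps are illustrative, and returned values will align to a period of roughly 33,333,333 ns)

>>> snapped_streamset = streamset.filter(
...     start=1500000000000000000,
...     end=1500000001000000000,
...     sampling_frequency=30,
... )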
index(value[, start[, stop]]) → integer -- return first index of value.#

Raises ValueError if the value is not present.

Supporting start and stop arguments is optional, but recommended.
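
Examples

A minimal usage sketch, assuming stream1 is the first member of the streamset:

>>> streamset.index(stream1)
0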

insert(data_map: dict, merge: str = 'never') → dict#

Insert new data in the form (time, value) into the mapped streams.

The rows in each dataframe need not be sorted by time. If the number of points is too large for a single request, this function will automatically chunk the inserts. As a consequence, the insert is not necessarily atomic, but it can be used with very large arrays.

Parameters:
  • data_map (dict[uuid, pandas.DataFrame]) – A dictionary mapping each stream UUID to a pandas dataframe of the data to insert into that stream. Each dataframe must contain two columns: a “time” column of int64 UTC+0 timestamps and a “value” column of float64 measurements. These columns will be typecast into these types.

  • merge (str) –

    A string describing the merge policy. Valid policies are:
    • ‘never’: the default, no points are merged

    • ‘equal’: points are deduplicated if the time and value are equal

    • ‘retain’: if two points have the same timestamp, the old one is kept

    • ‘replace’: if two points have the same timestamp, the new one is kept

Returns:

The versions of the stream after inserting new points.

Return type:

dict[uuid, int]

Note

You MUST convert your datetimes into UTC+0 yourself. BTrDB expects UTC+0 datetimes.

Examples

>>> import pandas as pd
>>> import btrdb
>>> conn = btrdb.connect()
>>> stream1 = conn.stream_from_uuid("...")
>>> stream2 = conn.stream_from_uuid("...")
>>> streamset = btrdb.stream.StreamSet([stream1, stream2])
>>> data_map = {
...     stream1.uuid: pd.DataFrame({'time': [1500000000000000000, 1500000000100000000], 'value': [1.0, 2.0]}),
...     stream2.uuid: pd.DataFrame({'time': [1500000000000000000, 1500000000100000000], 'value': [3.0, 4.0]})
... }
>>> streamset.insert(data_map)
{UUID('...'): 1234, UUID('...'): 5678}
>>> streamset.insert(data_map, merge='replace')
{UUID('...'): 1235, UUID('...'): 5679}
latest()#

Returns the latest points of data in the streams, respecting any applied filters.

Parameters:

None

Returns:

The latest points of data found among all streams

Return type:

tuple

Examples

>>> import btrdb
>>> conn = btrdb.connect()
>>> stream1 = conn.stream_from_uuid("...")
>>> stream2 = conn.stream_from_uuid("...")
>>> streamset = btrdb.stream.StreamSet([stream1, stream2])
>>> streamset.latest()
(<RawPoint ...>, <RawPoint ...>)
pin_versions(versions=None)#

Saves the stream versions that future materializations should use. If no pin is requested, then the first materialization will automatically pin the returned versions. Versions can also be supplied through a dict mapping each stream UUID to the desired version (e.g. key: UUID, value: stream.version()).

Parameters:

versions (dict[UUID, int]) – A dict containing stream UUIDs and version ints as key/value pairs

Returns:

Returns self

Return type:

StreamSet

Examples

>>> version_map = {s.uuid: 0 for s in streamset}
>>> pinned_streamset = streamset.pin_versions(versions=version_map)
rows()#

Returns a materialized list of tuples where each tuple contains the points from each stream at a unique time. If a stream has no value at that time, then None is provided instead of a point object.

Parameters:

None

Returns:

A list of tuples, each containing one point (RawPoint or StatPoint) per stream for a given time, with None in place of missing points.

Return type:

list

Examples

>>> for row in streamset.rows():
...     print(row)
(None, RawPoint(1500000000000000000, 1.0), RawPoint(1500000000000000000, 1.0), RawPoint(1500000000000000000, 1.0))
(RawPoint(1500000000100000000, 2.0), None, RawPoint(1500000000100000000, 2.0), RawPoint(1500000000100000000, 2.0))
(None, RawPoint(1500000000200000000, 3.0), None, RawPoint(1500000000200000000, 3.0))
(RawPoint(1500000000300000000, 4.0), None, RawPoint(1500000000300000000, 4.0), RawPoint(1500000000300000000, 4.0))
(None, RawPoint(1500000000400000000, 5.0), RawPoint(1500000000400000000, 5.0), RawPoint(1500000000400000000, 5.0))
(RawPoint(1500000000500000000, 6.0), None, None, RawPoint(1500000000500000000, 6.0))
(None, RawPoint(1500000000600000000, 7.0), RawPoint(1500000000600000000, 7.0), RawPoint(1500000000600000000, 7.0))
(RawPoint(1500000000700000000, 8.0), None, RawPoint(1500000000700000000, 8.0), RawPoint(1500000000700000000, 8.0))
(None, RawPoint(1500000000800000000, 9.0), RawPoint(1500000000800000000, 9.0), RawPoint(1500000000800000000, 9.0))
(RawPoint(1500000000900000000, 10.0), None, RawPoint(1500000000900000000, 10.0), RawPoint(1500000000900000000, 10.0))
to_array(agg='mean')#

Returns a multidimensional numpy array (similar to a list of lists) containing the point values from each stream.

Parameters:

agg (str, default: "mean") – Specify the StatPoint field (e.g. aggregating function) to return for the arrays. Must be one of “min”, “mean”, “max”, “count”, or “stddev”. This argument is ignored if RawPoint values are passed into the function.

Note

This method does not use the arrow-accelerated endpoints, which provide faster and more efficient data retrieval.
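
Examples

A sketch of materializing raw values into an array, assuming the streamset and timestamps from the earlier examples; one row of values per stream is expected:

>>> arr = streamset.filter(start=1500000000000000000, end=1500000001000000000).to_array()
>>> len(arr) == len(streamset)
True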

to_csv(fobj, dialect=None, fieldnames=None, agg='mean', name_callable=None)#

Saves stream data as a CSV file.

Parameters:
  • fobj (str or file-like object) – Path to use for saving CSV file or a file-like object to use to write to.

  • dialect (csv.Dialect) – CSV dialect object from Python csv module. See Python’s csv module for more information.

  • fieldnames (sequence) – A sequence of strings to use as fieldnames in the CSV header. See Python’s csv module for more information.

  • agg (str, default: "mean") – Specify the StatPoint field (e.g. aggregating function) to return when limiting results. Must be one of “min”, “mean”, “max”, “count”, or “stddev”. This argument is ignored if RawPoint values are passed into the function.

  • name_callable (lambda, default: lambda s: s.collection + "/" + s.name) – Specify a callable that can be used to determine the series name given a Stream object.

Note

This method does not use the arrow-accelerated endpoints, which provide faster and more efficient data retrieval.
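
Examples

A minimal sketch using an illustrative output path; a file-like object works equally well for fobj:

>>> streamset.filter(start=1500000000000000000, end=1500000001000000000).to_csv("points.csv")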

to_dataframe(agg='mean', name_callable=None)#

Returns a Pandas DataFrame object indexed by time and using the values of a stream for each column.

Parameters:
  • agg (str, default: "mean") – Specify the StatPoint field (e.g. aggregating function) to create the Series from. Must be one of “min”, “mean”, “max”, “count”, “stddev”, or “all”. This argument is ignored if not using StatPoints.

  • name_callable (lambda, default: lambda s: s.collection + "/" + s.name) – Specify a callable that can be used to determine the series name given a Stream object. This is not compatible with agg == “all” at this time

Note

This method does not use the arrow-accelerated endpoints, which provide faster and more efficient data retrieval.
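
Examples

A sketch showing the default materialization and an illustrative name_callable that names columns by stream name only:

>>> df = streamset.filter(start=1500000000000000000, end=1500000001000000000).to_dataframe()
>>> df = streamset.to_dataframe(name_callable=lambda s: s.name)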

to_dict(agg='mean', name_callable=None)#

Returns a list of OrderedDict for each time code with the appropriate stream data attached.

Parameters:
  • agg (str, default: "mean") – Specify the StatPoint field (e.g. aggregating function) to constrain dict keys. Must be one of “min”, “mean”, “max”, “count”, or “stddev”. This argument is ignored if RawPoint values are passed into the function.

  • name_callable (lambda, default: lambda s: s.collection + "/" + s.name) – Specify a callable that can be used to determine the series name given a Stream object.

Note

This method does not use the arrow-accelerated endpoints, which provide faster and more efficient data retrieval.
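
Examples

A minimal sketch; one OrderedDict is expected per unique time code:

>>> dicts = streamset.filter(start=1500000000000000000, end=1500000001000000000).to_dict()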

to_polars(agg='mean', name_callable=None)#

Returns a Polars DataFrame object with time as a column and the values of a stream for each additional column.

Parameters:
  • agg (str, default: "mean") – Specify the StatPoint field (e.g. aggregating function) to create the Series from. Must be one of “min”, “mean”, “max”, “count”, “stddev”, or “all”. This argument is ignored if not using StatPoints.

  • name_callable (lambda, default: lambda s: s.collection + "/" + s.name) – Specify a callable that can be used to determine the series name given a Stream object. This is not compatible with agg == “all” at this time

Note

This method does not use the arrow-accelerated endpoints, which provide faster and more efficient data retrieval.
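
Examples

A minimal sketch; the result holds a time column plus one column per stream:

>>> pl_df = streamset.filter(start=1500000000000000000, end=1500000001000000000).to_polars()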

to_series(datetime64_index=True, agg='mean', name_callable=None)#

Returns a list of Pandas Series objects indexed by time.

Parameters:
  • datetime64_index (bool) – Directs function to convert Series index to np.datetime64[ns] or leave as np.int64.

  • agg (str, default: "mean") – Specify the StatPoint field (e.g. aggregating function) to create the Series from. Must be one of “min”, “mean”, “max”, “count”, or “stddev”. This argument is ignored if RawPoint values are passed into the function.

  • name_callable (lambda, default: lambda s: s.collection + "/" + s.name) – Specify a callable that can be used to determine the series name given a Stream object.

Note

This method does not use the arrow-accelerated endpoints, which provide faster and more efficient data retrieval.
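
Examples

A sketch keeping raw int64 nanosecond indexes instead of converting to datetime64; one Series per stream is expected:

>>> series_list = streamset.to_series(datetime64_index=False)
>>> len(series_list) == len(streamset)
True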

to_table(agg='mean', name_callable=None)#

Returns a string representation of the data in tabular form using the tabulate library.

Parameters:
  • agg (str, default: "mean") – Specify the StatPoint field (e.g. aggregating function) to create the Series from. Must be one of “min”, “mean”, “max”, “count”, or “stddev”. This argument is ignored if RawPoint values are passed into the function.

  • name_callable (lambda, default: lambda s: s.collection + "/" + s.name) – Specify a callable that can be used to determine the column name given a Stream object.

Note

This method does not use the arrow-accelerated endpoints, which provide faster and more efficient data retrieval.
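
Examples

A minimal sketch; the rendered table has one column per stream, named via name_callable:

>>> print(streamset.filter(start=1500000000000000000, end=1500000001000000000).to_table())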

values()#

Returns a fully materialized list of lists for the stream values/points.
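
Examples

A minimal sketch; one inner list of points is expected per stream:

>>> vals = streamset.filter(start=1500000000000000000, end=1500000001000000000).values()
>>> len(vals) == len(streamset)
True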

versions()#

Returns a dict of the stream versions. These versions are the pinned values if previously pinned or the latest stream versions if not pinned.

Parameters:

None

Returns:

A dict containing the stream UUID and version ints as key/values

Return type:

dict

Examples

A pinned vs non-pinned streamset

>>> streamset = btrdb.stream.StreamSet([stream1, stream2])
>>> version_map = {s.uuid: 0 for s in streamset}
>>> pinned_streamset = streamset.pin_versions(versions=version_map)
>>> pinned_streamset.versions()
{UUID('fa42f64a-a851-408f-aa7e-88a85b3d295c'): 0, UUID('18e5527a-ed13-424d-bb97-3e06a763609e'): 0}
>>> streamset.versions()
{UUID('fa42f64a-a851-408f-aa7e-88a85b3d295c'): 34532, UUID('18e5527a-ed13-424d-bb97-3e06a763609e'): 12345}
windows(width, depth=0)#

Stores the request for a windowing operation when the query is eventually materialized.

Parameters:
  • width (int) – The width of each window in nanoseconds.

  • depth (int) – The requested accuracy of the data, up to 2^depth nanoseconds. A depth of 0 is accurate to the nanosecond and is now the only valid value.

Returns:

Returns self

Return type:

StreamSet

Note

windows returns arbitrary precision windows from BTrDB. It is slower than aligned_windows, but can be significantly faster than values. Each returned window will be width nanoseconds long. start is inclusive, but end is exclusive ([start, end)); e.g. if end < start + width you will get no results. That is, results will be returned for all windows that start at a time less than the end timestamp. If (end - start) is not a multiple of width, then end will be decreased to the greatest value less than end such that (end - start) is a multiple of width (i.e., we set end = start + width * floordiv(end - start, width)). The depth parameter previously available has been deprecated; the only valid value for depth is now 0.

Examples

>>> import btrdb
>>> conn = btrdb.connect()
>>> stream1 = conn.stream_from_uuid("...")
>>> stream2 = conn.stream_from_uuid("...")
>>> streamset = btrdb.stream.StreamSet([stream1, stream2])
>>> streamset.windows(width=1000000000)
<StreamSet ...>
>>> streamset.windows(width=1000000000, depth=0)
<StreamSet ...>
>>> streamset.aligned_windows(pointwidth=30)
Traceback (most recent call last):
  ...
btrdb.exceptions.InvalidOperation: A window operation is already requested