btrdb.stream#

Module for Stream and related classes

class btrdb.stream.Stream(btrdb, uuid, **db_values)#

An object that represents a specific time series stream in the BTrDB database.

Parameters:
  • btrdb (BTrDB) – A reference to the BTrDB object connecting this stream back to the physical server.

  • uuid (UUID) – The unique UUID identifier for this stream.

  • db_values (kwargs) – Framework-only initialization arguments. Not for developer use.

Attributes:
btrdb

Returns the stream’s BTrDB object.

collection

Returns the collection of the stream.

name

Returns the stream’s name which is parsed from the stream tags.

unit

Returns the stream’s unit which is parsed from the stream tags.

uuid

Returns the stream’s UUID.

Methods

aligned_windows(start, end, pointwidth[, ...])

Read statistical aggregates of windows of data from BTrDB.

annotations([refresh, auto_retry, retries, ...])

Returns a stream's annotations

arrow_aligned_windows(start, end, pointwidth)

Read statistical aggregates of windows of data from BTrDB.

arrow_insert(data[, merge])

Insert new data in the form of a pyarrow Table with (time, value) columns.

arrow_values(start, end[, version, ...])

Read raw values from BTrDB between time [a, b) in nanoseconds.

arrow_windows(start, end, width[, ...])

Read arbitrarily-sized windows of data from BTrDB.

count([start, end, pointwidth, precise, version])

Compute the total number of points in the stream

current([version, auto_retry, retries, ...])

Returns the point that is closest to the current timestamp, i.e. the latest point in the stream up until now.

delete(start, end[, auto_retry, retries, ...])

"Delete" all points between [start, end)

earliest([version, auto_retry, retries, ...])

Returns the first point of data in the stream.

exists()

Check if stream exists

flush([auto_retry, retries, retry_delay, ...])

Flush writes the stream buffers out to persistent storage.

insert(data[, merge])

Insert new data in the form (time, value) into the series.

latest([version, auto_retry, retries, ...])

Returns the last point of data in the stream.

nearest(time, version[, backward, ...])

Finds the closest point in the stream to a specified time.

obliterate([auto_retry, retries, ...])

Obliterate deletes a stream from the BTrDB server.

refresh_metadata()

Refreshes the locally cached metadata for a stream from the server.

tags([refresh, auto_retry, retries, ...])

Returns the stream's tags.

update([tags, annotations, collection, ...])

Updates metadata including tags, annotations, and collection as an UPSERT operation.

values(start, end[, version, auto_retry, ...])

Read raw values from BTrDB between time [a, b) in nanoseconds.

version([auto_retry, retries, retry_delay, ...])

Returns the current data version of the stream.

windows(start, end, width[, depth, version, ...])

Read arbitrarily-sized windows of data from BTrDB.

aligned_windows(start, end, pointwidth, version=0, auto_retry=False, retries=5, retry_delay=3, retry_backoff=4)#

Read statistical aggregates of windows of data from BTrDB.

Query BTrDB for aggregates (or roll ups or windows) of the time series with version between time start (inclusive) and end (exclusive) in nanoseconds. Each point returned is a statistical aggregate of all the raw data within a window of width 2**pointwidth nanoseconds. These statistical aggregates currently include the mean, minimum, and maximum of the data and the count of data points composing the window.

Note that start is inclusive, but end is exclusive. That is, results will be returned for all windows that start in the interval [start, end). If end < start+2^pointwidth you will not get any results. If start and end are not powers of two, the bottom pointwidth bits will be cleared. Each window will contain statistical summaries of the window. Statistical points with count == 0 will be omitted.

Parameters:
  • start (int or datetime like object) – The start time in nanoseconds for the range to be queried. (see btrdb.utils.timez.to_nanoseconds() for valid input types)

  • end (int or datetime like object) – The end time in nanoseconds for the range to be queried. (see btrdb.utils.timez.to_nanoseconds() for valid input types)

  • pointwidth (int) – Specify the number of ns between data points (2**pointwidth)

  • version (int) – Version of the stream to query

  • auto_retry (bool, default: False) – Whether to retry this request in the event of an error

  • retries (int, default: 5) – Number of times to retry this request if there is an error. Will be ignored if auto_retry is False

  • retry_delay (int, default: 3) – initial time to wait before retrying function call if there is an error. Will be ignored if auto_retry is False

  • retry_backoff (int, default: 4) – Exponential factor by which the backoff increases between retries. Will be ignored if auto_retry is False

Returns:

Returns a tuple containing windows of data. Each window is a tuple containing data tuples. Each data tuple contains a StatPoint and the stream version.

Return type:

tuple

Notes

As the window width is a power of two, it aligns with the BTrDB internal tree data structure and is faster to execute than windows().
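
Example

A minimal usage sketch, assuming stream is an existing Stream with data, start and end are nanosecond timestamps (or datetime-like objects), and the result is iterated as (StatPoint, version) pairs as described above; a pointwidth of 38 (roughly 4.6-minute windows) is an arbitrary illustrative choice:

>>> for point, version in stream.aligned_windows(start, end, pointwidth=38):
...     print(point.time, point.min, point.mean, point.max, point.count)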

annotations(refresh=False, auto_retry=False, retries=5, retry_delay=3, retry_backoff=4)#

Returns a stream’s annotations

Annotations returns the annotations of the stream (and the annotation version).

Do not modify the resulting map.

Parameters:
  • refresh (bool, default: False) – Indicates whether a round trip to the server should be implemented regardless of whether there is a local copy.

  • auto_retry (bool, default: False) – Whether to retry this request in the event of an error

  • retries (int, default: 5) – Number of times to retry this request if there is an error. Will be ignored if auto_retry is False

  • retry_delay (int, default: 3) – initial time to wait before retrying function call if there is an error. Will be ignored if auto_retry is False

  • retry_backoff (int, default: 4) – Exponential factor by which the backoff increases between retries. Will be ignored if auto_retry is False

Returns:

A tuple containing a dictionary of annotations and an integer representing the version of the metadata (tuple(dict, int)).

Return type:

tuple

arrow_aligned_windows(start: int, end: int, pointwidth: int, sort_time: bool = False, version: int = 0, auto_retry=False, retries=5, retry_delay=3, retry_backoff=4) Table#

Read statistical aggregates of windows of data from BTrDB.

Query BTrDB for aggregates (or roll ups or windows) of the time series with version between time start (inclusive) and end (exclusive) in nanoseconds [start, end). Each point returned is a statistical aggregate of all the raw data within a window of width 2**pointwidth nanoseconds. These statistical aggregates currently include the mean, minimum, and maximum of the data and the count of data points composing the window.

Note that start is inclusive, but end is exclusive. That is, results will be returned for all windows that start in the interval [start, end). If end < start+2^pointwidth you will not get any results. If start and end are not powers of two, the bottom pointwidth bits will be cleared. Each window will contain statistical summaries of the window. Statistical points with count == 0 will be omitted.

Parameters:
  • start (int or datetime like object, required) – The start time in nanoseconds for the range to be queried. (see btrdb.utils.timez.to_nanoseconds() for valid input types)

  • end (int or datetime like object, required) – The end time in nanoseconds for the range to be queried. (see btrdb.utils.timez.to_nanoseconds() for valid input types)

  • pointwidth (int, required) – Specify the number of ns between data points (2**pointwidth)

  • sort_time (bool, default: False) – Should the table be sorted on the ‘time’ column?

  • version (int, default: 0) – Version of the stream to query

  • auto_retry (bool, default: False) – Whether to retry this request in the event of an error

  • retries (int, default: 5) – Number of times to retry this request if there is an error. Will be ignored if auto_retry is False

  • retry_delay (int, default: 3) – initial time to wait before retrying function call if there is an error. Will be ignored if auto_retry is False

  • retry_backoff (int, default: 4) – Exponential factor by which the backoff increases between retries. Will be ignored if auto_retry is False

Returns:

Returns a pyarrow table containing the windows of data.

Return type:

pyarrow.Table

Notes

As the window width is a power of two, it aligns with the BTrDB internal tree data structure and is faster to execute than windows().

This method is available for commercial customers with arrow-enabled servers.

arrow_insert(data: Table, merge: str = 'never') int#

Insert new data in the form of a pyarrow Table with (time, value) columns.

Inserts a table of new (time, value) columns into the stream. The values in the table need not be sorted by time. If the arrays are larger than appropriate, this function will automatically chunk the inserts. As a consequence, the insert is not necessarily atomic, but can be used with a very large array.

Parameters:
  • data (pyarrow.Table, required) – A pyarrow table with a schema of time:Timestamp[ns, tz=UTC], value:float64 This schema will be validated and converted if necessary.

  • merge (str) –

    A string describing the merge policy. Valid policies are:
    • ’never’: the default, no points are merged

    • ’equal’: points are deduplicated if the time and value are equal

    • ’retain’: if two points have the same timestamp, the old one is kept

    • ’replace’: if two points have the same timestamp, the new one is kept

Returns:

The version of the stream after inserting new points.

Return type:

int

Notes

This method is available for commercial customers with arrow-enabled servers.
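
Example

A hedged sketch of constructing a compatible pyarrow table and inserting it; the timestamps (nanoseconds since the Epoch) and values are illustrative placeholders and stream is assumed to be an existing Stream:

>>> import pyarrow as pa
>>> times = pa.array([1500000000000000000, 1500000001000000000],
...                  type=pa.timestamp("ns", tz="UTC"))
>>> values = pa.array([1.0, 2.0], type=pa.float64())
>>> table = pa.Table.from_arrays([times, values], names=["time", "value"])
>>> new_version = stream.arrow_insert(table, merge="never")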

arrow_values(start, end, version: int = 0, auto_retry=False, retries=5, retry_delay=3, retry_backoff=4) Table#

Read raw values from BTrDB between time [a, b) in nanoseconds.

RawValues queries BTrDB for the raw time series data points between start and end time, both in nanoseconds since the Epoch for the specified stream version.

Parameters:
  • start (int or datetime like object) – The start time in nanoseconds for the range to be queried. (see btrdb.utils.timez.to_nanoseconds() for valid input types)

  • end (int or datetime like object) – The end time in nanoseconds for the range to be queried. (see btrdb.utils.timez.to_nanoseconds() for valid input types)

  • version (int, default: 0) – The version of the stream to be queried

  • auto_retry (bool, default: False) – Whether to retry this request in the event of an error

  • retries (int, default: 5) – Number of times to retry this request if there is an error. Will be ignored if auto_retry is False

  • retry_delay (int, default: 3) – initial time to wait before retrying function call if there is an error. Will be ignored if auto_retry is False

  • retry_backoff (int, default: 4) – Exponential factor by which the backoff increases between retries. Will be ignored if auto_retry is False

Returns:

A pyarrow table of the raw values with time and value columns.

Return type:

pyarrow.Table

Notes

Note that the raw data points are the original values at the sensor’s native sampling rate (assuming the time series represents measurements from a sensor). This is the lowest level of data with the finest time granularity. In the tree data structure of BTrDB, this data is stored in the vector nodes.

This method is available for commercial customers with arrow-enabled servers.
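
Example

A minimal sketch, assuming start and end are nanosecond timestamps (or datetime-like objects) covering a range that contains data; the conversion to pandas is optional:

>>> table = stream.arrow_values(start, end)
>>> df = table.to_pandas()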

arrow_windows(start: int, end: int, width: int, sort_time: bool = False, version: int = 0, auto_retry=False, retries=5, retry_delay=3, retry_backoff=4) Table#

Read arbitrarily-sized windows of data from BTrDB.

Parameters:
  • start (int or datetime like object, required) – The start time in nanoseconds for the range to be queried. (see btrdb.utils.timez.to_nanoseconds() for valid input types)

  • end (int or datetime like object, required) – The end time in nanoseconds for the range to be queried. (see btrdb.utils.timez.to_nanoseconds() for valid input types)

  • width (int, required) – The number of nanoseconds in each window.

  • sort_time (bool, default: False) – Should the table be sorted on the ‘time’ column.

  • version (int, default=0, optional) – The version of the stream to query.

  • auto_retry (bool, default: False) – Whether to retry this request in the event of an error

  • retries (int, default: 5) – Number of times to retry this request if there is an error. Will be ignored if auto_retry is False

  • retry_delay (int, default: 3) – initial time to wait before retrying function call if there is an error. Will be ignored if auto_retry is False

  • retry_backoff (int, default: 4) – Exponential factor by which the backoff increases between retries. Will be ignored if auto_retry is False

Returns:

Returns a pyarrow Table containing windows of data.

Return type:

pyarrow.Table

Notes

Windows returns arbitrary precision windows from BTrDB. It is slower than AlignedWindows, but still significantly faster than RawValues. Each returned window will be width nanoseconds long. start is inclusive, but end is exclusive (e.g. if end < start+width you will get no results). That is, results will be returned for all windows that start at a time less than the end timestamp. If (end - start) is not a multiple of width, then end will be decreased to the greatest value less than end such that (end - start) is a multiple of width (i.e., we set end = start + width * floordiv(end - start, width)). The depth parameter previously available has been deprecated; the only valid value for depth is now 0.

This method is available for commercial customers with arrow-enabled servers.

property btrdb#

Returns the stream’s BTrDB object.

Parameters:

None

Returns:

The BTrDB database object.

Return type:

BTrDB

property collection#

Returns the collection of the stream. It may require a round trip to the server depending on how the stream was acquired.

Parameters:

None

Returns:

the collection of the stream

Return type:

str

count(start=-1152921504606846976, end=3458764513820540927, pointwidth=62, precise=False, version=0)#

Compute the total number of points in the stream

Counts the number of points in the specified window and version. By default returns the latest total count of points in the stream. This helper method sums the counts of all StatPoints returned by aligned_windows. Because of this, note that the start and end timestamps may be adjusted if they are not powers of 2. For smaller windows of time, you may also need to adjust the pointwidth to ensure that the count granularity is captured appropriately.

Alternatively you can set the precise argument to True which will give an exact count to the nanosecond but may be slower to execute.

Parameters:
  • start (int or datetime like object, default: MINIMUM_TIME) – The start time in nanoseconds for the range to be queried. (see btrdb.utils.timez.to_nanoseconds() for valid input types)

  • end (int or datetime like object, default: MAXIMUM_TIME) – The end time in nanoseconds for the range to be queried. (see btrdb.utils.timez.to_nanoseconds() for valid input types)

  • pointwidth (int, default: 62) – Specify the number of ns between data points (2**pointwidth). If the value is too large for the time window, the next smallest appropriate pointwidth will be used.

  • precise (bool, default: False) – Forces the use of a windows query instead of aligned_windows to determine the exact count down to the nanosecond. This will be somewhat slower than the aligned_windows version.

  • version (int, default: 0) – Version of the stream to query

Returns:

The total number of points in the stream for the specified window.

Return type:

int
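
Example

An illustrative sketch; start, end, and the pointwidth shown are arbitrary placeholders:

>>> total = stream.count()                          # approximate count over the full range
>>> windowed = stream.count(start, end, pointwidth=30)
>>> exact = stream.count(start, end, precise=True)  # exact, but slower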

current(version=0, auto_retry=False, retries=5, retry_delay=3, retry_backoff=4)#

Returns the point that is closest to the current timestamp, i.e. the latest point in the stream up until now. Note that no future values will be returned. Returns None if an error occurs during lookup or there are no values before now.

Parameters:
  • version (int, default: 0) – Specify the version of the stream to query; if zero, queries the latest stream state rather than pinning to a version.

  • auto_retry (bool, default: False) – Whether to retry this request in the event of an error

  • retries (int, default: 5) – Number of times to retry this request if there is an error. Will be ignored if auto_retry is False

  • retry_delay (int, default: 3) – initial time to wait before retrying function call if there is an error. Will be ignored if auto_retry is False

  • retry_backoff (int, default: 4) – Exponential factor by which the backoff increases between retries. Will be ignored if auto_retry is False

Returns:

The last data point in the stream up until now and the version of the stream the value was retrieved at (tuple(RawPoint, int)).

Return type:

tuple

delete(start, end, auto_retry=False, retries=5, retry_delay=3, retry_backoff=4)#

“Delete” all points between [start, end)

“Delete” all points between start (inclusive) and end (exclusive), both in nanoseconds. As BTrDB has persistent multiversioning, the deleted points will still exist as part of an older version of the stream.

Parameters:
  • start (int or datetime like object) – The start time in nanoseconds for the range to be deleted. (see btrdb.utils.timez.to_nanoseconds() for valid input types)

  • end (int or datetime like object) – The end time in nanoseconds for the range to be deleted. (see btrdb.utils.timez.to_nanoseconds() for valid input types)

  • auto_retry (bool, default: False) – Whether to retry this request in the event of an error

  • retries (int, default: 5) – Number of times to retry this request if there is an error. Will be ignored if auto_retry is False

  • retry_delay (int, default: 3) – initial time to wait before retrying function call if there is an error. Will be ignored if auto_retry is False

  • retry_backoff (int, default: 4) – Exponential factor by which the backoff increases between retries. Will be ignored if auto_retry is False

Returns:

The version of the new stream created

Return type:

int

earliest(version=0, auto_retry=False, retries=5, retry_delay=3, retry_backoff=4)#

Returns the first point of data in the stream. Returns None if an error is encountered during lookup or there are no values in the stream.

Parameters:
  • version (int, default: 0) – Specify the version of the stream to query; if zero, queries the latest stream state rather than pinning to a version.

  • auto_retry (bool, default: False) – Whether to retry this request in the event of an error

  • retries (int, default: 5) – Number of times to retry this request if there is an error. Will be ignored if auto_retry is False

  • retry_delay (int, default: 3) – initial time to wait before retrying function call if there is an error. Will be ignored if auto_retry is False

  • retry_backoff (int, default: 4) – Exponential factor by which the backoff increases between retries. Will be ignored if auto_retry is False

Returns:

The first data point in the stream and the version of the stream the value was retrieved at (tuple(RawPoint, int)).

Return type:

tuple

exists()#

Check if stream exists

Exists returns true if the stream exists. This is essential after using StreamFromUUID as the stream may not exist, causing a 404 error on later stream operations. Any operation that returns a stream from collection and tags will have ensured the stream exists already.

Parameters:

None

Returns:

Indicates whether stream is extant in the BTrDB server.

Return type:

bool

flush(auto_retry=False, retries=5, retry_delay=3, retry_backoff=4)#

Flush writes the stream buffers out to persistent storage.

Parameters:
  • auto_retry (bool, default: False) – Whether to retry this request in the event of an error

  • retries (int, default: 5) – Number of times to retry this request if there is an error. Will be ignored if auto_retry is False

  • retry_delay (int, default: 3) – initial time to wait before retrying function call if there is an error. Will be ignored if auto_retry is False

  • retry_backoff (int, default: 4) – Exponential factor by which the backoff increases between retries. Will be ignored if auto_retry is False

insert(data, merge='never')#

Insert new data in the form (time, value) into the series.

Inserts a list of new (time, value) tuples into the series. The tuples in the list need not be sorted by time. If the arrays are larger than appropriate, this function will automatically chunk the inserts. As a consequence, the insert is not necessarily atomic, but can be used with a very large array.

Parameters:
  • data (list[tuple[int, float]]) – A list of tuples in which each tuple contains a time (int) and value (float) for insertion to the database

  • merge (str) –

    A string describing the merge policy. Valid policies are:
    • ’never’: the default, no points are merged

    • ’equal’: points are deduplicated if the time and value are equal

    • ’retain’: if two points have the same timestamp, the old one is kept

    • ’replace’: if two points have the same timestamp, the new one is kept

Returns:

The version of the stream after inserting new points.

Return type:

int
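
Example

A minimal sketch; the (time, value) pairs are placeholder nanosecond timestamps and measurements:

>>> data = [
...     (1500000000000000000, 1.0),
...     (1500000001000000000, 2.0),
... ]
>>> new_version = stream.insert(data, merge="never")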

latest(version=0, auto_retry=False, retries=5, retry_delay=3, retry_backoff=4)#

Returns the last point of data in the stream. Returns None if an error is encountered during lookup or there are no values in the stream.

Parameters:
  • version (int, default: 0) – Specify the version of the stream to query; if zero, queries the latest stream state rather than pinning to a version.

  • auto_retry (bool, default: False) – Whether to retry this request in the event of an error

  • retries (int, default: 5) – Number of times to retry this request if there is an error. Will be ignored if auto_retry is False

  • retry_delay (int, default: 3) – initial time to wait before retrying function call if there is an error. Will be ignored if auto_retry is False

  • retry_backoff (int, default: 4) – Exponential factor by which the backoff increases between retries. Will be ignored if auto_retry is False

Returns:

The last data point in the stream and the version of the stream the value was retrieved at (tuple(RawPoint, int)).

Return type:

tuple
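
Example

A minimal sketch, assuming the stream contains data (otherwise earliest() and latest() return None):

>>> first_point, _ = stream.earliest()
>>> last_point, _ = stream.latest()
>>> span_ns = last_point.time - first_point.time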

property name#

Returns the stream’s name which is parsed from the stream tags. This may require a round trip to the server depending on how the stream was acquired.

Returns:

The name of the stream.

Return type:

str

nearest(time, version, backward=False, auto_retry=False, retries=5, retry_delay=3, retry_backoff=4)#

Finds the closest point in the stream to a specified time.

Return the point nearest to the specified time in nanoseconds since Epoch in the stream with version while specifying whether to search forward or backward in time. If backward is false, the returned point will be >= time. If backward is true, the returned point will be < time. The version of the stream used to satisfy the query is returned.

Parameters:
  • time (int or datetime like object) – The time (in nanoseconds since Epoch) to search near (see btrdb.utils.timez.to_nanoseconds() for valid input types)

  • version (int) – Version of the stream to use in search

  • backward (boolean) – True to search backwards from time, else false for forward

  • auto_retry (bool, default: False) – Whether to retry this request in the event of an error

  • retries (int, default: 5) – Number of times to retry this request if there is an error. Will be ignored if auto_retry is False

  • retry_delay (int, default: 3) – initial time to wait before retrying function call if there is an error. Will be ignored if auto_retry is False

  • retry_backoff (int, default: 4) – Exponential factor by which the backoff increases between retries. Will be ignored if auto_retry is False

Returns:

The closest data point in the stream and the version of the stream the value was retrieved at (tuple(RawPoint, int)).

Return type:

tuple
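
Example

A minimal sketch; the target datetime is an arbitrary placeholder and version=0 queries the latest stream state:

>>> from datetime import datetime, timezone
>>> from btrdb.utils.timez import to_nanoseconds
>>> target = to_nanoseconds(datetime(2023, 1, 1, tzinfo=timezone.utc))
>>> point, version = stream.nearest(target, version=0, backward=True)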

obliterate(auto_retry=False, retries=5, retry_delay=3, retry_backoff=4)#

Obliterate deletes a stream from the BTrDB server. An exception will be raised if the stream could not be found.

Parameters:
  • auto_retry (bool, default: False) – Whether to retry this request in the event of an error

  • retries (int, default: 5) – Number of times to retry this request if there is an error. Will be ignored if auto_retry is False

  • retry_delay (int, default: 3) – initial time to wait before retrying function call if there is an error. Will be ignored if auto_retry is False

  • retry_backoff (int, default: 4) – Exponential factor by which the backoff increases between retries. Will be ignored if auto_retry is False

Raises:

BTrDBError [404] stream does not exist – The stream could not be found in BTrDB

refresh_metadata()#

Refreshes the locally cached metadata for a stream from the server.

Queries the BTrDB server for all stream metadata including collection, annotation, and tags. This method requires a round trip to the server.

tags(refresh=False, auto_retry=False, retries=5, retry_delay=3, retry_backoff=4)#

Returns the stream’s tags.

Tags returns the tags of the stream. It may require a round trip to the server depending on how the stream was acquired.

Parameters:
  • refresh (bool, default: False) – Indicates whether a round trip to the server should be implemented regardless of whether there is a local copy.

  • auto_retry (bool, default: False) – Whether to retry this request in the event of an error

  • retries (int, default: 5) – Number of times to retry this request if there is an error. Will be ignored if auto_retry is False

  • retry_delay (int, default: 3) – initial time to wait before retrying function call if there is an error. Will be ignored if auto_retry is False

  • retry_backoff (int, default: 4) – Exponential factor by which the backoff increases between retries. Will be ignored if auto_retry is False

Returns:

A dictionary containing the tags.

Return type:

dict

property unit#

Returns the stream’s unit which is parsed from the stream tags. This may require a round trip to the server depending on how the stream was acquired.

Returns:

The unit for values of the stream.

Return type:

str

update(tags=None, annotations=None, collection=None, encoder=<class 'btrdb.utils.conversion.AnnotationEncoder'>, replace=False, auto_retry=False, retries=5, retry_delay=3, retry_backoff=4)#

Updates metadata including tags, annotations, and collection as an UPSERT operation.

By default, the update will only affect the keys and values in the specified tags and annotations dictionaries, inserting them if they don’t exist, or updating the value for the key if it does exist. If any of the update arguments (i.e. tags, annotations, collection) are None, they will remain unchanged in the database.

To delete either tags or annotations, you must specify exactly which keys and values you want set for the field and set replace=True. For example:

>>> annotations, _ = stream.annotations()
>>> del annotations["key_to_delete"]
>>> stream.update(annotations=annotations, replace=True)

This ensures that all of the keys and values for the annotations are preserved except for the key to be deleted.

Parameters:
  • tags (dict, optional) – Specify the tag key/value pairs as a dictionary to upsert or replace. If None, the tags will remain unchanged in the database.

  • annotations (dict, optional) – Specify the annotations key/value pairs as a dictionary to upsert or replace. If None, the annotations will remain unchanged in the database.

  • collection (str, optional) – Specify a new collection for the stream. If None, the collection will remain unchanged.

  • encoder (json.JSONEncoder or None) – JSON encoder class to use for annotation serialization. Set to None to prevent JSON encoding of the annotations.

  • replace (bool, default: False) – Replace all annotations or tags with the specified dictionaries instead of performing the normal upsert operation. Specifying True is the only way to remove annotation keys.

  • auto_retry (bool, default: False) – Whether to retry this request in the event of an error

  • retries (int, default: 5) – Number of times to retry this request if there is an error. Will be ignored if auto_retry is False

  • retry_delay (int, default: 3) – initial time to wait before retrying function call if there is an error. Will be ignored if auto_retry is False

  • retry_backoff (int, default: 4) – Exponential factor by which the backoff increases between retries. Will be ignored if auto_retry is False

Returns:

The version of the metadata (separate from the version of the data) also known as the “property version”.

Return type:

int
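
Example

A sketch of the default upsert behavior; the collection and annotation values are illustrative placeholders:

>>> stream.update(collection="sensors/voltage",
...               annotations={"calibrated": "true"})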

property uuid#

Returns the stream’s UUID. The stream may or may not exist yet, depending on how the stream object was obtained.

Returns:

The unique identifier of the stream.

Return type:

UUID

See also

stream.exists

values(start, end, version=0, auto_retry=False, retries=5, retry_delay=3, retry_backoff=4)#

Read raw values from BTrDB between time [a, b) in nanoseconds.

RawValues queries BTrDB for the raw time series data points between start and end time, both in nanoseconds since the Epoch for the specified stream version.

Parameters:
  • start (int or datetime like object) – The start time in nanoseconds for the range to be queried. (see btrdb.utils.timez.to_nanoseconds() for valid input types)

  • end (int or datetime like object) – The end time in nanoseconds for the range to be queried. (see btrdb.utils.timez.to_nanoseconds() for valid input types)

  • version (int) – The version of the stream to be queried

  • auto_retry (bool, default: False) – Whether to retry this request in the event of an error

  • retries (int, default: 5) – Number of times to retry this request if there is an error. Will be ignored if auto_retry is False

  • retry_delay (int, default: 3) – initial time to wait before retrying function call if there is an error. Will be ignored if auto_retry is False

  • retry_backoff (int, default: 4) – Exponential factor by which the backoff increases between retries. Will be ignored if auto_retry is False

Returns:

Returns a list of tuples containing a RawPoint and the stream version (list(tuple(RawPoint,int))).

Return type:

list

Notes

Note that the raw data points are the original values at the sensor’s native sampling rate (assuming the time series represents measurements from a sensor). This is the lowest level of data with the finest time granularity. In the tree data structure of BTrDB, this data is stored in the vector nodes.
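
Example

A minimal sketch, assuming start and end are nanosecond timestamps (or datetime-like objects) for a range that contains data:

>>> for point, _ in stream.values(start, end):
...     print(point.time, point.value)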

version(auto_retry=False, retries=5, retry_delay=3, retry_backoff=4)#

Returns the current data version of the stream.

Version returns the current data version of the stream. This is not cached; it queries each time. Take care that you do not introduce races in your code by assuming this function will always return the same value.

Parameters:
  • auto_retry (bool, default: False) – Whether to retry this request in the event of an error

  • retries (int, default: 5) – Number of times to retry this request if there is an error. Will be ignored if auto_retry is False

  • retry_delay (int, default: 3) – initial time to wait before retrying function call if there is an error. Will be ignored if auto_retry is False

  • retry_backoff (int, default: 4) – Exponential factor by which the backoff increases between retries. Will be ignored if auto_retry is False

Returns:

The version of the stream.

Return type:

int

windows(start, end, width, depth=0, version=0, auto_retry=False, retries=5, retry_delay=3, retry_backoff=4)#

Read arbitrarily-sized windows of data from BTrDB. StatPoint objects will be returned representing the data for each window.

Parameters:
  • start (int or datetime like object) – The start time in nanoseconds for the range to be queried. (see btrdb.utils.timez.to_nanoseconds() for valid input types)

  • end (int or datetime like object) – The end time in nanoseconds for the range to be queried. (see btrdb.utils.timez.to_nanoseconds() for valid input types)

  • width (int) – The number of nanoseconds in each window.

  • version (int) – The version of the stream to query.

  • auto_retry (bool, default: False) – Whether to retry this request in the event of an error

  • retries (int, default: 5) – Number of times to retry this request if there is an error. Will be ignored if auto_retry is False

  • retry_delay (int, default: 3) – initial time to wait before retrying function call if there is an error. Will be ignored if auto_retry is False

  • retry_backoff (int, default: 4) – Exponential factor by which the backoff increases between retries. Will be ignored if auto_retry is False

Returns:

Returns a tuple containing windows of data. Each window is a tuple containing data tuples. Each data tuple contains a StatPoint and the stream version (tuple(tuple(StatPoint, int), …)).

Return type:

tuple

Notes

Windows returns arbitrary precision windows from BTrDB. It is slower than AlignedWindows, but still significantly faster than RawValues. Each returned window will be width nanoseconds long. start is inclusive, but end is exclusive (e.g. if end < start+width you will get no results). That is, results will be returned for all windows that start at a time less than the end timestamp. If (end - start) is not a multiple of width, then end will be decreased to the greatest value less than end such that (end - start) is a multiple of width (i.e., we set end = start + width * floordiv(end - start, width)). The depth parameter previously available has been deprecated; the only valid value for depth is now 0.

This method is available for commercial customers with arrow-enabled servers.
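
Example

A minimal sketch, assuming the result is iterated as (StatPoint, version) pairs as described above; one-minute windows are an arbitrary illustrative width:

>>> width_ns = 60 * 1_000_000_000
>>> for point, _ in stream.windows(start, end, width=width_ns):
...     print(point.time, point.min, point.mean, point.max, point.count)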

class btrdb.stream.StreamSetBase(streams: List[Stream])#

A lightweight wrapper around a list of stream objects

Attributes:
allow_window

Methods

aligned_windows(pointwidth)

Stores the request for an aligned windowing operation when the query is eventually materialized.

arrow_insert(data_map[, merge])

Insert new data in the form (time, value) into their mapped streams using pyarrow tables.

arrow_values()

Return a pyarrow table of stream values based on the streamset parameters.

clone()

Returns a deep copy of the object.

count([precise])

Compute the total number of points in the streams using filters.

current()

Returns the points of data in the streams closest to the current timestamp.

earliest()

Returns earliest points of data in streams using available filters.

filter([start, end, collection, name, unit, ...])

Provides a new StreamSet instance containing stored query parameters and stream objects that match filtering criteria.

index(value, [start, [stop]])

Raises ValueError if the value is not present.

insert(data_map[, merge])

Insert new data in the form (time, value) into their mapped streams.

latest()

Returns latest points of data in the streams using available filters.

pin_versions([versions])

Saves the stream versions that future materializations should use.

rows()

Returns a materialized list of tuples where each tuple contains the points from each stream at a unique time.

values()

Returns a fully materialized list of lists for the stream values/points

values_iter()

Returns a context object that closes the server cursor on __exit__.

versions()

Returns a dict of the stream versions.

windows(width[, depth])

Stores the request for a windowing operation when the query is eventually materialized.

aligned_windows(pointwidth)#

Stores the request for an aligned windowing operation when the query is eventually materialized.

Parameters:

pointwidth (int) – The length of each returned window as computed by 2^pointwidth.

Returns:

Returns self

Return type:

StreamSet

Notes

aligned_windows reads power-of-two aligned windows from BTrDB. It is faster than Windows(). Each returned window will be 2^pointwidth nanoseconds long, starting at start. Note that start is inclusive, but end is exclusive. That is, results will be returned for all windows that start in the interval [start, end). If end < start+2^pointwidth you will not get any results. If start and end are not powers of two, the bottom pointwidth bits will be cleared. Each window will contain statistical summaries of the window. Statistical points with count == 0 will be omitted.

arrow_insert(data_map: dict, merge: str = 'never') dict#

Insert new data in the form (time, value) into their mapped streams using pyarrow tables.

The times in the arrow table need not be sorted by time. If the point counts are larger than appropriate, this function will automatically chunk the inserts. As a consequence, the insert is not necessarily atomic, but can be used with a very large array.

Parameters:
  • data_map (dict[uuid, pyarrow.Table]) – A dictionary keyed on stream uuids and mapped to pyarrow tables with a schema of time:Timestamp[ns, tz=UTC], value:float64. This schema will be validated and converted if necessary.

  • merge (str) –

    A string describing the merge policy. Valid policies are:
    • ’never’: the default, no points are merged

    • ’equal’: points are deduplicated if the time and value are equal

    • ’retain’: if two points have the same timestamp, the old one is kept

    • ’replace’: if two points have the same timestamp, the new one is kept

Notes

BTrDB expects datetimes to be in UTC+0.

This method is available for commercial customers with arrow-enabled servers.

Returns:

The versions of the stream after inserting new points.

Return type:

dict[uuid, int]

arrow_values()#

Return a pyarrow table of stream values based on the streamset parameters.

This data will be sorted by the ‘time’ column.

Notes

This method is available for commercial customers with arrow-enabled servers.

clone()#

Returns a deep copy of the object. Attributes that cannot be copied will be shared by reference between the original and the copy.

Parameters:

None

Returns:

Returns a new copy of the instance

Return type:

StreamSet

count(precise: bool = False)#

Compute the total number of points in the streams using filters.

Computes the total number of points across all streams using the specified filters. By default, this returns the latest total count of all points in the streams. The count is modified by start and end filters or by pinning versions.

Note that this helper method sums the counts of all StatPoints returned by aligned_windows. Because of this the start and end timestamps may be adjusted if they are not powers of 2.

Parameters:

precise (bool, default: False) – If False, sums StatPoint counts from aligned_windows, trading accuracy for speed. Set to True for an exact count, which may be slower.

Returns:

The total number of points in all streams for the specified filters.

Return type:

int

current()#

Returns the points of data in the streams closest to the current timestamp. If the current timestamp is outside the filtered range of data, a ValueError is raised.

Returns:

The latest points of data found among all streams

Return type:

tuple

earliest()#

Returns earliest points of data in streams using available filters.

Parameters:

None

Returns:

The earliest points of data found among all streams

Return type:

tuple

filter(start=None, end=None, collection=None, name=None, unit=None, tags=None, annotations=None, sampling_frequency=None)#

Provides a new StreamSet instance containing stored query parameters and stream objects that match filtering criteria.

The collection, name, and unit arguments will be used to select streams from the original StreamSet object. If a string is supplied, then a case-insensitive exact match is used to select streams. Otherwise, you may supply a compiled regex pattern that will be used with re.search.

The tags and annotations arguments expect dictionaries for the desired key/value pairs. Any stream in the original instance that has the exact key/values will be included in the new StreamSet instance.

Parameters:
  • start (int or datetime like object) – the inclusive start of the query (see btrdb.utils.timez.to_nanoseconds() for valid input types)

  • end (int or datetime like object) – the exclusive end of the query (see btrdb.utils.timez.to_nanoseconds() for valid input types)

  • collection (str or regex) – string for exact (case-insensitive) matching of collection when filtering streams or a compiled regex expression for re.search of stream collections.

  • name (str or regex) – string for exact (case-insensitive) matching of name when filtering streams or a compiled regex expression for re.search of stream names.

  • unit (str or regex) – string for exact (case-insensitive) matching of unit when filtering streams or a compiled regex expression for re.search of stream units.

  • tags (dict) – key/value pairs for filtering streams based on tags

  • annotations (dict) – key/value pairs for filtering streams based on annotations

  • sampling_frequency (int) – The sampling frequency of the data streams in Hz, set this if you want timesnapped values.

Returns:

a new instance cloned from the original with filters applied

Return type:

StreamSet

Notes

If you set sampling_frequency to a non-zero value, the stream data returned will be aligned to a grid of timestamps based on the period of the sampling frequency. For example, a sampling rate of 30 Hz has a sampling period of 1/30 s, roughly 33,333,333 ns per sample. Leave sampling_frequency as None, or set it to 0, to prevent time alignment. You should not use aligned data for frequency-based analysis.
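
Example

A hedged sketch, assuming db is a connected BTrDB object, uuid1 and uuid2 identify existing streams, and the unit value is illustrative:

>>> streams = db.streams(uuid1, uuid2)
>>> filtered = streams.filter(start=start, end=end, unit="volts")
>>> data = filtered.values()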

insert(data_map: dict, merge: str = 'never') dict#

Insert new data in the form (time, value) into their mapped streams.

The times in the dataframe need not be sorted by time. If the point counts are larger than appropriate, this function will automatically chunk the inserts. As a consequence, the insert is not necessarily atomic, but can be used with a very large array.

Parameters:
  • data_map (dict[uuid, pandas.DataFrame]) – A dictionary mapping stream uuids to insert data into and their value as a pandas dataframe containing two columns, one named “time” which contains int64 utc+0 timestamps and a “value” column containing float64 measurements. These columns will be typecast into these types.

  • merge (str) –

    A string describing the merge policy. Valid policies are:
    • ’never’: the default, no points are merged

    • ’equal’: points are deduplicated if the time and value are equal

    • ’retain’: if two points have the same timestamp, the old one is kept

    • ’replace’: if two points have the same timestamp, the new one is kept

Notes

You MUST convert your datetimes into utc+0 yourself. BTrDB expects utc+0 datetimes.

Returns:

The versions of the stream after inserting new points.

Return type:

dict[uuid, int]
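
Example

A hedged sketch, assuming streams is a StreamSet containing stream; the timestamps (int64 ns, UTC+0) and values are placeholders:

>>> import pandas as pd
>>> df = pd.DataFrame({
...     "time": [1500000000000000000, 1500000001000000000],
...     "value": [1.0, 2.0],
... })
>>> versions = streams.insert({stream.uuid: df})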

latest()#

Returns latest points of data in the streams using available filters.

Parameters:

None

Returns:

The latest points of data found among all streams

Return type:

tuple

pin_versions(versions=None)#

Saves the stream versions that future materializations should use. If no pin is requested then the first materialization will automatically pin the returned versions. Versions can also be supplied through a dict object with key:UUID, value:stream.version().

Parameters:

versions (dict[UUID: int]) – A dict containing the stream UUID and version ints as key/values

Returns:

Returns self

Return type:

StreamSet
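
Example

A minimal sketch, assuming stream is a member of the StreamSet streams; pinning explicitly fixes later queries to the versions supplied:

>>> streams = streams.pin_versions({stream.uuid: stream.version()})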

rows()#

Returns a materialized list of tuples where each tuple contains the points from each stream at a unique time. If a stream has no value for that time then None is provided instead of a point object.

Parameters:

None

Returns:

A list of tuples containing a RawPoint (or StatPoint) and the stream version (list(tuple(RawPoint, int))).

Return type:

list

values()#

Returns a fully materialized list of lists for the stream values/points

values_iter()#

Returns a context object that closes the server cursor on __exit__.

versions()#

Returns a dict of the stream versions. These versions are the pinned values if previously pinned or the latest stream versions if not pinned.

Parameters:

None

Returns:

A dict containing the stream UUID and version ints as key/values

Return type:

dict

windows(width, depth=0)#

Stores the request for a windowing operation when the query is eventually materialized.

Parameters:
  • width (int) – The number of nanoseconds to use for each window size.

  • depth (int) – The requested accuracy of the data up to 2^depth nanoseconds. A depth of 0 is accurate to the nanosecond. This is now the only valid value for depth.

Returns:

Returns self

Return type:

StreamSet

Notes

Windows returns arbitrary precision windows from BTrDB. It is slower than aligned_windows, but still significantly faster than values. Each returned window will be width nanoseconds long. start is inclusive, but end is exclusive (e.g. if end < start+width you will get no results). That is, results will be returned for all windows that start at a time less than the end timestamp. If (end - start) is not a multiple of width, then end will be decreased to the greatest value less than end such that (end - start) is a multiple of width (i.e., we set end = start + width * floordiv(end - start, width)). The depth parameter previously available has been deprecated. The only valid value for depth is now 0.