diff --git a/CHANGELOG.md b/CHANGELOG.md index 4146ecf..b5c1e0d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,7 +4,8 @@ ### Features -1. [#130](https://github.com/InfluxCommunity/influxdb3-python/pull/130): Remove org parameters from the example code because It is not mandatory in Influxdb3 +1. [#127](https://github.com/InfluxCommunity/influxdb3-python/pull/127): Support creating client from environment variables +2. [#130](https://github.com/InfluxCommunity/influxdb3-python/pull/130): Remove org parameters from the example code because It is not mandatory in Influxdb3 ## 0.12.0 [2025-03-26] diff --git a/influxdb_client_3/__init__.py b/influxdb_client_3/__init__.py index 753a83b..dbbc582 100644 --- a/influxdb_client_3/__init__.py +++ b/influxdb_client_3/__init__.py @@ -1,17 +1,37 @@ +import importlib.util +import os import urllib.parse +from typing import Any + import pyarrow as pa -import importlib.util from influxdb_client_3.query.query_api import QueryApi as _QueryApi, QueryApiOptionsBuilder from influxdb_client_3.read_file import UploadFile from influxdb_client_3.write_client import InfluxDBClient as _InfluxDBClient, WriteOptions, Point from influxdb_client_3.write_client.client.exceptions import InfluxDBError from influxdb_client_3.write_client.client.write_api import WriteApi as _WriteApi, SYNCHRONOUS, ASYNCHRONOUS, \ - PointSettings + PointSettings, WriteType, DefaultWriteOptions from influxdb_client_3.write_client.domain.write_precision import WritePrecision polars = importlib.util.find_spec("polars") is not None +INFLUX_HOST = "INFLUX_HOST" +INFLUX_TOKEN = "INFLUX_TOKEN" +INFLUX_DATABASE = "INFLUX_DATABASE" +INFLUX_ORG = "INFLUX_ORG" +INFLUX_PRECISION = "INFLUX_PRECISION" +INFLUX_AUTH_SCHEME = "INFLUX_AUTH_SCHEME" +INFLUX_GZIP_THRESHOLD = "INFLUX_GZIP_THRESHOLD" +INFLUX_TIMEOUT = "INFLUX_TIMEOUT" +INFLUX_VERIFY_SSL = "INFLUX_VERIFY_SSL" +INFLUX_SSL_CA_CERT = "INFLUX_SSL_CA_CERT" +INFLUX_CERT_FILE = "INFLUX_CERT_FILE" +INFLUX_CERT_KEY_FILE = "INFLUX_CERT_KEY_FILE" +INFLUX_CERT_KEY_PASSWORD = "INFLUX_CERT_KEY_PASSWORD" +INFLUX_CONNECTION_POOL_MAXSIZE = "INFLUX_CONNECTION_POOL_MAXSIZE" +INFLUX_PROFILERS = "INFLUX_PROFILERS" +INFLUX_TAG = "INFLUX_TAG" + def write_client_options(**kwargs): """ @@ -83,6 +103,51 @@ def _merge_options(defaults, exclude_keys=None, custom=None): return _deep_merge(defaults, {key: value for key, value in custom.items() if key not in exclude_keys}) +def _parse_precision(precision): + """ + Parses the precision value and ensures it is valid. + + This function checks that the given `precision` is one of the allowed + values defined in `WritePrecision`. If the precision is invalid, it + raises a `ValueError`. The function returns the valid precision value + if it passes validation. + + :param precision: The precision value to be validated. + Must be one of WritePrecision.NS, WritePrecision.MS, + WritePrecision.S, or WritePrecision.US. + :return: The valid precision value. + :rtype: WritePrecision + :raises ValueError: If the provided precision is not valid. + """ + if precision not in [WritePrecision.NS, WritePrecision.MS, WritePrecision.S, WritePrecision.US]: + raise ValueError(f"Invalid precision value: {precision}") + return precision + + +def _parse_gzip_threshold(threshold): + """ + Parses and validates the provided threshold value. + + This function ensures that the given threshold is a valid integer value, + and it raises an appropriate error if the threshold is not valid. It also + enforces that the threshold value is non-negative. + + :param threshold: The input threshold value to be parsed and validated. + :type threshold: Any + :return: The validated threshold value as an integer. + :rtype: int + :raises ValueError: If the provided threshold is not an integer or if it is + negative. + """ + try: + threshold = int(threshold) + except (TypeError, ValueError): + raise ValueError(f"Invalid threshold value: {threshold}. Must be integer.") + if threshold < 0: + raise ValueError(f"Invalid threshold value: {threshold}. Must be non-negative.") + return threshold + + class InfluxDBClient3: def __init__( self, @@ -136,8 +201,27 @@ def __init__( self._org = org if org is not None else "default" self._database = database self._token = token - self._write_client_options = write_client_options if write_client_options is not None \ - else default_client_options(write_options=SYNCHRONOUS) + + write_type = DefaultWriteOptions.write_type.value + write_precision = DefaultWriteOptions.write_precision.value + gzip_threshold = None + if isinstance(write_client_options, dict) and write_client_options.get('write_options') is not None: + write_opts = write_client_options['write_options'] + write_type = getattr(write_opts, 'write_type', write_type) + write_precision = getattr(write_opts, 'write_precision', write_precision) + gzip_threshold = getattr(write_opts, 'gzip_threshold') + + write_options = WriteOptions( + write_type=write_type, + write_precision=write_precision, + gzip_threshold=gzip_threshold, + enable_gzip=kwargs.get('enable_gzip', False) + ) + + self._write_client_options = { + "write_options": write_options, + **(write_client_options or {}) + } # Parse the host input parsed_url = urllib.parse.urlparse(host) @@ -155,6 +239,8 @@ def __init__( url=f"{scheme}://{hostname}:{port}", token=self._token, org=self._org, + enable_gzip=write_options.enable_gzip, + gzip_threshold=write_options.gzip_threshold, **kwargs) self._write_api = _WriteApi(influxdb_client=self._client, **self._write_client_options) @@ -178,6 +264,104 @@ def __init__( flight_client_options=flight_client_options, proxy=kwargs.get("proxy", None), options=q_opts_builder.build()) + @classmethod + def from_env(cls, **kwargs: Any) -> 'InfluxDBClient3': + + """ + Creates an instance of InfluxDBClient3 configured by specific environment + variables. This method automatically loads configuration settings, + such as connection details, security parameters, and performance + options, from environment variables and initializes the client + accordingly. + + :param cls: + The class used to create the client instance. + :param kwargs: + Additional optional parameters that can be passed to customize the + configuration or override specific settings derived from the + environment variables. + + :raises ValueError: + If any required environment variables are missing or have empty + values. + + :return: + An initialized instance of the `InfluxDBClient3` class with all the + configuration settings applied. + :rtype: + InfluxDBClient3 + """ + + required_vars = { + INFLUX_HOST: os.getenv(INFLUX_HOST), + INFLUX_TOKEN: os.getenv(INFLUX_TOKEN), + INFLUX_DATABASE: os.getenv(INFLUX_DATABASE) + } + missing_vars = [var for var, value in required_vars.items() if value is None or value == ""] + if missing_vars: + raise ValueError(f"Missing required environment variables: {', '.join(missing_vars)}") + + write_options = WriteOptions(write_type=WriteType.synchronous) + + gzip_threshold = os.getenv(INFLUX_GZIP_THRESHOLD) + if gzip_threshold is not None: + write_options.gzip_threshold = _parse_gzip_threshold(gzip_threshold) + write_options.enable_gzip = True + + precision = os.getenv(INFLUX_PRECISION) + if precision is not None: + write_options.write_precision = _parse_precision(precision) + + write_client_option = {'write_options': write_options} + + if os.getenv(INFLUX_AUTH_SCHEME) is not None: + kwargs['auth_scheme'] = os.getenv(INFLUX_AUTH_SCHEME) + + timeout = os.getenv(INFLUX_TIMEOUT) + if timeout is not None: + kwargs['timeout'] = int(timeout) + + ssl_ca_cert = os.getenv(INFLUX_SSL_CA_CERT) + if ssl_ca_cert is not None: + kwargs['ssl_ca_cert'] = ssl_ca_cert + + cert_file = os.getenv(INFLUX_CERT_FILE) + if cert_file is not None: + kwargs['cert_file'] = cert_file + + cert_key_file = os.getenv(INFLUX_CERT_KEY_FILE) + if cert_key_file is not None: + kwargs['cert_key_file'] = cert_key_file + + cert_key_password = os.getenv(INFLUX_CERT_KEY_PASSWORD) + if cert_key_password is not None: + kwargs['cert_key_password'] = cert_key_password + + connection_pool_maxsize = os.getenv(INFLUX_CONNECTION_POOL_MAXSIZE) + if connection_pool_maxsize is not None: + kwargs['connection_pool_maxsize'] = int(connection_pool_maxsize) + + profilers = os.getenv(INFLUX_PROFILERS) + if profilers is not None: + kwargs['profilers'] = [x.strip() for x in profilers.split(',')] + + default_tags = dict() + for key, value in os.environ.items(): + if key.startswith("{0}_".format(INFLUX_TAG)): + default_tags[key[11:].lower()] = value + kwargs['default_tags'] = default_tags + + kwargs['verify_ssl'] = bool(os.getenv(INFLUX_VERIFY_SSL, 'True').lower() in ['True', 'true']) + org = os.getenv(INFLUX_ORG, "default") + return InfluxDBClient3( + host=required_vars[INFLUX_HOST], + token=required_vars[INFLUX_TOKEN], + database=required_vars[INFLUX_DATABASE], + write_client_options=write_client_option, + org=org, + **kwargs + ) + def write(self, record=None, database=None, **kwargs): """ Write data to InfluxDB. diff --git a/influxdb_client_3/write_client/_sync/api_client.py b/influxdb_client_3/write_client/_sync/api_client.py index b72a840..724f1c1 100644 --- a/influxdb_client_3/write_client/_sync/api_client.py +++ b/influxdb_client_3/write_client/_sync/api_client.py @@ -1,12 +1,4 @@ # coding: utf-8 -""" -InfluxDB OSS API Service. - -The InfluxDB v2 API provides a programmatic interface for all interactions with InfluxDB. Access the InfluxDB API using the `/api/v2/` endpoint. # noqa: E501 - -OpenAPI spec version: 2.0.0 -Generated by: https://openapi-generator.tech -""" from __future__ import absolute_import @@ -28,17 +20,7 @@ class ApiClient(object): - """Generic API client for OpenAPI client library Build. - - OpenAPI generic API client. This client handles the client- - server communication, and is invariant across implementations. Specifics of - the methods and models for each application are generated from the OpenAPI - templates. - - NOTE: This class is auto generated by OpenAPI Generator. - Ref: https://openapi-generator.tech - Do not edit the class manually. - + """ :param configuration: .Configuration object for this client :param header_name: a header to pass when making calls to the API. :param header_value: a header value to pass when making calls to @@ -120,6 +102,10 @@ def __call_api( config = self.configuration self._signin(resource_path=resource_path) + gzip_threshold = config.gzip_threshold + enable_gzip = config.enable_gzip + self.should_compress = self.check_should_compress(body, gzip_threshold, enable_gzip) + # header parameters header_params = header_params or {} config.update_request_header_params(resource_path, header_params) @@ -192,6 +178,33 @@ def __call_api( return (return_data, response_data.status, response_data.getheaders()) + def check_should_compress(self, body: bytearray, gzip_threshold: int, enable_gzip: bool) -> bool: + """ + Determines whether the given body should be compressed based on its size, + a defined threshold for compression, and a flag indicating whether + compression is enabled. + + This function evaluates whether the body meets the required criteria for + compression. Compression may be enabled explicitly or conditionally + based on the body size exceeding the provided threshold. + + :param body: The content to be evaluated for compression. + :type body: bytearray + :param gzip_threshold: The minimum size threshold for compression to be applied. + :type gzip_threshold: int + :param enable_gzip: A flag indicating whether gzip compression is enabled. + It can explicitly enable or disable compression, or conditionally + allow compression if the body size exceeds the threshold. + :type enable_gzip: bool + :return: Returns True if the body meets the criteria for compression; + otherwise, returns False. + :rtype: bool + """ + body_size = len(body) + if enable_gzip is True or (enable_gzip is not False and (gzip_threshold and body_size >= gzip_threshold)): + return True + return False + def sanitize_for_serialization(self, obj): """Build a JSON POST object. diff --git a/influxdb_client_3/write_client/_sync/rest.py b/influxdb_client_3/write_client/_sync/rest.py index 09da73a..f4d5299 100644 --- a/influxdb_client_3/write_client/_sync/rest.py +++ b/influxdb_client_3/write_client/_sync/rest.py @@ -1,15 +1,5 @@ # coding: utf-8 -""" -InfluxDB OSS API Service. - -The InfluxDB v2 API provides a programmatic interface for all interactions with InfluxDB. Access the InfluxDB API using the `/api/v2/` endpoint. # noqa: E501 - -OpenAPI spec version: 2.0.0 -Generated by: https://openapi-generator.tech -""" - - from __future__ import absolute_import import io @@ -28,11 +18,6 @@ class RESTResponse(io.IOBase): - """NOTE: This class is auto generated by OpenAPI Generator. - - Ref: https://openapi-generator.tech - Do not edit the class manually. - """ def __init__(self, resp): """Initialize with HTTP response.""" @@ -51,11 +36,6 @@ def getheader(self, name, default=None): class RESTClientObject(object): - """NOTE: This class is auto generated by OpenAPI Generator. - - Ref: https://openapi-generator.tech - Do not edit the class manually. - """ def __init__(self, configuration, pools_size=4, maxsize=None, retries=False): """Initialize REST client.""" diff --git a/influxdb_client_3/write_client/client/_base.py b/influxdb_client_3/write_client/client/_base.py index 32664d9..3b4bc1d 100644 --- a/influxdb_client_3/write_client/client/_base.py +++ b/influxdb_client_3/write_client/client/_base.py @@ -34,8 +34,52 @@ # noinspection PyMethodMayBeStatic class _BaseClient(object): - def __init__(self, url, token, debug=None, timeout=10_000, enable_gzip=False, org: str = None, + def __init__(self, url, token, debug=None, timeout=10_000, enable_gzip=False, gzip_threshold=None, org: str = None, default_tags: dict = None, http_client_logger: str = None, **kwargs) -> None: + """ + Initializes the configuration for an HTTP client with support for customizable settings + such as authentication token, debugging, timeout, and gzip compression. This class + encapsulates the client configuration, logging setup, and authentication mechanisms + to allow seamless interaction with an HTTP backend. + + :param url: The base URL for the HTTP client. + :type url: str + :param token: The authentication token to be used in requests. + :type token: str + :param debug: Enables debug mode for logging client operations. Defaults to None. + :type debug: bool, optional + :param timeout: The timeout duration for HTTP requests in milliseconds. Defaults to 10,000. + :type timeout: int + :param enable_gzip: Flag to enable or disable gzip compression. Defaults to False. + :type enable_gzip: bool + :param gzip_threshold: The threshold size for enabling gzip compression, if applicable. + :type gzip_threshold: int, optional + :param org: The organization identifier to be associated with the client. + :type org: str, optional + :param default_tags: A dictionary of default tags to add to outgoing requests for metadata purposes. + :type default_tags: dict, optional + :param http_client_logger: The logger name to use for HTTP client-specific logging. + :type http_client_logger: str, optional + :param kwargs: Additional optional parameters to customize the HTTP client: + + - verify_ssl: Flag to enable or disable SSL certificate verification. Defaults to True. + - ssl_ca_cert: Path to the CA certificate file for verifying SSL. Defaults to None. + - cert_file: Path to a client SSL certificate file for authentication. Defaults to None. + - cert_key_file: Path to the client’s SSL key file, if separate. Defaults to None. + - cert_key_password: Password for the client’s SSL key file, if applicable. Defaults to None. + - ssl_context: SSLContext object to configure custom SSL parameters. Defaults to None. + - proxy: Proxy server URL, if a proxy is to be used. Defaults to None. + - proxy_headers: A dictionary containing custom headers for the proxy server. Defaults to None. + - connection_pool_maxsize: Defines the maximum pool size for connections. + Default inherits from the configuration. + - retries: Determines if request retrying is enabled. Defaults to None. + - profilers: Profilers for performance tracking. Defaults to None. + - username: Username for basic authentication. Defaults to None. + - password: Password for basic authentication. Defaults to None. + - auth_scheme: Custom authentication scheme to use with the token. Defaults to "Token". + - auth_basic: Boolean flag to enable HTTP Basic Authentication. Defaults to False. + + """ self.url = url self.org = org @@ -47,6 +91,7 @@ def __init__(self, url, token, debug=None, timeout=10_000, enable_gzip=False, or else: self.conf.host = self.url self.conf.enable_gzip = enable_gzip + self.conf.gzip_threshold = gzip_threshold self.conf.verify_ssl = kwargs.get('verify_ssl', True) self.conf.ssl_ca_cert = kwargs.get('ssl_ca_cert', None) self.conf.cert_file = kwargs.get('cert_file', None) @@ -271,12 +316,14 @@ class _Configuration(Configuration): def __init__(self): Configuration.__init__(self) self.enable_gzip = False + self.gzip_threshold = None + self.should_compress = False self.username = None self.password = None def update_request_header_params(self, path: str, params: dict): super().update_request_header_params(path, params) - if self.enable_gzip: + if self.should_compress: # GZIP Request if path == '/api/v2/write': params["Content-Encoding"] = "gzip" @@ -292,7 +339,7 @@ def update_request_header_params(self, path: str, params: dict): def update_request_body(self, path: str, body): _body = super().update_request_body(path, body) - if self.enable_gzip: + if self.should_compress: # GZIP Request if path == '/api/v2/write': import gzip diff --git a/influxdb_client_3/write_client/client/influxdb_client.py b/influxdb_client_3/write_client/client/influxdb_client.py index 3dd29af..715c10f 100644 --- a/influxdb_client_3/write_client/client/influxdb_client.py +++ b/influxdb_client_3/write_client/client/influxdb_client.py @@ -4,6 +4,8 @@ import logging +from typing_extensions import deprecated + from influxdb_client_3.write_client.client._base import _BaseClient from influxdb_client_3.write_client.client.write_api import WriteApi, WriteOptions, PointSettings @@ -13,7 +15,9 @@ class InfluxDBClient(_BaseClient): """InfluxDBClient is client for InfluxDB v2.""" - def __init__(self, url, token: str = None, debug=None, timeout=10_000, enable_gzip=False, org: str = None, + def __init__(self, url, token: str = None, + debug=None, timeout=10_000, + enable_gzip=False, gzip_threshold=None, org: str = None, default_tags: dict = None, **kwargs) -> None: """ Initialize defaults. @@ -45,12 +49,16 @@ def __init__(self, url, token: str = None, debug=None, timeout=10_000, enable_gz except batching writes. As a default there is no one retry strategy. :key bool auth_basic: Set this to true to enable basic authentication when talking to a InfluxDB 1.8.x that does not use auth-enabled but is protected by a reverse proxy with basic authentication. - (defaults to false, don't set to true when talking to InfluxDB 2) + (defaults to false, don't set to true when talking to InfluxDB 2). + :key int gzip_threshold: If the payload size is larger than this gzip_threshold, then + the payload will be zipped. :key str username: ``username`` to authenticate via username and password credentials to the InfluxDB 2.x :key str password: ``password`` to authenticate via username and password credentials to the InfluxDB 2.x :key list[str] profilers: list of enabled Flux profilers """ - super().__init__(url=url, token=token, debug=debug, timeout=timeout, enable_gzip=enable_gzip, org=org, + + super().__init__(url=url, token=token, debug=debug, timeout=timeout, + enable_gzip=enable_gzip, gzip_threshold=gzip_threshold, org=org, default_tags=default_tags, http_client_logger="urllib3", **kwargs) from influxdb_client_3.write_client._sync.api_client import ApiClient @@ -167,6 +175,7 @@ def from_config_file(cls, config_file: str = "config.ini", debug=None, enable_gz return InfluxDBClient._from_config_file(config_file=config_file, debug=debug, enable_gzip=enable_gzip, **kwargs) @classmethod + @deprecated("Use up to date Env Properties") def from_env_properties(cls, debug=None, enable_gzip=False, **kwargs): """ Configure client via environment properties. diff --git a/influxdb_client_3/write_client/client/write_api.py b/influxdb_client_3/write_client/client/write_api.py index 2bcb612..2d1c15f 100644 --- a/influxdb_client_3/write_client/client/write_api.py +++ b/influxdb_client_3/write_client/client/write_api.py @@ -39,6 +39,14 @@ class WriteType(Enum): synchronous = 3 +DEFAULT_GZIP_THRESHOLD = 1000 + + +class DefaultWriteOptions(Enum): + write_type = WriteType.synchronous + write_precision = WritePrecision.NS + + class WriteOptions(object): """Write configuration.""" @@ -51,6 +59,9 @@ def __init__(self, write_type: WriteType = WriteType.batching, max_retry_time=180_000, exponential_base=2, max_close_wait=300_000, + write_precision=DEFAULT_WRITE_PRECISION, + gzip_threshold=None, + enable_gzip=False, write_scheduler=ThreadPoolScheduler(max_workers=1)) -> None: """ Create write api configuration. @@ -66,8 +77,11 @@ def __init__(self, write_type: WriteType = WriteType.batching, :param max_retry_delay: the maximum delay between each retry attempt in milliseconds :param max_retry_time: total timeout for all retry attempts in milliseconds, if 0 retry is disabled :param exponential_base: base for the exponential retry delay - :parama max_close_wait: the maximum time to wait for writes to be flushed if close() is called + :param max_close_wait: the maximum time to wait for writes to be flushed if close() is called + :param write_precision: the time precision for the data written to InfluxDB. :param write_scheduler: + :param gzip_threshold: if the payload size is larger than the gzip_threshold, the payload will be zipped. + :param enable_gzip: set true to enable to zip the payload. """ self.write_type = write_type self.batch_size = batch_size @@ -80,6 +94,9 @@ def __init__(self, write_type: WriteType = WriteType.batching, self.exponential_base = exponential_base self.write_scheduler = write_scheduler self.max_close_wait = max_close_wait + self.write_precision = write_precision + self.gzip_threshold = gzip_threshold + self.enable_gzip = enable_gzip def to_retry_strategy(self, **kwargs): """ @@ -290,7 +307,7 @@ def write(self, bucket: str, org: str = None, str, Iterable['str'], Point, Iterable['Point'], dict, Iterable['dict'], bytes, Iterable['bytes'], Observable, NamedTuple, Iterable['NamedTuple'], 'dataclass', Iterable['dataclass'] ] = None, - write_precision: WritePrecision = DEFAULT_WRITE_PRECISION, **kwargs) -> Any: + write_precision: WritePrecision = None, **kwargs) -> Any: """ Write time-series data into InfluxDB. @@ -361,6 +378,9 @@ def write(self, bucket: str, org: str = None, self._append_default_tags(record) + if write_precision is None: + write_precision = self._write_options.write_precision + if self._write_options.write_type is WriteType.batching: return self._write_batching(bucket, org, record, write_precision, **kwargs) @@ -443,8 +463,11 @@ def __del__(self): pass def _write_batching(self, bucket, org, data, - precision=DEFAULT_WRITE_PRECISION, + precision=None, **kwargs): + if precision is None: + precision = self._write_options.write_precision + if isinstance(data, bytes): _key = _BatchItemKey(bucket, org, precision) self._subject.on_next(_BatchItem(key=_key, data=data)) @@ -454,7 +477,8 @@ def _write_batching(self, bucket, org, data, precision, **kwargs) elif isinstance(data, Point): - self._write_batching(bucket, org, data.to_line_protocol(), data.write_precision, **kwargs) + write_precision = data.write_precision if data.write_precision is not None else precision + self._write_batching(bucket, org, data.to_line_protocol(), write_precision, **kwargs) elif isinstance(data, dict): self._write_batching(bucket, org, Point.from_dict(data, write_precision=precision, **kwargs), diff --git a/influxdb_client_3/write_client/configuration.py b/influxdb_client_3/write_client/configuration.py index 5793655..0e4c54e 100644 --- a/influxdb_client_3/write_client/configuration.py +++ b/influxdb_client_3/write_client/configuration.py @@ -1,15 +1,5 @@ # coding: utf-8 -""" -InfluxDB OSS API Service. - -The InfluxDB v2 API provides a programmatic interface for all interactions with InfluxDB. Access the InfluxDB API using the `/api/v2/` endpoint. # noqa: E501 - -OpenAPI spec version: 2.0.0 -Generated by: https://openapi-generator.tech -""" - - from __future__ import absolute_import import copy @@ -21,11 +11,6 @@ class TypeWithDefault(type): - """NOTE: This class is auto generated by OpenAPI Generator. - - Ref: https://openapi-generator.tech - Do not edit the class manually. - """ def __init__(cls, name, bases, dct): """Initialize with defaults.""" @@ -44,11 +29,6 @@ def set_default(cls, default): class Configuration(object, metaclass=TypeWithDefault): - """NOTE: This class is auto generated by OpenAPI Generator. - - Ref: https://openapi-generator.tech - Do not edit the class manually. - """ def __init__(self): """Initialize configuration.""" @@ -118,6 +98,10 @@ def __init__(self): # Safe chars for path_param self.safe_chars_for_path_param = '' + # Compression settings + self.enable_gzip = False + self.gzip_threshold = None + @property def logger_file(self): """Logger file. diff --git a/influxdb_client_3/write_client/domain/write_precision.py b/influxdb_client_3/write_client/domain/write_precision.py index 41a0db9..4917201 100644 --- a/influxdb_client_3/write_client/domain/write_precision.py +++ b/influxdb_client_3/write_client/domain/write_precision.py @@ -1,26 +1,10 @@ # coding: utf-8 -""" -InfluxDB OSS API Service. - -The InfluxDB v2 API provides a programmatic interface for all interactions with InfluxDB. Access the InfluxDB API using the `/api/v2/` endpoint. # noqa: E501 - -OpenAPI spec version: 2.0.0 -Generated by: https://openapi-generator.tech -""" - - import pprint import re # noqa: F401 class WritePrecision(object): - """NOTE: This class is auto generated by OpenAPI Generator. - - Ref: https://openapi-generator.tech - - Do not edit the class manually. - """ """ allowed enum values diff --git a/influxdb_client_3/write_client/rest.py b/influxdb_client_3/write_client/rest.py index 54f9e5c..e9a4652 100644 --- a/influxdb_client_3/write_client/rest.py +++ b/influxdb_client_3/write_client/rest.py @@ -1,14 +1,5 @@ # coding: utf-8 -""" -InfluxDB OSS API Service. - -The InfluxDB v2 API provides a programmatic interface for all interactions with InfluxDB. Access the InfluxDB API using the `/api/v2/` endpoint. # noqa: E501 - -OpenAPI spec version: 2.0.0 -Generated by: https://openapi-generator.tech -""" - from __future__ import absolute_import import logging @@ -21,11 +12,6 @@ class ApiException(InfluxDBError): - """NOTE: This class is auto generated by OpenAPI Generator. - - Ref: https://openapi-generator.tech - Do not edit the class manually. - """ def __init__(self, status=None, reason=None, http_resp=None): """Initialize with HTTP response.""" diff --git a/influxdb_client_3/write_client/service/signin_service.py b/influxdb_client_3/write_client/service/signin_service.py index 97f3fcf..ae64b3b 100644 --- a/influxdb_client_3/write_client/service/signin_service.py +++ b/influxdb_client_3/write_client/service/signin_service.py @@ -1,15 +1,5 @@ # coding: utf-8 -""" -InfluxDB OSS API Service. - -The InfluxDB v2 API provides a programmatic interface for all interactions with InfluxDB. Access the InfluxDB API using the `/api/v2/` endpoint. # noqa: E501 - -OpenAPI spec version: 2.0.0 -Generated by: https://openapi-generator.tech -""" - - from __future__ import absolute_import import re # noqa: F401 @@ -18,12 +8,6 @@ class SigninService(_BaseService): - """NOTE: This class is auto generated by OpenAPI Generator. - - Ref: https://openapi-generator.tech - - Do not edit the class manually. - """ def __init__(self, api_client=None): # noqa: E501,D401,D403 """SigninService - a operation defined in OpenAPI.""" diff --git a/influxdb_client_3/write_client/service/signout_service.py b/influxdb_client_3/write_client/service/signout_service.py index fdf8b4d..c7dbdbc 100644 --- a/influxdb_client_3/write_client/service/signout_service.py +++ b/influxdb_client_3/write_client/service/signout_service.py @@ -1,15 +1,5 @@ # coding: utf-8 -""" -InfluxDB OSS API Service. - -The InfluxDB v2 API provides a programmatic interface for all interactions with InfluxDB. Access the InfluxDB API using the `/api/v2/` endpoint. # noqa: E501 - -OpenAPI spec version: 2.0.0 -Generated by: https://openapi-generator.tech -""" - - from __future__ import absolute_import import re # noqa: F401 @@ -18,12 +8,6 @@ class SignoutService(_BaseService): - """NOTE: This class is auto generated by OpenAPI Generator. - - Ref: https://openapi-generator.tech - - Do not edit the class manually. - """ def __init__(self, api_client=None): # noqa: E501,D401,D403 """SignoutService - a operation defined in OpenAPI.""" diff --git a/influxdb_client_3/write_client/service/write_service.py b/influxdb_client_3/write_client/service/write_service.py index 23dc791..b9e0b0b 100644 --- a/influxdb_client_3/write_client/service/write_service.py +++ b/influxdb_client_3/write_client/service/write_service.py @@ -1,15 +1,5 @@ # coding: utf-8 -""" -InfluxDB OSS API Service. - -The InfluxDB v2 API provides a programmatic interface for all interactions with InfluxDB. Access the InfluxDB API using the `/api/v2/` endpoint. # noqa: E501 - -OpenAPI spec version: 2.0.0 -Generated by: https://openapi-generator.tech -""" - - from __future__ import absolute_import import re # noqa: F401 @@ -18,12 +8,6 @@ class WriteService(_BaseService): - """NOTE: This class is auto generated by OpenAPI Generator. - - Ref: https://openapi-generator.tech - - Do not edit the class manually. - """ def __init__(self, api_client=None): # noqa: E501,D401,D403 """WriteService - a operation defined in OpenAPI.""" diff --git a/tests/test_api_client.py b/tests/test_api_client.py index 9976cfb..66e1c3e 100644 --- a/tests/test_api_client.py +++ b/tests/test_api_client.py @@ -139,3 +139,49 @@ def test_api_error_headers(self): self.assertEqual(headers['Trace-Sampled'], 'false') self.assertEqual(headers['X-Influxdb-Request-Id'], requestid) self.assertEqual(headers['X-Influxdb-Build'], 'Mock') + + def test_check_should_compress_true(self): + conf = Configuration() + client = ApiClient(conf) + + # len of body = 20 + body = bytearray("12345678901234567890".encode("utf-8")) + tests = [ + { + 'gzip_threshold': 10, + 'enable_gzip': True, + 'expected': True + }, + { + 'gzip_threshold': 30, + 'enable_gzip': True, + 'expected': True + }, + { + 'gzip_threshold': None, + 'enable_gzip': True, + 'expected': True + }, + { + 'gzip_threshold': 30, + 'enable_gzip': None, + 'expected': False + }, + { + 'gzip_threshold': 30, + 'enable_gzip': False, + 'expected': False + }, + { + 'gzip_threshold': 10, + 'enable_gzip': None, + 'expected': True + }, + ] + + for test in tests: + gzip_threshold = test['gzip_threshold'] + enable_gzip = test['enable_gzip'] + expected = test['expected'] + result = client.check_should_compress(body, gzip_threshold, enable_gzip) + self.assertEqual(result, expected) diff --git a/tests/test_influxdb_client_3.py b/tests/test_influxdb_client_3.py index 1b52b76..9892d8b 100644 --- a/tests/test_influxdb_client_3.py +++ b/tests/test_influxdb_client_3.py @@ -1,7 +1,7 @@ import unittest from unittest.mock import patch -from influxdb_client_3 import InfluxDBClient3 +from influxdb_client_3 import InfluxDBClient3, WritePrecision, DefaultWriteOptions from tests.util import asyncio_run from tests.util.mocks import ConstantFlightServer, ConstantData @@ -74,6 +74,99 @@ async def test_query_async(self): assert {'data': 'sql_query', 'reference': query, 'value': -1.0} in result_list assert {'data': 'query_type', 'reference': 'sql', 'value': -1.0} in result_list + def test_default_client(self): + expected_precision = DefaultWriteOptions.write_precision.value + expected_write_type = DefaultWriteOptions.write_type.value + expected_gzip_threshold = None + expected_gzip_enabled = False + + def verify_client_write_options(c): + write_options = c._write_client_options.get('write_options') + self.assertEqual(write_options.write_precision, expected_precision) + self.assertEqual(write_options.write_type, expected_write_type) + self.assertEqual(write_options.gzip_threshold, expected_gzip_threshold) + self.assertEqual(write_options.enable_gzip, expected_gzip_enabled) + + self.assertEqual(c._write_api._write_options.write_precision, expected_precision) + self.assertEqual(c._write_api._write_options.write_type, expected_write_type) + + env_client = InfluxDBClient3.from_env() + verify_client_write_options(env_client) + + default_client = InfluxDBClient3() + verify_client_write_options(default_client) + + @patch.dict('os.environ', {'INFLUX_HOST': 'localhost', 'INFLUX_TOKEN': 'test_token', + 'INFLUX_DATABASE': 'test_db', 'INFLUX_ORG': 'test_org', + 'INFLUX_PRECISION': WritePrecision.MS, 'INFLUX_GZIP_THRESHOLD': '2000', + 'INFLUX_TIMEOUT': '6000', 'INFLUX_VERIFY_SSL': 'False', + 'INFLUX_CERT_FILE': 'path_to_cert', 'INFLUX_CERT_KEY_FILE': 'path_to_cert_key', + 'INFLUX_CERT_KEY_PASSWORD': 'cert_key_password', 'INFLUX_CONNECTION_POOL_MAXSIZE': '200', + 'INFLUX_PROFILERS': 'prof1,prof2, prof3', 'INFLUX_TAG_TAG1': 'Tag1', + 'INFLUX_TAG_TAG2': 'Tag2'}) + def test_from_env_all_env_vars_set(self): + client = InfluxDBClient3.from_env() + self.assertIsInstance(client, InfluxDBClient3) + self.assertEqual(client._client.url, "https://localhost:443") + self.assertEqual(client._database, "test_db") + self.assertEqual(client._org, "test_org") + self.assertEqual(client._token, "test_token") + write_options = client._write_client_options.get("write_options") + self.assertEqual(write_options.write_precision, WritePrecision.MS) + self.assertEqual(write_options.gzip_threshold, 2000) + self.assertEqual(client._client.conf.verify_ssl, False) + self.assertEqual(client._client.conf.cert_file, 'path_to_cert') + self.assertEqual(client._client.conf.cert_key_file, 'path_to_cert_key') + self.assertEqual(client._client.conf.cert_key_password, 'cert_key_password') + self.assertEqual(client._client.conf.connection_pool_maxsize, 200) + self.assertEqual(client._client.conf.timeout, 6000) + self.assertEqual(client._client.profilers, ['prof1', 'prof2', 'prof3']) + self.assertEqual(client._client.default_tags['tag1'], 'Tag1') + self.assertEqual(client._client.default_tags['tag2'], 'Tag2') + client._write_api._point_settings = {} + + @patch.dict('os.environ', {'INFLUX_HOST': 'localhost', 'INFLUX_TOKEN': 'test_token', + 'INFLUX_DATABASE': 'test_db', 'INFLUX_SSL_CA_CERT': 'invalid/path'}) + def test_from_env_invalid_ssl_cert(self): + with self.assertRaises(FileNotFoundError) as context: + InfluxDBClient3.from_env() + self.assertIn("No such file or directory: 'invalid/path'", str(context.exception)) + + @patch.dict('os.environ', {'INFLUX_HOST': "", 'INFLUX_TOKEN': "", + 'INFLUX_DATABASE': "", 'INFLUX_ORG': ""}) + def test_from_env_missing_variables(self): + with self.assertRaises(ValueError) as context: + InfluxDBClient3.from_env() + self.assertIn("Missing required environment variables", str(context.exception)) + + @patch.dict('os.environ', {'INFLUX_HOST': 'localhost', 'INFLUX_TOKEN': 'test_token', + 'INFLUX_DATABASE': 'test_db', 'INFLUX_PRECISION': WritePrecision.MS}) + def test_parse_valid_write_precision(self): + client = InfluxDBClient3.from_env() + self.assertIsInstance(client, InfluxDBClient3) + self.assertEqual(client._write_client_options.get('write_options').write_precision, WritePrecision.MS) + + @patch.dict('os.environ', {'INFLUX_HOST': 'localhost', 'INFLUX_TOKEN': 'test_token', + 'INFLUX_DATABASE': 'test_db', 'INFLUX_PRECISION': 'invalid_value'}) + def test_parse_invalid_write_precision(self): + with self.assertRaises(ValueError) as context: + InfluxDBClient3.from_env() + self.assertIn("Invalid precision value: invalid_value", str(context.exception)) + + @patch.dict('os.environ', {'INFLUX_HOST': 'localhost', 'INFLUX_TOKEN': 'test_token', + 'INFLUX_DATABASE': 'test_db', 'INFLUX_GZIP_THRESHOLD': '2000'}) + def test_parse_valid_gzip_threshold(self): + client = InfluxDBClient3.from_env() + self.assertIsInstance(client, InfluxDBClient3) + self.assertEqual(client._write_client_options.get('write_options').gzip_threshold, 2000) + + @patch.dict('os.environ', {'INFLUX_HOST': 'localhost', 'INFLUX_TOKEN': 'test_token', + 'INFLUX_DATABASE': 'test_db', 'INFLUX_GZIP_THRESHOLD': 'invalid'}) + def test_parse_invalid_gzip_threshold(self): + with self.assertRaises(ValueError) as context: + InfluxDBClient3.from_env() + self.assertIn("Must be integer", str(context.exception)) + if __name__ == '__main__': unittest.main() diff --git a/tests/test_influxdb_client_3_integration.py b/tests/test_influxdb_client_3_integration.py index 48181f7..a13b3a1 100644 --- a/tests/test_influxdb_client_3_integration.py +++ b/tests/test_influxdb_client_3_integration.py @@ -4,6 +4,7 @@ import string import time import unittest +from unittest.mock import patch import pyarrow import pytest @@ -274,3 +275,23 @@ async def test_verify_query_async(self): result_list = result.to_pylist() for item in data: assert lp_to_py_object(item) in result_list, f"original lp data \"{item}\" should be in result list" + + def test_from_env(self): + with InfluxDBClient3.from_env() as client: + id_test = time.time_ns() + client.write(f"integration_test_python,type=used value=123.0,id_test={id_test}i") + + sql = 'SELECT * FROM integration_test_python where type=$type and id_test=$id_test' + data = client.query(sql, mode="pandas", query_parameters={'type': 'used', 'id_test': id_test}) + + self.assertIsNotNone(data) + self.assertEqual(1, len(data)) + self.assertEqual(id_test, data['id_test'][0]) + self.assertEqual(123.0, data['value'][0]) + + @patch.dict('os.environ', {'INFLUX_AUTH_SCHEME': 'invalid_schema'}) + def test_from_env_invalid_auth_schema(self): + with InfluxDBClient3.from_env() as client: + with self.assertRaises(InfluxDBError) as err: + client.write("integration_test_python,type=used value=123.0") + self.assertEqual('unauthorized access', err.exception.message) diff --git a/tests/test_polars.py b/tests/test_polars.py index 435b4a2..dfd7c34 100644 --- a/tests/test_polars.py +++ b/tests/test_polars.py @@ -43,6 +43,7 @@ def setUp(self): database="my_db", token="my_token" ) + self.client._write_api._point_settings = PointSettings() def test_write_polars(self): import polars as pl @@ -77,6 +78,7 @@ def test_write_polars_batching(self): write_options=WriteOptions(batch_size=2) ) ) + self.client._write_api._point_settings = PointSettings() self.client._write_api._write_options = WriteOptions(batch_size=2) self.client._write_api._write_service = Mock(spec=WriteService)