From d42c1400d7dc1fbdd1cc5b1b9b610f65b12fae21 Mon Sep 17 00:00:00 2001
From: Martin Schroeder
Date: Tue, 3 Feb 2026 17:28:09 +0000
Subject: [PATCH] wip

---
 pyscada/influxdb/models.py | 65 ++++++++++++++------------------------
 1 file changed, 23 insertions(+), 42 deletions(-)

diff --git a/pyscada/influxdb/models.py b/pyscada/influxdb/models.py
index 30d9be5..c874de7 100644
--- a/pyscada/influxdb/models.py
+++ b/pyscada/influxdb/models.py
@@ -13,7 +13,8 @@ from django.utils.timezone import now
 
 from influxdb_client import InfluxDBClient, Point
 from influxdb_client.client.write_api import SYNCHRONOUS
-from pyscada.models import DataSource, DjangoDatabase, Variable
+from pyscada.models import DataSource, Variable
+from pyscada.django_datasource.models import DjangoDatabase
 
 tz_local = pytz.timezone(settings.TIME_ZONE)
 logger = logging.getLogger(__name__)
@@ -86,16 +87,15 @@ class InfluxDatabase(models.Model):
         # if self.write_precision == "s":
         return int(timestamp)
 
-    def create_data_element_from_variable(self, variable, timestamp, value, **kwargs):
+    def create_data_element_from_variable(self, variable, timestamp, value, date_saved=None, **kwargs):
         if value is None:
             return None
         if timestamp is None:
             return None
-        date_saved = kwargs.get(
-            "date_saved",
-            variable.date_saved if hasattr(variable, "date_saved") else now(),
-        )
-
+        if date_saved is None:
+            date_saved = now()
+        if hasattr(variable, "date_saved") and variable.date_saved is not None:
+            date_saved = variable.date_saved
         # variable_id and device_id had to be a tag to be easily
         # filtered by, even if it is an numeric value
         logger.debug(f"{variable} {timestamp} {value}")
@@ -108,19 +108,15 @@ class InfluxDatabase(models.Model):
             .tag("unit", str(variable.unit))
             .field("value", value)
             .field("date_saved", date_saved.timestamp())
-            .time(datetime.utcfromtimestamp(timestamp/1000.0))
+            .time(datetime.utcfromtimestamp(timestamp))
         )
         return point
 
-    def last_datapoint(self, **kwargs):
+    def last_datapoint(self, variable=None, use_date_saved=False, **kwargs):
         if self.only_write_to_influxdb:
             django_database = self.get_django_database()
-            return django_database.last_datapoint(**kwargs)
+            return django_database.last_datapoint(variable=variable, use_date_saved=use_date_saved, **kwargs)
 
-        variable = kwargs.pop("variable") if "variable" in kwargs else None
-        use_date_saved = (
-            kwargs.pop("use_date_saved") if "use_date_saved" in kwargs else False
-        )
         start_time = "-24h"
         query = f'from(bucket: "{self.bucket}") '
         query += f"|> range(start: {start_time}) "
@@ -136,27 +132,16 @@ class InfluxDatabase(models.Model):
             return None
         return [r[0][0].timestamp(), r[0][1]]
 
-    def query_datapoints(self, **kwargs):
-        variable_ids = kwargs.pop("variable_ids") if "variable_ids" in kwargs else []
-        time_min = kwargs.pop("time_min") if "time_min" in kwargs else 0
-        time_max = kwargs.pop("time_max") if "time_max" in kwargs else time.time()
-        time_in_ms = kwargs.pop("time_in_ms") if "time_in_ms" in kwargs else True
-        query_first_value = (
-            kwargs.pop("query_first_value") if "query_first_value" in kwargs else False
-        )
+    def query_datapoints(self, variable_ids, time_min=0, time_max=None, query_first_value=False, **kwargs):
+
+
+        if time_max is None:
+            time_max = time.time()
+
         variable_ids = self.datasource.datasource_check(
             variable_ids, items_as_id=True, ids_model=Variable
         )
-        if kwargs.get("time_min_excluded", False):
-            time_min = time_min + 0.001
-        if kwargs.get("time_max_excluded", False):
-            time_max = time_max - 0.001
-
-        if time_in_ms:
-            f_time_scale = 1000
-        else:
-            f_time_scale = 1
         if self.only_write_to_influxdb:
             return (
                 self.get_django_database()
                 .query_datapoints(
@@ -165,15 +150,14 @@ class InfluxDatabase(models.Model):
                     variable_ids=variable_ids,
                     time_min=time_min,
                     time_max=time_max,
-                    time_in_ms=time_in_ms,
                     query_first_value=query_first_value,
                     **kwargs,
                 )
             )
         values = {}
         query_api = self.get_query_api()
-        tmp_time_max = time_min * f_time_scale
-        date_saved_max = time_min * f_time_scale
+        tmp_time_max = time_min
+        date_saved_max = time_min
         for variable_id in variable_ids:
             query = f'from(bucket: "{self.bucket}") '
             query += f"|> range(start: {self.to_flux_time(time_min)}, "
@@ -186,7 +170,7 @@ class InfluxDatabase(models.Model):
             query += '|> keep(columns: ["_time","_value"])'
             r = query_api.query(query)
             new_data = [
-                [i_time.timestamp() * f_time_scale, i_value]
+                [i_time.timestamp(), i_value]
                 for i_time, i_value in r.to_values(["_time", "_value"])
             ]
             if len(new_data) == 0:
@@ -212,17 +196,14 @@ class InfluxDatabase(models.Model):
 
         return values
 
-    def write_datapoints(self, **kwargs):
+    def write_datapoints(self, items: list, date_saved=None, batch_size=5000, **kwargs):
         if self.only_write_to_influxdb or True:
             django_database = self.get_django_database()
             data_model = django_database._import_model()
 
-        items = kwargs.pop("items") if "items" in kwargs else []
         items = self.datasource.datasource_check(items)
         points = []
         recordings = []
-        date_saved = kwargs.pop("date_saved") if "date_saved" in kwargs else now()
-        batch_size = kwargs.pop("batch_size") if "batch_size" in kwargs else 5000
         i = 0
         logger.debug(f"write_datapoints {items}")
         for item in items:
@@ -235,15 +216,15 @@ class InfluxDatabase(models.Model):
 
             # create_data_element_from_variable function)
             if not hasattr(item, "date_saved") or item.date_saved is None:
-                item.date_saved = date_saved
+                pass  # date_saved is now passed explicitly to the calls below
 
             # create the recorded data object
             point = self.create_data_element_from_variable(
-                item, cached_value[0], cached_value[1], **kwargs
+                variable=item, timestamp=cached_value[0], value=cached_value[1], date_saved=date_saved, **kwargs
            )
             if self.only_write_to_influxdb or True:
                 rc = data_model.objects.create_data_element_from_variable(
-                    item, cached_value[0], cached_value[0], **kwargs
+                    variable=item, value=cached_value[1], timestamp=cached_value[0], date_saved=date_saved, **kwargs
                 )
                 if rc is not None:
                     recordings.append(rc)
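
Note (reviewer addition, not part of the patch): with time_in_ms and f_time_scale removed, query_datapoints() and create_data_element_from_variable() now deal in plain Unix seconds, and the timestamp is handed to Point.time() without the old /1000.0 division. Below is a minimal standalone sketch of the point construction the patched create_data_element_from_variable() performs; the measurement name and the id/unit/value literals are made up for illustration, and only the influxdb_client package is needed to run it.

from datetime import datetime, timezone

from influxdb_client import Point

timestamp = 1738600089.0              # Unix time in seconds, as the patched code now expects
date_saved = datetime.now(timezone.utc)

point = (
    Point("pyscada_measurement")      # assumed name; the real measurement is defined elsewhere in models.py
    .tag("variable_id", "42")         # ids are kept as tags so they can be filtered on
    .tag("device_id", "7")
    .tag("unit", "m/s")
    .field("value", 21.5)
    .field("date_saved", date_saved.timestamp())
    .time(datetime.utcfromtimestamp(timestamp))  # seconds, no /1000.0 scaling any more
)

print(point.to_line_protocol())

Printing the line protocol makes it easy to confirm that variable_id, device_id and unit stay tags while value and date_saved land as fields.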