From b2014e23bef87430e9d9c103dc28a79b1919ff8f Mon Sep 17 00:00:00 2001 From: Martin Schroeder Date: Wed, 28 Jan 2026 20:56:56 +0000 Subject: [PATCH] fixed multible_read --- pyscada/influxdb/models.py | 50 +++++++++++++++++++++++--------------- 1 file changed, 31 insertions(+), 19 deletions(-) diff --git a/pyscada/influxdb/models.py b/pyscada/influxdb/models.py index a203e63..a5a249a 100644 --- a/pyscada/influxdb/models.py +++ b/pyscada/influxdb/models.py @@ -15,7 +15,7 @@ from django.db.utils import IntegrityError, ProgrammingError from influxdb_client import InfluxDBClient, Point, WritePrecision from influxdb_client.client.write_api import SYNCHRONOUS -from pyscada.models import DataSource, DjangoDatabase +from pyscada.models import DataSource, DjangoDatabase, Variable tz_local = pytz.timezone(settings.TIME_ZONE) @@ -38,6 +38,7 @@ class InfluxDatabase(models.Model): only_write_to_influxdb = models.BooleanField(default=True, help_text="when selected only a copy of the data is written to the InfluxDB and the SQL Database is used for everything else") client = None + write_api = None def connect(self): self.client = InfluxDBClient(url=self.url, token=self.api_key, org=self.organisation) @@ -56,6 +57,21 @@ class InfluxDatabase(models.Model): def get_django_database(self): return DjangoDatabase.objects.first() # the django database datasource is always the first element in the database + def write_points(self, points): + write_api = self.get_write_api() + try: + write_api.write(bucket=self.bucket, org=self.organisation, record=points, write_precision=self.write_precision) + except: + logger.debug(f'Error in Point Data {", ".join(str(i.time) for i in points)}') + + def to_flux_time(self, timestamp): + #if self.write_precision == "ms": + # return int(timestamp*1000) + #if self.write_precision == "ns": + # return int(timestamp*1000*1000) + #if self.write_precision == "s": + return int(timestamp) + def create_data_element_from_variable(self, variable, timestamp, value, **kwargs): if value is not None: date_saved = ( @@ -91,7 +107,7 @@ class InfluxDatabase(models.Model): use_date_saved = ( kwargs.pop("use_date_saved") if "use_date_saved" in kwargs else False ) - query = f'from(bucket: "{self.bucket}") |> range(start: {time_min}) |> filter(fn:(r) => r._measurement == "{self.measurement_name}" ) |> filter(fn:(r) => r.variable_id == "{variable_id}") |> filter(fn:(r) => r._field == "value") |> keep(columns: ["_time","_value"]) |> last()' + query = f'from(bucket: "{self.bucket}") |> range(start: {self.to_flux_time(time_min)}) |> filter(fn:(r) => r._measurement == "{self.measurement_name}" ) |> filter(fn:(r) => r.variable_id == "{variable_id}") |> filter(fn:(r) => r._field == "value") |> keep(columns: ["_time","_value"]) |> last()' r = query_api.query(query) r = r.to_values(columns=['_time','_value'])[0] @@ -128,19 +144,22 @@ class InfluxDatabase(models.Model): values = {} query_api = self.get_query_api() - tmp_time_max = time_min - date_saved_max = time_min + tmp_time_max = time_min*f_time_scale + date_saved_max = time_min*f_time_scale for variable_id in variable_ids: - query = f'from(bucket: "{self.bucket}") |> range(start: {time_min}, stop: {time_max}) |> filter(fn:(r) => r._measurement == "{self.measurement_name}" ) |> filter(fn:(r) => r.variable_id == "{variable_id}") |> filter(fn:(r) => r._field == "value") |> keep(columns: ["_time","_value"])' + query = f'from(bucket: "{self.bucket}") |> range(start: {self.to_flux_time(time_min)}, stop: {self.to_flux_time(time_max)}) |> filter(fn:(r) => r._measurement == "{self.measurement_name}" ) |> filter(fn:(r) => r.variable_id == "{variable_id}") |> filter(fn:(r) => r._field == "value") |> keep(columns: ["_time","_value"])' r = query_api.query(query) - values[variable_id] = [ [i_time.timestamp(), i_value] for i_time, i_value in r.to_values(["_time","_value"])] + new_data = [ [i_time.timestamp()*f_time_scale, i_value] for i_time, i_value in r.to_values(["_time","_value"])] + if len(new_data) == 0: + continue + values[variable_id] = new_data tmp_time_max = max(tmp_time_max, max([i_time for i_time, i_value in values[variable_id]])) - query = f'from(bucket: "{self.bucket}") |> range(start: {time_min}, stop: {time_max}) |> filter(fn:(r) => r._measurement == "{self.measurement_name}" ) |> filter(fn:(r) => r.variable_id == "{variable_id}") |> filter(fn:(r) => r._field == "date_saved") |> max()' + query = f'from(bucket: "{self.bucket}") |> range(start: {self.to_flux_time(time_min)}, stop: {self.to_flux_time(time_max)}) |> filter(fn:(r) => r._measurement == "{self.measurement_name}" ) |> filter(fn:(r) => r.variable_id == "{variable_id}") |> filter(fn:(r) => r._field == "date_saved") |> max()' r = query_api.query(query) date_saved_max = max(date_saved_max, r.to_values(["_value"])[0][0]) - - values["timestamp"] = tmp_time_max * f_time_scale - values["date_saved_max"] = date_saved_max * f_time_scale + #values["query"] = query + values["timestamp"] = tmp_time_max + values["date_saved_max"] = date_saved_max return values @@ -154,7 +173,6 @@ class InfluxDatabase(models.Model): points = [] recordings = [] date_saved = kwargs.pop("date_saved") if "date_saved" in kwargs else now() - write_api = self.get_write_api() batch_size = kwargs.pop("batch_size") if "batch_size" in kwargs else 5000 i = 0 for item in items: @@ -183,10 +201,7 @@ class InfluxDatabase(models.Model): if i%batch_size == 0: - try: - write_api.write(bucket=self.bucket, org=self.organisation, record=points, write_precision=self.write_precision) - except: - logger.debug(f'Error in Point Data {", ".join(str(i.time) for i in points)}') + self.write_points(points) points = [] if self.only_write_to_influxdb: @@ -201,10 +216,7 @@ class InfluxDatabase(models.Model): ) if len(points) > 0: - try: - write_api.write(bucket=self.bucket, org=self.organisation, record=points, write_precision=self.write_precision) - except: - logger.debug(f'Error in Point Data {", ".join(str(i.time) for i in points)}') + self.write_points(points) for item in items: item.date_saved = None