From c9062ccbc9626e0fa7e4fd269dba313e2bbacfee Mon Sep 17 00:00:00 2001 From: Martin Schroeder Date: Wed, 28 Jan 2026 15:38:20 +0000 Subject: [PATCH] fixed syntax of the InfluxDatabase model --- pyscada/influxdb/models.py | 68 ++++++++++++++++++++++++-------------- 1 file changed, 44 insertions(+), 24 deletions(-) diff --git a/pyscada/influxdb/models.py b/pyscada/influxdb/models.py index 21ab4ab..212af07 100644 --- a/pyscada/influxdb/models.py +++ b/pyscada/influxdb/models.py @@ -12,23 +12,28 @@ from django.db import models from influxdb_client import InfluxDBClient, Point, WritePrecision from influxdb_client.client.write_api import SYNCHRONOUS -from pyscada.models import DataSource +from pyscada.models import DataSource, DjangoDatabase tz_local = pytz.timezone(settings.TIME_ZONE) -class InfluxDatabase(DjangoDatabase): + +class InfluxDatabase(models.Model): + datasource = models.OneToOneField(DataSource, on_delete=models.CASCADE) + + def __str__(self): + return f"Influx Database ({self.url}.{self.bucket})" bucket = models.CharField(max_length=255) api_key = models.CharField(max_length=255) organisation = models.CharField(max_length=255) write_precision = models.CharField(default="ms", max_length=2) - url = models.CharField(default="127.0.0.1:8086") + url = models.CharField(default="127.0.0.1:8086", max_length=255) measurement_name = models.CharField(default="pyscada.models.RecordedData", max_length=255) only_write_to_influxdb = models.BooleanField(default=True, help_text="when selected only a copy of the data is written to the InfluxDB and the SQL Database is used for everything else") def connect(self): self.client = InfluxDBClient(url=self.url, token=self.token, org=self.organisation) return self.client - + def get_write_api(self): if self.client is None: self.connect() @@ -38,7 +43,9 @@ class InfluxDatabase(DjangoDatabase): if self.client is None: self.connect() return self.client.query_api() - + + def get_django_database(): + return DataSource.objects.first() # the django database datasource is always the first element in the database def create_data_element_from_variable(self, variable, value, timestamp, **kwargs): if value is not None: @@ -49,22 +56,27 @@ class InfluxDatabase(DjangoDatabase): if hasattr(variable, "date_saved") else now() ) - return Point(self.measurement_name) - #.field("id", int(int(int(timestamp) * 2097152) + variable.pk)) - .tag("variable_id", variable.pk) # has to be a tag to be easily filtered by, even if it is an numeric value + #.field("id", int(int(int(timestamp) * 2097152) + variable.pk)) + # variable_id and device_id had to be a tag to be easily filtered by, even if it is an numeric value + point = ( + Point(self.measurement_name) + .tag("variable_id", variable.pk) .tag("device_protocol", variable.device.protocol.protocol) - .tag"device_id", variable.device_id) # has to be a tag to be easily filtered by, even if it is an numeric value + .tag("device_id", variable.device_id) .tag("value_class", variable.value_class) .tag("unit", str(variable.unit)) .field("value", value) .field("date_saved", date_saved) - .timestamp(timestamp / 1000) + .timestamp(timestamp / 1000.0) + ) + return point return None - + def last_value(self, **kwargs): if self.only_write_to_influxdb: - return super(self).last_value(**kwargs) - + django_database = self.get_django_database() + return django_database.last_value(**kwargs) + variable = kwargs.pop("variable") if "variable" in kwargs else None use_date_saved = ( kwargs.pop("use_date_saved") if "use_date_saved" in kwargs else False @@ -72,16 +84,17 @@ class InfluxDatabase(DjangoDatabase): query = f'from(bucket: "{self.bucket}") |> range(start: {time_min}) |> filter(fn:(r) => r._measurement == "{self.measurement_name}" ) |> filter(fn:(r) => r.variable_id == "{variable_id}") |> filter(fn:(r) => r._field == "value") |> keep(columns: ["_time","_value"]) |> last()' r = query_api.query(query) r = r.to_values(columns=['_time','_value'])[0] - + if len(r) == 0: return None return [r[-1][0].timestamp(), r[-1][1]] - + def read_multiple(self, **kwargs): if self.only_write_to_influxdb: - return super(self).read_multiple(**kwargs) - + django_database = self.get_django_database() + return django_database.read_multiple(**kwargs) + variable_ids = kwargs.pop("variable_ids") if "variable_ids" in kwargs else [] time_min = kwargs.pop("time_min") if "time_min" in kwargs else 0 time_max = kwargs.pop("time_max") if "time_max" in kwargs else time.time() @@ -111,7 +124,7 @@ class InfluxDatabase(DjangoDatabase): query = f'from(bucket: "{self.bucket}") |> range(start: {time_min}, stop: {time_max}) |> filter(fn:(r) => r._measurement == "{self.measurement_name}" ) |> filter(fn:(r) => r.variable_id == "{variable_id}") |> filter(fn:(r) => r._field == "value") |> keep(columns: ["_time","_value"])' r = query_api.query(query) values[variable_id] = [ [i_time.timestamp(), i_value] for i_time, i_value in r.to_values(["_time","_value"])] - tmp_time_max = max(tmp_time_max, max([i_time for i_time, i_value in values[variable_id]]) + tmp_time_max = max(tmp_time_max, max([i_time for i_time, i_value in values[variable_id]])) query = f'from(bucket: "{self.bucket}") |> range(start: {time_min}, stop: {time_max}) |> filter(fn:(r) => r._measurement == "{self.measurement_name}" ) |> filter(fn:(r) => r.variable_id == "{variable_id}") |> filter(fn:(r) => r._field == "date_saved") |> max()' r = query_api.query(query) date_saved_max = max(date_saved_max, r.to_values(["_value"])[0][0]) @@ -122,8 +135,10 @@ class InfluxDatabase(DjangoDatabase): return values def write_multiple(self, **kwargs): - if self.only_write_to_influxdb: - data_model = self._import_model() + if self.only_write_to_influxdb: + django_database = self.get_django_database() + data_model = django_database._import_model() + items = kwargs.pop("items") if "items" in kwargs else [] items = self.datasource.datasource_check(items) # FIXME what is happening here? points = [] @@ -179,15 +194,20 @@ class InfluxDatabase(DjangoDatabase): item.date_saved = None def get_first_element_timestamp(self, **kwargs): - """this will likly time out and should be considert non functioning! + """ + this will likly time out and should be considert non functioning! """ if self.only_write_to_influxdb: - return super(self).get_first_element_timestamp(**kwargs) + django_database = self.get_django_database() + return django_database.get_first_element_timestamp(**kwargs) + return self.get_edge_element_timestamp(first_last="first", **kwargs) def get_last_element_timestamp(self, **kwargs): if self.only_write_to_influxdb: - return super(self).get_last_element_timestamp(**kwargs) + django_database = self.get_django_database() + return django_database.get_last_element_timestamp(**kwargs) + return self.get_edge_element_timestamp(first_last="last", **kwargs) def get_edge_element_timestamp(self, first_last="last", **kwargs): @@ -223,4 +243,4 @@ class InfluxDatabase(DjangoDatabase): if first_last == "last": return r[-1].timestamp() elif first_last == "first": - return r[0].timestamp() \ No newline at end of file + return r[0].timestamp()