fixed syntax of the InfluxDatabase model

This commit is contained in:
Martin Schroeder 2026-01-28 15:38:20 +00:00
parent bb2febe2e6
commit c9062ccbc9

View File

@ -12,23 +12,28 @@ from django.db import models
from influxdb_client import InfluxDBClient, Point, WritePrecision from influxdb_client import InfluxDBClient, Point, WritePrecision
from influxdb_client.client.write_api import SYNCHRONOUS from influxdb_client.client.write_api import SYNCHRONOUS
from pyscada.models import DataSource from pyscada.models import DataSource, DjangoDatabase
tz_local = pytz.timezone(settings.TIME_ZONE) tz_local = pytz.timezone(settings.TIME_ZONE)
class InfluxDatabase(DjangoDatabase):
class InfluxDatabase(models.Model):
datasource = models.OneToOneField(DataSource, on_delete=models.CASCADE)
def __str__(self):
return f"Influx Database ({self.url}.{self.bucket})"
bucket = models.CharField(max_length=255) bucket = models.CharField(max_length=255)
api_key = models.CharField(max_length=255) api_key = models.CharField(max_length=255)
organisation = models.CharField(max_length=255) organisation = models.CharField(max_length=255)
write_precision = models.CharField(default="ms", max_length=2) write_precision = models.CharField(default="ms", max_length=2)
url = models.CharField(default="127.0.0.1:8086") url = models.CharField(default="127.0.0.1:8086", max_length=255)
measurement_name = models.CharField(default="pyscada.models.RecordedData", max_length=255) measurement_name = models.CharField(default="pyscada.models.RecordedData", max_length=255)
only_write_to_influxdb = models.BooleanField(default=True, help_text="when selected only a copy of the data is written to the InfluxDB and the SQL Database is used for everything else") only_write_to_influxdb = models.BooleanField(default=True, help_text="when selected only a copy of the data is written to the InfluxDB and the SQL Database is used for everything else")
def connect(self): def connect(self):
self.client = InfluxDBClient(url=self.url, token=self.token, org=self.organisation) self.client = InfluxDBClient(url=self.url, token=self.token, org=self.organisation)
return self.client return self.client
def get_write_api(self): def get_write_api(self):
if self.client is None: if self.client is None:
self.connect() self.connect()
@ -38,7 +43,9 @@ class InfluxDatabase(DjangoDatabase):
if self.client is None: if self.client is None:
self.connect() self.connect()
return self.client.query_api() return self.client.query_api()
def get_django_database():
return DataSource.objects.first() # the django database datasource is always the first element in the database
def create_data_element_from_variable(self, variable, value, timestamp, **kwargs): def create_data_element_from_variable(self, variable, value, timestamp, **kwargs):
if value is not None: if value is not None:
@ -49,22 +56,27 @@ class InfluxDatabase(DjangoDatabase):
if hasattr(variable, "date_saved") if hasattr(variable, "date_saved")
else now() else now()
) )
return Point(self.measurement_name) #.field("id", int(int(int(timestamp) * 2097152) + variable.pk))
#.field("id", int(int(int(timestamp) * 2097152) + variable.pk)) # variable_id and device_id had to be a tag to be easily filtered by, even if it is an numeric value
.tag("variable_id", variable.pk) # has to be a tag to be easily filtered by, even if it is an numeric value point = (
Point(self.measurement_name)
.tag("variable_id", variable.pk)
.tag("device_protocol", variable.device.protocol.protocol) .tag("device_protocol", variable.device.protocol.protocol)
.tag"device_id", variable.device_id) # has to be a tag to be easily filtered by, even if it is an numeric value .tag("device_id", variable.device_id)
.tag("value_class", variable.value_class) .tag("value_class", variable.value_class)
.tag("unit", str(variable.unit)) .tag("unit", str(variable.unit))
.field("value", value) .field("value", value)
.field("date_saved", date_saved) .field("date_saved", date_saved)
.timestamp(timestamp / 1000) .timestamp(timestamp / 1000.0)
)
return point
return None return None
def last_value(self, **kwargs): def last_value(self, **kwargs):
if self.only_write_to_influxdb: if self.only_write_to_influxdb:
return super(self).last_value(**kwargs) django_database = self.get_django_database()
return django_database.last_value(**kwargs)
variable = kwargs.pop("variable") if "variable" in kwargs else None variable = kwargs.pop("variable") if "variable" in kwargs else None
use_date_saved = ( use_date_saved = (
kwargs.pop("use_date_saved") if "use_date_saved" in kwargs else False kwargs.pop("use_date_saved") if "use_date_saved" in kwargs else False
@ -72,16 +84,17 @@ class InfluxDatabase(DjangoDatabase):
query = f'from(bucket: "{self.bucket}") |> range(start: {time_min}) |> filter(fn:(r) => r._measurement == "{self.measurement_name}" ) |> filter(fn:(r) => r.variable_id == "{variable_id}") |> filter(fn:(r) => r._field == "value") |> keep(columns: ["_time","_value"]) |> last()' query = f'from(bucket: "{self.bucket}") |> range(start: {time_min}) |> filter(fn:(r) => r._measurement == "{self.measurement_name}" ) |> filter(fn:(r) => r.variable_id == "{variable_id}") |> filter(fn:(r) => r._field == "value") |> keep(columns: ["_time","_value"]) |> last()'
r = query_api.query(query) r = query_api.query(query)
r = r.to_values(columns=['_time','_value'])[0] r = r.to_values(columns=['_time','_value'])[0]
if len(r) == 0: if len(r) == 0:
return None return None
return [r[-1][0].timestamp(), r[-1][1]] return [r[-1][0].timestamp(), r[-1][1]]
def read_multiple(self, **kwargs): def read_multiple(self, **kwargs):
if self.only_write_to_influxdb: if self.only_write_to_influxdb:
return super(self).read_multiple(**kwargs) django_database = self.get_django_database()
return django_database.read_multiple(**kwargs)
variable_ids = kwargs.pop("variable_ids") if "variable_ids" in kwargs else [] variable_ids = kwargs.pop("variable_ids") if "variable_ids" in kwargs else []
time_min = kwargs.pop("time_min") if "time_min" in kwargs else 0 time_min = kwargs.pop("time_min") if "time_min" in kwargs else 0
time_max = kwargs.pop("time_max") if "time_max" in kwargs else time.time() time_max = kwargs.pop("time_max") if "time_max" in kwargs else time.time()
@ -111,7 +124,7 @@ class InfluxDatabase(DjangoDatabase):
query = f'from(bucket: "{self.bucket}") |> range(start: {time_min}, stop: {time_max}) |> filter(fn:(r) => r._measurement == "{self.measurement_name}" ) |> filter(fn:(r) => r.variable_id == "{variable_id}") |> filter(fn:(r) => r._field == "value") |> keep(columns: ["_time","_value"])' query = f'from(bucket: "{self.bucket}") |> range(start: {time_min}, stop: {time_max}) |> filter(fn:(r) => r._measurement == "{self.measurement_name}" ) |> filter(fn:(r) => r.variable_id == "{variable_id}") |> filter(fn:(r) => r._field == "value") |> keep(columns: ["_time","_value"])'
r = query_api.query(query) r = query_api.query(query)
values[variable_id] = [ [i_time.timestamp(), i_value] for i_time, i_value in r.to_values(["_time","_value"])] values[variable_id] = [ [i_time.timestamp(), i_value] for i_time, i_value in r.to_values(["_time","_value"])]
tmp_time_max = max(tmp_time_max, max([i_time for i_time, i_value in values[variable_id]]) tmp_time_max = max(tmp_time_max, max([i_time for i_time, i_value in values[variable_id]]))
query = f'from(bucket: "{self.bucket}") |> range(start: {time_min}, stop: {time_max}) |> filter(fn:(r) => r._measurement == "{self.measurement_name}" ) |> filter(fn:(r) => r.variable_id == "{variable_id}") |> filter(fn:(r) => r._field == "date_saved") |> max()' query = f'from(bucket: "{self.bucket}") |> range(start: {time_min}, stop: {time_max}) |> filter(fn:(r) => r._measurement == "{self.measurement_name}" ) |> filter(fn:(r) => r.variable_id == "{variable_id}") |> filter(fn:(r) => r._field == "date_saved") |> max()'
r = query_api.query(query) r = query_api.query(query)
date_saved_max = max(date_saved_max, r.to_values(["_value"])[0][0]) date_saved_max = max(date_saved_max, r.to_values(["_value"])[0][0])
@ -122,8 +135,10 @@ class InfluxDatabase(DjangoDatabase):
return values return values
def write_multiple(self, **kwargs): def write_multiple(self, **kwargs):
if self.only_write_to_influxdb: if self.only_write_to_influxdb:
data_model = self._import_model() django_database = self.get_django_database()
data_model = django_database._import_model()
items = kwargs.pop("items") if "items" in kwargs else [] items = kwargs.pop("items") if "items" in kwargs else []
items = self.datasource.datasource_check(items) # FIXME what is happening here? items = self.datasource.datasource_check(items) # FIXME what is happening here?
points = [] points = []
@ -179,15 +194,20 @@ class InfluxDatabase(DjangoDatabase):
item.date_saved = None item.date_saved = None
def get_first_element_timestamp(self, **kwargs): def get_first_element_timestamp(self, **kwargs):
"""this will likly time out and should be considert non functioning! """
this will likly time out and should be considert non functioning!
""" """
if self.only_write_to_influxdb: if self.only_write_to_influxdb:
return super(self).get_first_element_timestamp(**kwargs) django_database = self.get_django_database()
return django_database.get_first_element_timestamp(**kwargs)
return self.get_edge_element_timestamp(first_last="first", **kwargs) return self.get_edge_element_timestamp(first_last="first", **kwargs)
def get_last_element_timestamp(self, **kwargs): def get_last_element_timestamp(self, **kwargs):
if self.only_write_to_influxdb: if self.only_write_to_influxdb:
return super(self).get_last_element_timestamp(**kwargs) django_database = self.get_django_database()
return django_database.get_last_element_timestamp(**kwargs)
return self.get_edge_element_timestamp(first_last="last", **kwargs) return self.get_edge_element_timestamp(first_last="last", **kwargs)
def get_edge_element_timestamp(self, first_last="last", **kwargs): def get_edge_element_timestamp(self, first_last="last", **kwargs):
@ -223,4 +243,4 @@ class InfluxDatabase(DjangoDatabase):
if first_last == "last": if first_last == "last":
return r[-1].timestamp() return r[-1].timestamp()
elif first_last == "first": elif first_last == "first":
return r[0].timestamp() return r[0].timestamp()