# -*- coding: utf-8 -*-
from __future__ import unicode_literals
import logging
import os
import re
import time
import traceback
from datetime import datetime

import pytz
from django.conf import settings
from django.db import IntegrityError, models
from django.utils.timezone import now
from influxdb_client import InfluxDBClient, Point, WritePrecision
from influxdb_client.client.write_api import SYNCHRONOUS

from pyscada.models import DataSource, DjangoDatabase, Variable

logger = logging.getLogger(__name__)

tz_local = pytz.timezone(settings.TIME_ZONE)


class InfluxDatabase(DjangoDatabase):
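    """InfluxDB 2.x backed data source for PyScada.

    Each recorded value is written as a point of ``measurement_name`` with
    the variable id, device id, protocol, value class and unit attached as
    tags. If ``only_write_to_influxdb`` is set, InfluxDB only receives a copy
    of the data and all reads are delegated to the SQL database.
    """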
    bucket = models.CharField(max_length=255)
    api_key = models.CharField(max_length=255)
    organisation = models.CharField(max_length=255)
    write_precision = models.CharField(default="ms", max_length=2)
    url = models.CharField(default="127.0.0.1:8086", max_length=255)
    measurement_name = models.CharField(
        default="pyscada.models.RecordedData", max_length=255
    )
    only_write_to_influxdb = models.BooleanField(
        default=True,
        help_text="when selected, only a copy of the data is written to the "
        "InfluxDB and the SQL database is used for everything else",
    )

    # cached InfluxDBClient instance, set by connect(); not a model field
    client = None
    def connect(self):
        # the model stores the access token in the ``api_key`` field
        self.client = InfluxDBClient(
            url=self.url, token=self.api_key, org=self.organisation
        )
        return self.client

    def get_write_api(self):
        if self.client is None:
            self.connect()
        return self.client.write_api(write_options=SYNCHRONOUS)

    def get_query_api(self):
        if self.client is None:
            self.connect()
        return self.client.query_api()

    def create_data_element_from_variable(self, variable, value, timestamp, **kwargs):
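        """Build an InfluxDB ``Point`` for a single variable reading.

        A ``date_saved`` passed in kwargs takes precedence over
        ``variable.date_saved``; returns ``None`` when ``value`` is ``None``.
        """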
        if value is None:
            return None
        if "date_saved" in kwargs:
            date_saved = kwargs["date_saved"]
        elif hasattr(variable, "date_saved"):
            date_saved = variable.date_saved
        else:
            date_saved = now()
        return (
            Point(self.measurement_name)
            # .field("id", int(int(int(timestamp) * 2097152) + variable.pk))
            # ids are stored as tags so they are indexed and easy to filter by,
            # even though they are numeric values
            .tag("variable_id", variable.pk)
            .tag("device_protocol", variable.device.protocol.protocol)
            .tag("device_id", variable.device_id)
            .tag("value_class", variable.value_class)
            .tag("unit", str(variable.unit))
            .field("value", value)
            # InfluxDB fields cannot hold datetimes, store the unix timestamp
            .field("date_saved", date_saved.timestamp())
            # pyscada timestamps are unix seconds; convert to ms for WritePrecision.MS
            .time(int(timestamp * 1000), write_precision=WritePrecision.MS)
        )

    def last_value(self, **kwargs):
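        """Return ``[timestamp, value]`` of the newest point for the given
        ``variable``, or ``None``; delegates to the SQL data source when
        ``only_write_to_influxdb`` is set."""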
        if self.only_write_to_influxdb:
            return super().last_value(**kwargs)
        variable = kwargs.pop("variable") if "variable" in kwargs else None
        if variable is None:
            return None
        use_date_saved = (
            kwargs.pop("use_date_saved") if "use_date_saved" in kwargs else False
        )
        # assumed default: scan from the epoch if no time_min is given
        time_min = kwargs.pop("time_min") if "time_min" in kwargs else 0
        query_api = self.get_query_api()
        query = (
            f'from(bucket: "{self.bucket}") |> range(start: {time_min}) '
            f'|> filter(fn: (r) => r._measurement == "{self.measurement_name}") '
            f'|> filter(fn: (r) => r.variable_id == "{variable.pk}") '
            '|> filter(fn: (r) => r._field == "value") '
            '|> keep(columns: ["_time", "_value"]) |> last()'
        )
        r = query_api.query(query).to_values(columns=["_time", "_value"])
        if len(r) == 0:
            return None
        return [r[-1][0].timestamp(), r[-1][1]]

    def read_multiple(self, **kwargs):
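        """Read values for multiple variables between ``time_min`` and ``time_max``.

        Returns a dict mapping each variable id to a list of ``[timestamp, value]``
        pairs, plus ``timestamp`` and ``date_saved_max`` entries that are scaled
        to milliseconds when ``time_in_ms`` is set.
        """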
        if self.only_write_to_influxdb:
            return super().read_multiple(**kwargs)
        variable_ids = kwargs.pop("variable_ids") if "variable_ids" in kwargs else []
        time_min = kwargs.pop("time_min") if "time_min" in kwargs else 0
        time_max = kwargs.pop("time_max") if "time_max" in kwargs else time.time()
        time_in_ms = kwargs.pop("time_in_ms") if "time_in_ms" in kwargs else True
        query_first_value = (
            kwargs.pop("query_first_value") if "query_first_value" in kwargs else False
        )
        variable_ids = self.datasource.datasource_check(
            variable_ids, items_as_id=True, ids_model=Variable
        )
        if kwargs.get("time_min_excluded", False):
            time_min = time_min + 0.001
        if kwargs.get("time_max_excluded", False):
            time_max = time_max - 0.001
        f_time_scale = 1000 if time_in_ms else 1
        values = {}
        query_api = self.get_query_api()
        tmp_time_max = time_min
        date_saved_max = time_min
        for variable_id in variable_ids:
            query = (
                f'from(bucket: "{self.bucket}") '
                f"|> range(start: {time_min}, stop: {time_max}) "
                f'|> filter(fn: (r) => r._measurement == "{self.measurement_name}") '
                f'|> filter(fn: (r) => r.variable_id == "{variable_id}") '
                '|> filter(fn: (r) => r._field == "value") '
                '|> keep(columns: ["_time", "_value"])'
            )
            r = query_api.query(query)
            values[variable_id] = [
                [i_time.timestamp(), i_value]
                for i_time, i_value in r.to_values(columns=["_time", "_value"])
            ]
            if len(values[variable_id]) == 0:
                # no data for this variable in the requested range
                continue
            tmp_time_max = max(
                tmp_time_max, max(i_time for i_time, i_value in values[variable_id])
            )
            query = (
                f'from(bucket: "{self.bucket}") '
                f"|> range(start: {time_min}, stop: {time_max}) "
                f'|> filter(fn: (r) => r._measurement == "{self.measurement_name}") '
                f'|> filter(fn: (r) => r.variable_id == "{variable_id}") '
                '|> filter(fn: (r) => r._field == "date_saved") |> max()'
            )
            r = query_api.query(query).to_values(columns=["_value"])
            if len(r):
                date_saved_max = max(date_saved_max, r[0][0])
        values["timestamp"] = tmp_time_max * f_time_scale
        values["date_saved_max"] = date_saved_max * f_time_scale
        return values

    def write_multiple(self, **kwargs):
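        """Write all cached variable values to InfluxDB in batches of
        ``batch_size`` points; with ``only_write_to_influxdb`` enabled the same
        elements are also bulk-created in the SQL database."""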
        data_model = self._import_model() if self.only_write_to_influxdb else None
        items = kwargs.pop("items") if "items" in kwargs else []
        items = self.datasource.datasource_check(items)  # FIXME what is happening here?
        points = []
        recordings = []
        date_saved = kwargs.pop("date_saved") if "date_saved" in kwargs else now()
        write_api = self.get_write_api()
        batch_size = kwargs.pop("batch_size") if "batch_size" in kwargs else 5000
        i = 0
        for item in items:
            logger.debug(f"{item} has {len(item.cached_values_to_write)} values to write.")
            if len(item.cached_values_to_write):
                for cached_value in item.cached_values_to_write:
                    # add date_saved if not set on the variable object; a date_saved
                    # in kwargs takes precedence over variable.date_saved
                    # (see create_data_element_from_variable)
                    if not hasattr(item, "date_saved") or item.date_saved is None:
                        item.date_saved = date_saved
                    # create the data point
                    point = self.create_data_element_from_variable(
                        item, cached_value[1], cached_value[0], **kwargs
                    )
                    if self.only_write_to_influxdb:
                        rc = data_model.objects.create_data_element_from_variable(
                            item, cached_value[1], cached_value[0], **kwargs
                        )
                        if rc is not None:
                            recordings.append(rc)
                    # append the point to the elements to save, flush full batches
                    if point is not None:
                        points.append(point)
                        i += 1
                        if i % batch_size == 0:
                            write_api.write(
                                bucket=self.bucket,
                                org=self.organisation,
                                record=points,
                                write_precision=self.write_precision,
                            )
                            points = []
        if self.only_write_to_influxdb:
            try:
                data_model.objects.bulk_create(
                    recordings, batch_size=batch_size, **kwargs
                )
            except IntegrityError:
                logger.debug(
                    f"{data_model._meta.object_name} objects already exist, retrying "
                    "while ignoring conflicts for: "
                    + ", ".join(str(r.id) + " " + str(r.variable.id) for r in recordings)
                )
                data_model.objects.bulk_create(
                    recordings, ignore_conflicts=True, batch_size=batch_size, **kwargs
                )
        if len(points) > 0:
            write_api.write(
                bucket=self.bucket,
                org=self.organisation,
                record=points,
                write_precision=self.write_precision,
            )
        # reset the per-variable date_saved so the next write gets a fresh value
        for item in items:
            item.date_saved = None
    def get_first_element_timestamp(self, **kwargs):
        """This will likely time out and should be considered non-functional!"""
        if self.only_write_to_influxdb:
            return super().get_first_element_timestamp(**kwargs)
        return self.get_edge_element_timestamp(first_last="first", **kwargs)

    def get_last_element_timestamp(self, **kwargs):
        if self.only_write_to_influxdb:
            return super().get_last_element_timestamp(**kwargs)
        return self.get_edge_element_timestamp(first_last="last", **kwargs)

    def get_edge_element_timestamp(self, first_last="last", **kwargs):
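        """Return the unix timestamp (seconds) of the first or last data point
        for the given variable(s), scanning only the last 24 hours.

        Accepts ``variables``, ``variable``, ``variable_ids`` or ``variable_id``
        in kwargs; returns ``None`` if none is given or no data is found.
        """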
if "variables" in kwargs:
variable_ids = [ v.pk for v in kwargs["variables"]]
elif "variable" in kwargs:
variable_ids = [ kwargs["variable"].pk ]
elif "variable_ids" in kwargs:
variable_ids = kwargs["variable_ids"]
elif "variable_id" in kwargs:
variable_ids = [ kwargs["variable_id"]]
else:
return None
query_api = self.get_query_api()
start_time = "-24h"
query = f'from(bucket: "{self.bucket}") |> range(start: {start_time}) |> filter(fn:(r) => r._measurement == "{self.measurement_name}" ) |> filter(fn:(r) =>'
for variable_id in variable_ids:
query += ' r.variable_id == "{variable_id}" or '
query = query[:-3]
if first_last == "last":
query += ') |> keep(columns: ["_time"]) |> sort(columns: ["_time"], desc: false) |> last(column: "_time")'
elif first_last == "first":
query += ') |> keep(columns: ["_time"]) |> sort(columns: ["_time"], desc: false) |> first(column: "_time")'
else:
return None
r = query_api.query(query)
r = r.to_values(columns=['_time'])[0]
if len(r) == 0:
return None
if first_last == "last":
return r[-1].timestamp()
elif first_last == "first":
return r[0].timestamp()