# -*- coding: utf-8 -*-
from __future__ import unicode_literals

import logging
import time
from datetime import datetime

import pytz
from django.conf import settings
from django.db import models
from django.db.utils import IntegrityError
from django.utils.timezone import now
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS

from pyscada.models import DataSource, DjangoDatabase, Variable

tz_local = pytz.timezone(settings.TIME_ZONE)
logger = logging.getLogger(__name__)


class InfluxDatabase(models.Model):
    datasource = models.OneToOneField(DataSource, on_delete=models.CASCADE)
    bucket = models.CharField(max_length=255)
    api_key = models.CharField(max_length=255)
    organisation = models.CharField(max_length=255)
    write_precision = models.CharField(default="ms", max_length=2)
    url = models.CharField(default="127.0.0.1:8086", max_length=255)
    measurement_name = models.CharField(
        default="pyscada.models.RecordedData", max_length=255
    )
    only_write_to_influxdb = models.BooleanField(
        default=True,
        help_text="when selected, only a copy of the data is written to "
        "the InfluxDB and the SQL database is used for everything else",
    )

    # class-level defaults; connect() sets self.client
    client = None
    write_api = None

    def __str__(self):
        return f"Influx Database ({self.url}.{self.bucket})"

    def connect(self):
        self.client = InfluxDBClient(
            url=self.url, token=self.api_key, org=self.organisation
        )
        return self.client

    def get_write_api(self):
        if self.client is None:
            self.connect()
        return self.client.write_api(write_options=SYNCHRONOUS)

    def get_query_api(self):
        if self.client is None:
            self.connect()
        return self.client.query_api()

    def get_django_database(self):
        # the django database datasource is always the first element in the database
        return DjangoDatabase.objects.first()

    def write_points(self, points):
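        """Write a list of ``Point`` objects to the configured bucket.

        Uses the synchronous write API; errors are logged at debug level
        instead of being raised.
        """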
        write_api = self.get_write_api()
        try:
            write_api.write(
                bucket=self.bucket,
                org=self.organisation,
                record=points,
                write_precision=self.write_precision,
            )
        except Exception as e:
            logger.debug(f"Error writing {len(points)} points to InfluxDB: {e}")

    def to_flux_time(self, timestamp):
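        """Return the timestamp as an integer for use in Flux ``range()`` calls.

        The commented-out branches below suggest precision dependent scaling
        was planned; currently the timestamp is simply truncated to an int.
        """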
        # if self.write_precision == "ms":
        #     return int(timestamp * 1000)
        # if self.write_precision == "ns":
        #     return int(timestamp * 1000 * 1000)
        # if self.write_precision == "s":
        return int(timestamp)

    def create_data_element_from_variable(self, variable, timestamp, value, **kwargs):
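        """Build an InfluxDB ``Point`` for a single recorded value.

        ``timestamp`` is interpreted as milliseconds since the Unix epoch,
        ``variable_id`` and ``device_id`` are stored as tags so they can be
        filtered on, and ``date_saved`` is stored as an extra field.
        Returns ``None`` if ``value`` or ``timestamp`` is ``None``.
        """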
        if value is None:
            return None
        if timestamp is None:
            return None
        date_saved = kwargs.get(
            "date_saved",
            variable.date_saved if hasattr(variable, "date_saved") else now(),
        )

        # variable_id and device_id have to be tags so they can easily be
        # filtered by, even though they are numeric values
        logger.debug(f"{variable} {timestamp} {value}")
        point = (
            Point(self.measurement_name)
            .tag("variable_id", variable.pk)
            .tag("device_protocol", variable.device.protocol.protocol)
            .tag("device_id", variable.device_id)
            .tag("value_class", variable.value_class)
            .tag("unit", str(variable.unit))
            .field("value", value)
            .field("date_saved", date_saved.timestamp())
            .time(datetime.utcfromtimestamp(timestamp / 1000.0))
        )
        return point

    def last_datapoint(self, **kwargs):
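        """Return ``[timestamp, value]`` of the newest datapoint of a variable.

        Falls back to the Django database when ``only_write_to_influxdb`` is
        set; otherwise the last value within the past 24 hours is queried
        from InfluxDB. Returns ``None`` if nothing was found.
        """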
        if self.only_write_to_influxdb:
            django_database = self.get_django_database()
            return django_database.last_datapoint(**kwargs)

        variable = kwargs.pop("variable", None)
        use_date_saved = kwargs.pop("use_date_saved", False)  # currently unused here
        start_time = "-24h"
        query = f'from(bucket: "{self.bucket}") '
        query += f"|> range(start: {start_time}) "
        query += f'|> filter(fn:(r) => r._measurement == "{self.measurement_name}" ) '
        query += f'|> filter(fn:(r) => r.variable_id == "{variable.pk}") '
        query += '|> filter(fn:(r) => r._field == "value") '
        query += '|> keep(columns: ["_time","_value"]) '
        query += "|> last()"
        r = self.get_query_api().query(query)
        r = r.to_values(columns=["_time", "_value"])

        if len(r) == 0:
            return None
        return [r[0][0].timestamp(), r[0][1]]

    def query_datapoints(self, **kwargs):
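        """Query recorded values for a list of variable ids in a time range.

        Returns a dict mapping each variable id to ``[[time, value], ...]``,
        plus the keys ``timestamp`` (newest returned time) and
        ``date_saved_max``. Times are scaled to milliseconds when
        ``time_in_ms`` is set. Falls back to the Django database when
        ``only_write_to_influxdb`` is set.
        """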
        variable_ids = kwargs.pop("variable_ids", [])
        time_min = kwargs.pop("time_min", 0)
        time_max = kwargs.pop("time_max", time.time())
        time_in_ms = kwargs.pop("time_in_ms", True)
        query_first_value = kwargs.pop("query_first_value", False)
        variable_ids = self.datasource.datasource_check(
            variable_ids, items_as_id=True, ids_model=Variable
        )

        if kwargs.get("time_min_excluded", False):
            time_min = time_min + 0.001
        if kwargs.get("time_max_excluded", False):
            time_max = time_max - 0.001

        if time_in_ms:
            f_time_scale = 1000
        else:
            f_time_scale = 1
        if self.only_write_to_influxdb:
            return (
                self.get_django_database()
                ._import_model()
                .objects.db_data(
                    variable_ids=variable_ids,
                    time_min=time_min,
                    time_max=time_max,
                    time_in_ms=time_in_ms,
                    query_first_value=query_first_value,
                    **kwargs,
                )
            )
        values = {}
        query_api = self.get_query_api()
        tmp_time_max = time_min * f_time_scale
        date_saved_max = time_min * f_time_scale
        for variable_id in variable_ids:
            # query the recorded values for this variable in the requested range
            query = f'from(bucket: "{self.bucket}") '
            query += f"|> range(start: {self.to_flux_time(time_min)}, "
            query += f"stop: {self.to_flux_time(time_max)}) "
            query += (
                f'|> filter(fn:(r) => r._measurement == "{self.measurement_name}" ) '
            )
            query += f'|> filter(fn:(r) => r.variable_id == "{variable_id}") '
            query += '|> filter(fn:(r) => r._field == "value") '
            query += '|> keep(columns: ["_time","_value"])'
            r = query_api.query(query)
            new_data = [
                [i_time.timestamp() * f_time_scale, i_value]
                for i_time, i_value in r.to_values(["_time", "_value"])
            ]
            if len(new_data) == 0:
                continue
            values[variable_id] = new_data
            tmp_time_max = max(
                tmp_time_max, max([i_time for i_time, i_value in values[variable_id]])
            )
            # query the newest date_saved field for this variable in the same range
            query = f'from(bucket: "{self.bucket}") '
            query += f"|> range(start: {self.to_flux_time(time_min)}, "
            query += f"stop: {self.to_flux_time(time_max)}) "
            query += (
                f'|> filter(fn:(r) => r._measurement == "{self.measurement_name}" ) '
            )
            query += f'|> filter(fn:(r) => r.variable_id == "{variable_id}") '
            query += '|> filter(fn:(r) => r._field == "date_saved") '
            query += "|> max()"
            r = query_api.query(query)
            date_saved_values = r.to_values(["_value"])
            if len(date_saved_values):
                date_saved_max = max(date_saved_max, date_saved_values[0][0])

        values["timestamp"] = tmp_time_max
        values["date_saved_max"] = date_saved_max

        return values

    def write_datapoints(self, **kwargs):
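        """Write the cached values of the given items to InfluxDB.

        Points are written in batches of ``batch_size``; in addition the
        values are written to the SQL database via the Django data model
        (currently always, see the ``or True`` conditions below).
        """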
        # "or True" forces the SQL write path regardless of the
        # only_write_to_influxdb flag, so data is written to both backends
        if self.only_write_to_influxdb or True:
            django_database = self.get_django_database()
            data_model = django_database._import_model()

        items = kwargs.pop("items", [])
        items = self.datasource.datasource_check(items)
        points = []
        recordings = []
        date_saved = kwargs.pop("date_saved", now())
        batch_size = kwargs.pop("batch_size", 5000)
        i = 0
        logger.debug(f"write_datapoints {items}")
        for item in items:
            logger.debug(f"{item} has {len(item.cached_values_to_write)} to write.")
            if len(item.cached_values_to_write):
                for cached_value in item.cached_values_to_write:
                    # add date_saved if it does not exist on the variable object;
                    # if date_saved is in kwargs it will be used instead of
                    # variable.date_saved (see create_data_element_from_variable)
                    if not hasattr(item, "date_saved") or item.date_saved is None:
                        item.date_saved = date_saved

                    # create the InfluxDB point
                    point = self.create_data_element_from_variable(
                        item, cached_value[0], cached_value[1], **kwargs
                    )
                    if self.only_write_to_influxdb or True:
                        # create the matching SQL recorded data element
                        rc = data_model.objects.create_data_element_from_variable(
                            item, cached_value[0], cached_value[1], **kwargs
                        )
                        if rc is not None:
                            recordings.append(rc)

                    # append the point to the elements to save
                    if point is not None:
                        points.append(point)
                        i += 1

                    if i % batch_size == 0:
                        self.write_points(points)
                        points = []

        if self.only_write_to_influxdb or True:
            try:
                data_model.objects.bulk_create(
                    recordings, batch_size=batch_size, **kwargs
                )
            except IntegrityError:
                logger.debug(
                    f"{data_model._meta.object_name} objects already exist, "
                    "retrying while ignoring conflicts for: "
                    f'{", ".join(str(i.id) + " " + str(i.variable.id) for i in recordings)}'
                )
                data_model.objects.bulk_create(
                    recordings, ignore_conflicts=True, batch_size=batch_size, **kwargs
                )

        if len(points) > 0:
            self.write_points(points)

        for item in items:
            item.date_saved = None
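

# A minimal usage sketch (assumes a DataSource row for this backend already
# exists; the pk, bucket, token, organisation and URL below are placeholders,
# not values defined by PyScada):
#
#   influx_db, _ = InfluxDatabase.objects.get_or_create(
#       datasource=DataSource.objects.get(pk=2),
#       defaults={
#           "bucket": "pyscada",
#           "api_key": "<influxdb-api-token>",
#           "organisation": "my-org",
#           "url": "http://127.0.0.1:8086",
#       },
#   )
#   influx_db.last_datapoint(variable=Variable.objects.first())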