# -*- coding: utf-8 -*-
from __future__ import unicode_literals

import logging
import os
import re
import time
import traceback
from datetime import datetime

import pytz
from django.conf import settings
from django.db import IntegrityError, models
from django.utils.timezone import now
from influxdb_client import InfluxDBClient, Point, WritePrecision
from influxdb_client.client.write_api import SYNCHRONOUS

from pyscada.models import DataSource, DjangoDatabase, Variable

logger = logging.getLogger(__name__)

tz_local = pytz.timezone(settings.TIME_ZONE)


class InfluxDatabase(models.Model):
    datasource = models.OneToOneField(DataSource, on_delete=models.CASCADE)
    bucket = models.CharField(max_length=255)
    api_key = models.CharField(max_length=255)
    organisation = models.CharField(max_length=255)
    write_precision = models.CharField(default="ms", max_length=2)
    url = models.CharField(default="http://127.0.0.1:8086", max_length=255)
    measurement_name = models.CharField(
        default="pyscada.models.RecordedData", max_length=255
    )
    only_write_to_influxdb = models.BooleanField(
        default=True,
        help_text=(
            "When selected, only a copy of the data is written to InfluxDB "
            "and the SQL database is used for everything else."
        ),
    )

    # lazily initialised InfluxDB client; a plain class attribute, not a model field
    client = None

    def __str__(self):
        return f"Influx Database ({self.url}.{self.bucket})"

    def connect(self):
        # the InfluxDB API token is stored in the api_key field
        self.client = InfluxDBClient(
            url=self.url, token=self.api_key, org=self.organisation
        )
        return self.client

    def get_write_api(self):
        if self.client is None:
            self.connect()
        return self.client.write_api(write_options=SYNCHRONOUS)

    def get_query_api(self):
        if self.client is None:
            self.connect()
        return self.client.query_api()

    def get_django_database(self):
        # the django database datasource is always the first element in the database
        return DataSource.objects.first()

    def create_data_element_from_variable(self, variable, value, timestamp, **kwargs):
        if value is None:
            return None
        date_saved = (
            kwargs["date_saved"]
            if "date_saved" in kwargs
            else variable.date_saved
            if hasattr(variable, "date_saved")
            else now()
        )
        # .field("id", int(int(int(timestamp) * 2097152) + variable.pk))
        # variable_id and device_id have to be tags to be easily filtered by,
        # even though they are numeric values
        point = (
            Point(self.measurement_name)
            .tag("variable_id", variable.pk)
            .tag("device_protocol", variable.device.protocol.protocol)
            .tag("device_id", variable.device_id)
            .tag("value_class", variable.value_class)
            .tag("unit", str(variable.unit))
            .field("value", value)
            # Influx field values must be numeric, string or boolean; store
            # date_saved as a unix timestamp so it can be compared numerically
            .field(
                "date_saved",
                date_saved.timestamp()
                if isinstance(date_saved, datetime)
                else date_saved,
            )
            # timestamp is expected in milliseconds (hence the division by 1000)
            .time(
                datetime.fromtimestamp(timestamp / 1000.0, tz=pytz.utc),
                WritePrecision.MS,
            )
        )
        return point

    def last_value(self, **kwargs):
        if self.only_write_to_influxdb:
            django_database = self.get_django_database()
            return django_database.last_value(**kwargs)
        variable = kwargs.pop("variable") if "variable" in kwargs else None
        use_date_saved = (  # currently unused
            kwargs.pop("use_date_saved") if "use_date_saved" in kwargs else False
        )
        if variable is None:
            return None
        time_min = 0  # query from the start of the retention period
        query_api = self.get_query_api()
        query = (
            f'from(bucket: "{self.bucket}") '
            f"|> range(start: {time_min}) "
            f'|> filter(fn:(r) => r._measurement == "{self.measurement_name}") '
            f'|> filter(fn:(r) => r.variable_id == "{variable.pk}") '
            f'|> filter(fn:(r) => r._field == "value") '
            f'|> keep(columns: ["_time","_value"]) '
            f"|> last()"
        )
        r = query_api.query(query)
        r = r.to_values(columns=["_time", "_value"])
        if len(r) == 0:
            return None
        return [r[-1][0].timestamp(), r[-1][1]]

    def read_multiple(self, **kwargs):
        if self.only_write_to_influxdb:
            django_database = self.get_django_database()
            return django_database.read_multiple(**kwargs)
        variable_ids = kwargs.pop("variable_ids") if "variable_ids" in kwargs else []
        time_min = kwargs.pop("time_min") if "time_min" in kwargs else 0
        time_max = kwargs.pop("time_max") if "time_max" in kwargs else time.time()
        time_in_ms = kwargs.pop("time_in_ms") if "time_in_ms" in kwargs else True
        query_first_value = (  # currently unused
            kwargs.pop("query_first_value") if "query_first_value" in kwargs else False
        )
        variable_ids = self.datasource.datasource_check(
            variable_ids, items_as_id=True, ids_model=Variable
        )
        if kwargs.get("time_min_excluded", False):
            time_min = time_min + 0.001
        if kwargs.get("time_max_excluded", False):
            time_max = time_max - 0.001
        if time_in_ms:
            f_time_scale = 1000
        else:
            f_time_scale = 1

        values = {}
        query_api = self.get_query_api()
        tmp_time_max = time_min
        date_saved_max = time_min
        for variable_id in variable_ids:
            query = (
                f'from(bucket: "{self.bucket}") '
                f"|> range(start: {time_min}, stop: {time_max}) "
                f'|> filter(fn:(r) => r._measurement == "{self.measurement_name}") '
                f'|> filter(fn:(r) => r.variable_id == "{variable_id}") '
                f'|> filter(fn:(r) => r._field == "value") '
                f'|> keep(columns: ["_time","_value"])'
            )
            r = query_api.query(query)
            values[variable_id] = [
                [i_time.timestamp(), i_value]
                for i_time, i_value in r.to_values(columns=["_time", "_value"])
            ]
            if len(values[variable_id]):
                tmp_time_max = max(
                    tmp_time_max,
                    max(i_time for i_time, i_value in values[variable_id]),
                )
            query = (
                f'from(bucket: "{self.bucket}") '
                f"|> range(start: {time_min}, stop: {time_max}) "
                f'|> filter(fn:(r) => r._measurement == "{self.measurement_name}") '
                f'|> filter(fn:(r) => r.variable_id == "{variable_id}") '
                f'|> filter(fn:(r) => r._field == "date_saved") '
                f"|> max()"
            )
            r = query_api.query(query)
            rows = r.to_values(columns=["_value"])
            if len(rows):
                date_saved_max = max(date_saved_max, rows[0][0])
        values["timestamp"] = tmp_time_max * f_time_scale
        values["date_saved_max"] = date_saved_max * f_time_scale
        return values

    def write_multiple(self, **kwargs):
        if self.only_write_to_influxdb:
            django_database = self.get_django_database()
            data_model = django_database._import_model()
        items = kwargs.pop("items") if "items" in kwargs else []
        items = self.datasource.datasource_check(items)  # FIXME what is happening here?
        points = []
        recordings = []
        date_saved = kwargs.pop("date_saved") if "date_saved" in kwargs else now()
        write_api = self.get_write_api()
        batch_size = kwargs.pop("batch_size") if "batch_size" in kwargs else 5000
        i = 0
        for item in items:
            logger.debug(f"{item} has {len(item.cached_values_to_write)} to write.")
            if len(item.cached_values_to_write):
                for cached_value in item.cached_values_to_write:
                    # add date_saved if it does not exist on the variable object;
                    # if date_saved is in kwargs it is used instead of
                    # variable.date_saved (see create_data_element_from_variable)
                    if not hasattr(item, "date_saved") or item.date_saved is None:
                        item.date_saved = date_saved
                    # create the recorded data object
                    point = self.create_data_element_from_variable(
                        item, cached_value[1], cached_value[0], **kwargs
                    )
                    if self.only_write_to_influxdb:
                        rc = data_model.objects.create_data_element_from_variable(
                            item, cached_value[1], cached_value[0], **kwargs
                        )
                        if rc is not None:
                            # append the object to the elements to save
                            recordings.append(rc)
                    if point is not None:
                        points.append(point)
                        i += 1
                        if i % batch_size == 0:
                            write_api.write(
                                bucket=self.bucket,
                                org=self.organisation,
                                record=points,
                                write_precision=self.write_precision,
                            )
                            points = []
        if self.only_write_to_influxdb:
            try:
                data_model.objects.bulk_create(
                    recordings, batch_size=batch_size, **kwargs
                )
            except IntegrityError:
                logger.debug(
                    f"{data_model._meta.object_name} objects already exist, "
                    f"retrying ignoring conflicts for: "
                    f'{", ".join(str(rec.id) + " " + str(rec.variable.id) for rec in recordings)}'
                )
                data_model.objects.bulk_create(
                    recordings, ignore_conflicts=True, batch_size=batch_size, **kwargs
                )
        if len(points) > 0:
            write_api.write(
                bucket=self.bucket,
                org=self.organisation,
                record=points,
                write_precision=self.write_precision,
            )
        for item in items:
            item.date_saved = None

    def get_first_element_timestamp(self, **kwargs):
        """This will likely time out and should be considered non-functional!"""
        if self.only_write_to_influxdb:
            django_database = self.get_django_database()
            return django_database.get_first_element_timestamp(**kwargs)
        return self.get_edge_element_timestamp(first_last="first", **kwargs)

    def get_last_element_timestamp(self, **kwargs):
        if self.only_write_to_influxdb:
            django_database = self.get_django_database()
            return django_database.get_last_element_timestamp(**kwargs)
        return self.get_edge_element_timestamp(first_last="last", **kwargs)

    def get_edge_element_timestamp(self, first_last="last", **kwargs):
        if "variables" in kwargs:
            variable_ids = [v.pk for v in kwargs["variables"]]
        elif "variable" in kwargs:
            variable_ids = [kwargs["variable"].pk]
        elif "variable_ids" in kwargs:
            variable_ids = kwargs["variable_ids"]
        elif "variable_id" in kwargs:
            variable_ids = [kwargs["variable_id"]]
        else:
            return None
        query_api = self.get_query_api()
        start_time = "-24h"
        query = (
            f'from(bucket: "{self.bucket}") '
            f"|> range(start: {start_time}) "
            f'|> filter(fn:(r) => r._measurement == "{self.measurement_name}") '
            f"|> filter(fn:(r) =>"
        )
        for variable_id in variable_ids:
            query += f' r.variable_id == "{variable_id}" or'
        query = query[: -len(" or")]  # drop the trailing " or"
        if first_last == "last":
            query += ') |> keep(columns: ["_time"]) |> sort(columns: ["_time"], desc: false) |> last(column: "_time")'
        elif first_last == "first":
            query += ') |> keep(columns: ["_time"]) |> sort(columns: ["_time"], desc: false) |> first(column: "_time")'
        else:
            return None
        r = query_api.query(query)
        rows = r.to_values(columns=["_time"])
        if len(rows) == 0:
            return None
        if first_last == "last":
            return rows[-1][0].timestamp()
        return rows[0][0].timestamp()
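
# Usage sketch (a minimal, hypothetical example; "ds", the bucket, organisation
# and token values are placeholders, and the concrete import path depends on
# where this module lives in the pyscada project):
#
#     import time
#     from pyscada.models import DataSource
#
#     ds = DataSource.objects.last()  # hypothetical: the datasource to mirror
#     influx = InfluxDatabase.objects.create(
#         datasource=ds,
#         bucket="pyscada",
#         api_key="<influx-token>",
#         organisation="<influx-org>",
#         url="http://127.0.0.1:8086",
#     )
#     influx.connect()
#     # with only_write_to_influxdb=True (the default), reads such as
#     # read_multiple() are delegated to the SQL datasource, and every write
#     # through write_multiple() is mirrored to InfluxDB as a copy
#     data = influx.read_multiple(
#         variable_ids=[1, 2], time_min=0, time_max=time.time()
#     )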