# -*- coding: utf-8 -*-
from __future__ import unicode_literals

import logging
import time
from datetime import datetime

import pytz
from django.conf import settings
from django.db import models
from django.db.utils import IntegrityError
from django.utils.timezone import now
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS

from pyscada.models import DataSource, Variable
from pyscada.django_datasource.models import DjangoDatabase

tz_local = pytz.timezone(settings.TIME_ZONE)
logger = logging.getLogger(__name__)


class InfluxDatabase(models.Model):
    datasource = models.OneToOneField(DataSource, on_delete=models.CASCADE)

    def __str__(self):
        return f"Influx Database ({self.url}.{self.bucket})"

    bucket = models.CharField(max_length=255)
    api_key = models.CharField(max_length=255)
    organisation = models.CharField(max_length=255)
    write_precision = models.CharField(default="ms", max_length=2)
    url = models.CharField(default="127.0.0.1:8086", max_length=255)
    measurement_name = models.CharField(
        default="pyscada.models.RecordedData", max_length=255
    )
    only_write_to_influxdb = models.BooleanField(
        default=True,
        help_text="when selected, only a copy of the data is written to "
        "the InfluxDB and the SQL database is used for everything else",
    )

    client = None
    write_api = None

    def connect(self):
        self.client = InfluxDBClient(
            url=self.url, token=self.api_key, org=self.organisation
        )
        return self.client

    def get_write_api(self):
        if self.client is None:
            self.connect()
        return self.client.write_api(write_options=SYNCHRONOUS)

    def get_query_api(self):
        if self.client is None:
            self.connect()
        return self.client.query_api()

    def get_django_database(self):
        # the django database datasource is always the first element in the database
        return DjangoDatabase.objects.first()

    def write_points(self, points):
        write_api = self.get_write_api()
        try:
            write_api.write(
                bucket=self.bucket,
                org=self.organisation,
                record=points,
                write_precision=self.write_precision,
            )
        except Exception:
            logger.debug(
                f'Error in Point Data {", ".join(str(i.time) for i in points)}'
            )

    def to_flux_time(self, timestamp):
        # flux range bounds are passed as unix timestamps in seconds,
        # independent of the configured write precision
        # if self.write_precision == "ms":
        #     return int(timestamp * 1000)
        # if self.write_precision == "ns":
        #     return int(timestamp * 1000 * 1000)
        # if self.write_precision == "s":
        return int(timestamp)

    def create_data_element_from_variable(
        self, variable, timestamp, value, date_saved=None, **kwargs
    ):
        if value is None:
            return None
        if timestamp is None:
            return None
        if date_saved is None:
            date_saved = now()
        if hasattr(variable, "date_saved") and variable.date_saved is not None:
            date_saved = variable.date_saved
        # variable_id and device_id have to be tags so that they can be
        # filtered on easily, even though they are numeric values
        logger.debug(f"{variable} {timestamp} {value}")
        point = (
            Point(self.measurement_name)
            .tag("variable_id", variable.pk)
            .tag("device_protocol", variable.device.protocol.protocol)
            .tag("device_id", variable.device_id)
            .tag("value_class", variable.value_class)
            .tag("unit", str(variable.unit))
            .field("value", value)
            .field("date_saved", date_saved.timestamp())
            .time(datetime.utcfromtimestamp(timestamp))
        )
        return point

    def last_datapoint(self, variable=None, use_date_saved=False, **kwargs):
        if self.only_write_to_influxdb:
            django_database = self.get_django_database()
            return django_database.last_datapoint(
                variable=variable, use_date_saved=use_date_saved, **kwargs
            )
        start_time = "-24h"
        query = f'from(bucket: "{self.bucket}") '
        query += f"|> range(start: {start_time}) "
        query += f'|> filter(fn:(r) => r._measurement == "{self.measurement_name}" ) '
        query += f'|> filter(fn:(r) => r.variable_id == "{variable.pk}") '
        query += '|> filter(fn:(r) => r._field == "value") '
        query += '|> keep(columns: ["_time","_value"]) '
        query += "|> last()"
        r = self.get_query_api().query(query)
        r = r.to_values(columns=["_time", "_value"])
        if len(r) == 0:
            return None
        return [r[0][0].timestamp(), r[0][1]]

    def query_datapoints(
        self, variable_ids, time_min=0, time_max=None, query_first_value=False, **kwargs
    ):
        if time_max is None:
            time_max = time.time()
        variable_ids = self.datasource.datasource_check(
            variable_ids, items_as_id=True, ids_model=Variable
        )
        if self.only_write_to_influxdb:
            return (
                self.get_django_database()
                ._import_model()
                .objects.db_data(
                    variable_ids=variable_ids,
                    time_min=time_min,
                    time_max=time_max,
                    query_first_value=query_first_value,
                    **kwargs,
                )
            )
        values = {}
        query_api = self.get_query_api()
        tmp_time_max = time_min
        date_saved_max = time_min
        for variable_id in variable_ids:
            query = f'from(bucket: "{self.bucket}") '
            query += f"|> range(start: {self.to_flux_time(time_min)}, "
            query += f"stop: {self.to_flux_time(time_max)}) "
            query += (
                f'|> filter(fn:(r) => r._measurement == "{self.measurement_name}" ) '
            )
            query += f'|> filter(fn:(r) => r.variable_id == "{variable_id}") '
            query += '|> filter(fn:(r) => r._field == "value") '
            query += '|> keep(columns: ["_time","_value"])'
            r = query_api.query(query)
            new_data = [
                [i_time.timestamp(), i_value]
                for i_time, i_value in r.to_values(["_time", "_value"])
            ]
            if len(new_data) == 0:
                continue
            values[variable_id] = new_data
            tmp_time_max = max(
                tmp_time_max, max([i_time for i_time, i_value in values[variable_id]])
            )
            query = f'from(bucket: "{self.bucket}") '
            query += f"|> range(start: {self.to_flux_time(time_min)}, "
            query += f"stop: {self.to_flux_time(time_max)}) "
            query += (
                f'|> filter(fn:(r) => r._measurement == "{self.measurement_name}" ) '
            )
            query += f'|> filter(fn:(r) => r.variable_id == "{variable_id}") '
            query += '|> filter(fn:(r) => r._field == "date_saved") '
            query += "|> max()"
            r = query_api.query(query)
            date_saved_max = max(date_saved_max, r.to_values(["_value"])[0][0])
        values["timestamp"] = tmp_time_max
        values["date_saved_max"] = date_saved_max
        return values

    def write_datapoints(self, items: list, date_saved=None, batch_size=5000, **kwargs):
        # the "or True" currently forces the SQL write path in every case,
        # so the data is always mirrored to the relational database as well
        if self.only_write_to_influxdb or True:
            django_database = self.get_django_database()
            data_model = django_database._import_model()
        items = self.datasource.datasource_check(items)
        points = []
        recordings = []
        i = 0
        logger.debug(f"write_datapoints {items}")
        for item in items:
            logger.debug(f"{item} has {len(item.cached_values_to_write)} to write.")
            if len(item.cached_values_to_write):
                for cached_value in item.cached_values_to_write:
                    # add date_saved if it does not exist in the variable object;
                    # if date_saved is in kwargs it will be used instead of
                    # variable.date_saved (see create_data_element_from_variable)
                    if not hasattr(item, "date_saved") or item.date_saved is None:
                        item.date_saved = date_saved
                    # create the recorded data object
                    point = self.create_data_element_from_variable(
                        variable=item,
                        timestamp=cached_value[0],
                        value=cached_value[1],
                        date_saved=date_saved,
                        **kwargs,
                    )
                    if self.only_write_to_influxdb or True:
                        rc = data_model.objects.create_data_element_from_variable(
                            variable=item,
                            value=cached_value[1],
                            timestamp=cached_value[0],
                            date_saved=date_saved,
                            **kwargs,
                        )
                        if rc is not None:
                            recordings.append(rc)
                    # append the object to the elements to save
                    if point is not None:
                        points.append(point)
                        i += 1
                        if i % batch_size == 0:
                            self.write_points(points)
                            points = []
        if self.only_write_to_influxdb or True:
            try:
                data_model.objects.bulk_create(
                    recordings, batch_size=batch_size, **kwargs
                )
            except IntegrityError:
                logger.debug(
                    f"{data_model._meta.object_name} objects already exist, "
                    "retrying while ignoring conflicts for: "
                    f'{", ".join(str(i.id) + " " + str(i.variable.id) for i in recordings)}'
                )
                data_model.objects.bulk_create(
                    recordings, ignore_conflicts=True, batch_size=batch_size, **kwargs
                )
        if len(points) > 0:
            self.write_points(points)
        for item in items:
            item.date_saved = None
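
# ----------------------------------------------------------------------------
# Example usage (a minimal sketch, kept as a comment so it never runs on
# import; it assumes an InfluxDatabase row has already been created and linked
# to a DataSource, and that "my_variable" is an existing pyscada Variable):
#
#   influx_db = InfluxDatabase.objects.first()
#   influx_db.connect()
#
#   # write a single point for a variable at the current time
#   point = influx_db.create_data_element_from_variable(
#       variable=my_variable, timestamp=time.time(), value=42.0
#   )
#   influx_db.write_points([point])
#
#   # read the most recent value back as [timestamp, value] or None;
#   # with only_write_to_influxdb set, this delegates to the SQL database
#   last = influx_db.last_datapoint(variable=my_variable)
# ----------------------------------------------------------------------------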