diff --git a/.flake8 b/.flake8 index b863051..11617ea 100644 --- a/.flake8 +++ b/.flake8 @@ -1,4 +1,10 @@ [flake8] -exclude = build,.git,.tox,./tests/.env +exclude = + build, + .git, + .tox, + tests/.env, + */migrations/* + extend-ignore = E203 max-line-length = 88 diff --git a/eslint.config.mjs b/eslint.config.mjs index 306adb3..fe7907b 100644 --- a/eslint.config.mjs +++ b/eslint.config.mjs @@ -1,63 +1,9 @@ -import globals from "globals"; -import js from "@eslint/js"; - export default [ - js.configs.recommended, { files: ["**/*.js"], rules: { - "camelcase": ["off", {"properties": "always"}], - "comma-spacing": ["error", {"before": false, "after": true}], - "curly": ["error", "all"], - "dot-notation": ["error", {"allowKeywords": true}], - "eqeqeq": ["error"], - "indent": ["error", 4], - "key-spacing": ["error", {"beforeColon": false, "afterColon": true}], - "linebreak-style": ["error", "unix"], - "new-cap": ["off", {"newIsCap": true, "capIsNew": true}], - "no-alert": ["off"], - "no-eval": ["error"], - "no-extend-native": ["error", {"exceptions": ["Date", "String"]}], - "no-multi-spaces": ["error"], - "no-octal-escape": ["error"], - "no-script-url": ["error"], - "no-shadow": ["error", {"hoist": "functions"}], - "no-underscore-dangle": ["error"], - "no-unused-vars": ["error", {"vars": "local", "args": "none"}], - "no-var": ["error"], - "prefer-const": ["error"], - "quotes": ["off", "single"], - "semi": ["error", "always"], - "space-before-blocks": ["error", "always"], - "space-before-function-paren": ["error", {"anonymous": "never", "named": "never"}], - "space-infix-ops": ["error", {"int32Hint": false}], - "strict": ["error", "global"] - }, - languageOptions: { - ecmaVersion: 6, - sourceType: "script", - globals: { - ...globals.browser, - ...globals.commonjs, - "django": false - } + "semi": "warn", + "no-unused-vars": "warn" } - }, - { - files: ["**/*.mjs"], - languageOptions: { - sourceType: "module" - } - }, - { - ignores: [ - "**/*.min.js", - "**/vendor/**/*.js", - "django/contrib/gis/templates/**/*.js", - "django/views/templates/*.js", - "docs/_build/**/*.js", - "node_modules/**.js", - "tests/**/*.js", - ] } ]; diff --git a/pyscada/influxdb/admin.py b/pyscada/influxdb/admin.py index dd51a74..da6347c 100644 --- a/pyscada/influxdb/admin.py +++ b/pyscada/influxdb/admin.py @@ -1,8 +1,8 @@ -import logging -import traceback +# import logging -from django import forms -from django.contrib import admin -from pyscada.admin import admin_site +# from django import forms +# from django.contrib import admin -logger = logging.getLogger(__name__) +# from pyscada.admin import admin_site + +# logger = logging.getLogger(__name__) diff --git a/pyscada/influxdb/management/commands/sync_data.py b/pyscada/influxdb/management/commands/sync_data.py index 3d11eab..e5fb556 100644 --- a/pyscada/influxdb/management/commands/sync_data.py +++ b/pyscada/influxdb/management/commands/sync_data.py @@ -4,12 +4,13 @@ from __future__ import unicode_literals import json +from time import time from django.core.management.base import BaseCommand from pyscada.ems.models import MeteringPoint, VirtualMeteringPoint from pyscada.models import BackgroundProcess -from time import time + class Command(BaseCommand): help = "Run Maintenance Jobs for PyScada-EMS" @@ -19,7 +20,10 @@ class Command(BaseCommand): "type", choices=["mp", "vmp", "all"], type=str, default="all" ) parser.add_argument( - "id", type=int, default=-1, nargs="?", + "id", + type=int, + default=-1, + nargs="?", ) def handle(self, *args, **options): @@ -46,9 +50,6 @@ class Command(BaseCommand): print(f"{(time()-tic):1.2f}") return - - - if options["type"] in ["mp", "all"] and options["id"] == -1: nb_mp = MeteringPoint.objects.count() mp_i = 1 diff --git a/pyscada/influxdb/migrations/0001_initial.py b/pyscada/influxdb/migrations/0001_initial.py index 50a7b40..d58e87e 100644 --- a/pyscada/influxdb/migrations/0001_initial.py +++ b/pyscada/influxdb/migrations/0001_initial.py @@ -1,7 +1,7 @@ # Generated by Django 4.2.13 on 2026-01-28 15:27 -from django.db import migrations, models import django.db.models.deletion +from django.db import migrations, models class Migration(migrations.Migration): @@ -9,22 +9,47 @@ class Migration(migrations.Migration): initial = True dependencies = [ - ('pyscada', '0109_alter_variable_value_class'), + ("pyscada", "0109_alter_variable_value_class"), ] operations = [ migrations.CreateModel( - name='InfluxDatabase', + name="InfluxDatabase", fields=[ - ('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), - ('bucket', models.CharField(max_length=255)), - ('api_key', models.CharField(max_length=255)), - ('organisation', models.CharField(max_length=255)), - ('write_precision', models.CharField(default='ms', max_length=2)), - ('url', models.CharField(default='127.0.0.1:8086', max_length=255)), - ('measurement_name', models.CharField(default='pyscada.models.RecordedData', max_length=255)), - ('only_write_to_influxdb', models.BooleanField(default=True, help_text='when selected only a copy of the data is written to the InfluxDB and the SQL Database is used for everything else')), - ('datasource', models.OneToOneField(on_delete=django.db.models.deletion.CASCADE, to='pyscada.datasource')), + ( + "id", + models.AutoField( + auto_created=True, + primary_key=True, + serialize=False, + verbose_name="ID", + ), + ), + ("bucket", models.CharField(max_length=255)), + ("api_key", models.CharField(max_length=255)), + ("organisation", models.CharField(max_length=255)), + ("write_precision", models.CharField(default="ms", max_length=2)), + ("url", models.CharField(default="127.0.0.1:8086", max_length=255)), + ( + "measurement_name", + models.CharField( + default="pyscada.models.RecordedData", max_length=255 + ), + ), + ( + "only_write_to_influxdb", + models.BooleanField( + default=True, + help_text="when selected only a copy of the data is written to the InfluxDB and the SQL Database is used for everything else", + ), + ), + ( + "datasource", + models.OneToOneField( + on_delete=django.db.models.deletion.CASCADE, + to="pyscada.datasource", + ), + ), ], ), ] diff --git a/pyscada/influxdb/models.py b/pyscada/influxdb/models.py index aa8caa5..d1396b1 100644 --- a/pyscada/influxdb/models.py +++ b/pyscada/influxdb/models.py @@ -1,47 +1,51 @@ # -*- coding: utf-8 -*- from __future__ import unicode_literals -import os -import re -import traceback +import logging +import time from datetime import datetime -from django.utils.timezone import now, make_aware, is_naive + import pytz from django.conf import settings from django.db import models - -from django.db.utils import IntegrityError, ProgrammingError - -from influxdb_client import InfluxDBClient, Point, WritePrecision +from django.db.utils import IntegrityError +from django.utils.timezone import now +from influxdb_client import InfluxDBClient, Point from influxdb_client.client.write_api import SYNCHRONOUS from pyscada.models import DataSource, DjangoDatabase, Variable tz_local = pytz.timezone(settings.TIME_ZONE) - -import logging - logger = logging.getLogger(__name__) + class InfluxDatabase(models.Model): datasource = models.OneToOneField(DataSource, on_delete=models.CASCADE) def __str__(self): return f"Influx Database ({self.url}.{self.bucket})" - + bucket = models.CharField(max_length=255) api_key = models.CharField(max_length=255) organisation = models.CharField(max_length=255) write_precision = models.CharField(default="ms", max_length=2) url = models.CharField(default="127.0.0.1:8086", max_length=255) - measurement_name = models.CharField(default="pyscada.models.RecordedData", max_length=255) - only_write_to_influxdb = models.BooleanField(default=True, help_text="when selected only a copy of the data is written to the InfluxDB and the SQL Database is used for everything else") - + measurement_name = models.CharField( + default="pyscada.models.RecordedData", max_length=255 + ) + only_write_to_influxdb = models.BooleanField( + default=True, + help_text="when selected only a copy of the data is written to \ + the InfluxDB and the SQL Database is used for everything else", + ) + client = None write_api = None def connect(self): - self.client = InfluxDBClient(url=self.url, token=self.api_key, org=self.organisation) + self.client = InfluxDBClient( + url=self.url, token=self.api_key, org=self.organisation + ) return self.client def get_write_api(self): @@ -55,34 +59,42 @@ class InfluxDatabase(models.Model): return self.client.query_api() def get_django_database(self): - return DjangoDatabase.objects.first() # the django database datasource is always the first element in the database + return ( + DjangoDatabase.objects.first() + ) # the django database datasource is always the first element + # in the database def write_points(self, points): write_api = self.get_write_api() try: - write_api.write(bucket=self.bucket, org=self.organisation, record=points, write_precision=self.write_precision) + write_api.write( + bucket=self.bucket, + org=self.organisation, + record=points, + write_precision=self.write_precision, + ) except: - logger.debug(f'Error in Point Data {", ".join(str(i.time) for i in points)}') + logger.debug( + f'Error in Point Data {", ".join(str(i.time) for i in points)}' + ) def to_flux_time(self, timestamp): - #if self.write_precision == "ms": + # if self.write_precision == "ms": # return int(timestamp*1000) - #if self.write_precision == "ns": + # if self.write_precision == "ns": # return int(timestamp*1000*1000) - #if self.write_precision == "s": + # if self.write_precision == "s": return int(timestamp) def create_data_element_from_variable(self, variable, timestamp, value, **kwargs): if value is not None: - date_saved = ( - kwargs["date_saved"] - if "date_saved" in kwargs - else variable.date_saved - if hasattr(variable, "date_saved") - else now() + date_saved = kwargs.get( + "date_saved", + variable.date_saved if hasattr(variable, "date_saved") else now(), ) - #.field("id", int(int(int(timestamp) * 2097152) + variable.pk)) - # variable_id and device_id had to be a tag to be easily filtered by, even if it is an numeric value + + # variable_id and device_id had to be a tag to be easily + # filtered by, even if it is an numeric value logger.debug(f"{variable} {timestamp} {value}") point = ( Point(self.measurement_name) @@ -98,7 +110,7 @@ class InfluxDatabase(models.Model): return point return None - def last_value(self, **kwargs): + def last_datapoint(self, **kwargs): if self.only_write_to_influxdb: django_database = self.get_django_database() return django_database.last_value(**kwargs) @@ -108,16 +120,21 @@ class InfluxDatabase(models.Model): kwargs.pop("use_date_saved") if "use_date_saved" in kwargs else False ) start_time = "-24h" - query = f'from(bucket: "{self.bucket}") |> range(start: {start_time}) |> filter(fn:(r) => r._measurement == "{self.measurement_name}" ) |> filter(fn:(r) => r.variable_id == "{variable.pk}") |> filter(fn:(r) => r._field == "value") |> keep(columns: ["_time","_value"]) |> last()' + query = f'from(bucket: "{self.bucket}") ' + query += f"|> range(start: {start_time}) " + query += f'|> filter(fn:(r) => r._measurement == "{self.measurement_name}" ) ' + query += f'|> filter(fn:(r) => r.variable_id == "{variable.pk}") ' + query += '|> filter(fn:(r) => r._field == "value") ' + query += '|> keep(columns: ["_time","_value"]) ' + query += "|> last()" r = self.get_query_api().query(query) - r = r.to_values(columns=['_time','_value']) + r = r.to_values(columns=["_time", "_value"]) if len(r) == 0: return None return [r[0][0].timestamp(), r[0][1]] - - def read_multiple(self, **kwargs): + def query_datapoints(self, **kwargs): variable_ids = kwargs.pop("variable_ids") if "variable_ids" in kwargs else [] time_min = kwargs.pop("time_min") if "time_min" in kwargs else 0 time_max = kwargs.pop("time_max") if "time_max" in kwargs else time.time() @@ -128,53 +145,78 @@ class InfluxDatabase(models.Model): variable_ids = self.datasource.datasource_check( variable_ids, items_as_id=True, ids_model=Variable ) - + if kwargs.get("time_min_excluded", False): time_min = time_min + 0.001 if kwargs.get("time_max_excluded", False): time_max = time_max - 0.001 - + if time_in_ms: f_time_scale = 1000 else: f_time_scale = 1 if self.only_write_to_influxdb: - return self.get_django_database()._import_model().objects.db_data( - variable_ids=variable_ids, - time_min=time_min, - time_max=time_max, - time_in_ms=time_in_ms, - query_first_value=query_first_value, - **kwargs, - ) + return ( + self.get_django_database() + ._import_model() + .objects.db_data( + variable_ids=variable_ids, + time_min=time_min, + time_max=time_max, + time_in_ms=time_in_ms, + query_first_value=query_first_value, + **kwargs, + ) + ) values = {} query_api = self.get_query_api() - tmp_time_max = time_min*f_time_scale - date_saved_max = time_min*f_time_scale + tmp_time_max = time_min * f_time_scale + date_saved_max = time_min * f_time_scale for variable_id in variable_ids: - query = f'from(bucket: "{self.bucket}") |> range(start: {self.to_flux_time(time_min)}, stop: {self.to_flux_time(time_max)}) |> filter(fn:(r) => r._measurement == "{self.measurement_name}" ) |> filter(fn:(r) => r.variable_id == "{variable_id}") |> filter(fn:(r) => r._field == "value") |> keep(columns: ["_time","_value"])' + query = f'from(bucket: "{self.bucket}") ' + query += f"|> range(start: {self.to_flux_time(time_min)}, " + query += f"stop: {self.to_flux_time(time_max)}) " + query += ( + f'|> filter(fn:(r) => r._measurement == "{self.measurement_name}" ) ' + ) + query += f'|> filter(fn:(r) => r.variable_id == "{variable_id}") ' + query += '|> filter(fn:(r) => r._field == "value") ' + query += '|> keep(columns: ["_time","_value"])' r = query_api.query(query) - new_data = [ [i_time.timestamp()*f_time_scale, i_value] for i_time, i_value in r.to_values(["_time","_value"])] + new_data = [ + [i_time.timestamp() * f_time_scale, i_value] + for i_time, i_value in r.to_values(["_time", "_value"]) + ] if len(new_data) == 0: - continue + continue values[variable_id] = new_data - tmp_time_max = max(tmp_time_max, max([i_time for i_time, i_value in values[variable_id]])) - query = f'from(bucket: "{self.bucket}") |> range(start: {self.to_flux_time(time_min)}, stop: {self.to_flux_time(time_max)}) |> filter(fn:(r) => r._measurement == "{self.measurement_name}" ) |> filter(fn:(r) => r.variable_id == "{variable_id}") |> filter(fn:(r) => r._field == "date_saved") |> max()' + tmp_time_max = max( + tmp_time_max, max([i_time for i_time, i_value in values[variable_id]]) + ) + query = f'from(bucket: "{self.bucket}") ' + query += f"|> range(start: {self.to_flux_time(time_min)}, " + query += f"stop: {self.to_flux_time(time_max)}) " + query += ( + f'|> filter(fn:(r) => r._measurement == "{self.measurement_name}" ) ' + ) + query += f'|> filter(fn:(r) => r.variable_id == "{variable_id}") ' + query += '|> filter(fn:(r) => r._field == "date_saved") ' + query += "|> max()" r = query_api.query(query) date_saved_max = max(date_saved_max, r.to_values(["_value"])[0][0]) - #values["query"] = query + values["timestamp"] = tmp_time_max values["date_saved_max"] = date_saved_max - + return values - - def write_multiple(self, **kwargs): - if self.only_write_to_influxdb or True: + + def write_datapoints(self, **kwargs): + if self.only_write_to_influxdb or True: django_database = self.get_django_database() data_model = django_database._import_model() - + items = kwargs.pop("items") if "items" in kwargs else [] - items = self.datasource.datasource_check(items) # FIXME what is happening here? + items = self.datasource.datasource_check(items) points = [] recordings = [] date_saved = kwargs.pop("date_saved") if "date_saved" in kwargs else now() @@ -184,8 +226,13 @@ class InfluxDatabase(models.Model): logger.debug(f"{item} has {len(item.cached_values_to_write)} to write.") if len(item.cached_values_to_write): for cached_value in item.cached_values_to_write: - # add date saved if not exist in variable object, if date_saved is in kwargs it will be used instead of the variable.date_saved (see the create_data_element_from_variable function) + # add date saved if not exist in variable object, + # if date_saved is in kwargs it will be used instead + # of the variable.date_saved (see the + # create_data_element_from_variable function) + if not hasattr(item, "date_saved") or item.date_saved is None: + item.date_saved = date_saved # create the recorded data object point = self.create_data_element_from_variable( @@ -197,83 +244,33 @@ class InfluxDatabase(models.Model): ) if rc is not None: recordings.append(rc) - + # append the object to the elements to save if point is not None: points.append(point) i += 1 - - - - if i%batch_size == 0: + + if i % batch_size == 0: self.write_points(points) points = [] - + if self.only_write_to_influxdb or True: try: - data_model.objects.bulk_create(recordings, batch_size=batch_size, **kwargs) + data_model.objects.bulk_create( + recordings, batch_size=batch_size, **kwargs + ) except IntegrityError: logger.debug( - f'{data_model._meta.object_name} objects already exists, retrying ignoring conflicts for : {", ".join(str(i.id) + " " + str(i.variable.id) for i in recordings)}' + f'{data_model._meta.object_name} objects already \ + exists, retrying ignoring conflicts for : \ + {", ".join(str(i.id) + " " + str(i.variable.id) for i in recordings)}' ) data_model.objects.bulk_create( recordings, ignore_conflicts=True, batch_size=batch_size, **kwargs ) if len(points) > 0: - self.write_points(points) - + self.write_points(points) + for item in items: item.date_saved = None - - def get_first_element_timestamp(self, **kwargs): - """ - this will likly time out and should be considert non functioning! - """ - if self.only_write_to_influxdb: - django_database = self.get_django_database() - return django_database.get_first_element_timestamp(**kwargs) - - return self.get_edge_element_timestamp(first_last="first", **kwargs) - - def get_last_element_timestamp(self, **kwargs): - if self.only_write_to_influxdb: - django_database = self.get_django_database() - return django_database.get_last_element_timestamp(**kwargs) - - return self.get_edge_element_timestamp(first_last="last", **kwargs) - - def get_edge_element_timestamp(self, first_last="last", **kwargs): - if "variables" in kwargs: - variable_ids = [ v.pk for v in kwargs["variables"]] - elif "variable" in kwargs: - variable_ids = [ kwargs["variable"].pk ] - elif "variable_ids" in kwargs: - variable_ids = kwargs["variable_ids"] - elif "variable_id" in kwargs: - variable_ids = [ kwargs["variable_id"]] - else: - return None - - query_api = self.get_query_api() - start_time = "-24h" - query = f'from(bucket: "{self.bucket}") |> range(start: {start_time}) |> filter(fn:(r) => r._measurement == "{self.measurement_name}" ) |> filter(fn:(r) =>' - for variable_id in variable_ids: - query += ' r.variable_id == "{variable_id}" or ' - query = query[:-3] - - if first_last == "last": - query += ') |> keep(columns: ["_time"]) |> sort(columns: ["_time"], desc: false) |> last(column: "_time")' - elif first_last == "first": - query += ') |> keep(columns: ["_time"]) |> sort(columns: ["_time"], desc: false) |> first(column: "_time")' - else: - return None - - r = query_api.query(query) - r = r.to_values(columns=['_time'])[0] - if len(r) == 0: - return None - if first_last == "last": - return r[-1].timestamp() - elif first_last == "first": - return r[0].timestamp() diff --git a/pyscada/influxdb/tests.py b/pyscada/influxdb/tests.py index 1bcbcd7..e9137c8 100644 --- a/pyscada/influxdb/tests.py +++ b/pyscada/influxdb/tests.py @@ -1,5 +1,3 @@ -from datetime import datetime +# from django.test import TestCase -from django.test import TestCase - -# Create your tests here. \ No newline at end of file +# Create your tests here. diff --git a/pyscada/influxdb/urls.py b/pyscada/influxdb/urls.py index 1c898ff..f5afe5b 100644 --- a/pyscada/influxdb/urls.py +++ b/pyscada/influxdb/urls.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- from __future__ import unicode_literals -from django.urls import path +# from django.urls import path urlpatterns = []