black formating and new method names
This commit is contained in:
parent
e67559e136
commit
efc81d4c9c
8
.flake8
8
.flake8
|
|
@ -1,4 +1,10 @@
|
||||||
[flake8]
|
[flake8]
|
||||||
exclude = build,.git,.tox,./tests/.env
|
exclude =
|
||||||
|
build,
|
||||||
|
.git,
|
||||||
|
.tox,
|
||||||
|
tests/.env,
|
||||||
|
*/migrations/*
|
||||||
|
|
||||||
extend-ignore = E203
|
extend-ignore = E203
|
||||||
max-line-length = 88
|
max-line-length = 88
|
||||||
|
|
|
||||||
|
|
@ -1,63 +1,9 @@
|
||||||
import globals from "globals";
|
|
||||||
import js from "@eslint/js";
|
|
||||||
|
|
||||||
export default [
|
export default [
|
||||||
js.configs.recommended,
|
|
||||||
{
|
{
|
||||||
files: ["**/*.js"],
|
files: ["**/*.js"],
|
||||||
rules: {
|
rules: {
|
||||||
"camelcase": ["off", {"properties": "always"}],
|
"semi": "warn",
|
||||||
"comma-spacing": ["error", {"before": false, "after": true}],
|
"no-unused-vars": "warn"
|
||||||
"curly": ["error", "all"],
|
|
||||||
"dot-notation": ["error", {"allowKeywords": true}],
|
|
||||||
"eqeqeq": ["error"],
|
|
||||||
"indent": ["error", 4],
|
|
||||||
"key-spacing": ["error", {"beforeColon": false, "afterColon": true}],
|
|
||||||
"linebreak-style": ["error", "unix"],
|
|
||||||
"new-cap": ["off", {"newIsCap": true, "capIsNew": true}],
|
|
||||||
"no-alert": ["off"],
|
|
||||||
"no-eval": ["error"],
|
|
||||||
"no-extend-native": ["error", {"exceptions": ["Date", "String"]}],
|
|
||||||
"no-multi-spaces": ["error"],
|
|
||||||
"no-octal-escape": ["error"],
|
|
||||||
"no-script-url": ["error"],
|
|
||||||
"no-shadow": ["error", {"hoist": "functions"}],
|
|
||||||
"no-underscore-dangle": ["error"],
|
|
||||||
"no-unused-vars": ["error", {"vars": "local", "args": "none"}],
|
|
||||||
"no-var": ["error"],
|
|
||||||
"prefer-const": ["error"],
|
|
||||||
"quotes": ["off", "single"],
|
|
||||||
"semi": ["error", "always"],
|
|
||||||
"space-before-blocks": ["error", "always"],
|
|
||||||
"space-before-function-paren": ["error", {"anonymous": "never", "named": "never"}],
|
|
||||||
"space-infix-ops": ["error", {"int32Hint": false}],
|
|
||||||
"strict": ["error", "global"]
|
|
||||||
},
|
|
||||||
languageOptions: {
|
|
||||||
ecmaVersion: 6,
|
|
||||||
sourceType: "script",
|
|
||||||
globals: {
|
|
||||||
...globals.browser,
|
|
||||||
...globals.commonjs,
|
|
||||||
"django": false
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
},
|
|
||||||
{
|
|
||||||
files: ["**/*.mjs"],
|
|
||||||
languageOptions: {
|
|
||||||
sourceType: "module"
|
|
||||||
}
|
|
||||||
},
|
|
||||||
{
|
|
||||||
ignores: [
|
|
||||||
"**/*.min.js",
|
|
||||||
"**/vendor/**/*.js",
|
|
||||||
"django/contrib/gis/templates/**/*.js",
|
|
||||||
"django/views/templates/*.js",
|
|
||||||
"docs/_build/**/*.js",
|
|
||||||
"node_modules/**.js",
|
|
||||||
"tests/**/*.js",
|
|
||||||
]
|
|
||||||
}
|
|
||||||
];
|
];
|
||||||
|
|
|
||||||
|
|
@ -1,8 +1,8 @@
|
||||||
import logging
|
# import logging
|
||||||
import traceback
|
|
||||||
|
|
||||||
from django import forms
|
# from django import forms
|
||||||
from django.contrib import admin
|
# from django.contrib import admin
|
||||||
from pyscada.admin import admin_site
|
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
# from pyscada.admin import admin_site
|
||||||
|
|
||||||
|
# logger = logging.getLogger(__name__)
|
||||||
|
|
|
||||||
|
|
@ -4,12 +4,13 @@
|
||||||
from __future__ import unicode_literals
|
from __future__ import unicode_literals
|
||||||
|
|
||||||
import json
|
import json
|
||||||
|
from time import time
|
||||||
|
|
||||||
from django.core.management.base import BaseCommand
|
from django.core.management.base import BaseCommand
|
||||||
|
|
||||||
from pyscada.ems.models import MeteringPoint, VirtualMeteringPoint
|
from pyscada.ems.models import MeteringPoint, VirtualMeteringPoint
|
||||||
from pyscada.models import BackgroundProcess
|
from pyscada.models import BackgroundProcess
|
||||||
from time import time
|
|
||||||
|
|
||||||
class Command(BaseCommand):
|
class Command(BaseCommand):
|
||||||
help = "Run Maintenance Jobs for PyScada-EMS"
|
help = "Run Maintenance Jobs for PyScada-EMS"
|
||||||
|
|
@ -19,7 +20,10 @@ class Command(BaseCommand):
|
||||||
"type", choices=["mp", "vmp", "all"], type=str, default="all"
|
"type", choices=["mp", "vmp", "all"], type=str, default="all"
|
||||||
)
|
)
|
||||||
parser.add_argument(
|
parser.add_argument(
|
||||||
"id", type=int, default=-1, nargs="?",
|
"id",
|
||||||
|
type=int,
|
||||||
|
default=-1,
|
||||||
|
nargs="?",
|
||||||
)
|
)
|
||||||
|
|
||||||
def handle(self, *args, **options):
|
def handle(self, *args, **options):
|
||||||
|
|
@ -46,9 +50,6 @@ class Command(BaseCommand):
|
||||||
print(f"{(time()-tic):1.2f}")
|
print(f"{(time()-tic):1.2f}")
|
||||||
return
|
return
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
if options["type"] in ["mp", "all"] and options["id"] == -1:
|
if options["type"] in ["mp", "all"] and options["id"] == -1:
|
||||||
nb_mp = MeteringPoint.objects.count()
|
nb_mp = MeteringPoint.objects.count()
|
||||||
mp_i = 1
|
mp_i = 1
|
||||||
|
|
|
||||||
|
|
@ -1,7 +1,7 @@
|
||||||
# Generated by Django 4.2.13 on 2026-01-28 15:27
|
# Generated by Django 4.2.13 on 2026-01-28 15:27
|
||||||
|
|
||||||
from django.db import migrations, models
|
|
||||||
import django.db.models.deletion
|
import django.db.models.deletion
|
||||||
|
from django.db import migrations, models
|
||||||
|
|
||||||
|
|
||||||
class Migration(migrations.Migration):
|
class Migration(migrations.Migration):
|
||||||
|
|
@ -9,22 +9,47 @@ class Migration(migrations.Migration):
|
||||||
initial = True
|
initial = True
|
||||||
|
|
||||||
dependencies = [
|
dependencies = [
|
||||||
('pyscada', '0109_alter_variable_value_class'),
|
("pyscada", "0109_alter_variable_value_class"),
|
||||||
]
|
]
|
||||||
|
|
||||||
operations = [
|
operations = [
|
||||||
migrations.CreateModel(
|
migrations.CreateModel(
|
||||||
name='InfluxDatabase',
|
name="InfluxDatabase",
|
||||||
fields=[
|
fields=[
|
||||||
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
|
(
|
||||||
('bucket', models.CharField(max_length=255)),
|
"id",
|
||||||
('api_key', models.CharField(max_length=255)),
|
models.AutoField(
|
||||||
('organisation', models.CharField(max_length=255)),
|
auto_created=True,
|
||||||
('write_precision', models.CharField(default='ms', max_length=2)),
|
primary_key=True,
|
||||||
('url', models.CharField(default='127.0.0.1:8086', max_length=255)),
|
serialize=False,
|
||||||
('measurement_name', models.CharField(default='pyscada.models.RecordedData', max_length=255)),
|
verbose_name="ID",
|
||||||
('only_write_to_influxdb', models.BooleanField(default=True, help_text='when selected only a copy of the data is written to the InfluxDB and the SQL Database is used for everything else')),
|
),
|
||||||
('datasource', models.OneToOneField(on_delete=django.db.models.deletion.CASCADE, to='pyscada.datasource')),
|
),
|
||||||
|
("bucket", models.CharField(max_length=255)),
|
||||||
|
("api_key", models.CharField(max_length=255)),
|
||||||
|
("organisation", models.CharField(max_length=255)),
|
||||||
|
("write_precision", models.CharField(default="ms", max_length=2)),
|
||||||
|
("url", models.CharField(default="127.0.0.1:8086", max_length=255)),
|
||||||
|
(
|
||||||
|
"measurement_name",
|
||||||
|
models.CharField(
|
||||||
|
default="pyscada.models.RecordedData", max_length=255
|
||||||
|
),
|
||||||
|
),
|
||||||
|
(
|
||||||
|
"only_write_to_influxdb",
|
||||||
|
models.BooleanField(
|
||||||
|
default=True,
|
||||||
|
help_text="when selected only a copy of the data is written to the InfluxDB and the SQL Database is used for everything else",
|
||||||
|
),
|
||||||
|
),
|
||||||
|
(
|
||||||
|
"datasource",
|
||||||
|
models.OneToOneField(
|
||||||
|
on_delete=django.db.models.deletion.CASCADE,
|
||||||
|
to="pyscada.datasource",
|
||||||
|
),
|
||||||
|
),
|
||||||
],
|
],
|
||||||
),
|
),
|
||||||
]
|
]
|
||||||
|
|
|
||||||
|
|
@ -1,28 +1,24 @@
|
||||||
# -*- coding: utf-8 -*-
|
# -*- coding: utf-8 -*-
|
||||||
from __future__ import unicode_literals
|
from __future__ import unicode_literals
|
||||||
|
|
||||||
import os
|
import logging
|
||||||
import re
|
import time
|
||||||
import traceback
|
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
from django.utils.timezone import now, make_aware, is_naive
|
|
||||||
import pytz
|
import pytz
|
||||||
from django.conf import settings
|
from django.conf import settings
|
||||||
from django.db import models
|
from django.db import models
|
||||||
|
from django.db.utils import IntegrityError
|
||||||
from django.db.utils import IntegrityError, ProgrammingError
|
from django.utils.timezone import now
|
||||||
|
from influxdb_client import InfluxDBClient, Point
|
||||||
from influxdb_client import InfluxDBClient, Point, WritePrecision
|
|
||||||
from influxdb_client.client.write_api import SYNCHRONOUS
|
from influxdb_client.client.write_api import SYNCHRONOUS
|
||||||
|
|
||||||
from pyscada.models import DataSource, DjangoDatabase, Variable
|
from pyscada.models import DataSource, DjangoDatabase, Variable
|
||||||
|
|
||||||
tz_local = pytz.timezone(settings.TIME_ZONE)
|
tz_local = pytz.timezone(settings.TIME_ZONE)
|
||||||
|
|
||||||
import logging
|
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class InfluxDatabase(models.Model):
|
class InfluxDatabase(models.Model):
|
||||||
datasource = models.OneToOneField(DataSource, on_delete=models.CASCADE)
|
datasource = models.OneToOneField(DataSource, on_delete=models.CASCADE)
|
||||||
|
|
||||||
|
|
@ -34,14 +30,22 @@ class InfluxDatabase(models.Model):
|
||||||
organisation = models.CharField(max_length=255)
|
organisation = models.CharField(max_length=255)
|
||||||
write_precision = models.CharField(default="ms", max_length=2)
|
write_precision = models.CharField(default="ms", max_length=2)
|
||||||
url = models.CharField(default="127.0.0.1:8086", max_length=255)
|
url = models.CharField(default="127.0.0.1:8086", max_length=255)
|
||||||
measurement_name = models.CharField(default="pyscada.models.RecordedData", max_length=255)
|
measurement_name = models.CharField(
|
||||||
only_write_to_influxdb = models.BooleanField(default=True, help_text="when selected only a copy of the data is written to the InfluxDB and the SQL Database is used for everything else")
|
default="pyscada.models.RecordedData", max_length=255
|
||||||
|
)
|
||||||
|
only_write_to_influxdb = models.BooleanField(
|
||||||
|
default=True,
|
||||||
|
help_text="when selected only a copy of the data is written to \
|
||||||
|
the InfluxDB and the SQL Database is used for everything else",
|
||||||
|
)
|
||||||
|
|
||||||
client = None
|
client = None
|
||||||
write_api = None
|
write_api = None
|
||||||
|
|
||||||
def connect(self):
|
def connect(self):
|
||||||
self.client = InfluxDBClient(url=self.url, token=self.api_key, org=self.organisation)
|
self.client = InfluxDBClient(
|
||||||
|
url=self.url, token=self.api_key, org=self.organisation
|
||||||
|
)
|
||||||
return self.client
|
return self.client
|
||||||
|
|
||||||
def get_write_api(self):
|
def get_write_api(self):
|
||||||
|
|
@ -55,14 +59,24 @@ class InfluxDatabase(models.Model):
|
||||||
return self.client.query_api()
|
return self.client.query_api()
|
||||||
|
|
||||||
def get_django_database(self):
|
def get_django_database(self):
|
||||||
return DjangoDatabase.objects.first() # the django database datasource is always the first element in the database
|
return (
|
||||||
|
DjangoDatabase.objects.first()
|
||||||
|
) # the django database datasource is always the first element
|
||||||
|
# in the database
|
||||||
|
|
||||||
def write_points(self, points):
|
def write_points(self, points):
|
||||||
write_api = self.get_write_api()
|
write_api = self.get_write_api()
|
||||||
try:
|
try:
|
||||||
write_api.write(bucket=self.bucket, org=self.organisation, record=points, write_precision=self.write_precision)
|
write_api.write(
|
||||||
|
bucket=self.bucket,
|
||||||
|
org=self.organisation,
|
||||||
|
record=points,
|
||||||
|
write_precision=self.write_precision,
|
||||||
|
)
|
||||||
except:
|
except:
|
||||||
logger.debug(f'Error in Point Data {", ".join(str(i.time) for i in points)}')
|
logger.debug(
|
||||||
|
f'Error in Point Data {", ".join(str(i.time) for i in points)}'
|
||||||
|
)
|
||||||
|
|
||||||
def to_flux_time(self, timestamp):
|
def to_flux_time(self, timestamp):
|
||||||
# if self.write_precision == "ms":
|
# if self.write_precision == "ms":
|
||||||
|
|
@ -74,15 +88,13 @@ class InfluxDatabase(models.Model):
|
||||||
|
|
||||||
def create_data_element_from_variable(self, variable, timestamp, value, **kwargs):
|
def create_data_element_from_variable(self, variable, timestamp, value, **kwargs):
|
||||||
if value is not None:
|
if value is not None:
|
||||||
date_saved = (
|
date_saved = kwargs.get(
|
||||||
kwargs["date_saved"]
|
"date_saved",
|
||||||
if "date_saved" in kwargs
|
variable.date_saved if hasattr(variable, "date_saved") else now(),
|
||||||
else variable.date_saved
|
|
||||||
if hasattr(variable, "date_saved")
|
|
||||||
else now()
|
|
||||||
)
|
)
|
||||||
#.field("id", int(int(int(timestamp) * 2097152) + variable.pk))
|
|
||||||
# variable_id and device_id had to be a tag to be easily filtered by, even if it is an numeric value
|
# variable_id and device_id had to be a tag to be easily
|
||||||
|
# filtered by, even if it is an numeric value
|
||||||
logger.debug(f"{variable} {timestamp} {value}")
|
logger.debug(f"{variable} {timestamp} {value}")
|
||||||
point = (
|
point = (
|
||||||
Point(self.measurement_name)
|
Point(self.measurement_name)
|
||||||
|
|
@ -98,7 +110,7 @@ class InfluxDatabase(models.Model):
|
||||||
return point
|
return point
|
||||||
return None
|
return None
|
||||||
|
|
||||||
def last_value(self, **kwargs):
|
def last_datapoint(self, **kwargs):
|
||||||
if self.only_write_to_influxdb:
|
if self.only_write_to_influxdb:
|
||||||
django_database = self.get_django_database()
|
django_database = self.get_django_database()
|
||||||
return django_database.last_value(**kwargs)
|
return django_database.last_value(**kwargs)
|
||||||
|
|
@ -108,16 +120,21 @@ class InfluxDatabase(models.Model):
|
||||||
kwargs.pop("use_date_saved") if "use_date_saved" in kwargs else False
|
kwargs.pop("use_date_saved") if "use_date_saved" in kwargs else False
|
||||||
)
|
)
|
||||||
start_time = "-24h"
|
start_time = "-24h"
|
||||||
query = f'from(bucket: "{self.bucket}") |> range(start: {start_time}) |> filter(fn:(r) => r._measurement == "{self.measurement_name}" ) |> filter(fn:(r) => r.variable_id == "{variable.pk}") |> filter(fn:(r) => r._field == "value") |> keep(columns: ["_time","_value"]) |> last()'
|
query = f'from(bucket: "{self.bucket}") '
|
||||||
|
query += f"|> range(start: {start_time}) "
|
||||||
|
query += f'|> filter(fn:(r) => r._measurement == "{self.measurement_name}" ) '
|
||||||
|
query += f'|> filter(fn:(r) => r.variable_id == "{variable.pk}") '
|
||||||
|
query += '|> filter(fn:(r) => r._field == "value") '
|
||||||
|
query += '|> keep(columns: ["_time","_value"]) '
|
||||||
|
query += "|> last()"
|
||||||
r = self.get_query_api().query(query)
|
r = self.get_query_api().query(query)
|
||||||
r = r.to_values(columns=['_time','_value'])
|
r = r.to_values(columns=["_time", "_value"])
|
||||||
|
|
||||||
if len(r) == 0:
|
if len(r) == 0:
|
||||||
return None
|
return None
|
||||||
return [r[0][0].timestamp(), r[0][1]]
|
return [r[0][0].timestamp(), r[0][1]]
|
||||||
|
|
||||||
|
def query_datapoints(self, **kwargs):
|
||||||
def read_multiple(self, **kwargs):
|
|
||||||
variable_ids = kwargs.pop("variable_ids") if "variable_ids" in kwargs else []
|
variable_ids = kwargs.pop("variable_ids") if "variable_ids" in kwargs else []
|
||||||
time_min = kwargs.pop("time_min") if "time_min" in kwargs else 0
|
time_min = kwargs.pop("time_min") if "time_min" in kwargs else 0
|
||||||
time_max = kwargs.pop("time_max") if "time_max" in kwargs else time.time()
|
time_max = kwargs.pop("time_max") if "time_max" in kwargs else time.time()
|
||||||
|
|
@ -139,7 +156,10 @@ class InfluxDatabase(models.Model):
|
||||||
else:
|
else:
|
||||||
f_time_scale = 1
|
f_time_scale = 1
|
||||||
if self.only_write_to_influxdb:
|
if self.only_write_to_influxdb:
|
||||||
return self.get_django_database()._import_model().objects.db_data(
|
return (
|
||||||
|
self.get_django_database()
|
||||||
|
._import_model()
|
||||||
|
.objects.db_data(
|
||||||
variable_ids=variable_ids,
|
variable_ids=variable_ids,
|
||||||
time_min=time_min,
|
time_min=time_min,
|
||||||
time_max=time_max,
|
time_max=time_max,
|
||||||
|
|
@ -147,34 +167,56 @@ class InfluxDatabase(models.Model):
|
||||||
query_first_value=query_first_value,
|
query_first_value=query_first_value,
|
||||||
**kwargs,
|
**kwargs,
|
||||||
)
|
)
|
||||||
|
)
|
||||||
values = {}
|
values = {}
|
||||||
query_api = self.get_query_api()
|
query_api = self.get_query_api()
|
||||||
tmp_time_max = time_min * f_time_scale
|
tmp_time_max = time_min * f_time_scale
|
||||||
date_saved_max = time_min * f_time_scale
|
date_saved_max = time_min * f_time_scale
|
||||||
for variable_id in variable_ids:
|
for variable_id in variable_ids:
|
||||||
query = f'from(bucket: "{self.bucket}") |> range(start: {self.to_flux_time(time_min)}, stop: {self.to_flux_time(time_max)}) |> filter(fn:(r) => r._measurement == "{self.measurement_name}" ) |> filter(fn:(r) => r.variable_id == "{variable_id}") |> filter(fn:(r) => r._field == "value") |> keep(columns: ["_time","_value"])'
|
query = f'from(bucket: "{self.bucket}") '
|
||||||
|
query += f"|> range(start: {self.to_flux_time(time_min)}, "
|
||||||
|
query += f"stop: {self.to_flux_time(time_max)}) "
|
||||||
|
query += (
|
||||||
|
f'|> filter(fn:(r) => r._measurement == "{self.measurement_name}" ) '
|
||||||
|
)
|
||||||
|
query += f'|> filter(fn:(r) => r.variable_id == "{variable_id}") '
|
||||||
|
query += '|> filter(fn:(r) => r._field == "value") '
|
||||||
|
query += '|> keep(columns: ["_time","_value"])'
|
||||||
r = query_api.query(query)
|
r = query_api.query(query)
|
||||||
new_data = [ [i_time.timestamp()*f_time_scale, i_value] for i_time, i_value in r.to_values(["_time","_value"])]
|
new_data = [
|
||||||
|
[i_time.timestamp() * f_time_scale, i_value]
|
||||||
|
for i_time, i_value in r.to_values(["_time", "_value"])
|
||||||
|
]
|
||||||
if len(new_data) == 0:
|
if len(new_data) == 0:
|
||||||
continue
|
continue
|
||||||
values[variable_id] = new_data
|
values[variable_id] = new_data
|
||||||
tmp_time_max = max(tmp_time_max, max([i_time for i_time, i_value in values[variable_id]]))
|
tmp_time_max = max(
|
||||||
query = f'from(bucket: "{self.bucket}") |> range(start: {self.to_flux_time(time_min)}, stop: {self.to_flux_time(time_max)}) |> filter(fn:(r) => r._measurement == "{self.measurement_name}" ) |> filter(fn:(r) => r.variable_id == "{variable_id}") |> filter(fn:(r) => r._field == "date_saved") |> max()'
|
tmp_time_max, max([i_time for i_time, i_value in values[variable_id]])
|
||||||
|
)
|
||||||
|
query = f'from(bucket: "{self.bucket}") '
|
||||||
|
query += f"|> range(start: {self.to_flux_time(time_min)}, "
|
||||||
|
query += f"stop: {self.to_flux_time(time_max)}) "
|
||||||
|
query += (
|
||||||
|
f'|> filter(fn:(r) => r._measurement == "{self.measurement_name}" ) '
|
||||||
|
)
|
||||||
|
query += f'|> filter(fn:(r) => r.variable_id == "{variable_id}") '
|
||||||
|
query += '|> filter(fn:(r) => r._field == "date_saved") '
|
||||||
|
query += "|> max()"
|
||||||
r = query_api.query(query)
|
r = query_api.query(query)
|
||||||
date_saved_max = max(date_saved_max, r.to_values(["_value"])[0][0])
|
date_saved_max = max(date_saved_max, r.to_values(["_value"])[0][0])
|
||||||
#values["query"] = query
|
|
||||||
values["timestamp"] = tmp_time_max
|
values["timestamp"] = tmp_time_max
|
||||||
values["date_saved_max"] = date_saved_max
|
values["date_saved_max"] = date_saved_max
|
||||||
|
|
||||||
return values
|
return values
|
||||||
|
|
||||||
def write_multiple(self, **kwargs):
|
def write_datapoints(self, **kwargs):
|
||||||
if self.only_write_to_influxdb or True:
|
if self.only_write_to_influxdb or True:
|
||||||
django_database = self.get_django_database()
|
django_database = self.get_django_database()
|
||||||
data_model = django_database._import_model()
|
data_model = django_database._import_model()
|
||||||
|
|
||||||
items = kwargs.pop("items") if "items" in kwargs else []
|
items = kwargs.pop("items") if "items" in kwargs else []
|
||||||
items = self.datasource.datasource_check(items) # FIXME what is happening here?
|
items = self.datasource.datasource_check(items)
|
||||||
points = []
|
points = []
|
||||||
recordings = []
|
recordings = []
|
||||||
date_saved = kwargs.pop("date_saved") if "date_saved" in kwargs else now()
|
date_saved = kwargs.pop("date_saved") if "date_saved" in kwargs else now()
|
||||||
|
|
@ -184,8 +226,13 @@ class InfluxDatabase(models.Model):
|
||||||
logger.debug(f"{item} has {len(item.cached_values_to_write)} to write.")
|
logger.debug(f"{item} has {len(item.cached_values_to_write)} to write.")
|
||||||
if len(item.cached_values_to_write):
|
if len(item.cached_values_to_write):
|
||||||
for cached_value in item.cached_values_to_write:
|
for cached_value in item.cached_values_to_write:
|
||||||
# add date saved if not exist in variable object, if date_saved is in kwargs it will be used instead of the variable.date_saved (see the create_data_element_from_variable function)
|
# add date saved if not exist in variable object,
|
||||||
|
# if date_saved is in kwargs it will be used instead
|
||||||
|
# of the variable.date_saved (see the
|
||||||
|
# create_data_element_from_variable function)
|
||||||
|
|
||||||
if not hasattr(item, "date_saved") or item.date_saved is None:
|
if not hasattr(item, "date_saved") or item.date_saved is None:
|
||||||
|
|
||||||
item.date_saved = date_saved
|
item.date_saved = date_saved
|
||||||
# create the recorded data object
|
# create the recorded data object
|
||||||
point = self.create_data_element_from_variable(
|
point = self.create_data_element_from_variable(
|
||||||
|
|
@ -203,18 +250,20 @@ class InfluxDatabase(models.Model):
|
||||||
points.append(point)
|
points.append(point)
|
||||||
i += 1
|
i += 1
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
if i % batch_size == 0:
|
if i % batch_size == 0:
|
||||||
self.write_points(points)
|
self.write_points(points)
|
||||||
points = []
|
points = []
|
||||||
|
|
||||||
if self.only_write_to_influxdb or True:
|
if self.only_write_to_influxdb or True:
|
||||||
try:
|
try:
|
||||||
data_model.objects.bulk_create(recordings, batch_size=batch_size, **kwargs)
|
data_model.objects.bulk_create(
|
||||||
|
recordings, batch_size=batch_size, **kwargs
|
||||||
|
)
|
||||||
except IntegrityError:
|
except IntegrityError:
|
||||||
logger.debug(
|
logger.debug(
|
||||||
f'{data_model._meta.object_name} objects already exists, retrying ignoring conflicts for : {", ".join(str(i.id) + " " + str(i.variable.id) for i in recordings)}'
|
f'{data_model._meta.object_name} objects already \
|
||||||
|
exists, retrying ignoring conflicts for : \
|
||||||
|
{", ".join(str(i.id) + " " + str(i.variable.id) for i in recordings)}'
|
||||||
)
|
)
|
||||||
data_model.objects.bulk_create(
|
data_model.objects.bulk_create(
|
||||||
recordings, ignore_conflicts=True, batch_size=batch_size, **kwargs
|
recordings, ignore_conflicts=True, batch_size=batch_size, **kwargs
|
||||||
|
|
@ -225,55 +274,3 @@ class InfluxDatabase(models.Model):
|
||||||
|
|
||||||
for item in items:
|
for item in items:
|
||||||
item.date_saved = None
|
item.date_saved = None
|
||||||
|
|
||||||
def get_first_element_timestamp(self, **kwargs):
|
|
||||||
"""
|
|
||||||
this will likly time out and should be considert non functioning!
|
|
||||||
"""
|
|
||||||
if self.only_write_to_influxdb:
|
|
||||||
django_database = self.get_django_database()
|
|
||||||
return django_database.get_first_element_timestamp(**kwargs)
|
|
||||||
|
|
||||||
return self.get_edge_element_timestamp(first_last="first", **kwargs)
|
|
||||||
|
|
||||||
def get_last_element_timestamp(self, **kwargs):
|
|
||||||
if self.only_write_to_influxdb:
|
|
||||||
django_database = self.get_django_database()
|
|
||||||
return django_database.get_last_element_timestamp(**kwargs)
|
|
||||||
|
|
||||||
return self.get_edge_element_timestamp(first_last="last", **kwargs)
|
|
||||||
|
|
||||||
def get_edge_element_timestamp(self, first_last="last", **kwargs):
|
|
||||||
if "variables" in kwargs:
|
|
||||||
variable_ids = [ v.pk for v in kwargs["variables"]]
|
|
||||||
elif "variable" in kwargs:
|
|
||||||
variable_ids = [ kwargs["variable"].pk ]
|
|
||||||
elif "variable_ids" in kwargs:
|
|
||||||
variable_ids = kwargs["variable_ids"]
|
|
||||||
elif "variable_id" in kwargs:
|
|
||||||
variable_ids = [ kwargs["variable_id"]]
|
|
||||||
else:
|
|
||||||
return None
|
|
||||||
|
|
||||||
query_api = self.get_query_api()
|
|
||||||
start_time = "-24h"
|
|
||||||
query = f'from(bucket: "{self.bucket}") |> range(start: {start_time}) |> filter(fn:(r) => r._measurement == "{self.measurement_name}" ) |> filter(fn:(r) =>'
|
|
||||||
for variable_id in variable_ids:
|
|
||||||
query += ' r.variable_id == "{variable_id}" or '
|
|
||||||
query = query[:-3]
|
|
||||||
|
|
||||||
if first_last == "last":
|
|
||||||
query += ') |> keep(columns: ["_time"]) |> sort(columns: ["_time"], desc: false) |> last(column: "_time")'
|
|
||||||
elif first_last == "first":
|
|
||||||
query += ') |> keep(columns: ["_time"]) |> sort(columns: ["_time"], desc: false) |> first(column: "_time")'
|
|
||||||
else:
|
|
||||||
return None
|
|
||||||
|
|
||||||
r = query_api.query(query)
|
|
||||||
r = r.to_values(columns=['_time'])[0]
|
|
||||||
if len(r) == 0:
|
|
||||||
return None
|
|
||||||
if first_last == "last":
|
|
||||||
return r[-1].timestamp()
|
|
||||||
elif first_last == "first":
|
|
||||||
return r[0].timestamp()
|
|
||||||
|
|
|
||||||
|
|
@ -1,5 +1,3 @@
|
||||||
from datetime import datetime
|
# from django.test import TestCase
|
||||||
|
|
||||||
from django.test import TestCase
|
|
||||||
|
|
||||||
# Create your tests here.
|
# Create your tests here.
|
||||||
|
|
@ -1,6 +1,6 @@
|
||||||
# -*- coding: utf-8 -*-
|
# -*- coding: utf-8 -*-
|
||||||
from __future__ import unicode_literals
|
from __future__ import unicode_literals
|
||||||
|
|
||||||
from django.urls import path
|
# from django.urls import path
|
||||||
|
|
||||||
urlpatterns = []
|
urlpatterns = []
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue
Block a user