fixed multible_read
This commit is contained in:
parent
bdb1d3adcd
commit
b2014e23be
|
|
@ -15,7 +15,7 @@ from django.db.utils import IntegrityError, ProgrammingError
|
||||||
from influxdb_client import InfluxDBClient, Point, WritePrecision
|
from influxdb_client import InfluxDBClient, Point, WritePrecision
|
||||||
from influxdb_client.client.write_api import SYNCHRONOUS
|
from influxdb_client.client.write_api import SYNCHRONOUS
|
||||||
|
|
||||||
from pyscada.models import DataSource, DjangoDatabase
|
from pyscada.models import DataSource, DjangoDatabase, Variable
|
||||||
|
|
||||||
tz_local = pytz.timezone(settings.TIME_ZONE)
|
tz_local = pytz.timezone(settings.TIME_ZONE)
|
||||||
|
|
||||||
|
|
@ -38,6 +38,7 @@ class InfluxDatabase(models.Model):
|
||||||
only_write_to_influxdb = models.BooleanField(default=True, help_text="when selected only a copy of the data is written to the InfluxDB and the SQL Database is used for everything else")
|
only_write_to_influxdb = models.BooleanField(default=True, help_text="when selected only a copy of the data is written to the InfluxDB and the SQL Database is used for everything else")
|
||||||
|
|
||||||
client = None
|
client = None
|
||||||
|
write_api = None
|
||||||
|
|
||||||
def connect(self):
|
def connect(self):
|
||||||
self.client = InfluxDBClient(url=self.url, token=self.api_key, org=self.organisation)
|
self.client = InfluxDBClient(url=self.url, token=self.api_key, org=self.organisation)
|
||||||
|
|
@ -56,6 +57,21 @@ class InfluxDatabase(models.Model):
|
||||||
def get_django_database(self):
|
def get_django_database(self):
|
||||||
return DjangoDatabase.objects.first() # the django database datasource is always the first element in the database
|
return DjangoDatabase.objects.first() # the django database datasource is always the first element in the database
|
||||||
|
|
||||||
|
def write_points(self, points):
|
||||||
|
write_api = self.get_write_api()
|
||||||
|
try:
|
||||||
|
write_api.write(bucket=self.bucket, org=self.organisation, record=points, write_precision=self.write_precision)
|
||||||
|
except:
|
||||||
|
logger.debug(f'Error in Point Data {", ".join(str(i.time) for i in points)}')
|
||||||
|
|
||||||
|
def to_flux_time(self, timestamp):
|
||||||
|
#if self.write_precision == "ms":
|
||||||
|
# return int(timestamp*1000)
|
||||||
|
#if self.write_precision == "ns":
|
||||||
|
# return int(timestamp*1000*1000)
|
||||||
|
#if self.write_precision == "s":
|
||||||
|
return int(timestamp)
|
||||||
|
|
||||||
def create_data_element_from_variable(self, variable, timestamp, value, **kwargs):
|
def create_data_element_from_variable(self, variable, timestamp, value, **kwargs):
|
||||||
if value is not None:
|
if value is not None:
|
||||||
date_saved = (
|
date_saved = (
|
||||||
|
|
@ -91,7 +107,7 @@ class InfluxDatabase(models.Model):
|
||||||
use_date_saved = (
|
use_date_saved = (
|
||||||
kwargs.pop("use_date_saved") if "use_date_saved" in kwargs else False
|
kwargs.pop("use_date_saved") if "use_date_saved" in kwargs else False
|
||||||
)
|
)
|
||||||
query = f'from(bucket: "{self.bucket}") |> range(start: {time_min}) |> filter(fn:(r) => r._measurement == "{self.measurement_name}" ) |> filter(fn:(r) => r.variable_id == "{variable_id}") |> filter(fn:(r) => r._field == "value") |> keep(columns: ["_time","_value"]) |> last()'
|
query = f'from(bucket: "{self.bucket}") |> range(start: {self.to_flux_time(time_min)}) |> filter(fn:(r) => r._measurement == "{self.measurement_name}" ) |> filter(fn:(r) => r.variable_id == "{variable_id}") |> filter(fn:(r) => r._field == "value") |> keep(columns: ["_time","_value"]) |> last()'
|
||||||
r = query_api.query(query)
|
r = query_api.query(query)
|
||||||
r = r.to_values(columns=['_time','_value'])[0]
|
r = r.to_values(columns=['_time','_value'])[0]
|
||||||
|
|
||||||
|
|
@ -128,19 +144,22 @@ class InfluxDatabase(models.Model):
|
||||||
|
|
||||||
values = {}
|
values = {}
|
||||||
query_api = self.get_query_api()
|
query_api = self.get_query_api()
|
||||||
tmp_time_max = time_min
|
tmp_time_max = time_min*f_time_scale
|
||||||
date_saved_max = time_min
|
date_saved_max = time_min*f_time_scale
|
||||||
for variable_id in variable_ids:
|
for variable_id in variable_ids:
|
||||||
query = f'from(bucket: "{self.bucket}") |> range(start: {time_min}, stop: {time_max}) |> filter(fn:(r) => r._measurement == "{self.measurement_name}" ) |> filter(fn:(r) => r.variable_id == "{variable_id}") |> filter(fn:(r) => r._field == "value") |> keep(columns: ["_time","_value"])'
|
query = f'from(bucket: "{self.bucket}") |> range(start: {self.to_flux_time(time_min)}, stop: {self.to_flux_time(time_max)}) |> filter(fn:(r) => r._measurement == "{self.measurement_name}" ) |> filter(fn:(r) => r.variable_id == "{variable_id}") |> filter(fn:(r) => r._field == "value") |> keep(columns: ["_time","_value"])'
|
||||||
r = query_api.query(query)
|
r = query_api.query(query)
|
||||||
values[variable_id] = [ [i_time.timestamp(), i_value] for i_time, i_value in r.to_values(["_time","_value"])]
|
new_data = [ [i_time.timestamp()*f_time_scale, i_value] for i_time, i_value in r.to_values(["_time","_value"])]
|
||||||
|
if len(new_data) == 0:
|
||||||
|
continue
|
||||||
|
values[variable_id] = new_data
|
||||||
tmp_time_max = max(tmp_time_max, max([i_time for i_time, i_value in values[variable_id]]))
|
tmp_time_max = max(tmp_time_max, max([i_time for i_time, i_value in values[variable_id]]))
|
||||||
query = f'from(bucket: "{self.bucket}") |> range(start: {time_min}, stop: {time_max}) |> filter(fn:(r) => r._measurement == "{self.measurement_name}" ) |> filter(fn:(r) => r.variable_id == "{variable_id}") |> filter(fn:(r) => r._field == "date_saved") |> max()'
|
query = f'from(bucket: "{self.bucket}") |> range(start: {self.to_flux_time(time_min)}, stop: {self.to_flux_time(time_max)}) |> filter(fn:(r) => r._measurement == "{self.measurement_name}" ) |> filter(fn:(r) => r.variable_id == "{variable_id}") |> filter(fn:(r) => r._field == "date_saved") |> max()'
|
||||||
r = query_api.query(query)
|
r = query_api.query(query)
|
||||||
date_saved_max = max(date_saved_max, r.to_values(["_value"])[0][0])
|
date_saved_max = max(date_saved_max, r.to_values(["_value"])[0][0])
|
||||||
|
#values["query"] = query
|
||||||
values["timestamp"] = tmp_time_max * f_time_scale
|
values["timestamp"] = tmp_time_max
|
||||||
values["date_saved_max"] = date_saved_max * f_time_scale
|
values["date_saved_max"] = date_saved_max
|
||||||
|
|
||||||
return values
|
return values
|
||||||
|
|
||||||
|
|
@ -154,7 +173,6 @@ class InfluxDatabase(models.Model):
|
||||||
points = []
|
points = []
|
||||||
recordings = []
|
recordings = []
|
||||||
date_saved = kwargs.pop("date_saved") if "date_saved" in kwargs else now()
|
date_saved = kwargs.pop("date_saved") if "date_saved" in kwargs else now()
|
||||||
write_api = self.get_write_api()
|
|
||||||
batch_size = kwargs.pop("batch_size") if "batch_size" in kwargs else 5000
|
batch_size = kwargs.pop("batch_size") if "batch_size" in kwargs else 5000
|
||||||
i = 0
|
i = 0
|
||||||
for item in items:
|
for item in items:
|
||||||
|
|
@ -183,10 +201,7 @@ class InfluxDatabase(models.Model):
|
||||||
|
|
||||||
|
|
||||||
if i%batch_size == 0:
|
if i%batch_size == 0:
|
||||||
try:
|
self.write_points(points)
|
||||||
write_api.write(bucket=self.bucket, org=self.organisation, record=points, write_precision=self.write_precision)
|
|
||||||
except:
|
|
||||||
logger.debug(f'Error in Point Data {", ".join(str(i.time) for i in points)}')
|
|
||||||
points = []
|
points = []
|
||||||
|
|
||||||
if self.only_write_to_influxdb:
|
if self.only_write_to_influxdb:
|
||||||
|
|
@ -201,10 +216,7 @@ class InfluxDatabase(models.Model):
|
||||||
)
|
)
|
||||||
|
|
||||||
if len(points) > 0:
|
if len(points) > 0:
|
||||||
try:
|
self.write_points(points)
|
||||||
write_api.write(bucket=self.bucket, org=self.organisation, record=points, write_precision=self.write_precision)
|
|
||||||
except:
|
|
||||||
logger.debug(f'Error in Point Data {", ".join(str(i.time) for i in points)}')
|
|
||||||
|
|
||||||
for item in items:
|
for item in items:
|
||||||
item.date_saved = None
|
item.date_saved = None
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue
Block a user