wip
This commit is contained in:
parent 303bfafcac
commit d42c1400d7
@@ -13,7 +13,8 @@ from django.utils.timezone import now
 from influxdb_client import InfluxDBClient, Point
 from influxdb_client.client.write_api import SYNCHRONOUS
 
-from pyscada.models import DataSource, DjangoDatabase, Variable
+from pyscada.models import DataSource, Variable
+from pyscada.django_datasource.models import DjangoDatabase
 
 tz_local = pytz.timezone(settings.TIME_ZONE)
 logger = logging.getLogger(__name__)
@@ -86,16 +87,15 @@ class InfluxDatabase(models.Model):
         # if self.write_precision == "s":
         return int(timestamp)
 
-    def create_data_element_from_variable(self, variable, timestamp, value, **kwargs):
+    def create_data_element_from_variable(self, variable, timestamp, value, date_saved=None, **kwargs):
         if value is None:
             return None
         if timestamp is None:
             return None
-        date_saved = kwargs.get(
-            "date_saved",
-            variable.date_saved if hasattr(variable, "date_saved") else now(),
-        )
+        if date_saved is None:
+            date_saved = now()
+            if hasattr(variable, "date_saved") and variable.date_saved is not None:
+                date_saved = variable.date_saved
 
         # variable_id and device_id had to be a tag to be easily
         # filtered by, even if it is an numeric value
         logger.debug(f"{variable} {timestamp} {value}")
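The change above turns the date_saved lookup from kwargs.get into an explicit keyword argument with a fallback chain. A minimal standalone sketch of that fallback order, with datetime standing in for django.utils.timezone.now() so it runs outside a configured Django project (the nesting of the hasattr check inside the None branch is an assumption, since the diff viewer drops indentation):

from datetime import datetime, timezone

def resolve_date_saved(variable, date_saved=None):
    # an explicit date_saved wins; otherwise fall back to "now", then to
    # variable.date_saved when the variable carries one
    if date_saved is None:
        date_saved = datetime.now(timezone.utc)  # stand-in for django.utils.timezone.now()
        if hasattr(variable, "date_saved") and variable.date_saved is not None:
            date_saved = variable.date_saved
    return date_saved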
@@ -108,19 +108,15 @@ class InfluxDatabase(models.Model):
             .tag("unit", str(variable.unit))
             .field("value", value)
             .field("date_saved", date_saved.timestamp())
-            .time(datetime.utcfromtimestamp(timestamp/1000.0))
+            .time(datetime.utcfromtimestamp(timestamp))
         )
         return point
 
-    def last_datapoint(self, **kwargs):
+    def last_datapoint(self, variable=None, use_date_saved=False, **kwargs):
         if self.only_write_to_influxdb:
             django_database = self.get_django_database()
-            return django_database.last_datapoint(**kwargs)
+            return django_database.last_datapoint(variable=variable, use_date_saved=use_date_saved, **kwargs)
 
-        variable = kwargs.pop("variable") if "variable" in kwargs else None
-        use_date_saved = (
-            kwargs.pop("use_date_saved") if "use_date_saved" in kwargs else False
-        )
         start_time = "-24h"
         query = f'from(bucket: "{self.bucket}") '
         query += f"|> range(start: {start_time}) "
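The .time() change above stops dividing the timestamp by 1000, so timestamps handed to the Point builder are now treated as epoch seconds rather than milliseconds. A small illustration of the difference (the epoch value is made up):

from datetime import datetime

ts = 1700000000.5  # made-up epoch value, in seconds

old_point_time = datetime.utcfromtimestamp(ts / 1000.0)  # pre-commit: value assumed to be milliseconds
new_point_time = datetime.utcfromtimestamp(ts)           # post-commit: value passed through as seconds
print(old_point_time, new_point_time)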
@@ -136,27 +132,16 @@ class InfluxDatabase(models.Model):
             return None
         return [r[0][0].timestamp(), r[0][1]]
 
-    def query_datapoints(self, **kwargs):
-        variable_ids = kwargs.pop("variable_ids") if "variable_ids" in kwargs else []
-        time_min = kwargs.pop("time_min") if "time_min" in kwargs else 0
-        time_max = kwargs.pop("time_max") if "time_max" in kwargs else time.time()
-        time_in_ms = kwargs.pop("time_in_ms") if "time_in_ms" in kwargs else True
-        query_first_value = (
-            kwargs.pop("query_first_value") if "query_first_value" in kwargs else False
-        )
+    def query_datapoints(self, variable_ids, time_min=0, time_max=None, query_first_value=False, **kwargs):
+        if time_max is None:
+            time_max = time.time()
         variable_ids = self.datasource.datasource_check(
             variable_ids, items_as_id=True, ids_model=Variable
         )
 
-        if kwargs.get("time_min_excluded", False):
-            time_min = time_min + 0.001
-        if kwargs.get("time_max_excluded", False):
-            time_max = time_max - 0.001
-
-        if time_in_ms:
-            f_time_scale = 1000
-        else:
-            f_time_scale = 1
         if self.only_write_to_influxdb:
             return (
                 self.get_django_database()
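query_datapoints follows the same refactor pattern as the other methods in this commit: arguments that used to be popped out of **kwargs become explicit parameters, and the call-time default time.time() becomes None plus a check inside the method. A self-contained before/after sketch of just that pattern (the bodies are reduced to the argument handling):

import time

def query_datapoints_old(**kwargs):
    # old style: defaults buried in kwargs.pop chains
    time_min = kwargs.pop("time_min") if "time_min" in kwargs else 0
    time_max = kwargs.pop("time_max") if "time_max" in kwargs else time.time()
    return time_min, time_max

def query_datapoints_new(time_min=0, time_max=None, **kwargs):
    # new style: explicit parameters; None stands for "now" and is resolved inside
    if time_max is None:
        time_max = time.time()
    return time_min, time_max

assert query_datapoints_old(time_min=5)[0] == query_datapoints_new(time_min=5)[0]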
@@ -165,15 +150,14 @@ class InfluxDatabase(models.Model):
                     variable_ids=variable_ids,
                     time_min=time_min,
                     time_max=time_max,
-                    time_in_ms=time_in_ms,
                     query_first_value=query_first_value,
                     **kwargs,
                 )
             )
         values = {}
         query_api = self.get_query_api()
-        tmp_time_max = time_min * f_time_scale
-        date_saved_max = time_min * f_time_scale
+        tmp_time_max = time_min
+        date_saved_max = time_min
         for variable_id in variable_ids:
             query = f'from(bucket: "{self.bucket}") '
             query += f"|> range(start: {self.to_flux_time(time_min)}, "
@@ -186,7 +170,7 @@ class InfluxDatabase(models.Model):
             query += '|> keep(columns: ["_time","_value"])'
             r = query_api.query(query)
             new_data = [
-                [i_time.timestamp() * f_time_scale, i_value]
+                [i_time.timestamp(), i_value]
                 for i_time, i_value in r.to_values(["_time", "_value"])
             ]
             if len(new_data) == 0:
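With f_time_scale gone, the rows built from the Flux result keep datetime.timestamp() values in plain epoch seconds instead of scaling them to milliseconds. A standalone sketch with mocked query rows in place of r.to_values(["_time", "_value"]):

from datetime import datetime, timezone

# mocked rows shaped like r.to_values(["_time", "_value"]): (datetime, value) pairs
rows = [
    (datetime(2024, 1, 1, 0, 0, 0, tzinfo=timezone.utc), 1.5),
    (datetime(2024, 1, 1, 0, 0, 30, tzinfo=timezone.utc), 2.0),
]

# post-commit shape: plain epoch seconds; previously each timestamp was
# multiplied by f_time_scale (1000 when time_in_ms was True)
new_data = [[i_time.timestamp(), i_value] for i_time, i_value in rows]
print(new_data)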
@@ -212,17 +196,14 @@ class InfluxDatabase(models.Model):
 
         return values
 
-    def write_datapoints(self, **kwargs):
+    def write_datapoints(self, items:list, date_saved=None,batch_size=5000, **kwargs):
         if self.only_write_to_influxdb or True:
             django_database = self.get_django_database()
             data_model = django_database._import_model()
 
-        items = kwargs.pop("items") if "items" in kwargs else []
         items = self.datasource.datasource_check(items)
         points = []
         recordings = []
-        date_saved = kwargs.pop("date_saved") if "date_saved" in kwargs else now()
-        batch_size = kwargs.pop("batch_size") if "batch_size" in kwargs else 5000
         i = 0
         logger.debug(f"write_datapoints {items}")
         for item in items:
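write_datapoints now takes items, date_saved and batch_size explicitly instead of popping them from **kwargs. The batching loop itself is outside this hunk, so the chunking below is only a guess at how batch_size is meant to be used, shown as a standalone helper:

def iter_batches(points, batch_size=5000):
    # yield the accumulated points in chunks of batch_size
    for start in range(0, len(points), batch_size):
        yield points[start:start + batch_size]

for batch in iter_batches(list(range(12)), batch_size=5):
    print(len(batch))  # 5, 5, 2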
@@ -235,15 +216,15 @@ class InfluxDatabase(models.Model):
             # create_data_element_from_variable function)
 
             if not hasattr(item, "date_saved") or item.date_saved is None:
                 item.date_saved = date_saved
 
             # create the recorded data object
             point = self.create_data_element_from_variable(
-                item, cached_value[0], cached_value[1], **kwargs
+                variable=item, timestamp=cached_value[0], value=cached_value[1], date_saved=date_saved, **kwargs
             )
             if self.only_write_to_influxdb or True:
                 rc = data_model.objects.create_data_element_from_variable(
-                    item, cached_value[0], cached_value[0], **kwargs
+                    variable=item, value=cached_value[1], timestamp=cached_value[0], date_saved=date_saved, **kwargs
                 )
                 if rc is not None:
                     recordings.append(rc)
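Both calls above switch to keyword arguments; notably, the second call used to pass cached_value[0] for both the timestamp and the value, and now passes value=cached_value[1]. A small stand-in function showing why the keyword form makes that kind of positional slip visible at the call site:

def create_data_element_from_variable(variable, timestamp, value, date_saved=None):
    # stand-in with the same parameter order as the method in this file
    return {"variable": variable, "timestamp": timestamp, "value": value}

cached_value = (1700000000, 42.0)  # (timestamp, value), as cached in the loop above

# positional call: easy to pass cached_value[0] twice by accident, as the old line did
old_style = create_data_element_from_variable("var", cached_value[0], cached_value[0])
# keyword call: each argument is named, so a timestamp/value mix-up stands out
new_style = create_data_element_from_variable(
    variable="var", value=cached_value[1], timestamp=cached_value[0]
)
print(old_style, new_style)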