fixed influxdb writing
This commit is contained in:
parent
5c739a412f
commit
bdb1d3adcd
|
|
@ -5,10 +5,13 @@ import os
|
|||
import re
|
||||
import traceback
|
||||
from datetime import datetime
|
||||
from django.utils.timezone import now, make_aware, is_naive
|
||||
import pytz
|
||||
from django.conf import settings
|
||||
from django.db import models
|
||||
|
||||
from django.db.utils import IntegrityError, ProgrammingError
|
||||
|
||||
from influxdb_client import InfluxDBClient, Point, WritePrecision
|
||||
from influxdb_client.client.write_api import SYNCHRONOUS
|
||||
|
||||
|
|
@ -16,12 +19,16 @@ from pyscada.models import DataSource, DjangoDatabase
|
|||
|
||||
tz_local = pytz.timezone(settings.TIME_ZONE)
|
||||
|
||||
import logging
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
class InfluxDatabase(models.Model):
|
||||
datasource = models.OneToOneField(DataSource, on_delete=models.CASCADE)
|
||||
|
||||
def __str__(self):
|
||||
return f"Influx Database ({self.url}.{self.bucket})"
|
||||
|
||||
bucket = models.CharField(max_length=255)
|
||||
api_key = models.CharField(max_length=255)
|
||||
organisation = models.CharField(max_length=255)
|
||||
|
|
@ -30,8 +37,10 @@ class InfluxDatabase(models.Model):
|
|||
measurement_name = models.CharField(default="pyscada.models.RecordedData", max_length=255)
|
||||
only_write_to_influxdb = models.BooleanField(default=True, help_text="when selected only a copy of the data is written to the InfluxDB and the SQL Database is used for everything else")
|
||||
|
||||
client = None
|
||||
|
||||
def connect(self):
|
||||
self.client = InfluxDBClient(url=self.url, token=self.token, org=self.organisation)
|
||||
self.client = InfluxDBClient(url=self.url, token=self.api_key, org=self.organisation)
|
||||
return self.client
|
||||
|
||||
def get_write_api(self):
|
||||
|
|
@ -44,10 +53,10 @@ class InfluxDatabase(models.Model):
|
|||
self.connect()
|
||||
return self.client.query_api()
|
||||
|
||||
def get_django_database():
|
||||
return DataSource.objects.first() # the django database datasource is always the first element in the database
|
||||
def get_django_database(self):
|
||||
return DjangoDatabase.objects.first() # the django database datasource is always the first element in the database
|
||||
|
||||
def create_data_element_from_variable(self, variable, value, timestamp, **kwargs):
|
||||
def create_data_element_from_variable(self, variable, timestamp, value, **kwargs):
|
||||
if value is not None:
|
||||
date_saved = (
|
||||
kwargs["date_saved"]
|
||||
|
|
@ -58,6 +67,7 @@ class InfluxDatabase(models.Model):
|
|||
)
|
||||
#.field("id", int(int(int(timestamp) * 2097152) + variable.pk))
|
||||
# variable_id and device_id had to be a tag to be easily filtered by, even if it is an numeric value
|
||||
logger.debug(f"{variable} {timestamp} {value}")
|
||||
point = (
|
||||
Point(self.measurement_name)
|
||||
.tag("variable_id", variable.pk)
|
||||
|
|
@ -66,8 +76,8 @@ class InfluxDatabase(models.Model):
|
|||
.tag("value_class", variable.value_class)
|
||||
.tag("unit", str(variable.unit))
|
||||
.field("value", value)
|
||||
.field("date_saved", date_saved)
|
||||
.timestamp(timestamp / 1000.0)
|
||||
.field("date_saved", date_saved.timestamp())
|
||||
.time(datetime.utcfromtimestamp(timestamp))
|
||||
)
|
||||
return point
|
||||
return None
|
||||
|
|
@ -160,7 +170,7 @@ class InfluxDatabase(models.Model):
|
|||
)
|
||||
if self.only_write_to_influxdb:
|
||||
rc = data_model.objects.create_data_element_from_variable(
|
||||
item, cached_value[1], cached_value[0], **kwargs
|
||||
item, cached_value[0], cached_value[0], **kwargs
|
||||
)
|
||||
if rc is not None:
|
||||
recordings.append(rc)
|
||||
|
|
@ -173,22 +183,28 @@ class InfluxDatabase(models.Model):
|
|||
|
||||
|
||||
if i%batch_size == 0:
|
||||
write_api.write(bucket=bucket, org="tub", record=points, write_precision=self.write_precision)
|
||||
try:
|
||||
write_api.write(bucket=self.bucket, org=self.organisation, record=points, write_precision=self.write_precision)
|
||||
except:
|
||||
logger.debug(f'Error in Point Data {", ".join(str(i.time) for i in points)}')
|
||||
points = []
|
||||
|
||||
if self.only_write_to_influxdb:
|
||||
try:
|
||||
data_model.objects.bulk_create(recorded_datas, batch_size=batch_size, **kwargs)
|
||||
data_model.objects.bulk_create(recordings, batch_size=batch_size, **kwargs)
|
||||
except IntegrityError:
|
||||
logger.debug(
|
||||
f'{data_model._meta.object_name} objects already exists, retrying ignoring conflicts for : {", ".join(str(i.id) + " " + str(i.variable.id) for i in recorded_datas)}'
|
||||
f'{data_model._meta.object_name} objects already exists, retrying ignoring conflicts for : {", ".join(str(i.id) + " " + str(i.variable.id) for i in recordings)}'
|
||||
)
|
||||
data_model.objects.bulk_create(
|
||||
recorded_datas, ignore_conflicts=True, batch_size=batch_size, **kwargs
|
||||
recordings, ignore_conflicts=True, batch_size=batch_size, **kwargs
|
||||
)
|
||||
|
||||
if len(points) > 0:
|
||||
write_api.write(bucket=bucket, org="tub", record=points, write_precision=self.write_precision)
|
||||
try:
|
||||
write_api.write(bucket=self.bucket, org=self.organisation, record=points, write_precision=self.write_precision)
|
||||
except:
|
||||
logger.debug(f'Error in Point Data {", ".join(str(i.time) for i in points)}')
|
||||
|
||||
for item in items:
|
||||
item.date_saved = None
|
||||
|
|
|
|||
Loading…
Reference in New Issue
Block a user