From bdb1d3adcd7943fae4a26c88d83ae5ca45662cf6 Mon Sep 17 00:00:00 2001 From: Martin Schroeder Date: Wed, 28 Jan 2026 17:35:48 +0000 Subject: [PATCH] fixed influxdb writing --- pyscada/influxdb/models.py | 40 ++++++++++++++++++++++++++------------ 1 file changed, 28 insertions(+), 12 deletions(-) diff --git a/pyscada/influxdb/models.py b/pyscada/influxdb/models.py index 212af07..a203e63 100644 --- a/pyscada/influxdb/models.py +++ b/pyscada/influxdb/models.py @@ -5,10 +5,13 @@ import os import re import traceback from datetime import datetime +from django.utils.timezone import now, make_aware, is_naive import pytz from django.conf import settings from django.db import models +from django.db.utils import IntegrityError, ProgrammingError + from influxdb_client import InfluxDBClient, Point, WritePrecision from influxdb_client.client.write_api import SYNCHRONOUS @@ -16,12 +19,16 @@ from pyscada.models import DataSource, DjangoDatabase tz_local = pytz.timezone(settings.TIME_ZONE) +import logging + +logger = logging.getLogger(__name__) class InfluxDatabase(models.Model): datasource = models.OneToOneField(DataSource, on_delete=models.CASCADE) def __str__(self): return f"Influx Database ({self.url}.{self.bucket})" + bucket = models.CharField(max_length=255) api_key = models.CharField(max_length=255) organisation = models.CharField(max_length=255) @@ -29,9 +36,11 @@ class InfluxDatabase(models.Model): url = models.CharField(default="127.0.0.1:8086", max_length=255) measurement_name = models.CharField(default="pyscada.models.RecordedData", max_length=255) only_write_to_influxdb = models.BooleanField(default=True, help_text="when selected only a copy of the data is written to the InfluxDB and the SQL Database is used for everything else") + + client = None def connect(self): - self.client = InfluxDBClient(url=self.url, token=self.token, org=self.organisation) + self.client = InfluxDBClient(url=self.url, token=self.api_key, org=self.organisation) return self.client def get_write_api(self): @@ -44,10 +53,10 @@ class InfluxDatabase(models.Model): self.connect() return self.client.query_api() - def get_django_database(): - return DataSource.objects.first() # the django database datasource is always the first element in the database + def get_django_database(self): + return DjangoDatabase.objects.first() # the django database datasource is always the first element in the database - def create_data_element_from_variable(self, variable, value, timestamp, **kwargs): + def create_data_element_from_variable(self, variable, timestamp, value, **kwargs): if value is not None: date_saved = ( kwargs["date_saved"] @@ -58,6 +67,7 @@ class InfluxDatabase(models.Model): ) #.field("id", int(int(int(timestamp) * 2097152) + variable.pk)) # variable_id and device_id had to be a tag to be easily filtered by, even if it is an numeric value + logger.debug(f"{variable} {timestamp} {value}") point = ( Point(self.measurement_name) .tag("variable_id", variable.pk) @@ -66,8 +76,8 @@ class InfluxDatabase(models.Model): .tag("value_class", variable.value_class) .tag("unit", str(variable.unit)) .field("value", value) - .field("date_saved", date_saved) - .timestamp(timestamp / 1000.0) + .field("date_saved", date_saved.timestamp()) + .time(datetime.utcfromtimestamp(timestamp)) ) return point return None @@ -160,7 +170,7 @@ class InfluxDatabase(models.Model): ) if self.only_write_to_influxdb: rc = data_model.objects.create_data_element_from_variable( - item, cached_value[1], cached_value[0], **kwargs + item, cached_value[0], cached_value[0], **kwargs ) if rc is not None: recordings.append(rc) @@ -173,22 +183,28 @@ class InfluxDatabase(models.Model): if i%batch_size == 0: - write_api.write(bucket=bucket, org="tub", record=points, write_precision=self.write_precision) + try: + write_api.write(bucket=self.bucket, org=self.organisation, record=points, write_precision=self.write_precision) + except: + logger.debug(f'Error in Point Data {", ".join(str(i.time) for i in points)}') points = [] if self.only_write_to_influxdb: try: - data_model.objects.bulk_create(recorded_datas, batch_size=batch_size, **kwargs) + data_model.objects.bulk_create(recordings, batch_size=batch_size, **kwargs) except IntegrityError: logger.debug( - f'{data_model._meta.object_name} objects already exists, retrying ignoring conflicts for : {", ".join(str(i.id) + " " + str(i.variable.id) for i in recorded_datas)}' + f'{data_model._meta.object_name} objects already exists, retrying ignoring conflicts for : {", ".join(str(i.id) + " " + str(i.variable.id) for i in recordings)}' ) data_model.objects.bulk_create( - recorded_datas, ignore_conflicts=True, batch_size=batch_size, **kwargs + recordings, ignore_conflicts=True, batch_size=batch_size, **kwargs ) if len(points) > 0: - write_api.write(bucket=bucket, org="tub", record=points, write_precision=self.write_precision) + try: + write_api.write(bucket=self.bucket, org=self.organisation, record=points, write_precision=self.write_precision) + except: + logger.debug(f'Error in Point Data {", ".join(str(i.time) for i in points)}') for item in items: item.date_saved = None