55 lines
1.7 KiB
Python
55 lines
1.7 KiB
Python
|
|
import django
|
|
from time import time
|
|
from pyscada.models import Variable
|
|
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
|
|
|
|
|
|
|
|
def subprocess_setup():
|
|
django.setup()
|
|
from django.db import connections
|
|
for conn in connections.all():
|
|
conn.close()
|
|
|
|
def process_variables(variable_ids):
|
|
for variable_id in variable_ids:
|
|
v = Variable.objects.get(pk=variable_id)
|
|
points = []
|
|
i = 0
|
|
# now days primary key offset, -> ms
|
|
min_time = (time()-(1 * 24 * 60 * 60)) * 2097152 * 1000
|
|
data = v.recordeddata_set.filter(pk__gt=min_time)
|
|
i_max = data.count()
|
|
if i_max == 0:
|
|
continue
|
|
print(f"{v} {i} {i_max} {i/i_max*100}")
|
|
if not hasattr(v.datasource,"influxdatabase"):
|
|
continue
|
|
new_point = v.datasource.influxdatabase.create_data_element_from_variable
|
|
for rc in data.iterator():
|
|
points.append(new_point(v ,rc.timestamp, rc.value()))
|
|
i += 1
|
|
if i%5000==0:
|
|
v.datasource.influxdatabase.write_points(points)
|
|
print(f"{v} {i} {i_max} {i/i_max*100:1.2f}")
|
|
points = []
|
|
v.datasource.influxdatabase.write_points(points)
|
|
print(f"{v} {i} {i_max} {i/i_max*100:1.2f}")
|
|
points = []
|
|
|
|
variables = []
|
|
workers = 1
|
|
for worker_id in range(workers):
|
|
variables.append([])
|
|
worker = 0
|
|
|
|
for em in Variable.objects.all():
|
|
variables[worker % workers].append(em.pk)
|
|
worker += 1
|
|
|
|
#with ProcessPoolExecutor(max_workers=workers, initializer=subprocess_setup) as executor:
|
|
# for worker_id in range(workers):
|
|
# executor.submit(process_variables, variables[worker_id])
|
|
process_variables(variables[0])
|