Sonnendach/Sonnendach.py

581 lines
25 KiB
Python

import tkinter
import threading
import matplotlib.pyplot as plt
import selenium.webdriver.firefox.service
import webdriver_manager.chrome
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.firefox.service import Service
from webdriver_manager.chrome import ChromeDriverManager
from webdriver_manager.firefox import DriverManager
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from PIL import Image
import os
import qrcode
from tkinter import filedialog
import time
import datetime
from dateutil.relativedelta import relativedelta
from selenium.webdriver.common.action_chains import ActionChains
import json
import requests
import statistics
from webdriver_manager.firefox import GeckoDriverManager
DropDownLabels = ["Street: ", "Number: ", "Postal code: ", "City: ", "coordinate X", "coordinate Y", "Sonnendach URL: ", "Suitability:", "Image Filename: ", "PV Production 100", "Value Electricity production", "Roof area", "existing PV Production"]
file_split_char = ","
minRoofSize = 2
minSuitability = 3
electricityPrice = 0.198
OptionList = []
exit = False
stopThread = False
filename_adresslist = ""
adresslist = ""
step = 0
# service = Service(webdriver_manager.chrome.ChromeDriverManager().install())
service = selenium.webdriver.firefox.service.Service(GeckoDriverManager().install())
# driver = webdriver.Chrome(service=service)
driver = webdriver.Firefox(service=service)
driver.minimize_window()
outputtext = "Welcome to application Sonnendach\n"
columnIndexes = []
storeToDolibarr = False
dolibarrURL = "https://doli.solar-evolution.ch/api/index.php/thirdparties"
dolibarrApiKey = "riE0jT3yvpIT"
dolibarrCountryID = "6" # 6 = Schweiz
dolibarrCountryCode = "CH"
dolibarrStateID = "403" # 403 = Kanton Zürich
def search_adresses(adress_list, filename_adresslist, driver, mainText):
print("function to search Addresses called")
global file_split_char
global stopThread
global columnIndexes
global outputtext
global checkBoxCreateImages
global minRoofSize
global minSuitability
global electricityPrice
global storeToDolibarr
global dolibarrURL
global dolibarrApiKey
#Create Direcotrys to save screenshots and qrcodes if they don't exist yet.
image_folder_map = "screenshots_map/"
image_folder_production = "screenshots_production/"
image_folder_qrcode = "qrcodes/"
image_folder_barcharts = "barcharts/"
if(not os.path.exists(image_folder_map[:-1])):
os.makedirs(image_folder_map[:-1], exist_ok=False)
# if(not os.path.exists(image_folder_production[:-1])):
# os.makedirs(image_folder_production[:-1], exist_ok=False)
if (not os.path.exists(image_folder_qrcode[:-1])):
os.makedirs(image_folder_qrcode[:-1], exist_ok=False)
if (not os.path.exists(image_folder_barcharts[:-1])):
os.makedirs(image_folder_barcharts[:-1], exist_ok=False)
startTime = datetime.datetime.now()
address_count = len(adress_list)
#Search for each adress in the list and create screenshots and qrcodes
print("Starting Address Loop")
for i in range(address_count):
url = "not-found"
suitability = "not-found"
sumEnergyRoofs = "not-found"
value_electricity_production = "not-found"
sumRoofArea = "not-found"
featureId = "not-found"
response_all_roofs_decoded = "not-found"
coordinates = ("not-found", "not-found")
found = True
line = adress_list[i]
adress = line.split(file_split_char)
barChartMonth = [0] * 12
barChartData = [0] * 12
if((line != adress_list[0]) and (adress[columnIndexes[6]] == "") and (((len(adress[columnIndexes[0]]) > 0) and (len(adress[columnIndexes[1]]) > 0) and (len(adress[columnIndexes[2]]) > 0) and (len(adress[columnIndexes[3]]) > 0)) or ((len(adress[columnIndexes[4]]) > 0) and (len(adress[columnIndexes[5]]) > 0)))):
address_string = adress[columnIndexes[0]] + " " + adress[columnIndexes[1]] + " " + adress[columnIndexes[2]] + " " + adress[columnIndexes[3]]
print("searching for Address: " + address_string)
if(not ((len(adress[columnIndexes[4]]) > 0) and (len(adress[columnIndexes[5]]) > 0))):
# Get Coordinates for Address
try:
# Address to coordinates
params = {
"type": "locations",
"searchText": address_string,
"lang": "de"}
locationsResponse = requests.get('https://api3.geo.admin.ch/rest/services/api/SearchServer',
params=params)
locations = json.loads(locationsResponse.content.decode())
firstLocation = locations['results'][0]['attrs']
# you need the coordinates from other services, once you have them
coordinates = (float(firstLocation['y']), float(firstLocation['x'])) # example coordinates
except:
found = False
else:
# Get Coordinates from File
coordinate_x = float(adress[columnIndexes[4]]) % 1000000
coordinate_y = float(adress[columnIndexes[5]]) % 1000000
coordinates = (coordinate_x, coordinate_y)
if(found):
# Get Building ID from coordinates
try:
coordinates_str = ','.join(map(str, coordinates))
tolerance = 1
# this is wrong, bbox should be computed using tolerance and pixel size, but there are no problems for the
# purpose of this exercise
bbox = (coordinates[0] - tolerance, coordinates[1] - tolerance, coordinates[0] + tolerance,
coordinates[1] + tolerance)
bbox_str = ','.join(map(str, bbox))
params = {
"geometryType": "esriGeometryPoint",
"returnGeometry": "true",
"layers": "all:ch.bfe.solarenergie-eignung-daecher",
"geometry": coordinates_str,
"mapExtent": bbox_str,
"imageDisplay": "1391,1070,96",
"tolerance": tolerance,
"order": "distance",
"lang": "de",
}
# retrieve one roof and the building_id associated
response_one_roof = requests.get('https://api3.geo.admin.ch/rest/services/api/MapServer/identify',
params=params)
response_one_roof_decoded = json.loads(response_one_roof.content.decode())
# results = (response_one_roof_decoded['results'])
building_id = (response_one_roof_decoded)['results'][0]['attributes']['building_id']
except:
found = False
if(found):
try:
# use the building_id of previous request to get all roofs associated with this building_id
params = {
'layer': 'ch.bfe.solarenergie-eignung-daecher',
'searchField': 'building_id',
'contains': 'false',
'searchText': str(building_id),
'returnGeometry': 'false'
}
response_all_roofs = requests.get('https://api3.geo.admin.ch/rest/services/api/MapServer/find',
params=params)
response_all_roofs_decoded = json.loads(response_all_roofs.content.decode())['results']
if(len(response_all_roofs_decoded) > 0):
sumEnergyRoofs = 0
sumRoofArea = 0
suitabilities = []
bestRoofEnergy = 0
bestRoof = response_all_roofs_decoded[0]
barChartMonth = response_all_roofs_decoded[0]['attributes']['monate']
barChartData = [0] * 12
for roof in response_all_roofs_decoded:
energy = float(roof['attributes']['gstrahlung'])*0.2*0.8
if(energy > bestRoofEnergy):
bestRoofEnergy = energy
bestRoof = roof
if(float(roof['attributes']['flaeche']) > minRoofSize and int(roof['attributes']['klasse']) >= minSuitability):
for month_count in range(12):
barChartData[month_count] = barChartData[month_count] + float(
roof['attributes']['monats_ertrag'][month_count]) * float(
roof['attributes']['flaeche']) * electricityPrice
sumEnergyRoofs = sumEnergyRoofs + energy
sumRoofArea = sumRoofArea + float(roof['attributes']['flaeche'])
suitabilities.append(int(roof['attributes']['klasse']))
featureId = str(bestRoof['featureId'])
url = "https://www.uvek-gis.admin.ch/BFE/sonnendach/index.html?featureId=" + featureId + "&lang=de"
suitability = max(suitabilities)
value_electricity_production = sumEnergyRoofs * electricityPrice
else:
found = False
except:
found = False
roof_is_suitable = True # TODO decide if roof is suitable
existing_solar_panel_power = "none"
roof_has_solarpanel = False # TODO check if roof has solar Panel
#Check if there is solar panel existing
address_string_for_existing_solarpanel = adress[columnIndexes[0]] + " " + adress[columnIndexes[1]] + ", " + adress[columnIndexes[2]] + " " + adress[columnIndexes[3]]
try:
params = {
'searchText': str(address_string_for_existing_solarpanel),
'searchField': 'address',
'layer': "ch.bfe.elektrizitaetsproduktionsanlagen",
'returnGeometry': 'false',
'contains': 'false'
}
repsonse_solar_panel = requests.get('https://api3.geo.admin.ch/rest/services/api/MapServer/find',
params=params)
repsonse_solar_panel_decoded = json.loads(repsonse_solar_panel.content.decode())
results = (repsonse_solar_panel_decoded['results'])
if(len(results) > 0):
existing_solar_panel_power = results[0]['attributes']['total_power']
except:
existing_solar_panel_power = "not-found"
roof_has_solarpanel = False
if(found and createImages.get() > 0 and roof_is_suitable and (not roof_has_solarpanel)):
image_filename = address_string + ".png"
barchart_color = ""
if(suitability == 1):
barchart_color = "#00c5ff"
elif(suitability == 2):
barchart_color = "#ffff00"
elif(suitability == 3):
barchart_color = "#ffaa00"
elif(suitability == 4):
barchart_color = "#ff5500"
elif(suitability == 5):
barchart_color = "#a80000"
create_images(address_string, driver, image_filename, image_folder_map, image_folder_production, image_folder_qrcode, image_folder_barcharts,
url, barChartMonth, barChartData, barchart_color)
else:
image_filename = "not-created"
print("saving Address: " + address_string)
save_information_to_addresslist(adress, adress_list, columnIndexes, coordinates, file_split_char, i,
image_filename, sumEnergyRoofs, sumRoofArea, suitability, url,
value_electricity_production, existing_solar_panel_power)
save_addresslist_in_file(adress_list, filename_adresslist)
if(found):
street = adress[columnIndexes[0]] + " " + adress[columnIndexes[1]]
plz = adress[columnIndexes[2]]
city = adress[columnIndexes[3]]
create_Partner_in_dolibarr(street, plz, city, coordinates, sumEnergyRoofs, sumRoofArea, suitability, featureId, url, response_all_roofs_decoded)
print("saved Address: " + address_string)
print("Address " + str(i) + " of " + str(address_count))
timediff = relativedelta(datetime.datetime.now(), startTime)
print("Process is running since %d years %d months %d days %d hours %d minutes %d seconds" % (
timediff.years, timediff.months, timediff.days, timediff.hours, timediff.minutes, timediff.seconds))
if(stopThread == True):
print("closing Thread")
return
outputtext = outputtext + "Reached end of list.\nPress Exit to close the application." + "\n"
mainText.config(text=outputtext)
print("closing Thread")
return
def save_information_to_addresslist(adress, adress_list, columnIndexes, coordinates, file_split_char, list_index,
image_filename, pv_Production100, roof_area, suitability, url,
value_electricity_production, existing_solar_panel_power):
# Save Information in Addresslist
adress[columnIndexes[4]] = str(coordinates[0])
adress[columnIndexes[5]] = str(coordinates[1])
adress[columnIndexes[6]] = url
adress[columnIndexes[7]] = suitability
adress[columnIndexes[8]] = image_filename
adress[columnIndexes[9]] = pv_Production100
adress[columnIndexes[10]] = value_electricity_production
adress[columnIndexes[11]] = roof_area
adress[columnIndexes[12]] = existing_solar_panel_power
new_line_string = ""
for j in adress:
new_line_string = new_line_string + str(j) + file_split_char
adress_list[list_index] = new_line_string
def save_addresslist_in_file(adress_list, filename_adresslist):
new_adress_list = ""
for j in adress_list:
new_adress_list = new_adress_list + (j) + "\n"
# save File
adress_file = open(filename_adresslist, "w", encoding="utf-8")
adress_file.write(new_adress_list)
adress_file.close()
def create_Partner_in_dolibarr(street, plz, city, coordinates, sumEnergyRoofs, sumRoofArea, suitability, featureId, url, roofs):
global electricityPrice
global dolibarrStateID
global dolibarrCountryID
global dolibarrCountryCode
global dolibarrURL
global dolibarrApiKey
params = {
"name": "unbekannt",
"address": street,
"zip": plz,
"town": city,
"state_id": dolibarrStateID,
"client": "2",
"code_client": "FID-" + featureId,
"note_public": "",
"status_prospect_label": "Jamais contacté",
"array_options": {
"options_energy": str(sumEnergyRoofs),
"options_energy_value": str(float(sumEnergyRoofs) * electricityPrice),
"options_roof_size": str(sumRoofArea),
"options_avrg_suitability": str(suitability),
"options_geomx": str(coordinates[0]),
"options_geomy": str(coordinates[1]),
"options_roof_info": create_roof_info(roofs),
"options_sonnendach_url": url
},
"country_id": dolibarrCountryID,
"country_code": dolibarrCountryCode
}
headers = {
"Content-Type": "application/json",
"Accept": "application/json",
"DOLAPIKEY": dolibarrApiKey
}
response = json.loads(requests.post(dolibarrURL, json=params, headers=headers).content.decode())
# print(params)
# print(response)
def create_roof_info(roofs):
info_text = ""
for i in range(len(roofs)):
info_text = info_text + str(i + 1) + ". Fläche\n" + "Potenzielle Energie: " + str(roofs[i]['attributes']['gstrahlung']*0.2*0.8) + "\nFläche: " + str(roofs[i]['attributes']['flaeche']) + "\nEignung: " + str(roofs[i]['attributes']['klasse']) + " (" + str(roofs[i]['attributes']['klasse_text']).split("##")[0] + ")\n\n"
return info_text
def create_images(address_string, driver, image_filename, image_folder_map, image_folder_production, image_folder_qrcode, image_folder_barcharts, url, barChartMonth, barChartData, barchart_color):
driver.get(url)
time.sleep(2)
# Create QR-Code
qr = qrcode.QRCode(version=1, box_size=10, border=5)
qr.add_data(url)
qr.make(fit=True)
qr.make_image(fill='black', back_color='white').save(image_folder_qrcode + image_filename)
# Create Screenshot of Map
actions = ActionChains(driver)
try:
driver.execute_script("""var l = document.getElementsByClassName("ol-zoom ol-unselectable ol-control")[0];
l.parentNode.removeChild(l);""")
except:
pass
mapElement = driver.find_element(By.XPATH,
"//div[@id='map']//div[@class='ol-viewport']")
location = mapElement.location
size = mapElement.size
x = location["x"]
y = location["y"]
w = x + size["width"]
h = size["height"]
area = (x, 0, w, h)
driver.execute_script("window.scrollTo(0, " + str(y) + ")")
time.sleep(1)
driver.save_screenshot(image_folder_map + image_filename)
time.sleep(0.2)
Image.open(image_folder_map + image_filename).crop(area).save(
image_folder_map + image_filename)
# Create Screenshot of Chart
# try:
# chartElement = driver.find_elements(By.XPATH, "//div[@id='chart']//*")[0]
# location = chartElement.location
# size = chartElement.size
# x = location["x"]
# y = location["y"]
# w = x + size["width"]
# h = size["height"]
# area = (x, 0, w, h)
# driver.execute_script("window.scrollTo(0, " + str(y) + ")")
#
# time.sleep(1)
# driver.save_screenshot(image_folder_production + image_filename)
# time.sleep(0.2)
# Image.open(image_folder_production + image_filename).crop(area).save(
# image_folder_production + image_filename)
# except:
# pass
# Create barchar with matplotlib.pyplot
try:
barChartMonth.reverse()
barChartMonth = [str(item) for item in barChartMonth]
barChartData.reverse()
max_wert = max(barChartData)
min_wert = min(barChartData)
skala = max_wert + max_wert * 0.1
plt.figure(figsize=(7, 3))
plt.bar(barChartMonth, barChartData, color=barchart_color)
plt.ylabel('Stromproduktion in Franken', fontsize=12)
plt.ylim(0, skala)
# Turn off the frame (border) around the graphic
ax = plt.gca() # Get the current axis
ax.spines['top'].set_visible(False)
ax.spines['right'].set_visible(False)
ax.spines['bottom'].set_visible(False)
plt.xticks(barChartMonth)
# Set the x-axis limits to remove extra space on the right
# plt.xlim(barChartMonth[0], barChartMonth[-1])
plt.savefig(f"{image_folder_barcharts}{image_filename}", bbox_inches='tight')
except:
pass
def read_adresslist(filename_adresslist):
returnvalues = []
try:
print("reading File", filename_adresslist)
adress_file = open(filename_adresslist, "r", encoding="utf-8")
adress_list = adress_file.read().splitlines()
adress_file.close()
print("closed File", filename_adresslist)
returnvalues.append(True)
returnvalues.append(adress_list)
except:
returnvalues.append(False)
return returnvalues
def createFrameFileColums(headers):
global OptionList
global DropDownLabels
for header in headers:
OptionList.append(header)
desctext = tkinter.Label(frameFileColums, text="Please select Headers of Columns")
desctext.grid(row=1, column=1, padx=10, pady=3)
frameFileColums1 = tkinter.Frame(frameFileColums)
frameFileColums1.grid(row=2, column=1, padx=10, pady=3)
for i in range(len(DropDownLabels)):
exec("text" + str(i+1) + " = tkinter.Label(frameFileColums1, text=\"" + DropDownLabels[i] + "\")")
exec("text" + str(i+1) + ".grid(row=" + str(i+1) + ", column=1, padx=10, pady=3)")
exec("dropdown" + str(i+1) + " = tkinter.OptionMenu(frameFileColums1, variablesDropdown[" + str(i) + "], *OptionList)")
exec("dropdown" + str(i+1) + ".grid(row=" + str(i+1) + ", column=2, padx=10, pady=3)")
def getColumnIndex():
global OptionList
global columnIndexes
for j in range(len(DropDownLabels)):
for i in range(len(OptionList)):
if(OptionList[i] == variablesDropdown[j].get()):
columnIndexes.append(i)
def command():
global exit
global stopThread
global adresslist
global step
global driver
global filename_adresslist
global outputtext
global thread_search_adresses
global file_split_char
if(exit):
print("Command Exit received")
stopThread = True
try:
thread_search_adresses.join()
except:
pass
print("Threads are closed")
root.quit()
print("Application is closed")
driver.quit()
print("Webdriver is closed")
elif(step == 0):
#Schritt 1
filename_adresslist = filedialog.askopenfilename()
outputtext = outputtext + "Searching File " + filename_adresslist + "\n"
mainText.config(text=outputtext)
adress_list_result = read_adresslist(filename_adresslist)
if(adress_list_result[0]):
adresslist = adress_list_result[1]
outputtext = outputtext + "File " + filename_adresslist + " found." + "\n"
mainText.config(text=outputtext)
button1.config(text="read file")
createFrameFileColums(adresslist[0].split(file_split_char))
frameFileColums.grid(row=2, column=1)
step += 1
else:
outputtext = outputtext + "File " + filename_adresslist + " not found." + "\n"
mainText.config(text=outputtext)
exit = True
button1.grid_remove()
elif (step == 1):
# TODO: check if dropdowns are selected
getColumnIndex()
outputtext = outputtext + "Reading File " + filename_adresslist + " done." + "\n"
mainText.config(text=outputtext)
frameFileColums.grid_remove()
button1.config(text="open Webbrowser")
step += 1
elif(step == 2):
# Schritt 2
outputtext = outputtext + "Website is opening" + "\n"
mainText.config(text=outputtext)
try:
driver.maximize_window()
driver.get("https://www.uvek-gis.admin.ch/BFE/sonnendach/")
driver.implicitly_wait(20)
outputtext = outputtext + "opening website was done.\nPrepare the browser window to create screenshots." + "\n"
mainText.config(text=outputtext)
button1.config(text="start process")
step += 1
except:
outputtext = outputtext + "opening website failed" + "\n"
mainText.config(text=outputtext)
exit = True
button1.grid_remove()
elif(step == 3):
thread_search_adresses = threading.Thread(target=search_adresses, args=(adresslist, filename_adresslist, driver, mainText))
thread_search_adresses.start()
print("Thread started")
outputtext = outputtext + "process has started." + "\n"
mainText.config(text=outputtext)
button1.grid_remove()
exit = True
def command_exit():
global exit
exit = True
command()
root = tkinter.Tk()
root.protocol("WM_DELETE_WINDOW", command_exit)
root.wm_title("Sonnendach")
mainText = tkinter.Label(root, text=outputtext, width=80)
mainText.grid(row=1, column=1, padx=10, pady=3)
frameFileColums = tkinter.Frame(root)
variablesDropdown = []
for i in range(len(DropDownLabels)):
variablesDropdown.append(tkinter.StringVar(root))
frameButtons = tkinter.Frame(root)
frameButtons.grid(row=3, column=1)
createImages = tkinter.IntVar()
checkBoxCreateImages = tkinter.Checkbutton(frameButtons, text="create images", variable=createImages)
checkBoxCreateImages.grid()
button1 = tkinter.Button(frameButtons, text="Select Adresslist file", command=command, width=20, height=2, bg="#FCCA03")
button1.grid(row=3, column=1, padx=10, pady=3)
button2 = tkinter.Button(frameButtons, text="EXIT", command=command_exit, width=20, height=2, bg="#FCCA03")
button2.grid(row=3, column=2, padx=10, pady=3)
root.mainloop()