import tkinter import threading import selenium.webdriver.firefox.service import webdriver_manager.chrome from selenium import webdriver from selenium.webdriver.chrome.service import Service from selenium.webdriver.firefox.service import Service from webdriver_manager.chrome import ChromeDriverManager from webdriver_manager.firefox import DriverManager from selenium.webdriver.common.by import By from selenium.webdriver.common.keys import Keys from PIL import Image import os import qrcode from tkinter import filedialog import time import datetime from dateutil.relativedelta import relativedelta from selenium.webdriver.common.action_chains import ActionChains import json import requests from webdriver_manager.firefox import GeckoDriverManager DropDownLabels = ["Street: ", "Number: ", "Postal code: ", "City: ", "coordinate X", "coordinate Y", "Sonnendach URL: ", "Suitability:", "Image Filename: ", "PV Production 100", "Value Electricity production", "Roof area", "existing PV Production"] file_split_char = "," minRoofSize = 2 minSuitability = 3 electricityPrice = 0.1 OptionList = [] exit = False stopThread = False filename_adresslist = "" adresslist = "" step = 0 # service = Service(webdriver_manager.chrome.ChromeDriverManager().install()) service = selenium.webdriver.firefox.service.Service(GeckoDriverManager().install()) # driver = webdriver.Chrome(service=service) driver = webdriver.Firefox(service=service) driver.minimize_window() outputtext = "Welcome to application Sonnendach\n" columnIndexes = [] storeToDolibarr = False dolibarrURL = "https://doli.solar-evolution.ch/api/index.php/thirdparties" dolibarrApiKey = "riE0jT3yvpIT" dolibarrCountryID = "6" # 6 = Schweiz dolibarrCountryCode = "CH" dolibarrStateID = "403" # 403 = Kanton Zürich def search_adresses(adress_list, filename_adresslist, driver, mainText): print("function to search Addresses called") global file_split_char global stopThread global columnIndexes global outputtext global checkBoxCreateImages global minRoofSize global minSuitability global electricityPrice global storeToDolibarr global dolibarrURL global dolibarrApiKey #Create Direcotrys to save screenshots and qrcodes if they don't exist yet. image_folder_map = "screenshots_map/" image_folder_production = "screenshots_production/" image_folder_qrcode = "qrcodes/" if(not os.path.exists(image_folder_map[:-1])): os.makedirs(image_folder_map[:-1], exist_ok=False) if(not os.path.exists(image_folder_production[:-1])): os.makedirs(image_folder_production[:-1], exist_ok=False) if (not os.path.exists(image_folder_qrcode[:-1])): os.makedirs(image_folder_qrcode[:-1], exist_ok=False) startTime = datetime.datetime.now() address_count = len(adress_list) #Search for each adress in the list and create screenshots and qrcodes print("Starting Address Loop") for i in range(address_count): url = "not-found" suitability = "not-found" sumEnergyRoofs = "not-found" value_electricity_production = "not-found" sumRoofArea = "not-found" featureId = "not-found" response_all_roofs_decoded = "not-found" found = True line = adress_list[i] adress = line.split(file_split_char) if((line != adress_list[0]) and (adress[columnIndexes[6]] == "") and (((len(adress[columnIndexes[0]]) > 0) and (len(adress[columnIndexes[1]]) > 0) and (len(adress[columnIndexes[2]]) > 0) and (len(adress[columnIndexes[3]]) > 0)) or ((len(adress[columnIndexes[4]]) > 0) and (len(adress[columnIndexes[5]]) > 0)))): address_string = adress[columnIndexes[0]] + " " + adress[columnIndexes[1]] + " " + adress[columnIndexes[2]] + " " + adress[columnIndexes[3]] print("searching for Address: " + address_string) if(not ((len(adress[columnIndexes[4]]) > 0) and (len(adress[columnIndexes[5]]) > 0))): # Get Coordinates for Address try: # Address to coordinates params = { "type": "locations", "searchText": address_string, "lang": "de"} locationsResponse = requests.get('https://api3.geo.admin.ch/rest/services/api/SearchServer', params=params) locations = json.loads(locationsResponse.content.decode()) firstLocation = locations['results'][0]['attrs'] # you need the coordinates from other services, once you have them coordinates = (float(firstLocation['y']), float(firstLocation['x'])) # example coordinates except: found = False else: # Get Coordinates from File coordinate_x = float(adress[columnIndexes[4]]) % 1000000 coordinate_y = float(adress[columnIndexes[5]]) % 1000000 coordinates = (coordinate_x, coordinate_y) if(found): # Get Building ID from coordinates try: coordinates_str = ','.join(map(str, coordinates)) tolerance = 1 # this is wrong, bbox should be computed using tolerance and pixel size, but there are no problems for the # purpose of this exercise bbox = (coordinates[0] - tolerance, coordinates[1] - tolerance, coordinates[0] + tolerance, coordinates[1] + tolerance) bbox_str = ','.join(map(str, bbox)) params = { "geometryType": "esriGeometryPoint", "returnGeometry": "true", "layers": "all:ch.bfe.solarenergie-eignung-daecher", "geometry": coordinates_str, "mapExtent": bbox_str, "imageDisplay": "1391,1070,96", "tolerance": tolerance, "order": "distance", "lang": "de", } # retrieve one roof and the building_id associated response_one_roof = requests.get('https://api3.geo.admin.ch/rest/services/api/MapServer/identify', params=params) response_one_roof_decoded = json.loads(response_one_roof.content.decode()) results = (response_one_roof_decoded['results']) # for result in results: # print(result) building_id = (response_one_roof_decoded)['results'][0]['attributes']['building_id'] except: found = False if(found): try: # use the building_id of previous request to get all roofs associated with this building_id params = { 'layer': 'ch.bfe.solarenergie-eignung-daecher', 'searchField': 'building_id', 'contains': 'false', 'searchText': str(building_id), 'returnGeometry': 'false' } response_all_roofs = requests.get('https://api3.geo.admin.ch/rest/services/api/MapServer/find', params=params) response_all_roofs_decoded = json.loads(response_all_roofs.content.decode())['results'] if(len(response_all_roofs_decoded) > 0): sumEnergyRoofs = 0 sumRoofArea = 0 suitabilities = [] bestRoofEnergy = 0 bestRoof = response_all_roofs_decoded[0] for roof in response_all_roofs_decoded: energy = float(roof['attributes']['gstrahlung'])*0.2*0.8 if(energy > bestRoofEnergy): bestRoofEnergy = energy bestRoof = roof if(float(roof['attributes']['flaeche']) > minRoofSize and int(roof['attributes']['klasse']) >= minSuitability): sumEnergyRoofs = sumEnergyRoofs + energy sumRoofArea = sumRoofArea + float(roof['attributes']['flaeche']) suitabilities.append(int(roof['attributes']['klasse'])) featureId = str(bestRoof['featureId']) url = "https://www.uvek-gis.admin.ch/BFE/sonnendach/index.html?featureId=" + featureId + "&lang=de" suitability = max(suitabilities) value_electricity_production = sumEnergyRoofs * electricityPrice else: found = False except: found = False roof_is_suitable = True # TODO decide if roof is suitable existing_solar_panel_power = "none" roof_has_solarpanel = False # TODO check if roof has solar Panel #Check if there is solar panel existing address_string_for_existing_solarpanel = adress[columnIndexes[0]] + " " + adress[columnIndexes[1]] + ", " + adress[columnIndexes[2]] + " " + adress[columnIndexes[3]] try: params = { 'searchText': str(address_string_for_existing_solarpanel), 'searchField': 'address', 'layer': "ch.bfe.elektrizitaetsproduktionsanlagen", 'returnGeometry': 'false', 'contains': 'false' } repsonse_solar_panel = requests.get('https://api3.geo.admin.ch/rest/services/api/MapServer/find', params=params) repsonse_solar_panel_decoded = json.loads(repsonse_solar_panel.content.decode()) print(repsonse_solar_panel_decoded) results = (repsonse_solar_panel_decoded['results']) if(len(results) > 0): existing_solar_panel_power = results[0]['attributes']['total_power'] except: existing_solar_panel_power = "not-found" roof_has_solarpanel = False if(found and createImages.get() > 0 and roof_is_suitable and (not roof_has_solarpanel)): image_filename = address_string + ".png" create_images(address_string, driver, image_filename, image_folder_map, image_folder_production, image_folder_qrcode, url) else: image_filename = "not-created" print("saving Address: " + address_string) save_information_to_addresslist(adress, adress_list, columnIndexes, coordinates, file_split_char, i, image_filename, sumEnergyRoofs, sumRoofArea, suitability, url, value_electricity_production, existing_solar_panel_power) save_addresslist_in_file(adress_list, filename_adresslist) if(found): street = adress[columnIndexes[0]] + " " + adress[columnIndexes[1]] plz = adress[columnIndexes[2]] city = adress[columnIndexes[3]] create_Partner_in_dolibarr(street, plz, city, coordinates, sumEnergyRoofs, sumRoofArea, suitability, featureId, url, response_all_roofs_decoded) print("saved Address: " + address_string) print("Address " + str(i) + " of " + str(address_count)) timediff = relativedelta(datetime.datetime.now(), startTime) print("Process is running since %d years %d months %d days %d hours %d minutes %d seconds" % ( timediff.years, timediff.months, timediff.days, timediff.hours, timediff.minutes, timediff.seconds)) if(stopThread == True): print("closing Thread") return outputtext = outputtext + "Reached end of list.\nPress Exit to close the application." + "\n" mainText.config(text=outputtext) print("closing Thread") return def save_information_to_addresslist(adress, adress_list, columnIndexes, coordinates, file_split_char, list_index, image_filename, pv_Production100, roof_area, suitability, url, value_electricity_production, existing_solar_panel_power): # Save Information in Addresslist adress[columnIndexes[4]] = str(coordinates[0]) adress[columnIndexes[5]] = str(coordinates[1]) adress[columnIndexes[6]] = url adress[columnIndexes[7]] = suitability adress[columnIndexes[8]] = image_filename adress[columnIndexes[9]] = pv_Production100 adress[columnIndexes[10]] = value_electricity_production adress[columnIndexes[11]] = roof_area adress[columnIndexes[12]] = existing_solar_panel_power new_line_string = "" for j in adress: new_line_string = new_line_string + str(j) + file_split_char adress_list[list_index] = new_line_string def save_addresslist_in_file(adress_list, filename_adresslist): new_adress_list = "" for j in adress_list: new_adress_list = new_adress_list + (j) + "\n" # save File adress_file = open(filename_adresslist, "w", encoding="utf-8") adress_file.write(new_adress_list) adress_file.close() def create_Partner_in_dolibarr(street, plz, city, coordinates, sumEnergyRoofs, sumRoofArea, suitability, featureId, url, roofs): global electricityPrice global dolibarrStateID global dolibarrCountryID global dolibarrCountryCode global dolibarrURL global dolibarrApiKey params = { "name": "unbekannt", "address": street, "zip": plz, "town": city, "state_id": dolibarrStateID, "client": "2", "code_client": "FID-" + featureId, "note_public": "", "status_prospect_label": "Jamais contacté", "array_options": { "options_energy": str(sumEnergyRoofs), "options_energy_value": str(float(sumEnergyRoofs) * electricityPrice), "options_roof_size": str(sumRoofArea), "options_avrg_suitability": str(suitability), "options_geomx": str(coordinates[0]), "options_geomy": str(coordinates[1]), "options_roof_info": create_roof_info(roofs), "options_sonnendach_url": url }, "country_id": dolibarrCountryID, "country_code": dolibarrCountryCode } headers = { "Content-Type": "application/json", "Accept": "application/json", "DOLAPIKEY": dolibarrApiKey } response = json.loads(requests.post(dolibarrURL, json=params, headers=headers).content.decode()) # print(params) # print(response) def create_roof_info(roofs): info_text = "" for i in range(len(roofs)): info_text = info_text + str(i + 1) + ". Fläche\n" + "Potenzielle Energie: " + str(roofs[i]['attributes']['gstrahlung']*0.2*0.8) + "\nFläche: " + str(roofs[i]['attributes']['flaeche']) + "\nEignung: " + str(roofs[i]['attributes']['klasse']) + " (" + str(roofs[i]['attributes']['klasse_text']).split("##")[0] + ")\n\n" return info_text def create_images(address_string, driver, image_filename, image_folder_map, image_folder_production, image_folder_qrcode, url): driver.get(url) time.sleep(2) # Create QR-Code qr = qrcode.QRCode(version=1, box_size=10, border=5) qr.add_data(url) qr.make(fit=True) qr.make_image(fill='black', back_color='white').save(image_folder_qrcode + image_filename) # Create Screenshot of Map actions = ActionChains(driver) try: driver.execute_script("""var l = document.getElementsByClassName("ol-zoom ol-unselectable ol-control")[0]; l.parentNode.removeChild(l);""") except: pass mapElement = driver.find_element(By.XPATH, "//div[@id='map']//div[@class='ol-viewport']") location = mapElement.location size = mapElement.size x = location["x"] y = location["y"] w = x + size["width"] h = size["height"] area = (x, 0, w, h) driver.execute_script("window.scrollTo(0, " + str(y) + ")") time.sleep(1) driver.save_screenshot(image_folder_map + image_filename) time.sleep(0.2) Image.open(image_folder_map + image_filename).crop(area).save( image_folder_map + image_filename) # Create Screenshot of Chart try: chartElement = driver.find_elements(By.XPATH, "//div[@id='chart']//*")[0] location = chartElement.location size = chartElement.size x = location["x"] y = location["y"] w = x + size["width"] h = size["height"] area = (x, 0, w, h) driver.execute_script("window.scrollTo(0, " + str(y) + ")") time.sleep(1) driver.save_screenshot(image_folder_production + image_filename) time.sleep(0.2) Image.open(image_folder_production + image_filename).crop(area).save( image_folder_production + image_filename) except: pass def read_adresslist(filename_adresslist): returnvalues = [] try: print("reading File", filename_adresslist) adress_file = open(filename_adresslist, "r", encoding="utf-8") adress_list = adress_file.read().splitlines() adress_file.close() print("closed File", filename_adresslist) returnvalues.append(True) returnvalues.append(adress_list) except: returnvalues.append(False) return returnvalues def createFrameFileColums(headers): global OptionList global DropDownLabels for header in headers: OptionList.append(header) desctext = tkinter.Label(frameFileColums, text="Please select Headers of Columns") desctext.grid(row=1, column=1, padx=10, pady=3) frameFileColums1 = tkinter.Frame(frameFileColums) frameFileColums1.grid(row=2, column=1, padx=10, pady=3) for i in range(len(DropDownLabels)): exec("text" + str(i+1) + " = tkinter.Label(frameFileColums1, text=\"" + DropDownLabels[i] + "\")") exec("text" + str(i+1) + ".grid(row=" + str(i+1) + ", column=1, padx=10, pady=3)") exec("dropdown" + str(i+1) + " = tkinter.OptionMenu(frameFileColums1, variablesDropdown[" + str(i) + "], *OptionList)") exec("dropdown" + str(i+1) + ".grid(row=" + str(i+1) + ", column=2, padx=10, pady=3)") def getColumnIndex(): global OptionList global columnIndexes for j in range(len(DropDownLabels)): for i in range(len(OptionList)): if(OptionList[i] == variablesDropdown[j].get()): columnIndexes.append(i) def command(): global exit global stopThread global adresslist global step global driver global filename_adresslist global outputtext global thread_search_adresses global file_split_char if(exit): print("Command Exit received") stopThread = True try: thread_search_adresses.join() except: pass print("Threads are closed") root.quit() print("Application is closed") driver.quit() print("Webdriver is closed") elif(step == 0): #Schritt 1 filename_adresslist = filedialog.askopenfilename() outputtext = outputtext + "Searching File " + filename_adresslist + "\n" mainText.config(text=outputtext) adress_list_result = read_adresslist(filename_adresslist) if(adress_list_result[0]): adresslist = adress_list_result[1] outputtext = outputtext + "File " + filename_adresslist + " found." + "\n" mainText.config(text=outputtext) button1.config(text="read file") createFrameFileColums(adresslist[0].split(file_split_char)) frameFileColums.grid(row=2, column=1) step += 1 else: outputtext = outputtext + "File " + filename_adresslist + " not found." + "\n" mainText.config(text=outputtext) exit = True button1.grid_remove() elif (step == 1): # TODO: check if dropdowns are selected getColumnIndex() outputtext = outputtext + "Reading File " + filename_adresslist + " done." + "\n" mainText.config(text=outputtext) frameFileColums.grid_remove() button1.config(text="open Webbrowser") step += 1 elif(step == 2): # Schritt 2 outputtext = outputtext + "Website is opening" + "\n" mainText.config(text=outputtext) try: driver.maximize_window() driver.get("https://www.uvek-gis.admin.ch/BFE/sonnendach/") driver.implicitly_wait(20) outputtext = outputtext + "opening website was done.\nPrepare the browser window to create screenshots." + "\n" mainText.config(text=outputtext) button1.config(text="start process") step += 1 except: outputtext = outputtext + "opening website failed" + "\n" mainText.config(text=outputtext) exit = True button1.grid_remove() elif(step == 3): thread_search_adresses = threading.Thread(target=search_adresses, args=(adresslist, filename_adresslist, driver, mainText)) thread_search_adresses.start() print("Thread started") outputtext = outputtext + "process has started." + "\n" mainText.config(text=outputtext) button1.grid_remove() exit = True def command_exit(): global exit exit = True command() root = tkinter.Tk() root.protocol("WM_DELETE_WINDOW", command_exit) root.wm_title("Sonnendach") mainText = tkinter.Label(root, text=outputtext, width=80) mainText.grid(row=1, column=1, padx=10, pady=3) frameFileColums = tkinter.Frame(root) variablesDropdown = [] for i in range(len(DropDownLabels)): variablesDropdown.append(tkinter.StringVar(root)) frameButtons = tkinter.Frame(root) frameButtons.grid(row=3, column=1) createImages = tkinter.IntVar() checkBoxCreateImages = tkinter.Checkbutton(frameButtons, text="create images", variable=createImages) checkBoxCreateImages.grid() button1 = tkinter.Button(frameButtons, text="Select Adresslist file", command=command, width=20, height=2, bg="#FCCA03") button1.grid(row=3, column=1, padx=10, pady=3) button2 = tkinter.Button(frameButtons, text="EXIT", command=command_exit, width=20, height=2, bg="#FCCA03") button2.grid(row=3, column=2, padx=10, pady=3) root.mainloop()