#!/usr/bin/python3 # -*- coding: utf-8 -*- """ Perceptrón para estimar precio de viviendas. Una capa oculta con 5 procesadores con activación tangente hiperbólica y error cuadrático. """ import math import torch import torch.nn as nn import random import scipy import scipy.cluster from scipy import stats from sklearn import preprocessing, metrics, decomposition import numpy as np from numpy import array import matplotlib.pyplot as plot def septorch(datos,tipo,donde): entradas=datos[:,:-1] salidas=datos[:,-1:] redent=torch.tensor(entradas,dtype=tipo,device=donde) redsal=torch.tensor(salidas,dtype=tipo,device=donde) return redent,redsal dtype = torch.float device = torch.device("cpu") #device = torch.device("cuda:0") #Cargamos los datos, todos fichdatos=open('casas.trn','r') datos0= [[float(cada) for cada in linea.strip().split()] for linea in fichdatos.readlines()] numentradas=len(datos0[0])-1 #Separamos ajuste y prueba porazar=0.4 numuestras=len(datos0) muesajuste=0.7 muesval=0.15 #Llevarlos a escala unitaria #preproc=preprocessing.MinMaxScaler().fit(datos0) #Llevarlos a media 0 varianza 1 preproc=preprocessing.StandardScaler().fit(datos0) #Llevarlos a mediana 0 intercuartil 1 Parecido pero robusto frente a datos extremos #preproc=preprocessing.RobustScaler().fit(datos0) #Llevarlos a distribución normal #preproc=preprocessing.PowerTransformer().fit(datos0) #preproc=preprocessing.QuantileTransformer(output_distribution='normal').fit(datos0) Similar pero más robusto con valore extremos #Llevarlos a distribución uniforme ~ ecualización #preproc=preprocessing.QuantileTransformer().fit(datos0) #Decorrela usando PCA #preproc=decomposition.PCA(n_components='mle',whiten=True).fit(datos0) datos=preproc.transform(datos0) #Desordena conjunto random.shuffle(datos) #Separa una parte para escoger por azar limazar=int(porazar*numuestras) datazar=datos[:limazar] datgrup=datos[limazar:] #Separa un primer lote de ajuste y prueba por azar limajaz=int(limazar*muesajuste) limvalaz=int(limazar*(muesajuste+muesval)) datajaz=datazar[:limajaz] datvalaz=datazar[limajaz:limvalaz] datpruaz=datazar[limvalaz:] #Separa un segundo lote de ajuste y prueba por agrupamiento datkm=array(datgrup,dtype=np.float32) limgrupaj=len(datgrup) numajgrup=int(limgrupaj*muesajuste) centros,grupos=scipy.cluster.vq.kmeans2(datkm,numajgrup,minit='points') orgpuntos=scipy.spatial.KDTree(datkm) dist,ind=orgpuntos.query(centros) datajgrup=datkm[ind] indvalpru=np.setdiff1d(range(limgrupaj),ind) datvalprugrup=datkm[indvalpru] numprugrup=int(limgrupaj*(1-muesval-muesajuste)) centros,grupos=scipy.cluster.vq.kmeans2(datvalprugrup,numprugrup,minit='points') orgpuntos=scipy.spatial.KDTree(datvalprugrup) dist,ind=orgpuntos.query(centros) datprugrup=datvalprugrup[ind] indpru=np.setdiff1d(range(len(datvalprugrup)),ind) datvalgrup=datvalprugrup[indpru] dataj=np.vstack((array(datajaz,dtype=np.float32),datajgrup)) datval=np.vstack((array(datvalaz,dtype=np.float32),datvalgrup)) datpru=np.vstack((array(datpruaz,dtype=np.float32),datprugrup)) #Pasarlo a tensores torch tea,tsa=septorch(dataj,dtype,device) tev,tsv=septorch(datval,dtype,device) tep,tsp=septorch(datpru,dtype,device) minsal=torch.min(torch.cat((tsa,tsv,tsp))) #Si interesa la referencia de regresión lineal, Torch no la tiene directamente, pero puedes hacer una red enteramente lineal y ajustarla #definir red ocultos=10 red=nn.Sequential( nn.Linear(numentradas, ocultos), nn.Tanh(),#Para un modelo lineal, suprimirías esto nn.Linear(ocultos, 1), ) #).cuda(device) #definir error a optimizar def satura(red,reales): """ No sube tanto como el cuadrático si los errores son extremos :param red: :param reales: :return: """ error2 = (red - reales) ** 2 satura = torch.log(1 + error2) sumcuad = torch.mean(satura) return sumcuad def maximo(red,reales): """ Le obliga a enfocar los errores extremos :param red: :param reales: :return: """ return torch.max(torch.abs(reales-red)) def medabs(red,reales): """ Tipo L1 pero más robusto frente a casos extremos :param red: :param reales: :return: """ return torch.median(torch.abs(reales-red)) def d2(red,reales): """ Si pensamos en una distribución gamma en vez de normal :param red: :param reales: :return: """ global minsal ypred=minsal+red yreal=minsal+reales if red.shape[0]<2: return torch.mean((reales-red)**2) else: return 2*torch.mean(torch.log(ypred/yreal)+yreal/ypred-1) class ErrorPropio(nn.Module): def __init__(self): super().__init__() def forward(self, red, reales): return d2(red,reales) def presenpropio(error): return error #math.sqrt(math.exp(error)-1) error = nn.MSELoss() #error=ErrorPropio() #error = nn.L1Loss() #error=nn.BCELoss() #error=nn.CrossEntropyLoss() presenerror=math.sqrt #presenerror=presenpropio #definir algoritmo de ajuste Intenta aumentar lr #ajuste=torch.optim.LBFGS(red.parameters(),lr=0.01,max_iter=50,history_size=10) ajuste=torch.optim.Rprop(red.parameters(),lr=0.01) #ajuste=torch.optim.SGD(red.parameters(),lr=0.001,momentum=0.5,dampening=0.1) #ajuste=torch.optim.LBFGS(red.parameters(),lr=0.003,tolerance_grad=1e-03, tolerance_change=1e-03, max_iter=50,history_size=10) nvalfal=0 def evalua(): ajuste.zero_grad() s = red(tea) e = error(s, tsa) e.backward() return e for it in range(200): # Calcula salidas ea=evalua() salval = red(tev) ev=error(salval,tsv) if 'evprevio' in dir(): if evprevio5: break evprevio=ev.item() print(it, presenerror(ea.item()),presenerror(evprevio)) #Si no usas el cuadrático quita la raíz ajuste.step(evalua) #Prueba salred=red(tep) ep=error(salred,tsp) print(presenerror(ep.item())) #Si no usas el cuadrático quita la raíz tsp=preproc.inverse_transform(torch.hstack((tep,tsp)))[:,-1] salpru=preproc.inverse_transform(torch.hstack((tep,salred.detach())))[:,-1] plot.scatter(tsp,salpru,color='red') mi=min(min(tsp),min(salpru)) ma=max(max(tsp),max(salpru)) plot.plot((mi,ma),(mi,ma),color='blue') plot.xlabel('Real') plot.ylabel('Red') plot.legend(('Resultado','Perfección')) plot.show() plot.scatter(tsp,(salpru-tsp),color='red') plot.plot((mi,ma),(0,0),color='blue') plot.xlabel('Real') plot.ylabel('Red-Real') plot.legend(('Resultado','Perfección')) plot.show() matriz=array([[a,b-a] for a,b in filter(lambda realred: abs(realred[0])>0.2, zip(tsp,salpru))]) plot.scatter(matriz[:,0],matriz[:,1]/(abs(matriz[:,0])+0.1),color='red') plot.plot((mi,ma),(0,0),color='blue') plot.xlabel('Real') plot.ylabel('Red-Real/Real') plot.legend(('Resultado','Perfección')) plot.show() matriz=array([(a,b) for a,b in zip(tsp,salpru)],dtype=[('real',float),('red',float)]) ordenado=np.sort(matriz,order='red') plot.plot(list(range(len(tsp))),ordenado['real'],color='red') plot.plot(list(range(len(tsp))),ordenado['red'],color='blue') plot.xlabel('Índice de orden de red') plot.ylabel('Real') plot.legend(('Resultado','Perfección')) plot.show() matriz=array([(a,b) for a,b in zip(tsp,(salpru-tsp))],dtype=[('real',float),('error',float)]) ordenado=np.sort(matriz,order='real') plot.plot(list(range(len(tsp))),ordenado['error'],color='red') plot.plot((0,len(tsp)),(0,0),color='blue') plot.xlabel('Índice de orden de real') plot.ylabel('Red-Real') plot.legend(('Resultado','Perfección')) plot.show()