入库去重方法

最近更新时间: 2024-10-17 17:10:00

<?php 
error_reporting(E_ALL && ~E_NOTICE); 
ini_set('display_errors', '1'); 
set_time_limit(0);         
$host="172.16.0.29";
$port="15432";
$dbname="postgres";
$user="tbase" ;
$password="";  
/*
CREATE TABLE mydata (
    mpid text NOT NULL,
    datatype text NOT NULL,
    datatime timestamp without time zone NOT NULL,
    datavalue text,
    inputtime timestamp without time zone
)
DISTRIBUTE BY SHARD (mpid);

CREATE unique INDEX mydata_uidx ON mydata USING btree (mpid, datatype, datatime); 
*/
//连接数据库
$conn=@pg_connect("host=$host port=$port dbname=$dbname user=$user password=$password");      
if (!$conn){
    $error_msg=@pg_errormessage($conn); 
    echo "连接数据库出错,详情:".$error_msg."\n"; ;
    exit;
}else{
    echo "连接数据库成功"."\n";      
} 
//建立临时表
$tmp_table_name="mydata_tmp_".strval(rand(100000000, 999999999));
$k=0;
do {
      $sql="
      CREATE TABLE ".$tmp_table_name." (
          mpid text NOT NULL,
          datatype text NOT NULL,
          datatime timestamp without time zone NOT NULL,
          datavalue text,
          inputtime timestamp without time zone
      )
      WITH OIDS DISTRIBUTE BY SHARD (mpid);   
      ";
      $result = @pg_exec($conn,$sql) ;
      if ($result){
           //建议临时数据表成功,退出
           $sql="CREATE INDEX ".$tmp_table_name."_idx ON ".$tmp_table_name." USING btree (mpid, datatype, datatime); ";
           $result = @pg_exec($conn,$sql) ;           
           if (!$result){
                $error_msg=@pg_errormessage($conn);      
                echo "建立临时表索引失败,详情:".$error_msg."\n
                exit;
           }       
           break;
      }else{
           $k++;
           if($k>10){
                $error_msg=@pg_errormessage($conn);   
                echo "多次建立数据表失败,详情:".$error_msg."\n
                exit;
           }
      }
}while (true);       
$sql="";

$mydata_sql="  
INSERT INTO mydata (mpid, datatype,datatime,datavalue,inputtime) VALUES 
";
$mydata_tmp_sql="  
INSERT INTO ".$tmp_table_name." (mpid, datatype,datatime,datavalue,inputtime) VALUES 
"; 

for ($i=0;$i<100;$i++){
    $insertnum = 250;
    for ($j=0;$j<$insertnum;$j++){
         $sql=$sql."
         ('".strval(rand(100000000, 999999999))."','10129f14','2018-03-15 02:00:00','004390.44','2018-03-15 11:05:55')
         ";
         if (($j+1)!=$insertnum){
              $sql=$sql.",";
         }
    }   
    $execsql=$mydata_sql.$sql;
    $result = @pg_exec($conn,$execsql) ;        

    if (!$result){
        ECHO "执行失败\n";  
        //将数据导入到临时表
        $execsql=$mydata_tmp_sql.$sql;  
        $result = @pg_exec($conn,$execsql) ;   
        if (!$result){
             $error_msg=@pg_errormessage($conn);      
             echo "数据插入临时表失败,详情:".$error_msg."\n
             exit;
        }
        //删除临时表中重复的数据,这一步最好在应用程序中去重,减少数据的负担
        $execsql="DELETE FROM ".$tmp_table_name." WHERE oid NOT IN (select min(oid) from ".$tmp_table_name." group by mpid, datatype, datatime)";
        $result = @pg_exec($conn,$execsql) ;           
        if (!$result){
             $error_msg=@pg_errormessage($conn);      
             echo "删除临时表重复数据失败,详情:".$error_msg."\n";
             exit;
        }
        $k=0;  
        do {            
                //删除重复数据     
                $execsql="DELETE FROM ".$tmp_table_name." USING mydata WHERE mydata.mpid=".$tmp_table_name.".mpid AND mydata.datatype=".$tmp_table_name.".datatype AND mydata.datatime=".$tmp_table_name.".datatime" ;
                //将数据导入到正式表
                $execsql=$execsql.";INSERT INTO mydata SELECT * FROM ".$tmp_table_name;   
                $result = @pg_exec($conn,$execsql) ;  
                if ($result){
                     //直到操作成功退出
                     //退出前清理数据
                     $execsql="truncate table ".$tmp_table_name;
                     $result = @pg_exec($conn,$execsql) ;    
                     if (!$result){
                          $error_msg=@pg_errormessage($conn);      
                          echo "truncate 临时表数据出错,详情:".$error_msg."\n";
                          exit;
                     }
                     ECHO "内层去重成功\n";   
                     break;
                }else{
                     $k++;
                     if($k>10){
                          $error_msg=@pg_errormessage($conn);            
                          echo "多次删除重复数据失败,详情:".$error_msg."\n";    
                          exit;
                     }    
                }  
        } while (true); 
    }else{
        ECHO "执行成功\n";     
    }
    $sql="";
}

//退出前删除临时表
$execsql="drop table ".$tmp_table_name;   
$result = @pg_exec($conn,$execsql) ;
if (!$result){
     $error_msg=@pg_errormessage($conn);      
     echo "删除临时表数据出错,详情:".$error_msg."\n";
     exit;
} 
//关闭连接
pg_close($conn); ;                
?>