エンジニア4年目(python未経験)なのですが
業務でpythonを用いてスクリプトを開発しなければならず,自分なりに調べて下記の様なスクリプトを組んでみたのですがパフォーマンスが著しく悪く,そのためチューニングをしたいのですが,まだまだ勉強し始めたばかりでチューニング方針がなかなか決めれないでいます。。。
納期も近いためこちらにてご相談させていただきました。
どなたかどんな些細なことでもよいのでチューニングに関してご教示いただけますと幸いです。。
特にスクリプトのの2重for文の部分をチューニングしたいです。

import pandas as pd
import numpy as np

from pyspark import SparkConf,SparkContext

from pyspark.sql import SparkSession
from pyspark.shell import SQLContext
import time

spark = SparkSession.builder.master("yarn").config(conf=SparkConf()).getOrCreate()
sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)

spark.conf.set("spark.sql.execution.arrow.enabled", "true")


spark = SparkSession.builder \
        .appName('Spark SQL and DataFrame') \
        .getOrCreate()

#S3のファイル格納パス
filepass_aaa = 's3://*******/******/test/aaa.csv'
filepass_bbb = 's3://*******/******/test/bbb.csv'

#CSVファイルをS3から読み込みSparkのDataFreameを生成
sdf_aaa = sqlContext.read.format("com.databricks.spark.csv").option("header","true").load(filepass_aaa)
sdf_bbb = sqlContext.read.format("com.databricks.spark.csv").option("header","true").load(filepass_bbb)

#pandasのDataFreameの型に変更
pdf_aaa = sdf_aaa.toPandas()
pdf_bbb = sdf_bbb.toPandas()

条件に合致した場合にaaaの該当の値をbbbの値で置換
def mapping(row,row2):
    pdf_aaa.at[(pdf_aaa['ID'] == row[0]),'○○ID'] = row2[2]
    pdf_aaa.at[(pdf_aaa['ID'] == row[0]),'×××'] = row2[1]
    pdf_aaa.at[(pdf_aaa['ID'] == row[0]),'△△△'] = row2[3]
    pdf_aaa.at[(pdf_aaa['ID'] == row[0]),'□□□'] = row2[0]
    pdf_aaa.at[(pdf_aaa['ID'] == row[0]),'■■■'] = row2[4]
    pdf_aaa.at[(pdf_aaa['ID'] == row[0]),'◇◇◇'] = row2[5]


for index, row in pdf_aaa.iterrows():

    for index2,row2 in pdf_bbb.iterrows():

        #条件:名称+住所+TEL
        if(row[7] == row2[6]) & (row[9] == row2[8]) & (row[8] == row2[7]):
            mapping(row,row2)
            break
        #条件:名称+TEL
        elif(row[7] == row2[6]) & (row[8] == row2[7]):
            mapping(row,row2)
            break
        #条件:名称+住所
        elif(row[7] == row2[6]) & (row[9] == row2[8]):
            mapping(row,row2)
            break
        #条件:TEL+住所
        elif(row[8] == row2[7]) & (row[9] == row2[8]):
            mapping(row,row2)
            break
        #条件:住所+URL
        elif(row[9] == row2[8]) & (row[10] == row2[9]):
            mapping(row,row2)
            break
        #条件:TEL+URL
        elif(row[8] == row2[7]) & (row[10] == row2[9]):
            mapping(row,row2)
            break
        #条件:TEL
        elif(row[8] == row2[7]):
            mapping(row,row2)
            break
        else:
            continue
            break

pdf_out = pdf_aaa[~pdf_aaa['○○ID'].isnull()]
pdf_out_null = pdf_aaa[pdf_aaa['○○ID'].isnull()]

pdf_out.to_csv('result.csv',header=True, index=False)
pdf_out_null.to_csv('result_null.csv',header=True, index=False)

【実現したいこと】
DataFrame aaaのデータをDataFrame bbbのデータと比較し特定の条件に合致した場合のみ特定のaaaのデータをbbbのデータで置換する
aaa,bbbともにデータは約100万件ほど