1 votes

Lors de l'écriture d'un fichier csv de CF vers un seau : 'with open(filepath, "w") as MY_CSV:' conduit à "FileNotFoundError : [Errno 2] No such file or directory :"

J'obtiens cette erreur FileNotFoundError: [Errno 2] No such file or directory lorsque j'essaie d'écrire un fichier csv dans le seau, à l'aide d'un scripteur csv qui passe en boucle sur des lots de données. L'aperçu complet des journaux de Cloud Function autour de cette erreur :

File "/workspace/main.py", line 299, in write_to_csv_file with
open(filepath, "w") as outcsv: FileNotFoundError: [Errno 2] No such
file or directory: 'gs://MY_BUCKET/MY_CSV.csv'

Function execution took 52655 ms, finished with status: 'crash' 

OpenBLAS WARNING - could not determine the L2 cache size on this
system, assuming 256k  ```

Et ce, bien que ce chemin de fichier bucket existe bel et bien : Je peux télécharger un fichier fictif vide et obtenir son "URI gsutils" (clic droit sur les trois points à droite du fichier) et le bucket_filepath sera le même : 'gs://MY_BUCKET/MY_CSV.csv' .

J'ai vérifié l'enregistrement d'un dataframe pandas factice en utilisant pd.to_csv et cela a fonctionné avec le même chemin de fichier ( !).

Par conséquent, il doit y avoir une autre raison, probablement que l'auteur n'est pas accepté, ou que l'auteur n'est pas accepté. with statement qui ouvre le fichier.

Le code qui génère l'erreur est le suivant. Il s'agit du même code fonctionnant en dehors de Google Cloud Function dans une tâche cron normale sur un serveur local. J'ai ajouté deux impressions de débogage autour de la ligne qui génère l'erreur, la ligne print("Right after opening the file ...") ne s'affiche plus. La sous-fonction query_execute_batch() que write_to_csv_file() est appelé pour chaque lot est également indiqué mais ce n'est probablement pas le problème ici puisque l'erreur se produit déjà au tout début lors de l'ouverture en écriture du fichier csv.

requirements.txt (qui sont ensuite importés en tant que modules) :

SQLAlchemy>=1.4.2
google-cloud-storage>=1.16.1
mysqlclient==2.1.0
pandas==1.2.3
fsspec==2021.11.1
gcsfs==2021.11.1
unicodecsv==0.14.1

Et à partir de la main.py :

def query_execute_batch(connection):
    """Function for reading data from the query result into batches
    :yield: each result in a loop is a batch of the query result
    """
    results = execute_select_batch(connection, SQL_QUERY)
    print(f"len(results): {len(results)}")
    for result in results:
        yield result

def write_to_csv_file(connection, filepath):
    """Write the data in a loop over batches into a csv.
    This is done in batches since the query from the database is huge.
    :param connection: mysqldb connection to DB
    :param filepath: path to csv file to write data
    returns: metadata on rows and time
    """
    countrows = 0
    print("Right before opening the file ...")    
    with open(filepath, "w") as outcsv:
        print("Right after opening the file ...")        
        writer = csv.DictWriter(
            outcsv,
            fieldnames=FIELDNAMES,
            extrasaction="ignore",
            delimiter="|",
            lineterminator="\n",
        )
        # write header according to fieldnames
        writer.writeheader()

        for batch in query_execute_batch(connection):
            writer.writerows(batch)
            countrows += len(batch)
        datetime_now_save = datetime.now()
    return countrows, datetime_now_save

Notez que pour que le script ci-dessus fonctionne, j'importe gcsfs ce qui rend le seau disponible en lecture-écriture. Sinon, j'aurais probablement besoin d'un objet de stockage google cloud comme par exemple :

storage_client = storage.Client()
bucket = storage_client.bucket(BUCKET_NAME)

et ensuite créer le fichier dans ce seau avec d'autres fonctions, mais ce n'est pas l'objectif ici.

Dans ce qui suit, le pd.to_csv qui fonctionne, il utilise le résultat d'une requête SQL fictive. SELECT 1 comme entrée d'un cadre de données. Cette peut être sauvegardés dans le même chemin d'accès au fichier bucket, bien sûr la raison pourrait ne pas être seulement la suivante pd.to_csv() en tant que tel, mais aussi que l'ensemble de données est un leurre au lieu de chaînes unicode complexes provenant d'une énorme base de données de SELECT query . Ou il y a une autre raison, je ne fais que supposer.

if records is not None:
    df = pd.DataFrame(records.fetchall())
    df.columns = records.keys()
    df.to_csv(filepath,
        index=False,
    )
    datetime_now_save = datetime.now()
    countrows = df.shape[0]

J'aimerais utiliser le rédacteur csv pour avoir la possibilité d'écrire en unicode avec le module unicodecsv et la possibilité d'utiliser les lots.

Je serais peut-être prêt à passer à des lots ( loop + append ou chunksize ) dans pandas comme dans Écriture de grandes images de données Pandas dans un fichier CSV par morceaux pour se débarrasser de ce problème de chemin de fichier du seau, mais j'aimerais plutôt utiliser le code prêt à l'emploi (ne jamais toucher à un système en cours d'exécution).

Comment puis-je faire en sorte que l'enregistrement de ce fichier csv soit effectué par le rédacteur csv de manière à ce qu'il puisse ouvrir un nouveau fichier dans la corbeille en write mode = with open(filepath, "w") as outcsv: ?

La fonction donnée write_to_csv_file() n'est qu'une infime partie de la fonction "nuage", qui utilise un large éventail de fonctions et de fonctions en cascade. Je ne peux pas présenter ici l'ensemble du cas reproductible et j'espère que l'expérience ou des exemples plus simples permettront de répondre à cette question.

1voto

La solution est surprenante. La solution est surprenante. debe importer et utiliser le gcsfs si vous voulez écrire dans un fichier avec le module open() .

_Si vous utilisez pd.to_csv() , import gcsfs n'est pas nécessaire, mais gcsfs est encore nécessaire dans la requirements.txt pour faire pd.to_csv() travail Ainsi, les pandas to_csv() semble l'utiliser automatiquement._

En pd.to_csv() surprise mise à part, voici le code qui répond à la question (testé) :

def write_to_csv_file(connection, filepath):
    """Write the QUERY result in a loop over batches into a csv.
    This is done in batches since the query from the database is huge.
    :param connection: mysqldb connection to DB
    :param filepath: path to csv file to write data
    return: metadata on rows and time
    """
    countrows = 0
    print("Right before opening the file ...")

    # A gcsfs object is needed to open a file.
    # https://stackoverflow.com/questions/52805016/how-to-open-a-file-from-google-cloud-storage-into-a-cloud-function
    # https://gcsfs.readthedocs.io/en/latest/index.html#examples
    # Side-note (Exception):
    # pd.to_csv() needs neither the gcsfs object, nor its import.
    # It is not used here, but it has been tested with examples.
    fs = gcsfs.GCSFileSystem(project=MY_PROJECT)
    fs.ls(BUCKET_NAME)

    # wb needed, else "builtins.TypeError: must be str, not bytes"
    # https://stackoverflow.com/questions/5512811/builtins-typeerror-must-be-str-not-bytes
    with fs.open(filepath, 'wb') as outcsv:
        print("Right after opening the file ...")

        writer = csv.DictWriter(
            outcsv,
            fieldnames=FIELDNAMES,
            extrasaction="ignore",
            delimiter="|",
            lineterminator="\n",
        )
        # write header according to fieldnames
        print("before writer.writeheader()")
        writer.writeheader()
        print("after writer.writeheader()")

        for batch in query_execute_batch(connection):
            writer.writerows(batch)
            countrows += len(batch)
        datetime_now_save = datetime.now()
    return countrows, datetime_now_save

Note complémentaire

N'utilisez pas le scripteur csv de cette manière.

Il prend trop de temps, au lieu de la pd.to_csv() avec un chunksize de 5000 qui ne nécessite que 62 secondes pour que les 700k lignes soient chargées et stockées sous forme de csv dans le bucket, le CF avec l'auteur des lots prend plus de 9 minutes, ce qui dépasse la limite du timeout. Je suis donc donc obligé d'utiliser pd.to_csv() et convertir mes données en dataframe pour cela.

SistemesEz.com

SystemesEZ est une communauté de sysadmins où vous pouvez résoudre vos problèmes et vos doutes. Vous pouvez consulter les questions des autres sysadmins, poser vos propres questions ou résoudre celles des autres.

Powered by:

X