Azure Python SDK: loop over blobs


I am trying to build a backup system for storage accounts. Just to make the example as clear as possible, this is my storage account structure:

storageA:
 |---container1
 |  |---Blob1.txt
 |---container2
 |  |---Blob2.txt
 |  |---Blob3.txt
 |  |---Blob4.txt
 |---container3
   |---Blob5.txt

I have a script that loops over the containers and blobs and copies the same structure to another storage account. The script is as follows:

from typing import Container
from azure.storage.blob import BlobClient, BlobServiceClient, ContainerClient
from azure.storage.blob import ResourceTypes, AccountSasPermissions
from azure.storage.blob import generate_account_sas    
from datetime import *

today = str(datetime.now().date())
print(today)




#================================ SOURCE ===============================
# Source Client
connection_string = '' # The connection string for the source container
account_key = '' # The account key for the source container
source_container_name = 'newblob' # Name of container which has blob to be copied





# Create client
client = BlobServiceClient.from_connection_string(connection_string)
all_containers = client.list_containers(include_metadata=True)
for container in all_containers:
    # Create sas token for blob
    sas_token = generate_account_sas(
        account_name = client.account_name,
        account_key = account_key, 
        resource_types = ResourceTypes(object=True, container=True),
        permission= AccountSasPermissions(read=True,list=True),
        # start = datetime.now(),
        expiry = datetime.utcnow() + timedelta(hours=4) # Token valid for 4 hours
    )

    
    
    print(container['name'], container['metadata'])
    # print("==========================")
    container_client = client.get_container_client(container.name)
    # print(container_client)
    blobs_list = container_client.list_blobs()
    for blob in blobs_list:
        # Create blob client for source blob
        source_blob = BlobClient(
            client.url,
            container_name = container['name'],
            blob_name = blob.name,
            credential = sas_token
        )
        print(blob.name)
        print("==========================")
    # # ============================= TARGET =======================================

    # Target Client
    target_connection_string = ''
    target_account_key = ''
    source_container_name = source_container_name
    target_blob_name = blob.name
    target_destination_blob = container['name'] + today
    print(target_destination_blob)
    
    # Create target client
    target_client = BlobServiceClient.from_connection_string(target_connection_string)
    container = ContainerClient.from_connection_string(target_connection_string, target_destination_blob)
    try:
        container_client = target_client.create_container(target_destination_blob)
    # Create new blob and start copy operation.
    except:
        new_blob = target_client.get_blob_client(target_destination_blob, target_blob_name)
        new_blob.start_copy_from_url(source_blob.url)
    # print(source_blob.url)

This script makes a full copy of the containers and blobs without any error. But when I go to my target storage, I can see that I have the same containers; containers 1 and 3 have the correct blobs, but container 2 has only 2 blobs. And no matter how often I upload new files into the source storage and re-run my script, the new files never get copied over.

Can anyone please help me understand this problem? Thank you very much.

UPDATE: After some debugging, I found something interesting. I put some print statements in my block of code to keep track of the loops, specifically when it comes to copying blobs.

This is the updated version of my code, to reproduce:

from typing import Container
from azure.storage.blob import BlobClient, BlobServiceClient, ContainerClient
from azure.storage.blob import ResourceTypes, AccountSasPermissions
from azure.storage.blob import generate_account_sas    
from datetime import *

today = str(datetime.now().date())
print(today)




#================================ SOURCE ===============================
# Source Client
connection_string = '' # The connection string for the source container
account_key = '' # The account key for the source container
# source_container_name = 'newblob' # Name of container which has blob to be copied

# Create client
client = BlobServiceClient.from_connection_string(connection_string)
all_containers = client.list_containers(include_metadata=True)
for container in all_containers:
    # Create sas token for blob
    sas_token = generate_account_sas(
        account_name = client.account_name,
        account_key = account_key, 
        resource_types = ResourceTypes(object=True, container=True),
        permission= AccountSasPermissions(read=True,list=True),
        # start = datetime.now(),
        expiry = datetime.utcnow() + timedelta(hours=4) # Token valid for 4 hours
    )

    
    
    print(container['name'], container['metadata'])
    print("==========================")
    # print("==========================")
    container_client = client.get_container_client(container.name)
    # print(container_client)
    blobs_list = container_client.list_blobs()
    for blob in blobs_list:
        # Create blob client for source blob
        source_blob = BlobClient(
            client.url,
            container_name = container['name'],
            blob_name = blob.name,
            credential = sas_token
        )
        print(blob.name)
        
# # ============================= TARGET =======================================

# Target Client

    target_connection_string = ''
    target_account_key = ''
    source_container_name = container['name']
    target_blob_name = blob.name
    target_destination_blob = container['name'] + today
    print(target_destination_blob)

    # Create target client
    target_client = BlobServiceClient.from_connection_string(target_connection_string)
    container = ContainerClient.from_connection_string(target_connection_string, target_destination_blob)
    try:
        container_client = target_client.create_container(target_destination_blob)
        new_blob = target_client.get_blob_client(target_destination_blob, target_blob_name)
        new_blob.start_copy_from_url(source_blob.url)
        print("COPY TO")
        print(f"TRY: saving blob {target_blob_name} into {target_destination_blob} ")
    except:
        # Create new blob and start copy operation.
        new_blob = target_client.get_blob_client(target_destination_blob, target_blob_name)
        new_blob.start_copy_from_url(source_blob.url)
        print("COPY TO")
        print(f"EXCEPT: saving blob {target_blob_name} into {target_destination_blob} ")
        # print(source_blob.url)

Now when I run the code, I get this output:

==========================
blob1 {}
lastfile.txt
blob12021-09-22
COPY TO
EXCEPT: saving blob lastfile.txt into blob12021-09-22 
==========================
blob2 {}
lastfile.txt
lastupdate.txt
newFile.txt
blob22021-09-22
COPY TO
EXCEPT: saving blob newFile.txt into blob22021-09-22 
==========================
blob3 {}
lastupdate.txt
blob32021-09-22
COPY TO
EXCEPT: saving blob lastupdate.txt into blob32021-09-22 

As I can see, the entire loop copies only the last file of each list. This is where I get confused by the nested loops. Can anyone please explain what I am doing wrong, and how to make the loop target each file and copy it to the new storage? Thank you so much for any help you can provide.
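
To illustrate the behaviour with plain Python (hypothetical data, no Azure calls): a for-loop variable keeps the value of the last item after the loop ends, so code indented at the outer-loop level only ever sees the last blob.

containers = {"blob2": ["lastfile.txt", "lastupdate.txt", "newFile.txt"]}

for container_name, blob_names in containers.items():
    for blob_name in blob_names:
        source = (container_name, blob_name)  # rebuilt once per blob

    # This statement sits OUTSIDE the inner loop: it runs once per
    # container, and `source` still holds the last blob of the list.
    print("copying", source)  # -> copying ('blob2', 'newFile.txt')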

CodePudding user response:

Firstly, the code is looking good; the problem occurs when a container has more than one file. So, if there are multiple files in a container, get the list of files and run the copy for each file inside the loop.
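
For example, a minimal sketch (placeholder connection strings and container names, not your exact code) that starts a copy for every blob inside the loop:

from azure.storage.blob import BlobServiceClient

source_client = BlobServiceClient.from_connection_string('<source-connection-string>')
target_client = BlobServiceClient.from_connection_string('<target-connection-string>')

container_client = source_client.get_container_client('container2')
for blob in container_client.list_blobs():
    # Build the source URL; for a private container this URL must also
    # carry a SAS token, as in the question's code.
    source_url = container_client.get_blob_client(blob.name).url
    new_blob = target_client.get_blob_client('container2-backup', blob.name)
    # The copy is started INSIDE the loop, once per blob.
    new_blob.start_copy_from_url(source_url)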

Make sure you have the correct function.json.

Below is a sample for copying blobs within a single storage account:

{
  "scriptFile": "__init__.py",
  "bindings": [
    {
      "type": "blob",
      "direction": "in",
      "name": "blob_client",
      "path": "source/source.txt",
      "connection": "STORAGE_CONN_STR"
    },
    {
      "type": "blob",
      "direction": "out",
      "name": "output_file_dest",
      "path": "target/target.csv",
      "connection": "STORAGE_CONN_STR"
    }
  ]
}
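
With these bindings, the function entry point would typically receive them as parameters; a hedged sketch of that variant (the standalone sample below uses the SDK directly instead):

import azure.functions as func

def main(blob_client: func.InputStream, output_file_dest: func.Out[str]):
    # `blob_client` streams source/source.txt via the input binding;
    # setting `output_file_dest` writes target/target.csv.
    output_file_dest.set(blob_client.read().decode('utf-8'))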

Apply the functionality of the code below to your own code to copy a blob as soon as it is uploaded to your first storage account.

import logging

import azure.functions as func

from azure.storage.blob import BlobClient, BlobServiceClient

CONN_STR = ''  # storage account connection string

def main():
    logging.info('start Python Main function processed a request.')

    # CONNECTION STRING
    blob_service_client = BlobServiceClient.from_connection_string(CONN_STR)

    # MAP SOURCE FILE
    blob_client = blob_service_client.get_blob_client(container="source", blob="source.txt")

    # SOURCE CONTENTS
    content = blob_client.download_blob().content_as_text()

    # WRITE HEADER TO AN OUTPUT FILE
    output_file_dest = blob_service_client.get_blob_client(container="target", blob="target.csv")

    # INITIALIZE OUTPUT
    output_str = ""

    # STORE COLUMN HEADERS
    data = list()
    data.append(list(["column1", "column2", "column3", "column4"]))

    output_str += ('"' + '","'.join(data[0]) + '"\n')

    output_file_dest.upload_blob(output_str, overwrite=True)
    logging.info('END OF FILE UPLOAD')

if __name__ == "__main__":
    main()

CodePudding user response:

HALF SOLUTION: Thank you everyone for your support and help. I finally solved all the problems with the blob backup process.

This is the code:

from typing import Container
from azure.cosmosdb.table.tableservice import TableService,ListGenerator
from azure.storage.blob import BlobClient, BlobServiceClient, ContainerClient
from azure.storage.blob import ResourceTypes, AccountSasPermissions
from azure.storage.blob import generate_account_sas    
from datetime import *

today = str(datetime.now().date())
print(today)




#================================ SOURCE ===============================
# Source Client
connection_string = '' # The connection string for the source container
account_key = '' # The account key for the source container
# source_container_name = 'newblob' # Name of container which has blob to be copied


# Create client
client = BlobServiceClient.from_connection_string(connection_string)
all_containers = client.list_containers(include_metadata=True)
for container in all_containers:
    # Create sas token for blob
    sas_token = generate_account_sas(
        account_name = client.account_name,
        account_key = account_key, 
        resource_types = ResourceTypes(object=True, container=True),
        permission= AccountSasPermissions(read=True,list=True),
        # start = datetime.now(),
        expiry = datetime.utcnow() + timedelta(hours=4) # Token valid for 4 hours
    )

    
    print("==========================")
    print(container['name'], container['metadata'])
    
    # print("==========================")
    container_client = client.get_container_client(container.name)
    # print(container_client)
    blobs_list = container_client.list_blobs()
    for blob in blobs_list:
        # Create blob client for source blob
        source_blob = BlobClient(
            client.url,
            container_name = container['name'],
            blob_name = blob.name,
            credential = sas_token
        )
        target_connection_string = ''
        target_account_key = ''
        source_container_name = container['name']
        target_blob_name = blob.name
        target_destination_blob = container['name'] + today
        print(target_blob_name)
        # print(blob.name)
        target_client = BlobServiceClient.from_connection_string(target_connection_string)
        try:
            container_client = target_client.create_container(target_destination_blob)
            new_blob = target_client.get_blob_client(target_destination_blob, target_blob_name)
            new_blob.start_copy_from_url(source_blob.url)
            print("COPY TO")
            print(f"TRY: saving blob {target_blob_name} into {target_destination_blob} ")
        except:
            # Create new blob and start copy operation.
            new_blob = target_client.get_blob_client(target_destination_blob, target_blob_name)
            new_blob.start_copy_from_url(source_blob.url)
            print("COPY TO")
            print(f"EXCEPT: saving blob {target_blob_name} into {target_destination_blob} ")
        

This is the part regarding the blob storage. What it does is loop through all the containers in a given (source) storage account, then loop over each blob inside every container encountered. While it is getting the list of blobs, it automatically copies each container AND its blobs to a target storage account. The backup container name is formed as follows: container name + today's date. This approach was chosen with future automation in mind, so if you decide to run the script once a week, you will know to which date each batch refers.
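
For example, under this naming scheme (a quick sketch with a hypothetical container name):

from datetime import datetime

today = str(datetime.now().date())              # e.g. '2021-09-23'
target_destination_blob = 'container1' + today  # -> 'container12021-09-23'

A separator such as a hyphen ('container1-' + today) would make the date easier to read; hyphens are valid in container names.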

Now I am trying to work out the last bit of this automation: from blob storage I am moving on to tables. The code is structured as follows:

from typing import Container
from azure.cosmosdb.table.tableservice import TableService,ListGenerator
from azure.storage.blob import BlobClient, BlobServiceClient, ContainerClient
from azure.storage.blob import ResourceTypes, AccountSasPermissions
from azure.storage.blob import generate_account_sas    
from datetime import *

today = str(datetime.now().date())
print(today)




#================================ SOURCE ===============================
# Source Client
connection_string = '' # The connection string for the source container
account_key = '' # The account key for the source container
# source_container_name = 'newblob' # Name of container which has blob to be copied
table_service_out = TableService(account_name='', account_key='')
table_service_in = TableService(account_name='', account_key='')

# Create client
client = BlobServiceClient.from_connection_string(connection_string)
all_containers = client.list_containers(include_metadata=True)
for container in all_containers:
    # Create sas token for blob
    sas_token = generate_account_sas(
        account_name = client.account_name,
        account_key = account_key, 
        resource_types = ResourceTypes(object=True, container=True),
        permission= AccountSasPermissions(read=True,list=True),
        # start = datetime.now(),
        expiry = datetime.utcnow() + timedelta(hours=4) # Token valid for 4 hours
    )

    
    print("==========================")
    print(container['name'], container['metadata'])
    
    # print("==========================")
    container_client = client.get_container_client(container.name)
    # print(container_client)
    blobs_list = container_client.list_blobs()
    for blob in blobs_list:
        # Create blob client for source blob
        source_blob = BlobClient(
            client.url,
            container_name = container['name'],
            blob_name = blob.name,
            credential = sas_token
        )
        target_connection_string = ''
        target_account_key = ''
        source_container_name = container['name']
        target_blob_name = blob.name
        target_destination_blob = container['name'] + today
        print(target_blob_name)
        # print(blob.name)
        target_client = BlobServiceClient.from_connection_string(target_connection_string)
        try:
            container_client = target_client.create_container(target_destination_blob)
            new_blob = target_client.get_blob_client(target_destination_blob, target_blob_name)
            new_blob.start_copy_from_url(source_blob.url)
            print("COPY TO")
            print(f"TRY: saving blob {target_blob_name} into {target_destination_blob} ")
        except:
            # Create new blob and start copy operation.
            new_blob = target_client.get_blob_client(target_destination_blob, target_blob_name)
            new_blob.start_copy_from_url(source_blob.url)
            print("COPY TO")
            print(f"EXCEPT: saving blob {target_blob_name} into {target_destination_blob} ")
        

#query 100 items per request; loading all data at once could consume too much memory
query_size = 100

#save data to storage2 and check if there is data left in the current table; if yes, recurse
def queryAndSaveAllDataBySize(tb_name,resp_data:ListGenerator,table_out:TableService,table_in:TableService,query_size:int):
    for item in resp_data:
        #remove etag and Timestamp appended by table service
        del item.etag
        del item.Timestamp
        print("instet data:"   str(item)   "into table:"  tb_name)
        table_in.insert_or_replace_entity(tb_name,item)
    if resp_data.next_marker:
        data = table_out.query_entities(table_name=tb_name,num_results=query_size,marker=resp_data.next_marker)
        queryAndSaveAllDataBySize(tb_name,data,table_out,table_in,query_size)


tbs_out = table_service_out.list_tables()

for tb in tbs_out:
    #create table with same name in storage2
    table_service_in.create_table(table_name=tb.name, fail_on_exist=False)
    #first query
    data = table_service_out.query_entities(tb.name,num_results=query_size)
    queryAndSaveAllDataBySize(tb.name,data,table_service_out,table_service_in,query_size)

The last bit relates to the tables. If I run this code the first time, it works just fine; if I run it a second time, it throws this message:

Client-Request-ID= Retry policy did not allow for a retry: Server-Timestamp=Wed, 22 Sep 2021 14:47:38 GMT, Server-Request-ID=, HTTP status code=409, Exception=Conflict{"odata.error":{"code":"TableAlreadyExists","message":{"lang":"en-US","value":"The table specified already exists.\nRequestId:\nTime:2021-09-22T14:47:38.7583927Z"}}}

While the script is running, I would like it to check whether a table exists; if it does, do not create it, just update it if there is anything to update.

Any help or advice about this?
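
A minimal sketch of one way to handle this, assuming TableService.exists from azure-cosmosdb-table is available; insert_or_replace_entity already takes care of updating existing rows:

for tb in tbs_out:
    # Create the table only when it is missing; otherwise reuse it.
    if not table_service_in.exists(tb.name):
        table_service_in.create_table(table_name=tb.name)
    data = table_service_out.query_entities(tb.name, num_results=query_size)
    queryAndSaveAllDataBySize(tb.name, data, table_service_out, table_service_in, query_size)

Note also that create_table(..., fail_on_exist=False) is documented to return False rather than raise when the table already exists, so the 409 in the log may just be the SDK logging the service response rather than an actual exception.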
