Home > Back-end >  Instagram Multiprocessing with Selenium
Instagram Multiprocessing with Selenium

Time:05-23

Hey all trust that you're well, I'm trying to run multiple Instagram accounts parallel using Multiprocessing and Selenium, how would I go about logging into different accounts with every process created.

I tried using a json file with a while loop however I haven't made much progress.

I would appreciate any help I can get.

import time
import json

from selenium import webdriver
from multiprocessing import Pool

f = open('accounts.json',)
datas = json.load(f)

def get_data(url):

    Options = webdriver.ChromeOptions()
    mobile_emulation = {
        "userAgent": "Mozilla/5.0 (Linux; Android 4.2.1; en-us; Nexus 5 Build/JOP40D) AppleWebKit/535.19 (KHTML, like Gecko) Chrome/101.0.4951.64 Mobile Safari/535.19" }
    Options.add_experimental_option("mobileEmulation", mobile_emulation)
    Options.add_argument("--log-level=3")

    bot = webdriver.Chrome(options=Options, executable_path="chromedriver.exe")
    bot.set_window_size(500, 768)
    bot.get(url=url)
    
    time.sleep(10)

    # Login section==========================
    print('Logging in...')
    bot.find_element_by_xpath(
        '//*[@id="react-root"]/section/main/article/div/div/div/div[3]/button[1]').click()
    time.sleep(5)
    username_field = bot.find_element_by_xpath(
        '//*[@id="loginForm"]/div[1]/div[3]/div/label/input')
    username_field.send_keys(data["username"])
    time.sleep(5)
    password_field = bot.find_element_by_xpath(
        '//*[@id="loginForm"]/div[1]/div[4]/div/label/input')
    password_field.send_keys(data["password"])
    time.sleep(5)
    bot.find_element_by_xpath(
        '//*[@id="loginForm"]/div[1]/div[6]/button').click()
    time.sleep(6)
    bot.close()
    bot.quit()


if __name__ == '__main__':
    process_count = int(input("Enter the number of processes: "))
    url = "https://www.instagram.com/"
    urls_list = [url] * process_count
    print(urls_list)
    p = Pool(processes=process_count)
    p.map(get_data, urls_list)
    while True:
        for data in datas:
            get_data()

CodePudding user response:

This script uses threading (instead of multiprocessing) to open multiple independent windows (instances) of the browser. The code contained in the function test_instance is run simultaneously in each window.

import time
from selenium import webdriver
import threading
import json

def test_instance(data):
    Options = webdriver.ChromeOptions()
    mobile_emulation = {"userAgent": "Mozilla/5.0 (Linux; Android 4.2.1; en-us; Nexus 5 Build/JOP40D) AppleWebKit/535.19 (KHTML, like Gecko) Chrome/101.0.4951.64 Mobile Safari/535.19"}
    Options.add_experimental_option("mobileEmulation", mobile_emulation)
    Options.add_argument("--log-level=3")

    bot = webdriver.Chrome(options=Options, executable_path="chromedriver.exe")
    bot.set_window_size(500, 768)
    bot.get("https://www.instagram.com/")
    
    time.sleep(10)

    # Login section==========================
    print('Logging in...')
    bot.find_element_by_xpath('//*[@id="react-root"]/section/main/article/div/div/div/div[3]/button[1]').click()
    time.sleep(5)
    username_field = bot.find_element_by_xpath('//*[@id="loginForm"]/div[1]/div[3]/div/label/input')
    username_field.send_keys(data['username'])
    time.sleep(5)
    password_field = bot.find_element_by_xpath('//*[@id="loginForm"]/div[1]/div[4]/div/label/input')
    password_field.send_keys(data['password'])
    time.sleep(5)
    bot.find_element_by_xpath('//*[@id="loginForm"]/div[1]/div[6]/button').click()
    time.sleep(6)
    
    bot.quit()

f = open('accounts.json',)
data = json.load(f)
f.close()
process_count = 2 # number of tests to run (each test open a separate browser)
thread_list = []

# Start test
for i in range(process_count):
    t = threading.Thread(name=f'Test {i}', target=test_instance, args=[data[i]])
    t.start()
    time.sleep(1)
    print(t.name   ' started')
    thread_list.append(t)

# Wait for all threads to complete
for thread in thread_list:
    thread.join()

print('Test completed')
  • Related