How does a Semaphore work in Python

Time:03-23

I want to check the reachability of about 100 IP addresses and limit the number of concurrent tasks with a semaphore. However, I'm not sure how this works exactly, or why it doesn't work in the code example below. As far as I can observe, the function "task_reachable" still executes correctly. But if no address is reachable, all of the tasks run "try_ssh_connection" in parallel, which makes the code incredibly slow.

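import asyncio

import aiohttp
import paramiko
from codetiming import Timer  # assumed source of Timer; the original post omits its imports

class test_class():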
    def __init__(self):
        self.username = "username"
        self.password = "password"

        self.ips = open("open_ip_list")

    def create_async(self):

        asyncio.run(self.create_tasks())

    async def boundary_task(self,ip):
        sem = asyncio.Semaphore(2)
        async with sem:
            return await self.task_reachable(ip)

    async def create_tasks(self):
        timer = Timer(text=f" task time: {{:.1f}}")
        timer.start()
        tasks = [
            asyncio.ensure_future(self.boundary_task(i))
            for i
            in self.ips
        ]
        await asyncio.gather(*tasks)
        timer.stop()

    async def task_reachable(self, ip):
        url = "http://" + ip.strip("\n") + "/example_website.html"
        session = aiohttp.ClientSession()
        try:
            resp = await session.get(url, ssl=False, timeout = 1)
            await resp.read()
            await session.close()

        except:
            await session.close()
            await self.try_ssh_connection(ip, url)
        await session.close()

    async def try_ssh_connection(self, host, url):
        try:
            ssh = paramiko.SSHClient()
            ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            ssh.connect(host, username=self.username, password=self.password)
            print("go")

        except paramiko.AuthenticationException:
            print( "Username or Password wrong!")
            await self.stop_fun()

        except OSError:
            print("Network is not reachable")
            await self.stop_fun()

    async def stop_fun(self):
        stop_event = asyncio.Event()
        try:
            stop_event.set()
        except RuntimeError:
            pass

if __name__ == "__main__":
    app = test_class()
    app.create_async()

CodePudding user response:

The problem is that every call to boundary_task creates its own semaphore, so no two tasks ever wait on the same one and nothing is limited.

async def boundary_task(self, ip):
    sem = asyncio.Semaphore(2)

To make them all use the same semaphore, create it once and pass it to every boundary_task call.

async def boundary_task(self, ip, sem):
    async with sem:
        return await self.task_reachable(ip)

async def create_tasks(self):
    sem = asyncio.Semaphore(2)
    tasks = [
        self.boundary_task(i, sem)
        for i
        in self.ips
    ]
    await asyncio.gather(*tasks)
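
To see the difference between a per-call and a shared semaphore, here is a minimal sketch that replaces the network check with asyncio.sleep and records how many workers are inside the "async with" block at the same time. The names peak_concurrency and worker, the task count, and the sleep duration are made up for illustration and are not part of the original code.

```python
import asyncio


async def peak_concurrency(n_tasks, shared):
    """Run n_tasks workers and return the highest number active at once."""
    active = 0
    peak = 0
    shared_sem = asyncio.Semaphore(2)  # created once, inside the running loop

    async def worker():
        nonlocal active, peak
        # Either reuse the one shared semaphore or create a fresh one per call
        # (the latter mirrors the boundary_task from the question).
        sem = shared_sem if shared else asyncio.Semaphore(2)
        async with sem:
            active += 1
            peak = max(peak, active)
            await asyncio.sleep(0.01)  # stand-in for the real network check
            active -= 1

    await asyncio.gather(*(worker() for _ in range(n_tasks)))
    return peak


if __name__ == "__main__":
    print("per-call semaphore:", asyncio.run(peak_concurrency(10, shared=False)))
    print("shared semaphore:  ", asyncio.run(peak_concurrency(10, shared=True)))
```

With a fresh semaphore per call, every worker acquires immediately, so the peak should equal the number of tasks. With the shared one, the peak should stay at the semaphore's limit.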

Since you are using a class, you could also create the semaphore inside __init__.

def __init__(self):
    ...

    self.sem = asyncio.Semaphore(2)

async def boundary_task(self, ip):
    async with self.sem:
        return await self.task_reachable(ip)
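
One caveat with the class-based variant: on Python versions before 3.10, an asyncio.Semaphore binds to the event loop that is current when it is constructed, while asyncio.run() starts a new loop, so a semaphore built in __init__ can fail with a "different loop" error once tasks have to wait. A sketch that sidesteps this by creating the semaphore inside the coroutine is shown here. The class name Checker, the limit parameter, the active/peak counters, and the sleep-based task_reachable are placeholders for illustration only.

```python
import asyncio


class Checker:
    """Hypothetical stand-in for the class in the question."""

    def __init__(self, ips, limit=2):
        self.ips = ips
        self.limit = limit
        self.sem = None  # created later, inside the running event loop
        self.active = 0
        self.peak = 0

    async def task_reachable(self, ip):
        # Placeholder for the real HTTP/SSH check; tracks concurrency instead.
        self.active += 1
        self.peak = max(self.peak, self.active)
        await asyncio.sleep(0.01)
        self.active -= 1
        return ip

    async def boundary_task(self, ip):
        # Every task waits on the same semaphore stored on the instance.
        async with self.sem:
            return await self.task_reachable(ip)

    async def create_tasks(self):
        # One semaphore for all tasks, created in the loop started by asyncio.run().
        self.sem = asyncio.Semaphore(self.limit)
        return await asyncio.gather(*(self.boundary_task(ip) for ip in self.ips))

    def create_async(self):
        return asyncio.run(self.create_tasks())
```

Calling Checker(["10.0.0.1", "10.0.0.2", "10.0.0.3"]).create_async() returns the results in input order, and the peak attribute afterwards shows how many checks ran at once.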