Find a pattern in a stream of bytes read in blocks

Time:09-28

I have a stream of gigabytes of data that I read in blocks of 1 MB.

I'd like to find if (and where) one of the patterns PATTERNS = [b"foo", b"bar", ...] is present in the data (case insensitive).

Here is what I'm doing. It works but it is sub-optimal:

oldblock = b''
while True:
    block = data.read(1024*1024)
    if block == b'':
        break
    testblock = (oldblock + block).lower()
    for PATTERN in PATTERNS:
        if PATTERN in testblock:
            for l in testblock.split(b'\n'):  # display only the line where the 
                if PATTERN in l:              # pattern is found, not the whole 1MB block!
                    print(l)                  # note: this line can be incomplete if 
    oldblock = block                          # it continues in the next block (**)

Why do we need to search in oldblock + block? Because the pattern foo could be split exactly across two consecutive 1 MB blocks:

[.......fo] [o........]
block n     block n+1

Drawback: concatenating oldblock + block means that almost every byte is lower-cased and searched twice.

We could use testblock = oldblock[-max_len_of_patterns:] + block, but there is surely a more canonical way to address this problem, as well as the side remark (**).
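The tail-overlap idea mentioned above can be sketched as follows. This is only an illustration under stated assumptions: find_in_stream, overlap and consumed are hypothetical names that do not appear in the question, and stream is assumed to be any binary file-like object with a read method.

```python
import io

def find_in_stream(stream, patterns, block_size=1024 * 1024):
    """Yield (absolute_offset, pattern) for each case-insensitive match."""
    patterns = [p.lower() for p in patterns]
    # Keep just enough bytes to complete a pattern split across two blocks
    overlap = max(len(p) for p in patterns) - 1
    tail = b''
    consumed = 0  # number of stream bytes read before the current block
    while True:
        block = stream.read(block_size)
        if not block:
            break
        window = tail + block.lower()
        base = consumed - len(tail)  # absolute offset of window[0]
        for p in patterns:
            pos = window.find(p)
            while pos != -1:
                # A match that ends inside the tail was reported last round
                if pos + len(p) > len(tail):
                    yield base + pos, p
                pos = window.find(p, pos + 1)
        consumed += len(block)
        tail = window[-overlap:] if overlap else b''

# Tiny blocks force the patterns to straddle block boundaries
demo = io.BytesIO(b'xxFOoxxbAr')
hits = list(find_in_stream(demo, [b'foo', b'bar'], block_size=4))
# hits == [(2, b'foo'), (7, b'bar')]
```

Only the last max_len - 1 bytes are searched a second time, and returning offsets instead of lines leaves it to the caller to decide how much context to display.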

How to do a more efficient pattern search in data read by blocks?

CodePudding user response:

  1. Once a pattern has matched, use break inside the for loop so that the remaining, now useless, iterations are skipped.
  2. In a brace-delimited language, wrap the loop body in {...}, like this (Python uses indentation instead):

for (...) { if match(PATTERN) break; }
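In Python, the same early-exit idea might look like the sketch below. The helper name first_match is hypothetical and not part of the question's code; note that stopping at the first hit also means other patterns present in the block go unreported.

```python
def first_match(block, patterns):
    """Return the first pattern found in block, or None if none is present."""
    for pattern in patterns:
        if pattern in block:
            return pattern  # leave the loop as soon as one pattern matches
    return None

found = first_match(b'xx bar foo', [b'foo', b'bar'])
missing = first_match(b'zzz', [b'foo', b'bar'])
```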

CodePudding user response:

How about concatenating only the end of the first block and the start of the next block, using the length of the pattern you are currently looking for? Then use a variable (carry) to record whether the pattern was found across the boundary, so that when you move to the next block you automatically print its first line, because you already know that the match ends on that line.

E.g.

block_0 = "abcd"
block_1 = "efgh"

pattern = "def"
length = 3

if pattern in block_0[-length + 1:] + block_1[:length - 1]

This if statement will check "cdef" for the pattern "def". There is no need to check any more characters than that: if the pattern isn't in that selection of characters, then it doesn't span the two blocks in the first place. Once you know the pattern spans the blocks, you just need to print the first line of the next block, which is done by checking the value of carry as seen below.
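As a quick check of the slicing, here is a minimal sketch of the boundary test on its own. The function name straddles_boundary and the variable seam are hypothetical, introduced only for this illustration.

```python
def straddles_boundary(prev_block, next_block, pattern):
    """True if pattern starts in prev_block and ends in next_block."""
    n = len(pattern)
    if n < 2:
        return False  # a one-byte pattern cannot be split across blocks
    # n - 1 bytes from each side: any match in here must cross the seam
    seam = prev_block[-(n - 1):] + next_block[:n - 1]
    return pattern in seam

across = straddles_boundary(b"abcd", b"efgh", b"def")   # seam is b"cdef"
inside = straddles_boundary(b"abcd", b"efgh", b"abc")   # entirely in block 0
```

Writing the slice as -(n - 1) and guarding n < 2 avoids the case where a one-byte pattern would make the slice start at index 0 and return the whole previous block.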

This should stop you needing to go through the block twice like you said.

oldblock = b''
while True:
    block = data.read(1024*1024)
    if block == b'':
        break
    block = block.lower()
    lines = block.split(b'\n')
    carry = False
    for PATTERN in PATTERNS:
        if PATTERN in block:
            for l in lines:
                if PATTERN in l:
                    print(l)
        length = len(PATTERN)
        # Tail of the previous block + head of this one: a match in here spans the boundary
        if length > 1 and PATTERN in (oldblock[-(length - 1):] + block[:length - 1]):
            carry = True     # Found the PATTERN between blocks
    if carry:
        print(lines[0])      # The match ends on the first line of the block just read
    oldblock = block

         

CodePudding user response:

I think the simplest method is to use Python's memory-mapped file support (the mmap module), where a file can behave like a file object but also like a bytearray. Such memory-mapped files implement a find method, or you can use the Python in operator. But for these methods you first have to convert the entire file to lower case. So I was wondering whether it would be more efficient to use a regular expression with the re.I (case-insensitive) flag.

For the benchmark below, my search strings include a value that I know will not be matched, thus requiring the entire 25 MB file to be searched. Also, the regular expression benchmark (Method 3) uses finditer to ensure that we attempt to match all search strings. I could also create individual regular expressions for each search string and then use the search method to find the first match for each (Method 4). I ran the benchmark multiple times to ensure that whatever caching of the input file the operating system performs had already been done. Otherwise, the first method would be running at a disadvantage, since no caching would have been done yet.

import mmap
import re

PATTERNS = [b'foo', b'bar', b'booboobooboobooboo']
# Single regular expression to be used with rex.finditer:
rex = re.compile(b'|'.join(re.escape(pattern) for pattern in PATTERNS), flags=re.I)

# Multiple regular expressions to be used with re.search
regular_expressions = [re.compile(re.escape(pattern), flags=re.I) for pattern in PATTERNS]

FILE_PATH = '/download/php-7.4.30-Win32-vc15-x64.zip'

import time
with open(FILE_PATH, 'rb') as f:
    with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as mm:
        # Method 1:
        print('Method 1')
        t = time.time()
        b = mm[:].lower()
        for pattern in PATTERNS:
            if pattern in b:
                print('found', pattern)
        print(time.time() - t)

        # Method 2:
        print('\nMethod 2')
        t = time.time()
        b = mm[:].lower()
        for pattern in PATTERNS:
            offset = b.find(pattern)
            if offset != -1:
                print('found', pattern, 'at offset', offset)
        print(time.time() - t)

        # Method 3:
        print('\nMethod 3')
        t = time.time()
        found_set = set()
        for m in rex.finditer(mm):
            found = m[0].lower()
            if found not in found_set:
                found_set.add(found)
                offsets = m.span()
                print('Matched:', found, 'first found at offset', offsets[0])
        print(time.time() - t)

        # Method 4:
        print()
        t = time.time()
        for regular_expression in regular_expressions:
            m = regular_expression.search(mm)
            if m:
                found = m[0].lower()
                offsets = m.span()
                print('Matched:', found, 'first found at offset', offsets[0])
        print(time.time() - t)

Prints:

Method 1
found b'foo'
found b'bar'
0.036002397537231445

Method 2
found b'foo' at offset 1770532
found b'bar' at offset 1209764
0.03198432922363281

Method 3
Matched: b'bar' first found at offset 1209764
Matched: b'foo' first found at offset 1770532
1.0430107116699219

Matched: b'foo' first found at offset 1770532
Matched: b'bar' first found at offset 1209764
0.35400843620300293

Conclusion

Using one or more regular expressions does not perform as well as first converting the contents of the memory-mapped file to lower case and then using either the find method or the in operator. Those two perform more or less equally, with perhaps a slight advantage to find (which also gives you the offset where the string was found). Note that mm[:] copies the whole file into a bytes object, so it is bytes.find rather than mmap.find that runs here, and the copy is included in the timings.

Final Solution

Note: If you are searching a text file that may include characters outside the ASCII character set and is therefore encoded, for example as UTF-8, then you will need to encode any search strings that contain non-ASCII characters. It seems to me there is then a possibility of matching one of your search strings with a sequence of bytes that does not begin on an encoded Unicode character boundary. (For valid UTF-8 this cannot actually happen, because lead bytes and continuation bytes occupy disjoint ranges, but it can with other encodings such as UTF-16.)

import mmap

# We are assuming our text file is encoded with utf-8.
# Strings that contain only ascii characters can be defined as byte strings directly:
PATTERNS = [b'foo', b'bar', 'coûteux'.encode('utf-8')]

FILE_PATH = 'test.txt'

with open(FILE_PATH, 'rb') as f:
    with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as mm:
        b = mm[:].lower()
        for pattern in PATTERNS:
            offset = b.find(pattern)
            if offset != -1:
                print('found', pattern.decode('utf-8'), 'at offset', offset)

Prints:

found foo at offset 24
found bar at offset 29
found coûteux at offset 8
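One caveat worth illustrating: bytes.lower() only changes ASCII letters, so the approach above is case-insensitive for ASCII characters only. The sketch below uses made-up sample text (not taken from test.txt) to show the difference between lowering bytes and lowering decoded text.

```python
haystack = 'Un produit COÛTEUX'.encode('utf-8')
needle = 'coûteux'

# bytes.lower() leaves the two bytes of 'Û' untouched, so this misses
ascii_only = needle.encode('utf-8') in haystack.lower()

# Decoding first lets str.lower() fold the non-ASCII letter as well
unicode_aware = needle in haystack.decode('utf-8').lower()

print(ascii_only, unicode_aware)  # prints: False True
```

Decoding costs extra time and memory, so whether it is worth it depends on whether upper-case non-ASCII letters can actually occur in the data.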

CodePudding user response:

I didn't do any benchmarks, but this solution has the definite advantages of being straightforward, not searching everything twice, printing the lines as they actually appear in the stream (not in all lower case), and printing complete lines even when they cross a block boundary:

import re

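# PATTERNS must hold str (not bytes) patterns here; see the remark below
regex_patterns = [re.compile('^.*' + re.escape(pattern) + '.*$', re.I | re.M) for pattern in PATTERNS]

tail = ""  # incomplete last line carried over from the previous block
block = data.read(1024*1024) # **see remark below**
while len(block) > 0:
    testblock = tail + block
    lastLineStart = testblock.rfind('\n') + 1
    # Search only complete lines; keep the unfinished one for the next round
    complete, tail = testblock[:lastLineStart], testblock[lastLineStart:]
    for pattern in regex_patterns:
        for line in pattern.findall(complete):
            print(line)
    block = data.read(1024*1024) # **see remark below**
for pattern in regex_patterns:  # the last line may lack a trailing newline
    for line in pattern.findall(tail):
        print(line)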

Remark: Since you are processing text data here (otherwise the notion of "lines" wouldn't make any sense), you shouldn't be using b'...' anywhere. Your text in the stream has some encoding and you should read it in a way that honours that encoding (instead of data.read(1024*1024)) so that the loops are operating on real (Python internal unicode) strings and not some byte data. Not getting that straight is one of the most frustratingly difficult bugs to find in each and every Python script.
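One way to follow the remark while still reading fixed-size blocks is an incremental decoder, which holds back a multi-byte character that is cut by a block boundary until its remaining bytes arrive. The sketch below assumes UTF-8 and a binary file-like object; read_text_blocks is a hypothetical helper name.

```python
import codecs
import io

def read_text_blocks(binary_stream, encoding='utf-8', block_size=1024 * 1024):
    """Yield str blocks decoded from a binary stream read in fixed-size chunks."""
    decoder = codecs.getincrementaldecoder(encoding)()
    while True:
        raw = binary_stream.read(block_size)
        if not raw:
            # Flush the decoder; raises if the stream ended mid-character
            rest = decoder.decode(b'', final=True)
            if rest:
                yield rest
            break
        text = decoder.decode(raw)
        if text:
            yield text

# A 3-byte block size splits the two-byte 'û' across blocks
source = io.BytesIO('coûteux'.encode('utf-8'))
blocks = list(read_text_blocks(source, block_size=3))
# blocks == ['co', 'ûte', 'ux']
```

Opening the file in text mode with an explicit encoding achieves the same result when the data comes from a regular file; the incremental decoder is mainly useful when only a raw byte stream is available.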
