MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Python文件处理与多线程

2023-04-291.2k 阅读

Python文件处理

打开与关闭文件

在Python中,使用内置的open()函数来打开文件。该函数的基本语法为:open(file, mode='r', buffering=-1, encoding=None, errors=None, newline=None, closefd=True, opener=None)

  • file:必需参数,指定要打开的文件路径。可以是相对路径或绝对路径。例如,在当前目录下有一个名为test.txt的文件,相对路径就是test.txt;如果文件在C:\documents目录下,绝对路径就是C:\documents\test.txt
  • mode:可选参数,指定打开文件的模式。常见的模式有:
    • 'r':只读模式,这是默认模式。如果文件不存在,会抛出FileNotFoundError异常。
    • 'w':写入模式。如果文件已存在,会清空文件内容;如果文件不存在,会创建新文件。
    • 'a':追加模式。如果文件存在,在文件末尾追加内容;如果文件不存在,创建新文件。
    • 'x':独占创建模式。如果文件已存在,会抛出FileExistsError异常;如果文件不存在,创建新文件。
    • 'b':二进制模式,可与其他模式组合使用,如'rb'(读取二进制文件)、'wb'(写入二进制文件)。
    • 't':文本模式,这是默认模式,可与其他模式组合使用,如'rt'(等同于'r')。
  • encoding:指定文件的编码格式,常见的如'utf - 8''gbk'等。在处理文本文件时,如果不指定编码,可能会因系统默认编码与文件实际编码不一致而导致乱码问题。

下面是一些打开文件的示例:

# 以只读模式打开文本文件
file = open('test.txt', 'r', encoding='utf - 8')
# 以写入模式打开文本文件
write_file = open('new_file.txt', 'w', encoding='utf - 8')
# 以二进制写入模式打开文件,常用于图片、音频等二进制文件
binary_file = open('image.jpg', 'wb')

文件使用完毕后,应该使用close()方法关闭文件,以释放系统资源。例如:

file = open('test.txt', 'r', encoding='utf - 8')
# 处理文件内容
file.close()

读取文件内容

  1. 读取整个文件内容 使用read()方法可以读取文件的全部内容,并返回一个字符串(对于文本文件)或字节对象(对于二进制文件)。
file = open('test.txt', 'r', encoding='utf - 8')
content = file.read()
print(content)
file.close()
  1. 逐行读取文件内容 使用readline()方法可以逐行读取文件内容,每次调用返回一行内容,包括行末的换行符'\n'
file = open('test.txt', 'r', encoding='utf - 8')
line = file.readline()
while line:
    print(line.strip())  # strip()方法用于去除行末的换行符
    line = file.readline()
file.close()

也可以使用readlines()方法一次性读取文件的所有行,并返回一个列表,列表的每个元素是文件的一行内容。

file = open('test.txt', 'r', encoding='utf - 8')
lines = file.readlines()
for line in lines:
    print(line.strip())
file.close()

另外,还可以直接对文件对象进行迭代,这种方式在内存使用上更高效,特别是对于大文件。

file = open('test.txt', 'r', encoding='utf - 8')
for line in file:
    print(line.strip())
file.close()

写入文件内容

  1. 写入字符串到文件 使用write()方法可以将字符串写入文件。
write_file = open('new_file.txt', 'w', encoding='utf - 8')
write_file.write('这是要写入文件的内容\n')
write_file.write('第二行内容\n')
write_file.close()
  1. 写入列表内容到文件 如果有一个字符串列表,想要将列表中的每个元素作为一行写入文件,可以使用writelines()方法。
lines = ['第一行内容\n', '第二行内容\n', '第三行内容\n']
write_file = open('new_lines_file.txt', 'w', encoding='utf - 8')
write_file.writelines(lines)
write_file.close()

文件指针操作

文件对象有一个文件指针,用于指示当前读写的位置。可以使用seek()方法移动文件指针,使用tell()方法获取当前文件指针的位置。

  1. seek(offset, whence=0)
    • offset:表示要移动的字节数。如果offset为正数,向前移动;如果offset为负数,向后移动。
    • whence:可选参数,指定移动的参考位置,默认为0
      • 0:从文件开头开始计算偏移量。
      • 1:从当前位置开始计算偏移量。
      • 2:从文件末尾开始计算偏移量。

例如,要将文件指针移动到文件末尾前10个字节的位置:

file = open('test.txt', 'r', encoding='utf - 8')
file.seek(-10, 2)
content = file.read()
print(content)
file.close()
  1. tell():返回当前文件指针的位置。
file = open('test.txt', 'r', encoding='utf - 8')
print(file.tell())  # 文件开头位置为0
file.read(5)
print(file.tell())  # 读取5个字符后,指针位置为5
file.close()

上下文管理器with语句

在处理文件时,使用with语句可以更优雅地管理文件的打开和关闭。with语句会在代码块结束时自动关闭文件,无论代码块中是否发生异常。

with open('test.txt', 'r', encoding='utf - 8') as file:
    content = file.read()
    print(content)
# 这里文件已经自动关闭,无需再调用file.close()

同样,在写入文件时也可以使用with语句:

with open('new_file.txt', 'w', encoding='utf - 8') as write_file:
    write_file.write('使用with语句写入的内容\n')

目录操作

  1. 创建目录 使用os模块中的mkdir()函数可以创建一个新目录。
import os

os.mkdir('new_directory')

如果要创建多级目录,可以使用makedirs()函数。

os.makedirs('parent_directory/child_directory')
  1. 删除目录 使用rmdir()函数可以删除一个空目录。
os.rmdir('new_directory')

要删除非空目录及其所有子目录和文件,可以使用shutil模块中的rmtree()函数。

import shutil

shutil.rmtree('parent_directory')
  1. 列出目录内容 使用listdir()函数可以列出指定目录中的所有文件和子目录。
files = os.listdir('.')  # 列出当前目录下的所有文件和子目录
for file in files:
    print(file)
  1. 判断路径是否存在 使用path.exists()函数可以判断指定的路径是否存在。
if os.path.exists('test.txt'):
    print('文件存在')
else:
    print('文件不存在')
  1. 判断是否为目录 使用path.isdir()函数可以判断指定路径是否为一个目录。
if os.path.isdir('new_directory'):
    print('是目录')
else:
    print('不是目录')
  1. 获取文件属性 使用stat()函数可以获取文件的详细属性,如文件大小、修改时间等。
file_stat = os.stat('test.txt')
print('文件大小:', file_stat.st_size, '字节')
print('最后修改时间:', file_stat.st_mtime)

Python多线程

线程基础概念

线程是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。一个进程可以包含多个线程,这些线程共享进程的资源,如内存空间、文件描述符等。

与多进程相比,多线程的优点在于:

  • 资源共享:多个线程可以共享进程的资源,减少资源开销。例如,多个线程可以访问同一个全局变量,而在多进程中,每个进程都有自己独立的内存空间。
  • 响应迅速:线程的创建和销毁开销比进程小,能够更快速地启动和停止,对于一些需要快速响应的任务,多线程更为合适。

然而,多线程也存在一些缺点:

  • 线程安全问题:由于多个线程共享资源,可能会出现竞争条件,导致数据不一致等问题。例如,多个线程同时对一个全局变量进行读写操作时,如果没有适当的同步机制,就可能出现数据错误。
  • 调试困难:多线程程序的执行顺序具有不确定性,这使得调试变得更加困难。

threading模块

Python的threading模块提供了对线程的支持,使得创建和管理线程变得相对容易。

  1. 创建线程 有两种常见的方式创建线程:

    • 继承threading.Thread

      import threading
      
      
      class MyThread(threading.Thread):
          def run(self):
              print(f'{self.name}线程正在运行')
      
      
      thread = MyThread()
      thread.start()
      

      在上述代码中,定义了一个继承自threading.ThreadMyThread类,并重写了run()方法。run()方法中的代码就是线程要执行的任务。通过创建MyThread类的实例并调用start()方法来启动线程。start()方法会自动调用run()方法。

    • 使用函数创建线程

      import threading
      
      
      def my_function():
          print('线程正在运行')
      
      
      thread = threading.Thread(target=my_function)
      thread.start()
      

      这里直接使用threading.Thread类创建线程,通过target参数指定线程要执行的函数。

  2. 线程同步

    • 锁(Lock): 锁是一种简单的同步机制,用于防止多个线程同时访问共享资源。当一个线程获取到锁后,其他线程必须等待锁被释放才能获取锁并访问共享资源。

      import threading
      
      lock = threading.Lock()
      shared_variable = 0
      
      
      def increment():
          global shared_variable
          lock.acquire()
          try:
              shared_variable = shared_variable + 1
          finally:
              lock.release()
      
      
      threads = []
      for _ in range(10):
          thread = threading.Thread(target=increment)
          threads.append(thread)
          thread.start()
      
      for thread in threads:
          thread.join()
      
      print('共享变量的值:', shared_variable)
      

      在上述代码中,使用lock.acquire()获取锁,使用lock.release()释放锁。try - finally语句确保无论在try块中是否发生异常,锁都会被释放。

    • 信号量(Semaphore): 信号量是一个计数器,用于控制同时访问共享资源的线程数量。

      import threading
      
      semaphore = threading.Semaphore(3)  # 允许同时3个线程访问
      
      
      def access_resource():
          semaphore.acquire()
          try:
              print(f'{threading.current_thread().name}正在访问资源')
          finally:
              semaphore.release()
      
      
      threads = []
      for _ in range(5):
          thread = threading.Thread(target=access_resource)
          threads.append(thread)
          thread.start()
      
      for thread in threads:
          thread.join()
      

      在上述代码中,Semaphore(3)表示允许同时有3个线程访问共享资源。acquire()方法会减少信号量的计数,release()方法会增加信号量的计数。

    • 事件(Event): 事件是一种简单的线程同步机制,用于线程之间的通信。一个线程可以设置事件,其他线程可以等待事件的发生。

      import threading
      
      event = threading.Event()
      
      
      def waiting_thread():
          print('等待事件发生')
          event.wait()
          print('事件已发生')
      
      
      def setting_thread():
          import time
          time.sleep(2)
          print('设置事件')
          event.set()
      
      
      wait_thread = threading.Thread(target=waiting_thread)
      set_thread = threading.Thread(target=setting_thread)
      
      wait_thread.start()
      set_thread.start()
      
      wait_thread.join()
      set_thread.join()
      

      在上述代码中,waiting_thread线程调用event.wait()方法等待事件发生,setting_thread线程在延迟2秒后调用event.set()方法设置事件。

    • 条件变量(Condition): 条件变量用于复杂的线程同步场景,它结合了锁和事件的功能。线程可以在条件变量上等待特定条件的满足,当条件满足时,其他线程可以通知等待的线程。

      import threading
      
      condition = threading.Condition()
      data_ready = False
      
      
      def producer():
          global data_ready
          with condition:
              print('生产者开始生产数据')
              data_ready = True
              print('数据已生产,通知消费者')
              condition.notify()
      
      
      def consumer():
          with condition:
              print('消费者等待数据')
              condition.wait_for(lambda: data_ready)
              print('消费者开始处理数据')
      
      
      producer_thread = threading.Thread(target=producer)
      consumer_thread = threading.Thread(target=consumer)
      
      consumer_thread.start()
      producer_thread.start()
      
      producer_thread.join()
      consumer_thread.join()
      

      在上述代码中,consumer线程调用condition.wait_for()方法等待data_ready条件为Trueproducer线程在生产数据后调用condition.notify()方法通知等待的consumer线程。

  3. 线程的生命周期 线程的生命周期包括以下几个阶段:

    • 新建(New):当创建一个线程对象时,线程处于新建状态,例如thread = threading.Thread(target=my_function)
    • 就绪(Runnable):调用start()方法后,线程进入就绪状态,等待CPU调度执行。
    • 运行(Running):当CPU调度到该线程时,线程开始执行run()方法中的代码,进入运行状态。
    • 阻塞(Blocked):当线程遇到I/O操作、等待锁、等待事件等情况时,会进入阻塞状态,暂停执行。例如,调用lock.acquire()时如果锁被其他线程持有,线程就会进入阻塞状态等待锁的释放。
    • 死亡(Dead):当run()方法执行完毕或者线程发生异常时,线程进入死亡状态。

多线程与文件处理结合

在实际应用中,可能会遇到需要多线程处理文件的场景。例如,在处理大文件时,可以将文件分块,使用多个线程并行处理每个块,从而提高处理效率。

  1. 多线程读取文件 假设要读取一个大文件,并对每一行进行处理,可以使用多线程来加速这个过程。
import threading


def process_line(line):
    # 这里进行具体的行处理操作,例如打印行内容
    print(line.strip())


def read_file_in_threads(file_path, num_threads):
    lines_per_thread = []
    with open(file_path, 'r', encoding='utf - 8') as file:
        lines = file.readlines()
        total_lines = len(lines)
        lines_per_chunk = total_lines // num_threads
        for i in range(num_threads):
            start = i * lines_per_chunk
            if i == num_threads - 1:
                end = total_lines
            else:
                end = (i + 1) * lines_per_chunk
            lines_per_thread.append(lines[start:end])

    threads = []
    for i in range(num_threads):
        def worker(lines):
            for line in lines:
                process_line(line)

        thread = threading.Thread(target=worker, args=(lines_per_thread[i],))
        threads.append(thread)
        thread.start()

    for thread in threads:
        thread.join()


if __name__ == '__main__':
    read_file_in_threads('large_file.txt', 4)

在上述代码中,首先将文件按行数平均分成num_threads块,然后为每个块创建一个线程来处理其中的行。

  1. 多线程写入文件 在多线程写入文件时,需要注意线程安全问题,避免多个线程同时写入导致数据混乱。可以使用锁来保证同一时间只有一个线程写入文件。
import threading

lock = threading.Lock()


def write_to_file(content):
    with lock:
        with open('output.txt', 'a', encoding='utf - 8') as file:
            file.write(content + '\n')


def writer_thread():
    for i in range(10):
        write_to_file(f'线程写入的内容 {i}')


threads = []
for _ in range(3):
    thread = threading.Thread(target=writer_thread)
    threads.append(thread)
    thread.start()

for thread in threads:
    thread.join()

在上述代码中,通过锁lock来保证每次只有一个线程能够写入文件,从而避免了数据冲突。

  1. 多线程文件操作的注意事项
    • 线程安全:如前面所述,在多线程进行文件操作时,特别是写入操作,一定要注意线程安全问题,合理使用同步机制。
    • 资源竞争:虽然多线程可以提高效率,但也要注意避免过度使用线程导致资源竞争。例如,过多的线程同时读取或写入文件可能会导致磁盘I/O性能下降。
    • 异常处理:在多线程文件操作中,要妥善处理可能出现的异常。例如,在文件读取或写入过程中可能会因为文件不存在、权限不足等原因抛出异常,需要在每个线程的任务函数中进行适当的异常捕获和处理。

线程池

在多线程编程中,频繁地创建和销毁线程会带来一定的开销。线程池是一种有效的解决方案,它可以预先创建一定数量的线程,并将这些线程复用,从而减少线程创建和销毁的开销。

Python的concurrent.futures模块提供了线程池的实现。

  1. 使用ThreadPoolExecutor
import concurrent.futures


def process_task(task):
    # 模拟任务处理
    import time
    time.sleep(1)
    return task * task


tasks = [1, 2, 3, 4, 5]
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    results = list(executor.map(process_task, tasks))

print(results)

在上述代码中,使用ThreadPoolExecutor创建了一个线程池,max_workers参数指定了线程池中的最大线程数为3。executor.map()方法将process_task函数应用到tasks列表的每个元素上,并返回结果。

  1. 提交单个任务 也可以使用submit()方法提交单个任务,并通过Future对象获取任务的执行结果。
import concurrent.futures


def process_task(task):
    import time
    time.sleep(1)
    return task * task


with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    future = executor.submit(process_task, 5)
    result = future.result()

print(result)

在上述代码中,使用submit()方法提交任务,并通过future.result()获取任务的执行结果。如果任务还未完成,result()方法会阻塞直到任务完成。

  1. 线程池与文件处理结合 假设要对一个目录下的所有文件进行某种处理,可以使用线程池来加速这个过程。
import os
import concurrent.futures


def process_file(file_path):
    with open(file_path, 'r', encoding='utf - 8') as file:
        content = file.read()
        # 这里进行具体的文件内容处理,例如统计字符数
        return len(content)


directory = '.'
files = [os.path.join(directory, file) for file in os.listdir(directory) if os.path.isfile(os.path.join(directory, file))]
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    results = list(executor.map(process_file, files))

for i, result in enumerate(results):
    print(f'文件 {files[i]} 的字符数: {result}')

在上述代码中,使用线程池对指定目录下的所有文件进行处理,每个线程负责处理一个文件,并统计文件的字符数。

通过合理运用文件处理和多线程技术,能够有效地提高Python程序在处理文件相关任务时的效率和性能。同时,要注意多线程编程中的线程安全、资源管理等问题,确保程序的正确性和稳定性。