Setting Up A Project

01/30/22·7 min read

Understanding the Basics

Before diving into the technical setup, let's clarify what a data engineer does:

  • Data engineers build and maintain the systems and infrastructure that collect, store, process, and deliver data.
  • They work with databases, data pipelines, ETL (Extract, Transform, Load) processes, and data warehouses.
  • They ensure data quality, reliability, and accessibility for data scientists, analysts, and other stakeholders.

Essential Skills to Develop

As a beginner, focus on developing these core skills:

  1. Programming Languages: Python and SQL are must-haves
  2. Database Systems: Relational (PostgreSQL, MySQL) and NoSQL (MongoDB, Redis)
  3. Data Processing Tools: Apache Spark, Pandas
  4. ETL/ELT Frameworks: Apache Airflow, dbt
  5. Cloud Platforms: AWS, GCP, or Azure basics
  6. Version Control: Git and GitHub

Setting Up Your Development Environment

1. Install Python and Essential Libraries

Start by installing Python (preferably version 3.8+) and these libraries:

# Create a virtual environment
python -m venv data_eng_env
source data_eng_env/bin/activate  # On Windows: data_eng_env\Scripts\activate

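# Install essential libraries
# (if pip reports dependency conflicts, install apache-airflow in its own
# virtual environment; the Airflow docs recommend using a constraints file)
pip install pandas numpy sqlalchemy psycopg2-binary apache-airflow dbt-core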
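
A quick way to confirm the installation is to check, from inside the activated virtual environment, that each package can be found. The sketch below uses only the standard library. The helper name `missing_packages` is made up for illustration, and the list uses import names rather than pip names (for example, `psycopg2-binary` is imported as `psycopg2`; the names for Airflow and dbt are assumed to be `airflow` and `dbt`).

```python
from importlib.util import find_spec

# Top-level import names (not pip names) for the libraries installed above.
# "airflow" and "dbt" are the assumed import names for apache-airflow and dbt-core.
REQUIRED = ["pandas", "numpy", "sqlalchemy", "psycopg2", "airflow", "dbt"]


def missing_packages(names):
    """Return the top-level import names that cannot be found in this environment."""
    return [name for name in names if find_spec(name) is None]


if __name__ == "__main__":
    missing = missing_packages(REQUIRED)
    if missing:
        print("Missing packages:", ", ".join(missing))
    else:
        print("All required packages are importable.")
```

If anything shows up as missing, check that the virtual environment is activated before re-running `pip install`.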

2. Set Up a Local Database

Having a local database is crucial for development and testing:

# Installing PostgreSQL (commands vary by OS)
# For Ubuntu
sudo apt-get update
sudo apt-get install postgresql postgresql-contrib

# For macOS with Homebrew
brew install postgresql

3. Install Git and Configure GitHub

# Installing Git
# For Ubuntu
sudo apt-get install git

# For macOS
brew install git

# Configure Git
git config --global user.name "Your Name"
git config --global user.email "your.email@example.com"

Structuring Your First Project

Let's create a simple yet scalable structure for your data engineering project:

my_first_data_project/
├── README.md
├── .gitignore
├── requirements.txt
├── setup.py
├── config/
│   ├── database.ini
│   └── etl_config.yaml
├── src/
│   ├── __init__.py
│   ├── pipeline.py
│   ├── database/
│   │   ├── __init__.py
│   │   ├── models.py
│   │   └── connection.py
│   ├── extract/
│   │   ├── __init__.py
│   │   └── data_sources.py
│   ├── transform/
│   │   ├── __init__.py
│   │   └── transformations.py
│   ├── load/
│   │   ├── __init__.py
│   │   └── loaders.py
│   └── utils/
│       ├── __init__.py
│       └── helpers.py
├── tests/
│   ├── __init__.py
│   ├── test_extract.py
│   ├── test_transform.py
│   └── test_load.py
├── data/
│   ├── raw/
│   ├── processed/
│   └── .gitignore
└── notebooks/
    └── exploratory_analysis.ipynb

Key Components Explanation

  1. README.md: Document your project, its purpose, and how to use it.
  2. requirements.txt: List all the Python dependencies.
  3. src/: Contains your main code, organized by ETL stages.
  4. config/: Holds configuration files (database credentials, ETL parameters). Keep files with real credentials out of version control.
  5. tests/: Contains unit tests for your code.
  6. data/: Stores your data files (Note: use .gitignore to avoid committing large data files).
  7. notebooks/: Jupyter notebooks for exploration and quick experiments.
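
Rather than creating each folder by hand, the layout can be scaffolded with a short script. This is a minimal sketch using `pathlib`; the function name `scaffold` is an assumption, and every file is created empty. The notebook is left out on purpose, since an empty `.ipynb` file is not a valid notebook; create it from Jupyter instead.

```python
from pathlib import Path

# Directories from the project layout above.
DIRS = [
    "config",
    "src/database", "src/extract", "src/transform", "src/load", "src/utils",
    "tests",
    "data/raw", "data/processed",
    "notebooks",
]

# Files from the project layout above; all of them start out empty.
FILES = [
    "README.md", ".gitignore", "requirements.txt", "setup.py",
    "config/database.ini", "config/etl_config.yaml",
    "src/__init__.py",
    "src/database/__init__.py", "src/database/models.py", "src/database/connection.py",
    "src/extract/__init__.py", "src/extract/data_sources.py",
    "src/transform/__init__.py", "src/transform/transformations.py",
    "src/load/__init__.py", "src/load/loaders.py",
    "src/utils/__init__.py", "src/utils/helpers.py",
    "tests/__init__.py", "tests/test_extract.py",
    "tests/test_transform.py", "tests/test_load.py",
    "data/.gitignore",
]


def scaffold(root):
    """Create the project skeleton under `root` and return the root path."""
    root = Path(root)
    for directory in DIRS:
        (root / directory).mkdir(parents=True, exist_ok=True)
    for file_name in FILES:
        # touch() leaves existing files untouched apart from their timestamp
        (root / file_name).touch(exist_ok=True)
    return root
```

Calling `scaffold("my_first_data_project")` from the directory where the project should live creates the skeleton; running it a second time is harmless because existing directories and files are kept.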

Creating a Simple ETL Pipeline

Let's implement a basic ETL pipeline to extract data from a CSV file, transform it, and load it into a PostgreSQL database using SQLAlchemy:

1. Database Setup (src/database/connection.py)

import configparser

from sqlalchemy import create_engine
# declarative_base lives in sqlalchemy.orm since SQLAlchemy 1.4
from sqlalchemy.orm import declarative_base, sessionmaker

# Read database configuration (the path is relative to the project root,
# so run the pipeline from there)
config = configparser.ConfigParser()
config.read('config/database.ini')
pg = config['postgres']

# Create SQLAlchemy engine and session
DATABASE_URL = (
    f"postgresql://{pg['user']}:{pg['password']}"
    f"@{pg['host']}:{pg['port']}/{pg['database']}"
)

engine = create_engine(DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()

def get_engine():
    return engine

def get_db():
    """Get a database session"""
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

2. Define Data Models (src/database/models.py)

from sqlalchemy import Column, Integer, String, Float, DateTime
from sqlalchemy.sql import func
from .connection import Base

class Sales(Base):
    __tablename__ = "sales"
    
    id = Column(Integer, primary_key=True, index=True)
    product_name = Column(String, index=True)
    quantity = Column(Integer)
    price = Column(Float)
    total_amount = Column(Float)
    transaction_date = Column(DateTime)
    created_at = Column(DateTime, default=func.now())
    
    def __repr__(self):
        return f"<Sale(id={self.id}, product={self.product_name}, amount={self.total_amount})>"

3. Extract (src/extract/data_sources.py)

import pandas as pd

def extract_from_csv(file_path):
    """Extract data from a CSV file"""
    try:
        df = pd.read_csv(file_path)
        return df
    except Exception as e:
        print(f"Error extracting data from {file_path}: {e}")
        return None

4. Transform (src/transform/transformations.py)

import pandas as pd

def clean_data(df):
    """Clean and transform the data"""
    if df is None:
        return None
    
    # Remove duplicates
    df = df.drop_duplicates()
    
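    # Handle missing values: fill only numeric columns with 0 (adjust as needed).
    # A blanket df.fillna(0) would also write 0 into text and date columns.
    numeric_cols = df.select_dtypes(include='number').columns
    df = df.fillna({col: 0 for col in numeric_cols})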
    
    # Convert date columns (if any)
    if 'transaction_date' in df.columns:
        df['transaction_date'] = pd.to_datetime(df['transaction_date'])
    
    # Calculate total amount if not present
    if 'total_amount' not in df.columns and 'price' in df.columns and 'quantity' in df.columns:
        df['total_amount'] = df['price'] * df['quantity']
    
    return df
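
To see what these steps do, it helps to run them by hand on a tiny in-memory frame. The sketch below applies the same kinds of steps inline to made-up rows (the product names, prices, and dates are invented for illustration). It fills only numeric columns, so text and date columns keep their missing-value markers instead of receiving a 0.

```python
import pandas as pd

# Made-up sample rows: one exact duplicate and one missing quantity.
raw = pd.DataFrame({
    "product_name": ["widget", "widget", "gadget"],
    "quantity": [2, 2, None],
    "price": [9.99, 9.99, 24.50],
    "transaction_date": ["2022-01-30", "2022-01-30", "2022-01-31"],
})

# Remove the duplicated row and renumber the index.
df = raw.drop_duplicates().reset_index(drop=True)

# Fill missing values in numeric columns only.
numeric_cols = df.select_dtypes(include="number").columns
df = df.fillna({col: 0 for col in numeric_cols})

# Parse dates and derive the total per row.
df["transaction_date"] = pd.to_datetime(df["transaction_date"])
df["total_amount"] = df["price"] * df["quantity"]

print(df)
```

Two rows remain after deduplication, and the row with the missing quantity ends up with a total of 0. Whether 0 is the right replacement for a missing quantity depends on the data; dropping or flagging such rows is an equally reasonable choice.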

5. Load (src/load/loaders.py)

from ..database.connection import SessionLocal, get_engine
from ..database.models import Base, Sales

def create_tables():
    """Create database tables if they don't exist"""
    engine = get_engine()
    Base.metadata.create_all(bind=engine)
    print("Database tables created successfully")

def load_to_postgres(df, table_name=None):
    """Load the data to PostgreSQL database using SQLAlchemy"""
    if df is None:
        return False
    
    try:
        # Ensure tables exist
        create_tables()
        
        # pandas approach (DataFrame.to_sql)
        if table_name:
            # Note: if_exists='replace' drops and recreates the table, so the
            # schema defined in models.py is lost; use 'append' to keep it
            df.to_sql(table_name, get_engine(), if_exists='replace', index=False)
            print(f"Data successfully loaded to table: {table_name} using DataFrame.to_sql")
            return True
        
        # SQLAlchemy ORM approach for more control
        # Convert DataFrame to a list of dictionaries; keys must match Sales columns
        records = df.to_dict('records')
        sales_objects = [Sales(**record) for record in records]
        
        # The context manager (SQLAlchemy 1.4+) closes the session even if the insert fails
        with SessionLocal() as db:
            db.bulk_save_objects(sales_objects)
            db.commit()
        
        print("Data successfully loaded to 'sales' table using SQLAlchemy ORM")
        return True
    
    except Exception as e:
        print(f"Error loading data to PostgreSQL: {e}")
        return False

6. Main Pipeline (src/pipeline.py)

# Run from the project root as a module so the package imports resolve:
#   python -m src.pipeline
import os

from .extract.data_sources import extract_from_csv
from .transform.transformations import clean_data
from .load.loaders import load_to_postgres

def run_pipeline(use_orm=True):
    """Run the ETL pipeline"""
    # Extract
    input_file = os.path.join('data', 'raw', 'sales_data.csv')
    raw_data = extract_from_csv(input_file)
    
    if raw_data is None:
        print("Extraction failed. Pipeline aborted.")
        return False
    
    # Transform
    transformed_data = clean_data(raw_data)
    
    if transformed_data is None:
        print("Transformation failed. Pipeline aborted.")
        return False
    
    # Load (either using ORM approach or to_sql approach)
    if use_orm:
        success = load_to_postgres(transformed_data)
    else:
        success = load_to_postgres(transformed_data, 'sales')
    
    if success:
        print("ETL pipeline completed successfully!")
    else:
        print("ETL pipeline failed at the loading stage.")
    
    return success

if __name__ == "__main__":
    run_pipeline()

Setting Up Configuration (config/database.ini)

[postgres]
host = localhost
port = 5432
database = dataeng_project
user = your_username
password = your_password
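
One detail worth handling early: passwords often contain characters such as `@`, `:`, `/`, or `%`, which break a connection URL if they are pasted in as-is, and `%` also trips up `configparser`'s default interpolation. The sketch below shows one way to build the URL safely with the standard library. The function name `build_database_url` is hypothetical, and it takes the INI content as a string so it can be tried without a file on disk.

```python
import configparser
from urllib.parse import quote


def build_database_url(ini_text):
    """Build a PostgreSQL URL from INI text, percent-encoding user and password."""
    # interpolation=None lets values contain a literal '%'
    config = configparser.ConfigParser(interpolation=None)
    config.read_string(ini_text)
    pg = config["postgres"]

    # safe="" makes quote() encode every reserved character, including '/'
    user = quote(pg["user"], safe="")
    password = quote(pg["password"], safe="")
    return f"postgresql://{user}:{password}@{pg['host']}:{pg['port']}/{pg['database']}"


if __name__ == "__main__":
    sample = """
[postgres]
host = localhost
port = 5432
database = dataeng_project
user = your_username
password = p@ss:w/rd
"""
    print(build_database_url(sample))
```

With a real file, the same idea applies by calling `config.read('config/database.ini')` instead of `read_string`.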

Setting Up Version Control

Initialize your Git repository and create your first commit:

# Initialize Git repository
git init

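# Add files to staging (first make sure .gitignore excludes config/database.ini
# and any other file that holds real credentials)
git add .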

# Commit changes
git commit -m "Initial project setup with basic ETL pipeline"

# Create a new repository on GitHub and link it (replace with your URL)
git remote add origin https://github.com/yourusername/my_first_data_project.git
git branch -M main
git push -u origin main

Best Practices for Beginners

  1. Start Small: Begin with a simple project and gradually add complexity.
  2. Document Everything: Write clear documentation for your code and processes.
  3. Follow Coding Standards: Adopt PEP 8 for Python and consistent SQL formatting.
  4. Prioritize Data Quality: Implement data validation and quality checks.
  5. Learn by Doing: Work on real problems, even if they're small.
  6. Engage with the Community: Join forums, attend meetups, and contribute to open-source.
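
As an illustration of the data quality point above, here is a minimal sketch of a validation step that could run between transform and load. The function name `validate_sales` and the specific rules are assumptions chosen for this example; the column names follow the `Sales` model defined earlier.

```python
import pandas as pd

# Columns the load step expects, based on the Sales model.
REQUIRED_COLUMNS = ["product_name", "quantity", "price", "transaction_date"]


def validate_sales(df):
    """Return a list of human-readable problems; an empty list means the frame passed."""
    problems = []

    missing = [col for col in REQUIRED_COLUMNS if col not in df.columns]
    if missing:
        problems.append(f"missing columns: {missing}")
        return problems  # the remaining checks need these columns

    if df["product_name"].isna().any():
        problems.append("product_name contains missing values")
    if (df["quantity"] < 0).any():
        problems.append("quantity contains negative values")
    if (df["price"] < 0).any():
        problems.append("price contains negative values")
    if df.duplicated().any():
        problems.append("duplicate rows found")

    return problems
```

A pipeline could call this after cleaning and abort (or log and continue) when the returned list is not empty. Returning messages instead of raising on the first failure makes it easier to see every problem in a batch at once.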

Conclusion

Setting up your first data engineering project takes attention to structure, best practices, and tool choice. By following this guide, you've laid the groundwork for a scalable, maintainable data pipeline. As you gain experience, you'll refine your approach and pick up more advanced techniques and tools for increasingly complex data challenges.

Remember that data engineering is an iterative process. Start simple, focus on delivering value, and continuously improve your skills and project as you progress in your journey.
