طرح‌چه

ایندکس داده ها با استفاده از Maxwell بر روی Typesense برای ایجاد سرویس جستجوی سریع | CDC pipeline

۱ شهریور ۱۴۰۴

ایندکس داده ها با استفاده از Maxwell بر روی Typesense برای ایجاد سرویس جستجوی سریع | CDC pipeline
با استفاده از maxwell تغییرات اطلاعات رو از دیتابیس اصلی می خوانیم و بر روی RabbitMQ قرار می دهیم و در نهایت یک سرویس indexer ایجاد می کنیم تا مسیج هایی که روی RabbitMQ قرار گرفته اند را پردازش و اطلاعات را بر روی Typesense ایندکس کند.

در این قسمت از یک سرویس پیش ساخته کوچک برای مدیریت وبلاگ استفاده می کنیم و با استفاده از CDC یک سرچ سریع بر روی این پروژه پیاده سازی میکنیم. 

معرفی اولیه

سرویس ما یک وبلاگ ساده است که قابلیت ایجاد, حذف و ویرایش محتوا را به ما میدهد. این پروژه از Golang به عنوان زبان بکند و از MySQL به عنوان دیتابیس اصلی استفاده میکند. ما با استفاده از CDC میتوانیم تغییرات اطلاعات را از جدول blog بخوانیم و این تغییرات را بر روی دیتابیس typesense ذخیره کنیم.

سورس کد نهایی پروژه را میتوانید در گیت هاب ببینید و دانلود کنید. برای اجرای پروژه ابتدا با استفاده از ترمینال به پوشه پروژه بروید و سپس با استفاده از Docker با اجرای دستور زیر , پروژه را اجرا کنید:

docker compose up -d

و در نهایت میتوانید پروژه را با استفاده از دستور زیر خاموش کنید:

docker compose down --remove-orphans

اگر با داکر آشنا نیستید یا اون رو نصب نکردید بهتره که ابتدا داکر را نصب کنید و همچنین فرصت خوبی خواهد بود تا کمی با داکر آشنا شوید.

و اگر روی سیستم makefile دارید میتونید درون پوشه پروژه دستور های زیر را برای راه اندازی و خاموش کردن سرویس ها استفاده کنید:

// راه اندازی پروژه
make up

// خاموش کردن سرویس ها
make down

معماری پروژه

قبل از شروع به پیاده سازی  ساختار نهایی چیزی که میخواهیم داشته باشیم رو بررسی کنیم.

ما در مجموع از ابزاری زیر استفاده خواهیم کرد تا پروژه خودمون رو بسازیم:

  • از MySQL به عنوان دیتابیس اصلی برای ذخیره سازی و نگهداری محتوا استفاده میکنیم.

  • از دیتابیس Typesense برای ایندکس کردن full text محتوا به منظور استفاده در سرچ استفاده می کنیم. بدین صورت اپلیکیشن ما میتونه خیلی سریع محتوای مورد نظر را جستجو کند و نتایج را برای کاربرهای اپلیکیشن ما فراهم کند.

  • از RabbitMQ به عنوان صف یا queue استفاده می کنیم. بدین صورت تغییراتی که روی دیتابیس اصلی انجام می شوند را درون این صف (queue) قرار می دهیم و سپس با پردازش مسیج های درون صف, اطلاعات را روی Typsense بروز رسانی می کنیم. 

    بدین صورت بدون اینکه فشاری به دیتابیس اصلی ما وارد شود تمامی اطلاعات را روی Typesense به صورت آنی ایندکس خواهیم کرد.

  • از Maxwell برای خواندن لاگ های MySQL و تبدیل لاگ ها به مسیج بر روی RabbitMQ استفاده میکنیم. بدین صورت تغییراتی که بر روی اطلاعات در MySQL انجام میدهیم به صورت مسیج هایی درون RabbitMQ ذخیره میشوند.

  • یک اپلیکیشن indexer میسازیم که مسیج های نوشته شده بر روی RabbitMQ را بخواند و بر اساس این مسیج ها اطلاعات را بر روی Typsense ایندکس و آپدیت کند.

  • اپلیکیشن اصلی ما که وظایف آن به دو قسمت اصلی تقسیم میشوند:

    • عملکرد های مرتبط با دشبورد برای مدیریت پست های بلاگ که اطلاعات را بر روی دیتابیس اصلی (در اینجا MySQL) ذخیره و بروز رسانی میکند.

    • عملکردهای عمومی از جمله صفحه اصلی, ادامه مطلب پست ها و جستجو درون مطالب که اطلاعات را از Typesense میخوانند و نمایش میدهند.

شکل کلی اپلیکیشن ما به صورت زیر خواهد بود:

MySQL CDC indexer architecture
معماری نهایی پروژه

راه اندازی Maxwell بر روی MySQL

برای راه اندازی Maxwell بر روی MySQL طبق مستندات نیازمند انجام مراحل زیر هستیم: 

  • فعالسازی binlog در MySQL و تنظیم لاگ ها با فرمت Row  
  • ایجاد دسترسی های مورد نیاز برای Maxwell.
  • فعال سازی تنظیمات مربوط به لود اولیه اطلاعات (bootstrap).
  • اجرا کردن maxwell و bootstraper

فعالسازی ‌Binlog در MySQL و تنظیم لاگ ها با فرمت Row

برای اینکار میتوانیم یک فایل با پسوند conf در مسیر /etc/mysql/conf.d/ بسازیم. مثلا من این فایل با نام cdc.conf به صورت /etc/mysql/conf.d/cdc.conf میسازم.

سپس تنظیمات زیر را درون آن قرار میدهیم:

[mysqld]
server_id=1
binlog_format=row
log-bin=mysql-bin

# Only log `blog` DB
# Remove this line if you want to log everything.
# Or add a new line like this if you want to also log another specific DB
binlog_do_db=blog

در تنظیمات بالا فرمت لاگ را بر روی row قرار دادیم و با استفاده از log-bin لاگ ها را فعال کردیم. هرگاه که از دستور log-bin استفاده کنیم باید یک آیدی نیز به سرور MySQL تخصیص بدیم. در اینجا ما عدد 1 را به عنوان آیدی به این سرور MySQL تخصیص دادیم. در صورتی که چندین سرور دارید یا رپلیکا دارید باید آیدی های متفاوتی استفاده کنید.

در خط آخر مشخص کردیم که تنها لاگ ها برای دیتابیس blog فعال باشند. به صورت پیش فرض با استفاده از log-bin لاگ برای تمامی دیتایبس ها فعال می شود. با استفاده از ‍bin_do_db میتوانیم فعال بودن لاگ را برای دیتابیس های مورد نظرمون محدود کنیم. به ازای هر دیتابیس میتوانید یکبار این خط را تکرار کنید. 

با حذف تمامی خطوطی که bin_do_db در آنها وجود دارد میتوانیم لاگ را برای تمامی دیتابیس ها فعال کنیم.

خیلی وقت ها این تنظیمات به صورت پیش فرض فعال هستن و نیاز نیست دوباره این تنظیمات رو انجام بدیم! برای اینکه ببینید فعال هستن یا نه میتونید کوئری های زیر رو بر روی دیتابیس فعلی خودتون اجرا کنید و نتیجه رو بررسی کنید:

SHOW VARIABLES LIKE 'binlog_format'; -- should be ROW
SHOW VARIABLES LIKE 'server_id'; -- should show a number
SHOW VARIABLES LIKE 'log_bin'; -- should be ON

ایجاد دسترسی های مورد نیاز برای Maxwell

برای اینکار باید به دیتابیس خود وصل شوید و دستورات زیر را درون دیتابیس اجرا کنید:

-- Create Maxwell user for CDC
CREATE USER IF NOT EXISTS 'maxwell'@'%' IDENTIFIED BY 'maxwell123';

-- Grant Maxwell database permissions (for storing state)
GRANT ALL ON maxwell.* TO 'maxwell'@'%';

-- Grant replication permissions to all database tables for CDC
GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell'@'%';

-- Maxwell schema (required for Maxwell to track its state)
CREATE DATABASE IF NOT EXISTS maxwell;

-- Flush privileges
FLUSH PRIVILEGES;

در خط اول به جای maxwell123 میتوانید پسوورد مورد نظر خودتان را قرار دهید. با اجرای دستورات بالا یک کاربر با نام maxwell و همچنین یک دیتابیس با نام maxwell ایجاد می شود. این کاربر دسترسی کامل به دیتابیس maxwell را خواهد داشت. همچنین دسترسی خواندن و رپلیکیشن را به این کاربر بر روی تمامی دیتابیس ها می دهیم. چنانچه میخواهید maxwell به تمامی تنها به دیتابیس blog دسترسی داشته باشد میتوانید بجای *.* از blog.* استفاده کنید.

لود اولیه اطلاعات یا Bootstrap چیست؟

بعد از فعال‌سازی Maxwell، این ابزار تغییراتی که در پایگاه داده اتفاق می‌افتد (مثل INSERT، UPDATE و DELETE) را شناسایی کرده و هر تغییر را به صورت یک پیام (معمولاً در قالب JSON) تولید می‌کند. 

این پیام‌ها سپس به RabbitMQ ارسال می‌شوند تا سرویس‌های دیگر بتوانند از آن‌ها استفاده کنند. با این حال، Maxwell به طور پیش‌فرض فقط تغییرات جدید را از زمان فعال‌سازی به بعد منتشر می‌کند. به همین دلیل داده‌هایی که از قبل در پایگاه داده وجود داشته‌اند به RabbitMQ منتقل نمی‌شوند.  برای پوشش این موضوع قابلیتی به نام Bootstrap وجود دارد. با استفاده از Bootstrap می‌توان داده‌های موجود در جداول انتخابی را نیز استخراج کرد و مانند تغییرات جدید به RabbitMQ فرستاد. 

در اینجا, ما از این قابلیت استفاده می‌کنیم تا داده هایی را که تا قبل از فعالسازی Maxwell وجود داشته اند را روی Typesense ایندکس کنیم.

اجرا کردن maxwell و Bootstraper

بعد از انجام تنظیمات میتونیم maxwell و bootstraper اون رو به صورت جداگانه اجرا کنیم:

bin/maxwell
    --user=maxwell
    --password=maxwell123
    --host=mysql
    --port=3306
    --producer=rabbitmq
    --rabbitmq_host=rabbitmq
    --rabbitmq_port=5672
    --rabbitmq_user=admin
    --rabbitmq_pass=admin123
    --rabbitmq_exchange=maxwell
    --rabbitmq_exchange_type=fanout
    --rabbitmq_exchange_durable=true
    --rabbitmq_routing_key_template=%db%.%table%
    --log_level=info
    --bootstrapper=sync
    --filter="exclude: *.*, include: blog.posts"

bin/maxwell-bootstrap
    --user=maxwell
    --password=maxwell123
    --host=mysql
    --port=3306
    --database=blog
    --table=posts

در بالا دوتا دستور به صورت جداگانه اجرا کردیم. دستور اول maxwell رو به صورت long running اجرا خواهد کرد بدین معنی که این سرویس به صورت همیشه در حال اجرا خواهد ماند و تغییرات دیتابیس رو بر روی RabbitMQ خواهد فرستاد.

دستور دوم maxwell-bootstrap هست که با اجرا کردن اون یک رکورد در جدول bootstrap ایجاد می شود که باعث میشود maxwell شروع به لود اولیه اطلاعات کند. اینکار را میتوانید خودتان به صورت دستی انجام دهید, در اینصورت نیازی به دستور maxwell-bootstrap نخواهید داشت.

maxwell-bootstrap command creates a record in maxwell.bootstrap table
رکورد ایجاد شده در جدول bootstrap

نکته: در صورتی که از پروژه ای ایجاد شده در گیت هاب ما استفاده میکنید بعد از اجرا کردن پروژه (در ابتدای این پست به نحوه اجرای آن اشاره شده) میتوانید با مراجعه به آدرس http://localhost:8081/ دشبورد PHPMyAdmin رو ببینید و با استفاده از این دشبورد ساختار دیتابیس رو بررسی کنید.

بعد از اجرای bootstrap و maxwell در صورتی که در دیتابیس فعلی اطلاعاتی داشته باشیم یا تغییراتی بر روی اطلاعات اعمال کنیم, باید بتونید در دشبورد RabbitMQ مسیج هایی که ارسال شده اند را در exchange با نام maxwell ببینید.

article figures

در نمودار بالا میتونیم ببینیم که در یک بازه زمانی کوتاه 60 مسیج در ثانیه به RabbitMQ ارسال شده اند. در ادامه یک سرویس ایجاد میکنیم که این مسیج ها پردازش کند و نتیجه را بر روی Typesense ایندکس کند.

نکته: در صورتی که از پروژه ی ایجاد شده در گیت هاب ما استفاده میکنید, بعد از اجرای پروژه میتونید به آدرس http://localhost:15672/ برید و با یوزرنیم admin و پسوورد admin123 وارد شوید.

ایجاد سرویس Indexer

این سرویس مسئول پردازش مسیج هایی است که Maxwell بر روی RabbitMQ مینویسد. پردازش هر مسیج شامل ایجاد , ویرایش یا حذف ایندکس می شود.

  •  اگر کاربر محتوای جدیدی ایجاد کند آن محتوا به صورت مسیج بر روی RabbitMQ قرار میگیرد و سپس سرویس indexer با پردازش این مسیج یک ایندکس جدید بر روی Typesense ایجاد میکند.

  • اگر کاربر محتوای فعلی را ویرایش کند این ویرایش به صورت یک مسیج بر روی RabbitMQ قرار میگیرد و سپس سرویس ایندکسر با پردازش این مسیج , محتوای ایندکس شده ی فعلی را بر روی Typesense بروز رسانی میکند.

  • و در نهایت در صورت حذف محتوا , با پردازش مسیج قرار گرفته بر روی RabbitMQ ایندکس متناظر با آن مسیج از روی typesense حذف می شوند.
// StartCDC starts the CDC pipeline
func (s *CDCService) StartCDC(ctx context.Context, queueName string) error {
	log.Printf("Starting CDC service, listening to queue: %s", queueName)

	// Connect to RabbitMQ
	if err := s.messageQueue.Connect(); err != nil {
		return fmt.Errorf("failed to connect to RabbitMQ: %w", err)
	}
	defer s.messageQueue.Close()

	// Connect to Typesense√
	if err := s.searchIndex.Connect(); err != nil {
		return fmt.Errorf("failed to connect to Typesense: %w", err)
	}
	defer s.searchIndex.Close()

	// Ensure the posts collection exists
	if err := s.ensurePostsCollection(); err != nil {
		return fmt.Errorf("failed to ensure posts collection: %w", err)
	}

	// Start consuming messages
	return s.messageQueue.ConsumeMessages(queueName, s.handleMessage)
}

// handleMessage processes individual messages from RabbitMQ
func (s *CDCService) handleMessage(message []byte) error {
	// Parse the CDC event
	event, err := domain.FromJSON(message)
	if err != nil {
		log.Printf("Failed to parse CDC event: %v", err)
		return err
	}

	// Validate the event
	if !event.IsValid() {
		log.Printf("Invalid CDC event: %+v", event)
		return fmt.Errorf("invalid CDC event")
	}

	// Only process posts table events
	if event.Table != "posts" {
		log.Printf("Skipping non-posts table event: %s.%s", event.Database, event.Table)
		return nil
	}

	// Process the event based on its type
	switch event.Type {
	case domain.EventTypeInsert, domain.EventTypeUpdate, domain.EventTypeBootstrapInsert:
		return s.handleUpsert(event)
	case domain.EventTypeDelete:
		return s.handleDelete(event)
	case domain.EventTypeBootstrapStart:
		log.Printf("Bootstrap insert event received")
		return nil
	case domain.EventTypeBootstrapComplete:
		log.Printf("Bootstrap complete event received")
		return nil
	default:
		log.Printf("Unknown event type: %s", event.Type)
		return fmt.Errorf("unknown event type: %s", event.Type)
	}
}

با توجه به کدهای بالا ما بر حسب نوع مسیجی که در RabbitMQ قرار گرفته عملیات ایجاد , ویرایش یا حذف ایندکس را در Typesense انجام می دهیم. کدهای کامل را میتوانید درون گیتهاب ببینید.

با اجرا کردن سرویس indexer باید داده ها درون Typesense ایندکس شوند و نتیجه را میتوان درون دشبورد typesense دید.

typesense dashaboard
دشبورد typesense برای دیدن اطلاعات ایندکس شده.

برای مثال در تصویر میبینیم که 290 آیتم در کالکشن posts ایندکس شده اند.

سپس اپلیکیشن اصلی ما میتواند در بین اطلاعات ایندکس شده جستجو انجام دهد و نتایج را به کاربران نمایش دهد:

article figures

نکته: در صورتی که از پروژه ی ایجاد شده در گیت هاب ما استفاده میکنید, بعد از اجرای پروژه میتونید به آدرس http://localhost:8085/ برید تا سرویس وبلاگ رو ببینید.

در تصویر بالا درون سرچ هرچیزی را که جستجو کنیم درصورت وجود داشتن, لیست مطالب مرتبط به ما نمایش داده خواهد شد.


قسمت قبل: معرفی و بررسی اولیه | CDC pipeline

قسمت بعد: ایندکس داده ها با استفاده از Debezium بر روی Typesense برای ایجاد سرویس جستجوی سریع | CDC pipeline


دیدگاه ها