diff options
Diffstat (limited to 'tool/sqlite3_rsync.c')
-rw-r--r-- | tool/sqlite3_rsync.c | 577 |
1 files changed, 473 insertions, 104 deletions
diff --git a/tool/sqlite3_rsync.c b/tool/sqlite3_rsync.c index 34faaf0fd..760559aec 100644 --- a/tool/sqlite3_rsync.c +++ b/tool/sqlite3_rsync.c @@ -46,9 +46,11 @@ struct SQLiteRsync { const char *zOrigin; /* Name of the origin */ const char *zReplica; /* Name of the replica */ const char *zErrFile; /* Append error messages to this file */ + const char *zDebugFile; /* Append debugging messages to this file */ FILE *pOut; /* Transmit to the other side */ FILE *pIn; /* Receive from the other side */ FILE *pLog; /* Duplicate output here if not NULL */ + FILE *pDebug; /* Write debug info here if not NULL */ sqlite3 *db; /* Database connection */ int nErr; /* Number of errors encountered */ int nWrErr; /* Number of failed attempts to write on the pipe */ @@ -70,25 +72,31 @@ struct SQLiteRsync { /* The version number of the protocol. Sent in the *_BEGIN message ** to verify that both sides speak the same dialect. */ -#define PROTOCOL_VERSION 1 +#define PROTOCOL_VERSION 2 /* Magic numbers to identify particular messages sent over the wire. */ +/**** Baseline: protocol version 1 ****/ #define ORIGIN_BEGIN 0x41 /* Initial message */ #define ORIGIN_END 0x42 /* Time to quit */ #define ORIGIN_ERROR 0x43 /* Error message from the remote */ #define ORIGIN_PAGE 0x44 /* New page data */ #define ORIGIN_TXN 0x45 /* Transaction commit */ #define ORIGIN_MSG 0x46 /* Informational message */ +/**** Added in protocol version 2 ****/ +#define ORIGIN_DETAIL 0x47 /* Request finer-grain hash info */ +#define ORIGIN_READY 0x48 /* Ready for next round of hash exchanges */ +/**** Baseline: protocol version 1 ****/ #define REPLICA_BEGIN 0x61 /* Welcome message */ #define REPLICA_ERROR 0x62 /* Error. Report and quit. */ #define REPLICA_END 0x63 /* Replica wants to stop */ #define REPLICA_HASH 0x64 /* One or more pages hashes to report */ #define REPLICA_READY 0x65 /* Read to receive page content */ #define REPLICA_MSG 0x66 /* Informational message */ - +/**** Added in protocol version 2 ****/ +#define REPLICA_CONFIG 0x67 /* Hash exchange configuration */ /**************************************************************************** ** Beginning of the popen2() implementation copied from Fossil ************* @@ -796,11 +804,49 @@ static void hashFunc( sqlite3_result_blob(context, HashFinal(&cx), 160/8, SQLITE_TRANSIENT); } +/* +** Implementation of the agghash(X) function. +** +** Return a 160-bit BLOB which is the hash of the concatenation +** of all X inputs. +*/ +static void agghashStep( + sqlite3_context *context, + int argc, + sqlite3_value **argv +){ + HashContext *pCx; + int eType = sqlite3_value_type(argv[0]); + int nByte = sqlite3_value_bytes(argv[0]); + if( eType==SQLITE_NULL ) return; + pCx = (HashContext*)sqlite3_aggregate_context(context, sizeof(*pCx)); + if( pCx==0 ) return; + if( pCx->iSize==0 ) HashInit(pCx, 160); + if( eType==SQLITE_BLOB ){ + HashUpdate(pCx, sqlite3_value_blob(argv[0]), nByte); + }else{ + HashUpdate(pCx, sqlite3_value_text(argv[0]), nByte); + } +} +static void agghashFinal(sqlite3_context *context){ + HashContext *pCx = (HashContext*)sqlite3_aggregate_context(context, 0); + if( pCx ){ + sqlite3_result_blob(context, HashFinal(pCx), 160/8, SQLITE_TRANSIENT); + } +} + /* Register the hash function */ static int hashRegister(sqlite3 *db){ - return sqlite3_create_function(db, "hash", 1, + int rc; + rc = sqlite3_create_function(db, "hash", 1, SQLITE_UTF8 | SQLITE_INNOCUOUS | SQLITE_DETERMINISTIC, 0, hashFunc, 0, 0); + if( rc==SQLITE_OK ){ + rc = sqlite3_create_function(db, "agghash", 1, + SQLITE_UTF8 | SQLITE_INNOCUOUS | SQLITE_DETERMINISTIC, + 0, 0, agghashStep, agghashFinal); + } + return rc; } /* End of the hashing logic @@ -838,6 +884,25 @@ static void logError(SQLiteRsync *p, const char *zFormat, ...){ p->nErr++; } +/* +** Append text to the debugging mesage file, if an that file is +** specified. +*/ +static void debugMessage(SQLiteRsync *p, const char *zFormat, ...){ + if( p->zDebugFile ){ + if( p->pDebug==0 ){ + p->pDebug = fopen(p->zDebugFile, "wb"); + } + if( p->pDebug ){ + va_list ap; + va_start(ap, zFormat); + vfprintf(p->pDebug, zFormat, ap); + va_end(ap); + fflush(p->pDebug); + } + } +} + /* Read a single big-endian 32-bit unsigned integer from the input ** stream. Return 0 on success and 1 if there are any errors. @@ -1192,6 +1257,13 @@ static void closeDb(SQLiteRsync *p){ ** nPage, and szPage. Then enter a loop responding to message from ** the replica: ** +** REPLICA_BEGIN iProtocol +** +** An optional message sent by the replica in response to the +** prior ORIGIN_BEGIN with a counter-proposal for the protocol +** level. If seen, try to reduce the protocol level to what is +** requested and send a new ORGIN_BEGIN. +** ** REPLICA_ERROR size text ** ** Report an error from the replica and quit @@ -1202,30 +1274,42 @@ static void closeDb(SQLiteRsync *p){ ** ** REPLICA_HASH hash ** -** The argument is the 20-byte SHA1 hash for the next page -** page hashes appear in sequential order with no gaps. +** The argument is the 20-byte SHA1 hash for the next page or +** block of pages. Hashes appear in sequential order with no gaps, +** unless there is an intervening REPLICA_CONFIG message. +** +** REPLICA_CONFIG pgno cnt +** +** Set counters used by REPLICA_HASH. The next hash will start +** on page pgno and all subsequent hashes will cover cnt pages +** each. Note that for a multi-page hash, the hash value is +** actually a hash of the individual page hashes. ** ** REPLICA_READY ** ** The replica has sent all the hashes that it intends to send. ** This side (the origin) can now start responding with page -** content for pages that do not have a matching hash. +** content for pages that do not have a matching hash or with +** ORIGIN_DETAIL messages with requests for more detail. */ static void originSide(SQLiteRsync *p){ int rc = 0; int c = 0; unsigned int nPage = 0; - unsigned int iPage = 0; + unsigned int iHash = 1; /* Pgno for next hash to receive */ + unsigned int nHash = 1; /* Number of pages per hash received */ + unsigned int mxHash = 0; /* Maximum hash value received */ unsigned int lockBytePage = 0; unsigned int szPg = 0; - sqlite3_stmt *pCkHash = 0; - sqlite3_stmt *pInsHash = 0; + sqlite3_stmt *pCkHash = 0; /* Verify hash on a single page */ + sqlite3_stmt *pCkHashN = 0; /* Verify a multi-page hash */ + sqlite3_stmt *pInsHash = 0; /* Record a bad hash */ char buf[200]; p->isReplica = 0; if( p->bCommCheck ){ infoMsg(p, "origin zOrigin=%Q zReplica=%Q isRemote=%d protocol=%d", - p->zOrigin, p->zReplica, p->isRemote, PROTOCOL_VERSION); + p->zOrigin, p->zReplica, p->isRemote, p->iProtocol); writeByte(p, ORIGIN_END); fflush(p->pOut); }else{ @@ -1251,13 +1335,15 @@ static void originSide(SQLiteRsync *p){ if( p->nErr==0 ){ /* Send the ORIGIN_BEGIN message */ writeByte(p, ORIGIN_BEGIN); - writeByte(p, PROTOCOL_VERSION); + writeByte(p, p->iProtocol); writePow2(p, szPg); writeUint32(p, nPage); fflush(p->pOut); + if( p->zDebugFile ){ + debugMessage(p, "-> ORIGIN_BEGIN %u %u %u\n", p->iProtocol,szPg,nPage); + } p->nPage = nPage; p->szPage = szPg; - p->iProtocol = PROTOCOL_VERSION; lockBytePage = (1<<30)/szPg + 1; } } @@ -1270,11 +1356,24 @@ static void originSide(SQLiteRsync *p){ ** that is larger than what it knows about. The replica sends back ** a counter-proposal of an earlier protocol which the origin can ** accept by resending a new ORIGIN_BEGIN. */ - p->iProtocol = readByte(p); - writeByte(p, ORIGIN_BEGIN); - writeByte(p, p->iProtocol); - writePow2(p, p->szPage); - writeUint32(p, p->nPage); + u8 newProtocol = readByte(p); + if( p->zDebugFile ){ + debugMessage(p, "<- REPLICA_BEGIN %d\n", (int)newProtocol); + } + if( newProtocol < p->iProtocol ){ + p->iProtocol = newProtocol; + writeByte(p, ORIGIN_BEGIN); + writeByte(p, p->iProtocol); + writePow2(p, p->szPage); + writeUint32(p, p->nPage); + fflush(p->pOut); + if( p->zDebugFile ){ + debugMessage(p, "-> ORIGIN_BEGIN %d %d %u\n", p->iProtocol, + p->szPage, p->nPage); + } + }else{ + reportError(p, "Invalid REPLICA_BEGIN reply"); + } break; } case REPLICA_MSG: @@ -1282,25 +1381,73 @@ static void originSide(SQLiteRsync *p){ readAndDisplayMessage(p, c); break; } + case REPLICA_CONFIG: { + readUint32(p, &iHash); + readUint32(p, &nHash); + if( p->zDebugFile ){ + debugMessage(p, "<- REPLICA_CONFIG %u %u\n", iHash, nHash); + } + break; + } case REPLICA_HASH: { + int bMatch = 0; if( pCkHash==0 ){ - runSql(p, "CREATE TEMP TABLE badHash(pgno INTEGER PRIMARY KEY)"); + runSql(p, "CREATE TEMP TABLE badHash(" + " pgno INTEGER PRIMARY KEY," + " sz INT)"); pCkHash = prepareStmt(p, - "SELECT pgno FROM sqlite_dbpage('main')" - " WHERE pgno=?1 AND hash(data)!=?2" + "SELECT hash(data)==?3 FROM sqlite_dbpage('main')" + " WHERE pgno=?1" ); if( pCkHash==0 ) break; - pInsHash = prepareStmt(p, "INSERT INTO badHash VALUES(?)"); + pInsHash = prepareStmt(p, "INSERT INTO badHash VALUES(?1,?2)"); if( pInsHash==0 ) break; } p->nHashSent++; - iPage++; - sqlite3_bind_int64(pCkHash, 1, iPage); readBytes(p, 20, buf); - sqlite3_bind_blob(pCkHash, 2, buf, 20, SQLITE_STATIC); - rc = sqlite3_step(pCkHash); - if( rc==SQLITE_ROW ){ - sqlite3_bind_int64(pInsHash, 1, sqlite3_column_int64(pCkHash, 0)); + if( nHash>1 ){ + if( pCkHashN==0 ){ + pCkHashN = prepareStmt(p, + "WITH c(n) AS " + " (VALUES(?1) UNION ALL SELECT n+1 FROM c WHERE n<?2)" + "SELECT agghash(hash(data))==?3" + " FROM c CROSS JOIN sqlite_dbpage('main') ON pgno=n" + ); + if( pCkHashN==0 ) break; + } + sqlite3_bind_int64(pCkHashN, 1, iHash); + sqlite3_bind_int64(pCkHashN, 2, iHash + nHash - 1); + sqlite3_bind_blob(pCkHashN, 3, buf, 20, SQLITE_STATIC); + rc = sqlite3_step(pCkHashN); + if( rc==SQLITE_ROW ){ + bMatch = sqlite3_column_int(pCkHashN,0); + }else if( rc==SQLITE_ERROR ){ + reportError(p, "SQL statement [%s] failed: %s", + sqlite3_sql(pCkHashN), sqlite3_errmsg(p->db)); + } + sqlite3_reset(pCkHashN); + }else{ + sqlite3_bind_int64(pCkHash, 1, iHash); + sqlite3_bind_blob(pCkHash, 3, buf, 20, SQLITE_STATIC); + rc = sqlite3_step(pCkHash); + if( rc==SQLITE_ERROR ){ + reportError(p, "SQL statement [%s] failed: %s", + sqlite3_sql(pCkHash), sqlite3_errmsg(p->db)); + }else if( rc==SQLITE_ROW && sqlite3_column_int(pCkHash,0) ){ + bMatch = 1; + } + sqlite3_reset(pCkHash); + } + if( p->zDebugFile ){ + debugMessage(p, "<- REPLICA_HASH %u %u %s %08x...\n", + iHash, nHash, + bMatch ? "match" : "fail", + *(unsigned int*)buf + ); + } + if( !bMatch ){ + sqlite3_bind_int64(pInsHash, 1, iHash); + sqlite3_bind_int64(pInsHash, 2, nHash); rc = sqlite3_step(pInsHash); if( rc!=SQLITE_DONE ){ reportError(p, "SQL statement [%s] failed: %s", @@ -1308,42 +1455,74 @@ static void originSide(SQLiteRsync *p){ } sqlite3_reset(pInsHash); } - else if( rc!=SQLITE_DONE ){ - reportError(p, "SQL statement [%s] failed: %s", - sqlite3_sql(pCkHash), sqlite3_errmsg(p->db)); - } - sqlite3_reset(pCkHash); + if( iHash+nHash>mxHash ) mxHash = iHash+nHash; + iHash += nHash; break; } case REPLICA_READY: { + int nMulti = 0; sqlite3_stmt *pStmt; - sqlite3_finalize(pCkHash); - sqlite3_finalize(pInsHash); - pCkHash = 0; - pInsHash = 0; - if( iPage+1<p->nPage ){ - runSql(p, "WITH RECURSIVE c(n) AS" - " (VALUES(%d) UNION ALL SELECT n+1 FROM c WHERE n<%d)" - " INSERT INTO badHash SELECT n FROM c", - iPage+1, p->nPage); + if( p->zDebugFile ){ + debugMessage(p, "<- REPLICA_READY\n"); } - runSql(p, "DELETE FROM badHash WHERE pgno=%d", lockBytePage); - pStmt = prepareStmt(p, - "SELECT pgno, data" - " FROM badHash JOIN sqlite_dbpage('main') USING(pgno)"); + pStmt = prepareStmt(p,"SELECT pgno, sz FROM badHash WHERE sz>1"); if( pStmt==0 ) break; - while( sqlite3_step(pStmt)==SQLITE_ROW && p->nErr==0 && p->nWrErr==0 ){ + while( sqlite3_step(pStmt)==SQLITE_ROW ){ unsigned int pgno = (unsigned int)sqlite3_column_int64(pStmt,0); - const void *pContent = sqlite3_column_blob(pStmt, 1); - writeByte(p, ORIGIN_PAGE); + unsigned int cnt = (unsigned int)sqlite3_column_int64(pStmt,1); + writeByte(p, ORIGIN_DETAIL); writeUint32(p, pgno); - writeBytes(p, szPg, pContent); - p->nPageSent++; + writeUint32(p, cnt); + nMulti++; + if( p->zDebugFile ){ + debugMessage(p, "-> ORIGIN_DETAIL %u %u\n", pgno, cnt); + } } sqlite3_finalize(pStmt); - writeByte(p, ORIGIN_TXN); - writeUint32(p, nPage); - writeByte(p, ORIGIN_END); + if( nMulti ){ + runSql(p, "DELETE FROM badHash WHERE sz>1"); + writeByte(p, ORIGIN_READY); + if( p->zDebugFile ) debugMessage(p, "-> ORIGIN_READY\n"); + }else{ + sqlite3_stmt *pStmt; + sqlite3_finalize(pCkHash); + sqlite3_finalize(pCkHashN); + sqlite3_finalize(pInsHash); + pCkHash = 0; + pInsHash = 0; + if( mxHash<p->nPage ){ + runSql(p, "WITH RECURSIVE c(n) AS" + " (VALUES(%d) UNION ALL SELECT n+1 FROM c WHERE n<%d)" + " INSERT INTO badHash SELECT n, 1 FROM c", + mxHash, p->nPage); + } + runSql(p, "DELETE FROM badHash WHERE pgno=%d", lockBytePage); + pStmt = prepareStmt(p, + "SELECT pgno, data" + " FROM badHash JOIN sqlite_dbpage('main') USING(pgno)"); + if( pStmt==0 ) break; + while( sqlite3_step(pStmt)==SQLITE_ROW + && p->nErr==0 + && p->nWrErr==0 + ){ + unsigned int pgno = (unsigned int)sqlite3_column_int64(pStmt,0); + const void *pContent = sqlite3_column_blob(pStmt, 1); + writeByte(p, ORIGIN_PAGE); + writeUint32(p, pgno); + writeBytes(p, szPg, pContent); + p->nPageSent++; + if( p->zDebugFile ){ + debugMessage(p, "-> ORIGIN_PAGE %u\n", pgno); + } + } + sqlite3_finalize(pStmt); + writeByte(p, ORIGIN_TXN); + writeUint32(p, nPage); + if( p->zDebugFile ){ + debugMessage(p, "-> ORIGIN_TXN %u\n", nPage); + } + writeByte(p, ORIGIN_END); + } fflush(p->pOut); break; } @@ -1361,6 +1540,102 @@ static void originSide(SQLiteRsync *p){ } /* +** Send a REPLICA_HASH message for each entry in the sendHash table. +** The sendHash table looks like this: +** +** CREATE TABLE sendHash( +** fpg INTEGER PRIMARY KEY, -- Page number of the hash +** npg INT -- Number of pages in this hash +** ); +** +** If iHash is page number for the next page that the origin will +** be expecting, and nHash is the number of pages that the origin will +** be expecting in the hash that follows. Send a REPLICA_CONFIG message +** if either of these values if not correct. +*/ +static void sendHashMessages( + SQLiteRsync *p, /* The replica-side of the sync */ + unsigned int iHash, /* Next page expected by origin */ + unsigned int nHash /* Next number of pages expected by origin */ +){ + sqlite3_stmt *pStmt; + pStmt = prepareStmt(p, + "SELECT if(npg==1," + " (SELECT hash(data) FROM sqlite_dbpage('replica') WHERE pgno=fpg)," + " (WITH RECURSIVE c(n) AS" + " (SELECT fpg UNION ALL SELECT n+1 FROM c WHERE n<fpg+npg-1)" + " SELECT agghash(hash(data))" + " FROM c CROSS JOIN sqlite_dbpage('replica') ON pgno=n)) AS hash," + " fpg," + " npg" + " FROM sendHash ORDER BY fpg" + ); + while( sqlite3_step(pStmt)==SQLITE_ROW && p->nErr==0 && p->nWrErr==0 ){ + const unsigned char *a = sqlite3_column_blob(pStmt, 0); + unsigned int pgno = (unsigned int)sqlite3_column_int64(pStmt, 1); + unsigned int npg = (unsigned int)sqlite3_column_int64(pStmt, 2); + if( pgno!=iHash || npg!=nHash ){ + writeByte(p, REPLICA_CONFIG); + writeUint32(p, pgno); + writeUint32(p, npg); + if( p->zDebugFile ){ + debugMessage(p, "-> REPLICA_CONFIG %u %u\n", pgno, npg); + } + } + if( a==0 ){ + if( p->zDebugFile ){ + debugMessage(p, "# Oops: No hash for %u %u\n", pgno, npg); + } + }else{ + writeByte(p, REPLICA_HASH); + writeBytes(p, 20, a); + if( p->zDebugFile ){ + debugMessage(p, "-> REPLICA_HASH %u %u (%08x...)\n", + pgno, npg, *(unsigned int*)a); + } + } + p->nHashSent++; + iHash = pgno + npg; + nHash = npg; + } + sqlite3_finalize(pStmt); + runSql(p, "DELETE FROM sendHash"); + writeByte(p, REPLICA_READY); + fflush(p->pOut); + if( p->zDebugFile ) debugMessage(p, "-> REPLICA_READY\n", iHash); +} + +/* +** Make entries in the sendHash table to send hashes for +** npg (mnemonic: Number of PaGes) pages starting with fpg +** (mnemonic: First PaGe). +*/ +static void subdivideHashRange( + SQLiteRsync *p, /* The replica-side of the sync */ + unsigned int fpg, /* First page of the range */ + unsigned int npg /* Number of pages */ +){ + unsigned int nChunk; /* How many pages to request per hash */ + sqlite3_uint64 iEnd; /* One more than the last page */ + if( npg<=30 ){ + nChunk = 1; + }else if( npg<=1000 ){ + nChunk = 30; + }else{ + nChunk = 1000; + } + iEnd = fpg; + iEnd += npg; + runSql(p, + "WITH RECURSIVE c(n) AS" + " (VALUES(%u) UNION ALL SELECT n+%u FROM c WHERE n<%llu)" + "REPLACE INTO sendHash(fpg,npg)" + " SELECT n, min(%llu-n,%u) FROM c", + fpg, nChunk, iEnd-nChunk, iEnd, nChunk + ); +} + +/* ** Run the replica-side protocol. The protocol is passive in the sense ** that it only response to message from the origin side. ** @@ -1370,15 +1645,35 @@ static void originSide(SQLiteRsync *p){ ** each page in the origin database (sent as a single-byte power-of-2), ** and the number of pages in the origin database. ** This procedure checks compatibility, and if everything is ok, -** it starts sending hashes of pages already present back to the origin. +** it starts sending hashes back to the origin using REPLICA_HASH +** and/or REPLICA_CONFIG message, followed by a single REPLICA_READY. +** REPLICA_CONFIG is only sent if the protocol is 2 or greater. +** +** ORIGIN_ERROR size text +** +** Report an error and quit. +** +** ORIGIN_DETAIL pgno cnt +** +** The origin reports that a multi-page hash starting at pgno and +** spanning cnt pages failed to match. The origin is requesting +** details (more REPLICA_HASH message with a smaller cnt). The +** replica must wait on ORIGIN_READY before sending its reply. ** -** ORIGIN_ERROR size text +** ORIGIN_READY ** -** Report the received error and quit. +** After sending one or more ORIGIN_DETAIL messages, the ORIGIN_READY +** is sent by the origin to indicate that it has finished sending +** requests for detail and is ready for the replicate to reply +** with a new round of REPLICA_CONFIG and REPLICA_HASH messages. ** -** ORIGIN_PAGE pgno content +** ORIGIN_PAGE pgno content ** -** Update the content of the given page. +** Once the origin believes it knows exactly which pages need to be +** updated in the replica, it starts sending those pages using these +** messages. These messages will only appear immediately after +** REPLICA_READY. The origin never mixes ORIGIN_DETAIL and +** ORIGIN_PAGE messages in the same batch. ** ** ORIGIN_TXN pgno ** @@ -1399,10 +1694,11 @@ static void replicaSide(SQLiteRsync *p){ p->isReplica = 1; if( p->bCommCheck ){ infoMsg(p, "replica zOrigin=%Q zReplica=%Q isRemote=%d protocol=%d", - p->zOrigin, p->zReplica, p->isRemote, PROTOCOL_VERSION); + p->zOrigin, p->zReplica, p->isRemote, p->iProtocol); writeByte(p, REPLICA_END); fflush(p->pOut); } + if( p->iProtocol<=0 ) p->iProtocol = PROTOCOL_VERSION; /* Respond to message from the origin. The origin will initiate the ** the conversation with an ORIGIN_BEGIN message. @@ -1418,22 +1714,31 @@ static void replicaSide(SQLiteRsync *p){ unsigned int nOPage = 0; unsigned int nRPage = 0, szRPage = 0; int rc = 0; - sqlite3_stmt *pStmt = 0; + u8 iProtocol; closeDb(p); - p->iProtocol = readByte(p); + iProtocol = readByte(p); szOPage = readPow2(p); readUint32(p, &nOPage); + if( p->zDebugFile ){ + debugMessage(p, "<- ORIGIN_BEGIN %d %d %u\n", iProtocol, szOPage, + nOPage); + } if( p->nErr ) break; - if( p->iProtocol>PROTOCOL_VERSION ){ + if( iProtocol>p->iProtocol ){ /* If the protocol version on the origin side is larger, send back ** a REPLICA_BEGIN message with the protocol version number of the ** replica side. This gives the origin an opportunity to resend ** a new ORIGIN_BEGIN with a reduced protocol version. */ writeByte(p, REPLICA_BEGIN); - writeByte(p, PROTOCOL_VERSION); + writeByte(p, p->iProtocol); + fflush(p->pOut); + if( p->zDebugFile ){ + debugMessage(p, "-> REPLICA_BEGIN %u\n", p->iProtocol); + } break; } + p->iProtocol = iProtocol; p->nPage = nOPage; p->szPage = szOPage; rc = sqlite3_open(":memory:", &p->db); @@ -1458,6 +1763,12 @@ static void replicaSide(SQLiteRsync *p){ closeDb(p); break; } + runSql(p, + "CREATE TABLE sendHash(" + " fpg INTEGER PRIMARY KEY," /* The page number of hash to send */ + " npg INT" /* Number of pages in this hash */ + ")" + ); hashRegister(p->db); if( runSqlReturnUInt(p, &nRPage, "PRAGMA replica.page_count") ){ break; @@ -1484,26 +1795,43 @@ static void replicaSide(SQLiteRsync *p){ "replica is %d bytes", szOPage, szRPage); break; } - - pStmt = prepareStmt(p, - "SELECT hash(data) FROM sqlite_dbpage('replica')" - " WHERE pgno<=min(%d,%d)" - " ORDER BY pgno", nRPage, nOPage); - while( sqlite3_step(pStmt)==SQLITE_ROW && p->nErr==0 && p->nWrErr==0 ){ - const unsigned char *a = sqlite3_column_blob(pStmt, 0); - writeByte(p, REPLICA_HASH); - writeBytes(p, 20, a); - p->nHashSent++; + if( p->iProtocol<2 || nRPage<=100 ){ + runSql(p, + "WITH RECURSIVE c(n) AS" + "(VALUES(1) UNION ALL SELECT n+1 FROM c WHERE n<%d)" + "INSERT INTO sendHash(fpg, npg) SELECT n, 1 FROM c", + nRPage); + }else{ + runSql(p,"INSERT INTO sendHash VALUES(1,1)"); + subdivideHashRange(p, 2, nRPage); } - sqlite3_finalize(pStmt); - writeByte(p, REPLICA_READY); - fflush(p->pOut); + sendHashMessages(p, 1, 1); runSql(p, "PRAGMA writable_schema=ON"); break; } + case ORIGIN_DETAIL: { + unsigned int fpg, npg; + readUint32(p, &fpg); + readUint32(p, &npg); + if( p->zDebugFile ){ + debugMessage(p, "<- ORIGIN_DETAIL %u %u\n", fpg, npg); + } + subdivideHashRange(p, fpg, npg); + break; + } + case ORIGIN_READY: { + if( p->zDebugFile ){ + debugMessage(p, "<- ORIGIN_READY\n"); + } + sendHashMessages(p, 0, 0); + break; + } case ORIGIN_TXN: { unsigned int nOPage = 0; readUint32(p, &nOPage); + if( p->zDebugFile ){ + debugMessage(p, "<- ORIGIN_TXN %u\n", nOPage); + } if( pIns==0 ){ /* Nothing has changed */ runSql(p, "COMMIT"); @@ -1531,6 +1859,9 @@ static void replicaSide(SQLiteRsync *p){ unsigned int pgno = 0; int rc; readUint32(p, &pgno); + if( p->zDebugFile ){ + debugMessage(p, "<- ORIGIN_PAGE %u\n", pgno); + } if( p->nErr ) break; if( pIns==0 ){ pIns = prepareStmt(p, @@ -1678,9 +2009,11 @@ int main(int argc, char const * const *argv){ sqlite3_int64 tmEnd; sqlite3_int64 tmElapse; const char *zRemoteErrFile = 0; + const char *zRemoteDebugFile = 0; #define cli_opt_val cmdline_option_value(argc, argv, ++i) memset(&ctx, 0, sizeof(ctx)); + ctx.iProtocol = PROTOCOL_VERSION; for(i=1; i<argc; i++){ const char *z = argv[i]; if( z[0]=='-' && z[1]=='-' && z[2]!=0 ) z++; @@ -1704,6 +2037,20 @@ int main(int argc, char const * const *argv){ zExe = cli_opt_val; continue; } + if( strcmp(z, "-wal-only")==0 ){ + ctx.bWalOnly = 1; + continue; + } + if( strcmp(z, "-version")==0 ){ + printf("%s\n", sqlite3_sourceid()); + return 0; + } + if( strcmp(z, "-help")==0 || strcmp(z, "--help")==0 + || strcmp(z, "-?")==0 + ){ + printf("%s", zUsage); + return 0; + } if( strcmp(z, "-logfile")==0 ){ /* DEBUG OPTION: --logfile FILENAME ** Cause all local output traffic to be duplicated in FILENAME */ @@ -1729,39 +2076,49 @@ int main(int argc, char const * const *argv){ zRemoteErrFile = cli_opt_val; continue; } - if( strcmp(z, "-wal-only")==0 ){ - ctx.bWalOnly = 1; + if( strcmp(z, "-debugfile")==0 ){ + /* DEBUG OPTION: --debugfile FILENAME + ** Debugging messages on the local side are written into FILENAME */ + ctx.zDebugFile = cli_opt_val; continue; } - if( strcmp(z, "-help")==0 || strcmp(z, "--help")==0 - || strcmp(z, "-?")==0 - ){ - printf("%s", zUsage); - return 0; - } - if( strcmp(z, "-version")==0 ){ - printf("%s\n", sqlite3_sourceid()); - return 0; + if( strcmp(z, "-remote-debugfile")==0 ){ + /* DEBUG OPTION: --remote-debugfile FILENAME + ** Error messages on the remote side are written into FILENAME on + ** the remote side. */ + zRemoteDebugFile = cli_opt_val; + continue; } - if( z[0]=='-' ){ - if( strcmp(z,"-commcheck")==0 ){ /* DEBUG ONLY */ - /* Run a communication check with the remote side. Do not attempt - ** to exchange any database connection */ - ctx.bCommCheck = 1; - continue; + if( strcmp(z, "-protocol")==0 ){ + /* DEBUG OPTION: --protocool N + ** Set the protocol version to N */ + ctx.iProtocol = atoi(cli_opt_val); + if( ctx.iProtocol<1 ){ + ctx.iProtocol = 1; + }else if( ctx.iProtocol>PROTOCOL_VERSION ){ + ctx.iProtocol = PROTOCOL_VERSION; } - if( strcmp(z,"-arg-escape-check")==0 ){ /* DEBUG ONLY */ - /* Test the append_escaped_arg() routine by using it to render a - ** copy of the input command-line, assuming all arguments except - ** this one are filenames. */ - sqlite3_str *pStr = sqlite3_str_new(0); - int k; - for(k=0; k<argc; k++){ - append_escaped_arg(pStr, argv[k], i!=k); - } - printf("%s\n", sqlite3_str_value(pStr)); - return 0; + continue; + } + if( strcmp(z,"-commcheck")==0 ){ /* DEBUG ONLY */ + /* Run a communication check with the remote side. Do not attempt + ** to exchange any database connection */ + ctx.bCommCheck = 1; + continue; + } + if( strcmp(z,"-arg-escape-check")==0 ){ /* DEBUG ONLY */ + /* Test the append_escaped_arg() routine by using it to render a + ** copy of the input command-line, assuming all arguments except + ** this one are filenames. */ + sqlite3_str *pStr = sqlite3_str_new(0); + int k; + for(k=0; k<argc; k++){ + append_escaped_arg(pStr, argv[k], i!=k); } + printf("%s\n", sqlite3_str_value(pStr)); + return 0; + } + if( z[i]=='-' ){ fprintf(stderr, "unknown option: \"%s\". Use --help for more detail.\n", z); return 1; @@ -1838,6 +2195,10 @@ int main(int argc, char const * const *argv){ append_escaped_arg(pStr, "--errorfile", 0); append_escaped_arg(pStr, zRemoteErrFile, 1); } + if( zRemoteDebugFile ){ + append_escaped_arg(pStr, "--debugfile", 0); + append_escaped_arg(pStr, zRemoteDebugFile, 1); + } if( ctx.bWalOnly ){ append_escaped_arg(pStr, "--wal-only", 0); } @@ -1867,6 +2228,10 @@ int main(int argc, char const * const *argv){ append_escaped_arg(pStr, "--errorfile", 0); append_escaped_arg(pStr, zRemoteErrFile, 1); } + if( zRemoteDebugFile ){ + append_escaped_arg(pStr, "--debugfile", 0); + append_escaped_arg(pStr, zRemoteDebugFile, 1); + } append_escaped_arg(pStr, file_tail(ctx.zOrigin), 1); append_escaped_arg(pStr, zDiv, 1); zCmd = sqlite3_str_finish(pStr); @@ -1888,6 +2253,10 @@ int main(int argc, char const * const *argv){ append_escaped_arg(pStr, "--errorfile", 0); append_escaped_arg(pStr, zRemoteErrFile, 1); } + if( zRemoteDebugFile ){ + append_escaped_arg(pStr, "--debugfile", 0); + append_escaped_arg(pStr, zRemoteDebugFile, 1); + } append_escaped_arg(pStr, ctx.zOrigin, 1); append_escaped_arg(pStr, ctx.zReplica, 1); zCmd = sqlite3_str_finish(pStr); |